267 lines
7.7 KiB
Python
267 lines
7.7 KiB
Python
import json
import os
import random
import re
import tempfile
from datetime import datetime

import pinhook.plugin
import requests
import tweepy
|
|
|
|
# Matches a tweet permalink and captures the numeric status ID in group 1.
# The dots are escaped so only a literal "twitter.com" (optionally prefixed
# with "www.") is accepted; an unescaped "." would match any character.
TWITTER_RE = re.compile(r"(?:(?:www\.)?twitter\.com/(?:[-_a-zA-Z0-9]+)/status/)([0-9]+)", re.I)

# Matches "!twitter <sub-command> <arguments>" or "!tweet <sub-command> <arguments>".
# Group 1 is the sub-command, group 2 the remaining arguments.  The last group
# is greedy on purpose: a lazy "(.*?)" at the very end of a pattern is satisfied
# by the empty string, so it would never capture the arguments.
COMMAND_RE = re.compile(r"(?:!tw(?:itter|eet)) ?(\w+) (.*)", re.I)
|
|
|
|
# Load the bot configuration.  The path is relative to the process working
# directory; the Twitter credentials are read from the "twitter" section.
with open('config.json', encoding='utf-8') as f:
    config = json.load(f)

# Read the credentials with .get() so that an absent section or key yields
# None and falls through to the "missing twitter keys" branch below, instead
# of raising KeyError while the plugin is being imported.
_twitter_cfg = config.get("twitter") or {}
twitter_acct = _twitter_cfg.get("username") or ""
consumer_key = _twitter_cfg.get("consumer_key")
consumer_secret = _twitter_cfg.get("consumer_secret")
oauth_token = _twitter_cfg.get("access_token")
oauth_secret = _twitter_cfg.get("access_secret")

print("twitter loaded")

# tw_api is the shared tweepy client used by every command in this module.
# It stays None when any credential is missing, and the commands check for
# that before talking to Twitter.
if not all((consumer_key, consumer_secret, oauth_token, oauth_secret)):
    print("missing twitter keys!")
    tw_api = None
else:
    print("tweepy loades")
    auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
    auth.set_access_token(oauth_token, oauth_secret)
    tw_api = tweepy.API(auth)
|
|
|
|
|
|
@pinhook.plugin.listener('twitter_url')
def twitter_url(msg):
    """Expand a tweet URL found in a chat message into the tweet's text.

    Reads ``msg.text`` and ``msg.channel``.  Returns the formatted tweet
    ("@name (Full Name): text", prefixed with a check mark for verified
    accounts), or None when the message contains no tweet URL, comes from
    the "#discord" channel, no API client is configured, or the lookup fails.

    NOTE(review): twitter_cmd wraps its reply in pinhook.plugin.message(),
    while this listener returns a bare string -- confirm pinhook accepts that.
    """
    # search(), not match(): a pasted URL normally starts with a scheme
    # ("https://") and may sit anywhere in the message, so anchoring at the
    # start of the text would never find it.
    m = TWITTER_RE.search(msg.text)
    if not m:
        print("twitter: not match")
        return

    print("twitter: match!")
    print("found twitter URL")

    # Find the tweet ID from the URL
    tweet_id = m.group(1)

    if msg.channel == "#discord":
        return

    # Get the tweet using the tweepy API
    if tw_api is None:
        return

    try:
        tweet = tw_api.get_status(tweet_id, tweet_mode='extended')
        user = tweet.user
    except tweepy.error.TweepError:
        # Best effort: a tweet that cannot be fetched is silently ignored.
        return

    # Collapse newlines and runs of whitespace so the reply is a single line.
    text = " ".join(tweet._json['full_text'].split())

    prefix = "\u2713" if user.verified else ""

    # ``text`` is already a str; calling .decode() on it raises AttributeError
    # on Python 3, so it is formatted as-is.
    return "{}@\x02{}\x02 ({}): {}".format(prefix, user.screen_name, user.name, text)
|
|
|
|
|
|
|
|
def twitter_get(text):
    """twitter get <user> [n] -- Gets last/[n]th tweet from <user>

    ``text`` selects the tweet in one of three ways:

    * all digits             -- treated as a tweet ID and fetched directly;
    * ``user`` or ``user n`` -- the newest (or n-th newest, counting from 1)
      tweet on that user's timeline, limited to the last 200;
    * ``#hashtag``           -- a random tweet from a search for the hashtag.

    Returns the formatted tweet ("@name (Full Name): text", prefixed with a
    check mark for verified accounts) or a human-readable error string.
    """
    if tw_api is None:
        return "This command requires a twitter API key."

    if re.match(r'^\d+$', text):
        # user is getting a tweet by id
        try:
            tweet = tw_api.get_status(text, tweet_mode='extended')
        except tweepy.error.TweepError as e:
            if "404" in e.reason:
                return "Could not find tweet."
            return "Error: {}".format(e.reason)

        # Keep ``tweet`` as the status object; its text is extracted once,
        # at the bottom, for all three branches.
        user = tweet.user

    elif re.match(r'^\w{1,15}$', text) or re.match(r'^\w{1,15}\s+\d+$', text):
        # user is getting a tweet by name
        # split() handles any whitespace separator, matching the \s+ above.
        parts = text.split()
        username = parts[0]
        # The user counts from 1; the timeline list is indexed from 0.
        tweet_number = int(parts[1]) - 1 if len(parts) > 1 else 0

        if tweet_number < 0:
            # "user 0" has no meaning with 1-based counting.
            return "Invalid Input"

        if tweet_number >= 200:
            # Index 199 is the 200th tweet, the last one this command supports.
            return "This command can only find the last \x02200\x02 tweets."

        try:
            # try to get user by username
            user = tw_api.get_user(username)
        except tweepy.error.TweepError as e:
            if "404" in e.reason:
                return "Could not find user."
            return "Error: {}".format(e.reason)

        # get the users tweets
        try:
            user_timeline = tw_api.user_timeline(id=user.id, count=tweet_number + 1, tweet_mode="extended")
        except tweepy.error.TweepError as e:
            return "Error: {}".format(e.reason)

        # if the timeline is empty, return an error
        if not user_timeline:
            return "The user \x02{}\x02 has no tweets.".format(user.screen_name)

        # grab the requested tweet from the users timeline
        try:
            tweet = user_timeline[tweet_number]
        except IndexError:
            tweet_count = len(user_timeline)
            return "The user \x02{}\x02 only has \x02{}\x02 tweets.".format(user.screen_name, tweet_count)

    elif re.match(r'^#\w+$', text):
        # user is searching by hashtag
        try:
            search = tw_api.search(text, tweet_mode="extended")
        except tweepy.error.TweepError as e:
            return "Error: {}".format(e.reason)

        if not search:
            return "No tweets found."

        tweet = random.choice(search)
        user = tweet.user

    else:
        return "Invalid Input"

    # Format the return the text of the tweet.  Fall back to the classic
    # "text" field in case the payload carries no "full_text".
    payload = tweet._json
    raw_text = payload.get('full_text') or payload.get('text', '')
    text = " ".join(raw_text.split())

    prefix = "\u2713" if user.verified else ""

    return "{}@\x02{}\x02 ({}): {}".format(prefix, user.screen_name, user.name, text)
|
|
|
|
def twpost(text):
    """twitter post <text> -- tweet the text <text>

    Posts ``text`` as a status update through the shared tweepy client.
    Returns None when no client is configured and also when the post
    succeeds; returns an error string when tweepy raises TweepError.
    """
    # Without a configured client there is nothing to do.
    if tw_api is None:
        return None

    try:
        tw_api.update_status(text)
    except tweepy.error.TweepError as err:
        reason = err.reason
        if "404" not in reason:
            return "Error: {}".format(reason)
        return "Could not send tweet."
|
|
|
|
|
|
def twuser(text):
    """twitter info <user> -- Get info on the Twitter user <user>

    Looks up the user named ``text`` and returns a one-line summary: screen
    name, display name, optional location, tweet and follower counts, and
    the optional profile description.  Returns an error string when no API
    client is configured or the lookup fails.
    """
    if tw_api is None:
        # Answer with text, as twitter_get does: the caller forwards whatever
        # is returned to the channel, so None would not be a usable reply.
        return "This command requires a twitter API key."

    try:
        # try to get user by username
        user = tw_api.get_user(text)
    except tweepy.error.TweepError as e:
        if "404" in e.reason:
            return "Could not find user."
        return "Error: {}".format(e.reason)

    prefix = "\u2713" if user.verified else ""

    if user.location:
        loc_str = " is located in \x02{}\x02 and".format(user.location)
    else:
        loc_str = ""

    if user.description:
        desc_str = " The users description is \"{}\"".format(user.description)
    else:
        desc_str = ""

    return "{}@\x02{}\x02 ({}){} has \x02{:,}\x02 tweets and \x02{:,}\x02 followers.{}".format(
        prefix, user.screen_name, user.name, loc_str, user.statuses_count, user.followers_count,
        desc_str)
|
|
|
|
|
|
def tweet_image(inp):
    """Download an image and tweet it, optionally with a message.

    ``inp`` is "<image url> [message ...]": everything up to the first space
    is the URL, the remainder is the status text.

    Returns the "Tweet posted at ..." confirmation on success, "Unable to
    load image" when the download fails or does not answer with HTTP 200,
    or an error string when no API client is configured or the upload is
    rejected.
    """
    if tw_api is None:
        return "This command requires a twitter API key."

    url, _, message = inp.partition(' ')

    try:
        # The timeout keeps an unresponsive host from blocking the bot forever.
        response = requests.get(url, stream=True, timeout=10)
    except requests.RequestException:
        # Covers malformed or empty URLs as well as connection failures.
        return "Unable to load image"

    try:
        if response.status_code != 200:
            return "Unable to load image"

        # A unique temporary file avoids two concurrent calls overwriting a
        # shared "temp.jpg".  The ".jpg" suffix of the original name is kept
        # (presumably the upload call inspects the file name -- confirm).
        fd, filename = tempfile.mkstemp(suffix='.jpg')
        try:
            with os.fdopen(fd, 'wb') as image:
                for chunk in response.iter_content(chunk_size=8192):
                    image.write(chunk)

            tw_api.update_with_media(filename, status=message)
        finally:
            # Always clean up, including when the download or upload fails.
            os.remove(filename)
    except requests.RequestException:
        return "Unable to load image"
    except tweepy.error.TweepError as e:
        return "Error: {}".format(e.reason)
    finally:
        response.close()

    return "Tweet posted at https://twitter.com/" + twitter_acct
|
|
|
|
|
|
#@pinhook.plugin.register('tweet')
def tweet(inp):
    """Post ``inp`` as a status update; each "~~" becomes a line break.

    Direct registration is disabled above; this function is reached through
    the "post"/"send" sub-commands of twitter().

    Returns the "Tweet posted at ..." confirmation on success, or an error
    string when no API client is configured or Twitter rejects the post.
    """
    if tw_api is None:
        return "This command requires a twitter API key."

    status = inp.replace('~~', '\n')

    try:
        tw_api.update_status(status=status)
    except tweepy.error.TweepError as e:
        # Only blame the length when the text really is too long; any other
        # rejection is reported with the reason tweepy gives.
        if len(status) > 280:
            return "Tweet exceeds 280 characters [" + str(len(status)) + "]"
        return "Error: {}".format(e.reason)

    return "Tweet posted at https://twitter.com/" + twitter_acct
|
|
|
|
#@pinhook.plugin.register('tsearch')
def search_tweet(inp):
    """Return the text of the first tweet found by searching for ``inp``.

    Direct registration is disabled above; this function is reached through
    the "search" sub-command of twitter().

    Returns the tweet text, "No tweets found." when the search is empty, or
    an error string when no API client is configured or the search fails.
    """
    if tw_api is None:
        return "This command requires a twitter API key."

    try:
        results = tw_api.search(q=inp)
    except tweepy.error.TweepError as e:
        return "Error: {}".format(e.reason)

    # Take the first hit, if any, so an empty search yields a message
    # rather than None.
    first = next(iter(results), None)
    if first is None:
        return "No tweets found."

    return first.text
|
|
|
|
#@pinhook.plugin.register('ttrends')
def get_trends(inp):
    """Return the current trending topics as one comma-separated line.

    Direct registration is disabled above; this function is reached through
    the "trends" sub-command of twitter().  ``inp`` is accepted for a
    uniform command signature but is not used.

    Returns "US Twitter Trends: ..." on success, or an error string when no
    API client is configured, the request fails, or the reply is empty.
    """
    if tw_api is None:
        return "This command requires a twitter API key."

    # Place ID handed to trends_place(); the reply is labelled "US" below, so
    # this is presumably the identifier for the United States -- confirm.
    place_id = 23424977

    try:
        results = tw_api.trends_place(place_id)
    except tweepy.error.TweepError as e:
        return "Error: {}".format(e.reason)

    if not results:
        return "No trends found."

    names = [trend['name'] for trend in results[0]['trends']]
    return "US Twitter Trends: " + ', '.join(names)
|
|
|
|
|
|
def twitter(text):
    '''!twitter [show <user> [#]|info <user> |post <tweet>|image <image url> <tweet>|search <keyword>|trends]

    Dispatch a "!twitter" argument string to the matching sub-command.
    The first whitespace-separated word selects the handler; the remaining
    words, re-joined with single spaces, are passed to it.  Empty input or
    an unknown sub-command returns the usage line.
    '''
    usage = "!twitter [show <user> [#]|info <user> |post <tweet> |image <image url> <tweet> |search <keyword>|trends]"

    words = text.split()
    if not words:
        return usage

    command = words[0]
    remainder = " ".join(words[1:])

    if command in ('show', 'get'):
        return twitter_get(remainder)
    if command in ('post', 'send'):
        return tweet(remainder)
    if command in ('image', 'pic'):
        return tweet_image(remainder)
    if command == 'info':
        return twuser(remainder)
    if command == 'search':
        return search_tweet(remainder)
    if command == 'trends':
        return get_trends(remainder)

    return usage
|
|
|
|
@pinhook.plugin.register('!twitter')
def twitter_cmd(msg):
    """Handle the "!twitter" command registered with pinhook.

    Passes ``msg.arg`` to twitter() and wraps the reply it returns in
    pinhook.plugin.message().
    """
    print("twitter_cmd run!")
    reply = twitter(msg.arg)
    return pinhook.plugin.message(reply)
|
|
|