implemented new helix twitch API support

This commit is contained in:
Mattia Di Eleuterio 2021-08-16 14:11:54 +02:00
parent 9470cc1fd5
commit 70b27b87eb
1 changed files with 81 additions and 39 deletions

View File

@ -14,33 +14,68 @@
# limitations under the License. # limitations under the License.
# #
from cachetools import cached, TTLCache, LRUCache
from feedformatter import Feed
from flask import abort, Flask, request from flask import abort, Flask, request
import urllib from io import BytesIO
import json from os import environ
import datetime import datetime
import gzip
import time
import json
import logging import logging
import re import re
from os import environ import urllib
from feedformatter import Feed
from cachetools import cached, TTLCache, LRUCache
from io import BytesIO
import gzip
VOD_URL_TEMPLATE = 'https://api.twitch.tv/kraken/channels/%s/videos?broadcast_type=archive,highlight,upload&limit=10' VOD_URL_TEMPLATE = 'https://api.twitch.tv/helix/videos?user_id=%s&type=all'
USERID_URL_TEMPLATE = 'https://api.twitch.tv/kraken/users?login=%s' USERID_URL_TEMPLATE = 'https://api.twitch.tv/helix/users?login=%s'
VODCACHE_LIFETIME = 10 * 60 VODCACHE_LIFETIME = 10 * 60
USERIDCACHE_LIFETIME = 24 * 60 * 60 USERIDCACHE_LIFETIME = 24 * 60 * 60
CHANNEL_FILTER = re.compile("^[a-zA-Z0-9_]{2,25}$") CHANNEL_FILTER = re.compile("^[a-zA-Z0-9_]{2,25}$")
TWITCH_CLIENT_ID = environ.get("TWITCH_CLIENT_ID") TWITCH_CLIENT_ID = environ.get("TWITCH_CLIENT_ID")
TWITCH_SECRET = environ.get("TWITCH_SECRET")
TWITCH_OAUTH_TOKEN = ""
TWITCH_OAUTH_EXPIRE_EPOCH = 0
logging.basicConfig(level=logging.DEBUG if environ.get('DEBUG') else logging.INFO) logging.basicConfig(level=logging.DEBUG if environ.get('DEBUG') else logging.INFO)
if not TWITCH_CLIENT_ID: if not TWITCH_CLIENT_ID:
raise Exception("Twitch API client id is not set.") raise Exception("Twitch API client id is not set.")
if not TWITCH_SECRET:
raise Exception("Twitch API secret env variable not set.")
app = Flask(__name__) app = Flask(__name__)
def authorize():
global TWITCH_OAUTH_TOKEN
global TWITCH_OAUTH_EXPIRE_EPOCH
# return if token has not expired
if (TWITCH_OAUTH_EXPIRE_EPOCH >= round(time.time())):
return
logging.debug("requesting a new oauth token")
data = {
'client_id': TWITCH_CLIENT_ID,
'client_secret': TWITCH_SECRET,
'grant_type': 'client_credentials',
}
url = 'https://id.twitch.tv/oauth2/token'
request = urllib.request.Request(url, data=urllib.parse.urlencode(data).encode("utf-8"), method='POST')
retries = 0
while retries < 3:
try:
result = urllib.request.urlopen(request, timeout=3)
r = json.loads(result.read().decode("utf-8"))
TWITCH_OAUTH_TOKEN = r['access_token']
TWITCH_OAUTH_EXPIRE_EPOCH = int(r['expires_in']) + round(time.time())
logging.debug("oauth token aquired")
return
except Exception as e:
logging.warning("Fetch exception caught: %s" % e)
retries += 1
abort(503)
@app.route('/vod/<string:channel>', methods=['GET', 'HEAD']) @app.route('/vod/<string:channel>', methods=['GET', 'HEAD'])
def vod(channel): def vod(channel):
@ -63,13 +98,13 @@ def get_inner(channel, add_live=True):
if not userid_json: if not userid_json:
abort(404) abort(404)
(channel_display_name, channel_id) = extract_userid(json.loads(userid_json)) (channel_display_name, channel_id) = extract_userid(json.loads(userid_json)['data'][0])
channel_json = fetch_vods(channel_id) channel_json = fetch_vods(channel_id)
if not channel_json: if not channel_json:
abort(404) abort(404)
decoded_json = json.loads(channel_json) decoded_json = json.loads(channel_json)['data']
rss_data = construct_rss(channel, decoded_json, channel_display_name, add_live) rss_data = construct_rss(channel, decoded_json, channel_display_name, add_live)
headers = {'Content-Type': 'application/rss+xml'} headers = {'Content-Type': 'application/rss+xml'}
@ -91,10 +126,13 @@ def fetch_vods(channel_id):
def fetch_json(id, url_template): def fetch_json(id, url_template):
#update the oauth token
authorize()
url = url_template % id url = url_template % id
headers = { headers = {
'Accept': 'application/vnd.twitchtv.v5+json', 'Authorization': 'Bearer '+TWITCH_OAUTH_TOKEN,
'Client-ID': TWITCH_CLIENT_ID, 'Client-Id': TWITCH_CLIENT_ID,
'Accept-Encoding': 'gzip' 'Accept-Encoding': 'gzip'
} }
request = urllib.request.Request(url, headers=headers) request = urllib.request.Request(url, headers=headers)
@ -114,13 +152,9 @@ def fetch_json(id, url_template):
def extract_userid(user_info): def extract_userid(user_info):
userlist = user_info.get('users')
if not userlist:
logging.info('No such user found.')
abort(404)
# Get the first id in the list # Get the first id in the list
userid = userlist[0].get('_id') userid = user_info['id']
username = userlist[0].get('display_name') username = user_info['display_name']
if username and userid: if username and userid:
return username, userid return username, userid
else: else:
@ -140,30 +174,38 @@ def construct_rss(channel_name, vods_info, display_name, add_live=True):
# Create an item # Create an item
try: try:
if vods_info['videos']: if vods_info:
for vod in vods_info['videos']: for vod in vods_info:
item = {} item = {}
if vod["status"] == "recording":
if not add_live: # @madiele: in twitch new API the current stream now it's not bundled in the same request
continue # maybe to be re-implemented later on
link = "http://www.twitch.tv/%s" % channel_name
item["title"] = "%s - LIVE" % vod['title'] #if vod["status"] == "recording":
item["category"] = "live" # if not add_live:
else: # continue
link = vod['url'] # link = "http://www.twitch.tv/%s" % channel_name
item["title"] = vod['title'] # item["title"] = "%s - LIVE" % vod['title']
item["category"] = vod['broadcast_type'] # item["category"] = "live"
#else:
link = vod['url']
item["title"] = vod['title']
item["category"] = vod['type']
item["link"] = link item["link"] = link
item["description"] = "<a href=\"%s\"><img src=\"%s\" /></a>" % (link, vod['preview']['large']) item["description"] = "<a href=\"%s\"><img src=\"%s\" /></a>" % (link, vod['thumbnail_url'].replace("%{width}", "512").replace("%{height}","288"))
if vod.get('game'):
item["description"] += "<br/>" + vod['game'] #@madiele: for some reason the new API does not have the game field anymore...
if vod.get('description_html'): #if vod.get('game'):
item["description"] += "<br/>" + vod['description_html'] # item["description"] += "<br/>" + vod['game']
if vod.get('description'):
item["description"] += "<br/>" + vod['description']
d = datetime.datetime.strptime(vod['created_at'], '%Y-%m-%dT%H:%M:%SZ') d = datetime.datetime.strptime(vod['created_at'], '%Y-%m-%dT%H:%M:%SZ')
item["pubDate"] = d.timetuple() item["pubDate"] = d.timetuple()
item["guid"] = vod['_id'] item["guid"] = vod['id']
if vod["status"] == "recording": # To show a different news item when recording is over #if vod["status"] == "recording": # To show a different news item when recording is over
item["guid"] += "_live" # item["guid"] += "_live"
feed.items.append(item) feed.items.append(item)
except KeyError as e: except KeyError as e:
logging.warning('Issue with json: %s\nException: %s' % (vods_info, e)) logging.warning('Issue with json: %s\nException: %s' % (vods_info, e))