import gzip
import http
import http.client
import json
import logging
import re
import time
import urllib
import urllib.parse
import urllib.request
from typing import Any, Dict, Optional

from cachetools import cached, TTLCache

HTTPRequest = urllib.request.Request
HTTPResponse = http.client.HTTPResponse
HTTPException = http.client.HTTPException

logger = logging.getLogger(__name__)


def now():
    """Get current epoch in seconds."""
    return int(time.time())


class NoSuchUser(Exception):
    """Raised when an unknown user is queried."""


class APIClient:
    """Client for the Twitch API app endpoints."""

    # Expression that matches allowed logins
    CHANNEL_FILTER = re.compile("^[a-zA-Z0-9_]{2,25}$")

    def __init__(self, client_id: str = "", secret: str = "", retries: int = 3):
        """
        Create a Twitch API client.

        See the Twitch API documentation for details on registering an app.

        :param client_id: client ID of the app
        :param secret: app secret
        :param retries: number of times to try each request before giving up
        """
        self.client_id: str = client_id
        self.secret: str = secret
        # Honour the caller's value (it used to be silently replaced by 3).
        self.retries: int = retries
        # Empty token with an expiry of 0 forces a refresh on first use.
        self.oauth_token: str = ""
        self.oauth_expire_epoch: int = 0

    def _send_with_retry(self, request: HTTPRequest) -> HTTPResponse:
        """
        Send an HTTP request and retry in case of failure.

        The number of attempts is configured by `self.retries`; at least one
        attempt is always made, even if `self.retries` is zero or negative.

        The caller owns the returned response and is responsible for
        closing it.

        :param request: request to try sending
        :returns: response sent by the server
        :throws HTTPException: if the number of retries is exceeded
        """
        # A bounded loop: a zero or negative setting must neither skip the
        # request entirely nor loop forever.
        attempts = max(self.retries, 1)
        last_err = HTTPException()
        for _ in range(attempts):
            try:
                return urllib.request.urlopen(request, timeout=3)
            except HTTPException as err:
                # NOTE(review): only http.client.HTTPException is retried.
                # urlopen reports connection failures and HTTP error statuses
                # as urllib.error.URLError / HTTPError, which propagate
                # immediately -- confirm this is intended.
                logger.warning("HTTP error: %s", err)
                last_err = err
        raise last_err

    def authorize(self) -> bool:
        """
        Refresh the current OAuth app access token if needed.

        This uses the Twitch OAuth client credentials flow (see the Twitch
        authentication documentation).

        :returns: true if the OAuth app access token was refreshed, false if
                  the existing one was still valid
        :throws HTTPException: if the request fails
        """
        if now() < self.oauth_expire_epoch:
            return False

        logger.debug("Refreshing the OAuth app access token")
        payload = {
            "client_id": self.client_id,
            "client_secret": self.secret,
            "grant_type": "client_credentials",
        }
        request = HTTPRequest(
            url="https://id.twitch.tv/oauth2/token",
            data=urllib.parse.urlencode(payload).encode("utf-8"),
            method="POST",
        )

        # Close the response as soon as the body has been read.
        with self._send_with_retry(request) as http_response:
            body = http_response.read()

        response = json.loads(body.decode("utf-8"))
        self.oauth_token = response["access_token"]
        self.oauth_expire_epoch = now() + int(response["expires_in"])
        return True

    def _query(
        self, url: str, method: str = "GET", data: Optional[Dict[str, str]] = None
    ) -> Any:
        """
        Low-level method to make an authenticated query to the API.

        The payload is sent URL-encoded in the query string, and the OAuth
        token is refreshed first if it has expired.

        :param url: URL to query
        :param method: HTTP method to use
        :param data: payload dictionary to send
        :returns: JSON data
        :throws HTTPException: if the query fails
        """
        self.authorize()
        logger.debug("Querying %s %s %s", method, url, data)

        headers = {
            "Authorization": f"Bearer {self.oauth_token}",
            "Client-Id": self.client_id,
            "Accept-Encoding": "gzip",
        }
        payload = data if data is not None else {}
        request = HTTPRequest(
            url=f"{url}?{urllib.parse.urlencode(payload)}",
            headers=headers,
            method=method,
        )

        # Read everything inside the `with` so the connection is released
        # even if decoding fails afterwards.
        with self._send_with_retry(request) as http_response:
            is_gzipped = http_response.info().get("Content-Encoding") == "gzip"
            body = http_response.read()

        if is_gzipped:
            body = gzip.decompress(body)
        return json.loads(body)

    # The cache object is created once, when the class body is evaluated, so
    # it is shared by every APIClient instance.
    @cached(cache=TTLCache(maxsize=1000, ttl=24 * 60 * 60))
    def user(self, login: str) -> Any:
        """
        Get information about a user.

        Results are cached for 24 hours.

        :param login: user login
        :returns: user information
        :throws HTTPException: if the query fails
        :throws NoSuchUser: if the login is invalid or the user doesn't exist
        :throws RuntimeError: if the server response is malformed
        """
        # fullmatch (rather than match) also rejects a trailing newline,
        # which `$` alone would let through.
        if not self.CHANNEL_FILTER.fullmatch(login):
            raise NoSuchUser(f"Login '{login}' is invalid")

        response = self._query(
            url="https://api.twitch.tv/helix/users",
            method="GET",
            data={"login": login},
        )

        # Explicit checks instead of `assert`, which is stripped under -O.
        if not isinstance(response, dict):
            raise RuntimeError("Unexpected response from Twitch API")

        users = response.get("data")
        if not users:
            raise NoSuchUser(f"User '{login}' does not exist")
        if not isinstance(users, list) or not isinstance(users[0], dict):
            raise RuntimeError("Unexpected response from Twitch API")

        return users[0]

    @cached(cache=TTLCache(maxsize=1000, ttl=10 * 60))
    def videos(self, channel_id: str) -> Any:
        """
        Get the list of videos from a channel.

        Results are cached for 10 minutes.

        :param channel_id: channel ID
        :returns: list of videos
        :throws HTTPException: if the query fails
        :throws RuntimeError: if the server response is malformed
        """
        response = self._query(
            url="https://api.twitch.tv/helix/videos",
            method="GET",
            data={"user_id": channel_id},
        )

        if "data" not in response:
            raise RuntimeError("Unexpected response from Twitch API")

        return response["data"]

    @cached(cache=TTLCache(maxsize=1000, ttl=10 * 60))
    def stream(self, channel_id: str) -> Optional[Any]:
        """
        Get the information about the stream currently active on a channel.

        Results (including None) are cached for 10 minutes.

        :param channel_id: channel ID
        :returns: stream information, or None if the response carries no
                  data or the first entry is not of type "live"
        :throws HTTPException: if the query fails
        """
        response = self._query(
            url="https://api.twitch.tv/helix/streams",
            method="GET",
            data={"user_id": channel_id},
        )

        if "data" not in response or not response["data"]:
            return None

        current = response["data"][0]
        if current["type"] != "live":
            return None

        return current