import gzip
import http
import http.client
import json
import logging
import re
import time
import urllib
import urllib.parse
import urllib.request
from typing import Any, Dict, Optional

from cachetools import cached, TTLCache

from ..util import send_with_retry

# http.client and urllib.parse are imported explicitly above: the module uses
# both, and relying on urllib.request to import them as a side effect is
# fragile.
HTTPRequest = urllib.request.Request
HTTPResponse = http.client.HTTPResponse
HTTPException = http.client.HTTPException

logger = logging.getLogger(__name__)


def now() -> int:
    """Get current epoch in seconds."""
    return int(time.time())


class NoSuchUser(Exception):
    """Raised when an unknown user is queried."""


class APIClient:
    """Client for the Twitch API app endpoints."""

    # Expression that matches allowed logins
    CHANNEL_FILTER = re.compile("^[a-zA-Z0-9_]{2,25}$")

    def __init__(self, client_id: str = "", secret: str = "", retries: int = 3):
        """
        Create a Twitch API client.

        See the Twitch developer documentation on registering an app for
        details about the client ID and secret.

        :param client_id: client ID of the app
        :param secret: app secret
        :param retries: number of times to retry each request in case of failure
        """
        self.client_id: str = client_id
        self.secret: str = secret
        self.retries: int = retries
        # No token yet: an expiry epoch of 0 forces authorize() to fetch one
        # on first use.
        self.oauth_token: str = ""
        self.oauth_expire_epoch: int = 0

    def authorize(self) -> bool:
        """
        Refresh the current OAuth app access token if needed.

        This uses the Twitch OAuth client credentials flow. See the Twitch
        authentication documentation (client credentials grant flow) for
        details.

        :returns: true if the OAuth app access token was refreshed, false if
            the existing one was still valid
        :throws HTTPException: if the request fails
        """
        if now() < self.oauth_expire_epoch:
            return False

        logger.debug("Refreshing the OAuth app access token")
        payload = {
            "client_id": self.client_id,
            "client_secret": self.secret,
            "grant_type": "client_credentials",
        }
        request = HTTPRequest(
            url="https://id.twitch.tv/oauth2/token",
            data=urllib.parse.urlencode(payload).encode("utf-8"),
            method="POST",
        )
        http_response = send_with_retry(request, self.retries)
        response = json.loads(http_response.read().decode("utf-8"))

        self.oauth_token = response["access_token"]
        # "expires_in" is added to the current epoch to get the absolute
        # expiry time checked at the top of this method.
        self.oauth_expire_epoch = now() + int(response["expires_in"])
        return True

    def _query(
        self, url: str, method: str = "GET", data: Optional[Dict[str, str]] = None
    ) -> Any:
        """
        Low-level method to make an authenticated query to the API.

        The payload is always sent URL-encoded in the query string, whatever
        the HTTP method.

        :param url: URL to query
        :param method: HTTP method to use
        :param data: payload dictionary to send
        :returns: JSON data
        :throws HTTPException: if the query fails
        """
        self.authorize()

        logger.debug("Querying %s %s %s", method, url, data)
        headers = {
            "Authorization": f"Bearer {self.oauth_token}",
            "Client-Id": self.client_id,
            "Accept-Encoding": "gzip",
        }
        payload = data if data is not None else {}
        query_string = urllib.parse.urlencode(payload)
        # Only append the separator when there is something to send, so an
        # empty payload does not produce a URL with a dangling "?".
        full_url = f"{url}?{query_string}" if query_string else url
        request = HTTPRequest(url=full_url, headers=headers, method=method)
        http_response = send_with_retry(request, self.retries)

        # gzip was offered through Accept-Encoding, but the body is only
        # decompressed when the response says it is actually compressed.
        body = http_response.read()
        if http_response.info().get("Content-Encoding") == "gzip":
            body = gzip.decompress(body)
        return json.loads(body)

    # NOTE: the caches below are created once at class definition time and are
    # therefore shared by all instances; cachetools' default key function
    # hashes every positional argument, so `self` is part of each cache key.

    @cached(cache=TTLCache(maxsize=1000, ttl=24 * 60 * 60))
    def user(self, login: str) -> Any:
        """
        Get information about a user.

        See the Twitch API reference ("Get Users") for details.

        :param login: user login
        :returns: user information
        :throws HTTPException: if the query fails
        :throws NoSuchUser: if the login is invalid or the user does not exist
        """
        # fullmatch() rather than match(): "$" also matches just before a
        # trailing newline, so match() would accept a login such as "abc\n".
        if not self.CHANNEL_FILTER.fullmatch(login):
            raise NoSuchUser(f"Login '{login}' is invalid")

        response = self._query(
            url="https://api.twitch.tv/helix/users",
            method="GET",
            data={"login": login},
        )
        if "data" not in response or not response["data"]:
            raise NoSuchUser(f"User '{login}' does not exist")
        return response["data"][0]

    @cached(cache=TTLCache(maxsize=1000, ttl=10 * 60))
    def videos(self, channel_id: str) -> Any:
        """
        Get the list of videos from a channel.

        See the Twitch API reference ("Get Videos") for details.

        :param channel_id: channel ID
        :returns: list of videos
        :throws HTTPException: if the query fails
        :throws RuntimeError: if the server response is malformed
        """
        response = self._query(
            url="https://api.twitch.tv/helix/videos",
            method="GET",
            data={"user_id": channel_id},
        )
        if "data" not in response:
            raise RuntimeError("Unexpected response from Twitch API")
        return response["data"]

    @cached(cache=TTLCache(maxsize=1000, ttl=10 * 60))
    def stream(self, channel_id: str) -> Optional[Any]:
        """
        Get the information about the stream currently active on a channel.

        See the Twitch API reference ("Get Streams") for details.

        :param channel_id: channel ID
        :returns: stream information, or None when the response has no
            "data" entry, the entry is empty, or the first stream is not of
            type "live"
        :throws HTTPException: if the query fails
        """
        response = self._query(
            url="https://api.twitch.tv/helix/streams",
            method="GET",
            data={"user_id": channel_id},
        )
        if "data" not in response or not response["data"]:
            return None

        current = response["data"][0]
        if current["type"] != "live":
            return None
        return current