"""Minimal client for the subset of the YouTube Data API v3 used here."""

import gzip
import http
import http.client
import json
import logging
import urllib
import urllib.error
import urllib.parse
import urllib.request
from typing import Any, Dict, Iterable, Tuple

from cachetools import cached, TTLCache

from ..util import send_with_retry, parse_iso_date, parse_iso_duration

# Short aliases for the HTTP types used below. The submodules they live in
# are imported explicitly above rather than relying on `urllib.request`
# pulling them in as a side effect.
HTTPError = urllib.error.HTTPError
HTTPRequest = urllib.request.Request
HTTPResponse = http.client.HTTPResponse
HTTPException = http.client.HTTPException

logger = logging.getLogger(__name__)

# Thumbnail sizes to try first, in order of preference, before falling back
# to whatever size the API returned.
_PREFERRED_THUMBNAILS = ("standard", "maxres")


def _pick_thumbnail(thumbnails: Dict[str, Any]) -> str:
    """
    Choose a thumbnail URL from a snippet's ``thumbnails`` mapping.

    :param thumbnails: mapping from size name to an object with a ``url`` key
    :returns: URL of the first preferred size that is present, otherwise the
        URL of the first size available, or an empty string if there is none
    """
    for size in (*_PREFERRED_THUMBNAILS, *thumbnails):
        if size in thumbnails:
            # Stop at the first match so that the preference order is
            # honoured instead of being overwritten by later sizes.
            return thumbnails[size]["url"]

    return ""


class NoSuchChannel(Exception):
    """Raised when an unknown channel is queried."""


class APIClient:
    """Client for the YouTube Data API."""

    def __init__(self, key: str = "", retries: int = 3):
        """
        Create a YouTube Data API client.

        See <https://developers.google.com/youtube/v3/docs> for details.

        :param key: YouTube API key
        :param retries: number of times to retry each request in case of
            failure
        """
        self.key = key
        self.retries = retries

    def _query(
        self,
        url: str,
        method: str = "GET",
        data: Iterable[Tuple[str, Any]] = (),
    ) -> Any:
        """
        Low-level method to query the API.

        The payload and the API key are always sent as URL query parameters,
        whatever the HTTP method.

        :param url: URL to query
        :param method: HTTP method to use
        :param data: payload to send, as an iterable of ``(name, value)``
            pairs; a name may be repeated to pass several values
        :returns: JSON data
        :throws HTTPException: if the query fails
        """
        logger.debug("Querying %s %s %s", method, url, data)

        headers = {
            "Accept": "application/json",
            "Accept-Encoding": "gzip",
        }

        payload = (
            *data,
            ("key", self.key),
        )

        request = HTTPRequest(
            url=f"{url}?{urllib.parse.urlencode(payload)}",
            headers=headers,
            method=method,
        )

        http_response = send_with_retry(request, self.retries)
        body = http_response.read()

        # We advertise gzip support, so the body may come back compressed
        if http_response.info().get("Content-Encoding") == "gzip":
            body = gzip.decompress(body)

        return json.loads(body)

    # NOTE: the cache object is created once, when the class body is
    # evaluated, so it is shared by every APIClient instance.
    @cached(cache=TTLCache(maxsize=1000, ttl=7 * 24 * 60 * 60))
    def channel(self, channel_id: str) -> Any:
        """
        Get information about a channel.

        See <https://developers.google.com/youtube/v3/docs/channels/list>
        for details. Results are cached for one week.

        :param channel_id: channel ID
        :returns: dictionary holding the channel's ``id``, the ID of its
            uploads ``playlist``, and every field of its API snippet
        :throws HTTPException: if the query fails
        :throws NoSuchChannel: if the channel doesn’t exist
        """
        response = self._query(
            url="https://youtube.googleapis.com/youtube/v3/channels",
            method="GET",
            data=(
                ("part", "id"),
                ("part", "snippet"),
                ("part", "contentDetails"),
                ("id", channel_id),
                ("maxResults", 1),
            ),
        )

        if response["pageInfo"]["totalResults"] == 0:
            raise NoSuchChannel(f"Channel '{channel_id}' does not exist")

        data = response["items"][0]

        return {
            "id": data["id"],
            "playlist": data["contentDetails"]["relatedPlaylists"]["uploads"],
            **data["snippet"],
        }

    # NOTE: the cache object is created once, when the class body is
    # evaluated, so it is shared by every APIClient instance.
    @cached(cache=TTLCache(maxsize=1000, ttl=30 * 60))
    def playlist(self, playlist_id: str) -> Any:
        """
        Get the latest videos from a playlist.

        See <https://developers.google.com/youtube/v3/docs/playlistItems/list>
        and <https://developers.google.com/youtube/v3/docs/videos/list>
        for details. Results are cached for 30 minutes.

        :param playlist_id: playlist ID
        :returns: list of latest public videos, each a dictionary with the
            keys ``id``, ``title``, ``description``, ``url``, ``thumbnail``,
            ``published`` and ``duration``; empty if the playlist is unknown
        :throws HTTPException: if the query fails
        """
        # Query list of latest videos
        try:
            playlist_response = self._query(
                url="https://youtube.googleapis.com/youtube/v3/playlistItems",
                method="GET",
                data=(
                    ("part", "snippet"),
                    ("part", "status"),
                    ("part", "contentDetails"),
                    ("playlistId", playlist_id),
                    ("maxResults", 50),
                ),
            )
        except HTTPError as err:
            # An unknown playlist is reported as an empty one
            if err.code == 404:
                return []

            raise

        # Filter only public videos
        videos = [
            item["snippet"]
            for item in playlist_response["items"]
            if item["status"]["privacyStatus"] == "public"
            and item["snippet"]["resourceId"]["kind"] == "youtube#video"
        ]

        # Without any video ID the details request would carry no filter
        # at all, so there is nothing worth asking for.
        if not videos:
            return []

        # Retrieve video durations
        videos_response = self._query(
            url="https://youtube.googleapis.com/youtube/v3/videos",
            method="GET",
            data=(
                *[("id", video["resourceId"]["videoId"]) for video in videos],
                ("part", "contentDetails"),
            ),
        )

        # Index details by video ID rather than pairing by position: the
        # details response may omit or reorder entries relative to the
        # playlist, which would attach durations to the wrong videos.
        details_by_id = {
            item["id"]: item
            for item in videos_response.get("items", [])
        }

        # Merge and normalize data
        results = []

        for video_item in videos:
            video_id = video_item["resourceId"]["videoId"]
            detail_item = details_by_id.get(video_id)

            if detail_item is None:
                # No details came back for this video, so its duration is
                # unknown; leave it out instead of guessing.
                logger.debug("No details returned for video %s", video_id)
                continue

            results.append({
                "id": video_id,
                "title": video_item.get("title", "Untitled Video"),
                "description": video_item["description"],
                "url": f"https://www.youtube.com/watch?v={video_id}",
                "thumbnail": _pick_thumbnail(
                    video_item.get("thumbnails") or {}
                ),
                "published": parse_iso_date(video_item["publishedAt"]),
                "duration": parse_iso_duration(
                    detail_item["contentDetails"]["duration"]
                ),
            })

        return results