185 lines
5.6 KiB
Python
185 lines
5.6 KiB
Python
import gzip
|
||
import http
|
||
import json
|
||
import logging
|
||
import urllib
|
||
import urllib.request
|
||
from typing import Any, Iterable, Tuple
|
||
from cachetools import cached, TTLCache
|
||
from ..util import send_with_retry, parse_iso_date, parse_iso_duration
|
||
|
||
|
||
HTTPError = urllib.error.HTTPError
|
||
HTTPRequest = urllib.request.Request
|
||
HTTPResponse = http.client.HTTPResponse
|
||
HTTPException = http.client.HTTPException
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class NoSuchChannel(Exception):
|
||
"""Raised when an unknown channel is queried."""
|
||
|
||
|
||
class APIClient:
|
||
"""Client for the YouTube Data API."""
|
||
|
||
def __init__(self, key: str = "", retries: int = 3):
|
||
"""
|
||
Create a YouTube Data API client.
|
||
|
||
See <https://developers.google.com/youtube/v3/docs> for details.
|
||
|
||
:param key: YouTube API key
|
||
:param retries: number of times to retry each request in case of failure
|
||
"""
|
||
self.key = key
|
||
self.retries = retries
|
||
|
||
def _query(
|
||
self,
|
||
url: str,
|
||
method: str = "GET",
|
||
data: Iterable[Tuple[str, str]] = []
|
||
) -> Any:
|
||
"""
|
||
Low-level method to query the API.
|
||
|
||
:param url: URL to query
|
||
:param method: HTTP method to use
|
||
:param data: payload dictionary to send
|
||
:returns: JSON data
|
||
:throws HTTPException: if the query fails
|
||
"""
|
||
logger.debug("Querying %s %s %s", method, url, data)
|
||
|
||
headers = {
|
||
"Accept": "application/json",
|
||
"Accept-Encoding": "gzip",
|
||
}
|
||
|
||
payload = (
|
||
*data,
|
||
("key", self.key),
|
||
)
|
||
|
||
request = HTTPRequest(
|
||
url=f"{url}?{urllib.parse.urlencode(payload)}",
|
||
headers=headers,
|
||
method=method,
|
||
)
|
||
|
||
http_response = send_with_retry(request, self.retries)
|
||
|
||
if http_response.info().get("Content-Encoding") == "gzip":
|
||
return json.loads(gzip.decompress(http_response.read()))
|
||
else:
|
||
return json.loads(http_response.read())
|
||
|
||
@cached(cache=TTLCache(maxsize=1000, ttl=7 * 24 * 60 * 60))
|
||
def channel(self, channel_id: str) -> Any:
|
||
"""
|
||
Get information about a channel.
|
||
|
||
See <https://developers.google.com/youtube/v3/docs/channels>
|
||
for details.
|
||
|
||
:param channel_id: channel ID
|
||
:returns: channel information
|
||
:throws HTTPException: if the query fails
|
||
:throws NoSuchChannel: if the channel doesn’t exist
|
||
"""
|
||
response = self._query(
|
||
url="https://youtube.googleapis.com/youtube/v3/channels",
|
||
method="GET",
|
||
data=(
|
||
("part", "id"),
|
||
("part", "snippet"),
|
||
("part", "contentDetails"),
|
||
("id", channel_id),
|
||
("maxResults", 1),
|
||
)
|
||
)
|
||
|
||
if response["pageInfo"]["totalResults"] == 0:
|
||
raise NoSuchChannel(f"Channel '{channel_id}' does not exist")
|
||
|
||
data = response["items"][0]
|
||
|
||
return {
|
||
"id": data["id"],
|
||
"playlist": data["contentDetails"]["relatedPlaylists"]["uploads"],
|
||
**response["items"][0]["snippet"],
|
||
}
|
||
|
||
@cached(cache=TTLCache(maxsize=1000, ttl=30 * 60))
|
||
def playlist(self, playlist_id: str) -> Any:
|
||
"""
|
||
Get the latest videos from a playlist.
|
||
|
||
See <https://developers.google.com/youtube/v3/docs/playlistItems>
|
||
for details.
|
||
|
||
:param playlist_id: channel ID
|
||
:returns: list of latest videos
|
||
:throws HTTPException: if the query fails
|
||
"""
|
||
# Query list of latest videos
|
||
try:
|
||
playlist_response = self._query(
|
||
url="https://youtube.googleapis.com/youtube/v3/playlistItems",
|
||
method="GET",
|
||
data=(
|
||
("part", "snippet"),
|
||
("part", "status"),
|
||
("part", "contentDetails"),
|
||
("playlistId", playlist_id),
|
||
("maxResults", 50),
|
||
)
|
||
)
|
||
except HTTPError as err:
|
||
if err.code == 404:
|
||
return []
|
||
|
||
raise err
|
||
|
||
# Filter only public videos
|
||
videos = [
|
||
item["snippet"]
|
||
for item in playlist_response["items"]
|
||
if item["status"]["privacyStatus"] == "public"
|
||
and item["snippet"]["resourceId"]["kind"] == "youtube#video"
|
||
]
|
||
|
||
# Retrieve video durations
|
||
videos_response = self._query(
|
||
url="https://youtube.googleapis.com/youtube/v3/videos",
|
||
method="GET",
|
||
data=(
|
||
*[("id", video["resourceId"]["videoId"]) for video in videos],
|
||
("part", "contentDetails"),
|
||
),
|
||
)
|
||
|
||
# Merge and normalize data
|
||
results = []
|
||
|
||
for video_item, detail_item in zip(videos, videos_response["items"]):
|
||
video_id = video_item["resourceId"]["videoId"]
|
||
thumbnail = ""
|
||
|
||
for size in ("standard", "maxres", *video_item["thumbnails"].keys()):
|
||
if size in video_item["thumbnails"]:
|
||
thumbnail = video_item["thumbnails"][size]["url"]
|
||
|
||
results.append({
|
||
"id": video_id,
|
||
"title": video_item.get("title", "Untitled Video"),
|
||
"description": video_item["description"],
|
||
"url": f"https://www.youtube.com/watch?v={video_id}",
|
||
"thumbnail": thumbnail,
|
||
"published": parse_iso_date(video_item["publishedAt"]),
|
||
"duration": parse_iso_duration(detail_item["contentDetails"]["duration"]),
|
||
})
|
||
|
||
return results
|