feedleware/feedleware/youtube/youtube.py

import gzip
import http.client
import json
import logging
import urllib.error
import urllib.parse
import urllib.request
from typing import Any, Iterable, Tuple
from cachetools import cached, TTLCache
from ..util import send_with_retry, parse_iso_date, parse_iso_duration
HTTPError = urllib.error.HTTPError
HTTPRequest = urllib.request.Request
HTTPResponse = http.client.HTTPResponse
HTTPException = http.client.HTTPException
logger = logging.getLogger(__name__)
class NoSuchChannel(Exception):
"""Raised when an unknown channel is queried."""
class APIClient:
"""Client for the YouTube Data API."""
def __init__(self, key: str = "", retries: int = 3):
"""
Create a YouTube Data API client.
See <https://developers.google.com/youtube/v3/docs> for details.
:param key: YouTube API key
:param retries: number of times to retry each request in case of failure
"""
self.key = key
self.retries = retries
def _query(
self,
url: str,
method: str = "GET",
data: Iterable[Tuple[str, str]] = ()
) -> Any:
"""
Low-level method to query the API.
:param url: URL to query
:param method: HTTP method to use
:param data: query parameters to send, as an iterable of (name, value) pairs
:returns: JSON data
:throws HTTPException: if the query fails
"""
logger.debug("Querying %s %s %s", method, url, data)
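# Ask for JSON and allow gzip-compressed responses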
headers = {
"Accept": "application/json",
"Accept-Encoding": "gzip",
}
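# The caller's parameters plus the API key become the URL query string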
payload = (
*data,
("key", self.key),
)
request = HTTPRequest(
url=f"{url}?{urllib.parse.urlencode(payload)}",
headers=headers,
method=method,
)
http_response = send_with_retry(request, self.retries)
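# Responses may be gzip-compressed (as requested above); decompress before decoding JSON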
if http_response.info().get("Content-Encoding") == "gzip":
return json.loads(gzip.decompress(http_response.read()))
else:
return json.loads(http_response.read())
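# Channel metadata is cached for up to 7 days (TTL is in seconds)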
@cached(cache=TTLCache(maxsize=1000, ttl=7 * 24 * 60 * 60))
def channel(self, channel_id: str) -> Any:
"""
Get information about a channel.
See <https://developers.google.com/youtube/v3/docs/channels>
for details.
:param channel_id: channel ID
:returns: channel information
:throws HTTPException: if the query fails
:throws NoSuchChannel: if the channel doesn't exist
"""
response = self._query(
url="https://youtube.googleapis.com/youtube/v3/channels",
method="GET",
data=(
("part", "id"),
("part", "snippet"),
("part", "contentDetails"),
("id", channel_id),
("maxResults", 1),
)
)
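# No matching items means the channel ID is unknown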
if response["pageInfo"]["totalResults"] == 0:
raise NoSuchChannel(f"Channel '{channel_id}' does not exist")
data = response["items"][0]
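# Flatten the response into a single dict: channel ID, uploads playlist ID, and the snippet fields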
return {
"id": data["id"],
"playlist": data["contentDetails"]["relatedPlaylists"]["uploads"],
**data["snippet"],
}
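# Playlist contents are cached for 30 minutes (TTL is in seconds)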
@cached(cache=TTLCache(maxsize=1000, ttl=30 * 60))
def playlist(self, playlist_id: str) -> Any:
"""
Get the latest videos from a playlist.
See <https://developers.google.com/youtube/v3/docs/playlistItems>
for details.
:param playlist_id: playlist ID
:returns: list of latest videos
:throws HTTPException: if the query fails
"""
# Query list of latest videos
try:
playlist_response = self._query(
url="https://youtube.googleapis.com/youtube/v3/playlistItems",
method="GET",
data=(
("part", "snippet"),
("part", "status"),
("part", "contentDetails"),
("playlistId", playlist_id),
("maxResults", 50),
)
)
except HTTPError as err:
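# The API answers 404 when the playlist does not exist; treat that as an empty feed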
if err.code == 404:
return []
raise
# Keep only public entries that are actual videos
videos = [
item["snippet"]
for item in playlist_response["items"]
if item["status"]["privacyStatus"] == "public"
and item["snippet"]["resourceId"]["kind"] == "youtube#video"
]
# Retrieve video durations (skip the query entirely if nothing is left)
if not videos:
return []
videos_response = self._query(
url="https://youtube.googleapis.com/youtube/v3/videos",
method="GET",
data=(
*[("id", video["resourceId"]["videoId"]) for video in videos],
("part", "contentDetails"),
),
)
# Merge and normalize data, matching durations to videos by ID because the
# videos endpoint may omit entries (e.g. deleted videos), which would
# misalign a positional zip
details = {
item["id"]: item["contentDetails"]
for item in videos_response["items"]
}
results = []
for video_item in videos:
video_id = video_item["resourceId"]["videoId"]
if video_id not in details:
continue
# Prefer the "standard" thumbnail, then "maxres", then any available size
thumbnail = ""
for size in ("standard", "maxres", *video_item["thumbnails"].keys()):
if size in video_item["thumbnails"]:
thumbnail = video_item["thumbnails"][size]["url"]
break
results.append({
"id": video_id,
"title": video_item.get("title", "Untitled Video"),
"description": video_item["description"],
"url": f"https://www.youtube.com/watch?v={video_id}",
"thumbnail": thumbnail,
"published": parse_iso_date(video_item["publishedAt"]),
"duration": parse_iso_duration(details[video_id]["duration"]),
})
return results
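# Example usage (illustrative sketch; the key and channel ID below are placeholders):
#
#     client = APIClient(key="YOUR_API_KEY")
#     info = client.channel("UCxxxxxxxxxxxxxxxxxxxxxx")
#     for video in client.playlist(info["playlist"]):
#         print(video["published"], video["title"], video["url"])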