feedleware/feedleware/youtube/youtube.py

185 lines
5.6 KiB
Python
Raw Normal View History

2021-09-12 22:00:24 +00:00
import gzip
import http
import json
import logging
import urllib
import urllib.request
from typing import Any, Iterable, Tuple
2021-09-12 22:00:24 +00:00
from cachetools import cached, TTLCache
from ..util import send_with_retry, parse_iso_date, parse_iso_duration
2021-09-12 22:00:24 +00:00
# Alias of urllib's HTTP error type; caught in this module to inspect the
# status code of a failed API request.
HTTPError = urllib.error.HTTPError
2021-09-12 22:00:24 +00:00
# Logger named after this module's import path
logger = logging.getLogger(__name__)

# Short aliases for the standard-library HTTP types used in this module
HTTPException = http.client.HTTPException
HTTPResponse = http.client.HTTPResponse
HTTPRequest = urllib.request.Request
class NoSuchChannel(Exception):
    """Error signalling that the requested channel could not be found."""
class APIClient:
    """
    Client for the YouTube Data API.

    See <https://developers.google.com/youtube/v3/docs> for details.

    Results of :meth:`channel` and :meth:`playlist` are memoized in TTL
    caches that are created once, at class definition time.
    """

    def __init__(self, key: str = "", retries: int = 3):
        """
        Create a YouTube Data API client.

        :param key: YouTube API key
        :param retries: number of times to retry each request in case of failure
        """
        self.key = key
        self.retries = retries

    def _query(
        self,
        url: str,
        method: str = "GET",
        data: Iterable[Tuple[str, Any]] = (),
    ) -> Any:
        """
        Low-level method to query the API.

        The parameters in `data`, followed by the API key, are URL-encoded
        into the query string of the request.

        :param url: URL to query
        :param method: HTTP method to use
        :param data: sequence of (name, value) query parameters; a name may
            appear several times
        :returns: decoded JSON data
        :throws HTTPError: if the request fails with an HTTP error status
            (NOTE(review): the exact exception types come from
            `send_with_retry` - confirm against that helper)
        """
        # The key is appended after this call, so it is not part of the
        # logged parameters
        logger.debug("Querying %s %s %s", method, url, data)

        headers = {
            "Accept": "application/json",
            "Accept-Encoding": "gzip",
        }
        payload = (*data, ("key", self.key))

        request = HTTPRequest(
            url=f"{url}?{urllib.parse.urlencode(payload)}",
            headers=headers,
            method=method,
        )

        http_response = send_with_retry(request, self.retries)

        # Always release the response once its body has been read
        try:
            encoding = http_response.info().get("Content-Encoding")
            body = http_response.read()
        finally:
            http_response.close()

        if encoding == "gzip":
            body = gzip.decompress(body)

        return json.loads(body)

    @staticmethod
    def _pick_thumbnail(thumbnails: Any) -> str:
        """
        Select a thumbnail URL from a mapping of thumbnail sizes.

        The "standard" size is preferred, then "maxres", then whichever
        size is listed first in the mapping.

        :param thumbnails: mapping from size name to an object with a "url" key
        :returns: URL of the selected thumbnail, or "" if the mapping is empty
        """
        for size in ("standard", "maxres", *thumbnails):
            if size in thumbnails:
                return thumbnails[size]["url"]

        return ""

    @cached(cache=TTLCache(maxsize=1000, ttl=7 * 24 * 60 * 60))
    def channel(self, channel_id: str) -> Any:
        """
        Get information about a channel.

        See <https://developers.google.com/youtube/v3/docs/channels>
        for details. Successful results are cached for seven days.

        :param channel_id: channel ID
        :returns: channel information: the channel "id", the ID of its
            uploads "playlist", and every field of the channel snippet
        :throws HTTPError: if the query fails
        :throws NoSuchChannel: if the channel doesn't exist
        """
        response = self._query(
            url="https://youtube.googleapis.com/youtube/v3/channels",
            method="GET",
            data=(
                ("part", "id"),
                ("part", "snippet"),
                ("part", "contentDetails"),
                ("id", channel_id),
                ("maxResults", 1),
            ),
        )

        # Also guard against a missing or empty item list so that an unknown
        # channel never surfaces as a KeyError/IndexError
        items = response.get("items") or []

        if response["pageInfo"]["totalResults"] == 0 or not items:
            raise NoSuchChannel(f"Channel '{channel_id}' does not exist")

        data = items[0]

        return {
            "id": data["id"],
            "playlist": data["contentDetails"]["relatedPlaylists"]["uploads"],
            **data["snippet"],
        }

    @cached(cache=TTLCache(maxsize=1000, ttl=30 * 60))
    def playlist(self, playlist_id: str) -> Any:
        """
        Get the latest videos from a playlist.

        See <https://developers.google.com/youtube/v3/docs/playlistItems>
        for details. Results are cached for thirty minutes.

        :param playlist_id: playlist ID
        :returns: list of latest public videos (empty if the playlist is
            not found); videos for which no duration could be retrieved
            are left out
        :throws HTTPError: if a query fails for a reason other than the
            playlist not being found
        """
        # Query list of latest videos
        try:
            playlist_response = self._query(
                url="https://youtube.googleapis.com/youtube/v3/playlistItems",
                method="GET",
                data=(
                    ("part", "snippet"),
                    ("part", "status"),
                    ("part", "contentDetails"),
                    ("playlistId", playlist_id),
                    ("maxResults", 50),
                ),
            )
        except HTTPError as err:
            if err.code == 404:
                return []

            raise

        # Filter only public videos
        videos = [
            item["snippet"]
            for item in playlist_response["items"]
            if item["status"]["privacyStatus"] == "public"
            and item["snippet"]["resourceId"]["kind"] == "youtube#video"
        ]

        # Nothing to look up: skip the second request, which would otherwise
        # be sent without any "id" filter
        if not videos:
            return []

        # Retrieve video durations
        videos_response = self._query(
            url="https://youtube.googleapis.com/youtube/v3/videos",
            method="GET",
            data=(
                *[("id", video["resourceId"]["videoId"]) for video in videos],
                ("part", "contentDetails"),
            ),
        )

        # Index durations by video ID rather than by position, so that an
        # omitted or reordered item cannot shift durations onto other videos
        durations = {
            item["id"]: item["contentDetails"]["duration"]
            for item in videos_response["items"]
        }

        # Merge and normalize data
        results = []

        for video_item in videos:
            video_id = video_item["resourceId"]["videoId"]

            if video_id not in durations:
                continue

            results.append({
                "id": video_id,
                "title": video_item.get("title", "Untitled Video"),
                "description": video_item["description"],
                "url": f"https://www.youtube.com/watch?v={video_id}",
                "thumbnail": self._pick_thumbnail(video_item["thumbnails"]),
                "published": parse_iso_date(video_item["publishedAt"]),
                "duration": parse_iso_duration(durations[video_id]),
            })

        return results