feedleware/feedleware/youtube/youtube.py

149 lines
4.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import gzip
import http
import http.client
import json
import logging
import urllib
import urllib.error
import urllib.parse
import urllib.request
from typing import Any, Iterable, Tuple

from cachetools import cached, TTLCache

from ..util import send_with_retry
# Logger named after this module.
logger = logging.getLogger(__name__)

# Short aliases for the standard-library HTTP types used in this module.
HTTPRequest = urllib.request.Request
HTTPResponse = http.client.HTTPResponse
HTTPException = http.client.HTTPException
HTTPError = urllib.error.HTTPError
class NoSuchChannel(Exception):
    """
    Raised when an unknown channel is queried.

    The exception message names the channel ID that could not be found.
    """
class APIClient:
    """
    Client for the YouTube Data API.

    Every request carries the configured API key as the ``key`` query
    parameter. Results of :meth:`channel` and :meth:`playlist` are memoized
    in time-limited caches that are created once, when the class is defined.
    """

    def __init__(self, key: str = "", retries: int = 3):
        """
        Create a YouTube Data API client.

        See <https://developers.google.com/youtube/v3/docs> for details.

        :param key: YouTube API key
        :param retries: number of times to retry each request in case
            of failure
        """
        self.key = key
        self.retries = retries

    def _query(
        self,
        url: str,
        method: str = "GET",
        data: Iterable[Tuple[str, Any]] = (),
    ) -> Any:
        """
        Low-level method to query the API.

        :param url: URL to query (without query string)
        :param method: HTTP method to use
        :param data: ``(name, value)`` pairs sent as query-string
            parameters; the same name may appear several times
        :returns: decoded JSON data
        :throws HTTPException: if the query fails
        """
        # Log before the API key is appended so that it never reaches the logs.
        logger.debug("Querying %s %s %s", method, url, data)

        headers = {
            "Accept": "application/json",
            "Accept-Encoding": "gzip",
        }
        payload = (
            *data,
            ("key", self.key),
        )
        request = HTTPRequest(
            url=f"{url}?{urllib.parse.urlencode(payload)}",
            headers=headers,
            method=method,
        )

        http_response = send_with_retry(request, self.retries)

        # The body is only compressed when the server honoured the
        # "Accept-Encoding: gzip" request header.
        gzipped = http_response.info().get("Content-Encoding") == "gzip"
        body = http_response.read()
        return json.loads(gzip.decompress(body) if gzipped else body)

    # Channel metadata rarely changes: keep it for 7 days (ttl in seconds).
    @cached(cache=TTLCache(maxsize=1000, ttl=7 * 24 * 60 * 60))
    def channel(self, channel_id: str) -> Any:
        """
        Get information about a channel.

        See <https://developers.google.com/youtube/v3/docs/channels>
        for details.

        :param channel_id: channel ID
        :returns: the channel's ``snippet`` fields, plus ``id`` (the channel
            ID) and ``playlist`` (the ID of its uploads playlist)
        :throws HTTPException: if the query fails
        :throws NoSuchChannel: if the channel doesn't exist
        """
        response = self._query(
            url="https://youtube.googleapis.com/youtube/v3/channels",
            method="GET",
            data=(
                ("part", "id"),
                ("part", "snippet"),
                ("part", "contentDetails"),
                ("id", channel_id),
                ("maxResults", 1),
            ),
        )

        # Treat a missing or empty "items" list like a zero result count,
        # rather than failing with KeyError/IndexError below.
        items = response.get("items") or []
        if response["pageInfo"]["totalResults"] == 0 or not items:
            raise NoSuchChannel(f"Channel '{channel_id}' does not exist")

        data = items[0]
        return {
            "id": data["id"],
            "playlist": data["contentDetails"]["relatedPlaylists"]["uploads"],
            **data["snippet"],
        }

    # Playlists change often: keep results for 30 minutes (ttl in seconds).
    @cached(cache=TTLCache(maxsize=1000, ttl=30 * 60))
    def playlist(self, playlist_id: str) -> Any:
        """
        Get the latest videos from a playlist.

        See <https://developers.google.com/youtube/v3/docs/playlistItems>
        for details.

        :param playlist_id: playlist ID
        :returns: list of the ``snippet`` objects of the latest public
            videos (at most 50 items are requested); empty if the API
            answers 404 for the playlist
        :throws HTTPException: if the query fails
        :throws HTTPError: if the API answers with an HTTP error other
            than 404
        """
        try:
            response = self._query(
                url="https://youtube.googleapis.com/youtube/v3/playlistItems",
                method="GET",
                data=(
                    ("part", "snippet"),
                    ("part", "status"),
                    ("playlistId", playlist_id),
                    ("maxResults", 50),
                ),
            )
        except HTTPError as err:
            # A 404 answer is reported as "no videos" instead of an error.
            if err.code == 404:
                return []
            raise

        # Keep only entries that are publicly visible and are actual videos.
        return [
            item["snippet"]
            for item in response["items"]
            if item["status"]["privacyStatus"] == "public"
            and item["snippet"]["resourceId"]["kind"] == "youtube#video"
        ]