openalex: Create endpoint

This commit is contained in:
Mattéo Delabre 2023-05-13 12:35:40 -04:00
parent 23c2b4b664
commit 2f8f7f1804
Signed by: matteo
GPG Key ID: AE3FBD02DC583ABB
4 changed files with 217 additions and 1 deletion

View File

@@ -3,13 +3,14 @@ import logging
 import sys
 from os import environ
 from flask import Flask
-from . import twitch, youtube
+from . import twitch, youtube, openalex
 logger = logging.getLogger(__name__)
 blueprints = {
     "twitch": twitch,
     "youtube": youtube,
+    "openalex": openalex,
 }

View File

@ -0,0 +1,18 @@
from flask import abort, Blueprint
from .openalex import APIClient
from .feed import construct_rss
def create_blueprint(config):
    """
    Create an OpenAlex endpoint blueprint.

    :param config: mapping holding the contact "email" passed to the API client
    :returns: configured Flask blueprint serving RSS feeds by author name
    """
    api = APIClient(config["email"])
    blueprint = Blueprint("openalex", __name__)

    @blueprint.route("/<string:name>", methods=["GET", "HEAD"])
    def get(name: str):
        headers = {"Content-Type": "application/rss+xml"}
        return construct_rss(api, name), headers

    return blueprint

View File

@ -0,0 +1,41 @@
from ..feedformatter import Feed
from .openalex import APIClient
def construct_rss(client: APIClient, name: str) -> str:
    """
    Build an RSS stream for an academic author.

    :param client: OpenAlex API client
    :param name: author display name
    :returns: RSS stream
    :raises HTTPException: if one of the requests fail
    """
    author_ids = client.author(name)
    latest_works = client.works(author_ids)

    # Feed-level (author) metadata.
    feed = Feed()
    feed.feed["title"] = name
    feed.feed["link"] = "https://example.org"
    feed.feed["description"] = f"Latest works of {name} from the OpenAlex dataset."
    feed.feed["author"] = "Feedleware"
    feed.feed["ttl"] = "30"

    # One feed item per work, in the order returned by the client.
    for work in latest_works:
        summary = (
            f'{"; ".join(work["authors"])}<br>'
            f'{work["source"]}<br><br>'
            f'{work["description"]}'
        )
        feed.items.append({
            "guid": work["id"],
            "title": work["title"],
            "link": work["url"],
            "description": summary,
            "pubDate": work["date"].timetuple(),
        })

    return feed.format_rss2_string()

View File

@ -0,0 +1,156 @@
import gzip
import http
import json
import logging
import urllib
import urllib.request
from typing import Any, Iterable, Tuple
from cachetools import cached, TTLCache
from ..util import send_with_retry, parse_iso_date
# Short local aliases for the stdlib HTTP types used throughout this module.
HTTPError = urllib.error.HTTPError
HTTPRequest = urllib.request.Request
HTTPResponse = http.client.HTTPResponse
HTTPException = http.client.HTTPException
# Module-level logger following the package convention.
logger = logging.getLogger(__name__)
class APIClient:
    """Client for the OpenAlex API.

    See <https://docs.openalex.org> for details.
    """

    def __init__(self, email: str = "", retries: int = 3):
        """
        Create an OpenAlex API client.

        :param email: contact email, sent with every request (OpenAlex
            uses it to place callers in the faster "polite pool")
        :param retries: number of times to retry each request in case of failure
        """
        self.email: str = email
        self.retries: int = retries

    def _query(
        self,
        url: str,
        method: str = "GET",
        # Fixed: a mutable default argument ([]) is shared across calls;
        # use an immutable empty tuple instead.
        data: Iterable[Tuple[str, str]] = (),
    ) -> Any:
        """
        Low-level method to query the API.

        :param url: URL to query
        :param method: HTTP method to use
        :param data: key/value pairs appended to the query string
        :returns: decoded JSON data
        :raises HTTPException: if the query fails
        """
        logger.debug("Querying %s %s %s", method, url, data)
        headers = {
            "Accept": "application/json",
            "Accept-Encoding": "gzip",
        }

        # Always advertise the contact address alongside caller parameters.
        payload = (
            *data,
            ("mailto", self.email),
        )
        request = HTTPRequest(
            url=f"{url}?{urllib.parse.urlencode(payload)}",
            headers=headers,
            method=method,
        )
        http_response = send_with_retry(request, self.retries)

        # Responses may be gzip-compressed since we advertise that encoding.
        if http_response.info().get("Content-Encoding") == "gzip":
            return json.loads(gzip.decompress(http_response.read()))
        else:
            return json.loads(http_response.read())

    # NOTE(review): caching an instance method keys on `self` and keeps the
    # client alive for the cache's lifetime; acceptable here as one client
    # is created per blueprint and lives for the whole process.
    @cached(cache=TTLCache(maxsize=1000, ttl=7 * 24 * 60 * 60))
    def author(self, name: str) -> Any:
        """
        Search for an author by their display name.

        See <https://docs.openalex.org/api-entities/authors> for details.

        :param name: author display name
        :returns: tuple of matching author ids (at most 50)
        :raises HTTPException: if the query fails
        """
        authors_response = self._query(
            url="https://api.openalex.org/authors",
            method="GET",
            data=(
                ("search", name),
                ("per-page", "50"),
            ),
        )
        # Strip the common URL prefix to keep only the bare OpenAlex ids.
        return tuple(
            author["id"].removeprefix("https://openalex.org/")
            for author in authors_response["results"]
        )

    @cached(cache=TTLCache(maxsize=1000, ttl=30 * 60))
    def works(self, author_ids: Iterable[str]) -> Any:
        """
        Get list of most recent works by a set of authors.

        See <https://docs.openalex.org/api-entities/works> for details.

        :param author_ids: set of authors; must be hashable (e.g. a tuple)
            because results are memoized on this argument
        :returns: list of latest works, newest first
        :raises HTTPException: if the query fails
        """
        works_response = self._query(
            url="https://api.openalex.org/works",
            method="GET",
            data=(
                ("filter", f"author.id:{'|'.join(author_ids)}"),
                ("sort", "publication_date:desc"),
                ("per-page", "50"),
            ),
        )

        results = []

        for work in works_response["results"]:
            abstract = "No abstract"
            source = "Unknown"
            authors = [
                authorship["author"]["display_name"]
                for authorship in work["authorships"]
            ]

            # Abstracts come as an inverted index (word -> positions);
            # rebuild the plain text by ordering words by position.
            if work["abstract_inverted_index"] is not None:
                abstract_reverse = work["abstract_inverted_index"]
                abstract_forward = {}

                for word, positions in abstract_reverse.items():
                    for position in positions:
                        abstract_forward[position] = word

                abstract = " ".join(map(
                    lambda item: item[1],
                    sorted(abstract_forward.items()),
                ))

            # Fixed: primary_location itself may be null in the OpenAlex
            # dataset, which previously raised a TypeError on subscripting.
            primary_location = work["primary_location"]
            if (
                primary_location is not None
                and primary_location["source"] is not None
            ):
                source = primary_location["source"]["display_name"]

            results.append({
                "id": work["id"],
                "title": work["title"],
                "description": abstract,
                "authors": authors,
                "source": source,
                # NOTE(review): "doi" may be null for some works — the feed
                # link would then be empty; confirm against feed consumers.
                "date": parse_iso_date(work["publication_date"]),
                "url": work["doi"],
            })

        return results