wikimedica-disease-search/data/fetch/wikipedia_pageviews.py

183 lines
5.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import collections
from datetime import datetime, timedelta
from .http import session
import numpy
from scipy import stats
# Chemin racine pour les API Wikimedia
pageviews_root = 'https://wikimedia.org/api/rest_v1'
# Patron daccès à lAPI pageviews pour un article
pageviews_article_endpoint = '/' + '/'.join([
'metrics', 'pageviews', 'per-article', '{project}',
'{access}', '{agent}', '{article}', '{granularity}',
'{start}', '{end}'
])
# Chemin daccès à lAPI pageviews pour un projet complet
pageviews_project_endpoint = '/' + '/'.join([
'metrics', 'pageviews', 'aggregate', '{project}',
'{access}', '{agent}', '{granularity}',
'{start}', '{end}'
])
# Format de dates utilisée pour lAPI Wikimedia
pageviews_date_format = '%Y%m%d'
# Date de première disponibilité des pageviews sur lAPI Wikimedia
pageviews_first_data = datetime(2015, 7, 1)
# Date de dernière disponibilité des pageviews sur lAPI Wikimedia
pageviews_last_data = datetime.today() - timedelta(days=1)
# Tableau contenant tous les jours de lannée de 1 à 365
year_all_days = numpy.arange(1, 366)
def _year_date_distance(a, b):
"""
Calcule la distance entre deux jours de lannée.
:param a: Première valeur (peut être un tableau numpy).
:param b: Seconde valeur (peut être un tableau numpy).
:return: Valeur de la distance.
>>> _year_date_distance(10, 360)
15
>>> _year_date_distance(numpy.array([10, 182, 355]), 182)
[172, 0, 173]
"""
return numpy.stack((
numpy.mod(a - b, len(year_all_days)),
numpy.mod(b - a, len(year_all_days))
)).min(axis=0)
def smooth(views, scale):
"""
Lisse un tableau de valeurs contenant une valeur par jour de lannée en
utilisant un noyau gaussien.
À la bordure de fin ou de début dannée, le lissage est fait avec lautre
morceau de lannée.
:param views: Données à lisser.
:param scale: Variance du noyau gaussien.
:return: Données lissées.
"""
ref_pdf = stats.norm.pdf(
_year_date_distance(year_all_days, 1),
scale=scale
)
pdf_matrix = numpy.stack([
numpy.roll(ref_pdf, day - 1)
for day in year_all_days
])
return pdf_matrix.dot(views)
def get_aggregate(project):
"""
Obtient le nombre de visites sur Wikipédia par jour.
:param project: Projet Wikipédia ciblé.
:return: Compteur associant chaque jour à son nombre de visites.
"""
res = session.get(pageviews_root + pageviews_project_endpoint.format(
project=project,
access='all-access',
agent='user',
granularity='daily',
start=pageviews_first_data.strftime(pageviews_date_format),
end=pageviews_last_data.strftime(pageviews_date_format)
))
data = res.json()
# Vérifie que la réponse reçue indique un succès
if res.status_code != 200:
if 'detail' in data:
detail = data['detail']
message = ', '.join(detail) if type(detail) == list else detail
raise Exception(message)
else:
raise Exception('Erreur {}'.format(res.status_code))
# Construit le dictionnaire résultant
return collections.Counter(dict(
(record['timestamp'][:8], record['views'])
for record in data['items']
))
def get_article(project, article):
"""
Obtient le nombre de visites sur un article Wikipédia par jour.
:param project: Projet Wikipédia ciblé.
:param article: Article ciblé dans le site.
:return: Compteur associant chaque jour à son nombre de visites.
"""
res = session.get(pageviews_root + pageviews_article_endpoint.format(
project=project,
article=article,
access='all-access',
agent='user',
granularity='daily',
start=pageviews_first_data.strftime(pageviews_date_format),
end=pageviews_last_data.strftime(pageviews_date_format)
))
data = res.json()
# Vérifie que la réponse reçue indique un succès
if res.status_code != 200:
if 'detail' in data:
detail = data['detail']
message = ', '.join(detail) if type(detail) == list else detail
raise Exception(message)
else:
raise Exception('Erreur {}'.format(res.status_code))
# Construit le dictionnaire résultant
return collections.Counter(dict(
(record['timestamp'][:8], record['views'])
for record in data['items']
))
def mean(views):
"""
Calcule les vues moyennes par jour de lannée à partir dun ensemble de
vues enregistrées (omettant le 29 février pour les années bissextiles).
:param views: Vues enregistrées par date.
:return: Tableau de taille 365 contenant le nombre de visites moyennes pour
chaque jour de lannée.
"""
# Fait la moyenne pour chaque jour hormis le 29 février
accumulator = {}
datemonth_format = '%m%d'
for day in range(1, 366):
datemonth = datetime.fromordinal(day).strftime(datemonth_format)
accumulator[datemonth] = []
for date_str, views in views.items():
date = datetime.strptime(date_str, pageviews_date_format)
if not (date.month == 2 and date.day == 29):
datemonth = date.strftime(datemonth_format)
accumulator[datemonth].append(views)
for datemonth, value in accumulator.items():
accumulator[datemonth] = sum(value) / len(value) if value else 0
# Rassemble les valeurs moyennes pour chaque jour dans l'ordre de l'année
return [item[1] for item in sorted(
list(accumulator.items()),
key=lambda x: x[0]
)]