Add working realtime endpoint in backend

This commit is contained in:
Mattéo Delabre 2020-07-17 16:40:09 +02:00
parent dd154afa9d
commit 2c3b16caef
Signed by: matteo
GPG Key ID: AE3FBD02DC583ABB
3 changed files with 90 additions and 171 deletions

View File

@ -1,177 +1,73 @@
const axios = require('axios'); const tam = require('./sources/tam');
const csv = require('csv-parse'); const util = require('../util');
const network = require('./network'); /**
const {TAM_REALTIME} = require('./endpoints'); * Fetch real-time information about courses in the TaM network.
*
* New data will only be fetched from the TaM server if necessary, otherwise
* pulling from the in-memory cache.
*
* @return Mapping from active course IDs to current information about the
* course, including its line number, next stop and next stop estimated
* time of arrival (ETA).
*/
let nextUpdate = null;
let currentCourses = null;
const sortByFirstKey = (a, b) => a[0] - b[0]; const getCourses = () => new Promise((res, rej) =>
const fetchRealtime = () => new Promise((res, rej) =>
{ {
const stream = axios.get(TAM_REALTIME, { if (nextUpdate !== null && Date.now() < nextUpdate)
responseType: 'stream'
}).then(stream =>
{ {
const parser = csv({ res(currentCourses);
delimiter: ';', return;
}); }
const courses = {}; const courses = {};
stream.pipe(parser); let lastUpdate = null;
stream.on('readable', () => tam.fetchRealtime((err, entry) =>
{ {
let row; if (err)
while (row = stream.read())
{ {
if (row.length === 0 || row[0] === 'course') rej(err);
{ return;
// Ignore les lignes invalides et len-tête
continue;
} }
const course = row[0]; if (!util.isObject(entry))
const stopRef = row[2];
const lineRef = row[4];
const eta = row[9];
const destinationRef = row[10];
if (!(course in courses))
{ {
courses[course] = { currentCourses = courses;
lineRef, res(currentCourses);
destinationRef, return;
stops: [],
};
} }
courses[course].stops.push([parseInt(eta, 10), stopRef]); if ('lastUpdate' in entry)
courses[course].stops.sort(sortByFirstKey); {
lastUpdate = entry.lastUpdate;
nextUpdate = entry.nextUpdate;
return;
} }
});
stream.on('end', () => res(courses)); const {
stream.on('error', err => rej(err)); course: id,
routeShortName: line,
stopId: nextStop,
destArCode: finalStop,
} = entry;
const arrivalTime = lastUpdate + parseInt(entry.delaySec, 10) * 1000;
if (!(id in courses))
{
courses[id] = {id, line, nextStop, arrivalTime, finalStop};
}
else if (arrivalTime < courses[id].arrivalTime)
{
// The stop where the next passing is soonest is assumed
// to be the next stop
courses[id].nextStop = nextStop;
courses[id].arrivalTime = arrivalTime;
}
}); });
}); });
const updateVehicles = async (lines, vehicles) => exports.getCourses = getCourses;
{
const courses = await fetchRealtime();
const currentTime = Math.floor(Date.now() / 1000);
for (let [courseRef, course] of Object.entries(courses))
{
if (course.lineRef in lines)
{
if (!(courseRef in vehicles))
{
// New vehicle: identify which route it pertains to
const line = lines[course.lineRef];
let routeIndex = null;
for (let [index, route] of Object.entries(line.routes))
{
const destRef = route.stops[route.stops.length - 1].ref;
if (destRef === course.destinationRef)
{
routeIndex = index;
}
}
if (routeIndex !== null)
{
const route = line.routes[routeIndex];
// Convert ETAs to absolute times
const nextStops = course.stops.map(([eta, ref]) => [
eta + currentTime,
ref
]);
// Convert stop refs to indices
const stopIndices = course.stops.map(([eta, ref]) => [
eta,
]);
// Find the preceding stop from which the vehicle is coming
const arrivingStop = stopIndices[0][1];
const arrivingStopIndex = route.stops.findIndex(
stop => stop.ref === arrivingStop
);
const leavingStop = arrivingStopIndex === 0
? route.stops[0]
: route.stops[arrivingStopIndex - 1];
if (nextStop === 0)
{
// Vehicle at starting point
vehicles[courseRef] = {
lineRef: course.lineRef,
stopRef
stopIndex: 0,
nextStops: stopIndices,
distance: 0,
speed: 0,
};
}
else
{
// Vehicle in transit between two stops
vehicles[courseRef] = {
lineRef: course.lineRef,
routeIndex,
stopIndex: nextStop - 1,
nextStops: stopIndices,
distance: 0,
speed: route.distances[nextStop - 1] / eta,
};
}
}
}
else
{
// Existing vehicle: update information
const vehicle = vehicles[courseRef];
const line = lines[vehicle.lineRef];
const route = line.routes[vehicle.routeIndex];
// Convert stop refs to indices
const stopIndices = course.stops.map(([eta, ref]) => [
eta,
route.stops.findIndex(stop => stop.ref === ref),
]);
console.log(stopIndices);
console.log(vehicle);
console.log(course);
console.log('---');
}
}
}
};
const sleep = time => new Promise(res => setTimeout(res, time));
const updateLoop = async (lines, vehicles = {}) =>
{
await updateVehicles(lines, vehicles);
await sleep(30000);
return updateLoop(lines, vehicles);
};
(async () =>
{
const lines = {'1': await network.fetchLineData('1')};
updateLoop(lines);
// console.log(require('util').inspect(vehicles, true, 10));
})();

View File

@ -52,17 +52,36 @@ const processTamPassingStream = (csvStream, callback) =>
const tamRealtimeEndpoint = 'http://data.montpellier3m.fr/node/10732/download'; const tamRealtimeEndpoint = 'http://data.montpellier3m.fr/node/10732/download';
/** /**
* Fetch realtime passings for the current day across the network. * Fetch realtime passings across the network.
* *
* @param callback Called for each passing during parsing. First argument will * The callback always receives two arguments. If an error occurs, the first
* be non-null only if an error occurred. Second argument will contain passings * argument will contain an object with information about the error. Otherwise,
* or be null if the end was reached. * it will be null and the second argument will be the payload.
*
* The first call will provide metadata, specifically the time at which
* the data that follows was last updated (`lastUpdate`) and the time at
* which it will be updated next (`nextUpdate`).
*
* Following calls will provide each passing of the dataset individually,
* and will be closed with a call where both arguments are null.
*
* @param callback Called for each passing during parsing.
*/ */
const fetchRealtime = (callback) => const fetchRealtime = callback =>
{ {
axios.get(tamRealtimeEndpoint, { axios.get(tamRealtimeEndpoint, {
responseType: 'stream' responseType: 'stream'
}).then(res => processTamPassingStream(res.data, callback)); }).then(res =>
{
const lastUpdate = new Date(res.headers['last-modified']).getTime();
// Data is advertised as being updated every minute. Add a small
// margin to account for potential delays
const nextUpdate = lastUpdate + 65 * 1000;
callback(null, {lastUpdate, nextUpdate});
processTamPassingStream(res.data, callback);
}).catch(err => callback(err));
}; };
exports.fetchRealtime = fetchRealtime; exports.fetchRealtime = fetchRealtime;
@ -78,7 +97,7 @@ const tamTheoreticalFileName = 'offre_du_jour.csv';
* be non-null only if an error occurred. Second argument will contain passings * be non-null only if an error occurred. Second argument will contain passings
* or be null if the end was reached. * or be null if the end was reached.
*/ */
const fetchTheoretical = (callback) => const fetchTheoretical = callback =>
{ {
axios.get(tamTheoreticalEndpoint, { axios.get(tamTheoreticalEndpoint, {
responseType: 'stream' responseType: 'stream'

View File

@ -1,16 +1,20 @@
const express = require('express'); const express = require('express');
const util = require('./util');
const network = require('./data/network.json'); const network = require('./data/network.json');
const realtime = require('./data/realtime');
const app = express(); const app = express();
const port = 3000; const port = 3000;
app.use(express.static('dist')); app.use(express.static('dist'));
app.get('/realtime', (req, res) => app.get('/courses', async (req, res) =>
{ {
const courses = await realtime.getCourses();
return res.json(courses);
}); });
app.get('/network', async (req, res) => res.json(network)); app.get('/network', (req, res) => res.json(network));
app.listen(port, () => console.log(`App listening on port ${port}`)); app.listen(port, () => console.log(`App listening on port ${port}`));