diff --git a/back/data/realtime.js b/back/data/realtime.js index 27c50bc..c059448 100644 --- a/back/data/realtime.js +++ b/back/data/realtime.js @@ -1,177 +1,73 @@ -const axios = require('axios'); -const csv = require('csv-parse'); +const tam = require('./sources/tam'); +const util = require('../util'); -const network = require('./network'); -const {TAM_REALTIME} = require('./endpoints'); +/** + * Fetch real-time information about courses in the TaM network. + * + * New data will only be fetched from the TaM server if necessary, otherwise + * pulling from the in-memory cache. + * + * @return Mapping from active course IDs to current information about the + * course, including its line number, next stop and next stop estimated + * time of arrival (ETA). + */ +let nextUpdate = null; +let currentCourses = null; -const sortByFirstKey = (a, b) => a[0] - b[0]; - -const fetchRealtime = () => new Promise((res, rej) => +const getCourses = () => new Promise((res, rej) => { - const stream = axios.get(TAM_REALTIME, { - responseType: 'stream' - }).then(stream => + if (nextUpdate !== null && Date.now() < nextUpdate) { - const parser = csv({ - delimiter: ';', - }); + res(currentCourses); + return; + } - const courses = {}; - stream.pipe(parser); + const courses = {}; + let lastUpdate = null; - stream.on('readable', () => + tam.fetchRealtime((err, entry) => + { + if (err) { - let row; + rej(err); + return; + } - while (row = stream.read()) - { - if (row.length === 0 || row[0] === 'course') - { - // Ignore les lignes invalides et l’en-tête - continue; - } + if (!util.isObject(entry)) + { + currentCourses = courses; + res(currentCourses); + return; + } - const course = row[0]; - const stopRef = row[2]; - const lineRef = row[4]; - const eta = row[9]; - const destinationRef = row[10]; + if ('lastUpdate' in entry) + { + lastUpdate = entry.lastUpdate; + nextUpdate = entry.nextUpdate; + return; + } - if (!(course in courses)) - { - courses[course] = { - lineRef, - destinationRef, - stops: [], - }; - } + const { + course: id, + routeShortName: line, + stopId: nextStop, + destArCode: finalStop, + } = entry; - courses[course].stops.push([parseInt(eta, 10), stopRef]); - courses[course].stops.sort(sortByFirstKey); - } - }); + const arrivalTime = lastUpdate + parseInt(entry.delaySec, 10) * 1000; - stream.on('end', () => res(courses)); - stream.on('error', err => rej(err)); + if (!(id in courses)) + { + courses[id] = {id, line, nextStop, arrivalTime, finalStop}; + } + else if (arrivalTime < courses[id].arrivalTime) + { + // The stop where the next passing is soonest is assumed + // to be the next stop + courses[id].nextStop = nextStop; + courses[id].arrivalTime = arrivalTime; + } }); }); -const updateVehicles = async (lines, vehicles) => -{ - const courses = await fetchRealtime(); - const currentTime = Math.floor(Date.now() / 1000); - - for (let [courseRef, course] of Object.entries(courses)) - { - if (course.lineRef in lines) - { - if (!(courseRef in vehicles)) - { - // New vehicle: identify which route it pertains to - const line = lines[course.lineRef]; - let routeIndex = null; - - for (let [index, route] of Object.entries(line.routes)) - { - const destRef = route.stops[route.stops.length - 1].ref; - - if (destRef === course.destinationRef) - { - routeIndex = index; - } - } - - if (routeIndex !== null) - { - const route = line.routes[routeIndex]; - - // Convert ETAs to absolute times - const nextStops = course.stops.map(([eta, ref]) => [ - eta + currentTime, - ref - ]); - - // Convert stop refs to indices - const stopIndices = course.stops.map(([eta, ref]) => [ - eta, - ]); - - // Find the preceding stop from which the vehicle is coming - const arrivingStop = stopIndices[0][1]; - const arrivingStopIndex = route.stops.findIndex( - stop => stop.ref === arrivingStop - ); - - const leavingStop = arrivingStopIndex === 0 - ? route.stops[0] - : route.stops[arrivingStopIndex - 1]; - - if (nextStop === 0) - { - // Vehicle at starting point - vehicles[courseRef] = { - lineRef: course.lineRef, - stopRef - - stopIndex: 0, - nextStops: stopIndices, - - distance: 0, - speed: 0, - }; - } - else - { - // Vehicle in transit between two stops - vehicles[courseRef] = { - lineRef: course.lineRef, - routeIndex, - - stopIndex: nextStop - 1, - nextStops: stopIndices, - - distance: 0, - speed: route.distances[nextStop - 1] / eta, - }; - } - } - } - else - { - // Existing vehicle: update information - const vehicle = vehicles[courseRef]; - - const line = lines[vehicle.lineRef]; - const route = line.routes[vehicle.routeIndex]; - - // Convert stop refs to indices - const stopIndices = course.stops.map(([eta, ref]) => [ - eta, - route.stops.findIndex(stop => stop.ref === ref), - ]); - - console.log(stopIndices); - console.log(vehicle); - console.log(course); - console.log('---'); - } - } - } -}; - -const sleep = time => new Promise(res => setTimeout(res, time)); - -const updateLoop = async (lines, vehicles = {}) => -{ - await updateVehicles(lines, vehicles); - await sleep(30000); - return updateLoop(lines, vehicles); -}; - -(async () => -{ - const lines = {'1': await network.fetchLineData('1')}; - updateLoop(lines); - - // console.log(require('util').inspect(vehicles, true, 10)); -})(); +exports.getCourses = getCourses; diff --git a/back/data/sources/tam.js b/back/data/sources/tam.js index 2870991..c528454 100644 --- a/back/data/sources/tam.js +++ b/back/data/sources/tam.js @@ -52,17 +52,36 @@ const processTamPassingStream = (csvStream, callback) => const tamRealtimeEndpoint = 'http://data.montpellier3m.fr/node/10732/download'; /** - * Fetch realtime passings for the current day across the network. + * Fetch realtime passings across the network. * - * @param callback Called for each passing during parsing. First argument will - * be non-null only if an error occurred. Second argument will contain passings - * or be null if the end was reached. + * The callback always receives two arguments. If an error occurs, the first + * argument will contain an object with information about the error. Otherwise, + * it will be null and the second argument will be the payload. + * + * The first call will provide metadata, specifically the time at which + * the data that follows was last updated (`lastUpdate`) and the time at + * which it will be updated next (`nextUpdate`). + * + * Following calls will provide each passing of the dataset individually, + * and will be closed with a call where both arguments are null. + * + * @param callback Called for each passing during parsing. */ -const fetchRealtime = (callback) => +const fetchRealtime = callback => { axios.get(tamRealtimeEndpoint, { responseType: 'stream' - }).then(res => processTamPassingStream(res.data, callback)); + }).then(res => + { + const lastUpdate = new Date(res.headers['last-modified']).getTime(); + + // Data is advertised as being updated every minute. Add a small + // margin to account for potential delays + const nextUpdate = lastUpdate + 65 * 1000; + + callback(null, {lastUpdate, nextUpdate}); + processTamPassingStream(res.data, callback); + }).catch(err => callback(err)); }; exports.fetchRealtime = fetchRealtime; @@ -78,7 +97,7 @@ const tamTheoreticalFileName = 'offre_du_jour.csv'; * be non-null only if an error occurred. Second argument will contain passings * or be null if the end was reached. */ -const fetchTheoretical = (callback) => +const fetchTheoretical = callback => { axios.get(tamTheoreticalEndpoint, { responseType: 'stream' diff --git a/back/index.js b/back/index.js index b93e57d..ce76faa 100644 --- a/back/index.js +++ b/back/index.js @@ -1,16 +1,20 @@ const express = require('express'); +const util = require('./util'); const network = require('./data/network.json'); +const realtime = require('./data/realtime'); const app = express(); const port = 3000; app.use(express.static('dist')); -app.get('/realtime', (req, res) => +app.get('/courses', async (req, res) => { + const courses = await realtime.getCourses(); + return res.json(courses); }); -app.get('/network', async (req, res) => res.json(network)); +app.get('/network', (req, res) => res.json(network)); app.listen(port, () => console.log(`App listening on port ${port}`));