const unzip = require('unzip-stream'); const csv = require('csv-parse'); const axios = require('axios'); /** * Process a CSV stream to extract passings. * * @private * @param csvStream Stream containing CSV data. * @param callback See fetchRealtime for a description of the callback. */ const processTamPassingStream = (csvStream, callback) => { const parser = csv({ delimiter: ';', }); const rowStream = csvStream.pipe(parser); rowStream.on('readable', () => { let row; while ((row = rowStream.read())) { if (row.length === 0 || row[0] === 'course') { // Ignore les lignes invalides et l’en-tête continue; } callback(null, { course: row[0], stopCode: row[1], stopId: row[2], stopName: row[3], routeShortName: row[4], tripHeadsign: row[5], directionId: row[6], departureTime: row[7], isTheorical: row[8], delaySec: row[9], destArCode: row[10], }); } }); rowStream.on('end', () => callback(null, null)); rowStream.on('error', err => callback(err)); }; const tamRealtimeEndpoint = 'http://data.montpellier3m.fr/node/10732/download'; /** * Fetch realtime passings across the network. * * The callback always receives two arguments. If an error occurs, the first * argument will contain an object with information about the error. Otherwise, * it will be null and the second argument will be the payload. * * The first call will provide metadata, specifically the time at which * the data that follows was last updated (`lastUpdate`) and the time at * which it will be updated next (`nextUpdate`). * * Following calls will provide each passing of the dataset individually, * and will be closed with a call where both arguments are null. * * @param callback Called for each passing during parsing. */ const fetchRealtime = callback => { axios.get(tamRealtimeEndpoint, { responseType: 'stream' }).then(res => { const lastUpdate = new Date(res.headers['last-modified']).getTime(); // Data is advertised as being updated every minute. Add a small // margin to account for potential delays const nextUpdate = lastUpdate + 65 * 1000; callback(null, {lastUpdate, nextUpdate}); processTamPassingStream(res.data, callback); }).catch(err => callback(err)); }; exports.fetchRealtime = fetchRealtime; const tamTheoreticalEndpoint = 'http://data.montpellier3m.fr/node/10731/download'; const tamTheoreticalFileName = 'offre_du_jour.csv'; /** * Fetch theoretical passings for the current day across the network. * * @param callback Called for each passing during parsing. First argument will * be non-null only if an error occurred. Second argument will contain passings * or be null if the end was reached. */ const fetchTheoretical = callback => { axios.get(tamTheoreticalEndpoint, { responseType: 'stream' }).then(res => { const fileStream = res.data.pipe(unzip.Parse()); fileStream.on('entry', entry => { if (entry.type !== 'File' || entry.path !== tamTheoreticalFileName) { entry.autodrain(); return; } processTamPassingStream(entry, callback); }); fileStream.on('error', err => callback(err)); }); }; exports.fetchTheoretical = fetchTheoretical;