const unzip = require('unzip-stream'); const csv = require('csv-parse'); const axios = require('axios'); const overpassEndpoint = 'https://lz4.overpass-api.de/api/interpreter'; /** * Submit an Overpass query. * * @async * @param query Query in Overpass QL. * @return Results as provided by the endpoint. */ const queryOverpass = query => axios.post( overpassEndpoint, 'data=' + query ).then(res => res.data); exports.queryOverpass = queryOverpass; /** * Process a CSV stream to extract passings. * * @private * @param csvStream Stream containing CSV data. * @param callback See fetchTamRealtime for a description of the callback. */ const processTamPassingStream = (csvStream, callback) => { const parser = csv({ delimiter: ';', }); const rowStream = csvStream.pipe(parser); rowStream.on('readable', () => { let row; while ((row = rowStream.read())) { if (row.length === 0 || row[0] === 'course') { // Ignore les lignes invalides et l’en-tête continue; } callback(null, { course: row[0], stopCode: row[1], stopId: row[2], stopName: row[3], routeShortName: row[4], tripHeadsign: row[5], directionId: row[6], departureTime: row[7], isTheorical: row[8], delaySec: row[9], destArCode: row[10], }); } }); rowStream.on('end', () => callback(null, null)); rowStream.on('error', err => callback(err)); }; const tamRealtimeEndpoint = 'http://data.montpellier3m.fr/node/10732/download'; /** * Fetch realtime passings for the current day across the network. * * @param callback Called for each passing during parsing. First argument will * be non-null only if an error occurred. Second argument will contain passings * or be null if the end was reached. */ const fetchTamRealtime = (callback) => { axios.get(tamRealtimeEndpoint, { responseType: 'stream' }).then(res => processTamPassingStream(res.data, callback)); }; exports.fetchTamRealtime = fetchTamRealtime; const tamTheoreticalEndpoint = 'http://data.montpellier3m.fr/node/10731/download'; const tamTheoreticalFileName = 'offre_du_jour.csv'; /** * Fetch theoretical passings for the current day across the network. * * @param callback Called for each passing during parsing. First argument will * be non-null only if an error occurred. Second argument will contain passings * or be null if the end was reached. */ const fetchTamTheoretical = (callback) => { axios.get(tamTheoreticalEndpoint, { responseType: 'stream' }).then(res => { const fileStream = res.data.pipe(unzip.Parse()); fileStream.on('entry', entry => { if (entry.type !== 'File' || entry.path !== tamTheoreticalFileName) { entry.autodrain(); return; } processTamPassingStream(entry, callback); }); fileStream.on('error', err => callback(err)); }); }; exports.fetchTamTheoretical = fetchTamTheoretical;