Refactor dts-process.js and add dtsProcessEventRule.json
nikiwycherley committed Apr 30, 2024
1 parent 0c2df83 commit 6a73f0e
Showing 4 changed files with 165 additions and 0 deletions.
7 changes: 7 additions & 0 deletions config/dtsProcessEventRule.json
@@ -0,0 +1,7 @@
{
"Description": "Event rule to schedule the dtsProcess lambda execution",
"Name": "{PLACEHOLDER}",
"RoleArn": "{PLACEHOLDER}",
"ScheduleExpression": "{PLACEHOLDER}",
"State": "ENABLED"
}
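
Filled in, the rule might look like the following (a sketch with illustrative values only; the real Name, RoleArn and ScheduleExpression are environment-specific and substituted at deploy time):

{
  "Description": "Event rule to schedule the dtsProcess lambda execution",
  "Name": "lfw-dev-dtsProcessEventRule",
  "RoleArn": "arn:aws:iam::123456789012:role/lfw-dev-event-rule-role",
  "ScheduleExpression": "rate(15 minutes)",
  "State": "ENABLED"
}

The file matches the shape of the CloudWatch Events PutRule request, so it can be applied with, for example, aws events put-rule --cli-input-json file://config/dtsProcessEventRule.json.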
129 changes: 129 additions & 0 deletions lib/functions/dts-process.js
@@ -0,0 +1,129 @@
const parseStation = require('../models/parse-time-series')
const logger = require('../helpers/logging')
const pg = require('../helpers/db')
const invokeLambda = require('../helpers/invoke-lambda')
const Joi = require('joi')
const axios = require('axios')

async function deleteStation (stationId) {
  try {
    await pg('station_display_time_series').where({ station_id: stationId }).delete()
    logger.info(`Deleted data for RLOI id ${stationId}`)
  } catch (error) {
    logger.error(`Error deleting data for station ${stationId}`, error)
    throw error
  }
}

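// Refresh a station's rows atomically: the existing display time series entries
// are deleted and the new set inserted within a single transaction.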
async function insertStation (stationDataArray) {
  try {
    await pg.transaction(async trx => {
      await Promise.all(stationDataArray.map(async (stationData) => {
        await trx('station_display_time_series').where({ station_id: stationData.station_id }).delete()
        await trx('station_display_time_series').insert(stationData)
        logger.info(`Processed displayTimeSeries for RLOI id ${stationData.station_id}`)
      }))
    })
  } catch (error) {
    logger.error('Database error processing stationData', error)
    throw error
  }
}

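// Fetch a station's threshold metadata from the IMTD API. A 404 is expected for
// stations without thresholds, so it resolves with an empty object rather than throwing.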
async function getImtdApiResponse (stationId) {
  const hostname = 'imfs-prd1-thresholds-api.azurewebsites.net'
  try {
    return await axios.get(`https://${hostname}/Location/${stationId}?version=2`)
  } catch (error) {
    if (error.response?.status === 404) {
      logger.info(`Station ${stationId} not found (HTTP Status: 404)`)
    } else {
      const message = error.response?.status ? `HTTP Status: ${error.response.status}` : `Error: ${error.message}`
      throw Error(`IMTD API request for station ${stationId} failed (${message})`)
    }
    return {}
  }
}

async function getStationData (stationId) {
  const response = await getImtdApiResponse(stationId)
  const timeSeriesMetaData = response.data?.[0]?.TimeSeriesMetaData
  if (timeSeriesMetaData) {
    return parseStation(timeSeriesMetaData, stationId)
  }
  return []
}

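// Fetch, validate and store a station's data; if any step fails, remove the
// station's existing rows so stale display settings are not left behind.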
async function getData (stationId) {
  try {
    const stationData = await getStationData(stationId)
    await validateStationData(stationData)
    await insertStation(stationData)
  } catch (error) {
    logger.error(`Could not process data for station ${stationId} (${error.message})`)
    await deleteStation(stationId)
  }
}

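// Ensure each parsed entry has the shape required by station_display_time_series
// before anything is written to the database.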
async function validateStationData (stationDataArray) {
  const schema = Joi.object({
    station_id: Joi.number().required(),
    direction: Joi.string().required(),
    display_time_series: Joi.boolean().required()
  })

  try {
    const validatedData = await Promise.all(
      stationDataArray.map((stationData) => schema.validateAsync(stationData))
    )
    return validatedData
  } catch (error) {
    throw new Error(`Validation error: ${error.message}`)
  }
}

async function getRloiIds ({ limit, offset } = {}) {
  try {
    logger.info(`Retrieving up to ${limit} rloi_ids with an offset of ${offset}`)
    const result = await pg('rivers_mview')
      .distinct('rloi_id')
      .whereNotNull('rloi_id')
      .orderBy('rloi_id', 'asc')
      .limit(limit)
      .offset(offset)
    logger.info(`Retrieved ${result.length} rloi_ids`)
    return result
  } catch (error) {
    throw Error(`Could not get list of ids from database (Error: ${error.message})`)
  }
}

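// Entry point: process one batch of RLOI ids, then re-invoke this lambda with an
// increased offset for as long as full batches keep coming back.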
async function handler ({ offset = 0 } = {}) {
  const BATCH_SIZE = parseInt(process.env.IMTD_BATCH_SIZE || '500', 10)

  const stations = await getRloiIds({
    offset,
    limit: BATCH_SIZE
  })

  for (const station of stations) {
    await getData(station.rloi_id)
  }

  if (stations.length >= BATCH_SIZE) {
    const functionName = process.env.AWS_LAMBDA_FUNCTION_NAME
    const newOffset = offset + BATCH_SIZE
    logger.info(`Invoking ${functionName} with an offset of ${newOffset}`)

    await invokeLambda(functionName, {
      offset: newOffset
    })
  }
}

module.exports.handler = handler

process.on('SIGTERM', async () => {
  logger.info('SIGTERM received, destroying DB connection')
  await pg.destroy()
  process.exit(0)
})
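
For local experimentation the handler can be invoked directly (a minimal sketch; it assumes database connectivity and, if a second batch is triggered, the AWS_LAMBDA_FUNCTION_NAME environment variable used for self-invocation):

const { handler } = require('./lib/functions/dts-process')

handler({ offset: 0 })
  .then(() => console.log('dts-process run complete'))
  .catch((error) => console.error(error))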
25 changes: 25 additions & 0 deletions lib/models/parse-time-series.js
@@ -0,0 +1,25 @@
/**
 * @param {Array} data - The IMTD TimeSeriesMetaData to be parsed.
 * @param {number} stationId - The RLOI id the data belongs to.
 * @returns {Array} - One entry per unique station/direction pair.
 */
function parseTimeSeries (data, stationId) {
  if (!data) {
    return []
  }

  const processedData = data.map((item) => ({
    station_id: stationId,
    direction: item.qualifier === 'Downstream Stage' ? 'd' : 'u',
    display_time_series: item.DisplayTimeSeries
  }))

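  // Keep only the first entry per station/direction pair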
  const uniqueProcessedData = processedData.filter((item, index, self) =>
    index === self.findIndex((t) => (
      t.station_id === item.station_id && t.direction === item.direction
    ))
  )

  return uniqueProcessedData
}

module.exports = parseTimeSeries
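
A worked example of the parsing and de-duplication (the input values are illustrative, not taken from the IMTD API):

const parseTimeSeries = require('./lib/models/parse-time-series')

const metaData = [
  { qualifier: 'Stage', DisplayTimeSeries: true },
  { qualifier: 'Downstream Stage', DisplayTimeSeries: false },
  { qualifier: 'Stage', DisplayTimeSeries: true } // duplicate upstream entry, dropped
]

parseTimeSeries(metaData, 1001)
// => [
//   { station_id: 1001, direction: 'u', display_time_series: true },
//   { station_id: 1001, direction: 'd', display_time_series: false }
// ]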
4 changes: 4 additions & 0 deletions serverless.yml
@@ -63,3 +63,7 @@ functions:
    name: ${env:LFW_DATA_TARGET_ENV_NAME}${self:service}-imtdProcess
    handler: lib/functions/imtd-process.handler
    timeout: 900
  dtsProcess:
    name: ${env:LFW_DATA_TARGET_ENV_NAME}${self:service}-dtsProcess
    handler: lib/functions/dts-process.handler
    timeout: 900
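
Note that no schedule is declared here: dtsProcess is triggered by the CloudWatch event rule in config/dtsProcessEventRule.json rather than by a Serverless events entry. An equivalent in-framework declaration would look like the following sketch (the rate is a placeholder for whatever ScheduleExpression is deployed):

  dtsProcess:
    name: ${env:LFW_DATA_TARGET_ENV_NAME}${self:service}-dtsProcess
    handler: lib/functions/dts-process.handler
    timeout: 900
    events:
      - schedule: rate(15 minutes)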
