diff --git a/README.md b/README.md index eb86a84..45dadfd 100644 --- a/README.md +++ b/README.md @@ -128,4 +128,6 @@ query { } } } -``` \ No newline at end of file +``` + +# Local development \ No newline at end of file diff --git a/mixer/src/etl/index.ts b/mixer/src/etl/index.ts index 9eaff7a..7dbce27 100644 --- a/mixer/src/etl/index.ts +++ b/mixer/src/etl/index.ts @@ -2,6 +2,7 @@ import client from "../db"; const MAYBE_NEW_TASK = "new_task"; +const DEBUG = process.env["DEBUG"] == "1"; export interface ChangeColumn { name: string; @@ -16,6 +17,15 @@ export interface ChangeRow { columns: ChangeColumn[]; } +enum TaskStatus { + Active = 'active', + Completed = 'completed', + Canceled = 'canceled', +} + +const TaskScheduledEvent = 'TaskScheduled', + TaskCanceledEvent = 'TaskCancelled', + TaskCompletedEvent = 'TaskCompleted'; export const processChange = async (doc: ChangeRow, api): Promise => { if (doc.action != "I") { @@ -32,9 +42,18 @@ export const processChange = async (doc: ChangeRow, api): Promise if (doc.columns.some(a => a.name == "module" && a.value == "automationTime")) { const rawData = doc.columns.find(a => a.name == "data"); const data = JSON.parse(rawData?.value || '{}'); - console.log("change doc: ", doc, data); + console.log("automationTime event", data); if (data?.taskId) { - await populateTask(); + if (doc.columns.some(a => a.name == "method" && a.value == TaskScheduledEvent)) { + // TODO: This query isn't efficent enough, it doesn't track last populate. + await populateTask(); + } + if (doc.columns.some(a => a.name == "method" && a.value == TaskCompletedEvent)) { + await updateTaskStatus(TaskStatus.Completed); + } + if (doc.columns.some(a => a.name == "method" && a.value == TaskCanceledEvent)) { + await updateTaskStatus(TaskStatus.Canceled); + } } } } @@ -89,14 +108,43 @@ export const populateTask = async() => { insert into tasks ( id, block_height, event_id, extrinsic_id, timestamp, - creator_id, + creator_id, status, _id, _block_range) select d.task_id, d.block_height, d.event_id, d.extrinsic_id, d.timestamp, - d.task_creator_id, + -- when task first scheduled, it's in actived status + d.task_creator_id, $1, to_uuid(d.task_id), int8range(d.block_height::int8, null) from data as d on conflict do nothing; - `); + `, [TaskStatus.Active]); +} + +export const updateTaskStatus = async(status: TaskStatus) => { + let method: string = TaskCompletedEvent; + + if (status == TaskStatus.Canceled) { + method = TaskCanceledEvent; + }; + + const query = ` + with filter_tasks as ( + select + event_id, task_id, method, timestamp as event_at + from task_events + inner join tasks on tasks.id=task_events.task_id and tasks.status !=30 + where module = 'automationTime' and method = $1 + ) + update tasks + set status = $2 + completed_at = c.event_at + from filter_tasks as c + where c.task_id = tasks.id and (status is null or status != $2) + `; + try { + await client.query(query, [method, status]); + } catch (e) { + console.log("error when updating task status", e, query) + } } diff --git a/mixer/src/index.ts b/mixer/src/index.ts index 7254337..0923464 100644 --- a/mixer/src/index.ts +++ b/mixer/src/index.ts @@ -64,9 +64,11 @@ const listener = new Wal2JSONListener( const listen = async() => { // Now listen to the change + console.log("listen to changes"); listener.start(); try { + console.log("wait to next changes"); for await (const change of listener.next()) { if (!change.data) { continue;