Skip to content

Commit

Permalink
update status and cancel date
Browse files Browse the repository at this point in the history
  • Loading branch information
v9n committed Nov 24, 2023
1 parent 80f85b2 commit cf44f76
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 6 deletions.
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,6 @@ query {
}
}
}
```
```

# Local development
58 changes: 53 additions & 5 deletions mixer/src/etl/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import client from "../db";


const MAYBE_NEW_TASK = "new_task";
const DEBUG = process.env["DEBUG"] == "1";

export interface ChangeColumn {
name: string;
Expand All @@ -16,6 +17,15 @@ export interface ChangeRow {
columns: ChangeColumn[];
}

enum TaskStatus {
Active = 'active',
Completed = 'completed',
Canceled = 'canceled',
}

const TaskScheduledEvent = 'TaskScheduled',
TaskCanceledEvent = 'TaskCancelled',
TaskCompletedEvent = 'TaskCompleted';

export const processChange = async (doc: ChangeRow, api): Promise<void|string[]> => {
if (doc.action != "I") {
Expand All @@ -32,9 +42,18 @@ export const processChange = async (doc: ChangeRow, api): Promise<void|string[]>
if (doc.columns.some(a => a.name == "module" && a.value == "automationTime")) {
const rawData = doc.columns.find(a => a.name == "data");
const data = JSON.parse(rawData?.value || '{}');
console.log("change doc: ", doc, data);
console.log("automationTime event", data);
if (data?.taskId) {
await populateTask();
if (doc.columns.some(a => a.name == "method" && a.value == TaskScheduledEvent)) {
// TODO: This query isn't efficent enough, it doesn't track last populate.
await populateTask();
}
if (doc.columns.some(a => a.name == "method" && a.value == TaskCompletedEvent)) {
await updateTaskStatus(TaskStatus.Completed);
}
if (doc.columns.some(a => a.name == "method" && a.value == TaskCanceledEvent)) {
await updateTaskStatus(TaskStatus.Canceled);
}
}
}
}
Expand Down Expand Up @@ -89,14 +108,43 @@ export const populateTask = async() => {
insert into tasks (
id, block_height, event_id, extrinsic_id, timestamp,
creator_id,
creator_id, status,
_id, _block_range)
select
d.task_id, d.block_height, d.event_id, d.extrinsic_id, d.timestamp,
d.task_creator_id,
-- when task first scheduled, it's in actived status
d.task_creator_id, $1,
to_uuid(d.task_id), int8range(d.block_height::int8, null)
from data as d
on conflict do nothing;
`);
`, [TaskStatus.Active]);
}

export const updateTaskStatus = async(status: TaskStatus) => {
let method: string = TaskCompletedEvent;

if (status == TaskStatus.Canceled) {
method = TaskCanceledEvent;
};

const query = `
with filter_tasks as (
select
event_id, task_id, method, timestamp as event_at
from task_events
inner join tasks on tasks.id=task_events.task_id and tasks.status !=30
where module = 'automationTime' and method = $1
)
update tasks
set status = $2
completed_at = c.event_at
from filter_tasks as c
where c.task_id = tasks.id and (status is null or status != $2)
`;

try {
await client.query(query, [method, status]);
} catch (e) {
console.log("error when updating task status", e, query)
}
}
2 changes: 2 additions & 0 deletions mixer/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ const listener = new Wal2JSONListener(

const listen = async() => {
// Now listen to the change
console.log("listen to changes");
listener.start();

try {
console.log("wait to next changes");
for await (const change of listener.next()) {
if (!change.data) {
continue;
Expand Down

0 comments on commit cf44f76

Please sign in to comment.