Skip to content

Commit

Permalink
Merge pull request #21 from allanhvam/dev
Browse files Browse the repository at this point in the history
fix: workflow services optional, docs, lint
  • Loading branch information
allanhvam authored Dec 6, 2024
2 parents ed031ce + 96268b3 commit 7a28c4b
Show file tree
Hide file tree
Showing 20 changed files with 112 additions and 44 deletions.
File renamed without changes.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "simple-workflows",
"version": "0.2.1",
"version": "0.2.2",
"description": "Workflows as code in TypeScript",
"main": "lib/index.js",
"type": "module",
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ export * from "./worker/WorkflowContext.js";
export * from "./worker/IWorkflowContext.js";
export * from "./workflows/index.js";
export * from "./serialization/index.js";
export * from "./triggers/index.js";
export * from "./triggers/index.js";
4 changes: 2 additions & 2 deletions src/serialization/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
export { serializeError, deserializeError } from "./serialize-error.js";
export { ISerializer } from "./ISerializer.js";
export { DefaultSerializer } from "./DefaultSerializer.js";
export type { ISerializer } from "./ISerializer.js";
export { DefaultSerializer } from "./DefaultSerializer.js";
4 changes: 2 additions & 2 deletions src/stores/DurableFunctionsWorkflowHistoryStore.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import type { GetInstancesOptions, GetInstancesResult, WorkflowActivity, WorkflowInstance, WorkflowInstanceHeader } from "./IWorkflowHistoryStore.js";
import { type GetTableEntityResponse, TableClient, type TableEntity, type TableEntityResult, TableServiceClient, TableTransaction } from "@azure/data-tables";
import { deserializeError, serializeError } from "../serialization/index.js";
import { AnonymousCredential, BlobServiceClient, StorageSharedKeyCredential, type ContainerClient } from "@azure/storage-blob";
import { BlobServiceClient, type ContainerClient } from "@azure/storage-blob";
import zlib from "zlib";
import { Mutex } from "async-mutex";
import { type ISerializer } from "../serialization/ISerializer.js";
import { SerializedWorkflowHistoryStore } from "./SerializedWorkflowHistoryStore.js";
import { TokenCredential } from "@azure/core-auth";
import { type TokenCredential } from "@azure/core-auth";

interface IDurableFunctionsWorkflowHistory {
Name: string
Expand Down
2 changes: 1 addition & 1 deletion src/stores/FileSystemWorkflowHistoryStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { SerializedWorkflowHistoryStore } from "./SerializedWorkflowHistoryStore

export class FileSystemWorkflowHistoryStore extends SerializedWorkflowHistoryStore {
public readonly name = "file-system";

public workflowHistory: Array<WorkflowInstance> = [];
private readonly options: { path: string };

Expand Down
2 changes: 1 addition & 1 deletion src/stores/IWorkflowHistoryStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ export type GetInstancesResult = Promise<{

export interface IWorkflowHistoryStore {
name: string;

getInstance: (id: string) => Promise<WorkflowInstance | undefined>;
setInstance: (instance: WorkflowInstance) => Promise<void>;
removeInstance: (id: string) => Promise<void>;
Expand Down
2 changes: 1 addition & 1 deletion src/stores/MemoryWorkflowHistoryStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { WorkflowHistoryStore } from "./WorkflowHistoryStore.js";

export class MemoryWorkflowHistoryStore extends WorkflowHistoryStore {
public readonly name = "memory";

public workflowHistory: Array<WorkflowInstance> = [];

public getInstance = async (id: string): Promise<WorkflowInstance | undefined> => {
Expand Down
2 changes: 1 addition & 1 deletion src/stores/WorkflowHistoryStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { IWorkflowHistoryStore, WorkflowInstance, GetInstancesOptions, GetI

export abstract class WorkflowHistoryStore implements IWorkflowHistoryStore {
abstract name: string;

protected getWorkflowInstanceHeaders = async (instances: Array<WorkflowInstance>, options?: GetInstancesOptions): GetInstancesResult => {
let headers = instances.map(this.getWorkflowInstanceHeader);

Expand Down
2 changes: 1 addition & 1 deletion src/tests/services/MathService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ import { add } from "../activities/add.js";

export class MathService {
public add = add;
}
}
5 changes: 2 additions & 3 deletions src/tests/store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,13 +93,12 @@ void test("Workflow store, getInstances options", async (t) => {
assert.equal(all.length, 100);
});


void test("Workflow store, getInstances error", async (t) => {
// Arrange
const worker = Worker.getInstance();

// Act
let workflowId : string | undefined;
let workflowId: string | undefined;
try {
const handle = await worker.start(throwErrorWorkflow);
workflowId = handle.workflowId;
Expand All @@ -114,4 +113,4 @@ void test("Workflow store, getInstances error", async (t) => {
const instance = instances.instances.find(wi => wi.instanceId === workflowId);
assert.ok(instance, "Expected instance to be found.");
assert.ok(instance.error, "Expected error to be true.");
});
});
51 changes: 51 additions & 0 deletions src/tests/triggers.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { test } from "node:test";
import assert from "node:assert";
import { Worker } from "../worker/Worker.js";
import { DurableFunctionsWorkflowHistoryStore } from "../stores/index.js";
import { startup } from "./workflows/startup.js";
import { sleep } from "../sleep.js";

test.before(async () => {
const worker = Worker.getInstance();

let isStorageEmulatorRunning = false;
try {
const response = await fetch("http://127.0.0.1:10000");
if (response.status === 400) {
isStorageEmulatorRunning = true;
}
} catch {
console.log("Storage emulator not running, using memory.");
}

if (isStorageEmulatorRunning) {
const store = new DurableFunctionsWorkflowHistoryStore({
connectionString: "UseDevelopmentStorage=true",
taskHubName: "Workflows",
});
await store.clear();
worker.store = store;
}
worker.log = (s: string) => console.log(`[${new Date().toISOString()}] ${s}`);
});

void test("startup trigger args", async (t) => {
// Arrange
const workflow = startup;
const worker = Worker.getInstance();
const store = worker.store;

// Act
await workflow.start();
await sleep("1s"); // Wait for trigger to fire

// Assert
const result = await store.getInstances();
const instanceHeader = result.instances.find(i => i.instanceId.startsWith(workflow.name));
assert.ok(instanceHeader);

const instance = await store.getInstance(instanceHeader.instanceId);

assert.ok(instance);
assert.deepEqual(instance?.args, [undefined]);
});
10 changes: 4 additions & 6 deletions src/tests/workflows.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import { DurableFunctionsWorkflowHistoryStore } from "../stores/index.js";
import { math } from "./workflows/math.js";
import { workflows } from "../workflows/index.js";
import ms from "ms";
import { WorkflowWorker } from "../index.js";

test.before(async () => {
const worker = Worker.getInstance();
Expand Down Expand Up @@ -42,13 +41,12 @@ void test("Workflow", async (t) => {

// Assert
assert.equal(result, 3);
assert.ok(workflows.has("math"));
assert.ok(workflows.has(workflow.name));

const now = new Date();
const from = new Date(now.getTime() - ms("2m"));
const instances = await store.getInstances({ filter: { from: from, to: now } });
const mathInstances = instances.instances.filter(i => i.instanceId.indexOf("math ") === 0);
const from = new Date(now.getTime() - ms("2m"));
const instances = await store.getInstances({ filter: { from, to: now } });
const mathInstances = instances.instances.filter(i => i.instanceId.indexOf(`${workflow.name} `) === 0);

assert.ok(mathInstances.length >= 1);
});

3 changes: 1 addition & 2 deletions src/tests/workflows/math.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { manual } from "../../triggers/manual.js";
import { workflow } from "../../workflows/index.js";
import { add } from "../activities/add.js";
import { MathService } from "../services/MathService.js";

export const math = workflow({
Expand All @@ -13,4 +12,4 @@ export const math = workflow({
run: (services) => async () => {
return await services.math.add(1, 2);
},
})
});
10 changes: 10 additions & 0 deletions src/tests/workflows/startup.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { startup as startupTrigger } from "../../triggers/startup.js";
import { workflow } from "../../workflows/index.js";

export const startup = workflow({
name: "startup",
description: "Test for startup trigger.",
trigger: startupTrigger(),
run: () => async () => {
},
});
2 changes: 1 addition & 1 deletion src/triggers/manual.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,4 @@ export const manual = () => {
console.log(`manual: trigger stop for ${workflow.name}`);
},
} satisfies Trigger<void>;
}
};
8 changes: 4 additions & 4 deletions src/triggers/startup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@ export const startup = () => {
name: "startup",
options: undefined,
description: "Trigger runs on application startup",
start: async (workflow: { name: string }, run) => {
start: async (_, run) => {
const id = new Date().getTime().toString();
void run(id, undefined);
return Promise.resolve();
void run(id);
return await Promise.resolve();
},
stop: (workflow: { name: string }) => {
stop: () => {
},
} satisfies Trigger<void>;
};
2 changes: 1 addition & 1 deletion src/worker/WorkflowFunction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,4 @@ export type WorkflowHandle<T extends WorkflowFunction> = {
result: () => Promise<WorkflowResultType<T>>
store?: IWorkflowHistoryStore
readonly workflowId: string
}
};
39 changes: 25 additions & 14 deletions src/workflows/index.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Worker as WorkflowWorker } from "../worker/Worker.js";
import { proxyActivities } from "../proxy/proxyActivities.js";
import { OnlyAsync } from "../types/OnlyAsync.js";
import { type OnlyAsync } from "../types/OnlyAsync.js";
import { nanoid } from "nanoid";
import { WorkflowHandle } from "../worker/WorkflowFunction.js";
import { type WorkflowHandle } from "../worker/WorkflowFunction.js";

export type Trigger<P> = {
name: string,
Expand All @@ -20,7 +20,7 @@ export type Workflow<S extends Record<string, object>, P = void> = {
tags?: Array<string>;
disabled?: boolean;
trigger: Trigger<P>;
services: S;
services?: S;
run: (services: Services<S>) => (triggerData: P) => any;
};

Expand All @@ -29,44 +29,55 @@ export const workflows = new Map<string, Workflow<any, any>>();
export const workflow = <S extends Record<string, object>, P = void>(workflow: Workflow<S, P>) => {
workflows.set(workflow.name, workflow);

const runInternal = async (id: string, services: S, triggerData: P): Promise<WorkflowHandle<(triggerData: P) => any>> => {
const runInternal = async (id: string, services: S | undefined, triggerData: P): Promise<WorkflowHandle<(triggerData: P) => any>> => {
// Proxy services
const proxies = {} as any;
Object.keys(services).forEach((key) => {
proxies[key] = proxyActivities(services[key], { retry: 5 });
});
if (services) {
Object.keys(services).forEach((key) => {
proxies[key] = proxyActivities(services[key], { retry: 5 });
});
}

const worker = WorkflowWorker.getInstance();
const handle = await worker.start(workflow.run(proxies), {
workflowId: `${workflow.name} ${id}`,
args: [triggerData],
});
return handle;
}
};

return {
// Start the workflow trigger
name: workflow.name,
description: workflow.description,
tags: workflow.tags,
/**
* Start the workflow trigger
*/
start: async () => {
const run = async (id: string, payload: any) => {
const worker = WorkflowWorker.getInstance();
if (workflow.disabled) {
worker.log?.(`Workflow '${workflow.name}' disabled, skip`);
return;
}
return runInternal(id, workflow.services, payload);
}
return await runInternal(id, workflow.services, payload);
};

await workflow.trigger.start(workflow, run);
},
// Run workflow
/**
* Run workflow
*/
run: (services: S) => async (triggerData: P) => {
const handle = await runInternal(nanoid(), services, triggerData);
return await handle.result();
},
// Invoke workflow with trigger data
/**
* Invoke workflow with trigger data
*/
invoke: async (triggerData: P) => {
const handle = await runInternal(nanoid(), workflow.services, triggerData);
return await handle.result();
},
};
};
};

0 comments on commit 7a28c4b

Please sign in to comment.