Skip to content

Commit

Permalink
Merge pull request #22 from allanhvam/dev
Browse files Browse the repository at this point in the history
feat: trigger output type
  • Loading branch information
allanhvam authored Dec 18, 2024
2 parents 7a28c4b + 1197ae6 commit 393d0cc
Show file tree
Hide file tree
Showing 12 changed files with 79 additions and 31 deletions.
3 changes: 2 additions & 1 deletion .eslintrc.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ module.exports = {
"@typescript-eslint/consistent-type-definitions": "off",
"@typescript-eslint/no-unused-vars": "warn",
"@typescript-eslint/no-extraneous-class": "off",
"@typescript-eslint/member-delimiter-style": "off"
"@typescript-eslint/member-delimiter-style": "off",
"@typescript-eslint/explicit-function-return-type": "off"
}
}
13 changes: 8 additions & 5 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "simple-workflows",
"version": "0.2.2",
"version": "0.3.0",
"description": "Workflows as code in TypeScript",
"main": "lib/index.js",
"type": "module",
Expand Down Expand Up @@ -28,14 +28,14 @@
"eslint-plugin-import": "^2.29.1",
"eslint-plugin-n": "^16.5.0",
"eslint-plugin-promise": "^6.1.1",
"ms": "^2.1.3",
"superjson": "^2.2.1",
"typescript": "^5.3.3"
},
"dependencies": {
"@azure/data-tables": "^13.0.1",
"@azure/storage-blob": "^12.10.0",
"async-mutex": "^0.4.0",
"ms": "^2.1.3",
"nanoid": "^5.0.7"
}
}
1 change: 1 addition & 0 deletions src/tests/triggers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export { math } from "./math.js";
14 changes: 14 additions & 0 deletions src/tests/triggers/math.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { type Trigger } from "../../index.js";

export const math = () => {
return {
name: "math",
options: undefined,
start: (workflow: { name: string }, run) => {
console.log(`math: trigger start for ${workflow.name}`);
},
stop: (workflow: { name: string }) => {
console.log(`math: trigger stop for ${workflow.name}`);
},
} satisfies Trigger<number, number>;
};
12 changes: 12 additions & 0 deletions src/tests/workflows.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { DurableFunctionsWorkflowHistoryStore } from "../stores/index.js";
import { math } from "./workflows/math.js";
import { workflows } from "../workflows/index.js";
import ms from "ms";
import { addTow } from "./workflows/add-two.js";

test.before(async () => {
const worker = Worker.getInstance();
Expand Down Expand Up @@ -50,3 +51,14 @@ void test("Workflow", async (t) => {

assert.ok(mathInstances.length >= 1);
});

void test("Workflow add-tow", async (t) => {
// Arrange
const workflow = addTow;

// Act
const result = await workflow.invoke(2) satisfies number;

// Assert
assert.equal(result, 4);
});
16 changes: 16 additions & 0 deletions src/tests/workflows/add-two.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { workflow } from "../../workflows/index.js";
import { MathService } from "../services/MathService.js";
import { math } from "../triggers/index.js";

export const addTow = workflow({
name: "add-tow",
description: "Adds 2 to argument.",
trigger: math(),
services: {
math: new MathService(),
},
run: (services) => async (input: number) => {
const output = await services.math.add(input, 2);
return output;
},
});
2 changes: 1 addition & 1 deletion src/triggers/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
export { manual } from "./manual.js";
export { startup } from "./startup.js";
export { startup } from "./startup.js";
14 changes: 7 additions & 7 deletions src/worker/IWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,25 @@ export declare type WithWorkflowArgs<W extends WorkflowFunction, T> = T & (Param
/**
* Arguments to pass to the Workflow
*/
args?: Parameters<W>
args?: Parameters<W>;
});

export declare type WorkflowOptions = {
workflowId?: string
workflowId?: string;
/**
* @format {@link https://www.npmjs.com/package/ms | ms} formatted string or number of milliseconds
*/
workflowExecutionTimeout?: string | number
workflowExecutionTimeout?: string | number;
/**
* Store for the workflow instance, overwrites the global instance (if set)
*/
store?: IWorkflowHistoryStore
store?: IWorkflowHistoryStore;
};

export declare type WorkflowStartOptions<T extends WorkflowFunction = WorkflowFunction> = WithWorkflowArgs<T, WorkflowOptions>;

export interface IWorker {
start: <T extends WorkflowFunction>(workflow: T, options?: WorkflowStartOptions<T>) => Promise<WorkflowHandle<T>>
store: IWorkflowHistoryStore
log?: (s: string) => void
start: <T extends WorkflowFunction>(workflow: T, options?: WorkflowStartOptions<T>) => Promise<WorkflowHandle<T>>;
store: IWorkflowHistoryStore;
log?: (s: string) => void;
}
4 changes: 2 additions & 2 deletions src/worker/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { AsyncLocalStorage } from "async_hooks";
import { type IWorkflowContext } from "./IWorkflowContext.js";
import { type IWorkflowHistoryStore } from "../stores/IWorkflowHistoryStore.js";
import { MemoryWorkflowHistoryStore } from "../stores/MemoryWorkflowHistoryStore.js";
import { type WorkflowHandle, type WorkflowFunction, type WorkflowResultType, type WorkflowFunctionReturnType } from "./WorkflowFunction.js";
import { type WorkflowHandle, type WorkflowFunction, type WorkflowResultType } from "./WorkflowFunction.js";
import msPkg from "ms";
import { Mutex } from "async-mutex";
import { sleep } from "../sleep.js";
Expand Down Expand Up @@ -96,7 +96,7 @@ export class Worker implements IWorker {
await store?.setInstance(workflowInstance);
}

let promise: WorkflowFunctionReturnType = Worker.asyncLocalStorage.run(workflowContext, async () => {
let promise = Worker.asyncLocalStorage.run(workflowContext, async () => {
let result: any;
let error: any;
let isError = false;
Expand Down
10 changes: 4 additions & 6 deletions src/worker/WorkflowFunction.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
import { type IWorkflowHistoryStore } from "../stores/IWorkflowHistoryStore.js";

export type WorkflowFunctionReturnType = Promise<any>;

export type WorkflowFunction = (...args: any[]) => WorkflowFunctionReturnType;
export type WorkflowFunction = (...args: any[]) => Promise<any>;

export type WorkflowResultType<W extends WorkflowFunction> = ReturnType<W> extends Promise<infer R> ? R : never;

export type WorkflowHandle<T extends WorkflowFunction> = {
result: () => Promise<WorkflowResultType<T>>
store?: IWorkflowHistoryStore
readonly workflowId: string
result: () => Promise<WorkflowResultType<T>>;
store?: IWorkflowHistoryStore;
readonly workflowId: string;
};
17 changes: 10 additions & 7 deletions src/workflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,32 +4,34 @@ import { type OnlyAsync } from "../types/OnlyAsync.js";
import { nanoid } from "nanoid";
import { type WorkflowHandle } from "../worker/WorkflowFunction.js";

export type Trigger<P> = {
// P: Payload
// O: Output
export type Trigger<P, O = unknown> = {
name: string,
options: any,
description?: string,
start: (workflow: any, run: (id: string, triggerData: P) => Promise<WorkflowHandle<(triggerData: P) => any> | undefined>) => void | Promise<void>;
start: (workflow: any, run: (id: string, triggerData: P) => Promise<WorkflowHandle<(triggerData: P) => Promise<O>> | undefined>) => void | Promise<void>;
stop?: (workflow: any) => void | Promise<void>;
};

export type Services<T> = { [P in keyof T]: OnlyAsync<T[P]> };

export type Workflow<S extends Record<string, object>, P = void> = {
export type Workflow<S extends Record<string, object>, P = void, O = unknown> = {
name: string;
description?: string;
tags?: Array<string>;
disabled?: boolean;
trigger: Trigger<P>;
services?: S;
run: (services: Services<S>) => (triggerData: P) => any;
run: (services: Services<S>) => (triggerData: P) => Promise<O>;
};

export const workflows = new Map<string, Workflow<any, any>>();

export const workflow = <S extends Record<string, object>, P = void>(workflow: Workflow<S, P>) => {
export const workflow = <S extends Record<string, object>, P = void, O = unknown>(workflow: Workflow<S, P, O>) => {
workflows.set(workflow.name, workflow);

const runInternal = async (id: string, services: S | undefined, triggerData: P): Promise<WorkflowHandle<(triggerData: P) => any>> => {
const runInternal = async (id: string, services: S | undefined, triggerData: P) => {
// Proxy services
const proxies = {} as any;
if (services) {
Expand All @@ -39,6 +41,7 @@ export const workflow = <S extends Record<string, object>, P = void>(workflow: W
}

const worker = WorkflowWorker.getInstance();
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
const handle = await worker.start(workflow.run(proxies), {
workflowId: `${workflow.name} ${id}`,
args: [triggerData],
Expand All @@ -54,7 +57,7 @@ export const workflow = <S extends Record<string, object>, P = void>(workflow: W
* Start the workflow trigger
*/
start: async () => {
const run = async (id: string, payload: any) => {
const run = async (id: string, payload: P) => {
const worker = WorkflowWorker.getInstance();
if (workflow.disabled) {
worker.log?.(`Workflow '${workflow.name}' disabled, skip`);
Expand Down

0 comments on commit 393d0cc

Please sign in to comment.