Skip to content

Commit

Permalink
Merge pull request #19 from allanhvam/dev
Browse files Browse the repository at this point in the history
feat!: trigger run will now return workflow handle
  • Loading branch information
allanhvam authored Sep 28, 2024
2 parents 770e04d + d54a721 commit fe4b162
Show file tree
Hide file tree
Showing 7 changed files with 34 additions and 31 deletions.
29 changes: 15 additions & 14 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "simple-workflows",
"version": "0.1.0",
"version": "0.2.0",
"description": "Workflows as code in TypeScript",
"main": "lib/index.js",
"type": "module",
Expand Down
3 changes: 1 addition & 2 deletions src/triggers/startup.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { nanoid } from "nanoid";
import type { Trigger } from "../workflows/index.js";

export const startup = () => {
return {
name: "startup",
options: undefined,
description: "Trigger runs on application startup",
start: async (workflow: { name: string }, run: (id: string, _: any) => Promise<void>) => {
start: async (workflow: { name: string }, run) => {
const id = new Date().getTime().toString();
void run(id, undefined);
return Promise.resolve();
Expand Down
4 changes: 2 additions & 2 deletions src/worker/IWorker.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { type IWorkflowHistoryStore } from "../stores/IWorkflowHistoryStore.js";
import { type BaseWorkflowHandle, type WorkflowFunction } from "./WorkflowFunction.js";
import { type WorkflowHandle, type WorkflowFunction } from "./WorkflowFunction.js";

export declare type WithWorkflowArgs<W extends WorkflowFunction, T> = T & (Parameters<W> extends [any, ...any[]] ? {
/**
Expand Down Expand Up @@ -28,7 +28,7 @@ export declare type WorkflowOptions = {
export declare type WorkflowStartOptions<T extends WorkflowFunction = WorkflowFunction> = WithWorkflowArgs<T, WorkflowOptions>;

export interface IWorker {
start: <T extends WorkflowFunction>(workflow: T, options?: WorkflowStartOptions<T>) => Promise<BaseWorkflowHandle<T>>
start: <T extends WorkflowFunction>(workflow: T, options?: WorkflowStartOptions<T>) => Promise<WorkflowHandle<T>>
store: IWorkflowHistoryStore
log?: (s: string) => void
}
4 changes: 2 additions & 2 deletions src/worker/Worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { AsyncLocalStorage } from "async_hooks";
import { type IWorkflowContext } from "./IWorkflowContext.js";
import { type IWorkflowHistoryStore } from "../stores/IWorkflowHistoryStore.js";
import { MemoryWorkflowHistoryStore } from "../stores/MemoryWorkflowHistoryStore.js";
import { type BaseWorkflowHandle, type WorkflowFunction, type WorkflowResultType, type WorkflowFunctionReturnType } from "./WorkflowFunction.js";
import { type WorkflowHandle, type WorkflowFunction, type WorkflowResultType, type WorkflowFunctionReturnType } from "./WorkflowFunction.js";
import msPkg from "ms";
import { Mutex } from "async-mutex";
import { sleep } from "../sleep.js";
Expand All @@ -28,7 +28,7 @@ export class Worker implements IWorker {
return Worker.instance;
}

public async start<T extends WorkflowFunction>(workflow: T, options?: WorkflowStartOptions<T>): Promise<BaseWorkflowHandle<T>> {
public async start<T extends WorkflowFunction>(workflow: T, options?: WorkflowStartOptions<T>): Promise<WorkflowHandle<T>> {
let workflowId = "workflow-" + nanoid();
if (options?.workflowId) {
workflowId = options.workflowId;
Expand Down
8 changes: 4 additions & 4 deletions src/worker/WorkflowFunction.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { type IWorkflowHistoryStore } from "../stores/IWorkflowHistoryStore.js";

export declare type WorkflowFunctionReturnType = Promise<any>;
export type WorkflowFunctionReturnType = Promise<any>;

export declare type WorkflowFunction = (...args: any[]) => WorkflowFunctionReturnType;
export type WorkflowFunction = (...args: any[]) => WorkflowFunctionReturnType;

export declare type WorkflowResultType<W extends WorkflowFunction> = ReturnType<W> extends Promise<infer R> ? R : never;
export type WorkflowResultType<W extends WorkflowFunction> = ReturnType<W> extends Promise<infer R> ? R : never;

export interface BaseWorkflowHandle<T extends WorkflowFunction> {
export type WorkflowHandle<T extends WorkflowFunction> = {
result: () => Promise<WorkflowResultType<T>>
store?: IWorkflowHistoryStore
readonly workflowId: string
Expand Down
15 changes: 9 additions & 6 deletions src/workflows/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ import { Worker as WorkflowWorker } from "../worker/Worker.js";
import { proxyActivities } from "../proxy/proxyActivities.js";
import { OnlyAsync } from "../types/OnlyAsync.js";
import { nanoid } from "nanoid";
import { WorkflowHandle } from "../worker/WorkflowFunction.js";

export type Trigger<P> = {
name: string,
options: any,
description?: string,
start: (workflow: any, run: (id: string, triggerData: P) => Promise<void>) => void | Promise<void>;
start: (workflow: any, run: (id: string, triggerData: P) => Promise<WorkflowHandle<(triggerData: P) => any> | undefined>) => void | Promise<void>;
stop?: (workflow: any) => void | Promise<void>;
};

Expand All @@ -28,7 +29,7 @@ export const workflows = new Map<string, Workflow<any, any>>();
export const workflow = <S extends Record<string, any>, P = void>(workflow: Workflow<S, P>) => {
workflows.set(workflow.name, workflow);

const runInternal = (id: string) => (services: S) => async (triggerData: P) => {
const runInternal = async (id: string, services: S, triggerData: P): Promise<WorkflowHandle<(triggerData: P) => any>> => {
// Proxy services
const proxies = {} as any;
Object.keys(services).forEach((key) => {
Expand All @@ -40,7 +41,7 @@ export const workflow = <S extends Record<string, any>, P = void>(workflow: Work
workflowId: `${workflow.name} ${id}`,
args: [triggerData],
});
return handle.result();
return handle;
}

return {
Expand All @@ -52,18 +53,20 @@ export const workflow = <S extends Record<string, any>, P = void>(workflow: Work
worker.log?.(`Workflow '${workflow.name}' disabled, skip`);
return;
}
runInternal(id)(workflow.services)(payload);
return runInternal(id, workflow.services, payload);
}

await workflow.trigger.start(workflow, run);
},
// Run workflow
run: (services: S) => async (triggerData: P) => {
return await runInternal(nanoid())(services)(triggerData);
const handle = await runInternal(nanoid(), services, triggerData);
return await handle.result();
},
// Invoke workflow with trigger data
invoke: async (triggerData: P) => {
return await runInternal(nanoid())(workflow.services)(triggerData);
const handle = await runInternal(nanoid(), workflow.services, triggerData);
return await handle.result();
},
};
};

0 comments on commit fe4b162

Please sign in to comment.