diff --git a/src/activity/activity.module.ts b/src/activity/activity.module.ts index e234a88f8..3b5d9e1e6 100644 --- a/src/activity/activity.module.ts +++ b/src/activity/activity.module.ts @@ -1,12 +1,12 @@ -/* eslint-disable @typescript-eslint/no-unused-vars */ import { EventEmitter } from "eventemitter3"; import { Agreement } from "../market/agreement"; -import { Activity, IActivityApi } from "./index"; +import { Activity, IActivityApi, ActivityEvents, Result } from "./index"; import { defaultLogger } from "../shared/utils"; import { GolemServices } from "../golem-network/golem-network"; import { WorkContext, WorkOptions } from "./work"; - -export interface ActivityEvents {} +import { ExeScriptExecutor, ExeScriptRequest, ExecutionOptions } from "./exe-script-executor"; +import { Observable, catchError, tap } from "rxjs"; +import { StreamingBatchEvent } from "./results"; export interface ActivityModule { events: EventEmitter; @@ -23,7 +23,19 @@ export interface ActivityModule { * * @return The activity that was permanently terminated */ - destroyActivity(activity: Activity, reason?: string): Promise; + destroyActivity(activity: Activity): Promise; + + /** + * Fetches the latest state of the activity. It's recommended to use this method + * before performing any actions on the activity to make sure it's in the correct state. + * If the fetched activity's state is different from the one you have, an event will be emitted. + */ + refreshActivity(staleActivity: Activity): Promise; + + /** + * Fetches the activity by its ID from yagna. If the activity doesn't exist, an error will be thrown. + */ + findActivityById(activityId: string): Promise; /** * Create a work context "within" the activity so that you can perform commands on the rented resources @@ -31,6 +43,30 @@ export interface ActivityModule { * @return An WorkContext that's fully commissioned and the user can execute their commands */ createWorkContext(activity: Activity, options?: WorkOptions): Promise; + + /** + * Factory method for creating a script executor for the activity + */ + createScriptExecutor(activity: Activity, options?: ExecutionOptions): ExeScriptExecutor; + + /** + * Execute a script on the activity. + */ + executeScript(activity: Activity, script: ExeScriptRequest): Promise; + + /** + * Fetch the results of a batch execution. + */ + getBatchResults(activity: Activity, batchId: string, commandIndex?: number, timeout?: number): Promise; + + /** + * Create an observable that will emit events from the streaming batch. + */ + observeStreamingBatchEvents( + activity: Activity, + batchId: string, + commandIndex?: number, + ): Observable; } /** @@ -78,52 +114,133 @@ export class ActivityModuleImpl implements ActivityModule { private readonly activityApi: IActivityApi; - private readonly fileServer?: IFileServer; - - constructor( - private readonly services: GolemServices, - private readonly options: ActivityModuleOptions = { - fileServer: false, - }, - ) { + constructor(private readonly services: GolemServices) { this.logger = services.logger; this.activityApi = services.activityApi; } + createScriptExecutor(activity: Activity, options?: ExecutionOptions): ExeScriptExecutor { + return new ExeScriptExecutor(activity, this, this.logger.child("executor"), options); + } - async createActivity(agreement: Agreement): Promise { - const activity = await this.activityApi.createActivity(agreement); + async executeScript(activity: Activity, script: ExeScriptRequest): Promise { + this.logger.info("Executing script on activity", { activityId: activity.id }); + try { + const result = await this.activityApi.executeScript(activity, script); + this.events.emit("scriptExecuted", activity, script, result); + return result; + } catch (error) { + this.events.emit("errorExecutingScript", activity, script, error); + throw error; + } + } + async getBatchResults( + activity: Activity, + batchId: string, + commandIndex?: number | undefined, + timeout?: number | undefined, + ): Promise { + this.logger.info("Fetching batch results", { activityId: activity.id, batchId }); + try { + const results = await this.activityApi.getExecBatchResults(activity, batchId, commandIndex, timeout); + this.events.emit("batchResultsReceived", activity, batchId, results); + return results; + } catch (error) { + this.events.emit("errorGettingBatchResults", activity, batchId, error); + throw error; + } + } + observeStreamingBatchEvents( + activity: Activity, + batchId: string, + commandIndex?: number | undefined, + ): Observable { + this.logger.info("Observing streaming batch events", { activityId: activity.id, batchId }); + return this.activityApi.getExecBatchEvents(activity, batchId, commandIndex).pipe( + tap((event) => { + this.events.emit("batchEventsReceived", activity, batchId, event); + }), + catchError((error) => { + this.events.emit("errorGettingBatchEvents", activity, batchId, error); + throw error; + }), + ); + } - this.logger.info("Created activity", { - activityId: activity.id, + async createActivity(agreement: Agreement): Promise { + this.logger.info("Creating activity", { agreementId: agreement.id, provider: agreement.getProviderInfo(), }); - - return activity; + try { + const activity = await this.activityApi.createActivity(agreement); + this.events.emit("activityCreated", activity); + return activity; + } catch (error) { + this.events.emit("errorCreatingActivity", error); + throw error; + } } - async destroyActivity(activity: Activity, reason: string): Promise { - const updated = await this.activityApi.destroyActivity(activity); + async destroyActivity(activity: Activity): Promise { + this.logger.info("Destroying activity", { + activityId: activity.id, + agreementId: activity.agreement.id, + provider: activity.agreement.getProviderInfo(), + }); + try { + const updated = await this.activityApi.destroyActivity(activity); + this.events.emit("activityDestroyed", updated); + return updated; + } catch (error) { + this.events.emit("errorDestroyingActivity", activity, error); + throw error; + } + } - this.logger.info("Destroyed activity", { - activityId: updated.id, - agreementId: updated.agreement.id, - provider: updated.agreement.getProviderInfo(), + async refreshActivity(staleActivity: Activity): Promise { + // logging to debug level to avoid spamming the logs because this method is called frequently + this.logger.debug("Fetching latest activity state", { + activityId: staleActivity.id, + lastState: staleActivity.getState(), }); + try { + const freshActivity = await this.activityApi.getActivity(staleActivity.id); + if (freshActivity.getState() !== staleActivity.getState()) { + this.logger.debug("Activity state changed", { + activityId: staleActivity.id, + previousState: staleActivity.getState(), + newState: freshActivity.getState(), + }); + this.events.emit("activityStateChanged", freshActivity, staleActivity.getState()); + } + return freshActivity; + } catch (error) { + this.events.emit("errorRefreshingActivity", staleActivity, error); + throw error; + } + } - return updated; + async findActivityById(activityId: string): Promise { + this.logger.info("Fetching activity by ID", { activityId }); + return await this.activityApi.getActivity(activityId); } async createWorkContext(activity: Activity, options?: WorkOptions): Promise { - this.logger.debug("Creating work context for activity", { activityId: activity.id }); - const ctx = new WorkContext(activity, this.services.activityApi, this.services.networkApi, { + this.logger.info("Creating work context for activity", { activityId: activity.id }); + const ctx = new WorkContext(activity, this, { yagnaOptions: this.services.yagna.yagnaOptions, + logger: this.logger.child("work-context"), ...options, }); this.logger.debug("Initializing the exe-unit for activity", { activityId: activity.id }); - await ctx.before(); - - return ctx; + try { + await ctx.before(); + this.events.emit("activityInitialized", activity); + return ctx; + } catch (error) { + this.events.emit("errorInitializingActivity", activity, error); + throw error; + } } } diff --git a/src/activity/activity.ts b/src/activity/activity.ts index ba866b7a1..bde48d0cb 100644 --- a/src/activity/activity.ts +++ b/src/activity/activity.ts @@ -1,7 +1,4 @@ -import { Logger } from "../shared/utils"; import { Agreement, ProviderInfo } from "../market/agreement"; -import { ExecutionOptions, ExeScriptExecutor } from "./exe-script-executor"; -import { IActivityApi } from "./types"; export enum ActivityStateEnum { New = "New", @@ -47,13 +44,6 @@ export class Activity { return this.agreement.getProviderInfo(); } - /** - * Temporary helper method that will build a script executor bound to this activity - */ - public createExeScriptExecutor(activityApi: IActivityApi, logger: Logger, options?: ExecutionOptions) { - return new ExeScriptExecutor(this, activityApi, logger, options); - } - public getState() { return this.currentState; } diff --git a/src/activity/api.ts b/src/activity/api.ts new file mode 100644 index 000000000..5e3174981 --- /dev/null +++ b/src/activity/api.ts @@ -0,0 +1,47 @@ +import { Activity, ActivityStateEnum } from "./activity"; +import { Agreement } from "../market/agreement"; +import { ExeScriptRequest } from "./exe-script-executor"; +import { Result, StreamingBatchEvent } from "./results"; +import { Observable } from "rxjs"; + +export type ActivityEvents = { + activityCreated: (activity: Activity) => void; + errorCreatingActivity: (error: Error) => void; + + activityDestroyed: (activity: Activity) => void; + errorDestroyingActivity: (activity: Activity, error: Error) => void; + + activityInitialized: (activity: Activity) => void; + errorInitializingActivity: (activity: Activity, error: Error) => void; + + activityStateChanged: (activity: Activity, previousState: ActivityStateEnum) => void; + errorRefreshingActivity: (activity: Activity, error: Error) => void; + + scriptExecuted: (activity: Activity, script: ExeScriptRequest, result: string) => void; + errorExecutingScript: (activity: Activity, script: ExeScriptRequest, error: Error) => void; + + batchResultsReceived: (activity: Activity, batchId: string, results: Result[]) => void; + errorGettingBatchResults: (activity: Activity, batchId: string, error: Error) => void; + + batchEventsReceived: (activity: Activity, batchId: string, event: StreamingBatchEvent) => void; + errorGettingBatchEvents: (activity: Activity, batchId: string, error: Error) => void; +}; + +/** + * Represents a set of use cases related to managing the lifetime of an activity + */ +export interface IActivityApi { + getActivity(id: string): Promise; + + createActivity(agreement: Agreement): Promise; + + destroyActivity(activity: Activity): Promise; + + getActivityState(id: string): Promise; + + executeScript(activity: Activity, script: ExeScriptRequest): Promise; + + getExecBatchResults(activity: Activity, batchId: string, commandIndex?: number, timeout?: number): Promise; + + getExecBatchEvents(activity: Activity, batchId: string, commandIndex?: number): Observable; +} diff --git a/src/activity/config.ts b/src/activity/config.ts index 4a68aa2c0..13bc9f9f6 100644 --- a/src/activity/config.ts +++ b/src/activity/config.ts @@ -1,4 +1,3 @@ -import { Logger, defaultLogger } from "../shared/utils"; import { ExecutionOptions } from "./exe-script-executor"; const DEFAULTS = { @@ -16,7 +15,6 @@ export class ExecutionConfig { public readonly activityExecuteTimeout: number; public readonly activityExeBatchResultPollIntervalSeconds: number; public readonly activityExeBatchResultMaxRetries: number; - public readonly logger: Logger; constructor(options?: ExecutionOptions) { this.activityRequestTimeout = options?.activityRequestTimeout || DEFAULTS.activityRequestTimeout; @@ -25,6 +23,5 @@ export class ExecutionConfig { options?.activityExeBatchResultMaxRetries || DEFAULTS.activityExeBatchResultMaxRetries; this.activityExeBatchResultPollIntervalSeconds = options?.activityExeBatchResultPollIntervalSeconds || DEFAULTS.activityExeBatchResultPollIntervalSeconds; - this.logger = options?.logger || defaultLogger("work"); } } diff --git a/src/activity/exe-script-executor.test.ts b/src/activity/exe-script-executor.test.ts index eb620a55e..73cd93550 100644 --- a/src/activity/exe-script-executor.test.ts +++ b/src/activity/exe-script-executor.test.ts @@ -1,5 +1,5 @@ import { Activity } from "./activity"; -import { _, anything, imock, instance, mock, verify, when } from "@johanblumenberg/ts-mockito"; +import { _, anything, imock, instance, mock, reset, verify, when } from "@johanblumenberg/ts-mockito"; import { Capture, Deploy, DownloadFile, Run, Script, Start, Terminate, UploadFile } from "./script"; import { buildExeScriptSuccessResult } from "../../tests/unit/helpers"; import { GolemWorkError, WorkErrorCode } from "./work"; @@ -9,29 +9,37 @@ import { ExeScriptExecutor } from "./exe-script-executor"; import { StorageProvider } from "../shared/storage"; import { from, of, throwError } from "rxjs"; import { Result, StreamingBatchEvent } from "./results"; -import { IActivityApi } from "./types"; import resetAllMocks = jest.resetAllMocks; +import { ActivityModule } from "./activity.module"; describe("ExeScriptExecutor", () => { const mockActivity = mock(Activity); const mockLogger = imock(); - const mockActivityApi = imock(); + const mockActivityModule = imock(); const mockStorageProvider = imock(); - when(mockActivity.getProviderInfo()).thenReturn({ - id: "test-provider-id", - name: "test-provider-name", - walletAddress: "0xProviderWallet", - }); beforeEach(() => { + reset(mockActivity); + reset(mockLogger); + reset(mockStorageProvider); + reset(mockActivityModule); resetAllMocks(); + when(mockActivity.getProviderInfo()).thenReturn({ + id: "test-provider-id", + name: "test-provider-name", + walletAddress: "0xProviderWallet", + }); }); describe("Executing", () => { it("should execute commands on activity", async () => { - const executor = new ExeScriptExecutor(instance(mockActivity), instance(mockActivityApi), instance(mockLogger)); + const executor = new ExeScriptExecutor( + instance(mockActivity), + instance(mockActivityModule), + instance(mockLogger), + ); - when(mockActivityApi.getExecBatchResults(anything(), anything(), anything(), anything())).thenResolve([ + when(mockActivityModule.getBatchResults(anything(), anything(), anything(), anything())).thenResolve([ new Result({ isBatchFinished: true, result: "Ok", @@ -48,7 +56,11 @@ describe("ExeScriptExecutor", () => { }); it("should execute script and get results by iterator", async () => { - const executor = new ExeScriptExecutor(instance(mockActivity), instance(mockActivityApi), instance(mockLogger)); + const executor = new ExeScriptExecutor( + instance(mockActivity), + instance(mockActivityModule), + instance(mockLogger), + ); const command1 = new Deploy(); const command2 = new Start(); const command3 = new Run("test_command1"); @@ -56,7 +68,7 @@ describe("ExeScriptExecutor", () => { const command5 = new Terminate(); const script = Script.create([command1, command2, command3, command4, command5]); - when(mockActivityApi.getExecBatchResults(anything(), anything(), anything(), anything())).thenResolve([ + when(mockActivityModule.getBatchResults(anything(), anything(), anything(), anything())).thenResolve([ buildExeScriptSuccessResult("test"), buildExeScriptSuccessResult("test"), buildExeScriptSuccessResult("stdout_test_command_run_1"), @@ -78,7 +90,11 @@ describe("ExeScriptExecutor", () => { }); it("should execute script and get results by events", async () => { - const executor = new ExeScriptExecutor(instance(mockActivity), instance(mockActivityApi), instance(mockLogger)); + const executor = new ExeScriptExecutor( + instance(mockActivity), + instance(mockActivityModule), + instance(mockLogger), + ); const command1 = new Deploy(); const command2 = new Start(); const command3 = new UploadFile(instance(mockStorageProvider), "testSrc", "testDst"); @@ -87,7 +103,7 @@ describe("ExeScriptExecutor", () => { const command6 = new Terminate(); const script = Script.create([command1, command2, command3, command4, command5, command6]); - when(mockActivityApi.getExecBatchResults(anything(), anything(), anything(), anything())).thenResolve([ + when(mockActivityModule.getBatchResults(anything(), anything(), anything(), anything())).thenResolve([ buildExeScriptSuccessResult("test"), buildExeScriptSuccessResult("test"), buildExeScriptSuccessResult("stdout_test_command_run_1"), @@ -123,7 +139,11 @@ describe("ExeScriptExecutor", () => { }); it("should execute script by streaming batch", async () => { - const executor = new ExeScriptExecutor(instance(mockActivity), instance(mockActivityApi), instance(mockLogger)); + const executor = new ExeScriptExecutor( + instance(mockActivity), + instance(mockActivityModule), + instance(mockLogger), + ); const command1 = new Deploy(); const command2 = new Start(); const capture: Capture = { @@ -199,7 +219,7 @@ describe("ExeScriptExecutor", () => { kind: { finished: { return_code: 0, message: null } }, }, ]; - when(mockActivityApi.getExecBatchEvents(_, _)).thenReturn(from(mockedEvents)); + when(mockActivityModule.observeStreamingBatchEvents(_, _)).thenReturn(from(mockedEvents)); await script.before(); const results = await executor.execute(script.getExeScriptRequest(), true); let expectedStdout; @@ -215,7 +235,11 @@ describe("ExeScriptExecutor", () => { describe("Cancelling", () => { it("should cancel executor", async () => { - const executor = new ExeScriptExecutor(instance(mockActivity), instance(mockActivityApi), instance(mockLogger)); + const executor = new ExeScriptExecutor( + instance(mockActivity), + instance(mockActivityModule), + instance(mockLogger), + ); const command1 = new Deploy(); const command2 = new Start(); const command3 = new Run("test_command1"); @@ -236,8 +260,12 @@ describe("ExeScriptExecutor", () => { }); it("should cancel executor while streaming batch", async () => { - when(mockActivityApi.getExecBatchEvents(_, _)).thenReturn(of()); - const executor = new ExeScriptExecutor(instance(mockActivity), instance(mockActivityApi), instance(mockLogger)); + when(mockActivityModule.observeStreamingBatchEvents(_, _)).thenReturn(of()); + const executor = new ExeScriptExecutor( + instance(mockActivity), + instance(mockActivityModule), + instance(mockLogger), + ); const command1 = new Deploy(); const command2 = new Start(); const capture: Capture = { @@ -262,14 +290,18 @@ describe("ExeScriptExecutor", () => { describe("Error handling", () => { it("should handle some error", async () => { - const executor = new ExeScriptExecutor(instance(mockActivity), instance(mockActivityApi), instance(mockLogger)); + const executor = new ExeScriptExecutor( + instance(mockActivity), + instance(mockActivityModule), + instance(mockLogger), + ); const command1 = new Deploy(); const command2 = new Start(); const command3 = new Run("test_command1"); const script = Script.create([command1, command2, command3]); const error = new Error("Some undefined error"); - when(mockActivityApi.getExecBatchResults(anything(), anything(), anything(), anything())).thenReject(error); + when(mockActivityModule.getBatchResults(anything(), anything(), anything(), anything())).thenReject(error); const results = await executor.execute(script.getExeScriptRequest(), false, 200, 0); @@ -289,9 +321,14 @@ describe("ExeScriptExecutor", () => { }); it("should handle non-retryable error", async () => { - const executor = new ExeScriptExecutor(instance(mockActivity), instance(mockActivityApi), instance(mockLogger), { - activityExeBatchResultPollIntervalSeconds: 10, - }); + const executor = new ExeScriptExecutor( + instance(mockActivity), + instance(mockActivityModule), + instance(mockLogger), + { + activityExeBatchResultPollIntervalSeconds: 10, + }, + ); const command1 = new Deploy(); const command2 = new Start(); const command3 = new Run("test_command1"); @@ -302,7 +339,7 @@ describe("ExeScriptExecutor", () => { status: 401, toString: () => `Error: non-retryable error`, }; - when(mockActivityApi.getExecBatchResults(anything(), anything(), anything(), anything())).thenReject(error); + when(mockActivityModule.getBatchResults(anything(), anything(), anything(), anything())).thenReject(error); const results = await executor.execute(script.getExeScriptRequest(), false, 1_000, 3); @@ -321,8 +358,11 @@ describe("ExeScriptExecutor", () => { }); it("should retry when a retryable error occurs", async () => { - const mockActivityApi = imock(); - const executor = new ExeScriptExecutor(instance(mockActivity), instance(mockActivityApi), instance(mockLogger)); + const executor = new ExeScriptExecutor( + instance(mockActivity), + instance(mockActivityModule), + instance(mockLogger), + ); const command1 = new Deploy(); const command2 = new Start(); const command3 = new Run("test_command1"); @@ -341,7 +381,7 @@ describe("ExeScriptExecutor", () => { index: 1, eventDate: new Date().toISOString(), }); - when(mockActivityApi.getExecBatchResults(anything(), anything(), anything(), anything())) + when(mockActivityModule.getBatchResults(anything(), anything(), anything(), anything())) .thenReject(error) .thenReject(error) .thenResolve([testResult]); @@ -351,11 +391,15 @@ describe("ExeScriptExecutor", () => { for await (const result of results) { expect(result).toEqual(testResult); } - verify(mockActivityApi.getExecBatchResults(anything(), anything(), anything(), anything())).times(3); + verify(mockActivityModule.getBatchResults(anything(), anything(), anything(), anything())).times(3); }, 7_000); it("should handle termination error", async () => { - const executor = new ExeScriptExecutor(instance(mockActivity), instance(mockActivityApi), instance(mockLogger)); + const executor = new ExeScriptExecutor( + instance(mockActivity), + instance(mockActivityModule), + instance(mockLogger), + ); const command1 = new Deploy(); const command2 = new Start(); const command3 = new Run("test_command1"); @@ -366,7 +410,7 @@ describe("ExeScriptExecutor", () => { toString: () => "Error: GSB error: endpoint address not found. Terminated.", }; - when(mockActivityApi.getExecBatchResults(anything(), anything(), anything(), anything())).thenReject(error); + when(mockActivityModule.getBatchResults(anything(), anything(), anything(), anything())).thenReject(error); const results = await executor.execute(script.getExeScriptRequest(), false, undefined, 1); return new Promise((res) => { @@ -387,7 +431,11 @@ describe("ExeScriptExecutor", () => { }); it("should handle timeout error", async () => { - const executor = new ExeScriptExecutor(instance(mockActivity), instance(mockActivityApi), instance(mockLogger)); + const executor = new ExeScriptExecutor( + instance(mockActivity), + instance(mockActivityModule), + instance(mockLogger), + ); const command1 = new Deploy(); const command2 = new Start(); const command3 = new Run("test_command1"); @@ -408,9 +456,15 @@ describe("ExeScriptExecutor", () => { }); it("should handle timeout error while streaming batch", async () => { - const executor = new ExeScriptExecutor(instance(mockActivity), instance(mockActivityApi), instance(mockLogger), { - activityExecuteTimeout: 1, - }); + when(mockActivityModule.observeStreamingBatchEvents(anything(), anything())).thenReturn(of()); + const executor = new ExeScriptExecutor( + instance(mockActivity), + instance(mockActivityModule), + instance(mockLogger), + { + activityExecuteTimeout: 1, + }, + ); const command1 = new Deploy(); const command2 = new Start(); const capture: Capture = { @@ -434,7 +488,11 @@ describe("ExeScriptExecutor", () => { }); it("should handle some error while streaming batch", async () => { - const executor = new ExeScriptExecutor(instance(mockActivity), instance(mockActivityApi), instance(mockLogger)); + const executor = new ExeScriptExecutor( + instance(mockActivity), + instance(mockActivityModule), + instance(mockLogger), + ); const command1 = new Deploy(); const command2 = new Start(); const capture: Capture = { @@ -445,7 +503,9 @@ describe("ExeScriptExecutor", () => { const command4 = new Terminate(); const script = Script.create([command1, command2, command3, command4]); const mockedEventSourceErrorMessage = "Some undefined error"; - when(mockActivityApi.getExecBatchEvents(_, _)).thenReturn(throwError(() => mockedEventSourceErrorMessage)); + when(mockActivityModule.observeStreamingBatchEvents(_, _)).thenReturn( + throwError(() => mockedEventSourceErrorMessage), + ); await script.before(); const results = await executor.execute(script.getExeScriptRequest(), true); return new Promise((res) => { diff --git a/src/activity/exe-script-executor.ts b/src/activity/exe-script-executor.ts index a9834cd4d..dfead36a8 100644 --- a/src/activity/exe-script-executor.ts +++ b/src/activity/exe-script-executor.ts @@ -9,7 +9,7 @@ import { Result, StreamingBatchEvent } from "./results"; import sleep from "../shared/utils/sleep"; import { Activity } from "./activity"; import { getMessageFromApiError } from "../shared/utils/apiErrorMessage"; -import { IActivityApi } from "./types"; +import { ActivityModule } from "./activity.module"; export interface ExeScriptRequest { text: string; @@ -24,8 +24,6 @@ export interface ExecutionOptions { activityExeBatchResultPollIntervalSeconds?: number; /** maximum number of retries retrieving results when an error occurs, default: 10 */ activityExeBatchResultMaxRetries?: number; - /** Logger module */ - logger?: Logger; } const RETRYABLE_ERROR_STATUS_CODES = [408, 500]; @@ -36,7 +34,7 @@ export class ExeScriptExecutor { constructor( public readonly activity: Activity, - private readonly activityApi: IActivityApi, + private readonly activityModule: ActivityModule, private readonly logger: Logger, options?: ExecutionOptions, ) { @@ -97,7 +95,7 @@ export class ExeScriptExecutor { } protected async send(script: ExeScriptRequest): Promise { - return withTimeout(this.activityApi.executeScript(this.activity, script), this.options.activityRequestTimeout); + return withTimeout(this.activityModule.executeScript(this.activity, script), this.options.activityRequestTimeout); } private async pollingBatch( @@ -117,7 +115,7 @@ export class ExeScriptExecutor { const { activityExecuteTimeout, activityExeBatchResultPollIntervalSeconds, activityExeBatchResultMaxRetries } = this.options; - const { logger, activity, activityApi } = this; + const { logger, activity, activityModule } = this; return new Readable({ objectMode: true, @@ -140,7 +138,7 @@ export class ExeScriptExecutor { logger.debug("Activity is no longer running, will stop polling for batch execution results"); return bail(new GolemAbortError(`Activity ${activityId} has been interrupted.`)); } - return await activityApi.getExecBatchResults( + return await activityModule.getBatchResults( activity, batchId, undefined, @@ -204,7 +202,7 @@ export class ExeScriptExecutor { const errors: object[] = []; const results: Result[] = []; - const source = this.activityApi.getExecBatchEvents(this.activity, batchId).subscribe({ + const source = this.activityModule.observeStreamingBatchEvents(this.activity, batchId).subscribe({ next: (resultEvents) => results.push(this.parseEventToResult(resultEvents, batchSize)), error: (err) => errors.push(err.data?.message ?? err), complete: () => this.logger.debug("Finished reading batch results"), diff --git a/src/activity/index.ts b/src/activity/index.ts index dd12db0ae..06fb85957 100644 --- a/src/activity/index.ts +++ b/src/activity/index.ts @@ -3,4 +3,4 @@ export { Result } from "./results"; export { ExecutionConfig } from "./config"; export * from "./activity.module"; export * from "./work"; -export { IActivityApi } from "./types"; +export * from "./api"; diff --git a/src/activity/types.ts b/src/activity/types.ts deleted file mode 100644 index ac46c26f0..000000000 --- a/src/activity/types.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { Activity, ActivityStateEnum } from "./activity"; -import { Agreement } from "../market/agreement"; -import { ExeScriptRequest } from "./exe-script-executor"; -import { Result, StreamingBatchEvent } from "./results"; -import { Observable } from "rxjs"; - -/** - * Represents a set of use cases related to managing the lifetime of an activity - */ -export interface IActivityApi { - getActivity(id: string): Promise; - - createActivity(agreement: Agreement): Promise; - - destroyActivity(activity: Activity): Promise; - - getActivityState(id: string): Promise; - - executeScript(activity: Activity, script: ExeScriptRequest): Promise; - - getExecBatchResults(activity: Activity, batchId: string, commandIndex?: number, timeout?: number): Promise; - - getExecBatchEvents(activity: Activity, batchId: string, commandIndex?: number): Observable; -} diff --git a/src/activity/work/process.spec.ts b/src/activity/work/process.spec.ts index 2c32cdf18..660f7368d 100644 --- a/src/activity/work/process.spec.ts +++ b/src/activity/work/process.spec.ts @@ -2,7 +2,7 @@ import { RemoteProcess } from "./process"; import { imock, instance, mock, reset } from "@johanblumenberg/ts-mockito"; import { Logger, YagnaApi } from "../../shared/utils"; import { Agreement } from "../../market/agreement"; -import { Activity, IActivityApi } from "../index"; +import { Activity, ActivityModule } from "../index"; import { buildExecutorResults, buildExeScriptErrorResult, @@ -13,14 +13,16 @@ const mockYagna = mock(YagnaApi); const mockAgreement = mock(Agreement); const mockActivity = mock(Activity); const mockLogger = imock(); -const mockActivityApi = imock(); - +const mockActivityModule = imock(); describe("RemoteProcess", () => { let activity: Activity; beforeEach(() => { reset(mockYagna); reset(mockAgreement); + reset(mockActivity); + reset(mockLogger); + reset(mockActivityModule); activity = instance(mockActivity); }); @@ -28,7 +30,7 @@ describe("RemoteProcess", () => { it("should create remote process", async () => { const streamOfActivityResults = buildExecutorResults([buildExeScriptSuccessResult("ok")]); const remoteProcess = new RemoteProcess( - instance(mockActivityApi), + instance(mockActivityModule), streamOfActivityResults, activity, instance(mockLogger), @@ -39,7 +41,7 @@ describe("RemoteProcess", () => { it("should read stdout from remote process", async () => { const streamOfActivityResults = buildExecutorResults([buildExeScriptSuccessResult("Output")]); const remoteProcess = new RemoteProcess( - instance(mockActivityApi), + instance(mockActivityModule), streamOfActivityResults, activity, instance(mockLogger), @@ -52,7 +54,7 @@ describe("RemoteProcess", () => { it("should read stderr from remote process", async () => { const streamOfActivityResults = buildExecutorResults(undefined, [buildExeScriptErrorResult("Error", "Error")]); const remoteProcess = new RemoteProcess( - instance(mockActivityApi), + instance(mockActivityModule), streamOfActivityResults, activity, instance(mockLogger), @@ -65,7 +67,7 @@ describe("RemoteProcess", () => { it("should wait for exit", async () => { const streamOfActivityResults = buildExecutorResults([buildExeScriptSuccessResult("Ok")]); const remoteProcess = new RemoteProcess( - instance(mockActivityApi), + instance(mockActivityModule), streamOfActivityResults, activity, instance(mockLogger), diff --git a/src/activity/work/process.ts b/src/activity/work/process.ts index 9519876ea..3cac0375b 100644 --- a/src/activity/work/process.ts +++ b/src/activity/work/process.ts @@ -1,5 +1,5 @@ import { Readable, Transform } from "stream"; -import { Activity, IActivityApi, Result } from "../index"; +import { Activity, ActivityModule, Result } from "../index"; import { GolemWorkError, WorkErrorCode } from "./error"; import { GolemTimeoutError } from "../../shared/error/golem-error"; import { Logger } from "../../shared/utils"; @@ -26,7 +26,7 @@ export class RemoteProcess { private streamError?: Error; constructor( - private readonly activityApi: IActivityApi, + private readonly activityModule: ActivityModule, private streamOfActivityResults: Readable, private activity: Activity, private readonly logger: Logger, @@ -57,7 +57,7 @@ export class RemoteProcess { new GolemTimeoutError(`The waiting time (${timeoutInMs} ms) for the final result has been exceeded`), ), ); - this.activityApi + this.activityModule .destroyActivity(this.activity) .catch((err) => this.logger.error(`Error when destroying activity`, err)); }, timeoutInMs); @@ -75,7 +75,7 @@ export class RemoteProcess { this.activity.getProviderInfo(), ), ); - this.activityApi + this.activityModule .destroyActivity(this.activity) .catch((err) => this.logger.error(`Error when destroying activity`, err)); } diff --git a/src/activity/work/work.ts b/src/activity/work/work.ts index 638d5144d..2f18b8705 100644 --- a/src/activity/work/work.ts +++ b/src/activity/work/work.ts @@ -1,4 +1,4 @@ -import { Activity, ActivityStateEnum, IActivityApi, Result } from "../"; +import { Activity, ActivityModule, ActivityStateEnum, Result } from "../"; import { Capture, Command, @@ -22,7 +22,6 @@ import { GolemConfigError, GolemTimeoutError } from "../../shared/error/golem-er import { Agreement, ProviderInfo } from "../../market/agreement"; import { TcpProxy } from "../../network/tcpProxy"; import { ExecutionOptions, ExeScriptExecutor } from "../exe-script-executor"; -import { INetworkApi } from "../../network/api"; export type Worker = (ctx: WorkContext) => Promise; @@ -71,8 +70,7 @@ export class WorkContext { constructor( public readonly activity: Activity, - public readonly activityApi: IActivityApi, - private readonly networkApi: INetworkApi, + public readonly activityModule: ActivityModule, private options?: WorkOptions, ) { this.activityPreparingTimeout = options?.activityPreparingTimeout || DEFAULTS.activityPreparingTimeout; @@ -84,15 +82,27 @@ export class WorkContext { this.networkNode = options?.networkNode; - this.executor = this.activity.createExeScriptExecutor(this.activityApi, this.logger, this.options?.execution); + this.executor = this.activityModule.createScriptExecutor(this.activity, this.options?.execution); } - async before(): Promise { - let state = await this.activityApi - .getActivity(this.activity.id) + private async fetchState(): Promise { + return this.activityModule + .refreshActivity(this.activity) .then((activity) => activity.getState()) - .catch((err) => this.logger.error("Failed to read activity state", err)); + .catch((err) => { + this.logger.error("Failed to read activity state", err); + throw new GolemWorkError( + "Failed to read activity state", + WorkErrorCode.ActivityStatusQueryFailed, + this.activity.agreement, + this.activity, + err, + ); + }); + } + async before(): Promise { + let state = await this.fetchState(); if (state === ActivityStateEnum.Ready) { await this.setupActivity(); return; @@ -157,10 +167,7 @@ export class WorkContext { await sleep(this.activityStateCheckingInterval, true); - state = await this.activityApi - .getActivity(this.activity.id) - .then((activity) => activity.getState()) - .catch((err) => this.logger.error("Failed to read activity state", err)); + state = await this.fetchState(); if (state !== ActivityStateEnum.Ready) { throw new GolemWorkError( @@ -261,7 +268,7 @@ export class WorkContext { ); }); - return new RemoteProcess(this.activityApi, streamOfActivityResults, this.activity, this.logger); + return new RemoteProcess(this.activityModule, streamOfActivityResults, this.activity, this.logger); } /** @@ -338,7 +345,7 @@ export class WorkContext { this.activity.getProviderInfo(), ); - return this.networkApi.getWebsocketUri(this.networkNode, port); + return this.networkNode.getWebsocketUri(port); } getIp(): string { @@ -376,22 +383,6 @@ export class WorkContext { }; } - async getState(): Promise { - return this.activityApi - .getActivity(this.activity.id) - .then((activity) => activity.getState()) - .catch((err) => { - this.logger.error("Failed to read activity state", err); - throw new GolemWorkError( - "Failed to read activity state", - WorkErrorCode.ActivityStatusQueryFailed, - this.activity.agreement, - this.activity, - err, - ); - }); - } - private async runOneCommand(command: Command, options?: CommandOptions): Promise> { // Initialize script. const script = new Script([command]); diff --git a/src/network/api.ts b/src/network/api.ts index 122d1dda3..cda1945f2 100644 --- a/src/network/api.ts +++ b/src/network/api.ts @@ -35,11 +35,4 @@ export interface INetworkApi { * Returns the identifier of the requesor */ getIdentity(): Promise; - - /** - * Retrieves the WebSocket URI for a specified network node and port. - * @param networkNode - The network node for which the WebSocket URI is retrieved. - * @param port - The port number for the WebSocket connection. - */ - getWebsocketUri(networkNode: NetworkNode, port: number): string; } diff --git a/src/network/network.module.ts b/src/network/network.module.ts index 9d933ab58..2821fcb84 100644 --- a/src/network/network.module.ts +++ b/src/network/network.module.ts @@ -71,13 +71,6 @@ export interface NetworkModule { * @param node - The node to be removed. */ removeNetworkNode(network: Network, node: NetworkNode): Promise; - - /** - * Retrieves the WebSocket URI for a specified network node and port. - * @param networkNode - The network node for which the WebSocket URI is retrieved. - * @param port - The port number for the WebSocket connection. - */ - getWebsocketUri(networkNode: NetworkNode, port: number): string; } export class NetworkModuleImpl implements NetworkModule { @@ -180,10 +173,6 @@ export class NetworkModuleImpl implements NetworkModule { }); } - getWebsocketUri(networkNode: NetworkNode, port: number): string { - return this.deps.networkApi.getWebsocketUri(networkNode, port); - } - private getFreeIpInNetwork(network: Network, targetIp?: string): IPv4 { if (!targetIp) { return network.getFirstAvailableIpAddress(); diff --git a/src/network/node.ts b/src/network/node.ts index 8705132a3..ceb6a91ae 100644 --- a/src/network/node.ts +++ b/src/network/node.ts @@ -8,6 +8,7 @@ export class NetworkNode { public readonly id: string, public readonly ip: string, public getNetworkInfo: () => NetworkInfo, + public yagnaBaseUri: string, ) {} /** @@ -25,4 +26,10 @@ export class NetworkNode { ], }; } + + getWebsocketUri(port: number): string { + const url = new URL(this.yagnaBaseUri); + url.protocol = "ws"; + return `${url.href}/net/${this.getNetworkInfo().id}/tcp/${this.ip}/${port}`; + } } diff --git a/src/shared/yagna/adapters/network-api-adapter.ts b/src/shared/yagna/adapters/network-api-adapter.ts index b3641de3e..c17046697 100644 --- a/src/shared/yagna/adapters/network-api-adapter.ts +++ b/src/shared/yagna/adapters/network-api-adapter.ts @@ -41,7 +41,12 @@ export class NetworkApiAdapter implements INetworkApi { async createNetworkNode(network: Network, nodeId: string, nodeIp: string): Promise { try { await this.yagnaApi.net.addNode(network.id, { id: nodeId, ip: nodeIp }); - const networkNode = new NetworkNode(nodeId, nodeIp, network.getNetworkInfo.bind(network)); + const networkNode = new NetworkNode( + nodeId, + nodeIp, + network.getNetworkInfo.bind(network), + this.yagnaApi.net.httpRequest.config.BASE, + ); return networkNode; } catch (error) { @@ -81,10 +86,4 @@ export class NetworkApiAdapter implements INetworkApi { ); } } - - getWebsocketUri(networkNode: NetworkNode, port: number) { - const url = new URL(this.yagnaApi.net.httpRequest.config.BASE); - url.protocol = "ws"; - return `${url.href}/net/${networkNode.getNetworkInfo().id}/tcp/${networkNode.ip}/${port}`; - } } diff --git a/tests/unit/work.test.ts b/tests/unit/work.test.ts index 24924de5b..85689d8ba 100644 --- a/tests/unit/work.test.ts +++ b/tests/unit/work.test.ts @@ -1,5 +1,6 @@ import { Activity, + ActivityModule, ActivityStateEnum, Agreement, GolemModuleError, @@ -27,6 +28,7 @@ const mockActivityControl = imock(); const mockExecObserver = imock(); const mockStorageProvider = imock(); const mockAgreement = mock(Agreement); +const mockActivityModule = imock(); describe("Work Context", () => { beforeEach(() => { @@ -37,14 +39,16 @@ describe("Work Context", () => { reset(mockActivityControl); reset(mockExecObserver); reset(mockStorageProvider); + reset(mockAgreement); + reset(mockActivityModule); when(mockActivity.getProviderInfo()).thenReturn({ id: "test-provider-id", name: "test-provider-name", walletAddress: "0xProviderWallet", }); - when(mockActivity.createExeScriptExecutor(_, _, _)).thenReturn(instance(mockExecutor)); + when(mockActivityModule.createScriptExecutor(_, _)).thenReturn(instance(mockExecutor)); when(mockActivity.getState()).thenReturn(ActivityStateEnum.Ready); - when(mockActivityApi.getActivity(_)).thenResolve(instance(mockActivity)); + when(mockActivityModule.refreshActivity(_)).thenResolve(instance(mockActivity)); when(mockActivity.agreement).thenReturn(instance(mockAgreement)); when(mockExecutor.activity).thenReturn(instance(mockActivity)); }); @@ -64,7 +68,7 @@ describe("Work Context", () => { ); const worker = async (ctx: WorkContext) => ctx.run("some_shell_command"); - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi)); + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule)); await ctx.before(); const results = await worker(ctx); expect(results?.stdout).toEqual("test_result"); @@ -84,7 +88,7 @@ describe("Work Context", () => { ); const worker = async (ctx: WorkContext) => ctx.run("/bin/ls", ["-R"]); - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi)); + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule)); await ctx.before(); const results = await worker(ctx); @@ -107,7 +111,7 @@ describe("Work Context", () => { ]), ); - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi), { + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule), { storageProvider: instance(mockStorageProvider), }); await ctx.before(); @@ -131,7 +135,7 @@ describe("Work Context", () => { ]), ); - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi), { + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule), { storageProvider: instance(mockStorageProvider), }); await ctx.before(); @@ -158,7 +162,7 @@ describe("Work Context", () => { ]), ); - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi), { + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule), { storageProvider: instance(mockStorageProvider), }); @@ -191,7 +195,7 @@ describe("Work Context", () => { return "/golem/file.txt"; }); - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi), { + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule), { storageProvider: instance(mockStorageProvider), }); const result = await ctx.downloadJson("/golem/file.txt"); @@ -224,7 +228,7 @@ describe("Work Context", () => { return "/golem/file.txt"; }); - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi), { + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule), { storageProvider: instance(mockStorageProvider), }); const result = await ctx.downloadData("/golem/file.txt"); @@ -243,7 +247,7 @@ describe("Work Context", () => { describe("Exec and stream", () => { it("should execute runAndStream command", async () => { - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi)); + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule)); when(mockExecutor.execute(_, _, _)).thenResolve( buildExecutorResults([ { @@ -263,7 +267,7 @@ describe("Work Context", () => { describe("transfer()", () => { it("should execute transfer command", async () => { - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi)); + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule)); when(mockExecutor.execute(_, _, _)).thenResolve( buildExecutorResults([ @@ -294,7 +298,7 @@ describe("Work Context", () => { .downloadFile("/golem/file.txt", "./file.txt") .end(); }; - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi), { + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule), { storageProvider: instance(mockStorageProvider), }); const expectedStdout = [ @@ -326,7 +330,7 @@ describe("Work Context", () => { .downloadFile("/golem/file.txt", "./file.txt") .endStream(); }; - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi), { + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule), { storageProvider: instance(mockStorageProvider), }); const expectedStdout = [ @@ -358,31 +362,36 @@ describe("Work Context", () => { }); }); - describe("getState() helper function", () => { + describe("fetchState() helper function", () => { it("should return activity state", async () => { when(mockActivity.getState()).thenReturn(ActivityStateEnum.Deployed); - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi)); - await expect(ctx.getState()).resolves.toEqual(ActivityStateEnum.Deployed); + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule)); + await expect(ctx["fetchState"]()).resolves.toEqual(ActivityStateEnum.Deployed); }); }); describe("getIp()", () => { it("should throw error if there is no network node", async () => { - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi)); + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule)); expect(() => ctx.getIp()).toThrow(new Error("There is no network in this work context")); }); it("should return ip address of provider vpn network node", async () => { - const networkNode = new NetworkNode("test-node", "192.168.0.10", () => ({ - id: "test-network", - ip: "192.168.0.0/24", - nodes: { - "192.168.0.10": "example-provider-id", - }, - mask: "255.255.255.0", - })); - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi), { + const networkNode = new NetworkNode( + "test-node", + "192.168.0.10", + () => ({ + id: "test-network", + ip: "192.168.0.0/24", + nodes: { + "192.168.0.10": "example-provider-id", + }, + mask: "255.255.255.0", + }), + "http://127.0.0.1:7465", + ); + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule), { networkNode, }); @@ -392,7 +401,7 @@ describe("Work Context", () => { describe("getWebsocketUri()", () => { it("should throw error if there is no network node", async () => { - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi)); + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule)); expect(() => ctx.getWebsocketUri(80)).toThrow(new Error("There is no network in this work context")); }); @@ -400,11 +409,11 @@ describe("Work Context", () => { it("should return websocket URI from the NetworkNode", async () => { const mockNode = mock(NetworkNode); - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi), { + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule), { networkNode: instance(mockNode), }); - when(mockNetworkApi.getWebsocketUri(instance(mockNode), 20)).thenReturn("ws://localhost:20"); + when(mockNode.getWebsocketUri(20)).thenReturn("ws://localhost:20"); expect(ctx.getWebsocketUri(20)).toEqual("ws://localhost:20"); }); @@ -427,7 +436,7 @@ describe("Work Context", () => { ]), ); - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi), { + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule), { storageProvider: instance(mockStorageProvider), }); @@ -453,7 +462,7 @@ describe("Work Context", () => { async () => calls.push("3"), ]; - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi), { + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule), { activityReadySetupFunctions, }); @@ -466,7 +475,7 @@ describe("Work Context", () => { describe("Error handling", () => { it("should return a result with error in case the command to execute is invalid", async () => { const worker = async (ctx: WorkContext) => ctx.beginBatch().run("invalid_shell_command").end(); - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi)); + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule)); when(mockExecutor.execute(_)).thenResolve( buildExecutorResults(undefined, [buildExeScriptErrorResult("error", "Some error occurred")]), @@ -479,7 +488,7 @@ describe("Work Context", () => { it("should catch error while executing batch as stream with invalid command", async () => { const worker = async (ctx: WorkContext) => ctx.beginBatch().run("invalid_shell_command").endStream(); - const ctx = new WorkContext(instance(mockActivity), instance(mockActivityApi), instance(mockNetworkApi)); + const ctx = new WorkContext(instance(mockActivity), instance(mockActivityModule)); when(mockExecutor.execute(_)).thenResolve( buildExecutorResults(undefined, [buildExeScriptErrorResult("error", "Some error occurred", "test_result")]),