diff --git a/packages/cactus-plugin-satp-hermes/docker-compose.yaml b/packages/cactus-plugin-satp-hermes/docker-compose.yaml deleted file mode 100644 index 831c4d3089..0000000000 --- a/packages/cactus-plugin-satp-hermes/docker-compose.yaml +++ /dev/null @@ -1,17 +0,0 @@ -version: '3.8' -services: - db: - image: postgres:13 - environment: - POSTGRES_DB: ${DB_NAME} - POSTGRES_USER: ${DB_USER} - POSTGRES_PASSWORD: ${DB_PASSWORD} - POSTGRES_HOST: ${DB_HOST} - PGPORT: ${DB_PORT} - ports: - - "${DB_PORT}:5432" - volumes: - - pgdata:/var/lib/postgresql/data - -volumes: - pgdata: diff --git a/packages/cactus-plugin-satp-hermes/docker-compose.yml b/packages/cactus-plugin-satp-hermes/docker-compose.yml index 75cc6307e4..f3d25c4350 100644 --- a/packages/cactus-plugin-satp-hermes/docker-compose.yml +++ b/packages/cactus-plugin-satp-hermes/docker-compose.yml @@ -9,3 +9,18 @@ services: - 3010:3010/tcp # SERVER_PORT - 3011:3011/tcp # CLIENT_PORT - 4010:4010/tcp # API_PORT + db: + image: postgres:13 + environment: + POSTGRES_DB: ${DB_NAME} + POSTGRES_USER: ${DB_USER} + POSTGRES_PASSWORD: ${DB_PASSWORD} + POSTGRES_HOST: ${DB_HOST} + PGPORT: ${DB_PORT} + ports: + - "${DB_PORT}:5432" + volumes: + - pgdata:/var/lib/postgresql/data + +volumes: + pgdata: \ No newline at end of file diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts index 617e885b82..de66e9d515 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-manager.ts @@ -23,22 +23,22 @@ import { } from "../../generated/proto/cacti/satp/v02/crash_recovery_pb"; import { SessionType } from "../session-utils"; import { ISATPBridgesOptions } from "../../gol/satp-bridges-manager"; -import cron from "node-cron"; - +import cron, { ScheduledTask } from "node-cron"; export enum CrashStatus { IN_RECOVERY = "IN_RECOVERY", RECOVERED = "RECOVERED", NO_CRASH = "NO_CRASH", + ROLLBACK = "ROLLBACK_REQUIRED", ERROR = "ERROR", } -class CrashOccurrence { +/*class CrashOccurrence { constructor( public status: CrashStatus, public time: Date, public lastUpdate: Date, ) {} -} +}*/ export interface ICrashRecoveryManagerOptions { logLevel?: LogLevelDesc; @@ -54,7 +54,8 @@ export class CrashRecoveryManager { private sessions: Map; private crashRecoveryHandler: CrashRecoveryHandler; private factory: RollbackStrategyFactory; - private logRepository: ILocalLogRepository; + public logRepository: ILocalLogRepository; + private crashDetectionTask!: ScheduledTask; constructor(public readonly options: ICrashRecoveryManagerOptions) { const fnTag = `${CrashRecoveryManager.CLASS_NAME}#constructor()`; @@ -125,18 +126,51 @@ export class CrashRecoveryManager { } private detectCrash() { - const fnTag = `${this.className}#startCrashDetectionCron()`; - cron.schedule("*/10 * * * * *", async () => { + const fnTag = `${this.className}#detectCrash()`; + + if (this.sessions.size === 0) { + this.log.warn( + `${fnTag} No active sessions. skipping cron job scheduling.`, + ); + return; + } + + this.crashDetectionTask = cron.schedule("*/15 * * * * *", async () => { this.log.debug(`${fnTag} Running crash detection cron job.`); - // helper function await this.checkAndResolveCrashes(); + + // stop the cron job if all sessions are resolved + if (this.sessions.size === 0) { + this.log.info(`${fnTag} all sessions resolved. Stopping cron job.`); + this.stopCrashDetection(); + } }); - this.log.info(`${fnTag} Crash detection cron job scheduled.`); + + this.log.info(`${fnTag} crash detection cron job scheduled.`); + } + + public stopCrashDetection() { + if (this.crashDetectionTask) { + this.crashDetectionTask.stop(); + this.log.info(`${this.className}#stopCrashDetection() Cron job stopped.`); + } } public async checkAndResolveCrashes(): Promise { - for (const session of this.sessions.values()) { + const fnTag = `${this.className}#checkAndResolveCrashes()`; + + for (const [sessionId, session] of this.sessions.entries()) { await this.checkAndResolveCrash(session); + + const sessionData = session.hasClientSessionData() + ? session.getClientSessionData() + : session.getServerSessionData(); + + // remove resolved sessions + if (sessionData?.completed) { + this.sessions.delete(sessionId); + this.log.info(`${fnTag} session ${sessionId} resolved and removed.`); + } } } @@ -153,51 +187,47 @@ export class CrashRecoveryManager { try { let attempts = 0; - let crashOccurrence: CrashOccurrence | undefined; + const maxRetries = Number(sessionData.maxRetries); - while (attempts < BigInt(sessionData.maxRetries)) { + while (attempts < maxRetries) { const crashStatus = await this.checkCrash(session); if (crashStatus === CrashStatus.IN_RECOVERY) { this.log.info(`${fnTag} Crash detected! Attempting recovery`); - if (!crashOccurrence) { - crashOccurrence = new CrashOccurrence( - CrashStatus.IN_RECOVERY, - new Date(), - new Date(), - ); - } else { - crashOccurrence.lastUpdate = new Date(); - } - - const status = await this.handleRecovery(session); - if (status) { - crashOccurrence.status = CrashStatus.RECOVERED; + const recoverySuccess = await this.handleRecovery(session); + if (recoverySuccess) { this.log.info( `${fnTag} Recovery successful for sessionID: ${session.getSessionId()}`, ); return; + } else { + attempts++; + this.log.info( + `${fnTag} Recovery attempt ${attempts} failed for sessionID: ${session.getSessionId()}`, + ); } - } - attempts++; - this.log.info( - `${fnTag} Retry attempt ${attempts} for sessionID: ${session.getSessionId()}`, - ); - } - if (attempts !== 0) { - this.log.warn(`${fnTag} All retries exhausted! Initiating Rollback`); - const rollBackStatus = await this.initiateRollback(session, true); - if (rollBackStatus) { + } else if (crashStatus === CrashStatus.ROLLBACK) { + this.log.warn( + `${fnTag} Crash requires rollback. Initiating rollback.`, + ); + await this.initiateRollback(session, true); + return; // Exit after rollback + } else if (crashStatus === CrashStatus.NO_CRASH) { this.log.info( - `${fnTag} Rollback was successful for sessionID: ${session.getSessionId()}`, + `${fnTag} No crash detected for session ID: ${session.getSessionId()}`, ); + return; // Exit if no crash } else { - this.log.error( - `${fnTag} Rollback failed for sessionID: ${session.getSessionId()}`, - ); + this.log.warn(`${fnTag} Unexpected crash status: ${crashStatus}`); + return; } } + + this.log.warn( + `${fnTag} All recovery attempts exhausted. Initiating rollback.`, + ); + await this.initiateRollback(session, true); } catch (error) { this.log.error(`${fnTag} Error during crash resolution: ${error}`); } @@ -236,12 +266,9 @@ export class CrashRecoveryManager { this.log.warn( `${fnTag} Timeout exceeded by ${timeDifference} ms for session ID: ${session.getSessionId()}`, ); - return CrashStatus.IN_RECOVERY; + return CrashStatus.ROLLBACK; } - this.log.info( - `${fnTag} No crash detected for session ID: ${session.getSessionId()}`, - ); return CrashStatus.NO_CRASH; } catch (error) { this.log.error(`${fnTag} Error occurred during crash check: ${error}`); diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-recovery-handler.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-recovery-handler.ts index 3c67fb087b..a4579c3fc0 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-recovery-handler.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/crash-recovery-handler.ts @@ -54,6 +54,9 @@ export class CrashRecoveryHandler { if (!sessionData) { throw new Error(`${fnTag}, Session not found`); } + const logData = sessionData.hasClientSessionData() + ? sessionData.getClientSessionData() + : sessionData.getServerSessionData(); const updateMessage = await this.service.createRecoverUpdateMessage(req); this.log.debug(`${fnTag}, Created RecoverUpdateMessage`); @@ -64,7 +67,7 @@ export class CrashRecoveryHandler { key: getSatpLogKey(req.sessionId, "RECOVER", "init"), operation: "init", timestamp: new Date().toISOString(), - data: JSON.stringify(sessionData), + data: JSON.stringify(logData), }; await this.logRepository.create(logEntry); @@ -84,6 +87,9 @@ export class CrashRecoveryHandler { if (!sessionData) { throw new Error(`${fnTag}, Session not found`); } + const logData = sessionData.hasClientSessionData() + ? sessionData.getClientSessionData() + : sessionData.getServerSessionData(); const logEntry = { sessionID: req.sessionId, @@ -91,7 +97,7 @@ export class CrashRecoveryHandler { key: getSatpLogKey(req.sessionId, "RECOVER_SUCCESS", "init"), operation: "RECOVER_SUCCESS_MESSAGE_SENT", timestamp: new Date().toISOString(), - data: JSON.stringify(sessionData), + data: JSON.stringify(logData), }; await this.logRepository.create(logEntry); @@ -113,6 +119,9 @@ export class CrashRecoveryHandler { if (!sessionData) { throw new Error(`${fnTag}, Session not found`); } + const logData = sessionData.hasClientSessionData() + ? sessionData.getClientSessionData() + : sessionData.getServerSessionData(); const ackMessage = this.service.createRollbackAckMessage(req); @@ -122,7 +131,7 @@ export class CrashRecoveryHandler { key: getSatpLogKey(req.sessionId, "ROLLBACK", "init"), operation: "ROLLBACK_MESSAGE_SENT", timestamp: new Date().toISOString(), - data: JSON.stringify(sessionData), + data: JSON.stringify(logData), }; await this.logRepository.create(logEntry); @@ -142,6 +151,9 @@ export class CrashRecoveryHandler { if (!sessionData) { throw new Error(`${fnTag}, Session not found`); } + const logData = sessionData.hasClientSessionData() + ? sessionData.getClientSessionData() + : sessionData.getServerSessionData(); const logEntry = { sessionID: req.sessionId, @@ -149,7 +161,7 @@ export class CrashRecoveryHandler { key: getSatpLogKey(req.sessionId, "ROLLBACK_ACK", "init"), operation: "ROLLBACK_ACK", timestamp: new Date().toISOString(), - data: JSON.stringify(sessionData), + data: JSON.stringify(logData), }; await this.logRepository.create(logEntry); diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage0-rollback-strategy.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage0-rollback-strategy.ts index b449821866..ecea3b4f4e 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage0-rollback-strategy.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage0-rollback-strategy.ts @@ -35,7 +35,7 @@ export class Stage0RollbackStrategy implements RollbackStrategy { currentStage: String(sessionData.hashes?.stage0), stepsRemaining: 0, rollbackLogEntries: [], - estimatedTimeToCompletion: "0", + estimatedTimeToCompletion: "", status: "IN_PROGRESS", details: "", }); @@ -54,7 +54,7 @@ export class Stage0RollbackStrategy implements RollbackStrategy { rollbackState.details = "Rollback of Stage 0 completed successfully"; this.log.info(`${fnTag} Rollback of Stage 0 completed successfully`); - + // todo: add logs for rollback //await this.logRepository.create(logEntry); return rollbackState; diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage1-rollback-strategy.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage1-rollback-strategy.ts index fbce7a40f7..8ab780ecbd 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage1-rollback-strategy.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage1-rollback-strategy.ts @@ -77,12 +77,13 @@ export class Stage1RollbackStrategy implements RollbackStrategy { rollbackState.rollbackLogEntries.push(rollbackLogEntry); rollbackState.stepsRemaining = 0; rollbackState.status = "COMPLETED"; - rollbackState.estimatedTimeToCompletion = "0"; + rollbackState.estimatedTimeToCompletion = ""; rollbackState.details = "Rollback of Stage 1 completed successfully"; this.log.info( `${fnTag} Successfully rolled back Stage 1 for session ${session.getSessionId}`, ); + // todo: add logs for rollback //await this.logRepository.create(logEntry); return rollbackState; } catch (error) { diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage2-rollback-strategy.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage2-rollback-strategy.ts index 35dcea9e38..d24d9a22f0 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage2-rollback-strategy.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage2-rollback-strategy.ts @@ -48,7 +48,7 @@ export class Stage2RollbackStrategy implements RollbackStrategy { currentStage: String(sessionData.hashes?.stage2), stepsRemaining: 1, rollbackLogEntries: [], - estimatedTimeToCompletion: "0", + estimatedTimeToCompletion: "", status: "IN_PROGRESS", details: "", }); @@ -80,12 +80,13 @@ export class Stage2RollbackStrategy implements RollbackStrategy { rollbackState.rollbackLogEntries.push(rollbackLogEntry); rollbackState.stepsRemaining = 1; rollbackState.status = "COMPLETED"; - rollbackState.estimatedTimeToCompletion = "0"; + rollbackState.estimatedTimeToCompletion = ""; rollbackState.details = "Rollback of Stage 2 completed successfully"; this.log.info( `${fnTag} Successfully rolled back Stage 2 for session ${session.getSessionId()}`, ); + // todo: add logs for rollback //await this.logRepository.create(logEntry); return rollbackState; } catch (error) { diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage3-rollback-strategy.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage3-rollback-strategy.ts index 1e0507ca10..37a9732e0b 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage3-rollback-strategy.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/core/recovery/rollback/stage3-rollback-strategy.ts @@ -81,12 +81,13 @@ export class Stage3RollbackStrategy implements RollbackStrategy { rollbackState.rollbackLogEntries.push(rollbackLogEntry); rollbackState.stepsRemaining = 2; rollbackState.status = "COMPLETED"; - rollbackState.estimatedTimeToCompletion = "0"; + rollbackState.estimatedTimeToCompletion = ""; rollbackState.details = "Rollback of Stage 3 completed successfully"; this.log.info( `${fnTag} Successfully rolled back Stage 3 for session ${session.getSessionId()}`, ); + // todo: add logs for rollback //await this.logRepository.create(logEntry); return rollbackState; } catch (error) { diff --git a/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts b/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts index aea4092100..1ed1f37ba7 100644 --- a/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts +++ b/packages/cactus-plugin-satp-hermes/src/main/typescript/plugin-satp-hermes-gateway.ts @@ -179,11 +179,7 @@ export class SATPGateway implements IPluginWebService, ICactusPlugin { this.BLODispatcher = new BLODispatcher(dispatcherOps); this.OAPIServerEnabled = this.config.enableOpenAPI ?? true; - const specPath = path.join(__dirname, "../json/openapi-blo-bundled.json"); - this.OAS = JSON.parse(fs.readFileSync(specPath, "utf8")); - if (!this.OAS) { - this.logger.warn("Error loading OAS"); - } + this.OAS = OAS; // After setup, initialize crash manager and check if we crashed; const crashOptions: ICrashRecoveryManagerOptions = { diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/cron.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/cron.test.ts index 55168e48dd..efbe97410a 100644 --- a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/cron.test.ts +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/cron.test.ts @@ -2,7 +2,7 @@ import "jest-extended"; import { CrashRecoveryManager } from "../../../../main/typescript/core/recovery/crash-manager"; import { LogLevelDesc, Secp256k1Keys } from "@hyperledger/cactus-common"; import { ICrashRecoveryManagerOptions } from "../../../../main/typescript/core/recovery/crash-manager"; -import knex from "knex"; +import { Knex, knex } from "knex"; import { LocalLog, SupportedChain, @@ -22,11 +22,14 @@ import { TokenType } from "../../../../main/typescript/core/stage-services/satp- const logLevel: LogLevelDesc = "DEBUG"; -let mockSession: SATPSession; -const keyPairs = Secp256k1Keys.generateKeyPairsBuffer(); -const sessionId = uuidv4(); +let crashManager: CrashRecoveryManager; +let knexInstance: Knex; -const createMockSession = (maxTimeout: string, maxRetries: string) => { +const createMockSession = ( + sessionId: string, + maxTimeout: string, + maxRetries: string, +) => { const mockSession = new SATPSession({ contextID: "MOCK_CONTEXT_ID", server: false, @@ -41,6 +44,7 @@ const createMockSession = (maxTimeout: string, maxRetries: string) => { sessionData.maxTimeout = maxTimeout; sessionData.maxRetries = maxRetries; sessionData.version = SATP_VERSION; + const keyPairs = Secp256k1Keys.generateKeyPairsBuffer(); sessionData.clientGatewayPubkey = Buffer.from(keyPairs.publicKey).toString( "hex", ); @@ -86,10 +90,9 @@ const createMockSession = (maxTimeout: string, maxRetries: string) => { return mockSession; }; -let crashManager: CrashRecoveryManager; beforeAll(async () => { - const knexInstance = knex(knexClientConnection); + knexInstance = knex(knexClientConnection); await knexInstance.migrate.latest(); const crashManagerOptions: ICrashRecoveryManagerOptions = { @@ -106,21 +109,36 @@ beforeAll(async () => { crashManager = new CrashRecoveryManager(crashManagerOptions); }); -afterEach(() => { +beforeEach(async () => { + crashManager["sessions"].clear(); +}); + +afterEach(async () => { jest.clearAllMocks(); jest.useRealTimers(); + crashManager["sessions"].clear(); +}); + +afterAll(async () => { + if (crashManager) { + crashManager.stopCrashDetection(); + crashManager.logRepository.destroy(); + } + if (knexInstance) { + await knexInstance.destroy(); + } }); describe("CrashRecoveryManager Tests", () => { - it("should trigger checkAndResolveCrashes via cron schedule every 10 seconds for 30 seconds", async () => { + it("should trigger checkAndResolveCrashes via cron schedule every 15 seconds for 75 seconds", async () => { jest.useFakeTimers(); - mockSession = createMockSession("10000", "3"); + const sessionId = uuidv4(); + const mockSession = createMockSession(sessionId, "10000", "3"); const sessionData = mockSession.hasClientSessionData() ? mockSession.getClientSessionData() : mockSession.getServerSessionData(); - const sessionId = sessionData.id; const key = getSatpLogKey(sessionId, "type", "operation"); const mockLogEntry: LocalLog = { sessionID: sessionId, @@ -140,12 +158,12 @@ describe("CrashRecoveryManager Tests", () => { await crashManager.recoverSessions(); - for (let i = 1; i <= 3; i++) { - jest.advanceTimersByTime(10000); + for (let i = 1; i <= 5; i++) { + jest.advanceTimersByTime(15000); await Promise.resolve(); } - expect(mockCheckAndResolveCrash).toHaveBeenCalledTimes(3); + expect(mockCheckAndResolveCrash).toHaveBeenCalledTimes(5); expect(mockCheckAndResolveCrash).toHaveBeenCalledWith( expect.any(SATPSession), ); diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/recover.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/recover.test.ts new file mode 100644 index 0000000000..51f56b3779 --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/recover.test.ts @@ -0,0 +1,118 @@ +import "jest-extended"; +import { CrashRecoveryHandler } from "../../../../main/typescript/core/recovery/crash-recovery-handler"; +import { CrashRecoveryService } from "../../../../main/typescript/core/recovery/crash-utils"; +import { + RecoverMessage, + RecoverUpdateMessage, +} from "../../../../main/typescript/generated/proto/cacti/satp/v02/crash_recovery_pb"; +import { ILocalLogRepository } from "../../../../main/typescript/repository/interfaces/repository"; +import { SATPSession } from "../../../../main/typescript/core/satp-session"; +import { LogLevelDesc } from "@hyperledger/cactus-common"; +import { Knex, knex } from "knex"; +import { knexClientConnection } from "../../knex.config"; +import { v4 as uuidv4 } from "uuid"; +import { getSatpLogKey } from "../../../../main/typescript/gateway-utils"; + +const logLevel: LogLevelDesc = "DEBUG"; + +describe("CrashRecoveryHandler Tests", () => { + let handler: CrashRecoveryHandler; + let mockService: jest.Mocked; + let mockLogRepository: jest.Mocked; + let sessions: Map; + let knexInstance: Knex; + const sessionId = uuidv4(); + + beforeAll(async () => { + knexInstance = knex(knexClientConnection); + await knexInstance.migrate.latest(); + }); + + afterAll(async () => { + if (knexInstance) { + await knexInstance.destroy(); + } + }); + + beforeEach(() => { + // change + mockService = { + createRecoverUpdateMessage: jest.fn(), + createRollbackAckMessage: jest.fn(), + } as unknown as jest.Mocked; + + // change + mockLogRepository = { + create: jest.fn(), + readLastestLog: jest.fn(), + readLogsBySessionId: jest.fn(), + readLogsNotProofs: jest.fn(), + deleteAllLogs: jest.fn(), + destroy: jest.fn(), + } as unknown as jest.Mocked; + + sessions = new Map(); + const mockSession = new SATPSession({ + contextID: "test-context-id", + server: false, + client: true, + }); + sessions.set(sessionId, mockSession); + + handler = new CrashRecoveryHandler({ + crashService: mockService, + loggerOptions: { + level: logLevel, + label: "CrashRecoveryHandlerTest", + }, + sessions: sessions, + logRepository: mockLogRepository, + }); + }); + + afterEach(() => { + jest.restoreAllMocks(); + }); + + it("should handle recover message and return RecoverUpdateMessage", async () => { + const recoverMessage = new RecoverMessage({ + sessionId: sessionId, + messageType: "urn:ietf:SATP-2pc:msgtype:recover-msg", + satpPhase: "phase-1", + sequenceNumber: 1, + isBackup: false, + newIdentityPublicKey: "", + lastEntryTimestamp: BigInt(Date.now()), + senderSignature: "", + }); + + const expectedRecoverUpdateMessage = new RecoverUpdateMessage({ + sessionId: sessionId, + messageType: "urn:ietf:SATP-2pc:msgtype:recover-update-msg", + hashRecoverMessage: "", + recoveredLogs: [], + senderSignature: "", + }); + + mockService.createRecoverUpdateMessage.mockResolvedValue( + expectedRecoverUpdateMessage, + ); + + const result = await handler.handleRecover(recoverMessage); + + expect(result).toEqual(expectedRecoverUpdateMessage); + expect(mockService.createRecoverUpdateMessage).toHaveBeenCalledWith( + recoverMessage, + ); + + expect(mockLogRepository.create).toHaveBeenCalled(); + const logEntryArg = mockLogRepository.create.mock.calls[0][0]; + + expect(logEntryArg.sessionID).toBe(sessionId); + expect(logEntryArg.type).toBe("RECOVER"); + expect(logEntryArg.operation).toBe("init"); + expect(logEntryArg.key).toBe(getSatpLogKey(sessionId, "RECOVER", "init")); + expect(logEntryArg.timestamp).toBeDefined(); + expect(logEntryArg.data).toBeDefined(); + }); +}); diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/rollback.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/rollback.test.ts new file mode 100644 index 0000000000..38ef0c4d5c --- /dev/null +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/rollback.test.ts @@ -0,0 +1,163 @@ +import "jest-extended"; +import { RollbackStrategyFactory } from "../../../../main/typescript/core/recovery/rollback/rollback-strategy-factory"; +import { Stage0RollbackStrategy } from "../../../../main/typescript/core/recovery/rollback/stage0-rollback-strategy"; +import { Stage1RollbackStrategy } from "../../../../main/typescript/core/recovery/rollback/stage1-rollback-strategy"; +import { Stage2RollbackStrategy } from "../../../../main/typescript/core/recovery/rollback/stage2-rollback-strategy"; +import { Stage3RollbackStrategy } from "../../../../main/typescript/core/recovery/rollback/stage3-rollback-strategy"; +import { ILocalLogRepository } from "../../../../main/typescript/repository/interfaces/repository"; +import { ISATPBridgesOptions } from "../../../../main/typescript/gol/satp-bridges-manager"; +import { Secp256k1Keys } from "@hyperledger/cactus-common"; +import { + Asset, + CredentialProfile, + LockType, + SignatureAlgorithm, +} from "../../../../main/typescript/generated/proto/cacti/satp/v02/common/message_pb"; +import { + MessageStagesHashes, + Stage1Hashes, + Stage2Hashes, + Stage3Hashes, +} from "../../../../main/typescript/generated/proto/cacti/satp/v02/common/session_pb"; +import { v4 as uuidv4 } from "uuid"; +import { SATP_VERSION } from "../../../../main/typescript/core/constants"; +import { SATPSession } from "../../../../main/typescript/core/satp-session"; +import { SupportedChain } from "../../../../main/typescript/core/types"; +import { TokenType } from "../../../../main/typescript/core/stage-services/satp-bridge/types/asset"; +import { Knex } from "knex"; +import { knexClientConnection } from "../../knex.config"; +import { KnexLocalLogRepository as LocalLogRepository } from "../../../../main/typescript/repository/knex-local-log-repository"; + +const createMockSession = (maxTimeout: string, maxRetries: string) => { + const sessionId = uuidv4(); + const keyPairs = Secp256k1Keys.generateKeyPairsBuffer(); + + const mockSession = new SATPSession({ + contextID: "MOCK_CONTEXT_ID", + server: false, + client: true, + }); + + const sessionData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + sessionData.id = sessionId; + sessionData.maxTimeout = maxTimeout; + sessionData.maxRetries = maxRetries; + sessionData.version = SATP_VERSION; + sessionData.clientGatewayPubkey = Buffer.from(keyPairs.publicKey).toString( + "hex", + ); + sessionData.serverGatewayPubkey = sessionData.clientGatewayPubkey; + sessionData.originatorPubkey = "MOCK_ORIGINATOR_PUBKEY"; + sessionData.beneficiaryPubkey = "MOCK_BENEFICIARY_PUBKEY"; + sessionData.digitalAssetId = "MOCK_DIGITAL_ASSET_ID"; + sessionData.assetProfileId = "MOCK_ASSET_PROFILE_ID"; + sessionData.receiverGatewayOwnerId = "MOCK_RECEIVER_GATEWAY_OWNER_ID"; + sessionData.recipientGatewayNetworkId = SupportedChain.FABRIC; + sessionData.senderGatewayOwnerId = "MOCK_SENDER_GATEWAY_OWNER_ID"; + sessionData.senderGatewayNetworkId = SupportedChain.BESU; + sessionData.signatureAlgorithm = SignatureAlgorithm.RSA; + sessionData.lockType = LockType.FAUCET; + sessionData.lockExpirationTime = BigInt(1000); + sessionData.credentialProfile = CredentialProfile.X509; + sessionData.loggingProfile = "MOCK_LOGGING_PROFILE"; + sessionData.accessControlProfile = "MOCK_ACCESS_CONTROL_PROFILE"; + sessionData.resourceUrl = "MOCK_RESOURCE_URL"; + sessionData.lockAssertionExpiration = BigInt(99999); + sessionData.receiverContractOntology = "MOCK_RECEIVER_CONTRACT_ONTOLOGY"; + sessionData.senderContractOntology = "MOCK_SENDER_CONTRACT_ONTOLOGY"; + sessionData.sourceLedgerAssetId = "MOCK_SOURCE_LEDGER_ASSET_ID"; + sessionData.senderAsset = new Asset(); + sessionData.senderAsset.tokenId = "MOCK_TOKEN_ID"; + sessionData.senderAsset.tokenType = TokenType.ERC20; + sessionData.senderAsset.amount = BigInt(0); + sessionData.senderAsset.owner = "MOCK_SENDER_ASSET_OWNER"; + sessionData.senderAsset.ontology = "MOCK_SENDER_ASSET_ONTOLOGY"; + sessionData.senderAsset.contractName = "MOCK_SENDER_ASSET_CONTRACT_NAME"; + sessionData.senderAsset.contractAddress = + "MOCK_SENDER_ASSET_CONTRACT_ADDRESS"; + sessionData.receiverAsset = new Asset(); + + sessionData.receiverAsset.tokenType = TokenType.ERC20; + sessionData.receiverAsset.amount = BigInt(0); + sessionData.receiverAsset.owner = "MOCK_RECEIVER_ASSET_OWNER"; + sessionData.receiverAsset.ontology = "MOCK_RECEIVER_ASSET_ONTOLOGY"; + sessionData.receiverAsset.contractName = "MOCK_RECEIVER_ASSET_CONTRACT_NAME"; + sessionData.receiverAsset.mspId = "MOCK_RECEIVER_ASSET_MSP_ID"; + sessionData.receiverAsset.channelName = "MOCK_CHANNEL_ID"; + sessionData.lastSequenceNumber = BigInt(4); + + return mockSession; +}; + +describe("RollbackStrategyFactory Tests", () => { + let factory: RollbackStrategyFactory; + let mockLogRepository: ILocalLogRepository; + let mockSATPBridgesOptions: ISATPBridgesOptions; + let knexInstance: Knex; + + beforeAll(async () => { + mockLogRepository = new LocalLogRepository(knexClientConnection); + mockSATPBridgesOptions = { + logLevel: "DEBUG", + networks: [], + supportedDLTs: [SupportedChain.BESU, SupportedChain.FABRIC], + } as ISATPBridgesOptions; + + factory = new RollbackStrategyFactory( + mockSATPBridgesOptions, + mockLogRepository, + ); + }); + + afterEach(() => { + jest.clearAllMocks(); + }); + + afterAll(async () => { + if (knexInstance) { + await knexInstance.destroy(); + } + }); + + it("should create Stage0RollbackStrategy when no hashes are present", () => { + const mockSession = createMockSession("1000", "3"); + const sessionData = mockSession.getClientSessionData(); + sessionData.hashes = undefined; + const strategy = factory.createStrategy(mockSession); + expect(strategy).toBeInstanceOf(Stage0RollbackStrategy); + }); + + it("should create Stage1RollbackStrategy when stage1 hashes are present", () => { + const mockSession = createMockSession("1000", "3"); + const sessionData = mockSession.getClientSessionData(); + sessionData.hashes = new MessageStagesHashes(); + sessionData.hashes.stage1 = new Stage1Hashes(); + const strategy = factory.createStrategy(mockSession); + expect(strategy).toBeInstanceOf(Stage1RollbackStrategy); + }); + + it("should create Stage1RollbackStrategy when stage2 hashes are present", () => { + const mockSession = createMockSession("1000", "3"); + const sessionData = mockSession.getClientSessionData(); + sessionData.hashes = new MessageStagesHashes(); + sessionData.hashes.stage2 = new Stage2Hashes(); + const strategy = factory.createStrategy(mockSession); + expect(strategy).toBeInstanceOf(Stage2RollbackStrategy); + }); + + it("should create Stage3RollbackStrategy when all hashes are present", () => { + const mockSession = createMockSession("1000", "3"); + const sessionData = mockSession.getClientSessionData(); + sessionData.hashes = new MessageStagesHashes(); + sessionData.hashes.stage1 = new Stage1Hashes(); + sessionData.hashes.stage2 = new Stage2Hashes(); + sessionData.hashes.stage3 = new Stage3Hashes(); + + const strategy = factory.createStrategy(mockSession); + + expect(strategy).toBeInstanceOf(Stage3RollbackStrategy); + }); +}); diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/logging.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/scenarios.test.ts similarity index 86% rename from packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/logging.test.ts rename to packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/scenarios.test.ts index 7fc34246c3..214d49dbd4 100644 --- a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/logging.test.ts +++ b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/scenarios.test.ts @@ -5,7 +5,7 @@ import { } from "../../../../main/typescript/core/recovery/crash-manager"; import { LogLevelDesc, Secp256k1Keys } from "@hyperledger/cactus-common"; import { ICrashRecoveryManagerOptions } from "../../../../main/typescript/core/recovery/crash-manager"; -import knex from "knex"; +import { Knex, knex } from "knex"; import { LocalLog, SupportedChain, @@ -29,9 +29,9 @@ const logLevel: LogLevelDesc = "DEBUG"; let mockSession: SATPSession; const keyPairs = Secp256k1Keys.generateKeyPairsBuffer(); -const sessionId = uuidv4(); const createMockSession = (maxTimeout: string, maxRetries: string) => { + const sessionId = uuidv4(); const mockSession = new SATPSession({ contextID: "MOCK_CONTEXT_ID", server: false, @@ -92,9 +92,10 @@ const createMockSession = (maxTimeout: string, maxRetries: string) => { return mockSession; }; let crashManager: CrashRecoveryManager; +let knexInstance: Knex; beforeAll(async () => { - const knexInstance = knex(knexClientConnection); + knexInstance = knex(knexClientConnection); await knexInstance.migrate.latest(); const crashManagerOptions: ICrashRecoveryManagerOptions = { @@ -111,17 +112,64 @@ beforeAll(async () => { crashManager = new CrashRecoveryManager(crashManagerOptions); }); -afterEach(() => { +afterEach(async () => { + crashManager["sessions"].clear(); jest.clearAllMocks(); }); +afterAll(async () => { + if (crashManager) { + crashManager.stopCrashDetection(); + crashManager.logRepository.destroy(); + } + if (knexInstance) { + await knexInstance.destroy(); + } +}); + describe("CrashRecoveryManager Tests", () => { + it("should invoke rollback based on session timeout", async () => { + mockSession = createMockSession("1000", "3"); // timeout of 1 sec + + const testData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + const sessionId = testData.id; + + const handleRollbackSpy = jest + .spyOn(crashManager, "initiateRollback") + .mockImplementation(async () => true); + + const key = getSatpLogKey(sessionId, "type_o", "done"); + + const pastTime = new Date(Date.now() - 10000).toISOString(); + + const mockLogEntry: LocalLog = { + sessionID: sessionId, + type: "type_o", + key: key, + operation: "done", + timestamp: pastTime, + data: JSON.stringify(testData), + }; + + const mockLogRepository = crashManager["logRepository"]; + + await mockLogRepository.create(mockLogEntry); + + await crashManager.checkAndResolveCrash(mockSession); + + expect(handleRollbackSpy).toHaveBeenCalled(); + + handleRollbackSpy.mockRestore(); + }); it("should reconstruct session by fetching logs", async () => { mockSession = createMockSession("1000", "3"); const testData = mockSession.hasClientSessionData() ? mockSession.getClientSessionData() : mockSession.getServerSessionData(); + const sessionId = testData.id; // load sample log in database const key = getSatpLogKey(sessionId, "type", "operation"); @@ -154,6 +202,33 @@ describe("CrashRecoveryManager Tests", () => { } }); + it("should not recover if no crash is detected", async () => { + mockSession = createMockSession("10000", "3"); + + const testData = mockSession.hasClientSessionData() + ? mockSession.getClientSessionData() + : mockSession.getServerSessionData(); + + const mockLogEntry: LocalLog = { + sessionID: testData.id, + type: "type", + key: getSatpLogKey(testData.id, "type", "done"), + operation: "done", + timestamp: new Date().toISOString(), + data: JSON.stringify(testData), + }; + + await crashManager.logRepository.create(mockLogEntry); + + const handleRecoverySpy = jest.spyOn(crashManager, "handleRecovery"); + const initiateRollbackSpy = jest.spyOn(crashManager, "initiateRollback"); + + await crashManager.checkAndResolveCrash(mockSession); + + expect(handleRecoverySpy).not.toHaveBeenCalled(); + expect(initiateRollbackSpy).not.toHaveBeenCalled(); + }); + it("should invoke handleRecovery when crash is initially detected", async () => { mockSession = createMockSession("1000", "3"); @@ -202,6 +277,7 @@ describe("CrashRecoveryManager Tests", () => { const testData = mockSession.hasClientSessionData() ? mockSession.getClientSessionData() : mockSession.getServerSessionData(); + const sessionId = testData.id; const handleRecoverySpy = jest .spyOn(crashManager, "handleRecovery") @@ -229,47 +305,13 @@ describe("CrashRecoveryManager Tests", () => { handleRecoverySpy.mockRestore(); }); - it("should detect crash based on session timeout", async () => { - mockSession = createMockSession("1000", "3"); // timeout of 1 sec - - const testData = mockSession.hasClientSessionData() - ? mockSession.getClientSessionData() - : mockSession.getServerSessionData(); - - const handleRecoverySpy = jest - .spyOn(crashManager, "handleRecovery") - .mockImplementation(async () => true); - - const key = getSatpLogKey(sessionId, "type", "done"); - - const pastTime = new Date(Date.now() - 10000).toISOString(); - - const mockLogEntry: LocalLog = { - sessionID: sessionId, - type: "type", - key: key, - operation: "done", - timestamp: pastTime, - data: JSON.stringify(testData), - }; - - const mockLogRepository = crashManager["logRepository"]; - - await mockLogRepository.create(mockLogEntry); - - await crashManager.checkAndResolveCrash(mockSession); - - expect(handleRecoverySpy).toHaveBeenCalled(); - - handleRecoverySpy.mockRestore(); - }); - it("should detect crash based on incomplete operation in logs and initiate rollback when recovery fails", async () => { mockSession = createMockSession("10000", "3"); const testData = mockSession.hasClientSessionData() ? mockSession.getClientSessionData() : mockSession.getServerSessionData(); + const sessionId = testData.id; const handleRecoverySpy = jest .spyOn(crashManager, "handleRecovery") @@ -309,6 +351,7 @@ describe("CrashRecoveryManager Tests", () => { const testData = mockSession.hasClientSessionData() ? mockSession.getClientSessionData() : mockSession.getServerSessionData(); + const sessionId = testData.id; const handleRecoverySpy = jest .spyOn(crashManager, "handleRecovery") @@ -344,11 +387,12 @@ describe("CrashRecoveryManager Tests", () => { handleInitiateRollBackSpy.mockRestore(); }); - it("should process recovered logs and reconstruct SessionData", async () => { + it("should process recovered logs and add missing logs", async () => { const mockSession = createMockSession("1000", "3"); const testData = mockSession.hasClientSessionData() ? mockSession.getClientSessionData() : mockSession.getServerSessionData(); + const sessionId = testData.id; const recoveredLogs: LocalLog[] = [ { diff --git a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/start-gateway.test.ts b/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/start-gateway.test.ts deleted file mode 100644 index 020670e50f..0000000000 --- a/packages/cactus-plugin-satp-hermes/src/test/typescript/unit/recovery/start-gateway.test.ts +++ /dev/null @@ -1,90 +0,0 @@ -import "jest-extended"; -import { LoggerProvider, LogLevelDesc } from "@hyperledger/cactus-common"; -import { - SATPGateway, - SATPGatewayConfig, -} from "../../../../main/typescript/plugin-satp-hermes-gateway"; -import { PluginFactorySATPGateway } from "../../../../main/typescript/factory/plugin-factory-gateway-orchestrator"; -import { - IPluginFactoryOptions, - PluginImportType, -} from "@hyperledger/cactus-core-api"; -import { - SupportedChain, - Address, -} from "../../../../main/typescript/core/types"; - -const logLevel: LogLevelDesc = "DEBUG"; -const log = LoggerProvider.getOrCreate({ - level: logLevel, - label: "gateway-test", -}); - -describe("SATPGateway tests", () => { - it("should initialize two gateways and test their interaction", async () => { - const factoryOptions: IPluginFactoryOptions = { - pluginImportType: PluginImportType.Local, - }; - - const factory = new PluginFactorySATPGateway(factoryOptions); - const gatewayIdentity1 = { - id: "mockID-1", - name: "CustomGateway", - version: [ - { - Core: "v02", - Architecture: "v02", - Crash: "v02", - }, - ], - supportedDLTs: [SupportedChain.BESU], - proofID: "mockProofID10", - address: "http://localhost" as Address, - }; - - const gatewayIdentity2 = { - id: "mockID-2", - name: "CustomGateway", - version: [ - { - Core: "v02", - Architecture: "v02", - Crash: "v02", - }, - ], - supportedDLTs: [SupportedChain.FABRIC], - proofID: "mockProofID11", - address: "http://localhost" as Address, - gatewayServerPort: 3110, - gatewayClientPort: 3111, - gatewayOpenAPIPort: 4110, - }; - - const options1: SATPGatewayConfig = { - logLevel: "DEBUG", - gid: gatewayIdentity1, - counterPartyGateways: [gatewayIdentity2], - bridgesConfig: [], - }; - - const options2: SATPGatewayConfig = { - logLevel: "DEBUG", - gid: gatewayIdentity2, - counterPartyGateways: [gatewayIdentity1], - bridgesConfig: [], - }; - const gateway1 = await factory.create(options1); - expect(gateway1).toBeInstanceOf(SATPGateway); - await gateway1.startup(); - - const gateway2 = await factory.create(options2); - expect(gateway2).toBeInstanceOf(SATPGateway); - - await gateway2.startup(); - - log.info("gateway test!"); - - await gateway1.shutdown(); - await gateway2.shutdown(); - }); -}); diff --git a/yarn.lock b/yarn.lock index a89ed31492..7c2051b95d 100644 --- a/yarn.lock +++ b/yarn.lock @@ -40729,7 +40729,7 @@ __metadata: languageName: node linkType: hard -"pg-pool@npm:^3.5.2": +"pg-pool@npm:^3.5.2, pg-pool@npm:^3.7.0": version: 3.7.0 resolution: "pg-pool@npm:3.7.0" peerDependencies: @@ -40738,23 +40738,7 @@ __metadata: languageName: node linkType: hard -"pg-pool@npm:^3.7.0": - version: 3.7.0 - resolution: "pg-pool@npm:3.7.0" - peerDependencies: - pg: ">=8.0" - checksum: 10/a07a4f9e26eec9d7ac3597dc7b3469c62983edff9a321dbb7acbe1bbc7f5e9b2d33438e277d4cf8145071f3d63c7ebdc287a539fd69dfb8cdddb15b33eefe1a2 - languageName: node - linkType: hard - -"pg-protocol@npm:*, pg-protocol@npm:^1.5.0": - version: 1.7.0 - resolution: "pg-protocol@npm:1.7.0" - checksum: 10/ffffdf74426c9357b57050f1c191e84447c0e8b2a701b3ab302ac7dd0eb27b862d92e5e3b2d38876a1051de83547eb9165d6a58b3a8e90bb050dae97f9993d54 - languageName: node - linkType: hard - -"pg-protocol@npm:^1.7.0": +"pg-protocol@npm:*, pg-protocol@npm:^1.5.0, pg-protocol@npm:^1.7.0": version: 1.7.0 resolution: "pg-protocol@npm:1.7.0" checksum: 10/ffffdf74426c9357b57050f1c191e84447c0e8b2a701b3ab302ac7dd0eb27b862d92e5e3b2d38876a1051de83547eb9165d6a58b3a8e90bb050dae97f9993d54 @@ -45025,7 +45009,7 @@ __metadata: languageName: node linkType: hard -"safe-stable-stringify@npm:2.5.0": +"safe-stable-stringify@npm:2.5.0, safe-stable-stringify@npm:^2.4.3": version: 2.5.0 resolution: "safe-stable-stringify@npm:2.5.0" checksum: 10/2697fa186c17c38c3ca5309637b4ac6de2f1c3d282da27cd5e1e3c88eca0fb1f9aea568a6aabdf284111592c8782b94ee07176f17126031be72ab1313ed46c5c