Skip to content

Commit

Permalink
feat: add cron schedule for periodic crash checks
Browse files Browse the repository at this point in the history
Signed-off-by: Yogesh01000100 <yogeshone678@gmail.com>
  • Loading branch information
Yogesh01000100 committed Nov 21, 2024
1 parent 8cb647e commit 057304c
Show file tree
Hide file tree
Showing 7 changed files with 302 additions and 58 deletions.
3 changes: 3 additions & 0 deletions packages/cactus-plugin-satp-hermes/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -132,9 +132,11 @@
"jsonc": "2.0.0",
"knex": "2.4.0",
"kubo-rpc-client": "3.0.1",
"node-cron": "3.0.2",
"npm-run-all": "4.1.5",
"openzeppelin-solidity": "3.4.2",
"pg": "^8.13.0",
"safe-stable-stringify": "2.5.0",
"secp256k1": "4.0.3",
"socket.io": "4.6.2",
"sqlite3": "5.1.5",
Expand All @@ -157,6 +159,7 @@
"@types/fs-extra": "11.0.4",
"@types/google-protobuf": "3.15.5",
"@types/node": "18.18.2",
"@types/node-cron": "3.0.11",
"@types/pg": "8.6.5",
"@types/swagger-ui-express": "4.1.6",
"@types/tape": "4.13.4",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import {
} from "../../generated/proto/cacti/satp/v02/crash_recovery_pb";
import { SessionType } from "../session-utils";
import { ISATPBridgesOptions } from "../../gol/satp-bridges-manager";
import cron from "node-cron";

export enum CrashStatus {
IN_RECOVERY = "IN_RECOVERY",
Expand Down Expand Up @@ -50,7 +51,7 @@ export class CrashRecoveryManager {
public static readonly CLASS_NAME = "CrashRecoveryManager";
private readonly log: Logger;
private readonly instanceId: string;
private sessions: Map<string, SessionData>;
private sessions: Map<string, SATPSession>;
private crashRecoveryHandler: CrashRecoveryHandler;
private factory: RollbackStrategyFactory;
private logRepository: ILocalLogRepository;
Expand All @@ -63,7 +64,7 @@ export class CrashRecoveryManager {
const label = this.className;
this.log = LoggerProvider.getOrCreate({ level, label });
this.instanceId = options.instanceId;
this.sessions = new Map<string, SessionData>();
this.sessions = new Map<string, SATPSession>();
this.log.info(`Instantiated ${this.className} OK`);
this.logRepository = new LocalLogRepository(options.knexConfig);
this.factory = new RollbackStrategyFactory(
Expand Down Expand Up @@ -103,67 +104,39 @@ export class CrashRecoveryManager {
const sessionId = log.sessionID;
this.log.info(`${fnTag}, recovering session: ${sessionId}`);

if (log == undefined || log.data == undefined) {
throw new Error(`${fnTag}, invalid log}`);
if (!log || !log.data) {
throw new Error(`${fnTag}, invalid log`);
}

try {
const logEntry: SessionData = JSON.parse(log.data);
this.sessions.set(sessionId, logEntry);
const sessionData: SessionData = JSON.parse(log.data);
const satpSession = SATPSession.fromSessionData(sessionData);
this.sessions.set(sessionId, satpSession);
} catch (error) {
this.log.error(
`Error parsing log data for session Id: ${sessionId}: ${error}`,
);
}
}
this.detectCrash();
} catch (error) {
this.log.error(`Error initializing sessions: ${error}`);
}
}

private async checkCrash(session: SATPSession): Promise<CrashStatus> {
const fnTag = `${this.className}#checkCrash()`;
const sessionData = session.hasClientSessionData()
? session.getClientSessionData()
: session.getServerSessionData();

try {
session.verify(
fnTag,
session.hasClientSessionData()
? SessionType.CLIENT
: SessionType.SERVER,
);

const lastLog = await this.logRepository.readLastestLog(
session.getSessionId(),
);

if (lastLog && lastLog.operation !== "done") {
this.log.debug(
`${fnTag} Crash detected for session ID: ${session.getSessionId()} last log operation: ${lastLog.operation}`,
);
return CrashStatus.IN_RECOVERY;
}

const logTimestamp = new Date(lastLog?.timestamp ?? 0).getTime();
const currentTime = new Date().getTime();
const timeDifference = currentTime - logTimestamp;

if (timeDifference > Number(sessionData.maxTimeout)) {
this.log.warn(
`${fnTag} Timeout exceeded by ${timeDifference} ms for session ID: ${session.getSessionId()}`,
);
return CrashStatus.IN_RECOVERY;
}
private detectCrash() {
const fnTag = `${this.className}#startCrashDetectionCron()`;
cron.schedule("*/10 * * * * *", async () => {
this.log.debug(`${fnTag} Running crash detection cron job.`);
// helper function
await this.checkAndResolveCrashes();
});
this.log.info(`${fnTag} Crash detection cron job scheduled.`);
}

this.log.info(
`${fnTag} No crash detected for session ID: ${session.getSessionId()}`,
);
return CrashStatus.NO_CRASH;
} catch (error) {
this.log.error(`${fnTag} Error occured !`);
return CrashStatus.ERROR;
public async checkAndResolveCrashes(): Promise<void> {
for (const session of this.sessions.values()) {
await this.checkAndResolveCrash(session);
}
}

Expand Down Expand Up @@ -212,16 +185,16 @@ export class CrashRecoveryManager {
`${fnTag} Retry attempt ${attempts} for sessionID: ${session.getSessionId()}`,
);
}
if (attempts != 0) {
this.log.warn(`${fnTag} All retries exhausted ! Initiating Rollback`);
if (attempts !== 0) {
this.log.warn(`${fnTag} All retries exhausted! Initiating Rollback`);
const rollBackStatus = await this.initiateRollback(session, true);
if (rollBackStatus) {
this.log.info(
`${fnTag} rollback was success: ${session.getSessionId()}`,
`${fnTag} Rollback was successful for sessionID: ${session.getSessionId()}`,
);
} else {
this.log.error(
`${fnTag} rollback failed ! ${session.getSessionId()}`,
`${fnTag} Rollback failed for sessionID: ${session.getSessionId()}`,
);
}
}
Expand All @@ -230,6 +203,52 @@ export class CrashRecoveryManager {
}
}

private async checkCrash(session: SATPSession): Promise<CrashStatus> {
const fnTag = `${this.className}#checkCrash()`;
const sessionData = session.hasClientSessionData()
? session.getClientSessionData()
: session.getServerSessionData();

try {
session.verify(
fnTag,
session.hasClientSessionData()
? SessionType.CLIENT
: SessionType.SERVER,
);

const lastLog = await this.logRepository.readLastestLog(
session.getSessionId(),
);

if (lastLog && lastLog.operation !== "done") {
this.log.debug(
`${fnTag} Crash detected for session ID: ${session.getSessionId()}, last log operation: ${lastLog.operation}`,
);
return CrashStatus.IN_RECOVERY;
}

const logTimestamp = new Date(lastLog?.timestamp ?? 0).getTime();
const currentTime = new Date().getTime();
const timeDifference = currentTime - logTimestamp;

if (timeDifference > Number(sessionData.maxTimeout)) {
this.log.warn(
`${fnTag} Timeout exceeded by ${timeDifference} ms for session ID: ${session.getSessionId()}`,
);
return CrashStatus.IN_RECOVERY;
}

this.log.info(
`${fnTag} No crash detected for session ID: ${session.getSessionId()}`,
);
return CrashStatus.NO_CRASH;
} catch (error) {
this.log.error(`${fnTag} Error occurred during crash check: ${error}`);
return CrashStatus.ERROR;
}
}

public async handleRecovery(session: SATPSession): Promise<boolean> {
const fnTag = `${this.className}#handleRecovery()`;

Expand Down Expand Up @@ -316,12 +335,23 @@ export class CrashRecoveryManager {
return false;
}

let reconstructedSessionData = new SessionData();
let reconstructedSessionData: SessionData | undefined;

for (const logEntry of allLogs) {
const data = JSON.parse(logEntry.data);
reconstructedSessionData = data;
this.sessions.set(sessionId, reconstructedSessionData);

if (reconstructedSessionData) {
// Reconstruct SATPSession from SessionData
const satpSession = SATPSession.fromSessionData(
reconstructedSessionData,
);
this.sessions.set(sessionId, satpSession);
} else {
this.log.error(
`${fnTag} Reconstructed session data is undefined for session ID: ${sessionId}`,
);
}
}

this.log.info(
Expand Down Expand Up @@ -396,6 +426,7 @@ export class CrashRecoveryManager {
return await strategy.execute(session);
} catch (error) {
this.log.error(`${fnTag} Error executing rollback strategy: ${error}`);
return undefined;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,21 @@ import {
ILoggerOptions,
} from "@hyperledger/cactus-common";
import { Empty } from "@bufbuild/protobuf";
import { SessionData } from "../../generated/proto/cacti/satp/v02/common/session_pb";
//import { SessionData } from "../../generated/proto/cacti/satp/v02/common/session_pb";
import { ILocalLogRepository } from "../../repository/interfaces/repository";
import { getSatpLogKey } from "../../gateway-utils";
import { SATPSession } from "../satp-session";

interface HandlerOptions {
crashService: CrashRecoveryService;
loggerOptions: ILoggerOptions;
sessions: Map<string, SessionData>;
sessions: Map<string, SATPSession>;
logRepository: ILocalLogRepository;
}

export class CrashRecoveryHandler {
public static readonly CLASS_NAME = "CrashRecoveryHandler";
public sessions: Map<string, SessionData>;
public sessions: Map<string, SATPSession>;
private service: CrashRecoveryService;
private log: Logger;
private logRepository: ILocalLogRepository;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,29 @@ export class SATPSession {
return this.clientSessionData;
}

public static fromSessionData(sessionData: SessionData): SATPSession {
// Determine if it's a client or server session based on the presence of gateway pubkeys
const isServer = sessionData.serverGatewayPubkey !== "";
const isClient = sessionData.clientGatewayPubkey !== "";

const session = new SATPSession({
contextID: sessionData.transferContextId,
sessionID: sessionData.id,
server: isServer,
client: isClient,
});

// Assign the sessionData to the appropriate property
if (isServer) {
session.serverSessionData = sessionData;
}
if (isClient) {
session.clientSessionData = sessionData;
}

return session;
}

public createSessionData(
type: SessionType,
sessionId: string,
Expand Down
Loading

0 comments on commit 057304c

Please sign in to comment.