Skip to content

Commit

Permalink
feat: add orchestrator communication layer using connect-RPC
Browse files Browse the repository at this point in the history
Signed-off-by: Yogesh01000100 <yogeshone678@gmail.com>
  • Loading branch information
Yogesh01000100 committed Nov 26, 2024
1 parent 1622096 commit 4eef528
Show file tree
Hide file tree
Showing 14 changed files with 819 additions and 808 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,19 +11,23 @@ import {
RollbackStrategy,
RollbackStrategyFactory,
} from "./rollback/rollback-strategy-factory";
import { CrashRecoveryService } from "./crash-utils";
import { KnexLocalLogRepository as LocalLogRepository } from "../../repository/knex-local-log-repository";
import { ILocalLogRepository } from "../../repository/interfaces/repository";
import { Knex } from "knex";
import {
RecoverMessage,
RecoverSuccessMessage,
RecoverUpdateMessage,
RollbackState,
} from "../../generated/proto/cacti/satp/v02/crash_recovery_pb";
import { SessionType } from "../session-utils";
import { ISATPBridgesOptions } from "../../gol/satp-bridges-manager";
import { SATPBridgesManager } from "../../gol/satp-bridges-manager";
import cron, { ScheduledTask } from "node-cron";
import { CrashRecoveryServerService } from "./recovery-server-service";
import { CrashRecoveryClientService } from "./recovery-client-service";
import { GatewayOrchestrator } from "../../gol/gateway-orchestrator";
import { PromiseClient as PromiseConnectClient } from "@connectrpc/connect";
import { GatewayIdentity, SupportedChain } from "../types";
import { CrashRecovery } from "../../generated/proto/cacti/satp/v02/crash_recovery_connect";

export enum CrashStatus {
IN_RECOVERY = "IN_RECOVERY",
RECOVERED = "RECOVERED",
Expand All @@ -44,7 +48,8 @@ export interface ICrashRecoveryManagerOptions {
logLevel?: LogLevelDesc;
instanceId: string;
knexConfig?: Knex.Config;
bridgeConfig: ISATPBridgesOptions;
bridgeConfig: SATPBridgesManager;
orchestrator: GatewayOrchestrator;
}

export class CrashRecoveryManager {
Expand All @@ -56,6 +61,11 @@ export class CrashRecoveryManager {
private factory: RollbackStrategyFactory;
public logRepository: ILocalLogRepository;
private crashDetectionTask!: ScheduledTask;
private crashRecoveryServerService: CrashRecoveryServerService;
private crashRecoveryClientService: CrashRecoveryClientService;
private orchestrator: GatewayOrchestrator;
private gatewaysPubKeys: Map<string, string> = new Map();
private readonly bridgesManager: SATPBridgesManager;

constructor(public readonly options: ICrashRecoveryManagerOptions) {
const fnTag = `${CrashRecoveryManager.CLASS_NAME}#constructor()`;
Expand All @@ -68,28 +78,28 @@ export class CrashRecoveryManager {
this.sessions = new Map<string, SATPSession>();
this.log.info(`Instantiated ${this.className} OK`);
this.logRepository = new LocalLogRepository(options.knexConfig);
this.orchestrator = options.orchestrator;
this.bridgesManager = options.bridgeConfig;
this.loadPubKeys(this.orchestrator.getCounterPartyGateways());
this.factory = new RollbackStrategyFactory(
options.bridgeConfig,
this.bridgesManager,
this.logRepository,
);
const crashRecoveryServiceOptions = {
logLevel: this.options.logLevel,
instanceId: this.instanceId,
loggerOptions: {
label: "CrashRecoveryService",
level: this.options.logLevel || "DEBUG",
},
logRepository: this.logRepository,
};
this.crashRecoveryHandler = new CrashRecoveryHandler({
loggerOptions: {
label: "CrashRecoveryHandler",
level: "DEBUG",
},
crashService: new CrashRecoveryService(crashRecoveryServiceOptions),
sessions: this.sessions,
logRepository: this.logRepository,
});
this.crashRecoveryServerService = new CrashRecoveryServerService(
this.logRepository,
this.sessions,
);

this.crashRecoveryClientService = new CrashRecoveryClientService();

this.crashRecoveryHandler = new CrashRecoveryHandler(
this.crashRecoveryServerService,
this.crashRecoveryClientService,
);
this.crashRecoveryHandler = new CrashRecoveryHandler(
this.crashRecoveryServerService,
this.crashRecoveryClientService,
);
}

get className(): string {
Expand Down Expand Up @@ -278,56 +288,58 @@ export class CrashRecoveryManager {

public async handleRecovery(session: SATPSession): Promise<boolean> {
const fnTag = `${this.className}#handleRecovery()`;

const sessionData = session.hasClientSessionData()
? session.getClientSessionData()
: session.getServerSessionData();
if (!sessionData) {
throw new Error(`${fnTag}, session data is not correctly initialized`);
}
this.log.debug(
`${fnTag} - Starting crash recovery for sessionId: ${session.getSessionId()}`,
);

try {
this.log.info(
`${fnTag} Initiating recovery for session ID: ${session.getSessionId()}`,
const channel = this.orchestrator.getChannel(
session.getClientSessionData()
.recipientGatewayNetworkId as SupportedChain,
);

const recoverMessage = new RecoverMessage({
sessionId: session.getSessionId(),
messageType: "urn:ietf:SATP-2pc:msgtype:recover-msg",
satpPhase: "sessionData.hashes?.stage0", //todo: get phase info
sequenceNumber: Number(sessionData.lastSequenceNumber),
isBackup: false,
newIdentityPublicKey: "",
lastEntryTimestamp: sessionData.lastSequenceNumber,
senderSignature: "",
});

const recoverUpdateMessage =
await this.crashRecoveryHandler.handleRecover(recoverMessage);

const response = await this.processRecoverUpdate(recoverUpdateMessage);

if (response) {
const recoverSuccessMessage = new RecoverSuccessMessage({
sessionId: session.getSessionId(),
messageType: "urn:ietf:SATP-2pc:msgtype:recover-success-msg",
hashRecoverUpdateMessage: "",
success: true,
entriesChanged: [],
senderSignature: "",
});

await this.crashRecoveryHandler.handleRecoverSuccess(
recoverSuccessMessage,
if (!channel) {
throw new Error(
`${fnTag} - Channel not found for the recipient gateway network ID.`,
);
}

this.log.info(
`${fnTag} Recovery handled successfully for session ID: ${session.getSessionId()}`,
);
return true;
} else {
return false;
const counterGatewayID = this.orchestrator.getGatewayIdentity(
channel.toGatewayID,
);
if (!counterGatewayID) {
throw new Error(`${fnTag} - Counterparty gateway ID not found.`);
}

const clientCrashRecovery: PromiseConnectClient<typeof CrashRecovery> =
channel.clients.get("4") as PromiseConnectClient<typeof CrashRecovery>;

if (!clientCrashRecovery) {
throw new Error(`${fnTag} - Failed to get clientCrashRecovery.`);
}

const recoverMessage =
await this.crashRecoveryHandler.createRecoverMessage(session);

const recoverUpdateMessage =
await clientCrashRecovery.recoverV2Message(recoverMessage);

this.log.info(
`${fnTag} - Received RecoverUpdateMessage: ${JSON.stringify(recoverUpdateMessage)}`,
);

await this.processRecoverUpdateMessage(recoverUpdateMessage);

const recoverSuccessMessage =
await this.crashRecoveryHandler.createRecoverSuccessMessage(session);

await clientCrashRecovery.recoverV2SuccessMessage(recoverSuccessMessage);

this.log.info(
`${fnTag} - Crash recovery completed for sessionId: ${session.getSessionId()}`,
);

return true;
} catch (error) {
this.log.error(
`${fnTag} Error during recovery process for session ID: ${session.getSessionId()} - ${error}`,
Expand All @@ -338,7 +350,7 @@ export class CrashRecoveryManager {
}
}

private async processRecoverUpdate(
private async processRecoverUpdateMessage(
message: RecoverUpdateMessage,
): Promise<boolean> {
const fnTag = `${this.className}#processRecoverUpdate()`;
Expand All @@ -355,32 +367,24 @@ export class CrashRecoveryManager {
await this.logRepository.create(logEntry);
}

const allLogs = await this.logRepository.readLogsBySessionId(sessionId);

if (!allLogs || allLogs.length === 0) {
this.log.error(`${fnTag} No logs found for session ID: ${sessionId}`);
return false;
}

let reconstructedSessionData: SessionData | undefined;
for (const log of recoveredLogs) {
const sessionId = log.sessionID;
this.log.info(`${fnTag}, recovering session: ${sessionId}`);

for (const logEntry of allLogs) {
const data = JSON.parse(logEntry.data);
reconstructedSessionData = data;
if (!log || !log.data) {
throw new Error(`${fnTag}, invalid log`);
}

if (reconstructedSessionData) {
// Reconstruct SATPSession from SessionData
const satpSession = SATPSession.fromSessionData(
reconstructedSessionData,
);
try {
const sessionData: SessionData = JSON.parse(log.data);
const satpSession = SATPSession.fromSessionData(sessionData);
this.sessions.set(sessionId, satpSession);
} else {
} catch (error) {
this.log.error(
`${fnTag} Reconstructed session data is undefined for session ID: ${sessionId}`,
`Error parsing log data for session Id: ${sessionId}: ${error}`,
);
}
}

this.log.info(
`${fnTag} Session data successfully reconstructed for session ID: ${sessionId}`,
);
Expand Down Expand Up @@ -481,4 +485,16 @@ export class CrashRecoveryManager {
return false;
}
}

private loadPubKeys(gateways: Map<string, GatewayIdentity>): void {
gateways.forEach((gateway) => {
if (gateway.pubKey) {
this.gatewaysPubKeys.set(gateway.id, gateway.pubKey);
}
});
this.gatewaysPubKeys.set(
this.orchestrator.getSelfId(),
this.orchestrator.ourGateway.pubKey!,
);
}
}
Loading

0 comments on commit 4eef528

Please sign in to comment.