Skip to content

Commit

Permalink
refactor(recovery): consolidate logic and improve SATP message handling
Browse files Browse the repository at this point in the history
Signed-off-by: Yogesh01000100 <yogeshone678@gmail.com>
  • Loading branch information
Yogesh01000100 committed Aug 26, 2024
1 parent e190c53 commit 008dd96
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 163 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ message RecoverUpdateMessage {
string sessionId = 1;
string messageType = 2;
string hashRecoverMessage = 3;
repeated LogEntry recoveredLogs = 4;
repeated LocalLog recoveredLogs = 4;
string senderSignature = 5;
}

Expand Down Expand Up @@ -62,7 +62,7 @@ message RollbackAckMessage {
string senderSignature = 6;
}

message LogEntry {
message LocalLog {
string key=1;
string sessionId=2;
string data=3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,7 @@ import {
RollbackStrategy,
RollbackStrategyFactory,
} from "./rollback/rollback-strategy-factory";
import { CrashRecoveryClientService } from "./crash-recovery-client-service";
import { CrashRecoveryServerService } from "./crash-recovery-server-service";
import { CrashRecoveryService } from "./crash-utils";
import { KnexLocalLogRepository as LocalLogRepository } from "../../repository/knex-local-log-repository";
import { ILocalLogRepository } from "../../repository/interfaces/repository";
import { Knex } from "knex";
Expand Down Expand Up @@ -67,9 +66,9 @@ export class CrashRecoveryManager {
label: "CrashRecoveryHandler",
level: "DEBUG",
},
serverService: new CrashRecoveryServerService(),
clientService: new CrashRecoveryClientService(),
crashService: new CrashRecoveryService(crashRecoveryServiceOptions),
sessions: this.sessions,
logRepository: this.logRepository,
});
}

Expand Down Expand Up @@ -196,16 +195,32 @@ export class CrashRecoveryManager {

try {
if (session.hasServerSessionData()) {
// todo: Handling recovery as a server
this.log.info(
`${fnTag} Initiating recovery as a server for session ID: ${session.getSessionId()}`,
);
} else if (session.hasClientSessionData()) {
// todo: Handling recovery as a client
this.log.info(
`${fnTag} Initiating recovery as a client for session ID: ${session.getSessionId()}`,
);
} else {
throw new Error(
`${fnTag} Neither client nor server session data is available`,
`${fnTag} Neither client nor server session data is available for session ID: ${session.getSessionId()}`,
);
}

const recoverMessage =
await this.crashRecoveryHandler.sendRecover(session);
const recoverUpdateMessage =
await this.crashRecoveryHandler.sendRecoverUpdate(recoverMessage);
await this.crashRecoveryHandler.sendRecoverSuccess(recoverUpdateMessage);

this.log.info(
`${fnTag} Recovery handled successfully for session ID: ${session.getSessionId()}`,
);
} catch (error) {
this.log.error(`${fnTag} Error during recovery process: ${error}`);
this.log.error(
`${fnTag} Error during recovery process for session ID: ${session.getSessionId()} - ${error}`,
);
throw new Error(
`Recovery failed for session ID: ${session.getSessionId()}`,
);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ import {
RollbackMessage,
RollbackAckMessage,
} from "../../generated/proto/cacti/satp/v02/crash_recovery_pb";
import { CrashRecoveryServerService } from "./crash-recovery-server-service";
import { CrashRecoveryClientService } from "./crash-recovery-client-service";
import { CrashRecoveryService } from "./crash-utils";
import {
Logger,
LoggerProvider,
Expand All @@ -17,28 +16,28 @@ import {
import { Empty } from "@bufbuild/protobuf";
import { SessionData } from "../../generated/proto/cacti/satp/v02/common/session_pb";
import { SATPSession } from "../satp-session";
import { ILocalLogRepository } from "../../repository/interfaces/repository";

interface HandlerOptions {
serverService: CrashRecoveryServerService;
clientService: CrashRecoveryClientService;
crashService: CrashRecoveryService;
loggerOptions: ILoggerOptions;
sessions: Map<string, SessionData>;
logRepository: ILocalLogRepository;
}

// todo : create specific class for crash-recovery later implement to specific satp-phases
export class CrashRecoveryHandler {
public static readonly CLASS_NAME = "CrashRecoveryHandler";
private sessions: Map<string, SessionData>;
private serverService: CrashRecoveryServerService;
private clientService: CrashRecoveryClientService;
private service: CrashRecoveryService;
private logger: Logger;
private logRepository: ILocalLogRepository;

constructor(ops: HandlerOptions) {
this.sessions = ops.sessions;
this.serverService = ops.serverService as CrashRecoveryServerService;
this.clientService = ops.clientService as CrashRecoveryClientService;
this.service = ops.crashService;
this.logger = LoggerProvider.getOrCreate(ops.loggerOptions);
this.logger.trace(`Initialized ${CrashRecoveryHandler.CLASS_NAME}`);
this.logRepository = ops.logRepository;
}

getHandlerIdentifier(): string {
Expand All @@ -49,27 +48,45 @@ export class CrashRecoveryHandler {
return this.logger;
}

async sendRecover(req: RecoverMessage): Promise<RecoverUpdateMessage> {
const stepTag = `RecoverV2MessageImplementation()`;
const fnTag = `${this.getHandlerIdentifier()}#${stepTag}`;
private generateKey(): string {
//todo: key generation logic
return "key";
}

async sendRecover(req: SATPSession): Promise<RecoverUpdateMessage> {
const fnTag = `${this.getHandlerIdentifier()}#sendRecover`;
try {
this.Log.debug(`${fnTag}, Recover V2 Message...`);

const sessionID = req.sessionId;
const sessionData = this.sessions.get(sessionID);
const sessionId = req.getSessionId();
const sessionData = this.sessions.get(sessionId);
if (!sessionData) {
throw new Error(`${fnTag}, Session not found`);
}

// todo : add checks
const updateMessage = this.serverService.createRecoverUpdateMessage(
req,
sessionData,
);

if (!updateMessage) {
throw new Error(`${fnTag}, Failed to create RecoverUpdateMessage`);
}
const recoverMessage: RecoverMessage = {
sessionId: sessionId,
messageType: "Recover",
satpPhase: sessionData.phase,
sequenceNumber: sessionData.lastSequenceNumber,
isBackup: false,
newIdentityPublicKey: "",
lastEntryTimestamp: sessionData.lastSequenceNumber,
senderSignature: "",
};
const updateMessage =
this.service.createRecoverUpdateMessage(recoverMessage);

const logEntry = {
sessionID: sessionId,
type: "RECOVER",
key: "key", // generateKey(),
operation: "RECOVER_MESSAGE_SENT",
timestamp: new Date().toISOString(),
data: "",
};

await this.logRepository.create(logEntry);
return updateMessage;
} catch (error) {
throw new Error(`${fnTag}, Failed to process RecoverV2Message ${error}`);
Expand All @@ -90,13 +107,19 @@ export class CrashRecoveryHandler {
);
}

// todo : add checks
const successMessage = this.serverService.createRecoverSuccessMessage(
req,
sessionData,
);
const successMessage = this.service.createRecoverSuccessMessage(req);

this.Log.debug(`${fnTag}, Recover Success Message created`);
const logEntry = {
sessionID: req.sessionId,
type: "RECOVER_UPDATE",
key: "key", // generateKey(),
operation: "RECOVER_UPDATE_MESSAGE_SENT",
timestamp: new Date().toISOString(),
data: "",
};

await this.logRepository.create(logEntry);

return successMessage;
} catch (error) {
Expand All @@ -117,6 +140,16 @@ export class CrashRecoveryHandler {
}

this.Log.debug(`${fnTag}, Session recovery successfully completed`);
const logEntry = {
sessionID: req.sessionId,
type: "RECOVER_SUCCESS",
key: "key", // generateKey(),
operation: "RECOVER_SUCCESS_MESSAGE_SENT",
timestamp: new Date().toISOString(),
data: "",
};

await this.logRepository.create(logEntry);

return new Empty();
} catch (error) {
Expand All @@ -136,10 +169,19 @@ export class CrashRecoveryHandler {
throw new Error(`${fnTag}, Session not found`);
}

// todo : add checks
const ackMessage = this.serverService.createRollbackAckMessage(req);
const ackMessage = this.service.createRollbackAckMessage(req);

this.Log.debug(`${fnTag}, Rollback Ack Message created`);
const logEntry = {
sessionID: req.sessionId,
type: "ROLLBACK",
key: "key", //generateKey(),
operation: "ROLLBACK_MESSAGE_SENT",
timestamp: new Date().toISOString(),
data: "",
};

await this.logRepository.create(logEntry);

return ackMessage;
} catch (error) {
Expand All @@ -158,6 +200,16 @@ export class CrashRecoveryHandler {
}

this.Log.debug(`${fnTag}, Rollback successfully acknowledged`);
const logEntry = {
sessionID: req.sessionId,
type: "ROLLBACK_",
key: "key", //generateKey(),
operation: "ROLLBACK_",
timestamp: new Date().toISOString(),
data: "",
};

await this.logRepository.create(logEntry);

return new Empty();
} catch (error) {
Expand All @@ -176,48 +228,4 @@ export class CrashRecoveryHandler {
rollbackV2AckMessage: this.sendRollbackAck,
});
}

// TODO! what is this function for? seems like a service function
public async sendRecoverTODO(
sessionId: string,
): Promise<RecoverUpdateMessage> {
const stepTag = `SendRecoverV2Message()`;
const fnTag = `${this.getHandlerIdentifier()}#${stepTag}`;
try {
this.Log.debug(`${fnTag}, Sending Recover V2 Message...`);

const session = this.sessions.get(sessionId);
if (!session) {
throw new Error(`${fnTag}, Session not found`);
}

const recoverMessage = this.clientService.createRecoverMessage(session);
const response = await this.clientService.sendRecover(recoverMessage);

if (!response) {
throw new Error(`${fnTag}, Failed to receive RecoverUpdateMessage`);
}
return response;
} catch (error) {
throw new Error(`${fnTag}, Failed to send RecoverV2Message ${error}`);
}
}

public async SendRecoverUpdate(
recoverUpdateMessage: RecoverUpdateMessage,
): Promise<RecoverSuccessMessage> {
const fnTag = `${this.getHandlerIdentifier()}#sendRecoverUpdate()`;
try {
this.Log.debug(`${fnTag}, Sending Recover Update...`);

const successMessage =
await this.clientService.sendRecoverUpdateMessage(recoverUpdateMessage);

this.Log.debug(`${fnTag}, Recover Success Message created`);

return successMessage;
} catch (error) {
throw new Error(`${fnTag}, Error sending Recover Update: ${error}`);
}
}
}

This file was deleted.

Loading

0 comments on commit 008dd96

Please sign in to comment.