Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cactus-connector-ethereum): add RunTransactionV1Exchange to share receipt data #3643

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ import {
Web3StringReturnFormat,
convertWeb3ReceiptStatusToBool,
} from "./types/util-types";
import { Observable, ReplaySubject } from "rxjs";

export interface RunTransactionV1Exchange {
request: InvokeContractV1Request;
response: RunTransactionResponse;
timestamp: Date;
}

// Used when waiting for WS requests to be send correctly before disconnecting
const waitForWsProviderRequestsTimeout = 5 * 1000; // 5s
Expand Down Expand Up @@ -150,6 +157,9 @@ export class PluginLedgerConnectorEthereum
private watchBlocksSubscriptions: Map<string, WatchBlocksV1Endpoint> =
new Map();

private txSubject: ReplaySubject<RunTransactionV1Exchange> =
new ReplaySubject();

public get className(): string {
return PluginLedgerConnectorEthereum.CLASS_NAME;
}
Expand Down Expand Up @@ -235,6 +245,10 @@ export class PluginLedgerConnectorEthereum
return this.instanceId;
}

public getTxSubjectObservable(): Observable<RunTransactionV1Exchange> {
return this.txSubject.asObservable();
}

private async removeWatchBlocksSubscriptionForSocket(socketId: string) {
try {
const subscription = this.watchBlocksSubscriptions.get(socketId);
Expand Down Expand Up @@ -655,6 +669,16 @@ export class PluginLedgerConnectorEthereum
});
const success = out.transactionReceipt.status;
const data = { success, out };

// create RunTransactionV1Exchange for transaction monitoring
const receiptData: RunTransactionV1Exchange = {
request: req,
response: out,
timestamp: new Date(),
};
this.log.debug(`RunTransactionV1Exchange created ${receiptData}`);
this.txSubject.next(receiptData);

return data;
} else {
throw new Error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export * from "./generated/openapi/typescript-axios";
export {
PluginLedgerConnectorEthereum,
IPluginLedgerConnectorEthereumOptions,
RunTransactionV1Exchange,
} from "./plugin-ledger-connector-ethereum";

export * from "./sign-utils";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,16 @@ export function signTransaction(
| FeeMarketEIP1559Transaction
| BlobEIP4844Transaction;
} {
let chainConfiguration = new Common({ chain: Chain.Mainnet });
let chainConfiguration = new Common({
chain: Chain.Mainnet,
hardfork: "istanbul",
});
if (customChainInfo) {
chainConfiguration = Common.custom(customChainInfo);
}

const transaction = TransactionFactory.fromTxData(txData, {
common: chainConfiguration,
common: chainConfiguration as any,
});
if (privateKey.toLowerCase().startsWith("0x")) {
privateKey = privateKey.slice(2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

import Web3, { BlockHeaderOutput, FMT_BYTES, FMT_NUMBER } from "web3";
import { NewHeadsSubscription } from "web3-eth";
import { NewHeadsSubscription, RegisteredSubscription } from "web3-eth";
import type { Socket as SocketIoSocket } from "socket.io";

import {
Expand All @@ -21,10 +21,7 @@ import {
WatchBlocksV1,
Web3Transaction,
} from "../generated/openapi/typescript-axios";
import {
ConvertWeb3ReturnToString,
Web3StringReturnFormat,
} from "../types/util-types";
import { ConvertWeb3ReturnToString } from "../types/util-types";

const DEFAULT_HTTP_POLL_INTERVAL = 1000 * 5; // 5 seconds
const LAST_SEEN_LATEST_BLOCK = -1; // must be negative number, will be replaced with latest block in code
Expand Down Expand Up @@ -337,7 +334,7 @@ export class WatchBlocksV1SubscriptionEndpoint extends WatchBlocksV1Endpoint {
}

public async subscribe() {
const { socket, log, web3, isGetBlockData } = this;
const { socket, log, isGetBlockData } = this;
log.info(`${WatchBlocksV1.Subscribe} [WS Subscription] => ${socket.id}`);

if (this.isSubscribed) {
Expand All @@ -352,46 +349,53 @@ export class WatchBlocksV1SubscriptionEndpoint extends WatchBlocksV1Endpoint {
await this.emitAllSinceLastSeenBlock();

log.debug("Subscribing to Web3 new block headers event...");
this.newBlocksSubscription = await web3.eth.subscribe(
"newBlockHeaders",
undefined,
Web3StringReturnFormat,
);

this.newBlocksSubscription.on("data", async (blockHeader) => {
try {
log.debug("newBlockHeaders:", blockHeader);

if (typeof blockHeader.number === undefined) {
throw new Error(
`Missing block number in received block header (number: ${blockHeader.number}, hash: ${blockHeader.hash})`,
);
}
const blockNumber = Number(blockHeader.number);
if (blockNumber - this.lastSeenBlock > 2) {
log.info(
`Detected missing blocks since latest one (blockNumber: ${blockNumber}, lastSeenBlock: ${this.lastSeenBlock})`,
);
await this.emitAllSinceLastSeenBlock();
}
const options = {
subscription: "newBlockHeaders" as keyof RegisteredSubscription,
parameters: {},
};

let next: WatchBlocksV1Progress;
if (isGetBlockData) {
next = await this.getFullBlockProgress(blockNumber);
} else {
next = await this.headerDataToBlockProgress(blockHeader);
this.newBlocksSubscription = (await this.web3.eth.subscribe(
options.subscription,
options.parameters,
)) as unknown as NewHeadsSubscription;

this.newBlocksSubscription?.on(
"data",
async (blockHeader: BlockHeaderOutput) => {
try {
log.debug("newBlockHeaders:", blockHeader);

if (blockHeader.number === undefined || blockHeader.number === null) {
throw new Error(
`Missing block number in received block header (number: ${blockHeader.number}, hash: ${blockHeader.hash})`,
);
}
const blockNumber = Number(blockHeader.number);
if (blockNumber - this.lastSeenBlock > 2) {
log.info(
`Detected missing blocks since latest one (blockNumber: ${blockNumber}, lastSeenBlock: ${this.lastSeenBlock})`,
);
await this.emitAllSinceLastSeenBlock();
}

let next: WatchBlocksV1Progress;
if (isGetBlockData) {
next = await this.getFullBlockProgress(blockNumber);
} else {
next = await this.headerDataToBlockProgress(blockHeader);
}

socket.emit(WatchBlocksV1.Next, next);

this.lastSeenBlock = blockNumber;
} catch (error) {
log.warn("Error when parsing subscribed block data:", error);
}
},
);

socket.emit(WatchBlocksV1.Next, next);

this.lastSeenBlock = blockNumber;
} catch (error) {
log.warn("Error when parsing subscribed block data:", error);
}
});

this.newBlocksSubscription.on("error", async (error) => {
console.log("Error when subscribing to new block header: ", error);
this.newBlocksSubscription?.on("error", async (error: Error) => {
log.error("Error when subscribing to new block header: ", error);
socket.emit(WatchBlocksV1.Error, safeStringifyException(error));
await this.unsubscribe();
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { v4 as uuidV4 } from "uuid";
import { AddressInfo } from "net";
import { Server as SocketIoServer } from "socket.io";
import Web3, { HexString } from "web3";
import { Address } from "@ethereumjs/util";

import {
LogLevelDesc,
Expand Down Expand Up @@ -351,11 +352,11 @@ describe("Ethereum contract deploy and invoke using keychain tests", () => {

test("invoke Web3SigningCredentialType.None", async () => {
const testEthAccount2 = web3.eth.accounts.create();

const address = Address.fromString(testEthAccount2.address);
const value = 10e6;
const { serializedTransactionHex } = signTransaction(
{
to: testEthAccount2.address,
to: address,
value,
maxPriorityFeePerGas: 0,
maxFeePerGas: 0x40000000,
Expand Down
Loading