diff --git a/src/client.js b/src/client.js index fea7a44..b0ca74b 100644 --- a/src/client.js +++ b/src/client.js @@ -39,6 +39,39 @@ import SalesforceAuth from './utils/auth.js'; * @protected */ +/** + * Custom Long Avro type used for deserializing large numbers with BitInt. + * This fixes a deserialization bug with Avro not supporting large values. + * @private + */ +const CUSTOM_LONG_AVRO_TYPE = avro.types.LongType.using({ + fromBuffer: (buf) => { + const big = buf.readBigInt64LE(); + if (big > Number.MAX_SAFE_INTEGER) { + return big; + } + return Number(BigInt.asIntN(64, big)); + }, + toBuffer: (n) => { + const buf = Buffer.alloc(8); + if (n instanceof BigInt) { + buf.writeBigInt64LE(n); + } else { + buf.writeBigInt64LE(BigInt(n)); + } + return buf; + }, + fromJSON: BigInt, + toJSON: Number, + isValid: (n) => { + const type = typeof n; + return type === 'bigint' || type === 'number'; + }, + compare: (n1, n2) => { + return n1 === n2 ? 0 : n1 < n2 ? -1 : 1; + } +}); + /** * Client for the Salesforce Pub/Sub API * @alias PubSubApiClient @@ -285,7 +318,7 @@ export default class PubSubApiClient { // eslint-disable-next-line no-empty } catch (error) {} const message = replayId - ? `Failed to parse event with replay ID ${this.replayId}` + ? `Failed to parse event with replay ID ${replayId}` : `Failed to parse event with unknown replay ID (latest replay ID was ${latestReplayId})`; const parseError = new EventParseError( message, @@ -295,6 +328,7 @@ export default class PubSubApiClient { latestReplayId ); eventEmitter.emit('error', parseError); + this.#logger.error(parseError); } // Emit a 'lastevent' event when reaching the last requested event count if ( @@ -434,7 +468,9 @@ export default class PubSubApiClient { if (schemaError) { reject(schemaError); } else { - const schemaType = avro.parse(res.schemaJson); + const schemaType = avro.parse(res.schemaJson, { + registry: { long: CUSTOM_LONG_AVRO_TYPE } + }); this.#logger.info( `Topic schema loaded: ${topicName}` );