Skip to content

Commit

Permalink
build: release 3.4.0
Browse files Browse the repository at this point in the history
  • Loading branch information
pozil committed Dec 21, 2023
1 parent c6919fe commit 06d2973
Show file tree
Hide file tree
Showing 18 changed files with 607 additions and 445 deletions.
319 changes: 118 additions & 201 deletions README.md

Large diffs are not rendered by default.

215 changes: 141 additions & 74 deletions dist/client.cjs
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ module.exports = __toCommonJS(client_exports);
var import_crypto2 = __toESM(require("crypto"), 1);
var import_fs2 = __toESM(require("fs"), 1);
var import_url = require("url");
var import_avro_js2 = __toESM(require("avro-js"), 1);
var import_avro_js3 = __toESM(require("avro-js"), 1);
var import_certifi = __toESM(require("certifi"), 1);
var import_grpc_js = __toESM(require("@grpc/grpc-js"), 1);
var import_proto_loader = __toESM(require("@grpc/proto-loader"), 1);

// src/eventParseError.js
// src/utils/eventParseError.js
var EventParseError = class extends Error {
/**
* The cause of the error.
Expand Down Expand Up @@ -87,12 +87,13 @@ var EventParseError = class extends Error {
}
};

// src/pubSubEventEmitter.js
// src/utils/pubSubEventEmitter.js
var import_events = require("events");
var PubSubEventEmitter = class extends import_events.EventEmitter {
#topicName;
#requestedEventCount;
#receivedEventCount;
#latestReplayId;
/**
* Create a new EventEmitter for Pub/Sub API events
* @param {string} topicName
Expand All @@ -104,36 +105,85 @@ var PubSubEventEmitter = class extends import_events.EventEmitter {
this.#topicName = topicName;
this.#requestedEventCount = requestedEventCount;
this.#receivedEventCount = 0;
this.#latestReplayId = null;
}
emit(eventName, args) {
if (eventName === "data") {
this.#receivedEventCount++;
this.#latestReplayId = args.replayId;
}
return super.emit(eventName, args);
}
/**
* Returns the number of events that were requested during the subscription
* Returns the number of events that were requested when subscribing.
* @returns {number} the number of events that were requested
*/
getRequestedEventCount() {
return this.#requestedEventCount;
}
/**
* Returns the number of events that were received since the subscription
* Returns the number of events that were received since subscribing.
* @returns {number} the number of events that were received
*/
getReceivedEventCount() {
return this.#receivedEventCount;
}
/**
* Returns the topic name for this subscription
* Returns the topic name for this subscription.
* @returns {string} the topic name
*/
getTopicName() {
return this.#topicName;
}
/**
* Returns the replay ID of the last processed event or null if no event was processed yet.
* @return {number} replay ID
*/
getLatestReplayId() {
return this.#latestReplayId;
}
/**
* @protected
* Resets the requested/received event counts.
* This method should only be be used internally by the client when it resubscribes.
* @param {number} newRequestedEventCount
*/
_resetEventCount(newRequestedEventCount) {
this.#requestedEventCount = newRequestedEventCount;
this.#receivedEventCount = 0;
}
};

// src/utils/avroHelper.js
var import_avro_js = __toESM(require("avro-js"), 1);
var CustomLongAvroType = import_avro_js.default.types.LongType.using({
fromBuffer: (buf) => {
const big = buf.readBigInt64LE();
if (big < Number.MIN_SAFE_INTEGER || big > Number.MAX_SAFE_INTEGER) {
return big;
}
return Number(BigInt.asIntN(64, big));
},
toBuffer: (n) => {
const buf = Buffer.allocUnsafe(8);
if (n instanceof BigInt) {
buf.writeBigInt64LE(n);
} else {
buf.writeBigInt64LE(BigInt(n));
}
return buf;
},
fromJSON: BigInt,
toJSON: Number,
isValid: (n) => {
const type = typeof n;
return type === "number" && n % 1 === 0 || type === "bigint";
},
compare: (n1, n2) => {
return n1 === n2 ? 0 : n1 < n2 ? -1 : 1;
}
});

// src/utils/configuration.js
var dotenv = __toESM(require("dotenv"), 1);
var import_fs = __toESM(require("fs"), 1);
Expand Down Expand Up @@ -230,7 +280,7 @@ var Configuration = class _Configuration {
};

// src/utils/eventParser.js
var import_avro_js = __toESM(require("avro-js"), 1);
var import_avro_js2 = __toESM(require("avro-js"), 1);
function parseEvent(schema, event) {
const allFields = schema.type.getFields();
const replayId = decodeReplayId(event.replayId);
Expand Down Expand Up @@ -313,7 +363,7 @@ function getChildFields(parentField) {
const types = parentField._type.getTypes();
let fields = [];
types.forEach((type) => {
if (type instanceof import_avro_js.default.types.RecordType) {
if (type instanceof import_avro_js2.default.types.RecordType) {
fields = fields.concat(type.getFields());
}
});
Expand Down Expand Up @@ -477,33 +527,7 @@ function base64url(input) {
}

// src/client.js
var CUSTOM_LONG_AVRO_TYPE = import_avro_js2.default.types.LongType.using({
fromBuffer: (buf) => {
const big = buf.readBigInt64LE();
if (big < Number.MIN_SAFE_INTEGER || big > Number.MAX_SAFE_INTEGER) {
return big;
}
return Number(BigInt.asIntN(64, big));
},
toBuffer: (n) => {
const buf = Buffer.allocUnsafe(8);
if (n instanceof BigInt) {
buf.writeBigInt64LE(n);
} else {
buf.writeBigInt64LE(BigInt(n));
}
return buf;
},
fromJSON: BigInt,
toJSON: Number,
isValid: (n) => {
const type = typeof n;
return type === "number" && n % 1 === 0 || type === "bigint";
},
compare: (n1, n2) => {
return n1 === n2 ? 0 : n1 < n2 ? -1 : 1;
}
});
var MAX_EVENT_BATCH_SIZE = 100;
var PubSubApiClient = class {
/**
* gRPC client
Expand All @@ -515,14 +539,20 @@ var PubSubApiClient = class {
* @type {Map<string,Schema>}
*/
#schemaChache;
/**
* Map of subscribitions indexed by topic name
* @type {Map<string,Object>}
*/
#subscriptions;
#logger;
/**
* Builds a new Pub/Sub API client
* @param {Logger} logger an optional custom logger. The client uses the console if no value is supplied.
* @param {Logger} [logger] an optional custom logger. The client uses the console if no value is supplied.
*/
constructor(logger = console) {
this.#logger = logger;
this.#schemaChache = /* @__PURE__ */ new Map();
this.#subscriptions = /* @__PURE__ */ new Map();
try {
Configuration.load();
} catch (error) {
Expand Down Expand Up @@ -636,21 +666,21 @@ var PubSubApiClient = class {
/**
* Subscribes to a topic and retrieves all past events in retention window.
* @param {string} topicName name of the topic that we're subscribing to
* @param {number} numRequested number of events requested
* @param {number} [numRequested] optional number of events requested. If not supplied or null, the client keeps the subscription alive forever.
* @returns {Promise<EventEmitter>} Promise that holds an emitter that allows you to listen to received events and stream lifecycle events
* @memberof PubSubApiClient.prototype
*/
async subscribeFromEarliestEvent(topicName, numRequested) {
async subscribeFromEarliestEvent(topicName, numRequested = null) {
return this.#subscribe({
topicName,
numRequested,
replayPreset: 1
});
}
/**
* Subscribes to a topic and retrieve past events starting from a replay ID
* Subscribes to a topic and retrieves past events starting from a replay ID.
* @param {string} topicName name of the topic that we're subscribing to
* @param {number} numRequested number of events requested
* @param {number} numRequested number of events requested. If null, the client keeps the subscription alive forever.
* @param {number} replayId replay ID
* @returns {Promise<EventEmitter>} Promise that holds an emitter that allows you to listen to received events and stream lifecycle events
* @memberof PubSubApiClient.prototype
Expand All @@ -664,13 +694,13 @@ var PubSubApiClient = class {
});
}
/**
* Subscribes to a topic
* Subscribes to a topic.
* @param {string} topicName name of the topic that we're subscribing to
* @param {number} numRequested number of events requested
* @param {number} [numRequested] optional number of events requested. If not supplied or null, the client keeps the subscription alive forever.
* @returns {Promise<EventEmitter>} Promise that holds an emitter that allows you to listen to received events and stream lifecycle events
* @memberof PubSubApiClient.prototype
*/
async subscribe(topicName, numRequested) {
async subscribe(topicName, numRequested = null) {
return this.#subscribe({
topicName,
numRequested
Expand All @@ -682,28 +712,44 @@ var PubSubApiClient = class {
* @return {PubSubEventEmitter} emitter that allows you to listen to received events and stream lifecycle events
*/
async #subscribe(subscribeRequest) {
let { topicName, numRequested } = subscribeRequest;
try {
if (typeof subscribeRequest.numRequested !== "number") {
throw new Error(
`Expected a number type for number of requested events but got ${typeof subscribeRequest.numRequested}`
);
}
if (!Number.isSafeInteger(subscribeRequest.numRequested) || subscribeRequest.numRequested < 1) {
throw new Error(
`Expected an integer greater than 1 for number of requested events but got ${subscribeRequest.numRequested}`
);
let isInfiniteEventRequest = false;
if (numRequested === null) {
isInfiniteEventRequest = true;
subscribeRequest.numRequested = numRequested = MAX_EVENT_BATCH_SIZE;
} else {
if (typeof numRequested !== "number") {
throw new Error(
`Expected a number type for number of requested events but got ${typeof numRequested}`
);
}
if (!Number.isSafeInteger(numRequested) || numRequested < 1) {
throw new Error(
`Expected an integer greater than 1 for number of requested events but got ${numRequested}`
);
}
if (numRequested > MAX_EVENT_BATCH_SIZE) {
this.#logger.warn(
`The number of requested events for ${topicName} exceeds max event batch size (${MAX_EVENT_BATCH_SIZE}).`
);
}
}
if (!this.#client) {
throw new Error("Pub/Sub API client is not connected.");
}
const subscription = this.#client.Subscribe();
let subscription = this.#subscriptions.get(topicName);
if (!subscription) {
subscription = this.#client.Subscribe();
this.#subscriptions.set(topicName, subscription);
}
subscription.write(subscribeRequest);
this.#logger.info(
`Subscribe request sent for ${subscribeRequest.numRequested} events from ${subscribeRequest.topicName}...`
`Subscribe request sent for ${numRequested} events from ${topicName}...`
);
const eventEmitter = new PubSubEventEmitter(
subscribeRequest.topicName,
subscribeRequest.numRequested
topicName,
numRequested
);
subscription.on("data", (data) => {
const latestReplayId = decodeReplayId(data.latestReplayId);
Expand All @@ -713,19 +759,13 @@ var PubSubApiClient = class {
);
data.events.forEach(async (event) => {
try {
let schema = await this.#getEventSchema(
subscribeRequest.topicName
);
if (schema.id !== event.schemaId) {
let schema = await this.#getEventSchema(topicName);
if (schema.id !== event.event.schemaId) {
this.#logger.info(
`Event schema changed, reloading: ${subscribeRequest.topicName}`
);
this.#clearEventSchemaFromCache(
subscribeRequest.topicName
);
schema = await this.#getEventSchema(
subscribeRequest.topicName
`Event schema changed (${schema.id} != ${event.event.schemaId}), reloading: ${topicName}`
);
this.#schemaChache.delete(topicName);
schema = await this.#getEventSchema(topicName);
}
const parsedEvent = parseEvent(schema, event);
this.#logger.debug(parsedEvent);
Expand All @@ -748,7 +788,14 @@ var PubSubApiClient = class {
this.#logger.error(parseError);
}
if (eventEmitter.getReceivedEventCount() === eventEmitter.getRequestedEventCount()) {
eventEmitter.emit("lastevent");
if (isInfiniteEventRequest) {
this.requestAdditionalEvents(
eventEmitter,
MAX_EVENT_BATCH_SIZE
);
} else {
eventEmitter.emit("lastevent");
}
}
});
} else {
Expand All @@ -760,6 +807,7 @@ var PubSubApiClient = class {
}
});
subscription.on("end", () => {
this.#subscriptions.delete(topicName);
this.#logger.info("gRPC stream ended");
eventEmitter.emit("end");
});
Expand All @@ -778,11 +826,33 @@ var PubSubApiClient = class {
return eventEmitter;
} catch (error) {
throw new Error(
`Failed to subscribe to events for topic ${subscribeRequest.topicName}`,
`Failed to subscribe to events for topic ${topicName}`,
{ cause: error }
);
}
}
/**
* Request additional events on an existing subscription.
* @param {PubSubEventEmitter} eventEmitter event emitter that was obtained in the first subscribe call
* @param {number} numRequested number of events requested.
*/
async requestAdditionalEvents(eventEmitter, numRequested) {
const topicName = eventEmitter.getTopicName();
const subscription = this.#subscriptions.get(topicName);
if (!subscription) {
throw new Error(
`Failed to request additional events for topic ${topicName}, no active subscription found.`
);
}
eventEmitter._resetEventCount(numRequested);
subscription.write({
topicName,
numRequested
});
this.#logger.debug(
`Resubscribing to a batch of ${numRequested} events for: ${topicName}`
);
}
/**
* Publishes a payload to a topic using the gRPC client.
* @param {string} topicName name of the topic that we're subscribing to
Expand Down Expand Up @@ -874,8 +944,8 @@ var PubSubApiClient = class {
if (schemaError) {
reject(schemaError);
} else {
const schemaType = import_avro_js2.default.parse(res.schemaJson, {
registry: { long: CUSTOM_LONG_AVRO_TYPE }
const schemaType = import_avro_js3.default.parse(res.schemaJson, {
registry: { long: CustomLongAvroType }
});
this.#logger.info(
`Topic schema loaded: ${topicName}`
Expand All @@ -890,7 +960,4 @@ var PubSubApiClient = class {
});
});
}
#clearEventSchemaFromCache(topicName) {
this.#schemaChache.delete(topicName);
}
};
Loading

0 comments on commit 06d2973

Please sign in to comment.