[!INFO] This documentation is kept to support a legacy version. Please consider upgrading to the latest version.
Install the client library with npm install salesforce-pubsub-api-client
.
Create a .env
file at the root of the project for configuration.
Pick one of these authentication flows and fill the relevant configuration:
- User supplied authentication
- Username/password authentication (recommended for tests)
- OAuth 2.0 client credentials
- OAuth 2.0 JWT Bearer (recommended for production)
Tip
The default client logger is fine for a test environment but you'll want to switch to a custom logger with asynchronous logging for increased performance.
If you already have a Salesforce client in your app, you can reuse its authentication information. You only need this minimal configuration:
SALESFORCE_AUTH_TYPE=user-supplied
PUB_SUB_ENDPOINT=api.pubsub.salesforce.com:7443
When connecting to the Pub/Sub API, use the following method instead of the standard connect()
method to specify authentication information:
await client.connectWithAuth(accessToken, instanceUrl, organizationId);
Warning
Relying on a username/password authentication flow for production is not recommended. Consider switching to JWT auth for extra security.
SALESFORCE_AUTH_TYPE=username-password
SALESFORCE_LOGIN_URL=https://login.salesforce.com
SALESFORCE_USERNAME=YOUR_SALESFORCE_USERNAME
SALESFORCE_PASSWORD=YOUR_SALESFORCE_PASSWORD
SALESFORCE_TOKEN=YOUR_SALESFORCE_USER_SECURITY_TOKEN
PUB_SUB_ENDPOINT=api.pubsub.salesforce.com:7443
SALESFORCE_AUTH_TYPE=oauth-client-credentials
SALESFORCE_LOGIN_URL=YOUR_DOMAIN_URL
SALESFORCE_CLIENT_ID=YOUR_CONNECTED_APP_CLIENT_ID
SALESFORCE_CLIENT_SECRET=YOUR_CONNECTED_APP_CLIENT_SECRET
PUB_SUB_ENDPOINT=api.pubsub.salesforce.com:7443
This is the most secure authentication option. Recommended for production use.
SALESFORCE_AUTH_TYPE=oauth-jwt-bearer
SALESFORCE_LOGIN_URL=https://login.salesforce.com
SALESFORCE_CLIENT_ID=YOUR_CONNECTED_APP_CLIENT_ID
SALESFORCE_USERNAME=YOUR_SALESFORCE_USERNAME
SALESFORCE_PRIVATE_KEY_FILE=PATH_TO_YOUR_KEY_FILE
PUB_SUB_ENDPOINT=api.pubsub.salesforce.com:7443
Here's an example that will get you started quickly. It listens to a single account change event.
-
Activate Account change events in Salesforce Setup > Change Data Capture.
-
Create a
sample.js
file with this content:import PubSubApiClient from 'salesforce-pubsub-api-client'; async function run() { try { const client = new PubSubApiClient(); await client.connect(); // Subscribe to account change events const eventEmitter = await client.subscribe( '/data/AccountChangeEvent' ); // Handle incoming events eventEmitter.on('data', (event) => { console.log( `Handling ${event.payload.ChangeEventHeader.entityName} change event ` + `with ID ${event.replayId} ` + `on channel ${eventEmitter.getTopicName()} ` + `(${eventEmitter.getReceivedEventCount()}/${eventEmitter.getRequestedEventCount()} ` + `events received so far)` ); // Safely log event as a JSON string console.log( JSON.stringify( event, (key, value) => /* Convert BigInt values into strings and keep other types unchanged */ typeof value === 'bigint' ? value.toString() : value, 2 ) ); }); } catch (error) { console.error(error); } } run();
-
Run the project with
node sample.js
If everything goes well, you'll see output like this:
Connected to Salesforce org https://pozil-dev-ed.my.salesforce.com as grpc@pozil.com Connected to Pub/Sub API endpoint api.pubsub.salesforce.com:7443 Topic schema loaded: /data/AccountChangeEvent Subscribe request sent for 100 events from /data/AccountChangeEvent...
At this point the script will be on hold and will wait for events.
-
Modify an account record in Salesforce. This fires an account change event.
Once the client receives an event, it will display it like this:
Received 1 events, latest replay ID: 18098167 Handling Account change event with ID 18098167 on channel /data/AccountChangeEvent (1/100 events received so far) { "replayId": 18098167, "payload": { "ChangeEventHeader": { "entityName": "Account", "recordIds": [ "0014H00002LbR7QQAV" ], "changeType": "UPDATE", "changeOrigin": "com/salesforce/api/soap/58.0;client=SfdcInternalAPI/", "transactionKey": "000046c7-a642-11e2-c29b-229c6786473e", "sequenceNumber": 1, "commitTimestamp": 1696444513000, "commitNumber": 11657372702432, "commitUser": "00558000000yFyDAAU", "nulledFields": [], "diffFields": [], "changedFields": [ "LastModifiedDate", "BillingAddress.City", "BillingAddress.State" ] }, "Name": null, "Type": null, "ParentId": null, "BillingAddress": { "Street": null, "City": "San Francisco", "State": "CA", "PostalCode": null, "Country": null, "StateCode": null, "CountryCode": null, "Latitude": null, "Longitude": null, "Xyz": null, "GeocodeAccuracy": null }, "ShippingAddress": null, "Phone": null, "Fax": null, "AccountNumber": null, "Website": null, "Sic": null, "Industry": null, "AnnualRevenue": null, "NumberOfEmployees": null, "Ownership": null, "TickerSymbol": null, "Description": null, "Rating": null, "Site": null, "OwnerId": null, "CreatedDate": null, "CreatedById": null, "LastModifiedDate": 1696444513000, "LastModifiedById": null, "Jigsaw": null, "JigsawCompanyId": null, "CleanStatus": null, "AccountSource": null, "DunsNumber": null, "Tradestyle": null, "NaicsCode": null, "NaicsDesc": null, "YearStarted": null, "SicDesc": null, "DandbCompanyId": null } }
Note that the change event payloads include all object fields but fields that haven't changed are null. In the above example, the only changes are the Billing State, Billing City and Last Modified Date.
Use the values from
ChangeEventHeader.nulledFields
,ChangeEventHeader.diffFields
andChangeEventHeader.changedFields
to identify actual value changes.
Publish a Sample__e
Platform Event with a Message__c
field:
const payload = {
CreatedDate: new Date().getTime(), // Non-null value required but there's no validity check performed on this field
CreatedById: '005_________', // Valid user ID
Message__c: { string: 'Hello world' } // Field is nullable so we need to specify the 'string' type
};
const publishResult = await client.publish('/event/Sample__e', payload);
console.log('Published event: ', JSON.stringify(publishResult));
Subscribe to 5 account change events starting from a replay ID:
const eventEmitter = await client.subscribeFromReplayId(
'/data/AccountChangeEvent',
5,
17092989
);
Subscribe to the 3 earliest past account change events in retention window:
const eventEmitter = await client.subscribeFromEarliestEvent(
'/data/AccountChangeEvent',
3
);
When working with high volumes of events you can control the incoming flow of events by requesting a limited batch of events. This event flow control ensures that the client doesn’t get overwhelmed by accepting more events that it can handle if there is a spike in event publishing.
This is the overall process:
- Pass a number of requested events in your subscribe call.
- Handle the
lastevent
event fromPubSubEventEmitter
to detect the end of the event batch. - Subscribe to an additional batch of events with
client.requestAdditionalEvents(...)
. If you don't request additional events at this point, the gRPC subscription will close automatically (default Pub/Sub API behavior).
The code below illustrate how you can achieve event flow control:
try {
// Connect with the Pub/Sub API
const client = new PubSubApiClient();
await client.connect();
// Subscribe to a batch of 10 account change event
const eventEmitter = await client.subscribe('/data/AccountChangeEvent', 10);
// Handle incoming events
eventEmitter.on('data', (event) => {
// Logic for handling a single event.
// Unless you request additional events later, this should get called up to 10 times
// given the initial subscription boundary.
});
// Handle last requested event
eventEmitter.on('lastevent', () => {
console.log(
`Reached last requested event on channel ${eventEmitter.getTopicName()}.`
);
// Request 10 additional events
client.requestAdditionalEvents(eventEmitter, 10);
});
} catch (error) {
console.error(error);
}
Use the EventEmmitter
returned by subscribe methods to handle gRPC stream lifecycle events:
// Stream end
eventEmitter.on('end', () => {
console.log('gRPC stream ended');
});
// Stream error
eventEmitter.on('error', (error) => {
console.error('gRPC stream error: ', JSON.stringify(error));
});
// Stream status update
eventEmitter.on('status', (status) => {
console.log('gRPC stream status: ', status);
});
The client logs output to the console by default but you can provide your favorite logger in the client constructor.
When in production, asynchronous logging is preferable for performance reasons.
For example:
import pino from 'pino';
const logger = pino();
const client = new PubSubApiClient(logger);
If you attempt to call JSON.stringify
on an event you will likely see the following error:
TypeError: Do not know how to serialize a BigInt
This happens when an integer value stored in an event field exceeds the range of the Number
JS type (this typically happens with commitNumber
values). In this case, we use a BigInt
type to safely store the integer value. However, the BigInt
type is not yet supported in standard JSON representation (see step 10 in the BigInt TC39 spec) so this triggers a TypeError
.
To avoid this error, use a replacer function to safely escape BigInt values so that they can be serialized as a string (or any other format of your choice) in JSON:
// Safely log event as a JSON string
console.log(
JSON.stringify(
event,
(key, value) =>
/* Convert BigInt values into strings and keep other types unchanged */
typeof value === 'bigint' ? value.toString() : value,
2
)
);
Client for the Salesforce Pub/Sub API
Builds a new Pub/Sub API client.
Name | Type | Description |
---|---|---|
logger |
Logger | an optional custom logger. The client uses the console if no value is supplied. |
Closes the gRPC connection. The client will no longer receive events for any topic.
Authenticates with Salesforce then, connects to the Pub/Sub API.
Returns: Promise that resolves once the connection is established.
Connects to the Pub/Sub API with user-supplied authentication.
Returns: Promise that resolves once the connection is established.
Name | Type | Description |
---|---|---|
accessToken |
string | Salesforce access token |
instanceUrl |
string | Salesforce instance URL |
organizationId |
string | optional organization ID. If you don't provide one, we'll attempt to parse it from the accessToken. |
Get connectivity state from current channel.
Returns: Promise that holds channel's connectivity state.
Publishes a payload to a topic using the gRPC client.
Returns: Promise holding a PublishResult
object with replayId
and correlationKey
.
Name | Type | Description |
---|---|---|
topicName |
string | name of the topic that we're subscribing to |
payload |
Object | |
correlationKey |
string | optional correlation key. If you don't provide one, we'll generate a random UUID for you. |
Subscribes to a topic.
Returns: Promise that holds an PubSubEventEmitter
that allows you to listen to received events and stream lifecycle events.
Name | Type | Description |
---|---|---|
topicName |
string | name of the topic that we're subscribing to |
numRequested |
number | optional number of events requested. If not supplied or null, the client keeps the subscription alive forever. |
Subscribes to a topic and retrieves all past events in retention window.
Returns: Promise that holds an PubSubEventEmitter
that allows you to listen to received events and stream lifecycle events.
Name | Type | Description |
---|---|---|
topicName |
string | name of the topic that we're subscribing to |
numRequested |
number | optional number of events requested. If not supplied or null, the client keeps the subscription alive forever. |
Subscribes to a topic and retrieves past events starting from a replay ID.
Returns: Promise that holds an PubSubEventEmitter
that allows you to listen to received events and stream lifecycle events.
Name | Type | Description |
---|---|---|
topicName |
string | name of the topic that we're subscribing to |
numRequested |
number | number of events requested. If null , the client keeps the subscription alive forever. |
replayId |
number | replay ID |
Request additional events on an existing subscription.
Name | Type | Description |
---|---|---|
eventEmitter |
PubSubEventEmitter | event emitter that was obtained in the first subscribe call |
numRequested |
number | number of events requested. |
EventEmitter wrapper for processing incoming Pub/Sub API events while keeping track of the topic name and the volume of events requested/received.
The emitter sends the following events:
Event Name | Event Data | Description |
---|---|---|
data |
Object | Client received a new event. The attached data is the parsed event data. |
error |
EventParseError | Object |
Signals an event parsing error or a gRPC stream error. |
lastevent |
void | Signals that we received the last event that the client requested. The stream will end shortly. |
keepalive |
{ latestReplayId: number, pendingNumRequested: number } |
Server publishes this keep alive message every 270 seconds (or less) if there are no events. |
end |
void | Signals the end of the gRPC stream. |
status |
Object | Misc gRPC stream status information. |
The emitter also exposes these methods:
Method | Description |
---|---|
getRequestedEventCount() |
Returns the number of events that were requested when subscribing. |
getReceivedEventCount() |
Returns the number of events that were received since subscribing. |
getTopicName() |
Returns the topic name for this subscription. |
getLatestReplayId() |
Returns the replay ID of the last processed event or null if no event was processed yet. |
Holds the information related to an event parsing error. This class attempts to extract the event replay ID from the event that caused the error.
Name | Type | Description |
---|---|---|
message |
string | The error message. |
cause |
Error | The cause of the error. |
replayId |
number | The replay ID of the event at the origin of the error. Could be undefined if we're not able to extract it from the event data. |
event |
Object | The un-parsed event data at the origin of the error. |
latestReplayId |
number | The latest replay ID that was received before the error. |