I am using node v16.14.0.
The below is a node script that is trying to insert message data into Postgres db after pulling a message from a platform event, but some how it is not able to.
It hits the line "let client = await pool.connect();" and continues with the next message. It never inserts into the table.
I am not sure what I am doing wrong here?
const jsforce = require("jsforce");
const { Pool }= require("pg");
const channel = "/event/Short_Url_Click_Notification__e";
let replayId = -2 // -1 = Only New messages | -2 = All Window and New
console.log('db = ' +process.env.DATABASE_URL);
console.log('sfu= ' +process.env.SF_USERNAME);
console.log('sfp= ' +process.env.SF_PASSWORD);
// sf connection
const sfConnection = new jsforce.Connection();
const pool = new Pool({
connectionString: process.env.DATABASE_URL,
ssl:{
rejectUnauthorized: false
}
});
try {
pool.query("SELECT replay_id, date_str, channel FROM platform_event_setting WHERE channel = $1 LIMIT 1", [channel], (err, res) => {
if (err) {
console.error("Unable to Query platform_event_setting: ", err);
pgClient.end();
process.exit(1);
}
if (res.rowCount > 0) {
replayId = res.rows[0].replay_id;
}
sfConnection.login(process.env.SF_USERNAME, process.env.SF_PASSWORD, (err, res) => {
if (err) {
console.error(err);
return process.exit(1);
}
const streamingClient = sfConnection.streaming.createClient([
new jsforce.StreamingExtension.Replay(channel, replayId),
new jsforce.StreamingExtension.AuthFailure(function () {
return process.exit(1);
}),
]);
subscription = streamingClient.subscribe(channel, async (message) => {
try {
console.log("received data", JSON.stringify(message));
event_values = [channel, message.event.replayId, message.payload.CreatedDate];
let client = await pool.connect();
client.query("BEGIN");
try {
// process message
await client.query(`INSERT INTO platform_event_setting
(channel, replay_id, date_str)
VALUES ($1, $2, $3)
ON CONFLICT ON CONSTRAINT platform_event_setting_pkey
DO UPDATE SET (replay_id, date_str) = (EXCLUDED.replay_id, EXCLUDED.date_str)
`, event_values);
client.query("COMMIT");
} catch (e) {
console.error(e);
client.query("ROLLBACK");
return process.exit(1);
}
} finally {
client.release();
}
}); // subscritpion
}); // sf connection
}); // pool
} catch (e) {
console.error(e);
}
The above script pulls a message from Salesforce platform events bus and trying to insert into a Postgres database.