Skip to content

Commit

Permalink
Merge pull request #370 from timgit/maint-crash-prevention
Browse files Browse the repository at this point in the history
crash prevention on connection loss
  • Loading branch information
timgit authored Mar 3, 2023
2 parents 32f5ba2 + 7bcfeb4 commit 5479fac
Show file tree
Hide file tree
Showing 7 changed files with 295 additions and 207 deletions.
367 changes: 193 additions & 174 deletions package-lock.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "pg-boss",
"version": "8.4.0",
"version": "8.4.1",
"description": "Queueing jobs in Node.js using PostgreSQL like a boss",
"main": "./src/index.js",
"engines": {
Expand Down
10 changes: 5 additions & 5 deletions src/manager.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ class Manager extends EventEmitter {
}

async sendOnce (name, data, options, key) {
options = options || {}
options = options ? { ...options } : {}

options.singletonKey = key || name

Expand All @@ -351,7 +351,7 @@ class Manager extends EventEmitter {
}

async sendSingleton (name, data, options) {
options = options || {}
options = options ? { ...options } : {}

options.singletonKey = SINGLETON_QUEUE_KEY

Expand All @@ -361,7 +361,7 @@ class Manager extends EventEmitter {
}

async sendAfter (name, data, options, after) {
options = options || {}
options = options ? { ...options } : {}
options.startAfter = after

const result = Attorney.checkSendArgs([name, data, options], this.config)
Expand All @@ -370,7 +370,7 @@ class Manager extends EventEmitter {
}

async sendThrottled (name, data, options, seconds, key) {
options = options || {}
options = options ? { ...options } : {}
options.singletonSeconds = seconds
options.singletonNextSlot = false
options.singletonKey = key
Expand All @@ -381,7 +381,7 @@ class Manager extends EventEmitter {
}

async sendDebounced (name, data, options, seconds, key) {
options = options || {}
options = options ? { ...options } : {}
options.singletonSeconds = seconds
options.singletonNextSlot = true
options.singletonKey = key
Expand Down
44 changes: 31 additions & 13 deletions src/timekeeper.js
Original file line number Diff line number Diff line change
Expand Up @@ -88,29 +88,47 @@ class Timekeeper extends EventEmitter {
}

async monitorCron () {
const { secondsAgo } = await this.getCronTime()
try {
if (this.config.__test__force_cron_monitoring_error) {
throw new Error(this.config.__test__force_cron_monitoring_error)
}

if (secondsAgo > 60) {
await this.checkSchedulesAsync()
const { secondsAgo } = await this.getCronTime()

if (secondsAgo > 60) {
await this.checkSchedulesAsync()
}
} catch (err) {
this.emit(this.events.error, err)
}
}

async cacheClockSkew () {
const { rows } = await this.db.executeSql(this.getTimeCommand)
let skew = 0

try {
if (this.config.__test__force_clock_monitoring_error) {
throw new Error(this.config.__test__force_clock_monitoring_error)
}

const local = Date.now()
const { rows } = await this.db.executeSql(this.getTimeCommand)

const dbTime = parseFloat(rows[0].time)
const local = Date.now()

const skew = dbTime - local
const dbTime = parseFloat(rows[0].time)

const skewSeconds = Math.abs(skew) / 1000
skew = dbTime - local

if (skewSeconds >= 60 || this.config.__test__force_clock_skew_warning) {
Attorney.warnClockSkew(`Instance clock is ${skewSeconds}s ${skew > 0 ? 'slower' : 'faster'} than database.`)
}
const skewSeconds = Math.abs(skew) / 1000

this.clockSkew = skew
if (skewSeconds >= 60 || this.config.__test__force_clock_skew_warning) {
Attorney.warnClockSkew(`Instance clock is ${skewSeconds}s ${skew > 0 ? 'slower' : 'faster'} than database.`)
}
} catch (err) {
this.emit(this.events.error, err)
} finally {
this.clockSkew = skew
}
}

async checkSchedulesAsync () {
Expand All @@ -128,7 +146,7 @@ class Timekeeper extends EventEmitter {

try {
if (this.config.__test__throw_clock_monitoring) {
throw new Error('clock monitoring error')
throw new Error(this.config.__test__throw_clock_monitoring)
}

const items = await this.getSchedules()
Expand Down
29 changes: 18 additions & 11 deletions test/backgroundErrorTest.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
const assert = require('assert')
const PgBoss = require('../')
const delay = require('delay')

describe('background processing error handling', function () {
it('maintenance error handling works', async function () {
Expand Down Expand Up @@ -52,20 +54,25 @@ describe('background processing error handling', function () {
})

it('clock monitoring error handling works', async function () {
const config = { ...this.test.bossConfig, __test__throw_clock_monitoring: true }
const boss = this.test.boss = new PgBoss(config)
const config = {
...this.test.bossConfig,
clockMonitorIntervalSeconds: 1,
__test__throw_clock_monitoring: 'pg-boss mock error: clock monitoring'
}

return new Promise((resolve) => {
let resolved = false
let errorCount = 0

boss.on('error', () => {
if (!resolved) {
resolved = true
resolve()
}
})
const boss = this.test.boss = new PgBoss(config)

boss.start().then(() => {})
boss.once('error', (error) => {
assert.strictEqual(error.message, config.__test__throw_clock_monitoring)
errorCount++
})

await boss.start()

await delay(8000)

assert.strictEqual(errorCount, 1)
})
})
4 changes: 1 addition & 3 deletions test/readme.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ async function readme () {
const PgBoss = require('../src')
const boss = new PgBoss(helper.getConnectionString())

boss.on('error', error => {
console.error(error)
})
boss.on('error', console.error)

await boss.start()

Expand Down
46 changes: 46 additions & 0 deletions test/scheduleTest.js
Original file line number Diff line number Diff line change
Expand Up @@ -189,4 +189,50 @@ describe('schedule', function () {

assert.strictEqual(warningCount, 1)
})

it('errors during clock skew monitoring should emit', async function () {
const config = {
...this.test.bossConfig,
clockMonitorIntervalSeconds: 1,
__test__force_clock_monitoring_error: 'pg-boss mock error: clock skew monitoring'
}

let errorCount = 0

const boss = this.test.boss = new PgBoss(config)

boss.once('error', error => {
assert.strictEqual(error.message, config.__test__force_clock_monitoring_error)
errorCount++
})

await boss.start()

await delay(2000)

assert.strictEqual(errorCount, 1)
})

it('errors during cron monitoring should emit', async function () {
const config = {
...this.test.bossConfig,
cronMonitorIntervalSeconds: 1,
__test__force_cron_monitoring_error: 'pg-boss mock error: cron monitoring'
}

let errorCount = 0

const boss = this.test.boss = new PgBoss(config)

boss.once('error', error => {
assert.strictEqual(error.message, config.__test__force_cron_monitoring_error)
errorCount++
})

await boss.start()

await delay(2000)

assert.strictEqual(errorCount, 1)
})
})

0 comments on commit 5479fac

Please sign in to comment.