-
-
Notifications
You must be signed in to change notification settings - Fork 92
/
Copy pathupsert.js
111 lines (93 loc) · 3.61 KB
/
upsert.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
import { EJSON } from 'meteor/ejson'
import { CollectionHooks } from './collection-hooks'
const isEmpty = a => !Array.isArray(a) || !a.length
CollectionHooks.defineWrapper('upsert', async function (userId, _super, instance, hookGroup, getTransform, args, suppressHooks) {
args[0] = CollectionHooks.normalizeSelector(instance._getFindSelector(args))
const ctx = { context: this, _super, args }
let [selector, mutator, options, callback] = args
if (typeof options === 'function') {
callback = options
options = {}
}
const async = typeof callback === 'function'
let docs
let docIds
let abort
const prev = {}
if (!suppressHooks) {
if (!isEmpty(hookGroup.upsert.before) || !isEmpty(hookGroup.update.after)) {
const cursor = await CollectionHooks.getDocs.call(this, instance, selector, options)
docs = await cursor.fetch()
docIds = docs.map(doc => doc._id)
}
// copy originals for convenience for the 'after' pointcut
if (!isEmpty(hookGroup.update.after)) {
if (hookGroup.update.after.some(o => o.options.fetchPrevious !== false) &&
CollectionHooks.extendOptions(instance.hookOptions, {}, 'after', 'update').fetchPrevious !== false) {
prev.mutator = EJSON.clone(mutator)
prev.options = EJSON.clone(options)
prev.docs = {}
docs.forEach((doc) => {
prev.docs[doc._id] = EJSON.clone(doc)
})
}
}
// before
for (const fn of hookGroup.upsert.before) {
const r = await fn.hook.call(ctx, userId, selector, mutator, options)
if (r === false) abort = true
}
if (abort) return { numberAffected: 0 }
}
const afterUpdate = async (affected, err) => {
if (!suppressHooks && !isEmpty(hookGroup.update.after)) {
const fields = CollectionHooks.getFields(mutator)
const docs = await CollectionHooks.getDocs.call(this, instance, { _id: { $in: docIds } }, options).fetchAsync()
for (const o of hookGroup.update.after) {
for (const doc of docs) {
await o.hook.call({
transform: getTransform(doc),
previous: prev.docs && prev.docs[doc._id],
affected,
err,
...ctx
}, userId, doc, fields, prev.mutator, prev.options)
}
}
}
}
const afterInsert = async (_id, err) => {
if (!suppressHooks && !isEmpty(hookGroup.insert.after)) {
const docs = await CollectionHooks.getDocs.call(this, instance, { _id }, selector, {}).fetchAsync() // 3rd argument passes empty object which causes magic logic to imply limit:1
const doc = docs[0]
const lctx = { transform: getTransform(doc), _id, err, ...ctx }
for (const o of hookGroup.insert.after) {
await o.hook.call(lctx, userId, doc)
}
}
}
if (async) {
const wrappedCallback = async function (err, ret) {
const { insertedId, numberAffected } = (ret ?? {})
if (err || insertedId) {
// Send any errors to afterInsert
await afterInsert(insertedId, err)
} else {
await afterUpdate(numberAffected, err) // Note that err can never reach here
}
return CollectionHooks.hookedOp(function () {
return callback.call(this, err, ret)
})
}
return CollectionHooks.directOp(() => _super.call(this, selector, mutator, options, wrappedCallback))
} else {
const ret = await CollectionHooks.directOp(() => _super.call(this, selector, mutator, options, callback))
const { insertedId, numberAffected } = (ret ?? {})
if (insertedId) {
await afterInsert(insertedId)
} else {
await afterUpdate(numberAffected)
}
return ret
}
})