CopyFail is never sent to DB #115
From what I can tell, the copy-in PG stream is closed in node-pg-copy-streams/copy-from.js, lines 42 to 56 (at 543cd9a). However, my understanding is that CopyFail is never sent there.
For my own use case, I was able to work around this with the following:

```js
const copyStream = require('pg-copy-streams').from(sql);
copyStream._destroy = function (err, callback) {
  this._final(() => callback(err));
};
```

This will complete the copy-in with a CopyDone.
Hello, we should try and see in the future how best to implement / test / document how to bail out of the COPY operation with a copyFail. As a workaround (even though the module was not designed for this), it could maybe be sufficient to call connection.sendCopyFail() on the underlying connection. Don't hesitate to tell me if you have time to test this.
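A minimal sketch of that workaround, assuming the stream has already been submitted via client.query() so that its connection field is populated (the table name and COPY statement are only illustrative):

```js
const { Client } = require('pg')
const { from: copyFrom } = require('pg-copy-streams')

async function abortableCopy () {
  const client = new Client()
  await client.connect()

  // `connection` is only set on the stream once it has been submitted to the client
  const copyStream = client.query(copyFrom('COPY my_table FROM STDIN'))

  // ... pipe data into copyStream ...

  // to bail out instead of finishing: ask postgres to abort / roll back the COPY
  copyStream.connection.sendCopyFail('aborting the COPY')
}
```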
I think we're running into this issue as well. In our case, we want to reuse the same client retrieved from a pool, then loop through a set of files in AWS S3 and pipe a read stream directly into pg. This works wonderfully 99% of the time, but sometimes the AWS SDK throws an error and times out the read stream. We can catch the error and log it via the stream.pipeline() we've set up between these streams, but the pg client stays open and shows up as an active query that we have to terminate manually. Fortunately this doesn't happen very often with S3, but I will try out the code posted above to see if it resolves our issue.
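For context, here is a simplified sketch of the flow described above (AWS SDK v2 call, bucket/key names and the COPY statement are illustrative, not our exact code):

```js
const { pipeline } = require('stream')
const AWS = require('aws-sdk')
const { Pool } = require('pg')
const { from: copyFrom } = require('pg-copy-streams')

const s3 = new AWS.S3()
const pool = new Pool()

async function loadFileIntoPg (bucket, key, copySql) {
  const client = await pool.connect()
  try {
    await new Promise((resolve, reject) => {
      const source = s3.getObject({ Bucket: bucket, Key: key }).createReadStream()
      const target = client.query(copyFrom(copySql))
      // pipeline forwards errors (e.g. an S3 read timeout) and destroys both streams,
      // but the COPY query can stay open on the server, which is the problem here
      pipeline(source, target, (err) => (err ? reject(err) : resolve()))
    })
  } finally {
    client.release()
  }
}
```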
I would be interested in seeing your code for what you're doing, so we could transform it into a reproducible test and then handle it in the library. Implementing _destroy() like in gabegorelick's comment (but using it to send copyFail) could be the solution, but I suspect there are other error modes where we would want to send copyFail that would not be handled by this. Otherwise, depending on what is possible, calling this.connection.sendCopyFail("Error causing the failure") should work.
I will try to reproduce the error by building a filestream and force-throwing to verify later. For now, I can verify the above code doesn't just resolve it. In fact, it just caused all loading to stop, lol. Here's an overview of our loader, in a set of abstractions, to show the general flow. We will either open a file stream or an S3 stream; this code works with either. If you're wondering what AsyncPipeline is, it's a very thin wrapper over stream.pipeline (a sketch follows the Loader code below).

```js
const { Transform } = require('stream');
const copyFrom = require('pg-copy-streams').from;
const AsyncPipeline = require('./AsyncPipeline')
const StreamValues = require('stream-json/streamers/StreamValues');
const Db = require('./Db');
const { getCopyRowFromObject } = require('./Db')
const zlib = require('zlib')

function getCopyStream(sql) {
  // following _destroy implementation is an attempt to clean up the connection
  // after an error occurs in a pipeline, such as AWS S3 download stream throttling
  // From: https://github.com/brianc/node-pg-copy-streams/issues/115#issuecomment-670293136
  const copyStream = copyFrom(sql)
  copyStream._destroy = function (err, callback) {
    if(err) {
      this.connection.sendCopyFail(err.message)
    }
    callback()
  }
  return copyStream
}

class Loader {
  constructor(stream, authenticator, options = {}) {
    this.stream = stream
    this.db = new Db(authenticator, options)
    this.options = options
  }

  copyTransformer(props, options = {}) {
    let accessor = options.accessor || (data => data)
    let map = options.map || (data => data)
    let filter = options.filter || (() => true)
    let copyRow = obj => getCopyRowFromObject(obj, props)
    return new Transform({
      objectMode: true,
      transform: function(chunk, encoding, callback) {
        chunk = accessor(chunk)
        try {
          if(filter(chunk)){
            let mapped = map(chunk)
            if(mapped) {
              const row = copyRow(mapped)
              this.push(row)
            }
          }
          callback()
        } catch(err) {
          callback(err)
        }
      }
    })
  }

  async load(table, options = {}) {
    await this.db.createSchemaConcurrently()
    await this.db.createTableConcurrently(table)
    // stream-json produces { key: <index>, value: <object> }
    options.accessor = data => data.value
    const steps = [ this.stream ]
    if(this.options.gunzip) {
      steps.push(zlib.createGunzip())
    }
    steps.push(StreamValues.withParser())
    steps.push(this.copyTransformer(this.db.fieldNames[table], options))
    steps.push(this.db.stream(getCopyStream(this.db.getModelCopyStatement(table))))
    const pipeline = new AsyncPipeline(steps)
    await pipeline.run()
  }
}

module.exports = Loader
```
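As mentioned above, AsyncPipeline boils down to roughly this kind of thin promise wrapper over stream.pipeline (a hypothetical reconstruction, not the actual file):

```js
const { pipeline } = require('stream')

// Hypothetical AsyncPipeline: promise wrapper around stream.pipeline
class AsyncPipeline {
  constructor (steps) {
    this.steps = steps
  }

  run () {
    return new Promise((resolve, reject) => {
      // pipeline wires the streams together and destroys all of them on error
      pipeline(...this.steps, (err) => (err ? reject(err) : resolve()))
    })
  }
}

module.exports = AsyncPipeline
```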
Ok, I understand better. It seems the use of pipeline automates the forwarding of errors and the call to destroy on each stream in the pipeline, which seems to make it possible in this specific case to automate the copyFail bailout via destroy. In other cases, for those not using pipeline, they would have to call destroy themselves. Error modes in streams are always something not easy to implement, so we need tests for both cases. I need to find a clear explanation on how _destroy and _final may or may not both be called. Your code (just sending copyFail) should work I suppose, but it could leave dangling chunks in memory (those are flushed in _final before sending copyDone). Anyway, upon an error, the doc states it is probably better to discard the stream and not reuse it.
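For those not using pipeline, the manual wiring could look roughly like this (an assumed usage sketch, not something the module provides):

```js
const { from: copyFrom } = require('pg-copy-streams')

// `client` is a connected pg client, `source` any readable stream of COPY rows
function copyWithoutPipeline (client, source, copySql) {
  const target = client.query(copyFrom(copySql))
  // forward errors manually so that target.destroy() (and hence _destroy) runs
  source.on('error', (err) => target.destroy(err))
  target.on('error', (err) => source.destroy(err))
  source.pipe(target)
  return target
}
```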
I think that from https://github.com/nodejs/node/blob/master/lib/internal/streams/writable.js#L716 we can make the hypothesis that once destroy has been called, _final will not be called. From https://github.com/nodejs/node/blob/master/lib/internal/streams/destroy.js#L109 it seems that _destroy can now return a promise, so that the destroying mechanism can be async before the internal engine sends 'close' or 'error'. We might need this or some other mechanism, because in the _final case, for example, there is a waiting mechanism that waits for postgres to acknowledge the copyDone before emitting 'finish' (otherwise, people think that the COPY is finished, but postgres is still ingesting the data). And it needs to make sense with https://www.postgresql.org/docs/10/protocol-flow.html
So when in _destroy we send copyFail, we need to wait for pg to intercept the error. It will then call this.handleError, which then needs to call the callback received via _destroy, or something along those lines. This could work, but we need to clarify whether pg is going to send the COPY command via an extended-query message or not, to see if we need to send a Sync message or if pg is going to do it by itself. Otherwise the postgres connection could maybe be left in some unfinished state, which could be a problem if it is released to a pool. But the first step is to have simple tests.
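To make the _destroy / handleError idea concrete, here is a rough sketch of what is being discussed (only the two hooks are shown, and their interaction is an assumption, not the module's actual implementation):

```js
const { Writable } = require('stream')

class CopyFailOnDestroySketch extends Writable {
  _destroy (err, callback) {
    // ask postgres to abort the COPY; do not call back yet
    this.connection.sendCopyFail(err ? err.message : 'COPY aborted')
    this._destroyCallback = callback
  }

  // called by pg when postgres answers the CopyFail with an ErrorResponse
  handleError (err, connection) {
    if (this._destroyCallback) {
      const cb = this._destroyCallback
      this._destroyCallback = null
      cb() // the error was expected: the COPY has been rolled back
    }
  }
}
```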
I looked a bit into this. I found a slight interaction issue between node-pg-copy-streams and pg. When _destroy sends copyFail, postgres receives it and returns an ErrorResponse message that is caught by pg, which calls the handleError hook. So far so good: we know that postgres has errored. BUT inside pg it is then decided, at that point, that the activeQuery is no longer active, so we are not informed of when the connection is really available for new queries. According to the documentation of postgres, ReadyForQuery is always sent at the end of the command, whether it completed successfully or with an error, so in theory it should be possible inside pg to keep the query active until ReadyForQuery is received, by not setting activeQuery to null before that.

@brianc can I ask you what you think about this? Since the postgres documentation states that ReadyForQuery is always sent (success and error), could we consider that only the reception of ReadyForQuery should set the activeQuery to null?
that might be the case yeah - there've been a smattering of bugs throughout the years related to error happening and then ready for query and the active query already being null. A few even caused by slightly different implementations of the postgres protocol in other databases...so it's a bit tricky to change that code but if we can do it w/ all tests passing it might be worth it. |
Thanks for your reply. I made a quick attempt, and some existing tests break: for example, the test at https://github.com/brianc/node-postgres/blob/master/packages/pg/test/integration/client/query-error-handling-prepared-statement-tests.js#L85 expects 'terminating connection due to administrator command' but we now get 'Connection terminated unexpectedly'. So it may not be reasonable for a query to always wait only for ReadyForQuery before acknowledging that an ErrorResponse was received. ReadyForQuery is sort of in-between: it does not fully belong to the previous query and it does not fully belong to the next query either; it is a sort of grey area. I'll try to dig a bit deeper into this handling inside pg.
@brianc I dug a little further into the error handling mechanism. In any case, this needs a new release.
I just published version 6.0.0 of the module that implements a rollback / CopyFail mechanism when the copy-from stream is destroyed. I am interested in your feedback if you can test this version in your setups.
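As a minimal usage sketch (the helper below and its setup are assumptions on my part, not from the module's documentation), destroying the copy-from stream on error is expected to trigger the CopyFail rollback:

```js
const { pipeline } = require('stream')
const { from: copyFrom } = require('pg-copy-streams')

// `client` is a connected pg client, `source` any readable stream of COPY data
function copyWithRollback (client, source, copySql) {
  return new Promise((resolve, reject) => {
    const target = client.query(copyFrom(copySql))
    pipeline(source, target, (err) => {
      if (err) {
        // pipeline destroys `target`; with 6.0.0 this is expected to send CopyFail
        // so the COPY is rolled back and the connection can be reused (assumption)
        return reject(err)
      }
      resolve()
    })
  })
}
```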
I just tested
@willfarrell thanks for the feedback. Do you have a simple test that shows this behavior? All tests are passing on my side, and after looking rapidly I cannot find a way to reproduce this. Can you tell me what node version you are using?
I'm using node v14. I've been thinking about this; it may have been due to other processes. I'll do some testing and create a new issue if it is actually an issue.
@willfarrell there was an issue related to the precedence of _destroy vs _final that was modified in node 14. There is an additional test and a fix in version 6.0.1 (and node 14 is now tested in travis). If you have time to test this version and see if it fixes your issue, it would be helpful. Thanks.
Thanks for taking a deeper look. That seems to have done the trick; it's working as expected now. Is it worth adding node v16 to the CI list, just in case?
I will add v16, but it seems something else changed in v16 around memory management, as it breaks a "performance" test on my machine (https://github.com/brianc/node-pg-copy-streams/blob/master/test/binary.js#L55). I may need to remove this test or accept a higher memory consumption in the test.
Or take time to understand what happens in node v16. I created issue #124 in order to avoid polluting the current issue. I will leave this issue open for a few weeks in case some of you have time to test 6.0.1 and confirm that it behaves correctly regarding the rollback / CopyFail mechanism.
This version is working in our setup using Node 14. I will report back if we see this issue again. Thanks.
#74 mentions that in the event of an error in your stream, you can close the client. However, often you want to continue using the connection and instead simply roll back the COPY statement. My understanding is that this can be done for copy-in streams by sending CopyFail to Postgres: https://www.postgresql.org/docs/current/protocol-flow.html#PROTOCOL-COPY

However, it doesn't appear that node-pg-copy-streams ever sends CopyFail to Postgres. Instead, the COPY query is left running, waiting for more input that will never come. It seems like maybe the copy-from stream should implement _final and send CopyFail or CopyDone. Barring that, is there a way to terminate a query without releasing the client?