
BulkIndexer does not provide info to BulkIndexerItem on global worker.flush error #179

Open
ValentinVERGEZ opened this issue Aug 7, 2020 · 16 comments · May be fixed by #615

Comments

@ValentinVERGEZ

Hello,

I recently started using esutil.BulkIndexer (with Elasticsearch v6) and got an error because I did not provide a type on an index action. I expected the OnFailure callback to report it, but got nothing. I now understand that OnFailure does not report errors affecting the whole worker.flush(); only the indexer-level OnError callback (BulkIndexerConfig.OnError) does.
I could not find an easy workaround to detect when the worker.flush() responsible for my BulkIndexerItem fails.

Is there a workaround for this situation, or does it require a change to the BulkIndexer?
If it needs a PR, which approach would you recommend?

  • Calling OnFailure for each item when an error occurs
  • Adding another callback (for example OnFlushError) to the items, dedicated to this situation
  • Something else ...

Regards

@karmi
Contributor

karmi commented Aug 8, 2020

Hello! This is strange, as all kinds of errors should be propagated. I've tried the executable example _examples/bulk/indexer.go against the 6.x branch, and there's a subtle difference between OnError for the whole indexer and OnFailure for individual items. When _type is omitted on Elasticsearch 6.x, no items are returned at all (the logging output shows this), so calling OnFailure wouldn't help.
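A minimal sketch of that distinction, assuming the caller has the raw bulk response body: a request-level failure carries a top-level `error` object and `status` instead of an `items` array, so there is nothing to match against individual items. `bulkResponse` and `batchError` are illustrative names, and the sample body is only shaped like such an error, not captured from a real cluster.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// bulkResponse models only the parts of a bulk response body used here.
// A normal response has an "items" array; a request-level failure has a
// top-level "error" object and a "status" code instead.
type bulkResponse struct {
	Errors bool              `json:"errors"`
	Items  []json.RawMessage `json:"items"`
	Error  *struct {
		Type   string `json:"type"`
		Reason string `json:"reason"`
	} `json:"error"`
	Status int `json:"status"`
}

// batchError returns a non-nil error when the response cannot be mapped
// back to the sent items, i.e. when the whole batch failed.
func batchError(body []byte, sent int) error {
	var r bulkResponse
	if err := json.Unmarshal(body, &r); err != nil {
		return fmt.Errorf("decode bulk response: %w", err)
	}
	if r.Error != nil {
		return fmt.Errorf("bulk request failed (status %d): %s: %s", r.Status, r.Error.Type, r.Error.Reason)
	}
	if len(r.Items) != sent {
		return fmt.Errorf("bulk response has %d items, expected %d", len(r.Items), sent)
	}
	return nil
}

func main() {
	// Illustrative body; the type and reason text are assumptions.
	body := []byte(`{"error":{"type":"action_request_validation_exception","reason":"Validation Failed: 1: type is missing;"},"status":400}`)
	if err := batchError(body, 1); err != nil {
		fmt.Println(err)
	}
}
```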

I've added some logging and an error callback to the example, so you can try it locally:

```diff
diff --git i/_examples/bulk/indexer.go w/_examples/bulk/indexer.go
index e5a45b91..163eed29 100644
--- i/_examples/bulk/indexer.go
+++ w/_examples/bulk/indexer.go
@@ -17,8 +17,10 @@ import (
        "context"
        "encoding/json"
        "flag"
+       "fmt"
        "log"
        "math/rand"
+       "os"
        "runtime"
        "strconv"
        "strings"
@@ -30,6 +32,7 @@ import (
 
        "github.com/elastic/go-elasticsearch/v6"
        "github.com/elastic/go-elasticsearch/v6/esapi"
+       "github.com/elastic/go-elasticsearch/v6/estransport"
        "github.com/elastic/go-elasticsearch/v6/esutil"
 )
 
@@ -110,6 +113,8 @@ func main() {
                // Retry up to 5 attempts
                //
                MaxRetries: 5,
+
+               Logger: &estransport.ColorLogger{Output: os.Stdout, EnableRequestBody: true, EnableResponseBody: true},
        })
        if err != nil {
                log.Fatalf("Error creating the client: %s", err)
@@ -124,12 +129,13 @@ func main() {
        //       See an example in the "benchmarks" folder.
        //
        bi, err := esutil.NewBulkIndexer(esutil.BulkIndexerConfig{
-               Index:         indexName,        // The default index name
-               DocumentType:  "_doc",           // The default document type
+               Index: indexName, // The default index name
+               // DocumentType:  "_doc",           // The default document type
                Client:        es,               // The Elasticsearch client
                NumWorkers:    numWorkers,       // The number of worker goroutines
                FlushBytes:    int(flushBytes),  // The flush threshold in bytes
                FlushInterval: 30 * time.Second, // The periodic flush interval
+               OnError:       func(ctx context.Context, err error) { fmt.Printf("BulkIndexer Error: %s\n", err) },
        })
        if err != nil {
                log.Fatalf("Error creating the indexer: %s", err)
```

The OnError callback for the indexer should handle the situation you describe.

I'm out of office this month, so I have limited time to look into this, but I can continue the conversation in this ticket.

@karmi
Contributor

karmi commented Sep 28, 2020

Hello, any news here?

@sumanthakannantha

We are facing a similar issue. In the flush function, errors are not propagated to individual items via the OnFailure callback:
```go
func (w *worker) flush(ctx context.Context) error {
	// ...

	res, err := req.Do(ctx, w.bi.config.Client)
	if err != nil {
		atomic.AddUint64(&w.bi.stats.numFailed, uint64(len(w.items)))
		if w.bi.config.OnError != nil {
			w.bi.config.OnError(ctx, fmt.Errorf("flush: %s", err))
		}
		return fmt.Errorf("flush: %s", err)
	}
	// ...
}
```

If the API call fails due to a timeout or connection error, this is conveyed only through w.bi.config.OnError. It should also notify each individual item via item.OnFailure(ctx, item, info, nil).

@itzikYeret

Hi
Any update on this issue?

@pundliksarafdar

Any update on this? This change would be very helpful for us in many use cases.

@sumanthakannantha

Hi
Any update on this issue?

@brouillette

Hello!
Any updates on this?

@ansssu

ansssu commented Feb 22, 2023

Hi guys,
Any update on this issue?

@ansssu ansssu linked a pull request Mar 6, 2023 that will close this issue
@tornaci45

Hi guys,
Any update on this issue?

@rpecb

rpecb commented Dec 8, 2023

Hi guys,
Any update on this issue?

@amalic

amalic commented Jan 21, 2024

Any update on this issue?

@sumantha-kannantha-hpe

Hi Team

Is it possible to expedite this critical requirement?

@bailaodito

Hey there! Looks like we've got a PR that tackles this issue. Any chance someone from the Elastic team could lend a hand with it?

@wahlflo

wahlflo commented Jul 2, 2024

Without this, the library is really not usable in production systems...

@nathan-nguyen-sa

Bumping this. It'd be great if somebody could take a look at the PR that @bailaodito mentioned. It's #615 I believe.

@agorman

agorman commented Dec 9, 2024

I really need this too. If my connection to Elasticsearch is lost the bulk indexer drops the messages and I have no way to know what was dropped.
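Until per-item notification exists, one possible client-side workaround is to track which documents have been acknowledged. This is a minimal sketch, assuming every item has a unique document ID. Call `Add` before adding an item to the indexer, and call `Ack` from both its `OnSuccess` and `OnFailure` callbacks. Once the indexer has been closed and its pending flushes have completed, anything returned by `Unacked` was presumably dropped by a failed flush. `tracker` and its methods are hypothetical, not part of esutil.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// tracker is a hypothetical helper: IDs are added before indexing and
// removed when a per-item callback fires; leftovers were never acknowledged.
type tracker struct {
	mu      sync.Mutex
	pending map[string]struct{}
}

func newTracker() *tracker {
	return &tracker{pending: make(map[string]struct{})}
}

// Add marks a document as handed to the indexer.
func (t *tracker) Add(id string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.pending[id] = struct{}{}
}

// Ack is meant to be called from the item's OnSuccess or OnFailure callback.
func (t *tracker) Ack(id string) {
	t.mu.Lock()
	defer t.mu.Unlock()
	delete(t.pending, id)
}

// Unacked returns, sorted, the IDs that never received a callback.
func (t *tracker) Unacked() []string {
	t.mu.Lock()
	defer t.mu.Unlock()
	ids := make([]string, 0, len(t.pending))
	for id := range t.pending {
		ids = append(ids, id)
	}
	sort.Strings(ids)
	return ids
}

func main() {
	t := newTracker()
	for _, id := range []string{"1", "2", "3"} {
		t.Add(id)
	}
	// Simulated outcome: "1" succeeded, "3" failed per item, and "2" was in
	// a batch whose flush failed, so neither of its callbacks ran.
	t.Ack("1")
	t.Ack("3")
	fmt.Println("unacknowledged:", t.Unacked())
}
```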
