diff --git a/CHANGELOG.md b/CHANGELOG.md index 8cd433d..6efa67f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,15 @@ +v9.1.9 (2024-06-10) +------------------------- + * Record stats inside indexing batch loop + * Split up created vs updated in progress logging + * Add track_total_hits to GetESLastModified + +v9.1.8 (2024-06-05) +------------------------- + * Update github actions versions + * Add healthcheck for elastic service in CI tests + * Update goreleaser config to v2 + v9.1.7 (2024-06-05) ------------------------- * Remove multi-search-db CI testing because it's unreliable diff --git a/README.md b/README.md index d79b474..0baa9d2 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# Indexer +# 🗃️ Indexer [![Build Status](https://github.com/nyaruka/rp-indexer/workflows/CI/badge.svg)](https://github.com/nyaruka/rp-indexer/actions?query=workflow%3ACI) [![codecov](https://codecov.io/gh/nyaruka/rp-indexer/branch/main/graph/badge.svg)](https://codecov.io/gh/nyaruka/rp-indexer) diff --git a/indexers/base.go b/indexers/base.go index 348ea35..6737329 100644 --- a/indexers/base.go +++ b/indexers/base.go @@ -24,7 +24,7 @@ const deleteCommand = `{ "delete" : { "_id": %d, "version": %d, "version_type": type Stats struct { Indexed int64 // total number of documents indexed Deleted int64 // total number of documents deleted - Elapsed time.Duration // total time spent actually indexing + Elapsed time.Duration // total time spent actually indexing (excludes poll delay) } // Indexer is base interface for indexers @@ -84,8 +84,8 @@ func (i *baseIndexer) log() *slog.Logger { return slog.With("indexer", i.name) } -// records a complete index and updates statistics -func (i *baseIndexer) recordComplete(indexed, deleted int, elapsed time.Duration) { +// records indexing activity and updates statistics +func (i *baseIndexer) recordActivity(indexed, deleted int, elapsed time.Duration) { i.stats.Indexed += int64(indexed) i.stats.Deleted += int64(deleted) i.stats.Elapsed += elapsed @@ -267,20 +267,23 @@ type indexResponse struct { } // indexes the batch of contacts -func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, error) { +func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, int, error) { response := indexResponse{} indexURL := fmt.Sprintf("%s/%s/_bulk", i.elasticURL, index) _, err := utils.MakeJSONRequest(http.MethodPut, indexURL, batch, &response) if err != nil { - return 0, 0, err + return 0, 0, 0, err } - createdCount, deletedCount, conflictedCount := 0, 0, 0 + createdCount, updatedCount, deletedCount, conflictedCount := 0, 0, 0, 0 + for _, item := range response.Items { if item.Index.ID != "" { slog.Debug("index response", "id", item.Index.ID, "status", item.Index.Status) - if item.Index.Status == 200 || item.Index.Status == 201 { + if item.Index.Status == 200 { + updatedCount++ + } else if item.Index.Status == 201 { createdCount++ } else if item.Index.Status == 409 { conflictedCount++ @@ -298,8 +301,10 @@ func (i *baseIndexer) indexBatch(index string, batch []byte) (int, int, error) { slog.Error("unparsed item in response") } } - slog.Debug("indexed batch", "created", createdCount, "deleted", deletedCount, "conflicted", conflictedCount) - return createdCount, deletedCount, nil + + slog.Debug("indexed batch", "created", createdCount, "updated", updatedCount, "deleted", deletedCount, "conflicted", conflictedCount) + + return createdCount, updatedCount, deletedCount, nil } // our response for finding the last modified document @@ -326,7 +331,7 @@ func (i *baseIndexer) GetESLastModified(index string) (time.Time, error) { _, err := utils.MakeJSONRequest( http.MethodPost, fmt.Sprintf("%s/%s/_search", i.elasticURL, index), - []byte(`{ "sort": [{ "modified_on_mu": "desc" }], "_source": {"includes": ["modified_on", "id"]}, "size": 1}`), + []byte(`{ "sort": [{ "modified_on_mu": "desc" }], "_source": {"includes": ["modified_on", "id"]}, "size": 1, "track_total_hits": false}`), queryResponse, ) if err != nil { diff --git a/indexers/contacts.go b/indexers/contacts.go index 8a9aef7..e15520d 100644 --- a/indexers/contacts.go +++ b/indexers/contacts.go @@ -63,14 +63,11 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error i.log().Debug("indexing newer than last modified", "index", physicalIndex, "last_modified", lastModified) // now index our docs - start := time.Now() - indexed, deleted, err := i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild) + err = i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild) if err != nil { return "", fmt.Errorf("error indexing documents: %w", err) } - i.recordComplete(indexed, deleted, time.Since(start)) - // if the index didn't previously exist or we are rebuilding, remap to our alias if remapAlias { err := i.updateAlias(physicalIndex) @@ -153,8 +150,8 @@ SELECT org_id, id, modified_on, is_active, row_to_json(t) FROM ( ` // IndexModified queries and indexes all contacts with a lastModified greater than or equal to the passed in time -func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) (int, int, error) { - totalFetched, totalCreated, totalDeleted := 0, 0, 0 +func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index string, lastModified time.Time, rebuild bool) error { + totalFetched, totalCreated, totalUpdated, totalDeleted := 0, 0, 0, 0 var modifiedOn time.Time var contactJSON string @@ -168,18 +165,20 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st batchStart := time.Now() // start time for this batch batchFetched := 0 // contacts fetched in this batch batchCreated := 0 // contacts created in ES + batchUpdated := 0 // contacts updated in ES batchDeleted := 0 // contacts deleted in ES batchESTime := time.Duration(0) // time spent indexing for this batch indexSubBatch := func(b *bytes.Buffer) error { t := time.Now() - created, deleted, err := i.indexBatch(index, b.Bytes()) + created, updated, deleted, err := i.indexBatch(index, b.Bytes()) if err != nil { return err } batchESTime += time.Since(t) batchCreated += created + batchUpdated += updated batchDeleted += deleted b.Reset() return nil @@ -191,17 +190,17 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st // no more rows? return if err == sql.ErrNoRows { - return 0, 0, nil + return nil } if err != nil { - return 0, 0, err + return err } defer rows.Close() for rows.Next() { err = rows.Scan(&orgID, &id, &modifiedOn, &isActive, &contactJSON) if err != nil { - return 0, 0, err + return err } batchFetched++ @@ -224,14 +223,14 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st // write to elastic search in batches if batchFetched%i.batchSize == 0 { if err := indexSubBatch(subBatch); err != nil { - return 0, 0, err + return err } } } if subBatch.Len() > 0 { if err := indexSubBatch(subBatch); err != nil { - return 0, 0, err + return err } } @@ -239,6 +238,7 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st totalFetched += batchFetched totalCreated += batchCreated + totalUpdated += batchUpdated totalDeleted += batchDeleted totalTime := time.Since(start) @@ -249,10 +249,12 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st "rate", batchRate, "batch_fetched", batchFetched, "batch_created", batchCreated, + "batch_updated", batchUpdated, "batch_elapsed", batchTime, "batch_elapsed_es", batchESTime, "total_fetched", totalFetched, "total_created", totalCreated, + "total_updated", totalUpdated, "total_elapsed", totalTime, ) @@ -263,13 +265,15 @@ func (i *ContactIndexer) indexModified(ctx context.Context, db *sql.DB, index st log.Debug("indexed contact batch") } + i.recordActivity(batchCreated+batchUpdated, batchDeleted, time.Since(batchStart)) + // last modified stayed the same and we didn't add anything, seen it all, break out if lastModified.Equal(queryModified) && batchCreated == 0 { break } } - return totalCreated, totalDeleted, nil + return nil } func (i *ContactIndexer) GetDBLastModified(ctx context.Context, db *sql.DB) (time.Time, error) {