diff --git a/cmd/rp-indexer/main.go b/cmd/rp-indexer/main.go index 6b9d172..e79d214 100644 --- a/cmd/rp-indexer/main.go +++ b/cmd/rp-indexer/main.go @@ -14,6 +14,7 @@ import ( "github.com/nyaruka/ezconf" indexer "github.com/nyaruka/rp-indexer/v9" "github.com/nyaruka/rp-indexer/v9/indexers" + "github.com/nyaruka/rp-indexer/v9/runtime" slogmulti "github.com/samber/slog-multi" slogsentry "github.com/samber/slog-sentry" ) @@ -25,7 +26,7 @@ var ( ) func main() { - cfg := indexer.NewDefaultConfig() + cfg := runtime.NewDefaultConfig() loader := ezconf.NewLoader(cfg, "indexer", "Indexes RapidPro contacts to ElasticSearch", []string{"indexer.toml"}) loader.MustLoad() @@ -36,6 +37,8 @@ func main() { os.Exit(1) } + rt := &runtime.Runtime{Config: cfg} + // configure our logger logHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: level}) slog.SetDefault(slog.New(logHandler)) @@ -44,7 +47,7 @@ func main() { logger.Info("starting indexer", "version", version, "released", date) // if we have a DSN entry, try to initialize it - if cfg.SentryDSN != "" { + if rt.Config.SentryDSN != "" { err := sentry.Init(sentry.ClientOptions{ Dsn: cfg.SentryDSN, EnableTracing: false, @@ -66,24 +69,24 @@ func main() { slog.SetDefault(logger) } - db, err := sql.Open("postgres", cfg.DB) + rt.DB, err = sql.Open("postgres", cfg.DB) if err != nil { logger.Error("unable to connect to database") } idxrs := []indexers.Indexer{ - indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, cfg.ContactsShards, cfg.ContactsReplicas, 500), + indexers.NewContactIndexer(rt.Config.ElasticURL, rt.Config.ContactsIndex, rt.Config.ContactsShards, rt.Config.ContactsReplicas, 500), } - if cfg.Rebuild { + if rt.Config.Rebuild { // if rebuilding, just do a complete index and quit. In future when we support multiple indexers, // the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts idxr := idxrs[0] - if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil { + if _, err := idxr.Index(rt, true, rt.Config.Cleanup); err != nil { logger.Error("error during rebuilding", "error", err, "indexer", idxr.Name()) } } else { - d := indexer.NewDaemon(cfg, db, idxrs, time.Duration(cfg.Poll)*time.Second) + d := indexer.NewDaemon(rt, idxrs) d.Start() handleSignals(d) diff --git a/daemon.go b/daemon.go index 2a3517d..844e986 100644 --- a/daemon.go +++ b/daemon.go @@ -2,7 +2,6 @@ package indexer import ( "context" - "database/sql" "fmt" "log/slog" "sync" @@ -10,11 +9,11 @@ import ( "github.com/nyaruka/gocommon/analytics" "github.com/nyaruka/rp-indexer/v9/indexers" + "github.com/nyaruka/rp-indexer/v9/runtime" ) type Daemon struct { - cfg *Config - db *sql.DB + rt *runtime.Runtime wg *sync.WaitGroup quit chan bool indexers []indexers.Indexer @@ -24,14 +23,13 @@ type Daemon struct { } // NewDaemon creates a new daemon to run the given indexers -func NewDaemon(cfg *Config, db *sql.DB, ixs []indexers.Indexer, poll time.Duration) *Daemon { +func NewDaemon(rt *runtime.Runtime, ixs []indexers.Indexer) *Daemon { return &Daemon{ - cfg: cfg, - db: db, + rt: rt, wg: &sync.WaitGroup{}, quit: make(chan bool), indexers: ixs, - poll: poll, + poll: time.Duration(rt.Config.Poll) * time.Second, prevStats: make(map[indexers.Indexer]indexers.Stats, len(ixs)), } } @@ -39,8 +37,8 @@ func NewDaemon(cfg *Config, db *sql.DB, ixs []indexers.Indexer, poll time.Durati // Start starts this daemon func (d *Daemon) Start() { // if we have a librato token, configure it - if d.cfg.LibratoToken != "" { - analytics.RegisterBackend(analytics.NewLibrato(d.cfg.LibratoUsername, d.cfg.LibratoToken, d.cfg.InstanceName, time.Second, d.wg)) + if d.rt.Config.LibratoToken != "" { + analytics.RegisterBackend(analytics.NewLibrato(d.rt.Config.LibratoUsername, d.rt.Config.LibratoToken, d.rt.Config.InstanceName, time.Second, d.wg)) } analytics.Start() @@ -68,7 +66,7 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) { case <-d.quit: return case <-time.After(d.poll): - _, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup) + _, err := indexer.Index(d.rt, d.rt.Config.Rebuild, d.rt.Config.Cleanup) if err != nil { log.Error("error during indexing", "error", err) } @@ -151,7 +149,7 @@ func (d *Daemon) calculateLag(ctx context.Context, ix indexers.Indexer) (time.Du return 0, fmt.Errorf("error getting ES last modified: %w", err) } - dbLastModified, err := ix.GetDBLastModified(ctx, d.db) + dbLastModified, err := ix.GetDBLastModified(ctx, d.rt.DB) if err != nil { return 0, fmt.Errorf("error getting DB last modified: %w", err) } diff --git a/indexers/base.go b/indexers/base.go index 6737329..8cfff34 100644 --- a/indexers/base.go +++ b/indexers/base.go @@ -12,6 +12,7 @@ import ( "time" "github.com/nyaruka/gocommon/jsonx" + "github.com/nyaruka/rp-indexer/v9/runtime" "github.com/nyaruka/rp-indexer/v9/utils" ) @@ -30,7 +31,7 @@ type Stats struct { // Indexer is base interface for indexers type Indexer interface { Name() string - Index(db *sql.DB, rebuild, cleanup bool) (string, error) + Index(rt *runtime.Runtime, rebuild, cleanup bool) (string, error) Stats() Stats GetESLastModified(index string) (time.Time, error) diff --git a/indexers/base_test.go b/indexers/base_test.go index b63882f..53a8150 100644 --- a/indexers/base_test.go +++ b/indexers/base_test.go @@ -15,14 +15,14 @@ import ( "github.com/nyaruka/gocommon/elastic" "github.com/nyaruka/gocommon/httpx" "github.com/nyaruka/gocommon/jsonx" - indexer "github.com/nyaruka/rp-indexer/v9" "github.com/nyaruka/rp-indexer/v9/indexers" + "github.com/nyaruka/rp-indexer/v9/runtime" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) -func setup(t *testing.T) (*indexer.Config, *sql.DB) { - cfg := indexer.NewDefaultConfig() +func setup(t *testing.T) *runtime.Runtime { + cfg := runtime.NewDefaultConfig() cfg.DB = "postgres://indexer_test:temba@localhost:5432/indexer_test?sslmode=disable" cfg.ContactsIndex = "indexer_test" @@ -46,10 +46,10 @@ func setup(t *testing.T) (*indexer.Config, *sql.DB) { slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug}))) - return cfg, db + return &runtime.Runtime{Config: cfg, DB: db} } -func assertQuery(t *testing.T, cfg *indexer.Config, query elastic.Query, expected []int64, msgAndArgs ...interface{}) { +func assertQuery(t *testing.T, cfg *runtime.Config, query elastic.Query, expected []int64, msgAndArgs ...interface{}) { results := elasticRequest(t, cfg, http.MethodPost, "/"+cfg.ContactsIndex+"/_search", map[string]any{"query": query, "sort": []map[string]any{{"id": "asc"}}}, ) @@ -65,7 +65,7 @@ func assertQuery(t *testing.T, cfg *indexer.Config, query elastic.Query, expecte assert.Equal(t, expected, actual, msgAndArgs...) } -func assertIndexesWithPrefix(t *testing.T, cfg *indexer.Config, prefix string, expected []string) { +func assertIndexesWithPrefix(t *testing.T, cfg *runtime.Config, prefix string, expected []string) { all := elasticRequest(t, cfg, http.MethodGet, "/_aliases", nil) actual := []string{} @@ -84,7 +84,7 @@ func assertIndexerStats(t *testing.T, ix indexers.Indexer, expectedIndexed, expe assert.Equal(t, expectedDeleted, actual.Deleted, "deleted mismatch") } -func elasticRequest(t *testing.T, cfg *indexer.Config, method, path string, data map[string]any) map[string]any { +func elasticRequest(t *testing.T, cfg *runtime.Config, method, path string, data map[string]any) map[string]any { var body io.Reader if data != nil { body = bytes.NewReader(jsonx.MustMarshal(data)) diff --git a/indexers/contacts.go b/indexers/contacts.go index 64e059e..9002136 100644 --- a/indexers/contacts.go +++ b/indexers/contacts.go @@ -7,6 +7,8 @@ import ( _ "embed" "fmt" "time" + + "github.com/nyaruka/rp-indexer/v9/runtime" ) //go:embed contacts.index.json @@ -30,7 +32,7 @@ func NewContactIndexer(elasticURL, name string, shards, replicas, batchSize int) } // Index indexes modified contacts and returns the name of the concrete index -func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error) { +func (i *ContactIndexer) Index(rt *runtime.Runtime, rebuild, cleanup bool) (string, error) { ctx := context.TODO() var err error @@ -63,7 +65,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error i.log().Debug("indexing newer than last modified", "index", physicalIndex, "last_modified", lastModified) // now index our docs - err = i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild) + err = i.indexModified(ctx, rt.DB, physicalIndex, lastModified.Add(-5*time.Second), rebuild) if err != nil { return "", fmt.Errorf("error indexing documents: %w", err) } diff --git a/indexers/contacts_test.go b/indexers/contacts_test.go index 814ec80..94933e7 100644 --- a/indexers/contacts_test.go +++ b/indexers/contacts_test.go @@ -165,36 +165,36 @@ var contactQueryTests = []struct { } func TestContacts(t *testing.T) { - cfg, db := setup(t) + rt := setup(t) - ix1 := indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, 2, 1, 4) + ix1 := indexers.NewContactIndexer(rt.Config.ElasticURL, rt.Config.ContactsIndex, 2, 1, 4) assert.Equal(t, "indexer_test", ix1.Name()) - dbModified, err := ix1.GetDBLastModified(context.Background(), db) + dbModified, err := ix1.GetDBLastModified(context.Background(), rt.DB) assert.NoError(t, err) assert.WithinDuration(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), dbModified, 0) // error trying to get ES last modified on before index exists - _, err = ix1.GetESLastModified(cfg.ContactsIndex) + _, err = ix1.GetESLastModified(rt.Config.ContactsIndex) assert.Error(t, err) expectedIndexName := fmt.Sprintf("indexer_test_%s", time.Now().Format("2006_01_02")) - indexName, err := ix1.Index(db, false, false) + indexName, err := ix1.Index(rt, false, false) assert.NoError(t, err) assert.Equal(t, expectedIndexName, indexName) time.Sleep(1 * time.Second) - esModified, err := ix1.GetESLastModified(cfg.ContactsIndex) + esModified, err := ix1.GetESLastModified(rt.Config.ContactsIndex) assert.NoError(t, err) assert.WithinDuration(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), esModified, 0) assertIndexerStats(t, ix1, 9, 0) - assertIndexesWithPrefix(t, cfg, cfg.ContactsIndex, []string{expectedIndexName}) + assertIndexesWithPrefix(t, rt.Config, rt.Config.ContactsIndex, []string{expectedIndexName}) for _, tc := range contactQueryTests { - assertQuery(t, cfg, tc.query, tc.expected, "query mismatch for %s", tc.query) + assertQuery(t, rt.Config, tc.query, tc.expected, "query mismatch for %s", tc.query) } lastModified, err := ix1.GetESLastModified(indexName) @@ -202,37 +202,37 @@ func TestContacts(t *testing.T) { assert.Equal(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), lastModified.In(time.UTC)) // now make some contact changes, removing one contact, updating another - _, err = db.Exec(` + _, err = rt.DB.Exec(` DELETE FROM contacts_contactgroup_contacts WHERE contact_id = 2 AND contactgroup_id = 4; UPDATE contacts_contact SET name = 'John Deer', modified_on = '2020-08-20 14:00:00+00' where id = 2; UPDATE contacts_contact SET is_active = FALSE, modified_on = '2020-08-22 15:00:00+00' where id = 4;`) require.NoError(t, err) // and index again... - indexName, err = ix1.Index(db, false, false) + indexName, err = ix1.Index(rt, false, false) assert.NoError(t, err) assert.Equal(t, expectedIndexName, indexName) // same index used assertIndexerStats(t, ix1, 10, 1) time.Sleep(1 * time.Second) - assertIndexesWithPrefix(t, cfg, cfg.ContactsIndex, []string{expectedIndexName}) + assertIndexesWithPrefix(t, rt.Config, rt.Config.ContactsIndex, []string{expectedIndexName}) // should only match new john, old john is gone - assertQuery(t, cfg, elastic.Match("name", "john"), []int64{2}) + assertQuery(t, rt.Config, elastic.Match("name", "john"), []int64{2}) // 3 is no longer in our group - assertQuery(t, cfg, elastic.Match("group_ids", 4), []int64{1}) + assertQuery(t, rt.Config, elastic.Match("group_ids", 4), []int64{1}) // change John's name to Eric.. - _, err = db.Exec(` + _, err = rt.DB.Exec(` UPDATE contacts_contact SET name = 'Eric', modified_on = '2020-08-20 14:00:00+00' where id = 2;`) require.NoError(t, err) // and simulate another indexer doing a parallel rebuild - ix2 := indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, 2, 1, 4) + ix2 := indexers.NewContactIndexer(rt.Config.ElasticURL, rt.Config.ContactsIndex, 2, 1, 4) - indexName2, err := ix2.Index(db, true, false) + indexName2, err := ix2.Index(rt, true, false) assert.NoError(t, err) assert.Equal(t, expectedIndexName+"_1", indexName2) // new index used assertIndexerStats(t, ix2, 8, 0) @@ -240,23 +240,23 @@ func TestContacts(t *testing.T) { time.Sleep(1 * time.Second) // check we have a new index but the old index is still around - assertIndexesWithPrefix(t, cfg, cfg.ContactsIndex, []string{expectedIndexName, expectedIndexName + "_1"}) + assertIndexesWithPrefix(t, rt.Config, rt.Config.ContactsIndex, []string{expectedIndexName, expectedIndexName + "_1"}) // and the alias points to the new index - assertQuery(t, cfg, elastic.Match("name", "eric"), []int64{2}) + assertQuery(t, rt.Config, elastic.Match("name", "eric"), []int64{2}) // simulate another indexer doing a parallel rebuild with cleanup - ix3 := indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, 2, 1, 4) - indexName3, err := ix3.Index(db, true, true) + ix3 := indexers.NewContactIndexer(rt.Config.ElasticURL, rt.Config.ContactsIndex, 2, 1, 4) + indexName3, err := ix3.Index(rt, true, true) assert.NoError(t, err) assert.Equal(t, expectedIndexName+"_2", indexName3) // new index used assertIndexerStats(t, ix3, 8, 0) // check we cleaned up indexes besides the new one - assertIndexesWithPrefix(t, cfg, cfg.ContactsIndex, []string{expectedIndexName + "_2"}) + assertIndexesWithPrefix(t, rt.Config, rt.Config.ContactsIndex, []string{expectedIndexName + "_2"}) // check that the original indexer now indexes against the new index - indexName, err = ix1.Index(db, false, false) + indexName, err = ix1.Index(rt, false, false) assert.NoError(t, err) assert.Equal(t, expectedIndexName+"_2", indexName) } diff --git a/config.go b/runtime/config.go similarity index 98% rename from config.go rename to runtime/config.go index 24ec114..aaf30a1 100644 --- a/config.go +++ b/runtime/config.go @@ -1,4 +1,4 @@ -package indexer +package runtime import "os" diff --git a/runtime/runtime.go b/runtime/runtime.go new file mode 100644 index 0000000..fa0d19c --- /dev/null +++ b/runtime/runtime.go @@ -0,0 +1,8 @@ +package runtime + +import "database/sql" + +type Runtime struct { + Config *Config + DB *sql.DB +}