Skip to content

Commit

Permalink
Merge pull request #86 from nyaruka/runtime
Browse files Browse the repository at this point in the history
Add runtime.Runtime like our other go services have
  • Loading branch information
rowanseymour authored Dec 12, 2024
2 parents c01ca7e + eaa6514 commit 16ad34e
Show file tree
Hide file tree
Showing 8 changed files with 63 additions and 51 deletions.
17 changes: 10 additions & 7 deletions cmd/rp-indexer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/nyaruka/ezconf"
indexer "github.com/nyaruka/rp-indexer/v9"
"github.com/nyaruka/rp-indexer/v9/indexers"
"github.com/nyaruka/rp-indexer/v9/runtime"
slogmulti "github.com/samber/slog-multi"
slogsentry "github.com/samber/slog-sentry"
)
Expand All @@ -25,7 +26,7 @@ var (
)

func main() {
cfg := indexer.NewDefaultConfig()
cfg := runtime.NewDefaultConfig()
loader := ezconf.NewLoader(cfg, "indexer", "Indexes RapidPro contacts to ElasticSearch", []string{"indexer.toml"})
loader.MustLoad()

Expand All @@ -36,6 +37,8 @@ func main() {
os.Exit(1)
}

rt := &runtime.Runtime{Config: cfg}

// configure our logger
logHandler := slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: level})
slog.SetDefault(slog.New(logHandler))
Expand All @@ -44,7 +47,7 @@ func main() {
logger.Info("starting indexer", "version", version, "released", date)

// if we have a DSN entry, try to initialize it
if cfg.SentryDSN != "" {
if rt.Config.SentryDSN != "" {
err := sentry.Init(sentry.ClientOptions{
Dsn: cfg.SentryDSN,
EnableTracing: false,
Expand All @@ -66,24 +69,24 @@ func main() {
slog.SetDefault(logger)
}

db, err := sql.Open("postgres", cfg.DB)
rt.DB, err = sql.Open("postgres", cfg.DB)
if err != nil {
logger.Error("unable to connect to database")
}

idxrs := []indexers.Indexer{
indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, cfg.ContactsShards, cfg.ContactsReplicas, 500),
indexers.NewContactIndexer(rt.Config.ElasticURL, rt.Config.ContactsIndex, rt.Config.ContactsShards, rt.Config.ContactsReplicas, 500),
}

if cfg.Rebuild {
if rt.Config.Rebuild {
// if rebuilding, just do a complete index and quit. In future when we support multiple indexers,
// the rebuild argument can be become the name of the index to rebuild, e.g. --rebuild=contacts
idxr := idxrs[0]
if _, err := idxr.Index(db, true, cfg.Cleanup); err != nil {
if _, err := idxr.Index(rt, true, rt.Config.Cleanup); err != nil {
logger.Error("error during rebuilding", "error", err, "indexer", idxr.Name())
}
} else {
d := indexer.NewDaemon(cfg, db, idxrs, time.Duration(cfg.Poll)*time.Second)
d := indexer.NewDaemon(rt, idxrs)
d.Start()

handleSignals(d)
Expand Down
20 changes: 9 additions & 11 deletions daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,18 @@ package indexer

import (
"context"
"database/sql"
"fmt"
"log/slog"
"sync"
"time"

"github.com/nyaruka/gocommon/analytics"
"github.com/nyaruka/rp-indexer/v9/indexers"
"github.com/nyaruka/rp-indexer/v9/runtime"
)

type Daemon struct {
cfg *Config
db *sql.DB
rt *runtime.Runtime
wg *sync.WaitGroup
quit chan bool
indexers []indexers.Indexer
Expand All @@ -24,23 +23,22 @@ type Daemon struct {
}

// NewDaemon creates a new daemon to run the given indexers
func NewDaemon(cfg *Config, db *sql.DB, ixs []indexers.Indexer, poll time.Duration) *Daemon {
func NewDaemon(rt *runtime.Runtime, ixs []indexers.Indexer) *Daemon {
return &Daemon{
cfg: cfg,
db: db,
rt: rt,
wg: &sync.WaitGroup{},
quit: make(chan bool),
indexers: ixs,
poll: poll,
poll: time.Duration(rt.Config.Poll) * time.Second,
prevStats: make(map[indexers.Indexer]indexers.Stats, len(ixs)),
}
}

// Start starts this daemon
func (d *Daemon) Start() {
// if we have a librato token, configure it
if d.cfg.LibratoToken != "" {
analytics.RegisterBackend(analytics.NewLibrato(d.cfg.LibratoUsername, d.cfg.LibratoToken, d.cfg.InstanceName, time.Second, d.wg))
if d.rt.Config.LibratoToken != "" {
analytics.RegisterBackend(analytics.NewLibrato(d.rt.Config.LibratoUsername, d.rt.Config.LibratoToken, d.rt.Config.InstanceName, time.Second, d.wg))
}

analytics.Start()
Expand Down Expand Up @@ -68,7 +66,7 @@ func (d *Daemon) startIndexer(indexer indexers.Indexer) {
case <-d.quit:
return
case <-time.After(d.poll):
_, err := indexer.Index(d.db, d.cfg.Rebuild, d.cfg.Cleanup)
_, err := indexer.Index(d.rt, d.rt.Config.Rebuild, d.rt.Config.Cleanup)
if err != nil {
log.Error("error during indexing", "error", err)
}
Expand Down Expand Up @@ -151,7 +149,7 @@ func (d *Daemon) calculateLag(ctx context.Context, ix indexers.Indexer) (time.Du
return 0, fmt.Errorf("error getting ES last modified: %w", err)
}

dbLastModified, err := ix.GetDBLastModified(ctx, d.db)
dbLastModified, err := ix.GetDBLastModified(ctx, d.rt.DB)
if err != nil {
return 0, fmt.Errorf("error getting DB last modified: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion indexers/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/nyaruka/gocommon/jsonx"
"github.com/nyaruka/rp-indexer/v9/runtime"
"github.com/nyaruka/rp-indexer/v9/utils"
)

Expand All @@ -30,7 +31,7 @@ type Stats struct {
// Indexer is base interface for indexers
type Indexer interface {
Name() string
Index(db *sql.DB, rebuild, cleanup bool) (string, error)
Index(rt *runtime.Runtime, rebuild, cleanup bool) (string, error)
Stats() Stats

GetESLastModified(index string) (time.Time, error)
Expand Down
14 changes: 7 additions & 7 deletions indexers/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,14 @@ import (
"github.com/nyaruka/gocommon/elastic"
"github.com/nyaruka/gocommon/httpx"
"github.com/nyaruka/gocommon/jsonx"
indexer "github.com/nyaruka/rp-indexer/v9"
"github.com/nyaruka/rp-indexer/v9/indexers"
"github.com/nyaruka/rp-indexer/v9/runtime"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func setup(t *testing.T) (*indexer.Config, *sql.DB) {
cfg := indexer.NewDefaultConfig()
func setup(t *testing.T) *runtime.Runtime {
cfg := runtime.NewDefaultConfig()
cfg.DB = "postgres://indexer_test:temba@localhost:5432/indexer_test?sslmode=disable"
cfg.ContactsIndex = "indexer_test"

Expand All @@ -46,10 +46,10 @@ func setup(t *testing.T) (*indexer.Config, *sql.DB) {

slog.SetDefault(slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: slog.LevelDebug})))

return cfg, db
return &runtime.Runtime{Config: cfg, DB: db}
}

func assertQuery(t *testing.T, cfg *indexer.Config, query elastic.Query, expected []int64, msgAndArgs ...interface{}) {
func assertQuery(t *testing.T, cfg *runtime.Config, query elastic.Query, expected []int64, msgAndArgs ...interface{}) {
results := elasticRequest(t, cfg, http.MethodPost, "/"+cfg.ContactsIndex+"/_search",
map[string]any{"query": query, "sort": []map[string]any{{"id": "asc"}}},
)
Expand All @@ -65,7 +65,7 @@ func assertQuery(t *testing.T, cfg *indexer.Config, query elastic.Query, expecte
assert.Equal(t, expected, actual, msgAndArgs...)
}

func assertIndexesWithPrefix(t *testing.T, cfg *indexer.Config, prefix string, expected []string) {
func assertIndexesWithPrefix(t *testing.T, cfg *runtime.Config, prefix string, expected []string) {
all := elasticRequest(t, cfg, http.MethodGet, "/_aliases", nil)

actual := []string{}
Expand All @@ -84,7 +84,7 @@ func assertIndexerStats(t *testing.T, ix indexers.Indexer, expectedIndexed, expe
assert.Equal(t, expectedDeleted, actual.Deleted, "deleted mismatch")
}

func elasticRequest(t *testing.T, cfg *indexer.Config, method, path string, data map[string]any) map[string]any {
func elasticRequest(t *testing.T, cfg *runtime.Config, method, path string, data map[string]any) map[string]any {
var body io.Reader
if data != nil {
body = bytes.NewReader(jsonx.MustMarshal(data))
Expand Down
6 changes: 4 additions & 2 deletions indexers/contacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
_ "embed"
"fmt"
"time"

"github.com/nyaruka/rp-indexer/v9/runtime"
)

//go:embed contacts.index.json
Expand All @@ -30,7 +32,7 @@ func NewContactIndexer(elasticURL, name string, shards, replicas, batchSize int)
}

// Index indexes modified contacts and returns the name of the concrete index
func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error) {
func (i *ContactIndexer) Index(rt *runtime.Runtime, rebuild, cleanup bool) (string, error) {
ctx := context.TODO()
var err error

Expand Down Expand Up @@ -63,7 +65,7 @@ func (i *ContactIndexer) Index(db *sql.DB, rebuild, cleanup bool) (string, error
i.log().Debug("indexing newer than last modified", "index", physicalIndex, "last_modified", lastModified)

// now index our docs
err = i.indexModified(ctx, db, physicalIndex, lastModified.Add(-5*time.Second), rebuild)
err = i.indexModified(ctx, rt.DB, physicalIndex, lastModified.Add(-5*time.Second), rebuild)
if err != nil {
return "", fmt.Errorf("error indexing documents: %w", err)
}
Expand Down
44 changes: 22 additions & 22 deletions indexers/contacts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,98 +165,98 @@ var contactQueryTests = []struct {
}

func TestContacts(t *testing.T) {
cfg, db := setup(t)
rt := setup(t)

ix1 := indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, 2, 1, 4)
ix1 := indexers.NewContactIndexer(rt.Config.ElasticURL, rt.Config.ContactsIndex, 2, 1, 4)
assert.Equal(t, "indexer_test", ix1.Name())

dbModified, err := ix1.GetDBLastModified(context.Background(), db)
dbModified, err := ix1.GetDBLastModified(context.Background(), rt.DB)
assert.NoError(t, err)
assert.WithinDuration(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), dbModified, 0)

// error trying to get ES last modified on before index exists
_, err = ix1.GetESLastModified(cfg.ContactsIndex)
_, err = ix1.GetESLastModified(rt.Config.ContactsIndex)
assert.Error(t, err)

expectedIndexName := fmt.Sprintf("indexer_test_%s", time.Now().Format("2006_01_02"))

indexName, err := ix1.Index(db, false, false)
indexName, err := ix1.Index(rt, false, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName, indexName)

time.Sleep(1 * time.Second)

esModified, err := ix1.GetESLastModified(cfg.ContactsIndex)
esModified, err := ix1.GetESLastModified(rt.Config.ContactsIndex)
assert.NoError(t, err)
assert.WithinDuration(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), esModified, 0)

assertIndexerStats(t, ix1, 9, 0)
assertIndexesWithPrefix(t, cfg, cfg.ContactsIndex, []string{expectedIndexName})
assertIndexesWithPrefix(t, rt.Config, rt.Config.ContactsIndex, []string{expectedIndexName})

for _, tc := range contactQueryTests {
assertQuery(t, cfg, tc.query, tc.expected, "query mismatch for %s", tc.query)
assertQuery(t, rt.Config, tc.query, tc.expected, "query mismatch for %s", tc.query)
}

lastModified, err := ix1.GetESLastModified(indexName)
assert.NoError(t, err)
assert.Equal(t, time.Date(2017, 11, 10, 21, 11, 59, 890662000, time.UTC), lastModified.In(time.UTC))

// now make some contact changes, removing one contact, updating another
_, err = db.Exec(`
_, err = rt.DB.Exec(`
DELETE FROM contacts_contactgroup_contacts WHERE contact_id = 2 AND contactgroup_id = 4;
UPDATE contacts_contact SET name = 'John Deer', modified_on = '2020-08-20 14:00:00+00' where id = 2;
UPDATE contacts_contact SET is_active = FALSE, modified_on = '2020-08-22 15:00:00+00' where id = 4;`)
require.NoError(t, err)

// and index again...
indexName, err = ix1.Index(db, false, false)
indexName, err = ix1.Index(rt, false, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName, indexName) // same index used
assertIndexerStats(t, ix1, 10, 1)

time.Sleep(1 * time.Second)

assertIndexesWithPrefix(t, cfg, cfg.ContactsIndex, []string{expectedIndexName})
assertIndexesWithPrefix(t, rt.Config, rt.Config.ContactsIndex, []string{expectedIndexName})

// should only match new john, old john is gone
assertQuery(t, cfg, elastic.Match("name", "john"), []int64{2})
assertQuery(t, rt.Config, elastic.Match("name", "john"), []int64{2})

// 3 is no longer in our group
assertQuery(t, cfg, elastic.Match("group_ids", 4), []int64{1})
assertQuery(t, rt.Config, elastic.Match("group_ids", 4), []int64{1})

// change John's name to Eric..
_, err = db.Exec(`
_, err = rt.DB.Exec(`
UPDATE contacts_contact SET name = 'Eric', modified_on = '2020-08-20 14:00:00+00' where id = 2;`)
require.NoError(t, err)

// and simulate another indexer doing a parallel rebuild
ix2 := indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, 2, 1, 4)
ix2 := indexers.NewContactIndexer(rt.Config.ElasticURL, rt.Config.ContactsIndex, 2, 1, 4)

indexName2, err := ix2.Index(db, true, false)
indexName2, err := ix2.Index(rt, true, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName+"_1", indexName2) // new index used
assertIndexerStats(t, ix2, 8, 0)

time.Sleep(1 * time.Second)

// check we have a new index but the old index is still around
assertIndexesWithPrefix(t, cfg, cfg.ContactsIndex, []string{expectedIndexName, expectedIndexName + "_1"})
assertIndexesWithPrefix(t, rt.Config, rt.Config.ContactsIndex, []string{expectedIndexName, expectedIndexName + "_1"})

// and the alias points to the new index
assertQuery(t, cfg, elastic.Match("name", "eric"), []int64{2})
assertQuery(t, rt.Config, elastic.Match("name", "eric"), []int64{2})

// simulate another indexer doing a parallel rebuild with cleanup
ix3 := indexers.NewContactIndexer(cfg.ElasticURL, cfg.ContactsIndex, 2, 1, 4)
indexName3, err := ix3.Index(db, true, true)
ix3 := indexers.NewContactIndexer(rt.Config.ElasticURL, rt.Config.ContactsIndex, 2, 1, 4)
indexName3, err := ix3.Index(rt, true, true)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName+"_2", indexName3) // new index used
assertIndexerStats(t, ix3, 8, 0)

// check we cleaned up indexes besides the new one
assertIndexesWithPrefix(t, cfg, cfg.ContactsIndex, []string{expectedIndexName + "_2"})
assertIndexesWithPrefix(t, rt.Config, rt.Config.ContactsIndex, []string{expectedIndexName + "_2"})

// check that the original indexer now indexes against the new index
indexName, err = ix1.Index(db, false, false)
indexName, err = ix1.Index(rt, false, false)
assert.NoError(t, err)
assert.Equal(t, expectedIndexName+"_2", indexName)
}
2 changes: 1 addition & 1 deletion config.go → runtime/config.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package indexer
package runtime

import "os"

Expand Down
8 changes: 8 additions & 0 deletions runtime/runtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package runtime

import "database/sql"

type Runtime struct {
Config *Config
DB *sql.DB
}

0 comments on commit 16ad34e

Please sign in to comment.