From 937433afacd9d66eeb99b7b3669014ad496071df Mon Sep 17 00:00:00 2001 From: Luca Burgazzoli Date: Thu, 22 Aug 2024 22:34:18 +0200 Subject: [PATCH] Pass context down to producers --- pkg/cmd/emitterRun.go | 9 +-- pkg/cmd/server.go | 20 +++--- pkg/cmd/templateRun.go | 2 +- pkg/emitter/emitter.go | 75 +++++++++++---------- pkg/emitter/loop.go | 25 ++++--- pkg/producers/awsdynamodb/producer.go | 12 ++-- pkg/producers/azblobstorage/producer.go | 10 +-- pkg/producers/azcosmosdb/producer.go | 6 +- pkg/producers/cassandra/producer.go | 5 +- pkg/producers/console/ConsoleProducer.go | 5 +- pkg/producers/elastic/elasticProducer.go | 6 +- pkg/producers/gcs/gcsProducer.go | 9 +-- pkg/producers/http/producer.go | 5 +- pkg/producers/http/producer_test.go | 3 +- pkg/producers/kafka/kafkaProducer.go | 14 ++-- pkg/producers/luascript/producer.go | 5 +- pkg/producers/luascript/producer_test.go | 3 +- pkg/producers/mongoDB/mongoProducer.go | 12 ++-- pkg/producers/mongoDB/mongoProducer_test.go | 2 +- pkg/producers/redis/redisProducer.go | 5 +- pkg/producers/redis/redisProducer_test.go | 2 +- pkg/producers/s3/s3Producer.go | 12 ++-- pkg/producers/server/JsonProducer.go | 5 +- pkg/producers/test/TestProducer.go | 5 +- 24 files changed, 131 insertions(+), 126 deletions(-) diff --git a/pkg/cmd/emitterRun.go b/pkg/cmd/emitterRun.go index e5902b9b..cc4a84a2 100644 --- a/pkg/cmd/emitterRun.go +++ b/pkg/cmd/emitterRun.go @@ -21,6 +21,7 @@ package cmd import ( + "context" "github.com/jrnd-io/jr/pkg/emitter" "github.com/spf13/cobra" ) @@ -32,15 +33,15 @@ var emitterRunCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { dryrun, _ := cmd.Flags().GetBool("dryrun") - RunEmitters(args, emitters2, dryrun) + RunEmitters(cmd.Context(), args, emitters2, dryrun) }, } -func RunEmitters(emitterNames []string, ems map[string][]emitter.Emitter, dryrun bool) { +func RunEmitters(ctx context.Context, emitterNames []string, ems map[string][]emitter.Emitter, dryrun bool) { defer 
emitter.WriteStats() - defer emitter.CloseProducers(ems) - emittersToRun := emitter.Initialize(emitterNames, ems, dryrun) + defer emitter.CloseProducers(ctx, ems) + emittersToRun := emitter.Initialize(ctx, emitterNames, ems, dryrun) emitter.DoLoop(emittersToRun) } diff --git a/pkg/cmd/server.go b/pkg/cmd/server.go index a3c15003..ee08959e 100644 --- a/pkg/cmd/server.go +++ b/pkg/cmd/server.go @@ -118,11 +118,11 @@ var serverCmd = &cobra.Command{ router.Use(middleware.Timeout(60 * time.Second)) router.Use(SessionMiddleware) - //comment for local dev + // comment for local dev embeddedFileRoutes(router) - //Uncomment for local dev - //localDevServerSetup(router) + // Uncomment for local dev + // localDevServerSetup(router) router.Route("/emitters", func(r chi.Router) { r.Get("/", listEmitters) @@ -298,15 +298,15 @@ func addEmitter(w http.ResponseWriter, r *http.Request) { } func updateEmitter(w http.ResponseWriter, r *http.Request) { - //@TODO update emitter by name + // @TODO update emitter by name } func deleteEmitter(w http.ResponseWriter, r *http.Request) { - //@TODO delete emitter by name + // @TODO delete emitter by name } func startEmitter(w http.ResponseWriter, r *http.Request) { - //@TODO start emitter by name + // @TODO start emitter by name w.Header().Set("Content-Type", "application/json") url := chi.URLParam(r, "emitter") @@ -317,7 +317,7 @@ func startEmitter(w http.ResponseWriter, r *http.Request) { } func stopEmitter(w http.ResponseWriter, r *http.Request) { - //@TODO stop emitter by name + // @TODO stop emitter by name w.Header().Set("Content-Type", "application/json") url := chi.URLParam(r, "emitter") @@ -328,7 +328,7 @@ func stopEmitter(w http.ResponseWriter, r *http.Request) { } func pauseEmitter(w http.ResponseWriter, r *http.Request) { - //@TODO pause emitter by name + // @TODO pause emitter by name w.Header().Set("Content-Type", "application/json") url := chi.URLParam(r, "emitter") @@ -345,7 +345,7 @@ func runEmitter(w http.ResponseWriter, r 
*http.Request) { if firstRun[url] == false { for i := 0; i < len(emitters); i++ { if functions.Contains([]string{url}, emitters[i].Name) { - emitters[i].Initialize(configuration.GlobalCfg) + emitters[i].Initialize(r.Context(), configuration.GlobalCfg) emitterToRun[url] = append(emitterToRun[url], emitters[i]) if emitters[i].Preload > 0 { emitters[i].Run(emitters[i].Preload, w) @@ -364,7 +364,7 @@ func runEmitter(w http.ResponseWriter, r *http.Request) { } func statusEmitter(w http.ResponseWriter, r *http.Request) { - //@TODO status emitter by name + // @TODO status emitter by name w.Header().Set("Content-Type", "application/json") url := chi.URLParam(r, "emitter") diff --git a/pkg/cmd/templateRun.go b/pkg/cmd/templateRun.go index f1f5c641..9d5075d3 100644 --- a/pkg/cmd/templateRun.go +++ b/pkg/cmd/templateRun.go @@ -147,7 +147,7 @@ jr template run --template "{{name}}" functions.SetSeed(seed) es := map[string][]emitter.Emitter{constants.DEFAULT_EMITTER_NAME: {e}} - RunEmitters([]string{e.Name}, es, false) + RunEmitters(cmd.Context(), []string{e.Name}, es, false) }, } diff --git a/pkg/emitter/emitter.go b/pkg/emitter/emitter.go index ad9c74f2..69694de1 100644 --- a/pkg/emitter/emitter.go +++ b/pkg/emitter/emitter.go @@ -21,13 +21,14 @@ package emitter import ( + "context" "fmt" "os" "time" "github.com/jrnd-io/jr/pkg/configuration" "github.com/jrnd-io/jr/pkg/constants" - "github.com/jrnd-io/jr/pkg/ctx" + jtctx "github.com/jrnd-io/jr/pkg/ctx" "github.com/jrnd-io/jr/pkg/functions" "github.com/jrnd-io/jr/pkg/producers/awsdynamodb" "github.com/jrnd-io/jr/pkg/producers/azblobstorage" @@ -68,7 +69,7 @@ type Emitter struct { VTpl tpl.Tpl } -func (e *Emitter) Initialize(conf configuration.GlobalConfiguration) { +func (e *Emitter) Initialize(ctx context.Context, conf configuration.GlobalConfiguration) { functions.InitCSV(e.Csv) @@ -83,11 +84,11 @@ func (e *Emitter) Initialize(conf configuration.GlobalConfiguration) { } } - keyTpl, err := tpl.NewTpl("key", e.KeyTemplate, 
functions.FunctionsMap(), &ctx.JrContext) + keyTpl, err := tpl.NewTpl("key", e.KeyTemplate, functions.FunctionsMap(), &jtctx.JrContext) if err != nil { log.Fatal().Err(err).Msg("Failed to create key template") } - valueTpl, err := tpl.NewTpl("value", e.EmbeddedTemplate, functions.FunctionsMap(), &ctx.JrContext) + valueTpl, err := tpl.NewTpl("value", e.EmbeddedTemplate, functions.FunctionsMap(), &jtctx.JrContext) if err != nil { log.Fatal().Err(err).Msg("Failed to create value template") } @@ -102,7 +103,7 @@ func (e *Emitter) Initialize(conf configuration.GlobalConfiguration) { } if e.Output == "kafka" { - e.Producer = createKafkaProducer(conf, e.Topic, templateName) + e.Producer = createKafkaProducer(ctx, conf, e.Topic, templateName) return } else { if conf.SchemaRegistry { @@ -111,41 +112,41 @@ func (e *Emitter) Initialize(conf configuration.GlobalConfiguration) { } if e.Output == "redis" { - e.Producer = createRedisProducer(conf.RedisTtl, conf.RedisConfig) + e.Producer = createRedisProducer(ctx, conf.RedisTtl, conf.RedisConfig) return } if e.Output == "mongo" || e.Output == "mongodb" { - e.Producer = createMongoProducer(conf.MongoConfig) + e.Producer = createMongoProducer(ctx, conf.MongoConfig) return } if e.Output == "elastic" { - e.Producer = createElasticProducer(conf.ElasticConfig) + e.Producer = createElasticProducer(ctx, conf.ElasticConfig) return } if e.Output == "s3" { - e.Producer = createS3Producer(conf.S3Config) + e.Producer = createS3Producer(ctx, conf.S3Config) return } if e.Output == "awsdynamodb" { - e.Producer = createAWSDynamoDB(conf.AWSDynamoDBConfig) + e.Producer = createAWSDynamoDB(ctx, conf.AWSDynamoDBConfig) return } if e.Output == "gcs" { - e.Producer = createGCSProducer(conf.GCSConfig) + e.Producer = createGCSProducer(ctx, conf.GCSConfig) return } if e.Output == "azblobstorage" { - e.Producer = createAZBlobStorageProducer(conf.AzBlobStorageConfig) + e.Producer = createAZBlobStorageProducer(ctx, conf.AzBlobStorageConfig) return } if 
e.Output == "azcosmosdb" { - e.Producer = createAZCosmosDBProducer(conf.AzCosmosDBConfig) + e.Producer = createAZCosmosDBProducer(ctx, conf.AzCosmosDBConfig) return } @@ -155,16 +156,16 @@ func (e *Emitter) Initialize(conf configuration.GlobalConfiguration) { } if e.Output == "http" { - e.Producer = createHTTPProducer(conf.HTTPConfig) + e.Producer = createHTTPProducer(ctx, conf.HTTPConfig) return } if e.Output == "cassandra" { - e.Producer = createCassandraProducer(conf.CassandraConfig) + e.Producer = createCassandraProducer(ctx, conf.CassandraConfig) return } if e.Output == "luascript" { - e.Producer = createLUAScriptProducer(conf.LUAScriptConfig) + e.Producer = createLUAScriptProducer(ctx, conf.LUAScriptConfig) return } @@ -179,18 +180,18 @@ func (e *Emitter) Run(num int, o any) { kInValue := functions.GetV("KEY") if kInValue != "" { - e.Producer.Produce([]byte(kInValue), []byte(v), o) + e.Producer.Produce(context.TODO(), []byte(kInValue), []byte(v), o) } else { - e.Producer.Produce([]byte(k), []byte(v), o) + e.Producer.Produce(context.TODO(), []byte(k), []byte(v), o) } - ctx.JrContext.GeneratedObjects++ - ctx.JrContext.GeneratedBytes += int64(len(v)) + jtctx.JrContext.GeneratedObjects++ + jtctx.JrContext.GeneratedBytes += int64(len(v)) } } -func createRedisProducer(ttl time.Duration, redisConfig string) Producer { +func createRedisProducer(_ context.Context, ttl time.Duration, redisConfig string) Producer { rProducer := &redis.RedisProducer{ Ttl: ttl, } @@ -198,77 +199,77 @@ func createRedisProducer(ttl time.Duration, redisConfig string) Producer { return rProducer } -func createMongoProducer(mongoConfig string) Producer { +func createMongoProducer(ctx context.Context, mongoConfig string) Producer { mProducer := &mongoDB.MongoProducer{} - mProducer.Initialize(mongoConfig) + mProducer.Initialize(ctx, mongoConfig) return mProducer } -func createElasticProducer(elasticConfig string) Producer { +func createElasticProducer(_ context.Context, elasticConfig string) 
Producer { eProducer := &elastic.ElasticProducer{} eProducer.Initialize(elasticConfig) return eProducer } -func createS3Producer(s3Config string) Producer { +func createS3Producer(ctx context.Context, s3Config string) Producer { sProducer := &s3.S3Producer{} - sProducer.Initialize(s3Config) + sProducer.Initialize(ctx, s3Config) return sProducer } -func createAWSDynamoDB(config string) Producer { +func createAWSDynamoDB(ctx context.Context, config string) Producer { producer := &awsdynamodb.Producer{} - producer.Initialize(config) + producer.Initialize(ctx, config) return producer } -func createAZBlobStorageProducer(azConfig string) Producer { +func createAZBlobStorageProducer(ctx context.Context, azConfig string) Producer { producer := &azblobstorage.Producer{} - producer.Initialize(azConfig) + producer.Initialize(ctx, azConfig) return producer } -func createAZCosmosDBProducer(azConfig string) Producer { +func createAZCosmosDBProducer(_ context.Context, azConfig string) Producer { producer := &azcosmosdb.Producer{} producer.Initialize(azConfig) return producer } -func createGCSProducer(gcsConfig string) Producer { +func createGCSProducer(ctx context.Context, gcsConfig string) Producer { gProducer := &gcs.GCSProducer{} - gProducer.Initialize(gcsConfig) + gProducer.Initialize(ctx, gcsConfig) return gProducer } -func createHTTPProducer(httpConfig string) Producer { +func createHTTPProducer(_ context.Context, httpConfig string) Producer { httpProducer := &http.Producer{} httpProducer.Initialize(httpConfig) return httpProducer } -func createCassandraProducer(config string) Producer { +func createCassandraProducer(_ context.Context, config string) Producer { producer := &cassandra.Producer{} producer.Initialize(config) return producer } -func createLUAScriptProducer(config string) Producer { +func createLUAScriptProducer(_ context.Context, config string) Producer { producer := &luascript.Producer{} producer.Initialize(config) return producer } -func 
createKafkaProducer(conf configuration.GlobalConfiguration, topic string, templateType string) *kafka.KafkaManager { +func createKafkaProducer(_ context.Context, conf configuration.GlobalConfiguration, topic string, templateType string) *kafka.KafkaManager { kManager := &kafka.KafkaManager{ Serializer: conf.Serializer, diff --git a/pkg/emitter/loop.go b/pkg/emitter/loop.go index 19e6efb0..3b9003d1 100644 --- a/pkg/emitter/loop.go +++ b/pkg/emitter/loop.go @@ -26,7 +26,7 @@ import ( + "context" "github.com/jrnd-io/jr/pkg/configuration" "github.com/jrnd-io/jr/pkg/ctx" "github.com/jrnd-io/jr/pkg/functions" - "io" "os" "os/signal" "strings" @@ -35,24 +35,24 @@ import ( ) type Producer interface { - Produce(k []byte, v []byte, o any) - io.Closer + Produce(ctx context.Context, key []byte, val []byte, o any) + Close(ctx context.Context) error } -func Initialize(emitterNames []string, es map[string][]Emitter, dryrun bool) []Emitter { +func Initialize(ctx context.Context, emitterNames []string, es map[string][]Emitter, dryrun bool) []Emitter { runAll := len(emitterNames) == 0 emittersToRun := make([]Emitter, 0, len(es)) if runAll { for _, emitters := range es { - emittersToRun = InitializeEmitters(emitters, dryrun, emittersToRun) + emittersToRun = InitializeEmitters(ctx, emitters, dryrun, emittersToRun) } } else { for _, name := range emitterNames { emitters, enabled := es[name] if enabled { - emittersToRun = InitializeEmitters(emitters, dryrun, emittersToRun) + emittersToRun = InitializeEmitters(ctx, emitters, dryrun, emittersToRun) } } } @@ -60,12 +60,12 @@ func Initialize(emitterNames []string, es map[string][]Emitter, dryrun bool) []E return emittersToRun } -func InitializeEmitters(emitters []Emitter, dryrun bool, emittersToRun []Emitter) []Emitter { +func InitializeEmitters(ctx context.Context, emitters []Emitter, dryrun bool, emittersToRun []Emitter) []Emitter { for i := 0; i < len(emitters); i++ { if dryrun { emitters[i].Output = "stdout" } - 
emitters[i].Initialize(ctx, configuration.GlobalCfg) emittersToRun = append(emittersToRun, emitters[i]) emitters[i].Run(emitters[i].Preload, nil) } @@ -140,9 +140,9 @@ func doTemplate(emitter Emitter) { kInValue := functions.GetV("KEY") if (kInValue) != "" { - emitter.Producer.Produce([]byte(kInValue), []byte(v), nil) + emitter.Producer.Produce(context.TODO(), []byte(kInValue), []byte(v), nil) } else { - emitter.Producer.Produce([]byte(k), []byte(v), nil) + emitter.Producer.Produce(context.TODO(), []byte(k), []byte(v), nil) } ctx.JrContext.GeneratedObjects++ @@ -151,12 +151,12 @@ } -func CloseProducers(es map[string][]Emitter) { +func CloseProducers(ctx context.Context, es map[string][]Emitter) { for _, v := range es { for i := 0; i < len(v); i++ { p := v[i].Producer if p != nil { - if err := p.Close(); err != nil { + if err := p.Close(ctx); err != nil { fmt.Printf("Error in closing producers: %v\n", err) } } @@ -169,7 +169,7 @@ func addEmitterToExpectedObjects(e Emitter) { d := e.Duration.Milliseconds() f := e.Frequency.Milliseconds() n := e.Num - //fmt.Printf("%d %d %d\n", d, f, n) + // fmt.Printf("%d %d %d\n", d, f, n) if d > 0 && f > 0 && n > 0 { expected := (d / f) * int64(n) diff --git a/pkg/producers/awsdynamodb/producer.go b/pkg/producers/awsdynamodb/producer.go index e4d442d8..9705aba3 100644 --- a/pkg/producers/awsdynamodb/producer.go +++ b/pkg/producers/awsdynamodb/producer.go @@ -38,7 +38,7 @@ type Producer struct { client *dynamodb.Client } -func (p *Producer) Initialize(configFile string) { +func (p *Producer) Initialize(ctx context.Context, configFile string) { var config Config file, err := os.ReadFile(configFile) if err != nil { @@ -53,7 +53,7 @@ func (p *Producer) Initialize(configFile string) { log.Fatal().Msg("Table is mandatory") } - awsConfig, err := awsconfig.LoadDefaultConfig(context.TODO()) + awsConfig, err := awsconfig.LoadDefaultConfig(ctx) if err != nil 
{ log.Fatal().Err(err).Msg("Failed to load default AWS config") } @@ -63,10 +63,10 @@ func (p *Producer) Initialize(configFile string) { p.configuration = config } -func (p *Producer) Produce(_ []byte, v []byte, _ any) { +func (p *Producer) Produce(ctx context.Context, _ []byte, val []byte, _ any) { var jsonMap map[string]interface{} - if err := json.Unmarshal(v, &jsonMap); err != nil { + if err := json.Unmarshal(val, &jsonMap); err != nil { log.Fatal().Err(err).Msg("Failed to unmarshal json") } @@ -75,7 +75,7 @@ func (p *Producer) Produce(_ []byte, v []byte, _ any) { log.Fatal().Err(err).Msg("Failed to marshal map") } - _, err = p.client.PutItem(context.TODO(), &dynamodb.PutItemInput{ + _, err = p.client.PutItem(ctx, &dynamodb.PutItemInput{ TableName: aws.String(p.configuration.Table), Item: item, }) @@ -85,6 +85,6 @@ func (p *Producer) Produce(_ []byte, v []byte, _ any) { } -func (p *Producer) Close() error { +func (p *Producer) Close(_ context.Context) error { return nil } diff --git a/pkg/producers/azblobstorage/producer.go b/pkg/producers/azblobstorage/producer.go index fa2e1212..95a17f85 100644 --- a/pkg/producers/azblobstorage/producer.go +++ b/pkg/producers/azblobstorage/producer.go @@ -37,7 +37,7 @@ type Producer struct { client *azblob.Client } -func (p *Producer) Initialize(configFile string) { +func (p *Producer) Initialize(ctx context.Context, configFile string) { cfgBytes, err := os.ReadFile(configFile) if err != nil { log.Fatal().Err(err).Msg("Failed to read config file") @@ -72,7 +72,7 @@ func (p *Producer) Initialize(configFile string) { } if config.Container.Create { - _, err := client.CreateContainer(context.TODO(), config.Container.Name, nil) + _, err := client.CreateContainer(ctx, config.Container.Name, nil) if err != nil { log.Fatal().Err(err).Msg("Failed to create container") } @@ -82,7 +82,7 @@ func (p *Producer) Initialize(configFile string) { } -func (p *Producer) Produce(k []byte, v []byte, _ any) { +func (p *Producer) Produce(ctx 
context.Context, k []byte, v []byte, _ any) { var key string if len(k) == 0 || strings.ToLower(string(k)) == "null" { @@ -93,7 +93,7 @@ func (p *Producer) Produce(k []byte, v []byte, _ any) { } resp, err := p.client.UploadBuffer( - context.TODO(), + ctx, p.configuration.Container.Name, key, v, @@ -111,6 +111,6 @@ func (p *Producer) Produce(k []byte, v []byte, _ any) { } -func (p *Producer) Close() error { +func (p *Producer) Close(_ context.Context) error { return nil } diff --git a/pkg/producers/azcosmosdb/producer.go b/pkg/producers/azcosmosdb/producer.go index f20989f9..9c41a662 100644 --- a/pkg/producers/azcosmosdb/producer.go +++ b/pkg/producers/azcosmosdb/producer.go @@ -72,7 +72,7 @@ func (p *Producer) Initialize(configFile string) { } -func (p *Producer) Produce(_ []byte, v []byte, _ any) { +func (p *Producer) Produce(ctx context.Context, _ []byte, v []byte, _ any) { // This is ugly but it works var jsonMap map[string]interface{} @@ -93,7 +93,7 @@ func (p *Producer) Produce(_ []byte, v []byte, _ any) { } pk := azcosmos.NewPartitionKeyString(pkValue.(string)) - resp, err := container.CreateItem(context.Background(), pk, v, nil) + resp, err := container.CreateItem(ctx, pk, v, nil) if err != nil { log.Fatal().Err(err).Msg("Failed to create item") } @@ -102,6 +102,6 @@ func (p *Producer) Produce(_ []byte, v []byte, _ any) { } -func (p *Producer) Close() error { +func (p *Producer) Close(_ context.Context) error { return nil } diff --git a/pkg/producers/cassandra/producer.go b/pkg/producers/cassandra/producer.go index 4ff9c72e..5631ff48 100644 --- a/pkg/producers/cassandra/producer.go +++ b/pkg/producers/cassandra/producer.go @@ -21,6 +21,7 @@ package cassandra import ( + "context" "encoding/json" "fmt" "os" @@ -97,7 +98,7 @@ func (p *Producer) Initialize(configFile string) { } -func (p *Producer) Produce(_ []byte, v []byte, _ any) { +func (p *Producer) Produce(_ context.Context, _ []byte, v []byte, _ any) { stmt := fmt.Sprintf("INSERT INTO %s.%s JSON ?", 
p.configuration.Keyspace, @@ -108,7 +109,7 @@ func (p *Producer) Produce(_ []byte, v []byte, _ any) { } } -func (p *Producer) Close() error { +func (p *Producer) Close(_ context.Context) error { p.session.Close() return nil } diff --git a/pkg/producers/console/ConsoleProducer.go b/pkg/producers/console/ConsoleProducer.go index 79c0f10d..8fcdbe9b 100644 --- a/pkg/producers/console/ConsoleProducer.go +++ b/pkg/producers/console/ConsoleProducer.go @@ -21,6 +21,7 @@ package console import ( + "context" "fmt" "github.com/jrnd-io/jr/pkg/tpl" ) @@ -29,12 +30,12 @@ type ConsoleProducer struct { OutputTpl *tpl.Tpl } -func (c *ConsoleProducer) Close() error { +func (c *ConsoleProducer) Close(_ context.Context) error { // no need to close return nil } -func (c *ConsoleProducer) Produce(key []byte, value []byte, o any) { +func (c *ConsoleProducer) Produce(_ context.Context, key []byte, value []byte, _ any) { data := struct { K string diff --git a/pkg/producers/elastic/elasticProducer.go b/pkg/producers/elastic/elasticProducer.go index 5d4fbfe6..b18b2851 100644 --- a/pkg/producers/elastic/elasticProducer.go +++ b/pkg/producers/elastic/elasticProducer.go @@ -86,7 +86,7 @@ func (p *ElasticProducer) Initialize(configFile string) { p.client = *client } -func (p *ElasticProducer) Produce(k []byte, v []byte, o any) { +func (p *ElasticProducer) Produce(ctx context.Context, k []byte, v []byte, _ any) { var req esapi.IndexRequest @@ -109,7 +109,7 @@ func (p *ElasticProducer) Produce(k []byte, v []byte, o any) { } } - res, err := req.Do(context.Background(), &p.client) + res, err := req.Do(ctx, &p.client) if err != nil { log.Fatal().Err(err).Msg("Failed to write data in Elastic") } @@ -120,7 +120,7 @@ func (p *ElasticProducer) Produce(k []byte, v []byte, o any) { } } -func (p *ElasticProducer) Close() error { +func (p *ElasticProducer) Close(_ context.Context) error { log.Warn().Msg("elasticsearch Client doesn't provide a close method!") return nil } diff --git 
a/pkg/producers/gcs/gcsProducer.go b/pkg/producers/gcs/gcsProducer.go index ff3c4b73..53d0c013 100644 --- a/pkg/producers/gcs/gcsProducer.go +++ b/pkg/producers/gcs/gcsProducer.go @@ -41,7 +41,7 @@ type GCSProducer struct { bucket string } -func (p *GCSProducer) Initialize(configFile string) { +func (p *GCSProducer) Initialize(ctx context.Context, configFile string) { var config Config file, err := os.ReadFile(configFile) if err != nil { @@ -53,7 +53,6 @@ func (p *GCSProducer) Initialize(configFile string) { log.Fatal().Err(err).Msg("Failed to parse configuration parameters") } - ctx := context.Background() // Use Google Application Default Credentials to authorize and authenticate the client. // More information about Application Default Credentials and how to enable is at // https://developers.google.com/identity/protocols/application-default-credentials. @@ -66,9 +65,7 @@ func (p *GCSProducer) Initialize(configFile string) { p.bucket = config.Bucket } -func (p *GCSProducer) Produce(k []byte, v []byte, o any) { - ctx := context.Background() - +func (p *GCSProducer) Produce(ctx context.Context, k []byte, v []byte, _ any) { bucket := p.bucket var key string @@ -92,7 +89,7 @@ func (p *GCSProducer) Produce(k []byte, v []byte, o any) { } -func (p *GCSProducer) Close() error { +func (p *GCSProducer) Close(_ context.Context) error { p.client.Close() return nil } diff --git a/pkg/producers/http/producer.go b/pkg/producers/http/producer.go index d9f18951..6cdab83c 100644 --- a/pkg/producers/http/producer.go +++ b/pkg/producers/http/producer.go @@ -21,6 +21,7 @@ package http import ( + "context" "crypto/tls" "encoding/json" "net/http" @@ -133,7 +134,7 @@ func (p *Producer) InitializeFromConfig(config Config) { } -func (p *Producer) Produce(k []byte, v []byte, o any) { +func (p *Producer) Produce(_ context.Context, _ []byte, v []byte, _ any) { var err error @@ -162,7 +163,7 @@ func (p *Producer) Produce(k []byte, v []byte, o any) { } -func (p *Producer) Close() error { 
+func (p *Producer) Close(_ context.Context) error { return nil } diff --git a/pkg/producers/http/producer_test.go b/pkg/producers/http/producer_test.go index 7b3c1ea9..1a9efb32 100644 --- a/pkg/producers/http/producer_test.go +++ b/pkg/producers/http/producer_test.go @@ -21,6 +21,7 @@ package http_test import ( + "context" "encoding/base64" "fmt" "io" @@ -252,7 +253,7 @@ func TestProducer(t *testing.T) { fakeUrl, mr.serveHTTP) - producer.Produce([]byte("key"), defaultBody, nil) + producer.Produce(context.TODO(), []byte("key"), defaultBody, nil) httpmock.DeactivateAndReset() }) } diff --git a/pkg/producers/kafka/kafkaProducer.go b/pkg/producers/kafka/kafkaProducer.go index 56484b70..8f3ed546 100644 --- a/pkg/producers/kafka/kafkaProducer.go +++ b/pkg/producers/kafka/kafkaProducer.go @@ -178,14 +178,14 @@ func registerProviders() { encryption.Register() } -func (k *KafkaManager) Close() error { +func (k *KafkaManager) Close(_ context.Context) error { k.admin.Close() k.producer.Flush(15 * 1000) k.producer.Close() return nil } -func (k *KafkaManager) Produce(key []byte, data []byte, o any) { +func (k *KafkaManager) Produce(_ context.Context, key []byte, data []byte, _ any) { go listenToEventsFrom(k.producer, k.Topic) @@ -203,7 +203,7 @@ func (k *KafkaManager) Produce(key []byte, data []byte, o any) { } ser, err = avrov2.NewSerializer(k.schema, serde.ValueSerde, serConfig) } else if k.Serializer == "protobuf" { - //ser, err = protobuf.NewSerializer(k.schema, serde.ValueSerde, protobuf.NewSerializerConfig()) + // ser, err = protobuf.NewSerializer(k.schema, serde.ValueSerde, protobuf.NewSerializerConfig()) log.Fatal().Msg("Protobuf not yet implemented") } else if k.Serializer == "json-schema" { ser, err = jsonschema.NewSerializer(k.schema, serde.ValueSerde, jsonschema.NewSerializerConfig()) @@ -239,15 +239,15 @@ func (k *KafkaManager) Produce(key []byte, data []byte, o any) { TopicPartition: kafka.TopicPartition{Topic: &k.Topic, Partition: kafka.PartitionAny}, Key: key, 
Value: data, - //Headers: []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}}, + // Headers: []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}}, }, nil) if err != nil { if err.(kafka.Error).Code() == kafka.ErrQueueFull { // Producer queue is full, wait 1s for messages // to be delivered then try again. - //time.Sleep(time.Second) - //continue + // time.Sleep(time.Second) + // continue } log.Error().Err(err).Msg("Failed to produce message") } @@ -305,7 +305,7 @@ func listenToEventsFrom(k *kafka.Producer, topic string) { if m.TopicPartition.Error != nil { log.Error().Err(m.TopicPartition.Error).Msg("Delivery failed") } else { - //fmt.Printf("Delivered message to topic %s [%d] at offset %v\n", *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset) + // fmt.Printf("Delivered message to topic %s [%d] at offset %v\n", *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset) } case kafka.Error: log.Error().Err(ev).Msg("Kafka error") diff --git a/pkg/producers/luascript/producer.go b/pkg/producers/luascript/producer.go index c2b722a9..2987354f 100644 --- a/pkg/producers/luascript/producer.go +++ b/pkg/producers/luascript/producer.go @@ -19,6 +19,7 @@ package luascript import ( + "context" "encoding/json" "os" "strings" @@ -82,7 +83,7 @@ func (p *Producer) InitializeFromConfig(config Config) { } -func (p *Producer) Produce(k []byte, v []byte, _ any) { +func (p *Producer) Produce(_ context.Context, k []byte, v []byte, _ any) { L := lua.NewState() libs.Preload(L) @@ -99,6 +100,6 @@ } -func (p *Producer) Close() error { +func (p *Producer) Close(_ context.Context) error { return nil } diff --git a/pkg/producers/luascript/producer_test.go b/pkg/producers/luascript/producer_test.go index a6246107..90e8343a 100644 --- a/pkg/producers/luascript/producer_test.go +++ b/pkg/producers/luascript/producer_test.go @@ -19,6 +19,7 
@@ package luascript_test import ( + "context" "testing" "github.com/jrnd-io/jr/pkg/producers/luascript" @@ -46,7 +47,7 @@ func TestProducer(t *testing.T) { t.Run(tc.name, func(t *testing.T) { p := &luascript.Producer{} p.InitializeFromConfig(tc.config) - p.Produce([]byte("somekey"), []byte(someJson), nil) + p.Produce(context.TODO(), []byte("somekey"), []byte(someJson), nil) }) } diff --git a/pkg/producers/mongoDB/mongoProducer.go b/pkg/producers/mongoDB/mongoProducer.go index 0354e27c..a9cf0a6e 100644 --- a/pkg/producers/mongoDB/mongoProducer.go +++ b/pkg/producers/mongoDB/mongoProducer.go @@ -44,7 +44,7 @@ type MongoProducer struct { collection string } -func (p *MongoProducer) Initialize(configFile string) { +func (p *MongoProducer) Initialize(ctx context.Context, configFile string) { var config Config file, err := os.ReadFile(configFile) if err != nil { @@ -66,7 +66,7 @@ func (p *MongoProducer) Initialize(configFile string) { p.collection = config.Collection p.database = config.Database - client, err := mongo.Connect(context.Background(), clientOptions) + client, err := mongo.Connect(ctx, clientOptions) if err != nil { log.Fatal().Err(err).Msg("Can't connect to Mongo") @@ -75,7 +75,7 @@ func (p *MongoProducer) Initialize(configFile string) { p.client = *client } -func (p *MongoProducer) Produce(k []byte, v []byte, o any) { +func (p *MongoProducer) Produce(ctx context.Context, k []byte, v []byte, _ any) { collection := p.client.Database(p.database).Collection(p.collection) @@ -89,14 +89,14 @@ func (p *MongoProducer) Produce(k []byte, v []byte, o any) { dev["_id"] = string(k) } - _, err = collection.InsertOne(context.Background(), dev) + _, err = collection.InsertOne(ctx, dev) if err != nil { log.Fatal().Err(err).Msg("Failed to write data in Mongo") } } -func (p *MongoProducer) Close() error { - err := p.client.Disconnect(context.Background()) +func (p *MongoProducer) Close(ctx context.Context) error { + err := p.client.Disconnect(ctx) if err != nil { 
log.Warn().Err(err).Msg("Failed to close Mongo connection") } diff --git a/pkg/producers/mongoDB/mongoProducer_test.go b/pkg/producers/mongoDB/mongoProducer_test.go index a8b0bb29..d9e50c9e 100644 --- a/pkg/producers/mongoDB/mongoProducer_test.go +++ b/pkg/producers/mongoDB/mongoProducer_test.go @@ -24,7 +24,7 @@ package mongoDB import ( "context" - "jr/storage" + "github.com/jrnd-io/jr/pkg/storage" "testing" ) diff --git a/pkg/producers/redis/redisProducer.go b/pkg/producers/redis/redisProducer.go index 777bebf4..4f16240f 100644 --- a/pkg/producers/redis/redisProducer.go +++ b/pkg/producers/redis/redisProducer.go @@ -51,7 +51,7 @@ func (p *RedisProducer) Initialize(configFile string) { p.client = *redis.NewClient(&options) } -func (p *RedisProducer) Close() error { +func (p *RedisProducer) Close(_ context.Context) error { err := p.client.Close() if err != nil { log.Warn().Err(err).Msg("Failed to close Redis connection") @@ -59,8 +59,7 @@ func (p *RedisProducer) Close() error { return err } -func (p *RedisProducer) Produce(k []byte, v []byte, o any) { - ctx := context.Background() +func (p *RedisProducer) Produce(ctx context.Context, k []byte, v []byte, _ any) { err := p.client.Set(ctx, string(k), string(v), p.Ttl).Err() if err != nil { log.Fatal().Err(err).Msg("Failed to write data in Redis") diff --git a/pkg/producers/redis/redisProducer_test.go b/pkg/producers/redis/redisProducer_test.go index abc120cf..ec6d9c28 100644 --- a/pkg/producers/redis/redisProducer_test.go +++ b/pkg/producers/redis/redisProducer_test.go @@ -24,7 +24,7 @@ package redis import ( "context" - "jr/storage" + "github.com/jrnd-io/jr/pkg/storage" "testing" ) diff --git a/pkg/producers/s3/s3Producer.go b/pkg/producers/s3/s3Producer.go index 271061f5..a4ed2279 100644 --- a/pkg/producers/s3/s3Producer.go +++ b/pkg/producers/s3/s3Producer.go @@ -43,7 +43,7 @@ type S3Producer struct { bucket string } -func (p *S3Producer) Initialize(configFile string) { +func (p *S3Producer) Initialize(ctx 
context.Context, configFile string) { var config Config file, err := os.ReadFile(configFile) if err != nil { @@ -54,7 +54,7 @@ func (p *S3Producer) Initialize(configFile string) { log.Fatal().Err(err).Msg("Failed to parse configuration parameters") } - awsConfig, err := awsconfig.LoadDefaultConfig(context.TODO()) + awsConfig, err := awsconfig.LoadDefaultConfig(ctx) if err != nil { log.Fatal().Err(err).Msg("Failed to load default AWS config") } @@ -65,7 +65,7 @@ func (p *S3Producer) Initialize(configFile string) { p.bucket = config.Bucket } -func (p *S3Producer) Produce(k []byte, v []byte, o any) { +func (p *S3Producer) Produce(ctx context.Context, k []byte, v []byte, _ any) { bucket := p.bucket var key string @@ -77,8 +77,8 @@ func (p *S3Producer) Produce(k []byte, v []byte, o any) { key = string(k) } - //object will be stored with no content type - _, err := p.client.PutObject(context.TODO(), &s3.PutObjectInput{ + // object will be stored with no content type + _, err := p.client.PutObject(ctx, &s3.PutObjectInput{ Body: bytes.NewReader(v), Bucket: aws.String(bucket), Key: aws.String(key), @@ -89,7 +89,7 @@ func (p *S3Producer) Produce(k []byte, v []byte, o any) { } } -func (p *S3Producer) Close() error { +func (p *S3Producer) Close(_ context.Context) error { log.Warn().Msg("S3 Client doesn't provide a close method!") return nil } diff --git a/pkg/producers/server/JsonProducer.go b/pkg/producers/server/JsonProducer.go index cf042f8d..a7832635 100644 --- a/pkg/producers/server/JsonProducer.go +++ b/pkg/producers/server/JsonProducer.go @@ -21,6 +21,7 @@ package server import ( + "context" "net/http" "github.com/jrnd-io/jr/pkg/tpl" @@ -31,12 +32,12 @@ type JsonProducer struct { OutputTpl *tpl.Tpl } -func (c *JsonProducer) Close() error { +func (c *JsonProducer) Close(_ context.Context) error { // no need to close return nil } -func (c *JsonProducer) Produce(key []byte, value []byte, o interface{}) { +func (c *JsonProducer) Produce(_ context.Context, key []byte, value 
[]byte, o any) { if o != nil { respWriter := o.(http.ResponseWriter) diff --git a/pkg/producers/test/TestProducer.go b/pkg/producers/test/TestProducer.go index 55fc9713..70544069 100644 --- a/pkg/producers/test/TestProducer.go +++ b/pkg/producers/test/TestProducer.go @@ -22,6 +22,7 @@ package test import ( "bytes" + "context" "github.com/jrnd-io/jr/pkg/tpl" "github.com/rs/zerolog/log" @@ -31,12 +32,12 @@ type TestProducer struct { OutputTpl *tpl.Tpl } -func (c *TestProducer) Close() error { +func (c *TestProducer) Close(_ context.Context) error { // no need to close return nil } -func (c *TestProducer) Produce(key []byte, value []byte, o interface{}) { +func (c *TestProducer) Produce(_ context.Context, key []byte, value []byte, o any) { if o != nil { respWriter := o.(*bytes.Buffer)