From 40c5ad2efa3350c145b098f86a97be4dbf2a5191 Mon Sep 17 00:00:00 2001 From: Foogaro Date: Mon, 2 Dec 2024 23:15:16 +0100 Subject: [PATCH 1/2] Added hash adn json support for Redis. --- pkg/cmd/producerList.go | 2 + pkg/cmd/templateRun.go | 2 +- pkg/emitter/emitter.go | 26 +++++++++ pkg/producers/redis/hashProducer.go | 63 +++++++++++++++++++++ pkg/producers/redis/hashProducer_test.go | 60 ++++++++++++++++++++ pkg/producers/redis/jsonProducer.go | 63 +++++++++++++++++++++ pkg/producers/redis/jsonProducer_test.go | 72 ++++++++++++++++++++++++ 7 files changed, 287 insertions(+), 1 deletion(-) create mode 100644 pkg/producers/redis/hashProducer.go create mode 100644 pkg/producers/redis/hashProducer_test.go create mode 100644 pkg/producers/redis/jsonProducer.go create mode 100644 pkg/producers/redis/jsonProducer_test.go diff --git a/pkg/cmd/producerList.go b/pkg/cmd/producerList.go index 8b055dec..98c9b9de 100644 --- a/pkg/cmd/producerList.go +++ b/pkg/cmd/producerList.go @@ -50,6 +50,8 @@ var producerListCmd = &cobra.Command{ fmt.Printf("%sKafka%s (--output = kafka)\n", Green, Reset) fmt.Printf("%sHTTP%s (--output = http)\n", Green, Reset) fmt.Printf("%sRedis%s (--output = redis)\n", Green, Reset) + fmt.Printf("%sRedis HASH%s (--output = redishash)\n", Green, Reset) + fmt.Printf("%sRedis JSON%s (--output = redisjson)\n", Green, Reset) fmt.Printf("%sMongodb%s (--output = mongo)\n", Green, Reset) fmt.Printf("%sElastic%s (--output = elastic)\n", Green, Reset) fmt.Printf("%sS3%s (--output = s3)\n", Green, Reset) diff --git a/pkg/cmd/templateRun.go b/pkg/cmd/templateRun.go index ce043fbd..a2f18a4c 100644 --- a/pkg/cmd/templateRun.go +++ b/pkg/cmd/templateRun.go @@ -179,7 +179,7 @@ func init() { templateRunCmd.Flags().StringP("topic", "t", constants.DEFAULT_TOPIC, "Kafka topic") templateRunCmd.Flags().Bool("kcat", false, "If you want to pipe jr with kcat, use this flag: it is equivalent to --output stdout --outputTemplate '{{key}},{{value}}' --oneline") - templateRunCmd.Flags().StringP("output", "o", constants.DEFAULT_OUTPUT, "can be one of stdout, kafka, http, redis, mongo, elastic, s3, gcs, azblobstorage, azcosmosdb, cassandra, luascript, wasm, awsdynamodb") + templateRunCmd.Flags().StringP("output", "o", constants.DEFAULT_OUTPUT, "can be one of stdout, kafka, http, redis, redishash, redisjson, mongo, elastic, s3, gcs, azblobstorage, azcosmosdb, cassandra, luascript, wasm, awsdynamodb") templateRunCmd.Flags().String("outputTemplate", constants.DEFAULT_OUTPUT_TEMPLATE, "Formatting of K,V on standard output") templateRunCmd.Flags().BoolP("oneline", "l", false, "strips /n from output, for example to be pipelined to tools like kcat") templateRunCmd.Flags().BoolP("autocreate", "a", false, "if enabled, autocreate topics") diff --git a/pkg/emitter/emitter.go b/pkg/emitter/emitter.go index 4db63da4..6518ee16 100644 --- a/pkg/emitter/emitter.go +++ b/pkg/emitter/emitter.go @@ -122,6 +122,16 @@ func (e *Emitter) Initialize(ctx context.Context, conf configuration.GlobalConfi return } + if e.Output == "redishash" { + e.Producer = createRedisHashProducer(ctx, conf.RedisTtl, conf.RedisConfig) + return + } + + if e.Output == "redisjson" { + e.Producer = createRedisJSONProducer(ctx, conf.RedisTtl, conf.RedisConfig) + return + } + if e.Output == "mongo" || e.Output == "mongodb" { e.Producer = createMongoProducer(ctx, conf.MongoConfig) return @@ -213,6 +223,22 @@ func createRedisProducer(_ context.Context, ttl time.Duration, redisConfig strin return rProducer } +func createRedisHashProducer(_ context.Context, ttl time.Duration, redisConfig string) Producer { + rProducer := &redis.HashProducer{ + Ttl: ttl, + } + rProducer.Initialize(redisConfig) + return rProducer +} + +func createRedisJSONProducer(_ context.Context, ttl time.Duration, redisConfig string) Producer { + rProducer := &redis.JSONProducer{ + Ttl: ttl, + } + rProducer.Initialize(redisConfig) + return rProducer +} + func createMongoProducer(ctx context.Context, mongoConfig string) Producer { mProducer := &mongodb.MongoProducer{} mProducer.Initialize(ctx, mongoConfig) diff --git a/pkg/producers/redis/hashProducer.go b/pkg/producers/redis/hashProducer.go new file mode 100644 index 00000000..120d8094 --- /dev/null +++ b/pkg/producers/redis/hashProducer.go @@ -0,0 +1,63 @@ +package redis + +import ( + "context" + "encoding/json" + "os" + "time" + + "github.com/redis/go-redis/v9" + "github.com/rs/zerolog/log" +) + +type HashProducer struct { + client redis.Client + Ttl time.Duration +} + +func (p *HashProducer) Initialize(configFile string) { + var options redis.Options + + data, err := os.ReadFile(configFile) + if err != nil { + log.Fatal().Err(err).Msg("Failed to load Redis configFile") + } + + err = json.Unmarshal(data, &options) + if err != nil { + log.Fatal().Err(err).Msg("Failed to parse configuration parameters") + } + + p.client = *redis.NewClient(&options) +} + +func (p *HashProducer) Close(_ context.Context) error { + err := p.client.Close() + if err != nil { + log.Warn().Err(err).Msg("Failed to close Redis connection") + } + return err +} + +func (p *HashProducer) Produce(ctx context.Context, k []byte, v []byte, _ any) { + // Parse the JSON value into a map + var fields map[string]interface{} + err := json.Unmarshal(v, &fields) + if err != nil { + log.Fatal().Err(err).Msg("Failed to unmarshal JSON into hash fields") + } + + // Use HSet to set multiple hash fields at once + err = p.client.HSet(ctx, string(k), fields).Err() + if err != nil { + log.Fatal().Err(err).Msg("Failed to write hash data to Redis") + } + + // Set TTL if specified + if p.Ttl > 0 { + err = p.client.Expire(ctx, string(k), p.Ttl).Err() + if err != nil { + log.Fatal().Err(err).Msg("Failed to set TTL on Redis hash") + } + } +} \ No newline at end of file diff --git a/pkg/producers/redis/hashProducer_test.go b/pkg/producers/redis/hashProducer_test.go new file mode 100644 index 00000000..d4dd470d --- /dev/null +++ b/pkg/producers/redis/hashProducer_test.go @@ -0,0 +1,60 @@ +package redis + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" +) + +func TestHashProducer_Initialize(t *testing.T) { + configFile := "config.json.example" + + producer := &HashProducer{} + producer.Initialize(configFile) + + assert.NotNil(t, producer.client, "Redis client should be initialized") +} + +func TestHashProducer_Produce(t *testing.T) { + configFile := "config.json.example" + + producer := &HashProducer{ + Ttl: time.Minute, + } + producer.Initialize(configFile) + + ctx := context.Background() + key := "test_hash_key" + value := map[string]interface{}{ + "field1": "value1", + "field2": "value2", + } + valueBytes, _ := json.Marshal(value) + + producer.Produce(ctx, []byte(key), valueBytes, nil) + + // Verify the data in Redis + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", // Adjust as necessary + }) + defer client.Close() + + result, err := client.HGetAll(ctx, key).Result() + assert.NoError(t, err, "Should not error when getting hash from Redis") + assert.Equal(t, "value1", result["field1"], "Field1 should match") + assert.Equal(t, "value2", result["field2"], "Field2 should match") +} + +func TestHashProducer_Close(t *testing.T) { + configFile := "config.json.example" + + producer := &HashProducer{} + producer.Initialize(configFile) + + err := producer.Close(context.Background()) + assert.NoError(t, err, "Should not error when closing Redis connection") +} \ No newline at end of file diff --git a/pkg/producers/redis/jsonProducer.go b/pkg/producers/redis/jsonProducer.go new file mode 100644 index 00000000..a99121ec --- /dev/null +++ b/pkg/producers/redis/jsonProducer.go @@ -0,0 +1,63 @@ +package redis + +import ( + "context" + "encoding/json" + "os" + "time" + + "github.com/redis/go-redis/v9" + "github.com/rs/zerolog/log" +) + +type JSONProducer struct { + client redis.Client + Ttl time.Duration +} + +func (p *JSONProducer) Initialize(configFile string) { + var options redis.Options + + data, err := os.ReadFile(configFile) + if err != nil { + log.Fatal().Err(err).Msg("Failed to load Redis configFile") + } + + err = json.Unmarshal(data, &options) + if err != nil { + log.Fatal().Err(err).Msg("Failed to parse configuration parameters") + } + + p.client = *redis.NewClient(&options) +} + +func (p *JSONProducer) Close(_ context.Context) error { + err := p.client.Close() + if err != nil { + log.Warn().Err(err).Msg("Failed to close Redis connection") + } + return err +} + +func (p *JSONProducer) Produce(ctx context.Context, k []byte, v []byte, _ any) { + // Verify the input is valid JSON + var jsonData interface{} + err := json.Unmarshal(v, &jsonData) + if err != nil { + log.Fatal().Err(err).Msg("Failed to validate JSON data") + } + + // Use JSON.SET to store the JSON document + err = p.client.Do(ctx, "JSON.SET", string(k), "$", string(v)).Err() + if err != nil { + log.Fatal().Err(err).Msg("Failed to write JSON data to Redis") + } + + // Set TTL if specified + if p.Ttl > 0 { + err = p.client.Expire(ctx, string(k), p.Ttl).Err() + if err != nil { + log.Fatal().Err(err).Msg("Failed to set TTL on Redis key") + } + } +} diff --git a/pkg/producers/redis/jsonProducer_test.go b/pkg/producers/redis/jsonProducer_test.go new file mode 100644 index 00000000..6cac3ed9 --- /dev/null +++ b/pkg/producers/redis/jsonProducer_test.go @@ -0,0 +1,72 @@ +package redis + +import ( + "context" + "encoding/json" + "testing" + "time" + + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" +) + +func TestJSONProducer_Initialize(t *testing.T) { + configFile := "config.json.example" + + producer := &JSONProducer{} + producer.Initialize(configFile) + + assert.NotNil(t, producer.client, "Redis client should be initialized") +} + +func TestJSONProducer_Produce(t *testing.T) { + configFile := "config.json.example" + + producer := &JSONProducer{ + Ttl: time.Minute, + } + producer.Initialize(configFile) + + ctx := context.Background() + key := "test_json_key" + // Create a test JSON object with nested structures + testJSON := `{ + "id": "2210", + "user": { + "name": "Foogaro", + "year": 1978, + "email": "luigi@foogaro.com" + } + }` + producer.Produce(ctx, []byte(key), []byte(testJSON), nil) + + // Verify the data in Redis + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", // Adjust as necessary + }) + defer client.Close() + + // Use JSON.GET to retrieve the stored JSON + result, err := client.Do(ctx, "JSON.GET", key, "$").Text() + assert.NoError(t, err, "Should not error when getting JSON from Redis") + + // Compare the JSON strings (after normalizing them) + var expected, actual interface{} + err = json.Unmarshal([]byte(testJSON), &expected) + assert.NoError(t, err, "Should parse expected JSON") + + err = json.Unmarshal([]byte(result), &actual) + assert.NoError(t, err, "Should parse actual JSON") + + assert.Equal(t, expected, actual, "Stored JSON should match original") +} + +func TestJSONProducer_Close(t *testing.T) { + configFile := "config.json.example" + + producer := &JSONProducer{} + producer.Initialize(configFile) + + err := producer.Close(context.Background()) + assert.NoError(t, err, "Should not error when closing Redis connection") +} From bd0e661cfab2991ce5491e7446cfe9413e1290af Mon Sep 17 00:00:00 2001 From: Foogaro Date: Mon, 2 Dec 2024 23:28:20 +0100 Subject: [PATCH 2/2] Added the directive 'go:build exclude' to the test file. --- pkg/producers/redis/hashProducer_test.go | 108 ++++++++++++++--------- pkg/producers/redis/jsonProducer_test.go | 22 +++++ 2 files changed, 87 insertions(+), 43 deletions(-) diff --git a/pkg/producers/redis/hashProducer_test.go b/pkg/producers/redis/hashProducer_test.go index d4dd470d..94c816a9 100644 --- a/pkg/producers/redis/hashProducer_test.go +++ b/pkg/producers/redis/hashProducer_test.go @@ -1,60 +1,82 @@ +//go:build exclude + +// Copyright © 2024 JR team +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package redis import ( - "context" - "encoding/json" - "testing" - "time" + "context" + "encoding/json" + "testing" + "time" - "github.com/redis/go-redis/v9" - "github.com/stretchr/testify/assert" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" ) func TestHashProducer_Initialize(t *testing.T) { - configFile := "config.json.example" + configFile := "config.json.example" - producer := &HashProducer{} - producer.Initialize(configFile) + producer := &HashProducer{} + producer.Initialize(configFile) - assert.NotNil(t, producer.client, "Redis client should be initialized") + assert.NotNil(t, producer.client, "Redis client should be initialized") } func TestHashProducer_Produce(t *testing.T) { - configFile := "config.json.example" - - producer := &HashProducer{ - Ttl: time.Minute, - } - producer.Initialize(configFile) - - ctx := context.Background() - key := "test_hash_key" - value := map[string]interface{}{ - "field1": "value1", - "field2": "value2", - } - valueBytes, _ := json.Marshal(value) - - producer.Produce(ctx, []byte(key), valueBytes, nil) - - // Verify the data in Redis - client := redis.NewClient(&redis.Options{ - Addr: "localhost:6379", // Adjust as necessary - }) - defer client.Close() - - result, err := client.HGetAll(ctx, key).Result() - assert.NoError(t, err, "Should not error when getting hash from Redis") - assert.Equal(t, "value1", result["field1"], "Field1 should match") - assert.Equal(t, "value2", result["field2"], "Field2 should match") + configFile := "config.json.example" + + producer := &HashProducer{ + Ttl: time.Minute, + } + producer.Initialize(configFile) + + ctx := context.Background() + key := "test_hash_key" + value := map[string]interface{}{ + "field1": "value1", + "field2": "value2", + } + valueBytes, _ := json.Marshal(value) + + producer.Produce(ctx, []byte(key), valueBytes, nil) + + // Verify the data in Redis + client := redis.NewClient(&redis.Options{ + Addr: "localhost:6379", // Adjust as necessary + }) + defer client.Close() + + result, err := client.HGetAll(ctx, key).Result() + assert.NoError(t, err, "Should not error when getting hash from Redis") + assert.Equal(t, "value1", result["field1"], "Field1 should match") + assert.Equal(t, "value2", result["field2"], "Field2 should match") } func TestHashProducer_Close(t *testing.T) { - configFile := "config.json.example" + configFile := "config.json.example" - producer := &HashProducer{} - producer.Initialize(configFile) + producer := &HashProducer{} + producer.Initialize(configFile) - err := producer.Close(context.Background()) - assert.NoError(t, err, "Should not error when closing Redis connection") -} \ No newline at end of file + err := producer.Close(context.Background()) + assert.NoError(t, err, "Should not error when closing Redis connection") +} diff --git a/pkg/producers/redis/jsonProducer_test.go b/pkg/producers/redis/jsonProducer_test.go index 6cac3ed9..09491276 100644 --- a/pkg/producers/redis/jsonProducer_test.go +++ b/pkg/producers/redis/jsonProducer_test.go @@ -1,3 +1,25 @@ +//go:build exclude + +// Copyright © 2024 JR team +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + package redis import (