From ae6a4b0161c093fa78288ea8ef4d9d06c637a2d5 Mon Sep 17 00:00:00 2001 From: Efrem Ropelato Date: Sat, 9 Nov 2024 18:39:48 +0100 Subject: [PATCH 1/5] add WAMP-RPC Producer --- pkg/emitter/emitter.go | 12 +++ pkg/producers/wamprpc/config.json.example | 10 +++ pkg/producers/wamprpc/wampProducer.go | 100 +++++++++++++++++++++ pkg/producers/wamprpc/wampProducer_test.go | 58 ++++++++++++ 4 files changed, 180 insertions(+) create mode 100644 pkg/producers/wamprpc/config.json.example create mode 100644 pkg/producers/wamprpc/wampProducer.go create mode 100644 pkg/producers/wamprpc/wampProducer_test.go diff --git a/pkg/emitter/emitter.go b/pkg/emitter/emitter.go index e3aef58..95c42a4 100644 --- a/pkg/emitter/emitter.go +++ b/pkg/emitter/emitter.go @@ -47,6 +47,7 @@ import ( "github.com/jrnd-io/jr/pkg/producers/s3" "github.com/jrnd-io/jr/pkg/producers/server" "github.com/jrnd-io/jr/pkg/producers/wamp" + "github.com/jrnd-io/jr/pkg/producers/wamprpc" "github.com/jrnd-io/jr/pkg/tpl" "github.com/rs/zerolog/log" ) @@ -179,6 +180,10 @@ func (e *Emitter) Initialize(ctx context.Context, conf configuration.GlobalConfi e.Producer = createWAMPProducer(ctx, conf.WAMPConfig) return } + if e.Output == "wamprpc" { + e.Producer = createWAMPRPCProducer(ctx, conf.WAMPConfig) + return + } } @@ -294,6 +299,13 @@ func createWAMPProducer(ctx context.Context, config string) Producer { return producer } +func createWAMPRPCProducer(ctx context.Context, config string) Producer { + producer := &wamprpc.Producer{} + producer.Initialize(ctx, config) + + return producer +} + func createKafkaProducer(ctx context.Context, conf configuration.GlobalConfiguration, topic string, templateType string) *kafka.Manager { kManager := &kafka.Manager{ diff --git a/pkg/producers/wamprpc/config.json.example b/pkg/producers/wamprpc/config.json.example new file mode 100644 index 0000000..c0ea7d8 --- /dev/null +++ b/pkg/producers/wamprpc/config.json.example @@ -0,0 +1,10 @@ +{ + "wamp_uri": "ws://localhost:9009/ws", + "username": "admin", + "password": "password", + "realm": "realm1", + "procedure": "example", + "serType": "json", + "compress": true, + "authid": "clientJR" +} \ No newline at end of file diff --git a/pkg/producers/wamprpc/wampProducer.go b/pkg/producers/wamprpc/wampProducer.go new file mode 100644 index 0000000..2caf639 --- /dev/null +++ b/pkg/producers/wamprpc/wampProducer.go @@ -0,0 +1,100 @@ +package wamprpc + +import ( + "context" + "encoding/json" + "os" + + "github.com/gammazero/nexus/v3/client" + "github.com/gammazero/nexus/v3/wamp" + "github.com/rs/zerolog/log" +) + +type Config struct { + WampURI string `json:"wamp_uri"` + Username string `json:"username"` + Password string `json:"password"` + Realm string `json:"realm"` + Procedure string `json:"procedure"` + SerType string `json:"serType"` + Compress bool `json:"compress"` + Authid string `json:"authid"` +} + +type Producer struct { + client client.Client + realm string + procedure string + authid string +} + +func (p *Producer) Initialize(ctx context.Context, configFile string) { + var config Config + file, err := os.ReadFile(configFile) + if err != nil { + log.Fatal().Err(err).Msg("Failed to read configuration file") + } + err = json.Unmarshal(file, &config) + if err != nil { + log.Fatal().Err(err).Msg("Failed to parse configuration parameters") + } + var wampclient *client.Client + + // Get requested serialization. + serialization := client.JSON + switch config.SerType { + case "json": + case "msgpack": + serialization = client.MSGPACK + case "cbor": + serialization = client.CBOR + default: + log.Fatal().Err(err).Msg("Invalid serialization, muse be one of: json, msgpack, cbor") + } + + cfg := client.Config{ + Realm: config.Realm, + Serialization: serialization, + HelloDetails: wamp.Dict{ + "authid": config.Authid, + }, + } + + if config.Compress { + cfg.WsCfg.EnableCompression = true + } + + addr := config.WampURI + + wampclient, err = client.ConnectNet(context.Background(), addr, cfg) + if err != nil { + log.Fatal().Err(err).Msg("Can't connect to WAMP Router") + } + // defer wampclient.Close() + + p.realm = config.Realm + p.procedure = config.Procedure + p.authid = config.Authid + + p.client = *wampclient +} + +func (p *Producer) Produce(ctx context.Context, k []byte, v []byte, _ any) { + data := string(v) + args := wamp.List{data} + opts := wamp.Dict{ + "authid": p.authid, + } + _, err := p.client.Call(ctx, p.procedure, opts, args, nil, nil) + if err != nil { + log.Fatal().Err(err).Msgf("call error: %s", err) + } +} + +func (p *Producer) Close(ctx context.Context) error { + err := p.client.Close() + if err != nil { + log.Warn().Err(err).Msg("Failed to close WAMP connection") + } + return err +} diff --git a/pkg/producers/wamprpc/wampProducer_test.go b/pkg/producers/wamprpc/wampProducer_test.go new file mode 100644 index 0000000..222f600 --- /dev/null +++ b/pkg/producers/wamprpc/wampProducer_test.go @@ -0,0 +1,58 @@ +//go:build exclude + +package wamprpc + +import ( + "context" + "testing" + + "github.com/jrnd-io/jr/pkg/producers/wamp" +) + +func TestProducer_Initialize(t *testing.T) { + configFile := "config.json.example" + + producer, err := wamp.ProducerFactory("wamp") + if err != nil { + t.Fatalf("Error reading configuration file: %v", err) + } + err = producer.Initialize(configFile) + if err != nil { + t.Fatalf("Error reading configuration file: %v", err) + } +} + +func TestProducer_Close(t *testing.T) { + configFile := "config.json.example" + + producer, err := wamp.ProducerFactory("wamp") + if err != nil { + t.Fatalf("Error reading configuration file: %v", err) + } + err = producer.Initialize(configFile) + if err != nil { + t.Fatalf("Error reading configuration file: %v", err) + } + + producer.Close() +} + +func TestProducer_Produce(t *testing.T) { + configFile := "config.json.example" + + producer, err := wamp.ProducerFactory("wamp") + if err != nil { + t.Fatalf("Error reading configuration file: %v", err) + } + err = producer.Initialize(configFile) + if err != nil { + t.Fatalf("Error initializing producer: %v", err) + } + + ctx := context.Background() + key := "loo" + val := "foo" + exp := 0 + producer.Produce(ctx, key, val, exp) + +} From dbf0d41496d68bd2deda4f324bd52406bc74405d Mon Sep 17 00:00:00 2001 From: Efrem Ropelato Date: Sat, 9 Nov 2024 18:44:01 +0100 Subject: [PATCH 2/5] add wampRpcConfig --- pkg/cmd/templateRun.go | 4 +++- pkg/configuration/configuration.go | 1 + pkg/emitter/emitter.go | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/pkg/cmd/templateRun.go b/pkg/cmd/templateRun.go index af6ad84..2ffe671 100644 --- a/pkg/cmd/templateRun.go +++ b/pkg/cmd/templateRun.go @@ -127,6 +127,8 @@ jr template run --template "{{name}}" configuration.GlobalCfg.WASMConfig, _ = cmd.Flags().GetString(f.Name) case "wampConfig": configuration.GlobalCfg.WAMPConfig, _ = cmd.Flags().GetString(f.Name) + case "wampRpcConfig": + configuration.GlobalCfg.WAMPRPCConfig, _ = cmd.Flags().GetString(f.Name) } } }) @@ -196,6 +198,6 @@ func init() { templateRunCmd.Flags().String("cassandraConfig", "", "Cassandra configuration") templateRunCmd.Flags().String("luascriptConfig", "", "LUA Script configuration") templateRunCmd.Flags().String("wasmConfig", "", "WASM configuration") - templateRunCmd.Flags().String("wampConfig", "", "WAMP configuration") + templateRunCmd.Flags().String("wampRpcConfig", "", "WAMP-RPC configuration") } diff --git a/pkg/configuration/configuration.go b/pkg/configuration/configuration.go index ae3b7f2..64ab840 100644 --- a/pkg/configuration/configuration.go +++ b/pkg/configuration/configuration.go @@ -45,6 +45,7 @@ type GlobalConfiguration struct { LUAScriptConfig string WASMConfig string WAMPConfig string + WAMPRPCConfig string Url string EmbeddedTemplate bool FileNameTemplate bool diff --git a/pkg/emitter/emitter.go b/pkg/emitter/emitter.go index 95c42a4..067234a 100644 --- a/pkg/emitter/emitter.go +++ b/pkg/emitter/emitter.go @@ -181,7 +181,7 @@ func (e *Emitter) Initialize(ctx context.Context, conf configuration.GlobalConfi return } if e.Output == "wamprpc" { - e.Producer = createWAMPRPCProducer(ctx, conf.WAMPConfig) + e.Producer = createWAMPRPCProducer(ctx, conf.WAMPRPCConfig) return } From 2f943bd1255287d9d7bbf61cb3829db5808c3204 Mon Sep 17 00:00:00 2001 From: Efrem Ropelato Date: Sat, 9 Nov 2024 21:59:25 +0100 Subject: [PATCH 3/5] add code configs --- .editorconfig | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) create mode 100644 .editorconfig diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..0778c7a --- /dev/null +++ b/.editorconfig @@ -0,0 +1,26 @@ +################################################ +# ╔═╗╔╦╗╦╔╦╗╔═╗╦═╗┌─┐┌─┐┌┐┌┌─┐┬┌─┐ +# ║╣ ║║║ ║ ║ ║╠╦╝│ │ ││││├┤ ││ ┬ +# o╚═╝═╩╝╩ ╩ ╚═╝╩╚═└─┘└─┘┘└┘└ ┴└─┘ +# +# Formatting conventions for your app. +# +# To review what each of these options mean, see: +# http://editorconfig.org/ +# +################################################ +root = true + +[*] +indent_style = space +indent_size = 4 +end_of_line = LF +charset = utf-8 +trim_trailing_whitespace = true +insert_final_newline = true + +[*.{yml,yaml}] +indent_size = 2 + +[*.go] +indent_style = tab \ No newline at end of file From aaaddebd1f04f90dee002be2b808061cd05d5e9d Mon Sep 17 00:00:00 2001 From: Efrem Ropelato Date: Sun, 10 Nov 2024 09:08:46 +0100 Subject: [PATCH 4/5] fix wamprpc producer file naming --- .../wamprpc/{wampProducer.go => wampRPCProducer.go} | 0 .../{wampProducer_test.go => wampRPCProducer_test.go} | 8 ++++---- 2 files changed, 4 insertions(+), 4 deletions(-) rename pkg/producers/wamprpc/{wampProducer.go => wampRPCProducer.go} (100%) rename pkg/producers/wamprpc/{wampProducer_test.go => wampRPCProducer_test.go} (83%) diff --git a/pkg/producers/wamprpc/wampProducer.go b/pkg/producers/wamprpc/wampRPCProducer.go similarity index 100% rename from pkg/producers/wamprpc/wampProducer.go rename to pkg/producers/wamprpc/wampRPCProducer.go diff --git a/pkg/producers/wamprpc/wampProducer_test.go b/pkg/producers/wamprpc/wampRPCProducer_test.go similarity index 83% rename from pkg/producers/wamprpc/wampProducer_test.go rename to pkg/producers/wamprpc/wampRPCProducer_test.go index 222f600..8e20028 100644 --- a/pkg/producers/wamprpc/wampProducer_test.go +++ b/pkg/producers/wamprpc/wampRPCProducer_test.go @@ -6,13 +6,13 @@ import ( "context" "testing" - "github.com/jrnd-io/jr/pkg/producers/wamp" + "github.com/jrnd-io/jr/pkg/producers/wamprpc" ) func TestProducer_Initialize(t *testing.T) { configFile := "config.json.example" - producer, err := wamp.ProducerFactory("wamp") + producer, err := wamprpc.ProducerFactory("wamprpc") if err != nil { t.Fatalf("Error reading configuration file: %v", err) } @@ -25,7 +25,7 @@ func TestProducer_Initialize(t *testing.T) { func TestProducer_Close(t *testing.T) { configFile := "config.json.example" - producer, err := wamp.ProducerFactory("wamp") + producer, err := wamprpc.ProducerFactory("wamprpc") if err != nil { t.Fatalf("Error reading configuration file: %v", err) } @@ -40,7 +40,7 @@ func TestProducer_Close(t *testing.T) { func TestProducer_Produce(t *testing.T) { configFile := "config.json.example" - producer, err := wamp.ProducerFactory("wamp") + producer, err := wamprpc.ProducerFactory("wamprpc") if err != nil { t.Fatalf("Error reading configuration file: %v", err) } From 119c67841699bb52dc96c4055ee25c15a69de3e9 Mon Sep 17 00:00:00 2001 From: Efrem Ropelato Date: Sun, 10 Nov 2024 14:09:21 +0100 Subject: [PATCH 5/5] add wamp rpc producer to list --- pkg/cmd/producerList.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/cmd/producerList.go b/pkg/cmd/producerList.go index d8d6d74..b01067e 100644 --- a/pkg/cmd/producerList.go +++ b/pkg/cmd/producerList.go @@ -60,6 +60,7 @@ var producerListCmd = &cobra.Command{ fmt.Printf("%sLUA Script%s (--output = luascript)\n", Green, Reset) fmt.Printf("%sWASM Function%s (--output = wasm)\n", Green, Reset) fmt.Printf("%sAWS DynamoDB%s (--output = awsdynamodb)\n", Green, Reset) + fmt.Printf("%sWAMP RPC%s (--output = wamprpc)\n", Green, Reset) fmt.Println() },