diff --git a/pkg/cmd/producerList.go b/pkg/cmd/producerList.go index 8b055de..d7e5865 100644 --- a/pkg/cmd/producerList.go +++ b/pkg/cmd/producerList.go @@ -61,6 +61,7 @@ var producerListCmd = &cobra.Command{ fmt.Printf("%sWASM Function%s (--output = wasm)\n", Green, Reset) fmt.Printf("%sAWS DynamoDB%s (--output = awsdynamodb)\n", Green, Reset) fmt.Printf("%sWAMP Topic%s (--output = wamp)\n", Green, Reset) + fmt.Printf("%sWAMP RPC%s (--output = wamprpc)\n", Green, Reset) fmt.Println() }, diff --git a/pkg/cmd/templateRun.go b/pkg/cmd/templateRun.go index ce043fb..5cfa149 100644 --- a/pkg/cmd/templateRun.go +++ b/pkg/cmd/templateRun.go @@ -128,6 +128,8 @@ jr template run --template "{{name}}" configuration.GlobalCfg.WASMConfig, _ = cmd.Flags().GetString(f.Name) case "wampConfig": configuration.GlobalCfg.WAMPConfig, _ = cmd.Flags().GetString(f.Name) + case "wampRpcConfig": + configuration.GlobalCfg.WAMPRPCConfig, _ = cmd.Flags().GetString(f.Name) } } }) @@ -201,5 +203,6 @@ func init() { templateRunCmd.Flags().String("luascriptConfig", "", "LUA Script configuration") templateRunCmd.Flags().String("wasmConfig", "", "WASM configuration") templateRunCmd.Flags().String("wampConfig", "", "WAMP configuration") + templateRunCmd.Flags().String("wampRpcConfig", "", "WAMP-RPC configuration") } diff --git a/pkg/configuration/configuration.go b/pkg/configuration/configuration.go index ae3b7f2..64ab840 100644 --- a/pkg/configuration/configuration.go +++ b/pkg/configuration/configuration.go @@ -45,6 +45,7 @@ type GlobalConfiguration struct { LUAScriptConfig string WASMConfig string WAMPConfig string + WAMPRPCConfig string Url string EmbeddedTemplate bool FileNameTemplate bool diff --git a/pkg/emitter/emitter.go b/pkg/emitter/emitter.go index 4db63da..43eb320 100644 --- a/pkg/emitter/emitter.go +++ b/pkg/emitter/emitter.go @@ -47,6 +47,7 @@ import ( "github.com/jrnd-io/jr/pkg/producers/s3" "github.com/jrnd-io/jr/pkg/producers/server" "github.com/jrnd-io/jr/pkg/producers/wamp" + "github.com/jrnd-io/jr/pkg/producers/wamprpc" "github.com/jrnd-io/jr/pkg/tpl" "github.com/rs/zerolog/log" ) @@ -182,6 +183,10 @@ func (e *Emitter) Initialize(ctx context.Context, conf configuration.GlobalConfi e.Producer = createWAMPProducer(ctx, conf.WAMPConfig) return } + if e.Output == "wamprpc" { + e.Producer = createWAMPRPCProducer(ctx, conf.WAMPRPCConfig) + return + } } @@ -297,6 +302,13 @@ func createWAMPProducer(ctx context.Context, config string) Producer { return producer } +func createWAMPRPCProducer(ctx context.Context, config string) Producer { + producer := &wamprpc.Producer{} + producer.Initialize(ctx, config) + + return producer +} + func createKafkaProducer(ctx context.Context, conf configuration.GlobalConfiguration, topic string, templateType string) *kafka.Manager { kManager := &kafka.Manager{ diff --git a/pkg/producers/wamprpc/config.json.example b/pkg/producers/wamprpc/config.json.example new file mode 100644 index 0000000..c0ea7d8 --- /dev/null +++ b/pkg/producers/wamprpc/config.json.example @@ -0,0 +1,10 @@ +{ + "wamp_uri": "ws://localhost:9009/ws", + "username": "admin", + "password": "password", + "realm": "realm1", + "procedure": "example", + "serType": "json", + "compress": true, + "authid": "clientJR" +} \ No newline at end of file diff --git a/pkg/producers/wamprpc/wampRPCProducer.go b/pkg/producers/wamprpc/wampRPCProducer.go new file mode 100644 index 0000000..2caf639 --- /dev/null +++ b/pkg/producers/wamprpc/wampRPCProducer.go @@ -0,0 +1,100 @@ +package wamprpc + +import ( + "context" + "encoding/json" + "os" + + "github.com/gammazero/nexus/v3/client" + "github.com/gammazero/nexus/v3/wamp" + "github.com/rs/zerolog/log" +) + +type Config struct { + WampURI string `json:"wamp_uri"` + Username string `json:"username"` + Password string `json:"password"` + Realm string `json:"realm"` + Procedure string `json:"procedure"` + SerType string `json:"serType"` + Compress bool `json:"compress"` + Authid string `json:"authid"` +} + +type Producer struct { + client client.Client + realm string + procedure string + authid string +} + +func (p *Producer) Initialize(ctx context.Context, configFile string) { + var config Config + file, err := os.ReadFile(configFile) + if err != nil { + log.Fatal().Err(err).Msg("Failed to read configuration file") + } + err = json.Unmarshal(file, &config) + if err != nil { + log.Fatal().Err(err).Msg("Failed to parse configuration parameters") + } + var wampclient *client.Client + + // Get requested serialization. + serialization := client.JSON + switch config.SerType { + case "json": + case "msgpack": + serialization = client.MSGPACK + case "cbor": + serialization = client.CBOR + default: + log.Fatal().Err(err).Msg("Invalid serialization, muse be one of: json, msgpack, cbor") + } + + cfg := client.Config{ + Realm: config.Realm, + Serialization: serialization, + HelloDetails: wamp.Dict{ + "authid": config.Authid, + }, + } + + if config.Compress { + cfg.WsCfg.EnableCompression = true + } + + addr := config.WampURI + + wampclient, err = client.ConnectNet(context.Background(), addr, cfg) + if err != nil { + log.Fatal().Err(err).Msg("Can't connect to WAMP Router") + } + // defer wampclient.Close() + + p.realm = config.Realm + p.procedure = config.Procedure + p.authid = config.Authid + + p.client = *wampclient +} + +func (p *Producer) Produce(ctx context.Context, k []byte, v []byte, _ any) { + data := string(v) + args := wamp.List{data} + opts := wamp.Dict{ + "authid": p.authid, + } + _, err := p.client.Call(ctx, p.procedure, opts, args, nil, nil) + if err != nil { + log.Fatal().Err(err).Msgf("call error: %s", err) + } +} + +func (p *Producer) Close(ctx context.Context) error { + err := p.client.Close() + if err != nil { + log.Warn().Err(err).Msg("Failed to close WAMP connection") + } + return err +} diff --git a/pkg/producers/wamprpc/wampRPCProducer_test.go b/pkg/producers/wamprpc/wampRPCProducer_test.go new file mode 100644 index 0000000..8e20028 --- /dev/null +++ b/pkg/producers/wamprpc/wampRPCProducer_test.go @@ -0,0 +1,58 @@ +//go:build exclude + +package wamprpc + +import ( + "context" + "testing" + + "github.com/jrnd-io/jr/pkg/producers/wamprpc" +) + +func TestProducer_Initialize(t *testing.T) { + configFile := "config.json.example" + + producer, err := wamprpc.ProducerFactory("wamprpc") + if err != nil { + t.Fatalf("Error reading configuration file: %v", err) + } + err = producer.Initialize(configFile) + if err != nil { + t.Fatalf("Error reading configuration file: %v", err) + } +} + +func TestProducer_Close(t *testing.T) { + configFile := "config.json.example" + + producer, err := wamprpc.ProducerFactory("wamprpc") + if err != nil { + t.Fatalf("Error reading configuration file: %v", err) + } + err = producer.Initialize(configFile) + if err != nil { + t.Fatalf("Error reading configuration file: %v", err) + } + + producer.Close() +} + +func TestProducer_Produce(t *testing.T) { + configFile := "config.json.example" + + producer, err := wamprpc.ProducerFactory("wamprpc") + if err != nil { + t.Fatalf("Error reading configuration file: %v", err) + } + err = producer.Initialize(configFile) + if err != nil { + t.Fatalf("Error initializing producer: %v", err) + } + + ctx := context.Background() + key := "loo" + val := "foo" + exp := 0 + producer.Produce(ctx, key, val, exp) + +}