From d2c5dde8108e6037283db5d9453246ec7233b6c3 Mon Sep 17 00:00:00 2001 From: Ren0503 Date: Mon, 6 Jan 2025 15:30:42 +0700 Subject: [PATCH] feat(microservices): add mqtt transport --- microservices/mqtt/connect.go | 195 ++++++++++++++++++++++++ microservices/mqtt/connect_test.go | 229 +++++++++++++++++++++++++++++ microservices/mqtt/go.mod | 20 +++ microservices/mqtt/go.sum | 18 +++ 4 files changed, 462 insertions(+) create mode 100644 microservices/mqtt/connect.go create mode 100644 microservices/mqtt/connect_test.go create mode 100644 microservices/mqtt/go.mod create mode 100644 microservices/mqtt/go.sum diff --git a/microservices/mqtt/connect.go b/microservices/mqtt/connect.go new file mode 100644 index 0000000..7a02c96 --- /dev/null +++ b/microservices/mqtt/connect.go @@ -0,0 +1,195 @@ +package mqtt + +import ( + "fmt" + "reflect" + + mqtt_store "github.com/eclipse/paho.mqtt.golang" + "github.com/tinh-tinh/tinhtinh/v2/common" + "github.com/tinh-tinh/tinhtinh/v2/core" + "github.com/tinh-tinh/tinhtinh/v2/microservices" +) + +type Connect struct { + Module core.Module + client mqtt_store.Client + config microservices.Config +} + +type Options struct { + *mqtt_store.ClientOptions + microservices.Config +} + +func NewClient(opt Options) microservices.ClientProxy { + conn := mqtt_store.NewClient(opt.ClientOptions) + + if reflect.ValueOf(opt.Config).IsZero() { + opt.Config = microservices.DefaultConfig() + } + + connect := &Connect{ + client: conn, + config: opt.Config, + } + + return connect +} + +func (c *Connect) Send(event string, data interface{}, headers ...microservices.Header) error { + message := microservices.Message{ + Type: microservices.RPC, + Headers: common.CloneMap(c.config.Header), + Event: event, + Data: data, + } + if len(headers) > 0 { + for _, v := range headers { + common.MergeMaps(message.Headers, v) + } + } + + payload, err := c.Serializer(message) + if err != nil { + return err + } + + if token := c.client.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + + token := c.client.Publish(event, 0, false, payload) + token.Wait() + + c.client.Disconnect(250) + return nil +} + +func (c *Connect) Publish(event string, data interface{}, headers ...microservices.Header) error { + message := microservices.Message{ + Type: microservices.PubSub, + Headers: common.CloneMap(c.config.Header), + Event: event, + Data: data, + } + if len(headers) > 0 { + for _, v := range headers { + common.MergeMaps(message.Headers, v) + } + } + + payload, err := c.Serializer(message) + if err != nil { + return err + } + if token := c.client.Connect(); token.Wait() && token.Error() != nil { + return token.Error() + } + + token := c.client.Publish(event, 0, false, payload) + token.Wait() + + c.client.Disconnect(250) + return nil +} + +func (c *Connect) Serializer(v interface{}) ([]byte, error) { + return c.config.Serializer(v) +} + +func (c *Connect) Deserializer(data []byte, v interface{}) error { + return c.config.Deserializer(data, v) +} + +func (c *Connect) ErrorHandler(err error) { + c.config.ErrorHandler(err) +} + +// Server usage +func New(module core.ModuleParam, opts ...Options) microservices.Service { + connect := &Connect{ + Module: module(), + config: microservices.DefaultConfig(), + } + + if len(opts) > 0 { + if opts[0].ClientOptions != nil { + conn := mqtt_store.NewClient(opts[0].ClientOptions) + connect.client = conn + } + if !reflect.ValueOf(opts[0].Config).IsZero() { + connect.config = microservices.ParseConfig(opts[0].Config) + } + } + + return connect +} +func Open(opts ...Options) core.Service { + connect := &Connect{ + config: microservices.DefaultConfig(), + } + + if len(opts) > 0 { + if opts[0].ClientOptions != nil { + conn := mqtt_store.NewClient(opts[0].ClientOptions) + connect.client = conn + } + if !reflect.ValueOf(opts[0].Config).IsZero() { + connect.config = microservices.ParseConfig(opts[0].Config) + } + } + + return connect +} + +func (c *Connect) Create(module core.Module) { + c.Module = module +} + +func (c *Connect) Listen() { + store := c.Module.Ref(microservices.STORE).(*microservices.Store) + if store == nil { + panic("store not found") + } + + if token := c.client.Connect(); token.Wait() && token.Error() != nil { + panic(token.Error()) + } + + if store.Subscribers[string(microservices.RPC)] != nil { + for _, sub := range store.Subscribers[string(microservices.RPC)] { + token := c.client.Subscribe(sub.Name, 0, func(client mqtt_store.Client, m mqtt_store.Message) { + c.handler(m, sub) + }) + token.Wait() + if token.Error() != nil { + fmt.Println(token.Error(), common.GetStructName(c.Module)) + continue + } + } + } + + if store.Subscribers[string(microservices.PubSub)] != nil { + for _, sub := range store.Subscribers[string(microservices.PubSub)] { + token := c.client.Subscribe(sub.Name, 0, func(client mqtt_store.Client, m mqtt_store.Message) { + c.handler(m, sub) + }) + token.Wait() + if token.Error() != nil { + fmt.Println(token.Error(), common.GetStructName(c.Module)) + continue + } + } + } +} + +func (c *Connect) handler(msg mqtt_store.Message, sub microservices.SubscribeHandler) { + var message microservices.Message + err := c.Deserializer(msg.Payload(), &message) + if err != nil { + fmt.Println("Error deserializing message: ", err) + return + } + + sub.Handle(c, message) +} diff --git a/microservices/mqtt/connect_test.go b/microservices/mqtt/connect_test.go new file mode 100644 index 0000000..aab18a0 --- /dev/null +++ b/microservices/mqtt/connect_test.go @@ -0,0 +1,229 @@ +package mqtt_test + +import ( + "fmt" + "io" + "net/http" + "net/http/httptest" + "sync" + "testing" + "time" + + mqtt_store "github.com/eclipse/paho.mqtt.golang" + "github.com/stretchr/testify/require" + "github.com/tinh-tinh/tinhtinh/microservices/mqtt" + "github.com/tinh-tinh/tinhtinh/v2/core" + "github.com/tinh-tinh/tinhtinh/v2/microservices" +) + +type Order struct { + ID string `json:"id"` + Name string `json:"name"` +} + +func OrderApp() *core.App { + type OrderService struct { + orders map[string]interface{} + mutex sync.RWMutex + } + + const ORDER core.Provide = "orders" + service := func(module core.Module) core.Provider { + prd := module.NewProvider(core.ProviderOptions{ + Name: ORDER, + Value: &OrderService{ + orders: make(map[string]interface{}), + mutex: sync.RWMutex{}, + }, + }) + + return prd + } + + guard := func(ref core.RefProvider, ctx microservices.Ctx) bool { + return ctx.Headers("tenant") != "1" + } + + handlerService := func(module core.Module) core.Provider { + handler := microservices.NewHandler(module, core.ProviderOptions{}) + + orderService := module.Ref(ORDER).(*OrderService) + handler.OnResponse("order.created", func(ctx microservices.Ctx) error { + data := ctx.Payload(&Order{}).(*Order) + + orderService.mutex.Lock() + if orderService.orders[data.ID] == nil { + orderService.orders[data.ID] = true + } + orderService.mutex.Unlock() + + fmt.Printf("Order created: %v\n", orderService.orders) + return nil + }) + + handler.Guard(guard).OnEvent("order.*", func(ctx microservices.Ctx) error { + fmt.Printf("Order Updated: %v\n", ctx.Payload()) + return nil + }) + + return handler + } + + controller := func(module core.Module) core.Controller { + ctrl := module.NewController("orders") + + ctrl.Get("", func(ctx core.Ctx) error { + orderService := module.Ref(ORDER).(*OrderService) + return ctx.JSON(core.Map{ + "data": orderService.orders, + }) + }) + + return ctrl + } + + appModule := func() core.Module { + module := core.NewModule(core.NewModuleOptions{ + Imports: []core.Modules{microservices.Register()}, + Controllers: []core.Controllers{controller}, + Providers: []core.Providers{ + service, + handlerService, + }, + }) + return module + } + + app := core.CreateFactory(appModule) + app.SetGlobalPrefix("order-api") + + return app +} + +func ProductApp(addr string) *core.App { + controller := func(module core.Module) core.Controller { + ctrl := module.NewController("products") + + client := microservices.Inject(module) + ctrl.Post("", func(ctx core.Ctx) error { + go client.Send("order.created", &Order{ + ID: "order1", + Name: "order1", + }) + return ctx.JSON(core.Map{ + "data": []string{"product1", "product2"}, + }) + }) + + ctrl.Post("multiple", func(ctx core.Ctx) error { + go client.Publish("order.updated", &Order{ + ID: "order1", + Name: "order1", + }, microservices.Header{"tenant": "1"}) + return ctx.JSON(core.Map{ + "data": []string{"product1", "product2"}, + }) + }) + + return ctrl + } + + appModule := func() core.Module { + opts := mqtt_store.NewClientOptions().AddBroker(addr).SetClientID("product-app") + module := core.NewModule(core.NewModuleOptions{ + Imports: []core.Modules{ + microservices.RegisterClient(mqtt.NewClient(mqtt.Options{ + ClientOptions: opts, + })), + }, + Controllers: []core.Controllers{controller}, + }) + return module + } + + app := core.CreateFactory(appModule) + app.SetGlobalPrefix("product-api") + + return app +} + +func DeliveryApp(addr string) microservices.Service { + service := func(module core.Module) core.Provider { + handler := microservices.NewHandler(module, core.ProviderOptions{}) + + handler.OnEvent("order.*", func(ctx microservices.Ctx) error { + fmt.Println("Delivery when have order:", ctx.Payload(&Order{})) + return nil + }) + + return handler + } + + appModule := func() core.Module { + module := core.NewModule(core.NewModuleOptions{ + Imports: []core.Modules{microservices.Register()}, + Providers: []core.Providers{ + service, + }, + }) + return module + } + + opts := mqtt_store.NewClientOptions().AddBroker(addr).SetClientID("delivery-app") + app := mqtt.New(appModule, mqtt.Options{ + ClientOptions: opts, + }) + + return app +} + +func Test_Practice(t *testing.T) { + deliveryApp := DeliveryApp("mqtt://localhost:1883") + go deliveryApp.Listen() + + orderApp := OrderApp() + opts := mqtt_store.NewClientOptions().AddBroker("mqtt://localhost:1883").SetClientID("order-app") + orderApp.ConnectMicroservice(mqtt.Open(mqtt.Options{ + ClientOptions: opts, + })) + + orderApp.StartAllMicroservices() + testOrderServer := httptest.NewServer(orderApp.PrepareBeforeListen()) + defer testOrderServer.Close() + + testClientOrder := testOrderServer.Client() + + resp, err := testClientOrder.Get(testOrderServer.URL + "/order-api/orders") + require.Nil(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + data, err := io.ReadAll(resp.Body) + require.Nil(t, err) + require.Equal(t, `{"data":{}}`, string(data)) + + productApp := ProductApp("mqtt://localhost:1883") + testProductServer := httptest.NewServer(productApp.PrepareBeforeListen()) + defer testProductServer.Close() + + testClientProduct := testProductServer.Client() + + resp, err = testClientProduct.Post(testProductServer.URL+"/product-api/products", "application/json", nil) + require.Nil(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + time.Sleep(1000 * time.Millisecond) + + resp, err = testClientOrder.Get(testOrderServer.URL + "/order-api/orders") + require.Nil(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + data, err = io.ReadAll(resp.Body) + require.Nil(t, err) + require.Equal(t, `{"data":{"order1":true}}`, string(data)) + + resp, err = testClientProduct.Post(testProductServer.URL+"/product-api/products/multiple", "application/json", nil) + require.Nil(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + time.Sleep(1000 * time.Millisecond) +} diff --git a/microservices/mqtt/go.mod b/microservices/mqtt/go.mod new file mode 100644 index 0000000..71b2009 --- /dev/null +++ b/microservices/mqtt/go.mod @@ -0,0 +1,20 @@ +module github.com/tinh-tinh/tinhtinh/microservices/mqtt + +go 1.22.2 + +require ( + github.com/eclipse/paho.mqtt.golang v1.5.0 + github.com/stretchr/testify v1.9.0 + github.com/tinh-tinh/tinhtinh/v2 v2.0.0-beta.5 +) + +require ( + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/gorilla/websocket v1.5.3 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + golang.org/x/net v0.27.0 // indirect + golang.org/x/sync v0.7.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) + +replace github.com/tinh-tinh/tinhtinh/v2 => ../../ diff --git a/microservices/mqtt/go.sum b/microservices/mqtt/go.sum new file mode 100644 index 0000000..11df4ed --- /dev/null +++ b/microservices/mqtt/go.sum @@ -0,0 +1,18 @@ +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/eclipse/paho.mqtt.golang v1.5.0 h1:EH+bUVJNgttidWFkLLVKaQPGmkTUfQQqjOsyvMGvD6o= +github.com/eclipse/paho.mqtt.golang v1.5.0/go.mod h1:du/2qNQVqJf/Sqs4MEL77kR8QTqANF7XU7Fk0aOTAgk= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +golang.org/x/net v0.27.0 h1:5K3Njcw06/l2y9vpGCSdcxWOYHOUk3dVNGDXN+FvAys= +golang.org/x/net v0.27.0/go.mod h1:dDi0PyhWNoiUOrAS8uXv/vnScO4wnHQO4mj9fn/RytE= +golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M= +golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=