Skip to content

Commit

Permalink
Add a Modbus example
Browse files Browse the repository at this point in the history
Signed-off-by: Dusan Borovcanin <borovcanindusan1@gmail.com>
  • Loading branch information
dborovcanin committed Nov 6, 2023
1 parent 0a5ccbd commit 69063ba
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 68 deletions.
99 changes: 75 additions & 24 deletions cmd/modbus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"crypto/tls"
"crypto/x509"
"encoding/binary"
"fmt"
"io/ioutil"
"log"
Expand All @@ -16,19 +17,40 @@ import (
"github.com/goburrow/modbus"
"github.com/mainflux/agent/pkg/agent"
"github.com/mainflux/agent/pkg/bootstrap"
"github.com/mainflux/agent/pkg/edgex"
"github.com/mainflux/agent/pkg/encoder"
"github.com/mainflux/mainflux"
"github.com/mainflux/mainflux/logger"
"github.com/mainflux/mainflux/pkg/errors"
"github.com/nats-io/nats.go"
)

const msgTmplt = `
[
{
"bn": "urn:dev:demo:10001BCD:",
"bt": %d,
"n": "temperature",
"v": %v,
"u": "C"
},
{
"n": "humidity",
"v": %v,
"u": "V"
},
{
"n": "voltage",
"v": %v,
"u": "V",
"t": 10
}
]`

const (
defHTTPPort = "9998"
defBootstrapURL = "http://localhost:9013/things/bootstrap"
defBootstrapID = ""
defBootstrapKey = ""
defBootstrapID = "9scb6:s:sda:2"
defBootstrapKey = "key_123"
defBootstrapRetries = "5"
defBootstrapSkipTLS = "false"
defBootstrapRetryDelaySeconds = "10"
Expand Down Expand Up @@ -104,44 +126,55 @@ func main() {
logger.Error(fmt.Sprintf("Failed to load config: %s", err))
}

nc, err := nats.Connect(cfg.Server.NatsURL)
if err != nil {
logger.Error(fmt.Sprintf("Failed to connect to NATS: %s %s", err, cfg.Server.NatsURL))
os.Exit(1)
}
defer nc.Close()

mqttClient, err := connectToMQTTBroker(cfg.MQTT, logger)

if err != nil {
logger.Error(err.Error())
return
}
edgexClient := edgex.NewClient(cfg.Edgex.URL, logger)
handler := modbus.NewTCPClientHandler(cfg.ModBusConfig.Host)
handler.Timeout = 10 * time.Second
handler.SlaveId = 0xFF
handler.Connect()
defer handler.Close()

svc, err := agent.New(mqttClient, &cfg, edgexClient, nc, logger)
if err != nil {
logger.Error(fmt.Sprintf("Error in agent service: %s", err))
return
}
client := modbus.NewClient(handler)

logger.Info(fmt.Sprintf("Starting modbus for registers %v", cfg.ModBusConfig.Regs))
results := make([]uint16, len(cfg.ModBusConfig.Regs))
for {
for _, reg := range cfg.ModBusConfig.Regs {
for i, reg := range cfg.ModBusConfig.Regs {
logger.Info(fmt.Sprintf("reading modbus sensor on register: %d", reg))
data, err := readSensor(reg, cfg.ModBusConfig.Host, true)
result, err := client.ReadHoldingRegisters(reg, 1)
if err != nil {
logger.Error(fmt.Sprintf("failed to read sensor with error: %v", err.Error()))
continue
}
logger.Info("publishing sensor data")
if err := svc.Publish("data", string(data)); err != nil {
logger.Error(fmt.Sprintf("failed to publish with error: %v", err.Error()))
}
time.Sleep(cfg.ModBusConfig.PollingFrequency)
v, _ := SingleUint16FromBytes(result, 1)
results[i] = v
logger.Info(fmt.Sprintf("results %v", result))

}
time.Sleep(cfg.ModBusConfig.PollingFrequency)
topic := fmt.Sprintf("channels/%s/messages/data", cfg.Channels.Data)
msg := fmt.Sprintf(msgTmplt, time.Now().Unix(), results[0], results[1], results[2])
logger.Info(msg)
if err := publish(topic, msg, mqttClient, cfg.MQTT); err != nil {
logger.Error(fmt.Sprintf("failed to publish with error: %v", err.Error()))
}
}
}

func publish(t, payload string, client mqtt.Client, cfg agent.MQTTConfig) error {
token := client.Publish(t, cfg.QoS, cfg.Retain, payload)
token.Wait()
err := token.Error()
if err != nil {
return errors.New(err.Error())
}
return nil
}

func loadEnvConfig() (agent.Config, error) {
sc := agent.ServerConfig{
NatsURL: mainflux.Env(envNatsURL, defNatsURL),
Expand Down Expand Up @@ -338,7 +371,7 @@ func readSensor(register uint16, host string, simulate bool) ([]byte, error) {
}

func connectToMQTTBroker(conf agent.MQTTConfig, logger logger.Logger) (mqtt.Client, error) {
name := fmt.Sprintf("agent-%s22", conf.Username)
name := fmt.Sprintf("agent-%smodbus", conf.Username)
conn := func(client mqtt.Client) {
logger.Info(fmt.Sprintf("Client %s connected", name))
}
Expand Down Expand Up @@ -386,3 +419,21 @@ func connectToMQTTBroker(conf agent.MQTTConfig, logger logger.Logger) (mqtt.Clie
}
return client, nil
}

func SingleUint16FromBytes(bytes []byte, byteorder uint8) (uint16, error) {
bytesLen := len(bytes)
var val uint16
if bytesLen == 2 {
if byteorder == 1 { // comparison 1 = Big Endian
val = binary.BigEndian.Uint16(bytes)
return val, nil
} else if byteorder == 2 {
val = binary.LittleEndian.Uint16(bytes)
return val, nil
} else {
return 0, errors.New("Byte Order not specified")
}
} else {
return 0, errors.New("Array length is not equal to 2")
}
}
39 changes: 22 additions & 17 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,35 +2,40 @@ File = "config.toml"

[channels]
control = ""
data = ""
data = "46013270-7353-4157-81d6-587b786b7319"

[edgex]
url = "http://localhost:48090/api/v1/"
url = ""

[heartbeat]
interval = "10s"
interval = "0s"

[log]
level = "info"
level = ""

[modbus]
host = "localhost:502"
polling_frequency = "2s"
registers = [1, 3, 4]

[mqtt]
ca_cert = ""
ca_path = "ca.crt"
cert_path = "thing.cert"
client_cert = ""
client_key = ""
ca_cert = "ca_cert"
ca_path = ""
cert_path = ""
client_cert = "cert"
client_key = "test"
mtls = false
password = ""
priv_key_path = "thing.key"
password = "f07abbd6-0447-49a4-8239-2c9080dd3c15"
priv_key_path = ""
qos = 0
retain = false
skip_tls_ver = true
url = "localhost:1883"
username = ""
skip_tls_ver = false
url = "demo.mainflux.io:1883"
username = "ac7ecc53-1d5b-4321-917b-2f2c7e75d4a7"

[server]
nats_url = "nats://127.0.0.1:4222"
port = "9998"
nats_url = ""
port = ""

[terminal]
session_timeout = "1m0s"
session_timeout = "0s"
7 changes: 4 additions & 3 deletions pkg/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,15 +122,16 @@ func (c *ModBusConfig) UnmarshalJSON(b []byte) error {
}
interval, ok := v["polling_frequency"]
if !ok {
return errors.New("missing modbusconfig polling frequency")
return errors.New("missing modbus config polling frequency")
}
c.Host, ok = v["host"].(string)
if !ok {
return errors.New("missing modbusconfig host")
c.Host = "localhost:502"
// return errors.New("missing modbus config host")
}
regs, ok := v["registers"].([]interface{})
if !ok {
return errors.New("missing modbusconfig registers")
return errors.New("missing modbus config registers")
}
for _, r := range regs {
switch val := r.(type) {
Expand Down
46 changes: 22 additions & 24 deletions pkg/bootstrap/bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"crypto/tls"
"crypto/x509"
"encoding/json"
"os"

"fmt"
"io/ioutil"
Expand Down Expand Up @@ -100,21 +99,21 @@ func Bootstrap(cfg Config, logger log.Logger, file string) error {
}
}

if len(dc.MainfluxChannels) < 2 {
return agent.ErrMalformedEntity
}
// if len(dc.MainfluxChannels) < 2 {
// return agent.ErrMalformedEntity
// }

ctrlChan := dc.MainfluxChannels[0].ID
dataChan := dc.MainfluxChannels[1].ID
// ctrlChan := dc.MainfluxChannels[0].ID
dataChan := dc.MainfluxChannels[0].ID
if dc.MainfluxChannels[0].Metadata["type"] == "data" {
ctrlChan = dc.MainfluxChannels[1].ID
// ctrlChan = dc.MainfluxChannels[1].ID
dataChan = dc.MainfluxChannels[0].ID
}

sc := dc.SvcsConf.Agent.Server
cc := agent.ChanConfig{
Control: ctrlChan,
Data: dataChan,
// Control: ctrlChan,
Data: dataChan,
}
ec := dc.SvcsConf.Agent.Edgex
lc := dc.SvcsConf.Agent.Log
Expand Down Expand Up @@ -167,20 +166,20 @@ func fillExportConfig(econf export.Config, c agent.Config) export.Config {
}

func saveExportConfig(econf export.Config, logger log.Logger) {
if econf.File == "" {
econf.File = exportConfigFile
}
exConfFileExist := false
if _, err := os.Stat(econf.File); err == nil {
exConfFileExist = true
logger.Info(fmt.Sprintf("Export config file %s exists", econf.File))
}
if !exConfFileExist {
logger.Info(fmt.Sprintf("Saving export config file %s", econf.File))
if err := export.Save(econf); err != nil {
logger.Warn(fmt.Sprintf("Failed to save export config file %s", err))
}
}
// if econf.File == "" {
// econf.File = exportConfigFile
// }
// exConfFileExist := false
// if _, err := os.Stat(econf.File); err == nil {
// exConfFileExist = true
// logger.Info(fmt.Sprintf("Export config file %s exists", econf.File))
// }
// if !exConfFileExist {
// logger.Info(fmt.Sprintf("Saving export config file %s", econf.File))
// if err := export.Save(econf); err != nil {
// logger.Warn(fmt.Sprintf("Failed to save export config file %s", err))
// }
// }
}

func getConfig(bsID, bsKey, bsSvrURL string, skipTLS bool, logger log.Logger) (deviceConfig, error) {
Expand Down Expand Up @@ -225,7 +224,6 @@ func getConfig(bsID, bsKey, bsSvrURL string, skipTLS bool, logger log.Logger) (d
if err := json.Unmarshal([]byte(body), &h); err != nil {
return deviceConfig{}, err
}
fmt.Println(h.Content)
sc := ServicesConfig{}
if err := json.Unmarshal([]byte(h.Content), &sc); err != nil {
return deviceConfig{}, err
Expand Down

0 comments on commit 69063ba

Please sign in to comment.