Skip to content
This repository has been archived by the owner on Feb 11, 2020. It is now read-only.

Commit

Permalink
Develop (#4)
Browse files Browse the repository at this point in the history
* first working version

* updating license

* better msg handler

* adding some tests

* trigger travis

* update glide

* travis

* some changes

* travis

* update all

* trying stuff for travis

* update go in travis

* Update .travis.yml

OSX test never ending

* workig config parser + tests

* Revert "workig config parser + tests"

This reverts commit 1100621.

* working config parser + test

* update readme

* Format/gofmt (#3)

* changes for gofmt and golint

* Update README.md

* Update README.md

* removing debug msg
  • Loading branch information
nathan-K- authored Jul 21, 2017
1 parent de0c957 commit b12d641
Show file tree
Hide file tree
Showing 5 changed files with 42 additions and 77,627 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# Mqttbeat
[![Build Status](https://travis-ci.org/nathan-K-/mqttbeat.svg?branch=master)](https://travis-ci.org/nathan-K-/mqttbeat)
[![Go Report Card](https://goreportcard.com/badge/github.com/nathan-K-/mqttbeat)](https://goreportcard.com/report/github.com/nathan-K-/mqttbeat)

Welcome to Mqttbeat.

This beat will allow you to put MQTT messages in an elasticsearch instance.
You can use [this little project](https://github.com/nathan-K-/stack-docker) to easily start an elastic stack.

Ensure that this folder is at the following location:
`${GOPATH}/github.com/nathan-k-/mqttbeat`
Expand Down
53 changes: 27 additions & 26 deletions beater/mqttbeat.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package beater

import (
"fmt"
"time"
"encoding/json"
"strings"
"fmt"
"strconv"
"strings"
"time"

"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
Expand All @@ -19,31 +19,32 @@ import (
"github.com/nathan-k-/mqttbeat/config"
)

// Mqttbeat represent a mqtt beat object
type Mqttbeat struct {
done chan struct{}
beatConfig config.Config
done chan struct{}
beatConfig config.Config
elasticClient publisher.Client
mqttClient MQTT.Client
mqttClient MQTT.Client
}

// Prepare mqtt client
func setupMqttClient(bt *Mqttbeat) {
mqttClientOpt := MQTT.NewClientOptions()
mqttClientOpt.AddBroker(bt.beatConfig.BrokerUrl)
logp.Info("BROKER url " + bt.beatConfig.BrokerUrl)
mqttClientOpt.AddBroker(bt.beatConfig.BrokerURL)
logp.Info("BROKER url " + bt.beatConfig.BrokerURL)

bt.mqttClient = MQTT.NewClient(mqttClientOpt)
}

// Creates beater
// New function creates our mqtt beater
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
config := config.DefaultConfig
if err := cfg.Unpack(&config); err != nil {
return nil, fmt.Errorf("Error reading config file: %v", err)
}

bt := &Mqttbeat{
done: make(chan struct{}),
done: make(chan struct{}),
beatConfig: config,
}
setupMqttClient(bt)
Expand All @@ -54,18 +55,16 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
logp.Info("MQTT Client connected")

subscriptions := ParseTopics(bt.beatConfig.TopicsSubscribe)
//bt.beatConfig.TopicsSubscribe
//bt.beatConfig.TopicsSubscribe

// Mqtt client - Subscribe to every topic in the config file, and bind with message handler
if token := bt.mqttClient.SubscribeMultiple(subscriptions, bt.onMessage);
token.Wait() && token.Error() != nil {
if token := bt.mqttClient.SubscribeMultiple(subscriptions, bt.onMessage); token.Wait() && token.Error() != nil {
panic(token.Error())
}

return bt, nil
}


// Mqtt message handler
func (bt *Mqttbeat) onMessage(client MQTT.Client, msg MQTT.Message) {
logp.Info("MQTT MESSAGE RECEIVED " + string(msg.Payload()))
Expand All @@ -79,15 +78,15 @@ func (bt *Mqttbeat) onMessage(client MQTT.Client, msg MQTT.Message) {
event["payload"] = msg.Payload()
}

event["beat"]= common.MapStr{"index": "mqttbeat", "type":"message"}
event["beat"] = common.MapStr{"index": "mqttbeat", "type": "message"}
event["@timestamp"] = common.Time(time.Now())
event["topic"] = msg.Topic()
// Finally sending the message to elasticsearch
bt.elasticClient.PublishEvent(event)
logp.Info("Event sent")
}

}

// Run is used to start this beater, once configured and connected
func (bt *Mqttbeat) Run(b *beat.Beat) error {
logp.Info("mqttbeat is running! Hit CTRL-C to stop it.")
bt.elasticClient = b.Publisher.Connect()
Expand All @@ -101,48 +100,50 @@ func (bt *Mqttbeat) Run(b *beat.Beat) error {
}
}

// Stop is used to close this beater
func (bt *Mqttbeat) Stop() {
bt.mqttClient.Disconnect(250)
bt.elasticClient.Close()
close(bt.done)
}

// Helpers
// DecodePayload will try to decode the payload. If every check fails, it will
// return the payload as a string
func DecodePayload(payload []byte) common.MapStr {
event := make(common.MapStr)

// A msgpack payload must be a json-like object
err := msgpack.Unmarshal(payload, &event)
if err == nil {
if err == nil {
logp.Info("Payload decoded - msgpack")
return event
}

err = json.Unmarshal(payload, &event)
if err == nil {
if err == nil {
logp.Info("Payload decoded - json")
return event
}

// default case
event["payload"]= string(payload)
event["payload"] = string(payload)
logp.Info("Payload decoded - text")
return event
}

func ParseTopics(topics []string) map[string]byte{
// ParseTopics will parse the config file and return a map with topic:QoS
func ParseTopics(topics []string) map[string]byte {
subscriptions := make(map[string]byte)
for _, value := range topics{
for _, value := range topics {
// Fist, spliting the string topic?qos
topic, qosStr := strings.Split(value, "?")[0], strings.Split(value, "?")[1]
// Then, parsing the qos to an int
qosInt, err := strconv.ParseInt(qosStr, 10, 0)
if err != nil {
panic("Error parsing topics")
}
// Finally, filling the subscriptions map
// Finally, filling the subscriptions map
subscriptions[topic] = byte(qosInt)
}
fmt.Println(subscriptions)
return subscriptions
}
}
13 changes: 6 additions & 7 deletions beater/mqttbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package beater
import (
"github.com/elastic/beats/libbeat/common"
"gopkg.in/vmihailenco/msgpack.v2"
"testing"
"reflect"
"testing"
)


func TestDecodeMsgpackJson(t *testing.T){
func TestDecodeMsgpackJson(t *testing.T) {

reference := make(common.MapStr)
reference["hello"] = "world"
Expand All @@ -22,7 +21,7 @@ func TestDecodeMsgpackJson(t *testing.T){
}
}

func TestDecodeJson(t *testing.T){
func TestDecodeJson(t *testing.T) {

reference := make(common.MapStr)
reference["hello"] = "world"
Expand All @@ -35,7 +34,7 @@ func TestDecodeJson(t *testing.T){
}
}

func TestDecodeText(t *testing.T){
func TestDecodeText(t *testing.T) {
payload := "Bonjour, monde!"

reference := make(common.MapStr)
Expand All @@ -48,7 +47,7 @@ func TestDecodeText(t *testing.T){
}
}

func TestParseTopic(t *testing.T){
func TestParseTopic(t *testing.T) {
input := []string{"some/topic?0", "some/ohter/topic?2", "final/topic?1"}

reference := map[string]byte{"some/topic": 0, "some/ohter/topic": 2, "final/topic": 1}
Expand All @@ -58,4 +57,4 @@ func TestParseTopic(t *testing.T){
if !reflect.DeepEqual(reference, output) {
t.Error("Not equals")
}
}
}
12 changes: 7 additions & 5 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@ package config

import "time"

// Config represents every needed configuration fields
type Config struct {
BrokerUrl string `config:"broker_url"`
TopicsSubscribe []string `config:"topics_subscribe"`
DecodePaylod bool `config:"decode_payload"`
BrokerURL string `config:"broker_url"`
TopicsSubscribe []string `config:"topics_subscribe"`
DecodePaylod bool `config:"decode_payload"`
Period time.Duration `config:"period"`
}

// DefaultConfig will be used if no config file is founded
var DefaultConfig = Config{
BrokerUrl: "tcp://localhost:1883",
BrokerURL: "tcp://localhost:1883",
TopicsSubscribe: []string{"/test/mqttbeat/#?1"},
DecodePaylod: true,
DecodePaylod: true,
}
Loading

0 comments on commit b12d641

Please sign in to comment.