From 0c70ab6158ccfb692d0f51c8e8a13657e2f4269e Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Fri, 20 Oct 2023 13:31:02 +0200 Subject: [PATCH] Refactor MQTT endpoints + Introduce End-to-End encryption using RSA and AES keys + finetune PTZ --- README.md | 4 + machinery/data/config/config.json | 3 +- machinery/src/cloud/Cloud.go | 51 ++-- machinery/src/components/Kerberos.go | 2 +- machinery/src/computervision/main.go | 18 +- machinery/src/config/main.go | 18 ++ machinery/src/encryption/main.go | 148 ++++++++++ machinery/src/models/Communication.go | 2 +- machinery/src/models/Config.go | 9 + machinery/src/models/MQTT.go | 151 ++++++++++ machinery/src/onvif/main.go | 22 +- machinery/src/routers/mqtt/main.go | 397 ++++++++++---------------- machinery/src/webrtc/main.go | 88 +++--- 13 files changed, 610 insertions(+), 303 deletions(-) create mode 100644 machinery/src/encryption/main.go create mode 100644 machinery/src/models/MQTT.go diff --git a/README.md b/README.md index b016b9b9..058373a1 100644 --- a/README.md +++ b/README.md @@ -227,6 +227,10 @@ Next to attaching the configuration file, it is also possible to override the co | `AGENT_KERBEROSVAULT_DIRECTORY` | The directory, in the provider, where the recordings will be stored in. | "" | | `AGENT_DROPBOX_ACCESS_TOKEN` | The Access Token from your Dropbox app, that is used to leverage the Dropbox SDK. | "" | | `AGENT_DROPBOX_DIRECTORY` | The directory, in the provider, where the recordings will be stored in. | "" | +| `AGENT_ENCRYPTION` | Enable 'true' or disable 'false' end-to-end encryption through MQTT (recordings will follow). | "false" | +| `AGENT_ENCRYPTION_FINGERPRINT` | The fingerprint of the keypair (public/private keys), so you know which one to use. | "" | +| `AGENT_ENCRYPTION_PRIVATE_KEY` | The private key (assymetric/RSA) to decryptand sign requests send over MQTT. | "" | +| `AGENT_ENCRYPTION_SYMMETRIC_KEY` | The symmetric key (AES) to encrypt and decrypt request send over MQTT. | "" | ## Contribute with Codespaces diff --git a/machinery/data/config/config.json b/machinery/data/config/config.json index 980cc319..27bc12c8 100644 --- a/machinery/data/config/config.json +++ b/machinery/data/config/config.json @@ -111,5 +111,6 @@ "hub_key": "", "hub_private_key": "", "hub_site": "", - "condition_uri": "" + "condition_uri": "", + "encryption": {} } diff --git a/machinery/src/cloud/Cloud.go b/machinery/src/cloud/Cloud.go index bf0d8810..b2dc551c 100644 --- a/machinery/src/cloud/Cloud.go +++ b/machinery/src/cloud/Cloud.go @@ -458,19 +458,17 @@ func HandleLiveStreamSD(livestreamCursor *pubsub.QueueCursor, configuration *mod // Allocate frame frame := ffmpeg.AllocVideoFrame() - key := "" + hubKey := "" if config.Cloud == "s3" && config.S3 != nil && config.S3.Publickey != "" { - key = config.S3.Publickey + hubKey = config.S3.Publickey } else if config.Cloud == "kstorage" && config.KStorage != nil && config.KStorage.CloudKey != "" { - key = config.KStorage.CloudKey + hubKey = config.KStorage.CloudKey } // This is the new way ;) if config.HubKey != "" { - key = config.HubKey + hubKey = config.HubKey } - topic := "kerberos/" + key + "/device/" + config.Key + "/live" - lastLivestreamRequest := int64(0) var cursorError error @@ -491,7 +489,27 @@ func HandleLiveStreamSD(livestreamCursor *pubsub.QueueCursor, configuration *mod continue } log.Log.Info("HandleLiveStreamSD: Sending base64 encoded images to MQTT.") - sendImage(frame, topic, mqttClient, pkt, decoder, decoderMutex) + _, err := computervision.GetRawImage(frame, pkt, decoder, decoderMutex) + if err == nil { + bytes, _ := computervision.ImageToBytes(&frame.Image) + encoded := base64.StdEncoding.EncodeToString(bytes) + + valueMap := make(map[string]interface{}) + valueMap["image"] = encoded + message := models.Message{ + Payload: models.Payload{ + Action: "receive-sd-stream", + DeviceId: configuration.Config.Key, + Value: valueMap, + }, + } + payload, err := models.PackageMQTTMessage(configuration, message) + if err == nil { + mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload) + } else { + log.Log.Info("HandleRequestConfig: something went wrong while sending acknowledge config to hub: " + string(payload)) + } + } } // Cleanup the frame. @@ -505,15 +523,6 @@ func HandleLiveStreamSD(livestreamCursor *pubsub.QueueCursor, configuration *mod log.Log.Debug("HandleLiveStreamSD: finished") } -func sendImage(frame *ffmpeg.VideoFrame, topic string, mqttClient mqtt.Client, pkt av.Packet, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) { - _, err := computervision.GetRawImage(frame, pkt, decoder, decoderMutex) - if err == nil { - bytes, _ := computervision.ImageToBytes(&frame.Image) - encoded := base64.StdEncoding.EncodeToString(bytes) - mqttClient.Publish(topic, 0, false, encoded) - } -} - func HandleLiveStreamHD(livestreamCursor *pubsub.QueueCursor, configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, codecs []av.CodecData, decoder *ffmpeg.VideoDecoder, decoderMutex *sync.Mutex) { config := configuration.Config @@ -532,25 +541,23 @@ func HandleLiveStreamHD(livestreamCursor *pubsub.QueueCursor, configuration *mod if config.Capture.ForwardWebRTC == "true" { // We get a request with an offer, but we'll forward it. - for m := range communication.HandleLiveHDHandshake { + /*for m := range communication.HandleLiveHDHandshake { // Forward SDP m.CloudKey = config.Key request, err := json.Marshal(m) if err == nil { mqttClient.Publish("kerberos/webrtc/request", 2, false, request) } - } + }*/ } else { log.Log.Info("HandleLiveStreamHD: Waiting for peer connections.") for handshake := range communication.HandleLiveHDHandshake { log.Log.Info("HandleLiveStreamHD: setting up a peer connection.") - key := config.Key + "/" + handshake.Cuuid - webrtc.CandidatesMutex.Lock() + key := config.Key + "/" + handshake.SessionID _, ok := webrtc.CandidateArrays[key] if !ok { - webrtc.CandidateArrays[key] = make(chan string, 30) + webrtc.CandidateArrays[key] = make(chan string) } - webrtc.CandidatesMutex.Unlock() webrtc.InitializeWebRTCConnection(configuration, communication, mqttClient, videoTrack, audioTrack, handshake, webrtc.CandidateArrays[key]) } diff --git a/machinery/src/components/Kerberos.go b/machinery/src/components/Kerberos.go index 67956837..947e2d68 100644 --- a/machinery/src/components/Kerberos.go +++ b/machinery/src/components/Kerberos.go @@ -264,7 +264,7 @@ func RunAgent(configDirectory string, configuration *models.Configuration, commu } // Handle livestream HD (high resolution over WEBRTC) - communication.HandleLiveHDHandshake = make(chan models.SDPPayload, 1) + communication.HandleLiveHDHandshake = make(chan models.RequestHDStreamPayload, 1) if subStreamEnabled { livestreamHDCursor := subQueue.Latest() go cloud.HandleLiveStreamHD(livestreamHDCursor, configuration, communication, mqttClient, subStreams, subDecoder, &decoderMutex) diff --git a/machinery/src/computervision/main.go b/machinery/src/computervision/main.go index 5b0f8bbb..664a8a15 100644 --- a/machinery/src/computervision/main.go +++ b/machinery/src/computervision/main.go @@ -169,9 +169,23 @@ func ProcessMotion(motionCursor *pubsub.QueueCursor, configuration *models.Confi if config.Offline != "true" { if mqttClient != nil { if hubKey != "" { - mqttClient.Publish("kerberos/"+hubKey+"/device/"+deviceKey+"/motion", 2, false, "motion") + message := models.Message{ + Payload: models.Payload{ + Action: "motion", + DeviceId: configuration.Config.Key, + Value: map[string]interface{}{ + "timestamp": time.Now().Unix(), + }, + }, + } + payload, err := models.PackageMQTTMessage(configuration, message) + if err == nil { + mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload) + } else { + log.Log.Info("ProcessMotion: failed to package MQTT message: " + err.Error()) + } } else { - mqttClient.Publish("kerberos/device/"+deviceKey+"/motion", 2, false, "motion") + mqttClient.Publish("kerberos/agent/"+deviceKey, 2, false, "motion") } } } diff --git a/machinery/src/config/main.go b/machinery/src/config/main.go index dc5805e9..03b6bda4 100644 --- a/machinery/src/config/main.go +++ b/machinery/src/config/main.go @@ -461,6 +461,24 @@ func OverrideWithEnvironmentVariables(configuration *models.Configuration) { case "AGENT_DROPBOX_DIRECTORY": configuration.Config.Dropbox.Directory = value break + + /* When encryption is enabled */ + case "AGENT_ENCRYPTION": + if value == "true" { + configuration.Config.Encryption.Enabled = true + } else { + configuration.Config.Encryption.Enabled = false + } + break + case "AGENT_ENCRYPTION_FINGERPRINT": + configuration.Config.Encryption.Fingerprint = value + break + case "AGENT_ENCRYPTION_PRIVATE_KEY": + configuration.Config.Encryption.PrivateKey = value + break + case "AGENT_ENCRYPTION_SYMMETRIC_KEY": + configuration.Config.Encryption.SymmetricKey = value + break } } } diff --git a/machinery/src/encryption/main.go b/machinery/src/encryption/main.go new file mode 100644 index 00000000..6713fb95 --- /dev/null +++ b/machinery/src/encryption/main.go @@ -0,0 +1,148 @@ +package encryption + +import ( + "bytes" + "crypto" + "crypto/aes" + "crypto/cipher" + "crypto/md5" + "crypto/rand" + "crypto/rsa" + "crypto/sha256" + "encoding/base64" + "errors" + "hash" +) + +// DecryptWithPrivateKey decrypts data with private key +func DecryptWithPrivateKey(ciphertext string, privateKey *rsa.PrivateKey) ([]byte, error) { + + // decode our encrypted string into cipher bytes + cipheredValue, _ := base64.StdEncoding.DecodeString(ciphertext) + + // decrypt the data + out, err := rsa.DecryptPKCS1v15(nil, privateKey, cipheredValue) + + return out, err +} + +// SignWithPrivateKey signs data with private key +func SignWithPrivateKey(data []byte, privateKey *rsa.PrivateKey) ([]byte, error) { + + // hash the data with sha256 + hashed := sha256.Sum256(data) + + // sign the data + signature, err := rsa.SignPKCS1v15(nil, privateKey, crypto.SHA256, hashed[:]) + + return signature, err +} + +func AesEncrypt(content string, password string) (string, error) { + salt := make([]byte, 8) + _, err := rand.Read(salt) + if err != nil { + return "", err + } + key, iv, err := DefaultEvpKDF([]byte(password), salt) + + block, err := aes.NewCipher(key) + if err != nil { + return "", err + } + + mode := cipher.NewCBCEncrypter(block, iv) + cipherBytes := PKCS5Padding([]byte(content), aes.BlockSize) + mode.CryptBlocks(cipherBytes, cipherBytes) + + data := make([]byte, 16+len(cipherBytes)) + copy(data[:8], []byte("Salted__")) + copy(data[8:16], salt) + copy(data[16:], cipherBytes) + + cipherText := base64.StdEncoding.EncodeToString(data) + return cipherText, nil +} + +func AesDecrypt(cipherText string, password string) (string, error) { + data, err := base64.StdEncoding.DecodeString(cipherText) + if err != nil { + return "", err + } + if string(data[:8]) != "Salted__" { + return "", errors.New("invalid crypto js aes encryption") + } + + salt := data[8:16] + cipherBytes := data[16:] + key, iv, err := DefaultEvpKDF([]byte(password), salt) + if err != nil { + return "", err + } + + block, err := aes.NewCipher(key) + if err != nil { + return "", err + } + + mode := cipher.NewCBCDecrypter(block, iv) + mode.CryptBlocks(cipherBytes, cipherBytes) + + result := PKCS5UnPadding(cipherBytes) + return string(result), nil +} + +// https://stackoverflow.com/questions/27677236/encryption-in-javascript-and-decryption-with-php/27678978#27678978 +// https://github.com/brix/crypto-js/blob/8e6d15bf2e26d6ff0af5277df2604ca12b60a718/src/evpkdf.js#L55 +func EvpKDF(password []byte, salt []byte, keySize int, iterations int, hashAlgorithm string) ([]byte, error) { + var block []byte + var hasher hash.Hash + derivedKeyBytes := make([]byte, 0) + switch hashAlgorithm { + case "md5": + hasher = md5.New() + default: + return []byte{}, errors.New("not implement hasher algorithm") + } + for len(derivedKeyBytes) < keySize*4 { + if len(block) > 0 { + hasher.Write(block) + } + hasher.Write(password) + hasher.Write(salt) + block = hasher.Sum([]byte{}) + hasher.Reset() + + for i := 1; i < iterations; i++ { + hasher.Write(block) + block = hasher.Sum([]byte{}) + hasher.Reset() + } + derivedKeyBytes = append(derivedKeyBytes, block...) + } + return derivedKeyBytes[:keySize*4], nil +} + +func DefaultEvpKDF(password []byte, salt []byte) (key []byte, iv []byte, err error) { + // https://github.com/brix/crypto-js/blob/8e6d15bf2e26d6ff0af5277df2604ca12b60a718/src/cipher-core.js#L775 + keySize := 256 / 32 + ivSize := 128 / 32 + derivedKeyBytes, err := EvpKDF(password, salt, keySize+ivSize, 1, "md5") + if err != nil { + return []byte{}, []byte{}, err + } + return derivedKeyBytes[:keySize*4], derivedKeyBytes[keySize*4:], nil +} + +// https://stackoverflow.com/questions/41579325/golang-how-do-i-decrypt-with-des-cbc-and-pkcs7 +func PKCS5UnPadding(src []byte) []byte { + length := len(src) + unpadding := int(src[length-1]) + return src[:(length - unpadding)] +} + +func PKCS5Padding(src []byte, blockSize int) []byte { + padding := blockSize - len(src)%blockSize + padtext := bytes.Repeat([]byte{byte(padding)}, padding) + return append(src, padtext...) +} diff --git a/machinery/src/models/Communication.go b/machinery/src/models/Communication.go index 93c526d2..8c547464 100644 --- a/machinery/src/models/Communication.go +++ b/machinery/src/models/Communication.go @@ -26,7 +26,7 @@ type Communication struct { HandleHeartBeat chan string HandleLiveSD chan int64 HandleLiveHDKeepalive chan string - HandleLiveHDHandshake chan SDPPayload + HandleLiveHDHandshake chan RequestHDStreamPayload HandleLiveHDPeers chan string HandleONVIF chan OnvifAction IsConfiguring *abool.AtomicBool diff --git a/machinery/src/models/Config.go b/machinery/src/models/Config.go index 56de8fdb..397ac85d 100644 --- a/machinery/src/models/Config.go +++ b/machinery/src/models/Config.go @@ -42,6 +42,7 @@ type Config struct { HubPrivateKey string `json:"hub_private_key" bson:"hub_private_key"` HubSite string `json:"hub_site" bson:"hub_site"` ConditionURI string `json:"condition_uri" bson:"condition_uri"` + Encryption *Encryption `json:"encryption" bson:"encryption"` } // Capture defines which camera type (Id) you are using (IP, USB or Raspberry Pi camera), @@ -157,3 +158,11 @@ type Dropbox struct { AccessToken string `json:"access_token,omitempty" bson:"access_token,omitempty"` Directory string `json:"directory,omitempty" bson:"directory,omitempty"` } + +// Encryption +type Encryption struct { + Enabled bool `json:"enabled" bson:"enabled"` + Fingerprint string `json:"fingerprint" bson:"fingerprint"` + PrivateKey string `json:"private_key" bson:"private_key"` + SymmetricKey string `json:"symmetric_key" bson:"symmetric_key"` +} diff --git a/machinery/src/models/MQTT.go b/machinery/src/models/MQTT.go new file mode 100644 index 00000000..febcff2d --- /dev/null +++ b/machinery/src/models/MQTT.go @@ -0,0 +1,151 @@ +package models + +import ( + "crypto/rsa" + "crypto/x509" + "encoding/base64" + "encoding/json" + "encoding/pem" + "io/ioutil" + "strings" + "time" + + "github.com/gofrs/uuid" + "github.com/kerberos-io/agent/machinery/src/encryption" + "github.com/kerberos-io/agent/machinery/src/log" +) + +func PackageMQTTMessage(configuration *Configuration, msg Message) ([]byte, error) { + // Create a Version 4 UUID. + u2, err := uuid.NewV4() + if err != nil { + log.Log.Error("failed to generate UUID: " + err.Error()) + } + + // We'll generate an unique id, and encrypt / decrypt it using the private key if available. + msg.Mid = u2.String() + msg.DeviceId = msg.Payload.DeviceId + msg.Timestamp = time.Now().Unix() + + // At the moment we don't do the encryption part, but we'll implement it + // once the legacy methods (subscriptions are moved). + msg.Encrypted = false + if configuration.Config.Encryption.Enabled { + msg.Encrypted = true + } + msg.PublicKey = "" + msg.Fingerprint = "" + + if msg.Encrypted { + pload := msg.Payload + + // Pload to base64 + data, err := json.Marshal(pload) + if err != nil { + log.Log.Error("failed to marshal payload: " + err.Error()) + } + + // Encrypt the value + privateKey := configuration.Config.Encryption.PrivateKey + r := strings.NewReader(privateKey) + pemBytes, _ := ioutil.ReadAll(r) + block, _ := pem.Decode(pemBytes) + if block == nil { + log.Log.Error("MQTTListenerHandler: error decoding PEM block containing private key") + } else { + // Parse private key + b := block.Bytes + key, err := x509.ParsePKCS8PrivateKey(b) + if err != nil { + log.Log.Error("MQTTListenerHandler: error parsing private key: " + err.Error()) + } + + // Conver key to *rsa.PrivateKey + rsaKey, _ := key.(*rsa.PrivateKey) + + // Create a 16bit key random + k := configuration.Config.Encryption.SymmetricKey + encryptedValue, err := encryption.AesEncrypt(string(data), k) + + // Sign the encrypted value + signature, err := encryption.SignWithPrivateKey([]byte(encryptedValue), rsaKey) + base64Signature := base64.StdEncoding.EncodeToString(signature) + + msg.Payload.EncryptedValue = encryptedValue + msg.Payload.Signature = base64Signature + msg.Payload.Value = make(map[string]interface{}) + } + } + + payload, err := json.Marshal(msg) + return payload, err +} + +// The message structure which is used to send over +// and receive messages from the MQTT broker +type Message struct { + Mid string `json:"mid"` + DeviceId string `json:"device_id"` + Timestamp int64 `json:"timestamp"` + Encrypted bool `json:"encrypted"` + PublicKey string `json:"public_key"` + Fingerprint string `json:"fingerprint"` + Payload Payload `json:"payload"` +} + +// The payload structure which is used to send over +// and receive messages from the MQTT broker +type Payload struct { + Action string `json:"action"` + DeviceId string `json:"device_id"` + Signature string `json:"signature"` + EncryptedValue string `json:"encrypted_value"` + Value map[string]interface{} `json:"value"` +} + +// We received a recording request, we'll send it to the motion handler. +type RecordPayload struct { + Timestamp int64 `json:"timestamp"` // timestamp of the recording request. +} + +// We received a preset position request, we'll request it through onvif and send it back. +type PTZPositionPayload struct { + Timestamp int64 `json:"timestamp"` // timestamp of the preset request. +} + +// We received a request config request, we'll fetch the current config and send it back. +type RequestConfigPayload struct { + Timestamp int64 `json:"timestamp"` // timestamp of the preset request. +} + +// We received a update config request, we'll update the current config and send a confirmation back. +type UpdateConfigPayload struct { + Timestamp int64 `json:"timestamp"` // timestamp of the preset request. + Config Config `json:"config"` +} + +// We received a request SD stream request +type RequestSDStreamPayload struct { + Timestamp int64 `json:"timestamp"` // timestamp +} + +// We received a request HD stream request +type RequestHDStreamPayload struct { + Timestamp int64 `json:"timestamp"` // timestamp + HubKey string `json:"hub_key"` // hub key + SessionID string `json:"session_id"` // session id + SessionDescription string `json:"session_description"` // session description +} + +// We received a receive HD candidates request +type ReceiveHDCandidatesPayload struct { + Timestamp int64 `json:"timestamp"` // timestamp + SessionID string `json:"session_id"` // session id + Candidate string `json:"candidate"` // candidate +} + +type NavigatePTZPayload struct { + Timestamp int64 `json:"timestamp"` // timestamp + DeviceId string `json:"device_id"` // device id + Action string `json:"action"` // action +} diff --git a/machinery/src/onvif/main.go b/machinery/src/onvif/main.go index 46c70533..c6b90d06 100644 --- a/machinery/src/onvif/main.go +++ b/machinery/src/onvif/main.go @@ -391,7 +391,7 @@ func ZoomOutCompletely(device *onvif.Device, configuration ptz.GetConfigurations func PanUntilPosition(device *onvif.Device, configuration ptz.GetConfigurationsResponse, token xsd.ReferenceToken, pan float64, zoom float64, speed float64, wait time.Duration) error { position, err := GetPosition(device, token) - if position.PanTilt.X >= pan-0.005 && position.PanTilt.X <= pan+0.005 { + if position.PanTilt.X >= pan-0.01 && position.PanTilt.X <= pan+0.01 { } else { @@ -423,9 +423,15 @@ func PanUntilPosition(device *onvif.Device, configuration ptz.GetConfigurationsR // While moving we'll check if we reached the desired position. // or if we overshot the desired position. + + // Break after 3seconds + now := time.Now() for { position, _ := GetPosition(device, token) - if position.PanTilt.X == -1 || position.PanTilt.X == 1 || (directionX > 0 && position.PanTilt.X >= pan) || (directionX < 0 && position.PanTilt.X <= pan) || (position.PanTilt.X >= pan-0.005 && position.PanTilt.X <= pan+0.005) { + if position.PanTilt.X == -1 || position.PanTilt.X == 1 || (directionX > 0 && position.PanTilt.X >= pan) || (directionX < 0 && position.PanTilt.X <= pan) || (position.PanTilt.X >= pan-0.01 && position.PanTilt.X <= pan+0.01) { + break + } + if time.Since(now) > 3*time.Second { break } time.Sleep(wait) @@ -479,11 +485,17 @@ func TiltUntilPosition(device *onvif.Device, configuration ptz.GetConfigurations // While moving we'll check if we reached the desired position. // or if we overshot the desired position. + + // Break after 3seconds + now := time.Now() for { position, _ := GetPosition(device, token) if position.PanTilt.Y == -1 || position.PanTilt.Y == 1 || (directionY > 0 && position.PanTilt.Y >= tilt) || (directionY < 0 && position.PanTilt.Y <= tilt) || (position.PanTilt.Y >= tilt-0.005 && position.PanTilt.Y <= tilt+0.005) { break } + if time.Since(now) > 3*time.Second { + break + } time.Sleep(wait) } @@ -534,11 +546,17 @@ func ZoomUntilPosition(device *onvif.Device, configuration ptz.GetConfigurations // While moving we'll check if we reached the desired position. // or if we overshot the desired position. + + // Break after 3seconds + now := time.Now() for { position, _ := GetPosition(device, token) if position.Zoom.X == -1 || position.Zoom.X == 1 || (directionZ > 0 && position.Zoom.X >= zoom) || (directionZ < 0 && position.Zoom.X <= zoom) || (position.Zoom.X >= zoom-0.005 && position.Zoom.X <= zoom+0.005) { break } + if time.Since(now) > 3*time.Second { + break + } time.Sleep(wait) } diff --git a/machinery/src/routers/mqtt/main.go b/machinery/src/routers/mqtt/main.go index 01e320af..b75bf92a 100644 --- a/machinery/src/routers/mqtt/main.go +++ b/machinery/src/routers/mqtt/main.go @@ -1,40 +1,26 @@ package mqtt import ( + "crypto/rsa" + "crypto/x509" "encoding/json" + "encoding/pem" "fmt" + "io/ioutil" "math/rand" "strconv" + "strings" "time" mqtt "github.com/eclipse/paho.mqtt.golang" - "github.com/gofrs/uuid" configService "github.com/kerberos-io/agent/machinery/src/config" + "github.com/kerberos-io/agent/machinery/src/encryption" "github.com/kerberos-io/agent/machinery/src/log" "github.com/kerberos-io/agent/machinery/src/models" "github.com/kerberos-io/agent/machinery/src/onvif" "github.com/kerberos-io/agent/machinery/src/webrtc" ) -// The message structure which is used to send over -// and receive messages from the MQTT broker -type Message struct { - Mid string `json:"mid"` - Timestamp int64 `json:"timestamp"` - Encrypted bool `json:"encrypted"` - PublicKey string `json:"public_key"` - Fingerprint string `json:"fingerprint"` - Payload Payload `json:"payload"` -} - -// The payload structure which is used to send over -// and receive messages from the MQTT broker -type Payload struct { - Action string `json:"action"` - DeviceId string `json:"device_id"` - Value map[string]interface{} `json:"value"` -} - // We'll cache the MQTT settings to know if we need to reinitialize the MQTT client connection. // If we update the configuration but no new MQTT settings are provided, we don't need to restart it. var PREV_MQTTURI string @@ -56,58 +42,15 @@ func HasMQTTClientModified(configuration *models.Configuration) bool { return false } -func PackageMQTTMessage(msg Message) ([]byte, error) { - // Create a Version 4 UUID. - u2, err := uuid.NewV4() - if err != nil { - log.Log.Error("failed to generate UUID: " + err.Error()) - } - - // We'll generate an unique id, and encrypt / decrypt it using the private key if available. - msg.Mid = u2.String() - msg.Timestamp = time.Now().Unix() - - // At the moment we don't do the encryption part, but we'll implement it - // once the legacy methods (subscriptions are moved). - msg.Encrypted = false - msg.PublicKey = "" - msg.Fingerprint = "" - - payload, err := json.Marshal(msg) - return payload, err -} - // Configuring MQTT to subscribe for various bi-directional messaging // Listen and reply (a generic method to share and retrieve information) // -// !!! NEW METHOD TO COMMUNICATE: only create a single subscription for all communication. -// and an additional publish messages back -// // - [SUBSCRIPTION] kerberos/agent/{hubkey} (hub -> agent) // - [PUBLISH] kerberos/hub/{hubkey} (agent -> hub) // // !!! LEGACY METHODS BELOW, WE SHOULD LEVERAGE THE ABOVE METHOD! -// -// [SUBSCRIPTIONS] -// -// SD Streaming (Base64 JPEGs) -// - kerberos/{hubkey}/device/{devicekey}/request-live: use for polling of SD live streaming (as long the user requests stream, we'll send JPEGs over). -// -// HD Streaming (WebRTC) -// - kerberos/register: use for receiving HD live streaming requests. -// - candidate/cloud: remote ICE candidates are shared over this line. -// - kerberos/webrtc/keepalivehub/{devicekey}: use for polling of HD streaming (as long the user requests stream, we'll send it over). -// - kerberos/webrtc/peers/{devicekey}: we'll keep track of the number of peers (we can have more than 1 concurrent listeners). -// -// ONVIF capabilities -// - kerberos/onvif/{devicekey}: endpoint to execute ONVIF commands such as (PTZ, Zoom, IO, etc) -// // [PUBlISH] // Next to subscribing to various topics, we'll also publish messages to various topics, find a list of available Publish methods. -// -// - kerberos/webrtc/packets/{devicekey}: use for forwarding WebRTC (RTP Packets) over MQTT -> Complex firewall. -// - kerberos/webrtc/keepalive/{devicekey}: use for keeping alive forwarded WebRTC stream -// - {devicekey}/{sessionid}/answer: once a WebRTC request is received through (kerberos/register), we'll draft an answer and send it back to the remote WebRTC client. // - kerberos/{hubkey}/device/{devicekey}/motion: a motion signal func ConfigureMQTT(configDirectory string, configuration *models.Configuration, communication *models.Communication) mqtt.Client { @@ -187,25 +130,6 @@ func ConfigureMQTT(configDirectory string, configuration *models.Configuration, // Create a susbcription for listen and reply MQTTListenerHandler(c, hubKey, configDirectory, configuration, communication) - - // Legacy methods below -> should be converted to the above method. - // Create a subscription to know if send out a livestream or not. - MQTTListenerHandleLiveSD(c, hubKey, configuration, communication) - - // Create a subscription for the WEBRTC livestream. - MQTTListenerHandleLiveHDHandshake(c, hubKey, configuration, communication) - - // Create a subscription for keeping alive the WEBRTC livestream. - MQTTListenerHandleLiveHDKeepalive(c, hubKey, configuration, communication) - - // Create a subscription to listen to the number of WEBRTC peers. - MQTTListenerHandleLiveHDPeers(c, hubKey, configuration, communication) - - // Create a subscription to listen for WEBRTC candidates. - MQTTListenerHandleLiveHDCandidates(c, hubKey, configuration, communication) - - // Create a susbcription to listen for ONVIF actions: e.g. PTZ, Zoom, etc. - MQTTListenerHandleONVIF(c, hubKey, configuration, communication) } } mqc := mqtt.NewClient(opts) @@ -236,57 +160,99 @@ func MQTTListenerHandler(mqttClient mqtt.Client, hubKey string, configDirectory // payload: Payload, "a json object which might be encrypted" // } - var message Message + var message models.Message json.Unmarshal(msg.Payload(), &message) - if message.Mid != "" && message.Timestamp != 0 { + // We will receive all messages from our hub, so we'll need to filter to the relevant device. + if message.Mid != "" && message.Timestamp != 0 && message.DeviceId == configuration.Config.Key { // Messages might be encrypted, if so we'll // need to decrypt them. - var payload Payload + var payload models.Payload if message.Encrypted { - // We'll find out the key we use to decrypt the message. - // TODO -> still needs to be implemented. - // Use to fingerprint to act accordingly. + encryptedValue := message.Payload.EncryptedValue + if len(encryptedValue) > 0 { + symmetricKey := configuration.Config.Encryption.SymmetricKey + privateKey := configuration.Config.Encryption.PrivateKey + r := strings.NewReader(privateKey) + pemBytes, _ := ioutil.ReadAll(r) + block, _ := pem.Decode(pemBytes) + if block == nil { + log.Log.Error("MQTTListenerHandler: error decoding PEM block containing private key") + return + } else { + // Parse private key + b := block.Bytes + key, err := x509.ParsePKCS8PrivateKey(b) + if err != nil { + log.Log.Error("MQTTListenerHandler: error parsing private key: " + err.Error()) + return + } else { + // Conver key to *rsa.PrivateKey + rsaKey, _ := key.(*rsa.PrivateKey) + + // Get encrypted key from message, delimited by ::: + encryptedKey := strings.Split(encryptedValue, ":::")[0] // encrypted with RSA + encryptedValue := strings.Split(encryptedValue, ":::")[1] // encrypted with AES + // Convert encrypted value to []byte + decryptedKey, err := encryption.DecryptWithPrivateKey(encryptedKey, rsaKey) + if decryptedKey != nil { + if string(decryptedKey) == symmetricKey { + // Decrypt value with decryptedKey + decryptedValue, err := encryption.AesDecrypt(encryptedValue, string(decryptedKey)) + if err != nil { + log.Log.Error("MQTTListenerHandler: error decrypting message: " + err.Error()) + return + } + json.Unmarshal([]byte(decryptedValue), &payload) + } else { + log.Log.Error("MQTTListenerHandler: error decrypting message, assymetric keys do not match.") + return + } + } else if err != nil { + log.Log.Error("MQTTListenerHandler: error decrypting message: " + err.Error()) + return + } + } + } + } } else { payload = message.Payload } - // We will receive all messages from our hub, so we'll need to filter to the relevant device. - if payload.DeviceId != configuration.Config.Key { - // Not relevant for this device, so we'll ignore it. - } else { - // We'll find out which message we received, and act accordingly. - log.Log.Info("MQTTListenerHandler: received message with action: " + payload.Action) - - switch payload.Action { - case "record": - HandleRecording(mqttClient, hubKey, payload, configuration, communication) - case "get-ptz-position": - HandleGetPTZPosition(mqttClient, hubKey, payload, configuration, communication) - case "update-ptz-position": - HandleUpdatePTZPosition(mqttClient, hubKey, payload, configuration, communication) - case "request-config": - HandleRequestConfig(mqttClient, hubKey, payload, configuration, communication) - case "update-config": - HandleUpdateConfig(mqttClient, hubKey, payload, configDirectory, configuration, communication) - } + // We'll find out which message we received, and act accordingly. + log.Log.Info("MQTTListenerHandler: received message with action: " + payload.Action) + switch payload.Action { + case "record": + go HandleRecording(mqttClient, hubKey, payload, configuration, communication) + case "get-ptz-position": + go HandleGetPTZPosition(mqttClient, hubKey, payload, configuration, communication) + case "update-ptz-position": + go HandleUpdatePTZPosition(mqttClient, hubKey, payload, configuration, communication) + case "navigate-ptz": + go HandleNavigatePTZ(mqttClient, hubKey, payload, configuration, communication) + case "request-config": + go HandleRequestConfig(mqttClient, hubKey, payload, configuration, communication) + case "update-config": + go HandleUpdateConfig(mqttClient, hubKey, payload, configDirectory, configuration, communication) + case "request-sd-stream": + go HandleRequestSDStream(mqttClient, hubKey, payload, configuration, communication) + case "request-hd-stream": + go HandleRequestHDStream(mqttClient, hubKey, payload, configuration, communication) + case "receive-hd-candidates": + go HandleReceiveHDCandidates(mqttClient, hubKey, payload, configuration, communication) } + } }) } } -// We received a recording request, we'll send it to the motion handler. -type RecordPayload struct { - Timestamp int64 `json:"timestamp"` // timestamp of the recording request. -} - -func HandleRecording(mqttClient mqtt.Client, hubKey string, payload Payload, configuration *models.Configuration, communication *models.Communication) { +func HandleRecording(mqttClient mqtt.Client, hubKey string, payload models.Payload, configuration *models.Configuration, communication *models.Communication) { value := payload.Value // Convert map[string]interface{} to RecordPayload jsonData, _ := json.Marshal(value) - var recordPayload RecordPayload + var recordPayload models.RecordPayload json.Unmarshal(jsonData, &recordPayload) if recordPayload.Timestamp != 0 { @@ -297,17 +263,12 @@ func HandleRecording(mqttClient mqtt.Client, hubKey string, payload Payload, con } } -// We received a preset position request, we'll request it through onvif and send it back. -type PTZPositionPayload struct { - Timestamp int64 `json:"timestamp"` // timestamp of the preset request. -} - -func HandleGetPTZPosition(mqttClient mqtt.Client, hubKey string, payload Payload, configuration *models.Configuration, communication *models.Communication) { +func HandleGetPTZPosition(mqttClient mqtt.Client, hubKey string, payload models.Payload, configuration *models.Configuration, communication *models.Communication) { value := payload.Value // Convert map[string]interface{} to PTZPositionPayload jsonData, _ := json.Marshal(value) - var positionPayload PTZPositionPayload + var positionPayload models.PTZPositionPayload json.Unmarshal(jsonData, &positionPayload) if positionPayload.Timestamp != 0 { @@ -318,8 +279,8 @@ func HandleGetPTZPosition(mqttClient mqtt.Client, hubKey string, payload Payload } else { // Needs to wrapped! posString := fmt.Sprintf("%f,%f,%f", pos.PanTilt.X, pos.PanTilt.Y, pos.Zoom.X) - message := Message{ - Payload: Payload{ + message := models.Message{ + Payload: models.Payload{ Action: "ptz-position", DeviceId: configuration.Config.Key, Value: map[string]interface{}{ @@ -328,7 +289,7 @@ func HandleGetPTZPosition(mqttClient mqtt.Client, hubKey string, payload Payload }, }, } - payload, err := PackageMQTTMessage(message) + payload, err := models.PackageMQTTMessage(configuration, message) if err == nil { mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload) } else { @@ -338,7 +299,7 @@ func HandleGetPTZPosition(mqttClient mqtt.Client, hubKey string, payload Payload } } -func HandleUpdatePTZPosition(mqttClient mqtt.Client, hubKey string, payload Payload, configuration *models.Configuration, communication *models.Communication) { +func HandleUpdatePTZPosition(mqttClient mqtt.Client, hubKey string, payload models.Payload, configuration *models.Configuration, communication *models.Communication) { value := payload.Value // Convert map[string]interface{} to PTZPositionPayload @@ -356,17 +317,12 @@ func HandleUpdatePTZPosition(mqttClient mqtt.Client, hubKey string, payload Payl } } -// We received a request config request, we'll fetch the current config and send it back. -type RequestConfigPayload struct { - Timestamp int64 `json:"timestamp"` // timestamp of the preset request. -} - -func HandleRequestConfig(mqttClient mqtt.Client, hubKey string, payload Payload, configuration *models.Configuration, communication *models.Communication) { +func HandleRequestConfig(mqttClient mqtt.Client, hubKey string, payload models.Payload, configuration *models.Configuration, communication *models.Communication) { value := payload.Value // Convert map[string]interface{} to RequestConfigPayload jsonData, _ := json.Marshal(value) - var configPayload RequestConfigPayload + var configPayload models.RequestConfigPayload json.Unmarshal(jsonData, &configPayload) if configPayload.Timestamp != 0 { @@ -381,14 +337,14 @@ func HandleRequestConfig(mqttClient mqtt.Client, hubKey string, payload Payload, inrec, _ := json.Marshal(configuration.Config) json.Unmarshal(inrec, &configMap) - message := Message{ - Payload: Payload{ + message := models.Message{ + Payload: models.Payload{ Action: "receive-config", DeviceId: configuration.Config.Key, Value: configMap, }, } - payload, err := PackageMQTTMessage(message) + payload, err := models.PackageMQTTMessage(configuration, message) if err == nil { mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload) } else { @@ -403,18 +359,12 @@ func HandleRequestConfig(mqttClient mqtt.Client, hubKey string, payload Payload, } } -// We received a update config request, we'll update the current config and send a confirmation back. -type UpdateConfigPayload struct { - Timestamp int64 `json:"timestamp"` // timestamp of the preset request. - Config models.Config `json:"config"` -} - -func HandleUpdateConfig(mqttClient mqtt.Client, hubKey string, payload Payload, configDirectory string, configuration *models.Configuration, communication *models.Communication) { +func HandleUpdateConfig(mqttClient mqtt.Client, hubKey string, payload models.Payload, configDirectory string, configuration *models.Configuration, communication *models.Communication) { value := payload.Value // Convert map[string]interface{} to UpdateConfigPayload jsonData, _ := json.Marshal(value) - var configPayload UpdateConfigPayload + var configPayload models.UpdateConfigPayload json.Unmarshal(jsonData, &configPayload) if configPayload.Timestamp != 0 { @@ -423,14 +373,13 @@ func HandleUpdateConfig(mqttClient mqtt.Client, hubKey string, payload Payload, err := configService.SaveConfig(configDirectory, config, configuration, communication) if err == nil { log.Log.Info("HandleUpdateConfig: Config updated") - - message := Message{ - Payload: Payload{ + message := models.Message{ + Payload: models.Payload{ Action: "acknowledge-update-config", DeviceId: configuration.Config.Key, }, } - payload, err := PackageMQTTMessage(message) + payload, err := models.PackageMQTTMessage(configuration, message) if err == nil { mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload) } else { @@ -442,129 +391,93 @@ func HandleUpdateConfig(mqttClient mqtt.Client, hubKey string, payload Payload, } } -func DisconnectMQTT(mqttClient mqtt.Client, config *models.Config) { - if mqttClient != nil { - // Cleanup all subscriptions - // New methods - mqttClient.Unsubscribe("kerberos/agent/" + PREV_HubKey) - - // Legacy methods - mqttClient.Unsubscribe("kerberos/" + PREV_HubKey + "/device/" + PREV_AgentKey + "/request-live") - mqttClient.Unsubscribe(PREV_AgentKey + "/register") - mqttClient.Unsubscribe("kerberos/webrtc/keepalivehub/" + PREV_AgentKey) - mqttClient.Unsubscribe("kerberos/webrtc/peers/" + PREV_AgentKey) - mqttClient.Unsubscribe("candidate/cloud") - mqttClient.Unsubscribe("kerberos/onvif/" + PREV_AgentKey) - - mqttClient.Disconnect(1000) - mqttClient = nil - log.Log.Info("DisconnectMQTT: MQTT client disconnected.") - } -} - -// ################################################################################################# -// Below you'll find legacy methods, as of now we'll have a single subscription, which scales better +func HandleRequestSDStream(mqttClient mqtt.Client, hubKey string, payload models.Payload, configuration *models.Configuration, communication *models.Communication) { + value := payload.Value + // Convert map[string]interface{} to RequestSDStreamPayload + jsonData, _ := json.Marshal(value) + var requestSDStreamPayload models.RequestSDStreamPayload + json.Unmarshal(jsonData, &requestSDStreamPayload) -func MQTTListenerHandleLiveSD(mqttClient mqtt.Client, hubKey string, configuration *models.Configuration, communication *models.Communication) { - config := configuration.Config - topicRequest := "kerberos/" + hubKey + "/device/" + config.Key + "/request-live" - mqttClient.Subscribe(topicRequest, 0, func(c mqtt.Client, msg mqtt.Message) { + if requestSDStreamPayload.Timestamp != 0 { if communication.CameraConnected { select { case communication.HandleLiveSD <- time.Now().Unix(): default: } - log.Log.Info("MQTTListenerHandleLiveSD: received request to livestream.") + log.Log.Info("HandleRequestSDStream: received request to livestream.") } else { - log.Log.Info("MQTTListenerHandleLiveSD: received request to livestream, but camera is not connected.") + log.Log.Info("HandleRequestSDStream: received request to livestream, but camera is not connected.") } - msg.Ack() - }) + } } -func MQTTListenerHandleLiveHDHandshake(mqttClient mqtt.Client, hubKey string, configuration *models.Configuration, communication *models.Communication) { - config := configuration.Config - topicRequestWebRtc := config.Key + "/register" - mqttClient.Subscribe(topicRequestWebRtc, 0, func(c mqtt.Client, msg mqtt.Message) { +func HandleRequestHDStream(mqttClient mqtt.Client, hubKey string, payload models.Payload, configuration *models.Configuration, communication *models.Communication) { + value := payload.Value + // Convert map[string]interface{} to RequestHDStreamPayload + jsonData, _ := json.Marshal(value) + var requestHDStreamPayload models.RequestHDStreamPayload + json.Unmarshal(jsonData, &requestHDStreamPayload) + + if requestHDStreamPayload.Timestamp != 0 { if communication.CameraConnected { - var sdp models.SDPPayload - json.Unmarshal(msg.Payload(), &sdp) + // Set the Hub key, so we can send back the answer. + requestHDStreamPayload.HubKey = hubKey select { - case communication.HandleLiveHDHandshake <- sdp: + case communication.HandleLiveHDHandshake <- requestHDStreamPayload: default: } - log.Log.Info("MQTTListenerHandleLiveHDHandshake: received request to setup webrtc.") + log.Log.Info("HandleRequestHDStream: received request to setup webrtc.") } else { - log.Log.Info("MQTTListenerHandleLiveHDHandshake: received request to setup webrtc, but camera is not connected.") + log.Log.Info("HandleRequestHDStream: received request to setup webrtc, but camera is not connected.") } - msg.Ack() - }) + } } -func MQTTListenerHandleLiveHDKeepalive(mqttClient mqtt.Client, hubKey string, configuration *models.Configuration, communication *models.Communication) { - config := configuration.Config - topicKeepAlive := fmt.Sprintf("kerberos/webrtc/keepalivehub/%s", config.Key) - mqttClient.Subscribe(topicKeepAlive, 0, func(c mqtt.Client, msg mqtt.Message) { - if communication.CameraConnected { - alive := string(msg.Payload()) - communication.HandleLiveHDKeepalive <- alive - log.Log.Info("MQTTListenerHandleLiveHDKeepalive: Received keepalive: " + alive) - } else { - log.Log.Info("MQTTListenerHandleLiveHDKeepalive: received keepalive, but camera is not connected.") - } - }) -} +func HandleReceiveHDCandidates(mqttClient mqtt.Client, hubKey string, payload models.Payload, configuration *models.Configuration, communication *models.Communication) { + value := payload.Value + // Convert map[string]interface{} to ReceiveHDCandidatesPayload + jsonData, _ := json.Marshal(value) + var receiveHDCandidatesPayload models.ReceiveHDCandidatesPayload + json.Unmarshal(jsonData, &receiveHDCandidatesPayload) -func MQTTListenerHandleLiveHDPeers(mqttClient mqtt.Client, hubKey string, configuration *models.Configuration, communication *models.Communication) { - config := configuration.Config - topicPeers := fmt.Sprintf("kerberos/webrtc/peers/%s", config.Key) - mqttClient.Subscribe(topicPeers, 0, func(c mqtt.Client, msg mqtt.Message) { + if receiveHDCandidatesPayload.Timestamp != 0 { if communication.CameraConnected { - peerCount := string(msg.Payload()) - communication.HandleLiveHDPeers <- peerCount - log.Log.Info("MQTTListenerHandleLiveHDPeers: Number of peers listening: " + peerCount) + channel := webrtc.CandidateArrays[receiveHDCandidatesPayload.SessionID] + log.Log.Info("HandleReceiveHDCandidates: " + receiveHDCandidatesPayload.Candidate) + channel <- receiveHDCandidatesPayload.Candidate } else { - log.Log.Info("MQTTListenerHandleLiveHDPeers: received peer count, but camera is not connected.") + log.Log.Info("HandleReceiveHDCandidates: received candidate, but camera is not connected.") } - }) + } } -func MQTTListenerHandleLiveHDCandidates(mqttClient mqtt.Client, hubKey string, configuration *models.Configuration, communication *models.Communication) { - config := configuration.Config - topicCandidates := "candidate/cloud" - mqttClient.Subscribe(topicCandidates, 0, func(c mqtt.Client, msg mqtt.Message) { - if communication.CameraConnected { - var candidate models.Candidate - json.Unmarshal(msg.Payload(), &candidate) - if candidate.CloudKey == config.Key { - key := candidate.CloudKey + "/" + candidate.Cuuid - candidatesExists := false - var channel chan string - for !candidatesExists { - webrtc.CandidatesMutex.Lock() - channel, candidatesExists = webrtc.CandidateArrays[key] - webrtc.CandidatesMutex.Unlock() - } - log.Log.Info("MQTTListenerHandleLiveHDCandidates: " + string(msg.Payload())) - channel <- string(msg.Payload()) - } - } else { - log.Log.Info("MQTTListenerHandleLiveHDCandidates: received candidate, but camera is not connected.") - } - }) -} +func HandleNavigatePTZ(mqttClient mqtt.Client, hubKey string, payload models.Payload, configuration *models.Configuration, communication *models.Communication) { + value := payload.Value + jsonData, _ := json.Marshal(value) + var navigatePTZPayload models.NavigatePTZPayload + json.Unmarshal(jsonData, &navigatePTZPayload) -func MQTTListenerHandleONVIF(mqttClient mqtt.Client, hubKey string, configuration *models.Configuration, communication *models.Communication) { - config := configuration.Config - topicOnvif := fmt.Sprintf("kerberos/onvif/%s", config.Key) - mqttClient.Subscribe(topicOnvif, 0, func(c mqtt.Client, msg mqtt.Message) { + if navigatePTZPayload.Timestamp != 0 { if communication.CameraConnected { + action := navigatePTZPayload.Action var onvifAction models.OnvifAction - json.Unmarshal(msg.Payload(), &onvifAction) + json.Unmarshal([]byte(action), &onvifAction) communication.HandleONVIF <- onvifAction - log.Log.Info("MQTTListenerHandleONVIF: Received an action - " + onvifAction.Action) + log.Log.Info("HandleNavigatePTZ: Received an action - " + onvifAction.Action) + } else { - log.Log.Info("MQTTListenerHandleONVIF: received action, but camera is not connected.") + log.Log.Info("HandleNavigatePTZ: received action, but camera is not connected.") } - }) + } +} + +func DisconnectMQTT(mqttClient mqtt.Client, config *models.Config) { + if mqttClient != nil { + // Cleanup all subscriptions + // New methods + mqttClient.Unsubscribe("kerberos/agent/" + PREV_HubKey) + mqttClient.Disconnect(1000) + mqttClient = nil + log.Log.Info("DisconnectMQTT: MQTT client disconnected.") + } } diff --git a/machinery/src/webrtc/main.go b/machinery/src/webrtc/main.go index 56b8a175..eef62c27 100644 --- a/machinery/src/webrtc/main.go +++ b/machinery/src/webrtc/main.go @@ -87,19 +87,22 @@ func (w WebRTC) CreateOffer(sd []byte) pionWebRTC.SessionDescription { return offer } -func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, handshake models.SDPPayload, candidates chan string) { +func InitializeWebRTCConnection(configuration *models.Configuration, communication *models.Communication, mqttClient mqtt.Client, videoTrack *pionWebRTC.TrackLocalStaticSample, audioTrack *pionWebRTC.TrackLocalStaticSample, handshake models.RequestHDStreamPayload, candidates chan string) { config := configuration.Config - deviceKey := config.Key stunServers := []string{config.STUNURI} turnServers := []string{config.TURNURI} turnServersUsername := config.TURNUsername turnServersCredential := config.TURNPassword + // Set variables + hubKey := handshake.HubKey + sessionDescription := handshake.SessionDescription + // Create WebRTC object w := CreateWebRTC(deviceKey, stunServers, turnServers, turnServersUsername, turnServersCredential) - sd, err := w.DecodeSessionDescription(handshake.Sdp) + sd, err := w.DecodeSessionDescription(sessionDescription) if err == nil { @@ -122,7 +125,6 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati Credential: w.TurnServersCredential, }, }, - //ICETransportPolicy: pionWebRTC.ICETransportPolicyRelay, }, ) @@ -143,7 +145,7 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati peerConnection.OnICEConnectionStateChange(func(connectionState pionWebRTC.ICEConnectionState) { if connectionState == pionWebRTC.ICEConnectionStateDisconnected { atomic.AddInt64(&peerConnectionCount, -1) - peerConnections[handshake.Cuuid] = nil + peerConnections[handshake.SessionID] = nil close(candidates) close(w.PacketsCount) if err := peerConnection.Close(); err != nil { @@ -152,9 +154,12 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati } else if connectionState == pionWebRTC.ICEConnectionStateConnected { atomic.AddInt64(&peerConnectionCount, 1) } else if connectionState == pionWebRTC.ICEConnectionStateChecking { + // Iterate over the candidates and send them to the remote client + // Non blocking channel for candidate := range candidates { log.Log.Info("InitializeWebRTCConnection: Received candidate.") if candidateErr := peerConnection.AddICECandidate(pionWebRTC.ICECandidateInit{Candidate: string(candidate)}); candidateErr != nil { + log.Log.Error("InitializeWebRTCConnection: something went wrong while adding candidate: " + candidateErr.Error()) } } } @@ -167,7 +172,6 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati panic(err) } - //gatherCompletePromise := pionWebRTC.GatheringCompletePromise(peerConnection) answer, err := peerConnection.CreateAnswer(nil) if err != nil { panic(err) @@ -175,37 +179,64 @@ func InitializeWebRTCConnection(configuration *models.Configuration, communicati panic(err) } - // When an ICE candidate is available send to the other Pion instance - // the other Pion instance will add this candidate by calling AddICECandidate - var candidatesMux sync.Mutex + // When an ICE candidate is available send to the other peer using the signaling server (MQTT). + // The other peer will add this candidate by calling AddICECandidate peerConnection.OnICECandidate(func(candidate *pionWebRTC.ICECandidate) { - if candidate == nil { return } - candidatesMux.Lock() - defer candidatesMux.Unlock() - - topic := fmt.Sprintf("%s/%s/candidate/edge", deviceKey, handshake.Cuuid) - log.Log.Info("InitializeWebRTCConnection: Send candidate to " + topic) - candiInit := candidate.ToJSON() + // Create a config map + valueMap := make(map[string]interface{}) + candateJSON := candidate.ToJSON() sdpmid := "0" - candiInit.SDPMid = &sdpmid - candi, err := json.Marshal(candiInit) + candateJSON.SDPMid = &sdpmid + candateBinary, err := json.Marshal(candateJSON) + if err == nil { + valueMap["candidate"] = string(candateBinary) + } else { + log.Log.Info("HandleRequestConfig: something went wrong while marshalling candidate: " + err.Error()) + } + + // We'll send the candidate to the hub + message := models.Message{ + Payload: models.Payload{ + Action: "receive-hd-candidates", + DeviceId: configuration.Config.Key, + Value: valueMap, + }, + } + payload, err := models.PackageMQTTMessage(configuration, message) if err == nil { - log.Log.Info("InitializeWebRTCConnection:" + string(candi)) - token := mqttClient.Publish(topic, 2, false, candi) - token.Wait() + mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload) + } else { + log.Log.Info("HandleRequestConfig: something went wrong while sending acknowledge config to hub: " + string(payload)) } }) - peerConnections[handshake.Cuuid] = peerConnection + // Create a channel which will be used to send candidates to the other peer + peerConnections[handshake.SessionID] = peerConnection if err == nil { - topic := fmt.Sprintf("%s/%s/answer", deviceKey, handshake.Cuuid) - log.Log.Info("InitializeWebRTCConnection: Send SDP answer to " + topic) - mqttClient.Publish(topic, 2, false, []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP)))) + // Create a config map + valueMap := make(map[string]interface{}) + valueMap["sdp"] = []byte(base64.StdEncoding.EncodeToString([]byte(answer.SDP))) + log.Log.Info("InitializeWebRTCConnection: Send SDP answer") + + // We'll send the candidate to the hub + message := models.Message{ + Payload: models.Payload{ + Action: "receive-hd-answer", + DeviceId: configuration.Config.Key, + Value: valueMap, + }, + } + payload, err := models.PackageMQTTMessage(configuration, message) + if err == nil { + mqttClient.Publish("kerberos/hub/"+hubKey, 0, false, payload) + } else { + log.Log.Info("HandleRequestConfig: something went wrong while sending acknowledge config to hub: " + string(payload)) + } } } } else { @@ -358,16 +389,9 @@ func WriteToTrack(livestreamCursor *pubsub.QueueCursor, configuration *models.Co pkt.Data = append(codecData.(h264parser.CodecData).SPS(), pkt.Data...) pkt.Data = append(annexbNALUStartCode(), pkt.Data...) log.Log.Info("WriteToTrack: Sending keyframe") - - if config.Capture.ForwardWebRTC == "true" { - log.Log.Info("WriteToTrack: Sending keep a live to remote broker.") - topic := fmt.Sprintf("kerberos/webrtc/keepalive/%s", config.Key) - mqttClient.Publish(topic, 2, false, "1") - } } if start { - sample := pionMedia.Sample{Data: pkt.Data, Duration: bufferDuration} if config.Capture.ForwardWebRTC == "true" { samplePacket, err := json.Marshal(sample)