From e0c6375261f5a945db6f8fa04b9d789b986ef66c Mon Sep 17 00:00:00 2001 From: Cedric Verstraeten Date: Sun, 25 Aug 2024 20:27:46 +0200 Subject: [PATCH] IO fix: workaround for ONVIF event system --- machinery/docs/docs.go | 40 ++++++++++--- machinery/docs/swagger.json | 35 ++++++++--- machinery/docs/swagger.yaml | 28 ++++++--- machinery/main.go | 2 +- machinery/src/cloud/Cloud.go | 77 ++++++++++++++++++------- machinery/src/components/backchannel.go | 1 - machinery/src/onvif/main.go | 65 +++++++++++---------- machinery/src/utils/main.go | 2 + 8 files changed, 172 insertions(+), 78 deletions(-) diff --git a/machinery/docs/docs.go b/machinery/docs/docs.go index 422b332b..9487352f 100644 --- a/machinery/docs/docs.go +++ b/machinery/docs/docs.go @@ -1,5 +1,4 @@ -// Package docs GENERATED BY SWAG; DO NOT EDIT -// This file was generated by swaggo/swag +// Package docs Code generated by swaggo/swag. DO NOT EDIT package docs import "github.com/swaggo/swag" @@ -388,7 +387,7 @@ const docTemplate = `{ "operationId": "snapshot-base64", "responses": { "200": { - "description": "" + "description": "OK" } } } @@ -403,7 +402,7 @@ const docTemplate = `{ "operationId": "snapshot-jpeg", "responses": { "200": { - "description": "" + "description": "OK" } } } @@ -476,7 +475,7 @@ const docTemplate = `{ "operationId": "config", "responses": { "200": { - "description": "" + "description": "OK" } } }, @@ -500,7 +499,7 @@ const docTemplate = `{ ], "responses": { "200": { - "description": "" + "description": "OK" } } } @@ -515,7 +514,7 @@ const docTemplate = `{ "operationId": "dashboard", "responses": { "200": { - "description": "" + "description": "OK" } } } @@ -530,7 +529,7 @@ const docTemplate = `{ "operationId": "days", "responses": { "200": { - "description": "" + "description": "OK" } } } @@ -590,7 +589,7 @@ const docTemplate = `{ ], "responses": { "200": { - "description": "" + "description": "OK" } } } @@ -803,6 +802,9 @@ const docTemplate = `{ "description": "obsolete", "type": "string" }, + "hub_encryption": { + "type": "string" + }, "hub_key": { "type": "string" }, @@ -839,6 +841,12 @@ const docTemplate = `{ "offline": { "type": "string" }, + "realtimeprocessing": { + "type": "string" + }, + "realtimeprocessing_topic": { + "type": "string" + }, "region": { "$ref": "#/definitions/models.Region" }, @@ -863,6 +871,9 @@ const docTemplate = `{ "timezone": { "type": "string" }, + "turn_force": { + "type": "string" + }, "turn_password": { "type": "string" }, @@ -957,9 +968,18 @@ const docTemplate = `{ "rtsp": { "type": "string" }, + "sub_fps": { + "type": "string" + }, + "sub_height": { + "type": "integer" + }, "sub_rtsp": { "type": "string" }, + "sub_width": { + "type": "integer" + }, "width": { "type": "integer" } @@ -1166,6 +1186,8 @@ var SwaggerInfo = &swag.Spec{ Description: "This is the API for using and configure Kerberos Agent.", InfoInstanceName: "swagger", SwaggerTemplate: docTemplate, + LeftDelim: "{{", + RightDelim: "}}", } func init() { diff --git a/machinery/docs/swagger.json b/machinery/docs/swagger.json index deec6111..fa809e95 100644 --- a/machinery/docs/swagger.json +++ b/machinery/docs/swagger.json @@ -380,7 +380,7 @@ "operationId": "snapshot-base64", "responses": { "200": { - "description": "" + "description": "OK" } } } @@ -395,7 +395,7 @@ "operationId": "snapshot-jpeg", "responses": { "200": { - "description": "" + "description": "OK" } } } @@ -468,7 +468,7 @@ "operationId": "config", "responses": { "200": { - "description": "" + "description": "OK" } } }, @@ -492,7 +492,7 @@ ], "responses": { "200": { - "description": "" + "description": "OK" } } } @@ -507,7 +507,7 @@ "operationId": "dashboard", "responses": { "200": { - "description": "" + "description": "OK" } } } @@ -522,7 +522,7 @@ "operationId": "days", "responses": { "200": { - "description": "" + "description": "OK" } } } @@ -582,7 +582,7 @@ ], "responses": { "200": { - "description": "" + "description": "OK" } } } @@ -795,6 +795,9 @@ "description": "obsolete", "type": "string" }, + "hub_encryption": { + "type": "string" + }, "hub_key": { "type": "string" }, @@ -831,6 +834,12 @@ "offline": { "type": "string" }, + "realtimeprocessing": { + "type": "string" + }, + "realtimeprocessing_topic": { + "type": "string" + }, "region": { "$ref": "#/definitions/models.Region" }, @@ -855,6 +864,9 @@ "timezone": { "type": "string" }, + "turn_force": { + "type": "string" + }, "turn_password": { "type": "string" }, @@ -949,9 +961,18 @@ "rtsp": { "type": "string" }, + "sub_fps": { + "type": "string" + }, + "sub_height": { + "type": "integer" + }, "sub_rtsp": { "type": "string" }, + "sub_width": { + "type": "integer" + }, "width": { "type": "integer" } diff --git a/machinery/docs/swagger.yaml b/machinery/docs/swagger.yaml index 11f1c437..c31e7959 100644 --- a/machinery/docs/swagger.yaml +++ b/machinery/docs/swagger.yaml @@ -95,6 +95,8 @@ definitions: heartbeaturi: description: obsolete type: string + hub_encryption: + type: string hub_key: type: string hub_private_key: @@ -119,6 +121,10 @@ definitions: type: string offline: type: string + realtimeprocessing: + type: string + realtimeprocessing_topic: + type: string region: $ref: '#/definitions/models.Region' remove_after_upload: @@ -135,6 +141,8 @@ definitions: type: array timezone: type: string + turn_force: + type: string turn_password: type: string turn_username: @@ -196,8 +204,14 @@ definitions: type: string rtsp: type: string + sub_fps: + type: string + sub_height: + type: integer sub_rtsp: type: string + sub_width: + type: integer width: type: integer type: object @@ -564,7 +578,7 @@ paths: operationId: snapshot-base64 responses: "200": - description: "" + description: OK summary: Get a snapshot from the camera in base64. tags: - camera @@ -574,7 +588,7 @@ paths: operationId: snapshot-jpeg responses: "200": - description: "" + description: OK summary: Get a snapshot from the camera in jpeg format. tags: - camera @@ -624,7 +638,7 @@ paths: operationId: config responses: "200": - description: "" + description: OK summary: Get the current configuration. tags: - config @@ -640,7 +654,7 @@ paths: $ref: '#/definitions/models.Config' responses: "200": - description: "" + description: OK summary: Update the current configuration. tags: - config @@ -650,7 +664,7 @@ paths: operationId: dashboard responses: "200": - description: "" + description: OK summary: Get all information showed on the dashboard. tags: - general @@ -660,7 +674,7 @@ paths: operationId: days responses: "200": - description: "" + description: OK summary: Get all days stored in the recordings directory. tags: - general @@ -698,7 +712,7 @@ paths: $ref: '#/definitions/models.EventFilter' responses: "200": - description: "" + description: OK summary: Get the latest recordings (events) from the recordings directory. tags: - general diff --git a/machinery/main.go b/machinery/main.go index e4335961..a356669b 100644 --- a/machinery/main.go +++ b/machinery/main.go @@ -19,7 +19,7 @@ import ( "gopkg.in/DataDog/dd-trace-go.v1/profiler" ) -var VERSION = "3.2.0" +var VERSION = utils.VERSION func main() { // You might be interested in debugging the agent. diff --git a/machinery/src/cloud/Cloud.go b/machinery/src/cloud/Cloud.go index 02efd3d6..4f86cd25 100644 --- a/machinery/src/cloud/Cloud.go +++ b/machinery/src/cloud/Cloud.go @@ -229,17 +229,17 @@ func HandleHeartBeat(configuration *models.Configuration, communication *models. } else { client = &http.Client{} } - config := configuration.Config - kerberosAgentVersion := "3.1.8" + kerberosAgentVersion := utils.VERSION - // Get a pull point address - var pullPointAddress string - if config.Capture.IPCamera.ONVIFXAddr != "" { + // Create a loop pull point address, which we will use to retrieve async events + // As you'll read below camera manufactures are having different implementations of events. + var pullPointAddressLoopState string + if configuration.Config.Capture.IPCamera.ONVIFXAddr != "" { cameraConfiguration := configuration.Config.Capture.IPCamera device, _, err := onvif.ConnectToOnvifDevice(&cameraConfiguration) - if err == nil { - pullPointAddress, err = onvif.CreatePullPointSubscription(device) + if err != nil { + pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device) if err != nil { log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error()) } @@ -263,6 +263,7 @@ loop: cameraConfiguration := configuration.Config.Capture.IPCamera device, _, err := onvif.ConnectToOnvifDevice(&cameraConfiguration) if err == nil { + // We will try to retrieve the PTZ configurations from the device. onvifEnabled = "true" configurations, err := onvif.GetPTZConfigurationsFromDevice(device) if err == nil { @@ -297,9 +298,43 @@ loop: // We will also fetch some events, to know the status of the inputs and outputs. // More event types might be added. - if pullPointAddress != "" { + // -- We have two differen pull point subscriptions, one for the initials events and one for the loop. + // -- Some cameras do send recurrent events, others don't. + // a. For some older Hikvision models, events are send repeatedly (if input is high) with the strong state (set to false). + // - In this scenarion we are using a polling mechanism and set a timestamp to understand if the input is still active. + // b. For some newer Hikvision models, Avigilon, events are send only once (if state is set active). + // - In this scenario we are creating a new subscription to retrieve the initial (current) state of the inputs and outputs. + + // Get a new pull point address, to get the initiatal state of the inputs and outputs. + pullPointAddressInitialState, err := onvif.CreatePullPointSubscription(device) + if err != nil { + log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error()) + } + if pullPointAddressInitialState != "" { + log.Log.Debug("cloud.HandleHeartBeat(): Fetching events from pullPointAddressInitialState") + events, err := onvif.GetEventMessages(device, pullPointAddressInitialState) + log.Log.Debug("cloud.HandleHeartBeat(): Completed fetching events from pullPointAddressInitialState") + if err == nil && len(events) > 0 { + onvifEventsList, err = json.Marshal(events) + if err != nil { + log.Log.Error("cloud.HandleHeartBeat(): error while marshalling events: " + err.Error()) + onvifEventsList = []byte("[]") + } + } else if err != nil { + log.Log.Error("cloud.HandleHeartBeat(): error while getting events: " + err.Error()) + onvifEventsList = []byte("[]") + } else if len(events) == 0 { + log.Log.Debug("cloud.HandleHeartBeat(): no events found.") + onvifEventsList = []byte("[]") + } + onvif.UnsubscribePullPoint(device, pullPointAddressInitialState) + } - events, err := onvif.GetEventMessages(device, pullPointAddress) + // We do a second run an a long-living subscription to get the events asynchronously. + if pullPointAddressLoopState != "" { + log.Log.Debug("cloud.HandleHeartBeat(): Fetching events from pullPointAddressLoopState") + events, err := onvif.GetEventMessages(device, pullPointAddressLoopState) + log.Log.Debug("cloud.HandleHeartBeat(): Completed fetching events from pullPointAddressLoopState") if err == nil && len(events) > 0 { onvifEventsList, err = json.Marshal(events) if err != nil { @@ -309,9 +344,7 @@ loop: } else if err != nil { log.Log.Error("cloud.HandleHeartBeat(): error while getting events: " + err.Error()) onvifEventsList = []byte("[]") - // Try to unsubscribe and subscribe again. - onvif.UnsubscribePullPoint(device, pullPointAddress) - pullPointAddress, err = onvif.CreatePullPointSubscription(device) + pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device) if err != nil { log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error()) } @@ -321,14 +354,17 @@ loop: } } else { log.Log.Debug("cloud.HandleHeartBeat(): no pull point address found.") - onvifEventsList = []byte("[]") - - // Try again - pullPointAddress, err = onvif.CreatePullPointSubscription(device) + pullPointAddressLoopState, err = onvif.CreatePullPointSubscription(device) if err != nil { - log.Log.Debug("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error()) + log.Log.Error("cloud.HandleHeartBeat(): error while creating pull point subscription: " + err.Error()) } + } + + // It also might be that events are not supported by the camera, in that case we will try to get the digital inputs and outputs. + // Through the `device` API, the `GetDigitalInputs` and `GetDigitalOutputs` functions are called. + // The disadvantage of this approach is that we don't have the state of the inputs and outputs (which is crazy..) + if pullPointAddressInitialState == "" && pullPointAddressLoopState == "" { var events []onvif.ONVIFEvents outputs, err := onvif.GetRelayOutputs(device) if err != nil { @@ -367,7 +403,6 @@ loop: onvifEventsList = []byte("[]") } } - } else { log.Log.Error("cloud.HandleHeartBeat(): error while connecting to ONVIF device: " + err.Error()) onvifPresetsList = []byte("[]") @@ -612,11 +647,11 @@ loop: } } - if pullPointAddress != "" { + if pullPointAddressLoopState != "" { cameraConfiguration := configuration.Config.Capture.IPCamera device, _, err := onvif.ConnectToOnvifDevice(&cameraConfiguration) - if err == nil { - onvif.UnsubscribePullPoint(device, pullPointAddress) + if err != nil { + onvif.UnsubscribePullPoint(device, pullPointAddressLoopState) } } diff --git a/machinery/src/components/backchannel.go b/machinery/src/components/backchannel.go index 5d693a96..07698545 100644 --- a/machinery/src/components/backchannel.go +++ b/machinery/src/components/backchannel.go @@ -87,7 +87,6 @@ func WriteFileToBackChannel(infile av.DemuxCloser) { break } // Send to backchannel - fmt.Println(buffer) infile.Write(buffer, 2, uint32(count)) count = count + 1024 diff --git a/machinery/src/onvif/main.go b/machinery/src/onvif/main.go index fedab64a..c4e19369 100644 --- a/machinery/src/onvif/main.go +++ b/machinery/src/onvif/main.go @@ -5,7 +5,6 @@ import ( "encoding/json" "encoding/xml" "errors" - "fmt" "io" "strconv" "strings" @@ -809,14 +808,16 @@ func GetPresetsFromDevice(device *onvif.Device) ([]models.OnvifActionPreset, err return presets, err } + presetsList := "" for _, preset := range presetsResponse.Preset { - log.Log.Debug("onvif.main.GetPresetsFromDevice(): " + string(preset.Name) + " (" + string(preset.Token) + ")") p := models.OnvifActionPreset{ Name: string(preset.Name), Token: string(preset.Token), } + presetsList += string(preset.Name) + " (" + string(preset.Token) + "), " presets = append(presets, p) } + log.Log.Debug("onvif.main.GetPresetsFromDevice(): " + presetsList) return presets, err } @@ -991,7 +992,8 @@ func CreatePullPointSubscription(dev *onvif.Device) (string, error) { Filter: &event.FilterType{ TopicExpression: &event.TopicExpressionType{ Dialect: xsd.String("http://www.onvif.org/ver10/tev/topicExpression/ConcreteSet"), - TopicKinds: "tns1:Device/Trigger//.", + TopicKinds: "tns1:Device/Trigger//.", // -> This works for Avigilon, Hanwa, Hikvision + // TopicKinds: "//.", -> This works for Axis, but throws other errors. }, }, }) @@ -1048,14 +1050,10 @@ func GetInputOutputs() ([]ONVIFEvents, error) { // We have some odd behaviour for inputs: the logical state is set to false even if circuit is closed. However we do see repeated events (looks like heartbeats). // We are assuming that if we do not receive an event for 15 seconds the input is inactive, otherwise we set to active. for key, value := range inputOutputDeviceMap { - if value.Type == "input" { - if time.Now().Unix()-value.Timestamp > 15 { - value.Value = "false" - } else { - value.Value = "true" - } - inputOutputDeviceMap[key] = value + if time.Now().Unix()-value.Timestamp < 15 && value.Value == "false" { + value.Value = "true" } + inputOutputDeviceMap[key] = value eventsArray = append(eventsArray, *value) } for _, value := range eventsArray { @@ -1080,7 +1078,7 @@ func GetEventMessages(dev *onvif.Device, pullPointAddress string) ([]ONVIFEvents } else { // Pull message pullMessage := event.PullMessages{ - Timeout: xsd.Duration("PT30S"), + Timeout: xsd.Duration("PT5S"), MessageLimit: 10, } requestBody, err := xml.Marshal(pullMessage) @@ -1100,7 +1098,6 @@ func GetEventMessages(dev *onvif.Device, pullPointAddress string) ([]ONVIFEvents res.Body.Close() if err == nil { stringBody := string(bs) - fmt.Println(stringBody) decodedXML, et, err := getXMLNode(stringBody, "PullMessagesResponse") if err != nil { log.Log.Error("onvif.main.GetEventMessages(pullMessages): " + err.Error()) @@ -1116,9 +1113,9 @@ func GetEventMessages(dev *onvif.Device, pullPointAddress string) ([]ONVIFEvents for _, message := range pullMessagesResponse.NotificationMessage { log.Log.Debug("onvif.main.GetEventMessages(pullMessages): " + string(message.Topic.TopicKinds)) - if len(message.Message.Message.Data.SimpleItem) > 0 { - log.Log.Debug("onvif.main.GetEventMessages(pullMessages): " + string(message.Message.Message.Data.SimpleItem[0].Name) + " " + string(message.Message.Message.Data.SimpleItem[0].Value)) - } + //if len(message.Message.Message.Data.SimpleItem) > 0 { + // log.Log.Debug("onvif.main.GetEventMessages(pullMessages): " + string(message.Message.Message.Data.SimpleItem[0].Name) + " " + string(message.Message.Message.Data.SimpleItem[0].Value)) + //} if message.Topic.TopicKinds == "tns1:Device/Trigger/Relay" || message.Topic.TopicKinds == "tns1:Device/tns1:Trigger/tns1:Relay" { // This is for avigilon cameras if len(message.Message.Message.Data.SimpleItem) > 0 { @@ -1126,7 +1123,8 @@ func GetEventMessages(dev *onvif.Device, pullPointAddress string) ([]ONVIFEvents message.Message.Message.Data.SimpleItem[0].Name == "RelayLogicalState" { // On avigilon it's called RelayLogicalState key := string(message.Message.Message.Source.SimpleItem[0].Value) value := string(message.Message.Message.Data.SimpleItem[0].Value) - log.Log.Debug("onvif.main.GetEventMessages(pullMessages) output: " + key + " " + value) + propertyOperation := string(message.Message.Message.PropertyOperation) + log.Log.Debug("onvif.main.GetEventMessages(pullMessages) output: " + key + " " + value + " (" + propertyOperation + ")") // Depending on the onvif library they might use different values for active and inactive. if value == "active" || value == "1" { @@ -1137,17 +1135,18 @@ func GetEventMessages(dev *onvif.Device, pullPointAddress string) ([]ONVIFEvents // Check if key exists in map // If it does not exist we'll add it to the map otherwise we'll update the value. - if _, ok := inputOutputDeviceMap[key]; !ok { - inputOutputDeviceMap[key] = &ONVIFEvents{ - Key: key, + if _, ok := inputOutputDeviceMap[key+"-output"]; !ok { + inputOutputDeviceMap[key+"-output"] = &ONVIFEvents{ + Key: key + "-output", Type: "output", Value: value, Timestamp: 0, } - } else { - log.Log.Debug("onvif.main.GetEventMessages(pullMessages) output: " + key + " " + value) - inputOutputDeviceMap[key].Value = value - inputOutputDeviceMap[key].Timestamp = time.Now().Unix() + } else if propertyOperation == "Changed" { + inputOutputDeviceMap[key+"-output"].Value = value + inputOutputDeviceMap[key+"-output"].Timestamp = time.Now().Unix() + } else if propertyOperation == "Initialized" { + inputOutputDeviceMap[key+"-output"].Value = value } } } @@ -1158,7 +1157,8 @@ func GetEventMessages(dev *onvif.Device, pullPointAddress string) ([]ONVIFEvents message.Message.Message.Data.SimpleItem[0].Name == "Level" { // On avigilon it's called level key := string(message.Message.Message.Source.SimpleItem[0].Value) value := string(message.Message.Message.Data.SimpleItem[0].Value) - log.Log.Debug("onvif.main.GetEventMessages(pullMessages) input: " + key + " " + value) + propertyOperation := string(message.Message.Message.PropertyOperation) + log.Log.Debug("onvif.main.GetEventMessages(pullMessages) input: " + key + " " + value + " (" + propertyOperation + ")") // Depending on the onvif library they might use different values for active and inactive. if value == "active" || value == "1" { @@ -1169,17 +1169,18 @@ func GetEventMessages(dev *onvif.Device, pullPointAddress string) ([]ONVIFEvents // Check if key exists in map // If it does not exist we'll add it to the map otherwise we'll update the value. - if _, ok := inputOutputDeviceMap[key]; !ok { - inputOutputDeviceMap[key] = &ONVIFEvents{ - Key: key, + if _, ok := inputOutputDeviceMap[key+"-input"]; !ok { + inputOutputDeviceMap[key+"-input"] = &ONVIFEvents{ + Key: key + "-input", Type: "input", Value: value, Timestamp: 0, } - } else { - log.Log.Debug("onvif.main.GetEventMessages(pullMessages) input: " + key + " " + value) - inputOutputDeviceMap[key].Value = value - inputOutputDeviceMap[key].Timestamp = time.Now().Unix() + } else if propertyOperation == "Changed" { + inputOutputDeviceMap[key+"-input"].Value = value + inputOutputDeviceMap[key+"-input"].Timestamp = time.Now().Unix() + } else if propertyOperation == "Initialized" { + inputOutputDeviceMap[key+"-input"].Value = value } } } @@ -1268,7 +1269,7 @@ func TriggerRelayOutput(dev *onvif.Device, output string) (err error) { // this in the future "kerberos-io/onvif" library. if err == nil { token := relayoutputs.RelayOutputs[0].Token - if output == string(token) { + if output == string(token+"-output") { outputState := device.SetRelayOutputState{ RelayOutputToken: token, LogicalState: "active", diff --git a/machinery/src/utils/main.go b/machinery/src/utils/main.go index 2708a42b..22130fd1 100644 --- a/machinery/src/utils/main.go +++ b/machinery/src/utils/main.go @@ -23,6 +23,8 @@ import ( "github.com/kerberos-io/agent/machinery/src/models" ) +const VERSION = "3.2.1" + const letterBytes = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ" // MaxUint8 - maximum value which can be held in an uint8