diff --git a/protocol/messenger.go b/protocol/messenger.go index 77f5f2c025..f1431e9d0a 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -705,6 +705,10 @@ func (m *Messenger) processSentMessage(id string) error { return err } + if m.telemetryClient != nil { + m.telemetryClient.PushRawMessageByType(context.Background(), rawMessage.MessageType.String(), uint32(len(rawMessage.Payload))) + } + return nil } @@ -2102,6 +2106,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe if m.dispatchMessageTestCallback != nil { m.dispatchMessageTestCallback(rawMessage) } + return rawMessage, nil } diff --git a/telemetry/client.go b/telemetry/client.go index 5ded3e7b74..25400b136f 100644 --- a/telemetry/client.go +++ b/telemetry/client.go @@ -60,6 +60,8 @@ const ( MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed" // Total number and size of Waku messages sent by this node SentMessageTotalMetric TelemetryType = "SentMessageTotal" + // Size and type of raw message successfully returned by dispatchMessage + RawMessageByTypeMetric TelemetryType = "RawMessageByType" ) const MaxRetryCache = 5000 @@ -151,6 +153,10 @@ func (c *Client) PushSentMessageTotal(ctx context.Context, messageSize uint32) { c.processAndPushTelemetry(ctx, SentMessageTotal{Size: messageSize}) } +func (c *Client) PushRawMessageByType(ctx context.Context, messageType string, messageSize uint32) { + c.processAndPushTelemetry(ctx, RawMessageByType{MessageType: messageType, Size: messageSize}) +} + type ReceivedMessages struct { Filter transport.Filter SSHMessage *types.Message @@ -206,6 +212,11 @@ type SentMessageTotal struct { Size uint32 } +type RawMessageByType struct { + MessageType string + Size uint32 +} + type Client struct { serverURL string httpClient *http.Client @@ -408,6 +419,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{}) TelemetryType: SentMessageTotalMetric, TelemetryData: c.ProcessSentMessageTotal(v), } + case RawMessageByType: + telemetryRequest = TelemetryRequest{ + Id: c.nextId, + TelemetryType: RawMessageByTypeMetric, + TelemetryData: c.ProcessRawMessageByType(v), + } default: c.logger.Error("Unknown telemetry data type") return @@ -589,6 +606,13 @@ func (c *Client) ProcessSentMessageTotal(sentMessageTotal SentMessageTotal) *jso return c.marshalPostBody(postBody) } +func (c *Client) ProcessRawMessageByType(rawMessageByType RawMessageByType) *json.RawMessage { + postBody := c.commonPostBody() + postBody["messageType"] = rawMessageByType.MessageType + postBody["size"] = rawMessageByType.Size + return c.marshalPostBody(postBody) +} + // Helper function to marshal post body and handle errors func (c *Client) marshalPostBody(postBody map[string]interface{}) *json.RawMessage { body, err := json.Marshal(postBody) diff --git a/telemetry/client_test.go b/telemetry/client_test.go index 7c1e6e7848..d6b3141d49 100644 --- a/telemetry/client_test.go +++ b/telemetry/client_test.go @@ -611,3 +611,19 @@ func TestProcessSentMessageTotal(t *testing.T) { } runTestCase(t, tc) } + +func TestProcessRawMessageByType(t *testing.T) { + tc := testCase{ + name: "RawMessageByType", + input: RawMessageByType{ + MessageType: "test-message-type", + Size: 1234, + }, + expectedType: RawMessageByTypeMetric, + expectedFields: map[string]interface{}{ + "messageType": "test-message-type", + "size": float64(1234), + }, + } + runTestCase(t, tc) +}