Skip to content

Commit

Permalink
feat(telem)_: track raw message by type on dispatch
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Dec 8, 2024
1 parent e7cc535 commit 055bb1d
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 0 deletions.
5 changes: 5 additions & 0 deletions protocol/messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -705,6 +705,10 @@ func (m *Messenger) processSentMessage(id string) error {
return err
}

if m.telemetryClient != nil {
m.telemetryClient.PushRawMessageByType(context.Background(), rawMessage.MessageType.String(), uint32(len(rawMessage.Payload)))
}

return nil
}

Expand Down Expand Up @@ -2102,6 +2106,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, rawMessage common.RawMe
if m.dispatchMessageTestCallback != nil {
m.dispatchMessageTestCallback(rawMessage)
}

return rawMessage, nil
}

Expand Down
24 changes: 24 additions & 0 deletions telemetry/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ const (
MessageDeliveryConfirmedMetric TelemetryType = "MessageDeliveryConfirmed"
// Total number and size of Waku messages sent by this node
SentMessageTotalMetric TelemetryType = "SentMessageTotal"
// Size and type of raw message successfully returned by dispatchMessage
RawMessageByTypeMetric TelemetryType = "RawMessageByType"
)

const MaxRetryCache = 5000
Expand Down Expand Up @@ -151,6 +153,10 @@ func (c *Client) PushSentMessageTotal(ctx context.Context, messageSize uint32) {
c.processAndPushTelemetry(ctx, SentMessageTotal{Size: messageSize})
}

func (c *Client) PushRawMessageByType(ctx context.Context, messageType string, messageSize uint32) {
c.processAndPushTelemetry(ctx, RawMessageByType{MessageType: messageType, Size: messageSize})
}

type ReceivedMessages struct {
Filter transport.Filter
SSHMessage *types.Message
Expand Down Expand Up @@ -206,6 +212,11 @@ type SentMessageTotal struct {
Size uint32
}

type RawMessageByType struct {
MessageType string
Size uint32
}

type Client struct {
serverURL string
httpClient *http.Client
Expand Down Expand Up @@ -408,6 +419,12 @@ func (c *Client) processAndPushTelemetry(ctx context.Context, data interface{})
TelemetryType: SentMessageTotalMetric,
TelemetryData: c.ProcessSentMessageTotal(v),
}
case RawMessageByType:
telemetryRequest = TelemetryRequest{
Id: c.nextId,
TelemetryType: RawMessageByTypeMetric,
TelemetryData: c.ProcessRawMessageByType(v),
}
default:
c.logger.Error("Unknown telemetry data type")
return
Expand Down Expand Up @@ -589,6 +606,13 @@ func (c *Client) ProcessSentMessageTotal(sentMessageTotal SentMessageTotal) *jso
return c.marshalPostBody(postBody)
}

func (c *Client) ProcessRawMessageByType(rawMessageByType RawMessageByType) *json.RawMessage {
postBody := c.commonPostBody()
postBody["messageType"] = rawMessageByType.MessageType
postBody["size"] = rawMessageByType.Size
return c.marshalPostBody(postBody)
}

// Helper function to marshal post body and handle errors
func (c *Client) marshalPostBody(postBody map[string]interface{}) *json.RawMessage {
body, err := json.Marshal(postBody)
Expand Down
16 changes: 16 additions & 0 deletions telemetry/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -611,3 +611,19 @@ func TestProcessSentMessageTotal(t *testing.T) {
}
runTestCase(t, tc)
}

func TestProcessRawMessageByType(t *testing.T) {
tc := testCase{
name: "RawMessageByType",
input: RawMessageByType{
MessageType: "test-message-type",
Size: 1234,
},
expectedType: RawMessageByTypeMetric,
expectedFields: map[string]interface{}{
"messageType": "test-message-type",
"size": float64(1234),
},
}
runTestCase(t, tc)
}

0 comments on commit 055bb1d

Please sign in to comment.