From ea59fdcdcf6fa347a679a852a2d8860c1439bd0b Mon Sep 17 00:00:00 2001 From: Kian Parvin <46668016+kian99@users.noreply.github.com> Date: Mon, 22 Jul 2024 08:40:23 +0200 Subject: [PATCH] fix how monitoring proxy messages is handled (#1276) --- internal/rpc/proxy.go | 34 +++++++++++++++++++++------------- 1 file changed, 21 insertions(+), 13 deletions(-) diff --git a/internal/rpc/proxy.go b/internal/rpc/proxy.go index 5fde08f40..3cc01f9c3 100644 --- a/internal/rpc/proxy.go +++ b/internal/rpc/proxy.go @@ -168,6 +168,8 @@ func (c *writeLockConn) sendMessage(responseObject any, request *message) { c.writeJson(msg) } +// inflightMsgs holds only request messages that are +// still pending a response from a Juju controller. type inflightMsgs struct { controllerUUID string @@ -198,17 +200,23 @@ func (msgs *inflightMsgs) addMessage(msg *message) { msgs.messages[msg.RequestID] = msg } -func (msgs *inflightMsgs) removeMessage(msg *message) { - // monitor how long it took to handle this message - servermon.JujuCallDurationHistogram.WithLabelValues( - msg.Type, - msg.Request, - msgs.controllerUUID, - ).Observe(time.Since(msg.start).Seconds()) - +// removeMessage deletes the request message that corresponds +// to the responses message ID. +func (msgs *inflightMsgs) removeMessage(msgID uint64) { msgs.mu.Lock() - defer msgs.mu.Unlock() - delete(msgs.messages, msg.RequestID) + req, ok := msgs.messages[msgID] + if ok { + delete(msgs.messages, msgID) + } + msgs.mu.Unlock() + + if ok { + servermon.JujuCallDurationHistogram.WithLabelValues( + req.Type, + req.Request, + msgs.controllerUUID, + ).Observe(time.Since(req.start).Seconds()) + } } func (msgs *inflightMsgs) getMessage(key uint64) *message { @@ -353,7 +361,7 @@ func (p *clientProxy) start(ctx context.Context) error { if err := p.dst.writeJson(msg); err != nil { zapctx.Error(ctx, "clientProxy error writing to dst", zap.Error(err)) p.sendError(p.src, msg, err) - p.msgs.removeMessage(msg) + p.msgs.removeMessage(msg.RequestID) continue } } @@ -445,7 +453,7 @@ func (p *controllerProxy) start(ctx context.Context) error { return err } } - p.msgs.removeMessage(msg) + p.msgs.removeMessage(msg.RequestID) p.auditLogMessage(msg, true) zapctx.Debug(ctx, "Writing modified message to client", zap.Any("Message", msg)) if err := p.dst.writeJson(msg); err != nil { @@ -457,7 +465,7 @@ func (p *controllerProxy) start(ctx context.Context) error { func (p *controllerProxy) handleError(msg *message, err error) { p.sendError(p.dst, msg, err) - p.msgs.removeMessage(msg) + p.msgs.removeMessage(msg.RequestID) } // checkPermissionsRequired returns a nil map if no permissions are required.