Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: statsviz.TimeSeries.GetValue is called multiple times per second #121

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion _example/default/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func main() {
go example.Work()

// Register a Statsviz server on the default mux.
statsviz.Register(http.DefaultServeMux)
statsviz.Register(http.DefaultServeMux, statsviz.EnableDataCache())

fmt.Println("Point your browser to http://localhost:8080/debug/statsviz/")
log.Fatal(http.ListenAndServe(":8080", nil))
Expand Down
18 changes: 18 additions & 0 deletions internal/plot/plots_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"runtime/debug"
"runtime/metrics"
"strings"
"sync"
"time"
)
Expand Down Expand Up @@ -156,6 +157,23 @@ func (pl *List) WriteValues(w io.Writer) error {
}

if err := json.NewEncoder(w).Encode(m); err != nil {
// If we fail to encode the metrics values, it's probably because
if strings.Contains(err.Error(), "NaN") {
for s, a := range m {
if v, ok := a.([]float64); ok {
for i := range v {
if v[i] != v[i] {
v[i] = 0
}
}
m[s] = v
}
}
err = json.NewEncoder(w).Encode(m)
if err == nil {
return nil
}
}
return fmt.Errorf("failed to write/convert metrics values to json: %v", err)
}
return nil
Expand Down
62 changes: 60 additions & 2 deletions statsviz.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,14 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/gorilla/websocket"
Expand Down Expand Up @@ -91,6 +94,13 @@ type Server struct {
root string // HTTP path root
plots *plot.List // plots shown on the user interface
userPlots []plot.UserPlot

enablePlotCache bool
cacheTime time.Time
plotCache []byte
plotCacheLock *sync.Mutex
idGen int32
mainId int32
}

// NewServer constructs a new Statsviz Server with the provided options, or the
Expand Down Expand Up @@ -134,6 +144,16 @@ func SendFrequency(intv time.Duration) Option {
}
}

// EnableDataCache Multiple web page share data.
// https://github.com/arl/statsviz/issues/119
func EnableDataCache() Option {
return func(s *Server) error {
s.enablePlotCache = true
s.plotCacheLock = &sync.Mutex{}
return nil
}
}

// Root changes the root path of the Statsviz user interface.
// The default is "/debug/statsviz".
func Root(path string) Option {
Expand Down Expand Up @@ -265,13 +285,16 @@ func (s *Server) sendStats(conn *websocket.Conn, frequency time.Duration) error
// requested. Call plots.Config() manually to ensure that s.plots internals
// are correctly initialized.
s.plots.Config()

//The time interval of tick.C is not precise,
//so use id to ensure that data generation
//will not cause data loss due to the time interval of tick.C
id := atomic.AddInt32(&s.idGen, 1)
for range tick.C {
w, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
return err
}
if err := s.plots.WriteValues(w); err != nil {
if err := s.writeData(w, id); err != nil {
return err
}
if err := w.Close(); err != nil {
Expand All @@ -281,3 +304,38 @@ func (s *Server) sendStats(conn *websocket.Conn, frequency time.Duration) error

panic("unreachable")
}

func (s *Server) writeData(w io.WriteCloser, id int32) (err error) {
if !s.enablePlotCache {
return s.plots.WriteValues(w)
}
defer func() {
if err == nil {
_, err = w.Write(s.plotCache)
}
}()
if s.mainId == id || time.Since(s.cacheTime) >= s.intv {
s.plotCacheLock.Lock()
if s.mainId != id {
// double check
if time.Since(s.cacheTime) < s.intv {
s.plotCacheLock.Unlock()
return
} else {
// change main id to active id
s.mainId = id
}
}
buf := bytes.Buffer{}
// make s.cacheTime closer to tick.C time
start := time.Now()
if err = s.plots.WriteValues(&buf); err != nil {
s.plotCacheLock.Unlock()
return err
}
s.plotCache = buf.Bytes()
s.cacheTime = start
s.plotCacheLock.Unlock()
}
return
}
140 changes: 119 additions & 21 deletions statsviz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,12 +70,9 @@ func TestRoot(t *testing.T) {
testIndex(t, newServer(t, Root("/test/")).Index(), "http://example.com/test/")
}

func testWs(t *testing.T, f http.Handler, URL string) {
func testWs(t *testing.T, s *httptest.Server, URL string, number int, checkData func() any, check func(*testing.T, any)) {
t.Helper()

s := httptest.NewServer(f)
defer s.Close()

// Build a "ws://" url using the httptest server URL and the URL argument.
u1, err := url.Parse(s.URL)
if err != nil {
Expand All @@ -97,18 +94,31 @@ func testWs(t *testing.T, f http.Handler, URL string) {
defer ws.Close()

// Check the content of 2 consecutive payloads.
for i := 0; i < 2; i++ {

// Verifies that we've received 1 time series (goroutines) and one
// heatmap (sizeClasses).
var data struct {
Goroutines []uint64 `json:"goroutines"`
SizeClasses []uint64 `json:"size-classes"`
}
if err := ws.ReadJSON(&data); err != nil {
for i := 0; i < number; i++ {
data := checkData()
if err := ws.ReadJSON(data); err != nil {
t.Fatalf("failed reading json from websocket: %v", err)
return
}
check(t, data)
}
}

func TestWs(t *testing.T) {
t.Parallel()
// Verifies that we've received 1 time series (goroutines) and one
// heatmap (sizeClasses).
type dataType struct {
Goroutines []uint64 `json:"goroutines"`
SizeClasses []uint64 `json:"size-classes"`
}

s := httptest.NewServer(newServer(t).Ws())
defer s.Close()
testWs(t, s, "http://example.com/debug/statsviz/ws", 2, func() any {
return &dataType{}
}, func(t *testing.T, data1 any) {
data := data1.(*dataType)
// The time series must have one and only one element
if len(data.Goroutines) != 1 {
t.Errorf("len(goroutines) = %d, want 1", len(data.Goroutines))
Expand All @@ -117,13 +127,7 @@ func testWs(t *testing.T, f http.Handler, URL string) {
if len(data.SizeClasses) <= 1 {
t.Errorf("len(sizeClasses) = %d, want > 1", len(data.SizeClasses))
}
}
}

func TestWs(t *testing.T) {
t.Parallel()

testWs(t, newServer(t).Ws(), "http://example.com/debug/statsviz/ws")
})
}

func TestWsCantUpgrade(t *testing.T) {
Expand All @@ -141,7 +145,23 @@ func TestWsCantUpgrade(t *testing.T) {
func testRegister(t *testing.T, f http.Handler, baseURL string) {
testIndex(t, f, baseURL)
ws := strings.TrimRight(baseURL, "/") + "/ws"
testWs(t, f, ws)
type dataType struct {
Goroutines []uint64 `json:"goroutines"`
SizeClasses []uint64 `json:"size-classes"`
}
s := httptest.NewServer(f)
defer s.Close()
testWs(t, s, ws, 2, func() any {
return &dataType{}
}, func(t *testing.T, data1 any) {
data := data1.(*dataType)
if len(data.Goroutines) != 1 {
t.Errorf("len(goroutines) = %d, want 1", len(data.Goroutines))
}
if len(data.SizeClasses) <= 1 {
t.Errorf("len(sizeClasses) = %d, want > 1", len(data.SizeClasses))
}
})
}

func TestRegister(t *testing.T) {
Expand Down Expand Up @@ -174,6 +194,84 @@ func TestRegister(t *testing.T) {

testRegister(t, mux, "http://example.com/path/to/statsviz/")
})
type Data struct {
Test []float64 `json:"test"`
}

makeTestPlot := func() (TimeSeriesPlot, *int) {
num := 0
build, _ := TimeSeriesPlotConfig{
Title: "test",
Name: "test",
Series: []TimeSeries{
{
Name: "1",
GetValue: func() float64 {
num++
return 1
},
},
},
}.Build()
return build, &num
}

t.Run("customizePlot", func(t *testing.T) {
t.Parallel()
plot, i := makeTestPlot()
s := httptest.NewServer(newServer(t,
TimeseriesPlot(plot),
).Ws())
defer s.Close()
go testWs(t, s, "http://example.com/debug/statsviz/ws", 2, func() any { return &Data{} }, func(t *testing.T, a any) {})
testWs(t, s, "http://example.com/debug/statsviz/ws", 2, func() any {
return &Data{}
}, func(t *testing.T, a any) {
data := a.(*Data)
if len(data.Test) != 1 {
t.Fatalf("customizePlot failed,call num %d expect 1", len(data.Test))
}
})
time.Sleep(100 * time.Millisecond)
if *i != 4 {
t.Fatalf("dataCache2 failed,call num %d expect 4", *i)
}
})

t.Run("dataCache", func(t *testing.T) {
t.Parallel()
plot, i := makeTestPlot()
s := httptest.NewServer(newServer(t,
EnableDataCache(),
TimeseriesPlot(plot),
).Ws())
defer s.Close()
go testWs(t, s, "http://example.com/debug/statsviz/ws", 1, func() any { return &Data{} }, func(t *testing.T, a any) {})
go testWs(t, s, "http://example.com/debug/statsviz/ws", 2, func() any { return &Data{} }, func(t *testing.T, a any) {})
go testWs(t, s, "http://example.com/debug/statsviz/ws", 3, func() any { return &Data{} }, func(t *testing.T, a any) {})
go testWs(t, s, "http://example.com/debug/statsviz/ws", 4, func() any { return &Data{} }, func(t *testing.T, a any) {})
testWs(t, s, "http://example.com/debug/statsviz/ws", 4, func() any { return &Data{} }, func(t *testing.T, a any) {})
time.Sleep(100 * time.Millisecond)
if *i != 4 {
t.Fatalf("dataCache failed,call num %d expect 4", *i)
}
})

t.Run("dataCache2", func(t *testing.T) {
t.Parallel()
plot, i := makeTestPlot()
s := httptest.NewServer(newServer(t,
EnableDataCache(),
TimeseriesPlot(plot),
).Ws())
defer s.Close()
go testWs(t, s, "http://example.com/debug/statsviz/ws", 2, func() any { return &Data{} }, func(t *testing.T, a any) {})
testWs(t, s, "http://example.com/debug/statsviz/ws", 2, func() any { return &Data{} }, func(t *testing.T, a any) {})
time.Sleep(100 * time.Millisecond)
if *i != 2 {
t.Fatalf("dataCache failed,call num %d expect 2", *i)
}
})

t.Run("root+frequency", func(t *testing.T) {
t.Parallel()
Expand Down