(v2.11) [IMPROVED] stream sourcing: wq/interest stream with limits #5774

Open · wants to merge 2 commits into base: main
45 changes: 40 additions & 5 deletions server/jetstream_cluster.go
@@ -3098,6 +3098,17 @@ func (js *jetStream) applyStreamEntries(mset *stream, ce *CommittedEntry, isReco
mset.clMu.Unlock()
}

if mset.inflightSubjects != nil {
mset.clMu.Lock()
n := mset.inflightSubjects[subject]

Member: We always tracking these?

Member: Or only for discard new per subject?

Contributor Author: Only for discard new + discard new per subject for wq or interest streams (see line #7932).

if n > 1 {
mset.inflightSubjects[subject]--
} else {
delete(mset.inflightSubjects, subject)
}
mset.clMu.Unlock()
}

if err != nil {
if err == errLastSeqMismatch {

@@ -7966,7 +7977,7 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
name, stype, store := mset.cfg.Name, mset.cfg.Storage, mset.store
s, js, jsa, st, r, tierName, outq, node := mset.srv, mset.js, mset.jsa, mset.cfg.Storage, mset.cfg.Replicas, mset.tier, mset.outq, mset.node
maxMsgSize, lseq := int(mset.cfg.MaxMsgSize), mset.lseq
interestPolicy, discard, maxMsgs, maxBytes := mset.cfg.Retention != LimitsPolicy, mset.cfg.Discard, mset.cfg.MaxMsgs, mset.cfg.MaxBytes
interestPolicy, discard, maxMsgs, maxBytes, discardNewPerSubject, maxMsgsPer := mset.cfg.Retention != LimitsPolicy, mset.cfg.Discard, mset.cfg.MaxMsgs, mset.cfg.MaxBytes, mset.cfg.DiscardNewPer, mset.cfg.MaxMsgsPer
isLeader, isSealed, compressOK := mset.isLeader(), mset.cfg.Sealed, mset.compressOK
mset.mu.RUnlock()

@@ -8143,27 +8154,36 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
mset.clseq = lseq + mset.clfs
}

// Check if we have an interest policy and discard new with max msgs or bytes.
// Check if we have an interest or working queue retention and discard new with max msgs or bytes.
// We need to deny here otherwise it could succeed on some peers and not others
// depending on consumer ack state. So we deny here, if we allow that means we know
// it would succeed on every peer.
if interestPolicy && discard == DiscardNew && (maxMsgs > 0 || maxBytes > 0) {
if interestPolicy && discard == DiscardNew && (maxMsgs > 0 || maxBytes > 0 || (maxMsgsPer > 0 && discardNewPerSubject)) {
// Track inflight.
if mset.inflight == nil {
mset.inflight = make(map[uint64]uint64)
}


Member: Remove nl?

Contributor Author: Can not, as that's how go fmt wants it.

if stype == FileStorage {
mset.inflight[mset.clseq] = fileStoreMsgSize(subject, hdr, msg)
} else {
mset.inflight[mset.clseq] = memStoreMsgSize(subject, hdr, msg)
}

if mset.inflightSubjects == nil {
mset.inflightSubjects = make(map[string]uint64)
}


Member: Remove nl

Contributor Author: Can not remove newline as go fmt puts it back.

Member: Really? It does not for me..

mset.inflightSubjects[subject]++

var state StreamState
mset.store.FastState(&state)

var err error
if maxMsgs > 0 && state.Msgs+uint64(len(mset.inflight)) > uint64(maxMsgs) {
err = ErrMaxMsgs
if maxMsgs > 0 {
if state.Msgs+uint64(len(mset.inflight)) > uint64(maxMsgs) {
err = ErrMaxMsgs
}
} else if maxBytes > 0 {
// TODO(dlc) - Could track this rollup independently.
var bytesPending uint64
@@ -8173,9 +8193,24 @@ func (mset *stream) processClusteredInboundMsg(subject, reply string, hdr, msg [
if state.Bytes+bytesPending > uint64(maxBytes) {
err = ErrMaxBytes
}
} else if maxMsgsPer > 0 && discardNewPerSubject {
totals := mset.store.SubjectsTotals(subject)

Member: This could be very large.. We are getting this each time?

Contributor Author: Only for wq/interest + discard new + discard new per subject streams.

Member: This can be very slow based on the shape of the stream. This concerns me. As does tracking inflight.

total := totals[subject]
if (total + mset.inflightSubjects[subject]) > uint64(maxMsgsPer) {
err = ErrMaxMsgsPerSubject
}
}

if err != nil {
delete(mset.inflight, mset.clseq)
n := mset.inflightSubjects[subject]

if n > 1 {
mset.inflightSubjects[subject] = n - 1
} else {
delete(mset.inflightSubjects, subject)
}

mset.clMu.Unlock()
if canRespond {
var resp = &JSPubAckResponse{PubAck: &PubAck{Stream: name}}
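
Taken together, the two hunks in this file follow a reserve/release pattern for the per-subject inflight counts. Below is a simplified, self-contained sketch of that pattern, not the actual server code: the streamSketch type and the reserve/release names are invented for illustration, while the field names mirror the diff.

package main

import "sync"

// streamSketch stands in for the relevant fields of the server's stream type.
type streamSketch struct {
    clMu             sync.Mutex        // protects the inflight maps, as in the diff
    inflightSubjects map[string]uint64 // inflight message count per subject
}

// reserve mirrors the proposal path (processClusteredInboundMsg): the message is
// counted before it is proposed, so every peer evaluates the discard-new limits
// against the same stored-plus-inflight totals.
func (s *streamSketch) reserve(subject string) {
    s.clMu.Lock()
    defer s.clMu.Unlock()
    if s.inflightSubjects == nil {
        s.inflightSubjects = make(map[string]uint64)
    }
    s.inflightSubjects[subject]++
}

// release mirrors the apply path (applyStreamEntries) and the error branch above:
// the reservation is dropped once the entry is applied or the proposal fails, and
// the map entry is removed when its count reaches zero.
func (s *streamSketch) release(subject string) {
    s.clMu.Lock()
    defer s.clMu.Unlock()
    if n := s.inflightSubjects[subject]; n > 1 {
        s.inflightSubjects[subject] = n - 1
    } else {
        delete(s.inflightSubjects, subject)
    }
}

func main() {
    s := &streamSketch{}
    s.reserve("orders.new")
    s.release("orders.new")
}

The key point of the pattern is that the reservation is released on both outcomes: after the committed entry is applied, and immediately when the proposal is rejected with a limit error.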
150 changes: 120 additions & 30 deletions server/jetstream_cluster_4_test.go
@@ -231,6 +231,12 @@ func TestJetStreamClusterStreamPlacementDistribution(t *testing.T) {
}

func TestJetStreamClusterSourceWorkingQueueWithLimit(t *testing.T) {
const (
totalMsgs = 300
maxMsgs = 100
maxBytes = maxMsgs * 100
msgPayloadFormat = "%0100d" // Produces a 100-byte payload; must match the 100 used in maxBytes above.
)
c := createJetStreamClusterExplicit(t, "WQ3", 3)
defer c.shutdown()

@@ -240,66 +246,150 @@ func TestJetStreamClusterSourceWorkingQueueWithLimit(t *testing.T) {
_, err := js.AddStream(&nats.StreamConfig{Name: "test", Subjects: []string{"test"}, Replicas: 3})
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{Name: "wq", MaxMsgs: 100, Discard: nats.DiscardNew, Retention: nats.WorkQueuePolicy,
_, err = js.AddStream(&nats.StreamConfig{Name: "wq", MaxMsgs: maxMsgs, Discard: nats.DiscardNew, Retention: nats.WorkQueuePolicy,
Sources: []*nats.StreamSource{{Name: "test"}}, Replicas: 3})
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{Name: "wq2", MaxBytes: maxBytes, Discard: nats.DiscardNew, Retention: nats.WorkQueuePolicy,
Sources: []*nats.StreamSource{{Name: "test"}}, Replicas: 3})
require_NoError(t, err)

_, err = js.AddStream(&nats.StreamConfig{Name: "wq3", MaxMsgsPerSubject: maxMsgs, Discard: nats.DiscardNew, DiscardNewPerSubject: true, Retention: nats.WorkQueuePolicy,
Sources: []*nats.StreamSource{{Name: "test"}}, Replicas: 3})
require_NoError(t, err)

sendBatch := func(subject string, n int) {
for i := 0; i < n; i++ {
_, err = js.Publish(subject, []byte(strconv.Itoa(i)))
_, err = js.Publish(subject, []byte(fmt.Sprintf(msgPayloadFormat, i)))
require_NoError(t, err)
}
}

f := func(ss *nats.Subscription, done chan bool) {
for i := 0; i < totalMsgs; i++ {
m, err := ss.Fetch(1, nats.MaxWait(3*time.Second))
require_NoError(t, err)
p, err := strconv.Atoi(string(m[0].Data))
require_NoError(t, err)
require_Equal(t, p, i)
time.Sleep(11 * time.Millisecond)
err = m[0].Ack()
require_NoError(t, err)
}
done <- true
}
// Populate each one.
sendBatch("test", 300)

// Populate the sourced stream.
sendBatch("test", totalMsgs)

checkFor(t, 3*time.Second, 250*time.Millisecond, func() error {
si, err := js.StreamInfo("wq")
require_NoError(t, err)
if si.State.Msgs != 100 {
return fmt.Errorf("Expected 100 msgs, got state: %+v", si.State)
if si.State.Msgs != maxMsgs {
return fmt.Errorf("expected %d msgs on stream wq, got state: %+v", maxMsgs, si.State)
}
return nil
})

checkFor(t, 3*time.Second, 250*time.Millisecond, func() error {
si, err := js.StreamInfo("wq2")
require_NoError(t, err)
if si.State.Bytes > maxBytes {
return fmt.Errorf("expected no more than %d bytes on stream wq2, got state: %+v", maxBytes, si.State)
}
return nil
})

checkFor(t, 3*time.Second, 250*time.Millisecond, func() error {
si, err := js.StreamInfo("wq3")
require_NoError(t, err)
if si.State.Msgs != maxMsgs {
return fmt.Errorf("expected %d msgs on stream wq, got state: %+v", maxMsgs, si.State)
}
return nil
})

_, err = js.AddConsumer("wq", &nats.ConsumerConfig{Durable: "wqc", FilterSubject: "test", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)

ss, err := js.PullSubscribe("test", "wqc", nats.Bind("wq", "wqc"))
ss1, err := js.PullSubscribe("test", "wqc", nats.Bind("wq", "wqc"))
require_NoError(t, err)
// we must have at least one message on the transformed subject name (ie no timeout)
f := func(done chan bool) {
for i := 0; i < 300; i++ {
m, err := ss.Fetch(1, nats.MaxWait(3*time.Second))
require_NoError(t, err)
p, err := strconv.Atoi(string(m[0].Data))
require_NoError(t, err)
require_Equal(t, p, i)
time.Sleep(11 * time.Millisecond)
err = m[0].Ack()
require_NoError(t, err)

var doneChan1 = make(chan bool)
go f(ss1, doneChan1)

checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
si, err := js.StreamInfo("wq")
require_NoError(t, err)
if si.State.Msgs > 0 && si.State.Msgs <= maxMsgs {
return fmt.Errorf("expected 0 msgs on stream wq, got: %d", si.State.Msgs)
} else if si.State.Msgs > maxMsgs {
t.Fatalf("got more than our %d message limit on stream wq: %+v", maxMsgs, si.State)
}
done <- true

return nil
})

select {
case <-doneChan1:
ss1.Drain()
case <-time.After(10 * time.Second):
t.Fatalf("Did not receive completion signal")
}

var doneChan = make(chan bool)
go f(doneChan)
_, err = js.AddConsumer("wq2", &nats.ConsumerConfig{Durable: "wqc", FilterSubject: "test", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)

checkFor(t, 6*time.Second, 100*time.Millisecond, func() error {
si, err := js.StreamInfo("wq")
ss2, err := js.PullSubscribe("test", "wqc", nats.Bind("wq2", "wqc"))
require_NoError(t, err)

var doneChan2 = make(chan bool)
go f(ss2, doneChan2)

checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
si, err := js.StreamInfo("wq2")
require_NoError(t, err)
if si.State.Msgs > 0 && si.State.Msgs <= 100 {
return fmt.Errorf("Expected 0 msgs, got: %d", si.State.Msgs)
} else if si.State.Msgs > 100 {
t.Fatalf("Got more than our 100 message limit: %+v", si.State)
if si.State.Bytes > 0 && si.State.Bytes <= maxBytes {
return fmt.Errorf("expected 0 bytes on stream wq2, got: %+v", si.State)
} else if si.State.Bytes > maxBytes {
t.Fatalf("got more than our %d bytes limit on stream wq2: %+v", maxMsgs, si.State)
}

return nil
})

select {
case <-doneChan:
ss.Drain()
case <-time.After(5 * time.Second):
case <-doneChan2:
ss2.Drain()
case <-time.After(20 * time.Second):
t.Fatalf("Did not receive completion signal")
}

_, err = js.AddConsumer("wq3", &nats.ConsumerConfig{Durable: "wqc", FilterSubject: "test", AckPolicy: nats.AckExplicitPolicy})
require_NoError(t, err)

ss3, err := js.PullSubscribe("test", "wqc", nats.Bind("wq3", "wqc"))
require_NoError(t, err)

var doneChan3 = make(chan bool)
go f(ss3, doneChan3)

checkFor(t, 10*time.Second, 250*time.Millisecond, func() error {
si, err := js.StreamInfo("wq3")
require_NoError(t, err)
if si.State.Msgs > 0 && si.State.Msgs <= maxMsgs {
return fmt.Errorf("expected 0 msgs on stream wq3, got: %d", si.State.Msgs)
} else if si.State.Msgs > maxMsgs {
t.Fatalf("got more than our %d message limit on stream wq3: %+v", maxMsgs, si.State)
}

return nil
})

select {
case <-doneChan3:
ss3.Drain()
case <-time.After(10 * time.Second):
t.Fatalf("Did not receive completion signal")
}
}
31 changes: 16 additions & 15 deletions server/stream.go
@@ -336,20 +336,21 @@ type stream struct {

// TODO(dlc) - Hide everything below behind two pointers.
// Clustered mode.
sa *streamAssignment // What the meta controller uses to assign streams to peers.
node RaftNode // Our RAFT node for the stream's group.
catchup atomic.Bool // Used to signal we are in catchup mode.
catchups map[string]uint64 // The number of messages that need to be caught per peer.
syncSub *subscription // Internal subscription for sync messages (on "$JSC.SYNC").
infoSub *subscription // Internal subscription for stream info requests.
clMu sync.Mutex // The mutex for clseq and clfs.
clseq uint64 // The current last seq being proposed to the NRG layer.
clfs uint64 // The count (offset) of the number of failed NRG sequences used to compute clseq.
inflight map[uint64]uint64 // Inflight message sizes per clseq.
lqsent time.Time // The time at which the last lost quorum advisory was sent. Used to rate limit.
uch chan struct{} // The channel to signal updates to the monitor routine.
compressOK bool // True if we can do message compression in RAFT and catchup logic
inMonitor bool // True if the monitor routine has been started.
sa *streamAssignment // What the meta controller uses to assign streams to peers.
node RaftNode // Our RAFT node for the stream's group.
catchup atomic.Bool // Used to signal we are in catchup mode.
catchups map[string]uint64 // The number of messages that need to be caught per peer.
syncSub *subscription // Internal subscription for sync messages (on "$JSC.SYNC").
infoSub *subscription // Internal subscription for stream info requests.
clMu sync.Mutex // The mutex for clseq, clfs, inflight and inflightSubjects.
clseq uint64 // The current last seq being proposed to the NRG layer.
clfs uint64 // The count (offset) of the number of failed NRG sequences used to compute clseq.
inflight map[uint64]uint64 // Inflight message sizes per clseq.
inflightSubjects map[string]uint64 // Inflight number of messages per subject.

Member: This could get pretty big in terms of in-memory state..

Contributor Author: I assumed that because it's only storing in-flight messages, the size would be 0 most of the time and maybe go up to a few thousand entries temporarily during message bursts, so still acceptable IMHO.

I did hesitate to add the support for new per subject to the PR because I couldn't think of a good use case off the top of my head (doesn't mean one doesn't exist), but it needed to be handled, and I thought it would be better to actually implement it rather than try to prevent it (which is not that easy/clean to do, because what you want to forbid specifically is discard new + new per subject + sourcing, and you can add/remove sources at any time).

lqsent time.Time // The time at which the last lost quorum advisory was sent. Used to rate limit.
uch chan struct{} // The channel to signal updates to the monitor routine.
compressOK bool // True if we can do message compression in RAFT and catchup logic
inMonitor bool // True if the monitor routine has been started.

// Direct get subscription.
directSub *subscription
@@ -3471,7 +3472,7 @@ func (mset *stream) processInboundSourceMsg(si *sourceInfo, m *inMsg) bool {
// Can happen temporarily all the time during normal operations when the sourcing stream
// is working queue/interest with a limit and discard new.
// TODO - Improve sourcing to WQ with limit and new to use flow control rather than re-creating the consumer.
if errors.Is(err, ErrMaxMsgs) {
if errors.Is(err, ErrMaxMsgs) || errors.Is(err, ErrMaxBytes) || errors.Is(err, ErrMaxMsgsPerSubject) {
// Do not need to do a full retry that includes finding the last sequence in the stream
// for that source. Just re-create starting with the seq we couldn't store instead.
mset.mu.Lock()
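
For context on when processInboundSourceMsg hits these errors, the new test exercises a work-queue stream that sources another stream while enforcing discard-new limits. A minimal client-side sketch of that configuration follows; the stream names, the local server URL, and the MaxMsgsPerSubject value are illustrative, not taken from the PR.

package main

import (
    "log"

    "github.com/nats-io/nats.go"
)

func main() {
    nc, err := nats.Connect(nats.DefaultURL)
    if err != nil {
        log.Fatal(err)
    }
    defer nc.Drain()

    js, err := nc.JetStream()
    if err != nil {
        log.Fatal(err)
    }

    // Upstream stream being sourced.
    if _, err := js.AddStream(&nats.StreamConfig{
        Name:     "test",
        Subjects: []string{"test"},
    }); err != nil {
        log.Fatal(err)
    }

    // Work-queue stream sourcing "test" with discard-new limits enforced
    // per subject (the wq3 case in the new test above).
    if _, err := js.AddStream(&nats.StreamConfig{
        Name:                 "wq3",
        Retention:            nats.WorkQueuePolicy,
        Discard:              nats.DiscardNew,
        DiscardNewPerSubject: true,
        MaxMsgsPerSubject:    100,
        Sources:              []*nats.StreamSource{{Name: "test"}},
    }); err != nil {
        log.Fatal(err)
    }
}

With this change, hitting ErrMaxBytes or ErrMaxMsgsPerSubject while sourcing is handled the same way as ErrMaxMsgs: per the comment in the hunk above, the source consumer is re-created starting at the sequence that could not be stored, so sourcing resumes once the work-queue stream drains below its limit.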