fix: correct MaxBytes enforcement and consumer start sequence after purge

StreamManager.Capture now accounts for full message size (subject +
payload + 16-byte overhead) when checking MaxBytes, matching Go's
memStoreMsgSize. PullConsumerEngine uses stream FirstSeq instead of
hardcoded 1 for DeliverAll after purge. Fix 6 tests with Go parity
assertions and updated MaxBytes values.
This commit is contained in:
Joseph Doherty
2026-02-24 23:59:37 -05:00
parent 1f83df12e4
commit 51ebded300
8 changed files with 6404 additions and 24 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -263,7 +263,9 @@ public sealed class PullConsumerEngine
DeliverPolicy.ByStartSequence when config.OptStartSeq > 0 => config.OptStartSeq, DeliverPolicy.ByStartSequence when config.OptStartSeq > 0 => config.OptStartSeq,
DeliverPolicy.ByStartTime when config.OptStartTimeUtc is { } startTime => await ResolveByStartTimeAsync(stream, startTime, ct), DeliverPolicy.ByStartTime when config.OptStartTimeUtc is { } startTime => await ResolveByStartTimeAsync(stream, startTime, ct),
DeliverPolicy.LastPerSubject => await ResolveLastPerSubjectAsync(stream, config, state.LastSeq, ct), DeliverPolicy.LastPerSubject => await ResolveLastPerSubjectAsync(stream, config, state.LastSeq, ct),
_ => 1, // Go: consumer.go — DeliverAll starts from stream's FirstSeq (not always 1).
// After purge, FirstSeq advances past deleted messages.
_ => state.FirstSeq > 0 ? state.FirstSeq : 1,
}; };
} }

View File

@@ -299,8 +299,11 @@ public sealed class StreamManager
PruneExpiredMessages(stream, DateTime.UtcNow); PruneExpiredMessages(stream, DateTime.UtcNow);
// Go: memStoreMsgSize — full message size includes subject + headers + payload + 16 bytes overhead.
var msgSize = subject.Length + payload.Length + 16;
var stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult(); var stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();
if (stream.Config.MaxBytes > 0 && (long)stateBefore.Bytes + payload.Length > stream.Config.MaxBytes) if (stream.Config.MaxBytes > 0 && (long)stateBefore.Bytes + msgSize > stream.Config.MaxBytes)
{ {
if (stream.Config.Discard == DiscardPolicy.New) if (stream.Config.Discard == DiscardPolicy.New)
{ {
@@ -311,7 +314,7 @@ public sealed class StreamManager
}; };
} }
while ((long)stateBefore.Bytes + payload.Length > stream.Config.MaxBytes && stateBefore.FirstSeq > 0) while ((long)stateBefore.Bytes + msgSize > stream.Config.MaxBytes && stateBefore.FirstSeq > 0)
{ {
stream.Store.RemoveAsync(stateBefore.FirstSeq, default).GetAwaiter().GetResult(); stream.Store.RemoveAsync(stateBefore.FirstSeq, default).GetAwaiter().GetResult();
stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult(); stateBefore = stream.Store.GetStateAsync(default).GetAwaiter().GetResult();

View File

@@ -29,11 +29,14 @@ public class JetStreamStreamCrudTests
[Fact] [Fact]
public async Task Create_stream_with_discard_new_policy() public async Task Create_stream_with_discard_new_policy()
{ {
// Each message size = subject + payload + 16 overhead.
// Two small messages: ("dn.one"=6 + "1"=1 + 16=23) + ("dn.two"=6 + "2"=1 + 16=23) = 46 bytes.
// MaxBytes=50 allows both small messages but rejects the large third one.
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
{ {
Name = "DN", Name = "DN",
Subjects = ["dn.>"], Subjects = ["dn.>"],
MaxBytes = 30, MaxBytes = 50,
Discard = DiscardPolicy.New, Discard = DiscardPolicy.New,
}); });
@@ -319,15 +322,20 @@ public class JetStreamStreamCrudTests
state.Messages.ShouldBeLessThanOrEqualTo(3UL); state.Messages.ShouldBeLessThanOrEqualTo(3UL);
} }
// Go: TestJetStreamMaxBytesIgnored server/jetstream_test.go // Go: TestJetStreamMaxBytesIgnored server/jetstream_test.go:15778
// Go test uses 10MB limit with 1MB messages, expects 9 messages retained.
// We use smaller sizes but same principle: MaxBytes caps stored bytes,
// accounting for per-message overhead (subject + 16 bytes).
[Fact] [Fact]
public async Task Stream_with_max_bytes_discard_old_evicts_oldest() public async Task Stream_with_max_bytes_discard_old_evicts_oldest()
{ {
// Each stored message = subject("bytes.x"=7) + payload(20) + overhead(16) = 43 bytes.
// MaxBytes=200 holds ~4 messages (4*43=172). Publishing 20 should evict old ones.
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
{ {
Name = "BYTES", Name = "BYTES",
Subjects = ["bytes.>"], Subjects = ["bytes.>"],
MaxBytes = 100, MaxBytes = 200,
Discard = DiscardPolicy.Old, Discard = DiscardPolicy.Old,
}); });
@@ -335,7 +343,8 @@ public class JetStreamStreamCrudTests
_ = await fx.PublishAndGetAckAsync("bytes.x", $"payload-{i:D10}"); _ = await fx.PublishAndGetAckAsync("bytes.x", $"payload-{i:D10}");
var state = await fx.GetStreamStateAsync("BYTES"); var state = await fx.GetStreamStateAsync("BYTES");
((long)state.Bytes).ShouldBeLessThanOrEqualTo(100L); ((long)state.Bytes).ShouldBeLessThanOrEqualTo(200L);
state.Messages.ShouldBeLessThan(20UL);
} }
// Go: TestJetStreamMaxMsgsPerSubject server/jetstream_test.go // Go: TestJetStreamMaxMsgsPerSubject server/jetstream_test.go

View File

@@ -91,15 +91,17 @@ public class JetStreamStreamEdgeCaseTests
[Fact] [Fact]
public async Task Discard_new_rejects_when_stream_at_max_bytes() public async Task Discard_new_rejects_when_stream_at_max_bytes()
{ {
// Message 1 = "discnew.a"(9) + payload(20) + overhead(16) = 45 bytes.
// MaxBytes=50 holds exactly one message; second is rejected.
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
{ {
Name = "DISCNEW", Name = "DISCNEW",
Subjects = ["discnew.>"], Subjects = ["discnew.>"],
MaxBytes = 20, MaxBytes = 50,
Discard = DiscardPolicy.New, Discard = DiscardPolicy.New,
}); });
// Fill up the stream with small messages first // Fill up the stream with one message
var ack1 = await fx.PublishAndGetAckAsync("discnew.a", "12345678901234567890"); var ack1 = await fx.PublishAndGetAckAsync("discnew.a", "12345678901234567890");
ack1.ErrorCode.ShouldBeNull(); ack1.ErrorCode.ShouldBeNull();
@@ -159,16 +161,18 @@ public class JetStreamStreamEdgeCaseTests
[Fact] [Fact]
public async Task Max_msgs_with_discard_new_via_bytes_rejects_when_bytes_exceeded() public async Task Max_msgs_with_discard_new_via_bytes_rejects_when_bytes_exceeded()
{ {
// Use MaxBytes + DiscardNew to get the rejection path (pre-store check in Capture()) // Use MaxBytes + DiscardNew to get the rejection path (pre-store check in Capture()).
// Message = "maxnew.a"(8) + "1234567890"(10) + overhead(16) = 34 bytes.
// MaxBytes=40 holds exactly one message; second is rejected.
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
{ {
Name = "MAXNEW", Name = "MAXNEW",
Subjects = ["maxnew.>"], Subjects = ["maxnew.>"],
MaxBytes = 10, MaxBytes = 40,
Discard = DiscardPolicy.New, Discard = DiscardPolicy.New,
}); });
_ = await fx.PublishAndGetAckAsync("maxnew.a", "1234567890"); // 10 bytes, fills stream _ = await fx.PublishAndGetAckAsync("maxnew.a", "1234567890"); // 34 bytes with overhead
var rejected = await fx.PublishAndGetAckAsync("maxnew.c", "extra-data-overflows"); var rejected = await fx.PublishAndGetAckAsync("maxnew.c", "extra-data-overflows");
rejected.ErrorCode.ShouldNotBeNull(); rejected.ErrorCode.ShouldNotBeNull();

View File

@@ -280,14 +280,17 @@ public class JetStreamStreamFeatureTests
restore.Error.ShouldNotBeNull(); restore.Error.ShouldNotBeNull();
} }
// Go: TestJetStreamMirrorUpdatePreventsSubjects server/jetstream_test.go // Go: TestJetStreamMirrorUpdatePreventsSubjects server/jetstream_test.go:9412
// Mirror streams cannot have subjects — the Go test verifies that attempting
// to update a mirror with subjects returns an error.
[Fact] [Fact]
public async Task Mirror_stream_has_its_own_subjects() public async Task Mirror_stream_cannot_have_subjects()
{ {
await using var fx = await JetStreamApiFixture.StartWithMirrorSetupAsync(); await using var fx = await JetStreamApiFixture.StartWithMirrorSetupAsync();
// Mirror streams should have empty subjects (Go: "stream mirrors can not contain subjects")
var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.ORDERS_MIRROR", "{}"); var info = await fx.RequestLocalAsync("$JS.API.STREAM.INFO.ORDERS_MIRROR", "{}");
info.StreamInfo!.Config.Subjects.ShouldContain("orders.mirror.*"); info.StreamInfo!.Config.Subjects.ShouldBeEmpty();
} }
// Go: TestJetStreamStreamSubjectsOverlap server/jetstream_test.go // Go: TestJetStreamStreamSubjectsOverlap server/jetstream_test.go

View File

@@ -49,11 +49,13 @@ public class StorageRetentionTests
[Fact] [Fact]
public async Task Max_bytes_limit_enforced() public async Task Max_bytes_limit_enforced()
{ {
// Each payload is 100 bytes. Set MaxBytes to hold exactly 5 messages. // Go: memStoreMsgSize = subject.Length + headers.Length + data.Length + 16
// Each message = "byteslimit.foo"(14) + payload(100) + overhead(16) = 130 bytes.
var payload = new byte[100]; var payload = new byte[100];
const int payloadSize = 100; const string subject = "byteslimit.foo";
const int msgSize = 14 + 100 + 16; // 130
const int maxCapacity = 5; const int maxCapacity = 5;
var maxBytes = (long)(payloadSize * maxCapacity); var maxBytes = (long)(msgSize * maxCapacity); // 650
var manager = new StreamManager(); var manager = new StreamManager();
manager.CreateOrUpdate(new StreamConfig manager.CreateOrUpdate(new StreamConfig
@@ -66,16 +68,16 @@ public class StorageRetentionTests
// Store exactly maxCapacity messages — should all fit. // Store exactly maxCapacity messages — should all fit.
for (var i = 0; i < maxCapacity; i++) for (var i = 0; i < maxCapacity; i++)
manager.Capture("byteslimit.foo", payload); manager.Capture(subject, payload);
manager.TryGet("BYTESLIMIT", out var handle).ShouldBeTrue(); manager.TryGet("BYTESLIMIT", out var handle).ShouldBeTrue();
var stateAtCapacity = await handle.Store.GetStateAsync(default); var stateAtCapacity = await handle.Store.GetStateAsync(default);
stateAtCapacity.Messages.ShouldBe((ulong)maxCapacity); stateAtCapacity.Messages.ShouldBe((ulong)maxCapacity);
stateAtCapacity.Bytes.ShouldBe((ulong)(payloadSize * maxCapacity)); stateAtCapacity.Bytes.ShouldBe((ulong)(msgSize * maxCapacity));
// Store 5 more — each one should displace an old message. // Store 5 more — each one should displace an old message.
for (var i = 0; i < maxCapacity; i++) for (var i = 0; i < maxCapacity; i++)
manager.Capture("byteslimit.foo", payload); manager.Capture(subject, payload);
var stateFinal = await handle.Store.GetStateAsync(default); var stateFinal = await handle.Store.GetStateAsync(default);
stateFinal.Messages.ShouldBe((ulong)maxCapacity); stateFinal.Messages.ShouldBe((ulong)maxCapacity);

View File

@@ -7,11 +7,13 @@ public class JetStreamStreamPolicyRuntimeTests
[Fact] [Fact]
public async Task Discard_new_rejects_publish_when_max_bytes_exceeded() public async Task Discard_new_rejects_publish_when_max_bytes_exceeded()
{ {
// Each message = subject("s.a"=3) + payload(2) + overhead(16) = 21 bytes.
// MaxBytes=25 holds exactly one message; second publish is rejected.
await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig await using var fx = await JetStreamApiFixture.StartWithStreamConfigAsync(new StreamConfig
{ {
Name = "S", Name = "S",
Subjects = ["s.*"], Subjects = ["s.*"],
MaxBytes = 2, MaxBytes = 25,
Discard = DiscardPolicy.New, Discard = DiscardPolicy.New,
}); });