From 6825839191a5ebd6f17a4357553a99b92bd0a0df Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Mon, 23 Feb 2026 06:05:01 -0500 Subject: [PATCH] feat: add jetstream publish preconditions and dedupe --- .../JetStream/Publish/JetStreamPublisher.cs | 16 +++++++++++- .../JetStream/Publish/PublishPreconditions.cs | 25 +++++++++++++++++++ .../NATS.Server.Tests/JetStreamApiFixture.cs | 9 ++++--- .../JetStreamPublishPreconditionTests.cs | 15 +++++++++++ 4 files changed, 61 insertions(+), 4 deletions(-) create mode 100644 src/NATS.Server/JetStream/Publish/PublishPreconditions.cs create mode 100644 tests/NATS.Server.Tests/JetStreamPublishPreconditionTests.cs diff --git a/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs index c8ba4e5..d00d087 100644 --- a/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs +++ b/src/NATS.Server/JetStream/Publish/JetStreamPublisher.cs @@ -3,6 +3,7 @@ namespace NATS.Server.JetStream.Publish; public sealed class JetStreamPublisher { private readonly StreamManager _streamManager; + private readonly PublishPreconditions _preconditions = new(); public JetStreamPublisher(StreamManager streamManager) { @@ -10,7 +11,20 @@ public sealed class JetStreamPublisher } public bool TryCapture(string subject, ReadOnlyMemory payload, out PubAck ack) + => TryCapture(subject, payload, null, out ack); + + public bool TryCapture(string subject, ReadOnlyMemory payload, string? msgId, out PubAck ack) { + if (_preconditions.IsDuplicate(msgId, out var existingSequence)) + { + ack = new PubAck + { + Seq = existingSequence, + ErrorCode = 10071, + }; + return true; + } + var captured = _streamManager.Capture(subject, payload); if (captured == null) { @@ -19,7 +33,7 @@ public sealed class JetStreamPublisher } ack = captured; - + _preconditions.Record(msgId, ack.Seq); return true; } } diff --git a/src/NATS.Server/JetStream/Publish/PublishPreconditions.cs b/src/NATS.Server/JetStream/Publish/PublishPreconditions.cs new file mode 100644 index 0000000..4ae05f9 --- /dev/null +++ b/src/NATS.Server/JetStream/Publish/PublishPreconditions.cs @@ -0,0 +1,25 @@ +using System.Collections.Concurrent; + +namespace NATS.Server.JetStream.Publish; + +public sealed class PublishPreconditions +{ + private readonly ConcurrentDictionary _dedupe = new(StringComparer.Ordinal); + + public bool IsDuplicate(string? msgId, out ulong existingSequence) + { + existingSequence = 0; + if (string.IsNullOrEmpty(msgId)) + return false; + + return _dedupe.TryGetValue(msgId, out existingSequence); + } + + public void Record(string? msgId, ulong sequence) + { + if (string.IsNullOrEmpty(msgId)) + return; + + _dedupe[msgId] = sequence; + } +} diff --git a/tests/NATS.Server.Tests/JetStreamApiFixture.cs b/tests/NATS.Server.Tests/JetStreamApiFixture.cs index ee2281c..7ae5588 100644 --- a/tests/NATS.Server.Tests/JetStreamApiFixture.cs +++ b/tests/NATS.Server.Tests/JetStreamApiFixture.cs @@ -35,12 +35,15 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable return fixture; } - public Task PublishAndGetAckAsync(string subject, string payload) + public Task PublishAndGetAckAsync(string subject, string payload, string? msgId = null, bool expectError = false) { - if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), out var ack)) + if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), msgId, out var ack)) return Task.FromResult(ack); - return Task.FromResult(new PubAck { ErrorCode = 404 }); + if (expectError) + return Task.FromResult(new PubAck { ErrorCode = 404 }); + + throw new InvalidOperationException($"No stream matched subject '{subject}'."); } public Task RequestLocalAsync(string subject, string payload) diff --git a/tests/NATS.Server.Tests/JetStreamPublishPreconditionTests.cs b/tests/NATS.Server.Tests/JetStreamPublishPreconditionTests.cs new file mode 100644 index 0000000..1ea6db5 --- /dev/null +++ b/tests/NATS.Server.Tests/JetStreamPublishPreconditionTests.cs @@ -0,0 +1,15 @@ +namespace NATS.Server.Tests; + +public class JetStreamPublishPreconditionTests +{ + [Fact] + public async Task Duplicate_msg_id_is_rejected_with_expected_error() + { + await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("D", "d.*"); + + await fixture.PublishAndGetAckAsync("d.a", "x", msgId: "id-1"); + var second = await fixture.PublishAndGetAckAsync("d.a", "x", msgId: "id-1", expectError: true); + + second.ErrorCode.ShouldBe(10071); + } +}