feat: add jetstream publish preconditions and dedupe
This commit is contained in:
@@ -3,6 +3,7 @@ namespace NATS.Server.JetStream.Publish;
|
|||||||
public sealed class JetStreamPublisher
|
public sealed class JetStreamPublisher
|
||||||
{
|
{
|
||||||
private readonly StreamManager _streamManager;
|
private readonly StreamManager _streamManager;
|
||||||
|
private readonly PublishPreconditions _preconditions = new();
|
||||||
|
|
||||||
public JetStreamPublisher(StreamManager streamManager)
|
public JetStreamPublisher(StreamManager streamManager)
|
||||||
{
|
{
|
||||||
@@ -10,7 +11,20 @@ public sealed class JetStreamPublisher
|
|||||||
}
|
}
|
||||||
|
|
||||||
public bool TryCapture(string subject, ReadOnlyMemory<byte> payload, out PubAck ack)
|
public bool TryCapture(string subject, ReadOnlyMemory<byte> payload, out PubAck ack)
|
||||||
|
=> TryCapture(subject, payload, null, out ack);
|
||||||
|
|
||||||
|
public bool TryCapture(string subject, ReadOnlyMemory<byte> payload, string? msgId, out PubAck ack)
|
||||||
{
|
{
|
||||||
|
if (_preconditions.IsDuplicate(msgId, out var existingSequence))
|
||||||
|
{
|
||||||
|
ack = new PubAck
|
||||||
|
{
|
||||||
|
Seq = existingSequence,
|
||||||
|
ErrorCode = 10071,
|
||||||
|
};
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
var captured = _streamManager.Capture(subject, payload);
|
var captured = _streamManager.Capture(subject, payload);
|
||||||
if (captured == null)
|
if (captured == null)
|
||||||
{
|
{
|
||||||
@@ -19,7 +33,7 @@ public sealed class JetStreamPublisher
|
|||||||
}
|
}
|
||||||
|
|
||||||
ack = captured;
|
ack = captured;
|
||||||
|
_preconditions.Record(msgId, ack.Seq);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
25
src/NATS.Server/JetStream/Publish/PublishPreconditions.cs
Normal file
25
src/NATS.Server/JetStream/Publish/PublishPreconditions.cs
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
|
||||||
|
namespace NATS.Server.JetStream.Publish;
|
||||||
|
|
||||||
|
public sealed class PublishPreconditions
|
||||||
|
{
|
||||||
|
private readonly ConcurrentDictionary<string, ulong> _dedupe = new(StringComparer.Ordinal);
|
||||||
|
|
||||||
|
public bool IsDuplicate(string? msgId, out ulong existingSequence)
|
||||||
|
{
|
||||||
|
existingSequence = 0;
|
||||||
|
if (string.IsNullOrEmpty(msgId))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
return _dedupe.TryGetValue(msgId, out existingSequence);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Record(string? msgId, ulong sequence)
|
||||||
|
{
|
||||||
|
if (string.IsNullOrEmpty(msgId))
|
||||||
|
return;
|
||||||
|
|
||||||
|
_dedupe[msgId] = sequence;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -35,12 +35,15 @@ internal sealed class JetStreamApiFixture : IAsyncDisposable
|
|||||||
return fixture;
|
return fixture;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task<PubAck> PublishAndGetAckAsync(string subject, string payload)
|
public Task<PubAck> PublishAndGetAckAsync(string subject, string payload, string? msgId = null, bool expectError = false)
|
||||||
{
|
{
|
||||||
if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), out var ack))
|
if (_publisher.TryCapture(subject, Encoding.UTF8.GetBytes(payload), msgId, out var ack))
|
||||||
return Task.FromResult(ack);
|
return Task.FromResult(ack);
|
||||||
|
|
||||||
return Task.FromResult(new PubAck { ErrorCode = 404 });
|
if (expectError)
|
||||||
|
return Task.FromResult(new PubAck { ErrorCode = 404 });
|
||||||
|
|
||||||
|
throw new InvalidOperationException($"No stream matched subject '{subject}'.");
|
||||||
}
|
}
|
||||||
|
|
||||||
public Task<JetStreamApiResponse> RequestLocalAsync(string subject, string payload)
|
public Task<JetStreamApiResponse> RequestLocalAsync(string subject, string payload)
|
||||||
|
|||||||
15
tests/NATS.Server.Tests/JetStreamPublishPreconditionTests.cs
Normal file
15
tests/NATS.Server.Tests/JetStreamPublishPreconditionTests.cs
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
namespace NATS.Server.Tests;
|
||||||
|
|
||||||
|
public class JetStreamPublishPreconditionTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public async Task Duplicate_msg_id_is_rejected_with_expected_error()
|
||||||
|
{
|
||||||
|
await using var fixture = await JetStreamApiFixture.StartWithStreamAsync("D", "d.*");
|
||||||
|
|
||||||
|
await fixture.PublishAndGetAckAsync("d.a", "x", msgId: "id-1");
|
||||||
|
var second = await fixture.PublishAndGetAckAsync("d.a", "x", msgId: "id-1", expectError: true);
|
||||||
|
|
||||||
|
second.ErrorCode.ShouldBe(10071);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user