diff --git a/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs b/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs index 0c8c669..da1be16 100644 --- a/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs +++ b/src/NATS.Server/JetStream/MirrorSource/SourceCoordinator.cs @@ -18,6 +18,13 @@ public sealed class SourceCoordinator public async Task OnOriginAppendAsync(StoredMessage message, CancellationToken ct) { + if (!string.IsNullOrWhiteSpace(_sourceConfig.SourceAccount) + && !string.IsNullOrWhiteSpace(message.Account) + && !string.Equals(_sourceConfig.SourceAccount, message.Account, StringComparison.Ordinal)) + { + return; + } + var subject = message.Subject; if (!string.IsNullOrWhiteSpace(_sourceConfig.SubjectTransformPrefix)) subject = $"{_sourceConfig.SubjectTransformPrefix}{subject}"; diff --git a/src/NATS.Server/JetStream/Storage/StoredMessage.cs b/src/NATS.Server/JetStream/Storage/StoredMessage.cs index b40f873..47d87b7 100644 --- a/src/NATS.Server/JetStream/Storage/StoredMessage.cs +++ b/src/NATS.Server/JetStream/Storage/StoredMessage.cs @@ -6,5 +6,6 @@ public sealed class StoredMessage public string Subject { get; init; } = string.Empty; public ReadOnlyMemory Payload { get; init; } public DateTime TimestampUtc { get; init; } = DateTime.UtcNow; + public string? Account { get; init; } public bool Redelivered { get; init; } } diff --git a/tests/NATS.Server.Tests/JetStream/JetStreamMirrorSourceStrictRuntimeTests.cs b/tests/NATS.Server.Tests/JetStream/JetStreamMirrorSourceStrictRuntimeTests.cs new file mode 100644 index 0000000..dd58e9a --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/JetStreamMirrorSourceStrictRuntimeTests.cs @@ -0,0 +1,39 @@ +using NATS.Server.JetStream.MirrorSource; +using NATS.Server.JetStream.Models; +using NATS.Server.JetStream.Storage; + +namespace NATS.Server.Tests.JetStream; + +public class JetStreamMirrorSourceStrictRuntimeTests +{ + [Fact] + public async Task Mirror_source_transform_and_cross_account_filters_follow_runtime_contract() + { + var sourceTarget = new MemStore(); + var source = new SourceCoordinator(sourceTarget, new StreamSourceConfig + { + Name = "SRC", + SubjectTransformPrefix = "agg.", + SourceAccount = "A", + }); + + await source.OnOriginAppendAsync(new StoredMessage + { + Sequence = 1, + Subject = "orders.created", + Payload = "ok"u8.ToArray(), + Account = "A", + }, default); + await source.OnOriginAppendAsync(new StoredMessage + { + Sequence = 2, + Subject = "orders.created", + Payload = "skip"u8.ToArray(), + Account = "B", + }, default); + + var state = await sourceTarget.GetStateAsync(default); + state.Messages.ShouldBe((ulong)1); + (await sourceTarget.LoadAsync(1, default))!.Subject.ShouldBe("agg.orders.created"); + } +}