Ports 34 Go FileStore tests from filestore_test.go to FileStoreRecovery2Tests.cs (31 pass, 4 skipped). Tests cover block recovery, compaction, PSIM indexing, skip-msg handling, TTL expiry, corrupt index/state detection, and read-only permission checks. Updates docs/test_parity.db with mapped/skipped status for all 34 tests.
826 lines
32 KiB
C#
826 lines
32 KiB
C#
using NATS.Server.JetStream;
|
|
using NATS.Server.JetStream.MirrorSource;
|
|
using NATS.Server.JetStream.Models;
|
|
using NATS.Server.JetStream.Storage;
|
|
|
|
namespace NATS.Server.Tests.JetStream;
|
|
|
|
// Go reference: server/jetstream_test.go — Mirror, Source & Transform parity tests
|
|
// Each test documents the Go function name and line number it ports.
|
|
|
|
public class MirrorSourceGoParityTests
|
|
{
|
|
// -------------------------------------------------------------------------
|
|
// Mirror: basic sync
|
|
// Go reference: TestJetStreamMirrorStripExpectedHeaders — jetstream_test.go:9361
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: TestJetStreamMirrorStripExpectedHeaders — mirror receives messages published
|
|
// to the origin. In .NET we verify the basic in-process mirror sync path:
|
|
// publish to origin → stored in mirror via RebuildReplicationCoordinators.
|
|
public async Task Mirror_syncs_messages_from_origin_through_stream_manager()
|
|
{
|
|
var mgr = new StreamManager();
|
|
mgr.CreateOrUpdate(new StreamConfig { Name = "S", Subjects = ["foo"] });
|
|
mgr.CreateOrUpdate(new StreamConfig { Name = "M", Mirror = "S" });
|
|
|
|
var ack = mgr.Capture("foo", "hello"u8.ToArray());
|
|
ack.ShouldNotBeNull();
|
|
ack!.ErrorCode.ShouldBeNull();
|
|
ack.Seq.ShouldBe(1UL);
|
|
|
|
var state = await mgr.GetStateAsync("M", default);
|
|
state.Messages.ShouldBe(1UL);
|
|
|
|
var msg = mgr.GetMessage("M", 1);
|
|
msg.ShouldNotBeNull();
|
|
msg!.Subject.ShouldBe("foo");
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Mirror: promote (updates) and FirstSeq validation
|
|
// Go reference: TestJetStreamMirrorUpdatesNotSupported — jetstream_test.go:14127
|
|
// Go reference: TestJetStreamMirrorFirstSeqNotSupported — jetstream_test.go:14150
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: mirror can be promoted by updating it with Mirror = null and adding subjects.
|
|
public void Mirror_can_be_promoted_by_removing_mirror_field()
|
|
{
|
|
var mgr = new StreamManager();
|
|
mgr.CreateOrUpdate(new StreamConfig { Name = "SOURCE" });
|
|
mgr.CreateOrUpdate(new StreamConfig { Name = "M", Mirror = "SOURCE" });
|
|
|
|
var result = mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "M",
|
|
Mirror = null,
|
|
Subjects = ["m.>"],
|
|
});
|
|
|
|
result.Error.ShouldBeNull();
|
|
result.StreamInfo.ShouldNotBeNull();
|
|
result.StreamInfo!.Config.Mirror.ShouldBeNull();
|
|
}
|
|
|
|
[Fact]
|
|
// Go: TestJetStreamMirrorFirstSeqNotSupported — mirror + FirstSeq is invalid.
|
|
// Reference: server/stream.go:1028-1031 (NewJSMirrorWithFirstSeqError)
|
|
public void Mirror_with_first_seq_is_rejected()
|
|
{
|
|
var mgr = new StreamManager();
|
|
mgr.CreateOrUpdate(new StreamConfig { Name = "SOURCE" });
|
|
|
|
var result = mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "M",
|
|
Mirror = "SOURCE",
|
|
FirstSeq = 123,
|
|
});
|
|
|
|
result.Error.ShouldNotBeNull();
|
|
result.Error!.Description.ShouldContain("first sequence");
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Mirror: clipping OptStartSeq to origin bounds
|
|
// Go reference: TestJetStreamMirroringClipStartSeq — jetstream_test.go:18203
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: when OptStartSeq for a mirror exceeds the origin's last sequence the mirror
|
|
// coordinator should still sync all available messages without crashing.
|
|
public async Task Mirror_coordinator_clips_start_seq_beyond_origin_end()
|
|
{
|
|
var origin = new MemStore();
|
|
var target = new MemStore();
|
|
|
|
for (var i = 0; i < 10; i++)
|
|
await origin.AppendAsync($"test.{i}", System.Text.Encoding.UTF8.GetBytes($"p{i}"), default);
|
|
|
|
await using var mirror = new MirrorCoordinator(target);
|
|
mirror.StartPullSyncLoop(origin);
|
|
|
|
await WaitForConditionAsync(() => mirror.LastOriginSequence >= 10, TimeSpan.FromSeconds(5));
|
|
|
|
mirror.LastOriginSequence.ShouldBe(10UL);
|
|
var state = await target.GetStateAsync(default);
|
|
state.Messages.ShouldBe(10UL);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Mirror: AllowMsgTtl accepted; SubjectDeleteMarkerTtl + Mirror rejected
|
|
// Go reference: TestJetStreamMessageTTLWhenMirroring — jetstream_test.go:18753
|
|
// Go reference: TestJetStreamSubjectDeleteMarkersWithMirror — jetstream_test.go:19052
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: mirror can have AllowMsgTtl=true (per-message TTL is forwarded to mirror).
|
|
public void Mirror_with_allow_msg_ttl_is_accepted()
|
|
{
|
|
var mgr = new StreamManager();
|
|
mgr.CreateOrUpdate(new StreamConfig { Name = "ORIGIN" });
|
|
|
|
var result = mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "M",
|
|
Mirror = "ORIGIN",
|
|
AllowMsgTtl = true,
|
|
});
|
|
|
|
result.Error.ShouldBeNull();
|
|
result.StreamInfo!.Config.AllowMsgTtl.ShouldBeTrue();
|
|
}
|
|
|
|
[Fact]
|
|
// Go: SubjectDeleteMarkerTtlMs + Mirror is invalid.
|
|
// Reference: server/stream.go:1050-1053
|
|
public void Mirror_with_subject_delete_marker_ttl_is_rejected()
|
|
{
|
|
var mgr = new StreamManager();
|
|
mgr.CreateOrUpdate(new StreamConfig { Name = "ORIGIN" });
|
|
|
|
var result = mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "M",
|
|
Mirror = "ORIGIN",
|
|
SubjectDeleteMarkerTtlMs = 5000,
|
|
});
|
|
|
|
result.Error.ShouldNotBeNull();
|
|
result.Error!.Description.ShouldContain("subject delete marker TTL");
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Mirror: promote after deleting origin
|
|
// Go reference: TestJetStreamPromoteMirrorDeletingOrigin — jetstream_test.go:21462
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: after origin is deleted the subject conflict goes away so the mirror can be
|
|
// promoted to a regular stream with those subjects.
|
|
public void Promote_mirror_succeeds_after_deleting_conflicting_origin()
|
|
{
|
|
var mgr = new StreamManager();
|
|
mgr.CreateOrUpdate(new StreamConfig { Name = "O", Subjects = ["foo"] });
|
|
mgr.CreateOrUpdate(new StreamConfig { Name = "M", Mirror = "O" });
|
|
|
|
mgr.Delete("O");
|
|
|
|
var result = mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "M",
|
|
Mirror = null,
|
|
Subjects = ["foo"],
|
|
});
|
|
|
|
result.Error.ShouldBeNull();
|
|
result.StreamInfo!.Config.Mirror.ShouldBeNull();
|
|
result.StreamInfo.Config.Subjects.ShouldContain("foo");
|
|
}
|
|
|
|
[Fact]
|
|
// Go: after promoting the mirror, new publishes to the promoted stream work.
|
|
// Go reference: TestJetStreamPromoteMirrorUpdatingOrigin — jetstream_test.go:21550
|
|
public void Promote_mirror_allows_new_publishes()
|
|
{
|
|
var mgr = new StreamManager();
|
|
mgr.CreateOrUpdate(new StreamConfig { Name = "O", Subjects = ["foo"] });
|
|
mgr.CreateOrUpdate(new StreamConfig { Name = "M", Mirror = "O" });
|
|
|
|
mgr.Capture("foo", "msg1"u8.ToArray());
|
|
|
|
mgr.Delete("O");
|
|
mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "M",
|
|
Mirror = null,
|
|
Subjects = ["foo"],
|
|
});
|
|
|
|
var ack = mgr.Capture("foo", "msg2"u8.ToArray());
|
|
ack.ShouldNotBeNull();
|
|
ack!.ErrorCode.ShouldBeNull();
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Mirror / Source: AllowMsgSchedules incompatibility
|
|
// Go reference: TestJetStreamScheduledMirrorOrSource — jetstream_test.go:21643
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: NewJSMirrorWithMsgSchedulesError — mirror + AllowMsgSchedules is invalid.
|
|
// Reference: server/stream.go:1040-1046
|
|
public void Mirror_with_allow_msg_schedules_is_rejected()
|
|
{
|
|
var mgr = new StreamManager();
|
|
|
|
var result = mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "TEST",
|
|
Mirror = "M",
|
|
AllowMsgSchedules = true,
|
|
});
|
|
|
|
result.Error.ShouldNotBeNull();
|
|
result.Error!.Description.ShouldContain("message schedules");
|
|
}
|
|
|
|
[Fact]
|
|
// Go: NewJSSourceWithMsgSchedulesError — sources + AllowMsgSchedules is invalid.
|
|
// Reference: server/stream.go:1040-1046
|
|
public void Source_with_allow_msg_schedules_is_rejected()
|
|
{
|
|
var mgr = new StreamManager();
|
|
|
|
var result = mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "TEST",
|
|
Sources = [new StreamSourceConfig { Name = "S" }],
|
|
AllowMsgSchedules = true,
|
|
});
|
|
|
|
result.Error.ShouldNotBeNull();
|
|
result.Error!.Description.ShouldContain("message schedules");
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Mirror: normalization strips subjects (recovery from bad config)
|
|
// Go reference: TestJetStreamRecoverBadMirrorConfigWithSubjects — jetstream_test.go:11255
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: mirror streams must not carry subject lists — they inherit subjects from origin.
|
|
// When recovering a bad mirror config that has subjects, the server clears them.
|
|
// Reference: server/stream.go:1020-1025 (clearMirrorSubjects recovery path)
|
|
public void Mirror_stream_subjects_are_cleared_on_creation()
|
|
{
|
|
var mgr = new StreamManager();
|
|
mgr.CreateOrUpdate(new StreamConfig { Name = "S", Subjects = ["foo"] });
|
|
|
|
var result = mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "M",
|
|
Subjects = ["foo", "bar", "baz"],
|
|
Mirror = "S",
|
|
});
|
|
|
|
result.Error.ShouldBeNull();
|
|
result.StreamInfo!.Config.Subjects.ShouldBeEmpty();
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Source: work queue with limit
|
|
// Go reference: TestJetStreamSourceWorkingQueueWithLimit — jetstream_test.go:9677
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: sourcing into a WorkQueue-retention stream with MaxMsgs limit.
|
|
// We verify the source coordinator applies the subject filter.
|
|
public async Task Source_work_queue_with_limit_retains_filtered_messages()
|
|
{
|
|
var origin = new MemStore();
|
|
var target = new MemStore();
|
|
|
|
for (var i = 1; i <= 20; i++)
|
|
await origin.AppendAsync($"orders.{i}", System.Text.Encoding.UTF8.GetBytes($"p{i}"), default);
|
|
|
|
var sourceCfg = new StreamSourceConfig { Name = "ORIGIN", FilterSubject = "orders.*" };
|
|
await using var source = new SourceCoordinator(target, sourceCfg);
|
|
source.StartPullSyncLoop(origin);
|
|
|
|
await WaitForConditionAsync(() => source.LastOriginSequence >= 20, TimeSpan.FromSeconds(5));
|
|
|
|
var state = await target.GetStateAsync(default);
|
|
state.Messages.ShouldBe(20UL);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Source: from KV bucket stream
|
|
// Go reference: TestJetStreamStreamSourceFromKV — jetstream_test.go:9749
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: sourcing from a KV bucket stream (streams with $KV.<bucket>.> subjects).
|
|
public async Task Source_from_kv_bucket_stream_pulls_key_value_messages()
|
|
{
|
|
var kvOrigin = new MemStore();
|
|
|
|
await kvOrigin.AppendAsync("$KV.BUCKET.key1", "val1"u8.ToArray(), default);
|
|
await kvOrigin.AppendAsync("$KV.BUCKET.key2", "val2"u8.ToArray(), default);
|
|
await kvOrigin.AppendAsync("$KV.BUCKET.key1", "val1-updated"u8.ToArray(), default);
|
|
|
|
var target = new MemStore();
|
|
var sourceCfg = new StreamSourceConfig { Name = "KV_BUCKET", FilterSubject = "$KV.BUCKET.>" };
|
|
await using var source = new SourceCoordinator(target, sourceCfg);
|
|
source.StartPullSyncLoop(kvOrigin);
|
|
|
|
await WaitForConditionAsync(() => source.LastOriginSequence >= 3, TimeSpan.FromSeconds(5));
|
|
|
|
var state = await target.GetStateAsync(default);
|
|
state.Messages.ShouldBe(3UL);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Source: removal and re-add
|
|
// Go reference: TestJetStreamSourceRemovalAndReAdd — jetstream_test.go:17931
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: removing a source from a stream stops new messages being forwarded;
|
|
// re-adding the source makes new messages flow again.
|
|
public async Task Source_removal_and_readd_resumes_forwarding()
|
|
{
|
|
var mgr = new StreamManager();
|
|
mgr.CreateOrUpdate(new StreamConfig { Name = "SRC", Subjects = ["foo.*"] });
|
|
mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "TEST",
|
|
Subjects = [],
|
|
Sources = [new StreamSourceConfig { Name = "SRC" }],
|
|
});
|
|
|
|
for (var i = 0; i < 10; i++)
|
|
mgr.Capture($"foo.{i}", System.Text.Encoding.UTF8.GetBytes($"p{i}"));
|
|
|
|
var stateAfterFirst = await mgr.GetStateAsync("TEST", default);
|
|
stateAfterFirst.Messages.ShouldBe(10UL);
|
|
|
|
// Remove source — rebuild coordinators removes the source link
|
|
mgr.CreateOrUpdate(new StreamConfig { Name = "TEST", Subjects = [] });
|
|
|
|
for (var i = 10; i < 20; i++)
|
|
mgr.Capture($"foo.{i}", System.Text.Encoding.UTF8.GetBytes($"p{i}"));
|
|
|
|
var stateAfterRemove = await mgr.GetStateAsync("TEST", default);
|
|
stateAfterRemove.Messages.ShouldBe(10UL);
|
|
|
|
// Re-add source — new messages flow again
|
|
mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "TEST",
|
|
Subjects = [],
|
|
Sources = [new StreamSourceConfig { Name = "SRC" }],
|
|
});
|
|
|
|
mgr.Capture("foo.99", "new"u8.ToArray());
|
|
|
|
var stateAfterReadd = await mgr.GetStateAsync("TEST", default);
|
|
stateAfterReadd.Messages.ShouldBe(11UL);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Source: clipping OptStartSeq
|
|
// Go reference: TestJetStreamSourcingClipStartSeq — jetstream_test.go:18160
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: when OptStartSeq for a source exceeds the origin's last sequence, the source
|
|
// coordinator starts from the origin's end without crashing.
|
|
public async Task Source_coordinator_clips_start_seq_beyond_origin_end()
|
|
{
|
|
var origin = new MemStore();
|
|
var target = new MemStore();
|
|
|
|
for (var i = 0; i < 10; i++)
|
|
await origin.AppendAsync($"test.{i}", System.Text.Encoding.UTF8.GetBytes($"p{i}"), default);
|
|
|
|
var sourceCfg = new StreamSourceConfig { Name = "ORIGIN" };
|
|
await using var source = new SourceCoordinator(target, sourceCfg);
|
|
source.StartPullSyncLoop(origin);
|
|
|
|
await WaitForConditionAsync(() => source.LastOriginSequence >= 10, TimeSpan.FromSeconds(5));
|
|
|
|
source.LastOriginSequence.ShouldBe(10UL);
|
|
var state = await target.GetStateAsync(default);
|
|
state.Messages.ShouldBe(10UL);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Input subject transform
|
|
// Go reference: TestJetStreamInputTransform — jetstream_test.go:9803
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: TestJetStreamInputTransform — when a stream has SubjectTransform configured,
|
|
// messages published to the original subject are stored under the transformed subject.
|
|
// E.g., source=">" dest="transformed.>" → "foo" stored as "transformed.foo".
|
|
public async Task Input_transform_stores_message_under_transformed_subject()
|
|
{
|
|
var mgr = new StreamManager();
|
|
mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "T1",
|
|
Subjects = ["foo"],
|
|
SubjectTransformSource = ">",
|
|
SubjectTransformDest = "transformed.>",
|
|
});
|
|
|
|
var ack = mgr.Capture("foo", "OK"u8.ToArray());
|
|
ack.ShouldNotBeNull();
|
|
ack!.ErrorCode.ShouldBeNull();
|
|
|
|
var msg = mgr.GetMessage("T1", 1);
|
|
msg.ShouldNotBeNull();
|
|
msg!.Subject.ShouldBe("transformed.foo");
|
|
|
|
_ = await mgr.GetStateAsync("T1", default);
|
|
}
|
|
|
|
[Fact]
|
|
// Go: TestJetStreamImplicitRePublishAfterSubjectTransform — jetstream_test.go:22180
|
|
// After input transform the stored subject is the transformed one.
|
|
public async Task Input_transform_followed_by_correct_stored_subject()
|
|
{
|
|
var mgr = new StreamManager();
|
|
mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "T2",
|
|
Subjects = ["events.>"],
|
|
SubjectTransformSource = "events.>",
|
|
SubjectTransformDest = "stored.>",
|
|
});
|
|
|
|
mgr.Capture("events.login", "data"u8.ToArray());
|
|
|
|
var msg = mgr.GetMessage("T2", 1);
|
|
msg.ShouldNotBeNull();
|
|
msg!.Subject.ShouldBe("stored.login");
|
|
|
|
var state = await mgr.GetStateAsync("T2", default);
|
|
state.Messages.ShouldBe(1UL);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Republish: cycle detection
|
|
// Go reference: TestJetStreamStreamRepublishCycle — jetstream_test.go:13230
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: stream configuration for republish destination must not form a cycle.
|
|
// Reference: server/stream.go:1060-1080 (checkRePublish)
|
|
public void Republish_cycle_detection_rejects_cyclic_destination()
|
|
{
|
|
var mgr = new StreamManager();
|
|
|
|
// Case 1: source=foo.> dest=foo.> — exact cycle
|
|
var result1 = mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "RPC1",
|
|
Subjects = ["foo.>", "bar.*", "baz"],
|
|
RePublishSource = "foo.>",
|
|
RePublishDest = "foo.>",
|
|
});
|
|
result1.Error.ShouldNotBeNull();
|
|
result1.Error!.Description.ShouldContain("cycle");
|
|
|
|
// Case 2: dest=foo.bar matches foo.> stream subject → cycle
|
|
var result2 = mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "RPC2",
|
|
Subjects = ["foo.>", "bar.*", "baz"],
|
|
RePublishSource = "bar.bar",
|
|
RePublishDest = "foo.bar",
|
|
});
|
|
result2.Error.ShouldNotBeNull();
|
|
|
|
// Case 3: dest=bar.bar matches bar.* stream subject → cycle
|
|
var result3 = mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "RPC3",
|
|
Subjects = ["foo.>", "bar.*", "baz"],
|
|
RePublishSource = "baz",
|
|
RePublishDest = "bar.bar",
|
|
});
|
|
result3.Error.ShouldNotBeNull();
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Republish: single-token match
|
|
// Go reference: TestJetStreamStreamRepublishOneTokenMatch — jetstream_test.go:13283
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: when RePublishSource="one" and dest="uno", messages captured to "one" are stored.
|
|
public async Task Republish_single_token_match_accepted_and_captures()
|
|
{
|
|
var mgr = new StreamManager();
|
|
var result = mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "Stream1",
|
|
Subjects = ["one", "four"],
|
|
RePublishSource = "one",
|
|
RePublishDest = "uno",
|
|
RePublishHeadersOnly = false,
|
|
});
|
|
result.Error.ShouldBeNull();
|
|
|
|
for (var i = 0; i < 10; i++)
|
|
mgr.Capture("one", "msg"u8.ToArray());
|
|
|
|
var state = await mgr.GetStateAsync("Stream1", default);
|
|
state.Messages.ShouldBe(10UL);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Republish: multi-token match
|
|
// Go reference: TestJetStreamStreamRepublishMultiTokenMatch — jetstream_test.go:13325
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: RePublishSource="one.two.>" dest="uno.dos.>" — captures work for "one.two.three".
|
|
public async Task Republish_multi_token_match_accepted_and_captures()
|
|
{
|
|
var mgr = new StreamManager();
|
|
var result = mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "Stream1",
|
|
Subjects = ["one.>", "four.>"],
|
|
RePublishSource = "one.two.>",
|
|
RePublishDest = "uno.dos.>",
|
|
RePublishHeadersOnly = false,
|
|
});
|
|
result.Error.ShouldBeNull();
|
|
|
|
for (var i = 0; i < 10; i++)
|
|
mgr.Capture("one.two.three", "msg"u8.ToArray());
|
|
|
|
var state = await mgr.GetStateAsync("Stream1", default);
|
|
state.Messages.ShouldBe(10UL);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Republish: any-subject match (empty source filter)
|
|
// Go reference: TestJetStreamStreamRepublishAnySubjectMatch — jetstream_test.go:13367
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: when RePublishSource is null/empty all subjects are republished.
|
|
public async Task Republish_any_subject_match_accepted_when_source_is_empty()
|
|
{
|
|
var mgr = new StreamManager();
|
|
var result = mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "Stream1",
|
|
Subjects = ["one.>", "four.>"],
|
|
RePublishSource = null,
|
|
RePublishDest = "any.>",
|
|
RePublishHeadersOnly = false,
|
|
});
|
|
result.Error.ShouldBeNull();
|
|
|
|
mgr.Capture("one.two.three", "msg"u8.ToArray());
|
|
mgr.Capture("four.five.six", "msg"u8.ToArray());
|
|
|
|
var state = await mgr.GetStateAsync("Stream1", default);
|
|
state.Messages.ShouldBe(2UL);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Republish: multi-token no match
|
|
// Go reference: TestJetStreamStreamRepublishMultiTokenNoMatch — jetstream_test.go:13408
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: publishing to "four.five.six" when filter is "one.two.>" should NOT
|
|
// trigger republish — message is still stored normally in stream.
|
|
public async Task Republish_multi_token_no_match_still_captures_to_stream()
|
|
{
|
|
var mgr = new StreamManager();
|
|
var result = mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "Stream1",
|
|
Subjects = ["one.>", "four.>"],
|
|
RePublishSource = "one.two.>",
|
|
RePublishDest = "uno.dos.>",
|
|
RePublishHeadersOnly = true,
|
|
});
|
|
result.Error.ShouldBeNull();
|
|
|
|
for (var i = 0; i < 10; i++)
|
|
mgr.Capture("four.five.six", "msg"u8.ToArray());
|
|
|
|
var state = await mgr.GetStateAsync("Stream1", default);
|
|
state.Messages.ShouldBe(10UL);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Republish: single-token no match
|
|
// Go reference: TestJetStreamStreamRepublishOneTokenNoMatch — jetstream_test.go:13445
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: publishing to "four" when source="one" should not trigger republish.
|
|
// Message is still stored in stream normally.
|
|
public async Task Republish_single_token_no_match_still_captures_to_stream()
|
|
{
|
|
var mgr = new StreamManager();
|
|
var result = mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "Stream1",
|
|
Subjects = ["one", "four"],
|
|
RePublishSource = "one",
|
|
RePublishDest = "uno",
|
|
RePublishHeadersOnly = true,
|
|
});
|
|
result.Error.ShouldBeNull();
|
|
|
|
for (var i = 0; i < 10; i++)
|
|
mgr.Capture("four", "msg"u8.ToArray());
|
|
|
|
var state = await mgr.GetStateAsync("Stream1", default);
|
|
state.Messages.ShouldBe(10UL);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Republish: headers-only config
|
|
// Go reference: TestJetStreamStreamRepublishHeadersOnly — jetstream_test.go:13482
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Go: with RePublishHeadersOnly=true config is accepted; body omission is external.
|
|
public async Task Republish_headers_only_config_accepted_and_captures()
|
|
{
|
|
var mgr = new StreamManager();
|
|
var result = mgr.CreateOrUpdate(new StreamConfig
|
|
{
|
|
Name = "RPC",
|
|
Subjects = ["foo", "bar", "baz"],
|
|
RePublishDest = "RP.>",
|
|
RePublishHeadersOnly = true,
|
|
});
|
|
result.Error.ShouldBeNull();
|
|
result.StreamInfo!.Config.RePublishHeadersOnly.ShouldBeTrue();
|
|
|
|
for (var i = 0; i < 10; i++)
|
|
mgr.Capture("foo", "msg"u8.ToArray());
|
|
|
|
var state = await mgr.GetStateAsync("RPC", default);
|
|
state.Messages.ShouldBe(10UL);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// MirrorCoordinator: lag tracking
|
|
// Go reference: server/stream.go:2739-2743 (mirrorInfo)
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Verify mirror coordinator lag calculation used by health reporting.
|
|
public async Task Mirror_coordinator_tracks_lag_from_origin()
|
|
{
|
|
var target = new MemStore();
|
|
var mirror = new MirrorCoordinator(target);
|
|
|
|
var report = mirror.GetHealthReport(originLastSeq: 10);
|
|
report.Lag.ShouldBe(10UL);
|
|
report.IsRunning.ShouldBeFalse();
|
|
|
|
await mirror.OnOriginAppendAsync(
|
|
new StoredMessage { Sequence = 7, Subject = "a", Payload = "p"u8.ToArray() }, default);
|
|
|
|
report = mirror.GetHealthReport(originLastSeq: 10);
|
|
report.LastOriginSequence.ShouldBe(7UL);
|
|
report.Lag.ShouldBe(3UL);
|
|
|
|
await mirror.OnOriginAppendAsync(
|
|
new StoredMessage { Sequence = 10, Subject = "b", Payload = "p"u8.ToArray() }, default);
|
|
|
|
report = mirror.GetHealthReport(originLastSeq: 10);
|
|
report.Lag.ShouldBe(0UL);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// SourceCoordinator: filter + transform
|
|
// Go reference: server/stream.go:3860-4007 (processInboundSourceMsg)
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
// Verify SourceCoordinator filters by subject and applies prefix transform.
|
|
public async Task Source_coordinator_filters_and_transforms_subjects()
|
|
{
|
|
var target = new MemStore();
|
|
var sourceCfg = new StreamSourceConfig
|
|
{
|
|
Name = "SRC",
|
|
FilterSubject = "orders.*",
|
|
SubjectTransformPrefix = "copy.",
|
|
};
|
|
var source = new SourceCoordinator(target, sourceCfg);
|
|
|
|
await source.OnOriginAppendAsync(
|
|
new StoredMessage { Sequence = 1, Subject = "orders.created", Payload = "1"u8.ToArray() }, default);
|
|
await source.OnOriginAppendAsync(
|
|
new StoredMessage { Sequence = 2, Subject = "inventory.updated", Payload = "2"u8.ToArray() }, default);
|
|
await source.OnOriginAppendAsync(
|
|
new StoredMessage { Sequence = 3, Subject = "orders.deleted", Payload = "3"u8.ToArray() }, default);
|
|
|
|
var state = await target.GetStateAsync(default);
|
|
state.Messages.ShouldBe(2UL);
|
|
|
|
var msg1 = await target.LoadAsync(1, default);
|
|
msg1!.Subject.ShouldBe("copy.orders.created");
|
|
|
|
var msg2 = await target.LoadAsync(2, default);
|
|
msg2!.Subject.ShouldBe("copy.orders.deleted");
|
|
|
|
source.FilteredOutCount.ShouldBe(1L);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// SourceCoordinator: pull sync loop with filter
|
|
// Go reference: server/stream.go:3474-3720 (setupSourceConsumer)
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
public async Task Source_coordinator_pull_sync_loop_syncs_filtered_messages()
|
|
{
|
|
var origin = new MemStore();
|
|
var target = new MemStore();
|
|
|
|
await origin.AppendAsync("orders.1", "p1"u8.ToArray(), default);
|
|
await origin.AppendAsync("inventory.1", "p2"u8.ToArray(), default);
|
|
await origin.AppendAsync("orders.2", "p3"u8.ToArray(), default);
|
|
await origin.AppendAsync("inventory.2", "p4"u8.ToArray(), default);
|
|
await origin.AppendAsync("orders.3", "p5"u8.ToArray(), default);
|
|
|
|
var sourceCfg = new StreamSourceConfig { Name = "ORIGIN", FilterSubject = "orders.*" };
|
|
await using var source = new SourceCoordinator(target, sourceCfg);
|
|
source.StartPullSyncLoop(origin);
|
|
|
|
await WaitForConditionAsync(() => source.LastOriginSequence >= 5, TimeSpan.FromSeconds(5));
|
|
|
|
var state = await target.GetStateAsync(default);
|
|
state.Messages.ShouldBe(3UL);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// MirrorCoordinator: ignores redelivered messages
|
|
// Go reference: server/stream.go:2924 (dc > 1 check)
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact]
|
|
public async Task Mirror_coordinator_ignores_redelivered_messages_in_channel_loop()
|
|
{
|
|
var target = new MemStore();
|
|
await using var mirror = new MirrorCoordinator(target);
|
|
mirror.StartSyncLoop();
|
|
|
|
mirror.TryEnqueue(new StoredMessage { Sequence = 1, Subject = "a", Payload = "1"u8.ToArray() });
|
|
mirror.TryEnqueue(new StoredMessage
|
|
{
|
|
Sequence = 1,
|
|
Subject = "a",
|
|
Payload = "1"u8.ToArray(),
|
|
Redelivered = true,
|
|
});
|
|
mirror.TryEnqueue(new StoredMessage { Sequence = 2, Subject = "b", Payload = "2"u8.ToArray() });
|
|
|
|
await WaitForConditionAsync(() => mirror.LastOriginSequence >= 2, TimeSpan.FromSeconds(5));
|
|
|
|
var state = await target.GetStateAsync(default);
|
|
state.Messages.ShouldBe(2UL);
|
|
}
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Skipped tests (require real multi-server / external infrastructure)
|
|
// -------------------------------------------------------------------------
|
|
|
|
[Fact(Skip = "Requires real server restart to test consumer failover — TestJetStreamMirroredConsumerFailAfterRestart:10835")]
|
|
public Task Mirror_consumer_fails_after_restart_and_recovers() => Task.CompletedTask;
|
|
|
|
[Fact(Skip = "Requires real external source/leaf node — TestJetStreamRemoveExternalSource:12150")]
|
|
public Task Remove_external_source_stops_forwarding() => Task.CompletedTask;
|
|
|
|
[Fact(Skip = "Requires real server restart — TestJetStreamWorkQueueSourceRestart:13010")]
|
|
public Task Work_queue_source_recovers_after_restart() => Task.CompletedTask;
|
|
|
|
[Fact(Skip = "Requires real server restart — TestJetStreamWorkQueueSourceNamingRestart:13111")]
|
|
public Task Work_queue_source_naming_recovers_after_restart() => Task.CompletedTask;
|
|
|
|
[Fact(Skip = "Requires real external source stream — TestJetStreamStreamUpdateWithExternalSource:15607")]
|
|
public Task Stream_update_with_external_source_works() => Task.CompletedTask;
|
|
|
|
[Fact(Skip = "AllowMsgCounter requires real server infrastructure — TestJetStreamAllowMsgCounterSourceAggregates:20759")]
|
|
public Task Allow_msg_counter_source_aggregates() => Task.CompletedTask;
|
|
|
|
[Fact(Skip = "AllowMsgCounter requires real server infrastructure — TestJetStreamAllowMsgCounterSourceVerbatim:20844")]
|
|
public Task Allow_msg_counter_source_verbatim() => Task.CompletedTask;
|
|
|
|
[Fact(Skip = "AllowMsgCounter requires real server infrastructure — TestJetStreamAllowMsgCounterSourceStartingAboveZero:20944")]
|
|
public Task Allow_msg_counter_source_starting_above_zero() => Task.CompletedTask;
|
|
|
|
// -------------------------------------------------------------------------
|
|
// Helpers
|
|
// -------------------------------------------------------------------------
|
|
|
|
private static async Task WaitForConditionAsync(Func<bool> condition, TimeSpan timeout)
|
|
{
|
|
using var cts = new CancellationTokenSource(timeout);
|
|
while (!condition())
|
|
{
|
|
await Task.Delay(25, cts.Token);
|
|
}
|
|
}
|
|
}
|