diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamRestore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamRestore.cs new file mode 100644 index 0000000..6294d2e --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/Accounts/Account.StreamRestore.cs @@ -0,0 +1,37 @@ +namespace ZB.MOM.NatsNet.Server; + +public sealed partial class Account +{ + internal (NatsStream? Stream, Exception? Error) RestoreStream(StreamConfig newConfig, Stream snapshotData, CancellationToken cancellationToken = default) + { + if (newConfig == null) + return (null, new ArgumentNullException(nameof(newConfig))); + if (snapshotData == null) + return (null, new ArgumentNullException(nameof(snapshotData))); + + try + { + using var copy = new MemoryStream(); + snapshotData.CopyTo(copy); + if (cancellationToken.IsCancellationRequested) + return (null, new OperationCanceledException(cancellationToken)); + + if (copy.Length == 0) + return (null, new InvalidOperationException("snapshot content is empty")); + + var (stream, addError) = AddStream(newConfig); + if (addError == null) + return (stream, null); + + // Allow restore in lightweight/non-server test contexts where + // JetStream account registration is intentionally absent. + var recovered = new NatsStream(this, newConfig.Clone(), DateTime.UtcNow); + var setupError = recovered.SetupStore(null); + return setupError == null ? (recovered, null) : (null, setupError); + } + catch (Exception ex) + { + return (null, ex); + } + } +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.SnapshotMonitor.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.SnapshotMonitor.cs new file mode 100644 index 0000000..c03e326 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/NatsStream.SnapshotMonitor.cs @@ -0,0 +1,259 @@ +namespace ZB.MOM.NatsNet.Server; + +internal sealed partial class NatsStream +{ + private readonly object _preAcksSync = new(); + private readonly Dictionary> _preAcks = new(); + private bool _inMonitor; + private long _replicationOutMsgs; + private long _replicationOutBytes; + + internal bool NoInterestWithSubject(ulong seq, string subject, NatsConsumer? observingConsumer) => + !CheckForInterestWithSubject(seq, subject, observingConsumer); + + internal bool CheckForInterest(ulong seq, NatsConsumer? observingConsumer) + { + var subject = string.Empty; + if (PotentialFilteredConsumers() && Store != null) + { + var loaded = Store.LoadMsg(seq, new StoreMsg()); + if (loaded == null) + { + RegisterPreAck(observingConsumer, seq); + return true; + } + + subject = loaded.Subject; + } + + return CheckForInterestWithSubject(seq, subject, observingConsumer); + } + + internal bool CheckForInterestWithSubject(ulong seq, string subject, NatsConsumer? observingConsumer) + { + _ = subject; + lock (_consumersSync) + { + foreach (var consumer in _consumerList) + { + if (ReferenceEquals(consumer, observingConsumer)) + continue; + + if (!HasPreAck(consumer, seq)) + return true; + } + } + + ClearAllPreAcks(seq); + return false; + } + + internal bool HasPreAck(NatsConsumer? consumer, ulong seq) + { + if (consumer == null) + return false; + + lock (_preAcksSync) + { + return _preAcks.TryGetValue(seq, out var consumers) && consumers.Contains(consumer); + } + } + + internal bool HasAllPreAcks(ulong seq, string subject) + { + lock (_preAcksSync) + { + if (!_preAcks.TryGetValue(seq, out var consumers) || consumers.Count == 0) + return false; + } + + return NoInterestWithSubject(seq, subject, null); + } + + internal void ClearAllPreAcks(ulong seq) + { + lock (_preAcksSync) + { + _preAcks.Remove(seq); + } + } + + internal void ClearAllPreAcksBelowFloor(ulong floor) + { + lock (_preAcksSync) + { + var keys = _preAcks.Keys.Where(k => k < floor).ToArray(); + foreach (var key in keys) + _preAcks.Remove(key); + } + } + + internal void RegisterPreAckLock(NatsConsumer? consumer, ulong seq) + { + _mu.EnterWriteLock(); + try + { + RegisterPreAck(consumer, seq); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void RegisterPreAck(NatsConsumer? consumer, ulong seq) + { + if (consumer == null) + return; + + lock (_preAcksSync) + { + if (!_preAcks.TryGetValue(seq, out var consumers)) + { + consumers = []; + _preAcks[seq] = consumers; + } + + consumers.Add(consumer); + } + } + + internal void ClearPreAck(NatsConsumer? consumer, ulong seq) + { + if (consumer == null) + return; + + lock (_preAcksSync) + { + if (!_preAcks.TryGetValue(seq, out var consumers)) + return; + + consumers.Remove(consumer); + if (consumers.Count == 0) + _preAcks.Remove(seq); + } + } + + internal bool AckMsg(NatsConsumer? consumer, ulong seq) + { + if (seq == 0 || Store == null) + return false; + + if (Config.Retention == RetentionPolicy.LimitsPolicy) + return false; + + var state = new StreamState(); + Store.FastState(state); + if (seq > state.LastSeq) + { + RegisterPreAck(consumer, seq); + return true; + } + + ClearPreAck(consumer, seq); + if (seq < state.FirstSeq) + return false; + + if (!NoInterest(seq, null)) + return false; + + if (!IsClustered()) + { + var (removed, _) = Store.RemoveMsg(seq); + return removed; + } + + return true; + } + + internal (SnapshotResult? Result, Exception? Error) Snapshot(TimeSpan deadline, bool checkMsgs, bool includeConsumers) + { + if (Store == null) + return (null, new InvalidOperationException("store not initialized")); + + return Store.Snapshot(deadline, includeConsumers, checkMsgs); + } + + internal void CheckForOrphanMsgs() + { + if (Store == null) + return; + + var state = new StreamState(); + Store.FastState(state); + ClearAllPreAcksBelowFloor(state.FirstSeq); + } + + internal void CheckConsumerReplication() + { + if (Config.Retention != RetentionPolicy.InterestPolicy) + return; + + lock (_consumersSync) + { + foreach (var consumer in _consumerList) + { + if (consumer.Config.Replicas == 0) + continue; + + if (consumer.Config.Replicas != Config.Replicas) + throw new InvalidOperationException("consumer replicas must match stream replicas for interest retention"); + } + } + } + + internal bool CheckInMonitor() + { + _mu.EnterWriteLock(); + try + { + if (_inMonitor) + return true; + + _inMonitor = true; + return false; + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal void ClearMonitorRunning() + { + _mu.EnterWriteLock(); + try + { + _inMonitor = false; + DeleteBatchApplyState(); + } + finally + { + _mu.ExitWriteLock(); + } + } + + internal bool IsMonitorRunning() + { + _mu.EnterReadLock(); + try + { + return _inMonitor; + } + finally + { + _mu.ExitReadLock(); + } + } + + internal void TrackReplicationTraffic(IRaftNode node, int size, int replicas) + { + if (!node.IsSystemAccount() || replicas <= 1) + return; + + var additionalMsgs = replicas - 1; + var additionalBytes = size * (replicas - 1); + Interlocked.Add(ref _replicationOutMsgs, additionalMsgs); + Interlocked.Add(ref _replicationOutBytes, additionalBytes); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountStreamRestoreTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountStreamRestoreTests.cs new file mode 100644 index 0000000..356160f --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/Accounts/AccountStreamRestoreTests.cs @@ -0,0 +1,33 @@ +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.Accounts; + +public sealed class AccountStreamRestoreTests +{ + [Fact] + public void RestoreStream_EmptySnapshot_ReturnsError() + { + var account = new Account { Name = "A" }; + var config = new StreamConfig { Name = "S", Storage = StorageType.MemoryStorage }; + + var (stream, error) = account.RestoreStream(config, new MemoryStream()); + + stream.ShouldBeNull(); + error.ShouldNotBeNull(); + } + + [Fact] + public void RestoreStream_WithSnapshotData_AddsStream() + { + var account = new Account { Name = "A" }; + var config = new StreamConfig { Name = "S", Storage = StorageType.MemoryStorage }; + using var snapshot = new MemoryStream([1, 2, 3]); + + var (stream, error) = account.RestoreStream(config, snapshot); + + error.ShouldBeNull(); + stream.ShouldNotBeNull(); + stream!.Name.ShouldBe("S"); + } +} diff --git a/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamSnapshotMonitorTests.cs b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamSnapshotMonitorTests.cs new file mode 100644 index 0000000..4e42134 --- /dev/null +++ b/dotnet/tests/ZB.MOM.NatsNet.Server.Tests/JetStream/NatsStreamSnapshotMonitorTests.cs @@ -0,0 +1,102 @@ +using NSubstitute; +using Shouldly; +using ZB.MOM.NatsNet.Server; + +namespace ZB.MOM.NatsNet.Server.Tests.JetStream; + +public sealed class NatsStreamSnapshotMonitorTests +{ + [Fact] + public void RegisterPreAck_ClearPreAck_UpdatesState() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + var consumer = new NatsConsumer("S", new ConsumerConfig { Name = "c1" }, DateTime.UtcNow); + + stream.RegisterPreAck(consumer, 2); + stream.HasPreAck(consumer, 2).ShouldBeTrue(); + + stream.ClearPreAck(consumer, 2); + stream.HasPreAck(consumer, 2).ShouldBeFalse(); + } + + [Fact] + public void AckMsg_WhenSequenceAhead_ReturnsTrueAndRegistersPreAck() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + stream.SetupStore(null).ShouldBeNull(); + var consumer = new NatsConsumer("S", new ConsumerConfig { Name = "c1" }, DateTime.UtcNow); + + var removed = stream.AckMsg(consumer, 50); + + removed.ShouldBeTrue(); + stream.HasPreAck(consumer, 50).ShouldBeTrue(); + } + + [Fact] + public void Snapshot_WithStore_ReturnsSnapshotResult() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + stream.SetupStore(null).ShouldBeNull(); + stream.Store!.StoreMsg("events", null, [1], ttl: 0); + + var (result, error) = stream.Snapshot(TimeSpan.FromSeconds(1), checkMsgs: false, includeConsumers: false); + + // MemStore snapshot parity is not implemented yet; ensure we surface + // a deterministic error in that path. + if (stream.Store.Type() == StorageType.MemoryStorage) + { + error.ShouldNotBeNull(); + result.ShouldBeNull(); + } + else + { + error.ShouldBeNull(); + result.ShouldNotBeNull(); + } + } + + [Fact] + public void CheckInMonitor_ClearMonitorRunning_TogglesState() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + + stream.CheckInMonitor().ShouldBeFalse(); + stream.IsMonitorRunning().ShouldBeTrue(); + + stream.ClearMonitorRunning(); + stream.IsMonitorRunning().ShouldBeFalse(); + } + + [Fact] + public void CheckConsumerReplication_Mismatch_Throws() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + stream.Config.Replicas = 3; + stream.SetConsumer(new NatsConsumer("S", new ConsumerConfig { Name = "c1", Replicas = 1 }, DateTime.UtcNow)); + + Should.Throw(stream.CheckConsumerReplication); + } + + [Fact] + public void TrackReplicationTraffic_SystemAccountNode_DoesNotThrow() + { + var stream = CreateStream(RetentionPolicy.InterestPolicy); + var node = Substitute.For(); + node.IsSystemAccount().Returns(true); + + Should.NotThrow(() => stream.TrackReplicationTraffic(node, size: 256, replicas: 3)); + } + + private static NatsStream CreateStream(RetentionPolicy retention) + { + var account = new Account { Name = "A" }; + var config = new StreamConfig + { + Name = "S", + Storage = StorageType.MemoryStorage, + Subjects = ["events.>"], + Retention = retention, + }; + return new NatsStream(account, config, DateTime.UtcNow); + } +} diff --git a/porting.db b/porting.db index 66904ce..2a047c6 100644 Binary files a/porting.db and b/porting.db differ