batch36 task4 stream mirror lifecycle helpers

This commit is contained in:
Joseph Doherty
2026-02-28 23:02:51 -05:00
parent 3c974fbe55
commit 856cd17554
5 changed files with 315 additions and 14 deletions

View File

@@ -185,20 +185,10 @@ internal sealed partial class NatsStream
};
internal Exception? Update(StreamConfig config)
{
if (config == null)
return new ArgumentNullException(nameof(config));
UpdateConfig(config);
Exception? error = null;
return error;
}
=> UpdateWithAdvisory(config, sendAdvisory: true, pedantic: false);
internal Exception? UpdatePedantic(StreamConfig config, bool pedantic)
{
_ = pedantic;
return Update(config);
}
=> UpdateWithAdvisory(config, sendAdvisory: true, pedantic);
internal StreamAssignment? StreamAssignment()
{

View File

@@ -0,0 +1,270 @@
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
private static readonly TimeSpan RetryBackoff = TimeSpan.FromSeconds(5);
private static readonly TimeSpan RetryMaximum = TimeSpan.FromMinutes(2);
private static readonly TimeSpan SourceConsumerRetryThreshold = TimeSpan.FromSeconds(10);
internal Exception? UpdateWithAdvisory(StreamConfig config, bool sendAdvisory, bool pedantic)
{
_ = pedantic;
if (config == null)
return new ArgumentNullException(nameof(config));
UpdateConfig(config);
Exception? error = null;
if (sendAdvisory)
SendUpdateAdvisoryLocked();
return error;
}
internal string GetCfgName()
{
_mu.EnterReadLock();
try
{
return Config.Name ?? string.Empty;
}
finally
{
_mu.ExitReadLock();
}
}
internal (ulong Purged, Exception? Error) PurgeLocked(StreamPurgeRequest? request, bool needLock)
{
if (needLock)
_mu.EnterWriteLock();
try
{
if (_closed)
return (0UL, new InvalidOperationException("stream closed"));
if (_sealed)
return (0UL, new InvalidOperationException("sealed stream"));
if (Store == null)
return (0UL, new InvalidOperationException("stream store unavailable"));
var result = request == null
? Store.Purge()
: Store.PurgeEx(request.Filter ?? string.Empty, request.Sequence, request.Keep);
SyncCountersFromState(Store.State());
return result;
}
finally
{
if (needLock)
_mu.ExitWriteLock();
}
}
internal (bool Removed, Exception? Error) RemoveMsg(ulong sequence)
=> DeleteMsg(sequence);
internal (bool Removed, Exception? Error) DeleteMsg(ulong sequence)
{
_mu.EnterWriteLock();
try
{
if (_closed)
return (false, new InvalidOperationException("stream closed"));
if (Store == null)
return (false, new InvalidOperationException("stream store unavailable"));
var result = Store.RemoveMsg(sequence);
if (result.Error == null)
SyncCountersFromState(Store.State());
return result;
}
finally
{
_mu.ExitWriteLock();
}
}
internal (bool Removed, Exception? Error) EraseMsg(ulong sequence)
{
_mu.EnterWriteLock();
try
{
if (_closed)
return (false, new InvalidOperationException("stream closed"));
if (Store == null)
return (false, new InvalidOperationException("stream store unavailable"));
var result = Store.EraseMsg(sequence);
if (result.Error == null)
SyncCountersFromState(Store.State());
return result;
}
finally
{
_mu.ExitWriteLock();
}
}
internal bool IsMirror()
{
_mu.EnterReadLock();
try
{
return _isMirror || Config.Mirror != null;
}
finally
{
_mu.ExitReadLock();
}
}
internal StreamSourceInfo[] SourcesInfo()
{
_mu.EnterReadLock();
try
{
return _sources.Values.Select(SourceInfo).Where(static info => info != null).Cast<StreamSourceInfo>().ToArray();
}
finally
{
_mu.ExitReadLock();
}
}
internal StreamSourceInfo? SourceInfo(StreamSourceInfo? sourceInfo)
{
if (sourceInfo == null)
return null;
return new StreamSourceInfo
{
Name = sourceInfo.Name,
Lag = sourceInfo.Lag,
FilterSubject = sourceInfo.FilterSubject,
Active = sourceInfo.Active,
Error = sourceInfo.Error,
External = sourceInfo.External == null
? null
: new StreamSource
{
Name = sourceInfo.External.Name,
FilterSubject = sourceInfo.External.FilterSubject,
SubjectTransforms = sourceInfo.External.SubjectTransforms,
External = sourceInfo.External.External,
},
};
}
internal StreamSourceInfo? MirrorInfo()
{
_mu.EnterReadLock();
try
{
return SourceInfo(_mirrorInfo);
}
finally
{
_mu.ExitReadLock();
}
}
internal void SetMirrorErr(JsApiError? error)
{
_mu.EnterWriteLock();
try
{
if (_mirrorInfo != null)
_mirrorInfo.Error = error?.Description;
}
finally
{
_mu.ExitWriteLock();
}
}
internal void CancelMirrorConsumer()
{
_mu.EnterWriteLock();
try
{
_mirrorConsumerSetupTimer?.Dispose();
_mirrorConsumerSetupTimer = null;
if (_mirrorInfo != null)
{
_mirrorInfo.Active = null;
_mirrorInfo.Error = null;
}
}
finally
{
_mu.ExitWriteLock();
}
}
internal Exception? RetryMirrorConsumer()
{
CancelMirrorConsumer();
Exception? error = null;
return error;
}
internal void SkipMsgs(ulong start, ulong end)
{
_mu.EnterWriteLock();
try
{
if (Store == null || start > end)
return;
var count = (end - start) + 1;
Store.SkipMsgs(start, count);
SetLastSeq(end);
}
finally
{
_mu.ExitWriteLock();
}
}
internal static TimeSpan CalculateRetryBackoff(int failures)
{
var backoff = TimeSpan.FromTicks(RetryBackoff.Ticks * Math.Max(1, failures * 2));
return backoff > RetryMaximum ? RetryMaximum : backoff;
}
internal void ScheduleSetupMirrorConsumerRetry()
{
_mu.EnterWriteLock();
try
{
var lastAttempt = _mirrorInfo?.Active ?? DateTime.UtcNow - SourceConsumerRetryThreshold;
var next = SourceConsumerRetryThreshold - (DateTime.UtcNow - lastAttempt);
if (next < TimeSpan.Zero)
next = TimeSpan.Zero;
var failures = _mirrorInfo == null ? 1 : (int)Math.Min(_mirrorInfo.Lag, int.MaxValue);
next += CalculateRetryBackoff(failures);
next += TimeSpan.FromMilliseconds(Random.Shared.Next(100, 201));
_mirrorConsumerSetupTimer?.Dispose();
_mirrorConsumerSetupTimer = new Timer(
static state =>
{
if (state is NatsStream stream)
stream.RetryMirrorConsumer();
},
this,
next,
Timeout.InfiniteTimeSpan);
}
finally
{
_mu.ExitWriteLock();
}
}
}

View File

@@ -37,7 +37,7 @@ internal sealed partial class NatsStream : IDisposable
internal long FirstSeq;
internal long LastSeq;
internal bool IsMirror;
private bool _isMirror;
private bool _closed;
private bool _isLeader;
@@ -50,6 +50,9 @@ internal sealed partial class NatsStream : IDisposable
private bool _clusterSubsActive;
private ulong _clseq;
private ulong _clfs;
private readonly Dictionary<string, StreamSourceInfo> _sources = new(StringComparer.Ordinal);
private StreamSourceInfo? _mirrorInfo;
private Timer? _mirrorConsumerSetupTimer;
/// <summary>IRaftNode — stored as object to avoid cross-dependency on Raft session.</summary>
private object? _node;
@@ -86,7 +89,7 @@ internal sealed partial class NatsStream : IDisposable
var stream = new NatsStream(acc, cfg.Clone(), DateTime.UtcNow)
{
Store = store,
IsMirror = cfg.Mirror != null,
_isMirror = cfg.Mirror != null,
};
return stream;
}

View File

@@ -123,6 +123,44 @@ public sealed class StreamLifecycleGroupBTests
jsa.SubjectsOverlap(["orders.created"], ownAssignment: null).ShouldBeTrue();
}
[Fact]
public void GroupCHelpers_GetCfgNameAndIsMirror_ReturnExpectedValues()
{
var mirrorCfg = new StreamConfig
{
Name = "MIRROR",
Storage = StorageType.MemoryStorage,
Mirror = new StreamSource { Name = "ORIGIN" },
};
var stream = CreateStream(mirrorCfg);
stream.GetCfgName().ShouldBe("MIRROR");
stream.IsMirror().ShouldBeTrue();
stream.MirrorInfo().ShouldBeNull();
}
[Fact]
public void GroupCHelpers_SourceInfoAndBackoff_Behave()
{
var stream = CreateStream();
var info = new StreamSourceInfo
{
Name = "SRC",
FilterSubject = "orders.*",
Lag = 10,
Error = "x",
};
var cloned = stream.SourceInfo(info);
cloned.ShouldNotBeNull();
cloned!.Name.ShouldBe("SRC");
cloned.FilterSubject.ShouldBe("orders.*");
NatsStream.CalculateRetryBackoff(1).ShouldBeGreaterThan(TimeSpan.Zero);
NatsStream.CalculateRetryBackoff(1000).ShouldBe(TimeSpan.FromMinutes(2));
}
private static NatsStream CreateStream(StreamConfig? cfg = null)
{
cfg ??= new StreamConfig { Name = "ORDERS", Subjects = ["orders.*"], Storage = StorageType.MemoryStorage };

Binary file not shown.