batch37 task2 implement group A message header methods

This commit is contained in:
Joseph Doherty
2026-02-28 23:37:09 -05:00
parent aaccc16b93
commit f0ea92b8dd
3 changed files with 377 additions and 0 deletions

View File

@@ -0,0 +1,298 @@
using System.Numerics;
using System.Text;
using ZB.MOM.NatsNet.Server.Internal;
namespace ZB.MOM.NatsNet.Server;
internal sealed partial class NatsStream
{
internal sealed class DedupeEntry
{
public required string Id { get; init; }
public ulong Seq { get; set; }
public long TimestampNanos { get; set; }
}
private readonly object _ddLock = new();
private Dictionary<string, DedupeEntry>? _ddMap;
private List<DedupeEntry>? _ddArr;
private int _ddIndex;
private Timer? _ddTimer;
internal void Unsubscribe(Subscription? sub)
{
if (sub == null)
return;
_mu.EnterReadLock();
try
{
if (_closed)
return;
}
finally
{
_mu.ExitReadLock();
}
sub.Close();
}
internal Exception? SetupStore(FileStoreConfig? fileStoreConfig)
{
_mu.EnterWriteLock();
try
{
if (Store != null)
{
RegisterStoreCallbacks(Store);
return null;
}
var streamConfig = Config.Clone();
IStreamStore store = streamConfig.Storage switch
{
StorageType.MemoryStorage => new JetStreamMemStore(streamConfig),
StorageType.FileStorage => BuildFileStore(fileStoreConfig, streamConfig),
_ => throw new InvalidOperationException($"unsupported storage type: {streamConfig.Storage}"),
};
Store = store;
RegisterStoreCallbacks(store);
return null;
}
catch (Exception ex)
{
return ex;
}
finally
{
_mu.ExitWriteLock();
}
}
private IStreamStore BuildFileStore(FileStoreConfig? fileStoreConfig, StreamConfig streamConfig)
{
var cfg = fileStoreConfig ?? FileStoreConfig();
if (string.IsNullOrWhiteSpace(cfg.StoreDir))
cfg.StoreDir = FileStoreConfig().StoreDir;
var fsi = new FileStreamInfo
{
Created = Created,
Config = streamConfig,
};
return new JetStreamFileStore(cfg, fsi);
}
private void RegisterStoreCallbacks(IStreamStore store)
{
store.RegisterStorageUpdates(StoreUpdates);
store.RegisterStorageRemoveMsg(_ => { });
store.RegisterProcessJetStreamMsg(_ => { });
}
internal void StoreUpdates(long msgs, long bytes, ulong seq, string subj)
{
_ = seq;
_ = subj;
Interlocked.Add(ref Msgs, msgs);
Interlocked.Add(ref Bytes, bytes);
}
internal int NumMsgIds()
{
lock (_ddLock)
{
return _ddMap?.Count ?? 0;
}
}
internal DedupeEntry? CheckMsgId(string id)
{
if (string.IsNullOrEmpty(id))
return null;
if (_ddMap == null || _ddMap.Count == 0)
return null;
return _ddMap.GetValueOrDefault(id);
}
internal void PurgeMsgIds()
{
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;
var window = (long)Config.Duplicates.TotalMilliseconds * 1_000_000L;
if (window <= 0)
{
lock (_ddLock)
{
_ddMap = null;
_ddArr = null;
_ddIndex = 0;
_ddTimer?.Dispose();
_ddTimer = null;
}
return;
}
lock (_ddLock)
{
if (_ddArr == null || _ddMap == null)
return;
for (var i = _ddIndex; i < _ddArr.Count; i++)
{
var entry = _ddArr[i];
if (now - entry.TimestampNanos >= window)
_ddMap.Remove(entry.Id);
else
{
_ddIndex = i;
break;
}
}
if (_ddMap.Count == 0)
{
_ddMap = null;
_ddArr = null;
_ddIndex = 0;
_ddTimer?.Dispose();
_ddTimer = null;
}
else
{
_ddTimer ??= new Timer(_ => PurgeMsgIds(), null, Config.Duplicates, Timeout.InfiniteTimeSpan);
_ddTimer.Change(Config.Duplicates, Timeout.InfiniteTimeSpan);
}
}
}
internal void StoreMsgId(DedupeEntry entry)
{
ArgumentNullException.ThrowIfNull(entry);
lock (_ddLock)
{
StoreMsgIdLocked(entry);
}
}
internal void StoreMsgIdLocked(DedupeEntry entry)
{
ArgumentNullException.ThrowIfNull(entry);
if (Config.Duplicates <= TimeSpan.Zero)
return;
_ddMap ??= new Dictionary<string, DedupeEntry>(StringComparer.Ordinal);
_ddArr ??= new List<DedupeEntry>();
_ddMap[entry.Id] = entry;
_ddArr.Add(entry);
_ddTimer ??= new Timer(_ => PurgeMsgIds(), null, Config.Duplicates, Timeout.InfiniteTimeSpan);
}
internal static string GetMsgId(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return string.Empty;
var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsMsgId, hdr);
return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value);
}
internal static string GetExpectedLastMsgId(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return string.Empty;
var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsExpectedLastMsgId, hdr);
return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value);
}
internal static string GetExpectedStream(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return string.Empty;
var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsExpectedStream, hdr);
return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value);
}
internal static (ulong Seq, bool Exists) GetExpectedLastSeq(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return (0, false);
var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsExpectedLastSeq, hdr);
if (value is null || value.Value.Length == 0)
return (0, false);
var seq = ServerUtilities.ParseInt64(value.Value.Span);
return seq < 0 ? (0, false) : ((ulong)seq, true);
}
internal static string GetRollup(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return string.Empty;
var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsMsgRollup, hdr);
return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value).ToLowerInvariant();
}
internal static (ulong Seq, bool Exists) GetExpectedLastSeqPerSubject(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return (0, false);
var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsExpectedLastSubjSeq, hdr);
if (value is null || value.Value.Length == 0)
return (0, false);
var seq = ServerUtilities.ParseInt64(value.Value.Span);
return seq < 0 ? (0, false) : ((ulong)seq, true);
}
internal static string GetExpectedLastSeqPerSubjectForSubject(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return string.Empty;
var value = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsExpectedLastSubjSeqSubj, hdr);
return value == null || value.Length == 0 ? string.Empty : Encoding.ASCII.GetString(value);
}
internal static (long Ttl, Exception? Error) ParseMessageTTL(string ttl) =>
JetStreamHeaderHelpers.ParseMessageTtl(ttl);
internal static (BigInteger? Value, bool Ok) GetMessageIncr(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return (null, true);
var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsMessageIncr, hdr);
if (value is null || value.Value.Length == 0)
return (null, true);
if (BigInteger.TryParse(Encoding.ASCII.GetString(value.Value.Span), out var parsed))
return (parsed, true);
return (null, false);
}
internal static (DateTime Schedule, bool Ok) GetMessageSchedule(byte[]? hdr)
{
if (hdr == null || hdr.Length == 0)
return (default, true);
var value = NatsMessageHeaders.SliceHeader(NatsHeaderConstants.JsSchedulePattern, hdr);
if (value is null || value.Value.Length == 0)
return (default, true);
var ts = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;
var pattern = Encoding.ASCII.GetString(value.Value.Span);
var (schedule, _, ok) = Internal.MsgScheduling.ParseMsgSchedule(pattern, ts);
return (schedule, ok);
}
}

View File

@@ -0,0 +1,79 @@
using System.Numerics;
using Shouldly;
using ZB.MOM.NatsNet.Server;
namespace ZB.MOM.NatsNet.Server.Tests.JetStream;
public sealed class NatsStreamMessageHeadersTests
{
[Fact]
public void ParseMessageTTL_NeverValue_ReturnsMinusOne()
{
var (ttl, error) = NatsStream.ParseMessageTTL("never");
error.ShouldBeNull();
ttl.ShouldBe(-1);
}
[Fact]
public void GetMessageIncr_ValidHeader_ReturnsParsedValue()
{
var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsMessageIncr, "42");
var (value, ok) = NatsStream.GetMessageIncr(hdr);
ok.ShouldBeTrue();
value.ShouldBe(new BigInteger(42));
}
[Fact]
public void GetMessageSchedule_InvalidPattern_ReturnsNotOk()
{
var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsSchedulePattern, "invalid");
var (schedule, ok) = NatsStream.GetMessageSchedule(hdr);
ok.ShouldBeFalse();
schedule.ShouldBe(default);
}
[Fact]
public void SetupStore_MemoryStorage_CreatesMemStore()
{
var stream = CreateStream(duplicates: TimeSpan.FromSeconds(1));
var error = stream.SetupStore(null);
error.ShouldBeNull();
stream.Store.ShouldNotBeNull();
stream.Store!.Type().ShouldBe(StorageType.MemoryStorage);
}
[Fact]
public void StoreMsgIdAndPurge_ExpiredEntry_RemovesEntry()
{
var stream = CreateStream(duplicates: TimeSpan.FromMilliseconds(50));
var oldTs = (DateTimeOffset.UtcNow - TimeSpan.FromSeconds(1)).ToUnixTimeMilliseconds() * 1_000_000L;
stream.StoreMsgId(new NatsStream.DedupeEntry { Id = "id-1", Seq = 1, TimestampNanos = oldTs });
stream.NumMsgIds().ShouldBe(1);
stream.CheckMsgId("id-1").ShouldNotBeNull();
stream.PurgeMsgIds();
stream.NumMsgIds().ShouldBe(0);
stream.CheckMsgId("id-1").ShouldBeNull();
}
private static NatsStream CreateStream(TimeSpan duplicates)
{
var account = new Account { Name = "A" };
var config = new StreamConfig
{
Name = "S",
Storage = StorageType.MemoryStorage,
Duplicates = duplicates,
};
return new NatsStream(account, config, DateTime.UtcNow);
}
}

Binary file not shown.