feat: implement SubscriptionIndex + JetStreamMemStore cluster — 39 features verified

Add SubscriptionIndex factory methods, notification wrappers, and
ValidateMapping. Implement 24 MemStore methods (TTL, scheduling, SDM,
age-check, purge/compact/reset) with JetStream header helpers and
constants. Verified features: 987 → 1026.
This commit is contained in:
Joseph Doherty
2026-02-27 06:19:47 -05:00
parent 4e61314c1c
commit ba4f41cf71
9 changed files with 897 additions and 8 deletions

View File

@@ -74,6 +74,21 @@ public sealed class StreamDeletionMeta
return false;
}
/// <summary>
/// Tries to get the pending entry for <paramref name="seq"/>.
/// </summary>
public bool TryGetPending(ulong seq, out SdmBySeq entry) => _pending.TryGetValue(seq, out entry);
/// <summary>
/// Sets the pending entry for <paramref name="seq"/>.
/// </summary>
public void SetPending(ulong seq, SdmBySeq entry) => _pending[seq] = entry;
/// <summary>
/// Returns the pending count for <paramref name="subj"/>, or 0 if not tracked.
/// </summary>
public ulong GetSubjectTotal(string subj) => _totals.TryGetValue(subj, out var cnt) ? cnt : 0;
/// <summary>
/// Clears all tracked data.
/// Mirrors <c>SDMMeta.empty</c>.

View File

@@ -1096,6 +1096,14 @@ public sealed class SubscriptionIndex
return false;
}
// Write lock must be held.
private Exception? AddInsertNotify(string subject, Action<bool> notify)
=> AddNotify(_notify!.Insert, subject, notify);
// Write lock must be held.
private Exception? AddRemoveNotify(string subject, Action<bool> notify)
=> AddNotify(_notify!.Remove, subject, notify);
private static Exception? AddNotify(Dictionary<string, List<Action<bool>>> m, string subject, Action<bool> notify)
{
if (m.TryGetValue(subject, out var chs))
@@ -1531,6 +1539,9 @@ public sealed class SubscriptionIndex
public List<Subscription>? PList;
public SublistLevel? Next;
/// <summary>Factory method matching Go's <c>newNode()</c>.</summary>
public static SublistNode NewNode() => new();
public bool IsEmpty()
{
return PSubs.Count == 0 && (QSubs == null || QSubs.Count == 0) &&
@@ -1544,6 +1555,9 @@ public sealed class SubscriptionIndex
public SublistNode? Pwc;
public SublistNode? Fwc;
/// <summary>Factory method matching Go's <c>newLevel()</c>.</summary>
public static SublistLevel NewLevel() => new();
public int NumNodes()
{
var num = Nodes.Count;

View File

@@ -243,6 +243,51 @@ public sealed class SubjectTransform : ISubjectTransformer
public static (SubjectTransform? transform, Exception? err) NewStrict(string src, string dest) =>
NewWithStrict(src, dest, true);
/// <summary>
/// Validates a subject mapping destination. Checks each token for valid syntax,
/// validates mustache-style mapping functions against known regexes, then verifies
/// the full transform can be created. Mirrors Go's <c>ValidateMapping</c>.
/// </summary>
public static Exception? ValidateMapping(string src, string dest)
{
if (string.IsNullOrEmpty(dest))
return null;
bool sfwc = false;
foreach (var t in dest.Split(SubjectTokens.Btsep))
{
var length = t.Length;
if (length == 0 || sfwc)
return new MappingDestinationException(t, ServerErrors.ErrInvalidMappingDestinationSubject);
// If it looks like a mapping function, validate against known patterns.
if (length > 4 && t[0] == '{' && t[1] == '{' && t[length - 2] == '}' && t[length - 1] == '}')
{
if (!PartitionRe.IsMatch(t) &&
!WildcardRe.IsMatch(t) &&
!SplitFromLeftRe.IsMatch(t) &&
!SplitFromRightRe.IsMatch(t) &&
!SliceFromLeftRe.IsMatch(t) &&
!SliceFromRightRe.IsMatch(t) &&
!SplitRe.IsMatch(t) &&
!RandomRe.IsMatch(t))
{
return new MappingDestinationException(t, ServerErrors.ErrUnknownMappingDestinationFunction);
}
continue;
}
if (length == 1 && t[0] == SubjectTokens.Fwc)
sfwc = true;
else if (t.AsSpan().ContainsAny("\t\n\f\r "))
return ServerErrors.ErrInvalidMappingDestinationSubject;
}
// Verify that the transform can actually be created.
var (_, err) = New(src, dest);
return err;
}
/// <summary>
/// Attempts to match a published subject against the source pattern.
/// Returns the transformed subject or an error.