feat(batch29): implement jetstream batching group-b validation and apply state
This commit is contained in:
@@ -13,10 +13,18 @@
|
||||
//
|
||||
// Adapted from server/jetstream_batching.go in the NATS server Go source.
|
||||
|
||||
using System.Numerics;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
internal static class JetStreamBatching
|
||||
{
|
||||
private const string JsStreamSource = "Nats-Stream-Source";
|
||||
private const string JsMessageCounterSources = "Nats-Counter-Sources";
|
||||
|
||||
internal static readonly TimeSpan StreamDefaultMaxBatchTimeout = TimeSpan.FromSeconds(10);
|
||||
internal const string BatchTimeout = "timeout";
|
||||
|
||||
@@ -117,6 +125,490 @@ internal static class JetStreamBatching
|
||||
ArgumentNullException.ThrowIfNull(batchGroup);
|
||||
batchGroup.StopLocked();
|
||||
}
|
||||
|
||||
internal static void Commit(BatchStagedDiff diff, BatchRuntimeState state)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(diff);
|
||||
ArgumentNullException.ThrowIfNull(state);
|
||||
|
||||
if (diff.MsgIds is { Count: > 0 })
|
||||
{
|
||||
var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds() * 1_000_000L;
|
||||
foreach (var msgId in diff.MsgIds.Keys)
|
||||
state.MsgIds[msgId] = new NatsStream.DedupeEntry { Id = msgId, Seq = 0, TimestampNanos = now };
|
||||
}
|
||||
|
||||
if (diff.Counter is { Count: > 0 })
|
||||
{
|
||||
foreach (var entry in diff.Counter)
|
||||
state.ClusteredCounterTotal[entry.Key] = entry.Value;
|
||||
}
|
||||
|
||||
if (diff.Inflight is { Count: > 0 })
|
||||
{
|
||||
foreach (var pair in diff.Inflight)
|
||||
{
|
||||
if (state.Inflight.TryGetValue(pair.Key, out var existing))
|
||||
{
|
||||
existing.Bytes += pair.Value.Bytes;
|
||||
existing.Ops += pair.Value.Ops;
|
||||
}
|
||||
else
|
||||
{
|
||||
state.Inflight[pair.Key] = pair.Value;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (diff.ExpectedPerSubject is { Count: > 0 })
|
||||
{
|
||||
foreach (var pair in diff.ExpectedPerSubject)
|
||||
{
|
||||
state.ExpectedPerSubjectSequence[pair.Value.ClSeq] = pair.Key;
|
||||
state.ExpectedPerSubjectInProcess.Add(pair.Key);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal static void ClearBatchStateLocked(BatchApply batch)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(batch);
|
||||
batch.ClearBatchStateLocked();
|
||||
}
|
||||
|
||||
internal static void RejectBatchStateLocked(BatchApply batch, BatchApplyContext context)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(batch);
|
||||
ArgumentNullException.ThrowIfNull(context);
|
||||
batch.RejectBatchStateLocked(context);
|
||||
}
|
||||
|
||||
internal static void RejectBatchState(BatchApply batch, BatchApplyContext context)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(batch);
|
||||
ArgumentNullException.ThrowIfNull(context);
|
||||
batch.RejectBatchState(context);
|
||||
}
|
||||
|
||||
internal static (byte[] Hdr, byte[] Msg, ulong Sequence, JsApiError? ApiError, Exception? Error) CheckMsgHeadersPreClusteredProposal(
|
||||
BatchStagedDiff diff,
|
||||
BatchHeaderCheckContext context,
|
||||
string subject,
|
||||
byte[]? hdr,
|
||||
byte[]? msg,
|
||||
bool sourced,
|
||||
string name,
|
||||
bool allowRollup,
|
||||
bool denyPurge,
|
||||
bool allowTtl,
|
||||
bool allowMsgCounter,
|
||||
bool allowMsgSchedules,
|
||||
DiscardPolicy discard,
|
||||
bool discardNewPer,
|
||||
int maxMsgSize,
|
||||
long maxMsgs,
|
||||
long maxMsgsPer,
|
||||
long maxBytes)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(diff);
|
||||
ArgumentNullException.ThrowIfNull(context);
|
||||
ArgumentNullException.ThrowIfNull(context.Store);
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(subject);
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(name);
|
||||
|
||||
hdr ??= Array.Empty<byte>();
|
||||
msg ??= Array.Empty<byte>();
|
||||
BigInteger? incr = null;
|
||||
|
||||
if (hdr.Length > 0)
|
||||
{
|
||||
if (hdr.Length > ushort.MaxValue)
|
||||
{
|
||||
var err = new InvalidOperationException($"JetStream header size exceeds limits for '{context.AccountName} > {name}'");
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSStreamHeaderExceedsMaximumError(), err);
|
||||
}
|
||||
|
||||
var (parsedIncr, okIncr) = NatsStream.GetMessageIncr(hdr);
|
||||
if (!okIncr)
|
||||
{
|
||||
var apiErr = JsApiErrors.NewJSMessageIncrInvalidError();
|
||||
return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description));
|
||||
}
|
||||
|
||||
incr = parsedIncr;
|
||||
if (incr != null && !sourced)
|
||||
{
|
||||
if (!allowMsgCounter)
|
||||
{
|
||||
var apiErr = JsApiErrors.NewJSMessageIncrDisabledError();
|
||||
return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description));
|
||||
}
|
||||
|
||||
if (msg.Length > 0)
|
||||
{
|
||||
var apiErr = JsApiErrors.NewJSMessageIncrPayloadError();
|
||||
return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description));
|
||||
}
|
||||
|
||||
var hasInvalidCounterHeader =
|
||||
NatsStream.GetRollup(hdr) != string.Empty ||
|
||||
NatsStream.GetExpectedStream(hdr) != string.Empty ||
|
||||
NatsStream.GetExpectedLastMsgId(hdr) != string.Empty ||
|
||||
NatsStream.GetExpectedLastSeqPerSubjectForSubject(hdr) != string.Empty ||
|
||||
NatsStream.GetExpectedLastSeq(hdr).Exists ||
|
||||
NatsStream.GetExpectedLastSeqPerSubject(hdr).Exists;
|
||||
|
||||
if (hasInvalidCounterHeader)
|
||||
{
|
||||
var apiErr = JsApiErrors.NewJSMessageIncrInvalidError();
|
||||
return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description));
|
||||
}
|
||||
}
|
||||
|
||||
var expectedStream = NatsStream.GetExpectedStream(hdr);
|
||||
if (!string.IsNullOrEmpty(expectedStream) && !string.Equals(expectedStream, name, StringComparison.Ordinal))
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSStreamNotMatchError(), new InvalidOperationException("stream mismatch"));
|
||||
|
||||
var ttlValue = NatsMessageHeaders.GetHeader(NatsHeaderConstants.JsMessageTtl, hdr) is { Length: > 0 } ttlHdr
|
||||
? Encoding.ASCII.GetString(ttlHdr)
|
||||
: string.Empty;
|
||||
var (ttl, ttlErr) = string.IsNullOrEmpty(ttlValue)
|
||||
? (0L, (Exception?)null)
|
||||
: NatsStream.ParseMessageTTL(ttlValue);
|
||||
if (!sourced && (ttl != 0 || ttlErr != null))
|
||||
{
|
||||
if (!allowTtl)
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSMessageTTLDisabledError(), new InvalidOperationException("message ttl disabled"));
|
||||
|
||||
if (ttlErr != null)
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSMessageTTLInvalidError(), ttlErr);
|
||||
}
|
||||
|
||||
var msgId = NatsStream.GetMsgId(hdr);
|
||||
if (!string.IsNullOrEmpty(msgId))
|
||||
{
|
||||
diff.MsgIds ??= new Dictionary<string, object?>(StringComparer.Ordinal);
|
||||
if (diff.MsgIds.ContainsKey(msgId))
|
||||
{
|
||||
var err = new InvalidOperationException("duplicate message id in staged batch");
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSAtomicPublishContainsDuplicateMessageError(), err);
|
||||
}
|
||||
|
||||
if (context.MsgIds.TryGetValue(msgId, out var dedupe))
|
||||
{
|
||||
var err = new InvalidOperationException("duplicate message id");
|
||||
if (dedupe.Seq > 0)
|
||||
return (hdr, msg, dedupe.Seq, JsApiErrors.NewJSAtomicPublishContainsDuplicateMessageError(), err);
|
||||
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSStreamDuplicateMessageConflictError(), err);
|
||||
}
|
||||
|
||||
diff.MsgIds[msgId] = null;
|
||||
}
|
||||
}
|
||||
|
||||
if (incr is null && allowMsgCounter)
|
||||
{
|
||||
var apiErr = JsApiErrors.NewJSMessageIncrMissingError();
|
||||
return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description));
|
||||
}
|
||||
|
||||
if (incr != null && allowMsgCounter)
|
||||
{
|
||||
var initial = BigInteger.Zero;
|
||||
Dictionary<string, Dictionary<string, string>>? sources = null;
|
||||
MsgCounterRunningTotal? counter = null;
|
||||
|
||||
diff.Counter ??= new Dictionary<string, MsgCounterRunningTotal>(StringComparer.Ordinal);
|
||||
if (diff.Counter.TryGetValue(subject, out var stagedCounter))
|
||||
{
|
||||
initial = stagedCounter.Total;
|
||||
sources = stagedCounter.Sources;
|
||||
counter = stagedCounter;
|
||||
}
|
||||
else if (context.ClusteredCounterTotal.TryGetValue(subject, out var committedCounter))
|
||||
{
|
||||
initial = committedCounter.Total;
|
||||
sources = committedCounter.Sources;
|
||||
counter = new MsgCounterRunningTotal { Ops = committedCounter.Ops };
|
||||
}
|
||||
else
|
||||
{
|
||||
var last = context.Store.LoadLastMsg(subject, new StoreMsg());
|
||||
if (last != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
var current = JsonSerializer.Deserialize<CounterValue>(last.Msg);
|
||||
if (current?.Value is null || !BigInteger.TryParse(current.Value, out initial))
|
||||
throw new InvalidOperationException("invalid counter payload");
|
||||
|
||||
var sourceHdr = NatsMessageHeaders.SliceHeader(JsMessageCounterSources, last.Hdr);
|
||||
if (sourceHdr is { Length: > 0 })
|
||||
sources = JsonSerializer.Deserialize<Dictionary<string, Dictionary<string, string>>>(sourceHdr.Value.ToArray());
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSMessageCounterBrokenError(), ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var sourceHeader = NatsMessageHeaders.SliceHeader(JsStreamSource, hdr);
|
||||
if (sourceHeader is { Length: > 0 })
|
||||
{
|
||||
try
|
||||
{
|
||||
var fields = Encoding.ASCII.GetString(sourceHeader.Value.Span).Split(' ', StringSplitOptions.RemoveEmptyEntries);
|
||||
var originStream = fields.Length > 0 ? fields[0] : string.Empty;
|
||||
var originSubject = fields.Length >= 5 ? fields[4] : subject;
|
||||
|
||||
var current = JsonSerializer.Deserialize<CounterValue>(msg);
|
||||
if (current?.Value is null || !BigInteger.TryParse(current.Value, out var sourcedValue))
|
||||
throw new InvalidOperationException("invalid sourced counter payload");
|
||||
|
||||
sources ??= new Dictionary<string, Dictionary<string, string>>(StringComparer.Ordinal);
|
||||
if (!sources.TryGetValue(originStream, out var bySubject))
|
||||
{
|
||||
bySubject = new Dictionary<string, string>(StringComparer.Ordinal);
|
||||
sources[originStream] = bySubject;
|
||||
}
|
||||
|
||||
var previousRaw = bySubject.GetValueOrDefault(originSubject);
|
||||
_ = BigInteger.TryParse(previousRaw, out var previousValue);
|
||||
bySubject[originSubject] = sourcedValue.ToString();
|
||||
incr = sourcedValue - previousValue;
|
||||
hdr = NatsMessageHeaders.SetHeader(NatsHeaderConstants.JsMessageIncr, incr.ToString(), hdr);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSMessageCounterBrokenError(), ex);
|
||||
}
|
||||
}
|
||||
|
||||
initial += incr.Value;
|
||||
msg = Encoding.ASCII.GetBytes($"{{\"val\":\"{initial}\"}}");
|
||||
if (sources is { Count: > 0 })
|
||||
hdr = NatsMessageHeaders.SetHeader(JsMessageCounterSources, JsonSerializer.Serialize(sources), hdr);
|
||||
|
||||
var maxSize = context.MaxPayload;
|
||||
if (maxMsgSize >= 0 && maxMsgSize < maxSize)
|
||||
maxSize = maxMsgSize;
|
||||
if (hdr.Length > maxSize || msg.Length > maxSize - hdr.Length)
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSStreamMessageExceedsMaximumError(), ServerErrors.ErrMaxPayload);
|
||||
|
||||
counter ??= new MsgCounterRunningTotal();
|
||||
counter.Total = initial;
|
||||
counter.Sources = sources;
|
||||
counter.Ops++;
|
||||
diff.Counter[subject] = counter;
|
||||
}
|
||||
|
||||
if (hdr.Length > 0)
|
||||
{
|
||||
var (lastSeq, hasLastSeq) = NatsStream.GetExpectedLastSeq(hdr);
|
||||
var actualLastSeq = context.ClSeq - context.Clfs;
|
||||
if (hasLastSeq && lastSeq != actualLastSeq)
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSStreamWrongLastSequenceError(actualLastSeq), new InvalidOperationException($"last sequence mismatch: {lastSeq} vs {actualLastSeq}"));
|
||||
if (hasLastSeq && diff.Inflight is { Count: > 0 })
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSStreamWrongLastSequenceConstantError(), new InvalidOperationException("last sequence mismatch"));
|
||||
|
||||
var (expectedPerSubject, hasExpectedPerSubject) = NatsStream.GetExpectedLastSeqPerSubject(hdr);
|
||||
if (hasExpectedPerSubject)
|
||||
{
|
||||
var seqSubject = subject;
|
||||
var overrideSubject = NatsStream.GetExpectedLastSeqPerSubjectForSubject(hdr);
|
||||
if (!string.IsNullOrEmpty(overrideSubject))
|
||||
seqSubject = overrideSubject;
|
||||
|
||||
if ((diff.Inflight?.ContainsKey(seqSubject) ?? false) ||
|
||||
context.ExpectedPerSubjectInProcess.Contains(seqSubject) ||
|
||||
context.Inflight.ContainsKey(seqSubject))
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSStreamWrongLastSequenceConstantError(), new InvalidOperationException("last sequence by subject mismatch"));
|
||||
|
||||
diff.ExpectedPerSubject ??= new Dictionary<string, BatchExpectedPerSubject>(StringComparer.Ordinal);
|
||||
if (diff.ExpectedPerSubject.TryGetValue(seqSubject, out var existingExpected))
|
||||
{
|
||||
if (existingExpected.SSeq != expectedPerSubject)
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSStreamWrongLastSequenceError(existingExpected.SSeq), new InvalidOperationException($"last sequence by subject mismatch: {expectedPerSubject} vs {existingExpected.SSeq}"));
|
||||
existingExpected.ClSeq = context.ClSeq;
|
||||
}
|
||||
else
|
||||
{
|
||||
var stored = context.Store.LoadLastMsg(seqSubject, new StoreMsg());
|
||||
var foundSeq = stored?.Seq ?? 0;
|
||||
if (foundSeq != expectedPerSubject)
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSStreamWrongLastSequenceError(foundSeq), new InvalidOperationException($"last sequence by subject mismatch: {expectedPerSubject} vs {foundSeq}"));
|
||||
|
||||
diff.ExpectedPerSubject[seqSubject] = new BatchExpectedPerSubject
|
||||
{
|
||||
SSeq = foundSeq,
|
||||
ClSeq = context.ClSeq,
|
||||
};
|
||||
}
|
||||
}
|
||||
else if (!string.IsNullOrEmpty(NatsStream.GetExpectedLastSeqPerSubjectForSubject(hdr)))
|
||||
{
|
||||
var apiErr = JsApiErrors.NewJSStreamExpectedLastSeqPerSubjectInvalidError();
|
||||
return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description));
|
||||
}
|
||||
|
||||
var (schedule, hasValidSchedulePattern) = NatsStream.GetMessageSchedule(hdr);
|
||||
if (!hasValidSchedulePattern)
|
||||
{
|
||||
var apiErr = allowMsgSchedules
|
||||
? JsApiErrors.NewJSMessageSchedulesPatternInvalidError()
|
||||
: JsApiErrors.NewJSMessageSchedulesDisabledError();
|
||||
return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description));
|
||||
}
|
||||
if (schedule != default)
|
||||
{
|
||||
if (!allowMsgSchedules)
|
||||
{
|
||||
var apiErr = JsApiErrors.NewJSMessageSchedulesDisabledError();
|
||||
return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description));
|
||||
}
|
||||
|
||||
var (scheduleTtl, scheduleTtlOk) = NatsStream.GetMessageScheduleTTL(hdr);
|
||||
if (!scheduleTtlOk)
|
||||
{
|
||||
var apiErr = JsApiErrors.NewJSMessageSchedulesTTLInvalidError();
|
||||
return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description));
|
||||
}
|
||||
if (!string.IsNullOrEmpty(scheduleTtl) && !allowTtl)
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSMessageTTLDisabledError(), new InvalidOperationException("message ttl disabled"));
|
||||
|
||||
var scheduleTarget = NatsStream.GetMessageScheduleTarget(hdr);
|
||||
if (string.IsNullOrEmpty(scheduleTarget) ||
|
||||
!SubscriptionIndex.IsValidPublishSubject(scheduleTarget) ||
|
||||
SubscriptionIndex.SubjectsCollide(scheduleTarget, subject))
|
||||
{
|
||||
var apiErr = JsApiErrors.NewJSMessageSchedulesTargetInvalidError();
|
||||
return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description));
|
||||
}
|
||||
|
||||
var scheduleSource = NatsStream.GetMessageScheduleSource(hdr);
|
||||
if (!string.IsNullOrEmpty(scheduleSource) &&
|
||||
(string.Equals(scheduleSource, scheduleTarget, StringComparison.Ordinal) ||
|
||||
string.Equals(scheduleSource, subject, StringComparison.Ordinal) ||
|
||||
!SubscriptionIndex.IsValidPublishSubject(scheduleSource)))
|
||||
{
|
||||
var apiErr = JsApiErrors.NewJSMessageSchedulesSourceInvalidError();
|
||||
return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description));
|
||||
}
|
||||
|
||||
if (context.StreamSubjects.Length > 0 && !context.StreamSubjects.Any(s => SubscriptionIndex.SubjectsCollide(s, scheduleTarget)))
|
||||
{
|
||||
var apiErr = JsApiErrors.NewJSMessageSchedulesTargetInvalidError();
|
||||
return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description));
|
||||
}
|
||||
|
||||
var rollup = NatsStream.GetRollup(hdr);
|
||||
if (string.IsNullOrEmpty(rollup))
|
||||
{
|
||||
hdr = NatsMessageHeaders.GenHeader(hdr, NatsHeaderConstants.JsMsgRollup, NatsHeaderConstants.JsMsgRollupSubject);
|
||||
}
|
||||
else if (!string.Equals(rollup, NatsHeaderConstants.JsMsgRollupSubject, StringComparison.Ordinal))
|
||||
{
|
||||
var apiErr = JsApiErrors.NewJSMessageSchedulesRollupInvalidError();
|
||||
return (hdr, msg, 0, apiErr, new InvalidOperationException(apiErr.Description));
|
||||
}
|
||||
}
|
||||
|
||||
var headerRollup = NatsStream.GetRollup(hdr);
|
||||
if (!string.IsNullOrEmpty(headerRollup))
|
||||
{
|
||||
if (!allowRollup || denyPurge)
|
||||
{
|
||||
var err = new InvalidOperationException("rollup not permitted");
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSStreamRollupFailedError(err), err);
|
||||
}
|
||||
|
||||
if (string.Equals(headerRollup, NatsHeaderConstants.JsMsgRollupSubject, StringComparison.Ordinal))
|
||||
{
|
||||
if (diff.Inflight?.ContainsKey(subject) ?? false)
|
||||
{
|
||||
var err = new InvalidOperationException("batch rollup sub invalid");
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSStreamRollupFailedError(err), err);
|
||||
}
|
||||
}
|
||||
else if (string.Equals(headerRollup, NatsHeaderConstants.JsMsgRollupAll, StringComparison.Ordinal))
|
||||
{
|
||||
if (diff.Inflight is { Count: > 0 })
|
||||
{
|
||||
var err = new InvalidOperationException("batch rollup all invalid");
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSStreamRollupFailedError(err), err);
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
var err = new InvalidOperationException($"rollup value invalid: {headerRollup}");
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSStreamRollupFailedError(err), err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
diff.Inflight ??= new Dictionary<string, InflightSubjectRunningTotal>(StringComparer.Ordinal);
|
||||
var msgSize = context.Store.Type() == StorageType.FileStorage
|
||||
? JetStreamFileStore.FileStoreMsgSizeRaw(subject.Length, hdr.Length, msg.Length)
|
||||
: JetStreamMemStore.MemStoreMsgSizeRaw(subject.Length, hdr.Length, msg.Length);
|
||||
|
||||
if (diff.Inflight.TryGetValue(subject, out var inflight))
|
||||
{
|
||||
inflight.Bytes += msgSize;
|
||||
inflight.Ops++;
|
||||
}
|
||||
else
|
||||
{
|
||||
inflight = new InflightSubjectRunningTotal { Bytes = msgSize, Ops = 1 };
|
||||
diff.Inflight[subject] = inflight;
|
||||
}
|
||||
|
||||
if (discard == DiscardPolicy.DiscardNew)
|
||||
{
|
||||
if (maxMsgs > 0 || maxBytes > 0)
|
||||
{
|
||||
var state = new StreamState();
|
||||
context.Store.FastState(state);
|
||||
var totalMsgs = state.Msgs;
|
||||
var totalBytes = state.Bytes;
|
||||
foreach (var inflightState in context.Inflight.Values)
|
||||
{
|
||||
totalMsgs += inflightState.Ops;
|
||||
totalBytes += inflightState.Bytes;
|
||||
}
|
||||
foreach (var inflightState in diff.Inflight.Values)
|
||||
{
|
||||
totalMsgs += inflightState.Ops;
|
||||
totalBytes += inflightState.Bytes;
|
||||
}
|
||||
|
||||
Exception? thresholdErr = null;
|
||||
if (maxMsgs > 0 && totalMsgs > (ulong)maxMsgs)
|
||||
thresholdErr = StoreErrors.ErrMaxMsgs;
|
||||
else if (maxBytes > 0 && totalBytes > (ulong)maxBytes)
|
||||
thresholdErr = StoreErrors.ErrMaxBytes;
|
||||
|
||||
if (thresholdErr != null)
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSStreamStoreFailedError(thresholdErr, JsApiErrors.Unless(thresholdErr)), thresholdErr);
|
||||
}
|
||||
|
||||
if (discardNewPer && maxMsgsPer > 0)
|
||||
{
|
||||
var bySubject = context.Store.SubjectsTotals(subject);
|
||||
var totalForSubject = bySubject.GetValueOrDefault(subject) + inflight.Ops;
|
||||
if (context.Inflight.TryGetValue(subject, out var streamInflight))
|
||||
totalForSubject += streamInflight.Ops;
|
||||
if (totalForSubject > (ulong)maxMsgsPer)
|
||||
{
|
||||
var err = StoreErrors.ErrMaxMsgsPerSubject;
|
||||
return (hdr, msg, 0, JsApiErrors.NewJSStreamStoreFailedError(err, JsApiErrors.Unless(err)), err);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return (hdr, msg, 0, null, null);
|
||||
}
|
||||
}
|
||||
|
||||
internal interface IBatchTimer
|
||||
@@ -258,10 +750,10 @@ internal sealed class BatchStagedDiff
|
||||
public Dictionary<string, object?>? MsgIds { get; set; }
|
||||
|
||||
/// <summary>Running counter totals, keyed by subject.</summary>
|
||||
public Dictionary<string, object?>? Counter { get; set; }
|
||||
public Dictionary<string, MsgCounterRunningTotal>? Counter { get; set; }
|
||||
|
||||
/// <summary>Inflight subject byte/op totals for DiscardNew checks.</summary>
|
||||
public Dictionary<string, object?>? Inflight { get; set; }
|
||||
public Dictionary<string, InflightSubjectRunningTotal>? Inflight { get; set; }
|
||||
|
||||
/// <summary>Expected-last-seq-per-subject checks staged in this batch.</summary>
|
||||
public Dictionary<string, BatchExpectedPerSubject>? ExpectedPerSubject { get; set; }
|
||||
@@ -295,7 +787,7 @@ internal sealed class BatchApply
|
||||
public ulong Count { get; set; }
|
||||
|
||||
/// <summary>Raft committed entries that make up this batch.</summary>
|
||||
public List<object?>? Entries { get; set; }
|
||||
public List<ICommittedEntry>? Entries { get; set; }
|
||||
|
||||
/// <summary>Index within an entry indicating the first message of the batch.</summary>
|
||||
public int EntryStart { get; set; }
|
||||
@@ -317,4 +809,77 @@ internal sealed class BatchApply
|
||||
EntryStart = 0;
|
||||
MaxApplied = 0;
|
||||
}
|
||||
|
||||
public void RejectBatchStateLocked(BatchApplyContext context)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(context);
|
||||
|
||||
context.Clfs += Count;
|
||||
if (Entries is { Count: > 0 })
|
||||
{
|
||||
foreach (var entry in Entries)
|
||||
entry.ReturnToPool();
|
||||
}
|
||||
|
||||
ClearBatchStateLocked();
|
||||
}
|
||||
|
||||
public void RejectBatchState(BatchApplyContext context)
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
RejectBatchStateLocked(context);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal interface ICommittedEntry
|
||||
{
|
||||
void ReturnToPool();
|
||||
}
|
||||
|
||||
internal sealed class BatchApplyContext
|
||||
{
|
||||
public ulong Clfs { get; set; }
|
||||
}
|
||||
|
||||
internal sealed class MsgCounterRunningTotal
|
||||
{
|
||||
public ulong Ops { get; set; }
|
||||
public BigInteger Total { get; set; }
|
||||
public Dictionary<string, Dictionary<string, string>>? Sources { get; set; }
|
||||
}
|
||||
|
||||
internal sealed class InflightSubjectRunningTotal
|
||||
{
|
||||
public ulong Bytes { get; set; }
|
||||
public ulong Ops { get; set; }
|
||||
}
|
||||
|
||||
internal sealed class BatchRuntimeState
|
||||
{
|
||||
public Dictionary<string, NatsStream.DedupeEntry> MsgIds { get; } = new(StringComparer.Ordinal);
|
||||
public Dictionary<string, MsgCounterRunningTotal> ClusteredCounterTotal { get; } = new(StringComparer.Ordinal);
|
||||
public Dictionary<string, InflightSubjectRunningTotal> Inflight { get; } = new(StringComparer.Ordinal);
|
||||
public Dictionary<ulong, string> ExpectedPerSubjectSequence { get; } = new();
|
||||
public HashSet<string> ExpectedPerSubjectInProcess { get; } = new(StringComparer.Ordinal);
|
||||
}
|
||||
|
||||
internal sealed class BatchHeaderCheckContext
|
||||
{
|
||||
public string AccountName { get; set; } = string.Empty;
|
||||
public required IStreamStore Store { get; set; }
|
||||
public Dictionary<string, NatsStream.DedupeEntry> MsgIds { get; } = new(StringComparer.Ordinal);
|
||||
public Dictionary<string, MsgCounterRunningTotal> ClusteredCounterTotal { get; } = new(StringComparer.Ordinal);
|
||||
public Dictionary<string, InflightSubjectRunningTotal> Inflight { get; } = new(StringComparer.Ordinal);
|
||||
public HashSet<string> ExpectedPerSubjectInProcess { get; } = new(StringComparer.Ordinal);
|
||||
public string[] StreamSubjects { get; set; } = Array.Empty<string>();
|
||||
public ulong ClSeq { get; set; }
|
||||
public ulong Clfs { get; set; }
|
||||
public int MaxPayload { get; set; } = int.MaxValue;
|
||||
}
|
||||
|
||||
internal sealed class CounterValue
|
||||
{
|
||||
public string Value { get; set; } = "0";
|
||||
}
|
||||
|
||||
@@ -143,4 +143,165 @@ public sealed class JetStreamBatchingCoreTests
|
||||
store.Received(1).Stop();
|
||||
JetStreamBatching.GlobalInflightBatches.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Commit_DiffContainsState_UpdatesRuntimeState()
|
||||
{
|
||||
var diff = new BatchStagedDiff
|
||||
{
|
||||
MsgIds = new Dictionary<string, object?> { ["id-1"] = null },
|
||||
Counter = new Dictionary<string, MsgCounterRunningTotal>
|
||||
{
|
||||
["foo"] = new() { Ops = 2, Total = new System.Numerics.BigInteger(5) },
|
||||
},
|
||||
Inflight = new Dictionary<string, InflightSubjectRunningTotal>
|
||||
{
|
||||
["foo"] = new() { Bytes = 10, Ops = 1 },
|
||||
},
|
||||
ExpectedPerSubject = new Dictionary<string, BatchExpectedPerSubject>
|
||||
{
|
||||
["foo"] = new() { SSeq = 3, ClSeq = 9 },
|
||||
},
|
||||
};
|
||||
|
||||
var state = new BatchRuntimeState();
|
||||
|
||||
JetStreamBatching.Commit(diff, state);
|
||||
|
||||
state.MsgIds.ContainsKey("id-1").ShouldBeTrue();
|
||||
state.ClusteredCounterTotal["foo"].Total.ShouldBe(new System.Numerics.BigInteger(5));
|
||||
state.Inflight["foo"].Ops.ShouldBe((ulong)1);
|
||||
state.ExpectedPerSubjectSequence[9].ShouldBe("foo");
|
||||
state.ExpectedPerSubjectInProcess.Contains("foo").ShouldBeTrue();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void BatchApplyWrappers_ClearAndRejectPaths_UpdateState()
|
||||
{
|
||||
var entry = new TestCommittedEntry();
|
||||
var apply = new BatchApply
|
||||
{
|
||||
Id = "batch-A",
|
||||
Count = 3,
|
||||
EntryStart = 4,
|
||||
MaxApplied = 8,
|
||||
Entries = [entry],
|
||||
};
|
||||
|
||||
var context = new BatchApplyContext { Clfs = 10 };
|
||||
JetStreamBatching.RejectBatchStateLocked(apply, context);
|
||||
|
||||
context.Clfs.ShouldBe((ulong)13);
|
||||
entry.Returned.ShouldBeTrue();
|
||||
apply.Id.ShouldBe(string.Empty);
|
||||
apply.Count.ShouldBe((ulong)0);
|
||||
|
||||
apply.Id = "batch-B";
|
||||
apply.Count = 2;
|
||||
JetStreamBatching.ClearBatchStateLocked(apply);
|
||||
apply.Id.ShouldBe(string.Empty);
|
||||
apply.Count.ShouldBe((ulong)0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CheckMsgHeadersPreClusteredProposal_ExpectedStreamMismatch_ReturnsNotMatchError()
|
||||
{
|
||||
var store = new JetStreamMemStore(new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage });
|
||||
try
|
||||
{
|
||||
var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsExpectedStream, "OTHER");
|
||||
var diff = new BatchStagedDiff();
|
||||
var context = new BatchHeaderCheckContext
|
||||
{
|
||||
Store = store,
|
||||
ClSeq = 1,
|
||||
Clfs = 0,
|
||||
MaxPayload = 1024,
|
||||
StreamSubjects = ["ORDERS.>"],
|
||||
};
|
||||
|
||||
var (_, _, _, apiError, error) = JetStreamBatching.CheckMsgHeadersPreClusteredProposal(
|
||||
diff,
|
||||
context,
|
||||
"ORDERS.created",
|
||||
hdr,
|
||||
[],
|
||||
sourced: false,
|
||||
name: "ORDERS",
|
||||
allowRollup: true,
|
||||
denyPurge: false,
|
||||
allowTtl: true,
|
||||
allowMsgCounter: false,
|
||||
allowMsgSchedules: false,
|
||||
discard: DiscardPolicy.DiscardOld,
|
||||
discardNewPer: false,
|
||||
maxMsgSize: -1,
|
||||
maxMsgs: -1,
|
||||
maxMsgsPer: -1,
|
||||
maxBytes: -1);
|
||||
|
||||
apiError.ShouldNotBeNull();
|
||||
apiError!.ErrCode.ShouldBe(JsApiErrors.StreamNotMatch.ErrCode);
|
||||
error.ShouldNotBeNull();
|
||||
}
|
||||
finally
|
||||
{
|
||||
store.Stop();
|
||||
}
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CheckMsgHeadersPreClusteredProposal_DuplicateMsgIdInBatch_ReturnsDuplicateError()
|
||||
{
|
||||
var store = new JetStreamMemStore(new StreamConfig { Name = "ORDERS", Storage = StorageType.MemoryStorage });
|
||||
try
|
||||
{
|
||||
var hdr = NatsMessageHeaders.GenHeader(null, NatsHeaderConstants.JsMsgId, "msg-1");
|
||||
var diff = new BatchStagedDiff
|
||||
{
|
||||
MsgIds = new Dictionary<string, object?> { ["msg-1"] = null },
|
||||
};
|
||||
var context = new BatchHeaderCheckContext
|
||||
{
|
||||
Store = store,
|
||||
ClSeq = 1,
|
||||
MaxPayload = 1024,
|
||||
};
|
||||
|
||||
var (_, _, _, apiError, error) = JetStreamBatching.CheckMsgHeadersPreClusteredProposal(
|
||||
diff,
|
||||
context,
|
||||
"ORDERS.created",
|
||||
hdr,
|
||||
[],
|
||||
sourced: false,
|
||||
name: "ORDERS",
|
||||
allowRollup: true,
|
||||
denyPurge: false,
|
||||
allowTtl: true,
|
||||
allowMsgCounter: false,
|
||||
allowMsgSchedules: false,
|
||||
discard: DiscardPolicy.DiscardOld,
|
||||
discardNewPer: false,
|
||||
maxMsgSize: -1,
|
||||
maxMsgs: -1,
|
||||
maxMsgsPer: -1,
|
||||
maxBytes: -1);
|
||||
|
||||
apiError.ShouldNotBeNull();
|
||||
apiError!.ErrCode.ShouldBe(JsApiErrors.AtomicPublishContainsDuplicateMessage.ErrCode);
|
||||
error.ShouldNotBeNull();
|
||||
}
|
||||
finally
|
||||
{
|
||||
store.Stop();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
internal sealed class TestCommittedEntry : ICommittedEntry
|
||||
{
|
||||
public bool Returned { get; private set; }
|
||||
|
||||
public void ReturnToPool() => Returned = true;
|
||||
}
|
||||
|
||||
BIN
porting.db
BIN
porting.db
Binary file not shown.
Reference in New Issue
Block a user