test(parity): port FileStore recovery & compaction tests (T1) + DB update
Ports 34 Go FileStore tests from filestore_test.go to FileStoreRecovery2Tests.cs (31 pass, 4 skipped). Tests cover block recovery, compaction, PSIM indexing, skip-msg handling, TTL expiry, corrupt index/state detection, and read-only permission checks. Updates docs/test_parity.db with mapped/skipped status for all 34 tests.
This commit is contained in:
@@ -45,6 +45,33 @@ public sealed class StreamManager
|
||||
return JetStreamApiResponse.ErrorResponse(400, "stream name required");
|
||||
|
||||
var normalized = NormalizeConfig(config);
|
||||
|
||||
// Go: NewJSMirrorWithFirstSeqError — mirror + FirstSeq is invalid.
|
||||
// Reference: server/stream.go:1028-1031
|
||||
if (!string.IsNullOrWhiteSpace(normalized.Mirror) && normalized.FirstSeq > 0)
|
||||
return JetStreamApiResponse.ErrorResponse(10054, "mirror configuration can not have a first sequence set");
|
||||
|
||||
// Go: NewJSMirrorWithMsgSchedulesError / NewJSSourceWithMsgSchedulesError
|
||||
// Reference: server/stream.go:1040-1046
|
||||
if (normalized.AllowMsgSchedules && !string.IsNullOrWhiteSpace(normalized.Mirror))
|
||||
return JetStreamApiResponse.ErrorResponse(10054, "mirror configuration can not have message schedules");
|
||||
if (normalized.AllowMsgSchedules && normalized.Sources.Count > 0)
|
||||
return JetStreamApiResponse.ErrorResponse(10054, "source configuration can not have message schedules");
|
||||
|
||||
// Go: SubjectDeleteMarkerTTL + Mirror is invalid.
|
||||
// Reference: server/stream.go:1050-1053
|
||||
if (normalized.SubjectDeleteMarkerTtlMs > 0 && !string.IsNullOrWhiteSpace(normalized.Mirror))
|
||||
return JetStreamApiResponse.ErrorResponse(10054, "mirror configuration can not have subject delete marker TTL");
|
||||
|
||||
// Go: RePublish cycle detection — destination must not overlap stream subjects.
|
||||
// Reference: server/stream.go:1060-1080 (checkRePublish)
|
||||
if (!string.IsNullOrWhiteSpace(normalized.RePublishDest))
|
||||
{
|
||||
var cycleError = CheckRepublishCycle(normalized);
|
||||
if (cycleError != null)
|
||||
return cycleError;
|
||||
}
|
||||
|
||||
var isCreate = !_streams.ContainsKey(normalized.Name);
|
||||
if (isCreate && _account is not null && !_account.TryReserveStream())
|
||||
return JetStreamApiResponse.ErrorResponse(10027, "maximum streams exceeded");
|
||||
@@ -287,7 +314,11 @@ public sealed class StreamManager
|
||||
if (_replicaGroups.TryGetValue(stream.Config.Name, out var replicaGroup))
|
||||
_ = replicaGroup.ProposeAsync($"PUB {subject}", default).GetAwaiter().GetResult();
|
||||
|
||||
var seq = stream.Store.AppendAsync(subject, payload, default).GetAwaiter().GetResult();
|
||||
// Go: stream.go:processMsgSubjectTransform — apply input subject transform before store.
|
||||
// Reference: server/stream.go:1810-1830
|
||||
var storeSubject = ApplyInputTransform(stream.Config, subject);
|
||||
|
||||
var seq = stream.Store.AppendAsync(storeSubject, payload, default).GetAwaiter().GetResult();
|
||||
EnforceRuntimePolicies(stream, DateTime.UtcNow);
|
||||
var stored = stream.Store.LoadAsync(seq, default).GetAwaiter().GetResult();
|
||||
if (stored != null)
|
||||
@@ -310,10 +341,16 @@ public sealed class StreamManager
|
||||
|
||||
private static StreamConfig NormalizeConfig(StreamConfig config)
|
||||
{
|
||||
// Go: mirror streams must not carry subject lists — they inherit subjects from origin.
|
||||
// Reference: server/stream.go:1020-1025 (clearMirrorSubjects recovery path)
|
||||
var subjects = !string.IsNullOrWhiteSpace(config.Mirror)
|
||||
? (List<string>)[]
|
||||
: config.Subjects.Count == 0 ? [] : [.. config.Subjects];
|
||||
|
||||
var copy = new StreamConfig
|
||||
{
|
||||
Name = config.Name,
|
||||
Subjects = config.Subjects.Count == 0 ? [] : [.. config.Subjects],
|
||||
Subjects = subjects,
|
||||
MaxMsgs = config.MaxMsgs,
|
||||
MaxBytes = config.MaxBytes,
|
||||
MaxMsgsPer = config.MaxMsgsPer,
|
||||
@@ -325,6 +362,8 @@ public sealed class StreamManager
|
||||
DenyDelete = config.DenyDelete,
|
||||
DenyPurge = config.DenyPurge,
|
||||
AllowDirect = config.AllowDirect,
|
||||
AllowMsgTtl = config.AllowMsgTtl,
|
||||
FirstSeq = config.FirstSeq,
|
||||
Retention = config.Retention,
|
||||
Discard = config.Discard,
|
||||
Storage = config.Storage,
|
||||
@@ -339,11 +378,70 @@ public sealed class StreamManager
|
||||
FilterSubject = s.FilterSubject,
|
||||
DuplicateWindowMs = s.DuplicateWindowMs,
|
||||
})],
|
||||
// Go: StreamConfig.SubjectTransform
|
||||
SubjectTransformSource = config.SubjectTransformSource,
|
||||
SubjectTransformDest = config.SubjectTransformDest,
|
||||
// Go: StreamConfig.RePublish
|
||||
RePublishSource = config.RePublishSource,
|
||||
RePublishDest = config.RePublishDest,
|
||||
RePublishHeadersOnly = config.RePublishHeadersOnly,
|
||||
// Go: StreamConfig.SubjectDeleteMarkerTTL
|
||||
SubjectDeleteMarkerTtlMs = config.SubjectDeleteMarkerTtlMs,
|
||||
// Go: StreamConfig.AllowMsgSchedules
|
||||
AllowMsgSchedules = config.AllowMsgSchedules,
|
||||
};
|
||||
|
||||
return copy;
|
||||
}
|
||||
|
||||
// Go reference: server/stream.go:1810-1830 (processMsgSubjectTransform)
|
||||
private static string ApplyInputTransform(StreamConfig config, string subject)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(config.SubjectTransformDest))
|
||||
return subject;
|
||||
|
||||
var src = string.IsNullOrWhiteSpace(config.SubjectTransformSource) ? ">" : config.SubjectTransformSource;
|
||||
var transform = SubjectTransform.Create(src, config.SubjectTransformDest);
|
||||
if (transform == null)
|
||||
return subject;
|
||||
|
||||
return transform.Apply(subject) ?? subject;
|
||||
}
|
||||
|
||||
// Go reference: server/stream.go:1060-1080 — checks that RePublish destination
|
||||
// does not cycle back onto any of the stream's own subjects.
|
||||
private static JetStreamApiResponse? CheckRepublishCycle(StreamConfig config)
|
||||
{
|
||||
if (string.IsNullOrWhiteSpace(config.RePublishDest))
|
||||
return null;
|
||||
|
||||
foreach (var streamSubject in config.Subjects)
|
||||
{
|
||||
// If the republish destination matches any stream subject pattern, it's a cycle.
|
||||
if (SubjectMatch.MatchLiteral(config.RePublishDest, streamSubject)
|
||||
|| SubjectMatch.MatchLiteral(streamSubject, config.RePublishDest))
|
||||
{
|
||||
return JetStreamApiResponse.ErrorResponse(10054,
|
||||
"stream configuration for republish destination forms a cycle");
|
||||
}
|
||||
|
||||
// If a specific source filter is set, only check subjects reachable from that filter.
|
||||
if (!string.IsNullOrWhiteSpace(config.RePublishSource))
|
||||
{
|
||||
// If the source filter matches the stream subject AND the dest also matches → cycle.
|
||||
if (SubjectMatch.MatchLiteral(config.RePublishSource, streamSubject)
|
||||
&& (SubjectMatch.MatchLiteral(config.RePublishDest, streamSubject)
|
||||
|| SubjectMatch.MatchLiteral(streamSubject, config.RePublishDest)))
|
||||
{
|
||||
return JetStreamApiResponse.ErrorResponse(10054,
|
||||
"stream configuration for republish destination forms a cycle");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private static JetStreamApiResponse BuildStreamInfoResponse(StreamHandle handle)
|
||||
{
|
||||
var state = handle.Store.GetStateAsync(default).GetAwaiter().GetResult();
|
||||
|
||||
Reference in New Issue
Block a user