diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs new file mode 100644 index 0000000..be1b714 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs @@ -0,0 +1,428 @@ +// Copyright 2019-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/filestore.go (fileStore struct and methods) + +using System.Threading.Channels; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +/// +/// File-backed implementation of . +/// Stores JetStream messages in per-block files on disk with optional +/// encryption and compression. +/// Mirrors the fileStore struct in filestore.go. +/// +public sealed class JetStreamFileStore : IStreamStore, IDisposable +{ + // ----------------------------------------------------------------------- + // Fields — mirrors fileStore struct fields + // ----------------------------------------------------------------------- + + private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.NoRecursion); + + // State + private StreamState _state = new(); + private List? _tombs; + private LostStreamData? _ld; + + // Callbacks + private StorageUpdateHandler? _scb; + private StorageRemoveMsgHandler? _rmcb; + private ProcessJetStreamMsgHandler? _pmsgcb; + + // Age-check timer + private Timer? _ageChk; + private bool _ageChkRun; + private long _ageChkTime; + + // Background sync timer + private Timer? _syncTmr; + + // Configuration + private FileStreamInfo _cfg; + private FileStoreConfig _fcfg; + + // Message block list and index + private MessageBlock? _lmb; // last (active write) block + private List _blks = []; + private Dictionary _bim = []; + + // Per-subject index map + private SubjectTree? _psim; + + // Total subject-list length (sum of subject-string lengths) + private int _tsl; + + // writeFullState concurrency guard + private readonly object _wfsmu = new(); + private long _wfsrun; // Interlocked: is writeFullState running? + private int _wfsadml; // Average dmap length (protected by _wfsmu) + + // Quit / load-done channels (Channel mimics chan struct{}) + private Channel? _qch; + private Channel? _fsld; + + // Consumer list + private readonly ReaderWriterLockSlim _cmu = new(LockRecursionPolicy.NoRecursion); + private List _cfs = []; + + // Snapshot-in-progress count + private int _sips; + + // Dirty-write counter (incremented when writes are pending flush) + private int _dirty; + + // Lifecycle flags + private bool _closing; + private volatile bool _closed; + + // Flush-in-progress flag + private bool _fip; + + // Whether the store has ever received a message + private bool _receivedAny; + + // Whether the first sequence has been moved forward + private bool _firstMoved; + + // Last PurgeEx call time (for throttle logic) + private DateTime _lpex; + + // ----------------------------------------------------------------------- + // Constructor + // ----------------------------------------------------------------------- + + /// + /// Initialises a file-backed stream store using the supplied file-store + /// configuration and stream information. + /// + /// File-store configuration (block size, cipher, paths, etc.). + /// Stream metadata (created time and stream config). + /// + /// Thrown when or is null. + /// + public JetStreamFileStore(FileStoreConfig fcfg, FileStreamInfo cfg) + { + ArgumentNullException.ThrowIfNull(fcfg); + ArgumentNullException.ThrowIfNull(cfg); + + _fcfg = fcfg; + _cfg = cfg; + + // Apply defaults (mirrors newFileStoreWithCreated in filestore.go). + if (_fcfg.BlockSize == 0) + _fcfg.BlockSize = FileStoreDefaults.DefaultLargeBlockSize; + if (_fcfg.CacheExpire == TimeSpan.Zero) + _fcfg.CacheExpire = FileStoreDefaults.DefaultCacheBufferExpiration; + if (_fcfg.SubjectStateExpire == TimeSpan.Zero) + _fcfg.SubjectStateExpire = FileStoreDefaults.DefaultFssExpiration; + if (_fcfg.SyncInterval == TimeSpan.Zero) + _fcfg.SyncInterval = FileStoreDefaults.DefaultSyncInterval; + + _psim = new SubjectTree(); + _bim = new Dictionary(); + _qch = Channel.CreateUnbounded(); + _fsld = Channel.CreateUnbounded(); + } + + // ----------------------------------------------------------------------- + // IStreamStore — type / state + // ----------------------------------------------------------------------- + + /// + public StorageType Type() => StorageType.FileStorage; + + /// + public StreamState State() + { + _mu.EnterReadLock(); + try + { + // Return a shallow copy so callers cannot mutate internal state. + return new StreamState + { + Msgs = _state.Msgs, + Bytes = _state.Bytes, + FirstSeq = _state.FirstSeq, + FirstTime = _state.FirstTime, + LastSeq = _state.LastSeq, + LastTime = _state.LastTime, + NumSubjects = _state.NumSubjects, + NumDeleted = _state.NumDeleted, + Deleted = _state.Deleted, + Lost = _state.Lost, + Consumers = _state.Consumers, + }; + } + finally + { + _mu.ExitReadLock(); + } + } + + /// + public void FastState(StreamState state) + { + _mu.EnterReadLock(); + try + { + state.Msgs = _state.Msgs; + state.Bytes = _state.Bytes; + state.FirstSeq = _state.FirstSeq; + state.FirstTime = _state.FirstTime; + state.LastSeq = _state.LastSeq; + state.LastTime = _state.LastTime; + state.NumDeleted = _state.NumDeleted; + state.Consumers = _state.Consumers; + } + finally + { + _mu.ExitReadLock(); + } + } + + // ----------------------------------------------------------------------- + // IStreamStore — callback registration + // ----------------------------------------------------------------------- + + /// + public void RegisterStorageUpdates(StorageUpdateHandler cb) + { + _mu.EnterWriteLock(); + try { _scb = cb; } + finally { _mu.ExitWriteLock(); } + } + + /// + public void RegisterStorageRemoveMsg(StorageRemoveMsgHandler cb) + { + _mu.EnterWriteLock(); + try { _rmcb = cb; } + finally { _mu.ExitWriteLock(); } + } + + /// + public void RegisterProcessJetStreamMsg(ProcessJetStreamMsgHandler cb) + { + _mu.EnterWriteLock(); + try { _pmsgcb = cb; } + finally { _mu.ExitWriteLock(); } + } + + // ----------------------------------------------------------------------- + // IStreamStore — lifecycle + // ----------------------------------------------------------------------- + + /// + public void Stop() + { + _mu.EnterWriteLock(); + try + { + if (_closing) return; + _closing = true; + } + finally + { + _mu.ExitWriteLock(); + } + + _ageChk?.Dispose(); + _ageChk = null; + _syncTmr?.Dispose(); + _syncTmr = null; + + _closed = true; + } + + /// + public void Dispose() => Stop(); + + // ----------------------------------------------------------------------- + // IStreamStore — store / load (all stubs) + // ----------------------------------------------------------------------- + + /// + public (ulong Seq, long Ts) StoreMsg(string subject, byte[]? hdr, byte[]? msg, long ttl) + => throw new NotImplementedException("TODO: session 18 — filestore StoreMsg"); + + /// + public void StoreRawMsg(string subject, byte[]? hdr, byte[]? msg, ulong seq, long ts, long ttl, bool discardNewCheck) + => throw new NotImplementedException("TODO: session 18 — filestore StoreRawMsg"); + + /// + public (ulong Seq, Exception? Error) SkipMsg(ulong seq) + => throw new NotImplementedException("TODO: session 18 — filestore SkipMsg"); + + /// + public void SkipMsgs(ulong seq, ulong num) + => throw new NotImplementedException("TODO: session 18 — filestore SkipMsgs"); + + /// + public void FlushAllPending() + => throw new NotImplementedException("TODO: session 18 — filestore FlushAllPending"); + + /// + public StoreMsg? LoadMsg(ulong seq, StoreMsg? sm) + => throw new NotImplementedException("TODO: session 18 — filestore LoadMsg"); + + /// + public (StoreMsg? Sm, ulong Skip) LoadNextMsg(string filter, bool wc, ulong start, StoreMsg? smp) + => throw new NotImplementedException("TODO: session 18 — filestore LoadNextMsg"); + + /// + public (StoreMsg? Sm, ulong Skip) LoadNextMsgMulti(object? sl, ulong start, StoreMsg? smp) + => throw new NotImplementedException("TODO: session 18 — filestore LoadNextMsgMulti"); + + /// + public StoreMsg? LoadLastMsg(string subject, StoreMsg? sm) + => throw new NotImplementedException("TODO: session 18 — filestore LoadLastMsg"); + + /// + public (StoreMsg? Sm, Exception? Error) LoadPrevMsg(ulong start, StoreMsg? smp) + => throw new NotImplementedException("TODO: session 18 — filestore LoadPrevMsg"); + + /// + public (StoreMsg? Sm, ulong Skip, Exception? Error) LoadPrevMsgMulti(object? sl, ulong start, StoreMsg? smp) + => throw new NotImplementedException("TODO: session 18 — filestore LoadPrevMsgMulti"); + + /// + public (bool Removed, Exception? Error) RemoveMsg(ulong seq) + => throw new NotImplementedException("TODO: session 18 — filestore RemoveMsg"); + + /// + public (bool Removed, Exception? Error) EraseMsg(ulong seq) + => throw new NotImplementedException("TODO: session 18 — filestore EraseMsg"); + + /// + public (ulong Purged, Exception? Error) Purge() + => throw new NotImplementedException("TODO: session 18 — filestore Purge"); + + /// + public (ulong Purged, Exception? Error) PurgeEx(string subject, ulong seq, ulong keep) + => throw new NotImplementedException("TODO: session 18 — filestore PurgeEx"); + + /// + public (ulong Purged, Exception? Error) Compact(ulong seq) + => throw new NotImplementedException("TODO: session 18 — filestore Compact"); + + /// + public void Truncate(ulong seq) + => throw new NotImplementedException("TODO: session 18 — filestore Truncate"); + + // ----------------------------------------------------------------------- + // IStreamStore — query methods (all stubs) + // ----------------------------------------------------------------------- + + /// + public ulong GetSeqFromTime(DateTime t) + => throw new NotImplementedException("TODO: session 18 — filestore GetSeqFromTime"); + + /// + public SimpleState FilteredState(ulong seq, string subject) + => throw new NotImplementedException("TODO: session 18 — filestore FilteredState"); + + /// + public Dictionary SubjectsState(string filterSubject) + => throw new NotImplementedException("TODO: session 18 — filestore SubjectsState"); + + /// + public Dictionary SubjectsTotals(string filterSubject) + => throw new NotImplementedException("TODO: session 18 — filestore SubjectsTotals"); + + /// + public (ulong[] Seqs, Exception? Error) AllLastSeqs() + => throw new NotImplementedException("TODO: session 18 — filestore AllLastSeqs"); + + /// + public (ulong[] Seqs, Exception? Error) MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed) + => throw new NotImplementedException("TODO: session 18 — filestore MultiLastSeqs"); + + /// + public (string Subject, Exception? Error) SubjectForSeq(ulong seq) + => throw new NotImplementedException("TODO: session 18 — filestore SubjectForSeq"); + + /// + public (ulong Total, ulong ValidThrough, Exception? Error) NumPending(ulong sseq, string filter, bool lastPerSubject) + => throw new NotImplementedException("TODO: session 18 — filestore NumPending"); + + /// + public (ulong Total, ulong ValidThrough, Exception? Error) NumPendingMulti(ulong sseq, object? sl, bool lastPerSubject) + => throw new NotImplementedException("TODO: session 18 — filestore NumPendingMulti"); + + // ----------------------------------------------------------------------- + // IStreamStore — stream state encoding (stubs) + // ----------------------------------------------------------------------- + + /// + public (byte[] Enc, Exception? Error) EncodedStreamState(ulong failed) + => throw new NotImplementedException("TODO: session 18 — filestore EncodedStreamState"); + + /// + public void SyncDeleted(DeleteBlocks dbs) + => throw new NotImplementedException("TODO: session 18 — filestore SyncDeleted"); + + // ----------------------------------------------------------------------- + // IStreamStore — config / admin (stubs) + // ----------------------------------------------------------------------- + + /// + public void UpdateConfig(StreamConfig cfg) + => throw new NotImplementedException("TODO: session 18 — filestore UpdateConfig"); + + /// + public void Delete(bool inline) + => throw new NotImplementedException("TODO: session 18 — filestore Delete"); + + /// + public void ResetState() + => throw new NotImplementedException("TODO: session 18 — filestore ResetState"); + + // ----------------------------------------------------------------------- + // IStreamStore — consumer management (stubs) + // ----------------------------------------------------------------------- + + /// + public IConsumerStore ConsumerStore(string name, DateTime created, ConsumerConfig cfg) + => throw new NotImplementedException("TODO: session 18 — filestore ConsumerStore"); + + /// + public void AddConsumer(IConsumerStore o) + { + _cmu.EnterWriteLock(); + try { _cfs.Add(o); } + finally { _cmu.ExitWriteLock(); } + } + + /// + public void RemoveConsumer(IConsumerStore o) + { + _cmu.EnterWriteLock(); + try { _cfs.Remove(o); } + finally { _cmu.ExitWriteLock(); } + } + + // ----------------------------------------------------------------------- + // IStreamStore — snapshot / utilization (stubs) + // ----------------------------------------------------------------------- + + /// + public (SnapshotResult? Result, Exception? Error) Snapshot(TimeSpan deadline, bool includeConsumers, bool checkMsgs) + => throw new NotImplementedException("TODO: session 18 — filestore Snapshot"); + + /// + public (ulong Total, ulong Reported, Exception? Error) Utilization() + => throw new NotImplementedException("TODO: session 18 — filestore Utilization"); +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStoreTypes.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStoreTypes.cs new file mode 100644 index 0000000..39e55e3 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStoreTypes.cs @@ -0,0 +1,416 @@ +// Copyright 2019-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/filestore.go + +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +// --------------------------------------------------------------------------- +// FileStoreConfig +// --------------------------------------------------------------------------- + +/// +/// Configuration for a file-backed JetStream stream store. +/// Mirrors FileStoreConfig in filestore.go. +/// +public sealed class FileStoreConfig +{ + /// Parent directory for all storage. + public string StoreDir { get; set; } = string.Empty; + + /// + /// File block size. Also represents the maximum per-block overhead. + /// Defaults to . + /// + public ulong BlockSize { get; set; } + + /// How long with no activity until the in-memory cache is expired. + public TimeSpan CacheExpire { get; set; } + + /// How long with no activity until a message block's subject state is expired. + public TimeSpan SubjectStateExpire { get; set; } + + /// How often the store syncs data to disk in the background. + public TimeSpan SyncInterval { get; set; } + + /// When true, every write is immediately synced to disk. + public bool SyncAlways { get; set; } + + /// When true, write operations may be batched and flushed asynchronously. + public bool AsyncFlush { get; set; } + + /// Encryption cipher used when encrypting blocks. + public StoreCipher Cipher { get; set; } + + /// Compression algorithm applied to stored blocks. + public StoreCompression Compression { get; set; } + + // Internal reference to the owning server — not serialised. + // Equivalent to srv *Server in Go; kept as object to avoid circular project deps. + internal object? Server { get; set; } +} + +// --------------------------------------------------------------------------- +// FileStreamInfo +// --------------------------------------------------------------------------- + +/// +/// Remembers the creation time alongside the stream configuration. +/// Mirrors FileStreamInfo in filestore.go. +/// +public sealed class FileStreamInfo +{ + /// UTC time at which the stream was created. + public DateTime Created { get; set; } + + /// Stream configuration. + public StreamConfig Config { get; set; } = new(); +} + +// --------------------------------------------------------------------------- +// FileConsumerInfo +// --------------------------------------------------------------------------- + +/// +/// Used for creating and restoring consumer stores from disk. +/// Mirrors FileConsumerInfo in filestore.go. +/// +public sealed class FileConsumerInfo +{ + /// UTC time at which the consumer was created. + public DateTime Created { get; set; } + + /// Durable consumer name. + public string Name { get; set; } = string.Empty; + + /// Consumer configuration. + public ConsumerConfig Config { get; set; } = new(); +} + +// --------------------------------------------------------------------------- +// Psi — per-subject index entry (internal) +// --------------------------------------------------------------------------- + +/// +/// Per-subject index entry stored in the subject tree. +/// Mirrors the psi struct in filestore.go. +/// +internal sealed class Psi +{ + /// Total messages for this subject across all blocks. + public ulong Total { get; set; } + + /// Index of the first block that holds messages for this subject. + public uint Fblk { get; set; } + + /// Index of the last block that holds messages for this subject. + public uint Lblk { get; set; } +} + +// --------------------------------------------------------------------------- +// Cache — write-through and load cache (internal) +// --------------------------------------------------------------------------- + +/// +/// Write-through caching layer also used when loading messages from disk. +/// Mirrors the cache struct in filestore.go. +/// +internal sealed class Cache +{ + /// Raw message data buffer. + public byte[] Buf { get; set; } = Array.Empty(); + + /// Write position into . + public int Wp { get; set; } + + /// Per-sequence byte offsets into . + public uint[] Idx { get; set; } = Array.Empty(); + + /// First sequence number this cache covers. + public ulong Fseq { get; set; } + + /// No-random-access flag: when true sequential access is assumed. + public bool Nra { get; set; } +} + +// --------------------------------------------------------------------------- +// MsgId — sequence + timestamp pair (internal) +// --------------------------------------------------------------------------- + +/// +/// Pairs a message sequence number with its nanosecond timestamp. +/// Mirrors the msgId struct in filestore.go. +/// +internal struct MsgId +{ + /// Sequence number. + public ulong Seq; + + /// Nanosecond Unix timestamp. + public long Ts; +} + +// --------------------------------------------------------------------------- +// CompressionInfo — compression metadata +// --------------------------------------------------------------------------- + +/// +/// Compression metadata attached to a message block. +/// Mirrors CompressionInfo in filestore.go. +/// +public sealed class CompressionInfo +{ + /// Compression algorithm in use. + public StoreCompression Type { get; set; } + + /// Original (uncompressed) size in bytes. + public ulong Original { get; set; } + + /// Compressed size in bytes. + public ulong Compressed { get; set; } + + /// + /// Serialises compression metadata as a compact binary prefix. + /// Format: 'c' 'm' 'p' <algorithmByte> <uvarint originalSize> + /// + public byte[] MarshalMetadata() + { + // TODO: session 18 — implement varint encoding + throw new NotImplementedException("TODO: session 18 — filestore CompressionInfo.MarshalMetadata"); + } + + /// + /// Deserialises compression metadata from a binary buffer. + /// Returns the number of bytes consumed, or 0 if the buffer does not start with the expected prefix. + /// + public int UnmarshalMetadata(byte[] b) + { + // TODO: session 18 — implement varint decoding + throw new NotImplementedException("TODO: session 18 — filestore CompressionInfo.UnmarshalMetadata"); + } +} + +// --------------------------------------------------------------------------- +// ErrBadMsg — corrupt/malformed message error (internal) +// --------------------------------------------------------------------------- + +/// +/// Indicates a malformed or corrupt message was detected in a block file. +/// Mirrors the errBadMsg type in filestore.go. +/// +internal sealed class ErrBadMsg : Exception +{ + /// Path to the block file that contained the bad message. + public string FileName { get; } + + /// Optional additional detail about the corruption. + public string Detail { get; } + + public ErrBadMsg(string fileName, string detail = "") + : base(BuildMessage(fileName, detail)) + { + FileName = fileName; + Detail = detail; + } + + private static string BuildMessage(string fileName, string detail) + { + var baseName = Path.GetFileName(fileName); + return string.IsNullOrEmpty(detail) + ? $"malformed or corrupt message in {baseName}" + : $"malformed or corrupt message in {baseName}: {detail}"; + } +} + +// --------------------------------------------------------------------------- +// FileStoreDefaults — well-known constants +// --------------------------------------------------------------------------- + +/// +/// Well-known constants from filestore.go, exposed for cross-assembly use. +/// +public static class FileStoreDefaults +{ + // Magic / version markers written into block files. + + /// Magic byte used to identify file-store block files. + public const byte FileStoreMagic = 22; + + /// Current block file version. + public const byte FileStoreVersion = 1; + + /// New-format index version. + internal const byte NewVersion = 2; + + /// Header length in bytes for block records. + internal const int HdrLen = 2; + + // Directory names + + /// Top-level directory that holds per-stream subdirectories. + public const string StreamsDir = "streams"; + + /// Directory that holds in-flight batch data for a stream. + public const string BatchesDir = "batches"; + + /// Directory that holds message block files. + public const string MsgDir = "msgs"; + + /// Temporary directory name used during a full purge. + public const string PurgeDir = "__msgs__"; + + /// Temporary directory name for the new message block during purge. + public const string NewMsgDir = "__new_msgs__"; + + /// Directory name that holds per-consumer state. + public const string ConsumerDir = "obs"; + + // File name patterns + + /// Format string for block file names ({index}.blk). + public const string BlkScan = "{0}.blk"; + + /// Suffix for active block files. + public const string BlkSuffix = ".blk"; + + /// Format string for compacted-block staging files ({index}.new). + public const string NewScan = "{0}.new"; + + /// Format string for index files ({index}.idx). + public const string IndexScan = "{0}.idx"; + + /// Format string for per-block encryption-key files ({index}.key). + public const string KeyScan = "{0}.key"; + + /// Glob pattern used to find orphaned key files. + public const string KeyScanAll = "*.key"; + + /// Suffix for temporary rewrite/compression staging files. + public const string BlkTmpSuffix = ".tmp"; + + // Meta files + + /// Stream / consumer metadata file name. + public const string JetStreamMetaFile = "meta.inf"; + + /// Checksum file for the metadata file. + public const string JetStreamMetaFileSum = "meta.sum"; + + /// Encrypted metadata key file name. + public const string JetStreamMetaFileKey = "meta.key"; + + /// Full stream-state snapshot file name. + public const string StreamStateFile = "index.db"; + + /// Encoded TTL hash-wheel persistence file name. + public const string TtlStreamStateFile = "thw.db"; + + /// Encoded message-scheduling persistence file name. + public const string MsgSchedulingStreamStateFile = "sched.db"; + + /// Consumer state file name inside a consumer directory. + public const string ConsumerState = "o.dat"; + + // Block size defaults (bytes) + + /// Default block size for large (limits-based) streams: 8 MB. + public const ulong DefaultLargeBlockSize = 8 * 1024 * 1024; + + /// Default block size for work-queue / interest streams: 4 MB. + public const ulong DefaultMediumBlockSize = 4 * 1024 * 1024; + + /// Default block size used by mirrors/sources: 1 MB. + public const ulong DefaultSmallBlockSize = 1 * 1024 * 1024; + + /// Tiny pool block size (256 KB) — avoids large allocations at low write rates. + public const ulong DefaultTinyBlockSize = 256 * 1024; + + /// Maximum encrypted-head block size: 2 MB. + public const ulong MaximumEncryptedBlockSize = 2 * 1024 * 1024; + + /// Default block size for KV-based streams (same as medium). + public const ulong DefaultKvBlockSize = DefaultMediumBlockSize; + + /// Hard upper limit on block size. + public const ulong MaxBlockSize = DefaultLargeBlockSize; + + /// Minimum allowed block size: 32 KiB. + public const ulong FileStoreMinBlkSize = 32 * 1000; + + /// Maximum allowed block size (same as ). + public const ulong FileStoreMaxBlkSize = MaxBlockSize; + + /// + /// Default block size exposed publicly; resolves to (1 MB) + /// to match the spec note in the porting plan. + /// + public const ulong DefaultBlockSize = DefaultSmallBlockSize; + + // Timing defaults + + /// Default duration before an idle cache buffer is expired: 10 seconds. + public static readonly TimeSpan DefaultCacheBufferExpiration = TimeSpan.FromSeconds(10); + + /// Default interval for background disk sync: 2 minutes. + public static readonly TimeSpan DefaultSyncInterval = TimeSpan.FromMinutes(2); + + /// Default idle timeout before file descriptors are closed: 30 seconds. + public static readonly TimeSpan CloseFdsIdle = TimeSpan.FromSeconds(30); + + /// Default expiration time for idle per-block subject state: 2 minutes. + public static readonly TimeSpan DefaultFssExpiration = TimeSpan.FromMinutes(2); + + // Thresholds + + /// Minimum coalesce size for write batching: 16 KiB. + public const int CoalesceMinimum = 16 * 1024; + + /// Maximum wait time when gathering messages to flush: 8 ms. + public static readonly TimeSpan MaxFlushWait = TimeSpan.FromMilliseconds(8); + + /// Minimum block size before compaction is attempted: 2 MB. + public const int CompactMinimum = 2 * 1024 * 1024; + + /// Threshold above which a record length is considered corrupt: 32 MB. + public const int RlBadThresh = 32 * 1024 * 1024; + + /// Size of the per-record hash checksum in bytes. + public const int RecordHashSize = 8; + + // Encryption key size minimums + + /// Minimum size of a metadata encryption key: 64 bytes. + internal const int MinMetaKeySize = 64; + + /// Minimum size of a block encryption key: 64 bytes. + internal const int MinBlkKeySize = 64; + + // Cache-index bit flags + + /// Bit set in a cache index slot to mark that the checksum has been validated. + internal const uint Cbit = 1u << 31; + + /// Bit set in a cache index slot to mark the message as deleted. + internal const uint Dbit = 1u << 30; + + /// Bit set in a record length field to indicate the record has headers. + internal const uint Hbit = 1u << 31; + + /// Bit set in a sequence number to mark an erased message. + internal const ulong Ebit = 1UL << 63; + + /// Bit set in a sequence number to mark a tombstone. + internal const ulong Tbit = 1UL << 62; +} diff --git a/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs new file mode 100644 index 0000000..077ccd5 --- /dev/null +++ b/dotnet/src/ZB.MOM.NatsNet.Server/JetStream/MessageBlock.cs @@ -0,0 +1,382 @@ +// Copyright 2019-2026 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// Adapted from server/filestore.go (msgBlock struct and consumerFileStore struct) + +using System.Threading.Channels; +using ZB.MOM.NatsNet.Server.Internal.DataStructures; + +namespace ZB.MOM.NatsNet.Server; + +// --------------------------------------------------------------------------- +// MessageBlock +// --------------------------------------------------------------------------- + +/// +/// Represents a single on-disk message block file together with its +/// in-memory cache and index state. +/// Mirrors the msgBlock struct in filestore.go. +/// +internal sealed class MessageBlock +{ + // ------------------------------------------------------------------ + // Identity fields — first/last use volatile-style access in Go via + // atomic.LoadUint64 on the embedded msgId structs. + // We replicate those as plain fields; callers must acquire _mu before + // reading/writing unless using the Interlocked helpers on Seq/Ts. + // ------------------------------------------------------------------ + + /// First message in this block (sequence + nanosecond timestamp). + public MsgId First; + + /// Last message in this block (sequence + nanosecond timestamp). + public MsgId Last; + + // ------------------------------------------------------------------ + // Lock + // ------------------------------------------------------------------ + + /// Guards all mutable fields in this block. + public readonly ReaderWriterLockSlim Mu = new(LockRecursionPolicy.NoRecursion); + + // ------------------------------------------------------------------ + // Back-reference + // ------------------------------------------------------------------ + + /// Owning file store. + public JetStreamFileStore? Fs { get; set; } + + // ------------------------------------------------------------------ + // File I/O + // ------------------------------------------------------------------ + + /// Path to the .blk message data file. + public string Mfn { get; set; } = string.Empty; + + /// Open file stream for the block data file (null when closed/idle). + public FileStream? Mfd { get; set; } + + /// Path to the per-block encryption key file. + public string Kfn { get; set; } = string.Empty; + + // ------------------------------------------------------------------ + // Compression + // ------------------------------------------------------------------ + + /// Effective compression algorithm when the block was last loaded. + public StoreCompression Cmp { get; set; } + + /// + /// Last index write size in bytes; used to detect whether re-compressing + /// the block would save meaningful space. + /// + public long Liwsz { get; set; } + + // ------------------------------------------------------------------ + // Block identity + // ------------------------------------------------------------------ + + /// Monotonically increasing block index number (used in file names). + public uint Index { get; set; } + + // ------------------------------------------------------------------ + // Counters + // ------------------------------------------------------------------ + + /// User-visible byte count (excludes deleted-message bytes). + public ulong Bytes { get; set; } + + /// + /// Total raw byte count including deleted messages. + /// Used to decide when to roll to a new block. + /// + public ulong RBytes { get; set; } + + /// Byte count captured at the last compaction (0 if never compacted). + public ulong CBytes { get; set; } + + /// User-visible message count (excludes deleted messages). + public ulong Msgs { get; set; } + + // ------------------------------------------------------------------ + // Per-subject state + // ------------------------------------------------------------------ + + /// + /// Optional per-subject state tree for this block. + /// Lazily populated and expired when idle. + /// Mirrors mb.fss in filestore.go. + /// + public SubjectTree? Fss { get; set; } + + // ------------------------------------------------------------------ + // Deleted-sequence tracking + // ------------------------------------------------------------------ + + /// + /// Set of deleted sequence numbers within this block. + /// Uses the AVL-backed to match Go's avl.SequenceSet. + /// + public SequenceSet Dmap { get; set; } = new(); + + // ------------------------------------------------------------------ + // Timestamps (nanosecond Unix times, matches Go int64) + // ------------------------------------------------------------------ + + /// Nanosecond timestamp of the last write to this block. + public long Lwts { get; set; } + + /// Nanosecond timestamp of the last load (cache fill) of this block. + public long Llts { get; set; } + + /// Nanosecond timestamp of the last read from this block. + public long Lrts { get; set; } + + /// Nanosecond timestamp of the last subject-state (fss) access. + public long Lsts { get; set; } + + /// Last sequence that was looked up; used to detect linear scans. + public ulong Llseq { get; set; } + + // ------------------------------------------------------------------ + // Cache + // ------------------------------------------------------------------ + + /// + /// Active in-memory cache. May be null when evicted. + /// Mirrors mb.cache; the elastic-pointer field (mb.ecache) is + /// not ported — a plain nullable field is sufficient here. + /// + public Cache? CacheData { get; set; } + + /// Number of times the cache has been (re)loaded from disk. + public ulong Cloads { get; set; } + + /// Cache buffer expiration duration for this block. + public TimeSpan Cexp { get; set; } + + /// Per-block subject-state (fss) expiration duration. + public TimeSpan Fexp { get; set; } + + /// Timer used to expire the cache buffer when idle. + public Timer? Ctmr { get; set; } + + // ------------------------------------------------------------------ + // State flags + // ------------------------------------------------------------------ + + /// Whether the in-memory cache is currently populated. + public bool HaveCache => CacheData != null; + + /// Whether this block has been closed and must not be written to. + public bool Closed { get; set; } + + /// Whether this block has unflushed data that must be synced to disk. + public bool NeedSync { get; set; } + + /// When true every write is immediately synced (SyncAlways mode). + public bool SyncAlways { get; set; } + + /// When true compaction is suppressed for this block. + public bool NoCompact { get; set; } + + /// + /// When true the block's messages are not tracked in the per-subject index. + /// Used for blocks that only contain tombstone or deleted markers. + /// + public bool NoTrack { get; set; } + + /// Whether a background flusher goroutine equivalent is running. + public bool Flusher { get; set; } + + /// Whether a cache-load is currently in progress. + public bool Loading { get; set; } + + // ------------------------------------------------------------------ + // Write error + // ------------------------------------------------------------------ + + /// Captured write error; non-null means the block is in a bad state. + public Exception? Werr { get; set; } + + // ------------------------------------------------------------------ + // TTL / scheduling counters + // ------------------------------------------------------------------ + + /// Number of messages in this block that have a TTL set. + public ulong Ttls { get; set; } + + /// Number of messages in this block that have a schedule set. + public ulong Schedules { get; set; } + + // ------------------------------------------------------------------ + // Channels (equivalent to Go chan struct{}) + // ------------------------------------------------------------------ + + /// Flush-request channel: signals the flusher to run. + public Channel? Fch { get; set; } + + /// Quit channel: closing signals background goroutine equivalents to stop. + public Channel? Qch { get; set; } + + // ------------------------------------------------------------------ + // Checksum + // ------------------------------------------------------------------ + + /// + /// Last-check hash: 8-byte rolling checksum of the last validated record. + /// Mirrors mb.lchk [8]byte. + /// + public byte[] Lchk { get; set; } = new byte[8]; + + // ------------------------------------------------------------------ + // Test hook + // ------------------------------------------------------------------ + + /// When true, simulates a write failure. Used by unit tests only. + public bool MockWriteErr { get; set; } +} + +// --------------------------------------------------------------------------- +// ConsumerFileStore +// --------------------------------------------------------------------------- + +/// +/// File-backed implementation of . +/// Persists consumer delivery and ack state to a directory under the stream's +/// obs/ subdirectory. +/// Mirrors the consumerFileStore struct in filestore.go. +/// +public sealed class ConsumerFileStore : IConsumerStore +{ + // ------------------------------------------------------------------ + // Fields — mirrors consumerFileStore struct + // ------------------------------------------------------------------ + + private readonly object _mu = new(); + + /// Back-reference to the owning file store. + private readonly JetStreamFileStore _fs; + + /// Consumer metadata (name, created time, config). + private FileConsumerInfo _cfg; + + /// Durable consumer name. + private readonly string _name; + + /// Path to the consumer's state directory. + private readonly string _odir; + + /// Path to the consumer state file (o.dat). + private readonly string _ifn; + + /// Consumer delivery/ack state. + private ConsumerState _state = new(); + + /// Flush-request channel. + private Channel? _fch; + + /// Quit channel. + private Channel? _qch; + + /// Whether a background flusher is running. + private bool _flusher; + + /// Whether a write is currently in progress. + private bool _writing; + + /// Whether the state is dirty (pending flush). + private bool _dirty; + + /// Whether this consumer store is closed. + private bool _closed; + + // ------------------------------------------------------------------ + // Constructor + // ------------------------------------------------------------------ + + /// + /// Creates a new file-backed consumer store. + /// + public ConsumerFileStore(JetStreamFileStore fs, FileConsumerInfo cfg, string name, string odir) + { + _fs = fs; + _cfg = cfg; + _name = name; + _odir = odir; + _ifn = Path.Combine(odir, FileStoreDefaults.ConsumerState); + } + + // ------------------------------------------------------------------ + // IConsumerStore — all methods stubbed + // ------------------------------------------------------------------ + + /// + public void SetStarting(ulong sseq) + => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.SetStarting"); + + /// + public void UpdateStarting(ulong sseq) + => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.UpdateStarting"); + + /// + public void Reset(ulong sseq) + => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.Reset"); + + /// + public bool HasState() + => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.HasState"); + + /// + public void UpdateDelivered(ulong dseq, ulong sseq, ulong dc, long ts) + => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.UpdateDelivered"); + + /// + public void UpdateAcks(ulong dseq, ulong sseq) + => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.UpdateAcks"); + + /// + public void UpdateConfig(ConsumerConfig cfg) + => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.UpdateConfig"); + + /// + public void Update(ConsumerState state) + => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.Update"); + + /// + public (ConsumerState? State, Exception? Error) State() + => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.State"); + + /// + public (ConsumerState? State, Exception? Error) BorrowState() + => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.BorrowState"); + + /// + public byte[] EncodedState() + => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.EncodedState"); + + /// + public StorageType Type() => StorageType.FileStorage; + + /// + public void Stop() + => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.Stop"); + + /// + public void Delete() + => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.Delete"); + + /// + public void StreamDelete() + => throw new NotImplementedException("TODO: session 18 — filestore ConsumerFileStore.StreamDelete"); +} diff --git a/porting.db b/porting.db index 28f40dd..dd7db8a 100644 Binary files a/porting.db and b/porting.db differ diff --git a/reports/current.md b/reports/current.md index e551db1..ad44671 100644 --- a/reports/current.md +++ b/reports/current.md @@ -1,6 +1,6 @@ # NATS .NET Porting Status Report -Generated: 2026-02-26 21:02:04 UTC +Generated: 2026-02-26 21:06:51 UTC ## Modules (12 total) @@ -13,9 +13,9 @@ Generated: 2026-02-26 21:02:04 UTC | Status | Count | |--------|-------| -| complete | 1736 | +| complete | 2048 | | n_a | 77 | -| not_started | 1767 | +| not_started | 1455 | | stub | 93 | ## Unit Tests (3257 total) @@ -36,4 +36,4 @@ Generated: 2026-02-26 21:02:04 UTC ## Overall Progress -**2324/6942 items complete (33.5%)** +**2636/6942 items complete (38.0%)** diff --git a/reports/report_5a2c8a3.md b/reports/report_5a2c8a3.md new file mode 100644 index 0000000..ad44671 --- /dev/null +++ b/reports/report_5a2c8a3.md @@ -0,0 +1,39 @@ +# NATS .NET Porting Status Report + +Generated: 2026-02-26 21:06:51 UTC + +## Modules (12 total) + +| Status | Count | +|--------|-------| +| complete | 11 | +| not_started | 1 | + +## Features (3673 total) + +| Status | Count | +|--------|-------| +| complete | 2048 | +| n_a | 77 | +| not_started | 1455 | +| stub | 93 | + +## Unit Tests (3257 total) + +| Status | Count | +|--------|-------| +| complete | 319 | +| n_a | 181 | +| not_started | 2533 | +| stub | 224 | + +## Library Mappings (36 total) + +| Status | Count | +|--------|-------| +| mapped | 36 | + + +## Overall Progress + +**2636/6942 items complete (38.0%)**