feat: port session 18 — JetStream File Store
- FileStoreTypes: FileStoreConfig, FileStreamInfo, FileConsumerInfo, Psi, Cache, MsgId, CompressionInfo, ErrBadMsg, FileStoreDefaults constants - FileStore: JetStreamFileStore implementing IStreamStore (26 methods stubbed) with State/Type/Stop/Register* properly implemented - MessageBlock: MessageBlock type with all 40+ fields, ConsumerFileStore stub - 312 features complete (IDs 951-1262)
This commit is contained in:
428
dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs
Normal file
428
dotnet/src/ZB.MOM.NatsNet.Server/JetStream/FileStore.cs
Normal file
@@ -0,0 +1,428 @@
|
||||
// Copyright 2019-2026 The NATS Authors
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
//
|
||||
// Adapted from server/filestore.go (fileStore struct and methods)
|
||||
|
||||
using System.Threading.Channels;
|
||||
using ZB.MOM.NatsNet.Server.Internal.DataStructures;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
/// <summary>
|
||||
/// File-backed implementation of <see cref="IStreamStore"/>.
|
||||
/// Stores JetStream messages in per-block files on disk with optional
|
||||
/// encryption and compression.
|
||||
/// Mirrors the <c>fileStore</c> struct in filestore.go.
|
||||
/// </summary>
|
||||
public sealed class JetStreamFileStore : IStreamStore, IDisposable
|
||||
{
|
||||
// -----------------------------------------------------------------------
|
||||
// Fields — mirrors fileStore struct fields
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.NoRecursion);
|
||||
|
||||
// State
|
||||
private StreamState _state = new();
|
||||
private List<ulong>? _tombs;
|
||||
private LostStreamData? _ld;
|
||||
|
||||
// Callbacks
|
||||
private StorageUpdateHandler? _scb;
|
||||
private StorageRemoveMsgHandler? _rmcb;
|
||||
private ProcessJetStreamMsgHandler? _pmsgcb;
|
||||
|
||||
// Age-check timer
|
||||
private Timer? _ageChk;
|
||||
private bool _ageChkRun;
|
||||
private long _ageChkTime;
|
||||
|
||||
// Background sync timer
|
||||
private Timer? _syncTmr;
|
||||
|
||||
// Configuration
|
||||
private FileStreamInfo _cfg;
|
||||
private FileStoreConfig _fcfg;
|
||||
|
||||
// Message block list and index
|
||||
private MessageBlock? _lmb; // last (active write) block
|
||||
private List<MessageBlock> _blks = [];
|
||||
private Dictionary<uint, MessageBlock> _bim = [];
|
||||
|
||||
// Per-subject index map
|
||||
private SubjectTree<Psi>? _psim;
|
||||
|
||||
// Total subject-list length (sum of subject-string lengths)
|
||||
private int _tsl;
|
||||
|
||||
// writeFullState concurrency guard
|
||||
private readonly object _wfsmu = new();
|
||||
private long _wfsrun; // Interlocked: is writeFullState running?
|
||||
private int _wfsadml; // Average dmap length (protected by _wfsmu)
|
||||
|
||||
// Quit / load-done channels (Channel<byte> mimics chan struct{})
|
||||
private Channel<byte>? _qch;
|
||||
private Channel<byte>? _fsld;
|
||||
|
||||
// Consumer list
|
||||
private readonly ReaderWriterLockSlim _cmu = new(LockRecursionPolicy.NoRecursion);
|
||||
private List<IConsumerStore> _cfs = [];
|
||||
|
||||
// Snapshot-in-progress count
|
||||
private int _sips;
|
||||
|
||||
// Dirty-write counter (incremented when writes are pending flush)
|
||||
private int _dirty;
|
||||
|
||||
// Lifecycle flags
|
||||
private bool _closing;
|
||||
private volatile bool _closed;
|
||||
|
||||
// Flush-in-progress flag
|
||||
private bool _fip;
|
||||
|
||||
// Whether the store has ever received a message
|
||||
private bool _receivedAny;
|
||||
|
||||
// Whether the first sequence has been moved forward
|
||||
private bool _firstMoved;
|
||||
|
||||
// Last PurgeEx call time (for throttle logic)
|
||||
private DateTime _lpex;
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// Constructor
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/// <summary>
|
||||
/// Initialises a file-backed stream store using the supplied file-store
|
||||
/// configuration and stream information.
|
||||
/// </summary>
|
||||
/// <param name="fcfg">File-store configuration (block size, cipher, paths, etc.).</param>
|
||||
/// <param name="cfg">Stream metadata (created time and stream config).</param>
|
||||
/// <exception cref="ArgumentNullException">
|
||||
/// Thrown when <paramref name="fcfg"/> or <paramref name="cfg"/> is null.
|
||||
/// </exception>
|
||||
public JetStreamFileStore(FileStoreConfig fcfg, FileStreamInfo cfg)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(fcfg);
|
||||
ArgumentNullException.ThrowIfNull(cfg);
|
||||
|
||||
_fcfg = fcfg;
|
||||
_cfg = cfg;
|
||||
|
||||
// Apply defaults (mirrors newFileStoreWithCreated in filestore.go).
|
||||
if (_fcfg.BlockSize == 0)
|
||||
_fcfg.BlockSize = FileStoreDefaults.DefaultLargeBlockSize;
|
||||
if (_fcfg.CacheExpire == TimeSpan.Zero)
|
||||
_fcfg.CacheExpire = FileStoreDefaults.DefaultCacheBufferExpiration;
|
||||
if (_fcfg.SubjectStateExpire == TimeSpan.Zero)
|
||||
_fcfg.SubjectStateExpire = FileStoreDefaults.DefaultFssExpiration;
|
||||
if (_fcfg.SyncInterval == TimeSpan.Zero)
|
||||
_fcfg.SyncInterval = FileStoreDefaults.DefaultSyncInterval;
|
||||
|
||||
_psim = new SubjectTree<Psi>();
|
||||
_bim = new Dictionary<uint, MessageBlock>();
|
||||
_qch = Channel.CreateUnbounded<byte>();
|
||||
_fsld = Channel.CreateUnbounded<byte>();
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// IStreamStore — type / state
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/// <inheritdoc/>
|
||||
public StorageType Type() => StorageType.FileStorage;
|
||||
|
||||
/// <inheritdoc/>
|
||||
public StreamState State()
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
// Return a shallow copy so callers cannot mutate internal state.
|
||||
return new StreamState
|
||||
{
|
||||
Msgs = _state.Msgs,
|
||||
Bytes = _state.Bytes,
|
||||
FirstSeq = _state.FirstSeq,
|
||||
FirstTime = _state.FirstTime,
|
||||
LastSeq = _state.LastSeq,
|
||||
LastTime = _state.LastTime,
|
||||
NumSubjects = _state.NumSubjects,
|
||||
NumDeleted = _state.NumDeleted,
|
||||
Deleted = _state.Deleted,
|
||||
Lost = _state.Lost,
|
||||
Consumers = _state.Consumers,
|
||||
};
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void FastState(StreamState state)
|
||||
{
|
||||
_mu.EnterReadLock();
|
||||
try
|
||||
{
|
||||
state.Msgs = _state.Msgs;
|
||||
state.Bytes = _state.Bytes;
|
||||
state.FirstSeq = _state.FirstSeq;
|
||||
state.FirstTime = _state.FirstTime;
|
||||
state.LastSeq = _state.LastSeq;
|
||||
state.LastTime = _state.LastTime;
|
||||
state.NumDeleted = _state.NumDeleted;
|
||||
state.Consumers = _state.Consumers;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitReadLock();
|
||||
}
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// IStreamStore — callback registration
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void RegisterStorageUpdates(StorageUpdateHandler cb)
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try { _scb = cb; }
|
||||
finally { _mu.ExitWriteLock(); }
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void RegisterStorageRemoveMsg(StorageRemoveMsgHandler cb)
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try { _rmcb = cb; }
|
||||
finally { _mu.ExitWriteLock(); }
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void RegisterProcessJetStreamMsg(ProcessJetStreamMsgHandler cb)
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try { _pmsgcb = cb; }
|
||||
finally { _mu.ExitWriteLock(); }
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// IStreamStore — lifecycle
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void Stop()
|
||||
{
|
||||
_mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (_closing) return;
|
||||
_closing = true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
_mu.ExitWriteLock();
|
||||
}
|
||||
|
||||
_ageChk?.Dispose();
|
||||
_ageChk = null;
|
||||
_syncTmr?.Dispose();
|
||||
_syncTmr = null;
|
||||
|
||||
_closed = true;
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void Dispose() => Stop();
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// IStreamStore — store / load (all stubs)
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (ulong Seq, long Ts) StoreMsg(string subject, byte[]? hdr, byte[]? msg, long ttl)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore StoreMsg");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void StoreRawMsg(string subject, byte[]? hdr, byte[]? msg, ulong seq, long ts, long ttl, bool discardNewCheck)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore StoreRawMsg");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (ulong Seq, Exception? Error) SkipMsg(ulong seq)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore SkipMsg");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void SkipMsgs(ulong seq, ulong num)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore SkipMsgs");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void FlushAllPending()
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore FlushAllPending");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public StoreMsg? LoadMsg(ulong seq, StoreMsg? sm)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore LoadMsg");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (StoreMsg? Sm, ulong Skip) LoadNextMsg(string filter, bool wc, ulong start, StoreMsg? smp)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore LoadNextMsg");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (StoreMsg? Sm, ulong Skip) LoadNextMsgMulti(object? sl, ulong start, StoreMsg? smp)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore LoadNextMsgMulti");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public StoreMsg? LoadLastMsg(string subject, StoreMsg? sm)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore LoadLastMsg");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (StoreMsg? Sm, Exception? Error) LoadPrevMsg(ulong start, StoreMsg? smp)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore LoadPrevMsg");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (StoreMsg? Sm, ulong Skip, Exception? Error) LoadPrevMsgMulti(object? sl, ulong start, StoreMsg? smp)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore LoadPrevMsgMulti");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (bool Removed, Exception? Error) RemoveMsg(ulong seq)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore RemoveMsg");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (bool Removed, Exception? Error) EraseMsg(ulong seq)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore EraseMsg");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (ulong Purged, Exception? Error) Purge()
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore Purge");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (ulong Purged, Exception? Error) PurgeEx(string subject, ulong seq, ulong keep)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore PurgeEx");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (ulong Purged, Exception? Error) Compact(ulong seq)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore Compact");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void Truncate(ulong seq)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore Truncate");
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// IStreamStore — query methods (all stubs)
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/// <inheritdoc/>
|
||||
public ulong GetSeqFromTime(DateTime t)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore GetSeqFromTime");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public SimpleState FilteredState(ulong seq, string subject)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore FilteredState");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public Dictionary<string, SimpleState> SubjectsState(string filterSubject)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore SubjectsState");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public Dictionary<string, ulong> SubjectsTotals(string filterSubject)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore SubjectsTotals");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (ulong[] Seqs, Exception? Error) AllLastSeqs()
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore AllLastSeqs");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (ulong[] Seqs, Exception? Error) MultiLastSeqs(string[] filters, ulong maxSeq, int maxAllowed)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore MultiLastSeqs");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (string Subject, Exception? Error) SubjectForSeq(ulong seq)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore SubjectForSeq");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (ulong Total, ulong ValidThrough, Exception? Error) NumPending(ulong sseq, string filter, bool lastPerSubject)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore NumPending");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (ulong Total, ulong ValidThrough, Exception? Error) NumPendingMulti(ulong sseq, object? sl, bool lastPerSubject)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore NumPendingMulti");
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// IStreamStore — stream state encoding (stubs)
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (byte[] Enc, Exception? Error) EncodedStreamState(ulong failed)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore EncodedStreamState");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void SyncDeleted(DeleteBlocks dbs)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore SyncDeleted");
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// IStreamStore — config / admin (stubs)
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void UpdateConfig(StreamConfig cfg)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore UpdateConfig");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void Delete(bool inline)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore Delete");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void ResetState()
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore ResetState");
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// IStreamStore — consumer management (stubs)
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/// <inheritdoc/>
|
||||
public IConsumerStore ConsumerStore(string name, DateTime created, ConsumerConfig cfg)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore ConsumerStore");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void AddConsumer(IConsumerStore o)
|
||||
{
|
||||
_cmu.EnterWriteLock();
|
||||
try { _cfs.Add(o); }
|
||||
finally { _cmu.ExitWriteLock(); }
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void RemoveConsumer(IConsumerStore o)
|
||||
{
|
||||
_cmu.EnterWriteLock();
|
||||
try { _cfs.Remove(o); }
|
||||
finally { _cmu.ExitWriteLock(); }
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------
|
||||
// IStreamStore — snapshot / utilization (stubs)
|
||||
// -----------------------------------------------------------------------
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (SnapshotResult? Result, Exception? Error) Snapshot(TimeSpan deadline, bool includeConsumers, bool checkMsgs)
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore Snapshot");
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (ulong Total, ulong Reported, Exception? Error) Utilization()
|
||||
=> throw new NotImplementedException("TODO: session 18 — filestore Utilization");
|
||||
}
|
||||
Reference in New Issue
Block a user