feat(batch15): complete group 6 msgblock/consumerfilestore
This commit is contained in:
@@ -1585,6 +1585,44 @@ internal sealed class MessageBlock
|
||||
return null;
|
||||
}
|
||||
|
||||
// Close the message block.
|
||||
internal void Close(bool sync)
|
||||
{
|
||||
Mu.EnterWriteLock();
|
||||
try
|
||||
{
|
||||
if (Closed)
|
||||
return;
|
||||
|
||||
if (Ctmr != null)
|
||||
{
|
||||
Ctmr.Dispose();
|
||||
Ctmr = null;
|
||||
}
|
||||
|
||||
Fss = null;
|
||||
_ = FlushPendingMsgsLocked();
|
||||
ClearCacheAndOffset();
|
||||
|
||||
Qch?.Writer.TryComplete();
|
||||
Qch = null;
|
||||
|
||||
if (Mfd != null)
|
||||
{
|
||||
if (sync)
|
||||
Mfd.Flush(flushToDisk: true);
|
||||
Mfd.Dispose();
|
||||
}
|
||||
|
||||
Mfd = null;
|
||||
Closed = true;
|
||||
}
|
||||
finally
|
||||
{
|
||||
Mu.ExitWriteLock();
|
||||
}
|
||||
}
|
||||
|
||||
// Remove a sequence from per-subject state and track whether first/last need recompute.
|
||||
// Lock should be held.
|
||||
internal ulong RemoveSeqPerSubject(string subj, ulong seq)
|
||||
@@ -3300,9 +3338,125 @@ public sealed class ConsumerFileStore : IConsumerStore
|
||||
_name = name;
|
||||
_odir = odir;
|
||||
_ifn = Path.Combine(odir, FileStoreDefaults.ConsumerState);
|
||||
_fch = Channel.CreateBounded<byte>(1);
|
||||
_qch = Channel.CreateUnbounded<byte>();
|
||||
_ = Task.Run(() => FlushLoop(_fch, _qch));
|
||||
|
||||
lock (_mu)
|
||||
{
|
||||
TryLoadStateLocked();
|
||||
var loadErr = LoadState();
|
||||
if (loadErr != null)
|
||||
throw loadErr;
|
||||
}
|
||||
}
|
||||
|
||||
internal Exception? ConvertCipher()
|
||||
{
|
||||
byte[]? state;
|
||||
Exception? err;
|
||||
lock (_mu)
|
||||
{
|
||||
(state, err) = EncodeStateLocked();
|
||||
}
|
||||
|
||||
if (err != null || state == null)
|
||||
return err ?? new InvalidOperationException("unable to encode consumer state");
|
||||
|
||||
var metaErr = WriteConsumerMeta();
|
||||
if (metaErr != null)
|
||||
return metaErr;
|
||||
|
||||
return WriteState(state);
|
||||
}
|
||||
|
||||
// Kick flusher for this consumer.
|
||||
// Lock should be held.
|
||||
internal void KickFlusher()
|
||||
{
|
||||
if (_fch != null)
|
||||
_ = _fch.Writer.TryWrite(0);
|
||||
_dirty = true;
|
||||
}
|
||||
|
||||
// Set in flusher status.
|
||||
internal void SetInFlusher()
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
_flusher = true;
|
||||
}
|
||||
}
|
||||
|
||||
// Clear in flusher status.
|
||||
internal void ClearInFlusher()
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
_flusher = false;
|
||||
}
|
||||
}
|
||||
|
||||
// Report in flusher status.
|
||||
internal bool InFlusher()
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
return _flusher;
|
||||
}
|
||||
}
|
||||
|
||||
// flushLoop watches for consumer updates and the quit channel.
|
||||
internal void FlushLoop(Channel<byte>? fch, Channel<byte>? qch)
|
||||
{
|
||||
SetInFlusher();
|
||||
try
|
||||
{
|
||||
if (fch == null)
|
||||
return;
|
||||
|
||||
var minTime = TimeSpan.FromMilliseconds(100);
|
||||
var lastWrite = DateTime.MinValue;
|
||||
|
||||
while (true)
|
||||
{
|
||||
if (qch?.Reader.Completion.IsCompleted == true)
|
||||
return;
|
||||
|
||||
var canRead = fch.Reader.WaitToReadAsync().AsTask().GetAwaiter().GetResult();
|
||||
if (!canRead)
|
||||
return;
|
||||
|
||||
while (fch.Reader.TryRead(out _))
|
||||
{
|
||||
}
|
||||
|
||||
if (lastWrite != DateTime.MinValue)
|
||||
{
|
||||
var elapsed = DateTime.UtcNow - lastWrite;
|
||||
if (elapsed < minTime)
|
||||
Thread.Sleep(minTime - elapsed);
|
||||
}
|
||||
|
||||
byte[]? buf;
|
||||
Exception? encodeErr;
|
||||
lock (_mu)
|
||||
{
|
||||
if (_closed)
|
||||
return;
|
||||
|
||||
(buf, encodeErr) = EncodeStateLocked();
|
||||
}
|
||||
|
||||
if (encodeErr != null || buf == null)
|
||||
return;
|
||||
|
||||
if (WriteState(buf) == null)
|
||||
lastWrite = DateTime.UtcNow;
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
ClearInFlusher();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3313,12 +3467,21 @@ public sealed class ConsumerFileStore : IConsumerStore
|
||||
/// <inheritdoc/>
|
||||
public void SetStarting(ulong sseq)
|
||||
{
|
||||
byte[]? buf;
|
||||
Exception? encodeErr;
|
||||
lock (_mu)
|
||||
{
|
||||
_state.Delivered.Stream = sseq;
|
||||
_state.AckFloor.Stream = sseq;
|
||||
PersistStateLocked();
|
||||
(buf, encodeErr) = EncodeStateLocked();
|
||||
}
|
||||
|
||||
if (encodeErr != null || buf == null)
|
||||
throw encodeErr ?? new InvalidOperationException("unable to encode consumer state");
|
||||
|
||||
var writeErr = WriteState(buf);
|
||||
if (writeErr != null)
|
||||
throw writeErr;
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
@@ -3332,7 +3495,7 @@ public sealed class ConsumerFileStore : IConsumerStore
|
||||
_state.Delivered.Stream = sseq;
|
||||
if (_cfg.Config.AckPolicy == AckPolicy.AckNone)
|
||||
_state.AckFloor.Stream = sseq;
|
||||
PersistStateLocked();
|
||||
KickFlusher();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3342,10 +3505,9 @@ public sealed class ConsumerFileStore : IConsumerStore
|
||||
lock (_mu)
|
||||
{
|
||||
_state = new ConsumerState();
|
||||
_state.Delivered.Stream = sseq;
|
||||
_state.AckFloor.Stream = sseq;
|
||||
PersistStateLocked();
|
||||
}
|
||||
|
||||
SetStarting(sseq);
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
@@ -3353,10 +3515,10 @@ public sealed class ConsumerFileStore : IConsumerStore
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
return _state.Delivered.Consumer != 0 ||
|
||||
_state.Delivered.Stream != 0 ||
|
||||
_state.Pending is { Count: > 0 } ||
|
||||
_state.Redelivered is { Count: > 0 };
|
||||
if (_state.Delivered.Consumer != 0 || _state.Delivered.Stream != 0)
|
||||
return true;
|
||||
|
||||
return File.Exists(_ifn);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3418,7 +3580,7 @@ public sealed class ConsumerFileStore : IConsumerStore
|
||||
}
|
||||
}
|
||||
|
||||
PersistStateLocked();
|
||||
KickFlusher();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3471,7 +3633,7 @@ public sealed class ConsumerFileStore : IConsumerStore
|
||||
}
|
||||
}
|
||||
|
||||
PersistStateLocked();
|
||||
KickFlusher();
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -3510,17 +3672,21 @@ public sealed class ConsumerFileStore : IConsumerStore
|
||||
}
|
||||
|
||||
_state.Redelivered?.Remove(sseq);
|
||||
PersistStateLocked();
|
||||
KickFlusher();
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void UpdateConfig(ConsumerConfig cfg)
|
||||
=> _ = UpdateConfigInternal(cfg);
|
||||
|
||||
// Used to update config for recovered ephemerals.
|
||||
internal Exception? UpdateConfigInternal(ConsumerConfig cfg)
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
_cfg.Config = cfg;
|
||||
PersistStateLocked();
|
||||
return WriteConsumerMeta();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3544,31 +3710,17 @@ public sealed class ConsumerFileStore : IConsumerStore
|
||||
throw new InvalidOperationException("old update ignored");
|
||||
|
||||
_state = CloneState(state, copyCollections: true);
|
||||
PersistStateLocked();
|
||||
KickFlusher();
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (ConsumerState? State, Exception? Error) State()
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
if (_closed)
|
||||
return (null, StoreErrors.ErrStoreClosed);
|
||||
return (CloneState(_state, copyCollections: true), null);
|
||||
}
|
||||
}
|
||||
=> StateWithCopy(doCopy: true);
|
||||
|
||||
/// <inheritdoc/>
|
||||
public (ConsumerState? State, Exception? Error) BorrowState()
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
if (_closed)
|
||||
return (null, StoreErrors.ErrStoreClosed);
|
||||
return (CloneState(_state, copyCollections: false), null);
|
||||
}
|
||||
}
|
||||
=> StateWithCopy(doCopy: false);
|
||||
|
||||
/// <inheritdoc/>
|
||||
public byte[] EncodedState()
|
||||
@@ -3577,7 +3729,10 @@ public sealed class ConsumerFileStore : IConsumerStore
|
||||
{
|
||||
if (_closed)
|
||||
throw StoreErrors.ErrStoreClosed;
|
||||
return JsonSerializer.SerializeToUtf8Bytes(CloneState(_state, copyCollections: true));
|
||||
var (buf, err) = EncodeStateLocked();
|
||||
if (err != null || buf == null)
|
||||
throw err ?? new InvalidOperationException("unable to encode consumer state");
|
||||
return buf;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -3587,27 +3742,257 @@ public sealed class ConsumerFileStore : IConsumerStore
|
||||
/// <inheritdoc/>
|
||||
public void Stop()
|
||||
{
|
||||
byte[]? buf = null;
|
||||
lock (_mu)
|
||||
{
|
||||
if (_closed)
|
||||
return;
|
||||
PersistStateLocked();
|
||||
|
||||
_qch?.Writer.TryComplete();
|
||||
_qch = null;
|
||||
|
||||
var hasState = _state.Delivered.Consumer != 0 ||
|
||||
_state.Delivered.Stream != 0 ||
|
||||
_state.Pending is { Count: > 0 } ||
|
||||
_state.Redelivered is { Count: > 0 };
|
||||
|
||||
if (_dirty || hasState)
|
||||
{
|
||||
var (encoded, err) = EncodeStateLocked();
|
||||
if (err == null)
|
||||
buf = encoded;
|
||||
}
|
||||
|
||||
_closed = true;
|
||||
}
|
||||
|
||||
_fs.RemoveConsumer(this);
|
||||
|
||||
if (buf is { Length: > 0 })
|
||||
{
|
||||
WaitOnFlusher();
|
||||
_ = WriteState(buf);
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void Delete()
|
||||
{
|
||||
Stop();
|
||||
if (Directory.Exists(_odir))
|
||||
Directory.Delete(_odir, recursive: true);
|
||||
}
|
||||
=> _ = DeleteInternal(streamDeleted: false);
|
||||
|
||||
/// <inheritdoc/>
|
||||
public void StreamDelete()
|
||||
=> Stop();
|
||||
=> _ = DeleteInternal(streamDeleted: true);
|
||||
|
||||
internal (byte[]? Buffer, Exception? Error) EncodeState()
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
return EncodeStateLocked();
|
||||
}
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
internal (byte[]? Buffer, Exception? Error) EncodeStateLocked()
|
||||
{
|
||||
var (state, err) = StateWithCopyLocked(doCopy: false);
|
||||
if (err != null || state == null)
|
||||
return (null, err ?? StoreErrors.ErrStoreClosed);
|
||||
|
||||
return (JsonSerializer.SerializeToUtf8Bytes(state), null);
|
||||
}
|
||||
|
||||
// Will encrypt the state with the consumer key.
|
||||
// Current .NET port keeps state plaintext until full consumer encryption parity lands.
|
||||
internal (byte[]? Buffer, Exception? Error) EncryptState(byte[] buf)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(buf);
|
||||
return (buf, null);
|
||||
}
|
||||
|
||||
internal Exception? WriteState(byte[] buf)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(buf);
|
||||
|
||||
lock (_mu)
|
||||
{
|
||||
if (_writing || buf.Length == 0)
|
||||
return null;
|
||||
|
||||
var (encrypted, encryptErr) = EncryptState(buf);
|
||||
if (encryptErr != null || encrypted == null)
|
||||
return encryptErr ?? new InvalidOperationException("failed to encrypt consumer state");
|
||||
|
||||
buf = encrypted;
|
||||
_writing = true;
|
||||
_dirty = false;
|
||||
}
|
||||
|
||||
Exception? writeErr = null;
|
||||
try
|
||||
{
|
||||
Directory.CreateDirectory(_odir);
|
||||
File.WriteAllBytes(_ifn, buf);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
writeErr = ex;
|
||||
}
|
||||
|
||||
lock (_mu)
|
||||
{
|
||||
if (writeErr != null)
|
||||
_dirty = true;
|
||||
_writing = false;
|
||||
}
|
||||
|
||||
return writeErr;
|
||||
}
|
||||
|
||||
// Write out consumer meta data (configuration).
|
||||
// Lock should be held.
|
||||
internal Exception? WriteConsumerMeta()
|
||||
{
|
||||
try
|
||||
{
|
||||
Directory.CreateDirectory(_odir);
|
||||
var meta = Path.Combine(_odir, FileStoreDefaults.JetStreamMetaFile);
|
||||
var b = JsonSerializer.SerializeToUtf8Bytes(_cfg);
|
||||
File.WriteAllBytes(meta, b);
|
||||
|
||||
var checksum = Convert.ToHexString(SHA256.HashData(b)).ToLowerInvariant();
|
||||
var sum = Path.Combine(_odir, FileStoreDefaults.JetStreamMetaFileSum);
|
||||
File.WriteAllText(sum, checksum);
|
||||
return null;
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return ex;
|
||||
}
|
||||
}
|
||||
|
||||
internal Dictionary<ulong, Pending>? CopyPending()
|
||||
{
|
||||
if (_state.Pending is not { Count: > 0 })
|
||||
return null;
|
||||
|
||||
var pending = new Dictionary<ulong, Pending>(_state.Pending.Count);
|
||||
foreach (var (seq, p) in _state.Pending)
|
||||
{
|
||||
pending[seq] = new Pending
|
||||
{
|
||||
Sequence = p.Sequence,
|
||||
Timestamp = p.Timestamp,
|
||||
};
|
||||
}
|
||||
|
||||
return pending;
|
||||
}
|
||||
|
||||
internal Dictionary<ulong, ulong>? CopyRedelivered()
|
||||
{
|
||||
if (_state.Redelivered is not { Count: > 0 })
|
||||
return null;
|
||||
|
||||
return new Dictionary<ulong, ulong>(_state.Redelivered);
|
||||
}
|
||||
|
||||
internal (ConsumerState? State, Exception? Error) StateWithCopy(bool doCopy)
|
||||
{
|
||||
lock (_mu)
|
||||
{
|
||||
return StateWithCopyLocked(doCopy);
|
||||
}
|
||||
}
|
||||
|
||||
// Lock should be held.
|
||||
internal (ConsumerState? State, Exception? Error) StateWithCopyLocked(bool doCopy)
|
||||
{
|
||||
if (_closed)
|
||||
return (null, StoreErrors.ErrStoreClosed);
|
||||
|
||||
if (_state.Delivered.Consumer != 0 || _state.Delivered.Stream != 0 || _state.Pending is { Count: > 0 } || _state.Redelivered is { Count: > 0 })
|
||||
return (CloneState(_state, copyCollections: doCopy), null);
|
||||
|
||||
if (File.Exists(_ifn))
|
||||
{
|
||||
try
|
||||
{
|
||||
var raw = File.ReadAllBytes(_ifn);
|
||||
if (raw.Length > 0)
|
||||
{
|
||||
var loaded = JsonSerializer.Deserialize<ConsumerState>(raw);
|
||||
if (loaded != null)
|
||||
_state = CloneState(loaded, copyCollections: true);
|
||||
}
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return (null, ex);
|
||||
}
|
||||
}
|
||||
|
||||
return (CloneState(_state, copyCollections: doCopy), null);
|
||||
}
|
||||
|
||||
// Lock should be held at startup.
|
||||
internal Exception? LoadState()
|
||||
{
|
||||
if (File.Exists(_ifn))
|
||||
{
|
||||
var (_, err) = StateWithCopyLocked(doCopy: false);
|
||||
return err;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
internal void WaitOnFlusher()
|
||||
{
|
||||
if (!InFlusher())
|
||||
return;
|
||||
|
||||
var timeoutAt = DateTime.UtcNow.AddMilliseconds(100);
|
||||
while (DateTime.UtcNow < timeoutAt)
|
||||
{
|
||||
if (!InFlusher())
|
||||
return;
|
||||
|
||||
Thread.Sleep(10);
|
||||
}
|
||||
}
|
||||
|
||||
internal Exception? DeleteInternal(bool streamDeleted)
|
||||
{
|
||||
string? removeDir = null;
|
||||
lock (_mu)
|
||||
{
|
||||
if (_closed)
|
||||
return null;
|
||||
|
||||
_qch?.Writer.TryComplete();
|
||||
_qch = null;
|
||||
_closed = true;
|
||||
removeDir = _odir;
|
||||
}
|
||||
|
||||
if (!streamDeleted)
|
||||
{
|
||||
try
|
||||
{
|
||||
if (!string.IsNullOrEmpty(removeDir) && Directory.Exists(removeDir))
|
||||
Directory.Delete(removeDir, recursive: true);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return ex;
|
||||
}
|
||||
}
|
||||
|
||||
if (!streamDeleted)
|
||||
_fs.RemoveConsumer(this);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private void TryLoadStateLocked()
|
||||
{
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
using System.Text.Json;
|
||||
using IronSnappy;
|
||||
|
||||
namespace ZB.MOM.NatsNet.Server;
|
||||
|
||||
@@ -51,4 +52,38 @@ public static class StoreEnumExtensions
|
||||
ArgumentNullException.ThrowIfNull(b);
|
||||
UnmarshalJSON(ref alg, b.AsSpan());
|
||||
}
|
||||
|
||||
public static (byte[]? Buffer, Exception? Error) Compress(this StoreCompression alg, byte[] buf)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(buf);
|
||||
|
||||
const int checksumSize = FileStoreDefaults.RecordHashSize;
|
||||
if (buf.Length < checksumSize)
|
||||
return (null, new InvalidDataException("uncompressed buffer is too short"));
|
||||
|
||||
return alg switch
|
||||
{
|
||||
StoreCompression.NoCompression => (buf, null),
|
||||
StoreCompression.S2Compression => CompressS2(buf, checksumSize),
|
||||
_ => (null, new InvalidOperationException("compression algorithm not known")),
|
||||
};
|
||||
}
|
||||
|
||||
private static (byte[]? Buffer, Exception? Error) CompressS2(byte[] buf, int checksumSize)
|
||||
{
|
||||
try
|
||||
{
|
||||
var bodyLength = buf.Length - checksumSize;
|
||||
var compressedBody = Snappy.Encode(buf.AsSpan(0, bodyLength));
|
||||
|
||||
var output = new byte[compressedBody.Length + checksumSize];
|
||||
Buffer.BlockCopy(compressedBody, 0, output, 0, compressedBody.Length);
|
||||
Buffer.BlockCopy(buf, bodyLength, output, compressedBody.Length, checksumSize);
|
||||
return (output, null);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
return (null, new IOException("error writing to compression writer", ex));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user