// Copyright 2019-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/stream.go in the NATS server Go source.
namespace ZB.MOM.NatsNet.Server;
/// <summary>
/// Represents a JetStream stream, managing message storage, replication, and lifecycle.
/// Mirrors the <c>stream</c> struct in server/stream.go.
/// </summary>
/// <remarks>
/// Mutable state is guarded by a <see cref="ReaderWriterLockSlim"/>. The counter fields
/// (<see cref="Msgs"/>, <see cref="Bytes"/>, <see cref="FirstSeq"/>, <see cref="LastSeq"/>)
/// are accessed through <see cref="Interlocked"/> instead.
/// </remarks>
internal sealed class NatsStream : IDisposable
{
    /// <summary>Upper bound, in milliseconds, that <see cref="WaitOnConsumerAssignments"/> waits.</summary>
    private const int ConsumerAssignmentWaitMs = 2000;

    /// <summary>Interval, in milliseconds, between recovery checks in <see cref="WaitOnConsumerAssignments"/>.</summary>
    private const int ConsumerAssignmentPollMs = 50;

    // Recursion is enabled because GetInfo calls State while already holding the read lock.
    private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion);

    /// <summary>The account that owns this stream.</summary>
    public Account Account { get; private set; }

    /// <summary>The stream name, taken from the configuration at construction time.</summary>
    public string Name { get; private set; } = string.Empty;

    /// <summary>The live stream configuration. Use <see cref="GetConfig"/> for a defensive copy.</summary>
    public StreamConfig Config { get; private set; } = new();

    /// <summary>The creation timestamp supplied at construction time.</summary>
    public DateTime Created { get; private set; }

    /// <summary>The backing message store, or <c>null</c> when none is attached or it has been deleted.</summary>
    internal IStreamStore? Store { get; private set; }

    // Atomic counters — use Interlocked for thread-safe access
    internal long Msgs;
    internal long Bytes;
    internal long FirstSeq;
    internal long LastSeq;

    /// <summary>True when the configuration supplied to <see cref="Create"/> had a mirror source.</summary>
    internal bool IsMirror;

    private bool _closed;
    private bool _isLeader;
    private ulong _leaderTerm;
    private bool _sealed;
    private CancellationTokenSource? _quitCts;

    /// <summary>IRaftNode — stored as object to avoid cross-dependency on Raft session.</summary>
    private object? _node;

    private StreamAssignment? _assignment;
    private bool _migrating;
    private bool _recovering;

    /// <summary>
    /// Initialises a stream for <paramref name="account"/> using <paramref name="config"/> as-is
    /// (the configuration is not cloned here; <see cref="Create"/> clones before calling).
    /// </summary>
    /// <param name="account">The owning account.</param>
    /// <param name="config">The stream configuration; its name becomes <see cref="Name"/>.</param>
    /// <param name="created">The creation timestamp to report.</param>
    /// <exception cref="ArgumentNullException"><paramref name="config"/> is <c>null</c>.</exception>
    public NatsStream(Account account, StreamConfig config, DateTime created)
    {
        // Fail with a descriptive exception instead of a NullReferenceException on config.Name.
        ArgumentNullException.ThrowIfNull(config);

        Account = account;
        Name = config.Name ?? string.Empty;
        Config = config;
        Created = created;
        _quitCts = new CancellationTokenSource();
    }

    // -------------------------------------------------------------------------
    // Factory
    // -------------------------------------------------------------------------

    /// <summary>
    /// Creates a new <see cref="NatsStream"/> from a clone of <paramref name="cfg"/>.
    /// Mirrors newStream / stream.create in server/stream.go.
    /// </summary>
    /// <param name="acc">The owning account.</param>
    /// <param name="cfg">The stream configuration; it is cloned, so later caller mutations are not observed.</param>
    /// <param name="jsacc">Currently unused.</param>
    /// <param name="store">The backing store to attach, if any.</param>
    /// <param name="sa">The cluster stream assignment, if any.</param>
    /// <param name="server">Currently unused.</param>
    /// <returns>The new stream. The current implementation never returns <c>null</c>.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="acc"/> or <paramref name="cfg"/> is <c>null</c>.</exception>
    public static NatsStream? Create(
        Account acc,
        StreamConfig cfg,
        object? jsacc,
        IStreamStore? store,
        StreamAssignment? sa,
        object? server)
    {
        ArgumentNullException.ThrowIfNull(acc);
        ArgumentNullException.ThrowIfNull(cfg);

        var stream = new NatsStream(acc, cfg.Clone(), DateTime.UtcNow)
        {
            Store = store,
            IsMirror = cfg.Mirror != null,
            _assignment = sa,
        };
        return stream;
    }

    // -------------------------------------------------------------------------
    // Lifecycle
    // -------------------------------------------------------------------------

    /// <summary>
    /// Marks the stream closed, drops leadership, and signals the quit token.
    /// Calling it on an already closed stream is a no-op.
    /// Mirrors stream.stop in server/stream.go.
    /// </summary>
    public void Stop()
    {
        _mu.EnterWriteLock();
        try
        {
            if (_closed)
            {
                return;
            }

            _closed = true;
            _isLeader = false;
            _quitCts?.Cancel();
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }

    /// <summary>
    /// Closes the stream (if it is not closed already) and deletes the attached store.
    /// Mirrors stream.delete in server/stream.go.
    /// </summary>
    /// <remarks>
    /// The store is deleted even when the stream was previously stopped, so
    /// <c>Stop()</c> followed by <c>Delete()</c> still removes the stored data.
    /// The store reference is cleared only after a successful delete, which makes
    /// repeated calls harmless and allows a retry if the store's delete throws.
    /// </remarks>
    public void Delete()
    {
        _mu.EnterWriteLock();
        try
        {
            if (!_closed)
            {
                _closed = true;
                _isLeader = false;
                _quitCts?.Cancel();
            }

            Store?.Delete(inline: true);
            Store = null;
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }

    /// <summary>
    /// Purges messages from the store and refreshes the counters from the store's state.
    /// Does nothing when the stream is closed or has no store.
    /// Mirrors stream.purge in server/stream.go.
    /// </summary>
    /// <param name="req">
    /// Optional filter. When <c>null</c>, or when it has no filter, sequence, or keep value,
    /// the whole store is purged; otherwise the filtered purge is used.
    /// </param>
    public void Purge(StreamPurgeRequest? req = null)
    {
        _mu.EnterWriteLock();
        try
        {
            if (_closed || Store == null)
            {
                return;
            }

            var purgeEverything =
                req == null || (string.IsNullOrEmpty(req.Filter) && req.Sequence == 0 && req.Keep == 0);

            if (purgeEverything)
            {
                Store.Purge();
            }
            else
            {
                Store.PurgeEx(req!.Filter ?? string.Empty, req.Sequence, req.Keep);
            }

            SyncCountersFromState(Store.State());
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }

    // -------------------------------------------------------------------------
    // Info / State
    // -------------------------------------------------------------------------

    /// <summary>
    /// Returns a snapshot of stream info: a clone of the config, the creation time,
    /// the current state, and cluster information.
    /// Mirrors stream.info in server/stream.go.
    /// </summary>
    /// <param name="includeDeleted">Currently unused.</param>
    /// <returns>A new <see cref="StreamInfo"/>; the cluster leader is this stream's name when it is the leader, otherwise <c>null</c>.</returns>
    public StreamInfo GetInfo(bool includeDeleted = false)
    {
        _mu.EnterReadLock();
        try
        {
            return new StreamInfo
            {
                Config = Config.Clone(),
                Created = Created,
                State = State(), // re-enters the read lock; allowed by the recursion policy
                Cluster = new ClusterInfo
                {
                    Leader = _isLeader ? Name : null,
                },
            };
        }
        finally
        {
            _mu.ExitReadLock();
        }
    }

    /// <summary>
    /// Task-returning wrapper around <see cref="GetInfo"/>; the work itself runs synchronously.
    /// Mirrors stream.info (async path) in server/stream.go.
    /// </summary>
    /// <param name="includeDeleted">Forwarded to <see cref="GetInfo"/>.</param>
    /// <param name="ct">When already cancelled, a cancelled task is returned and no snapshot is taken.</param>
    /// <returns>A completed task carrying the snapshot, or a cancelled task.</returns>
    public Task<StreamInfo> GetInfoAsync(bool includeDeleted = false, CancellationToken ct = default) =>
        ct.IsCancellationRequested
            ? Task.FromCanceled<StreamInfo>(ct)
            : Task.FromResult(GetInfo(includeDeleted));

    /// <summary>
    /// Returns the current stream state (message counts, byte totals, sequences).
    /// Mirrors stream.state in server/stream.go.
    /// </summary>
    /// <returns>
    /// The store's state when a store is attached; otherwise a state built from the
    /// counters, with negative counter values reported as zero.
    /// </returns>
    public StreamState State()
    {
        _mu.EnterReadLock();
        try
        {
            if (Store != null)
            {
                return Store.State();
            }

            return new StreamState
            {
                Msgs = ReadNonNegative(ref Msgs),
                Bytes = ReadNonNegative(ref Bytes),
                FirstSeq = ReadNonNegative(ref FirstSeq),
                LastSeq = ReadNonNegative(ref LastSeq),
            };
        }
        finally
        {
            _mu.ExitReadLock();
        }

        // Atomically reads a counter and clamps negative values to zero before the unsigned cast.
        static ulong ReadNonNegative(ref long counter) =>
            (ulong)Math.Max(0L, Interlocked.Read(ref counter));
    }

    // -------------------------------------------------------------------------
    // Leadership
    // -------------------------------------------------------------------------

    /// <summary>
    /// Records whether this stream is the leader and the associated term.
    /// Mirrors stream.setLeader in server/stream.go.
    /// </summary>
    /// <param name="isLeader">The new leader flag.</param>
    /// <param name="term">The leader term to record.</param>
    public void SetLeader(bool isLeader, ulong term)
    {
        _mu.EnterWriteLock();
        try
        {
            _isLeader = isLeader;
            _leaderTerm = term;
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }

    /// <summary>
    /// Returns true if the leader flag is set and the stream is not closed.
    /// Mirrors stream.isLeader in server/stream.go.
    /// </summary>
    public bool IsLeader()
    {
        _mu.EnterReadLock();
        try { return _isLeader && !_closed; }
        finally { _mu.ExitReadLock(); }
    }

    // -------------------------------------------------------------------------
    // Configuration
    // -------------------------------------------------------------------------

    /// <summary>
    /// Returns the owning account.
    /// Mirrors stream.account in server/stream.go.
    /// </summary>
    public Account GetAccount()
    {
        _mu.EnterReadLock();
        try { return Account; }
        finally { _mu.ExitReadLock(); }
    }

    /// <summary>
    /// Returns a clone of the current stream configuration.
    /// Mirrors stream.config in server/stream.go.
    /// </summary>
    public StreamConfig GetConfig()
    {
        _mu.EnterReadLock();
        try { return Config.Clone(); }
        finally { _mu.ExitReadLock(); }
    }

    /// <summary>
    /// Applies a clone of <paramref name="config"/> to the store (if any) and then to the stream.
    /// Mirrors stream.update in server/stream.go.
    /// </summary>
    /// <param name="config">The new configuration.</param>
    /// <exception cref="ArgumentNullException"><paramref name="config"/> is <c>null</c>.</exception>
    public void UpdateConfig(StreamConfig config)
    {
        // Validate and clone before taking the lock so bad input never contends for it.
        ArgumentNullException.ThrowIfNull(config);
        var updated = config.Clone();

        _mu.EnterWriteLock();
        try
        {
            // Apply to the store first: if it throws, the stream keeps its previous
            // configuration instead of diverging from the store.
            Store?.UpdateConfig(updated);
            Config = updated;
            _sealed = updated.Sealed;
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }

    // -------------------------------------------------------------------------
    // Sealed state
    // -------------------------------------------------------------------------

    /// <summary>
    /// Returns true if the stream is sealed, either via <see cref="Seal"/> or via its configuration.
    /// Mirrors stream.isSealed in server/stream.go.
    /// </summary>
    public bool IsSealed()
    {
        _mu.EnterReadLock();
        try { return _sealed || Config.Sealed; }
        finally { _mu.ExitReadLock(); }
    }

    /// <summary>
    /// Returns the Raft group of the stream assignment, or <c>null</c> when there is no assignment.
    /// </summary>
    public RaftGroup? RaftGroup()
    {
        _mu.EnterReadLock();
        try { return _assignment?.Group; }
        finally { _mu.ExitReadLock(); }
    }

    /// <summary>
    /// Returns the attached Raft node, or <c>null</c> when none is attached
    /// (or the stored object is not an <see cref="IRaftNode"/>).
    /// </summary>
    public IRaftNode? RaftNode()
    {
        _mu.EnterReadLock();
        try { return _node as IRaftNode; }
        finally { _mu.ExitReadLock(); }
    }

    /// <summary>
    /// Deletes the attached Raft node, if any, and clears the reference.
    /// </summary>
    public void RemoveNode()
    {
        _mu.EnterWriteLock();
        try
        {
            if (_node is IRaftNode raft)
            {
                raft.Delete();
            }

            _node = null;
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }

    /// <summary>
    /// Blocks the calling thread until the stream is no longer marked as recovering,
    /// or until roughly two seconds have elapsed, whichever comes first.
    /// </summary>
    /// <param name="cancellationToken">
    /// If already cancelled on entry the method returns immediately without throwing;
    /// if cancelled while waiting, the wait ends promptly and an exception is thrown.
    /// </param>
    /// <exception cref="OperationCanceledException">The token was cancelled while waiting.</exception>
    public void WaitOnConsumerAssignments(CancellationToken cancellationToken = default)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            return;
        }

        // Monotonic clock: a wall-clock adjustment must not stretch or shorten the wait.
        var deadline = Environment.TickCount64 + ConsumerAssignmentWaitMs;
        while (Environment.TickCount64 < deadline)
        {
            cancellationToken.ThrowIfCancellationRequested();

            // Volatile read so an update made on another thread is observed by this loop.
            if (!Volatile.Read(ref _recovering))
            {
                break;
            }

            // Wait on the token instead of sleeping so cancellation wakes us immediately.
            cancellationToken.WaitHandle.WaitOne(ConsumerAssignmentPollMs);
        }
    }

    /// <summary>
    /// Returns the current value of the migrating flag.
    /// </summary>
    public bool IsMigrating()
    {
        _mu.EnterReadLock();
        try { return _migrating; }
        finally { _mu.ExitReadLock(); }
    }

    /// <summary>
    /// Marks the stream as recovering and clears leadership, leader term, and the migrating flag.
    /// When a <paramref name="cause"/> is supplied and a Raft node is attached, the node is asked to step down.
    /// </summary>
    /// <param name="cause">Optional reason for the reset; only its presence is inspected.</param>
    /// <returns>Always <c>true</c> in the current implementation.</returns>
    public bool ResetClusteredState(Exception? cause = null)
    {
        _mu.EnterWriteLock();
        try
        {
            _recovering = true;
            _isLeader = false;
            _leaderTerm = 0;
            _migrating = false;

            if (cause != null && _node is IRaftNode raft)
            {
                raft.StepDown();
            }

            return true;
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }

    /// <summary>
    /// Returns true while the stream is marked as recovering.
    /// </summary>
    public bool SkipBatchIfRecovering()
    {
        _mu.EnterReadLock();
        try { return _recovering; }
        finally { _mu.ExitReadLock(); }
    }

    /// <summary>
    /// Returns true when the stream is configured with more than one replica and
    /// the attached Raft node reports that it is leaderless.
    /// </summary>
    public bool ShouldSendLostQuorum()
    {
        _mu.EnterReadLock();
        try
        {
            var replicas = Math.Max(1, Config.Replicas);
            return replicas > 1 && _node is IRaftNode raft && raft.Leaderless();
        }
        finally
        {
            _mu.ExitReadLock();
        }
    }

    /// <summary>
    /// Marks the stream as sealed, both on the stream and on its live configuration.
    /// Mirrors stream.seal in server/stream.go.
    /// </summary>
    public void Seal()
    {
        _mu.EnterWriteLock();
        try
        {
            _sealed = true;
            Config.Sealed = true;
        }
        finally
        {
            _mu.ExitWriteLock();
        }
    }

    /// <summary>
    /// Atomically overwrites the four counters with the values from <paramref name="state"/>.
    /// </summary>
    private void SyncCountersFromState(StreamState state)
    {
        Interlocked.Exchange(ref Msgs, (long)state.Msgs);
        Interlocked.Exchange(ref Bytes, (long)state.Bytes);
        Interlocked.Exchange(ref FirstSeq, (long)state.FirstSeq);
        Interlocked.Exchange(ref LastSeq, (long)state.LastSeq);
    }

    // -------------------------------------------------------------------------
    // IDisposable
    // -------------------------------------------------------------------------

    /// <summary>
    /// Signals the quit token, then releases the token source and the lock.
    /// Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        // Atomically take ownership of the token source so that concurrent or repeated
        // calls cannot cancel or dispose it twice.
        var quit = Interlocked.Exchange(ref _quitCts, null);
        try
        {
            quit?.Cancel();
        }
        finally
        {
            // Release resources even if a cancellation callback throws.
            quit?.Dispose();
            _mu.Dispose();
        }
    }
}