// Copyright 2019-2026 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// Adapted from server/consumer.go in the NATS server Go source.
namespace ZB.MOM.NatsNet.Server;
///
/// Represents a JetStream consumer, managing message delivery, ack tracking, and lifecycle.
/// Mirrors the consumer struct in server/consumer.go.
///
internal sealed class NatsConsumer : IDisposable
{
private readonly ReaderWriterLockSlim _mu = new(LockRecursionPolicy.SupportsRecursion);
public string Name { get; private set; } = string.Empty;
public string Stream { get; private set; } = string.Empty;
public ConsumerConfig Config { get; private set; } = new();
public DateTime Created { get; private set; }
// Atomic counters — use Interlocked for thread-safe access
internal long Delivered;
internal long AckFloor;
internal long NumAckPending;
internal long NumRedelivered;
private bool _closed;
private bool _isLeader;
private ulong _leaderTerm;
private ConsumerState _state = new();
/// IRaftNode — stored as object to avoid cross-dependency on Raft session.
private object? _node;
private CancellationTokenSource? _quitCts;
public NatsConsumer(string stream, ConsumerConfig config, DateTime created)
{
Stream = stream;
Name = (config.Name is { Length: > 0 } name) ? name
: (config.Durable ?? string.Empty);
Config = config;
Created = created;
_quitCts = new CancellationTokenSource();
}
// -------------------------------------------------------------------------
// Factory
// -------------------------------------------------------------------------
///
/// Creates a new for the given stream.
/// Returns null if the consumer cannot be created (stub: always throws).
/// Mirrors newConsumer / consumer.create in server/consumer.go.
///
public static NatsConsumer? Create(
NatsStream stream,
ConsumerConfig cfg,
ConsumerAction action,
ConsumerAssignment? sa)
{
ArgumentNullException.ThrowIfNull(stream);
ArgumentNullException.ThrowIfNull(cfg);
return new NatsConsumer(stream.Name, cfg, DateTime.UtcNow);
}
// -------------------------------------------------------------------------
// Lifecycle
// -------------------------------------------------------------------------
///
/// Stops processing and tears down goroutines / timers.
/// Mirrors consumer.stop in server/consumer.go.
///
public void Stop()
{
_mu.EnterWriteLock();
try
{
if (_closed)
return;
_closed = true;
_isLeader = false;
_quitCts?.Cancel();
}
finally
{
_mu.ExitWriteLock();
}
}
///
/// Deletes the consumer and all associated state permanently.
/// Mirrors consumer.delete in server/consumer.go.
///
public void Delete() => Stop();
// -------------------------------------------------------------------------
// Info / State
// -------------------------------------------------------------------------
///
/// Returns a snapshot of consumer info including config and delivery state.
/// Mirrors consumer.info in server/consumer.go.
///
public ConsumerInfo GetInfo()
{
_mu.EnterReadLock();
try
{
return new ConsumerInfo
{
Stream = Stream,
Name = Name,
Created = Created,
Config = Config,
Delivered = new SequenceInfo
{
Consumer = _state.Delivered.Consumer,
Stream = _state.Delivered.Stream,
},
AckFloor = new SequenceInfo
{
Consumer = _state.AckFloor.Consumer,
Stream = _state.AckFloor.Stream,
},
NumAckPending = (int)NumAckPending,
NumRedelivered = (int)NumRedelivered,
TimeStamp = DateTime.UtcNow,
};
}
finally
{
_mu.ExitReadLock();
}
}
///
/// Returns the current consumer configuration.
/// Mirrors consumer.config in server/consumer.go.
///
public ConsumerConfig GetConfig()
{
_mu.EnterReadLock();
try { return Config; }
finally { _mu.ExitReadLock(); }
}
///
/// Applies an updated configuration to the consumer.
/// Mirrors consumer.update in server/consumer.go.
///
public void UpdateConfig(ConsumerConfig config)
{
ArgumentNullException.ThrowIfNull(config);
_mu.EnterWriteLock();
try { Config = config; }
finally { _mu.ExitWriteLock(); }
}
///
/// Returns the current durable consumer state (delivered, ack_floor, pending, redelivered).
/// Mirrors consumer.state in server/consumer.go.
///
public ConsumerState GetConsumerState()
{
_mu.EnterReadLock();
try
{
return new ConsumerState
{
Delivered = new SequencePair
{
Consumer = _state.Delivered.Consumer,
Stream = _state.Delivered.Stream,
},
AckFloor = new SequencePair
{
Consumer = _state.AckFloor.Consumer,
Stream = _state.AckFloor.Stream,
},
Pending = _state.Pending is { Count: > 0 } ? new Dictionary(_state.Pending) : null,
Redelivered = _state.Redelivered is { Count: > 0 } ? new Dictionary(_state.Redelivered) : null,
};
}
finally
{
_mu.ExitReadLock();
}
}
// -------------------------------------------------------------------------
// Leadership
// -------------------------------------------------------------------------
///
/// Returns true if this server is the current consumer leader.
/// Mirrors consumer.isLeader in server/consumer.go.
///
public bool IsLeader()
{
_mu.EnterReadLock();
try { return _isLeader && !_closed; }
finally { _mu.ExitReadLock(); }
}
///
/// Transitions this consumer into or out of the leader role.
/// Mirrors consumer.setLeader in server/consumer.go.
///
public void SetLeader(bool isLeader, ulong term)
{
_mu.EnterWriteLock();
try
{
_isLeader = isLeader;
_leaderTerm = term;
}
finally
{
_mu.ExitWriteLock();
}
}
// -------------------------------------------------------------------------
// IDisposable
// -------------------------------------------------------------------------
public void Dispose()
{
_quitCts?.Cancel();
_quitCts?.Dispose();
_quitCts = null;
_mu.Dispose();
}
}