diff --git a/src/NATS.Server/JetStream/Consumers/TokenBucketRateLimiter.cs b/src/NATS.Server/JetStream/Consumers/TokenBucketRateLimiter.cs
new file mode 100644
index 0000000..17e3f2f
--- /dev/null
+++ b/src/NATS.Server/JetStream/Consumers/TokenBucketRateLimiter.cs
@@ -0,0 +1,130 @@
+// Go: consumer.go (rateLimitBps config, rate limiting in consumer delivery)
+namespace NATS.Server.JetStream.Consumers;
+
+///
+/// Token bucket rate limiter for consumer message delivery.
+/// Tokens refill at a configurable rate (bytes per second).
+/// Each message consumes tokens equal to its payload size.
+/// Go reference: consumer.go rate limiting via rateLimitBps config.
+///
+public sealed class TokenBucketRateLimiter
+{
+ private readonly object _lock = new();
+ private double _tokens;
+ private double _maxTokens;
+ private double _refillRate; // tokens per millisecond
+ private DateTime _lastRefill;
+
+ ///
+ /// Creates a rate limiter with the specified rate in bytes per second.
+ ///
+ /// Maximum bytes per second. 0 = unlimited.
+ /// Maximum burst size in bytes. Defaults to 2x rate.
+ public TokenBucketRateLimiter(long bytesPerSecond, long burstSize = 0)
+ {
+ BytesPerSecond = bytesPerSecond;
+ _refillRate = bytesPerSecond / 1000.0; // tokens per ms
+ _maxTokens = burstSize > 0 ? burstSize : bytesPerSecond * 2;
+ _tokens = _maxTokens; // Start full
+ _lastRefill = DateTime.UtcNow;
+ }
+
+ /// Configured rate in bytes per second.
+ public long BytesPerSecond { get; private set; }
+
+ /// Current available tokens (approximate).
+ public double AvailableTokens
+ {
+ get
+ {
+ lock (_lock)
+ {
+ Refill();
+ return _tokens;
+ }
+ }
+ }
+
+ ///
+ /// Tries to consume the specified number of tokens (bytes).
+ /// Returns true if tokens were available (message can be sent).
+ /// Returns false if not enough tokens (caller should wait).
+ ///
+ public bool TryConsume(long bytes)
+ {
+ if (BytesPerSecond <= 0) return true; // Unlimited
+
+ lock (_lock)
+ {
+ Refill();
+ if (_tokens >= bytes)
+ {
+ _tokens -= bytes;
+ return true;
+ }
+ return false;
+ }
+ }
+
+ ///
+ /// Returns the estimated wait time until enough tokens are available.
+ ///
+ public TimeSpan EstimateWait(long bytes)
+ {
+ if (BytesPerSecond <= 0) return TimeSpan.Zero;
+
+ lock (_lock)
+ {
+ Refill();
+ if (_tokens >= bytes) return TimeSpan.Zero;
+
+ var deficit = bytes - _tokens;
+ var waitMs = deficit / _refillRate;
+ return TimeSpan.FromMilliseconds(waitMs);
+ }
+ }
+
+ ///
+ /// Waits until enough tokens are available, then consumes them.
+ ///
+ public async ValueTask WaitForTokensAsync(long bytes, CancellationToken ct = default)
+ {
+ if (BytesPerSecond <= 0) return;
+
+ while (!ct.IsCancellationRequested)
+ {
+ if (TryConsume(bytes)) return;
+
+ var wait = EstimateWait(bytes);
+ if (wait > TimeSpan.Zero)
+ await Task.Delay(wait, ct).ConfigureAwait(false);
+ }
+
+ ct.ThrowIfCancellationRequested();
+ }
+
+ ///
+ /// Updates the rate dynamically.
+ /// Go reference: consumer.go — rate can change on config update.
+ ///
+ public void UpdateRate(long bytesPerSecond, long burstSize = 0)
+ {
+ lock (_lock)
+ {
+ BytesPerSecond = bytesPerSecond;
+ _refillRate = bytesPerSecond / 1000.0;
+ _maxTokens = burstSize > 0 ? burstSize : bytesPerSecond * 2;
+ _tokens = Math.Min(_tokens, _maxTokens);
+ }
+ }
+
+ private void Refill()
+ {
+ var now = DateTime.UtcNow;
+ var elapsed = (now - _lastRefill).TotalMilliseconds;
+ if (elapsed <= 0) return;
+
+ _tokens = Math.Min(_maxTokens, _tokens + elapsed * _refillRate);
+ _lastRefill = now;
+ }
+}
diff --git a/tests/NATS.Server.Tests/JetStream/Consumers/TokenBucketTests.cs b/tests/NATS.Server.Tests/JetStream/Consumers/TokenBucketTests.cs
new file mode 100644
index 0000000..78c2750
--- /dev/null
+++ b/tests/NATS.Server.Tests/JetStream/Consumers/TokenBucketTests.cs
@@ -0,0 +1,209 @@
+// Go: consumer.go (rateLimitBps config, rate limiting in consumer delivery)
+using NATS.Server.JetStream.Consumers;
+
+namespace NATS.Server.Tests.JetStream.Consumers;
+
+public class TokenBucketTests
+{
+ // -------------------------------------------------------------------------
+ // Test 1 — TryConsume succeeds when enough tokens are available
+ //
+ // Go reference: consumer.go — rate limiter allows delivery when token
+ // bucket has sufficient capacity for the message payload size.
+ // -------------------------------------------------------------------------
+ [Fact]
+ public void TryConsume_succeeds_when_tokens_available()
+ {
+ var limiter = new TokenBucketRateLimiter(bytesPerSecond: 1000);
+
+ // Full bucket — consume 100 bytes should succeed
+ var result = limiter.TryConsume(100);
+
+ result.ShouldBeTrue();
+ }
+
+ // -------------------------------------------------------------------------
+ // Test 2 — TryConsume fails when insufficient tokens remain
+ //
+ // Go reference: consumer.go — delivery is gated when bucket is drained.
+ // -------------------------------------------------------------------------
+ [Fact]
+ public void TryConsume_fails_when_insufficient_tokens()
+ {
+ // Burst = 2x rate = 200 bytes
+ var limiter = new TokenBucketRateLimiter(bytesPerSecond: 100);
+
+ // Drain all tokens (200 byte burst)
+ limiter.TryConsume(200).ShouldBeTrue();
+
+ // Next consume should fail — no tokens left
+ var result = limiter.TryConsume(1);
+
+ result.ShouldBeFalse();
+ }
+
+ // -------------------------------------------------------------------------
+ // Test 3 — TryConsume always returns true when rate is zero (unlimited)
+ //
+ // Go reference: consumer.go — rateLimitBps=0 means no rate limiting.
+ // -------------------------------------------------------------------------
+ [Fact]
+ public void TryConsume_unlimited_when_rate_zero()
+ {
+ var limiter = new TokenBucketRateLimiter(bytesPerSecond: 0);
+
+ // Should always succeed regardless of size
+ limiter.TryConsume(1_000_000).ShouldBeTrue();
+ limiter.TryConsume(1_000_000).ShouldBeTrue();
+ limiter.TryConsume(long.MaxValue / 2).ShouldBeTrue();
+ }
+
+ // -------------------------------------------------------------------------
+ // Test 4 — AvailableTokens starts at burst size
+ //
+ // Go reference: consumer.go — bucket starts full so initial burst is allowed.
+ // -------------------------------------------------------------------------
+ [Fact]
+ public void AvailableTokens_starts_at_burst_size()
+ {
+ var limiter = new TokenBucketRateLimiter(bytesPerSecond: 1000, burstSize: 500);
+
+ limiter.AvailableTokens.ShouldBe(500.0, tolerance: 1.0);
+ }
+
+ // -------------------------------------------------------------------------
+ // Test 5 — AvailableTokens refills over time
+ //
+ // Go reference: consumer.go — token bucket refills at configured bytes/sec
+ // so that a drained bucket recovers proportionally with elapsed time.
+ // -------------------------------------------------------------------------
+ [SlopwatchSuppress("SW004", "Token bucket refill is driven by real elapsed wall-clock time; no synchronisation primitive can replace observing time-based token accumulation")]
+ [Fact]
+ public async Task AvailableTokens_refills_over_time()
+ {
+ // 10,000 bytes/sec = 10 bytes/ms; burst = 20,000 bytes
+ var limiter = new TokenBucketRateLimiter(bytesPerSecond: 10_000);
+
+ // Drain entire bucket
+ limiter.TryConsume(20_000).ShouldBeTrue();
+ limiter.AvailableTokens.ShouldBeLessThan(1.0);
+
+ // Wait 50ms — should refill ~500 bytes (10 bytes/ms * 50ms)
+ await Task.Delay(50);
+
+ limiter.AvailableTokens.ShouldBeGreaterThan(100.0);
+ }
+
+ // -------------------------------------------------------------------------
+ // Test 6 — EstimateWait returns zero when tokens are available
+ //
+ // Go reference: consumer.go — no delay when bucket has capacity.
+ // -------------------------------------------------------------------------
+ [Fact]
+ public void EstimateWait_returns_zero_when_tokens_available()
+ {
+ var limiter = new TokenBucketRateLimiter(bytesPerSecond: 1000);
+
+ var wait = limiter.EstimateWait(100);
+
+ wait.ShouldBe(TimeSpan.Zero);
+ }
+
+ // -------------------------------------------------------------------------
+ // Test 7 — EstimateWait returns positive duration when tokens are insufficient
+ //
+ // Go reference: consumer.go — delivery delay calculated from deficit / refill rate.
+ // -------------------------------------------------------------------------
+ [Fact]
+ public void EstimateWait_returns_positive_when_insufficient()
+ {
+ // 100 bytes/sec = 0.1 bytes/ms; burst = 200 bytes
+ var limiter = new TokenBucketRateLimiter(bytesPerSecond: 100);
+
+ // Drain all tokens
+ limiter.TryConsume(200).ShouldBeTrue();
+
+ // Requesting 50 more bytes — must wait
+ var wait = limiter.EstimateWait(50);
+
+ wait.ShouldBeGreaterThan(TimeSpan.Zero);
+ }
+
+ // -------------------------------------------------------------------------
+ // Test 8 — UpdateRate changes the effective rate dynamically
+ //
+ // Go reference: consumer.go — rate can be updated via config reload.
+ // -------------------------------------------------------------------------
+ [Fact]
+ public void UpdateRate_changes_rate_dynamically()
+ {
+ var limiter = new TokenBucketRateLimiter(bytesPerSecond: 1000);
+
+ limiter.UpdateRate(500);
+
+ limiter.BytesPerSecond.ShouldBe(500L);
+ }
+
+ // -------------------------------------------------------------------------
+ // Test 9 — UpdateRate caps existing tokens at new max
+ //
+ // Go reference: consumer.go — when burst is reduced, current tokens are
+ // clamped to not exceed the new maximum.
+ // -------------------------------------------------------------------------
+ [Fact]
+ public void UpdateRate_caps_tokens_at_new_max()
+ {
+ // Start with rate=1000, burst=2000
+ var limiter = new TokenBucketRateLimiter(bytesPerSecond: 1000);
+
+ // Reduce to rate=100, burst=200 — existing tokens (2000) must be capped
+ limiter.UpdateRate(100);
+
+ limiter.AvailableTokens.ShouldBeLessThanOrEqualTo(200.0 + 1.0); // +1 for refill epsilon
+ }
+
+ // -------------------------------------------------------------------------
+ // Test 10 — TryConsume partial consumption leaves correct remainder
+ //
+ // Go reference: consumer.go — each delivery subtracts exactly payload bytes
+ // from the bucket.
+ // -------------------------------------------------------------------------
+ [Fact]
+ public void TryConsume_partial_consumption()
+ {
+ var limiter = new TokenBucketRateLimiter(bytesPerSecond: 1000, burstSize: 200);
+
+ limiter.TryConsume(100).ShouldBeTrue();
+
+ // ~100 tokens should remain (minus any tiny refill drift during test)
+ limiter.AvailableTokens.ShouldBeInRange(99.0, 101.0);
+ }
+
+ // -------------------------------------------------------------------------
+ // Test 11 — Default burst size is 2x the bytes-per-second rate
+ //
+ // Go reference: consumer.go — default burst allows two seconds worth of data.
+ // -------------------------------------------------------------------------
+ [Fact]
+ public void Default_burst_is_2x_rate()
+ {
+ var limiter = new TokenBucketRateLimiter(bytesPerSecond: 500);
+
+ // Bucket starts full at burst = 2 * 500 = 1000
+ limiter.AvailableTokens.ShouldBe(1000.0, tolerance: 1.0);
+ }
+
+ // -------------------------------------------------------------------------
+ // Test 12 — Custom burst size overrides the default 2x calculation
+ //
+ // Go reference: consumer.go — explicit burst size gives precise control
+ // over maximum allowed burst traffic.
+ // -------------------------------------------------------------------------
+ [Fact]
+ public void Custom_burst_size()
+ {
+ var limiter = new TokenBucketRateLimiter(bytesPerSecond: 500, burstSize: 750);
+
+ limiter.AvailableTokens.ShouldBe(750.0, tolerance: 1.0);
+ }
+}