Files
natsdotnet/src/NATS.Server/JetStream/Api/JetStreamApiResponse.cs
Joseph Doherty 5d9d1bebd5 test: add E2E JetStream push consumers, ACK policies, retention modes, ordered, mirror, source
Adds 8 new E2E tests to JetStreamTests.cs (tests 11-18) covering push
consumer config, AckNone/AckAll policies, Interest/WorkQueue retention,
ordered consumers, mirror streams, and source streams.

Fixes three server gaps exposed by the new tests: mirror JSON parsing
(deliver_subject and mirror object fields were silently ignored in stream
and consumer API handlers), and deliver_subject omitted from consumer
info wire format. Also fixes ShutdownDrainTests to use TaskCompletionSource
on ConnectionDisconnected instead of a Task.Delay poll loop.
2026-03-12 19:36:29 -04:00

277 lines
9.6 KiB
C#

using NATS.Server.JetStream.Models;
namespace NATS.Server.JetStream.Api;
/// <summary>
/// Carries the outcome of a JetStream API call: either an <see cref="Error"/> or one of
/// several optional payloads. <see cref="ToWireFormat"/> converts the populated members
/// into the object that is serialized to JSON.
/// </summary>
public sealed class JetStreamApiResponse
{
    private const string StreamCreateResponseType = "io.nats.jetstream.api.v1.stream_create_response";
    private const string ConsumerCreateResponseType = "io.nats.jetstream.api.v1.consumer_create_response";

    /// <summary>Error payload. When set, <see cref="ToWireFormat"/> emits an error object.</summary>
    public JetStreamApiError? Error { get; init; }

    /// <summary>Stream config/state payload; emitted flat (config and state at the root).</summary>
    public JetStreamStreamInfo? StreamInfo { get; init; }

    /// <summary>Consumer payload; emitted as stream_name, name and config.</summary>
    public JetStreamConsumerInfo? ConsumerInfo { get; init; }

    /// <summary>Account counters. Not emitted by <see cref="ToWireFormat"/>.</summary>
    public JetStreamAccountInfo? AccountInfo { get; init; }

    /// <summary>Stream names for a names-list response.</summary>
    public IReadOnlyList<string>? StreamNames { get; init; }

    /// <summary>Stream infos for a stream-list response.</summary>
    public IReadOnlyList<JetStreamStreamInfo>? StreamInfoList { get; init; }

    /// <summary>Consumer names for a names-list response.</summary>
    public IReadOnlyList<string>? ConsumerNames { get; init; }

    /// <summary>Consumer infos for a consumer-list response.</summary>
    public IReadOnlyList<JetStreamConsumerInfo>? ConsumerInfoList { get; init; }

    /// <summary>Single stored message. Not emitted by <see cref="ToWireFormat"/>.</summary>
    public JetStreamStreamMessage? StreamMessage { get; init; }

    /// <summary>Single direct-get message. Not emitted by <see cref="ToWireFormat"/>.</summary>
    public JetStreamDirectMessage? DirectMessage { get; init; }

    /// <summary>Snapshot payload. Not emitted by <see cref="ToWireFormat"/>.</summary>
    public JetStreamSnapshot? Snapshot { get; init; }

    /// <summary>Pulled message batch. Not emitted by <see cref="ToWireFormat"/>.</summary>
    public JetStreamPullBatch? PullBatch { get; init; }

    /// <summary>Success flag emitted by the fallback (non-payload) wire responses.</summary>
    public bool Success { get; init; }

    /// <summary>Number of purged messages; emitted together with <see cref="Success"/>.</summary>
    public ulong Purged { get; init; }

    /// <summary>
    /// Total count of all items (before pagination). Used by list responses for offset-based pagination.
    /// Go reference: jetstream_api.go — ApiPaged struct includes Total, Offset, Limit fields.
    /// </summary>
    public int PaginationTotal { get; init; }

    /// <summary>
    /// Requested offset for pagination. Echoed back to client so it can calculate the next page.
    /// </summary>
    public int PaginationOffset { get; init; }

    /// <summary>
    /// Whether the consumer is currently paused. Populated by pause/resume API responses.
    /// Go reference: server/consumer.go jsConsumerPauseResponse.paused field.
    /// </summary>
    public bool? Paused { get; init; }

    /// <summary>
    /// UTC deadline until which the consumer is paused. Null when no deadline is set.
    /// Go reference: server/consumer.go jsConsumerPauseResponse.pause_until field.
    /// </summary>
    public DateTime? PauseUntil { get; init; }

    /// <summary>
    /// Returns a wire-format object for JSON serialization matching the Go server's
    /// flat response structure (e.g., config/state at root level for stream responses,
    /// not nested under a wrapper property).
    /// </summary>
    /// <remarks>
    /// Branches are evaluated in a fixed priority order: stream info, consumer info, bare error,
    /// stream list, stream names, consumer list, consumer names, pause state, then the
    /// success/purged fallback. The first populated member wins.
    /// </remarks>
    /// <returns>An anonymous object whose property names are the JSON keys.</returns>
    public object ToWireFormat()
    {
        if (StreamInfo is not null)
        {
            // An error alongside stream info still carries the stream response type tag.
            if (Error is not null)
            {
                return new { type = StreamCreateResponseType, error = Error };
            }

            return new
            {
                type = StreamCreateResponseType,
                config = ToWireConfig(StreamInfo.Config),
                state = ToWireState(StreamInfo.State),
            };
        }

        if (ConsumerInfo is not null)
        {
            if (Error is not null)
            {
                return new { type = ConsumerCreateResponseType, error = Error };
            }

            return new
            {
                type = ConsumerCreateResponseType,
                stream_name = ConsumerInfo.StreamName,
                name = ConsumerInfo.Name,
                config = ToWireConsumerConfig(ConsumerInfo.Config),
            };
        }

        if (Error is not null)
        {
            return new { error = Error };
        }

        if (StreamInfoList is not null)
        {
            var wireStreams = StreamInfoList.Select(s => new
            {
                config = ToWireConfig(s.Config),
                state = ToWireState(s.State),
            }).ToList();

            // "limit" reports the number of entries in this page.
            return new { total = PaginationTotal, offset = PaginationOffset, limit = wireStreams.Count, streams = wireStreams };
        }

        if (StreamNames is not null)
        {
            return new { total = PaginationTotal, offset = PaginationOffset, limit = StreamNames.Count, streams = StreamNames };
        }

        if (ConsumerInfoList is not null)
        {
            var wireConsumers = ConsumerInfoList.Select(c => new
            {
                stream_name = c.StreamName,
                name = c.Name,
                config = ToWireConsumerConfig(c.Config),
            }).ToList();

            return new { total = PaginationTotal, offset = PaginationOffset, limit = wireConsumers.Count, consumers = wireConsumers };
        }

        if (ConsumerNames is not null)
        {
            return new { total = PaginationTotal, offset = PaginationOffset, limit = ConsumerNames.Count, consumers = ConsumerNames };
        }

        // Pause/resume responses carry the pause state. Without this branch the state set by
        // PauseResponse was dropped and the reply degenerated to { success, purged = 0 }.
        if (Paused is not null)
        {
            return new { success = Success, paused = Paused.Value, pause_until = PauseUntil };
        }

        // "purged" is included for every successful reply so that a purge which removed
        // zero messages still reports purged = 0.
        if (Purged > 0 || Success)
        {
            return new { success = Success, purged = Purged };
        }

        return new { success = Success };
    }

    /// <summary>
    /// Creates a Go-compatible wire format for StreamConfig.
    /// Only includes fields the Go server sends, with enums as lowercase strings.
    /// Go reference: server/stream.go StreamConfig JSON marshaling.
    /// </summary>
    /// <param name="c">Stream configuration to project.</param>
    /// <returns>An anonymous object whose property names are the JSON keys.</returns>
    private static object ToWireConfig(StreamConfig c) => new
    {
        name = c.Name,
        subjects = c.Subjects,
        retention = c.Retention.ToString().ToLowerInvariant(),
        max_consumers = c.MaxConsumers,
        max_msgs = c.MaxMsgs,
        max_bytes = c.MaxBytes,
        max_age = c.MaxAge,
        max_msgs_per_subject = c.MaxMsgsPer,
        max_msg_size = c.MaxMsgSize,
        storage = c.Storage.ToString().ToLowerInvariant(),
        discard = c.Discard.ToString().ToLowerInvariant(),
        num_replicas = c.Replicas,
        // DuplicateWindowMs scaled by 1,000,000 (milliseconds to nanoseconds).
        duplicate_window = (long)c.DuplicateWindowMs * 1_000_000L,
        // "sealed" is a C# keyword; the verbatim identifier makes the JSON key exactly "sealed".
        @sealed = c.Sealed,
        deny_delete = c.DenyDelete,
        deny_purge = c.DenyPurge,
        allow_direct = c.AllowDirect,
        first_seq = c.FirstSeq,
    };

    /// <summary>
    /// Projects stream state to its wire shape.
    /// </summary>
    /// <param name="s">Stream state to project.</param>
    /// <returns>An anonymous object whose property names are the JSON keys.</returns>
    private static object ToWireState(ApiStreamState s) => new
    {
        messages = s.Messages,
        bytes = s.Bytes,
        first_seq = s.FirstSeq,
        last_seq = s.LastSeq,
        // NOTE(review): hard-coded to 0 rather than derived from the state object.
        consumer_count = 0,
    };

    /// <summary>
    /// Projects a consumer configuration to its wire shape, with enums as lowercase strings.
    /// </summary>
    /// <param name="c">Consumer configuration to project.</param>
    /// <returns>An anonymous object whose property names are the JSON keys.</returns>
    private static object ToWireConsumerConfig(ConsumerConfig c)
    {
        // Both durable_name and name are taken from DurableName; an empty value becomes null.
        string? durableName = string.IsNullOrEmpty(c.DurableName) ? null : c.DurableName;

        return new
        {
            durable_name = durableName,
            name = durableName,
            deliver_policy = c.DeliverPolicy.ToString().ToLowerInvariant(),
            ack_policy = c.AckPolicy.ToString().ToLowerInvariant(),
            replay_policy = c.ReplayPolicy.ToString().ToLowerInvariant(),
            // AckWaitMs scaled by 1,000,000 (milliseconds to nanoseconds).
            ack_wait = (long)c.AckWaitMs * 1_000_000L,
            max_deliver = c.MaxDeliver,
            max_ack_pending = c.MaxAckPending,
            filter_subject = c.FilterSubject,
            // Go: consumer.go — deliver_subject present for push consumers
            deliver_subject = string.IsNullOrEmpty(c.DeliverSubject) ? null : c.DeliverSubject,
        };
    }

    /// <summary>
    /// Returns a 404 error response for an API subject that is not recognised.
    /// </summary>
    /// <param name="subject">The subject that was requested; echoed in the description.</param>
    public static JetStreamApiResponse NotFound(string subject) => new()
    {
        Error = new JetStreamApiError
        {
            Code = 404,
            Description = $"unknown api subject '{subject}'",
        },
    };

    /// <summary>
    /// Returns a response with no members set. Note that <see cref="Success"/> stays false.
    /// </summary>
    public static JetStreamApiResponse Ok() => new();

    /// <summary>
    /// Returns a response with <see cref="Success"/> set to true.
    /// </summary>
    public static JetStreamApiResponse SuccessResponse() => new()
    {
        Success = true,
    };

    /// <summary>
    /// Returns an error response with the given code and description.
    /// </summary>
    /// <param name="code">Error code to report.</param>
    /// <param name="description">Human-readable error description.</param>
    public static JetStreamApiResponse ErrorResponse(int code, string description) => new()
    {
        Error = new JetStreamApiError
        {
            Code = code,
            Description = description,
        },
    };

    /// <summary>
    /// Returns a not-leader error with code 10003 and a leader_hint.
    /// Go reference: jetstream_api.go:200-300 — non-leader nodes return this error
    /// for mutating operations so clients can redirect.
    /// </summary>
    /// <param name="leaderHint">Value stored in the error's leader hint.</param>
    public static JetStreamApiResponse NotLeader(string leaderHint) => new()
    {
        Error = new JetStreamApiError
        {
            Code = 10003,
            Description = "not leader",
            LeaderHint = leaderHint,
        },
    };

    /// <summary>
    /// Returns a purge success response with the number of messages purged.
    /// Go reference: jetstream_api.go:1200-1350 — purge response includes purged count.
    /// </summary>
    /// <param name="purged">Number of messages that were purged.</param>
    public static JetStreamApiResponse PurgeResponse(ulong purged) => new()
    {
        Success = true,
        Purged = purged,
    };

    /// <summary>
    /// Returns a pause/resume success response with current pause state.
    /// Go reference: server/consumer.go jsConsumerPauseResponse — returned after pause/resume API call.
    /// </summary>
    /// <param name="paused">Whether the consumer is paused.</param>
    /// <param name="pauseUntil">Deadline of the pause, or null when there is none.</param>
    public static JetStreamApiResponse PauseResponse(bool paused, DateTime? pauseUntil) => new()
    {
        Success = true,
        Paused = paused,
        PauseUntil = pauseUntil,
    };
}
/// <summary>
/// Pairs a stream's configuration with its state. Both members are required and init-only.
/// </summary>
public sealed class JetStreamStreamInfo
{
    /// <summary>Configuration of the stream.</summary>
    public required StreamConfig Config { get; init; }

    /// <summary>State reported together with the configuration.</summary>
    public required ApiStreamState State { get; init; }
}
/// <summary>
/// Describes a consumer: its optional name, the optional name of its stream, and its
/// required configuration.
/// </summary>
public sealed class JetStreamConsumerInfo
{
    /// <summary>Consumer name; may be null.</summary>
    public string? Name { get; init; }

    /// <summary>Name of the stream the consumer belongs to; may be null.</summary>
    public string? StreamName { get; init; }

    /// <summary>Configuration of the consumer.</summary>
    public required ConsumerConfig Config { get; init; }
}
/// <summary>
/// Account-level counters. Both values default to zero when not initialised.
/// </summary>
public sealed class JetStreamAccountInfo
{
    /// <summary>Stream count.</summary>
    public int Streams { get; init; }

    /// <summary>Consumer count.</summary>
    public int Consumers { get; init; }
}
/// <summary>
/// A single message as returned by a stream message lookup: sequence, subject and payload.
/// The string members default to the empty string and are never null by default.
/// </summary>
public sealed class JetStreamStreamMessage
{
    /// <summary>Sequence number of the message.</summary>
    public ulong Sequence { get; init; }

    /// <summary>Subject of the message; empty when not set.</summary>
    public string Subject { get; init; } = "";

    /// <summary>Payload of the message as a string; empty when not set.</summary>
    public string Payload { get; init; } = "";
}
/// <summary>
/// A single message as returned by a direct lookup: sequence, subject and payload.
/// The string members default to the empty string and are never null by default.
/// </summary>
public sealed class JetStreamDirectMessage
{
    /// <summary>Sequence number of the message.</summary>
    public ulong Sequence { get; init; }

    /// <summary>Subject of the message; empty when not set.</summary>
    public string Subject { get; init; } = "";

    /// <summary>Payload of the message as a string; empty when not set.</summary>
    public string Payload { get; init; } = "";
}
/// <summary>
/// Snapshot payload together with the metadata describing how it was produced.
/// </summary>
public sealed class JetStreamSnapshot
{
    /// <summary>Snapshot content as a string; empty when not set.</summary>
    public string Payload { get; init; } = "";

    /// <summary>Stream name this snapshot was taken from.</summary>
    public string? StreamName { get; init; }

    /// <summary>Number of chunks the snapshot was split into (1 for non-chunked snapshots).</summary>
    public int NumChunks { get; init; }

    /// <summary>Block/chunk size in bytes.</summary>
    public int BlkSize { get; init; }
}
/// <summary>
/// A batch of messages returned by a pull. The list defaults to an empty list, never null.
/// </summary>
public sealed class JetStreamPullBatch
{
    /// <summary>Messages contained in the batch; empty when not set.</summary>
    public IReadOnlyList<JetStreamDirectMessage> Messages { get; init; } = System.Array.Empty<JetStreamDirectMessage>();
}