diff --git a/src/NATS.Server/Events/EventSubjects.cs b/src/NATS.Server/Events/EventSubjects.cs index f29460f..4eed2bd 100644 --- a/src/NATS.Server/Events/EventSubjects.cs +++ b/src/NATS.Server/Events/EventSubjects.cs @@ -33,6 +33,20 @@ public static class EventSubjects // Inbox for responses public const string InboxResponse = "$SYS._INBOX_.{0}"; + + // JetStream advisory events + // Go reference: jetstream_api.go advisory subjects + public const string JsAdvisoryStreamCreated = "$JS.EVENT.ADVISORY.STREAM.CREATED.{0}"; + public const string JsAdvisoryStreamDeleted = "$JS.EVENT.ADVISORY.STREAM.DELETED.{0}"; + public const string JsAdvisoryStreamUpdated = "$JS.EVENT.ADVISORY.STREAM.UPDATED.{0}"; + public const string JsAdvisoryConsumerCreated = "$JS.EVENT.ADVISORY.CONSUMER.CREATED.{0}.{1}"; + public const string JsAdvisoryConsumerDeleted = "$JS.EVENT.ADVISORY.CONSUMER.DELETED.{0}.{1}"; + public const string JsAdvisoryStreamSnapshotCreated = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE.{0}"; + public const string JsAdvisoryStreamSnapshotCompleted = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE.{0}"; + public const string JsAdvisoryStreamRestoreCreated = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE.{0}"; + public const string JsAdvisoryStreamRestoreCompleted = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE.{0}"; + public const string JsAdvisoryStreamLeaderElected = "$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED.{0}"; + public const string JsAdvisoryStreamQuorumLost = "$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST.{0}"; } /// diff --git a/src/NATS.Server/JetStream/Api/AdvisoryPublisher.cs b/src/NATS.Server/JetStream/Api/AdvisoryPublisher.cs new file mode 100644 index 0000000..c328e17 --- /dev/null +++ b/src/NATS.Server/JetStream/Api/AdvisoryPublisher.cs @@ -0,0 +1,130 @@ +namespace NATS.Server.JetStream.Api; + +/// +/// Publishes JetStream advisory events to $JS.EVENT.ADVISORY.* subjects. +/// Designed to be lightweight and testable; accepts a publish action delegate +/// rather than depending directly on InternalEventSystem. +/// Go reference: jetstream_api.go advisory publication. +/// +public sealed class AdvisoryPublisher +{ + private readonly Action _publishAction; + private long _publishCount; + + public AdvisoryPublisher(Action publishAction) + { + _publishAction = publishAction; + } + + /// + /// Total number of advisory events published since creation. + /// + public long PublishCount => Interlocked.Read(ref _publishCount); + + /// + /// Publishes a stream created advisory. + /// Go reference: jetstream_api.go — advisory on stream creation. + /// + public void StreamCreated(string streamName, object? detail = null) + { + var subject = string.Format(Events.EventSubjects.JsAdvisoryStreamCreated, streamName); + Publish(subject, new AdvisoryEvent + { + Type = "io.nats.jetstream.advisory.stream_created", + Stream = streamName, + TimeStamp = DateTime.UtcNow, + Detail = detail, + }); + } + + /// + /// Publishes a stream deleted advisory. + /// Go reference: jetstream_api.go — advisory on stream deletion. + /// + public void StreamDeleted(string streamName) + { + var subject = string.Format(Events.EventSubjects.JsAdvisoryStreamDeleted, streamName); + Publish(subject, new AdvisoryEvent + { + Type = "io.nats.jetstream.advisory.stream_deleted", + Stream = streamName, + TimeStamp = DateTime.UtcNow, + }); + } + + /// + /// Publishes a stream updated advisory. + /// Go reference: jetstream_api.go — advisory on stream config update. + /// + public void StreamUpdated(string streamName, object? detail = null) + { + var subject = string.Format(Events.EventSubjects.JsAdvisoryStreamUpdated, streamName); + Publish(subject, new AdvisoryEvent + { + Type = "io.nats.jetstream.advisory.stream_updated", + Stream = streamName, + TimeStamp = DateTime.UtcNow, + Detail = detail, + }); + } + + /// + /// Publishes a consumer created advisory. + /// Go reference: jetstream_api.go — advisory on consumer creation. + /// + public void ConsumerCreated(string streamName, string consumerName) + { + var subject = string.Format(Events.EventSubjects.JsAdvisoryConsumerCreated, streamName, consumerName); + Publish(subject, new AdvisoryEvent + { + Type = "io.nats.jetstream.advisory.consumer_created", + Stream = streamName, + Consumer = consumerName, + TimeStamp = DateTime.UtcNow, + }); + } + + /// + /// Publishes a consumer deleted advisory. + /// Go reference: jetstream_api.go — advisory on consumer deletion. + /// + public void ConsumerDeleted(string streamName, string consumerName) + { + var subject = string.Format(Events.EventSubjects.JsAdvisoryConsumerDeleted, streamName, consumerName); + Publish(subject, new AdvisoryEvent + { + Type = "io.nats.jetstream.advisory.consumer_deleted", + Stream = streamName, + Consumer = consumerName, + TimeStamp = DateTime.UtcNow, + }); + } + + private void Publish(string subject, AdvisoryEvent evt) + { + Interlocked.Increment(ref _publishCount); + _publishAction(subject, evt); + } +} + +/// +/// Advisory event payload describing a JetStream lifecycle event. +/// Go reference: jetstream_api.go advisory event types. +/// +public sealed class AdvisoryEvent +{ + /// Reverse-DNS event type identifier (e.g., "io.nats.jetstream.advisory.stream_created"). + public string Type { get; init; } = string.Empty; + + /// Name of the stream involved in the event, if applicable. + public string? Stream { get; init; } + + /// Name of the consumer involved in the event, if applicable. + public string? Consumer { get; init; } + + /// UTC timestamp when the advisory was generated. + public DateTime TimeStamp { get; init; } + + /// Optional additional detail payload (arbitrary object). + public object? Detail { get; init; } +} diff --git a/tests/NATS.Server.Tests/JetStream/Api/AdvisoryEventTests.cs b/tests/NATS.Server.Tests/JetStream/Api/AdvisoryEventTests.cs new file mode 100644 index 0000000..c17856a --- /dev/null +++ b/tests/NATS.Server.Tests/JetStream/Api/AdvisoryEventTests.cs @@ -0,0 +1,206 @@ +// Go reference: jetstream_api.go — advisory event publication for stream/consumer lifecycle. +// Advisory subjects use the pattern $JS.EVENT.ADVISORY.{type}.{stream}[.{consumer}]. + +namespace NATS.Server.Tests.JetStream.Api; + +using NATS.Server.Events; +using NATS.Server.JetStream.Api; + +public class AdvisoryEventTests +{ + private static (AdvisoryPublisher Publisher, List<(string Subject, object Body)> Published) CreatePublisher() + { + var published = new List<(string Subject, object Body)>(); + var publisher = new AdvisoryPublisher((s, b) => published.Add((s, b))); + return (publisher, published); + } + + // Go reference: jetstream_api.go — stream created advisory on $JS.EVENT.ADVISORY.STREAM.CREATED.{stream}. + [Fact] + public void StreamCreated_publishes_advisory_to_correct_subject() + { + var (publisher, published) = CreatePublisher(); + + publisher.StreamCreated("ORDERS"); + + published.Count.ShouldBe(1); + published[0].Subject.ShouldBe("$JS.EVENT.ADVISORY.STREAM.CREATED.ORDERS"); + } + + // Go reference: jetstream_api.go — stream deleted advisory includes stream name in subject. + [Fact] + public void StreamDeleted_publishes_advisory_with_stream_name() + { + var (publisher, published) = CreatePublisher(); + + publisher.StreamDeleted("PAYMENTS"); + + published.Count.ShouldBe(1); + published[0].Subject.ShouldBe("$JS.EVENT.ADVISORY.STREAM.DELETED.PAYMENTS"); + var evt = published[0].Body.ShouldBeOfType(); + evt.Stream.ShouldBe("PAYMENTS"); + } + + // Go reference: jetstream_api.go — stream updated advisory carries optional detail payload. + [Fact] + public void StreamUpdated_publishes_advisory_with_detail() + { + var (publisher, published) = CreatePublisher(); + var detail = new { Reason = "config_change" }; + + publisher.StreamUpdated("EVENTS", detail); + + published.Count.ShouldBe(1); + published[0].Subject.ShouldBe("$JS.EVENT.ADVISORY.STREAM.UPDATED.EVENTS"); + var evt = published[0].Body.ShouldBeOfType(); + evt.Detail.ShouldNotBeNull(); + } + + // Go reference: jetstream_api.go — consumer created advisory on $JS.EVENT.ADVISORY.CONSUMER.CREATED.{stream}.{consumer}. + [Fact] + public void ConsumerCreated_publishes_advisory_with_stream_and_consumer() + { + var (publisher, published) = CreatePublisher(); + + publisher.ConsumerCreated("ORDERS", "push-consumer"); + + published.Count.ShouldBe(1); + published[0].Subject.ShouldBe("$JS.EVENT.ADVISORY.CONSUMER.CREATED.ORDERS.push-consumer"); + var evt = published[0].Body.ShouldBeOfType(); + evt.Stream.ShouldBe("ORDERS"); + evt.Consumer.ShouldBe("push-consumer"); + } + + // Go reference: jetstream_api.go — consumer deleted advisory type field identifies event kind. + [Fact] + public void ConsumerDeleted_publishes_advisory_with_correct_type() + { + var (publisher, published) = CreatePublisher(); + + publisher.ConsumerDeleted("ORDERS", "my-consumer"); + + published.Count.ShouldBe(1); + var evt = published[0].Body.ShouldBeOfType(); + evt.Type.ShouldBe("io.nats.jetstream.advisory.consumer_deleted"); + } + + // Go reference: jetstream_api.go — publish count tracks all emitted advisories atomically. + [Fact] + public void PublishCount_increments_for_each_advisory() + { + var (publisher, _) = CreatePublisher(); + + publisher.PublishCount.ShouldBe(0); + + publisher.StreamCreated("S1"); + publisher.PublishCount.ShouldBe(1); + + publisher.StreamDeleted("S1"); + publisher.PublishCount.ShouldBe(2); + + publisher.ConsumerCreated("S1", "C1"); + publisher.PublishCount.ShouldBe(3); + } + + // Go reference: jetstream_api.go — each advisory type has its own descriptive type string. + [Fact] + public void Advisory_event_has_correct_type_field() + { + var (publisher, published) = CreatePublisher(); + + publisher.StreamCreated("S"); + published[0].Body.ShouldBeOfType().Type + .ShouldBe("io.nats.jetstream.advisory.stream_created"); + + publisher.StreamDeleted("S"); + published[1].Body.ShouldBeOfType().Type + .ShouldBe("io.nats.jetstream.advisory.stream_deleted"); + + publisher.StreamUpdated("S"); + published[2].Body.ShouldBeOfType().Type + .ShouldBe("io.nats.jetstream.advisory.stream_updated"); + + publisher.ConsumerCreated("S", "C"); + published[3].Body.ShouldBeOfType().Type + .ShouldBe("io.nats.jetstream.advisory.consumer_created"); + + publisher.ConsumerDeleted("S", "C"); + published[4].Body.ShouldBeOfType().Type + .ShouldBe("io.nats.jetstream.advisory.consumer_deleted"); + } + + // Go reference: jetstream_api.go — advisory timestamps use UTC to ensure cross-cluster consistency. + [Fact] + public void Advisory_event_has_utc_timestamp() + { + var (publisher, published) = CreatePublisher(); + var before = DateTime.UtcNow; + + publisher.StreamCreated("TEST"); + + var after = DateTime.UtcNow; + var evt = published[0].Body.ShouldBeOfType(); + evt.TimeStamp.Kind.ShouldBe(DateTimeKind.Utc); + evt.TimeStamp.ShouldBeGreaterThanOrEqualTo(before); + evt.TimeStamp.ShouldBeLessThanOrEqualTo(after); + } + + // Go reference: jetstream_api.go — advisory subjects are derived from EventSubjects constants. + [Fact] + public void Advisory_subjects_format_correctly() + { + string.Format(EventSubjects.JsAdvisoryStreamCreated, "MY_STREAM") + .ShouldBe("$JS.EVENT.ADVISORY.STREAM.CREATED.MY_STREAM"); + + string.Format(EventSubjects.JsAdvisoryStreamDeleted, "MY_STREAM") + .ShouldBe("$JS.EVENT.ADVISORY.STREAM.DELETED.MY_STREAM"); + + string.Format(EventSubjects.JsAdvisoryStreamUpdated, "MY_STREAM") + .ShouldBe("$JS.EVENT.ADVISORY.STREAM.UPDATED.MY_STREAM"); + + string.Format(EventSubjects.JsAdvisoryConsumerCreated, "MY_STREAM", "MY_CONSUMER") + .ShouldBe("$JS.EVENT.ADVISORY.CONSUMER.CREATED.MY_STREAM.MY_CONSUMER"); + + string.Format(EventSubjects.JsAdvisoryConsumerDeleted, "MY_STREAM", "MY_CONSUMER") + .ShouldBe("$JS.EVENT.ADVISORY.CONSUMER.DELETED.MY_STREAM.MY_CONSUMER"); + + string.Format(EventSubjects.JsAdvisoryStreamSnapshotCreated, "MY_STREAM") + .ShouldBe("$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE.MY_STREAM"); + + string.Format(EventSubjects.JsAdvisoryStreamSnapshotCompleted, "MY_STREAM") + .ShouldBe("$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE.MY_STREAM"); + + string.Format(EventSubjects.JsAdvisoryStreamRestoreCreated, "MY_STREAM") + .ShouldBe("$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE.MY_STREAM"); + + string.Format(EventSubjects.JsAdvisoryStreamRestoreCompleted, "MY_STREAM") + .ShouldBe("$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE.MY_STREAM"); + + string.Format(EventSubjects.JsAdvisoryStreamLeaderElected, "MY_STREAM") + .ShouldBe("$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED.MY_STREAM"); + + string.Format(EventSubjects.JsAdvisoryStreamQuorumLost, "MY_STREAM") + .ShouldBe("$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST.MY_STREAM"); + } + + // Go reference: jetstream_api.go — full lifecycle sequence (create, update, delete) emits all advisories. + [Fact] + public void Multiple_advisories_all_published() + { + var (publisher, published) = CreatePublisher(); + + publisher.StreamCreated("LIFECYCLE"); + publisher.StreamUpdated("LIFECYCLE", new { Reason = "retention_change" }); + publisher.ConsumerCreated("LIFECYCLE", "worker"); + publisher.ConsumerDeleted("LIFECYCLE", "worker"); + publisher.StreamDeleted("LIFECYCLE"); + + published.Count.ShouldBe(5); + published[0].Subject.ShouldBe("$JS.EVENT.ADVISORY.STREAM.CREATED.LIFECYCLE"); + published[1].Subject.ShouldBe("$JS.EVENT.ADVISORY.STREAM.UPDATED.LIFECYCLE"); + published[2].Subject.ShouldBe("$JS.EVENT.ADVISORY.CONSUMER.CREATED.LIFECYCLE.worker"); + published[3].Subject.ShouldBe("$JS.EVENT.ADVISORY.CONSUMER.DELETED.LIFECYCLE.worker"); + published[4].Subject.ShouldBe("$JS.EVENT.ADVISORY.STREAM.DELETED.LIFECYCLE"); + publisher.PublishCount.ShouldBe(5); + } +}