feat: add advisory event publication for API operations (Gap 7.6)

Add JetStream advisory subject constants to EventSubjects and a lightweight
AdvisoryPublisher that publishes stream/consumer lifecycle events to
$JS.EVENT.ADVISORY.* subjects without depending on InternalEventSystem
directly (testable via Action delegate injection).
This commit is contained in:
Joseph Doherty
2026-02-25 10:56:11 -05:00
parent c0d206102d
commit 2c52b69c93
3 changed files with 350 additions and 0 deletions

View File

@@ -33,6 +33,20 @@ public static class EventSubjects
// Inbox for responses
public const string InboxResponse = "$SYS._INBOX_.{0}";
// JetStream advisory events
// Go reference: jetstream_api.go advisory subjects
public const string JsAdvisoryStreamCreated = "$JS.EVENT.ADVISORY.STREAM.CREATED.{0}";
public const string JsAdvisoryStreamDeleted = "$JS.EVENT.ADVISORY.STREAM.DELETED.{0}";
public const string JsAdvisoryStreamUpdated = "$JS.EVENT.ADVISORY.STREAM.UPDATED.{0}";
public const string JsAdvisoryConsumerCreated = "$JS.EVENT.ADVISORY.CONSUMER.CREATED.{0}.{1}";
public const string JsAdvisoryConsumerDeleted = "$JS.EVENT.ADVISORY.CONSUMER.DELETED.{0}.{1}";
public const string JsAdvisoryStreamSnapshotCreated = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE.{0}";
public const string JsAdvisoryStreamSnapshotCompleted = "$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE.{0}";
public const string JsAdvisoryStreamRestoreCreated = "$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE.{0}";
public const string JsAdvisoryStreamRestoreCompleted = "$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE.{0}";
public const string JsAdvisoryStreamLeaderElected = "$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED.{0}";
public const string JsAdvisoryStreamQuorumLost = "$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST.{0}";
}
/// <summary>

View File

@@ -0,0 +1,130 @@
namespace NATS.Server.JetStream.Api;
/// <summary>
/// Publishes JetStream advisory events to $JS.EVENT.ADVISORY.* subjects.
/// Designed to be lightweight and testable; accepts a publish action delegate
/// rather than depending directly on InternalEventSystem.
/// Go reference: jetstream_api.go advisory publication.
/// </summary>
public sealed class AdvisoryPublisher
{
private readonly Action<string, object> _publishAction;
private long _publishCount;
public AdvisoryPublisher(Action<string, object> publishAction)
{
_publishAction = publishAction;
}
/// <summary>
/// Total number of advisory events published since creation.
/// </summary>
public long PublishCount => Interlocked.Read(ref _publishCount);
/// <summary>
/// Publishes a stream created advisory.
/// Go reference: jetstream_api.go — advisory on stream creation.
/// </summary>
public void StreamCreated(string streamName, object? detail = null)
{
var subject = string.Format(Events.EventSubjects.JsAdvisoryStreamCreated, streamName);
Publish(subject, new AdvisoryEvent
{
Type = "io.nats.jetstream.advisory.stream_created",
Stream = streamName,
TimeStamp = DateTime.UtcNow,
Detail = detail,
});
}
/// <summary>
/// Publishes a stream deleted advisory.
/// Go reference: jetstream_api.go — advisory on stream deletion.
/// </summary>
public void StreamDeleted(string streamName)
{
var subject = string.Format(Events.EventSubjects.JsAdvisoryStreamDeleted, streamName);
Publish(subject, new AdvisoryEvent
{
Type = "io.nats.jetstream.advisory.stream_deleted",
Stream = streamName,
TimeStamp = DateTime.UtcNow,
});
}
/// <summary>
/// Publishes a stream updated advisory.
/// Go reference: jetstream_api.go — advisory on stream config update.
/// </summary>
public void StreamUpdated(string streamName, object? detail = null)
{
var subject = string.Format(Events.EventSubjects.JsAdvisoryStreamUpdated, streamName);
Publish(subject, new AdvisoryEvent
{
Type = "io.nats.jetstream.advisory.stream_updated",
Stream = streamName,
TimeStamp = DateTime.UtcNow,
Detail = detail,
});
}
/// <summary>
/// Publishes a consumer created advisory.
/// Go reference: jetstream_api.go — advisory on consumer creation.
/// </summary>
public void ConsumerCreated(string streamName, string consumerName)
{
var subject = string.Format(Events.EventSubjects.JsAdvisoryConsumerCreated, streamName, consumerName);
Publish(subject, new AdvisoryEvent
{
Type = "io.nats.jetstream.advisory.consumer_created",
Stream = streamName,
Consumer = consumerName,
TimeStamp = DateTime.UtcNow,
});
}
/// <summary>
/// Publishes a consumer deleted advisory.
/// Go reference: jetstream_api.go — advisory on consumer deletion.
/// </summary>
public void ConsumerDeleted(string streamName, string consumerName)
{
var subject = string.Format(Events.EventSubjects.JsAdvisoryConsumerDeleted, streamName, consumerName);
Publish(subject, new AdvisoryEvent
{
Type = "io.nats.jetstream.advisory.consumer_deleted",
Stream = streamName,
Consumer = consumerName,
TimeStamp = DateTime.UtcNow,
});
}
private void Publish(string subject, AdvisoryEvent evt)
{
Interlocked.Increment(ref _publishCount);
_publishAction(subject, evt);
}
}
/// <summary>
/// Advisory event payload describing a JetStream lifecycle event.
/// Go reference: jetstream_api.go advisory event types.
/// </summary>
public sealed class AdvisoryEvent
{
/// <summary>Reverse-DNS event type identifier (e.g., "io.nats.jetstream.advisory.stream_created").</summary>
public string Type { get; init; } = string.Empty;
/// <summary>Name of the stream involved in the event, if applicable.</summary>
public string? Stream { get; init; }
/// <summary>Name of the consumer involved in the event, if applicable.</summary>
public string? Consumer { get; init; }
/// <summary>UTC timestamp when the advisory was generated.</summary>
public DateTime TimeStamp { get; init; }
/// <summary>Optional additional detail payload (arbitrary object).</summary>
public object? Detail { get; init; }
}

View File

@@ -0,0 +1,206 @@
// Go reference: jetstream_api.go — advisory event publication for stream/consumer lifecycle.
// Advisory subjects use the pattern $JS.EVENT.ADVISORY.{type}.{stream}[.{consumer}].
namespace NATS.Server.Tests.JetStream.Api;
using NATS.Server.Events;
using NATS.Server.JetStream.Api;
public class AdvisoryEventTests
{
private static (AdvisoryPublisher Publisher, List<(string Subject, object Body)> Published) CreatePublisher()
{
var published = new List<(string Subject, object Body)>();
var publisher = new AdvisoryPublisher((s, b) => published.Add((s, b)));
return (publisher, published);
}
// Go reference: jetstream_api.go — stream created advisory on $JS.EVENT.ADVISORY.STREAM.CREATED.{stream}.
[Fact]
public void StreamCreated_publishes_advisory_to_correct_subject()
{
var (publisher, published) = CreatePublisher();
publisher.StreamCreated("ORDERS");
published.Count.ShouldBe(1);
published[0].Subject.ShouldBe("$JS.EVENT.ADVISORY.STREAM.CREATED.ORDERS");
}
// Go reference: jetstream_api.go — stream deleted advisory includes stream name in subject.
[Fact]
public void StreamDeleted_publishes_advisory_with_stream_name()
{
var (publisher, published) = CreatePublisher();
publisher.StreamDeleted("PAYMENTS");
published.Count.ShouldBe(1);
published[0].Subject.ShouldBe("$JS.EVENT.ADVISORY.STREAM.DELETED.PAYMENTS");
var evt = published[0].Body.ShouldBeOfType<AdvisoryEvent>();
evt.Stream.ShouldBe("PAYMENTS");
}
// Go reference: jetstream_api.go — stream updated advisory carries optional detail payload.
[Fact]
public void StreamUpdated_publishes_advisory_with_detail()
{
var (publisher, published) = CreatePublisher();
var detail = new { Reason = "config_change" };
publisher.StreamUpdated("EVENTS", detail);
published.Count.ShouldBe(1);
published[0].Subject.ShouldBe("$JS.EVENT.ADVISORY.STREAM.UPDATED.EVENTS");
var evt = published[0].Body.ShouldBeOfType<AdvisoryEvent>();
evt.Detail.ShouldNotBeNull();
}
// Go reference: jetstream_api.go — consumer created advisory on $JS.EVENT.ADVISORY.CONSUMER.CREATED.{stream}.{consumer}.
[Fact]
public void ConsumerCreated_publishes_advisory_with_stream_and_consumer()
{
var (publisher, published) = CreatePublisher();
publisher.ConsumerCreated("ORDERS", "push-consumer");
published.Count.ShouldBe(1);
published[0].Subject.ShouldBe("$JS.EVENT.ADVISORY.CONSUMER.CREATED.ORDERS.push-consumer");
var evt = published[0].Body.ShouldBeOfType<AdvisoryEvent>();
evt.Stream.ShouldBe("ORDERS");
evt.Consumer.ShouldBe("push-consumer");
}
// Go reference: jetstream_api.go — consumer deleted advisory type field identifies event kind.
[Fact]
public void ConsumerDeleted_publishes_advisory_with_correct_type()
{
var (publisher, published) = CreatePublisher();
publisher.ConsumerDeleted("ORDERS", "my-consumer");
published.Count.ShouldBe(1);
var evt = published[0].Body.ShouldBeOfType<AdvisoryEvent>();
evt.Type.ShouldBe("io.nats.jetstream.advisory.consumer_deleted");
}
// Go reference: jetstream_api.go — publish count tracks all emitted advisories atomically.
[Fact]
public void PublishCount_increments_for_each_advisory()
{
var (publisher, _) = CreatePublisher();
publisher.PublishCount.ShouldBe(0);
publisher.StreamCreated("S1");
publisher.PublishCount.ShouldBe(1);
publisher.StreamDeleted("S1");
publisher.PublishCount.ShouldBe(2);
publisher.ConsumerCreated("S1", "C1");
publisher.PublishCount.ShouldBe(3);
}
// Go reference: jetstream_api.go — each advisory type has its own descriptive type string.
[Fact]
public void Advisory_event_has_correct_type_field()
{
var (publisher, published) = CreatePublisher();
publisher.StreamCreated("S");
published[0].Body.ShouldBeOfType<AdvisoryEvent>().Type
.ShouldBe("io.nats.jetstream.advisory.stream_created");
publisher.StreamDeleted("S");
published[1].Body.ShouldBeOfType<AdvisoryEvent>().Type
.ShouldBe("io.nats.jetstream.advisory.stream_deleted");
publisher.StreamUpdated("S");
published[2].Body.ShouldBeOfType<AdvisoryEvent>().Type
.ShouldBe("io.nats.jetstream.advisory.stream_updated");
publisher.ConsumerCreated("S", "C");
published[3].Body.ShouldBeOfType<AdvisoryEvent>().Type
.ShouldBe("io.nats.jetstream.advisory.consumer_created");
publisher.ConsumerDeleted("S", "C");
published[4].Body.ShouldBeOfType<AdvisoryEvent>().Type
.ShouldBe("io.nats.jetstream.advisory.consumer_deleted");
}
// Go reference: jetstream_api.go — advisory timestamps use UTC to ensure cross-cluster consistency.
[Fact]
public void Advisory_event_has_utc_timestamp()
{
var (publisher, published) = CreatePublisher();
var before = DateTime.UtcNow;
publisher.StreamCreated("TEST");
var after = DateTime.UtcNow;
var evt = published[0].Body.ShouldBeOfType<AdvisoryEvent>();
evt.TimeStamp.Kind.ShouldBe(DateTimeKind.Utc);
evt.TimeStamp.ShouldBeGreaterThanOrEqualTo(before);
evt.TimeStamp.ShouldBeLessThanOrEqualTo(after);
}
// Go reference: jetstream_api.go — advisory subjects are derived from EventSubjects constants.
[Fact]
public void Advisory_subjects_format_correctly()
{
string.Format(EventSubjects.JsAdvisoryStreamCreated, "MY_STREAM")
.ShouldBe("$JS.EVENT.ADVISORY.STREAM.CREATED.MY_STREAM");
string.Format(EventSubjects.JsAdvisoryStreamDeleted, "MY_STREAM")
.ShouldBe("$JS.EVENT.ADVISORY.STREAM.DELETED.MY_STREAM");
string.Format(EventSubjects.JsAdvisoryStreamUpdated, "MY_STREAM")
.ShouldBe("$JS.EVENT.ADVISORY.STREAM.UPDATED.MY_STREAM");
string.Format(EventSubjects.JsAdvisoryConsumerCreated, "MY_STREAM", "MY_CONSUMER")
.ShouldBe("$JS.EVENT.ADVISORY.CONSUMER.CREATED.MY_STREAM.MY_CONSUMER");
string.Format(EventSubjects.JsAdvisoryConsumerDeleted, "MY_STREAM", "MY_CONSUMER")
.ShouldBe("$JS.EVENT.ADVISORY.CONSUMER.DELETED.MY_STREAM.MY_CONSUMER");
string.Format(EventSubjects.JsAdvisoryStreamSnapshotCreated, "MY_STREAM")
.ShouldBe("$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_CREATE.MY_STREAM");
string.Format(EventSubjects.JsAdvisoryStreamSnapshotCompleted, "MY_STREAM")
.ShouldBe("$JS.EVENT.ADVISORY.STREAM.SNAPSHOT_COMPLETE.MY_STREAM");
string.Format(EventSubjects.JsAdvisoryStreamRestoreCreated, "MY_STREAM")
.ShouldBe("$JS.EVENT.ADVISORY.STREAM.RESTORE_CREATE.MY_STREAM");
string.Format(EventSubjects.JsAdvisoryStreamRestoreCompleted, "MY_STREAM")
.ShouldBe("$JS.EVENT.ADVISORY.STREAM.RESTORE_COMPLETE.MY_STREAM");
string.Format(EventSubjects.JsAdvisoryStreamLeaderElected, "MY_STREAM")
.ShouldBe("$JS.EVENT.ADVISORY.STREAM.LEADER_ELECTED.MY_STREAM");
string.Format(EventSubjects.JsAdvisoryStreamQuorumLost, "MY_STREAM")
.ShouldBe("$JS.EVENT.ADVISORY.STREAM.QUORUM_LOST.MY_STREAM");
}
// Go reference: jetstream_api.go — full lifecycle sequence (create, update, delete) emits all advisories.
[Fact]
public void Multiple_advisories_all_published()
{
var (publisher, published) = CreatePublisher();
publisher.StreamCreated("LIFECYCLE");
publisher.StreamUpdated("LIFECYCLE", new { Reason = "retention_change" });
publisher.ConsumerCreated("LIFECYCLE", "worker");
publisher.ConsumerDeleted("LIFECYCLE", "worker");
publisher.StreamDeleted("LIFECYCLE");
published.Count.ShouldBe(5);
published[0].Subject.ShouldBe("$JS.EVENT.ADVISORY.STREAM.CREATED.LIFECYCLE");
published[1].Subject.ShouldBe("$JS.EVENT.ADVISORY.STREAM.UPDATED.LIFECYCLE");
published[2].Subject.ShouldBe("$JS.EVENT.ADVISORY.CONSUMER.CREATED.LIFECYCLE.worker");
published[3].Subject.ShouldBe("$JS.EVENT.ADVISORY.CONSUMER.DELETED.LIFECYCLE.worker");
published[4].Subject.ShouldBe("$JS.EVENT.ADVISORY.STREAM.DELETED.LIFECYCLE");
publisher.PublishCount.ShouldBe(5);
}
}