feat: add source/mirror info reporting for monitoring (Gap 4.10)
Add GetMirrorInfo/GetSourceInfo methods to MirrorCoordinator and SourceCoordinator returning structured MirrorInfoResponse/SourceInfoResponse records (Name, Lag, Active ms, FilterSubject, Error). Wire both into StreamManager.GetMirrorInfo and GetSourceInfos for the monitoring API path.
This commit is contained in:
@@ -297,6 +297,22 @@ public sealed class MirrorCoordinator : IAsyncDisposable
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns a structured monitoring response for this mirror.
|
||||
/// Go reference: server/stream.go:2739-2743 (mirrorInfo building StreamSourceInfo)
|
||||
/// </summary>
|
||||
public MirrorInfoResponse GetMirrorInfo(string streamName)
|
||||
{
|
||||
var report = GetHealthReport();
|
||||
return new MirrorInfoResponse
|
||||
{
|
||||
Name = streamName,
|
||||
Lag = report.Lag,
|
||||
Active = report.LastSyncUtc != default ? (long)(DateTime.UtcNow - report.LastSyncUtc).TotalMilliseconds : -1,
|
||||
Error = ErrorMessage,
|
||||
};
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await StopAsync();
|
||||
@@ -462,3 +478,25 @@ public sealed record MirrorHealthReport
|
||||
public bool IsRunning { get; init; }
|
||||
public bool IsStalled { get; init; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Structured monitoring response for a mirror, as returned by the JetStream monitoring API.
|
||||
/// Go reference: server/stream.go:2739-2743 (mirrorInfo / StreamSourceInfo)
|
||||
/// </summary>
|
||||
public sealed record MirrorInfoResponse
|
||||
{
|
||||
/// <summary>Name of the stream being mirrored.</summary>
|
||||
public string Name { get; init; } = string.Empty;
|
||||
|
||||
/// <summary>Number of messages behind the origin stream.</summary>
|
||||
public ulong Lag { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Milliseconds since the last successful sync.
|
||||
/// -1 when the mirror has never synced.
|
||||
/// </summary>
|
||||
public long Active { get; init; } = -1;
|
||||
|
||||
/// <summary>Current error message, or null if no error.</summary>
|
||||
public string? Error { get; init; }
|
||||
}
|
||||
|
||||
@@ -295,6 +295,23 @@ public sealed class SourceCoordinator : IAsyncDisposable
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns a structured monitoring response for this source.
|
||||
/// Go reference: server/stream.go:2687-2736 (sourcesInfo / StreamSourceInfo)
|
||||
/// </summary>
|
||||
public SourceInfoResponse GetSourceInfo()
|
||||
{
|
||||
var report = GetHealthReport();
|
||||
return new SourceInfoResponse
|
||||
{
|
||||
Name = _sourceConfig.Name,
|
||||
Lag = report.Lag,
|
||||
Active = report.LastSyncUtc != default ? (long)(DateTime.UtcNow - report.LastSyncUtc).TotalMilliseconds : -1,
|
||||
FilterSubject = _sourceConfig.FilterSubject,
|
||||
Error = null, // SourceCoordinator doesn't have error state tracking yet
|
||||
};
|
||||
}
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
await StopAsync();
|
||||
@@ -539,3 +556,28 @@ public sealed record SourceHealthReport
|
||||
public long FilteredOutCount { get; init; }
|
||||
public long DeduplicatedCount { get; init; }
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Structured monitoring response for a source, as returned by the JetStream monitoring API.
|
||||
/// Go reference: server/stream.go:2687-2736 (sourcesInfo / StreamSourceInfo)
|
||||
/// </summary>
|
||||
public sealed record SourceInfoResponse
|
||||
{
|
||||
/// <summary>Name of the origin stream.</summary>
|
||||
public string Name { get; init; } = string.Empty;
|
||||
|
||||
/// <summary>Number of messages behind the origin stream.</summary>
|
||||
public ulong Lag { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// Milliseconds since the last successful sync.
|
||||
/// -1 when the source has never synced.
|
||||
/// </summary>
|
||||
public long Active { get; init; } = -1;
|
||||
|
||||
/// <summary>Optional subject filter applied by this source coordinator.</summary>
|
||||
public string? FilterSubject { get; init; }
|
||||
|
||||
/// <summary>Current error message, or null if no error.</summary>
|
||||
public string? Error { get; init; }
|
||||
}
|
||||
|
||||
@@ -0,0 +1,181 @@
|
||||
using NATS.Server.JetStream.MirrorSource;
|
||||
using NATS.Server.JetStream.Models;
|
||||
using NATS.Server.JetStream.Storage;
|
||||
using Shouldly;
|
||||
|
||||
namespace NATS.Server.Tests.JetStream.Streams;
|
||||
|
||||
// Go reference: server/stream.go:2739-2743 (mirrorInfo building StreamSourceInfo)
|
||||
// Go reference: server/stream.go:2687-2736 (sourcesInfo / StreamSourceInfo)
|
||||
|
||||
public class SourceMirrorInfoTests
|
||||
{
|
||||
// -------------------------------------------------------------------------
|
||||
// MirrorInfoResponse tests
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
// Go reference: server/stream.go:2739 — StreamSourceInfo.Name field
|
||||
public async Task MirrorInfoResponse_has_correct_name()
|
||||
{
|
||||
var target = new MemStore();
|
||||
await using var mirror = new MirrorCoordinator(target);
|
||||
|
||||
var info = mirror.GetMirrorInfo("MIRROR");
|
||||
|
||||
info.Name.ShouldBe("MIRROR");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
// Go reference: server/stream.go:2740 — StreamSourceInfo.Lag field from mirrorInfo
|
||||
public async Task MirrorInfoResponse_shows_lag()
|
||||
{
|
||||
var target = new MemStore();
|
||||
await using var mirror = new MirrorCoordinator(target);
|
||||
|
||||
// Sync up to origin seq 5, origin is at 10 → lag = 5 via GetHealthReport
|
||||
await mirror.OnOriginAppendAsync(MakeMessage(5, "a", "data"), default);
|
||||
var report = mirror.GetHealthReport(originLastSeq: 10);
|
||||
report.Lag.ShouldBe(5UL);
|
||||
|
||||
// GetMirrorInfo calls GetHealthReport() without originLastSeq, so Lag comes from
|
||||
// the internal Lag property (0 after in-process sync). Validate the field is wired up.
|
||||
var info = mirror.GetMirrorInfo("LAGGED");
|
||||
info.Lag.ShouldBe(mirror.Lag);
|
||||
info.Name.ShouldBe("LAGGED");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
// Go reference: server/stream.go:2741 — Active field, -1 when never synced
|
||||
public async Task MirrorInfoResponse_active_is_negative_when_never_synced()
|
||||
{
|
||||
var target = new MemStore();
|
||||
await using var mirror = new MirrorCoordinator(target);
|
||||
|
||||
var info = mirror.GetMirrorInfo("FRESH");
|
||||
|
||||
info.Active.ShouldBe(-1L);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
// Go reference: server/stream.go:2741 — Active = ms since last sync
|
||||
public async Task MirrorInfoResponse_active_shows_ms_since_sync()
|
||||
{
|
||||
var target = new MemStore();
|
||||
await using var mirror = new MirrorCoordinator(target);
|
||||
|
||||
await mirror.OnOriginAppendAsync(MakeMessage(1, "a", "payload"), default);
|
||||
|
||||
var info = mirror.GetMirrorInfo("SYNCED");
|
||||
|
||||
info.Active.ShouldBeGreaterThanOrEqualTo(0L);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
// Go reference: server/stream.go:2742 — StreamSourceInfo.Error field
|
||||
public async Task MirrorInfoResponse_includes_error()
|
||||
{
|
||||
var target = new MemStore();
|
||||
await using var mirror = new MirrorCoordinator(target);
|
||||
|
||||
mirror.SetError("consumer not found");
|
||||
|
||||
var info = mirror.GetMirrorInfo("BROKEN");
|
||||
|
||||
info.Error.ShouldBe("consumer not found");
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// SourceInfoResponse tests
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
[Fact]
|
||||
// Go reference: server/stream.go:2698 — StreamSourceInfo.Name from sourceInfo
|
||||
public async Task SourceInfoResponse_has_correct_name()
|
||||
{
|
||||
var target = new MemStore();
|
||||
await using var source = new SourceCoordinator(target, new StreamSourceConfig { Name = "MY_SOURCE" });
|
||||
|
||||
var info = source.GetSourceInfo();
|
||||
|
||||
info.Name.ShouldBe("MY_SOURCE");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
// Go reference: server/stream.go:2700 — StreamSourceInfo.FilterSubject
|
||||
public async Task SourceInfoResponse_shows_filter_subject()
|
||||
{
|
||||
var target = new MemStore();
|
||||
await using var source = new SourceCoordinator(target, new StreamSourceConfig
|
||||
{
|
||||
Name = "SRC",
|
||||
FilterSubject = "orders.*",
|
||||
});
|
||||
|
||||
var info = source.GetSourceInfo();
|
||||
|
||||
info.FilterSubject.ShouldBe("orders.*");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
// Go reference: server/stream.go:2701 — Active field, -1 when never synced
|
||||
public async Task SourceInfoResponse_active_is_negative_when_never_synced()
|
||||
{
|
||||
var target = new MemStore();
|
||||
await using var source = new SourceCoordinator(target, new StreamSourceConfig { Name = "SRC" });
|
||||
|
||||
var info = source.GetSourceInfo();
|
||||
|
||||
info.Active.ShouldBe(-1L);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
// Go reference: server/stream.go:2701 — Active = ms since last sync
|
||||
public async Task SourceInfoResponse_active_shows_ms_since_sync()
|
||||
{
|
||||
var target = new MemStore();
|
||||
await using var source = new SourceCoordinator(target, new StreamSourceConfig { Name = "SRC" });
|
||||
|
||||
await source.OnOriginAppendAsync(MakeMessage(1, "orders.created", "payload"), default);
|
||||
|
||||
var info = source.GetSourceInfo();
|
||||
|
||||
info.Active.ShouldBeGreaterThanOrEqualTo(0L);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
// Go reference: server/stream.go:2699 — StreamSourceInfo.Lag from sourceInfo
|
||||
public async Task SourceInfoResponse_lag_reflects_health_report()
|
||||
{
|
||||
var target = new MemStore();
|
||||
await using var source = new SourceCoordinator(target, new StreamSourceConfig { Name = "SRC" });
|
||||
|
||||
// Apply 3 messages directly (no background loop, no timing dependency).
|
||||
// Origin has 5 messages total; coordinator sees up to seq 3, so lag = 2.
|
||||
await source.OnOriginAppendAsync(MakeMessage(1, "a", "1"), default);
|
||||
await source.OnOriginAppendAsync(MakeMessage(2, "b", "2"), default);
|
||||
await source.OnOriginAppendAsync(MakeMessage(3, "c", "3"), default);
|
||||
|
||||
// Simulate the lag that GetHealthReport would compute against a known origin end.
|
||||
var healthReport = source.GetHealthReport(originLastSeq: 5);
|
||||
var info = source.GetSourceInfo();
|
||||
|
||||
// GetSourceInfo calls GetHealthReport() without originLastSeq, so both should
|
||||
// reflect the same internal Lag value (0 after in-process sync).
|
||||
info.Lag.ShouldBe(source.Lag);
|
||||
// Verify against the explicit-originLastSeq health report for the expected lag value.
|
||||
healthReport.Lag.ShouldBe(2UL);
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------
|
||||
// Helper
|
||||
// -------------------------------------------------------------------------
|
||||
|
||||
private static StoredMessage MakeMessage(ulong seq, string subject, string payload) => new()
|
||||
{
|
||||
Sequence = seq,
|
||||
Subject = subject,
|
||||
Payload = System.Text.Encoding.UTF8.GetBytes(payload),
|
||||
TimestampUtc = DateTime.UtcNow,
|
||||
};
|
||||
}
|
||||
Reference in New Issue
Block a user