// Copyright 2012-2025 The NATS Authors
// Licensed under the Apache License, Version 2.0
using System.Threading.Channels;
using NATS.Client.Core;
using Shouldly;
namespace ZB.MOM.NatsNet.Server.IntegrationTests;
/// <summary>
/// Behavioral baseline tests against the reference Go NATS server.
/// These tests require a running Go NATS server on localhost:4222.
/// Start with: cd golang/nats-server &amp;&amp; go run . -p 4222
/// </summary>
[Collection("NatsIntegration")]
[Trait("Category", "Integration")]
public class NatsServerBehaviorTests : IAsyncLifetime
{
    /// <summary>Address of the reference server every test in this class talks to.</summary>
    private const string ServerUrl = "nats://localhost:4222";

    /// <summary>Shared connection opened in <see cref="InitializeAsync"/>.</summary>
    private NatsConnection? _nats;

    /// <summary>Exception captured while connecting, or null when the connection attempt succeeded.</summary>
    private Exception? _initFailure;

    /// <summary>
    /// Opens the shared connection. A connection failure is recorded in <see cref="_initFailure"/>
    /// instead of being thrown, so that the tests can return early when no server is running.
    /// </summary>
    public async Task InitializeAsync()
    {
        try
        {
            _nats = new NatsConnection(new NatsOpts { Url = ServerUrl });
            await _nats.ConnectAsync();
        }
        catch (Exception ex)
        {
            // Deliberately broad: any failure here means "server unavailable".
            _initFailure = ex;
        }
    }

    /// <summary>Disposes the shared connection if one was created.</summary>
    public async Task DisposeAsync()
    {
        if (_nats is not null)
        {
            await _nats.DisposeAsync();
        }
    }

    /// <summary>
    /// Returns true if the server is not available, causing the calling test to return early (pass silently).
    /// xUnit 2.x does not support dynamic skip at runtime; early return is the pragmatic workaround.
    /// </summary>
    private bool ServerUnavailable() => _initFailure is not null;

    /// <summary>
    /// Subscribes to <paramref name="subject"/> on a background task and completes the returned
    /// task with the first message that arrives.
    /// </summary>
    /// <param name="subject">Subject (may contain wildcards) to subscribe to.</param>
    /// <param name="cancellationToken">Token that ends the subscription; cancels the returned task.</param>
    /// <returns>A task producing the first received message.</returns>
    private Task<NatsMsg<string>> ListenForFirstMessage(string subject, CancellationToken cancellationToken)
    {
        var nats = _nats ?? throw new InvalidOperationException("The NATS connection has not been initialized.");

        // RunContinuationsAsynchronously keeps the awaiting test from resuming inline
        // on the subscriber loop that calls TrySetResult.
        var first = new TaskCompletionSource<NatsMsg<string>>(TaskCreationOptions.RunContinuationsAsynchronously);

        _ = Task.Run(async () =>
        {
            try
            {
                await foreach (var msg in nats.SubscribeAsync<string>(subject, cancellationToken: cancellationToken))
                {
                    first.TrySetResult(msg);
                    break;
                }
            }
            catch (OperationCanceledException)
            {
                // Observe the cancellation here so the background task never ends up faulted/unobserved.
                first.TrySetCanceled(cancellationToken);
            }
            catch (Exception ex)
            {
                first.TrySetException(ex);
            }
        });

        return first.Task;
    }

    /// <summary>A message published on a subject is delivered to a subscriber of that exact subject.</summary>
    [Fact]
    public async Task BasicPubSub_ShouldDeliverMessage()
    {
        if (ServerUnavailable())
        {
            return;
        }

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        var firstMessage = ListenForFirstMessage("test.hello", cts.Token);

        // Give subscriber a moment to register
        await Task.Delay(100, cts.Token);
        await _nats!.PublishAsync("test.hello", "world", cancellationToken: cts.Token);

        var msg = await firstMessage.WaitAsync(cts.Token);
        (msg.Data ?? "").ShouldBe("world");
    }

    /// <summary>The single-token wildcard <c>foo.*</c> matches <c>foo.bar</c>.</summary>
    [Fact]
    public async Task WildcardSubscription_DotStar_ShouldMatch()
    {
        if (ServerUnavailable())
        {
            return;
        }

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        var firstMessage = ListenForFirstMessage("foo.*", cts.Token);

        // Give subscriber a moment to register
        await Task.Delay(100, cts.Token);
        await _nats!.PublishAsync("foo.bar", "payload", cancellationToken: cts.Token);

        var msg = await firstMessage.WaitAsync(cts.Token);
        msg.Subject.ShouldBe("foo.bar");
    }

    /// <summary>The multi-token wildcard <c>foo.&gt;</c> matches <c>foo.bar.baz</c>.</summary>
    [Fact]
    public async Task WildcardSubscription_GreaterThan_ShouldMatchMultiLevel()
    {
        if (ServerUnavailable())
        {
            return;
        }

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
        var firstMessage = ListenForFirstMessage("foo.>", cts.Token);

        // Give subscriber a moment to register
        await Task.Delay(100, cts.Token);
        await _nats!.PublishAsync("foo.bar.baz", "payload", cancellationToken: cts.Token);

        var msg = await firstMessage.WaitAsync(cts.Token);
        msg.Subject.ShouldBe("foo.bar.baz");
    }

    /// <summary>
    /// Two subscribers in the same queue group together receive each published message exactly once.
    /// </summary>
    [Fact]
    public async Task QueueGroup_ShouldDeliverToOnlyOneSubscriber()
    {
        if (ServerUnavailable())
        {
            return;
        }

        using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(10));
        const int messageCount = 30;
        var nats = _nats!;

        // Sized for twice the expected volume so a worker never blocks on the channel
        // even if every message were (incorrectly) delivered to both subscribers.
        var deliveries = Channel.CreateBounded<int>(messageCount * 2);
        var counts = new int[2];

        // Starts one queue-group member that counts each delivery and signals it on the channel.
        Task StartWorker(int index) => Task.Run(async () =>
        {
            try
            {
                await foreach (var _ in nats.SubscribeAsync<string>("qg.test", queueGroup: "workers", cancellationToken: cts.Token))
                {
                    Interlocked.Increment(ref counts[index]);
                    await deliveries.Writer.WriteAsync(1, cts.Token);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected: the test cancels the token to stop the worker.
            }
        });

        var worker1 = StartWorker(0);
        var worker2 = StartWorker(1);

        try
        {
            // Give subscribers a moment to register
            await Task.Delay(200, cts.Token);

            for (var i = 0; i < messageCount; i++)
            {
                await nats.PublishAsync("qg.test", $"msg{i}", cancellationToken: cts.Token);
            }

            // Wait for all messages to be received
            for (var received = 0; received < messageCount; received++)
            {
                await deliveries.Reader.ReadAsync(cts.Token);
            }

            // Don't assert per-subscriber counts — distribution is probabilistic
            (Volatile.Read(ref counts[0]) + Volatile.Read(ref counts[1])).ShouldBe(messageCount);
        }
        finally
        {
            // Always stop and join the workers, even when an assertion or timeout fails the test;
            // otherwise they would outlive the disposed CancellationTokenSource.
            cts.Cancel();
            await Task.WhenAll(worker1, worker2);
        }
    }

    /// <summary>Opening and then disposing a second, independent connection completes without throwing.</summary>
    [Fact]
    public async Task ConnectDisconnect_ShouldNotThrow()
    {
        if (ServerUnavailable())
        {
            return;
        }

        await Should.NotThrowAsync(async () =>
        {
            // await using guarantees the connection is disposed even when ConnectAsync throws,
            // and a failing dispose still surfaces through NotThrowAsync.
            await using var nats2 = new NatsConnection(new NatsOpts { Url = ServerUrl });
            await nats2.ConnectAsync();
        });
    }
}