feat(audit): stamp SourceNode at CentralAuditWriter + persist via AuditLogRepository
CentralAuditWriter injects INodeIdentityProvider and stamps the event before handing to the repository. AuditLogRepository.InsertIfNotExistsAsync now includes SourceNode in the INSERT column list. Caller-provided value wins (supports any future direct-write callsite that already has its own node id).
This commit is contained in:
@@ -43,6 +43,7 @@ public sealed class CentralAuditWriter : ICentralAuditWriter
|
|||||||
private readonly ILogger<CentralAuditWriter> _logger;
|
private readonly ILogger<CentralAuditWriter> _logger;
|
||||||
private readonly IAuditPayloadFilter? _filter;
|
private readonly IAuditPayloadFilter? _filter;
|
||||||
private readonly ICentralAuditWriteFailureCounter _failureCounter;
|
private readonly ICentralAuditWriteFailureCounter _failureCounter;
|
||||||
|
private readonly INodeIdentityProvider? _nodeIdentity;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Bundle C (M5-T6) — the central direct-write path used by the
|
/// Bundle C (M5-T6) — the central direct-write path used by the
|
||||||
@@ -56,18 +57,27 @@ public sealed class CentralAuditWriter : ICentralAuditWriter
|
|||||||
/// throw bumps the central health surface's
|
/// throw bumps the central health surface's
|
||||||
/// <c>CentralAuditWriteFailures</c> counter. Defaults to a NoOp so test
|
/// <c>CentralAuditWriteFailures</c> counter. Defaults to a NoOp so test
|
||||||
/// composition roots that don't wire the counter keep their current
|
/// composition roots that don't wire the counter keep their current
|
||||||
/// behaviour.
|
/// behaviour. SourceNode-stamping (Task 12) — adds the optional
|
||||||
|
/// <see cref="INodeIdentityProvider"/> so central-origin rows (Notification
|
||||||
|
/// Outbox dispatch, Inbound API) carry the writing central node's
|
||||||
|
/// identifier when the caller hasn't already supplied one. Optional /
|
||||||
|
/// defaulting-to-null so M4 test composition roots that don't pass a
|
||||||
|
/// provider keep working — the caller-wins discipline means an absent
|
||||||
|
/// provider simply leaves SourceNode at whatever the caller set (often
|
||||||
|
/// null, which is the legacy behaviour).
|
||||||
/// </summary>
|
/// </summary>
|
||||||
public CentralAuditWriter(
|
public CentralAuditWriter(
|
||||||
IServiceProvider services,
|
IServiceProvider services,
|
||||||
ILogger<CentralAuditWriter> logger,
|
ILogger<CentralAuditWriter> logger,
|
||||||
IAuditPayloadFilter? filter = null,
|
IAuditPayloadFilter? filter = null,
|
||||||
ICentralAuditWriteFailureCounter? failureCounter = null)
|
ICentralAuditWriteFailureCounter? failureCounter = null,
|
||||||
|
INodeIdentityProvider? nodeIdentity = null)
|
||||||
{
|
{
|
||||||
_services = services ?? throw new ArgumentNullException(nameof(services));
|
_services = services ?? throw new ArgumentNullException(nameof(services));
|
||||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||||
_filter = filter;
|
_filter = filter;
|
||||||
_failureCounter = failureCounter ?? new NoOpCentralAuditWriteFailureCounter();
|
_failureCounter = failureCounter ?? new NoOpCentralAuditWriteFailureCounter();
|
||||||
|
_nodeIdentity = nodeIdentity;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
@@ -93,6 +103,18 @@ public sealed class CentralAuditWriter : ICentralAuditWriter
|
|||||||
// M4 test composition roots (no filter passed) working unchanged.
|
// M4 test composition roots (no filter passed) working unchanged.
|
||||||
var filtered = _filter?.Apply(evt) ?? evt;
|
var filtered = _filter?.Apply(evt) ?? evt;
|
||||||
|
|
||||||
|
// SourceNode-stamping (Task 12): caller-provided value wins
|
||||||
|
// (supports any future direct-write callsite that already has its
|
||||||
|
// own node id); otherwise stamp from the local
|
||||||
|
// INodeIdentityProvider, when one is wired. Production DI on
|
||||||
|
// central nodes always supplies the provider; legacy test
|
||||||
|
// composition roots that don't pass it leave SourceNode at
|
||||||
|
// whatever the caller set (often null), preserving back-compat.
|
||||||
|
if (filtered.SourceNode is null && _nodeIdentity?.NodeName is { } nodeName)
|
||||||
|
{
|
||||||
|
filtered = filtered with { SourceNode = nodeName };
|
||||||
|
}
|
||||||
|
|
||||||
await using var scope = _services.CreateAsyncScope();
|
await using var scope = _services.CreateAsyncScope();
|
||||||
var repo = scope.ServiceProvider.GetRequiredService<IAuditLogRepository>();
|
var repo = scope.ServiceProvider.GetRequiredService<IAuditLogRepository>();
|
||||||
var stamped = filtered with { IngestedAtUtc = DateTime.UtcNow };
|
var stamped = filtered with { IngestedAtUtc = DateTime.UtcNow };
|
||||||
|
|||||||
@@ -183,7 +183,14 @@ public static class ServiceCollectionExtensions
|
|||||||
sp,
|
sp,
|
||||||
sp.GetRequiredService<ILogger<CentralAuditWriter>>(),
|
sp.GetRequiredService<ILogger<CentralAuditWriter>>(),
|
||||||
sp.GetRequiredService<IAuditPayloadFilter>(),
|
sp.GetRequiredService<IAuditPayloadFilter>(),
|
||||||
sp.GetRequiredService<ICentralAuditWriteFailureCounter>()));
|
sp.GetRequiredService<ICentralAuditWriteFailureCounter>(),
|
||||||
|
// SourceNode-stamping (Task 12): wire the local node identity so
|
||||||
|
// central-origin rows (Notification Outbox dispatch, Inbound API)
|
||||||
|
// carry the writing node's identifier when the caller hasn't
|
||||||
|
// already supplied one. GetRequiredService — the production
|
||||||
|
// composition root in SiteServiceRegistration registers the
|
||||||
|
// provider as a singleton on both site and central paths.
|
||||||
|
sp.GetRequiredService<INodeIdentityProvider>()));
|
||||||
|
|
||||||
return services;
|
return services;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -65,12 +65,12 @@ public class AuditLogRepository : IAuditLogRepository
|
|||||||
$@"IF NOT EXISTS (SELECT 1 FROM dbo.AuditLog WHERE EventId = {evt.EventId})
|
$@"IF NOT EXISTS (SELECT 1 FROM dbo.AuditLog WHERE EventId = {evt.EventId})
|
||||||
INSERT INTO dbo.AuditLog
|
INSERT INTO dbo.AuditLog
|
||||||
(EventId, OccurredAtUtc, IngestedAtUtc, Channel, Kind, CorrelationId, ExecutionId, ParentExecutionId,
|
(EventId, OccurredAtUtc, IngestedAtUtc, Channel, Kind, CorrelationId, ExecutionId, ParentExecutionId,
|
||||||
SourceSiteId, SourceInstanceId, SourceScript, Actor, Target, Status,
|
SourceSiteId, SourceNode, SourceInstanceId, SourceScript, Actor, Target, Status,
|
||||||
HttpStatus, DurationMs, ErrorMessage, ErrorDetail, RequestSummary,
|
HttpStatus, DurationMs, ErrorMessage, ErrorDetail, RequestSummary,
|
||||||
ResponseSummary, PayloadTruncated, Extra, ForwardState)
|
ResponseSummary, PayloadTruncated, Extra, ForwardState)
|
||||||
VALUES
|
VALUES
|
||||||
({evt.EventId}, {evt.OccurredAtUtc}, {evt.IngestedAtUtc}, {channel}, {kind}, {evt.CorrelationId}, {evt.ExecutionId}, {evt.ParentExecutionId},
|
({evt.EventId}, {evt.OccurredAtUtc}, {evt.IngestedAtUtc}, {channel}, {kind}, {evt.CorrelationId}, {evt.ExecutionId}, {evt.ParentExecutionId},
|
||||||
{evt.SourceSiteId}, {evt.SourceInstanceId}, {evt.SourceScript}, {evt.Actor}, {evt.Target}, {status},
|
{evt.SourceSiteId}, {evt.SourceNode}, {evt.SourceInstanceId}, {evt.SourceScript}, {evt.Actor}, {evt.Target}, {status},
|
||||||
{evt.HttpStatus}, {evt.DurationMs}, {evt.ErrorMessage}, {evt.ErrorDetail}, {evt.RequestSummary},
|
{evt.HttpStatus}, {evt.DurationMs}, {evt.ErrorMessage}, {evt.ErrorDetail}, {evt.RequestSummary},
|
||||||
{evt.ResponseSummary}, {evt.PayloadTruncated}, {evt.Extra}, {forwardState});",
|
{evt.ResponseSummary}, {evt.PayloadTruncated}, {evt.Extra}, {forwardState});",
|
||||||
ct);
|
ct);
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ using Microsoft.Extensions.Logging.Abstractions;
|
|||||||
using NSubstitute;
|
using NSubstitute;
|
||||||
using NSubstitute.ExceptionExtensions;
|
using NSubstitute.ExceptionExtensions;
|
||||||
using ScadaLink.AuditLog.Central;
|
using ScadaLink.AuditLog.Central;
|
||||||
|
using ScadaLink.AuditLog.Tests.TestSupport;
|
||||||
using ScadaLink.Commons.Entities.Audit;
|
using ScadaLink.Commons.Entities.Audit;
|
||||||
using ScadaLink.Commons.Interfaces.Repositories;
|
using ScadaLink.Commons.Interfaces.Repositories;
|
||||||
using ScadaLink.Commons.Interfaces.Services;
|
using ScadaLink.Commons.Interfaces.Services;
|
||||||
@@ -124,4 +125,59 @@ public class CentralAuditWriterTests
|
|||||||
Assert.Throws<ArgumentNullException>(
|
Assert.Throws<ArgumentNullException>(
|
||||||
() => new CentralAuditWriter(services, null!));
|
() => new CentralAuditWriter(services, null!));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ----- SourceNode stamping (Task 12) ----- //
|
||||||
|
|
||||||
|
private static (CentralAuditWriter writer, IAuditLogRepository repo) BuildWriterWithIdentity(
|
||||||
|
INodeIdentityProvider? nodeIdentity)
|
||||||
|
{
|
||||||
|
var repo = Substitute.For<IAuditLogRepository>();
|
||||||
|
var services = new ServiceCollection();
|
||||||
|
services.AddScoped(_ => repo);
|
||||||
|
var provider = services.BuildServiceProvider();
|
||||||
|
var writer = new CentralAuditWriter(
|
||||||
|
provider,
|
||||||
|
NullLogger<CentralAuditWriter>.Instance,
|
||||||
|
filter: null,
|
||||||
|
failureCounter: null,
|
||||||
|
nodeIdentity: nodeIdentity);
|
||||||
|
return (writer, repo);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task WriteAsync_StampsSourceNodeFromProvider_WhenEventHasNone()
|
||||||
|
{
|
||||||
|
var (writer, repo) = BuildWriterWithIdentity(new FakeNodeIdentityProvider("central-a"));
|
||||||
|
|
||||||
|
await writer.WriteAsync(NewEvent());
|
||||||
|
|
||||||
|
await repo.Received(1).InsertIfNotExistsAsync(
|
||||||
|
Arg.Is<AuditEvent>(e => e.SourceNode == "central-a"),
|
||||||
|
Arg.Any<CancellationToken>());
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task WriteAsync_PreservesCallerProvidedSourceNode()
|
||||||
|
{
|
||||||
|
var (writer, repo) = BuildWriterWithIdentity(new FakeNodeIdentityProvider("central-a"));
|
||||||
|
var evt = NewEvent() with { SourceNode = "central-b" };
|
||||||
|
|
||||||
|
await writer.WriteAsync(evt);
|
||||||
|
|
||||||
|
await repo.Received(1).InsertIfNotExistsAsync(
|
||||||
|
Arg.Is<AuditEvent>(e => e.SourceNode == "central-b"),
|
||||||
|
Arg.Any<CancellationToken>());
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task WriteAsync_LeavesSourceNodeNull_WhenProviderReturnsNull()
|
||||||
|
{
|
||||||
|
var (writer, repo) = BuildWriterWithIdentity(new FakeNodeIdentityProvider(nodeName: null));
|
||||||
|
|
||||||
|
await writer.WriteAsync(NewEvent());
|
||||||
|
|
||||||
|
await repo.Received(1).InsertIfNotExistsAsync(
|
||||||
|
Arg.Is<AuditEvent>(e => e.SourceNode == null),
|
||||||
|
Arg.Any<CancellationToken>());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -50,6 +50,56 @@ public class AuditLogRepositoryTests : IClassFixture<MsSqlMigrationFixture>
|
|||||||
Assert.Equal(evt.EventId, loaded[0].EventId);
|
Assert.Equal(evt.EventId, loaded[0].EventId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
[SkippableFact]
|
||||||
|
public async Task InsertIfNotExistsAsync_PersistsSourceNode()
|
||||||
|
{
|
||||||
|
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
||||||
|
|
||||||
|
var siteId = NewSiteId();
|
||||||
|
await using var context = CreateContext();
|
||||||
|
var repo = new AuditLogRepository(context);
|
||||||
|
|
||||||
|
var evt = NewEvent(
|
||||||
|
siteId,
|
||||||
|
occurredAtUtc: new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc),
|
||||||
|
sourceNode: "central-a");
|
||||||
|
await repo.InsertIfNotExistsAsync(evt);
|
||||||
|
|
||||||
|
await using var readContext = CreateContext();
|
||||||
|
var loaded = await readContext.Set<AuditEvent>()
|
||||||
|
.Where(e => e.SourceSiteId == siteId)
|
||||||
|
.ToListAsync();
|
||||||
|
|
||||||
|
Assert.Single(loaded);
|
||||||
|
Assert.Equal("central-a", loaded[0].SourceNode);
|
||||||
|
}
|
||||||
|
|
||||||
|
[SkippableFact]
|
||||||
|
public async Task InsertIfNotExistsAsync_PersistsNullSourceNode()
|
||||||
|
{
|
||||||
|
Skip.IfNot(_fixture.Available, _fixture.SkipReason);
|
||||||
|
|
||||||
|
var siteId = NewSiteId();
|
||||||
|
await using var context = CreateContext();
|
||||||
|
var repo = new AuditLogRepository(context);
|
||||||
|
|
||||||
|
// Caller passes null SourceNode (e.g. an unconfigured node) — the
|
||||||
|
// column should persist as NULL, not as the empty string.
|
||||||
|
var evt = NewEvent(
|
||||||
|
siteId,
|
||||||
|
occurredAtUtc: new DateTime(2026, 5, 20, 10, 0, 0, DateTimeKind.Utc),
|
||||||
|
sourceNode: null);
|
||||||
|
await repo.InsertIfNotExistsAsync(evt);
|
||||||
|
|
||||||
|
await using var readContext = CreateContext();
|
||||||
|
var loaded = await readContext.Set<AuditEvent>()
|
||||||
|
.Where(e => e.SourceSiteId == siteId)
|
||||||
|
.ToListAsync();
|
||||||
|
|
||||||
|
Assert.Single(loaded);
|
||||||
|
Assert.Null(loaded[0].SourceNode);
|
||||||
|
}
|
||||||
|
|
||||||
[SkippableFact]
|
[SkippableFact]
|
||||||
public async Task InsertIfNotExistsAsync_DuplicateEventId_IsNoOp_NoExceptionNoDuplicate()
|
public async Task InsertIfNotExistsAsync_DuplicateEventId_IsNoOp_NoExceptionNoDuplicate()
|
||||||
{
|
{
|
||||||
@@ -962,7 +1012,8 @@ public class AuditLogRepositoryTests : IClassFixture<MsSqlMigrationFixture>
|
|||||||
AuditStatus status = AuditStatus.Delivered,
|
AuditStatus status = AuditStatus.Delivered,
|
||||||
string? errorMessage = null,
|
string? errorMessage = null,
|
||||||
Guid? executionId = null,
|
Guid? executionId = null,
|
||||||
Guid? parentExecutionId = null) =>
|
Guid? parentExecutionId = null,
|
||||||
|
string? sourceNode = null) =>
|
||||||
new()
|
new()
|
||||||
{
|
{
|
||||||
EventId = Guid.NewGuid(),
|
EventId = Guid.NewGuid(),
|
||||||
@@ -971,6 +1022,7 @@ public class AuditLogRepositoryTests : IClassFixture<MsSqlMigrationFixture>
|
|||||||
Kind = kind,
|
Kind = kind,
|
||||||
Status = status,
|
Status = status,
|
||||||
SourceSiteId = siteId,
|
SourceSiteId = siteId,
|
||||||
|
SourceNode = sourceNode,
|
||||||
ErrorMessage = errorMessage,
|
ErrorMessage = errorMessage,
|
||||||
ExecutionId = executionId,
|
ExecutionId = executionId,
|
||||||
ParentExecutionId = parentExecutionId,
|
ParentExecutionId = parentExecutionId,
|
||||||
|
|||||||
Reference in New Issue
Block a user