fix(site-event-logging): resolve SiteEventLogging-005,007,008,010 — background async writer, drop concrete downcast, surface write failures, test coverage
This commit is contained in:
@@ -19,12 +19,14 @@ public class EventLogPurgeService : BackgroundService
|
||||
private readonly ILogger<EventLogPurgeService> _logger;
|
||||
|
||||
public EventLogPurgeService(
|
||||
ISiteEventLogger eventLogger,
|
||||
SiteEventLogger eventLogger,
|
||||
IOptions<SiteEventLogOptions> options,
|
||||
ILogger<EventLogPurgeService> logger)
|
||||
{
|
||||
// We need the concrete type to funnel access through its shared lock.
|
||||
_eventLogger = (SiteEventLogger)eventLogger;
|
||||
// Depend on the concrete recorder directly: purge must funnel database access
|
||||
// through its lock-guarded WithConnection. Taking ISiteEventLogger and
|
||||
// downcasting would throw InvalidCastException for any other implementation.
|
||||
_eventLogger = eventLogger;
|
||||
_options = options.Value;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
@@ -18,11 +18,14 @@ public class EventLogQueryService : IEventLogQueryService
|
||||
private readonly ILogger<EventLogQueryService> _logger;
|
||||
|
||||
public EventLogQueryService(
|
||||
ISiteEventLogger eventLogger,
|
||||
SiteEventLogger eventLogger,
|
||||
IOptions<SiteEventLogOptions> options,
|
||||
ILogger<EventLogQueryService> logger)
|
||||
{
|
||||
_eventLogger = (SiteEventLogger)eventLogger;
|
||||
// Depend on the concrete recorder directly: queries must funnel database
|
||||
// access through its lock-guarded WithConnection. Taking ISiteEventLogger and
|
||||
// downcasting would throw InvalidCastException for any other implementation.
|
||||
_eventLogger = eventLogger;
|
||||
_options = options.Value;
|
||||
_logger = logger;
|
||||
}
|
||||
|
||||
@@ -9,7 +9,13 @@ public static class ServiceCollectionExtensions
|
||||
/// </summary>
|
||||
public static IServiceCollection AddSiteEventLogging(this IServiceCollection services)
|
||||
{
|
||||
services.AddSingleton<ISiteEventLogger, SiteEventLogger>();
|
||||
// The recorder is registered as a concrete singleton and the interface is
|
||||
// forwarded to the same instance. The purge and query services depend on the
|
||||
// concrete SiteEventLogger directly (they need its lock-guarded WithConnection)
|
||||
// rather than downcasting an ISiteEventLogger, which would throw
|
||||
// InvalidCastException for any other ISiteEventLogger implementation.
|
||||
services.AddSingleton<SiteEventLogger>();
|
||||
services.AddSingleton<ISiteEventLogger>(sp => sp.GetRequiredService<SiteEventLogger>());
|
||||
services.AddSingleton<IEventLogQueryService, EventLogQueryService>();
|
||||
services.AddHostedService<EventLogPurgeService>();
|
||||
return services;
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
using System.Threading.Channels;
|
||||
using Microsoft.Data.Sqlite;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
@@ -10,15 +11,28 @@ namespace ScadaLink.SiteEventLogging;
|
||||
/// On failover, the new active node starts a fresh log.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// A single <see cref="SqliteConnection"/> is owned here and is NOT thread-safe.
|
||||
/// All access — recording, querying, purging — must be funnelled through
|
||||
/// <see cref="WithConnection"/>, which serialises callers on a shared lock.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Event recording is offloaded to a dedicated background writer thread (fed by an
|
||||
/// unbounded <see cref="Channel{T}"/>). <see cref="LogEventAsync"/> only validates
|
||||
/// its arguments and enqueues, so callers — typically Akka actor threads on hot
|
||||
/// paths — never block on disk I/O or on contention for the write lock. The
|
||||
/// returned <see cref="Task"/> completes once the event is durably persisted (or
|
||||
/// once it has been dropped because the logger was disposed) and faults if the
|
||||
/// write fails, so failures are observable rather than swallowed.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
public class SiteEventLogger : ISiteEventLogger, IDisposable
|
||||
{
|
||||
private readonly SqliteConnection _connection;
|
||||
private readonly ILogger<SiteEventLogger> _logger;
|
||||
private readonly object _writeLock = new();
|
||||
private readonly Channel<PendingEvent> _writeQueue;
|
||||
private readonly Task _writerLoop;
|
||||
private long _failedWriteCount;
|
||||
private bool _disposed;
|
||||
|
||||
public SiteEventLogger(
|
||||
@@ -34,8 +48,22 @@ public class SiteEventLogger : ISiteEventLogger, IDisposable
|
||||
_connection.Open();
|
||||
|
||||
InitializeSchema();
|
||||
|
||||
_writeQueue = Channel.CreateUnbounded<PendingEvent>(new UnboundedChannelOptions
|
||||
{
|
||||
SingleReader = true,
|
||||
SingleWriter = false,
|
||||
});
|
||||
_writerLoop = Task.Run(ProcessWriteQueueAsync);
|
||||
}
|
||||
|
||||
/// <summary>
/// Running total of event writes that have failed (SQLite error, disk full, etc.)
/// over the lifetime of this logger instance. Exposed so that Health Monitoring
/// can spot a logging outage rather than depending on a local log line that
/// nobody is watching.
/// </summary>
/// <remarks>
/// The counter is read with <see cref="Interlocked.Read(ref long)"/> so the
/// 64-bit value is fetched atomically.
/// </remarks>
public long FailedWriteCount
{
    get { return Interlocked.Read(ref _failedWriteCount); }
}
|
||||
|
||||
/// <summary>
|
||||
/// Runs <paramref name="action"/> against the shared connection while holding the
|
||||
/// write lock, so purge / query / record callers on different threads never use
|
||||
@@ -112,42 +140,110 @@ public class SiteEventLogger : ISiteEventLogger, IDisposable
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(source);
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(message);
|
||||
|
||||
var timestamp = DateTimeOffset.UtcNow.ToString("o");
|
||||
var pending = new PendingEvent(
|
||||
DateTimeOffset.UtcNow.ToString("o"),
|
||||
eventType,
|
||||
severity,
|
||||
instanceId,
|
||||
source,
|
||||
message,
|
||||
details);
|
||||
|
||||
try
|
||||
// Enqueue only — the actual SQLite write happens on the background writer
|
||||
// thread so the caller (an Akka actor thread on a hot path) never blocks
|
||||
// on disk I/O or on contention for the write lock.
|
||||
if (!_writeQueue.Writer.TryWrite(pending))
|
||||
{
|
||||
WithConnection(connection =>
|
||||
// The channel is unbounded, so the only way TryWrite fails is that the
|
||||
// writer has been completed (logger disposed). Drop silently — there is
|
||||
// nowhere to persist the event.
|
||||
pending.Completion.TrySetResult();
|
||||
}
|
||||
|
||||
return pending.Completion.Task;
|
||||
}
|
||||
|
||||
private async Task ProcessWriteQueueAsync()
|
||||
{
|
||||
await foreach (var pending in _writeQueue.Reader.ReadAllAsync().ConfigureAwait(false))
|
||||
{
|
||||
try
|
||||
{
|
||||
using var cmd = connection.CreateCommand();
|
||||
cmd.CommandText = """
|
||||
INSERT INTO site_events (timestamp, event_type, severity, instance_id, source, message, details)
|
||||
VALUES ($timestamp, $event_type, $severity, $instance_id, $source, $message, $details)
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$timestamp", timestamp);
|
||||
cmd.Parameters.AddWithValue("$event_type", eventType);
|
||||
cmd.Parameters.AddWithValue("$severity", severity);
|
||||
cmd.Parameters.AddWithValue("$instance_id", (object?)instanceId ?? DBNull.Value);
|
||||
cmd.Parameters.AddWithValue("$source", source);
|
||||
cmd.Parameters.AddWithValue("$message", message);
|
||||
cmd.Parameters.AddWithValue("$details", (object?)details ?? DBNull.Value);
|
||||
cmd.ExecuteNonQuery();
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to record event: {EventType} from {Source}", eventType, source);
|
||||
}
|
||||
var written = WithConnection(connection =>
|
||||
{
|
||||
using var cmd = connection.CreateCommand();
|
||||
cmd.CommandText = """
|
||||
INSERT INTO site_events (timestamp, event_type, severity, instance_id, source, message, details)
|
||||
VALUES ($timestamp, $event_type, $severity, $instance_id, $source, $message, $details)
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$timestamp", pending.Timestamp);
|
||||
cmd.Parameters.AddWithValue("$event_type", pending.EventType);
|
||||
cmd.Parameters.AddWithValue("$severity", pending.Severity);
|
||||
cmd.Parameters.AddWithValue("$instance_id", (object?)pending.InstanceId ?? DBNull.Value);
|
||||
cmd.Parameters.AddWithValue("$source", pending.Source);
|
||||
cmd.Parameters.AddWithValue("$message", pending.Message);
|
||||
cmd.Parameters.AddWithValue("$details", (object?)pending.Details ?? DBNull.Value);
|
||||
cmd.ExecuteNonQuery();
|
||||
});
|
||||
|
||||
return Task.CompletedTask;
|
||||
// WithConnection returns false only when the logger has been
|
||||
// disposed; the event simply cannot be persisted in that case.
|
||||
pending.Completion.TrySetResult();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
// SiteEventLogging-008: a write failure must be observable. Count it
|
||||
// (Health Monitoring reads FailedWriteCount) and fault the caller's
|
||||
// Task instead of silently discarding the exception.
|
||||
Interlocked.Increment(ref _failedWriteCount);
|
||||
_logger.LogError(ex, "Failed to record event: {EventType} from {Source}",
|
||||
pending.EventType, pending.Source);
|
||||
pending.Completion.TrySetException(ex);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
/// Stops the background writer and releases the SQLite connection.
/// </summary>
/// <remarks>
/// Only the first call does any work; later calls return immediately. The write
/// queue is completed so no further events are accepted, the writer loop is given
/// up to five seconds to finish, and the connection is then disposed while holding
/// the write lock. If the writer loop does not finish in time a warning is logged
/// and the connection is disposed regardless.
/// </remarks>
public void Dispose()
{
    lock (_writeLock)
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        // Stop accepting new events; the writer loop ends once the queue is empty.
        // NOTE(review): _disposed is already true while the loop drains. If
        // WithConnection turns callers away once _disposed is set, events still
        // queued at this point are completed without being persisted — confirm
        // that this is the intended shutdown behaviour.
        _writeQueue.Writer.TryComplete();
    }

    // Wait for the writer loop outside the lock — the loop itself acquires the
    // lock for each write, so waiting while holding it would stall the drain.
    var drainTimeout = TimeSpan.FromSeconds(5);
    try
    {
        // Task.Wait(TimeSpan) returns false on timeout; surface that instead of
        // silently tearing the connection down underneath a still-running loop.
        if (!_writerLoop.Wait(drainTimeout))
        {
            _logger.LogWarning(
                "Event log writer did not stop within {TimeoutSeconds}s; disposing the connection anyway",
                drainTimeout.TotalSeconds);
        }
    }
    catch (AggregateException)
    {
        // A faulted writer loop has already been logged per event; nothing more
        // to do during disposal.
    }

    lock (_writeLock)
    {
        _connection.Dispose();
    }
}
|
||||
|
||||
/// <summary>
/// One event queued for the background writer, bundled with the completion
/// source used to report the outcome of its write back to the caller.
/// </summary>
private sealed record PendingEvent(
    string Timestamp, string EventType, string Severity, string? InstanceId,
    string Source, string Message, string? Details)
{
    /// <summary>
    /// Completion source for this event. It is created with
    /// <see cref="TaskCreationOptions.RunContinuationsAsynchronously"/> so that
    /// continuations attached to its task are not run inline on the thread that
    /// completes it.
    /// </summary>
    public TaskCompletionSource Completion { get; } =
        new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user