fix(site-event-logging): resolve SiteEventLogging-001/002/003, re-triage 004 — incremental auto_vacuum, cap-purge guard, write-lock connection access
This commit is contained in:
@@ -1,4 +1,3 @@
|
||||
using Microsoft.Data.Sqlite;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Options;
|
||||
@@ -12,6 +11,9 @@ namespace ScadaLink.SiteEventLogging;
|
||||
/// </summary>
|
||||
public class EventLogPurgeService : BackgroundService
|
||||
{
|
||||
/// <summary>Number of events deleted per cap-purge batch.</summary>
|
||||
private const int CapPurgeBatchSize = 1000;
|
||||
|
||||
private readonly SiteEventLogger _eventLogger;
|
||||
private readonly SiteEventLogOptions _options;
|
||||
private readonly ILogger<EventLogPurgeService> _logger;
|
||||
@@ -21,7 +23,7 @@ public class EventLogPurgeService : BackgroundService
|
||||
IOptions<SiteEventLogOptions> options,
|
||||
ILogger<EventLogPurgeService> logger)
|
||||
{
|
||||
// We need the concrete type to access the connection
|
||||
// We need the concrete type to funnel access through its shared lock.
|
||||
_eventLogger = (SiteEventLogger)eventLogger;
|
||||
_options = options.Value;
|
||||
_logger = logger;
|
||||
@@ -61,10 +63,13 @@ public class EventLogPurgeService : BackgroundService
|
||||
{
|
||||
var cutoff = DateTimeOffset.UtcNow.AddDays(-_options.RetentionDays).ToString("o");
|
||||
|
||||
using var cmd = _eventLogger.Connection.CreateCommand();
|
||||
cmd.CommandText = "DELETE FROM site_events WHERE timestamp < $cutoff";
|
||||
cmd.Parameters.AddWithValue("$cutoff", cutoff);
|
||||
var deleted = cmd.ExecuteNonQuery();
|
||||
var deleted = _eventLogger.WithConnection(connection =>
|
||||
{
|
||||
using var cmd = connection.CreateCommand();
|
||||
cmd.CommandText = "DELETE FROM site_events WHERE timestamp < $cutoff";
|
||||
cmd.Parameters.AddWithValue("$cutoff", cutoff);
|
||||
return cmd.ExecuteNonQuery();
|
||||
});
|
||||
|
||||
if (deleted > 0)
|
||||
{
|
||||
@@ -74,8 +79,8 @@ public class EventLogPurgeService : BackgroundService
|
||||
|
||||
private void PurgeByStorageCap()
|
||||
{
|
||||
var currentSizeBytes = GetDatabaseSizeBytes();
|
||||
var capBytes = (long)_options.MaxStorageMb * 1024 * 1024;
|
||||
var currentSizeBytes = GetDatabaseSizeBytes();
|
||||
|
||||
if (currentSizeBytes <= capBytes)
|
||||
return;
|
||||
@@ -84,37 +89,77 @@ public class EventLogPurgeService : BackgroundService
|
||||
"Event log size {Size:F1} MB exceeds cap {Cap} MB — purging oldest events",
|
||||
currentSizeBytes / (1024.0 * 1024.0), _options.MaxStorageMb);
|
||||
|
||||
// Delete oldest events in batches until under the cap
|
||||
// Delete the oldest events in batches until the database is under the cap.
|
||||
// The loop also stops if the on-disk size fails to decrease across an
|
||||
// iteration (e.g. if vacuum cannot reclaim space), so a cap that can never
|
||||
// be met does not silently empty the entire table.
|
||||
while (currentSizeBytes > capBytes)
|
||||
{
|
||||
using var cmd = _eventLogger.Connection.CreateCommand();
|
||||
cmd.CommandText = """
|
||||
DELETE FROM site_events WHERE id IN (
|
||||
SELECT id FROM site_events ORDER BY id ASC LIMIT 1000
|
||||
)
|
||||
""";
|
||||
var deleted = cmd.ExecuteNonQuery();
|
||||
if (deleted == 0) break;
|
||||
var previousSizeBytes = currentSizeBytes;
|
||||
|
||||
// Reclaim space
|
||||
using var vacuumCmd = _eventLogger.Connection.CreateCommand();
|
||||
vacuumCmd.CommandText = "PRAGMA incremental_vacuum";
|
||||
vacuumCmd.ExecuteNonQuery();
|
||||
var deleted = _eventLogger.WithConnection(connection =>
|
||||
{
|
||||
using var cmd = connection.CreateCommand();
|
||||
cmd.CommandText = $"""
|
||||
DELETE FROM site_events WHERE id IN (
|
||||
SELECT id FROM site_events ORDER BY id ASC LIMIT {CapPurgeBatchSize}
|
||||
)
|
||||
""";
|
||||
var rows = cmd.ExecuteNonQuery();
|
||||
|
||||
// Reclaim free pages so page_count/freelist measurement reflects the
|
||||
// delete. Effective because auto_vacuum = INCREMENTAL is set at schema
|
||||
// creation; harmless otherwise.
|
||||
using var vacuumCmd = connection.CreateCommand();
|
||||
vacuumCmd.CommandText = "PRAGMA incremental_vacuum";
|
||||
vacuumCmd.ExecuteNonQuery();
|
||||
|
||||
return rows;
|
||||
});
|
||||
|
||||
if (deleted == 0)
|
||||
break;
|
||||
|
||||
currentSizeBytes = GetDatabaseSizeBytes();
|
||||
|
||||
if (currentSizeBytes >= previousSizeBytes)
|
||||
{
|
||||
// Size is not shrinking despite deletes — stop rather than wipe the
|
||||
// whole table. This should not happen now that logical size is
|
||||
// measured, but guards against any future regression.
|
||||
_logger.LogWarning(
|
||||
"Event log size did not decrease after a cap-purge batch ({Size:F1} MB); " +
|
||||
"stopping to avoid emptying the log",
|
||||
currentSizeBytes / (1024.0 * 1024.0));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Returns the logical size of the database in bytes — only pages that hold live
|
||||
/// data, excluding free pages on the freelist. Measuring logical size (rather than
|
||||
/// the raw file size from <c>page_count</c>) means the storage-cap loop observes
|
||||
/// space being reclaimed even if free pages have not yet been returned to the OS.
|
||||
/// </summary>
|
||||
internal long GetDatabaseSizeBytes()
|
||||
{
|
||||
using var pageCountCmd = _eventLogger.Connection.CreateCommand();
|
||||
pageCountCmd.CommandText = "PRAGMA page_count";
|
||||
var pageCount = (long)pageCountCmd.ExecuteScalar()!;
|
||||
return _eventLogger.WithConnection(connection =>
|
||||
{
|
||||
using var pageCountCmd = connection.CreateCommand();
|
||||
pageCountCmd.CommandText = "PRAGMA page_count";
|
||||
var pageCount = (long)pageCountCmd.ExecuteScalar()!;
|
||||
|
||||
using var pageSizeCmd = _eventLogger.Connection.CreateCommand();
|
||||
pageSizeCmd.CommandText = "PRAGMA page_size";
|
||||
var pageSize = (long)pageSizeCmd.ExecuteScalar()!;
|
||||
using var freeListCmd = connection.CreateCommand();
|
||||
freeListCmd.CommandText = "PRAGMA freelist_count";
|
||||
var freeListCount = (long)freeListCmd.ExecuteScalar()!;
|
||||
|
||||
return pageCount * pageSize;
|
||||
using var pageSizeCmd = connection.CreateCommand();
|
||||
pageSizeCmd.CommandText = "PRAGMA page_size";
|
||||
var pageSize = (long)pageSizeCmd.ExecuteScalar()!;
|
||||
|
||||
var usedPages = Math.Max(0, pageCount - freeListCount);
|
||||
return usedPages * pageSize;
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -33,7 +33,6 @@ public class EventLogQueryService : IEventLogQueryService
|
||||
{
|
||||
var pageSize = request.PageSize > 0 ? request.PageSize : _options.QueryPageSize;
|
||||
|
||||
using var cmd = _eventLogger.Connection.CreateCommand();
|
||||
var whereClauses = new List<string>();
|
||||
var parameters = new List<SqliteParameter>();
|
||||
|
||||
@@ -84,32 +83,42 @@ public class EventLogQueryService : IEventLogQueryService
|
||||
? "WHERE " + string.Join(" AND ", whereClauses)
|
||||
: "";
|
||||
|
||||
// Fetch pageSize + 1 to determine if there are more results
|
||||
cmd.CommandText = $"""
|
||||
SELECT id, timestamp, event_type, severity, instance_id, source, message, details
|
||||
FROM site_events
|
||||
{whereClause}
|
||||
ORDER BY id ASC
|
||||
LIMIT $limit
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$limit", pageSize + 1);
|
||||
foreach (var p in parameters)
|
||||
cmd.Parameters.Add(p);
|
||||
|
||||
var entries = new List<EventLogEntry>();
|
||||
using var reader = cmd.ExecuteReader();
|
||||
while (reader.Read())
|
||||
// Run the read against the shared connection under the logger's write
|
||||
// lock — the connection is not thread-safe and is also used by the
|
||||
// recorder and the purge service on other threads.
|
||||
var entries = _eventLogger.WithConnection(connection =>
|
||||
{
|
||||
entries.Add(new EventLogEntry(
|
||||
Id: reader.GetInt64(0),
|
||||
Timestamp: DateTimeOffset.Parse(reader.GetString(1)),
|
||||
EventType: reader.GetString(2),
|
||||
Severity: reader.GetString(3),
|
||||
InstanceId: reader.IsDBNull(4) ? null : reader.GetString(4),
|
||||
Source: reader.GetString(5),
|
||||
Message: reader.GetString(6),
|
||||
Details: reader.IsDBNull(7) ? null : reader.GetString(7)));
|
||||
}
|
||||
using var cmd = connection.CreateCommand();
|
||||
|
||||
// Fetch pageSize + 1 to determine if there are more results
|
||||
cmd.CommandText = $"""
|
||||
SELECT id, timestamp, event_type, severity, instance_id, source, message, details
|
||||
FROM site_events
|
||||
{whereClause}
|
||||
ORDER BY id ASC
|
||||
LIMIT $limit
|
||||
""";
|
||||
cmd.Parameters.AddWithValue("$limit", pageSize + 1);
|
||||
foreach (var p in parameters)
|
||||
cmd.Parameters.Add(p);
|
||||
|
||||
var rows = new List<EventLogEntry>();
|
||||
using var reader = cmd.ExecuteReader();
|
||||
while (reader.Read())
|
||||
{
|
||||
rows.Add(new EventLogEntry(
|
||||
Id: reader.GetInt64(0),
|
||||
Timestamp: DateTimeOffset.Parse(reader.GetString(1)),
|
||||
EventType: reader.GetString(2),
|
||||
Severity: reader.GetString(3),
|
||||
InstanceId: reader.IsDBNull(4) ? null : reader.GetString(4),
|
||||
Source: reader.GetString(5),
|
||||
Message: reader.GetString(6),
|
||||
Details: reader.IsDBNull(7) ? null : reader.GetString(7)));
|
||||
}
|
||||
|
||||
return rows;
|
||||
});
|
||||
|
||||
var hasMore = entries.Count > pageSize;
|
||||
if (hasMore)
|
||||
|
||||
@@ -9,6 +9,11 @@ namespace ScadaLink.SiteEventLogging;
|
||||
/// Only the active node generates events. Not replicated to standby.
|
||||
/// On failover, the new active node starts a fresh log.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// A single <see cref="SqliteConnection"/> is owned here and is NOT thread-safe.
|
||||
/// All access — recording, querying, purging — must be funnelled through
|
||||
/// <see cref="WithConnection"/>, which serialises callers on a shared lock.
|
||||
/// </remarks>
|
||||
public class SiteEventLogger : ISiteEventLogger, IDisposable
|
||||
{
|
||||
private readonly SqliteConnection _connection;
|
||||
@@ -31,10 +36,50 @@ public class SiteEventLogger : ISiteEventLogger, IDisposable
|
||||
InitializeSchema();
|
||||
}
|
||||
|
||||
internal SqliteConnection Connection => _connection;
|
||||
/// <summary>
|
||||
/// Runs <paramref name="action"/> against the shared connection while holding the
|
||||
/// write lock, so purge / query / record callers on different threads never use
|
||||
/// the non-thread-safe <see cref="SqliteConnection"/> concurrently.
|
||||
/// Returns <see langword="false"/> without invoking the action if the logger has
|
||||
/// been disposed.
|
||||
/// </summary>
|
||||
internal bool WithConnection(Action<SqliteConnection> action)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(action);
|
||||
lock (_writeLock)
|
||||
{
|
||||
if (_disposed) return false;
|
||||
action(_connection);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Runs <paramref name="func"/> against the shared connection while holding the
|
||||
/// write lock. Throws <see cref="ObjectDisposedException"/> if the logger has
|
||||
/// been disposed (callers that need a result cannot proceed without the database).
|
||||
/// </summary>
|
||||
internal T WithConnection<T>(Func<SqliteConnection, T> func)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(func);
|
||||
lock (_writeLock)
|
||||
{
|
||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||
return func(_connection);
|
||||
}
|
||||
}
|
||||
|
||||
private void InitializeSchema()
|
||||
{
|
||||
// auto_vacuum must be set before any table is created for it to take effect
|
||||
// on a fresh database. With INCREMENTAL mode, PRAGMA incremental_vacuum can
|
||||
// later reclaim free pages so the storage-cap purge can shrink the file.
|
||||
using (var pragmaCmd = _connection.CreateCommand())
|
||||
{
|
||||
pragmaCmd.CommandText = "PRAGMA auto_vacuum = INCREMENTAL";
|
||||
pragmaCmd.ExecuteNonQuery();
|
||||
}
|
||||
|
||||
using var cmd = _connection.CreateCommand();
|
||||
cmd.CommandText = """
|
||||
CREATE TABLE IF NOT EXISTS site_events (
|
||||
@@ -69,13 +114,11 @@ public class SiteEventLogger : ISiteEventLogger, IDisposable
|
||||
|
||||
var timestamp = DateTimeOffset.UtcNow.ToString("o");
|
||||
|
||||
lock (_writeLock)
|
||||
try
|
||||
{
|
||||
if (_disposed) return Task.CompletedTask;
|
||||
|
||||
try
|
||||
WithConnection(connection =>
|
||||
{
|
||||
using var cmd = _connection.CreateCommand();
|
||||
using var cmd = connection.CreateCommand();
|
||||
cmd.CommandText = """
|
||||
INSERT INTO site_events (timestamp, event_type, severity, instance_id, source, message, details)
|
||||
VALUES ($timestamp, $event_type, $severity, $instance_id, $source, $message, $details)
|
||||
@@ -88,11 +131,11 @@ public class SiteEventLogger : ISiteEventLogger, IDisposable
|
||||
cmd.Parameters.AddWithValue("$message", message);
|
||||
cmd.Parameters.AddWithValue("$details", (object?)details ?? DBNull.Value);
|
||||
cmd.ExecuteNonQuery();
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to record event: {EventType} from {Source}", eventType, source);
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogError(ex, "Failed to record event: {EventType} from {Source}", eventType, source);
|
||||
}
|
||||
|
||||
return Task.CompletedTask;
|
||||
@@ -100,8 +143,11 @@ public class SiteEventLogger : ISiteEventLogger, IDisposable
|
||||
|
||||
public void Dispose()
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
_connection.Dispose();
|
||||
lock (_writeLock)
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
_connection.Dispose();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user