feat(comms): IngestAuditEvents RPC + AuditEventDto proto (#23)

This commit is contained in:
Joseph Doherty
2026-05-20 12:29:58 -04:00
parent ff8766ec8b
commit 5c3d601198
4 changed files with 1516 additions and 30 deletions

View File

@@ -3,9 +3,11 @@ option csharp_namespace = "ScadaLink.Communication.Grpc";
package sitestream;
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto"; // Int32Value
service SiteStreamService {
rpc SubscribeInstance(InstanceStreamRequest) returns (stream SiteStreamEvent);
rpc IngestAuditEvents(AuditEventBatch) returns (IngestAck);
}
message InstanceStreamRequest {
@@ -63,3 +65,31 @@ message AlarmStateUpdate {
AlarmLevelEnum level = 6; // ALARM_LEVEL_NONE for binary trigger types; set by HiLo.
string message = 7; // Optional per-band operator message; empty when unset.
}
// Audit Log (#23) telemetry: single lifecycle event ferried from a site SQLite
// hot-path row to central via IngestAuditEvents. Mirrors AuditEvent (Commons)
// minus the site-local ForwardState and the central IngestedAtUtc (set on ingest).
message AuditEventDto {
string event_id = 1;
google.protobuf.Timestamp occurred_at_utc = 2;
string channel = 3;
string kind = 4;
string correlation_id = 5; // empty string represents null
string source_site_id = 6;
string source_instance_id = 7;
string source_script = 8;
string actor = 9;
string target = 10;
string status = 11;
google.protobuf.Int32Value http_status = 12; // null when absent
google.protobuf.Int32Value duration_ms = 13;
string error_message = 14;
string error_detail = 15;
string request_summary = 16;
string response_summary = 17;
bool payload_truncated = 18;
string extra = 19;
}
message AuditEventBatch { repeated AuditEventDto events = 1; }
message IngestAck { repeated string accepted_event_ids = 1; }

File diff suppressed because it is too large Load Diff

View File

@@ -49,6 +49,10 @@ namespace ScadaLink.Communication.Grpc {
static readonly grpc::Marshaller<global::ScadaLink.Communication.Grpc.InstanceStreamRequest> __Marshaller_sitestream_InstanceStreamRequest = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ScadaLink.Communication.Grpc.InstanceStreamRequest.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::ScadaLink.Communication.Grpc.SiteStreamEvent> __Marshaller_sitestream_SiteStreamEvent = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ScadaLink.Communication.Grpc.SiteStreamEvent.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::ScadaLink.Communication.Grpc.AuditEventBatch> __Marshaller_sitestream_AuditEventBatch = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ScadaLink.Communication.Grpc.AuditEventBatch.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::ScadaLink.Communication.Grpc.IngestAck> __Marshaller_sitestream_IngestAck = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ScadaLink.Communication.Grpc.IngestAck.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Method<global::ScadaLink.Communication.Grpc.InstanceStreamRequest, global::ScadaLink.Communication.Grpc.SiteStreamEvent> __Method_SubscribeInstance = new grpc::Method<global::ScadaLink.Communication.Grpc.InstanceStreamRequest, global::ScadaLink.Communication.Grpc.SiteStreamEvent>(
@@ -58,6 +62,14 @@ namespace ScadaLink.Communication.Grpc {
__Marshaller_sitestream_InstanceStreamRequest,
__Marshaller_sitestream_SiteStreamEvent);
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Method<global::ScadaLink.Communication.Grpc.AuditEventBatch, global::ScadaLink.Communication.Grpc.IngestAck> __Method_IngestAuditEvents = new grpc::Method<global::ScadaLink.Communication.Grpc.AuditEventBatch, global::ScadaLink.Communication.Grpc.IngestAck>(
grpc::MethodType.Unary,
__ServiceName,
"IngestAuditEvents",
__Marshaller_sitestream_AuditEventBatch,
__Marshaller_sitestream_IngestAck);
/// <summary>Service descriptor</summary>
public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor
{
@@ -74,6 +86,12 @@ namespace ScadaLink.Communication.Grpc {
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::System.Threading.Tasks.Task<global::ScadaLink.Communication.Grpc.IngestAck> IngestAuditEvents(global::ScadaLink.Communication.Grpc.AuditEventBatch request, grpc::ServerCallContext context)
{
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
}
}
/// <summary>Client for SiteStreamService</summary>
@@ -113,6 +131,26 @@ namespace ScadaLink.Communication.Grpc {
{
return CallInvoker.AsyncServerStreamingCall(__Method_SubscribeInstance, null, options, request);
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::ScadaLink.Communication.Grpc.IngestAck IngestAuditEvents(global::ScadaLink.Communication.Grpc.AuditEventBatch request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return IngestAuditEvents(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::ScadaLink.Communication.Grpc.IngestAck IngestAuditEvents(global::ScadaLink.Communication.Grpc.AuditEventBatch request, grpc::CallOptions options)
{
return CallInvoker.BlockingUnaryCall(__Method_IngestAuditEvents, null, options, request);
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual grpc::AsyncUnaryCall<global::ScadaLink.Communication.Grpc.IngestAck> IngestAuditEventsAsync(global::ScadaLink.Communication.Grpc.AuditEventBatch request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return IngestAuditEventsAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual grpc::AsyncUnaryCall<global::ScadaLink.Communication.Grpc.IngestAck> IngestAuditEventsAsync(global::ScadaLink.Communication.Grpc.AuditEventBatch request, grpc::CallOptions options)
{
return CallInvoker.AsyncUnaryCall(__Method_IngestAuditEvents, null, options, request);
}
/// <summary>Creates a new instance of client from given <c>ClientBaseConfiguration</c>.</summary>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
protected override SiteStreamServiceClient NewInstance(ClientBaseConfiguration configuration)
@@ -127,7 +165,8 @@ namespace ScadaLink.Communication.Grpc {
public static grpc::ServerServiceDefinition BindService(SiteStreamServiceBase serviceImpl)
{
return grpc::ServerServiceDefinition.CreateBuilder()
.AddMethod(__Method_SubscribeInstance, serviceImpl.SubscribeInstance).Build();
.AddMethod(__Method_SubscribeInstance, serviceImpl.SubscribeInstance)
.AddMethod(__Method_IngestAuditEvents, serviceImpl.IngestAuditEvents).Build();
}
/// <summary>Register service method with a service binder with or without implementation. Useful when customizing the service binding logic.
@@ -138,6 +177,7 @@ namespace ScadaLink.Communication.Grpc {
public static void BindService(grpc::ServiceBinderBase serviceBinder, SiteStreamServiceBase serviceImpl)
{
serviceBinder.AddMethod(__Method_SubscribeInstance, serviceImpl == null ? null : new grpc::ServerStreamingServerMethod<global::ScadaLink.Communication.Grpc.InstanceStreamRequest, global::ScadaLink.Communication.Grpc.SiteStreamEvent>(serviceImpl.SubscribeInstance));
serviceBinder.AddMethod(__Method_IngestAuditEvents, serviceImpl == null ? null : new grpc::UnaryServerMethod<global::ScadaLink.Communication.Grpc.AuditEventBatch, global::ScadaLink.Communication.Grpc.IngestAck>(serviceImpl.IngestAuditEvents));
}
}

View File

@@ -0,0 +1,123 @@
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication.Tests.Protos;
/// <summary>
/// Wire-format round-trip tests for the Audit Log (#23) telemetry proto messages
/// (<see cref="AuditEventDto"/>, <see cref="AuditEventBatch"/>, <see cref="IngestAck"/>).
/// Locks the additive contract the site → central audit pipeline depends on.
/// </summary>
public class AuditEventProtoTests
{
[Fact]
public void AuditEventDto_RoundTrip_PreservesAllFields()
{
var occurredAt = Timestamp.FromDateTimeOffset(
new DateTimeOffset(2026, 5, 20, 10, 15, 30, 123, TimeSpan.Zero));
var original = new AuditEventDto
{
EventId = Guid.NewGuid().ToString(),
OccurredAtUtc = occurredAt,
Channel = "ApiOutbound",
Kind = "ApiCall",
CorrelationId = Guid.NewGuid().ToString(),
SourceSiteId = "site-1",
SourceInstanceId = "Pump01",
SourceScript = "OnDemand",
Actor = "design-key",
Target = "weather-api",
Status = "Delivered",
HttpStatus = 200,
DurationMs = 42,
ErrorMessage = "no error",
ErrorDetail = "stack",
RequestSummary = "GET /weather?city=brisbane",
ResponseSummary = "{ \"temp\": 22.5 }",
PayloadTruncated = true,
Extra = "{ \"retryCount\": 0 }"
};
var bytes = original.ToByteArray();
var deserialized = AuditEventDto.Parser.ParseFrom(bytes);
Assert.Equal(original.EventId, deserialized.EventId);
Assert.Equal(original.OccurredAtUtc, deserialized.OccurredAtUtc);
Assert.Equal(original.Channel, deserialized.Channel);
Assert.Equal(original.Kind, deserialized.Kind);
Assert.Equal(original.CorrelationId, deserialized.CorrelationId);
Assert.Equal(original.SourceSiteId, deserialized.SourceSiteId);
Assert.Equal(original.SourceInstanceId, deserialized.SourceInstanceId);
Assert.Equal(original.SourceScript, deserialized.SourceScript);
Assert.Equal(original.Actor, deserialized.Actor);
Assert.Equal(original.Target, deserialized.Target);
Assert.Equal(original.Status, deserialized.Status);
Assert.Equal(original.HttpStatus, deserialized.HttpStatus);
Assert.Equal(original.DurationMs, deserialized.DurationMs);
Assert.Equal(original.ErrorMessage, deserialized.ErrorMessage);
Assert.Equal(original.ErrorDetail, deserialized.ErrorDetail);
Assert.Equal(original.RequestSummary, deserialized.RequestSummary);
Assert.Equal(original.ResponseSummary, deserialized.ResponseSummary);
Assert.Equal(original.PayloadTruncated, deserialized.PayloadTruncated);
Assert.Equal(original.Extra, deserialized.Extra);
}
[Fact]
public void AuditEventDto_NullableInt_AbsentByDefault_NotIncludedInWire()
{
// Int32Value fields (http_status, duration_ms) are wrapper-typed in proto;
// when unset, the wrapper is absent, not serialized, and deserializes back to null.
var original = new AuditEventDto
{
EventId = Guid.NewGuid().ToString(),
OccurredAtUtc = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
Channel = "Notification",
Kind = "NotifySend",
Status = "Submitted"
};
Assert.Null(original.HttpStatus);
Assert.Null(original.DurationMs);
var bytes = original.ToByteArray();
var deserialized = AuditEventDto.Parser.ParseFrom(bytes);
Assert.Null(deserialized.HttpStatus);
Assert.Null(deserialized.DurationMs);
}
[Fact]
public void AuditEventBatch_Empty_RoundTrip_Yields_EmptyEvents()
{
var original = new AuditEventBatch();
Assert.Empty(original.Events);
var bytes = original.ToByteArray();
var deserialized = AuditEventBatch.Parser.ParseFrom(bytes);
Assert.Empty(deserialized.Events);
}
[Fact]
public void IngestAck_PreservesAcceptedEventIds()
{
var id1 = Guid.NewGuid().ToString();
var id2 = Guid.NewGuid().ToString();
var id3 = Guid.NewGuid().ToString();
var original = new IngestAck();
original.AcceptedEventIds.Add(id1);
original.AcceptedEventIds.Add(id2);
original.AcceptedEventIds.Add(id3);
var bytes = original.ToByteArray();
var deserialized = IngestAck.Parser.ParseFrom(bytes);
Assert.Equal(3, deserialized.AcceptedEventIds.Count);
Assert.Equal(id1, deserialized.AcceptedEventIds[0]);
Assert.Equal(id2, deserialized.AcceptedEventIds[1]);
Assert.Equal(id3, deserialized.AcceptedEventIds[2]);
}
}