feat(comms): IngestCachedTelemetry RPC + CachedTelemetryPacket proto (#23 M3)

This commit is contained in:
Joseph Doherty
2026-05-20 14:24:13 -04:00
parent de110f8b42
commit 2b54290c7f
4 changed files with 1334 additions and 15 deletions

View File

@@ -8,6 +8,7 @@ import "google/protobuf/wrappers.proto"; // Int32Value
service SiteStreamService { service SiteStreamService {
rpc SubscribeInstance(InstanceStreamRequest) returns (stream SiteStreamEvent); rpc SubscribeInstance(InstanceStreamRequest) returns (stream SiteStreamEvent);
rpc IngestAuditEvents(AuditEventBatch) returns (IngestAck); rpc IngestAuditEvents(AuditEventBatch) returns (IngestAck);
rpc IngestCachedTelemetry(CachedTelemetryBatch) returns (IngestAck);
} }
message InstanceStreamRequest { message InstanceStreamRequest {
@@ -93,3 +94,28 @@ message AuditEventDto {
message AuditEventBatch { repeated AuditEventDto events = 1; } message AuditEventBatch { repeated AuditEventDto events = 1; }
message IngestAck { repeated string accepted_event_ids = 1; } message IngestAck { repeated string accepted_event_ids = 1; }
// Audit Log (#23) M3 cached-call combined telemetry: a single packet carries
// both the AuditEvent row to insert and the SiteCalls operational-state upsert
// for one lifecycle event of a cached outbound call. Central writes both rows
// in one MS SQL transaction so the audit and operational mirrors never drift.
message SiteCallOperationalDto {
string tracked_operation_id = 1; // GUID string ("D" format)
string channel = 2; // "ApiOutbound" | "DbOutbound"
string target = 3;
string source_site = 4;
string status = 5; // AuditStatus name
int32 retry_count = 6;
string last_error = 7; // empty when null
google.protobuf.Int32Value http_status = 8;
google.protobuf.Timestamp created_at_utc = 9;
google.protobuf.Timestamp updated_at_utc = 10;
google.protobuf.Timestamp terminal_at_utc = 11; // absent when not terminal
}
message CachedTelemetryPacket {
AuditEventDto audit_event = 1;
SiteCallOperationalDto operational = 2;
}
message CachedTelemetryBatch { repeated CachedTelemetryPacket packets = 1; }

File diff suppressed because it is too large Load Diff

View File

@@ -53,6 +53,8 @@ namespace ScadaLink.Communication.Grpc {
static readonly grpc::Marshaller<global::ScadaLink.Communication.Grpc.AuditEventBatch> __Marshaller_sitestream_AuditEventBatch = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ScadaLink.Communication.Grpc.AuditEventBatch.Parser)); static readonly grpc::Marshaller<global::ScadaLink.Communication.Grpc.AuditEventBatch> __Marshaller_sitestream_AuditEventBatch = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ScadaLink.Communication.Grpc.AuditEventBatch.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::ScadaLink.Communication.Grpc.IngestAck> __Marshaller_sitestream_IngestAck = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ScadaLink.Communication.Grpc.IngestAck.Parser)); static readonly grpc::Marshaller<global::ScadaLink.Communication.Grpc.IngestAck> __Marshaller_sitestream_IngestAck = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ScadaLink.Communication.Grpc.IngestAck.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Marshaller<global::ScadaLink.Communication.Grpc.CachedTelemetryBatch> __Marshaller_sitestream_CachedTelemetryBatch = grpc::Marshallers.Create(__Helper_SerializeMessage, context => __Helper_DeserializeMessage(context, global::ScadaLink.Communication.Grpc.CachedTelemetryBatch.Parser));
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Method<global::ScadaLink.Communication.Grpc.InstanceStreamRequest, global::ScadaLink.Communication.Grpc.SiteStreamEvent> __Method_SubscribeInstance = new grpc::Method<global::ScadaLink.Communication.Grpc.InstanceStreamRequest, global::ScadaLink.Communication.Grpc.SiteStreamEvent>( static readonly grpc::Method<global::ScadaLink.Communication.Grpc.InstanceStreamRequest, global::ScadaLink.Communication.Grpc.SiteStreamEvent> __Method_SubscribeInstance = new grpc::Method<global::ScadaLink.Communication.Grpc.InstanceStreamRequest, global::ScadaLink.Communication.Grpc.SiteStreamEvent>(
@@ -70,6 +72,14 @@ namespace ScadaLink.Communication.Grpc {
__Marshaller_sitestream_AuditEventBatch, __Marshaller_sitestream_AuditEventBatch,
__Marshaller_sitestream_IngestAck); __Marshaller_sitestream_IngestAck);
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
static readonly grpc::Method<global::ScadaLink.Communication.Grpc.CachedTelemetryBatch, global::ScadaLink.Communication.Grpc.IngestAck> __Method_IngestCachedTelemetry = new grpc::Method<global::ScadaLink.Communication.Grpc.CachedTelemetryBatch, global::ScadaLink.Communication.Grpc.IngestAck>(
grpc::MethodType.Unary,
__ServiceName,
"IngestCachedTelemetry",
__Marshaller_sitestream_CachedTelemetryBatch,
__Marshaller_sitestream_IngestAck);
/// <summary>Service descriptor</summary> /// <summary>Service descriptor</summary>
public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor public static global::Google.Protobuf.Reflection.ServiceDescriptor Descriptor
{ {
@@ -92,6 +102,12 @@ namespace ScadaLink.Communication.Grpc {
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, "")); throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
} }
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::System.Threading.Tasks.Task<global::ScadaLink.Communication.Grpc.IngestAck> IngestCachedTelemetry(global::ScadaLink.Communication.Grpc.CachedTelemetryBatch request, grpc::ServerCallContext context)
{
throw new grpc::RpcException(new grpc::Status(grpc::StatusCode.Unimplemented, ""));
}
} }
/// <summary>Client for SiteStreamService</summary> /// <summary>Client for SiteStreamService</summary>
@@ -151,6 +167,26 @@ namespace ScadaLink.Communication.Grpc {
{ {
return CallInvoker.AsyncUnaryCall(__Method_IngestAuditEvents, null, options, request); return CallInvoker.AsyncUnaryCall(__Method_IngestAuditEvents, null, options, request);
} }
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::ScadaLink.Communication.Grpc.IngestAck IngestCachedTelemetry(global::ScadaLink.Communication.Grpc.CachedTelemetryBatch request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return IngestCachedTelemetry(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual global::ScadaLink.Communication.Grpc.IngestAck IngestCachedTelemetry(global::ScadaLink.Communication.Grpc.CachedTelemetryBatch request, grpc::CallOptions options)
{
return CallInvoker.BlockingUnaryCall(__Method_IngestCachedTelemetry, null, options, request);
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual grpc::AsyncUnaryCall<global::ScadaLink.Communication.Grpc.IngestAck> IngestCachedTelemetryAsync(global::ScadaLink.Communication.Grpc.CachedTelemetryBatch request, grpc::Metadata headers = null, global::System.DateTime? deadline = null, global::System.Threading.CancellationToken cancellationToken = default(global::System.Threading.CancellationToken))
{
return IngestCachedTelemetryAsync(request, new grpc::CallOptions(headers, deadline, cancellationToken));
}
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
public virtual grpc::AsyncUnaryCall<global::ScadaLink.Communication.Grpc.IngestAck> IngestCachedTelemetryAsync(global::ScadaLink.Communication.Grpc.CachedTelemetryBatch request, grpc::CallOptions options)
{
return CallInvoker.AsyncUnaryCall(__Method_IngestCachedTelemetry, null, options, request);
}
/// <summary>Creates a new instance of client from given <c>ClientBaseConfiguration</c>.</summary> /// <summary>Creates a new instance of client from given <c>ClientBaseConfiguration</c>.</summary>
[global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)] [global::System.CodeDom.Compiler.GeneratedCode("grpc_csharp_plugin", null)]
protected override SiteStreamServiceClient NewInstance(ClientBaseConfiguration configuration) protected override SiteStreamServiceClient NewInstance(ClientBaseConfiguration configuration)
@@ -166,7 +202,8 @@ namespace ScadaLink.Communication.Grpc {
{ {
return grpc::ServerServiceDefinition.CreateBuilder() return grpc::ServerServiceDefinition.CreateBuilder()
.AddMethod(__Method_SubscribeInstance, serviceImpl.SubscribeInstance) .AddMethod(__Method_SubscribeInstance, serviceImpl.SubscribeInstance)
.AddMethod(__Method_IngestAuditEvents, serviceImpl.IngestAuditEvents).Build(); .AddMethod(__Method_IngestAuditEvents, serviceImpl.IngestAuditEvents)
.AddMethod(__Method_IngestCachedTelemetry, serviceImpl.IngestCachedTelemetry).Build();
} }
/// <summary>Register service method with a service binder with or without implementation. Useful when customizing the service binding logic. /// <summary>Register service method with a service binder with or without implementation. Useful when customizing the service binding logic.
@@ -178,6 +215,7 @@ namespace ScadaLink.Communication.Grpc {
{ {
serviceBinder.AddMethod(__Method_SubscribeInstance, serviceImpl == null ? null : new grpc::ServerStreamingServerMethod<global::ScadaLink.Communication.Grpc.InstanceStreamRequest, global::ScadaLink.Communication.Grpc.SiteStreamEvent>(serviceImpl.SubscribeInstance)); serviceBinder.AddMethod(__Method_SubscribeInstance, serviceImpl == null ? null : new grpc::ServerStreamingServerMethod<global::ScadaLink.Communication.Grpc.InstanceStreamRequest, global::ScadaLink.Communication.Grpc.SiteStreamEvent>(serviceImpl.SubscribeInstance));
serviceBinder.AddMethod(__Method_IngestAuditEvents, serviceImpl == null ? null : new grpc::UnaryServerMethod<global::ScadaLink.Communication.Grpc.AuditEventBatch, global::ScadaLink.Communication.Grpc.IngestAck>(serviceImpl.IngestAuditEvents)); serviceBinder.AddMethod(__Method_IngestAuditEvents, serviceImpl == null ? null : new grpc::UnaryServerMethod<global::ScadaLink.Communication.Grpc.AuditEventBatch, global::ScadaLink.Communication.Grpc.IngestAck>(serviceImpl.IngestAuditEvents));
serviceBinder.AddMethod(__Method_IngestCachedTelemetry, serviceImpl == null ? null : new grpc::UnaryServerMethod<global::ScadaLink.Communication.Grpc.CachedTelemetryBatch, global::ScadaLink.Communication.Grpc.IngestAck>(serviceImpl.IngestCachedTelemetry));
} }
} }

View File

@@ -0,0 +1,173 @@
using Google.Protobuf;
using Google.Protobuf.WellKnownTypes;
using ScadaLink.Communication.Grpc;
namespace ScadaLink.Communication.Tests.Protos;
/// <summary>
/// Wire-format round-trip tests for the Audit Log (#23) M3 cached-telemetry
/// proto messages (<see cref="SiteCallOperationalDto"/>,
/// <see cref="CachedTelemetryPacket"/>, <see cref="CachedTelemetryBatch"/>).
/// Locks the additive contract the central dual-write transaction depends on.
/// </summary>
public class CachedTelemetryProtoTests
{
private static AuditEventDto NewAuditDto(Guid? id = null) => new()
{
EventId = (id ?? Guid.NewGuid()).ToString(),
OccurredAtUtc = Timestamp.FromDateTimeOffset(
new DateTimeOffset(2026, 5, 20, 10, 15, 30, 123, TimeSpan.Zero)),
Channel = "ApiOutbound",
Kind = "CachedSubmit",
Status = "Submitted",
SourceSiteId = "site-1",
};
[Fact]
public void SiteCallOperationalDto_RoundTrip_PreservesAllFields()
{
var createdAt = Timestamp.FromDateTimeOffset(
new DateTimeOffset(2026, 5, 20, 10, 0, 0, TimeSpan.Zero));
var updatedAt = Timestamp.FromDateTimeOffset(
new DateTimeOffset(2026, 5, 20, 10, 5, 0, TimeSpan.Zero));
var terminalAt = Timestamp.FromDateTimeOffset(
new DateTimeOffset(2026, 5, 20, 10, 10, 0, TimeSpan.Zero));
var original = new SiteCallOperationalDto
{
TrackedOperationId = Guid.NewGuid().ToString(),
Channel = "ApiOutbound",
Target = "ERP.GetOrder",
SourceSite = "site-melbourne",
Status = "Delivered",
RetryCount = 3,
LastError = "transient 503",
HttpStatus = 200,
CreatedAtUtc = createdAt,
UpdatedAtUtc = updatedAt,
TerminalAtUtc = terminalAt,
};
var bytes = original.ToByteArray();
var deserialized = SiteCallOperationalDto.Parser.ParseFrom(bytes);
Assert.Equal(original.TrackedOperationId, deserialized.TrackedOperationId);
Assert.Equal(original.Channel, deserialized.Channel);
Assert.Equal(original.Target, deserialized.Target);
Assert.Equal(original.SourceSite, deserialized.SourceSite);
Assert.Equal(original.Status, deserialized.Status);
Assert.Equal(original.RetryCount, deserialized.RetryCount);
Assert.Equal(original.LastError, deserialized.LastError);
Assert.Equal(original.HttpStatus, deserialized.HttpStatus);
Assert.Equal(original.CreatedAtUtc, deserialized.CreatedAtUtc);
Assert.Equal(original.UpdatedAtUtc, deserialized.UpdatedAtUtc);
Assert.Equal(original.TerminalAtUtc, deserialized.TerminalAtUtc);
}
[Fact]
public void SiteCallOperationalDto_TerminalAt_AbsentWhenNotTerminal()
{
// Lifecycle events prior to the terminal step leave TerminalAtUtc unset;
// the well-known Timestamp wrapper is absent on the wire (null in C#).
var dto = new SiteCallOperationalDto
{
TrackedOperationId = Guid.NewGuid().ToString(),
Channel = "DbOutbound",
Target = "warehouse.dbo.WriteOrder",
SourceSite = "site-brisbane",
Status = "Attempted",
RetryCount = 1,
CreatedAtUtc = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
UpdatedAtUtc = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
};
Assert.Null(dto.TerminalAtUtc);
var bytes = dto.ToByteArray();
var deserialized = SiteCallOperationalDto.Parser.ParseFrom(bytes);
Assert.Null(deserialized.TerminalAtUtc);
}
[Fact]
public void SiteCallOperationalDto_NullableHttpStatus_AbsentByDefault()
{
// Int32Value wrapper-typed http_status — unset round-trips as null,
// matching DB nullable column semantics for non-API cached writes.
var dto = new SiteCallOperationalDto
{
TrackedOperationId = Guid.NewGuid().ToString(),
Channel = "DbOutbound",
Target = "warehouse.dbo.WriteOrder",
SourceSite = "site-brisbane",
Status = "Submitted",
RetryCount = 0,
CreatedAtUtc = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
UpdatedAtUtc = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
};
Assert.Null(dto.HttpStatus);
var bytes = dto.ToByteArray();
var deserialized = SiteCallOperationalDto.Parser.ParseFrom(bytes);
Assert.Null(deserialized.HttpStatus);
}
[Fact]
public void CachedTelemetryPacket_RoundTrip_PreservesNestedEntities()
{
var trackedOpId = Guid.NewGuid().ToString();
var auditDto = NewAuditDto();
auditDto.Target = "ERP.GetOrder";
auditDto.Status = "Attempted";
var operationalDto = new SiteCallOperationalDto
{
TrackedOperationId = trackedOpId,
Channel = "ApiOutbound",
Target = "ERP.GetOrder",
SourceSite = "site-1",
Status = "Attempted",
RetryCount = 2,
HttpStatus = 503,
LastError = "Service unavailable",
CreatedAtUtc = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
UpdatedAtUtc = Timestamp.FromDateTimeOffset(DateTimeOffset.UtcNow),
};
var original = new CachedTelemetryPacket
{
AuditEvent = auditDto,
Operational = operationalDto,
};
var bytes = original.ToByteArray();
var deserialized = CachedTelemetryPacket.Parser.ParseFrom(bytes);
Assert.NotNull(deserialized.AuditEvent);
Assert.Equal(auditDto.EventId, deserialized.AuditEvent.EventId);
Assert.Equal(auditDto.Target, deserialized.AuditEvent.Target);
Assert.Equal(auditDto.Status, deserialized.AuditEvent.Status);
Assert.NotNull(deserialized.Operational);
Assert.Equal(trackedOpId, deserialized.Operational.TrackedOperationId);
Assert.Equal(operationalDto.Channel, deserialized.Operational.Channel);
Assert.Equal(operationalDto.Status, deserialized.Operational.Status);
Assert.Equal(operationalDto.RetryCount, deserialized.Operational.RetryCount);
Assert.Equal(operationalDto.HttpStatus, deserialized.Operational.HttpStatus);
Assert.Equal(operationalDto.LastError, deserialized.Operational.LastError);
}
[Fact]
public void CachedTelemetryBatch_Empty_RoundTrip_Yields_EmptyPackets()
{
var original = new CachedTelemetryBatch();
Assert.Empty(original.Packets);
var bytes = original.ToByteArray();
var deserialized = CachedTelemetryBatch.Parser.ParseFrom(bytes);
Assert.Empty(deserialized.Packets);
}
}