diff --git a/code-reviews/Worker/findings.md b/code-reviews/Worker/findings.md index f740510..5f8077e 100644 --- a/code-reviews/Worker/findings.md +++ b/code-reviews/Worker/findings.md @@ -7,7 +7,7 @@ | Review date | 2026-05-18 | | Commit reviewed | `6c64030` | | Status | Reviewed | -| Open findings | 7 | +| Open findings | 0 | ## Checklist coverage @@ -157,13 +157,13 @@ | Severity | Low | | Category | Performance & resource management | | Location | `src/MxGateway.Worker/Ipc/WorkerFrameReader.cs:31,49`, `src/MxGateway.Worker/Ipc/WorkerFrameWriter.cs:57-58` | -| Status | Open | +| Status | Resolved | **Description:** Every frame read allocates a fresh 4-byte length buffer and a payload `byte[]`; every write allocates `ToByteArray()` plus a 4-byte prefix. On the hot event-drain path (batches of up to 128 `WorkerEvent` frames every 25 ms) this produces steady gen-0 garbage. `WorkerFrameWriter` also effectively serializes twice (`CalculateSize()` then `ToByteArray()`). **Recommendation:** Reuse a pooled buffer / `ArrayPool` for the length prefix and payload, and write directly into a pooled buffer using `CodedOutputStream`. Low priority unless event throughput is high. -**Resolution:** _(open)_ +**Resolution:** 2026-05-18 — `WorkerFrameWriter.WriteAsync` now serializes the envelope exactly once into a single frame buffer that carries the 4-byte length prefix followed by the payload, via `envelope.WriteTo(new Span(frame, sizeof(uint), payloadLength))`. This eliminates the redundant second serialization pass (`ToByteArray()` re-runs `CalculateSize()` internally), the separate length-prefix array, and the separate prefix `WriteAsync`/extra `FlushAsync` round. `WorkerFrameReader.ReadAsync` now rents its payload buffer from `ArrayPool.Shared` and returns it in a `finally` once `WorkerEnvelope.Parser.ParseFrom(payload, 0, length)` has copied what it needs; `ReadExactlyOrThrowAsync` gained an explicit `count` parameter so it honours the logical frame length rather than the (possibly larger) rented buffer length. The 4-byte length-prefix buffer is left as a per-call stack-sized allocation — pooling a 4-byte array is not worthwhile. Verified by the new regression test `WorkerFrameProtocolTests.ReadAsync_WithVaryingFrameSizes_ParsesEachFrameExactly`, which reads a large frame followed by a small frame through one reader to prove the pooled buffer is sliced to each frame's own length and never leaks stale trailing bytes; the existing round-trip, malformed-payload, and concurrent-write tests continue to pass. ### Worker-010 @@ -172,13 +172,13 @@ | Severity | Low | | Category | Correctness & logic bugs | | Location | `src/MxGateway.Worker/Conversion/VariantConverter.cs:204-226` | -| Status | Open | +| Status | Resolved | **Description:** `ConvertInt64Scalar` is reached for `TypeCode.UInt32` and `TypeCode.Int64`. For a `uint` with `expectedDataType == MxDataType.Time`, the value is treated as a Windows `FILETIME` via `DateTime.FromFileTimeUtc(longValue)`; a 32-bit FILETIME is never a valid full FILETIME, so this silently produces a near-epoch timestamp rather than a raw/diagnostic value. Unlikely in practice but a silent misconversion. **Recommendation:** Only apply the `MxDataType.Time` FILETIME projection for 64-bit source types; for `uint` fall through to integer or raw. -**Resolution:** _(open)_ +**Resolution:** 2026-05-18 — `ConvertInt64Scalar`'s `MxDataType.Time` FILETIME projection is now gated on `value is long`. A genuine 64-bit `long` still projects to a `Timestamp` via `DateTime.FromFileTimeUtc`; a 32-bit `uint` — which can only hold the low half of a FILETIME — now falls through to the integer projection (`DataType = Integer`, `Int64Value`) instead of silently producing a bogus near-1601 timestamp. Verified by the regression test `VariantConverterTests.Convert_WithUInt32AndExpectedTime_DoesNotProjectFileTime`; the existing `Convert_WithFileTimeAndExpectedTime_ProjectsTimestamp` (a `long` FILETIME) continues to pass, confirming the 64-bit path is unchanged. ### Worker-011 @@ -187,13 +187,13 @@ | Severity | Low | | Category | Correctness & logic bugs | | Location | `src/MxGateway.Worker/Ipc/WorkerPipeClient.cs:169-171` | -| Status | Open | +| Status | Resolved | **Description:** `retryAttempts` is computed as `(connectTimeout / min(connectTimeout, attemptTimeout)) - 1`. With defaults (30000 / 2000) this yields 14 retries, but each retry also incurs Polly exponential backoff. The overall `connectDeadline` (`CancelAfter(connectTimeout)`) is the real bound, so the computed attempt count can be larger or smaller than the time budget allows, and the formula is opaque. **Recommendation:** Drive retries purely off the `connectDeadline` token (Polly stops when cancelled) and drop the fragile attempt-count arithmetic, or add a comment explaining the intent. -**Resolution:** _(open)_ +**Resolution:** 2026-05-18 — The opaque `retryAttempts` arithmetic in `ConnectWithRetryAsync` was removed. `MaxRetryAttempts` is now `int.MaxValue`, so the retry loop is bounded solely by the `connectDeadline` linked token (`CancelAfter(_connectTimeoutMilliseconds)`): Polly stops retrying the moment that token is cancelled, making the overall connect timeout the single source of truth and correctly accounting for the exponential backoff between attempts (which the old formula ignored). A comment documents the intent. No new test was added — the change does not alter observable behavior (the deadline was always the real bound; the old formula always permitted more attempts than fit the budget), and the existing `WorkerPipeClientTests.RunAsync_RetriesUntilPipeServerAppears` (server appears mid-retry) and `RunAsync_WhenPipeNeverAppears_ThrowsTimeoutException` (deadline ends the loop) already cover both retry-until-success and deadline-bounded termination. ### Worker-012 @@ -202,13 +202,15 @@ | Severity | Low | | Category | Documentation & comments | | Location | `src/MxGateway.Worker/MxAccess/MxAccessAlarmEventSink.cs:44-55`, `src/MxGateway.Worker/MxAccess/WnWrapAlarmConsumer.cs:38-43`, `src/MxGateway.Worker/MxAccess/MxAccessEventMapper.cs:106-112` | -| Status | Open | +| Status | Resolved | **Description:** Multiple comments describe the alarm path as not-yet-wired future work ("PR A.2 — COM-side subscription scaffold … the worker advertises no alarm subscription", "the worker bootstrap will gain a thin 'run-on-STA' wrapper as part of A.3"). As of commit 6c64030 the alarm command handler, STA poll loop, and `SubscribeAlarms`/`AcknowledgeAlarm`/`QueryActiveAlarms` are all wired. These comments are stale and misleading. **Recommendation:** Update the XML docs/comments to describe the shipped behavior; remove the "future PR" framing. -**Resolution:** _(open)_ +**Re-triage:** The `WnWrapAlarmConsumer.cs:38-43` citation is inaccurate — those lines were rewritten by Worker-001 and already describe the shipped no-internal-timer threading model correctly; nothing stale there. Conversely, two stale comments the finding did *not* cite were found on the same alarm path and fixed under the same root cause: `AlarmDispatcher.cs`'s `` still framed the dispatcher as "the in-process slice of A.3" with a "companion follow-up PR" adding the (now-shipped) `SubscribeAlarmsCommand`/`AcknowledgeAlarmCommand`/`QueryActiveAlarmsCommand`, and stated the consumer "polls on a `System.Threading.Timer` thread today" — a claim made false by Worker-001's removal of that timer; and `AlarmCommandHandler.cs`'s `` likewise asserted "the wnwrap consumer's polling timer fires on a thread-pool thread". The discovery document `docs/AlarmClientDiscovery.md` (referenced by the source comments) was deliberately left untouched: it is a historical research log of the investigation that chose the shipped design, not API/contract/lifecycle prose, and the source comments cite only its still-accurate "Option A — captured" payload schema. + +**Resolution:** 2026-05-18 — Rewrote the stale alarm-path comments to describe shipped behavior with no "future PR / A.2 / A.3" framing. `MxAccessAlarmEventSink`: the class `` and the `Attach` comment now explain that `AlarmDispatcher` owns the consumer→sink→queue wire-up and that `Attach` carries only the session id (no COM-event subscription is needed because the polled wnwrap consumer raises transition events itself). `MxAccessEventMapper.CreateOnAlarmTransition`'s XML summary now states the worker drives it from `MxAccessAlarmEventSink.EnqueueTransition` once `AlarmDispatcher` decodes a wnwrap transition. `AlarmDispatcher` and `AlarmCommandHandler` `` were corrected to describe the shipped command surface and the no-internal-timer / STA-driven polling model (the `System.Threading.Timer` claims were factually wrong post-Worker-001). Pure documentation change — no behavior altered, no test needed; the build stays green. ### Worker-013 @@ -217,13 +219,15 @@ | Severity | Low | | Category | Testing coverage | | Location | `src/MxGateway.Worker/Sta/StaMessagePump.cs` | -| Status | Open | +| Status | Resolved | **Description:** `StaMessagePump` — the heart of COM event delivery (`MsgWaitForMultipleObjectsEx` + `PeekMessage`/`DispatchMessage`) — has no direct unit tests. `StaRuntimeTests` exercises it indirectly for command wake-up but never verifies that a posted Windows message actually wakes the wait and is dispatched, nor that `PumpPendingMessages` returns a correct count. The alarm poll-loop lifecycle in `MxAccessStaSession` (start/cancel/await on shutdown) also has no test. These are the most failure-sensitive paths in the module. **Recommendation:** Add tests that post a message to the STA thread and assert it is pumped, and tests covering alarm poll-loop start/stop and shutdown ordering. -**Resolution:** _(open)_ +**Re-triage:** This finding is stale as of the reviewed branch — the coverage it asks for already exists. `src/MxGateway.Worker.Tests/Sta/StaMessagePumpTests.cs` contains direct `StaMessagePump` tests covering null-argument validation, waking on a signalled event, returning on timeout, the zero-timeout conversion branch, `PumpPendingMessages` returning the correct count for messages posted to the STA thread (`PumpPendingMessages_MessagesPostedToStaThread_ReturnsCountProcessed`, `PumpPendingMessages_NoMessagesPosted_ReturnsZero`), and `WaitForWorkOrMessages` waking on a posted Windows message (`WaitForWorkOrMessages_WindowsMessagePosted_ReturnsForInputAvailable`) — exactly the "post a message and assert it is pumped" test the recommendation asks for. The alarm poll-loop lifecycle is covered by `MxAccessStaSessionTests.StartAsync_WithAlarmCommandHandlerFactory_PollOnceCalledViaSta` (start → poll runs on the STA) and `Dispose_StopsAlarmPollLoop` (Dispose joins the poll task; no further polls). The finding was raised against a stale view of the test project; no source or test change is required. Re-triaged as already resolved rather than fixed. + +**Resolution:** 2026-05-18 — No code change. Re-triaged: the requested direct `StaMessagePump` tests (including posted-message dispatch and pump count) and the alarm poll-loop start/stop lifecycle tests already exist in `StaMessagePumpTests.cs` and `MxAccessStaSessionTests.cs`. See the re-triage note above for the specific test names. ### Worker-014 @@ -232,13 +236,13 @@ | Severity | Low | | Category | Code organization & conventions | | Location | `src/MxGateway.Worker/MxAccess/AlarmCommandHandler.cs:33`, `:202` | -| Status | Open | +| Status | Resolved | **Description:** The file declares two public types — the `AlarmCommandHandler` class and the `IAlarmCommandHandler` interface. The C# style guide and the rest of the module follow one-public-type-per-file (e.g. interfaces in their own `I*.cs` files like `IMxAccessAlarmConsumer.cs`). **Recommendation:** Move `IAlarmCommandHandler` to its own `IAlarmCommandHandler.cs` for consistency. -**Resolution:** _(open)_ +**Resolution:** 2026-05-18 — The `IAlarmCommandHandler` interface (with its XML docs) was moved verbatim out of `AlarmCommandHandler.cs` into a new `src/MxGateway.Worker/MxAccess/IAlarmCommandHandler.cs`, with its own `using` directives (`System`, `System.Collections.Generic`, `MxGateway.Contracts.Proto`). `AlarmCommandHandler.cs` now declares one public type, matching the module's one-public-type-per-file convention (cf. `IMxAccessAlarmConsumer.cs`). Pure file-organization change — no API surface, behavior, or namespace changed; no test needed. The worker build is clean with zero warnings (no unused usings left behind in `AlarmCommandHandler.cs`). ### Worker-015 @@ -247,10 +251,10 @@ | Severity | Low | | Category | Correctness & logic bugs | | Location | `src/MxGateway.Worker/MxAccess/MxAccessEventQueue.cs:115-145` | -| Status | Open | +| Status | Resolved | **Description:** On overflow, `Enqueue` records the overflow fault and throws `MxAccessEventQueueOverflowException`; `MxAccessBaseEventSink.EnqueueEvent` catches it and calls `RecordFault` again. `RecordFault` is a no-op when a fault already exists, so the second call is harmless — but the intent is muddled, and there is no test asserting the dropped-event behavior. This is acceptable per the fail-fast design but undocumented at the call site. **Recommendation:** Add a brief comment in `EnqueueEvent` clarifying that an overflow exception is expected and already self-records its fault, so the catch is intentionally a near no-op. -**Resolution:** _(open)_ +**Resolution:** 2026-05-18 — Added a comment in `MxAccessBaseEventSink.EnqueueEvent`'s catch block (per the finding's recommendation) explaining that two distinct fail-fast failures land there: a conversion failure from `createEvent()` (recorded here as an `MxaccessEventConversionFailed` fault) and an `MxAccessEventQueueOverflowException` from `Enqueue` at capacity, which — per the fail-fast backpressure design in `docs/DesignDecisions.md` — drops the event and has *already* self-recorded a `QueueOverflow` fault inside `Enqueue`. Because `MxAccessEventQueue.RecordFault` keeps only the first fault, the catch's `RecordFault` call is then a deliberate near no-op rather than a second, conflicting fault. Pure comment change as recommended — no behavior altered. `docs/DesignDecisions.md` already documents the fail-fast event backpressure rule, so no doc change was required. diff --git a/src/MxGateway.Worker.Tests/Conversion/VariantConverterTests.cs b/src/MxGateway.Worker.Tests/Conversion/VariantConverterTests.cs index 865548e..49cd391 100644 --- a/src/MxGateway.Worker.Tests/Conversion/VariantConverterTests.cs +++ b/src/MxGateway.Worker.Tests/Conversion/VariantConverterTests.cs @@ -60,6 +60,26 @@ public sealed class VariantConverterTests Assert.Equal("VT_I8", converted.VariantType); } + /// + /// Worker-010 regression: a 32-bit with an expected + /// data type of must not be projected as a + /// Windows FILETIME. A uint can only hold the low 32 bits of a FILETIME, + /// which would silently render as a near-1601 timestamp; the converter + /// must fall through to an integer projection instead. + /// + [Fact] + public void Convert_WithUInt32AndExpectedTime_DoesNotProjectFileTime() + { + const uint value = 123456789u; + + MxValue converted = _converter.Convert(value, MxDataType.Time); + + Assert.Equal(MxDataType.Integer, converted.DataType); + Assert.Equal(MxValue.KindOneofCase.Int64Value, converted.KindCase); + Assert.Equal(value, converted.Int64Value); + Assert.Equal("VT_UI4", converted.VariantType); + } + /// Verifies that null-like values preserve their null semantics and variant type. /// Null-like value to convert. /// Expected variant type string. diff --git a/src/MxGateway.Worker.Tests/Ipc/WorkerFrameProtocolTests.cs b/src/MxGateway.Worker.Tests/Ipc/WorkerFrameProtocolTests.cs index dbd81c5..0ab1f55 100644 --- a/src/MxGateway.Worker.Tests/Ipc/WorkerFrameProtocolTests.cs +++ b/src/MxGateway.Worker.Tests/Ipc/WorkerFrameProtocolTests.cs @@ -118,6 +118,39 @@ public sealed class WorkerFrameProtocolTests Assert.Equal(new ulong[] { 1, 2, 3 }, new[] { first.Sequence, second.Sequence, third.Sequence }.OrderBy(sequence => sequence)); } + /// + /// Worker-009 regression: the reader rents its payload buffer from a + /// shared pool, so a rented buffer can be larger than the current frame + /// and may carry bytes from a previous, larger frame. Reading frames of + /// differing sizes back-to-back through one reader must parse each frame + /// using only its own payload length, never trailing pooled bytes. + /// + [Fact] + public async Task ReadAsync_WithVaryingFrameSizes_ParsesEachFrameExactly() + { + WorkerFrameProtocolOptions options = CreateOptions(); + using MemoryStream stream = new(); + WorkerFrameWriter writer = new(stream, options); + + // A large-payload frame followed by a small-payload frame: if the + // reader reused a pooled buffer without honouring the second frame's + // length, the small frame would parse with stale trailing bytes. + WorkerEnvelope large = CreateGatewayHelloEnvelope(sequence: 1); + large.GatewayHello.GatewayVersion = new string('x', 4096); + WorkerEnvelope small = CreateGatewayHelloEnvelope(sequence: 2); + + await writer.WriteAsync(large); + await writer.WriteAsync(small); + stream.Position = 0; + + WorkerFrameReader reader = new(stream, options); + WorkerEnvelope firstParsed = await reader.ReadAsync(); + WorkerEnvelope secondParsed = await reader.ReadAsync(); + + Assert.Equal(large, firstParsed); + Assert.Equal(small, secondParsed); + } + private static WorkerFrameProtocolOptions CreateOptions() { return new WorkerFrameProtocolOptions( diff --git a/src/MxGateway.Worker/Conversion/VariantConverter.cs b/src/MxGateway.Worker/Conversion/VariantConverter.cs index 4f55ee1..c368769 100644 --- a/src/MxGateway.Worker/Conversion/VariantConverter.cs +++ b/src/MxGateway.Worker/Conversion/VariantConverter.cs @@ -207,7 +207,14 @@ public sealed class VariantConverter MxDataType expectedDataType) { long longValue = System.Convert.ToInt64(value, CultureInfo.InvariantCulture); - if (expectedDataType == MxDataType.Time) + + // The MxDataType.Time projection treats the source as a Windows FILETIME + // (a 64-bit 100-ns tick count since 1601). Only a genuine 64-bit source + // (long) can carry a valid full FILETIME; a uint can only hold the low + // 32 bits, which DateTime.FromFileTimeUtc would silently render as a + // near-1601 timestamp. For uint sources fall through to the integer + // projection rather than producing a bogus timestamp. + if (expectedDataType == MxDataType.Time && value is long) { return new MxValue { diff --git a/src/MxGateway.Worker/Ipc/WorkerFrameReader.cs b/src/MxGateway.Worker/Ipc/WorkerFrameReader.cs index 6324ad6..591234c 100644 --- a/src/MxGateway.Worker/Ipc/WorkerFrameReader.cs +++ b/src/MxGateway.Worker/Ipc/WorkerFrameReader.cs @@ -1,4 +1,5 @@ using System; +using System.Buffers; using System.IO; using System.Threading; using System.Threading.Tasks; @@ -29,7 +30,7 @@ public sealed class WorkerFrameReader public async Task ReadAsync(CancellationToken cancellationToken = default) { byte[] lengthPrefix = new byte[sizeof(uint)]; - await ReadExactlyOrThrowAsync(lengthPrefix, cancellationToken).ConfigureAwait(false); + await ReadExactlyOrThrowAsync(lengthPrefix, lengthPrefix.Length, cancellationToken).ConfigureAwait(false); uint payloadLength = ReadUInt32LittleEndian(lengthPrefix); if (payloadLength == 0) @@ -46,20 +47,32 @@ public sealed class WorkerFrameReader $"Worker frame payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes."); } - byte[] payload = new byte[payloadLength]; - await ReadExactlyOrThrowAsync(payload, cancellationToken).ConfigureAwait(false); - + // Rent the payload buffer from the shared pool rather than allocating + // a fresh byte[] per frame. ParseFrom copies whatever it needs into + // the parsed message, so the rented buffer can be returned as soon as + // parsing completes. + int length = checked((int)payloadLength); + byte[] payload = ArrayPool.Shared.Rent(length); WorkerEnvelope envelope; try { - envelope = WorkerEnvelope.Parser.ParseFrom(payload); + await ReadExactlyOrThrowAsync(payload, length, cancellationToken).ConfigureAwait(false); + + try + { + envelope = WorkerEnvelope.Parser.ParseFrom(payload, 0, length); + } + catch (InvalidProtocolBufferException exception) + { + throw new WorkerFrameProtocolException( + WorkerFrameProtocolErrorCode.InvalidEnvelope, + "Worker frame payload is not a valid WorkerEnvelope protobuf message.", + exception); + } } - catch (InvalidProtocolBufferException exception) + finally { - throw new WorkerFrameProtocolException( - WorkerFrameProtocolErrorCode.InvalidEnvelope, - "Worker frame payload is not a valid WorkerEnvelope protobuf message.", - exception); + ArrayPool.Shared.Return(payload); } WorkerEnvelopeValidator.Validate(envelope, _options); @@ -77,13 +90,14 @@ public sealed class WorkerFrameReader private async Task ReadExactlyOrThrowAsync( byte[] buffer, + int count, CancellationToken cancellationToken) { int offset = 0; - while (offset < buffer.Length) + while (offset < count) { int bytesRead = await _stream - .ReadAsync(buffer, offset, buffer.Length - offset, cancellationToken) + .ReadAsync(buffer, offset, count - offset, cancellationToken) .ConfigureAwait(false); if (bytesRead == 0) diff --git a/src/MxGateway.Worker/Ipc/WorkerFrameWriter.cs b/src/MxGateway.Worker/Ipc/WorkerFrameWriter.cs index 3e68106..3513662 100644 --- a/src/MxGateway.Worker/Ipc/WorkerFrameWriter.cs +++ b/src/MxGateway.Worker/Ipc/WorkerFrameWriter.cs @@ -54,15 +54,20 @@ public sealed class WorkerFrameWriter $"Worker envelope payload length {payloadLength} exceeds the configured maximum of {_options.MaxMessageBytes} bytes."); } - byte[] payload = envelope.ToByteArray(); - byte[] lengthPrefix = new byte[sizeof(uint)]; - WriteUInt32LittleEndian(lengthPrefix, (uint)payloadLength); + // Serialize once into a single buffer that carries the 4-byte + // length prefix followed by the payload, then issue one stream write. + // This avoids a second serialization pass (envelope.ToByteArray() + // would re-run CalculateSize internally), a separate prefix array, + // and a separate prefix write. + int frameLength = sizeof(uint) + payloadLength; + byte[] frame = new byte[frameLength]; + WriteUInt32LittleEndian(frame, (uint)payloadLength); + envelope.WriteTo(new Span(frame, sizeof(uint), payloadLength)); await _writeLock.WaitAsync(cancellationToken).ConfigureAwait(false); try { - await _stream.WriteAsync(lengthPrefix, 0, lengthPrefix.Length, cancellationToken).ConfigureAwait(false); - await _stream.WriteAsync(payload, 0, payload.Length, cancellationToken).ConfigureAwait(false); + await _stream.WriteAsync(frame, 0, frameLength, cancellationToken).ConfigureAwait(false); await _stream.FlushAsync(cancellationToken).ConfigureAwait(false); } finally diff --git a/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs b/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs index f37cfaa..fb2817e 100644 --- a/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs +++ b/src/MxGateway.Worker/Ipc/WorkerPipeClient.cs @@ -166,14 +166,17 @@ public sealed class WorkerPipeClient : IWorkerPipeClient string pipeName, CancellationToken cancellationToken) { - int retryAttempts = Math.Max( - 0, - (_connectTimeoutMilliseconds / Math.Min(_connectTimeoutMilliseconds, _connectAttemptTimeoutMilliseconds)) - 1); - + // The real bound on connection attempts is the connectDeadline token + // below (CancelAfter(connectTimeout)): Polly stops retrying as soon as + // that token is cancelled. Driving retries purely off the deadline — + // rather than a fragile attempt-count formula that ignored the + // exponential backoff between attempts — keeps the time budget the + // single source of truth. MaxRetryAttempts is set to its maximum so it + // never ends the retry loop before the deadline does. ResiliencePipeline pipeline = new ResiliencePipelineBuilder() .AddRetry(new RetryStrategyOptions { - MaxRetryAttempts = retryAttempts, + MaxRetryAttempts = int.MaxValue, BackoffType = DelayBackoffType.Exponential, UseJitter = true, Delay = TimeSpan.FromMilliseconds(250), diff --git a/src/MxGateway.Worker/MxAccess/AlarmCommandHandler.cs b/src/MxGateway.Worker/MxAccess/AlarmCommandHandler.cs index e8b3236..3a97839 100644 --- a/src/MxGateway.Worker/MxAccess/AlarmCommandHandler.cs +++ b/src/MxGateway.Worker/MxAccess/AlarmCommandHandler.cs @@ -24,10 +24,12 @@ namespace MxGateway.Worker.MxAccess; /// /// /// Threading: invoked from -/// which runs on the STA. The wnwrap consumer's polling timer -/// fires on a thread-pool thread; the only cross-thread surface -/// is the 's event handler, which -/// hand-offs into the thread-safe . +/// which runs on the STA. The wnwrap consumer owns no internal +/// timer — the worker's STA drives via +/// StaRuntime.InvokeAsync, so the consumer's transition +/// events fire on the same STA. The +/// 's event handler hands transitions +/// into the thread-safe . /// /// public sealed class AlarmCommandHandler : IAlarmCommandHandler @@ -191,58 +193,3 @@ public sealed class AlarmCommandHandler : IAlarmCommandHandler Unsubscribe(); } } - -/// -/// Per-session interface routing the worker's alarm IPC commands — -/// SubscribeAlarmsCommand, AcknowledgeAlarmCommand, -/// QueryActiveAlarmsCommand, UnsubscribeAlarmsCommand — -/// to the underlying . Production binding -/// is ; tests substitute a fake. -/// -public interface IAlarmCommandHandler : IDisposable -{ - /// Begin a subscription against the supplied AVEVA alarm-provider expression. - void Subscribe(string subscription, string sessionId); - - /// Tear down the active subscription. No-op if not subscribed. - void Unsubscribe(); - - /// Acknowledge a single alarm by GUID. Returns AVEVA's native status (0 = success). - int Acknowledge( - Guid alarmGuid, - string comment, - string operatorUser, - string operatorNode, - string operatorDomain, - string operatorFullName); - - /// - /// Acknowledge a single alarm by (name, provider, group) — used when - /// the caller has the human-readable reference but not the GUID. - /// - int AcknowledgeByName( - string alarmName, - string providerName, - string groupName, - string comment, - string operatorUser, - string operatorNode, - string operatorDomain, - string operatorFullName); - - /// - /// Snapshot the currently-active alarm set, optionally scoped to a - /// prefix matched against AlarmFullReference. - /// - IReadOnlyList QueryActive(string? alarmFilterPrefix); - - /// - /// Drives a single poll of the underlying alarm consumer on the - /// caller's thread. This is a no-op when there is no active - /// subscription. In production the caller is the worker's STA - /// (marshalled via StaRuntime.InvokeAsync), which satisfies - /// the ThreadingModel=Apartment requirement of - /// wwAlarmConsumerClass. - /// - void PollOnce(); -} diff --git a/src/MxGateway.Worker/MxAccess/AlarmDispatcher.cs b/src/MxGateway.Worker/MxAccess/AlarmDispatcher.cs index a70272b..cccc1ca 100644 --- a/src/MxGateway.Worker/MxAccess/AlarmDispatcher.cs +++ b/src/MxGateway.Worker/MxAccess/AlarmDispatcher.cs @@ -14,22 +14,20 @@ namespace MxGateway.Worker.MxAccess; /// /// /// -/// This is the in-process slice of A.3 — it proves the -/// consumer→sink→queue pipeline end-to-end without touching the -/// worker's IPC command framing. The companion follow-up PR adds -/// SubscribeAlarmsCommand / AcknowledgeAlarmCommand / -/// QueryActiveAlarmsCommand proto entries plus the gateway- -/// side WorkerAlarmRpcDispatcher that issues them. +/// The dispatcher carries the consumer→sink→queue pipeline. The +/// worker's IPC layer issues SubscribeAlarmsCommand / +/// AcknowledgeAlarmCommand / QueryActiveAlarmsCommand +/// through , which owns one +/// dispatcher per session. /// /// -/// Threading: polls on a -/// thread today; production -/// hosting should marshal the consumer onto the worker's STA via -/// StaRuntime.InvokeAsync. The dispatcher itself is purely -/// a pass-through, so it inherits whatever thread the consumer's -/// event handler fires on. Fan-out into EnqueueTransition -/// uses which is -/// thread-safe. +/// Threading: owns no internal +/// timer — the worker's STA drives polling via +/// StaRuntime.InvokeAsync(() => PollOnce()), so the +/// consumer's AlarmTransitionEmitted event fires on the STA. +/// The dispatcher is purely a pass-through, so it inherits that +/// thread. Fan-out into EnqueueTransition uses the +/// thread-safe . /// /// public sealed class AlarmDispatcher : IDisposable diff --git a/src/MxGateway.Worker/MxAccess/IAlarmCommandHandler.cs b/src/MxGateway.Worker/MxAccess/IAlarmCommandHandler.cs new file mode 100644 index 0000000..4a0718c --- /dev/null +++ b/src/MxGateway.Worker/MxAccess/IAlarmCommandHandler.cs @@ -0,0 +1,60 @@ +using System; +using System.Collections.Generic; +using MxGateway.Contracts.Proto; + +namespace MxGateway.Worker.MxAccess; + +/// +/// Per-session interface routing the worker's alarm IPC commands — +/// SubscribeAlarmsCommand, AcknowledgeAlarmCommand, +/// QueryActiveAlarmsCommand, UnsubscribeAlarmsCommand — +/// to the underlying . Production binding +/// is ; tests substitute a fake. +/// +public interface IAlarmCommandHandler : IDisposable +{ + /// Begin a subscription against the supplied AVEVA alarm-provider expression. + void Subscribe(string subscription, string sessionId); + + /// Tear down the active subscription. No-op if not subscribed. + void Unsubscribe(); + + /// Acknowledge a single alarm by GUID. Returns AVEVA's native status (0 = success). + int Acknowledge( + Guid alarmGuid, + string comment, + string operatorUser, + string operatorNode, + string operatorDomain, + string operatorFullName); + + /// + /// Acknowledge a single alarm by (name, provider, group) — used when + /// the caller has the human-readable reference but not the GUID. + /// + int AcknowledgeByName( + string alarmName, + string providerName, + string groupName, + string comment, + string operatorUser, + string operatorNode, + string operatorDomain, + string operatorFullName); + + /// + /// Snapshot the currently-active alarm set, optionally scoped to a + /// prefix matched against AlarmFullReference. + /// + IReadOnlyList QueryActive(string? alarmFilterPrefix); + + /// + /// Drives a single poll of the underlying alarm consumer on the + /// caller's thread. This is a no-op when there is no active + /// subscription. In production the caller is the worker's STA + /// (marshalled via StaRuntime.InvokeAsync), which satisfies + /// the ThreadingModel=Apartment requirement of + /// wwAlarmConsumerClass. + /// + void PollOnce(); +} diff --git a/src/MxGateway.Worker/MxAccess/MxAccessAlarmEventSink.cs b/src/MxGateway.Worker/MxAccess/MxAccessAlarmEventSink.cs index 59bf61e..10352d5 100644 --- a/src/MxGateway.Worker/MxAccess/MxAccessAlarmEventSink.cs +++ b/src/MxGateway.Worker/MxAccess/MxAccessAlarmEventSink.cs @@ -11,13 +11,15 @@ namespace MxGateway.Worker.MxAccess; /// /// /// -/// The dispatcher subscribes the consumer's +/// owns the wire-up: it constructs the +/// consumer/sink pair, calls to propagate the +/// session id, and subscribes the consumer's /// event -/// to at session attach time. The -/// override here is a stub kept for the data- -/// session shape; the actual wire-up between consumer and sink -/// lives in the A.3 dispatcher (one step up the stack). Captured -/// payload schema and consumer threading discipline are described in +/// so each decoded transition reaches . +/// The method here carries only the session id — +/// the alarm path needs no COM-event subscription of its own because +/// the consumer already polls and raises transition events. The +/// captured payload schema is described in /// docs/AlarmClientDiscovery.md "Option A — captured". /// /// @@ -47,10 +49,10 @@ public sealed class MxAccessAlarmEventSink : IMxAccessEventSink if (mxAccessComObject is null) throw new ArgumentNullException(nameof(mxAccessComObject)); this.sessionId = sessionId ?? string.Empty; - // PR A.2 — COM-side subscription scaffold. The MXAccess Toolkit alarm - // event source is pinned during dev-rig validation. Until then, the - // worker advertises no alarm subscription; data-change behaviour is - // unaffected. + // The alarm path needs no COM-event subscription here: the wnwrap + // consumer is polled by the worker's STA and raises transition events + // that AlarmDispatcher routes into EnqueueTransition. Attach only + // records the session id stamped onto every emitted MxEvent. attached = true; } diff --git a/src/MxGateway.Worker/MxAccess/MxAccessBaseEventSink.cs b/src/MxGateway.Worker/MxAccess/MxAccessBaseEventSink.cs index 317747d..cb8b328 100644 --- a/src/MxGateway.Worker/MxAccess/MxAccessBaseEventSink.cs +++ b/src/MxGateway.Worker/MxAccess/MxAccessBaseEventSink.cs @@ -158,6 +158,16 @@ public sealed class MxAccessBaseEventSink : IMxAccessEventSink } catch (Exception exception) { + // Two distinct failures land here, both intentionally fail-fast: + // - A conversion failure from createEvent() — recorded here as an + // MxaccessEventConversionFailed fault. + // - An MxAccessEventQueueOverflowException from Enqueue when the + // queue is at capacity. Per the fail-fast backpressure design + // (docs/DesignDecisions.md) the event is dropped and the queue + // has *already* self-recorded a QueueOverflow fault. Because + // MxAccessEventQueue.RecordFault keeps only the first fault, + // this catch's RecordFault call is then a deliberate near + // no-op rather than a second, conflicting fault. eventQueue.RecordFault(CreateEventConversionFault(exception)); } } diff --git a/src/MxGateway.Worker/MxAccess/MxAccessEventMapper.cs b/src/MxGateway.Worker/MxAccess/MxAccessEventMapper.cs index d2a94a5..6712020 100644 --- a/src/MxGateway.Worker/MxAccess/MxAccessEventMapper.cs +++ b/src/MxGateway.Worker/MxAccess/MxAccessEventMapper.cs @@ -103,12 +103,11 @@ public sealed class MxAccessEventMapper } /// - /// Creates an OnAlarmTransition event from MXAccess COM alarm-event arguments. - /// PR A.2 — proto-build path is mechanical and unit-testable; the COM-side - /// subscription that calls into this method (registering an - /// IAlarmEventSink against the MXAccess Toolkit's alarm provider) is - /// pinned during dev-rig validation since the exact MXAccess Toolkit version - /// installed on the worker host determines the API shape. + /// Creates an OnAlarmTransition event from MXAccess alarm-event arguments. + /// The worker's alarm path drives this method from + /// once + /// decodes a transition raised by the + /// wnwrap-backed . /// /// Identifier of the session. /// Fully-qualified MxAccess alarm reference (e.g. "Tank01.Level.HiHi").