Merge branch 'worktree-agent-ad34cad856c59bbc1' into feat/scripted-alarm-shelve-routing
This commit is contained in:
@@ -316,15 +316,9 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
|
||||
var result = query.QueryResult;
|
||||
var timestamp = DateTime.SpecifyKind(result.StartDateTime, DateTimeKind.Utc);
|
||||
|
||||
object? value;
|
||||
if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
|
||||
value = result.StringValue;
|
||||
else
|
||||
value = result.Value;
|
||||
|
||||
results.Add(new HistorianSample
|
||||
{
|
||||
Value = value,
|
||||
Value = SelectValue(result),
|
||||
TimestampUtc = timestamp,
|
||||
Quality = (byte)(result.OpcQuality & 0xFF),
|
||||
});
|
||||
@@ -379,6 +373,12 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
|
||||
return Task.FromResult(results);
|
||||
}
|
||||
|
||||
// Apply the same bucket cap as the raw-read path so a wide time range with a
|
||||
// small IntervalMs cannot produce an unbounded result set that would overflow
|
||||
// the 16 MiB FrameWriter frame cap and lose the entire reply.
|
||||
var bucketLimit = _config.MaxValuesPerRead;
|
||||
var bucketCount = 0;
|
||||
|
||||
while (query.MoveNext(out error))
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
@@ -392,6 +392,15 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
|
||||
Value = value,
|
||||
TimestampUtc = timestamp,
|
||||
});
|
||||
|
||||
bucketCount++;
|
||||
if (bucketLimit > 0 && bucketCount >= bucketLimit)
|
||||
{
|
||||
Log.Warning(
|
||||
"HistoryRead aggregate ({Aggregate}): {Tag} truncated at {Limit} buckets — widen IntervalMs or reduce time range",
|
||||
aggregateColumn, tagName, bucketLimit);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
query.EndQuery(out _);
|
||||
@@ -453,15 +462,9 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
|
||||
if (query.MoveNext(out error))
|
||||
{
|
||||
var result = query.QueryResult;
|
||||
object? value;
|
||||
if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
|
||||
value = result.StringValue;
|
||||
else
|
||||
value = result.Value;
|
||||
|
||||
results.Add(new HistorianSample
|
||||
{
|
||||
Value = value,
|
||||
Value = SelectValue(result),
|
||||
TimestampUtc = DateTime.SpecifyKind(timestamp, DateTimeKind.Utc),
|
||||
Quality = (byte)(result.OpcQuality & 0xFF),
|
||||
});
|
||||
@@ -574,6 +577,29 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Historian.Wonderware.Backend
|
||||
#pragma warning restore CS0618
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Selects the typed value from a <see cref="HistoryQueryResult"/> row.
|
||||
/// <para>
|
||||
/// <b>SDK limitation:</b> <c>HistoryQueryResult</c> exposes only <c>Value</c>
|
||||
/// (double) and <c>StringValue</c> (string) — there is no tag data-type field on
|
||||
/// the result. The correct approach would be to branch on the tag's declared
|
||||
/// data type, but the bound version of <c>aahClientManaged</c> does not surface
|
||||
/// it per query result. The heuristic below is the best available: prefer
|
||||
/// <c>StringValue</c> only when it is non-empty AND <c>Value</c> is zero,
|
||||
/// because string tags in the Historian SDK always project to <c>Value=0</c>
|
||||
/// while numeric tags may legitimately sample to zero (in which case the SDK
|
||||
/// does not populate <c>StringValue</c>). A numeric tag at exactly zero with a
|
||||
/// non-empty formatted <c>StringValue</c> (e.g. "0.00") would be mis-reported
|
||||
/// as a string; this is a known edge case of the SDK binding.
|
||||
/// </para>
|
||||
/// </summary>
|
||||
private static object? SelectValue(HistoryQueryResult result)
|
||||
{
|
||||
if (!string.IsNullOrEmpty(result.StringValue) && result.Value == 0)
|
||||
return result.StringValue;
|
||||
return result.Value;
|
||||
}
|
||||
|
||||
internal static double? ExtractAggregateValue(AnalogSummaryQueryResult result, string column)
|
||||
{
|
||||
switch (column)
|
||||
|
||||
@@ -153,6 +153,11 @@ public sealed class HistorianFrameHandler : IFrameHandler
|
||||
private async Task HandleWriteAlarmEventsAsync(byte[] body, FrameWriter writer, CancellationToken ct)
|
||||
{
|
||||
var req = MessagePackSerializer.Deserialize<WriteAlarmEventsRequest>(body);
|
||||
|
||||
// MessagePack deserializes an absent or explicit-nil array as null, not Array.Empty.
|
||||
// Normalise here so every path below can safely dereference .Length without an NRE.
|
||||
req.Events ??= Array.Empty<AlarmHistorianEventDto>();
|
||||
|
||||
var reply = new WriteAlarmEventsReply { CorrelationId = req.CorrelationId };
|
||||
|
||||
if (_alarmWriter is null)
|
||||
|
||||
@@ -113,17 +113,62 @@ public sealed class PipeServer : IDisposable
|
||||
}
|
||||
}
|
||||
|
||||
// Backoff sequence for consecutive RunOneConnection failures: 250 ms → 500 ms →
|
||||
// 1 000 ms → 2 000 ms → 4 000 ms → capped at 8 000 ms thereafter.
|
||||
private static readonly TimeSpan[] BackoffSteps =
|
||||
{
|
||||
TimeSpan.FromMilliseconds(250),
|
||||
TimeSpan.FromMilliseconds(500),
|
||||
TimeSpan.FromSeconds(1),
|
||||
TimeSpan.FromSeconds(2),
|
||||
TimeSpan.FromSeconds(4),
|
||||
TimeSpan.FromSeconds(8),
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Maximum consecutive failures before the server gives up and lets the process exit
|
||||
/// so the supervisor (NSSM / SCM) can restart the sidecar cleanly.
|
||||
/// </summary>
|
||||
private const int MaxConsecutiveFailures = 20;
|
||||
|
||||
/// <summary>
|
||||
/// Runs the server continuously, handling one connection at a time. When a connection
|
||||
/// ends (clean or error), accepts the next.
|
||||
/// ends (clean or error), waits with exponential backoff before accepting the next.
|
||||
/// If <see cref="MaxConsecutiveFailures"/> consecutive failures occur the method
|
||||
/// throws so the supervisor can restart the sidecar.
|
||||
/// </summary>
|
||||
public async Task RunAsync(IFrameHandler handler, CancellationToken ct)
|
||||
{
|
||||
var consecutiveFailures = 0;
|
||||
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
try { await RunOneConnectionAsync(handler, ct).ConfigureAwait(false); }
|
||||
try
|
||||
{
|
||||
await RunOneConnectionAsync(handler, ct).ConfigureAwait(false);
|
||||
consecutiveFailures = 0; // a clean connection (even a short-lived one) resets the counter
|
||||
}
|
||||
catch (OperationCanceledException) { break; }
|
||||
catch (Exception ex) { _logger.Error(ex, "Sidecar IPC connection loop error — accepting next"); }
|
||||
catch (Exception ex)
|
||||
{
|
||||
consecutiveFailures++;
|
||||
|
||||
if (consecutiveFailures >= MaxConsecutiveFailures)
|
||||
{
|
||||
_logger.Fatal(ex,
|
||||
"Sidecar IPC connection loop failed {Count} consecutive times — giving up so supervisor can restart",
|
||||
consecutiveFailures);
|
||||
throw;
|
||||
}
|
||||
|
||||
var delay = BackoffSteps[Math.Min(consecutiveFailures - 1, BackoffSteps.Length - 1)];
|
||||
_logger.Error(ex,
|
||||
"Sidecar IPC connection loop error (consecutive failure {Count}/{Max}) — retrying in {Delay}",
|
||||
consecutiveFailures, MaxConsecutiveFailures, delay);
|
||||
|
||||
try { await Task.Delay(delay, ct).ConfigureAwait(false); }
|
||||
catch (OperationCanceledException) { break; }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user