Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 76d35d1b9f | |||
| 27a8d05b7c |
@@ -0,0 +1,162 @@
|
||||
# Code Review Process
|
||||
|
||||
This document describes how to perform a comprehensive, per-module code review of
|
||||
the `lmxopcua` codebase (the ZB.MOM.WW.OtOpcUa OPC UA server) and how to track
|
||||
findings to resolution.
|
||||
|
||||
A **module** is one buildable project under `src/` (e.g.
|
||||
`src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy`) or one test project under `tests/`
|
||||
(e.g. `tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests`). Each module has its
|
||||
own folder under `code-reviews/` containing a single `findings.md`.
|
||||
|
||||
## 1. Before you start
|
||||
|
||||
1. Pick the module to review. Its folder is `code-reviews/<Module>/`, where
|
||||
`<Module>` is the project name with the `ZB.MOM.WW.OtOpcUa.` prefix stripped:
|
||||
- `src/Server/ZB.MOM.WW.OtOpcUa.Server` is reviewed in `code-reviews/Server/`.
|
||||
- `src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy` → `code-reviews/Driver.Galaxy/`.
|
||||
- `src/Core/ZB.MOM.WW.OtOpcUa.Core.Abstractions` → `code-reviews/Core.Abstractions/`.
|
||||
- `tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests` →
|
||||
`code-reviews/Driver.Galaxy.Tests/`.
|
||||
|
||||
The solution `ZB.MOM.WW.OtOpcUa.slnx` enumerates every project; `src/` is
|
||||
grouped into `Core/`, `Server/`, `Drivers/`, `Client/`, and `Tooling/`.
|
||||
2. Identify the design context for the module:
|
||||
- `CLAUDE.md` — project goal, the data-flow architecture, the contained-name
|
||||
vs tag-name concept, and the **Library Preferences** / build & runtime
|
||||
constraints.
|
||||
- `StyleGuide.md` — repository code-style conventions.
|
||||
- The relevant docs under `docs/` and `docs/v2/` — e.g. `docs/OpcUaServer.md`,
|
||||
`docs/AddressSpace.md`, `docs/ReadWriteOperations.md`, `docs/security.md`,
|
||||
`docs/Redundancy.md`, `docs/ScriptedAlarms.md`, `docs/AlarmTracking.md`,
|
||||
`docs/ServiceHosting.md`, `docs/v2/plan.md`, `docs/v2/acl-design.md`,
|
||||
`docs/v2/driver-specs.md`, `docs/v2/driver-stability.md`, the
|
||||
`docs/v2/Galaxy.*.md` set, and the driver notes under `docs/drivers/`.
|
||||
- The auto-memory index at
|
||||
`~/.claude/projects/.../memory/MEMORY.md` records non-obvious project
|
||||
decisions and is worth a scan before a review.
|
||||
3. Record the exact commit being reviewed: `git rev-parse --short HEAD`. Every
|
||||
review is a snapshot — a finding only means something relative to a known
|
||||
commit.
|
||||
4. Open `code-reviews/<Module>/findings.md` (copy it from
|
||||
[`code-reviews/_template/findings.md`](code-reviews/_template/findings.md) if it
|
||||
does not exist yet) and fill in the header table (reviewer, date, commit SHA,
|
||||
status).
|
||||
|
||||
## 2. Review checklist
|
||||
|
||||
Work through **every** category below for the module. A comprehensive review
|
||||
means the checklist is completed even where it produces no findings — record
|
||||
"No issues found" for a category rather than leaving it ambiguous.
|
||||
|
||||
1. **Correctness & logic bugs** — off-by-one, null handling, incorrect
|
||||
conditionals, misuse of APIs, broken edge cases, wrong data-type mapping.
|
||||
2. **OtOpcUa conventions** — the rules in `CLAUDE.md` and `StyleGuide.md`: Galaxy
|
||||
access flows through the in-process `GalaxyDriver` over gRPC to the separately
|
||||
installed `mxaccessgw` gateway — nothing in this repo loads MXAccess COM
|
||||
directly; browse uses **contained names** and runtime read/write uses
|
||||
**tag names** (`tag_name.AttributeName`); authorization decisions happen in
|
||||
`DriverNodeManager` at the server layer, never in driver-level code — drivers
|
||||
only report `SecurityClassification` as metadata; .NET 10 / AnyCPU; Serilog
|
||||
with a rolling daily file sink; xUnit + Shouldly for unit tests; the .NET
|
||||
generic host with `AddWindowsService` for the Server and Admin hosts; the OPC
|
||||
Foundation UA .NET Standard stack for OPC UA; generated code is not
|
||||
hand-edited.
|
||||
3. **Concurrency & thread safety** — shared mutable state, race conditions,
|
||||
correct use of `async`/`await`, locking, disposal races, background-loop and
|
||||
reconnect-supervisor lifetimes.
|
||||
4. **Error handling & resilience** — exception paths, driver/gateway reconnect
|
||||
handling, transient vs permanent error classification, graceful degradation,
|
||||
correct OPC UA `StatusCode`s, address-space rebuild on redeploy.
|
||||
5. **Security** — OPC UA transport security profiles (`SecurityProfileResolver`),
|
||||
LDAP bind authentication and the group→permission mapping
|
||||
(`LdapUserAuthenticator`), ACL enforcement at the `DriverNodeManager` layer,
|
||||
input validation, SQL injection in the `ConfigDb` / Galaxy Repository queries,
|
||||
certificate handling, and secret handling (no logging of credentials, LDAP
|
||||
service-account passwords, or API keys).
|
||||
6. **Performance & resource management** — `IDisposable` disposal, gRPC channel /
|
||||
stream / session lifetimes, buffering and back-pressure on event pumps,
|
||||
unnecessary allocations on hot paths, N+1 queries.
|
||||
7. **Design-document adherence** — does the code match `CLAUDE.md`, the relevant
|
||||
`docs/` and `docs/v2/` designs? Flag both code that drifts from the design and
|
||||
design docs that are now stale.
|
||||
8. **Code organization & conventions** — namespace hierarchy, project layout, the
|
||||
Options pattern, separation of concerns, the capability-interface seams
|
||||
(`IReadable`, `IWritable`, `ISubscribable`, `IAlarmSource`, etc.).
|
||||
9. **Testing coverage** — are the module's behaviours covered? Unit suites are
|
||||
`*.Tests` (xUnit + Shouldly); integration suites are `*.IntegrationTests` and
|
||||
need their Docker fixture up; DB-backed tests in `*.Configuration.Tests`,
|
||||
`*.Admin.Tests`, and `*.Server.Tests` need the central SQL Server. Note
|
||||
untested critical paths and missing edge-case tests.
|
||||
10. **Documentation & comments** — XML doc accuracy, misleading or stale comments,
|
||||
undocumented non-obvious behaviour.
|
||||
|
||||
## 3. Recording findings
|
||||
|
||||
Add one entry per finding to the `## Findings` section of the module's
|
||||
`findings.md`, using the entry format in
|
||||
[`_template/findings.md`](code-reviews/_template/findings.md).
|
||||
|
||||
- **Finding ID** — `<Module>-NNN`, numbered sequentially within the module and
|
||||
never reused (e.g. `Driver.Galaxy-001`). IDs are permanent even after
|
||||
resolution.
|
||||
- **Severity:**
|
||||
- **Critical** — data loss, security breach, crash/deadlock, or outage.
|
||||
- **High** — incorrect behaviour with significant impact; no safe workaround.
|
||||
- **Medium** — incorrect or risky behaviour with limited impact or a workaround.
|
||||
- **Low** — minor issues, style, maintainability, documentation.
|
||||
- **Category** — one of the 10 checklist categories above.
|
||||
- **Location** — `file:line` (clickable), or a list of locations.
|
||||
- **Description** — what is wrong and why it matters.
|
||||
- **Recommendation** — concrete suggested fix.
|
||||
|
||||
After recording findings, update the module header table (status, open-finding
|
||||
count) and regenerate the base README (step 5).
|
||||
|
||||
## 4. Marking an item resolved
|
||||
|
||||
Findings are **never deleted** — they are an audit trail. To close one, change
|
||||
its **Status** and complete the **Resolution** field:
|
||||
|
||||
- `Open` — newly recorded, not yet addressed.
|
||||
- `In Progress` — a fix is actively being worked on.
|
||||
- `Resolved` — fixed. The Resolution field must state the fixing commit SHA, the
|
||||
date, and a one-line description of the fix.
|
||||
- `Won't Fix` — intentionally not fixed. The Resolution field must justify why.
|
||||
- `Deferred` — valid but postponed. The Resolution field must say what it is
|
||||
waiting on (e.g. a tracked issue or a later milestone).
|
||||
|
||||
`Resolved`, `Won't Fix`, and `Deferred` findings are all considered **closed**.
|
||||
`Open` and `In Progress` are **pending** and appear in the base README's Pending
|
||||
Findings table.
|
||||
|
||||
## 5. Updating the base README
|
||||
|
||||
`code-reviews/README.md` holds the single cross-module view (the Module Status
|
||||
table and the Pending / Closed Findings tables). It is **generated** from the
|
||||
per-module `findings.md` files — do not edit it by hand.
|
||||
|
||||
After any review or status change, regenerate it:
|
||||
|
||||
```
|
||||
python code-reviews/regen-readme.py
|
||||
```
|
||||
|
||||
`regen-readme.py --check` exits non-zero if `README.md` is stale, if a module
|
||||
header's `Open findings` count disagrees with its finding statuses, or if a
|
||||
finding carries an unrecognised Status value. The PowerShell wrapper
|
||||
`scripts/check-code-reviews-readme.ps1` runs that check and is the intended hook
|
||||
for CI or a pre-commit step. `code-reviews/test_regen_readme.py` covers the
|
||||
generator itself (`python code-reviews/test_regen_readme.py`).
|
||||
|
||||
> The repo's installed `python` is the real interpreter; the bare `python3`
|
||||
> alias on this box resolves to the Windows Store stub and fails. Use `python`.
|
||||
|
||||
The per-module `findings.md` files are the source of truth; `README.md` is the
|
||||
aggregated index and must always agree with them — which the script guarantees.
|
||||
|
||||
## 6. Re-reviewing a module
|
||||
|
||||
Re-reviews append to the same `findings.md`. Update the header to the new commit
|
||||
and date, continue the finding numbering from the last used ID, and leave prior
|
||||
findings (including closed ones) in place as history.
|
||||
@@ -0,0 +1,2 @@
|
||||
# regen-readme.py / test_regen_readme.py bytecode cache
|
||||
__pycache__/
|
||||
@@ -0,0 +1,25 @@
|
||||
# Code Reviews
|
||||
|
||||
<!-- GENERATED FILE - do not edit by hand. Regenerate with: python code-reviews/regen-readme.py -->
|
||||
|
||||
Cross-module code review index for the OtOpcUa server codebase (`lmxopcua`). The review process is defined in [../REVIEW-PROCESS.md](../REVIEW-PROCESS.md).
|
||||
|
||||
Each module's `findings.md` is the source of truth; this file is generated from them by `regen-readme.py` and must not be edited by hand.
|
||||
|
||||
## Module status
|
||||
|
||||
| Module | Reviewer | Date | Commit | Status | Open | Total |
|
||||
|---|---|---|---|---|---|---|
|
||||
| _no modules reviewed yet_ | | | | | | |
|
||||
|
||||
## Pending findings
|
||||
|
||||
Findings with status `Open` or `In Progress`, ordered by severity.
|
||||
|
||||
_No pending findings._
|
||||
|
||||
## Closed findings
|
||||
|
||||
Findings with status `Resolved`, `Won't Fix`, or `Deferred`.
|
||||
|
||||
_No closed findings._
|
||||
@@ -0,0 +1,53 @@
|
||||
# Code Review — <Module>
|
||||
|
||||
<!-- Template for a per-module findings file. Copy to code-reviews/<Module>/findings.md.
|
||||
See ../../REVIEW-PROCESS.md for the full process. The base README.md is generated
|
||||
from these files by regen-readme.py — do not edit README.md by hand. -->
|
||||
|
||||
| Field | Value |
|
||||
|---|---|
|
||||
| Module | `src/<area>/ZB.MOM.WW.OtOpcUa.<Module>` |
|
||||
| Reviewer | <name> |
|
||||
| Review date | <YYYY-MM-DD> |
|
||||
| Commit reviewed | `<short-sha>` |
|
||||
| Status | Not started |
|
||||
| Open findings | 0 |
|
||||
|
||||
## Checklist coverage
|
||||
|
||||
A comprehensive review completes every category, recording "No issues found" where
|
||||
a category produced nothing rather than leaving it blank.
|
||||
|
||||
| # | Category | Result |
|
||||
|---|---|---|
|
||||
| 1 | Correctness & logic bugs | _pending_ |
|
||||
| 2 | OtOpcUa conventions | _pending_ |
|
||||
| 3 | Concurrency & thread safety | _pending_ |
|
||||
| 4 | Error handling & resilience | _pending_ |
|
||||
| 5 | Security | _pending_ |
|
||||
| 6 | Performance & resource management | _pending_ |
|
||||
| 7 | Design-document adherence | _pending_ |
|
||||
| 8 | Code organization & conventions | _pending_ |
|
||||
| 9 | Testing coverage | _pending_ |
|
||||
| 10 | Documentation & comments | _pending_ |
|
||||
|
||||
## Findings
|
||||
|
||||
<!-- One ### entry per finding. IDs are <Module>-NNN, sequential within the module,
|
||||
never reused. Findings are never deleted — close them by changing Status and
|
||||
completing Resolution. -->
|
||||
|
||||
### <Module>-001
|
||||
|
||||
| Field | Value |
|
||||
|---|---|
|
||||
| Severity | Critical / High / Medium / Low |
|
||||
| Category | one of the 10 checklist categories |
|
||||
| Location | `path/to/File.cs:NN` |
|
||||
| Status | Open / In Progress / Resolved / Won't Fix / Deferred |
|
||||
|
||||
**Description:** What is wrong and why it matters.
|
||||
|
||||
**Recommendation:** Concrete suggested fix.
|
||||
|
||||
**Resolution:** _(empty until closed; on close, record the fixing commit SHA, the date, and a one-line description of the fix)_
|
||||
@@ -0,0 +1,78 @@
|
||||
# Prompt — resolve open code-review findings
|
||||
|
||||
Reusable orchestration prompt for clearing the `code-reviews/` backlog. Paste it
|
||||
to a fresh agent when you want the remaining findings worked through.
|
||||
|
||||
---
|
||||
|
||||
Resolve all open code-review findings (every severity), following the workflow
|
||||
in `REVIEW-PROCESS.md`.
|
||||
|
||||
## Setup
|
||||
|
||||
- Read `code-reviews/README.md` for the open findings and `REVIEW-PROCESS.md`
|
||||
for the workflow. Group the open findings by module.
|
||||
- A module is one folder under `code-reviews/` — one `src/` project or one
|
||||
`tests/` project, named with the `ZB.MOM.WW.OtOpcUa.` prefix stripped. The
|
||||
module→project mapping is in `REVIEW-PROCESS.md` section 1; the build/test
|
||||
commands are in `CLAUDE.md` ("Build Commands").
|
||||
|
||||
## Dispatch — one general-purpose subagent per module, in batches of ~5 modules
|
||||
|
||||
Each subagent, for every open finding in its assigned module, must:
|
||||
|
||||
- Verify the finding's root cause against the actual source. Do NOT trust the
|
||||
finding text — if it is wrong or misclassified, re-triage it (correct the
|
||||
severity/description in that module's `findings.md`) instead of forcing a fix.
|
||||
- Use real TDD: write the regression test FIRST and run it to confirm it fails,
|
||||
THEN implement the root-cause fix, THEN confirm it passes. (Do not use
|
||||
`git stash` — parallel agents would race on the shared stash stack.)
|
||||
- The regression test belongs in the reviewed project's own test project — a
|
||||
finding in `src/.../ZB.MOM.WW.OtOpcUa.Driver.Galaxy` gets its test in
|
||||
`tests/.../ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests`.
|
||||
- Run that module's build and test suite and confirm it is green:
|
||||
- Build + unit-test the affected project, e.g.
|
||||
`dotnet build src/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy/...` and
|
||||
`dotnet test tests/Drivers/ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests/...`.
|
||||
- A single test: `dotnet test --filter "FullyQualifiedName~MyClass.MyMethod"`.
|
||||
- `*.IntegrationTests` need their Docker fixture up — bring it up with
|
||||
`lmxopcua-fix up <driver> <profile>` (see `CLAUDE.md` "Docker Workflow").
|
||||
DB-backed `*.Configuration.Tests`, `*.Admin.Tests`, and `*.Server.Tests`
|
||||
need the central SQL Server. If a fixture/service is unavailable, document
|
||||
why the suite was skipped rather than reporting it green.
|
||||
- For a change that crosses project boundaries, build each affected project;
|
||||
a whole-solution check is `dotnet build ZB.MOM.WW.OtOpcUa.slnx`.
|
||||
- Update only that module's `code-reviews/<Module>/findings.md`: set each
|
||||
resolved finding's Status to `Resolved` with a Resolution note describing the
|
||||
fix (the orchestrator appends the fixing commit SHA), and update the header
|
||||
"Open findings" count.
|
||||
- CONSTRAINTS: edit only the source and test files needed for the assigned
|
||||
module's findings, plus that module's own `findings.md`. Do NOT edit
|
||||
`code-reviews/README.md`. Do NOT commit. Do NOT touch another module's
|
||||
`findings.md`.
|
||||
- Report a summary: each finding — root-cause confirmation, the fix, test names,
|
||||
and any re-triage.
|
||||
|
||||
Batch so that no two subagents in the same batch write to the same project. In
|
||||
particular do not review a `src/` project and its matching `*.Tests` project in
|
||||
the same batch — a finding in the source project adds its regression test to
|
||||
that test project.
|
||||
|
||||
## After each batch returns (orchestrator does this — keep your own context lean)
|
||||
|
||||
- Build and test every project the batch touched, using the `CLAUDE.md`
|
||||
commands; confirm clean. For a wide change, `dotnet build ZB.MOM.WW.OtOpcUa.slnx`.
|
||||
- Commit per module — one commit per module, message referencing the finding
|
||||
IDs. Record the fixing commit SHA in each finding's Resolution.
|
||||
- Regenerate the index: `python code-reviews/regen-readme.py`, then
|
||||
`python code-reviews/regen-readme.py --check` to confirm it is consistent;
|
||||
stage `code-reviews/README.md`. (Use `python` — the bare `python3` alias on
|
||||
this box resolves to the Windows Store stub and fails.) You may stage
|
||||
`README.md` with each module's commit, or commit it once per batch after the
|
||||
script runs.
|
||||
- Push.
|
||||
|
||||
## Continue
|
||||
|
||||
Continue batch by batch until all findings are Resolved or re-triaged. If a
|
||||
finding needs a design decision, skip it and surface it rather than guessing.
|
||||
@@ -0,0 +1,241 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Regenerate code-reviews/README.md from the per-module findings.md files.
|
||||
|
||||
The per-module findings.md files are the source of truth. This script aggregates
|
||||
them into the single cross-module README.md (module status + pending/closed
|
||||
finding tables).
|
||||
|
||||
Usage:
|
||||
python code-reviews/regen-readme.py # rewrite README.md
|
||||
python code-reviews/regen-readme.py --check # exit 1 if stale or inconsistent
|
||||
|
||||
`--check` fails when README.md is out of date OR when a module's header
|
||||
`Open findings` count disagrees with its finding statuses, or a finding
|
||||
carries an unrecognised Status value.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
ROOT = Path(__file__).resolve().parent
|
||||
README = ROOT / "README.md"
|
||||
|
||||
PENDING_STATUSES = {"Open", "In Progress"}
|
||||
KNOWN_STATUSES = {"Open", "In Progress", "Resolved", "Won't Fix", "Deferred"}
|
||||
SEVERITY_ORDER = {"Critical": 0, "High": 1, "Medium": 2, "Low": 3}
|
||||
|
||||
GENERATED_NOTE = (
|
||||
"<!-- GENERATED FILE - do not edit by hand. "
|
||||
"Regenerate with: python code-reviews/regen-readme.py -->"
|
||||
)
|
||||
|
||||
|
||||
def cell(value: str) -> str:
|
||||
"""Escape a value for safe inclusion in a markdown table cell."""
|
||||
return value.replace("|", "\\|").strip()
|
||||
|
||||
|
||||
def summarize(value: str, limit: int = 240) -> str:
|
||||
"""Trim a long description to a single-cell-friendly summary."""
|
||||
value = value.strip()
|
||||
if len(value) <= limit:
|
||||
return value
|
||||
return value[: limit - 1].rstrip() + "…"
|
||||
|
||||
|
||||
def first_table(text: str) -> dict[str, str]:
|
||||
"""Parse the first contiguous block of '| key | value |' rows into a dict."""
|
||||
rows: dict[str, str] = {}
|
||||
started = False
|
||||
for line in text.splitlines():
|
||||
stripped = line.strip()
|
||||
if stripped.startswith("|"):
|
||||
started = True
|
||||
cells = [c.strip() for c in stripped.strip("|").split("|")]
|
||||
if len(cells) >= 2:
|
||||
key, value = cells[0], cells[1]
|
||||
if key and not set(key) <= {"-", ":"} and key != "Field":
|
||||
rows[key] = value
|
||||
elif started:
|
||||
break
|
||||
return rows
|
||||
|
||||
|
||||
def parse_module(findings_path: Path) -> dict:
|
||||
"""Parse one module's findings.md into its header and finding list."""
|
||||
text = findings_path.read_text(encoding="utf-8")
|
||||
module = findings_path.parent.name
|
||||
parts = re.split(r"^##\s+Findings\s*$", text, maxsplit=1, flags=re.M)
|
||||
header = first_table(parts[0])
|
||||
findings: list[dict] = []
|
||||
if len(parts) > 1:
|
||||
for chunk in re.split(r"^###\s+", parts[1], flags=re.M)[1:]:
|
||||
fid = chunk.splitlines()[0].strip()
|
||||
tbl = first_table(chunk)
|
||||
desc_m = re.search(
|
||||
r"\*\*Description:\*\*\s*(.*?)(?=\n\*\*|\Z)", chunk, re.S
|
||||
)
|
||||
desc = re.sub(r"\s+", " ", desc_m.group(1)).strip() if desc_m else ""
|
||||
findings.append(
|
||||
{
|
||||
"id": fid,
|
||||
"severity": tbl.get("Severity", ""),
|
||||
"category": tbl.get("Category", ""),
|
||||
"location": tbl.get("Location", ""),
|
||||
"status": tbl.get("Status", ""),
|
||||
"description": desc,
|
||||
}
|
||||
)
|
||||
return {"module": module, "header": header, "findings": findings}
|
||||
|
||||
|
||||
def build_readme(modules: list[dict]) -> str:
|
||||
modules = sorted(modules, key=lambda m: m["module"])
|
||||
all_findings = [
|
||||
dict(f, module=m["module"]) for m in modules for f in m["findings"]
|
||||
]
|
||||
pending = [f for f in all_findings if f["status"] in PENDING_STATUSES]
|
||||
closed = [
|
||||
f
|
||||
for f in all_findings
|
||||
if f["status"] and f["status"] not in PENDING_STATUSES
|
||||
]
|
||||
|
||||
def sev_key(f: dict) -> tuple:
|
||||
return (SEVERITY_ORDER.get(f["severity"], 9), f["id"])
|
||||
|
||||
pending.sort(key=sev_key)
|
||||
closed.sort(key=sev_key)
|
||||
|
||||
out: list[str] = [
|
||||
"# Code Reviews",
|
||||
"",
|
||||
GENERATED_NOTE,
|
||||
"",
|
||||
"Cross-module code review index for the OtOpcUa server codebase "
|
||||
"(`lmxopcua`). The review process is defined in "
|
||||
"[../REVIEW-PROCESS.md](../REVIEW-PROCESS.md).",
|
||||
"",
|
||||
"Each module's `findings.md` is the source of truth; this file is generated "
|
||||
"from them by `regen-readme.py` and must not be edited by hand.",
|
||||
"",
|
||||
"## Module status",
|
||||
"",
|
||||
"| Module | Reviewer | Date | Commit | Status | Open | Total |",
|
||||
"|---|---|---|---|---|---|---|",
|
||||
]
|
||||
if not modules:
|
||||
out.append(
|
||||
"| _no modules reviewed yet_ | | | | | | |"
|
||||
)
|
||||
for m in modules:
|
||||
h = m["header"]
|
||||
open_n = sum(
|
||||
1 for f in m["findings"] if f["status"] in PENDING_STATUSES
|
||||
)
|
||||
out.append(
|
||||
f"| [{m['module']}]({m['module']}/findings.md) "
|
||||
f"| {cell(h.get('Reviewer', ''))} "
|
||||
f"| {cell(h.get('Review date', ''))} "
|
||||
f"| {cell(h.get('Commit reviewed', ''))} "
|
||||
f"| {cell(h.get('Status', ''))} "
|
||||
f"| {open_n} | {len(m['findings'])} |"
|
||||
)
|
||||
|
||||
out += ["", "## Pending findings", ""]
|
||||
out.append(
|
||||
"Findings with status `Open` or `In Progress`, ordered by severity."
|
||||
)
|
||||
out.append("")
|
||||
if pending:
|
||||
out.append("| ID | Severity | Category | Location | Description |")
|
||||
out.append("|---|---|---|---|---|")
|
||||
for f in pending:
|
||||
out.append(
|
||||
f"| {cell(f['id'])} | {cell(f['severity'])} "
|
||||
f"| {cell(f['category'])} | {cell(f['location'])} "
|
||||
f"| {cell(summarize(f['description']))} |"
|
||||
)
|
||||
else:
|
||||
out.append("_No pending findings._")
|
||||
|
||||
out += ["", "## Closed findings", ""]
|
||||
out.append("Findings with status `Resolved`, `Won't Fix`, or `Deferred`.")
|
||||
out.append("")
|
||||
if closed:
|
||||
out.append("| ID | Severity | Status | Category | Location |")
|
||||
out.append("|---|---|---|---|---|")
|
||||
for f in closed:
|
||||
out.append(
|
||||
f"| {cell(f['id'])} | {cell(f['severity'])} "
|
||||
f"| {cell(f['status'])} | {cell(f['category'])} "
|
||||
f"| {cell(f['location'])} |"
|
||||
)
|
||||
else:
|
||||
out.append("_No closed findings._")
|
||||
|
||||
return "\n".join(out) + "\n"
|
||||
|
||||
|
||||
def find_inconsistencies(modules: list[dict]) -> list[str]:
|
||||
"""Return human-readable problems in the per-module findings.md files.
|
||||
|
||||
Checks that each module header's `Open findings` count agrees with its
|
||||
finding statuses, and that every finding carries a known Status value.
|
||||
"""
|
||||
issues: list[str] = []
|
||||
for m in modules:
|
||||
open_n = sum(
|
||||
1 for f in m["findings"] if f["status"] in PENDING_STATUSES
|
||||
)
|
||||
declared = m["header"].get("Open findings", "").strip()
|
||||
if declared != str(open_n):
|
||||
issues.append(
|
||||
f"{m['module']}: header 'Open findings' = '{declared}' but "
|
||||
f"{open_n} finding(s) are Open/In Progress"
|
||||
)
|
||||
for f in m["findings"]:
|
||||
if f["status"] not in KNOWN_STATUSES:
|
||||
issues.append(
|
||||
f"{m['module']}: finding {f['id']} has unrecognised "
|
||||
f"Status '{f['status']}'"
|
||||
)
|
||||
return issues
|
||||
|
||||
|
||||
def main(argv: list[str]) -> int:
|
||||
check = "--check" in argv[1:]
|
||||
module_dirs = sorted(
|
||||
d
|
||||
for d in ROOT.iterdir()
|
||||
if d.is_dir() and d.name != "_template" and (d / "findings.md").is_file()
|
||||
)
|
||||
modules = [parse_module(d / "findings.md") for d in module_dirs]
|
||||
content = build_readme(modules)
|
||||
issues = find_inconsistencies(modules)
|
||||
if check:
|
||||
stale = (
|
||||
README.read_text(encoding="utf-8") if README.exists() else ""
|
||||
) != content
|
||||
for issue in issues:
|
||||
print(f"inconsistent: {issue}", file=sys.stderr)
|
||||
if stale:
|
||||
print(
|
||||
"code-reviews/README.md is stale - run regen-readme.py",
|
||||
file=sys.stderr,
|
||||
)
|
||||
if stale or issues:
|
||||
return 1
|
||||
print("code-reviews/README.md is up to date and consistent.")
|
||||
return 0
|
||||
for issue in issues:
|
||||
print(f"warning: {issue}", file=sys.stderr)
|
||||
README.write_text(content, encoding="utf-8", newline="\n")
|
||||
print(f"Wrote {README} ({len(modules)} modules).")
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main(sys.argv))
|
||||
@@ -0,0 +1,165 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for regen-readme.py.
|
||||
|
||||
Dependency-free: run with `python code-reviews/test_regen_readme.py`.
|
||||
Exits 0 if all tests pass, 1 otherwise.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import importlib.util
|
||||
import tempfile
|
||||
import traceback
|
||||
from pathlib import Path
|
||||
|
||||
HERE = Path(__file__).resolve().parent
|
||||
|
||||
# regen-readme.py is not an importable module name (hyphen), so load it by path.
|
||||
_spec = importlib.util.spec_from_file_location("regen_readme", HERE / "regen-readme.py")
|
||||
regen = importlib.util.module_from_spec(_spec)
|
||||
_spec.loader.exec_module(regen)
|
||||
|
||||
FIXTURE = """# Code Review — Demo
|
||||
|
||||
| Field | Value |
|
||||
|---|---|
|
||||
| Module | `src/Demo` |
|
||||
| Reviewer | Tester |
|
||||
| Review date | 2026-05-18 |
|
||||
| Commit reviewed | `abc1234` |
|
||||
| Status | Reviewed |
|
||||
| Open findings | 1 |
|
||||
|
||||
## Findings
|
||||
|
||||
### Demo-001
|
||||
|
||||
| Field | Value |
|
||||
|---|---|
|
||||
| Severity | High |
|
||||
| Category | Security |
|
||||
| Location | `src/Demo/File.cs:10` |
|
||||
| Status | Open |
|
||||
|
||||
**Description:** A first problem that matters.
|
||||
|
||||
**Recommendation:** Fix it.
|
||||
|
||||
**Resolution:** _(open)_
|
||||
|
||||
### Demo-002
|
||||
|
||||
| Field | Value |
|
||||
|---|---|
|
||||
| Severity | Low |
|
||||
| Category | Documentation & comments |
|
||||
| Location | `src/Demo/File.cs:20` |
|
||||
| Status | Resolved |
|
||||
|
||||
**Description:** A second, minor problem.
|
||||
|
||||
**Recommendation:** Tidy it.
|
||||
|
||||
**Resolution:** Fixed in def5678 on 2026-05-18.
|
||||
"""
|
||||
|
||||
|
||||
def _parse_fixture() -> dict:
|
||||
"""Write FIXTURE to a temp Demo/findings.md and parse it."""
|
||||
with tempfile.TemporaryDirectory() as tmp:
|
||||
path = Path(tmp) / "Demo" / "findings.md"
|
||||
path.parent.mkdir()
|
||||
path.write_text(FIXTURE, encoding="utf-8")
|
||||
return regen.parse_module(path)
|
||||
|
||||
|
||||
def test_first_table_skips_separator_and_field_header():
|
||||
table = regen.first_table("| Field | Value |\n|---|---|\n| Severity | High |\n")
|
||||
assert table == {"Severity": "High"}, table
|
||||
|
||||
|
||||
def test_parse_module_header():
|
||||
m = _parse_fixture()
|
||||
assert m["module"] == "Demo", m["module"]
|
||||
assert m["header"]["Reviewer"] == "Tester"
|
||||
assert m["header"]["Status"] == "Reviewed"
|
||||
assert m["header"]["Open findings"] == "1"
|
||||
|
||||
|
||||
def test_parse_module_findings():
|
||||
m = _parse_fixture()
|
||||
assert len(m["findings"]) == 2, len(m["findings"])
|
||||
first = m["findings"][0]
|
||||
assert first["id"] == "Demo-001"
|
||||
assert first["severity"] == "High"
|
||||
assert first["category"] == "Security"
|
||||
assert first["location"] == "`src/Demo/File.cs:10`"
|
||||
assert first["status"] == "Open"
|
||||
assert first["description"] == "A first problem that matters."
|
||||
assert m["findings"][1]["status"] == "Resolved"
|
||||
|
||||
|
||||
def test_build_readme_splits_pending_and_closed():
|
||||
readme = regen.build_readme([_parse_fixture()])
|
||||
assert "## Pending findings" in readme
|
||||
assert "## Closed findings" in readme
|
||||
pending, closed = readme.split("## Closed findings", 1)
|
||||
assert "Demo-001" in pending # Open -> pending
|
||||
assert "Demo-001" not in closed
|
||||
assert "Demo-002" in closed # Resolved -> closed
|
||||
assert "_No pending findings._" not in pending
|
||||
|
||||
|
||||
def test_build_readme_handles_no_modules():
|
||||
readme = regen.build_readme([])
|
||||
assert "no modules reviewed yet" in readme
|
||||
assert "_No pending findings._" in readme
|
||||
assert "_No closed findings._" in readme
|
||||
|
||||
|
||||
def test_find_inconsistencies_clean_fixture():
|
||||
assert regen.find_inconsistencies([_parse_fixture()]) == []
|
||||
|
||||
|
||||
def test_find_inconsistencies_detects_wrong_open_count():
|
||||
m = _parse_fixture()
|
||||
m["header"]["Open findings"] = "7"
|
||||
issues = regen.find_inconsistencies([m])
|
||||
assert len(issues) == 1 and "Open findings" in issues[0], issues
|
||||
|
||||
|
||||
def test_find_inconsistencies_detects_unknown_status():
|
||||
m = _parse_fixture()
|
||||
m["findings"][0]["status"] = "Bogus"
|
||||
issues = regen.find_inconsistencies([m])
|
||||
# Wrong status also shifts the open count, so expect the status issue present.
|
||||
assert any("unrecognised Status" in i for i in issues), issues
|
||||
|
||||
|
||||
def test_summarize_truncates_long_text():
|
||||
long = "x" * 500
|
||||
out = regen.summarize(long)
|
||||
assert len(out) <= 240 and out.endswith("…"), len(out)
|
||||
assert regen.summarize("short") == "short"
|
||||
|
||||
|
||||
def main() -> int:
|
||||
tests = sorted(
|
||||
(name, fn)
|
||||
for name, fn in globals().items()
|
||||
if name.startswith("test_") and callable(fn)
|
||||
)
|
||||
failed = 0
|
||||
for name, fn in tests:
|
||||
try:
|
||||
fn()
|
||||
print(f"PASS {name}")
|
||||
except Exception: # noqa: BLE001 - test runner reports all failures
|
||||
failed += 1
|
||||
print(f"FAIL {name}")
|
||||
traceback.print_exc()
|
||||
print(f"\n{len(tests) - failed}/{len(tests)} passed.")
|
||||
return 1 if failed else 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
@@ -0,0 +1,20 @@
|
||||
# Verifies code-reviews/README.md is regenerated from, and consistent with, the
|
||||
# per-module findings.md files. Intended as a CI / pre-commit gate.
|
||||
#
|
||||
# Exits non-zero when README.md is stale, when a module header's "Open findings"
|
||||
# count disagrees with its finding statuses, or when a finding carries an
|
||||
# unrecognised Status value. See REVIEW-PROCESS.md section 5.
|
||||
|
||||
[CmdletBinding()]
|
||||
param()
|
||||
|
||||
Set-StrictMode -Version Latest
|
||||
$ErrorActionPreference = "Stop"
|
||||
|
||||
$repoRoot = Resolve-Path (Join-Path $PSScriptRoot "..")
|
||||
$script = Join-Path $repoRoot "code-reviews/regen-readme.py"
|
||||
|
||||
# The bare `python3` alias on this platform resolves to the Windows Store stub;
|
||||
# `python` is the real interpreter.
|
||||
& python $script --check
|
||||
exit $LASTEXITCODE
|
||||
@@ -63,14 +63,18 @@ public sealed class GalaxyDriver
|
||||
private EventPump? _eventPump;
|
||||
private readonly Lock _pumpLock = new();
|
||||
|
||||
// PR B.2 — IAlarmSource implementation. Production-side acks route through
|
||||
// GatewayGalaxyAlarmAcknowledger which calls MxGatewayClient.AcknowledgeAlarmAsync
|
||||
// (PR E.2 SDK). Tests inject IGalaxyAlarmAcknowledger via the internal ctor to
|
||||
// exercise the wiring without a running gateway. The alarm event stream is
|
||||
// delivered by EventPump.OnAlarmTransition (PR B.1) — this driver is the
|
||||
// consumer that bridges it onto IAlarmSource.OnAlarmEvent.
|
||||
// IAlarmSource implementation. Production-side acks route through
|
||||
// GatewayGalaxyAlarmAcknowledger which calls the session-less
|
||||
// MxGatewayClient.AcknowledgeAlarmAsync RPC; alarm transitions arrive on the
|
||||
// gateway's session-less StreamAlarms feed via GatewayGalaxyAlarmFeed. Tests inject
|
||||
// IGalaxyAlarmAcknowledger + IGalaxyAlarmFeed via the internal ctor to exercise the
|
||||
// wiring without a running gateway. This driver bridges the feed's OnAlarmTransition
|
||||
// onto IAlarmSource.OnAlarmEvent.
|
||||
private IGalaxyAlarmAcknowledger? _alarmAcknowledger;
|
||||
private IGalaxyAlarmFeed? _alarmFeed;
|
||||
private readonly Lock _alarmHandlersLock = new();
|
||||
private readonly Lock _alarmFeedLock = new();
|
||||
private bool _alarmFeedWired;
|
||||
private readonly HashSet<GalaxyAlarmSubscriptionHandle> _alarmSubscriptions = new();
|
||||
|
||||
// PR 4.W — production runtime owned by InitializeAsync. The driver builds these
|
||||
@@ -118,7 +122,7 @@ public sealed class GalaxyDriver
|
||||
ILogger<GalaxyDriver>? logger = null)
|
||||
: this(driverInstanceId, options,
|
||||
hierarchySource: null, dataReader: null, dataWriter: null, subscriber: null,
|
||||
alarmAcknowledger: null, logger)
|
||||
alarmAcknowledger: null, alarmFeed: null, logger)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -136,6 +140,7 @@ public sealed class GalaxyDriver
|
||||
IGalaxyDataWriter? dataWriter = null,
|
||||
IGalaxySubscriber? subscriber = null,
|
||||
IGalaxyAlarmAcknowledger? alarmAcknowledger = null,
|
||||
IGalaxyAlarmFeed? alarmFeed = null,
|
||||
ILogger<GalaxyDriver>? logger = null)
|
||||
{
|
||||
_driverInstanceId = !string.IsNullOrWhiteSpace(driverInstanceId)
|
||||
@@ -148,6 +153,7 @@ public sealed class GalaxyDriver
|
||||
_dataWriter = dataWriter;
|
||||
_subscriber = subscriber;
|
||||
_alarmAcknowledger = alarmAcknowledger;
|
||||
_alarmFeed = alarmFeed;
|
||||
|
||||
// Forward the aggregator's transitions through IHostConnectivityProbe.
|
||||
_hostStatuses.OnHostStatusChanged += (_, args) => OnHostStatusChanged?.Invoke(this, args);
|
||||
@@ -230,8 +236,12 @@ public sealed class GalaxyDriver
|
||||
_subscriber, _hostStatuses, _logger,
|
||||
bufferedUpdateIntervalMs: _options.MxAccess.PublishingIntervalMs);
|
||||
|
||||
// PR B.2 — wire the alarm acknowledger to the live gateway client.
|
||||
_alarmAcknowledger ??= new GatewayGalaxyAlarmAcknowledger(_ownedMxClient, _ownedMxSession, _logger);
|
||||
// Wire the alarm acknowledger + feed to the live gateway client. Both are
|
||||
// session-less — the gateway serves alarms from an always-on central monitor —
|
||||
// so they hang off the owned MxGatewayClient, not the worker session.
|
||||
_alarmAcknowledger ??= new GatewayGalaxyAlarmAcknowledger(_ownedMxClient, _logger);
|
||||
_alarmFeed ??= new GatewayGalaxyAlarmFeed(
|
||||
_ownedMxClient.StreamAlarmsAsync, _logger, _options.MxAccess.ClientName);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@@ -724,13 +734,34 @@ public sealed class GalaxyDriver
|
||||
channelCapacity: _options.MxAccess.EventPumpChannelCapacity,
|
||||
clientName: _options.MxAccess.ClientName);
|
||||
_eventPump.OnDataChange += OnPumpDataChange;
|
||||
_eventPump.OnAlarmTransition += OnPumpAlarmTransition;
|
||||
_eventPump.Start();
|
||||
return _eventPump;
|
||||
}
|
||||
}
|
||||
|
||||
// ===== IAlarmSource (PR B.2) =====
|
||||
// ===== IAlarmSource =====
|
||||
|
||||
/// <summary>
|
||||
/// Start the gateway alarm feed (idempotent) and wire its transitions onto this
|
||||
/// driver's <see cref="OnAlarmEvent"/> bridge. The feed is session-less — it does
|
||||
/// not depend on a data subscription or the <see cref="EventPump"/>.
|
||||
/// </summary>
|
||||
private void EnsureAlarmFeedStarted()
|
||||
{
|
||||
lock (_alarmFeedLock)
|
||||
{
|
||||
if (_alarmFeed is null)
|
||||
{
|
||||
throw new InvalidOperationException(
|
||||
"GalaxyDriver alarm feed is not wired. InitializeAsync must run (or a feed " +
|
||||
"seam must be injected via the internal ctor) before subscribing to alarms.");
|
||||
}
|
||||
if (_alarmFeedWired) return;
|
||||
_alarmFeed.OnAlarmTransition += OnAlarmFeedTransition;
|
||||
_alarmFeed.Start();
|
||||
_alarmFeedWired = true;
|
||||
}
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
public Task<IAlarmSubscriptionHandle> SubscribeAlarmsAsync(
|
||||
@@ -740,12 +771,11 @@ public sealed class GalaxyDriver
|
||||
ArgumentNullException.ThrowIfNull(sourceNodeIds);
|
||||
|
||||
// The driver doesn't multiplex alarm subscriptions per source-node-id today —
|
||||
// alarm events arrive on the same gateway StreamEvents channel as data-change
|
||||
// events once the gateway emits the new family (PRs A.2 + A.3). The
|
||||
// subscription handle is a sentinel the server uses for symmetric Unsubscribe;
|
||||
// every active handle receives every alarm transition, and the server filters
|
||||
// by source node before raising Part 9 conditions. Same shape AbCip uses.
|
||||
EnsureEventPumpStarted();
|
||||
// every active handle receives every transition off the gateway's session-less
|
||||
// StreamAlarms feed, and the server filters by source node before raising Part 9
|
||||
// conditions. The subscription handle is a sentinel the server uses for
|
||||
// symmetric Unsubscribe. Same shape AbCip uses.
|
||||
EnsureAlarmFeedStarted();
|
||||
var handle = new GalaxyAlarmSubscriptionHandle(Guid.NewGuid().ToString("N"));
|
||||
lock (_alarmHandlersLock)
|
||||
{
|
||||
@@ -809,13 +839,13 @@ public sealed class GalaxyDriver
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Receives <see cref="GalaxyAlarmTransition"/> events from the EventPump and
|
||||
/// reshapes them into <see cref="AlarmEventArgs"/> for OPC UA-side consumers.
|
||||
/// Fires <see cref="OnAlarmEvent"/> only when at least one alarm subscription is
|
||||
/// active so a server that hasn't called <see cref="SubscribeAlarmsAsync"/> yet
|
||||
/// doesn't surface untracked transitions.
|
||||
/// Receives <see cref="GalaxyAlarmTransition"/> events from the gateway alarm
|
||||
/// feed and reshapes them into <see cref="AlarmEventArgs"/> for OPC UA-side
|
||||
/// consumers. Fires <see cref="OnAlarmEvent"/> only when at least one alarm
|
||||
/// subscription is active so a server that hasn't called
|
||||
/// <see cref="SubscribeAlarmsAsync"/> yet doesn't surface untracked transitions.
|
||||
/// </summary>
|
||||
private void OnPumpAlarmTransition(object? sender, GalaxyAlarmTransition transition)
|
||||
private void OnAlarmFeedTransition(object? sender, GalaxyAlarmTransition transition)
|
||||
{
|
||||
GalaxyAlarmSubscriptionHandle? handle;
|
||||
lock (_alarmHandlersLock)
|
||||
@@ -921,6 +951,11 @@ public sealed class GalaxyDriver
|
||||
lock (_pumpLock) { pump = _eventPump; _eventPump = null; }
|
||||
pump?.DisposeAsync().AsTask().GetAwaiter().GetResult();
|
||||
|
||||
IGalaxyAlarmFeed? alarmFeed;
|
||||
lock (_alarmFeedLock) { alarmFeed = _alarmFeed; _alarmFeed = null; }
|
||||
try { alarmFeed?.DisposeAsync().AsTask().GetAwaiter().GetResult(); }
|
||||
catch (Exception ex) { _logger.LogWarning(ex, "Alarm feed dispose failed"); }
|
||||
|
||||
_ownedMxSession?.DisposeAsync().AsTask().GetAwaiter().GetResult();
|
||||
_ownedMxSession = null;
|
||||
|
||||
|
||||
@@ -45,12 +45,6 @@ internal sealed class EventPump : IAsyncDisposable
|
||||
private static readonly Counter<long> EventsDropped =
|
||||
Meter.CreateCounter<long>("galaxy.events.dropped", unit: "{event}",
|
||||
description: "MxEvents dropped because the bounded channel was full (newest-dropped).");
|
||||
private static readonly Counter<long> AlarmTransitionsReceived =
|
||||
Meter.CreateCounter<long>("galaxy.alarm_transitions.received", unit: "{event}",
|
||||
description: "OnAlarmTransition events decoded and forwarded to driver-level handlers.");
|
||||
private static readonly Counter<long> AlarmTransitionsDecodingFailures =
|
||||
Meter.CreateCounter<long>("galaxy.alarm_transitions.decoding_failures", unit: "{event}",
|
||||
description: "OnAlarmTransition events that arrived without a populated body or with an unspecified transition kind.");
|
||||
|
||||
private readonly IGalaxySubscriber _subscriber;
|
||||
private readonly SubscriptionRegistry _registry;
|
||||
@@ -66,15 +60,6 @@ internal sealed class EventPump : IAsyncDisposable
|
||||
|
||||
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||
|
||||
/// <summary>
|
||||
/// Fires for every <see cref="MxEventFamily.OnAlarmTransition"/> event the
|
||||
/// gateway forwards. Decoded into a <see cref="GalaxyAlarmTransition"/> with
|
||||
/// the OPC UA severity bucket already mapped via
|
||||
/// <see cref="MxAccessSeverityMapper"/>. The driver wraps this onto
|
||||
/// <c>IAlarmSource.OnAlarmEvent</c> in PR B.2.
|
||||
/// </summary>
|
||||
internal event EventHandler<GalaxyAlarmTransition>? OnAlarmTransition;
|
||||
|
||||
public EventPump(
|
||||
IGalaxySubscriber subscriber,
|
||||
SubscriptionRegistry registry,
|
||||
@@ -179,13 +164,12 @@ internal sealed class EventPump : IAsyncDisposable
|
||||
case MxEventFamily.OnDataChange:
|
||||
DispatchDataChange(ev);
|
||||
break;
|
||||
case MxEventFamily.OnAlarmTransition:
|
||||
DispatchAlarmTransition(ev);
|
||||
break;
|
||||
default:
|
||||
// OnWriteComplete / OperationComplete / OnBufferedDataChange are filtered
|
||||
// out — write callers get their reply via the InvokeAsync round-trip, not
|
||||
// via the event stream.
|
||||
// OnAlarmTransition is no longer carried on the per-session event stream
|
||||
// — alarms come from the gateway's session-less StreamAlarms feed
|
||||
// (GatewayGalaxyAlarmFeed). OnWriteComplete / OperationComplete /
|
||||
// OnBufferedDataChange are filtered out: write callers get their reply
|
||||
// via the InvokeAsync round-trip, not via the event stream.
|
||||
return;
|
||||
}
|
||||
}
|
||||
@@ -212,73 +196,6 @@ internal sealed class EventPump : IAsyncDisposable
|
||||
}
|
||||
}
|
||||
|
||||
private void DispatchAlarmTransition(MxEvent ev)
|
||||
{
|
||||
// Body absent (e.g. malformed gateway event or worker version skew) — count and
|
||||
// drop. The Part 9 sub-attribute fallback path keeps an alarm functional even
|
||||
// when the rich payload disappears.
|
||||
if (ev.OnAlarmTransition is not { } body)
|
||||
{
|
||||
AlarmTransitionsDecodingFailures.Add(1, _clientTag);
|
||||
_logger.LogDebug(
|
||||
"Galaxy OnAlarmTransition event arrived without a populated body (sequence={Sequence}); ignoring.",
|
||||
ev.WorkerSequence);
|
||||
return;
|
||||
}
|
||||
if (body.TransitionKind == AlarmTransitionKind.Unspecified)
|
||||
{
|
||||
AlarmTransitionsDecodingFailures.Add(1, _clientTag);
|
||||
_logger.LogDebug(
|
||||
"Galaxy OnAlarmTransition for {AlarmRef} has unspecified transition kind; ignoring.",
|
||||
body.AlarmFullReference);
|
||||
return;
|
||||
}
|
||||
|
||||
var (bucket, opcUaSeverity) = MxAccessSeverityMapper.Map(body.Severity);
|
||||
var transitionTimestamp = body.TransitionTimestamp is { } tts
|
||||
? tts.ToDateTime()
|
||||
: DateTime.UtcNow;
|
||||
DateTime? originalRaiseTimestamp = body.OriginalRaiseTimestamp is { } orts
|
||||
? orts.ToDateTime()
|
||||
: null;
|
||||
|
||||
var transition = new GalaxyAlarmTransition(
|
||||
AlarmFullReference: body.AlarmFullReference,
|
||||
SourceObjectReference: body.SourceObjectReference,
|
||||
AlarmTypeName: body.AlarmTypeName,
|
||||
TransitionKind: MapTransitionKind(body.TransitionKind),
|
||||
SeverityBucket: bucket,
|
||||
OpcUaSeverity: opcUaSeverity,
|
||||
RawMxAccessSeverity: body.Severity,
|
||||
OriginalRaiseTimestampUtc: originalRaiseTimestamp,
|
||||
TransitionTimestampUtc: transitionTimestamp,
|
||||
OperatorUser: body.OperatorUser,
|
||||
OperatorComment: body.OperatorComment,
|
||||
Category: body.Category,
|
||||
Description: body.Description);
|
||||
|
||||
AlarmTransitionsReceived.Add(1, _clientTag);
|
||||
try
|
||||
{
|
||||
OnAlarmTransition?.Invoke(this, transition);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex,
|
||||
"Galaxy OnAlarmTransition handler threw for {AlarmRef} — continuing.",
|
||||
transition.AlarmFullReference);
|
||||
}
|
||||
}
|
||||
|
||||
private static GalaxyAlarmTransitionKind MapTransitionKind(AlarmTransitionKind kind) => kind switch
|
||||
{
|
||||
AlarmTransitionKind.Raise => GalaxyAlarmTransitionKind.Raise,
|
||||
AlarmTransitionKind.Acknowledge => GalaxyAlarmTransitionKind.Acknowledge,
|
||||
AlarmTransitionKind.Clear => GalaxyAlarmTransitionKind.Clear,
|
||||
AlarmTransitionKind.Retrigger => GalaxyAlarmTransitionKind.Retrigger,
|
||||
_ => GalaxyAlarmTransitionKind.Unspecified,
|
||||
};
|
||||
|
||||
private DataValueSnapshot ToSnapshot(MxEvent ev)
|
||||
{
|
||||
var value = MxValueDecoder.Decode(ev.Value);
|
||||
|
||||
+28
-24
@@ -5,26 +5,27 @@ using MxGateway.Contracts.Proto;
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
||||
|
||||
/// <summary>
|
||||
/// Production <see cref="IGalaxyAlarmAcknowledger"/> backed by the
|
||||
/// <c>MxGatewayClient.AcknowledgeAlarmAsync</c> RPC (PR E.2). Maps the
|
||||
/// reply's protocol status into a thrown exception when the gateway
|
||||
/// reports a non-OK condition; native MxStatus failures inside the reply
|
||||
/// surface as a logged warning so operator workflows aren't blocked by a
|
||||
/// transient MxAccess hiccup.
|
||||
/// Production <see cref="IGalaxyAlarmAcknowledger"/> backed by the session-less
|
||||
/// <c>MxGatewayClient.AcknowledgeAlarmAsync</c> RPC. The updated gateway routes
|
||||
/// acknowledgement through its always-on central alarm monitor, so no worker
|
||||
/// session is involved — the driver supplies only the alarm reference, comment,
|
||||
/// and operator principal.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// A non-OK <see cref="ProtocolStatus"/> means the gateway never reached MXAccess
|
||||
/// (transport / dispatch failure) and is surfaced as a thrown exception. A non-zero
|
||||
/// native ack return code (<c>hresult</c>) means MXAccess itself rejected the ack;
|
||||
/// that is logged as a warning rather than thrown so a transient MXAccess hiccup
|
||||
/// doesn't block the operator workflow — the operator can retry.
|
||||
/// </remarks>
|
||||
internal sealed class GatewayGalaxyAlarmAcknowledger : IGalaxyAlarmAcknowledger
|
||||
{
|
||||
private readonly MxGatewayClient _client;
|
||||
private readonly GalaxyMxSession _session;
|
||||
private readonly ILogger _logger;
|
||||
|
||||
public GatewayGalaxyAlarmAcknowledger(
|
||||
MxGatewayClient client,
|
||||
GalaxyMxSession session,
|
||||
ILogger logger)
|
||||
public GatewayGalaxyAlarmAcknowledger(MxGatewayClient client, ILogger logger)
|
||||
{
|
||||
_client = client ?? throw new ArgumentNullException(nameof(client));
|
||||
_session = session ?? throw new ArgumentNullException(nameof(session));
|
||||
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
|
||||
}
|
||||
|
||||
@@ -36,15 +37,9 @@ internal sealed class GatewayGalaxyAlarmAcknowledger : IGalaxyAlarmAcknowledger
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrEmpty(alarmFullReference);
|
||||
|
||||
var session = _session.Session
|
||||
?? throw new InvalidOperationException(
|
||||
"GatewayGalaxyAlarmAcknowledger requires a connected GalaxyMxSession; underlying gateway session is null.");
|
||||
var sessionId = session.SessionId;
|
||||
|
||||
var reply = await _client.AcknowledgeAlarmAsync(
|
||||
new AcknowledgeAlarmRequest
|
||||
{
|
||||
SessionId = sessionId,
|
||||
ClientCorrelationId = Guid.NewGuid().ToString("N"),
|
||||
AlarmFullReference = alarmFullReference,
|
||||
Comment = comment ?? string.Empty,
|
||||
@@ -52,14 +47,23 @@ internal sealed class GatewayGalaxyAlarmAcknowledger : IGalaxyAlarmAcknowledger
|
||||
},
|
||||
cancellationToken).ConfigureAwait(false);
|
||||
|
||||
if (reply.Status is { Success: 0 } status)
|
||||
// Protocol status — the gateway failed before MXAccess saw the ack. This is a
|
||||
// hard failure: the operator's request was not delivered at all.
|
||||
if (reply.ProtocolStatus is { } proto && proto.Code != ProtocolStatusCode.Ok)
|
||||
{
|
||||
throw new InvalidOperationException(
|
||||
$"Galaxy AcknowledgeAlarm for '{alarmFullReference}' failed at the gateway: "
|
||||
+ $"{proto.Code} {proto.Message}");
|
||||
}
|
||||
|
||||
// hresult is the authoritative native ack return code (0 = success). It is
|
||||
// absent only on a worker protocol violation; with an OK protocol status a
|
||||
// missing value is treated as success.
|
||||
if (reply.HasHresult && reply.Hresult != 0)
|
||||
{
|
||||
// Native MxAccess rejected the ack — log but don't throw. Treat as a
|
||||
// best-effort operator workflow; the operator can retry via the OPC UA
|
||||
// session if necessary.
|
||||
_logger.LogWarning(
|
||||
"Galaxy AcknowledgeAlarm for {AlarmRef} returned MxStatus failure: category={Category} detail={Detail} text={Text}",
|
||||
alarmFullReference, status.Category, status.Detail, status.DiagnosticText);
|
||||
"Galaxy AcknowledgeAlarm for {AlarmRef} returned native ack failure code {Hresult}.",
|
||||
alarmFullReference, reply.Hresult);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,264 @@
|
||||
using System.Diagnostics.Metrics;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using MxGateway.Contracts.Proto;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
||||
|
||||
/// <summary>
|
||||
/// Production <see cref="IGalaxyAlarmFeed"/> over the gateway's session-less
|
||||
/// <c>StreamAlarms</c> RPC. The stream opens with one <see cref="ActiveAlarmSnapshot"/>
|
||||
/// per currently-active alarm (the ConditionRefresh snapshot), then a
|
||||
/// <c>snapshot_complete</c> sentinel, then a live <see cref="OnAlarmTransitionEvent"/>
|
||||
/// for every subsequent raise / acknowledge / clear. Each message is decoded into a
|
||||
/// <see cref="GalaxyAlarmTransition"/> (severity already bucketed via
|
||||
/// <see cref="MxAccessSeverityMapper"/>) and surfaced on <see cref="OnAlarmTransition"/>.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// The feed is independent of any worker session — the gateway's always-on central
|
||||
/// alarm monitor owns the AVEVA subscription. The driver previously decoded alarm
|
||||
/// transitions off the per-session <c>StreamEvents</c> stream (<see cref="EventPump"/>);
|
||||
/// that path was retired when the gateway moved to the session-less alarm model.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// The stream is supplied as a factory delegate (production passes
|
||||
/// <c>MxGatewayClient.StreamAlarmsAsync</c>) so tests can drive synthetic feeds.
|
||||
/// Streaming RPCs are not covered by the client's unary retry pipeline, so the feed
|
||||
/// owns its reconnect: on any non-cancellation stream fault it logs, waits
|
||||
/// <c>reconnectDelay</c>, and re-opens. The gateway re-sends the active-alarm
|
||||
/// snapshot on every re-open, so the OPC UA condition layer sees current state
|
||||
/// after a reconnect.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
internal sealed class GatewayGalaxyAlarmFeed : IGalaxyAlarmFeed
|
||||
{
|
||||
/// <summary>
|
||||
/// Opens a <c>StreamAlarms</c> feed. Matches the method group
|
||||
/// <c>MxGatewayClient.StreamAlarmsAsync</c>.
|
||||
/// </summary>
|
||||
internal delegate IAsyncEnumerable<AlarmFeedMessage> AlarmStreamFactory(
|
||||
StreamAlarmsRequest request, CancellationToken cancellationToken);
|
||||
|
||||
private static readonly TimeSpan DefaultReconnectDelay = TimeSpan.FromSeconds(5);
|
||||
|
||||
// Shares the driver meter name so a host-level MeterListener catches feed counters
|
||||
// alongside the EventPump's. Distinct Meter instance — same name is intentional.
|
||||
private static readonly Meter Meter = new(EventPump.MeterName);
|
||||
private static readonly Counter<long> AlarmTransitionsReceived =
|
||||
Meter.CreateCounter<long>("galaxy.alarm_feed.transitions.received", unit: "{event}",
|
||||
description: "Alarm feed messages decoded and forwarded to driver-level handlers.");
|
||||
private static readonly Counter<long> AlarmTransitionsDecodingFailures =
|
||||
Meter.CreateCounter<long>("galaxy.alarm_feed.transitions.decoding_failures", unit: "{event}",
|
||||
description: "Alarm feed messages dropped for a missing body or unspecified transition kind.");
|
||||
private static readonly Counter<long> AlarmFeedReconnects =
|
||||
Meter.CreateCounter<long>("galaxy.alarm_feed.reconnects", unit: "{reconnect}",
|
||||
description: "Times the alarm feed re-opened its StreamAlarms stream after a transport fault.");
|
||||
|
||||
private readonly AlarmStreamFactory _streamFactory;
|
||||
private readonly ILogger _logger;
|
||||
private readonly string _alarmFilterPrefix;
|
||||
private readonly TimeSpan _reconnectDelay;
|
||||
private readonly KeyValuePair<string, object?> _clientTag;
|
||||
private readonly CancellationTokenSource _cts = new();
|
||||
|
||||
private Task? _loop;
|
||||
private bool _disposed;
|
||||
|
||||
public event EventHandler<GalaxyAlarmTransition>? OnAlarmTransition;
|
||||
|
||||
public GatewayGalaxyAlarmFeed(
|
||||
AlarmStreamFactory streamFactory,
|
||||
ILogger? logger = null,
|
||||
string? clientName = null,
|
||||
string? alarmFilterPrefix = null,
|
||||
TimeSpan? reconnectDelay = null)
|
||||
{
|
||||
_streamFactory = streamFactory ?? throw new ArgumentNullException(nameof(streamFactory));
|
||||
_logger = logger ?? NullLogger.Instance;
|
||||
_alarmFilterPrefix = alarmFilterPrefix ?? string.Empty;
|
||||
_reconnectDelay = reconnectDelay ?? DefaultReconnectDelay;
|
||||
_clientTag = new KeyValuePair<string, object?>("galaxy.client", clientName ?? "<unknown>");
|
||||
}
|
||||
|
||||
public void Start()
|
||||
{
|
||||
ObjectDisposedException.ThrowIf(_disposed, this);
|
||||
if (_loop is not null) return;
|
||||
_loop = Task.Run(() => RunAsync(_cts.Token));
|
||||
}
|
||||
|
||||
private async Task RunAsync(CancellationToken ct)
|
||||
{
|
||||
var firstAttempt = true;
|
||||
while (!ct.IsCancellationRequested)
|
||||
{
|
||||
if (!firstAttempt)
|
||||
{
|
||||
AlarmFeedReconnects.Add(1, _clientTag);
|
||||
}
|
||||
firstAttempt = false;
|
||||
|
||||
try
|
||||
{
|
||||
var request = new StreamAlarmsRequest
|
||||
{
|
||||
ClientCorrelationId = Guid.NewGuid().ToString("N"),
|
||||
AlarmFilterPrefix = _alarmFilterPrefix,
|
||||
};
|
||||
|
||||
await foreach (var message in _streamFactory(request, ct)
|
||||
.WithCancellation(ct).ConfigureAwait(false))
|
||||
{
|
||||
if (ct.IsCancellationRequested) break;
|
||||
Dispatch(message);
|
||||
}
|
||||
}
|
||||
catch (OperationCanceledException) when (ct.IsCancellationRequested)
|
||||
{
|
||||
return; // clean shutdown
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex,
|
||||
"Galaxy alarm feed stream faulted — reopening in {DelaySeconds}s.",
|
||||
_reconnectDelay.TotalSeconds);
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
await Task.Delay(_reconnectDelay, ct).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException) when (ct.IsCancellationRequested)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void Dispatch(AlarmFeedMessage message)
|
||||
{
|
||||
switch (message.PayloadCase)
|
||||
{
|
||||
case AlarmFeedMessage.PayloadOneofCase.ActiveAlarm:
|
||||
DispatchSnapshotEntry(message.ActiveAlarm);
|
||||
break;
|
||||
case AlarmFeedMessage.PayloadOneofCase.Transition:
|
||||
DispatchTransition(message.Transition);
|
||||
break;
|
||||
case AlarmFeedMessage.PayloadOneofCase.SnapshotComplete:
|
||||
_logger.LogDebug("Galaxy alarm feed active-alarm snapshot complete.");
|
||||
break;
|
||||
default:
|
||||
// Empty oneof — worker / gateway version skew. Count and drop.
|
||||
AlarmTransitionsDecodingFailures.Add(1, _clientTag);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Decode one entry of the initial active-alarm snapshot. Each currently-active
|
||||
/// alarm is surfaced as a transition so the OPC UA Part 9 condition layer sees
|
||||
/// the alarm's present state on (re)connect: an unacknowledged active alarm as
|
||||
/// a <see cref="GalaxyAlarmTransitionKind.Raise"/>, an acknowledged one as a
|
||||
/// <see cref="GalaxyAlarmTransitionKind.Acknowledge"/>.
|
||||
/// </summary>
|
||||
private void DispatchSnapshotEntry(ActiveAlarmSnapshot snapshot)
|
||||
{
|
||||
var kind = snapshot.CurrentState switch
|
||||
{
|
||||
AlarmConditionState.Active => GalaxyAlarmTransitionKind.Raise,
|
||||
AlarmConditionState.ActiveAcked => GalaxyAlarmTransitionKind.Acknowledge,
|
||||
AlarmConditionState.Inactive => GalaxyAlarmTransitionKind.Clear,
|
||||
_ => GalaxyAlarmTransitionKind.Unspecified,
|
||||
};
|
||||
if (kind == GalaxyAlarmTransitionKind.Unspecified)
|
||||
{
|
||||
AlarmTransitionsDecodingFailures.Add(1, _clientTag);
|
||||
_logger.LogDebug(
|
||||
"Galaxy alarm feed snapshot entry for {AlarmRef} has unspecified condition state; ignoring.",
|
||||
snapshot.AlarmFullReference);
|
||||
return;
|
||||
}
|
||||
|
||||
var (bucket, opcUaSeverity) = MxAccessSeverityMapper.Map(snapshot.Severity);
|
||||
Raise(new GalaxyAlarmTransition(
|
||||
AlarmFullReference: snapshot.AlarmFullReference,
|
||||
SourceObjectReference: snapshot.SourceObjectReference,
|
||||
AlarmTypeName: snapshot.AlarmTypeName,
|
||||
TransitionKind: kind,
|
||||
SeverityBucket: bucket,
|
||||
OpcUaSeverity: opcUaSeverity,
|
||||
RawMxAccessSeverity: snapshot.Severity,
|
||||
OriginalRaiseTimestampUtc: snapshot.OriginalRaiseTimestamp?.ToDateTime(),
|
||||
TransitionTimestampUtc: snapshot.LastTransitionTimestamp?.ToDateTime() ?? DateTime.UtcNow,
|
||||
OperatorUser: snapshot.OperatorUser,
|
||||
OperatorComment: snapshot.OperatorComment,
|
||||
Category: snapshot.Category,
|
||||
Description: snapshot.Description));
|
||||
}
|
||||
|
||||
private void DispatchTransition(OnAlarmTransitionEvent body)
|
||||
{
|
||||
if (body.TransitionKind == AlarmTransitionKind.Unspecified)
|
||||
{
|
||||
AlarmTransitionsDecodingFailures.Add(1, _clientTag);
|
||||
_logger.LogDebug(
|
||||
"Galaxy alarm feed transition for {AlarmRef} has unspecified transition kind; ignoring.",
|
||||
body.AlarmFullReference);
|
||||
return;
|
||||
}
|
||||
|
||||
var (bucket, opcUaSeverity) = MxAccessSeverityMapper.Map(body.Severity);
|
||||
Raise(new GalaxyAlarmTransition(
|
||||
AlarmFullReference: body.AlarmFullReference,
|
||||
SourceObjectReference: body.SourceObjectReference,
|
||||
AlarmTypeName: body.AlarmTypeName,
|
||||
TransitionKind: MapTransitionKind(body.TransitionKind),
|
||||
SeverityBucket: bucket,
|
||||
OpcUaSeverity: opcUaSeverity,
|
||||
RawMxAccessSeverity: body.Severity,
|
||||
OriginalRaiseTimestampUtc: body.OriginalRaiseTimestamp?.ToDateTime(),
|
||||
TransitionTimestampUtc: body.TransitionTimestamp?.ToDateTime() ?? DateTime.UtcNow,
|
||||
OperatorUser: body.OperatorUser,
|
||||
OperatorComment: body.OperatorComment,
|
||||
Category: body.Category,
|
||||
Description: body.Description));
|
||||
}
|
||||
|
||||
private void Raise(GalaxyAlarmTransition transition)
|
||||
{
|
||||
AlarmTransitionsReceived.Add(1, _clientTag);
|
||||
try
|
||||
{
|
||||
OnAlarmTransition?.Invoke(this, transition);
|
||||
}
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex,
|
||||
"Galaxy alarm feed OnAlarmTransition handler threw for {AlarmRef} — continuing.",
|
||||
transition.AlarmFullReference);
|
||||
}
|
||||
}
|
||||
|
||||
private static GalaxyAlarmTransitionKind MapTransitionKind(AlarmTransitionKind kind) => kind switch
|
||||
{
|
||||
AlarmTransitionKind.Raise => GalaxyAlarmTransitionKind.Raise,
|
||||
AlarmTransitionKind.Acknowledge => GalaxyAlarmTransitionKind.Acknowledge,
|
||||
AlarmTransitionKind.Clear => GalaxyAlarmTransitionKind.Clear,
|
||||
AlarmTransitionKind.Retrigger => GalaxyAlarmTransitionKind.Retrigger,
|
||||
_ => GalaxyAlarmTransitionKind.Unspecified,
|
||||
};
|
||||
|
||||
public async ValueTask DisposeAsync()
|
||||
{
|
||||
if (_disposed) return;
|
||||
_disposed = true;
|
||||
_cts.Cancel();
|
||||
if (_loop is not null)
|
||||
{
|
||||
try { await _loop.ConfigureAwait(false); } catch { /* shutdown */ }
|
||||
}
|
||||
_cts.Dispose();
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,29 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
||||
|
||||
/// <summary>
|
||||
/// Driver-side seam for the gateway's session-less alarm feed. Production wraps
|
||||
/// <c>MxGatewayClient.StreamAlarmsAsync</c> (<see cref="GatewayGalaxyAlarmFeed"/>);
|
||||
/// tests substitute a fake to drive synthetic <see cref="GalaxyAlarmTransition"/>
|
||||
/// events through <see cref="GalaxyDriver"/>'s <c>IAlarmSource</c> bridge without a
|
||||
/// running gateway.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// The feed is independent of any worker session — the updated gateway serves
|
||||
/// alarms from an always-on central monitor, so the feed survives subscription
|
||||
/// churn and reconnects its own stream on transient transport failures.
|
||||
/// </remarks>
|
||||
internal interface IGalaxyAlarmFeed : IAsyncDisposable
|
||||
{
|
||||
/// <summary>
|
||||
/// Fires for every alarm transition the gateway feed delivers — both the
|
||||
/// entries of the initial active-alarm snapshot and every subsequent live
|
||||
/// raise / acknowledge / clear. The OPC UA severity bucket is already mapped.
|
||||
/// </summary>
|
||||
event EventHandler<GalaxyAlarmTransition>? OnAlarmTransition;
|
||||
|
||||
/// <summary>
|
||||
/// Start consuming the alarm feed on a background task. Idempotent — second
|
||||
/// calls are no-ops while the loop is running.
|
||||
/// </summary>
|
||||
void Start();
|
||||
}
|
||||
+45
-76
@@ -1,6 +1,3 @@
|
||||
using System.Threading.Channels;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
@@ -10,49 +7,40 @@ using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// PR E.7 — pins that the GalaxyDriver populates the extended AlarmEventArgs
|
||||
/// fields (OperatorComment, OriginalRaiseTimestampUtc, AlarmCategory) when the
|
||||
/// gateway emits a transition with the rich payload, and leaves them null on
|
||||
/// events that don't carry them.
|
||||
/// Pins that the GalaxyDriver populates the extended AlarmEventArgs fields
|
||||
/// (OperatorComment, OriginalRaiseTimestampUtc, AlarmCategory) when the gateway
|
||||
/// alarm feed delivers a transition with the rich payload, and leaves them null on
|
||||
/// transitions that don't carry them.
|
||||
/// </summary>
|
||||
public sealed class GalaxyDriverAlarmEventArgsExtensionTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Acknowledge_transition_with_full_payload_populates_extended_fields()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
using var driver = NewDriver(subscriber);
|
||||
var feed = new FakeAlarmFeed();
|
||||
using var driver = NewDriver(feed);
|
||||
|
||||
await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None);
|
||||
var observed = new List<AlarmEventArgs>();
|
||||
driver.OnAlarmEvent += (_, args) => observed.Add(args);
|
||||
await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None);
|
||||
|
||||
var raise = new DateTime(2026, 5, 1, 12, 0, 0, DateTimeKind.Utc);
|
||||
var ack = raise.AddSeconds(45);
|
||||
await subscriber.EmitAlarmAsync(new MxEvent
|
||||
{
|
||||
Family = MxEventFamily.OnAlarmTransition,
|
||||
OnAlarmTransition = new OnAlarmTransitionEvent
|
||||
{
|
||||
AlarmFullReference = "Tank01.Level.HiHi",
|
||||
SourceObjectReference = "Tank01",
|
||||
AlarmTypeName = "AnalogLimitAlarm.HiHi",
|
||||
TransitionKind = AlarmTransitionKind.Acknowledge,
|
||||
Severity = 750,
|
||||
OriginalRaiseTimestamp = Timestamp.FromDateTime(raise),
|
||||
TransitionTimestamp = Timestamp.FromDateTime(ack),
|
||||
OperatorUser = "alice",
|
||||
OperatorComment = "investigating",
|
||||
Category = "Process",
|
||||
Description = "Tank 01 high-high level",
|
||||
},
|
||||
});
|
||||
feed.Emit(new GalaxyAlarmTransition(
|
||||
AlarmFullReference: "Tank01.Level.HiHi",
|
||||
SourceObjectReference: "Tank01",
|
||||
AlarmTypeName: "AnalogLimitAlarm.HiHi",
|
||||
TransitionKind: GalaxyAlarmTransitionKind.Acknowledge,
|
||||
SeverityBucket: AlarmSeverity.Critical,
|
||||
OpcUaSeverity: 800,
|
||||
RawMxAccessSeverity: 750,
|
||||
OriginalRaiseTimestampUtc: raise,
|
||||
TransitionTimestampUtc: ack,
|
||||
OperatorUser: "alice",
|
||||
OperatorComment: "investigating",
|
||||
Category: "Process",
|
||||
Description: "Tank 01 high-high level"));
|
||||
|
||||
for (var i = 0; i < 20 && observed.Count == 0; i++)
|
||||
{
|
||||
await Task.Delay(50);
|
||||
}
|
||||
observed.ShouldHaveSingleItem();
|
||||
observed[0].OperatorComment.ShouldBe("investigating");
|
||||
observed[0].OriginalRaiseTimestampUtc.ShouldBe(raise);
|
||||
@@ -62,38 +50,35 @@ public sealed class GalaxyDriverAlarmEventArgsExtensionTests
|
||||
[Fact]
|
||||
public async Task Raise_transition_without_optional_fields_leaves_them_null()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
using var driver = NewDriver(subscriber);
|
||||
var feed = new FakeAlarmFeed();
|
||||
using var driver = NewDriver(feed);
|
||||
|
||||
await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None);
|
||||
var observed = new List<AlarmEventArgs>();
|
||||
driver.OnAlarmEvent += (_, args) => observed.Add(args);
|
||||
await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None);
|
||||
|
||||
await subscriber.EmitAlarmAsync(new MxEvent
|
||||
{
|
||||
Family = MxEventFamily.OnAlarmTransition,
|
||||
OnAlarmTransition = new OnAlarmTransitionEvent
|
||||
{
|
||||
AlarmFullReference = "Tank01.Level.HiHi",
|
||||
AlarmTypeName = "AnalogLimitAlarm.HiHi",
|
||||
TransitionKind = AlarmTransitionKind.Raise,
|
||||
Severity = 750,
|
||||
TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
|
||||
},
|
||||
});
|
||||
feed.Emit(new GalaxyAlarmTransition(
|
||||
AlarmFullReference: "Tank01.Level.HiHi",
|
||||
SourceObjectReference: string.Empty,
|
||||
AlarmTypeName: "AnalogLimitAlarm.HiHi",
|
||||
TransitionKind: GalaxyAlarmTransitionKind.Raise,
|
||||
SeverityBucket: AlarmSeverity.Critical,
|
||||
OpcUaSeverity: 800,
|
||||
RawMxAccessSeverity: 750,
|
||||
OriginalRaiseTimestampUtc: null,
|
||||
TransitionTimestampUtc: DateTime.UtcNow,
|
||||
OperatorUser: string.Empty,
|
||||
OperatorComment: string.Empty,
|
||||
Category: string.Empty,
|
||||
Description: string.Empty));
|
||||
|
||||
for (var i = 0; i < 20 && observed.Count == 0; i++)
|
||||
{
|
||||
await Task.Delay(50);
|
||||
}
|
||||
observed.ShouldHaveSingleItem();
|
||||
observed[0].OperatorComment.ShouldBeNull();
|
||||
observed[0].OriginalRaiseTimestampUtc.ShouldBeNull();
|
||||
observed[0].AlarmCategory.ShouldBeNull();
|
||||
}
|
||||
|
||||
private static GalaxyDriver NewDriver(ManualSubscriber subscriber)
|
||||
private static GalaxyDriver NewDriver(IGalaxyAlarmFeed feed)
|
||||
{
|
||||
var options = new GalaxyDriverOptions(
|
||||
new GalaxyGatewayOptions("http://localhost:5000", "literal-api-key"),
|
||||
@@ -104,35 +89,19 @@ public sealed class GalaxyDriverAlarmEventArgsExtensionTests
|
||||
driverInstanceId: "drv-1",
|
||||
options: options,
|
||||
hierarchySource: null,
|
||||
dataReader: null,
|
||||
dataWriter: null,
|
||||
subscriber: subscriber,
|
||||
alarmAcknowledger: null);
|
||||
alarmFeed: feed);
|
||||
}
|
||||
|
||||
private sealed class ManualSubscriber : IGalaxySubscriber
|
||||
/// <summary>In-memory <see cref="IGalaxyAlarmFeed"/> the test drives directly.</summary>
|
||||
private sealed class FakeAlarmFeed : IGalaxyAlarmFeed
|
||||
{
|
||||
private readonly Channel<MxEvent> _stream =
|
||||
Channel.CreateUnbounded<MxEvent>(new UnboundedChannelOptions { SingleReader = true });
|
||||
public event EventHandler<GalaxyAlarmTransition>? OnAlarmTransition;
|
||||
|
||||
public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
|
||||
IReadOnlyList<string> fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
|
||||
{
|
||||
var results = new List<SubscribeResult>();
|
||||
var nextHandle = 100;
|
||||
foreach (var r in fullReferences)
|
||||
{
|
||||
results.Add(new SubscribeResult { TagAddress = r, ItemHandle = nextHandle++, WasSuccessful = true });
|
||||
}
|
||||
return Task.FromResult<IReadOnlyList<SubscribeResult>>(results);
|
||||
}
|
||||
public void Start() { }
|
||||
|
||||
public Task UnsubscribeBulkAsync(IReadOnlyList<int> itemHandles, CancellationToken cancellationToken)
|
||||
=> Task.CompletedTask;
|
||||
public void Emit(GalaxyAlarmTransition transition)
|
||||
=> OnAlarmTransition?.Invoke(this, transition);
|
||||
|
||||
public IAsyncEnumerable<MxEvent> StreamEventsAsync(CancellationToken cancellationToken)
|
||||
=> _stream.Reader.ReadAllAsync(cancellationToken);
|
||||
|
||||
public ValueTask EmitAlarmAsync(MxEvent ev) => _stream.Writer.WriteAsync(ev);
|
||||
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||
}
|
||||
}
|
||||
|
||||
+71
-107
@@ -1,6 +1,3 @@
|
||||
using System.Threading.Channels;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
@@ -10,51 +7,31 @@ using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests;
|
||||
|
||||
/// <summary>
|
||||
/// PR B.2 — pins GalaxyDriver's IAlarmSource implementation. The driver bridges
|
||||
/// EventPump.OnAlarmTransition (PR B.1) onto IAlarmSource.OnAlarmEvent and
|
||||
/// forwards Acknowledge through IGalaxyAlarmAcknowledger (production:
|
||||
/// GatewayGalaxyAlarmAcknowledger calling the gateway's AcknowledgeAlarm RPC
|
||||
/// from PR E.2).
|
||||
/// Pins GalaxyDriver's <c>IAlarmSource</c> implementation. The driver bridges the
|
||||
/// gateway's session-less alarm feed (<see cref="IGalaxyAlarmFeed"/>, production:
|
||||
/// <c>GatewayGalaxyAlarmFeed</c>) onto <c>IAlarmSource.OnAlarmEvent</c> and forwards
|
||||
/// Acknowledge through <see cref="IGalaxyAlarmAcknowledger"/> (production:
|
||||
/// <c>GatewayGalaxyAlarmAcknowledger</c> calling the session-less
|
||||
/// <c>AcknowledgeAlarm</c> RPC).
|
||||
/// </summary>
|
||||
public sealed class GalaxyDriverAlarmSourceTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task SubscribeAlarmsAsync_returns_handle_and_event_fires_after_pump_alarm()
|
||||
public async Task SubscribeAlarmsAsync_starts_feed_and_event_fires_on_transition()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
var feed = new FakeAlarmFeed();
|
||||
var ack = new RecordingAcknowledger();
|
||||
using var driver = NewDriver(subscriber, ack);
|
||||
using var driver = NewDriver(feed, ack);
|
||||
|
||||
// Subscribe so OnAlarmEvent has a registered handle to fire under.
|
||||
var handle = await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None);
|
||||
handle.ShouldNotBeNull();
|
||||
feed.Started.ShouldBeTrue("SubscribeAlarmsAsync must start the alarm feed");
|
||||
|
||||
var observed = new List<AlarmEventArgs>();
|
||||
driver.OnAlarmEvent += (_, args) => observed.Add(args);
|
||||
|
||||
// SubscribeAsync to start the EventPump (alarm wiring is lazy on first sub).
|
||||
await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None);
|
||||
|
||||
await subscriber.EmitAlarmAsync(new MxEvent
|
||||
{
|
||||
Family = MxEventFamily.OnAlarmTransition,
|
||||
OnAlarmTransition = new OnAlarmTransitionEvent
|
||||
{
|
||||
AlarmFullReference = "Tank01.Level.HiHi",
|
||||
SourceObjectReference = "Tank01",
|
||||
AlarmTypeName = "AnalogLimitAlarm.HiHi",
|
||||
TransitionKind = AlarmTransitionKind.Raise,
|
||||
Severity = 750,
|
||||
TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
|
||||
Description = "Tank 01 high-high level",
|
||||
},
|
||||
});
|
||||
|
||||
// Drain pump events.
|
||||
for (var i = 0; i < 20 && observed.Count == 0; i++)
|
||||
{
|
||||
await Task.Delay(50);
|
||||
}
|
||||
feed.Emit(NewTransition("Tank01.Level.HiHi", "Tank01",
|
||||
GalaxyAlarmTransitionKind.Raise, AlarmSeverity.Critical));
|
||||
|
||||
observed.ShouldHaveSingleItem();
|
||||
observed[0].ConditionId.ShouldBe("Tank01.Level.HiHi");
|
||||
@@ -65,30 +42,19 @@ public sealed class GalaxyDriverAlarmSourceTests
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task OnAlarmEvent_does_not_fire_when_no_subscription_active()
|
||||
public void OnAlarmEvent_does_not_fire_before_any_alarm_subscription()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
var feed = new FakeAlarmFeed();
|
||||
var ack = new RecordingAcknowledger();
|
||||
using var driver = NewDriver(subscriber, ack);
|
||||
using var driver = NewDriver(feed, ack);
|
||||
|
||||
var observed = new List<AlarmEventArgs>();
|
||||
driver.OnAlarmEvent += (_, args) => observed.Add(args);
|
||||
|
||||
// Start the pump via a data subscription so alarm events flow but no alarm
|
||||
// subscription is registered → OnAlarmEvent is suppressed.
|
||||
await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None);
|
||||
await subscriber.EmitAlarmAsync(new MxEvent
|
||||
{
|
||||
Family = MxEventFamily.OnAlarmTransition,
|
||||
OnAlarmTransition = new OnAlarmTransitionEvent
|
||||
{
|
||||
AlarmFullReference = "Tank01.Level.HiHi",
|
||||
TransitionKind = AlarmTransitionKind.Raise,
|
||||
Severity = 600,
|
||||
TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
|
||||
},
|
||||
});
|
||||
await Task.Delay(150);
|
||||
// No SubscribeAlarmsAsync → the feed is never wired onto the driver, so a
|
||||
// transition surfaces nowhere.
|
||||
feed.Emit(NewTransition("Tank01.Level.HiHi", "Tank01",
|
||||
GalaxyAlarmTransitionKind.Raise, AlarmSeverity.High));
|
||||
|
||||
observed.ShouldBeEmpty();
|
||||
}
|
||||
@@ -96,29 +62,20 @@ public sealed class GalaxyDriverAlarmSourceTests
|
||||
[Fact]
|
||||
public async Task UnsubscribeAlarmsAsync_stops_event_flow()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
var feed = new FakeAlarmFeed();
|
||||
var ack = new RecordingAcknowledger();
|
||||
using var driver = NewDriver(subscriber, ack);
|
||||
using var driver = NewDriver(feed, ack);
|
||||
|
||||
var handle = await driver.SubscribeAlarmsAsync(["Tank01"], CancellationToken.None);
|
||||
var observed = new List<AlarmEventArgs>();
|
||||
driver.OnAlarmEvent += (_, args) => observed.Add(args);
|
||||
await driver.SubscribeAsync(["Tank01.Level"], TimeSpan.Zero, CancellationToken.None);
|
||||
|
||||
await driver.UnsubscribeAlarmsAsync(handle, CancellationToken.None);
|
||||
|
||||
await subscriber.EmitAlarmAsync(new MxEvent
|
||||
{
|
||||
Family = MxEventFamily.OnAlarmTransition,
|
||||
OnAlarmTransition = new OnAlarmTransitionEvent
|
||||
{
|
||||
AlarmFullReference = "Tank01.Level.HiHi",
|
||||
TransitionKind = AlarmTransitionKind.Raise,
|
||||
Severity = 600,
|
||||
TransitionTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
|
||||
},
|
||||
});
|
||||
await Task.Delay(150);
|
||||
// The feed keeps running (it is session-less and shared), but with no active
|
||||
// subscription the driver suppresses the bridged event.
|
||||
feed.Emit(NewTransition("Tank01.Level.HiHi", "Tank01",
|
||||
GalaxyAlarmTransitionKind.Raise, AlarmSeverity.High));
|
||||
|
||||
observed.ShouldBeEmpty();
|
||||
}
|
||||
@@ -126,9 +83,9 @@ public sealed class GalaxyDriverAlarmSourceTests
|
||||
[Fact]
|
||||
public async Task UnsubscribeAlarmsAsync_throws_for_foreign_handle()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
var feed = new FakeAlarmFeed();
|
||||
var ack = new RecordingAcknowledger();
|
||||
using var driver = NewDriver(subscriber, ack);
|
||||
using var driver = NewDriver(feed, ack);
|
||||
|
||||
var foreignHandle = new ForeignAlarmHandle();
|
||||
await Should.ThrowAsync<ArgumentException>(() =>
|
||||
@@ -138,9 +95,9 @@ public sealed class GalaxyDriverAlarmSourceTests
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_routes_each_request_to_the_acknowledger()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
var feed = new FakeAlarmFeed();
|
||||
var ack = new RecordingAcknowledger();
|
||||
using var driver = NewDriver(subscriber, ack);
|
||||
using var driver = NewDriver(feed, ack);
|
||||
|
||||
var requests = new[]
|
||||
{
|
||||
@@ -159,9 +116,9 @@ public sealed class GalaxyDriverAlarmSourceTests
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_falls_back_to_SourceNodeId_when_ConditionId_empty()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
var feed = new FakeAlarmFeed();
|
||||
var ack = new RecordingAcknowledger();
|
||||
using var driver = NewDriver(subscriber, ack);
|
||||
using var driver = NewDriver(feed, ack);
|
||||
|
||||
await driver.AcknowledgeAsync(
|
||||
[new AlarmAcknowledgeRequest("Tank01.Level.HiHi", string.Empty, null)],
|
||||
@@ -173,8 +130,8 @@ public sealed class GalaxyDriverAlarmSourceTests
|
||||
[Fact]
|
||||
public async Task AcknowledgeAsync_throws_NotSupported_without_acknowledger()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
using var driver = NewDriver(subscriber, alarmAcknowledger: null);
|
||||
var feed = new FakeAlarmFeed();
|
||||
using var driver = NewDriver(feed, alarmAcknowledger: null);
|
||||
|
||||
await Should.ThrowAsync<NotSupportedException>(() =>
|
||||
driver.AcknowledgeAsync(
|
||||
@@ -183,7 +140,7 @@ public sealed class GalaxyDriverAlarmSourceTests
|
||||
}
|
||||
|
||||
private static GalaxyDriver NewDriver(
|
||||
ManualSubscriber subscriber, IGalaxyAlarmAcknowledger? alarmAcknowledger)
|
||||
IGalaxyAlarmFeed alarmFeed, IGalaxyAlarmAcknowledger? alarmAcknowledger)
|
||||
{
|
||||
var options = new GalaxyDriverOptions(
|
||||
new GalaxyGatewayOptions("http://localhost:5000", "literal-api-key"),
|
||||
@@ -194,10 +151,43 @@ public sealed class GalaxyDriverAlarmSourceTests
|
||||
driverInstanceId: "drv-1",
|
||||
options: options,
|
||||
hierarchySource: null,
|
||||
dataReader: null,
|
||||
dataWriter: null,
|
||||
subscriber: subscriber,
|
||||
alarmAcknowledger: alarmAcknowledger);
|
||||
alarmAcknowledger: alarmAcknowledger,
|
||||
alarmFeed: alarmFeed);
|
||||
}
|
||||
|
||||
private static GalaxyAlarmTransition NewTransition(
|
||||
string alarmFullReference,
|
||||
string sourceObjectReference,
|
||||
GalaxyAlarmTransitionKind kind,
|
||||
AlarmSeverity severity)
|
||||
=> new(
|
||||
AlarmFullReference: alarmFullReference,
|
||||
SourceObjectReference: sourceObjectReference,
|
||||
AlarmTypeName: "AnalogLimitAlarm.HiHi",
|
||||
TransitionKind: kind,
|
||||
SeverityBucket: severity,
|
||||
OpcUaSeverity: 800,
|
||||
RawMxAccessSeverity: 750,
|
||||
OriginalRaiseTimestampUtc: null,
|
||||
TransitionTimestampUtc: DateTime.UtcNow,
|
||||
OperatorUser: string.Empty,
|
||||
OperatorComment: string.Empty,
|
||||
Category: "Process",
|
||||
Description: "Tank 01 high-high level");
|
||||
|
||||
/// <summary>In-memory <see cref="IGalaxyAlarmFeed"/> the test drives directly.</summary>
|
||||
private sealed class FakeAlarmFeed : IGalaxyAlarmFeed
|
||||
{
|
||||
public bool Started { get; private set; }
|
||||
|
||||
public event EventHandler<GalaxyAlarmTransition>? OnAlarmTransition;
|
||||
|
||||
public void Start() => Started = true;
|
||||
|
||||
public void Emit(GalaxyAlarmTransition transition)
|
||||
=> OnAlarmTransition?.Invoke(this, transition);
|
||||
|
||||
public ValueTask DisposeAsync() => ValueTask.CompletedTask;
|
||||
}
|
||||
|
||||
private sealed class RecordingAcknowledger : IGalaxyAlarmAcknowledger
|
||||
@@ -215,30 +205,4 @@ public sealed class GalaxyDriverAlarmSourceTests
|
||||
{
|
||||
public string DiagnosticId => "foreign";
|
||||
}
|
||||
|
||||
private sealed class ManualSubscriber : IGalaxySubscriber
|
||||
{
|
||||
private readonly Channel<MxEvent> _stream =
|
||||
Channel.CreateUnbounded<MxEvent>(new UnboundedChannelOptions { SingleReader = true });
|
||||
|
||||
public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
|
||||
IReadOnlyList<string> fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
|
||||
{
|
||||
var results = new List<SubscribeResult>();
|
||||
var nextHandle = 100;
|
||||
foreach (var r in fullReferences)
|
||||
{
|
||||
results.Add(new SubscribeResult { TagAddress = r, ItemHandle = nextHandle++, WasSuccessful = true });
|
||||
}
|
||||
return Task.FromResult<IReadOnlyList<SubscribeResult>>(results);
|
||||
}
|
||||
|
||||
public Task UnsubscribeBulkAsync(IReadOnlyList<int> itemHandles, CancellationToken cancellationToken)
|
||||
=> Task.CompletedTask;
|
||||
|
||||
public IAsyncEnumerable<MxEvent> StreamEventsAsync(CancellationToken cancellationToken)
|
||||
=> _stream.Reader.ReadAllAsync(cancellationToken);
|
||||
|
||||
public ValueTask EmitAlarmAsync(MxEvent ev) => _stream.Writer.WriteAsync(ev);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,239 +0,0 @@
|
||||
using System.Threading.Channels;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;
|
||||
|
||||
/// <summary>
|
||||
/// PR B.1 — pins the EventPump's OnAlarmTransition decode path. Synthetic MxEvents
|
||||
/// with the new family go in; the pump fires <c>OnAlarmTransition</c> with the
|
||||
/// decoded payload + mapped severity bucket; data-change subscribers stay
|
||||
/// untouched.
|
||||
/// </summary>
|
||||
public sealed class EventPumpAlarmTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Dispatches_raise_acknowledge_clear_in_sequence()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
var registry = new SubscriptionRegistry();
|
||||
var transitions = new List<GalaxyAlarmTransition>();
|
||||
var dispatched = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
await using var pump = new EventPump(subscriber, registry, channelCapacity: 16, clientName: "AlarmTest");
|
||||
pump.OnAlarmTransition += (_, transition) =>
|
||||
{
|
||||
lock (transitions)
|
||||
{
|
||||
transitions.Add(transition);
|
||||
if (transitions.Count == 3) dispatched.TrySetResult(true);
|
||||
}
|
||||
};
|
||||
pump.Start();
|
||||
|
||||
var raise = new DateTime(2026, 5, 1, 12, 0, 0, DateTimeKind.Utc);
|
||||
var ack = raise.AddSeconds(30);
|
||||
var clear = ack.AddSeconds(60);
|
||||
|
||||
await subscriber.EmitAlarmAsync(NewAlarm("Tank01.Level.HiHi",
|
||||
AlarmTransitionKind.Raise, severity: 750, transitionTime: raise));
|
||||
await subscriber.EmitAlarmAsync(NewAlarm("Tank01.Level.HiHi",
|
||||
AlarmTransitionKind.Acknowledge, severity: 750, transitionTime: ack,
|
||||
originalRaise: raise, operatorUser: "alice", operatorComment: "investigating"));
|
||||
await subscriber.EmitAlarmAsync(NewAlarm("Tank01.Level.HiHi",
|
||||
AlarmTransitionKind.Clear, severity: 750, transitionTime: clear,
|
||||
originalRaise: raise));
|
||||
|
||||
var completed = await Task.WhenAny(dispatched.Task, Task.Delay(TimeSpan.FromSeconds(2)));
|
||||
completed.ShouldBe(dispatched.Task, "all three alarm transitions should dispatch within 2s");
|
||||
|
||||
transitions.Count.ShouldBe(3);
|
||||
|
||||
transitions[0].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Raise);
|
||||
transitions[0].SeverityBucket.ShouldBe(AlarmSeverity.Critical);
|
||||
transitions[0].OpcUaSeverity.ShouldBe(MxAccessSeverityMapper.OpcUaSeverityCritical);
|
||||
transitions[0].RawMxAccessSeverity.ShouldBe(750);
|
||||
transitions[0].TransitionTimestampUtc.ShouldBe(raise);
|
||||
|
||||
transitions[1].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Acknowledge);
|
||||
transitions[1].OperatorUser.ShouldBe("alice");
|
||||
transitions[1].OperatorComment.ShouldBe("investigating");
|
||||
transitions[1].OriginalRaiseTimestampUtc.ShouldBe(raise);
|
||||
|
||||
transitions[2].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Clear);
|
||||
transitions[2].OriginalRaiseTimestampUtc.ShouldBe(raise);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Drops_alarm_event_with_unspecified_transition_kind()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
var registry = new SubscriptionRegistry();
|
||||
var transitions = new List<GalaxyAlarmTransition>();
|
||||
|
||||
await using var pump = new EventPump(subscriber, registry, channelCapacity: 4, clientName: "AlarmTest");
|
||||
pump.OnAlarmTransition += (_, transition) => transitions.Add(transition);
|
||||
pump.Start();
|
||||
|
||||
await subscriber.EmitAlarmAsync(NewAlarm("Tank01.Level.HiHi",
|
||||
AlarmTransitionKind.Unspecified, severity: 100,
|
||||
transitionTime: DateTime.UtcNow));
|
||||
|
||||
// Give the pump a beat to drain the channel.
|
||||
await Task.Delay(150);
|
||||
|
||||
transitions.ShouldBeEmpty("alarm transitions with Unspecified kind are decoder failures and must not fire OnAlarmTransition");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Drops_alarm_event_with_missing_body()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
var registry = new SubscriptionRegistry();
|
||||
var transitions = new List<GalaxyAlarmTransition>();
|
||||
|
||||
await using var pump = new EventPump(subscriber, registry, channelCapacity: 4, clientName: "AlarmTest");
|
||||
pump.OnAlarmTransition += (_, transition) => transitions.Add(transition);
|
||||
pump.Start();
|
||||
|
||||
// Family marked as alarm-transition but body left empty (worker version skew /
|
||||
// malformed event). Production should count + drop, not throw.
|
||||
await subscriber.EmitRawAsync(new MxEvent
|
||||
{
|
||||
Family = MxEventFamily.OnAlarmTransition,
|
||||
WorkerSequence = 42,
|
||||
});
|
||||
|
||||
await Task.Delay(150);
|
||||
|
||||
transitions.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Mixed_data_change_and_alarm_events_dispatch_independently()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
var registry = new SubscriptionRegistry();
|
||||
registry.Register(1, [new TagBinding("Tank01.Level", ItemHandle: 7)]);
|
||||
|
||||
var dataChanges = new List<DataChangeEventArgs>();
|
||||
var alarms = new List<GalaxyAlarmTransition>();
|
||||
var bothSeen = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
await using var pump = new EventPump(subscriber, registry, channelCapacity: 16, clientName: "MixedTest");
|
||||
pump.OnDataChange += (_, args) =>
|
||||
{
|
||||
lock (dataChanges)
|
||||
{
|
||||
dataChanges.Add(args);
|
||||
if (dataChanges.Count >= 1 && alarms.Count >= 1) bothSeen.TrySetResult(true);
|
||||
}
|
||||
};
|
||||
pump.OnAlarmTransition += (_, transition) =>
|
||||
{
|
||||
lock (alarms)
|
||||
{
|
||||
alarms.Add(transition);
|
||||
if (dataChanges.Count >= 1 && alarms.Count >= 1) bothSeen.TrySetResult(true);
|
||||
}
|
||||
};
|
||||
pump.Start();
|
||||
|
||||
await subscriber.EmitAsync(itemHandle: 7, value: 41.0);
|
||||
await subscriber.EmitAlarmAsync(NewAlarm("Tank01.Level.HiHi",
|
||||
AlarmTransitionKind.Raise, severity: 600, transitionTime: DateTime.UtcNow));
|
||||
|
||||
var completed = await Task.WhenAny(bothSeen.Task, Task.Delay(TimeSpan.FromSeconds(2)));
|
||||
completed.ShouldBe(bothSeen.Task);
|
||||
|
||||
dataChanges.Count.ShouldBe(1);
|
||||
alarms.Count.ShouldBe(1);
|
||||
alarms[0].SeverityBucket.ShouldBe(AlarmSeverity.High);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Filters_out_unsupported_event_families()
|
||||
{
|
||||
var subscriber = new ManualSubscriber();
|
||||
var registry = new SubscriptionRegistry();
|
||||
var transitions = new List<GalaxyAlarmTransition>();
|
||||
|
||||
await using var pump = new EventPump(subscriber, registry, channelCapacity: 4, clientName: "FilterTest");
|
||||
pump.OnAlarmTransition += (_, transition) => transitions.Add(transition);
|
||||
pump.Start();
|
||||
|
||||
// OnWriteComplete and OperationComplete should be silently dropped.
|
||||
await subscriber.EmitRawAsync(new MxEvent { Family = MxEventFamily.OnWriteComplete });
|
||||
await subscriber.EmitRawAsync(new MxEvent { Family = MxEventFamily.OperationComplete });
|
||||
|
||||
await Task.Delay(150);
|
||||
|
||||
transitions.ShouldBeEmpty();
|
||||
}
|
||||
|
||||
private static MxEvent NewAlarm(
|
||||
string fullReference,
|
||||
AlarmTransitionKind kind,
|
||||
int severity,
|
||||
DateTime transitionTime,
|
||||
DateTime? originalRaise = null,
|
||||
string operatorUser = "",
|
||||
string operatorComment = "")
|
||||
{
|
||||
var body = new OnAlarmTransitionEvent
|
||||
{
|
||||
AlarmFullReference = fullReference,
|
||||
SourceObjectReference = fullReference.Split('.')[0],
|
||||
AlarmTypeName = "AnalogLimitAlarm.HiHi",
|
||||
TransitionKind = kind,
|
||||
Severity = severity,
|
||||
TransitionTimestamp = Timestamp.FromDateTime(transitionTime),
|
||||
OperatorUser = operatorUser,
|
||||
OperatorComment = operatorComment,
|
||||
Category = "Process",
|
||||
Description = "Tank 01 high-high level",
|
||||
};
|
||||
if (originalRaise is { } orts)
|
||||
{
|
||||
body.OriginalRaiseTimestamp = Timestamp.FromDateTime(orts);
|
||||
}
|
||||
return new MxEvent
|
||||
{
|
||||
Family = MxEventFamily.OnAlarmTransition,
|
||||
OnAlarmTransition = body,
|
||||
};
|
||||
}
|
||||
|
||||
private sealed class ManualSubscriber : IGalaxySubscriber
|
||||
{
|
||||
private readonly Channel<MxEvent> _stream =
|
||||
Channel.CreateUnbounded<MxEvent>(new UnboundedChannelOptions { SingleReader = true });
|
||||
|
||||
public Task<IReadOnlyList<SubscribeResult>> SubscribeBulkAsync(
|
||||
IReadOnlyList<string> fullReferences, int bufferedUpdateIntervalMs, CancellationToken cancellationToken)
|
||||
=> Task.FromResult<IReadOnlyList<SubscribeResult>>([]);
|
||||
|
||||
public Task UnsubscribeBulkAsync(IReadOnlyList<int> itemHandles, CancellationToken cancellationToken)
|
||||
=> Task.CompletedTask;
|
||||
|
||||
public IAsyncEnumerable<MxEvent> StreamEventsAsync(CancellationToken cancellationToken)
|
||||
=> _stream.Reader.ReadAllAsync(cancellationToken);
|
||||
|
||||
public ValueTask EmitAsync(int itemHandle, double value) =>
|
||||
_stream.Writer.WriteAsync(new MxEvent
|
||||
{
|
||||
Family = MxEventFamily.OnDataChange,
|
||||
ItemHandle = itemHandle,
|
||||
Value = new MxValue { DoubleValue = value },
|
||||
Quality = 192,
|
||||
SourceTimestamp = Timestamp.FromDateTime(DateTime.UtcNow),
|
||||
});
|
||||
|
||||
public ValueTask EmitAlarmAsync(MxEvent ev) => _stream.Writer.WriteAsync(ev);
|
||||
public ValueTask EmitRawAsync(MxEvent ev) => _stream.Writer.WriteAsync(ev);
|
||||
}
|
||||
}
|
||||
+213
@@ -0,0 +1,213 @@
|
||||
using System.Runtime.CompilerServices;
|
||||
using Google.Protobuf.WellKnownTypes;
|
||||
using MxGateway.Contracts.Proto;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Runtime;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Driver.Galaxy.Tests.Runtime;
|
||||
|
||||
/// <summary>
|
||||
/// Pins <see cref="GatewayGalaxyAlarmFeed"/> — the session-less consumer of the
|
||||
/// gateway's <c>StreamAlarms</c> feed. Synthetic <see cref="AlarmFeedMessage"/>s go
|
||||
/// in through the stream-factory seam; the feed fires <c>OnAlarmTransition</c> with
|
||||
/// decoded payloads and mapped severity buckets, drops malformed messages, and
|
||||
/// re-opens the stream after a transport fault.
|
||||
/// </summary>
|
||||
public sealed class GatewayGalaxyAlarmFeedTests
|
||||
{
|
||||
[Fact]
|
||||
public async Task Decodes_active_alarm_snapshot_then_live_transition()
|
||||
{
|
||||
var raise = new DateTime(2026, 5, 1, 12, 0, 0, DateTimeKind.Utc);
|
||||
var messages = new[]
|
||||
{
|
||||
SnapshotMessage("Tank01.Level.HiHi", AlarmConditionState.Active, severity: 750,
|
||||
lastTransition: raise),
|
||||
SnapshotMessage("Tank02.Level.HiHi", AlarmConditionState.ActiveAcked, severity: 500,
|
||||
lastTransition: raise, operatorUser: "alice", operatorComment: "investigating"),
|
||||
new AlarmFeedMessage { SnapshotComplete = true },
|
||||
TransitionMessage("Tank01.Level.HiHi", AlarmTransitionKind.Clear, severity: 750,
|
||||
transitionTime: raise.AddMinutes(5), originalRaise: raise),
|
||||
};
|
||||
|
||||
var observed = new List<GalaxyAlarmTransition>();
|
||||
var got3 = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
await using var feed = new GatewayGalaxyAlarmFeed(
|
||||
(_, ct) => OpenStream(messages, ct), clientName: "FeedTest");
|
||||
feed.OnAlarmTransition += (_, t) =>
|
||||
{
|
||||
lock (observed)
|
||||
{
|
||||
observed.Add(t);
|
||||
if (observed.Count == 3) got3.TrySetResult(true);
|
||||
}
|
||||
};
|
||||
feed.Start();
|
||||
|
||||
(await Task.WhenAny(got3.Task, Task.Delay(TimeSpan.FromSeconds(2))))
|
||||
.ShouldBe(got3.Task, "snapshot + transition should dispatch within 2s");
|
||||
|
||||
observed.Count.ShouldBe(3);
|
||||
|
||||
// Active snapshot entry → Raise.
|
||||
observed[0].AlarmFullReference.ShouldBe("Tank01.Level.HiHi");
|
||||
observed[0].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Raise);
|
||||
observed[0].SeverityBucket.ShouldBe(AlarmSeverity.Critical);
|
||||
observed[0].RawMxAccessSeverity.ShouldBe(750);
|
||||
|
||||
// Acknowledged snapshot entry → Acknowledge, operator fields preserved.
|
||||
observed[1].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Acknowledge);
|
||||
observed[1].OperatorUser.ShouldBe("alice");
|
||||
observed[1].OperatorComment.ShouldBe("investigating");
|
||||
|
||||
// Live transition after snapshot_complete.
|
||||
observed[2].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Clear);
|
||||
observed[2].OriginalRaiseTimestampUtc.ShouldBe(raise);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Drops_transition_with_unspecified_kind_and_empty_message()
|
||||
{
|
||||
var messages = new[]
|
||||
{
|
||||
TransitionMessage("Tank01.Level.HiHi", AlarmTransitionKind.Unspecified, severity: 100,
|
||||
transitionTime: DateTime.UtcNow),
|
||||
new AlarmFeedMessage(), // empty oneof — version skew
|
||||
TransitionMessage("Tank01.Level.HiHi", AlarmTransitionKind.Raise, severity: 600,
|
||||
transitionTime: DateTime.UtcNow),
|
||||
};
|
||||
|
||||
var observed = new List<GalaxyAlarmTransition>();
|
||||
var gotOne = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
await using var feed = new GatewayGalaxyAlarmFeed(
|
||||
(_, ct) => OpenStream(messages, ct), clientName: "FeedTest");
|
||||
feed.OnAlarmTransition += (_, t) =>
|
||||
{
|
||||
lock (observed)
|
||||
{
|
||||
observed.Add(t);
|
||||
gotOne.TrySetResult(true);
|
||||
}
|
||||
};
|
||||
feed.Start();
|
||||
|
||||
(await Task.WhenAny(gotOne.Task, Task.Delay(TimeSpan.FromSeconds(2))))
|
||||
.ShouldBe(gotOne.Task);
|
||||
|
||||
// Only the well-formed Raise survives; the Unspecified + empty messages drop.
|
||||
observed.ShouldHaveSingleItem();
|
||||
observed[0].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Raise);
|
||||
observed[0].SeverityBucket.ShouldBe(AlarmSeverity.High);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Reopens_stream_after_a_transport_fault()
|
||||
{
|
||||
var calls = 0;
|
||||
var liveTransition = new[]
|
||||
{
|
||||
TransitionMessage("Tank01.Level.HiHi", AlarmTransitionKind.Raise, severity: 750,
|
||||
transitionTime: DateTime.UtcNow),
|
||||
};
|
||||
|
||||
var observed = new List<GalaxyAlarmTransition>();
|
||||
var gotOne = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
|
||||
|
||||
await using var feed = new GatewayGalaxyAlarmFeed(
|
||||
(_, ct) =>
|
||||
{
|
||||
// First open faults; the feed must reconnect and succeed on the retry.
|
||||
if (Interlocked.Increment(ref calls) == 1)
|
||||
{
|
||||
throw new InvalidOperationException("synthetic stream fault");
|
||||
}
|
||||
return OpenStream(liveTransition, ct);
|
||||
},
|
||||
clientName: "ReconnectTest",
|
||||
reconnectDelay: TimeSpan.FromMilliseconds(20));
|
||||
feed.OnAlarmTransition += (_, t) =>
|
||||
{
|
||||
observed.Add(t);
|
||||
gotOne.TrySetResult(true);
|
||||
};
|
||||
feed.Start();
|
||||
|
||||
(await Task.WhenAny(gotOne.Task, Task.Delay(TimeSpan.FromSeconds(3))))
|
||||
.ShouldBe(gotOne.Task, "the feed should reopen the stream and deliver after a fault");
|
||||
|
||||
calls.ShouldBeGreaterThanOrEqualTo(2);
|
||||
observed.ShouldHaveSingleItem();
|
||||
observed[0].TransitionKind.ShouldBe(GalaxyAlarmTransitionKind.Raise);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Yields each message in order, then holds the stream open until the feed is
|
||||
/// disposed — mirrors a live server-streaming RPC that does not complete on its
|
||||
/// own.
|
||||
/// </summary>
|
||||
private static async IAsyncEnumerable<AlarmFeedMessage> OpenStream(
|
||||
IEnumerable<AlarmFeedMessage> messages,
|
||||
[EnumeratorCancellation] CancellationToken ct = default)
|
||||
{
|
||||
foreach (var message in messages)
|
||||
{
|
||||
ct.ThrowIfCancellationRequested();
|
||||
yield return message;
|
||||
await Task.Yield();
|
||||
}
|
||||
await Task.Delay(Timeout.Infinite, ct);
|
||||
}
|
||||
|
||||
private static AlarmFeedMessage SnapshotMessage(
|
||||
string fullReference,
|
||||
AlarmConditionState state,
|
||||
int severity,
|
||||
DateTime lastTransition,
|
||||
string operatorUser = "",
|
||||
string operatorComment = "")
|
||||
=> new()
|
||||
{
|
||||
ActiveAlarm = new ActiveAlarmSnapshot
|
||||
{
|
||||
AlarmFullReference = fullReference,
|
||||
SourceObjectReference = fullReference.Split('.')[0],
|
||||
AlarmTypeName = "AnalogLimitAlarm.HiHi",
|
||||
Severity = severity,
|
||||
CurrentState = state,
|
||||
Category = "Process",
|
||||
Description = "Tank high-high level",
|
||||
LastTransitionTimestamp = Timestamp.FromDateTime(lastTransition),
|
||||
OperatorUser = operatorUser,
|
||||
OperatorComment = operatorComment,
|
||||
},
|
||||
};
|
||||
|
||||
private static AlarmFeedMessage TransitionMessage(
|
||||
string fullReference,
|
||||
AlarmTransitionKind kind,
|
||||
int severity,
|
||||
DateTime transitionTime,
|
||||
DateTime? originalRaise = null)
|
||||
{
|
||||
var body = new OnAlarmTransitionEvent
|
||||
{
|
||||
AlarmFullReference = fullReference,
|
||||
SourceObjectReference = fullReference.Split('.')[0],
|
||||
AlarmTypeName = "AnalogLimitAlarm.HiHi",
|
||||
TransitionKind = kind,
|
||||
Severity = severity,
|
||||
TransitionTimestamp = Timestamp.FromDateTime(transitionTime),
|
||||
Category = "Process",
|
||||
Description = "Tank high-high level",
|
||||
};
|
||||
if (originalRaise is { } orts)
|
||||
{
|
||||
body.OriginalRaiseTimestamp = Timestamp.FromDateTime(orts);
|
||||
}
|
||||
return new AlarmFeedMessage { Transition = body };
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user