Files
lmxopcua/src/ZB.MOM.WW.LmxOpcUa.Host/OpcUaService.cs
Joseph Doherty 44177acf64 Add integration test harness: OpcUaServiceBuilder + OpcUaServerFixture
OpcUaServiceBuilder provides fluent API for constructing OpcUaService
with dependency overrides (IMxProxy, IGalaxyRepository, IMxAccessClient).
WithMxAccessClient skips the STA thread and COM interop entirely.

OpcUaServerFixture wraps the service lifecycle with automatic port
allocation (atomic counter starting at 16000), guaranteed cleanup via
IAsyncLifetime, and factory methods for common test scenarios:
- WithFakes() — FakeMxProxy + FakeGalaxyRepository with standard data
- WithFakeMxAccessClient() — bypasses COM, fastest for most tests

Also adds TestData helper with reusable hierarchy/attributes matching
gr/layout.md, and 5 fixture tests verifying startup, shutdown, port
isolation, and address space building.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 06:22:31 -04:00

301 lines
13 KiB
C#

using System;
using System.Threading;
using Microsoft.Extensions.Configuration;
using Serilog;
using ZB.MOM.WW.LmxOpcUa.Host.Configuration;
using ZB.MOM.WW.LmxOpcUa.Host.Domain;
using ZB.MOM.WW.LmxOpcUa.Host.GalaxyRepository;
using ZB.MOM.WW.LmxOpcUa.Host.Metrics;
using ZB.MOM.WW.LmxOpcUa.Host.MxAccess;
using ZB.MOM.WW.LmxOpcUa.Host.OpcUa;
using ZB.MOM.WW.LmxOpcUa.Host.Status;
namespace ZB.MOM.WW.LmxOpcUa.Host
{
/// <summary>
/// Full service implementation wiring all components together. (SVC-004, SVC-005, SVC-006)
/// </summary>
internal sealed class OpcUaService
{
// Serilog logger scoped to this type; used for all start/stop diagnostics.
private static readonly ILogger Log = Serilog.Log.ForContext<OpcUaService>();
// Bound application settings (OpcUa, MxAccess, GalaxyRepository, Dashboard sections).
private readonly AppConfiguration _config;
// COM proxy used to build the MxAccessClient; when null, Start() creates no MxAccess client.
private readonly IMxProxy? _mxProxy;
// Galaxy repository used for the connection test, hierarchy/attribute queries and change detection; may be null.
private readonly IGalaxyRepository? _galaxyRepository;
// Injected client used instead of the STA-thread/COM path when _hasMxAccessClientOverride is true.
private readonly IMxAccessClient? _mxAccessClientOverride;
// Selects the injected-client path in Start(), even when the injected client itself is null.
private readonly bool _hasMxAccessClientOverride;
// Created in Start(); its token is passed to the startup connect/query calls. Cancelled and disposed in Stop().
private CancellationTokenSource? _cts;
// Created in Start() and handed to the MxAccess client, server host and status report; disposed in Stop().
private PerformanceMetrics? _metrics;
// STA thread created for the COM-backed MxAccessClient; stays null on the injected-client path.
private StaComThread? _staThread;
// COM-backed client created in Start(); reset to null if its connection attempt fails.
private MxAccessClient? _mxAccessClient;
// Holds the injected override client once Start() has taken the injected-client path.
private IMxAccessClient? _mxAccessClientForWiring;
// Raises OnGalaxyChanged, which is wired to OnGalaxyChanged() below; only created when a repository exists.
private ChangeDetectionService? _changeDetection;
// OPC UA server host created in Start() and stopped in Stop().
private OpcUaServerHost? _serverHost;
// Node manager taken from _serverHost.NodeManager after the server has been started.
private LmxNodeManager? _nodeManager;
// Health check passed to the status report service.
private HealthCheckService? _healthCheck;
// Status report fed with the MxAccess client, metrics, galaxy stats and server host.
private StatusReportService? _statusReport;
// Dashboard web server; only created when _config.Dashboard.Enabled is true.
private StatusWebServer? _statusWebServer;
// Galaxy name, DB connectivity flag, object/attribute counts and rebuild/deploy timestamps.
private GalaxyRepositoryStats? _galaxyStats;
/// <summary>
/// Production constructor. Reads appsettings.json (required), the optional
/// appsettings.{DOTNET_ENVIRONMENT}.json overlay (environment defaults to "Production")
/// and environment variables, binds the four configuration sections, and creates the
/// real MxAccess proxy adapter and Galaxy repository service.
/// </summary>
public OpcUaService()
{
    var environmentName = Environment.GetEnvironmentVariable("DOTNET_ENVIRONMENT") ?? "Production";

    var builder = new ConfigurationBuilder();
    builder.AddJsonFile("appsettings.json", optional: false);
    builder.AddJsonFile($"appsettings.{environmentName}.json", optional: true);
    builder.AddEnvironmentVariables();
    var root = builder.Build();

    // Bind each section onto the matching sub-object of a fresh AppConfiguration.
    var settings = new AppConfiguration();
    root.GetSection("OpcUa").Bind(settings.OpcUa);
    root.GetSection("MxAccess").Bind(settings.MxAccess);
    root.GetSection("GalaxyRepository").Bind(settings.GalaxyRepository);
    root.GetSection("Dashboard").Bind(settings.Dashboard);

    _config = settings;
    _mxProxy = new MxProxyAdapter();
    _galaxyRepository = new GalaxyRepositoryService(settings.GalaxyRepository);
}
/// <summary>
/// Test constructor. Accepts injected dependencies instead of loading appsettings.json.
/// </summary>
/// <param name="config">Fully populated configuration; must not be null.</param>
/// <param name="mxProxy">
/// Proxy used to build the COM-backed MxAccess client. When null (and no override is
/// requested) <see cref="Start"/> creates no MxAccess client.
/// </param>
/// <param name="galaxyRepository">
/// Repository used for the connection test, hierarchy queries and change detection.
/// When null those steps are skipped by <see cref="Start"/>.
/// </param>
/// <param name="mxAccessClientOverride">
/// Client used directly when <paramref name="hasMxAccessClientOverride"/> is true.
/// </param>
/// <param name="hasMxAccessClientOverride">
/// When true, <see cref="Start"/> uses <paramref name="mxAccessClientOverride"/> and does
/// not create the STA thread or the COM-backed client.
/// </param>
/// <exception cref="ArgumentNullException"><paramref name="config"/> is null.</exception>
internal OpcUaService(AppConfiguration config, IMxProxy? mxProxy, IGalaxyRepository? galaxyRepository,
    IMxAccessClient? mxAccessClientOverride = null, bool hasMxAccessClientOverride = false)
{
    // Fail at construction rather than with an unrelated error deep inside Start().
    _config = config ?? throw new ArgumentNullException(nameof(config));
    _mxProxy = mxProxy;
    _galaxyRepository = galaxyRepository;
    _mxAccessClientOverride = mxAccessClientOverride;
    _hasMxAccessClientOverride = hasMxAccessClientOverride;
}
/// <summary>
/// Starts the service. In order: validates configuration, registers the unhandled-exception
/// logger, creates the cancellation source and metrics, connects MxAccess, probes the Galaxy
/// repository, starts the OPC UA server (building the address space when data is available),
/// starts change detection and finally the status dashboard.
/// </summary>
/// <exception cref="InvalidOperationException">Configuration validation failed.</exception>
/// <remarks>
/// Any exception escaping a startup step is logged at Fatal level and rethrown.
/// </remarks>
public void Start()
{
    Log.Information("LmxOpcUa service starting");
    try
    {
        // Step 2: Validate config
        if (!ConfigurationValidator.ValidateAndLog(_config))
        {
            Log.Error("Configuration validation failed");
            throw new InvalidOperationException("Configuration validation failed");
        }

        // Step 3: Register exception handler (SVC-006)
        AppDomain.CurrentDomain.UnhandledException += OnUnhandledException;

        // Step 4: Create PerformanceMetrics
        var cts = new CancellationTokenSource();
        _cts = cts;
        var metrics = new PerformanceMetrics();
        _metrics = metrics;

        // Step 5-6: Create MxAccessClient → Connect → start monitor loop
        ConnectMxAccess(metrics, cts.Token);

        // Step 7: Create GalaxyRepositoryService → TestConnection
        var galaxyStats = ProbeGalaxyRepository(cts.Token);

        // Step 8: Create OPC UA server host + node manager
        var effectiveMxClient = (IMxAccessClient?)_mxAccessClient ?? _mxAccessClientForWiring ?? new NullMxAccessClient();
        var serverHost = new OpcUaServerHost(_config.OpcUa, effectiveMxClient, metrics);
        _serverHost = serverHost;

        // Step 9-10: Query hierarchy, start server, build address space
        StartServerAndBuildAddressSpace(serverHost, galaxyStats, cts.Token);

        // Step 11-12: Change detection wired to rebuild
        StartChangeDetection();

        // Step 13: Dashboard
        StartDashboard(effectiveMxClient, metrics, galaxyStats, serverHost);

        // Step 14
        Log.Information("LmxOpcUa service started successfully");
    }
    catch (Exception ex)
    {
        Log.Fatal(ex, "LmxOpcUa service failed to start");
        throw;
    }
}

/// <summary>
/// Connects the MxAccess client. With an injected override the client is used as-is (and
/// connected if not already connected); a failure there propagates. Otherwise, when a proxy
/// exists, a COM-backed client is created on a new STA thread; a failure on that path is
/// logged and the service continues without a client.
/// </summary>
private void ConnectMxAccess(PerformanceMetrics metrics, CancellationToken ct)
{
    if (_hasMxAccessClientOverride)
    {
        // Test path: use injected IMxAccessClient directly (skips STA thread + COM)
        _mxAccessClientForWiring = _mxAccessClientOverride;
        if (_mxAccessClientForWiring != null && _mxAccessClientForWiring.State != ConnectionState.Connected)
        {
            _mxAccessClientForWiring.ConnectAsync(ct).GetAwaiter().GetResult();
        }
        return;
    }

    if (_mxProxy == null)
    {
        return;
    }

    try
    {
        _staThread = new StaComThread();
        _staThread.Start();
        _mxAccessClient = new MxAccessClient(_staThread, _mxProxy, _config.MxAccess, metrics);
        _mxAccessClient.ConnectAsync(ct).GetAwaiter().GetResult();
        // Step 6: Start monitor loop
        _mxAccessClient.StartMonitor();
    }
    catch (Exception ex)
    {
        Log.Warning(ex, "MxAccess connection failed — continuing without runtime data access");
        // Drop the half-initialised client and its thread so later steps fall back to the null client.
        _mxAccessClient?.Dispose();
        _mxAccessClient = null;
        _staThread?.Dispose();
        _staThread = null;
    }
}

/// <summary>
/// Creates the Galaxy statistics object and, when a repository exists, records whether its
/// connection test succeeded. A failed test is logged as a warning, not treated as fatal.
/// </summary>
private GalaxyRepositoryStats ProbeGalaxyRepository(CancellationToken ct)
{
    var stats = new GalaxyRepositoryStats { GalaxyName = _config.OpcUa.GalaxyName };
    // Publish the stats object before the connection test so it exists even if the test throws.
    _galaxyStats = stats;

    if (_galaxyRepository != null)
    {
        var dbOk = _galaxyRepository.TestConnectionAsync(ct).GetAwaiter().GetResult();
        stats.DbConnected = dbOk;
        if (!dbOk)
        {
            Log.Warning("Galaxy repository database connection failed — continuing without initial data");
        }
    }

    return stats;
}

/// <summary>
/// Starts the OPC UA server. When the repository is reachable the hierarchy and attributes
/// are queried first and the address space is built after the server is up; if that fails
/// the error is logged and the server is still started (if it is not already running).
/// </summary>
private void StartServerAndBuildAddressSpace(OpcUaServerHost serverHost, GalaxyRepositoryStats galaxyStats, CancellationToken ct)
{
    if (_galaxyRepository == null || !galaxyStats.DbConnected)
    {
        StartServerHost(serverHost);
        return;
    }

    try
    {
        var hierarchy = _galaxyRepository.GetHierarchyAsync(ct).GetAwaiter().GetResult();
        var attributes = _galaxyRepository.GetAttributesAsync(ct).GetAwaiter().GetResult();
        galaxyStats.ObjectCount = hierarchy.Count;
        galaxyStats.AttributeCount = attributes.Count;

        StartServerHost(serverHost);
        if (_nodeManager != null)
        {
            _nodeManager.BuildAddressSpace(hierarchy, attributes);
            galaxyStats.LastRebuildTime = DateTime.UtcNow;
        }
    }
    catch (Exception ex)
    {
        Log.Warning(ex, "Failed to build initial address space");
        // The failure may have happened before the server was started; make sure it is up.
        if (!serverHost.IsRunning)
        {
            StartServerHost(serverHost);
        }
    }
}

/// <summary>Starts the server host and captures its node manager.</summary>
private void StartServerHost(OpcUaServerHost serverHost)
{
    serverHost.StartAsync().GetAwaiter().GetResult();
    _nodeManager = serverHost.NodeManager;
}

/// <summary>
/// Creates and starts change detection, wiring its change event to the address-space rebuild.
/// Does nothing when no repository is configured.
/// </summary>
private void StartChangeDetection()
{
    if (_galaxyRepository == null)
    {
        return;
    }

    _changeDetection = new ChangeDetectionService(_galaxyRepository, _config.GalaxyRepository.ChangeDetectionIntervalSeconds);
    _changeDetection.OnGalaxyChanged += OnGalaxyChanged;
    _changeDetection.Start();
}

/// <summary>
/// Creates the health check and status report and, when the dashboard is enabled in
/// configuration, starts the status web server on the configured port.
/// </summary>
private void StartDashboard(IMxAccessClient mxClient, PerformanceMetrics metrics, GalaxyRepositoryStats galaxyStats, OpcUaServerHost serverHost)
{
    _healthCheck = new HealthCheckService();
    _statusReport = new StatusReportService(_healthCheck, _config.Dashboard.RefreshIntervalSeconds);
    _statusReport.SetComponents(mxClient, metrics, galaxyStats, serverHost);

    if (_config.Dashboard.Enabled)
    {
        _statusWebServer = new StatusWebServer(_statusReport, _config.Dashboard.Port);
        _statusWebServer.Start();
    }
}
/// <summary>
/// Stops the service and releases what <see cref="Start"/> created: cancels pending work,
/// stops change detection and the server host, shuts down the COM-backed MxAccess client and
/// its STA thread, disposes the dashboard, metrics, change detection and cancellation source,
/// and unregisters the unhandled-exception handler.
/// </summary>
/// <remarks>
/// Every step is attempted even if an earlier one fails; failures are logged as warnings and
/// never thrown. Calling this before <see cref="Start"/> or a second time does no teardown.
/// </remarks>
public void Stop()
{
    Log.Information("LmxOpcUa service stopping");

    // Start() creates _cts right after registering the exception handler and before any
    // disposable component, and this method clears it, so null means nothing is left to tear down.
    // Skipping in that case also avoids removing a handler registered by another instance
    // (the handler is static, so "-=" would match it).
    var cts = _cts;
    _cts = null;
    if (cts != null)
    {
        RunShutdownStep("CancelPendingWork", () => cts.Cancel());
        RunShutdownStep("StopChangeDetection", () => _changeDetection?.Stop());
        RunShutdownStep("StopServerHost", () => _serverHost?.Stop());

        var mxClient = _mxAccessClient;
        if (mxClient != null)
        {
            // Separate steps: a failed disconnect must not prevent the client from being disposed.
            RunShutdownStep("StopMxAccessMonitor", () => mxClient.StopMonitor());
            RunShutdownStep("DisconnectMxAccess", () => mxClient.DisconnectAsync().GetAwaiter().GetResult());
            RunShutdownStep("DisposeMxAccessClient", () => mxClient.Dispose());
        }

        RunShutdownStep("DisposeStaThread", () => _staThread?.Dispose());
        RunShutdownStep("DisposeStatusWebServer", () => _statusWebServer?.Dispose());
        RunShutdownStep("DisposeMetrics", () => _metrics?.Dispose());
        RunShutdownStep("DisposeChangeDetection", () => _changeDetection?.Dispose());
        RunShutdownStep("DisposeCancellationSource", () => cts.Dispose());

        AppDomain.CurrentDomain.UnhandledException -= OnUnhandledException;
    }

    Log.Information("Service shutdown complete");
}

/// <summary>
/// Runs one teardown action; an exception is logged as a warning (tagged with the step name
/// in the ShutdownStep log property) and swallowed so the remaining steps still run.
/// </summary>
private static void RunShutdownStep(string step, Action action)
{
    try
    {
        action();
    }
    catch (Exception ex)
    {
        Log.ForContext("ShutdownStep", step).Warning(ex, "Error during service shutdown");
    }
}
/// <summary>
/// Change-detection callback: re-queries hierarchy and attributes, rebuilds the address
/// space and refreshes the Galaxy statistics. Does nothing when no repository or node
/// manager is available. Failures are logged and never propagate to the caller.
/// </summary>
private void OnGalaxyChanged()
{
    Log.Information("Galaxy change detected — rebuilding address space");

    var repository = _galaxyRepository;
    var nodeManager = _nodeManager;
    if (repository == null || nodeManager == null)
    {
        return;
    }

    try
    {
        var objects = repository.GetHierarchyAsync().GetAwaiter().GetResult();
        var attrs = repository.GetAttributesAsync().GetAwaiter().GetResult();
        nodeManager.RebuildAddressSpace(objects, attrs);

        var stats = _galaxyStats;
        if (stats == null)
        {
            return;
        }

        stats.ObjectCount = objects.Count;
        stats.AttributeCount = attrs.Count;
        stats.LastRebuildTime = DateTime.UtcNow;
        stats.LastDeployTime = _changeDetection?.LastKnownDeployTime;
    }
    catch (Exception ex)
    {
        Log.Error(ex, "Failed to rebuild address space");
    }
}
/// <summary>
/// AppDomain unhandled-exception handler: logs the exception (when the payload is an
/// <see cref="Exception"/>) at Fatal level together with the IsTerminating flag.
/// </summary>
private static void OnUnhandledException(object sender, UnhandledExceptionEventArgs e)
{
    var failure = e.ExceptionObject as Exception;
    var terminating = e.IsTerminating;
    Log.Fatal(failure, "Unhandled exception (IsTerminating={IsTerminating})", terminating);
}
// Accessors for testing

/// <summary>
/// The MxAccess client in use: the COM-backed client when one exists, otherwise the
/// injected override (which may be null).
/// </summary>
internal IMxAccessClient? MxClient
{
    get
    {
        if (_mxAccessClient != null)
        {
            return _mxAccessClient;
        }

        return _mxAccessClientForWiring;
    }
}

/// <summary>Metrics instance created by Start(), or null before the first start.</summary>
internal PerformanceMetrics? Metrics { get { return _metrics; } }

/// <summary>OPC UA server host created by Start(), or null before the first start.</summary>
internal OpcUaServerHost? ServerHost { get { return _serverHost; } }

/// <summary>Node manager captured from the server host after it was started.</summary>
internal LmxNodeManager? NodeManagerInstance { get { return _nodeManager; } }

/// <summary>Change detection service; null when no Galaxy repository is configured.</summary>
internal ChangeDetectionService? ChangeDetectionInstance { get { return _changeDetection; } }

/// <summary>Dashboard web server; null when the dashboard is disabled.</summary>
internal StatusWebServer? StatusWeb { get { return _statusWebServer; } }

/// <summary>Status report service created by Start().</summary>
internal StatusReportService? StatusReportInstance { get { return _statusReport; } }

/// <summary>Galaxy repository statistics created by Start().</summary>
internal GalaxyRepositoryStats? GalaxyStatsInstance { get { return _galaxyStats; } }
}
/// <summary>
/// Null implementation of IMxAccessClient for when MXAccess is not available.
/// Always reports Disconnected with zero subscriptions and reconnects; connect, disconnect,
/// subscribe and unsubscribe complete immediately; reads return <c>Vtq.Bad()</c> and writes
/// return false. Its events are never raised.
/// </summary>
internal sealed class NullMxAccessClient : IMxAccessClient
{
    /// <summary>Always <see cref="ConnectionState.Disconnected"/>.</summary>
    public ConnectionState State
    {
        get { return ConnectionState.Disconnected; }
    }

    /// <summary>Always zero.</summary>
    public int ActiveSubscriptionCount
    {
        get { return 0; }
    }

    /// <summary>Always zero.</summary>
    public int ReconnectCount
    {
        get { return 0; }
    }

    /// <summary>Never raised; subscriptions are accepted and ignored.</summary>
    public event EventHandler<ConnectionStateChangedEventArgs>? ConnectionStateChanged
    {
        add { }
        remove { }
    }

    /// <summary>Never raised; subscriptions are accepted and ignored.</summary>
    public event Action<string, Vtq>? OnTagValueChanged
    {
        add { }
        remove { }
    }

    /// <summary>Completes immediately without connecting to anything.</summary>
    public System.Threading.Tasks.Task ConnectAsync(CancellationToken ct = default)
    {
        return System.Threading.Tasks.Task.CompletedTask;
    }

    /// <summary>Completes immediately.</summary>
    public System.Threading.Tasks.Task DisconnectAsync()
    {
        return System.Threading.Tasks.Task.CompletedTask;
    }

    /// <summary>Completes immediately; the callback is never invoked.</summary>
    public System.Threading.Tasks.Task SubscribeAsync(string fullTagReference, Action<string, Vtq> callback)
    {
        return System.Threading.Tasks.Task.CompletedTask;
    }

    /// <summary>Completes immediately.</summary>
    public System.Threading.Tasks.Task UnsubscribeAsync(string fullTagReference)
    {
        return System.Threading.Tasks.Task.CompletedTask;
    }

    /// <summary>Returns a completed task holding <c>Vtq.Bad()</c>.</summary>
    public System.Threading.Tasks.Task<Vtq> ReadAsync(string fullTagReference, CancellationToken ct = default)
    {
        return System.Threading.Tasks.Task.FromResult(Vtq.Bad());
    }

    /// <summary>Returns a completed task holding false; nothing is written.</summary>
    public System.Threading.Tasks.Task<bool> WriteAsync(string fullTagReference, object value, CancellationToken ct = default)
    {
        return System.Threading.Tasks.Task.FromResult(false);
    }

    /// <summary>Nothing to release.</summary>
    public void Dispose()
    {
    }
}
}