using Akka.Actor; using Akka.TestKit.Xunit2; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging.Abstractions; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Deployment; using ZB.MOM.WW.ScadaBridge.Commons.Messages.DebugView; using ZB.MOM.WW.ScadaBridge.Commons.Messages.InboundApi; using ZB.MOM.WW.ScadaBridge.Commons.Messages.Lifecycle; using ZB.MOM.WW.ScadaBridge.Commons.Types; using ZB.MOM.WW.ScadaBridge.Commons.Types.Enums; using ZB.MOM.WW.ScadaBridge.Commons.Types.Flattening; using ZB.MOM.WW.ScadaBridge.SiteRuntime.Actors; using ZB.MOM.WW.ScadaBridge.SiteRuntime.Persistence; using ZB.MOM.WW.ScadaBridge.SiteRuntime.Scripts; using ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests.TestSupport; using System.Text.Json; namespace ZB.MOM.WW.ScadaBridge.SiteRuntime.Tests.Actors; /// /// Tests for DeploymentManagerActor: startup from SQLite, staggered batching, /// lifecycle commands, and supervision strategy. /// public class DeploymentManagerActorTests : TestKit, IDisposable { private readonly SiteStorageService _storage; private readonly ScriptCompilationService _compilationService; private readonly SharedScriptLibrary _sharedScriptLibrary; private readonly string _dbFile; public DeploymentManagerActorTests() { _dbFile = Path.Combine(Path.GetTempPath(), $"dm-test-{Guid.NewGuid():N}.db"); _storage = new SiteStorageService( $"Data Source={_dbFile}", NullLogger.Instance); _storage.InitializeAsync().GetAwaiter().GetResult(); _compilationService = new ScriptCompilationService( NullLogger.Instance); _sharedScriptLibrary = new SharedScriptLibrary( _compilationService, NullLogger.Instance); } void IDisposable.Dispose() { Shutdown(); try { File.Delete(_dbFile); } catch { /* cleanup */ } } private IActorRef CreateDeploymentManager( SiteRuntimeOptions? options = null, IServiceProvider? serviceProvider = null) { options ??= new SiteRuntimeOptions(); return ActorOf(Props.Create(() => new DeploymentManagerActor( _storage, _compilationService, _sharedScriptLibrary, null, // no stream manager in tests options, NullLogger.Instance, null, null, null, serviceProvider, null))); } private static string MakeConfigJson(string instanceName) { var config = new FlattenedConfiguration { InstanceUniqueName = instanceName, Attributes = [ new ResolvedAttribute { CanonicalName = "TestAttr", Value = "42", DataType = "Int32" } ] }; return JsonSerializer.Serialize(config); } /// /// Builds a config carrying a single STATIC List attribute whose canonical /// JSON-array default the Instance Actor decodes into a typed List<int> /// in memory (InstanceActor.PreStart / DecodeAttributeValue). Used to drive a /// routed wait whose matched value is a collection — the shape WS-4 normalizes /// for cross-process transport. /// private static string MakeConfigWithListAttributeJson(string instanceName) { var config = new FlattenedConfiguration { InstanceUniqueName = instanceName, Attributes = [ new ResolvedAttribute { CanonicalName = "Setpoints", Value = "[10,20,30]", DataType = "List", ElementDataType = "Int32" } ] }; return JsonSerializer.Serialize(config); } /// /// Builds a config carrying a single callable (no-trigger) script that /// returns a constant — enough for an inbound /// to be routed end-to-end through the Instance/Script/ScriptExecution actors. /// private static string MakeConfigWithScriptJson(string instanceName, string scriptName) { var config = new FlattenedConfiguration { InstanceUniqueName = instanceName, Attributes = [ new ResolvedAttribute { CanonicalName = "TestAttr", Value = "42", DataType = "Int32" } ], Scripts = [ new ResolvedScript { CanonicalName = scriptName, Code = "return 7;" } ] }; return JsonSerializer.Serialize(config); } [Fact] public async Task DeploymentManager_CreatesInstanceActors_FromStoredConfigs() { // Pre-populate SQLite with deployed configs await _storage.StoreDeployedConfigAsync("Pump1", MakeConfigJson("Pump1"), "d1", "h1", true); await _storage.StoreDeployedConfigAsync("Pump2", MakeConfigJson("Pump2"), "d2", "h2", true); var actor = CreateDeploymentManager( new SiteRuntimeOptions { StartupBatchSize = 100, StartupBatchDelayMs = 10 }); // Allow time for async startup (load configs + create actors) await Task.Delay(2000); // Verify by deploying -- if actors already exist, we'd get a warning // Instead, verify by checking we can send lifecycle commands actor.Tell(new DisableInstanceCommand("cmd-1", "Pump1", DateTimeOffset.UtcNow)); var response = ExpectMsg(TimeSpan.FromSeconds(5)); Assert.True(response.Success); Assert.Equal("Pump1", response.InstanceUniqueName); } [Fact] public async Task DeploymentManager_SkipsDisabledInstances_OnStartup() { await _storage.StoreDeployedConfigAsync("Active1", MakeConfigJson("Active1"), "d1", "h1", true); await _storage.StoreDeployedConfigAsync("Disabled1", MakeConfigJson("Disabled1"), "d2", "h2", false); var actor = CreateDeploymentManager( new SiteRuntimeOptions { StartupBatchSize = 100, StartupBatchDelayMs = 10 }); await Task.Delay(2000); // The disabled instance should NOT have an actor running // Try to disable it -- it should succeed (no actor to stop, but SQLite update works) actor.Tell(new DisableInstanceCommand("cmd-2", "Disabled1", DateTimeOffset.UtcNow)); var response = ExpectMsg(TimeSpan.FromSeconds(5)); Assert.True(response.Success); } [Fact] public async Task DeploymentManager_StaggeredBatchCreation() { // Create more instances than the batch size for (int i = 0; i < 5; i++) { var name = $"Batch{i}"; await _storage.StoreDeployedConfigAsync(name, MakeConfigJson(name), $"d{i}", $"h{i}", true); } // Use a small batch size to force multiple batches var actor = CreateDeploymentManager( new SiteRuntimeOptions { StartupBatchSize = 2, StartupBatchDelayMs = 50 }); // Wait for all batches to complete (3 batches with 50ms delay = ~150ms + processing) await Task.Delay(3000); // Verify all instances are running by disabling them for (int i = 0; i < 5; i++) { actor.Tell(new DisableInstanceCommand($"cmd-{i}", $"Batch{i}", DateTimeOffset.UtcNow)); var response = ExpectMsg(TimeSpan.FromSeconds(5)); Assert.True(response.Success); } } [Fact] public async Task DeploymentManager_Deploy_CreatesNewInstance() { var actor = CreateDeploymentManager(); await Task.Delay(500); // Wait for empty startup actor.Tell(new DeployInstanceCommand( "dep-100", "NewPump", "sha256:xyz", MakeConfigJson("NewPump"), "admin", DateTimeOffset.UtcNow)); var response = ExpectMsg(TimeSpan.FromSeconds(5)); Assert.Equal(DeploymentStatus.Success, response.Status); Assert.Equal("NewPump", response.InstanceUniqueName); } // ── M1.6: site event log `deployment` category ───────────────────────── [Fact] public async Task DeploymentManager_DeploySuccess_EmitsDeploymentSiteEvent() { var siteLog = new FakeSiteEventLogger(); var actor = CreateDeploymentManager(serviceProvider: new SingleServiceProvider(siteLog)); await Task.Delay(500); // wait for empty startup actor.Tell(new DeployInstanceCommand( "dep-evt-1", "AuditedPump", "sha256:xyz", MakeConfigJson("AuditedPump"), "admin", DateTimeOffset.UtcNow)); var response = ExpectMsg(TimeSpan.FromSeconds(5)); Assert.Equal(DeploymentStatus.Success, response.Status); AwaitAssert(() => { var rows = siteLog.OfType("deployment"); Assert.Contains(rows, r => r.Severity == "Info" && r.InstanceId == "AuditedPump" && r.Source == "DeploymentManagerActor" && r.Message.Contains("deploy", StringComparison.OrdinalIgnoreCase)); }, TimeSpan.FromSeconds(2)); } [Fact] public async Task DeploymentManager_DisableEnableDelete_EmitDeploymentSiteEvents() { var siteLog = new FakeSiteEventLogger(); var actor = CreateDeploymentManager(serviceProvider: new SingleServiceProvider(siteLog)); await Task.Delay(500); actor.Tell(new DeployInstanceCommand( "dep-evt-2", "EvtPump", "sha256:abc", MakeConfigJson("EvtPump"), "admin", DateTimeOffset.UtcNow)); ExpectMsg(TimeSpan.FromSeconds(5)); await Task.Delay(1000); // The deployment site events are emitted fire-and-forget off the actor // thread (LogDeploymentEvent runs in a ContinueWith continuation), so // poll for each event with AwaitAssert rather than a bare Task.Delay — // a fixed sleep is racy under CI load. actor.Tell(new DisableInstanceCommand("cmd-de1", "EvtPump", DateTimeOffset.UtcNow)); Assert.True(ExpectMsg(TimeSpan.FromSeconds(5)).Success); AwaitAssert(() => Assert.Contains(siteLog.OfType("deployment"), r => r.Message.Contains("disabled", StringComparison.OrdinalIgnoreCase)), TimeSpan.FromSeconds(2)); actor.Tell(new EnableInstanceCommand("cmd-en1", "EvtPump", DateTimeOffset.UtcNow)); Assert.True(ExpectMsg(TimeSpan.FromSeconds(5)).Success); AwaitAssert(() => Assert.Contains(siteLog.OfType("deployment"), r => r.Message.Contains("enabled", StringComparison.OrdinalIgnoreCase)), TimeSpan.FromSeconds(2)); actor.Tell(new DeleteInstanceCommand("cmd-del-evt", "EvtPump", DateTimeOffset.UtcNow)); Assert.True(ExpectMsg(TimeSpan.FromSeconds(5)).Success); AwaitAssert(() => Assert.Contains(siteLog.OfType("deployment"), r => r.Message.Contains("deleted", StringComparison.OrdinalIgnoreCase)), TimeSpan.FromSeconds(2)); } [Fact] public async Task DeploymentManager_Lifecycle_DisableEnableDelete() { var actor = CreateDeploymentManager(); await Task.Delay(500); // Deploy actor.Tell(new DeployInstanceCommand( "dep-200", "LifecyclePump", "sha256:abc", MakeConfigJson("LifecyclePump"), "admin", DateTimeOffset.UtcNow)); ExpectMsg(TimeSpan.FromSeconds(5)); // Wait for the async deploy persistence (PipeTo) to complete await Task.Delay(1000); // Disable actor.Tell(new DisableInstanceCommand("cmd-d1", "LifecyclePump", DateTimeOffset.UtcNow)); var disableResp = ExpectMsg(TimeSpan.FromSeconds(5)); Assert.True(disableResp.Success); // Verify disabled in storage await Task.Delay(500); var configs = await _storage.GetAllDeployedConfigsAsync(); var pump = configs.FirstOrDefault(c => c.InstanceUniqueName == "LifecyclePump"); Assert.NotNull(pump); Assert.False(pump.IsEnabled); // Delete actor.Tell(new DeleteInstanceCommand("cmd-del1", "LifecyclePump", DateTimeOffset.UtcNow)); var deleteResp = ExpectMsg(TimeSpan.FromSeconds(5)); Assert.True(deleteResp.Success); // Verify removed from storage await Task.Delay(500); configs = await _storage.GetAllDeployedConfigsAsync(); Assert.DoesNotContain(configs, c => c.InstanceUniqueName == "LifecyclePump"); } // ── DeploymentManager-006: query-the-site-before-redeploy ── [Fact] public async Task DeploymentStateQuery_DeployedInstance_ReturnsAppliedIdentity() { // A deployed instance must report its currently-applied deployment ID // and revision hash so central can reconcile before a re-deploy. await _storage.StoreDeployedConfigAsync( "QueriedPump", MakeConfigJson("QueriedPump"), "dep-applied", "sha256:applied", true); var actor = CreateDeploymentManager(); await Task.Delay(2000); // allow startup to load configs actor.Tell(new DeploymentStateQueryRequest("corr-q1", "QueriedPump", DateTimeOffset.UtcNow)); var response = ExpectMsg(TimeSpan.FromSeconds(5)); Assert.Equal("corr-q1", response.CorrelationId); Assert.Equal("QueriedPump", response.InstanceUniqueName); Assert.True(response.IsDeployed); Assert.Equal("dep-applied", response.AppliedDeploymentId); Assert.Equal("sha256:applied", response.AppliedRevisionHash); } [Fact] public async Task DeploymentStateQuery_UnknownInstance_ReturnsNotDeployed() { // An instance the site has never received a deployment for must report // IsDeployed=false with null applied identity. var actor = CreateDeploymentManager(); await Task.Delay(500); actor.Tell(new DeploymentStateQueryRequest("corr-q2", "NeverDeployed", DateTimeOffset.UtcNow)); var response = ExpectMsg(TimeSpan.FromSeconds(5)); Assert.Equal("corr-q2", response.CorrelationId); Assert.False(response.IsDeployed); Assert.Null(response.AppliedDeploymentId); Assert.Null(response.AppliedRevisionHash); } [Fact] public void DeploymentManager_SupervisionStrategy_ResumesOnException() { var actor = CreateDeploymentManager(); // The actor exists and is responsive -- supervision is configured actor.Tell(new DeployInstanceCommand( "dep-sup", "SupervisedPump", "sha256:sup", MakeConfigJson("SupervisedPump"), "admin", DateTimeOffset.UtcNow)); var response = ExpectMsg(TimeSpan.FromSeconds(5)); Assert.Equal(DeploymentStatus.Success, response.Status); } // ── Audit Log #23 (ParentExecutionId, Task 4): inbound-API routing ── [Fact] public async Task RouteInboundApiCall_WithParentExecutionId_RoutesToScriptSuccessfully() { // A RouteToCallRequest carrying a ParentExecutionId (the inbound // request's ExecutionId) must be mapped to a ScriptCallRequest and // routed end-to-end through the Instance/Script/ScriptExecution actors. // The additive ParentExecutionId field must not break that routing. var actor = CreateDeploymentManager(); await Task.Delay(500); // empty startup actor.Tell(new DeployInstanceCommand( "dep-route", "RoutedPump", "sha256:route", MakeConfigWithScriptJson("RoutedPump", "DoWork"), "admin", DateTimeOffset.UtcNow)); ExpectMsg(TimeSpan.FromSeconds(5)); await Task.Delay(1000); // let the InstanceActor + ScriptActor spin up var parentExecutionId = Guid.NewGuid(); actor.Tell(new RouteToCallRequest( "route-corr-1", "RoutedPump", "DoWork", Parameters: null, DateTimeOffset.UtcNow, ParentExecutionId: parentExecutionId)); var response = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.Equal("route-corr-1", response.CorrelationId); Assert.True(response.Success, $"Routed call failed: {response.ErrorMessage}"); Assert.Equal(7, Convert.ToInt32(response.ReturnValue)); } [Fact] public async Task RouteInboundApiCall_WithoutParentExecutionId_StillRoutes() { // A routed call with no ParentExecutionId (e.g. the Central UI sandbox) // is the additive-default path — it must route exactly as before. var actor = CreateDeploymentManager(); await Task.Delay(500); actor.Tell(new DeployInstanceCommand( "dep-route2", "RoutedPump2", "sha256:route2", MakeConfigWithScriptJson("RoutedPump2", "DoWork"), "admin", DateTimeOffset.UtcNow)); ExpectMsg(TimeSpan.FromSeconds(5)); await Task.Delay(1000); // No ParentExecutionId argument — exercises the additive `= null` default. actor.Tell(new RouteToCallRequest( "route-corr-2", "RoutedPump2", "DoWork", Parameters: null, DateTimeOffset.UtcNow)); var response = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.Equal("route-corr-2", response.CorrelationId); Assert.True(response.Success, $"Routed call failed: {response.ErrorMessage}"); } // ── Spec §6 (WD-2b): routed RouteToWaitForAttributeRequest → InstanceActor ── [Fact] public async Task RouteInboundApiWaitForAttribute_AttributeAlreadyAtTarget_RepliesMatched() { // A routed wait whose target equals the instance's current (static) // attribute value must satisfy the InstanceActor fast-path and come back // Success:true, Matched:true with the matched value/quality. var actor = CreateDeploymentManager(); await Task.Delay(500); // empty startup // MakeConfigJson seeds a scalar static attribute "TestAttr" = "42" (Good). actor.Tell(new DeployInstanceCommand( "dep-wait", "WaitPump", "sha256:wait", MakeConfigJson("WaitPump"), "admin", DateTimeOffset.UtcNow)); ExpectMsg(TimeSpan.FromSeconds(5)); await Task.Delay(1000); // let the InstanceActor spin up + load static attrs // Encode the target the same way the InstanceActor encodes the current // value for its codec-equality match (value-equality only across the wire). var encodedTarget = AttributeValueCodec.Encode("42"); actor.Tell(new RouteToWaitForAttributeRequest( "wait-corr-1", "WaitPump", "TestAttr", encodedTarget, TimeSpan.FromSeconds(5), DateTimeOffset.UtcNow)); var response = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.Equal("wait-corr-1", response.CorrelationId); Assert.True(response.Success, $"Routed wait failed: {response.ErrorMessage}"); Assert.True(response.Matched, "Expected fast-path match (attribute already at target)."); Assert.False(response.TimedOut); Assert.Equal("42", response.Value); Assert.Equal("Good", response.Quality); } [Fact] public async Task RouteInboundApiWaitForAttribute_UnknownInstance_RepliesNotFound() { // A routed wait for an instance that was never deployed to this site must // come back Success:false with a not-found message (routing-level outcome), // mirroring the other RouteTo* unknown-instance paths. var actor = CreateDeploymentManager(); await Task.Delay(500); actor.Tell(new RouteToWaitForAttributeRequest( "wait-corr-2", "NeverDeployedWait", "TestAttr", AttributeValueCodec.Encode("42"), TimeSpan.FromSeconds(5), DateTimeOffset.UtcNow)); var response = ExpectMsg(TimeSpan.FromSeconds(5)); Assert.Equal("wait-corr-2", response.CorrelationId); Assert.False(response.Success); Assert.False(response.Matched); Assert.NotNull(response.ErrorMessage); Assert.Contains("not found", response.ErrorMessage!, StringComparison.OrdinalIgnoreCase); } [Fact] public async Task RouteInboundApiWaitForAttribute_ListValuedMatch_ReturnsSerializerSafeValue() { // WS-4: a routed wait whose matched attribute is a List-typed value comes // back from the Instance Actor as a concrete generic List — a shape the // cross-process (Newtonsoft) serializer cannot reliably round-trip, which would // otherwise silently drop the reply and hang the caller's Ask. The handler must // normalize the value (via the same NormalizeRoutedReturnValue projection that // RouteInboundApiCall uses) to a plain CLR graph that survives transport while // preserving the matched elements. var actor = CreateDeploymentManager(); await Task.Delay(500); // empty startup // Static List attribute "Setpoints" = [10,20,30] (decoded to List, Good). actor.Tell(new DeployInstanceCommand( "dep-wait-list", "WaitPumpList", "sha256:wait-list", MakeConfigWithListAttributeJson("WaitPumpList"), "admin", DateTimeOffset.UtcNow)); ExpectMsg(TimeSpan.FromSeconds(5)); await Task.Delay(1000); // let the InstanceActor spin up + decode the List default // Encode the target the same canonical way the Instance Actor encodes the // current List for its codec-equality match (value-equality across the wire). var encodedTarget = AttributeValueCodec.Encode(new List { 10, 20, 30 }); actor.Tell(new RouteToWaitForAttributeRequest( "wait-corr-list", "WaitPumpList", "Setpoints", encodedTarget, TimeSpan.FromSeconds(5), DateTimeOffset.UtcNow)); var response = ExpectMsg(TimeSpan.FromSeconds(10)); Assert.Equal("wait-corr-list", response.CorrelationId); Assert.True(response.Success, $"Routed wait failed: {response.ErrorMessage}"); Assert.True(response.Matched, "Expected fast-path match (List attribute already at target)."); Assert.False(response.TimedOut); Assert.Equal("Good", response.Quality); // The matched value must have been projected to a serializer-safe graph: // a plain List of primitives — NOT the Instance Actor's concrete // List (which Akka's cross-process serializer drops). The JSON round-trip // re-materializes the numbers as boxed primitives; assert numeric equality // without pinning the exact CLR numeric type (long vs double is an artifact). Assert.NotNull(response.Value); var list = Assert.IsType>(response.Value); Assert.Equal( new[] { 10.0, 20.0, 30.0 }, list.Select(e => Convert.ToDouble(e, System.Globalization.CultureInfo.InvariantCulture))); } // ── M2.11: Debug-view routing — unknown-instance not-found signal ── [Fact] public async Task RouteDebugSnapshot_UnknownInstance_SetsInstanceNotFound() { // An instance that was never deployed to this site must return a // DebugViewSnapshot with InstanceNotFound=true so the caller can // distinguish "not deployed here" from a deployed-but-empty instance. var actor = CreateDeploymentManager(); await Task.Delay(500); actor.Tell(new DebugSnapshotRequest("NeverDeployed", "snap-corr-1")); var snapshot = ExpectMsg(TimeSpan.FromSeconds(5)); Assert.Equal("NeverDeployed", snapshot.InstanceUniqueName); Assert.True(snapshot.InstanceNotFound, "Expected InstanceNotFound=true for an instance not registered on this site."); Assert.Empty(snapshot.AttributeValues); Assert.Empty(snapshot.AlarmStates); } [Fact] public async Task RouteDebugViewSubscribe_UnknownInstance_SetsInstanceNotFound() { // Same contract for the subscribe path: unknown instance → InstanceNotFound=true. var actor = CreateDeploymentManager(); await Task.Delay(500); actor.Tell(new SubscribeDebugViewRequest("NeverDeployed2", "sub-corr-1")); var snapshot = ExpectMsg(TimeSpan.FromSeconds(5)); Assert.Equal("NeverDeployed2", snapshot.InstanceUniqueName); Assert.True(snapshot.InstanceNotFound, "Expected InstanceNotFound=true for an instance not registered on this site."); Assert.Empty(snapshot.AttributeValues); Assert.Empty(snapshot.AlarmStates); } [Fact] public async Task RouteDebugSnapshot_KnownButEmptyInstance_DoesNotSetInstanceNotFound() { // A deployed instance with no runtime data yet must return an empty // snapshot with InstanceNotFound=false — the known-empty path is unchanged. var actor = CreateDeploymentManager(); await Task.Delay(500); actor.Tell(new DeployInstanceCommand( "dep-dbg", "EmptyPump", "sha256:dbg", MakeConfigJson("EmptyPump"), "admin", DateTimeOffset.UtcNow)); ExpectMsg(TimeSpan.FromSeconds(5)); await Task.Delay(800); // let InstanceActor spin up actor.Tell(new DebugSnapshotRequest("EmptyPump", "snap-corr-2")); var snapshot = ExpectMsg(TimeSpan.FromSeconds(5)); Assert.Equal("EmptyPump", snapshot.InstanceUniqueName); Assert.False(snapshot.InstanceNotFound, "A deployed (but empty) instance must NOT set InstanceNotFound."); } }