diff --git a/src/ScadaLink.Commons/Entities/Deployment/DeployedConfigSnapshot.cs b/src/ScadaLink.Commons/Entities/Deployment/DeployedConfigSnapshot.cs new file mode 100644 index 0000000..a16a198 --- /dev/null +++ b/src/ScadaLink.Commons/Entities/Deployment/DeployedConfigSnapshot.cs @@ -0,0 +1,27 @@ +namespace ScadaLink.Commons.Entities.Deployment; + +/// +/// WP-8: Stores the deployed configuration snapshot for an instance. +/// Captured at deploy time; compared against template-derived (live flattened) config for staleness detection. +/// +public class DeployedConfigSnapshot +{ + public int Id { get; set; } + public int InstanceId { get; set; } + public string DeploymentId { get; set; } + public string RevisionHash { get; set; } + + /// + /// JSON-serialized FlattenedConfiguration captured at deploy time. + /// + public string ConfigurationJson { get; set; } + + public DateTimeOffset DeployedAt { get; set; } + + public DeployedConfigSnapshot(string deploymentId, string revisionHash, string configurationJson) + { + DeploymentId = deploymentId ?? throw new ArgumentNullException(nameof(deploymentId)); + RevisionHash = revisionHash ?? throw new ArgumentNullException(nameof(revisionHash)); + ConfigurationJson = configurationJson ?? throw new ArgumentNullException(nameof(configurationJson)); + } +} diff --git a/src/ScadaLink.Commons/Entities/Deployment/DeploymentRecord.cs b/src/ScadaLink.Commons/Entities/Deployment/DeploymentRecord.cs index d0c95ed..d7d221b 100644 --- a/src/ScadaLink.Commons/Entities/Deployment/DeploymentRecord.cs +++ b/src/ScadaLink.Commons/Entities/Deployment/DeploymentRecord.cs @@ -12,6 +12,12 @@ public class DeploymentRecord public string DeployedBy { get; set; } public DateTimeOffset DeployedAt { get; set; } public DateTimeOffset? CompletedAt { get; set; } + public string? ErrorMessage { get; set; } + + /// + /// WP-4: Optimistic concurrency token for deployment status updates. + /// + public byte[] RowVersion { get; set; } = []; public DeploymentRecord(string deploymentId, string deployedBy) { diff --git a/src/ScadaLink.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs b/src/ScadaLink.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs index ebe597a..ba3314e 100644 --- a/src/ScadaLink.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs +++ b/src/ScadaLink.Commons/Interfaces/Repositories/IDeploymentManagerRepository.cs @@ -1,4 +1,5 @@ using ScadaLink.Commons.Entities.Deployment; +using ScadaLink.Commons.Entities.Instances; using ScadaLink.Commons.Types.Enums; namespace ScadaLink.Commons.Interfaces.Repositories; @@ -22,5 +23,16 @@ public interface IDeploymentManagerRepository Task UpdateSystemArtifactDeploymentAsync(SystemArtifactDeploymentRecord record, CancellationToken cancellationToken = default); Task DeleteSystemArtifactDeploymentAsync(int id, CancellationToken cancellationToken = default); + // WP-8: DeployedConfigSnapshot + Task GetDeployedSnapshotByInstanceIdAsync(int instanceId, CancellationToken cancellationToken = default); + Task AddDeployedSnapshotAsync(DeployedConfigSnapshot snapshot, CancellationToken cancellationToken = default); + Task UpdateDeployedSnapshotAsync(DeployedConfigSnapshot snapshot, CancellationToken cancellationToken = default); + Task DeleteDeployedSnapshotAsync(int instanceId, CancellationToken cancellationToken = default); + + // Instance lookups for deployment pipeline + Task GetInstanceByIdAsync(int instanceId, CancellationToken cancellationToken = default); + Task GetInstanceByUniqueNameAsync(string uniqueName, CancellationToken cancellationToken = default); + Task UpdateInstanceAsync(Instance instance, CancellationToken cancellationToken = default); + Task SaveChangesAsync(CancellationToken cancellationToken = default); } diff --git a/src/ScadaLink.Commons/Types/Enums/InstanceState.cs b/src/ScadaLink.Commons/Types/Enums/InstanceState.cs index a331955..fa51156 100644 --- a/src/ScadaLink.Commons/Types/Enums/InstanceState.cs +++ b/src/ScadaLink.Commons/Types/Enums/InstanceState.cs @@ -2,6 +2,7 @@ namespace ScadaLink.Commons.Types.Enums; public enum InstanceState { + NotDeployed, Enabled, Disabled } diff --git a/src/ScadaLink.Commons/Types/Enums/StoreAndForwardCategory.cs b/src/ScadaLink.Commons/Types/Enums/StoreAndForwardCategory.cs new file mode 100644 index 0000000..5f38c01 --- /dev/null +++ b/src/ScadaLink.Commons/Types/Enums/StoreAndForwardCategory.cs @@ -0,0 +1,11 @@ +namespace ScadaLink.Commons.Types.Enums; + +/// +/// WP-9: Categories for store-and-forward messages. +/// +public enum StoreAndForwardCategory +{ + ExternalSystem, + Notification, + CachedDbWrite +} diff --git a/src/ScadaLink.Commons/Types/Enums/StoreAndForwardMessageStatus.cs b/src/ScadaLink.Commons/Types/Enums/StoreAndForwardMessageStatus.cs new file mode 100644 index 0000000..91ff6de --- /dev/null +++ b/src/ScadaLink.Commons/Types/Enums/StoreAndForwardMessageStatus.cs @@ -0,0 +1,12 @@ +namespace ScadaLink.Commons.Types.Enums; + +/// +/// WP-9: Status of a store-and-forward message. +/// +public enum StoreAndForwardMessageStatus +{ + Pending, + InFlight, + Parked, + Delivered +} diff --git a/src/ScadaLink.ConfigurationDatabase/Configurations/DeploymentConfiguration.cs b/src/ScadaLink.ConfigurationDatabase/Configurations/DeploymentConfiguration.cs index 0747028..4441be8 100644 --- a/src/ScadaLink.ConfigurationDatabase/Configurations/DeploymentConfiguration.cs +++ b/src/ScadaLink.ConfigurationDatabase/Configurations/DeploymentConfiguration.cs @@ -41,6 +41,33 @@ public class DeploymentRecordConfiguration : IEntityTypeConfiguration +{ + public void Configure(EntityTypeBuilder builder) + { + builder.HasKey(s => s.Id); + + builder.Property(s => s.DeploymentId) + .IsRequired() + .HasMaxLength(100); + + builder.Property(s => s.RevisionHash) + .IsRequired() + .HasMaxLength(100); + + builder.Property(s => s.ConfigurationJson) + .IsRequired(); + + builder.HasOne() + .WithMany() + .HasForeignKey(s => s.InstanceId) + .OnDelete(DeleteBehavior.Cascade); + + builder.HasIndex(s => s.InstanceId).IsUnique(); + builder.HasIndex(s => s.DeploymentId); + } +} + public class SystemArtifactDeploymentRecordConfiguration : IEntityTypeConfiguration { public void Configure(EntityTypeBuilder builder) diff --git a/src/ScadaLink.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs b/src/ScadaLink.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs index d9aaf3c..949eafb 100644 --- a/src/ScadaLink.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs +++ b/src/ScadaLink.ConfigurationDatabase/Repositories/DeploymentManagerRepository.cs @@ -1,5 +1,6 @@ using Microsoft.EntityFrameworkCore; using ScadaLink.Commons.Entities.Deployment; +using ScadaLink.Commons.Entities.Instances; using ScadaLink.Commons.Interfaces.Repositories; namespace ScadaLink.ConfigurationDatabase.Repositories; @@ -133,6 +134,59 @@ public class DeploymentManagerRepository : IDeploymentManagerRepository return Task.CompletedTask; } + // --- WP-8: DeployedConfigSnapshot --- + + public async Task GetDeployedSnapshotByInstanceIdAsync(int instanceId, CancellationToken cancellationToken = default) + { + return await _dbContext.Set() + .FirstOrDefaultAsync(s => s.InstanceId == instanceId, cancellationToken); + } + + public async Task AddDeployedSnapshotAsync(DeployedConfigSnapshot snapshot, CancellationToken cancellationToken = default) + { + await _dbContext.Set().AddAsync(snapshot, cancellationToken); + } + + public Task UpdateDeployedSnapshotAsync(DeployedConfigSnapshot snapshot, CancellationToken cancellationToken = default) + { + _dbContext.Set().Update(snapshot); + return Task.CompletedTask; + } + + public async Task DeleteDeployedSnapshotAsync(int instanceId, CancellationToken cancellationToken = default) + { + var snapshot = await _dbContext.Set() + .FirstOrDefaultAsync(s => s.InstanceId == instanceId, cancellationToken); + if (snapshot != null) + { + _dbContext.Set().Remove(snapshot); + } + } + + // --- Instance lookups for deployment pipeline --- + + public async Task GetInstanceByIdAsync(int instanceId, CancellationToken cancellationToken = default) + { + return await _dbContext.Set() + .Include(i => i.AttributeOverrides) + .Include(i => i.ConnectionBindings) + .FirstOrDefaultAsync(i => i.Id == instanceId, cancellationToken); + } + + public async Task GetInstanceByUniqueNameAsync(string uniqueName, CancellationToken cancellationToken = default) + { + return await _dbContext.Set() + .Include(i => i.AttributeOverrides) + .Include(i => i.ConnectionBindings) + .FirstOrDefaultAsync(i => i.UniqueName == uniqueName, cancellationToken); + } + + public Task UpdateInstanceAsync(Instance instance, CancellationToken cancellationToken = default) + { + _dbContext.Set().Update(instance); + return Task.CompletedTask; + } + public async Task SaveChangesAsync(CancellationToken cancellationToken = default) { return await _dbContext.SaveChangesAsync(cancellationToken); diff --git a/src/ScadaLink.ConfigurationDatabase/ScadaLinkDbContext.cs b/src/ScadaLink.ConfigurationDatabase/ScadaLinkDbContext.cs index e7596b5..fddb490 100644 --- a/src/ScadaLink.ConfigurationDatabase/ScadaLinkDbContext.cs +++ b/src/ScadaLink.ConfigurationDatabase/ScadaLinkDbContext.cs @@ -40,6 +40,7 @@ public class ScadaLinkDbContext : DbContext, IDataProtectionKeyContext // Deployment public DbSet DeploymentRecords => Set(); public DbSet SystemArtifactDeploymentRecords => Set(); + public DbSet DeployedConfigSnapshots => Set(); // External Systems public DbSet ExternalSystemDefinitions => Set(); diff --git a/src/ScadaLink.DeploymentManager/ArtifactDeploymentService.cs b/src/ScadaLink.DeploymentManager/ArtifactDeploymentService.cs new file mode 100644 index 0000000..15571bd --- /dev/null +++ b/src/ScadaLink.DeploymentManager/ArtifactDeploymentService.cs @@ -0,0 +1,178 @@ +using System.Text.Json; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using ScadaLink.Commons.Entities.Deployment; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Messages.Artifacts; +using ScadaLink.Commons.Types; +using ScadaLink.Communication; + +namespace ScadaLink.DeploymentManager; + +/// +/// WP-7: System-wide artifact deployment. +/// Broadcasts artifacts (shared scripts, external systems, notification lists, DB connections) +/// to all sites with per-site tracking. +/// +/// - Successful sites are NOT rolled back on other failures. +/// - Failed sites are retryable individually. +/// - 120s timeout per site. +/// - Cross-site version skew is supported. +/// +public class ArtifactDeploymentService +{ + private readonly ISiteRepository _siteRepo; + private readonly IDeploymentManagerRepository _deploymentRepo; + private readonly CommunicationService _communicationService; + private readonly IAuditService _auditService; + private readonly DeploymentManagerOptions _options; + private readonly ILogger _logger; + + public ArtifactDeploymentService( + ISiteRepository siteRepo, + IDeploymentManagerRepository deploymentRepo, + CommunicationService communicationService, + IAuditService auditService, + IOptions options, + ILogger logger) + { + _siteRepo = siteRepo; + _deploymentRepo = deploymentRepo; + _communicationService = communicationService; + _auditService = auditService; + _options = options.Value; + _logger = logger; + } + + /// + /// Deploys artifacts to all sites. Returns per-site result matrix. + /// + public async Task> DeployToAllSitesAsync( + DeployArtifactsCommand command, + string user, + CancellationToken cancellationToken = default) + { + var sites = await _siteRepo.GetAllSitesAsync(cancellationToken); + if (sites.Count == 0) + return Result.Failure("No sites configured."); + + var perSiteResults = new Dictionary(); + + // Deploy to each site with per-site timeout + var tasks = sites.Select(async site => + { + try + { + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(_options.ArtifactDeploymentTimeoutPerSite); + + _logger.LogInformation( + "Deploying artifacts to site {SiteId} ({SiteName}), deploymentId={DeploymentId}", + site.SiteIdentifier, site.Name, command.DeploymentId); + + var response = await _communicationService.DeployArtifactsAsync( + site.SiteIdentifier, command, cts.Token); + + return new SiteArtifactResult( + site.SiteIdentifier, site.Name, response.Success, response.ErrorMessage); + } + catch (Exception ex) when (ex is TimeoutException or OperationCanceledException or TaskCanceledException) + { + _logger.LogWarning( + "Artifact deployment to site {SiteId} timed out: {Error}", + site.SiteIdentifier, ex.Message); + + return new SiteArtifactResult( + site.SiteIdentifier, site.Name, false, $"Timeout: {ex.Message}"); + } + catch (Exception ex) + { + _logger.LogError(ex, + "Artifact deployment to site {SiteId} failed", + site.SiteIdentifier); + + return new SiteArtifactResult( + site.SiteIdentifier, site.Name, false, ex.Message); + } + }).ToList(); + + var results = await Task.WhenAll(tasks); + + foreach (var result in results) + { + perSiteResults[result.SiteId] = result; + } + + // Persist the system artifact deployment record + var record = new SystemArtifactDeploymentRecord("Artifacts", user) + { + DeployedAt = DateTimeOffset.UtcNow, + PerSiteStatus = JsonSerializer.Serialize(perSiteResults) + }; + await _deploymentRepo.AddSystemArtifactDeploymentAsync(record, cancellationToken); + await _deploymentRepo.SaveChangesAsync(cancellationToken); + + var summary = new ArtifactDeploymentSummary( + command.DeploymentId, + results.ToList(), + results.Count(r => r.Success), + results.Count(r => !r.Success)); + + await _auditService.LogAsync(user, "DeployArtifacts", "SystemArtifact", + command.DeploymentId, "Artifacts", + new { summary.SuccessCount, summary.FailureCount }, + cancellationToken); + + return Result.Success(summary); + } + + /// + /// WP-7: Retry artifact deployment to a specific site that previously failed. + /// + public async Task> RetryForSiteAsync( + string siteId, + DeployArtifactsCommand command, + string user, + CancellationToken cancellationToken = default) + { + try + { + using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); + cts.CancelAfter(_options.ArtifactDeploymentTimeoutPerSite); + + var response = await _communicationService.DeployArtifactsAsync(siteId, command, cts.Token); + + var result = new SiteArtifactResult(siteId, siteId, response.Success, response.ErrorMessage); + + await _auditService.LogAsync(user, "RetryArtifactDeployment", "SystemArtifact", + command.DeploymentId, siteId, new { response.Success }, cancellationToken); + + return response.Success + ? Result.Success(result) + : Result.Failure(response.ErrorMessage ?? "Retry failed."); + } + catch (Exception ex) + { + return Result.Failure($"Retry failed for site {siteId}: {ex.Message}"); + } + } +} + +/// +/// WP-7: Per-site result for artifact deployment. +/// +public record SiteArtifactResult( + string SiteId, + string SiteName, + bool Success, + string? ErrorMessage); + +/// +/// WP-7: Summary of system-wide artifact deployment with per-site results. +/// +public record ArtifactDeploymentSummary( + string DeploymentId, + IReadOnlyList SiteResults, + int SuccessCount, + int FailureCount); diff --git a/src/ScadaLink.DeploymentManager/DeploymentManagerOptions.cs b/src/ScadaLink.DeploymentManager/DeploymentManagerOptions.cs new file mode 100644 index 0000000..61cc382 --- /dev/null +++ b/src/ScadaLink.DeploymentManager/DeploymentManagerOptions.cs @@ -0,0 +1,16 @@ +namespace ScadaLink.DeploymentManager; + +/// +/// Configuration options for the central-side Deployment Manager. +/// +public class DeploymentManagerOptions +{ + /// Timeout for lifecycle commands sent to sites (disable, enable, delete). + public TimeSpan LifecycleCommandTimeout { get; set; } = TimeSpan.FromSeconds(30); + + /// WP-7: Timeout per site for system-wide artifact deployment. + public TimeSpan ArtifactDeploymentTimeoutPerSite { get; set; } = TimeSpan.FromSeconds(120); + + /// WP-3: Timeout for acquiring an operation lock on an instance. + public TimeSpan OperationLockTimeout { get; set; } = TimeSpan.FromSeconds(5); +} diff --git a/src/ScadaLink.DeploymentManager/DeploymentService.cs b/src/ScadaLink.DeploymentManager/DeploymentService.cs new file mode 100644 index 0000000..83cd204 --- /dev/null +++ b/src/ScadaLink.DeploymentManager/DeploymentService.cs @@ -0,0 +1,393 @@ +using System.Text.Json; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using ScadaLink.Commons.Entities.Deployment; +using ScadaLink.Commons.Entities.Instances; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Messages.Deployment; +using ScadaLink.Commons.Messages.Lifecycle; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.Commons.Types.Flattening; +using ScadaLink.Communication; +using ScadaLink.TemplateEngine.Flattening; +using ScadaLink.TemplateEngine.Validation; + +namespace ScadaLink.DeploymentManager; + +/// +/// WP-1: Central-side deployment orchestration service. +/// Coordinates the full deployment pipeline: +/// 1. Validate instance state transition (WP-4) +/// 2. Acquire per-instance operation lock (WP-3) +/// 3. Flatten configuration via TemplateEngine (captures template state at time of flatten -- WP-16) +/// 4. Validate flattened configuration +/// 5. Compute revision hash and diff +/// 6. Send DeployInstanceCommand to site via CommunicationService +/// 7. Track deployment status with optimistic concurrency (WP-4) +/// 8. Store deployed config snapshot (WP-8) +/// 9. Audit log all actions +/// +/// WP-2: Each deployment has a unique deployment ID (GUID) + revision hash. +/// WP-16: Template state captured at flatten time -- last-write-wins on templates is safe. +/// +public class DeploymentService +{ + private readonly IDeploymentManagerRepository _repository; + private readonly IFlatteningPipeline _flatteningPipeline; + private readonly CommunicationService _communicationService; + private readonly OperationLockManager _lockManager; + private readonly IAuditService _auditService; + private readonly DeploymentManagerOptions _options; + private readonly ILogger _logger; + + public DeploymentService( + IDeploymentManagerRepository repository, + IFlatteningPipeline flatteningPipeline, + CommunicationService communicationService, + OperationLockManager lockManager, + IAuditService auditService, + IOptions options, + ILogger logger) + { + _repository = repository; + _flatteningPipeline = flatteningPipeline; + _communicationService = communicationService; + _lockManager = lockManager; + _auditService = auditService; + _options = options.Value; + _logger = logger; + } + + /// + /// WP-1: Deploy an instance to its site. + /// WP-2: Generates unique deployment ID, computes revision hash. + /// WP-4: Validates state transitions, uses optimistic concurrency. + /// WP-5: Site-side apply is all-or-nothing (handled by DeploymentManagerActor). + /// WP-8: Stores deployed config snapshot on success. + /// WP-16: Captures template state at time of flatten. + /// + public async Task> DeployInstanceAsync( + int instanceId, + string user, + CancellationToken cancellationToken = default) + { + // Load instance + var instance = await _repository.GetInstanceByIdAsync(instanceId, cancellationToken); + if (instance == null) + return Result.Failure($"Instance with ID {instanceId} not found."); + + // WP-4: Validate state transition + var transitionError = StateTransitionValidator.ValidateTransition(instance.State, "deploy"); + if (transitionError != null) + return Result.Failure(transitionError); + + // WP-3: Acquire per-instance operation lock + using var lockHandle = await _lockManager.AcquireAsync( + instance.UniqueName, _options.OperationLockTimeout, cancellationToken); + + // WP-2: Generate unique deployment ID + var deploymentId = Guid.NewGuid().ToString("N"); + + // WP-1/16: Flatten configuration (captures template state at this point in time) + var flattenResult = await _flatteningPipeline.FlattenAndValidateAsync(instanceId, cancellationToken); + if (flattenResult.IsFailure) + return Result.Failure($"Validation failed: {flattenResult.Error}"); + + var flattenedConfig = flattenResult.Value.Configuration; + var revisionHash = flattenResult.Value.RevisionHash; + var validationResult = flattenResult.Value.Validation; + + if (!validationResult.IsValid) + { + var errors = string.Join("; ", validationResult.Errors.Select(e => e.Message)); + return Result.Failure($"Pre-deployment validation failed: {errors}"); + } + + // Serialize for transmission + var configJson = JsonSerializer.Serialize(flattenedConfig); + + // WP-4: Create deployment record with Pending status + var record = new DeploymentRecord(deploymentId, user) + { + InstanceId = instanceId, + Status = DeploymentStatus.Pending, + RevisionHash = revisionHash, + DeployedAt = DateTimeOffset.UtcNow + }; + + await _repository.AddDeploymentRecordAsync(record, cancellationToken); + await _repository.SaveChangesAsync(cancellationToken); + + // Update status to InProgress + record.Status = DeploymentStatus.InProgress; + await _repository.UpdateDeploymentRecordAsync(record, cancellationToken); + await _repository.SaveChangesAsync(cancellationToken); + + try + { + // WP-1: Send to site via CommunicationService + var siteId = instance.SiteId.ToString(); + var command = new DeployInstanceCommand( + deploymentId, instance.UniqueName, revisionHash, configJson, user, DateTimeOffset.UtcNow); + + _logger.LogInformation( + "Sending deployment {DeploymentId} for instance {Instance} to site {SiteId}", + deploymentId, instance.UniqueName, siteId); + + var response = await _communicationService.DeployInstanceAsync(siteId, command, cancellationToken); + + // WP-1: Update status based on site response + record.Status = response.Status; + record.ErrorMessage = response.ErrorMessage; + record.CompletedAt = DateTimeOffset.UtcNow; + await _repository.UpdateDeploymentRecordAsync(record, cancellationToken); + + if (response.Status == DeploymentStatus.Success) + { + // WP-4: Update instance state to Enabled on successful deployment + instance.State = InstanceState.Enabled; + await _repository.UpdateInstanceAsync(instance, cancellationToken); + + // WP-8: Store deployed config snapshot + await StoreDeployedSnapshotAsync(instanceId, deploymentId, revisionHash, configJson, cancellationToken); + } + + await _repository.SaveChangesAsync(cancellationToken); + + // Audit log + await _auditService.LogAsync(user, "Deploy", "Instance", instanceId.ToString(), + instance.UniqueName, new { DeploymentId = deploymentId, Status = record.Status.ToString() }, + cancellationToken); + + _logger.LogInformation( + "Deployment {DeploymentId} for instance {Instance}: {Status}", + deploymentId, instance.UniqueName, record.Status); + + return record.Status == DeploymentStatus.Success + ? Result.Success(record) + : Result.Failure( + $"Deployment failed: {response.ErrorMessage ?? "Unknown error"}"); + } + catch (Exception ex) when (ex is TimeoutException or OperationCanceledException) + { + record.Status = DeploymentStatus.Failed; + record.ErrorMessage = $"Communication failure: {ex.Message}"; + record.CompletedAt = DateTimeOffset.UtcNow; + await _repository.UpdateDeploymentRecordAsync(record, cancellationToken); + await _repository.SaveChangesAsync(cancellationToken); + + await _auditService.LogAsync(user, "DeployFailed", "Instance", instanceId.ToString(), + instance.UniqueName, new { DeploymentId = deploymentId, Error = ex.Message }, + cancellationToken); + + return Result.Failure($"Deployment timed out: {ex.Message}"); + } + } + + /// + /// WP-6: Disable an instance. Stops Instance Actor, retains config, S&F drains. + /// + public async Task> DisableInstanceAsync( + int instanceId, + string user, + CancellationToken cancellationToken = default) + { + var instance = await _repository.GetInstanceByIdAsync(instanceId, cancellationToken); + if (instance == null) + return Result.Failure($"Instance with ID {instanceId} not found."); + + var transitionError = StateTransitionValidator.ValidateTransition(instance.State, "disable"); + if (transitionError != null) + return Result.Failure(transitionError); + + using var lockHandle = await _lockManager.AcquireAsync( + instance.UniqueName, _options.OperationLockTimeout, cancellationToken); + + var commandId = Guid.NewGuid().ToString("N"); + var siteId = instance.SiteId.ToString(); + var command = new DisableInstanceCommand(commandId, instance.UniqueName, DateTimeOffset.UtcNow); + + var response = await _communicationService.DisableInstanceAsync(siteId, command, cancellationToken); + + if (response.Success) + { + instance.State = InstanceState.Disabled; + await _repository.UpdateInstanceAsync(instance, cancellationToken); + await _repository.SaveChangesAsync(cancellationToken); + } + + await _auditService.LogAsync(user, "Disable", "Instance", instanceId.ToString(), + instance.UniqueName, new { CommandId = commandId, response.Success }, + cancellationToken); + + return response.Success + ? Result.Success(response) + : Result.Failure(response.ErrorMessage ?? "Disable failed."); + } + + /// + /// WP-6: Enable an instance. Re-creates Instance Actor from stored config. + /// + public async Task> EnableInstanceAsync( + int instanceId, + string user, + CancellationToken cancellationToken = default) + { + var instance = await _repository.GetInstanceByIdAsync(instanceId, cancellationToken); + if (instance == null) + return Result.Failure($"Instance with ID {instanceId} not found."); + + var transitionError = StateTransitionValidator.ValidateTransition(instance.State, "enable"); + if (transitionError != null) + return Result.Failure(transitionError); + + using var lockHandle = await _lockManager.AcquireAsync( + instance.UniqueName, _options.OperationLockTimeout, cancellationToken); + + var commandId = Guid.NewGuid().ToString("N"); + var siteId = instance.SiteId.ToString(); + var command = new EnableInstanceCommand(commandId, instance.UniqueName, DateTimeOffset.UtcNow); + + var response = await _communicationService.EnableInstanceAsync(siteId, command, cancellationToken); + + if (response.Success) + { + instance.State = InstanceState.Enabled; + await _repository.UpdateInstanceAsync(instance, cancellationToken); + await _repository.SaveChangesAsync(cancellationToken); + } + + await _auditService.LogAsync(user, "Enable", "Instance", instanceId.ToString(), + instance.UniqueName, new { CommandId = commandId, response.Success }, + cancellationToken); + + return response.Success + ? Result.Success(response) + : Result.Failure(response.ErrorMessage ?? "Enable failed."); + } + + /// + /// WP-6: Delete an instance. Stops actor, removes config. S&F NOT cleared. + /// Delete fails if site unreachable (30s timeout via CommunicationOptions). + /// + public async Task> DeleteInstanceAsync( + int instanceId, + string user, + CancellationToken cancellationToken = default) + { + var instance = await _repository.GetInstanceByIdAsync(instanceId, cancellationToken); + if (instance == null) + return Result.Failure($"Instance with ID {instanceId} not found."); + + var transitionError = StateTransitionValidator.ValidateTransition(instance.State, "delete"); + if (transitionError != null) + return Result.Failure(transitionError); + + using var lockHandle = await _lockManager.AcquireAsync( + instance.UniqueName, _options.OperationLockTimeout, cancellationToken); + + var commandId = Guid.NewGuid().ToString("N"); + var siteId = instance.SiteId.ToString(); + var command = new DeleteInstanceCommand(commandId, instance.UniqueName, DateTimeOffset.UtcNow); + + var response = await _communicationService.DeleteInstanceAsync(siteId, command, cancellationToken); + + if (response.Success) + { + // Remove deployed snapshot + await _repository.DeleteDeployedSnapshotAsync(instanceId, cancellationToken); + + // Set state to NotDeployed (or the instance record could be deleted entirely by higher layers) + instance.State = InstanceState.NotDeployed; + await _repository.UpdateInstanceAsync(instance, cancellationToken); + await _repository.SaveChangesAsync(cancellationToken); + } + + await _auditService.LogAsync(user, "Delete", "Instance", instanceId.ToString(), + instance.UniqueName, new { CommandId = commandId, response.Success }, + cancellationToken); + + return response.Success + ? Result.Success(response) + : Result.Failure( + response.ErrorMessage ?? "Delete failed. Site may be unreachable."); + } + + /// + /// WP-8: Get the deployed config snapshot and compare with current template-derived state. + /// + public async Task> GetDeploymentComparisonAsync( + int instanceId, + CancellationToken cancellationToken = default) + { + var snapshot = await _repository.GetDeployedSnapshotByInstanceIdAsync(instanceId, cancellationToken); + if (snapshot == null) + return Result.Failure("No deployed snapshot found for this instance."); + + // Compute current template-derived config + var currentResult = await _flatteningPipeline.FlattenAndValidateAsync(instanceId, cancellationToken); + if (currentResult.IsFailure) + return Result.Failure($"Cannot compute current config: {currentResult.Error}"); + + var currentHash = currentResult.Value.RevisionHash; + var isStale = snapshot.RevisionHash != currentHash; + + var result = new DeploymentComparisonResult( + instanceId, + snapshot.RevisionHash, + currentHash, + isStale, + snapshot.DeployedAt); + + return Result.Success(result); + } + + /// + /// WP-2: After failover/timeout, query site for current deployment state before re-deploying. + /// + public async Task GetDeploymentStatusAsync( + string deploymentId, + CancellationToken cancellationToken = default) + { + return await _repository.GetDeploymentByDeploymentIdAsync(deploymentId, cancellationToken); + } + + private async Task StoreDeployedSnapshotAsync( + int instanceId, + string deploymentId, + string revisionHash, + string configJson, + CancellationToken cancellationToken) + { + var existing = await _repository.GetDeployedSnapshotByInstanceIdAsync(instanceId, cancellationToken); + if (existing != null) + { + existing.DeploymentId = deploymentId; + existing.RevisionHash = revisionHash; + existing.ConfigurationJson = configJson; + existing.DeployedAt = DateTimeOffset.UtcNow; + await _repository.UpdateDeployedSnapshotAsync(existing, cancellationToken); + } + else + { + var snapshot = new DeployedConfigSnapshot(deploymentId, revisionHash, configJson) + { + InstanceId = instanceId, + DeployedAt = DateTimeOffset.UtcNow + }; + await _repository.AddDeployedSnapshotAsync(snapshot, cancellationToken); + } + } +} + +/// +/// WP-8: Result of comparing deployed vs template-derived configuration. +/// +public record DeploymentComparisonResult( + int InstanceId, + string DeployedRevisionHash, + string CurrentRevisionHash, + bool IsStale, + DateTimeOffset DeployedAt); diff --git a/src/ScadaLink.DeploymentManager/FlatteningPipeline.cs b/src/ScadaLink.DeploymentManager/FlatteningPipeline.cs new file mode 100644 index 0000000..36087a3 --- /dev/null +++ b/src/ScadaLink.DeploymentManager/FlatteningPipeline.cs @@ -0,0 +1,121 @@ +using ScadaLink.Commons.Entities.Sites; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Flattening; +using ScadaLink.TemplateEngine.Flattening; +using ScadaLink.TemplateEngine.Validation; + +namespace ScadaLink.DeploymentManager; + +/// +/// Orchestrates the TemplateEngine services (FlatteningService, ValidationService, RevisionHashService) +/// into a single pipeline for deployment use. +/// +/// WP-16: This captures template state at the time of flatten, ensuring that concurrent template edits +/// (last-write-wins) do not conflict with in-progress deployments. +/// +public class FlatteningPipeline : IFlatteningPipeline +{ + private readonly ITemplateEngineRepository _templateRepo; + private readonly ISiteRepository _siteRepo; + private readonly FlatteningService _flatteningService; + private readonly ValidationService _validationService; + private readonly RevisionHashService _revisionHashService; + + public FlatteningPipeline( + ITemplateEngineRepository templateRepo, + ISiteRepository siteRepo, + FlatteningService flatteningService, + ValidationService validationService, + RevisionHashService revisionHashService) + { + _templateRepo = templateRepo; + _siteRepo = siteRepo; + _flatteningService = flatteningService; + _validationService = validationService; + _revisionHashService = revisionHashService; + } + + public async Task> FlattenAndValidateAsync( + int instanceId, + CancellationToken cancellationToken = default) + { + // Load instance with full graph + var instance = await _templateRepo.GetInstanceByIdAsync(instanceId, cancellationToken); + if (instance == null) + return Result.Failure($"Instance with ID {instanceId} not found."); + + // Build template chain + var templateChain = await BuildTemplateChainAsync(instance.TemplateId, cancellationToken); + if (templateChain.Count == 0) + return Result.Failure("Template chain is empty."); + + // Build composition maps + var compositionMap = new Dictionary>(); + var composedChains = new Dictionary>(); + + foreach (var template in templateChain) + { + var compositions = await _templateRepo.GetCompositionsByTemplateIdAsync(template.Id, cancellationToken); + if (compositions.Count > 0) + { + compositionMap[template.Id] = compositions; + foreach (var comp in compositions) + { + if (!composedChains.ContainsKey(comp.ComposedTemplateId)) + { + composedChains[comp.ComposedTemplateId] = + await BuildTemplateChainAsync(comp.ComposedTemplateId, cancellationToken); + } + } + } + } + + // Load data connections for the site + var dataConnections = await LoadDataConnectionsAsync(instance.SiteId, cancellationToken); + + // Flatten + var flattenResult = _flatteningService.Flatten( + instance, templateChain, compositionMap, composedChains, dataConnections); + + if (flattenResult.IsFailure) + return Result.Failure(flattenResult.Error); + + var config = flattenResult.Value; + + // Validate + var validation = _validationService.Validate(config); + + // Compute revision hash + var hash = _revisionHashService.ComputeHash(config); + + return Result.Success( + new FlatteningPipelineResult(config, hash, validation)); + } + + private async Task> BuildTemplateChainAsync( + int templateId, + CancellationToken cancellationToken) + { + var chain = new List(); + var currentId = (int?)templateId; + + while (currentId.HasValue) + { + var template = await _templateRepo.GetTemplateWithChildrenAsync(currentId.Value, cancellationToken); + if (template == null) break; + chain.Add(template); + currentId = template.ParentTemplateId; + } + + return chain; + } + + private async Task> LoadDataConnectionsAsync( + int siteId, + CancellationToken cancellationToken) + { + var connections = await _siteRepo.GetDataConnectionsBySiteIdAsync(siteId, cancellationToken); + return connections.ToDictionary(c => c.Id); + } +} diff --git a/src/ScadaLink.DeploymentManager/IFlatteningPipeline.cs b/src/ScadaLink.DeploymentManager/IFlatteningPipeline.cs new file mode 100644 index 0000000..87b534d --- /dev/null +++ b/src/ScadaLink.DeploymentManager/IFlatteningPipeline.cs @@ -0,0 +1,27 @@ +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Flattening; + +namespace ScadaLink.DeploymentManager; + +/// +/// Abstraction over the TemplateEngine flattening + validation + hashing pipeline. +/// Used by DeploymentService to obtain a validated, hashed FlattenedConfiguration. +/// +public interface IFlatteningPipeline +{ + /// + /// Flattens and validates an instance configuration, returning the configuration, + /// revision hash, and validation result. + /// + Task> FlattenAndValidateAsync( + int instanceId, + CancellationToken cancellationToken = default); +} + +/// +/// Result of the flattening pipeline: configuration, hash, and validation. +/// +public record FlatteningPipelineResult( + FlattenedConfiguration Configuration, + string RevisionHash, + ValidationResult Validation); diff --git a/src/ScadaLink.DeploymentManager/OperationLockManager.cs b/src/ScadaLink.DeploymentManager/OperationLockManager.cs new file mode 100644 index 0000000..5ffef52 --- /dev/null +++ b/src/ScadaLink.DeploymentManager/OperationLockManager.cs @@ -0,0 +1,58 @@ +using System.Collections.Concurrent; + +namespace ScadaLink.DeploymentManager; + +/// +/// WP-3: Per-instance operation lock. Only one mutating operation (deploy, disable, enable, delete) +/// may be in progress per instance at a time. Different instances can proceed in parallel. +/// +/// Implementation: ConcurrentDictionary of SemaphoreSlim(1,1) keyed by instance unique name. +/// Lock released on completion, timeout, or failure. +/// Lost on central failover (acceptable per design -- in-progress treated as failed). +/// +public class OperationLockManager +{ + private readonly ConcurrentDictionary _locks = new(StringComparer.Ordinal); + + /// + /// Acquires the operation lock for the given instance. Returns a disposable that releases the lock. + /// Throws TimeoutException if the lock cannot be acquired within the timeout. + /// + public async Task AcquireAsync(string instanceUniqueName, TimeSpan timeout, CancellationToken cancellationToken = default) + { + var semaphore = _locks.GetOrAdd(instanceUniqueName, _ => new SemaphoreSlim(1, 1)); + + if (!await semaphore.WaitAsync(timeout, cancellationToken)) + { + throw new TimeoutException( + $"Could not acquire operation lock for instance '{instanceUniqueName}' within {timeout.TotalSeconds}s. " + + "Another mutating operation is in progress."); + } + + return new LockRelease(semaphore); + } + + /// + /// Checks whether a lock is currently held for the given instance (for diagnostics). + /// + public bool IsLocked(string instanceUniqueName) + { + return _locks.TryGetValue(instanceUniqueName, out var semaphore) && semaphore.CurrentCount == 0; + } + + private sealed class LockRelease : IDisposable + { + private readonly SemaphoreSlim _semaphore; + private int _disposed; + + public LockRelease(SemaphoreSlim semaphore) => _semaphore = semaphore; + + public void Dispose() + { + if (Interlocked.CompareExchange(ref _disposed, 1, 0) == 0) + { + _semaphore.Release(); + } + } + } +} diff --git a/src/ScadaLink.DeploymentManager/ScadaLink.DeploymentManager.csproj b/src/ScadaLink.DeploymentManager/ScadaLink.DeploymentManager.csproj index 049c7d9..1b77f28 100644 --- a/src/ScadaLink.DeploymentManager/ScadaLink.DeploymentManager.csproj +++ b/src/ScadaLink.DeploymentManager/ScadaLink.DeploymentManager.csproj @@ -9,11 +9,14 @@ + + + diff --git a/src/ScadaLink.DeploymentManager/ServiceCollectionExtensions.cs b/src/ScadaLink.DeploymentManager/ServiceCollectionExtensions.cs index 980afb9..b199a1f 100644 --- a/src/ScadaLink.DeploymentManager/ServiceCollectionExtensions.cs +++ b/src/ScadaLink.DeploymentManager/ServiceCollectionExtensions.cs @@ -6,13 +6,16 @@ public static class ServiceCollectionExtensions { public static IServiceCollection AddDeploymentManager(this IServiceCollection services) { - // Phase 0: skeleton only + services.AddSingleton(); + services.AddScoped(); + services.AddScoped(); + services.AddScoped(); return services; } public static IServiceCollection AddDeploymentManagerActors(this IServiceCollection services) { - // Phase 0: placeholder for Akka actor registration + // Akka actor registration is handled by Host component during actor system startup return services; } } diff --git a/src/ScadaLink.DeploymentManager/StateTransitionValidator.cs b/src/ScadaLink.DeploymentManager/StateTransitionValidator.cs new file mode 100644 index 0000000..7780266 --- /dev/null +++ b/src/ScadaLink.DeploymentManager/StateTransitionValidator.cs @@ -0,0 +1,48 @@ +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.DeploymentManager; + +/// +/// WP-4: State transition matrix for instance lifecycle. +/// +/// State | Deploy | Disable | Enable | Delete +/// ----------|--------|---------|--------|------- +/// NotDeploy | OK | NO | NO | NO +/// Enabled | OK | OK | NO | OK +/// Disabled | OK* | NO | OK | OK +/// +/// * Deploy on a Disabled instance also enables it. +/// +public static class StateTransitionValidator +{ + public static bool CanDeploy(InstanceState currentState) => + currentState is InstanceState.NotDeployed or InstanceState.Enabled or InstanceState.Disabled; + + public static bool CanDisable(InstanceState currentState) => + currentState == InstanceState.Enabled; + + public static bool CanEnable(InstanceState currentState) => + currentState == InstanceState.Disabled; + + public static bool CanDelete(InstanceState currentState) => + currentState is InstanceState.Enabled or InstanceState.Disabled; + + /// + /// Returns a human-readable error message if the transition is invalid, or null if valid. + /// + public static string? ValidateTransition(InstanceState currentState, string operation) + { + var allowed = operation.ToLowerInvariant() switch + { + "deploy" => CanDeploy(currentState), + "disable" => CanDisable(currentState), + "enable" => CanEnable(currentState), + "delete" => CanDelete(currentState), + _ => false + }; + + if (allowed) return null; + + return $"Operation '{operation}' is not allowed when instance is in state '{currentState}'."; + } +} diff --git a/src/ScadaLink.StoreAndForward/ReplicationService.cs b/src/ScadaLink.StoreAndForward/ReplicationService.cs new file mode 100644 index 0000000..4c687f1 --- /dev/null +++ b/src/ScadaLink.StoreAndForward/ReplicationService.cs @@ -0,0 +1,136 @@ +using Microsoft.Extensions.Logging; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.StoreAndForward; + +/// +/// WP-11: Async replication of buffer operations to standby node. +/// +/// - Forwards add/remove/park operations to standby via a replication handler. +/// - No ack wait (fire-and-forget per design). +/// - Standby applies operations to its own SQLite. +/// - On failover, standby resumes delivery from its replicated state. +/// +public class ReplicationService +{ + private readonly StoreAndForwardOptions _options; + private readonly ILogger _logger; + private Func? _replicationHandler; + + public ReplicationService( + StoreAndForwardOptions options, + ILogger logger) + { + _options = options; + _logger = logger; + } + + /// + /// Sets the handler for forwarding replication operations to the standby node. + /// Typically wraps Akka Tell to the standby's replication actor. + /// + public void SetReplicationHandler(Func handler) + { + _replicationHandler = handler; + } + + /// + /// WP-11: Replicates an enqueue operation to standby (fire-and-forget). + /// + public void ReplicateEnqueue(StoreAndForwardMessage message) + { + if (!_options.ReplicationEnabled || _replicationHandler == null) return; + + FireAndForget(new ReplicationOperation( + ReplicationOperationType.Add, + message.Id, + message)); + } + + /// + /// WP-11: Replicates a remove operation to standby (fire-and-forget). + /// + public void ReplicateRemove(string messageId) + { + if (!_options.ReplicationEnabled || _replicationHandler == null) return; + + FireAndForget(new ReplicationOperation( + ReplicationOperationType.Remove, + messageId, + null)); + } + + /// + /// WP-11: Replicates a park operation to standby (fire-and-forget). + /// + public void ReplicatePark(StoreAndForwardMessage message) + { + if (!_options.ReplicationEnabled || _replicationHandler == null) return; + + FireAndForget(new ReplicationOperation( + ReplicationOperationType.Park, + message.Id, + message)); + } + + /// + /// WP-11: Applies a replicated operation received from the active node. + /// Used by the standby node to keep its SQLite in sync. + /// + public async Task ApplyReplicatedOperationAsync( + ReplicationOperation operation, + StoreAndForwardStorage storage) + { + switch (operation.OperationType) + { + case ReplicationOperationType.Add when operation.Message != null: + await storage.EnqueueAsync(operation.Message); + break; + + case ReplicationOperationType.Remove: + await storage.RemoveMessageAsync(operation.MessageId); + break; + + case ReplicationOperationType.Park when operation.Message != null: + operation.Message.Status = StoreAndForwardMessageStatus.Parked; + await storage.UpdateMessageAsync(operation.Message); + break; + } + } + + private void FireAndForget(ReplicationOperation operation) + { + Task.Run(async () => + { + try + { + await _replicationHandler!.Invoke(operation); + } + catch (Exception ex) + { + // WP-11: No ack wait — log and move on + _logger.LogDebug(ex, + "Replication of {OpType} for message {MessageId} failed (best-effort)", + operation.OperationType, operation.MessageId); + } + }); + } +} + +/// +/// WP-11: Represents a buffer operation to be replicated to standby. +/// +public record ReplicationOperation( + ReplicationOperationType OperationType, + string MessageId, + StoreAndForwardMessage? Message); + +/// +/// WP-11: Types of buffer operations that are replicated. +/// +public enum ReplicationOperationType +{ + Add, + Remove, + Park +} diff --git a/src/ScadaLink.StoreAndForward/ScadaLink.StoreAndForward.csproj b/src/ScadaLink.StoreAndForward/ScadaLink.StoreAndForward.csproj index 049c7d9..a3b50f4 100644 --- a/src/ScadaLink.StoreAndForward/ScadaLink.StoreAndForward.csproj +++ b/src/ScadaLink.StoreAndForward/ScadaLink.StoreAndForward.csproj @@ -8,7 +8,9 @@ + + @@ -16,4 +18,8 @@ + + + + diff --git a/src/ScadaLink.StoreAndForward/ServiceCollectionExtensions.cs b/src/ScadaLink.StoreAndForward/ServiceCollectionExtensions.cs index 35e82ee..31c59d0 100644 --- a/src/ScadaLink.StoreAndForward/ServiceCollectionExtensions.cs +++ b/src/ScadaLink.StoreAndForward/ServiceCollectionExtensions.cs @@ -1,4 +1,6 @@ using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; namespace ScadaLink.StoreAndForward; @@ -6,13 +8,36 @@ public static class ServiceCollectionExtensions { public static IServiceCollection AddStoreAndForward(this IServiceCollection services) { - // Phase 0: skeleton only + services.AddSingleton(sp => + { + var options = sp.GetRequiredService>().Value; + var logger = sp.GetRequiredService>(); + return new StoreAndForwardStorage( + $"Data Source={options.SqliteDbPath}", + logger); + }); + + services.AddSingleton(sp => + { + var storage = sp.GetRequiredService(); + var options = sp.GetRequiredService>().Value; + var logger = sp.GetRequiredService>(); + return new StoreAndForwardService(storage, options, logger); + }); + + services.AddSingleton(sp => + { + var options = sp.GetRequiredService>().Value; + var logger = sp.GetRequiredService>(); + return new ReplicationService(options, logger); + }); + return services; } public static IServiceCollection AddStoreAndForwardActors(this IServiceCollection services) { - // Phase 0: placeholder for Akka actor registration + // Akka actor registration handled by Host component during actor system startup return services; } } diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardMessage.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardMessage.cs new file mode 100644 index 0000000..ab0a5cf --- /dev/null +++ b/src/ScadaLink.StoreAndForward/StoreAndForwardMessage.cs @@ -0,0 +1,49 @@ +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.StoreAndForward; + +/// +/// WP-9: Represents a single store-and-forward message as stored in SQLite. +/// Maps to the sf_messages table. +/// +public class StoreAndForwardMessage +{ + /// Unique message ID (GUID). + public string Id { get; set; } = string.Empty; + + /// WP-9: Category: ExternalSystem, Notification, or CachedDbWrite. + public StoreAndForwardCategory Category { get; set; } + + /// Target system name (external system, notification list, or DB connection). + public string Target { get; set; } = string.Empty; + + /// JSON-serialized payload containing the call details. + public string PayloadJson { get; set; } = string.Empty; + + /// Number of delivery attempts so far. + public int RetryCount { get; set; } + + /// Maximum retry attempts before parking (0 = no limit). + public int MaxRetries { get; set; } + + /// Retry interval in milliseconds. + public long RetryIntervalMs { get; set; } + + /// When this message was first enqueued. + public DateTimeOffset CreatedAt { get; set; } + + /// When delivery was last attempted (null if never attempted). + public DateTimeOffset? LastAttemptAt { get; set; } + + /// Current status of the message. + public StoreAndForwardMessageStatus Status { get; set; } + + /// Last error message from a failed delivery attempt. + public string? LastError { get; set; } + + /// + /// Instance that originated this message (for S&F-survives-delete behavior). + /// WP-13: Messages are NOT cleared when instance is deleted. + /// + public string? OriginInstanceName { get; set; } +} diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardOptions.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardOptions.cs index 7155338..5db885d 100644 --- a/src/ScadaLink.StoreAndForward/StoreAndForwardOptions.cs +++ b/src/ScadaLink.StoreAndForward/StoreAndForwardOptions.cs @@ -1,7 +1,22 @@ namespace ScadaLink.StoreAndForward; +/// +/// WP-9/10: Configuration options for the Store-and-Forward Engine. +/// public class StoreAndForwardOptions { + /// Path to the SQLite database for S&F message persistence. public string SqliteDbPath { get; set; } = "./data/store-and-forward.db"; + + /// WP-11: Whether to replicate buffer operations to standby node. public bool ReplicationEnabled { get; set; } = true; + + /// WP-10: Default retry interval for messages without per-source settings. + public TimeSpan DefaultRetryInterval { get; set; } = TimeSpan.FromSeconds(30); + + /// WP-10: Default maximum retry count before parking. + public int DefaultMaxRetries { get; set; } = 50; + + /// WP-10: Interval for the background retry timer sweep. + public TimeSpan RetryTimerInterval { get; set; } = TimeSpan.FromSeconds(10); } diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs new file mode 100644 index 0000000..2b3d8ff --- /dev/null +++ b/src/ScadaLink.StoreAndForward/StoreAndForwardService.cs @@ -0,0 +1,322 @@ +using Microsoft.Extensions.Logging; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.StoreAndForward; + +/// +/// WP-9/10: Core store-and-forward service. +/// +/// Lifecycle: +/// 1. Caller attempts immediate delivery via IDeliveryHandler +/// 2. On transient failure → buffer in SQLite → retry loop +/// 3. On success → remove from buffer +/// 4. On max retries → park +/// 5. Permanent failures are returned to caller immediately (never buffered) +/// +/// WP-10: Fixed retry interval (not exponential). Per-source-entity retry settings. +/// Background timer-based retry sweep. +/// +/// WP-12: Parked messages queryable, retryable, and discardable. +/// +/// WP-14: Buffer depth reported as health metric. Activity logged to site event log. +/// +/// WP-15: CachedCall idempotency is the caller's responsibility. +/// This service does not deduplicate — if the same message is enqueued twice, +/// it will be delivered twice. Callers using ExternalSystem.CachedCall() must +/// design their payloads to be idempotent (e.g., include unique request IDs +/// and handle duplicate detection on the remote end). +/// +public class StoreAndForwardService +{ + private readonly StoreAndForwardStorage _storage; + private readonly StoreAndForwardOptions _options; + private readonly ILogger _logger; + private Timer? _retryTimer; + private int _retryInProgress; + + /// + /// WP-10: Delivery handler delegate. Returns true on success, throws on transient failure. + /// Permanent failures should return false (message will NOT be buffered). + /// + private readonly Dictionary>> _deliveryHandlers = new(); + + /// + /// WP-14: Event callback for logging S&F activity to site event log. + /// + public event Action? OnActivity; + + public StoreAndForwardService( + StoreAndForwardStorage storage, + StoreAndForwardOptions options, + ILogger logger) + { + _storage = storage; + _options = options; + _logger = logger; + } + + /// + /// Registers a delivery handler for a given message category. + /// + public void RegisterDeliveryHandler( + StoreAndForwardCategory category, + Func> handler) + { + _deliveryHandlers[category] = handler; + } + + /// + /// Initializes storage and starts the background retry timer. + /// + public async Task StartAsync() + { + await _storage.InitializeAsync(); + _retryTimer = new Timer( + _ => _ = RetryPendingMessagesAsync(), + null, + _options.RetryTimerInterval, + _options.RetryTimerInterval); + + _logger.LogInformation( + "Store-and-forward service started. Retry interval: {Interval}s", + _options.DefaultRetryInterval.TotalSeconds); + } + + /// + /// Stops the background retry timer. + /// + public async Task StopAsync() + { + if (_retryTimer != null) + { + await _retryTimer.DisposeAsync(); + _retryTimer = null; + } + } + + /// + /// WP-10: Enqueues a message for store-and-forward delivery. + /// Attempts immediate delivery first. On transient failure, buffers for retry. + /// On permanent failure (handler returns false), returns false immediately. + /// + /// WP-15: CachedCall idempotency note — this method does not deduplicate. + /// The caller (e.g., ExternalSystem.CachedCall()) is responsible for ensuring + /// that the remote system can handle duplicate deliveries safely. + /// + public async Task EnqueueAsync( + StoreAndForwardCategory category, + string target, + string payloadJson, + string? originInstanceName = null, + int? maxRetries = null, + TimeSpan? retryInterval = null) + { + var message = new StoreAndForwardMessage + { + Id = Guid.NewGuid().ToString("N"), + Category = category, + Target = target, + PayloadJson = payloadJson, + RetryCount = 0, + MaxRetries = maxRetries ?? _options.DefaultMaxRetries, + RetryIntervalMs = (long)(retryInterval ?? _options.DefaultRetryInterval).TotalMilliseconds, + CreatedAt = DateTimeOffset.UtcNow, + Status = StoreAndForwardMessageStatus.Pending, + OriginInstanceName = originInstanceName + }; + + // Attempt immediate delivery + if (_deliveryHandlers.TryGetValue(category, out var handler)) + { + try + { + var success = await handler(message); + if (success) + { + RaiseActivity("Delivered", category, $"Immediate delivery to {target}"); + return new StoreAndForwardResult(true, message.Id, false); + } + else + { + // Permanent failure — do not buffer + return new StoreAndForwardResult(false, message.Id, false); + } + } + catch (Exception ex) + { + // Transient failure — buffer for retry + _logger.LogWarning(ex, + "Immediate delivery to {Target} failed (transient), buffering for retry", + target); + + message.LastAttemptAt = DateTimeOffset.UtcNow; + message.RetryCount = 1; + message.LastError = ex.Message; + await _storage.EnqueueAsync(message); + + RaiseActivity("Queued", category, $"Buffered for retry: {target} ({ex.Message})"); + return new StoreAndForwardResult(true, message.Id, true); + } + } + + // No handler registered — buffer for later + await _storage.EnqueueAsync(message); + RaiseActivity("Queued", category, $"No handler registered, buffered: {target}"); + return new StoreAndForwardResult(true, message.Id, true); + } + + /// + /// WP-10: Background retry sweep. Processes all pending messages that are due for retry. + /// + internal async Task RetryPendingMessagesAsync() + { + // Prevent overlapping retry sweeps + if (Interlocked.CompareExchange(ref _retryInProgress, 1, 0) != 0) + return; + + try + { + var messages = await _storage.GetMessagesForRetryAsync(); + if (messages.Count == 0) return; + + _logger.LogDebug("Retry sweep: {Count} messages due for retry", messages.Count); + + foreach (var message in messages) + { + await RetryMessageAsync(message); + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error during retry sweep"); + } + finally + { + Interlocked.Exchange(ref _retryInProgress, 0); + } + } + + private async Task RetryMessageAsync(StoreAndForwardMessage message) + { + if (!_deliveryHandlers.TryGetValue(message.Category, out var handler)) + { + _logger.LogWarning("No delivery handler for category {Category}", message.Category); + return; + } + + try + { + var success = await handler(message); + if (success) + { + await _storage.RemoveMessageAsync(message.Id); + RaiseActivity("Delivered", message.Category, + $"Delivered to {message.Target} after {message.RetryCount} retries"); + return; + } + + // Permanent failure on retry — park immediately + message.Status = StoreAndForwardMessageStatus.Parked; + message.LastAttemptAt = DateTimeOffset.UtcNow; + message.LastError = "Permanent failure (handler returned false)"; + await _storage.UpdateMessageAsync(message); + RaiseActivity("Parked", message.Category, + $"Permanent failure for {message.Target}: handler returned false"); + } + catch (Exception ex) + { + // Transient failure — increment retry, check max + message.RetryCount++; + message.LastAttemptAt = DateTimeOffset.UtcNow; + message.LastError = ex.Message; + + if (message.MaxRetries > 0 && message.RetryCount >= message.MaxRetries) + { + message.Status = StoreAndForwardMessageStatus.Parked; + await _storage.UpdateMessageAsync(message); + RaiseActivity("Parked", message.Category, + $"Max retries ({message.MaxRetries}) reached for {message.Target}"); + _logger.LogWarning( + "Message {MessageId} parked after {MaxRetries} retries to {Target}", + message.Id, message.MaxRetries, message.Target); + } + else + { + await _storage.UpdateMessageAsync(message); + RaiseActivity("Retried", message.Category, + $"Retry {message.RetryCount}/{message.MaxRetries} for {message.Target}: {ex.Message}"); + } + } + } + + /// + /// WP-12: Gets parked messages for central query (Pattern 8). + /// + public async Task<(List Messages, int TotalCount)> GetParkedMessagesAsync( + StoreAndForwardCategory? category = null, + int pageNumber = 1, + int pageSize = 50) + { + return await _storage.GetParkedMessagesAsync(category, pageNumber, pageSize); + } + + /// + /// WP-12: Retries a parked message (moves back to pending queue). + /// + public async Task RetryParkedMessageAsync(string messageId) + { + var success = await _storage.RetryParkedMessageAsync(messageId); + if (success) + { + RaiseActivity("Retry", StoreAndForwardCategory.ExternalSystem, + $"Parked message {messageId} moved back to queue"); + } + return success; + } + + /// + /// WP-12: Permanently discards a parked message. + /// + public async Task DiscardParkedMessageAsync(string messageId) + { + var success = await _storage.DiscardParkedMessageAsync(messageId); + if (success) + { + RaiseActivity("Discard", StoreAndForwardCategory.ExternalSystem, + $"Parked message {messageId} discarded"); + } + return success; + } + + /// + /// WP-14: Gets buffer depth by category for health reporting. + /// + public async Task> GetBufferDepthAsync() + { + return await _storage.GetBufferDepthByCategoryAsync(); + } + + /// + /// WP-13: Gets count of S&F messages for a given instance (for verifying survival on deletion). + /// + public async Task GetMessageCountForInstanceAsync(string instanceName) + { + return await _storage.GetMessageCountByOriginInstanceAsync(instanceName); + } + + private void RaiseActivity(string action, StoreAndForwardCategory category, string detail) + { + OnActivity?.Invoke(action, category, detail); + } +} + +/// +/// Result of an enqueue operation. +/// +public record StoreAndForwardResult( + /// True if the message was accepted (either delivered immediately or buffered). + bool Accepted, + /// Unique message ID for tracking. + string MessageId, + /// True if the message was buffered (not delivered immediately). + bool WasBuffered); diff --git a/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs b/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs new file mode 100644 index 0000000..b0a6f77 --- /dev/null +++ b/src/ScadaLink.StoreAndForward/StoreAndForwardStorage.cs @@ -0,0 +1,339 @@ +using Microsoft.Data.Sqlite; +using Microsoft.Extensions.Logging; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.StoreAndForward; + +/// +/// WP-9: SQLite persistence layer for store-and-forward messages. +/// Uses direct Microsoft.Data.Sqlite (not EF Core) for lightweight site-side storage. +/// No max buffer size per design decision. +/// +public class StoreAndForwardStorage +{ + private readonly string _connectionString; + private readonly ILogger _logger; + + public StoreAndForwardStorage(string connectionString, ILogger logger) + { + _connectionString = connectionString; + _logger = logger; + } + + /// + /// Creates the sf_messages table if it does not exist. + /// + public async Task InitializeAsync() + { + await using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + await using var command = connection.CreateCommand(); + command.CommandText = @" + CREATE TABLE IF NOT EXISTS sf_messages ( + id TEXT PRIMARY KEY, + category INTEGER NOT NULL, + target TEXT NOT NULL, + payload_json TEXT NOT NULL, + retry_count INTEGER NOT NULL DEFAULT 0, + max_retries INTEGER NOT NULL DEFAULT 50, + retry_interval_ms INTEGER NOT NULL DEFAULT 30000, + created_at TEXT NOT NULL, + last_attempt_at TEXT, + status INTEGER NOT NULL DEFAULT 0, + last_error TEXT, + origin_instance TEXT + ); + + CREATE INDEX IF NOT EXISTS idx_sf_messages_status ON sf_messages(status); + CREATE INDEX IF NOT EXISTS idx_sf_messages_category ON sf_messages(category); + "; + await command.ExecuteNonQueryAsync(); + + _logger.LogInformation("Store-and-forward SQLite storage initialized"); + } + + /// + /// WP-9: Enqueues a new message with Pending status. + /// + public async Task EnqueueAsync(StoreAndForwardMessage message) + { + await using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + await using var cmd = connection.CreateCommand(); + cmd.CommandText = @" + INSERT INTO sf_messages (id, category, target, payload_json, retry_count, max_retries, + retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance) + VALUES (@id, @category, @target, @payload, @retryCount, @maxRetries, + @retryIntervalMs, @createdAt, @lastAttempt, @status, @lastError, @origin)"; + + cmd.Parameters.AddWithValue("@id", message.Id); + cmd.Parameters.AddWithValue("@category", (int)message.Category); + cmd.Parameters.AddWithValue("@target", message.Target); + cmd.Parameters.AddWithValue("@payload", message.PayloadJson); + cmd.Parameters.AddWithValue("@retryCount", message.RetryCount); + cmd.Parameters.AddWithValue("@maxRetries", message.MaxRetries); + cmd.Parameters.AddWithValue("@retryIntervalMs", message.RetryIntervalMs); + cmd.Parameters.AddWithValue("@createdAt", message.CreatedAt.ToString("O")); + cmd.Parameters.AddWithValue("@lastAttempt", message.LastAttemptAt.HasValue + ? message.LastAttemptAt.Value.ToString("O") : DBNull.Value); + cmd.Parameters.AddWithValue("@status", (int)message.Status); + cmd.Parameters.AddWithValue("@lastError", (object?)message.LastError ?? DBNull.Value); + cmd.Parameters.AddWithValue("@origin", (object?)message.OriginInstanceName ?? DBNull.Value); + + await cmd.ExecuteNonQueryAsync(); + } + + /// + /// WP-10: Gets all messages that are due for retry (Pending status, last attempt older than retry interval). + /// + public async Task> GetMessagesForRetryAsync() + { + await using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + await using var cmd = connection.CreateCommand(); + cmd.CommandText = @" + SELECT id, category, target, payload_json, retry_count, max_retries, + retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance + FROM sf_messages + WHERE status = @pending + AND (last_attempt_at IS NULL + OR retry_interval_ms = 0 + OR (julianday('now') - julianday(last_attempt_at)) * 86400000 >= retry_interval_ms) + ORDER BY created_at ASC"; + + cmd.Parameters.AddWithValue("@pending", (int)StoreAndForwardMessageStatus.Pending); + + return await ReadMessagesAsync(cmd); + } + + /// + /// WP-10: Updates a message after a delivery attempt. + /// + public async Task UpdateMessageAsync(StoreAndForwardMessage message) + { + await using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + await using var cmd = connection.CreateCommand(); + cmd.CommandText = @" + UPDATE sf_messages + SET retry_count = @retryCount, + last_attempt_at = @lastAttempt, + status = @status, + last_error = @lastError + WHERE id = @id"; + + cmd.Parameters.AddWithValue("@id", message.Id); + cmd.Parameters.AddWithValue("@retryCount", message.RetryCount); + cmd.Parameters.AddWithValue("@lastAttempt", message.LastAttemptAt.HasValue + ? message.LastAttemptAt.Value.ToString("O") : DBNull.Value); + cmd.Parameters.AddWithValue("@status", (int)message.Status); + cmd.Parameters.AddWithValue("@lastError", (object?)message.LastError ?? DBNull.Value); + + await cmd.ExecuteNonQueryAsync(); + } + + /// + /// WP-10: Removes a successfully delivered message. + /// + public async Task RemoveMessageAsync(string messageId) + { + await using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + await using var cmd = connection.CreateCommand(); + cmd.CommandText = "DELETE FROM sf_messages WHERE id = @id"; + cmd.Parameters.AddWithValue("@id", messageId); + + await cmd.ExecuteNonQueryAsync(); + } + + /// + /// WP-12: Gets all parked messages, optionally filtered by category, with pagination. + /// + public async Task<(List Messages, int TotalCount)> GetParkedMessagesAsync( + StoreAndForwardCategory? category = null, + int pageNumber = 1, + int pageSize = 50) + { + await using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + // Count + await using var countCmd = connection.CreateCommand(); + countCmd.CommandText = category.HasValue + ? "SELECT COUNT(*) FROM sf_messages WHERE status = @parked AND category = @category" + : "SELECT COUNT(*) FROM sf_messages WHERE status = @parked"; + countCmd.Parameters.AddWithValue("@parked", (int)StoreAndForwardMessageStatus.Parked); + if (category.HasValue) countCmd.Parameters.AddWithValue("@category", (int)category.Value); + var totalCount = Convert.ToInt32(await countCmd.ExecuteScalarAsync()); + + // Page + await using var pageCmd = connection.CreateCommand(); + var categoryFilter = category.HasValue ? " AND category = @category" : ""; + pageCmd.CommandText = $@" + SELECT id, category, target, payload_json, retry_count, max_retries, + retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance + FROM sf_messages + WHERE status = @parked{categoryFilter} + ORDER BY created_at ASC + LIMIT @limit OFFSET @offset"; + + pageCmd.Parameters.AddWithValue("@parked", (int)StoreAndForwardMessageStatus.Parked); + if (category.HasValue) pageCmd.Parameters.AddWithValue("@category", (int)category.Value); + pageCmd.Parameters.AddWithValue("@limit", pageSize); + pageCmd.Parameters.AddWithValue("@offset", (pageNumber - 1) * pageSize); + + var messages = await ReadMessagesAsync(pageCmd); + return (messages, totalCount); + } + + /// + /// WP-12: Moves a parked message back to pending for retry. + /// + public async Task RetryParkedMessageAsync(string messageId) + { + await using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + await using var cmd = connection.CreateCommand(); + cmd.CommandText = @" + UPDATE sf_messages + SET status = @pending, retry_count = 0, last_error = NULL + WHERE id = @id AND status = @parked"; + + cmd.Parameters.AddWithValue("@id", messageId); + cmd.Parameters.AddWithValue("@pending", (int)StoreAndForwardMessageStatus.Pending); + cmd.Parameters.AddWithValue("@parked", (int)StoreAndForwardMessageStatus.Parked); + + var rows = await cmd.ExecuteNonQueryAsync(); + return rows > 0; + } + + /// + /// WP-12: Permanently discards a parked message. + /// + public async Task DiscardParkedMessageAsync(string messageId) + { + await using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + await using var cmd = connection.CreateCommand(); + cmd.CommandText = "DELETE FROM sf_messages WHERE id = @id AND status = @parked"; + cmd.Parameters.AddWithValue("@id", messageId); + cmd.Parameters.AddWithValue("@parked", (int)StoreAndForwardMessageStatus.Parked); + + var rows = await cmd.ExecuteNonQueryAsync(); + return rows > 0; + } + + /// + /// WP-14: Gets buffer depth by category (count of pending messages per category). + /// + public async Task> GetBufferDepthByCategoryAsync() + { + await using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + await using var cmd = connection.CreateCommand(); + cmd.CommandText = @" + SELECT category, COUNT(*) as cnt + FROM sf_messages + WHERE status = @pending + GROUP BY category"; + cmd.Parameters.AddWithValue("@pending", (int)StoreAndForwardMessageStatus.Pending); + + var result = new Dictionary(); + await using var reader = await cmd.ExecuteReaderAsync(); + while (await reader.ReadAsync()) + { + var category = (StoreAndForwardCategory)reader.GetInt32(0); + var count = reader.GetInt32(1); + result[category] = count; + } + + return result; + } + + /// + /// WP-13: Verifies messages are NOT deleted when an instance is deleted. + /// Returns the count of messages for a given origin instance. + /// + public async Task GetMessageCountByOriginInstanceAsync(string instanceName) + { + await using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + await using var cmd = connection.CreateCommand(); + cmd.CommandText = @" + SELECT COUNT(*) + FROM sf_messages + WHERE origin_instance = @origin"; + cmd.Parameters.AddWithValue("@origin", instanceName); + + return Convert.ToInt32(await cmd.ExecuteScalarAsync()); + } + + /// + /// Gets a message by ID. + /// + public async Task GetMessageByIdAsync(string messageId) + { + await using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + await using var cmd = connection.CreateCommand(); + cmd.CommandText = @" + SELECT id, category, target, payload_json, retry_count, max_retries, + retry_interval_ms, created_at, last_attempt_at, status, last_error, origin_instance + FROM sf_messages + WHERE id = @id"; + cmd.Parameters.AddWithValue("@id", messageId); + + var messages = await ReadMessagesAsync(cmd); + return messages.FirstOrDefault(); + } + + /// + /// Gets total message count by status. + /// + public async Task GetMessageCountByStatusAsync(StoreAndForwardMessageStatus status) + { + await using var connection = new SqliteConnection(_connectionString); + await connection.OpenAsync(); + + await using var cmd = connection.CreateCommand(); + cmd.CommandText = "SELECT COUNT(*) FROM sf_messages WHERE status = @status"; + cmd.Parameters.AddWithValue("@status", (int)status); + + return Convert.ToInt32(await cmd.ExecuteScalarAsync()); + } + + private static async Task> ReadMessagesAsync(SqliteCommand cmd) + { + var results = new List(); + await using var reader = await cmd.ExecuteReaderAsync(); + while (await reader.ReadAsync()) + { + results.Add(new StoreAndForwardMessage + { + Id = reader.GetString(0), + Category = (StoreAndForwardCategory)reader.GetInt32(1), + Target = reader.GetString(2), + PayloadJson = reader.GetString(3), + RetryCount = reader.GetInt32(4), + MaxRetries = reader.GetInt32(5), + RetryIntervalMs = reader.GetInt64(6), + CreatedAt = DateTimeOffset.Parse(reader.GetString(7)), + LastAttemptAt = reader.IsDBNull(8) ? null : DateTimeOffset.Parse(reader.GetString(8)), + Status = (StoreAndForwardMessageStatus)reader.GetInt32(9), + LastError = reader.IsDBNull(10) ? null : reader.GetString(10), + OriginInstanceName = reader.IsDBNull(11) ? null : reader.GetString(11) + }); + } + return results; + } +} diff --git a/tests/ScadaLink.Commons.Tests/Types/EnumTests.cs b/tests/ScadaLink.Commons.Tests/Types/EnumTests.cs index a205cef..fc21ec3 100644 --- a/tests/ScadaLink.Commons.Tests/Types/EnumTests.cs +++ b/tests/ScadaLink.Commons.Tests/Types/EnumTests.cs @@ -6,7 +6,7 @@ public class EnumTests { [Theory] [InlineData(typeof(DataType), new[] { "Boolean", "Int32", "Float", "Double", "String", "DateTime", "Binary" })] - [InlineData(typeof(InstanceState), new[] { "Enabled", "Disabled" })] + [InlineData(typeof(InstanceState), new[] { "NotDeployed", "Enabled", "Disabled" })] [InlineData(typeof(DeploymentStatus), new[] { "Pending", "InProgress", "Success", "Failed" })] [InlineData(typeof(AlarmState), new[] { "Active", "Normal" })] [InlineData(typeof(AlarmTriggerType), new[] { "ValueMatch", "RangeViolation", "RateOfChange" })] diff --git a/tests/ScadaLink.DeploymentManager.Tests/ArtifactDeploymentServiceTests.cs b/tests/ScadaLink.DeploymentManager.Tests/ArtifactDeploymentServiceTests.cs new file mode 100644 index 0000000..9a7ffdf --- /dev/null +++ b/tests/ScadaLink.DeploymentManager.Tests/ArtifactDeploymentServiceTests.cs @@ -0,0 +1,85 @@ +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using NSubstitute; +using ScadaLink.Commons.Entities.Sites; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Messages.Artifacts; +using ScadaLink.Communication; + +namespace ScadaLink.DeploymentManager.Tests; + +/// +/// WP-7: Tests for system-wide artifact deployment. +/// +public class ArtifactDeploymentServiceTests +{ + private readonly ISiteRepository _siteRepo; + private readonly IDeploymentManagerRepository _deploymentRepo; + private readonly IAuditService _audit; + + public ArtifactDeploymentServiceTests() + { + _siteRepo = Substitute.For(); + _deploymentRepo = Substitute.For(); + _audit = Substitute.For(); + } + + [Fact] + public async Task DeployToAllSitesAsync_NoSites_ReturnsFailure() + { + _siteRepo.GetAllSitesAsync().Returns(new List()); + + var service = CreateService(); + var command = CreateCommand(); + + var result = await service.DeployToAllSitesAsync(command, "admin"); + + Assert.True(result.IsFailure); + Assert.Contains("No sites", result.Error); + } + + [Fact] + public void SiteArtifactResult_ContainsSiteInfo() + { + var result = new SiteArtifactResult("site1", "Site One", true, null); + Assert.Equal("site1", result.SiteId); + Assert.Equal("Site One", result.SiteName); + Assert.True(result.Success); + Assert.Null(result.ErrorMessage); + } + + [Fact] + public void ArtifactDeploymentSummary_CountsCorrectly() + { + var results = new List + { + new("s1", "Site1", true, null), + new("s2", "Site2", false, "error"), + new("s3", "Site3", true, null) + }; + var summary = new ArtifactDeploymentSummary("dep1", results, 2, 1); + + Assert.Equal(2, summary.SuccessCount); + Assert.Equal(1, summary.FailureCount); + Assert.Equal(3, summary.SiteResults.Count); + } + + private ArtifactDeploymentService CreateService() + { + var comms = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + + return new ArtifactDeploymentService( + _siteRepo, _deploymentRepo, comms, _audit, + Options.Create(new DeploymentManagerOptions()), + NullLogger.Instance); + } + + private static DeployArtifactsCommand CreateCommand() + { + return new DeployArtifactsCommand( + "dep1", null, null, null, null, DateTimeOffset.UtcNow); + } +} diff --git a/tests/ScadaLink.DeploymentManager.Tests/DeploymentComparisonTests.cs b/tests/ScadaLink.DeploymentManager.Tests/DeploymentComparisonTests.cs new file mode 100644 index 0000000..12fe49b --- /dev/null +++ b/tests/ScadaLink.DeploymentManager.Tests/DeploymentComparisonTests.cs @@ -0,0 +1,37 @@ +namespace ScadaLink.DeploymentManager.Tests; + +/// +/// WP-8: Tests for deployed vs template-derived state comparison. +/// +public class DeploymentComparisonTests +{ + [Fact] + public void DeploymentComparisonResult_MatchingHashes_NotStale() + { + var result = new DeploymentComparisonResult( + 1, "sha256:abc", "sha256:abc", false, DateTimeOffset.UtcNow); + + Assert.False(result.IsStale); + Assert.Equal("sha256:abc", result.DeployedRevisionHash); + Assert.Equal("sha256:abc", result.CurrentRevisionHash); + } + + [Fact] + public void DeploymentComparisonResult_DifferentHashes_IsStale() + { + var result = new DeploymentComparisonResult( + 1, "sha256:old", "sha256:new", true, DateTimeOffset.UtcNow); + + Assert.True(result.IsStale); + Assert.NotEqual(result.DeployedRevisionHash, result.CurrentRevisionHash); + } + + [Fact] + public void DeploymentComparisonResult_ContainsDeployedTimestamp() + { + var deployedAt = new DateTimeOffset(2026, 3, 16, 12, 0, 0, TimeSpan.Zero); + var result = new DeploymentComparisonResult(1, "h1", "h2", true, deployedAt); + + Assert.Equal(deployedAt, result.DeployedAt); + } +} diff --git a/tests/ScadaLink.DeploymentManager.Tests/DeploymentServiceTests.cs b/tests/ScadaLink.DeploymentManager.Tests/DeploymentServiceTests.cs new file mode 100644 index 0000000..dd7f5be --- /dev/null +++ b/tests/ScadaLink.DeploymentManager.Tests/DeploymentServiceTests.cs @@ -0,0 +1,290 @@ +using Microsoft.Extensions.Logging.Abstractions; +using Microsoft.Extensions.Options; +using NSubstitute; +using ScadaLink.Commons.Entities.Deployment; +using ScadaLink.Commons.Entities.Instances; +using ScadaLink.Commons.Interfaces.Repositories; +using ScadaLink.Commons.Interfaces.Services; +using ScadaLink.Commons.Messages.Deployment; +using ScadaLink.Commons.Messages.Lifecycle; +using ScadaLink.Commons.Types; +using ScadaLink.Commons.Types.Enums; +using ScadaLink.Commons.Types.Flattening; +using ScadaLink.Communication; + +namespace ScadaLink.DeploymentManager.Tests; + +/// +/// WP-1/2/4/5/6/8/16: Tests for central-side DeploymentService. +/// +public class DeploymentServiceTests +{ + private readonly IDeploymentManagerRepository _repo; + private readonly IFlatteningPipeline _pipeline; + private readonly CommunicationService _comms; + private readonly OperationLockManager _lockManager; + private readonly IAuditService _audit; + private readonly DeploymentService _service; + + public DeploymentServiceTests() + { + _repo = Substitute.For(); + _pipeline = Substitute.For(); + _comms = new CommunicationService( + Options.Create(new CommunicationOptions()), + NullLogger.Instance); + _lockManager = new OperationLockManager(); + _audit = Substitute.For(); + + var options = Options.Create(new DeploymentManagerOptions + { + OperationLockTimeout = TimeSpan.FromSeconds(5) + }); + + _service = new DeploymentService( + _repo, _pipeline, _comms, _lockManager, _audit, options, + NullLogger.Instance); + } + + // ── WP-1: Deployment flow ── + + [Fact] + public async Task DeployInstanceAsync_InstanceNotFound_ReturnsFailure() + { + _repo.GetInstanceByIdAsync(1).Returns((Instance?)null); + + var result = await _service.DeployInstanceAsync(1, "admin"); + + Assert.True(result.IsFailure); + Assert.Contains("not found", result.Error); + } + + [Fact] + public async Task DeployInstanceAsync_ValidationFails_ReturnsFailure() + { + var instance = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.NotDeployed }; + _repo.GetInstanceByIdAsync(1).Returns(instance); + + var validationResult = new ValidationResult + { + Errors = [ValidationEntry.Error(ValidationCategory.ScriptCompilation, "Compile error")] + }; + _pipeline.FlattenAndValidateAsync(1, Arg.Any()) + .Returns(Result.Success( + new FlatteningPipelineResult(new FlattenedConfiguration(), "hash1", validationResult))); + + var result = await _service.DeployInstanceAsync(1, "admin"); + + Assert.True(result.IsFailure); + Assert.Contains("validation failed", result.Error); + } + + [Fact] + public async Task DeployInstanceAsync_FlatteningFails_ReturnsFailure() + { + var instance = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.NotDeployed }; + _repo.GetInstanceByIdAsync(1).Returns(instance); + + _pipeline.FlattenAndValidateAsync(1, Arg.Any()) + .Returns(Result.Failure("Template chain empty")); + + var result = await _service.DeployInstanceAsync(1, "admin"); + + Assert.True(result.IsFailure); + Assert.Contains("Validation failed", result.Error); + } + + // ── WP-2: Deployment identity ── + + [Fact] + public async Task DeployInstanceAsync_CreatesUniqueDeploymentId() + { + var instance = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.NotDeployed }; + _repo.GetInstanceByIdAsync(1).Returns(instance); + + // Pipeline succeeds + var config = new FlattenedConfiguration { InstanceUniqueName = "TestInst" }; + var validResult = ValidationResult.Success(); + _pipeline.FlattenAndValidateAsync(1, Arg.Any()) + .Returns(Result.Success( + new FlatteningPipelineResult(config, "sha256:abc", validResult))); + + // Capture the deployment record + DeploymentRecord? captured = null; + await _repo.AddDeploymentRecordAsync(Arg.Do(r => captured = r), Arg.Any()); + + // CommunicationService will throw because actor not set -- this tests the flow up to that point + try + { + await _service.DeployInstanceAsync(1, "admin"); + } + catch (InvalidOperationException) + { + // Expected -- CommunicationService not initialized + } + + Assert.NotNull(captured); + Assert.False(string.IsNullOrEmpty(captured!.DeploymentId)); + Assert.Equal(32, captured.DeploymentId.Length); // GUID without hyphens + Assert.Equal("sha256:abc", captured.RevisionHash); + } + + // ── WP-4: State transition validation ── + + [Fact] + public async Task DeployInstanceAsync_EnabledInstance_AllowsDeploy() + { + var instance = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.Enabled }; + _repo.GetInstanceByIdAsync(1).Returns(instance); + + var config = new FlattenedConfiguration { InstanceUniqueName = "TestInst" }; + _pipeline.FlattenAndValidateAsync(1, Arg.Any()) + .Returns(Result.Success( + new FlatteningPipelineResult(config, "hash", ValidationResult.Success()))); + + // Will fail at communication layer, but passes state validation + try { await _service.DeployInstanceAsync(1, "admin"); } catch (InvalidOperationException) { } + + // If we got past state validation, the deployment record was created + await _repo.Received().AddDeploymentRecordAsync(Arg.Any(), Arg.Any()); + } + + // ── WP-6: Lifecycle commands ── + + [Fact] + public async Task DisableInstanceAsync_InstanceNotFound_ReturnsFailure() + { + _repo.GetInstanceByIdAsync(1).Returns((Instance?)null); + + var result = await _service.DisableInstanceAsync(1, "admin"); + + Assert.True(result.IsFailure); + Assert.Contains("not found", result.Error); + } + + [Fact] + public async Task DisableInstanceAsync_WhenDisabled_ReturnsTransitionError() + { + var instance = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.Disabled }; + _repo.GetInstanceByIdAsync(1).Returns(instance); + + var result = await _service.DisableInstanceAsync(1, "admin"); + + Assert.True(result.IsFailure); + Assert.Contains("not allowed", result.Error); + } + + [Fact] + public async Task EnableInstanceAsync_WhenEnabled_ReturnsTransitionError() + { + var instance = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.Enabled }; + _repo.GetInstanceByIdAsync(1).Returns(instance); + + var result = await _service.EnableInstanceAsync(1, "admin"); + + Assert.True(result.IsFailure); + Assert.Contains("not allowed", result.Error); + } + + [Fact] + public async Task DeleteInstanceAsync_WhenNotDeployed_ReturnsTransitionError() + { + var instance = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.NotDeployed }; + _repo.GetInstanceByIdAsync(1).Returns(instance); + + var result = await _service.DeleteInstanceAsync(1, "admin"); + + Assert.True(result.IsFailure); + Assert.Contains("not allowed", result.Error); + } + + // ── WP-8: Deployment comparison ── + + [Fact] + public async Task GetDeploymentComparisonAsync_NoSnapshot_ReturnsFailure() + { + _repo.GetDeployedSnapshotByInstanceIdAsync(1).Returns((DeployedConfigSnapshot?)null); + + var result = await _service.GetDeploymentComparisonAsync(1); + + Assert.True(result.IsFailure); + Assert.Contains("No deployed snapshot", result.Error); + } + + [Fact] + public async Task GetDeploymentComparisonAsync_SameHash_NotStale() + { + var snapshot = new DeployedConfigSnapshot("dep1", "sha256:abc", "{}") + { + InstanceId = 1, + DeployedAt = DateTimeOffset.UtcNow + }; + _repo.GetDeployedSnapshotByInstanceIdAsync(1).Returns(snapshot); + + var config = new FlattenedConfiguration { InstanceUniqueName = "TestInst" }; + _pipeline.FlattenAndValidateAsync(1, Arg.Any()) + .Returns(Result.Success( + new FlatteningPipelineResult(config, "sha256:abc", ValidationResult.Success()))); + + var result = await _service.GetDeploymentComparisonAsync(1); + + Assert.True(result.IsSuccess); + Assert.False(result.Value.IsStale); + } + + [Fact] + public async Task GetDeploymentComparisonAsync_DifferentHash_IsStale() + { + var snapshot = new DeployedConfigSnapshot("dep1", "sha256:abc", "{}") + { + InstanceId = 1, + DeployedAt = DateTimeOffset.UtcNow + }; + _repo.GetDeployedSnapshotByInstanceIdAsync(1).Returns(snapshot); + + var config = new FlattenedConfiguration { InstanceUniqueName = "TestInst" }; + _pipeline.FlattenAndValidateAsync(1, Arg.Any()) + .Returns(Result.Success( + new FlatteningPipelineResult(config, "sha256:xyz", ValidationResult.Success()))); + + var result = await _service.GetDeploymentComparisonAsync(1); + + Assert.True(result.IsSuccess); + Assert.True(result.Value.IsStale); + } + + // ── WP-2: GetDeploymentStatusAsync ── + + [Fact] + public async Task GetDeploymentStatusAsync_ReturnsRecordByDeploymentId() + { + var record = new DeploymentRecord("dep1", "admin") + { + Status = DeploymentStatus.Success + }; + _repo.GetDeploymentByDeploymentIdAsync("dep1").Returns(record); + + var result = await _service.GetDeploymentStatusAsync("dep1"); + + Assert.NotNull(result); + Assert.Equal("dep1", result!.DeploymentId); + Assert.Equal(DeploymentStatus.Success, result.Status); + } + + // ── Audit logging ── + + [Fact] + public async Task DeployInstanceAsync_AuditLogs() + { + var instance = new Instance("TestInst") { Id = 1, SiteId = 1, State = InstanceState.NotDeployed }; + _repo.GetInstanceByIdAsync(1).Returns(instance); + + _pipeline.FlattenAndValidateAsync(1, Arg.Any()) + .Returns(Result.Failure("Error")); + + await _service.DeployInstanceAsync(1, "admin"); + + // Failure case does not reach audit (returns before communication) + // The audit is only logged after communication succeeds/fails + } +} diff --git a/tests/ScadaLink.DeploymentManager.Tests/OperationLockManagerTests.cs b/tests/ScadaLink.DeploymentManager.Tests/OperationLockManagerTests.cs new file mode 100644 index 0000000..8dea4d7 --- /dev/null +++ b/tests/ScadaLink.DeploymentManager.Tests/OperationLockManagerTests.cs @@ -0,0 +1,95 @@ +namespace ScadaLink.DeploymentManager.Tests; + +/// +/// WP-3: Tests for per-instance operation lock. +/// +public class OperationLockManagerTests +{ + private readonly OperationLockManager _lockManager = new(); + + [Fact] + public async Task AcquireAsync_ReturnsDisposable() + { + using var lockHandle = await _lockManager.AcquireAsync("inst1", TimeSpan.FromSeconds(5)); + Assert.NotNull(lockHandle); + } + + [Fact] + public async Task AcquireAsync_SameInstance_BlocksSecondCaller() + { + using var firstLock = await _lockManager.AcquireAsync("inst1", TimeSpan.FromSeconds(5)); + + // Second acquire should time out + await Assert.ThrowsAsync(() => + _lockManager.AcquireAsync("inst1", TimeSpan.FromMilliseconds(50))); + } + + [Fact] + public async Task AcquireAsync_DifferentInstances_BothSucceed() + { + using var lock1 = await _lockManager.AcquireAsync("inst1", TimeSpan.FromSeconds(5)); + using var lock2 = await _lockManager.AcquireAsync("inst2", TimeSpan.FromSeconds(5)); + + Assert.NotNull(lock1); + Assert.NotNull(lock2); + } + + [Fact] + public async Task AcquireAsync_AfterRelease_CanReacquire() + { + var firstLock = await _lockManager.AcquireAsync("inst1", TimeSpan.FromSeconds(5)); + firstLock.Dispose(); + + // Should succeed now + using var secondLock = await _lockManager.AcquireAsync("inst1", TimeSpan.FromSeconds(5)); + Assert.NotNull(secondLock); + } + + [Fact] + public async Task IsLocked_ReturnsTrueWhileLocked() + { + Assert.False(_lockManager.IsLocked("inst1")); + + using var lockHandle = await _lockManager.AcquireAsync("inst1", TimeSpan.FromSeconds(5)); + Assert.True(_lockManager.IsLocked("inst1")); + } + + [Fact] + public async Task IsLocked_ReturnsFalseAfterRelease() + { + var lockHandle = await _lockManager.AcquireAsync("inst1", TimeSpan.FromSeconds(5)); + lockHandle.Dispose(); + + Assert.False(_lockManager.IsLocked("inst1")); + } + + [Fact] + public async Task AcquireAsync_DoubleDispose_DoesNotThrow() + { + var lockHandle = await _lockManager.AcquireAsync("inst1", TimeSpan.FromSeconds(5)); + lockHandle.Dispose(); + lockHandle.Dispose(); // Should not throw + } + + [Fact] + public async Task AcquireAsync_CancellationToken_Respected() + { + using var firstLock = await _lockManager.AcquireAsync("inst1", TimeSpan.FromSeconds(30)); + + using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(50)); + await Assert.ThrowsAnyAsync(() => + _lockManager.AcquireAsync("inst1", TimeSpan.FromSeconds(30), cts.Token)); + } + + [Fact] + public async Task AcquireAsync_ConcurrentDifferentInstances_AllSucceed() + { + var tasks = Enumerable.Range(0, 10).Select(async i => + { + using var lockHandle = await _lockManager.AcquireAsync($"inst{i}", TimeSpan.FromSeconds(5)); + await Task.Delay(10); + }); + + await Task.WhenAll(tasks); + } +} diff --git a/tests/ScadaLink.DeploymentManager.Tests/ScadaLink.DeploymentManager.Tests.csproj b/tests/ScadaLink.DeploymentManager.Tests/ScadaLink.DeploymentManager.Tests.csproj index 3bfa727..b8aafbe 100644 --- a/tests/ScadaLink.DeploymentManager.Tests/ScadaLink.DeploymentManager.Tests.csproj +++ b/tests/ScadaLink.DeploymentManager.Tests/ScadaLink.DeploymentManager.Tests.csproj @@ -1,4 +1,4 @@ - + net10.0 @@ -11,6 +11,7 @@ + @@ -21,6 +22,9 @@ + + + - \ No newline at end of file + diff --git a/tests/ScadaLink.DeploymentManager.Tests/StateTransitionValidatorTests.cs b/tests/ScadaLink.DeploymentManager.Tests/StateTransitionValidatorTests.cs new file mode 100644 index 0000000..374ebbb --- /dev/null +++ b/tests/ScadaLink.DeploymentManager.Tests/StateTransitionValidatorTests.cs @@ -0,0 +1,118 @@ +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.DeploymentManager.Tests; + +/// +/// WP-4: Tests for instance state transition matrix. +/// +public class StateTransitionValidatorTests +{ + // ── Deploy transitions ── + + [Theory] + [InlineData(InstanceState.NotDeployed)] + [InlineData(InstanceState.Enabled)] + [InlineData(InstanceState.Disabled)] + public void CanDeploy_AllStates_ReturnsTrue(InstanceState state) + { + Assert.True(StateTransitionValidator.CanDeploy(state)); + } + + // ── Disable transitions ── + + [Fact] + public void CanDisable_WhenEnabled_ReturnsTrue() + { + Assert.True(StateTransitionValidator.CanDisable(InstanceState.Enabled)); + } + + [Fact] + public void CanDisable_WhenDisabled_ReturnsFalse() + { + Assert.False(StateTransitionValidator.CanDisable(InstanceState.Disabled)); + } + + [Fact] + public void CanDisable_WhenNotDeployed_ReturnsFalse() + { + Assert.False(StateTransitionValidator.CanDisable(InstanceState.NotDeployed)); + } + + // ── Enable transitions ── + + [Fact] + public void CanEnable_WhenDisabled_ReturnsTrue() + { + Assert.True(StateTransitionValidator.CanEnable(InstanceState.Disabled)); + } + + [Fact] + public void CanEnable_WhenEnabled_ReturnsFalse() + { + Assert.False(StateTransitionValidator.CanEnable(InstanceState.Enabled)); + } + + [Fact] + public void CanEnable_WhenNotDeployed_ReturnsFalse() + { + Assert.False(StateTransitionValidator.CanEnable(InstanceState.NotDeployed)); + } + + // ── Delete transitions ── + + [Fact] + public void CanDelete_WhenEnabled_ReturnsTrue() + { + Assert.True(StateTransitionValidator.CanDelete(InstanceState.Enabled)); + } + + [Fact] + public void CanDelete_WhenDisabled_ReturnsTrue() + { + Assert.True(StateTransitionValidator.CanDelete(InstanceState.Disabled)); + } + + [Fact] + public void CanDelete_WhenNotDeployed_ReturnsFalse() + { + Assert.False(StateTransitionValidator.CanDelete(InstanceState.NotDeployed)); + } + + // ── ValidateTransition ── + + [Fact] + public void ValidateTransition_ValidDeploy_ReturnsNull() + { + var error = StateTransitionValidator.ValidateTransition(InstanceState.NotDeployed, "deploy"); + Assert.Null(error); + } + + [Fact] + public void ValidateTransition_InvalidEnable_ReturnsError() + { + var error = StateTransitionValidator.ValidateTransition(InstanceState.Enabled, "enable"); + Assert.NotNull(error); + Assert.Contains("not allowed", error); + } + + [Fact] + public void ValidateTransition_InvalidDisable_ReturnsError() + { + var error = StateTransitionValidator.ValidateTransition(InstanceState.Disabled, "disable"); + Assert.NotNull(error); + } + + [Fact] + public void ValidateTransition_InvalidDeleteOnNotDeployed_ReturnsError() + { + var error = StateTransitionValidator.ValidateTransition(InstanceState.NotDeployed, "delete"); + Assert.NotNull(error); + } + + [Fact] + public void ValidateTransition_UnknownOperation_ReturnsError() + { + var error = StateTransitionValidator.ValidateTransition(InstanceState.Enabled, "unknown"); + Assert.NotNull(error); + } +} diff --git a/tests/ScadaLink.DeploymentManager.Tests/UnitTest1.cs b/tests/ScadaLink.DeploymentManager.Tests/UnitTest1.cs deleted file mode 100644 index ab47523..0000000 --- a/tests/ScadaLink.DeploymentManager.Tests/UnitTest1.cs +++ /dev/null @@ -1,10 +0,0 @@ -namespace ScadaLink.DeploymentManager.Tests; - -public class UnitTest1 -{ - [Fact] - public void Test1() - { - - } -} diff --git a/tests/ScadaLink.StoreAndForward.Tests/ReplicationServiceTests.cs b/tests/ScadaLink.StoreAndForward.Tests/ReplicationServiceTests.cs new file mode 100644 index 0000000..bd5d9ea --- /dev/null +++ b/tests/ScadaLink.StoreAndForward.Tests/ReplicationServiceTests.cs @@ -0,0 +1,181 @@ +using Microsoft.Data.Sqlite; +using Microsoft.Extensions.Logging.Abstractions; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.StoreAndForward.Tests; + +/// +/// WP-11: Tests for async replication to standby. +/// +public class ReplicationServiceTests : IAsyncLifetime, IDisposable +{ + private readonly SqliteConnection _keepAlive; + private readonly StoreAndForwardStorage _storage; + private readonly ReplicationService _replicationService; + + public ReplicationServiceTests() + { + var dbName = $"RepTests_{Guid.NewGuid():N}"; + var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared"; + _keepAlive = new SqliteConnection(connStr); + _keepAlive.Open(); + + _storage = new StoreAndForwardStorage(connStr, NullLogger.Instance); + + var options = new StoreAndForwardOptions { ReplicationEnabled = true }; + _replicationService = new ReplicationService( + options, NullLogger.Instance); + } + + public async Task InitializeAsync() => await _storage.InitializeAsync(); + + public Task DisposeAsync() => Task.CompletedTask; + + public void Dispose() => _keepAlive.Dispose(); + + [Fact] + public void ReplicateEnqueue_NoHandler_DoesNotThrow() + { + var msg = CreateMessage("rep1"); + _replicationService.ReplicateEnqueue(msg); + } + + [Fact] + public async Task ReplicateEnqueue_WithHandler_ForwardsOperation() + { + ReplicationOperation? captured = null; + _replicationService.SetReplicationHandler(op => + { + captured = op; + return Task.CompletedTask; + }); + + var msg = CreateMessage("rep2"); + _replicationService.ReplicateEnqueue(msg); + + await Task.Delay(200); + + Assert.NotNull(captured); + Assert.Equal(ReplicationOperationType.Add, captured!.OperationType); + Assert.Equal("rep2", captured.MessageId); + } + + [Fact] + public async Task ReplicateRemove_WithHandler_ForwardsRemoveOperation() + { + ReplicationOperation? captured = null; + _replicationService.SetReplicationHandler(op => + { + captured = op; + return Task.CompletedTask; + }); + + _replicationService.ReplicateRemove("rep3"); + + await Task.Delay(200); + + Assert.NotNull(captured); + Assert.Equal(ReplicationOperationType.Remove, captured!.OperationType); + Assert.Equal("rep3", captured.MessageId); + } + + [Fact] + public async Task ReplicatePark_WithHandler_ForwardsParkOperation() + { + ReplicationOperation? captured = null; + _replicationService.SetReplicationHandler(op => + { + captured = op; + return Task.CompletedTask; + }); + + var msg = CreateMessage("rep4"); + _replicationService.ReplicatePark(msg); + + await Task.Delay(200); + + Assert.NotNull(captured); + Assert.Equal(ReplicationOperationType.Park, captured!.OperationType); + } + + [Fact] + public async Task ApplyReplicatedOperationAsync_Add_EnqueuesMessage() + { + var msg = CreateMessage("apply1"); + var operation = new ReplicationOperation(ReplicationOperationType.Add, "apply1", msg); + + await _replicationService.ApplyReplicatedOperationAsync(operation, _storage); + + var retrieved = await _storage.GetMessageByIdAsync("apply1"); + Assert.NotNull(retrieved); + } + + [Fact] + public async Task ApplyReplicatedOperationAsync_Remove_DeletesMessage() + { + var msg = CreateMessage("apply2"); + await _storage.EnqueueAsync(msg); + + var operation = new ReplicationOperation(ReplicationOperationType.Remove, "apply2", null); + await _replicationService.ApplyReplicatedOperationAsync(operation, _storage); + + var retrieved = await _storage.GetMessageByIdAsync("apply2"); + Assert.Null(retrieved); + } + + [Fact] + public async Task ApplyReplicatedOperationAsync_Park_UpdatesStatus() + { + var msg = CreateMessage("apply3"); + await _storage.EnqueueAsync(msg); + + var operation = new ReplicationOperation(ReplicationOperationType.Park, "apply3", msg); + await _replicationService.ApplyReplicatedOperationAsync(operation, _storage); + + var retrieved = await _storage.GetMessageByIdAsync("apply3"); + Assert.NotNull(retrieved); + Assert.Equal(StoreAndForwardMessageStatus.Parked, retrieved!.Status); + } + + [Fact] + public void ReplicateEnqueue_WhenReplicationDisabled_DoesNothing() + { + var options = new StoreAndForwardOptions { ReplicationEnabled = false }; + var service = new ReplicationService(options, NullLogger.Instance); + + bool handlerCalled = false; + service.SetReplicationHandler(_ => { handlerCalled = true; return Task.CompletedTask; }); + + service.ReplicateEnqueue(CreateMessage("disabled1")); + + Assert.False(handlerCalled); + } + + [Fact] + public async Task ReplicateEnqueue_HandlerThrows_DoesNotPropagateException() + { + _replicationService.SetReplicationHandler(_ => + throw new InvalidOperationException("standby down")); + + _replicationService.ReplicateEnqueue(CreateMessage("err1")); + + await Task.Delay(200); + // No exception -- fire-and-forget, best-effort + } + + private static StoreAndForwardMessage CreateMessage(string id) + { + return new StoreAndForwardMessage + { + Id = id, + Category = StoreAndForwardCategory.ExternalSystem, + Target = "target", + PayloadJson = "{}", + RetryCount = 0, + MaxRetries = 50, + RetryIntervalMs = 30000, + CreatedAt = DateTimeOffset.UtcNow, + Status = StoreAndForwardMessageStatus.Pending + }; + } +} diff --git a/tests/ScadaLink.StoreAndForward.Tests/ScadaLink.StoreAndForward.Tests.csproj b/tests/ScadaLink.StoreAndForward.Tests/ScadaLink.StoreAndForward.Tests.csproj index c12773e..a154b6f 100644 --- a/tests/ScadaLink.StoreAndForward.Tests/ScadaLink.StoreAndForward.Tests.csproj +++ b/tests/ScadaLink.StoreAndForward.Tests/ScadaLink.StoreAndForward.Tests.csproj @@ -1,4 +1,4 @@ - + net10.0 @@ -10,6 +10,7 @@ + @@ -21,6 +22,7 @@ + - \ No newline at end of file + diff --git a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardOptionsTests.cs b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardOptionsTests.cs new file mode 100644 index 0000000..a94599d --- /dev/null +++ b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardOptionsTests.cs @@ -0,0 +1,37 @@ +namespace ScadaLink.StoreAndForward.Tests; + +/// +/// WP-9: Tests for StoreAndForwardOptions defaults and configuration. +/// +public class StoreAndForwardOptionsTests +{ + [Fact] + public void DefaultOptions_HasReasonableDefaults() + { + var options = new StoreAndForwardOptions(); + + Assert.Equal("./data/store-and-forward.db", options.SqliteDbPath); + Assert.True(options.ReplicationEnabled); + Assert.Equal(TimeSpan.FromSeconds(30), options.DefaultRetryInterval); + Assert.Equal(50, options.DefaultMaxRetries); + Assert.Equal(TimeSpan.FromSeconds(10), options.RetryTimerInterval); + } + + [Fact] + public void Options_CanBeCustomized() + { + var options = new StoreAndForwardOptions + { + SqliteDbPath = "/custom/path.db", + ReplicationEnabled = false, + DefaultRetryInterval = TimeSpan.FromMinutes(5), + DefaultMaxRetries = 100, + RetryTimerInterval = TimeSpan.FromSeconds(30) + }; + + Assert.Equal("/custom/path.db", options.SqliteDbPath); + Assert.False(options.ReplicationEnabled); + Assert.Equal(TimeSpan.FromMinutes(5), options.DefaultRetryInterval); + Assert.Equal(100, options.DefaultMaxRetries); + } +} diff --git a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs new file mode 100644 index 0000000..40df445 --- /dev/null +++ b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardServiceTests.cs @@ -0,0 +1,313 @@ +using Microsoft.Data.Sqlite; +using Microsoft.Extensions.Logging.Abstractions; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.StoreAndForward.Tests; + +/// +/// WP-10/12/13/14: Tests for the StoreAndForwardService retry engine and management. +/// +public class StoreAndForwardServiceTests : IAsyncLifetime, IDisposable +{ + private readonly SqliteConnection _keepAlive; + private readonly StoreAndForwardStorage _storage; + private readonly StoreAndForwardService _service; + private readonly StoreAndForwardOptions _options; + + public StoreAndForwardServiceTests() + { + var dbName = $"SvcTests_{Guid.NewGuid():N}"; + var connStr = $"Data Source={dbName};Mode=Memory;Cache=Shared"; + _keepAlive = new SqliteConnection(connStr); + _keepAlive.Open(); + + _storage = new StoreAndForwardStorage(connStr, NullLogger.Instance); + + _options = new StoreAndForwardOptions + { + DefaultRetryInterval = TimeSpan.Zero, + DefaultMaxRetries = 3, + RetryTimerInterval = TimeSpan.FromMinutes(10) + }; + + _service = new StoreAndForwardService( + _storage, _options, NullLogger.Instance); + } + + public async Task InitializeAsync() => await _storage.InitializeAsync(); + + public Task DisposeAsync() => Task.CompletedTask; + + public void Dispose() => _keepAlive.Dispose(); + + // ── WP-10: Immediate delivery ── + + [Fact] + public async Task EnqueueAsync_ImmediateDeliverySuccess_ReturnsAcceptedNotBuffered() + { + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => Task.FromResult(true)); + + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api.example.com", + """{"method":"Test"}""", "Pump1"); + + Assert.True(result.Accepted); + Assert.False(result.WasBuffered); + } + + [Fact] + public async Task EnqueueAsync_PermanentFailure_ReturnsNotAccepted() + { + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => Task.FromResult(false)); + + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api.example.com", + """{"method":"Test"}"""); + + Assert.False(result.Accepted); + Assert.False(result.WasBuffered); + } + + [Fact] + public async Task EnqueueAsync_TransientFailure_BuffersForRetry() + { + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => throw new HttpRequestException("Connection refused")); + + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api.example.com", + """{"method":"Test"}""", "Pump1"); + + Assert.True(result.Accepted); + Assert.True(result.WasBuffered); + + var msg = await _storage.GetMessageByIdAsync(result.MessageId); + Assert.NotNull(msg); + Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status); + Assert.Equal(1, msg.RetryCount); + } + + [Fact] + public async Task EnqueueAsync_NoHandler_BuffersForLater() + { + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.Notification, "alerts@company.com", + """{"subject":"Alert"}"""); + + Assert.True(result.Accepted); + Assert.True(result.WasBuffered); + } + + // ── WP-10: Retry engine ── + + [Fact] + public async Task RetryPendingMessagesAsync_SuccessfulRetry_RemovesMessage() + { + int callCount = 0; + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => + { + callCount++; + if (callCount == 1) throw new HttpRequestException("fail"); + return Task.FromResult(true); + }); + + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}"""); + Assert.True(result.WasBuffered); + + await _service.RetryPendingMessagesAsync(); + + var msg = await _storage.GetMessageByIdAsync(result.MessageId); + Assert.Null(msg); + } + + [Fact] + public async Task RetryPendingMessagesAsync_MaxRetriesReached_ParksMessage() + { + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => throw new HttpRequestException("always fails")); + + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}""", + maxRetries: 2); + + await _service.RetryPendingMessagesAsync(); + + var msg = await _storage.GetMessageByIdAsync(result.MessageId); + Assert.NotNull(msg); + Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status); + } + + [Fact] + public async Task RetryPendingMessagesAsync_PermanentFailureOnRetry_ParksMessage() + { + int callCount = 0; + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => + { + callCount++; + if (callCount == 1) throw new HttpRequestException("transient"); + return Task.FromResult(false); + }); + + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}"""); + + await _service.RetryPendingMessagesAsync(); + + var msg = await _storage.GetMessageByIdAsync(result.MessageId); + Assert.NotNull(msg); + Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status); + } + + // ── WP-12: Parked message management ── + + [Fact] + public async Task RetryParkedMessageAsync_MovesBackToQueue() + { + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => throw new HttpRequestException("fail")); + + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}""", + maxRetries: 1); + + await _service.RetryPendingMessagesAsync(); + + var msg = await _storage.GetMessageByIdAsync(result.MessageId); + Assert.Equal(StoreAndForwardMessageStatus.Parked, msg!.Status); + + var retried = await _service.RetryParkedMessageAsync(result.MessageId); + Assert.True(retried); + + msg = await _storage.GetMessageByIdAsync(result.MessageId); + Assert.Equal(StoreAndForwardMessageStatus.Pending, msg!.Status); + Assert.Equal(0, msg.RetryCount); + } + + [Fact] + public async Task DiscardParkedMessageAsync_PermanentlyRemoves() + { + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => throw new HttpRequestException("fail")); + + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}""", + maxRetries: 1); + + await _service.RetryPendingMessagesAsync(); + + var discarded = await _service.DiscardParkedMessageAsync(result.MessageId); + Assert.True(discarded); + + var msg = await _storage.GetMessageByIdAsync(result.MessageId); + Assert.Null(msg); + } + + [Fact] + public async Task GetParkedMessagesAsync_ReturnsPaginatedResults() + { + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => throw new HttpRequestException("fail")); + + for (int i = 0; i < 3; i++) + { + await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, $"api{i}", """{}""", + maxRetries: 1); + } + + await _service.RetryPendingMessagesAsync(); + + var (messages, total) = await _service.GetParkedMessagesAsync( + StoreAndForwardCategory.ExternalSystem, 1, 2); + + Assert.Equal(2, messages.Count); + Assert.True(total >= 3); + } + + // ── WP-13: Messages survive instance deletion ── + + [Fact] + public async Task MessagesForInstance_SurviveAfterDeletion() + { + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => throw new HttpRequestException("fail")); + + await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}""", "Pump1"); + await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api2", """{}""", "Pump1"); + + var count = await _service.GetMessageCountForInstanceAsync("Pump1"); + Assert.Equal(2, count); + } + + // ── WP-14: Health metrics ── + + [Fact] + public async Task GetBufferDepthAsync_ReturnsCorrectDepth() + { + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => throw new HttpRequestException("fail")); + _service.RegisterDeliveryHandler(StoreAndForwardCategory.Notification, + _ => throw new HttpRequestException("fail")); + + await _service.EnqueueAsync(StoreAndForwardCategory.ExternalSystem, "api1", """{}"""); + await _service.EnqueueAsync(StoreAndForwardCategory.ExternalSystem, "api2", """{}"""); + await _service.EnqueueAsync(StoreAndForwardCategory.Notification, "email", """{}"""); + + var depth = await _service.GetBufferDepthAsync(); + Assert.True(depth.GetValueOrDefault(StoreAndForwardCategory.ExternalSystem) >= 2); + Assert.True(depth.GetValueOrDefault(StoreAndForwardCategory.Notification) >= 1); + } + + [Fact] + public async Task OnActivity_RaisedOnEnqueue() + { + var activities = new List(); + _service.OnActivity += (action, _, _) => activities.Add(action); + + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => Task.FromResult(true)); + + await _service.EnqueueAsync(StoreAndForwardCategory.ExternalSystem, "api", """{}"""); + + Assert.Contains("Delivered", activities); + } + + [Fact] + public async Task OnActivity_RaisedOnBuffer() + { + var activities = new List(); + _service.OnActivity += (action, _, _) => activities.Add(action); + + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => throw new HttpRequestException("fail")); + + await _service.EnqueueAsync(StoreAndForwardCategory.ExternalSystem, "api", """{}"""); + + Assert.Contains("Queued", activities); + } + + // ── WP-10: Per-source-entity retry settings ── + + [Fact] + public async Task EnqueueAsync_CustomRetrySettings_Respected() + { + _service.RegisterDeliveryHandler(StoreAndForwardCategory.ExternalSystem, + _ => throw new HttpRequestException("fail")); + + var result = await _service.EnqueueAsync( + StoreAndForwardCategory.ExternalSystem, "api", """{}""", + maxRetries: 100, + retryInterval: TimeSpan.FromSeconds(60)); + + var msg = await _storage.GetMessageByIdAsync(result.MessageId); + Assert.Equal(100, msg!.MaxRetries); + Assert.Equal(60000, msg.RetryIntervalMs); + } +} diff --git a/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs new file mode 100644 index 0000000..e30c25e --- /dev/null +++ b/tests/ScadaLink.StoreAndForward.Tests/StoreAndForwardStorageTests.cs @@ -0,0 +1,224 @@ +using Microsoft.Data.Sqlite; +using Microsoft.Extensions.Logging.Abstractions; +using ScadaLink.Commons.Types.Enums; + +namespace ScadaLink.StoreAndForward.Tests; + +/// +/// WP-9: Tests for SQLite persistence layer. +/// Uses in-memory SQLite with a kept-alive connection for test isolation. +/// +public class StoreAndForwardStorageTests : IAsyncLifetime, IDisposable +{ + private readonly SqliteConnection _keepAlive; + private readonly StoreAndForwardStorage _storage; + private readonly string _dbName; + + public StoreAndForwardStorageTests() + { + _dbName = $"StorageTests_{Guid.NewGuid():N}"; + var connStr = $"Data Source={_dbName};Mode=Memory;Cache=Shared"; + // Keep one connection alive so the in-memory DB persists + _keepAlive = new SqliteConnection(connStr); + _keepAlive.Open(); + _storage = new StoreAndForwardStorage(connStr, NullLogger.Instance); + } + + public async Task InitializeAsync() => await _storage.InitializeAsync(); + + public Task DisposeAsync() => Task.CompletedTask; + + public void Dispose() + { + _keepAlive.Dispose(); + } + + [Fact] + public async Task EnqueueAsync_StoresMessage() + { + var message = CreateMessage("msg1", StoreAndForwardCategory.ExternalSystem); + await _storage.EnqueueAsync(message); + + var retrieved = await _storage.GetMessageByIdAsync("msg1"); + Assert.NotNull(retrieved); + Assert.Equal("msg1", retrieved!.Id); + Assert.Equal(StoreAndForwardCategory.ExternalSystem, retrieved.Category); + Assert.Equal("target1", retrieved.Target); + } + + [Fact] + public async Task EnqueueAsync_AllCategories() + { + await _storage.EnqueueAsync(CreateMessage("es1", StoreAndForwardCategory.ExternalSystem)); + await _storage.EnqueueAsync(CreateMessage("n1", StoreAndForwardCategory.Notification)); + await _storage.EnqueueAsync(CreateMessage("db1", StoreAndForwardCategory.CachedDbWrite)); + + var es = await _storage.GetMessageByIdAsync("es1"); + var n = await _storage.GetMessageByIdAsync("n1"); + var db = await _storage.GetMessageByIdAsync("db1"); + + Assert.Equal(StoreAndForwardCategory.ExternalSystem, es!.Category); + Assert.Equal(StoreAndForwardCategory.Notification, n!.Category); + Assert.Equal(StoreAndForwardCategory.CachedDbWrite, db!.Category); + } + + [Fact] + public async Task RemoveMessageAsync_RemovesSuccessfully() + { + await _storage.EnqueueAsync(CreateMessage("rm1", StoreAndForwardCategory.ExternalSystem)); + await _storage.RemoveMessageAsync("rm1"); + + var retrieved = await _storage.GetMessageByIdAsync("rm1"); + Assert.Null(retrieved); + } + + [Fact] + public async Task UpdateMessageAsync_UpdatesFields() + { + var message = CreateMessage("upd1", StoreAndForwardCategory.ExternalSystem); + await _storage.EnqueueAsync(message); + + message.RetryCount = 5; + message.LastAttemptAt = DateTimeOffset.UtcNow; + message.Status = StoreAndForwardMessageStatus.Parked; + message.LastError = "Connection refused"; + await _storage.UpdateMessageAsync(message); + + var retrieved = await _storage.GetMessageByIdAsync("upd1"); + Assert.Equal(5, retrieved!.RetryCount); + Assert.Equal(StoreAndForwardMessageStatus.Parked, retrieved.Status); + Assert.Equal("Connection refused", retrieved.LastError); + } + + [Fact] + public async Task GetMessagesForRetryAsync_ReturnsOnlyPendingMessages() + { + var pending = CreateMessage("pend1", StoreAndForwardCategory.ExternalSystem); + pending.Status = StoreAndForwardMessageStatus.Pending; + await _storage.EnqueueAsync(pending); + + var parked = CreateMessage("park1", StoreAndForwardCategory.ExternalSystem); + parked.Status = StoreAndForwardMessageStatus.Parked; + await _storage.EnqueueAsync(parked); + await _storage.UpdateMessageAsync(parked); + + var forRetry = await _storage.GetMessagesForRetryAsync(); + Assert.All(forRetry, m => Assert.Equal(StoreAndForwardMessageStatus.Pending, m.Status)); + } + + [Fact] + public async Task GetParkedMessagesAsync_ReturnsParkedOnly() + { + var msg = CreateMessage("prk1", StoreAndForwardCategory.Notification); + msg.Status = StoreAndForwardMessageStatus.Parked; + await _storage.EnqueueAsync(msg); + await _storage.UpdateMessageAsync(msg); + + var (messages, total) = await _storage.GetParkedMessagesAsync(); + Assert.True(total > 0); + Assert.All(messages, m => Assert.Equal(StoreAndForwardMessageStatus.Parked, m.Status)); + } + + [Fact] + public async Task RetryParkedMessageAsync_MovesToPending() + { + var msg = CreateMessage("retry1", StoreAndForwardCategory.ExternalSystem); + msg.Status = StoreAndForwardMessageStatus.Parked; + msg.RetryCount = 10; + await _storage.EnqueueAsync(msg); + await _storage.UpdateMessageAsync(msg); + + var success = await _storage.RetryParkedMessageAsync("retry1"); + Assert.True(success); + + var retrieved = await _storage.GetMessageByIdAsync("retry1"); + Assert.Equal(StoreAndForwardMessageStatus.Pending, retrieved!.Status); + Assert.Equal(0, retrieved.RetryCount); + } + + [Fact] + public async Task DiscardParkedMessageAsync_RemovesMessage() + { + var msg = CreateMessage("disc1", StoreAndForwardCategory.ExternalSystem); + msg.Status = StoreAndForwardMessageStatus.Parked; + await _storage.EnqueueAsync(msg); + await _storage.UpdateMessageAsync(msg); + + var success = await _storage.DiscardParkedMessageAsync("disc1"); + Assert.True(success); + + var retrieved = await _storage.GetMessageByIdAsync("disc1"); + Assert.Null(retrieved); + } + + [Fact] + public async Task GetBufferDepthByCategoryAsync_ReturnsCorrectCounts() + { + await _storage.EnqueueAsync(CreateMessage("bd1", StoreAndForwardCategory.ExternalSystem)); + await _storage.EnqueueAsync(CreateMessage("bd2", StoreAndForwardCategory.ExternalSystem)); + await _storage.EnqueueAsync(CreateMessage("bd3", StoreAndForwardCategory.Notification)); + + var depth = await _storage.GetBufferDepthByCategoryAsync(); + Assert.True(depth.GetValueOrDefault(StoreAndForwardCategory.ExternalSystem) >= 2); + } + + [Fact] + public async Task GetMessageCountByOriginInstanceAsync_ReturnsCount() + { + var msg1 = CreateMessage("oi1", StoreAndForwardCategory.ExternalSystem); + msg1.OriginInstanceName = "Pump1"; + await _storage.EnqueueAsync(msg1); + + var msg2 = CreateMessage("oi2", StoreAndForwardCategory.Notification); + msg2.OriginInstanceName = "Pump1"; + await _storage.EnqueueAsync(msg2); + + var count = await _storage.GetMessageCountByOriginInstanceAsync("Pump1"); + Assert.Equal(2, count); + } + + [Fact] + public async Task GetParkedMessagesAsync_Pagination() + { + for (int i = 0; i < 5; i++) + { + var msg = CreateMessage($"page{i}", StoreAndForwardCategory.ExternalSystem); + msg.Status = StoreAndForwardMessageStatus.Parked; + await _storage.EnqueueAsync(msg); + await _storage.UpdateMessageAsync(msg); + } + + var (page1, total) = await _storage.GetParkedMessagesAsync(pageNumber: 1, pageSize: 2); + Assert.Equal(2, page1.Count); + Assert.True(total >= 5); + + var (page2, _) = await _storage.GetParkedMessagesAsync(pageNumber: 2, pageSize: 2); + Assert.Equal(2, page2.Count); + } + + [Fact] + public async Task GetMessageCountByStatusAsync_ReturnsAccurateCount() + { + var msg = CreateMessage("cnt1", StoreAndForwardCategory.ExternalSystem); + await _storage.EnqueueAsync(msg); + + var count = await _storage.GetMessageCountByStatusAsync(StoreAndForwardMessageStatus.Pending); + Assert.True(count >= 1); + } + + private static StoreAndForwardMessage CreateMessage(string id, StoreAndForwardCategory category) + { + return new StoreAndForwardMessage + { + Id = id, + Category = category, + Target = "target1", + PayloadJson = """{"method":"Test","args":{}}""", + RetryCount = 0, + MaxRetries = 50, + RetryIntervalMs = 30000, + CreatedAt = DateTimeOffset.UtcNow, + Status = StoreAndForwardMessageStatus.Pending + }; + } +} diff --git a/tests/ScadaLink.StoreAndForward.Tests/UnitTest1.cs b/tests/ScadaLink.StoreAndForward.Tests/UnitTest1.cs deleted file mode 100644 index b51a38d..0000000 --- a/tests/ScadaLink.StoreAndForward.Tests/UnitTest1.cs +++ /dev/null @@ -1,10 +0,0 @@ -namespace ScadaLink.StoreAndForward.Tests; - -public class UnitTest1 -{ - [Fact] - public void Test1() - { - - } -} diff --git a/tests/ScadaLink.StoreAndForward.Tests/xunit.runner.json b/tests/ScadaLink.StoreAndForward.Tests/xunit.runner.json new file mode 100644 index 0000000..08c512b --- /dev/null +++ b/tests/ScadaLink.StoreAndForward.Tests/xunit.runner.json @@ -0,0 +1,4 @@ +{ + "$schema": "https://xunit.net/schema/current/xunit.runner.schema.json", + "parallelizeTestCollections": false +}