Compare commits
10 Commits
phase-6-4-
...
phase-6-1-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
ae8f226e45 | ||
| e032045247 | |||
|
|
ad131932d3 | ||
| 98b69ff4f9 | |||
|
|
016122841b | ||
| 244a36e03e | |||
|
|
4de94fab0d | ||
| fdd0bf52c3 | |||
|
|
7b50118b68 | ||
| eac457fa7c |
@@ -0,0 +1,207 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using ZB.MOM.WW.OtOpcUa.Admin.Services;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Admin.Services;
|
||||
|
||||
/// <summary>
|
||||
/// Staged-import orchestrator per Phase 6.4 Stream B.2-B.4. Covers the four operator
|
||||
/// actions: CreateBatch → StageRows (chunked) → FinaliseBatch (atomic apply into
|
||||
/// <see cref="Equipment"/>) → DropBatch (rollback of pre-finalise state).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>FinaliseBatch runs inside one EF transaction + bulk-inserts accepted rows into
|
||||
/// <see cref="Equipment"/>. Rejected rows stay behind as audit evidence; the batch row
|
||||
/// gains <see cref="EquipmentImportBatch.FinalisedAtUtc"/> so future writes know it's
|
||||
/// archived. DropBatch removes the batch + its cascaded rows.</para>
|
||||
///
|
||||
/// <para>Idempotence: calling FinaliseBatch twice throws <see cref="ImportBatchAlreadyFinalisedException"/>
|
||||
/// rather than double-inserting. Operator refreshes the admin page to see the first
|
||||
/// finalise completed.</para>
|
||||
///
|
||||
/// <para>ExternalIdReservation merging (ZTag + SAPID uniqueness) is NOT done here — a
|
||||
/// narrower follow-up wires it once the concurrent-insert test matrix is green.</para>
|
||||
/// </remarks>
|
||||
public sealed class EquipmentImportBatchService(OtOpcUaConfigDbContext db)
|
||||
{
|
||||
/// <summary>Create a new empty batch header. Returns the row with Id populated.</summary>
|
||||
public async Task<EquipmentImportBatch> CreateBatchAsync(string clusterId, string createdBy, CancellationToken ct)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(clusterId);
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(createdBy);
|
||||
|
||||
var batch = new EquipmentImportBatch
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
ClusterId = clusterId,
|
||||
CreatedBy = createdBy,
|
||||
CreatedAtUtc = DateTime.UtcNow,
|
||||
};
|
||||
db.EquipmentImportBatches.Add(batch);
|
||||
await db.SaveChangesAsync(ct).ConfigureAwait(false);
|
||||
return batch;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Stage one chunk of rows into the batch. Caller usually feeds
|
||||
/// <see cref="EquipmentCsvImporter.Parse"/> output here — each
|
||||
/// <see cref="EquipmentCsvRow"/> becomes one accepted <see cref="EquipmentImportRow"/>,
|
||||
/// each rejected parser error becomes one row with <see cref="EquipmentImportRow.IsAccepted"/> false.
|
||||
/// </summary>
|
||||
public async Task StageRowsAsync(
|
||||
Guid batchId,
|
||||
IReadOnlyList<EquipmentCsvRow> acceptedRows,
|
||||
IReadOnlyList<EquipmentCsvRowError> rejectedRows,
|
||||
CancellationToken ct)
|
||||
{
|
||||
var batch = await db.EquipmentImportBatches.FirstOrDefaultAsync(b => b.Id == batchId, ct).ConfigureAwait(false)
|
||||
?? throw new ImportBatchNotFoundException($"Batch {batchId} not found.");
|
||||
|
||||
if (batch.FinalisedAtUtc is not null)
|
||||
throw new ImportBatchAlreadyFinalisedException(
|
||||
$"Batch {batchId} finalised at {batch.FinalisedAtUtc:o}; no more rows can be staged.");
|
||||
|
||||
foreach (var row in acceptedRows)
|
||||
{
|
||||
db.EquipmentImportRows.Add(new EquipmentImportRow
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
BatchId = batchId,
|
||||
IsAccepted = true,
|
||||
ZTag = row.ZTag,
|
||||
MachineCode = row.MachineCode,
|
||||
SAPID = row.SAPID,
|
||||
EquipmentId = row.EquipmentId,
|
||||
EquipmentUuid = row.EquipmentUuid,
|
||||
Name = row.Name,
|
||||
UnsAreaName = row.UnsAreaName,
|
||||
UnsLineName = row.UnsLineName,
|
||||
Manufacturer = row.Manufacturer,
|
||||
Model = row.Model,
|
||||
SerialNumber = row.SerialNumber,
|
||||
HardwareRevision = row.HardwareRevision,
|
||||
SoftwareRevision = row.SoftwareRevision,
|
||||
YearOfConstruction = row.YearOfConstruction,
|
||||
AssetLocation = row.AssetLocation,
|
||||
ManufacturerUri = row.ManufacturerUri,
|
||||
DeviceManualUri = row.DeviceManualUri,
|
||||
});
|
||||
}
|
||||
|
||||
foreach (var error in rejectedRows)
|
||||
{
|
||||
db.EquipmentImportRows.Add(new EquipmentImportRow
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
BatchId = batchId,
|
||||
IsAccepted = false,
|
||||
RejectReason = error.Reason,
|
||||
LineNumberInFile = error.LineNumber,
|
||||
// Required columns need values for EF; reject rows use sentinel placeholders.
|
||||
ZTag = "", MachineCode = "", SAPID = "", EquipmentId = "", EquipmentUuid = "",
|
||||
Name = "", UnsAreaName = "", UnsLineName = "",
|
||||
});
|
||||
}
|
||||
|
||||
batch.RowsStaged += acceptedRows.Count + rejectedRows.Count;
|
||||
batch.RowsAccepted += acceptedRows.Count;
|
||||
batch.RowsRejected += rejectedRows.Count;
|
||||
|
||||
await db.SaveChangesAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>Drop the batch (pre-finalise rollback). Cascaded row delete removes staged rows.</summary>
|
||||
public async Task DropBatchAsync(Guid batchId, CancellationToken ct)
|
||||
{
|
||||
var batch = await db.EquipmentImportBatches.FirstOrDefaultAsync(b => b.Id == batchId, ct).ConfigureAwait(false);
|
||||
if (batch is null) return;
|
||||
if (batch.FinalisedAtUtc is not null)
|
||||
throw new ImportBatchAlreadyFinalisedException(
|
||||
$"Batch {batchId} already finalised at {batch.FinalisedAtUtc:o}; cannot drop.");
|
||||
|
||||
db.EquipmentImportBatches.Remove(batch);
|
||||
await db.SaveChangesAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Atomic finalise. Inserts every accepted row into the live
|
||||
/// <see cref="Equipment"/> table under the target generation + stamps
|
||||
/// <see cref="EquipmentImportBatch.FinalisedAtUtc"/>. Failure rolls the whole tx
|
||||
/// back — <see cref="Equipment"/> never partially mutates.
|
||||
/// </summary>
|
||||
public async Task FinaliseBatchAsync(
|
||||
Guid batchId, long generationId, string driverInstanceIdForRows, string unsLineIdForRows, CancellationToken ct)
|
||||
{
|
||||
var batch = await db.EquipmentImportBatches
|
||||
.Include(b => b.Rows)
|
||||
.FirstOrDefaultAsync(b => b.Id == batchId, ct)
|
||||
.ConfigureAwait(false)
|
||||
?? throw new ImportBatchNotFoundException($"Batch {batchId} not found.");
|
||||
|
||||
if (batch.FinalisedAtUtc is not null)
|
||||
throw new ImportBatchAlreadyFinalisedException(
|
||||
$"Batch {batchId} already finalised at {batch.FinalisedAtUtc:o}.");
|
||||
|
||||
// EF InMemory provider doesn't honour BeginTransaction; SQL Server provider does.
|
||||
// Tests run the happy path under in-memory; production SQL Server runs the atomic tx.
|
||||
var supportsTx = db.Database.IsRelational();
|
||||
Microsoft.EntityFrameworkCore.Storage.IDbContextTransaction? tx = null;
|
||||
if (supportsTx)
|
||||
tx = await db.Database.BeginTransactionAsync(ct).ConfigureAwait(false);
|
||||
|
||||
try
|
||||
{
|
||||
foreach (var row in batch.Rows.Where(r => r.IsAccepted))
|
||||
{
|
||||
db.Equipment.Add(new Equipment
|
||||
{
|
||||
EquipmentRowId = Guid.NewGuid(),
|
||||
GenerationId = generationId,
|
||||
EquipmentId = row.EquipmentId,
|
||||
EquipmentUuid = Guid.TryParse(row.EquipmentUuid, out var u) ? u : Guid.NewGuid(),
|
||||
DriverInstanceId = driverInstanceIdForRows,
|
||||
UnsLineId = unsLineIdForRows,
|
||||
Name = row.Name,
|
||||
MachineCode = row.MachineCode,
|
||||
ZTag = row.ZTag,
|
||||
SAPID = row.SAPID,
|
||||
Manufacturer = row.Manufacturer,
|
||||
Model = row.Model,
|
||||
SerialNumber = row.SerialNumber,
|
||||
HardwareRevision = row.HardwareRevision,
|
||||
SoftwareRevision = row.SoftwareRevision,
|
||||
YearOfConstruction = short.TryParse(row.YearOfConstruction, out var y) ? y : null,
|
||||
AssetLocation = row.AssetLocation,
|
||||
ManufacturerUri = row.ManufacturerUri,
|
||||
DeviceManualUri = row.DeviceManualUri,
|
||||
});
|
||||
}
|
||||
|
||||
batch.FinalisedAtUtc = DateTime.UtcNow;
|
||||
await db.SaveChangesAsync(ct).ConfigureAwait(false);
|
||||
if (tx is not null) await tx.CommitAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
if (tx is not null) await tx.RollbackAsync(ct).ConfigureAwait(false);
|
||||
throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (tx is not null) await tx.DisposeAsync().ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>List batches created by the given user. Finalised batches are archived; include them on demand.</summary>
|
||||
public async Task<IReadOnlyList<EquipmentImportBatch>> ListByUserAsync(string createdBy, bool includeFinalised, CancellationToken ct)
|
||||
{
|
||||
var query = db.EquipmentImportBatches.AsNoTracking().Where(b => b.CreatedBy == createdBy);
|
||||
if (!includeFinalised)
|
||||
query = query.Where(b => b.FinalisedAtUtc == null);
|
||||
return await query.OrderByDescending(b => b.CreatedAtUtc).ToListAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
public sealed class ImportBatchNotFoundException(string message) : Exception(message);
|
||||
public sealed class ImportBatchAlreadyFinalisedException(string message) : Exception(message);
|
||||
@@ -27,6 +27,24 @@ public sealed class DriverInstance
|
||||
/// <summary>Schemaless per-driver-type JSON config. Validated against registered JSON schema at draft-publish time (decision #91).</summary>
|
||||
public required string DriverConfig { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Optional per-instance overrides for the Phase 6.1 shared Polly resilience pipeline.
|
||||
/// Null = use the driver's tier defaults (decision #143). When populated, expected shape:
|
||||
/// <code>
|
||||
/// {
|
||||
/// "bulkheadMaxConcurrent": 16,
|
||||
/// "bulkheadMaxQueue": 64,
|
||||
/// "capabilityPolicies": {
|
||||
/// "Read": { "timeoutSeconds": 5, "retryCount": 5, "breakerFailureThreshold": 3 },
|
||||
/// "Write": { "timeoutSeconds": 5, "retryCount": 0, "breakerFailureThreshold": 5 }
|
||||
/// }
|
||||
/// }
|
||||
/// </code>
|
||||
/// Parsed at startup by <c>DriverResilienceOptionsParser</c>; every key is optional +
|
||||
/// unrecognised keys are ignored so future shapes land without a migration.
|
||||
/// </summary>
|
||||
public string? ResilienceConfig { get; set; }
|
||||
|
||||
public ConfigGeneration? Generation { get; set; }
|
||||
public ServerCluster? Cluster { get; set; }
|
||||
}
|
||||
|
||||
@@ -0,0 +1,68 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
|
||||
/// <summary>
|
||||
/// Staged equipment-import batch per Phase 6.4 Stream B.2. Rows land in the child
|
||||
/// <see cref="EquipmentImportRow"/> table under a batch header; operator reviews + either
|
||||
/// drops (via <c>DropImportBatch</c>) or finalises (via <c>FinaliseImportBatch</c>) in one
|
||||
/// bounded transaction. The live <c>Equipment</c> table never sees partial state.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>User-scoped visibility: the preview modal only shows batches where
|
||||
/// <see cref="CreatedBy"/> equals the current operator. Prevents accidental
|
||||
/// cross-operator finalise during concurrent imports. An admin finalise / drop surface
|
||||
/// can override this — tracked alongside the UI follow-up.</para>
|
||||
///
|
||||
/// <para><see cref="FinalisedAtUtc"/> stamps the moment the batch promoted from staging
|
||||
/// into <c>Equipment</c>. Null = still in staging; non-null = archived / finalised.</para>
|
||||
/// </remarks>
|
||||
public sealed class EquipmentImportBatch
|
||||
{
|
||||
public Guid Id { get; set; }
|
||||
public required string ClusterId { get; set; }
|
||||
public required string CreatedBy { get; set; }
|
||||
public DateTime CreatedAtUtc { get; set; }
|
||||
public int RowsStaged { get; set; }
|
||||
public int RowsAccepted { get; set; }
|
||||
public int RowsRejected { get; set; }
|
||||
public DateTime? FinalisedAtUtc { get; set; }
|
||||
|
||||
public ICollection<EquipmentImportRow> Rows { get; set; } = [];
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// One staged row under an <see cref="EquipmentImportBatch"/>. Mirrors the decision #117
|
||||
/// + decision #139 columns from the CSV importer's output + an
|
||||
/// <see cref="IsAccepted"/> flag + a <see cref="RejectReason"/> string the preview modal
|
||||
/// renders.
|
||||
/// </summary>
|
||||
public sealed class EquipmentImportRow
|
||||
{
|
||||
public Guid Id { get; set; }
|
||||
public Guid BatchId { get; set; }
|
||||
public int LineNumberInFile { get; set; }
|
||||
public bool IsAccepted { get; set; }
|
||||
public string? RejectReason { get; set; }
|
||||
|
||||
// Required (decision #117)
|
||||
public required string ZTag { get; set; }
|
||||
public required string MachineCode { get; set; }
|
||||
public required string SAPID { get; set; }
|
||||
public required string EquipmentId { get; set; }
|
||||
public required string EquipmentUuid { get; set; }
|
||||
public required string Name { get; set; }
|
||||
public required string UnsAreaName { get; set; }
|
||||
public required string UnsLineName { get; set; }
|
||||
|
||||
// Optional (decision #139 — OPC 40010 Identification)
|
||||
public string? Manufacturer { get; set; }
|
||||
public string? Model { get; set; }
|
||||
public string? SerialNumber { get; set; }
|
||||
public string? HardwareRevision { get; set; }
|
||||
public string? SoftwareRevision { get; set; }
|
||||
public string? YearOfConstruction { get; set; }
|
||||
public string? AssetLocation { get; set; }
|
||||
public string? ManufacturerUri { get; set; }
|
||||
public string? DeviceManualUri { get; set; }
|
||||
|
||||
public EquipmentImportBatch? Batch { get; set; }
|
||||
}
|
||||
1347
src/ZB.MOM.WW.OtOpcUa.Configuration/Migrations/20260419161932_AddDriverInstanceResilienceConfig.Designer.cs
generated
Normal file
1347
src/ZB.MOM.WW.OtOpcUa.Configuration/Migrations/20260419161932_AddDriverInstanceResilienceConfig.Designer.cs
generated
Normal file
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,37 @@
|
||||
using Microsoft.EntityFrameworkCore.Migrations;
|
||||
|
||||
#nullable disable
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
||||
{
|
||||
/// <inheritdoc />
|
||||
public partial class AddDriverInstanceResilienceConfig : Migration
|
||||
{
|
||||
/// <inheritdoc />
|
||||
protected override void Up(MigrationBuilder migrationBuilder)
|
||||
{
|
||||
migrationBuilder.AddColumn<string>(
|
||||
name: "ResilienceConfig",
|
||||
table: "DriverInstance",
|
||||
type: "nvarchar(max)",
|
||||
nullable: true);
|
||||
|
||||
migrationBuilder.AddCheckConstraint(
|
||||
name: "CK_DriverInstance_ResilienceConfig_IsJson",
|
||||
table: "DriverInstance",
|
||||
sql: "ResilienceConfig IS NULL OR ISJSON(ResilienceConfig) = 1");
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
protected override void Down(MigrationBuilder migrationBuilder)
|
||||
{
|
||||
migrationBuilder.DropCheckConstraint(
|
||||
name: "CK_DriverInstance_ResilienceConfig_IsJson",
|
||||
table: "DriverInstance");
|
||||
|
||||
migrationBuilder.DropColumn(
|
||||
name: "ResilienceConfig",
|
||||
table: "DriverInstance");
|
||||
}
|
||||
}
|
||||
}
|
||||
1505
src/ZB.MOM.WW.OtOpcUa.Configuration/Migrations/20260419185124_AddEquipmentImportBatch.Designer.cs
generated
Normal file
1505
src/ZB.MOM.WW.OtOpcUa.Configuration/Migrations/20260419185124_AddEquipmentImportBatch.Designer.cs
generated
Normal file
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,91 @@
|
||||
using System;
|
||||
using Microsoft.EntityFrameworkCore.Migrations;
|
||||
|
||||
#nullable disable
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
||||
{
|
||||
/// <inheritdoc />
|
||||
public partial class AddEquipmentImportBatch : Migration
|
||||
{
|
||||
/// <inheritdoc />
|
||||
protected override void Up(MigrationBuilder migrationBuilder)
|
||||
{
|
||||
migrationBuilder.CreateTable(
|
||||
name: "EquipmentImportBatch",
|
||||
columns: table => new
|
||||
{
|
||||
Id = table.Column<Guid>(type: "uniqueidentifier", nullable: false),
|
||||
ClusterId = table.Column<string>(type: "nvarchar(64)", maxLength: 64, nullable: false),
|
||||
CreatedBy = table.Column<string>(type: "nvarchar(128)", maxLength: 128, nullable: false),
|
||||
CreatedAtUtc = table.Column<DateTime>(type: "datetime2(3)", nullable: false),
|
||||
RowsStaged = table.Column<int>(type: "int", nullable: false),
|
||||
RowsAccepted = table.Column<int>(type: "int", nullable: false),
|
||||
RowsRejected = table.Column<int>(type: "int", nullable: false),
|
||||
FinalisedAtUtc = table.Column<DateTime>(type: "datetime2(3)", nullable: true)
|
||||
},
|
||||
constraints: table =>
|
||||
{
|
||||
table.PrimaryKey("PK_EquipmentImportBatch", x => x.Id);
|
||||
});
|
||||
|
||||
migrationBuilder.CreateTable(
|
||||
name: "EquipmentImportRow",
|
||||
columns: table => new
|
||||
{
|
||||
Id = table.Column<Guid>(type: "uniqueidentifier", nullable: false),
|
||||
BatchId = table.Column<Guid>(type: "uniqueidentifier", nullable: false),
|
||||
LineNumberInFile = table.Column<int>(type: "int", nullable: false),
|
||||
IsAccepted = table.Column<bool>(type: "bit", nullable: false),
|
||||
RejectReason = table.Column<string>(type: "nvarchar(512)", maxLength: 512, nullable: true),
|
||||
ZTag = table.Column<string>(type: "nvarchar(128)", maxLength: 128, nullable: false),
|
||||
MachineCode = table.Column<string>(type: "nvarchar(128)", maxLength: 128, nullable: false),
|
||||
SAPID = table.Column<string>(type: "nvarchar(128)", maxLength: 128, nullable: false),
|
||||
EquipmentId = table.Column<string>(type: "nvarchar(64)", maxLength: 64, nullable: false),
|
||||
EquipmentUuid = table.Column<string>(type: "nvarchar(64)", maxLength: 64, nullable: false),
|
||||
Name = table.Column<string>(type: "nvarchar(128)", maxLength: 128, nullable: false),
|
||||
UnsAreaName = table.Column<string>(type: "nvarchar(64)", maxLength: 64, nullable: false),
|
||||
UnsLineName = table.Column<string>(type: "nvarchar(64)", maxLength: 64, nullable: false),
|
||||
Manufacturer = table.Column<string>(type: "nvarchar(256)", maxLength: 256, nullable: true),
|
||||
Model = table.Column<string>(type: "nvarchar(256)", maxLength: 256, nullable: true),
|
||||
SerialNumber = table.Column<string>(type: "nvarchar(256)", maxLength: 256, nullable: true),
|
||||
HardwareRevision = table.Column<string>(type: "nvarchar(64)", maxLength: 64, nullable: true),
|
||||
SoftwareRevision = table.Column<string>(type: "nvarchar(64)", maxLength: 64, nullable: true),
|
||||
YearOfConstruction = table.Column<string>(type: "nvarchar(8)", maxLength: 8, nullable: true),
|
||||
AssetLocation = table.Column<string>(type: "nvarchar(512)", maxLength: 512, nullable: true),
|
||||
ManufacturerUri = table.Column<string>(type: "nvarchar(512)", maxLength: 512, nullable: true),
|
||||
DeviceManualUri = table.Column<string>(type: "nvarchar(512)", maxLength: 512, nullable: true)
|
||||
},
|
||||
constraints: table =>
|
||||
{
|
||||
table.PrimaryKey("PK_EquipmentImportRow", x => x.Id);
|
||||
table.ForeignKey(
|
||||
name: "FK_EquipmentImportRow_EquipmentImportBatch_BatchId",
|
||||
column: x => x.BatchId,
|
||||
principalTable: "EquipmentImportBatch",
|
||||
principalColumn: "Id",
|
||||
onDelete: ReferentialAction.Cascade);
|
||||
});
|
||||
|
||||
migrationBuilder.CreateIndex(
|
||||
name: "IX_EquipmentImportBatch_Creator_Finalised",
|
||||
table: "EquipmentImportBatch",
|
||||
columns: new[] { "CreatedBy", "FinalisedAtUtc" });
|
||||
|
||||
migrationBuilder.CreateIndex(
|
||||
name: "IX_EquipmentImportRow_Batch",
|
||||
table: "EquipmentImportRow",
|
||||
column: "BatchId");
|
||||
}
|
||||
|
||||
/// <inheritdoc />
|
||||
protected override void Down(MigrationBuilder migrationBuilder)
|
||||
{
|
||||
migrationBuilder.DropTable(
|
||||
name: "EquipmentImportRow");
|
||||
|
||||
migrationBuilder.DropTable(
|
||||
name: "EquipmentImportBatch");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -413,6 +413,9 @@ namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
||||
.HasMaxLength(64)
|
||||
.HasColumnType("nvarchar(64)");
|
||||
|
||||
b.Property<string>("ResilienceConfig")
|
||||
.HasColumnType("nvarchar(max)");
|
||||
|
||||
b.HasKey("DriverInstanceRowId");
|
||||
|
||||
b.HasIndex("ClusterId");
|
||||
@@ -431,6 +434,8 @@ namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
||||
b.ToTable("DriverInstance", null, t =>
|
||||
{
|
||||
t.HasCheckConstraint("CK_DriverInstance_DriverConfig_IsJson", "ISJSON(DriverConfig) = 1");
|
||||
|
||||
t.HasCheckConstraint("CK_DriverInstance_ResilienceConfig_IsJson", "ResilienceConfig IS NULL OR ISJSON(ResilienceConfig) = 1");
|
||||
});
|
||||
});
|
||||
|
||||
@@ -599,6 +604,148 @@ namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
||||
b.ToTable("Equipment", (string)null);
|
||||
});
|
||||
|
||||
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.EquipmentImportBatch", b =>
|
||||
{
|
||||
b.Property<Guid>("Id")
|
||||
.ValueGeneratedOnAdd()
|
||||
.HasColumnType("uniqueidentifier");
|
||||
|
||||
b.Property<string>("ClusterId")
|
||||
.IsRequired()
|
||||
.HasMaxLength(64)
|
||||
.HasColumnType("nvarchar(64)");
|
||||
|
||||
b.Property<DateTime>("CreatedAtUtc")
|
||||
.HasColumnType("datetime2(3)");
|
||||
|
||||
b.Property<string>("CreatedBy")
|
||||
.IsRequired()
|
||||
.HasMaxLength(128)
|
||||
.HasColumnType("nvarchar(128)");
|
||||
|
||||
b.Property<DateTime?>("FinalisedAtUtc")
|
||||
.HasColumnType("datetime2(3)");
|
||||
|
||||
b.Property<int>("RowsAccepted")
|
||||
.HasColumnType("int");
|
||||
|
||||
b.Property<int>("RowsRejected")
|
||||
.HasColumnType("int");
|
||||
|
||||
b.Property<int>("RowsStaged")
|
||||
.HasColumnType("int");
|
||||
|
||||
b.HasKey("Id");
|
||||
|
||||
b.HasIndex("CreatedBy", "FinalisedAtUtc")
|
||||
.HasDatabaseName("IX_EquipmentImportBatch_Creator_Finalised");
|
||||
|
||||
b.ToTable("EquipmentImportBatch", (string)null);
|
||||
});
|
||||
|
||||
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.EquipmentImportRow", b =>
|
||||
{
|
||||
b.Property<Guid>("Id")
|
||||
.ValueGeneratedOnAdd()
|
||||
.HasColumnType("uniqueidentifier");
|
||||
|
||||
b.Property<string>("AssetLocation")
|
||||
.HasMaxLength(512)
|
||||
.HasColumnType("nvarchar(512)");
|
||||
|
||||
b.Property<Guid>("BatchId")
|
||||
.HasColumnType("uniqueidentifier");
|
||||
|
||||
b.Property<string>("DeviceManualUri")
|
||||
.HasMaxLength(512)
|
||||
.HasColumnType("nvarchar(512)");
|
||||
|
||||
b.Property<string>("EquipmentId")
|
||||
.IsRequired()
|
||||
.HasMaxLength(64)
|
||||
.HasColumnType("nvarchar(64)");
|
||||
|
||||
b.Property<string>("EquipmentUuid")
|
||||
.IsRequired()
|
||||
.HasMaxLength(64)
|
||||
.HasColumnType("nvarchar(64)");
|
||||
|
||||
b.Property<string>("HardwareRevision")
|
||||
.HasMaxLength(64)
|
||||
.HasColumnType("nvarchar(64)");
|
||||
|
||||
b.Property<bool>("IsAccepted")
|
||||
.HasColumnType("bit");
|
||||
|
||||
b.Property<int>("LineNumberInFile")
|
||||
.HasColumnType("int");
|
||||
|
||||
b.Property<string>("MachineCode")
|
||||
.IsRequired()
|
||||
.HasMaxLength(128)
|
||||
.HasColumnType("nvarchar(128)");
|
||||
|
||||
b.Property<string>("Manufacturer")
|
||||
.HasMaxLength(256)
|
||||
.HasColumnType("nvarchar(256)");
|
||||
|
||||
b.Property<string>("ManufacturerUri")
|
||||
.HasMaxLength(512)
|
||||
.HasColumnType("nvarchar(512)");
|
||||
|
||||
b.Property<string>("Model")
|
||||
.HasMaxLength(256)
|
||||
.HasColumnType("nvarchar(256)");
|
||||
|
||||
b.Property<string>("Name")
|
||||
.IsRequired()
|
||||
.HasMaxLength(128)
|
||||
.HasColumnType("nvarchar(128)");
|
||||
|
||||
b.Property<string>("RejectReason")
|
||||
.HasMaxLength(512)
|
||||
.HasColumnType("nvarchar(512)");
|
||||
|
||||
b.Property<string>("SAPID")
|
||||
.IsRequired()
|
||||
.HasMaxLength(128)
|
||||
.HasColumnType("nvarchar(128)");
|
||||
|
||||
b.Property<string>("SerialNumber")
|
||||
.HasMaxLength(256)
|
||||
.HasColumnType("nvarchar(256)");
|
||||
|
||||
b.Property<string>("SoftwareRevision")
|
||||
.HasMaxLength(64)
|
||||
.HasColumnType("nvarchar(64)");
|
||||
|
||||
b.Property<string>("UnsAreaName")
|
||||
.IsRequired()
|
||||
.HasMaxLength(64)
|
||||
.HasColumnType("nvarchar(64)");
|
||||
|
||||
b.Property<string>("UnsLineName")
|
||||
.IsRequired()
|
||||
.HasMaxLength(64)
|
||||
.HasColumnType("nvarchar(64)");
|
||||
|
||||
b.Property<string>("YearOfConstruction")
|
||||
.HasMaxLength(8)
|
||||
.HasColumnType("nvarchar(8)");
|
||||
|
||||
b.Property<string>("ZTag")
|
||||
.IsRequired()
|
||||
.HasMaxLength(128)
|
||||
.HasColumnType("nvarchar(128)");
|
||||
|
||||
b.HasKey("Id");
|
||||
|
||||
b.HasIndex("BatchId")
|
||||
.HasDatabaseName("IX_EquipmentImportRow_Batch");
|
||||
|
||||
b.ToTable("EquipmentImportRow", (string)null);
|
||||
});
|
||||
|
||||
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.ExternalIdReservation", b =>
|
||||
{
|
||||
b.Property<Guid>("ReservationId")
|
||||
@@ -1226,6 +1373,17 @@ namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
||||
b.Navigation("Generation");
|
||||
});
|
||||
|
||||
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.EquipmentImportRow", b =>
|
||||
{
|
||||
b.HasOne("ZB.MOM.WW.OtOpcUa.Configuration.Entities.EquipmentImportBatch", "Batch")
|
||||
.WithMany("Rows")
|
||||
.HasForeignKey("BatchId")
|
||||
.OnDelete(DeleteBehavior.Cascade)
|
||||
.IsRequired();
|
||||
|
||||
b.Navigation("Batch");
|
||||
});
|
||||
|
||||
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.LdapGroupRoleMapping", b =>
|
||||
{
|
||||
b.HasOne("ZB.MOM.WW.OtOpcUa.Configuration.Entities.ServerCluster", "Cluster")
|
||||
@@ -1325,6 +1483,11 @@ namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
||||
b.Navigation("GenerationState");
|
||||
});
|
||||
|
||||
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.EquipmentImportBatch", b =>
|
||||
{
|
||||
b.Navigation("Rows");
|
||||
});
|
||||
|
||||
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.ServerCluster", b =>
|
||||
{
|
||||
b.Navigation("Generations");
|
||||
|
||||
@@ -30,6 +30,8 @@ public sealed class OtOpcUaConfigDbContext(DbContextOptions<OtOpcUaConfigDbConte
|
||||
public DbSet<DriverHostStatus> DriverHostStatuses => Set<DriverHostStatus>();
|
||||
public DbSet<DriverInstanceResilienceStatus> DriverInstanceResilienceStatuses => Set<DriverInstanceResilienceStatus>();
|
||||
public DbSet<LdapGroupRoleMapping> LdapGroupRoleMappings => Set<LdapGroupRoleMapping>();
|
||||
public DbSet<EquipmentImportBatch> EquipmentImportBatches => Set<EquipmentImportBatch>();
|
||||
public DbSet<EquipmentImportRow> EquipmentImportRows => Set<EquipmentImportRow>();
|
||||
|
||||
protected override void OnModelCreating(ModelBuilder modelBuilder)
|
||||
{
|
||||
@@ -53,6 +55,7 @@ public sealed class OtOpcUaConfigDbContext(DbContextOptions<OtOpcUaConfigDbConte
|
||||
ConfigureDriverHostStatus(modelBuilder);
|
||||
ConfigureDriverInstanceResilienceStatus(modelBuilder);
|
||||
ConfigureLdapGroupRoleMapping(modelBuilder);
|
||||
ConfigureEquipmentImportBatch(modelBuilder);
|
||||
}
|
||||
|
||||
private static void ConfigureServerCluster(ModelBuilder modelBuilder)
|
||||
@@ -251,6 +254,8 @@ public sealed class OtOpcUaConfigDbContext(DbContextOptions<OtOpcUaConfigDbConte
|
||||
{
|
||||
t.HasCheckConstraint("CK_DriverInstance_DriverConfig_IsJson",
|
||||
"ISJSON(DriverConfig) = 1");
|
||||
t.HasCheckConstraint("CK_DriverInstance_ResilienceConfig_IsJson",
|
||||
"ResilienceConfig IS NULL OR ISJSON(ResilienceConfig) = 1");
|
||||
});
|
||||
e.HasKey(x => x.DriverInstanceRowId);
|
||||
e.Property(x => x.DriverInstanceRowId).HasDefaultValueSql("NEWSEQUENTIALID()");
|
||||
@@ -260,6 +265,7 @@ public sealed class OtOpcUaConfigDbContext(DbContextOptions<OtOpcUaConfigDbConte
|
||||
e.Property(x => x.Name).HasMaxLength(128);
|
||||
e.Property(x => x.DriverType).HasMaxLength(32);
|
||||
e.Property(x => x.DriverConfig).HasColumnType("nvarchar(max)");
|
||||
e.Property(x => x.ResilienceConfig).HasColumnType("nvarchar(max)");
|
||||
|
||||
e.HasOne(x => x.Generation).WithMany().HasForeignKey(x => x.GenerationId).OnDelete(DeleteBehavior.Restrict);
|
||||
e.HasOne(x => x.Cluster).WithMany().HasForeignKey(x => x.ClusterId).OnDelete(DeleteBehavior.Restrict);
|
||||
@@ -565,4 +571,52 @@ public sealed class OtOpcUaConfigDbContext(DbContextOptions<OtOpcUaConfigDbConte
|
||||
e.HasIndex(x => x.LdapGroup).HasDatabaseName("IX_LdapGroupRoleMapping_Group");
|
||||
});
|
||||
}
|
||||
|
||||
private static void ConfigureEquipmentImportBatch(ModelBuilder modelBuilder)
|
||||
{
|
||||
modelBuilder.Entity<EquipmentImportBatch>(e =>
|
||||
{
|
||||
e.ToTable("EquipmentImportBatch");
|
||||
e.HasKey(x => x.Id);
|
||||
e.Property(x => x.ClusterId).HasMaxLength(64);
|
||||
e.Property(x => x.CreatedBy).HasMaxLength(128);
|
||||
e.Property(x => x.CreatedAtUtc).HasColumnType("datetime2(3)");
|
||||
e.Property(x => x.FinalisedAtUtc).HasColumnType("datetime2(3)");
|
||||
|
||||
// Admin preview modal filters by user; finalise / drop both hit this index.
|
||||
e.HasIndex(x => new { x.CreatedBy, x.FinalisedAtUtc })
|
||||
.HasDatabaseName("IX_EquipmentImportBatch_Creator_Finalised");
|
||||
});
|
||||
|
||||
modelBuilder.Entity<EquipmentImportRow>(e =>
|
||||
{
|
||||
e.ToTable("EquipmentImportRow");
|
||||
e.HasKey(x => x.Id);
|
||||
e.Property(x => x.ZTag).HasMaxLength(128);
|
||||
e.Property(x => x.MachineCode).HasMaxLength(128);
|
||||
e.Property(x => x.SAPID).HasMaxLength(128);
|
||||
e.Property(x => x.EquipmentId).HasMaxLength(64);
|
||||
e.Property(x => x.EquipmentUuid).HasMaxLength(64);
|
||||
e.Property(x => x.Name).HasMaxLength(128);
|
||||
e.Property(x => x.UnsAreaName).HasMaxLength(64);
|
||||
e.Property(x => x.UnsLineName).HasMaxLength(64);
|
||||
e.Property(x => x.Manufacturer).HasMaxLength(256);
|
||||
e.Property(x => x.Model).HasMaxLength(256);
|
||||
e.Property(x => x.SerialNumber).HasMaxLength(256);
|
||||
e.Property(x => x.HardwareRevision).HasMaxLength(64);
|
||||
e.Property(x => x.SoftwareRevision).HasMaxLength(64);
|
||||
e.Property(x => x.YearOfConstruction).HasMaxLength(8);
|
||||
e.Property(x => x.AssetLocation).HasMaxLength(512);
|
||||
e.Property(x => x.ManufacturerUri).HasMaxLength(512);
|
||||
e.Property(x => x.DeviceManualUri).HasMaxLength(512);
|
||||
e.Property(x => x.RejectReason).HasMaxLength(512);
|
||||
|
||||
e.HasOne(x => x.Batch)
|
||||
.WithMany(b => b.Rows)
|
||||
.HasForeignKey(x => x.BatchId)
|
||||
.OnDelete(DeleteBehavior.Cascade);
|
||||
|
||||
e.HasIndex(x => x.BatchId).HasDatabaseName("IX_EquipmentImportRow_Batch");
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,34 @@
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
/// <summary>
|
||||
/// Optional driver capability that maps a per-tag full reference to the underlying host
|
||||
/// name responsible for serving it. Drivers with a one-host topology (Galaxy on one
|
||||
/// MXAccess endpoint, OpcUaClient against one remote server, S7 against one PLC) do NOT
|
||||
/// need to implement this — the dispatch layer falls back to
|
||||
/// <see cref="IDriver.DriverInstanceId"/> as a single-host key.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>Multi-host drivers (Modbus with N PLCs, hypothetical AB CIP across a rack, etc.)
|
||||
/// implement this so the Phase 6.1 resilience pipeline can be keyed on
|
||||
/// <c>(DriverInstanceId, ResolvedHostName, DriverCapability)</c> per decision #144. One
|
||||
/// dead PLC behind a multi-device Modbus driver then trips only its own breaker; healthy
|
||||
/// siblings keep serving.</para>
|
||||
///
|
||||
/// <para>Implementations must be fast + allocation-free on the hot path — <c>ReadAsync</c>
|
||||
/// / <c>WriteAsync</c> call this once per tag. A simple <c>Dictionary<string, string></c>
|
||||
/// lookup is typical.</para>
|
||||
///
|
||||
/// <para>When the fullRef doesn't map to a known host (caller passes an unregistered
|
||||
/// reference, or the tag was removed mid-flight), implementations should return the
|
||||
/// driver's default-host string rather than throwing — the invoker falls back to a
|
||||
/// single-host pipeline for that call, which is safer than tearing down the request.</para>
|
||||
/// </remarks>
|
||||
public interface IPerCallHostResolver
|
||||
{
|
||||
/// <summary>
|
||||
/// Resolve the host name for the given driver-side full reference. Returned value is
|
||||
/// used as the <c>hostName</c> argument to the Phase 6.1 <c>CapabilityInvoker</c> so
|
||||
/// per-host breaker isolation + per-host bulkhead accounting both kick in.
|
||||
/// </summary>
|
||||
string ResolveHost(string fullReference);
|
||||
}
|
||||
@@ -22,6 +22,7 @@ public sealed class CapabilityInvoker
|
||||
private readonly string _driverInstanceId;
|
||||
private readonly string _driverType;
|
||||
private readonly Func<DriverResilienceOptions> _optionsAccessor;
|
||||
private readonly DriverResilienceStatusTracker? _statusTracker;
|
||||
|
||||
/// <summary>
|
||||
/// Construct an invoker for one driver instance.
|
||||
@@ -33,11 +34,13 @@ public sealed class CapabilityInvoker
|
||||
/// pipeline-invalidate can take effect without restarting the invoker.
|
||||
/// </param>
|
||||
/// <param name="driverType">Driver type name for structured-log enrichment (e.g. <c>"Modbus"</c>).</param>
|
||||
/// <param name="statusTracker">Optional resilience-status tracker. When wired, every capability call records start/complete so Admin <c>/hosts</c> can surface <see cref="ResilienceStatusSnapshot.CurrentInFlight"/> as the bulkhead-depth proxy.</param>
|
||||
public CapabilityInvoker(
|
||||
DriverResiliencePipelineBuilder builder,
|
||||
string driverInstanceId,
|
||||
Func<DriverResilienceOptions> optionsAccessor,
|
||||
string driverType = "Unknown")
|
||||
string driverType = "Unknown",
|
||||
DriverResilienceStatusTracker? statusTracker = null)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(builder);
|
||||
ArgumentNullException.ThrowIfNull(optionsAccessor);
|
||||
@@ -46,6 +49,7 @@ public sealed class CapabilityInvoker
|
||||
_driverInstanceId = driverInstanceId;
|
||||
_driverType = driverType;
|
||||
_optionsAccessor = optionsAccessor;
|
||||
_statusTracker = statusTracker;
|
||||
}
|
||||
|
||||
/// <summary>Execute a capability call returning a value, honoring the per-capability pipeline.</summary>
|
||||
@@ -59,9 +63,17 @@ public sealed class CapabilityInvoker
|
||||
ArgumentNullException.ThrowIfNull(callSite);
|
||||
|
||||
var pipeline = ResolvePipeline(capability, hostName);
|
||||
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
|
||||
_statusTracker?.RecordCallStart(_driverInstanceId, hostName);
|
||||
try
|
||||
{
|
||||
return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
||||
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
|
||||
{
|
||||
return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_statusTracker?.RecordCallComplete(_driverInstanceId, hostName);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -75,9 +87,17 @@ public sealed class CapabilityInvoker
|
||||
ArgumentNullException.ThrowIfNull(callSite);
|
||||
|
||||
var pipeline = ResolvePipeline(capability, hostName);
|
||||
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
|
||||
_statusTracker?.RecordCallStart(_driverInstanceId, hostName);
|
||||
try
|
||||
{
|
||||
await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
||||
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
|
||||
{
|
||||
await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
finally
|
||||
{
|
||||
_statusTracker?.RecordCallComplete(_driverInstanceId, hostName);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,116 @@
|
||||
using System.Text.Json;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
|
||||
/// <summary>
|
||||
/// Parses the <c>DriverInstance.ResilienceConfig</c> JSON column into a
|
||||
/// <see cref="DriverResilienceOptions"/> instance layered on top of the tier defaults.
|
||||
/// Every key in the JSON is optional; missing keys fall back to the tier defaults from
|
||||
/// <see cref="DriverResilienceOptions.GetTierDefaults(DriverTier)"/>.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>Example JSON shape per Phase 6.1 Stream A.2:</para>
|
||||
/// <code>
|
||||
/// {
|
||||
/// "bulkheadMaxConcurrent": 16,
|
||||
/// "bulkheadMaxQueue": 64,
|
||||
/// "capabilityPolicies": {
|
||||
/// "Read": { "timeoutSeconds": 5, "retryCount": 5, "breakerFailureThreshold": 3 },
|
||||
/// "Write": { "timeoutSeconds": 5, "retryCount": 0, "breakerFailureThreshold": 5 }
|
||||
/// }
|
||||
/// }
|
||||
/// </code>
|
||||
///
|
||||
/// <para>Unrecognised keys + values are ignored so future shapes land without a migration.
|
||||
/// Per-capability overrides are layered on top of tier defaults — a partial policy (only
|
||||
/// some of TimeoutSeconds/RetryCount/BreakerFailureThreshold) fills in the other fields
|
||||
/// from the tier default for that capability.</para>
|
||||
///
|
||||
/// <para>Parser failures (malformed JSON, type mismatches) fall back to pure tier defaults
|
||||
/// + surface through an out-parameter diagnostic. Callers may log the diagnostic but should
|
||||
/// NOT fail driver startup — a misconfigured ResilienceConfig should never brick a
|
||||
/// working driver.</para>
|
||||
/// </remarks>
|
||||
public static class DriverResilienceOptionsParser
|
||||
{
|
||||
private static readonly JsonSerializerOptions JsonOpts = new()
|
||||
{
|
||||
PropertyNameCaseInsensitive = true,
|
||||
AllowTrailingCommas = true,
|
||||
ReadCommentHandling = JsonCommentHandling.Skip,
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Parse the JSON payload layered on <paramref name="tier"/>'s defaults. Returns the
|
||||
/// effective options; <paramref name="parseDiagnostic"/> is null on success, or a
|
||||
/// human-readable error message when the JSON was malformed (options still returned
|
||||
/// = tier defaults).
|
||||
/// </summary>
|
||||
public static DriverResilienceOptions ParseOrDefaults(
|
||||
DriverTier tier,
|
||||
string? resilienceConfigJson,
|
||||
out string? parseDiagnostic)
|
||||
{
|
||||
parseDiagnostic = null;
|
||||
var baseDefaults = DriverResilienceOptions.GetTierDefaults(tier);
|
||||
var baseOptions = new DriverResilienceOptions { Tier = tier, CapabilityPolicies = baseDefaults };
|
||||
|
||||
if (string.IsNullOrWhiteSpace(resilienceConfigJson))
|
||||
return baseOptions;
|
||||
|
||||
ResilienceConfigShape? shape;
|
||||
try
|
||||
{
|
||||
shape = JsonSerializer.Deserialize<ResilienceConfigShape>(resilienceConfigJson, JsonOpts);
|
||||
}
|
||||
catch (JsonException ex)
|
||||
{
|
||||
parseDiagnostic = $"ResilienceConfig JSON malformed; falling back to tier {tier} defaults. Detail: {ex.Message}";
|
||||
return baseOptions;
|
||||
}
|
||||
|
||||
if (shape is null) return baseOptions;
|
||||
|
||||
var merged = new Dictionary<DriverCapability, CapabilityPolicy>(baseDefaults);
|
||||
if (shape.CapabilityPolicies is not null)
|
||||
{
|
||||
foreach (var (capName, overridePolicy) in shape.CapabilityPolicies)
|
||||
{
|
||||
if (!Enum.TryParse<DriverCapability>(capName, ignoreCase: true, out var capability))
|
||||
{
|
||||
parseDiagnostic ??= $"Unknown capability '{capName}' in ResilienceConfig; skipped.";
|
||||
continue;
|
||||
}
|
||||
|
||||
var basePolicy = merged[capability];
|
||||
merged[capability] = new CapabilityPolicy(
|
||||
TimeoutSeconds: overridePolicy.TimeoutSeconds ?? basePolicy.TimeoutSeconds,
|
||||
RetryCount: overridePolicy.RetryCount ?? basePolicy.RetryCount,
|
||||
BreakerFailureThreshold: overridePolicy.BreakerFailureThreshold ?? basePolicy.BreakerFailureThreshold);
|
||||
}
|
||||
}
|
||||
|
||||
return new DriverResilienceOptions
|
||||
{
|
||||
Tier = tier,
|
||||
CapabilityPolicies = merged,
|
||||
BulkheadMaxConcurrent = shape.BulkheadMaxConcurrent ?? baseOptions.BulkheadMaxConcurrent,
|
||||
BulkheadMaxQueue = shape.BulkheadMaxQueue ?? baseOptions.BulkheadMaxQueue,
|
||||
};
|
||||
}
|
||||
|
||||
private sealed class ResilienceConfigShape
|
||||
{
|
||||
public int? BulkheadMaxConcurrent { get; set; }
|
||||
public int? BulkheadMaxQueue { get; set; }
|
||||
public Dictionary<string, CapabilityPolicyShape>? CapabilityPolicies { get; set; }
|
||||
}
|
||||
|
||||
private sealed class CapabilityPolicyShape
|
||||
{
|
||||
public int? TimeoutSeconds { get; set; }
|
||||
public int? RetryCount { get; set; }
|
||||
public int? BreakerFailureThreshold { get; set; }
|
||||
}
|
||||
}
|
||||
@@ -81,6 +81,29 @@ public sealed class DriverResilienceStatusTracker
|
||||
});
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Record the entry of a capability call for this (instance, host). Increments the
|
||||
/// in-flight counter used as the <see cref="ResilienceStatusSnapshot.CurrentInFlight"/>
|
||||
/// surface (a cheap stand-in for Polly bulkhead depth). Paired with
|
||||
/// <see cref="RecordCallComplete"/>; callers use try/finally.
|
||||
/// </summary>
|
||||
public void RecordCallStart(string driverInstanceId, string hostName)
|
||||
{
|
||||
var key = new StatusKey(driverInstanceId, hostName);
|
||||
_status.AddOrUpdate(key,
|
||||
_ => new ResilienceStatusSnapshot { CurrentInFlight = 1 },
|
||||
(_, existing) => existing with { CurrentInFlight = existing.CurrentInFlight + 1 });
|
||||
}
|
||||
|
||||
/// <summary>Paired with <see cref="RecordCallStart"/> — decrements the in-flight counter.</summary>
|
||||
public void RecordCallComplete(string driverInstanceId, string hostName)
|
||||
{
|
||||
var key = new StatusKey(driverInstanceId, hostName);
|
||||
_status.AddOrUpdate(key,
|
||||
_ => new ResilienceStatusSnapshot { CurrentInFlight = 0 }, // start-without-complete shouldn't happen; clamp to 0
|
||||
(_, existing) => existing with { CurrentInFlight = Math.Max(0, existing.CurrentInFlight - 1) });
|
||||
}
|
||||
|
||||
/// <summary>Snapshot of a specific (instance, host) pair; null if no counters recorded yet.</summary>
|
||||
public ResilienceStatusSnapshot? TryGet(string driverInstanceId, string hostName) =>
|
||||
_status.TryGetValue(new StatusKey(driverInstanceId, hostName), out var snapshot) ? snapshot : null;
|
||||
@@ -101,4 +124,12 @@ public sealed record ResilienceStatusSnapshot
|
||||
public long BaselineFootprintBytes { get; init; }
|
||||
public long CurrentFootprintBytes { get; init; }
|
||||
public DateTime LastSampledUtc { get; init; }
|
||||
|
||||
/// <summary>
|
||||
/// In-flight capability calls against this (instance, host). Bumped on call entry +
|
||||
/// decremented on completion. Feeds <c>DriverInstanceResilienceStatus.CurrentBulkheadDepth</c>
|
||||
/// for Admin <c>/hosts</c> — a cheap proxy for the Polly bulkhead depth until the full
|
||||
/// telemetry observer lands.
|
||||
/// </summary>
|
||||
public int CurrentInFlight { get; init; }
|
||||
}
|
||||
|
||||
@@ -0,0 +1,139 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Hosting;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.Hosting;
|
||||
|
||||
/// <summary>
|
||||
/// Samples <see cref="DriverResilienceStatusTracker"/> at a fixed tick + upserts each
|
||||
/// <c>(DriverInstanceId, HostName)</c> snapshot into <see cref="DriverInstanceResilienceStatus"/>
|
||||
/// so Admin <c>/hosts</c> can render live resilience counters across restarts.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>Closes the HostedService piece of Phase 6.1 Stream E.2 flagged as a follow-up
|
||||
/// when the tracker shipped in PR #82. The Admin UI column-refresh piece (red badge when
|
||||
/// ConsecutiveFailures > breakerThreshold / 2 + SignalR push) is still deferred to
|
||||
/// the visual-compliance pass — this service owns the persistence half alone.</para>
|
||||
///
|
||||
/// <para>Tick interval defaults to 5 s. Persistence is best-effort: a DB outage during
|
||||
/// a tick logs + continues; the next tick tries again with the latest snapshots. The
|
||||
/// hosted service never crashes the app on sample failure.</para>
|
||||
///
|
||||
/// <para><see cref="PersistOnceAsync"/> factored as a public method so tests can drive
|
||||
/// it directly, matching the <see cref="ScheduledRecycleHostedService.TickOnceAsync"/>
|
||||
/// pattern for deterministic unit-test timing.</para>
|
||||
/// </remarks>
|
||||
public sealed class ResilienceStatusPublisherHostedService : BackgroundService
|
||||
{
|
||||
private readonly DriverResilienceStatusTracker _tracker;
|
||||
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbContextFactory;
|
||||
private readonly ILogger<ResilienceStatusPublisherHostedService> _logger;
|
||||
private readonly TimeProvider _timeProvider;
|
||||
|
||||
/// <summary>Tick interval — how often the tracker snapshot is persisted.</summary>
|
||||
public TimeSpan TickInterval { get; }
|
||||
|
||||
/// <summary>Snapshot of the tick count for diagnostics + test assertions.</summary>
|
||||
public int TickCount { get; private set; }
|
||||
|
||||
public ResilienceStatusPublisherHostedService(
|
||||
DriverResilienceStatusTracker tracker,
|
||||
IDbContextFactory<OtOpcUaConfigDbContext> dbContextFactory,
|
||||
ILogger<ResilienceStatusPublisherHostedService> logger,
|
||||
TimeProvider? timeProvider = null,
|
||||
TimeSpan? tickInterval = null)
|
||||
{
|
||||
ArgumentNullException.ThrowIfNull(tracker);
|
||||
ArgumentNullException.ThrowIfNull(dbContextFactory);
|
||||
|
||||
_tracker = tracker;
|
||||
_dbContextFactory = dbContextFactory;
|
||||
_logger = logger;
|
||||
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||
TickInterval = tickInterval ?? TimeSpan.FromSeconds(5);
|
||||
}
|
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||
{
|
||||
_logger.LogInformation(
|
||||
"ResilienceStatusPublisherHostedService starting — tick interval = {Interval}",
|
||||
TickInterval);
|
||||
|
||||
while (!stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
try
|
||||
{
|
||||
await Task.Delay(TickInterval, _timeProvider, stoppingToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
await PersistOnceAsync(stoppingToken).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
_logger.LogInformation("ResilienceStatusPublisherHostedService stopping after {TickCount} tick(s).", TickCount);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Take one snapshot of the tracker + upsert each pair into the persistence table.
|
||||
/// Swallows transient exceptions + logs them; never throws from a sample failure.
|
||||
/// </summary>
|
||||
public async Task PersistOnceAsync(CancellationToken cancellationToken)
|
||||
{
|
||||
TickCount++;
|
||||
var snapshot = _tracker.Snapshot();
|
||||
if (snapshot.Count == 0) return;
|
||||
|
||||
try
|
||||
{
|
||||
await using var db = await _dbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);
|
||||
var now = _timeProvider.GetUtcNow().UtcDateTime;
|
||||
|
||||
foreach (var (driverInstanceId, hostName, counters) in snapshot)
|
||||
{
|
||||
var existing = await db.DriverInstanceResilienceStatuses
|
||||
.FirstOrDefaultAsync(x => x.DriverInstanceId == driverInstanceId && x.HostName == hostName, cancellationToken)
|
||||
.ConfigureAwait(false);
|
||||
|
||||
if (existing is null)
|
||||
{
|
||||
db.DriverInstanceResilienceStatuses.Add(new DriverInstanceResilienceStatus
|
||||
{
|
||||
DriverInstanceId = driverInstanceId,
|
||||
HostName = hostName,
|
||||
LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc,
|
||||
ConsecutiveFailures = counters.ConsecutiveFailures,
|
||||
CurrentBulkheadDepth = counters.CurrentInFlight,
|
||||
LastRecycleUtc = counters.LastRecycleUtc,
|
||||
BaselineFootprintBytes = counters.BaselineFootprintBytes,
|
||||
CurrentFootprintBytes = counters.CurrentFootprintBytes,
|
||||
LastSampledUtc = now,
|
||||
});
|
||||
}
|
||||
else
|
||||
{
|
||||
existing.LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc;
|
||||
existing.ConsecutiveFailures = counters.ConsecutiveFailures;
|
||||
existing.CurrentBulkheadDepth = counters.CurrentInFlight;
|
||||
existing.LastRecycleUtc = counters.LastRecycleUtc;
|
||||
existing.BaselineFootprintBytes = counters.BaselineFootprintBytes;
|
||||
existing.CurrentFootprintBytes = counters.CurrentFootprintBytes;
|
||||
existing.LastSampledUtc = now;
|
||||
}
|
||||
}
|
||||
|
||||
await db.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
|
||||
}
|
||||
catch (OperationCanceledException) { throw; }
|
||||
catch (Exception ex)
|
||||
{
|
||||
_logger.LogWarning(ex,
|
||||
"ResilienceStatusPublisher persistence tick failed; next tick will retry with latest snapshots.");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -35,6 +35,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
private readonly IDriver _driver;
|
||||
private readonly IReadable? _readable;
|
||||
private readonly IWritable? _writable;
|
||||
private readonly IPerCallHostResolver? _hostResolver;
|
||||
private readonly CapabilityInvoker _invoker;
|
||||
private readonly ILogger<DriverNodeManager> _logger;
|
||||
|
||||
@@ -75,6 +76,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
_driver = driver;
|
||||
_readable = driver as IReadable;
|
||||
_writable = driver as IWritable;
|
||||
_hostResolver = driver as IPerCallHostResolver;
|
||||
_invoker = invoker;
|
||||
_authzGate = authzGate;
|
||||
_scopeResolver = scopeResolver;
|
||||
@@ -83,6 +85,21 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
|
||||
protected override NodeStateCollection LoadPredefinedNodes(ISystemContext context) => new();
|
||||
|
||||
/// <summary>
|
||||
/// Resolve the host name fed to the Phase 6.1 CapabilityInvoker for a per-tag call.
|
||||
/// Multi-host drivers that implement <see cref="IPerCallHostResolver"/> get their
|
||||
/// per-PLC isolation (decision #144); single-host drivers + drivers that don't
|
||||
/// implement the resolver fall back to the DriverInstanceId — preserves existing
|
||||
/// Phase 6.1 pipeline-key semantics for those drivers.
|
||||
/// </summary>
|
||||
private string ResolveHostFor(string fullReference)
|
||||
{
|
||||
if (_hostResolver is null) return _driver.DriverInstanceId;
|
||||
|
||||
var resolved = _hostResolver.ResolveHost(fullReference);
|
||||
return string.IsNullOrWhiteSpace(resolved) ? _driver.DriverInstanceId : resolved;
|
||||
}
|
||||
|
||||
public override void CreateAddressSpace(IDictionary<NodeId, IList<IReference>> externalReferences)
|
||||
{
|
||||
lock (Lock)
|
||||
@@ -224,7 +241,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
|
||||
var result = _invoker.ExecuteAsync(
|
||||
DriverCapability.Read,
|
||||
_driver.DriverInstanceId,
|
||||
ResolveHostFor(fullRef),
|
||||
async ct => (IReadOnlyList<DataValueSnapshot>)await _readable.ReadAsync([fullRef], ct).ConfigureAwait(false),
|
||||
CancellationToken.None).AsTask().GetAwaiter().GetResult();
|
||||
if (result.Count == 0)
|
||||
@@ -439,7 +456,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
var isIdempotent = _writeIdempotentByFullRef.GetValueOrDefault(fullRef!, false);
|
||||
var capturedValue = value;
|
||||
var results = _invoker.ExecuteWriteAsync(
|
||||
_driver.DriverInstanceId,
|
||||
ResolveHostFor(fullRef!),
|
||||
isIdempotent,
|
||||
async ct => (IReadOnlyList<WriteResult>)await _writable.WriteAsync(
|
||||
[new DriverWriteRequest(fullRef!, capturedValue)],
|
||||
@@ -538,7 +555,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
{
|
||||
var driverResult = _invoker.ExecuteAsync(
|
||||
DriverCapability.HistoryRead,
|
||||
_driver.DriverInstanceId,
|
||||
ResolveHostFor(fullRef),
|
||||
async ct => await History.ReadRawAsync(
|
||||
fullRef,
|
||||
details.StartTime,
|
||||
@@ -612,7 +629,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
{
|
||||
var driverResult = _invoker.ExecuteAsync(
|
||||
DriverCapability.HistoryRead,
|
||||
_driver.DriverInstanceId,
|
||||
ResolveHostFor(fullRef),
|
||||
async ct => await History.ReadProcessedAsync(
|
||||
fullRef,
|
||||
details.StartTime,
|
||||
@@ -679,7 +696,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
{
|
||||
var driverResult = _invoker.ExecuteAsync(
|
||||
DriverCapability.HistoryRead,
|
||||
_driver.DriverInstanceId,
|
||||
ResolveHostFor(fullRef),
|
||||
async ct => await History.ReadAtTimeAsync(fullRef, requestedTimes, ct).ConfigureAwait(false),
|
||||
CancellationToken.None).AsTask().GetAwaiter().GetResult();
|
||||
|
||||
@@ -749,7 +766,7 @@ public sealed class DriverNodeManager : CustomNodeManager2, IAddressSpaceBuilder
|
||||
{
|
||||
var driverResult = _invoker.ExecuteAsync(
|
||||
DriverCapability.HistoryRead,
|
||||
_driver.DriverInstanceId,
|
||||
fullRef is null ? _driver.DriverInstanceId : ResolveHostFor(fullRef),
|
||||
async ct => await History.ReadEventsAsync(
|
||||
sourceName: fullRef,
|
||||
startUtc: details.StartTime,
|
||||
|
||||
@@ -27,6 +27,8 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
|
||||
private readonly AuthorizationGate? _authzGate;
|
||||
private readonly NodeScopeResolver? _scopeResolver;
|
||||
private readonly StaleConfigFlag? _staleConfigFlag;
|
||||
private readonly Func<string, ZB.MOM.WW.OtOpcUa.Core.Abstractions.DriverTier>? _tierLookup;
|
||||
private readonly Func<string, string?>? _resilienceConfigLookup;
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
private readonly ILogger<OpcUaApplicationHost> _logger;
|
||||
private ApplicationInstance? _application;
|
||||
@@ -39,7 +41,9 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
|
||||
DriverResiliencePipelineBuilder? pipelineBuilder = null,
|
||||
AuthorizationGate? authzGate = null,
|
||||
NodeScopeResolver? scopeResolver = null,
|
||||
StaleConfigFlag? staleConfigFlag = null)
|
||||
StaleConfigFlag? staleConfigFlag = null,
|
||||
Func<string, ZB.MOM.WW.OtOpcUa.Core.Abstractions.DriverTier>? tierLookup = null,
|
||||
Func<string, string?>? resilienceConfigLookup = null)
|
||||
{
|
||||
_options = options;
|
||||
_driverHost = driverHost;
|
||||
@@ -48,6 +52,8 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
|
||||
_authzGate = authzGate;
|
||||
_scopeResolver = scopeResolver;
|
||||
_staleConfigFlag = staleConfigFlag;
|
||||
_tierLookup = tierLookup;
|
||||
_resilienceConfigLookup = resilienceConfigLookup;
|
||||
_loggerFactory = loggerFactory;
|
||||
_logger = logger;
|
||||
}
|
||||
@@ -75,7 +81,8 @@ public sealed class OpcUaApplicationHost : IAsyncDisposable
|
||||
$"OPC UA application certificate could not be validated or created in {_options.PkiStoreRoot}");
|
||||
|
||||
_server = new OtOpcUaServer(_driverHost, _authenticator, _pipelineBuilder, _loggerFactory,
|
||||
authzGate: _authzGate, scopeResolver: _scopeResolver);
|
||||
authzGate: _authzGate, scopeResolver: _scopeResolver,
|
||||
tierLookup: _tierLookup, resilienceConfigLookup: _resilienceConfigLookup);
|
||||
await _application.Start(_server).ConfigureAwait(false);
|
||||
|
||||
_logger.LogInformation("OPC UA server started — endpoint={Endpoint} driverCount={Count}",
|
||||
|
||||
@@ -23,6 +23,8 @@ public sealed class OtOpcUaServer : StandardServer
|
||||
private readonly DriverResiliencePipelineBuilder _pipelineBuilder;
|
||||
private readonly AuthorizationGate? _authzGate;
|
||||
private readonly NodeScopeResolver? _scopeResolver;
|
||||
private readonly Func<string, DriverTier>? _tierLookup;
|
||||
private readonly Func<string, string?>? _resilienceConfigLookup;
|
||||
private readonly ILoggerFactory _loggerFactory;
|
||||
private readonly List<DriverNodeManager> _driverNodeManagers = new();
|
||||
|
||||
@@ -32,13 +34,17 @@ public sealed class OtOpcUaServer : StandardServer
|
||||
DriverResiliencePipelineBuilder pipelineBuilder,
|
||||
ILoggerFactory loggerFactory,
|
||||
AuthorizationGate? authzGate = null,
|
||||
NodeScopeResolver? scopeResolver = null)
|
||||
NodeScopeResolver? scopeResolver = null,
|
||||
Func<string, DriverTier>? tierLookup = null,
|
||||
Func<string, string?>? resilienceConfigLookup = null)
|
||||
{
|
||||
_driverHost = driverHost;
|
||||
_authenticator = authenticator;
|
||||
_pipelineBuilder = pipelineBuilder;
|
||||
_authzGate = authzGate;
|
||||
_scopeResolver = scopeResolver;
|
||||
_tierLookup = tierLookup;
|
||||
_resilienceConfigLookup = resilienceConfigLookup;
|
||||
_loggerFactory = loggerFactory;
|
||||
}
|
||||
|
||||
@@ -59,10 +65,16 @@ public sealed class OtOpcUaServer : StandardServer
|
||||
if (driver is null) continue;
|
||||
|
||||
var logger = _loggerFactory.CreateLogger<DriverNodeManager>();
|
||||
// Per-driver resilience options: default Tier A pending Stream B.1 which wires
|
||||
// per-type tiers into DriverTypeRegistry. Read ResilienceConfig JSON from the
|
||||
// DriverInstance row in a follow-up PR; for now every driver gets Tier A defaults.
|
||||
var options = new DriverResilienceOptions { Tier = DriverTier.A };
|
||||
// Per-driver resilience options: tier comes from lookup (Phase 6.1 Stream B.1
|
||||
// DriverTypeRegistry in the prod wire-up) or falls back to Tier A. ResilienceConfig
|
||||
// JSON comes from the DriverInstance row via the optional lookup Func; parser
|
||||
// layers JSON overrides on top of tier defaults (Phase 6.1 Stream A.2).
|
||||
var tier = _tierLookup?.Invoke(driver.DriverType) ?? DriverTier.A;
|
||||
var resilienceJson = _resilienceConfigLookup?.Invoke(driver.DriverInstanceId);
|
||||
var options = DriverResilienceOptionsParser.ParseOrDefaults(tier, resilienceJson, out var diag);
|
||||
if (diag is not null)
|
||||
logger.LogWarning("ResilienceConfig parse diagnostic for driver {DriverId}: {Diag}", driver.DriverInstanceId, diag);
|
||||
|
||||
var invoker = new CapabilityInvoker(_pipelineBuilder, driver.DriverInstanceId, () => options, driver.DriverType);
|
||||
var manager = new DriverNodeManager(server, configuration, driver, invoker, logger,
|
||||
authzGate: _authzGate, scopeResolver: _scopeResolver);
|
||||
|
||||
@@ -0,0 +1,165 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Admin.Services;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Admin.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class EquipmentImportBatchServiceTests : IDisposable
|
||||
{
|
||||
private readonly OtOpcUaConfigDbContext _db;
|
||||
private readonly EquipmentImportBatchService _svc;
|
||||
|
||||
public EquipmentImportBatchServiceTests()
|
||||
{
|
||||
var options = new DbContextOptionsBuilder<OtOpcUaConfigDbContext>()
|
||||
.UseInMemoryDatabase($"import-batch-{Guid.NewGuid():N}")
|
||||
.Options;
|
||||
_db = new OtOpcUaConfigDbContext(options);
|
||||
_svc = new EquipmentImportBatchService(_db);
|
||||
}
|
||||
|
||||
public void Dispose() => _db.Dispose();
|
||||
|
||||
private static EquipmentCsvRow Row(string zTag, string name = "eq-1") => new()
|
||||
{
|
||||
ZTag = zTag,
|
||||
MachineCode = "mc",
|
||||
SAPID = "sap",
|
||||
EquipmentId = "eq-id",
|
||||
EquipmentUuid = Guid.NewGuid().ToString(),
|
||||
Name = name,
|
||||
UnsAreaName = "area",
|
||||
UnsLineName = "line",
|
||||
};
|
||||
|
||||
[Fact]
|
||||
public async Task CreateBatch_PopulatesId_AndTimestamp()
|
||||
{
|
||||
var batch = await _svc.CreateBatchAsync("c1", "alice", CancellationToken.None);
|
||||
|
||||
batch.Id.ShouldNotBe(Guid.Empty);
|
||||
batch.CreatedAtUtc.ShouldBeGreaterThan(DateTime.UtcNow.AddMinutes(-1));
|
||||
batch.RowsStaged.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task StageRows_AcceptedAndRejected_AllPersist()
|
||||
{
|
||||
var batch = await _svc.CreateBatchAsync("c1", "alice", CancellationToken.None);
|
||||
|
||||
await _svc.StageRowsAsync(batch.Id,
|
||||
acceptedRows: [Row("z-1"), Row("z-2")],
|
||||
rejectedRows: [new EquipmentCsvRowError(LineNumber: 5, Reason: "duplicate ZTag")],
|
||||
CancellationToken.None);
|
||||
|
||||
var reloaded = await _db.EquipmentImportBatches.Include(b => b.Rows).FirstAsync(b => b.Id == batch.Id);
|
||||
reloaded.RowsStaged.ShouldBe(3);
|
||||
reloaded.RowsAccepted.ShouldBe(2);
|
||||
reloaded.RowsRejected.ShouldBe(1);
|
||||
reloaded.Rows.Count.ShouldBe(3);
|
||||
reloaded.Rows.Count(r => r.IsAccepted).ShouldBe(2);
|
||||
reloaded.Rows.Single(r => !r.IsAccepted).RejectReason.ShouldBe("duplicate ZTag");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DropBatch_RemovesBatch_AndCascades_Rows()
|
||||
{
|
||||
var batch = await _svc.CreateBatchAsync("c1", "alice", CancellationToken.None);
|
||||
await _svc.StageRowsAsync(batch.Id, [Row("z-1")], [], CancellationToken.None);
|
||||
|
||||
await _svc.DropBatchAsync(batch.Id, CancellationToken.None);
|
||||
|
||||
(await _db.EquipmentImportBatches.AnyAsync(b => b.Id == batch.Id)).ShouldBeFalse();
|
||||
(await _db.EquipmentImportRows.AnyAsync(r => r.BatchId == batch.Id)).ShouldBeFalse("cascaded delete clears rows");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DropBatch_AfterFinalise_Throws()
|
||||
{
|
||||
var batch = await _svc.CreateBatchAsync("c1", "alice", CancellationToken.None);
|
||||
await _svc.StageRowsAsync(batch.Id, [Row("z-1")], [], CancellationToken.None);
|
||||
await _svc.FinaliseBatchAsync(batch.Id, generationId: 1, driverInstanceIdForRows: "drv-1", unsLineIdForRows: "line-1", CancellationToken.None);
|
||||
|
||||
await Should.ThrowAsync<ImportBatchAlreadyFinalisedException>(
|
||||
() => _svc.DropBatchAsync(batch.Id, CancellationToken.None));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Finalise_AcceptedRows_BecomeEquipment()
|
||||
{
|
||||
var batch = await _svc.CreateBatchAsync("c1", "alice", CancellationToken.None);
|
||||
await _svc.StageRowsAsync(batch.Id,
|
||||
[Row("z-1", name: "alpha"), Row("z-2", name: "beta")],
|
||||
rejectedRows: [new EquipmentCsvRowError(1, "rejected")],
|
||||
CancellationToken.None);
|
||||
|
||||
await _svc.FinaliseBatchAsync(batch.Id, 5, "drv-modbus", "line-warsaw", CancellationToken.None);
|
||||
|
||||
var equipment = await _db.Equipment.Where(e => e.GenerationId == 5).ToListAsync();
|
||||
equipment.Count.ShouldBe(2);
|
||||
equipment.Select(e => e.Name).ShouldBe(["alpha", "beta"], ignoreOrder: true);
|
||||
equipment.All(e => e.DriverInstanceId == "drv-modbus").ShouldBeTrue();
|
||||
equipment.All(e => e.UnsLineId == "line-warsaw").ShouldBeTrue();
|
||||
|
||||
var reloaded = await _db.EquipmentImportBatches.FirstAsync(b => b.Id == batch.Id);
|
||||
reloaded.FinalisedAtUtc.ShouldNotBeNull();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Finalise_Twice_Throws()
|
||||
{
|
||||
var batch = await _svc.CreateBatchAsync("c1", "alice", CancellationToken.None);
|
||||
await _svc.StageRowsAsync(batch.Id, [Row("z-1")], [], CancellationToken.None);
|
||||
await _svc.FinaliseBatchAsync(batch.Id, 1, "drv", "line", CancellationToken.None);
|
||||
|
||||
await Should.ThrowAsync<ImportBatchAlreadyFinalisedException>(
|
||||
() => _svc.FinaliseBatchAsync(batch.Id, 2, "drv", "line", CancellationToken.None));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Finalise_MissingBatch_Throws()
|
||||
{
|
||||
await Should.ThrowAsync<ImportBatchNotFoundException>(
|
||||
() => _svc.FinaliseBatchAsync(Guid.NewGuid(), 1, "drv", "line", CancellationToken.None));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task Stage_After_Finalise_Throws()
|
||||
{
|
||||
var batch = await _svc.CreateBatchAsync("c1", "alice", CancellationToken.None);
|
||||
await _svc.StageRowsAsync(batch.Id, [Row("z-1")], [], CancellationToken.None);
|
||||
await _svc.FinaliseBatchAsync(batch.Id, 1, "drv", "line", CancellationToken.None);
|
||||
|
||||
await Should.ThrowAsync<ImportBatchAlreadyFinalisedException>(
|
||||
() => _svc.StageRowsAsync(batch.Id, [Row("z-2")], [], CancellationToken.None));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task ListByUser_FiltersByCreator_AndFinalised()
|
||||
{
|
||||
var a = await _svc.CreateBatchAsync("c1", "alice", CancellationToken.None);
|
||||
var b = await _svc.CreateBatchAsync("c1", "bob", CancellationToken.None);
|
||||
await _svc.StageRowsAsync(a.Id, [Row("z-a")], [], CancellationToken.None);
|
||||
await _svc.FinaliseBatchAsync(a.Id, 1, "d", "l", CancellationToken.None);
|
||||
_ = b;
|
||||
|
||||
var aliceOpen = await _svc.ListByUserAsync("alice", includeFinalised: false, CancellationToken.None);
|
||||
aliceOpen.ShouldBeEmpty("alice's only batch is finalised");
|
||||
|
||||
var aliceAll = await _svc.ListByUserAsync("alice", includeFinalised: true, CancellationToken.None);
|
||||
aliceAll.Count.ShouldBe(1);
|
||||
|
||||
var bobOpen = await _svc.ListByUserAsync("bob", includeFinalised: false, CancellationToken.None);
|
||||
bobOpen.Count.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DropBatch_Unknown_IsNoOp()
|
||||
{
|
||||
await _svc.DropBatchAsync(Guid.NewGuid(), CancellationToken.None);
|
||||
// no throw
|
||||
}
|
||||
}
|
||||
@@ -31,6 +31,8 @@ public sealed class SchemaComplianceTests
|
||||
"DriverHostStatus",
|
||||
"DriverInstanceResilienceStatus",
|
||||
"LdapGroupRoleMapping",
|
||||
"EquipmentImportBatch",
|
||||
"EquipmentImportRow",
|
||||
};
|
||||
|
||||
var actual = QueryStrings(@"
|
||||
@@ -78,6 +80,7 @@ WHERE i.is_unique = 1 AND i.has_filter = 1;",
|
||||
"CK_ServerCluster_RedundancyMode_NodeCount",
|
||||
"CK_Device_DeviceConfig_IsJson",
|
||||
"CK_DriverInstance_DriverConfig_IsJson",
|
||||
"CK_DriverInstance_ResilienceConfig_IsJson",
|
||||
"CK_PollGroup_IntervalMs_Min",
|
||||
"CK_Tag_TagConfig_IsJson",
|
||||
"CK_ConfigAuditLog_DetailsJson_IsJson",
|
||||
|
||||
@@ -0,0 +1,166 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class DriverResilienceOptionsParserTests
|
||||
{
|
||||
[Fact]
|
||||
public void NullJson_ReturnsPureTierDefaults()
|
||||
{
|
||||
var options = DriverResilienceOptionsParser.ParseOrDefaults(DriverTier.A, null, out var diag);
|
||||
|
||||
diag.ShouldBeNull();
|
||||
options.Tier.ShouldBe(DriverTier.A);
|
||||
options.Resolve(DriverCapability.Read).ShouldBe(
|
||||
DriverResilienceOptions.GetTierDefaults(DriverTier.A)[DriverCapability.Read]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void WhitespaceJson_ReturnsDefaults()
|
||||
{
|
||||
DriverResilienceOptionsParser.ParseOrDefaults(DriverTier.B, " ", out var diag);
|
||||
diag.ShouldBeNull();
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void MalformedJson_FallsBack_WithDiagnostic()
|
||||
{
|
||||
var options = DriverResilienceOptionsParser.ParseOrDefaults(DriverTier.A, "{not json", out var diag);
|
||||
|
||||
diag.ShouldNotBeNull();
|
||||
diag.ShouldContain("malformed");
|
||||
options.Tier.ShouldBe(DriverTier.A);
|
||||
options.Resolve(DriverCapability.Read).ShouldBe(
|
||||
DriverResilienceOptions.GetTierDefaults(DriverTier.A)[DriverCapability.Read]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void EmptyObject_ReturnsDefaults()
|
||||
{
|
||||
var options = DriverResilienceOptionsParser.ParseOrDefaults(DriverTier.A, "{}", out var diag);
|
||||
|
||||
diag.ShouldBeNull();
|
||||
options.Resolve(DriverCapability.Write).ShouldBe(
|
||||
DriverResilienceOptions.GetTierDefaults(DriverTier.A)[DriverCapability.Write]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ReadOverride_MergedIntoTierDefaults()
|
||||
{
|
||||
var json = """
|
||||
{
|
||||
"capabilityPolicies": {
|
||||
"Read": { "timeoutSeconds": 5, "retryCount": 7, "breakerFailureThreshold": 2 }
|
||||
}
|
||||
}
|
||||
""";
|
||||
|
||||
var options = DriverResilienceOptionsParser.ParseOrDefaults(DriverTier.A, json, out var diag);
|
||||
|
||||
diag.ShouldBeNull();
|
||||
var read = options.Resolve(DriverCapability.Read);
|
||||
read.TimeoutSeconds.ShouldBe(5);
|
||||
read.RetryCount.ShouldBe(7);
|
||||
read.BreakerFailureThreshold.ShouldBe(2);
|
||||
|
||||
// Other capabilities untouched
|
||||
options.Resolve(DriverCapability.Write).ShouldBe(
|
||||
DriverResilienceOptions.GetTierDefaults(DriverTier.A)[DriverCapability.Write]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void PartialPolicy_FillsMissingFieldsFromTierDefault()
|
||||
{
|
||||
var json = """
|
||||
{
|
||||
"capabilityPolicies": {
|
||||
"Read": { "retryCount": 10 }
|
||||
}
|
||||
}
|
||||
""";
|
||||
|
||||
var options = DriverResilienceOptionsParser.ParseOrDefaults(DriverTier.A, json, out _);
|
||||
|
||||
var read = options.Resolve(DriverCapability.Read);
|
||||
var tierDefault = DriverResilienceOptions.GetTierDefaults(DriverTier.A)[DriverCapability.Read];
|
||||
read.RetryCount.ShouldBe(10);
|
||||
read.TimeoutSeconds.ShouldBe(tierDefault.TimeoutSeconds, "partial override; timeout falls back to tier default");
|
||||
read.BreakerFailureThreshold.ShouldBe(tierDefault.BreakerFailureThreshold);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void BulkheadOverrides_AreHonored()
|
||||
{
|
||||
var json = """
|
||||
{ "bulkheadMaxConcurrent": 100, "bulkheadMaxQueue": 500 }
|
||||
""";
|
||||
|
||||
var options = DriverResilienceOptionsParser.ParseOrDefaults(DriverTier.B, json, out _);
|
||||
|
||||
options.BulkheadMaxConcurrent.ShouldBe(100);
|
||||
options.BulkheadMaxQueue.ShouldBe(500);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void UnknownCapability_Surfaces_InDiagnostic_ButDoesNotFail()
|
||||
{
|
||||
var json = """
|
||||
{
|
||||
"capabilityPolicies": {
|
||||
"InventedCapability": { "timeoutSeconds": 99 }
|
||||
}
|
||||
}
|
||||
""";
|
||||
|
||||
var options = DriverResilienceOptionsParser.ParseOrDefaults(DriverTier.A, json, out var diag);
|
||||
|
||||
diag.ShouldNotBeNull();
|
||||
diag.ShouldContain("InventedCapability");
|
||||
// Known capabilities untouched.
|
||||
options.Resolve(DriverCapability.Read).ShouldBe(
|
||||
DriverResilienceOptions.GetTierDefaults(DriverTier.A)[DriverCapability.Read]);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void PropertyNames_AreCaseInsensitive()
|
||||
{
|
||||
var json = """
|
||||
{ "BULKHEADMAXCONCURRENT": 42 }
|
||||
""";
|
||||
|
||||
var options = DriverResilienceOptionsParser.ParseOrDefaults(DriverTier.A, json, out _);
|
||||
|
||||
options.BulkheadMaxConcurrent.ShouldBe(42);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CapabilityName_IsCaseInsensitive()
|
||||
{
|
||||
var json = """
|
||||
{ "capabilityPolicies": { "read": { "retryCount": 99 } } }
|
||||
""";
|
||||
|
||||
var options = DriverResilienceOptionsParser.ParseOrDefaults(DriverTier.A, json, out var diag);
|
||||
|
||||
diag.ShouldBeNull();
|
||||
options.Resolve(DriverCapability.Read).RetryCount.ShouldBe(99);
|
||||
}
|
||||
|
||||
[Theory]
|
||||
[InlineData(DriverTier.A)]
|
||||
[InlineData(DriverTier.B)]
|
||||
[InlineData(DriverTier.C)]
|
||||
public void EveryTier_WithEmptyJson_RoundTrips_Its_Defaults(DriverTier tier)
|
||||
{
|
||||
var options = DriverResilienceOptionsParser.ParseOrDefaults(tier, "{}", out var diag);
|
||||
|
||||
diag.ShouldBeNull();
|
||||
options.Tier.ShouldBe(tier);
|
||||
foreach (var cap in Enum.GetValues<DriverCapability>())
|
||||
options.Resolve(cap).ShouldBe(DriverResilienceOptions.GetTierDefaults(tier)[cap]);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,130 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class InFlightCounterTests
|
||||
{
|
||||
[Fact]
|
||||
public void StartThenComplete_NetsToZero()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
tracker.RecordCallComplete("drv", "host-a");
|
||||
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void NestedStarts_SumDepth()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(3);
|
||||
|
||||
tracker.RecordCallComplete("drv", "host-a");
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(2);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void CompleteBeforeStart_ClampedToZero()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
tracker.RecordCallComplete("drv", "host-a");
|
||||
|
||||
// A stray Complete without a matching Start shouldn't drive the counter negative.
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void DifferentHosts_TrackIndependently()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
tracker.RecordCallStart("drv", "host-a");
|
||||
tracker.RecordCallStart("drv", "host-b");
|
||||
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(2);
|
||||
tracker.TryGet("drv", "host-b")!.CurrentInFlight.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void ConcurrentStarts_DoNotLose_Count()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
Parallel.For(0, 500, _ => tracker.RecordCallStart("drv", "host-a"));
|
||||
|
||||
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(500);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CapabilityInvoker_IncrementsTracker_DuringExecution()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
var invoker = new CapabilityInvoker(
|
||||
new DriverResiliencePipelineBuilder(),
|
||||
"drv-live",
|
||||
() => new DriverResilienceOptions { Tier = DriverTier.A },
|
||||
driverType: "Modbus",
|
||||
statusTracker: tracker);
|
||||
|
||||
var observedMidCall = -1;
|
||||
await invoker.ExecuteAsync(
|
||||
DriverCapability.Read,
|
||||
"plc-1",
|
||||
async _ =>
|
||||
{
|
||||
observedMidCall = tracker.TryGet("drv-live", "plc-1")?.CurrentInFlight ?? -1;
|
||||
await Task.Yield();
|
||||
return 42;
|
||||
},
|
||||
CancellationToken.None);
|
||||
|
||||
observedMidCall.ShouldBe(1, "during call, in-flight == 1");
|
||||
tracker.TryGet("drv-live", "plc-1")!.CurrentInFlight.ShouldBe(0, "post-call, counter decremented");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CapabilityInvoker_ExceptionPath_DecrementsCounter()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
var invoker = new CapabilityInvoker(
|
||||
new DriverResiliencePipelineBuilder(),
|
||||
"drv-live",
|
||||
() => new DriverResilienceOptions { Tier = DriverTier.A },
|
||||
statusTracker: tracker);
|
||||
|
||||
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||
await invoker.ExecuteAsync<int>(
|
||||
DriverCapability.Write,
|
||||
"plc-1",
|
||||
_ => throw new InvalidOperationException("boom"),
|
||||
CancellationToken.None));
|
||||
|
||||
tracker.TryGet("drv-live", "plc-1")!.CurrentInFlight.ShouldBe(0,
|
||||
"finally-block must decrement even when call-site throws");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task CapabilityInvoker_WithoutTracker_DoesNotThrow()
|
||||
{
|
||||
var invoker = new CapabilityInvoker(
|
||||
new DriverResiliencePipelineBuilder(),
|
||||
"drv-live",
|
||||
() => new DriverResilienceOptions { Tier = DriverTier.A },
|
||||
statusTracker: null);
|
||||
|
||||
var result = await invoker.ExecuteAsync(
|
||||
DriverCapability.Read, "host-1",
|
||||
_ => ValueTask.FromResult(7),
|
||||
CancellationToken.None);
|
||||
|
||||
result.ShouldBe(7);
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,110 @@
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
|
||||
|
||||
/// <summary>
|
||||
/// Exercises the per-call host resolver contract against the shared
|
||||
/// <see cref="DriverResiliencePipelineBuilder"/> + <see cref="CapabilityInvoker"/> — one
|
||||
/// dead PLC behind a multi-device driver must NOT open the breaker for healthy sibling
|
||||
/// PLCs (decision #144).
|
||||
/// </summary>
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class PerCallHostResolverDispatchTests
|
||||
{
|
||||
private sealed class StaticResolver : IPerCallHostResolver
|
||||
{
|
||||
private readonly Dictionary<string, string> _map;
|
||||
public StaticResolver(Dictionary<string, string> map) => _map = map;
|
||||
public string ResolveHost(string fullReference) =>
|
||||
_map.TryGetValue(fullReference, out var host) ? host : string.Empty;
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task DeadPlc_DoesNotOpenBreaker_For_HealthyPlc_With_Resolver()
|
||||
{
|
||||
// Two PLCs behind one driver. Dead PLC keeps failing; healthy PLC must keep serving.
|
||||
var builder = new DriverResiliencePipelineBuilder();
|
||||
var options = new DriverResilienceOptions { Tier = DriverTier.B };
|
||||
var invoker = new CapabilityInvoker(builder, "drv-modbus", () => options);
|
||||
|
||||
var resolver = new StaticResolver(new Dictionary<string, string>
|
||||
{
|
||||
["tag-on-dead"] = "plc-dead",
|
||||
["tag-on-alive"] = "plc-alive",
|
||||
});
|
||||
|
||||
var threshold = options.Resolve(DriverCapability.Read).BreakerFailureThreshold;
|
||||
for (var i = 0; i < threshold + 3; i++)
|
||||
{
|
||||
await Should.ThrowAsync<Exception>(async () =>
|
||||
await invoker.ExecuteAsync(
|
||||
DriverCapability.Read,
|
||||
hostName: resolver.ResolveHost("tag-on-dead"),
|
||||
_ => throw new InvalidOperationException("plc-dead unreachable"),
|
||||
CancellationToken.None));
|
||||
}
|
||||
|
||||
// Healthy PLC's pipeline is in a different bucket; the first call should succeed
|
||||
// without hitting the dead-PLC breaker.
|
||||
var aliveAttempts = 0;
|
||||
await invoker.ExecuteAsync(
|
||||
DriverCapability.Read,
|
||||
hostName: resolver.ResolveHost("tag-on-alive"),
|
||||
_ => { aliveAttempts++; return ValueTask.FromResult("ok"); },
|
||||
CancellationToken.None);
|
||||
|
||||
aliveAttempts.ShouldBe(1, "decision #144 — per-PLC isolation keeps healthy PLCs serving");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public void Resolver_EmptyString_Treated_As_Single_Host_Fallback()
|
||||
{
|
||||
var resolver = new StaticResolver(new Dictionary<string, string>
|
||||
{
|
||||
["tag-unknown"] = "",
|
||||
});
|
||||
|
||||
resolver.ResolveHost("tag-unknown").ShouldBe("");
|
||||
resolver.ResolveHost("not-in-map").ShouldBe("", "unknown refs return empty so dispatch falls back to single-host");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WithoutResolver_SameHost_Shares_One_Pipeline()
|
||||
{
|
||||
// Without a resolver all calls share the DriverInstanceId pipeline — that's the
|
||||
// pre-decision-#144 behavior single-host drivers should keep.
|
||||
var builder = new DriverResiliencePipelineBuilder();
|
||||
var options = new DriverResilienceOptions { Tier = DriverTier.A };
|
||||
var invoker = new CapabilityInvoker(builder, "drv-single", () => options);
|
||||
|
||||
await invoker.ExecuteAsync(DriverCapability.Read, "drv-single",
|
||||
_ => ValueTask.FromResult("a"), CancellationToken.None);
|
||||
await invoker.ExecuteAsync(DriverCapability.Read, "drv-single",
|
||||
_ => ValueTask.FromResult("b"), CancellationToken.None);
|
||||
|
||||
builder.CachedPipelineCount.ShouldBe(1, "single-host drivers share one pipeline");
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task WithResolver_TwoHosts_Get_Two_Pipelines()
|
||||
{
|
||||
var builder = new DriverResiliencePipelineBuilder();
|
||||
var options = new DriverResilienceOptions { Tier = DriverTier.B };
|
||||
var invoker = new CapabilityInvoker(builder, "drv-modbus", () => options);
|
||||
var resolver = new StaticResolver(new Dictionary<string, string>
|
||||
{
|
||||
["tag-a"] = "plc-a",
|
||||
["tag-b"] = "plc-b",
|
||||
});
|
||||
|
||||
await invoker.ExecuteAsync(DriverCapability.Read, resolver.ResolveHost("tag-a"),
|
||||
_ => ValueTask.FromResult(1), CancellationToken.None);
|
||||
await invoker.ExecuteAsync(DriverCapability.Read, resolver.ResolveHost("tag-b"),
|
||||
_ => ValueTask.FromResult(2), CancellationToken.None);
|
||||
|
||||
builder.CachedPipelineCount.ShouldBe(2, "each host keyed on its own pipeline");
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,161 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.Extensions.Logging.Abstractions;
|
||||
using Shouldly;
|
||||
using Xunit;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||
using ZB.MOM.WW.OtOpcUa.Server.Hosting;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Server.Tests;
|
||||
|
||||
[Trait("Category", "Unit")]
|
||||
public sealed class ResilienceStatusPublisherHostedServiceTests : IDisposable
|
||||
{
|
||||
private static readonly DateTime T0 = new(2026, 4, 19, 12, 0, 0, DateTimeKind.Utc);
|
||||
|
||||
private sealed class FakeClock : TimeProvider
|
||||
{
|
||||
public DateTime Utc { get; set; } = T0;
|
||||
public override DateTimeOffset GetUtcNow() => new(Utc, TimeSpan.Zero);
|
||||
}
|
||||
|
||||
private sealed class InMemoryDbContextFactory : IDbContextFactory<OtOpcUaConfigDbContext>
|
||||
{
|
||||
private readonly DbContextOptions<OtOpcUaConfigDbContext> _options;
|
||||
public InMemoryDbContextFactory(string dbName)
|
||||
{
|
||||
_options = new DbContextOptionsBuilder<OtOpcUaConfigDbContext>()
|
||||
.UseInMemoryDatabase(dbName)
|
||||
.Options;
|
||||
}
|
||||
public OtOpcUaConfigDbContext CreateDbContext() => new(_options);
|
||||
}
|
||||
|
||||
private readonly string _dbName = $"resilience-pub-{Guid.NewGuid():N}";
|
||||
private readonly InMemoryDbContextFactory _factory;
|
||||
private readonly OtOpcUaConfigDbContext _readCtx;
|
||||
|
||||
public ResilienceStatusPublisherHostedServiceTests()
|
||||
{
|
||||
_factory = new InMemoryDbContextFactory(_dbName);
|
||||
_readCtx = _factory.CreateDbContext();
|
||||
}
|
||||
|
||||
public void Dispose() => _readCtx.Dispose();
|
||||
|
||||
[Fact]
|
||||
public async Task EmptyTracker_Tick_NoOp_NoRowsWritten()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
var host = new ResilienceStatusPublisherHostedService(
|
||||
tracker, _factory, NullLogger<ResilienceStatusPublisherHostedService>.Instance);
|
||||
|
||||
await host.PersistOnceAsync(CancellationToken.None);
|
||||
|
||||
host.TickCount.ShouldBe(1);
|
||||
(await _readCtx.DriverInstanceResilienceStatuses.CountAsync()).ShouldBe(0);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SingleHost_OnePairWithCounters_UpsertsNewRow()
|
||||
{
|
||||
var clock = new FakeClock();
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
tracker.RecordFailure("drv-1", "plc-a", T0);
|
||||
tracker.RecordFailure("drv-1", "plc-a", T0);
|
||||
tracker.RecordBreakerOpen("drv-1", "plc-a", T0.AddSeconds(1));
|
||||
|
||||
var host = new ResilienceStatusPublisherHostedService(
|
||||
tracker, _factory, NullLogger<ResilienceStatusPublisherHostedService>.Instance,
|
||||
timeProvider: clock);
|
||||
|
||||
clock.Utc = T0.AddSeconds(2);
|
||||
await host.PersistOnceAsync(CancellationToken.None);
|
||||
|
||||
var row = await _readCtx.DriverInstanceResilienceStatuses.SingleAsync();
|
||||
row.DriverInstanceId.ShouldBe("drv-1");
|
||||
row.HostName.ShouldBe("plc-a");
|
||||
row.ConsecutiveFailures.ShouldBe(2);
|
||||
row.LastCircuitBreakerOpenUtc.ShouldBe(T0.AddSeconds(1));
|
||||
row.LastSampledUtc.ShouldBe(T0.AddSeconds(2));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task SecondTick_UpdatesExistingRow_InPlace()
|
||||
{
|
||||
var clock = new FakeClock();
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
tracker.RecordFailure("drv-1", "plc-a", T0);
|
||||
|
||||
var host = new ResilienceStatusPublisherHostedService(
|
||||
tracker, _factory, NullLogger<ResilienceStatusPublisherHostedService>.Instance,
|
||||
timeProvider: clock);
|
||||
|
||||
clock.Utc = T0.AddSeconds(5);
|
||||
await host.PersistOnceAsync(CancellationToken.None);
|
||||
|
||||
// Second tick: success resets the counter.
|
||||
tracker.RecordSuccess("drv-1", "plc-a", T0.AddSeconds(6));
|
||||
clock.Utc = T0.AddSeconds(10);
|
||||
await host.PersistOnceAsync(CancellationToken.None);
|
||||
|
||||
(await _readCtx.DriverInstanceResilienceStatuses.CountAsync()).ShouldBe(1, "one row, updated in place");
|
||||
var row = await _readCtx.DriverInstanceResilienceStatuses.SingleAsync();
|
||||
row.ConsecutiveFailures.ShouldBe(0);
|
||||
row.LastSampledUtc.ShouldBe(T0.AddSeconds(10));
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task MultipleHosts_BothPersist_Independently()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
tracker.RecordFailure("drv-1", "plc-a", T0);
|
||||
tracker.RecordFailure("drv-1", "plc-a", T0);
|
||||
tracker.RecordFailure("drv-1", "plc-b", T0);
|
||||
|
||||
var host = new ResilienceStatusPublisherHostedService(
|
||||
tracker, _factory, NullLogger<ResilienceStatusPublisherHostedService>.Instance);
|
||||
|
||||
await host.PersistOnceAsync(CancellationToken.None);
|
||||
|
||||
var rows = await _readCtx.DriverInstanceResilienceStatuses
|
||||
.OrderBy(r => r.HostName)
|
||||
.ToListAsync();
|
||||
rows.Count.ShouldBe(2);
|
||||
rows[0].HostName.ShouldBe("plc-a");
|
||||
rows[0].ConsecutiveFailures.ShouldBe(2);
|
||||
rows[1].HostName.ShouldBe("plc-b");
|
||||
rows[1].ConsecutiveFailures.ShouldBe(1);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task FootprintCounters_Persist()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
tracker.RecordFootprint("drv-1", "plc-a",
|
||||
baselineBytes: 100_000_000, currentBytes: 150_000_000, T0);
|
||||
|
||||
var host = new ResilienceStatusPublisherHostedService(
|
||||
tracker, _factory, NullLogger<ResilienceStatusPublisherHostedService>.Instance);
|
||||
|
||||
await host.PersistOnceAsync(CancellationToken.None);
|
||||
|
||||
var row = await _readCtx.DriverInstanceResilienceStatuses.SingleAsync();
|
||||
row.BaselineFootprintBytes.ShouldBe(100_000_000);
|
||||
row.CurrentFootprintBytes.ShouldBe(150_000_000);
|
||||
}
|
||||
|
||||
[Fact]
|
||||
public async Task TickCount_Advances_OnEveryCall()
|
||||
{
|
||||
var tracker = new DriverResilienceStatusTracker();
|
||||
var host = new ResilienceStatusPublisherHostedService(
|
||||
tracker, _factory, NullLogger<ResilienceStatusPublisherHostedService>.Instance);
|
||||
|
||||
await host.PersistOnceAsync(CancellationToken.None);
|
||||
await host.PersistOnceAsync(CancellationToken.None);
|
||||
await host.PersistOnceAsync(CancellationToken.None);
|
||||
|
||||
host.TickCount.ShouldBe(3);
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user