Compare commits
8 Commits
phase-6-1-
...
abcip-pr1-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4ab587707f | ||
| 2172d49d2e | |||
|
|
ae8f226e45 | ||
| e032045247 | |||
|
|
ad131932d3 | ||
| 98b69ff4f9 | |||
|
|
016122841b | ||
| 244a36e03e |
@@ -0,0 +1,207 @@
|
|||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Admin.Services;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Admin.Services;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Staged-import orchestrator per Phase 6.4 Stream B.2-B.4. Covers the four operator
|
||||||
|
/// actions: CreateBatch → StageRows (chunked) → FinaliseBatch (atomic apply into
|
||||||
|
/// <see cref="Equipment"/>) → DropBatch (rollback of pre-finalise state).
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// <para>FinaliseBatch runs inside one EF transaction + bulk-inserts accepted rows into
|
||||||
|
/// <see cref="Equipment"/>. Rejected rows stay behind as audit evidence; the batch row
|
||||||
|
/// gains <see cref="EquipmentImportBatch.FinalisedAtUtc"/> so future writes know it's
|
||||||
|
/// archived. DropBatch removes the batch + its cascaded rows.</para>
|
||||||
|
///
|
||||||
|
/// <para>Idempotence: calling FinaliseBatch twice throws <see cref="ImportBatchAlreadyFinalisedException"/>
|
||||||
|
/// rather than double-inserting. Operator refreshes the admin page to see the first
|
||||||
|
/// finalise completed.</para>
|
||||||
|
///
|
||||||
|
/// <para>ExternalIdReservation merging (ZTag + SAPID uniqueness) is NOT done here — a
|
||||||
|
/// narrower follow-up wires it once the concurrent-insert test matrix is green.</para>
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class EquipmentImportBatchService(OtOpcUaConfigDbContext db)
|
||||||
|
{
|
||||||
|
/// <summary>Create a new empty batch header. Returns the row with Id populated.</summary>
|
||||||
|
public async Task<EquipmentImportBatch> CreateBatchAsync(string clusterId, string createdBy, CancellationToken ct)
|
||||||
|
{
|
||||||
|
ArgumentException.ThrowIfNullOrWhiteSpace(clusterId);
|
||||||
|
ArgumentException.ThrowIfNullOrWhiteSpace(createdBy);
|
||||||
|
|
||||||
|
var batch = new EquipmentImportBatch
|
||||||
|
{
|
||||||
|
Id = Guid.NewGuid(),
|
||||||
|
ClusterId = clusterId,
|
||||||
|
CreatedBy = createdBy,
|
||||||
|
CreatedAtUtc = DateTime.UtcNow,
|
||||||
|
};
|
||||||
|
db.EquipmentImportBatches.Add(batch);
|
||||||
|
await db.SaveChangesAsync(ct).ConfigureAwait(false);
|
||||||
|
return batch;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Stage one chunk of rows into the batch. Caller usually feeds
|
||||||
|
/// <see cref="EquipmentCsvImporter.Parse"/> output here — each
|
||||||
|
/// <see cref="EquipmentCsvRow"/> becomes one accepted <see cref="EquipmentImportRow"/>,
|
||||||
|
/// each rejected parser error becomes one row with <see cref="EquipmentImportRow.IsAccepted"/> false.
|
||||||
|
/// </summary>
|
||||||
|
public async Task StageRowsAsync(
|
||||||
|
Guid batchId,
|
||||||
|
IReadOnlyList<EquipmentCsvRow> acceptedRows,
|
||||||
|
IReadOnlyList<EquipmentCsvRowError> rejectedRows,
|
||||||
|
CancellationToken ct)
|
||||||
|
{
|
||||||
|
var batch = await db.EquipmentImportBatches.FirstOrDefaultAsync(b => b.Id == batchId, ct).ConfigureAwait(false)
|
||||||
|
?? throw new ImportBatchNotFoundException($"Batch {batchId} not found.");
|
||||||
|
|
||||||
|
if (batch.FinalisedAtUtc is not null)
|
||||||
|
throw new ImportBatchAlreadyFinalisedException(
|
||||||
|
$"Batch {batchId} finalised at {batch.FinalisedAtUtc:o}; no more rows can be staged.");
|
||||||
|
|
||||||
|
foreach (var row in acceptedRows)
|
||||||
|
{
|
||||||
|
db.EquipmentImportRows.Add(new EquipmentImportRow
|
||||||
|
{
|
||||||
|
Id = Guid.NewGuid(),
|
||||||
|
BatchId = batchId,
|
||||||
|
IsAccepted = true,
|
||||||
|
ZTag = row.ZTag,
|
||||||
|
MachineCode = row.MachineCode,
|
||||||
|
SAPID = row.SAPID,
|
||||||
|
EquipmentId = row.EquipmentId,
|
||||||
|
EquipmentUuid = row.EquipmentUuid,
|
||||||
|
Name = row.Name,
|
||||||
|
UnsAreaName = row.UnsAreaName,
|
||||||
|
UnsLineName = row.UnsLineName,
|
||||||
|
Manufacturer = row.Manufacturer,
|
||||||
|
Model = row.Model,
|
||||||
|
SerialNumber = row.SerialNumber,
|
||||||
|
HardwareRevision = row.HardwareRevision,
|
||||||
|
SoftwareRevision = row.SoftwareRevision,
|
||||||
|
YearOfConstruction = row.YearOfConstruction,
|
||||||
|
AssetLocation = row.AssetLocation,
|
||||||
|
ManufacturerUri = row.ManufacturerUri,
|
||||||
|
DeviceManualUri = row.DeviceManualUri,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
foreach (var error in rejectedRows)
|
||||||
|
{
|
||||||
|
db.EquipmentImportRows.Add(new EquipmentImportRow
|
||||||
|
{
|
||||||
|
Id = Guid.NewGuid(),
|
||||||
|
BatchId = batchId,
|
||||||
|
IsAccepted = false,
|
||||||
|
RejectReason = error.Reason,
|
||||||
|
LineNumberInFile = error.LineNumber,
|
||||||
|
// Required columns need values for EF; reject rows use sentinel placeholders.
|
||||||
|
ZTag = "", MachineCode = "", SAPID = "", EquipmentId = "", EquipmentUuid = "",
|
||||||
|
Name = "", UnsAreaName = "", UnsLineName = "",
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
batch.RowsStaged += acceptedRows.Count + rejectedRows.Count;
|
||||||
|
batch.RowsAccepted += acceptedRows.Count;
|
||||||
|
batch.RowsRejected += rejectedRows.Count;
|
||||||
|
|
||||||
|
await db.SaveChangesAsync(ct).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Drop the batch (pre-finalise rollback). Cascaded row delete removes staged rows.</summary>
|
||||||
|
public async Task DropBatchAsync(Guid batchId, CancellationToken ct)
|
||||||
|
{
|
||||||
|
var batch = await db.EquipmentImportBatches.FirstOrDefaultAsync(b => b.Id == batchId, ct).ConfigureAwait(false);
|
||||||
|
if (batch is null) return;
|
||||||
|
if (batch.FinalisedAtUtc is not null)
|
||||||
|
throw new ImportBatchAlreadyFinalisedException(
|
||||||
|
$"Batch {batchId} already finalised at {batch.FinalisedAtUtc:o}; cannot drop.");
|
||||||
|
|
||||||
|
db.EquipmentImportBatches.Remove(batch);
|
||||||
|
await db.SaveChangesAsync(ct).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Atomic finalise. Inserts every accepted row into the live
|
||||||
|
/// <see cref="Equipment"/> table under the target generation + stamps
|
||||||
|
/// <see cref="EquipmentImportBatch.FinalisedAtUtc"/>. Failure rolls the whole tx
|
||||||
|
/// back — <see cref="Equipment"/> never partially mutates.
|
||||||
|
/// </summary>
|
||||||
|
public async Task FinaliseBatchAsync(
|
||||||
|
Guid batchId, long generationId, string driverInstanceIdForRows, string unsLineIdForRows, CancellationToken ct)
|
||||||
|
{
|
||||||
|
var batch = await db.EquipmentImportBatches
|
||||||
|
.Include(b => b.Rows)
|
||||||
|
.FirstOrDefaultAsync(b => b.Id == batchId, ct)
|
||||||
|
.ConfigureAwait(false)
|
||||||
|
?? throw new ImportBatchNotFoundException($"Batch {batchId} not found.");
|
||||||
|
|
||||||
|
if (batch.FinalisedAtUtc is not null)
|
||||||
|
throw new ImportBatchAlreadyFinalisedException(
|
||||||
|
$"Batch {batchId} already finalised at {batch.FinalisedAtUtc:o}.");
|
||||||
|
|
||||||
|
// EF InMemory provider doesn't honour BeginTransaction; SQL Server provider does.
|
||||||
|
// Tests run the happy path under in-memory; production SQL Server runs the atomic tx.
|
||||||
|
var supportsTx = db.Database.IsRelational();
|
||||||
|
Microsoft.EntityFrameworkCore.Storage.IDbContextTransaction? tx = null;
|
||||||
|
if (supportsTx)
|
||||||
|
tx = await db.Database.BeginTransactionAsync(ct).ConfigureAwait(false);
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
foreach (var row in batch.Rows.Where(r => r.IsAccepted))
|
||||||
|
{
|
||||||
|
db.Equipment.Add(new Equipment
|
||||||
|
{
|
||||||
|
EquipmentRowId = Guid.NewGuid(),
|
||||||
|
GenerationId = generationId,
|
||||||
|
EquipmentId = row.EquipmentId,
|
||||||
|
EquipmentUuid = Guid.TryParse(row.EquipmentUuid, out var u) ? u : Guid.NewGuid(),
|
||||||
|
DriverInstanceId = driverInstanceIdForRows,
|
||||||
|
UnsLineId = unsLineIdForRows,
|
||||||
|
Name = row.Name,
|
||||||
|
MachineCode = row.MachineCode,
|
||||||
|
ZTag = row.ZTag,
|
||||||
|
SAPID = row.SAPID,
|
||||||
|
Manufacturer = row.Manufacturer,
|
||||||
|
Model = row.Model,
|
||||||
|
SerialNumber = row.SerialNumber,
|
||||||
|
HardwareRevision = row.HardwareRevision,
|
||||||
|
SoftwareRevision = row.SoftwareRevision,
|
||||||
|
YearOfConstruction = short.TryParse(row.YearOfConstruction, out var y) ? y : null,
|
||||||
|
AssetLocation = row.AssetLocation,
|
||||||
|
ManufacturerUri = row.ManufacturerUri,
|
||||||
|
DeviceManualUri = row.DeviceManualUri,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
batch.FinalisedAtUtc = DateTime.UtcNow;
|
||||||
|
await db.SaveChangesAsync(ct).ConfigureAwait(false);
|
||||||
|
if (tx is not null) await tx.CommitAsync(ct).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch
|
||||||
|
{
|
||||||
|
if (tx is not null) await tx.RollbackAsync(ct).ConfigureAwait(false);
|
||||||
|
throw;
|
||||||
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
if (tx is not null) await tx.DisposeAsync().ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>List batches created by the given user. Finalised batches are archived; include them on demand.</summary>
|
||||||
|
public async Task<IReadOnlyList<EquipmentImportBatch>> ListByUserAsync(string createdBy, bool includeFinalised, CancellationToken ct)
|
||||||
|
{
|
||||||
|
var query = db.EquipmentImportBatches.AsNoTracking().Where(b => b.CreatedBy == createdBy);
|
||||||
|
if (!includeFinalised)
|
||||||
|
query = query.Where(b => b.FinalisedAtUtc == null);
|
||||||
|
return await query.OrderByDescending(b => b.CreatedAtUtc).ToListAsync(ct).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public sealed class ImportBatchNotFoundException(string message) : Exception(message);
|
||||||
|
public sealed class ImportBatchAlreadyFinalisedException(string message) : Exception(message);
|
||||||
@@ -0,0 +1,68 @@
|
|||||||
|
namespace ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Staged equipment-import batch per Phase 6.4 Stream B.2. Rows land in the child
|
||||||
|
/// <see cref="EquipmentImportRow"/> table under a batch header; operator reviews + either
|
||||||
|
/// drops (via <c>DropImportBatch</c>) or finalises (via <c>FinaliseImportBatch</c>) in one
|
||||||
|
/// bounded transaction. The live <c>Equipment</c> table never sees partial state.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// <para>User-scoped visibility: the preview modal only shows batches where
|
||||||
|
/// <see cref="CreatedBy"/> equals the current operator. Prevents accidental
|
||||||
|
/// cross-operator finalise during concurrent imports. An admin finalise / drop surface
|
||||||
|
/// can override this — tracked alongside the UI follow-up.</para>
|
||||||
|
///
|
||||||
|
/// <para><see cref="FinalisedAtUtc"/> stamps the moment the batch promoted from staging
|
||||||
|
/// into <c>Equipment</c>. Null = still in staging; non-null = archived / finalised.</para>
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class EquipmentImportBatch
|
||||||
|
{
|
||||||
|
public Guid Id { get; set; }
|
||||||
|
public required string ClusterId { get; set; }
|
||||||
|
public required string CreatedBy { get; set; }
|
||||||
|
public DateTime CreatedAtUtc { get; set; }
|
||||||
|
public int RowsStaged { get; set; }
|
||||||
|
public int RowsAccepted { get; set; }
|
||||||
|
public int RowsRejected { get; set; }
|
||||||
|
public DateTime? FinalisedAtUtc { get; set; }
|
||||||
|
|
||||||
|
public ICollection<EquipmentImportRow> Rows { get; set; } = [];
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// One staged row under an <see cref="EquipmentImportBatch"/>. Mirrors the decision #117
|
||||||
|
/// + decision #139 columns from the CSV importer's output + an
|
||||||
|
/// <see cref="IsAccepted"/> flag + a <see cref="RejectReason"/> string the preview modal
|
||||||
|
/// renders.
|
||||||
|
/// </summary>
|
||||||
|
public sealed class EquipmentImportRow
|
||||||
|
{
|
||||||
|
public Guid Id { get; set; }
|
||||||
|
public Guid BatchId { get; set; }
|
||||||
|
public int LineNumberInFile { get; set; }
|
||||||
|
public bool IsAccepted { get; set; }
|
||||||
|
public string? RejectReason { get; set; }
|
||||||
|
|
||||||
|
// Required (decision #117)
|
||||||
|
public required string ZTag { get; set; }
|
||||||
|
public required string MachineCode { get; set; }
|
||||||
|
public required string SAPID { get; set; }
|
||||||
|
public required string EquipmentId { get; set; }
|
||||||
|
public required string EquipmentUuid { get; set; }
|
||||||
|
public required string Name { get; set; }
|
||||||
|
public required string UnsAreaName { get; set; }
|
||||||
|
public required string UnsLineName { get; set; }
|
||||||
|
|
||||||
|
// Optional (decision #139 — OPC 40010 Identification)
|
||||||
|
public string? Manufacturer { get; set; }
|
||||||
|
public string? Model { get; set; }
|
||||||
|
public string? SerialNumber { get; set; }
|
||||||
|
public string? HardwareRevision { get; set; }
|
||||||
|
public string? SoftwareRevision { get; set; }
|
||||||
|
public string? YearOfConstruction { get; set; }
|
||||||
|
public string? AssetLocation { get; set; }
|
||||||
|
public string? ManufacturerUri { get; set; }
|
||||||
|
public string? DeviceManualUri { get; set; }
|
||||||
|
|
||||||
|
public EquipmentImportBatch? Batch { get; set; }
|
||||||
|
}
|
||||||
1505
src/ZB.MOM.WW.OtOpcUa.Configuration/Migrations/20260419185124_AddEquipmentImportBatch.Designer.cs
generated
Normal file
1505
src/ZB.MOM.WW.OtOpcUa.Configuration/Migrations/20260419185124_AddEquipmentImportBatch.Designer.cs
generated
Normal file
File diff suppressed because it is too large
Load Diff
@@ -0,0 +1,91 @@
|
|||||||
|
using System;
|
||||||
|
using Microsoft.EntityFrameworkCore.Migrations;
|
||||||
|
|
||||||
|
#nullable disable
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
||||||
|
{
|
||||||
|
/// <inheritdoc />
|
||||||
|
public partial class AddEquipmentImportBatch : Migration
|
||||||
|
{
|
||||||
|
/// <inheritdoc />
|
||||||
|
protected override void Up(MigrationBuilder migrationBuilder)
|
||||||
|
{
|
||||||
|
migrationBuilder.CreateTable(
|
||||||
|
name: "EquipmentImportBatch",
|
||||||
|
columns: table => new
|
||||||
|
{
|
||||||
|
Id = table.Column<Guid>(type: "uniqueidentifier", nullable: false),
|
||||||
|
ClusterId = table.Column<string>(type: "nvarchar(64)", maxLength: 64, nullable: false),
|
||||||
|
CreatedBy = table.Column<string>(type: "nvarchar(128)", maxLength: 128, nullable: false),
|
||||||
|
CreatedAtUtc = table.Column<DateTime>(type: "datetime2(3)", nullable: false),
|
||||||
|
RowsStaged = table.Column<int>(type: "int", nullable: false),
|
||||||
|
RowsAccepted = table.Column<int>(type: "int", nullable: false),
|
||||||
|
RowsRejected = table.Column<int>(type: "int", nullable: false),
|
||||||
|
FinalisedAtUtc = table.Column<DateTime>(type: "datetime2(3)", nullable: true)
|
||||||
|
},
|
||||||
|
constraints: table =>
|
||||||
|
{
|
||||||
|
table.PrimaryKey("PK_EquipmentImportBatch", x => x.Id);
|
||||||
|
});
|
||||||
|
|
||||||
|
migrationBuilder.CreateTable(
|
||||||
|
name: "EquipmentImportRow",
|
||||||
|
columns: table => new
|
||||||
|
{
|
||||||
|
Id = table.Column<Guid>(type: "uniqueidentifier", nullable: false),
|
||||||
|
BatchId = table.Column<Guid>(type: "uniqueidentifier", nullable: false),
|
||||||
|
LineNumberInFile = table.Column<int>(type: "int", nullable: false),
|
||||||
|
IsAccepted = table.Column<bool>(type: "bit", nullable: false),
|
||||||
|
RejectReason = table.Column<string>(type: "nvarchar(512)", maxLength: 512, nullable: true),
|
||||||
|
ZTag = table.Column<string>(type: "nvarchar(128)", maxLength: 128, nullable: false),
|
||||||
|
MachineCode = table.Column<string>(type: "nvarchar(128)", maxLength: 128, nullable: false),
|
||||||
|
SAPID = table.Column<string>(type: "nvarchar(128)", maxLength: 128, nullable: false),
|
||||||
|
EquipmentId = table.Column<string>(type: "nvarchar(64)", maxLength: 64, nullable: false),
|
||||||
|
EquipmentUuid = table.Column<string>(type: "nvarchar(64)", maxLength: 64, nullable: false),
|
||||||
|
Name = table.Column<string>(type: "nvarchar(128)", maxLength: 128, nullable: false),
|
||||||
|
UnsAreaName = table.Column<string>(type: "nvarchar(64)", maxLength: 64, nullable: false),
|
||||||
|
UnsLineName = table.Column<string>(type: "nvarchar(64)", maxLength: 64, nullable: false),
|
||||||
|
Manufacturer = table.Column<string>(type: "nvarchar(256)", maxLength: 256, nullable: true),
|
||||||
|
Model = table.Column<string>(type: "nvarchar(256)", maxLength: 256, nullable: true),
|
||||||
|
SerialNumber = table.Column<string>(type: "nvarchar(256)", maxLength: 256, nullable: true),
|
||||||
|
HardwareRevision = table.Column<string>(type: "nvarchar(64)", maxLength: 64, nullable: true),
|
||||||
|
SoftwareRevision = table.Column<string>(type: "nvarchar(64)", maxLength: 64, nullable: true),
|
||||||
|
YearOfConstruction = table.Column<string>(type: "nvarchar(8)", maxLength: 8, nullable: true),
|
||||||
|
AssetLocation = table.Column<string>(type: "nvarchar(512)", maxLength: 512, nullable: true),
|
||||||
|
ManufacturerUri = table.Column<string>(type: "nvarchar(512)", maxLength: 512, nullable: true),
|
||||||
|
DeviceManualUri = table.Column<string>(type: "nvarchar(512)", maxLength: 512, nullable: true)
|
||||||
|
},
|
||||||
|
constraints: table =>
|
||||||
|
{
|
||||||
|
table.PrimaryKey("PK_EquipmentImportRow", x => x.Id);
|
||||||
|
table.ForeignKey(
|
||||||
|
name: "FK_EquipmentImportRow_EquipmentImportBatch_BatchId",
|
||||||
|
column: x => x.BatchId,
|
||||||
|
principalTable: "EquipmentImportBatch",
|
||||||
|
principalColumn: "Id",
|
||||||
|
onDelete: ReferentialAction.Cascade);
|
||||||
|
});
|
||||||
|
|
||||||
|
migrationBuilder.CreateIndex(
|
||||||
|
name: "IX_EquipmentImportBatch_Creator_Finalised",
|
||||||
|
table: "EquipmentImportBatch",
|
||||||
|
columns: new[] { "CreatedBy", "FinalisedAtUtc" });
|
||||||
|
|
||||||
|
migrationBuilder.CreateIndex(
|
||||||
|
name: "IX_EquipmentImportRow_Batch",
|
||||||
|
table: "EquipmentImportRow",
|
||||||
|
column: "BatchId");
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
protected override void Down(MigrationBuilder migrationBuilder)
|
||||||
|
{
|
||||||
|
migrationBuilder.DropTable(
|
||||||
|
name: "EquipmentImportRow");
|
||||||
|
|
||||||
|
migrationBuilder.DropTable(
|
||||||
|
name: "EquipmentImportBatch");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -604,6 +604,148 @@ namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
|||||||
b.ToTable("Equipment", (string)null);
|
b.ToTable("Equipment", (string)null);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.EquipmentImportBatch", b =>
|
||||||
|
{
|
||||||
|
b.Property<Guid>("Id")
|
||||||
|
.ValueGeneratedOnAdd()
|
||||||
|
.HasColumnType("uniqueidentifier");
|
||||||
|
|
||||||
|
b.Property<string>("ClusterId")
|
||||||
|
.IsRequired()
|
||||||
|
.HasMaxLength(64)
|
||||||
|
.HasColumnType("nvarchar(64)");
|
||||||
|
|
||||||
|
b.Property<DateTime>("CreatedAtUtc")
|
||||||
|
.HasColumnType("datetime2(3)");
|
||||||
|
|
||||||
|
b.Property<string>("CreatedBy")
|
||||||
|
.IsRequired()
|
||||||
|
.HasMaxLength(128)
|
||||||
|
.HasColumnType("nvarchar(128)");
|
||||||
|
|
||||||
|
b.Property<DateTime?>("FinalisedAtUtc")
|
||||||
|
.HasColumnType("datetime2(3)");
|
||||||
|
|
||||||
|
b.Property<int>("RowsAccepted")
|
||||||
|
.HasColumnType("int");
|
||||||
|
|
||||||
|
b.Property<int>("RowsRejected")
|
||||||
|
.HasColumnType("int");
|
||||||
|
|
||||||
|
b.Property<int>("RowsStaged")
|
||||||
|
.HasColumnType("int");
|
||||||
|
|
||||||
|
b.HasKey("Id");
|
||||||
|
|
||||||
|
b.HasIndex("CreatedBy", "FinalisedAtUtc")
|
||||||
|
.HasDatabaseName("IX_EquipmentImportBatch_Creator_Finalised");
|
||||||
|
|
||||||
|
b.ToTable("EquipmentImportBatch", (string)null);
|
||||||
|
});
|
||||||
|
|
||||||
|
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.EquipmentImportRow", b =>
|
||||||
|
{
|
||||||
|
b.Property<Guid>("Id")
|
||||||
|
.ValueGeneratedOnAdd()
|
||||||
|
.HasColumnType("uniqueidentifier");
|
||||||
|
|
||||||
|
b.Property<string>("AssetLocation")
|
||||||
|
.HasMaxLength(512)
|
||||||
|
.HasColumnType("nvarchar(512)");
|
||||||
|
|
||||||
|
b.Property<Guid>("BatchId")
|
||||||
|
.HasColumnType("uniqueidentifier");
|
||||||
|
|
||||||
|
b.Property<string>("DeviceManualUri")
|
||||||
|
.HasMaxLength(512)
|
||||||
|
.HasColumnType("nvarchar(512)");
|
||||||
|
|
||||||
|
b.Property<string>("EquipmentId")
|
||||||
|
.IsRequired()
|
||||||
|
.HasMaxLength(64)
|
||||||
|
.HasColumnType("nvarchar(64)");
|
||||||
|
|
||||||
|
b.Property<string>("EquipmentUuid")
|
||||||
|
.IsRequired()
|
||||||
|
.HasMaxLength(64)
|
||||||
|
.HasColumnType("nvarchar(64)");
|
||||||
|
|
||||||
|
b.Property<string>("HardwareRevision")
|
||||||
|
.HasMaxLength(64)
|
||||||
|
.HasColumnType("nvarchar(64)");
|
||||||
|
|
||||||
|
b.Property<bool>("IsAccepted")
|
||||||
|
.HasColumnType("bit");
|
||||||
|
|
||||||
|
b.Property<int>("LineNumberInFile")
|
||||||
|
.HasColumnType("int");
|
||||||
|
|
||||||
|
b.Property<string>("MachineCode")
|
||||||
|
.IsRequired()
|
||||||
|
.HasMaxLength(128)
|
||||||
|
.HasColumnType("nvarchar(128)");
|
||||||
|
|
||||||
|
b.Property<string>("Manufacturer")
|
||||||
|
.HasMaxLength(256)
|
||||||
|
.HasColumnType("nvarchar(256)");
|
||||||
|
|
||||||
|
b.Property<string>("ManufacturerUri")
|
||||||
|
.HasMaxLength(512)
|
||||||
|
.HasColumnType("nvarchar(512)");
|
||||||
|
|
||||||
|
b.Property<string>("Model")
|
||||||
|
.HasMaxLength(256)
|
||||||
|
.HasColumnType("nvarchar(256)");
|
||||||
|
|
||||||
|
b.Property<string>("Name")
|
||||||
|
.IsRequired()
|
||||||
|
.HasMaxLength(128)
|
||||||
|
.HasColumnType("nvarchar(128)");
|
||||||
|
|
||||||
|
b.Property<string>("RejectReason")
|
||||||
|
.HasMaxLength(512)
|
||||||
|
.HasColumnType("nvarchar(512)");
|
||||||
|
|
||||||
|
b.Property<string>("SAPID")
|
||||||
|
.IsRequired()
|
||||||
|
.HasMaxLength(128)
|
||||||
|
.HasColumnType("nvarchar(128)");
|
||||||
|
|
||||||
|
b.Property<string>("SerialNumber")
|
||||||
|
.HasMaxLength(256)
|
||||||
|
.HasColumnType("nvarchar(256)");
|
||||||
|
|
||||||
|
b.Property<string>("SoftwareRevision")
|
||||||
|
.HasMaxLength(64)
|
||||||
|
.HasColumnType("nvarchar(64)");
|
||||||
|
|
||||||
|
b.Property<string>("UnsAreaName")
|
||||||
|
.IsRequired()
|
||||||
|
.HasMaxLength(64)
|
||||||
|
.HasColumnType("nvarchar(64)");
|
||||||
|
|
||||||
|
b.Property<string>("UnsLineName")
|
||||||
|
.IsRequired()
|
||||||
|
.HasMaxLength(64)
|
||||||
|
.HasColumnType("nvarchar(64)");
|
||||||
|
|
||||||
|
b.Property<string>("YearOfConstruction")
|
||||||
|
.HasMaxLength(8)
|
||||||
|
.HasColumnType("nvarchar(8)");
|
||||||
|
|
||||||
|
b.Property<string>("ZTag")
|
||||||
|
.IsRequired()
|
||||||
|
.HasMaxLength(128)
|
||||||
|
.HasColumnType("nvarchar(128)");
|
||||||
|
|
||||||
|
b.HasKey("Id");
|
||||||
|
|
||||||
|
b.HasIndex("BatchId")
|
||||||
|
.HasDatabaseName("IX_EquipmentImportRow_Batch");
|
||||||
|
|
||||||
|
b.ToTable("EquipmentImportRow", (string)null);
|
||||||
|
});
|
||||||
|
|
||||||
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.ExternalIdReservation", b =>
|
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.ExternalIdReservation", b =>
|
||||||
{
|
{
|
||||||
b.Property<Guid>("ReservationId")
|
b.Property<Guid>("ReservationId")
|
||||||
@@ -1231,6 +1373,17 @@ namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
|||||||
b.Navigation("Generation");
|
b.Navigation("Generation");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.EquipmentImportRow", b =>
|
||||||
|
{
|
||||||
|
b.HasOne("ZB.MOM.WW.OtOpcUa.Configuration.Entities.EquipmentImportBatch", "Batch")
|
||||||
|
.WithMany("Rows")
|
||||||
|
.HasForeignKey("BatchId")
|
||||||
|
.OnDelete(DeleteBehavior.Cascade)
|
||||||
|
.IsRequired();
|
||||||
|
|
||||||
|
b.Navigation("Batch");
|
||||||
|
});
|
||||||
|
|
||||||
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.LdapGroupRoleMapping", b =>
|
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.LdapGroupRoleMapping", b =>
|
||||||
{
|
{
|
||||||
b.HasOne("ZB.MOM.WW.OtOpcUa.Configuration.Entities.ServerCluster", "Cluster")
|
b.HasOne("ZB.MOM.WW.OtOpcUa.Configuration.Entities.ServerCluster", "Cluster")
|
||||||
@@ -1330,6 +1483,11 @@ namespace ZB.MOM.WW.OtOpcUa.Configuration.Migrations
|
|||||||
b.Navigation("GenerationState");
|
b.Navigation("GenerationState");
|
||||||
});
|
});
|
||||||
|
|
||||||
|
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.EquipmentImportBatch", b =>
|
||||||
|
{
|
||||||
|
b.Navigation("Rows");
|
||||||
|
});
|
||||||
|
|
||||||
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.ServerCluster", b =>
|
modelBuilder.Entity("ZB.MOM.WW.OtOpcUa.Configuration.Entities.ServerCluster", b =>
|
||||||
{
|
{
|
||||||
b.Navigation("Generations");
|
b.Navigation("Generations");
|
||||||
|
|||||||
@@ -30,6 +30,8 @@ public sealed class OtOpcUaConfigDbContext(DbContextOptions<OtOpcUaConfigDbConte
|
|||||||
public DbSet<DriverHostStatus> DriverHostStatuses => Set<DriverHostStatus>();
|
public DbSet<DriverHostStatus> DriverHostStatuses => Set<DriverHostStatus>();
|
||||||
public DbSet<DriverInstanceResilienceStatus> DriverInstanceResilienceStatuses => Set<DriverInstanceResilienceStatus>();
|
public DbSet<DriverInstanceResilienceStatus> DriverInstanceResilienceStatuses => Set<DriverInstanceResilienceStatus>();
|
||||||
public DbSet<LdapGroupRoleMapping> LdapGroupRoleMappings => Set<LdapGroupRoleMapping>();
|
public DbSet<LdapGroupRoleMapping> LdapGroupRoleMappings => Set<LdapGroupRoleMapping>();
|
||||||
|
public DbSet<EquipmentImportBatch> EquipmentImportBatches => Set<EquipmentImportBatch>();
|
||||||
|
public DbSet<EquipmentImportRow> EquipmentImportRows => Set<EquipmentImportRow>();
|
||||||
|
|
||||||
protected override void OnModelCreating(ModelBuilder modelBuilder)
|
protected override void OnModelCreating(ModelBuilder modelBuilder)
|
||||||
{
|
{
|
||||||
@@ -53,6 +55,7 @@ public sealed class OtOpcUaConfigDbContext(DbContextOptions<OtOpcUaConfigDbConte
|
|||||||
ConfigureDriverHostStatus(modelBuilder);
|
ConfigureDriverHostStatus(modelBuilder);
|
||||||
ConfigureDriverInstanceResilienceStatus(modelBuilder);
|
ConfigureDriverInstanceResilienceStatus(modelBuilder);
|
||||||
ConfigureLdapGroupRoleMapping(modelBuilder);
|
ConfigureLdapGroupRoleMapping(modelBuilder);
|
||||||
|
ConfigureEquipmentImportBatch(modelBuilder);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static void ConfigureServerCluster(ModelBuilder modelBuilder)
|
private static void ConfigureServerCluster(ModelBuilder modelBuilder)
|
||||||
@@ -568,4 +571,52 @@ public sealed class OtOpcUaConfigDbContext(DbContextOptions<OtOpcUaConfigDbConte
|
|||||||
e.HasIndex(x => x.LdapGroup).HasDatabaseName("IX_LdapGroupRoleMapping_Group");
|
e.HasIndex(x => x.LdapGroup).HasDatabaseName("IX_LdapGroupRoleMapping_Group");
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void ConfigureEquipmentImportBatch(ModelBuilder modelBuilder)
|
||||||
|
{
|
||||||
|
modelBuilder.Entity<EquipmentImportBatch>(e =>
|
||||||
|
{
|
||||||
|
e.ToTable("EquipmentImportBatch");
|
||||||
|
e.HasKey(x => x.Id);
|
||||||
|
e.Property(x => x.ClusterId).HasMaxLength(64);
|
||||||
|
e.Property(x => x.CreatedBy).HasMaxLength(128);
|
||||||
|
e.Property(x => x.CreatedAtUtc).HasColumnType("datetime2(3)");
|
||||||
|
e.Property(x => x.FinalisedAtUtc).HasColumnType("datetime2(3)");
|
||||||
|
|
||||||
|
// Admin preview modal filters by user; finalise / drop both hit this index.
|
||||||
|
e.HasIndex(x => new { x.CreatedBy, x.FinalisedAtUtc })
|
||||||
|
.HasDatabaseName("IX_EquipmentImportBatch_Creator_Finalised");
|
||||||
|
});
|
||||||
|
|
||||||
|
modelBuilder.Entity<EquipmentImportRow>(e =>
|
||||||
|
{
|
||||||
|
e.ToTable("EquipmentImportRow");
|
||||||
|
e.HasKey(x => x.Id);
|
||||||
|
e.Property(x => x.ZTag).HasMaxLength(128);
|
||||||
|
e.Property(x => x.MachineCode).HasMaxLength(128);
|
||||||
|
e.Property(x => x.SAPID).HasMaxLength(128);
|
||||||
|
e.Property(x => x.EquipmentId).HasMaxLength(64);
|
||||||
|
e.Property(x => x.EquipmentUuid).HasMaxLength(64);
|
||||||
|
e.Property(x => x.Name).HasMaxLength(128);
|
||||||
|
e.Property(x => x.UnsAreaName).HasMaxLength(64);
|
||||||
|
e.Property(x => x.UnsLineName).HasMaxLength(64);
|
||||||
|
e.Property(x => x.Manufacturer).HasMaxLength(256);
|
||||||
|
e.Property(x => x.Model).HasMaxLength(256);
|
||||||
|
e.Property(x => x.SerialNumber).HasMaxLength(256);
|
||||||
|
e.Property(x => x.HardwareRevision).HasMaxLength(64);
|
||||||
|
e.Property(x => x.SoftwareRevision).HasMaxLength(64);
|
||||||
|
e.Property(x => x.YearOfConstruction).HasMaxLength(8);
|
||||||
|
e.Property(x => x.AssetLocation).HasMaxLength(512);
|
||||||
|
e.Property(x => x.ManufacturerUri).HasMaxLength(512);
|
||||||
|
e.Property(x => x.DeviceManualUri).HasMaxLength(512);
|
||||||
|
e.Property(x => x.RejectReason).HasMaxLength(512);
|
||||||
|
|
||||||
|
e.HasOne(x => x.Batch)
|
||||||
|
.WithMany(b => b.Rows)
|
||||||
|
.HasForeignKey(x => x.BatchId)
|
||||||
|
.OnDelete(DeleteBehavior.Cascade);
|
||||||
|
|
||||||
|
e.HasIndex(x => x.BatchId).HasDatabaseName("IX_EquipmentImportRow_Batch");
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
146
src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs
Normal file
146
src/ZB.MOM.WW.OtOpcUa.Core.Abstractions/PollGroupEngine.cs
Normal file
@@ -0,0 +1,146 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Shared poll-based subscription engine for drivers whose underlying protocol has no
|
||||||
|
/// native push model (Modbus, AB CIP, S7, FOCAS). Owns one background Task per subscription
|
||||||
|
/// that periodically invokes the supplied reader, diffs each snapshot against the last
|
||||||
|
/// known value, and dispatches a change callback per changed tag. Extracted from
|
||||||
|
/// <c>ModbusDriver</c> (AB CIP PR 1) so poll-based drivers don't each re-ship the loop,
|
||||||
|
/// floor logic, and lifecycle plumbing.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// <para>The engine is read-path agnostic: it calls the supplied <c>reader</c> delegate
|
||||||
|
/// and trusts the driver to map protocol errors into <see cref="DataValueSnapshot.StatusCode"/>.
|
||||||
|
/// Callbacks fire on: (a) the first poll after subscribe (initial-data push per the OPC UA
|
||||||
|
/// Part 4 convention), (b) any subsequent poll where the boxed value or status code differs
|
||||||
|
/// from the previously-seen snapshot.</para>
|
||||||
|
///
|
||||||
|
/// <para>Exceptions thrown by the reader on the initial poll or any subsequent poll are
|
||||||
|
/// swallowed — the loop continues on the next tick. The driver's own health surface is
|
||||||
|
/// where transient poll failures should be reported; the engine intentionally does not
|
||||||
|
/// double-book that responsibility.</para>
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class PollGroupEngine : IAsyncDisposable
|
||||||
|
{
|
||||||
|
private readonly Func<IReadOnlyList<string>, CancellationToken, Task<IReadOnlyList<DataValueSnapshot>>> _reader;
|
||||||
|
private readonly Action<ISubscriptionHandle, string, DataValueSnapshot> _onChange;
|
||||||
|
private readonly TimeSpan _minInterval;
|
||||||
|
private readonly ConcurrentDictionary<long, SubscriptionState> _subscriptions = new();
|
||||||
|
private long _nextId;
|
||||||
|
|
||||||
|
/// <summary>Default floor for publishing intervals — matches the Modbus 100 ms cap.</summary>
|
||||||
|
public static readonly TimeSpan DefaultMinInterval = TimeSpan.FromMilliseconds(100);
|
||||||
|
|
||||||
|
/// <param name="reader">Driver-supplied batch reader; snapshots MUST be returned in the same
|
||||||
|
/// order as the input references.</param>
|
||||||
|
/// <param name="onChange">Callback invoked per changed tag — the driver forwards to its own
|
||||||
|
/// <see cref="ISubscribable.OnDataChange"/> event.</param>
|
||||||
|
/// <param name="minInterval">Interval floor; anything below is clamped. Defaults to 100 ms
|
||||||
|
/// per <see cref="DefaultMinInterval"/>.</param>
|
||||||
|
public PollGroupEngine(
|
||||||
|
Func<IReadOnlyList<string>, CancellationToken, Task<IReadOnlyList<DataValueSnapshot>>> reader,
|
||||||
|
Action<ISubscriptionHandle, string, DataValueSnapshot> onChange,
|
||||||
|
TimeSpan? minInterval = null)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(reader);
|
||||||
|
ArgumentNullException.ThrowIfNull(onChange);
|
||||||
|
_reader = reader;
|
||||||
|
_onChange = onChange;
|
||||||
|
_minInterval = minInterval ?? DefaultMinInterval;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Register a new polled subscription and start its background loop.</summary>
|
||||||
|
public ISubscriptionHandle Subscribe(IReadOnlyList<string> fullReferences, TimeSpan publishingInterval)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(fullReferences);
|
||||||
|
var id = Interlocked.Increment(ref _nextId);
|
||||||
|
var cts = new CancellationTokenSource();
|
||||||
|
var interval = publishingInterval < _minInterval ? _minInterval : publishingInterval;
|
||||||
|
var handle = new PollSubscriptionHandle(id);
|
||||||
|
var state = new SubscriptionState(handle, [.. fullReferences], interval, cts);
|
||||||
|
_subscriptions[id] = state;
|
||||||
|
_ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token);
|
||||||
|
return handle;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Cancel the background loop for a handle returned by <see cref="Subscribe"/>.</summary>
|
||||||
|
/// <returns><c>true</c> when the handle was known to the engine and has been torn down.</returns>
|
||||||
|
public bool Unsubscribe(ISubscriptionHandle handle)
|
||||||
|
{
|
||||||
|
if (handle is PollSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
|
||||||
|
{
|
||||||
|
try { state.Cts.Cancel(); } catch { }
|
||||||
|
state.Cts.Dispose();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Snapshot of active subscription count — exposed for driver diagnostics.</summary>
|
||||||
|
public int ActiveSubscriptionCount => _subscriptions.Count;
|
||||||
|
|
||||||
|
private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct)
|
||||||
|
{
|
||||||
|
// Initial-data push: every subscribed tag fires once at subscribe time regardless of
|
||||||
|
// whether it has changed, satisfying OPC UA Part 4 initial-value semantics.
|
||||||
|
try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); }
|
||||||
|
catch (OperationCanceledException) { return; }
|
||||||
|
catch { /* first-read error tolerated — loop continues */ }
|
||||||
|
|
||||||
|
while (!ct.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); }
|
||||||
|
catch (OperationCanceledException) { return; }
|
||||||
|
|
||||||
|
try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); }
|
||||||
|
catch (OperationCanceledException) { return; }
|
||||||
|
catch { /* transient poll error — loop continues, driver health surface logs it */ }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct)
|
||||||
|
{
|
||||||
|
var snapshots = await _reader(state.TagReferences, ct).ConfigureAwait(false);
|
||||||
|
for (var i = 0; i < state.TagReferences.Count; i++)
|
||||||
|
{
|
||||||
|
var tagRef = state.TagReferences[i];
|
||||||
|
var current = snapshots[i];
|
||||||
|
var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
|
||||||
|
|
||||||
|
if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
|
||||||
|
{
|
||||||
|
state.LastValues[tagRef] = current;
|
||||||
|
_onChange(state.Handle, tagRef, current);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Cancel every active subscription. Idempotent.</summary>
|
||||||
|
public ValueTask DisposeAsync()
|
||||||
|
{
|
||||||
|
foreach (var state in _subscriptions.Values)
|
||||||
|
{
|
||||||
|
try { state.Cts.Cancel(); } catch { }
|
||||||
|
state.Cts.Dispose();
|
||||||
|
}
|
||||||
|
_subscriptions.Clear();
|
||||||
|
return ValueTask.CompletedTask;
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed record SubscriptionState(
|
||||||
|
PollSubscriptionHandle Handle,
|
||||||
|
IReadOnlyList<string> TagReferences,
|
||||||
|
TimeSpan Interval,
|
||||||
|
CancellationTokenSource Cts)
|
||||||
|
{
|
||||||
|
public ConcurrentDictionary<string, DataValueSnapshot> LastValues { get; }
|
||||||
|
= new(StringComparer.OrdinalIgnoreCase);
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed record PollSubscriptionHandle(long Id) : ISubscriptionHandle
|
||||||
|
{
|
||||||
|
public string DiagnosticId => $"poll-sub-{Id}";
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -22,6 +22,7 @@ public sealed class CapabilityInvoker
|
|||||||
private readonly string _driverInstanceId;
|
private readonly string _driverInstanceId;
|
||||||
private readonly string _driverType;
|
private readonly string _driverType;
|
||||||
private readonly Func<DriverResilienceOptions> _optionsAccessor;
|
private readonly Func<DriverResilienceOptions> _optionsAccessor;
|
||||||
|
private readonly DriverResilienceStatusTracker? _statusTracker;
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Construct an invoker for one driver instance.
|
/// Construct an invoker for one driver instance.
|
||||||
@@ -33,11 +34,13 @@ public sealed class CapabilityInvoker
|
|||||||
/// pipeline-invalidate can take effect without restarting the invoker.
|
/// pipeline-invalidate can take effect without restarting the invoker.
|
||||||
/// </param>
|
/// </param>
|
||||||
/// <param name="driverType">Driver type name for structured-log enrichment (e.g. <c>"Modbus"</c>).</param>
|
/// <param name="driverType">Driver type name for structured-log enrichment (e.g. <c>"Modbus"</c>).</param>
|
||||||
|
/// <param name="statusTracker">Optional resilience-status tracker. When wired, every capability call records start/complete so Admin <c>/hosts</c> can surface <see cref="ResilienceStatusSnapshot.CurrentInFlight"/> as the bulkhead-depth proxy.</param>
|
||||||
public CapabilityInvoker(
|
public CapabilityInvoker(
|
||||||
DriverResiliencePipelineBuilder builder,
|
DriverResiliencePipelineBuilder builder,
|
||||||
string driverInstanceId,
|
string driverInstanceId,
|
||||||
Func<DriverResilienceOptions> optionsAccessor,
|
Func<DriverResilienceOptions> optionsAccessor,
|
||||||
string driverType = "Unknown")
|
string driverType = "Unknown",
|
||||||
|
DriverResilienceStatusTracker? statusTracker = null)
|
||||||
{
|
{
|
||||||
ArgumentNullException.ThrowIfNull(builder);
|
ArgumentNullException.ThrowIfNull(builder);
|
||||||
ArgumentNullException.ThrowIfNull(optionsAccessor);
|
ArgumentNullException.ThrowIfNull(optionsAccessor);
|
||||||
@@ -46,6 +49,7 @@ public sealed class CapabilityInvoker
|
|||||||
_driverInstanceId = driverInstanceId;
|
_driverInstanceId = driverInstanceId;
|
||||||
_driverType = driverType;
|
_driverType = driverType;
|
||||||
_optionsAccessor = optionsAccessor;
|
_optionsAccessor = optionsAccessor;
|
||||||
|
_statusTracker = statusTracker;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// <summary>Execute a capability call returning a value, honoring the per-capability pipeline.</summary>
|
/// <summary>Execute a capability call returning a value, honoring the per-capability pipeline.</summary>
|
||||||
@@ -59,11 +63,19 @@ public sealed class CapabilityInvoker
|
|||||||
ArgumentNullException.ThrowIfNull(callSite);
|
ArgumentNullException.ThrowIfNull(callSite);
|
||||||
|
|
||||||
var pipeline = ResolvePipeline(capability, hostName);
|
var pipeline = ResolvePipeline(capability, hostName);
|
||||||
|
_statusTracker?.RecordCallStart(_driverInstanceId, hostName);
|
||||||
|
try
|
||||||
|
{
|
||||||
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
|
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
|
||||||
{
|
{
|
||||||
return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
return await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_statusTracker?.RecordCallComplete(_driverInstanceId, hostName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>Execute a void-returning capability call, honoring the per-capability pipeline.</summary>
|
/// <summary>Execute a void-returning capability call, honoring the per-capability pipeline.</summary>
|
||||||
public async ValueTask ExecuteAsync(
|
public async ValueTask ExecuteAsync(
|
||||||
@@ -75,11 +87,19 @@ public sealed class CapabilityInvoker
|
|||||||
ArgumentNullException.ThrowIfNull(callSite);
|
ArgumentNullException.ThrowIfNull(callSite);
|
||||||
|
|
||||||
var pipeline = ResolvePipeline(capability, hostName);
|
var pipeline = ResolvePipeline(capability, hostName);
|
||||||
|
_statusTracker?.RecordCallStart(_driverInstanceId, hostName);
|
||||||
|
try
|
||||||
|
{
|
||||||
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
|
using (LogContextEnricher.Push(_driverInstanceId, _driverType, capability, LogContextEnricher.NewCorrelationId()))
|
||||||
{
|
{
|
||||||
await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
await pipeline.ExecuteAsync(callSite, cancellationToken).ConfigureAwait(false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
finally
|
||||||
|
{
|
||||||
|
_statusTracker?.RecordCallComplete(_driverInstanceId, hostName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>
|
/// <summary>
|
||||||
/// Execute a <see cref="DriverCapability.Write"/> call honoring <see cref="WriteIdempotentAttribute"/>
|
/// Execute a <see cref="DriverCapability.Write"/> call honoring <see cref="WriteIdempotentAttribute"/>
|
||||||
|
|||||||
@@ -81,6 +81,29 @@ public sealed class DriverResilienceStatusTracker
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Record the entry of a capability call for this (instance, host). Increments the
|
||||||
|
/// in-flight counter used as the <see cref="ResilienceStatusSnapshot.CurrentInFlight"/>
|
||||||
|
/// surface (a cheap stand-in for Polly bulkhead depth). Paired with
|
||||||
|
/// <see cref="RecordCallComplete"/>; callers use try/finally.
|
||||||
|
/// </summary>
|
||||||
|
public void RecordCallStart(string driverInstanceId, string hostName)
|
||||||
|
{
|
||||||
|
var key = new StatusKey(driverInstanceId, hostName);
|
||||||
|
_status.AddOrUpdate(key,
|
||||||
|
_ => new ResilienceStatusSnapshot { CurrentInFlight = 1 },
|
||||||
|
(_, existing) => existing with { CurrentInFlight = existing.CurrentInFlight + 1 });
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>Paired with <see cref="RecordCallStart"/> — decrements the in-flight counter.</summary>
|
||||||
|
public void RecordCallComplete(string driverInstanceId, string hostName)
|
||||||
|
{
|
||||||
|
var key = new StatusKey(driverInstanceId, hostName);
|
||||||
|
_status.AddOrUpdate(key,
|
||||||
|
_ => new ResilienceStatusSnapshot { CurrentInFlight = 0 }, // start-without-complete shouldn't happen; clamp to 0
|
||||||
|
(_, existing) => existing with { CurrentInFlight = Math.Max(0, existing.CurrentInFlight - 1) });
|
||||||
|
}
|
||||||
|
|
||||||
/// <summary>Snapshot of a specific (instance, host) pair; null if no counters recorded yet.</summary>
|
/// <summary>Snapshot of a specific (instance, host) pair; null if no counters recorded yet.</summary>
|
||||||
public ResilienceStatusSnapshot? TryGet(string driverInstanceId, string hostName) =>
|
public ResilienceStatusSnapshot? TryGet(string driverInstanceId, string hostName) =>
|
||||||
_status.TryGetValue(new StatusKey(driverInstanceId, hostName), out var snapshot) ? snapshot : null;
|
_status.TryGetValue(new StatusKey(driverInstanceId, hostName), out var snapshot) ? snapshot : null;
|
||||||
@@ -101,4 +124,12 @@ public sealed record ResilienceStatusSnapshot
|
|||||||
public long BaselineFootprintBytes { get; init; }
|
public long BaselineFootprintBytes { get; init; }
|
||||||
public long CurrentFootprintBytes { get; init; }
|
public long CurrentFootprintBytes { get; init; }
|
||||||
public DateTime LastSampledUtc { get; init; }
|
public DateTime LastSampledUtc { get; init; }
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// In-flight capability calls against this (instance, host). Bumped on call entry +
|
||||||
|
/// decremented on completion. Feeds <c>DriverInstanceResilienceStatus.CurrentBulkheadDepth</c>
|
||||||
|
/// for Admin <c>/hosts</c> — a cheap proxy for the Polly bulkhead depth until the full
|
||||||
|
/// telemetry observer lands.
|
||||||
|
/// </summary>
|
||||||
|
public int CurrentInFlight { get; init; }
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -11,19 +11,17 @@ namespace ZB.MOM.WW.OtOpcUa.Driver.Modbus;
|
|||||||
/// <c>IReadable</c>/<c>IWritable</c> abstractions generalize beyond Galaxy.
|
/// <c>IReadable</c>/<c>IWritable</c> abstractions generalize beyond Galaxy.
|
||||||
/// </summary>
|
/// </summary>
|
||||||
/// <remarks>
|
/// <remarks>
|
||||||
/// Scope limits: synchronous Read/Write only, no subscriptions (Modbus has no push model;
|
/// Scope limits: Historian + alarm capabilities are out of scope (the protocol doesn't
|
||||||
/// subscriptions would need a polling loop over the declared tags — additive PR). Historian
|
/// express them). Subscriptions overlay a polling loop via the shared
|
||||||
/// + alarm capabilities are out of scope (the protocol doesn't express them).
|
/// <see cref="PollGroupEngine"/> since Modbus has no native push model.
|
||||||
/// </remarks>
|
/// </remarks>
|
||||||
public sealed class ModbusDriver(ModbusDriverOptions options, string driverInstanceId,
|
public sealed class ModbusDriver
|
||||||
Func<ModbusDriverOptions, IModbusTransport>? transportFactory = null)
|
|
||||||
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable
|
: IDriver, ITagDiscovery, IReadable, IWritable, ISubscribable, IHostConnectivityProbe, IDisposable, IAsyncDisposable
|
||||||
{
|
{
|
||||||
// Active polling subscriptions. Each subscription owns a background Task that polls the
|
// Polled subscriptions delegate to the shared PollGroupEngine. The driver only supplies
|
||||||
// tags at its configured interval, diffs against _lastKnownValues, and fires OnDataChange
|
// the reader + on-change bridge; the engine owns the loop, interval floor, and lifecycle.
|
||||||
// per changed tag. UnsubscribeAsync cancels the task via the CTS stored on the handle.
|
private readonly PollGroupEngine _poll;
|
||||||
private readonly System.Collections.Concurrent.ConcurrentDictionary<long, SubscriptionState> _subscriptions = new();
|
private readonly string _driverInstanceId;
|
||||||
private long _nextSubscriptionId;
|
|
||||||
|
|
||||||
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
public event EventHandler<DataChangeEventArgs>? OnDataChange;
|
||||||
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
|
public event EventHandler<HostStatusChangedEventArgs>? OnHostStatusChanged;
|
||||||
@@ -35,15 +33,28 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
|||||||
private HostState _hostState = HostState.Unknown;
|
private HostState _hostState = HostState.Unknown;
|
||||||
private DateTime _hostStateChangedUtc = DateTime.UtcNow;
|
private DateTime _hostStateChangedUtc = DateTime.UtcNow;
|
||||||
private CancellationTokenSource? _probeCts;
|
private CancellationTokenSource? _probeCts;
|
||||||
private readonly ModbusDriverOptions _options = options;
|
private readonly ModbusDriverOptions _options;
|
||||||
private readonly Func<ModbusDriverOptions, IModbusTransport> _transportFactory =
|
private readonly Func<ModbusDriverOptions, IModbusTransport> _transportFactory;
|
||||||
transportFactory ?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout, o.AutoReconnect));
|
|
||||||
|
|
||||||
private IModbusTransport? _transport;
|
private IModbusTransport? _transport;
|
||||||
private DriverHealth _health = new(DriverState.Unknown, null, null);
|
private DriverHealth _health = new(DriverState.Unknown, null, null);
|
||||||
private readonly Dictionary<string, ModbusTagDefinition> _tagsByName = new(StringComparer.OrdinalIgnoreCase);
|
private readonly Dictionary<string, ModbusTagDefinition> _tagsByName = new(StringComparer.OrdinalIgnoreCase);
|
||||||
|
|
||||||
public string DriverInstanceId => driverInstanceId;
|
public ModbusDriver(ModbusDriverOptions options, string driverInstanceId,
|
||||||
|
Func<ModbusDriverOptions, IModbusTransport>? transportFactory = null)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(options);
|
||||||
|
_options = options;
|
||||||
|
_driverInstanceId = driverInstanceId;
|
||||||
|
_transportFactory = transportFactory
|
||||||
|
?? (o => new ModbusTcpTransport(o.Host, o.Port, o.Timeout, o.AutoReconnect));
|
||||||
|
_poll = new PollGroupEngine(
|
||||||
|
reader: ReadAsync,
|
||||||
|
onChange: (handle, tagRef, snapshot) =>
|
||||||
|
OnDataChange?.Invoke(this, new DataChangeEventArgs(handle, tagRef, snapshot)));
|
||||||
|
}
|
||||||
|
|
||||||
|
public string DriverInstanceId => _driverInstanceId;
|
||||||
public string DriverType => "Modbus";
|
public string DriverType => "Modbus";
|
||||||
|
|
||||||
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
|
public async Task InitializeAsync(string driverConfigJson, CancellationToken cancellationToken)
|
||||||
@@ -84,12 +95,7 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
|||||||
_probeCts?.Dispose();
|
_probeCts?.Dispose();
|
||||||
_probeCts = null;
|
_probeCts = null;
|
||||||
|
|
||||||
foreach (var state in _subscriptions.Values)
|
await _poll.DisposeAsync().ConfigureAwait(false);
|
||||||
{
|
|
||||||
try { state.Cts.Cancel(); } catch { }
|
|
||||||
state.Cts.Dispose();
|
|
||||||
}
|
|
||||||
_subscriptions.Clear();
|
|
||||||
|
|
||||||
if (_transport is not null) await _transport.DisposeAsync().ConfigureAwait(false);
|
if (_transport is not null) await _transport.DisposeAsync().ConfigureAwait(false);
|
||||||
_transport = null;
|
_transport = null;
|
||||||
@@ -303,85 +309,18 @@ public sealed class ModbusDriver(ModbusDriverOptions options, string driverInsta
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- ISubscribable (polling overlay) ----
|
// ---- ISubscribable (polling overlay via shared engine) ----
|
||||||
|
|
||||||
public Task<ISubscriptionHandle> SubscribeAsync(
|
public Task<ISubscriptionHandle> SubscribeAsync(
|
||||||
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken)
|
IReadOnlyList<string> fullReferences, TimeSpan publishingInterval, CancellationToken cancellationToken) =>
|
||||||
{
|
Task.FromResult(_poll.Subscribe(fullReferences, publishingInterval));
|
||||||
var id = Interlocked.Increment(ref _nextSubscriptionId);
|
|
||||||
var cts = new CancellationTokenSource();
|
|
||||||
var interval = publishingInterval < TimeSpan.FromMilliseconds(100)
|
|
||||||
? TimeSpan.FromMilliseconds(100) // floor — Modbus can't sustain < 100ms polling reliably
|
|
||||||
: publishingInterval;
|
|
||||||
var handle = new ModbusSubscriptionHandle(id);
|
|
||||||
var state = new SubscriptionState(handle, [.. fullReferences], interval, cts);
|
|
||||||
_subscriptions[id] = state;
|
|
||||||
_ = Task.Run(() => PollLoopAsync(state, cts.Token), cts.Token);
|
|
||||||
return Task.FromResult<ISubscriptionHandle>(handle);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
public Task UnsubscribeAsync(ISubscriptionHandle handle, CancellationToken cancellationToken)
|
||||||
{
|
{
|
||||||
if (handle is ModbusSubscriptionHandle h && _subscriptions.TryRemove(h.Id, out var state))
|
_poll.Unsubscribe(handle);
|
||||||
{
|
|
||||||
state.Cts.Cancel();
|
|
||||||
state.Cts.Dispose();
|
|
||||||
}
|
|
||||||
return Task.CompletedTask;
|
return Task.CompletedTask;
|
||||||
}
|
}
|
||||||
|
|
||||||
private async Task PollLoopAsync(SubscriptionState state, CancellationToken ct)
|
|
||||||
{
|
|
||||||
// Initial-data push: read every tag once at subscribe time so OPC UA clients see the
|
|
||||||
// current value per Part 4 convention, even if the value never changes thereafter.
|
|
||||||
try { await PollOnceAsync(state, forceRaise: true, ct).ConfigureAwait(false); }
|
|
||||||
catch (OperationCanceledException) { return; }
|
|
||||||
catch { /* first-read error — polling continues */ }
|
|
||||||
|
|
||||||
while (!ct.IsCancellationRequested)
|
|
||||||
{
|
|
||||||
try { await Task.Delay(state.Interval, ct).ConfigureAwait(false); }
|
|
||||||
catch (OperationCanceledException) { return; }
|
|
||||||
|
|
||||||
try { await PollOnceAsync(state, forceRaise: false, ct).ConfigureAwait(false); }
|
|
||||||
catch (OperationCanceledException) { return; }
|
|
||||||
catch { /* transient polling error — loop continues, health surface reflects it */ }
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private async Task PollOnceAsync(SubscriptionState state, bool forceRaise, CancellationToken ct)
|
|
||||||
{
|
|
||||||
var snapshots = await ReadAsync(state.TagReferences, ct).ConfigureAwait(false);
|
|
||||||
for (var i = 0; i < state.TagReferences.Count; i++)
|
|
||||||
{
|
|
||||||
var tagRef = state.TagReferences[i];
|
|
||||||
var current = snapshots[i];
|
|
||||||
var lastSeen = state.LastValues.TryGetValue(tagRef, out var prev) ? prev : default;
|
|
||||||
|
|
||||||
// Raise on first read (forceRaise) OR when the boxed value differs from last-known.
|
|
||||||
if (forceRaise || !Equals(lastSeen?.Value, current.Value) || lastSeen?.StatusCode != current.StatusCode)
|
|
||||||
{
|
|
||||||
state.LastValues[tagRef] = current;
|
|
||||||
OnDataChange?.Invoke(this, new DataChangeEventArgs(state.Handle, tagRef, current));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private sealed record SubscriptionState(
|
|
||||||
ModbusSubscriptionHandle Handle,
|
|
||||||
IReadOnlyList<string> TagReferences,
|
|
||||||
TimeSpan Interval,
|
|
||||||
CancellationTokenSource Cts)
|
|
||||||
{
|
|
||||||
public System.Collections.Concurrent.ConcurrentDictionary<string, DataValueSnapshot> LastValues { get; }
|
|
||||||
= new(StringComparer.OrdinalIgnoreCase);
|
|
||||||
}
|
|
||||||
|
|
||||||
private sealed record ModbusSubscriptionHandle(long Id) : ISubscriptionHandle
|
|
||||||
{
|
|
||||||
public string DiagnosticId => $"modbus-sub-{Id}";
|
|
||||||
}
|
|
||||||
|
|
||||||
// ---- IHostConnectivityProbe ----
|
// ---- IHostConnectivityProbe ----
|
||||||
|
|
||||||
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses()
|
public IReadOnlyList<HostConnectivityStatus> GetHostStatuses()
|
||||||
|
|||||||
@@ -0,0 +1,139 @@
|
|||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.Extensions.Hosting;
|
||||||
|
using Microsoft.Extensions.Logging;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Server.Hosting;
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Samples <see cref="DriverResilienceStatusTracker"/> at a fixed tick + upserts each
|
||||||
|
/// <c>(DriverInstanceId, HostName)</c> snapshot into <see cref="DriverInstanceResilienceStatus"/>
|
||||||
|
/// so Admin <c>/hosts</c> can render live resilience counters across restarts.
|
||||||
|
/// </summary>
|
||||||
|
/// <remarks>
|
||||||
|
/// <para>Closes the HostedService piece of Phase 6.1 Stream E.2 flagged as a follow-up
|
||||||
|
/// when the tracker shipped in PR #82. The Admin UI column-refresh piece (red badge when
|
||||||
|
/// ConsecutiveFailures > breakerThreshold / 2 + SignalR push) is still deferred to
|
||||||
|
/// the visual-compliance pass — this service owns the persistence half alone.</para>
|
||||||
|
///
|
||||||
|
/// <para>Tick interval defaults to 5 s. Persistence is best-effort: a DB outage during
|
||||||
|
/// a tick logs + continues; the next tick tries again with the latest snapshots. The
|
||||||
|
/// hosted service never crashes the app on sample failure.</para>
|
||||||
|
///
|
||||||
|
/// <para><see cref="PersistOnceAsync"/> factored as a public method so tests can drive
|
||||||
|
/// it directly, matching the <see cref="ScheduledRecycleHostedService.TickOnceAsync"/>
|
||||||
|
/// pattern for deterministic unit-test timing.</para>
|
||||||
|
/// </remarks>
|
||||||
|
public sealed class ResilienceStatusPublisherHostedService : BackgroundService
|
||||||
|
{
|
||||||
|
private readonly DriverResilienceStatusTracker _tracker;
|
||||||
|
private readonly IDbContextFactory<OtOpcUaConfigDbContext> _dbContextFactory;
|
||||||
|
private readonly ILogger<ResilienceStatusPublisherHostedService> _logger;
|
||||||
|
private readonly TimeProvider _timeProvider;
|
||||||
|
|
||||||
|
/// <summary>Tick interval — how often the tracker snapshot is persisted.</summary>
|
||||||
|
public TimeSpan TickInterval { get; }
|
||||||
|
|
||||||
|
/// <summary>Snapshot of the tick count for diagnostics + test assertions.</summary>
|
||||||
|
public int TickCount { get; private set; }
|
||||||
|
|
||||||
|
public ResilienceStatusPublisherHostedService(
|
||||||
|
DriverResilienceStatusTracker tracker,
|
||||||
|
IDbContextFactory<OtOpcUaConfigDbContext> dbContextFactory,
|
||||||
|
ILogger<ResilienceStatusPublisherHostedService> logger,
|
||||||
|
TimeProvider? timeProvider = null,
|
||||||
|
TimeSpan? tickInterval = null)
|
||||||
|
{
|
||||||
|
ArgumentNullException.ThrowIfNull(tracker);
|
||||||
|
ArgumentNullException.ThrowIfNull(dbContextFactory);
|
||||||
|
|
||||||
|
_tracker = tracker;
|
||||||
|
_dbContextFactory = dbContextFactory;
|
||||||
|
_logger = logger;
|
||||||
|
_timeProvider = timeProvider ?? TimeProvider.System;
|
||||||
|
TickInterval = tickInterval ?? TimeSpan.FromSeconds(5);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
|
||||||
|
{
|
||||||
|
_logger.LogInformation(
|
||||||
|
"ResilienceStatusPublisherHostedService starting — tick interval = {Interval}",
|
||||||
|
TickInterval);
|
||||||
|
|
||||||
|
while (!stoppingToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await Task.Delay(TickInterval, _timeProvider, stoppingToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
|
||||||
|
{
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
await PersistOnceAsync(stoppingToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
|
||||||
|
_logger.LogInformation("ResilienceStatusPublisherHostedService stopping after {TickCount} tick(s).", TickCount);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// <summary>
|
||||||
|
/// Take one snapshot of the tracker + upsert each pair into the persistence table.
|
||||||
|
/// Swallows transient exceptions + logs them; never throws from a sample failure.
|
||||||
|
/// </summary>
|
||||||
|
public async Task PersistOnceAsync(CancellationToken cancellationToken)
|
||||||
|
{
|
||||||
|
TickCount++;
|
||||||
|
var snapshot = _tracker.Snapshot();
|
||||||
|
if (snapshot.Count == 0) return;
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
await using var db = await _dbContextFactory.CreateDbContextAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
var now = _timeProvider.GetUtcNow().UtcDateTime;
|
||||||
|
|
||||||
|
foreach (var (driverInstanceId, hostName, counters) in snapshot)
|
||||||
|
{
|
||||||
|
var existing = await db.DriverInstanceResilienceStatuses
|
||||||
|
.FirstOrDefaultAsync(x => x.DriverInstanceId == driverInstanceId && x.HostName == hostName, cancellationToken)
|
||||||
|
.ConfigureAwait(false);
|
||||||
|
|
||||||
|
if (existing is null)
|
||||||
|
{
|
||||||
|
db.DriverInstanceResilienceStatuses.Add(new DriverInstanceResilienceStatus
|
||||||
|
{
|
||||||
|
DriverInstanceId = driverInstanceId,
|
||||||
|
HostName = hostName,
|
||||||
|
LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc,
|
||||||
|
ConsecutiveFailures = counters.ConsecutiveFailures,
|
||||||
|
CurrentBulkheadDepth = counters.CurrentInFlight,
|
||||||
|
LastRecycleUtc = counters.LastRecycleUtc,
|
||||||
|
BaselineFootprintBytes = counters.BaselineFootprintBytes,
|
||||||
|
CurrentFootprintBytes = counters.CurrentFootprintBytes,
|
||||||
|
LastSampledUtc = now,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
existing.LastCircuitBreakerOpenUtc = counters.LastBreakerOpenUtc;
|
||||||
|
existing.ConsecutiveFailures = counters.ConsecutiveFailures;
|
||||||
|
existing.CurrentBulkheadDepth = counters.CurrentInFlight;
|
||||||
|
existing.LastRecycleUtc = counters.LastRecycleUtc;
|
||||||
|
existing.BaselineFootprintBytes = counters.BaselineFootprintBytes;
|
||||||
|
existing.CurrentFootprintBytes = counters.CurrentFootprintBytes;
|
||||||
|
existing.LastSampledUtc = now;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await db.SaveChangesAsync(cancellationToken).ConfigureAwait(false);
|
||||||
|
}
|
||||||
|
catch (OperationCanceledException) { throw; }
|
||||||
|
catch (Exception ex)
|
||||||
|
{
|
||||||
|
_logger.LogWarning(ex,
|
||||||
|
"ResilienceStatusPublisher persistence tick failed; next tick will retry with latest snapshots.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,165 @@
|
|||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Admin.Services;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Admin.Tests;
|
||||||
|
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class EquipmentImportBatchServiceTests : IDisposable
|
||||||
|
{
|
||||||
|
private readonly OtOpcUaConfigDbContext _db;
|
||||||
|
private readonly EquipmentImportBatchService _svc;
|
||||||
|
|
||||||
|
public EquipmentImportBatchServiceTests()
|
||||||
|
{
|
||||||
|
var options = new DbContextOptionsBuilder<OtOpcUaConfigDbContext>()
|
||||||
|
.UseInMemoryDatabase($"import-batch-{Guid.NewGuid():N}")
|
||||||
|
.Options;
|
||||||
|
_db = new OtOpcUaConfigDbContext(options);
|
||||||
|
_svc = new EquipmentImportBatchService(_db);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose() => _db.Dispose();
|
||||||
|
|
||||||
|
private static EquipmentCsvRow Row(string zTag, string name = "eq-1") => new()
|
||||||
|
{
|
||||||
|
ZTag = zTag,
|
||||||
|
MachineCode = "mc",
|
||||||
|
SAPID = "sap",
|
||||||
|
EquipmentId = "eq-id",
|
||||||
|
EquipmentUuid = Guid.NewGuid().ToString(),
|
||||||
|
Name = name,
|
||||||
|
UnsAreaName = "area",
|
||||||
|
UnsLineName = "line",
|
||||||
|
};
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CreateBatch_PopulatesId_AndTimestamp()
|
||||||
|
{
|
||||||
|
var batch = await _svc.CreateBatchAsync("c1", "alice", CancellationToken.None);
|
||||||
|
|
||||||
|
batch.Id.ShouldNotBe(Guid.Empty);
|
||||||
|
batch.CreatedAtUtc.ShouldBeGreaterThan(DateTime.UtcNow.AddMinutes(-1));
|
||||||
|
batch.RowsStaged.ShouldBe(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task StageRows_AcceptedAndRejected_AllPersist()
|
||||||
|
{
|
||||||
|
var batch = await _svc.CreateBatchAsync("c1", "alice", CancellationToken.None);
|
||||||
|
|
||||||
|
await _svc.StageRowsAsync(batch.Id,
|
||||||
|
acceptedRows: [Row("z-1"), Row("z-2")],
|
||||||
|
rejectedRows: [new EquipmentCsvRowError(LineNumber: 5, Reason: "duplicate ZTag")],
|
||||||
|
CancellationToken.None);
|
||||||
|
|
||||||
|
var reloaded = await _db.EquipmentImportBatches.Include(b => b.Rows).FirstAsync(b => b.Id == batch.Id);
|
||||||
|
reloaded.RowsStaged.ShouldBe(3);
|
||||||
|
reloaded.RowsAccepted.ShouldBe(2);
|
||||||
|
reloaded.RowsRejected.ShouldBe(1);
|
||||||
|
reloaded.Rows.Count.ShouldBe(3);
|
||||||
|
reloaded.Rows.Count(r => r.IsAccepted).ShouldBe(2);
|
||||||
|
reloaded.Rows.Single(r => !r.IsAccepted).RejectReason.ShouldBe("duplicate ZTag");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DropBatch_RemovesBatch_AndCascades_Rows()
|
||||||
|
{
|
||||||
|
var batch = await _svc.CreateBatchAsync("c1", "alice", CancellationToken.None);
|
||||||
|
await _svc.StageRowsAsync(batch.Id, [Row("z-1")], [], CancellationToken.None);
|
||||||
|
|
||||||
|
await _svc.DropBatchAsync(batch.Id, CancellationToken.None);
|
||||||
|
|
||||||
|
(await _db.EquipmentImportBatches.AnyAsync(b => b.Id == batch.Id)).ShouldBeFalse();
|
||||||
|
(await _db.EquipmentImportRows.AnyAsync(r => r.BatchId == batch.Id)).ShouldBeFalse("cascaded delete clears rows");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DropBatch_AfterFinalise_Throws()
|
||||||
|
{
|
||||||
|
var batch = await _svc.CreateBatchAsync("c1", "alice", CancellationToken.None);
|
||||||
|
await _svc.StageRowsAsync(batch.Id, [Row("z-1")], [], CancellationToken.None);
|
||||||
|
await _svc.FinaliseBatchAsync(batch.Id, generationId: 1, driverInstanceIdForRows: "drv-1", unsLineIdForRows: "line-1", CancellationToken.None);
|
||||||
|
|
||||||
|
await Should.ThrowAsync<ImportBatchAlreadyFinalisedException>(
|
||||||
|
() => _svc.DropBatchAsync(batch.Id, CancellationToken.None));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Finalise_AcceptedRows_BecomeEquipment()
|
||||||
|
{
|
||||||
|
var batch = await _svc.CreateBatchAsync("c1", "alice", CancellationToken.None);
|
||||||
|
await _svc.StageRowsAsync(batch.Id,
|
||||||
|
[Row("z-1", name: "alpha"), Row("z-2", name: "beta")],
|
||||||
|
rejectedRows: [new EquipmentCsvRowError(1, "rejected")],
|
||||||
|
CancellationToken.None);
|
||||||
|
|
||||||
|
await _svc.FinaliseBatchAsync(batch.Id, 5, "drv-modbus", "line-warsaw", CancellationToken.None);
|
||||||
|
|
||||||
|
var equipment = await _db.Equipment.Where(e => e.GenerationId == 5).ToListAsync();
|
||||||
|
equipment.Count.ShouldBe(2);
|
||||||
|
equipment.Select(e => e.Name).ShouldBe(["alpha", "beta"], ignoreOrder: true);
|
||||||
|
equipment.All(e => e.DriverInstanceId == "drv-modbus").ShouldBeTrue();
|
||||||
|
equipment.All(e => e.UnsLineId == "line-warsaw").ShouldBeTrue();
|
||||||
|
|
||||||
|
var reloaded = await _db.EquipmentImportBatches.FirstAsync(b => b.Id == batch.Id);
|
||||||
|
reloaded.FinalisedAtUtc.ShouldNotBeNull();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Finalise_Twice_Throws()
|
||||||
|
{
|
||||||
|
var batch = await _svc.CreateBatchAsync("c1", "alice", CancellationToken.None);
|
||||||
|
await _svc.StageRowsAsync(batch.Id, [Row("z-1")], [], CancellationToken.None);
|
||||||
|
await _svc.FinaliseBatchAsync(batch.Id, 1, "drv", "line", CancellationToken.None);
|
||||||
|
|
||||||
|
await Should.ThrowAsync<ImportBatchAlreadyFinalisedException>(
|
||||||
|
() => _svc.FinaliseBatchAsync(batch.Id, 2, "drv", "line", CancellationToken.None));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Finalise_MissingBatch_Throws()
|
||||||
|
{
|
||||||
|
await Should.ThrowAsync<ImportBatchNotFoundException>(
|
||||||
|
() => _svc.FinaliseBatchAsync(Guid.NewGuid(), 1, "drv", "line", CancellationToken.None));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Stage_After_Finalise_Throws()
|
||||||
|
{
|
||||||
|
var batch = await _svc.CreateBatchAsync("c1", "alice", CancellationToken.None);
|
||||||
|
await _svc.StageRowsAsync(batch.Id, [Row("z-1")], [], CancellationToken.None);
|
||||||
|
await _svc.FinaliseBatchAsync(batch.Id, 1, "drv", "line", CancellationToken.None);
|
||||||
|
|
||||||
|
await Should.ThrowAsync<ImportBatchAlreadyFinalisedException>(
|
||||||
|
() => _svc.StageRowsAsync(batch.Id, [Row("z-2")], [], CancellationToken.None));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ListByUser_FiltersByCreator_AndFinalised()
|
||||||
|
{
|
||||||
|
var a = await _svc.CreateBatchAsync("c1", "alice", CancellationToken.None);
|
||||||
|
var b = await _svc.CreateBatchAsync("c1", "bob", CancellationToken.None);
|
||||||
|
await _svc.StageRowsAsync(a.Id, [Row("z-a")], [], CancellationToken.None);
|
||||||
|
await _svc.FinaliseBatchAsync(a.Id, 1, "d", "l", CancellationToken.None);
|
||||||
|
_ = b;
|
||||||
|
|
||||||
|
var aliceOpen = await _svc.ListByUserAsync("alice", includeFinalised: false, CancellationToken.None);
|
||||||
|
aliceOpen.ShouldBeEmpty("alice's only batch is finalised");
|
||||||
|
|
||||||
|
var aliceAll = await _svc.ListByUserAsync("alice", includeFinalised: true, CancellationToken.None);
|
||||||
|
aliceAll.Count.ShouldBe(1);
|
||||||
|
|
||||||
|
var bobOpen = await _svc.ListByUserAsync("bob", includeFinalised: false, CancellationToken.None);
|
||||||
|
bobOpen.Count.ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DropBatch_Unknown_IsNoOp()
|
||||||
|
{
|
||||||
|
await _svc.DropBatchAsync(Guid.NewGuid(), CancellationToken.None);
|
||||||
|
// no throw
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -31,6 +31,8 @@ public sealed class SchemaComplianceTests
|
|||||||
"DriverHostStatus",
|
"DriverHostStatus",
|
||||||
"DriverInstanceResilienceStatus",
|
"DriverInstanceResilienceStatus",
|
||||||
"LdapGroupRoleMapping",
|
"LdapGroupRoleMapping",
|
||||||
|
"EquipmentImportBatch",
|
||||||
|
"EquipmentImportRow",
|
||||||
};
|
};
|
||||||
|
|
||||||
var actual = QueryStrings(@"
|
var actual = QueryStrings(@"
|
||||||
|
|||||||
@@ -0,0 +1,245 @@
|
|||||||
|
using System.Collections.Concurrent;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Core.Abstractions.Tests;
|
||||||
|
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class PollGroupEngineTests
|
||||||
|
{
|
||||||
|
private sealed class FakeSource
|
||||||
|
{
|
||||||
|
public ConcurrentDictionary<string, object?> Values { get; } = new();
|
||||||
|
public int ReadCount;
|
||||||
|
|
||||||
|
public Task<IReadOnlyList<DataValueSnapshot>> ReadAsync(
|
||||||
|
IReadOnlyList<string> refs, CancellationToken ct)
|
||||||
|
{
|
||||||
|
Interlocked.Increment(ref ReadCount);
|
||||||
|
var now = DateTime.UtcNow;
|
||||||
|
IReadOnlyList<DataValueSnapshot> snapshots = refs
|
||||||
|
.Select(r => Values.TryGetValue(r, out var v)
|
||||||
|
? new DataValueSnapshot(v, 0u, now, now)
|
||||||
|
: new DataValueSnapshot(null, 0x80340000u, null, now))
|
||||||
|
.ToList();
|
||||||
|
return Task.FromResult(snapshots);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Initial_poll_force_raises_every_subscribed_tag()
|
||||||
|
{
|
||||||
|
var src = new FakeSource();
|
||||||
|
src.Values["A"] = 1;
|
||||||
|
src.Values["B"] = "hello";
|
||||||
|
|
||||||
|
var events = new ConcurrentQueue<(ISubscriptionHandle h, string r, DataValueSnapshot s)>();
|
||||||
|
await using var engine = new PollGroupEngine(src.ReadAsync,
|
||||||
|
(h, r, s) => events.Enqueue((h, r, s)));
|
||||||
|
|
||||||
|
var handle = engine.Subscribe(["A", "B"], TimeSpan.FromMilliseconds(200));
|
||||||
|
await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
events.Select(e => e.r).ShouldBe(["A", "B"], ignoreOrder: true);
|
||||||
|
engine.Unsubscribe(handle).ShouldBeTrue();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Unchanged_value_raises_only_once()
|
||||||
|
{
|
||||||
|
var src = new FakeSource();
|
||||||
|
src.Values["X"] = 42;
|
||||||
|
|
||||||
|
var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
|
||||||
|
await using var engine = new PollGroupEngine(src.ReadAsync,
|
||||||
|
(h, r, s) => events.Enqueue((h, r, s)));
|
||||||
|
|
||||||
|
var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
|
||||||
|
await Task.Delay(500);
|
||||||
|
engine.Unsubscribe(handle);
|
||||||
|
|
||||||
|
events.Count.ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Value_change_raises_new_event()
|
||||||
|
{
|
||||||
|
var src = new FakeSource();
|
||||||
|
src.Values["X"] = 1;
|
||||||
|
|
||||||
|
var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
|
||||||
|
await using var engine = new PollGroupEngine(src.ReadAsync,
|
||||||
|
(h, r, s) => events.Enqueue((h, r, s)));
|
||||||
|
|
||||||
|
var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
|
||||||
|
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
|
||||||
|
src.Values["X"] = 2;
|
||||||
|
await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
engine.Unsubscribe(handle);
|
||||||
|
events.Last().Item3.Value.ShouldBe(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Unsubscribe_halts_the_loop()
|
||||||
|
{
|
||||||
|
var src = new FakeSource();
|
||||||
|
src.Values["X"] = 1;
|
||||||
|
|
||||||
|
var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
|
||||||
|
await using var engine = new PollGroupEngine(src.ReadAsync,
|
||||||
|
(h, r, s) => events.Enqueue((h, r, s)));
|
||||||
|
|
||||||
|
var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
|
||||||
|
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(1));
|
||||||
|
engine.Unsubscribe(handle).ShouldBeTrue();
|
||||||
|
var afterUnsub = events.Count;
|
||||||
|
|
||||||
|
src.Values["X"] = 999;
|
||||||
|
await Task.Delay(400);
|
||||||
|
events.Count.ShouldBe(afterUnsub);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Interval_below_floor_is_clamped()
|
||||||
|
{
|
||||||
|
var src = new FakeSource();
|
||||||
|
src.Values["X"] = 1;
|
||||||
|
|
||||||
|
var events = new ConcurrentQueue<(ISubscriptionHandle, string, DataValueSnapshot)>();
|
||||||
|
await using var engine = new PollGroupEngine(src.ReadAsync,
|
||||||
|
(h, r, s) => events.Enqueue((h, r, s)),
|
||||||
|
minInterval: TimeSpan.FromMilliseconds(200));
|
||||||
|
|
||||||
|
var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(5));
|
||||||
|
await Task.Delay(300);
|
||||||
|
engine.Unsubscribe(handle);
|
||||||
|
|
||||||
|
// 300 ms window, 200 ms floor, stable value → initial push + at most 1 extra poll.
|
||||||
|
// With zero changes only the initial-data push fires.
|
||||||
|
events.Count.ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Multiple_subscriptions_are_independent()
|
||||||
|
{
|
||||||
|
var src = new FakeSource();
|
||||||
|
src.Values["A"] = 1;
|
||||||
|
src.Values["B"] = 2;
|
||||||
|
|
||||||
|
var a = new ConcurrentQueue<string>();
|
||||||
|
var b = new ConcurrentQueue<string>();
|
||||||
|
await using var engine = new PollGroupEngine(src.ReadAsync,
|
||||||
|
(h, r, s) =>
|
||||||
|
{
|
||||||
|
if (r == "A") a.Enqueue(r);
|
||||||
|
else if (r == "B") b.Enqueue(r);
|
||||||
|
});
|
||||||
|
|
||||||
|
var ha = engine.Subscribe(["A"], TimeSpan.FromMilliseconds(100));
|
||||||
|
var hb = engine.Subscribe(["B"], TimeSpan.FromMilliseconds(100));
|
||||||
|
|
||||||
|
await WaitForAsync(() => a.Count >= 1 && b.Count >= 1, TimeSpan.FromSeconds(2));
|
||||||
|
engine.Unsubscribe(ha);
|
||||||
|
var aCount = a.Count;
|
||||||
|
src.Values["B"] = 77;
|
||||||
|
await WaitForAsync(() => b.Count >= 2, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
a.Count.ShouldBe(aCount);
|
||||||
|
b.Count.ShouldBeGreaterThanOrEqualTo(2);
|
||||||
|
engine.Unsubscribe(hb);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Reader_exception_does_not_crash_loop()
|
||||||
|
{
|
||||||
|
var throwCount = 0;
|
||||||
|
var readCount = 0;
|
||||||
|
Task<IReadOnlyList<DataValueSnapshot>> Reader(IReadOnlyList<string> refs, CancellationToken ct)
|
||||||
|
{
|
||||||
|
if (Interlocked.Increment(ref readCount) <= 2)
|
||||||
|
{
|
||||||
|
Interlocked.Increment(ref throwCount);
|
||||||
|
throw new InvalidOperationException("boom");
|
||||||
|
}
|
||||||
|
var now = DateTime.UtcNow;
|
||||||
|
return Task.FromResult<IReadOnlyList<DataValueSnapshot>>(
|
||||||
|
refs.Select(r => new DataValueSnapshot(1, 0u, now, now)).ToList());
|
||||||
|
}
|
||||||
|
|
||||||
|
var events = new ConcurrentQueue<string>();
|
||||||
|
await using var engine = new PollGroupEngine(Reader,
|
||||||
|
(h, r, s) => events.Enqueue(r));
|
||||||
|
|
||||||
|
var handle = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
|
||||||
|
await WaitForAsync(() => events.Count >= 1, TimeSpan.FromSeconds(2));
|
||||||
|
engine.Unsubscribe(handle);
|
||||||
|
|
||||||
|
throwCount.ShouldBe(2);
|
||||||
|
events.Count.ShouldBeGreaterThanOrEqualTo(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task Unsubscribe_unknown_handle_returns_false()
|
||||||
|
{
|
||||||
|
var src = new FakeSource();
|
||||||
|
await using var engine = new PollGroupEngine(src.ReadAsync, (_, _, _) => { });
|
||||||
|
|
||||||
|
var foreign = new DummyHandle();
|
||||||
|
engine.Unsubscribe(foreign).ShouldBeFalse();
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task ActiveSubscriptionCount_tracks_lifecycle()
|
||||||
|
{
|
||||||
|
var src = new FakeSource();
|
||||||
|
src.Values["X"] = 1;
|
||||||
|
await using var engine = new PollGroupEngine(src.ReadAsync, (_, _, _) => { });
|
||||||
|
|
||||||
|
engine.ActiveSubscriptionCount.ShouldBe(0);
|
||||||
|
var h1 = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(200));
|
||||||
|
var h2 = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(200));
|
||||||
|
engine.ActiveSubscriptionCount.ShouldBe(2);
|
||||||
|
|
||||||
|
engine.Unsubscribe(h1);
|
||||||
|
engine.ActiveSubscriptionCount.ShouldBe(1);
|
||||||
|
engine.Unsubscribe(h2);
|
||||||
|
engine.ActiveSubscriptionCount.ShouldBe(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task DisposeAsync_cancels_all_subscriptions()
|
||||||
|
{
|
||||||
|
var src = new FakeSource();
|
||||||
|
src.Values["X"] = 1;
|
||||||
|
|
||||||
|
var events = new ConcurrentQueue<string>();
|
||||||
|
var engine = new PollGroupEngine(src.ReadAsync,
|
||||||
|
(h, r, s) => events.Enqueue(r));
|
||||||
|
|
||||||
|
_ = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
|
||||||
|
_ = engine.Subscribe(["X"], TimeSpan.FromMilliseconds(100));
|
||||||
|
await WaitForAsync(() => events.Count >= 2, TimeSpan.FromSeconds(2));
|
||||||
|
|
||||||
|
await engine.DisposeAsync();
|
||||||
|
engine.ActiveSubscriptionCount.ShouldBe(0);
|
||||||
|
|
||||||
|
var afterDispose = events.Count;
|
||||||
|
await Task.Delay(300);
|
||||||
|
// After dispose no more events — everything is cancelled.
|
||||||
|
events.Count.ShouldBe(afterDispose);
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed record DummyHandle : ISubscriptionHandle
|
||||||
|
{
|
||||||
|
public string DiagnosticId => "dummy";
|
||||||
|
}
|
||||||
|
|
||||||
|
private static async Task WaitForAsync(Func<bool> condition, TimeSpan timeout)
|
||||||
|
{
|
||||||
|
var deadline = DateTime.UtcNow + timeout;
|
||||||
|
while (!condition() && DateTime.UtcNow < deadline)
|
||||||
|
await Task.Delay(20);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,130 @@
|
|||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Abstractions;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Core.Tests.Resilience;
|
||||||
|
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class InFlightCounterTests
|
||||||
|
{
|
||||||
|
[Fact]
|
||||||
|
public void StartThenComplete_NetsToZero()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
tracker.RecordCallStart("drv", "host-a");
|
||||||
|
tracker.RecordCallComplete("drv", "host-a");
|
||||||
|
|
||||||
|
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void NestedStarts_SumDepth()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
tracker.RecordCallStart("drv", "host-a");
|
||||||
|
tracker.RecordCallStart("drv", "host-a");
|
||||||
|
tracker.RecordCallStart("drv", "host-a");
|
||||||
|
|
||||||
|
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(3);
|
||||||
|
|
||||||
|
tracker.RecordCallComplete("drv", "host-a");
|
||||||
|
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(2);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void CompleteBeforeStart_ClampedToZero()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
tracker.RecordCallComplete("drv", "host-a");
|
||||||
|
|
||||||
|
// A stray Complete without a matching Start shouldn't drive the counter negative.
|
||||||
|
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void DifferentHosts_TrackIndependently()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
tracker.RecordCallStart("drv", "host-a");
|
||||||
|
tracker.RecordCallStart("drv", "host-a");
|
||||||
|
tracker.RecordCallStart("drv", "host-b");
|
||||||
|
|
||||||
|
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(2);
|
||||||
|
tracker.TryGet("drv", "host-b")!.CurrentInFlight.ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public void ConcurrentStarts_DoNotLose_Count()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
Parallel.For(0, 500, _ => tracker.RecordCallStart("drv", "host-a"));
|
||||||
|
|
||||||
|
tracker.TryGet("drv", "host-a")!.CurrentInFlight.ShouldBe(500);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CapabilityInvoker_IncrementsTracker_DuringExecution()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
var invoker = new CapabilityInvoker(
|
||||||
|
new DriverResiliencePipelineBuilder(),
|
||||||
|
"drv-live",
|
||||||
|
() => new DriverResilienceOptions { Tier = DriverTier.A },
|
||||||
|
driverType: "Modbus",
|
||||||
|
statusTracker: tracker);
|
||||||
|
|
||||||
|
var observedMidCall = -1;
|
||||||
|
await invoker.ExecuteAsync(
|
||||||
|
DriverCapability.Read,
|
||||||
|
"plc-1",
|
||||||
|
async _ =>
|
||||||
|
{
|
||||||
|
observedMidCall = tracker.TryGet("drv-live", "plc-1")?.CurrentInFlight ?? -1;
|
||||||
|
await Task.Yield();
|
||||||
|
return 42;
|
||||||
|
},
|
||||||
|
CancellationToken.None);
|
||||||
|
|
||||||
|
observedMidCall.ShouldBe(1, "during call, in-flight == 1");
|
||||||
|
tracker.TryGet("drv-live", "plc-1")!.CurrentInFlight.ShouldBe(0, "post-call, counter decremented");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CapabilityInvoker_ExceptionPath_DecrementsCounter()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
var invoker = new CapabilityInvoker(
|
||||||
|
new DriverResiliencePipelineBuilder(),
|
||||||
|
"drv-live",
|
||||||
|
() => new DriverResilienceOptions { Tier = DriverTier.A },
|
||||||
|
statusTracker: tracker);
|
||||||
|
|
||||||
|
await Should.ThrowAsync<InvalidOperationException>(async () =>
|
||||||
|
await invoker.ExecuteAsync<int>(
|
||||||
|
DriverCapability.Write,
|
||||||
|
"plc-1",
|
||||||
|
_ => throw new InvalidOperationException("boom"),
|
||||||
|
CancellationToken.None));
|
||||||
|
|
||||||
|
tracker.TryGet("drv-live", "plc-1")!.CurrentInFlight.ShouldBe(0,
|
||||||
|
"finally-block must decrement even when call-site throws");
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task CapabilityInvoker_WithoutTracker_DoesNotThrow()
|
||||||
|
{
|
||||||
|
var invoker = new CapabilityInvoker(
|
||||||
|
new DriverResiliencePipelineBuilder(),
|
||||||
|
"drv-live",
|
||||||
|
() => new DriverResilienceOptions { Tier = DriverTier.A },
|
||||||
|
statusTracker: null);
|
||||||
|
|
||||||
|
var result = await invoker.ExecuteAsync(
|
||||||
|
DriverCapability.Read, "host-1",
|
||||||
|
_ => ValueTask.FromResult(7),
|
||||||
|
CancellationToken.None);
|
||||||
|
|
||||||
|
result.ShouldBe(7);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,161 @@
|
|||||||
|
using Microsoft.EntityFrameworkCore;
|
||||||
|
using Microsoft.Extensions.Logging.Abstractions;
|
||||||
|
using Shouldly;
|
||||||
|
using Xunit;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Core.Resilience;
|
||||||
|
using ZB.MOM.WW.OtOpcUa.Server.Hosting;
|
||||||
|
|
||||||
|
namespace ZB.MOM.WW.OtOpcUa.Server.Tests;
|
||||||
|
|
||||||
|
[Trait("Category", "Unit")]
|
||||||
|
public sealed class ResilienceStatusPublisherHostedServiceTests : IDisposable
|
||||||
|
{
|
||||||
|
private static readonly DateTime T0 = new(2026, 4, 19, 12, 0, 0, DateTimeKind.Utc);
|
||||||
|
|
||||||
|
private sealed class FakeClock : TimeProvider
|
||||||
|
{
|
||||||
|
public DateTime Utc { get; set; } = T0;
|
||||||
|
public override DateTimeOffset GetUtcNow() => new(Utc, TimeSpan.Zero);
|
||||||
|
}
|
||||||
|
|
||||||
|
private sealed class InMemoryDbContextFactory : IDbContextFactory<OtOpcUaConfigDbContext>
|
||||||
|
{
|
||||||
|
private readonly DbContextOptions<OtOpcUaConfigDbContext> _options;
|
||||||
|
public InMemoryDbContextFactory(string dbName)
|
||||||
|
{
|
||||||
|
_options = new DbContextOptionsBuilder<OtOpcUaConfigDbContext>()
|
||||||
|
.UseInMemoryDatabase(dbName)
|
||||||
|
.Options;
|
||||||
|
}
|
||||||
|
public OtOpcUaConfigDbContext CreateDbContext() => new(_options);
|
||||||
|
}
|
||||||
|
|
||||||
|
private readonly string _dbName = $"resilience-pub-{Guid.NewGuid():N}";
|
||||||
|
private readonly InMemoryDbContextFactory _factory;
|
||||||
|
private readonly OtOpcUaConfigDbContext _readCtx;
|
||||||
|
|
||||||
|
public ResilienceStatusPublisherHostedServiceTests()
|
||||||
|
{
|
||||||
|
_factory = new InMemoryDbContextFactory(_dbName);
|
||||||
|
_readCtx = _factory.CreateDbContext();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void Dispose() => _readCtx.Dispose();
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task EmptyTracker_Tick_NoOp_NoRowsWritten()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
var host = new ResilienceStatusPublisherHostedService(
|
||||||
|
tracker, _factory, NullLogger<ResilienceStatusPublisherHostedService>.Instance);
|
||||||
|
|
||||||
|
await host.PersistOnceAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
host.TickCount.ShouldBe(1);
|
||||||
|
(await _readCtx.DriverInstanceResilienceStatuses.CountAsync()).ShouldBe(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task SingleHost_OnePairWithCounters_UpsertsNewRow()
|
||||||
|
{
|
||||||
|
var clock = new FakeClock();
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
tracker.RecordFailure("drv-1", "plc-a", T0);
|
||||||
|
tracker.RecordFailure("drv-1", "plc-a", T0);
|
||||||
|
tracker.RecordBreakerOpen("drv-1", "plc-a", T0.AddSeconds(1));
|
||||||
|
|
||||||
|
var host = new ResilienceStatusPublisherHostedService(
|
||||||
|
tracker, _factory, NullLogger<ResilienceStatusPublisherHostedService>.Instance,
|
||||||
|
timeProvider: clock);
|
||||||
|
|
||||||
|
clock.Utc = T0.AddSeconds(2);
|
||||||
|
await host.PersistOnceAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
var row = await _readCtx.DriverInstanceResilienceStatuses.SingleAsync();
|
||||||
|
row.DriverInstanceId.ShouldBe("drv-1");
|
||||||
|
row.HostName.ShouldBe("plc-a");
|
||||||
|
row.ConsecutiveFailures.ShouldBe(2);
|
||||||
|
row.LastCircuitBreakerOpenUtc.ShouldBe(T0.AddSeconds(1));
|
||||||
|
row.LastSampledUtc.ShouldBe(T0.AddSeconds(2));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task SecondTick_UpdatesExistingRow_InPlace()
|
||||||
|
{
|
||||||
|
var clock = new FakeClock();
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
tracker.RecordFailure("drv-1", "plc-a", T0);
|
||||||
|
|
||||||
|
var host = new ResilienceStatusPublisherHostedService(
|
||||||
|
tracker, _factory, NullLogger<ResilienceStatusPublisherHostedService>.Instance,
|
||||||
|
timeProvider: clock);
|
||||||
|
|
||||||
|
clock.Utc = T0.AddSeconds(5);
|
||||||
|
await host.PersistOnceAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
// Second tick: success resets the counter.
|
||||||
|
tracker.RecordSuccess("drv-1", "plc-a", T0.AddSeconds(6));
|
||||||
|
clock.Utc = T0.AddSeconds(10);
|
||||||
|
await host.PersistOnceAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
(await _readCtx.DriverInstanceResilienceStatuses.CountAsync()).ShouldBe(1, "one row, updated in place");
|
||||||
|
var row = await _readCtx.DriverInstanceResilienceStatuses.SingleAsync();
|
||||||
|
row.ConsecutiveFailures.ShouldBe(0);
|
||||||
|
row.LastSampledUtc.ShouldBe(T0.AddSeconds(10));
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task MultipleHosts_BothPersist_Independently()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
tracker.RecordFailure("drv-1", "plc-a", T0);
|
||||||
|
tracker.RecordFailure("drv-1", "plc-a", T0);
|
||||||
|
tracker.RecordFailure("drv-1", "plc-b", T0);
|
||||||
|
|
||||||
|
var host = new ResilienceStatusPublisherHostedService(
|
||||||
|
tracker, _factory, NullLogger<ResilienceStatusPublisherHostedService>.Instance);
|
||||||
|
|
||||||
|
await host.PersistOnceAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
var rows = await _readCtx.DriverInstanceResilienceStatuses
|
||||||
|
.OrderBy(r => r.HostName)
|
||||||
|
.ToListAsync();
|
||||||
|
rows.Count.ShouldBe(2);
|
||||||
|
rows[0].HostName.ShouldBe("plc-a");
|
||||||
|
rows[0].ConsecutiveFailures.ShouldBe(2);
|
||||||
|
rows[1].HostName.ShouldBe("plc-b");
|
||||||
|
rows[1].ConsecutiveFailures.ShouldBe(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task FootprintCounters_Persist()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
tracker.RecordFootprint("drv-1", "plc-a",
|
||||||
|
baselineBytes: 100_000_000, currentBytes: 150_000_000, T0);
|
||||||
|
|
||||||
|
var host = new ResilienceStatusPublisherHostedService(
|
||||||
|
tracker, _factory, NullLogger<ResilienceStatusPublisherHostedService>.Instance);
|
||||||
|
|
||||||
|
await host.PersistOnceAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
var row = await _readCtx.DriverInstanceResilienceStatuses.SingleAsync();
|
||||||
|
row.BaselineFootprintBytes.ShouldBe(100_000_000);
|
||||||
|
row.CurrentFootprintBytes.ShouldBe(150_000_000);
|
||||||
|
}
|
||||||
|
|
||||||
|
[Fact]
|
||||||
|
public async Task TickCount_Advances_OnEveryCall()
|
||||||
|
{
|
||||||
|
var tracker = new DriverResilienceStatusTracker();
|
||||||
|
var host = new ResilienceStatusPublisherHostedService(
|
||||||
|
tracker, _factory, NullLogger<ResilienceStatusPublisherHostedService>.Instance);
|
||||||
|
|
||||||
|
await host.PersistOnceAsync(CancellationToken.None);
|
||||||
|
await host.PersistOnceAsync(CancellationToken.None);
|
||||||
|
await host.PersistOnceAsync(CancellationToken.None);
|
||||||
|
|
||||||
|
host.TickCount.ShouldBe(3);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user