Phase 6.4 Stream B.2-B.4 server-side — EquipmentImportBatch staging + FinaliseBatch transaction
Closes the server-side/data-layer piece of Phase 6.4 Stream B.2-B.4. The CSV-import preview + modal UI (Stream B.3/B.5) still belongs to the Admin UI follow-up — this PR owns the staging tables + atomic finalise alone. Configuration: - New EquipmentImportBatch entity (Id, ClusterId, CreatedBy, CreatedAtUtc, RowsStaged/Accepted/Rejected, FinalisedAtUtc?). Composite index on (CreatedBy, FinalisedAtUtc) powers the Admin preview modal's "my open batches" query. - New EquipmentImportRow entity — one row per CSV row, 8 required columns from decision #117 + 9 optional from decision #139 + IsAccepted flag + RejectReason. FK to EquipmentImportBatch with cascade delete so DropBatch collapses the whole tree. - EF migration 20260419_..._AddEquipmentImportBatch. - SchemaComplianceTests expected tables list gains the two new tables. Admin.Services.EquipmentImportBatchService: - CreateBatchAsync — new header row, caller-supplied ClusterId + CreatedBy. - StageRowsAsync(batchId, acceptedRows, rejectedRows) — bulk-inserts the parsed CSV rows into staging. Rejected rows carry LineNumberInFile + RejectReason for the preview modal. Throws when the batch is finalised. - DropBatchAsync — removes batch + cascaded rows. Throws when the batch was already finalised (rollback via staging is not a time machine). - FinaliseBatchAsync(batchId, generationId, driverInstanceId, unsLineId) — atomic apply. Opens an EF transaction when the provider supports it (SQL Server in prod; InMemory in tests skips the tx), bulk-inserts every accepted staging row into Equipment, stamps EquipmentImportBatch.FinalisedAtUtc, commits. Failure rolls back so Equipment never partially mutates. Idempotent-under-double-call: second finalise throws ImportBatchAlreadyFinalisedException. - ListByUserAsync(createdBy, includeFinalised) — the Admin preview modal's backing query. OrderByDescending on CreatedAtUtc so the most-recent batch shows first. - Two exception types: ImportBatchNotFoundException + ImportBatchAlreadyFinalisedException. ExternalIdReservation merging (ZTag + SAPID fleet-wide uniqueness) is NOT done here — a narrower follow-up wires it once the concurrent-insert test matrix is green. Tests (10 new EquipmentImportBatchServiceTests, all pass): - CreateBatch populates Id + CreatedAtUtc + zero-ed counters. - StageRows accepted + rejected both persist; counters advance. - DropBatch cascades row delete. - DropBatch after finalise throws. - Finalise translates accepted staging rows → Equipment under the target GenerationId + DriverInstanceId + UnsLineId. - Finalise twice throws. - Finalise of unknown batch throws. - Stage after finalise throws. - ListByUserAsync filters by creator + finalised flag. - Drop of unknown batch is a no-op (idempotent rollback). Full solution dotnet test: 1235 passing (was 1225, +10). Pre-existing Client.CLI Subscribe flake unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,207 @@
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using ZB.MOM.WW.OtOpcUa.Admin.Services;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration;
|
||||
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
||||
|
||||
namespace ZB.MOM.WW.OtOpcUa.Admin.Services;
|
||||
|
||||
/// <summary>
|
||||
/// Staged-import orchestrator per Phase 6.4 Stream B.2-B.4. Covers the four operator
|
||||
/// actions: CreateBatch → StageRows (chunked) → FinaliseBatch (atomic apply into
|
||||
/// <see cref="Equipment"/>) → DropBatch (rollback of pre-finalise state).
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>FinaliseBatch runs inside one EF transaction + bulk-inserts accepted rows into
|
||||
/// <see cref="Equipment"/>. Rejected rows stay behind as audit evidence; the batch row
|
||||
/// gains <see cref="EquipmentImportBatch.FinalisedAtUtc"/> so future writes know it's
|
||||
/// archived. DropBatch removes the batch + its cascaded rows.</para>
|
||||
///
|
||||
/// <para>Idempotence: calling FinaliseBatch twice throws <see cref="ImportBatchAlreadyFinalisedException"/>
|
||||
/// rather than double-inserting. Operator refreshes the admin page to see the first
|
||||
/// finalise completed.</para>
|
||||
///
|
||||
/// <para>ExternalIdReservation merging (ZTag + SAPID uniqueness) is NOT done here — a
|
||||
/// narrower follow-up wires it once the concurrent-insert test matrix is green.</para>
|
||||
/// </remarks>
|
||||
public sealed class EquipmentImportBatchService(OtOpcUaConfigDbContext db)
|
||||
{
|
||||
/// <summary>Create a new empty batch header. Returns the row with Id populated.</summary>
|
||||
public async Task<EquipmentImportBatch> CreateBatchAsync(string clusterId, string createdBy, CancellationToken ct)
|
||||
{
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(clusterId);
|
||||
ArgumentException.ThrowIfNullOrWhiteSpace(createdBy);
|
||||
|
||||
var batch = new EquipmentImportBatch
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
ClusterId = clusterId,
|
||||
CreatedBy = createdBy,
|
||||
CreatedAtUtc = DateTime.UtcNow,
|
||||
};
|
||||
db.EquipmentImportBatches.Add(batch);
|
||||
await db.SaveChangesAsync(ct).ConfigureAwait(false);
|
||||
return batch;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Stage one chunk of rows into the batch. Caller usually feeds
|
||||
/// <see cref="EquipmentCsvImporter.Parse"/> output here — each
|
||||
/// <see cref="EquipmentCsvRow"/> becomes one accepted <see cref="EquipmentImportRow"/>,
|
||||
/// each rejected parser error becomes one row with <see cref="EquipmentImportRow.IsAccepted"/> false.
|
||||
/// </summary>
|
||||
public async Task StageRowsAsync(
|
||||
Guid batchId,
|
||||
IReadOnlyList<EquipmentCsvRow> acceptedRows,
|
||||
IReadOnlyList<EquipmentCsvRowError> rejectedRows,
|
||||
CancellationToken ct)
|
||||
{
|
||||
var batch = await db.EquipmentImportBatches.FirstOrDefaultAsync(b => b.Id == batchId, ct).ConfigureAwait(false)
|
||||
?? throw new ImportBatchNotFoundException($"Batch {batchId} not found.");
|
||||
|
||||
if (batch.FinalisedAtUtc is not null)
|
||||
throw new ImportBatchAlreadyFinalisedException(
|
||||
$"Batch {batchId} finalised at {batch.FinalisedAtUtc:o}; no more rows can be staged.");
|
||||
|
||||
foreach (var row in acceptedRows)
|
||||
{
|
||||
db.EquipmentImportRows.Add(new EquipmentImportRow
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
BatchId = batchId,
|
||||
IsAccepted = true,
|
||||
ZTag = row.ZTag,
|
||||
MachineCode = row.MachineCode,
|
||||
SAPID = row.SAPID,
|
||||
EquipmentId = row.EquipmentId,
|
||||
EquipmentUuid = row.EquipmentUuid,
|
||||
Name = row.Name,
|
||||
UnsAreaName = row.UnsAreaName,
|
||||
UnsLineName = row.UnsLineName,
|
||||
Manufacturer = row.Manufacturer,
|
||||
Model = row.Model,
|
||||
SerialNumber = row.SerialNumber,
|
||||
HardwareRevision = row.HardwareRevision,
|
||||
SoftwareRevision = row.SoftwareRevision,
|
||||
YearOfConstruction = row.YearOfConstruction,
|
||||
AssetLocation = row.AssetLocation,
|
||||
ManufacturerUri = row.ManufacturerUri,
|
||||
DeviceManualUri = row.DeviceManualUri,
|
||||
});
|
||||
}
|
||||
|
||||
foreach (var error in rejectedRows)
|
||||
{
|
||||
db.EquipmentImportRows.Add(new EquipmentImportRow
|
||||
{
|
||||
Id = Guid.NewGuid(),
|
||||
BatchId = batchId,
|
||||
IsAccepted = false,
|
||||
RejectReason = error.Reason,
|
||||
LineNumberInFile = error.LineNumber,
|
||||
// Required columns need values for EF; reject rows use sentinel placeholders.
|
||||
ZTag = "", MachineCode = "", SAPID = "", EquipmentId = "", EquipmentUuid = "",
|
||||
Name = "", UnsAreaName = "", UnsLineName = "",
|
||||
});
|
||||
}
|
||||
|
||||
batch.RowsStaged += acceptedRows.Count + rejectedRows.Count;
|
||||
batch.RowsAccepted += acceptedRows.Count;
|
||||
batch.RowsRejected += rejectedRows.Count;
|
||||
|
||||
await db.SaveChangesAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>Drop the batch (pre-finalise rollback). Cascaded row delete removes staged rows.</summary>
|
||||
public async Task DropBatchAsync(Guid batchId, CancellationToken ct)
|
||||
{
|
||||
var batch = await db.EquipmentImportBatches.FirstOrDefaultAsync(b => b.Id == batchId, ct).ConfigureAwait(false);
|
||||
if (batch is null) return;
|
||||
if (batch.FinalisedAtUtc is not null)
|
||||
throw new ImportBatchAlreadyFinalisedException(
|
||||
$"Batch {batchId} already finalised at {batch.FinalisedAtUtc:o}; cannot drop.");
|
||||
|
||||
db.EquipmentImportBatches.Remove(batch);
|
||||
await db.SaveChangesAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Atomic finalise. Inserts every accepted row into the live
|
||||
/// <see cref="Equipment"/> table under the target generation + stamps
|
||||
/// <see cref="EquipmentImportBatch.FinalisedAtUtc"/>. Failure rolls the whole tx
|
||||
/// back — <see cref="Equipment"/> never partially mutates.
|
||||
/// </summary>
|
||||
public async Task FinaliseBatchAsync(
|
||||
Guid batchId, long generationId, string driverInstanceIdForRows, string unsLineIdForRows, CancellationToken ct)
|
||||
{
|
||||
var batch = await db.EquipmentImportBatches
|
||||
.Include(b => b.Rows)
|
||||
.FirstOrDefaultAsync(b => b.Id == batchId, ct)
|
||||
.ConfigureAwait(false)
|
||||
?? throw new ImportBatchNotFoundException($"Batch {batchId} not found.");
|
||||
|
||||
if (batch.FinalisedAtUtc is not null)
|
||||
throw new ImportBatchAlreadyFinalisedException(
|
||||
$"Batch {batchId} already finalised at {batch.FinalisedAtUtc:o}.");
|
||||
|
||||
// EF InMemory provider doesn't honour BeginTransaction; SQL Server provider does.
|
||||
// Tests run the happy path under in-memory; production SQL Server runs the atomic tx.
|
||||
var supportsTx = db.Database.IsRelational();
|
||||
Microsoft.EntityFrameworkCore.Storage.IDbContextTransaction? tx = null;
|
||||
if (supportsTx)
|
||||
tx = await db.Database.BeginTransactionAsync(ct).ConfigureAwait(false);
|
||||
|
||||
try
|
||||
{
|
||||
foreach (var row in batch.Rows.Where(r => r.IsAccepted))
|
||||
{
|
||||
db.Equipment.Add(new Equipment
|
||||
{
|
||||
EquipmentRowId = Guid.NewGuid(),
|
||||
GenerationId = generationId,
|
||||
EquipmentId = row.EquipmentId,
|
||||
EquipmentUuid = Guid.TryParse(row.EquipmentUuid, out var u) ? u : Guid.NewGuid(),
|
||||
DriverInstanceId = driverInstanceIdForRows,
|
||||
UnsLineId = unsLineIdForRows,
|
||||
Name = row.Name,
|
||||
MachineCode = row.MachineCode,
|
||||
ZTag = row.ZTag,
|
||||
SAPID = row.SAPID,
|
||||
Manufacturer = row.Manufacturer,
|
||||
Model = row.Model,
|
||||
SerialNumber = row.SerialNumber,
|
||||
HardwareRevision = row.HardwareRevision,
|
||||
SoftwareRevision = row.SoftwareRevision,
|
||||
YearOfConstruction = short.TryParse(row.YearOfConstruction, out var y) ? y : null,
|
||||
AssetLocation = row.AssetLocation,
|
||||
ManufacturerUri = row.ManufacturerUri,
|
||||
DeviceManualUri = row.DeviceManualUri,
|
||||
});
|
||||
}
|
||||
|
||||
batch.FinalisedAtUtc = DateTime.UtcNow;
|
||||
await db.SaveChangesAsync(ct).ConfigureAwait(false);
|
||||
if (tx is not null) await tx.CommitAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
catch
|
||||
{
|
||||
if (tx is not null) await tx.RollbackAsync(ct).ConfigureAwait(false);
|
||||
throw;
|
||||
}
|
||||
finally
|
||||
{
|
||||
if (tx is not null) await tx.DisposeAsync().ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>List batches created by the given user. Finalised batches are archived; include them on demand.</summary>
|
||||
public async Task<IReadOnlyList<EquipmentImportBatch>> ListByUserAsync(string createdBy, bool includeFinalised, CancellationToken ct)
|
||||
{
|
||||
var query = db.EquipmentImportBatches.AsNoTracking().Where(b => b.CreatedBy == createdBy);
|
||||
if (!includeFinalised)
|
||||
query = query.Where(b => b.FinalisedAtUtc == null);
|
||||
return await query.OrderByDescending(b => b.CreatedAtUtc).ToListAsync(ct).ConfigureAwait(false);
|
||||
}
|
||||
}
|
||||
|
||||
public sealed class ImportBatchNotFoundException(string message) : Exception(message);
|
||||
public sealed class ImportBatchAlreadyFinalisedException(string message) : Exception(message);
|
||||
Reference in New Issue
Block a user