325 lines
15 KiB
C#
325 lines
15 KiB
C#
using Microsoft.EntityFrameworkCore;
|
|
using ZB.MOM.WW.OtOpcUa.Admin.Services;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration.Entities;
|
|
using ZB.MOM.WW.OtOpcUa.Configuration.Enums;
|
|
|
|
namespace ZB.MOM.WW.OtOpcUa.Admin.Services;
|
|
|
|
/// <summary>
|
|
/// Staged-import orchestrator per Phase 6.4 Stream B.2-B.4. Covers the four operator
|
|
/// actions: CreateBatch → StageRows (chunked) → FinaliseBatch (atomic apply into
|
|
/// <see cref="Equipment"/>) → DropBatch (rollback of pre-finalise state).
|
|
/// </summary>
|
|
/// <remarks>
|
|
/// <para>FinaliseBatch runs inside one EF transaction + bulk-inserts accepted rows into
|
|
/// <see cref="Equipment"/>. Rejected rows stay behind as audit evidence; the batch row
|
|
/// gains <see cref="EquipmentImportBatch.FinalisedAtUtc"/> so future writes know it's
|
|
/// archived. DropBatch removes the batch + its cascaded rows.</para>
|
|
///
|
|
/// <para>Idempotence: calling FinaliseBatch twice throws <see cref="ImportBatchAlreadyFinalisedException"/>
|
|
/// rather than double-inserting. Operator refreshes the admin page to see the first
|
|
/// finalise completed.</para>
|
|
///
|
|
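/// <para>ExternalIdReservation merging (ZTag + SAPID uniqueness) happens inside FinaliseBatch:
/// each accepted row's non-blank ZTag / SAPID is either newly reserved or re-confirmed for the
/// row's EquipmentUuid, and a value already held by a different EquipmentUuid aborts the
/// finalise with <see cref="ExternalIdReservationConflictException"/>.</para>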
|
|
/// </remarks>
|
|
public sealed class EquipmentImportBatchService(OtOpcUaConfigDbContext db)
|
|
{
|
|
/// <summary>
/// Opens a new, empty import batch for <paramref name="clusterId"/> and saves the header
/// straight away.
/// </summary>
/// <param name="clusterId">Cluster the batch is recorded against; null/blank is rejected.</param>
/// <param name="createdBy">Operator recorded as the batch creator; null/blank is rejected.</param>
/// <param name="ct">Cancels the save.</param>
/// <returns>The saved batch header, carrying the freshly generated <c>Id</c>.</returns>
/// <exception cref="ArgumentException">
/// <paramref name="clusterId"/> or <paramref name="createdBy"/> is null, empty or whitespace.
/// </exception>
public async Task<EquipmentImportBatch> CreateBatchAsync(string clusterId, string createdBy, CancellationToken ct)
{
    ArgumentException.ThrowIfNullOrWhiteSpace(clusterId);
    ArgumentException.ThrowIfNullOrWhiteSpace(createdBy);

    // The id is generated client-side, so the header is fully populated before the save.
    var header = new EquipmentImportBatch
    {
        Id = Guid.NewGuid(),
        ClusterId = clusterId,
        CreatedBy = createdBy,
        CreatedAtUtc = DateTime.UtcNow,
    };

    db.EquipmentImportBatches.Add(header);
    await db.SaveChangesAsync(ct).ConfigureAwait(false);

    return header;
}
|
|
|
|
/// <summary>
/// Stage one chunk of rows into the batch. Caller usually feeds
/// <see cref="EquipmentCsvImporter.Parse"/> output here — each
/// <see cref="EquipmentCsvRow"/> becomes one accepted <see cref="EquipmentImportRow"/>,
/// each rejected parser error becomes one row with <see cref="EquipmentImportRow.IsAccepted"/> false.
/// The batch counters (<c>RowsStaged</c>, <c>RowsAccepted</c>, <c>RowsRejected</c>) are
/// incremented by the chunk sizes and saved together with the rows.
/// </summary>
/// <param name="batchId">Batch the chunk is appended to.</param>
/// <param name="acceptedRows">Rows that parsed cleanly; may be empty but not null.</param>
/// <param name="rejectedRows">Parser errors to keep as audit evidence; may be empty but not null.</param>
/// <param name="ct">Cancels the lookup and the save.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="acceptedRows"/> or <paramref name="rejectedRows"/> is null.
/// </exception>
/// <exception cref="ImportBatchNotFoundException">No batch has this id.</exception>
/// <exception cref="ImportBatchAlreadyFinalisedException">The batch already carries a finalise timestamp.</exception>
public async Task StageRowsAsync(
    Guid batchId,
    IReadOnlyList<EquipmentCsvRow> acceptedRows,
    IReadOnlyList<EquipmentCsvRowError> rejectedRows,
    CancellationToken ct)
{
    // Reject null chunks before touching the database instead of failing later with a
    // NullReferenceException after the batch lookup has already been paid for.
    ArgumentNullException.ThrowIfNull(acceptedRows);
    ArgumentNullException.ThrowIfNull(rejectedRows);

    var batch = await db.EquipmentImportBatches.FirstOrDefaultAsync(b => b.Id == batchId, ct).ConfigureAwait(false)
        ?? throw new ImportBatchNotFoundException($"Batch {batchId} not found.");

    if (batch.FinalisedAtUtc is not null)
    {
        throw new ImportBatchAlreadyFinalisedException(
            $"Batch {batchId} finalised at {batch.FinalisedAtUtc:o}; no more rows can be staged.");
    }

    foreach (var row in acceptedRows)
    {
        db.EquipmentImportRows.Add(new EquipmentImportRow
        {
            Id = Guid.NewGuid(),
            BatchId = batchId,
            IsAccepted = true,
            ZTag = row.ZTag,
            MachineCode = row.MachineCode,
            SAPID = row.SAPID,
            EquipmentId = row.EquipmentId,
            EquipmentUuid = row.EquipmentUuid,
            Name = row.Name,
            UnsAreaName = row.UnsAreaName,
            UnsLineName = row.UnsLineName,
            Manufacturer = row.Manufacturer,
            Model = row.Model,
            SerialNumber = row.SerialNumber,
            HardwareRevision = row.HardwareRevision,
            SoftwareRevision = row.SoftwareRevision,
            YearOfConstruction = row.YearOfConstruction,
            AssetLocation = row.AssetLocation,
            ManufacturerUri = row.ManufacturerUri,
            DeviceManualUri = row.DeviceManualUri,
        });
    }

    foreach (var error in rejectedRows)
    {
        db.EquipmentImportRows.Add(new EquipmentImportRow
        {
            Id = Guid.NewGuid(),
            BatchId = batchId,
            IsAccepted = false,
            RejectReason = error.Reason,
            LineNumberInFile = error.LineNumber,
            // Required columns need values for EF; reject rows use sentinel placeholders.
            ZTag = "", MachineCode = "", SAPID = "", EquipmentId = "", EquipmentUuid = "",
            Name = "", UnsAreaName = "", UnsLineName = "",
        });
    }

    // Counters accumulate across chunks, so each call adds only its own chunk sizes.
    batch.RowsStaged += acceptedRows.Count + rejectedRows.Count;
    batch.RowsAccepted += acceptedRows.Count;
    batch.RowsRejected += rejectedRows.Count;

    // Rows and counters go out in the same SaveChanges call.
    await db.SaveChangesAsync(ct).ConfigureAwait(false);
}
|
|
|
|
/// <summary>
/// Pre-finalise rollback: deletes the batch header. Staged rows are expected to disappear
/// with it through the cascaded delete.
/// </summary>
/// <param name="batchId">Batch to remove. An unknown id is treated as "nothing to do".</param>
/// <param name="ct">Cancels the lookup and the delete.</param>
/// <exception cref="ImportBatchAlreadyFinalisedException">
/// The batch already carries a finalise timestamp and therefore cannot be dropped.
/// </exception>
public async Task DropBatchAsync(Guid batchId, CancellationToken ct)
{
    var target = await db.EquipmentImportBatches
        .FirstOrDefaultAsync(b => b.Id == batchId, ct)
        .ConfigureAwait(false);

    switch (target)
    {
        case null:
            // Nothing to drop; a missing batch is not an error here.
            return;

        case { FinalisedAtUtc: not null }:
            throw new ImportBatchAlreadyFinalisedException(
                $"Batch {batchId} already finalised at {target.FinalisedAtUtc:o}; cannot drop.");
    }

    db.EquipmentImportBatches.Remove(target);
    await db.SaveChangesAsync(ct).ConfigureAwait(false);
}
|
|
|
|
/// <summary>
/// Atomic finalise. Inserts every accepted row into the live <see cref="Equipment"/> table
/// under the target generation, merges the ZTag / SAPID reservation for each row, and stamps
/// <see cref="EquipmentImportBatch.FinalisedAtUtc"/>. On a relational provider all of this runs
/// inside one transaction; a failure rolls it back and also discards the changes this attempt
/// left in the context's change tracker, so <see cref="Equipment"/> never partially mutates —
/// neither now nor on a later save through the same context.
/// </summary>
/// <param name="batchId">Batch to finalise.</param>
/// <param name="generationId">Generation id written to every inserted <see cref="Equipment"/> row.</param>
/// <param name="driverInstanceIdForRows">Driver instance id written to every inserted row.</param>
/// <param name="unsLineIdForRows">UNS line id written to every inserted row.</param>
/// <param name="ct">Cancels the queries, the save and the commit. The rollback ignores it on purpose.</param>
/// <exception cref="ImportBatchNotFoundException">No batch has this id.</exception>
/// <exception cref="ImportBatchAlreadyFinalisedException">The batch already carries a finalise timestamp.</exception>
/// <exception cref="ExternalIdReservationConflictException">
/// A ZTag / SAPID is actively reserved by a different EquipmentUuid, or the save hit the
/// reservation uniqueness index.
/// </exception>
public async Task FinaliseBatchAsync(
    Guid batchId, long generationId, string driverInstanceIdForRows, string unsLineIdForRows, CancellationToken ct)
{
    var batch = await db.EquipmentImportBatches
        .Include(b => b.Rows)
        .FirstOrDefaultAsync(b => b.Id == batchId, ct)
        .ConfigureAwait(false)
        ?? throw new ImportBatchNotFoundException($"Batch {batchId} not found.");

    if (batch.FinalisedAtUtc is not null)
    {
        throw new ImportBatchAlreadyFinalisedException(
            $"Batch {batchId} already finalised at {batch.FinalisedAtUtc:o}.");
    }

    // EF InMemory provider doesn't honour BeginTransaction; SQL Server provider does.
    // Tests run the happy path under in-memory; production SQL Server runs the atomic tx.
    var supportsTx = db.Database.IsRelational();
    Microsoft.EntityFrameworkCore.Storage.IDbContextTransaction? tx = null;
    if (supportsTx)
    {
        tx = await db.Database.BeginTransactionAsync(ct).ConfigureAwait(false);
    }

    try
    {
        // Snapshot active reservations that overlap this batch's ZTag + SAPID set — one
        // round-trip instead of N. Released rows (ReleasedAt IS NOT NULL) are ignored so
        // an explicitly-released value can be reused.
        var accepted = batch.Rows.Where(r => r.IsAccepted).ToList();
        var zTags = accepted.Where(r => !string.IsNullOrWhiteSpace(r.ZTag))
            .Select(r => r.ZTag).Distinct(StringComparer.OrdinalIgnoreCase).ToList();
        var sapIds = accepted.Where(r => !string.IsNullOrWhiteSpace(r.SAPID))
            .Select(r => r.SAPID).Distinct(StringComparer.OrdinalIgnoreCase).ToList();

        var existingReservations = await db.ExternalIdReservations
            .Where(r => r.ReleasedAt == null &&
                ((r.Kind == ReservationKind.ZTag && zTags.Contains(r.Value)) ||
                 (r.Kind == ReservationKind.SAPID && sapIds.Contains(r.Value))))
            .ToListAsync(ct).ConfigureAwait(false);

        // Keyed by (kind, lower-cased value) — the same key shape MergeReservation looks up.
        var resByKey = existingReservations.ToDictionary(
            r => (r.Kind, r.Value.ToLowerInvariant()),
            r => r);

        var nowUtc = DateTime.UtcNow;
        var firstPublishedBy = batch.CreatedBy;

        foreach (var row in accepted)
        {
            // A staged value that is not a valid GUID gets a freshly generated identity.
            var equipmentUuid = Guid.TryParse(row.EquipmentUuid, out var u) ? u : Guid.NewGuid();

            db.Equipment.Add(new Equipment
            {
                EquipmentRowId = Guid.NewGuid(),
                GenerationId = generationId,
                EquipmentId = row.EquipmentId,
                EquipmentUuid = equipmentUuid,
                DriverInstanceId = driverInstanceIdForRows,
                UnsLineId = unsLineIdForRows,
                Name = row.Name,
                MachineCode = row.MachineCode,
                ZTag = row.ZTag,
                SAPID = row.SAPID,
                Manufacturer = row.Manufacturer,
                Model = row.Model,
                SerialNumber = row.SerialNumber,
                HardwareRevision = row.HardwareRevision,
                SoftwareRevision = row.SoftwareRevision,
                // Machine-readable data: parse culture-invariantly so the result does not
                // depend on the server thread's culture. Unparseable values become null.
                YearOfConstruction = short.TryParse(
                    row.YearOfConstruction,
                    System.Globalization.NumberStyles.Integer,
                    System.Globalization.CultureInfo.InvariantCulture,
                    out var y) ? y : null,
                AssetLocation = row.AssetLocation,
                ManufacturerUri = row.ManufacturerUri,
                DeviceManualUri = row.DeviceManualUri,
            });

            MergeReservation(row.ZTag, ReservationKind.ZTag, equipmentUuid, batch.ClusterId,
                firstPublishedBy, nowUtc, resByKey);
            MergeReservation(row.SAPID, ReservationKind.SAPID, equipmentUuid, batch.ClusterId,
                firstPublishedBy, nowUtc, resByKey);
        }

        batch.FinalisedAtUtc = nowUtc;
        try
        {
            await db.SaveChangesAsync(ct).ConfigureAwait(false);
        }
        catch (DbUpdateException ex) when (IsReservationUniquenessViolation(ex))
        {
            throw new ExternalIdReservationConflictException(
                "Finalise rejected: one or more ZTag/SAPID values were reserved by another operator " +
                "between batch preview and commit. Inspect active reservations + retry after resolving the conflict.",
                ex);
        }

        if (tx is not null)
        {
            await tx.CommitAsync(ct).ConfigureAwait(false);
        }
    }
    catch
    {
        // Forget everything this attempt left in the change tracker (Added Equipment and
        // reservation rows, the in-memory FinalisedAtUtc stamp, bumped LastPublishedAt).
        // Otherwise a later SaveChanges on this same context would persist a partial
        // finalise outside any transaction, or a reload would report the batch as
        // already finalised.
        db.ChangeTracker.Clear();

        // Never roll back with the caller's token: if cancellation is what brought us here,
        // RollbackAsync(ct) would throw immediately and mask the original exception.
        if (tx is not null)
        {
            await tx.RollbackAsync(CancellationToken.None).ConfigureAwait(false);
        }

        throw;
    }
    finally
    {
        if (tx is not null)
        {
            await tx.DisposeAsync().ConfigureAwait(false);
        }
    }
}
|
|
|
|
/// <summary>
/// Reconciles one external-ID value of an equipment row against the reservation cache.
/// Four outcomes:
/// (1) <paramref name="value"/> is null/blank → nothing happens;
/// (2) no cached reservation for (kind, value) → a new reservation is added to the context
///     and to <paramref name="cache"/>;
/// (3) a cached reservation belongs to the same <paramref name="equipmentUuid"/> → only its
///     <c>LastPublishedAt</c> is moved to <paramref name="nowUtc"/>;
/// (4) a cached reservation belongs to a different EquipmentUuid →
///     <see cref="ExternalIdReservationConflictException"/> naming the current owner.
/// </summary>
/// <param name="value">ZTag or SAPID text taken from the staged row.</param>
/// <param name="kind">Which kind of external id <paramref name="value"/> is.</param>
/// <param name="equipmentUuid">Identity of the equipment row claiming the value.</param>
/// <param name="clusterId">Cluster recorded on a newly created reservation.</param>
/// <param name="firstPublishedBy">User recorded on a newly created reservation.</param>
/// <param name="nowUtc">Timestamp used for first/last published stamps.</param>
/// <param name="cache">
/// Reservations keyed by kind and lower-cased value; new reservations are inserted so later
/// rows in the same batch see them.
/// </param>
private void MergeReservation(
    string? value,
    ReservationKind kind,
    Guid equipmentUuid,
    string clusterId,
    string firstPublishedBy,
    DateTime nowUtc,
    Dictionary<(ReservationKind, string), ExternalIdReservation> cache)
{
    if (string.IsNullOrWhiteSpace(value))
    {
        return;
    }

    var lookupKey = (kind, value.ToLowerInvariant());

    if (!cache.TryGetValue(lookupKey, out var held))
    {
        // First claim on this value: reserve it and remember it for the rest of the batch.
        var created = new ExternalIdReservation
        {
            ReservationId = Guid.NewGuid(),
            Kind = kind,
            Value = value,
            EquipmentUuid = equipmentUuid,
            ClusterId = clusterId,
            FirstPublishedAt = nowUtc,
            FirstPublishedBy = firstPublishedBy,
            LastPublishedAt = nowUtc,
        };

        db.ExternalIdReservations.Add(created);
        cache[lookupKey] = created;
        return;
    }

    if (held.EquipmentUuid != equipmentUuid)
    {
        throw new ExternalIdReservationConflictException(
            $"{kind} '{value}' is already reserved by EquipmentUuid {held.EquipmentUuid} " +
            $"(first published {held.FirstPublishedAt:u} on cluster '{held.ClusterId}'). " +
            $"Refusing to re-assign to {equipmentUuid}.");
    }

    // Same owner publishing again: just refresh the last-published stamp.
    held.LastPublishedAt = nowUtc;
}
|
|
|
|
/// <summary>
/// Walks the exception chain starting at <paramref name="ex"/> and reports whether any link
/// is a SQL Server duplicate-key error (number 2601 or 2627) whose message names the
/// filtered-unique index <c>UX_ExternalIdReservation_KindValue_Active</c> — i.e. another
/// transaction won the race between our cache-load and commit.
/// </summary>
/// <param name="ex">The update failure to inspect.</param>
/// <returns><c>true</c> when such a link is found; otherwise <c>false</c>.</returns>
private static bool IsReservationUniquenessViolation(DbUpdateException ex)
{
    Exception? current = ex;

    while (current is not null)
    {
        // The index name is matched case-insensitively inside the provider's message text.
        if (current is Microsoft.Data.SqlClient.SqlException { Number: 2601 or 2627 } sqlError
            && sqlError.Message.Contains("UX_ExternalIdReservation_KindValue_Active", StringComparison.OrdinalIgnoreCase))
        {
            return true;
        }

        current = current.InnerException;
    }

    return false;
}
|
|
|
|
/// <summary>
/// Returns the batches whose creator is <paramref name="createdBy"/>, newest first, read
/// without change tracking.
/// </summary>
/// <param name="createdBy">Creator to filter on (exact match).</param>
/// <param name="includeFinalised">
/// <c>false</c> limits the result to batches that have no finalise timestamp; <c>true</c>
/// returns finalised batches as well.
/// </param>
/// <param name="ct">Cancels the query.</param>
/// <returns>Matching batches ordered by <c>CreatedAtUtc</c> descending.</returns>
public async Task<IReadOnlyList<EquipmentImportBatch>> ListByUserAsync(string createdBy, bool includeFinalised, CancellationToken ct)
{
    var ownedByUser = db.EquipmentImportBatches
        .AsNoTracking()
        .Where(b => b.CreatedBy == createdBy);

    // The finalised filter is only appended when the caller wants open batches.
    var visible = includeFinalised
        ? ownedByUser
        : ownedByUser.Where(b => b.FinalisedAtUtc == null);

    var newestFirst = visible.OrderByDescending(b => b.CreatedAtUtc);

    return await newestFirst.ToListAsync(ct).ConfigureAwait(false);
}
|
|
}
|
|
|
|
/// <summary>Raised when an import batch id does not match any stored batch.</summary>
public sealed class ImportBatchNotFoundException : Exception
{
    /// <summary>Creates the exception with the text shown to the caller.</summary>
    /// <param name="message">Description of which batch could not be found.</param>
    public ImportBatchNotFoundException(string message)
        : base(message)
    {
    }
}
|
|
/// <summary>
/// Raised when an operation that is only valid before finalise (staging rows, dropping,
/// finalising) targets a batch that already carries a finalise timestamp.
/// </summary>
public sealed class ImportBatchAlreadyFinalisedException : Exception
{
    /// <summary>Creates the exception with the text shown to the caller.</summary>
    /// <param name="message">Description of the batch and when it was finalised.</param>
    public ImportBatchAlreadyFinalisedException(string message)
        : base(message)
    {
    }
}
|
|
|
|
/// <summary>
/// Signals that <c>FinaliseBatchAsync</c> found one of its ZTag/SAPID values already reserved
/// by a different EquipmentUuid — either left over from a prior published generation or taken
/// by a concurrent finalise that won the race. The message carries the conflicting ownership
/// so the operator can resolve it (pick a new ZTag, release the existing reservation via
/// <c>sp_ReleaseExternalIdReservation</c>, etc.) and retry the finalise.
/// </summary>
public sealed class ExternalIdReservationConflictException : Exception
{
    /// <summary>Creates the exception for a conflict detected in memory.</summary>
    /// <param name="message">Operator-facing description of the conflict.</param>
    public ExternalIdReservationConflictException(string message)
        : base(message)
    {
    }

    /// <summary>Creates the exception for a conflict surfaced by a lower-level failure.</summary>
    /// <param name="message">Operator-facing description of the conflict.</param>
    /// <param name="inner">The underlying exception that revealed the conflict.</param>
    public ExternalIdReservationConflictException(string message, Exception inner)
        : base(message, inner)
    {
    }
}
|