using Microsoft.EntityFrameworkCore; using ZB.MOM.WW.OtOpcUa.Admin.Services; using ZB.MOM.WW.OtOpcUa.Configuration; using ZB.MOM.WW.OtOpcUa.Configuration.Entities; using ZB.MOM.WW.OtOpcUa.Configuration.Enums; namespace ZB.MOM.WW.OtOpcUa.Admin.Services; /// /// Staged-import orchestrator per Phase 6.4 Stream B.2-B.4. Covers the four operator /// actions: CreateBatch → StageRows (chunked) → FinaliseBatch (atomic apply into /// ) → DropBatch (rollback of pre-finalise state). /// /// /// FinaliseBatch runs inside one EF transaction + bulk-inserts accepted rows into /// . Rejected rows stay behind as audit evidence; the batch row /// gains so future writes know it's /// archived. DropBatch removes the batch + its cascaded rows. /// /// Idempotence: calling FinaliseBatch twice throws /// rather than double-inserting. Operator refreshes the admin page to see the first /// finalise completed. /// /// ExternalIdReservation merging (ZTag + SAPID uniqueness) is NOT done here — a /// narrower follow-up wires it once the concurrent-insert test matrix is green. /// public sealed class EquipmentImportBatchService(OtOpcUaConfigDbContext db) { /// Create a new empty batch header. Returns the row with Id populated. public async Task CreateBatchAsync(string clusterId, string createdBy, CancellationToken ct) { ArgumentException.ThrowIfNullOrWhiteSpace(clusterId); ArgumentException.ThrowIfNullOrWhiteSpace(createdBy); var batch = new EquipmentImportBatch { Id = Guid.NewGuid(), ClusterId = clusterId, CreatedBy = createdBy, CreatedAtUtc = DateTime.UtcNow, }; db.EquipmentImportBatches.Add(batch); await db.SaveChangesAsync(ct).ConfigureAwait(false); return batch; } /// /// Stage one chunk of rows into the batch. Caller usually feeds /// output here — each /// becomes one accepted , /// each rejected parser error becomes one row with false. /// public async Task StageRowsAsync( Guid batchId, IReadOnlyList acceptedRows, IReadOnlyList rejectedRows, CancellationToken ct) { var batch = await db.EquipmentImportBatches.FirstOrDefaultAsync(b => b.Id == batchId, ct).ConfigureAwait(false) ?? throw new ImportBatchNotFoundException($"Batch {batchId} not found."); if (batch.FinalisedAtUtc is not null) throw new ImportBatchAlreadyFinalisedException( $"Batch {batchId} finalised at {batch.FinalisedAtUtc:o}; no more rows can be staged."); foreach (var row in acceptedRows) { db.EquipmentImportRows.Add(new EquipmentImportRow { Id = Guid.NewGuid(), BatchId = batchId, IsAccepted = true, ZTag = row.ZTag, MachineCode = row.MachineCode, SAPID = row.SAPID, EquipmentId = row.EquipmentId, EquipmentUuid = row.EquipmentUuid, Name = row.Name, UnsAreaName = row.UnsAreaName, UnsLineName = row.UnsLineName, Manufacturer = row.Manufacturer, Model = row.Model, SerialNumber = row.SerialNumber, HardwareRevision = row.HardwareRevision, SoftwareRevision = row.SoftwareRevision, YearOfConstruction = row.YearOfConstruction, AssetLocation = row.AssetLocation, ManufacturerUri = row.ManufacturerUri, DeviceManualUri = row.DeviceManualUri, }); } foreach (var error in rejectedRows) { db.EquipmentImportRows.Add(new EquipmentImportRow { Id = Guid.NewGuid(), BatchId = batchId, IsAccepted = false, RejectReason = error.Reason, LineNumberInFile = error.LineNumber, // Required columns need values for EF; reject rows use sentinel placeholders. ZTag = "", MachineCode = "", SAPID = "", EquipmentId = "", EquipmentUuid = "", Name = "", UnsAreaName = "", UnsLineName = "", }); } batch.RowsStaged += acceptedRows.Count + rejectedRows.Count; batch.RowsAccepted += acceptedRows.Count; batch.RowsRejected += rejectedRows.Count; await db.SaveChangesAsync(ct).ConfigureAwait(false); } /// Drop the batch (pre-finalise rollback). Cascaded row delete removes staged rows. public async Task DropBatchAsync(Guid batchId, CancellationToken ct) { var batch = await db.EquipmentImportBatches.FirstOrDefaultAsync(b => b.Id == batchId, ct).ConfigureAwait(false); if (batch is null) return; if (batch.FinalisedAtUtc is not null) throw new ImportBatchAlreadyFinalisedException( $"Batch {batchId} already finalised at {batch.FinalisedAtUtc:o}; cannot drop."); db.EquipmentImportBatches.Remove(batch); await db.SaveChangesAsync(ct).ConfigureAwait(false); } /// /// Atomic finalise. Inserts every accepted row into the live /// table under the target generation + stamps /// . Failure rolls the whole tx /// back — never partially mutates. /// public async Task FinaliseBatchAsync( Guid batchId, long generationId, string driverInstanceIdForRows, string unsLineIdForRows, CancellationToken ct) { var batch = await db.EquipmentImportBatches .Include(b => b.Rows) .FirstOrDefaultAsync(b => b.Id == batchId, ct) .ConfigureAwait(false) ?? throw new ImportBatchNotFoundException($"Batch {batchId} not found."); if (batch.FinalisedAtUtc is not null) throw new ImportBatchAlreadyFinalisedException( $"Batch {batchId} already finalised at {batch.FinalisedAtUtc:o}."); // EF InMemory provider doesn't honour BeginTransaction; SQL Server provider does. // Tests run the happy path under in-memory; production SQL Server runs the atomic tx. var supportsTx = db.Database.IsRelational(); Microsoft.EntityFrameworkCore.Storage.IDbContextTransaction? tx = null; if (supportsTx) tx = await db.Database.BeginTransactionAsync(ct).ConfigureAwait(false); try { // Snapshot active reservations that overlap this batch's ZTag + SAPID set — one // round-trip instead of N. Released rows (ReleasedAt IS NOT NULL) are ignored so // an explicitly-released value can be reused. var accepted = batch.Rows.Where(r => r.IsAccepted).ToList(); var zTags = accepted.Where(r => !string.IsNullOrWhiteSpace(r.ZTag)) .Select(r => r.ZTag).Distinct(StringComparer.OrdinalIgnoreCase).ToList(); var sapIds = accepted.Where(r => !string.IsNullOrWhiteSpace(r.SAPID)) .Select(r => r.SAPID).Distinct(StringComparer.OrdinalIgnoreCase).ToList(); var existingReservations = await db.ExternalIdReservations .Where(r => r.ReleasedAt == null && ((r.Kind == ReservationKind.ZTag && zTags.Contains(r.Value)) || (r.Kind == ReservationKind.SAPID && sapIds.Contains(r.Value)))) .ToListAsync(ct).ConfigureAwait(false); var resByKey = existingReservations.ToDictionary( r => (r.Kind, r.Value.ToLowerInvariant()), r => r); var nowUtc = DateTime.UtcNow; var firstPublishedBy = batch.CreatedBy; foreach (var row in accepted) { var equipmentUuid = Guid.TryParse(row.EquipmentUuid, out var u) ? u : Guid.NewGuid(); db.Equipment.Add(new Equipment { EquipmentRowId = Guid.NewGuid(), GenerationId = generationId, EquipmentId = row.EquipmentId, EquipmentUuid = equipmentUuid, DriverInstanceId = driverInstanceIdForRows, UnsLineId = unsLineIdForRows, Name = row.Name, MachineCode = row.MachineCode, ZTag = row.ZTag, SAPID = row.SAPID, Manufacturer = row.Manufacturer, Model = row.Model, SerialNumber = row.SerialNumber, HardwareRevision = row.HardwareRevision, SoftwareRevision = row.SoftwareRevision, YearOfConstruction = short.TryParse(row.YearOfConstruction, out var y) ? y : null, AssetLocation = row.AssetLocation, ManufacturerUri = row.ManufacturerUri, DeviceManualUri = row.DeviceManualUri, }); MergeReservation(row.ZTag, ReservationKind.ZTag, equipmentUuid, batch.ClusterId, firstPublishedBy, nowUtc, resByKey); MergeReservation(row.SAPID, ReservationKind.SAPID, equipmentUuid, batch.ClusterId, firstPublishedBy, nowUtc, resByKey); } batch.FinalisedAtUtc = nowUtc; try { await db.SaveChangesAsync(ct).ConfigureAwait(false); } catch (DbUpdateException ex) when (IsReservationUniquenessViolation(ex)) { throw new ExternalIdReservationConflictException( "Finalise rejected: one or more ZTag/SAPID values were reserved by another operator " + "between batch preview and commit. Inspect active reservations + retry after resolving the conflict.", ex); } if (tx is not null) await tx.CommitAsync(ct).ConfigureAwait(false); } catch { if (tx is not null) await tx.RollbackAsync(ct).ConfigureAwait(false); throw; } finally { if (tx is not null) await tx.DisposeAsync().ConfigureAwait(false); } } /// /// Merge one external-ID reservation for an equipment row. Three outcomes: /// (1) value is empty → skip; (2) reservation exists for same /// → bump LastPublishedAt; (3) reservation exists for a different EquipmentUuid /// → throw with the conflicting UUID /// so the caller sees which equipment already owns the value; (4) no reservation → create new. /// private void MergeReservation( string? value, ReservationKind kind, Guid equipmentUuid, string clusterId, string firstPublishedBy, DateTime nowUtc, Dictionary<(ReservationKind, string), ExternalIdReservation> cache) { if (string.IsNullOrWhiteSpace(value)) return; var key = (kind, value.ToLowerInvariant()); if (cache.TryGetValue(key, out var existing)) { if (existing.EquipmentUuid != equipmentUuid) throw new ExternalIdReservationConflictException( $"{kind} '{value}' is already reserved by EquipmentUuid {existing.EquipmentUuid} " + $"(first published {existing.FirstPublishedAt:u} on cluster '{existing.ClusterId}'). " + $"Refusing to re-assign to {equipmentUuid}."); existing.LastPublishedAt = nowUtc; return; } var fresh = new ExternalIdReservation { ReservationId = Guid.NewGuid(), Kind = kind, Value = value, EquipmentUuid = equipmentUuid, ClusterId = clusterId, FirstPublishedAt = nowUtc, FirstPublishedBy = firstPublishedBy, LastPublishedAt = nowUtc, }; db.ExternalIdReservations.Add(fresh); cache[key] = fresh; } /// /// True when the root-cause was the filtered-unique /// index UX_ExternalIdReservation_KindValue_Active — i.e. another transaction /// won the race between our cache-load + commit. SQL Server surfaces this as 2601 / 2627. /// private static bool IsReservationUniquenessViolation(DbUpdateException ex) { for (Exception? inner = ex; inner is not null; inner = inner.InnerException) { if (inner is Microsoft.Data.SqlClient.SqlException sql && (sql.Number == 2601 || sql.Number == 2627) && sql.Message.Contains("UX_ExternalIdReservation_KindValue_Active", StringComparison.OrdinalIgnoreCase)) { return true; } } return false; } /// List batches created by the given user. Finalised batches are archived; include them on demand. public async Task> ListByUserAsync(string createdBy, bool includeFinalised, CancellationToken ct) { var query = db.EquipmentImportBatches.AsNoTracking().Where(b => b.CreatedBy == createdBy); if (!includeFinalised) query = query.Where(b => b.FinalisedAtUtc == null); return await query.OrderByDescending(b => b.CreatedAtUtc).ToListAsync(ct).ConfigureAwait(false); } } public sealed class ImportBatchNotFoundException(string message) : Exception(message); public sealed class ImportBatchAlreadyFinalisedException(string message) : Exception(message); /// /// Thrown when a FinaliseBatchAsync call detects that one of its ZTag/SAPID values is /// already reserved by a different EquipmentUuid — either from a prior published generation /// or a concurrent finalise that won the race. The operator sees the message + the conflicting /// equipment ownership so they can resolve the conflict (pick a new ZTag, release the existing /// reservation via sp_ReleaseExternalIdReservation, etc.) and retry the finalise. /// public sealed class ExternalIdReservationConflictException : Exception { public ExternalIdReservationConflictException(string message) : base(message) { } public ExternalIdReservationConflictException(string message, Exception inner) : base(message, inner) { } }