356 lines
14 KiB
C#
356 lines
14 KiB
C#
using Microsoft.Extensions.Logging;
|
|
using ZB.MOM.WW.GalaxyRepository.Grpc;
|
|
|
|
namespace ZB.MOM.WW.GalaxyRepository;
|
|
|
|
/// <summary>
|
|
/// Server-side cache of Galaxy Repository browse data. All gRPC clients share the same
|
|
/// entry — the materialized object list is produced once per refresh and reused across
|
|
/// requests. Refreshes are deploy-time gated: every tick queries
|
|
/// <c>galaxy.time_of_last_deploy</c> (cheap), and the heavy hierarchy + attributes rowsets
|
|
/// are pulled only when that timestamp has advanced.
|
|
/// Each successful heavy refresh is persisted to disk through
|
|
/// <see cref="IGalaxyHierarchySnapshotStore"/>; the first refresh restores that
|
|
/// snapshot (as <see cref="GalaxyCacheStatus.Stale"/>) so clients can browse
|
|
/// last-known data when the Galaxy database is unreachable on a cold start.
|
|
/// </summary>
|
|
public sealed class GalaxyHierarchyCache : IGalaxyHierarchyCache
|
|
{
|
|
/// <summary>
/// Maximum age of an entry's last successful refresh before <see cref="ProjectStatus"/>
/// reports it as <see cref="GalaxyCacheStatus.Stale"/>.
/// </summary>
private static readonly TimeSpan StaleThreshold = TimeSpan.FromMinutes(5);

// Collaborators supplied through the constructor. The snapshot store and logger are optional.
private readonly IGalaxyRepository _repository;
private readonly IGalaxyDeployNotifier _notifier;
private readonly IGalaxyHierarchySnapshotStore? _snapshotStore;
private readonly TimeProvider _timeProvider;
private readonly ILogger<GalaxyHierarchyCache>? _logger;

// Completed (via TrySetResult) by the first restore or refresh outcome; awaited by
// WaitForFirstLoadAsync. Continuations run asynchronously so they never execute inline
// on the refresh path.
private readonly TaskCompletionSource _firstLoad =
    new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

// Single-permit gate taken by RefreshAsync so only one refresh runs at a time.
private readonly SemaphoreSlim _refreshGate = new SemaphoreSlim(1, 1);

// Latest cache entry. Read and written through Volatile.Read / Volatile.Write.
private GalaxyHierarchyCacheEntry _current = GalaxyHierarchyCacheEntry.Empty;

// Set by the first refresh so the on-disk restore is attempted only once.
private bool _restoreAttempted;
|
|
|
|
/// <summary>Initializes a new instance of the <see cref="GalaxyHierarchyCache"/> class.</summary>
/// <param name="repository">Galaxy Repository client for SQL queries. Must not be <see langword="null"/>.</param>
/// <param name="notifier">Galaxy deploy event notifier. Must not be <see langword="null"/>.</param>
/// <param name="timeProvider">Provider for current time; defaults to system time.</param>
/// <param name="logger">Optional logger for diagnostic output.</param>
/// <param name="snapshotStore">
/// Optional on-disk snapshot store. When supplied, the cache persists each
/// successful refresh and restores the last snapshot on first load.
/// </param>
/// <exception cref="ArgumentNullException">
/// <paramref name="repository"/> or <paramref name="notifier"/> is <see langword="null"/>.
/// </exception>
public GalaxyHierarchyCache(
    IGalaxyRepository repository,
    IGalaxyDeployNotifier notifier,
    TimeProvider? timeProvider = null,
    ILogger<GalaxyHierarchyCache>? logger = null,
    IGalaxyHierarchySnapshotStore? snapshotStore = null)
{
    // Fail fast on missing required collaborators. Without these guards a null
    // dependency only surfaces later as a NullReferenceException inside the refresh,
    // where the catch-all handler reports it as an ordinary failed refresh on every
    // tick and the wiring mistake is easy to miss.
    ArgumentNullException.ThrowIfNull(repository);
    ArgumentNullException.ThrowIfNull(notifier);

    _repository = repository;
    _notifier = notifier;
    _timeProvider = timeProvider ?? TimeProvider.System;
    _logger = logger;
    _snapshotStore = snapshotStore;
}
|
|
|
|
/// <summary>
/// Gets the latest cache entry, with its status re-evaluated against the current time.
/// </summary>
/// <remarks>
/// The stored entry is never modified by this getter; when the projected status differs
/// from the stored one, a copy carrying the projected status is returned instead.
/// </remarks>
public GalaxyHierarchyCacheEntry Current
{
    get
    {
        GalaxyHierarchyCacheEntry entry = Volatile.Read(ref _current);
        GalaxyCacheStatus effective = ProjectStatus(entry);

        if (effective != entry.Status)
        {
            return entry with { Status = effective };
        }

        return entry;
    }
}
|
|
|
|
/// <summary>
/// Runs one refresh pass while holding the refresh gate, so overlapping calls execute
/// one after another rather than concurrently.
/// </summary>
/// <param name="cancellationToken">
/// Token observed both while waiting for the gate and during the refresh itself.
/// </param>
/// <returns>A task that completes when this refresh pass has finished.</returns>
public async Task RefreshAsync(CancellationToken cancellationToken)
{
    // The gate is acquired before entering the try block: if the wait is cancelled,
    // no permit was taken and the finally block must not release one.
    await _refreshGate.WaitAsync(cancellationToken).ConfigureAwait(false);

    try
    {
        await RefreshCoreAsync(cancellationToken).ConfigureAwait(false);
    }
    finally
    {
        _refreshGate.Release();
    }
}
|
|
|
|
/// <summary>
/// Waits until the cache has produced its first outcome (the first-load signal has been set).
/// </summary>
/// <param name="cancellationToken">Token that abandons the wait; it does not affect the load itself.</param>
/// <returns>A task that completes once the first-load signal is set, or is cancelled with the token.</returns>
public Task WaitForFirstLoadAsync(CancellationToken cancellationToken)
    => _firstLoad.Task.WaitAsync(cancellationToken);
|
|
|
|
/// <summary>
/// Performs a single refresh pass. Must be called while holding the refresh gate.
/// </summary>
/// <remarks>
/// The pass queries the last deploy time first. If the cache already holds data for
/// that same deploy time, only the bookkeeping timestamps are updated. Otherwise the
/// hierarchy and attribute rowsets are fetched, a new entry is published, subscribers
/// are notified and the result is persisted. Any non-cancellation failure is recorded
/// on the entry instead of being thrown.
/// </remarks>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
private async Task RefreshCoreAsync(CancellationToken cancellationToken)
{
    // One-time seeding from the on-disk snapshot, ahead of any SQL, so a cold start
    // against an unreachable Galaxy database can still serve last-known browse data.
    if (!_restoreAttempted)
    {
        _restoreAttempted = true;
        await TryRestoreFromDiskAsync(cancellationToken).ConfigureAwait(false);
    }

    GalaxyHierarchyCacheEntry baseline = Volatile.Read(ref _current);
    DateTimeOffset now = _timeProvider.GetUtcNow();

    try
    {
        DateTime? rawDeployTime = await _repository.GetLastDeployTimeAsync(cancellationToken).ConfigureAwait(false);

        DateTimeOffset? observedDeployTime = null;
        if (rawDeployTime is { } raw)
        {
            observedDeployTime = new DateTimeOffset(DateTime.SpecifyKind(raw, DateTimeKind.Utc));
        }

        // The heavy queries are skipped only when data is already cached AND it was
        // produced for exactly the deploy time just observed.
        bool deployUnchanged = baseline.HasData && observedDeployTime == baseline.LastDeployTime;
        if (deployUnchanged)
        {
            GalaxyHierarchyCacheEntry confirmed = baseline with
            {
                Status = GalaxyCacheStatus.Healthy,
                LastQueriedAt = now,
                LastSuccessAt = now,
                LastError = null,
            };
            Volatile.Write(ref _current, confirmed);
            _firstLoad.TrySetResult();
            return;
        }

        // Start both rowset queries before awaiting either so they run concurrently.
        Task<List<GalaxyHierarchyRow>> hierarchyQuery = _repository.GetHierarchyAsync(cancellationToken);
        Task<List<GalaxyAttributeRow>> attributesQuery = _repository.GetAttributesAsync(cancellationToken);
        await Task.WhenAll(hierarchyQuery, attributesQuery).ConfigureAwait(false);

        // Both tasks have already completed successfully; these awaits only unwrap results.
        List<GalaxyHierarchyRow> hierarchyRows = await hierarchyQuery.ConfigureAwait(false);
        List<GalaxyAttributeRow> attributeRows = await attributesQuery.ConfigureAwait(false);

        long sequence = baseline.Sequence + 1;
        GalaxyHierarchyCacheEntry loaded = BuildEntry(
            status: GalaxyCacheStatus.Healthy,
            sequence: sequence,
            lastQueriedAt: now,
            lastSuccessAt: now,
            lastDeployTime: observedDeployTime,
            lastError: null,
            hierarchy: hierarchyRows,
            attributes: attributeRows);

        Volatile.Write(ref _current, loaded);
        _firstLoad.TrySetResult();

        GalaxyDeployEventInfo deployEvent = new GalaxyDeployEventInfo(
            Sequence: sequence,
            ObservedAt: now,
            TimeOfLastDeploy: observedDeployTime,
            ObjectCount: hierarchyRows.Count,
            AttributeCount: attributeRows.Count);
        _notifier.Publish(deployEvent);

        await PersistSnapshotAsync(observedDeployTime, now, hierarchyRows, attributeRows, cancellationToken).ConfigureAwait(false);
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // Caller-requested cancellation is not a refresh failure; let it propagate.
        throw;
    }
    catch (Exception exception)
    {
        // Every other failure (not only SqlException / InvalidOperationException, but
        // also e.g. TimeoutException, Win32Exception or other DbException subtypes) is
        // recorded on the entry so the cache degrades to Stale/Unavailable and
        // _firstLoad completes, rather than escaping and faulting the refresh
        // BackgroundService.
        _logger?.LogWarning(exception, "Galaxy hierarchy cache refresh failed.");

        GalaxyCacheStatus degradedStatus = baseline.HasData
            ? GalaxyCacheStatus.Stale
            : GalaxyCacheStatus.Unavailable;

        GalaxyHierarchyCacheEntry degraded = baseline with
        {
            Status = degradedStatus,
            LastQueriedAt = now,
            LastError = exception.Message,
        };
        Volatile.Write(ref _current, degraded);
        _firstLoad.TrySetResult();
    }
}
|
|
|
|
/// <summary>
/// Creates a fully populated <see cref="GalaxyHierarchyCacheEntry"/> from raw hierarchy
/// and attribute rows: the object list, its index, and the summary counts. Used by both
/// the live refresh and the on-disk restore so the two paths materialize data the same way.
/// </summary>
/// <param name="status">Status to stamp on the entry.</param>
/// <param name="sequence">Sequence number to stamp on the entry.</param>
/// <param name="lastQueriedAt">Value for the entry's last-queried timestamp.</param>
/// <param name="lastSuccessAt">Value for the entry's last-success timestamp.</param>
/// <param name="lastDeployTime">Deploy time the rows correspond to, if known.</param>
/// <param name="lastError">Error text to record, or <see langword="null"/>.</param>
/// <param name="hierarchy">Hierarchy rows, one per object.</param>
/// <param name="attributes">Attribute rows for all objects.</param>
/// <returns>The materialized cache entry.</returns>
private static GalaxyHierarchyCacheEntry BuildEntry(
    GalaxyCacheStatus status,
    long sequence,
    DateTimeOffset? lastQueriedAt,
    DateTimeOffset? lastSuccessAt,
    DateTimeOffset? lastDeployTime,
    string? lastError,
    IReadOnlyList<GalaxyHierarchyRow> hierarchy,
    IReadOnlyList<GalaxyAttributeRow> attributes)
{
    IReadOnlyList<GalaxyObject> materialized = BuildObjects(hierarchy, attributes);
    GalaxyHierarchyIndex lookup = GalaxyHierarchyIndex.Build(materialized);

    // Summary counters, tallied directly from the raw rows.
    int areas = 0;
    foreach (GalaxyHierarchyRow hierarchyRow in hierarchy)
    {
        if (hierarchyRow.IsArea)
        {
            areas++;
        }
    }

    int historizedAttributes = 0;
    int alarmAttributes = 0;
    foreach (GalaxyAttributeRow attributeRow in attributes)
    {
        if (attributeRow.IsHistorized)
        {
            historizedAttributes++;
        }

        if (attributeRow.IsAlarm)
        {
            alarmAttributes++;
        }
    }

    return new GalaxyHierarchyCacheEntry(
        Status: status,
        Sequence: sequence,
        LastQueriedAt: lastQueriedAt,
        LastSuccessAt: lastSuccessAt,
        LastDeployTime: lastDeployTime,
        LastError: lastError,
        Objects: materialized,
        Index: lookup,
        ObjectCount: hierarchy.Count,
        AreaCount: areas,
        AttributeCount: attributes.Count,
        HistorizedAttributeCount: historizedAttributes,
        AlarmAttributeCount: alarmAttributes);
}
|
|
|
|
/// <summary>
/// Seeds the cache from the on-disk snapshot when no live data has loaded yet.
/// The restored entry is marked <see cref="GalaxyCacheStatus.Stale"/> — it is
/// last-known data, not live. A later refresh that observes the same deploy
/// time promotes it to healthy; one that observes a newer deploy replaces it.
/// </summary>
/// <remarks>
/// The restore is best-effort: a snapshot that cannot be loaded, or that loads but
/// cannot be materialized into a cache entry, is logged as a warning and ignored so
/// the live refresh still runs. Only caller-requested cancellation propagates.
/// </remarks>
/// <param name="cancellationToken">Token to cancel the asynchronous operation.</param>
private async Task TryRestoreFromDiskAsync(CancellationToken cancellationToken)
{
    if (_snapshotStore is null)
    {
        return;
    }

    if (Volatile.Read(ref _current).HasData)
    {
        return;
    }

    GalaxyHierarchySnapshot snapshot;
    GalaxyHierarchyCacheEntry restored;
    long sequence;
    try
    {
        GalaxyHierarchySnapshot? loaded = await _snapshotStore.TryLoadAsync(cancellationToken).ConfigureAwait(false);
        if (loaded is null)
        {
            return;
        }

        snapshot = loaded;
        sequence = Volatile.Read(ref _current).Sequence + 1;

        // Materialization happens inside the guarded region on purpose: the rows come
        // from disk, and if building the object list or index throws for them (for
        // example because a row collection is missing), that must be treated like any
        // other failed restore instead of propagating out of the refresh.
        restored = BuildEntry(
            status: GalaxyCacheStatus.Stale,
            sequence: sequence,
            lastQueriedAt: snapshot.SavedAt,
            lastSuccessAt: snapshot.SavedAt,
            lastDeployTime: snapshot.LastDeployTime,
            lastError: null,
            hierarchy: snapshot.Hierarchy,
            attributes: snapshot.Attributes);
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        throw;
    }
    catch (Exception exception)
    {
        _logger?.LogWarning(exception, "Failed to restore the Galaxy hierarchy from the on-disk snapshot.");
        return;
    }

    Volatile.Write(ref _current, restored);

    // Restored data is a valid completed first load: unblock callers waiting on
    // the bootstrap gate immediately, rather than making them wait out the full
    // wait budget for a live query that — when the database is unreachable, the
    // scenario this restore exists for — may not return for seconds.
    _firstLoad.TrySetResult();

    _notifier.Publish(new GalaxyDeployEventInfo(
        Sequence: sequence,
        ObservedAt: _timeProvider.GetUtcNow(),
        TimeOfLastDeploy: snapshot.LastDeployTime,
        ObjectCount: snapshot.Hierarchy.Count,
        AttributeCount: snapshot.Attributes.Count));

    _logger?.LogInformation(
        "Restored Galaxy hierarchy from on-disk snapshot saved {SavedAt:o}: {ObjectCount} objects, {AttributeCount} attributes (status Stale until the Galaxy database confirms).",
        snapshot.SavedAt,
        snapshot.Hierarchy.Count,
        snapshot.Attributes.Count);
}
|
|
|
|
/// <summary>
/// Writes the rows of a successful refresh to the snapshot store, if one is configured.
/// This never fails the refresh: write errors are logged as warnings and suppressed,
/// and a cancellation requested through <paramref name="cancellationToken"/> is
/// suppressed silently.
/// </summary>
/// <param name="deployTime">Deploy time the rows correspond to, if known.</param>
/// <param name="savedAt">Timestamp recorded on the snapshot.</param>
/// <param name="hierarchy">Hierarchy rows to persist.</param>
/// <param name="attributes">Attribute rows to persist.</param>
/// <param name="cancellationToken">Token to cancel the write.</param>
private async Task PersistSnapshotAsync(
    DateTimeOffset? deployTime,
    DateTimeOffset savedAt,
    IReadOnlyList<GalaxyHierarchyRow> hierarchy,
    IReadOnlyList<GalaxyAttributeRow> attributes,
    CancellationToken cancellationToken)
{
    if (_snapshotStore is not { } store)
    {
        // No store configured: persistence is simply disabled.
        return;
    }

    try
    {
        GalaxyHierarchySnapshot snapshot = new GalaxyHierarchySnapshot(deployTime, savedAt, hierarchy, attributes);
        await store.SaveAsync(snapshot, cancellationToken).ConfigureAwait(false);
    }
    catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
    {
        // The refresh was cancelled (service shutdown) before the write finished.
        // That is not a persistence failure, so it is not logged as a warning.
    }
    catch (Exception exception)
    {
        _logger?.LogWarning(exception, "Failed to persist the Galaxy hierarchy snapshot to disk.");
    }
}
|
|
|
|
/// <summary>
/// Maps every hierarchy row to a <see cref="GalaxyObject"/>, giving the mapper a lookup
/// of attribute rows grouped by <c>GobjectId</c>.
/// </summary>
/// <param name="hierarchy">Hierarchy rows; the result has one object per row, in the same order.</param>
/// <param name="attributes">Attribute rows for all objects.</param>
/// <returns>The mapped objects.</returns>
private static IReadOnlyList<GalaxyObject> BuildObjects(
    IReadOnlyList<GalaxyHierarchyRow> hierarchy,
    IReadOnlyList<GalaxyAttributeRow> attributes)
{
    // Group once up front so the mapper can look attributes up by object id.
    Dictionary<int, List<GalaxyAttributeRow>> attributeLookup = attributes
        .GroupBy(attribute => attribute.GobjectId)
        .ToDictionary(group => group.Key, group => group.ToList());

    int rowCount = hierarchy.Count;
    List<GalaxyObject> mapped = new List<GalaxyObject>(rowCount);
    for (int i = 0; i < rowCount; i++)
    {
        mapped.Add(GalaxyProtoMapper.MapObject(hierarchy[i], attributeLookup));
    }

    return mapped;
}
|
|
|
|
/// <summary>
/// Computes the status to report for an entry at the current time. An entry whose last
/// success is older than <see cref="StaleThreshold"/> is reported as
/// <see cref="GalaxyCacheStatus.Stale"/>; otherwise the stored status is reported as-is.
/// </summary>
/// <param name="snapshot">Entry whose status is being projected.</param>
/// <returns>The status that should be exposed to callers.</returns>
private GalaxyCacheStatus ProjectStatus(GalaxyHierarchyCacheEntry snapshot)
{
    GalaxyCacheStatus recorded = snapshot.Status;

    // Unknown and Unavailable are reported unchanged, whatever the age of the entry.
    if (recorded == GalaxyCacheStatus.Unknown || recorded == GalaxyCacheStatus.Unavailable)
    {
        return recorded;
    }

    // Without a recorded success there is no age to measure.
    if (snapshot.LastSuccessAt is not { } lastSuccess)
    {
        return recorded;
    }

    TimeSpan age = _timeProvider.GetUtcNow() - lastSuccess;
    return age > StaleThreshold ? GalaxyCacheStatus.Stale : recorded;
}
|
|
}
|