Strip JDE/CMS data-sync from OLD/ for v5 POC

Remove JDE/CMS source-system integration: JDE/CMS query classes, SQL files,
WorkerService UpdateProcessor pipeline, dsconfig data-source configs, and
Oracle/Sybase/DDTek driver references. WorkProcessor now goes straight to
processing queued searches against the existing local SQL Server cache; DB
schema (DataUpdate table, MatchMis function) is left intact.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
Joseph Doherty
2026-05-04 08:25:08 -04:00
parent a6c4cc2173
commit 427c488cd6
99 changed files with 102 additions and 3814 deletions
@@ -1,158 +0,0 @@
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using Dapper;
using DataModel.Models;
using DataModel.Process;
using WorkerService.Models;
namespace WorkerService.Process
{
/// <summary>
/// Data update entry management functionality for data update processor
/// </summary>
public partial class UpdateProcessor
{
/// <summary>
/// SQL to close any open update entries (identified by number records = -2)
/// </summary>
private const string SQL_CLOSE_OPEN_UPDATE_ENTRIES = @"
UPDATE dbo.DataUpdate
SET EndDT = GETDATE(), WasSuccessful = 0, NumberRecords = -1
WHERE NumberRecords = -2";
/// <summary>
/// Closes any open data update entries
/// </summary>
private static void CloseOpenUpdateEntries()
{
try
{
using (SqlConnection connection = LotFinderDB.GetConnection())
{
//Close any open update entries
connection.Execute(SQL_CLOSE_OPEN_UPDATE_ENTRIES);
}
}
catch (Exception error)
{
//Log but do not forward error
logger.Error("CloseOpenUpdateEntries: failed to close open data update entries: {0}.", error.Message);
}
}
/// <summary>
/// SQL to purge data update records
/// </summary>
private const string SQL_PURGE_UPDATE_ENTRIES = @"
DELETE FROM dbo.DataUpdate
WHERE StartDT < DATEADD(DAY, @maxAge * -1, GETDATE())";
/// <summary>
/// Purges any data update entries older than given max age
/// </summary>
/// <param name="maxAge">Maximum entry age (in days)</param>
private static void PurgeUpdateEntries(int maxAge)
{
try
{
using (SqlConnection connection = LotFinderDB.GetConnection())
{
//Purge the records
connection.Execute(SQL_PURGE_UPDATE_ENTRIES, new { maxAge });
}
}
catch (Exception error)
{
//Log but do not forward error
logger.Error("PurgeUpdateEntries: failed to purge data update entries older than {0} days: {1}.", maxAge, error.Message);
}
}
/// <summary>
/// SQL to insert data update record
/// </summary>
private const string SQL_LOG_DATA_UPDATE_START = @"
INSERT INTO dbo.DataUpdate(SourceSystem, SourceData, TableName, StartDT, EndDT, UpdateType, WasSuccessful, NumberRecords)
OUTPUT INSERTED.*
VALUES(@sourceSystem, @sourceData, @tableName, GETDATE(), '1970-01-01', @updateType, 0, -2);";
/// <summary>
/// Logs the data update entry at the start of the update
/// </summary>
/// <param name="connection">SQL connection to execute commands on</param>
/// <param name="dataUpdate">Data update entry to log</param>
public static void LogDataUpdateStart(SqlConnection connection, DataUpdate dataUpdate)
{
try
{
//Update the record
DataUpdate inserted = connection.QueryFirst<DataUpdate>(SQL_LOG_DATA_UPDATE_START,
new
{
sourceSystem = dataUpdate.SourceSystem,
sourceData = dataUpdate.SourceData,
tableName = dataUpdate.TableName,
updateType = dataUpdate.UpdateType
});
//Copy output values to model
dataUpdate.ID = inserted.ID;
dataUpdate.StartDT = inserted.StartDT;
}
catch (Exception error)
{
//Log but do not forward error
logger.Error("LogDataUpdateStart: failed to log starting data update entry: {0}.", error.Message);
}
}
/// <summary>
/// SQL to update data update record with results
/// </summary>
private const string SQL_LOG_DATA_UPDATE_END = @"
UPDATE dbo.DataUpdate
SET EndDT = GETDATE(), WasSuccessful = @wasSuccessful, NumberRecords = @numberRecords
OUTPUT INSERTED.*
WHERE ID = @id";
/// <summary>
/// Logs the data update entry at the end of the update
/// </summary>
/// <param name="connection">SQL connection to execute commands on</param>
/// <param name="dataUpdate">Data update entry to log</param>
public static void LogDataUpdateEnd(SqlConnection connection, DataUpdate dataUpdate)
{
try
{
//Update the record
DataUpdate updated = connection.QueryFirst<DataUpdate>(SQL_LOG_DATA_UPDATE_END,
new
{
id = dataUpdate.ID,
wasSuccessful = dataUpdate.WasSuccessful,
numberRecords = dataUpdate.NumberRecords
});
//Copy output values to model
dataUpdate.EndDT = updated.EndDT;
}
catch (Exception error)
{
//Log but do not forward error
logger.Error("LogDataUpdateEnd: failed to log ending data update entry: {0}.", error.Message);
}
}
/// <summary>
/// Gets set of last successful data updates for all tables
/// </summary>
/// <param name="connection">SQL connection to execute commands on</param>
/// <returns>Set of last successful data updates for all tables</returns>
private static List<LastDataUpdate> GetLastDataUpdates(SqlConnection connection)
{
return connection.Query<LastDataUpdate>("SELECT * FROM dbo.LastDataUpdates").ToList();
}
}
}
@@ -1,350 +0,0 @@
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using System.Reflection;
using System.Text;
using Dapper;
using WorkerService.Models;
namespace WorkerService.Process
{
/// <summary>
/// Table management functionality for data update processor
/// </summary>
public partial class UpdateProcessor
{
/// <summary>
/// Creates staging table with matching column layout of given table
/// </summary>
/// <param name="connection">SQL connection to execute commands on</param>
/// <param name="tableName">Name of table to create staging table for</param>
/// <returns>Name of created temporary table</returns>
public static string CreateStagingTable(SqlConnection connection, string tableName)
{
try
{
//Get table specification
TableSpec tableSpec = GetTableSpec(connection, tableName);
//Drop temp table if it already exists
connection.Execute($"IF OBJECT_ID('tempdb..{tableSpec.StagingTableName}') IS NOT NULL DROP TABLE {tableSpec.StagingTableName};");
//Create temp table
connection.Execute($"CREATE TABLE {tableSpec.StagingTableName}({string.Join(",", tableSpec.Columns.Select(c => $"{c.Name} {c.Definition}"))});");
//Create indicies on temp table
StringBuilder builder = new StringBuilder();
builder.Append($"CREATE INDEX IDX_STAGING_{tableSpec.Name} ON {tableSpec.StagingTableName}(");
builder.Append(string.Join(",", tableSpec.PrimaryKey.Select(c => $"{c.Name}")));
if (tableSpec.Columns.Any(c => c.Name.Equals("LastUpdateDT", StringComparison.CurrentCultureIgnoreCase)))
{
builder.Append(", LastUpdateDT DESC");
}
else if (tableSpec.Columns.Any(c => c.Name.Equals("ReleaseDate", StringComparison.CurrentCultureIgnoreCase)))
{
builder.Append(", ReleaseDate DESC");
}
builder.Append(");");
connection.Execute(builder.ToString());
//Disable indicies on temp table
DisableIndicies(connection, tableSpec.StagingTableName);
return tableSpec.StagingTableName;
}
catch (Exception error)
{
//Log and forward error
logger.Error("GetStagingTable: failed to create staging table for '{0}': {1}.", tableName, error.Message);
throw;
}
}
/// <summary>
/// Creates temporary table with matching column layout of given table
/// </summary>
/// <param name="connection">SQL connection to execute commands on</param>
/// <param name="tableName">Name of table to create temporary table for</param>
/// <returns>Name of created temporary table</returns>
public static string CreateTempTable(SqlConnection connection, string tableName)
{
try
{
//Get table specification
TableSpec tableSpec = GetTableSpec(connection, tableName);
//Drop temp table if it already exists
connection.Execute($"IF OBJECT_ID('tempdb..{tableSpec.TempTableName}') IS NOT NULL DROP TABLE {tableSpec.TempTableName};");
//Create temp table
connection.Execute($"CREATE TABLE {tableSpec.TempTableName}({string.Join(",", tableSpec.Columns.Select(c => $"{c.Name} {c.Definition}"))}, CONSTRAINT PK_{tableSpec.TempTableName} PRIMARY KEY CLUSTERED({string.Join(",", tableSpec.PrimaryKey.Select(c => $"{c.Name}"))}));");
StringBuilder builder = new StringBuilder();
builder.AppendLine("WITH StagingCTE AS (");
builder.Append($"SELECT st.*, ROW_NUMBER() OVER(PARTITION BY {string.Join(", ", tableSpec.PrimaryKey.Select(c=>c.Name))} ORDER BY {tableSpec.Columns.FirstOrDefault(c => c.Name.Equals("LastUpdateDT", StringComparison.CurrentCultureIgnoreCase) || c.Name.Equals("ReleaseDate", StringComparison.CurrentCultureIgnoreCase))?.Name}) RN FROM {tableSpec.StagingTableName} st");
builder.AppendLine(")");
builder.AppendLine($"INSERT INTO {tableSpec.TempTableName}({string.Join(", ", tableSpec.Columns.Select(c=>c.Name))})");
builder.AppendLine($"SELECT {string.Join(", ", tableSpec.Columns.Select(c => c.Name))} FROM StagingCTE WHERE RN = 1 ORDER BY {string.Join(", ", tableSpec.PrimaryKey.Select(c=>c.Name))};");
connection.Execute(builder.ToString(),commandTimeout:600);
return tableSpec.TempTableName;
}
catch (Exception error)
{
//Log and forward error
logger.Error("CreateTableTable: failed to create temporary table for '{0}': {1}.", tableName, error.Message);
throw;
}
}
/// <summary>
/// Generates merge statement for given table
/// </summary>
/// <param name="connection">SQL connection to execute commands on</param>
/// <param name="tableName">Name of table to generate merge statement for</param>
/// <returns>Merge statement for given table</returns>
private static string GenerateMerge(SqlConnection connection, string tableName)
{
try
{
//Get table specification
TableSpec tableSpec = GetTableSpec(connection, tableName);
StringBuilder builder = new StringBuilder();
builder.AppendFormat("MERGE {0} AS TARGET", tableSpec.Name);
builder.AppendLine("");
builder.AppendFormat("USING {0} AS SOURCE ON({1})", tableSpec.TempTableName, string.Join(" AND ", tableSpec.PrimaryKey.Select(c => $"TARGET.{c.Name} = SOURCE.{c.Name}")));
builder.AppendLine("");
builder.Append("WHEN MATCHED");
if (tableSpec.Columns.Exists(c => c.Name.Equals("LastUpdateDT", StringComparison.CurrentCultureIgnoreCase)))
{
builder.Append(" AND TARGET.LastUpdateDT < SOURCE.LastUpdateDT");
}
builder.AppendLine(" THEN");
builder.Append("UPDATE SET ");
builder.Append(string.Join(", ", tableSpec.Columns.Where(c => !tableSpec.PrimaryKey.Contains(c)).Select(c => $"TARGET.{c.Name} = SOURCE.{c.Name}")));
builder.AppendLine();
builder.AppendLine("WHEN NOT MATCHED BY TARGET THEN");
builder.Append("INSERT(");
builder.Append(string.Join(", ", tableSpec.Columns.Select(c => c.Name)));
builder.AppendLine(")");
builder.Append("VALUES(");
builder.Append(string.Join(", ", tableSpec.Columns.Select(c => $"SOURCE.{c.Name}")));
builder.AppendLine(");");
return builder.ToString();
}
catch (Exception error)
{
//Log and forward error
logger.Error("GenerateMerge: failed to generate merge statement for '{0}': {1}.", tableName, error.Message);
throw;
}
}
/// <summary>
/// Generates bulk copy specification for given table
/// </summary>
/// <typeparam name="T">Data type being bulk copied</typeparam>
/// <param name="connection">SQL connection to execute commands on</param>
/// <param name="tableName">Name of table to generate bulk copy for</param>
/// <returns>Bulk copy for given table</returns>
private static SqlBulkCopy GenerateBulkCopy<T>(SqlConnection connection, string tableName)
{
return GenerateBulkCopy(connection, tableName, typeof(T));
}
/// <summary>
/// Generates bulk copy specification for given table
/// </summary>
/// <param name="connection">SQL connection to execute commands on</param>
/// <param name="tableName">Name of table to generate bulk copy for</param>
/// <param name="type">Data type being bulk copied</param>
/// <returns>Bulk copy for given table</returns>
private static SqlBulkCopy GenerateBulkCopy(SqlConnection connection, string tableName, Type type)
{
try
{
//Get table specification
TableSpec tableSpec = GetTableSpec(connection, tableName);
//Get class properties
List<string> properties = type.GetProperties(BindingFlags.Instance | BindingFlags.Public).Select(p => p.Name).ToList();
//Build bulk copy specification
SqlBulkCopy bulkCopy = new SqlBulkCopy(connection)
{
BatchSize = 10000,
NotifyAfter = 5000,
EnableStreaming = true,
DestinationTableName = tableSpec.StagingTableName
};
foreach (ColumnSpec columnSpec in tableSpec.Columns)
{
string property = properties.FirstOrDefault(p => p.Equals(columnSpec.Name, StringComparison.CurrentCultureIgnoreCase));
if (!string.IsNullOrEmpty(property))
{
bulkCopy.ColumnMappings.Add(property, columnSpec.Name);
}
}
return bulkCopy;
}
catch (Exception error)
{
//Log and forward error
logger.Error("GenerateBulkCopy: failed to generate bulk copy for '{0}': {1}.", tableName, error.Message);
throw;
}
}
/// <summary>
/// SQL to get the columns for the given table
/// </summary>
private const string SQL_GET_TABLE_COLUMNS = @"
SELECT c.name AS Name,
CASE t2.name
WHEN 'char' THEN 'CHAR(' + CAST(c.max_length AS VARCHAR(10)) + ')'
WHEN 'varchar' THEN 'VARCHAR(' + CASE c.max_length WHEN -1 THEN 'MAX' ELSE CAST(c.max_length AS VARCHAR(10)) END + ')'
WHEN 'decimal' THEN 'DECIMAL(' + CAST(c.precision AS VARCHAR(4)) + ',' + CAST(c.scale AS VARCHAR(4)) + ')'
ELSE UPPER(t2.name)
END AS Definition
FROM sys.columns c INNER JOIN
sys.types AS t2 ON (c.system_type_id = t2.system_type_id) INNER JOIN
sys.tables t ON (c.object_id = t.object_id)
WHERE t.name = @tableName
ORDER BY c.column_id";
/// <summary>
/// SQL to get the primary key columns for the given table
/// </summary>
private const string SQL_GET_TABLE_PRIMARY_KEY = @"
SELECT COLUMN_NAME AS Name
FROM INFORMATION_SCHEMA.KEY_COLUMN_USAGE
WHERE OBJECTPROPERTY(OBJECT_ID(CONSTRAINT_SCHEMA + '.' + QUOTENAME(CONSTRAINT_NAME)), 'IsPrimaryKey') = 1 AND
TABLE_NAME = @tableName
ORDER BY ORDINAL_POSITION";
/// <summary>
/// Gets the table specification for the given table
/// </summary>
/// <param name="connection">SQL connection to execute commands on</param>
/// <param name="tableName">Name of table to get specification for</param>
/// <returns>Table specification for the given table</returns>
public static TableSpec GetTableSpec(SqlConnection connection, string tableName)
{
TableSpec tableSpec = new TableSpec() { Name = tableName };
//Load columns
tableSpec.Columns.AddRange(connection.Query<ColumnSpec>(SQL_GET_TABLE_COLUMNS, new { tableName }));
//Load primary key
tableSpec.PrimaryKey.AddRange(connection.Query<string>(SQL_GET_TABLE_PRIMARY_KEY, new { tableName }).Select(cn => tableSpec.Columns.First(c => c.Name.Equals(cn, StringComparison.CurrentCultureIgnoreCase))));
return tableSpec;
}
/// <summary>
/// Truncates given table
/// </summary>
/// <param name="connection">SQL connection to execute commands on</param>
/// <param name="tableName">Name of table to truncate</param>
private static void TruncateTable(SqlConnection connection, string tableName)
{
try
{
logger.Debug("TruncateTable: truncating table '{0}'", tableName);
//Generate and execute SQL to truncate table
string sql = $"TRUNCATE TABLE {tableName};";
connection.Execute(sql);
}
catch (Exception error)
{
//Log but do not forward error
logger.Error("TruncateTable: failed to truncate table '{0}': {1}.", tableName, error.Message);
}
}
/// <summary>
/// SQL to get indices on given table
/// </summary>
private const string SQL_GET_INDICES = @"
SELECT DISTINCT
ind.name AS Name,
ind.is_primary_key AS IsPrimaryKey,
ind.is_unique AS IsUnique,
ind.is_unique_constraint AS IsUniqueConstraint
FROM sys.indexes ind INNER JOIN
sys.index_columns ic ON (ind.object_id = ic.object_id AND ind.index_id = ic.index_id) INNER JOIN
sys.columns col ON (ic.object_id = col.object_id AND ic.column_id = col.column_id) INNER JOIN
sys.tables t ON (ind.object_id = t.object_id)
WHERE t.name = @tableName";
/// <summary>
/// Disables all non-PK indices on given table
/// </summary>
/// <param name="connection">SQL connection to execute commands on</param>
/// <param name="tableName">Name of table to disable non-PK indices for</param>
private static void DisableIndicies(SqlConnection connection, string tableName)
{
try
{
//Get all indices on table
List<Index> indices = connection.Query<Index>(SQL_GET_INDICES, new { tableName }).ToList();
//Loop through all non-PK/non-cluster indices
foreach (Index index in indices.Where(i => !i.IsPrimaryKey && !i.IsUnique && !i.IsUniqueConstraint))
{
//Generate and execute SQL to disable index
string sql = $"ALTER INDEX {index.Name} ON {tableName} DISABLE;";
connection.Execute(sql);
}
}
catch (Exception error)
{
//Log but do not forward error
logger.Error("DisableIndicies: failed to disable non-PK indicies on table '{0}': {1}.", tableName, error.Message);
}
}
/// <summary>
/// Rebuilds all indices on given table
/// </summary>
/// <param name="connection">SQL connection to execute commands on</param>So
/// <param name="tableName">Name of table to rebuild indices for</param>
private static void RebuildIndicies(SqlConnection connection, string tableName)
{
try
{
//Get all indices on table
List<Index> indices = connection.Query<Index>(SQL_GET_INDICES, new { tableName }).ToList();
//Loop through indices
foreach (Index index in indices.Where(i => !i.IsPrimaryKey && !i.IsUnique && !i.IsUniqueConstraint))
{
//Generate and execute SQL to rebuild index
string sql = $"ALTER INDEX {index.Name} ON {tableName} REBUILD;";
connection.Execute(sql);
}
}
catch (Exception error)
{
//Log but do not forward error
logger.Error("RebuildIndicies: failed to rebuild indicies on table '{0}': {1}.", tableName, error.Message);
}
}
}
}
@@ -1,220 +0,0 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Dapper;
using DataModel.Helpers;
using DataModel.Models;
using DataModel.Process;
using Newtonsoft.Json;
using NLog;
using WorkerService.Models;
namespace WorkerService.Process
{
/// <summary>
/// Data update processor
/// </summary>
public partial class UpdateProcessor
{
/// <summary>
/// Shared logger instance
/// </summary>
private static readonly Logger logger = LogManager.GetCurrentClassLogger();
/// <summary>
/// Collection of configured data source update configurations
/// </summary>
public static readonly List<DataSourceConfig> configs = new List<DataSourceConfig>();
/// <summary>
/// Static class initializer
/// </summary>
static UpdateProcessor()
{
foreach (string configFileName in Directory.GetFiles("dsconfig"))
{
//continue;
DataSourceConfig config = JsonConvert.DeserializeObject<DataSourceConfig>(File.ReadAllText(configFileName));
if (config.IsEnabled) { configs.Add(config); }
}
}
public static void DoUpdate(string tableName, UpdateTypes updateType = UpdateTypes.Mass)
{
DataSourceConfig dataSourceConfig = configs.FirstOrDefault(c => c.TableName.Equals(tableName, StringComparison.CurrentCultureIgnoreCase));
//Get last data updates
List<LastDataUpdate> lastDataUpdates = null;
using (SqlConnection connection = LotFinderDB.GetConnection())
{
lastDataUpdates = GetLastDataUpdates(connection);
}
LastDataUpdate lastDataUpdate = lastDataUpdates.FirstOrDefault(ldu => ldu.TableName.Equals(dataSourceConfig.TableName));
DateTime? minDT = null;
switch (updateType)
{
case UpdateTypes.Mass:
minDT = null;
break;
case UpdateTypes.Daily:
minDT = lastDataUpdate.DailyUpdateDT.AddMinutes(-3 * dataSourceConfig.DailyUpdateConfig.Interval);
break;
case UpdateTypes.Hourly:
minDT = lastDataUpdate.HourlyUpdateDT.AddMinutes(-3 * dataSourceConfig.HourlyUpdateConfig.Interval);
break;
}
DoUpdate(dataSourceConfig, updateType, minDT);
}
/// <summary>
/// Gets list of pending data update tasks
/// </summary>
/// <returns>Pending data update tasks</returns>
public static List<DataUpdateTask> GetPendingUpdateTasks()
{
List<DataUpdateTask> pending = new List<DataUpdateTask>();
using (SqlConnection connection = LotFinderDB.GetConnection())
{
//Get last data updates
List<LastDataUpdate> lastDataUpdates = GetLastDataUpdates(connection);
foreach (DataSourceConfig dataSourceConfig in configs)
{
LastDataUpdate lastDataUpdate = lastDataUpdates.FirstOrDefault(ldu => ldu.TableName.Equals(dataSourceConfig.TableName));
if (lastDataUpdate == null || (dataSourceConfig.MassUpdateConfig.Enabled && DateTime.Now > lastDataUpdate.MassUpdateDT.AddMinutes(dataSourceConfig.MassUpdateConfig.Interval)))
{
pending.Add(new DataUpdateTask() { Configuration = dataSourceConfig, UpdateType = UpdateTypes.Mass });
}
else if (dataSourceConfig.DailyUpdateConfig.Enabled && DateTime.Now > lastDataUpdate.DailyUpdateDT.AddMinutes(dataSourceConfig.DailyUpdateConfig.Interval))
{
pending.Add(new DataUpdateTask() { Configuration = dataSourceConfig, UpdateType = UpdateTypes.Daily, MinimumDT = lastDataUpdate.DailyUpdateDT.AddMinutes(-3 * dataSourceConfig.DailyUpdateConfig.Interval) });
}
else if (dataSourceConfig.HourlyUpdateConfig.Enabled && DateTime.Now > lastDataUpdate.HourlyUpdateDT.AddMinutes(dataSourceConfig.HourlyUpdateConfig.Interval))
{
pending.Add(new DataUpdateTask() { Configuration = dataSourceConfig, UpdateType = UpdateTypes.Hourly, MinimumDT = lastDataUpdate.DailyUpdateDT.AddMinutes(-3 * dataSourceConfig.DailyUpdateConfig.Interval) });
}
}
}
return pending;
}
/// <summary>
/// Performs the data update
/// </summary>
/// <param name="dataUpdateTask">Data update task to execute</param>
/// <returns>Data update results</returns>
public static DataUpdate DoUpdate(DataUpdateTask dataUpdateTask)
{
logger.Info($"Starting [{dataUpdateTask.UpdateType}] data update for {dataUpdateTask.Configuration.TableName}...");
return DoUpdate(dataUpdateTask.Configuration, dataUpdateTask.UpdateType, dataUpdateTask.MinimumDT);
}
/// <summary>
/// Performs the data update
/// </summary>
/// <param name="config">Data source configuration</param>
/// <param name="updateType">Type of update to perform</param>
/// <param name="minDT">Minimum timestamp to update data from</param>
/// <returns>Data update results</returns>
public static DataUpdate DoUpdate(DataSourceConfig config, UpdateTypes updateType, DateTime? minDT)
{
//Log start of data update
DataUpdate dataUpdate = new DataUpdate()
{
SourceSystem = config.SourceSystem,
SourceData = config.SourceData,
TableName = config.TableName,
UpdateType = updateType,
StartDT = DateTime.Now,
NumberRecords = 0
};
//Get data update configuration details
DataUpdateConfig updateConfig;
switch (updateType)
{
case UpdateTypes.Hourly:
updateConfig = config.HourlyUpdateConfig;
break;
case UpdateTypes.Daily:
updateConfig = config.DailyUpdateConfig;
break;
case UpdateTypes.Mass:
updateConfig = config.MassUpdateConfig;
break;
default:
throw new ArgumentOutOfRangeException(nameof(updateType), updateType, null);
}
using (SqlConnection connection = LotFinderDB.GetConnection())
{
LogDataUpdateStart(connection, dataUpdate);
//Clear destination table if needed
if (updateConfig.PrepurgeData)
{
TruncateTable(connection, config.TableName);
}
Type sourceType = config.DataFetchFunction.Method.ReturnType.GenericTypeArguments[0];
//Fetch data
IEnumerable<dynamic> data = config.DataFetchFunction(minDT);
//Generate SQL to merge temp data to destination table
string mergeSQL = GenerateMerge(connection, config.TableName);
foreach (var batch in data.BatchGroup(1000000))
{
//Setup temp table
string stagingTableName = CreateStagingTable(connection, config.TableName);
//Copy data to temp table
SqlBulkCopy bulkCopy = GenerateBulkCopy(connection, config.TableName, sourceType);
IDataReader reader = new GenericListDataReader(batch, sourceType);
bulkCopy.WriteToServer(reader);
dataUpdate.NumberRecords += batch.Count;
//Index temp table
RebuildIndicies(connection, stagingTableName);
//Copy to temp table
string tempTableName = CreateTempTable(connection, config.TableName);
//Merge data from temp table to destination table
connection.Execute(mergeSQL, commandTimeout: 6000);
logger.Debug("DoUpdate: {0:n0} rows merged to {1}", dataUpdate.NumberRecords, config.TableName);
}
//Run post processing action if configured
if (config.PostProcessingAction != null)
{
config.PostProcessingAction();
}
//Re-index destination table if needed
if (updateConfig.ReIndexData)
{
RebuildIndicies(connection, dataUpdate.TableName);
}
//Update data update entry
dataUpdate.WasSuccessful = true;
LogDataUpdateEnd(connection, dataUpdate);
}
return dataUpdate;
}
}
}
@@ -1,240 +0,0 @@
using System;
using System.Collections.Generic;
using System.Data.SQLite;
using System.IO;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Commons.Helpers;
using Commons.Models;
using Commons.Process;
using Dapper;
namespace WorkerService.Process
{
public partial class Updater
{
private const string SQL_GET_MIS_DATA = @"
select
ItemNumber, BranchCode, SequenceNumber, MisNumber, RevID,
CharNumber, TestGroup, TestDefinition, TestDescription, SamplingType, SamplingValue, ToolsGauges, WorkInstructions,
Status, ReleaseDate, ObsoleteDate
FROM infodb.MisInfo info INNER JOIN
relationdb.MisRelation rel ON (info.RevFK = rel.RevFK) INNER JOIN
detaildb.MisDetail det ON (rel.MisFK = det.MisFK) INNER JOIN
partdb.MisPart part ON (det.PartFK = part.PartFK)
ORDER BY ItemNumber, BranchCode, SequenceNumber, MisNumber, RevID, Status, CharNumber";
public static IEnumerable<MisData> GetMisData()
{
Task misInfoTask = Task.Run(() => PrepareMisInfo());
Task misRelationTask = Task.Run(() => PrepareMisRelation());
Task misDetailTask = Task.Run(() => PrepareMisDetail());
Task misPartTask = Task.Run(() => PrepareMisPart());
Task.WaitAll(misInfoTask, misRelationTask, misDetailTask, misPartTask);
if (File.Exists("misdata.db"))
{
File.Delete("misdata.db");
}
SQLiteConnection.CreateFile("misdata.db");
using (SQLiteConnection sqlite = new SQLiteConnection(@"Data Source=misdata.db;Version=3;Journal Mode=Off;"))
{
sqlite.Open();
//Attach databases
sqlite.Execute("ATTACH DATABASE 'misinfo.db' AS infodb;");
sqlite.Execute("ATTACH DATABASE 'misrelation.db' AS relationdb;");
sqlite.Execute("ATTACH DATABASE 'misdetail.db' AS detaildb;");
sqlite.Execute("ATTACH DATABASE 'mispart.db' AS partdb;");
foreach (MisData misData in sqlite.Query<MisData>(SQL_GET_MIS_DATA, buffered: false))
{
yield return misData;
}
}
}
private static void PrepareMisInfo()
{
return;
if (File.Exists("misinfo.db"))
{
File.Delete("misinfo.db");
}
SQLiteConnection.CreateFile("misinfo.db");
//Fetch MIS info
List<MisInfo> misInfos = CMS.GetMisInfos().ToList();
//Lookup obsolete date by backlevel release
foreach (var group in misInfos.GroupBy(m => new { m.MisNumber, m.RevID }))
{
DateTime? obsoleteDate = group.Where(g => g.Status.Equals("BackLevel") && g.ReleaseDate != null).Select(g => g.ReleaseDate).FirstOrDefault();
foreach (MisInfo current in group.Where(g => g.Status.Equals("Current")))
{
current.ObsoleteDate = obsoleteDate;
}
}
//Lookup obsolete date by next revision release
var lookup = misInfos.Where(m => m.ReleaseDate != null).GroupBy(m => new { m.MisNumber, m.RevID, m.Status }).Select(g => new { g.Key.MisNumber, g.Key.RevID, g.Key.Status, ReleaseDate = g.Min(m => m.ReleaseDate) }).ToDictionary(g => new { g.MisNumber, g.RevID, g.Status }, g => g.ReleaseDate);
foreach (MisInfo misInfo in misInfos.Where(m => !m.ObsoleteDate.HasValue))
{
DateTime? obsoleteDate;
if (lookup.TryGetValue(new { misInfo.MisNumber, misInfo.RevID, misInfo.Status }, out obsoleteDate))
{
misInfo.ObsoleteDate = obsoleteDate;
}
}
//Write MIS info to database
using (SQLiteConnection sqlite = new SQLiteConnection(@"Data Source=misinfo.db;Version=3;Journal Mode=Off;"))
{
sqlite.Open();
//Create table for MIS info
sqlite.Execute("CREATE TABLE MisInfo(RevFK TEXT, MisNumber TEXT, RevID Text, Status TEXT, ReleaseDate DATETIME, ObsoleteDate DATETIME);");
foreach (var batch in misInfos.BatchGroup(10000))
{
using (SQLiteTransaction transaction = sqlite.BeginTransaction())
{
foreach (MisInfo misInfo in batch)
{
sqlite.Execute("INSERT INTO MisInfo(RevFK, MisNumber, RevID, Status, ReleaseDate, ObsoleteDate) VALUES(@RevFK, @MisNumber, @RevID, @Status, @ReleaseDate, @ObsoleteDate)", misInfo);
}
transaction.Commit();
}
}
//Add index on FK field
sqlite.Execute("CREATE INDEX IDX_MisInfo_RevFK ON MisInfo(RevFK);");
sqlite.Execute("VACUUM");
}
}
private static void PrepareMisRelation()
{
return;
if (File.Exists("misrelation.db"))
{
File.Delete("misrelation.db");
}
SQLiteConnection.CreateFile("misrelation.db");
IEnumerable<MisRelation> misRelations = CMS.GetMisRelations();
//Write MIS relation to database
using (SQLiteConnection sqlite = new SQLiteConnection(@"Data Source=misrelation.db;Version=3;Journal Mode=Off;"))
{
sqlite.Open();
//Create table for MIS relation
sqlite.Execute("CREATE TABLE MisRelation(RevFK TEXT, MisFK TEXT);");
foreach (var batch in misRelations.BatchGroup(10000))
{
using (SQLiteTransaction transaction = sqlite.BeginTransaction())
{
foreach (MisRelation misRelation in batch)
{
sqlite.Execute("INSERT INTO MisRelation(RevFK, MisFK) VALUES(@RevFK, @MisFK)", misRelation);
}
transaction.Commit();
}
}
//Add indices on FK fields
sqlite.Execute("CREATE INDEX IDX_MisRelation_RevFK ON MisRelation(RevFK);");
sqlite.Execute("CREATE INDEX IDX_MisRelation_MisFK ON MisRelation(MisFK);");
sqlite.Execute("VACUUM");
}
}
public static void PrepareMisDetail()
{
return;
if (File.Exists("misdetail.db"))
{
File.Delete("misdetail.db");
}
SQLiteConnection.CreateFile("misdetail.db");
IEnumerable<MisDetail> misDetails = CMS.GetMisDetails();
//Write MIS relation to database
using (SQLiteConnection sqlite = new SQLiteConnection(@"Data Source=misdetail.db;Version=3;Journal Mode=Off;"))
{
sqlite.Open();
//Create table for MIS detail
sqlite.Execute("CREATE TABLE MisDetail(MisFK TEXT, PartFK TEXT, CharNumber TEXT, TestGroup TEXT, TestDefinition TEXT, TestDescription TEXT, SamplingType TEXT, SamplingValue TEXT, ToolsGauges TEXT, WorkInstructions TEXT);");
foreach (var batch in misDetails.BatchGroup(250000))
{
using (SQLiteTransaction transaction = sqlite.BeginTransaction())
{
foreach (MisDetail misDetail in batch)
{
sqlite.Execute("INSERT INTO MisDetail(MisFK, PartFK, CharNumber, TestGroup, TestDefinition, TestDescription, SamplingType, SamplingValue, ToolsGauges, WorkInstructions) VALUES(@MisFK, @PartFK, @CharNumber, @TestGroup, @TestDefinition, @TestDescription, @SamplingType, @SamplingValue, @ToolsGauges, @WorkInstructions)", misDetail);
}
transaction.Commit();
}
}
//Add indices on FK fields
sqlite.Execute("CREATE INDEX IDX_MisDetail_MisFK ON MisDetail(MisFK);");
sqlite.Execute("CREATE INDEX IDX_MisDetail_PartFK ON MisDetail(PartFK);");
sqlite.Execute("VACUUM");
}
}
private static void PrepareMisPart()
{
return;
if (File.Exists("mispart.db"))
{
File.Delete("mispart.db");
}
SQLiteConnection.CreateFile("mispart.db");
IEnumerable<MisPart> misParts = CMS.GetMisParts();
//Write MIS relation to database
using (SQLiteConnection sqlite = new SQLiteConnection(@"Data Source=mispart.db;Version=3;Journal Mode=Off;"))
{
sqlite.Open();
//Create table for MIS detail
sqlite.Execute("CREATE TABLE MisPart(PartFK TEXT, ItemNumber TEXT, BranchCode TEXT, SequenceNumber TEXT);");
foreach (var batch in misParts.BatchGroup(100000))
{
using (SQLiteTransaction transaction = sqlite.BeginTransaction())
{
foreach (MisPart misPart in batch)
{
sqlite.Execute("INSERT INTO MisPart(PartFK, ItemNumber, BranchCode, SequenceNumber) VALUES(@PartFK, @ItemNumber, @BranchCode, @SequenceNumber);", misPart);
}
transaction.Commit();
}
}
//Add indices on FK fields
sqlite.Execute("CREATE INDEX IDX_MisPart_PartFK ON MisPart(PartFK);");
sqlite.Execute("VACUUM");
}
}
}
}
+30 -43
View File
@@ -1,10 +1,7 @@
using System;
using System.Collections.Generic;
using System.Configuration;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using DataModel.Models;
using Microsoft.AspNet.SignalR.Client;
using NLog;
@@ -146,54 +143,44 @@ namespace WorkerService.Process
{
try
{
//Verify all data sources up to date
List<DataUpdateTask> pending = UpdateProcessor.GetPendingUpdateTasks();
if (pending.Any())
{
Status = "Updating data cache";
Parallel.ForEach(pending, new ParallelOptions() { MaxDegreeOfParallelism = 8 }, pendingTask => { UpdateProcessor.DoUpdate(pendingTask); });
}
else
{
//Reset any partially completed searches
LotFinderDBExt.ResetPartialSearches();
//Reset any partially completed searches
LotFinderDBExt.ResetPartialSearches();
//Check for queued searches
Search search = LotFinderDBExt.GetNextSearch();
if (search != null)
//Check for queued searches
Search search = LotFinderDBExt.GetNextSearch();
if (search != null)
{
try
{
try
{
Status = $"Processing search #{search.ID}";
Status = $"Processing search #{search.ID}";
//Start search
LotFinderDBExt.StartSearch(search);
PublishSearchUpdate(search);
//Start search
LotFinderDBExt.StartSearch(search);
PublishSearchUpdate(search);
//Do search
SearchModel searchModel = search.ToSearchModel();
LotFinderDBExt.Search(searchModel);
//Do search
SearchModel searchModel = search.ToSearchModel();
LotFinderDBExt.Search(searchModel);
//Record end timestamp
search.EndDT = DateTime.Now;
//Record end timestamp
search.EndDT = DateTime.Now;
//Generate output
search.Results = ExcelWriter.Generate(searchModel);
//Generate output
search.Results = ExcelWriter.Generate(searchModel);
File.WriteAllBytes($"search_{search.ID}.xlsx", search.Results);
File.WriteAllBytes($"search_{search.ID}.xlsx", search.Results);
//Complete search
LotFinderDBExt.CompleteSearch(search, true);
PublishSearchUpdate(search);
}
catch (Exception error)
{
//Log error and mark search as failed
logger.Error("DoWork: failed to process search: {0}.", error.Message);
search.EndDT = DateTime.Now;
LotFinderDBExt.CompleteSearch(search, false);
PublishSearchUpdate(search);
}
//Complete search
LotFinderDBExt.CompleteSearch(search, true);
PublishSearchUpdate(search);
}
catch (Exception error)
{
//Log error and mark search as failed
logger.Error("DoWork: failed to process search: {0}.", error.Message);
search.EndDT = DateTime.Now;
LotFinderDBExt.CompleteSearch(search, false);
PublishSearchUpdate(search);
}
}