using System; using System.Collections.Generic; using System.Data; using System.Data.SqlClient; using System.IO; using System.Linq; using System.Threading.Tasks; using Dapper; using DataModel.Helpers; using DataModel.Models; using DataModel.Process; using Newtonsoft.Json; using NLog; using WorkerService.Models; namespace WorkerService.Process { /// /// Data update processor /// public partial class UpdateProcessor { /// /// Shared logger instance /// private static readonly Logger logger = LogManager.GetCurrentClassLogger(); /// /// Collection of configured data source update configurations /// public static readonly List configs = new List(); /// /// Static class initializer /// static UpdateProcessor() { foreach (string configFileName in Directory.GetFiles("dsconfig")) { //continue; DataSourceConfig config = JsonConvert.DeserializeObject(File.ReadAllText(configFileName)); if (config.IsEnabled) { configs.Add(config); } } } public static void DoUpdate(string tableName, UpdateTypes updateType = UpdateTypes.Mass) { DataSourceConfig dataSourceConfig = configs.FirstOrDefault(c => c.TableName.Equals(tableName, StringComparison.CurrentCultureIgnoreCase)); //Get last data updates List lastDataUpdates = null; using (SqlConnection connection = LotFinderDB.GetConnection()) { lastDataUpdates = GetLastDataUpdates(connection); } LastDataUpdate lastDataUpdate = lastDataUpdates.FirstOrDefault(ldu => ldu.TableName.Equals(dataSourceConfig.TableName)); DateTime? minDT = null; switch (updateType) { case UpdateTypes.Mass: minDT = null; break; case UpdateTypes.Daily: minDT = lastDataUpdate.DailyUpdateDT.AddMinutes(-3 * dataSourceConfig.DailyUpdateConfig.Interval); break; case UpdateTypes.Hourly: minDT = lastDataUpdate.HourlyUpdateDT.AddMinutes(-3 * dataSourceConfig.HourlyUpdateConfig.Interval); break; } DoUpdate(dataSourceConfig, updateType, minDT); } /// /// Gets list of pending data update tasks /// /// Pending data update tasks public static List GetPendingUpdateTasks() { List pending = new List(); using (SqlConnection connection = LotFinderDB.GetConnection()) { //Get last data updates List lastDataUpdates = GetLastDataUpdates(connection); foreach (DataSourceConfig dataSourceConfig in configs) { LastDataUpdate lastDataUpdate = lastDataUpdates.FirstOrDefault(ldu => ldu.TableName.Equals(dataSourceConfig.TableName)); if (lastDataUpdate == null || (dataSourceConfig.MassUpdateConfig.Enabled && DateTime.Now > lastDataUpdate.MassUpdateDT.AddMinutes(dataSourceConfig.MassUpdateConfig.Interval))) { pending.Add(new DataUpdateTask() { Configuration = dataSourceConfig, UpdateType = UpdateTypes.Mass }); } else if (dataSourceConfig.DailyUpdateConfig.Enabled && DateTime.Now > lastDataUpdate.DailyUpdateDT.AddMinutes(dataSourceConfig.DailyUpdateConfig.Interval)) { pending.Add(new DataUpdateTask() { Configuration = dataSourceConfig, UpdateType = UpdateTypes.Daily, MinimumDT = lastDataUpdate.DailyUpdateDT.AddMinutes(-3 * dataSourceConfig.DailyUpdateConfig.Interval) }); } else if (dataSourceConfig.HourlyUpdateConfig.Enabled && DateTime.Now > lastDataUpdate.HourlyUpdateDT.AddMinutes(dataSourceConfig.HourlyUpdateConfig.Interval)) { pending.Add(new DataUpdateTask() { Configuration = dataSourceConfig, UpdateType = UpdateTypes.Hourly, MinimumDT = lastDataUpdate.DailyUpdateDT.AddMinutes(-3 * dataSourceConfig.DailyUpdateConfig.Interval) }); } } } return pending; } /// /// Performs the data update /// /// Data update task to execute /// Data update results public static DataUpdate DoUpdate(DataUpdateTask dataUpdateTask) { logger.Info($"Starting [{dataUpdateTask.UpdateType}] data update for {dataUpdateTask.Configuration.TableName}..."); return DoUpdate(dataUpdateTask.Configuration, dataUpdateTask.UpdateType, dataUpdateTask.MinimumDT); } /// /// Performs the data update /// /// Data source configuration /// Type of update to perform /// Minimum timestamp to update data from /// Data update results public static DataUpdate DoUpdate(DataSourceConfig config, UpdateTypes updateType, DateTime? minDT) { //Log start of data update DataUpdate dataUpdate = new DataUpdate() { SourceSystem = config.SourceSystem, SourceData = config.SourceData, TableName = config.TableName, UpdateType = updateType, StartDT = DateTime.Now, NumberRecords = 0 }; //Get data update configuration details DataUpdateConfig updateConfig; switch (updateType) { case UpdateTypes.Hourly: updateConfig = config.HourlyUpdateConfig; break; case UpdateTypes.Daily: updateConfig = config.DailyUpdateConfig; break; case UpdateTypes.Mass: updateConfig = config.MassUpdateConfig; break; default: throw new ArgumentOutOfRangeException(nameof(updateType), updateType, null); } using (SqlConnection connection = LotFinderDB.GetConnection()) { LogDataUpdateStart(connection, dataUpdate); //Clear destination table if needed if (updateConfig.PrepurgeData) { TruncateTable(connection, config.TableName); } Type sourceType = config.DataFetchFunction.Method.ReturnType.GenericTypeArguments[0]; //Fetch data IEnumerable data = config.DataFetchFunction(minDT); //Generate SQL to merge temp data to destination table string mergeSQL = GenerateMerge(connection, config.TableName); foreach (var batch in data.BatchGroup(1000000)) { //Setup temp table string stagingTableName = CreateStagingTable(connection, config.TableName); //Copy data to temp table SqlBulkCopy bulkCopy = GenerateBulkCopy(connection, config.TableName, sourceType); IDataReader reader = new GenericListDataReader(batch, sourceType); bulkCopy.WriteToServer(reader); dataUpdate.NumberRecords += batch.Count; //Index temp table RebuildIndicies(connection, stagingTableName); //Copy to temp table string tempTableName = CreateTempTable(connection, config.TableName); //Merge data from temp table to destination table connection.Execute(mergeSQL, commandTimeout: 6000); logger.Debug("DoUpdate: {0:n0} rows merged to {1}", dataUpdate.NumberRecords, config.TableName); } //Run post processing action if configured if (config.PostProcessingAction != null) { config.PostProcessingAction(); } //Re-index destination table if needed if (updateConfig.ReIndexData) { RebuildIndicies(connection, dataUpdate.TableName); } //Update data update entry dataUpdate.WasSuccessful = true; LogDataUpdateEnd(connection, dataUpdate); } return dataUpdate; } } }