diff --git a/NEW/src/JdeScoping.DataSync.Dev/BranchDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/BranchDevEtl.cs index 584de0e..de04788 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/BranchDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/BranchDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/FunctionCodeDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/FunctionCodeDevEtl.cs index 11d13c7..9d9102c 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/FunctionCodeDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/FunctionCodeDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/ItemDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/ItemDevEtl.cs index 8d58929..7b3b645 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/ItemDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/ItemDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/JdeUserDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/JdeUserDevEtl.cs index 419247c..5759c20 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/JdeUserDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/JdeUserDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/LotDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/LotDevEtl.cs index 643c4e4..52a2411 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/LotDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/LotDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/LotUsageCurrDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/LotUsageCurrDevEtl.cs index 7d580e9..42cf9fe 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/LotUsageCurrDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/LotUsageCurrDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/LotUsageHistDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/LotUsageHistDevEtl.cs index 7c9079e..1a2604a 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/LotUsageHistDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/LotUsageHistDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/MisDataDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/MisDataDevEtl.cs index 4d07326..f9f238a 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/MisDataDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/MisDataDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync/Etl/Models/JsonColumnSchema.cs b/NEW/src/JdeScoping.DataSync.Dev/Models/JsonColumnSchema.cs similarity index 91% rename from NEW/src/JdeScoping.DataSync/Etl/Models/JsonColumnSchema.cs rename to NEW/src/JdeScoping.DataSync.Dev/Models/JsonColumnSchema.cs index 2183968..276080b 100644 --- a/NEW/src/JdeScoping.DataSync/Etl/Models/JsonColumnSchema.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/Models/JsonColumnSchema.cs @@ -1,4 +1,4 @@ -namespace JdeScoping.DataSync.Etl.Models; +namespace JdeScoping.DataSync.Dev.Models; /// /// Defines a column schema for JSON-to-DataReader mapping. diff --git a/NEW/src/JdeScoping.DataSync.Dev/OrgHierarchyDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/OrgHierarchyDevEtl.cs index f198da0..45bf0a7 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/OrgHierarchyDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/OrgHierarchyDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/ProfitCenterDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/ProfitCenterDevEtl.cs index 7dbb96a..d3ac3e1 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/ProfitCenterDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/ProfitCenterDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/RouteMasterDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/RouteMasterDevEtl.cs index 04f8116..717b1ad 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/RouteMasterDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/RouteMasterDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync/Etl/Sources/JsonStreamingDataReader.cs b/NEW/src/JdeScoping.DataSync.Dev/Sources/JsonStreamingDataReader.cs similarity index 96% rename from NEW/src/JdeScoping.DataSync/Etl/Sources/JsonStreamingDataReader.cs rename to NEW/src/JdeScoping.DataSync.Dev/Sources/JsonStreamingDataReader.cs index 7d51035..6ce7661 100644 --- a/NEW/src/JdeScoping.DataSync/Etl/Sources/JsonStreamingDataReader.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/Sources/JsonStreamingDataReader.cs @@ -1,8 +1,8 @@ using System.Data; using System.Text.Json; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; -namespace JdeScoping.DataSync.Etl.Sources; +namespace JdeScoping.DataSync.Dev.Sources; /// /// Streams a JSON array as an IDataReader, parsing one object at a time. diff --git a/NEW/src/JdeScoping.DataSync/Etl/Sources/JsonZstdFileSource.cs b/NEW/src/JdeScoping.DataSync.Dev/Sources/JsonZstdFileSource.cs similarity index 51% rename from NEW/src/JdeScoping.DataSync/Etl/Sources/JsonZstdFileSource.cs rename to NEW/src/JdeScoping.DataSync.Dev/Sources/JsonZstdFileSource.cs index d7f7204..63ca303 100644 --- a/NEW/src/JdeScoping.DataSync/Etl/Sources/JsonZstdFileSource.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/Sources/JsonZstdFileSource.cs @@ -1,24 +1,36 @@ using System.Data; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Contracts; -using JdeScoping.DataSync.Etl.Models; using ZstdSharp; -namespace JdeScoping.DataSync.Etl.Sources; +namespace JdeScoping.DataSync.Dev.Sources; /// /// Import source that reads from a zstd-compressed JSON array file. +/// Uses high-performance Utf8JsonReader for parsing. /// public sealed class JsonZstdFileSource : IImportSource { + private const int FileBufferSize = 256 * 1024; // 256 KB + private const int DecompressBufferSize = 256 * 1024; // 256 KB + private readonly string _filePath; private readonly JsonColumnSchema[] _schema; + private readonly bool _useHighPerformanceReader; private FileStream? _fileStream; private DecompressionStream? _decompressionStream; - private JsonStreamingDataReader? _reader; + private BufferedStream? _bufferedStream; + private IDataReader? _reader; public string SourceName => $"JsonZstd:{Path.GetFileName(_filePath)}"; - public JsonZstdFileSource(string filePath, JsonColumnSchema[] schema) + /// + /// Creates a new source for reading zstd-compressed JSON files. + /// + /// Path to the .json.zstd file. + /// Column schema for the JSON objects. + /// Use Utf8JsonReader (true) or legacy JsonDocument reader (false). + public JsonZstdFileSource(string filePath, JsonColumnSchema[] schema, bool useHighPerformanceReader = true) { if (string.IsNullOrWhiteSpace(filePath)) throw new ArgumentException("File path cannot be null or empty.", nameof(filePath)); @@ -28,6 +40,7 @@ public sealed class JsonZstdFileSource : IImportSource _filePath = filePath; _schema = schema ?? throw new ArgumentNullException(nameof(schema)); + _useHighPerformanceReader = useHighPerformanceReader; } public Task ReadDataAsync(CancellationToken cancellationToken = default) @@ -37,20 +50,35 @@ public sealed class JsonZstdFileSource : IImportSource try { - _fileStream = new FileStream(_filePath, FileMode.Open, FileAccess.Read, FileShare.Read, - bufferSize: 65536, useAsync: true); - _decompressionStream = new DecompressionStream(_fileStream); - _reader = new JsonStreamingDataReader(_decompressionStream, _schema); + // Use SequentialScan hint for OS read-ahead optimization + _fileStream = new FileStream( + _filePath, + FileMode.Open, + FileAccess.Read, + FileShare.Read, + bufferSize: FileBufferSize, + FileOptions.SequentialScan); - return Task.FromResult(_reader); + _decompressionStream = new DecompressionStream(_fileStream); + + // Wrap in BufferedStream for smoother decompression throughput + _bufferedStream = new BufferedStream(_decompressionStream, DecompressBufferSize); + + _reader = _useHighPerformanceReader + ? new Utf8JsonStreamingDataReader(_bufferedStream, _schema) + : new JsonStreamingDataReader(_bufferedStream, _schema); + + return Task.FromResult(_reader); } catch { // Clean up on failure _reader?.Dispose(); + _bufferedStream?.Dispose(); _decompressionStream?.Dispose(); _fileStream?.Dispose(); _reader = null; + _bufferedStream = null; _decompressionStream = null; _fileStream = null; throw; @@ -65,6 +93,12 @@ public sealed class JsonZstdFileSource : IImportSource _reader = null; } + if (_bufferedStream != null) + { + await _bufferedStream.DisposeAsync(); + _bufferedStream = null; + } + if (_decompressionStream != null) { await _decompressionStream.DisposeAsync(); diff --git a/NEW/src/JdeScoping.DataSync.Dev/Sources/Utf8JsonStreamingDataReader.cs b/NEW/src/JdeScoping.DataSync.Dev/Sources/Utf8JsonStreamingDataReader.cs new file mode 100644 index 0000000..9880374 --- /dev/null +++ b/NEW/src/JdeScoping.DataSync.Dev/Sources/Utf8JsonStreamingDataReader.cs @@ -0,0 +1,374 @@ +using System.Buffers; +using System.Data; +using System.Globalization; +using System.Text; +using System.Text.Json; +using JdeScoping.DataSync.Dev.Models; + +namespace JdeScoping.DataSync.Dev.Sources; + +/// +/// High-performance streaming JSON array reader using Utf8JsonReader. +/// Buffers complete JSON objects then parses with zero-allocation property matching. +/// +internal sealed class Utf8JsonStreamingDataReader : IDataReader +{ + private const int InitialObjectBufferSize = 8 * 1024; // 8 KB per object + + private readonly Stream _stream; + private readonly StreamReader _streamReader; + private readonly JsonColumnSchema[] _schema; + private readonly Dictionary _nameToOrdinal; + private readonly byte[][] _encodedColumnNames; + private byte[] _objectBuffer; + private object?[] _currentRow; + private bool _disposed; + private bool _started; + private bool _finished; + + public Utf8JsonStreamingDataReader(Stream stream, JsonColumnSchema[] schema) + { + _stream = stream ?? throw new ArgumentNullException(nameof(stream)); + _schema = schema ?? throw new ArgumentNullException(nameof(schema)); + + // Use larger buffer for StreamReader + _streamReader = new StreamReader(stream, Encoding.UTF8, detectEncodingFromByteOrderMarks: false, + bufferSize: 256 * 1024, leaveOpen: true); + + _objectBuffer = ArrayPool.Shared.Rent(InitialObjectBufferSize); + _currentRow = new object?[schema.Length]; + + _nameToOrdinal = new Dictionary(StringComparer.OrdinalIgnoreCase); + _encodedColumnNames = new byte[schema.Length][]; + for (int i = 0; i < schema.Length; i++) + { + _nameToOrdinal[schema[i].Name] = i; + _encodedColumnNames[i] = Encoding.UTF8.GetBytes(schema[i].Name); + } + } + + public int FieldCount => _schema.Length; + public int Depth => 0; + public bool IsClosed => _disposed; + public int RecordsAffected => -1; + + public object this[int ordinal] => GetValue(ordinal); + public object this[string name] => GetValue(GetOrdinal(name)); + + public string GetName(int ordinal) => _schema[ordinal].Name; + public int GetOrdinal(string name) => _nameToOrdinal.TryGetValue(name, out var ordinal) + ? ordinal + : throw new IndexOutOfRangeException($"Column '{name}' not found."); + + public Type GetFieldType(int ordinal) => _schema[ordinal].ClrType; + public string GetDataTypeName(int ordinal) => _schema[ordinal].SqlTypeName; + + public object GetValue(int ordinal) => _currentRow[ordinal] ?? DBNull.Value; + public bool IsDBNull(int ordinal) => _currentRow[ordinal] is null; + + public bool Read() + { + if (_disposed || _finished) return false; + + try + { + // Skip to start of array on first read + if (!_started) + { + SkipWhitespaceAndExpect('['); + _started = true; + } + + // Check for end of array or next object + SkipWhitespace(); + var next = (char)_streamReader.Peek(); + + if (next == ']') + { + _finished = true; + return false; + } + + if (next == ',') + { + _streamReader.Read(); // consume comma + SkipWhitespace(); + } + + // Read complete JSON object into buffer + var objectBytes = ReadJsonObjectBytes(); + if (objectBytes.Length == 0) + { + _finished = true; + return false; + } + + // Parse with Utf8JsonReader (single-shot, no partial state) + ParseJsonObject(objectBytes); + return true; + } + catch (JsonException ex) + { + throw new InvalidDataException($"Failed to parse JSON: {ex.Message}", ex); + } + } + + private ReadOnlySpan ReadJsonObjectBytes() + { + SkipWhitespace(); + if (_streamReader.Peek() == -1 || (char)_streamReader.Peek() == ']') + return ReadOnlySpan.Empty; + + // Read characters until we have a complete JSON object + int braceCount = 0; + bool inString = false; + bool escaped = false; + int bytesWritten = 0; + + while (true) + { + int c = _streamReader.Read(); + if (c == -1) break; + + char ch = (char)c; + + // Ensure buffer capacity (grow if needed) + int bytesNeeded = Encoding.UTF8.GetMaxByteCount(1); + if (bytesWritten + bytesNeeded > _objectBuffer.Length) + { + var newBuffer = ArrayPool.Shared.Rent(_objectBuffer.Length * 2); + Buffer.BlockCopy(_objectBuffer, 0, newBuffer, 0, bytesWritten); + ArrayPool.Shared.Return(_objectBuffer); + _objectBuffer = newBuffer; + } + + // Encode character to buffer + bytesWritten += Encoding.UTF8.GetBytes([ch], _objectBuffer.AsSpan(bytesWritten)); + + if (escaped) + { + escaped = false; + continue; + } + + if (ch == '\\' && inString) + { + escaped = true; + continue; + } + + if (ch == '"') + { + inString = !inString; + continue; + } + + if (!inString) + { + if (ch == '{') braceCount++; + else if (ch == '}') + { + braceCount--; + if (braceCount == 0) break; + } + } + } + + return _objectBuffer.AsSpan(0, bytesWritten); + } + + private void ParseJsonObject(ReadOnlySpan objectBytes) + { + var reader = new Utf8JsonReader(objectBytes); + + // Move to StartObject + if (!reader.Read() || reader.TokenType != JsonTokenType.StartObject) + throw new InvalidDataException($"Expected object, got {reader.TokenType}."); + + // Clear current row + Array.Clear(_currentRow); + + // Parse object properties + while (reader.Read() && reader.TokenType != JsonTokenType.EndObject) + { + if (reader.TokenType != JsonTokenType.PropertyName) + continue; + + // Find matching column using pre-encoded names (avoids string allocation) + int ordinal = -1; + for (int i = 0; i < _encodedColumnNames.Length; i++) + { + if (reader.ValueTextEquals(_encodedColumnNames[i])) + { + ordinal = i; + break; + } + } + + // Read the value + if (!reader.Read()) + break; + + if (ordinal >= 0) + { + _currentRow[ordinal] = ParseValue(ref reader, _schema[ordinal].ClrType); + } + else + { + // Skip unknown property value + reader.Skip(); + } + } + } + + private void SkipWhitespace() + { + while (_streamReader.Peek() != -1 && char.IsWhiteSpace((char)_streamReader.Peek())) + { + _streamReader.Read(); + } + } + + private void SkipWhitespaceAndExpect(char expected) + { + SkipWhitespace(); + var actual = (char)_streamReader.Read(); + if (actual != expected) + throw new InvalidDataException($"Expected '{expected}' but found '{actual}'."); + } + + private static object? ParseValue(ref Utf8JsonReader reader, Type targetType) + { + if (reader.TokenType == JsonTokenType.Null) + return null; + + if (targetType == typeof(string)) + return reader.GetString(); + + if (targetType == typeof(int)) + { + if (reader.TokenType == JsonTokenType.Number) + return reader.TryGetInt32(out var i) ? i : (int)reader.GetDouble(); + return null; + } + + if (targetType == typeof(long)) + { + if (reader.TokenType == JsonTokenType.Number) + return reader.TryGetInt64(out var l) ? l : (long)reader.GetDouble(); + return null; + } + + if (targetType == typeof(decimal)) + { + if (reader.TokenType == JsonTokenType.Number) + return reader.TryGetDecimal(out var d) ? d : (decimal)reader.GetDouble(); + return null; + } + + if (targetType == typeof(DateTime)) + { + if (reader.TokenType == JsonTokenType.String) + { + var str = reader.GetString(); + if (str != null) + return DateTime.Parse(str, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind); + } + return null; + } + + if (targetType == typeof(bool)) + { + if (reader.TokenType == JsonTokenType.True) + return true; + if (reader.TokenType == JsonTokenType.False) + return false; + return null; + } + + if (targetType == typeof(byte[])) + { + if (reader.TokenType == JsonTokenType.String) + return reader.GetBytesFromBase64(); + return null; + } + + if (targetType == typeof(double)) + { + if (reader.TokenType == JsonTokenType.Number) + return reader.GetDouble(); + return null; + } + + throw new NotSupportedException($"Type {targetType.Name} is not supported."); + } + + // IDataReader methods - typed getters + public bool GetBoolean(int ordinal) => (bool)GetValue(ordinal); + public byte GetByte(int ordinal) => (byte)GetValue(ordinal); + public long GetBytes(int ordinal, long fieldOffset, byte[]? buffer, int bufferOffset, int length) + { + var data = (byte[])GetValue(ordinal); + if (buffer == null) return data.Length; + var toCopy = Math.Min(length, data.Length - (int)fieldOffset); + Array.Copy(data, fieldOffset, buffer, bufferOffset, toCopy); + return toCopy; + } + public char GetChar(int ordinal) => ((string)GetValue(ordinal))[0]; + public long GetChars(int ordinal, long fieldOffset, char[]? buffer, int bufferOffset, int length) + { + var data = (string)GetValue(ordinal); + if (buffer == null) return data.Length; + var toCopy = Math.Min(length, data.Length - (int)fieldOffset); + data.CopyTo((int)fieldOffset, buffer, bufferOffset, toCopy); + return toCopy; + } + public IDataReader GetData(int ordinal) => throw new NotSupportedException(); + public DateTime GetDateTime(int ordinal) => (DateTime)GetValue(ordinal); + public decimal GetDecimal(int ordinal) => (decimal)GetValue(ordinal); + public double GetDouble(int ordinal) => (double)GetValue(ordinal); + public float GetFloat(int ordinal) => (float)GetValue(ordinal); + public Guid GetGuid(int ordinal) => (Guid)GetValue(ordinal); + public short GetInt16(int ordinal) => (short)GetValue(ordinal); + public int GetInt32(int ordinal) => (int)GetValue(ordinal); + public long GetInt64(int ordinal) => (long)GetValue(ordinal); + public string GetString(int ordinal) => (string)GetValue(ordinal); + public int GetValues(object[] values) + { + var count = Math.Min(values.Length, _currentRow.Length); + for (int i = 0; i < count; i++) + values[i] = GetValue(i); + return count; + } + + public DataTable GetSchemaTable() + { + var table = new DataTable("SchemaTable"); + table.Columns.Add("ColumnName", typeof(string)); + table.Columns.Add("ColumnOrdinal", typeof(int)); + table.Columns.Add("DataType", typeof(Type)); + table.Columns.Add("AllowDBNull", typeof(bool)); + + for (int i = 0; i < _schema.Length; i++) + { + table.Rows.Add(_schema[i].Name, i, _schema[i].ClrType, _schema[i].IsNullable); + } + + return table; + } + + public bool NextResult() => false; + + public void Close() => Dispose(); + + public void Dispose() + { + if (!_disposed) + { + ArrayPool.Shared.Return(_objectBuffer); + _objectBuffer = Array.Empty(); + _streamReader.Dispose(); + _disposed = true; + } + } +} diff --git a/NEW/src/JdeScoping.DataSync.Dev/WorkCenterDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/WorkCenterDevEtl.cs index 21a121b..c0c9490 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/WorkCenterDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/WorkCenterDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderComponentCurrDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderComponentCurrDevEtl.cs index 04a6409..a24303f 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderComponentCurrDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderComponentCurrDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderComponentHistDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderComponentHistDevEtl.cs index e19ad48..365f2a8 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderComponentHistDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderComponentHistDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderCurrDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderCurrDevEtl.cs index eab6217..c786cb4 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderCurrDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderCurrDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderHistDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderHistDevEtl.cs index 0c8b3fc..272d6c8 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderHistDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderHistDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderRoutingDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderRoutingDevEtl.cs index 1b01255..b9019d6 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderRoutingDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderRoutingDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderStepCurrDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderStepCurrDevEtl.cs index 52c7015..67d569b 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderStepCurrDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderStepCurrDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderStepHistDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderStepHistDevEtl.cs index 63b4be5..387e205 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderStepHistDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderStepHistDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderTimeCurrDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderTimeCurrDevEtl.cs index 14ff1f2..7d4088e 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderTimeCurrDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderTimeCurrDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev; diff --git a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderTimeHistDevEtl.cs b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderTimeHistDevEtl.cs index ef63f89..7d987cb 100644 --- a/NEW/src/JdeScoping.DataSync.Dev/WorkOrderTimeHistDevEtl.cs +++ b/NEW/src/JdeScoping.DataSync.Dev/WorkOrderTimeHistDevEtl.cs @@ -1,8 +1,8 @@ using JdeScoping.DataAccess.Interfaces; using JdeScoping.DataSync.Etl.Destinations; -using JdeScoping.DataSync.Etl.Models; +using JdeScoping.DataSync.Dev.Models; using JdeScoping.DataSync.Etl.Pipeline; -using JdeScoping.DataSync.Etl.Sources; +using JdeScoping.DataSync.Dev.Sources; namespace JdeScoping.DataSync.Dev;