// NEW/src/JdeScoping.DataSync/Etl/Sources/JsonStreamingDataReader.cs
using System.Data;
using System.Globalization;
using System.Text;
using System.Text.Json;
using JdeScoping.DataSync.Etl.Models;

namespace JdeScoping.DataSync.Etl.Sources;

/// <summary>
/// Streams a top-level JSON array of objects as a forward-only <see cref="IDataReader"/>,
/// buffering and parsing one array element at a time instead of loading the whole document.
/// </summary>
/// <remarks>
/// <para>
/// The expected input is <c>[ {..}, {..}, ... ]</c>. Object properties are matched to schema
/// columns by name (case-insensitive); properties with no matching column are ignored and
/// columns with no matching property are reported as <see cref="DBNull"/>.
/// </para>
/// <para>
/// The scanner is deliberately lenient about separators: a missing, leading or trailing comma
/// between elements is tolerated, and reaching the end of the stream between elements ends the
/// result set exactly like a closing <c>]</c>.
/// </para>
/// <para>
/// The reader wraps the stream in a <see cref="StreamReader"/> that is disposed together with
/// this instance, which also closes the underlying stream.
/// </para>
/// </remarks>
internal sealed class JsonStreamingDataReader : IDataReader
{
    private readonly StreamReader _streamReader;
    private readonly JsonColumnSchema[] _schema;
    private readonly Dictionary<string, int> _nameToOrdinal;
    private readonly object?[] _currentRow;
    private bool _disposed;
    private bool _started;
    private bool _finished;

    /// <summary>
    /// Creates a reader over <paramref name="stream"/> that exposes the columns described by
    /// <paramref name="schema"/>, in schema order.
    /// </summary>
    /// <param name="stream">Readable stream positioned at the start of the JSON array.</param>
    /// <param name="schema">Column definitions; the array index is the column ordinal.</param>
    /// <exception cref="ArgumentNullException">Either argument is <c>null</c>.</exception>
    public JsonStreamingDataReader(Stream stream, JsonColumnSchema[] schema)
    {
        if (stream is null)
        {
            throw new ArgumentNullException(nameof(stream));
        }

        _schema = schema ?? throw new ArgumentNullException(nameof(schema));
        _streamReader = new StreamReader(stream);
        _currentRow = new object?[schema.Length];

        // If two columns share a name (ignoring case) the later ordinal wins.
        _nameToOrdinal = new Dictionary<string, int>(schema.Length, StringComparer.OrdinalIgnoreCase);
        for (int i = 0; i < schema.Length; i++)
        {
            _nameToOrdinal[schema[i].Name] = i;
        }
    }

    /// <summary>Number of columns, as defined by the schema.</summary>
    public int FieldCount => _schema.Length;

    /// <summary>Always 0: rows are never nested.</summary>
    public int Depth => 0;

    /// <summary><c>true</c> once the reader has been closed or disposed.</summary>
    public bool IsClosed => _disposed;

    /// <summary>Always -1: this reader is not the result of a data-modifying statement.</summary>
    public int RecordsAffected => -1;

    /// <summary>Value of the column at <paramref name="ordinal"/> in the current row.</summary>
    public object this[int ordinal] => GetValue(ordinal);

    /// <summary>Value of the column called <paramref name="name"/> in the current row.</summary>
    public object this[string name] => GetValue(GetOrdinal(name));

    /// <summary>Returns the schema name of the column at <paramref name="ordinal"/>.</summary>
    public string GetName(int ordinal) => _schema[ordinal].Name;

    /// <summary>Resolves a column name (case-insensitive) to its ordinal.</summary>
    /// <exception cref="IndexOutOfRangeException">No column has that name.</exception>
    public int GetOrdinal(string name) => _nameToOrdinal.TryGetValue(name, out var ordinal)
        ? ordinal
        : throw new IndexOutOfRangeException($"Column '{name}' not found.");

    /// <summary>Returns the CLR type declared by the schema for the column.</summary>
    public Type GetFieldType(int ordinal) => _schema[ordinal].ClrType;

    /// <summary>Returns the SQL type name declared by the schema for the column.</summary>
    public string GetDataTypeName(int ordinal) => _schema[ordinal].SqlTypeName;

    /// <summary>
    /// Returns the current row's value for the column, or <see cref="DBNull.Value"/> when the
    /// JSON value was <c>null</c> or the property was absent.
    /// </summary>
    public object GetValue(int ordinal) => _currentRow[ordinal] ?? DBNull.Value;

    /// <summary>Whether the current row has no value for the column.</summary>
    public bool IsDBNull(int ordinal) => _currentRow[ordinal] is null;

    /// <summary>
    /// Advances to the next object in the array and loads its properties into the current row.
    /// </summary>
    /// <returns>
    /// <c>true</c> when a row was loaded; <c>false</c> at the closing <c>]</c>, at end of stream
    /// between elements, or when the reader has been disposed.
    /// </returns>
    /// <exception cref="InvalidDataException">
    /// The stream does not start with <c>[</c>, an element is not a JSON object, the stream ends
    /// inside an object, or the object text is not valid JSON.
    /// </exception>
    public bool Read()
    {
        if (_disposed || _finished)
        {
            return false;
        }

        try
        {
            // The opening bracket is consumed exactly once, on the first call.
            if (!_started)
            {
                SkipWhitespaceAndExpect('[');
                _started = true;
            }

            // Consume the element separator, if any, before looking for the next object.
            SkipWhitespace();
            if (_streamReader.Peek() == ',')
            {
                _streamReader.Read();
            }

            // JsonDocument rents pooled buffers; dispose it as soon as the values are copied
            // out. Everything ParseValue returns is an independent copy, so this is safe.
            using JsonDocument? document = ReadJsonObject();
            if (document is null)
            {
                _finished = true;
                return false;
            }

            Array.Clear(_currentRow);
            foreach (JsonProperty property in document.RootElement.EnumerateObject())
            {
                if (_nameToOrdinal.TryGetValue(property.Name, out var ordinal))
                {
                    _currentRow[ordinal] = ParseValue(property.Value, _schema[ordinal].ClrType);
                }
            }

            return true;
        }
        catch (JsonException ex)
        {
            throw new InvalidDataException($"Failed to parse JSON: {ex.Message}", ex);
        }
    }

    /// <summary>
    /// Reads the text of the next <c>{...}</c> element from the stream and parses it.
    /// </summary>
    /// <returns>
    /// The parsed object (the caller must dispose it), or <c>null</c> when the next token is the
    /// closing <c>]</c> or the stream is exhausted.
    /// </returns>
    /// <exception cref="InvalidDataException">
    /// The next element does not start with <c>{</c>, or the stream ends before the object's
    /// closing brace.
    /// </exception>
    private JsonDocument? ReadJsonObject()
    {
        SkipWhitespace();
        int first = _streamReader.Peek();
        if (first == -1 || first == ']')
        {
            return null;
        }

        // Fail fast on non-object elements instead of buffering the rest of the stream while
        // searching for a closing brace that may never come.
        if (first != '{')
        {
            throw new InvalidDataException($"Expected '{{' but found '{(char)first}'.");
        }

        var buffer = new StringBuilder();
        int depth = 0;
        bool inString = false;
        bool escaped = false;

        while (true)
        {
            int c = _streamReader.Read();
            if (c == -1)
            {
                throw new InvalidDataException("Unexpected end of stream inside a JSON object.");
            }

            char ch = (char)c;
            buffer.Append(ch);

            if (escaped)
            {
                // The character after a backslash never opens/closes a string or a brace.
                escaped = false;
                continue;
            }

            if (ch == '\\' && inString)
            {
                escaped = true;
                continue;
            }

            if (ch == '"')
            {
                inString = !inString;
                continue;
            }

            if (inString)
            {
                continue;
            }

            if (ch == '{')
            {
                depth++;
            }
            else if (ch == '}' && --depth == 0)
            {
                break;
            }
        }

        return JsonDocument.Parse(buffer.ToString());
    }

    /// <summary>
    /// Converts a JSON value to the CLR type declared for its column.
    /// </summary>
    /// <param name="element">The JSON property value.</param>
    /// <param name="targetType">The column's declared CLR type.</param>
    /// <returns>The converted value, or <c>null</c> for a JSON <c>null</c>.</returns>
    /// <exception cref="OverflowException">
    /// A non-integral or oversized number does not fit the integer or decimal target type.
    /// </exception>
    /// <exception cref="NotSupportedException"><paramref name="targetType"/> is not handled.</exception>
    private static object? ParseValue(JsonElement element, Type targetType)
    {
        if (element.ValueKind == JsonValueKind.Null)
        {
            return null;
        }

        if (targetType == typeof(string))
        {
            return element.GetString();
        }

        // Integer targets accept numbers written with a fraction or exponent (e.g. 1.0, 1e3):
        // the fraction is truncated, but out-of-range values throw instead of silently wrapping.
        if (targetType == typeof(int))
        {
            return element.TryGetInt32(out int i) ? i : checked((int)element.GetDouble());
        }

        if (targetType == typeof(long))
        {
            return element.TryGetInt64(out long l) ? l : checked((long)element.GetDouble());
        }

        if (targetType == typeof(decimal))
        {
            return element.TryGetDecimal(out decimal d) ? d : (decimal)element.GetDouble();
        }

        if (targetType == typeof(DateTime))
        {
            if (element.ValueKind == JsonValueKind.String)
            {
                // Machine-readable data: parse independently of the host's current culture.
                return DateTime.Parse(
                    element.GetString()!,
                    CultureInfo.InvariantCulture,
                    DateTimeStyles.RoundtripKind);
            }

            return element.GetDateTime();
        }

        if (targetType == typeof(bool))
        {
            return element.GetBoolean();
        }

        if (targetType == typeof(byte[]))
        {
            return element.GetBytesFromBase64();
        }

        if (targetType == typeof(double))
        {
            return element.GetDouble();
        }

        // The remaining types back the GetInt16/GetByte/GetFloat/GetGuid typed getters.
        if (targetType == typeof(short))
        {
            return element.GetInt16();
        }

        if (targetType == typeof(byte))
        {
            return element.GetByte();
        }

        if (targetType == typeof(float))
        {
            return element.GetSingle();
        }

        if (targetType == typeof(Guid))
        {
            return element.GetGuid();
        }

        throw new NotSupportedException($"Type {targetType.Name} is not supported.");
    }

    /// <summary>Consumes characters up to the next non-whitespace character or end of stream.</summary>
    private void SkipWhitespace()
    {
        int next;
        while ((next = _streamReader.Peek()) != -1 && char.IsWhiteSpace((char)next))
        {
            _streamReader.Read();
        }
    }

    /// <summary>Skips whitespace, then consumes one character that must be <paramref name="expected"/>.</summary>
    /// <exception cref="InvalidDataException">A different character, or end of stream, was found.</exception>
    private void SkipWhitespaceAndExpect(char expected)
    {
        SkipWhitespace();
        int actual = _streamReader.Read();
        if (actual == -1)
        {
            throw new InvalidDataException($"Expected '{expected}' but reached the end of the stream.");
        }

        if (actual != expected)
        {
            throw new InvalidDataException($"Expected '{expected}' but found '{(char)actual}'.");
        }
    }

    // IDataReader typed getters. Each one unboxes the stored value, so the requested type must
    // match the column's declared CLR type exactly; a null column throws InvalidCastException.
    public bool GetBoolean(int ordinal) => (bool)GetValue(ordinal);

    public byte GetByte(int ordinal) => (byte)GetValue(ordinal);

    /// <summary>
    /// Copies bytes from a <c>byte[]</c> column into <paramref name="buffer"/>.
    /// </summary>
    /// <returns>
    /// The total field length when <paramref name="buffer"/> is <c>null</c>; otherwise the number
    /// of bytes copied, which is 0 when <paramref name="fieldOffset"/> is at or past the end.
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="fieldOffset"/> is negative.</exception>
    public long GetBytes(int ordinal, long fieldOffset, byte[]? buffer, int bufferOffset, int length)
    {
        var data = (byte[])GetValue(ordinal);
        if (buffer is null)
        {
            return data.Length;
        }

        if (fieldOffset < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(fieldOffset));
        }

        long remaining = data.Length - fieldOffset;
        if (remaining <= 0)
        {
            return 0;
        }

        int toCopy = (int)Math.Min(length, remaining);
        Array.Copy(data, fieldOffset, buffer, bufferOffset, toCopy);
        return toCopy;
    }

    public char GetChar(int ordinal) => ((string)GetValue(ordinal))[0];

    /// <summary>
    /// Copies characters from a <c>string</c> column into <paramref name="buffer"/>.
    /// </summary>
    /// <returns>
    /// The total field length when <paramref name="buffer"/> is <c>null</c>; otherwise the number
    /// of characters copied, which is 0 when <paramref name="fieldOffset"/> is at or past the end.
    /// </returns>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="fieldOffset"/> is negative.</exception>
    public long GetChars(int ordinal, long fieldOffset, char[]? buffer, int bufferOffset, int length)
    {
        var data = (string)GetValue(ordinal);
        if (buffer is null)
        {
            return data.Length;
        }

        if (fieldOffset < 0)
        {
            throw new ArgumentOutOfRangeException(nameof(fieldOffset));
        }

        long remaining = data.Length - fieldOffset;
        if (remaining <= 0)
        {
            return 0;
        }

        int toCopy = (int)Math.Min(length, remaining);

        // fieldOffset is within [0, data.Length) here, so the narrowing cast is lossless.
        data.CopyTo((int)fieldOffset, buffer, bufferOffset, toCopy);
        return toCopy;
    }

    /// <summary>Nested readers are not supported.</summary>
    public IDataReader GetData(int ordinal) => throw new NotSupportedException();

    public DateTime GetDateTime(int ordinal) => (DateTime)GetValue(ordinal);

    public decimal GetDecimal(int ordinal) => (decimal)GetValue(ordinal);

    public double GetDouble(int ordinal) => (double)GetValue(ordinal);

    public float GetFloat(int ordinal) => (float)GetValue(ordinal);

    public Guid GetGuid(int ordinal) => (Guid)GetValue(ordinal);

    public short GetInt16(int ordinal) => (short)GetValue(ordinal);

    public int GetInt32(int ordinal) => (int)GetValue(ordinal);

    public long GetInt64(int ordinal) => (long)GetValue(ordinal);

    public string GetString(int ordinal) => (string)GetValue(ordinal);

    /// <summary>
    /// Copies the current row into <paramref name="values"/>, using <see cref="DBNull.Value"/>
    /// for missing values.
    /// </summary>
    /// <returns>The number of values copied: the smaller of the array length and the column count.</returns>
    public int GetValues(object[] values)
    {
        var count = Math.Min(values.Length, _currentRow.Length);
        for (int i = 0; i < count; i++)
        {
            values[i] = GetValue(i);
        }

        return count;
    }

    /// <summary>
    /// Builds a schema table with one row per column, carrying the <c>ColumnName</c>,
    /// <c>ColumnOrdinal</c>, <c>DataType</c> and <c>AllowDBNull</c> columns.
    /// </summary>
    public DataTable GetSchemaTable()
    {
        var table = new DataTable("SchemaTable");
        table.Columns.Add("ColumnName", typeof(string));
        table.Columns.Add("ColumnOrdinal", typeof(int));
        table.Columns.Add("DataType", typeof(Type));
        table.Columns.Add("AllowDBNull", typeof(bool));

        for (int i = 0; i < _schema.Length; i++)
        {
            JsonColumnSchema column = _schema[i];
            table.Rows.Add(column.Name, i, column.ClrType, column.IsNullable);
        }

        return table;
    }

    /// <summary>Always <c>false</c>: the stream holds a single result set.</summary>
    public bool NextResult() => false;

    /// <summary>Closes the reader; equivalent to <see cref="Dispose"/>.</summary>
    public void Close() => Dispose();

    /// <summary>
    /// Disposes the internal <see cref="StreamReader"/> (and with it the underlying stream).
    /// Safe to call more than once.
    /// </summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _streamReader.Dispose();
        _disposed = true;
    }
}