feat(datasync): add JsonStreamingDataReader for streaming JSON array parsing
This commit is contained in:
@@ -0,0 +1,283 @@
|
||||
using System.Data;
|
||||
using System.Text.Json;
|
||||
using JdeScoping.DataSync.Etl.Models;
|
||||
|
||||
namespace JdeScoping.DataSync.Etl.Sources;
|
||||
|
||||
/// <summary>
|
||||
/// Streams a JSON array as an IDataReader, parsing one object at a time.
|
||||
/// </summary>
|
||||
internal sealed class JsonStreamingDataReader : IDataReader
|
||||
{
|
||||
// Stream supplied by the caller; the reader below is layered on top of it.
private readonly Stream _stream;

// Character reader over _stream; all parsing pulls characters from here.
private readonly StreamReader _streamReader;

// Column definitions; the array index of a column is its ordinal.
private readonly JsonColumnSchema[] _schema;

// Case-insensitive lookup from column name to ordinal.
private readonly Dictionary<string, int> _nameToOrdinal;

// Values of the current row, one slot per schema column (null = no value).
private object?[] _currentRow;

// True once Dispose has completed.
private bool _disposed;

// True once the opening '[' of the array has been consumed.
private bool _started;

// True once Read has reported the end of the data.
private bool _finished;

/// <summary>
/// Creates a reader that exposes the JSON array in <paramref name="stream"/> as rows
/// shaped by <paramref name="schema"/>.
/// </summary>
/// <param name="stream">Stream containing the JSON text.</param>
/// <param name="schema">Column definitions; array order defines the ordinals.</param>
/// <exception cref="ArgumentNullException">
/// <paramref name="stream"/> or <paramref name="schema"/> is null.
/// </exception>
public JsonStreamingDataReader(Stream stream, JsonColumnSchema[] schema)
{
    if (stream is null)
    {
        throw new ArgumentNullException(nameof(stream));
    }

    if (schema is null)
    {
        throw new ArgumentNullException(nameof(schema));
    }

    _stream = stream;
    _schema = schema;
    _streamReader = new StreamReader(stream);
    _currentRow = new object?[schema.Length];
    _nameToOrdinal = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);

    // Indexer assignment means a repeated column name keeps the highest ordinal.
    var position = 0;
    foreach (var column in schema)
    {
        _nameToOrdinal[column.Name] = position;
        position++;
    }
}
|
||||
|
||||
/// <summary>Number of columns, taken from the schema length.</summary>
public int FieldCount
{
    get { return _schema.Length; }
}

/// <summary>Nesting depth; this reader always reports 0.</summary>
public int Depth
{
    get { return 0; }
}

/// <summary>True once the reader has been disposed.</summary>
public bool IsClosed
{
    get { return _disposed; }
}

/// <summary>Always -1; this reader does not report affected rows.</summary>
public int RecordsAffected
{
    get { return -1; }
}

/// <summary>Value of the column at <paramref name="ordinal"/> in the current row.</summary>
public object this[int ordinal]
{
    get { return GetValue(ordinal); }
}

/// <summary>Value of the column called <paramref name="name"/> in the current row.</summary>
public object this[string name]
{
    get
    {
        var ordinal = GetOrdinal(name);
        return GetValue(ordinal);
    }
}
|
||||
|
||||
/// <summary>Returns the schema name of the column at <paramref name="ordinal"/>.</summary>
public string GetName(int ordinal)
{
    return _schema[ordinal].Name;
}

/// <summary>Resolves a column name (case-insensitively) to its ordinal.</summary>
/// <exception cref="IndexOutOfRangeException">No column has that name.</exception>
public int GetOrdinal(string name)
{
    if (!_nameToOrdinal.TryGetValue(name, out var ordinal))
    {
        throw new IndexOutOfRangeException($"Column '{name}' not found.");
    }

    return ordinal;
}

/// <summary>Returns the CLR type declared by the schema for the column.</summary>
public Type GetFieldType(int ordinal)
{
    return _schema[ordinal].ClrType;
}

/// <summary>Returns the SQL type name declared by the schema for the column.</summary>
public string GetDataTypeName(int ordinal)
{
    return _schema[ordinal].SqlTypeName;
}

/// <summary>
/// Returns the current row's value for the column, substituting
/// <see cref="DBNull.Value"/> when the slot holds null.
/// </summary>
public object GetValue(int ordinal)
{
    var cell = _currentRow[ordinal];
    return cell is null ? DBNull.Value : cell;
}

/// <summary>True when the current row holds no value for the column.</summary>
public bool IsDBNull(int ordinal)
{
    var cell = _currentRow[ordinal];
    return cell is null;
}
|
||||
|
||||
/// <summary>
/// Advances to the next object of the JSON array and copies its properties into the
/// current row.
/// </summary>
/// <remarks>
/// Properties whose names are not in the schema are ignored; schema columns missing
/// from the object stay null for that row. A separating comma is consumed when present
/// but is not required, and reaching the end of the input ends the data the same way
/// the closing ']' does.
/// </remarks>
/// <returns>
/// true when a row was loaded; false when the reader is disposed or the data has ended.
/// </returns>
/// <exception cref="InvalidDataException">
/// The input does not start with '[' or an element is not valid JSON.
/// </exception>
public bool Read()
{
    if (_disposed || _finished)
    {
        return false;
    }

    try
    {
        // The opening '[' is consumed exactly once, on the first call.
        if (!_started)
        {
            SkipWhitespaceAndExpect('[');
            _started = true;
        }

        SkipWhitespace();

        // Keep the int from Peek so end-of-input (-1) is never mistaken for a character.
        int next = _streamReader.Peek();

        if (next == ']')
        {
            _finished = true;
            return false;
        }

        if (next == ',')
        {
            _streamReader.Read(); // consume the element separator
            SkipWhitespace();
        }

        // JsonDocument rents pooled buffers and must be disposed. Every value stored in
        // the row below is copied out of the document, so it can be released as soon as
        // this call returns.
        using var jsonObject = ReadJsonObject();
        if (jsonObject is null)
        {
            _finished = true;
            return false;
        }

        // Reset first so columns absent from this object do not keep the previous row's value.
        Array.Clear(_currentRow);
        foreach (var property in jsonObject.RootElement.EnumerateObject())
        {
            if (_nameToOrdinal.TryGetValue(property.Name, out var ordinal))
            {
                _currentRow[ordinal] = ParseValue(property.Value, _schema[ordinal].ClrType);
            }
        }

        return true;
    }
    catch (JsonException ex)
    {
        throw new InvalidDataException($"Failed to parse JSON: {ex.Message}", ex);
    }
}
|
||||
|
||||
/// <summary>
/// Reads the text of the next JSON object from the stream and parses it.
/// </summary>
/// <returns>
/// The parsed object, or null when the next non-whitespace character is ']' or the
/// input has ended. The caller owns the returned document and should dispose it.
/// </returns>
/// <exception cref="JsonException">
/// The next element does not start with '{', or the collected text is not valid JSON
/// (for example, the input ended in the middle of an object).
/// </exception>
private JsonDocument? ReadJsonObject()
{
    SkipWhitespace();

    int first = _streamReader.Peek();
    if (first == -1 || first == ']')
    {
        return null;
    }

    // Only objects can be rows. Without this check the brace counter below would never
    // balance for a scalar or nested-array element, and the rest of the stream would be
    // buffered in memory before the parse finally failed.
    if (first != '{')
    {
        throw new JsonException(
            $"Expected '{{' at the start of an array element but found '{(char)first}'.");
    }

    var buffer = new System.Text.StringBuilder();
    int depth = 0;
    bool inString = false;
    bool escaped = false;

    while (true)
    {
        int c = _streamReader.Read();
        if (c == -1)
        {
            // Input ended mid-object; JsonDocument.Parse below reports the truncation.
            break;
        }

        char ch = (char)c;
        buffer.Append(ch);

        // The character after a backslash inside a string is taken literally.
        if (escaped)
        {
            escaped = false;
            continue;
        }

        if (ch == '\\' && inString)
        {
            escaped = true;
            continue;
        }

        if (ch == '"')
        {
            inString = !inString;
            continue;
        }

        // Braces only count as structure outside string literals.
        if (!inString)
        {
            if (ch == '{')
            {
                depth++;
            }
            else if (ch == '}')
            {
                depth--;
                if (depth == 0)
                {
                    break; // closing brace of the outermost object
                }
            }
        }
    }

    return JsonDocument.Parse(buffer.ToString());
}
|
||||
|
||||
/// <summary>
/// Converts a JSON value to the CLR type declared for its column.
/// </summary>
/// <param name="element">JSON value to convert.</param>
/// <param name="targetType">
/// One of string, int, long, decimal, DateTime, bool, byte[] or double.
/// </param>
/// <returns>The converted value, or null for a JSON null.</returns>
/// <exception cref="OverflowException">
/// A number does not fit the integer or decimal target type.
/// </exception>
/// <exception cref="NotSupportedException">
/// <paramref name="targetType"/> is not one of the supported types.
/// </exception>
private static object? ParseValue(JsonElement element, Type targetType)
{
    if (element.ValueKind == JsonValueKind.Null)
    {
        return null;
    }

    if (targetType == typeof(string))
    {
        return element.GetString();
    }

    if (targetType == typeof(int))
    {
        // Non-integral numbers are truncated toward zero. The checked cast turns an
        // out-of-range number into an OverflowException instead of an unspecified int.
        return element.TryGetInt32(out var i) ? i : checked((int)element.GetDouble());
    }

    if (targetType == typeof(long))
    {
        return element.TryGetInt64(out var l) ? l : checked((long)element.GetDouble());
    }

    if (targetType == typeof(decimal))
    {
        return element.TryGetDecimal(out var d) ? d : (decimal)element.GetDouble();
    }

    if (targetType == typeof(DateTime))
    {
        if (element.ValueKind == JsonValueKind.String)
        {
            // Invariant culture keeps parsing independent of the machine's regional settings.
            return DateTime.Parse(
                element.GetString()!,
                System.Globalization.CultureInfo.InvariantCulture,
                System.Globalization.DateTimeStyles.RoundtripKind);
        }

        return element.GetDateTime();
    }

    if (targetType == typeof(bool))
    {
        return element.GetBoolean();
    }

    if (targetType == typeof(byte[]))
    {
        return element.GetBytesFromBase64();
    }

    if (targetType == typeof(double))
    {
        return element.GetDouble();
    }

    throw new NotSupportedException($"Type {targetType.Name} is not supported.");
}
|
||||
|
||||
/// <summary>
/// Consumes characters from the reader until the next one is not whitespace or the
/// input ends. The first non-whitespace character is left unread.
/// </summary>
private void SkipWhitespace()
{
    for (int peeked = _streamReader.Peek();
         peeked != -1 && char.IsWhiteSpace((char)peeked);
         peeked = _streamReader.Peek())
    {
        _streamReader.Read();
    }
}
|
||||
|
||||
/// <summary>
/// Skips leading whitespace, then consumes one character and verifies it is
/// <paramref name="expected"/>.
/// </summary>
/// <param name="expected">The character that must come next.</param>
/// <exception cref="InvalidDataException">
/// The input ended, or the next character is not <paramref name="expected"/>.
/// </exception>
private void SkipWhitespaceAndExpect(char expected)
{
    SkipWhitespace();

    int read = _streamReader.Read();
    if (read == -1)
    {
        // Report end of input explicitly; casting -1 to char would produce '\uffff'
        // and a misleading "found" message.
        throw new InvalidDataException($"Expected '{expected}' but reached the end of the input.");
    }

    var actual = (char)read;
    if (actual != expected)
    {
        throw new InvalidDataException($"Expected '{expected}' but found '{actual}'.");
    }
}
|
||||
|
||||
// IDataReader methods - typed getters
|
||||
/// <summary>Returns the column value unboxed as <see cref="bool"/>.</summary>
/// <exception cref="InvalidCastException">The stored value is not a bool (including DBNull).</exception>
public bool GetBoolean(int ordinal)
{
    object boxed = GetValue(ordinal);
    return (bool)boxed;
}

/// <summary>Returns the column value unboxed as <see cref="byte"/>.</summary>
/// <exception cref="InvalidCastException">The stored value is not a byte (including DBNull).</exception>
public byte GetByte(int ordinal)
{
    object boxed = GetValue(ordinal);
    return (byte)boxed;
}
|
||||
/// <summary>
/// Copies bytes from a byte[] column into <paramref name="buffer"/>.
/// </summary>
/// <param name="ordinal">Column ordinal.</param>
/// <param name="fieldOffset">Index within the column value at which to start copying.</param>
/// <param name="buffer">Destination, or null to query the total length of the value.</param>
/// <param name="bufferOffset">Index within <paramref name="buffer"/> at which to start writing.</param>
/// <param name="length">Maximum number of bytes to copy.</param>
/// <returns>
/// The total length of the value when <paramref name="buffer"/> is null; otherwise the
/// number of bytes copied, which is 0 when <paramref name="fieldOffset"/> is at or
/// past the end of the value.
/// </returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="fieldOffset"/> is negative.</exception>
/// <exception cref="InvalidCastException">The stored value is not a byte[] (including DBNull).</exception>
public long GetBytes(int ordinal, long fieldOffset, byte[]? buffer, int bufferOffset, int length)
{
    var data = (byte[])GetValue(ordinal);
    if (buffer == null)
    {
        return data.Length;
    }

    if (fieldOffset < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(fieldOffset));
    }

    // Reading at or past the end yields no data instead of a negative copy length.
    if (fieldOffset >= data.Length)
    {
        return 0;
    }

    // Compute the remainder in long so a large offset is never truncated to int.
    var toCopy = (int)Math.Min(length, data.Length - fieldOffset);
    Array.Copy(data, fieldOffset, buffer, bufferOffset, toCopy);
    return toCopy;
}
|
||||
/// <summary>Returns the first character of a string column.</summary>
/// <exception cref="InvalidCastException">The stored value is not a string (including DBNull).</exception>
/// <exception cref="IndexOutOfRangeException">The stored string is empty.</exception>
public char GetChar(int ordinal)
{
    var text = (string)GetValue(ordinal);
    return text[0];
}
|
||||
/// <summary>
/// Copies characters from a string column into <paramref name="buffer"/>.
/// </summary>
/// <param name="ordinal">Column ordinal.</param>
/// <param name="fieldOffset">Index within the string at which to start copying.</param>
/// <param name="buffer">Destination, or null to query the total length of the string.</param>
/// <param name="bufferOffset">Index within <paramref name="buffer"/> at which to start writing.</param>
/// <param name="length">Maximum number of characters to copy.</param>
/// <returns>
/// The total length of the string when <paramref name="buffer"/> is null; otherwise
/// the number of characters copied, which is 0 when <paramref name="fieldOffset"/> is
/// at or past the end of the string.
/// </returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="fieldOffset"/> is negative.</exception>
/// <exception cref="InvalidCastException">The stored value is not a string (including DBNull).</exception>
public long GetChars(int ordinal, long fieldOffset, char[]? buffer, int bufferOffset, int length)
{
    var data = (string)GetValue(ordinal);
    if (buffer == null)
    {
        return data.Length;
    }

    if (fieldOffset < 0)
    {
        throw new ArgumentOutOfRangeException(nameof(fieldOffset));
    }

    // Reading at or past the end yields no data instead of a negative copy length.
    if (fieldOffset >= data.Length)
    {
        return 0;
    }

    // fieldOffset is now known to be below data.Length, so the int cast is lossless.
    var start = (int)fieldOffset;
    var toCopy = Math.Min(length, data.Length - start);
    data.CopyTo(start, buffer, bufferOffset, toCopy);
    return toCopy;
}
|
||||
// Typed getters: each one casts the boxed value from GetValue directly, so a value of
// any other runtime type (including DBNull) raises InvalidCastException.
// NOTE(review): ParseValue in this file only produces string, int, long, decimal,
// DateTime, bool, byte[] and double, so GetFloat/GetGuid/GetInt16 look like they can
// never succeed on rows read from JSON - confirm whether that is intended.

/// <summary>Nested readers are not supported.</summary>
public IDataReader GetData(int ordinal)
{
    throw new NotSupportedException();
}

/// <summary>Returns the column value unboxed as <see cref="DateTime"/>.</summary>
public DateTime GetDateTime(int ordinal)
{
    object boxed = GetValue(ordinal);
    return (DateTime)boxed;
}

/// <summary>Returns the column value unboxed as <see cref="decimal"/>.</summary>
public decimal GetDecimal(int ordinal)
{
    object boxed = GetValue(ordinal);
    return (decimal)boxed;
}

/// <summary>Returns the column value unboxed as <see cref="double"/>.</summary>
public double GetDouble(int ordinal)
{
    object boxed = GetValue(ordinal);
    return (double)boxed;
}

/// <summary>Returns the column value unboxed as <see cref="float"/>.</summary>
public float GetFloat(int ordinal)
{
    object boxed = GetValue(ordinal);
    return (float)boxed;
}

/// <summary>Returns the column value unboxed as <see cref="Guid"/>.</summary>
public Guid GetGuid(int ordinal)
{
    object boxed = GetValue(ordinal);
    return (Guid)boxed;
}

/// <summary>Returns the column value unboxed as <see cref="short"/>.</summary>
public short GetInt16(int ordinal)
{
    object boxed = GetValue(ordinal);
    return (short)boxed;
}

/// <summary>Returns the column value unboxed as <see cref="int"/>.</summary>
public int GetInt32(int ordinal)
{
    object boxed = GetValue(ordinal);
    return (int)boxed;
}

/// <summary>Returns the column value unboxed as <see cref="long"/>.</summary>
public long GetInt64(int ordinal)
{
    object boxed = GetValue(ordinal);
    return (long)boxed;
}

/// <summary>Returns the column value cast to <see cref="string"/>.</summary>
public string GetString(int ordinal)
{
    object boxed = GetValue(ordinal);
    return (string)boxed;
}
|
||||
/// <summary>
/// Fills <paramref name="values"/> with the current row, using
/// <see cref="DBNull.Value"/> for empty slots.
/// </summary>
/// <param name="values">Destination array; may be shorter or longer than the row.</param>
/// <returns>
/// The number of values written: the smaller of the array length and the column count.
/// </returns>
public int GetValues(object[] values)
{
    var copied = values.Length < _currentRow.Length ? values.Length : _currentRow.Length;

    var index = 0;
    while (index < copied)
    {
        values[index] = GetValue(index);
        index++;
    }

    return copied;
}
|
||||
|
||||
/// <summary>
/// Builds a schema table with one row per column, describing its name, ordinal,
/// CLR type and nullability.
/// </summary>
/// <returns>
/// A new table named "SchemaTable" with the columns ColumnName, ColumnOrdinal,
/// DataType and AllowDBNull.
/// </returns>
public DataTable GetSchemaTable()
{
    var schemaTable = new DataTable("SchemaTable");

    var columns = schemaTable.Columns;
    columns.Add("ColumnName", typeof(string));
    columns.Add("ColumnOrdinal", typeof(int));
    columns.Add("DataType", typeof(Type));
    columns.Add("AllowDBNull", typeof(bool));

    var ordinal = 0;
    foreach (var column in _schema)
    {
        schemaTable.Rows.Add(column.Name, ordinal, column.ClrType, column.IsNullable);
        ordinal++;
    }

    return schemaTable;
}
|
||||
|
||||
/// <summary>Always false: the reader exposes a single result set.</summary>
public bool NextResult()
{
    return false;
}

/// <summary>Closes the reader; equivalent to <see cref="Dispose"/>.</summary>
public void Close()
{
    Dispose();
}

/// <summary>
/// Disposes the underlying <see cref="StreamReader"/> and marks the reader closed.
/// Calling it again has no effect.
/// </summary>
public void Dispose()
{
    if (_disposed)
    {
        return;
    }

    _streamReader.Dispose();
    _disposed = true;
}
|
||||
}
|
||||
Reference in New Issue
Block a user