chore: remove obsolete JSON source files
This commit is contained in:
@@ -1,25 +0,0 @@
|
|||||||
namespace JdeScoping.DataSync.Dev.Models;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Defines a column schema for JSON-to-DataReader mapping.
|
|
||||||
/// </summary>
|
|
||||||
public record JsonColumnSchema(
|
|
||||||
string Name,
|
|
||||||
Type ClrType,
|
|
||||||
bool IsNullable = true)
|
|
||||||
{
|
|
||||||
/// <summary>
|
|
||||||
/// Gets the SQL type name for this column (used in error messages).
|
|
||||||
/// </summary>
|
|
||||||
public string SqlTypeName => ClrType switch
|
|
||||||
{
|
|
||||||
Type t when t == typeof(string) => "VARCHAR",
|
|
||||||
Type t when t == typeof(int) => "INT",
|
|
||||||
Type t when t == typeof(long) => "BIGINT",
|
|
||||||
Type t when t == typeof(decimal) => "DECIMAL",
|
|
||||||
Type t when t == typeof(DateTime) => "DATETIME2",
|
|
||||||
Type t when t == typeof(bool) => "BIT",
|
|
||||||
Type t when t == typeof(byte[]) => "VARBINARY",
|
|
||||||
_ => "UNKNOWN"
|
|
||||||
};
|
|
||||||
}
|
|
||||||
@@ -1,283 +0,0 @@
|
|||||||
using System.Data;
|
|
||||||
using System.Text.Json;
|
|
||||||
using JdeScoping.DataSync.Dev.Models;
|
|
||||||
|
|
||||||
namespace JdeScoping.DataSync.Dev.Sources;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Streams a JSON array as an IDataReader, parsing one object at a time.
|
|
||||||
/// </summary>
|
|
||||||
internal sealed class JsonStreamingDataReader : IDataReader
|
|
||||||
{
|
|
||||||
private readonly Stream _stream;
|
|
||||||
private readonly StreamReader _streamReader;
|
|
||||||
private readonly JsonColumnSchema[] _schema;
|
|
||||||
private readonly Dictionary<string, int> _nameToOrdinal;
|
|
||||||
private object?[] _currentRow;
|
|
||||||
private bool _disposed;
|
|
||||||
private bool _started;
|
|
||||||
private bool _finished;
|
|
||||||
|
|
||||||
public JsonStreamingDataReader(Stream stream, JsonColumnSchema[] schema)
|
|
||||||
{
|
|
||||||
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
|
|
||||||
_schema = schema ?? throw new ArgumentNullException(nameof(schema));
|
|
||||||
_streamReader = new StreamReader(stream);
|
|
||||||
_currentRow = new object?[schema.Length];
|
|
||||||
|
|
||||||
_nameToOrdinal = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
|
|
||||||
for (int i = 0; i < schema.Length; i++)
|
|
||||||
{
|
|
||||||
_nameToOrdinal[schema[i].Name] = i;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public int FieldCount => _schema.Length;
|
|
||||||
public int Depth => 0;
|
|
||||||
public bool IsClosed => _disposed;
|
|
||||||
public int RecordsAffected => -1;
|
|
||||||
|
|
||||||
public object this[int ordinal] => GetValue(ordinal);
|
|
||||||
public object this[string name] => GetValue(GetOrdinal(name));
|
|
||||||
|
|
||||||
public string GetName(int ordinal) => _schema[ordinal].Name;
|
|
||||||
public int GetOrdinal(string name) => _nameToOrdinal.TryGetValue(name, out var ordinal)
|
|
||||||
? ordinal
|
|
||||||
: throw new IndexOutOfRangeException($"Column '{name}' not found.");
|
|
||||||
|
|
||||||
public Type GetFieldType(int ordinal) => _schema[ordinal].ClrType;
|
|
||||||
public string GetDataTypeName(int ordinal) => _schema[ordinal].SqlTypeName;
|
|
||||||
|
|
||||||
public object GetValue(int ordinal) => _currentRow[ordinal] ?? DBNull.Value;
|
|
||||||
public bool IsDBNull(int ordinal) => _currentRow[ordinal] is null;
|
|
||||||
|
|
||||||
public bool Read()
|
|
||||||
{
|
|
||||||
if (_disposed || _finished) return false;
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
// Skip to start of array on first read
|
|
||||||
if (!_started)
|
|
||||||
{
|
|
||||||
SkipWhitespaceAndExpect('[');
|
|
||||||
_started = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check for end of array or next object
|
|
||||||
SkipWhitespace();
|
|
||||||
var next = (char)_streamReader.Peek();
|
|
||||||
|
|
||||||
if (next == ']')
|
|
||||||
{
|
|
||||||
_finished = true;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (next == ',')
|
|
||||||
{
|
|
||||||
_streamReader.Read(); // consume comma
|
|
||||||
SkipWhitespace();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read the next JSON object
|
|
||||||
using var jsonObject = ReadJsonObject();
|
|
||||||
if (jsonObject == null)
|
|
||||||
{
|
|
||||||
_finished = true;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Map JSON properties to row
|
|
||||||
Array.Clear(_currentRow);
|
|
||||||
foreach (var property in jsonObject.RootElement.EnumerateObject())
|
|
||||||
{
|
|
||||||
if (_nameToOrdinal.TryGetValue(property.Name, out var ordinal))
|
|
||||||
{
|
|
||||||
_currentRow[ordinal] = ParseValue(property.Value, _schema[ordinal].ClrType);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
catch (JsonException ex)
|
|
||||||
{
|
|
||||||
throw new InvalidDataException($"Failed to parse JSON: {ex.Message}", ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private JsonDocument? ReadJsonObject()
|
|
||||||
{
|
|
||||||
SkipWhitespace();
|
|
||||||
if (_streamReader.Peek() == -1 || (char)_streamReader.Peek() == ']')
|
|
||||||
return null;
|
|
||||||
|
|
||||||
// Read characters until we have a complete JSON object
|
|
||||||
var buffer = new System.Text.StringBuilder();
|
|
||||||
int braceCount = 0;
|
|
||||||
bool inString = false;
|
|
||||||
bool escaped = false;
|
|
||||||
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
int c = _streamReader.Read();
|
|
||||||
if (c == -1) break;
|
|
||||||
|
|
||||||
char ch = (char)c;
|
|
||||||
buffer.Append(ch);
|
|
||||||
|
|
||||||
if (escaped)
|
|
||||||
{
|
|
||||||
escaped = false;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ch == '\\' && inString)
|
|
||||||
{
|
|
||||||
escaped = true;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ch == '"')
|
|
||||||
{
|
|
||||||
inString = !inString;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!inString)
|
|
||||||
{
|
|
||||||
if (ch == '{') braceCount++;
|
|
||||||
else if (ch == '}')
|
|
||||||
{
|
|
||||||
braceCount--;
|
|
||||||
if (braceCount == 0) break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
var json = buffer.ToString().Trim();
|
|
||||||
if (string.IsNullOrEmpty(json) || json == "]")
|
|
||||||
return null;
|
|
||||||
|
|
||||||
return JsonDocument.Parse(json);
|
|
||||||
}
|
|
||||||
|
|
||||||
private static object? ParseValue(JsonElement element, Type targetType)
|
|
||||||
{
|
|
||||||
if (element.ValueKind == JsonValueKind.Null)
|
|
||||||
return null;
|
|
||||||
|
|
||||||
if (targetType == typeof(string))
|
|
||||||
return element.GetString();
|
|
||||||
|
|
||||||
if (targetType == typeof(int))
|
|
||||||
return element.TryGetInt32(out var i) ? i : (int)element.GetDouble();
|
|
||||||
|
|
||||||
if (targetType == typeof(long))
|
|
||||||
return element.TryGetInt64(out var l) ? l : (long)element.GetDouble();
|
|
||||||
|
|
||||||
if (targetType == typeof(decimal))
|
|
||||||
return element.TryGetDecimal(out var d) ? d : (decimal)element.GetDouble();
|
|
||||||
|
|
||||||
if (targetType == typeof(DateTime))
|
|
||||||
{
|
|
||||||
if (element.ValueKind == JsonValueKind.String)
|
|
||||||
return DateTime.Parse(element.GetString()!, null, System.Globalization.DateTimeStyles.RoundtripKind);
|
|
||||||
return element.GetDateTime();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (targetType == typeof(bool))
|
|
||||||
return element.GetBoolean();
|
|
||||||
|
|
||||||
if (targetType == typeof(byte[]))
|
|
||||||
return element.GetBytesFromBase64();
|
|
||||||
|
|
||||||
if (targetType == typeof(double))
|
|
||||||
return element.GetDouble();
|
|
||||||
|
|
||||||
throw new NotSupportedException($"Type {targetType.Name} is not supported.");
|
|
||||||
}
|
|
||||||
|
|
||||||
private void SkipWhitespace()
|
|
||||||
{
|
|
||||||
while (_streamReader.Peek() != -1 && char.IsWhiteSpace((char)_streamReader.Peek()))
|
|
||||||
{
|
|
||||||
_streamReader.Read();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void SkipWhitespaceAndExpect(char expected)
|
|
||||||
{
|
|
||||||
SkipWhitespace();
|
|
||||||
var actual = (char)_streamReader.Read();
|
|
||||||
if (actual != expected)
|
|
||||||
throw new InvalidDataException($"Expected '{expected}' but found '{actual}'.");
|
|
||||||
}
|
|
||||||
|
|
||||||
// IDataReader methods - typed getters
|
|
||||||
public bool GetBoolean(int ordinal) => (bool)GetValue(ordinal);
|
|
||||||
public byte GetByte(int ordinal) => (byte)GetValue(ordinal);
|
|
||||||
public long GetBytes(int ordinal, long fieldOffset, byte[]? buffer, int bufferOffset, int length)
|
|
||||||
{
|
|
||||||
var data = (byte[])GetValue(ordinal);
|
|
||||||
if (buffer == null) return data.Length;
|
|
||||||
var toCopy = Math.Min(length, data.Length - (int)fieldOffset);
|
|
||||||
Array.Copy(data, fieldOffset, buffer, bufferOffset, toCopy);
|
|
||||||
return toCopy;
|
|
||||||
}
|
|
||||||
public char GetChar(int ordinal) => ((string)GetValue(ordinal))[0];
|
|
||||||
public long GetChars(int ordinal, long fieldOffset, char[]? buffer, int bufferOffset, int length)
|
|
||||||
{
|
|
||||||
var data = (string)GetValue(ordinal);
|
|
||||||
if (buffer == null) return data.Length;
|
|
||||||
var toCopy = Math.Min(length, data.Length - (int)fieldOffset);
|
|
||||||
data.CopyTo((int)fieldOffset, buffer, bufferOffset, toCopy);
|
|
||||||
return toCopy;
|
|
||||||
}
|
|
||||||
public IDataReader GetData(int ordinal) => throw new NotSupportedException();
|
|
||||||
public DateTime GetDateTime(int ordinal) => (DateTime)GetValue(ordinal);
|
|
||||||
public decimal GetDecimal(int ordinal) => (decimal)GetValue(ordinal);
|
|
||||||
public double GetDouble(int ordinal) => (double)GetValue(ordinal);
|
|
||||||
public float GetFloat(int ordinal) => (float)GetValue(ordinal);
|
|
||||||
public Guid GetGuid(int ordinal) => (Guid)GetValue(ordinal);
|
|
||||||
public short GetInt16(int ordinal) => (short)GetValue(ordinal);
|
|
||||||
public int GetInt32(int ordinal) => (int)GetValue(ordinal);
|
|
||||||
public long GetInt64(int ordinal) => (long)GetValue(ordinal);
|
|
||||||
public string GetString(int ordinal) => (string)GetValue(ordinal);
|
|
||||||
public int GetValues(object[] values)
|
|
||||||
{
|
|
||||||
var count = Math.Min(values.Length, _currentRow.Length);
|
|
||||||
for (int i = 0; i < count; i++)
|
|
||||||
values[i] = GetValue(i);
|
|
||||||
return count;
|
|
||||||
}
|
|
||||||
|
|
||||||
public DataTable GetSchemaTable()
|
|
||||||
{
|
|
||||||
var table = new DataTable("SchemaTable");
|
|
||||||
table.Columns.Add("ColumnName", typeof(string));
|
|
||||||
table.Columns.Add("ColumnOrdinal", typeof(int));
|
|
||||||
table.Columns.Add("DataType", typeof(Type));
|
|
||||||
table.Columns.Add("AllowDBNull", typeof(bool));
|
|
||||||
|
|
||||||
for (int i = 0; i < _schema.Length; i++)
|
|
||||||
{
|
|
||||||
table.Rows.Add(_schema[i].Name, i, _schema[i].ClrType, _schema[i].IsNullable);
|
|
||||||
}
|
|
||||||
|
|
||||||
return table;
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool NextResult() => false;
|
|
||||||
|
|
||||||
public void Close() => Dispose();
|
|
||||||
|
|
||||||
public void Dispose()
|
|
||||||
{
|
|
||||||
if (!_disposed)
|
|
||||||
{
|
|
||||||
_streamReader.Dispose();
|
|
||||||
_disposed = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,114 +0,0 @@
|
|||||||
using System.Data;
|
|
||||||
using JdeScoping.DataSync.Dev.Models;
|
|
||||||
using JdeScoping.DataSync.Etl.Contracts;
|
|
||||||
using ZstdSharp;
|
|
||||||
|
|
||||||
namespace JdeScoping.DataSync.Dev.Sources;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Import source that reads from a zstd-compressed JSON array file.
|
|
||||||
/// Uses high-performance Utf8JsonReader for parsing.
|
|
||||||
/// </summary>
|
|
||||||
public sealed class JsonZstdFileSource : IImportSource
|
|
||||||
{
|
|
||||||
private const int FileBufferSize = 256 * 1024; // 256 KB
|
|
||||||
private const int DecompressBufferSize = 256 * 1024; // 256 KB
|
|
||||||
|
|
||||||
private readonly string _filePath;
|
|
||||||
private readonly JsonColumnSchema[] _schema;
|
|
||||||
private readonly bool _useHighPerformanceReader;
|
|
||||||
private FileStream? _fileStream;
|
|
||||||
private DecompressionStream? _decompressionStream;
|
|
||||||
private BufferedStream? _bufferedStream;
|
|
||||||
private IDataReader? _reader;
|
|
||||||
|
|
||||||
public string SourceName => $"JsonZstd:{Path.GetFileName(_filePath)}";
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// Creates a new source for reading zstd-compressed JSON files.
|
|
||||||
/// </summary>
|
|
||||||
/// <param name="filePath">Path to the .json.zstd file.</param>
|
|
||||||
/// <param name="schema">Column schema for the JSON objects.</param>
|
|
||||||
/// <param name="useHighPerformanceReader">Use Utf8JsonReader (true) or legacy JsonDocument reader (false).</param>
|
|
||||||
public JsonZstdFileSource(string filePath, JsonColumnSchema[] schema, bool useHighPerformanceReader = true)
|
|
||||||
{
|
|
||||||
if (string.IsNullOrWhiteSpace(filePath))
|
|
||||||
throw new ArgumentException("File path cannot be null or empty.", nameof(filePath));
|
|
||||||
|
|
||||||
if (!File.Exists(filePath))
|
|
||||||
throw new FileNotFoundException($"Cache file not found: {filePath}", filePath);
|
|
||||||
|
|
||||||
_filePath = filePath;
|
|
||||||
_schema = schema ?? throw new ArgumentNullException(nameof(schema));
|
|
||||||
_useHighPerformanceReader = useHighPerformanceReader;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Task<IDataReader> ReadDataAsync(CancellationToken cancellationToken = default)
|
|
||||||
{
|
|
||||||
if (_fileStream != null)
|
|
||||||
throw new InvalidOperationException("ReadDataAsync has already been called. Dispose and create a new source to read again.");
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
// Use SequentialScan hint for OS read-ahead optimization
|
|
||||||
_fileStream = new FileStream(
|
|
||||||
_filePath,
|
|
||||||
FileMode.Open,
|
|
||||||
FileAccess.Read,
|
|
||||||
FileShare.Read,
|
|
||||||
bufferSize: FileBufferSize,
|
|
||||||
FileOptions.SequentialScan);
|
|
||||||
|
|
||||||
_decompressionStream = new DecompressionStream(_fileStream);
|
|
||||||
|
|
||||||
// Wrap in BufferedStream for smoother decompression throughput
|
|
||||||
_bufferedStream = new BufferedStream(_decompressionStream, DecompressBufferSize);
|
|
||||||
|
|
||||||
_reader = _useHighPerformanceReader
|
|
||||||
? new Utf8JsonStreamingDataReader(_bufferedStream, _schema)
|
|
||||||
: new JsonStreamingDataReader(_bufferedStream, _schema);
|
|
||||||
|
|
||||||
return Task.FromResult(_reader);
|
|
||||||
}
|
|
||||||
catch
|
|
||||||
{
|
|
||||||
// Clean up on failure
|
|
||||||
_reader?.Dispose();
|
|
||||||
_bufferedStream?.Dispose();
|
|
||||||
_decompressionStream?.Dispose();
|
|
||||||
_fileStream?.Dispose();
|
|
||||||
_reader = null;
|
|
||||||
_bufferedStream = null;
|
|
||||||
_decompressionStream = null;
|
|
||||||
_fileStream = null;
|
|
||||||
throw;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public async ValueTask DisposeAsync()
|
|
||||||
{
|
|
||||||
if (_reader != null)
|
|
||||||
{
|
|
||||||
_reader.Dispose();
|
|
||||||
_reader = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_bufferedStream != null)
|
|
||||||
{
|
|
||||||
await _bufferedStream.DisposeAsync();
|
|
||||||
_bufferedStream = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_decompressionStream != null)
|
|
||||||
{
|
|
||||||
await _decompressionStream.DisposeAsync();
|
|
||||||
_decompressionStream = null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_fileStream != null)
|
|
||||||
{
|
|
||||||
await _fileStream.DisposeAsync();
|
|
||||||
_fileStream = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,374 +0,0 @@
|
|||||||
using System.Buffers;
|
|
||||||
using System.Data;
|
|
||||||
using System.Globalization;
|
|
||||||
using System.Text;
|
|
||||||
using System.Text.Json;
|
|
||||||
using JdeScoping.DataSync.Dev.Models;
|
|
||||||
|
|
||||||
namespace JdeScoping.DataSync.Dev.Sources;
|
|
||||||
|
|
||||||
/// <summary>
|
|
||||||
/// High-performance streaming JSON array reader using Utf8JsonReader.
|
|
||||||
/// Buffers complete JSON objects then parses with zero-allocation property matching.
|
|
||||||
/// </summary>
|
|
||||||
internal sealed class Utf8JsonStreamingDataReader : IDataReader
|
|
||||||
{
|
|
||||||
private const int InitialObjectBufferSize = 8 * 1024; // 8 KB per object
|
|
||||||
|
|
||||||
private readonly Stream _stream;
|
|
||||||
private readonly StreamReader _streamReader;
|
|
||||||
private readonly JsonColumnSchema[] _schema;
|
|
||||||
private readonly Dictionary<string, int> _nameToOrdinal;
|
|
||||||
private readonly byte[][] _encodedColumnNames;
|
|
||||||
private byte[] _objectBuffer;
|
|
||||||
private object?[] _currentRow;
|
|
||||||
private bool _disposed;
|
|
||||||
private bool _started;
|
|
||||||
private bool _finished;
|
|
||||||
|
|
||||||
public Utf8JsonStreamingDataReader(Stream stream, JsonColumnSchema[] schema)
|
|
||||||
{
|
|
||||||
_stream = stream ?? throw new ArgumentNullException(nameof(stream));
|
|
||||||
_schema = schema ?? throw new ArgumentNullException(nameof(schema));
|
|
||||||
|
|
||||||
// Use larger buffer for StreamReader
|
|
||||||
_streamReader = new StreamReader(stream, Encoding.UTF8, detectEncodingFromByteOrderMarks: false,
|
|
||||||
bufferSize: 256 * 1024, leaveOpen: true);
|
|
||||||
|
|
||||||
_objectBuffer = ArrayPool<byte>.Shared.Rent(InitialObjectBufferSize);
|
|
||||||
_currentRow = new object?[schema.Length];
|
|
||||||
|
|
||||||
_nameToOrdinal = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase);
|
|
||||||
_encodedColumnNames = new byte[schema.Length][];
|
|
||||||
for (int i = 0; i < schema.Length; i++)
|
|
||||||
{
|
|
||||||
_nameToOrdinal[schema[i].Name] = i;
|
|
||||||
_encodedColumnNames[i] = Encoding.UTF8.GetBytes(schema[i].Name);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public int FieldCount => _schema.Length;
|
|
||||||
public int Depth => 0;
|
|
||||||
public bool IsClosed => _disposed;
|
|
||||||
public int RecordsAffected => -1;
|
|
||||||
|
|
||||||
public object this[int ordinal] => GetValue(ordinal);
|
|
||||||
public object this[string name] => GetValue(GetOrdinal(name));
|
|
||||||
|
|
||||||
public string GetName(int ordinal) => _schema[ordinal].Name;
|
|
||||||
public int GetOrdinal(string name) => _nameToOrdinal.TryGetValue(name, out var ordinal)
|
|
||||||
? ordinal
|
|
||||||
: throw new IndexOutOfRangeException($"Column '{name}' not found.");
|
|
||||||
|
|
||||||
public Type GetFieldType(int ordinal) => _schema[ordinal].ClrType;
|
|
||||||
public string GetDataTypeName(int ordinal) => _schema[ordinal].SqlTypeName;
|
|
||||||
|
|
||||||
public object GetValue(int ordinal) => _currentRow[ordinal] ?? DBNull.Value;
|
|
||||||
public bool IsDBNull(int ordinal) => _currentRow[ordinal] is null;
|
|
||||||
|
|
||||||
public bool Read()
|
|
||||||
{
|
|
||||||
if (_disposed || _finished) return false;
|
|
||||||
|
|
||||||
try
|
|
||||||
{
|
|
||||||
// Skip to start of array on first read
|
|
||||||
if (!_started)
|
|
||||||
{
|
|
||||||
SkipWhitespaceAndExpect('[');
|
|
||||||
_started = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check for end of array or next object
|
|
||||||
SkipWhitespace();
|
|
||||||
var next = (char)_streamReader.Peek();
|
|
||||||
|
|
||||||
if (next == ']')
|
|
||||||
{
|
|
||||||
_finished = true;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (next == ',')
|
|
||||||
{
|
|
||||||
_streamReader.Read(); // consume comma
|
|
||||||
SkipWhitespace();
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read complete JSON object into buffer
|
|
||||||
var objectBytes = ReadJsonObjectBytes();
|
|
||||||
if (objectBytes.Length == 0)
|
|
||||||
{
|
|
||||||
_finished = true;
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Parse with Utf8JsonReader (single-shot, no partial state)
|
|
||||||
ParseJsonObject(objectBytes);
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
catch (JsonException ex)
|
|
||||||
{
|
|
||||||
throw new InvalidDataException($"Failed to parse JSON: {ex.Message}", ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private ReadOnlySpan<byte> ReadJsonObjectBytes()
|
|
||||||
{
|
|
||||||
SkipWhitespace();
|
|
||||||
if (_streamReader.Peek() == -1 || (char)_streamReader.Peek() == ']')
|
|
||||||
return ReadOnlySpan<byte>.Empty;
|
|
||||||
|
|
||||||
// Read characters until we have a complete JSON object
|
|
||||||
int braceCount = 0;
|
|
||||||
bool inString = false;
|
|
||||||
bool escaped = false;
|
|
||||||
int bytesWritten = 0;
|
|
||||||
|
|
||||||
while (true)
|
|
||||||
{
|
|
||||||
int c = _streamReader.Read();
|
|
||||||
if (c == -1) break;
|
|
||||||
|
|
||||||
char ch = (char)c;
|
|
||||||
|
|
||||||
// Ensure buffer capacity (grow if needed)
|
|
||||||
int bytesNeeded = Encoding.UTF8.GetMaxByteCount(1);
|
|
||||||
if (bytesWritten + bytesNeeded > _objectBuffer.Length)
|
|
||||||
{
|
|
||||||
var newBuffer = ArrayPool<byte>.Shared.Rent(_objectBuffer.Length * 2);
|
|
||||||
Buffer.BlockCopy(_objectBuffer, 0, newBuffer, 0, bytesWritten);
|
|
||||||
ArrayPool<byte>.Shared.Return(_objectBuffer);
|
|
||||||
_objectBuffer = newBuffer;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Encode character to buffer
|
|
||||||
bytesWritten += Encoding.UTF8.GetBytes([ch], _objectBuffer.AsSpan(bytesWritten));
|
|
||||||
|
|
||||||
if (escaped)
|
|
||||||
{
|
|
||||||
escaped = false;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ch == '\\' && inString)
|
|
||||||
{
|
|
||||||
escaped = true;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ch == '"')
|
|
||||||
{
|
|
||||||
inString = !inString;
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!inString)
|
|
||||||
{
|
|
||||||
if (ch == '{') braceCount++;
|
|
||||||
else if (ch == '}')
|
|
||||||
{
|
|
||||||
braceCount--;
|
|
||||||
if (braceCount == 0) break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return _objectBuffer.AsSpan(0, bytesWritten);
|
|
||||||
}
|
|
||||||
|
|
||||||
private void ParseJsonObject(ReadOnlySpan<byte> objectBytes)
|
|
||||||
{
|
|
||||||
var reader = new Utf8JsonReader(objectBytes);
|
|
||||||
|
|
||||||
// Move to StartObject
|
|
||||||
if (!reader.Read() || reader.TokenType != JsonTokenType.StartObject)
|
|
||||||
throw new InvalidDataException($"Expected object, got {reader.TokenType}.");
|
|
||||||
|
|
||||||
// Clear current row
|
|
||||||
Array.Clear(_currentRow);
|
|
||||||
|
|
||||||
// Parse object properties
|
|
||||||
while (reader.Read() && reader.TokenType != JsonTokenType.EndObject)
|
|
||||||
{
|
|
||||||
if (reader.TokenType != JsonTokenType.PropertyName)
|
|
||||||
continue;
|
|
||||||
|
|
||||||
// Find matching column using pre-encoded names (avoids string allocation)
|
|
||||||
int ordinal = -1;
|
|
||||||
for (int i = 0; i < _encodedColumnNames.Length; i++)
|
|
||||||
{
|
|
||||||
if (reader.ValueTextEquals(_encodedColumnNames[i]))
|
|
||||||
{
|
|
||||||
ordinal = i;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Read the value
|
|
||||||
if (!reader.Read())
|
|
||||||
break;
|
|
||||||
|
|
||||||
if (ordinal >= 0)
|
|
||||||
{
|
|
||||||
_currentRow[ordinal] = ParseValue(ref reader, _schema[ordinal].ClrType);
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
// Skip unknown property value
|
|
||||||
reader.Skip();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void SkipWhitespace()
|
|
||||||
{
|
|
||||||
while (_streamReader.Peek() != -1 && char.IsWhiteSpace((char)_streamReader.Peek()))
|
|
||||||
{
|
|
||||||
_streamReader.Read();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void SkipWhitespaceAndExpect(char expected)
|
|
||||||
{
|
|
||||||
SkipWhitespace();
|
|
||||||
var actual = (char)_streamReader.Read();
|
|
||||||
if (actual != expected)
|
|
||||||
throw new InvalidDataException($"Expected '{expected}' but found '{actual}'.");
|
|
||||||
}
|
|
||||||
|
|
||||||
private static object? ParseValue(ref Utf8JsonReader reader, Type targetType)
|
|
||||||
{
|
|
||||||
if (reader.TokenType == JsonTokenType.Null)
|
|
||||||
return null;
|
|
||||||
|
|
||||||
if (targetType == typeof(string))
|
|
||||||
return reader.GetString();
|
|
||||||
|
|
||||||
if (targetType == typeof(int))
|
|
||||||
{
|
|
||||||
if (reader.TokenType == JsonTokenType.Number)
|
|
||||||
return reader.TryGetInt32(out var i) ? i : (int)reader.GetDouble();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (targetType == typeof(long))
|
|
||||||
{
|
|
||||||
if (reader.TokenType == JsonTokenType.Number)
|
|
||||||
return reader.TryGetInt64(out var l) ? l : (long)reader.GetDouble();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (targetType == typeof(decimal))
|
|
||||||
{
|
|
||||||
if (reader.TokenType == JsonTokenType.Number)
|
|
||||||
return reader.TryGetDecimal(out var d) ? d : (decimal)reader.GetDouble();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (targetType == typeof(DateTime))
|
|
||||||
{
|
|
||||||
if (reader.TokenType == JsonTokenType.String)
|
|
||||||
{
|
|
||||||
var str = reader.GetString();
|
|
||||||
if (str != null)
|
|
||||||
return DateTime.Parse(str, CultureInfo.InvariantCulture, DateTimeStyles.RoundtripKind);
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (targetType == typeof(bool))
|
|
||||||
{
|
|
||||||
if (reader.TokenType == JsonTokenType.True)
|
|
||||||
return true;
|
|
||||||
if (reader.TokenType == JsonTokenType.False)
|
|
||||||
return false;
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (targetType == typeof(byte[]))
|
|
||||||
{
|
|
||||||
if (reader.TokenType == JsonTokenType.String)
|
|
||||||
return reader.GetBytesFromBase64();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (targetType == typeof(double))
|
|
||||||
{
|
|
||||||
if (reader.TokenType == JsonTokenType.Number)
|
|
||||||
return reader.GetDouble();
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
throw new NotSupportedException($"Type {targetType.Name} is not supported.");
|
|
||||||
}
|
|
||||||
|
|
||||||
// IDataReader methods - typed getters
|
|
||||||
public bool GetBoolean(int ordinal) => (bool)GetValue(ordinal);
|
|
||||||
public byte GetByte(int ordinal) => (byte)GetValue(ordinal);
|
|
||||||
public long GetBytes(int ordinal, long fieldOffset, byte[]? buffer, int bufferOffset, int length)
|
|
||||||
{
|
|
||||||
var data = (byte[])GetValue(ordinal);
|
|
||||||
if (buffer == null) return data.Length;
|
|
||||||
var toCopy = Math.Min(length, data.Length - (int)fieldOffset);
|
|
||||||
Array.Copy(data, fieldOffset, buffer, bufferOffset, toCopy);
|
|
||||||
return toCopy;
|
|
||||||
}
|
|
||||||
public char GetChar(int ordinal) => ((string)GetValue(ordinal))[0];
|
|
||||||
public long GetChars(int ordinal, long fieldOffset, char[]? buffer, int bufferOffset, int length)
|
|
||||||
{
|
|
||||||
var data = (string)GetValue(ordinal);
|
|
||||||
if (buffer == null) return data.Length;
|
|
||||||
var toCopy = Math.Min(length, data.Length - (int)fieldOffset);
|
|
||||||
data.CopyTo((int)fieldOffset, buffer, bufferOffset, toCopy);
|
|
||||||
return toCopy;
|
|
||||||
}
|
|
||||||
public IDataReader GetData(int ordinal) => throw new NotSupportedException();
|
|
||||||
public DateTime GetDateTime(int ordinal) => (DateTime)GetValue(ordinal);
|
|
||||||
public decimal GetDecimal(int ordinal) => (decimal)GetValue(ordinal);
|
|
||||||
public double GetDouble(int ordinal) => (double)GetValue(ordinal);
|
|
||||||
public float GetFloat(int ordinal) => (float)GetValue(ordinal);
|
|
||||||
public Guid GetGuid(int ordinal) => (Guid)GetValue(ordinal);
|
|
||||||
public short GetInt16(int ordinal) => (short)GetValue(ordinal);
|
|
||||||
public int GetInt32(int ordinal) => (int)GetValue(ordinal);
|
|
||||||
public long GetInt64(int ordinal) => (long)GetValue(ordinal);
|
|
||||||
public string GetString(int ordinal) => (string)GetValue(ordinal);
|
|
||||||
public int GetValues(object[] values)
|
|
||||||
{
|
|
||||||
var count = Math.Min(values.Length, _currentRow.Length);
|
|
||||||
for (int i = 0; i < count; i++)
|
|
||||||
values[i] = GetValue(i);
|
|
||||||
return count;
|
|
||||||
}
|
|
||||||
|
|
||||||
public DataTable GetSchemaTable()
|
|
||||||
{
|
|
||||||
var table = new DataTable("SchemaTable");
|
|
||||||
table.Columns.Add("ColumnName", typeof(string));
|
|
||||||
table.Columns.Add("ColumnOrdinal", typeof(int));
|
|
||||||
table.Columns.Add("DataType", typeof(Type));
|
|
||||||
table.Columns.Add("AllowDBNull", typeof(bool));
|
|
||||||
|
|
||||||
for (int i = 0; i < _schema.Length; i++)
|
|
||||||
{
|
|
||||||
table.Rows.Add(_schema[i].Name, i, _schema[i].ClrType, _schema[i].IsNullable);
|
|
||||||
}
|
|
||||||
|
|
||||||
return table;
|
|
||||||
}
|
|
||||||
|
|
||||||
public bool NextResult() => false;
|
|
||||||
|
|
||||||
public void Close() => Dispose();
|
|
||||||
|
|
||||||
public void Dispose()
|
|
||||||
{
|
|
||||||
if (!_disposed)
|
|
||||||
{
|
|
||||||
ArrayPool<byte>.Shared.Return(_objectBuffer);
|
|
||||||
_objectBuffer = Array.Empty<byte>();
|
|
||||||
_streamReader.Dispose();
|
|
||||||
_disposed = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Reference in New Issue
Block a user