docs: update documentation for extraction functions migration
- Add ExtractionFunctions.md reference document
- Update database-schema spec with 11 extraction functions
- Update data-access spec to document extraction function approach
- Update search-processing spec with new query builder interface
- Add Database.Tests to Testing.md architecture doc
- Update DataFlow.md with extraction function flow
This commit is contained in:
+61
-121
@@ -1,5 +1,4 @@
|
||||
using System.Data;
|
||||
using System.Text;
|
||||
using System.Text.Json;
|
||||
using System.Text.RegularExpressions;
|
||||
using ProtoBuf.Data;
|
||||
@@ -102,55 +101,32 @@ foreach (var jsonFile in jsonFiles)
|
||||
}
|
||||
|
||||
// Stream JSON and write to protobuf in batches
|
||||
using var inputFs = new FileStream(jsonFile, FileMode.Open, FileAccess.Read, FileShare.Read, 256 * 1024, FileOptions.SequentialScan);
|
||||
using var decompressStream = new DecompressionStream(inputFs);
|
||||
using var outputFs = new FileStream(outputFile, FileMode.Create, FileAccess.Write, FileShare.None, 256 * 1024);
|
||||
using var compressStream = new CompressionStream(outputFs, level: 3);
|
||||
await using var inputFs = new FileStream(jsonFile, FileMode.Open, FileAccess.Read, FileShare.Read, 256 * 1024, FileOptions.SequentialScan | FileOptions.Asynchronous);
|
||||
await using var decompressStream = new DecompressionStream(inputFs);
|
||||
await using var outputFs = new FileStream(outputFile, FileMode.Create, FileAccess.Write, FileShare.None, 256 * 1024, FileOptions.Asynchronous);
|
||||
await using var compressStream = new CompressionStream(outputFs, level: 3);
|
||||
|
||||
int rowCount = 0;
|
||||
int batchCount = 0;
|
||||
|
||||
// Stream JSON records one at a time
|
||||
var buffer = new byte[4096];
|
||||
using var memoryStream = new MemoryStream();
|
||||
|
||||
int bytesRead;
|
||||
while ((bytesRead = decompressStream.Read(buffer, 0, buffer.Length)) > 0)
|
||||
// True streaming: DeserializeAsyncEnumerable streams each array element without loading entire JSON
|
||||
var jsonOptions = new JsonSerializerOptions { PropertyNameCaseInsensitive = true };
|
||||
await foreach (var element in JsonSerializer.DeserializeAsyncEnumerable<JsonElement>(
|
||||
decompressStream,
|
||||
jsonOptions))
|
||||
{
|
||||
memoryStream.Write(buffer, 0, bytesRead);
|
||||
}
|
||||
var row = dataTable.NewRow();
|
||||
ReadJsonElement(element, row, dataTable);
|
||||
dataTable.Rows.Add(row);
|
||||
rowCount++;
|
||||
|
||||
memoryStream.Position = 0;
|
||||
var jsonReader = new Utf8JsonReader(memoryStream.ToArray(), new JsonReaderOptions { AllowTrailingCommas = true });
|
||||
|
||||
// Skip to start of array
|
||||
while (jsonReader.Read())
|
||||
{
|
||||
if (jsonReader.TokenType == JsonTokenType.StartArray)
|
||||
break;
|
||||
}
|
||||
|
||||
// Read each object in the array
|
||||
while (jsonReader.Read())
|
||||
{
|
||||
if (jsonReader.TokenType == JsonTokenType.EndArray)
|
||||
break;
|
||||
|
||||
if (jsonReader.TokenType == JsonTokenType.StartObject)
|
||||
// Write batch when we hit the batch size
|
||||
if (dataTable.Rows.Count >= BatchSize)
|
||||
{
|
||||
var row = dataTable.NewRow();
|
||||
ReadJsonObject(ref jsonReader, row, dataTable);
|
||||
dataTable.Rows.Add(row);
|
||||
rowCount++;
|
||||
|
||||
// Write batch when we hit the batch size
|
||||
if (dataTable.Rows.Count >= BatchSize)
|
||||
{
|
||||
using var reader = dataTable.CreateDataReader();
|
||||
DataSerializer.Serialize(compressStream, reader);
|
||||
dataTable.Clear();
|
||||
batchCount++;
|
||||
}
|
||||
using var reader = dataTable.CreateDataReader();
|
||||
DataSerializer.Serialize(compressStream, reader);
|
||||
dataTable.Clear();
|
||||
batchCount++;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -162,7 +138,7 @@ foreach (var jsonFile in jsonFiles)
|
||||
batchCount++;
|
||||
}
|
||||
|
||||
compressStream.Flush();
|
||||
await compressStream.FlushAsync();
|
||||
|
||||
var newSize = new FileInfo(outputFile).Length;
|
||||
totalNewSize += newSize;
|
||||
@@ -243,68 +219,58 @@ static Type MapSqlTypeToNet(string sqlType) => sqlType.ToUpperInvariant() switch
|
||||
};
|
||||
|
||||
/// <summary>
|
||||
/// Read a JSON object into a DataRow using streaming reader.
|
||||
/// Read a JsonElement (object) into a DataRow.
|
||||
/// </summary>
|
||||
static void ReadJsonObject(ref Utf8JsonReader reader, DataRow row, DataTable table)
|
||||
static void ReadJsonElement(JsonElement element, DataRow row, DataTable table)
|
||||
{
|
||||
while (reader.Read())
|
||||
foreach (var property in element.EnumerateObject())
|
||||
{
|
||||
if (reader.TokenType == JsonTokenType.EndObject)
|
||||
break;
|
||||
|
||||
if (reader.TokenType == JsonTokenType.PropertyName)
|
||||
// Find matching column (case-insensitive)
|
||||
DataColumn? column = null;
|
||||
foreach (DataColumn col in table.Columns)
|
||||
{
|
||||
var propertyName = reader.GetString()!;
|
||||
reader.Read(); // Move to value
|
||||
|
||||
// Find matching column (case-insensitive)
|
||||
DataColumn? column = null;
|
||||
foreach (DataColumn col in table.Columns)
|
||||
if (col.ColumnName.Equals(property.Name, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
if (col.ColumnName.Equals(propertyName, StringComparison.OrdinalIgnoreCase))
|
||||
{
|
||||
column = col;
|
||||
break;
|
||||
}
|
||||
column = col;
|
||||
break;
|
||||
}
|
||||
|
||||
if (column == null)
|
||||
{
|
||||
// Skip unknown property
|
||||
SkipJsonValue(ref reader);
|
||||
continue;
|
||||
}
|
||||
|
||||
row[column] = ReadJsonValue(ref reader, column.DataType);
|
||||
}
|
||||
|
||||
if (column == null)
|
||||
{
|
||||
// Skip unknown property
|
||||
continue;
|
||||
}
|
||||
|
||||
row[column] = ReadJsonElementValue(property.Value, column.DataType);
|
||||
}
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Read a JSON value and convert to the target .NET type.
|
||||
/// Read a JSON value from JsonElement and convert to the target .NET type.
|
||||
/// </summary>
|
||||
static object ReadJsonValue(ref Utf8JsonReader reader, Type targetType)
|
||||
static object ReadJsonElementValue(JsonElement element, Type targetType)
|
||||
{
|
||||
if (reader.TokenType == JsonTokenType.Null)
|
||||
if (element.ValueKind == JsonValueKind.Null || element.ValueKind == JsonValueKind.Undefined)
|
||||
return DBNull.Value;
|
||||
|
||||
if (targetType == typeof(string))
|
||||
{
|
||||
return reader.TokenType switch
|
||||
return element.ValueKind switch
|
||||
{
|
||||
JsonTokenType.String => reader.GetString() ?? (object)DBNull.Value,
|
||||
JsonTokenType.Number => reader.GetDecimal().ToString(),
|
||||
JsonTokenType.True => "true",
|
||||
JsonTokenType.False => "false",
|
||||
JsonValueKind.String => element.GetString() ?? (object)DBNull.Value,
|
||||
JsonValueKind.Number => element.GetDecimal().ToString(),
|
||||
JsonValueKind.True => "true",
|
||||
JsonValueKind.False => "false",
|
||||
_ => DBNull.Value
|
||||
};
|
||||
}
|
||||
|
||||
if (targetType == typeof(DateTime))
|
||||
{
|
||||
if (reader.TokenType == JsonTokenType.String)
|
||||
if (element.ValueKind == JsonValueKind.String)
|
||||
{
|
||||
var str = reader.GetString();
|
||||
var str = element.GetString();
|
||||
if (str != null && DateTime.TryParse(str, out var dt))
|
||||
return dt;
|
||||
}
|
||||
@@ -313,71 +279,45 @@ static object ReadJsonValue(ref Utf8JsonReader reader, Type targetType)
|
||||
|
||||
if (targetType == typeof(long))
|
||||
{
|
||||
return reader.TokenType switch
|
||||
return element.ValueKind switch
|
||||
{
|
||||
JsonTokenType.Number => reader.GetInt64(),
|
||||
JsonTokenType.String when long.TryParse(reader.GetString(), out var val) => val,
|
||||
JsonValueKind.Number => element.GetInt64(),
|
||||
JsonValueKind.String when long.TryParse(element.GetString(), out var val) => val,
|
||||
_ => DBNull.Value
|
||||
};
|
||||
}
|
||||
|
||||
if (targetType == typeof(int))
|
||||
{
|
||||
return reader.TokenType switch
|
||||
return element.ValueKind switch
|
||||
{
|
||||
JsonTokenType.Number => reader.GetInt32(),
|
||||
JsonTokenType.String when int.TryParse(reader.GetString(), out var val) => val,
|
||||
JsonValueKind.Number => element.GetInt32(),
|
||||
JsonValueKind.String when int.TryParse(element.GetString(), out var val) => val,
|
||||
_ => DBNull.Value
|
||||
};
|
||||
}
|
||||
|
||||
if (targetType == typeof(decimal))
|
||||
{
|
||||
return reader.TokenType switch
|
||||
return element.ValueKind switch
|
||||
{
|
||||
JsonTokenType.Number => reader.GetDecimal(),
|
||||
JsonTokenType.String when decimal.TryParse(reader.GetString(), out var val) => val,
|
||||
JsonValueKind.Number => element.GetDecimal(),
|
||||
JsonValueKind.String when decimal.TryParse(element.GetString(), out var val) => val,
|
||||
_ => DBNull.Value
|
||||
};
|
||||
}
|
||||
|
||||
if (targetType == typeof(bool))
|
||||
{
|
||||
return reader.TokenType switch
|
||||
return element.ValueKind switch
|
||||
{
|
||||
JsonTokenType.True => true,
|
||||
JsonTokenType.False => false,
|
||||
JsonTokenType.Number => reader.GetInt32() != 0,
|
||||
JsonTokenType.String => reader.GetString()?.Equals("true", StringComparison.OrdinalIgnoreCase) ?? false,
|
||||
JsonValueKind.True => true,
|
||||
JsonValueKind.False => false,
|
||||
JsonValueKind.Number => element.GetInt32() != 0,
|
||||
JsonValueKind.String => element.GetString()?.Equals("true", StringComparison.OrdinalIgnoreCase) ?? false,
|
||||
_ => DBNull.Value
|
||||
};
|
||||
}
|
||||
|
||||
return DBNull.Value;
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Skip a JSON value (used for unknown properties).
/// If the reader is positioned on a StartObject or StartArray token, it is advanced
/// until the matching end token has been read; for any other token no reads are performed.
|
||||
/// </summary>
/// <param name="reader">
/// Reader positioned on the first token of the value to skip. Passed by reference so the
/// caller observes the advanced position.
/// </param>
|
||||
static void SkipJsonValue(ref Utf8JsonReader reader)
|
||||
{
|
||||
// Container case 1: the value is an object.
if (reader.TokenType == JsonTokenType.StartObject)
|
||||
{
|
||||
// depth counts currently open objects, starting with the one the reader is on.
int depth = 1;
|
||||
// Stops when the matching EndObject has been read (depth == 0) or when Read()
// returns false because no further tokens are available.
while (depth > 0 && reader.Read())
|
||||
{
|
||||
// Only object tokens are counted: arrays nested inside the object are balanced
// within it, so they cannot change where the matching EndObject falls.
if (reader.TokenType == JsonTokenType.StartObject) depth++;
|
||||
else if (reader.TokenType == JsonTokenType.EndObject) depth--;
|
||||
}
|
||||
}
|
||||
// Container case 2: the value is an array; same depth-counting scheme with array tokens.
else if (reader.TokenType == JsonTokenType.StartArray)
|
||||
{
|
||||
// depth counts currently open arrays, starting with the one the reader is on.
int depth = 1;
|
||||
// Stops when the matching EndArray has been read or when Read() returns false.
while (depth > 0 && reader.Read())
|
||||
{
|
||||
// Objects nested inside the array are balanced within it and need no tracking.
if (reader.TokenType == JsonTokenType.StartArray) depth++;
|
||||
else if (reader.TokenType == JsonTokenType.EndArray) depth--;
|
||||
}
|
||||
}
|
||||
// Simple values are already consumed by the Read() call
// (i.e. the read that positioned the reader on this token, so nothing more is read here).
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user