feat(DbExporter): implement database export with protobuf+zstd
Adds DatabaseExporter class that exports query results to compressed protobuf format. Supports SQL Server and Oracle providers with streaming compression and SHA256 hash verification.
This commit is contained in:
@@ -0,0 +1,135 @@
|
||||
using System.Data;
|
||||
using System.Data.Common;
|
||||
using System.Security.Cryptography;
|
||||
using Microsoft.Data.SqlClient;
|
||||
using Oracle.ManagedDataAccess.Client;
|
||||
using ProtoBuf.Data;
|
||||
using ZstdSharp;
|
||||
|
||||
namespace DbExporter;
|
||||
|
||||
public sealed class DatabaseExporter
|
||||
{
|
||||
public record ExportResult(int RowCount, long UncompressedSize, long CompressedSize, string Sha256Hash);
|
||||
|
||||
public async Task<ExportResult> ExportAsync(ExportDefinition definition, CancellationToken cancellationToken = default)
|
||||
{
|
||||
// Ensure output directory exists
|
||||
var outputDir = Path.GetDirectoryName(definition.OutputPath);
|
||||
if (!string.IsNullOrEmpty(outputDir))
|
||||
Directory.CreateDirectory(outputDir);
|
||||
|
||||
await using var connection = CreateConnection(definition.ProviderType, definition.ConnectionString);
|
||||
await connection.OpenAsync(cancellationToken);
|
||||
|
||||
await using var command = connection.CreateCommand();
|
||||
command.CommandText = definition.Query;
|
||||
command.CommandTimeout = 0; // No timeout for large exports
|
||||
|
||||
await using var reader = await command.ExecuteReaderAsync(cancellationToken);
|
||||
|
||||
long uncompressedSize = 0;
|
||||
|
||||
// Use a counting stream wrapper to track uncompressed bytes
|
||||
using var sha256 = SHA256.Create();
|
||||
await using var outputFile = new FileStream(definition.OutputPath, FileMode.Create, FileAccess.Write, FileShare.None, 256 * 1024);
|
||||
await using var compressStream = new CompressionStream(outputFile, definition.CompressionLevel);
|
||||
await using var countingStream = new CountingStream(compressStream);
|
||||
await using var hashStream = new CryptoStream(countingStream, sha256, CryptoStreamMode.Write);
|
||||
|
||||
// Serialize to protobuf
|
||||
DataSerializer.Serialize(hashStream, reader);
|
||||
|
||||
hashStream.FlushFinalBlock();
|
||||
uncompressedSize = countingStream.BytesWritten;
|
||||
|
||||
var hash = Convert.ToHexString(sha256.Hash!).ToLowerInvariant();
|
||||
|
||||
// Write sidecar hash file
|
||||
var hashFilePath = definition.OutputPath + ".sha256";
|
||||
await File.WriteAllTextAsync(hashFilePath, hash, cancellationToken);
|
||||
|
||||
var compressedSize = new FileInfo(definition.OutputPath).Length;
|
||||
|
||||
// Row count requires a separate pass or we estimate from verify
|
||||
// Return 0 for now, verify will get accurate count
|
||||
return new ExportResult(0, uncompressedSize, compressedSize, hash);
|
||||
}
|
||||
|
||||
private static DbConnection CreateConnection(string providerType, string connectionString)
|
||||
{
|
||||
return providerType.ToLowerInvariant() switch
|
||||
{
|
||||
"sqlserver" => new SqlConnection(connectionString),
|
||||
"oracle" => new OracleConnection(connectionString),
|
||||
_ => throw new ArgumentException($"Unknown provider type: {providerType}. Use 'SqlServer' or 'Oracle'.")
|
||||
};
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Stream wrapper that counts bytes written through it.
|
||||
/// </summary>
|
||||
private sealed class CountingStream : Stream
|
||||
{
|
||||
private readonly Stream _inner;
|
||||
private long _bytesWritten;
|
||||
|
||||
public CountingStream(Stream inner) => _inner = inner;
|
||||
|
||||
public long BytesWritten => _bytesWritten;
|
||||
|
||||
public override bool CanRead => false;
|
||||
public override bool CanSeek => false;
|
||||
public override bool CanWrite => true;
|
||||
public override long Length => throw new NotSupportedException();
|
||||
public override long Position
|
||||
{
|
||||
get => throw new NotSupportedException();
|
||||
set => throw new NotSupportedException();
|
||||
}
|
||||
|
||||
public override void Flush() => _inner.Flush();
|
||||
public override Task FlushAsync(CancellationToken cancellationToken) => _inner.FlushAsync(cancellationToken);
|
||||
|
||||
public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException();
|
||||
public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();
|
||||
public override void SetLength(long value) => throw new NotSupportedException();
|
||||
|
||||
public override void Write(byte[] buffer, int offset, int count)
|
||||
{
|
||||
_inner.Write(buffer, offset, count);
|
||||
_bytesWritten += count;
|
||||
}
|
||||
|
||||
public override void Write(ReadOnlySpan<byte> buffer)
|
||||
{
|
||||
_inner.Write(buffer);
|
||||
_bytesWritten += buffer.Length;
|
||||
}
|
||||
|
||||
public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
|
||||
{
|
||||
await _inner.WriteAsync(buffer, offset, count, cancellationToken);
|
||||
_bytesWritten += count;
|
||||
}
|
||||
|
||||
public override async ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
|
||||
{
|
||||
await _inner.WriteAsync(buffer, cancellationToken);
|
||||
_bytesWritten += buffer.Length;
|
||||
}
|
||||
|
||||
protected override void Dispose(bool disposing)
|
||||
{
|
||||
if (disposing)
|
||||
_inner.Dispose();
|
||||
base.Dispose(disposing);
|
||||
}
|
||||
|
||||
public override async ValueTask DisposeAsync()
|
||||
{
|
||||
await _inner.DisposeAsync();
|
||||
await base.DisposeAsync();
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user