From 6f58bfd8ccdad9f411537d4c0e34453e1dbbf505 Mon Sep 17 00:00:00 2001 From: Joseph Doherty Date: Tue, 6 Jan 2026 16:52:43 -0500 Subject: [PATCH] feat(DbExporter): implement database export with protobuf+zstd Adds DatabaseExporter class that exports query results to compressed protobuf format. Supports SQL Server and Oracle providers with streaming compression and SHA256 hash verification. --- Tools/DbExporter/DatabaseExporter.cs | 135 +++++++++++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 Tools/DbExporter/DatabaseExporter.cs diff --git a/Tools/DbExporter/DatabaseExporter.cs b/Tools/DbExporter/DatabaseExporter.cs new file mode 100644 index 0000000..33b4206 --- /dev/null +++ b/Tools/DbExporter/DatabaseExporter.cs @@ -0,0 +1,135 @@ +using System.Data; +using System.Data.Common; +using System.Security.Cryptography; +using Microsoft.Data.SqlClient; +using Oracle.ManagedDataAccess.Client; +using ProtoBuf.Data; +using ZstdSharp; + +namespace DbExporter; + +public sealed class DatabaseExporter +{ + public record ExportResult(int RowCount, long UncompressedSize, long CompressedSize, string Sha256Hash); + + public async Task ExportAsync(ExportDefinition definition, CancellationToken cancellationToken = default) + { + // Ensure output directory exists + var outputDir = Path.GetDirectoryName(definition.OutputPath); + if (!string.IsNullOrEmpty(outputDir)) + Directory.CreateDirectory(outputDir); + + await using var connection = CreateConnection(definition.ProviderType, definition.ConnectionString); + await connection.OpenAsync(cancellationToken); + + await using var command = connection.CreateCommand(); + command.CommandText = definition.Query; + command.CommandTimeout = 0; // No timeout for large exports + + await using var reader = await command.ExecuteReaderAsync(cancellationToken); + + long uncompressedSize = 0; + + // Use a counting stream wrapper to track uncompressed bytes + using var sha256 = SHA256.Create(); + await using var outputFile = new FileStream(definition.OutputPath, FileMode.Create, FileAccess.Write, FileShare.None, 256 * 1024); + await using var compressStream = new CompressionStream(outputFile, definition.CompressionLevel); + await using var countingStream = new CountingStream(compressStream); + await using var hashStream = new CryptoStream(countingStream, sha256, CryptoStreamMode.Write); + + // Serialize to protobuf + DataSerializer.Serialize(hashStream, reader); + + hashStream.FlushFinalBlock(); + uncompressedSize = countingStream.BytesWritten; + + var hash = Convert.ToHexString(sha256.Hash!).ToLowerInvariant(); + + // Write sidecar hash file + var hashFilePath = definition.OutputPath + ".sha256"; + await File.WriteAllTextAsync(hashFilePath, hash, cancellationToken); + + var compressedSize = new FileInfo(definition.OutputPath).Length; + + // Row count requires a separate pass or we estimate from verify + // Return 0 for now, verify will get accurate count + return new ExportResult(0, uncompressedSize, compressedSize, hash); + } + + private static DbConnection CreateConnection(string providerType, string connectionString) + { + return providerType.ToLowerInvariant() switch + { + "sqlserver" => new SqlConnection(connectionString), + "oracle" => new OracleConnection(connectionString), + _ => throw new ArgumentException($"Unknown provider type: {providerType}. Use 'SqlServer' or 'Oracle'.") + }; + } + + /// + /// Stream wrapper that counts bytes written through it. + /// + private sealed class CountingStream : Stream + { + private readonly Stream _inner; + private long _bytesWritten; + + public CountingStream(Stream inner) => _inner = inner; + + public long BytesWritten => _bytesWritten; + + public override bool CanRead => false; + public override bool CanSeek => false; + public override bool CanWrite => true; + public override long Length => throw new NotSupportedException(); + public override long Position + { + get => throw new NotSupportedException(); + set => throw new NotSupportedException(); + } + + public override void Flush() => _inner.Flush(); + public override Task FlushAsync(CancellationToken cancellationToken) => _inner.FlushAsync(cancellationToken); + + public override int Read(byte[] buffer, int offset, int count) => throw new NotSupportedException(); + public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException(); + public override void SetLength(long value) => throw new NotSupportedException(); + + public override void Write(byte[] buffer, int offset, int count) + { + _inner.Write(buffer, offset, count); + _bytesWritten += count; + } + + public override void Write(ReadOnlySpan buffer) + { + _inner.Write(buffer); + _bytesWritten += buffer.Length; + } + + public override async Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) + { + await _inner.WriteAsync(buffer, offset, count, cancellationToken); + _bytesWritten += count; + } + + public override async ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) + { + await _inner.WriteAsync(buffer, cancellationToken); + _bytesWritten += buffer.Length; + } + + protected override void Dispose(bool disposing) + { + if (disposing) + _inner.Dispose(); + base.Dispose(disposing); + } + + public override async ValueTask DisposeAsync() + { + await _inner.DisposeAsync(); + await base.DisposeAsync(); + } + } +}