Files
lmxopcua/gr/parse_tables.py
Joseph Doherty a7576ffb38 Implement LmxOpcUa server — all 6 phases complete
Full OPC UA server on .NET Framework 4.8 (x86) exposing AVEVA System
Platform Galaxy tags via MXAccess. Mirrors Galaxy object hierarchy as
OPC UA address space, translating contained-name browse paths to
tag-name runtime references.

Components implemented:
- Configuration: AppConfiguration with 4 sections, validator
- Domain: ConnectionState, Quality, Vtq, MxDataTypeMapper, error codes
- MxAccess: StaComThread, MxAccessClient (partial classes), MxProxyAdapter
  using strongly-typed ArchestrA.MxAccess COM interop
- Galaxy Repository: SQL queries (hierarchy, attributes, change detection),
  ChangeDetectionService with auto-rebuild on deploy
- OPC UA Server: LmxNodeManager (CustomNodeManager2), LmxOpcUaServer,
  OpcUaServerHost with programmatic config, SecurityPolicy None
- Status Dashboard: HTTP server with HTML/JSON/health endpoints
- Integration: Full 14-step startup, graceful shutdown, component wiring

175 tests (174 unit + 1 integration), all passing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-25 05:55:27 -04:00

123 lines
5.0 KiB
Python

import re, os
# Converts a raw table dump (table_dump_raw.txt) into one DDL file per table
# plus a combined Markdown schema reference (schema_tables.md).
#
# Each table in the dump is wrapped in ---TABLE_START:<name>--- and
# ---TABLE_END:<name>--- markers and holds up to three pipe-delimited result
# sets: column definitions, primary-key columns, and foreign keys.

base = r"C:\Users\dohertj2\Desktop\gr"
ddl_dir = os.path.join(base, "ddl", "tables")

# One dump entry: (table name, body between the START/END markers).
TABLE_RE = re.compile(r'---TABLE_START:(.+?)---(.+?)---TABLE_END:\1---', re.DOTALL)

# Header line that introduces the foreign-key result set.
FK_HEADER = "fk_column|ref_table|ref_column|fk_name"

# Types that report a numeric precision but accept no precision argument.
FIXED_PRECISION_TYPES = frozenset(
    {"int", "smallint", "bigint", "tinyint", "bit", "money", "smallmoney", "real"}
)

# Types that report a character length but accept no length argument.
UNSIZED_TYPES = frozenset({"text", "ntext", "image", "xml"})


def _null_to_none(value):
    """Map the dump's literal NULL marker to None; return anything else as-is."""
    return None if value == "NULL" else value


def _is_rule(text):
    """Return True when text consists only of '-' and '|' (a separator row)."""
    return all(ch in "-|" for ch in text)


def build_type_str(data_type, char_len, num_prec, num_scale):
    """Render a column's SQL type, including length or precision when valid.

    char_len, num_prec and num_scale are the dump's string values, or None
    when the dump held NULL.  A character length of -1 is rendered as
    ``max``; a missing scale yields ``type(precision)`` rather than
    ``type(precision,None)``; types that take no size argument are returned
    bare.
    """
    kind = data_type.lower()
    if char_len:
        if kind in UNSIZED_TYPES:
            return data_type
        if char_len == "-1":
            return f"{data_type}(max)"
        return f"{data_type}({char_len})"
    if num_prec and kind not in FIXED_PRECISION_TYPES:
        if not num_scale:
            return f"{data_type}({num_prec})"
        return f"{data_type}({num_prec},{num_scale})"
    return data_type


def parse_columns(col_section):
    """Parse the column-definition result set into a list of column dicts.

    Each dict has the keys ``name``, ``type``, ``nullable`` and ``default``.
    Rows with fewer than six pipe-separated fields are ignored.
    """
    cols = []
    for raw in col_section.split("\n"):
        line = raw.strip()
        if not line or line.startswith("---"):
            continue
        parts = [p.strip() for p in line.split("|")]
        name = parts[0]
        # The remainder of the header line has an empty first field.
        if len(parts) < 6 or name in ("", "COLUMN_NAME") or _is_rule(name):
            continue
        cols.append({
            "name": name,
            "type": build_type_str(
                parts[1],
                _null_to_none(parts[2]),
                _null_to_none(parts[3]),
                _null_to_none(parts[4]),
            ),
            "nullable": parts[5],
            "default": _null_to_none(parts[6]) if len(parts) > 6 else None,
        })
    return cols


def parse_pk_columns(pk_section):
    """Parse primary-key column names from the second COLUMN_NAME section.

    The foreign-key result set follows in the same section, so parsing stops
    at its header line; otherwise every FK row would be reported as a PK
    column.
    """
    pk_cols = []
    for raw in pk_section.split("\n"):
        line = raw.strip()
        if "fk_column" in line and "ref_table" in line:
            break
        if not line or line.startswith("---"):
            continue
        candidate = line.split("|")[0].strip()
        if candidate and candidate != "COLUMN_NAME" and not _is_rule(candidate):
            pk_cols.append(candidate)
    return pk_cols


def parse_foreign_keys(tbl_content):
    """Parse every foreign-key row that follows the FK header line.

    Returns a list of dicts with the keys ``col``, ``ref_table`` and
    ``ref_col``.  Parsing is line-based so that all rows are kept, not just
    whatever trails the final newline.
    """
    fks = []
    in_fk_rows = False
    for raw in tbl_content.split("\n"):
        line = raw.strip()
        if FK_HEADER in line:
            in_fk_rows = True
            continue
        if not in_fk_rows or not line or "|" not in line:
            continue
        parts = [p.strip() for p in line.split("|")]
        # Skip the dashed separator row under the header.
        if len(parts) >= 3 and parts[0] and not all(ch in "-" for ch in parts[0]):
            fks.append({"col": parts[0], "ref_table": parts[1], "ref_col": parts[2]})
    return fks


def parse_table(tbl_content):
    """Split one table body into ``(cols, pk_cols, fks)``.

    The body is split on the literal COLUMN_NAME: the first piece after the
    split holds the column rows and the second holds the PK rows.
    """
    sections = tbl_content.strip().split("COLUMN_NAME")
    cols = parse_columns(sections[1]) if len(sections) >= 2 else []
    pk_cols = parse_pk_columns(sections[2]) if len(sections) >= 3 else []
    return cols, pk_cols, parse_foreign_keys(tbl_content)


def render_ddl(tbl_name, cols, pk_cols, fks):
    """Return the CREATE TABLE script, plus one ALTER TABLE per foreign key."""
    col_defs = []
    for c in cols:
        line = f" [{c['name']}] {c['type']}"
        line += " NOT NULL" if c['nullable'] == 'NO' else " NULL"
        if c['default']:
            line += f" DEFAULT {c['default']}"
        col_defs.append(line)
    if pk_cols:
        pk_list = ', '.join('[' + c + ']' for c in pk_cols)
        col_defs.append(f" CONSTRAINT [PK_{tbl_name}] PRIMARY KEY ({pk_list})")

    ddl_lines = [
        f"-- Table: {tbl_name}\n",
        f"CREATE TABLE [{tbl_name}] (",
        ",\n".join(col_defs),
        ");\nGO\n",
    ]
    for fk in fks:
        ddl_lines.append(
            f"ALTER TABLE [{tbl_name}] ADD FOREIGN KEY ([{fk['col']}]) "
            f"REFERENCES [{fk['ref_table']}] ([{fk['ref_col']}]);"
        )
        ddl_lines.append("GO\n")
    return "\n".join(ddl_lines)


def render_schema_entry(tbl_name, cols, pk_cols, fks):
    """Return the Markdown lines (heading plus column table) for one table."""
    entry = [
        f"### {tbl_name}\n",
        "| Column | Type | Nullable | Notes |",
        "|--------|------|----------|-------|",
    ]
    for c in cols:
        notes = []
        if c['name'] in pk_cols:
            notes.append("PK")
        for fk in fks:
            if fk['col'] == c['name']:
                notes.append(f"FK -> {fk['ref_table']}.{fk['ref_col']}")
        if c['default']:
            notes.append(f"Default: {c['default']}")
        entry.append(f"| {c['name']} | {c['type']} | {c['nullable']} | {' '.join(notes)} |")
    entry.append("")
    return entry


def main():
    """Read the raw dump, write one .sql file per table, then schema_tables.md."""
    os.makedirs(ddl_dir, exist_ok=True)
    with open(os.path.join(base, "table_dump_raw.txt"), "r", encoding="utf-8") as f:
        content = f.read()

    tables = TABLE_RE.findall(content)
    schema_lines = ["# Schema Reference\n\nGenerated from the ZB (Galaxy Repository) database.\n\n## Tables\n"]
    for raw_name, tbl_content in tables:
        tbl_name = raw_name.strip()
        cols, pk_cols, fks = parse_table(tbl_content)
        with open(os.path.join(ddl_dir, f"{tbl_name}.sql"), "w", encoding="utf-8") as f:
            f.write(render_ddl(tbl_name, cols, pk_cols, fks))
        schema_lines.extend(render_schema_entry(tbl_name, cols, pk_cols, fks))

    print(f"Processed {len(tables)} tables")
    with open(os.path.join(base, "schema_tables.md"), "w", encoding="utf-8") as f:
        f.write("\n".join(schema_lines))


if __name__ == "__main__":
    main()