Full OPC UA server on .NET Framework 4.8 (x86) exposing AVEVA System Platform Galaxy tags via MXAccess. Mirrors Galaxy object hierarchy as OPC UA address space, translating contained-name browse paths to tag-name runtime references. Components implemented: - Configuration: AppConfiguration with 4 sections, validator - Domain: ConnectionState, Quality, Vtq, MxDataTypeMapper, error codes - MxAccess: StaComThread, MxAccessClient (partial classes), MxProxyAdapter using strongly-typed ArchestrA.MxAccess COM interop - Galaxy Repository: SQL queries (hierarchy, attributes, change detection), ChangeDetectionService with auto-rebuild on deploy - OPC UA Server: LmxNodeManager (CustomNodeManager2), LmxOpcUaServer, OpcUaServerHost with programmatic config, SecurityPolicy None - Status Dashboard: HTTP server with HTML/JSON/health endpoints - Integration: Full 14-step startup, graceful shutdown, component wiring 175 tests (174 unit + 1 integration), all passing. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
123 lines
5.0 KiB
Python
123 lines
5.0 KiB
Python
import re, os
|
|
|
|
base = r"C:\Users\dohertj2\Desktop\gr"
|
|
ddl_dir = os.path.join(base, "ddl", "tables")
|
|
os.makedirs(ddl_dir, exist_ok=True)
|
|
|
|
with open(os.path.join(base, "table_dump_raw.txt"), "r", encoding="utf-8") as f:
|
|
content = f.read()
|
|
|
|
tables = re.findall(r'---TABLE_START:(.+?)---(.+?)---TABLE_END:\1---', content, re.DOTALL)
|
|
|
|
schema_lines = ["# Schema Reference\n\nGenerated from the ZB (Galaxy Repository) database.\n\n## Tables\n"]
|
|
|
|
for tbl_name, tbl_content in tables:
|
|
tbl_name = tbl_name.strip()
|
|
|
|
# Parse columns
|
|
sections = tbl_content.strip().split("COLUMN_NAME")
|
|
cols = []
|
|
pk_cols = []
|
|
fks = []
|
|
|
|
# Parse column definitions (first section after header)
|
|
if len(sections) >= 2:
|
|
col_section = sections[1]
|
|
lines = [l.strip() for l in col_section.split("\n") if l.strip() and not l.strip().startswith("---")]
|
|
for line in lines:
|
|
parts = [p.strip() for p in line.split("|")]
|
|
if len(parts) >= 6 and parts[0] not in ("", "COLUMN_NAME") and not all(c in "-|" for c in parts[0]):
|
|
col_name = parts[0]
|
|
data_type = parts[1]
|
|
char_len = parts[2] if parts[2] != "NULL" else None
|
|
num_prec = parts[3] if parts[3] != "NULL" else None
|
|
num_scale = parts[4] if parts[4] != "NULL" else None
|
|
nullable = parts[5]
|
|
default_val = parts[6] if len(parts) > 6 and parts[6] != "NULL" else None
|
|
|
|
# Build type string
|
|
if char_len:
|
|
type_str = f"{data_type}({char_len})"
|
|
elif num_prec and data_type not in ("int", "smallint", "bigint", "tinyint", "bit"):
|
|
type_str = f"{data_type}({num_prec},{num_scale})"
|
|
else:
|
|
type_str = data_type
|
|
|
|
cols.append({
|
|
"name": col_name,
|
|
"type": type_str,
|
|
"nullable": nullable,
|
|
"default": default_val
|
|
})
|
|
|
|
# Parse PK columns (second COLUMN_NAME section)
|
|
if len(sections) >= 3:
|
|
pk_section = sections[2]
|
|
lines = [l.strip() for l in pk_section.split("\n") if l.strip() and not l.strip().startswith("---") and not l.strip().startswith("fk_")]
|
|
for line in lines:
|
|
parts = [p.strip() for p in line.split("|")]
|
|
pk_candidate = parts[0]
|
|
if pk_candidate and pk_candidate not in ("", "COLUMN_NAME") and not all(c in "-|" for c in pk_candidate):
|
|
# Stop if we hit the FK header
|
|
if "ref_table" in line or "fk_column" in line:
|
|
break
|
|
pk_cols.append(pk_candidate)
|
|
|
|
# Parse FKs
|
|
fk_matches = re.findall(r'fk_column\|ref_table\|ref_column\|fk_name\n-+\|.+\n(.*?)(?=---TABLE_END|$)', tbl_content, re.DOTALL)
|
|
if fk_matches:
|
|
for fk_block in fk_matches:
|
|
for line in fk_block.strip().split("\n"):
|
|
line = line.strip()
|
|
if line and "|" in line:
|
|
parts = [p.strip() for p in line.split("|")]
|
|
if len(parts) >= 3 and parts[0] and not all(c in "-" for c in parts[0]):
|
|
fks.append({"col": parts[0], "ref_table": parts[1], "ref_col": parts[2]})
|
|
|
|
# Write DDL file
|
|
ddl_lines = [f"-- Table: {tbl_name}\n"]
|
|
ddl_lines.append(f"CREATE TABLE [{tbl_name}] (")
|
|
col_defs = []
|
|
for c in cols:
|
|
line = f" [{c['name']}] {c['type']}"
|
|
if c['nullable'] == 'NO':
|
|
line += " NOT NULL"
|
|
else:
|
|
line += " NULL"
|
|
if c['default']:
|
|
line += f" DEFAULT {c['default']}"
|
|
col_defs.append(line)
|
|
if pk_cols:
|
|
col_defs.append(f" CONSTRAINT [PK_{tbl_name}] PRIMARY KEY ({', '.join('[' + c + ']' for c in pk_cols)})")
|
|
ddl_lines.append(",\n".join(col_defs))
|
|
ddl_lines.append(");\nGO\n")
|
|
|
|
if fks:
|
|
for fk in fks:
|
|
ddl_lines.append(f"ALTER TABLE [{tbl_name}] ADD FOREIGN KEY ([{fk['col']}]) REFERENCES [{fk['ref_table']}] ([{fk['ref_col']}]);")
|
|
ddl_lines.append("GO\n")
|
|
|
|
with open(os.path.join(ddl_dir, f"{tbl_name}.sql"), "w", encoding="utf-8") as f:
|
|
f.write("\n".join(ddl_lines))
|
|
|
|
# Build schema.md entry
|
|
schema_lines.append(f"### {tbl_name}\n")
|
|
schema_lines.append("| Column | Type | Nullable | Notes |")
|
|
schema_lines.append("|--------|------|----------|-------|")
|
|
for c in cols:
|
|
notes = []
|
|
if c['name'] in pk_cols:
|
|
notes.append("PK")
|
|
for fk in fks:
|
|
if fk['col'] == c['name']:
|
|
notes.append(f"FK -> {fk['ref_table']}.{fk['ref_col']}")
|
|
if c['default']:
|
|
notes.append(f"Default: {c['default']}")
|
|
schema_lines.append(f"| {c['name']} | {c['type']} | {c['nullable']} | {' '.join(notes)} |")
|
|
schema_lines.append("")
|
|
|
|
print(f"Processed {len(tables)} tables")
|
|
|
|
with open(os.path.join(base, "schema_tables.md"), "w", encoding="utf-8") as f:
|
|
f.write("\n".join(schema_lines))
|