Stand up local dev infrastructure (OPC UA, LDAP, MS SQL) with Docker Compose, Python CLI tools for service interaction, and teardown script. Fix GLAuth config mount, OPC PLC node format, and document actual DN/namespace behavior discovered during testing. Resolve Q1-Q8,Q10: .NET 10, Akka.NET 1.5.x, monorepo with slnx, appsettings JWT, Windows Server 2022 site target.
232 lines
8.4 KiB
Python
232 lines
8.4 KiB
Python
#!/usr/bin/env python3
|
|
"""LDAP client tool for ScadaLink test infrastructure."""
|
|
|
|
import argparse
|
|
import sys
|
|
|
|
from ldap3 import Server, Connection, NONE, SUBTREE, SIMPLE
|
|
|
|
|
|
# Where the LDAP test server listens and the root of the directory tree it serves.
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 3893
DEFAULT_BASE_DN = "dc=scadalink,dc=local"

# GLAuth exposes every user at cn=<user>,ou=<PrimaryGroupName>,ou=users,dc=...
# The admin account (primary group SCADA-Admins) is used for lookups, so it
# must be granted search capabilities in the GLAuth configuration.
DEFAULT_BIND_DN = "cn=admin,ou=SCADA-Admins,ou=users,dc=scadalink,dc=local"
DEFAULT_BIND_PASSWORD = "password"
|
|
|
|
|
|
def make_connection(args):
    """Build an LDAP connection to ``args.host``:``args.port`` using a simple bind.

    The bind DN and password are read from ``args.bind_dn`` and
    ``args.bind_password``.  Sub-commands that do not define those options
    (the attribute is missing) or that supply an empty value fall back to
    ``DEFAULT_BIND_DN`` / ``DEFAULT_BIND_PASSWORD``.

    Returns the ``Connection`` object, which is created with
    ``auto_bind=True``.
    """
    # get_info=NONE: no server info/schema is requested when connecting.
    target = Server(args.host, port=args.port, get_info=NONE)

    who = getattr(args, "bind_dn", None) or DEFAULT_BIND_DN
    secret = getattr(args, "bind_password", None) or DEFAULT_BIND_PASSWORD

    return Connection(
        target,
        user=who,
        password=secret,
        authentication=SIMPLE,
        auto_bind=True,
    )
|
|
|
|
|
|
def cmd_check(args):
    """Health check: connect, bind, and enumerate everything under the base DN.

    Prints the connection target, the default bind DN, the number of entries
    found by a subtree search of ``DEFAULT_BASE_DN`` and each entry's DN.
    Any exception is reported on stderr and the process exits with status 1.
    """
    try:
        ldap = make_connection(args)
        print(f"Connected to: {args.host}:{args.port}")
        print(f"Bind DN: {DEFAULT_BIND_DN}")

        # "(objectClass=*)" matches every entry, so this walks the whole subtree.
        ldap.search(DEFAULT_BASE_DN, "(objectClass=*)", search_scope=SUBTREE, attributes=["dn"])
        found = ldap.entries
        print(f"Base DN search ({DEFAULT_BASE_DN}): {len(found)} entries found")

        for dn in (item.entry_dn for item in found):
            print(f" {dn}")

        ldap.unbind()
        print("\nLDAP server is healthy.")
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)
|
|
|
|
|
|
def cmd_bind(args):
    """Test user authentication via bind.

    GLAuth DN format: cn=<user>,ou=<PrimaryGroup>,ou=users,dc=scadalink,dc=local

    The user's primary group is not known up front, so authentication is a
    two-step process:

    1. Bind with the lookup account (see ``make_connection``) and search the
       subtree of ``DEFAULT_BASE_DN`` for ``cn=<args.user>`` to discover the
       user's full DN.
    2. Open a second connection and bind as that DN with ``args.password``.

    Prints ``Bind successful: <dn>`` on success.  On any failure (user not
    found, DN not resolvable, bind rejected, connection error) a message is
    written to stderr and the process exits with status 1.
    """
    # Local import: the helper ships with ldap3, which this tool already requires.
    from ldap3.utils.conv import escape_filter_chars

    try:
        # First, use the lookup account to find the user's actual DN.
        admin_conn = make_connection(args)
        admin_conn.search(
            DEFAULT_BASE_DN,
            # Escape the supplied name so '*', '(' , ')' and '\' are matched
            # literally instead of changing the meaning of the filter.
            f"(cn={escape_filter_chars(args.user)})",
            search_scope=SUBTREE,
            attributes=["cn"],
        )
        matched_dns = [entry.entry_dn for entry in admin_conn.entries]

        # Release the lookup connection before any of the early exits below,
        # so it is closed on the "not found" paths as well.
        admin_conn.unbind()

        if not matched_dns:
            print(f"User not found: {args.user}", file=sys.stderr)
            sys.exit(1)

        # Keep only the user entry (group entries can also match on cn).
        # The comparison ignores case because the DN returned by the server
        # may differ in case from what was typed on the command line.
        rdn_prefix = f"cn={args.user},".casefold()
        user_dn = next(
            (dn for dn in matched_dns if dn.casefold().startswith(rdn_prefix)),
            None,
        )

        if not user_dn:
            print(f"Could not resolve DN for user: {args.user}", file=sys.stderr)
            sys.exit(1)

        # Now bind with the discovered DN; auto_bind=True makes a rejected
        # bind surface as an exception handled below.
        server = Server(args.host, port=args.port, get_info=NONE)
        conn = Connection(server, user=user_dn, password=args.password, authentication=SIMPLE, auto_bind=True)
        print(f"Bind successful: {user_dn}")
        conn.unbind()
    except Exception as e:
        print(f"Bind failed for {args.user}: {e}", file=sys.stderr)
        sys.exit(1)
|
|
|
|
|
|
def cmd_search(args):
    """Run the caller-supplied LDAP filter and print every matching entry.

    The search is a subtree search rooted at ``args.base`` (or
    ``DEFAULT_BASE_DN`` when no base was given) requesting all attributes.
    Any exception is reported on stderr and the process exits with status 1.
    """
    try:
        ldap = make_connection(args)
        search_base = args.base if args.base else DEFAULT_BASE_DN
        ldap.search(search_base, args.filter, search_scope=SUBTREE, attributes=["*"])

        hits = ldap.entries
        print(f"Filter: {args.filter}")
        print(f"Base: {search_base}")
        print(f"Results: {len(hits)}\n")

        for hit in hits:
            print(f"DN: {hit.entry_dn}")
            for name in hit.entry_attributes:
                print(f" {name}: {hit[name].value}")
            # Blank line between entries.
            print()

        ldap.unbind()
    except Exception as exc:
        print(f"Error: {exc}", file=sys.stderr)
        sys.exit(1)
|
|
|
|
|
|
def cmd_users(args):
    """List all users with their primary and additional group memberships.

    Two subtree searches are run under ``DEFAULT_BASE_DN``:

    * ``posixGroup`` entries, to map ``gidNumber`` -> group name and to
      collect each group's ``memberUid`` values;
    * ``posixAccount`` entries, printed one per line with e-mail, primary
      group (resolved through ``gidNumber``) and any other group that lists
      the user in ``memberUid``.

    Attributes that are absent or carry no value are treated as empty rather
    than raising.  Any exception is reported on stderr and the process exits
    with status 1.
    """

    def _values(entry, name):
        """Return the attribute's values as a list; empty when absent or unset."""
        if name not in entry.entry_attributes:
            return []
        raw = entry[name].value
        # An attribute can be present without a value, in which case
        # ``.value`` is None; a single value comes back as a scalar.
        if raw is None:
            return []
        return list(raw) if isinstance(raw, (list, tuple)) else [raw]

    def _first(entry, name):
        """Return the attribute's first value, or None when it has none."""
        found = _values(entry, name)
        return found[0] if found else None

    def _gid(entry):
        """Return the entry's gidNumber as an int, or None when missing/blank."""
        raw = _first(entry, "gidNumber")
        if raw is None or str(raw).strip() == "":
            return None
        return int(raw)

    try:
        conn = make_connection(args)

        # Find all groups to map GIDs and memberships
        conn.search(DEFAULT_BASE_DN, "(objectClass=posixGroup)",
                    search_scope=SUBTREE, attributes=["cn", "gidNumber", "memberUid"])
        groups_by_gid = {}
        groups_with_members = {}
        for entry in conn.entries:
            group_name = str(_first(entry, "cn") or "")
            gid = _gid(entry)
            # ``is not None`` so that a GID of 0 is still recorded.
            if gid is not None:
                groups_by_gid[gid] = group_name
            # A set gives O(1) membership tests in the per-user loop below.
            groups_with_members[group_name] = {str(uid) for uid in _values(entry, "memberUid")}

        # Find all users
        conn.search(DEFAULT_BASE_DN, "(objectClass=posixAccount)",
                    search_scope=SUBTREE, attributes=["cn", "mail", "gidNumber"])

        print("Users:")
        print(f"{'Username':<20} {'Email':<35} {'Primary Group':<25} {'Additional Groups'}")
        print("-" * 110)

        for entry in conn.entries:
            cn = str(_first(entry, "cn") or "")
            mail = str(_first(entry, "mail") or "")
            gid = _gid(entry)
            # Fall back to the numeric GID when no group entry defines it.
            primary = groups_by_gid.get(gid, str(gid)) if gid is not None else ""

            # Find additional groups via memberUid
            additional = [
                group_name
                for group_name, members in groups_with_members.items()
                if cn in members and group_name != primary
            ]

            print(f"{cn:<20} {mail:<35} {primary:<25} {', '.join(additional)}")

        conn.unbind()
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
|
|
|
|
|
|
def cmd_groups(args):
    """List all groups with their GID and member user names.

    Runs a subtree search for ``posixGroup`` entries under
    ``DEFAULT_BASE_DN`` and prints one line per group: name, ``gidNumber``
    and the comma-separated ``memberUid`` values (``(none)`` when the group
    has no members).  Missing or empty attributes are printed as blanks
    instead of raising.  Any exception is reported on stderr and the process
    exits with status 1.
    """

    def _text(value):
        """Render an attribute value for column formatting ('' for None)."""
        return "" if value is None else str(value)

    try:
        conn = make_connection(args)
        # NOTE: "description" is requested but not currently printed.
        conn.search(DEFAULT_BASE_DN, "(objectClass=posixGroup)",
                    search_scope=SUBTREE, attributes=["cn", "gidNumber", "memberUid", "description"])

        print("Groups:")
        print(f"{'Group':<25} {'GID':<10} {'Members'}")
        print("-" * 70)

        for entry in conn.entries:
            present = entry.entry_attributes
            cn = _text(entry["cn"].value)
            # ``.value`` can be None for an attribute without a value, which
            # the ``:<10`` format spec cannot handle; convert to text first.
            gid = _text(entry["gidNumber"].value) if "gidNumber" in present else ""

            members = entry["memberUid"].value if "memberUid" in present else None
            if members is None:
                members = []
            elif not isinstance(members, (list, tuple)):
                # A single member comes back as a scalar rather than a list.
                members = [members]
            member_list = ", ".join(str(m) for m in members) if members else "(none)"

            print(f"{cn:<25} {gid:<10} {member_list}")

        conn.unbind()
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
|
|
|
|
|
|
def main():
    """Parse the command line and run the selected sub-command.

    Global options ``--host`` and ``--port`` select the server.  The
    sub-commands are ``check``, ``bind``, ``search``, ``users`` and
    ``groups``; the last three additionally accept ``--bind-dn`` and
    ``--bind-password`` to override the account used for lookups.
    """
    parser = argparse.ArgumentParser(description="LDAP client tool for ScadaLink test infrastructure")
    parser.add_argument("--host", default=DEFAULT_HOST, help=f"LDAP host (default: {DEFAULT_HOST})")
    parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"LDAP port (default: {DEFAULT_PORT})")

    subcommands = parser.add_subparsers(dest="command", required=True)

    def with_bind_options(subparser):
        """Attach the shared lookup-credential options and return the parser."""
        subparser.add_argument("--bind-dn", default=DEFAULT_BIND_DN, help="Bind DN")
        subparser.add_argument("--bind-password", default=DEFAULT_BIND_PASSWORD, help="Bind password")
        return subparser

    subcommands.add_parser("check", help="Test LDAP connectivity")

    bind_parser = subcommands.add_parser("bind", help="Test user authentication")
    bind_parser.add_argument("--user", required=True, help="Username (e.g. designer)")
    bind_parser.add_argument("--password", default="password", help="Password (default: password)")

    search_parser = with_bind_options(subcommands.add_parser("search", help="Search with LDAP filter"))
    search_parser.add_argument("--filter", required=True, help="LDAP filter")
    search_parser.add_argument("--base", help=f"Search base (default: {DEFAULT_BASE_DN})")

    with_bind_options(subcommands.add_parser("users", help="List all users with group memberships"))
    with_bind_options(subcommands.add_parser("groups", help="List all groups with members"))

    args = parser.parse_args()

    # Sub-command name -> handler; argparse guarantees the name is one of these.
    handlers = {
        "check": cmd_check,
        "bind": cmd_bind,
        "search": cmd_search,
        "users": cmd_users,
        "groups": cmd_groups,
    }
    handler = handlers[args.command]
    handler(args)


if __name__ == "__main__":
    main()
|