Files
ScadaBridge/infra/tools/ldap_tool.py
T
Joseph Doherty 6ae605160c chore(auth): ScadaBridge unify dev LDAP base DN to dc=zb,dc=local (Task 1.6)
Replace dc=scadabridge,dc=local with dc=zb,dc=local in all dev/test LDAP
references — app config, docker test-cluster node configs (docker/ and
docker-env2/), GLAuth fixture, dev tooling, Host.Tests fixtures,
IntegrationTests factory, and operational test_infra docs. OU structure
(ou=SCADA-Admins,ou=users,etc.) preserved throughout. Email domains
(@scadabridge.local), hostnames, and container names are untouched.
Historical plan docs (2026-05-24-second-environment.md,
2026-05-31-folder-repo-rename-scadabridge-design.md) excluded as
point-in-time records. No synthetic dc=example,dc=com placeholders touched.
2026-06-02 06:54:14 -04:00

232 lines
8.4 KiB
Python

#!/usr/bin/env python3
"""LDAP client tool for ScadaBridge test infrastructure."""
import argparse
import sys
from ldap3 import Server, Connection, NONE, SUBTREE, SIMPLE
# Server address defaults; --host and --port override them on the command line.
DEFAULT_HOST = "localhost"
DEFAULT_PORT = 3893
# Search base used by every subcommand (the search subcommand can override it
# with --base).
DEFAULT_BASE_DN = "dc=zb,dc=local"
# GLAuth places users under ou=<PrimaryGroupName>,ou=users,dc=...
# The admin user (primarygroup SCADA-Admins) needs search capabilities in config.
DEFAULT_BIND_DN = "cn=admin,ou=SCADA-Admins,ou=users,dc=zb,dc=local"
# Password paired with DEFAULT_BIND_DN; make_connection() falls back to it when
# no --bind-password is supplied.
DEFAULT_BIND_PASSWORD = "password"
def make_connection(args):
    """Open an LDAP connection to ``args.host:args.port`` and bind it.

    The bind DN and password are read from ``args.bind_dn`` and
    ``args.bind_password``.  Subcommands that do not define those options, or
    that supply an empty value, get the module-level defaults instead.

    The connection is created with ``auto_bind=True`` and simple
    authentication, so the returned object is already bound; callers are
    responsible for calling ``unbind()`` when done.
    """
    ldap_server = Server(args.host, port=args.port, get_info=NONE)

    # Resolve each credential, treating "missing" and "empty" the same way.
    user = getattr(args, "bind_dn", DEFAULT_BIND_DN)
    if not user:
        user = DEFAULT_BIND_DN
    secret = getattr(args, "bind_password", DEFAULT_BIND_PASSWORD)
    if not secret:
        secret = DEFAULT_BIND_PASSWORD

    return Connection(
        ldap_server,
        user=user,
        password=secret,
        authentication=SIMPLE,
        auto_bind=True,
    )
def cmd_check(args):
    """Test LDAP connectivity and base DN search.

    Binds through make_connection(), runs a subtree search under
    DEFAULT_BASE_DN and prints the DN of every entry returned.

    Any exception is reported on stderr and the process exits with status 1.
    """
    # Mirror make_connection()'s fallback so the DN printed is the DN actually
    # used for the bind, not unconditionally the default.
    bind_dn = getattr(args, "bind_dn", None) or DEFAULT_BIND_DN
    try:
        conn = make_connection(args)
        try:
            print(f"Connected to: {args.host}:{args.port}")
            print(f"Bind DN: {bind_dn}")
            conn.search(DEFAULT_BASE_DN, "(objectClass=*)", search_scope=SUBTREE, attributes=["dn"])
            print(f"Base DN search ({DEFAULT_BASE_DN}): {len(conn.entries)} entries found")
            for entry in conn.entries:
                print(f" {entry.entry_dn}")
        finally:
            # Release the connection even when the search or printing fails.
            conn.unbind()
        print("\nLDAP server is healthy.")
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
def cmd_bind(args):
    """Test user authentication via bind.

    GLAuth DN format: cn=<user>,ou=<PrimaryGroup>,ou=users,dc=zb,dc=local
    Since we don't know the user's primary group upfront, we search for the user first
    to discover the full DN, then rebind with that DN.

    Reads ``args.user`` and ``args.password``.  Prints the resolved DN when the
    bind succeeds; on any failure prints a message to stderr and exits with
    status 1.
    """
    # Local import: ldap3 is already a dependency of this file, and keeping the
    # import here leaves the module-level import block untouched.
    from ldap3.utils.conv import escape_filter_chars

    try:
        # First, use admin to find the user's actual DN
        admin_conn = make_connection(args)
        try:
            admin_conn.search(
                DEFAULT_BASE_DN,
                # Escape the user-supplied name so characters such as "*", "("
                # or ")" cannot alter the structure of the filter.
                f"(cn={escape_filter_chars(args.user)})",
                search_scope=SUBTREE,
                attributes=["cn"],
            )
            if not admin_conn.entries:
                print(f"User not found: {args.user}", file=sys.stderr)
                sys.exit(1)
            # Find the user entry (skip group entries that also match cn).
            # Compare case-insensitively: the server may return the DN with
            # different letter case than the name typed on the command line.
            rdn_prefix = f"cn={args.user},".casefold()
            user_dn = next(
                (
                    entry.entry_dn
                    for entry in admin_conn.entries
                    if entry.entry_dn.casefold().startswith(rdn_prefix)
                ),
                None,
            )
        finally:
            # Runs on the sys.exit() path above as well as on errors.
            admin_conn.unbind()

        if not user_dn:
            print(f"Could not resolve DN for user: {args.user}", file=sys.stderr)
            sys.exit(1)

        # Now bind with the discovered DN
        server = Server(args.host, port=args.port, get_info=NONE)
        conn = Connection(server, user=user_dn, password=args.password, authentication=SIMPLE, auto_bind=True)
        try:
            print(f"Bind successful: {user_dn}")
        finally:
            conn.unbind()
    except Exception as e:
        print(f"Bind failed for {args.user}: {e}", file=sys.stderr)
        sys.exit(1)
def cmd_search(args):
    """Run the filter given in ``args.filter`` and dump every matching entry.

    The search is a subtree search rooted at ``args.base`` (DEFAULT_BASE_DN
    when no base is given) requesting all user attributes.  The filter, base
    and result count are printed first, then each entry's DN followed by one
    line per attribute.

    Any exception is reported on stderr and the process exits with status 1.
    """
    try:
        ldap = make_connection(args)
        search_base = args.base if args.base else DEFAULT_BASE_DN
        ldap.search(search_base, args.filter, search_scope=SUBTREE, attributes=["*"])

        # Summary header.
        print(f"Filter: {args.filter}")
        print(f"Base: {search_base}")
        print(f"Results: {len(ldap.entries)}\n")

        # One block per entry, separated by a blank line.
        for found in ldap.entries:
            print(f"DN: {found.entry_dn}")
            for attr_name in found.entry_attributes:
                print(f" {attr_name}: {found[attr_name].value}")
            print()

        ldap.unbind()
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
def _attr_values(entry, name):
    """Return attribute *name* of an ldap3 entry as a list (empty when absent)."""
    if name not in entry.entry_attributes:
        return []
    # ldap3 can report a requested attribute as present but empty; ``.values``
    # is then an empty list, whereas ``.value`` would be None.
    return list(entry[name].values)


def _first_value(entry, name, default=None):
    """Return the first value of attribute *name*, or *default* when it has none."""
    values = _attr_values(entry, name)
    return values[0] if values else default


def cmd_users(args):
    """List all users with group memberships.

    Runs two subtree searches under DEFAULT_BASE_DN: one for posixGroup
    entries (to map gidNumber to group name and collect memberUid lists) and
    one for posixAccount entries.  Prints one table row per user with the
    username, email, primary group and any additional groups.

    Any exception is reported on stderr and the process exits with status 1.
    """
    try:
        conn = make_connection(args)
        try:
            # Find all groups to map GIDs and memberships
            conn.search(DEFAULT_BASE_DN, "(objectClass=posixGroup)",
                        search_scope=SUBTREE, attributes=["cn", "gidNumber", "memberUid"])
            groups_by_gid = {}
            groups_with_members = {}
            for entry in conn.entries:
                cn = _first_value(entry, "cn", "")
                gid = _first_value(entry, "gidNumber")
                # Compare against None so that GID 0 is not treated as missing.
                if gid is not None:
                    groups_by_gid[int(gid)] = cn
                # Always a set, so membership tests below never see None.
                groups_with_members[cn] = set(_attr_values(entry, "memberUid"))

            # Find all users
            conn.search(DEFAULT_BASE_DN, "(objectClass=posixAccount)",
                        search_scope=SUBTREE, attributes=["cn", "mail", "gidNumber"])
            print("Users:")
            print(f"{'Username':<20} {'Email':<35} {'Primary Group':<25} {'Additional Groups'}")
            print("-" * 110)
            for entry in conn.entries:
                cn = _first_value(entry, "cn", "")
                # Empty string rather than None keeps the column formatting valid.
                mail = _first_value(entry, "mail", "")
                raw_gid = _first_value(entry, "gidNumber")
                if raw_gid is None:
                    primary = ""
                else:
                    gid = int(raw_gid)
                    # Fall back to the numeric GID when no group carries it.
                    primary = groups_by_gid.get(gid, str(gid))
                # Find additional groups via memberUid
                additional = [
                    group_name
                    for group_name, members in groups_with_members.items()
                    if cn in members and group_name != primary
                ]
                print(f"{cn:<20} {mail:<35} {primary:<25} {', '.join(additional)}")
        finally:
            # Release the connection even when a search or row fails.
            conn.unbind()
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        sys.exit(1)
def cmd_groups(args):
    """Print a table of all posixGroup entries with their GID and members.

    Runs one subtree search under DEFAULT_BASE_DN and prints one row per
    group: name, gidNumber and a comma-separated memberUid list, or
    "(none)" when the group has no members.

    Any exception is reported on stderr and the process exits with status 1.
    """
    try:
        ldap = make_connection(args)
        ldap.search(
            DEFAULT_BASE_DN,
            "(objectClass=posixGroup)",
            search_scope=SUBTREE,
            attributes=["cn", "gidNumber", "memberUid", "description"],
        )

        print("Groups:")
        print(f"{'Group':<25} {'GID':<10} {'Members'}")
        print("-" * 70)

        for group in ldap.entries:
            name = group["cn"].value

            gid = ""
            if "gidNumber" in group.entry_attributes:
                gid = group["gidNumber"].value

            uids = []
            if "memberUid" in group.entry_attributes:
                uids = group["memberUid"].value
            # A single-valued attribute comes back as a bare string.
            if isinstance(uids, str):
                uids = [uids]

            if uids:
                member_list = ", ".join(uids)
            else:
                member_list = "(none)"
            print(f"{name:<25} {gid:<10} {member_list}")

        ldap.unbind()
    except Exception as err:
        print(f"Error: {err}", file=sys.stderr)
        sys.exit(1)
def main():
    """Parse the command line and run the selected subcommand.

    Global options are --host and --port.  The subcommands are check, bind,
    search, users and groups; search, users and groups also accept --bind-dn
    and --bind-password.
    """
    parser = argparse.ArgumentParser(description="LDAP client tool for ScadaBridge test infrastructure")
    parser.add_argument("--host", default=DEFAULT_HOST, help=f"LDAP host (default: {DEFAULT_HOST})")
    parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"LDAP port (default: {DEFAULT_PORT})")
    subcommands = parser.add_subparsers(dest="command", required=True)

    def with_bind_options(subparser):
        # Shared credential options for the subcommands that expose them.
        subparser.add_argument("--bind-dn", default=DEFAULT_BIND_DN, help="Bind DN")
        subparser.add_argument("--bind-password", default=DEFAULT_BIND_PASSWORD, help="Bind password")
        return subparser

    subcommands.add_parser("check", help="Test LDAP connectivity")

    bind_cmd = subcommands.add_parser("bind", help="Test user authentication")
    bind_cmd.add_argument("--user", required=True, help="Username (e.g. designer)")
    bind_cmd.add_argument("--password", default="password", help="Password (default: password)")

    search_cmd = with_bind_options(subcommands.add_parser("search", help="Search with LDAP filter"))
    search_cmd.add_argument("--filter", required=True, help="LDAP filter")
    search_cmd.add_argument("--base", help=f"Search base (default: {DEFAULT_BASE_DN})")

    with_bind_options(subcommands.add_parser("users", help="List all users with group memberships"))
    with_bind_options(subcommands.add_parser("groups", help="List all groups with members"))

    args = parser.parse_args()

    # Map each subcommand name to its handler and invoke the chosen one.
    handlers = dict(
        check=cmd_check,
        bind=cmd_bind,
        search=cmd_search,
        users=cmd_users,
        groups=cmd_groups,
    )
    handler = handlers[args.command]
    handler(args)


if __name__ == "__main__":
    main()