#!/usr/bin/env python3
"""LDAP client tool for ScadaLink test infrastructure.

Subcommands:
    check   Bind with the service account and list entries under the base DN.
    bind    Look up a user's DN, then verify the user's password via bind.
    search  Run an arbitrary LDAP filter and dump all returned attributes.
    users   List posixAccount entries with primary and additional groups.
    groups  List posixGroup entries with their memberUid values.
"""

import argparse
import sys
from contextlib import contextmanager
from typing import NoReturn

from ldap3 import Server, Connection, NONE, SUBTREE, SIMPLE
from ldap3.utils.conv import escape_filter_chars

DEFAULT_HOST = "localhost"
DEFAULT_PORT = 3893
DEFAULT_BASE_DN = "dc=scadalink,dc=local"

# GLAuth places users under ou=<primary group>,ou=users,dc=...
# The admin user (primarygroup SCADA-Admins) needs search capabilities in config.
DEFAULT_BIND_DN = "cn=admin,ou=SCADA-Admins,ou=users,dc=scadalink,dc=local"
DEFAULT_BIND_PASSWORD = "password"


def _fail(message: str) -> NoReturn:
    """Print *message* to stderr and terminate the process with exit code 1."""
    print(message, file=sys.stderr)
    sys.exit(1)


def _resolve_bind_dn(args: argparse.Namespace) -> str:
    """Return the bind DN from *args*, falling back to ``DEFAULT_BIND_DN``."""
    return getattr(args, "bind_dn", None) or DEFAULT_BIND_DN


def make_connection(args: argparse.Namespace) -> Connection:
    """Create and return an authenticated LDAP connection.

    Reads ``host`` and ``port`` from *args*. ``bind_dn`` and ``bind_password``
    are optional; when absent or empty the module defaults are used.

    Raises:
        Whatever ``ldap3`` raises when the connection or bind fails
        (``auto_bind=True`` binds inside the constructor).
    """
    server = Server(args.host, port=args.port, get_info=NONE)
    bind_password = getattr(args, "bind_password", None) or DEFAULT_BIND_PASSWORD
    return Connection(
        server,
        user=_resolve_bind_dn(args),
        password=bind_password,
        authentication=SIMPLE,
        auto_bind=True,
    )


@contextmanager
def _open_connection(args: argparse.Namespace):
    """Yield a bound service-account connection and always unbind it.

    An explicit try/unbind is used instead of ``with Connection(...)`` because
    the connection is already bound when it is created here, and the goal is
    to guarantee that it is released on every exit path.
    """
    conn = make_connection(args)
    try:
        yield conn
    except BaseException:
        # Something already went wrong (including sys.exit); release the
        # connection but never let a cleanup failure mask the original error.
        try:
            conn.unbind()
        except Exception:
            pass
        raise
    else:
        conn.unbind()


def _attr_values(entry, name: str) -> list:
    """Return every value of attribute *name* on *entry* as a list.

    Returns an empty list when the attribute is missing or has no values, so
    callers never have to distinguish ``None``, a scalar, or a list.
    """
    if name not in entry.entry_attributes:
        return []
    return [value for value in entry[name].values if value is not None]


def _attr_first(entry, name: str, default=None):
    """Return the first value of attribute *name* on *entry*, or *default*."""
    values = _attr_values(entry, name)
    return values[0] if values else default


def cmd_check(args: argparse.Namespace) -> None:
    """Test LDAP connectivity and base DN search.

    Prints every DN found under ``DEFAULT_BASE_DN``; exits with status 1 and
    an error on stderr if connecting, binding or searching fails.
    """
    try:
        with _open_connection(args) as conn:
            print(f"Connected to: {args.host}:{args.port}")
            # Report the DN that was really used, not just the default.
            print(f"Bind DN: {_resolve_bind_dn(args)}")

            conn.search(
                DEFAULT_BASE_DN,
                "(objectClass=*)",
                search_scope=SUBTREE,
                attributes=["dn"],
            )
            print(f"Base DN search ({DEFAULT_BASE_DN}): {len(conn.entries)} entries found")
            for entry in conn.entries:
                print(f"  {entry.entry_dn}")
        print("\nLDAP server is healthy.")
    except Exception as e:
        _fail(f"Error: {e}")


def cmd_bind(args: argparse.Namespace) -> None:
    """Test user authentication via bind.

    GLAuth DN format: cn=<username>,ou=<primary group>,ou=users,dc=scadalink,dc=local

    Since we don't know the user's primary group upfront, we search for the
    user first to discover the full DN, then rebind with that DN.

    Exits with status 1 when the user cannot be found, the DN cannot be
    resolved, or the bind is rejected.
    """
    try:
        # First, use admin to find the user's actual DN. The username is
        # escaped so characters such as '*', '(' or ')' cannot alter the filter.
        with _open_connection(args) as admin_conn:
            admin_conn.search(
                DEFAULT_BASE_DN,
                f"(cn={escape_filter_chars(args.user)})",
                search_scope=SUBTREE,
                attributes=["cn"],
            )
            candidate_dns = [entry.entry_dn for entry in admin_conn.entries]

        if not candidate_dns:
            _fail(f"User not found: {args.user}")

        # Find the user entry (skip group entries that also match cn).
        # DNs are compared case-insensitively, matching how cn is searched.
        prefix = f"cn={args.user},".lower()
        user_dn = next(
            (dn for dn in candidate_dns if dn.lower().startswith(prefix)),
            None,
        )
        if not user_dn:
            _fail(f"Could not resolve DN for user: {args.user}")

        # Now bind with the discovered DN
        server = Server(args.host, port=args.port, get_info=NONE)
        conn = Connection(
            server,
            user=user_dn,
            password=args.password,
            authentication=SIMPLE,
            auto_bind=True,
        )
        try:
            print(f"Bind successful: {user_dn}")
        finally:
            conn.unbind()
    except Exception as e:
        _fail(f"Bind failed for {args.user}: {e}")


def cmd_search(args: argparse.Namespace) -> None:
    """Search with arbitrary LDAP filter.

    ``args.filter`` is passed to the server verbatim (that is the purpose of
    this subcommand); ``args.base`` defaults to ``DEFAULT_BASE_DN``.
    """
    try:
        with _open_connection(args) as conn:
            base = args.base or DEFAULT_BASE_DN
            conn.search(base, args.filter, search_scope=SUBTREE, attributes=["*"])

            print(f"Filter: {args.filter}")
            print(f"Base:   {base}")
            print(f"Results: {len(conn.entries)}\n")

            for entry in conn.entries:
                print(f"DN: {entry.entry_dn}")
                for attr in entry.entry_attributes:
                    print(f"  {attr}: {entry[attr].value}")
                print()
    except Exception as e:
        _fail(f"Error: {e}")


def cmd_users(args: argparse.Namespace) -> None:
    """List all users with group memberships.

    The primary group is resolved from the user's ``gidNumber``; additional
    groups are those whose ``memberUid`` contains the user's ``cn``.
    """
    try:
        with _open_connection(args) as conn:
            # Find all groups to map GIDs and memberships
            conn.search(
                DEFAULT_BASE_DN,
                "(objectClass=posixGroup)",
                search_scope=SUBTREE,
                attributes=["cn", "gidNumber", "memberUid"],
            )
            groups_by_gid = {}
            members_by_group = {}
            for entry in conn.entries:
                group_name = str(_attr_first(entry, "cn", ""))
                gid = _attr_first(entry, "gidNumber")
                # Explicit None check so GID 0 is not mistaken for "missing".
                if gid is not None:
                    groups_by_gid[int(gid)] = group_name
                members_by_group[group_name] = {
                    str(uid) for uid in _attr_values(entry, "memberUid")
                }

            # Find all users
            conn.search(
                DEFAULT_BASE_DN,
                "(objectClass=posixAccount)",
                search_scope=SUBTREE,
                attributes=["cn", "mail", "gidNumber"],
            )

            print("Users:")
            print(f"{'Username':<20} {'Email':<35} {'Primary Group':<25} {'Additional Groups'}")
            print("-" * 110)

            for entry in conn.entries:
                username = str(_attr_first(entry, "cn", ""))
                mail = str(_attr_first(entry, "mail", ""))
                gid = _attr_first(entry, "gidNumber")
                if gid is None:
                    primary = ""
                else:
                    gid = int(gid)
                    # Fall back to the numeric GID when no group declares it.
                    primary = groups_by_gid.get(gid, str(gid))

                # Find additional groups via memberUid
                additional = [
                    group_name
                    for group_name, members in members_by_group.items()
                    if username in members and group_name != primary
                ]

                print(f"{username:<20} {mail:<35} {primary:<25} {', '.join(additional)}")
    except Exception as e:
        _fail(f"Error: {e}")


def cmd_groups(args: argparse.Namespace) -> None:
    """List all groups with their GID and ``memberUid`` values."""
    try:
        with _open_connection(args) as conn:
            conn.search(
                DEFAULT_BASE_DN,
                "(objectClass=posixGroup)",
                search_scope=SUBTREE,
                attributes=["cn", "gidNumber", "memberUid", "description"],
            )

            print("Groups:")
            print(f"{'Group':<25} {'GID':<10} {'Members'}")
            print("-" * 70)

            for entry in conn.entries:
                group_name = str(_attr_first(entry, "cn", ""))
                gid = str(_attr_first(entry, "gidNumber", ""))
                members = [str(uid) for uid in _attr_values(entry, "memberUid")]
                member_list = ", ".join(members) if members else "(none)"
                print(f"{group_name:<25} {gid:<10} {member_list}")
    except Exception as e:
        _fail(f"Error: {e}")


def _add_bind_options(subparser: argparse.ArgumentParser) -> None:
    """Add the shared service-account ``--bind-dn`` / ``--bind-password`` options."""
    subparser.add_argument("--bind-dn", default=DEFAULT_BIND_DN, help="Bind DN")
    subparser.add_argument("--bind-password", default=DEFAULT_BIND_PASSWORD, help="Bind password")


def main() -> None:
    """Parse the command line and dispatch to the selected subcommand."""
    parser = argparse.ArgumentParser(description="LDAP client tool for ScadaLink test infrastructure")
    parser.add_argument("--host", default=DEFAULT_HOST, help=f"LDAP host (default: {DEFAULT_HOST})")
    parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"LDAP port (default: {DEFAULT_PORT})")

    sub = parser.add_subparsers(dest="command", required=True)

    check_p = sub.add_parser("check", help="Test LDAP connectivity")
    _add_bind_options(check_p)

    bind_p = sub.add_parser("bind", help="Test user authentication")
    bind_p.add_argument("--user", required=True, help="Username (e.g. designer)")
    bind_p.add_argument("--password", default="password", help="Password (default: password)")
    _add_bind_options(bind_p)

    search_p = sub.add_parser("search", help="Search with LDAP filter")
    _add_bind_options(search_p)
    search_p.add_argument("--filter", required=True, help="LDAP filter")
    search_p.add_argument("--base", help=f"Search base (default: {DEFAULT_BASE_DN})")

    users_p = sub.add_parser("users", help="List all users with group memberships")
    _add_bind_options(users_p)

    groups_p = sub.add_parser("groups", help="List all groups with members")
    _add_bind_options(groups_p)

    args = parser.parse_args()

    commands = {
        "check": cmd_check,
        "bind": cmd_bind,
        "search": cmd_search,
        "users": cmd_users,
        "groups": cmd_groups,
    }
    commands[args.command](args)


if __name__ == "__main__":
    main()