6ae605160c
Replace dc=scadabridge,dc=local with dc=zb,dc=local in all dev/test LDAP references — app config, docker test-cluster node configs (docker/ and docker-env2/), GLAuth fixture, dev tooling, Host.Tests fixtures, IntegrationTests factory, and operational test_infra docs. OU structure (ou=SCADA-Admins,ou=users,etc.) preserved throughout. Email domains (@scadabridge.local), hostnames, and container names are untouched. Historical plan docs (2026-05-24-second-environment.md, 2026-05-31-folder-repo-rename-scadabridge-design.md) excluded as point-in-time records. No synthetic dc=example,dc=com placeholders touched.
232 lines
8.4 KiB
Python
232 lines
8.4 KiB
Python
#!/usr/bin/env python3
|
|
"""LDAP client tool for ScadaBridge test infrastructure."""
|
|
|
|
import argparse
|
|
import sys
|
|
|
|
from ldap3 import Server, Connection, NONE, SUBTREE, SIMPLE
|
|
|
|
|
|
DEFAULT_HOST = "localhost"
|
|
DEFAULT_PORT = 3893
|
|
DEFAULT_BASE_DN = "dc=zb,dc=local"
|
|
# GLAuth places users under ou=<PrimaryGroupName>,ou=users,dc=...
|
|
# The admin user (primarygroup SCADA-Admins) needs search capabilities in config.
|
|
DEFAULT_BIND_DN = "cn=admin,ou=SCADA-Admins,ou=users,dc=zb,dc=local"
|
|
DEFAULT_BIND_PASSWORD = "password"
|
|
|
|
|
|
def make_connection(args):
|
|
"""Create and return an authenticated LDAP connection."""
|
|
server = Server(args.host, port=args.port, get_info=NONE)
|
|
bind_dn = getattr(args, "bind_dn", DEFAULT_BIND_DN) or DEFAULT_BIND_DN
|
|
bind_password = getattr(args, "bind_password", DEFAULT_BIND_PASSWORD) or DEFAULT_BIND_PASSWORD
|
|
conn = Connection(server, user=bind_dn, password=bind_password, authentication=SIMPLE, auto_bind=True)
|
|
return conn
|
|
|
|
|
|
def cmd_check(args):
|
|
"""Test LDAP connectivity and base DN search."""
|
|
try:
|
|
conn = make_connection(args)
|
|
print(f"Connected to: {args.host}:{args.port}")
|
|
print(f"Bind DN: {DEFAULT_BIND_DN}")
|
|
|
|
conn.search(DEFAULT_BASE_DN, "(objectClass=*)", search_scope=SUBTREE, attributes=["dn"])
|
|
print(f"Base DN search ({DEFAULT_BASE_DN}): {len(conn.entries)} entries found")
|
|
|
|
for entry in conn.entries:
|
|
print(f" {entry.entry_dn}")
|
|
|
|
conn.unbind()
|
|
print("\nLDAP server is healthy.")
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
def cmd_bind(args):
|
|
"""Test user authentication via bind.
|
|
|
|
GLAuth DN format: cn=<user>,ou=<PrimaryGroup>,ou=users,dc=zb,dc=local
|
|
Since we don't know the user's primary group upfront, we search for the user first
|
|
to discover the full DN, then rebind with that DN.
|
|
"""
|
|
try:
|
|
# First, use admin to find the user's actual DN
|
|
admin_conn = make_connection(args)
|
|
admin_conn.search(
|
|
DEFAULT_BASE_DN,
|
|
f"(cn={args.user})",
|
|
search_scope=SUBTREE,
|
|
attributes=["cn"],
|
|
)
|
|
|
|
if not admin_conn.entries:
|
|
print(f"User not found: {args.user}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# Find the user entry (skip group entries that also match cn)
|
|
user_dn = None
|
|
for entry in admin_conn.entries:
|
|
dn = entry.entry_dn
|
|
if dn.startswith(f"cn={args.user},"):
|
|
user_dn = dn
|
|
break
|
|
|
|
admin_conn.unbind()
|
|
|
|
if not user_dn:
|
|
print(f"Could not resolve DN for user: {args.user}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
# Now bind with the discovered DN
|
|
server = Server(args.host, port=args.port, get_info=NONE)
|
|
conn = Connection(server, user=user_dn, password=args.password, authentication=SIMPLE, auto_bind=True)
|
|
print(f"Bind successful: {user_dn}")
|
|
conn.unbind()
|
|
except Exception as e:
|
|
print(f"Bind failed for {args.user}: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
def cmd_search(args):
|
|
"""Search with arbitrary LDAP filter."""
|
|
try:
|
|
conn = make_connection(args)
|
|
base = args.base or DEFAULT_BASE_DN
|
|
conn.search(base, args.filter, search_scope=SUBTREE, attributes=["*"])
|
|
|
|
print(f"Filter: {args.filter}")
|
|
print(f"Base: {base}")
|
|
print(f"Results: {len(conn.entries)}\n")
|
|
|
|
for entry in conn.entries:
|
|
print(f"DN: {entry.entry_dn}")
|
|
for attr in entry.entry_attributes:
|
|
print(f" {attr}: {entry[attr].value}")
|
|
print()
|
|
|
|
conn.unbind()
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
def cmd_users(args):
|
|
"""List all users with group memberships."""
|
|
try:
|
|
conn = make_connection(args)
|
|
|
|
# Find all groups to map GIDs and memberships
|
|
conn.search(DEFAULT_BASE_DN, "(objectClass=posixGroup)",
|
|
search_scope=SUBTREE, attributes=["cn", "gidNumber", "memberUid"])
|
|
groups_by_gid = {}
|
|
groups_with_members = {}
|
|
for entry in conn.entries:
|
|
gid = entry["gidNumber"].value if "gidNumber" in entry.entry_attributes else None
|
|
cn = entry["cn"].value
|
|
if gid:
|
|
groups_by_gid[int(gid)] = cn
|
|
member_uids = entry["memberUid"].value if "memberUid" in entry.entry_attributes else []
|
|
if isinstance(member_uids, str):
|
|
member_uids = [member_uids]
|
|
groups_with_members[cn] = member_uids
|
|
|
|
# Find all users
|
|
conn.search(DEFAULT_BASE_DN, "(objectClass=posixAccount)",
|
|
search_scope=SUBTREE, attributes=["cn", "mail", "gidNumber"])
|
|
|
|
print("Users:")
|
|
print(f"{'Username':<20} {'Email':<35} {'Primary Group':<25} {'Additional Groups'}")
|
|
print("-" * 110)
|
|
|
|
for entry in conn.entries:
|
|
cn = entry["cn"].value
|
|
mail = entry["mail"].value if "mail" in entry.entry_attributes else ""
|
|
gid = int(entry["gidNumber"].value) if "gidNumber" in entry.entry_attributes else None
|
|
primary = groups_by_gid.get(gid, str(gid)) if gid else ""
|
|
|
|
# Find additional groups via memberUid
|
|
additional = []
|
|
for group_name, members in groups_with_members.items():
|
|
if cn in members and group_name != primary:
|
|
additional.append(group_name)
|
|
|
|
print(f"{cn:<20} {mail:<35} {primary:<25} {', '.join(additional)}")
|
|
|
|
conn.unbind()
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
def cmd_groups(args):
|
|
"""List all groups with member counts."""
|
|
try:
|
|
conn = make_connection(args)
|
|
conn.search(DEFAULT_BASE_DN, "(objectClass=posixGroup)",
|
|
search_scope=SUBTREE, attributes=["cn", "gidNumber", "memberUid", "description"])
|
|
|
|
print("Groups:")
|
|
print(f"{'Group':<25} {'GID':<10} {'Members'}")
|
|
print("-" * 70)
|
|
|
|
for entry in conn.entries:
|
|
cn = entry["cn"].value
|
|
gid = entry["gidNumber"].value if "gidNumber" in entry.entry_attributes else ""
|
|
members = entry["memberUid"].value if "memberUid" in entry.entry_attributes else []
|
|
if isinstance(members, str):
|
|
members = [members]
|
|
member_list = ", ".join(members) if members else "(none)"
|
|
print(f"{cn:<25} {gid:<10} {member_list}")
|
|
|
|
conn.unbind()
|
|
except Exception as e:
|
|
print(f"Error: {e}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="LDAP client tool for ScadaBridge test infrastructure")
|
|
parser.add_argument("--host", default=DEFAULT_HOST, help=f"LDAP host (default: {DEFAULT_HOST})")
|
|
parser.add_argument("--port", type=int, default=DEFAULT_PORT, help=f"LDAP port (default: {DEFAULT_PORT})")
|
|
|
|
sub = parser.add_subparsers(dest="command", required=True)
|
|
|
|
sub.add_parser("check", help="Test LDAP connectivity")
|
|
|
|
bind_p = sub.add_parser("bind", help="Test user authentication")
|
|
bind_p.add_argument("--user", required=True, help="Username (e.g. designer)")
|
|
bind_p.add_argument("--password", default="password", help="Password (default: password)")
|
|
|
|
search_p = sub.add_parser("search", help="Search with LDAP filter")
|
|
search_p.add_argument("--bind-dn", default=DEFAULT_BIND_DN, help="Bind DN")
|
|
search_p.add_argument("--bind-password", default=DEFAULT_BIND_PASSWORD, help="Bind password")
|
|
search_p.add_argument("--filter", required=True, help="LDAP filter")
|
|
search_p.add_argument("--base", help=f"Search base (default: {DEFAULT_BASE_DN})")
|
|
|
|
users_p = sub.add_parser("users", help="List all users with group memberships")
|
|
users_p.add_argument("--bind-dn", default=DEFAULT_BIND_DN, help="Bind DN")
|
|
users_p.add_argument("--bind-password", default=DEFAULT_BIND_PASSWORD, help="Bind password")
|
|
|
|
groups_p = sub.add_parser("groups", help="List all groups with members")
|
|
groups_p.add_argument("--bind-dn", default=DEFAULT_BIND_DN, help="Bind DN")
|
|
groups_p.add_argument("--bind-password", default=DEFAULT_BIND_PASSWORD, help="Bind password")
|
|
|
|
args = parser.parse_args()
|
|
|
|
commands = {
|
|
"check": cmd_check,
|
|
"bind": cmd_bind,
|
|
"search": cmd_search,
|
|
"users": cmd_users,
|
|
"groups": cmd_groups,
|
|
}
|
|
|
|
commands[args.command](args)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|