# ConfigMap that ships a Keycloak OIDC auth backend module (keycloak_auth.py)
# for Querybook. The Python source lives in the block scalar under `data`.
apiVersion: v1
kind: ConfigMap
metadata:
  name: querybook-keycloak-auth
  # Placeholder substituted from the QUERYBOOK_NAMESPACE environment variable
  # by the templating step; the manifest must be rendered before it is applied.
  namespace: {{ .Env.QUERYBOOK_NAMESPACE }}
data:
  keycloak_auth.py: |
    """
    Keycloak OIDC authentication backend for Querybook
    """
    from app.auth.oauth_auth import OAuthLoginManager, OAUTH_CALLBACK_PATH
    from env import QuerybookSettings
    from lib.logger import get_logger
    from logic.user import get_user_by_name, create_user

    LOG = get_logger(__file__)


    class KeycloakLoginManager(OAuthLoginManager):
        """OAuth login manager that reads the user profile from Keycloak.

        NOTE(review): ``_current_user_groups`` is instance state, and this
        module exposes a single shared ``login_manager`` instance. If two
        logins are processed concurrently, the groups read in ``login_user``
        may belong to the other request. Today the value is only logged; do
        not base authorization decisions on it without moving it to
        request-scoped storage first.
        """

        def __init__(self):
            super().__init__()
            # Groups of the most recently parsed profile (see class NOTE).
            self._current_user_groups = []

        @property
        def oauth_config(self):
            """Return the OAuth client settings, read from QuerybookSettings."""
            return {
                "callback_url": "{}{}".format(
                    QuerybookSettings.PUBLIC_URL, OAUTH_CALLBACK_PATH
                ),
                "client_id": QuerybookSettings.OAUTH_CLIENT_ID,
                "client_secret": QuerybookSettings.OAUTH_CLIENT_SECRET,
                "authorization_url": QuerybookSettings.OAUTH_AUTHORIZATION_URL,
                "token_url": QuerybookSettings.OAUTH_TOKEN_URL,
                "profile_url": QuerybookSettings.OAUTH_USER_PROFILE,
                "scope": ["openid", "email", "profile"],
            }

        def _parse_user_profile(self, resp):
            """Parse standard OIDC UserInfo response from Keycloak.

            Args:
                resp: Response object whose ``json()`` returns the UserInfo
                    claims as a dict.

            Returns:
                A ``(username, email)`` tuple. ``username`` is the
                ``preferred_username`` claim, falling back to the local part
                of ``email``; ``email`` is ``""`` when the claim is missing
                or null.
            """
            user = resp.json()

            # Use `or` rather than a .get() default: a default only covers a
            # missing key, while an explicit JSON null would come back as
            # None and crash on .split() below.
            email = user.get("email") or ""
            username = user.get("preferred_username") or email.split("@")[0]

            # Store groups for role synchronization; a null claim is treated
            # the same as an absent one.
            self._current_user_groups = user.get("groups") or []
            LOG.info(f"User {username} groups: {self._current_user_groups}")

            return username, email

        def login_user(self, username, email, session=None):
            """Look up the user by name, creating the account on first login.

            This mirrors the default Querybook behavior and only adds a log
            line with the Keycloak groups.

            Note: Querybook automatically makes the first user an admin via
            the create_admin_when_no_admin() function. Additional users can
            be granted admin access through Querybook's UI or database.

            Args:
                username: Name used to look up or create the user.
                email: Email stored on a newly created user.
                session: Database session forwarded to the user helpers.

            Returns:
                The existing or newly created user object.

            Raises:
                AuthenticationError: If ``username`` is empty or not a string.
            """
            # Absolute import, matching the module-level `app.auth...` import:
            # a relative `.utils` only resolves when this file happens to be
            # mounted inside the `app.auth` package.
            from app.auth.utils import AuthenticationError

            if not username or not isinstance(username, str):
                raise AuthenticationError("Please provide a valid username")

            user = get_user_by_name(username, session=session)
            if not user:
                user = create_user(
                    username=username,
                    fullname=username,
                    email=email,
                    session=session,
                )

            # Log group membership for debugging (see class NOTE about the
            # shared instance state this reads).
            LOG.info(f"User {username} Keycloak groups: {self._current_user_groups}")

            return user


    # Module-level hooks; presumably the names Querybook's auth-backend loader
    # looks up on this module (verify against the Querybook version in use).
    login_manager = KeycloakLoginManager()

    # Paths listed here are presumably exempt from the login requirement so
    # the OAuth callback can complete (confirm against Querybook).
    ignore_paths = [OAUTH_CALLBACK_PATH]


    def init_app(app):
        """Register the Keycloak login manager on the given app."""
        login_manager.init_app(app)


    def login(request):
        """Delegate handling of ``request`` to the Keycloak login manager."""
        return login_manager.login(request)