85 lines
2.9 KiB
YAML
85 lines
2.9 KiB
YAML
apiVersion: v1
|
|
kind: ConfigMap
|
|
metadata:
|
|
name: querybook-keycloak-auth
|
|
namespace: {{ .Env.QUERYBOOK_NAMESPACE }}
|
|
data:
|
|
keycloak_auth.py: |
|
|
"""
|
|
Keycloak OIDC authentication backend for Querybook
|
|
"""
|
|
from app.auth.oauth_auth import OAuthLoginManager, OAUTH_CALLBACK_PATH
|
|
from env import QuerybookSettings
|
|
from lib.logger import get_logger
|
|
from logic.user import get_user_by_name, create_user
|
|
|
|
# Module-level logger, obtained from lib.logger and keyed by this file's path.
LOG = get_logger(__file__)
|
|
|
|
|
|
class KeycloakLoginManager(OAuthLoginManager):
    """OAuth login manager that authenticates Querybook users via Keycloak.

    Builds the OAuth configuration from ``QuerybookSettings``, parses the
    OIDC UserInfo response into a ``(username, email)`` pair, and looks up
    or creates the matching Querybook user.
    """

    def __init__(self):
        super().__init__()
        # Groups from the most recently parsed profile; only used for logging.
        # NOTE(review): this is per-request data stored on the instance, and
        # the module creates one shared instance, so overlapping logins can
        # overwrite each other's value. Do not use it for authorization
        # decisions without moving it to request-scoped storage.
        self._current_user_groups = []

    @property
    def oauth_config(self):
        """Return the OAuth client/endpoint settings read from QuerybookSettings."""
        return {
            "callback_url": "{}{}".format(
                QuerybookSettings.PUBLIC_URL, OAUTH_CALLBACK_PATH
            ),
            "client_id": QuerybookSettings.OAUTH_CLIENT_ID,
            "client_secret": QuerybookSettings.OAUTH_CLIENT_SECRET,
            "authorization_url": QuerybookSettings.OAUTH_AUTHORIZATION_URL,
            "token_url": QuerybookSettings.OAUTH_TOKEN_URL,
            "profile_url": QuerybookSettings.OAUTH_USER_PROFILE,
            "scope": ["openid", "email", "profile"],
        }

    def _parse_user_profile(self, resp):
        """Parse standard OIDC UserInfo response from Keycloak.

        Args:
            resp: Response object whose ``json()`` returns the UserInfo claims.

        Returns:
            A ``(username, email)`` tuple. ``username`` is the
            ``preferred_username`` claim, falling back to the local part of
            the email address. Either value is ``""`` when the corresponding
            claims are missing or null.
        """
        profile = resp.json()

        # A claim can be absent *or* explicitly null; ``.get(key, default)``
        # only covers the former, so normalise with ``or`` to avoid calling
        # ``.split`` on ``None``.
        email = profile.get("email") or ""
        username = profile.get("preferred_username") or email.split("@")[0]

        # Remember the groups so login_user() can log them.
        self._current_user_groups = profile.get("groups") or []
        LOG.info(f"User {username} groups: {self._current_user_groups}")

        return username, email

    def login_user(self, username, email, session=None):
        """Override login_user - using default Querybook behavior

        Looks the user up by name and creates it when it does not exist yet.

        Note: Querybook automatically makes the first user an admin via
        create_admin_when_no_admin() function. Additional users can be
        granted admin access through Querybook's UI or database.

        Args:
            username: Name of the user logging in; must be a non-empty string.
            email: Email address stored on a newly created user.
            session: Optional database session forwarded to the user helpers.

        Returns:
            The existing or newly created user object.

        Raises:
            AuthenticationError: If ``username`` is empty or not a string.
        """
        from .utils import AuthenticationError

        if not username or not isinstance(username, str):
            raise AuthenticationError("Please provide a valid username")

        user = get_user_by_name(username, session=session)
        if not user:
            user = create_user(
                username=username, fullname=username, email=email, session=session
            )

        # Log group membership for debugging
        LOG.info(f"User {username} Keycloak groups: {self._current_user_groups}")

        return user
|
|
|
|
|
|
# Single shared login manager instance, used by init_app() and login() below.
login_manager = KeycloakLoginManager()
|
|
|
|
# NOTE(review): presumably the paths exempt from the login requirement (the
# OAuth callback has to be reachable before a session exists) -- confirm
# against the code that loads this auth module.
ignore_paths = [OAUTH_CALLBACK_PATH]
|
|
|
|
|
|
def init_app(app):
    """Attach the module-level Keycloak login manager to ``app``."""
    manager = login_manager
    manager.init_app(app)
|
|
|
|
|
|
def login(request):
    """Hand ``request`` to the Keycloak login manager and return its result."""
    outcome = login_manager.login(request)
    return outcome
|