feat(jupyterhub): install buunstack package to kernel image

This commit is contained in:
Masaki Yatsu
2025-08-31 20:43:27 +09:00
parent ddf867d1f1
commit 2480ebae82
15 changed files with 1192 additions and 270 deletions

View File

@@ -0,0 +1,810 @@
"""
Secrets management for JupyterHub with Vault backend
"""
import logging
import os
import threading
import warnings
from datetime import datetime, timedelta
from typing import Any, overload
import hvac
import jwt
import requests
# Suppress SSL warnings for self-signed certificates
warnings.filterwarnings("ignore", message="Unverified HTTPS request")
# Set up logging (disabled by default)
logger = logging.getLogger("buunstack")
logger.addHandler(logging.NullHandler()) # Default to no output
class SecretStore:
"""Simple and powerful secrets management for JupyterHub with Vault backend.
SecretStore provides a secure interface for managing secrets in JupyterHub
environments using HashiCorp Vault as the backend storage. It supports
automatic OIDC token refresh via Keycloak integration and provides both
manual and background token management options.
Attributes
----------
auto_token_refresh : bool
Whether automatic token refresh is enabled.
refresh_buffer_seconds : int
Seconds before token expiry to trigger refresh.
background_refresh_interval : int
Seconds between background refresh checks.
username : str or None
JupyterHub username from environment.
vault_addr : str or None
Vault server address from environment.
base_path : str
Base path for user's secrets in Vault.
Examples
--------
>>> secrets = SecretStore()
>>> secrets.put('api-keys', openai='sk-123', github='ghp-456')
'jupyter/users/username/api-keys'
>>> data = secrets.get('api-keys')
>>> print(data['openai'])
'sk-123'
>>> # Or get specific field directly
>>> openai_key = secrets.get('api-keys', field='openai')
>>> print(openai_key)
'sk-123'
"""
def __init__(
self,
auto_token_refresh: bool = True,
refresh_buffer_seconds: int = 300,
background_refresh_interval: int = 1800,
):
"""
Initialize SecretStore with authentication and configuration.
Parameters
----------
auto_token_refresh : bool, optional
Enable automatic token refresh using Keycloak OIDC, by default True.
Requires KEYCLOAK_HOST, KEYCLOAK_REALM, and JUPYTERHUB_OIDC_REFRESH_TOKEN
environment variables.
refresh_buffer_seconds : int, optional
Seconds before token expiry to trigger refresh, by default 300.
Only used when auto_token_refresh is True.
background_refresh_interval : int, optional
Seconds between background refresh checks, by default 1800.
Only used when background refresh is started.
Raises
------
ValueError
If required environment variables are missing:
- JUPYTERHUB_USER: JupyterHub username
- VAULT_ADDR: Vault server address
- JUPYTERHUB_OIDC_ACCESS_TOKEN: Initial access token
- KEYCLOAK_HOST, KEYCLOAK_REALM: Required for auto_token_refresh
ConnectionError
If unable to connect to Vault server or authenticate.
Examples
--------
>>> # Basic usage with auto-refresh
>>> secrets = SecretStore()
>>> # Manual token management
>>> secrets = SecretStore(auto_token_refresh=False)
>>> # Custom timing
>>> secrets = SecretStore(
... refresh_buffer_seconds=600,
... background_refresh_interval=3600
... )
"""
self.auto_token_refresh = auto_token_refresh
self.refresh_buffer_seconds = refresh_buffer_seconds
self.background_refresh_interval = background_refresh_interval
# User and environment info
self.username = os.getenv("JUPYTERHUB_USER")
self.vault_addr = os.getenv("VAULT_ADDR")
# Keycloak configuration (only needed if auto_token_refresh is enabled)
if self.auto_token_refresh:
self.keycloak_host = os.getenv("KEYCLOAK_HOST")
self.keycloak_realm = os.getenv("KEYCLOAK_REALM")
self.keycloak_client_id = os.getenv("KEYCLOAK_CLIENT_ID", "jupyterhub")
self.refresh_token = os.getenv("JUPYTERHUB_OIDC_REFRESH_TOKEN")
# Token management
self.access_token = os.getenv("JUPYTERHUB_OIDC_ACCESS_TOKEN")
self.token_expiry = (
self._get_token_expiry(self.access_token) if self.access_token else None
)
# Initialize Vault client
self.client = hvac.Client(url=self.vault_addr, verify=False)
# Background refresher
self._background_refresher = None
# Authenticate initially
self._authenticate_vault()
# Set base path for user storage
self.base_path = f"jupyter/users/{self.username}"
logger.info(f"SecretStore initialized for user: {self.username}")
logger.info(
f"Auto token refresh: {'enabled' if self.auto_token_refresh else 'disabled'}"
)
if self.auto_token_refresh and self.token_expiry:
logger.info(f"Token expires at: {self.token_expiry}")
def _get_token_expiry(self, token: str) -> datetime | None:
"""Extract expiry time from JWT token"""
if not token:
return None
try:
payload = jwt.decode(token, options={"verify_signature": False})
exp = payload.get("exp")
if exp:
return datetime.fromtimestamp(exp)
# Fallback to iat + 1 hour
iat = payload.get("iat")
if iat:
return datetime.fromtimestamp(iat + 3600)
except Exception as e:
logger.warning(f"Could not decode token expiry: {e}")
return datetime.now() + timedelta(hours=1)
def _is_token_valid(self) -> bool:
"""Check if current token is still valid"""
if not self.auto_token_refresh or not self.token_expiry:
return True # Assume valid if refresh is disabled
time_until_expiry = (self.token_expiry - datetime.now()).total_seconds()
return time_until_expiry > self.refresh_buffer_seconds
def _refresh_keycloak_tokens(self) -> bool:
"""Refresh tokens using Keycloak refresh token"""
if not self.auto_token_refresh:
return False
if not self.refresh_token or not self.keycloak_host or not self.keycloak_realm:
logger.error("Missing refresh token or Keycloak configuration")
return False
token_url = f"https://{self.keycloak_host}/realms/{self.keycloak_realm}/protocol/openid-connect/token"
try:
logger.info("Refreshing tokens from Keycloak...")
response = requests.post(
token_url,
data={
"grant_type": "refresh_token",
"refresh_token": self.refresh_token,
"client_id": self.keycloak_client_id,
},
verify=False,
)
if response.status_code == 200:
tokens = response.json()
# Update tokens
self.access_token = tokens["access_token"]
if "refresh_token" in tokens:
self.refresh_token = tokens["refresh_token"]
# Update environment variables
os.environ["JUPYTERHUB_OIDC_ACCESS_TOKEN"] = self.access_token
if "refresh_token" in tokens:
os.environ["JUPYTERHUB_OIDC_REFRESH_TOKEN"] = self.refresh_token
# Update token expiry
self.token_expiry = self._get_token_expiry(self.access_token)
logger.info("✅ Tokens refreshed successfully")
return True
else:
logger.error(
f"Token refresh failed: {response.status_code} - {response.text}"
)
return False
except Exception as e:
logger.error(f"Exception during token refresh: {e}")
return False
def _authenticate_vault(self):
"""Authenticate with Vault using current access token"""
if not self.access_token:
raise ValueError("No access token available")
try:
self.client.auth.jwt.jwt_login(
role="jupyter-token", jwt=self.access_token, path="jwt"
)
logger.info("✅ Authenticated with Vault successfully")
except Exception as e:
logger.error(f"Vault authentication failed: {e}")
raise
def _ensure_authenticated(self):
"""Ensure we have valid tokens and Vault authentication"""
if self.auto_token_refresh and not self._is_token_valid():
logger.info("Token invalid or expiring soon")
if self._refresh_keycloak_tokens():
self._authenticate_vault()
else:
raise Exception(
"Failed to refresh tokens. Manual re-authentication required."
)
def put(self, key: str, **kwargs: Any) -> str:
"""
Store data in your personal secret storage.
Saves the provided key-value pairs to Vault under the specified key.
Values must be strings. For complex data types, encode them as JSON strings.
Parameters
----------
key : Any
The key/name for the secret. Must be a valid Vault path component.
**kwargs : str
Key-value pairs to store as the secret data. All values must be strings.
For complex types, encode them as JSON strings first.
Returns
-------
str
Full Vault path where the secret was stored.
Raises
------
ValueError
If key is empty or contains invalid characters, if no kwargs provided,
or if any value is not a string.
ConnectionError
If unable to connect to Vault server.
hvac.exceptions.Forbidden
If authentication fails or insufficient permissions.
hvac.exceptions.InvalidRequest
If the data format is invalid.
Examples
--------
>>> import json
>>> secrets = SecretStore()
>>> path = secrets.put('api-keys', openai='sk-123', github='ghp-456')
>>> print(path)
'jupyter/users/username/api-keys'
>>> # Store complex data as JSON strings
>>> config_data = {'debug': True, 'max_workers': 4}
>>> secrets.put('config',
... settings=json.dumps(config_data),
... endpoints=json.dumps(['api.example.com']))
"""
if not kwargs:
raise ValueError("At least one key-value pair must be provided")
# Validate all values are strings
for field_name, value in kwargs.items():
if not isinstance(value, str):
raise ValueError(
f"Value for '{field_name}' must be a string. "
f"Got {type(value).__name__}. "
"For complex types, encode as JSON string first."
)
self._ensure_authenticated()
path = f"{self.base_path}/{key}"
try:
self.client.secrets.kv.v2.create_or_update_secret(
path=path, secret=kwargs, mount_point="secret"
)
logger.info(f"Put secret: {key}")
return path
except Exception as e:
logger.error(f"Failed to put secret: {e}")
# Retry once with re-authentication
self._ensure_authenticated()
self.client.secrets.kv.v2.create_or_update_secret(
path=path, secret=kwargs, mount_point="secret"
)
return path
@overload
def get(self, key: str, field: None = None) -> dict[str, Any] | None: ...
@overload
def get(self, key: str, field: str) -> str | None: ...
def get(self, key: str, field: str | None = None) -> dict[str, Any] | str | None:
"""
Retrieve data from your personal secret storage.
Loads the data dictionary stored under the specified key from Vault.
If field is specified, returns only that field's value. Returns None
if the key doesn't exist or if there's an access error.
Parameters
----------
key : str
The key/name of the secret to retrieve.
field : str, optional
Specific field to retrieve from the secret. If provided, returns
only the value of this field instead of the entire secret dict.
Returns
-------
dict[str, Any] or str or None
- If field is None: The complete stored data dictionary if found, None otherwise.
- If field is specified: The value of the specified field, or None if
field doesn't exist or secret is not found.
Raises
------
ConnectionError
If unable to connect to Vault server.
hvac.exceptions.InvalidRequest
If the key format is invalid.
Examples
--------
>>> secrets = SecretStore()
>>> # Get entire secret
>>> api_keys = secrets.get('api-keys')
>>> if api_keys:
... openai_key = api_keys['openai']
... print(f'OpenAI key: {openai_key}')
>>> # Get specific field (like vault kv get -field=...)
>>> openai_key = secrets.get('api-keys', field='openai')
>>> print(f'OpenAI key: {openai_key}')
>>> # Handle missing keys or fields
>>> config = secrets.get('nonexistent-key')
>>> if config is None:
... print('Key not found')
>>> missing_field = secrets.get('api-keys', field='nonexistent')
>>> if missing_field is None:
... print('Field not found')
"""
self._ensure_authenticated()
path = f"{self.base_path}/{key}"
try:
response = self.client.secrets.kv.v2.read_secret_version(
path=path, mount_point="secret", raise_on_deleted_version=False
)
if response and "data" in response and "data" in response["data"]:
data = response["data"]["data"]
logger.info(f"Got secret: {key}")
# Return specific field if requested
if field is not None:
return data.get(field)
return data
return None
except Exception as e:
if "permission denied" in str(e).lower():
logger.info("Permission denied, re-authenticating...")
self._ensure_authenticated()
response = self.client.secrets.kv.v2.read_secret_version(
path=path, mount_point="secret", raise_on_deleted_version=False
)
if response and "data" in response and "data" in response["data"]:
data = response["data"]["data"]
if field is not None:
return data.get(field)
return data
logger.warning(f'Could not get secret "{key}": {e}')
return None
def delete(self, key: str) -> None:
"""
Delete a secret from your personal storage.
Permanently removes the secret and all its versions from Vault.
This operation cannot be undone.
Parameters
----------
key : str
The key/name of the secret to delete.
Raises
------
ConnectionError
If unable to connect to Vault server.
hvac.exceptions.Forbidden
If authentication fails or insufficient permissions.
hvac.exceptions.InvalidRequest
If the key format is invalid.
Examples
--------
>>> secrets = SecretStore()
>>> secrets.delete('old-api-key')
>>> # Secret is permanently removed
"""
self._ensure_authenticated()
path = f"{self.base_path}/{key}"
try:
self.client.secrets.kv.v2.delete_metadata_and_all_versions(
path=path, mount_point="secret"
)
logger.info(f"Deleted secret: {key}")
except Exception as e:
logger.error(f'Failed to delete secret "{key}": {e}')
raise
def list(self) -> list[str]:
"""
List all secret keys in your personal storage.
Returns a list of all secret keys that you have stored in Vault.
Does not include the actual secret values for security reasons.
Returns
-------
list[str]
List of secret keys. Empty list if no secrets found or on error.
Examples
--------
>>> secrets = SecretStore()
>>> keys = secrets.list()
>>> print(f'You have {len(keys)} secrets: {keys}')
['api-keys', 'database-config', 'certificates']
"""
self._ensure_authenticated()
try:
response = self.client.secrets.kv.v2.list_secrets(
path=self.base_path, mount_point="secret"
)
keys = response["data"]["keys"] if response else []
logger.info(f"Listed {len(keys)} secrets")
return keys
except Exception as e:
logger.warning(f"Could not list secrets: {e}")
return []
def get_status(self) -> dict[str, Any]:
"""
Get comprehensive status information about the SecretStore instance.
Returns detailed information about configuration, authentication status,
token validity, and background refresh status.
Returns
-------
dict[str, Any]
Status dictionary containing:
- username: JupyterHub username
- auto_token_refresh: Whether auto-refresh is enabled
- has_access_token: Whether access token is available
- vault_addr: Vault server address
- has_refresh_token: Whether refresh token is available (if auto_token_refresh=True)
- keycloak_configured: Whether Keycloak settings are configured (if auto_token_refresh=True)
- token_expires_at: Token expiration time (if available)
- token_expires_in_seconds: Seconds until token expires (if available)
- background_refresher_running: Whether background refresher is active
Examples
--------
>>> secrets = SecretStore()
>>> status = secrets.get_status()
>>> print(f"User: {status['username']}")
>>> print(f"Token expires in: {status.get('token_expires_in_seconds', 'N/A')} seconds")
"""
status = {
"username": self.username,
"auto_token_refresh": self.auto_token_refresh,
"has_access_token": bool(self.access_token),
"vault_addr": self.vault_addr,
}
if self.auto_token_refresh:
status.update(
{
"has_refresh_token": bool(self.refresh_token),
"keycloak_configured": bool(
self.keycloak_host and self.keycloak_realm
),
}
)
if self.token_expiry:
time_remaining = (self.token_expiry - datetime.now()).total_seconds()
status.update(
{
"token_valid": self._is_token_valid(),
"token_expiry": self.token_expiry.isoformat(),
"seconds_remaining": max(0, time_remaining),
"minutes_remaining": max(0, time_remaining / 60),
}
)
return status
def start_background_refresh(self) -> "BackgroundRefresher":
"""
Start automatic background token refreshing.
Begins a background thread that periodically checks and refreshes
the access token before it expires. Only available when
auto_token_refresh is enabled.
Returns
-------
BackgroundRefresher
The background refresher instance that can be used to monitor
or control the refresh process.
Raises
------
ValueError
If auto_token_refresh is False. Background refresh requires
automatic token refresh to be enabled.
Examples
--------
>>> secrets = SecretStore(auto_token_refresh=True)
>>> refresher = secrets.start_background_refresh()
>>> status = refresher.get_status()
>>> print(f"Background refresh running: {status['running']}")
"""
if not self.auto_token_refresh:
raise ValueError("Background refresh requires auto_token_refresh=True")
if self._background_refresher is None:
self._background_refresher = BackgroundRefresher(
self, interval_seconds=self.background_refresh_interval
)
self._background_refresher.start()
return self._background_refresher
def stop_background_refresh(self) -> None:
"""
Stop the background token refresher.
Stops the background thread that was refreshing tokens automatically.
It's safe to call this method even if no background refresher is running.
Examples
--------
>>> secrets = SecretStore()
>>> refresher = secrets.start_background_refresh()
>>> # ... do some work ...
>>> secrets.stop_background_refresh()
"""
if self._background_refresher:
self._background_refresher.stop()
class BackgroundRefresher:
"""
Background token refresher for automatic token management.
This class runs in a separate daemon thread and periodically checks if
the access token needs to be refreshed, automatically handling the refresh
process to maintain uninterrupted access to Vault.
Attributes
----------
secret_store : SecretStore
The SecretStore instance to refresh tokens for.
interval_seconds : int
Seconds between refresh checks.
refresh_count : int
Number of successful refreshes performed.
last_refresh : datetime or None
Timestamp of the last successful refresh.
Examples
--------
>>> secrets = SecretStore(auto_token_refresh=True)
>>> refresher = secrets.start_background_refresh()
>>> # Refresher runs automatically in background
>>> status = refresher.get_status()
>>> print(f"Refreshes performed: {status['refresh_count']}")
"""
def __init__(self, secret_store: SecretStore, interval_seconds: int = 1800):
"""
Initialize the background refresher.
Parameters
----------
secret_store : SecretStore
The SecretStore instance to manage tokens for.
interval_seconds : int, optional
Seconds between refresh checks, by default 1800 (30 minutes).
"""
self.secret_store = secret_store
self.interval_seconds = interval_seconds
self._stop_event = threading.Event()
self._thread = None
self.refresh_count = 0
self.last_refresh = None
def start(self) -> None:
"""
Start the background refresh thread.
Creates and starts a daemon thread that will periodically check
and refresh tokens. Safe to call multiple times.
"""
if self._thread is None or not self._thread.is_alive():
self._stop_event.clear()
self._thread = threading.Thread(target=self._refresh_loop, daemon=True)
self._thread.start()
logger.info(
f"Started background refresher (interval: {self.interval_seconds}s)"
)
def stop(self) -> None:
"""
Stop the background refresh thread.
Signals the refresh thread to stop and waits up to 5 seconds
for it to finish gracefully.
"""
if self._thread and self._thread.is_alive():
self._stop_event.set()
self._thread.join(timeout=5)
logger.info("Stopped background refresher")
def _refresh_loop(self):
while not self._stop_event.is_set():
if self._stop_event.wait(self.interval_seconds):
break
try:
if self.secret_store._refresh_keycloak_tokens():
self.secret_store._authenticate_vault()
self.refresh_count += 1
self.last_refresh = datetime.now()
logger.info(
f"✅ Background refresh #{self.refresh_count} successful"
)
else:
logger.error("❌ Background refresh failed")
except Exception as e:
logger.error(f"Exception in background refresh: {e}")
def get_status(self) -> dict[str, Any]:
"""
Get the current status of the background refresher.
Returns
-------
dict[str, Any]
Status dictionary containing:
- running: Whether the refresh thread is active
- refresh_count: Number of successful refreshes performed
- last_refresh: ISO timestamp of last successful refresh (or None)
- interval_seconds: Configured refresh interval
Examples
--------
>>> refresher = secrets.start_background_refresh()
>>> status = refresher.get_status()
>>> print(f"Running: {status['running']}, Count: {status['refresh_count']}")
"""
return {
"running": self._thread and self._thread.is_alive(),
"refresh_count": self.refresh_count,
"last_refresh": self.last_refresh.isoformat()
if self.last_refresh
else None,
"interval_seconds": self.interval_seconds,
}
# Utility functions
def get_env_from_secrets(secrets: SecretStore, key: str = "environment") -> list[str]:
"""
Load environment variables from SecretStore into os.environ.
Retrieves stored environment variables and sets them in the current
process's environment. This is useful for loading configuration that
was previously stored securely.
Parameters
----------
secrets : SecretStore
The SecretStore instance to load from.
key : str, optional
The key where environment variables are stored, by default "environment".
Returns
-------
list[str]
List of environment variable names that were loaded and set.
Empty list if the key doesn't exist or contains no data.
Examples
--------
>>> secrets = SecretStore()
>>> # First, store some environment variables
>>> put_env_to_secrets(secrets, {'DEBUG': 'true', 'PORT': '8080'})
>>> # Later, load them back
>>> loaded = get_env_from_secrets(secrets)
>>> print(f'Loaded {len(loaded)} variables: {loaded}')
['DEBUG', 'PORT']
>>> print(os.environ['DEBUG']) # Now available
'true'
"""
env_vars = secrets.get(key)
if env_vars:
for name, value in env_vars.items():
os.environ[name] = str(value)
logger.info(f"Set environment variable: {name}")
return list(env_vars.keys())
return []
def put_env_to_secrets(
secrets: SecretStore, env_dict: dict, key: str = "environment"
) -> str:
"""
Store environment variables in SecretStore.
Saves a dictionary of environment variables to secure storage.
This is useful for persisting configuration across sessions.
Parameters
----------
secrets : SecretStore
The SecretStore instance to save to.
env_dict : dict[str, Any]
Dictionary of environment variables to store. Keys should be
environment variable names, values will be converted to strings.
key : str, optional
The key to store environment variables under, by default "environment".
Returns
-------
str
Full Vault path where the environment variables were stored.
Examples
--------
>>> secrets = SecretStore()
>>> env_vars = {
... 'DATABASE_URL': 'postgresql://localhost:5432/mydb',
... 'DEBUG': 'false',
... 'MAX_WORKERS': '4'
... }
>>> path = put_env_to_secrets(secrets, env_vars)
>>> print(f'Stored at: {path}')
'jupyter/users/username/environment'
>>> # Store with custom key
>>> put_env_to_secrets(secrets, {'API_KEY': 'secret'}, 'production-config')
"""
# Convert all values to strings and use **kwargs for put()
string_env_dict = {k: str(v) for k, v in env_dict.items()}
path = secrets.put(key, **string_env_dict)
logger.info(f"Put {len(env_dict)} environment variables")
return path