Files
buun-stack/python-package/buunstack/secrets.py
2025-09-03 10:11:06 +09:00

556 lines
19 KiB
Python

"""
Secrets management with user-specific Vault token authentication
"""
import logging
import os
import warnings
from typing import Any, overload
import hvac
# Suppress SSL warnings for self-signed certificates: ignore every warning
# whose message starts with "Unverified HTTPS request".
warnings.filterwarnings("ignore", message="Unverified HTTPS request")

logger = logging.getLogger("buunstack")

# Resolve the log level from BUUNSTACK_LOG_LEVEL (case-insensitive, default
# "warning"). getattr() returns *any* matching attribute of the logging module
# (for example the BASIC_FORMAT string), and passing such a value to
# setLevel() would raise at import time, so everything that is not an integer
# level falls back to WARNING.
log_level_str = os.getenv("BUUNSTACK_LOG_LEVEL", "warning").strip().upper()
log_level = getattr(logging, log_level_str, logging.WARNING)
if not isinstance(log_level, int):
    log_level = logging.WARNING
logger.setLevel(log_level)

# For Jupyter notebooks, we need to ensure proper logging configuration.
# A handler is added only when the logger has none yet, so importing (or
# reloading) this module repeatedly does not stack duplicate handlers.
if not logger.handlers:
    handler = logging.StreamHandler()
    handler.setLevel(log_level)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
    )
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    # Disable propagation to avoid root logger interference in notebooks
    logger.propagate = False

    # Debug: report the handler addition.
    # NOTE(review): the nesting of the root-logger setLevel() call below was
    # not recoverable from the flattened source; it is kept inside the debug
    # branch so that non-debug imports never touch the root logger. Confirm
    # against the upstream file.
    if log_level <= logging.DEBUG:
        print(f"DEBUG: Added StreamHandler to buunstack logger (level={log_level})")
        logging.getLogger().setLevel(log_level)

# Additional debug information for troubleshooting
if log_level <= logging.DEBUG:
    print(
        f"DEBUG: buunstack logger initialized - level={logger.level}, handlers={len(logger.handlers)}"
    )
class SecretStore:
"""
Secure secrets management with JupyterHub API authentication.
Uses JupyterHub's vault-token API endpoint to obtain Vault tokens
by exchanging auth_state JWT. Implements singleton pattern for
consistent state across imports.
Examples
--------
>>> secrets = SecretStore()
>>> secrets.put('api-keys', openai='sk-123', github='ghp-456')
>>> openai_key = secrets.get('api-keys', field='openai')
>>> print(openai_key)
'sk-123'
"""
_instance = None
_initialized = False
def __new__(cls, *args, **kwargs):
"""Return singleton SecretStore instance."""
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self):
"""
Initialize SecretStore with JupyterHub API authentication.
Uses JupyterHub's vault-token API endpoint to exchange
auth_state JWT for Vault tokens.
"""
if self._initialized:
return
self.username = os.getenv("JUPYTERHUB_USER")
self.vault_addr = os.getenv("VAULT_ADDR")
self.base_path = f"jupyter/users/{self.username}"
# Using pre-acquired Vault token from notebook spawn
# Initialize Vault client
self.client = hvac.Client(url=self.vault_addr, verify=False)
# Attempt authentication
self._authenticate_vault()
logger.info(f"SecretStore initialized for user: {self.username}")
logger.info("Using user-specific Vault token authentication")
self._initialized = True
def _authenticate_vault(self):
"""
Authenticate with Vault using user-specific token from notebook spawn.
Raises
------
Exception
If user-specific Vault token is not available.
"""
vault_token = os.getenv("NOTEBOOK_VAULT_TOKEN")
if not vault_token:
raise Exception(
"No user-specific Vault token available. "
"Please restart your notebook server."
)
self.client.token = vault_token
logger.info("✅ Using user-specific Vault token from notebook spawn")
def _ensure_authenticated(self):
"""
Ensure we have valid Vault authentication with token renewal.
"""
try:
if self.client.is_authenticated():
# Check if token needs renewal (if renewable and close to expiry)
try:
token_info = self.client.auth.token.lookup_self()
ttl = token_info.get("data", {}).get("ttl", 0)
renewable = token_info.get("data", {}).get("renewable", False)
# Renew if TTL < 10 minutes and renewable
if renewable and ttl > 0 and ttl < 600:
logger.info(f"Renewing Vault token (TTL: {ttl}s)")
self.client.auth.token.renew_self()
logger.info("✅ Vault token renewed successfully")
except Exception as e:
logger.warning(f"Token renewal check failed: {e}")
return
except Exception:
pass
# Token expired or invalid - no fallback available with user-specific tokens
raise Exception(
"User-specific Vault token expired and cannot be refreshed. Please restart your notebook server."
)
def put(self, key: str, **kwargs: Any) -> None:
"""
Store data in your personal secret storage.
Saves the provided key-value pairs to Vault under the specified key.
Values must be strings. For complex data types, encode them as JSON strings.
Parameters
----------
key : Any
The key/name for the secret. Must be a valid Vault path component.
**kwargs : str
Key-value pairs to store as the secret data. All values must be strings.
For complex types, encode them as JSON strings first.
Raises
------
ValueError
If key is empty or contains invalid characters, if no kwargs provided,
or if any value is not a string.
ConnectionError
If unable to connect to Vault server.
hvac.exceptions.Forbidden
If authentication fails or insufficient permissions.
hvac.exceptions.InvalidRequest
If the data format is invalid.
Examples
--------
>>> import json
>>> secrets = SecretStore()
>>> secrets.put('api-keys', openai='sk-123', github='ghp-456')
>>> # Store complex data as JSON strings
>>> config_data = {'debug': True, 'max_workers': 4}
>>> secrets.put('config',
... settings=json.dumps(config_data),
... endpoints=json.dumps(['api.example.com']))
"""
if not kwargs:
raise ValueError("At least one key-value pair must be provided")
# Validate all values are strings
for field_name, value in kwargs.items():
if not isinstance(value, str):
raise ValueError(
f"Value for '{field_name}' must be a string. "
f"Got {type(value).__name__}. "
"For complex types, encode as JSON string first."
)
self._ensure_authenticated()
path = f"{self.base_path}/{key}"
try:
self.client.secrets.kv.v2.create_or_update_secret(
path=path, secret=kwargs, mount_point="secret"
)
logger.info(f"Put secret: {key}")
except Exception as e:
logger.error(f"Failed to put secret: {e}")
# Retry once with re-authentication
self._ensure_authenticated()
self.client.secrets.kv.v2.create_or_update_secret(
path=path, secret=kwargs, mount_point="secret"
)
@overload
def get(self, key: str, field: None = None) -> dict[str, Any]: ...
@overload
def get(self, key: str, field: str) -> str: ...
def get(self, key: str, field: str | None = None) -> dict[str, Any] | str:
"""
Retrieve data from your personal secret storage.
Loads the data dictionary stored under the specified key from Vault.
If field is specified, returns only that field's value. Raises KeyError
if the key doesn't exist or if the specified field is not found.
Parameters
----------
key : str
The key/name of the secret to retrieve.
field : str, optional
Specific field to retrieve from the secret. If provided, returns
only the value of this field instead of the entire secret dict.
Returns
-------
dict[str, Any] or str
- If field is None: The complete stored data dictionary.
- If field is specified: The value of the specified field.
Raises
------
KeyError
If the key doesn't exist or if the specified field is not found.
ConnectionError
If unable to connect to Vault server.
hvac.exceptions.InvalidRequest
If the key format is invalid.
Examples
--------
>>> secrets = SecretStore()
>>> # Get entire secret
>>> api_keys = secrets.get('api-keys')
>>> if api_keys:
... openai_key = api_keys['openai']
... print(f'OpenAI key: {openai_key}')
>>> # Get specific field (like vault kv get -field=...)
>>> openai_key = secrets.get('api-keys', field='openai')
>>> print(f'OpenAI key: {openai_key}')
>>> # Handle missing keys or fields
>>> try:
... config = secrets.get('nonexistent-key')
... except KeyError:
... print('Key not found')
>>> try:
... missing_field = secrets.get('api-keys', field='nonexistent')
... except KeyError:
... print('Field not found')
"""
self._ensure_authenticated()
path = f"{self.base_path}/{key}"
try:
response = self.client.secrets.kv.v2.read_secret_version(
path=path, mount_point="secret", raise_on_deleted_version=False
)
if response and "data" in response and "data" in response["data"]:
data = response["data"]["data"]
logger.info(f"Got secret: {key}")
# Return specific field if requested
if field is not None:
if field not in data:
raise KeyError(f"Field '{field}' not found in secret '{key}'")
return data[field]
return data
else:
raise KeyError(f"Secret '{key}' not found")
except Exception as e:
if "permission denied" in str(e).lower():
logger.info("Permission denied, re-authenticating...")
self._ensure_authenticated()
response = self.client.secrets.kv.v2.read_secret_version(
path=path, mount_point="secret", raise_on_deleted_version=False
)
if response and "data" in response and "data" in response["data"]:
data = response["data"]["data"]
if field is not None:
if field not in data:
raise KeyError(
f"Field '{field}' not found in secret '{key}'"
)
return data[field]
return data
else:
raise KeyError(f"Secret '{key}' not found")
logger.warning(f'Could not get secret "{key}": {e}')
raise KeyError(f"Secret '{key}' not found") from e
def delete(self, key: str, field: str | None = None) -> None:
"""
Delete a secret or a specific field from your personal storage.
If field is None, permanently removes the entire secret and all its versions.
If field is specified, removes only that field from the secret.
Parameters
----------
key : str
The key/name of the secret to delete or modify.
field : str, optional
Specific field to delete from the secret. If None, deletes entire secret.
Raises
------
KeyError
If the key or field doesn't exist.
ConnectionError
If unable to connect to Vault server.
hvac.exceptions.Forbidden
If authentication fails or insufficient permissions.
hvac.exceptions.InvalidRequest
If the key format is invalid.
Examples
--------
>>> secrets = SecretStore()
>>> # Delete entire secret
>>> secrets.delete('old-api-key')
>>>
>>> # Delete only specific field
>>> secrets.put('credentials', github='token123', aws='secret456')
>>> secrets.delete('credentials', field='github')
>>> # Now only 'aws' field remains
"""
self._ensure_authenticated()
path = f"{self.base_path}/{key}"
if field is None:
# Delete entire secret
try:
self.client.secrets.kv.v2.delete_metadata_and_all_versions(
path=path, mount_point="secret"
)
logger.info(f"Deleted secret: {key}")
except Exception as e:
logger.error(f'Failed to delete secret "{key}": {e}')
raise
else:
# Delete specific field only
try:
# First, get the current secret
response = self.client.secrets.kv.v2.read_secret_version(
path=path, mount_point="secret", raise_on_deleted_version=False
)
if response and "data" in response and "data" in response["data"]:
data = response["data"]["data"]
# Check if field exists
if field not in data:
raise KeyError(f"Field '{field}' not found in secret '{key}'")
# Remove the field
del data[field]
# If no fields remain, delete the entire secret
if not data:
self.client.secrets.kv.v2.delete_metadata_and_all_versions(
path=path, mount_point="secret"
)
logger.info(f"Deleted secret '{key}' (no fields remaining)")
else:
# Update the secret without the deleted field
self.client.secrets.kv.v2.create_or_update_secret(
path=path, secret=data, mount_point="secret"
)
logger.info(f"Deleted field '{field}' from secret '{key}'")
else:
raise KeyError(f"Secret '{key}' not found")
except KeyError:
raise
except Exception as e:
logger.error(
f"Failed to delete field '{field}' from secret '{key}': {e}"
)
raise
def list(self) -> list[str]:
"""
List all secret keys in your personal storage.
Returns a list of all secret keys that you have stored in Vault.
Does not include the actual secret values for security reasons.
Returns
-------
list[str]
List of secret keys. Empty list if no secrets found or on error.
Examples
--------
>>> secrets = SecretStore()
>>> keys = secrets.list()
>>> print(f'You have {len(keys)} secrets: {keys}')
['api-keys', 'database-config', 'certificates']
"""
self._ensure_authenticated()
try:
response = self.client.secrets.kv.v2.list_secrets(
path=self.base_path, mount_point="secret"
)
keys = response["data"]["keys"] if response else []
logger.info(f"Listed {len(keys)} secrets")
return keys
except Exception as e:
logger.warning(f"Could not list secrets: {e}")
return []
def get_status(self) -> dict[str, Any]:
"""
Get status information about the SecretStore instance.
Returns
-------
dict[str, Any]
Status dictionary containing:
- username: JupyterHub username
- vault_addr: Vault server address
- authentication_method: Authentication method used
- vault_authenticated: Whether Vault client is authenticated
Examples
--------
>>> secrets = SecretStore()
>>> status = secrets.get_status()
>>> print(f"User: {status['username']}")
"""
status = {
"username": self.username,
"vault_addr": self.vault_addr,
"authentication_method": "User-specific Vault token",
}
try:
status["vault_authenticated"] = self.client.is_authenticated()
except Exception:
status["vault_authenticated"] = False
return status
# Utility functions
def get_env_from_secrets(secrets: "SecretStore", key: str = "environment") -> list[str]:
    """
    Load environment variables from SecretStore into os.environ.

    Reads the secret stored under ``key`` and copies every field into the
    current process's environment, converting each value with ``str()``.

    Parameters
    ----------
    secrets : SecretStore
        The SecretStore instance to load from.
    key : str, optional
        The key where environment variables are stored, by default "environment".

    Returns
    -------
    list[str]
        Names of the environment variables that were set.
        Empty list if the key doesn't exist or contains no data.

    Examples
    --------
    >>> secrets = SecretStore()
    >>> # First, store some environment variables
    >>> put_env_to_secrets(secrets, {'DEBUG': 'true', 'PORT': '8080'})
    >>> # Later, load them back
    >>> loaded = get_env_from_secrets(secrets)
    >>> print(f'Loaded {len(loaded)} variables: {loaded}')
    ['DEBUG', 'PORT']
    >>> print(os.environ['DEBUG'])  # Now available
    'true'
    """
    try:
        env_vars = secrets.get(key)
    except KeyError:
        # secrets.get() signals a missing secret with KeyError; this function
        # documents an empty result for that case.
        return []
    if not env_vars:
        return []

    for name, value in env_vars.items():
        os.environ[name] = str(value)
        logger.info("Set environment variable: %s", name)
    return list(env_vars)
def put_env_to_secrets(
    secrets: SecretStore, env_dict: dict, key: str = "environment"
) -> str:
    """
    Store environment variables in SecretStore.

    Every value of ``env_dict`` is converted with ``str()`` and the resulting
    mapping is written as one secret via ``secrets.put``.

    Parameters
    ----------
    secrets : SecretStore
        The SecretStore instance to save to.
    env_dict : dict[str, Any]
        Environment variable names mapped to their values. The names are
        passed to ``secrets.put`` as keyword arguments.
    key : str, optional
        The key to store environment variables under, by default "environment".

    Returns
    -------
    str
        ``jupyter/users/<secrets.username>/<key>``, the path the variables
        were stored under.

    Examples
    --------
    >>> secrets = SecretStore()
    >>> env_vars = {
    ...     'DATABASE_URL': 'postgresql://localhost:5432/mydb',
    ...     'DEBUG': 'false',
    ...     'MAX_WORKERS': '4'
    ... }
    >>> put_env_to_secrets(secrets, env_vars)
    'jupyter/users/username/environment'
    >>> # Store with custom key
    >>> put_env_to_secrets(secrets, {'API_KEY': 'secret'}, 'production-config')
    'jupyter/users/username/production-config'
    """
    # put() only accepts string values, so stringify each entry first.
    stringified = {}
    for name, raw_value in env_dict.items():
        stringified[name] = str(raw_value)

    secrets.put(key, **stringified)
    logger.info(f"Put {len(env_dict)} environment variables")

    stored_path = f"jupyter/users/{secrets.username}/{key}"
    return stored_path