feat(jupyterhub): SecretStore.list_fieds()

This commit is contained in:
Masaki Yatsu
2025-09-06 20:56:00 +09:00
parent 20739aadec
commit 979a294c8f
2 changed files with 113 additions and 3 deletions

View File

@@ -37,8 +37,15 @@ secrets.put('api-keys',
api_keys = secrets.get('api-keys') api_keys = secrets.get('api-keys')
openai_key = api_keys['openai_key'] openai_key = api_keys['openai_key']
# Get specific field directly
openai_key = secrets.get('api-keys', field='openai_key')
# List all your secrets # List all your secrets
all_secrets = secrets.list() all_secrets = secrets.list()
# List fields in a specific secret
fields = secrets.list_fields('api-keys')
print(f'Available fields: {fields}') # ['openai_key', 'github_token', 'database_url']
``` ```
### Configuration Options ### Configuration Options
@@ -59,6 +66,20 @@ print(f"JupyterHub sync enabled: {status['sync_with_jupyterhub']}")
print(f"API configured: {status.get('jupyterhub_api_configured', False)}") print(f"API configured: {status.get('jupyterhub_api_configured', False)}")
``` ```
### Advanced Operations
```python
# Delete a specific field from a secret
secrets.delete('api-keys', field='github_token')
# Delete an entire secret
secrets.delete('old-config')
# Check if a field exists before accessing
if 'openai_key' in secrets.list_fields('api-keys'):
key = secrets.get('api-keys', field='openai_key')
```
### Environment Variables Helper ### Environment Variables Helper
```python ```python

View File

@@ -2,6 +2,8 @@
Secrets management with user-specific Vault token authentication Secrets management with user-specific Vault token authentication
""" """
from __future__ import annotations
import logging import logging
import os import os
import warnings import warnings
@@ -353,13 +355,31 @@ class SecretStore:
path = f"{self.base_path}/{key}" path = f"{self.base_path}/{key}"
if field is None: if field is None:
# Delete entire secret # Delete entire secret - first check if it exists
try: try:
# Check if the secret exists first
response = self.client.secrets.kv.v2.read_secret_version(
path=path, mount_point="secret", raise_on_deleted_version=False
)
if (
not response
or "data" not in response
or "data" not in response["data"]
):
raise KeyError(f"Secret '{key}' not found")
# Now delete it
self.client.secrets.kv.v2.delete_metadata_and_all_versions( self.client.secrets.kv.v2.delete_metadata_and_all_versions(
path=path, mount_point="secret" path=path, mount_point="secret"
) )
logger.info(f"Deleted secret: {key}") logger.info(f"Deleted secret: {key}")
except KeyError as e:
logger.error(f"Failed to delete: {e}")
raise
except Exception as e: except Exception as e:
# Check if the error is due to the secret not existing
if "path not found" in str(e).lower() or "not found" in str(e).lower():
raise KeyError(f"Secret '{key}' not found") from e
logger.error(f'Failed to delete secret "{key}": {e}') logger.error(f'Failed to delete secret "{key}": {e}')
raise raise
else: else:
@@ -393,7 +413,8 @@ class SecretStore:
logger.info(f"Deleted field '{field}' from secret '{key}'") logger.info(f"Deleted field '{field}' from secret '{key}'")
else: else:
raise KeyError(f"Secret '{key}' not found") raise KeyError(f"Secret '{key}' not found")
except KeyError: except KeyError as e:
logger.error(f"Failed to delete field: {e}")
raise raise
except Exception as e: except Exception as e:
logger.error( logger.error(
@@ -430,9 +451,77 @@ class SecretStore:
logger.info(f"Listed {len(keys)} secrets") logger.info(f"Listed {len(keys)} secrets")
return keys return keys
except Exception as e: except Exception as e:
logger.warning(f"Could not list secrets: {e}") # This is expected when no secrets exist yet - just return empty list
logger.debug(f"No secrets found or error listing: {e}")
return [] return []
def list_fields(self, key: str) -> list[str]:
"""
List all field names in a specific secret.
Returns a list of all field names (keys) stored in the specified secret.
Does not include the actual field values for security reasons.
Parameters
----------
key : str
The key/name of the secret to list fields for.
Returns
-------
list[str]
List of field names in the secret. Empty list if the secret doesn't exist.
Raises
------
KeyError
If the secret key doesn't exist.
ConnectionError
If unable to connect to Vault server.
Examples
--------
>>> secrets = SecretStore()
>>> secrets.put('api-keys', openai='sk-123', github='ghp-456', azure='az-789')
>>> fields = secrets.list_fields('api-keys')
>>> print(f'Fields in api-keys: {fields}')
['openai', 'github', 'azure']
>>> # Check available fields before accessing
>>> if 'openai' in secrets.list_fields('api-keys'):
... openai_key = secrets.get('api-keys', field='openai')
"""
self._ensure_authenticated()
path = f"{self.base_path}/{key}"
try:
response = self.client.secrets.kv.v2.read_secret_version(
path=path, mount_point="secret", raise_on_deleted_version=False
)
if response and "data" in response and "data" in response["data"]:
data = response["data"]["data"]
fields = list(data.keys())
logger.info(f"Listed {len(fields)} fields in secret '{key}'")
return fields
else:
raise KeyError(f"Secret '{key}' not found")
except Exception as e:
if "permission denied" in str(e).lower():
logger.info("Permission denied, re-authenticating...")
self._ensure_authenticated()
response = self.client.secrets.kv.v2.read_secret_version(
path=path, mount_point="secret", raise_on_deleted_version=False
)
if response and "data" in response and "data" in response["data"]:
data = response["data"]["data"]
fields = list(data.keys())
logger.info(f"Listed {len(fields)} fields in secret '{key}'")
return fields
else:
raise KeyError(f"Secret '{key}' not found")
logger.warning(f"Could not list fields for secret '{key}': {e}")
raise KeyError(f"Secret '{key}' not found") from e
def get_status(self) -> dict[str, Any]: def get_status(self) -> dict[str, Any]:
""" """
Get status information about the SecretStore instance. Get status information about the SecretStore instance.