diff --git a/postgres/justfile b/postgres/justfile
index 1813aa8..139b1b9 100644
--- a/postgres/justfile
+++ b/postgres/justfile
@@ -292,9 +292,306 @@ delete-user-and-db username='' db_name='':
     else
         echo "User ${USERNAME} does not exist, skipping user deletion."
     fi
-
     echo "Cleanup completed."
 
+# Create logical replication slot for CDC
+create-replication-slot slot_name='' db_name='postgres' plugin='pgoutput':
+    #!/bin/bash
+    set -euo pipefail
+    SLOT_NAME=${SLOT_NAME:-"{{ slot_name }}"}
+    DB_NAME=${DB_NAME:-"{{ db_name }}"}
+    PLUGIN=${PLUGIN:-"{{ plugin }}"}
+    while [ -z "${SLOT_NAME}" ]; do
+        SLOT_NAME=$(gum input --prompt="Replication slot name: " --width=100 \
+            --placeholder="e.g., airbyte_slot")
+    done
+    # The SQL filters on the exact name, so any matching output means the slot
+    # exists; -F keeps grep from interpreting the name as a regex.
+    if kubectl exec -n "${CNPG_NAMESPACE}" postgres-cluster-1 -c postgres -- \
+        psql -U postgres -d "${DB_NAME}" -tAc \
+        "SELECT slot_name FROM pg_replication_slots WHERE slot_name = '${SLOT_NAME}';" | grep -qF -- "${SLOT_NAME}"; then
+        echo "Replication slot '${SLOT_NAME}' already exists."
+        exit 0
+    fi
+    echo "Creating replication slot '${SLOT_NAME}' with plugin '${PLUGIN}'..."
+    kubectl exec -n "${CNPG_NAMESPACE}" postgres-cluster-1 -c postgres -- \
+        psql -U postgres -d "${DB_NAME}" -c \
+        "SELECT pg_create_logical_replication_slot('${SLOT_NAME}', '${PLUGIN}');"
+    echo "Replication slot '${SLOT_NAME}' created."
+
+# Delete replication slot
+delete-replication-slot slot_name='' db_name='postgres':
+    #!/bin/bash
+    set -euo pipefail
+    SLOT_NAME=${SLOT_NAME:-"{{ slot_name }}"}
+    DB_NAME=${DB_NAME:-"{{ db_name }}"}
+    while [ -z "${SLOT_NAME}" ]; do
+        SLOT_NAME=$(gum input --prompt="Replication slot name to delete: " --width=100)
+    done
+    if ! kubectl exec -n "${CNPG_NAMESPACE}" postgres-cluster-1 -c postgres -- \
+        psql -U postgres -d "${DB_NAME}" -tAc \
+        "SELECT slot_name FROM pg_replication_slots WHERE slot_name = '${SLOT_NAME}';" | grep -qF -- "${SLOT_NAME}"; then
+        echo "Replication slot '${SLOT_NAME}' does not exist."
+        exit 1
+    fi
+    echo "Deleting replication slot '${SLOT_NAME}'..."
+    # Logical slots must be dropped while connected to the database they were
+    # created on, hence -d "${DB_NAME}" rather than a fixed database.
+    kubectl exec -n "${CNPG_NAMESPACE}" postgres-cluster-1 -c postgres -- \
+        psql -U postgres -d "${DB_NAME}" -c \
+        "SELECT pg_drop_replication_slot('${SLOT_NAME}');"
+    echo "Replication slot '${SLOT_NAME}' deleted."
+
+# List all replication slots
+list-replication-slots:
+    @echo "Replication slots:"
+    @kubectl exec -n "${CNPG_NAMESPACE}" postgres-cluster-1 -c postgres -- \
+        psql -U postgres -d postgres -c \
+        "SELECT slot_name, plugin, slot_type, database, active, restart_lsn FROM pg_replication_slots;"
+
+# Create publication for CDC
+create-publication pub_name='' db_name='' tables='':
+    #!/bin/bash
+    set -euo pipefail
+    PUB_NAME=${PUB_NAME:-"{{ pub_name }}"}
+    DB_NAME=${DB_NAME:-"{{ db_name }}"}
+    TABLES="${TABLES:-{{ tables }}}"
+    while [ -z "${PUB_NAME}" ]; do
+        PUB_NAME=$(gum input --prompt="Publication name: " --width=100 \
+            --placeholder="e.g., airbyte_publication")
+    done
+    while [ -z "${DB_NAME}" ]; do
+        DB_NAME=$(gum input --prompt="Database name: " --width=100)
+    done
+    if kubectl exec -n "${CNPG_NAMESPACE}" postgres-cluster-1 -c postgres -- \
+        psql -U postgres -d "${DB_NAME}" -tAc \
+        "SELECT pubname FROM pg_publication WHERE pubname = '${PUB_NAME}';" | grep -qF -- "${PUB_NAME}"; then
+        echo "Publication '${PUB_NAME}' already exists in database '${DB_NAME}'."
+        exit 0
+    fi
+    if [ -z "${TABLES}" ]; then
+        echo "Select tables to include in publication:"
+        echo "1) All tables (ALL TABLES)"
+        echo "2) All user tables (exclude system/internal tables)"
+        echo "3) Specific tables (comma-separated list)"
+        CHOICE=$(gum choose "All tables" "User tables only" "Specific tables")
+        case "${CHOICE}" in
+            "All tables")
+                TABLES="ALL TABLES"
+                ;;
+            "User tables only")
+                # List public tables, skipping names that start with "_" (e.g.
+                # _airbyte*) or "pg_"; "_" is escaped because it is a LIKE wildcard.
+                USER_TABLES=$(kubectl exec -n "${CNPG_NAMESPACE}" postgres-cluster-1 -c postgres -- \
+                    psql -U postgres -d "${DB_NAME}" -tAc \
+                    "SELECT string_agg(quote_ident(tablename), ', ') FROM pg_tables
+                     WHERE schemaname = 'public'
+                     AND tablename NOT LIKE '\_%'
+                     AND tablename NOT LIKE 'pg\_%';")
+                if [ -z "${USER_TABLES}" ]; then
+                    echo "No user tables found in database '${DB_NAME}'"
+                    exit 1
+                fi
+                TABLES="TABLE ${USER_TABLES}"
+                echo "Including tables: ${USER_TABLES}"
+                ;;
+            "Specific tables")
+                TABLES=$(gum input --prompt="Enter table names (comma-separated): " --width=100 \
+                    --placeholder="e.g., users, products, orders")
+                if [ -z "${TABLES}" ]; then
+                    echo "No tables specified" >&2
+                    exit 1
+                fi
+                TABLES="TABLE ${TABLES}"
+                ;;
+        esac
+    else
+        # Normalise the `tables` argument: "ALL" is shorthand for every table,
+        # and a bare comma-separated list needs the TABLE keyword prepended.
+        case "${TABLES}" in
+            "ALL") TABLES="ALL TABLES" ;;
+            "ALL TABLES" | "TABLE "* | "TABLES IN SCHEMA "*) ;;
+            *) TABLES="TABLE ${TABLES}" ;;
+        esac
+    fi
+    echo "Creating publication '${PUB_NAME}' in database '${DB_NAME}'..."
+    kubectl exec -n "${CNPG_NAMESPACE}" postgres-cluster-1 -c postgres -- \
+        psql -U postgres -d "${DB_NAME}" -c \
+        "CREATE PUBLICATION ${PUB_NAME} FOR ${TABLES};"
+    # REPLICA IDENTITY is only adjusted for an explicit table list.
+    if [[ "${TABLES}" == "TABLE "* ]]; then
+        echo "Setting REPLICA IDENTITY for included tables..."
+        TABLE_LIST=${TABLES#TABLE }
+        IFS=',' read -ra TABLE_ARRAY <<< "${TABLE_LIST}"
+        for table in "${TABLE_ARRAY[@]}"; do
+            # Trim surrounding whitespace with parameter expansion; xargs would
+            # also strip the double quotes of quoted identifiers.
+            table="${table#"${table%%[![:space:]]*}"}"
+            table="${table%"${table##*[![:space:]]}"}"
+            [ -n "${table}" ] || continue
+            # Best effort: report a failure but keep processing other tables.
+            if ! kubectl exec -n "${CNPG_NAMESPACE}" postgres-cluster-1 -c postgres -- \
+                psql -U postgres -d "${DB_NAME}" -c \
+                "ALTER TABLE ${table} REPLICA IDENTITY FULL;"; then
+                echo "WARNING: could not set REPLICA IDENTITY FULL on ${table}" >&2
+            fi
+        done
+    fi
+    echo "Publication '${PUB_NAME}' created."
+
+# Delete publication
+delete-publication pub_name='' db_name='':
+    #!/bin/bash
+    set -euo pipefail
+    PUB_NAME=${PUB_NAME:-"{{ pub_name }}"}
+    DB_NAME=${DB_NAME:-"{{ db_name }}"}
+    while [ -z "${PUB_NAME}" ]; do
+        PUB_NAME=$(gum input --prompt="Publication name to delete: " --width=100)
+    done
+    while [ -z "${DB_NAME}" ]; do
+        DB_NAME=$(gum input --prompt="Database name: " --width=100)
+    done
+    if ! kubectl exec -n "${CNPG_NAMESPACE}" postgres-cluster-1 -c postgres -- \
+        psql -U postgres -d "${DB_NAME}" -tAc \
+        "SELECT pubname FROM pg_publication WHERE pubname = '${PUB_NAME}';" | grep -qF -- "${PUB_NAME}"; then
+        echo "Publication '${PUB_NAME}' does not exist in database '${DB_NAME}'."
+        exit 1
+    fi
+    echo "Deleting publication '${PUB_NAME}' from database '${DB_NAME}'..."
+    kubectl exec -n "${CNPG_NAMESPACE}" postgres-cluster-1 -c postgres -- \
+        psql -U postgres -d "${DB_NAME}" -c \
+        "DROP PUBLICATION ${PUB_NAME};"
+    echo "Publication '${PUB_NAME}' deleted."
+
+# List all publications in a database
+list-publications db_name='':
+    #!/bin/bash
+    set -euo pipefail
+    DB_NAME=${DB_NAME:-"{{ db_name }}"}
+    while [ -z "${DB_NAME}" ]; do
+        DB_NAME=$(gum input --prompt="Database name: " --width=100)
+    done
+    echo "Publications in database '${DB_NAME}':"
+    kubectl exec -n "${CNPG_NAMESPACE}" postgres-cluster-1 -c postgres -- \
+        psql -U postgres -d "${DB_NAME}" -c \
+        "SELECT pubname, puballtables, pubinsert, pubupdate, pubdelete FROM pg_publication;"
+
+# Grant CDC privileges to user
+grant-cdc-privileges username='' db_name='':
+    #!/bin/bash
+    set -euo pipefail
+    USERNAME=${USERNAME:-"{{ username }}"}
+    DB_NAME=${DB_NAME:-"{{ db_name }}"}
+    while [ -z "${USERNAME}" ]; do
+        USERNAME=$(gum input --prompt="Username to grant CDC privileges: " --width=100)
+    done
+    while [ -z "${DB_NAME}" ]; do
+        DB_NAME=$(gum input --prompt="Database name: " --width=100)
+    done
+    echo "Granting CDC privileges to user '${USERNAME}' on database '${DB_NAME}'..."
+    kubectl exec -n "${CNPG_NAMESPACE}" postgres-cluster-1 -c postgres -- \
+        psql -U postgres -d "${DB_NAME}" -c "ALTER USER ${USERNAME} WITH REPLICATION;"
+    echo "Granting schema and table privileges..."
+    kubectl exec -n "${CNPG_NAMESPACE}" postgres-cluster-1 -c postgres -- \
+        psql -U postgres -d "${DB_NAME}" -c \
+        "GRANT USAGE ON SCHEMA public TO ${USERNAME};
+         GRANT CREATE ON SCHEMA public TO ${USERNAME};
+         GRANT SELECT ON ALL TABLES IN SCHEMA public TO ${USERNAME};
+         ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO ${USERNAME};"
+    echo "Granting pg_read_all_data role..."
+    # Best effort: the predefined pg_read_all_data role is missing on PostgreSQL
+    # releases older than 14, so a failure here is deliberately ignored.
+    kubectl exec -n "${CNPG_NAMESPACE}" postgres-cluster-1 -c postgres -- \
+        psql -U postgres -d "${DB_NAME}" -c "GRANT pg_read_all_data TO ${USERNAME};" 2>/dev/null || true
+    echo "CDC privileges granted to user '${USERNAME}'"
+
+# Setup CDC (Change Data Capture)
+setup-cdc db_name='' slot_name='' pub_name='' username='':
+    #!/bin/bash
+    set -euo pipefail
+    DB_NAME=${DB_NAME:-"{{ db_name }}"}
+    SLOT_NAME=${SLOT_NAME:-"{{ slot_name }}"}
+    PUB_NAME=${PUB_NAME:-"{{ pub_name }}"}
+    USERNAME=${USERNAME:-"{{ username }}"}
+    while [ -z "${DB_NAME}" ]; do
+        DB_NAME=$(gum input --prompt="Database name for CDC setup: " --width=100)
+    done
+    while [ -z "${SLOT_NAME}" ]; do
+        SLOT_NAME=$(gum input --prompt="Replication slot name: " --width=100 \
+            --placeholder="e.g., demo_slot")
+    done
+    while [ -z "${PUB_NAME}" ]; do
+        PUB_NAME=$(gum input --prompt="Publication name: " --width=100 \
+            --placeholder="e.g., demo_pub")
+    done
+    echo "Setting up CDC on database '${DB_NAME}'..."
+    WAL_LEVEL=$(kubectl exec -n "${CNPG_NAMESPACE}" postgres-cluster-1 -c postgres -- \
+        psql -U postgres -d postgres -tAc "SHOW wal_level;")
+    if [ "${WAL_LEVEL}" != "logical" ]; then
+        echo "WARNING: wal_level is '${WAL_LEVEL}', should be 'logical' for CDC"
+        echo "Please ensure PostgreSQL is configured with wal_level=logical"
+        exit 1
+    fi
+    just create-replication-slot "${SLOT_NAME}" "${DB_NAME}"
+    just create-publication "${PUB_NAME}" "${DB_NAME}"
+    if [ -n "${USERNAME}" ]; then
+        echo ""
+        just grant-cdc-privileges "${USERNAME}" "${DB_NAME}"
+    fi
+    echo ""
+    echo "CDC setup completed for database '${DB_NAME}'"
+    echo "  Replication Method: Logical Replication (CDC)"
+    echo "  Replication Slot: ${SLOT_NAME}"
+    echo "  Publication: ${PUB_NAME}"
+    if [ -n "${USERNAME}" ]; then
+        echo "  User with CDC privileges: ${USERNAME}"
+    fi
+
+# Cleanup CDC (removes slot and publication)
+cleanup-cdc db_name='' slot_name='' pub_name='':
+    #!/bin/bash
+    set -euo pipefail
+    DB_NAME=${DB_NAME:-"{{ db_name }}"}
+    SLOT_NAME=${SLOT_NAME:-"{{ slot_name }}"}
+    PUB_NAME=${PUB_NAME:-"{{ pub_name }}"}
+
+    while [ -z "${DB_NAME}" ]; do
+        DB_NAME=$(gum input --prompt="Database name for CDC cleanup: " --width=100)
+    done
+    while [ -z "${SLOT_NAME}" ]; do
+        SLOT_NAME=$(gum input --prompt="Replication slot name to delete: " --width=100 \
+            --placeholder="e.g., demo_slot")
+    done
+    while [ -z "${PUB_NAME}" ]; do
+        PUB_NAME=$(gum input --prompt="Publication name to delete: " --width=100 \
+            --placeholder="e.g., demo_pub")
+    done
+    echo "Cleaning up CDC configuration for database '${DB_NAME}'..."
+
+    # Check if slot is active
+    SLOT_ACTIVE=$(kubectl exec -n "${CNPG_NAMESPACE}" postgres-cluster-1 -c postgres -- \
+        psql -U postgres -d postgres -tAc \
+        "SELECT active FROM pg_replication_slots WHERE slot_name = '${SLOT_NAME}';" 2>/dev/null || echo "")
+    if [ "${SLOT_ACTIVE}" = "t" ]; then
+        echo "WARNING: Replication slot '${SLOT_NAME}' is currently active!"
+        echo "Please stop any active replication connections first."
+        if ! gum confirm "Proceed with deletion anyway?"; then
+            echo "Cleanup cancelled"
+            exit 1
+        fi
+    fi
+
+    # Delete in correct order: Slot first, then Publication
+    echo "Step 1: Deleting replication slot '${SLOT_NAME}'..."
+    just delete-replication-slot "${SLOT_NAME}" "${DB_NAME}" || \
+        echo "Replication slot '${SLOT_NAME}' not found or already deleted"
+
+    echo "Step 2: Deleting publication '${PUB_NAME}'..."
+    just delete-publication "${PUB_NAME}" "${DB_NAME}" || \
+        echo "Publication '${PUB_NAME}' not found or already deleted"
+
+    echo "CDC cleanup completed for database '${DB_NAME}'"
+
 # Run psql
 [no-exit-message]
 psql *args='':