feat(postgres): setting CDC

This commit is contained in:
Masaki Yatsu
2025-09-13 21:02:18 +09:00
parent 833c5a2b70
commit 7c6383c037

View File

@@ -292,9 +292,281 @@ delete-user-and-db username='' db_name='':
else else
echo "User ${USERNAME} does not exist, skipping user deletion." echo "User ${USERNAME} does not exist, skipping user deletion."
fi fi
echo "Cleanup completed." echo "Cleanup completed."
# Create logical replication slot for CDC
create-replication-slot slot_name='' db_name='postgres' plugin='pgoutput':
#!/bin/bash
set -euo pipefail
SLOT_NAME=${SLOT_NAME:-"{{ slot_name }}"}
DB_NAME=${DB_NAME:-"{{ db_name }}"}
PLUGIN=${PLUGIN:-"{{ plugin }}"}
while [ -z "${SLOT_NAME}" ]; do
SLOT_NAME=$(gum input --prompt="Replication slot name: " --width=100 \
--placeholder="e.g., airbyte_slot")
done
if kubectl exec -n ${CNPG_NAMESPACE} postgres-cluster-1 -c postgres -- \
psql -U postgres -d ${DB_NAME} -tAc \
"SELECT slot_name FROM pg_replication_slots WHERE slot_name = '${SLOT_NAME}';" | grep -q "${SLOT_NAME}"; then
echo "Replication slot '${SLOT_NAME}' already exists."
exit 0
fi
echo "Creating replication slot '${SLOT_NAME}' with plugin '${PLUGIN}'..."
kubectl exec -n ${CNPG_NAMESPACE} postgres-cluster-1 -c postgres -- \
psql -U postgres -d ${DB_NAME} -c \
"SELECT pg_create_logical_replication_slot('${SLOT_NAME}', '${PLUGIN}');"
echo "Replication slot '${SLOT_NAME}' created."
# Delete replication slot
delete-replication-slot slot_name='' db_name='postgres':
#!/bin/bash
set -euo pipefail
SLOT_NAME=${SLOT_NAME:-"{{ slot_name }}"}
DB_NAME=${DB_NAME:-"{{ db_name }}"}
while [ -z "${SLOT_NAME}" ]; do
SLOT_NAME=$(gum input --prompt="Replication slot name to delete: " --width=100)
done
if ! kubectl exec -n ${CNPG_NAMESPACE} postgres-cluster-1 -c postgres -- \
psql -U postgres -d ${DB_NAME} -tAc \
"SELECT slot_name FROM pg_replication_slots WHERE slot_name = '${SLOT_NAME}';" | grep -q "${SLOT_NAME}"; then
echo "Replication slot '${SLOT_NAME}' does not exist."
exit 1
fi
echo "Deleting replication slot '${SLOT_NAME}'..."
kubectl exec -n ${CNPG_NAMESPACE} postgres-cluster-1 -c postgres -- \
psql -U postgres -d ${DB_NAME} -c \
"SELECT pg_drop_replication_slot('${SLOT_NAME}');"
echo "Replication slot '${SLOT_NAME}' deleted."
# List all replication slots
list-replication-slots:
@echo "Replication slots:"
@kubectl exec -n ${CNPG_NAMESPACE} postgres-cluster-1 -c postgres -- \
psql -U postgres -d postgres -c \
"SELECT slot_name, plugin, slot_type, database, active, restart_lsn FROM pg_replication_slots;"
# Create publication for CDC
create-publication pub_name='' db_name='' tables='':
#!/bin/bash
set -euo pipefail
PUB_NAME=${PUB_NAME:-"{{ pub_name }}"}
DB_NAME=${DB_NAME:-"{{ db_name }}"}
TABLES="${TABLES:-{{ tables }}}"
while [ -z "${PUB_NAME}" ]; do
PUB_NAME=$(gum input --prompt="Publication name: " --width=100 \
--placeholder="e.g., airbyte_publication")
done
while [ -z "${DB_NAME}" ]; do
DB_NAME=$(gum input --prompt="Database name: " --width=100)
done
if kubectl exec -n ${CNPG_NAMESPACE} postgres-cluster-1 -c postgres -- \
psql -U postgres -d ${DB_NAME} -tAc \
"SELECT pubname FROM pg_publication WHERE pubname = '${PUB_NAME}';" | grep -q "${PUB_NAME}"; then
echo "Publication '${PUB_NAME}' already exists in database '${DB_NAME}'."
exit 0
fi
if [ -z "${TABLES}" ]; then
echo "Select tables to include in publication:"
echo "1) All tables (ALL TABLES)"
echo "2) All user tables (exclude system/internal tables)"
echo "3) Specific tables (comma-separated list)"
CHOICE=$(gum choose "All tables" "User tables only" "Specific tables")
case "${CHOICE}" in
"All tables")
TABLES="ALL TABLES"
;;
"User tables only")
# Get list of user tables (excluding _airbyte* and other system tables)
USER_TABLES=$(kubectl exec -n ${CNPG_NAMESPACE} postgres-cluster-1 -c postgres -- \
psql -U postgres -d ${DB_NAME} -tAc \
"SELECT string_agg(tablename, ', ') FROM pg_tables
WHERE schemaname = 'public'
AND tablename NOT LIKE '\_%'
AND tablename NOT LIKE 'pg_%';")
if [ -z "${USER_TABLES}" ]; then
echo "No user tables found in database '${DB_NAME}'"
exit 1
fi
TABLES="TABLE ${USER_TABLES}"
echo "Including tables: ${USER_TABLES}"
;;
"Specific tables")
TABLES=$(gum input --prompt="Enter table names (comma-separated): " --width=100 \
--placeholder="e.g., users, products, orders")
TABLES="TABLE ${TABLES}"
;;
esac
elif [ "${TABLES}" = "ALL" ]; then
TABLES="ALL TABLES"
fi
echo "Creating publication '${PUB_NAME}' in database '${DB_NAME}'..."
kubectl exec -n ${CNPG_NAMESPACE} postgres-cluster-1 -c postgres -- \
psql -U postgres -d ${DB_NAME} -c \
"CREATE PUBLICATION ${PUB_NAME} FOR ${TABLES};"
if [ "${TABLES}" != "ALL TABLES" ]; then
echo "Setting REPLICA IDENTITY for included tables..."
TABLE_LIST=$(echo "${TABLES}" | sed 's/TABLE //')
IFS=',' read -ra TABLE_ARRAY <<< "${TABLE_LIST}"
for table in "${TABLE_ARRAY[@]}"; do
table=$(echo "$table" | xargs) # trim whitespace
kubectl exec -n ${CNPG_NAMESPACE} postgres-cluster-1 -c postgres -- \
psql -U postgres -d ${DB_NAME} -c \
"ALTER TABLE ${table} REPLICA IDENTITY FULL;" 2>/dev/null || true
done
fi
echo "Publication '${PUB_NAME}' created."
# Delete publication
delete-publication pub_name='' db_name='':
#!/bin/bash
set -euo pipefail
PUB_NAME=${PUB_NAME:-"{{ pub_name }}"}
DB_NAME=${DB_NAME:-"{{ db_name }}"}
while [ -z "${PUB_NAME}" ]; do
PUB_NAME=$(gum input --prompt="Publication name to delete: " --width=100)
done
while [ -z "${DB_NAME}" ]; do
DB_NAME=$(gum input --prompt="Database name: " --width=100)
done
if ! kubectl exec -n ${CNPG_NAMESPACE} postgres-cluster-1 -c postgres -- \
psql -U postgres -d ${DB_NAME} -tAc \
"SELECT pubname FROM pg_publication WHERE pubname = '${PUB_NAME}';" | grep -q "${PUB_NAME}"; then
echo "Publication '${PUB_NAME}' does not exist in database '${DB_NAME}'."
exit 1
fi
echo "Deleting publication '${PUB_NAME}' from database '${DB_NAME}'..."
kubectl exec -n ${CNPG_NAMESPACE} postgres-cluster-1 -c postgres -- \
psql -U postgres -d ${DB_NAME} -c \
"DROP PUBLICATION ${PUB_NAME};"
echo "Publication '${PUB_NAME}' deleted."
# List all publications in a database
list-publications db_name='':
#!/bin/bash
set -euo pipefail
DB_NAME=${DB_NAME:-"{{ db_name }}"}
while [ -z "${DB_NAME}" ]; do
DB_NAME=$(gum input --prompt="Database name: " --width=100)
done
echo "Publications in database '${DB_NAME}':"
kubectl exec -n ${CNPG_NAMESPACE} postgres-cluster-1 -c postgres -- \
psql -U postgres -d ${DB_NAME} -c \
"SELECT pubname, puballtables, pubinsert, pubupdate, pubdelete FROM pg_publication;"
# Grant CDC privileges to user
grant-cdc-privileges username='' db_name='':
#!/bin/bash
set -euo pipefail
USERNAME=${USERNAME:-"{{ username }}"}
DB_NAME=${DB_NAME:-"{{ db_name }}"}
while [ -z "${USERNAME}" ]; do
USERNAME=$(gum input --prompt="Username to grant CDC privileges: " --width=100)
done
while [ -z "${DB_NAME}" ]; do
DB_NAME=$(gum input --prompt="Database name: " --width=100)
done
echo "Granting CDC privileges to user '${USERNAME}' on database '${DB_NAME}'..."
kubectl exec -n ${CNPG_NAMESPACE} postgres-cluster-1 -c postgres -- \
psql -U postgres -d ${DB_NAME} -c "ALTER USER ${USERNAME} WITH REPLICATION;"
echo "Granting schema and table privileges..."
kubectl exec -n ${CNPG_NAMESPACE} postgres-cluster-1 -c postgres -- \
psql -U postgres -d ${DB_NAME} -c \
"GRANT USAGE ON SCHEMA public TO ${USERNAME};
GRANT CREATE ON SCHEMA public TO ${USERNAME};
GRANT SELECT ON ALL TABLES IN SCHEMA public TO ${USERNAME};
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO ${USERNAME};"
echo "Granting pg_read_all_data role..."
kubectl exec -n ${CNPG_NAMESPACE} postgres-cluster-1 -c postgres -- \
psql -U postgres -d ${DB_NAME} -c "GRANT pg_read_all_data TO ${USERNAME};" 2>/dev/null || true
echo "CDC privileges granted to user '${USERNAME}'"
# Setup CDC (Change Data Capture)
setup-cdc db_name='' slot_name='' pub_name='' username='':
#!/bin/bash
set -euo pipefail
DB_NAME=${DB_NAME:-"{{ db_name }}"}
SLOT_NAME=${SLOT_NAME:-"{{ slot_name }}"}
PUB_NAME=${PUB_NAME:-"{{ pub_name }}"}
USERNAME=${USERNAME:-"{{ username }}"}
while [ -z "${DB_NAME}" ]; do
DB_NAME=$(gum input --prompt="Database name for CDC setup: " --width=100)
done
while [ -z "${SLOT_NAME}" ]; do
SLOT_NAME=$(gum input --prompt="Replication slot name: " --width=100 \
--placeholder="e.g., demo_slot")
done
while [ -z "${PUB_NAME}" ]; do
PUB_NAME=$(gum input --prompt="Publication name: " --width=100 \
--placeholder="e.g., demo_pub")
done
echo "Setting up CDC on database '${DB_NAME}'..."
WAL_LEVEL=$(kubectl exec -n ${CNPG_NAMESPACE} postgres-cluster-1 -c postgres -- \
psql -U postgres -d postgres -tAc "SHOW wal_level;")
if [ "${WAL_LEVEL}" != "logical" ]; then
echo "WARNING: wal_level is '${WAL_LEVEL}', should be 'logical' for CDC"
echo "Please ensure PostgreSQL is configured with wal_level=logical"
exit 1
fi
just create-replication-slot "${SLOT_NAME}" "${DB_NAME}"
just create-publication "${PUB_NAME}" "${DB_NAME}"
if [ -n "${USERNAME}" ]; then
echo ""
just grant-cdc-privileges "${USERNAME}" "${DB_NAME}"
fi
echo ""
echo "CDC setup completed for database '${DB_NAME}'"
echo " Replication Method: Logical Replication (CDC)"
echo " Replication Slot: ${SLOT_NAME}"
echo " Publication: ${PUB_NAME}"
if [ -n "${USERNAME}" ]; then
echo " User with CDC privileges: ${USERNAME}"
fi
# Cleanup CDC (removes slot and publication)
cleanup-cdc db_name='' slot_name='' pub_name='':
#!/bin/bash
set -euo pipefail
DB_NAME=${DB_NAME:-"{{ db_name }}"}
SLOT_NAME=${SLOT_NAME:-"{{ slot_name }}"}
PUB_NAME=${PUB_NAME:-"{{ pub_name }}"}
while [ -z "${DB_NAME}" ]; do
DB_NAME=$(gum input --prompt="Database name for CDC cleanup: " --width=100)
done
while [ -z "${SLOT_NAME}" ]; do
SLOT_NAME=$(gum input --prompt="Replication slot name to delete: " --width=100 \
--placeholder="e.g., demo_slot")
done
while [ -z "${PUB_NAME}" ]; do
PUB_NAME=$(gum input --prompt="Publication name to delete: " --width=100 \
--placeholder="e.g., demo_pub")
done
echo "Cleaning up CDC configuration for database '${DB_NAME}'..."
# Check if slot is active
SLOT_ACTIVE=$(kubectl exec -n ${CNPG_NAMESPACE} postgres-cluster-1 -c postgres -- \
psql -U postgres -d postgres -tAc \
"SELECT active FROM pg_replication_slots WHERE slot_name = '${SLOT_NAME}';" 2>/dev/null || echo "")
if [ "${SLOT_ACTIVE}" = "t" ]; then
echo "WARNING: Replication slot '${SLOT_NAME}' is currently active!"
echo "Please stop any active replication connections first."
if ! gum confirm "Proceed with deletion anyway?"; then
echo "Cleanup cancelled"
exit 1
fi
fi
# Delete in correct order: Slot first, then Publication
echo "Step 1: Deleting replication slot '${SLOT_NAME}'..."
just delete-replication-slot "${SLOT_NAME}" "${DB_NAME}" || \
echo "Replication slot '${SLOT_NAME}' not found or already deleted"
echo "Step 2: Deleting publication '${PUB_NAME}'..."
just delete-publication "${PUB_NAME}" "${DB_NAME}" || \
echo "Publication '${PUB_NAME}' not found or already deleted"
echo "CDC cleanup completed for database '${DB_NAME}'"
# Run psql # Run psql
[no-exit-message] [no-exit-message]
psql *args='': psql *args='':