diff --git a/.devcontainer/.gdbinit b/.devcontainer/.gdbinit index 9c710923f..9d544512b 100644 --- a/.devcontainer/.gdbinit +++ b/.devcontainer/.gdbinit @@ -3,3 +3,31 @@ # actually also works when debugging with vscode. Providing nice tools # to understand the internal datastructures we are working with. source /root/gdbpg.py + +# when debugging postgres it is convenient to _always_ have a breakpoint +# trigger when an error is logged. However, .gdbinit is sourced before gdb +# is fully attached and has the sources loaded. To make sure the breakpoint +# is added when the library is loaded we temporarily set the breakpoint pending +# to on. After we have added our breakpoint we revert to the default +# configuration for breakpoint pending. +# The breakpoint is hard to read, but at entry to the function we don't have +# the level loaded in elevel. Instead we hardcode the location where the +# level of the current error is stored. Also gdb doesn't understand the +# ERROR symbol so we hardcode the condition to the value of ERROR (21). It is very unlikely +# this value will ever change in postgres, but if it does we might need to +# find a way to conditionally load the correct breakpoint. +set breakpoint pending on +break elog.c:errfinish if errordata[errordata_stack_depth].elevel == 21 +set breakpoint pending auto + +echo \n +echo ----------------------------------------------------------------------------------\n +echo when attaching to a postgres backend a breakpoint will be set on elog.c:errfinish \n +echo it will only break on errors being raised in postgres \n +echo \n +echo to disable this breakpoint from vscode run `-exec disable 1` in the debug console \n +echo this assumes it's the first breakpoint loaded as it is loaded from .gdbinit \n +echo this can be verified with `-exec info break`, enabling can be done with \n +echo `-exec enable 1` \n +echo ----------------------------------------------------------------------------------\n +echo \n
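For reference, the 21 hardcoded in the break condition above is the value of PostgreSQL's ERROR macro, which gdb cannot resolve by name. The surrounding severity levels, paraphrased from src/include/utils/elog.h as of PostgreSQL 14 (in PostgreSQL 13 and older, ERROR is 20, because WARNING_CLIENT_ONLY did not exist yet):

/* paraphrased from PostgreSQL's src/include/utils/elog.h (14+) */
#define WARNING             19  /* warnings, reported to client and server log */
#define WARNING_CLIENT_ONLY 20  /* warnings sent only to the client */
#define ERROR               21  /* user error - abort transaction */
#define FATAL               22  /* fatal error - abort process */
#define PANIC               23  /* take down the other backends with me */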
diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 5944c38db..1f22ff034 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -13,10 +13,33 @@ on: pull_request: types: [opened, reopened,synchronize] jobs: + # Since GHA does not interpolate env variables in matrix context, we need to + # define them in a separate job and use them in other jobs. + params: + runs-on: ubuntu-latest + name: Initialize parameters + outputs: + build_image_name: "citus/extbuilder" + test_image_name: "citus/exttester" + citusupgrade_image_name: "citus/citusupgradetester" + fail_test_image_name: "citus/failtester" + pgupgrade_image_name: "citus/pgupgradetester" + style_checker_image_name: "citus/stylechecker" + style_checker_tools_version: "0.8.18" + image_suffix: "-v9d71045" + pg14_version: "14.9" + pg15_version: "15.4" + pg16_version: "16.0" + upgrade_pg_versions: "14.9-15.4-16.0" + steps: + # Since GHA jobs need at least one step we use a no-op step here. + - name: Set up parameters + run: echo 'noop' check-sql-snapshots: + needs: params runs-on: ubuntu-20.04 container: - image: ${{ vars.build_image_name }}:latest + image: ${{ needs.params.outputs.build_image_name }}:latest options: --user root steps: - uses: actions/checkout@v3.5.0 @@ -25,9 +48,10 @@ jobs: git config --global --add safe.directory ${GITHUB_WORKSPACE} ci/check_sql_snapshots.sh check-style: + needs: params runs-on: ubuntu-20.04 container: - image: ${{ vars.style_checker_image_name }}:${{ vars.style_checker_tools_version }}${{ vars.image_suffix }} + image: ${{ needs.params.outputs.style_checker_image_name }}:${{ needs.params.outputs.style_checker_tools_version }}${{ needs.params.outputs.image_suffix }} steps: - name: Check Snapshots run: | @@ -68,18 +92,19 @@ jobs: - name: Check for missing downgrade scripts run: ci/check_migration_files.sh build: + needs: params name: Build for PG ${{ matrix.pg_version}} strategy: fail-fast: false matrix: image_name: - - ${{ vars.build_image_name }} + - ${{ needs.params.outputs.build_image_name }} image_suffix: - - ${{ vars.image_suffix}} + - ${{ needs.params.outputs.image_suffix}} pg_version: - - ${{ vars.pg14_version }} - - ${{ vars.pg15_version }} - - ${{ vars.pg16_version }} + - ${{ needs.params.outputs.pg14_version }} + - ${{ needs.params.outputs.pg15_version }} + - ${{ needs.params.outputs.pg16_version }} runs-on: ubuntu-20.04 container: image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ matrix.image_suffix }}" @@ -106,11 +131,11 @@ jobs: suite: - regress image_name: - - ${{ vars.test_image_name }} + - ${{ needs.params.outputs.test_image_name }} pg_version: - - ${{ vars.pg14_version }} - - ${{ vars.pg15_version }} - - ${{ vars.pg16_version }} + - ${{ needs.params.outputs.pg14_version }} + - ${{ needs.params.outputs.pg15_version }} + - ${{ needs.params.outputs.pg16_version }} make: - check-split - check-multi @@ -129,69 +154,70 @@ jobs: - check-enterprise-isolation-logicalrep-3 include: - make: check-failure - pg_version: ${{ vars.pg14_version }} + pg_version: ${{ needs.params.outputs.pg14_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-failure - pg_version: ${{ vars.pg15_version }} + pg_version: ${{ needs.params.outputs.pg15_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-failure - pg_version: ${{ vars.pg16_version }} + pg_version: ${{ needs.params.outputs.pg16_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-enterprise-failure - pg_version: ${{ vars.pg14_version }} + pg_version: ${{ needs.params.outputs.pg14_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-enterprise-failure - pg_version: ${{ vars.pg15_version }} + pg_version: ${{ needs.params.outputs.pg15_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-enterprise-failure - pg_version: ${{ vars.pg16_version }} + pg_version: ${{ needs.params.outputs.pg16_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-pytest - pg_version: ${{ vars.pg14_version }} + pg_version: ${{ needs.params.outputs.pg14_version }}
suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-pytest - pg_version: ${{ vars.pg15_version }} + pg_version: ${{ needs.params.outputs.pg15_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-pytest - pg_version: ${{ vars.pg16_version }} + pg_version: ${{ needs.params.outputs.pg16_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: installcheck suite: cdc - image_name: ${{ vars.test_image_name }} - pg_version: ${{ vars.pg15_version }} + image_name: ${{ needs.params.outputs.test_image_name }} + pg_version: ${{ needs.params.outputs.pg15_version }} - make: installcheck suite: cdc - image_name: ${{ vars.test_image_name }} - pg_version: ${{ vars.pg16_version }} + image_name: ${{ needs.params.outputs.test_image_name }} + pg_version: ${{ needs.params.outputs.pg16_version }} - make: check-query-generator - pg_version: ${{ vars.pg14_version }} + pg_version: ${{ needs.params.outputs.pg14_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-query-generator - pg_version: ${{ vars.pg15_version }} + pg_version: ${{ needs.params.outputs.pg15_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} - make: check-query-generator - pg_version: ${{ vars.pg16_version }} + pg_version: ${{ needs.params.outputs.pg16_version }} suite: regress - image_name: ${{ vars.fail_test_image_name }} + image_name: ${{ needs.params.outputs.fail_test_image_name }} runs-on: ubuntu-20.04 container: - image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ vars.image_suffix }}" + image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ needs.params.outputs.image_suffix }}" options: --user root --dns=8.8.8.8 # Because GitHub creates a default network for each job, we need to use # --dns= to have similar DNS settings as our other CI systems or local # machines. Otherwise, we may see different results.
needs: + - params - build steps: - uses: actions/checkout@v3.5.0 @@ -212,19 +238,20 @@ jobs: name: PG${{ matrix.pg_version }} - check-arbitrary-configs-${{ matrix.parallel }} runs-on: ["self-hosted", "1ES.Pool=1es-gha-citusdata-pool"] container: - image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ vars.image_suffix }}" + image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ needs.params.outputs.image_suffix }}" options: --user root needs: + - params - build strategy: fail-fast: false matrix: image_name: - - ${{ vars.fail_test_image_name }} + - ${{ needs.params.outputs.fail_test_image_name }} pg_version: - - ${{ vars.pg14_version }} - - ${{ vars.pg15_version }} - - ${{ vars.pg16_version }} + - ${{ needs.params.outputs.pg14_version }} + - ${{ needs.params.outputs.pg15_version }} + - ${{ needs.params.outputs.pg16_version }} parallel: [0,1,2,3,4,5] # workaround for running 6 parallel jobs steps: - uses: actions/checkout@v3.5.0 @@ -258,9 +285,10 @@ jobs: name: PG${{ matrix.old_pg_major }}-PG${{ matrix.new_pg_major }} - check-pg-upgrade runs-on: ubuntu-20.04 container: - image: "${{ vars.pgupgrade_image_name }}:${{ vars.upgrade_pg_versions }}${{ vars.image_suffix }}" + image: "${{ needs.params.outputs.pgupgrade_image_name }}:${{ needs.params.outputs.upgrade_pg_versions }}${{ needs.params.outputs.image_suffix }}" options: --user root needs: + - params - build strategy: fail-fast: false @@ -305,12 +333,13 @@ jobs: flags: ${{ env.old_pg_major }}_${{ env.new_pg_major }}_upgrade codecov_token: ${{ secrets.CODECOV_TOKEN }} test-citus-upgrade: - name: PG${{ vars.pg14_version }} - check-citus-upgrade + name: PG${{ needs.params.outputs.pg14_version }} - check-citus-upgrade runs-on: ubuntu-20.04 container: - image: "${{ vars.citusupgrade_image_name }}:${{ vars.pg14_version }}${{ vars.image_suffix }}" + image: "${{ needs.params.outputs.citusupgrade_image_name }}:${{ needs.params.outputs.pg14_version }}${{ needs.params.outputs.image_suffix }}" options: --user root needs: + - params - build steps: - uses: actions/checkout@v3.5.0 @@ -354,8 +383,9 @@ jobs: CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }} runs-on: ubuntu-20.04 container: - image: ${{ vars.test_image_name }}:${{ vars.pg16_version }}${{ vars.image_suffix }} + image: ${{ needs.params.outputs.test_image_name }}:${{ needs.params.outputs.pg16_version }}${{ needs.params.outputs.image_suffix }} needs: + - params - test-citus - test-arbitrary-configs - test-citus-upgrade @@ -448,11 +478,12 @@ jobs: name: Test flakyness runs-on: ubuntu-20.04 container: - image: ${{ vars.fail_test_image_name }}:${{ vars.pg16_version }}${{ vars.image_suffix }} + image: ${{ needs.params.outputs.fail_test_image_name }}:${{ needs.params.outputs.pg16_version }}${{ needs.params.outputs.image_suffix }} options: --user root env: runs: 8 needs: + - params - build - test-flakyness-pre - prepare_parallelization_matrix_32 diff --git a/.github/workflows/packaging-test-pipelines.yml b/.github/workflows/packaging-test-pipelines.yml index 9d3fb81be..0fb4b7092 100644 --- a/.github/workflows/packaging-test-pipelines.yml +++ b/.github/workflows/packaging-test-pipelines.yml @@ -24,14 +24,14 @@ jobs: - name: Get Postgres Versions id: get-postgres-versions run: | - # Postgres versions are stored in .circleci/config.yml file in "build-[pg-version] format. Below command - # extracts the versions and get the unique values. 
- pg_versions=`grep -Eo 'build-[[:digit:]]{2}' .circleci/config.yml|sed -e "s/^build-//"|sort|uniq|tr '\n' ','| head -c -1` + # Postgres versions are stored in .github/workflows/build_and_test.yml file in "pg[pg-version]_version" + # format. Below command extracts the versions and gets the unique values. + pg_versions=$(cat .github/workflows/build_and_test.yml | grep -oE 'pg[0-9]+_version: "[0-9.]+"' | sed -E 's/pg([0-9]+)_version: "([0-9.]+)"/\1/g' | sort | uniq | tr '\n' ',') pg_versions_array="[ ${pg_versions} ]" echo "Supported PG Versions: ${pg_versions_array}" # Below line is needed to set the output variable to be used in the next job echo "pg_versions=${pg_versions_array}" >> $GITHUB_OUTPUT - + shell: bash rpm_build_tests: name: rpm_build_tests needs: get_postgres_versions_from_file diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 1ac20c8bc..9b5768ee7 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -481,6 +481,7 @@ _PG_init(void) #endif InitializeMaintenanceDaemon(); + InitializeMaintenanceDaemonForMainDb(); /* initialize coordinated transaction management */ InitializeTransactionManagement(); @@ -1820,6 +1821,16 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE | GUC_UNIT_MS, NULL, NULL, NULL); + DefineCustomStringVariable( + "citus.main_db", + gettext_noop("Which database is designated as the main_db"), + NULL, + &MainDb, + "", + PGC_POSTMASTER, + GUC_STANDARD, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.max_adaptive_executor_pool_size", gettext_noop("Sets the maximum number of connections per worker node used by "
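One placement detail worth spelling out for the hunk above: a static background worker can only be registered while shared_preload_libraries is being processed, which is why InitializeMaintenanceDaemonForMainDb() is invoked directly from _PG_init rather than lazily from a backend. A minimal sketch of that constraint (paraphrased, not the actual Citus source; Citus's real _PG_init performs an equivalent check with its own wording):

/* sketch: registration order inside _PG_init() */
#include "postgres.h"
#include "miscadmin.h"

void
_PG_init(void)
{
	/* static bgworkers may only be registered at postmaster startup */
	if (!process_shared_preload_libraries_in_progress)
	{
		ereport(ERROR, (errmsg("citus must be loaded via shared_preload_libraries")));
	}

	/* GUCs (including citus.main_db) are registered before the workers
	 * that read them */
	RegisterCitusConfigVariables();

	InitializeMaintenanceDaemon();
	InitializeMaintenanceDaemonForMainDb();	/* no-op when citus.main_db is "" */
}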
diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 5f49de20a..22a0843bd 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -99,6 +99,7 @@ int Recover2PCInterval = 60000; int DeferShardDeleteInterval = 15000; int BackgroundTaskQueueCheckInterval = 5000; int MaxBackgroundTaskExecutors = 4; +char *MainDb = ""; /* config variables for metadata sync timeout */ int MetadataSyncInterval = 60000; @@ -112,7 +113,7 @@ static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL; * activated. */ static HTAB *MaintenanceDaemonDBHash; - +static ErrorContextCallback errorCallback = { 0 }; static volatile sig_atomic_t got_SIGHUP = false; static volatile sig_atomic_t got_SIGTERM = false; @@ -125,6 +126,8 @@ static void MaintenanceDaemonShmemExit(int code, Datum arg); static void MaintenanceDaemonErrorContext(void *arg); static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData); static void WarnMaintenanceDaemonNotStarted(void); +static MaintenanceDaemonDBData * GetMaintenanceDaemonDBHashEntry(Oid databaseId, + bool *found); /* * InitializeMaintenanceDaemon, called at server start, is responsible for @@ -139,6 +142,82 @@ InitializeMaintenanceDaemon(void) } + +/* + * GetMaintenanceDaemonDBHashEntry searches the MaintenanceDaemonDBHash for the + * databaseId. It returns the entry if found or creates a new entry and initializes + * the value with zeroes. + */ +MaintenanceDaemonDBData * +GetMaintenanceDaemonDBHashEntry(Oid databaseId, bool *found) +{ + MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search( + MaintenanceDaemonDBHash, + &databaseId, + HASH_ENTER_NULL, + found); + + if (!dbData) + { + elog(LOG, + "cannot create or find the maintenance daemon hash entry for database %u", + databaseId); + return NULL; + } + + if (!*found) + { + /* ensure the values in MaintenanceDaemonDBData are zero */ + memset(((char *) dbData) + sizeof(Oid), 0, + sizeof(MaintenanceDaemonDBData) - sizeof(Oid)); + } + + return dbData; +} + + +/* + * InitializeMaintenanceDaemonForMainDb is called in _PG_init, + * at which stage we are not in a transaction and do not yet have a database OID. + */ +void +InitializeMaintenanceDaemonForMainDb(void) +{ + if (strcmp(MainDb, "") == 0) + { + elog(LOG, "There is no designated Main database."); + return; + } + + BackgroundWorker worker; + + memset(&worker, 0, sizeof(worker)); + + + strcpy_s(worker.bgw_name, sizeof(worker.bgw_name), + "Citus Maintenance Daemon for Main DB"); + + /* request ability to connect to target database */ + worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + + /* + * No point in getting started before being able to run queries, but we do + * want to get started on Hot-Standby. + */ + worker.bgw_start_time = BgWorkerStart_ConsistentState; + + /* Restart a bit after errors, but don't bog down the system. */ + worker.bgw_restart_time = 5; + strcpy_s(worker.bgw_library_name, + sizeof(worker.bgw_library_name), "citus"); + strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name), + "CitusMaintenanceDaemonMain"); + + worker.bgw_main_arg = (Datum) 0; + + RegisterBackgroundWorker(&worker); } + + /* * InitializeMaintenanceDaemonBackend, called at backend start and * configuration changes, is responsible for starting a per-database @@ -148,31 +227,20 @@ void InitializeMaintenanceDaemonBackend(void) { Oid extensionOwner = CitusExtensionOwner(); - bool found; + bool found = false; LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); - MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search( - MaintenanceDaemonDBHash, - &MyDatabaseId, - HASH_ENTER_NULL, - &found); + MaintenanceDaemonDBData *dbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId, + &found); if (dbData == NULL) { WarnMaintenanceDaemonNotStarted(); LWLockRelease(&MaintenanceDaemonControl->lock); - return; } - if (!found) - { - /* ensure the values in MaintenanceDaemonDBData are zero */ - memset(((char *) dbData) + sizeof(Oid), 0, - sizeof(MaintenanceDaemonDBData) - sizeof(Oid)); - } - if (IsMaintenanceDaemon) { /* @@ -271,66 +339,97 @@ WarnMaintenanceDaemonNotStarted(void) /* - * CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll - * be started by the background worker infrastructure. If it errors out, - * it'll be restarted after a few seconds. + * ConnectToDatabase connects to the database for the given databaseOid. + * If databaseOid is 0, it connects to MainDb and then creates a hash entry. + * If a hash entry cannot be created for MainDb it exits the process requesting a restart. + * However, for regular databases, it exits without requesting a restart since a + * subsequent backend is expected to start the Maintenance Daemon. + * If the found hash entry has a valid workerPid, it exits + * without requesting a restart since there is already a daemon running.
*/ -void -CitusMaintenanceDaemonMain(Datum main_arg) +static MaintenanceDaemonDBData * +ConnectToDatabase(Oid databaseOid) { - Oid databaseOid = DatumGetObjectId(main_arg); - TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY = - TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000); - bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false; - TimestampTz lastRecoveryTime = 0; - TimestampTz lastShardCleanTime = 0; - TimestampTz lastStatStatementsPurgeTime = 0; - TimestampTz nextMetadataSyncTime = 0; + MaintenanceDaemonDBData *myDbData = NULL; - /* state kept for the background tasks queue monitor */ - TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp(); - BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL; - bool backgroundTasksQueueWarnedForLock = false; - /* - * We do metadata sync in a separate background worker. We need its - * handle to be able to check its status. - */ - BackgroundWorkerHandle *metadataSyncBgwHandle = NULL; + bool isMainDb = false; - /* - * Look up this worker's configuration. - */ LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE); - MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *) - hash_search(MaintenanceDaemonDBHash, &databaseOid, - HASH_FIND, NULL); - if (!myDbData) - { - /* - * When the database crashes, background workers are restarted, but - * the state in shared memory is lost. In that case, we exit and - * wait for a session to call InitializeMaintenanceDaemonBackend - * to properly add it to the hash. - */ - proc_exit(0); + if (databaseOid == 0) + { + char *databaseName = MainDb; + + /* + * Since we cannot query databaseOid without initializing Postgres + * first, connect to the database by name. + */ + BackgroundWorkerInitializeConnection(databaseName, NULL, 0); + + /* + * Now we have a valid MyDatabaseId. + * Insert the hash entry for the database into the Maintenance Daemon Hash. + */ + bool found = false; + + myDbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId, &found); + + if (!myDbData) + { + /* + * If an entry cannot be created, a + * return code of 1 requests a worker restart. + * Since the BackgroundWorker for the MainDb is only registered + * once during server startup, we need to retry. + */ + proc_exit(1); + } + + if (found && myDbData->workerPid != 0) + { + /* Another maintenance daemon is running. */ + + proc_exit(0); + } + + databaseOid = MyDatabaseId; + myDbData->userOid = GetSessionUserId(); + isMainDb = true; + } + else + { + myDbData = (MaintenanceDaemonDBData *) + hash_search(MaintenanceDaemonDBHash, &databaseOid, + HASH_FIND, NULL); + + if (!myDbData) + { + /* + * When the database crashes, background workers are restarted, but + * the state in shared memory is lost. In that case, we exit and + * wait for a session to call InitializeMaintenanceDaemonBackend + * to properly add it to the hash. + */ + + proc_exit(0); + } + + if (myDbData->workerPid != 0) + { + /* + * Another maintenance daemon is running. This usually happens because + * postgres restarts the daemon after a non-zero exit, and + * InitializeMaintenanceDaemonBackend started one before postgres did. + * In that case, the first one stays and the last one exits.
- */ - - proc_exit(0); - } - - before_shmem_exit(MaintenanceDaemonShmemExit, main_arg); + before_shmem_exit(MaintenanceDaemonShmemExit, ObjectIdGetDatum(databaseOid)); /* * Signal that I am the maintenance daemon now. @@ -356,25 +455,55 @@ CitusMaintenanceDaemonMain(Datum main_arg) LWLockRelease(&MaintenanceDaemonControl->lock); - /* - * Setup error context so log messages can be properly attributed. Some of - * them otherwise sound like they might be from a normal user connection. - * Do so before setting up signals etc, so we never exit without the - * context setup. - */ - ErrorContextCallback errorCallback = { 0 }; memset(&errorCallback, 0, sizeof(errorCallback)); errorCallback.callback = MaintenanceDaemonErrorContext; errorCallback.arg = (void *) myDbData; errorCallback.previous = error_context_stack; error_context_stack = &errorCallback; - elog(LOG, "starting maintenance daemon on database %u user %u", databaseOid, myDbData->userOid); - /* connect to database, after that we can actually access catalogs */ - BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0); + if (!isMainDb) + { + /* connect to database, after that we can actually access catalogs */ + BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0); + } + + return myDbData; +} + + +/* + * CitusMaintenanceDaemonMain is the maintenance daemon's main routine; it'll + * be started by the background worker infrastructure. If it errors out, + * it'll be restarted after a few seconds. + */ +void +CitusMaintenanceDaemonMain(Datum main_arg) +{ + Oid databaseOid = DatumGetObjectId(main_arg); + TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY = + TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000); + bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false; + TimestampTz lastRecoveryTime = 0; + TimestampTz lastShardCleanTime = 0; + TimestampTz lastStatStatementsPurgeTime = 0; + TimestampTz nextMetadataSyncTime = 0; + + /* state kept for the background tasks queue monitor */ + TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp(); + BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL; + bool backgroundTasksQueueWarnedForLock = false; + + + /* + * We do metadata sync in a separate background worker. We need its + * handle to be able to check its status. + */ + BackgroundWorkerHandle *metadataSyncBgwHandle = NULL; + + MaintenanceDaemonDBData *myDbData = ConnectToDatabase(databaseOid); /* make worker recognizable in pg_stat_activity */ pgstat_report_appname("Citus Maintenance Daemon"); @@ -383,7 +512,7 @@ CitusMaintenanceDaemonMain(Datum main_arg) * Terminate orphaned metadata sync daemons spawned from previously terminated * or crashed maintenanced instances. */ - SignalMetadataSyncDaemon(databaseOid, SIGTERM); + SignalMetadataSyncDaemon(MyDatabaseId, SIGTERM); /* enter main loop */ while (!got_SIGTERM) @@ -945,7 +1074,7 @@ MaintenanceDaemonShmemExit(int code, Datum arg) } -/* MaintenanceDaemonSigTermHandler calls proc_exit(0) */ +/* MaintenanceDaemonSigTermHandler sets the got_SIGTERM flag. */ static void MaintenanceDaemonSigTermHandler(SIGNAL_ARGS) {
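The comment correction in the last hunk above reflects the standard background-worker signal idiom: the handler only records that SIGTERM arrived and wakes the process, and the main loop is the one that exits cleanly. A minimal sketch of that idiom (the conventional PostgreSQL pattern, paraphrased rather than copied from the patch; the function name here is hypothetical):

#include "postgres.h"
#include "storage/latch.h"
#include "storage/proc.h"

static volatile sig_atomic_t got_SIGTERM = false;

/* SIGTERM handler: remember the request and wake the latch the main
 * loop waits on; never call proc_exit() from signal context */
static void
maintenance_daemon_sigterm(SIGNAL_ARGS)
{
	int save_errno = errno;

	got_SIGTERM = true;
	if (MyProc != NULL)
	{
		SetLatch(&MyProc->procLatch);
	}

	errno = save_errno;
}

The daemon's while (!got_SIGTERM) loop then notices the flag after each latch wakeup and performs an orderly shutdown via proc_exit(0).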
diff --git a/src/include/distributed/maintenanced.h b/src/include/distributed/maintenanced.h index de1e68883..07387a7fd 100644 --- a/src/include/distributed/maintenanced.h +++ b/src/include/distributed/maintenanced.h @@ -20,6 +20,7 @@ /* config variable for */ extern double DistributedDeadlockDetectionTimeoutFactor; +extern char *MainDb; extern void StopMaintenanceDaemon(Oid databaseId); extern void TriggerNodeMetadataSync(Oid databaseId); @@ -27,6 +28,7 @@ extern void InitializeMaintenanceDaemon(void); extern size_t MaintenanceDaemonShmemSize(void); extern void MaintenanceDaemonShmemInit(void); extern void InitializeMaintenanceDaemonBackend(void); +extern void InitializeMaintenanceDaemonForMainDb(void); extern bool LockCitusExtension(void); extern PGDLLEXPORT void CitusMaintenanceDaemonMain(Datum main_arg); diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index efa9e310f..1d293e964 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -222,7 +222,7 @@ s/(CONTEXT: PL\/pgSQL function .* line )([0-9]+)/\1XX/g s/^(PL\/pgSQL function .* line) [0-9]+ (.*)/\1 XX \2/g # normalize a test difference in multi_move_mx -s/ connection to server at "\w+" \(127\.0\.0\.1\), port [0-9]+ failed://g +s/ connection to server at "\w+" (\(127\.0\.0\.1\)|\(::1\)), port [0-9]+ failed://g # normalize differences in tablespace of new index s/pg14\.idx.*/pg14\.xxxxx/g diff --git a/src/test/regress/citus_tests/common.py b/src/test/regress/citus_tests/common.py index 907102482..53c9c7944 100644 --- a/src/test/regress/citus_tests/common.py +++ b/src/test/regress/citus_tests/common.py @@ -453,6 +453,9 @@ def cleanup_test_leftovers(nodes): for node in nodes: node.cleanup_schemas() + for node in nodes: + node.cleanup_databases() + for node in nodes: node.cleanup_users() @@ -753,6 +756,7 @@ class Postgres(QueryRunner): self.subscriptions = set() self.publications = set() self.replication_slots = set() + self.databases = set() self.schemas = set() self.users = set() @@ -993,6 +997,10 @@ class Postgres(QueryRunner): args = sql.SQL("") self.sql(sql.SQL("CREATE USER {} {}").format(sql.Identifier(name), args)) + def create_database(self, name): + self.databases.add(name) + self.sql(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(name))) + def create_schema(self, name): self.schemas.add(name) self.sql(sql.SQL("CREATE SCHEMA {}").format(sql.Identifier(name))) @@ -1020,6 +1028,12 @@ class Postgres(QueryRunner): for user in self.users: self.sql(sql.SQL("DROP USER IF EXISTS {}").format(sql.Identifier(user))) + def cleanup_databases(self): + for database in self.databases: + self.sql( + sql.SQL("DROP DATABASE IF EXISTS {}").format(sql.Identifier(database)) + ) + def cleanup_schemas(self): for schema in self.schemas: self.sql( diff --git a/src/test/regress/citus_tests/test/test_maintenancedeamon.py b/src/test/regress/citus_tests/test/test_maintenancedeamon.py new file mode 100644 index 000000000..3f6cb501e --- /dev/null +++ b/src/test/regress/citus_tests/test/test_maintenancedeamon.py @@ -0,0 +1,74 @@ +# This test checks that once citus.main_db is set and the
+# server is restarted, a Citus Maintenance Daemon for the main_db +# is launched. This should happen even if there is no query run +# in main_db yet. +import time + + +def wait_until_maintenance_deamons_start(deamoncount, cluster): + i = 0 + n = 0 + + while i < 10: + i += 1 + n = cluster.coordinator.sql_value( + "SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';" + ) + + if n == deamoncount: + break + + time.sleep(0.1) + + assert n == deamoncount + + +def test_set_maindb(cluster_factory): + cluster = cluster_factory(0) + + # Test that once citus.main_db is set to a database name + # there are two maintenance daemons running upon restart. + # One maintenance daemon for the database of the current connection + # and one for the citus.main_db. + cluster.coordinator.create_database("mymaindb") + cluster.coordinator.configure("citus.main_db='mymaindb'") + cluster.coordinator.restart() + + assert cluster.coordinator.sql_value("SHOW citus.main_db;") == "mymaindb" + + wait_until_maintenance_deamons_start(2, cluster) + + assert ( + cluster.coordinator.sql_value( + "SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon' AND datname='mymaindb';" + ) + == 1 + ) + + # Test that once citus.main_db is set to empty string + # there is only one maintenance daemon for the database + # of the current connection. + cluster.coordinator.configure("citus.main_db=''") + cluster.coordinator.restart() + assert cluster.coordinator.sql_value("SHOW citus.main_db;") == "" + + wait_until_maintenance_deamons_start(1, cluster) + + # Test that after the database designated as citus.main_db is dropped, + # the maintenance daemon for this database is terminated. + cluster.coordinator.configure("citus.main_db='mymaindb'") + cluster.coordinator.restart() + assert cluster.coordinator.sql_value("SHOW citus.main_db;") == "mymaindb" + + wait_until_maintenance_deamons_start(2, cluster) + + cluster.coordinator.sql("DROP DATABASE mymaindb;") + + wait_until_maintenance_deamons_start(1, cluster) + + assert ( + cluster.coordinator.sql_value( + "SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon' AND datname='mymaindb';" + ) + == 0 + )