Merge branch 'main' into gokhangulbiz/disable-circleci

pull/7276/head
Jelte Fennema-Nio 2023-10-31 12:00:02 +01:00 committed by GitHub
commit 3665ef73b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 421 additions and 132 deletions

View File

@ -3,3 +3,31 @@
# actually also works when debugging with vscode. Providing nice tools
# to understand the internal datastructures we are working with.
source /root/gdbpg.py
# when debugging postgres it is convenient to _always_ have a breakpoint
# trigger when an error is logged. Because .gdbinit is sourced before gdb
# is fully attached and has the sources loaded. To make sure the breakpoint
# is added when the library is loaded we temporary set the breakpoint pending
# to on. After we have added out breakpoint we revert back to the default
# configuration for breakpoint pending.
# The breakpoint is hard to read, but at entry of the function we don't have
# the level loaded in elevel. Instead we hardcode the location where the
# level of the current error is stored. Also gdb doesn't understand the
# ERROR symbol so we hardcode this to the value of ERROR. It is very unlikely
# this value will ever change in postgres, but if it does we might need to
# find a way to conditionally load the correct breakpoint.
set breakpoint pending on
break elog.c:errfinish if errordata[errordata_stack_depth].elevel == 21
set breakpoint pending auto
echo \n
echo ----------------------------------------------------------------------------------\n
echo when attaching to a postgres backend a breakpoint will be set on elog.c:errfinish \n
echo it will only break on errors being raised in postgres \n
echo \n
echo to disable this breakpoint from vscode run `-exec disable 1` in the debug console \n
echo this assumes it's the first breakpoint loaded as it is loaded from .gdbinit \n
echo this can be verified with `-exec info break`, enabling can be done with \n
echo `-exec enable 1` \n
echo ----------------------------------------------------------------------------------\n
echo \n

View File

@ -13,10 +13,33 @@ on:
pull_request:
types: [opened, reopened,synchronize]
jobs:
# Since GHA does not interpolate env varibles in matrix context, we need to
# define them in a separate job and use them in other jobs.
params:
runs-on: ubuntu-latest
name: Initialize parameters
outputs:
build_image_name: "citus/extbuilder"
test_image_name: "citus/exttester"
citusupgrade_image_name: "citus/citusupgradetester"
fail_test_image_name: "citus/failtester"
pgupgrade_image_name: "citus/pgupgradetester"
style_checker_image_name: "citus/stylechecker"
style_checker_tools_version: "0.8.18"
image_suffix: "-v9d71045"
pg14_version: "14.9"
pg15_version: "15.4"
pg16_version: "16.0"
upgrade_pg_versions: "14.9-15.4-16.0"
steps:
# Since GHA jobs needs at least one step we use a noop step here.
- name: Set up parameters
run: echo 'noop'
check-sql-snapshots:
needs: params
runs-on: ubuntu-20.04
container:
image: ${{ vars.build_image_name }}:latest
image: ${{ needs.params.outputs.build_image_name }}:latest
options: --user root
steps:
- uses: actions/checkout@v3.5.0
@ -25,9 +48,10 @@ jobs:
git config --global --add safe.directory ${GITHUB_WORKSPACE}
ci/check_sql_snapshots.sh
check-style:
needs: params
runs-on: ubuntu-20.04
container:
image: ${{ vars.style_checker_image_name }}:${{ vars.style_checker_tools_version }}${{ vars.image_suffix }}
image: ${{ needs.params.outputs.style_checker_image_name }}:${{ needs.params.outputs.style_checker_tools_version }}${{ needs.params.outputs.image_suffix }}
steps:
- name: Check Snapshots
run: |
@ -68,18 +92,19 @@ jobs:
- name: Check for missing downgrade scripts
run: ci/check_migration_files.sh
build:
needs: params
name: Build for PG ${{ matrix.pg_version}}
strategy:
fail-fast: false
matrix:
image_name:
- ${{ vars.build_image_name }}
- ${{ needs.params.outputs.build_image_name }}
image_suffix:
- ${{ vars.image_suffix}}
- ${{ needs.params.outputs.image_suffix}}
pg_version:
- ${{ vars.pg14_version }}
- ${{ vars.pg15_version }}
- ${{ vars.pg16_version }}
- ${{ needs.params.outputs.pg14_version }}
- ${{ needs.params.outputs.pg15_version }}
- ${{ needs.params.outputs.pg16_version }}
runs-on: ubuntu-20.04
container:
image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ matrix.image_suffix }}"
@ -106,11 +131,11 @@ jobs:
suite:
- regress
image_name:
- ${{ vars.test_image_name }}
- ${{ needs.params.outputs.test_image_name }}
pg_version:
- ${{ vars.pg14_version }}
- ${{ vars.pg15_version }}
- ${{ vars.pg16_version }}
- ${{ needs.params.outputs.pg14_version }}
- ${{ needs.params.outputs.pg15_version }}
- ${{ needs.params.outputs.pg16_version }}
make:
- check-split
- check-multi
@ -129,69 +154,70 @@ jobs:
- check-enterprise-isolation-logicalrep-3
include:
- make: check-failure
pg_version: ${{ vars.pg14_version }}
pg_version: ${{ needs.params.outputs.pg14_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
image_name: ${{ needs.params.outputs.fail_test_image_name }}
- make: check-failure
pg_version: ${{ vars.pg15_version }}
pg_version: ${{ needs.params.outputs.pg15_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
image_name: ${{ needs.params.outputs.fail_test_image_name }}
- make: check-failure
pg_version: ${{ vars.pg16_version }}
pg_version: ${{ needs.params.outputs.pg16_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
image_name: ${{ needs.params.outputs.fail_test_image_name }}
- make: check-enterprise-failure
pg_version: ${{ vars.pg14_version }}
pg_version: ${{ needs.params.outputs.pg14_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
image_name: ${{ needs.params.outputs.fail_test_image_name }}
- make: check-enterprise-failure
pg_version: ${{ vars.pg15_version }}
pg_version: ${{ needs.params.outputs.pg15_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
image_name: ${{ needs.params.outputs.fail_test_image_name }}
- make: check-enterprise-failure
pg_version: ${{ vars.pg16_version }}
pg_version: ${{ needs.params.outputs.pg16_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
image_name: ${{ needs.params.outputs.fail_test_image_name }}
- make: check-pytest
pg_version: ${{ vars.pg14_version }}
pg_version: ${{ needs.params.outputs.pg14_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
image_name: ${{ needs.params.outputs.fail_test_image_name }}
- make: check-pytest
pg_version: ${{ vars.pg15_version }}
pg_version: ${{ needs.params.outputs.pg15_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
image_name: ${{ needs.params.outputs.fail_test_image_name }}
- make: check-pytest
pg_version: ${{ vars.pg16_version }}
pg_version: ${{ needs.params.outputs.pg16_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
image_name: ${{ needs.params.outputs.fail_test_image_name }}
- make: installcheck
suite: cdc
image_name: ${{ vars.test_image_name }}
pg_version: ${{ vars.pg15_version }}
image_name: ${{ needs.params.outputs.test_image_name }}
pg_version: ${{ needs.params.outputs.pg15_version }}
- make: installcheck
suite: cdc
image_name: ${{ vars.test_image_name }}
pg_version: ${{ vars.pg16_version }}
image_name: ${{ needs.params.outputs.test_image_name }}
pg_version: ${{ needs.params.outputs.pg16_version }}
- make: check-query-generator
pg_version: ${{ vars.pg14_version }}
pg_version: ${{ needs.params.outputs.pg14_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
image_name: ${{ needs.params.outputs.fail_test_image_name }}
- make: check-query-generator
pg_version: ${{ vars.pg15_version }}
pg_version: ${{ needs.params.outputs.pg15_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
image_name: ${{ needs.params.outputs.fail_test_image_name }}
- make: check-query-generator
pg_version: ${{ vars.pg16_version }}
pg_version: ${{ needs.params.outputs.pg16_version }}
suite: regress
image_name: ${{ vars.fail_test_image_name }}
image_name: ${{ needs.params.outputs.fail_test_image_name }}
runs-on: ubuntu-20.04
container:
image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ vars.image_suffix }}"
image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ needs.params.outputs.image_suffix }}"
options: --user root --dns=8.8.8.8
# Due to Github creates a default network for each job, we need to use
# --dns= to have similar DNS settings as our other CI systems or local
# machines. Otherwise, we may see different results.
needs:
- params
- build
steps:
- uses: actions/checkout@v3.5.0
@ -212,19 +238,20 @@ jobs:
name: PG${{ matrix.pg_version }} - check-arbitrary-configs-${{ matrix.parallel }}
runs-on: ["self-hosted", "1ES.Pool=1es-gha-citusdata-pool"]
container:
image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ vars.image_suffix }}"
image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ needs.params.outputs.image_suffix }}"
options: --user root
needs:
- params
- build
strategy:
fail-fast: false
matrix:
image_name:
- ${{ vars.fail_test_image_name }}
- ${{ needs.params.outputs.fail_test_image_name }}
pg_version:
- ${{ vars.pg14_version }}
- ${{ vars.pg15_version }}
- ${{ vars.pg16_version }}
- ${{ needs.params.outputs.pg14_version }}
- ${{ needs.params.outputs.pg15_version }}
- ${{ needs.params.outputs.pg16_version }}
parallel: [0,1,2,3,4,5] # workaround for running 6 parallel jobs
steps:
- uses: actions/checkout@v3.5.0
@ -258,9 +285,10 @@ jobs:
name: PG${{ matrix.old_pg_major }}-PG${{ matrix.new_pg_major }} - check-pg-upgrade
runs-on: ubuntu-20.04
container:
image: "${{ vars.pgupgrade_image_name }}:${{ vars.upgrade_pg_versions }}${{ vars.image_suffix }}"
image: "${{ needs.params.outputs.pgupgrade_image_name }}:${{ needs.params.outputs.upgrade_pg_versions }}${{ needs.params.outputs.image_suffix }}"
options: --user root
needs:
- params
- build
strategy:
fail-fast: false
@ -305,12 +333,13 @@ jobs:
flags: ${{ env.old_pg_major }}_${{ env.new_pg_major }}_upgrade
codecov_token: ${{ secrets.CODECOV_TOKEN }}
test-citus-upgrade:
name: PG${{ vars.pg14_version }} - check-citus-upgrade
name: PG${{ needs.params.outputs.pg14_version }} - check-citus-upgrade
runs-on: ubuntu-20.04
container:
image: "${{ vars.citusupgrade_image_name }}:${{ vars.pg14_version }}${{ vars.image_suffix }}"
image: "${{ needs.params.outputs.citusupgrade_image_name }}:${{ needs.params.outputs.pg14_version }}${{ needs.params.outputs.image_suffix }}"
options: --user root
needs:
- params
- build
steps:
- uses: actions/checkout@v3.5.0
@ -354,8 +383,9 @@ jobs:
CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }}
runs-on: ubuntu-20.04
container:
image: ${{ vars.test_image_name }}:${{ vars.pg16_version }}${{ vars.image_suffix }}
image: ${{ needs.params.outputs.test_image_name }}:${{ needs.params.outputs.pg16_version }}${{ needs.params.outputs.image_suffix }}
needs:
- params
- test-citus
- test-arbitrary-configs
- test-citus-upgrade
@ -448,11 +478,12 @@ jobs:
name: Test flakyness
runs-on: ubuntu-20.04
container:
image: ${{ vars.fail_test_image_name }}:${{ vars.pg16_version }}${{ vars.image_suffix }}
image: ${{ needs.params.outputs.fail_test_image_name }}:${{ needs.params.outputs.pg16_version }}${{ needs.params.outputs.image_suffix }}
options: --user root
env:
runs: 8
needs:
- params
- build
- test-flakyness-pre
- prepare_parallelization_matrix_32

View File

@ -24,14 +24,14 @@ jobs:
- name: Get Postgres Versions
id: get-postgres-versions
run: |
# Postgres versions are stored in .circleci/config.yml file in "build-[pg-version] format. Below command
# extracts the versions and get the unique values.
pg_versions=`grep -Eo 'build-[[:digit:]]{2}' .circleci/config.yml|sed -e "s/^build-//"|sort|uniq|tr '\n' ','| head -c -1`
# Postgres versions are stored in .github/workflows/build_and_test.yml file in "pg[pg-version]_version"
# format. Below command extracts the versions and get the unique values.
pg_versions=$(cat .github/workflows/build_and_test.yml | grep -oE 'pg[0-9]+_version: "[0-9.]+"' | sed -E 's/pg([0-9]+)_version: "([0-9.]+)"/\1/g' | sort | uniq | tr '\n', ',')
pg_versions_array="[ ${pg_versions} ]"
echo "Supported PG Versions: ${pg_versions_array}"
# Below line is needed to set the output variable to be used in the next job
echo "pg_versions=${pg_versions_array}" >> $GITHUB_OUTPUT
shell: bash
rpm_build_tests:
name: rpm_build_tests
needs: get_postgres_versions_from_file

View File

@ -481,6 +481,7 @@ _PG_init(void)
#endif
InitializeMaintenanceDaemon();
InitializeMaintenanceDaemonForMainDb();
/* initialize coordinated transaction management */
InitializeTransactionManagement();
@ -1820,6 +1821,16 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE | GUC_UNIT_MS,
NULL, NULL, NULL);
DefineCustomStringVariable(
"citus.main_db",
gettext_noop("Which database is designated as the main_db"),
NULL,
&MainDb,
"",
PGC_POSTMASTER,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.max_adaptive_executor_pool_size",
gettext_noop("Sets the maximum number of connections per worker node used by "

View File

@ -99,6 +99,7 @@ int Recover2PCInterval = 60000;
int DeferShardDeleteInterval = 15000;
int BackgroundTaskQueueCheckInterval = 5000;
int MaxBackgroundTaskExecutors = 4;
char *MainDb = "";
/* config variables for metadata sync timeout */
int MetadataSyncInterval = 60000;
@ -112,7 +113,7 @@ static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL;
* activated.
*/
static HTAB *MaintenanceDaemonDBHash;
static ErrorContextCallback errorCallback = { 0 };
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false;
@ -125,6 +126,8 @@ static void MaintenanceDaemonShmemExit(int code, Datum arg);
static void MaintenanceDaemonErrorContext(void *arg);
static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
static void WarnMaintenanceDaemonNotStarted(void);
static MaintenanceDaemonDBData * GetMaintenanceDaemonDBHashEntry(Oid databaseId,
bool *found);
/*
* InitializeMaintenanceDaemon, called at server start, is responsible for
@ -139,6 +142,82 @@ InitializeMaintenanceDaemon(void)
}
/*
* GetMaintenanceDaemonDBHashEntry searches the MaintenanceDaemonDBHash for the
* databaseId. It returns the entry if found or creates a new entry and initializes
* the value with zeroes.
*/
MaintenanceDaemonDBData *
GetMaintenanceDaemonDBHashEntry(Oid databaseId, bool *found)
{
MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
MaintenanceDaemonDBHash,
&MyDatabaseId,
HASH_ENTER_NULL,
found);
if (!dbData)
{
elog(LOG,
"cannot create or find the maintenance deamon hash entry for database %u",
databaseId);
return NULL;
}
if (!*found)
{
/* ensure the values in MaintenanceDaemonDBData are zero */
memset(((char *) dbData) + sizeof(Oid), 0,
sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
}
return dbData;
}
/*
* InitializeMaintenanceDaemonForMainDb is called in _PG_Init
* at which stage we are not in a transaction or have databaseOid
*/
void
InitializeMaintenanceDaemonForMainDb(void)
{
if (strcmp(MainDb, "") == 0)
{
elog(LOG, "There is no designated Main database.");
return;
}
BackgroundWorker worker;
memset(&worker, 0, sizeof(worker));
strcpy_s(worker.bgw_name, sizeof(worker.bgw_name),
"Citus Maintenance Daemon for Main DB");
/* request ability to connect to target database */
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
/*
* No point in getting started before able to run query, but we do
* want to get started on Hot-Standby.
*/
worker.bgw_start_time = BgWorkerStart_ConsistentState;
/* Restart after a bit after errors, but don't bog the system. */
worker.bgw_restart_time = 5;
strcpy_s(worker.bgw_library_name,
sizeof(worker.bgw_library_name), "citus");
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
"CitusMaintenanceDaemonMain");
worker.bgw_main_arg = (Datum) 0;
RegisterBackgroundWorker(&worker);
}
/*
* InitializeMaintenanceDaemonBackend, called at backend start and
* configuration changes, is responsible for starting a per-database
@ -148,31 +227,20 @@ void
InitializeMaintenanceDaemonBackend(void)
{
Oid extensionOwner = CitusExtensionOwner();
bool found;
bool found = false;
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
MaintenanceDaemonDBHash,
&MyDatabaseId,
HASH_ENTER_NULL,
&found);
MaintenanceDaemonDBData *dbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId,
&found);
if (dbData == NULL)
{
WarnMaintenanceDaemonNotStarted();
LWLockRelease(&MaintenanceDaemonControl->lock);
return;
}
if (!found)
{
/* ensure the values in MaintenanceDaemonDBData are zero */
memset(((char *) dbData) + sizeof(Oid), 0,
sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
}
if (IsMaintenanceDaemon)
{
/*
@ -271,66 +339,97 @@ WarnMaintenanceDaemonNotStarted(void)
/*
* CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll
* be started by the background worker infrastructure. If it errors out,
* it'll be restarted after a few seconds.
* ConnectToDatabase connects to the database for the given databaseOid.
* if databaseOid is 0, connects to MainDb and then creates a hash entry.
* If a hash entry cannot be created for MainDb it exits the process requesting a restart.
* However for regular databases, it exits without requesting a restart since another
* subsequent backend is expected to start the Maintenance Daemon.
* If the found hash entry has a valid workerPid, it exits
* without requesting a restart since there is already a daemon running.
*/
void
CitusMaintenanceDaemonMain(Datum main_arg)
static MaintenanceDaemonDBData *
ConnectToDatabase(Oid databaseOid)
{
Oid databaseOid = DatumGetObjectId(main_arg);
TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
TimestampTz lastRecoveryTime = 0;
TimestampTz lastShardCleanTime = 0;
TimestampTz lastStatStatementsPurgeTime = 0;
TimestampTz nextMetadataSyncTime = 0;
MaintenanceDaemonDBData *myDbData = NULL;
/* state kept for the background tasks queue monitor */
TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL;
bool backgroundTasksQueueWarnedForLock = false;
/*
* We do metadata sync in a separate background worker. We need its
* handle to be able to check its status.
*/
BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;
bool isMainDb = false;
/*
* Look up this worker's configuration.
*/
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);
MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *)
hash_search(MaintenanceDaemonDBHash, &databaseOid,
HASH_FIND, NULL);
if (!myDbData)
{
/*
* When the database crashes, background workers are restarted, but
* the state in shared memory is lost. In that case, we exit and
* wait for a session to call InitializeMaintenanceDaemonBackend
* to properly add it to the hash.
*/
proc_exit(0);
if (databaseOid == 0)
{
char *databaseName = MainDb;
/*
* Since we cannot query databaseOid without initializing Postgres
* first, connect to the database by name.
*/
BackgroundWorkerInitializeConnection(databaseName, NULL, 0);
/*
* Now we have a valid MyDatabaseId.
* Insert the hash entry for the database to the Maintenance Deamon Hash.
*/
bool found = false;
myDbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId, &found);
if (!myDbData)
{
/*
* If an entry cannot be created,
* return code of 1 requests worker restart
* Since BackgroundWorker for the MainDb is only registered
* once during server startup, we need to retry.
*/
proc_exit(1);
}
if (found && myDbData->workerPid != 0)
{
/* Another maintenance daemon is running.*/
proc_exit(0);
}
databaseOid = MyDatabaseId;
myDbData->userOid = GetSessionUserId();
isMainDb = true;
}
else
{
myDbData = (MaintenanceDaemonDBData *)
hash_search(MaintenanceDaemonDBHash, &databaseOid,
HASH_FIND, NULL);
if (!myDbData)
{
/*
* When the database crashes, background workers are restarted, but
* the state in shared memory is lost. In that case, we exit and
* wait for a session to call InitializeMaintenanceDaemonBackend
* to properly add it to the hash.
*/
proc_exit(0);
}
if (myDbData->workerPid != 0)
{
/*
* Another maintenance daemon is running. This usually happens because
* postgres restarts the daemon after an non-zero exit, and
* InitializeMaintenanceDaemonBackend started one before postgres did.
* In that case, the first one stays and the last one exits.
*/
proc_exit(0);
}
}
if (myDbData->workerPid != 0)
{
/*
* Another maintenance daemon is running. This usually happens because
* postgres restarts the daemon after an non-zero exit, and
* InitializeMaintenanceDaemonBackend started one before postgres did.
* In that case, the first one stays and the last one exits.
*/
proc_exit(0);
}
before_shmem_exit(MaintenanceDaemonShmemExit, main_arg);
before_shmem_exit(MaintenanceDaemonShmemExit, ObjectIdGetDatum(databaseOid));
/*
* Signal that I am the maintenance daemon now.
@ -356,25 +455,55 @@ CitusMaintenanceDaemonMain(Datum main_arg)
LWLockRelease(&MaintenanceDaemonControl->lock);
/*
* Setup error context so log messages can be properly attributed. Some of
* them otherwise sound like they might be from a normal user connection.
* Do so before setting up signals etc, so we never exit without the
* context setup.
*/
ErrorContextCallback errorCallback = { 0 };
memset(&errorCallback, 0, sizeof(errorCallback));
errorCallback.callback = MaintenanceDaemonErrorContext;
errorCallback.arg = (void *) myDbData;
errorCallback.previous = error_context_stack;
error_context_stack = &errorCallback;
elog(LOG, "starting maintenance daemon on database %u user %u",
databaseOid, myDbData->userOid);
/* connect to database, after that we can actually access catalogs */
BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
if (!isMainDb)
{
/* connect to database, after that we can actually access catalogs */
BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
}
return myDbData;
}
/*
* CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll
* be started by the background worker infrastructure. If it errors out,
* it'll be restarted after a few seconds.
*/
void
CitusMaintenanceDaemonMain(Datum main_arg)
{
Oid databaseOid = DatumGetObjectId(main_arg);
TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
TimestampTz lastRecoveryTime = 0;
TimestampTz lastShardCleanTime = 0;
TimestampTz lastStatStatementsPurgeTime = 0;
TimestampTz nextMetadataSyncTime = 0;
/* state kept for the background tasks queue monitor */
TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL;
bool backgroundTasksQueueWarnedForLock = false;
/*
* We do metadata sync in a separate background worker. We need its
* handle to be able to check its status.
*/
BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;
MaintenanceDaemonDBData *myDbData = ConnectToDatabase(databaseOid);
/* make worker recognizable in pg_stat_activity */
pgstat_report_appname("Citus Maintenance Daemon");
@ -383,7 +512,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
* Terminate orphaned metadata sync daemons spawned from previously terminated
* or crashed maintenanced instances.
*/
SignalMetadataSyncDaemon(databaseOid, SIGTERM);
SignalMetadataSyncDaemon(MyDatabaseId, SIGTERM);
/* enter main loop */
while (!got_SIGTERM)
@ -945,7 +1074,7 @@ MaintenanceDaemonShmemExit(int code, Datum arg)
}
/* MaintenanceDaemonSigTermHandler calls proc_exit(0) */
/* MaintenanceDaemonSigTermHandler sets the got_SIGTERM flag.*/
static void
MaintenanceDaemonSigTermHandler(SIGNAL_ARGS)
{

View File

@ -20,6 +20,7 @@
/* config variable for */
extern double DistributedDeadlockDetectionTimeoutFactor;
extern char *MainDb;
extern void StopMaintenanceDaemon(Oid databaseId);
extern void TriggerNodeMetadataSync(Oid databaseId);
@ -27,6 +28,7 @@ extern void InitializeMaintenanceDaemon(void);
extern size_t MaintenanceDaemonShmemSize(void);
extern void MaintenanceDaemonShmemInit(void);
extern void InitializeMaintenanceDaemonBackend(void);
extern void InitializeMaintenanceDaemonForMainDb(void);
extern bool LockCitusExtension(void);
extern PGDLLEXPORT void CitusMaintenanceDaemonMain(Datum main_arg);

View File

@ -222,7 +222,7 @@ s/(CONTEXT: PL\/pgSQL function .* line )([0-9]+)/\1XX/g
s/^(PL\/pgSQL function .* line) [0-9]+ (.*)/\1 XX \2/g
# normalize a test difference in multi_move_mx
s/ connection to server at "\w+" \(127\.0\.0\.1\), port [0-9]+ failed://g
s/ connection to server at "\w+" (\(127\.0\.0\.1\)|\(::1\)), port [0-9]+ failed://g
# normalize differences in tablespace of new index
s/pg14\.idx.*/pg14\.xxxxx/g

View File

@ -453,6 +453,9 @@ def cleanup_test_leftovers(nodes):
for node in nodes:
node.cleanup_schemas()
for node in nodes:
node.cleanup_databases()
for node in nodes:
node.cleanup_users()
@ -753,6 +756,7 @@ class Postgres(QueryRunner):
self.subscriptions = set()
self.publications = set()
self.replication_slots = set()
self.databases = set()
self.schemas = set()
self.users = set()
@ -993,6 +997,10 @@ class Postgres(QueryRunner):
args = sql.SQL("")
self.sql(sql.SQL("CREATE USER {} {}").format(sql.Identifier(name), args))
def create_database(self, name):
self.databases.add(name)
self.sql(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(name)))
def create_schema(self, name):
self.schemas.add(name)
self.sql(sql.SQL("CREATE SCHEMA {}").format(sql.Identifier(name)))
@ -1020,6 +1028,12 @@ class Postgres(QueryRunner):
for user in self.users:
self.sql(sql.SQL("DROP USER IF EXISTS {}").format(sql.Identifier(user)))
def cleanup_databases(self):
for database in self.databases:
self.sql(
sql.SQL("DROP DATABASE IF EXISTS {}").format(sql.Identifier(database))
)
def cleanup_schemas(self):
for schema in self.schemas:
self.sql(

View File

@ -0,0 +1,74 @@
# This test checks that once citus.main_db is set and the
# server is restarted. A Citus Maintenance Daemon for the main_db
# is launched. This should happen even if there is no query run
# in main_db yet.
import time
def wait_until_maintenance_deamons_start(deamoncount, cluster):
i = 0
n = 0
while i < 10:
i += 1
n = cluster.coordinator.sql_value(
"SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';"
)
if n == deamoncount:
break
time.sleep(0.1)
assert n == deamoncount
def test_set_maindb(cluster_factory):
cluster = cluster_factory(0)
# Test that once citus.main_db is set to a database name
# there are two maintenance deamons running upon restart.
# One maintenance deamon for the database of the current connection
# and one for the citus.main_db.
cluster.coordinator.create_database("mymaindb")
cluster.coordinator.configure("citus.main_db='mymaindb'")
cluster.coordinator.restart()
assert cluster.coordinator.sql_value("SHOW citus.main_db;") == "mymaindb"
wait_until_maintenance_deamons_start(2, cluster)
assert (
cluster.coordinator.sql_value(
"SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon' AND datname='mymaindb';"
)
== 1
)
# Test that once citus.main_db is set to empty string
# there is only one maintenance deamon for the database
# of the current connection.
cluster.coordinator.configure("citus.main_db=''")
cluster.coordinator.restart()
assert cluster.coordinator.sql_value("SHOW citus.main_db;") == ""
wait_until_maintenance_deamons_start(1, cluster)
# Test that after citus.main_db is dropped. The maintenance
# deamon for this database is terminated.
cluster.coordinator.configure("citus.main_db='mymaindb'")
cluster.coordinator.restart()
assert cluster.coordinator.sql_value("SHOW citus.main_db;") == "mymaindb"
wait_until_maintenance_deamons_start(2, cluster)
cluster.coordinator.sql("DROP DATABASE mymaindb;")
wait_until_maintenance_deamons_start(1, cluster)
assert (
cluster.coordinator.sql_value(
"SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon' AND datname='mymaindb';"
)
== 0
)