mirror of https://github.com/citusdata/citus.git
Merge remote-tracking branch 'origin/main' into flaky-1
commit 5967d27dc5

.circleci/config.yml (1128): file diff suppressed because it is too large
@@ -3,3 +3,31 @@
# actually also works when debugging with vscode. Providing nice tools
# to understand the internal datastructures we are working with.
source /root/gdbpg.py

# when debugging postgres it is convenient to _always_ have a breakpoint
# trigger when an error is logged. Because .gdbinit is sourced before gdb
# is fully attached and has the sources loaded. To make sure the breakpoint
# is added when the library is loaded we temporarily set the breakpoint pending
# to on. After we have added our breakpoint we revert back to the default
# configuration for breakpoint pending.
# The breakpoint is hard to read, but at entry of the function we don't have
# the level loaded in elevel. Instead we hardcode the location where the
# level of the current error is stored. Also gdb doesn't understand the
# ERROR symbol so we hardcode this to the value of ERROR. It is very unlikely
# this value will ever change in postgres, but if it does we might need to
# find a way to conditionally load the correct breakpoint.
set breakpoint pending on
break elog.c:errfinish if errordata[errordata_stack_depth].elevel == 21
set breakpoint pending auto

echo \n
echo ----------------------------------------------------------------------------------\n
echo when attaching to a postgres backend a breakpoint will be set on elog.c:errfinish \n
echo it will only break on errors being raised in postgres \n
echo \n
echo to disable this breakpoint from vscode run `-exec disable 1` in the debug console \n
echo this assumes it's the first breakpoint loaded as it is loaded from .gdbinit \n
echo this can be verified with `-exec info break`, enabling can be done with \n
echo `-exec enable 1` \n
echo ----------------------------------------------------------------------------------\n
echo \n
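
For reference, the hardcoded 21 in the breakpoint condition is PostgreSQL's ERROR log level. A minimal sketch of the elevel numbering in Python, assuming the values from src/include/utils/elog.h in PostgreSQL 14 and later (verify against the headers of the version you are debugging):

# Hedged sketch: approximate elevel values from PostgreSQL's elog.h (PG 14+).
# Verify against src/include/utils/elog.h before relying on them.
ELOG_LEVELS = {
    "DEBUG5": 10, "DEBUG4": 11, "DEBUG3": 12, "DEBUG2": 13, "DEBUG1": 14,
    "LOG": 15, "LOG_SERVER_ONLY": 16, "INFO": 17, "NOTICE": 18,
    "WARNING": 19, "WARNING_CLIENT_ONLY": 20, "ERROR": 21,
    "FATAL": 22, "PANIC": 23,
}

def breakpoint_condition(level_name: str) -> str:
    """Build the gdb condition used above for a given log level."""
    level = ELOG_LEVELS[level_name]
    return f"errordata[errordata_stack_depth].elevel == {level}"

print(breakpoint_condition("ERROR"))  # -> errordata[errordata_stack_depth].elevel == 21
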
@@ -10,6 +10,10 @@ on:
required: false
default: false
type: boolean
push:
branches:
- "main"
- "release-*"
pull_request:
types: [opened, reopened,synchronize]
jobs:

@@ -27,9 +31,9 @@ jobs:
style_checker_image_name: "citus/stylechecker"
style_checker_tools_version: "0.8.18"
image_suffix: "-v9d71045"
pg14_version: "14.9"
pg15_version: "15.4"
pg16_version: "16.0"
pg14_version: '{ "major": "14", "full": "14.9" }'
pg15_version: '{ "major": "15", "full": "15.4" }'
pg16_version: '{ "major": "16", "full": "16.0" }'
upgrade_pg_versions: "14.9-15.4-16.0"
steps:
# Since GHA jobs needs at least one step we use a noop step here.
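
The hunk above switches the pgNN_version workflow parameters from plain version strings to small JSON objects, so later jobs can pick either the major or the full version via GitHub Actions' fromJson(). A minimal Python sketch of the same lookup, assuming the JSON shape shown above; image_name is an illustrative placeholder, the workflow takes it from needs.params.outputs.*:

import json

# Value stored in the workflow parameter after this change.
pg14_version = '{ "major": "14", "full": "14.9" }'

parsed = json.loads(pg14_version)

# Illustrative placeholders; the workflow reads these from its params job outputs.
image_name = "example/test-image"
image_suffix = "-v9d71045"

container_image = f"{image_name}:{parsed['full']}{image_suffix}"  # fromJson(...).full
job_name = f"Build for PG{parsed['major']}"                       # fromJson(...).major

print(container_image)
print(job_name)
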
@@ -93,7 +97,7 @@
run: ci/check_migration_files.sh
build:
needs: params
name: Build for PG ${{ matrix.pg_version}}
name: Build for PG${{ fromJson(matrix.pg_version).major }}
strategy:
fail-fast: false
matrix:

@@ -107,7 +111,7 @@
- ${{ needs.params.outputs.pg16_version }}
runs-on: ubuntu-20.04
container:
image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ matrix.image_suffix }}"
image: "${{ matrix.image_name }}:${{ fromJson(matrix.pg_version).full }}${{ matrix.image_suffix }}"
options: --user root
steps:
- uses: actions/checkout@v3.5.0

@@ -124,7 +128,7 @@
./build-${{ env.PG_MAJOR }}/*
./install-${{ env.PG_MAJOR }}.tar
test-citus:
name: PG${{ matrix.pg_version }} - ${{ matrix.make }}
name: PG${{ fromJson(matrix.pg_version).major }} - ${{ matrix.make }}
strategy:
fail-fast: false
matrix:

@@ -211,7 +215,7 @@
image_name: ${{ needs.params.outputs.fail_test_image_name }}
runs-on: ubuntu-20.04
container:
image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ needs.params.outputs.image_suffix }}"
image: "${{ matrix.image_name }}:${{ fromJson(matrix.pg_version).full }}${{ needs.params.outputs.image_suffix }}"
options: --user root --dns=8.8.8.8
# Due to Github creates a default network for each job, we need to use
# --dns= to have similar DNS settings as our other CI systems or local

@@ -228,17 +232,17 @@
- uses: "./.github/actions/save_logs_and_results"
if: always()
with:
folder: ${{ matrix.pg_version }}_${{ matrix.make }}
folder: ${{ fromJson(matrix.pg_version).major }}_${{ matrix.make }}
- uses: "./.github/actions/upload_coverage"
if: always()
with:
flags: ${{ env.PG_MAJOR }}_${{ matrix.suite }}_${{ matrix.make }}
codecov_token: ${{ secrets.CODECOV_TOKEN }}
test-arbitrary-configs:
name: PG${{ matrix.pg_version }} - check-arbitrary-configs-${{ matrix.parallel }}
name: PG${{ fromJson(matrix.pg_version).major }} - check-arbitrary-configs-${{ matrix.parallel }}
runs-on: ["self-hosted", "1ES.Pool=1es-gha-citusdata-pool"]
container:
image: "${{ matrix.image_name }}:${{ matrix.pg_version }}${{ needs.params.outputs.image_suffix }}"
image: "${{ matrix.image_name }}:${{ fromJson(matrix.pg_version).full }}${{ needs.params.outputs.image_suffix }}"
options: --user root
needs:
- params

@@ -333,10 +337,10 @@
flags: ${{ env.old_pg_major }}_${{ env.new_pg_major }}_upgrade
codecov_token: ${{ secrets.CODECOV_TOKEN }}
test-citus-upgrade:
name: PG${{ needs.params.outputs.pg14_version }} - check-citus-upgrade
name: PG${{ fromJson(needs.params.outputs.pg14_version).major }} - check-citus-upgrade
runs-on: ubuntu-20.04
container:
image: "${{ needs.params.outputs.citusupgrade_image_name }}:${{ needs.params.outputs.pg14_version }}${{ needs.params.outputs.image_suffix }}"
image: "${{ needs.params.outputs.citusupgrade_image_name }}:${{ fromJson(needs.params.outputs.pg14_version).full }}${{ needs.params.outputs.image_suffix }}"
options: --user root
needs:
- params

@@ -383,7 +387,7 @@
CC_TEST_REPORTER_ID: ${{ secrets.CC_TEST_REPORTER_ID }}
runs-on: ubuntu-20.04
container:
image: ${{ needs.params.outputs.test_image_name }}:${{ needs.params.outputs.pg16_version }}${{ needs.params.outputs.image_suffix }}
image: ${{ needs.params.outputs.test_image_name }}:${{ fromJson(needs.params.outputs.pg16_version).full }}${{ needs.params.outputs.image_suffix }}
needs:
- params
- test-citus

@@ -478,7 +482,7 @@
name: Test flakyness
runs-on: ubuntu-20.04
container:
image: ${{ needs.params.outputs.fail_test_image_name }}:${{ needs.params.outputs.pg16_version }}${{ needs.params.outputs.image_suffix }}
image: ${{ needs.params.outputs.fail_test_image_name }}:${{ fromJson(needs.params.outputs.pg16_version).full }}${{ needs.params.outputs.image_suffix }}
options: --user root
env:
runs: 8
@@ -24,9 +24,11 @@ jobs:
- name: Get Postgres Versions
id: get-postgres-versions
run: |
# Postgres versions are stored in .github/workflows/build_and_test.yml file in "pg[pg-version]_version"
# format. Below command extracts the versions and get the unique values.
pg_versions=$(cat .github/workflows/build_and_test.yml | grep -oE 'pg[0-9]+_version: "[0-9.]+"' | sed -E 's/pg([0-9]+)_version: "([0-9.]+)"/\1/g' | sort | uniq | tr '\n', ',')
set -euxo pipefail
# Postgres versions are stored in .github/workflows/build_and_test.yml
# file in json strings with major and full keys.
# Below command extracts the versions and get the unique values.
pg_versions=$(cat .github/workflows/build_and_test.yml | grep -oE '"major": "[0-9]+", "full": "[0-9.]+"' | sed -E 's/"major": "([0-9]+)", "full": "([0-9.]+)"/\1/g' | sort | uniq | tr '\n', ',')
pg_versions_array="[ ${pg_versions} ]"
echo "Supported PG Versions: ${pg_versions_array}"
# Below line is needed to set the output variable to be used in the next job
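
The updated step greps the new JSON-style version strings out of build_and_test.yml and keeps only the major numbers. A minimal Python sketch of the same extraction, assuming the workflow layout shown above (the real step uses grep/sed/sort/uniq/tr):

import re

# Sample of what the workflow file contains after this change.
workflow_text = '''
pg14_version: '{ "major": "14", "full": "14.9" }'
pg15_version: '{ "major": "15", "full": "15.4" }'
pg16_version: '{ "major": "16", "full": "16.0" }'
'''

majors = sorted(set(re.findall(r'"major": "([0-9]+)", "full": "[0-9.]+"', workflow_text)))
pg_versions_array = "[ " + ",".join(majors) + " ]"
print(pg_versions_array)  # -> [ 14,15,16 ]
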
@@ -14,8 +14,8 @@ ci_scripts=$(
grep -v -E '^(ci_helpers.sh|fix_style.sh)$'
)
for script in $ci_scripts; do
if ! grep "\\bci/$script\\b" .circleci/config.yml > /dev/null; then
echo "ERROR: CI script with name \"$script\" is not actually used in .circleci/config.yml"
if ! grep "\\bci/$script\\b" -r .github > /dev/null; then
echo "ERROR: CI script with name \"$script\" is not actually used in .github folder"
exit 1
fi
if ! grep "^## \`$script\`\$" ci/README.md > /dev/null; then
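
This hunk repoints the check from .circleci/config.yml to the .github folder: every ci/ script must be referenced somewhere under .github. A rough Python equivalent of that search, assuming it runs from the repository root:

import re
import sys
from pathlib import Path

def script_is_used(script_name: str, search_root: str = ".github") -> bool:
    """Return True if ci/<script_name> is mentioned in any file under search_root."""
    pattern = re.compile(r"\bci/" + re.escape(script_name) + r"\b")
    for path in Path(search_root).rglob("*"):
        if path.is_file() and pattern.search(path.read_text(errors="ignore")):
            return True
    return False

if not script_is_used("check_migration_files.sh"):
    print("ERROR: CI script is not actually used in .github folder")
    sys.exit(1)
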
@@ -1,96 +0,0 @@
#!/bin/bash

# Testing this script locally requires you to set the following environment
# variables:
# CIRCLE_BRANCH, GIT_USERNAME and GIT_TOKEN

# fail if trying to reference a variable that is not set.
set -u
# exit immediately if a command fails
set -e
# Fail on pipe failures
set -o pipefail

PR_BRANCH="${CIRCLE_BRANCH}"
ENTERPRISE_REMOTE="https://${GIT_USERNAME}:${GIT_TOKEN}@github.com/citusdata/citus-enterprise"

# shellcheck disable=SC1091
source ci/ci_helpers.sh

# List executed commands. This is done so debugging this script is easier when
# it fails. It's explicitly done after git remote add so username and password
# are not shown in CI output (even though it's also filtered out by CircleCI)
set -x

check_compile () {
echo "INFO: checking if merged code can be compiled"
./configure --without-libcurl
make -j10
}

# Clone current git repo (which should be community) to a temporary working
# directory and go there
GIT_DIR_ROOT="$(git rev-parse --show-toplevel)"
TMP_GIT_DIR="$(mktemp --directory -t citus-merge-check.XXXXXXXXX)"
git clone "$GIT_DIR_ROOT" "$TMP_GIT_DIR"
cd "$TMP_GIT_DIR"

# Fails in CI without this
git config user.email "citus-bot@microsoft.com"
git config user.name "citus bot"

# Disable "set -x" temporarily, because $ENTERPRISE_REMOTE contains passwords
{ set +x ; } 2> /dev/null
git remote add enterprise "$ENTERPRISE_REMOTE"
set -x

git remote set-url --push enterprise no-pushing

# Fetch enterprise-master
git fetch enterprise enterprise-master

git checkout "enterprise/enterprise-master"

if git merge --no-commit "origin/$PR_BRANCH"; then
echo "INFO: community PR branch could be merged into enterprise-master"
# check that we can compile after the merge
if check_compile; then
exit 0
fi

echo "WARN: Failed to compile after community PR branch was merged into enterprise"
fi

# undo partial merge
git merge --abort

# If we have a conflict on enterprise merge on the master branch, we have a problem.
# Provide an error message to indicate that enterprise merge is needed to fix this check.
if [[ $PR_BRANCH = master ]]; then
echo "ERROR: Master branch has merge conflicts with enterprise-master."
echo "Try re-running this CI job after merging your changes into enterprise-master."
exit 1
fi

if ! git fetch enterprise "$PR_BRANCH" ; then
echo "ERROR: enterprise/$PR_BRANCH was not found and community PR branch could not be merged into enterprise-master"
exit 1
fi

# Show the top commit of the enterprise PR branch to make debugging easier
git log -n 1 "enterprise/$PR_BRANCH"

# Check that this branch contains the top commit of the current community PR
# branch. If it does not it means it's not up to date with the current PR, so
# the enterprise branch should be updated.
if ! git merge-base --is-ancestor "origin/$PR_BRANCH" "enterprise/$PR_BRANCH" ; then
echo "ERROR: enterprise/$PR_BRANCH is not up to date with community PR branch"
exit 1
fi

# Now check if we can merge the enterprise PR into enterprise-master without
# issues.
git merge --no-commit "enterprise/$PR_BRANCH"
# check that we can compile after the merge
check_compile
@@ -24,6 +24,7 @@
#include "access/sysattr.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/index.h"
#include "catalog/indexing.h"
#include "catalog/pg_authid.h"
#include "catalog/pg_constraint.h"

@@ -88,11 +89,11 @@ static uint64 * AllocateUint64(uint64 value);
static void RecordDistributedRelationDependencies(Oid distributedRelationId);
static GroupShardPlacement * TupleToGroupShardPlacement(TupleDesc tupleDesc,
HeapTuple heapTuple);
static bool DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType,
bool failOnError, uint64 *tableSize);
static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType, bool failOnError,
uint64 *tableSize);
static bool DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType,
bool failOnError, uint64 *relationSize);
static bool DistributedRelationSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType, bool failOnError,
uint64 *relationSize);
static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId);
static char * GenerateShardIdNameValuesForShardList(List *shardIntervalList,
bool firstValue);

@@ -282,7 +283,7 @@ citus_shard_sizes(PG_FUNCTION_ARGS)

/*
* citus_total_relation_size accepts a table name and returns a distributed table
* citus_total_relation_size accepts a distributed table name and returns a distributed table
* and its indexes' total relation size.
*/
Datum

@@ -294,20 +295,20 @@ citus_total_relation_size(PG_FUNCTION_ARGS)
bool failOnError = PG_GETARG_BOOL(1);

SizeQueryType sizeQueryType = TOTAL_RELATION_SIZE;
uint64 tableSize = 0;
uint64 relationSize = 0;

if (!DistributedTableSize(relationId, sizeQueryType, failOnError, &tableSize))
if (!DistributedRelationSize(relationId, sizeQueryType, failOnError, &relationSize))
{
Assert(!failOnError);
PG_RETURN_NULL();
}

PG_RETURN_INT64(tableSize);
PG_RETURN_INT64(relationSize);
}

/*
* citus_table_size accepts a table name and returns a distributed table's total
* citus_table_size accepts a distributed table name and returns a distributed table's total
* relation size.
*/
Datum

@@ -318,21 +319,24 @@ citus_table_size(PG_FUNCTION_ARGS)
Oid relationId = PG_GETARG_OID(0);
bool failOnError = true;
SizeQueryType sizeQueryType = TABLE_SIZE;
uint64 tableSize = 0;
uint64 relationSize = 0;

if (!DistributedTableSize(relationId, sizeQueryType, failOnError, &tableSize))
/* We do not check if relation is really a table, like PostgreSQL is doing. */
if (!DistributedRelationSize(relationId, sizeQueryType, failOnError, &relationSize))
{
Assert(!failOnError);
PG_RETURN_NULL();
}

PG_RETURN_INT64(tableSize);
PG_RETURN_INT64(relationSize);
}

/*
* citus_relation_size accept a table name and returns a relation's 'main'
* citus_relation_size accept a distributed relation name and returns a relation's 'main'
* fork's size.
*
* Input relation is allowed to be an index on a distributed table too.
*/
Datum
citus_relation_size(PG_FUNCTION_ARGS)

@@ -344,7 +348,7 @@ citus_relation_size(PG_FUNCTION_ARGS)
SizeQueryType sizeQueryType = RELATION_SIZE;
uint64 relationSize = 0;

if (!DistributedTableSize(relationId, sizeQueryType, failOnError, &relationSize))
if (!DistributedRelationSize(relationId, sizeQueryType, failOnError, &relationSize))
{
Assert(!failOnError);
PG_RETURN_NULL();

@@ -506,13 +510,16 @@ ReceiveShardIdAndSizeResults(List *connectionList, Tuplestorestate *tupleStore,

/*
* DistributedTableSize is helper function for each kind of citus size functions.
* It first checks whether the table is distributed and size query can be run on
* it. Connection to each node has to be established to get the size of the table.
* DistributedRelationSize is helper function for each kind of citus size
* functions. It first checks whether the relation is a distributed table or an
* index belonging to a distributed table and size query can be run on it.
* Connection to each node has to be established to get the size of the
* relation.
* Input relation is allowed to be an index on a distributed table too.
*/
static bool
DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, bool failOnError,
uint64 *tableSize)
DistributedRelationSize(Oid relationId, SizeQueryType sizeQueryType,
bool failOnError, uint64 *relationSize)
{
int logLevel = WARNING;

@@ -538,7 +545,7 @@ DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, bool failOnErr
if (relation == NULL)
{
ereport(logLevel,
(errmsg("could not compute table size: relation does not exist")));
(errmsg("could not compute relation size: relation does not exist")));

return false;
}

@@ -553,8 +560,9 @@ DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, bool failOnErr
{
uint64 relationSizeOnNode = 0;

bool gotSize = DistributedTableSizeOnWorker(workerNode, relationId, sizeQueryType,
failOnError, &relationSizeOnNode);
bool gotSize = DistributedRelationSizeOnWorker(workerNode, relationId,
sizeQueryType,
failOnError, &relationSizeOnNode);
if (!gotSize)
{
return false;

@@ -563,21 +571,22 @@ DistributedTableSize(Oid relationId, SizeQueryType sizeQueryType, bool failOnErr
sumOfSizes += relationSizeOnNode;
}

*tableSize = sumOfSizes;
*relationSize = sumOfSizes;

return true;
}

/*
* DistributedTableSizeOnWorker gets the workerNode and relationId to calculate
* DistributedRelationSizeOnWorker gets the workerNode and relationId to calculate
* size of that relation on the given workerNode by summing up the size of each
* shard placement.
* Input relation is allowed to be an index on a distributed table too.
*/
static bool
DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType,
bool failOnError, uint64 *tableSize)
DistributedRelationSizeOnWorker(WorkerNode *workerNode, Oid relationId,
SizeQueryType sizeQueryType,
bool failOnError, uint64 *relationSize)
{
int logLevel = WARNING;

@@ -591,6 +600,17 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
uint32 connectionFlag = 0;
PGresult *result = NULL;

/* if the relation is an index, update relationId and define indexId */
Oid indexId = InvalidOid;
Oid relKind = get_rel_relkind(relationId);
if (relKind == RELKIND_INDEX || relKind == RELKIND_PARTITIONED_INDEX)
{
indexId = relationId;

bool missingOk = false;
relationId = IndexGetRelation(indexId, missingOk);
}

List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId);

/*

@@ -598,21 +618,22 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
* But citus size functions shouldn't include them, like PG.
*/
bool optimizePartitionCalculations = false;
StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(
StringInfo relationSizeQuery = GenerateSizeQueryOnMultiplePlacements(
shardIntervalsOnNode,
indexId,
sizeQueryType,
optimizePartitionCalculations);

MultiConnection *connection = GetNodeConnection(connectionFlag, workerNodeName,
workerNodePort);
int queryResult = ExecuteOptionalRemoteCommand(connection, tableSizeQuery->data,
int queryResult = ExecuteOptionalRemoteCommand(connection, relationSizeQuery->data,
&result);

if (queryResult != 0)
{
ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to %s:%d to get size of "
"table \"%s\"",
"relation \"%s\"",
workerNodeName, workerNodePort,
get_rel_name(relationId))));

@@ -626,19 +647,19 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
ClearResults(connection, failOnError);

ereport(logLevel, (errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("cannot parse size of table \"%s\" from %s:%d",
errmsg("cannot parse size of relation \"%s\" from %s:%d",
get_rel_name(relationId), workerNodeName,
workerNodePort)));

return false;
}

StringInfo tableSizeStringInfo = (StringInfo) linitial(sizeList);
char *tableSizeString = tableSizeStringInfo->data;
StringInfo relationSizeStringInfo = (StringInfo) linitial(sizeList);
char *relationSizeString = relationSizeStringInfo->data;

if (strlen(tableSizeString) > 0)
if (strlen(relationSizeString) > 0)
{
*tableSize = SafeStringToUint64(tableSizeString);
*relationSize = SafeStringToUint64(relationSizeString);
}
else
{

@@ -647,7 +668,7 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId,
* being executed. For this case we get an empty string as table size.
* We can take that as zero to prevent any unnecessary errors.
*/
*tableSize = 0;
*relationSize = 0;
}

PQclear(result);

@@ -732,7 +753,7 @@ ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId)

/*
* GenerateSizeQueryOnMultiplePlacements generates a select size query to get
* size of multiple tables. Note that, different size functions supported by PG
* size of multiple relations. Note that, different size functions supported by PG
* are also supported by this function changing the size query type given as the
* last parameter to function. Depending on the sizeQueryType enum parameter, the
* generated query will call one of the functions: pg_relation_size,

@@ -740,9 +761,13 @@ ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId)
* This function uses UDFs named worker_partitioned_*_size for partitioned tables,
* if the parameter optimizePartitionCalculations is true. The UDF to be called is
* determined by the parameter sizeQueryType.
*
* indexId is provided if we're interested in the size of an index, not the whole
* table.
*/
StringInfo
GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
Oid indexId,
SizeQueryType sizeQueryType,
bool optimizePartitionCalculations)
{

@@ -766,16 +791,20 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
*/
continue;
}

/* we need to build the shard relation name, being an index or table */
Oid objectId = OidIsValid(indexId) ? indexId : shardInterval->relationId;

uint64 shardId = shardInterval->shardId;
Oid schemaId = get_rel_namespace(shardInterval->relationId);
Oid schemaId = get_rel_namespace(objectId);
char *schemaName = get_namespace_name(schemaId);
char *shardName = get_rel_name(shardInterval->relationId);
char *shardName = get_rel_name(objectId);
AppendShardIdToName(&shardName, shardId);

char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
char *quotedShardName = quote_literal_cstr(shardQualifiedName);

/* for partitoned tables, we will call worker_partitioned_... size functions */
/* for partitioned tables, we will call worker_partitioned_... size functions */
if (optimizePartitionCalculations && PartitionedTable(shardInterval->relationId))
{
partitionedShardNames = lappend(partitionedShardNames, quotedShardName);
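
The hunk above makes GenerateSizeQueryOnMultiplePlacements size either the table shard or the corresponding index shard, depending on whether indexId is valid. A rough Python sketch of the per-shard query it produces, assuming the usual <relation name>_<shard id> shard naming and the three PostgreSQL size functions that the sizeQueryType enum selects between (illustrative only, not the exact C string building):

# Hypothetical helper mirroring the per-shard size query construction.
SIZE_FUNCTIONS = {
    "RELATION_SIZE": "pg_relation_size",
    "TABLE_SIZE": "pg_table_size",
    "TOTAL_RELATION_SIZE": "pg_total_relation_size",
}

def shard_size_query(schema, relation_name, shard_ids, size_query_type, index_name=None):
    """Sum the size of every shard placement of a relation (or of one of its indexes)."""
    # When an index is requested, the shard-level object is the index's shard,
    # e.g. my_index_102008, instead of the table's shard my_table_102008.
    object_name = index_name if index_name else relation_name
    size_function = SIZE_FUNCTIONS[size_query_type]
    terms = [
        f"{size_function}('{schema}.{object_name}_{shard_id}')" for shard_id in shard_ids
    ]
    return "SELECT " + " + ".join(terms) + ";"

print(shard_size_query("public", "lineitem_hash_part", [102008, 102009],
                       "RELATION_SIZE", index_name="lineitem_hash_part_idx"))
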
@@ -1010,7 +1039,7 @@ AppendShardIdNameValues(StringInfo selectQuery, ShardInterval *shardInterval)

/*
* ErrorIfNotSuitableToGetSize determines whether the table is suitable to find
* ErrorIfNotSuitableToGetSize determines whether the relation is suitable to find
* its' size with internal functions.
*/
static void

@@ -1018,11 +1047,32 @@ ErrorIfNotSuitableToGetSize(Oid relationId)
{
if (!IsCitusTable(relationId))
{
char *relationName = get_rel_name(relationId);
char *escapedQueryString = quote_literal_cstr(relationName);
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot calculate the size because relation %s is not "
"distributed", escapedQueryString)));
Oid relKind = get_rel_relkind(relationId);
if (relKind != RELKIND_INDEX && relKind != RELKIND_PARTITIONED_INDEX)
{
char *relationName = get_rel_name(relationId);
char *escapedRelationName = quote_literal_cstr(relationName);
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg(
"cannot calculate the size because relation %s "
"is not distributed",
escapedRelationName)));
}
bool missingOk = false;
Oid indexId = relationId;
relationId = IndexGetRelation(relationId, missingOk);
if (!IsCitusTable(relationId))
{
char *tableName = get_rel_name(relationId);
char *escapedTableName = quote_literal_cstr(tableName);
char *indexName = get_rel_name(indexId);
char *escapedIndexName = quote_literal_cstr(indexName);
ereport(ERROR, (errcode(ERRCODE_INVALID_OBJECT_DEFINITION),
errmsg(
"cannot calculate the size because table %s for "
"index %s is not distributed",
escapedTableName, escapedIndexName)));
}
}
}
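
Taken together, these metadata_utility.c changes let the citus_relation_size family accept an index of a distributed table, not just the table itself. A minimal usage sketch in Python, assuming a reachable Citus coordinator and the psycopg2 driver; the DSN is a placeholder, and the relation names come from the regression test further below:

import psycopg2

# Placeholder DSN; point it at your own coordinator.
conn = psycopg2.connect("dbname=postgres host=localhost port=5432")
cur = conn.cursor()

# After this change both calls work: the second one sizes an index on a
# distributed table instead of erroring out.
cur.execute("SELECT citus_relation_size('lineitem_hash_part');")
print("table main fork:", cur.fetchone()[0])

cur.execute("SELECT citus_relation_size('lineitem_hash_part_idx');")
print("index main fork:", cur.fetchone()[0])

cur.close()
conn.close()
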
@@ -792,7 +792,12 @@ ShardListSizeInBytes(List *shardList, char *workerNodeName, uint32

/* we skip child tables of a partitioned table if this boolean variable is true */
bool optimizePartitionCalculations = true;

/* we're interested in whole table, not a particular index */
Oid indexId = InvalidOid;

StringInfo tableSizeQuery = GenerateSizeQueryOnMultiplePlacements(shardList,
indexId,
TOTAL_RELATION_SIZE,
optimizePartitionCalculations);
@@ -481,6 +481,7 @@ _PG_init(void)
#endif

InitializeMaintenanceDaemon();
InitializeMaintenanceDaemonForMainDb();

/* initialize coordinated transaction management */
InitializeTransactionManagement();

@@ -1820,6 +1821,16 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL | GUC_NOT_IN_SAMPLE | GUC_UNIT_MS,
NULL, NULL, NULL);

DefineCustomStringVariable(
"citus.main_db",
gettext_noop("Which database is designated as the main_db"),
NULL,
&MainDb,
"",
PGC_POSTMASTER,
GUC_STANDARD,
NULL, NULL, NULL);

DefineCustomIntVariable(
"citus.max_adaptive_executor_pool_size",
gettext_noop("Sets the maximum number of connections per worker node used by "
@@ -90,6 +90,28 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
}

/*
* IsMetadataSynced checks the workers to see if all workers with metadata are
* synced.
*/
static bool
IsMetadataSynced(void)
{
List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);

WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerList)
{
if (workerNode->hasMetadata && !workerNode->metadataSynced)
{
return false;
}
}

return true;
}

/*
* wait_until_metadata_sync waits until the maintenance daemon does a metadata
* sync, or times out.

@@ -99,19 +121,10 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)
{
uint32 timeout = PG_GETARG_UINT32(0);

List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
bool waitNotifications = false;

WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerList)
{
/* if already has metadata, no need to do it again */
if (workerNode->hasMetadata && !workerNode->metadataSynced)
{
waitNotifications = true;
break;
}
}
/* First we start listening. */
MultiConnection *connection = GetNodeConnection(FORCE_NEW_CONNECTION,
LOCAL_HOST_NAME, PostPortNumber);
ExecuteCriticalRemoteCommand(connection, "LISTEN " METADATA_SYNC_CHANNEL);

/*
* If all the metadata nodes have already been synced, we should not wait.

@@ -119,15 +132,12 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)
* the notification and we'd wait unnecessarily here. Worse, the test outputs
* might be inconsistent across executions due to the warning.
*/
if (!waitNotifications)
if (IsMetadataSynced())
{
CloseConnection(connection);
PG_RETURN_VOID();
}

MultiConnection *connection = GetNodeConnection(FORCE_NEW_CONNECTION,
LOCAL_HOST_NAME, PostPortNumber);
ExecuteCriticalRemoteCommand(connection, "LISTEN " METADATA_SYNC_CHANNEL);

int waitFlags = WL_SOCKET_READABLE | WL_TIMEOUT | WL_POSTMASTER_DEATH;
int waitResult = WaitLatchOrSocket(NULL, waitFlags, PQsocket(connection->pgConn),
timeout, 0);

@@ -139,7 +149,7 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)
{
ClearResults(connection, true);
}
else if (waitResult & WL_TIMEOUT)
else if (waitResult & WL_TIMEOUT && !IsMetadataSynced())
{
elog(WARNING, "waiting for metadata sync timed out");
}
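
The rewritten wait_until_metadata_sync starts LISTENing before it checks IsMetadataSynced(), and re-checks the state when the wait times out, so a notification fired between the check and the wait can no longer be missed. A small Python sketch of that ordering, using threading.Event objects as stand-ins for the NOTIFY channel and the sync state (illustrative only, not the Citus API):

import threading

metadata_synced = threading.Event()    # stand-in for "all workers synced"
sync_notification = threading.Event()  # stand-in for the METADATA_SYNC_CHANNEL notify

def wait_until_metadata_sync(timeout_seconds: float) -> bool:
    # 1. Subscribe first (LISTEN), so a notification cannot slip in unseen
    #    between the state check and the wait.
    # 2. If everything is already synced, return without waiting.
    if metadata_synced.is_set():
        return True
    # 3. Otherwise wait for the notification; on timeout, re-check the state
    #    instead of immediately warning, mirroring the new WL_TIMEOUT branch.
    notified = sync_notification.wait(timeout_seconds)
    if notified or metadata_synced.is_set():
        return True
    print("WARNING: waiting for metadata sync timed out")
    return False
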
@@ -99,6 +99,7 @@ int Recover2PCInterval = 60000;
int DeferShardDeleteInterval = 15000;
int BackgroundTaskQueueCheckInterval = 5000;
int MaxBackgroundTaskExecutors = 4;
char *MainDb = "";

/* config variables for metadata sync timeout */
int MetadataSyncInterval = 60000;

@@ -112,7 +113,7 @@ static MaintenanceDaemonControlData *MaintenanceDaemonControl = NULL;
* activated.
*/
static HTAB *MaintenanceDaemonDBHash;

static ErrorContextCallback errorCallback = { 0 };
static volatile sig_atomic_t got_SIGHUP = false;
static volatile sig_atomic_t got_SIGTERM = false;

@@ -125,6 +126,8 @@ static void MaintenanceDaemonShmemExit(int code, Datum arg);
static void MaintenanceDaemonErrorContext(void *arg);
static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
static void WarnMaintenanceDaemonNotStarted(void);
static MaintenanceDaemonDBData * GetMaintenanceDaemonDBHashEntry(Oid databaseId,
bool *found);

/*
* InitializeMaintenanceDaemon, called at server start, is responsible for

@@ -139,6 +142,82 @@ InitializeMaintenanceDaemon(void)
}

/*
* GetMaintenanceDaemonDBHashEntry searches the MaintenanceDaemonDBHash for the
* databaseId. It returns the entry if found or creates a new entry and initializes
* the value with zeroes.
*/
MaintenanceDaemonDBData *
GetMaintenanceDaemonDBHashEntry(Oid databaseId, bool *found)
{
MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
MaintenanceDaemonDBHash,
&MyDatabaseId,
HASH_ENTER_NULL,
found);

if (!dbData)
{
elog(LOG,
"cannot create or find the maintenance deamon hash entry for database %u",
databaseId);
return NULL;
}

if (!*found)
{
/* ensure the values in MaintenanceDaemonDBData are zero */
memset(((char *) dbData) + sizeof(Oid), 0,
sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
}

return dbData;
}

/*
* InitializeMaintenanceDaemonForMainDb is called in _PG_Init
* at which stage we are not in a transaction or have databaseOid
*/
void
InitializeMaintenanceDaemonForMainDb(void)
{
if (strcmp(MainDb, "") == 0)
{
elog(LOG, "There is no designated Main database.");
return;
}

BackgroundWorker worker;

memset(&worker, 0, sizeof(worker));

strcpy_s(worker.bgw_name, sizeof(worker.bgw_name),
"Citus Maintenance Daemon for Main DB");

/* request ability to connect to target database */
worker.bgw_flags = BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;

/*
* No point in getting started before able to run query, but we do
* want to get started on Hot-Standby.
*/
worker.bgw_start_time = BgWorkerStart_ConsistentState;

/* Restart after a bit after errors, but don't bog the system. */
worker.bgw_restart_time = 5;
strcpy_s(worker.bgw_library_name,
sizeof(worker.bgw_library_name), "citus");
strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_library_name),
"CitusMaintenanceDaemonMain");

worker.bgw_main_arg = (Datum) 0;

RegisterBackgroundWorker(&worker);
}

/*
* InitializeMaintenanceDaemonBackend, called at backend start and
* configuration changes, is responsible for starting a per-database

@@ -148,31 +227,20 @@ void
InitializeMaintenanceDaemonBackend(void)
{
Oid extensionOwner = CitusExtensionOwner();
bool found;
bool found = false;

LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

MaintenanceDaemonDBData *dbData = (MaintenanceDaemonDBData *) hash_search(
MaintenanceDaemonDBHash,
&MyDatabaseId,
HASH_ENTER_NULL,
&found);
MaintenanceDaemonDBData *dbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId,
&found);

if (dbData == NULL)
{
WarnMaintenanceDaemonNotStarted();
LWLockRelease(&MaintenanceDaemonControl->lock);

return;
}

if (!found)
{
/* ensure the values in MaintenanceDaemonDBData are zero */
memset(((char *) dbData) + sizeof(Oid), 0,
sizeof(MaintenanceDaemonDBData) - sizeof(Oid));
}

if (IsMaintenanceDaemon)
{
/*

@@ -271,66 +339,97 @@ WarnMaintenanceDaemonNotStarted(void)

/*
* CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll
* be started by the background worker infrastructure. If it errors out,
* it'll be restarted after a few seconds.
* ConnectToDatabase connects to the database for the given databaseOid.
* if databaseOid is 0, connects to MainDb and then creates a hash entry.
* If a hash entry cannot be created for MainDb it exits the process requesting a restart.
* However for regular databases, it exits without requesting a restart since another
* subsequent backend is expected to start the Maintenance Daemon.
* If the found hash entry has a valid workerPid, it exits
* without requesting a restart since there is already a daemon running.
*/
void
CitusMaintenanceDaemonMain(Datum main_arg)
static MaintenanceDaemonDBData *
ConnectToDatabase(Oid databaseOid)
{
Oid databaseOid = DatumGetObjectId(main_arg);
TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
TimestampTz lastRecoveryTime = 0;
TimestampTz lastShardCleanTime = 0;
TimestampTz lastStatStatementsPurgeTime = 0;
TimestampTz nextMetadataSyncTime = 0;
MaintenanceDaemonDBData *myDbData = NULL;

/* state kept for the background tasks queue monitor */
TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL;
bool backgroundTasksQueueWarnedForLock = false;

/*
* We do metadata sync in a separate background worker. We need its
* handle to be able to check its status.
*/
BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;
bool isMainDb = false;

/*
* Look up this worker's configuration.
*/
LWLockAcquire(&MaintenanceDaemonControl->lock, LW_EXCLUSIVE);

MaintenanceDaemonDBData *myDbData = (MaintenanceDaemonDBData *)
hash_search(MaintenanceDaemonDBHash, &databaseOid,
HASH_FIND, NULL);
if (!myDbData)
{
/*
* When the database crashes, background workers are restarted, but
* the state in shared memory is lost. In that case, we exit and
* wait for a session to call InitializeMaintenanceDaemonBackend
* to properly add it to the hash.
*/

proc_exit(0);
if (databaseOid == 0)
{
char *databaseName = MainDb;

/*
* Since we cannot query databaseOid without initializing Postgres
* first, connect to the database by name.
*/
BackgroundWorkerInitializeConnection(databaseName, NULL, 0);

/*
* Now we have a valid MyDatabaseId.
* Insert the hash entry for the database to the Maintenance Deamon Hash.
*/
bool found = false;

myDbData = GetMaintenanceDaemonDBHashEntry(MyDatabaseId, &found);

if (!myDbData)
{
/*
* If an entry cannot be created,
* return code of 1 requests worker restart
* Since BackgroundWorker for the MainDb is only registered
* once during server startup, we need to retry.
*/
proc_exit(1);
}

if (found && myDbData->workerPid != 0)
{
/* Another maintenance daemon is running.*/

proc_exit(0);
}

databaseOid = MyDatabaseId;
myDbData->userOid = GetSessionUserId();
isMainDb = true;
}
else
{
myDbData = (MaintenanceDaemonDBData *)
hash_search(MaintenanceDaemonDBHash, &databaseOid,
HASH_FIND, NULL);

if (!myDbData)
{
/*
* When the database crashes, background workers are restarted, but
* the state in shared memory is lost. In that case, we exit and
* wait for a session to call InitializeMaintenanceDaemonBackend
* to properly add it to the hash.
*/

proc_exit(0);
}

if (myDbData->workerPid != 0)
{
/*
* Another maintenance daemon is running. This usually happens because
* postgres restarts the daemon after an non-zero exit, and
* InitializeMaintenanceDaemonBackend started one before postgres did.
* In that case, the first one stays and the last one exits.
*/

proc_exit(0);
}
}

if (myDbData->workerPid != 0)
{
/*
* Another maintenance daemon is running. This usually happens because
* postgres restarts the daemon after an non-zero exit, and
* InitializeMaintenanceDaemonBackend started one before postgres did.
* In that case, the first one stays and the last one exits.
*/

proc_exit(0);
}

before_shmem_exit(MaintenanceDaemonShmemExit, main_arg);
before_shmem_exit(MaintenanceDaemonShmemExit, ObjectIdGetDatum(databaseOid));

/*
* Signal that I am the maintenance daemon now.

@@ -356,25 +455,55 @@ CitusMaintenanceDaemonMain(Datum main_arg)

LWLockRelease(&MaintenanceDaemonControl->lock);

/*
* Setup error context so log messages can be properly attributed. Some of
* them otherwise sound like they might be from a normal user connection.
* Do so before setting up signals etc, so we never exit without the
* context setup.
*/
ErrorContextCallback errorCallback = { 0 };
memset(&errorCallback, 0, sizeof(errorCallback));
errorCallback.callback = MaintenanceDaemonErrorContext;
errorCallback.arg = (void *) myDbData;
errorCallback.previous = error_context_stack;
error_context_stack = &errorCallback;

elog(LOG, "starting maintenance daemon on database %u user %u",
databaseOid, myDbData->userOid);

/* connect to database, after that we can actually access catalogs */
BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
if (!isMainDb)
{
/* connect to database, after that we can actually access catalogs */
BackgroundWorkerInitializeConnectionByOid(databaseOid, myDbData->userOid, 0);
}

return myDbData;
}

/*
* CitusMaintenanceDaemonMain is the maintenance daemon's main routine, it'll
* be started by the background worker infrastructure. If it errors out,
* it'll be restarted after a few seconds.
*/
void
CitusMaintenanceDaemonMain(Datum main_arg)
{
Oid databaseOid = DatumGetObjectId(main_arg);
TimestampTz nextStatsCollectionTime USED_WITH_LIBCURL_ONLY =
TimestampTzPlusMilliseconds(GetCurrentTimestamp(), 60 * 1000);
bool retryStatsCollection USED_WITH_LIBCURL_ONLY = false;
TimestampTz lastRecoveryTime = 0;
TimestampTz lastShardCleanTime = 0;
TimestampTz lastStatStatementsPurgeTime = 0;
TimestampTz nextMetadataSyncTime = 0;

/* state kept for the background tasks queue monitor */
TimestampTz lastBackgroundTaskQueueCheck = GetCurrentTimestamp();
BackgroundWorkerHandle *backgroundTasksQueueBgwHandle = NULL;
bool backgroundTasksQueueWarnedForLock = false;

/*
* We do metadata sync in a separate background worker. We need its
* handle to be able to check its status.
*/
BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;

MaintenanceDaemonDBData *myDbData = ConnectToDatabase(databaseOid);

/* make worker recognizable in pg_stat_activity */
pgstat_report_appname("Citus Maintenance Daemon");

@@ -383,7 +512,7 @@ CitusMaintenanceDaemonMain(Datum main_arg)
* Terminate orphaned metadata sync daemons spawned from previously terminated
* or crashed maintenanced instances.
*/
SignalMetadataSyncDaemon(databaseOid, SIGTERM);
SignalMetadataSyncDaemon(MyDatabaseId, SIGTERM);

/* enter main loop */
while (!got_SIGTERM)

@@ -945,7 +1074,7 @@ MaintenanceDaemonShmemExit(int code, Datum arg)
}

/* MaintenanceDaemonSigTermHandler calls proc_exit(0) */
/* MaintenanceDaemonSigTermHandler sets the got_SIGTERM flag.*/
static void
MaintenanceDaemonSigTermHandler(SIGNAL_ARGS)
{
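
The maintenanced.c changes above split connection setup out of CitusMaintenanceDaemonMain into ConnectToDatabase, so the worker registered for citus.main_db (bgw_main_arg 0) can connect by name before any hash entry exists. A condensed Python sketch of that branching, with sys.exit standing in for proc_exit; all names here are illustrative, not the C API:

import sys

MAIN_DB_SENTINEL = 0  # bgw_main_arg used for the worker registered for citus.main_db

def connect_to_database(database_oid, daemon_hash, main_db_name,
                        connect_by_name, connect_by_oid):
    """Illustrative mirror of the new ConnectToDatabase control flow."""
    if database_oid == MAIN_DB_SENTINEL:
        # The main-db daemon has no OID yet: connect by name first, then
        # create its hash entry (the C code proc_exit(1)s if that allocation
        # fails, so the postmaster restarts this once-registered worker).
        my_database_id = connect_by_name(main_db_name)
        entry = daemon_hash.setdefault(my_database_id, {"workerPid": 0})
        if entry["workerPid"] != 0:
            sys.exit(0)  # another daemon already runs for the main db
        return entry

    # Regular databases: the entry must have been created by a backend via
    # InitializeMaintenanceDaemonBackend; otherwise exit and wait for one.
    entry = daemon_hash.get(database_oid)
    if entry is None or entry["workerPid"] != 0:
        sys.exit(0)
    connect_by_oid(database_oid)
    return entry
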
@@ -20,6 +20,7 @@

/* config variable for */
extern double DistributedDeadlockDetectionTimeoutFactor;
extern char *MainDb;

extern void StopMaintenanceDaemon(Oid databaseId);
extern void TriggerNodeMetadataSync(Oid databaseId);

@@ -27,6 +28,7 @@ extern void InitializeMaintenanceDaemon(void);
extern size_t MaintenanceDaemonShmemSize(void);
extern void MaintenanceDaemonShmemInit(void);
extern void InitializeMaintenanceDaemonBackend(void);
extern void InitializeMaintenanceDaemonForMainDb(void);
extern bool LockCitusExtension(void);

extern PGDLLEXPORT void CitusMaintenanceDaemonMain(Datum main_arg);
@@ -342,6 +342,7 @@ extern void LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char *
int *nodePort);
extern bool IsDummyPlacement(ShardPlacement *taskPlacement);
extern StringInfo GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList,
Oid indexId,
SizeQueryType sizeQueryType,
bool optimizePartitionCalculations);
extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
@@ -222,7 +222,7 @@ s/(CONTEXT: PL\/pgSQL function .* line )([0-9]+)/\1XX/g
s/^(PL\/pgSQL function .* line) [0-9]+ (.*)/\1 XX \2/g

# normalize a test difference in multi_move_mx
s/ connection to server at "\w+" \(127\.0\.0\.1\), port [0-9]+ failed://g
s/ connection to server at "\w+" (\(127\.0\.0\.1\)|\(::1\)), port [0-9]+ failed://g

# normalize differences in tablespace of new index
s/pg14\.idx.*/pg14\.xxxxx/g
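
The updated sed rule also matches the IPv6 loopback form (::1) that can appear in connection-failure messages, not only 127.0.0.1. A quick Python check of the widened pattern (regex syntax here is Python's; the normalize file uses sed's extended syntax):

import re

pattern = re.compile(
    r' connection to server at "\w+" (\(127\.0\.0\.1\)|\(::1\)), port [0-9]+ failed:'
)

samples = [
    ' connection to server at "localhost" (127.0.0.1), port 57637 failed:',
    ' connection to server at "localhost" (::1), port 57637 failed:',
]
for line in samples:
    print(bool(pattern.search(line)), repr(pattern.sub("", line)))
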
@@ -453,6 +453,9 @@ def cleanup_test_leftovers(nodes):
for node in nodes:
node.cleanup_schemas()

for node in nodes:
node.cleanup_databases()

for node in nodes:
node.cleanup_users()

@@ -753,6 +756,7 @@ class Postgres(QueryRunner):
self.subscriptions = set()
self.publications = set()
self.replication_slots = set()
self.databases = set()
self.schemas = set()
self.users = set()

@@ -993,6 +997,10 @@ class Postgres(QueryRunner):
args = sql.SQL("")
self.sql(sql.SQL("CREATE USER {} {}").format(sql.Identifier(name), args))

def create_database(self, name):
self.databases.add(name)
self.sql(sql.SQL("CREATE DATABASE {}").format(sql.Identifier(name)))

def create_schema(self, name):
self.schemas.add(name)
self.sql(sql.SQL("CREATE SCHEMA {}").format(sql.Identifier(name)))

@@ -1020,6 +1028,12 @@ class Postgres(QueryRunner):
for user in self.users:
self.sql(sql.SQL("DROP USER IF EXISTS {}").format(sql.Identifier(user)))

def cleanup_databases(self):
for database in self.databases:
self.sql(
sql.SQL("DROP DATABASE IF EXISTS {}").format(sql.Identifier(database))
)

def cleanup_schemas(self):
for schema in self.schemas:
self.sql(
@@ -175,6 +175,7 @@ DEPS = {
),
"grant_on_schema_propagation": TestDeps("minimal_schedule"),
"propagate_extension_commands": TestDeps("minimal_schedule"),
"multi_size_queries": TestDeps("base_schedule", ["multi_copy"]),
}
@@ -0,0 +1,74 @@
# This test checks that once citus.main_db is set and the
# server is restarted. A Citus Maintenance Daemon for the main_db
# is launched. This should happen even if there is no query run
# in main_db yet.
import time

def wait_until_maintenance_deamons_start(deamoncount, cluster):
i = 0
n = 0

while i < 10:
i += 1
n = cluster.coordinator.sql_value(
"SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';"
)

if n == deamoncount:
break

time.sleep(0.1)

assert n == deamoncount

def test_set_maindb(cluster_factory):
cluster = cluster_factory(0)

# Test that once citus.main_db is set to a database name
# there are two maintenance deamons running upon restart.
# One maintenance deamon for the database of the current connection
# and one for the citus.main_db.
cluster.coordinator.create_database("mymaindb")
cluster.coordinator.configure("citus.main_db='mymaindb'")
cluster.coordinator.restart()

assert cluster.coordinator.sql_value("SHOW citus.main_db;") == "mymaindb"

wait_until_maintenance_deamons_start(2, cluster)

assert (
cluster.coordinator.sql_value(
"SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon' AND datname='mymaindb';"
)
== 1
)

# Test that once citus.main_db is set to empty string
# there is only one maintenance deamon for the database
# of the current connection.
cluster.coordinator.configure("citus.main_db=''")
cluster.coordinator.restart()
assert cluster.coordinator.sql_value("SHOW citus.main_db;") == ""

wait_until_maintenance_deamons_start(1, cluster)

# Test that after citus.main_db is dropped. The maintenance
# deamon for this database is terminated.
cluster.coordinator.configure("citus.main_db='mymaindb'")
cluster.coordinator.restart()
assert cluster.coordinator.sql_value("SHOW citus.main_db;") == "mymaindb"

wait_until_maintenance_deamons_start(2, cluster)

cluster.coordinator.sql("DROP DATABASE mymaindb;")

wait_until_maintenance_deamons_start(1, cluster)

assert (
cluster.coordinator.sql_value(
"SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon' AND datname='mymaindb';"
)
== 0
)
@@ -226,7 +226,7 @@ step s1-drop: DROP TABLE drop_hash;
step s2-table-size: SELECT citus_total_relation_size('drop_hash'); <waiting ...>
step s1-commit: COMMIT;
step s2-table-size: <... completed>
ERROR: could not compute table size: relation does not exist
ERROR: could not compute relation size: relation does not exist
step s2-commit: COMMIT;
step s1-select-count: SELECT COUNT(*) FROM drop_hash;
ERROR: relation "drop_hash" does not exist
@@ -0,0 +1,68 @@
Parsed test spec with 2 sessions

starting permutation: s1-begin s1-insert s2-begin s2-update-node-1 s1-abort s2-abort
create_distributed_table
---------------------------------------------------------------------

(1 row)

step s1-begin: BEGIN;
step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100);
step s2-begin: BEGIN;
step s2-update-node-1:
-- update a specific node by address
SELECT master_update_node(nodeid, 'localhost', nodeport + 10)
FROM pg_dist_node
WHERE nodename = 'localhost'
AND nodeport = 57637;
<waiting ...>
step s1-abort: ABORT;
step s2-update-node-1: <... completed>
master_update_node
---------------------------------------------------------------------

(1 row)

step s2-abort: ABORT;
master_remove_node
---------------------------------------------------------------------


(2 rows)


starting permutation: s1-begin s1-insert s2-begin s2-update-node-1-force s2-abort s1-abort
create_distributed_table
---------------------------------------------------------------------

(1 row)

step s1-begin: BEGIN;
step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100);
step s2-begin: BEGIN;
step s2-update-node-1-force:
-- update a specific node by address (force)
SELECT master_update_node(nodeid, 'localhost', nodeport + 10, force => true, lock_cooldown => 100)
FROM pg_dist_node
WHERE nodename = 'localhost'
AND nodeport = 57637;
<waiting ...>
step s2-update-node-1-force: <... completed>
master_update_node
---------------------------------------------------------------------

(1 row)

step s2-abort: ABORT;
step s1-abort: ABORT;
FATAL: terminating connection due to administrator command
FATAL: terminating connection due to administrator command
SSL connection has been closed unexpectedly
server closed the connection unexpectedly

master_remove_node
---------------------------------------------------------------------


(2 rows)
@@ -90,7 +90,7 @@ SELECT citus_disable_node('localhost', :worker_2_port);

(1 row)

SELECT public.wait_until_metadata_sync(60000);
SELECT public.wait_until_metadata_sync(20000);
wait_until_metadata_sync
---------------------------------------------------------------------


@@ -812,7 +812,7 @@ SELECT citus_disable_node('localhost', 9999);

(1 row)

SELECT public.wait_until_metadata_sync(60000);
SELECT public.wait_until_metadata_sync(20000);
wait_until_metadata_sync
---------------------------------------------------------------------


@@ -1258,3 +1258,9 @@ SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHER
t
(1 row)

-- Grant all on public schema to public
--
-- That's the default on Postgres versions < 15 and we want to
-- keep permissions compatible accross versions, in regression
-- tests.
GRANT ALL ON SCHEMA public TO PUBLIC;
@@ -7,19 +7,25 @@
SET citus.next_shard_id TO 1390000;
-- Tests with invalid relation IDs
SELECT citus_table_size(1);
ERROR: could not compute table size: relation does not exist
ERROR: could not compute relation size: relation does not exist
SELECT citus_relation_size(1);
ERROR: could not compute table size: relation does not exist
ERROR: could not compute relation size: relation does not exist
SELECT citus_total_relation_size(1);
ERROR: could not compute table size: relation does not exist
ERROR: could not compute relation size: relation does not exist
-- Tests with non-distributed table
CREATE TABLE non_distributed_table (x int);
CREATE TABLE non_distributed_table (x int primary key);
SELECT citus_table_size('non_distributed_table');
ERROR: cannot calculate the size because relation 'non_distributed_table' is not distributed
SELECT citus_relation_size('non_distributed_table');
ERROR: cannot calculate the size because relation 'non_distributed_table' is not distributed
SELECT citus_total_relation_size('non_distributed_table');
ERROR: cannot calculate the size because relation 'non_distributed_table' is not distributed
SELECT citus_table_size('non_distributed_table_pkey');
ERROR: cannot calculate the size because table 'non_distributed_table' for index 'non_distributed_table_pkey' is not distributed
SELECT citus_relation_size('non_distributed_table_pkey');
ERROR: cannot calculate the size because table 'non_distributed_table' for index 'non_distributed_table_pkey' is not distributed
SELECT citus_total_relation_size('non_distributed_table_pkey');
ERROR: cannot calculate the size because table 'non_distributed_table' for index 'non_distributed_table_pkey' is not distributed
DROP TABLE non_distributed_table;
-- fix broken placements via disabling the node
SET client_min_messages TO ERROR;

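The switch from x int to x int primary key in the hunk above matters because the primary key implicitly creates an index named non_distributed_table_pkey, which the new statements then feed to the size functions. As a small aside with a throwaway name (t_example is not part of this diff), Postgres derives that index name as <table>_pkey:

-- illustration: the implicit index created for a primary key is named <table>_pkey
CREATE TABLE t_example (x int primary key);
SELECT indexrelid::regclass FROM pg_index WHERE indrelid = 't_example'::regclass;  -- t_example_pkey
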
@@ -31,24 +37,70 @@ SELECT replicate_table_shards('lineitem_hash_part', shard_replication_factor:=2,

-- Tests on distributed table with replication factor > 1
VACUUM (FULL) lineitem_hash_part;
SELECT citus_table_size('lineitem_hash_part');
citus_table_size
SELECT citus_relation_size('lineitem_hash_part') <= citus_table_size('lineitem_hash_part');
?column?
---------------------------------------------------------------------
3801088
t
(1 row)

SELECT citus_relation_size('lineitem_hash_part');
citus_relation_size
SELECT citus_table_size('lineitem_hash_part') <= citus_total_relation_size('lineitem_hash_part');
?column?
---------------------------------------------------------------------
3801088
t
(1 row)

SELECT citus_total_relation_size('lineitem_hash_part');
citus_total_relation_size
SELECT citus_relation_size('lineitem_hash_part') > 0;
?column?
---------------------------------------------------------------------
3801088
t
(1 row)

CREATE INDEX lineitem_hash_part_idx ON lineitem_hash_part(l_orderkey);
VACUUM (FULL) lineitem_hash_part;
SELECT citus_relation_size('lineitem_hash_part') <= citus_table_size('lineitem_hash_part');
?column?
---------------------------------------------------------------------
t
(1 row)

SELECT citus_table_size('lineitem_hash_part') <= citus_total_relation_size('lineitem_hash_part');
?column?
---------------------------------------------------------------------
t
(1 row)

SELECT citus_relation_size('lineitem_hash_part') > 0;
?column?
---------------------------------------------------------------------
t
(1 row)

SELECT citus_relation_size('lineitem_hash_part_idx') <= citus_table_size('lineitem_hash_part_idx');
?column?
---------------------------------------------------------------------
t
(1 row)

SELECT citus_table_size('lineitem_hash_part_idx') <= citus_total_relation_size('lineitem_hash_part_idx');
?column?
---------------------------------------------------------------------
t
(1 row)

SELECT citus_relation_size('lineitem_hash_part_idx') > 0;
?column?
---------------------------------------------------------------------
t
(1 row)

SELECT citus_total_relation_size('lineitem_hash_part') >=
citus_table_size('lineitem_hash_part') + citus_table_size('lineitem_hash_part_idx');
?column?
---------------------------------------------------------------------
t
(1 row)

DROP INDEX lineitem_hash_part_idx;
VACUUM (FULL) customer_copy_hash;
-- Tests on distributed tables with streaming replication.
SELECT citus_table_size('customer_copy_hash');

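The hunk above is the heart of the flakiness fix: exact byte counts such as 3801088 vary from run to run, so the expected output now asserts orderings between the size functions instead. For intuition, the citus_* size UDFs are the distributed counterparts of the familiar single-node built-ins, which obey the same ordering; a minimal sketch on plain Postgres, with some_table as a placeholder name:

-- main fork only <= main fork plus FSM/VM/TOAST <= everything including indexes
SELECT pg_relation_size('some_table') <= pg_table_size('some_table'),
       pg_table_size('some_table') <= pg_total_relation_size('some_table'),
       pg_relation_size('some_table') > 0;
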
@@ -72,10 +124,10 @@ SELECT citus_total_relation_size('customer_copy_hash');
-- Make sure we can get multiple sizes in a single query
SELECT citus_table_size('customer_copy_hash'),
citus_table_size('customer_copy_hash'),
citus_table_size('supplier');
citus_table_size('customer_copy_hash');
citus_table_size | citus_table_size | citus_table_size
---------------------------------------------------------------------
548864 | 548864 | 655360
548864 | 548864 | 548864
(1 row)

CREATE INDEX index_1 on customer_copy_hash(c_custkey);

@@ -99,6 +151,24 @@ SELECT citus_total_relation_size('customer_copy_hash');
2646016
(1 row)

SELECT citus_table_size('index_1');
citus_table_size
---------------------------------------------------------------------
1048576
(1 row)

SELECT citus_relation_size('index_1');
citus_relation_size
---------------------------------------------------------------------
1048576
(1 row)

SELECT citus_total_relation_size('index_1');
citus_total_relation_size
---------------------------------------------------------------------
1048576
(1 row)

-- Tests on reference table
VACUUM (FULL) supplier;
SELECT citus_table_size('supplier');

@@ -139,6 +209,74 @@ SELECT citus_total_relation_size('supplier');
688128
(1 row)

SELECT citus_table_size('index_2');
citus_table_size
---------------------------------------------------------------------
122880
(1 row)

SELECT citus_relation_size('index_2');
citus_relation_size
---------------------------------------------------------------------
122880
(1 row)

SELECT citus_total_relation_size('index_2');
citus_total_relation_size
---------------------------------------------------------------------
122880
(1 row)

-- Test on partitioned table
CREATE TABLE split_me (dist_col int, partition_col timestamp) PARTITION BY RANGE (partition_col);
CREATE INDEX ON split_me(dist_col);
-- create 2 partitions
CREATE TABLE m PARTITION OF split_me FOR VALUES FROM ('2018-01-01') TO ('2019-01-01');
CREATE TABLE e PARTITION OF split_me FOR VALUES FROM ('2019-01-01') TO ('2020-01-01');
INSERT INTO split_me SELECT 1, '2018-01-01'::timestamp + i * interval '1 day' FROM generate_series(1, 360) i;
INSERT INTO split_me SELECT 2, '2019-01-01'::timestamp + i * interval '1 day' FROM generate_series(1, 180) i;
-- before citus
SELECT citus_relation_size('split_me');
ERROR: cannot calculate the size because relation 'split_me' is not distributed
SELECT citus_relation_size('split_me_dist_col_idx');
ERROR: cannot calculate the size because table 'split_me' for index 'split_me_dist_col_idx' is not distributed
SELECT citus_relation_size('m');
ERROR: cannot calculate the size because relation 'm' is not distributed
SELECT citus_relation_size('m_dist_col_idx');
ERROR: cannot calculate the size because table 'm' for index 'm_dist_col_idx' is not distributed
-- distribute the table(s)
SELECT create_distributed_table('split_me', 'dist_col');
create_distributed_table
---------------------------------------------------------------------

(1 row)

-- after citus
SELECT citus_relation_size('split_me');
citus_relation_size
---------------------------------------------------------------------
0
(1 row)

SELECT citus_relation_size('split_me_dist_col_idx');
citus_relation_size
---------------------------------------------------------------------
0
(1 row)

SELECT citus_relation_size('m');
citus_relation_size
---------------------------------------------------------------------
32768
(1 row)

SELECT citus_relation_size('m_dist_col_idx');
citus_relation_size
---------------------------------------------------------------------
81920
(1 row)

DROP TABLE split_me;
-- Test inside the transaction
BEGIN;
ALTER TABLE supplier ALTER COLUMN s_suppkey SET NOT NULL;

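One detail worth noting in the partitioned-table output above, as an aside rather than part of the commit: the parent split_me reports a size of 0 while its partition m does not, because a partitioned parent has no storage of its own; only the leaf partitions hold data. The same behaviour can be reproduced on plain Postgres with hypothetical names:

CREATE TABLE parent_tbl (a int, ts timestamp) PARTITION BY RANGE (ts);
CREATE TABLE child_2018 PARTITION OF parent_tbl FOR VALUES FROM ('2018-01-01') TO ('2019-01-01');
INSERT INTO child_2018 SELECT i, '2018-06-01'::timestamp FROM generate_series(1, 1000) i;
SELECT pg_relation_size('parent_tbl');  -- 0: the parent has no data pages of its own
SELECT pg_relation_size('child_2018');  -- > 0: the rows live in the leaf partition
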
@@ -133,12 +133,6 @@ ORDER BY 1, 2;
validatable_constraint_8000016 | t
(10 rows)

DROP TABLE constrained_table;
DROP TABLE referenced_table CASCADE;
DROP TABLE referencing_table;
SET client_min_messages TO WARNING;
DROP SCHEMA validate_constraint CASCADE;
NOTICE: drop cascades to 3 other objects
DETAIL: drop cascades to type constraint_validity
drop cascades to view constraint_validations_in_workers
drop cascades to view constraint_validations
SET search_path TO DEFAULT;

@@ -201,7 +201,8 @@ test: citus_copy_shard_placement
# multi_utilities cannot be run in parallel with other tests because it checks
# global locks
test: multi_utilities
test: foreign_key_to_reference_table validate_constraint
test: foreign_key_to_reference_table
test: validate_constraint
test: multi_repartition_udt multi_repartitioned_subquery_udf multi_subtransactions

test: multi_modifying_xacts

@@ -297,7 +298,8 @@ test: replicate_reference_tables_to_coordinator
test: citus_local_tables
test: mixed_relkind_tests
test: multi_row_router_insert create_distributed_table_concurrently
test: multi_reference_table citus_local_tables_queries
test: multi_reference_table
test: citus_local_tables_queries
test: citus_local_table_triggers
test: coordinator_shouldhaveshards
test: local_shard_utility_command_execution

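A brief aside on why these lines are split rather than merely reordered (this describes the pg_regress-style schedule format in general, not something introduced by this commit): every name on a single "test:" line runs concurrently as one group, and separate "test:" lines run one after another, so moving a test onto its own line is what serializes it with respect to its former group mates. Placeholder example:

# names on a single line form one parallel group
test: alpha beta
# a separate line runs only after the group above has finished
test: gamma
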
@@ -154,7 +154,8 @@ test: multi_outer_join
# ---
test: multi_complex_count_distinct
test: multi_upsert multi_simple_queries
test: foreign_key_to_reference_table validate_constraint
test: foreign_key_to_reference_table
test: validate_constraint

# ---------
# creates hash and range-partitioned tables and performs COPY

@@ -150,7 +150,9 @@ test: multi_outer_join
test: multi_create_fdw
test: multi_generate_ddl_commands multi_create_shards multi_prune_shard_list
test: multi_upsert multi_simple_queries multi_data_types
test: multi_utilities foreign_key_to_reference_table validate_constraint
test: multi_utilities
test: foreign_key_to_reference_table
test: validate_constraint
test: multi_repartition_udt multi_repartitioned_subquery_udf

# ---------

@@ -39,7 +39,7 @@ SELECT master_get_active_worker_nodes();
SELECT 1 FROM master_add_node('localhost', :worker_2_port);

SELECT citus_disable_node('localhost', :worker_2_port);
SELECT public.wait_until_metadata_sync(60000);
SELECT public.wait_until_metadata_sync(20000);
SELECT master_get_active_worker_nodes();

-- add some shard placements to the cluster

@@ -328,7 +328,7 @@ SELECT 1 FROM master_add_inactive_node('localhost', 9996, groupid => :worker_2_g
SELECT master_add_inactive_node('localhost', 9999, groupid => :worker_2_group, nodecluster => 'olap', noderole => 'secondary');
SELECT master_activate_node('localhost', 9999);
SELECT citus_disable_node('localhost', 9999);
SELECT public.wait_until_metadata_sync(60000);
SELECT public.wait_until_metadata_sync(20000);
SELECT master_remove_node('localhost', 9999);

-- check that you can't manually add two primaries to a group

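A note on the timeout change in the two hunks above, hedged since it rests on how these regression tests define the helper: wait_until_metadata_sync takes its timeout in milliseconds, so the edit tightens the wait from roughly 60 seconds to roughly 20 seconds per call, for example:

-- illustration of the call as used above; 20000 is interpreted as a millisecond timeout here
SELECT public.wait_until_metadata_sync(20000);
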
@@ -530,3 +530,10 @@ RESET citus.metadata_sync_mode;

-- verify that at the end of this file, all primary nodes have metadata synced
SELECT bool_and(hasmetadata) AND bool_and(metadatasynced) FROM pg_dist_node WHERE isactive = 't' and noderole = 'primary';

-- Grant all on public schema to public
--
-- That's the default on Postgres versions < 15 and we want to
-- keep permissions compatible across versions, in regression
-- tests.
GRANT ALL ON SCHEMA public TO PUBLIC;

@@ -13,10 +13,15 @@ SELECT citus_relation_size(1);
SELECT citus_total_relation_size(1);

-- Tests with non-distributed table
CREATE TABLE non_distributed_table (x int);
CREATE TABLE non_distributed_table (x int primary key);

SELECT citus_table_size('non_distributed_table');
SELECT citus_relation_size('non_distributed_table');
SELECT citus_total_relation_size('non_distributed_table');

SELECT citus_table_size('non_distributed_table_pkey');
SELECT citus_relation_size('non_distributed_table_pkey');
SELECT citus_total_relation_size('non_distributed_table_pkey');
DROP TABLE non_distributed_table;

-- fix broken placements via disabling the node

@@ -26,9 +31,25 @@ SELECT replicate_table_shards('lineitem_hash_part', shard_replication_factor:=2,
-- Tests on distributed table with replication factor > 1
VACUUM (FULL) lineitem_hash_part;

SELECT citus_table_size('lineitem_hash_part');
SELECT citus_relation_size('lineitem_hash_part');
SELECT citus_total_relation_size('lineitem_hash_part');
SELECT citus_relation_size('lineitem_hash_part') <= citus_table_size('lineitem_hash_part');
SELECT citus_table_size('lineitem_hash_part') <= citus_total_relation_size('lineitem_hash_part');
SELECT citus_relation_size('lineitem_hash_part') > 0;

CREATE INDEX lineitem_hash_part_idx ON lineitem_hash_part(l_orderkey);
VACUUM (FULL) lineitem_hash_part;

SELECT citus_relation_size('lineitem_hash_part') <= citus_table_size('lineitem_hash_part');
SELECT citus_table_size('lineitem_hash_part') <= citus_total_relation_size('lineitem_hash_part');
SELECT citus_relation_size('lineitem_hash_part') > 0;

SELECT citus_relation_size('lineitem_hash_part_idx') <= citus_table_size('lineitem_hash_part_idx');
SELECT citus_table_size('lineitem_hash_part_idx') <= citus_total_relation_size('lineitem_hash_part_idx');
SELECT citus_relation_size('lineitem_hash_part_idx') > 0;

SELECT citus_total_relation_size('lineitem_hash_part') >=
citus_table_size('lineitem_hash_part') + citus_table_size('lineitem_hash_part_idx');

DROP INDEX lineitem_hash_part_idx;

VACUUM (FULL) customer_copy_hash;

@@ -40,7 +61,7 @@ SELECT citus_total_relation_size('customer_copy_hash');
-- Make sure we can get multiple sizes in a single query
SELECT citus_table_size('customer_copy_hash'),
citus_table_size('customer_copy_hash'),
citus_table_size('supplier');
citus_table_size('customer_copy_hash');

CREATE INDEX index_1 on customer_copy_hash(c_custkey);
VACUUM (FULL) customer_copy_hash;

@@ -50,6 +71,10 @@ SELECT citus_table_size('customer_copy_hash');
SELECT citus_relation_size('customer_copy_hash');
SELECT citus_total_relation_size('customer_copy_hash');

SELECT citus_table_size('index_1');
SELECT citus_relation_size('index_1');
SELECT citus_total_relation_size('index_1');

-- Tests on reference table
VACUUM (FULL) supplier;

@@ -64,6 +89,38 @@ SELECT citus_table_size('supplier');
SELECT citus_relation_size('supplier');
SELECT citus_total_relation_size('supplier');

SELECT citus_table_size('index_2');
SELECT citus_relation_size('index_2');
SELECT citus_total_relation_size('index_2');

-- Test on partitioned table
CREATE TABLE split_me (dist_col int, partition_col timestamp) PARTITION BY RANGE (partition_col);
CREATE INDEX ON split_me(dist_col);

-- create 2 partitions
CREATE TABLE m PARTITION OF split_me FOR VALUES FROM ('2018-01-01') TO ('2019-01-01');
CREATE TABLE e PARTITION OF split_me FOR VALUES FROM ('2019-01-01') TO ('2020-01-01');

INSERT INTO split_me SELECT 1, '2018-01-01'::timestamp + i * interval '1 day' FROM generate_series(1, 360) i;
INSERT INTO split_me SELECT 2, '2019-01-01'::timestamp + i * interval '1 day' FROM generate_series(1, 180) i;

-- before citus
SELECT citus_relation_size('split_me');
SELECT citus_relation_size('split_me_dist_col_idx');
SELECT citus_relation_size('m');
SELECT citus_relation_size('m_dist_col_idx');

-- distribute the table(s)
SELECT create_distributed_table('split_me', 'dist_col');

-- after citus
SELECT citus_relation_size('split_me');
SELECT citus_relation_size('split_me_dist_col_idx');
SELECT citus_relation_size('m');
SELECT citus_relation_size('m_dist_col_idx');

DROP TABLE split_me;

-- Test inside the transaction
BEGIN;
ALTER TABLE supplier ALTER COLUMN s_suppkey SET NOT NULL;

@@ -116,9 +116,6 @@ SELECT *
FROM constraint_validations_in_workers
ORDER BY 1, 2;

DROP TABLE constrained_table;
DROP TABLE referenced_table CASCADE;
DROP TABLE referencing_table;

SET client_min_messages TO WARNING;
DROP SCHEMA validate_constraint CASCADE;
SET search_path TO DEFAULT;