mirror of https://github.com/citusdata/citus.git
Compare commits
10 Commits
| Author | SHA1 | Date |
|---|---|---|
|
|
e8e06d8d0c | |
|
|
f79dd61a92 | |
|
|
274504465d | |
|
|
c39df81f69 | |
|
|
91cae1fb29 | |
|
|
0b9acbeb3d | |
|
|
754e2986ba | |
|
|
2ba22104eb | |
|
|
b03b249f6d | |
|
|
a7d529813b |
|
|
@ -73,7 +73,7 @@ USER citus
|
||||||
|
|
||||||
# build postgres versions separately for effective parrallelism and caching of already built versions when changing only certain versions
|
# build postgres versions separately for effective parrallelism and caching of already built versions when changing only certain versions
|
||||||
FROM base AS pg15
|
FROM base AS pg15
|
||||||
RUN MAKEFLAGS="-j $(nproc)" pgenv build 15.13
|
RUN MAKEFLAGS="-j $(nproc)" pgenv build 15.14
|
||||||
RUN rm .pgenv/src/*.tar*
|
RUN rm .pgenv/src/*.tar*
|
||||||
RUN make -C .pgenv/src/postgresql-*/ clean
|
RUN make -C .pgenv/src/postgresql-*/ clean
|
||||||
RUN make -C .pgenv/src/postgresql-*/src/include install
|
RUN make -C .pgenv/src/postgresql-*/src/include install
|
||||||
|
|
@ -85,7 +85,7 @@ RUN cp -r .pgenv/src .pgenv/pgsql-* .pgenv/config .pgenv-staging/
|
||||||
RUN rm .pgenv-staging/config/default.conf
|
RUN rm .pgenv-staging/config/default.conf
|
||||||
|
|
||||||
FROM base AS pg16
|
FROM base AS pg16
|
||||||
RUN MAKEFLAGS="-j $(nproc)" pgenv build 16.9
|
RUN MAKEFLAGS="-j $(nproc)" pgenv build 16.10
|
||||||
RUN rm .pgenv/src/*.tar*
|
RUN rm .pgenv/src/*.tar*
|
||||||
RUN make -C .pgenv/src/postgresql-*/ clean
|
RUN make -C .pgenv/src/postgresql-*/ clean
|
||||||
RUN make -C .pgenv/src/postgresql-*/src/include install
|
RUN make -C .pgenv/src/postgresql-*/src/include install
|
||||||
|
|
@ -97,7 +97,7 @@ RUN cp -r .pgenv/src .pgenv/pgsql-* .pgenv/config .pgenv-staging/
|
||||||
RUN rm .pgenv-staging/config/default.conf
|
RUN rm .pgenv-staging/config/default.conf
|
||||||
|
|
||||||
FROM base AS pg17
|
FROM base AS pg17
|
||||||
RUN MAKEFLAGS="-j $(nproc)" pgenv build 17.5
|
RUN MAKEFLAGS="-j $(nproc)" pgenv build 17.6
|
||||||
RUN rm .pgenv/src/*.tar*
|
RUN rm .pgenv/src/*.tar*
|
||||||
RUN make -C .pgenv/src/postgresql-*/ clean
|
RUN make -C .pgenv/src/postgresql-*/ clean
|
||||||
RUN make -C .pgenv/src/postgresql-*/src/include install
|
RUN make -C .pgenv/src/postgresql-*/src/include install
|
||||||
|
|
@ -216,7 +216,7 @@ COPY --chown=citus:citus .psqlrc .
|
||||||
RUN sudo chown --from=root:root citus:citus -R ~
|
RUN sudo chown --from=root:root citus:citus -R ~
|
||||||
|
|
||||||
# sets default pg version
|
# sets default pg version
|
||||||
RUN pgenv switch 17.5
|
RUN pgenv switch 17.6
|
||||||
|
|
||||||
# make connecting to the coordinator easy
|
# make connecting to the coordinator easy
|
||||||
ENV PGPORT=9700
|
ENV PGPORT=9700
|
||||||
|
|
|
||||||
|
|
@ -31,12 +31,12 @@ jobs:
|
||||||
pgupgrade_image_name: "ghcr.io/citusdata/pgupgradetester"
|
pgupgrade_image_name: "ghcr.io/citusdata/pgupgradetester"
|
||||||
style_checker_image_name: "ghcr.io/citusdata/stylechecker"
|
style_checker_image_name: "ghcr.io/citusdata/stylechecker"
|
||||||
style_checker_tools_version: "0.8.18"
|
style_checker_tools_version: "0.8.18"
|
||||||
sql_snapshot_pg_version: "17.5"
|
sql_snapshot_pg_version: "17.6"
|
||||||
image_suffix: "-vb17c33b"
|
image_suffix: "-v4df94a0"
|
||||||
pg15_version: '{ "major": "15", "full": "15.13" }'
|
pg15_version: '{ "major": "15", "full": "15.14" }'
|
||||||
pg16_version: '{ "major": "16", "full": "16.9" }'
|
pg16_version: '{ "major": "16", "full": "16.10" }'
|
||||||
pg17_version: '{ "major": "17", "full": "17.5" }'
|
pg17_version: '{ "major": "17", "full": "17.6" }'
|
||||||
upgrade_pg_versions: "15.13-16.9-17.5"
|
upgrade_pg_versions: "15.14-16.10-17.6"
|
||||||
steps:
|
steps:
|
||||||
# Since GHA jobs need at least one step we use a noop step here.
|
# Since GHA jobs need at least one step we use a noop step here.
|
||||||
- name: Set up parameters
|
- name: Set up parameters
|
||||||
|
|
|
||||||
49
CHANGELOG.md
49
CHANGELOG.md
|
|
@ -1,3 +1,52 @@
|
||||||
|
### citus v13.2.0 (August 18, 2025) ###
|
||||||
|
|
||||||
|
* Adds `citus_add_clone_node()`, `citus_add_clone_node_with_nodeid()`,
|
||||||
|
`citus_remove_clone_node()` and `citus_remove_clone_node_with_nodeid()`
|
||||||
|
UDFs to support snapshot-based node splits. This feature allows promoting
|
||||||
|
a streaming replica (clone) to a primary node and rebalancing shards
|
||||||
|
between the original and newly promoted node without requiring a full data
|
||||||
|
copy. This greatly reduces rebalance times for scale-out operations when
|
||||||
|
the new node already has the data via streaming replication (#8122)
|
||||||
|
|
||||||
|
* Improves performance of shard rebalancer by parallelizing moves and removing
|
||||||
|
bottlenecks that blocked concurrent logical-replication transfers. This
|
||||||
|
reduces rebalance windows especially for clusters with large reference
|
||||||
|
tables and allows multiple shard transfers to run in parallel (#7983)
|
||||||
|
|
||||||
|
* Adds citus.enable_recurring_outer_join_pushdown GUC (enabled by default)
|
||||||
|
to allow pushing down LEFT/RIGHT outer joins having a reference table in
|
||||||
|
the outer side and a distributed table on the inner side (e.g.,
|
||||||
|
\<reference table\> LEFT JOIN \<distributed table\>) (#7973)
|
||||||
|
|
||||||
|
* Adds citus.enable_local_fast_path_query_optimization (enabled by default)
|
||||||
|
GUC to avoid unnecessary query deparsing to improve performance of
|
||||||
|
fast-path queries targeting local shards (#8035)
|
||||||
|
|
||||||
|
* Adds `citus_stats()` UDF that can be used to retrieve distributed `pg_stats`
|
||||||
|
for the provided Citus table. (#8026)
|
||||||
|
|
||||||
|
* Avoids automatically creating citus_columnar when there are no relations
|
||||||
|
using it (#8081)
|
||||||
|
|
||||||
|
* Makes sure to check if the distribution key is in the target list before
|
||||||
|
pushing down a query with a union and an outer join (#8092)
|
||||||
|
|
||||||
|
* Fixes a bug in EXPLAIN ANALYZE to prevent unintended (duplicate) execution
|
||||||
|
of the (sub)plans during the explain phase (#8017)
|
||||||
|
|
||||||
|
* Fixes potential memory corruptions that could happen when accessing
|
||||||
|
various catalog tables after a Citus downgrade is followed by a Citus
|
||||||
|
upgrade (#7950, #8120, #8124, #8121, #8114, #8146)
|
||||||
|
|
||||||
|
* Fixes UPDATE statements with indirection and array/jsonb subscripting with
|
||||||
|
more than one field (#7675)
|
||||||
|
|
||||||
|
* Fixes an assertion failure that happens when an expression in the query
|
||||||
|
references a CTE (#8106)
|
||||||
|
|
||||||
|
* Fixes an assertion failure that happens when querying a view that is
|
||||||
|
defined on distributed tables (#8136)
|
||||||
|
|
||||||
### citus v13.1.0 (May 30th, 2025) ###
|
### citus v13.1.0 (May 30th, 2025) ###
|
||||||
|
|
||||||
* Adds `citus_stat_counters` view that can be used to query
|
* Adds `citus_stat_counters` view that can be used to query
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
#! /bin/sh
|
#! /bin/sh
|
||||||
# Guess values for system-dependent variables and create Makefiles.
|
# Guess values for system-dependent variables and create Makefiles.
|
||||||
# Generated by GNU Autoconf 2.69 for Citus 13.2devel.
|
# Generated by GNU Autoconf 2.69 for Citus 13.2.0.
|
||||||
#
|
#
|
||||||
#
|
#
|
||||||
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
|
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
|
||||||
|
|
@ -579,8 +579,8 @@ MAKEFLAGS=
|
||||||
# Identity of this package.
|
# Identity of this package.
|
||||||
PACKAGE_NAME='Citus'
|
PACKAGE_NAME='Citus'
|
||||||
PACKAGE_TARNAME='citus'
|
PACKAGE_TARNAME='citus'
|
||||||
PACKAGE_VERSION='13.2devel'
|
PACKAGE_VERSION='13.2.0'
|
||||||
PACKAGE_STRING='Citus 13.2devel'
|
PACKAGE_STRING='Citus 13.2.0'
|
||||||
PACKAGE_BUGREPORT=''
|
PACKAGE_BUGREPORT=''
|
||||||
PACKAGE_URL=''
|
PACKAGE_URL=''
|
||||||
|
|
||||||
|
|
@ -1262,7 +1262,7 @@ if test "$ac_init_help" = "long"; then
|
||||||
# Omit some internal or obsolete options to make the list less imposing.
|
# Omit some internal or obsolete options to make the list less imposing.
|
||||||
# This message is too long to be a string in the A/UX 3.1 sh.
|
# This message is too long to be a string in the A/UX 3.1 sh.
|
||||||
cat <<_ACEOF
|
cat <<_ACEOF
|
||||||
\`configure' configures Citus 13.2devel to adapt to many kinds of systems.
|
\`configure' configures Citus 13.2.0 to adapt to many kinds of systems.
|
||||||
|
|
||||||
Usage: $0 [OPTION]... [VAR=VALUE]...
|
Usage: $0 [OPTION]... [VAR=VALUE]...
|
||||||
|
|
||||||
|
|
@ -1324,7 +1324,7 @@ fi
|
||||||
|
|
||||||
if test -n "$ac_init_help"; then
|
if test -n "$ac_init_help"; then
|
||||||
case $ac_init_help in
|
case $ac_init_help in
|
||||||
short | recursive ) echo "Configuration of Citus 13.2devel:";;
|
short | recursive ) echo "Configuration of Citus 13.2.0:";;
|
||||||
esac
|
esac
|
||||||
cat <<\_ACEOF
|
cat <<\_ACEOF
|
||||||
|
|
||||||
|
|
@ -1429,7 +1429,7 @@ fi
|
||||||
test -n "$ac_init_help" && exit $ac_status
|
test -n "$ac_init_help" && exit $ac_status
|
||||||
if $ac_init_version; then
|
if $ac_init_version; then
|
||||||
cat <<\_ACEOF
|
cat <<\_ACEOF
|
||||||
Citus configure 13.2devel
|
Citus configure 13.2.0
|
||||||
generated by GNU Autoconf 2.69
|
generated by GNU Autoconf 2.69
|
||||||
|
|
||||||
Copyright (C) 2012 Free Software Foundation, Inc.
|
Copyright (C) 2012 Free Software Foundation, Inc.
|
||||||
|
|
@ -1912,7 +1912,7 @@ cat >config.log <<_ACEOF
|
||||||
This file contains any messages produced by compilers while
|
This file contains any messages produced by compilers while
|
||||||
running configure, to aid debugging if configure makes a mistake.
|
running configure, to aid debugging if configure makes a mistake.
|
||||||
|
|
||||||
It was created by Citus $as_me 13.2devel, which was
|
It was created by Citus $as_me 13.2.0, which was
|
||||||
generated by GNU Autoconf 2.69. Invocation command line was
|
generated by GNU Autoconf 2.69. Invocation command line was
|
||||||
|
|
||||||
$ $0 $@
|
$ $0 $@
|
||||||
|
|
@ -5393,7 +5393,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
|
||||||
# report actual input values of CONFIG_FILES etc. instead of their
|
# report actual input values of CONFIG_FILES etc. instead of their
|
||||||
# values after options handling.
|
# values after options handling.
|
||||||
ac_log="
|
ac_log="
|
||||||
This file was extended by Citus $as_me 13.2devel, which was
|
This file was extended by Citus $as_me 13.2.0, which was
|
||||||
generated by GNU Autoconf 2.69. Invocation command line was
|
generated by GNU Autoconf 2.69. Invocation command line was
|
||||||
|
|
||||||
CONFIG_FILES = $CONFIG_FILES
|
CONFIG_FILES = $CONFIG_FILES
|
||||||
|
|
@ -5455,7 +5455,7 @@ _ACEOF
|
||||||
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
|
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
|
||||||
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
|
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
|
||||||
ac_cs_version="\\
|
ac_cs_version="\\
|
||||||
Citus config.status 13.2devel
|
Citus config.status 13.2.0
|
||||||
configured by $0, generated by GNU Autoconf 2.69,
|
configured by $0, generated by GNU Autoconf 2.69,
|
||||||
with options \\"\$ac_cs_config\\"
|
with options \\"\$ac_cs_config\\"
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@
|
||||||
# everyone needing autoconf installed, the resulting files are checked
|
# everyone needing autoconf installed, the resulting files are checked
|
||||||
# into the SCM.
|
# into the SCM.
|
||||||
|
|
||||||
AC_INIT([Citus], [13.2devel])
|
AC_INIT([Citus], [13.2.0])
|
||||||
AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])
|
AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])
|
||||||
|
|
||||||
# we'll need sed and awk for some of the version commands
|
# we'll need sed and awk for some of the version commands
|
||||||
|
|
|
||||||
|
|
@ -659,8 +659,9 @@ SaveStripeSkipList(RelFileLocator relfilelocator, uint64 stripe,
|
||||||
nulls[Anum_columnar_chunk_minimum_value - 1] = true;
|
nulls[Anum_columnar_chunk_minimum_value - 1] = true;
|
||||||
nulls[Anum_columnar_chunk_maximum_value - 1] = true;
|
nulls[Anum_columnar_chunk_maximum_value - 1] = true;
|
||||||
}
|
}
|
||||||
|
PushActiveSnapshot(GetTransactionSnapshot());
|
||||||
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
|
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
|
||||||
|
PopActiveSnapshot();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -2132,7 +2133,7 @@ GetHighestUsedRowNumber(uint64 storageId)
|
||||||
static int
|
static int
|
||||||
GetFirstRowNumberAttrIndexInColumnarStripe(TupleDesc tupleDesc)
|
GetFirstRowNumberAttrIndexInColumnarStripe(TupleDesc tupleDesc)
|
||||||
{
|
{
|
||||||
return TupleDescSize(tupleDesc) == Natts_columnar_stripe
|
return tupleDesc->natts == Natts_columnar_stripe
|
||||||
? (Anum_columnar_stripe_first_row_number - 1)
|
? (Anum_columnar_stripe_first_row_number - 1)
|
||||||
: tupleDesc->natts - 1;
|
: tupleDesc->natts - 1;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1837,6 +1837,62 @@ ensure_update_targetlist_in_param_order(List *targetList)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* isSubsRef checks if a given node is a SubscriptingRef or can be
|
||||||
|
* reached through an implicit coercion.
|
||||||
|
*/
|
||||||
|
static
|
||||||
|
bool
|
||||||
|
isSubsRef(Node *node)
|
||||||
|
{
|
||||||
|
if (node == NULL)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (IsA(node, CoerceToDomain))
|
||||||
|
{
|
||||||
|
CoerceToDomain *coerceToDomain = (CoerceToDomain *) node;
|
||||||
|
if (coerceToDomain->coercionformat != COERCE_IMPLICIT_CAST)
|
||||||
|
{
|
||||||
|
/* not an implicit coercion, cannot reach to a SubscriptingRef */
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
node = (Node *) coerceToDomain->arg;
|
||||||
|
}
|
||||||
|
|
||||||
|
return (IsA(node, SubscriptingRef));
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* checkTlistForSubsRef - checks if any target entry in the list contains a
|
||||||
|
* SubscriptingRef or can be reached through an implicit coercion. Used by
|
||||||
|
* ExpandMergedSubscriptingRefEntries() to identify if any target entries
|
||||||
|
* need to be expanded - if not the original target list is preserved.
|
||||||
|
*/
|
||||||
|
static
|
||||||
|
bool
|
||||||
|
checkTlistForSubsRef(List *targetEntryList)
|
||||||
|
{
|
||||||
|
ListCell *tgtCell = NULL;
|
||||||
|
|
||||||
|
foreach(tgtCell, targetEntryList)
|
||||||
|
{
|
||||||
|
TargetEntry *targetEntry = (TargetEntry *) lfirst(tgtCell);
|
||||||
|
Expr *expr = targetEntry->expr;
|
||||||
|
|
||||||
|
if (isSubsRef((Node *) expr))
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExpandMergedSubscriptingRefEntries takes a list of target entries and expands
|
* ExpandMergedSubscriptingRefEntries takes a list of target entries and expands
|
||||||
* each one that references a SubscriptingRef node that indicates multiple (field)
|
* each one that references a SubscriptingRef node that indicates multiple (field)
|
||||||
|
|
@ -1848,6 +1904,12 @@ ExpandMergedSubscriptingRefEntries(List *targetEntryList)
|
||||||
List *newTargetEntryList = NIL;
|
List *newTargetEntryList = NIL;
|
||||||
ListCell *tgtCell = NULL;
|
ListCell *tgtCell = NULL;
|
||||||
|
|
||||||
|
if (!checkTlistForSubsRef(targetEntryList))
|
||||||
|
{
|
||||||
|
/* No subscripting refs found, return original list */
|
||||||
|
return targetEntryList;
|
||||||
|
}
|
||||||
|
|
||||||
foreach(tgtCell, targetEntryList)
|
foreach(tgtCell, targetEntryList)
|
||||||
{
|
{
|
||||||
TargetEntry *targetEntry = (TargetEntry *) lfirst(tgtCell);
|
TargetEntry *targetEntry = (TargetEntry *) lfirst(tgtCell);
|
||||||
|
|
|
||||||
|
|
@ -801,7 +801,7 @@ DistributedSequenceList(void)
|
||||||
int
|
int
|
||||||
GetForceDelegationAttrIndexInPgDistObject(TupleDesc tupleDesc)
|
GetForceDelegationAttrIndexInPgDistObject(TupleDesc tupleDesc)
|
||||||
{
|
{
|
||||||
return TupleDescSize(tupleDesc) == Natts_pg_dist_object
|
return tupleDesc->natts == Natts_pg_dist_object
|
||||||
? (Anum_pg_dist_object_force_delegation - 1)
|
? (Anum_pg_dist_object_force_delegation - 1)
|
||||||
: tupleDesc->natts - 1;
|
: tupleDesc->natts - 1;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -4486,7 +4486,7 @@ UnblockDependingBackgroundTasks(BackgroundTask *task)
|
||||||
int
|
int
|
||||||
GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc)
|
GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc)
|
||||||
{
|
{
|
||||||
return TupleDescSize(tupleDesc) == Natts_pg_dist_partition
|
return tupleDesc->natts == Natts_pg_dist_partition
|
||||||
? (Anum_pg_dist_partition_autoconverted - 1)
|
? (Anum_pg_dist_partition_autoconverted - 1)
|
||||||
: tupleDesc->natts - 1;
|
: tupleDesc->natts - 1;
|
||||||
}
|
}
|
||||||
|
|
@ -4506,7 +4506,7 @@ GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc)
|
||||||
int
|
int
|
||||||
GetNodesInvolvedAttrIndexInPgDistBackgroundTask(TupleDesc tupleDesc)
|
GetNodesInvolvedAttrIndexInPgDistBackgroundTask(TupleDesc tupleDesc)
|
||||||
{
|
{
|
||||||
return TupleDescSize(tupleDesc) == Natts_pg_dist_background_task
|
return tupleDesc->natts == Natts_pg_dist_background_task
|
||||||
? (Anum_pg_dist_background_task_nodes_involved - 1)
|
? (Anum_pg_dist_background_task_nodes_involved - 1)
|
||||||
: tupleDesc->natts - 1;
|
: tupleDesc->natts - 1;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -115,6 +115,8 @@ static bool NodeIsLocal(WorkerNode *worker);
|
||||||
static void SetLockTimeoutLocally(int32 lock_cooldown);
|
static void SetLockTimeoutLocally(int32 lock_cooldown);
|
||||||
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort,
|
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort,
|
||||||
bool localOnly);
|
bool localOnly);
|
||||||
|
static int GetNodePrimaryNodeIdAttrIndexInPgDistNode(TupleDesc tupleDesc);
|
||||||
|
static int GetNodeIsCloneAttrIndexInPgDistNode(TupleDesc tupleDesc);
|
||||||
static bool UnsetMetadataSyncedForAllWorkers(void);
|
static bool UnsetMetadataSyncedForAllWorkers(void);
|
||||||
static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode,
|
static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode,
|
||||||
int columnIndex,
|
int columnIndex,
|
||||||
|
|
@ -1196,6 +1198,11 @@ ActivateNodeList(MetadataSyncContext *context)
|
||||||
void
|
void
|
||||||
ActivateCloneNodeAsPrimary(WorkerNode *workerNode)
|
ActivateCloneNodeAsPrimary(WorkerNode *workerNode)
|
||||||
{
|
{
|
||||||
|
Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
|
||||||
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
|
||||||
|
TupleDesc copiedTupleDescriptor = CreateTupleDescCopy(tupleDescriptor);
|
||||||
|
table_close(pgDistNode, AccessShareLock);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Set the node as primary and active.
|
* Set the node as primary and active.
|
||||||
*/
|
*/
|
||||||
|
|
@ -1203,9 +1210,13 @@ ActivateCloneNodeAsPrimary(WorkerNode *workerNode)
|
||||||
ObjectIdGetDatum(PrimaryNodeRoleId()));
|
ObjectIdGetDatum(PrimaryNodeRoleId()));
|
||||||
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
|
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
|
||||||
BoolGetDatum(true));
|
BoolGetDatum(true));
|
||||||
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_nodeisclone,
|
SetWorkerColumnLocalOnly(workerNode,
|
||||||
|
GetNodeIsCloneAttrIndexInPgDistNode(copiedTupleDescriptor) +
|
||||||
|
1,
|
||||||
BoolGetDatum(false));
|
BoolGetDatum(false));
|
||||||
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_nodeprimarynodeid,
|
SetWorkerColumnLocalOnly(workerNode,
|
||||||
|
GetNodePrimaryNodeIdAttrIndexInPgDistNode(
|
||||||
|
copiedTupleDescriptor) + 1,
|
||||||
Int32GetDatum(0));
|
Int32GetDatum(0));
|
||||||
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_hasmetadata,
|
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_hasmetadata,
|
||||||
BoolGetDatum(true));
|
BoolGetDatum(true));
|
||||||
|
|
@ -1779,14 +1790,14 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool loca
|
||||||
{
|
{
|
||||||
const bool indexOK = true;
|
const bool indexOK = true;
|
||||||
|
|
||||||
ScanKeyData scanKey[1];
|
|
||||||
Datum values[Natts_pg_dist_node];
|
|
||||||
bool isnull[Natts_pg_dist_node];
|
|
||||||
bool replace[Natts_pg_dist_node];
|
|
||||||
|
|
||||||
Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
|
Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
|
||||||
|
|
||||||
|
ScanKeyData scanKey[1];
|
||||||
|
Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
|
||||||
|
bool *isnull = palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
bool *replace = palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
|
||||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodeid,
|
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodeid,
|
||||||
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodeId));
|
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodeId));
|
||||||
|
|
||||||
|
|
@ -1801,8 +1812,6 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool loca
|
||||||
newNodeName, newNodePort)));
|
newNodeName, newNodePort)));
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(replace, 0, sizeof(replace));
|
|
||||||
|
|
||||||
values[Anum_pg_dist_node_nodeport - 1] = Int32GetDatum(newNodePort);
|
values[Anum_pg_dist_node_nodeport - 1] = Int32GetDatum(newNodePort);
|
||||||
isnull[Anum_pg_dist_node_nodeport - 1] = false;
|
isnull[Anum_pg_dist_node_nodeport - 1] = false;
|
||||||
replace[Anum_pg_dist_node_nodeport - 1] = true;
|
replace[Anum_pg_dist_node_nodeport - 1] = true;
|
||||||
|
|
@ -1835,6 +1844,10 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool loca
|
||||||
|
|
||||||
systable_endscan(scanDescriptor);
|
systable_endscan(scanDescriptor);
|
||||||
table_close(pgDistNode, NoLock);
|
table_close(pgDistNode, NoLock);
|
||||||
|
|
||||||
|
pfree(values);
|
||||||
|
pfree(isnull);
|
||||||
|
pfree(replace);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -2105,11 +2118,10 @@ citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS)
|
||||||
Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
|
Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
|
||||||
|
|
||||||
Datum values[Natts_pg_dist_node];
|
Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
|
||||||
bool isnull[Natts_pg_dist_node];
|
bool *isnull = palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
bool replace[Natts_pg_dist_node];
|
bool *replace = palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
|
||||||
memset(replace, 0, sizeof(replace));
|
|
||||||
values[Anum_pg_dist_node_metadatasynced - 1] = DatumGetBool(false);
|
values[Anum_pg_dist_node_metadatasynced - 1] = DatumGetBool(false);
|
||||||
isnull[Anum_pg_dist_node_metadatasynced - 1] = false;
|
isnull[Anum_pg_dist_node_metadatasynced - 1] = false;
|
||||||
replace[Anum_pg_dist_node_metadatasynced - 1] = true;
|
replace[Anum_pg_dist_node_metadatasynced - 1] = true;
|
||||||
|
|
@ -2123,6 +2135,10 @@ citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
table_close(pgDistNode, NoLock);
|
table_close(pgDistNode, NoLock);
|
||||||
|
|
||||||
|
pfree(values);
|
||||||
|
pfree(isnull);
|
||||||
|
pfree(replace);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -2831,9 +2847,9 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
|
||||||
HeapTuple heapTuple = GetNodeTuple(workerNode->workerName, workerNode->workerPort);
|
HeapTuple heapTuple = GetNodeTuple(workerNode->workerName, workerNode->workerPort);
|
||||||
|
|
||||||
Datum values[Natts_pg_dist_node];
|
Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
|
||||||
bool isnull[Natts_pg_dist_node];
|
bool *isnull = palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
bool replace[Natts_pg_dist_node];
|
bool *replace = palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
|
||||||
if (heapTuple == NULL)
|
if (heapTuple == NULL)
|
||||||
{
|
{
|
||||||
|
|
@ -2841,7 +2857,6 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
|
||||||
workerNode->workerName, workerNode->workerPort)));
|
workerNode->workerName, workerNode->workerPort)));
|
||||||
}
|
}
|
||||||
|
|
||||||
memset(replace, 0, sizeof(replace));
|
|
||||||
values[columnIndex - 1] = value;
|
values[columnIndex - 1] = value;
|
||||||
isnull[columnIndex - 1] = false;
|
isnull[columnIndex - 1] = false;
|
||||||
replace[columnIndex - 1] = true;
|
replace[columnIndex - 1] = true;
|
||||||
|
|
@ -2857,6 +2872,10 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
|
||||||
|
|
||||||
table_close(pgDistNode, NoLock);
|
table_close(pgDistNode, NoLock);
|
||||||
|
|
||||||
|
pfree(values);
|
||||||
|
pfree(isnull);
|
||||||
|
pfree(replace);
|
||||||
|
|
||||||
return newWorkerNode;
|
return newWorkerNode;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -3241,16 +3260,15 @@ InsertPlaceholderCoordinatorRecord(void)
|
||||||
static void
|
static void
|
||||||
InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata)
|
InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata)
|
||||||
{
|
{
|
||||||
Datum values[Natts_pg_dist_node];
|
Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
|
||||||
bool isNulls[Natts_pg_dist_node];
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
|
||||||
|
|
||||||
|
Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
|
||||||
|
bool *isNulls = palloc0(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
|
||||||
Datum nodeClusterStringDatum = CStringGetDatum(nodeMetadata->nodeCluster);
|
Datum nodeClusterStringDatum = CStringGetDatum(nodeMetadata->nodeCluster);
|
||||||
Datum nodeClusterNameDatum = DirectFunctionCall1(namein, nodeClusterStringDatum);
|
Datum nodeClusterNameDatum = DirectFunctionCall1(namein, nodeClusterStringDatum);
|
||||||
|
|
||||||
/* form new shard tuple */
|
|
||||||
memset(values, 0, sizeof(values));
|
|
||||||
memset(isNulls, false, sizeof(isNulls));
|
|
||||||
|
|
||||||
values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid);
|
values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid);
|
||||||
values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(nodeMetadata->groupId);
|
values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(nodeMetadata->groupId);
|
||||||
values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName);
|
values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName);
|
||||||
|
|
@ -3264,17 +3282,15 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta
|
||||||
values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum;
|
values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum;
|
||||||
values[Anum_pg_dist_node_shouldhaveshards - 1] = BoolGetDatum(
|
values[Anum_pg_dist_node_shouldhaveshards - 1] = BoolGetDatum(
|
||||||
nodeMetadata->shouldHaveShards);
|
nodeMetadata->shouldHaveShards);
|
||||||
values[Anum_pg_dist_node_nodeisclone - 1] = BoolGetDatum(
|
values[GetNodeIsCloneAttrIndexInPgDistNode(tupleDescriptor)] =
|
||||||
nodeMetadata->nodeisclone);
|
BoolGetDatum(nodeMetadata->nodeisclone);
|
||||||
values[Anum_pg_dist_node_nodeprimarynodeid - 1] = Int32GetDatum(
|
values[GetNodePrimaryNodeIdAttrIndexInPgDistNode(tupleDescriptor)] =
|
||||||
nodeMetadata->nodeprimarynodeid);
|
Int32GetDatum(nodeMetadata->nodeprimarynodeid);
|
||||||
|
|
||||||
Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
|
|
||||||
|
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
|
|
||||||
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
|
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
|
||||||
|
|
||||||
CATALOG_INSERT_WITH_SNAPSHOT(pgDistNode, heapTuple);
|
PushActiveSnapshot(GetTransactionSnapshot());
|
||||||
|
CatalogTupleInsert(pgDistNode, heapTuple);
|
||||||
|
PopActiveSnapshot();
|
||||||
|
|
||||||
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
|
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
|
||||||
|
|
||||||
|
|
@ -3283,6 +3299,9 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta
|
||||||
|
|
||||||
/* close relation */
|
/* close relation */
|
||||||
table_close(pgDistNode, NoLock);
|
table_close(pgDistNode, NoLock);
|
||||||
|
|
||||||
|
pfree(values);
|
||||||
|
pfree(isNulls);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -3397,43 +3416,30 @@ TupleToWorkerNode(Relation pgDistNode, TupleDesc tupleDescriptor, HeapTuple heap
|
||||||
1]);
|
1]);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Attributes above this line are guaranteed to be present at the
|
* nodecluster, nodeisclone and nodeprimarynodeid columns can be missing. In case
|
||||||
* exact defined attribute number. Atleast till now. If you are droping or
|
* of extension creation/upgrade, master_initialize_node_metadata function is
|
||||||
* adding any of the above columns consider adjusting the code above
|
* called before the nodecluster column is added to pg_dist_node table.
|
||||||
*/
|
*/
|
||||||
Oid pgDistNodeRelId = RelationGetRelid(pgDistNode);
|
|
||||||
|
|
||||||
AttrNumber nodeClusterAttno = get_attnum(pgDistNodeRelId, "nodecluster");
|
if (!isNullArray[Anum_pg_dist_node_nodecluster - 1])
|
||||||
|
|
||||||
if (nodeClusterAttno > 0 &&
|
|
||||||
!TupleDescAttr(tupleDescriptor, nodeClusterAttno - 1)->attisdropped &&
|
|
||||||
!isNullArray[nodeClusterAttno - 1])
|
|
||||||
{
|
{
|
||||||
Name nodeClusterName =
|
Name nodeClusterName =
|
||||||
DatumGetName(datumArray[nodeClusterAttno - 1]);
|
DatumGetName(datumArray[Anum_pg_dist_node_nodecluster - 1]);
|
||||||
char *nodeClusterString = NameStr(*nodeClusterName);
|
char *nodeClusterString = NameStr(*nodeClusterName);
|
||||||
strlcpy(workerNode->nodeCluster, nodeClusterString, NAMEDATALEN);
|
strlcpy(workerNode->nodeCluster, nodeClusterString, NAMEDATALEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (nAtts > Anum_pg_dist_node_nodeisclone)
|
int nodeIsCloneIdx = GetNodeIsCloneAttrIndexInPgDistNode(tupleDescriptor);
|
||||||
|
int nodePrimaryNodeIdIdx = GetNodePrimaryNodeIdAttrIndexInPgDistNode(tupleDescriptor);
|
||||||
|
|
||||||
|
if (!isNullArray[nodeIsCloneIdx])
|
||||||
{
|
{
|
||||||
AttrNumber nodeIsCloneAttno = get_attnum(pgDistNodeRelId, "nodeisclone");
|
workerNode->nodeisclone = DatumGetBool(datumArray[nodeIsCloneIdx]);
|
||||||
if (nodeIsCloneAttno > 0 &&
|
|
||||||
!TupleDescAttr(tupleDescriptor, nodeIsCloneAttno - 1)->attisdropped &&
|
|
||||||
!isNullArray[nodeIsCloneAttno - 1])
|
|
||||||
{
|
|
||||||
workerNode->nodeisclone = DatumGetBool(datumArray[nodeIsCloneAttno - 1]);
|
|
||||||
}
|
}
|
||||||
AttrNumber nodePrimaryNodeIdAttno = get_attnum(pgDistNodeRelId,
|
|
||||||
"nodeprimarynodeid");
|
if (!isNullArray[nodePrimaryNodeIdIdx])
|
||||||
if (nodePrimaryNodeIdAttno > 0 &&
|
|
||||||
!TupleDescAttr(tupleDescriptor, nodePrimaryNodeIdAttno - 1)->attisdropped &&
|
|
||||||
!isNullArray[nodePrimaryNodeIdAttno - 1])
|
|
||||||
{
|
{
|
||||||
workerNode->nodeprimarynodeid = DatumGetInt32(datumArray[
|
workerNode->nodeprimarynodeid = DatumGetInt32(datumArray[nodePrimaryNodeIdIdx]);
|
||||||
nodePrimaryNodeIdAttno - 1])
|
|
||||||
;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pfree(datumArray);
|
pfree(datumArray);
|
||||||
|
|
@ -3443,6 +3449,48 @@ TupleToWorkerNode(Relation pgDistNode, TupleDesc tupleDescriptor, HeapTuple heap
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetNodePrimaryNodeIdAttrIndexInPgDistNode returns attrnum for nodeprimarynodeid attr.
|
||||||
|
*
|
||||||
|
* nodeprimarynodeid attr was added to table pg_dist_node using alter operation
|
||||||
|
* after the version where Citus started supporting downgrades, and it's one of
|
||||||
|
* the two columns that we've introduced to pg_dist_node since then.
|
||||||
|
*
|
||||||
|
* And in case of a downgrade + upgrade, tupleDesc->natts becomes greater than
|
||||||
|
* Natts_pg_dist_node and when this happens, then we know that attrnum
|
||||||
|
* nodeprimarynodeid is not Anum_pg_dist_node_nodeprimarynodeid anymore but
|
||||||
|
* tupleDesc->natts - 1.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
GetNodePrimaryNodeIdAttrIndexInPgDistNode(TupleDesc tupleDesc)
|
||||||
|
{
|
||||||
|
return tupleDesc->natts == Natts_pg_dist_node
|
||||||
|
? (Anum_pg_dist_node_nodeprimarynodeid - 1)
|
||||||
|
: tupleDesc->natts - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* GetNodeIsCloneAttrIndexInPgDistNode returns attrnum for nodeisclone attr.
|
||||||
|
*
|
||||||
|
* Like, GetNodePrimaryNodeIdAttrIndexInPgDistNode(), performs a similar
|
||||||
|
* calculation for nodeisclone attribute because this is column added to
|
||||||
|
* pg_dist_node after we started supporting downgrades.
|
||||||
|
*
|
||||||
|
* Only difference with the mentioned function is that we know
|
||||||
|
* the attrnum for nodeisclone is not Anum_pg_dist_node_nodeisclone anymore
|
||||||
|
* but tupleDesc->natts - 2 because we added these columns consecutively
|
||||||
|
* and we first add nodeisclone attribute and then nodeprimarynodeid attribute.
|
||||||
|
*/
|
||||||
|
static int
|
||||||
|
GetNodeIsCloneAttrIndexInPgDistNode(TupleDesc tupleDesc)
|
||||||
|
{
|
||||||
|
return tupleDesc->natts == Natts_pg_dist_node
|
||||||
|
? (Anum_pg_dist_node_nodeisclone - 1)
|
||||||
|
: tupleDesc->natts - 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* StringToDatum transforms a string representation into a Datum.
|
* StringToDatum transforms a string representation into a Datum.
|
||||||
*/
|
*/
|
||||||
|
|
@ -3519,15 +3567,15 @@ UnsetMetadataSyncedForAllWorkers(void)
|
||||||
updatedAtLeastOne = true;
|
updatedAtLeastOne = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Datum *values = palloc(tupleDescriptor->natts * sizeof(Datum));
|
||||||
|
bool *isnull = palloc(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
bool *replace = palloc(tupleDescriptor->natts * sizeof(bool));
|
||||||
|
|
||||||
while (HeapTupleIsValid(heapTuple))
|
while (HeapTupleIsValid(heapTuple))
|
||||||
{
|
{
|
||||||
Datum values[Natts_pg_dist_node];
|
memset(values, 0, tupleDescriptor->natts * sizeof(Datum));
|
||||||
bool isnull[Natts_pg_dist_node];
|
memset(isnull, 0, tupleDescriptor->natts * sizeof(bool));
|
||||||
bool replace[Natts_pg_dist_node];
|
memset(replace, 0, tupleDescriptor->natts * sizeof(bool));
|
||||||
|
|
||||||
memset(replace, false, sizeof(replace));
|
|
||||||
memset(isnull, false, sizeof(isnull));
|
|
||||||
memset(values, 0, sizeof(values));
|
|
||||||
|
|
||||||
values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(false);
|
values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(false);
|
||||||
replace[Anum_pg_dist_node_metadatasynced - 1] = true;
|
replace[Anum_pg_dist_node_metadatasynced - 1] = true;
|
||||||
|
|
@ -3550,6 +3598,10 @@ UnsetMetadataSyncedForAllWorkers(void)
|
||||||
CatalogCloseIndexes(indstate);
|
CatalogCloseIndexes(indstate);
|
||||||
table_close(relation, NoLock);
|
table_close(relation, NoLock);
|
||||||
|
|
||||||
|
pfree(values);
|
||||||
|
pfree(isnull);
|
||||||
|
pfree(replace);
|
||||||
|
|
||||||
return updatedAtLeastOne;
|
return updatedAtLeastOne;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -141,10 +141,9 @@ static RouterPlanType GetRouterPlanType(Query *query,
|
||||||
bool hasUnresolvedParams);
|
bool hasUnresolvedParams);
|
||||||
static void ConcatenateRTablesAndPerminfos(PlannedStmt *mainPlan,
|
static void ConcatenateRTablesAndPerminfos(PlannedStmt *mainPlan,
|
||||||
PlannedStmt *concatPlan);
|
PlannedStmt *concatPlan);
|
||||||
static bool CheckPostPlanDistribution(bool isDistributedQuery,
|
static bool CheckPostPlanDistribution(DistributedPlanningContext *planContext,
|
||||||
Query *origQuery,
|
bool isDistributedQuery,
|
||||||
List *rangeTableList,
|
List *rangeTableList);
|
||||||
Query *plannedQuery);
|
|
||||||
|
|
||||||
/* Distributed planner hook */
|
/* Distributed planner hook */
|
||||||
PlannedStmt *
|
PlannedStmt *
|
||||||
|
|
@ -265,10 +264,9 @@ distributed_planner(Query *parse,
|
||||||
planContext.plan = standard_planner(planContext.query, NULL,
|
planContext.plan = standard_planner(planContext.query, NULL,
|
||||||
planContext.cursorOptions,
|
planContext.cursorOptions,
|
||||||
planContext.boundParams);
|
planContext.boundParams);
|
||||||
needsDistributedPlanning = CheckPostPlanDistribution(needsDistributedPlanning,
|
needsDistributedPlanning = CheckPostPlanDistribution(&planContext,
|
||||||
planContext.originalQuery,
|
needsDistributedPlanning,
|
||||||
rangeTableList,
|
rangeTableList);
|
||||||
planContext.query);
|
|
||||||
|
|
||||||
if (needsDistributedPlanning)
|
if (needsDistributedPlanning)
|
||||||
{
|
{
|
||||||
|
|
@ -2740,12 +2738,13 @@ WarnIfListHasForeignDistributedTable(List *rangeTableList)
|
||||||
|
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
CheckPostPlanDistribution(bool isDistributedQuery,
|
CheckPostPlanDistribution(DistributedPlanningContext *planContext, bool
|
||||||
Query *origQuery, List *rangeTableList,
|
isDistributedQuery, List *rangeTableList)
|
||||||
Query *plannedQuery)
|
|
||||||
{
|
{
|
||||||
if (isDistributedQuery)
|
if (isDistributedQuery)
|
||||||
{
|
{
|
||||||
|
Query *origQuery = planContext->originalQuery;
|
||||||
|
Query *plannedQuery = planContext->query;
|
||||||
Node *origQuals = origQuery->jointree->quals;
|
Node *origQuals = origQuery->jointree->quals;
|
||||||
Node *plannedQuals = plannedQuery->jointree->quals;
|
Node *plannedQuals = plannedQuery->jointree->quals;
|
||||||
|
|
||||||
|
|
@ -2764,6 +2763,23 @@ CheckPostPlanDistribution(bool isDistributedQuery,
|
||||||
*/
|
*/
|
||||||
if (origQuals != NULL && plannedQuals == NULL)
|
if (origQuals != NULL && plannedQuals == NULL)
|
||||||
{
|
{
|
||||||
|
bool planHasDistTable = ListContainsDistributedTableRTE(
|
||||||
|
planContext->plan->rtable, NULL);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If the Postgres plan has a distributed table, we know for sure that
|
||||||
|
* the query requires distributed planning.
|
||||||
|
*/
|
||||||
|
if (planHasDistTable)
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Otherwise, if the query has less range table entries after Postgres,
|
||||||
|
* planning, we should re-evaluate the distribution of the query. Postgres
|
||||||
|
* may have optimized away all citus tables, per issues 7782, 7783.
|
||||||
|
*/
|
||||||
List *rtesPostPlan = ExtractRangeTableEntryList(plannedQuery);
|
List *rtesPostPlan = ExtractRangeTableEntryList(plannedQuery);
|
||||||
if (list_length(rtesPostPlan) < list_length(rangeTableList))
|
if (list_length(rtesPostPlan) < list_length(rangeTableList))
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ most_common_vals_json AS (
|
||||||
|
|
||||||
table_reltuples_json AS (
|
table_reltuples_json AS (
|
||||||
SELECT distinct(shardid),
|
SELECT distinct(shardid),
|
||||||
(json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
|
CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples,
|
||||||
(json_array_elements(result::json)->>'citus_table')::regclass AS citus_table
|
(json_array_elements(result::json)->>'citus_table')::regclass AS citus_table
|
||||||
FROM most_common_vals_json),
|
FROM most_common_vals_json),
|
||||||
|
|
||||||
|
|
@ -32,8 +32,8 @@ table_reltuples AS (
|
||||||
|
|
||||||
null_frac_json AS (
|
null_frac_json AS (
|
||||||
SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
|
SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
|
||||||
(json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
|
CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples,
|
||||||
(json_array_elements(result::json)->>'null_frac')::float4 AS null_frac,
|
CAST((json_array_elements(result::json)->>'null_frac') AS float4) AS null_frac,
|
||||||
(json_array_elements(result::json)->>'attname')::text AS attname
|
(json_array_elements(result::json)->>'attname')::text AS attname
|
||||||
FROM most_common_vals_json
|
FROM most_common_vals_json
|
||||||
),
|
),
|
||||||
|
|
@ -49,8 +49,8 @@ most_common_vals AS (
|
||||||
SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
|
SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
|
||||||
(json_array_elements(result::json)->>'attname')::text AS attname,
|
(json_array_elements(result::json)->>'attname')::text AS attname,
|
||||||
json_array_elements_text((json_array_elements(result::json)->>'most_common_vals')::json)::text AS common_val,
|
json_array_elements_text((json_array_elements(result::json)->>'most_common_vals')::json)::text AS common_val,
|
||||||
json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json)::float4 AS common_freq,
|
CAST(json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json) AS float4) AS common_freq,
|
||||||
(json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples
|
CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples
|
||||||
FROM most_common_vals_json),
|
FROM most_common_vals_json),
|
||||||
|
|
||||||
common_val_occurrence AS (
|
common_val_occurrence AS (
|
||||||
|
|
@ -58,7 +58,7 @@ common_val_occurrence AS (
|
||||||
sum(common_freq * shard_reltuples)::bigint AS occurrence
|
sum(common_freq * shard_reltuples)::bigint AS occurrence
|
||||||
FROM most_common_vals m
|
FROM most_common_vals m
|
||||||
GROUP BY citus_table, m.attname, common_val
|
GROUP BY citus_table, m.attname, common_val
|
||||||
ORDER BY 1, 2, occurrence DESC)
|
ORDER BY 1, 2, occurrence DESC, 3)
|
||||||
|
|
||||||
SELECT nsp.nspname AS schemaname, p.relname AS tablename, c.attname,
|
SELECT nsp.nspname AS schemaname, p.relname AS tablename, c.attname,
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,7 +22,7 @@ most_common_vals_json AS (
|
||||||
|
|
||||||
table_reltuples_json AS (
|
table_reltuples_json AS (
|
||||||
SELECT distinct(shardid),
|
SELECT distinct(shardid),
|
||||||
(json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
|
CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples,
|
||||||
(json_array_elements(result::json)->>'citus_table')::regclass AS citus_table
|
(json_array_elements(result::json)->>'citus_table')::regclass AS citus_table
|
||||||
FROM most_common_vals_json),
|
FROM most_common_vals_json),
|
||||||
|
|
||||||
|
|
@ -32,8 +32,8 @@ table_reltuples AS (
|
||||||
|
|
||||||
null_frac_json AS (
|
null_frac_json AS (
|
||||||
SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
|
SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
|
||||||
(json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
|
CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples,
|
||||||
(json_array_elements(result::json)->>'null_frac')::float4 AS null_frac,
|
CAST((json_array_elements(result::json)->>'null_frac') AS float4) AS null_frac,
|
||||||
(json_array_elements(result::json)->>'attname')::text AS attname
|
(json_array_elements(result::json)->>'attname')::text AS attname
|
||||||
FROM most_common_vals_json
|
FROM most_common_vals_json
|
||||||
),
|
),
|
||||||
|
|
@ -49,8 +49,8 @@ most_common_vals AS (
|
||||||
SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
|
SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
|
||||||
(json_array_elements(result::json)->>'attname')::text AS attname,
|
(json_array_elements(result::json)->>'attname')::text AS attname,
|
||||||
json_array_elements_text((json_array_elements(result::json)->>'most_common_vals')::json)::text AS common_val,
|
json_array_elements_text((json_array_elements(result::json)->>'most_common_vals')::json)::text AS common_val,
|
||||||
json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json)::float4 AS common_freq,
|
CAST(json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json) AS float4) AS common_freq,
|
||||||
(json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples
|
CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples
|
||||||
FROM most_common_vals_json),
|
FROM most_common_vals_json),
|
||||||
|
|
||||||
common_val_occurrence AS (
|
common_val_occurrence AS (
|
||||||
|
|
@ -58,7 +58,7 @@ common_val_occurrence AS (
|
||||||
sum(common_freq * shard_reltuples)::bigint AS occurrence
|
sum(common_freq * shard_reltuples)::bigint AS occurrence
|
||||||
FROM most_common_vals m
|
FROM most_common_vals m
|
||||||
GROUP BY citus_table, m.attname, common_val
|
GROUP BY citus_table, m.attname, common_val
|
||||||
ORDER BY 1, 2, occurrence DESC)
|
ORDER BY 1, 2, occurrence DESC, 3)
|
||||||
|
|
||||||
SELECT nsp.nspname AS schemaname, p.relname AS tablename, c.attname,
|
SELECT nsp.nspname AS schemaname, p.relname AS tablename, c.attname,
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -106,7 +106,9 @@ LogTransactionRecord(int32 groupId, char *transactionName, FullTransactionId out
|
||||||
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
|
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
|
||||||
|
|
||||||
/* insert new tuple */
|
/* insert new tuple */
|
||||||
CATALOG_INSERT_WITH_SNAPSHOT(pgDistTransaction, heapTuple);
|
PushActiveSnapshot(GetTransactionSnapshot());
|
||||||
|
CatalogTupleInsert(pgDistTransaction, heapTuple);
|
||||||
|
PopActiveSnapshot();
|
||||||
|
|
||||||
CommandCounterIncrement();
|
CommandCounterIncrement();
|
||||||
|
|
||||||
|
|
@ -685,7 +687,7 @@ DeleteWorkerTransactions(WorkerNode *workerNode)
|
||||||
int
|
int
|
||||||
GetOuterXidAttrIndexInPgDistTransaction(TupleDesc tupleDesc)
|
GetOuterXidAttrIndexInPgDistTransaction(TupleDesc tupleDesc)
|
||||||
{
|
{
|
||||||
return TupleDescSize(tupleDesc) == Natts_pg_dist_transaction
|
return tupleDesc->natts == Natts_pg_dist_transaction
|
||||||
? (Anum_pg_dist_transaction_outerxid - 1)
|
? (Anum_pg_dist_transaction_outerxid - 1)
|
||||||
: tupleDesc->natts - 1;
|
: tupleDesc->natts - 1;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -140,10 +140,10 @@ GetReplicationLag(WorkerNode *primaryWorkerNode, WorkerNode *replicaWorkerNode)
|
||||||
ForgetResults(replicaConnection);
|
ForgetResults(replicaConnection);
|
||||||
CloseConnection(replicaConnection);
|
CloseConnection(replicaConnection);
|
||||||
|
|
||||||
ereport(DEBUG1, (errmsg(
|
ereport(DEBUG2, (errmsg(
|
||||||
"successfully measured replication lag: primary LSN %s, clone LSN %s",
|
"successfully measured replication lag: primary LSN %s, clone LSN %s",
|
||||||
primary_lsn_str, replica_lsn_str)));
|
primary_lsn_str, replica_lsn_str)));
|
||||||
ereport(NOTICE, (errmsg("replication lag between %s:%d and %s:%d is %ld bytes",
|
ereport(DEBUG1, (errmsg("replication lag between %s:%d and %s:%d is %ld bytes",
|
||||||
primaryWorkerNode->workerName, primaryWorkerNode->workerPort,
|
primaryWorkerNode->workerName, primaryWorkerNode->workerPort,
|
||||||
replicaWorkerNode->workerName, replicaWorkerNode->workerPort,
|
replicaWorkerNode->workerName, replicaWorkerNode->workerPort,
|
||||||
lag_bytes)));
|
lag_bytes)));
|
||||||
|
|
@ -244,9 +244,9 @@ EnsureValidCloneMode(WorkerNode *primaryWorkerNode,
|
||||||
freeaddrinfo(result);
|
freeaddrinfo(result);
|
||||||
}
|
}
|
||||||
|
|
||||||
ereport(NOTICE, (errmsg("checking replication for node %s (resolved IP: %s)",
|
ereport(NOTICE, (errmsg("checking replication status of clone node %s:%d",
|
||||||
cloneHostname,
|
cloneHostname,
|
||||||
resolvedIP ? resolvedIP : "unresolved")));
|
clonePort)));
|
||||||
|
|
||||||
/* Build query to check if clone is connected and get its sync state */
|
/* Build query to check if clone is connected and get its sync state */
|
||||||
|
|
||||||
|
|
@ -278,6 +278,11 @@ EnsureValidCloneMode(WorkerNode *primaryWorkerNode,
|
||||||
cloneHostname);
|
cloneHostname);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ereport(DEBUG2, (errmsg("sending replication status check query: %s to primary %s:%d",
|
||||||
|
replicationCheckQuery->data,
|
||||||
|
primaryWorkerNode->workerName,
|
||||||
|
primaryWorkerNode->workerPort)));
|
||||||
|
|
||||||
int replicationCheckResultCode = SendRemoteCommand(primaryConnection,
|
int replicationCheckResultCode = SendRemoteCommand(primaryConnection,
|
||||||
replicationCheckQuery->data);
|
replicationCheckQuery->data);
|
||||||
if (replicationCheckResultCode == 0)
|
if (replicationCheckResultCode == 0)
|
||||||
|
|
@ -305,8 +310,9 @@ EnsureValidCloneMode(WorkerNode *primaryWorkerNode,
|
||||||
primaryWorkerNode->workerName, primaryWorkerNode->
|
primaryWorkerNode->workerName, primaryWorkerNode->
|
||||||
workerPort),
|
workerPort),
|
||||||
errdetail(
|
errdetail(
|
||||||
"The clone must be actively replicating from the specified primary node. "
|
"The clone must be actively replicating from the specified primary node"),
|
||||||
"Check that the clone is running and properly configured for replication.")));
|
errhint(
|
||||||
|
"Verify the clone is running and properly configured for replication")));
|
||||||
}
|
}
|
||||||
|
|
||||||
/* Check if clone is synchronous */
|
/* Check if clone is synchronous */
|
||||||
|
|
@ -322,9 +328,9 @@ EnsureValidCloneMode(WorkerNode *primaryWorkerNode,
|
||||||
"cannot %s clone %s:%d as it is configured as a synchronous replica",
|
"cannot %s clone %s:%d as it is configured as a synchronous replica",
|
||||||
operation, cloneHostname, clonePort),
|
operation, cloneHostname, clonePort),
|
||||||
errdetail(
|
errdetail(
|
||||||
"Promoting a synchronous clone can cause data consistency issues. "
|
"Promoting a synchronous clone can cause data consistency issues"),
|
||||||
"Please configure it as an asynchronous replica first.")))
|
errhint(
|
||||||
;
|
"Configure clone as an asynchronous replica")));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -13,10 +13,6 @@
|
||||||
|
|
||||||
#include "pg_version_constants.h"
|
#include "pg_version_constants.h"
|
||||||
|
|
||||||
/* we need these for PG-18’s PushActiveSnapshot/PopActiveSnapshot APIs */
|
|
||||||
#include "access/xact.h"
|
|
||||||
#include "utils/snapmgr.h"
|
|
||||||
|
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_18
|
#if PG_VERSION_NUM >= PG_VERSION_18
|
||||||
#define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, k) \
|
#define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, k) \
|
||||||
create_foreignscan_path( \
|
create_foreignscan_path( \
|
||||||
|
|
@ -40,14 +36,6 @@
|
||||||
/* PG-18 unified row-compare operator codes under COMPARE_* */
|
/* PG-18 unified row-compare operator codes under COMPARE_* */
|
||||||
#define ROWCOMPARE_NE COMPARE_NE
|
#define ROWCOMPARE_NE COMPARE_NE
|
||||||
|
|
||||||
#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \
|
|
||||||
do { \
|
|
||||||
Snapshot __snap = GetTransactionSnapshot(); \
|
|
||||||
PushActiveSnapshot(__snap); \
|
|
||||||
CatalogTupleInsert((rel), (tup)); \
|
|
||||||
PopActiveSnapshot(); \
|
|
||||||
} while (0)
|
|
||||||
|
|
||||||
#elif PG_VERSION_NUM >= PG_VERSION_17
|
#elif PG_VERSION_NUM >= PG_VERSION_17
|
||||||
#define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, k) \
|
#define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, k) \
|
||||||
create_foreignscan_path( \
|
create_foreignscan_path( \
|
||||||
|
|
@ -56,9 +44,6 @@
|
||||||
(g), (h), (i), (j), (k) \
|
(g), (h), (i), (j), (k) \
|
||||||
)
|
)
|
||||||
|
|
||||||
/* no-op wrapper on older PGs */
|
|
||||||
#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \
|
|
||||||
CatalogTupleInsert((rel), (tup))
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#if PG_VERSION_NUM >= PG_VERSION_17
|
#if PG_VERSION_NUM >= PG_VERSION_17
|
||||||
|
|
@ -469,10 +454,6 @@ getStxstattarget_compat(HeapTuple tup)
|
||||||
k) create_foreignscan_path(a, b, c, d, e, f, g, h, \
|
k) create_foreignscan_path(a, b, c, d, e, f, g, h, \
|
||||||
i, k)
|
i, k)
|
||||||
|
|
||||||
/* no-op wrapper on older PGs */
|
|
||||||
#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \
|
|
||||||
CatalogTupleInsert((rel), (tup))
|
|
||||||
|
|
||||||
#define getProcNo_compat(a) (a->pgprocno)
|
#define getProcNo_compat(a) (a->pgprocno)
|
||||||
#define getLxid_compat(a) (a->lxid)
|
#define getLxid_compat(a) (a->lxid)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -262,6 +262,9 @@ DEPS = {
|
||||||
"multi_subquery_in_where_reference_clause": TestDeps(
|
"multi_subquery_in_where_reference_clause": TestDeps(
|
||||||
"minimal_schedule", ["multi_behavioral_analytics_create_table"]
|
"minimal_schedule", ["multi_behavioral_analytics_create_table"]
|
||||||
),
|
),
|
||||||
|
"subquery_in_where": TestDeps(
|
||||||
|
"minimal_schedule", ["multi_behavioral_analytics_create_table"]
|
||||||
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -110,13 +110,100 @@ SELECT * FROM citus_stats
|
||||||
citus_aggregated_stats | citus_local_current_check | rlsuser | 0.142857 | {user1} | {0.714286}
|
citus_aggregated_stats | citus_local_current_check | rlsuser | 0.142857 | {user1} | {0.714286}
|
||||||
(9 rows)
|
(9 rows)
|
||||||
|
|
||||||
|
-- create a dist table with million rows to simulate 3.729223e+06 in reltuples
|
||||||
|
-- this tests casting numbers like 3.729223e+06 to bigint
|
||||||
|
CREATE TABLE organizations (
|
||||||
|
org_id bigint,
|
||||||
|
id int
|
||||||
|
);
|
||||||
|
SELECT create_distributed_table('organizations', 'org_id');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
INSERT INTO organizations(org_id, id)
|
||||||
|
SELECT i, 1
|
||||||
|
FROM generate_series(1,2000000) i;
|
||||||
|
ANALYZE organizations;
|
||||||
|
SELECT attname, null_frac, most_common_vals, most_common_freqs FROM citus_stats
|
||||||
|
WHERE tablename IN ('organizations')
|
||||||
|
ORDER BY 1;
|
||||||
|
attname | null_frac | most_common_vals | most_common_freqs
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
id | 0 | {1} | {1}
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- more real-world scenario:
|
||||||
|
-- outputs of pg_stats and citus_stats are NOT the same
|
||||||
|
-- but citus_stats does a fair estimation job
|
||||||
|
SELECT setseed(0.42);
|
||||||
|
setseed
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE orders (id bigint , custid int, product text, quantity int);
|
||||||
|
INSERT INTO orders(id, custid, product, quantity)
|
||||||
|
SELECT i, (random() * 100)::int, 'product' || (random() * 10)::int, NULL
|
||||||
|
FROM generate_series(1,11) d(i);
|
||||||
|
-- frequent customer
|
||||||
|
INSERT INTO orders(id, custid, product, quantity)
|
||||||
|
SELECT 1200, 17, 'product' || (random() * 10)::int, NULL
|
||||||
|
FROM generate_series(1, 57) sk(i);
|
||||||
|
-- popular product
|
||||||
|
INSERT INTO orders(id, custid, product, quantity)
|
||||||
|
SELECT i+100 % 17, NULL, 'product3', (random() * 40)::int
|
||||||
|
FROM generate_series(1, 37) sk(i);
|
||||||
|
-- frequent customer
|
||||||
|
INSERT INTO orders(id, custid, product, quantity)
|
||||||
|
SELECT 1390, 76, 'product' || ((random() * 20)::int % 3), (random() * 30)::int
|
||||||
|
FROM generate_series(1, 33) sk(i);
|
||||||
|
ANALYZE orders;
|
||||||
|
-- pg_stats
|
||||||
|
SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM pg_stats
|
||||||
|
WHERE tablename IN ('orders')
|
||||||
|
ORDER BY 3;
|
||||||
|
schemaname | tablename | attname | null_frac | most_common_vals | most_common_freqs
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
citus_aggregated_stats | orders | custid | 0.268116 | {17,76} | {0.413043,0.23913}
|
||||||
|
citus_aggregated_stats | orders | id | 0 | {1200,1390} | {0.413043,0.23913}
|
||||||
|
citus_aggregated_stats | orders | product | 0 | {product3,product2,product0,product1,product9,product4,product8,product5,product10,product6} | {0.347826,0.15942,0.115942,0.108696,0.0652174,0.057971,0.0507246,0.0362319,0.0289855,0.0289855}
|
||||||
|
citus_aggregated_stats | orders | quantity | 0.492754 | {26,23,6,8,11,12,13,17,20,25,30,4,14,15,16,19,24,27,35,36,38,40} | {0.0362319,0.0289855,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928}
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
SELECT create_distributed_table('orders', 'id');
|
||||||
|
NOTICE: Copying data from local table...
|
||||||
|
NOTICE: copying the data has completed
|
||||||
|
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||||
|
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$citus_aggregated_stats.orders$$)
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ANALYZE orders;
|
||||||
|
-- citus_stats
|
||||||
|
SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM citus_stats
|
||||||
|
WHERE tablename IN ('orders')
|
||||||
|
ORDER BY 3;
|
||||||
|
schemaname | tablename | attname | null_frac | most_common_vals | most_common_freqs
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
citus_aggregated_stats | orders | custid | 0.268116 | {17,76} | {0.413043,0.23913}
|
||||||
|
citus_aggregated_stats | orders | id | 0 | {1200,1390} | {0.413043,0.23913}
|
||||||
|
citus_aggregated_stats | orders | product | 0 | {product3,product2,product0,product1,product9,product4,product8,product5,product10,product6} | {0.347826,0.15942,0.115942,0.108696,0.0652174,0.057971,0.0507246,0.0362319,0.0289855,0.0289855}
|
||||||
|
citus_aggregated_stats | orders | quantity | 0.492754 | {26,13,17,20,23,8,11,12,14,16,19,24,25,27,30,35,38,40,6} | {0.0362319,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928}
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
RESET SESSION AUTHORIZATION;
|
RESET SESSION AUTHORIZATION;
|
||||||
DROP SCHEMA citus_aggregated_stats CASCADE;
|
DROP SCHEMA citus_aggregated_stats CASCADE;
|
||||||
NOTICE: drop cascades to 6 other objects
|
NOTICE: drop cascades to 8 other objects
|
||||||
DETAIL: drop cascades to table current_check
|
DETAIL: drop cascades to table current_check
|
||||||
drop cascades to table dist_current_check
|
drop cascades to table dist_current_check
|
||||||
drop cascades to table ref_current_check
|
drop cascades to table ref_current_check
|
||||||
drop cascades to table citus_local_current_check_1870003
|
drop cascades to table citus_local_current_check_1870003
|
||||||
drop cascades to table ref_current_check_1870002
|
drop cascades to table ref_current_check_1870002
|
||||||
drop cascades to table citus_local_current_check
|
drop cascades to table citus_local_current_check
|
||||||
|
drop cascades to table organizations
|
||||||
|
drop cascades to table orders
|
||||||
DROP USER user1;
|
DROP USER user1;
|
||||||
|
|
|
||||||
|
|
@ -167,7 +167,7 @@ SELECT pg_sleep(5);
|
||||||
-- the function returns the new node id
|
-- the function returns the new node id
|
||||||
SELECT citus_add_clone_node('localhost', :follower_worker_1_port, 'localhost', :worker_1_port) AS clone_node_id \gset
|
SELECT citus_add_clone_node('localhost', :follower_worker_1_port, 'localhost', :worker_1_port) AS clone_node_id \gset
|
||||||
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
||||||
NOTICE: checking replication for node localhost (resolved IP: ::1)
|
NOTICE: checking replication status of clone node localhost:xxxxx
|
||||||
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
|
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
|
||||||
SELECT * from pg_dist_node ORDER by nodeid;
|
SELECT * from pg_dist_node ORDER by nodeid;
|
||||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards | nodeisclone | nodeprimarynodeid
|
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards | nodeisclone | nodeprimarynodeid
|
||||||
|
|
@ -234,12 +234,11 @@ SET client_min_messages to 'LOG';
|
||||||
SELECT citus_promote_clone_and_rebalance(:clone_node_id);
|
SELECT citus_promote_clone_and_rebalance(:clone_node_id);
|
||||||
NOTICE: Starting promotion process for clone node localhost:xxxxx (ID 3), original primary localhost:xxxxx (ID 1)
|
NOTICE: Starting promotion process for clone node localhost:xxxxx (ID 3), original primary localhost:xxxxx (ID 1)
|
||||||
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
||||||
NOTICE: checking replication for node localhost (resolved IP: ::1)
|
NOTICE: checking replication status of clone node localhost:xxxxx
|
||||||
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
|
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
|
||||||
NOTICE: Blocking writes on shards of original primary node localhost:xxxxx (group 1)
|
NOTICE: Blocking writes on shards of original primary node localhost:xxxxx (group 1)
|
||||||
NOTICE: Blocking all writes to worker node localhost:xxxxx (ID 1)
|
NOTICE: Blocking all writes to worker node localhost:xxxxx (ID 1)
|
||||||
NOTICE: Waiting for clone localhost:xxxxx to catch up with primary localhost:xxxxx (timeout: 300 seconds)
|
NOTICE: Waiting for clone localhost:xxxxx to catch up with primary localhost:xxxxx (timeout: 300 seconds)
|
||||||
NOTICE: replication lag between localhost:xxxxx and localhost:xxxxx is 0 bytes
|
|
||||||
NOTICE: Clone localhost:xxxxx is now caught up with primary localhost:xxxxx.
|
NOTICE: Clone localhost:xxxxx is now caught up with primary localhost:xxxxx.
|
||||||
NOTICE: Attempting to promote clone localhost:xxxxx via pg_promote().
|
NOTICE: Attempting to promote clone localhost:xxxxx via pg_promote().
|
||||||
NOTICE: Clone node localhost:xxxxx (ID 3) has been successfully promoted.
|
NOTICE: Clone node localhost:xxxxx (ID 3) has been successfully promoted.
|
||||||
|
|
|
||||||
|
|
@ -17,9 +17,10 @@ SELECT * from pg_dist_node ORDER by nodeid;
|
||||||
-- this should fail as it is not a valid replica of worker_1
|
-- this should fail as it is not a valid replica of worker_1
|
||||||
SELECT citus_add_clone_node('localhost', :follower_worker_2_port, 'localhost', :worker_1_port) AS clone_node_id \gset
|
SELECT citus_add_clone_node('localhost', :follower_worker_2_port, 'localhost', :worker_1_port) AS clone_node_id \gset
|
||||||
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
||||||
NOTICE: checking replication for node localhost (resolved IP: ::1)
|
NOTICE: checking replication status of clone node localhost:xxxxx
|
||||||
ERROR: clone localhost:xxxxx is not connected to primary localhost:xxxxx
|
ERROR: clone localhost:xxxxx is not connected to primary localhost:xxxxx
|
||||||
DETAIL: The clone must be actively replicating from the specified primary node. Check that the clone is running and properly configured for replication.
|
DETAIL: The clone must be actively replicating from the specified primary node
|
||||||
|
HINT: Verify the clone is running and properly configured for replication
|
||||||
SELECT * from pg_dist_node ORDER by nodeid;
|
SELECT * from pg_dist_node ORDER by nodeid;
|
||||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards | nodeisclone | nodeprimarynodeid
|
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards | nodeisclone | nodeprimarynodeid
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
@ -71,9 +72,10 @@ SELECT nodename, nodeport, count(shardid) FROM pg_dist_shard_placement GROUP BY
|
||||||
-- Try to add replica of worker_node2 as a clone of worker_node1
|
-- Try to add replica of worker_node2 as a clone of worker_node1
|
||||||
SELECT citus_add_clone_node('localhost', :follower_worker_2_port, 'localhost', :worker_1_port) AS clone_node_id \gset
|
SELECT citus_add_clone_node('localhost', :follower_worker_2_port, 'localhost', :worker_1_port) AS clone_node_id \gset
|
||||||
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
||||||
NOTICE: checking replication for node localhost (resolved IP: ::1)
|
NOTICE: checking replication status of clone node localhost:xxxxx
|
||||||
ERROR: clone localhost:xxxxx is not connected to primary localhost:xxxxx
|
ERROR: clone localhost:xxxxx is not connected to primary localhost:xxxxx
|
||||||
DETAIL: The clone must be actively replicating from the specified primary node. Check that the clone is running and properly configured for replication.
|
DETAIL: The clone must be actively replicating from the specified primary node
|
||||||
|
HINT: Verify the clone is running and properly configured for replication
|
||||||
-- Test 1: Try to promote a non-existent clone node
|
-- Test 1: Try to promote a non-existent clone node
|
||||||
SELECT citus_promote_clone_and_rebalance(clone_nodeid =>99999);
|
SELECT citus_promote_clone_and_rebalance(clone_nodeid =>99999);
|
||||||
ERROR: Clone node with ID 99999 not found.
|
ERROR: Clone node with ID 99999 not found.
|
||||||
|
|
@ -87,7 +89,7 @@ ERROR: Node localhost:xxxxx (ID 1) is not a valid clone or its primary node ID
|
||||||
-- register the new node as a clone, This should pass
|
-- register the new node as a clone, This should pass
|
||||||
SELECT citus_add_clone_node('localhost', :follower_worker_2_port, 'localhost', :worker_2_port) AS clone_node_id \gset
|
SELECT citus_add_clone_node('localhost', :follower_worker_2_port, 'localhost', :worker_2_port) AS clone_node_id \gset
|
||||||
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
||||||
NOTICE: checking replication for node localhost (resolved IP: ::1)
|
NOTICE: checking replication status of clone node localhost:xxxxx
|
||||||
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
|
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
|
||||||
SELECT * from pg_dist_node ORDER by nodeid;
|
SELECT * from pg_dist_node ORDER by nodeid;
|
||||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards | nodeisclone | nodeprimarynodeid
|
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards | nodeisclone | nodeprimarynodeid
|
||||||
|
|
@ -108,12 +110,11 @@ SELECT :clone_node_id;
|
||||||
SELECT citus_promote_clone_and_rebalance(clone_nodeid => :clone_node_id, rebalance_strategy => 'invalid_strategy');
|
SELECT citus_promote_clone_and_rebalance(clone_nodeid => :clone_node_id, rebalance_strategy => 'invalid_strategy');
|
||||||
NOTICE: Starting promotion process for clone node localhost:xxxxx (ID 4), original primary localhost:xxxxx (ID 2)
|
NOTICE: Starting promotion process for clone node localhost:xxxxx (ID 4), original primary localhost:xxxxx (ID 2)
|
||||||
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
||||||
NOTICE: checking replication for node localhost (resolved IP: ::1)
|
NOTICE: checking replication status of clone node localhost:xxxxx
|
||||||
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
|
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
|
||||||
NOTICE: Blocking writes on shards of original primary node localhost:xxxxx (group 2)
|
NOTICE: Blocking writes on shards of original primary node localhost:xxxxx (group 2)
|
||||||
NOTICE: Blocking all writes to worker node localhost:xxxxx (ID 2)
|
NOTICE: Blocking all writes to worker node localhost:xxxxx (ID 2)
|
||||||
NOTICE: Waiting for clone localhost:xxxxx to catch up with primary localhost:xxxxx (timeout: 300 seconds)
|
NOTICE: Waiting for clone localhost:xxxxx to catch up with primary localhost:xxxxx (timeout: 300 seconds)
|
||||||
NOTICE: replication lag between localhost:xxxxx and localhost:xxxxx is 0 bytes
|
|
||||||
NOTICE: Clone localhost:xxxxx is now caught up with primary localhost:xxxxx.
|
NOTICE: Clone localhost:xxxxx is now caught up with primary localhost:xxxxx.
|
||||||
NOTICE: Attempting to promote clone localhost:xxxxx via pg_promote().
|
NOTICE: Attempting to promote clone localhost:xxxxx via pg_promote().
|
||||||
NOTICE: Clone node localhost:xxxxx (ID 4) has been successfully promoted.
|
NOTICE: Clone node localhost:xxxxx (ID 4) has been successfully promoted.
|
||||||
|
|
@ -133,9 +134,10 @@ BEGIN;
|
||||||
SELECT citus_promote_clone_and_rebalance(clone_nodeid => :clone_node_id);
|
SELECT citus_promote_clone_and_rebalance(clone_nodeid => :clone_node_id);
|
||||||
NOTICE: Starting promotion process for clone node localhost:xxxxx (ID 4), original primary localhost:xxxxx (ID 2)
|
NOTICE: Starting promotion process for clone node localhost:xxxxx (ID 4), original primary localhost:xxxxx (ID 2)
|
||||||
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
||||||
NOTICE: checking replication for node localhost (resolved IP: ::1)
|
NOTICE: checking replication status of clone node localhost:xxxxx
|
||||||
ERROR: clone localhost:xxxxx is not connected to primary localhost:xxxxx
|
ERROR: clone localhost:xxxxx is not connected to primary localhost:xxxxx
|
||||||
DETAIL: The clone must be actively replicating from the specified primary node. Check that the clone is running and properly configured for replication.
|
DETAIL: The clone must be actively replicating from the specified primary node
|
||||||
|
HINT: Verify the clone is running and properly configured for replication
|
||||||
ROLLBACK;
|
ROLLBACK;
|
||||||
-- Verify no data is lost after rooling back the transaction
|
-- Verify no data is lost after rooling back the transaction
|
||||||
SELECT COUNT(*) from backup_test;
|
SELECT COUNT(*) from backup_test;
|
||||||
|
|
@ -163,7 +165,7 @@ SELECT * from pg_dist_node ORDER by nodeid;
|
||||||
SELECT master_add_node('localhost', :worker_3_port) AS nodeid_3 \gset
|
SELECT master_add_node('localhost', :worker_3_port) AS nodeid_3 \gset
|
||||||
SELECT citus_add_clone_node('localhost', :follower_worker_3_port, 'localhost', :worker_3_port) AS clone_node_id_3 \gset
|
SELECT citus_add_clone_node('localhost', :follower_worker_3_port, 'localhost', :worker_3_port) AS clone_node_id_3 \gset
|
||||||
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
||||||
NOTICE: checking replication for node localhost (resolved IP: ::1)
|
NOTICE: checking replication status of clone node localhost:xxxxx
|
||||||
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
|
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
|
||||||
set citus.shard_count = 100;
|
set citus.shard_count = 100;
|
||||||
CREATE TABLE backup_test2(id int, value text);
|
CREATE TABLE backup_test2(id int, value text);
|
||||||
|
|
@ -217,12 +219,11 @@ SET client_min_messages to 'LOG';
|
||||||
SELECT citus_promote_clone_and_rebalance(clone_nodeid => :clone_node_id_3);
|
SELECT citus_promote_clone_and_rebalance(clone_nodeid => :clone_node_id_3);
|
||||||
NOTICE: Starting promotion process for clone node localhost:xxxxx (ID 6), original primary localhost:xxxxx (ID 5)
|
NOTICE: Starting promotion process for clone node localhost:xxxxx (ID 6), original primary localhost:xxxxx (ID 5)
|
||||||
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
||||||
NOTICE: checking replication for node localhost (resolved IP: ::1)
|
NOTICE: checking replication status of clone node localhost:xxxxx
|
||||||
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
|
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
|
||||||
NOTICE: Blocking writes on shards of original primary node localhost:xxxxx (group 5)
|
NOTICE: Blocking writes on shards of original primary node localhost:xxxxx (group 5)
|
||||||
NOTICE: Blocking all writes to worker node localhost:xxxxx (ID 5)
|
NOTICE: Blocking all writes to worker node localhost:xxxxx (ID 5)
|
||||||
NOTICE: Waiting for clone localhost:xxxxx to catch up with primary localhost:xxxxx (timeout: 300 seconds)
|
NOTICE: Waiting for clone localhost:xxxxx to catch up with primary localhost:xxxxx (timeout: 300 seconds)
|
||||||
NOTICE: replication lag between localhost:xxxxx and localhost:xxxxx is 0 bytes
|
|
||||||
NOTICE: Clone localhost:xxxxx is now caught up with primary localhost:xxxxx.
|
NOTICE: Clone localhost:xxxxx is now caught up with primary localhost:xxxxx.
|
||||||
NOTICE: Attempting to promote clone localhost:xxxxx via pg_promote().
|
NOTICE: Attempting to promote clone localhost:xxxxx via pg_promote().
|
||||||
NOTICE: Clone node localhost:xxxxx (ID 6) has been successfully promoted.
|
NOTICE: Clone node localhost:xxxxx (ID 6) has been successfully promoted.
|
||||||
|
|
|
||||||
|
|
@ -27,15 +27,17 @@ SELECT master_remove_node('localhost', :follower_worker_2_port);
|
||||||
-- this should fail as the replica is a synchronous replica that is not allowed
|
-- this should fail as the replica is a synchronous replica that is not allowed
|
||||||
SELECT citus_add_clone_node('localhost', :follower_worker_1_port, 'localhost', :worker_1_port) AS clone_node_id \gset
|
SELECT citus_add_clone_node('localhost', :follower_worker_1_port, 'localhost', :worker_1_port) AS clone_node_id \gset
|
||||||
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
||||||
NOTICE: checking replication for node localhost (resolved IP: ::1)
|
NOTICE: checking replication status of clone node localhost:xxxxx
|
||||||
ERROR: cannot add clone localhost:xxxxx as it is configured as a synchronous replica
|
ERROR: cannot add clone localhost:xxxxx as it is configured as a synchronous replica
|
||||||
DETAIL: Promoting a synchronous clone can cause data consistency issues. Please configure it as an asynchronous replica first.
|
DETAIL: Promoting a synchronous clone can cause data consistency issues
|
||||||
|
HINT: Configure clone as an asynchronous replica
|
||||||
-- this should fail as the replica is a synchronous replica that is not allowed
|
-- this should fail as the replica is a synchronous replica that is not allowed
|
||||||
SELECT citus_add_clone_node('localhost', :follower_worker_2_port, 'localhost', :worker_2_port) AS clone_node_id \gset
|
SELECT citus_add_clone_node('localhost', :follower_worker_2_port, 'localhost', :worker_2_port) AS clone_node_id \gset
|
||||||
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
|
||||||
NOTICE: checking replication for node localhost (resolved IP: ::1)
|
NOTICE: checking replication status of clone node localhost:xxxxx
|
||||||
ERROR: cannot add clone localhost:xxxxx as it is configured as a synchronous replica
|
ERROR: cannot add clone localhost:xxxxx as it is configured as a synchronous replica
|
||||||
DETAIL: Promoting a synchronous clone can cause data consistency issues. Please configure it as an asynchronous replica first.
|
DETAIL: Promoting a synchronous clone can cause data consistency issues
|
||||||
|
HINT: Configure clone as an asynchronous replica
|
||||||
SELECT * from pg_dist_node ORDER by nodeid;
|
SELECT * from pg_dist_node ORDER by nodeid;
|
||||||
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards | nodeisclone | nodeprimarynodeid
|
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards | nodeisclone | nodeprimarynodeid
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
|
|
||||||
|
|
@ -1657,7 +1657,7 @@ DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
||||||
SHOW citus.version;
|
SHOW citus.version;
|
||||||
citus.version
|
citus.version
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
13.2devel
|
13.2.0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- ensure no unexpected objects were created outside pg_catalog
|
-- ensure no unexpected objects were created outside pg_catalog
|
||||||
|
|
|
||||||
|
|
@ -1253,10 +1253,53 @@ SELECT vkey, pkey FROM t3;
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
|
-- Redundant WHERE clause with distributed parititioned table
|
||||||
|
CREATE TABLE a (a int);
|
||||||
|
INSERT INTO a VALUES (1);
|
||||||
|
-- populated distributed partitioned table
|
||||||
|
create table partitioned_table (a INT UNIQUE) PARTITION BY RANGE(a);
|
||||||
|
CREATE TABLE par_1 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (41);
|
||||||
|
CREATE TABLE par_2 PARTITION OF partitioned_table FOR VALUES FROM (41) TO (81);
|
||||||
|
CREATE TABLE par_3 PARTITION OF partitioned_table FOR VALUES FROM (81) TO (121);
|
||||||
|
CREATE TABLE par_4 PARTITION OF partitioned_table FOR VALUES FROM (121) TO (161);
|
||||||
|
SELECT create_distributed_table('partitioned_table', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
insert into partitioned_table(a) select i from generate_series(1,160) i;
|
||||||
|
-- test citus table in init plan
|
||||||
|
-- with redundant WHERE clause
|
||||||
|
SELECT CASE WHEN EXISTS (
|
||||||
|
SELECT * FROM partitioned_table
|
||||||
|
) THEN 1 ELSE 0 END AS table_non_empty
|
||||||
|
FROM a
|
||||||
|
WHERE true;
|
||||||
|
table_non_empty
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- test citus table in init plan
|
||||||
|
-- with redundant WHERE clause involving
|
||||||
|
-- a citus table
|
||||||
|
SELECT CASE WHEN EXISTS (
|
||||||
|
SELECT * FROM partitioned_table
|
||||||
|
) THEN 1 ELSE 0 END AS table_non_empty
|
||||||
|
FROM a
|
||||||
|
WHERE true OR NOT EXISTS (SELECT 1 FROM t1);
|
||||||
|
table_non_empty
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
DROP TABLE local_table;
|
DROP TABLE local_table;
|
||||||
DROP TABLE t0;
|
DROP TABLE t0;
|
||||||
DROP TABLE t1;
|
DROP TABLE t1;
|
||||||
DROP TABLE t3;
|
DROP TABLE t3;
|
||||||
DROP TABLE t7;
|
DROP TABLE t7;
|
||||||
|
DROP TABLE a;
|
||||||
|
DROP TABLE partitioned_table CASCADE;
|
||||||
DROP SCHEMA subquery_in_where CASCADE;
|
DROP SCHEMA subquery_in_where CASCADE;
|
||||||
SET search_path TO public;
|
SET search_path TO public;
|
||||||
|
|
|
||||||
|
|
@ -82,6 +82,68 @@ SELECT * FROM citus_stats
|
||||||
WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check')
|
WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check')
|
||||||
ORDER BY 1;
|
ORDER BY 1;
|
||||||
|
|
||||||
|
-- create a dist table with million rows to simulate 3.729223e+06 in reltuples
|
||||||
|
-- this tests casting numbers like 3.729223e+06 to bigint
|
||||||
|
|
||||||
|
CREATE TABLE organizations (
|
||||||
|
org_id bigint,
|
||||||
|
id int
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT create_distributed_table('organizations', 'org_id');
|
||||||
|
|
||||||
|
INSERT INTO organizations(org_id, id)
|
||||||
|
SELECT i, 1
|
||||||
|
FROM generate_series(1,2000000) i;
|
||||||
|
|
||||||
|
ANALYZE organizations;
|
||||||
|
|
||||||
|
SELECT attname, null_frac, most_common_vals, most_common_freqs FROM citus_stats
|
||||||
|
WHERE tablename IN ('organizations')
|
||||||
|
ORDER BY 1;
|
||||||
|
|
||||||
|
-- more real-world scenario:
|
||||||
|
-- outputs of pg_stats and citus_stats are NOT the same
|
||||||
|
-- but citus_stats does a fair estimation job
|
||||||
|
|
||||||
|
SELECT setseed(0.42);
|
||||||
|
|
||||||
|
CREATE TABLE orders (id bigint , custid int, product text, quantity int);
|
||||||
|
|
||||||
|
INSERT INTO orders(id, custid, product, quantity)
|
||||||
|
SELECT i, (random() * 100)::int, 'product' || (random() * 10)::int, NULL
|
||||||
|
FROM generate_series(1,11) d(i);
|
||||||
|
|
||||||
|
-- frequent customer
|
||||||
|
INSERT INTO orders(id, custid, product, quantity)
|
||||||
|
SELECT 1200, 17, 'product' || (random() * 10)::int, NULL
|
||||||
|
FROM generate_series(1, 57) sk(i);
|
||||||
|
|
||||||
|
-- popular product
|
||||||
|
INSERT INTO orders(id, custid, product, quantity)
|
||||||
|
SELECT i+100 % 17, NULL, 'product3', (random() * 40)::int
|
||||||
|
FROM generate_series(1, 37) sk(i);
|
||||||
|
|
||||||
|
-- frequent customer
|
||||||
|
INSERT INTO orders(id, custid, product, quantity)
|
||||||
|
SELECT 1390, 76, 'product' || ((random() * 20)::int % 3), (random() * 30)::int
|
||||||
|
FROM generate_series(1, 33) sk(i);
|
||||||
|
|
||||||
|
ANALYZE orders;
|
||||||
|
|
||||||
|
-- pg_stats
|
||||||
|
SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM pg_stats
|
||||||
|
WHERE tablename IN ('orders')
|
||||||
|
ORDER BY 3;
|
||||||
|
|
||||||
|
SELECT create_distributed_table('orders', 'id');
|
||||||
|
ANALYZE orders;
|
||||||
|
|
||||||
|
-- citus_stats
|
||||||
|
SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM citus_stats
|
||||||
|
WHERE tablename IN ('orders')
|
||||||
|
ORDER BY 3;
|
||||||
|
|
||||||
RESET SESSION AUTHORIZATION;
|
RESET SESSION AUTHORIZATION;
|
||||||
DROP SCHEMA citus_aggregated_stats CASCADE;
|
DROP SCHEMA citus_aggregated_stats CASCADE;
|
||||||
DROP USER user1;
|
DROP USER user1;
|
||||||
|
|
|
||||||
|
|
@ -929,10 +929,42 @@ where TRUE or (((t3.vkey) >= (select
|
||||||
-- Distributed table t3 is now empty
|
-- Distributed table t3 is now empty
|
||||||
SELECT vkey, pkey FROM t3;
|
SELECT vkey, pkey FROM t3;
|
||||||
|
|
||||||
|
-- Redundant WHERE clause with distributed parititioned table
|
||||||
|
CREATE TABLE a (a int);
|
||||||
|
INSERT INTO a VALUES (1);
|
||||||
|
|
||||||
|
-- populated distributed partitioned table
|
||||||
|
create table partitioned_table (a INT UNIQUE) PARTITION BY RANGE(a);
|
||||||
|
CREATE TABLE par_1 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (41);
|
||||||
|
CREATE TABLE par_2 PARTITION OF partitioned_table FOR VALUES FROM (41) TO (81);
|
||||||
|
CREATE TABLE par_3 PARTITION OF partitioned_table FOR VALUES FROM (81) TO (121);
|
||||||
|
CREATE TABLE par_4 PARTITION OF partitioned_table FOR VALUES FROM (121) TO (161);
|
||||||
|
SELECT create_distributed_table('partitioned_table', 'a');
|
||||||
|
insert into partitioned_table(a) select i from generate_series(1,160) i;
|
||||||
|
|
||||||
|
-- test citus table in init plan
|
||||||
|
-- with redundant WHERE clause
|
||||||
|
SELECT CASE WHEN EXISTS (
|
||||||
|
SELECT * FROM partitioned_table
|
||||||
|
) THEN 1 ELSE 0 END AS table_non_empty
|
||||||
|
FROM a
|
||||||
|
WHERE true;
|
||||||
|
|
||||||
|
-- test citus table in init plan
|
||||||
|
-- with redundant WHERE clause involving
|
||||||
|
-- a citus table
|
||||||
|
SELECT CASE WHEN EXISTS (
|
||||||
|
SELECT * FROM partitioned_table
|
||||||
|
) THEN 1 ELSE 0 END AS table_non_empty
|
||||||
|
FROM a
|
||||||
|
WHERE true OR NOT EXISTS (SELECT 1 FROM t1);
|
||||||
|
|
||||||
DROP TABLE local_table;
|
DROP TABLE local_table;
|
||||||
DROP TABLE t0;
|
DROP TABLE t0;
|
||||||
DROP TABLE t1;
|
DROP TABLE t1;
|
||||||
DROP TABLE t3;
|
DROP TABLE t3;
|
||||||
DROP TABLE t7;
|
DROP TABLE t7;
|
||||||
|
DROP TABLE a;
|
||||||
|
DROP TABLE partitioned_table CASCADE;
|
||||||
DROP SCHEMA subquery_in_where CASCADE;
|
DROP SCHEMA subquery_in_where CASCADE;
|
||||||
SET search_path TO public;
|
SET search_path TO public;
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue