Compare commits

...

10 Commits

Author SHA1 Message Date
ibrahim halatci e8e06d8d0c version string update for the release 2025-08-29 09:40:13 +00:00
Naisila Puka f79dd61a92 Order same frequency common values, and add test (#8167)
Added a similar test to what @colm-mchugh tested in the original PR
https://github.com/citusdata/citus/pull/8026#discussion_r2279021218
2025-08-29 01:44:08 +03:00
Naisila Puka 274504465d Fix invalid input syntax for type bigint (#8166)
Fixes #8164
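The underlying issue, in a minimal form: the `reltuples` values come out of the JSON results as text in scientific notation, which cannot be cast to `bigint` directly, so `citus_stats` now casts through `double precision` first (see the view changes below):

```sql
SELECT '3.729223e+06'::bigint;                      -- ERROR: invalid input syntax for type bigint
SELECT ('3.729223e+06'::double precision)::bigint;  -- 3729223
```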
2025-08-29 01:43:57 +03:00
Colm c39df81f69 In UPDATE deparse, check for a subscript before processing the targets. (#8155)
DESCRIPTION: Checking first for the presence of subscript ops avoids a
shallow copy of the target list when it contains no array or json
subscripts.

Commit 0c1b31c fixed a bug in UPDATE statements with array or json
subscripting in the target list. This commit modifies that fix to first
check whether the target list contains a subscript, and avoids the shallow
copy for UPDATE statements with no array/json subscripting.
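To illustrate the distinction, a minimal sketch of the two UPDATE shapes involved; the `items` table and its columns are hypothetical:

```sql
-- hypothetical distributed table, used only for illustration
CREATE TABLE items (key int, data jsonb, tags int[]);
SELECT create_distributed_table('items', 'key');

-- target list contains subscripting refs: these entries still go through
-- the expansion path fixed by 0c1b31c
UPDATE items SET data['a']['b'] = '42', tags[1] = 7 WHERE key = 1;

-- no subscripting in the target list: with this change the list is no
-- longer shallow-copied before deparsing
UPDATE items SET data = '{"a": 1}' WHERE key = 1;
```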
2025-08-27 13:55:02 +00:00
Colm 91cae1fb29 Fix bug in redundant WHERE clause detection. (#8162)
We also need to check the Postgres plan's range tables for relations used in InitPlans.

DESCRIPTION: Fix a bug in redundant WHERE clause detection; we need to
additionally check the Postgres plan's range tables for the presence of
citus tables, to account for relations that are referenced from scalar
subqueries.

There is a fundamental flaw in 4139370: the assumption that, after
Postgres planning has completed, all tables used in a query can be
obtained by walking the query tree. This is not the case for scalar
subqueries, which are referenced by `PARAM` nodes. The fix adds an
additional check of the Postgres plan's range tables; if at least one
citus table appears there, the query still requires distributed planning
and the flag is left unchanged.
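A minimal reproduction of that shape, matching the `subquery_in_where` regression test added in this change set (`a` is a plain local table, `partitioned_table` a distributed table):

```sql
-- the redundant WHERE clause is removed during Postgres planning, and the
-- distributed table is only reachable through the EXISTS initplan (a PARAM
-- reference), not by walking the outer query tree
SELECT CASE WHEN EXISTS (SELECT * FROM partitioned_table)
            THEN 1 ELSE 0 END AS table_non_empty
FROM a
WHERE true;
```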

Fixes #8159
2025-08-27 13:55:02 +00:00
Muhammad Usama 0b9acbeb3d Enhance clone node replication status messages (#8152)
- Downgrade replication lag reporting from NOTICE to DEBUG to reduce
noise and improve regression test stability.
- Add hints to certain replication status messages for better clarity.
- Update expected output files accordingly.
2025-08-26 23:23:40 +03:00
Naisila Puka 754e2986ba Bump PG versions to 17.6, 16.10, 15.14 (#8142)
Sister PR https://github.com/citusdata/the-process/pull/172

Fixes #8134 #8149
2025-08-25 15:35:25 +03:00
Onur Tirtir 2ba22104eb Fix incorrect usage of TupleDescSize() in #7950, #8120, #8124, #8121 and #8114 (#8146)
In #7950, #8120, #8124, #8121 and #8114, TupleDescSize() was used to
check whether the tuple length is `Natts_<catalog_table_name>`. However,
this was wrong: TupleDescSize() returns the size of the tupledesc, not
its number of attributes.

In practice, `TupleDescSize(tupleDesc) == Natts_<catalog_table_name>` always
evaluated to false, but this didn't cause any problems: using
`tupleDesc->natts - 1` when `tupleDesc->natts == Natts_<catalog_table_name>`
has the same effect as using `Anum_<column_added_later> - 1` in that case.

That also suggests always returning `tupleDesc->natts - 1` (or
`tupleDesc->natts - 2` if it's the second-to-last attribute), but being
more explicit seems more useful.

Going further, we should probably switch to a different implementation if
and when we add more columns to those tables: scan the non-dropped
attributes of the relation, enumerate them, and return the attribute number
of the one we're looking for. That doesn't seem necessary right now, though.

(cherry picked from commit 439870f3a9)
2025-08-25 12:02:33 +03:00
Onur Tirtir b03b249f6d Fix memory corruptions around pg_dist_node accessors after a Citus downgrade is followed by an upgrade (#8144)
Unlike the issues fixed in #7950, #8120, #8124, #8121 and #8114, this was
not a problem in older releases, but it is a potential issue introduced by
the current (13.2) release, because a recent commit (#8122) added two
columns to pg_dist_node. In other words, none of the older releases since
we started supporting downgrades added new columns to pg_dist_node.

The mentioned PR actually attempted to avoid this kind of issue in one of
the code paths, but not in the others.

So this PR avoids memory corruptions around pg_dist_node accessors in a
standardized way (as implemented in the example PRs above) and in all
code paths.
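For context, a hedged sketch of the downgrade-then-upgrade sequence being guarded against; the extension version strings are illustrative assumptions:

```sql
-- Downgrading drops the two new pg_dist_node columns (nodeisclone,
-- nodeprimarynodeid) and upgrading re-adds them. The dropped attributes stay
-- in the tuple descriptor, so afterwards tupleDesc->natts exceeds
-- Natts_pg_dist_node and hard-coded Anum_* offsets no longer line up; hence
-- the natts-based accessor helpers used throughout this PR.
ALTER EXTENSION citus UPDATE TO '13.1-1';  -- illustrative version string
ALTER EXTENSION citus UPDATE TO '13.2-1';  -- illustrative version string
SELECT nodeid, nodeisclone, nodeprimarynodeid FROM pg_dist_node;
```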

(cherry picked from commit 785287c58f)
2025-08-25 12:02:33 +03:00
Onur Tirtir a7d529813b Add changelog for 13.2.0 (#8130)
(cherry picked from commit 683ead9607)
2025-08-21 12:25:26 +03:00
25 changed files with 564 additions and 166 deletions

View File

@ -73,7 +73,7 @@ USER citus
# build postgres versions separately for effective parrallelism and caching of already built versions when changing only certain versions
FROM base AS pg15
RUN MAKEFLAGS="-j $(nproc)" pgenv build 15.13
RUN MAKEFLAGS="-j $(nproc)" pgenv build 15.14
RUN rm .pgenv/src/*.tar*
RUN make -C .pgenv/src/postgresql-*/ clean
RUN make -C .pgenv/src/postgresql-*/src/include install
@ -85,7 +85,7 @@ RUN cp -r .pgenv/src .pgenv/pgsql-* .pgenv/config .pgenv-staging/
RUN rm .pgenv-staging/config/default.conf
FROM base AS pg16
RUN MAKEFLAGS="-j $(nproc)" pgenv build 16.9
RUN MAKEFLAGS="-j $(nproc)" pgenv build 16.10
RUN rm .pgenv/src/*.tar*
RUN make -C .pgenv/src/postgresql-*/ clean
RUN make -C .pgenv/src/postgresql-*/src/include install
@ -97,7 +97,7 @@ RUN cp -r .pgenv/src .pgenv/pgsql-* .pgenv/config .pgenv-staging/
RUN rm .pgenv-staging/config/default.conf
FROM base AS pg17
RUN MAKEFLAGS="-j $(nproc)" pgenv build 17.5
RUN MAKEFLAGS="-j $(nproc)" pgenv build 17.6
RUN rm .pgenv/src/*.tar*
RUN make -C .pgenv/src/postgresql-*/ clean
RUN make -C .pgenv/src/postgresql-*/src/include install
@ -216,7 +216,7 @@ COPY --chown=citus:citus .psqlrc .
RUN sudo chown --from=root:root citus:citus -R ~
# sets default pg version
RUN pgenv switch 17.5
RUN pgenv switch 17.6
# make connecting to the coordinator easy
ENV PGPORT=9700

View File

@ -31,12 +31,12 @@ jobs:
pgupgrade_image_name: "ghcr.io/citusdata/pgupgradetester"
style_checker_image_name: "ghcr.io/citusdata/stylechecker"
style_checker_tools_version: "0.8.18"
sql_snapshot_pg_version: "17.5"
image_suffix: "-vb17c33b"
pg15_version: '{ "major": "15", "full": "15.13" }'
pg16_version: '{ "major": "16", "full": "16.9" }'
pg17_version: '{ "major": "17", "full": "17.5" }'
upgrade_pg_versions: "15.13-16.9-17.5"
sql_snapshot_pg_version: "17.6"
image_suffix: "-v4df94a0"
pg15_version: '{ "major": "15", "full": "15.14" }'
pg16_version: '{ "major": "16", "full": "16.10" }'
pg17_version: '{ "major": "17", "full": "17.6" }'
upgrade_pg_versions: "15.14-16.10-17.6"
steps:
# Since GHA jobs need at least one step we use a noop step here.
- name: Set up parameters

View File

@ -1,3 +1,52 @@
### citus v13.2.0 (August 18, 2025) ###
* Adds `citus_add_clone_node()`, `citus_add_clone_node_with_nodeid()`,
`citus_remove_clone_node()` and `citus_remove_clone_node_with_nodeid()`
UDFs to support snapshot-based node splits. This feature allows promoting
a streaming replica (clone) to a primary node and rebalancing shards
between the original and newly promoted node without requiring a full data
copy. This greatly reduces rebalance times for scale-out operations when
the new node already has the data via streaming replication (#8122)
* Improves performance of shard rebalancer by parallelizing moves and removing
bottlenecks that blocked concurrent logical-replication transfers. This
reduces rebalance windows especially for clusters with large reference
tables and allows multiple shard transfers to run in parallel (#7983)
* Adds citus.enable_recurring_outer_join_pushdown GUC (enabled by default)
to allow pushing down LEFT/RIGHT outer joins having a reference table in
the outer side and a distributed table on the inner side (e.g.,
\<reference table\> LEFT JOIN \<distributed table\>) (#7973)
* Adds citus.enable_local_fast_path_query_optimization (enabled by default)
GUC to avoid unnecessary query deparsing to improve performance of
fast-path queries targeting local shards (#8035)
* Adds `citus_stats()` UDF that can be used to retrieve distributed `pg_stats`
for the provided Citus table. (#8026)
* Avoids automatically creating citus_columnar when there are no relations
using it (#8081)
* Makes sure to check if the distribution key is in the target list before
pushing down a query with a union and an outer join (#8092)
* Fixes a bug in EXPLAIN ANALYZE to prevent unintended (duplicate) execution
of the (sub)plans during the explain phase (#8017)
* Fixes potential memory corruptions that could happen when accessing
various catalog tables after a Citus downgrade is followed by a Citus
upgrade (#7950, #8120, #8124, #8121, #8114, #8146)
* Fixes UPDATE statements with indirection and array/jsonb subscripting with
more than one field (#7675)
* Fixes an assertion failure that happens when an expression in the query
references a CTE (#8106)
* Fixes an assertion failure that happens when querying a view that is
defined on distributed tables (#8136)
### citus v13.1.0 (May 30th, 2025) ###
* Adds `citus_stat_counters` view that can be used to query

configure (vendored)
View File

@ -1,6 +1,6 @@
#! /bin/sh
# Guess values for system-dependent variables and create Makefiles.
# Generated by GNU Autoconf 2.69 for Citus 13.2devel.
# Generated by GNU Autoconf 2.69 for Citus 13.2.0.
#
#
# Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.
@ -579,8 +579,8 @@ MAKEFLAGS=
# Identity of this package.
PACKAGE_NAME='Citus'
PACKAGE_TARNAME='citus'
PACKAGE_VERSION='13.2devel'
PACKAGE_STRING='Citus 13.2devel'
PACKAGE_VERSION='13.2.0'
PACKAGE_STRING='Citus 13.2.0'
PACKAGE_BUGREPORT=''
PACKAGE_URL=''
@ -1262,7 +1262,7 @@ if test "$ac_init_help" = "long"; then
# Omit some internal or obsolete options to make the list less imposing.
# This message is too long to be a string in the A/UX 3.1 sh.
cat <<_ACEOF
\`configure' configures Citus 13.2devel to adapt to many kinds of systems.
\`configure' configures Citus 13.2.0 to adapt to many kinds of systems.
Usage: $0 [OPTION]... [VAR=VALUE]...
@ -1324,7 +1324,7 @@ fi
if test -n "$ac_init_help"; then
case $ac_init_help in
short | recursive ) echo "Configuration of Citus 13.2devel:";;
short | recursive ) echo "Configuration of Citus 13.2.0:";;
esac
cat <<\_ACEOF
@ -1429,7 +1429,7 @@ fi
test -n "$ac_init_help" && exit $ac_status
if $ac_init_version; then
cat <<\_ACEOF
Citus configure 13.2devel
Citus configure 13.2.0
generated by GNU Autoconf 2.69
Copyright (C) 2012 Free Software Foundation, Inc.
@ -1912,7 +1912,7 @@ cat >config.log <<_ACEOF
This file contains any messages produced by compilers while
running configure, to aid debugging if configure makes a mistake.
It was created by Citus $as_me 13.2devel, which was
It was created by Citus $as_me 13.2.0, which was
generated by GNU Autoconf 2.69. Invocation command line was
$ $0 $@
@ -5393,7 +5393,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
# report actual input values of CONFIG_FILES etc. instead of their
# values after options handling.
ac_log="
This file was extended by Citus $as_me 13.2devel, which was
This file was extended by Citus $as_me 13.2.0, which was
generated by GNU Autoconf 2.69. Invocation command line was
CONFIG_FILES = $CONFIG_FILES
@ -5455,7 +5455,7 @@ _ACEOF
cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
ac_cs_version="\\
Citus config.status 13.2devel
Citus config.status 13.2.0
configured by $0, generated by GNU Autoconf 2.69,
with options \\"\$ac_cs_config\\"

View File

@ -5,7 +5,7 @@
# everyone needing autoconf installed, the resulting files are checked
# into the SCM.
AC_INIT([Citus], [13.2devel])
AC_INIT([Citus], [13.2.0])
AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])
# we'll need sed and awk for some of the version commands

View File

@ -659,8 +659,9 @@ SaveStripeSkipList(RelFileLocator relfilelocator, uint64 stripe,
nulls[Anum_columnar_chunk_minimum_value - 1] = true;
nulls[Anum_columnar_chunk_maximum_value - 1] = true;
}
PushActiveSnapshot(GetTransactionSnapshot());
InsertTupleAndEnforceConstraints(modifyState, values, nulls);
PopActiveSnapshot();
}
}
@ -2132,7 +2133,7 @@ GetHighestUsedRowNumber(uint64 storageId)
static int
GetFirstRowNumberAttrIndexInColumnarStripe(TupleDesc tupleDesc)
{
return TupleDescSize(tupleDesc) == Natts_columnar_stripe
return tupleDesc->natts == Natts_columnar_stripe
? (Anum_columnar_stripe_first_row_number - 1)
: tupleDesc->natts - 1;
}

View File

@ -1837,6 +1837,62 @@ ensure_update_targetlist_in_param_order(List *targetList)
}
/*
* isSubsRef checks if a given node is a SubscriptingRef or can be
* reached through an implicit coercion.
*/
static
bool
isSubsRef(Node *node)
{
if (node == NULL)
{
return false;
}
if (IsA(node, CoerceToDomain))
{
CoerceToDomain *coerceToDomain = (CoerceToDomain *) node;
if (coerceToDomain->coercionformat != COERCE_IMPLICIT_CAST)
{
/* not an implicit coercion, cannot reach to a SubscriptingRef */
return false;
}
node = (Node *) coerceToDomain->arg;
}
return (IsA(node, SubscriptingRef));
}
/*
* checkTlistForSubsRef - checks if any target entry in the list contains a
* SubscriptingRef or can be reached through an implicit coercion. Used by
* ExpandMergedSubscriptingRefEntries() to identify if any target entries
* need to be expanded - if not the original target list is preserved.
*/
static
bool
checkTlistForSubsRef(List *targetEntryList)
{
ListCell *tgtCell = NULL;
foreach(tgtCell, targetEntryList)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(tgtCell);
Expr *expr = targetEntry->expr;
if (isSubsRef((Node *) expr))
{
return true;
}
}
return false;
}
/*
* ExpandMergedSubscriptingRefEntries takes a list of target entries and expands
* each one that references a SubscriptingRef node that indicates multiple (field)
@ -1848,6 +1904,12 @@ ExpandMergedSubscriptingRefEntries(List *targetEntryList)
List *newTargetEntryList = NIL;
ListCell *tgtCell = NULL;
if (!checkTlistForSubsRef(targetEntryList))
{
/* No subscripting refs found, return original list */
return targetEntryList;
}
foreach(tgtCell, targetEntryList)
{
TargetEntry *targetEntry = (TargetEntry *) lfirst(tgtCell);

View File

@ -801,7 +801,7 @@ DistributedSequenceList(void)
int
GetForceDelegationAttrIndexInPgDistObject(TupleDesc tupleDesc)
{
return TupleDescSize(tupleDesc) == Natts_pg_dist_object
return tupleDesc->natts == Natts_pg_dist_object
? (Anum_pg_dist_object_force_delegation - 1)
: tupleDesc->natts - 1;
}

View File

@ -4486,7 +4486,7 @@ UnblockDependingBackgroundTasks(BackgroundTask *task)
int
GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc)
{
return TupleDescSize(tupleDesc) == Natts_pg_dist_partition
return tupleDesc->natts == Natts_pg_dist_partition
? (Anum_pg_dist_partition_autoconverted - 1)
: tupleDesc->natts - 1;
}
@ -4506,7 +4506,7 @@ GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc)
int
GetNodesInvolvedAttrIndexInPgDistBackgroundTask(TupleDesc tupleDesc)
{
return TupleDescSize(tupleDesc) == Natts_pg_dist_background_task
return tupleDesc->natts == Natts_pg_dist_background_task
? (Anum_pg_dist_background_task_nodes_involved - 1)
: tupleDesc->natts - 1;
}

View File

@ -115,6 +115,8 @@ static bool NodeIsLocal(WorkerNode *worker);
static void SetLockTimeoutLocally(int32 lock_cooldown);
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort,
bool localOnly);
static int GetNodePrimaryNodeIdAttrIndexInPgDistNode(TupleDesc tupleDesc);
static int GetNodeIsCloneAttrIndexInPgDistNode(TupleDesc tupleDesc);
static bool UnsetMetadataSyncedForAllWorkers(void);
static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode,
int columnIndex,
@ -1196,6 +1198,11 @@ ActivateNodeList(MetadataSyncContext *context)
void
ActivateCloneNodeAsPrimary(WorkerNode *workerNode)
{
Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
TupleDesc copiedTupleDescriptor = CreateTupleDescCopy(tupleDescriptor);
table_close(pgDistNode, AccessShareLock);
/*
* Set the node as primary and active.
*/
@ -1203,9 +1210,13 @@ ActivateCloneNodeAsPrimary(WorkerNode *workerNode)
ObjectIdGetDatum(PrimaryNodeRoleId()));
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
BoolGetDatum(true));
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_nodeisclone,
SetWorkerColumnLocalOnly(workerNode,
GetNodeIsCloneAttrIndexInPgDistNode(copiedTupleDescriptor) +
1,
BoolGetDatum(false));
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_nodeprimarynodeid,
SetWorkerColumnLocalOnly(workerNode,
GetNodePrimaryNodeIdAttrIndexInPgDistNode(
copiedTupleDescriptor) + 1,
Int32GetDatum(0));
SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_hasmetadata,
BoolGetDatum(true));
@ -1779,14 +1790,14 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool loca
{
const bool indexOK = true;
ScanKeyData scanKey[1];
Datum values[Natts_pg_dist_node];
bool isnull[Natts_pg_dist_node];
bool replace[Natts_pg_dist_node];
Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
ScanKeyData scanKey[1];
Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = palloc0(tupleDescriptor->natts * sizeof(bool));
bool *replace = palloc0(tupleDescriptor->natts * sizeof(bool));
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodeid,
BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodeId));
@ -1801,8 +1812,6 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool loca
newNodeName, newNodePort)));
}
memset(replace, 0, sizeof(replace));
values[Anum_pg_dist_node_nodeport - 1] = Int32GetDatum(newNodePort);
isnull[Anum_pg_dist_node_nodeport - 1] = false;
replace[Anum_pg_dist_node_nodeport - 1] = true;
@ -1835,6 +1844,10 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool loca
systable_endscan(scanDescriptor);
table_close(pgDistNode, NoLock);
pfree(values);
pfree(isnull);
pfree(replace);
}
@ -2105,11 +2118,10 @@ citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS)
Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
Datum values[Natts_pg_dist_node];
bool isnull[Natts_pg_dist_node];
bool replace[Natts_pg_dist_node];
Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = palloc0(tupleDescriptor->natts * sizeof(bool));
bool *replace = palloc0(tupleDescriptor->natts * sizeof(bool));
memset(replace, 0, sizeof(replace));
values[Anum_pg_dist_node_metadatasynced - 1] = DatumGetBool(false);
isnull[Anum_pg_dist_node_metadatasynced - 1] = false;
replace[Anum_pg_dist_node_metadatasynced - 1] = true;
@ -2123,6 +2135,10 @@ citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS)
table_close(pgDistNode, NoLock);
pfree(values);
pfree(isnull);
pfree(replace);
PG_RETURN_VOID();
}
@ -2831,9 +2847,9 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
HeapTuple heapTuple = GetNodeTuple(workerNode->workerName, workerNode->workerPort);
Datum values[Natts_pg_dist_node];
bool isnull[Natts_pg_dist_node];
bool replace[Natts_pg_dist_node];
Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = palloc0(tupleDescriptor->natts * sizeof(bool));
bool *replace = palloc0(tupleDescriptor->natts * sizeof(bool));
if (heapTuple == NULL)
{
@ -2841,7 +2857,6 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
workerNode->workerName, workerNode->workerPort)));
}
memset(replace, 0, sizeof(replace));
values[columnIndex - 1] = value;
isnull[columnIndex - 1] = false;
replace[columnIndex - 1] = true;
@ -2857,6 +2872,10 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
table_close(pgDistNode, NoLock);
pfree(values);
pfree(isnull);
pfree(replace);
return newWorkerNode;
}
@ -3241,16 +3260,15 @@ InsertPlaceholderCoordinatorRecord(void)
static void
InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata)
{
Datum values[Natts_pg_dist_node];
bool isNulls[Natts_pg_dist_node];
Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
bool *isNulls = palloc0(tupleDescriptor->natts * sizeof(bool));
Datum nodeClusterStringDatum = CStringGetDatum(nodeMetadata->nodeCluster);
Datum nodeClusterNameDatum = DirectFunctionCall1(namein, nodeClusterStringDatum);
/* form new shard tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid);
values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(nodeMetadata->groupId);
values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName);
@ -3264,17 +3282,15 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta
values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum;
values[Anum_pg_dist_node_shouldhaveshards - 1] = BoolGetDatum(
nodeMetadata->shouldHaveShards);
values[Anum_pg_dist_node_nodeisclone - 1] = BoolGetDatum(
nodeMetadata->nodeisclone);
values[Anum_pg_dist_node_nodeprimarynodeid - 1] = Int32GetDatum(
nodeMetadata->nodeprimarynodeid);
Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
values[GetNodeIsCloneAttrIndexInPgDistNode(tupleDescriptor)] =
BoolGetDatum(nodeMetadata->nodeisclone);
values[GetNodePrimaryNodeIdAttrIndexInPgDistNode(tupleDescriptor)] =
Int32GetDatum(nodeMetadata->nodeprimarynodeid);
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
CATALOG_INSERT_WITH_SNAPSHOT(pgDistNode, heapTuple);
PushActiveSnapshot(GetTransactionSnapshot());
CatalogTupleInsert(pgDistNode, heapTuple);
PopActiveSnapshot();
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
@ -3283,6 +3299,9 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMeta
/* close relation */
table_close(pgDistNode, NoLock);
pfree(values);
pfree(isNulls);
}
@ -3397,43 +3416,30 @@ TupleToWorkerNode(Relation pgDistNode, TupleDesc tupleDescriptor, HeapTuple heap
1]);
/*
* Attributes above this line are guaranteed to be present at the
* exact defined attribute number. Atleast till now. If you are droping or
* adding any of the above columns consider adjusting the code above
* nodecluster, nodeisclone and nodeprimarynodeid columns can be missing. In case
* of extension creation/upgrade, master_initialize_node_metadata function is
* called before the nodecluster column is added to pg_dist_node table.
*/
Oid pgDistNodeRelId = RelationGetRelid(pgDistNode);
AttrNumber nodeClusterAttno = get_attnum(pgDistNodeRelId, "nodecluster");
if (nodeClusterAttno > 0 &&
!TupleDescAttr(tupleDescriptor, nodeClusterAttno - 1)->attisdropped &&
!isNullArray[nodeClusterAttno - 1])
if (!isNullArray[Anum_pg_dist_node_nodecluster - 1])
{
Name nodeClusterName =
DatumGetName(datumArray[nodeClusterAttno - 1]);
DatumGetName(datumArray[Anum_pg_dist_node_nodecluster - 1]);
char *nodeClusterString = NameStr(*nodeClusterName);
strlcpy(workerNode->nodeCluster, nodeClusterString, NAMEDATALEN);
}
if (nAtts > Anum_pg_dist_node_nodeisclone)
int nodeIsCloneIdx = GetNodeIsCloneAttrIndexInPgDistNode(tupleDescriptor);
int nodePrimaryNodeIdIdx = GetNodePrimaryNodeIdAttrIndexInPgDistNode(tupleDescriptor);
if (!isNullArray[nodeIsCloneIdx])
{
AttrNumber nodeIsCloneAttno = get_attnum(pgDistNodeRelId, "nodeisclone");
if (nodeIsCloneAttno > 0 &&
!TupleDescAttr(tupleDescriptor, nodeIsCloneAttno - 1)->attisdropped &&
!isNullArray[nodeIsCloneAttno - 1])
{
workerNode->nodeisclone = DatumGetBool(datumArray[nodeIsCloneAttno - 1]);
}
AttrNumber nodePrimaryNodeIdAttno = get_attnum(pgDistNodeRelId,
"nodeprimarynodeid");
if (nodePrimaryNodeIdAttno > 0 &&
!TupleDescAttr(tupleDescriptor, nodePrimaryNodeIdAttno - 1)->attisdropped &&
!isNullArray[nodePrimaryNodeIdAttno - 1])
{
workerNode->nodeprimarynodeid = DatumGetInt32(datumArray[
nodePrimaryNodeIdAttno - 1])
;
}
workerNode->nodeisclone = DatumGetBool(datumArray[nodeIsCloneIdx]);
}
if (!isNullArray[nodePrimaryNodeIdIdx])
{
workerNode->nodeprimarynodeid = DatumGetInt32(datumArray[nodePrimaryNodeIdIdx]);
}
pfree(datumArray);
@ -3443,6 +3449,48 @@ TupleToWorkerNode(Relation pgDistNode, TupleDesc tupleDescriptor, HeapTuple heap
}
/*
* GetNodePrimaryNodeIdAttrIndexInPgDistNode returns attrnum for nodeprimarynodeid attr.
*
* nodeprimarynodeid attr was added to table pg_dist_node using alter operation
* after the version where Citus started supporting downgrades, and it's one of
* the two columns that we've introduced to pg_dist_node since then.
*
* And in case of a downgrade + upgrade, tupleDesc->natts becomes greater than
* Natts_pg_dist_node and when this happens, then we know that attrnum
* nodeprimarynodeid is not Anum_pg_dist_node_nodeprimarynodeid anymore but
* tupleDesc->natts - 1.
*/
static int
GetNodePrimaryNodeIdAttrIndexInPgDistNode(TupleDesc tupleDesc)
{
return tupleDesc->natts == Natts_pg_dist_node
? (Anum_pg_dist_node_nodeprimarynodeid - 1)
: tupleDesc->natts - 1;
}
/*
* GetNodeIsCloneAttrIndexInPgDistNode returns attrnum for nodeisclone attr.
*
* Like, GetNodePrimaryNodeIdAttrIndexInPgDistNode(), performs a similar
* calculation for nodeisclone attribute because this is column added to
* pg_dist_node after we started supporting downgrades.
*
* Only difference with the mentioned function is that we know
* the attrnum for nodeisclone is not Anum_pg_dist_node_nodeisclone anymore
* but tupleDesc->natts - 2 because we added these columns consecutively
* and we first add nodeisclone attribute and then nodeprimarynodeid attribute.
*/
static int
GetNodeIsCloneAttrIndexInPgDistNode(TupleDesc tupleDesc)
{
return tupleDesc->natts == Natts_pg_dist_node
? (Anum_pg_dist_node_nodeisclone - 1)
: tupleDesc->natts - 2;
}
/*
* StringToDatum transforms a string representation into a Datum.
*/
@ -3519,15 +3567,15 @@ UnsetMetadataSyncedForAllWorkers(void)
updatedAtLeastOne = true;
}
Datum *values = palloc(tupleDescriptor->natts * sizeof(Datum));
bool *isnull = palloc(tupleDescriptor->natts * sizeof(bool));
bool *replace = palloc(tupleDescriptor->natts * sizeof(bool));
while (HeapTupleIsValid(heapTuple))
{
Datum values[Natts_pg_dist_node];
bool isnull[Natts_pg_dist_node];
bool replace[Natts_pg_dist_node];
memset(replace, false, sizeof(replace));
memset(isnull, false, sizeof(isnull));
memset(values, 0, sizeof(values));
memset(values, 0, tupleDescriptor->natts * sizeof(Datum));
memset(isnull, 0, tupleDescriptor->natts * sizeof(bool));
memset(replace, 0, tupleDescriptor->natts * sizeof(bool));
values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(false);
replace[Anum_pg_dist_node_metadatasynced - 1] = true;
@ -3550,6 +3598,10 @@ UnsetMetadataSyncedForAllWorkers(void)
CatalogCloseIndexes(indstate);
table_close(relation, NoLock);
pfree(values);
pfree(isnull);
pfree(replace);
return updatedAtLeastOne;
}

View File

@ -141,10 +141,9 @@ static RouterPlanType GetRouterPlanType(Query *query,
bool hasUnresolvedParams);
static void ConcatenateRTablesAndPerminfos(PlannedStmt *mainPlan,
PlannedStmt *concatPlan);
static bool CheckPostPlanDistribution(bool isDistributedQuery,
Query *origQuery,
List *rangeTableList,
Query *plannedQuery);
static bool CheckPostPlanDistribution(DistributedPlanningContext *planContext,
bool isDistributedQuery,
List *rangeTableList);
/* Distributed planner hook */
PlannedStmt *
@ -265,10 +264,9 @@ distributed_planner(Query *parse,
planContext.plan = standard_planner(planContext.query, NULL,
planContext.cursorOptions,
planContext.boundParams);
needsDistributedPlanning = CheckPostPlanDistribution(needsDistributedPlanning,
planContext.originalQuery,
rangeTableList,
planContext.query);
needsDistributedPlanning = CheckPostPlanDistribution(&planContext,
needsDistributedPlanning,
rangeTableList);
if (needsDistributedPlanning)
{
@ -2740,12 +2738,13 @@ WarnIfListHasForeignDistributedTable(List *rangeTableList)
static bool
CheckPostPlanDistribution(bool isDistributedQuery,
Query *origQuery, List *rangeTableList,
Query *plannedQuery)
CheckPostPlanDistribution(DistributedPlanningContext *planContext, bool
isDistributedQuery, List *rangeTableList)
{
if (isDistributedQuery)
{
Query *origQuery = planContext->originalQuery;
Query *plannedQuery = planContext->query;
Node *origQuals = origQuery->jointree->quals;
Node *plannedQuals = plannedQuery->jointree->quals;
@ -2764,6 +2763,23 @@ CheckPostPlanDistribution(bool isDistributedQuery,
*/
if (origQuals != NULL && plannedQuals == NULL)
{
bool planHasDistTable = ListContainsDistributedTableRTE(
planContext->plan->rtable, NULL);
/*
* If the Postgres plan has a distributed table, we know for sure that
* the query requires distributed planning.
*/
if (planHasDistTable)
{
return true;
}
/*
* Otherwise, if the query has less range table entries after Postgres,
* planning, we should re-evaluate the distribution of the query. Postgres
* may have optimized away all citus tables, per issues 7782, 7783.
*/
List *rtesPostPlan = ExtractRangeTableEntryList(plannedQuery);
if (list_length(rtesPostPlan) < list_length(rangeTableList))
{

View File

@ -22,7 +22,7 @@ most_common_vals_json AS (
table_reltuples_json AS (
SELECT distinct(shardid),
(json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples,
(json_array_elements(result::json)->>'citus_table')::regclass AS citus_table
FROM most_common_vals_json),
@ -32,8 +32,8 @@ table_reltuples AS (
null_frac_json AS (
SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
(json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
(json_array_elements(result::json)->>'null_frac')::float4 AS null_frac,
CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples,
CAST((json_array_elements(result::json)->>'null_frac') AS float4) AS null_frac,
(json_array_elements(result::json)->>'attname')::text AS attname
FROM most_common_vals_json
),
@ -49,8 +49,8 @@ most_common_vals AS (
SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
(json_array_elements(result::json)->>'attname')::text AS attname,
json_array_elements_text((json_array_elements(result::json)->>'most_common_vals')::json)::text AS common_val,
json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json)::float4 AS common_freq,
(json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples
CAST(json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json) AS float4) AS common_freq,
CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples
FROM most_common_vals_json),
common_val_occurrence AS (
@ -58,7 +58,7 @@ common_val_occurrence AS (
sum(common_freq * shard_reltuples)::bigint AS occurrence
FROM most_common_vals m
GROUP BY citus_table, m.attname, common_val
ORDER BY 1, 2, occurrence DESC)
ORDER BY 1, 2, occurrence DESC, 3)
SELECT nsp.nspname AS schemaname, p.relname AS tablename, c.attname,

View File

@ -22,7 +22,7 @@ most_common_vals_json AS (
table_reltuples_json AS (
SELECT distinct(shardid),
(json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples,
(json_array_elements(result::json)->>'citus_table')::regclass AS citus_table
FROM most_common_vals_json),
@ -32,8 +32,8 @@ table_reltuples AS (
null_frac_json AS (
SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
(json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
(json_array_elements(result::json)->>'null_frac')::float4 AS null_frac,
CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples,
CAST((json_array_elements(result::json)->>'null_frac') AS float4) AS null_frac,
(json_array_elements(result::json)->>'attname')::text AS attname
FROM most_common_vals_json
),
@ -49,8 +49,8 @@ most_common_vals AS (
SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
(json_array_elements(result::json)->>'attname')::text AS attname,
json_array_elements_text((json_array_elements(result::json)->>'most_common_vals')::json)::text AS common_val,
json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json)::float4 AS common_freq,
(json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples
CAST(json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json) AS float4) AS common_freq,
CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples
FROM most_common_vals_json),
common_val_occurrence AS (
@ -58,7 +58,7 @@ common_val_occurrence AS (
sum(common_freq * shard_reltuples)::bigint AS occurrence
FROM most_common_vals m
GROUP BY citus_table, m.attname, common_val
ORDER BY 1, 2, occurrence DESC)
ORDER BY 1, 2, occurrence DESC, 3)
SELECT nsp.nspname AS schemaname, p.relname AS tablename, c.attname,

View File

@ -106,7 +106,9 @@ LogTransactionRecord(int32 groupId, char *transactionName, FullTransactionId out
HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);
/* insert new tuple */
CATALOG_INSERT_WITH_SNAPSHOT(pgDistTransaction, heapTuple);
PushActiveSnapshot(GetTransactionSnapshot());
CatalogTupleInsert(pgDistTransaction, heapTuple);
PopActiveSnapshot();
CommandCounterIncrement();
@ -685,7 +687,7 @@ DeleteWorkerTransactions(WorkerNode *workerNode)
int
GetOuterXidAttrIndexInPgDistTransaction(TupleDesc tupleDesc)
{
return TupleDescSize(tupleDesc) == Natts_pg_dist_transaction
return tupleDesc->natts == Natts_pg_dist_transaction
? (Anum_pg_dist_transaction_outerxid - 1)
: tupleDesc->natts - 1;
}

View File

@ -140,10 +140,10 @@ GetReplicationLag(WorkerNode *primaryWorkerNode, WorkerNode *replicaWorkerNode)
ForgetResults(replicaConnection);
CloseConnection(replicaConnection);
ereport(DEBUG1, (errmsg(
ereport(DEBUG2, (errmsg(
"successfully measured replication lag: primary LSN %s, clone LSN %s",
primary_lsn_str, replica_lsn_str)));
ereport(NOTICE, (errmsg("replication lag between %s:%d and %s:%d is %ld bytes",
ereport(DEBUG1, (errmsg("replication lag between %s:%d and %s:%d is %ld bytes",
primaryWorkerNode->workerName, primaryWorkerNode->workerPort,
replicaWorkerNode->workerName, replicaWorkerNode->workerPort,
lag_bytes)));
@ -244,9 +244,9 @@ EnsureValidCloneMode(WorkerNode *primaryWorkerNode,
freeaddrinfo(result);
}
ereport(NOTICE, (errmsg("checking replication for node %s (resolved IP: %s)",
ereport(NOTICE, (errmsg("checking replication status of clone node %s:%d",
cloneHostname,
resolvedIP ? resolvedIP : "unresolved")));
clonePort)));
/* Build query to check if clone is connected and get its sync state */
@ -278,6 +278,11 @@ EnsureValidCloneMode(WorkerNode *primaryWorkerNode,
cloneHostname);
}
ereport(DEBUG2, (errmsg("sending replication status check query: %s to primary %s:%d",
replicationCheckQuery->data,
primaryWorkerNode->workerName,
primaryWorkerNode->workerPort)));
int replicationCheckResultCode = SendRemoteCommand(primaryConnection,
replicationCheckQuery->data);
if (replicationCheckResultCode == 0)
@ -305,8 +310,9 @@ EnsureValidCloneMode(WorkerNode *primaryWorkerNode,
primaryWorkerNode->workerName, primaryWorkerNode->
workerPort),
errdetail(
"The clone must be actively replicating from the specified primary node. "
"Check that the clone is running and properly configured for replication.")));
"The clone must be actively replicating from the specified primary node"),
errhint(
"Verify the clone is running and properly configured for replication")));
}
/* Check if clone is synchronous */
@ -322,9 +328,9 @@ EnsureValidCloneMode(WorkerNode *primaryWorkerNode,
"cannot %s clone %s:%d as it is configured as a synchronous replica",
operation, cloneHostname, clonePort),
errdetail(
"Promoting a synchronous clone can cause data consistency issues. "
"Please configure it as an asynchronous replica first.")))
;
"Promoting a synchronous clone can cause data consistency issues"),
errhint(
"Configure clone as an asynchronous replica")));
}
}

View File

@ -13,10 +13,6 @@
#include "pg_version_constants.h"
/* we need these for PG-18’s PushActiveSnapshot/PopActiveSnapshot APIs */
#include "access/xact.h"
#include "utils/snapmgr.h"
#if PG_VERSION_NUM >= PG_VERSION_18
#define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, k) \
create_foreignscan_path( \
@ -40,14 +36,6 @@
/* PG-18 unified row-compare operator codes under COMPARE_* */
#define ROWCOMPARE_NE COMPARE_NE
#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \
do { \
Snapshot __snap = GetTransactionSnapshot(); \
PushActiveSnapshot(__snap); \
CatalogTupleInsert((rel), (tup)); \
PopActiveSnapshot(); \
} while (0)
#elif PG_VERSION_NUM >= PG_VERSION_17
#define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, k) \
create_foreignscan_path( \
@ -56,9 +44,6 @@
(g), (h), (i), (j), (k) \
)
/* no-op wrapper on older PGs */
#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \
CatalogTupleInsert((rel), (tup))
#endif
#if PG_VERSION_NUM >= PG_VERSION_17
@ -469,10 +454,6 @@ getStxstattarget_compat(HeapTuple tup)
k) create_foreignscan_path(a, b, c, d, e, f, g, h, \
i, k)
/* no-op wrapper on older PGs */
#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \
CatalogTupleInsert((rel), (tup))
#define getProcNo_compat(a) (a->pgprocno)
#define getLxid_compat(a) (a->lxid)

View File

@ -262,6 +262,9 @@ DEPS = {
"multi_subquery_in_where_reference_clause": TestDeps(
"minimal_schedule", ["multi_behavioral_analytics_create_table"]
),
"subquery_in_where": TestDeps(
"minimal_schedule", ["multi_behavioral_analytics_create_table"]
),
}

View File

@ -110,13 +110,100 @@ SELECT * FROM citus_stats
citus_aggregated_stats | citus_local_current_check | rlsuser | 0.142857 | {user1} | {0.714286}
(9 rows)
-- create a dist table with million rows to simulate 3.729223e+06 in reltuples
-- this tests casting numbers like 3.729223e+06 to bigint
CREATE TABLE organizations (
org_id bigint,
id int
);
SELECT create_distributed_table('organizations', 'org_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO organizations(org_id, id)
SELECT i, 1
FROM generate_series(1,2000000) i;
ANALYZE organizations;
SELECT attname, null_frac, most_common_vals, most_common_freqs FROM citus_stats
WHERE tablename IN ('organizations')
ORDER BY 1;
attname | null_frac | most_common_vals | most_common_freqs
---------------------------------------------------------------------
id | 0 | {1} | {1}
(1 row)
-- more real-world scenario:
-- outputs of pg_stats and citus_stats are NOT the same
-- but citus_stats does a fair estimation job
SELECT setseed(0.42);
setseed
---------------------------------------------------------------------
(1 row)
CREATE TABLE orders (id bigint , custid int, product text, quantity int);
INSERT INTO orders(id, custid, product, quantity)
SELECT i, (random() * 100)::int, 'product' || (random() * 10)::int, NULL
FROM generate_series(1,11) d(i);
-- frequent customer
INSERT INTO orders(id, custid, product, quantity)
SELECT 1200, 17, 'product' || (random() * 10)::int, NULL
FROM generate_series(1, 57) sk(i);
-- popular product
INSERT INTO orders(id, custid, product, quantity)
SELECT i+100 % 17, NULL, 'product3', (random() * 40)::int
FROM generate_series(1, 37) sk(i);
-- frequent customer
INSERT INTO orders(id, custid, product, quantity)
SELECT 1390, 76, 'product' || ((random() * 20)::int % 3), (random() * 30)::int
FROM generate_series(1, 33) sk(i);
ANALYZE orders;
-- pg_stats
SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM pg_stats
WHERE tablename IN ('orders')
ORDER BY 3;
schemaname | tablename | attname | null_frac | most_common_vals | most_common_freqs
---------------------------------------------------------------------
citus_aggregated_stats | orders | custid | 0.268116 | {17,76} | {0.413043,0.23913}
citus_aggregated_stats | orders | id | 0 | {1200,1390} | {0.413043,0.23913}
citus_aggregated_stats | orders | product | 0 | {product3,product2,product0,product1,product9,product4,product8,product5,product10,product6} | {0.347826,0.15942,0.115942,0.108696,0.0652174,0.057971,0.0507246,0.0362319,0.0289855,0.0289855}
citus_aggregated_stats | orders | quantity | 0.492754 | {26,23,6,8,11,12,13,17,20,25,30,4,14,15,16,19,24,27,35,36,38,40} | {0.0362319,0.0289855,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928}
(4 rows)
SELECT create_distributed_table('orders', 'id');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$citus_aggregated_stats.orders$$)
create_distributed_table
---------------------------------------------------------------------
(1 row)
ANALYZE orders;
-- citus_stats
SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM citus_stats
WHERE tablename IN ('orders')
ORDER BY 3;
schemaname | tablename | attname | null_frac | most_common_vals | most_common_freqs
---------------------------------------------------------------------
citus_aggregated_stats | orders | custid | 0.268116 | {17,76} | {0.413043,0.23913}
citus_aggregated_stats | orders | id | 0 | {1200,1390} | {0.413043,0.23913}
citus_aggregated_stats | orders | product | 0 | {product3,product2,product0,product1,product9,product4,product8,product5,product10,product6} | {0.347826,0.15942,0.115942,0.108696,0.0652174,0.057971,0.0507246,0.0362319,0.0289855,0.0289855}
citus_aggregated_stats | orders | quantity | 0.492754 | {26,13,17,20,23,8,11,12,14,16,19,24,25,27,30,35,38,40,6} | {0.0362319,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928}
(4 rows)
RESET SESSION AUTHORIZATION;
DROP SCHEMA citus_aggregated_stats CASCADE;
NOTICE: drop cascades to 6 other objects
NOTICE: drop cascades to 8 other objects
DETAIL: drop cascades to table current_check
drop cascades to table dist_current_check
drop cascades to table ref_current_check
drop cascades to table citus_local_current_check_1870003
drop cascades to table ref_current_check_1870002
drop cascades to table citus_local_current_check
drop cascades to table organizations
drop cascades to table orders
DROP USER user1;

View File

@ -167,7 +167,7 @@ SELECT pg_sleep(5);
-- the function returns the new node id
SELECT citus_add_clone_node('localhost', :follower_worker_1_port, 'localhost', :worker_1_port) AS clone_node_id \gset
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
NOTICE: checking replication for node localhost (resolved IP: ::1)
NOTICE: checking replication status of clone node localhost:xxxxx
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
SELECT * from pg_dist_node ORDER by nodeid;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards | nodeisclone | nodeprimarynodeid
@ -234,12 +234,11 @@ SET client_min_messages to 'LOG';
SELECT citus_promote_clone_and_rebalance(:clone_node_id);
NOTICE: Starting promotion process for clone node localhost:xxxxx (ID 3), original primary localhost:xxxxx (ID 1)
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
NOTICE: checking replication for node localhost (resolved IP: ::1)
NOTICE: checking replication status of clone node localhost:xxxxx
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
NOTICE: Blocking writes on shards of original primary node localhost:xxxxx (group 1)
NOTICE: Blocking all writes to worker node localhost:xxxxx (ID 1)
NOTICE: Waiting for clone localhost:xxxxx to catch up with primary localhost:xxxxx (timeout: 300 seconds)
NOTICE: replication lag between localhost:xxxxx and localhost:xxxxx is 0 bytes
NOTICE: Clone localhost:xxxxx is now caught up with primary localhost:xxxxx.
NOTICE: Attempting to promote clone localhost:xxxxx via pg_promote().
NOTICE: Clone node localhost:xxxxx (ID 3) has been successfully promoted.

View File

@ -17,9 +17,10 @@ SELECT * from pg_dist_node ORDER by nodeid;
-- this should fail as it is not a valid replica of worker_1
SELECT citus_add_clone_node('localhost', :follower_worker_2_port, 'localhost', :worker_1_port) AS clone_node_id \gset
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
NOTICE: checking replication for node localhost (resolved IP: ::1)
NOTICE: checking replication status of clone node localhost:xxxxx
ERROR: clone localhost:xxxxx is not connected to primary localhost:xxxxx
DETAIL: The clone must be actively replicating from the specified primary node. Check that the clone is running and properly configured for replication.
DETAIL: The clone must be actively replicating from the specified primary node
HINT: Verify the clone is running and properly configured for replication
SELECT * from pg_dist_node ORDER by nodeid;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards | nodeisclone | nodeprimarynodeid
---------------------------------------------------------------------
@ -71,9 +72,10 @@ SELECT nodename, nodeport, count(shardid) FROM pg_dist_shard_placement GROUP BY
-- Try to add replica of worker_node2 as a clone of worker_node1
SELECT citus_add_clone_node('localhost', :follower_worker_2_port, 'localhost', :worker_1_port) AS clone_node_id \gset
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
NOTICE: checking replication for node localhost (resolved IP: ::1)
NOTICE: checking replication status of clone node localhost:xxxxx
ERROR: clone localhost:xxxxx is not connected to primary localhost:xxxxx
DETAIL: The clone must be actively replicating from the specified primary node. Check that the clone is running and properly configured for replication.
DETAIL: The clone must be actively replicating from the specified primary node
HINT: Verify the clone is running and properly configured for replication
-- Test 1: Try to promote a non-existent clone node
SELECT citus_promote_clone_and_rebalance(clone_nodeid =>99999);
ERROR: Clone node with ID 99999 not found.
@ -87,7 +89,7 @@ ERROR: Node localhost:xxxxx (ID 1) is not a valid clone or its primary node ID
-- register the new node as a clone, This should pass
SELECT citus_add_clone_node('localhost', :follower_worker_2_port, 'localhost', :worker_2_port) AS clone_node_id \gset
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
NOTICE: checking replication for node localhost (resolved IP: ::1)
NOTICE: checking replication status of clone node localhost:xxxxx
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
SELECT * from pg_dist_node ORDER by nodeid;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards | nodeisclone | nodeprimarynodeid
@ -108,12 +110,11 @@ SELECT :clone_node_id;
SELECT citus_promote_clone_and_rebalance(clone_nodeid => :clone_node_id, rebalance_strategy => 'invalid_strategy');
NOTICE: Starting promotion process for clone node localhost:xxxxx (ID 4), original primary localhost:xxxxx (ID 2)
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
NOTICE: checking replication for node localhost (resolved IP: ::1)
NOTICE: checking replication status of clone node localhost:xxxxx
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
NOTICE: Blocking writes on shards of original primary node localhost:xxxxx (group 2)
NOTICE: Blocking all writes to worker node localhost:xxxxx (ID 2)
NOTICE: Waiting for clone localhost:xxxxx to catch up with primary localhost:xxxxx (timeout: 300 seconds)
NOTICE: replication lag between localhost:xxxxx and localhost:xxxxx is 0 bytes
NOTICE: Clone localhost:xxxxx is now caught up with primary localhost:xxxxx.
NOTICE: Attempting to promote clone localhost:xxxxx via pg_promote().
NOTICE: Clone node localhost:xxxxx (ID 4) has been successfully promoted.
@ -133,9 +134,10 @@ BEGIN;
SELECT citus_promote_clone_and_rebalance(clone_nodeid => :clone_node_id);
NOTICE: Starting promotion process for clone node localhost:xxxxx (ID 4), original primary localhost:xxxxx (ID 2)
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
NOTICE: checking replication for node localhost (resolved IP: ::1)
NOTICE: checking replication status of clone node localhost:xxxxx
ERROR: clone localhost:xxxxx is not connected to primary localhost:xxxxx
DETAIL: The clone must be actively replicating from the specified primary node. Check that the clone is running and properly configured for replication.
DETAIL: The clone must be actively replicating from the specified primary node
HINT: Verify the clone is running and properly configured for replication
ROLLBACK;
-- Verify no data is lost after rooling back the transaction
SELECT COUNT(*) from backup_test;
@ -163,7 +165,7 @@ SELECT * from pg_dist_node ORDER by nodeid;
SELECT master_add_node('localhost', :worker_3_port) AS nodeid_3 \gset
SELECT citus_add_clone_node('localhost', :follower_worker_3_port, 'localhost', :worker_3_port) AS clone_node_id_3 \gset
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
NOTICE: checking replication for node localhost (resolved IP: ::1)
NOTICE: checking replication status of clone node localhost:xxxxx
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
set citus.shard_count = 100;
CREATE TABLE backup_test2(id int, value text);
@ -217,12 +219,11 @@ SET client_min_messages to 'LOG';
SELECT citus_promote_clone_and_rebalance(clone_nodeid => :clone_node_id_3);
NOTICE: Starting promotion process for clone node localhost:xxxxx (ID 6), original primary localhost:xxxxx (ID 5)
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
NOTICE: checking replication for node localhost (resolved IP: ::1)
NOTICE: checking replication status of clone node localhost:xxxxx
NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
NOTICE: Blocking writes on shards of original primary node localhost:xxxxx (group 5)
NOTICE: Blocking all writes to worker node localhost:xxxxx (ID 5)
NOTICE: Waiting for clone localhost:xxxxx to catch up with primary localhost:xxxxx (timeout: 300 seconds)
NOTICE: replication lag between localhost:xxxxx and localhost:xxxxx is 0 bytes
NOTICE: Clone localhost:xxxxx is now caught up with primary localhost:xxxxx.
NOTICE: Attempting to promote clone localhost:xxxxx via pg_promote().
NOTICE: Clone node localhost:xxxxx (ID 6) has been successfully promoted.

View File

@ -27,15 +27,17 @@ SELECT master_remove_node('localhost', :follower_worker_2_port);
-- this should fail as the replica is a synchronous replica that is not allowed
SELECT citus_add_clone_node('localhost', :follower_worker_1_port, 'localhost', :worker_1_port) AS clone_node_id \gset
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
NOTICE: checking replication for node localhost (resolved IP: ::1)
NOTICE: checking replication status of clone node localhost:xxxxx
ERROR: cannot add clone localhost:xxxxx as it is configured as a synchronous replica
DETAIL: Promoting a synchronous clone can cause data consistency issues. Please configure it as an asynchronous replica first.
DETAIL: Promoting a synchronous clone can cause data consistency issues
HINT: Configure clone as an asynchronous replica
-- this should fail as the replica is a synchronous replica that is not allowed
SELECT citus_add_clone_node('localhost', :follower_worker_2_port, 'localhost', :worker_2_port) AS clone_node_id \gset
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
NOTICE: checking replication for node localhost (resolved IP: ::1)
NOTICE: checking replication status of clone node localhost:xxxxx
ERROR: cannot add clone localhost:xxxxx as it is configured as a synchronous replica
DETAIL: Promoting a synchronous clone can cause data consistency issues. Please configure it as an asynchronous replica first.
DETAIL: Promoting a synchronous clone can cause data consistency issues
HINT: Configure clone as an asynchronous replica
SELECT * from pg_dist_node ORDER by nodeid;
nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards | nodeisclone | nodeprimarynodeid
---------------------------------------------------------------------

View File

@ -1657,7 +1657,7 @@ DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
SHOW citus.version;
citus.version
---------------------------------------------------------------------
13.2devel
13.2.0
(1 row)
-- ensure no unexpected objects were created outside pg_catalog

View File

@ -1253,10 +1253,53 @@ SELECT vkey, pkey FROM t3;
---------------------------------------------------------------------
(0 rows)
-- Redundant WHERE clause with distributed parititioned table
CREATE TABLE a (a int);
INSERT INTO a VALUES (1);
-- populated distributed partitioned table
create table partitioned_table (a INT UNIQUE) PARTITION BY RANGE(a);
CREATE TABLE par_1 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (41);
CREATE TABLE par_2 PARTITION OF partitioned_table FOR VALUES FROM (41) TO (81);
CREATE TABLE par_3 PARTITION OF partitioned_table FOR VALUES FROM (81) TO (121);
CREATE TABLE par_4 PARTITION OF partitioned_table FOR VALUES FROM (121) TO (161);
SELECT create_distributed_table('partitioned_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
insert into partitioned_table(a) select i from generate_series(1,160) i;
-- test citus table in init plan
-- with redundant WHERE clause
SELECT CASE WHEN EXISTS (
SELECT * FROM partitioned_table
) THEN 1 ELSE 0 END AS table_non_empty
FROM a
WHERE true;
table_non_empty
---------------------------------------------------------------------
1
(1 row)
-- test citus table in init plan
-- with redundant WHERE clause involving
-- a citus table
SELECT CASE WHEN EXISTS (
SELECT * FROM partitioned_table
) THEN 1 ELSE 0 END AS table_non_empty
FROM a
WHERE true OR NOT EXISTS (SELECT 1 FROM t1);
table_non_empty
---------------------------------------------------------------------
1
(1 row)
DROP TABLE local_table;
DROP TABLE t0;
DROP TABLE t1;
DROP TABLE t3;
DROP TABLE t7;
DROP TABLE a;
DROP TABLE partitioned_table CASCADE;
DROP SCHEMA subquery_in_where CASCADE;
SET search_path TO public;

View File

@ -82,6 +82,68 @@ SELECT * FROM citus_stats
WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check')
ORDER BY 1;
-- create a dist table with million rows to simulate 3.729223e+06 in reltuples
-- this tests casting numbers like 3.729223e+06 to bigint
CREATE TABLE organizations (
org_id bigint,
id int
);
SELECT create_distributed_table('organizations', 'org_id');
INSERT INTO organizations(org_id, id)
SELECT i, 1
FROM generate_series(1,2000000) i;
ANALYZE organizations;
SELECT attname, null_frac, most_common_vals, most_common_freqs FROM citus_stats
WHERE tablename IN ('organizations')
ORDER BY 1;
-- more real-world scenario:
-- outputs of pg_stats and citus_stats are NOT the same
-- but citus_stats does a fair estimation job
SELECT setseed(0.42);
CREATE TABLE orders (id bigint , custid int, product text, quantity int);
INSERT INTO orders(id, custid, product, quantity)
SELECT i, (random() * 100)::int, 'product' || (random() * 10)::int, NULL
FROM generate_series(1,11) d(i);
-- frequent customer
INSERT INTO orders(id, custid, product, quantity)
SELECT 1200, 17, 'product' || (random() * 10)::int, NULL
FROM generate_series(1, 57) sk(i);
-- popular product
INSERT INTO orders(id, custid, product, quantity)
SELECT i+100 % 17, NULL, 'product3', (random() * 40)::int
FROM generate_series(1, 37) sk(i);
-- frequent customer
INSERT INTO orders(id, custid, product, quantity)
SELECT 1390, 76, 'product' || ((random() * 20)::int % 3), (random() * 30)::int
FROM generate_series(1, 33) sk(i);
ANALYZE orders;
-- pg_stats
SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM pg_stats
WHERE tablename IN ('orders')
ORDER BY 3;
SELECT create_distributed_table('orders', 'id');
ANALYZE orders;
-- citus_stats
SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM citus_stats
WHERE tablename IN ('orders')
ORDER BY 3;
RESET SESSION AUTHORIZATION;
DROP SCHEMA citus_aggregated_stats CASCADE;
DROP USER user1;

View File

@ -929,10 +929,42 @@ where TRUE or (((t3.vkey) >= (select
-- Distributed table t3 is now empty
SELECT vkey, pkey FROM t3;
-- Redundant WHERE clause with distributed parititioned table
CREATE TABLE a (a int);
INSERT INTO a VALUES (1);
-- populated distributed partitioned table
create table partitioned_table (a INT UNIQUE) PARTITION BY RANGE(a);
CREATE TABLE par_1 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (41);
CREATE TABLE par_2 PARTITION OF partitioned_table FOR VALUES FROM (41) TO (81);
CREATE TABLE par_3 PARTITION OF partitioned_table FOR VALUES FROM (81) TO (121);
CREATE TABLE par_4 PARTITION OF partitioned_table FOR VALUES FROM (121) TO (161);
SELECT create_distributed_table('partitioned_table', 'a');
insert into partitioned_table(a) select i from generate_series(1,160) i;
-- test citus table in init plan
-- with redundant WHERE clause
SELECT CASE WHEN EXISTS (
SELECT * FROM partitioned_table
) THEN 1 ELSE 0 END AS table_non_empty
FROM a
WHERE true;
-- test citus table in init plan
-- with redundant WHERE clause involving
-- a citus table
SELECT CASE WHEN EXISTS (
SELECT * FROM partitioned_table
) THEN 1 ELSE 0 END AS table_non_empty
FROM a
WHERE true OR NOT EXISTS (SELECT 1 FROM t1);
DROP TABLE local_table;
DROP TABLE t0;
DROP TABLE t1;
DROP TABLE t3;
DROP TABLE t7;
DROP TABLE a;
DROP TABLE partitioned_table CASCADE;
DROP SCHEMA subquery_in_where CASCADE;
SET search_path TO public;