mirror of https://github.com/citusdata/citus.git
Compare commits
10 Commits
| Author | SHA1 | Date |
|---|---|---|
| | e8e06d8d0c | |
| | f79dd61a92 | |
| | 274504465d | |
| | c39df81f69 | |
| | 91cae1fb29 | |
| | 0b9acbeb3d | |
| | 754e2986ba | |
| | 2ba22104eb | |
| | b03b249f6d | |
| | a7d529813b | |
@@ -73,7 +73,7 @@ USER citus
 # build postgres versions separately for effective parallelism and caching of already built versions when changing only certain versions
 FROM base AS pg15
-RUN MAKEFLAGS="-j $(nproc)" pgenv build 15.13
+RUN MAKEFLAGS="-j $(nproc)" pgenv build 15.14
 RUN rm .pgenv/src/*.tar*
 RUN make -C .pgenv/src/postgresql-*/ clean
 RUN make -C .pgenv/src/postgresql-*/src/include install

@@ -85,7 +85,7 @@ RUN cp -r .pgenv/src .pgenv/pgsql-* .pgenv/config .pgenv-staging/
 RUN rm .pgenv-staging/config/default.conf

 FROM base AS pg16
-RUN MAKEFLAGS="-j $(nproc)" pgenv build 16.9
+RUN MAKEFLAGS="-j $(nproc)" pgenv build 16.10
 RUN rm .pgenv/src/*.tar*
 RUN make -C .pgenv/src/postgresql-*/ clean
 RUN make -C .pgenv/src/postgresql-*/src/include install

@@ -97,7 +97,7 @@ RUN cp -r .pgenv/src .pgenv/pgsql-* .pgenv/config .pgenv-staging/
 RUN rm .pgenv-staging/config/default.conf

 FROM base AS pg17
-RUN MAKEFLAGS="-j $(nproc)" pgenv build 17.5
+RUN MAKEFLAGS="-j $(nproc)" pgenv build 17.6
 RUN rm .pgenv/src/*.tar*
 RUN make -C .pgenv/src/postgresql-*/ clean
 RUN make -C .pgenv/src/postgresql-*/src/include install

@@ -216,7 +216,7 @@ COPY --chown=citus:citus .psqlrc .
 RUN sudo chown --from=root:root citus:citus -R ~

 # sets default pg version
-RUN pgenv switch 17.5
+RUN pgenv switch 17.6

 # make connecting to the coordinator easy
 ENV PGPORT=9700
@@ -31,12 +31,12 @@ jobs:
   pgupgrade_image_name: "ghcr.io/citusdata/pgupgradetester"
   style_checker_image_name: "ghcr.io/citusdata/stylechecker"
   style_checker_tools_version: "0.8.18"
-  sql_snapshot_pg_version: "17.5"
-  image_suffix: "-vb17c33b"
-  pg15_version: '{ "major": "15", "full": "15.13" }'
-  pg16_version: '{ "major": "16", "full": "16.9" }'
-  pg17_version: '{ "major": "17", "full": "17.5" }'
-  upgrade_pg_versions: "15.13-16.9-17.5"
+  sql_snapshot_pg_version: "17.6"
+  image_suffix: "-v4df94a0"
+  pg15_version: '{ "major": "15", "full": "15.14" }'
+  pg16_version: '{ "major": "16", "full": "16.10" }'
+  pg17_version: '{ "major": "17", "full": "17.6" }'
+  upgrade_pg_versions: "15.14-16.10-17.6"
   steps:
   # Since GHA jobs need at least one step we use a noop step here.
   - name: Set up parameters
CHANGELOG.md (+49 −0)
@@ -1,3 +1,52 @@
+### citus v13.2.0 (August 18, 2025) ###
+
+* Adds `citus_add_clone_node()`, `citus_add_clone_node_with_nodeid()`,
+  `citus_remove_clone_node()` and `citus_remove_clone_node_with_nodeid()`
+  UDFs to support snapshot-based node splits. This feature allows promoting
+  a streaming replica (clone) to a primary node and rebalancing shards
+  between the original and newly promoted node without requiring a full data
+  copy. This greatly reduces rebalance times for scale-out operations when
+  the new node already has the data via streaming replication (#8122)
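As an editorial illustration of the workflow this entry describes (host names, ports, and the node id are placeholder values; the function names come from this release and from the regression tests later in this compare):

```sql
-- Register a streaming replica of an existing worker as a clone.
-- 'clone-host'/'worker-host' and the ports are example values only.
SELECT citus_add_clone_node('clone-host', 5432, 'worker-host', 5432);

-- Promote the clone and rebalance shards between the original primary
-- and the newly promoted node; 3 stands in for the node id returned above.
SELECT citus_promote_clone_and_rebalance(clone_nodeid => 3);

-- Alternatively, unregister a clone that is no longer needed.
SELECT citus_remove_clone_node('clone-host', 5432);
```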
+
+* Improves performance of shard rebalancer by parallelizing moves and removing
+  bottlenecks that blocked concurrent logical-replication transfers. This
+  reduces rebalance windows especially for clusters with large reference
+  tables and allows multiple shard transfers to run in parallel (#7983)
+
+* Adds citus.enable_recurring_outer_join_pushdown GUC (enabled by default)
+  to allow pushing down LEFT/RIGHT outer joins having a reference table in
+  the outer side and a distributed table on the inner side (e.g.,
+  \<reference table\> LEFT JOIN \<distributed table\>) (#7973)
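A sketch of the query shape this GUC targets; the GUC name is from the entry above, while the table and column names are hypothetical:

```sql
SET citus.enable_recurring_outer_join_pushdown TO on;  -- the default

-- Reference table on the outer side, distributed table on the inner side.
SELECT r.region_id, count(d.order_id)
FROM regions r                      -- hypothetical reference table
LEFT JOIN orders_distributed d      -- hypothetical distributed table
    ON r.region_id = d.region_id
GROUP BY r.region_id;
```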
+
+* Adds citus.enable_local_fast_path_query_optimization (enabled by default)
+  GUC to avoid unnecessary query deparsing to improve performance of
+  fast-path queries targeting local shards (#8035)
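A hedged example of a fast-path query that benefits: a single-shard read filtered on the distribution column (table and column names are placeholders):

```sql
SET citus.enable_local_fast_path_query_optimization TO on;  -- the default

-- When the matching shard is on the local node, Citus can execute this
-- without re-deparsing the query text.
SELECT * FROM events WHERE tenant_id = 42;
```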
+
+* Adds `citus_stats()` UDF that can be used to retrieve distributed `pg_stats`
+  for the provided Citus table. (#8026)
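For example, grounded in the regression-test output further down in this compare, where `citus_stats` is queried like a view (the table name here is illustrative):

```sql
SELECT schemaname, tablename, attname, null_frac,
       most_common_vals, most_common_freqs
FROM citus_stats
WHERE tablename = 'orders'
ORDER BY attname;
```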
+
+* Avoids automatically creating citus_columnar when there are no relations
+  using it (#8081)
+
+* Makes sure to check if the distribution key is in the target list before
+  pushing down a query with a union and an outer join (#8092)
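A hedged sketch of the query shape covered by this fix (all table names hypothetical): a UNION whose target list may omit the distribution key, combined with an outer join:

```sql
SELECT count(*)
FROM (SELECT payload FROM dist_a      -- target list without the
      UNION                           -- distribution key
      SELECT payload FROM dist_b) u
LEFT JOIN dist_c ON u.payload = dist_c.payload;
```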
+
+* Fixes a bug in EXPLAIN ANALYZE to prevent unintended (duplicate) execution
+  of the (sub)plans during the explain phase (#8017)
+
+* Fixes potential memory corruptions that could happen when accessing
+  various catalog tables after a Citus downgrade is followed by a Citus
+  upgrade (#7950, #8120, #8124, #8121, #8114, #8146)
+
+* Fixes UPDATE statements with indirection and array/jsonb subscripting with
+  more than one field (#7675)
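A minimal sketch of the statement class fixed by #7675, i.e. subscripting assignments that touch more than one field in a single UPDATE (the table and its `jsonb` column are hypothetical):

```sql
UPDATE settings
SET config['retries'] = '5',
    config['limits']['cpu'] = '2'
WHERE id = 1;
```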
+
+* Fixes an assertion failure that happens when an expression in the query
+  references a CTE (#8106)
+
+* Fixes an assertion failure that happens when querying a view that is
+  defined on distributed tables (#8136)
+
 ### citus v13.1.0 (May 30th, 2025) ###

 * Adds `citus_stat_counters` view that can be used to query
@@ -1,6 +1,6 @@
 #! /bin/sh
 # Guess values for system-dependent variables and create Makefiles.
-# Generated by GNU Autoconf 2.69 for Citus 13.2devel.
+# Generated by GNU Autoconf 2.69 for Citus 13.2.0.
 #
 #
 # Copyright (C) 1992-1996, 1998-2012 Free Software Foundation, Inc.

@@ -579,8 +579,8 @@ MAKEFLAGS=
 # Identity of this package.
 PACKAGE_NAME='Citus'
 PACKAGE_TARNAME='citus'
-PACKAGE_VERSION='13.2devel'
-PACKAGE_STRING='Citus 13.2devel'
+PACKAGE_VERSION='13.2.0'
+PACKAGE_STRING='Citus 13.2.0'
 PACKAGE_BUGREPORT=''
 PACKAGE_URL=''

@@ -1262,7 +1262,7 @@ if test "$ac_init_help" = "long"; then
   # Omit some internal or obsolete options to make the list less imposing.
   # This message is too long to be a string in the A/UX 3.1 sh.
   cat <<_ACEOF
-\`configure' configures Citus 13.2devel to adapt to many kinds of systems.
+\`configure' configures Citus 13.2.0 to adapt to many kinds of systems.

 Usage: $0 [OPTION]... [VAR=VALUE]...

@@ -1324,7 +1324,7 @@ fi

 if test -n "$ac_init_help"; then
   case $ac_init_help in
-     short | recursive ) echo "Configuration of Citus 13.2devel:";;
+     short | recursive ) echo "Configuration of Citus 13.2.0:";;
   esac
   cat <<\_ACEOF

@@ -1429,7 +1429,7 @@ fi
 test -n "$ac_init_help" && exit $ac_status
 if $ac_init_version; then
   cat <<\_ACEOF
-Citus configure 13.2devel
+Citus configure 13.2.0
 generated by GNU Autoconf 2.69

 Copyright (C) 2012 Free Software Foundation, Inc.

@@ -1912,7 +1912,7 @@ cat >config.log <<_ACEOF
 This file contains any messages produced by compilers while
 running configure, to aid debugging if configure makes a mistake.

-It was created by Citus $as_me 13.2devel, which was
+It was created by Citus $as_me 13.2.0, which was
 generated by GNU Autoconf 2.69. Invocation command line was

   $ $0 $@

@@ -5393,7 +5393,7 @@ cat >>$CONFIG_STATUS <<\_ACEOF || ac_write_fail=1
 # report actual input values of CONFIG_FILES etc. instead of their
 # values after options handling.
 ac_log="
-This file was extended by Citus $as_me 13.2devel, which was
+This file was extended by Citus $as_me 13.2.0, which was
 generated by GNU Autoconf 2.69. Invocation command line was

   CONFIG_FILES = $CONFIG_FILES

@@ -5455,7 +5455,7 @@ _ACEOF
 cat >>$CONFIG_STATUS <<_ACEOF || ac_write_fail=1
 ac_cs_config="`$as_echo "$ac_configure_args" | sed 's/^ //; s/[\\""\`\$]/\\\\&/g'`"
 ac_cs_version="\\
-Citus config.status 13.2devel
+Citus config.status 13.2.0
 configured by $0, generated by GNU Autoconf 2.69,
 with options \\"\$ac_cs_config\\"
@@ -5,7 +5,7 @@
 # everyone needing autoconf installed, the resulting files are checked
 # into the SCM.

-AC_INIT([Citus], [13.2devel])
+AC_INIT([Citus], [13.2.0])
 AC_COPYRIGHT([Copyright (c) Citus Data, Inc.])

 # we'll need sed and awk for some of the version commands
@@ -659,8 +659,9 @@ SaveStripeSkipList(RelFileLocator relfilelocator, uint64 stripe,
 			nulls[Anum_columnar_chunk_minimum_value - 1] = true;
 			nulls[Anum_columnar_chunk_maximum_value - 1] = true;
 		}

+		PushActiveSnapshot(GetTransactionSnapshot());
 		InsertTupleAndEnforceConstraints(modifyState, values, nulls);
+		PopActiveSnapshot();
 	}
 }

@@ -2132,7 +2133,7 @@ GetHighestUsedRowNumber(uint64 storageId)
 static int
 GetFirstRowNumberAttrIndexInColumnarStripe(TupleDesc tupleDesc)
 {
-	return TupleDescSize(tupleDesc) == Natts_columnar_stripe
+	return tupleDesc->natts == Natts_columnar_stripe
 		   ? (Anum_columnar_stripe_first_row_number - 1)
 		   : tupleDesc->natts - 1;
 }
@@ -1837,6 +1837,62 @@ ensure_update_targetlist_in_param_order(List *targetList)
 }


+/*
+ * isSubsRef checks if a given node is a SubscriptingRef or can be
+ * reached through an implicit coercion.
+ */
+static bool
+isSubsRef(Node *node)
+{
+	if (node == NULL)
+	{
+		return false;
+	}
+
+	if (IsA(node, CoerceToDomain))
+	{
+		CoerceToDomain *coerceToDomain = (CoerceToDomain *) node;
+		if (coerceToDomain->coercionformat != COERCE_IMPLICIT_CAST)
+		{
+			/* not an implicit coercion, cannot reach to a SubscriptingRef */
+			return false;
+		}
+
+		node = (Node *) coerceToDomain->arg;
+	}
+
+	return (IsA(node, SubscriptingRef));
+}
+
+
+/*
+ * checkTlistForSubsRef - checks if any target entry in the list contains a
+ * SubscriptingRef or can be reached through an implicit coercion. Used by
+ * ExpandMergedSubscriptingRefEntries() to identify if any target entries
+ * need to be expanded - if not the original target list is preserved.
+ */
+static bool
+checkTlistForSubsRef(List *targetEntryList)
+{
+	ListCell *tgtCell = NULL;
+
+	foreach(tgtCell, targetEntryList)
+	{
+		TargetEntry *targetEntry = (TargetEntry *) lfirst(tgtCell);
+		Expr *expr = targetEntry->expr;
+
+		if (isSubsRef((Node *) expr))
+		{
+			return true;
+		}
+	}
+
+	return false;
+}
+
+
 /*
  * ExpandMergedSubscriptingRefEntries takes a list of target entries and expands
  * each one that references a SubscriptingRef node that indicates multiple (field)

@@ -1848,6 +1904,12 @@ ExpandMergedSubscriptingRefEntries(List *targetEntryList)
 	List *newTargetEntryList = NIL;
 	ListCell *tgtCell = NULL;

+	if (!checkTlistForSubsRef(targetEntryList))
+	{
+		/* No subscripting refs found, return original list */
+		return targetEntryList;
+	}
+
 	foreach(tgtCell, targetEntryList)
 	{
 		TargetEntry *targetEntry = (TargetEntry *) lfirst(tgtCell);
@@ -801,7 +801,7 @@ DistributedSequenceList(void)
 int
 GetForceDelegationAttrIndexInPgDistObject(TupleDesc tupleDesc)
 {
-	return TupleDescSize(tupleDesc) == Natts_pg_dist_object
+	return tupleDesc->natts == Natts_pg_dist_object
 		   ? (Anum_pg_dist_object_force_delegation - 1)
 		   : tupleDesc->natts - 1;
 }
@@ -4486,7 +4486,7 @@ UnblockDependingBackgroundTasks(BackgroundTask *task)
 int
 GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc)
 {
-	return TupleDescSize(tupleDesc) == Natts_pg_dist_partition
+	return tupleDesc->natts == Natts_pg_dist_partition
 		   ? (Anum_pg_dist_partition_autoconverted - 1)
 		   : tupleDesc->natts - 1;
 }

@@ -4506,7 +4506,7 @@ GetAutoConvertedAttrIndexInPgDistPartition(TupleDesc tupleDesc)
 int
 GetNodesInvolvedAttrIndexInPgDistBackgroundTask(TupleDesc tupleDesc)
 {
-	return TupleDescSize(tupleDesc) == Natts_pg_dist_background_task
+	return tupleDesc->natts == Natts_pg_dist_background_task
 		   ? (Anum_pg_dist_background_task_nodes_involved - 1)
 		   : tupleDesc->natts - 1;
 }
@@ -115,6 +115,8 @@ static bool NodeIsLocal(WorkerNode *worker);
 static void SetLockTimeoutLocally(int32 lock_cooldown);
 static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort,
 							   bool localOnly);
+static int GetNodePrimaryNodeIdAttrIndexInPgDistNode(TupleDesc tupleDesc);
+static int GetNodeIsCloneAttrIndexInPgDistNode(TupleDesc tupleDesc);
 static bool UnsetMetadataSyncedForAllWorkers(void);
 static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode,
 													int columnIndex,

@@ -1196,6 +1198,11 @@ ActivateNodeList(MetadataSyncContext *context)
 void
 ActivateCloneNodeAsPrimary(WorkerNode *workerNode)
 {
+	Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
+	TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
+	TupleDesc copiedTupleDescriptor = CreateTupleDescCopy(tupleDescriptor);
+	table_close(pgDistNode, AccessShareLock);
+
 	/*
 	 * Set the node as primary and active.
 	 */

@@ -1203,9 +1210,13 @@ ActivateCloneNodeAsPrimary(WorkerNode *workerNode)
 							 ObjectIdGetDatum(PrimaryNodeRoleId()));
 	SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_isactive,
 							 BoolGetDatum(true));
-	SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_nodeisclone,
+	SetWorkerColumnLocalOnly(workerNode,
+							 GetNodeIsCloneAttrIndexInPgDistNode(copiedTupleDescriptor) + 1,
 							 BoolGetDatum(false));
-	SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_nodeprimarynodeid,
+	SetWorkerColumnLocalOnly(workerNode,
+							 GetNodePrimaryNodeIdAttrIndexInPgDistNode(copiedTupleDescriptor) + 1,
 							 Int32GetDatum(0));
 	SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_hasmetadata,
 							 BoolGetDatum(true));

@@ -1779,14 +1790,14 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool localOnly)
 {
 	const bool indexOK = true;

-	ScanKeyData scanKey[1];
-	Datum values[Natts_pg_dist_node];
-	bool isnull[Natts_pg_dist_node];
-	bool replace[Natts_pg_dist_node];
-
 	Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
 	TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);

+	ScanKeyData scanKey[1];
+	Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
+	bool *isnull = palloc0(tupleDescriptor->natts * sizeof(bool));
+	bool *replace = palloc0(tupleDescriptor->natts * sizeof(bool));
+
 	ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodeid,
 				BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(nodeId));

@@ -1801,8 +1812,6 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool localOnly)
 						newNodeName, newNodePort)));
 	}

-	memset(replace, 0, sizeof(replace));
-
 	values[Anum_pg_dist_node_nodeport - 1] = Int32GetDatum(newNodePort);
 	isnull[Anum_pg_dist_node_nodeport - 1] = false;
 	replace[Anum_pg_dist_node_nodeport - 1] = true;

@@ -1835,6 +1844,10 @@ UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort, bool localOnly)

 	systable_endscan(scanDescriptor);
 	table_close(pgDistNode, NoLock);
+
+	pfree(values);
+	pfree(isnull);
+	pfree(replace);
 }


@@ -2105,11 +2118,10 @@ citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS)
 	Relation pgDistNode = table_open(DistNodeRelationId(), AccessShareLock);
 	TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);

-	Datum values[Natts_pg_dist_node];
-	bool isnull[Natts_pg_dist_node];
-	bool replace[Natts_pg_dist_node];
+	Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
+	bool *isnull = palloc0(tupleDescriptor->natts * sizeof(bool));
+	bool *replace = palloc0(tupleDescriptor->natts * sizeof(bool));

-	memset(replace, 0, sizeof(replace));
 	values[Anum_pg_dist_node_metadatasynced - 1] = DatumGetBool(false);
 	isnull[Anum_pg_dist_node_metadatasynced - 1] = false;
 	replace[Anum_pg_dist_node_metadatasynced - 1] = true;

@@ -2123,6 +2135,10 @@ citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS)

 	table_close(pgDistNode, NoLock);

+	pfree(values);
+	pfree(isnull);
+	pfree(replace);
+
 	PG_RETURN_VOID();
 }

@@ -2831,9 +2847,9 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
 	TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
 	HeapTuple heapTuple = GetNodeTuple(workerNode->workerName, workerNode->workerPort);

-	Datum values[Natts_pg_dist_node];
-	bool isnull[Natts_pg_dist_node];
-	bool replace[Natts_pg_dist_node];
+	Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
+	bool *isnull = palloc0(tupleDescriptor->natts * sizeof(bool));
+	bool *replace = palloc0(tupleDescriptor->natts * sizeof(bool));

 	if (heapTuple == NULL)
 	{

@@ -2841,7 +2857,6 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)
 						workerNode->workerName, workerNode->workerPort)));
 	}

-	memset(replace, 0, sizeof(replace));
 	values[columnIndex - 1] = value;
 	isnull[columnIndex - 1] = false;
 	replace[columnIndex - 1] = true;

@@ -2857,6 +2872,10 @@ SetWorkerColumnLocalOnly(WorkerNode *workerNode, int columnIndex, Datum value)

 	table_close(pgDistNode, NoLock);

+	pfree(values);
+	pfree(isnull);
+	pfree(replace);
+
 	return newWorkerNode;
 }

@@ -3241,16 +3260,15 @@ InsertPlaceholderCoordinatorRecord(void)
 static void
 InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata)
 {
-	Datum values[Natts_pg_dist_node];
-	bool isNulls[Natts_pg_dist_node];
+	Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
+	TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
+
+	Datum *values = palloc0(tupleDescriptor->natts * sizeof(Datum));
+	bool *isNulls = palloc0(tupleDescriptor->natts * sizeof(bool));

 	Datum nodeClusterStringDatum = CStringGetDatum(nodeMetadata->nodeCluster);
 	Datum nodeClusterNameDatum = DirectFunctionCall1(namein, nodeClusterStringDatum);

-	/* form new shard tuple */
-	memset(values, 0, sizeof(values));
-	memset(isNulls, false, sizeof(isNulls));
-
 	values[Anum_pg_dist_node_nodeid - 1] = UInt32GetDatum(nodeid);
 	values[Anum_pg_dist_node_groupid - 1] = Int32GetDatum(nodeMetadata->groupId);
 	values[Anum_pg_dist_node_nodename - 1] = CStringGetTextDatum(nodeName);

@@ -3264,17 +3282,15 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata)
 	values[Anum_pg_dist_node_nodecluster - 1] = nodeClusterNameDatum;
 	values[Anum_pg_dist_node_shouldhaveshards - 1] = BoolGetDatum(
 		nodeMetadata->shouldHaveShards);
-	values[Anum_pg_dist_node_nodeisclone - 1] = BoolGetDatum(
-		nodeMetadata->nodeisclone);
-	values[Anum_pg_dist_node_nodeprimarynodeid - 1] = Int32GetDatum(
-		nodeMetadata->nodeprimarynodeid);
-
-	Relation pgDistNode = table_open(DistNodeRelationId(), RowExclusiveLock);
-
-	TupleDesc tupleDescriptor = RelationGetDescr(pgDistNode);
+	values[GetNodeIsCloneAttrIndexInPgDistNode(tupleDescriptor)] =
+		BoolGetDatum(nodeMetadata->nodeisclone);
+	values[GetNodePrimaryNodeIdAttrIndexInPgDistNode(tupleDescriptor)] =
+		Int32GetDatum(nodeMetadata->nodeprimarynodeid);
 	HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);

-	CATALOG_INSERT_WITH_SNAPSHOT(pgDistNode, heapTuple);
+	PushActiveSnapshot(GetTransactionSnapshot());
+	CatalogTupleInsert(pgDistNode, heapTuple);
+	PopActiveSnapshot();

 	CitusInvalidateRelcacheByRelid(DistNodeRelationId());

@@ -3283,6 +3299,9 @@ InsertNodeRow(int nodeid, char *nodeName, int32 nodePort, NodeMetadata *nodeMetadata)

 	/* close relation */
 	table_close(pgDistNode, NoLock);
+
+	pfree(values);
+	pfree(isNulls);
 }


@@ -3397,43 +3416,30 @@ TupleToWorkerNode(Relation pgDistNode, TupleDesc tupleDescriptor, HeapTuple heapTuple)
 																	  1]);

 	/*
-	 * Attributes above this line are guaranteed to be present at the
-	 * exact defined attribute number. At least till now. If you are dropping or
-	 * adding any of the above columns consider adjusting the code above.
+	 * nodecluster, nodeisclone and nodeprimarynodeid columns can be missing. In case
+	 * of extension creation/upgrade, master_initialize_node_metadata function is
+	 * called before the nodecluster column is added to pg_dist_node table.
 	 */
-	Oid pgDistNodeRelId = RelationGetRelid(pgDistNode);
-
-	AttrNumber nodeClusterAttno = get_attnum(pgDistNodeRelId, "nodecluster");
-
-	if (nodeClusterAttno > 0 &&
-		!TupleDescAttr(tupleDescriptor, nodeClusterAttno - 1)->attisdropped &&
-		!isNullArray[nodeClusterAttno - 1])
+	if (!isNullArray[Anum_pg_dist_node_nodecluster - 1])
 	{
 		Name nodeClusterName =
-			DatumGetName(datumArray[nodeClusterAttno - 1]);
+			DatumGetName(datumArray[Anum_pg_dist_node_nodecluster - 1]);
 		char *nodeClusterString = NameStr(*nodeClusterName);
 		strlcpy(workerNode->nodeCluster, nodeClusterString, NAMEDATALEN);
 	}

-	if (nAtts > Anum_pg_dist_node_nodeisclone)
+	int nodeIsCloneIdx = GetNodeIsCloneAttrIndexInPgDistNode(tupleDescriptor);
+	int nodePrimaryNodeIdIdx = GetNodePrimaryNodeIdAttrIndexInPgDistNode(tupleDescriptor);
+
+	if (!isNullArray[nodeIsCloneIdx])
 	{
-		AttrNumber nodeIsCloneAttno = get_attnum(pgDistNodeRelId, "nodeisclone");
-		if (nodeIsCloneAttno > 0 &&
-			!TupleDescAttr(tupleDescriptor, nodeIsCloneAttno - 1)->attisdropped &&
-			!isNullArray[nodeIsCloneAttno - 1])
-		{
-			workerNode->nodeisclone = DatumGetBool(datumArray[nodeIsCloneAttno - 1]);
-		}
-		AttrNumber nodePrimaryNodeIdAttno = get_attnum(pgDistNodeRelId,
-													   "nodeprimarynodeid");
-		if (nodePrimaryNodeIdAttno > 0 &&
-			!TupleDescAttr(tupleDescriptor, nodePrimaryNodeIdAttno - 1)->attisdropped &&
-			!isNullArray[nodePrimaryNodeIdAttno - 1])
-		{
-			workerNode->nodeprimarynodeid = DatumGetInt32(datumArray[
-															  nodePrimaryNodeIdAttno - 1]);
-		}
+		workerNode->nodeisclone = DatumGetBool(datumArray[nodeIsCloneIdx]);
 	}

+	if (!isNullArray[nodePrimaryNodeIdIdx])
+	{
+		workerNode->nodeprimarynodeid = DatumGetInt32(datumArray[nodePrimaryNodeIdIdx]);
+	}
+
 	pfree(datumArray);

@@ -3443,6 +3449,48 @@ TupleToWorkerNode(Relation pgDistNode, TupleDesc tupleDescriptor, HeapTuple heapTuple)
 }


+/*
+ * GetNodePrimaryNodeIdAttrIndexInPgDistNode returns attrnum for nodeprimarynodeid attr.
+ *
+ * nodeprimarynodeid attr was added to table pg_dist_node using alter operation
+ * after the version where Citus started supporting downgrades, and it's one of
+ * the two columns that we've introduced to pg_dist_node since then.
+ *
+ * And in case of a downgrade + upgrade, tupleDesc->natts becomes greater than
+ * Natts_pg_dist_node and when this happens, then we know that attrnum
+ * nodeprimarynodeid is not Anum_pg_dist_node_nodeprimarynodeid anymore but
+ * tupleDesc->natts - 1.
+ */
+static int
+GetNodePrimaryNodeIdAttrIndexInPgDistNode(TupleDesc tupleDesc)
+{
+	return tupleDesc->natts == Natts_pg_dist_node
+		   ? (Anum_pg_dist_node_nodeprimarynodeid - 1)
+		   : tupleDesc->natts - 1;
+}
+
+
+/*
+ * GetNodeIsCloneAttrIndexInPgDistNode returns attrnum for nodeisclone attr.
+ *
+ * Like GetNodePrimaryNodeIdAttrIndexInPgDistNode(), it performs a similar
+ * calculation for the nodeisclone attribute because this column was added to
+ * pg_dist_node after we started supporting downgrades.
+ *
+ * The only difference with the mentioned function is that we know the attrnum
+ * for nodeisclone is not Anum_pg_dist_node_nodeisclone anymore but
+ * tupleDesc->natts - 2, because we added these columns consecutively:
+ * first the nodeisclone attribute and then the nodeprimarynodeid attribute.
+ */
+static int
+GetNodeIsCloneAttrIndexInPgDistNode(TupleDesc tupleDesc)
+{
+	return tupleDesc->natts == Natts_pg_dist_node
+		   ? (Anum_pg_dist_node_nodeisclone - 1)
+		   : tupleDesc->natts - 2;
+}
+
+
 /*
  * StringToDatum transforms a string representation into a Datum.
  */

@@ -3519,15 +3567,15 @@ UnsetMetadataSyncedForAllWorkers(void)
 		updatedAtLeastOne = true;
 	}

+	Datum *values = palloc(tupleDescriptor->natts * sizeof(Datum));
+	bool *isnull = palloc(tupleDescriptor->natts * sizeof(bool));
+	bool *replace = palloc(tupleDescriptor->natts * sizeof(bool));
+
 	while (HeapTupleIsValid(heapTuple))
 	{
-		Datum values[Natts_pg_dist_node];
-		bool isnull[Natts_pg_dist_node];
-		bool replace[Natts_pg_dist_node];
-
-		memset(replace, false, sizeof(replace));
-		memset(isnull, false, sizeof(isnull));
-		memset(values, 0, sizeof(values));
+		memset(values, 0, tupleDescriptor->natts * sizeof(Datum));
+		memset(isnull, 0, tupleDescriptor->natts * sizeof(bool));
+		memset(replace, 0, tupleDescriptor->natts * sizeof(bool));

 		values[Anum_pg_dist_node_metadatasynced - 1] = BoolGetDatum(false);
 		replace[Anum_pg_dist_node_metadatasynced - 1] = true;

@@ -3550,6 +3598,10 @@ UnsetMetadataSyncedForAllWorkers(void)
 	CatalogCloseIndexes(indstate);
 	table_close(relation, NoLock);

+	pfree(values);
+	pfree(isnull);
+	pfree(replace);
+
 	return updatedAtLeastOne;
 }
@@ -141,10 +141,9 @@ static RouterPlanType GetRouterPlanType(Query *query,
 												bool hasUnresolvedParams);
 static void ConcatenateRTablesAndPerminfos(PlannedStmt *mainPlan,
 										   PlannedStmt *concatPlan);
-static bool CheckPostPlanDistribution(bool isDistributedQuery,
-									  Query *origQuery,
-									  List *rangeTableList,
-									  Query *plannedQuery);
+static bool CheckPostPlanDistribution(DistributedPlanningContext *planContext,
+									  bool isDistributedQuery,
+									  List *rangeTableList);

 /* Distributed planner hook */
 PlannedStmt *

@@ -265,10 +264,9 @@ distributed_planner(Query *parse,
 		planContext.plan = standard_planner(planContext.query, NULL,
 											planContext.cursorOptions,
 											planContext.boundParams);
-		needsDistributedPlanning = CheckPostPlanDistribution(needsDistributedPlanning,
-															 planContext.originalQuery,
-															 rangeTableList,
-															 planContext.query);
+		needsDistributedPlanning = CheckPostPlanDistribution(&planContext,
+															 needsDistributedPlanning,
+															 rangeTableList);

 		if (needsDistributedPlanning)
 		{

@@ -2740,12 +2738,13 @@ WarnIfListHasForeignDistributedTable(List *rangeTableList)


 static bool
-CheckPostPlanDistribution(bool isDistributedQuery,
-						  Query *origQuery, List *rangeTableList,
-						  Query *plannedQuery)
+CheckPostPlanDistribution(DistributedPlanningContext *planContext,
+						  bool isDistributedQuery, List *rangeTableList)
 {
 	if (isDistributedQuery)
 	{
+		Query *origQuery = planContext->originalQuery;
+		Query *plannedQuery = planContext->query;
 		Node *origQuals = origQuery->jointree->quals;
 		Node *plannedQuals = plannedQuery->jointree->quals;

@@ -2764,6 +2763,23 @@ CheckPostPlanDistribution(bool isDistributedQuery,
 		 */
 		if (origQuals != NULL && plannedQuals == NULL)
 		{
+			bool planHasDistTable = ListContainsDistributedTableRTE(
+				planContext->plan->rtable, NULL);
+
+			/*
+			 * If the Postgres plan has a distributed table, we know for sure that
+			 * the query requires distributed planning.
+			 */
+			if (planHasDistTable)
+			{
+				return true;
+			}
+
+			/*
+			 * Otherwise, if the query has less range table entries after Postgres
+			 * planning, we should re-evaluate the distribution of the query. Postgres
+			 * may have optimized away all citus tables, per issues 7782, 7783.
+			 */
 			List *rtesPostPlan = ExtractRangeTableEntryList(plannedQuery);
 			if (list_length(rtesPostPlan) < list_length(rangeTableList))
 			{
@@ -22,7 +22,7 @@ most_common_vals_json AS (

 table_reltuples_json AS (
     SELECT distinct(shardid),
-       (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
+       CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples,
        (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table
     FROM most_common_vals_json),

@@ -32,8 +32,8 @@ table_reltuples AS (

 null_frac_json AS (
     SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
-       (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
-       (json_array_elements(result::json)->>'null_frac')::float4 AS null_frac,
+       CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples,
+       CAST((json_array_elements(result::json)->>'null_frac') AS float4) AS null_frac,
        (json_array_elements(result::json)->>'attname')::text AS attname
     FROM most_common_vals_json
 ),

@@ -49,8 +49,8 @@ most_common_vals AS (
     SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
        (json_array_elements(result::json)->>'attname')::text AS attname,
        json_array_elements_text((json_array_elements(result::json)->>'most_common_vals')::json)::text AS common_val,
-       json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json)::float4 AS common_freq,
-       (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples
+       CAST(json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json) AS float4) AS common_freq,
+       CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples
     FROM most_common_vals_json),

 common_val_occurrence AS (

@@ -58,7 +58,7 @@ common_val_occurrence AS (
           sum(common_freq * shard_reltuples)::bigint AS occurrence
    FROM most_common_vals m
    GROUP BY citus_table, m.attname, common_val
-   ORDER BY 1, 2, occurrence DESC)
+   ORDER BY 1, 2, occurrence DESC, 3)

 SELECT nsp.nspname AS schemaname, p.relname AS tablename, c.attname,
@@ -22,7 +22,7 @@ most_common_vals_json AS (

 table_reltuples_json AS (
     SELECT distinct(shardid),
-       (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
+       CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples,
        (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table
     FROM most_common_vals_json),

@@ -32,8 +32,8 @@ table_reltuples AS (

 null_frac_json AS (
     SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
-       (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples,
-       (json_array_elements(result::json)->>'null_frac')::float4 AS null_frac,
+       CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples,
+       CAST((json_array_elements(result::json)->>'null_frac') AS float4) AS null_frac,
        (json_array_elements(result::json)->>'attname')::text AS attname
     FROM most_common_vals_json
 ),

@@ -49,8 +49,8 @@ most_common_vals AS (
     SELECT (json_array_elements(result::json)->>'citus_table')::regclass AS citus_table,
        (json_array_elements(result::json)->>'attname')::text AS attname,
        json_array_elements_text((json_array_elements(result::json)->>'most_common_vals')::json)::text AS common_val,
-       json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json)::float4 AS common_freq,
-       (json_array_elements(result::json)->>'reltuples')::bigint AS shard_reltuples
+       CAST(json_array_elements_text((json_array_elements(result::json)->>'most_common_freqs')::json) AS float4) AS common_freq,
+       CAST( CAST((json_array_elements(result::json)->>'reltuples') AS DOUBLE PRECISION) AS bigint) AS shard_reltuples
     FROM most_common_vals_json),

 common_val_occurrence AS (

@@ -58,7 +58,7 @@ common_val_occurrence AS (
           sum(common_freq * shard_reltuples)::bigint AS occurrence
    FROM most_common_vals m
    GROUP BY citus_table, m.attname, common_val
-   ORDER BY 1, 2, occurrence DESC)
+   ORDER BY 1, 2, occurrence DESC, 3)

 SELECT nsp.nspname AS schemaname, p.relname AS tablename, c.attname,
@@ -106,7 +106,9 @@ LogTransactionRecord(int32 groupId, char *transactionName, FullTransactionId out
 	HeapTuple heapTuple = heap_form_tuple(tupleDescriptor, values, isNulls);

 	/* insert new tuple */
-	CATALOG_INSERT_WITH_SNAPSHOT(pgDistTransaction, heapTuple);
+	PushActiveSnapshot(GetTransactionSnapshot());
+	CatalogTupleInsert(pgDistTransaction, heapTuple);
+	PopActiveSnapshot();

 	CommandCounterIncrement();

@@ -685,7 +687,7 @@ DeleteWorkerTransactions(WorkerNode *workerNode)
 int
 GetOuterXidAttrIndexInPgDistTransaction(TupleDesc tupleDesc)
 {
-	return TupleDescSize(tupleDesc) == Natts_pg_dist_transaction
+	return tupleDesc->natts == Natts_pg_dist_transaction
 		   ? (Anum_pg_dist_transaction_outerxid - 1)
 		   : tupleDesc->natts - 1;
 }
@@ -140,10 +140,10 @@ GetReplicationLag(WorkerNode *primaryWorkerNode, WorkerNode *replicaWorkerNode)
 	ForgetResults(replicaConnection);
 	CloseConnection(replicaConnection);

-	ereport(DEBUG1, (errmsg(
+	ereport(DEBUG2, (errmsg(
 				 "successfully measured replication lag: primary LSN %s, clone LSN %s",
 				 primary_lsn_str, replica_lsn_str)));
-	ereport(NOTICE, (errmsg("replication lag between %s:%d and %s:%d is %ld bytes",
+	ereport(DEBUG1, (errmsg("replication lag between %s:%d and %s:%d is %ld bytes",
 							primaryWorkerNode->workerName, primaryWorkerNode->workerPort,
 							replicaWorkerNode->workerName, replicaWorkerNode->workerPort,
 							lag_bytes)));

@@ -244,9 +244,9 @@ EnsureValidCloneMode(WorkerNode *primaryWorkerNode,
 		freeaddrinfo(result);
 	}

-	ereport(NOTICE, (errmsg("checking replication for node %s (resolved IP: %s)",
-							cloneHostname,
-							resolvedIP ? resolvedIP : "unresolved")));
+	ereport(NOTICE, (errmsg("checking replication status of clone node %s:%d",
+							cloneHostname,
+							clonePort)));

 	/* Build query to check if clone is connected and get its sync state */

@@ -278,6 +278,11 @@ EnsureValidCloneMode(WorkerNode *primaryWorkerNode,
 						 cloneHostname);
 	}

+	ereport(DEBUG2, (errmsg("sending replication status check query: %s to primary %s:%d",
+							replicationCheckQuery->data,
+							primaryWorkerNode->workerName,
+							primaryWorkerNode->workerPort)));
+
 	int replicationCheckResultCode = SendRemoteCommand(primaryConnection,
 													   replicationCheckQuery->data);
 	if (replicationCheckResultCode == 0)

@@ -305,8 +310,9 @@ EnsureValidCloneMode(WorkerNode *primaryWorkerNode,
 						primaryWorkerNode->workerName, primaryWorkerNode->workerPort),
 				 errdetail(
-					 "The clone must be actively replicating from the specified primary node. "
-					 "Check that the clone is running and properly configured for replication.")));
+					 "The clone must be actively replicating from the specified primary node"),
+				 errhint(
+					 "Verify the clone is running and properly configured for replication")));
 	}

 	/* Check if clone is synchronous */

@@ -322,9 +328,9 @@ EnsureValidCloneMode(WorkerNode *primaryWorkerNode,
 					 "cannot %s clone %s:%d as it is configured as a synchronous replica",
 					 operation, cloneHostname, clonePort),
 				 errdetail(
-					 "Promoting a synchronous clone can cause data consistency issues. "
-					 "Please configure it as an asynchronous replica first.")));
+					 "Promoting a synchronous clone can cause data consistency issues"),
+				 errhint(
+					 "Configure clone as an asynchronous replica")));
 	}
 }
@@ -13,10 +13,6 @@

 #include "pg_version_constants.h"

-/* we need these for PG-18's PushActiveSnapshot/PopActiveSnapshot APIs */
-#include "access/xact.h"
-#include "utils/snapmgr.h"
-
 #if PG_VERSION_NUM >= PG_VERSION_18
 #define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, k) \
 	create_foreignscan_path( \

@@ -40,14 +36,6 @@
 /* PG-18 unified row-compare operator codes under COMPARE_* */
 #define ROWCOMPARE_NE COMPARE_NE

-#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \
-	do { \
-		Snapshot __snap = GetTransactionSnapshot(); \
-		PushActiveSnapshot(__snap); \
-		CatalogTupleInsert((rel), (tup)); \
-		PopActiveSnapshot(); \
-	} while (0)
-
 #elif PG_VERSION_NUM >= PG_VERSION_17
 #define create_foreignscan_path_compat(a, b, c, d, e, f, g, h, i, j, k) \
 	create_foreignscan_path( \

@@ -56,9 +44,6 @@
 		(g), (h), (i), (j), (k) \
 	)

-/* no-op wrapper on older PGs */
-#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \
-	CatalogTupleInsert((rel), (tup))
 #endif

 #if PG_VERSION_NUM >= PG_VERSION_17

@@ -469,10 +454,6 @@ getStxstattarget_compat(HeapTuple tup)
 											   k) create_foreignscan_path(a, b, c, d, e, f, g, h, \
 																		  i, k)

-/* no-op wrapper on older PGs */
-#define CATALOG_INSERT_WITH_SNAPSHOT(rel, tup) \
-	CatalogTupleInsert((rel), (tup))
-
 #define getProcNo_compat(a) (a->pgprocno)
 #define getLxid_compat(a) (a->lxid)
@@ -262,6 +262,9 @@ DEPS = {
     "multi_subquery_in_where_reference_clause": TestDeps(
         "minimal_schedule", ["multi_behavioral_analytics_create_table"]
     ),
+    "subquery_in_where": TestDeps(
+        "minimal_schedule", ["multi_behavioral_analytics_create_table"]
+    ),
 }
@@ -110,13 +110,100 @@ SELECT * FROM citus_stats
  citus_aggregated_stats | citus_local_current_check | rlsuser | 0.142857 | {user1} | {0.714286}
 (9 rows)

+-- create a dist table with million rows to simulate 3.729223e+06 in reltuples
+-- this tests casting numbers like 3.729223e+06 to bigint
+CREATE TABLE organizations (
+    org_id bigint,
+    id int
+);
+SELECT create_distributed_table('organizations', 'org_id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+INSERT INTO organizations(org_id, id)
+  SELECT i, 1
+  FROM generate_series(1,2000000) i;
+ANALYZE organizations;
+SELECT attname, null_frac, most_common_vals, most_common_freqs FROM citus_stats
+  WHERE tablename IN ('organizations')
+  ORDER BY 1;
+ attname | null_frac | most_common_vals | most_common_freqs
+---------------------------------------------------------------------
+ id      |         0 | {1}              | {1}
+(1 row)
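An editorial aside on the cast being exercised here: PostgreSQL rejects a direct text-to-bigint cast of a float-formatted value such as `3.729223e+06`, which is why the `citus_stats` view above now casts through `double precision` first. A standalone illustration:

```sql
SELECT '3.729223e+06'::bigint;   -- ERROR: invalid input syntax for type bigint
SELECT CAST(CAST('3.729223e+06' AS double precision) AS bigint);  -- 3729223
```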
+
+-- more real-world scenario:
+-- outputs of pg_stats and citus_stats are NOT the same
+-- but citus_stats does a fair estimation job
+SELECT setseed(0.42);
+ setseed
+---------------------------------------------------------------------
+
+(1 row)
+
+CREATE TABLE orders (id bigint , custid int, product text, quantity int);
+INSERT INTO orders(id, custid, product, quantity)
+  SELECT i, (random() * 100)::int, 'product' || (random() * 10)::int, NULL
+  FROM generate_series(1,11) d(i);
+-- frequent customer
+INSERT INTO orders(id, custid, product, quantity)
+  SELECT 1200, 17, 'product' || (random() * 10)::int, NULL
+  FROM generate_series(1, 57) sk(i);
+-- popular product
+INSERT INTO orders(id, custid, product, quantity)
+  SELECT i+100 % 17, NULL, 'product3', (random() * 40)::int
+  FROM generate_series(1, 37) sk(i);
+-- frequent customer
+INSERT INTO orders(id, custid, product, quantity)
+  SELECT 1390, 76, 'product' || ((random() * 20)::int % 3), (random() * 30)::int
+  FROM generate_series(1, 33) sk(i);
+ANALYZE orders;
+-- pg_stats
+SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM pg_stats
+  WHERE tablename IN ('orders')
+  ORDER BY 3;
+ schemaname | tablename | attname | null_frac | most_common_vals | most_common_freqs
+---------------------------------------------------------------------
+ citus_aggregated_stats | orders | custid | 0.268116 | {17,76} | {0.413043,0.23913}
+ citus_aggregated_stats | orders | id | 0 | {1200,1390} | {0.413043,0.23913}
+ citus_aggregated_stats | orders | product | 0 | {product3,product2,product0,product1,product9,product4,product8,product5,product10,product6} | {0.347826,0.15942,0.115942,0.108696,0.0652174,0.057971,0.0507246,0.0362319,0.0289855,0.0289855}
+ citus_aggregated_stats | orders | quantity | 0.492754 | {26,23,6,8,11,12,13,17,20,25,30,4,14,15,16,19,24,27,35,36,38,40} | {0.0362319,0.0289855,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928}
+(4 rows)
+
+SELECT create_distributed_table('orders', 'id');
+NOTICE: Copying data from local table...
+NOTICE: copying the data has completed
+DETAIL: The local data in the table is no longer visible, but is still on disk.
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$citus_aggregated_stats.orders$$)
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+ANALYZE orders;
+-- citus_stats
+SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM citus_stats
+  WHERE tablename IN ('orders')
+  ORDER BY 3;
+ schemaname | tablename | attname | null_frac | most_common_vals | most_common_freqs
+---------------------------------------------------------------------
+ citus_aggregated_stats | orders | custid | 0.268116 | {17,76} | {0.413043,0.23913}
+ citus_aggregated_stats | orders | id | 0 | {1200,1390} | {0.413043,0.23913}
+ citus_aggregated_stats | orders | product | 0 | {product3,product2,product0,product1,product9,product4,product8,product5,product10,product6} | {0.347826,0.15942,0.115942,0.108696,0.0652174,0.057971,0.0507246,0.0362319,0.0289855,0.0289855}
+ citus_aggregated_stats | orders | quantity | 0.492754 | {26,13,17,20,23,8,11,12,14,16,19,24,25,27,30,35,38,40,6} | {0.0362319,0.0217391,0.0217391,0.0217391,0.0217391,0.0217391,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928,0.0144928}
+(4 rows)
+
 RESET SESSION AUTHORIZATION;
 DROP SCHEMA citus_aggregated_stats CASCADE;
-NOTICE: drop cascades to 6 other objects
+NOTICE: drop cascades to 8 other objects
 DETAIL: drop cascades to table current_check
 drop cascades to table dist_current_check
 drop cascades to table ref_current_check
 drop cascades to table citus_local_current_check_1870003
 drop cascades to table ref_current_check_1870002
 drop cascades to table citus_local_current_check
+drop cascades to table organizations
+drop cascades to table orders
 DROP USER user1;
@@ -167,7 +167,7 @@ SELECT pg_sleep(5);
 -- the function returns the new node id
 SELECT citus_add_clone_node('localhost', :follower_worker_1_port, 'localhost', :worker_1_port) AS clone_node_id \gset
 NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
-NOTICE: checking replication for node localhost (resolved IP: ::1)
+NOTICE: checking replication status of clone node localhost:xxxxx
 NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
 SELECT * from pg_dist_node ORDER by nodeid;
  nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards | nodeisclone | nodeprimarynodeid

@@ -234,12 +234,11 @@ SET client_min_messages to 'LOG';
 SELECT citus_promote_clone_and_rebalance(:clone_node_id);
 NOTICE: Starting promotion process for clone node localhost:xxxxx (ID 3), original primary localhost:xxxxx (ID 1)
 NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
-NOTICE: checking replication for node localhost (resolved IP: ::1)
+NOTICE: checking replication status of clone node localhost:xxxxx
 NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
 NOTICE: Blocking writes on shards of original primary node localhost:xxxxx (group 1)
 NOTICE: Blocking all writes to worker node localhost:xxxxx (ID 1)
 NOTICE: Waiting for clone localhost:xxxxx to catch up with primary localhost:xxxxx (timeout: 300 seconds)
-NOTICE: replication lag between localhost:xxxxx and localhost:xxxxx is 0 bytes
 NOTICE: Clone localhost:xxxxx is now caught up with primary localhost:xxxxx.
 NOTICE: Attempting to promote clone localhost:xxxxx via pg_promote().
 NOTICE: Clone node localhost:xxxxx (ID 3) has been successfully promoted.
@@ -17,9 +17,10 @@ SELECT * from pg_dist_node ORDER by nodeid;
 -- this should fail as it is not a valid replica of worker_1
 SELECT citus_add_clone_node('localhost', :follower_worker_2_port, 'localhost', :worker_1_port) AS clone_node_id \gset
 NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
-NOTICE: checking replication for node localhost (resolved IP: ::1)
+NOTICE: checking replication status of clone node localhost:xxxxx
 ERROR: clone localhost:xxxxx is not connected to primary localhost:xxxxx
-DETAIL: The clone must be actively replicating from the specified primary node. Check that the clone is running and properly configured for replication.
+DETAIL: The clone must be actively replicating from the specified primary node
+HINT: Verify the clone is running and properly configured for replication
 SELECT * from pg_dist_node ORDER by nodeid;
  nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards | nodeisclone | nodeprimarynodeid
 ---------------------------------------------------------------------

@@ -71,9 +72,10 @@ SELECT nodename, nodeport, count(shardid) FROM pg_dist_shard_placement GROUP BY
 -- Try to add replica of worker_node2 as a clone of worker_node1
 SELECT citus_add_clone_node('localhost', :follower_worker_2_port, 'localhost', :worker_1_port) AS clone_node_id \gset
 NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
-NOTICE: checking replication for node localhost (resolved IP: ::1)
+NOTICE: checking replication status of clone node localhost:xxxxx
 ERROR: clone localhost:xxxxx is not connected to primary localhost:xxxxx
-DETAIL: The clone must be actively replicating from the specified primary node. Check that the clone is running and properly configured for replication.
+DETAIL: The clone must be actively replicating from the specified primary node
+HINT: Verify the clone is running and properly configured for replication
 -- Test 1: Try to promote a non-existent clone node
 SELECT citus_promote_clone_and_rebalance(clone_nodeid =>99999);
 ERROR: Clone node with ID 99999 not found.

@@ -87,7 +89,7 @@ ERROR: Node localhost:xxxxx (ID 1) is not a valid clone or its primary node ID
 -- register the new node as a clone, This should pass
 SELECT citus_add_clone_node('localhost', :follower_worker_2_port, 'localhost', :worker_2_port) AS clone_node_id \gset
 NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
-NOTICE: checking replication for node localhost (resolved IP: ::1)
+NOTICE: checking replication status of clone node localhost:xxxxx
 NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
 SELECT * from pg_dist_node ORDER by nodeid;
  nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards | nodeisclone | nodeprimarynodeid

@@ -108,12 +110,11 @@ SELECT :clone_node_id;
 SELECT citus_promote_clone_and_rebalance(clone_nodeid => :clone_node_id, rebalance_strategy => 'invalid_strategy');
 NOTICE: Starting promotion process for clone node localhost:xxxxx (ID 4), original primary localhost:xxxxx (ID 2)
 NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
-NOTICE: checking replication for node localhost (resolved IP: ::1)
+NOTICE: checking replication status of clone node localhost:xxxxx
 NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
 NOTICE: Blocking writes on shards of original primary node localhost:xxxxx (group 2)
 NOTICE: Blocking all writes to worker node localhost:xxxxx (ID 2)
 NOTICE: Waiting for clone localhost:xxxxx to catch up with primary localhost:xxxxx (timeout: 300 seconds)
-NOTICE: replication lag between localhost:xxxxx and localhost:xxxxx is 0 bytes
 NOTICE: Clone localhost:xxxxx is now caught up with primary localhost:xxxxx.
 NOTICE: Attempting to promote clone localhost:xxxxx via pg_promote().
 NOTICE: Clone node localhost:xxxxx (ID 4) has been successfully promoted.

@@ -133,9 +134,10 @@ BEGIN;
 SELECT citus_promote_clone_and_rebalance(clone_nodeid => :clone_node_id);
 NOTICE: Starting promotion process for clone node localhost:xxxxx (ID 4), original primary localhost:xxxxx (ID 2)
 NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
-NOTICE: checking replication for node localhost (resolved IP: ::1)
+NOTICE: checking replication status of clone node localhost:xxxxx
 ERROR: clone localhost:xxxxx is not connected to primary localhost:xxxxx
-DETAIL: The clone must be actively replicating from the specified primary node. Check that the clone is running and properly configured for replication.
+DETAIL: The clone must be actively replicating from the specified primary node
+HINT: Verify the clone is running and properly configured for replication
 ROLLBACK;
 -- Verify no data is lost after rolling back the transaction
 SELECT COUNT(*) from backup_test;

@@ -163,7 +165,7 @@ SELECT * from pg_dist_node ORDER by nodeid;
 SELECT master_add_node('localhost', :worker_3_port) AS nodeid_3 \gset
 SELECT citus_add_clone_node('localhost', :follower_worker_3_port, 'localhost', :worker_3_port) AS clone_node_id_3 \gset
 NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
-NOTICE: checking replication for node localhost (resolved IP: ::1)
+NOTICE: checking replication status of clone node localhost:xxxxx
 NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
 set citus.shard_count = 100;
 CREATE TABLE backup_test2(id int, value text);

@@ -217,12 +219,11 @@ SET client_min_messages to 'LOG';
 SELECT citus_promote_clone_and_rebalance(clone_nodeid => :clone_node_id_3);
 NOTICE: Starting promotion process for clone node localhost:xxxxx (ID 6), original primary localhost:xxxxx (ID 5)
 NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
-NOTICE: checking replication for node localhost (resolved IP: ::1)
+NOTICE: checking replication status of clone node localhost:xxxxx
 NOTICE: clone localhost:xxxxx is properly connected to primary localhost:xxxxx and is not synchronous
 NOTICE: Blocking writes on shards of original primary node localhost:xxxxx (group 5)
 NOTICE: Blocking all writes to worker node localhost:xxxxx (ID 5)
 NOTICE: Waiting for clone localhost:xxxxx to catch up with primary localhost:xxxxx (timeout: 300 seconds)
-NOTICE: replication lag between localhost:xxxxx and localhost:xxxxx is 0 bytes
 NOTICE: Clone localhost:xxxxx is now caught up with primary localhost:xxxxx.
 NOTICE: Attempting to promote clone localhost:xxxxx via pg_promote().
 NOTICE: Clone node localhost:xxxxx (ID 6) has been successfully promoted.
@@ -27,15 +27,17 @@ SELECT master_remove_node('localhost', :follower_worker_2_port);
-- this should fail because the replica is configured as a synchronous replica, which is not allowed
SELECT citus_add_clone_node('localhost', :follower_worker_1_port, 'localhost', :worker_1_port) AS clone_node_id \gset
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
NOTICE: checking replication for node localhost (resolved IP: ::1)
NOTICE: checking replication status of clone node localhost:xxxxx
ERROR: cannot add clone localhost:xxxxx as it is configured as a synchronous replica
-DETAIL: Promoting a synchronous clone can cause data consistency issues. Please configure it as an asynchronous replica first.
+DETAIL: Promoting a synchronous clone can cause data consistency issues
+HINT: Configure clone as an asynchronous replica
-- this should fail because the replica is configured as a synchronous replica, which is not allowed
SELECT citus_add_clone_node('localhost', :follower_worker_2_port, 'localhost', :worker_2_port) AS clone_node_id \gset
NOTICE: checking replication relationship between primary localhost:xxxxx and clone localhost:xxxxx
NOTICE: checking replication for node localhost (resolved IP: ::1)
NOTICE: checking replication status of clone node localhost:xxxxx
ERROR: cannot add clone localhost:xxxxx as it is configured as a synchronous replica
-DETAIL: Promoting a synchronous clone can cause data consistency issues. Please configure it as an asynchronous replica first.
+DETAIL: Promoting a synchronous clone can cause data consistency issues
+HINT: Configure clone as an asynchronous replica
SELECT * from pg_dist_node ORDER by nodeid;
 nodeid | groupid | nodename | nodeport | noderack | hasmetadata | isactive | noderole | nodecluster | metadatasynced | shouldhaveshards | nodeisclone | nodeprimarynodeid
---------------------------------------------------------------------

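The fix for this ERROR lives on the primary rather than in Citus: the standby must be taken out of synchronous replication before it can be registered as a clone. A minimal sketch, assuming the standby is the only entry in synchronous_standby_names (both statements are standard PostgreSQL):

-- on the primary: demote all standbys to asynchronous, then reload the config
ALTER SYSTEM SET synchronous_standby_names = '';
SELECT pg_reload_conf();
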
@@ -1657,7 +1657,7 @@ DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
SHOW citus.version;
 citus.version
---------------------------------------------------------------------
- 13.2devel
+ 13.2.0
(1 row)

-- ensure no unexpected objects were created outside pg_catalog

@@ -1253,10 +1253,53 @@ SELECT vkey, pkey FROM t3;
---------------------------------------------------------------------
(0 rows)

-- Redundant WHERE clause with distributed partitioned table
CREATE TABLE a (a int);
INSERT INTO a VALUES (1);
-- populated distributed partitioned table
create table partitioned_table (a INT UNIQUE) PARTITION BY RANGE(a);
CREATE TABLE par_1 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (41);
CREATE TABLE par_2 PARTITION OF partitioned_table FOR VALUES FROM (41) TO (81);
CREATE TABLE par_3 PARTITION OF partitioned_table FOR VALUES FROM (81) TO (121);
CREATE TABLE par_4 PARTITION OF partitioned_table FOR VALUES FROM (121) TO (161);
SELECT create_distributed_table('partitioned_table', 'a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

insert into partitioned_table(a) select i from generate_series(1,160) i;
-- test citus table in init plan
-- with redundant WHERE clause
SELECT CASE WHEN EXISTS (
    SELECT * FROM partitioned_table
) THEN 1 ELSE 0 END AS table_non_empty
FROM a
WHERE true;
 table_non_empty
---------------------------------------------------------------------
               1
(1 row)

-- test citus table in init plan
-- with redundant WHERE clause involving
-- a citus table
SELECT CASE WHEN EXISTS (
    SELECT * FROM partitioned_table
) THEN 1 ELSE 0 END AS table_non_empty
FROM a
WHERE true OR NOT EXISTS (SELECT 1 FROM t1);
 table_non_empty
---------------------------------------------------------------------
               1
(1 row)

DROP TABLE local_table;
DROP TABLE t0;
DROP TABLE t1;
DROP TABLE t3;
DROP TABLE t7;
DROP TABLE a;
DROP TABLE partitioned_table CASCADE;
DROP SCHEMA subquery_in_where CASCADE;
SET search_path TO public;

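Both queries above rely on the planner folding the redundant WHERE clause to a constant, which leaves the EXISTS over the distributed partitioned table to run as an init plan; that is the code path this test exercises. To see the plan shape directly (illustrative; exact output varies by PostgreSQL and Citus version):

EXPLAIN (COSTS OFF)
SELECT CASE WHEN EXISTS (SELECT * FROM partitioned_table) THEN 1 ELSE 0 END
FROM a
WHERE true;
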
@@ -82,6 +82,68 @@ SELECT * FROM citus_stats
WHERE tablename IN ('current_check', 'dist_current_check', 'ref_current_check', 'citus_local_current_check')
ORDER BY 1;

-- create a dist table with millions of rows to simulate reltuples values like 3.729223e+06
-- this tests casting numbers like 3.729223e+06 to bigint

CREATE TABLE organizations (
    org_id bigint,
    id int
);

SELECT create_distributed_table('organizations', 'org_id');

INSERT INTO organizations(org_id, id)
SELECT i, 1
FROM generate_series(1,2000000) i;

ANALYZE organizations;

SELECT attname, null_frac, most_common_vals, most_common_freqs FROM citus_stats
WHERE tablename IN ('organizations')
ORDER BY 1;

-- more real-world scenario:
-- outputs of pg_stats and citus_stats are NOT the same,
-- but citus_stats does a fair estimation job

SELECT setseed(0.42);

CREATE TABLE orders (id bigint, custid int, product text, quantity int);

INSERT INTO orders(id, custid, product, quantity)
SELECT i, (random() * 100)::int, 'product' || (random() * 10)::int, NULL
FROM generate_series(1,11) d(i);

-- frequent customer
INSERT INTO orders(id, custid, product, quantity)
SELECT 1200, 17, 'product' || (random() * 10)::int, NULL
FROM generate_series(1, 57) sk(i);

-- popular product
INSERT INTO orders(id, custid, product, quantity)
SELECT i+100 % 17, NULL, 'product3', (random() * 40)::int
FROM generate_series(1, 37) sk(i);

-- frequent customer
INSERT INTO orders(id, custid, product, quantity)
SELECT 1390, 76, 'product' || ((random() * 20)::int % 3), (random() * 30)::int
FROM generate_series(1, 33) sk(i);

ANALYZE orders;

-- pg_stats
SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM pg_stats
WHERE tablename IN ('orders')
ORDER BY 3;

SELECT create_distributed_table('orders', 'id');
ANALYZE orders;

-- citus_stats
SELECT schemaname, tablename, attname, null_frac, most_common_vals, most_common_freqs FROM citus_stats
WHERE tablename IN ('orders')
ORDER BY 3;

RESET SESSION AUTHORIZATION;
DROP SCHEMA citus_aggregated_stats CASCADE;
DROP USER user1;

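The cast called out in the first comment is easy to sanity-check on its own: PostgreSQL parses 3.729223e+06 as a numeric literal, and casting numeric to bigint rounds to the nearest whole number (a one-liner, not part of the test):

SELECT 3.729223e+06::bigint AS reltuples_as_bigint;
-- returns 3729223
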
@@ -929,10 +929,42 @@ where TRUE or (((t3.vkey) >= (select
-- Distributed table t3 is now empty
SELECT vkey, pkey FROM t3;

-- Redundant WHERE clause with distributed partitioned table
CREATE TABLE a (a int);
INSERT INTO a VALUES (1);

-- populated distributed partitioned table
create table partitioned_table (a INT UNIQUE) PARTITION BY RANGE(a);
CREATE TABLE par_1 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (41);
CREATE TABLE par_2 PARTITION OF partitioned_table FOR VALUES FROM (41) TO (81);
CREATE TABLE par_3 PARTITION OF partitioned_table FOR VALUES FROM (81) TO (121);
CREATE TABLE par_4 PARTITION OF partitioned_table FOR VALUES FROM (121) TO (161);
SELECT create_distributed_table('partitioned_table', 'a');
insert into partitioned_table(a) select i from generate_series(1,160) i;

-- test citus table in init plan
-- with redundant WHERE clause
SELECT CASE WHEN EXISTS (
    SELECT * FROM partitioned_table
) THEN 1 ELSE 0 END AS table_non_empty
FROM a
WHERE true;

-- test citus table in init plan
-- with redundant WHERE clause involving
-- a citus table
SELECT CASE WHEN EXISTS (
    SELECT * FROM partitioned_table
) THEN 1 ELSE 0 END AS table_non_empty
FROM a
WHERE true OR NOT EXISTS (SELECT 1 FROM t1);

DROP TABLE local_table;
DROP TABLE t0;
DROP TABLE t1;
DROP TABLE t3;
DROP TABLE t7;
DROP TABLE a;
DROP TABLE partitioned_table CASCADE;
DROP SCHEMA subquery_in_where CASCADE;
SET search_path TO public;