add more tests, improve query stat counters

avoid incrementing the counters for plain explain
track stats for copy
pull/7917/head
Onur Tirtir 2025-04-24 11:05:11 +03:00
parent 3c6240639c
commit a3448f2fd7
9 changed files with 1224 additions and 150 deletions

View File

@ -2727,11 +2727,15 @@ CopyFromLocalTableIntoDistTable(Oid localTableId, Oid distributedTableId)
ExprContext *econtext = GetPerTupleExprContext(estate);
econtext->ecxt_scantuple = slot;
const bool nonPublishableData = false;
/* we don't track query counters when distributing a table */
const bool trackQueryCounters = false;
DestReceiver *copyDest =
(DestReceiver *) CreateCitusCopyDestReceiver(distributedTableId,
columnNameList,
partitionColumnIndex,
estate, NULL, nonPublishableData);
estate, NULL, nonPublishableData,
trackQueryCounters);
/* initialise state for writing to shards, we'll open connections on demand */
copyDest->rStartup(copyDest, 0, sourceTupleDescriptor);

View File

@ -106,6 +106,7 @@
#include "distributed/resource_lock.h"
#include "distributed/shard_pruning.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/stat_counters.h"
#include "distributed/transmit.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
@ -499,10 +500,14 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletion *completionTag)
/* set up the destination for the COPY */
const bool publishableData = true;
/* we want to track query counters for "COPY (to) distributed-table .." commands */
const bool trackQueryCounters = true;
CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList,
partitionColumnIndex,
executorState, NULL,
publishableData);
publishableData,
trackQueryCounters);
/* if the user specified an explicit append-to_shard option, write to it */
uint64 appendShardId = ProcessAppendToShardOption(tableId, copyStatement);
@ -1877,11 +1882,15 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer)
* of intermediate results that are co-located with the actual table.
* The names of the intermediate results with be of the form:
* intermediateResultIdPrefix_<shardid>
*
* If trackQueryCounters is true, the COPY will increment the query stat
* counters as needed at the end of the COPY.
*/
CitusCopyDestReceiver *
CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex,
EState *executorState,
char *intermediateResultIdPrefix, bool isPublishable)
char *intermediateResultIdPrefix, bool isPublishable,
bool trackQueryCounters)
{
CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0(
sizeof(CitusCopyDestReceiver));
@ -1901,6 +1910,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu
copyDest->colocatedIntermediateResultIdPrefix = intermediateResultIdPrefix;
copyDest->memoryContext = CurrentMemoryContext;
copyDest->isPublishable = isPublishable;
copyDest->trackQueryCounters = trackQueryCounters;
return copyDest;
}
@ -2587,8 +2597,9 @@ ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, bool *colu
/*
* CitusCopyDestReceiverShutdown implements the rShutdown interface of
* CitusCopyDestReceiver. It ends the COPY on all the open connections and closes
* the relation.
* CitusCopyDestReceiver. It ends the COPY on all the open connections, closes
* the relation and increments the query stat counters based on the shards
* copied into if requested.
*/
static void
CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
@ -2599,6 +2610,26 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
ListCell *connectionStateCell = NULL;
Relation distributedRelation = copyDest->distributedRelation;
/*
* Increment the query stat counters based on the shards copied into
* if requested.
*/
if (copyDest->trackQueryCounters)
{
int copiedShardCount =
copyDest->shardStateHash ?
hash_get_num_entries(copyDest->shardStateHash) :
0;
if (copiedShardCount <= 1)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
}
List *connectionStateList = ConnectionStateList(connectionStateHash);
FinishLocalColocatedIntermediateFiles(copyDest);
@ -3141,6 +3172,15 @@ CitusCopyTo(CopyStmt *copyStatement, QueryCompletion *completionTag)
SendCopyEnd(copyOutState);
if (list_length(shardIntervalList) <= 1)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
table_close(distributedRelation, AccessShareLock);
if (completionTag != NULL)

View File

@ -221,18 +221,6 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags)
CitusBeginModifyScan(node, estate, eflags);
}
/*
* For INSERT..SELECT / MERGE via coordinator or re-partitioning, we
* increment the stat counters in the respective ExecCustomScan functions.
*/
if (IsMultiTaskPlan(distributedPlan))
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
/*
* If there is force_delgation functions' distribution argument set,
@ -275,8 +263,19 @@ CitusExecScan(CustomScanState *node)
if (!scanState->finishedRemoteScan)
{
bool isMultiTaskPlan = IsMultiTaskPlan(scanState->distributedPlan);
AdaptiveExecutor(scanState);
if (isMultiTaskPlan)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
scanState->finishedRemoteScan = true;
}

View File

@ -116,23 +116,6 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
GetDistributedPlan((CustomScan *) selectPlan->planTree);
Job *distSelectJob = distSelectPlan->workerJob;
List *distSelectTaskList = distSelectJob->taskList;
if (list_length(distSelectTaskList) <= 1)
{
/*
* Probably we will never get here for a repartitioned
* INSERT..SELECT because when the source is a single shard
* table, we should most probably choose to use
* MODIFY_WITH_SELECT_VIA_COORDINATOR, but we still keep this
* here.
*/
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
bool randomAccess = true;
bool interTransactions = false;
bool binaryFormat =
@ -196,6 +179,22 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
targetRelation,
binaryFormat);
if (list_length(distSelectTaskList) <= 1)
{
/*
* Probably we will never get here for a repartitioned
* INSERT..SELECT because when the source is a single shard
* table, we should most probably choose to use
* MODIFY_WITH_SELECT_VIA_COORDINATOR, but we still keep this
* here.
*/
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
/*
* At this point select query has been executed on workers and results
* have been fetched in such a way that they are colocated with corresponding
@ -207,15 +206,6 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
redistributedResults,
binaryFormat);
if (list_length(taskList) <= 1)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState);
@ -225,6 +215,15 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
taskList, tupleDest,
hasReturning);
if (list_length(taskList) <= 1)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
executorState->es_processed = rowsInserted;
if (SortReturning && hasReturning)
@ -278,15 +277,6 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
}
}
if (list_length(prunedTaskList) <= 1)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
if (prunedTaskList != NIL)
{
bool randomAccess = true;
@ -308,6 +298,15 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
SortTupleStore(scanState);
}
}
if (list_length(prunedTaskList) <= 1)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
}
else
{
@ -316,29 +315,6 @@ NonPushableInsertSelectExecScan(CustomScanState *node)
ExecutePlanIntoRelation(targetRelationId, insertTargetList, selectPlan,
executorState);
/*
* At this point, we already incremented the query counters for the SELECT
* query indirectly via ExecutePlanIntoRelation() (if needed), so now we
* need to increment the counters for the INSERT query as well.
*/
if (IsCitusTable(targetRelationId))
{
if (HasDistributionKey(targetRelationId))
{
/*
* We assume it's a multi-shard insert if the table has a
* distribution column. Although this may not be true, e.g.,
* when all the data we read from source goes to the same
* shard of the target table, we cannot know that in advance.
*/
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
}
}
scanState->finishedRemoteScan = true;
@ -372,6 +348,12 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId,
columnNameList);
/*
* We don't track query counters for the COPY commands that are executed to
* prepare intermediate results.
*/
const bool trackQueryCounters = false;
/* set up a DestReceiver that copies into the intermediate table */
const bool publishableData = true;
CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,
@ -379,7 +361,8 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
partitionColumnIndex,
executorState,
intermediateResultIdPrefix,
publishableData);
publishableData,
trackQueryCounters);
ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);
@ -408,13 +391,20 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId,
columnNameList);
/*
* We want to track query counters for the COPY commands that are executed to
* perform the final INSERT for such INSERT..SELECT queries.
*/
const bool trackQueryCounters = true;
/* set up a DestReceiver that copies into the distributed table */
const bool publishableData = true;
CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,
columnNameList,
partitionColumnIndex,
executorState, NULL,
publishableData);
publishableData,
trackQueryCounters);
ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest);

View File

@ -126,16 +126,6 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
GetDistributedPlan((CustomScan *) sourcePlan->planTree);
Job *distSourceJob = distSourcePlan->workerJob;
List *distSourceTaskList = distSourceJob->taskList;
if (list_length(distSourceTaskList) <= 1)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
bool binaryFormat =
CanUseBinaryCopyFormatForTargetList(sourceQuery->targetList);
@ -177,6 +167,21 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
distSourceTaskList, partitionColumnIndex,
targetRelation, binaryFormat);
if (list_length(distSourceTaskList) <= 1)
{
/*
* Probably we will never get here for a repartitioned MERGE
* because when the source is a single shard table, we should
* most probably choose to use ExecuteSourceAtCoordAndRedistribution(),
* but we still keep this here.
*/
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
ereport(DEBUG1, (errmsg("Executing final MERGE on workers using "
"intermediate results")));
@ -192,15 +197,6 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
redistributedResults,
binaryFormat);
if (list_length(taskList) <= 1)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
scanState->tuplestorestate =
tuplestore_begin_heap(randomAccess, interTransactions, work_mem);
ParamListInfo paramListInfo = executorState->es_param_list_info;
@ -213,6 +209,16 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState)
tupleDest,
hasReturning,
paramListInfo);
if (list_length(taskList) <= 1)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
executorState->es_processed = rowsMerged;
}
@ -305,18 +311,13 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
prunedTaskList = list_concat(prunedTaskList, emptySourceTaskList);
}
if (list_length(prunedTaskList) <= 1)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
if (prunedTaskList == NIL)
{
/* No task to execute */
/*
* No task to execute, but we still increment STAT_QUERY_EXECUTION_SINGLE_SHARD
* as per our convention.
*/
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
return;
}
@ -336,6 +337,16 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState)
tupleDest,
hasReturning,
paramListInfo);
if (list_length(prunedTaskList) == 1)
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
executorState->es_processed = rowsMerged;
}
@ -361,6 +372,12 @@ ExecuteMergeSourcePlanIntoColocatedIntermediateResults(Oid targetRelationId,
List *columnNameList =
BuildColumnNameListFromTargetList(targetRelationId, sourceTargetList);
/*
* We don't track query counters for the COPY commands that are executed to
* prepare intermediate results.
*/
const bool trackQueryCounters = false;
/* set up a DestReceiver that copies into the intermediate file */
const bool publishableData = false;
CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId,
@ -368,36 +385,14 @@ ExecuteMergeSourcePlanIntoColocatedIntermediateResults(Oid targetRelationId,
partitionColumnIndex,
executorState,
intermediateResultIdPrefix,
publishableData);
publishableData,
trackQueryCounters);
/* We can skip when writing to intermediate files */
copyDest->skipCoercions = true;
ExecutePlanIntoDestReceiver(sourcePlan, paramListInfo, (DestReceiver *) copyDest);
/*
* At this point, we already incremented the query counters for the SELECT
* query indirectly via ExecutePlanIntoDestReceiver() (if needed), so now we
* need to increment the counters for the MERGE query as well.
*/
if (IsCitusTable(targetRelationId))
{
if (HasDistributionKey(targetRelationId))
{
/*
* We assume it's a multi-shard insert if the table has a
* distribution column. Although this may not be true, e.g.,
* when all the data we read from source goes to the same
* shard of the target table, we cannot know that in advance.
*/
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
}
else
{
IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
}
}
executorState->es_processed = copyDest->tuplesSent;
XactModificationLevel = XACT_MODIFICATION_DATA;

View File

@ -154,6 +154,11 @@ typedef struct CitusCopyDestReceiver
* when merging into the target tables.
*/
bool skipCoercions;
/*
* Determines whether the COPY command should track query stat counters.
*/
bool trackQueryCounters;
} CitusCopyDestReceiver;
@ -170,7 +175,8 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
int partitionColumnIndex,
EState *executorState,
char *intermediateResultPrefix,
bool isPublishable);
bool isPublishable,
bool trackQueryCounters);
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription);
extern bool CanUseBinaryCopyFormatForTargetList(List *targetEntryList);

View File

@ -26,10 +26,24 @@
*/
typedef enum
{
/*
* These are mainly tracked by connection_management.c and
* adaptive_executor.c.
*/
STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED,
STAT_CONNECTION_ESTABLISHMENT_FAILED,
STAT_CONNECTION_REUSED,
/*
* These are maintained by ExecCustomScan methods implemented
* for CustomScan nodes provided by Citus to account for actual
* execution of the queries and subplans. By maintaining these
* counters in ExecCustomScan callbacks, we ensure avoid
* incrementing them for plain EXPLAIN (i.e., without ANALYZE).
* queries. And, prefering the executor methods rather than the
* planner methods helps us capture the execution of prepared
* statements too.
*/
STAT_QUERY_EXECUTION_SINGLE_SHARD,
STAT_QUERY_EXECUTION_MULTI_SHARD,

View File

@ -59,9 +59,8 @@ SELECT citus_stat_counters_reset(null);
(1 row)
-- citus_stat_counters lists all the databases that currently exist,
-- so we should get 5 rows here.
SELECT COUNT(*) = 5 FROM citus_stat_counters;
-- citus_stat_counters lists all the databases that currently exist
SELECT (SELECT COUNT(*) FROM citus_stat_counters) = (SELECT COUNT(*) FROM pg_database);
?column?
---------------------------------------------------------------------
t
@ -476,6 +475,131 @@ SELECT citus_add_local_table_to_metadata('citus_local');
(1 row)
GRANT ALL ON ALL TABLES IN SCHEMA stat_counters TO stat_counters_test_user;
-- test copy while we're superuser
-- cannot call copy via exec_query_and_check_query_counters
SET citus.enable_stat_counters TO true;
SELECT query_execution_single_shard AS old_query_execution_single_shard,
query_execution_multi_shard AS old_query_execution_multi_shard
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q \gset
copy dist_table(a) from program 'seq 1'; -- single shard
SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff,
query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q;
query_execution_single_shard_diff | query_execution_multi_shard_diff
---------------------------------------------------------------------
1 | 0
(1 row)
SELECT query_execution_single_shard AS old_query_execution_single_shard,
query_execution_multi_shard AS old_query_execution_multi_shard
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q \gset
copy dist_table(a) from program 'seq 2'; -- multi-shard
SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff,
query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q;
query_execution_single_shard_diff | query_execution_multi_shard_diff
---------------------------------------------------------------------
0 | 1
(1 row)
-- load some data
insert into dist_table (a, b) select i, i from generate_series(1, 2) as i;
SELECT query_execution_single_shard AS old_query_execution_single_shard,
query_execution_multi_shard AS old_query_execution_multi_shard
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q \gset
copy dist_table to stdout;
1 \N
1 \N
1 1
2 \N
2 2
SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff,
query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q;
query_execution_single_shard_diff | query_execution_multi_shard_diff
---------------------------------------------------------------------
0 | 1
(1 row)
SELECT query_execution_single_shard AS old_query_execution_single_shard,
query_execution_multi_shard AS old_query_execution_multi_shard
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q \gset
copy (select * from dist_table join citus_local on dist_table.a = citus_local.a) to stdout;
1 \N 1 1
1 \N 1 1
1 1 1 1
2 \N 2 2
2 2 2 2
SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff,
query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q;
query_execution_single_shard_diff | query_execution_multi_shard_diff
---------------------------------------------------------------------
1 | 1
(1 row)
SELECT query_execution_single_shard AS old_query_execution_single_shard,
query_execution_multi_shard AS old_query_execution_multi_shard
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q \gset
copy dist_table to :'temp_dir''stat_counters_dist_table_dump';
SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff,
query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q;
query_execution_single_shard_diff | query_execution_multi_shard_diff
---------------------------------------------------------------------
0 | 1
(1 row)
SELECT query_execution_single_shard AS old_query_execution_single_shard,
query_execution_multi_shard AS old_query_execution_multi_shard
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q \gset
copy dist_table from :'temp_dir''stat_counters_dist_table_dump';
SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff,
query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q;
query_execution_single_shard_diff | query_execution_multi_shard_diff
---------------------------------------------------------------------
0 | 1
(1 row)
-- empty the table before rest of the tests
truncate dist_table;
\c stat_counters_test_db postgres - :master_port
-- reset from another database as superuser
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = 'regression';
@ -541,6 +665,23 @@ CALL exec_query_and_check_query_counters($$
$$,
0, 1
);
-- same with explain
--
-- Explain without analyze should never increment the counters.
-- This also applies to all such tests in this file.
CALL exec_query_and_check_query_counters($$
EXPLAIN
SELECT * FROM dist_table JOIN dist_table_1 ON dist_table.a = dist_table_1.a
$$,
0, 0
);
-- same with explain analyze
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
SELECT * FROM dist_table JOIN dist_table_1 ON dist_table.a = dist_table_1.a
$$,
0, 1
);
SET citus.enable_repartition_joins TO true;
-- A repartition join only increments query_execution_multi_shard once, although
-- this doesn't feel so much ideal.
@ -568,6 +709,22 @@ CALL exec_query_and_check_query_counters($$
$$,
1, 1
);
-- same with explain
CALL exec_query_and_check_query_counters($$
EXPLAIN
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
$$,
0, 0
);
-- same with explain analyze
--
-- this time, query_execution_multi_shard is incremented twice because of #4212
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
$$,
1, 2
);
CALL exec_query_and_check_query_counters($$
DELETE FROM dist_table WHERE a = 1
$$,
@ -595,12 +752,13 @@ CALL exec_query_and_check_query_counters($$
$$,
0, 1
);
-- single-shard inserts
-- multi-shard insert
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table (a, b) VALUES (-1, -1), (-2, -2), (-3, -3)
$$,
1, 0
0, 1
);
-- single-shard insert
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table (a, b) VALUES (-4, -4)
$$,
@ -645,7 +803,7 @@ CALL exec_query_and_check_query_counters($$
);
-- Select query is multi-shard and the same is also true for the final insert
-- but only if it doesn't prune to zero shards, which happens when the source
-- table is empty. So here, we both query_execution_multi_shard and
-- table is empty. So here, both query_execution_multi_shard and
-- query_execution_single_shard are incremented by 1.
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table SELECT * FROM uncolocated_dist_table
@ -665,6 +823,20 @@ CALL exec_query_and_check_query_counters($$
$$,
1, 0
);
-- same with explain
CALL exec_query_and_check_query_counters($$
EXPLAIN
INSERT INTO single_shard SELECT * FROM single_shard_1
$$,
0, 0
);
-- same with explain analyze
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
INSERT INTO single_shard SELECT * FROM single_shard_1
$$,
1, 0
);
CALL exec_query_and_check_query_counters($$
INSERT INTO single_shard SELECT * FROM uncolocated_single_shard
$$,
@ -724,6 +896,165 @@ CALL exec_query_and_check_query_counters($$
$$,
1, 1
);
-- same with explain
CALL exec_query_and_check_query_counters($$
EXPLAIN
INSERT INTO citus_local (a, b) SELECT * FROM dist_table
$$,
0, 0
);
-- same with explain analyze, not supported today
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
INSERT INTO citus_local (a, b) SELECT * FROM dist_table
$$,
1, 1
);
ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands via coordinator
CONTEXT: SQL statement "
EXPLAIN (ANALYZE)
INSERT INTO citus_local (a, b) SELECT * FROM dist_table
"
PL/pgSQL function exec_query_and_check_query_counters(text,bigint,bigint) line XX at EXECUTE
insert into dist_table_1 (a, b) values (1, 1), (2, 2), (3, 3);
-- First, we pull the select (multi-shard) query to the query node and create an
-- intermediate results for it because we cannot pushdown the whole INSERT query.
-- Then, the select query becomes of the form:
-- SELECT .. FROM (SELECT .. FROM read_intermediate_result(..)) intermediate_result
--
-- So, while repartitioning the select query, we perform a single-shard read
-- query because we read from an intermediate result and we then partition it
-- across the nodes. For the read part, we increment query_execution_single_shard
-- because we go through distributed planning if there are read_intermediate_result()
-- calls in a query, so it happens to be a distributed plan and goes through our
-- CustomScan callbacks. For the repartitioning of the intermediate result, just
-- as usual, we don't increment any counters.
--
-- Then, the final insert query happens between the distributed table and the
-- colocated intermediate result, so this increments query_execution_multi_shard
-- by 1.
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table SELECT * FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q RETURNING *
$$,
1, 2
);
-- Same query but without RETURNING - this goes through a different code path, but
-- the counters are still incremented the same way as above.
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table SELECT * FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q
$$,
1, 2
);
-- Same query but inserting a single row makes the final query single-shard too.
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table SELECT * FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 1) q
$$,
2, 1
);
-- A similar query but with a cte.
-- Subplan execution for the cte, additionally, first increments query_execution_multi_shard
-- for "SELECT * FROM dist_table" when creating the intermediate result for it and then
-- query_execution_single_shard for;
-- <intermediate-result>
-- EXCEPT
-- SELECT i as a, i as b FROM generate_series(10, 32) AS i
CALL exec_query_and_check_query_counters($$
WITH cte AS (
SELECT * FROM dist_table
EXCEPT
SELECT i as a, i as b FROM generate_series(10, 32) AS i
)
INSERT INTO dist_table
SELECT q.a, q.b
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q
JOIN cte ON q.a = cte.a
RETURNING *
$$,
2, 3
);
-- the same query but this time the cte is part of the select, not the insert
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table
WITH cte AS (
SELECT * FROM dist_table
EXCEPT
SELECT i as a, i as b FROM generate_series(10, 32) AS i
)
SELECT q.a, q.b
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q
JOIN cte ON q.a = cte.a
RETURNING *
$$,
2, 3
);
-- same with explain
CALL exec_query_and_check_query_counters($$
EXPLAIN
INSERT INTO dist_table
WITH cte AS (
SELECT * FROM dist_table
EXCEPT
SELECT i as a, i as b FROM generate_series(10, 32) AS i
)
SELECT q.a, q.b
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q
JOIN cte ON q.a = cte.a
RETURNING *
$$,
0, 0
);
-- same with explain analyze, not supported today
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
INSERT INTO dist_table
WITH cte AS (
SELECT * FROM dist_table
EXCEPT
SELECT i as a, i as b FROM generate_series(10, 32) AS i
)
SELECT q.a, q.b
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q
JOIN cte ON q.a = cte.a
RETURNING *
$$,
2, 3
);
ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands via coordinator
CONTEXT: SQL statement "
EXPLAIN (ANALYZE)
INSERT INTO dist_table
WITH cte AS (
SELECT * FROM dist_table
EXCEPT
SELECT i as a, i as b FROM generate_series(10, 32) AS i
)
SELECT q.a, q.b
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q
JOIN cte ON q.a = cte.a
RETURNING *
"
PL/pgSQL function exec_query_and_check_query_counters(text,bigint,bigint) line XX at EXECUTE
-- A similar one but without the insert, so we would normally expect 2 increments
-- for query_execution_single_shard and 2 for query_execution_multi_shard instead
-- of 3 since the insert is not there anymore.
--
-- But this time we observe more counter increments because we execute the subplans
-- twice because of #4212.
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
-- single-shard subplan (whole cte)
WITH cte AS (
-- multi-shard subplan (lhs of EXCEPT)
SELECT * FROM dist_table
EXCEPT
SELECT i as a, i as b FROM generate_series(10, 32) AS i
)
SELECT q.a, q.b
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q)
JOIN cte ON q.a = cte.a
$$,
3, 4
);
-- safe to push-down
CALL exec_query_and_check_query_counters($$
SELECT * FROM (SELECT * FROM dist_table UNION SELECT * FROM dist_table) as foo
@ -743,9 +1074,199 @@ CALL exec_query_and_check_query_counters($$
0, 1
);
RESET citus.local_table_join_policy;
-- citus_stat_counters lists all the databases that currently exist,
-- so we should get 5 rows here.
SELECT COUNT(*) = 5 FROM citus_stat_counters;
CALL exec_query_and_check_query_counters($$
MERGE INTO dist_table AS t
USING dist_table_1 AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 1
);
-- First, we pull the merge (multi-shard) query to the query node and create an
-- intermediate results for it because we cannot pushdown the whole INSERT query.
-- Then, the merge query becomes of the form:
-- SELECT .. FROM (SELECT .. FROM read_intermediate_result(..)) citus_insert_select_subquery
--
-- So, while repartitioning the source query, we perform a single-shard read
-- query because we read from an intermediate result and we then partition it
-- across the nodes. For the read part, we increment query_execution_single_shard
-- because we go through distributed planning if there are read_intermediate_result()
-- calls in a query, so it happens to be a distributed plan and goes through our
-- CustomScan callbacks. For the repartitioning of the intermediate result, just
-- as usual, we don't increment any counters.
--
-- Then, the final merge query happens between the distributed table and the
-- colocated intermediate result, so this increments query_execution_multi_shard
-- by 1.
CALL exec_query_and_check_query_counters($$
MERGE INTO dist_table AS t
USING (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
1, 2
);
truncate dist_table;
CALL exec_query_and_check_query_counters($$
insert into dist_table (a, b) select i, i from generate_series(1, 128) as i
$$,
0, 1
);
CALL exec_query_and_check_query_counters($$
MERGE INTO dist_table AS t
USING uncolocated_dist_table AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 2
);
-- same with explain
CALL exec_query_and_check_query_counters($$
EXPLAIN
MERGE INTO dist_table AS t
USING uncolocated_dist_table AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 0
);
-- same with explain analyze, not supported today
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
MERGE INTO dist_table AS t
USING uncolocated_dist_table AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 2
);
ERROR: EXPLAIN ANALYZE is currently not supported for MERGE INTO ... commands with repartitioning
CONTEXT: SQL statement "
EXPLAIN (ANALYZE)
MERGE INTO dist_table AS t
USING uncolocated_dist_table AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
"
PL/pgSQL function exec_query_and_check_query_counters(text,bigint,bigint) line XX at EXECUTE
truncate dist_table, ref_table, uncolocated_dist_table;
insert into dist_table (a, b) select i, i from generate_series(1, 128) as i;
insert into uncolocated_dist_table (a, b) select i, i from generate_series(1, 95) as i;
insert into ref_table (a, b) select i, i from generate_series(33, 128) as i;
CALL exec_query_and_check_query_counters($$
WITH cte AS (
SELECT uncolocated_dist_table.a, uncolocated_dist_table.b
FROM uncolocated_dist_table JOIN ref_table ON uncolocated_dist_table.a = ref_table.a
)
MERGE INTO dist_table AS t
USING cte AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 2
);
truncate dist_table, dist_table_1;
insert into dist_table (a, b) select i, i from generate_series(1, 128) as i;
insert into dist_table_1 (a, b) select i, i from generate_series(1, 95) as i;
-- Not ideal but since this contains both distributed and reference tables,
-- we directly decide partitioning for the source instead of pulling it to
-- the query node and repartitioning from there.
CALL exec_query_and_check_query_counters($$
WITH cte AS (
SELECT dist_table_1.a, dist_table_1.b
FROM dist_table_1 JOIN ref_table ON dist_table_1.a = ref_table.a
)
MERGE INTO dist_table AS t
USING cte AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 2
);
-- pushable
CALL exec_query_and_check_query_counters($$
WITH cte AS (
SELECT dist_table_1.a, dist_table_1.b * 2 as b FROM dist_table_1
)
MERGE INTO dist_table AS t
USING cte AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 1
);
-- same with explain
CALL exec_query_and_check_query_counters($$
EXPLAIN
WITH cte AS (
SELECT dist_table_1.a, dist_table_1.b * 2 as b FROM dist_table_1
)
MERGE INTO dist_table AS t
USING cte AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 0
);
-- same with explain analyze
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
WITH cte AS (
SELECT dist_table_1.a, dist_table_1.b * 2 as b FROM dist_table_1
)
MERGE INTO dist_table AS t
USING cte AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 1
);
-- pushable
CALL exec_query_and_check_query_counters($$
MERGE INTO dist_table AS t
USING (SELECT dist_table_1.a, dist_table_1.b * 2 as b FROM dist_table_1) AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 1
);
-- pushable
CALL exec_query_and_check_query_counters($$
MERGE INTO dist_table AS t
USING dist_table_1 AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 1
);
-- citus_stat_counters lists all the databases that currently exist
SELECT (SELECT COUNT(*) FROM citus_stat_counters) = (SELECT COUNT(*) FROM pg_database);
?column?
---------------------------------------------------------------------
t

View File

@ -38,9 +38,8 @@ SET citus.enable_stat_counters TO false;
SELECT citus_stat_counters(null);
SELECT citus_stat_counters_reset(null);
-- citus_stat_counters lists all the databases that currently exist,
-- so we should get 5 rows here.
SELECT COUNT(*) = 5 FROM citus_stat_counters;
-- citus_stat_counters lists all the databases that currently exist
SELECT (SELECT COUNT(*) FROM citus_stat_counters) = (SELECT COUNT(*) FROM pg_database);
-- Verify that providing an oid that doesn't correspond to any database
-- returns an empty set. We know that "SELECT MAX(oid)+1 FROM pg_database"
@ -245,6 +244,113 @@ SELECT citus_add_local_table_to_metadata('citus_local');
GRANT ALL ON ALL TABLES IN SCHEMA stat_counters TO stat_counters_test_user;
-- test copy while we're superuser
-- cannot call copy via exec_query_and_check_query_counters
SET citus.enable_stat_counters TO true;
SELECT query_execution_single_shard AS old_query_execution_single_shard,
query_execution_multi_shard AS old_query_execution_multi_shard
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q \gset
copy dist_table(a) from program 'seq 1'; -- single shard
SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff,
query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q;
SELECT query_execution_single_shard AS old_query_execution_single_shard,
query_execution_multi_shard AS old_query_execution_multi_shard
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q \gset
copy dist_table(a) from program 'seq 2'; -- multi-shard
SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff,
query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q;
-- load some data
insert into dist_table (a, b) select i, i from generate_series(1, 2) as i;
SELECT query_execution_single_shard AS old_query_execution_single_shard,
query_execution_multi_shard AS old_query_execution_multi_shard
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q \gset
copy dist_table to stdout;
SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff,
query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q;
SELECT query_execution_single_shard AS old_query_execution_single_shard,
query_execution_multi_shard AS old_query_execution_multi_shard
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q \gset
copy (select * from dist_table join citus_local on dist_table.a = citus_local.a) to stdout;
SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff,
query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q;
SELECT query_execution_single_shard AS old_query_execution_single_shard,
query_execution_multi_shard AS old_query_execution_multi_shard
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q \gset
copy dist_table to :'temp_dir''stat_counters_dist_table_dump';
SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff,
query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q;
SELECT query_execution_single_shard AS old_query_execution_single_shard,
query_execution_multi_shard AS old_query_execution_multi_shard
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q \gset
copy dist_table from :'temp_dir''stat_counters_dist_table_dump';
SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff,
query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff
FROM (
SELECT (citus_stat_counters(oid)).*
FROM pg_database WHERE datname = current_database()
) q;
-- empty the table before rest of the tests
truncate dist_table;
\c stat_counters_test_db postgres - :master_port
-- reset from another database as superuser
@ -308,6 +414,25 @@ CALL exec_query_and_check_query_counters($$
0, 1
);
-- same with explain
--
-- Explain without analyze should never increment the counters.
-- This also applies to all such tests in this file.
CALL exec_query_and_check_query_counters($$
EXPLAIN
SELECT * FROM dist_table JOIN dist_table_1 ON dist_table.a = dist_table_1.a
$$,
0, 0
);
-- same with explain analyze
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
SELECT * FROM dist_table JOIN dist_table_1 ON dist_table.a = dist_table_1.a
$$,
0, 1
);
SET citus.enable_repartition_joins TO true;
-- A repartition join only increments query_execution_multi_shard once, although
-- this doesn't feel so much ideal.
@ -338,6 +463,24 @@ CALL exec_query_and_check_query_counters($$
1, 1
);
-- same with explain
CALL exec_query_and_check_query_counters($$
EXPLAIN
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
$$,
0, 0
);
-- same with explain analyze
--
-- this time, query_execution_multi_shard is incremented twice because of #4212
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q
$$,
1, 2
);
CALL exec_query_and_check_query_counters($$
DELETE FROM dist_table WHERE a = 1
$$,
@ -369,14 +512,14 @@ CALL exec_query_and_check_query_counters($$
0, 1
);
-- single-shard inserts
-- multi-shard insert
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table (a, b) VALUES (-1, -1), (-2, -2), (-3, -3)
$$,
1, 0
0, 1
);
-- single-shard insert
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table (a, b) VALUES (-4, -4)
$$,
@ -425,7 +568,7 @@ CALL exec_query_and_check_query_counters($$
-- Select query is multi-shard and the same is also true for the final insert
-- but only if it doesn't prune to zero shards, which happens when the source
-- table is empty. So here, we both query_execution_multi_shard and
-- table is empty. So here, both query_execution_multi_shard and
-- query_execution_single_shard are incremented by 1.
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table SELECT * FROM uncolocated_dist_table
@ -449,6 +592,22 @@ CALL exec_query_and_check_query_counters($$
1, 0
);
-- same with explain
CALL exec_query_and_check_query_counters($$
EXPLAIN
INSERT INTO single_shard SELECT * FROM single_shard_1
$$,
0, 0
);
-- same with explain analyze
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
INSERT INTO single_shard SELECT * FROM single_shard_1
$$,
1, 0
);
CALL exec_query_and_check_query_counters($$
INSERT INTO single_shard SELECT * FROM uncolocated_single_shard
$$,
@ -514,6 +673,155 @@ CALL exec_query_and_check_query_counters($$
1, 1
);
-- same with explain
CALL exec_query_and_check_query_counters($$
EXPLAIN
INSERT INTO citus_local (a, b) SELECT * FROM dist_table
$$,
0, 0
);
-- same with explain analyze, not supported today
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
INSERT INTO citus_local (a, b) SELECT * FROM dist_table
$$,
1, 1
);
insert into dist_table_1 (a, b) values (1, 1), (2, 2), (3, 3);
-- First, we pull the select (multi-shard) query to the query node and create an
-- intermediate results for it because we cannot pushdown the whole INSERT query.
-- Then, the select query becomes of the form:
-- SELECT .. FROM (SELECT .. FROM read_intermediate_result(..)) intermediate_result
--
-- So, while repartitioning the select query, we perform a single-shard read
-- query because we read from an intermediate result and we then partition it
-- across the nodes. For the read part, we increment query_execution_single_shard
-- because we go through distributed planning if there are read_intermediate_result()
-- calls in a query, so it happens to be a distributed plan and goes through our
-- CustomScan callbacks. For the repartitioning of the intermediate result, just
-- as usual, we don't increment any counters.
--
-- Then, the final insert query happens between the distributed table and the
-- colocated intermediate result, so this increments query_execution_multi_shard
-- by 1.
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table SELECT * FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q RETURNING *
$$,
1, 2
);
-- Same query but without RETURNING - this goes through a different code path, but
-- the counters are still incremented the same way as above.
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table SELECT * FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q
$$,
1, 2
);
-- Same query but inserting a single row makes the final query single-shard too.
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table SELECT * FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 1) q
$$,
2, 1
);
-- A similar query but with a cte.
-- Subplan execution for the cte, additionally, first increments query_execution_multi_shard
-- for "SELECT * FROM dist_table" when creating the intermediate result for it and then
-- query_execution_single_shard for;
-- <intermediate-result>
-- EXCEPT
-- SELECT i as a, i as b FROM generate_series(10, 32) AS i
CALL exec_query_and_check_query_counters($$
WITH cte AS (
SELECT * FROM dist_table
EXCEPT
SELECT i as a, i as b FROM generate_series(10, 32) AS i
)
INSERT INTO dist_table
SELECT q.a, q.b
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q
JOIN cte ON q.a = cte.a
RETURNING *
$$,
2, 3
);
-- the same query but this time the cte is part of the select, not the insert
CALL exec_query_and_check_query_counters($$
INSERT INTO dist_table
WITH cte AS (
SELECT * FROM dist_table
EXCEPT
SELECT i as a, i as b FROM generate_series(10, 32) AS i
)
SELECT q.a, q.b
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q
JOIN cte ON q.a = cte.a
RETURNING *
$$,
2, 3
);
-- same with explain
CALL exec_query_and_check_query_counters($$
EXPLAIN
INSERT INTO dist_table
WITH cte AS (
SELECT * FROM dist_table
EXCEPT
SELECT i as a, i as b FROM generate_series(10, 32) AS i
)
SELECT q.a, q.b
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q
JOIN cte ON q.a = cte.a
RETURNING *
$$,
0, 0
);
-- same with explain analyze, not supported today
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
INSERT INTO dist_table
WITH cte AS (
SELECT * FROM dist_table
EXCEPT
SELECT i as a, i as b FROM generate_series(10, 32) AS i
)
SELECT q.a, q.b
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q
JOIN cte ON q.a = cte.a
RETURNING *
$$,
2, 3
);
-- A similar one but without the insert, so we would normally expect 2 increments
-- for query_execution_single_shard and 2 for query_execution_multi_shard instead
-- of 3 since the insert is not there anymore.
--
-- But this time we observe more counter increments because we execute the subplans
-- twice because of #4212.
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
-- single-shard subplan (whole cte)
WITH cte AS (
-- multi-shard subplan (lhs of EXCEPT)
SELECT * FROM dist_table
EXCEPT
SELECT i as a, i as b FROM generate_series(10, 32) AS i
)
SELECT q.a, q.b
FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q)
JOIN cte ON q.a = cte.a
$$,
3, 4
);
-- safe to push-down
CALL exec_query_and_check_query_counters($$
SELECT * FROM (SELECT * FROM dist_table UNION SELECT * FROM dist_table) as foo
@ -536,9 +844,206 @@ CALL exec_query_and_check_query_counters($$
);
RESET citus.local_table_join_policy;
-- citus_stat_counters lists all the databases that currently exist,
-- so we should get 5 rows here.
SELECT COUNT(*) = 5 FROM citus_stat_counters;
CALL exec_query_and_check_query_counters($$
MERGE INTO dist_table AS t
USING dist_table_1 AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 1
);
-- First, we pull the merge (multi-shard) query to the query node and create an
-- intermediate results for it because we cannot pushdown the whole INSERT query.
-- Then, the merge query becomes of the form:
-- SELECT .. FROM (SELECT .. FROM read_intermediate_result(..)) citus_insert_select_subquery
--
-- So, while repartitioning the source query, we perform a single-shard read
-- query because we read from an intermediate result and we then partition it
-- across the nodes. For the read part, we increment query_execution_single_shard
-- because we go through distributed planning if there are read_intermediate_result()
-- calls in a query, so it happens to be a distributed plan and goes through our
-- CustomScan callbacks. For the repartitioning of the intermediate result, just
-- as usual, we don't increment any counters.
--
-- Then, the final merge query happens between the distributed table and the
-- colocated intermediate result, so this increments query_execution_multi_shard
-- by 1.
CALL exec_query_and_check_query_counters($$
MERGE INTO dist_table AS t
USING (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
1, 2
);
truncate dist_table;
CALL exec_query_and_check_query_counters($$
insert into dist_table (a, b) select i, i from generate_series(1, 128) as i
$$,
0, 1
);
CALL exec_query_and_check_query_counters($$
MERGE INTO dist_table AS t
USING uncolocated_dist_table AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 2
);
-- same with explain
CALL exec_query_and_check_query_counters($$
EXPLAIN
MERGE INTO dist_table AS t
USING uncolocated_dist_table AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 0
);
-- same with explain analyze, not supported today
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
MERGE INTO dist_table AS t
USING uncolocated_dist_table AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 2
);
truncate dist_table, ref_table, uncolocated_dist_table;
insert into dist_table (a, b) select i, i from generate_series(1, 128) as i;
insert into uncolocated_dist_table (a, b) select i, i from generate_series(1, 95) as i;
insert into ref_table (a, b) select i, i from generate_series(33, 128) as i;
CALL exec_query_and_check_query_counters($$
WITH cte AS (
SELECT uncolocated_dist_table.a, uncolocated_dist_table.b
FROM uncolocated_dist_table JOIN ref_table ON uncolocated_dist_table.a = ref_table.a
)
MERGE INTO dist_table AS t
USING cte AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 2
);
truncate dist_table, dist_table_1;
insert into dist_table (a, b) select i, i from generate_series(1, 128) as i;
insert into dist_table_1 (a, b) select i, i from generate_series(1, 95) as i;
-- Not ideal but since this contains both distributed and reference tables,
-- we directly decide partitioning for the source instead of pulling it to
-- the query node and repartitioning from there.
CALL exec_query_and_check_query_counters($$
WITH cte AS (
SELECT dist_table_1.a, dist_table_1.b
FROM dist_table_1 JOIN ref_table ON dist_table_1.a = ref_table.a
)
MERGE INTO dist_table AS t
USING cte AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 2
);
-- pushable
CALL exec_query_and_check_query_counters($$
WITH cte AS (
SELECT dist_table_1.a, dist_table_1.b * 2 as b FROM dist_table_1
)
MERGE INTO dist_table AS t
USING cte AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 1
);
-- same with explain
CALL exec_query_and_check_query_counters($$
EXPLAIN
WITH cte AS (
SELECT dist_table_1.a, dist_table_1.b * 2 as b FROM dist_table_1
)
MERGE INTO dist_table AS t
USING cte AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 0
);
-- same with explain analyze
CALL exec_query_and_check_query_counters($$
EXPLAIN (ANALYZE)
WITH cte AS (
SELECT dist_table_1.a, dist_table_1.b * 2 as b FROM dist_table_1
)
MERGE INTO dist_table AS t
USING cte AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 1
);
-- pushable
CALL exec_query_and_check_query_counters($$
MERGE INTO dist_table AS t
USING (SELECT dist_table_1.a, dist_table_1.b * 2 as b FROM dist_table_1) AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 1
);
-- pushable
CALL exec_query_and_check_query_counters($$
MERGE INTO dist_table AS t
USING dist_table_1 AS s ON t.a = s.a
WHEN MATCHED THEN
UPDATE SET b = s.b
WHEN NOT MATCHED THEN
INSERT (a, b) VALUES (s.a, s.b)
$$,
0, 1
);
-- citus_stat_counters lists all the databases that currently exist
SELECT (SELECT COUNT(*) FROM citus_stat_counters) = (SELECT COUNT(*) FROM pg_database);
-- verify that we cannot execute citus_stat_counters_reset() from a non-superuser
SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();