diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 536f80291..38fadb0f3 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -2727,11 +2727,15 @@ CopyFromLocalTableIntoDistTable(Oid localTableId, Oid distributedTableId) ExprContext *econtext = GetPerTupleExprContext(estate); econtext->ecxt_scantuple = slot; const bool nonPublishableData = false; + + /* we don't track query counters when distributing a table */ + const bool trackQueryCounters = false; DestReceiver *copyDest = (DestReceiver *) CreateCitusCopyDestReceiver(distributedTableId, columnNameList, partitionColumnIndex, - estate, NULL, nonPublishableData); + estate, NULL, nonPublishableData, + trackQueryCounters); /* initialise state for writing to shards, we'll open connections on demand */ copyDest->rStartup(copyDest, 0, sourceTupleDescriptor); diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 758e8694f..d83e7c1fb 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -106,6 +106,7 @@ #include "distributed/resource_lock.h" #include "distributed/shard_pruning.h" #include "distributed/shared_connection_stats.h" +#include "distributed/stat_counters.h" #include "distributed/transmit.h" #include "distributed/version_compat.h" #include "distributed/worker_protocol.h" @@ -499,10 +500,14 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletion *completionTag) /* set up the destination for the COPY */ const bool publishableData = true; + + /* we want to track query counters for "COPY (to) distributed-table .." commands */ + const bool trackQueryCounters = true; CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, partitionColumnIndex, executorState, NULL, - publishableData); + publishableData, + trackQueryCounters); /* if the user specified an explicit append-to_shard option, write to it */ uint64 appendShardId = ProcessAppendToShardOption(tableId, copyStatement); @@ -1877,11 +1882,15 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer) * of intermediate results that are co-located with the actual table. * The names of the intermediate results with be of the form: * intermediateResultIdPrefix_ + * + * If trackQueryCounters is true, the COPY will increment the query stat + * counters as needed at the end of the COPY. */ CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColumnIndex, EState *executorState, - char *intermediateResultIdPrefix, bool isPublishable) + char *intermediateResultIdPrefix, bool isPublishable, + bool trackQueryCounters) { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) palloc0( sizeof(CitusCopyDestReceiver)); @@ -1901,6 +1910,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu copyDest->colocatedIntermediateResultIdPrefix = intermediateResultIdPrefix; copyDest->memoryContext = CurrentMemoryContext; copyDest->isPublishable = isPublishable; + copyDest->trackQueryCounters = trackQueryCounters; return copyDest; } @@ -2587,8 +2597,9 @@ ShardIdForTuple(CitusCopyDestReceiver *copyDest, Datum *columnValues, bool *colu /* * CitusCopyDestReceiverShutdown implements the rShutdown interface of - * CitusCopyDestReceiver. 
It ends the COPY on all the open connections and closes - * the relation. + * CitusCopyDestReceiver. It ends the COPY on all the open connections, closes + * the relation and increments the query stat counters based on the shards + * copied into if requested. */ static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) @@ -2599,6 +2610,26 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) ListCell *connectionStateCell = NULL; Relation distributedRelation = copyDest->distributedRelation; + /* + * Increment the query stat counters based on the shards copied into + * if requested. + */ + if (copyDest->trackQueryCounters) + { + int copiedShardCount = + copyDest->shardStateHash ? + hash_get_num_entries(copyDest->shardStateHash) : + 0; + if (copiedShardCount <= 1) + { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + } + else + { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + } + } + List *connectionStateList = ConnectionStateList(connectionStateHash); FinishLocalColocatedIntermediateFiles(copyDest); @@ -3141,6 +3172,15 @@ CitusCopyTo(CopyStmt *copyStatement, QueryCompletion *completionTag) SendCopyEnd(copyOutState); + if (list_length(shardIntervalList) <= 1) + { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + } + else + { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + } + table_close(distributedRelation, AccessShareLock); if (completionTag != NULL) diff --git a/src/backend/distributed/executor/citus_custom_scan.c b/src/backend/distributed/executor/citus_custom_scan.c index 16ac6a9cd..89503b0c9 100644 --- a/src/backend/distributed/executor/citus_custom_scan.c +++ b/src/backend/distributed/executor/citus_custom_scan.c @@ -221,18 +221,6 @@ CitusBeginScan(CustomScanState *node, EState *estate, int eflags) CitusBeginModifyScan(node, estate, eflags); } - /* - * For INSERT..SELECT / MERGE via coordinator or re-partitioning, we - * increment the stat counters in the respective ExecCustomScan functions. - */ - if (IsMultiTaskPlan(distributedPlan)) - { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); - } - else - { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); - } /* * If there is force_delgation functions' distribution argument set, @@ -275,8 +263,19 @@ CitusExecScan(CustomScanState *node) if (!scanState->finishedRemoteScan) { + bool isMultiTaskPlan = IsMultiTaskPlan(scanState->distributedPlan); + AdaptiveExecutor(scanState); + if (isMultiTaskPlan) + { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + } + else + { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + } + scanState->finishedRemoteScan = true; } diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index 032168bfb..d53f60594 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -116,23 +116,6 @@ NonPushableInsertSelectExecScan(CustomScanState *node) GetDistributedPlan((CustomScan *) selectPlan->planTree); Job *distSelectJob = distSelectPlan->workerJob; List *distSelectTaskList = distSelectJob->taskList; - - if (list_length(distSelectTaskList) <= 1) - { - /* - * Probably we will never get here for a repartitioned - * INSERT..SELECT because when the source is a single shard - * table, we should most probably choose to use - * MODIFY_WITH_SELECT_VIA_COORDINATOR, but we still keep this - * here. 
- */ - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); - } - else - { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); - } - bool randomAccess = true; bool interTransactions = false; bool binaryFormat = @@ -196,6 +179,22 @@ NonPushableInsertSelectExecScan(CustomScanState *node) targetRelation, binaryFormat); + if (list_length(distSelectTaskList) <= 1) + { + /* + * Probably we will never get here for a repartitioned + * INSERT..SELECT because when the source is a single shard + * table, we should most probably choose to use + * MODIFY_WITH_SELECT_VIA_COORDINATOR, but we still keep this + * here. + */ + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + } + else + { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + } + /* * At this point select query has been executed on workers and results * have been fetched in such a way that they are colocated with corresponding @@ -207,15 +206,6 @@ NonPushableInsertSelectExecScan(CustomScanState *node) redistributedResults, binaryFormat); - if (list_length(taskList) <= 1) - { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); - } - else - { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); - } - scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); TupleDesc tupleDescriptor = ScanStateGetTupleDescriptor(scanState); @@ -225,6 +215,15 @@ NonPushableInsertSelectExecScan(CustomScanState *node) taskList, tupleDest, hasReturning); + if (list_length(taskList) <= 1) + { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + } + else + { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + } + executorState->es_processed = rowsInserted; if (SortReturning && hasReturning) @@ -278,15 +277,6 @@ NonPushableInsertSelectExecScan(CustomScanState *node) } } - if (list_length(prunedTaskList) <= 1) - { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); - } - else - { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); - } - if (prunedTaskList != NIL) { bool randomAccess = true; @@ -308,6 +298,15 @@ NonPushableInsertSelectExecScan(CustomScanState *node) SortTupleStore(scanState); } } + + if (list_length(prunedTaskList) <= 1) + { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + } + else + { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + } } else { @@ -316,29 +315,6 @@ NonPushableInsertSelectExecScan(CustomScanState *node) ExecutePlanIntoRelation(targetRelationId, insertTargetList, selectPlan, executorState); - - /* - * At this point, we already incremented the query counters for the SELECT - * query indirectly via ExecutePlanIntoRelation() (if needed), so now we - * need to increment the counters for the INSERT query as well. - */ - if (IsCitusTable(targetRelationId)) - { - if (HasDistributionKey(targetRelationId)) - { - /* - * We assume it's a multi-shard insert if the table has a - * distribution column. Although this may not be true, e.g., - * when all the data we read from source goes to the same - * shard of the target table, we cannot know that in advance. 
- */ - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); - } - else - { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); - } - } } scanState->finishedRemoteScan = true; @@ -372,6 +348,12 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId, columnNameList); + /* + * We don't track query counters for the COPY commands that are executed to + * prepare intermediate results. + */ + const bool trackQueryCounters = false; + /* set up a DestReceiver that copies into the intermediate table */ const bool publishableData = true; CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId, @@ -379,7 +361,8 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, partitionColumnIndex, executorState, intermediateResultIdPrefix, - publishableData); + publishableData, + trackQueryCounters); ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); @@ -408,13 +391,20 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList, int partitionColumnIndex = PartitionColumnIndexFromColumnList(targetRelationId, columnNameList); + /* + * We want to track query counters for the COPY commands that are executed to + * perform the final INSERT for such INSERT..SELECT queries. + */ + const bool trackQueryCounters = true; + /* set up a DestReceiver that copies into the distributed table */ const bool publishableData = true; CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId, columnNameList, partitionColumnIndex, executorState, NULL, - publishableData); + publishableData, + trackQueryCounters); ExecutePlanIntoDestReceiver(selectPlan, paramListInfo, (DestReceiver *) copyDest); diff --git a/src/backend/distributed/executor/merge_executor.c b/src/backend/distributed/executor/merge_executor.c index cf62a063f..2b2f20451 100644 --- a/src/backend/distributed/executor/merge_executor.c +++ b/src/backend/distributed/executor/merge_executor.c @@ -126,16 +126,6 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState) GetDistributedPlan((CustomScan *) sourcePlan->planTree); Job *distSourceJob = distSourcePlan->workerJob; List *distSourceTaskList = distSourceJob->taskList; - - if (list_length(distSourceTaskList) <= 1) - { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); - } - else - { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); - } - bool binaryFormat = CanUseBinaryCopyFormatForTargetList(sourceQuery->targetList); @@ -177,6 +167,21 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState) distSourceTaskList, partitionColumnIndex, targetRelation, binaryFormat); + if (list_length(distSourceTaskList) <= 1) + { + /* + * Probably we will never get here for a repartitioned MERGE + * because when the source is a single shard table, we should + * most probably choose to use ExecuteSourceAtCoordAndRedistribution(), + * but we still keep this here. 
+ */ + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + } + else + { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + } + ereport(DEBUG1, (errmsg("Executing final MERGE on workers using " "intermediate results"))); @@ -192,15 +197,6 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState) redistributedResults, binaryFormat); - if (list_length(taskList) <= 1) - { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); - } - else - { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); - } - scanState->tuplestorestate = tuplestore_begin_heap(randomAccess, interTransactions, work_mem); ParamListInfo paramListInfo = executorState->es_param_list_info; @@ -213,6 +209,16 @@ ExecuteSourceAtWorkerAndRepartition(CitusScanState *scanState) tupleDest, hasReturning, paramListInfo); + + if (list_length(taskList) <= 1) + { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + } + else + { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + } + executorState->es_processed = rowsMerged; } @@ -305,18 +311,13 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState) prunedTaskList = list_concat(prunedTaskList, emptySourceTaskList); } - if (list_length(prunedTaskList) <= 1) - { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); - } - else - { - IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); - } - if (prunedTaskList == NIL) { - /* No task to execute */ + /* + * No task to execute, but we still increment STAT_QUERY_EXECUTION_SINGLE_SHARD + * as per our convention. + */ + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); return; } @@ -336,6 +337,16 @@ ExecuteSourceAtCoordAndRedistribution(CitusScanState *scanState) tupleDest, hasReturning, paramListInfo); + + if (list_length(prunedTaskList) == 1) + { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD); + } + else + { + IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD); + } + executorState->es_processed = rowsMerged; } @@ -361,6 +372,12 @@ ExecuteMergeSourcePlanIntoColocatedIntermediateResults(Oid targetRelationId, List *columnNameList = BuildColumnNameListFromTargetList(targetRelationId, sourceTargetList); + /* + * We don't track query counters for the COPY commands that are executed to + * prepare intermediate results. + */ + const bool trackQueryCounters = false; + /* set up a DestReceiver that copies into the intermediate file */ const bool publishableData = false; CitusCopyDestReceiver *copyDest = CreateCitusCopyDestReceiver(targetRelationId, @@ -368,36 +385,14 @@ ExecuteMergeSourcePlanIntoColocatedIntermediateResults(Oid targetRelationId, partitionColumnIndex, executorState, intermediateResultIdPrefix, - publishableData); + publishableData, + trackQueryCounters); /* We can skip when writing to intermediate files */ copyDest->skipCoercions = true; ExecutePlanIntoDestReceiver(sourcePlan, paramListInfo, (DestReceiver *) copyDest); - /* - * At this point, we already incremented the query counters for the SELECT - * query indirectly via ExecutePlanIntoDestReceiver() (if needed), so now we - * need to increment the counters for the MERGE query as well. - */ - if (IsCitusTable(targetRelationId)) - { - if (HasDistributionKey(targetRelationId)) - { - /* - * We assume it's a multi-shard insert if the table has a - * distribution column. Although this may not be true, e.g., - * when all the data we read from source goes to the same - * shard of the target table, we cannot know that in advance. 
-		 */
-		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_MULTI_SHARD);
-	}
-	else
-	{
-		IncrementStatCounterForMyDb(STAT_QUERY_EXECUTION_SINGLE_SHARD);
-	}
-	}
-
 	executorState->es_processed = copyDest->tuplesSent;
 
 	XactModificationLevel = XACT_MODIFICATION_DATA;
diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h
index 1fc42df60..594ee311c 100644
--- a/src/include/distributed/commands/multi_copy.h
+++ b/src/include/distributed/commands/multi_copy.h
@@ -154,6 +154,11 @@ typedef struct CitusCopyDestReceiver
 	 * when merging into the target tables.
 	 */
 	bool skipCoercions;
+
+	/*
+	 * Determines whether the COPY command should track query stat counters.
+	 */
+	bool trackQueryCounters;
 } CitusCopyDestReceiver;
 
 
@@ -170,7 +175,8 @@ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
 													 int partitionColumnIndex,
 													 EState *executorState,
 													 char *intermediateResultPrefix,
-													 bool isPublishable);
+													 bool isPublishable,
+													 bool trackQueryCounters);
 extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
 extern bool CanUseBinaryCopyFormat(TupleDesc tupleDescription);
 extern bool CanUseBinaryCopyFormatForTargetList(List *targetEntryList);
diff --git a/src/include/distributed/stat_counters.h b/src/include/distributed/stat_counters.h
index f25588519..c673c062c 100644
--- a/src/include/distributed/stat_counters.h
+++ b/src/include/distributed/stat_counters.h
@@ -26,10 +26,24 @@
  */
 typedef enum
 {
+	/*
+	 * These are mainly tracked by connection_management.c and
+	 * adaptive_executor.c.
+	 */
 	STAT_CONNECTION_ESTABLISHMENT_SUCCEEDED,
 	STAT_CONNECTION_ESTABLISHMENT_FAILED,
 	STAT_CONNECTION_REUSED,
 
+	/*
+	 * These are maintained by the ExecCustomScan methods implemented
+	 * for the CustomScan nodes provided by Citus, to account for the
+	 * actual execution of queries and subplans. Maintaining these
+	 * counters in the ExecCustomScan callbacks ensures that we avoid
+	 * incrementing them for plain EXPLAIN (i.e., without ANALYZE)
+	 * queries. And preferring the executor methods over the planner
+	 * methods helps us capture the execution of prepared statements
+	 * too.
+	 */
 	STAT_QUERY_EXECUTION_SINGLE_SHARD,
 	STAT_QUERY_EXECUTION_MULTI_SHARD,
 
diff --git a/src/test/regress/expected/stat_counters.out b/src/test/regress/expected/stat_counters.out
index d9be266c9..0558a3fad 100644
--- a/src/test/regress/expected/stat_counters.out
+++ b/src/test/regress/expected/stat_counters.out
@@ -59,9 +59,8 @@ SELECT citus_stat_counters_reset(null);
 
 (1 row)
 
--- citus_stat_counters lists all the databases that currently exist,
--- so we should get 5 rows here.
-SELECT COUNT(*) = 5 FROM citus_stat_counters;
+-- citus_stat_counters lists all the databases that currently exist
+SELECT (SELECT COUNT(*) FROM citus_stat_counters) = (SELECT COUNT(*) FROM pg_database);
 ?column?
--------------------------------------------------------------------- t @@ -476,6 +475,131 @@ SELECT citus_add_local_table_to_metadata('citus_local'); (1 row) GRANT ALL ON ALL TABLES IN SCHEMA stat_counters TO stat_counters_test_user; +-- test copy while we're superuser +-- cannot call copy via exec_query_and_check_query_counters +SET citus.enable_stat_counters TO true; +SELECT query_execution_single_shard AS old_query_execution_single_shard, + query_execution_multi_shard AS old_query_execution_multi_shard +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q \gset +copy dist_table(a) from program 'seq 1'; -- single shard +SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff, + query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q; + query_execution_single_shard_diff | query_execution_multi_shard_diff +--------------------------------------------------------------------- + 1 | 0 +(1 row) + +SELECT query_execution_single_shard AS old_query_execution_single_shard, + query_execution_multi_shard AS old_query_execution_multi_shard +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q \gset +copy dist_table(a) from program 'seq 2'; -- multi-shard +SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff, + query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q; + query_execution_single_shard_diff | query_execution_multi_shard_diff +--------------------------------------------------------------------- + 0 | 1 +(1 row) + +-- load some data +insert into dist_table (a, b) select i, i from generate_series(1, 2) as i; +SELECT query_execution_single_shard AS old_query_execution_single_shard, + query_execution_multi_shard AS old_query_execution_multi_shard +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q \gset +copy dist_table to stdout; +1 \N +1 \N +1 1 +2 \N +2 2 +SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff, + query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q; + query_execution_single_shard_diff | query_execution_multi_shard_diff +--------------------------------------------------------------------- + 0 | 1 +(1 row) + +SELECT query_execution_single_shard AS old_query_execution_single_shard, + query_execution_multi_shard AS old_query_execution_multi_shard +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q \gset +copy (select * from dist_table join citus_local on dist_table.a = citus_local.a) to stdout; +1 \N 1 1 +1 \N 1 1 +1 1 1 1 +2 \N 2 2 +2 2 2 2 +SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff, + query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q; + 
query_execution_single_shard_diff | query_execution_multi_shard_diff +--------------------------------------------------------------------- + 1 | 1 +(1 row) + +SELECT query_execution_single_shard AS old_query_execution_single_shard, + query_execution_multi_shard AS old_query_execution_multi_shard +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q \gset +copy dist_table to :'temp_dir''stat_counters_dist_table_dump'; +SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff, + query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q; + query_execution_single_shard_diff | query_execution_multi_shard_diff +--------------------------------------------------------------------- + 0 | 1 +(1 row) + +SELECT query_execution_single_shard AS old_query_execution_single_shard, + query_execution_multi_shard AS old_query_execution_multi_shard +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q \gset +copy dist_table from :'temp_dir''stat_counters_dist_table_dump'; +SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff, + query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q; + query_execution_single_shard_diff | query_execution_multi_shard_diff +--------------------------------------------------------------------- + 0 | 1 +(1 row) + +-- empty the table before rest of the tests +truncate dist_table; \c stat_counters_test_db postgres - :master_port -- reset from another database as superuser SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = 'regression'; @@ -541,6 +665,23 @@ CALL exec_query_and_check_query_counters($$ $$, 0, 1 ); +-- same with explain +-- +-- Explain without analyze should never increment the counters. +-- This also applies to all such tests in this file. +CALL exec_query_and_check_query_counters($$ + EXPLAIN + SELECT * FROM dist_table JOIN dist_table_1 ON dist_table.a = dist_table_1.a + $$, + 0, 0 +); +-- same with explain analyze +CALL exec_query_and_check_query_counters($$ + EXPLAIN (ANALYZE) + SELECT * FROM dist_table JOIN dist_table_1 ON dist_table.a = dist_table_1.a + $$, + 0, 1 +); SET citus.enable_repartition_joins TO true; -- A repartition join only increments query_execution_multi_shard once, although -- this doesn't feel so much ideal. 
@@ -568,6 +709,22 @@ CALL exec_query_and_check_query_counters($$ $$, 1, 1 ); +-- same with explain +CALL exec_query_and_check_query_counters($$ + EXPLAIN + SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q + $$, + 0, 0 +); +-- same with explain analyze +-- +-- this time, query_execution_multi_shard is incremented twice because of #4212 +CALL exec_query_and_check_query_counters($$ + EXPLAIN (ANALYZE) + SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q + $$, + 1, 2 +); CALL exec_query_and_check_query_counters($$ DELETE FROM dist_table WHERE a = 1 $$, @@ -595,12 +752,13 @@ CALL exec_query_and_check_query_counters($$ $$, 0, 1 ); --- single-shard inserts +-- multi-shard insert CALL exec_query_and_check_query_counters($$ INSERT INTO dist_table (a, b) VALUES (-1, -1), (-2, -2), (-3, -3) $$, - 1, 0 + 0, 1 ); +-- single-shard insert CALL exec_query_and_check_query_counters($$ INSERT INTO dist_table (a, b) VALUES (-4, -4) $$, @@ -645,7 +803,7 @@ CALL exec_query_and_check_query_counters($$ ); -- Select query is multi-shard and the same is also true for the final insert -- but only if it doesn't prune to zero shards, which happens when the source --- table is empty. So here, we both query_execution_multi_shard and +-- table is empty. So here, both query_execution_multi_shard and -- query_execution_single_shard are incremented by 1. CALL exec_query_and_check_query_counters($$ INSERT INTO dist_table SELECT * FROM uncolocated_dist_table @@ -665,6 +823,20 @@ CALL exec_query_and_check_query_counters($$ $$, 1, 0 ); +-- same with explain +CALL exec_query_and_check_query_counters($$ + EXPLAIN + INSERT INTO single_shard SELECT * FROM single_shard_1 + $$, + 0, 0 +); +-- same with explain analyze +CALL exec_query_and_check_query_counters($$ + EXPLAIN (ANALYZE) + INSERT INTO single_shard SELECT * FROM single_shard_1 + $$, + 1, 0 +); CALL exec_query_and_check_query_counters($$ INSERT INTO single_shard SELECT * FROM uncolocated_single_shard $$, @@ -724,6 +896,165 @@ CALL exec_query_and_check_query_counters($$ $$, 1, 1 ); +-- same with explain +CALL exec_query_and_check_query_counters($$ + EXPLAIN + INSERT INTO citus_local (a, b) SELECT * FROM dist_table + $$, + 0, 0 +); +-- same with explain analyze, not supported today +CALL exec_query_and_check_query_counters($$ + EXPLAIN (ANALYZE) + INSERT INTO citus_local (a, b) SELECT * FROM dist_table + $$, + 1, 1 +); +ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands via coordinator +CONTEXT: SQL statement " + EXPLAIN (ANALYZE) + INSERT INTO citus_local (a, b) SELECT * FROM dist_table + " +PL/pgSQL function exec_query_and_check_query_counters(text,bigint,bigint) line XX at EXECUTE +insert into dist_table_1 (a, b) values (1, 1), (2, 2), (3, 3); +-- First, we pull the select (multi-shard) query to the query node and create an +-- intermediate results for it because we cannot pushdown the whole INSERT query. +-- Then, the select query becomes of the form: +-- SELECT .. FROM (SELECT .. FROM read_intermediate_result(..)) intermediate_result +-- +-- So, while repartitioning the select query, we perform a single-shard read +-- query because we read from an intermediate result and we then partition it +-- across the nodes. For the read part, we increment query_execution_single_shard +-- because we go through distributed planning if there are read_intermediate_result() +-- calls in a query, so it happens to be a distributed plan and goes through our +-- CustomScan callbacks. 
For the repartitioning of the intermediate result, just +-- as usual, we don't increment any counters. +-- +-- Then, the final insert query happens between the distributed table and the +-- colocated intermediate result, so this increments query_execution_multi_shard +-- by 1. +CALL exec_query_and_check_query_counters($$ + INSERT INTO dist_table SELECT * FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q RETURNING * + $$, + 1, 2 +); +-- Same query but without RETURNING - this goes through a different code path, but +-- the counters are still incremented the same way as above. +CALL exec_query_and_check_query_counters($$ + INSERT INTO dist_table SELECT * FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q + $$, + 1, 2 +); +-- Same query but inserting a single row makes the final query single-shard too. +CALL exec_query_and_check_query_counters($$ + INSERT INTO dist_table SELECT * FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 1) q + $$, + 2, 1 +); +-- A similar query but with a cte. +-- Subplan execution for the cte, additionally, first increments query_execution_multi_shard +-- for "SELECT * FROM dist_table" when creating the intermediate result for it and then +-- query_execution_single_shard for; +-- +-- EXCEPT +-- SELECT i as a, i as b FROM generate_series(10, 32) AS i +CALL exec_query_and_check_query_counters($$ + WITH cte AS ( + SELECT * FROM dist_table + EXCEPT + SELECT i as a, i as b FROM generate_series(10, 32) AS i + ) + INSERT INTO dist_table + SELECT q.a, q.b + FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q + JOIN cte ON q.a = cte.a + RETURNING * + $$, + 2, 3 +); +-- the same query but this time the cte is part of the select, not the insert +CALL exec_query_and_check_query_counters($$ + INSERT INTO dist_table + WITH cte AS ( + SELECT * FROM dist_table + EXCEPT + SELECT i as a, i as b FROM generate_series(10, 32) AS i + ) + SELECT q.a, q.b + FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q + JOIN cte ON q.a = cte.a + RETURNING * + $$, + 2, 3 +); +-- same with explain +CALL exec_query_and_check_query_counters($$ + EXPLAIN + INSERT INTO dist_table + WITH cte AS ( + SELECT * FROM dist_table + EXCEPT + SELECT i as a, i as b FROM generate_series(10, 32) AS i + ) + SELECT q.a, q.b + FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q + JOIN cte ON q.a = cte.a + RETURNING * + $$, + 0, 0 +); +-- same with explain analyze, not supported today +CALL exec_query_and_check_query_counters($$ + EXPLAIN (ANALYZE) + INSERT INTO dist_table + WITH cte AS ( + SELECT * FROM dist_table + EXCEPT + SELECT i as a, i as b FROM generate_series(10, 32) AS i + ) + SELECT q.a, q.b + FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q + JOIN cte ON q.a = cte.a + RETURNING * + $$, + 2, 3 +); +ERROR: EXPLAIN ANALYZE is currently not supported for INSERT ... SELECT commands via coordinator +CONTEXT: SQL statement " + EXPLAIN (ANALYZE) + INSERT INTO dist_table + WITH cte AS ( + SELECT * FROM dist_table + EXCEPT + SELECT i as a, i as b FROM generate_series(10, 32) AS i + ) + SELECT q.a, q.b + FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q + JOIN cte ON q.a = cte.a + RETURNING * + " +PL/pgSQL function exec_query_and_check_query_counters(text,bigint,bigint) line XX at EXECUTE +-- A similar one but without the insert, so we would normally expect 2 increments +-- for query_execution_single_shard and 2 for query_execution_multi_shard instead +-- of 3 since the insert is not there anymore. 
+-- +-- But this time we observe more counter increments because we execute the subplans +-- twice because of #4212. +CALL exec_query_and_check_query_counters($$ + EXPLAIN (ANALYZE) + -- single-shard subplan (whole cte) + WITH cte AS ( + -- multi-shard subplan (lhs of EXCEPT) + SELECT * FROM dist_table + EXCEPT + SELECT i as a, i as b FROM generate_series(10, 32) AS i + ) + SELECT q.a, q.b + FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q) + JOIN cte ON q.a = cte.a + $$, + 3, 4 +); -- safe to push-down CALL exec_query_and_check_query_counters($$ SELECT * FROM (SELECT * FROM dist_table UNION SELECT * FROM dist_table) as foo @@ -743,9 +1074,199 @@ CALL exec_query_and_check_query_counters($$ 0, 1 ); RESET citus.local_table_join_policy; --- citus_stat_counters lists all the databases that currently exist, --- so we should get 5 rows here. -SELECT COUNT(*) = 5 FROM citus_stat_counters; +CALL exec_query_and_check_query_counters($$ + MERGE INTO dist_table AS t + USING dist_table_1 AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 1 +); +-- First, we pull the merge (multi-shard) query to the query node and create an +-- intermediate results for it because we cannot pushdown the whole INSERT query. +-- Then, the merge query becomes of the form: +-- SELECT .. FROM (SELECT .. FROM read_intermediate_result(..)) citus_insert_select_subquery +-- +-- So, while repartitioning the source query, we perform a single-shard read +-- query because we read from an intermediate result and we then partition it +-- across the nodes. For the read part, we increment query_execution_single_shard +-- because we go through distributed planning if there are read_intermediate_result() +-- calls in a query, so it happens to be a distributed plan and goes through our +-- CustomScan callbacks. For the repartitioning of the intermediate result, just +-- as usual, we don't increment any counters. +-- +-- Then, the final merge query happens between the distributed table and the +-- colocated intermediate result, so this increments query_execution_multi_shard +-- by 1. +CALL exec_query_and_check_query_counters($$ + MERGE INTO dist_table AS t + USING (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 1, 2 +); +truncate dist_table; +CALL exec_query_and_check_query_counters($$ + insert into dist_table (a, b) select i, i from generate_series(1, 128) as i + $$, + 0, 1 +); +CALL exec_query_and_check_query_counters($$ + MERGE INTO dist_table AS t + USING uncolocated_dist_table AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 2 +); +-- same with explain +CALL exec_query_and_check_query_counters($$ + EXPLAIN + MERGE INTO dist_table AS t + USING uncolocated_dist_table AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 0 +); +-- same with explain analyze, not supported today +CALL exec_query_and_check_query_counters($$ + EXPLAIN (ANALYZE) + MERGE INTO dist_table AS t + USING uncolocated_dist_table AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 2 +); +ERROR: EXPLAIN ANALYZE is currently not supported for MERGE INTO ... 
commands with repartitioning +CONTEXT: SQL statement " + EXPLAIN (ANALYZE) + MERGE INTO dist_table AS t + USING uncolocated_dist_table AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + " +PL/pgSQL function exec_query_and_check_query_counters(text,bigint,bigint) line XX at EXECUTE +truncate dist_table, ref_table, uncolocated_dist_table; +insert into dist_table (a, b) select i, i from generate_series(1, 128) as i; +insert into uncolocated_dist_table (a, b) select i, i from generate_series(1, 95) as i; +insert into ref_table (a, b) select i, i from generate_series(33, 128) as i; +CALL exec_query_and_check_query_counters($$ + WITH cte AS ( + SELECT uncolocated_dist_table.a, uncolocated_dist_table.b + FROM uncolocated_dist_table JOIN ref_table ON uncolocated_dist_table.a = ref_table.a + ) + MERGE INTO dist_table AS t + USING cte AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 2 +); +truncate dist_table, dist_table_1; +insert into dist_table (a, b) select i, i from generate_series(1, 128) as i; +insert into dist_table_1 (a, b) select i, i from generate_series(1, 95) as i; +-- Not ideal but since this contains both distributed and reference tables, +-- we directly decide partitioning for the source instead of pulling it to +-- the query node and repartitioning from there. +CALL exec_query_and_check_query_counters($$ + WITH cte AS ( + SELECT dist_table_1.a, dist_table_1.b + FROM dist_table_1 JOIN ref_table ON dist_table_1.a = ref_table.a + ) + MERGE INTO dist_table AS t + USING cte AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 2 +); +-- pushable +CALL exec_query_and_check_query_counters($$ + WITH cte AS ( + SELECT dist_table_1.a, dist_table_1.b * 2 as b FROM dist_table_1 + ) + MERGE INTO dist_table AS t + USING cte AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 1 +); +-- same with explain +CALL exec_query_and_check_query_counters($$ + EXPLAIN + WITH cte AS ( + SELECT dist_table_1.a, dist_table_1.b * 2 as b FROM dist_table_1 + ) + MERGE INTO dist_table AS t + USING cte AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 0 +); +-- same with explain analyze +CALL exec_query_and_check_query_counters($$ + EXPLAIN (ANALYZE) + WITH cte AS ( + SELECT dist_table_1.a, dist_table_1.b * 2 as b FROM dist_table_1 + ) + MERGE INTO dist_table AS t + USING cte AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 1 +); +-- pushable +CALL exec_query_and_check_query_counters($$ + MERGE INTO dist_table AS t + USING (SELECT dist_table_1.a, dist_table_1.b * 2 as b FROM dist_table_1) AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 1 +); +-- pushable +CALL exec_query_and_check_query_counters($$ + MERGE INTO dist_table AS t + USING dist_table_1 AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 1 +); +-- citus_stat_counters lists all the databases that currently exist +SELECT (SELECT COUNT(*) FROM citus_stat_counters) = (SELECT COUNT(*) FROM pg_database); ?column? 
--------------------------------------------------------------------- t diff --git a/src/test/regress/sql/stat_counters.sql b/src/test/regress/sql/stat_counters.sql index b28f727e6..8726ecfed 100644 --- a/src/test/regress/sql/stat_counters.sql +++ b/src/test/regress/sql/stat_counters.sql @@ -38,9 +38,8 @@ SET citus.enable_stat_counters TO false; SELECT citus_stat_counters(null); SELECT citus_stat_counters_reset(null); --- citus_stat_counters lists all the databases that currently exist, --- so we should get 5 rows here. -SELECT COUNT(*) = 5 FROM citus_stat_counters; +-- citus_stat_counters lists all the databases that currently exist +SELECT (SELECT COUNT(*) FROM citus_stat_counters) = (SELECT COUNT(*) FROM pg_database); -- Verify that providing an oid that doesn't correspond to any database -- returns an empty set. We know that "SELECT MAX(oid)+1 FROM pg_database" @@ -245,6 +244,113 @@ SELECT citus_add_local_table_to_metadata('citus_local'); GRANT ALL ON ALL TABLES IN SCHEMA stat_counters TO stat_counters_test_user; +-- test copy while we're superuser +-- cannot call copy via exec_query_and_check_query_counters + +SET citus.enable_stat_counters TO true; + +SELECT query_execution_single_shard AS old_query_execution_single_shard, + query_execution_multi_shard AS old_query_execution_multi_shard +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q \gset + +copy dist_table(a) from program 'seq 1'; -- single shard + +SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff, + query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q; + +SELECT query_execution_single_shard AS old_query_execution_single_shard, + query_execution_multi_shard AS old_query_execution_multi_shard +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q \gset + +copy dist_table(a) from program 'seq 2'; -- multi-shard + +SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff, + query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q; + +-- load some data +insert into dist_table (a, b) select i, i from generate_series(1, 2) as i; + +SELECT query_execution_single_shard AS old_query_execution_single_shard, + query_execution_multi_shard AS old_query_execution_multi_shard +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q \gset + +copy dist_table to stdout; + +SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff, + query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q; + +SELECT query_execution_single_shard AS old_query_execution_single_shard, + query_execution_multi_shard AS old_query_execution_multi_shard +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q \gset + +copy (select * from dist_table join citus_local on dist_table.a = citus_local.a) to stdout; + +SELECT query_execution_single_shard - 
:old_query_execution_single_shard AS query_execution_single_shard_diff, + query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q; + +SELECT query_execution_single_shard AS old_query_execution_single_shard, + query_execution_multi_shard AS old_query_execution_multi_shard +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q \gset + +copy dist_table to :'temp_dir''stat_counters_dist_table_dump'; + +SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff, + query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q; + +SELECT query_execution_single_shard AS old_query_execution_single_shard, + query_execution_multi_shard AS old_query_execution_multi_shard +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q \gset + +copy dist_table from :'temp_dir''stat_counters_dist_table_dump'; + +SELECT query_execution_single_shard - :old_query_execution_single_shard AS query_execution_single_shard_diff, + query_execution_multi_shard - :old_query_execution_multi_shard AS query_execution_multi_shard_diff +FROM ( + SELECT (citus_stat_counters(oid)).* + FROM pg_database WHERE datname = current_database() +) q; + +-- empty the table before rest of the tests +truncate dist_table; + \c stat_counters_test_db postgres - :master_port -- reset from another database as superuser @@ -308,6 +414,25 @@ CALL exec_query_and_check_query_counters($$ 0, 1 ); +-- same with explain +-- +-- Explain without analyze should never increment the counters. +-- This also applies to all such tests in this file. +CALL exec_query_and_check_query_counters($$ + EXPLAIN + SELECT * FROM dist_table JOIN dist_table_1 ON dist_table.a = dist_table_1.a + $$, + 0, 0 +); + +-- same with explain analyze +CALL exec_query_and_check_query_counters($$ + EXPLAIN (ANALYZE) + SELECT * FROM dist_table JOIN dist_table_1 ON dist_table.a = dist_table_1.a + $$, + 0, 1 +); + SET citus.enable_repartition_joins TO true; -- A repartition join only increments query_execution_multi_shard once, although -- this doesn't feel so much ideal. 
@@ -338,6 +463,24 @@ CALL exec_query_and_check_query_counters($$ 1, 1 ); +-- same with explain +CALL exec_query_and_check_query_counters($$ + EXPLAIN + SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q + $$, + 0, 0 +); + +-- same with explain analyze +-- +-- this time, query_execution_multi_shard is incremented twice because of #4212 +CALL exec_query_and_check_query_counters($$ + EXPLAIN (ANALYZE) + SELECT * FROM (SELECT * FROM dist_table OFFSET 0) q + $$, + 1, 2 +); + CALL exec_query_and_check_query_counters($$ DELETE FROM dist_table WHERE a = 1 $$, @@ -369,14 +512,14 @@ CALL exec_query_and_check_query_counters($$ 0, 1 ); --- single-shard inserts - +-- multi-shard insert CALL exec_query_and_check_query_counters($$ INSERT INTO dist_table (a, b) VALUES (-1, -1), (-2, -2), (-3, -3) $$, - 1, 0 + 0, 1 ); +-- single-shard insert CALL exec_query_and_check_query_counters($$ INSERT INTO dist_table (a, b) VALUES (-4, -4) $$, @@ -425,7 +568,7 @@ CALL exec_query_and_check_query_counters($$ -- Select query is multi-shard and the same is also true for the final insert -- but only if it doesn't prune to zero shards, which happens when the source --- table is empty. So here, we both query_execution_multi_shard and +-- table is empty. So here, both query_execution_multi_shard and -- query_execution_single_shard are incremented by 1. CALL exec_query_and_check_query_counters($$ INSERT INTO dist_table SELECT * FROM uncolocated_dist_table @@ -449,6 +592,22 @@ CALL exec_query_and_check_query_counters($$ 1, 0 ); +-- same with explain +CALL exec_query_and_check_query_counters($$ + EXPLAIN + INSERT INTO single_shard SELECT * FROM single_shard_1 + $$, + 0, 0 +); + +-- same with explain analyze +CALL exec_query_and_check_query_counters($$ + EXPLAIN (ANALYZE) + INSERT INTO single_shard SELECT * FROM single_shard_1 + $$, + 1, 0 +); + CALL exec_query_and_check_query_counters($$ INSERT INTO single_shard SELECT * FROM uncolocated_single_shard $$, @@ -514,6 +673,155 @@ CALL exec_query_and_check_query_counters($$ 1, 1 ); +-- same with explain +CALL exec_query_and_check_query_counters($$ + EXPLAIN + INSERT INTO citus_local (a, b) SELECT * FROM dist_table + $$, + 0, 0 +); + +-- same with explain analyze, not supported today +CALL exec_query_and_check_query_counters($$ + EXPLAIN (ANALYZE) + INSERT INTO citus_local (a, b) SELECT * FROM dist_table + $$, + 1, 1 +); + +insert into dist_table_1 (a, b) values (1, 1), (2, 2), (3, 3); + +-- First, we pull the select (multi-shard) query to the query node and create an +-- intermediate results for it because we cannot pushdown the whole INSERT query. +-- Then, the select query becomes of the form: +-- SELECT .. FROM (SELECT .. FROM read_intermediate_result(..)) intermediate_result +-- +-- So, while repartitioning the select query, we perform a single-shard read +-- query because we read from an intermediate result and we then partition it +-- across the nodes. For the read part, we increment query_execution_single_shard +-- because we go through distributed planning if there are read_intermediate_result() +-- calls in a query, so it happens to be a distributed plan and goes through our +-- CustomScan callbacks. For the repartitioning of the intermediate result, just +-- as usual, we don't increment any counters. +-- +-- Then, the final insert query happens between the distributed table and the +-- colocated intermediate result, so this increments query_execution_multi_shard +-- by 1. 
+CALL exec_query_and_check_query_counters($$ + INSERT INTO dist_table SELECT * FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q RETURNING * + $$, + 1, 2 +); + +-- Same query but without RETURNING - this goes through a different code path, but +-- the counters are still incremented the same way as above. +CALL exec_query_and_check_query_counters($$ + INSERT INTO dist_table SELECT * FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q + $$, + 1, 2 +); + +-- Same query but inserting a single row makes the final query single-shard too. +CALL exec_query_and_check_query_counters($$ + INSERT INTO dist_table SELECT * FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 1) q + $$, + 2, 1 +); + +-- A similar query but with a cte. +-- Subplan execution for the cte, additionally, first increments query_execution_multi_shard +-- for "SELECT * FROM dist_table" when creating the intermediate result for it and then +-- query_execution_single_shard for; +-- +-- EXCEPT +-- SELECT i as a, i as b FROM generate_series(10, 32) AS i +CALL exec_query_and_check_query_counters($$ + WITH cte AS ( + SELECT * FROM dist_table + EXCEPT + SELECT i as a, i as b FROM generate_series(10, 32) AS i + ) + INSERT INTO dist_table + SELECT q.a, q.b + FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q + JOIN cte ON q.a = cte.a + RETURNING * + $$, + 2, 3 +); + +-- the same query but this time the cte is part of the select, not the insert +CALL exec_query_and_check_query_counters($$ + INSERT INTO dist_table + WITH cte AS ( + SELECT * FROM dist_table + EXCEPT + SELECT i as a, i as b FROM generate_series(10, 32) AS i + ) + SELECT q.a, q.b + FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q + JOIN cte ON q.a = cte.a + RETURNING * + $$, + 2, 3 +); + +-- same with explain +CALL exec_query_and_check_query_counters($$ + EXPLAIN + INSERT INTO dist_table + WITH cte AS ( + SELECT * FROM dist_table + EXCEPT + SELECT i as a, i as b FROM generate_series(10, 32) AS i + ) + SELECT q.a, q.b + FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q + JOIN cte ON q.a = cte.a + RETURNING * + $$, + 0, 0 +); + +-- same with explain analyze, not supported today +CALL exec_query_and_check_query_counters($$ + EXPLAIN (ANALYZE) + INSERT INTO dist_table + WITH cte AS ( + SELECT * FROM dist_table + EXCEPT + SELECT i as a, i as b FROM generate_series(10, 32) AS i + ) + SELECT q.a, q.b + FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q + JOIN cte ON q.a = cte.a + RETURNING * + $$, + 2, 3 +); + +-- A similar one but without the insert, so we would normally expect 2 increments +-- for query_execution_single_shard and 2 for query_execution_multi_shard instead +-- of 3 since the insert is not there anymore. +-- +-- But this time we observe more counter increments because we execute the subplans +-- twice because of #4212. 
+CALL exec_query_and_check_query_counters($$ + EXPLAIN (ANALYZE) + -- single-shard subplan (whole cte) + WITH cte AS ( + -- multi-shard subplan (lhs of EXCEPT) + SELECT * FROM dist_table + EXCEPT + SELECT i as a, i as b FROM generate_series(10, 32) AS i + ) + SELECT q.a, q.b + FROM (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) q -- multi-shard subplan (subquery q) + JOIN cte ON q.a = cte.a + $$, + 3, 4 +); + -- safe to push-down CALL exec_query_and_check_query_counters($$ SELECT * FROM (SELECT * FROM dist_table UNION SELECT * FROM dist_table) as foo @@ -536,9 +844,206 @@ CALL exec_query_and_check_query_counters($$ ); RESET citus.local_table_join_policy; --- citus_stat_counters lists all the databases that currently exist, --- so we should get 5 rows here. -SELECT COUNT(*) = 5 FROM citus_stat_counters; +CALL exec_query_and_check_query_counters($$ + MERGE INTO dist_table AS t + USING dist_table_1 AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 1 +); + +-- First, we pull the merge (multi-shard) query to the query node and create an +-- intermediate results for it because we cannot pushdown the whole INSERT query. +-- Then, the merge query becomes of the form: +-- SELECT .. FROM (SELECT .. FROM read_intermediate_result(..)) citus_insert_select_subquery +-- +-- So, while repartitioning the source query, we perform a single-shard read +-- query because we read from an intermediate result and we then partition it +-- across the nodes. For the read part, we increment query_execution_single_shard +-- because we go through distributed planning if there are read_intermediate_result() +-- calls in a query, so it happens to be a distributed plan and goes through our +-- CustomScan callbacks. For the repartitioning of the intermediate result, just +-- as usual, we don't increment any counters. +-- +-- Then, the final merge query happens between the distributed table and the +-- colocated intermediate result, so this increments query_execution_multi_shard +-- by 1. 
+CALL exec_query_and_check_query_counters($$ + MERGE INTO dist_table AS t + USING (SELECT * FROM dist_table_1 ORDER BY a LIMIT 16) AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 1, 2 +); + +truncate dist_table; + +CALL exec_query_and_check_query_counters($$ + insert into dist_table (a, b) select i, i from generate_series(1, 128) as i + $$, + 0, 1 +); + +CALL exec_query_and_check_query_counters($$ + MERGE INTO dist_table AS t + USING uncolocated_dist_table AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 2 +); + +-- same with explain +CALL exec_query_and_check_query_counters($$ + EXPLAIN + MERGE INTO dist_table AS t + USING uncolocated_dist_table AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 0 +); + +-- same with explain analyze, not supported today +CALL exec_query_and_check_query_counters($$ + EXPLAIN (ANALYZE) + MERGE INTO dist_table AS t + USING uncolocated_dist_table AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 2 +); + +truncate dist_table, ref_table, uncolocated_dist_table; + +insert into dist_table (a, b) select i, i from generate_series(1, 128) as i; +insert into uncolocated_dist_table (a, b) select i, i from generate_series(1, 95) as i; +insert into ref_table (a, b) select i, i from generate_series(33, 128) as i; + +CALL exec_query_and_check_query_counters($$ + WITH cte AS ( + SELECT uncolocated_dist_table.a, uncolocated_dist_table.b + FROM uncolocated_dist_table JOIN ref_table ON uncolocated_dist_table.a = ref_table.a + ) + MERGE INTO dist_table AS t + USING cte AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 2 +); + +truncate dist_table, dist_table_1; + +insert into dist_table (a, b) select i, i from generate_series(1, 128) as i; +insert into dist_table_1 (a, b) select i, i from generate_series(1, 95) as i; + +-- Not ideal but since this contains both distributed and reference tables, +-- we directly decide partitioning for the source instead of pulling it to +-- the query node and repartitioning from there. 
+CALL exec_query_and_check_query_counters($$ + WITH cte AS ( + SELECT dist_table_1.a, dist_table_1.b + FROM dist_table_1 JOIN ref_table ON dist_table_1.a = ref_table.a + ) + MERGE INTO dist_table AS t + USING cte AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 2 +); + +-- pushable +CALL exec_query_and_check_query_counters($$ + WITH cte AS ( + SELECT dist_table_1.a, dist_table_1.b * 2 as b FROM dist_table_1 + ) + MERGE INTO dist_table AS t + USING cte AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 1 +); + +-- same with explain +CALL exec_query_and_check_query_counters($$ + EXPLAIN + WITH cte AS ( + SELECT dist_table_1.a, dist_table_1.b * 2 as b FROM dist_table_1 + ) + MERGE INTO dist_table AS t + USING cte AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 0 +); + +-- same with explain analyze +CALL exec_query_and_check_query_counters($$ + EXPLAIN (ANALYZE) + WITH cte AS ( + SELECT dist_table_1.a, dist_table_1.b * 2 as b FROM dist_table_1 + ) + MERGE INTO dist_table AS t + USING cte AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 1 +); + +-- pushable +CALL exec_query_and_check_query_counters($$ + MERGE INTO dist_table AS t + USING (SELECT dist_table_1.a, dist_table_1.b * 2 as b FROM dist_table_1) AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 1 +); + +-- pushable +CALL exec_query_and_check_query_counters($$ + MERGE INTO dist_table AS t + USING dist_table_1 AS s ON t.a = s.a + WHEN MATCHED THEN + UPDATE SET b = s.b + WHEN NOT MATCHED THEN + INSERT (a, b) VALUES (s.a, s.b) + $$, + 0, 1 +); + +-- citus_stat_counters lists all the databases that currently exist +SELECT (SELECT COUNT(*) FROM citus_stat_counters) = (SELECT COUNT(*) FROM pg_database); -- verify that we cannot execute citus_stat_counters_reset() from a non-superuser SELECT citus_stat_counters_reset(oid) FROM pg_database WHERE datname = current_database();
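
-- Illustrative usage sketch (not part of the patch above), condensing the
-- snapshot/diff pattern that the new COPY tests rely on. It assumes a hash
-- distributed table named dist_table and uses only objects exercised earlier
-- in this file: the citus.enable_stat_counters GUC, the citus_stat_counters(oid)
-- function and its query_execution_single_shard / query_execution_multi_shard
-- columns.
SET citus.enable_stat_counters TO true;
SELECT query_execution_single_shard AS old_single, query_execution_multi_shard AS old_multi
FROM (
    SELECT (citus_stat_counters(oid)).*
    FROM pg_database WHERE datname = current_database()
) q \gset
copy dist_table(a) from program 'seq 2'; -- the two rows hash to different shards
SELECT query_execution_single_shard - :old_single AS single_diff,
       query_execution_multi_shard - :old_multi AS multi_diff
FROM (
    SELECT (citus_stat_counters(oid)).*
    FROM pg_database WHERE datname = current_database()
) q;
-- with the changes above, such a COPY should report single_diff = 0 and multi_diff = 1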