From 366461ccdb7c1bbdfef0a8ab1c12632ee083f135 Mon Sep 17 00:00:00 2001 From: SaitTalhaNisanci Date: Wed, 2 Sep 2020 22:26:05 +0300 Subject: [PATCH] Introduce cache entry/table utilities (#4132) Introduce table entry utility functions Citus table cache entry utilities are introduced so that we can easily extend existing functionality with minimum changes, specifically changes to these functions. For example IsNonDistributedTableCacheEntry can be extended for citus local tables without the need to scan the whole codebase and update each relevant part. * Introduce utility functions to find the type of tables A table type can be a reference table, a hash/range/append distributed table. Utility methods are created so that we don't have to worry about how a table is considered as a reference table etc. This also makes it easy to extend the table types. * Add IsCitusTableType utilities * Rename IsCacheEntryCitusTableType -> IsCitusTableTypeCacheEntry * Change citus table types in some checks --- src/backend/distributed/commands/call.c | 6 +- .../commands/create_distributed_table.c | 3 +- .../distributed/commands/foreign_constraint.c | 10 +-- src/backend/distributed/commands/function.c | 10 +-- src/backend/distributed/commands/index.c | 9 +- src/backend/distributed/commands/multi_copy.c | 23 ++--- src/backend/distributed/commands/schema.c | 2 +- src/backend/distributed/commands/table.c | 14 ++- src/backend/distributed/commands/truncate.c | 6 +- .../distributed/executor/adaptive_executor.c | 4 +- .../executor/distributed_execution_locks.c | 2 +- .../distributed_intermediate_results.c | 4 +- .../executor/insert_select_executor.c | 11 +-- .../distributed/metadata/metadata_cache.c | 87 ++++++++++++++++++- .../distributed/metadata/metadata_sync.c | 10 +-- .../distributed/metadata/metadata_utility.c | 21 +---- .../distributed/metadata/node_metadata.c | 7 +- .../distributed/operations/delete_protocol.c | 5 +- .../distributed/operations/repair_shards.c | 6 +- .../distributed/operations/stage_protocol.c | 25 +++--- .../distributed/planner/deparse_shard_query.c | 3 +- .../distributed/planner/distributed_planner.c | 33 +------ .../planner/extended_op_node_utils.c | 7 +- .../planner/fast_path_router_planner.c | 7 +- .../planner/insert_select_planner.c | 5 +- .../distributed/planner/multi_join_order.c | 27 +++--- .../planner/multi_logical_optimizer.c | 6 +- .../planner/multi_logical_planner.c | 14 ++- .../planner/multi_physical_planner.c | 40 ++++----- .../planner/multi_router_planner.c | 45 ++++------ .../planner/query_colocation_checker.c | 5 +- .../planner/query_pushdown_planning.c | 3 +- .../relation_restriction_equivalence.c | 11 ++- .../distributed/planner/shard_pruning.c | 8 +- .../test/distributed_intermediate_results.c | 5 +- .../transaction/relation_access_tracking.c | 13 ++- .../distributed/utils/colocation_utils.c | 12 +-- .../distributed/utils/reference_table_utils.c | 25 +----- src/backend/distributed/utils/resource_lock.c | 4 +- src/backend/distributed/utils/shard_utils.c | 4 +- .../distributed/utils/shardinterval_utils.c | 20 ++--- .../distributed/utils/statistics_collection.c | 2 +- src/include/distributed/distributed_planner.h | 1 - src/include/distributed/metadata_cache.h | 19 ++++ src/include/distributed/metadata_utility.h | 1 - .../distributed/reference_table_utils.h | 3 +- 46 files changed, 290 insertions(+), 298 deletions(-) diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index ef7fd887d..0cd621f68 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -28,6 +28,7 @@ #include "distributed/adaptive_executor.h" #include "distributed/reference_table_utils.h" #include "distributed/remote_commands.h" +#include "distributed/reference_table_utils.h" #include "distributed/shard_pruning.h" #include "distributed/tuple_destination.h" #include "distributed/version_compat.h" @@ -92,17 +93,16 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, "be constant expressions"))); return false; } - CitusTableCacheEntry *distTable = GetCitusTableCacheEntry(colocatedRelationId); Var *partitionColumn = distTable->partitionColumn; bool colocatedWithReferenceTable = false; - if (partitionColumn == NULL) + if (IsCitusTableTypeCacheEntry(distTable, REFERENCE_TABLE)) { /* This can happen if colocated with a reference table. Punt for now. */ ereport(DEBUG1, (errmsg( "will push down CALL for reference tables"))); colocatedWithReferenceTable = true; - Assert(IsReferenceTable(colocatedRelationId)); + Assert(IsCitusTableType(colocatedRelationId, REFERENCE_TABLE)); } ShardPlacement *placement = NULL; diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 0ab40a844..9b4fee684 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -841,11 +841,10 @@ EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel, Oid distributionColumnType, Oid sourceRelationId) { CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId); - char sourceDistributionMethod = sourceTableEntry->partitionMethod; char sourceReplicationModel = sourceTableEntry->replicationModel; Var *sourceDistributionColumn = DistPartitionKeyOrError(sourceRelationId); - if (sourceDistributionMethod != DISTRIBUTE_BY_HASH) + if (!IsCitusTableTypeCacheEntry(sourceTableEntry, HASH_DISTRIBUTED)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot distribute relation"), diff --git a/src/backend/distributed/commands/foreign_constraint.c b/src/backend/distributed/commands/foreign_constraint.c index b7aea3576..dd28a03a4 100644 --- a/src/backend/distributed/commands/foreign_constraint.c +++ b/src/backend/distributed/commands/foreign_constraint.c @@ -138,7 +138,6 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis int referencingAttrIndex = -1; - char referencedDistMethod = 0; Var *referencedDistKey = NULL; int referencedAttrIndex = -1; uint32 referencedColocationId = INVALID_COLOCATION_ID; @@ -165,11 +164,12 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis } /* set referenced table related variables here if table is referencing itself */ - + char referencedDistMethod = 0; if (!selfReferencingTable) { referencedDistMethod = PartitionMethod(referencedTableId); - referencedDistKey = (referencedDistMethod == DISTRIBUTE_BY_NONE) ? + referencedDistKey = IsCitusTableType(referencedTableId, + CITUS_TABLE_WITH_NO_DIST_KEY) ? NULL : DistPartitionKey(referencedTableId); referencedColocationId = TableColocationId(referencedTableId); @@ -428,7 +428,7 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId) * any foreign constraint from a distributed table to a local table. */ Assert(IsCitusTable(referencedTableId)); - if (PartitionMethod(referencedTableId) != DISTRIBUTE_BY_NONE) + if (!IsCitusTableType(referencedTableId, REFERENCE_TABLE)) { heapTuple = systable_getnext(scanDescriptor); continue; @@ -559,7 +559,7 @@ GetForeignKeyOidsToReferenceTables(Oid relationId) Oid referencedTableOid = constraintForm->confrelid; - if (IsReferenceTable(referencedTableOid)) + if (IsCitusTableType(referencedTableOid, REFERENCE_TABLE)) { fkeyOidsToReferenceTables = lappend_oid(fkeyOidsToReferenceTables, foreignKeyOid); diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 6571cbb07..62b7cc880 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -167,7 +167,8 @@ create_distributed_function(PG_FUNCTION_ARGS) if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0) { Oid colocationRelationId = ResolveRelationId(colocateWithText, false); - colocatedWithReferenceTable = IsReferenceTable(colocationRelationId); + colocatedWithReferenceTable = IsCitusTableType(colocationRelationId, + REFERENCE_TABLE); } } @@ -499,11 +500,10 @@ EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnTyp Oid sourceRelationId) { CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId); - char sourceDistributionMethod = sourceTableEntry->partitionMethod; char sourceReplicationModel = sourceTableEntry->replicationModel; - if (sourceDistributionMethod != DISTRIBUTE_BY_HASH && - sourceDistributionMethod != DISTRIBUTE_BY_NONE) + if (!IsCitusTableTypeCacheEntry(sourceTableEntry, HASH_DISTRIBUTED) && + !IsCitusTableTypeCacheEntry(sourceTableEntry, REFERENCE_TABLE)) { char *functionName = get_func_name(functionOid); char *sourceRelationName = get_rel_name(sourceRelationId); @@ -515,7 +515,7 @@ EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnTyp functionName, sourceRelationName))); } - if (sourceDistributionMethod == DISTRIBUTE_BY_NONE && + if (IsCitusTableTypeCacheEntry(sourceTableEntry, REFERENCE_TABLE) && distributionColumnType != InvalidOid) { char *functionName = get_func_name(functionOid); diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index 80d7df499..b6a75dd45 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -800,19 +800,18 @@ ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement) /* caller uses ShareLock for non-concurrent indexes, use the same lock here */ LOCKMODE lockMode = ShareLock; Oid relationId = RangeVarGetRelid(relation, lockMode, missingOk); - char partitionMethod = PartitionMethod(relationId); bool indexContainsPartitionColumn = false; /* - * Reference tables do not have partition key, and unique constraints - * are allowed for them. Thus, we added a short-circuit for reference tables. + * Non-distributed tables do not have partition key, and unique constraints + * are allowed for them. Thus, we added a short-circuit for non-distributed tables. */ - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { return; } - if (partitionMethod == DISTRIBUTE_BY_APPEND) + if (IsCitusTableType(relationId, APPEND_DISTRIBUTED)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("creating unique indexes on append-partitioned tables " diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 473303cc9..2b5804a25 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -363,17 +363,18 @@ CitusCopyFrom(CopyStmt *copyStatement, QueryCompletionCompat *completionTag) } Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); - char partitionMethod = PartitionMethod(relationId); + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); /* disallow modifications to a partition table which have rep. factor > 1 */ EnsurePartitionTableNotReplicated(relationId); - if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE || - partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) || + IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED) || + IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { CopyToExistingShards(copyStatement, completionTag); } - else if (partitionMethod == DISTRIBUTE_BY_APPEND) + else if (IsCitusTableTypeCacheEntry(cacheEntry, APPEND_DISTRIBUTED)) { CopyToNewShards(copyStatement, completionTag, relationId); } @@ -409,7 +410,6 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT MemoryContext executorTupleContext = NULL; ExprContext *executorExpressionContext = NULL; - char partitionMethod = 0; bool stopOnFailure = false; CopyState copyState = NULL; @@ -460,8 +460,7 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT executorTupleContext = GetPerTupleMemoryContext(executorState); executorExpressionContext = GetPerTupleExprContext(executorState); - partitionMethod = PartitionMethod(tableId); - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(tableId, REFERENCE_TABLE)) { stopOnFailure = true; } @@ -2150,16 +2149,12 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, ListCell *columnNameCell = NULL; - char partitionMethod = '\0'; - - const char *delimiterCharacter = "\t"; const char *nullPrintCharacter = "\\N"; /* look up table properties */ Relation distributedRelation = table_open(tableId, RowExclusiveLock); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(tableId); - partitionMethod = cacheEntry->partitionMethod; copyDest->distributedRelation = distributedRelation; copyDest->tupleDescriptor = inputTupleDescriptor; @@ -2168,7 +2163,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, List *shardIntervalList = LoadShardIntervalList(tableId); if (shardIntervalList == NIL) { - if (partitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED)) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("could not find any shards into which to copy"), @@ -2187,7 +2182,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, } /* error if any shard missing min/max values */ - if (partitionMethod != DISTRIBUTE_BY_NONE && + if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && cacheEntry->hasUninitializedShardInterval) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -2248,7 +2243,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, attributeList = lappend(attributeList, columnNameValue); } - if (partitionMethod != DISTRIBUTE_BY_NONE && + if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && copyDest->partitionColumnIndex == INVALID_PARTITION_COLUMN_INDEX) { ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index c8d772ae0..fdce4fff0 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -93,7 +93,7 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString) continue; } - if (IsReferenceTable(relationId)) + if (IsCitusTableType(relationId, REFERENCE_TABLE)) { /* prevent concurrent EnsureReferenceTablesExistOnAllNodes */ int colocationId = CreateReferenceTableColocationId(); diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index c307621c5..36ab6835a 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -86,7 +86,7 @@ PreprocessDropTableStmt(Node *node, const char *queryString) continue; } - if (IsReferenceTable(relationId)) + if (IsCitusTableType(relationId, REFERENCE_TABLE)) { /* prevent concurrent EnsureReferenceTablesExistOnAllNodes */ int colocationId = CreateReferenceTableColocationId(); @@ -1230,8 +1230,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command) { Oid rightRelationId = RangeVarGetRelid(constraint->pktable, NoLock, false); - if (IsCitusTable(rightRelationId) && - PartitionMethod(rightRelationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(rightRelationId, REFERENCE_TABLE)) { executeSequentially = true; } @@ -1266,8 +1265,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command) { Oid rightRelationId = RangeVarGetRelid(constraint->pktable, NoLock, false); - if (IsCitusTable(rightRelationId) && - PartitionMethod(rightRelationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(rightRelationId, REFERENCE_TABLE)) { executeSequentially = true; } @@ -1288,8 +1286,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command) * the distributed tables, thus contradicting our purpose of using * sequential mode. */ - if (executeSequentially && IsCitusTable(relationId) && - PartitionMethod(relationId) != DISTRIBUTE_BY_NONE && + if (executeSequentially && !IsCitusTableType(relationId, REFERENCE_TABLE) && ParallelQueryExecutedInTransaction()) { char *relationName = get_rel_name(relationId); @@ -1331,7 +1328,6 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId, char *leftSchemaName = get_namespace_name(leftSchemaId); char *escapedLeftSchemaName = quote_literal_cstr(leftSchemaName); - char rightPartitionMethod = PartitionMethod(rightRelationId); List *rightShardList = LoadShardIntervalList(rightRelationId); ListCell *rightShardCell = NULL; Oid rightSchemaId = get_rel_namespace(rightRelationId); @@ -1348,7 +1344,7 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId, * since we only have one placement per worker. This hack is first implemented * for foreign constraint support from distributed tables to reference tables. */ - if (rightPartitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(rightRelationId, REFERENCE_TABLE)) { int rightShardCount = list_length(rightShardList); int leftShardCount = list_length(leftShardList); diff --git a/src/backend/distributed/commands/truncate.c b/src/backend/distributed/commands/truncate.c index a3f75520f..0bece1853 100644 --- a/src/backend/distributed/commands/truncate.c +++ b/src/backend/distributed/commands/truncate.c @@ -74,14 +74,13 @@ citus_truncate_trigger(PG_FUNCTION_ARGS) TriggerData *triggerData = (TriggerData *) fcinfo->context; Relation truncatedRelation = triggerData->tg_relation; Oid relationId = RelationGetRelid(truncatedRelation); - char partitionMethod = PartitionMethod(relationId); if (!EnableDDLPropagation) { PG_RETURN_DATUM(PointerGetDatum(NULL)); } - if (partitionMethod == DISTRIBUTE_BY_APPEND) + if (IsCitusTableType(relationId, APPEND_DISTRIBUTED)) { Oid schemaId = get_rel_namespace(relationId); char *schemaName = get_namespace_name(schemaId); @@ -317,8 +316,7 @@ ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command) { Oid relationId = RangeVarGetRelid(rangeVar, NoLock, failOK); - if (IsCitusTable(relationId) && - PartitionMethod(relationId) == DISTRIBUTE_BY_NONE && + if (IsCitusTableType(relationId, REFERENCE_TABLE) && TableReferenced(relationId)) { char *relationName = get_rel_name(relationId); diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index f9fa78d40..6ac1684a5 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -779,7 +779,7 @@ AdaptiveExecutor(CitusScanState *scanState) executorState->es_processed = execution->rowsProcessed; } else if (distributedPlan->targetRelationId != InvalidOid && - PartitionMethod(distributedPlan->targetRelationId) != DISTRIBUTE_BY_NONE) + !IsCitusTableType(distributedPlan->targetRelationId, REFERENCE_TABLE)) { /* * For reference tables we already add rowsProcessed on the local execution, @@ -1536,7 +1536,7 @@ SelectForUpdateOnReferenceTable(List *taskList) { Oid relationId = relationRowLock->relationId; - if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, REFERENCE_TABLE)) { return true; } diff --git a/src/backend/distributed/executor/distributed_execution_locks.c b/src/backend/distributed/executor/distributed_execution_locks.c index 74af023a5..bf581f26a 100644 --- a/src/backend/distributed/executor/distributed_execution_locks.c +++ b/src/backend/distributed/executor/distributed_execution_locks.c @@ -385,7 +385,7 @@ AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList) LockClauseStrength rowLockStrength = relationRowLock->rowLockStrength; Oid relationId = relationRowLock->relationId; - if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, REFERENCE_TABLE)) { List *shardIntervalList = LoadShardIntervalList(relationId); diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index ec643d5b2..699a556a0 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -172,8 +172,8 @@ PartitionTasklistResults(const char *resultIdPrefix, List *selectTaskList, CitusTableCacheEntry *targetRelation, bool binaryFormat) { - if (targetRelation->partitionMethod != DISTRIBUTE_BY_HASH && - targetRelation->partitionMethod != DISTRIBUTE_BY_RANGE) + if (!IsCitusTableTypeCacheEntry(targetRelation, HASH_DISTRIBUTED) && + !IsCitusTableTypeCacheEntry(targetRelation, RANGE_DISTRIBUTED)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("repartitioning results of a tasklist is only supported " diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index dba7755d8..c6fd32ea4 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -493,8 +493,7 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, ParamListInfo paramListInfo = executorState->es_param_list_info; bool stopOnFailure = false; - char partitionMethod = PartitionMethod(targetRelationId); - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(targetRelationId, REFERENCE_TABLE)) { stopOnFailure = true; } @@ -535,8 +534,7 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList, ParamListInfo paramListInfo = executorState->es_param_list_info; bool stopOnFailure = false; - char partitionMethod = PartitionMethod(targetRelationId); - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(targetRelationId, REFERENCE_TABLE)) { stopOnFailure = true; } @@ -620,9 +618,8 @@ IsSupportedRedistributionTarget(Oid targetRelationId) { CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(targetRelationId); - /* only range and hash-distributed tables are currently supported */ - if (tableEntry->partitionMethod != DISTRIBUTE_BY_HASH && - tableEntry->partitionMethod != DISTRIBUTE_BY_RANGE) + if (!IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED) && + !IsCitusTableTypeCacheEntry(tableEntry, RANGE_DISTRIBUTED)) { return false; } diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 921b3ed7c..93c9b0dd3 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -259,6 +259,8 @@ static void InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSl static void InvalidateDistTableCache(void); static void InvalidateDistObjectCache(void); static void InitializeTableCacheEntry(int64 shardId); +static bool IsCitusTableTypeInternal(CitusTableCacheEntry *tableEntry, CitusTableType + tableType); static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry); @@ -296,6 +298,87 @@ EnsureModificationsCanRun(void) } +/* + * IsCitusTableType returns true if the given table with relationId + * belongs to a citus table that matches the given table type. If cache + * entry already exists, prefer using IsCitusTableTypeCacheEntry to avoid + * an extra lookup. + */ +bool +IsCitusTableType(Oid relationId, CitusTableType tableType) +{ + CitusTableCacheEntry *tableEntry = LookupCitusTableCacheEntry(relationId); + + /* we are not interested in postgres tables */ + if (tableEntry == NULL) + { + return false; + } + return IsCitusTableTypeInternal(tableEntry, tableType); +} + + +/* + * IsCitusTableTypeCacheEntry returns true if the given table cache entry + * belongs to a citus table that matches the given table type. + */ +bool +IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tableType) +{ + return IsCitusTableTypeInternal(tableEntry, tableType); +} + + +/* + * IsCitusTableTypeInternal returns true if the given table entry belongs to + * the given table type group. For definition of table types, see CitusTableType. + */ +static bool +IsCitusTableTypeInternal(CitusTableCacheEntry *tableEntry, CitusTableType tableType) +{ + switch (tableType) + { + case HASH_DISTRIBUTED: + { + return tableEntry->partitionMethod == DISTRIBUTE_BY_HASH; + } + + case APPEND_DISTRIBUTED: + { + return tableEntry->partitionMethod == DISTRIBUTE_BY_APPEND; + } + + case RANGE_DISTRIBUTED: + { + return tableEntry->partitionMethod == DISTRIBUTE_BY_RANGE; + } + + case DISTRIBUTED_TABLE: + { + return tableEntry->partitionMethod == DISTRIBUTE_BY_HASH || + tableEntry->partitionMethod == DISTRIBUTE_BY_RANGE || + tableEntry->partitionMethod == DISTRIBUTE_BY_APPEND; + } + + case REFERENCE_TABLE: + { + return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE; + } + + case CITUS_TABLE_WITH_NO_DIST_KEY: + { + return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE; + } + + default: + { + ereport(ERROR, (errmsg("Unknown table type %d", tableType))); + } + } + return false; +} + + /* * IsCitusTable returns whether relationId is a distributed relation or * not. @@ -416,9 +499,7 @@ ReferenceTableShardId(uint64 shardId) { ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId); CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry; - char partitionMethod = tableEntry->partitionMethod; - - return partitionMethod == DISTRIBUTE_BY_NONE; + return IsCitusTableTypeCacheEntry(tableEntry, REFERENCE_TABLE); } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 86a9666a3..cbb33ac05 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -224,14 +224,12 @@ ShouldSyncTableMetadata(Oid relationId) { CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId); - bool hashDistributed = (tableEntry->partitionMethod == DISTRIBUTE_BY_HASH); bool streamingReplicated = (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING); - bool mxTable = (streamingReplicated && hashDistributed); - bool referenceTable = (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE); - - if (mxTable || referenceTable) + bool mxTable = (streamingReplicated && IsCitusTableTypeCacheEntry(tableEntry, + HASH_DISTRIBUTED)); + if (mxTable || IsCitusTableTypeCacheEntry(tableEntry, REFERENCE_TABLE)) { return true; } @@ -631,7 +629,7 @@ DistributionCreateCommand(CitusTableCacheEntry *cacheEntry) char replicationModel = cacheEntry->replicationModel; StringInfo tablePartitionKeyString = makeStringInfo(); - if (distributionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { appendStringInfo(tablePartitionKeyString, "NULL"); } diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index e2ba1cbcb..198da2d77 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -368,7 +368,7 @@ ErrorIfNotSuitableToGetSize(Oid relationId) "distributed", escapedQueryString))); } - if (PartitionMethod(relationId) == DISTRIBUTE_BY_HASH && + if (IsCitusTableType(relationId, HASH_DISTRIBUTED) && !SingleReplicatedTable(relationId)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -1401,7 +1401,7 @@ EnsureFunctionOwner(Oid functionId) void EnsureHashDistributedTable(Oid relationId) { - if (!IsHashDistributedTable(relationId)) + if (!IsCitusTableType(relationId, HASH_DISTRIBUTED)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("relation %s should be a " @@ -1410,23 +1410,6 @@ EnsureHashDistributedTable(Oid relationId) } -/* - * IsHashDistributedTable returns true if the given relation is - * a distributed table. - */ -bool -IsHashDistributedTable(Oid relationId) -{ - if (!IsCitusTable(relationId)) - { - return false; - } - CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(relationId); - char sourceDistributionMethod = sourceTableEntry->partitionMethod; - return sourceDistributionMethod == DISTRIBUTE_BY_HASH; -} - - /* * EnsureSuperUser check that the current user is a superuser and errors out if not. */ diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 5d733bfeb..72ea11d18 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -870,8 +870,7 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) errmsg("relation is not distributed"))); } - char distributionMethod = PartitionMethod(relationId); - if (distributionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { List *shardIntervalList = LoadShardIntervalList(relationId); if (shardIntervalList == NIL) @@ -881,8 +880,8 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) shardInterval = (ShardInterval *) linitial(shardIntervalList); } - else if (distributionMethod == DISTRIBUTE_BY_HASH || - distributionMethod == DISTRIBUTE_BY_RANGE) + else if (IsCitusTableType(relationId, HASH_DISTRIBUTED) || + IsCitusTableType(relationId, RANGE_DISTRIBUTED)) { CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); diff --git a/src/backend/distributed/operations/delete_protocol.c b/src/backend/distributed/operations/delete_protocol.c index 48bf51222..025555ebf 100644 --- a/src/backend/distributed/operations/delete_protocol.c +++ b/src/backend/distributed/operations/delete_protocol.c @@ -163,8 +163,7 @@ master_apply_delete_command(PG_FUNCTION_ARGS) Node *whereClause = (Node *) deleteQuery->jointree->quals; Node *deleteCriteria = eval_const_expressions(NULL, whereClause); - char partitionMethod = PartitionMethod(relationId); - if (partitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableType(relationId, HASH_DISTRIBUTED)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot delete from hash distributed table with this " @@ -173,7 +172,7 @@ master_apply_delete_command(PG_FUNCTION_ARGS) "are not supported with master_apply_delete_command."), errhint("Use the DELETE command instead."))); } - else if (partitionMethod == DISTRIBUTE_BY_NONE) + else if (IsCitusTableType(relationId, REFERENCE_TABLE)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot delete from reference table"), diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 517e78f17..30f9b65c8 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -410,7 +410,7 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, targetNodeName, targetNodePort); } - if (!IsReferenceTable(distributedTableId)) + if (!IsCitusTableType(distributedTableId, REFERENCE_TABLE)) { /* * When copying a shard to a new node, we should first ensure that reference @@ -492,7 +492,7 @@ EnsureTableListSuitableForReplication(List *tableIdList) GetReferencingForeignConstaintCommands(tableId); if (foreignConstraintCommandList != NIL && - PartitionMethod(tableId) != DISTRIBUTE_BY_NONE) + IsCitusTableType(tableId, DISTRIBUTED_TABLE)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot create foreign key constraint"), @@ -850,7 +850,7 @@ CopyShardForeignConstraintCommandListGrouped(ShardInterval *shardInterval, char *referencedSchemaName = get_namespace_name(referencedSchemaId); char *escapedReferencedSchemaName = quote_literal_cstr(referencedSchemaName); - if (PartitionMethod(referencedRelationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(referencedRelationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { referencedShardId = GetFirstShardId(referencedRelationId); } diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index e1d33ecd8..0d6851411 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -136,15 +136,14 @@ master_create_empty_shard(PG_FUNCTION_ARGS) } } - char partitionMethod = PartitionMethod(relationId); - if (partitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableType(relationId, HASH_DISTRIBUTED)) { ereport(ERROR, (errmsg("relation \"%s\" is a hash partitioned table", relationName), errdetail("We currently don't support creating shards " "on hash-partitioned tables"))); } - else if (partitionMethod == DISTRIBUTE_BY_NONE) + else if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { ereport(ERROR, (errmsg("relation \"%s\" is a reference table", relationName), @@ -253,8 +252,8 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) errdetail("The underlying shard is not a regular table"))); } - char partitionMethod = PartitionMethod(relationId); - if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, HASH_DISTRIBUTED) || IsCitusTableType(relationId, + CITUS_TABLE_WITH_NO_DIST_KEY)) { ereport(ERROR, (errmsg("cannot append to shardId " UINT64_FORMAT, shardId), errdetail("We currently don't support appending to shards " @@ -586,7 +585,7 @@ RelationShardListForShardCreate(ShardInterval *shardInterval) relationShard->shardId = shardInterval->shardId; List *relationShardList = list_make1(relationShard); - if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH && + if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) && cacheEntry->colocationId != INVALID_COLOCATION_ID) { shardIndex = ShardIndex(shardInterval); @@ -605,12 +604,12 @@ RelationShardListForShardCreate(ShardInterval *shardInterval) continue; } - if (PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(fkeyRelationid, REFERENCE_TABLE)) { fkeyShardId = GetFirstShardId(fkeyRelationid); } - else if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH && - PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_HASH) + else if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) && + IsCitusTableType(fkeyRelationid, HASH_DISTRIBUTED)) { /* hash distributed tables should be colocated to have fkey */ Assert(TableColocationId(fkeyRelationid) == cacheEntry->colocationId); @@ -726,7 +725,7 @@ WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId, { referencedShardId = shardId; } - else if (PartitionMethod(referencedRelationId) == DISTRIBUTE_BY_NONE) + else if (IsCitusTableType(referencedRelationId, REFERENCE_TABLE)) { referencedShardId = GetFirstShardId(referencedRelationId); } @@ -769,7 +768,6 @@ UpdateShardStatistics(int64 shardId) ShardInterval *shardInterval = LoadShardInterval(shardId); Oid relationId = shardInterval->relationId; char storageType = shardInterval->storageType; - char partitionType = PartitionMethod(relationId); bool statsOK = false; uint64 shardSize = 0; text *minValue = NULL; @@ -827,7 +825,7 @@ UpdateShardStatistics(int64 shardId) } /* only update shard min/max values for append-partitioned tables */ - if (partitionType == DISTRIBUTE_BY_APPEND) + if (IsCitusTableType(relationId, APPEND_DISTRIBUTED)) { DeleteShardRow(shardId); InsertShardRow(relationId, shardId, storageType, minValue, maxValue); @@ -856,7 +854,6 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam StringInfo tableSizeQuery = makeStringInfo(); const uint32 unusedTableId = 1; - char partitionType = PartitionMethod(relationId); StringInfo partitionValueQuery = makeStringInfo(); PGresult *queryResult = NULL; @@ -921,7 +918,7 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam PQclear(queryResult); ForgetResults(connection); - if (partitionType != DISTRIBUTE_BY_APPEND) + if (!IsCitusTableType(relationId, APPEND_DISTRIBUTED)) { /* we don't need min/max for non-append distributed tables */ return true; diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 60a83a186..5cc0c5924 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -78,8 +78,7 @@ RebuildQueryStrings(Job *workerJob) Query *copiedSubquery = copiedSubqueryRte->subquery; /* there are no restrictions to add for reference tables */ - char partitionMethod = PartitionMethod(shardInterval->relationId); - if (partitionMethod != DISTRIBUTE_BY_NONE) + if (IsCitusTableType(shardInterval->relationId, DISTRIBUTED_TABLE)) { AddShardIntervalRestrictionToSelect(copiedSubquery, shardInterval); } diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index cdcac8f81..f71f67727 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -286,37 +286,6 @@ ExtractRangeTableEntryList(Query *query) } -/* - * ExtractClassifiedRangeTableEntryList extracts reference table rte's from - * the given rte list. - * Callers of this function are responsible for passing referenceTableRTEList - * to be non-null and initially pointing to an empty list. - */ -List * -ExtractReferenceTableRTEList(List *rteList) -{ - List *referenceTableRTEList = NIL; - - RangeTblEntry *rte = NULL; - foreach_ptr(rte, rteList) - { - if (rte->rtekind != RTE_RELATION || rte->relkind != RELKIND_RELATION) - { - continue; - } - - Oid relationOid = rte->relid; - if (IsCitusTable(relationOid) && PartitionMethod(relationOid) == - DISTRIBUTE_BY_NONE) - { - referenceTableRTEList = lappend(referenceTableRTEList, rte); - } - } - - return referenceTableRTEList; -} - - /* * NeedsDistributedPlanning returns true if the Citus extension is loaded and * the query contains a distributed table. @@ -1855,7 +1824,7 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, cacheEntry = GetCitusTableCacheEntry(rte->relid); relationRestrictionContext->allReferenceTables &= - (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE); + IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE); } relationRestrictionContext->relationRestrictionList = diff --git a/src/backend/distributed/planner/extended_op_node_utils.c b/src/backend/distributed/planner/extended_op_node_utils.c index cb50312cd..30367fe20 100644 --- a/src/backend/distributed/planner/extended_op_node_utils.c +++ b/src/backend/distributed/planner/extended_op_node_utils.c @@ -129,9 +129,8 @@ GroupedByPartitionColumn(MultiNode *node, MultiExtendedOp *opNode) return false; } - char partitionMethod = PartitionMethod(relationId); - if (partitionMethod != DISTRIBUTE_BY_RANGE && - partitionMethod != DISTRIBUTE_BY_HASH) + if (!IsCitusTableType(relationId, RANGE_DISTRIBUTED) && + !IsCitusTableType(relationId, HASH_DISTRIBUTED)) { /* only range- and hash-distributed tables are strictly partitioned */ return false; @@ -298,7 +297,7 @@ PartitionColumnInTableList(Var *column, List *tableNodeList) { Assert(partitionColumn->varno == tableNode->rangeTableId); - if (PartitionMethod(tableNode->relationId) != DISTRIBUTE_BY_APPEND) + if (!IsCitusTableType(tableNode->relationId, APPEND_DISTRIBUTED)) { return true; } diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index 70d813bf5..2d45abf66 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -205,15 +205,16 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue) /* we don't want to deal with append/range distributed tables */ Oid distributedTableId = rangeTableEntry->relid; CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId); - if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH || - cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)) + if (IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED) || + IsCitusTableTypeCacheEntry(cacheEntry, APPEND_DISTRIBUTED)) { return false; } /* WHERE clause should not be empty for distributed tables */ if (joinTree == NULL || - (cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE && joinTree->quals == NULL)) + (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && joinTree->quals == + NULL)) { return false; } diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index aa05f9e0d..a8bc73bcf 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -541,7 +541,6 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte, { Oid selectPartitionColumnTableId = InvalidOid; Oid targetRelationId = insertRte->relid; - char targetPartitionMethod = PartitionMethod(targetRelationId); ListCell *rangeTableCell = NULL; /* we only do this check for INSERT ... SELECT queries */ @@ -589,7 +588,7 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte, * If we're inserting into a reference table, all participating tables * should be reference tables as well. */ - if (targetPartitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(targetRelationId, REFERENCE_TABLE)) { if (!allReferenceTables) { @@ -1424,7 +1423,7 @@ NonPushableInsertSelectSupported(Query *insertSelectQuery) } RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery); - if (PartitionMethod(insertRte->relid) == DISTRIBUTE_BY_APPEND) + if (IsCitusTableType(insertRte->relid, APPEND_DISTRIBUTED)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "INSERT ... SELECT into an append-distributed table is " diff --git a/src/backend/distributed/planner/multi_join_order.c b/src/backend/distributed/planner/multi_join_order.c index 7b2b2c1f4..f0c20d61e 100644 --- a/src/backend/distributed/planner/multi_join_order.c +++ b/src/backend/distributed/planner/multi_join_order.c @@ -823,12 +823,12 @@ ReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, return NULL; } - char candidatePartitionMethod = PartitionMethod(candidateTable->relationId); - char leftPartitionMethod = PartitionMethod(currentJoinNode->tableEntry->relationId); - - if (!IsSupportedReferenceJoin(joinType, - leftPartitionMethod == DISTRIBUTE_BY_NONE, - candidatePartitionMethod == DISTRIBUTE_BY_NONE)) + bool leftIsReferenceTable = IsCitusTableType( + currentJoinNode->tableEntry->relationId, + REFERENCE_TABLE); + bool rightIsReferenceTable = IsCitusTableType(candidateTable->relationId, + REFERENCE_TABLE); + if (!IsSupportedReferenceJoin(joinType, leftIsReferenceTable, rightIsReferenceTable)) { return NULL; } @@ -874,12 +874,13 @@ static JoinOrderNode * CartesianProductReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, List *applicableJoinClauses, JoinType joinType) { - char candidatePartitionMethod = PartitionMethod(candidateTable->relationId); - char leftPartitionMethod = PartitionMethod(currentJoinNode->tableEntry->relationId); + bool leftIsReferenceTable = IsCitusTableType( + currentJoinNode->tableEntry->relationId, + REFERENCE_TABLE); + bool rightIsReferenceTable = IsCitusTableType(candidateTable->relationId, + REFERENCE_TABLE); - if (!IsSupportedReferenceJoin(joinType, - leftPartitionMethod == DISTRIBUTE_BY_NONE, - candidatePartitionMethod == DISTRIBUTE_BY_NONE)) + if (!IsSupportedReferenceJoin(joinType, leftIsReferenceTable, rightIsReferenceTable)) { return NULL; } @@ -1385,8 +1386,8 @@ DistPartitionKey(Oid relationId) { CitusTableCacheEntry *partitionEntry = GetCitusTableCacheEntry(relationId); - /* reference tables do not have partition column */ - if (partitionEntry->partitionMethod == DISTRIBUTE_BY_NONE) + /* non-distributed tables do not have partition column */ + if (IsCitusTableTypeCacheEntry(partitionEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { return NULL; } diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index f9021b8bd..5ee7e51eb 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -4276,10 +4276,8 @@ TablePartitioningSupportsDistinct(List *tableNodeList, MultiExtendedOp *opNode, * We need to check that task results don't overlap. We can only do this * if table is range partitioned. */ - char partitionMethod = PartitionMethod(relationId); - - if (partitionMethod == DISTRIBUTE_BY_RANGE || - partitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableType(relationId, RANGE_DISTRIBUTED) || + IsCitusTableType(relationId, HASH_DISTRIBUTED)) { Var *tablePartitionColumn = tableNode->partitionColumn; diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index fc6b7d4d6..745c9e802 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -227,10 +227,10 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList) FindReferencedTableColumn(targetExpression, NIL, query, &relationId, &column); /* - * If the expression belongs to a reference table continue searching for + * If the expression belongs to a non-distributed table continue searching for * other partition keys. */ - if (IsCitusTable(relationId) && PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { continue; } @@ -341,8 +341,7 @@ bool IsDistributedTableRTE(Node *node) { Oid relationId = NodeTryGetRteRelid(node); - return relationId != InvalidOid && IsCitusTable(relationId) && - PartitionMethod(relationId) != DISTRIBUTE_BY_NONE; + return relationId != InvalidOid && IsCitusTableType(relationId, DISTRIBUTED_TABLE); } @@ -354,7 +353,7 @@ bool IsReferenceTableRTE(Node *node) { Oid relationId = NodeTryGetRteRelid(node); - return relationId != InvalidOid && IsReferenceTable(relationId); + return relationId != InvalidOid && IsCitusTableType(relationId, REFERENCE_TABLE); } @@ -1021,12 +1020,11 @@ ErrorHintRequired(const char *errorHint, Query *queryTree) foreach(relationIdCell, distributedRelationIdList) { Oid relationId = lfirst_oid(relationIdCell); - char partitionMethod = PartitionMethod(relationId); - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, REFERENCE_TABLE)) { continue; } - else if (partitionMethod == DISTRIBUTE_BY_HASH) + else if (IsCitusTableType(relationId, HASH_DISTRIBUTED)) { int colocationId = TableColocationId(relationId); colocationIdList = list_append_unique_int(colocationIdList, colocationId); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index c68c5ee93..73d4d3c41 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2141,7 +2141,7 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, ListCell *shardIntervalCell = NULL; CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); - if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { continue; } @@ -2299,22 +2299,21 @@ ErrorIfUnsupportedShardDistribution(Query *query) foreach(relationIdCell, relationIdList) { Oid relationId = lfirst_oid(relationIdCell); - char partitionMethod = PartitionMethod(relationId); - if (partitionMethod == DISTRIBUTE_BY_RANGE) + if (IsCitusTableType(relationId, RANGE_DISTRIBUTED)) { rangeDistributedRelationCount++; nonReferenceRelations = lappend_oid(nonReferenceRelations, relationId); } - else if (partitionMethod == DISTRIBUTE_BY_HASH) + else if (IsCitusTableType(relationId, HASH_DISTRIBUTED)) { hashDistributedRelationCount++; nonReferenceRelations = lappend_oid(nonReferenceRelations, relationId); } - else if (partitionMethod == DISTRIBUTE_BY_NONE) + else if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { - /* do not need to handle reference tables */ + /* do not need to handle non-distributed tables */ continue; } else @@ -2426,9 +2425,9 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, ShardInterval *shardInterval = NULL; CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); - if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { - /* reference table only has one shard */ + /* non-distributed tables have only one shard */ shardInterval = cacheEntry->sortedShardIntervalArray[0]; /* only use reference table as anchor shard if none exists yet */ @@ -2537,13 +2536,13 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) FmgrInfo *comparisonFunction = firstTableCache->shardIntervalCompareFunction; /* reference tables are always & only copartitioned with reference tables */ - if (firstTableCache->partitionMethod == DISTRIBUTE_BY_NONE && - secondTableCache->partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(firstTableCache, CITUS_TABLE_WITH_NO_DIST_KEY) && + IsCitusTableTypeCacheEntry(secondTableCache, CITUS_TABLE_WITH_NO_DIST_KEY)) { return true; } - else if (firstTableCache->partitionMethod == DISTRIBUTE_BY_NONE || - secondTableCache->partitionMethod == DISTRIBUTE_BY_NONE) + else if (IsCitusTableTypeCacheEntry(firstTableCache, CITUS_TABLE_WITH_NO_DIST_KEY) || + IsCitusTableTypeCacheEntry(secondTableCache, CITUS_TABLE_WITH_NO_DIST_KEY)) { return false; } @@ -2578,8 +2577,8 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) * different values for the same value. int vs bigint can be given as an * example. */ - if (firstTableCache->partitionMethod == DISTRIBUTE_BY_HASH || - secondTableCache->partitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableTypeCacheEntry(firstTableCache, HASH_DISTRIBUTED) || + IsCitusTableTypeCacheEntry(secondTableCache, HASH_DISTRIBUTED)) { return false; } @@ -3562,7 +3561,7 @@ NodeIsRangeTblRefReferenceTable(Node *node, List *rangeTableList) { return false; } - return PartitionMethod(rangeTableEntry->relid) == DISTRIBUTE_BY_NONE; + return IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE); } @@ -3734,15 +3733,12 @@ PartitionedOnColumn(Var *column, List *rangeTableList, List *dependentJobList) if (rangeTableType == CITUS_RTE_RELATION) { Oid relationId = rangeTableEntry->relid; - char partitionMethod = PartitionMethod(relationId); Var *partitionColumn = PartitionColumn(relationId, rangeTableId); - /* reference tables do not have partition columns */ - if (partitionMethod == DISTRIBUTE_BY_NONE) + /* non-distributed tables do not have partition columns */ + if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { - partitionedOnColumn = false; - - return partitionedOnColumn; + return false; } if (partitionColumn->varattno == column->varattno) @@ -3927,7 +3923,7 @@ ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterva CitusTableCacheEntry *intervalRelation = GetCitusTableCacheEntry(firstInterval->relationId); - Assert(intervalRelation->partitionMethod != DISTRIBUTE_BY_NONE); + Assert(IsCitusTableTypeCacheEntry(intervalRelation, DISTRIBUTED_TABLE)); FmgrInfo *comparisonFunction = intervalRelation->shardIntervalCompareFunction; Oid collation = intervalRelation->partitionColumn->varcollid; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index f1efcef02..18151ba15 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -299,15 +299,14 @@ List * ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex) { Oid relationId = shardInterval->relationId; - char partitionMethod = PartitionMethod(shardInterval->relationId); Var *partitionColumn = NULL; - if (partitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableType(relationId, HASH_DISTRIBUTED)) { partitionColumn = MakeInt4Column(); } - else if (partitionMethod == DISTRIBUTE_BY_RANGE || partitionMethod == - DISTRIBUTE_BY_APPEND) + else if (IsCitusTableType(relationId, RANGE_DISTRIBUTED) || IsCitusTableType( + relationId, APPEND_DISTRIBUTED)) { Assert(rteIndex > 0); partitionColumn = PartitionColumn(relationId, rteIndex); @@ -1134,7 +1133,6 @@ MultiShardModifyQuerySupported(Query *originalQuery, DeferredErrorMessage *errorMessage = NULL; RangeTblEntry *resultRangeTable = ExtractResultRelationRTE(originalQuery); Oid resultRelationOid = resultRangeTable->relid; - char resultPartitionMethod = PartitionMethod(resultRelationOid); if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree)) { @@ -1151,7 +1149,7 @@ MultiShardModifyQuerySupported(Query *originalQuery, "tables must not be VOLATILE", NULL, NULL); } - else if (resultPartitionMethod == DISTRIBUTE_BY_NONE) + else if (IsCitusTableType(resultRelationOid, REFERENCE_TABLE)) { errorMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "only reference tables may be queried when targeting " @@ -1882,9 +1880,8 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, CitusTableCacheEntry *modificationTableCacheEntry = GetCitusTableCacheEntry( updateOrDeleteRTE->relid); - char modificationPartitionMethod = modificationTableCacheEntry->partitionMethod; - if (modificationPartitionMethod == DISTRIBUTE_BY_NONE && + if (IsCitusTableTypeCacheEntry(modificationTableCacheEntry, REFERENCE_TABLE) && SelectsFromDistributedTable(rangeTableList, query)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -2009,7 +2006,7 @@ SelectsFromDistributedTable(List *rangeTableList, Query *query) CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry( rangeTableEntry->relid); - if (cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE && + if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && (resultRangeTableEntry == NULL || resultRangeTableEntry->relid != rangeTableEntry->relid)) { @@ -2424,9 +2421,9 @@ TargetShardIntervalForFastPathQuery(Query *query, bool *isMultiShardQuery, { Oid relationId = ExtractFirstCitusTableId(query); - if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { - /* we don't need to do shard pruning for reference tables */ + /* we don't need to do shard pruning for non-distributed tables */ return list_make1(LoadShardIntervalList(relationId)); } @@ -2702,14 +2699,13 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError) { Oid distributedTableId = ExtractFirstCitusTableId(query); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId); - char partitionMethod = cacheEntry->partitionMethod; List *modifyRouteList = NIL; ListCell *insertValuesCell = NULL; Assert(query->commandType == CMD_INSERT); /* reference tables can only have one shard */ - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { List *shardIntervalList = LoadShardIntervalList(distributedTableId); @@ -2808,8 +2804,8 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError) missingOk); } - if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == - DISTRIBUTE_BY_RANGE) + if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) || + IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED)) { Datum partitionValue = partitionValueConst->constvalue; @@ -3203,8 +3199,7 @@ ExtractInsertPartitionKeyValue(Query *query) uint32 rangeTableId = 1; Const *singlePartitionValueConst = NULL; - char partitionMethod = PartitionMethod(distributedTableId); - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(distributedTableId, CITUS_TABLE_WITH_NO_DIST_KEY)) { return NULL; } @@ -3336,9 +3331,7 @@ MultiRouterPlannableQuery(Query *query) continue; } - char partitionMethod = PartitionMethod(distributedTableId); - if (!(partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == - DISTRIBUTE_BY_NONE || partitionMethod == DISTRIBUTE_BY_RANGE)) + if (IsCitusTableType(distributedTableId, APPEND_DISTRIBUTED)) { return DeferredError( ERRCODE_FEATURE_NOT_SUPPORTED, @@ -3346,7 +3339,7 @@ MultiRouterPlannableQuery(Query *query) NULL, NULL); } - if (partitionMethod != DISTRIBUTE_BY_NONE) + if (IsCitusTableType(distributedTableId, DISTRIBUTED_TABLE)) { hasDistributedTable = true; } @@ -3361,7 +3354,8 @@ MultiRouterPlannableQuery(Query *query) uint32 tableReplicationFactor = TableShardReplicationFactor( distributedTableId); - if (tableReplicationFactor > 1 && partitionMethod != DISTRIBUTE_BY_NONE) + if (tableReplicationFactor > 1 && IsCitusTableType(distributedTableId, + DISTRIBUTED_TABLE)) { return DeferredError( ERRCODE_FEATURE_NOT_SUPPORTED, @@ -3490,13 +3484,12 @@ ErrorIfQueryHasUnroutableModifyingCTE(Query *queryTree) CitusTableCacheEntry *modificationTableCacheEntry = GetCitusTableCacheEntry(distributedTableId); - char modificationPartitionMethod = - modificationTableCacheEntry->partitionMethod; - if (modificationPartitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(modificationTableCacheEntry, + CITUS_TABLE_WITH_NO_DIST_KEY)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot router plan modification of a reference table", + "cannot router plan modification of a non-distributed table", NULL, NULL); } diff --git a/src/backend/distributed/planner/query_colocation_checker.c b/src/backend/distributed/planner/query_colocation_checker.c index 60300535f..0a84ea49e 100644 --- a/src/backend/distributed/planner/query_colocation_checker.c +++ b/src/backend/distributed/planner/query_colocation_checker.c @@ -25,6 +25,7 @@ #include "distributed/query_colocation_checker.h" #include "distributed/pg_dist_partition.h" #include "distributed/relation_restriction_equivalence.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_logical_planner.h" /* only to access utility functions */ #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" @@ -153,10 +154,10 @@ AnchorRte(Query *subquery) { Oid relationId = currentRte->relid; - if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { /* - * Reference tables should not be the anchor rte since they + * Non-distributed tables should not be the anchor rte since they * don't have distribution key. */ continue; diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index 13b73bac3..534b05b6a 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ b/src/backend/distributed/planner/query_pushdown_planning.c @@ -1445,8 +1445,7 @@ HasRecurringTuples(Node *node, RecurringTuplesType *recurType) if (rangeTableEntry->rtekind == RTE_RELATION) { Oid relationId = rangeTableEntry->relid; - if (IsCitusTable(relationId) && - PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, REFERENCE_TABLE)) { *recurType = RECURRING_TUPLES_REFERENCE_TABLE; diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index aa1cf2ee2..22b1a2b30 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -605,8 +605,10 @@ ReferenceRelationCount(RelationRestrictionContext *restrictionContext) { RelationRestriction *relationRestriction = (RelationRestriction *) lfirst(relationRestrictionCell); + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry( + relationRestriction->relationId); - if (PartitionMethod(relationRestriction->relationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE)) { referenceRelationCount++; } @@ -657,8 +659,9 @@ EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList, (RelationRestriction *) lfirst(relationRestrictionCell); int rteIdentity = GetRTEIdentity(relationRestriction->rte); - /* we shouldn't check for the equality of reference tables */ - if (PartitionMethod(relationRestriction->relationId) == DISTRIBUTE_BY_NONE) + /* we shouldn't check for the equality of non-distributed tables */ + if (IsCitusTableType(relationRestriction->relationId, + CITUS_TABLE_WITH_NO_DIST_KEY)) { continue; } @@ -1721,7 +1724,7 @@ AllRelationsInRestrictionContextColocated(RelationRestrictionContext *restrictio { Oid relationId = relationRestriction->relationId; - if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { continue; } diff --git a/src/backend/distributed/planner/shard_pruning.c b/src/backend/distributed/planner/shard_pruning.c index ece01419f..1eebda1dc 100644 --- a/src/backend/distributed/planner/shard_pruning.c +++ b/src/backend/distributed/planner/shard_pruning.c @@ -306,8 +306,8 @@ static int ConstraintCount(PruningTreeNode *node); * PruneShards returns all shards from a distributed table that cannot be * proven to be eliminated by whereClauseList. * - * For reference tables, the function simply returns the single shard that the - * table has. + * For non-distributed tables such as reference table, the function + * simply returns the single shard that the table has. * * When there is a single = filter in the where * clause list, the constant is written to the partitionValueConst pointer. @@ -338,8 +338,8 @@ PruneShards(Oid relationId, Index rangeTableId, List *whereClauseList, return NIL; } - /* short circuit for reference tables */ - if (partitionMethod == DISTRIBUTE_BY_NONE) + /* short circuit for non-distributed tables such as reference table */ + if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { prunedList = ShardArrayToList(cacheEntry->sortedShardIntervalArray, cacheEntry->shardIntervalArrayLength); diff --git a/src/backend/distributed/test/distributed_intermediate_results.c b/src/backend/distributed/test/distributed_intermediate_results.c index 120cea563..5c450d88b 100644 --- a/src/backend/distributed/test/distributed_intermediate_results.c +++ b/src/backend/distributed/test/distributed_intermediate_results.c @@ -74,7 +74,7 @@ partition_task_list_results(PG_FUNCTION_ARGS) */ int partitionColumnIndex = 0; - if (targetRelation->partitionMethod != DISTRIBUTE_BY_NONE && IsA( + if (IsCitusTableTypeCacheEntry(targetRelation, DISTRIBUTED_TABLE) && IsA( targetRelation->partitionColumn, Var)) { partitionColumnIndex = targetRelation->partitionColumn->varattno - 1; @@ -146,7 +146,8 @@ redistribute_task_list_results(PG_FUNCTION_ARGS) * Here SELECT query's target list should match column list of target relation, * so their partition column indexes are equal. */ - int partitionColumnIndex = targetRelation->partitionMethod != DISTRIBUTE_BY_NONE ? + int partitionColumnIndex = IsCitusTableTypeCacheEntry(targetRelation, + DISTRIBUTED_TABLE) ? targetRelation->partitionColumn->varattno - 1 : 0; List **shardResultIds = RedistributeTaskListResults(resultIdPrefix, taskList, diff --git a/src/backend/distributed/transaction/relation_access_tracking.c b/src/backend/distributed/transaction/relation_access_tracking.c index b5dafa135..a3b56ecca 100644 --- a/src/backend/distributed/transaction/relation_access_tracking.c +++ b/src/backend/distributed/transaction/relation_access_tracking.c @@ -173,7 +173,7 @@ RecordRelationAccessIfReferenceTable(Oid relationId, ShardPlacementAccessType ac * recursively calling RecordRelationAccessBase(), so becareful about * removing this check. */ - if (PartitionMethod(relationId) != DISTRIBUTE_BY_NONE) + if (!IsCitusTableType(relationId, REFERENCE_TABLE)) { return; } @@ -697,7 +697,7 @@ CheckConflictingRelationAccesses(Oid relationId, ShardPlacementAccessType access CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); - if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE && + if (!(IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE) && cacheEntry->referencingRelationsViaForeignKey != NIL)) { return; @@ -817,7 +817,7 @@ CheckConflictingParallelRelationAccesses(Oid relationId, ShardPlacementAccessTyp } CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); - if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH && + if (!(IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) && cacheEntry->referencedRelationsViaForeignKey != NIL)) { return; @@ -893,7 +893,7 @@ HoldsConflictingLockWithReferencedRelations(Oid relationId, ShardPlacementAccess foreach_oid(referencedRelation, cacheEntry->referencedRelationsViaForeignKey) { /* we're only interested in foreign keys to reference tables */ - if (PartitionMethod(referencedRelation) != DISTRIBUTE_BY_NONE) + if (!IsCitusTableType(referencedRelation, REFERENCE_TABLE)) { continue; } @@ -955,7 +955,7 @@ HoldsConflictingLockWithReferencingRelations(Oid relationId, ShardPlacementAcces CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); bool holdsConflictingLocks = false; - Assert(PartitionMethod(relationId) == DISTRIBUTE_BY_NONE); + Assert(IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE)); Oid referencingRelation = InvalidOid; foreach_oid(referencingRelation, cacheEntry->referencingRelationsViaForeignKey) @@ -964,8 +964,7 @@ HoldsConflictingLockWithReferencingRelations(Oid relationId, ShardPlacementAcces * We're only interested in foreign keys to reference tables from * hash distributed tables. */ - if (!IsCitusTable(referencingRelation) || - PartitionMethod(referencingRelation) != DISTRIBUTE_BY_HASH) + if (!IsCitusTableType(referencingRelation, HASH_DISTRIBUTED)) { continue; } diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index dcaa70f9d..09660f0f3 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -431,11 +431,12 @@ ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShard return false; } - if (leftIntervalPartitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableType(leftShardInterval->relationId, HASH_DISTRIBUTED)) { return HashPartitionedShardIntervalsEqual(leftShardInterval, rightShardInterval); } - else if (leftIntervalPartitionMethod == DISTRIBUTE_BY_NONE) + else if (IsCitusTableType(leftShardInterval->relationId, + CITUS_TABLE_WITH_NO_DIST_KEY)) { /* * Reference tables has only a single shard and all reference tables @@ -921,14 +922,13 @@ ColocatedShardIntervalList(ShardInterval *shardInterval) List *colocatedShardList = NIL; CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId); - char partitionMethod = cacheEntry->partitionMethod; /* - * If distribution type of the table is not hash or reference, each shard of + * If distribution type of the table is append or range, each shard of * the shard is only co-located with itself. */ - if ((partitionMethod == DISTRIBUTE_BY_APPEND) || - (partitionMethod == DISTRIBUTE_BY_RANGE)) + if (IsCitusTableTypeCacheEntry(cacheEntry, APPEND_DISTRIBUTED) || + IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED)) { ShardInterval *copyShardInterval = CopyShardInterval(shardInterval); diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 71eae9696..e576066ff 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -56,29 +56,6 @@ static bool AnyRelationsModifiedInTransaction(List *relationIdList); PG_FUNCTION_INFO_V1(upgrade_to_reference_table); PG_FUNCTION_INFO_V1(replicate_reference_tables); - -/* - * IsReferenceTable returns whether the given relation ID identifies a reference - * table. - */ -bool -IsReferenceTable(Oid relationId) -{ - if (!IsCitusTable(relationId)) - { - return false; - } - CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId); - - if (tableEntry->partitionMethod != DISTRIBUTE_BY_NONE) - { - return false; - } - - return true; -} - - /* * replicate_reference_tables is a UDF to ensure that allreference tables are * replicated to all nodes. @@ -350,7 +327,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId); - if (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(tableEntry, REFERENCE_TABLE)) { char *relationName = get_rel_name(relationId); ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 08d07f257..74f0db25b 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -395,7 +395,7 @@ SetLocktagForShardDistributionMetadata(int64 shardId, LOCKTAG *tag) uint32 colocationId = citusTable->colocationId; if (colocationId == INVALID_COLOCATION_ID || - citusTable->partitionMethod != DISTRIBUTE_BY_HASH) + !IsCitusTableTypeCacheEntry(citusTable, HASH_DISTRIBUTED)) { SET_LOCKTAG_SHARD_METADATA_RESOURCE(*tag, MyDatabaseId, shardId); } @@ -493,7 +493,7 @@ GetSortedReferenceShardIntervals(List *relationList) Oid relationId = InvalidOid; foreach_oid(relationId, relationList) { - if (PartitionMethod(relationId) != DISTRIBUTE_BY_NONE) + if (!IsCitusTableType(relationId, REFERENCE_TABLE)) { continue; } diff --git a/src/backend/distributed/utils/shard_utils.c b/src/backend/distributed/utils/shard_utils.c index 1bfc8940a..2de992c26 100644 --- a/src/backend/distributed/utils/shard_utils.c +++ b/src/backend/distributed/utils/shard_utils.c @@ -46,10 +46,10 @@ GetTableLocalShardOid(Oid citusTableOid, uint64 shardId) Oid GetReferenceTableLocalShardOid(Oid referenceTableOid) { - const CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(referenceTableOid); + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(referenceTableOid); /* given OID should belong to a valid reference table */ - Assert(cacheEntry != NULL && cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE); + Assert(cacheEntry != NULL && IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE)); const ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[0]; uint64 referenceTableShardId = shardInterval->shardId; diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c index af463644a..675c512f3 100644 --- a/src/backend/distributed/utils/shardinterval_utils.c +++ b/src/backend/distributed/utils/shardinterval_utils.c @@ -240,13 +240,14 @@ ShardIndex(ShardInterval *shardInterval) Datum shardMinValue = shardInterval->minValue; CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId); - char partitionMethod = cacheEntry->partitionMethod; /* * Note that, we can also support append and range distributed tables, but * currently it is not required. */ - if (partitionMethod != DISTRIBUTE_BY_HASH && partitionMethod != DISTRIBUTE_BY_NONE) + if (!IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) && + !IsCitusTableTypeCacheEntry( + cacheEntry, REFERENCE_TABLE)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("finding index of a given shard is only supported for " @@ -254,7 +255,7 @@ ShardIndex(ShardInterval *shardInterval) } /* short-circuit for reference tables */ - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { /* reference tables has only a single shard, so the index is fixed to 0 */ shardIndex = 0; @@ -279,7 +280,7 @@ FindShardInterval(Datum partitionColumnValue, CitusTableCacheEntry *cacheEntry) { Datum searchedValue = partitionColumnValue; - if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED)) { searchedValue = FunctionCall1Coll(cacheEntry->hashFunction, cacheEntry->partitionColumn->varcollid, @@ -314,9 +315,8 @@ FindShardIntervalIndex(Datum searchedValue, CitusTableCacheEntry *cacheEntry) { ShardInterval **shardIntervalCache = cacheEntry->sortedShardIntervalArray; int shardCount = cacheEntry->shardIntervalArrayLength; - char partitionMethod = cacheEntry->partitionMethod; FmgrInfo *compareFunction = cacheEntry->shardIntervalCompareFunction; - bool useBinarySearch = (partitionMethod != DISTRIBUTE_BY_HASH || + bool useBinarySearch = (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) || !cacheEntry->hasUniformHashDistribution); int shardIndex = INVALID_SHARD_INDEX; @@ -325,7 +325,7 @@ FindShardIntervalIndex(Datum searchedValue, CitusTableCacheEntry *cacheEntry) return INVALID_SHARD_INDEX; } - if (partitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED)) { if (useBinarySearch) { @@ -352,9 +352,9 @@ FindShardIntervalIndex(Datum searchedValue, CitusTableCacheEntry *cacheEntry) shardIndex = CalculateUniformHashRangeIndex(hashedValue, shardCount); } } - else if (partitionMethod == DISTRIBUTE_BY_NONE) + else if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { - /* reference tables has a single shard, all values mapped to that shard */ + /* non-distributed tables have a single shard, all values mapped to that shard */ Assert(shardCount == 1); shardIndex = 0; @@ -490,7 +490,7 @@ SingleReplicatedTable(Oid relationId) } /* for hash distributed tables, it is sufficient to only check one shard */ - if (PartitionMethod(relationId) == DISTRIBUTE_BY_HASH) + if (IsCitusTableType(relationId, HASH_DISTRIBUTED)) { /* checking only for the first shard id should suffice */ uint64 shardId = *(uint64 *) linitial(shardList); diff --git a/src/backend/distributed/utils/statistics_collection.c b/src/backend/distributed/utils/statistics_collection.c index f5ce2faff..16df47da0 100644 --- a/src/backend/distributed/utils/statistics_collection.c +++ b/src/backend/distributed/utils/statistics_collection.c @@ -194,7 +194,7 @@ DistributedTablesSize(List *distTableOids) * Ignore hash partitioned tables with size greater than 1, since * citus_table_size() doesn't work on them. */ - if (PartitionMethod(relationId) == DISTRIBUTE_BY_HASH && + if (IsCitusTableType(relationId, HASH_DISTRIBUTED) && !SingleReplicatedTable(relationId)) { table_close(relation, AccessShareLock); diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index b385e8244..8db79a6ee 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -196,7 +196,6 @@ extern PlannedStmt * distributed_planner(Query *parse, extern List * ExtractRangeTableEntryList(Query *query); -extern List * ExtractReferenceTableRTEList(List *rteList); extern bool NeedsDistributedPlanning(Query *query); extern struct DistributedPlan * GetDistributedPlan(CustomScan *node); extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 341736774..a6b080315 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -116,6 +116,25 @@ typedef struct DistObjectCacheEntry int colocationId; } DistObjectCacheEntry; +typedef enum +{ + HASH_DISTRIBUTED, + APPEND_DISTRIBUTED, + RANGE_DISTRIBUTED, + + /* hash, range or append distributed table */ + DISTRIBUTED_TABLE, + + REFERENCE_TABLE, + CITUS_LOCAL_TABLE, + + /* table without a dist key such as reference table */ + CITUS_TABLE_WITH_NO_DIST_KEY +} CitusTableType; + +extern bool IsCitusTableType(Oid relationId, CitusTableType tableType); +extern bool IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEtnry, CitusTableType + tableType); extern bool IsCitusTable(Oid relationId); extern List * CitusTableList(void); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 9c5019af4..c5cf04f21 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -150,7 +150,6 @@ extern void EnsureTablePermissions(Oid relationId, AclMode mode); extern void EnsureTableOwner(Oid relationId); extern void EnsureSchemaOwner(Oid schemaId); extern void EnsureHashDistributedTable(Oid relationId); -extern bool IsHashDistributedTable(Oid relationId); extern void EnsureSequenceOwner(Oid sequenceOid); extern void EnsureFunctionOwner(Oid functionId); extern void EnsureSuperUser(void); diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index 0fac3c41d..0ef44c601 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -16,7 +16,8 @@ #include "listutils.h" -extern bool IsReferenceTable(Oid relationId); +#include "distributed/metadata_cache.h" + extern void EnsureReferenceTablesExistOnAllNodes(void); extern uint32 CreateReferenceTableColocationId(void); extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId);