diff --git a/src/backend/distributed/commands/call.c b/src/backend/distributed/commands/call.c index ef7fd887d..0cd621f68 100644 --- a/src/backend/distributed/commands/call.c +++ b/src/backend/distributed/commands/call.c @@ -28,6 +28,7 @@ #include "distributed/adaptive_executor.h" #include "distributed/reference_table_utils.h" #include "distributed/remote_commands.h" +#include "distributed/reference_table_utils.h" #include "distributed/shard_pruning.h" #include "distributed/tuple_destination.h" #include "distributed/version_compat.h" @@ -92,17 +93,16 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure, "be constant expressions"))); return false; } - CitusTableCacheEntry *distTable = GetCitusTableCacheEntry(colocatedRelationId); Var *partitionColumn = distTable->partitionColumn; bool colocatedWithReferenceTable = false; - if (partitionColumn == NULL) + if (IsCitusTableTypeCacheEntry(distTable, REFERENCE_TABLE)) { /* This can happen if colocated with a reference table. Punt for now. */ ereport(DEBUG1, (errmsg( "will push down CALL for reference tables"))); colocatedWithReferenceTable = true; - Assert(IsReferenceTable(colocatedRelationId)); + Assert(IsCitusTableType(colocatedRelationId, REFERENCE_TABLE)); } ShardPlacement *placement = NULL; diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 0ab40a844..9b4fee684 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -841,11 +841,10 @@ EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel, Oid distributionColumnType, Oid sourceRelationId) { CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId); - char sourceDistributionMethod = sourceTableEntry->partitionMethod; char sourceReplicationModel = sourceTableEntry->replicationModel; Var *sourceDistributionColumn = DistPartitionKeyOrError(sourceRelationId); - if (sourceDistributionMethod != DISTRIBUTE_BY_HASH) + if (!IsCitusTableTypeCacheEntry(sourceTableEntry, HASH_DISTRIBUTED)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot distribute relation"), diff --git a/src/backend/distributed/commands/foreign_constraint.c b/src/backend/distributed/commands/foreign_constraint.c index b7aea3576..dd28a03a4 100644 --- a/src/backend/distributed/commands/foreign_constraint.c +++ b/src/backend/distributed/commands/foreign_constraint.c @@ -138,7 +138,6 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis int referencingAttrIndex = -1; - char referencedDistMethod = 0; Var *referencedDistKey = NULL; int referencedAttrIndex = -1; uint32 referencedColocationId = INVALID_COLOCATION_ID; @@ -165,11 +164,12 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis } /* set referenced table related variables here if table is referencing itself */ - + char referencedDistMethod = 0; if (!selfReferencingTable) { referencedDistMethod = PartitionMethod(referencedTableId); - referencedDistKey = (referencedDistMethod == DISTRIBUTE_BY_NONE) ? + referencedDistKey = IsCitusTableType(referencedTableId, + CITUS_TABLE_WITH_NO_DIST_KEY) ? NULL : DistPartitionKey(referencedTableId); referencedColocationId = TableColocationId(referencedTableId); @@ -428,7 +428,7 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId) * any foreign constraint from a distributed table to a local table. */ Assert(IsCitusTable(referencedTableId)); - if (PartitionMethod(referencedTableId) != DISTRIBUTE_BY_NONE) + if (!IsCitusTableType(referencedTableId, REFERENCE_TABLE)) { heapTuple = systable_getnext(scanDescriptor); continue; @@ -559,7 +559,7 @@ GetForeignKeyOidsToReferenceTables(Oid relationId) Oid referencedTableOid = constraintForm->confrelid; - if (IsReferenceTable(referencedTableOid)) + if (IsCitusTableType(referencedTableOid, REFERENCE_TABLE)) { fkeyOidsToReferenceTables = lappend_oid(fkeyOidsToReferenceTables, foreignKeyOid); diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 6571cbb07..62b7cc880 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -167,7 +167,8 @@ create_distributed_function(PG_FUNCTION_ARGS) if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0) { Oid colocationRelationId = ResolveRelationId(colocateWithText, false); - colocatedWithReferenceTable = IsReferenceTable(colocationRelationId); + colocatedWithReferenceTable = IsCitusTableType(colocationRelationId, + REFERENCE_TABLE); } } @@ -499,11 +500,10 @@ EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnTyp Oid sourceRelationId) { CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId); - char sourceDistributionMethod = sourceTableEntry->partitionMethod; char sourceReplicationModel = sourceTableEntry->replicationModel; - if (sourceDistributionMethod != DISTRIBUTE_BY_HASH && - sourceDistributionMethod != DISTRIBUTE_BY_NONE) + if (!IsCitusTableTypeCacheEntry(sourceTableEntry, HASH_DISTRIBUTED) && + !IsCitusTableTypeCacheEntry(sourceTableEntry, REFERENCE_TABLE)) { char *functionName = get_func_name(functionOid); char *sourceRelationName = get_rel_name(sourceRelationId); @@ -515,7 +515,7 @@ EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnTyp functionName, sourceRelationName))); } - if (sourceDistributionMethod == DISTRIBUTE_BY_NONE && + if (IsCitusTableTypeCacheEntry(sourceTableEntry, REFERENCE_TABLE) && distributionColumnType != InvalidOid) { char *functionName = get_func_name(functionOid); diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index 80d7df499..b6a75dd45 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -800,19 +800,18 @@ ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement) /* caller uses ShareLock for non-concurrent indexes, use the same lock here */ LOCKMODE lockMode = ShareLock; Oid relationId = RangeVarGetRelid(relation, lockMode, missingOk); - char partitionMethod = PartitionMethod(relationId); bool indexContainsPartitionColumn = false; /* - * Reference tables do not have partition key, and unique constraints - * are allowed for them. Thus, we added a short-circuit for reference tables. + * Non-distributed tables do not have partition key, and unique constraints + * are allowed for them. Thus, we added a short-circuit for non-distributed tables. */ - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { return; } - if (partitionMethod == DISTRIBUTE_BY_APPEND) + if (IsCitusTableType(relationId, APPEND_DISTRIBUTED)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("creating unique indexes on append-partitioned tables " diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 473303cc9..2b5804a25 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -363,17 +363,18 @@ CitusCopyFrom(CopyStmt *copyStatement, QueryCompletionCompat *completionTag) } Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); - char partitionMethod = PartitionMethod(relationId); + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); /* disallow modifications to a partition table which have rep. factor > 1 */ EnsurePartitionTableNotReplicated(relationId); - if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE || - partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) || + IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED) || + IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { CopyToExistingShards(copyStatement, completionTag); } - else if (partitionMethod == DISTRIBUTE_BY_APPEND) + else if (IsCitusTableTypeCacheEntry(cacheEntry, APPEND_DISTRIBUTED)) { CopyToNewShards(copyStatement, completionTag, relationId); } @@ -409,7 +410,6 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT MemoryContext executorTupleContext = NULL; ExprContext *executorExpressionContext = NULL; - char partitionMethod = 0; bool stopOnFailure = false; CopyState copyState = NULL; @@ -460,8 +460,7 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT executorTupleContext = GetPerTupleMemoryContext(executorState); executorExpressionContext = GetPerTupleExprContext(executorState); - partitionMethod = PartitionMethod(tableId); - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(tableId, REFERENCE_TABLE)) { stopOnFailure = true; } @@ -2150,16 +2149,12 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, ListCell *columnNameCell = NULL; - char partitionMethod = '\0'; - - const char *delimiterCharacter = "\t"; const char *nullPrintCharacter = "\\N"; /* look up table properties */ Relation distributedRelation = table_open(tableId, RowExclusiveLock); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(tableId); - partitionMethod = cacheEntry->partitionMethod; copyDest->distributedRelation = distributedRelation; copyDest->tupleDescriptor = inputTupleDescriptor; @@ -2168,7 +2163,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, List *shardIntervalList = LoadShardIntervalList(tableId); if (shardIntervalList == NIL) { - if (partitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED)) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("could not find any shards into which to copy"), @@ -2187,7 +2182,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, } /* error if any shard missing min/max values */ - if (partitionMethod != DISTRIBUTE_BY_NONE && + if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && cacheEntry->hasUninitializedShardInterval) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), @@ -2248,7 +2243,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, attributeList = lappend(attributeList, columnNameValue); } - if (partitionMethod != DISTRIBUTE_BY_NONE && + if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && copyDest->partitionColumnIndex == INVALID_PARTITION_COLUMN_INDEX) { ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index c8d772ae0..fdce4fff0 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -93,7 +93,7 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString) continue; } - if (IsReferenceTable(relationId)) + if (IsCitusTableType(relationId, REFERENCE_TABLE)) { /* prevent concurrent EnsureReferenceTablesExistOnAllNodes */ int colocationId = CreateReferenceTableColocationId(); diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index c307621c5..36ab6835a 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -86,7 +86,7 @@ PreprocessDropTableStmt(Node *node, const char *queryString) continue; } - if (IsReferenceTable(relationId)) + if (IsCitusTableType(relationId, REFERENCE_TABLE)) { /* prevent concurrent EnsureReferenceTablesExistOnAllNodes */ int colocationId = CreateReferenceTableColocationId(); @@ -1230,8 +1230,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command) { Oid rightRelationId = RangeVarGetRelid(constraint->pktable, NoLock, false); - if (IsCitusTable(rightRelationId) && - PartitionMethod(rightRelationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(rightRelationId, REFERENCE_TABLE)) { executeSequentially = true; } @@ -1266,8 +1265,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command) { Oid rightRelationId = RangeVarGetRelid(constraint->pktable, NoLock, false); - if (IsCitusTable(rightRelationId) && - PartitionMethod(rightRelationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(rightRelationId, REFERENCE_TABLE)) { executeSequentially = true; } @@ -1288,8 +1286,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command) * the distributed tables, thus contradicting our purpose of using * sequential mode. */ - if (executeSequentially && IsCitusTable(relationId) && - PartitionMethod(relationId) != DISTRIBUTE_BY_NONE && + if (executeSequentially && !IsCitusTableType(relationId, REFERENCE_TABLE) && ParallelQueryExecutedInTransaction()) { char *relationName = get_rel_name(relationId); @@ -1331,7 +1328,6 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId, char *leftSchemaName = get_namespace_name(leftSchemaId); char *escapedLeftSchemaName = quote_literal_cstr(leftSchemaName); - char rightPartitionMethod = PartitionMethod(rightRelationId); List *rightShardList = LoadShardIntervalList(rightRelationId); ListCell *rightShardCell = NULL; Oid rightSchemaId = get_rel_namespace(rightRelationId); @@ -1348,7 +1344,7 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId, * since we only have one placement per worker. This hack is first implemented * for foreign constraint support from distributed tables to reference tables. */ - if (rightPartitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(rightRelationId, REFERENCE_TABLE)) { int rightShardCount = list_length(rightShardList); int leftShardCount = list_length(leftShardList); diff --git a/src/backend/distributed/commands/truncate.c b/src/backend/distributed/commands/truncate.c index a3f75520f..0bece1853 100644 --- a/src/backend/distributed/commands/truncate.c +++ b/src/backend/distributed/commands/truncate.c @@ -74,14 +74,13 @@ citus_truncate_trigger(PG_FUNCTION_ARGS) TriggerData *triggerData = (TriggerData *) fcinfo->context; Relation truncatedRelation = triggerData->tg_relation; Oid relationId = RelationGetRelid(truncatedRelation); - char partitionMethod = PartitionMethod(relationId); if (!EnableDDLPropagation) { PG_RETURN_DATUM(PointerGetDatum(NULL)); } - if (partitionMethod == DISTRIBUTE_BY_APPEND) + if (IsCitusTableType(relationId, APPEND_DISTRIBUTED)) { Oid schemaId = get_rel_namespace(relationId); char *schemaName = get_namespace_name(schemaId); @@ -317,8 +316,7 @@ ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command) { Oid relationId = RangeVarGetRelid(rangeVar, NoLock, failOK); - if (IsCitusTable(relationId) && - PartitionMethod(relationId) == DISTRIBUTE_BY_NONE && + if (IsCitusTableType(relationId, REFERENCE_TABLE) && TableReferenced(relationId)) { char *relationName = get_rel_name(relationId); diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index f9fa78d40..6ac1684a5 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -779,7 +779,7 @@ AdaptiveExecutor(CitusScanState *scanState) executorState->es_processed = execution->rowsProcessed; } else if (distributedPlan->targetRelationId != InvalidOid && - PartitionMethod(distributedPlan->targetRelationId) != DISTRIBUTE_BY_NONE) + !IsCitusTableType(distributedPlan->targetRelationId, REFERENCE_TABLE)) { /* * For reference tables we already add rowsProcessed on the local execution, @@ -1536,7 +1536,7 @@ SelectForUpdateOnReferenceTable(List *taskList) { Oid relationId = relationRowLock->relationId; - if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, REFERENCE_TABLE)) { return true; } diff --git a/src/backend/distributed/executor/distributed_execution_locks.c b/src/backend/distributed/executor/distributed_execution_locks.c index 74af023a5..bf581f26a 100644 --- a/src/backend/distributed/executor/distributed_execution_locks.c +++ b/src/backend/distributed/executor/distributed_execution_locks.c @@ -385,7 +385,7 @@ AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList) LockClauseStrength rowLockStrength = relationRowLock->rowLockStrength; Oid relationId = relationRowLock->relationId; - if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, REFERENCE_TABLE)) { List *shardIntervalList = LoadShardIntervalList(relationId); diff --git a/src/backend/distributed/executor/distributed_intermediate_results.c b/src/backend/distributed/executor/distributed_intermediate_results.c index ec643d5b2..699a556a0 100644 --- a/src/backend/distributed/executor/distributed_intermediate_results.c +++ b/src/backend/distributed/executor/distributed_intermediate_results.c @@ -172,8 +172,8 @@ PartitionTasklistResults(const char *resultIdPrefix, List *selectTaskList, CitusTableCacheEntry *targetRelation, bool binaryFormat) { - if (targetRelation->partitionMethod != DISTRIBUTE_BY_HASH && - targetRelation->partitionMethod != DISTRIBUTE_BY_RANGE) + if (!IsCitusTableTypeCacheEntry(targetRelation, HASH_DISTRIBUTED) && + !IsCitusTableTypeCacheEntry(targetRelation, RANGE_DISTRIBUTED)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("repartitioning results of a tasklist is only supported " diff --git a/src/backend/distributed/executor/insert_select_executor.c b/src/backend/distributed/executor/insert_select_executor.c index dba7755d8..c6fd32ea4 100644 --- a/src/backend/distributed/executor/insert_select_executor.c +++ b/src/backend/distributed/executor/insert_select_executor.c @@ -493,8 +493,7 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId, ParamListInfo paramListInfo = executorState->es_param_list_info; bool stopOnFailure = false; - char partitionMethod = PartitionMethod(targetRelationId); - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(targetRelationId, REFERENCE_TABLE)) { stopOnFailure = true; } @@ -535,8 +534,7 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList, ParamListInfo paramListInfo = executorState->es_param_list_info; bool stopOnFailure = false; - char partitionMethod = PartitionMethod(targetRelationId); - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(targetRelationId, REFERENCE_TABLE)) { stopOnFailure = true; } @@ -620,9 +618,8 @@ IsSupportedRedistributionTarget(Oid targetRelationId) { CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(targetRelationId); - /* only range and hash-distributed tables are currently supported */ - if (tableEntry->partitionMethod != DISTRIBUTE_BY_HASH && - tableEntry->partitionMethod != DISTRIBUTE_BY_RANGE) + if (!IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED) && + !IsCitusTableTypeCacheEntry(tableEntry, RANGE_DISTRIBUTED)) { return false; } diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 921b3ed7c..93c9b0dd3 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -259,6 +259,8 @@ static void InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSl static void InvalidateDistTableCache(void); static void InvalidateDistObjectCache(void); static void InitializeTableCacheEntry(int64 shardId); +static bool IsCitusTableTypeInternal(CitusTableCacheEntry *tableEntry, CitusTableType + tableType); static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry); @@ -296,6 +298,87 @@ EnsureModificationsCanRun(void) } +/* + * IsCitusTableType returns true if the given table with relationId + * belongs to a citus table that matches the given table type. If cache + * entry already exists, prefer using IsCitusTableTypeCacheEntry to avoid + * an extra lookup. + */ +bool +IsCitusTableType(Oid relationId, CitusTableType tableType) +{ + CitusTableCacheEntry *tableEntry = LookupCitusTableCacheEntry(relationId); + + /* we are not interested in postgres tables */ + if (tableEntry == NULL) + { + return false; + } + return IsCitusTableTypeInternal(tableEntry, tableType); +} + + +/* + * IsCitusTableTypeCacheEntry returns true if the given table cache entry + * belongs to a citus table that matches the given table type. + */ +bool +IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tableType) +{ + return IsCitusTableTypeInternal(tableEntry, tableType); +} + + +/* + * IsCitusTableTypeInternal returns true if the given table entry belongs to + * the given table type group. For definition of table types, see CitusTableType. + */ +static bool +IsCitusTableTypeInternal(CitusTableCacheEntry *tableEntry, CitusTableType tableType) +{ + switch (tableType) + { + case HASH_DISTRIBUTED: + { + return tableEntry->partitionMethod == DISTRIBUTE_BY_HASH; + } + + case APPEND_DISTRIBUTED: + { + return tableEntry->partitionMethod == DISTRIBUTE_BY_APPEND; + } + + case RANGE_DISTRIBUTED: + { + return tableEntry->partitionMethod == DISTRIBUTE_BY_RANGE; + } + + case DISTRIBUTED_TABLE: + { + return tableEntry->partitionMethod == DISTRIBUTE_BY_HASH || + tableEntry->partitionMethod == DISTRIBUTE_BY_RANGE || + tableEntry->partitionMethod == DISTRIBUTE_BY_APPEND; + } + + case REFERENCE_TABLE: + { + return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE; + } + + case CITUS_TABLE_WITH_NO_DIST_KEY: + { + return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE; + } + + default: + { + ereport(ERROR, (errmsg("Unknown table type %d", tableType))); + } + } + return false; +} + + /* * IsCitusTable returns whether relationId is a distributed relation or * not. @@ -416,9 +499,7 @@ ReferenceTableShardId(uint64 shardId) { ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId); CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry; - char partitionMethod = tableEntry->partitionMethod; - - return partitionMethod == DISTRIBUTE_BY_NONE; + return IsCitusTableTypeCacheEntry(tableEntry, REFERENCE_TABLE); } diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 86a9666a3..cbb33ac05 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -224,14 +224,12 @@ ShouldSyncTableMetadata(Oid relationId) { CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId); - bool hashDistributed = (tableEntry->partitionMethod == DISTRIBUTE_BY_HASH); bool streamingReplicated = (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING); - bool mxTable = (streamingReplicated && hashDistributed); - bool referenceTable = (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE); - - if (mxTable || referenceTable) + bool mxTable = (streamingReplicated && IsCitusTableTypeCacheEntry(tableEntry, + HASH_DISTRIBUTED)); + if (mxTable || IsCitusTableTypeCacheEntry(tableEntry, REFERENCE_TABLE)) { return true; } @@ -631,7 +629,7 @@ DistributionCreateCommand(CitusTableCacheEntry *cacheEntry) char replicationModel = cacheEntry->replicationModel; StringInfo tablePartitionKeyString = makeStringInfo(); - if (distributionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { appendStringInfo(tablePartitionKeyString, "NULL"); } diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index e2ba1cbcb..198da2d77 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -368,7 +368,7 @@ ErrorIfNotSuitableToGetSize(Oid relationId) "distributed", escapedQueryString))); } - if (PartitionMethod(relationId) == DISTRIBUTE_BY_HASH && + if (IsCitusTableType(relationId, HASH_DISTRIBUTED) && !SingleReplicatedTable(relationId)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -1401,7 +1401,7 @@ EnsureFunctionOwner(Oid functionId) void EnsureHashDistributedTable(Oid relationId) { - if (!IsHashDistributedTable(relationId)) + if (!IsCitusTableType(relationId, HASH_DISTRIBUTED)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("relation %s should be a " @@ -1410,23 +1410,6 @@ EnsureHashDistributedTable(Oid relationId) } -/* - * IsHashDistributedTable returns true if the given relation is - * a distributed table. - */ -bool -IsHashDistributedTable(Oid relationId) -{ - if (!IsCitusTable(relationId)) - { - return false; - } - CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(relationId); - char sourceDistributionMethod = sourceTableEntry->partitionMethod; - return sourceDistributionMethod == DISTRIBUTE_BY_HASH; -} - - /* * EnsureSuperUser check that the current user is a superuser and errors out if not. */ diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 5d733bfeb..72ea11d18 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -870,8 +870,7 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) errmsg("relation is not distributed"))); } - char distributionMethod = PartitionMethod(relationId); - if (distributionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { List *shardIntervalList = LoadShardIntervalList(relationId); if (shardIntervalList == NIL) @@ -881,8 +880,8 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS) shardInterval = (ShardInterval *) linitial(shardIntervalList); } - else if (distributionMethod == DISTRIBUTE_BY_HASH || - distributionMethod == DISTRIBUTE_BY_RANGE) + else if (IsCitusTableType(relationId, HASH_DISTRIBUTED) || + IsCitusTableType(relationId, RANGE_DISTRIBUTED)) { CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); diff --git a/src/backend/distributed/operations/delete_protocol.c b/src/backend/distributed/operations/delete_protocol.c index 48bf51222..025555ebf 100644 --- a/src/backend/distributed/operations/delete_protocol.c +++ b/src/backend/distributed/operations/delete_protocol.c @@ -163,8 +163,7 @@ master_apply_delete_command(PG_FUNCTION_ARGS) Node *whereClause = (Node *) deleteQuery->jointree->quals; Node *deleteCriteria = eval_const_expressions(NULL, whereClause); - char partitionMethod = PartitionMethod(relationId); - if (partitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableType(relationId, HASH_DISTRIBUTED)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot delete from hash distributed table with this " @@ -173,7 +172,7 @@ master_apply_delete_command(PG_FUNCTION_ARGS) "are not supported with master_apply_delete_command."), errhint("Use the DELETE command instead."))); } - else if (partitionMethod == DISTRIBUTE_BY_NONE) + else if (IsCitusTableType(relationId, REFERENCE_TABLE)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot delete from reference table"), diff --git a/src/backend/distributed/operations/repair_shards.c b/src/backend/distributed/operations/repair_shards.c index 517e78f17..30f9b65c8 100644 --- a/src/backend/distributed/operations/repair_shards.c +++ b/src/backend/distributed/operations/repair_shards.c @@ -410,7 +410,7 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName, targetNodeName, targetNodePort); } - if (!IsReferenceTable(distributedTableId)) + if (!IsCitusTableType(distributedTableId, REFERENCE_TABLE)) { /* * When copying a shard to a new node, we should first ensure that reference @@ -492,7 +492,7 @@ EnsureTableListSuitableForReplication(List *tableIdList) GetReferencingForeignConstaintCommands(tableId); if (foreignConstraintCommandList != NIL && - PartitionMethod(tableId) != DISTRIBUTE_BY_NONE) + IsCitusTableType(tableId, DISTRIBUTED_TABLE)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot create foreign key constraint"), @@ -850,7 +850,7 @@ CopyShardForeignConstraintCommandListGrouped(ShardInterval *shardInterval, char *referencedSchemaName = get_namespace_name(referencedSchemaId); char *escapedReferencedSchemaName = quote_literal_cstr(referencedSchemaName); - if (PartitionMethod(referencedRelationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(referencedRelationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { referencedShardId = GetFirstShardId(referencedRelationId); } diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index e1d33ecd8..0d6851411 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -136,15 +136,14 @@ master_create_empty_shard(PG_FUNCTION_ARGS) } } - char partitionMethod = PartitionMethod(relationId); - if (partitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableType(relationId, HASH_DISTRIBUTED)) { ereport(ERROR, (errmsg("relation \"%s\" is a hash partitioned table", relationName), errdetail("We currently don't support creating shards " "on hash-partitioned tables"))); } - else if (partitionMethod == DISTRIBUTE_BY_NONE) + else if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { ereport(ERROR, (errmsg("relation \"%s\" is a reference table", relationName), @@ -253,8 +252,8 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) errdetail("The underlying shard is not a regular table"))); } - char partitionMethod = PartitionMethod(relationId); - if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, HASH_DISTRIBUTED) || IsCitusTableType(relationId, + CITUS_TABLE_WITH_NO_DIST_KEY)) { ereport(ERROR, (errmsg("cannot append to shardId " UINT64_FORMAT, shardId), errdetail("We currently don't support appending to shards " @@ -586,7 +585,7 @@ RelationShardListForShardCreate(ShardInterval *shardInterval) relationShard->shardId = shardInterval->shardId; List *relationShardList = list_make1(relationShard); - if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH && + if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) && cacheEntry->colocationId != INVALID_COLOCATION_ID) { shardIndex = ShardIndex(shardInterval); @@ -605,12 +604,12 @@ RelationShardListForShardCreate(ShardInterval *shardInterval) continue; } - if (PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(fkeyRelationid, REFERENCE_TABLE)) { fkeyShardId = GetFirstShardId(fkeyRelationid); } - else if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH && - PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_HASH) + else if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) && + IsCitusTableType(fkeyRelationid, HASH_DISTRIBUTED)) { /* hash distributed tables should be colocated to have fkey */ Assert(TableColocationId(fkeyRelationid) == cacheEntry->colocationId); @@ -726,7 +725,7 @@ WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId, { referencedShardId = shardId; } - else if (PartitionMethod(referencedRelationId) == DISTRIBUTE_BY_NONE) + else if (IsCitusTableType(referencedRelationId, REFERENCE_TABLE)) { referencedShardId = GetFirstShardId(referencedRelationId); } @@ -769,7 +768,6 @@ UpdateShardStatistics(int64 shardId) ShardInterval *shardInterval = LoadShardInterval(shardId); Oid relationId = shardInterval->relationId; char storageType = shardInterval->storageType; - char partitionType = PartitionMethod(relationId); bool statsOK = false; uint64 shardSize = 0; text *minValue = NULL; @@ -827,7 +825,7 @@ UpdateShardStatistics(int64 shardId) } /* only update shard min/max values for append-partitioned tables */ - if (partitionType == DISTRIBUTE_BY_APPEND) + if (IsCitusTableType(relationId, APPEND_DISTRIBUTED)) { DeleteShardRow(shardId); InsertShardRow(relationId, shardId, storageType, minValue, maxValue); @@ -856,7 +854,6 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam StringInfo tableSizeQuery = makeStringInfo(); const uint32 unusedTableId = 1; - char partitionType = PartitionMethod(relationId); StringInfo partitionValueQuery = makeStringInfo(); PGresult *queryResult = NULL; @@ -921,7 +918,7 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam PQclear(queryResult); ForgetResults(connection); - if (partitionType != DISTRIBUTE_BY_APPEND) + if (!IsCitusTableType(relationId, APPEND_DISTRIBUTED)) { /* we don't need min/max for non-append distributed tables */ return true; diff --git a/src/backend/distributed/planner/deparse_shard_query.c b/src/backend/distributed/planner/deparse_shard_query.c index 60a83a186..5cc0c5924 100644 --- a/src/backend/distributed/planner/deparse_shard_query.c +++ b/src/backend/distributed/planner/deparse_shard_query.c @@ -78,8 +78,7 @@ RebuildQueryStrings(Job *workerJob) Query *copiedSubquery = copiedSubqueryRte->subquery; /* there are no restrictions to add for reference tables */ - char partitionMethod = PartitionMethod(shardInterval->relationId); - if (partitionMethod != DISTRIBUTE_BY_NONE) + if (IsCitusTableType(shardInterval->relationId, DISTRIBUTED_TABLE)) { AddShardIntervalRestrictionToSelect(copiedSubquery, shardInterval); } diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index cdcac8f81..f71f67727 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -286,37 +286,6 @@ ExtractRangeTableEntryList(Query *query) } -/* - * ExtractClassifiedRangeTableEntryList extracts reference table rte's from - * the given rte list. - * Callers of this function are responsible for passing referenceTableRTEList - * to be non-null and initially pointing to an empty list. - */ -List * -ExtractReferenceTableRTEList(List *rteList) -{ - List *referenceTableRTEList = NIL; - - RangeTblEntry *rte = NULL; - foreach_ptr(rte, rteList) - { - if (rte->rtekind != RTE_RELATION || rte->relkind != RELKIND_RELATION) - { - continue; - } - - Oid relationOid = rte->relid; - if (IsCitusTable(relationOid) && PartitionMethod(relationOid) == - DISTRIBUTE_BY_NONE) - { - referenceTableRTEList = lappend(referenceTableRTEList, rte); - } - } - - return referenceTableRTEList; -} - - /* * NeedsDistributedPlanning returns true if the Citus extension is loaded and * the query contains a distributed table. @@ -1855,7 +1824,7 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, cacheEntry = GetCitusTableCacheEntry(rte->relid); relationRestrictionContext->allReferenceTables &= - (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE); + IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE); } relationRestrictionContext->relationRestrictionList = diff --git a/src/backend/distributed/planner/extended_op_node_utils.c b/src/backend/distributed/planner/extended_op_node_utils.c index cb50312cd..30367fe20 100644 --- a/src/backend/distributed/planner/extended_op_node_utils.c +++ b/src/backend/distributed/planner/extended_op_node_utils.c @@ -129,9 +129,8 @@ GroupedByPartitionColumn(MultiNode *node, MultiExtendedOp *opNode) return false; } - char partitionMethod = PartitionMethod(relationId); - if (partitionMethod != DISTRIBUTE_BY_RANGE && - partitionMethod != DISTRIBUTE_BY_HASH) + if (!IsCitusTableType(relationId, RANGE_DISTRIBUTED) && + !IsCitusTableType(relationId, HASH_DISTRIBUTED)) { /* only range- and hash-distributed tables are strictly partitioned */ return false; @@ -298,7 +297,7 @@ PartitionColumnInTableList(Var *column, List *tableNodeList) { Assert(partitionColumn->varno == tableNode->rangeTableId); - if (PartitionMethod(tableNode->relationId) != DISTRIBUTE_BY_APPEND) + if (!IsCitusTableType(tableNode->relationId, APPEND_DISTRIBUTED)) { return true; } diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index 70d813bf5..2d45abf66 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -205,15 +205,16 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue) /* we don't want to deal with append/range distributed tables */ Oid distributedTableId = rangeTableEntry->relid; CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId); - if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH || - cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)) + if (IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED) || + IsCitusTableTypeCacheEntry(cacheEntry, APPEND_DISTRIBUTED)) { return false; } /* WHERE clause should not be empty for distributed tables */ if (joinTree == NULL || - (cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE && joinTree->quals == NULL)) + (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && joinTree->quals == + NULL)) { return false; } diff --git a/src/backend/distributed/planner/insert_select_planner.c b/src/backend/distributed/planner/insert_select_planner.c index aa05f9e0d..a8bc73bcf 100644 --- a/src/backend/distributed/planner/insert_select_planner.c +++ b/src/backend/distributed/planner/insert_select_planner.c @@ -541,7 +541,6 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte, { Oid selectPartitionColumnTableId = InvalidOid; Oid targetRelationId = insertRte->relid; - char targetPartitionMethod = PartitionMethod(targetRelationId); ListCell *rangeTableCell = NULL; /* we only do this check for INSERT ... SELECT queries */ @@ -589,7 +588,7 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte, * If we're inserting into a reference table, all participating tables * should be reference tables as well. */ - if (targetPartitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(targetRelationId, REFERENCE_TABLE)) { if (!allReferenceTables) { @@ -1424,7 +1423,7 @@ NonPushableInsertSelectSupported(Query *insertSelectQuery) } RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery); - if (PartitionMethod(insertRte->relid) == DISTRIBUTE_BY_APPEND) + if (IsCitusTableType(insertRte->relid, APPEND_DISTRIBUTED)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "INSERT ... SELECT into an append-distributed table is " diff --git a/src/backend/distributed/planner/multi_join_order.c b/src/backend/distributed/planner/multi_join_order.c index 7b2b2c1f4..f0c20d61e 100644 --- a/src/backend/distributed/planner/multi_join_order.c +++ b/src/backend/distributed/planner/multi_join_order.c @@ -823,12 +823,12 @@ ReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, return NULL; } - char candidatePartitionMethod = PartitionMethod(candidateTable->relationId); - char leftPartitionMethod = PartitionMethod(currentJoinNode->tableEntry->relationId); - - if (!IsSupportedReferenceJoin(joinType, - leftPartitionMethod == DISTRIBUTE_BY_NONE, - candidatePartitionMethod == DISTRIBUTE_BY_NONE)) + bool leftIsReferenceTable = IsCitusTableType( + currentJoinNode->tableEntry->relationId, + REFERENCE_TABLE); + bool rightIsReferenceTable = IsCitusTableType(candidateTable->relationId, + REFERENCE_TABLE); + if (!IsSupportedReferenceJoin(joinType, leftIsReferenceTable, rightIsReferenceTable)) { return NULL; } @@ -874,12 +874,13 @@ static JoinOrderNode * CartesianProductReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable, List *applicableJoinClauses, JoinType joinType) { - char candidatePartitionMethod = PartitionMethod(candidateTable->relationId); - char leftPartitionMethod = PartitionMethod(currentJoinNode->tableEntry->relationId); + bool leftIsReferenceTable = IsCitusTableType( + currentJoinNode->tableEntry->relationId, + REFERENCE_TABLE); + bool rightIsReferenceTable = IsCitusTableType(candidateTable->relationId, + REFERENCE_TABLE); - if (!IsSupportedReferenceJoin(joinType, - leftPartitionMethod == DISTRIBUTE_BY_NONE, - candidatePartitionMethod == DISTRIBUTE_BY_NONE)) + if (!IsSupportedReferenceJoin(joinType, leftIsReferenceTable, rightIsReferenceTable)) { return NULL; } @@ -1385,8 +1386,8 @@ DistPartitionKey(Oid relationId) { CitusTableCacheEntry *partitionEntry = GetCitusTableCacheEntry(relationId); - /* reference tables do not have partition column */ - if (partitionEntry->partitionMethod == DISTRIBUTE_BY_NONE) + /* non-distributed tables do not have partition column */ + if (IsCitusTableTypeCacheEntry(partitionEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { return NULL; } diff --git a/src/backend/distributed/planner/multi_logical_optimizer.c b/src/backend/distributed/planner/multi_logical_optimizer.c index f9021b8bd..5ee7e51eb 100644 --- a/src/backend/distributed/planner/multi_logical_optimizer.c +++ b/src/backend/distributed/planner/multi_logical_optimizer.c @@ -4276,10 +4276,8 @@ TablePartitioningSupportsDistinct(List *tableNodeList, MultiExtendedOp *opNode, * We need to check that task results don't overlap. We can only do this * if table is range partitioned. */ - char partitionMethod = PartitionMethod(relationId); - - if (partitionMethod == DISTRIBUTE_BY_RANGE || - partitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableType(relationId, RANGE_DISTRIBUTED) || + IsCitusTableType(relationId, HASH_DISTRIBUTED)) { Var *tablePartitionColumn = tableNode->partitionColumn; diff --git a/src/backend/distributed/planner/multi_logical_planner.c b/src/backend/distributed/planner/multi_logical_planner.c index fc6b7d4d6..745c9e802 100644 --- a/src/backend/distributed/planner/multi_logical_planner.c +++ b/src/backend/distributed/planner/multi_logical_planner.c @@ -227,10 +227,10 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList) FindReferencedTableColumn(targetExpression, NIL, query, &relationId, &column); /* - * If the expression belongs to a reference table continue searching for + * If the expression belongs to a non-distributed table continue searching for * other partition keys. */ - if (IsCitusTable(relationId) && PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { continue; } @@ -341,8 +341,7 @@ bool IsDistributedTableRTE(Node *node) { Oid relationId = NodeTryGetRteRelid(node); - return relationId != InvalidOid && IsCitusTable(relationId) && - PartitionMethod(relationId) != DISTRIBUTE_BY_NONE; + return relationId != InvalidOid && IsCitusTableType(relationId, DISTRIBUTED_TABLE); } @@ -354,7 +353,7 @@ bool IsReferenceTableRTE(Node *node) { Oid relationId = NodeTryGetRteRelid(node); - return relationId != InvalidOid && IsReferenceTable(relationId); + return relationId != InvalidOid && IsCitusTableType(relationId, REFERENCE_TABLE); } @@ -1021,12 +1020,11 @@ ErrorHintRequired(const char *errorHint, Query *queryTree) foreach(relationIdCell, distributedRelationIdList) { Oid relationId = lfirst_oid(relationIdCell); - char partitionMethod = PartitionMethod(relationId); - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, REFERENCE_TABLE)) { continue; } - else if (partitionMethod == DISTRIBUTE_BY_HASH) + else if (IsCitusTableType(relationId, HASH_DISTRIBUTED)) { int colocationId = TableColocationId(relationId); colocationIdList = list_append_unique_int(colocationIdList, colocationId); diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index c68c5ee93..73d4d3c41 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -2141,7 +2141,7 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId, ListCell *shardIntervalCell = NULL; CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); - if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { continue; } @@ -2299,22 +2299,21 @@ ErrorIfUnsupportedShardDistribution(Query *query) foreach(relationIdCell, relationIdList) { Oid relationId = lfirst_oid(relationIdCell); - char partitionMethod = PartitionMethod(relationId); - if (partitionMethod == DISTRIBUTE_BY_RANGE) + if (IsCitusTableType(relationId, RANGE_DISTRIBUTED)) { rangeDistributedRelationCount++; nonReferenceRelations = lappend_oid(nonReferenceRelations, relationId); } - else if (partitionMethod == DISTRIBUTE_BY_HASH) + else if (IsCitusTableType(relationId, HASH_DISTRIBUTED)) { hashDistributedRelationCount++; nonReferenceRelations = lappend_oid(nonReferenceRelations, relationId); } - else if (partitionMethod == DISTRIBUTE_BY_NONE) + else if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { - /* do not need to handle reference tables */ + /* do not need to handle non-distributed tables */ continue; } else @@ -2426,9 +2425,9 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex, ShardInterval *shardInterval = NULL; CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); - if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { - /* reference table only has one shard */ + /* non-distributed tables have only one shard */ shardInterval = cacheEntry->sortedShardIntervalArray[0]; /* only use reference table as anchor shard if none exists yet */ @@ -2537,13 +2536,13 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) FmgrInfo *comparisonFunction = firstTableCache->shardIntervalCompareFunction; /* reference tables are always & only copartitioned with reference tables */ - if (firstTableCache->partitionMethod == DISTRIBUTE_BY_NONE && - secondTableCache->partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(firstTableCache, CITUS_TABLE_WITH_NO_DIST_KEY) && + IsCitusTableTypeCacheEntry(secondTableCache, CITUS_TABLE_WITH_NO_DIST_KEY)) { return true; } - else if (firstTableCache->partitionMethod == DISTRIBUTE_BY_NONE || - secondTableCache->partitionMethod == DISTRIBUTE_BY_NONE) + else if (IsCitusTableTypeCacheEntry(firstTableCache, CITUS_TABLE_WITH_NO_DIST_KEY) || + IsCitusTableTypeCacheEntry(secondTableCache, CITUS_TABLE_WITH_NO_DIST_KEY)) { return false; } @@ -2578,8 +2577,8 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId) * different values for the same value. int vs bigint can be given as an * example. */ - if (firstTableCache->partitionMethod == DISTRIBUTE_BY_HASH || - secondTableCache->partitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableTypeCacheEntry(firstTableCache, HASH_DISTRIBUTED) || + IsCitusTableTypeCacheEntry(secondTableCache, HASH_DISTRIBUTED)) { return false; } @@ -3562,7 +3561,7 @@ NodeIsRangeTblRefReferenceTable(Node *node, List *rangeTableList) { return false; } - return PartitionMethod(rangeTableEntry->relid) == DISTRIBUTE_BY_NONE; + return IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE); } @@ -3734,15 +3733,12 @@ PartitionedOnColumn(Var *column, List *rangeTableList, List *dependentJobList) if (rangeTableType == CITUS_RTE_RELATION) { Oid relationId = rangeTableEntry->relid; - char partitionMethod = PartitionMethod(relationId); Var *partitionColumn = PartitionColumn(relationId, rangeTableId); - /* reference tables do not have partition columns */ - if (partitionMethod == DISTRIBUTE_BY_NONE) + /* non-distributed tables do not have partition columns */ + if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { - partitionedOnColumn = false; - - return partitionedOnColumn; + return false; } if (partitionColumn->varattno == column->varattno) @@ -3927,7 +3923,7 @@ ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterva CitusTableCacheEntry *intervalRelation = GetCitusTableCacheEntry(firstInterval->relationId); - Assert(intervalRelation->partitionMethod != DISTRIBUTE_BY_NONE); + Assert(IsCitusTableTypeCacheEntry(intervalRelation, DISTRIBUTED_TABLE)); FmgrInfo *comparisonFunction = intervalRelation->shardIntervalCompareFunction; Oid collation = intervalRelation->partitionColumn->varcollid; diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index f1efcef02..18151ba15 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -299,15 +299,14 @@ List * ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex) { Oid relationId = shardInterval->relationId; - char partitionMethod = PartitionMethod(shardInterval->relationId); Var *partitionColumn = NULL; - if (partitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableType(relationId, HASH_DISTRIBUTED)) { partitionColumn = MakeInt4Column(); } - else if (partitionMethod == DISTRIBUTE_BY_RANGE || partitionMethod == - DISTRIBUTE_BY_APPEND) + else if (IsCitusTableType(relationId, RANGE_DISTRIBUTED) || IsCitusTableType( + relationId, APPEND_DISTRIBUTED)) { Assert(rteIndex > 0); partitionColumn = PartitionColumn(relationId, rteIndex); @@ -1134,7 +1133,6 @@ MultiShardModifyQuerySupported(Query *originalQuery, DeferredErrorMessage *errorMessage = NULL; RangeTblEntry *resultRangeTable = ExtractResultRelationRTE(originalQuery); Oid resultRelationOid = resultRangeTable->relid; - char resultPartitionMethod = PartitionMethod(resultRelationOid); if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree)) { @@ -1151,7 +1149,7 @@ MultiShardModifyQuerySupported(Query *originalQuery, "tables must not be VOLATILE", NULL, NULL); } - else if (resultPartitionMethod == DISTRIBUTE_BY_NONE) + else if (IsCitusTableType(resultRelationOid, REFERENCE_TABLE)) { errorMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, "only reference tables may be queried when targeting " @@ -1882,9 +1880,8 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, CitusTableCacheEntry *modificationTableCacheEntry = GetCitusTableCacheEntry( updateOrDeleteRTE->relid); - char modificationPartitionMethod = modificationTableCacheEntry->partitionMethod; - if (modificationPartitionMethod == DISTRIBUTE_BY_NONE && + if (IsCitusTableTypeCacheEntry(modificationTableCacheEntry, REFERENCE_TABLE) && SelectsFromDistributedTable(rangeTableList, query)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), @@ -2009,7 +2006,7 @@ SelectsFromDistributedTable(List *rangeTableList, Query *query) CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry( rangeTableEntry->relid); - if (cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE && + if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && (resultRangeTableEntry == NULL || resultRangeTableEntry->relid != rangeTableEntry->relid)) { @@ -2424,9 +2421,9 @@ TargetShardIntervalForFastPathQuery(Query *query, bool *isMultiShardQuery, { Oid relationId = ExtractFirstCitusTableId(query); - if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { - /* we don't need to do shard pruning for reference tables */ + /* we don't need to do shard pruning for non-distributed tables */ return list_make1(LoadShardIntervalList(relationId)); } @@ -2702,14 +2699,13 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError) { Oid distributedTableId = ExtractFirstCitusTableId(query); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId); - char partitionMethod = cacheEntry->partitionMethod; List *modifyRouteList = NIL; ListCell *insertValuesCell = NULL; Assert(query->commandType == CMD_INSERT); /* reference tables can only have one shard */ - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { List *shardIntervalList = LoadShardIntervalList(distributedTableId); @@ -2808,8 +2804,8 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError) missingOk); } - if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == - DISTRIBUTE_BY_RANGE) + if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) || + IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED)) { Datum partitionValue = partitionValueConst->constvalue; @@ -3203,8 +3199,7 @@ ExtractInsertPartitionKeyValue(Query *query) uint32 rangeTableId = 1; Const *singlePartitionValueConst = NULL; - char partitionMethod = PartitionMethod(distributedTableId); - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(distributedTableId, CITUS_TABLE_WITH_NO_DIST_KEY)) { return NULL; } @@ -3336,9 +3331,7 @@ MultiRouterPlannableQuery(Query *query) continue; } - char partitionMethod = PartitionMethod(distributedTableId); - if (!(partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == - DISTRIBUTE_BY_NONE || partitionMethod == DISTRIBUTE_BY_RANGE)) + if (IsCitusTableType(distributedTableId, APPEND_DISTRIBUTED)) { return DeferredError( ERRCODE_FEATURE_NOT_SUPPORTED, @@ -3346,7 +3339,7 @@ MultiRouterPlannableQuery(Query *query) NULL, NULL); } - if (partitionMethod != DISTRIBUTE_BY_NONE) + if (IsCitusTableType(distributedTableId, DISTRIBUTED_TABLE)) { hasDistributedTable = true; } @@ -3361,7 +3354,8 @@ MultiRouterPlannableQuery(Query *query) uint32 tableReplicationFactor = TableShardReplicationFactor( distributedTableId); - if (tableReplicationFactor > 1 && partitionMethod != DISTRIBUTE_BY_NONE) + if (tableReplicationFactor > 1 && IsCitusTableType(distributedTableId, + DISTRIBUTED_TABLE)) { return DeferredError( ERRCODE_FEATURE_NOT_SUPPORTED, @@ -3490,13 +3484,12 @@ ErrorIfQueryHasUnroutableModifyingCTE(Query *queryTree) CitusTableCacheEntry *modificationTableCacheEntry = GetCitusTableCacheEntry(distributedTableId); - char modificationPartitionMethod = - modificationTableCacheEntry->partitionMethod; - if (modificationPartitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(modificationTableCacheEntry, + CITUS_TABLE_WITH_NO_DIST_KEY)) { return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED, - "cannot router plan modification of a reference table", + "cannot router plan modification of a non-distributed table", NULL, NULL); } diff --git a/src/backend/distributed/planner/query_colocation_checker.c b/src/backend/distributed/planner/query_colocation_checker.c index 60300535f..0a84ea49e 100644 --- a/src/backend/distributed/planner/query_colocation_checker.c +++ b/src/backend/distributed/planner/query_colocation_checker.c @@ -25,6 +25,7 @@ #include "distributed/query_colocation_checker.h" #include "distributed/pg_dist_partition.h" #include "distributed/relation_restriction_equivalence.h" +#include "distributed/metadata_cache.h" #include "distributed/multi_logical_planner.h" /* only to access utility functions */ #include "nodes/makefuncs.h" #include "nodes/nodeFuncs.h" @@ -153,10 +154,10 @@ AnchorRte(Query *subquery) { Oid relationId = currentRte->relid; - if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { /* - * Reference tables should not be the anchor rte since they + * Non-distributed tables should not be the anchor rte since they * don't have distribution key. */ continue; diff --git a/src/backend/distributed/planner/query_pushdown_planning.c b/src/backend/distributed/planner/query_pushdown_planning.c index 13b73bac3..534b05b6a 100644 --- a/src/backend/distributed/planner/query_pushdown_planning.c +++ b/src/backend/distributed/planner/query_pushdown_planning.c @@ -1445,8 +1445,7 @@ HasRecurringTuples(Node *node, RecurringTuplesType *recurType) if (rangeTableEntry->rtekind == RTE_RELATION) { Oid relationId = rangeTableEntry->relid; - if (IsCitusTable(relationId) && - PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, REFERENCE_TABLE)) { *recurType = RECURRING_TUPLES_REFERENCE_TABLE; diff --git a/src/backend/distributed/planner/relation_restriction_equivalence.c b/src/backend/distributed/planner/relation_restriction_equivalence.c index aa1cf2ee2..22b1a2b30 100644 --- a/src/backend/distributed/planner/relation_restriction_equivalence.c +++ b/src/backend/distributed/planner/relation_restriction_equivalence.c @@ -605,8 +605,10 @@ ReferenceRelationCount(RelationRestrictionContext *restrictionContext) { RelationRestriction *relationRestriction = (RelationRestriction *) lfirst(relationRestrictionCell); + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry( + relationRestriction->relationId); - if (PartitionMethod(relationRestriction->relationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE)) { referenceRelationCount++; } @@ -657,8 +659,9 @@ EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList, (RelationRestriction *) lfirst(relationRestrictionCell); int rteIdentity = GetRTEIdentity(relationRestriction->rte); - /* we shouldn't check for the equality of reference tables */ - if (PartitionMethod(relationRestriction->relationId) == DISTRIBUTE_BY_NONE) + /* we shouldn't check for the equality of non-distributed tables */ + if (IsCitusTableType(relationRestriction->relationId, + CITUS_TABLE_WITH_NO_DIST_KEY)) { continue; } @@ -1721,7 +1724,7 @@ AllRelationsInRestrictionContextColocated(RelationRestrictionContext *restrictio { Oid relationId = relationRestriction->relationId; - if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) + if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) { continue; } diff --git a/src/backend/distributed/planner/shard_pruning.c b/src/backend/distributed/planner/shard_pruning.c index ece01419f..1eebda1dc 100644 --- a/src/backend/distributed/planner/shard_pruning.c +++ b/src/backend/distributed/planner/shard_pruning.c @@ -306,8 +306,8 @@ static int ConstraintCount(PruningTreeNode *node); * PruneShards returns all shards from a distributed table that cannot be * proven to be eliminated by whereClauseList. * - * For reference tables, the function simply returns the single shard that the - * table has. + * For non-distributed tables such as reference table, the function + * simply returns the single shard that the table has. * * When there is a single = filter in the where * clause list, the constant is written to the partitionValueConst pointer. @@ -338,8 +338,8 @@ PruneShards(Oid relationId, Index rangeTableId, List *whereClauseList, return NIL; } - /* short circuit for reference tables */ - if (partitionMethod == DISTRIBUTE_BY_NONE) + /* short circuit for non-distributed tables such as reference table */ + if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { prunedList = ShardArrayToList(cacheEntry->sortedShardIntervalArray, cacheEntry->shardIntervalArrayLength); diff --git a/src/backend/distributed/test/distributed_intermediate_results.c b/src/backend/distributed/test/distributed_intermediate_results.c index 120cea563..5c450d88b 100644 --- a/src/backend/distributed/test/distributed_intermediate_results.c +++ b/src/backend/distributed/test/distributed_intermediate_results.c @@ -74,7 +74,7 @@ partition_task_list_results(PG_FUNCTION_ARGS) */ int partitionColumnIndex = 0; - if (targetRelation->partitionMethod != DISTRIBUTE_BY_NONE && IsA( + if (IsCitusTableTypeCacheEntry(targetRelation, DISTRIBUTED_TABLE) && IsA( targetRelation->partitionColumn, Var)) { partitionColumnIndex = targetRelation->partitionColumn->varattno - 1; @@ -146,7 +146,8 @@ redistribute_task_list_results(PG_FUNCTION_ARGS) * Here SELECT query's target list should match column list of target relation, * so their partition column indexes are equal. */ - int partitionColumnIndex = targetRelation->partitionMethod != DISTRIBUTE_BY_NONE ? + int partitionColumnIndex = IsCitusTableTypeCacheEntry(targetRelation, + DISTRIBUTED_TABLE) ? targetRelation->partitionColumn->varattno - 1 : 0; List **shardResultIds = RedistributeTaskListResults(resultIdPrefix, taskList, diff --git a/src/backend/distributed/transaction/relation_access_tracking.c b/src/backend/distributed/transaction/relation_access_tracking.c index b5dafa135..a3b56ecca 100644 --- a/src/backend/distributed/transaction/relation_access_tracking.c +++ b/src/backend/distributed/transaction/relation_access_tracking.c @@ -173,7 +173,7 @@ RecordRelationAccessIfReferenceTable(Oid relationId, ShardPlacementAccessType ac * recursively calling RecordRelationAccessBase(), so becareful about * removing this check. */ - if (PartitionMethod(relationId) != DISTRIBUTE_BY_NONE) + if (!IsCitusTableType(relationId, REFERENCE_TABLE)) { return; } @@ -697,7 +697,7 @@ CheckConflictingRelationAccesses(Oid relationId, ShardPlacementAccessType access CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); - if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE && + if (!(IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE) && cacheEntry->referencingRelationsViaForeignKey != NIL)) { return; @@ -817,7 +817,7 @@ CheckConflictingParallelRelationAccesses(Oid relationId, ShardPlacementAccessTyp } CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); - if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH && + if (!(IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) && cacheEntry->referencedRelationsViaForeignKey != NIL)) { return; @@ -893,7 +893,7 @@ HoldsConflictingLockWithReferencedRelations(Oid relationId, ShardPlacementAccess foreach_oid(referencedRelation, cacheEntry->referencedRelationsViaForeignKey) { /* we're only interested in foreign keys to reference tables */ - if (PartitionMethod(referencedRelation) != DISTRIBUTE_BY_NONE) + if (!IsCitusTableType(referencedRelation, REFERENCE_TABLE)) { continue; } @@ -955,7 +955,7 @@ HoldsConflictingLockWithReferencingRelations(Oid relationId, ShardPlacementAcces CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); bool holdsConflictingLocks = false; - Assert(PartitionMethod(relationId) == DISTRIBUTE_BY_NONE); + Assert(IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE)); Oid referencingRelation = InvalidOid; foreach_oid(referencingRelation, cacheEntry->referencingRelationsViaForeignKey) @@ -964,8 +964,7 @@ HoldsConflictingLockWithReferencingRelations(Oid relationId, ShardPlacementAcces * We're only interested in foreign keys to reference tables from * hash distributed tables. */ - if (!IsCitusTable(referencingRelation) || - PartitionMethod(referencingRelation) != DISTRIBUTE_BY_HASH) + if (!IsCitusTableType(referencingRelation, HASH_DISTRIBUTED)) { continue; } diff --git a/src/backend/distributed/utils/colocation_utils.c b/src/backend/distributed/utils/colocation_utils.c index dcaa70f9d..09660f0f3 100644 --- a/src/backend/distributed/utils/colocation_utils.c +++ b/src/backend/distributed/utils/colocation_utils.c @@ -431,11 +431,12 @@ ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShard return false; } - if (leftIntervalPartitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableType(leftShardInterval->relationId, HASH_DISTRIBUTED)) { return HashPartitionedShardIntervalsEqual(leftShardInterval, rightShardInterval); } - else if (leftIntervalPartitionMethod == DISTRIBUTE_BY_NONE) + else if (IsCitusTableType(leftShardInterval->relationId, + CITUS_TABLE_WITH_NO_DIST_KEY)) { /* * Reference tables has only a single shard and all reference tables @@ -921,14 +922,13 @@ ColocatedShardIntervalList(ShardInterval *shardInterval) List *colocatedShardList = NIL; CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId); - char partitionMethod = cacheEntry->partitionMethod; /* - * If distribution type of the table is not hash or reference, each shard of + * If distribution type of the table is append or range, each shard of * the shard is only co-located with itself. */ - if ((partitionMethod == DISTRIBUTE_BY_APPEND) || - (partitionMethod == DISTRIBUTE_BY_RANGE)) + if (IsCitusTableTypeCacheEntry(cacheEntry, APPEND_DISTRIBUTED) || + IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED)) { ShardInterval *copyShardInterval = CopyShardInterval(shardInterval); diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 71eae9696..e576066ff 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -56,29 +56,6 @@ static bool AnyRelationsModifiedInTransaction(List *relationIdList); PG_FUNCTION_INFO_V1(upgrade_to_reference_table); PG_FUNCTION_INFO_V1(replicate_reference_tables); - -/* - * IsReferenceTable returns whether the given relation ID identifies a reference - * table. - */ -bool -IsReferenceTable(Oid relationId) -{ - if (!IsCitusTable(relationId)) - { - return false; - } - CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId); - - if (tableEntry->partitionMethod != DISTRIBUTE_BY_NONE) - { - return false; - } - - return true; -} - - /* * replicate_reference_tables is a UDF to ensure that allreference tables are * replicated to all nodes. @@ -350,7 +327,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS) CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId); - if (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(tableEntry, REFERENCE_TABLE)) { char *relationName = get_rel_name(relationId); ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index 08d07f257..74f0db25b 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -395,7 +395,7 @@ SetLocktagForShardDistributionMetadata(int64 shardId, LOCKTAG *tag) uint32 colocationId = citusTable->colocationId; if (colocationId == INVALID_COLOCATION_ID || - citusTable->partitionMethod != DISTRIBUTE_BY_HASH) + !IsCitusTableTypeCacheEntry(citusTable, HASH_DISTRIBUTED)) { SET_LOCKTAG_SHARD_METADATA_RESOURCE(*tag, MyDatabaseId, shardId); } @@ -493,7 +493,7 @@ GetSortedReferenceShardIntervals(List *relationList) Oid relationId = InvalidOid; foreach_oid(relationId, relationList) { - if (PartitionMethod(relationId) != DISTRIBUTE_BY_NONE) + if (!IsCitusTableType(relationId, REFERENCE_TABLE)) { continue; } diff --git a/src/backend/distributed/utils/shard_utils.c b/src/backend/distributed/utils/shard_utils.c index 1bfc8940a..2de992c26 100644 --- a/src/backend/distributed/utils/shard_utils.c +++ b/src/backend/distributed/utils/shard_utils.c @@ -46,10 +46,10 @@ GetTableLocalShardOid(Oid citusTableOid, uint64 shardId) Oid GetReferenceTableLocalShardOid(Oid referenceTableOid) { - const CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(referenceTableOid); + CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(referenceTableOid); /* given OID should belong to a valid reference table */ - Assert(cacheEntry != NULL && cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE); + Assert(cacheEntry != NULL && IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE)); const ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[0]; uint64 referenceTableShardId = shardInterval->shardId; diff --git a/src/backend/distributed/utils/shardinterval_utils.c b/src/backend/distributed/utils/shardinterval_utils.c index af463644a..675c512f3 100644 --- a/src/backend/distributed/utils/shardinterval_utils.c +++ b/src/backend/distributed/utils/shardinterval_utils.c @@ -240,13 +240,14 @@ ShardIndex(ShardInterval *shardInterval) Datum shardMinValue = shardInterval->minValue; CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId); - char partitionMethod = cacheEntry->partitionMethod; /* * Note that, we can also support append and range distributed tables, but * currently it is not required. */ - if (partitionMethod != DISTRIBUTE_BY_HASH && partitionMethod != DISTRIBUTE_BY_NONE) + if (!IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) && + !IsCitusTableTypeCacheEntry( + cacheEntry, REFERENCE_TABLE)) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("finding index of a given shard is only supported for " @@ -254,7 +255,7 @@ ShardIndex(ShardInterval *shardInterval) } /* short-circuit for reference tables */ - if (partitionMethod == DISTRIBUTE_BY_NONE) + if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { /* reference tables has only a single shard, so the index is fixed to 0 */ shardIndex = 0; @@ -279,7 +280,7 @@ FindShardInterval(Datum partitionColumnValue, CitusTableCacheEntry *cacheEntry) { Datum searchedValue = partitionColumnValue; - if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED)) { searchedValue = FunctionCall1Coll(cacheEntry->hashFunction, cacheEntry->partitionColumn->varcollid, @@ -314,9 +315,8 @@ FindShardIntervalIndex(Datum searchedValue, CitusTableCacheEntry *cacheEntry) { ShardInterval **shardIntervalCache = cacheEntry->sortedShardIntervalArray; int shardCount = cacheEntry->shardIntervalArrayLength; - char partitionMethod = cacheEntry->partitionMethod; FmgrInfo *compareFunction = cacheEntry->shardIntervalCompareFunction; - bool useBinarySearch = (partitionMethod != DISTRIBUTE_BY_HASH || + bool useBinarySearch = (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) || !cacheEntry->hasUniformHashDistribution); int shardIndex = INVALID_SHARD_INDEX; @@ -325,7 +325,7 @@ FindShardIntervalIndex(Datum searchedValue, CitusTableCacheEntry *cacheEntry) return INVALID_SHARD_INDEX; } - if (partitionMethod == DISTRIBUTE_BY_HASH) + if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED)) { if (useBinarySearch) { @@ -352,9 +352,9 @@ FindShardIntervalIndex(Datum searchedValue, CitusTableCacheEntry *cacheEntry) shardIndex = CalculateUniformHashRangeIndex(hashedValue, shardCount); } } - else if (partitionMethod == DISTRIBUTE_BY_NONE) + else if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) { - /* reference tables has a single shard, all values mapped to that shard */ + /* non-distributed tables have a single shard, all values mapped to that shard */ Assert(shardCount == 1); shardIndex = 0; @@ -490,7 +490,7 @@ SingleReplicatedTable(Oid relationId) } /* for hash distributed tables, it is sufficient to only check one shard */ - if (PartitionMethod(relationId) == DISTRIBUTE_BY_HASH) + if (IsCitusTableType(relationId, HASH_DISTRIBUTED)) { /* checking only for the first shard id should suffice */ uint64 shardId = *(uint64 *) linitial(shardList); diff --git a/src/backend/distributed/utils/statistics_collection.c b/src/backend/distributed/utils/statistics_collection.c index f5ce2faff..16df47da0 100644 --- a/src/backend/distributed/utils/statistics_collection.c +++ b/src/backend/distributed/utils/statistics_collection.c @@ -194,7 +194,7 @@ DistributedTablesSize(List *distTableOids) * Ignore hash partitioned tables with size greater than 1, since * citus_table_size() doesn't work on them. */ - if (PartitionMethod(relationId) == DISTRIBUTE_BY_HASH && + if (IsCitusTableType(relationId, HASH_DISTRIBUTED) && !SingleReplicatedTable(relationId)) { table_close(relation, AccessShareLock); diff --git a/src/include/distributed/distributed_planner.h b/src/include/distributed/distributed_planner.h index b385e8244..8db79a6ee 100644 --- a/src/include/distributed/distributed_planner.h +++ b/src/include/distributed/distributed_planner.h @@ -196,7 +196,6 @@ extern PlannedStmt * distributed_planner(Query *parse, extern List * ExtractRangeTableEntryList(Query *query); -extern List * ExtractReferenceTableRTEList(List *rteList); extern bool NeedsDistributedPlanning(Query *query); extern struct DistributedPlan * GetDistributedPlan(CustomScan *node); extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo, diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 341736774..a6b080315 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -116,6 +116,25 @@ typedef struct DistObjectCacheEntry int colocationId; } DistObjectCacheEntry; +typedef enum +{ + HASH_DISTRIBUTED, + APPEND_DISTRIBUTED, + RANGE_DISTRIBUTED, + + /* hash, range or append distributed table */ + DISTRIBUTED_TABLE, + + REFERENCE_TABLE, + CITUS_LOCAL_TABLE, + + /* table without a dist key such as reference table */ + CITUS_TABLE_WITH_NO_DIST_KEY +} CitusTableType; + +extern bool IsCitusTableType(Oid relationId, CitusTableType tableType); +extern bool IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEtnry, CitusTableType + tableType); extern bool IsCitusTable(Oid relationId); extern List * CitusTableList(void); diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 9c5019af4..c5cf04f21 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -150,7 +150,6 @@ extern void EnsureTablePermissions(Oid relationId, AclMode mode); extern void EnsureTableOwner(Oid relationId); extern void EnsureSchemaOwner(Oid schemaId); extern void EnsureHashDistributedTable(Oid relationId); -extern bool IsHashDistributedTable(Oid relationId); extern void EnsureSequenceOwner(Oid sequenceOid); extern void EnsureFunctionOwner(Oid functionId); extern void EnsureSuperUser(void); diff --git a/src/include/distributed/reference_table_utils.h b/src/include/distributed/reference_table_utils.h index 0fac3c41d..0ef44c601 100644 --- a/src/include/distributed/reference_table_utils.h +++ b/src/include/distributed/reference_table_utils.h @@ -16,7 +16,8 @@ #include "listutils.h" -extern bool IsReferenceTable(Oid relationId); +#include "distributed/metadata_cache.h" + extern void EnsureReferenceTablesExistOnAllNodes(void); extern uint32 CreateReferenceTableColocationId(void); extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId);