mirror of https://github.com/citusdata/citus.git
Introduce cache entry/table utilities (#4132)
Introduce table entry utility functions Citus table cache entry utilities are introduced so that we can easily extend existing functionality with minimum changes, specifically changes to these functions. For example IsNonDistributedTableCacheEntry can be extended for citus local tables without the need to scan the whole codebase and update each relevant part. * Introduce utility functions to find the type of tables A table type can be a reference table, a hash/range/append distributed table. Utility methods are created so that we don't have to worry about how a table is considered as a reference table etc. This also makes it easy to extend the table types. * Add IsCitusTableType utilities * Rename IsCacheEntryCitusTableType -> IsCitusTableTypeCacheEntry * Change citus table types in some checkspull/4136/head
parent
f38d24f8fb
commit
366461ccdb
|
@ -28,6 +28,7 @@
|
||||||
#include "distributed/adaptive_executor.h"
|
#include "distributed/adaptive_executor.h"
|
||||||
#include "distributed/reference_table_utils.h"
|
#include "distributed/reference_table_utils.h"
|
||||||
#include "distributed/remote_commands.h"
|
#include "distributed/remote_commands.h"
|
||||||
|
#include "distributed/reference_table_utils.h"
|
||||||
#include "distributed/shard_pruning.h"
|
#include "distributed/shard_pruning.h"
|
||||||
#include "distributed/tuple_destination.h"
|
#include "distributed/tuple_destination.h"
|
||||||
#include "distributed/version_compat.h"
|
#include "distributed/version_compat.h"
|
||||||
|
@ -92,17 +93,16 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
|
||||||
"be constant expressions")));
|
"be constant expressions")));
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
CitusTableCacheEntry *distTable = GetCitusTableCacheEntry(colocatedRelationId);
|
CitusTableCacheEntry *distTable = GetCitusTableCacheEntry(colocatedRelationId);
|
||||||
Var *partitionColumn = distTable->partitionColumn;
|
Var *partitionColumn = distTable->partitionColumn;
|
||||||
bool colocatedWithReferenceTable = false;
|
bool colocatedWithReferenceTable = false;
|
||||||
if (partitionColumn == NULL)
|
if (IsCitusTableTypeCacheEntry(distTable, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
/* This can happen if colocated with a reference table. Punt for now. */
|
/* This can happen if colocated with a reference table. Punt for now. */
|
||||||
ereport(DEBUG1, (errmsg(
|
ereport(DEBUG1, (errmsg(
|
||||||
"will push down CALL for reference tables")));
|
"will push down CALL for reference tables")));
|
||||||
colocatedWithReferenceTable = true;
|
colocatedWithReferenceTable = true;
|
||||||
Assert(IsReferenceTable(colocatedRelationId));
|
Assert(IsCitusTableType(colocatedRelationId, REFERENCE_TABLE));
|
||||||
}
|
}
|
||||||
|
|
||||||
ShardPlacement *placement = NULL;
|
ShardPlacement *placement = NULL;
|
||||||
|
|
|
@ -841,11 +841,10 @@ EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
|
||||||
Oid distributionColumnType, Oid sourceRelationId)
|
Oid distributionColumnType, Oid sourceRelationId)
|
||||||
{
|
{
|
||||||
CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId);
|
CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId);
|
||||||
char sourceDistributionMethod = sourceTableEntry->partitionMethod;
|
|
||||||
char sourceReplicationModel = sourceTableEntry->replicationModel;
|
char sourceReplicationModel = sourceTableEntry->replicationModel;
|
||||||
Var *sourceDistributionColumn = DistPartitionKeyOrError(sourceRelationId);
|
Var *sourceDistributionColumn = DistPartitionKeyOrError(sourceRelationId);
|
||||||
|
|
||||||
if (sourceDistributionMethod != DISTRIBUTE_BY_HASH)
|
if (!IsCitusTableTypeCacheEntry(sourceTableEntry, HASH_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("cannot distribute relation"),
|
errmsg("cannot distribute relation"),
|
||||||
|
|
|
@ -138,7 +138,6 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
|
||||||
|
|
||||||
int referencingAttrIndex = -1;
|
int referencingAttrIndex = -1;
|
||||||
|
|
||||||
char referencedDistMethod = 0;
|
|
||||||
Var *referencedDistKey = NULL;
|
Var *referencedDistKey = NULL;
|
||||||
int referencedAttrIndex = -1;
|
int referencedAttrIndex = -1;
|
||||||
uint32 referencedColocationId = INVALID_COLOCATION_ID;
|
uint32 referencedColocationId = INVALID_COLOCATION_ID;
|
||||||
|
@ -165,11 +164,12 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
|
||||||
}
|
}
|
||||||
|
|
||||||
/* set referenced table related variables here if table is referencing itself */
|
/* set referenced table related variables here if table is referencing itself */
|
||||||
|
char referencedDistMethod = 0;
|
||||||
if (!selfReferencingTable)
|
if (!selfReferencingTable)
|
||||||
{
|
{
|
||||||
referencedDistMethod = PartitionMethod(referencedTableId);
|
referencedDistMethod = PartitionMethod(referencedTableId);
|
||||||
referencedDistKey = (referencedDistMethod == DISTRIBUTE_BY_NONE) ?
|
referencedDistKey = IsCitusTableType(referencedTableId,
|
||||||
|
CITUS_TABLE_WITH_NO_DIST_KEY) ?
|
||||||
NULL :
|
NULL :
|
||||||
DistPartitionKey(referencedTableId);
|
DistPartitionKey(referencedTableId);
|
||||||
referencedColocationId = TableColocationId(referencedTableId);
|
referencedColocationId = TableColocationId(referencedTableId);
|
||||||
|
@ -428,7 +428,7 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
|
||||||
* any foreign constraint from a distributed table to a local table.
|
* any foreign constraint from a distributed table to a local table.
|
||||||
*/
|
*/
|
||||||
Assert(IsCitusTable(referencedTableId));
|
Assert(IsCitusTable(referencedTableId));
|
||||||
if (PartitionMethod(referencedTableId) != DISTRIBUTE_BY_NONE)
|
if (!IsCitusTableType(referencedTableId, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
continue;
|
continue;
|
||||||
|
@ -559,7 +559,7 @@ GetForeignKeyOidsToReferenceTables(Oid relationId)
|
||||||
|
|
||||||
Oid referencedTableOid = constraintForm->confrelid;
|
Oid referencedTableOid = constraintForm->confrelid;
|
||||||
|
|
||||||
if (IsReferenceTable(referencedTableOid))
|
if (IsCitusTableType(referencedTableOid, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
fkeyOidsToReferenceTables = lappend_oid(fkeyOidsToReferenceTables,
|
fkeyOidsToReferenceTables = lappend_oid(fkeyOidsToReferenceTables,
|
||||||
foreignKeyOid);
|
foreignKeyOid);
|
||||||
|
|
|
@ -167,7 +167,8 @@ create_distributed_function(PG_FUNCTION_ARGS)
|
||||||
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0)
|
if (pg_strncasecmp(colocateWithTableName, "default", NAMEDATALEN) != 0)
|
||||||
{
|
{
|
||||||
Oid colocationRelationId = ResolveRelationId(colocateWithText, false);
|
Oid colocationRelationId = ResolveRelationId(colocateWithText, false);
|
||||||
colocatedWithReferenceTable = IsReferenceTable(colocationRelationId);
|
colocatedWithReferenceTable = IsCitusTableType(colocationRelationId,
|
||||||
|
REFERENCE_TABLE);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -499,11 +500,10 @@ EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnTyp
|
||||||
Oid sourceRelationId)
|
Oid sourceRelationId)
|
||||||
{
|
{
|
||||||
CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId);
|
CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(sourceRelationId);
|
||||||
char sourceDistributionMethod = sourceTableEntry->partitionMethod;
|
|
||||||
char sourceReplicationModel = sourceTableEntry->replicationModel;
|
char sourceReplicationModel = sourceTableEntry->replicationModel;
|
||||||
|
|
||||||
if (sourceDistributionMethod != DISTRIBUTE_BY_HASH &&
|
if (!IsCitusTableTypeCacheEntry(sourceTableEntry, HASH_DISTRIBUTED) &&
|
||||||
sourceDistributionMethod != DISTRIBUTE_BY_NONE)
|
!IsCitusTableTypeCacheEntry(sourceTableEntry, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
char *functionName = get_func_name(functionOid);
|
char *functionName = get_func_name(functionOid);
|
||||||
char *sourceRelationName = get_rel_name(sourceRelationId);
|
char *sourceRelationName = get_rel_name(sourceRelationId);
|
||||||
|
@ -515,7 +515,7 @@ EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnTyp
|
||||||
functionName, sourceRelationName)));
|
functionName, sourceRelationName)));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (sourceDistributionMethod == DISTRIBUTE_BY_NONE &&
|
if (IsCitusTableTypeCacheEntry(sourceTableEntry, REFERENCE_TABLE) &&
|
||||||
distributionColumnType != InvalidOid)
|
distributionColumnType != InvalidOid)
|
||||||
{
|
{
|
||||||
char *functionName = get_func_name(functionOid);
|
char *functionName = get_func_name(functionOid);
|
||||||
|
|
|
@ -800,19 +800,18 @@ ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement)
|
||||||
/* caller uses ShareLock for non-concurrent indexes, use the same lock here */
|
/* caller uses ShareLock for non-concurrent indexes, use the same lock here */
|
||||||
LOCKMODE lockMode = ShareLock;
|
LOCKMODE lockMode = ShareLock;
|
||||||
Oid relationId = RangeVarGetRelid(relation, lockMode, missingOk);
|
Oid relationId = RangeVarGetRelid(relation, lockMode, missingOk);
|
||||||
char partitionMethod = PartitionMethod(relationId);
|
|
||||||
bool indexContainsPartitionColumn = false;
|
bool indexContainsPartitionColumn = false;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Reference tables do not have partition key, and unique constraints
|
* Non-distributed tables do not have partition key, and unique constraints
|
||||||
* are allowed for them. Thus, we added a short-circuit for reference tables.
|
* are allowed for them. Thus, we added a short-circuit for non-distributed tables.
|
||||||
*/
|
*/
|
||||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (partitionMethod == DISTRIBUTE_BY_APPEND)
|
if (IsCitusTableType(relationId, APPEND_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("creating unique indexes on append-partitioned tables "
|
errmsg("creating unique indexes on append-partitioned tables "
|
||||||
|
|
|
@ -363,17 +363,18 @@ CitusCopyFrom(CopyStmt *copyStatement, QueryCompletionCompat *completionTag)
|
||||||
}
|
}
|
||||||
|
|
||||||
Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
|
Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
|
||||||
char partitionMethod = PartitionMethod(relationId);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
/* disallow modifications to a partition table which have rep. factor > 1 */
|
/* disallow modifications to a partition table which have rep. factor > 1 */
|
||||||
EnsurePartitionTableNotReplicated(relationId);
|
EnsurePartitionTableNotReplicated(relationId);
|
||||||
|
|
||||||
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE ||
|
if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) ||
|
||||||
partitionMethod == DISTRIBUTE_BY_NONE)
|
IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED) ||
|
||||||
|
IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
CopyToExistingShards(copyStatement, completionTag);
|
CopyToExistingShards(copyStatement, completionTag);
|
||||||
}
|
}
|
||||||
else if (partitionMethod == DISTRIBUTE_BY_APPEND)
|
else if (IsCitusTableTypeCacheEntry(cacheEntry, APPEND_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
CopyToNewShards(copyStatement, completionTag, relationId);
|
CopyToNewShards(copyStatement, completionTag, relationId);
|
||||||
}
|
}
|
||||||
|
@ -409,7 +410,6 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT
|
||||||
MemoryContext executorTupleContext = NULL;
|
MemoryContext executorTupleContext = NULL;
|
||||||
ExprContext *executorExpressionContext = NULL;
|
ExprContext *executorExpressionContext = NULL;
|
||||||
|
|
||||||
char partitionMethod = 0;
|
|
||||||
bool stopOnFailure = false;
|
bool stopOnFailure = false;
|
||||||
|
|
||||||
CopyState copyState = NULL;
|
CopyState copyState = NULL;
|
||||||
|
@ -460,8 +460,7 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT
|
||||||
executorTupleContext = GetPerTupleMemoryContext(executorState);
|
executorTupleContext = GetPerTupleMemoryContext(executorState);
|
||||||
executorExpressionContext = GetPerTupleExprContext(executorState);
|
executorExpressionContext = GetPerTupleExprContext(executorState);
|
||||||
|
|
||||||
partitionMethod = PartitionMethod(tableId);
|
if (IsCitusTableType(tableId, REFERENCE_TABLE))
|
||||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
|
||||||
{
|
{
|
||||||
stopOnFailure = true;
|
stopOnFailure = true;
|
||||||
}
|
}
|
||||||
|
@ -2150,16 +2149,12 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
|
|
||||||
ListCell *columnNameCell = NULL;
|
ListCell *columnNameCell = NULL;
|
||||||
|
|
||||||
char partitionMethod = '\0';
|
|
||||||
|
|
||||||
|
|
||||||
const char *delimiterCharacter = "\t";
|
const char *delimiterCharacter = "\t";
|
||||||
const char *nullPrintCharacter = "\\N";
|
const char *nullPrintCharacter = "\\N";
|
||||||
|
|
||||||
/* look up table properties */
|
/* look up table properties */
|
||||||
Relation distributedRelation = table_open(tableId, RowExclusiveLock);
|
Relation distributedRelation = table_open(tableId, RowExclusiveLock);
|
||||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(tableId);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(tableId);
|
||||||
partitionMethod = cacheEntry->partitionMethod;
|
|
||||||
|
|
||||||
copyDest->distributedRelation = distributedRelation;
|
copyDest->distributedRelation = distributedRelation;
|
||||||
copyDest->tupleDescriptor = inputTupleDescriptor;
|
copyDest->tupleDescriptor = inputTupleDescriptor;
|
||||||
|
@ -2168,7 +2163,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
List *shardIntervalList = LoadShardIntervalList(tableId);
|
List *shardIntervalList = LoadShardIntervalList(tableId);
|
||||||
if (shardIntervalList == NIL)
|
if (shardIntervalList == NIL)
|
||||||
{
|
{
|
||||||
if (partitionMethod == DISTRIBUTE_BY_HASH)
|
if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||||
errmsg("could not find any shards into which to copy"),
|
errmsg("could not find any shards into which to copy"),
|
||||||
|
@ -2187,7 +2182,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
}
|
}
|
||||||
|
|
||||||
/* error if any shard missing min/max values */
|
/* error if any shard missing min/max values */
|
||||||
if (partitionMethod != DISTRIBUTE_BY_NONE &&
|
if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) &&
|
||||||
cacheEntry->hasUninitializedShardInterval)
|
cacheEntry->hasUninitializedShardInterval)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
|
||||||
|
@ -2248,7 +2243,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
attributeList = lappend(attributeList, columnNameValue);
|
attributeList = lappend(attributeList, columnNameValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (partitionMethod != DISTRIBUTE_BY_NONE &&
|
if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) &&
|
||||||
copyDest->partitionColumnIndex == INVALID_PARTITION_COLUMN_INDEX)
|
copyDest->partitionColumnIndex == INVALID_PARTITION_COLUMN_INDEX)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
|
ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
|
||||||
|
|
|
@ -93,7 +93,7 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IsReferenceTable(relationId))
|
if (IsCitusTableType(relationId, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
/* prevent concurrent EnsureReferenceTablesExistOnAllNodes */
|
/* prevent concurrent EnsureReferenceTablesExistOnAllNodes */
|
||||||
int colocationId = CreateReferenceTableColocationId();
|
int colocationId = CreateReferenceTableColocationId();
|
||||||
|
|
|
@ -86,7 +86,7 @@ PreprocessDropTableStmt(Node *node, const char *queryString)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IsReferenceTable(relationId))
|
if (IsCitusTableType(relationId, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
/* prevent concurrent EnsureReferenceTablesExistOnAllNodes */
|
/* prevent concurrent EnsureReferenceTablesExistOnAllNodes */
|
||||||
int colocationId = CreateReferenceTableColocationId();
|
int colocationId = CreateReferenceTableColocationId();
|
||||||
|
@ -1230,8 +1230,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command)
|
||||||
{
|
{
|
||||||
Oid rightRelationId = RangeVarGetRelid(constraint->pktable, NoLock,
|
Oid rightRelationId = RangeVarGetRelid(constraint->pktable, NoLock,
|
||||||
false);
|
false);
|
||||||
if (IsCitusTable(rightRelationId) &&
|
if (IsCitusTableType(rightRelationId, REFERENCE_TABLE))
|
||||||
PartitionMethod(rightRelationId) == DISTRIBUTE_BY_NONE)
|
|
||||||
{
|
{
|
||||||
executeSequentially = true;
|
executeSequentially = true;
|
||||||
}
|
}
|
||||||
|
@ -1266,8 +1265,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command)
|
||||||
{
|
{
|
||||||
Oid rightRelationId = RangeVarGetRelid(constraint->pktable, NoLock,
|
Oid rightRelationId = RangeVarGetRelid(constraint->pktable, NoLock,
|
||||||
false);
|
false);
|
||||||
if (IsCitusTable(rightRelationId) &&
|
if (IsCitusTableType(rightRelationId, REFERENCE_TABLE))
|
||||||
PartitionMethod(rightRelationId) == DISTRIBUTE_BY_NONE)
|
|
||||||
{
|
{
|
||||||
executeSequentially = true;
|
executeSequentially = true;
|
||||||
}
|
}
|
||||||
|
@ -1288,8 +1286,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command)
|
||||||
* the distributed tables, thus contradicting our purpose of using
|
* the distributed tables, thus contradicting our purpose of using
|
||||||
* sequential mode.
|
* sequential mode.
|
||||||
*/
|
*/
|
||||||
if (executeSequentially && IsCitusTable(relationId) &&
|
if (executeSequentially && !IsCitusTableType(relationId, REFERENCE_TABLE) &&
|
||||||
PartitionMethod(relationId) != DISTRIBUTE_BY_NONE &&
|
|
||||||
ParallelQueryExecutedInTransaction())
|
ParallelQueryExecutedInTransaction())
|
||||||
{
|
{
|
||||||
char *relationName = get_rel_name(relationId);
|
char *relationName = get_rel_name(relationId);
|
||||||
|
@ -1331,7 +1328,6 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
|
||||||
char *leftSchemaName = get_namespace_name(leftSchemaId);
|
char *leftSchemaName = get_namespace_name(leftSchemaId);
|
||||||
char *escapedLeftSchemaName = quote_literal_cstr(leftSchemaName);
|
char *escapedLeftSchemaName = quote_literal_cstr(leftSchemaName);
|
||||||
|
|
||||||
char rightPartitionMethod = PartitionMethod(rightRelationId);
|
|
||||||
List *rightShardList = LoadShardIntervalList(rightRelationId);
|
List *rightShardList = LoadShardIntervalList(rightRelationId);
|
||||||
ListCell *rightShardCell = NULL;
|
ListCell *rightShardCell = NULL;
|
||||||
Oid rightSchemaId = get_rel_namespace(rightRelationId);
|
Oid rightSchemaId = get_rel_namespace(rightRelationId);
|
||||||
|
@ -1348,7 +1344,7 @@ InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
|
||||||
* since we only have one placement per worker. This hack is first implemented
|
* since we only have one placement per worker. This hack is first implemented
|
||||||
* for foreign constraint support from distributed tables to reference tables.
|
* for foreign constraint support from distributed tables to reference tables.
|
||||||
*/
|
*/
|
||||||
if (rightPartitionMethod == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableType(rightRelationId, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
int rightShardCount = list_length(rightShardList);
|
int rightShardCount = list_length(rightShardList);
|
||||||
int leftShardCount = list_length(leftShardList);
|
int leftShardCount = list_length(leftShardList);
|
||||||
|
|
|
@ -74,14 +74,13 @@ citus_truncate_trigger(PG_FUNCTION_ARGS)
|
||||||
TriggerData *triggerData = (TriggerData *) fcinfo->context;
|
TriggerData *triggerData = (TriggerData *) fcinfo->context;
|
||||||
Relation truncatedRelation = triggerData->tg_relation;
|
Relation truncatedRelation = triggerData->tg_relation;
|
||||||
Oid relationId = RelationGetRelid(truncatedRelation);
|
Oid relationId = RelationGetRelid(truncatedRelation);
|
||||||
char partitionMethod = PartitionMethod(relationId);
|
|
||||||
|
|
||||||
if (!EnableDDLPropagation)
|
if (!EnableDDLPropagation)
|
||||||
{
|
{
|
||||||
PG_RETURN_DATUM(PointerGetDatum(NULL));
|
PG_RETURN_DATUM(PointerGetDatum(NULL));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (partitionMethod == DISTRIBUTE_BY_APPEND)
|
if (IsCitusTableType(relationId, APPEND_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
Oid schemaId = get_rel_namespace(relationId);
|
Oid schemaId = get_rel_namespace(relationId);
|
||||||
char *schemaName = get_namespace_name(schemaId);
|
char *schemaName = get_namespace_name(schemaId);
|
||||||
|
@ -317,8 +316,7 @@ ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command)
|
||||||
{
|
{
|
||||||
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, failOK);
|
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, failOK);
|
||||||
|
|
||||||
if (IsCitusTable(relationId) &&
|
if (IsCitusTableType(relationId, REFERENCE_TABLE) &&
|
||||||
PartitionMethod(relationId) == DISTRIBUTE_BY_NONE &&
|
|
||||||
TableReferenced(relationId))
|
TableReferenced(relationId))
|
||||||
{
|
{
|
||||||
char *relationName = get_rel_name(relationId);
|
char *relationName = get_rel_name(relationId);
|
||||||
|
|
|
@ -779,7 +779,7 @@ AdaptiveExecutor(CitusScanState *scanState)
|
||||||
executorState->es_processed = execution->rowsProcessed;
|
executorState->es_processed = execution->rowsProcessed;
|
||||||
}
|
}
|
||||||
else if (distributedPlan->targetRelationId != InvalidOid &&
|
else if (distributedPlan->targetRelationId != InvalidOid &&
|
||||||
PartitionMethod(distributedPlan->targetRelationId) != DISTRIBUTE_BY_NONE)
|
!IsCitusTableType(distributedPlan->targetRelationId, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* For reference tables we already add rowsProcessed on the local execution,
|
* For reference tables we already add rowsProcessed on the local execution,
|
||||||
|
@ -1536,7 +1536,7 @@ SelectForUpdateOnReferenceTable(List *taskList)
|
||||||
{
|
{
|
||||||
Oid relationId = relationRowLock->relationId;
|
Oid relationId = relationRowLock->relationId;
|
||||||
|
|
||||||
if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableType(relationId, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -385,7 +385,7 @@ AcquireExecutorShardLocksForRelationRowLockList(List *relationRowLockList)
|
||||||
LockClauseStrength rowLockStrength = relationRowLock->rowLockStrength;
|
LockClauseStrength rowLockStrength = relationRowLock->rowLockStrength;
|
||||||
Oid relationId = relationRowLock->relationId;
|
Oid relationId = relationRowLock->relationId;
|
||||||
|
|
||||||
if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableType(relationId, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||||
|
|
||||||
|
|
|
@ -172,8 +172,8 @@ PartitionTasklistResults(const char *resultIdPrefix, List *selectTaskList,
|
||||||
CitusTableCacheEntry *targetRelation,
|
CitusTableCacheEntry *targetRelation,
|
||||||
bool binaryFormat)
|
bool binaryFormat)
|
||||||
{
|
{
|
||||||
if (targetRelation->partitionMethod != DISTRIBUTE_BY_HASH &&
|
if (!IsCitusTableTypeCacheEntry(targetRelation, HASH_DISTRIBUTED) &&
|
||||||
targetRelation->partitionMethod != DISTRIBUTE_BY_RANGE)
|
!IsCitusTableTypeCacheEntry(targetRelation, RANGE_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("repartitioning results of a tasklist is only supported "
|
errmsg("repartitioning results of a tasklist is only supported "
|
||||||
|
|
|
@ -493,8 +493,7 @@ ExecutePlanIntoColocatedIntermediateResults(Oid targetRelationId,
|
||||||
ParamListInfo paramListInfo = executorState->es_param_list_info;
|
ParamListInfo paramListInfo = executorState->es_param_list_info;
|
||||||
bool stopOnFailure = false;
|
bool stopOnFailure = false;
|
||||||
|
|
||||||
char partitionMethod = PartitionMethod(targetRelationId);
|
if (IsCitusTableType(targetRelationId, REFERENCE_TABLE))
|
||||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
|
||||||
{
|
{
|
||||||
stopOnFailure = true;
|
stopOnFailure = true;
|
||||||
}
|
}
|
||||||
|
@ -535,8 +534,7 @@ ExecutePlanIntoRelation(Oid targetRelationId, List *insertTargetList,
|
||||||
ParamListInfo paramListInfo = executorState->es_param_list_info;
|
ParamListInfo paramListInfo = executorState->es_param_list_info;
|
||||||
bool stopOnFailure = false;
|
bool stopOnFailure = false;
|
||||||
|
|
||||||
char partitionMethod = PartitionMethod(targetRelationId);
|
if (IsCitusTableType(targetRelationId, REFERENCE_TABLE))
|
||||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
|
||||||
{
|
{
|
||||||
stopOnFailure = true;
|
stopOnFailure = true;
|
||||||
}
|
}
|
||||||
|
@ -620,9 +618,8 @@ IsSupportedRedistributionTarget(Oid targetRelationId)
|
||||||
{
|
{
|
||||||
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(targetRelationId);
|
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(targetRelationId);
|
||||||
|
|
||||||
/* only range and hash-distributed tables are currently supported */
|
if (!IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED) &&
|
||||||
if (tableEntry->partitionMethod != DISTRIBUTE_BY_HASH &&
|
!IsCitusTableTypeCacheEntry(tableEntry, RANGE_DISTRIBUTED))
|
||||||
tableEntry->partitionMethod != DISTRIBUTE_BY_RANGE)
|
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -259,6 +259,8 @@ static void InvalidateCitusTableCacheEntrySlot(CitusTableCacheEntrySlot *cacheSl
|
||||||
static void InvalidateDistTableCache(void);
|
static void InvalidateDistTableCache(void);
|
||||||
static void InvalidateDistObjectCache(void);
|
static void InvalidateDistObjectCache(void);
|
||||||
static void InitializeTableCacheEntry(int64 shardId);
|
static void InitializeTableCacheEntry(int64 shardId);
|
||||||
|
static bool IsCitusTableTypeInternal(CitusTableCacheEntry *tableEntry, CitusTableType
|
||||||
|
tableType);
|
||||||
static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry);
|
static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry);
|
||||||
|
|
||||||
|
|
||||||
|
@ -296,6 +298,87 @@ EnsureModificationsCanRun(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsCitusTableType returns true if the given table with relationId
|
||||||
|
* belongs to a citus table that matches the given table type. If cache
|
||||||
|
* entry already exists, prefer using IsCitusTableTypeCacheEntry to avoid
|
||||||
|
* an extra lookup.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
IsCitusTableType(Oid relationId, CitusTableType tableType)
|
||||||
|
{
|
||||||
|
CitusTableCacheEntry *tableEntry = LookupCitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
|
/* we are not interested in postgres tables */
|
||||||
|
if (tableEntry == NULL)
|
||||||
|
{
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return IsCitusTableTypeInternal(tableEntry, tableType);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsCitusTableTypeCacheEntry returns true if the given table cache entry
|
||||||
|
* belongs to a citus table that matches the given table type.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tableType)
|
||||||
|
{
|
||||||
|
return IsCitusTableTypeInternal(tableEntry, tableType);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsCitusTableTypeInternal returns true if the given table entry belongs to
|
||||||
|
* the given table type group. For definition of table types, see CitusTableType.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
IsCitusTableTypeInternal(CitusTableCacheEntry *tableEntry, CitusTableType tableType)
|
||||||
|
{
|
||||||
|
switch (tableType)
|
||||||
|
{
|
||||||
|
case HASH_DISTRIBUTED:
|
||||||
|
{
|
||||||
|
return tableEntry->partitionMethod == DISTRIBUTE_BY_HASH;
|
||||||
|
}
|
||||||
|
|
||||||
|
case APPEND_DISTRIBUTED:
|
||||||
|
{
|
||||||
|
return tableEntry->partitionMethod == DISTRIBUTE_BY_APPEND;
|
||||||
|
}
|
||||||
|
|
||||||
|
case RANGE_DISTRIBUTED:
|
||||||
|
{
|
||||||
|
return tableEntry->partitionMethod == DISTRIBUTE_BY_RANGE;
|
||||||
|
}
|
||||||
|
|
||||||
|
case DISTRIBUTED_TABLE:
|
||||||
|
{
|
||||||
|
return tableEntry->partitionMethod == DISTRIBUTE_BY_HASH ||
|
||||||
|
tableEntry->partitionMethod == DISTRIBUTE_BY_RANGE ||
|
||||||
|
tableEntry->partitionMethod == DISTRIBUTE_BY_APPEND;
|
||||||
|
}
|
||||||
|
|
||||||
|
case REFERENCE_TABLE:
|
||||||
|
{
|
||||||
|
return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
case CITUS_TABLE_WITH_NO_DIST_KEY:
|
||||||
|
{
|
||||||
|
return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE;
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("Unknown table type %d", tableType)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* IsCitusTable returns whether relationId is a distributed relation or
|
* IsCitusTable returns whether relationId is a distributed relation or
|
||||||
* not.
|
* not.
|
||||||
|
@ -416,9 +499,7 @@ ReferenceTableShardId(uint64 shardId)
|
||||||
{
|
{
|
||||||
ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId);
|
ShardIdCacheEntry *shardIdEntry = LookupShardIdCacheEntry(shardId);
|
||||||
CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry;
|
CitusTableCacheEntry *tableEntry = shardIdEntry->tableEntry;
|
||||||
char partitionMethod = tableEntry->partitionMethod;
|
return IsCitusTableTypeCacheEntry(tableEntry, REFERENCE_TABLE);
|
||||||
|
|
||||||
return partitionMethod == DISTRIBUTE_BY_NONE;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -224,14 +224,12 @@ ShouldSyncTableMetadata(Oid relationId)
|
||||||
{
|
{
|
||||||
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
|
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
bool hashDistributed = (tableEntry->partitionMethod == DISTRIBUTE_BY_HASH);
|
|
||||||
bool streamingReplicated =
|
bool streamingReplicated =
|
||||||
(tableEntry->replicationModel == REPLICATION_MODEL_STREAMING);
|
(tableEntry->replicationModel == REPLICATION_MODEL_STREAMING);
|
||||||
|
|
||||||
bool mxTable = (streamingReplicated && hashDistributed);
|
bool mxTable = (streamingReplicated && IsCitusTableTypeCacheEntry(tableEntry,
|
||||||
bool referenceTable = (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE);
|
HASH_DISTRIBUTED));
|
||||||
|
if (mxTable || IsCitusTableTypeCacheEntry(tableEntry, REFERENCE_TABLE))
|
||||||
if (mxTable || referenceTable)
|
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -631,7 +629,7 @@ DistributionCreateCommand(CitusTableCacheEntry *cacheEntry)
|
||||||
char replicationModel = cacheEntry->replicationModel;
|
char replicationModel = cacheEntry->replicationModel;
|
||||||
StringInfo tablePartitionKeyString = makeStringInfo();
|
StringInfo tablePartitionKeyString = makeStringInfo();
|
||||||
|
|
||||||
if (distributionMethod == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
appendStringInfo(tablePartitionKeyString, "NULL");
|
appendStringInfo(tablePartitionKeyString, "NULL");
|
||||||
}
|
}
|
||||||
|
|
|
@ -368,7 +368,7 @@ ErrorIfNotSuitableToGetSize(Oid relationId)
|
||||||
"distributed", escapedQueryString)));
|
"distributed", escapedQueryString)));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (PartitionMethod(relationId) == DISTRIBUTE_BY_HASH &&
|
if (IsCitusTableType(relationId, HASH_DISTRIBUTED) &&
|
||||||
!SingleReplicatedTable(relationId))
|
!SingleReplicatedTable(relationId))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
@ -1401,7 +1401,7 @@ EnsureFunctionOwner(Oid functionId)
|
||||||
void
|
void
|
||||||
EnsureHashDistributedTable(Oid relationId)
|
EnsureHashDistributedTable(Oid relationId)
|
||||||
{
|
{
|
||||||
if (!IsHashDistributedTable(relationId))
|
if (!IsCitusTableType(relationId, HASH_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("relation %s should be a "
|
errmsg("relation %s should be a "
|
||||||
|
@ -1410,23 +1410,6 @@ EnsureHashDistributedTable(Oid relationId)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* IsHashDistributedTable returns true if the given relation is
|
|
||||||
* a distributed table.
|
|
||||||
*/
|
|
||||||
bool
|
|
||||||
IsHashDistributedTable(Oid relationId)
|
|
||||||
{
|
|
||||||
if (!IsCitusTable(relationId))
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
CitusTableCacheEntry *sourceTableEntry = GetCitusTableCacheEntry(relationId);
|
|
||||||
char sourceDistributionMethod = sourceTableEntry->partitionMethod;
|
|
||||||
return sourceDistributionMethod == DISTRIBUTE_BY_HASH;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* EnsureSuperUser check that the current user is a superuser and errors out if not.
|
* EnsureSuperUser check that the current user is a superuser and errors out if not.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -870,8 +870,7 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
|
||||||
errmsg("relation is not distributed")));
|
errmsg("relation is not distributed")));
|
||||||
}
|
}
|
||||||
|
|
||||||
char distributionMethod = PartitionMethod(relationId);
|
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
if (distributionMethod == DISTRIBUTE_BY_NONE)
|
|
||||||
{
|
{
|
||||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||||
if (shardIntervalList == NIL)
|
if (shardIntervalList == NIL)
|
||||||
|
@ -881,8 +880,8 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
shardInterval = (ShardInterval *) linitial(shardIntervalList);
|
shardInterval = (ShardInterval *) linitial(shardIntervalList);
|
||||||
}
|
}
|
||||||
else if (distributionMethod == DISTRIBUTE_BY_HASH ||
|
else if (IsCitusTableType(relationId, HASH_DISTRIBUTED) ||
|
||||||
distributionMethod == DISTRIBUTE_BY_RANGE)
|
IsCitusTableType(relationId, RANGE_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
|
|
|
@ -163,8 +163,7 @@ master_apply_delete_command(PG_FUNCTION_ARGS)
|
||||||
Node *whereClause = (Node *) deleteQuery->jointree->quals;
|
Node *whereClause = (Node *) deleteQuery->jointree->quals;
|
||||||
Node *deleteCriteria = eval_const_expressions(NULL, whereClause);
|
Node *deleteCriteria = eval_const_expressions(NULL, whereClause);
|
||||||
|
|
||||||
char partitionMethod = PartitionMethod(relationId);
|
if (IsCitusTableType(relationId, HASH_DISTRIBUTED))
|
||||||
if (partitionMethod == DISTRIBUTE_BY_HASH)
|
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("cannot delete from hash distributed table with this "
|
errmsg("cannot delete from hash distributed table with this "
|
||||||
|
@ -173,7 +172,7 @@ master_apply_delete_command(PG_FUNCTION_ARGS)
|
||||||
"are not supported with master_apply_delete_command."),
|
"are not supported with master_apply_delete_command."),
|
||||||
errhint("Use the DELETE command instead.")));
|
errhint("Use the DELETE command instead.")));
|
||||||
}
|
}
|
||||||
else if (partitionMethod == DISTRIBUTE_BY_NONE)
|
else if (IsCitusTableType(relationId, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("cannot delete from reference table"),
|
errmsg("cannot delete from reference table"),
|
||||||
|
|
|
@ -410,7 +410,7 @@ ReplicateColocatedShardPlacement(int64 shardId, char *sourceNodeName,
|
||||||
targetNodeName, targetNodePort);
|
targetNodeName, targetNodePort);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!IsReferenceTable(distributedTableId))
|
if (!IsCitusTableType(distributedTableId, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* When copying a shard to a new node, we should first ensure that reference
|
* When copying a shard to a new node, we should first ensure that reference
|
||||||
|
@ -492,7 +492,7 @@ EnsureTableListSuitableForReplication(List *tableIdList)
|
||||||
GetReferencingForeignConstaintCommands(tableId);
|
GetReferencingForeignConstaintCommands(tableId);
|
||||||
|
|
||||||
if (foreignConstraintCommandList != NIL &&
|
if (foreignConstraintCommandList != NIL &&
|
||||||
PartitionMethod(tableId) != DISTRIBUTE_BY_NONE)
|
IsCitusTableType(tableId, DISTRIBUTED_TABLE))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("cannot create foreign key constraint"),
|
errmsg("cannot create foreign key constraint"),
|
||||||
|
@ -850,7 +850,7 @@ CopyShardForeignConstraintCommandListGrouped(ShardInterval *shardInterval,
|
||||||
char *referencedSchemaName = get_namespace_name(referencedSchemaId);
|
char *referencedSchemaName = get_namespace_name(referencedSchemaId);
|
||||||
char *escapedReferencedSchemaName = quote_literal_cstr(referencedSchemaName);
|
char *escapedReferencedSchemaName = quote_literal_cstr(referencedSchemaName);
|
||||||
|
|
||||||
if (PartitionMethod(referencedRelationId) == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableType(referencedRelationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
referencedShardId = GetFirstShardId(referencedRelationId);
|
referencedShardId = GetFirstShardId(referencedRelationId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -136,15 +136,14 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
char partitionMethod = PartitionMethod(relationId);
|
if (IsCitusTableType(relationId, HASH_DISTRIBUTED))
|
||||||
if (partitionMethod == DISTRIBUTE_BY_HASH)
|
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("relation \"%s\" is a hash partitioned table",
|
ereport(ERROR, (errmsg("relation \"%s\" is a hash partitioned table",
|
||||||
relationName),
|
relationName),
|
||||||
errdetail("We currently don't support creating shards "
|
errdetail("We currently don't support creating shards "
|
||||||
"on hash-partitioned tables")));
|
"on hash-partitioned tables")));
|
||||||
}
|
}
|
||||||
else if (partitionMethod == DISTRIBUTE_BY_NONE)
|
else if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("relation \"%s\" is a reference table",
|
ereport(ERROR, (errmsg("relation \"%s\" is a reference table",
|
||||||
relationName),
|
relationName),
|
||||||
|
@ -253,8 +252,8 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
|
||||||
errdetail("The underlying shard is not a regular table")));
|
errdetail("The underlying shard is not a regular table")));
|
||||||
}
|
}
|
||||||
|
|
||||||
char partitionMethod = PartitionMethod(relationId);
|
if (IsCitusTableType(relationId, HASH_DISTRIBUTED) || IsCitusTableType(relationId,
|
||||||
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_NONE)
|
CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("cannot append to shardId " UINT64_FORMAT, shardId),
|
ereport(ERROR, (errmsg("cannot append to shardId " UINT64_FORMAT, shardId),
|
||||||
errdetail("We currently don't support appending to shards "
|
errdetail("We currently don't support appending to shards "
|
||||||
|
@ -586,7 +585,7 @@ RelationShardListForShardCreate(ShardInterval *shardInterval)
|
||||||
relationShard->shardId = shardInterval->shardId;
|
relationShard->shardId = shardInterval->shardId;
|
||||||
List *relationShardList = list_make1(relationShard);
|
List *relationShardList = list_make1(relationShard);
|
||||||
|
|
||||||
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
|
if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) &&
|
||||||
cacheEntry->colocationId != INVALID_COLOCATION_ID)
|
cacheEntry->colocationId != INVALID_COLOCATION_ID)
|
||||||
{
|
{
|
||||||
shardIndex = ShardIndex(shardInterval);
|
shardIndex = ShardIndex(shardInterval);
|
||||||
|
@ -605,12 +604,12 @@ RelationShardListForShardCreate(ShardInterval *shardInterval)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableType(fkeyRelationid, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
fkeyShardId = GetFirstShardId(fkeyRelationid);
|
fkeyShardId = GetFirstShardId(fkeyRelationid);
|
||||||
}
|
}
|
||||||
else if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
|
else if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) &&
|
||||||
PartitionMethod(fkeyRelationid) == DISTRIBUTE_BY_HASH)
|
IsCitusTableType(fkeyRelationid, HASH_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
/* hash distributed tables should be colocated to have fkey */
|
/* hash distributed tables should be colocated to have fkey */
|
||||||
Assert(TableColocationId(fkeyRelationid) == cacheEntry->colocationId);
|
Assert(TableColocationId(fkeyRelationid) == cacheEntry->colocationId);
|
||||||
|
@ -726,7 +725,7 @@ WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId,
|
||||||
{
|
{
|
||||||
referencedShardId = shardId;
|
referencedShardId = shardId;
|
||||||
}
|
}
|
||||||
else if (PartitionMethod(referencedRelationId) == DISTRIBUTE_BY_NONE)
|
else if (IsCitusTableType(referencedRelationId, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
referencedShardId = GetFirstShardId(referencedRelationId);
|
referencedShardId = GetFirstShardId(referencedRelationId);
|
||||||
}
|
}
|
||||||
|
@ -769,7 +768,6 @@ UpdateShardStatistics(int64 shardId)
|
||||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||||
Oid relationId = shardInterval->relationId;
|
Oid relationId = shardInterval->relationId;
|
||||||
char storageType = shardInterval->storageType;
|
char storageType = shardInterval->storageType;
|
||||||
char partitionType = PartitionMethod(relationId);
|
|
||||||
bool statsOK = false;
|
bool statsOK = false;
|
||||||
uint64 shardSize = 0;
|
uint64 shardSize = 0;
|
||||||
text *minValue = NULL;
|
text *minValue = NULL;
|
||||||
|
@ -827,7 +825,7 @@ UpdateShardStatistics(int64 shardId)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* only update shard min/max values for append-partitioned tables */
|
/* only update shard min/max values for append-partitioned tables */
|
||||||
if (partitionType == DISTRIBUTE_BY_APPEND)
|
if (IsCitusTableType(relationId, APPEND_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
DeleteShardRow(shardId);
|
DeleteShardRow(shardId);
|
||||||
InsertShardRow(relationId, shardId, storageType, minValue, maxValue);
|
InsertShardRow(relationId, shardId, storageType, minValue, maxValue);
|
||||||
|
@ -856,7 +854,6 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam
|
||||||
StringInfo tableSizeQuery = makeStringInfo();
|
StringInfo tableSizeQuery = makeStringInfo();
|
||||||
|
|
||||||
const uint32 unusedTableId = 1;
|
const uint32 unusedTableId = 1;
|
||||||
char partitionType = PartitionMethod(relationId);
|
|
||||||
StringInfo partitionValueQuery = makeStringInfo();
|
StringInfo partitionValueQuery = makeStringInfo();
|
||||||
|
|
||||||
PGresult *queryResult = NULL;
|
PGresult *queryResult = NULL;
|
||||||
|
@ -921,7 +918,7 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam
|
||||||
PQclear(queryResult);
|
PQclear(queryResult);
|
||||||
ForgetResults(connection);
|
ForgetResults(connection);
|
||||||
|
|
||||||
if (partitionType != DISTRIBUTE_BY_APPEND)
|
if (!IsCitusTableType(relationId, APPEND_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
/* we don't need min/max for non-append distributed tables */
|
/* we don't need min/max for non-append distributed tables */
|
||||||
return true;
|
return true;
|
||||||
|
|
|
@ -78,8 +78,7 @@ RebuildQueryStrings(Job *workerJob)
|
||||||
Query *copiedSubquery = copiedSubqueryRte->subquery;
|
Query *copiedSubquery = copiedSubqueryRte->subquery;
|
||||||
|
|
||||||
/* there are no restrictions to add for reference tables */
|
/* there are no restrictions to add for reference tables */
|
||||||
char partitionMethod = PartitionMethod(shardInterval->relationId);
|
if (IsCitusTableType(shardInterval->relationId, DISTRIBUTED_TABLE))
|
||||||
if (partitionMethod != DISTRIBUTE_BY_NONE)
|
|
||||||
{
|
{
|
||||||
AddShardIntervalRestrictionToSelect(copiedSubquery, shardInterval);
|
AddShardIntervalRestrictionToSelect(copiedSubquery, shardInterval);
|
||||||
}
|
}
|
||||||
|
|
|
@ -286,37 +286,6 @@ ExtractRangeTableEntryList(Query *query)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ExtractClassifiedRangeTableEntryList extracts reference table rte's from
|
|
||||||
* the given rte list.
|
|
||||||
* Callers of this function are responsible for passing referenceTableRTEList
|
|
||||||
* to be non-null and initially pointing to an empty list.
|
|
||||||
*/
|
|
||||||
List *
|
|
||||||
ExtractReferenceTableRTEList(List *rteList)
|
|
||||||
{
|
|
||||||
List *referenceTableRTEList = NIL;
|
|
||||||
|
|
||||||
RangeTblEntry *rte = NULL;
|
|
||||||
foreach_ptr(rte, rteList)
|
|
||||||
{
|
|
||||||
if (rte->rtekind != RTE_RELATION || rte->relkind != RELKIND_RELATION)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
Oid relationOid = rte->relid;
|
|
||||||
if (IsCitusTable(relationOid) && PartitionMethod(relationOid) ==
|
|
||||||
DISTRIBUTE_BY_NONE)
|
|
||||||
{
|
|
||||||
referenceTableRTEList = lappend(referenceTableRTEList, rte);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return referenceTableRTEList;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* NeedsDistributedPlanning returns true if the Citus extension is loaded and
|
* NeedsDistributedPlanning returns true if the Citus extension is loaded and
|
||||||
* the query contains a distributed table.
|
* the query contains a distributed table.
|
||||||
|
@ -1855,7 +1824,7 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
|
||||||
cacheEntry = GetCitusTableCacheEntry(rte->relid);
|
cacheEntry = GetCitusTableCacheEntry(rte->relid);
|
||||||
|
|
||||||
relationRestrictionContext->allReferenceTables &=
|
relationRestrictionContext->allReferenceTables &=
|
||||||
(cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE);
|
IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
relationRestrictionContext->relationRestrictionList =
|
relationRestrictionContext->relationRestrictionList =
|
||||||
|
|
|
@ -129,9 +129,8 @@ GroupedByPartitionColumn(MultiNode *node, MultiExtendedOp *opNode)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
char partitionMethod = PartitionMethod(relationId);
|
if (!IsCitusTableType(relationId, RANGE_DISTRIBUTED) &&
|
||||||
if (partitionMethod != DISTRIBUTE_BY_RANGE &&
|
!IsCitusTableType(relationId, HASH_DISTRIBUTED))
|
||||||
partitionMethod != DISTRIBUTE_BY_HASH)
|
|
||||||
{
|
{
|
||||||
/* only range- and hash-distributed tables are strictly partitioned */
|
/* only range- and hash-distributed tables are strictly partitioned */
|
||||||
return false;
|
return false;
|
||||||
|
@ -298,7 +297,7 @@ PartitionColumnInTableList(Var *column, List *tableNodeList)
|
||||||
{
|
{
|
||||||
Assert(partitionColumn->varno == tableNode->rangeTableId);
|
Assert(partitionColumn->varno == tableNode->rangeTableId);
|
||||||
|
|
||||||
if (PartitionMethod(tableNode->relationId) != DISTRIBUTE_BY_APPEND)
|
if (!IsCitusTableType(tableNode->relationId, APPEND_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -205,15 +205,16 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
|
||||||
/* we don't want to deal with append/range distributed tables */
|
/* we don't want to deal with append/range distributed tables */
|
||||||
Oid distributedTableId = rangeTableEntry->relid;
|
Oid distributedTableId = rangeTableEntry->relid;
|
||||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
|
||||||
if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH ||
|
if (IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED) ||
|
||||||
cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE))
|
IsCitusTableTypeCacheEntry(cacheEntry, APPEND_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* WHERE clause should not be empty for distributed tables */
|
/* WHERE clause should not be empty for distributed tables */
|
||||||
if (joinTree == NULL ||
|
if (joinTree == NULL ||
|
||||||
(cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE && joinTree->quals == NULL))
|
(IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) && joinTree->quals ==
|
||||||
|
NULL))
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -541,7 +541,6 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
|
||||||
{
|
{
|
||||||
Oid selectPartitionColumnTableId = InvalidOid;
|
Oid selectPartitionColumnTableId = InvalidOid;
|
||||||
Oid targetRelationId = insertRte->relid;
|
Oid targetRelationId = insertRte->relid;
|
||||||
char targetPartitionMethod = PartitionMethod(targetRelationId);
|
|
||||||
ListCell *rangeTableCell = NULL;
|
ListCell *rangeTableCell = NULL;
|
||||||
|
|
||||||
/* we only do this check for INSERT ... SELECT queries */
|
/* we only do this check for INSERT ... SELECT queries */
|
||||||
|
@ -589,7 +588,7 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
|
||||||
* If we're inserting into a reference table, all participating tables
|
* If we're inserting into a reference table, all participating tables
|
||||||
* should be reference tables as well.
|
* should be reference tables as well.
|
||||||
*/
|
*/
|
||||||
if (targetPartitionMethod == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableType(targetRelationId, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
if (!allReferenceTables)
|
if (!allReferenceTables)
|
||||||
{
|
{
|
||||||
|
@ -1424,7 +1423,7 @@ NonPushableInsertSelectSupported(Query *insertSelectQuery)
|
||||||
}
|
}
|
||||||
|
|
||||||
RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery);
|
RangeTblEntry *insertRte = ExtractResultRelationRTE(insertSelectQuery);
|
||||||
if (PartitionMethod(insertRte->relid) == DISTRIBUTE_BY_APPEND)
|
if (IsCitusTableType(insertRte->relid, APPEND_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
"INSERT ... SELECT into an append-distributed table is "
|
"INSERT ... SELECT into an append-distributed table is "
|
||||||
|
|
|
@ -823,12 +823,12 @@ ReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
char candidatePartitionMethod = PartitionMethod(candidateTable->relationId);
|
bool leftIsReferenceTable = IsCitusTableType(
|
||||||
char leftPartitionMethod = PartitionMethod(currentJoinNode->tableEntry->relationId);
|
currentJoinNode->tableEntry->relationId,
|
||||||
|
REFERENCE_TABLE);
|
||||||
if (!IsSupportedReferenceJoin(joinType,
|
bool rightIsReferenceTable = IsCitusTableType(candidateTable->relationId,
|
||||||
leftPartitionMethod == DISTRIBUTE_BY_NONE,
|
REFERENCE_TABLE);
|
||||||
candidatePartitionMethod == DISTRIBUTE_BY_NONE))
|
if (!IsSupportedReferenceJoin(joinType, leftIsReferenceTable, rightIsReferenceTable))
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -874,12 +874,13 @@ static JoinOrderNode *
|
||||||
CartesianProductReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
|
CartesianProductReferenceJoin(JoinOrderNode *currentJoinNode, TableEntry *candidateTable,
|
||||||
List *applicableJoinClauses, JoinType joinType)
|
List *applicableJoinClauses, JoinType joinType)
|
||||||
{
|
{
|
||||||
char candidatePartitionMethod = PartitionMethod(candidateTable->relationId);
|
bool leftIsReferenceTable = IsCitusTableType(
|
||||||
char leftPartitionMethod = PartitionMethod(currentJoinNode->tableEntry->relationId);
|
currentJoinNode->tableEntry->relationId,
|
||||||
|
REFERENCE_TABLE);
|
||||||
|
bool rightIsReferenceTable = IsCitusTableType(candidateTable->relationId,
|
||||||
|
REFERENCE_TABLE);
|
||||||
|
|
||||||
if (!IsSupportedReferenceJoin(joinType,
|
if (!IsSupportedReferenceJoin(joinType, leftIsReferenceTable, rightIsReferenceTable))
|
||||||
leftPartitionMethod == DISTRIBUTE_BY_NONE,
|
|
||||||
candidatePartitionMethod == DISTRIBUTE_BY_NONE))
|
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -1385,8 +1386,8 @@ DistPartitionKey(Oid relationId)
|
||||||
{
|
{
|
||||||
CitusTableCacheEntry *partitionEntry = GetCitusTableCacheEntry(relationId);
|
CitusTableCacheEntry *partitionEntry = GetCitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
/* reference tables do not have partition column */
|
/* non-distributed tables do not have partition column */
|
||||||
if (partitionEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableTypeCacheEntry(partitionEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -4276,10 +4276,8 @@ TablePartitioningSupportsDistinct(List *tableNodeList, MultiExtendedOp *opNode,
|
||||||
* We need to check that task results don't overlap. We can only do this
|
* We need to check that task results don't overlap. We can only do this
|
||||||
* if table is range partitioned.
|
* if table is range partitioned.
|
||||||
*/
|
*/
|
||||||
char partitionMethod = PartitionMethod(relationId);
|
if (IsCitusTableType(relationId, RANGE_DISTRIBUTED) ||
|
||||||
|
IsCitusTableType(relationId, HASH_DISTRIBUTED))
|
||||||
if (partitionMethod == DISTRIBUTE_BY_RANGE ||
|
|
||||||
partitionMethod == DISTRIBUTE_BY_HASH)
|
|
||||||
{
|
{
|
||||||
Var *tablePartitionColumn = tableNode->partitionColumn;
|
Var *tablePartitionColumn = tableNode->partitionColumn;
|
||||||
|
|
||||||
|
|
|
@ -227,10 +227,10 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
|
||||||
FindReferencedTableColumn(targetExpression, NIL, query, &relationId, &column);
|
FindReferencedTableColumn(targetExpression, NIL, query, &relationId, &column);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If the expression belongs to a reference table continue searching for
|
* If the expression belongs to a non-distributed table continue searching for
|
||||||
* other partition keys.
|
* other partition keys.
|
||||||
*/
|
*/
|
||||||
if (IsCitusTable(relationId) && PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -341,8 +341,7 @@ bool
|
||||||
IsDistributedTableRTE(Node *node)
|
IsDistributedTableRTE(Node *node)
|
||||||
{
|
{
|
||||||
Oid relationId = NodeTryGetRteRelid(node);
|
Oid relationId = NodeTryGetRteRelid(node);
|
||||||
return relationId != InvalidOid && IsCitusTable(relationId) &&
|
return relationId != InvalidOid && IsCitusTableType(relationId, DISTRIBUTED_TABLE);
|
||||||
PartitionMethod(relationId) != DISTRIBUTE_BY_NONE;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -354,7 +353,7 @@ bool
|
||||||
IsReferenceTableRTE(Node *node)
|
IsReferenceTableRTE(Node *node)
|
||||||
{
|
{
|
||||||
Oid relationId = NodeTryGetRteRelid(node);
|
Oid relationId = NodeTryGetRteRelid(node);
|
||||||
return relationId != InvalidOid && IsReferenceTable(relationId);
|
return relationId != InvalidOid && IsCitusTableType(relationId, REFERENCE_TABLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -1021,12 +1020,11 @@ ErrorHintRequired(const char *errorHint, Query *queryTree)
|
||||||
foreach(relationIdCell, distributedRelationIdList)
|
foreach(relationIdCell, distributedRelationIdList)
|
||||||
{
|
{
|
||||||
Oid relationId = lfirst_oid(relationIdCell);
|
Oid relationId = lfirst_oid(relationIdCell);
|
||||||
char partitionMethod = PartitionMethod(relationId);
|
if (IsCitusTableType(relationId, REFERENCE_TABLE))
|
||||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
else if (partitionMethod == DISTRIBUTE_BY_HASH)
|
else if (IsCitusTableType(relationId, HASH_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
int colocationId = TableColocationId(relationId);
|
int colocationId = TableColocationId(relationId);
|
||||||
colocationIdList = list_append_unique_int(colocationIdList, colocationId);
|
colocationIdList = list_append_unique_int(colocationIdList, colocationId);
|
||||||
|
|
|
@ -2141,7 +2141,7 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
|
||||||
ListCell *shardIntervalCell = NULL;
|
ListCell *shardIntervalCell = NULL;
|
||||||
|
|
||||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
||||||
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -2299,22 +2299,21 @@ ErrorIfUnsupportedShardDistribution(Query *query)
|
||||||
foreach(relationIdCell, relationIdList)
|
foreach(relationIdCell, relationIdList)
|
||||||
{
|
{
|
||||||
Oid relationId = lfirst_oid(relationIdCell);
|
Oid relationId = lfirst_oid(relationIdCell);
|
||||||
char partitionMethod = PartitionMethod(relationId);
|
if (IsCitusTableType(relationId, RANGE_DISTRIBUTED))
|
||||||
if (partitionMethod == DISTRIBUTE_BY_RANGE)
|
|
||||||
{
|
{
|
||||||
rangeDistributedRelationCount++;
|
rangeDistributedRelationCount++;
|
||||||
nonReferenceRelations = lappend_oid(nonReferenceRelations,
|
nonReferenceRelations = lappend_oid(nonReferenceRelations,
|
||||||
relationId);
|
relationId);
|
||||||
}
|
}
|
||||||
else if (partitionMethod == DISTRIBUTE_BY_HASH)
|
else if (IsCitusTableType(relationId, HASH_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
hashDistributedRelationCount++;
|
hashDistributedRelationCount++;
|
||||||
nonReferenceRelations = lappend_oid(nonReferenceRelations,
|
nonReferenceRelations = lappend_oid(nonReferenceRelations,
|
||||||
relationId);
|
relationId);
|
||||||
}
|
}
|
||||||
else if (partitionMethod == DISTRIBUTE_BY_NONE)
|
else if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
/* do not need to handle reference tables */
|
/* do not need to handle non-distributed tables */
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
|
@ -2426,9 +2425,9 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
|
||||||
ShardInterval *shardInterval = NULL;
|
ShardInterval *shardInterval = NULL;
|
||||||
|
|
||||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
||||||
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
/* reference table only has one shard */
|
/* non-distributed tables have only one shard */
|
||||||
shardInterval = cacheEntry->sortedShardIntervalArray[0];
|
shardInterval = cacheEntry->sortedShardIntervalArray[0];
|
||||||
|
|
||||||
/* only use reference table as anchor shard if none exists yet */
|
/* only use reference table as anchor shard if none exists yet */
|
||||||
|
@ -2537,13 +2536,13 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
|
||||||
FmgrInfo *comparisonFunction = firstTableCache->shardIntervalCompareFunction;
|
FmgrInfo *comparisonFunction = firstTableCache->shardIntervalCompareFunction;
|
||||||
|
|
||||||
/* reference tables are always & only copartitioned with reference tables */
|
/* reference tables are always & only copartitioned with reference tables */
|
||||||
if (firstTableCache->partitionMethod == DISTRIBUTE_BY_NONE &&
|
if (IsCitusTableTypeCacheEntry(firstTableCache, CITUS_TABLE_WITH_NO_DIST_KEY) &&
|
||||||
secondTableCache->partitionMethod == DISTRIBUTE_BY_NONE)
|
IsCitusTableTypeCacheEntry(secondTableCache, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
else if (firstTableCache->partitionMethod == DISTRIBUTE_BY_NONE ||
|
else if (IsCitusTableTypeCacheEntry(firstTableCache, CITUS_TABLE_WITH_NO_DIST_KEY) ||
|
||||||
secondTableCache->partitionMethod == DISTRIBUTE_BY_NONE)
|
IsCitusTableTypeCacheEntry(secondTableCache, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -2578,8 +2577,8 @@ CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
|
||||||
* different values for the same value. int vs bigint can be given as an
|
* different values for the same value. int vs bigint can be given as an
|
||||||
* example.
|
* example.
|
||||||
*/
|
*/
|
||||||
if (firstTableCache->partitionMethod == DISTRIBUTE_BY_HASH ||
|
if (IsCitusTableTypeCacheEntry(firstTableCache, HASH_DISTRIBUTED) ||
|
||||||
secondTableCache->partitionMethod == DISTRIBUTE_BY_HASH)
|
IsCitusTableTypeCacheEntry(secondTableCache, HASH_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
@ -3562,7 +3561,7 @@ NodeIsRangeTblRefReferenceTable(Node *node, List *rangeTableList)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return PartitionMethod(rangeTableEntry->relid) == DISTRIBUTE_BY_NONE;
|
return IsCitusTableType(rangeTableEntry->relid, REFERENCE_TABLE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -3734,15 +3733,12 @@ PartitionedOnColumn(Var *column, List *rangeTableList, List *dependentJobList)
|
||||||
if (rangeTableType == CITUS_RTE_RELATION)
|
if (rangeTableType == CITUS_RTE_RELATION)
|
||||||
{
|
{
|
||||||
Oid relationId = rangeTableEntry->relid;
|
Oid relationId = rangeTableEntry->relid;
|
||||||
char partitionMethod = PartitionMethod(relationId);
|
|
||||||
Var *partitionColumn = PartitionColumn(relationId, rangeTableId);
|
Var *partitionColumn = PartitionColumn(relationId, rangeTableId);
|
||||||
|
|
||||||
/* reference tables do not have partition columns */
|
/* non-distributed tables do not have partition columns */
|
||||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
partitionedOnColumn = false;
|
return false;
|
||||||
|
|
||||||
return partitionedOnColumn;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (partitionColumn->varattno == column->varattno)
|
if (partitionColumn->varattno == column->varattno)
|
||||||
|
@ -3927,7 +3923,7 @@ ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterva
|
||||||
CitusTableCacheEntry *intervalRelation =
|
CitusTableCacheEntry *intervalRelation =
|
||||||
GetCitusTableCacheEntry(firstInterval->relationId);
|
GetCitusTableCacheEntry(firstInterval->relationId);
|
||||||
|
|
||||||
Assert(intervalRelation->partitionMethod != DISTRIBUTE_BY_NONE);
|
Assert(IsCitusTableTypeCacheEntry(intervalRelation, DISTRIBUTED_TABLE));
|
||||||
|
|
||||||
FmgrInfo *comparisonFunction = intervalRelation->shardIntervalCompareFunction;
|
FmgrInfo *comparisonFunction = intervalRelation->shardIntervalCompareFunction;
|
||||||
Oid collation = intervalRelation->partitionColumn->varcollid;
|
Oid collation = intervalRelation->partitionColumn->varcollid;
|
||||||
|
|
|
@ -299,15 +299,14 @@ List *
|
||||||
ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex)
|
ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rteIndex)
|
||||||
{
|
{
|
||||||
Oid relationId = shardInterval->relationId;
|
Oid relationId = shardInterval->relationId;
|
||||||
char partitionMethod = PartitionMethod(shardInterval->relationId);
|
|
||||||
Var *partitionColumn = NULL;
|
Var *partitionColumn = NULL;
|
||||||
|
|
||||||
if (partitionMethod == DISTRIBUTE_BY_HASH)
|
if (IsCitusTableType(relationId, HASH_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
partitionColumn = MakeInt4Column();
|
partitionColumn = MakeInt4Column();
|
||||||
}
|
}
|
||||||
else if (partitionMethod == DISTRIBUTE_BY_RANGE || partitionMethod ==
|
else if (IsCitusTableType(relationId, RANGE_DISTRIBUTED) || IsCitusTableType(
|
||||||
DISTRIBUTE_BY_APPEND)
|
relationId, APPEND_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
Assert(rteIndex > 0);
|
Assert(rteIndex > 0);
|
||||||
partitionColumn = PartitionColumn(relationId, rteIndex);
|
partitionColumn = PartitionColumn(relationId, rteIndex);
|
||||||
|
@ -1134,7 +1133,6 @@ MultiShardModifyQuerySupported(Query *originalQuery,
|
||||||
DeferredErrorMessage *errorMessage = NULL;
|
DeferredErrorMessage *errorMessage = NULL;
|
||||||
RangeTblEntry *resultRangeTable = ExtractResultRelationRTE(originalQuery);
|
RangeTblEntry *resultRangeTable = ExtractResultRelationRTE(originalQuery);
|
||||||
Oid resultRelationOid = resultRangeTable->relid;
|
Oid resultRelationOid = resultRangeTable->relid;
|
||||||
char resultPartitionMethod = PartitionMethod(resultRelationOid);
|
|
||||||
|
|
||||||
if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree))
|
if (HasDangerousJoinUsing(originalQuery->rtable, (Node *) originalQuery->jointree))
|
||||||
{
|
{
|
||||||
|
@ -1151,7 +1149,7 @@ MultiShardModifyQuerySupported(Query *originalQuery,
|
||||||
"tables must not be VOLATILE",
|
"tables must not be VOLATILE",
|
||||||
NULL, NULL);
|
NULL, NULL);
|
||||||
}
|
}
|
||||||
else if (resultPartitionMethod == DISTRIBUTE_BY_NONE)
|
else if (IsCitusTableType(resultRelationOid, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
errorMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
errorMessage = DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
"only reference tables may be queried when targeting "
|
"only reference tables may be queried when targeting "
|
||||||
|
@ -1882,9 +1880,8 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
|
||||||
|
|
||||||
CitusTableCacheEntry *modificationTableCacheEntry = GetCitusTableCacheEntry(
|
CitusTableCacheEntry *modificationTableCacheEntry = GetCitusTableCacheEntry(
|
||||||
updateOrDeleteRTE->relid);
|
updateOrDeleteRTE->relid);
|
||||||
char modificationPartitionMethod = modificationTableCacheEntry->partitionMethod;
|
|
||||||
|
|
||||||
if (modificationPartitionMethod == DISTRIBUTE_BY_NONE &&
|
if (IsCitusTableTypeCacheEntry(modificationTableCacheEntry, REFERENCE_TABLE) &&
|
||||||
SelectsFromDistributedTable(rangeTableList, query))
|
SelectsFromDistributedTable(rangeTableList, query))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
@ -2009,7 +2006,7 @@ SelectsFromDistributedTable(List *rangeTableList, Query *query)
|
||||||
|
|
||||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(
|
||||||
rangeTableEntry->relid);
|
rangeTableEntry->relid);
|
||||||
if (cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE &&
|
if (IsCitusTableTypeCacheEntry(cacheEntry, DISTRIBUTED_TABLE) &&
|
||||||
(resultRangeTableEntry == NULL || resultRangeTableEntry->relid !=
|
(resultRangeTableEntry == NULL || resultRangeTableEntry->relid !=
|
||||||
rangeTableEntry->relid))
|
rangeTableEntry->relid))
|
||||||
{
|
{
|
||||||
|
@ -2424,9 +2421,9 @@ TargetShardIntervalForFastPathQuery(Query *query, bool *isMultiShardQuery,
|
||||||
{
|
{
|
||||||
Oid relationId = ExtractFirstCitusTableId(query);
|
Oid relationId = ExtractFirstCitusTableId(query);
|
||||||
|
|
||||||
if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
/* we don't need to do shard pruning for reference tables */
|
/* we don't need to do shard pruning for non-distributed tables */
|
||||||
return list_make1(LoadShardIntervalList(relationId));
|
return list_make1(LoadShardIntervalList(relationId));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2702,14 +2699,13 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
||||||
{
|
{
|
||||||
Oid distributedTableId = ExtractFirstCitusTableId(query);
|
Oid distributedTableId = ExtractFirstCitusTableId(query);
|
||||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
|
||||||
char partitionMethod = cacheEntry->partitionMethod;
|
|
||||||
List *modifyRouteList = NIL;
|
List *modifyRouteList = NIL;
|
||||||
ListCell *insertValuesCell = NULL;
|
ListCell *insertValuesCell = NULL;
|
||||||
|
|
||||||
Assert(query->commandType == CMD_INSERT);
|
Assert(query->commandType == CMD_INSERT);
|
||||||
|
|
||||||
/* reference tables can only have one shard */
|
/* reference tables can only have one shard */
|
||||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
List *shardIntervalList = LoadShardIntervalList(distributedTableId);
|
List *shardIntervalList = LoadShardIntervalList(distributedTableId);
|
||||||
|
|
||||||
|
@ -2808,8 +2804,8 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
||||||
missingOk);
|
missingOk);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod ==
|
if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) ||
|
||||||
DISTRIBUTE_BY_RANGE)
|
IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
Datum partitionValue = partitionValueConst->constvalue;
|
Datum partitionValue = partitionValueConst->constvalue;
|
||||||
|
|
||||||
|
@ -3203,8 +3199,7 @@ ExtractInsertPartitionKeyValue(Query *query)
|
||||||
uint32 rangeTableId = 1;
|
uint32 rangeTableId = 1;
|
||||||
Const *singlePartitionValueConst = NULL;
|
Const *singlePartitionValueConst = NULL;
|
||||||
|
|
||||||
char partitionMethod = PartitionMethod(distributedTableId);
|
if (IsCitusTableType(distributedTableId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -3336,9 +3331,7 @@ MultiRouterPlannableQuery(Query *query)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
char partitionMethod = PartitionMethod(distributedTableId);
|
if (IsCitusTableType(distributedTableId, APPEND_DISTRIBUTED))
|
||||||
if (!(partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod ==
|
|
||||||
DISTRIBUTE_BY_NONE || partitionMethod == DISTRIBUTE_BY_RANGE))
|
|
||||||
{
|
{
|
||||||
return DeferredError(
|
return DeferredError(
|
||||||
ERRCODE_FEATURE_NOT_SUPPORTED,
|
ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
@ -3346,7 +3339,7 @@ MultiRouterPlannableQuery(Query *query)
|
||||||
NULL, NULL);
|
NULL, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (partitionMethod != DISTRIBUTE_BY_NONE)
|
if (IsCitusTableType(distributedTableId, DISTRIBUTED_TABLE))
|
||||||
{
|
{
|
||||||
hasDistributedTable = true;
|
hasDistributedTable = true;
|
||||||
}
|
}
|
||||||
|
@ -3361,7 +3354,8 @@ MultiRouterPlannableQuery(Query *query)
|
||||||
uint32 tableReplicationFactor = TableShardReplicationFactor(
|
uint32 tableReplicationFactor = TableShardReplicationFactor(
|
||||||
distributedTableId);
|
distributedTableId);
|
||||||
|
|
||||||
if (tableReplicationFactor > 1 && partitionMethod != DISTRIBUTE_BY_NONE)
|
if (tableReplicationFactor > 1 && IsCitusTableType(distributedTableId,
|
||||||
|
DISTRIBUTED_TABLE))
|
||||||
{
|
{
|
||||||
return DeferredError(
|
return DeferredError(
|
||||||
ERRCODE_FEATURE_NOT_SUPPORTED,
|
ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
|
@ -3490,13 +3484,12 @@ ErrorIfQueryHasUnroutableModifyingCTE(Query *queryTree)
|
||||||
|
|
||||||
CitusTableCacheEntry *modificationTableCacheEntry =
|
CitusTableCacheEntry *modificationTableCacheEntry =
|
||||||
GetCitusTableCacheEntry(distributedTableId);
|
GetCitusTableCacheEntry(distributedTableId);
|
||||||
char modificationPartitionMethod =
|
|
||||||
modificationTableCacheEntry->partitionMethod;
|
|
||||||
|
|
||||||
if (modificationPartitionMethod == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableTypeCacheEntry(modificationTableCacheEntry,
|
||||||
|
CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
"cannot router plan modification of a reference table",
|
"cannot router plan modification of a non-distributed table",
|
||||||
NULL, NULL);
|
NULL, NULL);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@
|
||||||
#include "distributed/query_colocation_checker.h"
|
#include "distributed/query_colocation_checker.h"
|
||||||
#include "distributed/pg_dist_partition.h"
|
#include "distributed/pg_dist_partition.h"
|
||||||
#include "distributed/relation_restriction_equivalence.h"
|
#include "distributed/relation_restriction_equivalence.h"
|
||||||
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_logical_planner.h" /* only to access utility functions */
|
#include "distributed/multi_logical_planner.h" /* only to access utility functions */
|
||||||
#include "nodes/makefuncs.h"
|
#include "nodes/makefuncs.h"
|
||||||
#include "nodes/nodeFuncs.h"
|
#include "nodes/nodeFuncs.h"
|
||||||
|
@ -153,10 +154,10 @@ AnchorRte(Query *subquery)
|
||||||
{
|
{
|
||||||
Oid relationId = currentRte->relid;
|
Oid relationId = currentRte->relid;
|
||||||
|
|
||||||
if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Reference tables should not be the anchor rte since they
|
* Non-distributed tables should not be the anchor rte since they
|
||||||
* don't have distribution key.
|
* don't have distribution key.
|
||||||
*/
|
*/
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -1445,8 +1445,7 @@ HasRecurringTuples(Node *node, RecurringTuplesType *recurType)
|
||||||
if (rangeTableEntry->rtekind == RTE_RELATION)
|
if (rangeTableEntry->rtekind == RTE_RELATION)
|
||||||
{
|
{
|
||||||
Oid relationId = rangeTableEntry->relid;
|
Oid relationId = rangeTableEntry->relid;
|
||||||
if (IsCitusTable(relationId) &&
|
if (IsCitusTableType(relationId, REFERENCE_TABLE))
|
||||||
PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
|
|
||||||
{
|
{
|
||||||
*recurType = RECURRING_TUPLES_REFERENCE_TABLE;
|
*recurType = RECURRING_TUPLES_REFERENCE_TABLE;
|
||||||
|
|
||||||
|
|
|
@ -605,8 +605,10 @@ ReferenceRelationCount(RelationRestrictionContext *restrictionContext)
|
||||||
{
|
{
|
||||||
RelationRestriction *relationRestriction =
|
RelationRestriction *relationRestriction =
|
||||||
(RelationRestriction *) lfirst(relationRestrictionCell);
|
(RelationRestriction *) lfirst(relationRestrictionCell);
|
||||||
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(
|
||||||
|
relationRestriction->relationId);
|
||||||
|
|
||||||
if (PartitionMethod(relationRestriction->relationId) == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
referenceRelationCount++;
|
referenceRelationCount++;
|
||||||
}
|
}
|
||||||
|
@ -657,8 +659,9 @@ EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,
|
||||||
(RelationRestriction *) lfirst(relationRestrictionCell);
|
(RelationRestriction *) lfirst(relationRestrictionCell);
|
||||||
int rteIdentity = GetRTEIdentity(relationRestriction->rte);
|
int rteIdentity = GetRTEIdentity(relationRestriction->rte);
|
||||||
|
|
||||||
/* we shouldn't check for the equality of reference tables */
|
/* we shouldn't check for the equality of non-distributed tables */
|
||||||
if (PartitionMethod(relationRestriction->relationId) == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableType(relationRestriction->relationId,
|
||||||
|
CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1721,7 +1724,7 @@ AllRelationsInRestrictionContextColocated(RelationRestrictionContext *restrictio
|
||||||
{
|
{
|
||||||
Oid relationId = relationRestriction->relationId;
|
Oid relationId = relationRestriction->relationId;
|
||||||
|
|
||||||
if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -306,8 +306,8 @@ static int ConstraintCount(PruningTreeNode *node);
|
||||||
* PruneShards returns all shards from a distributed table that cannot be
|
* PruneShards returns all shards from a distributed table that cannot be
|
||||||
* proven to be eliminated by whereClauseList.
|
* proven to be eliminated by whereClauseList.
|
||||||
*
|
*
|
||||||
* For reference tables, the function simply returns the single shard that the
|
* For non-distributed tables such as reference table, the function
|
||||||
* table has.
|
* simply returns the single shard that the table has.
|
||||||
*
|
*
|
||||||
* When there is a single <partition column> = <constant> filter in the where
|
* When there is a single <partition column> = <constant> filter in the where
|
||||||
* clause list, the constant is written to the partitionValueConst pointer.
|
* clause list, the constant is written to the partitionValueConst pointer.
|
||||||
|
@ -338,8 +338,8 @@ PruneShards(Oid relationId, Index rangeTableId, List *whereClauseList,
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
/* short circuit for reference tables */
|
/* short circuit for non-distributed tables such as reference table */
|
||||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
prunedList = ShardArrayToList(cacheEntry->sortedShardIntervalArray,
|
prunedList = ShardArrayToList(cacheEntry->sortedShardIntervalArray,
|
||||||
cacheEntry->shardIntervalArrayLength);
|
cacheEntry->shardIntervalArrayLength);
|
||||||
|
|
|
@ -74,7 +74,7 @@ partition_task_list_results(PG_FUNCTION_ARGS)
|
||||||
*/
|
*/
|
||||||
int partitionColumnIndex = 0;
|
int partitionColumnIndex = 0;
|
||||||
|
|
||||||
if (targetRelation->partitionMethod != DISTRIBUTE_BY_NONE && IsA(
|
if (IsCitusTableTypeCacheEntry(targetRelation, DISTRIBUTED_TABLE) && IsA(
|
||||||
targetRelation->partitionColumn, Var))
|
targetRelation->partitionColumn, Var))
|
||||||
{
|
{
|
||||||
partitionColumnIndex = targetRelation->partitionColumn->varattno - 1;
|
partitionColumnIndex = targetRelation->partitionColumn->varattno - 1;
|
||||||
|
@ -146,7 +146,8 @@ redistribute_task_list_results(PG_FUNCTION_ARGS)
|
||||||
* Here SELECT query's target list should match column list of target relation,
|
* Here SELECT query's target list should match column list of target relation,
|
||||||
* so their partition column indexes are equal.
|
* so their partition column indexes are equal.
|
||||||
*/
|
*/
|
||||||
int partitionColumnIndex = targetRelation->partitionMethod != DISTRIBUTE_BY_NONE ?
|
int partitionColumnIndex = IsCitusTableTypeCacheEntry(targetRelation,
|
||||||
|
DISTRIBUTED_TABLE) ?
|
||||||
targetRelation->partitionColumn->varattno - 1 : 0;
|
targetRelation->partitionColumn->varattno - 1 : 0;
|
||||||
|
|
||||||
List **shardResultIds = RedistributeTaskListResults(resultIdPrefix, taskList,
|
List **shardResultIds = RedistributeTaskListResults(resultIdPrefix, taskList,
|
||||||
|
|
|
@ -173,7 +173,7 @@ RecordRelationAccessIfReferenceTable(Oid relationId, ShardPlacementAccessType ac
|
||||||
* recursively calling RecordRelationAccessBase(), so becareful about
|
* recursively calling RecordRelationAccessBase(), so becareful about
|
||||||
* removing this check.
|
* removing this check.
|
||||||
*/
|
*/
|
||||||
if (PartitionMethod(relationId) != DISTRIBUTE_BY_NONE)
|
if (!IsCitusTableType(relationId, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -697,7 +697,7 @@ CheckConflictingRelationAccesses(Oid relationId, ShardPlacementAccessType access
|
||||||
|
|
||||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE &&
|
if (!(IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE) &&
|
||||||
cacheEntry->referencingRelationsViaForeignKey != NIL))
|
cacheEntry->referencingRelationsViaForeignKey != NIL))
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
|
@ -817,7 +817,7 @@ CheckConflictingParallelRelationAccesses(Oid relationId, ShardPlacementAccessTyp
|
||||||
}
|
}
|
||||||
|
|
||||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
||||||
if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
|
if (!(IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) &&
|
||||||
cacheEntry->referencedRelationsViaForeignKey != NIL))
|
cacheEntry->referencedRelationsViaForeignKey != NIL))
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
|
@ -893,7 +893,7 @@ HoldsConflictingLockWithReferencedRelations(Oid relationId, ShardPlacementAccess
|
||||||
foreach_oid(referencedRelation, cacheEntry->referencedRelationsViaForeignKey)
|
foreach_oid(referencedRelation, cacheEntry->referencedRelationsViaForeignKey)
|
||||||
{
|
{
|
||||||
/* we're only interested in foreign keys to reference tables */
|
/* we're only interested in foreign keys to reference tables */
|
||||||
if (PartitionMethod(referencedRelation) != DISTRIBUTE_BY_NONE)
|
if (!IsCitusTableType(referencedRelation, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -955,7 +955,7 @@ HoldsConflictingLockWithReferencingRelations(Oid relationId, ShardPlacementAcces
|
||||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
||||||
bool holdsConflictingLocks = false;
|
bool holdsConflictingLocks = false;
|
||||||
|
|
||||||
Assert(PartitionMethod(relationId) == DISTRIBUTE_BY_NONE);
|
Assert(IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE));
|
||||||
|
|
||||||
Oid referencingRelation = InvalidOid;
|
Oid referencingRelation = InvalidOid;
|
||||||
foreach_oid(referencingRelation, cacheEntry->referencingRelationsViaForeignKey)
|
foreach_oid(referencingRelation, cacheEntry->referencingRelationsViaForeignKey)
|
||||||
|
@ -964,8 +964,7 @@ HoldsConflictingLockWithReferencingRelations(Oid relationId, ShardPlacementAcces
|
||||||
* We're only interested in foreign keys to reference tables from
|
* We're only interested in foreign keys to reference tables from
|
||||||
* hash distributed tables.
|
* hash distributed tables.
|
||||||
*/
|
*/
|
||||||
if (!IsCitusTable(referencingRelation) ||
|
if (!IsCitusTableType(referencingRelation, HASH_DISTRIBUTED))
|
||||||
PartitionMethod(referencingRelation) != DISTRIBUTE_BY_HASH)
|
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -431,11 +431,12 @@ ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShard
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (leftIntervalPartitionMethod == DISTRIBUTE_BY_HASH)
|
if (IsCitusTableType(leftShardInterval->relationId, HASH_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
return HashPartitionedShardIntervalsEqual(leftShardInterval, rightShardInterval);
|
return HashPartitionedShardIntervalsEqual(leftShardInterval, rightShardInterval);
|
||||||
}
|
}
|
||||||
else if (leftIntervalPartitionMethod == DISTRIBUTE_BY_NONE)
|
else if (IsCitusTableType(leftShardInterval->relationId,
|
||||||
|
CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Reference tables has only a single shard and all reference tables
|
* Reference tables has only a single shard and all reference tables
|
||||||
|
@ -921,14 +922,13 @@ ColocatedShardIntervalList(ShardInterval *shardInterval)
|
||||||
List *colocatedShardList = NIL;
|
List *colocatedShardList = NIL;
|
||||||
|
|
||||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
|
||||||
char partitionMethod = cacheEntry->partitionMethod;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* If distribution type of the table is not hash or reference, each shard of
|
* If distribution type of the table is append or range, each shard of
|
||||||
* the shard is only co-located with itself.
|
* the shard is only co-located with itself.
|
||||||
*/
|
*/
|
||||||
if ((partitionMethod == DISTRIBUTE_BY_APPEND) ||
|
if (IsCitusTableTypeCacheEntry(cacheEntry, APPEND_DISTRIBUTED) ||
|
||||||
(partitionMethod == DISTRIBUTE_BY_RANGE))
|
IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
ShardInterval *copyShardInterval = CopyShardInterval(shardInterval);
|
ShardInterval *copyShardInterval = CopyShardInterval(shardInterval);
|
||||||
|
|
||||||
|
|
|
@ -56,29 +56,6 @@ static bool AnyRelationsModifiedInTransaction(List *relationIdList);
|
||||||
PG_FUNCTION_INFO_V1(upgrade_to_reference_table);
|
PG_FUNCTION_INFO_V1(upgrade_to_reference_table);
|
||||||
PG_FUNCTION_INFO_V1(replicate_reference_tables);
|
PG_FUNCTION_INFO_V1(replicate_reference_tables);
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* IsReferenceTable returns whether the given relation ID identifies a reference
|
|
||||||
* table.
|
|
||||||
*/
|
|
||||||
bool
|
|
||||||
IsReferenceTable(Oid relationId)
|
|
||||||
{
|
|
||||||
if (!IsCitusTable(relationId))
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
|
|
||||||
|
|
||||||
if (tableEntry->partitionMethod != DISTRIBUTE_BY_NONE)
|
|
||||||
{
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* replicate_reference_tables is a UDF to ensure that allreference tables are
|
* replicate_reference_tables is a UDF to ensure that allreference tables are
|
||||||
* replicated to all nodes.
|
* replicated to all nodes.
|
||||||
|
@ -350,7 +327,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
|
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
if (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableTypeCacheEntry(tableEntry, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
char *relationName = get_rel_name(relationId);
|
char *relationName = get_rel_name(relationId);
|
||||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||||
|
|
|
@ -395,7 +395,7 @@ SetLocktagForShardDistributionMetadata(int64 shardId, LOCKTAG *tag)
|
||||||
uint32 colocationId = citusTable->colocationId;
|
uint32 colocationId = citusTable->colocationId;
|
||||||
|
|
||||||
if (colocationId == INVALID_COLOCATION_ID ||
|
if (colocationId == INVALID_COLOCATION_ID ||
|
||||||
citusTable->partitionMethod != DISTRIBUTE_BY_HASH)
|
!IsCitusTableTypeCacheEntry(citusTable, HASH_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
SET_LOCKTAG_SHARD_METADATA_RESOURCE(*tag, MyDatabaseId, shardId);
|
SET_LOCKTAG_SHARD_METADATA_RESOURCE(*tag, MyDatabaseId, shardId);
|
||||||
}
|
}
|
||||||
|
@ -493,7 +493,7 @@ GetSortedReferenceShardIntervals(List *relationList)
|
||||||
Oid relationId = InvalidOid;
|
Oid relationId = InvalidOid;
|
||||||
foreach_oid(relationId, relationList)
|
foreach_oid(relationId, relationList)
|
||||||
{
|
{
|
||||||
if (PartitionMethod(relationId) != DISTRIBUTE_BY_NONE)
|
if (!IsCitusTableType(relationId, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -46,10 +46,10 @@ GetTableLocalShardOid(Oid citusTableOid, uint64 shardId)
|
||||||
Oid
|
Oid
|
||||||
GetReferenceTableLocalShardOid(Oid referenceTableOid)
|
GetReferenceTableLocalShardOid(Oid referenceTableOid)
|
||||||
{
|
{
|
||||||
const CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(referenceTableOid);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(referenceTableOid);
|
||||||
|
|
||||||
/* given OID should belong to a valid reference table */
|
/* given OID should belong to a valid reference table */
|
||||||
Assert(cacheEntry != NULL && cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE);
|
Assert(cacheEntry != NULL && IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE));
|
||||||
|
|
||||||
const ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[0];
|
const ShardInterval *shardInterval = cacheEntry->sortedShardIntervalArray[0];
|
||||||
uint64 referenceTableShardId = shardInterval->shardId;
|
uint64 referenceTableShardId = shardInterval->shardId;
|
||||||
|
|
|
@ -240,13 +240,14 @@ ShardIndex(ShardInterval *shardInterval)
|
||||||
Datum shardMinValue = shardInterval->minValue;
|
Datum shardMinValue = shardInterval->minValue;
|
||||||
|
|
||||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
|
||||||
char partitionMethod = cacheEntry->partitionMethod;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Note that, we can also support append and range distributed tables, but
|
* Note that, we can also support append and range distributed tables, but
|
||||||
* currently it is not required.
|
* currently it is not required.
|
||||||
*/
|
*/
|
||||||
if (partitionMethod != DISTRIBUTE_BY_HASH && partitionMethod != DISTRIBUTE_BY_NONE)
|
if (!IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) &&
|
||||||
|
!IsCitusTableTypeCacheEntry(
|
||||||
|
cacheEntry, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("finding index of a given shard is only supported for "
|
errmsg("finding index of a given shard is only supported for "
|
||||||
|
@ -254,7 +255,7 @@ ShardIndex(ShardInterval *shardInterval)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* short-circuit for reference tables */
|
/* short-circuit for reference tables */
|
||||||
if (partitionMethod == DISTRIBUTE_BY_NONE)
|
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
/* reference tables has only a single shard, so the index is fixed to 0 */
|
/* reference tables has only a single shard, so the index is fixed to 0 */
|
||||||
shardIndex = 0;
|
shardIndex = 0;
|
||||||
|
@ -279,7 +280,7 @@ FindShardInterval(Datum partitionColumnValue, CitusTableCacheEntry *cacheEntry)
|
||||||
{
|
{
|
||||||
Datum searchedValue = partitionColumnValue;
|
Datum searchedValue = partitionColumnValue;
|
||||||
|
|
||||||
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH)
|
if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
searchedValue = FunctionCall1Coll(cacheEntry->hashFunction,
|
searchedValue = FunctionCall1Coll(cacheEntry->hashFunction,
|
||||||
cacheEntry->partitionColumn->varcollid,
|
cacheEntry->partitionColumn->varcollid,
|
||||||
|
@ -314,9 +315,8 @@ FindShardIntervalIndex(Datum searchedValue, CitusTableCacheEntry *cacheEntry)
|
||||||
{
|
{
|
||||||
ShardInterval **shardIntervalCache = cacheEntry->sortedShardIntervalArray;
|
ShardInterval **shardIntervalCache = cacheEntry->sortedShardIntervalArray;
|
||||||
int shardCount = cacheEntry->shardIntervalArrayLength;
|
int shardCount = cacheEntry->shardIntervalArrayLength;
|
||||||
char partitionMethod = cacheEntry->partitionMethod;
|
|
||||||
FmgrInfo *compareFunction = cacheEntry->shardIntervalCompareFunction;
|
FmgrInfo *compareFunction = cacheEntry->shardIntervalCompareFunction;
|
||||||
bool useBinarySearch = (partitionMethod != DISTRIBUTE_BY_HASH ||
|
bool useBinarySearch = (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) ||
|
||||||
!cacheEntry->hasUniformHashDistribution);
|
!cacheEntry->hasUniformHashDistribution);
|
||||||
int shardIndex = INVALID_SHARD_INDEX;
|
int shardIndex = INVALID_SHARD_INDEX;
|
||||||
|
|
||||||
|
@ -325,7 +325,7 @@ FindShardIntervalIndex(Datum searchedValue, CitusTableCacheEntry *cacheEntry)
|
||||||
return INVALID_SHARD_INDEX;
|
return INVALID_SHARD_INDEX;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (partitionMethod == DISTRIBUTE_BY_HASH)
|
if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
if (useBinarySearch)
|
if (useBinarySearch)
|
||||||
{
|
{
|
||||||
|
@ -352,9 +352,9 @@ FindShardIntervalIndex(Datum searchedValue, CitusTableCacheEntry *cacheEntry)
|
||||||
shardIndex = CalculateUniformHashRangeIndex(hashedValue, shardCount);
|
shardIndex = CalculateUniformHashRangeIndex(hashedValue, shardCount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (partitionMethod == DISTRIBUTE_BY_NONE)
|
else if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
||||||
{
|
{
|
||||||
/* reference tables has a single shard, all values mapped to that shard */
|
/* non-distributed tables have a single shard, all values mapped to that shard */
|
||||||
Assert(shardCount == 1);
|
Assert(shardCount == 1);
|
||||||
|
|
||||||
shardIndex = 0;
|
shardIndex = 0;
|
||||||
|
@ -490,7 +490,7 @@ SingleReplicatedTable(Oid relationId)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* for hash distributed tables, it is sufficient to only check one shard */
|
/* for hash distributed tables, it is sufficient to only check one shard */
|
||||||
if (PartitionMethod(relationId) == DISTRIBUTE_BY_HASH)
|
if (IsCitusTableType(relationId, HASH_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
/* checking only for the first shard id should suffice */
|
/* checking only for the first shard id should suffice */
|
||||||
uint64 shardId = *(uint64 *) linitial(shardList);
|
uint64 shardId = *(uint64 *) linitial(shardList);
|
||||||
|
|
|
@ -194,7 +194,7 @@ DistributedTablesSize(List *distTableOids)
|
||||||
* Ignore hash partitioned tables with size greater than 1, since
|
* Ignore hash partitioned tables with size greater than 1, since
|
||||||
* citus_table_size() doesn't work on them.
|
* citus_table_size() doesn't work on them.
|
||||||
*/
|
*/
|
||||||
if (PartitionMethod(relationId) == DISTRIBUTE_BY_HASH &&
|
if (IsCitusTableType(relationId, HASH_DISTRIBUTED) &&
|
||||||
!SingleReplicatedTable(relationId))
|
!SingleReplicatedTable(relationId))
|
||||||
{
|
{
|
||||||
table_close(relation, AccessShareLock);
|
table_close(relation, AccessShareLock);
|
||||||
|
|
|
@ -196,7 +196,6 @@ extern PlannedStmt * distributed_planner(Query *parse,
|
||||||
|
|
||||||
|
|
||||||
extern List * ExtractRangeTableEntryList(Query *query);
|
extern List * ExtractRangeTableEntryList(Query *query);
|
||||||
extern List * ExtractReferenceTableRTEList(List *rteList);
|
|
||||||
extern bool NeedsDistributedPlanning(Query *query);
|
extern bool NeedsDistributedPlanning(Query *query);
|
||||||
extern struct DistributedPlan * GetDistributedPlan(CustomScan *node);
|
extern struct DistributedPlan * GetDistributedPlan(CustomScan *node);
|
||||||
extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
|
extern void multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
|
||||||
|
|
|
@ -116,6 +116,25 @@ typedef struct DistObjectCacheEntry
|
||||||
int colocationId;
|
int colocationId;
|
||||||
} DistObjectCacheEntry;
|
} DistObjectCacheEntry;
|
||||||
|
|
||||||
|
typedef enum
|
||||||
|
{
|
||||||
|
HASH_DISTRIBUTED,
|
||||||
|
APPEND_DISTRIBUTED,
|
||||||
|
RANGE_DISTRIBUTED,
|
||||||
|
|
||||||
|
/* hash, range or append distributed table */
|
||||||
|
DISTRIBUTED_TABLE,
|
||||||
|
|
||||||
|
REFERENCE_TABLE,
|
||||||
|
CITUS_LOCAL_TABLE,
|
||||||
|
|
||||||
|
/* table without a dist key such as reference table */
|
||||||
|
CITUS_TABLE_WITH_NO_DIST_KEY
|
||||||
|
} CitusTableType;
|
||||||
|
|
||||||
|
extern bool IsCitusTableType(Oid relationId, CitusTableType tableType);
|
||||||
|
extern bool IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEtnry, CitusTableType
|
||||||
|
tableType);
|
||||||
|
|
||||||
extern bool IsCitusTable(Oid relationId);
|
extern bool IsCitusTable(Oid relationId);
|
||||||
extern List * CitusTableList(void);
|
extern List * CitusTableList(void);
|
||||||
|
|
|
@ -150,7 +150,6 @@ extern void EnsureTablePermissions(Oid relationId, AclMode mode);
|
||||||
extern void EnsureTableOwner(Oid relationId);
|
extern void EnsureTableOwner(Oid relationId);
|
||||||
extern void EnsureSchemaOwner(Oid schemaId);
|
extern void EnsureSchemaOwner(Oid schemaId);
|
||||||
extern void EnsureHashDistributedTable(Oid relationId);
|
extern void EnsureHashDistributedTable(Oid relationId);
|
||||||
extern bool IsHashDistributedTable(Oid relationId);
|
|
||||||
extern void EnsureSequenceOwner(Oid sequenceOid);
|
extern void EnsureSequenceOwner(Oid sequenceOid);
|
||||||
extern void EnsureFunctionOwner(Oid functionId);
|
extern void EnsureFunctionOwner(Oid functionId);
|
||||||
extern void EnsureSuperUser(void);
|
extern void EnsureSuperUser(void);
|
||||||
|
|
|
@ -16,7 +16,8 @@
|
||||||
|
|
||||||
#include "listutils.h"
|
#include "listutils.h"
|
||||||
|
|
||||||
extern bool IsReferenceTable(Oid relationId);
|
#include "distributed/metadata_cache.h"
|
||||||
|
|
||||||
extern void EnsureReferenceTablesExistOnAllNodes(void);
|
extern void EnsureReferenceTablesExistOnAllNodes(void);
|
||||||
extern uint32 CreateReferenceTableColocationId(void);
|
extern uint32 CreateReferenceTableColocationId(void);
|
||||||
extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId);
|
extern void DeleteAllReferenceTablePlacementsFromNodeGroup(int32 groupId);
|
||||||
|
|
Loading…
Reference in New Issue