mirror of https://github.com/citusdata/citus.git
Merge pull request #3586 from citusdata/rename-distributed-to-citus
Try to create clear distinction between DistributedTable vs CitusTablepull/3587/head
commit
00a7bc3044
|
@ -97,7 +97,7 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
DistTableCacheEntry *distTable = DistributedTableCacheEntry(colocatedRelationId);
|
DistTableCacheEntry *distTable = CitusTableCacheEntry(colocatedRelationId);
|
||||||
Var *partitionColumn = distTable->partitionColumn;
|
Var *partitionColumn = distTable->partitionColumn;
|
||||||
if (partitionColumn == NULL)
|
if (partitionColumn == NULL)
|
||||||
{
|
{
|
||||||
|
|
|
@ -36,7 +36,7 @@ PreprocessClusterStmt(Node *node, const char *clusterCommand)
|
||||||
|
|
||||||
if (OidIsValid(relationId))
|
if (OidIsValid(relationId))
|
||||||
{
|
{
|
||||||
showPropagationWarning = IsDistributedTable(relationId);
|
showPropagationWarning = IsCitusTable(relationId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -723,7 +723,7 @@ EnsureRelationCanBeDistributed(Oid relationId, Var *distributionColumn,
|
||||||
}
|
}
|
||||||
|
|
||||||
/* partitions cannot be distributed if their parent is not distributed */
|
/* partitions cannot be distributed if their parent is not distributed */
|
||||||
if (PartitionTable(relationId) && !IsDistributedTable(parentRelationId))
|
if (PartitionTable(relationId) && !IsCitusTable(parentRelationId))
|
||||||
{
|
{
|
||||||
char *parentRelationName = get_rel_name(parentRelationId);
|
char *parentRelationName = get_rel_name(parentRelationId);
|
||||||
|
|
||||||
|
@ -793,7 +793,7 @@ static void
|
||||||
EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
|
EnsureTableCanBeColocatedWith(Oid relationId, char replicationModel,
|
||||||
Oid distributionColumnType, Oid sourceRelationId)
|
Oid distributionColumnType, Oid sourceRelationId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *sourceTableEntry = DistributedTableCacheEntry(sourceRelationId);
|
DistTableCacheEntry *sourceTableEntry = CitusTableCacheEntry(sourceRelationId);
|
||||||
char sourceDistributionMethod = sourceTableEntry->partitionMethod;
|
char sourceDistributionMethod = sourceTableEntry->partitionMethod;
|
||||||
char sourceReplicationModel = sourceTableEntry->replicationModel;
|
char sourceReplicationModel = sourceTableEntry->replicationModel;
|
||||||
Var *sourceDistributionColumn = ForceDistPartitionKey(sourceRelationId);
|
Var *sourceDistributionColumn = ForceDistPartitionKey(sourceRelationId);
|
||||||
|
@ -890,9 +890,9 @@ EnsureTableNotDistributed(Oid relationId)
|
||||||
{
|
{
|
||||||
char *relationName = get_rel_name(relationId);
|
char *relationName = get_rel_name(relationId);
|
||||||
|
|
||||||
bool isDistributedTable = IsDistributedTable(relationId);
|
bool isCitusTable = IsCitusTable(relationId);
|
||||||
|
|
||||||
if (isDistributedTable)
|
if (isCitusTable)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
|
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
|
||||||
errmsg("table \"%s\" is already distributed",
|
errmsg("table \"%s\" is already distributed",
|
||||||
|
@ -1030,7 +1030,7 @@ LocalTableEmpty(Oid tableId)
|
||||||
int rowId = 0;
|
int rowId = 0;
|
||||||
int attributeId = 1;
|
int attributeId = 1;
|
||||||
|
|
||||||
AssertArg(!IsDistributedTable(tableId));
|
AssertArg(!IsCitusTable(tableId));
|
||||||
|
|
||||||
int spiConnectionResult = SPI_connect();
|
int spiConnectionResult = SPI_connect();
|
||||||
if (spiConnectionResult != SPI_OK_CONNECT)
|
if (spiConnectionResult != SPI_OK_CONNECT)
|
||||||
|
|
|
@ -72,7 +72,7 @@ master_remove_partition_metadata(PG_FUNCTION_ARGS)
|
||||||
* user-friendly, but this function is really only meant to be called
|
* user-friendly, but this function is really only meant to be called
|
||||||
* from the trigger.
|
* from the trigger.
|
||||||
*/
|
*/
|
||||||
if (!IsDistributedTable(relationId) || !EnableDDLPropagation)
|
if (!IsCitusTable(relationId) || !EnableDDLPropagation)
|
||||||
{
|
{
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
@ -132,7 +132,7 @@ MasterRemoveDistributedTableMetadataFromWorkers(Oid relationId, char *schemaName
|
||||||
* user-friendly, but this function is really only meant to be called
|
* user-friendly, but this function is really only meant to be called
|
||||||
* from the trigger.
|
* from the trigger.
|
||||||
*/
|
*/
|
||||||
if (!IsDistributedTable(relationId) || !EnableDDLPropagation)
|
if (!IsCitusTable(relationId) || !EnableDDLPropagation)
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -76,7 +76,7 @@ ConstraintIsAForeignKeyToReferenceTable(char *constraintName, Oid relationId)
|
||||||
|
|
||||||
Oid referencedTableId = constraintForm->confrelid;
|
Oid referencedTableId = constraintForm->confrelid;
|
||||||
|
|
||||||
Assert(IsDistributedTable(referencedTableId));
|
Assert(IsCitusTable(referencedTableId));
|
||||||
|
|
||||||
if (PartitionMethod(referencedTableId) == DISTRIBUTE_BY_NONE)
|
if (PartitionMethod(referencedTableId) == DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
|
@ -125,9 +125,9 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
|
||||||
|
|
||||||
Oid referencingTableId = relation->rd_id;
|
Oid referencingTableId = relation->rd_id;
|
||||||
bool referencingNotReplicated = true;
|
bool referencingNotReplicated = true;
|
||||||
bool referencingIsDistributed = IsDistributedTable(referencingTableId);
|
bool referencingIsCitus = IsCitusTable(referencingTableId);
|
||||||
|
|
||||||
if (referencingIsDistributed)
|
if (referencingIsCitus)
|
||||||
{
|
{
|
||||||
/* ALTER TABLE command is applied over single replicated table */
|
/* ALTER TABLE command is applied over single replicated table */
|
||||||
referencingNotReplicated = SingleReplicatedTable(referencingTableId);
|
referencingNotReplicated = SingleReplicatedTable(referencingTableId);
|
||||||
|
@ -166,11 +166,11 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
|
||||||
}
|
}
|
||||||
|
|
||||||
Oid referencedTableId = constraintForm->confrelid;
|
Oid referencedTableId = constraintForm->confrelid;
|
||||||
bool referencedIsDistributed = IsDistributedTable(referencedTableId);
|
bool referencedIsCitus = IsCitusTable(referencedTableId);
|
||||||
|
|
||||||
bool selfReferencingTable = (referencingTableId == referencedTableId);
|
bool selfReferencingTable = (referencingTableId == referencedTableId);
|
||||||
|
|
||||||
if (!referencedIsDistributed && !selfReferencingTable)
|
if (!referencedIsCitus && !selfReferencingTable)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
|
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
|
||||||
errmsg("cannot create foreign key constraint"),
|
errmsg("cannot create foreign key constraint"),
|
||||||
|
@ -447,7 +447,7 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
|
||||||
* We check if the referenced table is a reference table. There cannot be
|
* We check if the referenced table is a reference table. There cannot be
|
||||||
* any foreign constraint from a distributed table to a local table.
|
* any foreign constraint from a distributed table to a local table.
|
||||||
*/
|
*/
|
||||||
Assert(IsDistributedTable(referencedTableId));
|
Assert(IsCitusTable(referencedTableId));
|
||||||
if (PartitionMethod(referencedTableId) != DISTRIBUTE_BY_NONE)
|
if (PartitionMethod(referencedTableId) != DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
|
@ -567,7 +567,7 @@ HasForeignKeyToReferenceTable(Oid relationId)
|
||||||
|
|
||||||
Oid referencedTableId = constraintForm->confrelid;
|
Oid referencedTableId = constraintForm->confrelid;
|
||||||
|
|
||||||
if (!IsDistributedTable(referencedTableId))
|
if (!IsCitusTable(referencedTableId))
|
||||||
{
|
{
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -410,7 +410,7 @@ static void
|
||||||
EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnType,
|
EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnType,
|
||||||
Oid sourceRelationId)
|
Oid sourceRelationId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *sourceTableEntry = DistributedTableCacheEntry(sourceRelationId);
|
DistTableCacheEntry *sourceTableEntry = CitusTableCacheEntry(sourceRelationId);
|
||||||
char sourceDistributionMethod = sourceTableEntry->partitionMethod;
|
char sourceDistributionMethod = sourceTableEntry->partitionMethod;
|
||||||
char sourceReplicationModel = sourceTableEntry->replicationModel;
|
char sourceReplicationModel = sourceTableEntry->replicationModel;
|
||||||
|
|
||||||
|
|
|
@ -140,7 +140,7 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand)
|
||||||
Relation relation = heap_openrv(createIndexStatement->relation, lockmode);
|
Relation relation = heap_openrv(createIndexStatement->relation, lockmode);
|
||||||
Oid relationId = RelationGetRelid(relation);
|
Oid relationId = RelationGetRelid(relation);
|
||||||
|
|
||||||
bool isDistributedRelation = IsDistributedTable(relationId);
|
bool isDistributedRelation = IsCitusTable(relationId);
|
||||||
|
|
||||||
if (createIndexStatement->relation->schemaname == NULL)
|
if (createIndexStatement->relation->schemaname == NULL)
|
||||||
{
|
{
|
||||||
|
@ -249,7 +249,7 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand)
|
||||||
relationId = RelationGetRelid(relation);
|
relationId = RelationGetRelid(relation);
|
||||||
}
|
}
|
||||||
|
|
||||||
isDistributedRelation = IsDistributedTable(relationId);
|
isDistributedRelation = IsCitusTable(relationId);
|
||||||
|
|
||||||
if (reindexStatement->relation->schemaname == NULL)
|
if (reindexStatement->relation->schemaname == NULL)
|
||||||
{
|
{
|
||||||
|
@ -359,7 +359,7 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand)
|
||||||
}
|
}
|
||||||
|
|
||||||
Oid relationId = IndexGetRelation(indexId, false);
|
Oid relationId = IndexGetRelation(indexId, false);
|
||||||
bool isDistributedRelation = IsDistributedTable(relationId);
|
bool isDistributedRelation = IsCitusTable(relationId);
|
||||||
if (isDistributedRelation)
|
if (isDistributedRelation)
|
||||||
{
|
{
|
||||||
distributedIndexId = indexId;
|
distributedIndexId = indexId;
|
||||||
|
|
|
@ -2018,7 +2018,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
|
|
||||||
/* look up table properties */
|
/* look up table properties */
|
||||||
Relation distributedRelation = heap_open(tableId, RowExclusiveLock);
|
Relation distributedRelation = heap_open(tableId, RowExclusiveLock);
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(tableId);
|
||||||
partitionMethod = cacheEntry->partitionMethod;
|
partitionMethod = cacheEntry->partitionMethod;
|
||||||
|
|
||||||
copyDest->distributedRelation = distributedRelation;
|
copyDest->distributedRelation = distributedRelation;
|
||||||
|
@ -2587,7 +2587,7 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS
|
||||||
isFrom ? RowExclusiveLock :
|
isFrom ? RowExclusiveLock :
|
||||||
AccessShareLock);
|
AccessShareLock);
|
||||||
|
|
||||||
bool isDistributedRelation = IsDistributedTable(RelationGetRelid(copiedRelation));
|
bool isDistributedRelation = IsCitusTable(RelationGetRelid(copiedRelation));
|
||||||
|
|
||||||
/* ensure future lookups hit the same relation */
|
/* ensure future lookups hit the same relation */
|
||||||
char *schemaName = get_namespace_name(RelationGetNamespace(copiedRelation));
|
char *schemaName = get_namespace_name(RelationGetNamespace(copiedRelation));
|
||||||
|
|
|
@ -34,7 +34,7 @@ PreprocessCreatePolicyStmt(Node *node, const char *queryString)
|
||||||
Oid relationId = RangeVarGetRelid(stmt->table,
|
Oid relationId = RangeVarGetRelid(stmt->table,
|
||||||
AccessExclusiveLock,
|
AccessExclusiveLock,
|
||||||
false);
|
false);
|
||||||
if (IsDistributedTable(relationId))
|
if (IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("policies on distributed tables are only supported in "
|
errmsg("policies on distributed tables are only supported in "
|
||||||
|
|
|
@ -96,7 +96,7 @@ PreprocessRenameStmt(Node *node, const char *renameCommand)
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isDistributedRelation = IsDistributedTable(tableRelationId);
|
bool isDistributedRelation = IsCitusTable(tableRelationId);
|
||||||
if (!isDistributedRelation)
|
if (!isDistributedRelation)
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
|
|
|
@ -84,7 +84,7 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString)
|
||||||
Oid relationId = get_relname_relid(relationName, namespaceOid);
|
Oid relationId = get_relname_relid(relationName, namespaceOid);
|
||||||
|
|
||||||
/* we're not interested in non-valid, non-distributed relations */
|
/* we're not interested in non-valid, non-distributed relations */
|
||||||
if (relationId == InvalidOid || !IsDistributedTable(relationId))
|
if (relationId == InvalidOid || !IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -35,7 +35,7 @@ ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt)
|
||||||
/* create is easy: just prohibit any distributed OWNED BY */
|
/* create is easy: just prohibit any distributed OWNED BY */
|
||||||
if (OptionsSpecifyOwnedBy(createSeqStmt->options, &ownedByTableId))
|
if (OptionsSpecifyOwnedBy(createSeqStmt->options, &ownedByTableId))
|
||||||
{
|
{
|
||||||
if (IsDistributedTable(ownedByTableId))
|
if (IsCitusTable(ownedByTableId))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("cannot create sequences that specify a distributed "
|
errmsg("cannot create sequences that specify a distributed "
|
||||||
|
@ -79,7 +79,7 @@ ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt)
|
||||||
/* see whether the sequence is already owned by a distributed table */
|
/* see whether the sequence is already owned by a distributed table */
|
||||||
if (sequenceOwned)
|
if (sequenceOwned)
|
||||||
{
|
{
|
||||||
hasDistributedOwner = IsDistributedTable(ownedByTableId);
|
hasDistributedOwner = IsCitusTable(ownedByTableId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (OptionsSpecifyOwnedBy(alterSeqStmt->options, &newOwnedByTableId))
|
if (OptionsSpecifyOwnedBy(alterSeqStmt->options, &newOwnedByTableId))
|
||||||
|
@ -91,7 +91,7 @@ ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt)
|
||||||
errmsg("cannot alter OWNED BY option of a sequence "
|
errmsg("cannot alter OWNED BY option of a sequence "
|
||||||
"already owned by a distributed table")));
|
"already owned by a distributed table")));
|
||||||
}
|
}
|
||||||
else if (!hasDistributedOwner && IsDistributedTable(newOwnedByTableId))
|
else if (!hasDistributedOwner && IsCitusTable(newOwnedByTableId))
|
||||||
{
|
{
|
||||||
/* and don't let local sequences get a distributed OWNED BY */
|
/* and don't let local sequences get a distributed OWNED BY */
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
|
|
@ -80,7 +80,7 @@ PreprocessDropTableStmt(Node *node, const char *queryString)
|
||||||
Oid relationId = RangeVarGetRelid(tableRangeVar, AccessShareLock, missingOK);
|
Oid relationId = RangeVarGetRelid(tableRangeVar, AccessShareLock, missingOK);
|
||||||
|
|
||||||
/* we're not interested in non-valid, non-distributed relations */
|
/* we're not interested in non-valid, non-distributed relations */
|
||||||
if (relationId == InvalidOid || !IsDistributedTable(relationId))
|
if (relationId == InvalidOid || !IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -152,7 +152,7 @@ PostprocessCreateTableStmtPartitionOf(CreateStmt *createStatement, const
|
||||||
* If a partition is being created and if its parent is a distributed
|
* If a partition is being created and if its parent is a distributed
|
||||||
* table, we will distribute this table as well.
|
* table, we will distribute this table as well.
|
||||||
*/
|
*/
|
||||||
if (IsDistributedTable(parentRelationId))
|
if (IsCitusTable(parentRelationId))
|
||||||
{
|
{
|
||||||
bool missingOk = false;
|
bool missingOk = false;
|
||||||
Oid relationId = RangeVarGetRelid(createStatement->relation, NoLock,
|
Oid relationId = RangeVarGetRelid(createStatement->relation, NoLock,
|
||||||
|
@ -219,8 +219,8 @@ PostprocessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement,
|
||||||
* If user first distributes the table then tries to attach it to non
|
* If user first distributes the table then tries to attach it to non
|
||||||
* distributed table, we error out.
|
* distributed table, we error out.
|
||||||
*/
|
*/
|
||||||
if (!IsDistributedTable(relationId) &&
|
if (!IsCitusTable(relationId) &&
|
||||||
IsDistributedTable(partitionRelationId))
|
IsCitusTable(partitionRelationId))
|
||||||
{
|
{
|
||||||
char *parentRelationName = get_rel_name(partitionRelationId);
|
char *parentRelationName = get_rel_name(partitionRelationId);
|
||||||
|
|
||||||
|
@ -231,8 +231,8 @@ PostprocessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement,
|
||||||
}
|
}
|
||||||
|
|
||||||
/* if parent of this table is distributed, distribute this table too */
|
/* if parent of this table is distributed, distribute this table too */
|
||||||
if (IsDistributedTable(relationId) &&
|
if (IsCitusTable(relationId) &&
|
||||||
!IsDistributedTable(partitionRelationId))
|
!IsCitusTable(partitionRelationId))
|
||||||
{
|
{
|
||||||
Var *distributionColumn = ForceDistPartitionKey(relationId);
|
Var *distributionColumn = ForceDistPartitionKey(relationId);
|
||||||
char distributionMethod = DISTRIBUTE_BY_HASH;
|
char distributionMethod = DISTRIBUTE_BY_HASH;
|
||||||
|
@ -263,7 +263,7 @@ PostprocessAlterTableSchemaStmt(Node *node, const char *queryString)
|
||||||
|
|
||||||
ObjectAddress tableAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
|
ObjectAddress tableAddress = GetObjectAddressFromParseTree((Node *) stmt, false);
|
||||||
|
|
||||||
if (!ShouldPropagate() || !IsDistributedTable(tableAddress.objectId))
|
if (!ShouldPropagate() || !IsCitusTable(tableAddress.objectId))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -314,7 +314,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand)
|
||||||
leftRelationId = IndexGetRelation(leftRelationId, missingOk);
|
leftRelationId = IndexGetRelation(leftRelationId, missingOk);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool referencingIsLocalTable = !IsDistributedTable(leftRelationId);
|
bool referencingIsLocalTable = !IsCitusTable(leftRelationId);
|
||||||
if (referencingIsLocalTable)
|
if (referencingIsLocalTable)
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
|
@ -428,7 +428,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand)
|
||||||
* is not distributed. Because, we'll manually convert the partition into
|
* is not distributed. Because, we'll manually convert the partition into
|
||||||
* distributed table and co-locate with its parent.
|
* distributed table and co-locate with its parent.
|
||||||
*/
|
*/
|
||||||
if (!IsDistributedTable(rightRelationId))
|
if (!IsCitusTable(rightRelationId))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -467,7 +467,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand)
|
||||||
|
|
||||||
if (OidIsValid(rightRelationId))
|
if (OidIsValid(rightRelationId))
|
||||||
{
|
{
|
||||||
bool referencedIsLocalTable = !IsDistributedTable(rightRelationId);
|
bool referencedIsLocalTable = !IsCitusTable(rightRelationId);
|
||||||
if (referencedIsLocalTable)
|
if (referencedIsLocalTable)
|
||||||
{
|
{
|
||||||
ddlJob->taskList = NIL;
|
ddlJob->taskList = NIL;
|
||||||
|
@ -531,7 +531,7 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString)
|
||||||
Oid relationId = address.objectId;
|
Oid relationId = address.objectId;
|
||||||
|
|
||||||
/* first check whether a distributed relation is affected */
|
/* first check whether a distributed relation is affected */
|
||||||
if (!OidIsValid(relationId) || !IsDistributedTable(relationId))
|
if (!OidIsValid(relationId) || !IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
return NIL;
|
return NIL;
|
||||||
}
|
}
|
||||||
|
@ -567,7 +567,7 @@ WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
|
||||||
return (Node *) alterTableStatement;
|
return (Node *) alterTableStatement;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isDistributedRelation = IsDistributedTable(leftRelationId);
|
bool isDistributedRelation = IsCitusTable(leftRelationId);
|
||||||
if (!isDistributedRelation)
|
if (!isDistributedRelation)
|
||||||
{
|
{
|
||||||
return (Node *) alterTableStatement;
|
return (Node *) alterTableStatement;
|
||||||
|
@ -654,7 +654,7 @@ ErrorIfAlterDropsPartitionColumn(AlterTableStmt *alterTableStatement)
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
bool isDistributedRelation = IsDistributedTable(leftRelationId);
|
bool isDistributedRelation = IsCitusTable(leftRelationId);
|
||||||
if (!isDistributedRelation)
|
if (!isDistributedRelation)
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
|
@ -1076,7 +1076,7 @@ ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement)
|
||||||
"separately.")));
|
"separately.")));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IsDistributedTable(partitionRelationId) &&
|
if (IsCitusTable(partitionRelationId) &&
|
||||||
!TablesColocated(relationId, partitionRelationId))
|
!TablesColocated(relationId, partitionRelationId))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
@ -1215,7 +1215,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command)
|
||||||
{
|
{
|
||||||
Oid rightRelationId = RangeVarGetRelid(constraint->pktable, NoLock,
|
Oid rightRelationId = RangeVarGetRelid(constraint->pktable, NoLock,
|
||||||
false);
|
false);
|
||||||
if (IsDistributedTable(rightRelationId) &&
|
if (IsCitusTable(rightRelationId) &&
|
||||||
PartitionMethod(rightRelationId) == DISTRIBUTE_BY_NONE)
|
PartitionMethod(rightRelationId) == DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
executeSequentially = true;
|
executeSequentially = true;
|
||||||
|
@ -1251,7 +1251,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command)
|
||||||
{
|
{
|
||||||
Oid rightRelationId = RangeVarGetRelid(constraint->pktable, NoLock,
|
Oid rightRelationId = RangeVarGetRelid(constraint->pktable, NoLock,
|
||||||
false);
|
false);
|
||||||
if (IsDistributedTable(rightRelationId) &&
|
if (IsCitusTable(rightRelationId) &&
|
||||||
PartitionMethod(rightRelationId) == DISTRIBUTE_BY_NONE)
|
PartitionMethod(rightRelationId) == DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
executeSequentially = true;
|
executeSequentially = true;
|
||||||
|
@ -1273,7 +1273,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command)
|
||||||
* the distributed tables, thus contradicting our purpose of using
|
* the distributed tables, thus contradicting our purpose of using
|
||||||
* sequential mode.
|
* sequential mode.
|
||||||
*/
|
*/
|
||||||
if (executeSequentially && IsDistributedTable(relationId) &&
|
if (executeSequentially && IsCitusTable(relationId) &&
|
||||||
PartitionMethod(relationId) != DISTRIBUTE_BY_NONE &&
|
PartitionMethod(relationId) != DISTRIBUTE_BY_NONE &&
|
||||||
ParallelQueryExecutedInTransaction())
|
ParallelQueryExecutedInTransaction())
|
||||||
{
|
{
|
||||||
|
|
|
@ -69,7 +69,7 @@ ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement)
|
||||||
{
|
{
|
||||||
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, false);
|
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, false);
|
||||||
char relationKind = get_rel_relkind(relationId);
|
char relationKind = get_rel_relkind(relationId);
|
||||||
if (IsDistributedTable(relationId) &&
|
if (IsCitusTable(relationId) &&
|
||||||
relationKind == RELKIND_FOREIGN_TABLE)
|
relationKind == RELKIND_FOREIGN_TABLE)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
@ -94,7 +94,7 @@ EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement)
|
||||||
{
|
{
|
||||||
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, false);
|
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, false);
|
||||||
|
|
||||||
if (!IsDistributedTable(relationId))
|
if (!IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -122,7 +122,7 @@ ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command)
|
||||||
{
|
{
|
||||||
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, failOK);
|
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, failOK);
|
||||||
|
|
||||||
if (IsDistributedTable(relationId) &&
|
if (IsCitusTable(relationId) &&
|
||||||
PartitionMethod(relationId) == DISTRIBUTE_BY_NONE &&
|
PartitionMethod(relationId) == DISTRIBUTE_BY_NONE &&
|
||||||
TableReferenced(relationId))
|
TableReferenced(relationId))
|
||||||
{
|
{
|
||||||
|
@ -177,7 +177,7 @@ LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement)
|
||||||
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, false);
|
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, false);
|
||||||
Oid referencingRelationId = InvalidOid;
|
Oid referencingRelationId = InvalidOid;
|
||||||
|
|
||||||
if (!IsDistributedTable(relationId))
|
if (!IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -189,7 +189,7 @@ LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement)
|
||||||
|
|
||||||
distributedRelationList = lappend_oid(distributedRelationList, relationId);
|
distributedRelationList = lappend_oid(distributedRelationList, relationId);
|
||||||
|
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
Assert(cacheEntry != NULL);
|
Assert(cacheEntry != NULL);
|
||||||
|
|
||||||
List *referencingTableList = cacheEntry->referencingRelationsViaForeignKey;
|
List *referencingTableList = cacheEntry->referencingRelationsViaForeignKey;
|
||||||
|
|
|
@ -89,7 +89,7 @@ PostprocessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
|
||||||
Oid relationId = InvalidOid;
|
Oid relationId = InvalidOid;
|
||||||
foreach_oid(relationId, relationIdList)
|
foreach_oid(relationId, relationIdList)
|
||||||
{
|
{
|
||||||
if (IsDistributedTable(relationId))
|
if (IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* VACUUM commands cannot run inside a transaction block, so we use
|
* VACUUM commands cannot run inside a transaction block, so we use
|
||||||
|
@ -148,7 +148,7 @@ IsDistributedVacuumStmt(int vacuumOptions, List *vacuumRelationIdList)
|
||||||
Oid relationId = InvalidOid;
|
Oid relationId = InvalidOid;
|
||||||
foreach_oid(relationId, vacuumRelationIdList)
|
foreach_oid(relationId, vacuumRelationIdList)
|
||||||
{
|
{
|
||||||
if (OidIsValid(relationId) && IsDistributedTable(relationId))
|
if (OidIsValid(relationId) && IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
distributedRelationCount++;
|
distributedRelationCount++;
|
||||||
}
|
}
|
||||||
|
|
|
@ -220,7 +220,7 @@ CoordinatorInsertSelectExecScanInternal(CustomScanState *node)
|
||||||
char *distResultPrefix = distResultPrefixString->data;
|
char *distResultPrefix = distResultPrefixString->data;
|
||||||
|
|
||||||
DistTableCacheEntry *targetRelation =
|
DistTableCacheEntry *targetRelation =
|
||||||
DistributedTableCacheEntry(targetRelationId);
|
CitusTableCacheEntry(targetRelationId);
|
||||||
|
|
||||||
int partitionColumnIndex =
|
int partitionColumnIndex =
|
||||||
PartitionColumnIndex(insertTargetList, targetRelation->partitionColumn);
|
PartitionColumnIndex(insertTargetList, targetRelation->partitionColumn);
|
||||||
|
@ -484,7 +484,7 @@ TwoPhaseInsertSelectTaskList(Oid targetRelationId, Query *insertSelectQuery,
|
||||||
RangeTblEntry *insertRte = ExtractResultRelationRTE(insertResultQuery);
|
RangeTblEntry *insertRte = ExtractResultRelationRTE(insertResultQuery);
|
||||||
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery);
|
RangeTblEntry *selectRte = ExtractSelectRangeTableEntry(insertResultQuery);
|
||||||
|
|
||||||
DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId);
|
DistTableCacheEntry *targetCacheEntry = CitusTableCacheEntry(targetRelationId);
|
||||||
int shardCount = targetCacheEntry->shardIntervalArrayLength;
|
int shardCount = targetCacheEntry->shardIntervalArrayLength;
|
||||||
uint32 taskIdIndex = 1;
|
uint32 taskIdIndex = 1;
|
||||||
uint64 jobId = INVALID_JOB_ID;
|
uint64 jobId = INVALID_JOB_ID;
|
||||||
|
@ -895,7 +895,7 @@ CastExpr(Expr *expr, Oid sourceType, Oid targetType, Oid targetCollation,
|
||||||
bool
|
bool
|
||||||
IsSupportedRedistributionTarget(Oid targetRelationId)
|
IsSupportedRedistributionTarget(Oid targetRelationId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *tableEntry = DistributedTableCacheEntry(targetRelationId);
|
DistTableCacheEntry *tableEntry = CitusTableCacheEntry(targetRelationId);
|
||||||
|
|
||||||
/* only range and hash-distributed tables are currently supported */
|
/* only range and hash-distributed tables are currently supported */
|
||||||
if (tableEntry->partitionMethod != DISTRIBUTE_BY_HASH &&
|
if (tableEntry->partitionMethod != DISTRIBUTE_BY_HASH &&
|
||||||
|
|
|
@ -106,7 +106,7 @@ void
|
||||||
CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
|
CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
|
||||||
int32 replicationFactor, bool useExclusiveConnections)
|
int32 replicationFactor, bool useExclusiveConnections)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(distributedTableId);
|
||||||
bool colocatedShard = false;
|
bool colocatedShard = false;
|
||||||
List *insertedShardPlacements = NIL;
|
List *insertedShardPlacements = NIL;
|
||||||
|
|
||||||
|
|
|
@ -222,7 +222,7 @@ master_drop_all_shards(PG_FUNCTION_ARGS)
|
||||||
* The SQL_DROP trigger calls this function even for tables that are
|
* The SQL_DROP trigger calls this function even for tables that are
|
||||||
* not distributed. In that case, silently ignore and return -1.
|
* not distributed. In that case, silently ignore and return -1.
|
||||||
*/
|
*/
|
||||||
if (!IsDistributedTable(relationId) || !EnableDDLPropagation)
|
if (!IsCitusTable(relationId) || !EnableDDLPropagation)
|
||||||
{
|
{
|
||||||
PG_RETURN_INT32(-1);
|
PG_RETURN_INT32(-1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -246,7 +246,7 @@ DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, char *sizeQ
|
||||||
List *
|
List *
|
||||||
GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId)
|
GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *distTableCacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *distTableCacheEntry = CitusTableCacheEntry(relationId);
|
||||||
List *resultList = NIL;
|
List *resultList = NIL;
|
||||||
|
|
||||||
int shardIntervalArrayLength = distTableCacheEntry->shardIntervalArrayLength;
|
int shardIntervalArrayLength = distTableCacheEntry->shardIntervalArrayLength;
|
||||||
|
@ -284,7 +284,7 @@ GroupShardPlacementsForTableOnGroup(Oid relationId, int32 groupId)
|
||||||
static List *
|
static List *
|
||||||
ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId)
|
ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *distTableCacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *distTableCacheEntry = CitusTableCacheEntry(relationId);
|
||||||
List *shardIntervalList = NIL;
|
List *shardIntervalList = NIL;
|
||||||
int shardIntervalArrayLength = distTableCacheEntry->shardIntervalArrayLength;
|
int shardIntervalArrayLength = distTableCacheEntry->shardIntervalArrayLength;
|
||||||
|
|
||||||
|
@ -372,7 +372,7 @@ GenerateSizeQueryOnMultiplePlacements(List *shardIntervalList, char *sizeQuery)
|
||||||
static void
|
static void
|
||||||
ErrorIfNotSuitableToGetSize(Oid relationId)
|
ErrorIfNotSuitableToGetSize(Oid relationId)
|
||||||
{
|
{
|
||||||
if (!IsDistributedTable(relationId))
|
if (!IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
char *relationName = get_rel_name(relationId);
|
char *relationName = get_rel_name(relationId);
|
||||||
char *escapedQueryString = quote_literal_cstr(relationName);
|
char *escapedQueryString = quote_literal_cstr(relationName);
|
||||||
|
@ -486,7 +486,7 @@ TableShardReplicationFactor(Oid relationId)
|
||||||
List *
|
List *
|
||||||
LoadShardIntervalList(Oid relationId)
|
LoadShardIntervalList(Oid relationId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
List *shardList = NIL;
|
List *shardList = NIL;
|
||||||
|
|
||||||
for (int i = 0; i < cacheEntry->shardIntervalArrayLength; i++)
|
for (int i = 0; i < cacheEntry->shardIntervalArrayLength; i++)
|
||||||
|
@ -511,10 +511,10 @@ LoadShardIntervalList(Oid relationId)
|
||||||
int
|
int
|
||||||
ShardIntervalCount(Oid relationId)
|
ShardIntervalCount(Oid relationId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
int shardIntervalCount = 0;
|
int shardIntervalCount = 0;
|
||||||
|
|
||||||
if (cacheEntry->isDistributedTable)
|
if (cacheEntry->isCitusTable)
|
||||||
{
|
{
|
||||||
shardIntervalCount = cacheEntry->shardIntervalArrayLength;
|
shardIntervalCount = cacheEntry->shardIntervalArrayLength;
|
||||||
}
|
}
|
||||||
|
@ -533,7 +533,7 @@ ShardIntervalCount(Oid relationId)
|
||||||
List *
|
List *
|
||||||
LoadShardList(Oid relationId)
|
LoadShardList(Oid relationId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
List *shardList = NIL;
|
List *shardList = NIL;
|
||||||
|
|
||||||
for (int i = 0; i < cacheEntry->shardIntervalArrayLength; i++)
|
for (int i = 0; i < cacheEntry->shardIntervalArrayLength; i++)
|
||||||
|
|
|
@ -103,7 +103,7 @@ master_get_table_metadata(PG_FUNCTION_ARGS)
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
/* find partition tuple for partitioned relation */
|
/* find partition tuple for partitioned relation */
|
||||||
DistTableCacheEntry *partitionEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *partitionEntry = CitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
/* create tuple descriptor for return value */
|
/* create tuple descriptor for return value */
|
||||||
TypeFuncClass resultTypeClass = get_call_result_type(fcinfo, NULL,
|
TypeFuncClass resultTypeClass = get_call_result_type(fcinfo, NULL,
|
||||||
|
|
|
@ -356,7 +356,7 @@ CheckDistributedTable(Oid relationId)
|
||||||
/* check that the relationId belongs to a table */
|
/* check that the relationId belongs to a table */
|
||||||
EnsureRelationKindSupported(relationId);
|
EnsureRelationKindSupported(relationId);
|
||||||
|
|
||||||
if (!IsDistributedTable(relationId))
|
if (!IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("relation \"%s\" is not a distributed table",
|
ereport(ERROR, (errmsg("relation \"%s\" is not a distributed table",
|
||||||
relationName)));
|
relationName)));
|
||||||
|
@ -558,7 +558,7 @@ static List *
|
||||||
RelationShardListForShardCreate(ShardInterval *shardInterval)
|
RelationShardListForShardCreate(ShardInterval *shardInterval)
|
||||||
{
|
{
|
||||||
Oid relationId = shardInterval->relationId;
|
Oid relationId = shardInterval->relationId;
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
List *referencedRelationList = cacheEntry->referencedRelationsViaForeignKey;
|
List *referencedRelationList = cacheEntry->referencedRelationsViaForeignKey;
|
||||||
List *referencingRelationList = cacheEntry->referencingRelationsViaForeignKey;
|
List *referencingRelationList = cacheEntry->referencingRelationsViaForeignKey;
|
||||||
int shardIndex = -1;
|
int shardIndex = -1;
|
||||||
|
@ -587,7 +587,7 @@ RelationShardListForShardCreate(ShardInterval *shardInterval)
|
||||||
{
|
{
|
||||||
uint64 fkeyShardId = INVALID_SHARD_ID;
|
uint64 fkeyShardId = INVALID_SHARD_ID;
|
||||||
|
|
||||||
if (!IsDistributedTable(fkeyRelationid))
|
if (!IsCitusTable(fkeyRelationid))
|
||||||
{
|
{
|
||||||
/* we're not interested in local tables */
|
/* we're not interested in local tables */
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -189,7 +189,7 @@ static ScanKeyData DistObjectScanKey[3];
|
||||||
|
|
||||||
|
|
||||||
/* local function forward declarations */
|
/* local function forward declarations */
|
||||||
static bool IsDistributedTableViaCatalog(Oid relationId);
|
static bool IsCitusTableViaCatalog(Oid relationId);
|
||||||
static ShardCacheEntry * LookupShardCacheEntry(int64 shardId);
|
static ShardCacheEntry * LookupShardCacheEntry(int64 shardId);
|
||||||
static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId);
|
static DistTableCacheEntry * LookupDistTableCacheEntry(Oid relationId);
|
||||||
static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
|
static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
|
||||||
|
@ -268,11 +268,11 @@ EnsureModificationsCanRun(void)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* IsDistributedTable returns whether relationId is a distributed relation or
|
* IsCitusTable returns whether relationId is a distributed relation or
|
||||||
* not.
|
* not.
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
IsDistributedTable(Oid relationId)
|
IsCitusTable(Oid relationId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cacheEntry = LookupDistTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = LookupDistTableCacheEntry(relationId);
|
||||||
|
|
||||||
|
@ -285,12 +285,12 @@ IsDistributedTable(Oid relationId)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
return cacheEntry->isDistributedTable;
|
return cacheEntry->isCitusTable;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* IsDistributedTableViaCatalog returns whether the given relation is a
|
* IsCitusTableViaCatalog returns whether the given relation is a
|
||||||
* distributed table or not.
|
* distributed table or not.
|
||||||
*
|
*
|
||||||
* It does so by searching pg_dist_partition, explicitly bypassing caches,
|
* It does so by searching pg_dist_partition, explicitly bypassing caches,
|
||||||
|
@ -302,7 +302,7 @@ IsDistributedTable(Oid relationId)
|
||||||
* that, we'll have to work a bit harder.
|
* that, we'll have to work a bit harder.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
IsDistributedTableViaCatalog(Oid relationId)
|
IsCitusTableViaCatalog(Oid relationId)
|
||||||
{
|
{
|
||||||
const int scanKeyCount = 1;
|
const int scanKeyCount = 1;
|
||||||
ScanKeyData scanKey[1];
|
ScanKeyData scanKey[1];
|
||||||
|
@ -326,11 +326,11 @@ IsDistributedTableViaCatalog(Oid relationId)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DistributedTableList returns a list that includes all the valid distributed table
|
* CitusTableList returns a list that includes all the valid distributed table
|
||||||
* cache entries.
|
* cache entries.
|
||||||
*/
|
*/
|
||||||
List *
|
List *
|
||||||
DistributedTableList(void)
|
CitusTableList(void)
|
||||||
{
|
{
|
||||||
List *distributedTableList = NIL;
|
List *distributedTableList = NIL;
|
||||||
|
|
||||||
|
@ -342,7 +342,8 @@ DistributedTableList(void)
|
||||||
Oid relationId = InvalidOid;
|
Oid relationId = InvalidOid;
|
||||||
foreach_oid(relationId, distTableOidList)
|
foreach_oid(relationId, distTableOidList)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
|
Assert(cacheEntry->isCitusTable);
|
||||||
|
|
||||||
distributedTableList = lappend(distributedTableList, cacheEntry);
|
distributedTableList = lappend(distributedTableList, cacheEntry);
|
||||||
}
|
}
|
||||||
|
@ -364,7 +365,7 @@ LoadShardInterval(uint64 shardId)
|
||||||
|
|
||||||
DistTableCacheEntry *tableEntry = shardEntry->tableEntry;
|
DistTableCacheEntry *tableEntry = shardEntry->tableEntry;
|
||||||
|
|
||||||
Assert(tableEntry->isDistributedTable);
|
Assert(tableEntry->isCitusTable);
|
||||||
|
|
||||||
/* the offset better be in a valid range */
|
/* the offset better be in a valid range */
|
||||||
Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength);
|
Assert(shardEntry->shardIndex < tableEntry->shardIntervalArrayLength);
|
||||||
|
@ -391,7 +392,7 @@ RelationIdForShard(uint64 shardId)
|
||||||
|
|
||||||
DistTableCacheEntry *tableEntry = shardEntry->tableEntry;
|
DistTableCacheEntry *tableEntry = shardEntry->tableEntry;
|
||||||
|
|
||||||
Assert(tableEntry->isDistributedTable);
|
Assert(tableEntry->isCitusTable);
|
||||||
|
|
||||||
return tableEntry->relationId;
|
return tableEntry->relationId;
|
||||||
}
|
}
|
||||||
|
@ -772,18 +773,18 @@ LookupShardCacheEntry(int64 shardId)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DistributedTableCacheEntry looks up a pg_dist_partition entry for a
|
* CitusTableCacheEntry looks up a pg_dist_partition entry for a
|
||||||
* relation.
|
* relation.
|
||||||
*
|
*
|
||||||
* Errors out if no relation matching the criteria could be found.
|
* Errors out if no relation matching the criteria could be found.
|
||||||
*/
|
*/
|
||||||
DistTableCacheEntry *
|
DistTableCacheEntry *
|
||||||
DistributedTableCacheEntry(Oid distributedRelationId)
|
CitusTableCacheEntry(Oid distributedRelationId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cacheEntry =
|
DistTableCacheEntry *cacheEntry =
|
||||||
LookupDistTableCacheEntry(distributedRelationId);
|
LookupDistTableCacheEntry(distributedRelationId);
|
||||||
|
|
||||||
if (cacheEntry && cacheEntry->isDistributedTable)
|
if (cacheEntry && cacheEntry->isCitusTable)
|
||||||
{
|
{
|
||||||
return cacheEntry;
|
return cacheEntry;
|
||||||
}
|
}
|
||||||
|
@ -832,7 +833,7 @@ LookupDistTableCacheEntry(Oid relationId)
|
||||||
*/
|
*/
|
||||||
if (!citusVersionKnownCompatible && EnableVersionChecks)
|
if (!citusVersionKnownCompatible && EnableVersionChecks)
|
||||||
{
|
{
|
||||||
bool isDistributed = IsDistributedTableViaCatalog(relationId);
|
bool isCitusTable = IsCitusTableViaCatalog(relationId);
|
||||||
int reportLevel = DEBUG1;
|
int reportLevel = DEBUG1;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -841,7 +842,7 @@ LookupDistTableCacheEntry(Oid relationId)
|
||||||
* want to check compatibility in the non-distributed case as well, so
|
* want to check compatibility in the non-distributed case as well, so
|
||||||
* future lookups can use the cache if compatible.
|
* future lookups can use the cache if compatible.
|
||||||
*/
|
*/
|
||||||
if (isDistributed)
|
if (isCitusTable)
|
||||||
{
|
{
|
||||||
reportLevel = ERROR;
|
reportLevel = ERROR;
|
||||||
}
|
}
|
||||||
|
@ -1018,12 +1019,12 @@ BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry)
|
||||||
/* not a distributed table, done */
|
/* not a distributed table, done */
|
||||||
if (distPartitionTuple == NULL)
|
if (distPartitionTuple == NULL)
|
||||||
{
|
{
|
||||||
cacheEntry->isDistributedTable = false;
|
cacheEntry->isCitusTable = false;
|
||||||
heap_close(pgDistPartition, NoLock);
|
heap_close(pgDistPartition, NoLock);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
cacheEntry->isDistributedTable = true;
|
cacheEntry->isCitusTable = true;
|
||||||
|
|
||||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||||
heap_deform_tuple(distPartitionTuple, tupleDescriptor, datumArray, isNullArray);
|
heap_deform_tuple(distPartitionTuple, tupleDescriptor, datumArray, isNullArray);
|
||||||
|
|
|
@ -222,7 +222,7 @@ ClusterHasKnownMetadataWorkers()
|
||||||
bool
|
bool
|
||||||
ShouldSyncTableMetadata(Oid relationId)
|
ShouldSyncTableMetadata(Oid relationId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *tableEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *tableEntry = CitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
bool hashDistributed = (tableEntry->partitionMethod == DISTRIBUTE_BY_HASH);
|
bool hashDistributed = (tableEntry->partitionMethod == DISTRIBUTE_BY_HASH);
|
||||||
bool streamingReplicated =
|
bool streamingReplicated =
|
||||||
|
@ -356,7 +356,7 @@ List *
|
||||||
MetadataCreateCommands(void)
|
MetadataCreateCommands(void)
|
||||||
{
|
{
|
||||||
List *metadataSnapshotCommandList = NIL;
|
List *metadataSnapshotCommandList = NIL;
|
||||||
List *distributedTableList = DistributedTableList();
|
List *distributedTableList = CitusTableList();
|
||||||
List *propagatedTableList = NIL;
|
List *propagatedTableList = NIL;
|
||||||
bool includeNodesFromOtherClusters = true;
|
bool includeNodesFromOtherClusters = true;
|
||||||
List *workerNodeList = ReadDistNode(includeNodesFromOtherClusters);
|
List *workerNodeList = ReadDistNode(includeNodesFromOtherClusters);
|
||||||
|
@ -391,10 +391,9 @@ MetadataCreateCommands(void)
|
||||||
char *tableOwnerResetCommand = TableOwnerResetCommand(relationId);
|
char *tableOwnerResetCommand = TableOwnerResetCommand(relationId);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Distributed tables might have dependencies on different objects, since we
|
* Tables might have dependencies on different objects, since we create shards for
|
||||||
* create shards for a distributed table via multiple sessions these objects will
|
* table via multiple sessions these objects will be created via their own connection
|
||||||
* be created via their own connection and committed immediately so they become
|
* and committed immediately so they become visible to all sessions creating shards.
|
||||||
* visible to all sessions creating shards.
|
|
||||||
*/
|
*/
|
||||||
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
|
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
|
||||||
EnsureDependenciesExistOnAllNodes(&tableAddress);
|
EnsureDependenciesExistOnAllNodes(&tableAddress);
|
||||||
|
@ -467,7 +466,7 @@ MetadataCreateCommands(void)
|
||||||
List *
|
List *
|
||||||
GetDistributedTableDDLEvents(Oid relationId)
|
GetDistributedTableDDLEvents(Oid relationId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
List *commandList = NIL;
|
List *commandList = NIL;
|
||||||
bool includeSequenceDefaults = true;
|
bool includeSequenceDefaults = true;
|
||||||
|
@ -1306,7 +1305,7 @@ static List *
|
||||||
DetachPartitionCommandList(void)
|
DetachPartitionCommandList(void)
|
||||||
{
|
{
|
||||||
List *detachPartitionCommandList = NIL;
|
List *detachPartitionCommandList = NIL;
|
||||||
List *distributedTableList = DistributedTableList();
|
List *distributedTableList = CitusTableList();
|
||||||
|
|
||||||
/* we iterate over all distributed partitioned tables and DETACH their partitions */
|
/* we iterate over all distributed partitioned tables and DETACH their partitions */
|
||||||
DistTableCacheEntry *cacheEntry = NULL;
|
DistTableCacheEntry *cacheEntry = NULL;
|
||||||
|
|
|
@ -793,7 +793,7 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
|
||||||
Oid relationId = PG_GETARG_OID(0);
|
Oid relationId = PG_GETARG_OID(0);
|
||||||
EnsureTablePermissions(relationId, ACL_SELECT);
|
EnsureTablePermissions(relationId, ACL_SELECT);
|
||||||
|
|
||||||
if (!IsDistributedTable(relationId))
|
if (!IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
|
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
|
||||||
errmsg("relation is not distributed")));
|
errmsg("relation is not distributed")));
|
||||||
|
@ -813,7 +813,7 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
|
||||||
else if (distributionMethod == DISTRIBUTE_BY_HASH ||
|
else if (distributionMethod == DISTRIBUTE_BY_HASH ||
|
||||||
distributionMethod == DISTRIBUTE_BY_RANGE)
|
distributionMethod == DISTRIBUTE_BY_RANGE)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
/* if given table is not reference table, distributionValue cannot be NULL */
|
/* if given table is not reference table, distributionValue cannot be NULL */
|
||||||
if (PG_ARGISNULL(1))
|
if (PG_ARGISNULL(1))
|
||||||
|
|
|
@ -364,7 +364,7 @@ ListContainsDistributedTableRTE(List *rangeTableList)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IsDistributedTable(rangeTableEntry->relid))
|
if (IsCitusTable(rangeTableEntry->relid))
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -442,7 +442,7 @@ AdjustPartitioningForDistributedPlanning(List *rangeTableList,
|
||||||
* value before and after dropping to the standart_planner.
|
* value before and after dropping to the standart_planner.
|
||||||
*/
|
*/
|
||||||
if (rangeTableEntry->rtekind == RTE_RELATION &&
|
if (rangeTableEntry->rtekind == RTE_RELATION &&
|
||||||
IsDistributedTable(rangeTableEntry->relid) &&
|
IsCitusTable(rangeTableEntry->relid) &&
|
||||||
PartitionedTable(rangeTableEntry->relid))
|
PartitionedTable(rangeTableEntry->relid))
|
||||||
{
|
{
|
||||||
rangeTableEntry->inh = setPartitionedTablesInherited;
|
rangeTableEntry->inh = setPartitionedTablesInherited;
|
||||||
|
@ -893,7 +893,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
|
||||||
Oid targetRelationId = ModifyQueryResultRelationId(query);
|
Oid targetRelationId = ModifyQueryResultRelationId(query);
|
||||||
EnsurePartitionTableNotReplicated(targetRelationId);
|
EnsurePartitionTableNotReplicated(targetRelationId);
|
||||||
|
|
||||||
if (InsertSelectIntoDistributedTable(originalQuery))
|
if (InsertSelectIntoCitusTable(originalQuery))
|
||||||
{
|
{
|
||||||
if (hasUnresolvedParams)
|
if (hasUnresolvedParams)
|
||||||
{
|
{
|
||||||
|
@ -1871,7 +1871,7 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
|
||||||
MemoryContext restrictionsMemoryContext = plannerRestrictionContext->memoryContext;
|
MemoryContext restrictionsMemoryContext = plannerRestrictionContext->memoryContext;
|
||||||
MemoryContext oldMemoryContext = MemoryContextSwitchTo(restrictionsMemoryContext);
|
MemoryContext oldMemoryContext = MemoryContextSwitchTo(restrictionsMemoryContext);
|
||||||
|
|
||||||
bool distributedTable = IsDistributedTable(rte->relid);
|
bool distributedTable = IsCitusTable(rte->relid);
|
||||||
bool localTable = !distributedTable;
|
bool localTable = !distributedTable;
|
||||||
|
|
||||||
RelationRestriction *relationRestriction = palloc0(sizeof(RelationRestriction));
|
RelationRestriction *relationRestriction = palloc0(sizeof(RelationRestriction));
|
||||||
|
@ -1897,7 +1897,7 @@ multi_relation_restriction_hook(PlannerInfo *root, RelOptInfo *relOptInfo,
|
||||||
*/
|
*/
|
||||||
if (distributedTable)
|
if (distributedTable)
|
||||||
{
|
{
|
||||||
cacheEntry = DistributedTableCacheEntry(rte->relid);
|
cacheEntry = CitusTableCacheEntry(rte->relid);
|
||||||
|
|
||||||
relationRestrictionContext->allReferenceTables &=
|
relationRestrictionContext->allReferenceTables &=
|
||||||
(cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE);
|
(cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE);
|
||||||
|
@ -2420,14 +2420,13 @@ IsLocalReferenceTableJoin(Query *parse, List *rangeTableList)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!IsDistributedTable(rangeTableEntry->relid))
|
if (!IsCitusTable(rangeTableEntry->relid))
|
||||||
{
|
{
|
||||||
hasLocalTable = true;
|
hasLocalTable = true;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(rangeTableEntry->relid);
|
||||||
rangeTableEntry->relid);
|
|
||||||
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
hasReferenceTable = true;
|
hasReferenceTable = true;
|
||||||
|
@ -2492,12 +2491,12 @@ UpdateReferenceTablesWithShard(Node *node, void *context)
|
||||||
}
|
}
|
||||||
|
|
||||||
Oid relationId = newRte->relid;
|
Oid relationId = newRte->relid;
|
||||||
if (!IsDistributedTable(relationId))
|
if (!IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
if (cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE)
|
if (cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -133,7 +133,7 @@ GeneratePlaceHolderPlannedStmt(Query *parse)
|
||||||
result->planTree = (Plan *) plan;
|
result->planTree = (Plan *) plan;
|
||||||
result->hasReturning = (parse->returningList != NIL);
|
result->hasReturning = (parse->returningList != NIL);
|
||||||
|
|
||||||
Oid relationId = ExtractFirstDistributedTableId(parse);
|
Oid relationId = ExtractFirstCitusTableId(parse);
|
||||||
result->relationOids = list_make1_oid(relationId);
|
result->relationOids = list_make1_oid(relationId);
|
||||||
|
|
||||||
return result;
|
return result;
|
||||||
|
@ -202,7 +202,7 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
|
||||||
|
|
||||||
/* we don't want to deal with append/range distributed tables */
|
/* we don't want to deal with append/range distributed tables */
|
||||||
Oid distributedTableId = rangeTableEntry->relid;
|
Oid distributedTableId = rangeTableEntry->relid;
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(distributedTableId);
|
||||||
if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH ||
|
if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH ||
|
||||||
cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE))
|
cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE))
|
||||||
{
|
{
|
||||||
|
|
|
@ -267,7 +267,7 @@ TryToDelegateFunctionCall(DistributedPlanningContext *planContext)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
distTable = DistributedTableCacheEntry(colocatedRelationId);
|
distTable = CitusTableCacheEntry(colocatedRelationId);
|
||||||
partitionColumn = distTable->partitionColumn;
|
partitionColumn = distTable->partitionColumn;
|
||||||
if (partitionColumn == NULL)
|
if (partitionColumn == NULL)
|
||||||
{
|
{
|
||||||
|
|
|
@ -74,22 +74,22 @@ static DeferredErrorMessage * CoordinatorInsertSelectSupported(Query *insertSele
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* InsertSelectIntoDistributedTable returns true when the input query is an
|
* InsertSelectIntoCitusTable returns true when the input query is an
|
||||||
* INSERT INTO ... SELECT kind of query and the target is a distributed
|
* INSERT INTO ... SELECT kind of query and the target is a citus
|
||||||
* table.
|
* table.
|
||||||
*
|
*
|
||||||
* Note that the input query should be the original parsetree of
|
* Note that the input query should be the original parsetree of
|
||||||
* the query (i.e., not passed trough the standard planner).
|
* the query (i.e., not passed trough the standard planner).
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
InsertSelectIntoDistributedTable(Query *query)
|
InsertSelectIntoCitusTable(Query *query)
|
||||||
{
|
{
|
||||||
bool insertSelectQuery = CheckInsertSelectQuery(query);
|
bool insertSelectQuery = CheckInsertSelectQuery(query);
|
||||||
|
|
||||||
if (insertSelectQuery)
|
if (insertSelectQuery)
|
||||||
{
|
{
|
||||||
RangeTblEntry *insertRte = ExtractResultRelationRTE(query);
|
RangeTblEntry *insertRte = ExtractResultRelationRTE(query);
|
||||||
if (IsDistributedTable(insertRte->relid))
|
if (IsCitusTable(insertRte->relid))
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -112,7 +112,7 @@ InsertSelectIntoLocalTable(Query *query)
|
||||||
if (insertSelectQuery)
|
if (insertSelectQuery)
|
||||||
{
|
{
|
||||||
RangeTblEntry *insertRte = ExtractResultRelationRTE(query);
|
RangeTblEntry *insertRte = ExtractResultRelationRTE(query);
|
||||||
if (!IsDistributedTable(insertRte->relid))
|
if (!IsCitusTable(insertRte->relid))
|
||||||
{
|
{
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -218,7 +218,7 @@ CreateDistributedInsertSelectPlan(Query *originalQuery,
|
||||||
RangeTblEntry *insertRte = ExtractResultRelationRTE(originalQuery);
|
RangeTblEntry *insertRte = ExtractResultRelationRTE(originalQuery);
|
||||||
RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery);
|
RangeTblEntry *subqueryRte = ExtractSelectRangeTableEntry(originalQuery);
|
||||||
Oid targetRelationId = insertRte->relid;
|
Oid targetRelationId = insertRte->relid;
|
||||||
DistTableCacheEntry *targetCacheEntry = DistributedTableCacheEntry(targetRelationId);
|
DistTableCacheEntry *targetCacheEntry = CitusTableCacheEntry(targetRelationId);
|
||||||
int shardCount = targetCacheEntry->shardIntervalArrayLength;
|
int shardCount = targetCacheEntry->shardIntervalArrayLength;
|
||||||
RelationRestrictionContext *relationRestrictionContext =
|
RelationRestrictionContext *relationRestrictionContext =
|
||||||
plannerRestrictionContext->relationRestrictionContext;
|
plannerRestrictionContext->relationRestrictionContext;
|
||||||
|
@ -319,7 +319,7 @@ DistributedInsertSelectSupported(Query *queryTree, RangeTblEntry *insertRte,
|
||||||
ListCell *rangeTableCell = NULL;
|
ListCell *rangeTableCell = NULL;
|
||||||
|
|
||||||
/* we only do this check for INSERT ... SELECT queries */
|
/* we only do this check for INSERT ... SELECT queries */
|
||||||
AssertArg(InsertSelectIntoDistributedTable(queryTree));
|
AssertArg(InsertSelectIntoCitusTable(queryTree));
|
||||||
|
|
||||||
Query *subquery = subqueryRte->subquery;
|
Query *subquery = subqueryRte->subquery;
|
||||||
|
|
||||||
|
@ -426,7 +426,7 @@ RouterModifyTaskForShardInterval(Query *originalQuery, ShardInterval *shardInter
|
||||||
|
|
||||||
uint64 shardId = shardInterval->shardId;
|
uint64 shardId = shardInterval->shardId;
|
||||||
Oid distributedTableId = shardInterval->relationId;
|
Oid distributedTableId = shardInterval->relationId;
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(distributedTableId);
|
||||||
|
|
||||||
PlannerRestrictionContext *copyOfPlannerRestrictionContext = palloc0(
|
PlannerRestrictionContext *copyOfPlannerRestrictionContext = palloc0(
|
||||||
sizeof(PlannerRestrictionContext));
|
sizeof(PlannerRestrictionContext));
|
||||||
|
@ -610,7 +610,7 @@ ReorderInsertSelectTargetLists(Query *originalQuery, RangeTblEntry *insertRte,
|
||||||
Index insertTableId = 1;
|
Index insertTableId = 1;
|
||||||
int targetEntryIndex = 0;
|
int targetEntryIndex = 0;
|
||||||
|
|
||||||
AssertArg(InsertSelectIntoDistributedTable(originalQuery));
|
AssertArg(InsertSelectIntoCitusTable(originalQuery));
|
||||||
|
|
||||||
Query *subquery = subqueryRte->subquery;
|
Query *subquery = subqueryRte->subquery;
|
||||||
|
|
||||||
|
|
|
@ -1380,7 +1380,7 @@ PartitionColumn(Oid relationId, uint32 rangeTableId)
|
||||||
Var *
|
Var *
|
||||||
DistPartitionKey(Oid relationId)
|
DistPartitionKey(Oid relationId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *partitionEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *partitionEntry = CitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
/* reference tables do not have partition column */
|
/* reference tables do not have partition column */
|
||||||
if (partitionEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
if (partitionEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
||||||
|
@ -1417,7 +1417,7 @@ char
|
||||||
PartitionMethod(Oid relationId)
|
PartitionMethod(Oid relationId)
|
||||||
{
|
{
|
||||||
/* errors out if not a distributed table */
|
/* errors out if not a distributed table */
|
||||||
DistTableCacheEntry *partitionEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *partitionEntry = CitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
char partitionMethod = partitionEntry->partitionMethod;
|
char partitionMethod = partitionEntry->partitionMethod;
|
||||||
|
|
||||||
|
@ -1430,7 +1430,7 @@ char
|
||||||
TableReplicationModel(Oid relationId)
|
TableReplicationModel(Oid relationId)
|
||||||
{
|
{
|
||||||
/* errors out if not a distributed table */
|
/* errors out if not a distributed table */
|
||||||
DistTableCacheEntry *partitionEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *partitionEntry = CitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
char replicationModel = partitionEntry->replicationModel;
|
char replicationModel = partitionEntry->replicationModel;
|
||||||
|
|
||||||
|
|
|
@ -284,8 +284,7 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
|
||||||
* If the expression belongs to a reference table continue searching for
|
* If the expression belongs to a reference table continue searching for
|
||||||
* other partition keys.
|
* other partition keys.
|
||||||
*/
|
*/
|
||||||
if (IsDistributedTable(relationId) && PartitionMethod(relationId) ==
|
if (IsCitusTable(relationId) && PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
|
||||||
DISTRIBUTE_BY_NONE)
|
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -437,7 +436,7 @@ IsDistributedTableRTE(Node *node)
|
||||||
}
|
}
|
||||||
|
|
||||||
Oid relationId = rangeTableEntry->relid;
|
Oid relationId = rangeTableEntry->relid;
|
||||||
if (!IsDistributedTable(relationId) ||
|
if (!IsCitusTable(relationId) ||
|
||||||
PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
|
PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -1950,7 +1950,7 @@ BuildMapMergeJob(Query *jobQuery, List *dependentJobList, Var *partitionKey,
|
||||||
else if (partitionType == SINGLE_HASH_PARTITION_TYPE || partitionType ==
|
else if (partitionType == SINGLE_HASH_PARTITION_TYPE || partitionType ==
|
||||||
RANGE_PARTITION_TYPE)
|
RANGE_PARTITION_TYPE)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cache = DistributedTableCacheEntry(baseRelationId);
|
DistTableCacheEntry *cache = CitusTableCacheEntry(baseRelationId);
|
||||||
uint32 shardCount = cache->shardIntervalArrayLength;
|
uint32 shardCount = cache->shardIntervalArrayLength;
|
||||||
ShardInterval **sortedShardIntervalArray = cache->sortedShardIntervalArray;
|
ShardInterval **sortedShardIntervalArray = cache->sortedShardIntervalArray;
|
||||||
|
|
||||||
|
@ -2177,7 +2177,7 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
|
||||||
List *prunedShardList = (List *) lfirst(prunedRelationShardCell);
|
List *prunedShardList = (List *) lfirst(prunedRelationShardCell);
|
||||||
ListCell *shardIntervalCell = NULL;
|
ListCell *shardIntervalCell = NULL;
|
||||||
|
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
|
@ -2309,7 +2309,7 @@ ErrorIfUnsupportedShardDistribution(Query *query)
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *distTableEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *distTableEntry = CitusTableCacheEntry(relationId);
|
||||||
if (distTableEntry->hasOverlappingShardInterval)
|
if (distTableEntry->hasOverlappingShardInterval)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
|
@ -2415,7 +2415,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
|
||||||
Oid relationId = relationRestriction->relationId;
|
Oid relationId = relationRestriction->relationId;
|
||||||
ShardInterval *shardInterval = NULL;
|
ShardInterval *shardInterval = NULL;
|
||||||
|
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
/* reference table only has one shard */
|
/* reference table only has one shard */
|
||||||
|
@ -2509,8 +2509,8 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
|
||||||
bool
|
bool
|
||||||
CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
|
CoPartitionedTables(Oid firstRelationId, Oid secondRelationId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *firstTableCache = DistributedTableCacheEntry(firstRelationId);
|
DistTableCacheEntry *firstTableCache = CitusTableCacheEntry(firstRelationId);
|
||||||
DistTableCacheEntry *secondTableCache = DistributedTableCacheEntry(secondRelationId);
|
DistTableCacheEntry *secondTableCache = CitusTableCacheEntry(secondRelationId);
|
||||||
|
|
||||||
ShardInterval **sortedFirstIntervalArray = firstTableCache->sortedShardIntervalArray;
|
ShardInterval **sortedFirstIntervalArray = firstTableCache->sortedShardIntervalArray;
|
||||||
ShardInterval **sortedSecondIntervalArray =
|
ShardInterval **sortedSecondIntervalArray =
|
||||||
|
@ -3904,7 +3904,7 @@ bool
|
||||||
ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterval)
|
ShardIntervalsOverlap(ShardInterval *firstInterval, ShardInterval *secondInterval)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *intervalRelation =
|
DistTableCacheEntry *intervalRelation =
|
||||||
DistributedTableCacheEntry(firstInterval->relationId);
|
CitusTableCacheEntry(firstInterval->relationId);
|
||||||
|
|
||||||
Assert(intervalRelation->partitionMethod != DISTRIBUTE_BY_NONE);
|
Assert(intervalRelation->partitionMethod != DISTRIBUTE_BY_NONE);
|
||||||
|
|
||||||
|
|
|
@ -458,10 +458,10 @@ AddShardIntervalRestrictionToSelect(Query *subqery, ShardInterval *shardInterval
|
||||||
RangeTblEntry *
|
RangeTblEntry *
|
||||||
ExtractSelectRangeTableEntry(Query *query)
|
ExtractSelectRangeTableEntry(Query *query)
|
||||||
{
|
{
|
||||||
Assert(InsertSelectIntoDistributedTable(query));
|
Assert(InsertSelectIntoCitusTable(query));
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Since we already asserted InsertSelectIntoDistributedTable() it is safe to access
|
* Since we already asserted InsertSelectIntoCitusTable() it is safe to access
|
||||||
* both lists
|
* both lists
|
||||||
*/
|
*/
|
||||||
List *fromList = query->jointree->fromlist;
|
List *fromList = query->jointree->fromlist;
|
||||||
|
@ -562,7 +562,7 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
|
||||||
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
|
plannerRestrictionContext->fastPathRestrictionContext->fastPathRouterQuery;
|
||||||
|
|
||||||
Oid distributedTableId = ModifyQueryResultRelationId(queryTree);
|
Oid distributedTableId = ModifyQueryResultRelationId(queryTree);
|
||||||
if (!IsDistributedTable(distributedTableId))
|
if (!IsCitusTable(distributedTableId))
|
||||||
{
|
{
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
"cannot plan modifications of local tables involving "
|
"cannot plan modifications of local tables involving "
|
||||||
|
@ -697,7 +697,7 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
|
||||||
{
|
{
|
||||||
Oid relationId = rangeTableEntry->relid;
|
Oid relationId = rangeTableEntry->relid;
|
||||||
|
|
||||||
if (!IsDistributedTable(relationId))
|
if (!IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
StringInfo errorMessage = makeStringInfo();
|
StringInfo errorMessage = makeStringInfo();
|
||||||
char *relationName = get_rel_name(rangeTableEntry->relid);
|
char *relationName = get_rel_name(rangeTableEntry->relid);
|
||||||
|
@ -741,7 +741,7 @@ ModifyQuerySupported(Query *queryTree, Query *originalQuery, bool multiShardQuer
|
||||||
if (rangeTableEntry->rtekind == RTE_SUBQUERY)
|
if (rangeTableEntry->rtekind == RTE_SUBQUERY)
|
||||||
{
|
{
|
||||||
StringInfo errorHint = makeStringInfo();
|
StringInfo errorHint = makeStringInfo();
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(
|
||||||
distributedTableId);
|
distributedTableId);
|
||||||
char *partitionKeyString = cacheEntry->partitionKeyString;
|
char *partitionKeyString = cacheEntry->partitionKeyString;
|
||||||
char *partitionColumnName = ColumnToColumnName(distributedTableId,
|
char *partitionColumnName = ColumnToColumnName(distributedTableId,
|
||||||
|
@ -978,7 +978,7 @@ ErrorIfOnConflictNotSupported(Query *queryTree)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
Oid distributedTableId = ExtractFirstDistributedTableId(queryTree);
|
Oid distributedTableId = ExtractFirstCitusTableId(queryTree);
|
||||||
Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
|
Var *partitionColumn = PartitionColumn(distributedTableId, rangeTableId);
|
||||||
|
|
||||||
List *onConflictSet = queryTree->onConflict->onConflictSet;
|
List *onConflictSet = queryTree->onConflict->onConflictSet;
|
||||||
|
@ -1414,7 +1414,7 @@ TargetEntryChangesValue(TargetEntry *targetEntry, Var *column, FromExpr *joinTre
|
||||||
static Job *
|
static Job *
|
||||||
RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **planningError)
|
RouterInsertJob(Query *originalQuery, Query *query, DeferredErrorMessage **planningError)
|
||||||
{
|
{
|
||||||
Oid distributedTableId = ExtractFirstDistributedTableId(query);
|
Oid distributedTableId = ExtractFirstCitusTableId(query);
|
||||||
List *taskList = NIL;
|
List *taskList = NIL;
|
||||||
bool requiresMasterEvaluation = false;
|
bool requiresMasterEvaluation = false;
|
||||||
bool deferredPruning = false;
|
bool deferredPruning = false;
|
||||||
|
@ -1570,8 +1570,8 @@ RouterInsertTaskList(Query *query, bool parametersInQueryResolved,
|
||||||
List *insertTaskList = NIL;
|
List *insertTaskList = NIL;
|
||||||
ListCell *modifyRouteCell = NULL;
|
ListCell *modifyRouteCell = NULL;
|
||||||
|
|
||||||
Oid distributedTableId = ExtractFirstDistributedTableId(query);
|
Oid distributedTableId = ExtractFirstCitusTableId(query);
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(distributedTableId);
|
||||||
|
|
||||||
ErrorIfNoShardsExist(cacheEntry);
|
ErrorIfNoShardsExist(cacheEntry);
|
||||||
|
|
||||||
|
@ -1639,7 +1639,7 @@ CreateTask(TaskType taskType)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* ExtractFirstDistributedTableId takes a given query, and finds the relationId
|
* ExtractFirstCitusTableId takes a given query, and finds the relationId
|
||||||
* for the first distributed table in that query. If the function cannot find a
|
* for the first distributed table in that query. If the function cannot find a
|
||||||
* distributed table, it returns InvalidOid.
|
* distributed table, it returns InvalidOid.
|
||||||
*
|
*
|
||||||
|
@ -1647,7 +1647,7 @@ CreateTask(TaskType taskType)
|
||||||
* should have the first distributed table in the top-level rtable.
|
* should have the first distributed table in the top-level rtable.
|
||||||
*/
|
*/
|
||||||
Oid
|
Oid
|
||||||
ExtractFirstDistributedTableId(Query *query)
|
ExtractFirstCitusTableId(Query *query)
|
||||||
{
|
{
|
||||||
List *rangeTableList = query->rtable;
|
List *rangeTableList = query->rtable;
|
||||||
ListCell *rangeTableCell = NULL;
|
ListCell *rangeTableCell = NULL;
|
||||||
|
@ -1657,7 +1657,7 @@ ExtractFirstDistributedTableId(Query *query)
|
||||||
{
|
{
|
||||||
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
|
RangeTblEntry *rangeTableEntry = (RangeTblEntry *) lfirst(rangeTableCell);
|
||||||
|
|
||||||
if (IsDistributedTable(rangeTableEntry->relid))
|
if (IsCitusTable(rangeTableEntry->relid))
|
||||||
{
|
{
|
||||||
distributedTableId = rangeTableEntry->relid;
|
distributedTableId = rangeTableEntry->relid;
|
||||||
break;
|
break;
|
||||||
|
@ -1944,7 +1944,7 @@ RowLocksOnRelations(Node *node, List **relationRowLockList)
|
||||||
RangeTblEntry *rangeTable = rt_fetch(rowMarkClause->rti, query->rtable);
|
RangeTblEntry *rangeTable = rt_fetch(rowMarkClause->rti, query->rtable);
|
||||||
Oid relationId = rangeTable->relid;
|
Oid relationId = rangeTable->relid;
|
||||||
|
|
||||||
if (IsDistributedTable(relationId))
|
if (IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
RelationRowLock *relationRowLock = CitusMakeNode(RelationRowLock);
|
RelationRowLock *relationRowLock = CitusMakeNode(RelationRowLock);
|
||||||
relationRowLock->relationId = relationId;
|
relationRowLock->relationId = relationId;
|
||||||
|
@ -1978,7 +1978,7 @@ SingleShardModifyTaskList(Query *query, uint64 jobId, List *relationShardList,
|
||||||
RangeTblEntry *updateOrDeleteRTE = GetUpdateOrDeleteRTE(query);
|
RangeTblEntry *updateOrDeleteRTE = GetUpdateOrDeleteRTE(query);
|
||||||
Assert(updateOrDeleteRTE != NULL);
|
Assert(updateOrDeleteRTE != NULL);
|
||||||
|
|
||||||
DistTableCacheEntry *modificationTableCacheEntry = DistributedTableCacheEntry(
|
DistTableCacheEntry *modificationTableCacheEntry = CitusTableCacheEntry(
|
||||||
updateOrDeleteRTE->relid);
|
updateOrDeleteRTE->relid);
|
||||||
char modificationPartitionMethod = modificationTableCacheEntry->partitionMethod;
|
char modificationPartitionMethod = modificationTableCacheEntry->partitionMethod;
|
||||||
|
|
||||||
|
@ -2043,7 +2043,7 @@ SelectsFromDistributedTable(List *rangeTableList, Query *query)
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(
|
||||||
rangeTableEntry->relid);
|
rangeTableEntry->relid);
|
||||||
if (cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE &&
|
if (cacheEntry->partitionMethod != DISTRIBUTE_BY_NONE &&
|
||||||
(resultRangeTableEntry == NULL || resultRangeTableEntry->relid !=
|
(resultRangeTableEntry == NULL || resultRangeTableEntry->relid !=
|
||||||
|
@ -2399,7 +2399,7 @@ TargetShardIntervalForFastPathQuery(Query *query, bool *isMultiShardQuery,
|
||||||
Const *inputDistributionKeyValue,
|
Const *inputDistributionKeyValue,
|
||||||
Const **outputPartitionValueConst)
|
Const **outputPartitionValueConst)
|
||||||
{
|
{
|
||||||
Oid relationId = ExtractFirstDistributedTableId(query);
|
Oid relationId = ExtractFirstCitusTableId(query);
|
||||||
|
|
||||||
if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
|
if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
|
@ -2409,7 +2409,7 @@ TargetShardIntervalForFastPathQuery(Query *query, bool *isMultiShardQuery,
|
||||||
|
|
||||||
if (inputDistributionKeyValue && !inputDistributionKeyValue->constisnull)
|
if (inputDistributionKeyValue && !inputDistributionKeyValue->constisnull)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cache = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cache = CitusTableCacheEntry(relationId);
|
||||||
ShardInterval *shardInterval =
|
ShardInterval *shardInterval =
|
||||||
FindShardInterval(inputDistributionKeyValue->constvalue, cache);
|
FindShardInterval(inputDistributionKeyValue->constvalue, cache);
|
||||||
if (shardInterval == NULL)
|
if (shardInterval == NULL)
|
||||||
|
@ -2495,7 +2495,7 @@ TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionConte
|
||||||
(RelationRestriction *) lfirst(restrictionCell);
|
(RelationRestriction *) lfirst(restrictionCell);
|
||||||
Oid relationId = relationRestriction->relationId;
|
Oid relationId = relationRestriction->relationId;
|
||||||
Index tableId = relationRestriction->index;
|
Index tableId = relationRestriction->index;
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
int shardCount = cacheEntry->shardIntervalArrayLength;
|
int shardCount = cacheEntry->shardIntervalArrayLength;
|
||||||
List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
|
List *baseRestrictionList = relationRestriction->relOptInfo->baserestrictinfo;
|
||||||
List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
|
List *restrictClauseList = get_all_actual_clauses(baseRestrictionList);
|
||||||
|
@ -2659,8 +2659,8 @@ WorkersContainingAllShards(List *prunedShardIntervalsList)
|
||||||
static List *
|
static List *
|
||||||
BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
||||||
{
|
{
|
||||||
Oid distributedTableId = ExtractFirstDistributedTableId(query);
|
Oid distributedTableId = ExtractFirstCitusTableId(query);
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(distributedTableId);
|
||||||
char partitionMethod = cacheEntry->partitionMethod;
|
char partitionMethod = cacheEntry->partitionMethod;
|
||||||
uint32 rangeTableId = 1;
|
uint32 rangeTableId = 1;
|
||||||
List *modifyRouteList = NIL;
|
List *modifyRouteList = NIL;
|
||||||
|
@ -2730,7 +2730,7 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
||||||
{
|
{
|
||||||
Datum partitionValue = partitionValueConst->constvalue;
|
Datum partitionValue = partitionValueConst->constvalue;
|
||||||
|
|
||||||
cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
cacheEntry = CitusTableCacheEntry(distributedTableId);
|
||||||
ShardInterval *shardInterval = FindShardInterval(partitionValue, cacheEntry);
|
ShardInterval *shardInterval = FindShardInterval(partitionValue, cacheEntry);
|
||||||
if (shardInterval != NULL)
|
if (shardInterval != NULL)
|
||||||
{
|
{
|
||||||
|
@ -3118,7 +3118,7 @@ ExtractInsertValuesList(Query *query, Var *partitionColumn)
|
||||||
Const *
|
Const *
|
||||||
ExtractInsertPartitionKeyValue(Query *query)
|
ExtractInsertPartitionKeyValue(Query *query)
|
||||||
{
|
{
|
||||||
Oid distributedTableId = ExtractFirstDistributedTableId(query);
|
Oid distributedTableId = ExtractFirstCitusTableId(query);
|
||||||
uint32 rangeTableId = 1;
|
uint32 rangeTableId = 1;
|
||||||
Const *singlePartitionValueConst = NULL;
|
Const *singlePartitionValueConst = NULL;
|
||||||
|
|
||||||
|
@ -3235,7 +3235,7 @@ MultiRouterPlannableQuery(Query *query)
|
||||||
/* only hash partitioned tables are supported */
|
/* only hash partitioned tables are supported */
|
||||||
Oid distributedTableId = rte->relid;
|
Oid distributedTableId = rte->relid;
|
||||||
|
|
||||||
if (!IsDistributedTable(distributedTableId))
|
if (!IsCitusTable(distributedTableId))
|
||||||
{
|
{
|
||||||
/* local tables cannot be read from workers */
|
/* local tables cannot be read from workers */
|
||||||
return DeferredError(
|
return DeferredError(
|
||||||
|
|
|
@ -1431,7 +1431,7 @@ HasRecurringTuples(Node *node, RecurringTuplesType *recurType)
|
||||||
if (rangeTableEntry->rtekind == RTE_RELATION)
|
if (rangeTableEntry->rtekind == RTE_RELATION)
|
||||||
{
|
{
|
||||||
Oid relationId = rangeTableEntry->relid;
|
Oid relationId = rangeTableEntry->relid;
|
||||||
if (IsDistributedTable(relationId) &&
|
if (IsCitusTable(relationId) &&
|
||||||
PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
|
PartitionMethod(relationId) == DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
*recurType = RECURRING_TUPLES_REFERENCE_TABLE;
|
*recurType = RECURRING_TUPLES_REFERENCE_TABLE;
|
||||||
|
|
|
@ -1069,7 +1069,7 @@ IsLocalTableRTE(Node *node)
|
||||||
}
|
}
|
||||||
|
|
||||||
Oid relationId = rangeTableEntry->relid;
|
Oid relationId = rangeTableEntry->relid;
|
||||||
if (IsDistributedTable(relationId))
|
if (IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1400,7 +1400,7 @@ AddRteRelationToAttributeEquivalenceClass(AttributeEquivalenceClass **
|
||||||
Oid relationId = rangeTableEntry->relid;
|
Oid relationId = rangeTableEntry->relid;
|
||||||
|
|
||||||
/* we don't consider local tables in the equality on columns */
|
/* we don't consider local tables in the equality on columns */
|
||||||
if (!IsDistributedTable(relationId))
|
if (!IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -1721,7 +1721,7 @@ DistributedRelationIdList(Query *query)
|
||||||
TableEntry *tableEntry = (TableEntry *) lfirst(tableEntryCell);
|
TableEntry *tableEntry = (TableEntry *) lfirst(tableEntryCell);
|
||||||
Oid relationId = tableEntry->relationId;
|
Oid relationId = tableEntry->relationId;
|
||||||
|
|
||||||
if (!IsDistributedTable(relationId))
|
if (!IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -294,7 +294,7 @@ List *
|
||||||
PruneShards(Oid relationId, Index rangeTableId, List *whereClauseList,
|
PruneShards(Oid relationId, Index rangeTableId, List *whereClauseList,
|
||||||
Const **partitionValueConst)
|
Const **partitionValueConst)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
int shardCount = cacheEntry->shardIntervalArrayLength;
|
int shardCount = cacheEntry->shardIntervalArrayLength;
|
||||||
char partitionMethod = cacheEntry->partitionMethod;
|
char partitionMethod = cacheEntry->partitionMethod;
|
||||||
ClauseWalkerContext context = { 0 };
|
ClauseWalkerContext context = { 0 };
|
||||||
|
|
|
@ -59,7 +59,7 @@ deparse_shard_query_test(PG_FUNCTION_ARGS)
|
||||||
StringInfo buffer = makeStringInfo();
|
StringInfo buffer = makeStringInfo();
|
||||||
|
|
||||||
/* reoreder the target list only for INSERT .. SELECT queries */
|
/* reoreder the target list only for INSERT .. SELECT queries */
|
||||||
if (InsertSelectIntoDistributedTable(query))
|
if (InsertSelectIntoCitusTable(query))
|
||||||
{
|
{
|
||||||
RangeTblEntry *insertRte = linitial(query->rtable);
|
RangeTblEntry *insertRte = linitial(query->rtable);
|
||||||
RangeTblEntry *subqueryRte = lsecond(query->rtable);
|
RangeTblEntry *subqueryRte = lsecond(query->rtable);
|
||||||
|
|
|
@ -64,7 +64,7 @@ partition_task_list_results(PG_FUNCTION_ARGS)
|
||||||
Job *job = distributedPlan->workerJob;
|
Job *job = distributedPlan->workerJob;
|
||||||
List *taskList = job->taskList;
|
List *taskList = job->taskList;
|
||||||
|
|
||||||
DistTableCacheEntry *targetRelation = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *targetRelation = CitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Here SELECT query's target list should match column list of target relation,
|
* Here SELECT query's target list should match column list of target relation,
|
||||||
|
@ -132,7 +132,7 @@ redistribute_task_list_results(PG_FUNCTION_ARGS)
|
||||||
Job *job = distributedPlan->workerJob;
|
Job *job = distributedPlan->workerJob;
|
||||||
List *taskList = job->taskList;
|
List *taskList = job->taskList;
|
||||||
|
|
||||||
DistTableCacheEntry *targetRelation = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *targetRelation = CitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Here SELECT query's target list should match column list of target relation,
|
* Here SELECT query's target list should match column list of target relation,
|
||||||
|
|
|
@ -198,9 +198,9 @@ Datum
|
||||||
is_distributed_table(PG_FUNCTION_ARGS)
|
is_distributed_table(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
Oid distributedTableId = PG_GETARG_OID(0);
|
Oid distributedTableId = PG_GETARG_OID(0);
|
||||||
bool isDistributedTable = IsDistributedTable(distributedTableId);
|
bool isCitusTable = IsCitusTable(distributedTableId);
|
||||||
|
|
||||||
PG_RETURN_BOOL(isDistributedTable);
|
PG_RETURN_BOOL(isCitusTable);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -38,7 +38,7 @@ get_referencing_relation_id_list(PG_FUNCTION_ARGS)
|
||||||
if (SRF_IS_FIRSTCALL())
|
if (SRF_IS_FIRSTCALL())
|
||||||
{
|
{
|
||||||
Oid relationId = PG_GETARG_OID(0);
|
Oid relationId = PG_GETARG_OID(0);
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
List *refList = cacheEntry->referencingRelationsViaForeignKey;
|
List *refList = cacheEntry->referencingRelationsViaForeignKey;
|
||||||
|
|
||||||
/* create a function context for cross-call persistence */
|
/* create a function context for cross-call persistence */
|
||||||
|
@ -89,7 +89,7 @@ get_referenced_relation_id_list(PG_FUNCTION_ARGS)
|
||||||
if (SRF_IS_FIRSTCALL())
|
if (SRF_IS_FIRSTCALL())
|
||||||
{
|
{
|
||||||
Oid relationId = PG_GETARG_OID(0);
|
Oid relationId = PG_GETARG_OID(0);
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
List *refList = cacheEntry->referencedRelationsViaForeignKey;
|
List *refList = cacheEntry->referencedRelationsViaForeignKey;
|
||||||
|
|
||||||
/* create a function context for cross-call persistence */
|
/* create a function context for cross-call persistence */
|
||||||
|
|
|
@ -241,7 +241,7 @@ SortedShardIntervalArray(Oid distributedTableId)
|
||||||
{
|
{
|
||||||
Oid shardIdTypeId = INT8OID;
|
Oid shardIdTypeId = INT8OID;
|
||||||
|
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(distributedTableId);
|
||||||
ShardInterval **shardIntervalArray = cacheEntry->sortedShardIntervalArray;
|
ShardInterval **shardIntervalArray = cacheEntry->sortedShardIntervalArray;
|
||||||
int shardIdCount = cacheEntry->shardIntervalArrayLength;
|
int shardIdCount = cacheEntry->shardIntervalArrayLength;
|
||||||
Datum *shardIdDatumArray = palloc0(shardIdCount * sizeof(Datum));
|
Datum *shardIdDatumArray = palloc0(shardIdCount * sizeof(Datum));
|
||||||
|
|
|
@ -685,12 +685,12 @@ CheckConflictingRelationAccesses(Oid relationId, ShardPlacementAccessType access
|
||||||
Oid conflictingReferencingRelationId = InvalidOid;
|
Oid conflictingReferencingRelationId = InvalidOid;
|
||||||
ShardPlacementAccessType conflictingAccessType = PLACEMENT_ACCESS_SELECT;
|
ShardPlacementAccessType conflictingAccessType = PLACEMENT_ACCESS_SELECT;
|
||||||
|
|
||||||
if (!EnforceForeignKeyRestrictions || !IsDistributedTable(relationId))
|
if (!EnforceForeignKeyRestrictions || !IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE &&
|
if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE &&
|
||||||
cacheEntry->referencingRelationsViaForeignKey != NIL))
|
cacheEntry->referencingRelationsViaForeignKey != NIL))
|
||||||
|
@ -806,12 +806,12 @@ CheckConflictingParallelRelationAccesses(Oid relationId, ShardPlacementAccessTyp
|
||||||
Oid conflictingReferencingRelationId = InvalidOid;
|
Oid conflictingReferencingRelationId = InvalidOid;
|
||||||
ShardPlacementAccessType conflictingAccessType = PLACEMENT_ACCESS_SELECT;
|
ShardPlacementAccessType conflictingAccessType = PLACEMENT_ACCESS_SELECT;
|
||||||
|
|
||||||
if (!EnforceForeignKeyRestrictions || !IsDistributedTable(relationId))
|
if (!EnforceForeignKeyRestrictions || !IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
|
if (!(cacheEntry->partitionMethod == DISTRIBUTE_BY_HASH &&
|
||||||
cacheEntry->referencedRelationsViaForeignKey != NIL))
|
cacheEntry->referencedRelationsViaForeignKey != NIL))
|
||||||
{
|
{
|
||||||
|
@ -882,7 +882,7 @@ HoldsConflictingLockWithReferencedRelations(Oid relationId, ShardPlacementAccess
|
||||||
ShardPlacementAccessType *
|
ShardPlacementAccessType *
|
||||||
conflictingAccessMode)
|
conflictingAccessMode)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
Oid referencedRelation = InvalidOid;
|
Oid referencedRelation = InvalidOid;
|
||||||
foreach_oid(referencedRelation, cacheEntry->referencedRelationsViaForeignKey)
|
foreach_oid(referencedRelation, cacheEntry->referencedRelationsViaForeignKey)
|
||||||
|
@ -947,7 +947,7 @@ HoldsConflictingLockWithReferencingRelations(Oid relationId, ShardPlacementAcces
|
||||||
ShardPlacementAccessType *
|
ShardPlacementAccessType *
|
||||||
conflictingAccessMode)
|
conflictingAccessMode)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
bool holdsConflictingLocks = false;
|
bool holdsConflictingLocks = false;
|
||||||
|
|
||||||
Assert(PartitionMethod(relationId) == DISTRIBUTE_BY_NONE);
|
Assert(PartitionMethod(relationId) == DISTRIBUTE_BY_NONE);
|
||||||
|
@ -959,7 +959,7 @@ HoldsConflictingLockWithReferencingRelations(Oid relationId, ShardPlacementAcces
|
||||||
* We're only interested in foreign keys to reference tables from
|
* We're only interested in foreign keys to reference tables from
|
||||||
* hash distributed tables.
|
* hash distributed tables.
|
||||||
*/
|
*/
|
||||||
if (!IsDistributedTable(referencingRelation) ||
|
if (!IsCitusTable(referencingRelation) ||
|
||||||
PartitionMethod(referencingRelation) != DISTRIBUTE_BY_HASH)
|
PartitionMethod(referencingRelation) != DISTRIBUTE_BY_HASH)
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -546,10 +546,10 @@ GetNextColocationId()
|
||||||
void
|
void
|
||||||
CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId)
|
CheckReplicationModel(Oid sourceRelationId, Oid targetRelationId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *sourceTableEntry = DistributedTableCacheEntry(sourceRelationId);
|
DistTableCacheEntry *sourceTableEntry = CitusTableCacheEntry(sourceRelationId);
|
||||||
char sourceReplicationModel = sourceTableEntry->replicationModel;
|
char sourceReplicationModel = sourceTableEntry->replicationModel;
|
||||||
|
|
||||||
DistTableCacheEntry *targetTableEntry = DistributedTableCacheEntry(targetRelationId);
|
DistTableCacheEntry *targetTableEntry = CitusTableCacheEntry(targetRelationId);
|
||||||
char targetReplicationModel = targetTableEntry->replicationModel;
|
char targetReplicationModel = targetTableEntry->replicationModel;
|
||||||
|
|
||||||
if (sourceReplicationModel != targetReplicationModel)
|
if (sourceReplicationModel != targetReplicationModel)
|
||||||
|
@ -690,7 +690,7 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId)
|
||||||
uint32
|
uint32
|
||||||
TableColocationId(Oid distributedTableId)
|
TableColocationId(Oid distributedTableId)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(distributedTableId);
|
||||||
|
|
||||||
return cacheEntry->colocationId;
|
return cacheEntry->colocationId;
|
||||||
}
|
}
|
||||||
|
@ -836,7 +836,7 @@ ColocatedShardIntervalList(ShardInterval *shardInterval)
|
||||||
Oid distributedTableId = shardInterval->relationId;
|
Oid distributedTableId = shardInterval->relationId;
|
||||||
List *colocatedShardList = NIL;
|
List *colocatedShardList = NIL;
|
||||||
|
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(distributedTableId);
|
||||||
char partitionMethod = cacheEntry->partitionMethod;
|
char partitionMethod = cacheEntry->partitionMethod;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -864,7 +864,7 @@ ColocatedShardIntervalList(ShardInterval *shardInterval)
|
||||||
foreach_oid(colocatedTableId, colocatedTableList)
|
foreach_oid(colocatedTableId, colocatedTableList)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *colocatedTableCacheEntry =
|
DistTableCacheEntry *colocatedTableCacheEntry =
|
||||||
DistributedTableCacheEntry(colocatedTableId);
|
CitusTableCacheEntry(colocatedTableId);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Since we iterate over co-located tables, shard count of each table should be
|
* Since we iterate over co-located tables, shard count of each table should be
|
||||||
|
@ -966,7 +966,7 @@ ColocatedTableId(Oid colocationId)
|
||||||
uint64
|
uint64
|
||||||
ColocatedShardIdInRelation(Oid relationId, int shardIndex)
|
ColocatedShardIdInRelation(Oid relationId, int shardIndex)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *tableCacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *tableCacheEntry = CitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
return tableCacheEntry->sortedShardIntervalArray[shardIndex]->shardId;
|
return tableCacheEntry->sortedShardIntervalArray[shardIndex]->shardId;
|
||||||
}
|
}
|
||||||
|
|
|
@ -60,7 +60,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
|
||||||
EnsureCoordinator();
|
EnsureCoordinator();
|
||||||
EnsureTableOwner(relationId);
|
EnsureTableOwner(relationId);
|
||||||
|
|
||||||
if (!IsDistributedTable(relationId))
|
if (!IsCitusTable(relationId))
|
||||||
{
|
{
|
||||||
char *relationName = get_rel_name(relationId);
|
char *relationName = get_rel_name(relationId);
|
||||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||||
|
@ -70,7 +70,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
|
||||||
"create_reference_table('%s');", relationName)));
|
"create_reference_table('%s');", relationName)));
|
||||||
}
|
}
|
||||||
|
|
||||||
DistTableCacheEntry *tableEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *tableEntry = CitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
if (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
if (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
|
@ -463,7 +463,7 @@ ReferenceTableOidList()
|
||||||
Oid relationId = InvalidOid;
|
Oid relationId = InvalidOid;
|
||||||
foreach_oid(relationId, distTableOidList)
|
foreach_oid(relationId, distTableOidList)
|
||||||
{
|
{
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
|
||||||
{
|
{
|
||||||
|
|
|
@ -360,13 +360,12 @@ static void
|
||||||
SetLocktagForShardDistributionMetadata(int64 shardId, LOCKTAG *tag)
|
SetLocktagForShardDistributionMetadata(int64 shardId, LOCKTAG *tag)
|
||||||
{
|
{
|
||||||
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
ShardInterval *shardInterval = LoadShardInterval(shardId);
|
||||||
Oid distributedTableId = shardInterval->relationId;
|
Oid citusTableId = shardInterval->relationId;
|
||||||
DistTableCacheEntry *distributedTable = DistributedTableCacheEntry(
|
DistTableCacheEntry *citusTable = CitusTableCacheEntry(citusTableId);
|
||||||
distributedTableId);
|
uint32 colocationId = citusTable->colocationId;
|
||||||
uint32 colocationId = distributedTable->colocationId;
|
|
||||||
|
|
||||||
if (colocationId == INVALID_COLOCATION_ID ||
|
if (colocationId == INVALID_COLOCATION_ID ||
|
||||||
distributedTable->partitionMethod != DISTRIBUTE_BY_HASH)
|
citusTable->partitionMethod != DISTRIBUTE_BY_HASH)
|
||||||
{
|
{
|
||||||
SET_LOCKTAG_SHARD_METADATA_RESOURCE(*tag, MyDatabaseId, shardId);
|
SET_LOCKTAG_SHARD_METADATA_RESOURCE(*tag, MyDatabaseId, shardId);
|
||||||
}
|
}
|
||||||
|
@ -391,7 +390,7 @@ LockReferencedReferenceShardDistributionMetadata(uint64 shardId, LOCKMODE lockMo
|
||||||
{
|
{
|
||||||
Oid relationId = RelationIdForShard(shardId);
|
Oid relationId = RelationIdForShard(shardId);
|
||||||
|
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
List *referencedRelationList = cacheEntry->referencedRelationsViaForeignKey;
|
List *referencedRelationList = cacheEntry->referencedRelationsViaForeignKey;
|
||||||
List *shardIntervalList = GetSortedReferenceShardIntervals(referencedRelationList);
|
List *shardIntervalList = GetSortedReferenceShardIntervals(referencedRelationList);
|
||||||
|
|
||||||
|
@ -421,7 +420,7 @@ LockReferencedReferenceShardResources(uint64 shardId, LOCKMODE lockMode)
|
||||||
{
|
{
|
||||||
Oid relationId = RelationIdForShard(shardId);
|
Oid relationId = RelationIdForShard(shardId);
|
||||||
|
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Note that referencedRelationsViaForeignKey contains transitively referenced
|
* Note that referencedRelationsViaForeignKey contains transitively referenced
|
||||||
|
|
|
@ -239,7 +239,7 @@ ShardIndex(ShardInterval *shardInterval)
|
||||||
Oid distributedTableId = shardInterval->relationId;
|
Oid distributedTableId = shardInterval->relationId;
|
||||||
Datum shardMinValue = shardInterval->minValue;
|
Datum shardMinValue = shardInterval->minValue;
|
||||||
|
|
||||||
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
|
DistTableCacheEntry *cacheEntry = CitusTableCacheEntry(distributedTableId);
|
||||||
char partitionMethod = cacheEntry->partitionMethod;
|
char partitionMethod = cacheEntry->partitionMethod;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
#include "nodes/plannodes.h"
|
#include "nodes/plannodes.h"
|
||||||
|
|
||||||
|
|
||||||
extern bool InsertSelectIntoDistributedTable(Query *query);
|
extern bool InsertSelectIntoCitusTable(Query *query);
|
||||||
extern bool CheckInsertSelectQuery(Query *query);
|
extern bool CheckInsertSelectQuery(Query *query);
|
||||||
extern bool InsertSelectIntoLocalTable(Query *query);
|
extern bool InsertSelectIntoLocalTable(Query *query);
|
||||||
extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,
|
extern Query * ReorderInsertSelectTargetLists(Query *originalQuery,
|
||||||
|
|
|
@ -52,7 +52,7 @@ typedef struct
|
||||||
*/
|
*/
|
||||||
bool isValid;
|
bool isValid;
|
||||||
|
|
||||||
bool isDistributedTable;
|
bool isCitusTable;
|
||||||
bool hasUninitializedShardInterval;
|
bool hasUninitializedShardInterval;
|
||||||
bool hasUniformHashDistribution; /* valid for hash partitioned tables */
|
bool hasUniformHashDistribution; /* valid for hash partitioned tables */
|
||||||
bool hasOverlappingShardInterval;
|
bool hasOverlappingShardInterval;
|
||||||
|
@ -117,15 +117,15 @@ typedef struct DistObjectCacheEntry
|
||||||
} DistObjectCacheEntry;
|
} DistObjectCacheEntry;
|
||||||
|
|
||||||
|
|
||||||
extern bool IsDistributedTable(Oid relationId);
|
extern bool IsCitusTable(Oid relationId);
|
||||||
extern List * DistributedTableList(void);
|
extern List * CitusTableList(void);
|
||||||
extern ShardInterval * LoadShardInterval(uint64 shardId);
|
extern ShardInterval * LoadShardInterval(uint64 shardId);
|
||||||
extern Oid RelationIdForShard(uint64 shardId);
|
extern Oid RelationIdForShard(uint64 shardId);
|
||||||
extern bool ReferenceTableShardId(uint64 shardId);
|
extern bool ReferenceTableShardId(uint64 shardId);
|
||||||
extern ShardPlacement * FindShardPlacementOnGroup(int32 groupId, uint64 shardId);
|
extern ShardPlacement * FindShardPlacementOnGroup(int32 groupId, uint64 shardId);
|
||||||
extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId);
|
extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId);
|
||||||
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
|
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
|
||||||
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
|
extern DistTableCacheEntry * CitusTableCacheEntry(Oid distributedRelationId);
|
||||||
extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, int32
|
extern DistObjectCacheEntry * LookupDistObjectCacheEntry(Oid classid, Oid objid, int32
|
||||||
objsubid);
|
objsubid);
|
||||||
extern int32 GetLocalGroupId(void);
|
extern int32 GetLocalGroupId(void);
|
||||||
|
|
|
@ -65,7 +65,7 @@ extern List * ShardIntervalOpExpressions(ShardInterval *shardInterval, Index rte
|
||||||
extern RelationRestrictionContext * CopyRelationRestrictionContext(
|
extern RelationRestrictionContext * CopyRelationRestrictionContext(
|
||||||
RelationRestrictionContext *oldContext);
|
RelationRestrictionContext *oldContext);
|
||||||
|
|
||||||
extern Oid ExtractFirstDistributedTableId(Query *query);
|
extern Oid ExtractFirstCitusTableId(Query *query);
|
||||||
extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
|
extern RangeTblEntry * ExtractSelectRangeTableEntry(Query *query);
|
||||||
extern Oid ModifyQueryResultRelationId(Query *query);
|
extern Oid ModifyQueryResultRelationId(Query *query);
|
||||||
extern RangeTblEntry * ExtractResultRelationRTE(Query *query);
|
extern RangeTblEntry * ExtractResultRelationRTE(Query *query);
|
||||||
|
|
Loading…
Reference in New Issue