mirror of https://github.com/citusdata/citus.git
Merge remote-tracking branch 'upstream/main' into issue/6694
commit
9f2ff9ed1f
|
@ -221,7 +221,8 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
|
||||||
if (!referencedIsCitus && !selfReferencingTable)
|
if (!referencedIsCitus && !selfReferencingTable)
|
||||||
{
|
{
|
||||||
if (IsCitusLocalTableByDistParams(referencingDistMethod,
|
if (IsCitusLocalTableByDistParams(referencingDistMethod,
|
||||||
referencingReplicationModel))
|
referencingReplicationModel,
|
||||||
|
referencingColocationId))
|
||||||
{
|
{
|
||||||
ErrorOutForFKeyBetweenPostgresAndCitusLocalTable(referencedTableId);
|
ErrorOutForFKeyBetweenPostgresAndCitusLocalTable(referencedTableId);
|
||||||
}
|
}
|
||||||
|
@ -245,8 +246,7 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
|
||||||
if (!selfReferencingTable)
|
if (!selfReferencingTable)
|
||||||
{
|
{
|
||||||
referencedDistMethod = PartitionMethod(referencedTableId);
|
referencedDistMethod = PartitionMethod(referencedTableId);
|
||||||
referencedDistKey = IsCitusTableType(referencedTableId,
|
referencedDistKey = !HasDistributionKey(referencedTableId) ?
|
||||||
CITUS_TABLE_WITH_NO_DIST_KEY) ?
|
|
||||||
NULL :
|
NULL :
|
||||||
DistPartitionKey(referencedTableId);
|
DistPartitionKey(referencedTableId);
|
||||||
referencedColocationId = TableColocationId(referencedTableId);
|
referencedColocationId = TableColocationId(referencedTableId);
|
||||||
|
@ -278,9 +278,17 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
|
||||||
}
|
}
|
||||||
|
|
||||||
bool referencingIsCitusLocalOrRefTable =
|
bool referencingIsCitusLocalOrRefTable =
|
||||||
(referencingDistMethod == DISTRIBUTE_BY_NONE);
|
IsCitusLocalTableByDistParams(referencingDistMethod,
|
||||||
|
referencingReplicationModel,
|
||||||
|
referencingColocationId) ||
|
||||||
|
IsReferenceTableByDistParams(referencingDistMethod,
|
||||||
|
referencingReplicationModel);
|
||||||
bool referencedIsCitusLocalOrRefTable =
|
bool referencedIsCitusLocalOrRefTable =
|
||||||
(referencedDistMethod == DISTRIBUTE_BY_NONE);
|
IsCitusLocalTableByDistParams(referencedDistMethod,
|
||||||
|
referencedReplicationModel,
|
||||||
|
referencedColocationId) ||
|
||||||
|
IsReferenceTableByDistParams(referencedDistMethod,
|
||||||
|
referencedReplicationModel);
|
||||||
if (referencingIsCitusLocalOrRefTable && referencedIsCitusLocalOrRefTable)
|
if (referencingIsCitusLocalOrRefTable && referencedIsCitusLocalOrRefTable)
|
||||||
{
|
{
|
||||||
EnsureSupportedFKeyBetweenCitusLocalAndRefTable(constraintForm,
|
EnsureSupportedFKeyBetweenCitusLocalAndRefTable(constraintForm,
|
||||||
|
@ -313,7 +321,8 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
|
||||||
* reference table is referenced.
|
* reference table is referenced.
|
||||||
*/
|
*/
|
||||||
bool referencedIsReferenceTable =
|
bool referencedIsReferenceTable =
|
||||||
(referencedReplicationModel == REPLICATION_MODEL_2PC);
|
IsReferenceTableByDistParams(referencedDistMethod,
|
||||||
|
referencedReplicationModel);
|
||||||
if (!referencedIsReferenceTable && (
|
if (!referencedIsReferenceTable && (
|
||||||
referencingColocationId == INVALID_COLOCATION_ID ||
|
referencingColocationId == INVALID_COLOCATION_ID ||
|
||||||
referencingColocationId != referencedColocationId))
|
referencingColocationId != referencedColocationId))
|
||||||
|
|
|
@ -1190,7 +1190,7 @@ ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement)
|
||||||
* Non-distributed tables do not have partition key, and unique constraints
|
* Non-distributed tables do not have partition key, and unique constraints
|
||||||
* are allowed for them. Thus, we added a short-circuit for non-distributed tables.
|
* are allowed for them. Thus, we added a short-circuit for non-distributed tables.
|
||||||
*/
|
*/
|
||||||
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
if (!HasDistributionKey(relationId))
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -393,7 +393,7 @@ CitusCopyFrom(CopyStmt *copyStatement, QueryCompletion *completionTag)
|
||||||
if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) ||
|
if (IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) ||
|
||||||
IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED) ||
|
IsCitusTableTypeCacheEntry(cacheEntry, RANGE_DISTRIBUTED) ||
|
||||||
IsCitusTableTypeCacheEntry(cacheEntry, APPEND_DISTRIBUTED) ||
|
IsCitusTableTypeCacheEntry(cacheEntry, APPEND_DISTRIBUTED) ||
|
||||||
IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
!HasDistributionKeyCacheEntry(cacheEntry))
|
||||||
{
|
{
|
||||||
CopyToExistingShards(copyStatement, completionTag);
|
CopyToExistingShards(copyStatement, completionTag);
|
||||||
}
|
}
|
||||||
|
|
|
@ -75,7 +75,7 @@ static void DistributePartitionUsingParent(Oid parentRelationId,
|
||||||
static void ErrorIfMultiLevelPartitioning(Oid parentRelationId, Oid partitionRelationId);
|
static void ErrorIfMultiLevelPartitioning(Oid parentRelationId, Oid partitionRelationId);
|
||||||
static void ErrorIfAttachCitusTableToPgLocalTable(Oid parentRelationId,
|
static void ErrorIfAttachCitusTableToPgLocalTable(Oid parentRelationId,
|
||||||
Oid partitionRelationId);
|
Oid partitionRelationId);
|
||||||
static bool AlterTableDefinesFKeyBetweenPostgresAndNonDistTable(
|
static bool ATDefinesFKeyBetweenPostgresAndCitusLocalOrRef(
|
||||||
AlterTableStmt *alterTableStatement);
|
AlterTableStmt *alterTableStatement);
|
||||||
static bool ShouldMarkConnectedRelationsNotAutoConverted(Oid leftRelationId,
|
static bool ShouldMarkConnectedRelationsNotAutoConverted(Oid leftRelationId,
|
||||||
Oid rightRelationId);
|
Oid rightRelationId);
|
||||||
|
@ -1119,7 +1119,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
|
||||||
|
|
||||||
if (ShouldEnableLocalReferenceForeignKeys() &&
|
if (ShouldEnableLocalReferenceForeignKeys() &&
|
||||||
processUtilityContext != PROCESS_UTILITY_SUBCOMMAND &&
|
processUtilityContext != PROCESS_UTILITY_SUBCOMMAND &&
|
||||||
AlterTableDefinesFKeyBetweenPostgresAndNonDistTable(alterTableStatement))
|
ATDefinesFKeyBetweenPostgresAndCitusLocalOrRef(alterTableStatement))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* We don't process subcommands generated by postgres.
|
* We don't process subcommands generated by postgres.
|
||||||
|
@ -1561,12 +1561,12 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* AlterTableDefinesFKeyBetweenPostgresAndNonDistTable returns true if given
|
* ATDefinesFKeyBetweenPostgresAndCitusLocalOrRef returns true if given
|
||||||
* alter table command defines foreign key between a postgres table and a
|
* alter table command defines foreign key between a postgres table and a
|
||||||
* reference or citus local table.
|
* reference or citus local table.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
AlterTableDefinesFKeyBetweenPostgresAndNonDistTable(AlterTableStmt *alterTableStatement)
|
ATDefinesFKeyBetweenPostgresAndCitusLocalOrRef(AlterTableStmt *alterTableStatement)
|
||||||
{
|
{
|
||||||
List *foreignKeyConstraintList =
|
List *foreignKeyConstraintList =
|
||||||
GetAlterTableAddFKeyConstraintList(alterTableStatement);
|
GetAlterTableAddFKeyConstraintList(alterTableStatement);
|
||||||
|
@ -1584,9 +1584,12 @@ AlterTableDefinesFKeyBetweenPostgresAndNonDistTable(AlterTableStmt *alterTableSt
|
||||||
if (!IsCitusTable(leftRelationId))
|
if (!IsCitusTable(leftRelationId))
|
||||||
{
|
{
|
||||||
return RelationIdListContainsCitusTableType(rightRelationIdList,
|
return RelationIdListContainsCitusTableType(rightRelationIdList,
|
||||||
CITUS_TABLE_WITH_NO_DIST_KEY);
|
CITUS_LOCAL_TABLE) ||
|
||||||
|
RelationIdListContainsCitusTableType(rightRelationIdList,
|
||||||
|
REFERENCE_TABLE);
|
||||||
}
|
}
|
||||||
else if (IsCitusTableType(leftRelationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
else if (IsCitusTableType(leftRelationId, CITUS_LOCAL_TABLE) ||
|
||||||
|
IsCitusTableType(leftRelationId, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
return RelationIdListContainsPostgresTable(rightRelationIdList);
|
return RelationIdListContainsPostgresTable(rightRelationIdList);
|
||||||
}
|
}
|
||||||
|
@ -3626,7 +3629,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command)
|
||||||
* sequential mode.
|
* sequential mode.
|
||||||
*/
|
*/
|
||||||
if (executeSequentially &&
|
if (executeSequentially &&
|
||||||
!IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY) &&
|
HasDistributionKey(relationId) &&
|
||||||
ParallelQueryExecutedInTransaction())
|
ParallelQueryExecutedInTransaction())
|
||||||
{
|
{
|
||||||
char *relationName = get_rel_name(relationId);
|
char *relationName = get_rel_name(relationId);
|
||||||
|
|
|
@ -324,7 +324,7 @@ ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command)
|
||||||
{
|
{
|
||||||
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, failOK);
|
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, failOK);
|
||||||
|
|
||||||
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY) &&
|
if (IsCitusTable(relationId) && !HasDistributionKey(relationId) &&
|
||||||
TableReferenced(relationId))
|
TableReferenced(relationId))
|
||||||
{
|
{
|
||||||
char *relationName = get_rel_name(relationId);
|
char *relationName = get_rel_name(relationId);
|
||||||
|
|
|
@ -311,7 +311,7 @@ static void InvalidateDistTableCache(void);
|
||||||
static void InvalidateDistObjectCache(void);
|
static void InvalidateDistObjectCache(void);
|
||||||
static bool InitializeTableCacheEntry(int64 shardId, bool missingOk);
|
static bool InitializeTableCacheEntry(int64 shardId, bool missingOk);
|
||||||
static bool IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
|
static bool IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
|
||||||
CitusTableType tableType);
|
uint32 colocationId, CitusTableType tableType);
|
||||||
static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry, bool
|
static bool RefreshTableCacheEntryIfInvalid(ShardIdCacheEntry *shardEntry, bool
|
||||||
missingOk);
|
missingOk);
|
||||||
|
|
||||||
|
@ -450,7 +450,36 @@ bool
|
||||||
IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tableType)
|
IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tableType)
|
||||||
{
|
{
|
||||||
return IsCitusTableTypeInternal(tableEntry->partitionMethod,
|
return IsCitusTableTypeInternal(tableEntry->partitionMethod,
|
||||||
tableEntry->replicationModel, tableType);
|
tableEntry->replicationModel,
|
||||||
|
tableEntry->colocationId, tableType);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* HasDistributionKey returs true if given Citus table doesn't have a
|
||||||
|
* distribution key.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
HasDistributionKey(Oid relationId)
|
||||||
|
{
|
||||||
|
CitusTableCacheEntry *tableEntry = LookupCitusTableCacheEntry(relationId);
|
||||||
|
if (tableEntry == NULL)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("relation with oid %u is not a Citus table", relationId)));
|
||||||
|
}
|
||||||
|
|
||||||
|
return HasDistributionKeyCacheEntry(tableEntry);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* HasDistributionKey returs true if given cache entry identifies a Citus
|
||||||
|
* table that doesn't have a distribution key.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
HasDistributionKeyCacheEntry(CitusTableCacheEntry *tableEntry)
|
||||||
|
{
|
||||||
|
return tableEntry->partitionMethod != DISTRIBUTE_BY_NONE;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -460,7 +489,7 @@ IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEntry, CitusTableType tabl
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
|
IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
|
||||||
CitusTableType tableType)
|
uint32 colocationId, CitusTableType tableType)
|
||||||
{
|
{
|
||||||
switch (tableType)
|
switch (tableType)
|
||||||
{
|
{
|
||||||
|
@ -501,12 +530,8 @@ IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
|
||||||
case CITUS_LOCAL_TABLE:
|
case CITUS_LOCAL_TABLE:
|
||||||
{
|
{
|
||||||
return partitionMethod == DISTRIBUTE_BY_NONE &&
|
return partitionMethod == DISTRIBUTE_BY_NONE &&
|
||||||
replicationModel != REPLICATION_MODEL_2PC;
|
replicationModel != REPLICATION_MODEL_2PC &&
|
||||||
}
|
colocationId == INVALID_COLOCATION_ID;
|
||||||
|
|
||||||
case CITUS_TABLE_WITH_NO_DIST_KEY:
|
|
||||||
{
|
|
||||||
return partitionMethod == DISTRIBUTE_BY_NONE;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
case ANY_CITUS_TABLE_TYPE:
|
case ANY_CITUS_TABLE_TYPE:
|
||||||
|
@ -529,33 +554,21 @@ IsCitusTableTypeInternal(char partitionMethod, char replicationModel,
|
||||||
char *
|
char *
|
||||||
GetTableTypeName(Oid tableId)
|
GetTableTypeName(Oid tableId)
|
||||||
{
|
{
|
||||||
bool regularTable = false;
|
if (!IsCitusTable(tableId))
|
||||||
char partitionMethod = ' ';
|
|
||||||
char replicationModel = ' ';
|
|
||||||
if (IsCitusTable(tableId))
|
|
||||||
{
|
|
||||||
CitusTableCacheEntry *referencingCacheEntry = GetCitusTableCacheEntry(tableId);
|
|
||||||
partitionMethod = referencingCacheEntry->partitionMethod;
|
|
||||||
replicationModel = referencingCacheEntry->replicationModel;
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
regularTable = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (regularTable)
|
|
||||||
{
|
{
|
||||||
return "regular table";
|
return "regular table";
|
||||||
}
|
}
|
||||||
else if (partitionMethod == 'h')
|
|
||||||
|
CitusTableCacheEntry *tableCacheEntry = GetCitusTableCacheEntry(tableId);
|
||||||
|
if (IsCitusTableTypeCacheEntry(tableCacheEntry, HASH_DISTRIBUTED))
|
||||||
{
|
{
|
||||||
return "distributed table";
|
return "distributed table";
|
||||||
}
|
}
|
||||||
else if (partitionMethod == 'n' && replicationModel == 't')
|
else if (IsCitusTableTypeCacheEntry(tableCacheEntry, REFERENCE_TABLE))
|
||||||
{
|
{
|
||||||
return "reference table";
|
return "reference table";
|
||||||
}
|
}
|
||||||
else if (partitionMethod == 'n' && replicationModel != 't')
|
else if (IsCitusTableTypeCacheEntry(tableCacheEntry, CITUS_LOCAL_TABLE))
|
||||||
{
|
{
|
||||||
return "citus local table";
|
return "citus local table";
|
||||||
}
|
}
|
||||||
|
@ -765,14 +778,28 @@ PgDistPartitionTupleViaCatalog(Oid relationId)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* IsCitusLocalTableByDistParams returns true if given partitionMethod and
|
* IsReferenceTableByDistParams returns true if given partitionMethod and
|
||||||
* replicationModel would identify a citus local table.
|
* replicationModel would identify a reference table.
|
||||||
*/
|
*/
|
||||||
bool
|
bool
|
||||||
IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel)
|
IsReferenceTableByDistParams(char partitionMethod, char replicationModel)
|
||||||
{
|
{
|
||||||
return partitionMethod == DISTRIBUTE_BY_NONE &&
|
return partitionMethod == DISTRIBUTE_BY_NONE &&
|
||||||
replicationModel != REPLICATION_MODEL_2PC;
|
replicationModel == REPLICATION_MODEL_2PC;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsCitusLocalTableByDistParams returns true if given partitionMethod,
|
||||||
|
* replicationModel and colocationId would identify a citus local table.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel,
|
||||||
|
uint32 colocationId)
|
||||||
|
{
|
||||||
|
return partitionMethod == DISTRIBUTE_BY_NONE &&
|
||||||
|
replicationModel != REPLICATION_MODEL_2PC &&
|
||||||
|
colocationId == INVALID_COLOCATION_ID;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -4837,11 +4864,14 @@ CitusTableTypeIdList(CitusTableType citusTableType)
|
||||||
|
|
||||||
Datum partMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1];
|
Datum partMethodDatum = datumArray[Anum_pg_dist_partition_partmethod - 1];
|
||||||
Datum replicationModelDatum = datumArray[Anum_pg_dist_partition_repmodel - 1];
|
Datum replicationModelDatum = datumArray[Anum_pg_dist_partition_repmodel - 1];
|
||||||
|
Datum colocationIdDatum = datumArray[Anum_pg_dist_partition_colocationid - 1];
|
||||||
|
|
||||||
Oid partitionMethod = DatumGetChar(partMethodDatum);
|
Oid partitionMethod = DatumGetChar(partMethodDatum);
|
||||||
Oid replicationModel = DatumGetChar(replicationModelDatum);
|
Oid replicationModel = DatumGetChar(replicationModelDatum);
|
||||||
|
uint32 colocationId = DatumGetUInt32(colocationIdDatum);
|
||||||
|
|
||||||
if (IsCitusTableTypeInternal(partitionMethod, replicationModel, citusTableType))
|
if (IsCitusTableTypeInternal(partitionMethod, replicationModel, colocationId,
|
||||||
|
citusTableType))
|
||||||
{
|
{
|
||||||
Datum relationIdDatum = datumArray[Anum_pg_dist_partition_logicalrelid - 1];
|
Datum relationIdDatum = datumArray[Anum_pg_dist_partition_logicalrelid - 1];
|
||||||
|
|
||||||
|
|
|
@ -535,7 +535,7 @@ ShouldSyncTableMetadata(Oid relationId)
|
||||||
|
|
||||||
bool hashDistributed = IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED);
|
bool hashDistributed = IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED);
|
||||||
bool citusTableWithNoDistKey =
|
bool citusTableWithNoDistKey =
|
||||||
IsCitusTableTypeCacheEntry(tableEntry, CITUS_TABLE_WITH_NO_DIST_KEY);
|
!HasDistributionKeyCacheEntry(tableEntry);
|
||||||
|
|
||||||
return ShouldSyncTableMetadataInternal(hashDistributed, citusTableWithNoDistKey);
|
return ShouldSyncTableMetadataInternal(hashDistributed, citusTableWithNoDistKey);
|
||||||
}
|
}
|
||||||
|
@ -1158,7 +1158,7 @@ DistributionCreateCommand(CitusTableCacheEntry *cacheEntry)
|
||||||
char replicationModel = cacheEntry->replicationModel;
|
char replicationModel = cacheEntry->replicationModel;
|
||||||
StringInfo tablePartitionKeyNameString = makeStringInfo();
|
StringInfo tablePartitionKeyNameString = makeStringInfo();
|
||||||
|
|
||||||
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
if (!HasDistributionKeyCacheEntry(cacheEntry))
|
||||||
{
|
{
|
||||||
appendStringInfo(tablePartitionKeyNameString, "NULL");
|
appendStringInfo(tablePartitionKeyNameString, "NULL");
|
||||||
}
|
}
|
||||||
|
|
|
@ -1536,7 +1536,7 @@ get_shard_id_for_distribution_column(PG_FUNCTION_ARGS)
|
||||||
errmsg("relation is not distributed")));
|
errmsg("relation is not distributed")));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
if (!HasDistributionKey(relationId))
|
||||||
{
|
{
|
||||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||||
if (shardIntervalList == NIL)
|
if (shardIntervalList == NIL)
|
||||||
|
|
|
@ -1383,7 +1383,7 @@ DistPartitionKey(Oid relationId)
|
||||||
CitusTableCacheEntry *partitionEntry = GetCitusTableCacheEntry(relationId);
|
CitusTableCacheEntry *partitionEntry = GetCitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
/* non-distributed tables do not have partition column */
|
/* non-distributed tables do not have partition column */
|
||||||
if (IsCitusTableTypeCacheEntry(partitionEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
if (!HasDistributionKeyCacheEntry(partitionEntry))
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
|
@ -228,7 +228,7 @@ TargetListOnPartitionColumn(Query *query, List *targetEntryList)
|
||||||
* If the expression belongs to a non-distributed table continue searching for
|
* If the expression belongs to a non-distributed table continue searching for
|
||||||
* other partition keys.
|
* other partition keys.
|
||||||
*/
|
*/
|
||||||
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
if (IsCitusTable(relationId) && !HasDistributionKey(relationId))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2199,7 +2199,7 @@ QueryPushdownSqlTaskList(Query *query, uint64 jobId,
|
||||||
Oid relationId = relationRestriction->relationId;
|
Oid relationId = relationRestriction->relationId;
|
||||||
|
|
||||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
||||||
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
if (!HasDistributionKeyCacheEntry(cacheEntry))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -2377,7 +2377,7 @@ ErrorIfUnsupportedShardDistribution(Query *query)
|
||||||
nonReferenceRelations = lappend_oid(nonReferenceRelations,
|
nonReferenceRelations = lappend_oid(nonReferenceRelations,
|
||||||
relationId);
|
relationId);
|
||||||
}
|
}
|
||||||
else if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
else if (IsCitusTable(relationId) && !HasDistributionKey(relationId))
|
||||||
{
|
{
|
||||||
/* do not need to handle non-distributed tables */
|
/* do not need to handle non-distributed tables */
|
||||||
continue;
|
continue;
|
||||||
|
@ -2482,7 +2482,7 @@ QueryPushdownTaskCreate(Query *originalQuery, int shardIndex,
|
||||||
ShardInterval *shardInterval = NULL;
|
ShardInterval *shardInterval = NULL;
|
||||||
|
|
||||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
||||||
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
if (!HasDistributionKeyCacheEntry(cacheEntry))
|
||||||
{
|
{
|
||||||
/* non-distributed tables have only one shard */
|
/* non-distributed tables have only one shard */
|
||||||
shardInterval = cacheEntry->sortedShardIntervalArray[0];
|
shardInterval = cacheEntry->sortedShardIntervalArray[0];
|
||||||
|
@ -3697,7 +3697,7 @@ PartitionedOnColumn(Var *column, List *rangeTableList, List *dependentJobList)
|
||||||
Var *partitionColumn = PartitionColumn(relationId, rangeTableId);
|
Var *partitionColumn = PartitionColumn(relationId, rangeTableId);
|
||||||
|
|
||||||
/* non-distributed tables do not have partition columns */
|
/* non-distributed tables do not have partition columns */
|
||||||
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
if (IsCitusTable(relationId) && !HasDistributionKey(relationId))
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -2675,7 +2675,7 @@ TargetShardIntervalForFastPathQuery(Query *query, bool *isMultiShardQuery,
|
||||||
{
|
{
|
||||||
Oid relationId = ExtractFirstCitusTableId(query);
|
Oid relationId = ExtractFirstCitusTableId(query);
|
||||||
|
|
||||||
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
if (!HasDistributionKey(relationId))
|
||||||
{
|
{
|
||||||
/* we don't need to do shard pruning for non-distributed tables */
|
/* we don't need to do shard pruning for non-distributed tables */
|
||||||
return list_make1(LoadShardIntervalList(relationId));
|
return list_make1(LoadShardIntervalList(relationId));
|
||||||
|
@ -2968,7 +2968,7 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
|
||||||
Assert(query->commandType == CMD_INSERT);
|
Assert(query->commandType == CMD_INSERT);
|
||||||
|
|
||||||
/* reference tables and citus local tables can only have one shard */
|
/* reference tables and citus local tables can only have one shard */
|
||||||
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
if (!HasDistributionKeyCacheEntry(cacheEntry))
|
||||||
{
|
{
|
||||||
List *shardIntervalList = LoadShardIntervalList(distributedTableId);
|
List *shardIntervalList = LoadShardIntervalList(distributedTableId);
|
||||||
|
|
||||||
|
@ -3509,7 +3509,7 @@ ExtractInsertPartitionKeyValue(Query *query)
|
||||||
uint32 rangeTableId = 1;
|
uint32 rangeTableId = 1;
|
||||||
Const *singlePartitionValueConst = NULL;
|
Const *singlePartitionValueConst = NULL;
|
||||||
|
|
||||||
if (IsCitusTableType(distributedTableId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
if (!HasDistributionKey(distributedTableId))
|
||||||
{
|
{
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -3829,8 +3829,7 @@ ErrorIfQueryHasUnroutableModifyingCTE(Query *queryTree)
|
||||||
CitusTableCacheEntry *modificationTableCacheEntry =
|
CitusTableCacheEntry *modificationTableCacheEntry =
|
||||||
GetCitusTableCacheEntry(distributedTableId);
|
GetCitusTableCacheEntry(distributedTableId);
|
||||||
|
|
||||||
if (IsCitusTableTypeCacheEntry(modificationTableCacheEntry,
|
if (!HasDistributionKeyCacheEntry(modificationTableCacheEntry))
|
||||||
CITUS_TABLE_WITH_NO_DIST_KEY))
|
|
||||||
{
|
{
|
||||||
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
|
||||||
"cannot router plan modification of a non-distributed table",
|
"cannot router plan modification of a non-distributed table",
|
||||||
|
|
|
@ -168,7 +168,7 @@ AnchorRte(Query *subquery)
|
||||||
{
|
{
|
||||||
Oid relationId = currentRte->relid;
|
Oid relationId = currentRte->relid;
|
||||||
|
|
||||||
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
if (IsCitusTable(relationId) && !HasDistributionKey(relationId))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Non-distributed tables should not be the anchor rte since they
|
* Non-distributed tables should not be the anchor rte since they
|
||||||
|
|
|
@ -703,8 +703,8 @@ EquivalenceListContainsRelationsEquality(List *attributeEquivalenceList,
|
||||||
int rteIdentity = GetRTEIdentity(relationRestriction->rte);
|
int rteIdentity = GetRTEIdentity(relationRestriction->rte);
|
||||||
|
|
||||||
/* we shouldn't check for the equality of non-distributed tables */
|
/* we shouldn't check for the equality of non-distributed tables */
|
||||||
if (IsCitusTableType(relationRestriction->relationId,
|
if (IsCitusTable(relationRestriction->relationId) &&
|
||||||
CITUS_TABLE_WITH_NO_DIST_KEY))
|
!HasDistributionKey(relationRestriction->relationId))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1933,7 +1933,7 @@ AllRelationsInRestrictionContextColocated(RelationRestrictionContext *restrictio
|
||||||
{
|
{
|
||||||
Oid relationId = relationRestriction->relationId;
|
Oid relationId = relationRestriction->relationId;
|
||||||
|
|
||||||
if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
if (IsCitusTable(relationId) && !HasDistributionKey(relationId))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
|
@ -333,7 +333,7 @@ PruneShards(Oid relationId, Index rangeTableId, List *whereClauseList,
|
||||||
}
|
}
|
||||||
|
|
||||||
/* short circuit for non-distributed tables such as reference table */
|
/* short circuit for non-distributed tables such as reference table */
|
||||||
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
if (!HasDistributionKeyCacheEntry(cacheEntry))
|
||||||
{
|
{
|
||||||
prunedList = ShardArrayToList(cacheEntry->sortedShardIntervalArray,
|
prunedList = ShardArrayToList(cacheEntry->sortedShardIntervalArray,
|
||||||
cacheEntry->shardIntervalArrayLength);
|
cacheEntry->shardIntervalArrayLength);
|
||||||
|
|
|
@ -195,7 +195,7 @@ RecordRelationAccessIfNonDistTable(Oid relationId, ShardPlacementAccessType acce
|
||||||
* recursively calling RecordRelationAccessBase(), so becareful about
|
* recursively calling RecordRelationAccessBase(), so becareful about
|
||||||
* removing this check.
|
* removing this check.
|
||||||
*/
|
*/
|
||||||
if (!IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
|
if (IsCitusTable(relationId) && HasDistributionKey(relationId))
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -732,8 +732,8 @@ CheckConflictingRelationAccesses(Oid relationId, ShardPlacementAccessType access
|
||||||
|
|
||||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
||||||
|
|
||||||
if (!(IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY) &&
|
if (HasDistributionKeyCacheEntry(cacheEntry) ||
|
||||||
cacheEntry->referencingRelationsViaForeignKey != NIL))
|
cacheEntry->referencingRelationsViaForeignKey == NIL)
|
||||||
{
|
{
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -931,7 +931,7 @@ HoldsConflictingLockWithReferencedRelations(Oid relationId, ShardPlacementAccess
|
||||||
* We're only interested in foreign keys to reference tables and citus
|
* We're only interested in foreign keys to reference tables and citus
|
||||||
* local tables.
|
* local tables.
|
||||||
*/
|
*/
|
||||||
if (!IsCitusTableType(referencedRelation, CITUS_TABLE_WITH_NO_DIST_KEY))
|
if (IsCitusTable(referencedRelation) && HasDistributionKey(referencedRelation))
|
||||||
{
|
{
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -993,7 +993,7 @@ HoldsConflictingLockWithReferencingRelations(Oid relationId, ShardPlacementAcces
|
||||||
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
|
||||||
bool holdsConflictingLocks = false;
|
bool holdsConflictingLocks = false;
|
||||||
|
|
||||||
Assert(IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY));
|
Assert(!HasDistributionKeyCacheEntry(cacheEntry));
|
||||||
|
|
||||||
Oid referencingRelation = InvalidOid;
|
Oid referencingRelation = InvalidOid;
|
||||||
foreach_oid(referencingRelation, cacheEntry->referencingRelationsViaForeignKey)
|
foreach_oid(referencingRelation, cacheEntry->referencingRelationsViaForeignKey)
|
||||||
|
|
|
@ -442,8 +442,7 @@ ShardsIntervalsEqual(ShardInterval *leftShardInterval, ShardInterval *rightShard
|
||||||
{
|
{
|
||||||
return HashPartitionedShardIntervalsEqual(leftShardInterval, rightShardInterval);
|
return HashPartitionedShardIntervalsEqual(leftShardInterval, rightShardInterval);
|
||||||
}
|
}
|
||||||
else if (IsCitusTableType(leftShardInterval->relationId,
|
else if (!HasDistributionKey(leftShardInterval->relationId))
|
||||||
CITUS_TABLE_WITH_NO_DIST_KEY))
|
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Reference tables has only a single shard and all reference tables
|
* Reference tables has only a single shard and all reference tables
|
||||||
|
|
|
@ -223,8 +223,7 @@ ShardIndex(ShardInterval *shardInterval)
|
||||||
* currently it is not required.
|
* currently it is not required.
|
||||||
*/
|
*/
|
||||||
if (!IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) &&
|
if (!IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) &&
|
||||||
!IsCitusTableTypeCacheEntry(
|
HasDistributionKeyCacheEntry(cacheEntry))
|
||||||
cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||||
errmsg("finding index of a given shard is only supported for "
|
errmsg("finding index of a given shard is only supported for "
|
||||||
|
@ -233,7 +232,7 @@ ShardIndex(ShardInterval *shardInterval)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* short-circuit for reference tables */
|
/* short-circuit for reference tables */
|
||||||
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
if (!HasDistributionKeyCacheEntry(cacheEntry))
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* Reference tables and citus local tables have only a single shard,
|
* Reference tables and citus local tables have only a single shard,
|
||||||
|
@ -333,7 +332,7 @@ FindShardIntervalIndex(Datum searchedValue, CitusTableCacheEntry *cacheEntry)
|
||||||
shardIndex = CalculateUniformHashRangeIndex(hashedValue, shardCount);
|
shardIndex = CalculateUniformHashRangeIndex(hashedValue, shardCount);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
|
else if (!HasDistributionKeyCacheEntry(cacheEntry))
|
||||||
{
|
{
|
||||||
/* non-distributed tables have a single shard, all values mapped to that shard */
|
/* non-distributed tables have a single shard, all values mapped to that shard */
|
||||||
Assert(shardCount == 1);
|
Assert(shardCount == 1);
|
||||||
|
|
|
@ -133,9 +133,6 @@ typedef enum
|
||||||
REFERENCE_TABLE,
|
REFERENCE_TABLE,
|
||||||
CITUS_LOCAL_TABLE,
|
CITUS_LOCAL_TABLE,
|
||||||
|
|
||||||
/* table without a dist key such as reference table */
|
|
||||||
CITUS_TABLE_WITH_NO_DIST_KEY,
|
|
||||||
|
|
||||||
ANY_CITUS_TABLE_TYPE
|
ANY_CITUS_TABLE_TYPE
|
||||||
} CitusTableType;
|
} CitusTableType;
|
||||||
|
|
||||||
|
@ -143,6 +140,8 @@ extern List * AllCitusTableIds(void);
|
||||||
extern bool IsCitusTableType(Oid relationId, CitusTableType tableType);
|
extern bool IsCitusTableType(Oid relationId, CitusTableType tableType);
|
||||||
extern bool IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEtnry,
|
extern bool IsCitusTableTypeCacheEntry(CitusTableCacheEntry *tableEtnry,
|
||||||
CitusTableType tableType);
|
CitusTableType tableType);
|
||||||
|
bool HasDistributionKey(Oid relationId);
|
||||||
|
bool HasDistributionKeyCacheEntry(CitusTableCacheEntry *tableEntry);
|
||||||
extern char * GetTableTypeName(Oid tableId);
|
extern char * GetTableTypeName(Oid tableId);
|
||||||
|
|
||||||
extern void SetCreateCitusTransactionLevel(int val);
|
extern void SetCreateCitusTransactionLevel(int val);
|
||||||
|
@ -154,7 +153,9 @@ extern List * LookupDistShardTuples(Oid relationId);
|
||||||
extern char PartitionMethodViaCatalog(Oid relationId);
|
extern char PartitionMethodViaCatalog(Oid relationId);
|
||||||
extern Var * PartitionColumnViaCatalog(Oid relationId);
|
extern Var * PartitionColumnViaCatalog(Oid relationId);
|
||||||
extern uint32 ColocationIdViaCatalog(Oid relationId);
|
extern uint32 ColocationIdViaCatalog(Oid relationId);
|
||||||
extern bool IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel);
|
bool IsReferenceTableByDistParams(char partitionMethod, char replicationModel);
|
||||||
|
extern bool IsCitusLocalTableByDistParams(char partitionMethod, char replicationModel,
|
||||||
|
uint32 colocationId);
|
||||||
extern List * CitusTableList(void);
|
extern List * CitusTableList(void);
|
||||||
extern ShardInterval * LoadShardInterval(uint64 shardId);
|
extern ShardInterval * LoadShardInterval(uint64 shardId);
|
||||||
extern bool ShardExists(uint64 shardId);
|
extern bool ShardExists(uint64 shardId);
|
||||||
|
|
Loading…
Reference in New Issue