Adapt other cache entry changes for citus local tables

pull/4143/head
Onur Tirtir 2020-09-09 11:44:05 +03:00
parent a58a4395ab
commit 0b1cc118a9
11 changed files with 59 additions and 29 deletions

View File

@ -102,7 +102,6 @@ CallFuncExprRemotely(CallStmt *callStmt, DistObjectCacheEntry *procedure,
ereport(DEBUG1, (errmsg( ereport(DEBUG1, (errmsg(
"will push down CALL for reference tables"))); "will push down CALL for reference tables")));
colocatedWithReferenceTable = true; colocatedWithReferenceTable = true;
Assert(IsCitusTableType(colocatedRelationId, REFERENCE_TABLE));
} }
ShardPlacement *placement = NULL; ShardPlacement *placement = NULL;

View File

@ -362,7 +362,14 @@ IsCitusTableTypeInternal(CitusTableCacheEntry *tableEntry, CitusTableType tableT
case REFERENCE_TABLE: case REFERENCE_TABLE:
{ {
return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE; return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE &&
tableEntry->replicationModel == REPLICATION_MODEL_2PC;
}
case CITUS_LOCAL_TABLE:
{
return tableEntry->partitionMethod == DISTRIBUTE_BY_NONE &&
tableEntry->replicationModel != REPLICATION_MODEL_2PC;
} }
case CITUS_TABLE_WITH_NO_DIST_KEY: case CITUS_TABLE_WITH_NO_DIST_KEY:

View File

@ -229,7 +229,7 @@ ShouldSyncTableMetadata(Oid relationId)
bool mxTable = (streamingReplicated && IsCitusTableTypeCacheEntry(tableEntry, bool mxTable = (streamingReplicated && IsCitusTableTypeCacheEntry(tableEntry,
HASH_DISTRIBUTED)); HASH_DISTRIBUTED));
if (mxTable || IsCitusTableTypeCacheEntry(tableEntry, REFERENCE_TABLE)) if (mxTable || IsCitusTableTypeCacheEntry(tableEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
{ {
return true; return true;
} }

View File

@ -172,12 +172,12 @@ master_apply_delete_command(PG_FUNCTION_ARGS)
"are not supported with master_apply_delete_command."), "are not supported with master_apply_delete_command."),
errhint("Use the DELETE command instead."))); errhint("Use the DELETE command instead.")));
} }
else if (IsCitusTableType(relationId, REFERENCE_TABLE)) else if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot delete from reference table"), errmsg("cannot delete from table"),
errdetail("Delete statements on reference tables " errdetail("Delete statements on reference and citus "
"are not supported."))); "local tables are not supported.")));
} }

View File

@ -143,13 +143,20 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
errdetail("We currently don't support creating shards " errdetail("We currently don't support creating shards "
"on hash-partitioned tables"))); "on hash-partitioned tables")));
} }
else if (IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY)) else if (IsCitusTableType(relationId, REFERENCE_TABLE))
{ {
ereport(ERROR, (errmsg("relation \"%s\" is a reference table", ereport(ERROR, (errmsg("relation \"%s\" is a reference table",
relationName), relationName),
errdetail("We currently don't support creating shards " errdetail("We currently don't support creating shards "
"on reference tables"))); "on reference tables")));
} }
else if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
ereport(ERROR, (errmsg("relation \"%s\" is a citus local table",
relationName),
errdetail("We currently don't support creating shards "
"on citus local tables")));
}
char replicationModel = TableReplicationModel(relationId); char replicationModel = TableReplicationModel(relationId);
@ -257,7 +264,8 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
{ {
ereport(ERROR, (errmsg("cannot append to shardId " UINT64_FORMAT, shardId), ereport(ERROR, (errmsg("cannot append to shardId " UINT64_FORMAT, shardId),
errdetail("We currently don't support appending to shards " errdetail("We currently don't support appending to shards "
"in hash-partitioned or reference tables"))); "in hash-partitioned, reference and citus local "
"tables")));
} }
/* ensure that the shard placement metadata does not change during the append */ /* ensure that the shard placement metadata does not change during the append */

View File

@ -77,7 +77,7 @@ RebuildQueryStrings(Job *workerJob)
RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(query); RangeTblEntry *copiedSubqueryRte = ExtractSelectRangeTableEntry(query);
Query *copiedSubquery = copiedSubqueryRte->subquery; Query *copiedSubquery = copiedSubqueryRte->subquery;
/* there are no restrictions to add for reference tables */ /* there are no restrictions to add for reference and citus local tables */
if (IsCitusTableType(shardInterval->relationId, DISTRIBUTED_TABLE)) if (IsCitusTableType(shardInterval->relationId, DISTRIBUTED_TABLE))
{ {
AddShardIntervalRestrictionToSelect(copiedSubquery, shardInterval); AddShardIntervalRestrictionToSelect(copiedSubquery, shardInterval);

View File

@ -2704,7 +2704,7 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
Assert(query->commandType == CMD_INSERT); Assert(query->commandType == CMD_INSERT);
/* reference tables can only have one shard */ /* reference tables and citus local tables can only have one shard */
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
{ {
List *shardIntervalList = LoadShardIntervalList(distributedTableId); List *shardIntervalList = LoadShardIntervalList(distributedTableId);
@ -2712,7 +2712,16 @@ BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError)
int shardCount = list_length(shardIntervalList); int shardCount = list_length(shardIntervalList);
if (shardCount != 1) if (shardCount != 1)
{ {
ereport(ERROR, (errmsg("reference table cannot have %d shards", shardCount))); if (IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE))
{
ereport(ERROR, (errmsg("reference table cannot have %d shards",
shardCount)));
}
else if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_LOCAL_TABLE))
{
ereport(ERROR, (errmsg("citus local table cannot have %d shards",
shardCount)));
}
} }
ShardInterval *shardInterval = linitial(shardIntervalList); ShardInterval *shardInterval = linitial(shardIntervalList);

View File

@ -173,7 +173,7 @@ RecordRelationAccessIfNonDistTable(Oid relationId, ShardPlacementAccessType acce
* recursively calling RecordRelationAccessBase(), so becareful about * recursively calling RecordRelationAccessBase(), so becareful about
* removing this check. * removing this check.
*/ */
if (!IsCitusTableType(relationId, REFERENCE_TABLE)) if (!IsCitusTableType(relationId, CITUS_TABLE_WITH_NO_DIST_KEY))
{ {
return; return;
} }
@ -691,9 +691,9 @@ ShouldRecordRelationAccess()
/* /*
* CheckConflictingRelationAccesses is mostly a wrapper around * CheckConflictingRelationAccesses is mostly a wrapper around
* HoldsConflictingLockWithReferencingRelations(). We're only interested in accesses * HoldsConflictingLockWithReferencingRelations(). We're only interested in
* to reference tables that are referenced via a foreign constraint by a * accesses to reference tables and citus local tables that are referenced via
* hash distributed tables. * a foreign constraint by a hash distributed table.
*/ */
static void static void
CheckConflictingRelationAccesses(Oid relationId, ShardPlacementAccessType accessType) CheckConflictingRelationAccesses(Oid relationId, ShardPlacementAccessType accessType)
@ -708,7 +708,7 @@ CheckConflictingRelationAccesses(Oid relationId, ShardPlacementAccessType access
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
if (!(IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE) && if (!(IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY) &&
cacheEntry->referencingRelationsViaForeignKey != NIL)) cacheEntry->referencingRelationsViaForeignKey != NIL))
{ {
return; return;
@ -903,8 +903,11 @@ HoldsConflictingLockWithReferencedRelations(Oid relationId, ShardPlacementAccess
Oid referencedRelation = InvalidOid; Oid referencedRelation = InvalidOid;
foreach_oid(referencedRelation, cacheEntry->referencedRelationsViaForeignKey) foreach_oid(referencedRelation, cacheEntry->referencedRelationsViaForeignKey)
{ {
/* we're only interested in foreign keys to reference tables */ /*
if (!IsCitusTableType(referencedRelation, REFERENCE_TABLE)) * We're only interested in foreign keys to reference tables and citus
* local tables.
*/
if (!IsCitusTableType(referencedRelation, CITUS_TABLE_WITH_NO_DIST_KEY))
{ {
continue; continue;
} }
@ -966,7 +969,7 @@ HoldsConflictingLockWithReferencingRelations(Oid relationId, ShardPlacementAcces
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
bool holdsConflictingLocks = false; bool holdsConflictingLocks = false;
Assert(IsCitusTableTypeCacheEntry(cacheEntry, REFERENCE_TABLE)); Assert(IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY));
Oid referencingRelation = InvalidOid; Oid referencingRelation = InvalidOid;
foreach_oid(referencingRelation, cacheEntry->referencingRelationsViaForeignKey) foreach_oid(referencingRelation, cacheEntry->referencingRelationsViaForeignKey)

View File

@ -229,8 +229,8 @@ CompareRelationShards(const void *leftElement, const void *rightElement)
* *
* For hash partitioned tables, it calculates hash value of a number in its * For hash partitioned tables, it calculates hash value of a number in its
* range (e.g. min value) and finds which shard should contain the hashed * range (e.g. min value) and finds which shard should contain the hashed
* value. For reference tables, it simply returns 0. For distribution methods * value. For reference tables and citus local tables, it simply returns 0.
* other than hash and reference, the function errors out. * For the other table types, the function errors out.
*/ */
int int
ShardIndex(ShardInterval *shardInterval) ShardIndex(ShardInterval *shardInterval)
@ -247,17 +247,21 @@ ShardIndex(ShardInterval *shardInterval)
*/ */
if (!IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) && if (!IsCitusTableTypeCacheEntry(cacheEntry, HASH_DISTRIBUTED) &&
!IsCitusTableTypeCacheEntry( !IsCitusTableTypeCacheEntry(
cacheEntry, REFERENCE_TABLE)) cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("finding index of a given shard is only supported for " errmsg("finding index of a given shard is only supported for "
"hash distributed and reference tables"))); "hash distributed tables, reference tables and citus "
"local tables")));
} }
/* short-circuit for reference tables */ /* short-circuit for reference tables */
if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY)) if (IsCitusTableTypeCacheEntry(cacheEntry, CITUS_TABLE_WITH_NO_DIST_KEY))
{ {
/* reference tables has only a single shard, so the index is fixed to 0 */ /*
* Reference tables and citus local tables have only a single shard,
* so the index is fixed to 0.
*/
shardIndex = 0; shardIndex = 0;
return shardIndex; return shardIndex;

View File

@ -1409,8 +1409,8 @@ ALTER TABLE reference_schema.reference_table_ddl_test RENAME TO reference_table_
-- now test reference tables against some helper UDFs that Citus provides -- now test reference tables against some helper UDFs that Citus provides
-- cannot delete / drop shards from a reference table -- cannot delete / drop shards from a reference table
SELECT master_apply_delete_command('DELETE FROM reference_schema.reference_table_ddl'); SELECT master_apply_delete_command('DELETE FROM reference_schema.reference_table_ddl');
ERROR: cannot delete from reference table ERROR: cannot delete from table
DETAIL: Delete statements on reference tables are not supported. DETAIL: Delete statements on reference and citus local tables are not supported.
-- cannot add shards -- cannot add shards
SELECT master_create_empty_shard('reference_schema.reference_table_ddl'); SELECT master_create_empty_shard('reference_schema.reference_table_ddl');
ERROR: relation "reference_schema.reference_table_ddl" is a reference table ERROR: relation "reference_schema.reference_table_ddl" is a reference table
@ -1434,7 +1434,7 @@ SELECT master_update_shard_statistics(:a_shard_id);
CREATE TABLE append_reference_tmp_table (id INT); CREATE TABLE append_reference_tmp_table (id INT);
SELECT master_append_table_to_shard(:a_shard_id, 'append_reference_tmp_table', 'localhost', :master_port); SELECT master_append_table_to_shard(:a_shard_id, 'append_reference_tmp_table', 'localhost', :master_port);
ERROR: cannot append to shardId 1250019 ERROR: cannot append to shardId 1250019
DETAIL: We currently don't support appending to shards in hash-partitioned or reference tables DETAIL: We currently don't support appending to shards in hash-partitioned, reference and citus local tables
SELECT master_get_table_ddl_events('reference_schema.reference_table_ddl'); SELECT master_get_table_ddl_events('reference_schema.reference_table_ddl');
master_get_table_ddl_events master_get_table_ddl_events
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -82,7 +82,7 @@ FROM
pg_dist_shard pg_dist_shard
WHERE 'multi_append_table_to_shard_right_reference_hash'::regclass::oid = logicalrelid; WHERE 'multi_append_table_to_shard_right_reference_hash'::regclass::oid = logicalrelid;
ERROR: cannot append to shardId 230001 ERROR: cannot append to shardId 230001
DETAIL: We currently don't support appending to shards in hash-partitioned or reference tables DETAIL: We currently don't support appending to shards in hash-partitioned, reference and citus local tables
-- Clean up after test -- Clean up after test
SELECT master_apply_delete_command('DELETE FROM multi_append_table_to_shard_left'); SELECT master_apply_delete_command('DELETE FROM multi_append_table_to_shard_left');
master_apply_delete_command master_apply_delete_command