Mark the given table as not auto-converted

talha_tes1
Ahmet Gedemenli 2021-09-24 15:28:54 +03:00
parent a231822de0
commit 2d20fa37ad
4 changed files with 67 additions and 2 deletions

View File

@ -87,6 +87,8 @@ PG_FUNCTION_INFO_V1(citus_add_local_table_to_metadata);
PG_FUNCTION_INFO_V1(create_citus_local_table); PG_FUNCTION_INFO_V1(create_citus_local_table);
PG_FUNCTION_INFO_V1(remove_local_tables_from_metadata); PG_FUNCTION_INFO_V1(remove_local_tables_from_metadata);
bool autoConverted = false;
/* /*
* citus_add_local_table_to_metadata creates a citus local table from the table with * citus_add_local_table_to_metadata creates a citus local table from the table with
@ -258,6 +260,11 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys)
"cascade_via_foreign_keys=>true)", "cascade_via_foreign_keys=>true)",
qualifiedRelationName, qualifiedRelationName))); qualifiedRelationName, qualifiedRelationName)));
} }
/* save the relation name, to obtain the shell relation id later */
char *relationName = get_rel_name(relationId);
Oid relationSchemaId = get_rel_namespace(relationId);
autoConverted = true;
/* /*
* By acquiring AccessExclusiveLock, make sure that no modifications happen * By acquiring AccessExclusiveLock, make sure that no modifications happen
@ -266,6 +273,13 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys)
CascadeOperationForFkeyConnectedRelations(relationId, lockMode, CascadeOperationForFkeyConnectedRelations(relationId, lockMode,
CASCADE_ADD_LOCAL_TABLE_TO_METADATA); CASCADE_ADD_LOCAL_TABLE_TO_METADATA);
autoConverted = false;
Oid shellRelationId = get_relname_relid(relationName, relationSchemaId);
/* mark the shell relation with autoConverted=false, as it was a user request */
UpdatePartitionAutoConverted(shellRelationId, autoConverted);
/* /*
* We converted every foreign key connected table in our subgraph * We converted every foreign key connected table in our subgraph
* including itself to a citus local table, so return here. * including itself to a citus local table, so return here.
@ -1102,7 +1116,7 @@ InsertMetadataForCitusLocalTable(Oid citusLocalTableId, uint64 shardId)
Var *distributionColumn = NULL; Var *distributionColumn = NULL;
InsertIntoPgDistPartition(citusLocalTableId, distributionMethod, InsertIntoPgDistPartition(citusLocalTableId, distributionMethod,
distributionColumn, colocationId, distributionColumn, colocationId,
replicationModel, false); replicationModel, autoConverted);
/* set shard storage type according to relation type */ /* set shard storage type according to relation type */
char shardStorageType = ShardStorageType(citusLocalTableId); char shardStorageType = ShardStorageType(citusLocalTableId);

View File

@ -2070,6 +2070,56 @@ UpdatePlacementGroupId(uint64 placementId, int groupId)
} }
/*
* UpdatePartitionAutoConverted sets the autoConverted for the partition identified
* by distributedRelationId.
*/
void
UpdatePartitionAutoConverted(Oid distributedRelationId, bool autoConverted)
{
ScanKeyData scanKey[1];
int scanKeyCount = 1;
bool indexOK = true;
Datum values[Natts_pg_dist_partition];
bool isnull[Natts_pg_dist_partition];
bool replace[Natts_pg_dist_partition];
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributedRelationId));
SysScanDesc scanDescriptor = systable_beginscan(pgDistPartition,
DistPartitionLogicalRelidIndexId(),
indexOK,
NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for partition oid: %u",
distributedRelationId)));
}
memset(replace, 0, sizeof(replace));
values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted);
isnull[Anum_pg_dist_partition_autoconverted - 1] = false;
replace[Anum_pg_dist_partition_autoconverted - 1] = true;
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
CatalogTupleUpdate(pgDistPartition, &heapTuple->t_self, heapTuple);
CitusInvalidateRelcacheByRelid(distributedRelationId);
CommandCounterIncrement();
systable_endscan(scanDescriptor);
table_close(pgDistPartition, NoLock);
}
/* /*
* Check that the current user has `mode` permissions on relationId, error out * Check that the current user has `mode` permissions on relationId, error out
* if not. Superusers always have such permissions. * if not. Superusers always have such permissions.

View File

@ -164,7 +164,7 @@ ShouldUndistributeCitusLocalTable(Oid relationId)
*/ */
return false; return false;
} }
if (!AutoConvertedViaCatalog(relationId)) if (!AutoConvertedViaCatalog(relationOid))
{ {
/* /*
* The relation is connected to a (or, is a) Citus Local Table created * The relation is connected to a (or, is a) Citus Local Table created

View File

@ -235,6 +235,7 @@ extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
Var *distributionColumn, uint32 colocationId, Var *distributionColumn, uint32 colocationId,
char replicationModel, bool autoConverted); char replicationModel, bool autoConverted);
extern void UpdatePartitionAutoConverted(Oid distributedRelationId, bool autoConverted);
extern void DeletePartitionRow(Oid distributedRelationId); extern void DeletePartitionRow(Oid distributedRelationId);
extern void DeleteShardRow(uint64 shardId); extern void DeleteShardRow(uint64 shardId);
extern void UpdateShardPlacementState(uint64 placementId, char shardState); extern void UpdateShardPlacementState(uint64 placementId, char shardState);