Use CopyShardForeignConstraintCommandList in WorkerCreateShardCommandList

What we do to collect foreign key constraint commands in
WorkerCreateShardCommandList is quite similar to what we do in
CopyShardForeignConstraintCommandList. Plus, the code that we used
in WorkerCreateShardCommandList before was not able to properly handle
foreign key constraints between Citus local tables --when creating a
reference table from the referencing one.

With a few slight modifications made to
CopyShardForeignConstraintCommandList, we can use the same logic in
WorkerCreateShardCommandList too.
pull/7131/head
Onur Tirtir 2023-08-18 15:00:02 +03:00
parent d97f786296
commit 5bdf19f517
4 changed files with 23 additions and 88 deletions

View File

@ -82,7 +82,6 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor, bool useExclusiveConnections) int32 replicationFactor, bool useExclusiveConnections)
{ {
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(distributedTableId);
bool colocatedShard = false;
List *insertedShardPlacements = NIL; List *insertedShardPlacements = NIL;
/* make sure table is hash partitioned */ /* make sure table is hash partitioned */
@ -201,7 +200,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
} }
CreateShardsOnWorkers(distributedTableId, insertedShardPlacements, CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
useExclusiveConnections, colocatedShard); useExclusiveConnections);
} }
@ -213,7 +212,6 @@ void
CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
useExclusiveConnections) useExclusiveConnections)
{ {
bool colocatedShard = true;
List *insertedShardPlacements = NIL; List *insertedShardPlacements = NIL;
List *insertedShardIds = NIL; List *insertedShardIds = NIL;
@ -306,7 +304,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
} }
CreateShardsOnWorkers(targetRelationId, insertedShardPlacements, CreateShardsOnWorkers(targetRelationId, insertedShardPlacements,
useExclusiveConnections, colocatedShard); useExclusiveConnections);
} }
@ -322,7 +320,6 @@ CreateReferenceTableShard(Oid distributedTableId)
text *shardMinValue = NULL; text *shardMinValue = NULL;
text *shardMaxValue = NULL; text *shardMaxValue = NULL;
bool useExclusiveConnection = false; bool useExclusiveConnection = false;
bool colocatedShard = false;
/* /*
* In contrast to append/range partitioned tables it makes more sense to * In contrast to append/range partitioned tables it makes more sense to
@ -368,7 +365,7 @@ CreateReferenceTableShard(Oid distributedTableId)
replicationFactor); replicationFactor);
CreateShardsOnWorkers(distributedTableId, insertedShardPlacements, CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
useExclusiveConnection, colocatedShard); useExclusiveConnection);
} }
@ -431,10 +428,8 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio
* creating a single shard. * creating a single shard.
*/ */
bool useExclusiveConnection = false; bool useExclusiveConnection = false;
bool colocatedShard = false;
CreateShardsOnWorkers(relationId, insertedShardPlacements, CreateShardsOnWorkers(relationId, insertedShardPlacements,
useExclusiveConnection, colocatedShard); useExclusiveConnection);
} }

View File

@ -1841,7 +1841,11 @@ CopyShardForeignConstraintCommandListGrouped(ShardInterval *shardInterval,
char *referencedSchemaName = get_namespace_name(referencedSchemaId); char *referencedSchemaName = get_namespace_name(referencedSchemaId);
char *escapedReferencedSchemaName = quote_literal_cstr(referencedSchemaName); char *escapedReferencedSchemaName = quote_literal_cstr(referencedSchemaName);
if (IsCitusTableType(referencedRelationId, REFERENCE_TABLE)) if (relationId == referencedRelationId)
{
referencedShardId = shardInterval->shardId;
}
else if (IsCitusTableType(referencedRelationId, REFERENCE_TABLE))
{ {
referencedShardId = GetFirstShardId(referencedRelationId); referencedShardId = GetFirstShardId(referencedRelationId);
} }

View File

@ -312,8 +312,6 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
int attemptCount = replicationFactor; int attemptCount = replicationFactor;
int workerNodeCount = list_length(workerNodeList); int workerNodeCount = list_length(workerNodeList);
int placementsCreated = 0; int placementsCreated = 0;
List *foreignConstraintCommandList =
GetReferencingForeignConstaintCommands(relationId);
IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS; IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS;
IncludeIdentities includeIdentityDefaults = NO_IDENTITY; IncludeIdentities includeIdentityDefaults = NO_IDENTITY;
@ -346,7 +344,6 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
uint32 nodeGroupId = workerNode->groupId; uint32 nodeGroupId = workerNode->groupId;
char *nodeName = workerNode->workerName; char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort; uint32 nodePort = workerNode->workerPort;
int shardIndex = -1; /* not used in this code path */
const uint64 shardSize = 0; const uint64 shardSize = 0;
MultiConnection *connection = MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort, GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort,
@ -360,9 +357,8 @@ CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
continue; continue;
} }
List *commandList = WorkerCreateShardCommandList(relationId, shardIndex, shardId, List *commandList = WorkerCreateShardCommandList(relationId, shardId,
ddlCommandList, ddlCommandList);
foreignConstraintCommandList);
ExecuteCriticalRemoteCommandList(connection, commandList); ExecuteCriticalRemoteCommandList(connection, commandList);
@ -427,7 +423,7 @@ InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
*/ */
void void
CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
bool useExclusiveConnection, bool colocatedShard) bool useExclusiveConnection)
{ {
IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS; IncludeSequenceDefaults includeSequenceDefaults = NO_SEQUENCE_DEFAULTS;
IncludeIdentities includeIdentityDefaults = NO_IDENTITY; IncludeIdentities includeIdentityDefaults = NO_IDENTITY;
@ -437,8 +433,6 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
includeSequenceDefaults, includeSequenceDefaults,
includeIdentityDefaults, includeIdentityDefaults,
creatingShellTableOnRemoteNode); creatingShellTableOnRemoteNode);
List *foreignConstraintCommandList =
GetReferencingForeignConstaintCommands(distributedRelationId);
int taskId = 1; int taskId = 1;
List *taskList = NIL; List *taskList = NIL;
@ -449,18 +443,10 @@ CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
{ {
uint64 shardId = shardPlacement->shardId; uint64 shardId = shardPlacement->shardId;
ShardInterval *shardInterval = LoadShardInterval(shardId); ShardInterval *shardInterval = LoadShardInterval(shardId);
int shardIndex = -1;
List *relationShardList = RelationShardListForShardCreate(shardInterval); List *relationShardList = RelationShardListForShardCreate(shardInterval);
if (colocatedShard)
{
shardIndex = ShardIndex(shardInterval);
}
List *commandList = WorkerCreateShardCommandList(distributedRelationId, List *commandList = WorkerCreateShardCommandList(distributedRelationId,
shardIndex, shardId, ddlCommandList);
shardId, ddlCommandList,
foreignConstraintCommandList);
Task *task = CitusMakeNode(Task); Task *task = CitusMakeNode(Task);
task->jobId = INVALID_JOB_ID; task->jobId = INVALID_JOB_ID;
@ -604,14 +590,12 @@ RelationShardListForShardCreate(ShardInterval *shardInterval)
* shardId to create the shard on the worker node. * shardId to create the shard on the worker node.
*/ */
List * List *
WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId, WorkerCreateShardCommandList(Oid relationId, uint64 shardId,
List *ddlCommandList, List *ddlCommandList)
List *foreignConstraintCommandList)
{ {
List *commandList = NIL; List *commandList = NIL;
Oid schemaId = get_rel_namespace(relationId); Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId); char *schemaName = get_namespace_name(schemaId);
char *escapedSchemaName = quote_literal_cstr(schemaName);
TableDDLCommand *ddlCommand = NULL; TableDDLCommand *ddlCommand = NULL;
foreach_ptr(ddlCommand, ddlCommandList) foreach_ptr(ddlCommand, ddlCommandList)
@ -622,57 +606,12 @@ WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId,
commandList = lappend(commandList, applyDDLCommand); commandList = lappend(commandList, applyDDLCommand);
} }
const char *command = NULL; ShardInterval *shardInterval = LoadShardInterval(shardId);
foreach_ptr(command, foreignConstraintCommandList)
{
char *escapedCommand = quote_literal_cstr(command);
uint64 referencedShardId = INVALID_SHARD_ID; commandList = list_concat(
commandList,
StringInfo applyForeignConstraintCommand = makeStringInfo(); CopyShardForeignConstraintCommandList(shardInterval)
);
/* we need to parse the foreign constraint command to get referencing table id */
Oid referencedRelationId = ForeignConstraintGetReferencedTableId(command);
if (referencedRelationId == InvalidOid)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot create foreign key constraint"),
errdetail("Referenced relation cannot be found.")));
}
Oid referencedSchemaId = get_rel_namespace(referencedRelationId);
char *referencedSchemaName = get_namespace_name(referencedSchemaId);
char *escapedReferencedSchemaName = quote_literal_cstr(referencedSchemaName);
/*
* In case of self referencing shards, relation itself might not be distributed
* already. Therefore we cannot use ColocatedShardIdInRelation which assumes
* given relation is distributed. Besides, since we know foreign key references
* itself, referencedShardId is actual shardId anyway. Also, if the referenced
* relation is a reference table, we cannot use ColocatedShardIdInRelation since
* reference tables only have one shard. Instead, we fetch the one and only shard
* from shardlist and use it.
*/
if (relationId == referencedRelationId)
{
referencedShardId = shardId;
}
else if (IsCitusTableType(referencedRelationId, REFERENCE_TABLE))
{
referencedShardId = GetFirstShardId(referencedRelationId);
}
else
{
referencedShardId = ColocatedShardIdInRelation(referencedRelationId,
shardIndex);
}
appendStringInfo(applyForeignConstraintCommand,
WORKER_APPLY_INTER_SHARD_DDL_COMMAND, shardId, escapedSchemaName,
referencedShardId, escapedReferencedSchemaName, escapedCommand);
commandList = lappend(commandList, applyForeignConstraintCommand->data);
}
/* /*
* If the shard is created for a partition, send the command to create the * If the shard is created for a partition, send the command to create the
@ -680,7 +619,6 @@ WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId,
*/ */
if (PartitionTable(relationId)) if (PartitionTable(relationId))
{ {
ShardInterval *shardInterval = LoadShardInterval(shardId);
char *attachPartitionCommand = GenerateAttachShardPartitionCommand(shardInterval); char *attachPartitionCommand = GenerateAttachShardPartitionCommand(shardInterval);
commandList = lappend(commandList, attachPartitionCommand); commandList = lappend(commandList, attachPartitionCommand);

View File

@ -250,8 +250,7 @@ extern void CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId
List *workerNodeList, int List *workerNodeList, int
replicationFactor); replicationFactor);
extern void CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements, extern void CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
bool useExclusiveConnection, bool useExclusiveConnection);
bool colocatedShard);
extern List * InsertShardPlacementRows(Oid relationId, int64 shardId, extern List * InsertShardPlacementRows(Oid relationId, int64 shardId,
List *workerNodeList, int workerStartIndex, List *workerNodeList, int workerStartIndex,
int replicationFactor); int replicationFactor);
@ -264,9 +263,8 @@ extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
extern void CreateReferenceTableShard(Oid distributedTableId); extern void CreateReferenceTableShard(Oid distributedTableId);
extern void CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, extern void CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId,
uint32 colocationId); uint32 colocationId);
extern List * WorkerCreateShardCommandList(Oid relationId, int shardIndex, uint64 shardId, extern List * WorkerCreateShardCommandList(Oid relationId, uint64 shardId,
List *ddlCommandList, List *ddlCommandList);
List *foreignConstraintCommandList);
extern Oid ForeignConstraintGetReferencedTableId(const char *queryString); extern Oid ForeignConstraintGetReferencedTableId(const char *queryString);
extern void CheckHashPartitionedTable(Oid distributedTableId); extern void CheckHashPartitionedTable(Oid distributedTableId);
extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName, extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName,