Check groupid existence hackily while removing metadata

velioglu/wo_seq_test_1
Burak Velioglu 2022-01-04 12:04:33 +03:00
parent 7e3f2486f3
commit 320a8ecdea
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
7 changed files with 35 additions and 9 deletions

View File

@ -2187,7 +2187,7 @@ LookupTaskPlacementHostAndPort(ShardPlacement *taskPlacement, char **nodeName,
* there were changes in pg_dist_node and we will get those invalidations * there were changes in pg_dist_node and we will get those invalidations
* in LookupNodeForGroup. * in LookupNodeForGroup.
*/ */
WorkerNode *workerNode = LookupNodeForGroup(taskPlacement->groupId); WorkerNode *workerNode = LookupNodeForGroup(taskPlacement->groupId, false);
*nodeName = workerNode->workerName; *nodeName = workerNode->workerName;
*nodePort = workerNode->workerPort; *nodePort = workerNode->workerPort;
} }

View File

@ -833,7 +833,12 @@ ResolveGroupShardPlacement(GroupShardPlacement *groupShardPlacement,
ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement); ShardPlacement *shardPlacement = CitusMakeNode(ShardPlacement);
int32 groupId = groupShardPlacement->groupId; int32 groupId = groupShardPlacement->groupId;
WorkerNode *workerNode = LookupNodeForGroup(groupId); WorkerNode *workerNode = LookupNodeForGroup(groupId, true);
if (workerNode == NULL)
{
return NULL;
}
/* copy everything into shardPlacement but preserve the header */ /* copy everything into shardPlacement but preserve the header */
CitusNode header = shardPlacement->type; CitusNode header = shardPlacement->type;
@ -929,7 +934,7 @@ LookupNodeByNodeIdOrError(uint32 nodeId)
* appropriate error message. * appropriate error message.
*/ */
WorkerNode * WorkerNode *
LookupNodeForGroup(int32 groupId) LookupNodeForGroup(int32 groupId, bool checkExistence)
{ {
bool foundAnyNodes = false; bool foundAnyNodes = false;
@ -953,10 +958,20 @@ LookupNodeForGroup(int32 groupId)
} }
if (!foundAnyNodes) if (!foundAnyNodes)
{
if (checkExistence)
{
ereport(NOTICE, (errmsg("there is a shard placement in node group %d but "
"there are no nodes in that group", groupId)));
return NULL;
}
else
{ {
ereport(ERROR, (errmsg("there is a shard placement in node group %d but " ereport(ERROR, (errmsg("there is a shard placement in node group %d but "
"there are no nodes in that group", groupId))); "there are no nodes in that group", groupId)));
} }
}
switch (ReadFromSecondaries) switch (ReadFromSecondaries)
{ {
@ -1014,6 +1029,11 @@ ShardPlacementListIncludingOrphanedPlacements(uint64 shardId)
tableEntry, tableEntry,
shardIndex); shardIndex);
if (shardPlacement == NULL)
{
return NULL;
}
placementList = lappend(placementList, shardPlacement); placementList = lappend(placementList, shardPlacement);
} }

View File

@ -391,7 +391,7 @@ FullShardPlacementList(Oid relationId, ArrayType *excludedShardArray)
placementIndex++) placementIndex++)
{ {
GroupShardPlacement *groupPlacement = &placementArray[placementIndex]; GroupShardPlacement *groupPlacement = &placementArray[placementIndex];
WorkerNode *worker = LookupNodeForGroup(groupPlacement->groupId); WorkerNode *worker = LookupNodeForGroup(groupPlacement->groupId, false);
ShardPlacement *placement = CitusMakeNode(ShardPlacement); ShardPlacement *placement = CitusMakeNode(ShardPlacement);
placement->shardId = groupPlacement->shardId; placement->shardId = groupPlacement->shardId;
placement->shardLength = groupPlacement->shardLength; placement->shardLength = groupPlacement->shardLength;

View File

@ -455,7 +455,7 @@ CoordinatorNodeIfAddedAsWorkerOrError()
{ {
ErrorIfCoordinatorNotAddedAsWorkerNode(); ErrorIfCoordinatorNotAddedAsWorkerNode();
WorkerNode *coordinatorNode = LookupNodeForGroup(COORDINATOR_GROUP_ID); WorkerNode *coordinatorNode = LookupNodeForGroup(COORDINATOR_GROUP_ID, false);
WorkerNode *coordinatorNodeCopy = palloc0(sizeof(WorkerNode)); WorkerNode *coordinatorNodeCopy = palloc0(sizeof(WorkerNode));
*coordinatorNodeCopy = *coordinatorNode; *coordinatorNodeCopy = *coordinatorNode;

View File

@ -268,6 +268,12 @@ worker_drop_distributed_table_metadata_only(PG_FUNCTION_ARGS)
uint64 shardId = *shardIdPointer; uint64 shardId = *shardIdPointer;
List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId); List *shardPlacementList = ShardPlacementListIncludingOrphanedPlacements(shardId);
if (shardPlacementList == NULL)
{
ereport(NOTICE, (errmsg("placement for relation with oid %d does not exist, skipping", relationId)));
PG_RETURN_VOID();
}
ShardPlacement *placement = NULL; ShardPlacement *placement = NULL;
foreach_ptr(placement, shardPlacementList) foreach_ptr(placement, shardPlacementList)
{ {

View File

@ -211,7 +211,7 @@ extern bool HasAnyNodes(void);
extern HTAB * GetWorkerNodeHash(void); extern HTAB * GetWorkerNodeHash(void);
extern WorkerNode * LookupNodeByNodeId(uint32 nodeId); extern WorkerNode * LookupNodeByNodeId(uint32 nodeId);
extern WorkerNode * LookupNodeByNodeIdOrError(uint32 nodeId); extern WorkerNode * LookupNodeByNodeIdOrError(uint32 nodeId);
extern WorkerNode * LookupNodeForGroup(int32 groupId); extern WorkerNode * LookupNodeForGroup(int32 groupId, bool checkExistence);
/* namespace oids */ /* namespace oids */
extern Oid CitusCatalogNamespaceId(void); extern Oid CitusCatalogNamespaceId(void);

View File

@ -75,7 +75,7 @@ extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum);
#define REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND \ #define REMOVE_ALL_CLUSTERED_TABLES_ONLY_COMMAND \
"SELECT worker_drop_distributed_table_only(logicalrelid::regclass::text) FROM pg_dist_partition" "SELECT worker_drop_distributed_table_only(logicalrelid::regclass::text) FROM pg_dist_partition"
#define REMOVE_ALL_CLUSTERED_TABLES_METADATA_ONLY_COMMAND \ #define REMOVE_ALL_CLUSTERED_TABLES_METADATA_ONLY_COMMAND \
"SELECT worker_drop_distributed_table_metadata_only(logicalrelid) FROM pg_dist_partition" "SELECT worker_drop_distributed_table_metadata_only(logicalrelid::oid) FROM pg_dist_partition"
#define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \ #define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \
"SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition" "SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition"
#define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'" #define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'"