mirror of https://github.com/citusdata/citus.git
Validate relation name before logging it
parent
2c50101074
commit
789ff7b162
|
@ -79,6 +79,8 @@ static void CreateAndCopySplitShardsForShardGroup(WorkerNode *sourceShardNode,
|
||||||
List *sourceColocatedShardIntervalList,
|
List *sourceColocatedShardIntervalList,
|
||||||
List *shardGroupSplitIntervalListList,
|
List *shardGroupSplitIntervalListList,
|
||||||
List *workersForPlacementList);
|
List *workersForPlacementList);
|
||||||
|
static bool CheckIfRelationWithSameNameExists(ShardInterval *shardInterval,
|
||||||
|
WorkerNode *workerNode);
|
||||||
static void CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
|
static void CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
|
||||||
List *workersForPlacementList);
|
List *workersForPlacementList);
|
||||||
static void CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
|
static void CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
|
||||||
|
@ -548,6 +550,55 @@ BlockingShardSplit(SplitOperation splitOperation,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/* Check if a relation with given name already exists on the worker node */
|
||||||
|
static bool
|
||||||
|
CheckIfRelationWithSameNameExists(ShardInterval *shardInterval, WorkerNode *workerNode)
|
||||||
|
{
|
||||||
|
char *schemaName = get_namespace_name(
|
||||||
|
get_rel_namespace(shardInterval->relationId));
|
||||||
|
char *shardName = get_rel_name(shardInterval->relationId);
|
||||||
|
AppendShardIdToName(&shardName, shardInterval->shardId);
|
||||||
|
|
||||||
|
StringInfo checkShardExistsQuery = makeStringInfo();
|
||||||
|
appendStringInfo(checkShardExistsQuery,
|
||||||
|
"SELECT EXISTS (SELECT FROM pg_tables WHERE schemaname = '%s' AND tablename = '%s');",
|
||||||
|
schemaName,
|
||||||
|
shardName);
|
||||||
|
|
||||||
|
int connectionFlags = 0;
|
||||||
|
MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags,
|
||||||
|
workerNode->workerName,
|
||||||
|
workerNode->workerPort,
|
||||||
|
CitusExtensionOwnerName(),
|
||||||
|
get_database_name(
|
||||||
|
MyDatabaseId));
|
||||||
|
|
||||||
|
PGresult *result = NULL;
|
||||||
|
int queryResult = ExecuteOptionalRemoteCommand(connection,
|
||||||
|
checkShardExistsQuery->data, &result);
|
||||||
|
if (queryResult != RESPONSE_OKAY || !IsResponseOK(result) || PQntuples(result) != 1)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
|
||||||
|
errmsg(
|
||||||
|
"Cannot check if relation %s already exists for split on worker %s:%d",
|
||||||
|
ConstructQualifiedShardName(shardInterval),
|
||||||
|
connection->hostname,
|
||||||
|
connection->port)));
|
||||||
|
|
||||||
|
PQclear(result);
|
||||||
|
ForgetResults(connection);
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *checkExists = PQgetvalue(result, 0, 0);
|
||||||
|
PQclear(result);
|
||||||
|
ForgetResults(connection);
|
||||||
|
|
||||||
|
return strcmp(checkExists, "t") == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/* Create ShardGroup split children on a list of corresponding workers. */
|
/* Create ShardGroup split children on a list of corresponding workers. */
|
||||||
static void
|
static void
|
||||||
CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
|
CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
|
||||||
|
@ -578,7 +629,24 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
|
||||||
splitShardCreationCommandList,
|
splitShardCreationCommandList,
|
||||||
shardInterval->shardId);
|
shardInterval->shardId);
|
||||||
|
|
||||||
/* Log resource for cleanup in case of failure only. */
|
/* Log resource for cleanup in case of failure only.
|
||||||
|
* Before we log a record, do a best effort check to see if a shard with same name exists.
|
||||||
|
* This is because, it will cause shard creation to fail and we will end up cleaning the
|
||||||
|
* old shard. We don't want that.
|
||||||
|
*/
|
||||||
|
bool relationExists = CheckIfRelationWithSameNameExists(shardInterval,
|
||||||
|
workerPlacementNode);
|
||||||
|
|
||||||
|
if (relationExists)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE),
|
||||||
|
errmsg("Relation %s already exists on worker %s:%d.",
|
||||||
|
ConstructQualifiedShardName(shardInterval),
|
||||||
|
workerPlacementNode->workerName,
|
||||||
|
workerPlacementNode->workerPort)));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
CleanupPolicy policy = CLEANUP_ON_FAILURE;
|
CleanupPolicy policy = CLEANUP_ON_FAILURE;
|
||||||
InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
|
InsertCleanupRecordInSubtransaction(CLEANUP_SHARD_PLACEMENT,
|
||||||
ConstructQualifiedShardName(
|
ConstructQualifiedShardName(
|
||||||
|
@ -587,7 +655,9 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
|
||||||
policy);
|
policy);
|
||||||
|
|
||||||
/* Create new split child shard on the specified placement list */
|
/* Create new split child shard on the specified placement list */
|
||||||
CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
|
CreateObjectOnPlacement(splitShardCreationCommandList,
|
||||||
|
workerPlacementNode);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1459,6 +1529,24 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
|
||||||
splitShardCreationCommandList,
|
splitShardCreationCommandList,
|
||||||
shardInterval->shardId);
|
shardInterval->shardId);
|
||||||
|
|
||||||
|
/* Log resource for cleanup in case of failure only.
|
||||||
|
* Before we log a record, do a best effort check to see if a shard with same name exists.
|
||||||
|
* This is because, it will cause shard creation to fail and we will end up cleaning the
|
||||||
|
* old shard. We don't want that.
|
||||||
|
*/
|
||||||
|
bool relationExists = CheckIfRelationWithSameNameExists(shardInterval,
|
||||||
|
workerPlacementNode);
|
||||||
|
|
||||||
|
if (relationExists)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE),
|
||||||
|
errmsg("Relation %s already exists on worker %s:%d.",
|
||||||
|
ConstructQualifiedShardName(shardInterval),
|
||||||
|
workerPlacementNode->workerName,
|
||||||
|
workerPlacementNode->workerPort)));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
|
/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
|
||||||
* we want to cleanup irrespective of operation success or failure.
|
* we want to cleanup irrespective of operation success or failure.
|
||||||
*/
|
*/
|
||||||
|
@ -1470,7 +1558,8 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
|
||||||
policy);
|
policy);
|
||||||
|
|
||||||
/* Create dummy source shard on the specified placement list */
|
/* Create dummy source shard on the specified placement list */
|
||||||
CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
|
CreateObjectOnPlacement(splitShardCreationCommandList,
|
||||||
|
workerPlacementNode);
|
||||||
|
|
||||||
/* Add dummy source shard entry created for placement node in map */
|
/* Add dummy source shard entry created for placement node in map */
|
||||||
AddDummyShardEntryInMap(mapOfPlacementToDummyShardList,
|
AddDummyShardEntryInMap(mapOfPlacementToDummyShardList,
|
||||||
|
@ -1478,6 +1567,7 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
|
||||||
shardInterval);
|
shardInterval);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Statisfy Constraint 2: Create dummy target shards from shard group on source node.
|
* Statisfy Constraint 2: Create dummy target shards from shard group on source node.
|
||||||
|
@ -1504,6 +1594,24 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
|
||||||
splitShardCreationCommandList,
|
splitShardCreationCommandList,
|
||||||
shardInterval->shardId);
|
shardInterval->shardId);
|
||||||
|
|
||||||
|
/* Log resource for cleanup in case of failure only.
|
||||||
|
* Before we log a record, do a best effort check to see if a shard with same name exists.
|
||||||
|
* This is because, it will cause shard creation to fail and we will end up cleaning the
|
||||||
|
* old shard. We don't want that.
|
||||||
|
*/
|
||||||
|
bool relationExists = CheckIfRelationWithSameNameExists(shardInterval,
|
||||||
|
sourceWorkerNode);
|
||||||
|
|
||||||
|
if (relationExists)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errcode(ERRCODE_DUPLICATE_TABLE),
|
||||||
|
errmsg("Relation %s already exists on worker %s:%d.",
|
||||||
|
ConstructQualifiedShardName(shardInterval),
|
||||||
|
sourceWorkerNode->workerName,
|
||||||
|
sourceWorkerNode->workerPort)));
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
|
/* Log shard in pg_dist_cleanup. Given dummy shards are transient resources,
|
||||||
* we want to cleanup irrespective of operation success or failure.
|
* we want to cleanup irrespective of operation success or failure.
|
||||||
*/
|
*/
|
||||||
|
@ -1524,6 +1632,7 @@ CreateDummyShardsForShardGroup(HTAB *mapOfPlacementToDummyShardList,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -72,8 +72,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
|
||||||
ARRAY['-1073741824'],
|
ARRAY['-1073741824'],
|
||||||
ARRAY[:worker_1_node, :worker_1_node],
|
ARRAY[:worker_1_node, :worker_1_node],
|
||||||
'block_writes');
|
'block_writes');
|
||||||
ERROR: relation "sensors_8981002" already exists
|
ERROR: Relation citus_split_failure_test_schema.sensors_8981002 already exists on worker localhost:xxxxx.
|
||||||
CONTEXT: while executing command on localhost:xxxxx
|
|
||||||
-- BEGIN : Split Shard, which is expected to fail.
|
-- BEGIN : Split Shard, which is expected to fail.
|
||||||
-- BEGIN : Ensure tables were cleaned from worker
|
-- BEGIN : Ensure tables were cleaned from worker
|
||||||
\c - - - :worker_1_port
|
\c - - - :worker_1_port
|
||||||
|
|
Loading…
Reference in New Issue