Addressing Marco's PR comment

pull/6029/head
Nitish Upreti 2022-07-14 13:21:19 -07:00
parent 151b549a67
commit b1405d5eaf
4 changed files with 188 additions and 6 deletions

@@ -72,8 +72,7 @@ static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList,
                                          List *workersForPlacementList);
 static void TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
                                         List *workersForPlacementList);
-static Task * CreateTaskForDDLCommandList(uint64 jobId, List *ddlCommandList,
-                                          WorkerNode *workerNode);
+static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode);
 
 /* Customize error message strings based on operation type */
 static const char *const SplitOperationName[] =
@@ -517,10 +516,9 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
 
 /* Create a DDL task with corresponding task list on given worker node */
 static Task *
-CreateTaskForDDLCommandList(uint64 jobId, List *ddlCommandList, WorkerNode *workerNode)
+CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode)
 {
     Task *ddlTask = CitusMakeNode(Task);
-    ddlTask->jobId = jobId;
     ddlTask->taskType = DDL_TASK;
     ddlTask->replicationModel = REPLICATION_MODEL_INVALID;
     SetTaskQueryStringList(ddlTask, ddlCommandList);
@@ -572,8 +570,7 @@ CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList,
          */
         if (ddlCommandList != NULL)
         {
-            uint64 jobId = shardInterval->shardId;
-            Task *ddlTask = CreateTaskForDDLCommandList(jobId, ddlCommandList,
+            Task *ddlTask = CreateTaskForDDLCommandList(ddlCommandList,
                                                         workerPlacementNode);
             ddlTaskExecList = lappend(ddlTaskExecList, ddlTask);
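
Taken together, the hunks above leave the helper reading roughly as sketched below. This is a reconstruction from the visible diff context only; the tail of the function (attaching a task placement for workerNode and returning the task) is not part of the diff and is only indicated by a comment here.

/* Create a DDL task with corresponding task list on given worker node */
static Task *
CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode)
{
    Task *ddlTask = CitusMakeNode(Task);
    ddlTask->taskType = DDL_TASK;
    ddlTask->replicationModel = REPLICATION_MODEL_INVALID;
    SetTaskQueryStringList(ddlTask, ddlCommandList);

    /*
     * The placement setup for workerNode and the return of ddlTask fall
     * outside the visible hunks; only the lines above come from the diff.
     */
    return ddlTask;
}

At the call site, the caller now passes only the command list and the worker node; the jobId it previously derived from shardInterval->shardId is dropped along with the parameter.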

@@ -0,0 +1,104 @@
CREATE SCHEMA "citus_split_failure_test_schema";
SET search_path TO "citus_split_failure_test_schema";
SET citus.shard_count TO 1;
SET citus.next_shard_id TO 890000;
SET citus.shard_replication_factor TO 1;
-- BEGIN: Create table to split
CREATE TABLE sensors(
    measureid integer,
    eventdatetime date);
CREATE TABLE sensors_colocated(
    measureid integer,
    eventdatetime2 date);
SELECT create_distributed_table('sensors', 'measureid');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('sensors_colocated', 'measureid', colocate_with:='sensors');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- END: Create table to split
-- BEGIN : Switch to worker and create split shards already so workflow fails.
\c - - - :worker_1_port
SET search_path TO "citus_split_failure_test_schema";
CREATE TABLE sensors_8981001(
    measureid integer,
    eventdatetime date);
CREATE TABLE sensors_8981002(
    measureid integer,
    eventdatetime date);
CREATE TABLE sensors_colocated_8981003(
    measureid integer,
    eventdatetime date);
CREATE TABLE sensors_colocated_8981004(
    measureid integer,
    eventdatetime date);
-- A random table which should not be deleted.
CREATE TABLE sensors_nodelete(
    measureid integer,
    eventdatetime date);
-- List tables in worker.
SET search_path TO "citus_split_failure_test_schema";
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname
FROM pg_catalog.pg_class tbl
WHERE tbl.relname like 'sensors%'
ORDER BY 1;
relname
---------------------------------------------------------------------
sensors
sensors_890000
sensors_8981001
sensors_8981002
sensors_colocated
sensors_colocated_890001
sensors_colocated_8981003
sensors_colocated_8981004
sensors_nodelete
(9 rows)
-- END : Switch to worker and create split shards already so workflow fails.
-- BEGIN : Set node id variables
\c - postgres - :master_port
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
-- END : Set node id variables
-- BEGIN : Split Shard, which is expected to fail.
SET citus.next_shard_id TO 8981001;
SELECT pg_catalog.citus_split_shard_by_split_points(
890000,
ARRAY['-1073741824'],
ARRAY[:worker_1_node, :worker_1_node],
'block_writes');
ERROR: relation "sensors_8981001" already exists
CONTEXT: while executing command on localhost:xxxxx
-- END : Split Shard, which is expected to fail.
-- BEGIN : Ensure tables were cleaned from worker
\c - - - :worker_1_port
SET search_path TO "citus_split_failure_test_schema";
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname
FROM pg_catalog.pg_class tbl
WHERE tbl.relname like 'sensors%'
ORDER BY 1;
relname
---------------------------------------------------------------------
sensors
sensors_890000
sensors_colocated
sensors_colocated_890001
sensors_nodelete
(5 rows)
-- END : Ensure tables were cleaned from worker
--BEGIN : Cleanup
\c - postgres - :master_port
DROP SCHEMA "citus_split_failure_test_schema" CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table citus_split_failure_test_schema.sensors
drop cascades to table citus_split_failure_test_schema.sensors_colocated
--END : Cleanup

@@ -12,3 +12,4 @@ test: worker_split_binary_copy_test
 test: worker_split_text_copy_test
 test: citus_split_shard_by_split_points_negative
 test: citus_split_shard_by_split_points
+test: citus_split_shard_by_split_points_failure

@@ -0,0 +1,80 @@
CREATE SCHEMA "citus_split_failure_test_schema";
SET search_path TO "citus_split_failure_test_schema";
SET citus.shard_count TO 1;
SET citus.next_shard_id TO 890000;
SET citus.shard_replication_factor TO 1;
-- BEGIN: Create table to split
CREATE TABLE sensors(
    measureid integer,
    eventdatetime date);
CREATE TABLE sensors_colocated(
    measureid integer,
    eventdatetime2 date);
SELECT create_distributed_table('sensors', 'measureid');
SELECT create_distributed_table('sensors_colocated', 'measureid', colocate_with:='sensors');
-- END: Create table to split
-- BEGIN : Switch to worker and create split shards already so workflow fails.
\c - - - :worker_1_port
SET search_path TO "citus_split_failure_test_schema";
CREATE TABLE sensors_8981001(
    measureid integer,
    eventdatetime date);
CREATE TABLE sensors_8981002(
    measureid integer,
    eventdatetime date);
CREATE TABLE sensors_colocated_8981003(
    measureid integer,
    eventdatetime date);
CREATE TABLE sensors_colocated_8981004(
    measureid integer,
    eventdatetime date);
-- A random table which should not be deleted.
CREATE TABLE sensors_nodelete(
    measureid integer,
    eventdatetime date);
-- List tables in worker.
SET search_path TO "citus_split_failure_test_schema";
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname
FROM pg_catalog.pg_class tbl
WHERE tbl.relname like 'sensors%'
ORDER BY 1;
-- END : Switch to worker and create split shards already so workflow fails.
-- BEGIN : Set node id variables
\c - postgres - :master_port
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
-- END : Set node id variables
-- BEGIN : Split Shard, which is expected to fail.
SET citus.next_shard_id TO 8981001;
SELECT pg_catalog.citus_split_shard_by_split_points(
890000,
ARRAY['-1073741824'],
ARRAY[:worker_1_node, :worker_1_node],
'block_writes');
-- END : Split Shard, which is expected to fail.
-- BEGIN : Ensure tables were cleaned from worker
\c - - - :worker_1_port
SET search_path TO "citus_split_failure_test_schema";
SET citus.show_shards_for_app_name_prefixes = '*';
SELECT tbl.relname
FROM pg_catalog.pg_class tbl
WHERE tbl.relname like 'sensors%'
ORDER BY 1;
-- END : Ensure tables were cleaned from worker
--BEGIN : Cleanup
\c - postgres - :master_port
DROP SCHEMA "citus_split_failure_test_schema" CASCADE;
--END : Cleanup