remove task tracker specific tests from worker_schedule

pull/3850/head
Sait Talha Nisanci 2020-05-19 16:15:52 +03:00
parent b0b3cb9e5b
commit 79dc807c32
12 changed files with 5 additions and 530 deletions

View File

@ -1,89 +0,0 @@
--
-- TASK_TRACKER_ASSIGN_TASK
--
\set JobId 401010
\set SimpleTaskId 101101
\set RecoverableTaskId 801102
\set SimpleTaskTable lineitem_simple_task
\set BadQueryString '\'SELECT COUNT(*) FROM bad_table_name\''
\set GoodQueryString '\'SELECT COUNT(*) FROM lineitem\''
\set SelectAll 'SELECT *'
-- We assign two tasks to the task tracker. The first task simply executes. The
-- recoverable task on the other hand repeatedly fails, and we sleep until the
-- task tracker stops retrying the recoverable task.
SELECT task_tracker_assign_task(:JobId, :SimpleTaskId,
'COPY (SELECT * FROM lineitem) TO '
'''base/pgsql_job_cache/job_401010/task_101101''');
task_tracker_assign_task
---------------------------------------------------------------------
(1 row)
SELECT task_tracker_assign_task(:JobId, :RecoverableTaskId, :BadQueryString);
task_tracker_assign_task
---------------------------------------------------------------------
(1 row)
-- After assigning the two tasks, we wait for them to make progress. Note that
-- these tasks get scheduled and run asynchronously, so if the sleep interval is
-- not enough, the regression tests may fail on an overloaded box.
SELECT pg_sleep(3.0);
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT task_tracker_task_status(:JobId, :SimpleTaskId);
task_tracker_task_status
---------------------------------------------------------------------
6
(1 row)
SELECT task_tracker_task_status(:JobId, :RecoverableTaskId);
task_tracker_task_status
---------------------------------------------------------------------
5
(1 row)
COPY :SimpleTaskTable FROM 'base/pgsql_job_cache/job_401010/task_101101';
SELECT COUNT(*) FROM :SimpleTaskTable;
count
---------------------------------------------------------------------
12000
(1 row)
SELECT COUNT(*) AS diff_lhs FROM ( :SelectAll FROM :SimpleTaskTable EXCEPT ALL
:SelectAll FROM lineitem ) diff;
diff_lhs
---------------------------------------------------------------------
0
(1 row)
SELECT COUNT(*) As diff_rhs FROM ( :SelectAll FROM lineitem EXCEPT ALL
:SelectAll FROM :SimpleTaskTable ) diff;
diff_rhs
---------------------------------------------------------------------
0
(1 row)
-- We now reassign the recoverable task with a good query string. This updates
-- the task's query string, and reschedules the updated task for execution.
SELECT task_tracker_assign_task(:JobId, :RecoverableTaskId, :GoodQueryString);
task_tracker_assign_task
---------------------------------------------------------------------
(1 row)
SELECT pg_sleep(2.0);
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT task_tracker_task_status(:JobId, :RecoverableTaskId);
task_tracker_task_status
---------------------------------------------------------------------
6
(1 row)

View File

@ -1,110 +0,0 @@
--
-- TASK_TRACKER_CLEANUP_JOB
--
SET citus.next_shard_id TO 1060000;
\set JobId 401010
\set CompletedTaskId 801107
\set RunningTaskId 801108
-- Test worker_cleanup_job_schema_cache
SELECT * FROM task_tracker_assign_task(2, 2, '');
task_tracker_assign_task
---------------------------------------------------------------------
(1 row)
SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = 'pg_merge_job_0002';
count
---------------------------------------------------------------------
1
(1 row)
SELECT worker_cleanup_job_schema_cache();
worker_cleanup_job_schema_cache
---------------------------------------------------------------------
(1 row)
SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = 'pg_merge_job_0002';
count
---------------------------------------------------------------------
0
(1 row)
-- We assign two tasks to the task tracker. The first task should complete and
-- the second task should continue to keep running.
SELECT task_tracker_assign_task(:JobId, :CompletedTaskId,
'COPY (SELECT * FROM lineitem) TO '
'''base/pgsql_job_cache/job_401010/task_801107''');
task_tracker_assign_task
---------------------------------------------------------------------
(1 row)
SELECT task_tracker_assign_task(:JobId, :RunningTaskId,
'SELECT pg_sleep(100)');
task_tracker_assign_task
---------------------------------------------------------------------
(1 row)
SELECT pg_sleep(2.0);
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT task_tracker_task_status(:JobId, :CompletedTaskId);
task_tracker_task_status
---------------------------------------------------------------------
6
(1 row)
SELECT task_tracker_task_status(:JobId, :RunningTaskId);
task_tracker_task_status
---------------------------------------------------------------------
3
(1 row)
SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010/task_801107');
isdir
---------------------------------------------------------------------
f
(1 row)
SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010');
isdir
---------------------------------------------------------------------
t
(1 row)
-- We now clean up all tasks for this job id. As a result, shared hash entries,
-- files, and connections associated with these tasks should all be cleaned up.
SELECT task_tracker_cleanup_job(:JobId);
task_tracker_cleanup_job
---------------------------------------------------------------------
(1 row)
SELECT pg_sleep(1.0);
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT task_tracker_task_status(:JobId, :CompletedTaskId);
ERROR: could not find the worker task
DETAIL: Task jobId: 401010 and taskId: 801107
SELECT task_tracker_task_status(:JobId, :RunningTaskId);
ERROR: could not find the worker task
DETAIL: Task jobId: 401010 and taskId: 801108
SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010/task_801107');
ERROR: could not stat file "base/pgsql_job_cache/job_401010/task_801107": No such file or directory
SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010');
ERROR: could not stat file "base/pgsql_job_cache/job_401010": No such file or directory
-- Also clean up worker_cleanup_job_schema_cache job
SELECT task_tracker_cleanup_job(2);
task_tracker_cleanup_job
---------------------------------------------------------------------
(1 row)

View File

@ -1,11 +0,0 @@
--
-- TASK_TRACKER_CREATE_TABLE
--
SET citus.next_shard_id TO 1070000;
-- New table definitions to test the task tracker process and protocol
CREATE TABLE lineitem_simple_task ( LIKE lineitem );
CREATE TABLE lineitem_compute_task ( LIKE lineitem );
CREATE TABLE lineitem_compute_update_task ( LIKE lineitem );
CREATE TABLE lineitem_partition_task_part_00 ( LIKE lineitem );
CREATE TABLE lineitem_partition_task_part_01 ( LIKE lineitem );
CREATE TABLE lineitem_partition_task_part_02 ( LIKE lineitem );

View File

@ -1,108 +0,0 @@
--
-- TASK_TRACKER_PARTITION_TASK
--
\set JobId 401010
\set PartitionTaskId 801106
\set PartitionColumn l_orderkey
\set SelectAll 'SELECT *'
\set TablePart00 lineitem_partition_task_part_00
\set TablePart01 lineitem_partition_task_part_01
\set TablePart02 lineitem_partition_task_part_02
SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
\set File_Basedir base/pgsql_job_cache
\set Table_File_00 :File_Basedir/job_:JobId/task_:PartitionTaskId/p_00000.:userid
\set Table_File_01 :File_Basedir/job_:JobId/task_:PartitionTaskId/p_00001.:userid
\set Table_File_02 :File_Basedir/job_:JobId/task_:PartitionTaskId/p_00002.:userid
-- We assign a partition task and wait for it to complete. Note that we hardcode
-- the partition function call string, including the job and task identifiers,
-- into the argument in the task assignment function. This hardcoding is
-- necessary as the current psql version does not perform variable interpolation
-- for names inside single quotes.
SELECT task_tracker_assign_task(:JobId, :PartitionTaskId,
'SELECT worker_range_partition_table('
'401010, 801106, ''SELECT * FROM lineitem'', '
'''l_orderkey'', 20, ARRAY[1000, 3000]::_int8)');
task_tracker_assign_task
---------------------------------------------------------------------
(1 row)
SELECT pg_sleep(4.0);
pg_sleep
---------------------------------------------------------------------
(1 row)
SELECT task_tracker_task_status(:JobId, :PartitionTaskId);
task_tracker_task_status
---------------------------------------------------------------------
6
(1 row)
COPY :TablePart00 FROM :'Table_File_00';
COPY :TablePart01 FROM :'Table_File_01';
COPY :TablePart02 FROM :'Table_File_02';
SELECT COUNT(*) FROM :TablePart00;
count
---------------------------------------------------------------------
1004
(1 row)
SELECT COUNT(*) FROM :TablePart02;
count
---------------------------------------------------------------------
8970
(1 row)
-- We first compute the difference of partition tables against the base table.
-- Then, we compute the difference of the base table against partitioned tables.
SELECT COUNT(*) AS diff_lhs_00 FROM (
:SelectAll FROM :TablePart00 EXCEPT ALL
:SelectAll FROM lineitem WHERE :PartitionColumn < 1000 ) diff;
diff_lhs_00
---------------------------------------------------------------------
0
(1 row)
SELECT COUNT(*) AS diff_lhs_01 FROM (
:SelectAll FROM :TablePart01 EXCEPT ALL
:SelectAll FROM lineitem WHERE :PartitionColumn >= 1000 AND
:PartitionColumn < 3000 ) diff;
diff_lhs_01
---------------------------------------------------------------------
0
(1 row)
SELECT COUNT(*) AS diff_lhs_02 FROM (
:SelectAll FROM :TablePart02 EXCEPT ALL
:SelectAll FROM lineitem WHERE :PartitionColumn >= 3000 ) diff;
diff_lhs_02
---------------------------------------------------------------------
0
(1 row)
SELECT COUNT(*) AS diff_rhs_00 FROM (
:SelectAll FROM lineitem WHERE :PartitionColumn < 1000 EXCEPT ALL
:SelectAll FROM :TablePart00 ) diff;
diff_rhs_00
---------------------------------------------------------------------
0
(1 row)
SELECT COUNT(*) AS diff_rhs_01 FROM (
:SelectAll FROM lineitem WHERE :PartitionColumn >= 1000 AND
:PartitionColumn < 3000 EXCEPT ALL
:SelectAll FROM :TablePart01 ) diff;
diff_rhs_01
---------------------------------------------------------------------
0
(1 row)
SELECT COUNT(*) AS diff_rhs_02 FROM (
:SelectAll FROM lineitem WHERE :PartitionColumn >= 3000 EXCEPT ALL
:SelectAll FROM :TablePart02 ) diff;
diff_rhs_02
---------------------------------------------------------------------
0
(1 row)

View File

@ -1,8 +0,0 @@
-- Clear job directory used by previous tests
\set JobId 201010
SELECT task_tracker_cleanup_job(:JobId);
task_tracker_cleanup_job
---------------------------------------------------------------------
(1 row)

View File

@ -29,8 +29,8 @@ SELECT create_distributed_table('lineitem_hash', 'l_orderkey', 'hash');
(1 row)
\copy lineitem_hash FROM '/home/talha/citus/src/test/regress/data/lineitem.1.data' with delimiter '|'
\copy lineitem_hash FROM '/home/talha/citus/src/test/regress/data/lineitem.2.data' with delimiter '|'
\copy lineitem_hash FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'
\copy lineitem_hash FROM '@abs_srcdir@/data/lineitem.2.data' with delimiter '|'
ANALYZE lineitem_hash;
SET citus.task_executor_type to "task-tracker";
-- count(distinct) is supported on top level query if there

View File

@ -1,51 +0,0 @@
--
-- TASK_TRACKER_ASSIGN_TASK
--
-- Assigns one COPY task and one deliberately failing task to the task tracker,
-- reads back both task statuses after a fixed sleep, compares the COPY output
-- with lineitem, and finally reassigns the failing task with a valid query.
-- The COPY file paths further down hardcode the same job id (401010) and
-- simple task id (101101) that are set here.
\set JobId 401010
\set SimpleTaskId 101101
\set RecoverableTaskId 801102
\set SimpleTaskTable lineitem_simple_task
-- The query strings carry their own escaped single quotes, so substituting
-- them below yields a complete SQL string literal.
\set BadQueryString '\'SELECT COUNT(*) FROM bad_table_name\''
\set GoodQueryString '\'SELECT COUNT(*) FROM lineitem\''
\set SelectAll 'SELECT *'
-- We assign two tasks to the task tracker. The first task simply executes. The
-- recoverable task on the other hand repeatedly fails, and we sleep until the
-- task tracker stops retrying the recoverable task.
SELECT task_tracker_assign_task(:JobId, :SimpleTaskId,
'COPY (SELECT * FROM lineitem) TO '
'''base/pgsql_job_cache/job_401010/task_101101''');
SELECT task_tracker_assign_task(:JobId, :RecoverableTaskId, :BadQueryString);
-- After assigning the two tasks, we wait for them to make progress. Note that
-- these tasks get scheduled and run asynchronously, so if the sleep interval is
-- not enough, the regression tests may fail on an overloaded box.
SELECT pg_sleep(3.0);
SELECT task_tracker_task_status(:JobId, :SimpleTaskId);
SELECT task_tracker_task_status(:JobId, :RecoverableTaskId);
-- Load the file written by the simple task and compare it with lineitem. The
-- EXCEPT ALL difference is taken in both directions: the two tables hold the
-- same multiset of rows exactly when both differences are empty.
COPY :SimpleTaskTable FROM 'base/pgsql_job_cache/job_401010/task_101101';
SELECT COUNT(*) FROM :SimpleTaskTable;
SELECT COUNT(*) AS diff_lhs FROM ( :SelectAll FROM :SimpleTaskTable EXCEPT ALL
:SelectAll FROM lineitem ) diff;
SELECT COUNT(*) As diff_rhs FROM ( :SelectAll FROM lineitem EXCEPT ALL
:SelectAll FROM :SimpleTaskTable ) diff;
-- We now reassign the recoverable task with a good query string. This updates
-- the task's query string, and reschedules the updated task for execution.
SELECT task_tracker_assign_task(:JobId, :RecoverableTaskId, :GoodQueryString);
-- As above, the status check relies on a fixed sleep being long enough.
SELECT pg_sleep(2.0);
SELECT task_tracker_task_status(:JobId, :RecoverableTaskId);

View File

@ -1,51 +0,0 @@
--
-- TASK_TRACKER_CLEANUP_JOB
--
-- Exercises worker_cleanup_job_schema_cache() and task_tracker_cleanup_job():
-- tasks are assigned, their status and on-disk files are inspected, the job is
-- cleaned up, and the same inspections are repeated afterwards.
SET citus.next_shard_id TO 1060000;
-- The file paths used below hardcode this job id (401010) and the completed
-- task id (801107).
\set JobId 401010
\set CompletedTaskId 801107
\set RunningTaskId 801108
-- Test worker_cleanup_job_schema_cache
-- The pg_merge_job_0002 schema is counted before and after the cleanup call so
-- that the two counts can be compared.
SELECT * FROM task_tracker_assign_task(2, 2, '');
SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = 'pg_merge_job_0002';
SELECT worker_cleanup_job_schema_cache();
SELECT count(*) FROM pg_catalog.pg_namespace WHERE nspname = 'pg_merge_job_0002';
-- We assign two tasks to the task tracker. The first task should complete and
-- the second task should continue to keep running.
SELECT task_tracker_assign_task(:JobId, :CompletedTaskId,
'COPY (SELECT * FROM lineitem) TO '
'''base/pgsql_job_cache/job_401010/task_801107''');
-- pg_sleep(100) outlasts the 2 second wait that precedes the status checks.
SELECT task_tracker_assign_task(:JobId, :RunningTaskId,
'SELECT pg_sleep(100)');
SELECT pg_sleep(2.0);
SELECT task_tracker_task_status(:JobId, :CompletedTaskId);
SELECT task_tracker_task_status(:JobId, :RunningTaskId);
-- Inspect the completed task's output file and the job directory holding it.
SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010/task_801107');
SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010');
-- We now clean up all tasks for this job id. As a result, shared hash entries,
-- files, and connections associated with these tasks should all be cleaned up.
SELECT task_tracker_cleanup_job(:JobId);
SELECT pg_sleep(1.0);
-- The status and file checks from before the cleanup are repeated unchanged.
SELECT task_tracker_task_status(:JobId, :CompletedTaskId);
SELECT task_tracker_task_status(:JobId, :RunningTaskId);
SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010/task_801107');
SELECT isdir FROM pg_stat_file('base/pgsql_job_cache/job_401010');
-- Also clean up worker_cleanup_job_schema_cache job
SELECT task_tracker_cleanup_job(2);

View File

@ -1,17 +0,0 @@
--
-- TASK_TRACKER_CREATE_TABLE
--
-- Creates the tables used by the task tracker tests. Each table takes its
-- column definitions from lineitem through LIKE.
SET citus.next_shard_id TO 1070000;
-- New table definitions to test the task tracker process and protocol
CREATE TABLE lineitem_simple_task ( LIKE lineitem );
CREATE TABLE lineitem_compute_task ( LIKE lineitem );
CREATE TABLE lineitem_compute_update_task ( LIKE lineitem );
-- One table per range partition file produced by the partition task test.
CREATE TABLE lineitem_partition_task_part_00 ( LIKE lineitem );
CREATE TABLE lineitem_partition_task_part_01 ( LIKE lineitem );
CREATE TABLE lineitem_partition_task_part_02 ( LIKE lineitem );

View File

@ -1,69 +0,0 @@
--
-- TASK_TRACKER_PARTITION_TASK
--
-- Assigns a task that range-partitions lineitem on l_orderkey, loads the three
-- partition files it writes into separate tables, and compares each table with
-- the matching l_orderkey range of lineitem.
\set JobId 401010
\set PartitionTaskId 801106
\set PartitionColumn l_orderkey
\set SelectAll 'SELECT *'
\set TablePart00 lineitem_partition_task_part_00
\set TablePart01 lineitem_partition_task_part_01
\set TablePart02 lineitem_partition_task_part_02
-- \gset stores the current user's usesysid in the psql variable :userid, which
-- is used as the suffix of the partition file names below.
SELECT usesysid AS userid FROM pg_user WHERE usename = current_user \gset
\set File_Basedir base/pgsql_job_cache
\set Table_File_00 :File_Basedir/job_:JobId/task_:PartitionTaskId/p_00000.:userid
\set Table_File_01 :File_Basedir/job_:JobId/task_:PartitionTaskId/p_00001.:userid
\set Table_File_02 :File_Basedir/job_:JobId/task_:PartitionTaskId/p_00002.:userid
-- We assign a partition task and wait for it to complete. Note that we hardcode
-- the partition function call string, including the job and task identifiers,
-- into the argument in the task assignment function. This hardcoding is
-- necessary as the current psql version does not perform variable interpolation
-- for names inside single quotes.
SELECT task_tracker_assign_task(:JobId, :PartitionTaskId,
'SELECT worker_range_partition_table('
'401010, 801106, ''SELECT * FROM lineitem'', '
'''l_orderkey'', 20, ARRAY[1000, 3000]::_int8)');
-- The task runs asynchronously; the status check relies on this fixed sleep.
SELECT pg_sleep(4.0);
SELECT task_tracker_task_status(:JobId, :PartitionTaskId);
-- :'Table_File_NN' substitutes the variable as a quoted SQL string literal.
COPY :TablePart00 FROM :'Table_File_00';
COPY :TablePart01 FROM :'Table_File_01';
COPY :TablePart02 FROM :'Table_File_02';
SELECT COUNT(*) FROM :TablePart00;
SELECT COUNT(*) FROM :TablePart02;
-- We first compute the difference of partition tables against the base table.
-- Then, we compute the difference of the base table against partitioned tables.
-- Part 00 is compared with l_orderkey < 1000, part 01 with [1000, 3000), and
-- part 02 with l_orderkey >= 3000, matching the ARRAY[1000, 3000] split points.
SELECT COUNT(*) AS diff_lhs_00 FROM (
:SelectAll FROM :TablePart00 EXCEPT ALL
:SelectAll FROM lineitem WHERE :PartitionColumn < 1000 ) diff;
SELECT COUNT(*) AS diff_lhs_01 FROM (
:SelectAll FROM :TablePart01 EXCEPT ALL
:SelectAll FROM lineitem WHERE :PartitionColumn >= 1000 AND
:PartitionColumn < 3000 ) diff;
SELECT COUNT(*) AS diff_lhs_02 FROM (
:SelectAll FROM :TablePart02 EXCEPT ALL
:SelectAll FROM lineitem WHERE :PartitionColumn >= 3000 ) diff;
SELECT COUNT(*) AS diff_rhs_00 FROM (
:SelectAll FROM lineitem WHERE :PartitionColumn < 1000 EXCEPT ALL
:SelectAll FROM :TablePart00 ) diff;
SELECT COUNT(*) AS diff_rhs_01 FROM (
:SelectAll FROM lineitem WHERE :PartitionColumn >= 1000 AND
:PartitionColumn < 3000 EXCEPT ALL
:SelectAll FROM :TablePart01 ) diff;
SELECT COUNT(*) AS diff_rhs_02 FROM (
:SelectAll FROM lineitem WHERE :PartitionColumn >= 3000 EXCEPT ALL
:SelectAll FROM :TablePart02 ) diff;

View File

@ -1,5 +0,0 @@
-- Clear job directory used by previous tests
-- NOTE(review): this script does not show which earlier tests create job
-- 201010 -- confirm the id still matches them.
\set JobId 201010
SELECT task_tracker_cleanup_job(:JobId);

View File

@ -19,16 +19,10 @@ test: worker_hash_partition worker_hash_partition_complex
test: worker_merge_range_files worker_merge_hash_files
test: worker_binary_data_partition worker_null_data_partition
test: worker_check_invalid_arguments
test: worker_remove_files
# ----------
# All task tracker tests use the following tables
# ----------
test: task_tracker_create_table
test: task_tracker_assign_task task_tracker_partition_task
test: task_tracker_cleanup_job
# ---------
# test that no tests leaked intermediate results. This should always be last
# ---------
test: ensure_no_intermediate_data_leak
# Some intermediate data will be left behind because we removed the task
# tracker tests that removed files; this shouldn't be a problem in this schedule.
# test: ensure_no_intermediate_data_leak