Test RedistributeTaskListResult

pull/3355/head
Hadi Moshayedi 2020-01-09 13:08:00 -08:00
parent 527d7d41c1
commit 40ba2cdd6e
4 changed files with 543 additions and 30 deletions

View File

@@ -18,15 +18,18 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "catalog/pg_type.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/connection_management.h"
#include "distributed/intermediate_results.h"
#include "distributed/multi_executor.h"
#include "distributed/remote_commands.h"
#include "distributed/tuplestore.h"
#include "distributed/listutils.h"
#include "tcop/tcopprot.h"
PG_FUNCTION_INFO_V1(partition_task_list_results);
PG_FUNCTION_INFO_V1(redistribute_task_list_results);
/*
* partition_task_list_results partitions results of each of distributed
@@ -89,3 +92,78 @@ partition_task_list_results(PG_FUNCTION_ARGS)
PG_RETURN_DATUM(0);
}
/*
 * redistribute_task_list_results exposes RedistributeTaskListResults for testing.
 * It executes a query, then repartitions its results and colocates them with the
 * shards of the given relation.
 */
Datum
redistribute_task_list_results(PG_FUNCTION_ARGS)
{
	text *resultIdPrefixText = PG_GETARG_TEXT_P(0);
	char *resultIdPrefix = text_to_cstring(resultIdPrefixText);
	text *queryText = PG_GETARG_TEXT_P(1);
	char *queryString = text_to_cstring(queryText);
	Oid relationId = PG_GETARG_OID(2);
	bool binaryFormat = PG_GETARG_BOOL(3);

	Query *parsedQuery = ParseQueryString(queryString, NULL, 0);
	PlannedStmt *queryPlan = pg_plan_query(parsedQuery,
										   CURSOR_OPT_PARALLEL_OK,
										   NULL);
	if (!IsCitusCustomScan(queryPlan->planTree))
	{
		ereport(ERROR, (errmsg("query must be distributed and shouldn't require "
							   "any merging on the coordinator.")));
	}

	CustomScan *customScan = (CustomScan *) queryPlan->planTree;
	DistributedPlan *distributedPlan = GetDistributedPlan(customScan);
	Job *job = distributedPlan->workerJob;
	List *taskList = job->taskList;

	DistTableCacheEntry *targetRelation = DistributedTableCacheEntry(relationId);
	List **shardResultIds = RedistributeTaskListResults(resultIdPrefix, taskList,
														targetRelation, binaryFormat);

	TupleDesc tupleDescriptor = NULL;
	Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
	int shardCount = targetRelation->shardIntervalArrayLength;

	for (int shardIndex = 0; shardIndex < shardCount; shardIndex++)
	{
		ShardInterval *shardInterval =
			targetRelation->sortedShardIntervalArray[shardIndex];
		uint64 shardId = shardInterval->shardId;

		int fragmentCount = list_length(shardResultIds[shardIndex]);
		Datum *resultIdValues = palloc0(fragmentCount * sizeof(Datum));
		List *sortedResultIds = SortList(shardResultIds[shardIndex], pg_qsort_strcmp);

		ListCell *resultIdCell = NULL;
		int resultIdIndex = 0;
		foreach(resultIdCell, sortedResultIds)
		{
			char *resultId = lfirst(resultIdCell);
			resultIdValues[resultIdIndex++] = CStringGetTextDatum(resultId);
		}

		ArrayType *resultIdArray = DatumArrayToArrayType(resultIdValues, fragmentCount,
														 TEXTOID);

		bool columnNulls[2] = { 0 };
		Datum columnValues[2] = {
			Int64GetDatum(shardId),
			PointerGetDatum(resultIdArray)
		};

		tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls);
	}

	tuplestore_donestoring(tupleStore);

	PG_RETURN_DATUM(0);
}
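
For orientation, the SQL-side wiring for this UDF (as set up by the regression tests later in this commit) looks like the sketch below; the example result IDs in the comment are taken from the case 1 expected output.

-- SQL wrapper over the C function above, mirroring the CREATE FUNCTION in the test files below
CREATE OR REPLACE FUNCTION pg_catalog.redistribute_task_list_results(resultIdPrefix text,
                                                                     query text,
                                                                     target_table regclass,
                                                                     binaryFormat bool DEFAULT true)
    RETURNS TABLE(shardid bigint, colocated_results text[])
    LANGUAGE C STRICT VOLATILE
    AS 'citus', $$redistribute_task_list_results$$;

-- returns one row per target shard, listing the intermediate result fragments
-- colocated with that shard, e.g.
--   4213584 | {test_from_4213581_to_0,test_from_4213582_to_0}
SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table');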

View File

@@ -76,6 +76,9 @@ s/_id_other_column_ref_fkey/_id_fkey/g
# intermediate_results
s/(ERROR.*)pgsql_job_cache\/([0-9]+_[0-9]+_[0-9]+)\/(.*).data/\1pgsql_job_cache\/xx_x_xxx\/\3.data/g
# toast tables
s/pg_toast_[0-9]+/pg_toast_xxxxx/g
# Plan numbers are not very stable, so we normalize those
# subplan numbers are quite stable so we keep those
s/DEBUG: Plan [0-9]+/DEBUG: Plan XXX/g
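# e.g. an (assumed) output line "DEBUG: Plan 17 query after replacing subqueries and CTEs: ..."
# is rewritten to "DEBUG: Plan XXX query after replacing subqueries and CTEs: ..."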

View File

@@ -2,12 +2,26 @@
CREATE SCHEMA distributed_intermediate_results;
SET search_path TO 'distributed_intermediate_results';
SET citus.next_shard_id TO 4213581;
SET citus.shard_replication_factor TO 1;
-- redistribute_task_list_results tests the internal RedistributeTaskListResults
CREATE OR REPLACE FUNCTION pg_catalog.redistribute_task_list_results(resultIdPrefix text,
                                                                     query text,
                                                                     target_table regclass,
                                                                     binaryFormat bool DEFAULT true)
    RETURNS TABLE(shardid bigint,
                  colocated_results text[])
    LANGUAGE C STRICT VOLATILE
    AS 'citus', $$redistribute_task_list_results$$;
--
-- We don't have extensive tests for partition_task_results, since it will be
-- tested by higher level "INSERT/SELECT with repartitioning" tests anyway.
-- We don't have extensive tests for partition_task_results or
-- redistribute_task_list_results, since they will be tested by higher level
-- "INSERT/SELECT with repartitioning" tests anyway.
--
--
-- partition_task_list_results, hash partitioning, binary format
-- Case 1.
-- hash partitioning, binary format
-- * partition_task_list_results
-- * redistribute_task_list_results
--
CREATE TABLE source_table(a int);
SET citus.shard_count TO 3;
@@ -26,6 +40,16 @@ SELECT create_distributed_table('target_table', 'a');
(1 row)
CREATE TABLE colocated_with_target(a int);
SELECT create_distributed_table('colocated_with_target', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- one value per shard, so we can route calls to read_intermediate_results
INSERT INTO colocated_with_target VALUES (1), (2);
-- partition_task_list_results
-- should error out
SELECT partition_task_list_results('test', $$ SELECT avg(a) FROM source_table $$, 'target_table');
ERROR: query must be distributed and shouldn't require any merging on the coordinator.
@@ -63,10 +87,44 @@ SELECT count(*), sum(x) FROM
100 | 5050
(1 row)
END;
DROP TABLE source_table, target_table, distributed_result_info;
ROLLBACK;
-- redistribute_task_list_results
-- Verify that redistribute_task_list_results colocates fragments properly by reading
-- the expected colocated results on the same node as each of the two target shards.
BEGIN;
CREATE TABLE distributed_result_info AS
SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table');
SELECT * FROM distributed_result_info ORDER BY shardid;
shardid | colocated_results
---------------------------------------------------------------------
4213584 | {test_from_4213581_to_0,test_from_4213582_to_0}
4213585 | {test_from_4213582_to_1,test_from_4213583_to_1}
(2 rows)
WITH shard_1 AS (
SELECT t.* FROM colocated_with_target, (
SELECT * FROM read_intermediate_results('{test_from_4213581_to_0,test_from_4213582_to_0}'::text[], 'binary') AS res (x int)) t
WHERE colocated_with_target.a = 1
), shard_2 AS (
SELECT t.* FROM colocated_with_target, (
SELECT * FROM read_intermediate_results('{test_from_4213582_to_1,test_from_4213583_to_1}'::text[], 'binary') AS res (x int)) t
WHERE colocated_with_target.a = 2
), all_rows AS (
(SELECT * FROM shard_1) UNION (SELECT * FROM shard_2)
)
SELECT count(*), sum(x) FROM all_rows;
count | sum
---------------------------------------------------------------------
100 | 5050
(1 row)
ROLLBACK;
DROP TABLE source_table, target_table, colocated_with_target;
--
-- partition_task_list_results, range partitioning, text format
-- Case 2.
-- range partitioning, text format
-- * partition_task_list_results
-- * redistribute_task_list_results
--
CREATE TABLE source_table(a int);
SELECT create_distributed_table('source_table', 'a', 'range');
@@ -89,6 +147,19 @@ SELECT create_distributed_table('target_table', 'a', 'range');
CALL public.create_range_partitioned_shards('target_table',
'{0,25,50,76}',
'{24,49,75,200}');
CREATE TABLE colocated_with_target(a int);
SELECT create_distributed_table('colocated_with_target', 'a', 'range');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CALL public.create_range_partitioned_shards('colocated_with_target',
'{0,25,50,76}',
'{24,49,75,200}');
-- one value per shard, so we can route calls to read_intermediate_results
INSERT INTO colocated_with_target VALUES (1), (26), (51), (77);
-- partition_task_list_results
BEGIN;
CREATE TABLE distributed_result_info AS
SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
@@ -98,22 +169,22 @@ CREATE TABLE distributed_result_info AS
SELECT * FROM distributed_result_info ORDER BY resultId;
resultid | nodeport | rowcount | targetshardid | targetshardindex
---------------------------------------------------------------------
test_from_4213586_to_0 | 57638 | 7 | 4213590 | 0
test_from_4213586_to_1 | 57638 | 6 | 4213591 | 1
test_from_4213586_to_2 | 57638 | 7 | 4213592 | 2
test_from_4213586_to_3 | 57638 | 4 | 4213593 | 3
test_from_4213587_to_0 | 57637 | 7 | 4213590 | 0
test_from_4213587_to_1 | 57637 | 6 | 4213591 | 1
test_from_4213587_to_2 | 57637 | 8 | 4213592 | 2
test_from_4213587_to_3 | 57637 | 4 | 4213593 | 3
test_from_4213588_to_0 | 57638 | 8 | 4213590 | 0
test_from_4213588_to_1 | 57638 | 6 | 4213591 | 1
test_from_4213588_to_2 | 57638 | 8 | 4213592 | 2
test_from_4213588_to_3 | 57638 | 4 | 4213593 | 3
test_from_4213589_to_0 | 57637 | 8 | 4213590 | 0
test_from_4213589_to_1 | 57637 | 6 | 4213591 | 1
test_from_4213589_to_2 | 57637 | 7 | 4213592 | 2
test_from_4213589_to_3 | 57637 | 4 | 4213593 | 3
test_from_4213588_to_0 | 57638 | 7 | 4213592 | 0
test_from_4213588_to_1 | 57638 | 6 | 4213593 | 1
test_from_4213588_to_2 | 57638 | 7 | 4213594 | 2
test_from_4213588_to_3 | 57638 | 4 | 4213595 | 3
test_from_4213589_to_0 | 57637 | 7 | 4213592 | 0
test_from_4213589_to_1 | 57637 | 6 | 4213593 | 1
test_from_4213589_to_2 | 57637 | 8 | 4213594 | 2
test_from_4213589_to_3 | 57637 | 4 | 4213595 | 3
test_from_4213590_to_0 | 57638 | 8 | 4213592 | 0
test_from_4213590_to_1 | 57638 | 6 | 4213593 | 1
test_from_4213590_to_2 | 57638 | 8 | 4213594 | 2
test_from_4213590_to_3 | 57638 | 4 | 4213595 | 3
test_from_4213591_to_0 | 57637 | 8 | 4213592 | 0
test_from_4213591_to_1 | 57637 | 6 | 4213593 | 1
test_from_4213591_to_2 | 57637 | 7 | 4213594 | 2
test_from_4213591_to_3 | 57637 | 4 | 4213595 | 3
(16 rows)
-- fetch from workers
@@ -135,10 +206,190 @@ SELECT count(*), sum(x) FROM
100 | 4550
(1 row)
ROLLBACK;
-- redistribute_task_list_results
-- Verify that redistribute_task_list_results colocates fragments properly by reading
-- the expected colocated results on the same node as each of the four target shards.
BEGIN;
CREATE TABLE distributed_result_info AS
SELECT * FROM redistribute_task_list_results('test', $$ SELECT (3 * a * a) % 100 FROM source_table $$, 'target_table');
SELECT * FROM distributed_result_info ORDER BY shardid;
shardid | colocated_results
---------------------------------------------------------------------
4213592 | {test_from_4213588_to_0,test_from_4213589_to_0,test_from_4213590_to_0,test_from_4213591_to_0}
4213593 | {test_from_4213588_to_1,test_from_4213589_to_1,test_from_4213590_to_1,test_from_4213591_to_1}
4213594 | {test_from_4213588_to_2,test_from_4213589_to_2,test_from_4213590_to_2,test_from_4213591_to_2}
4213595 | {test_from_4213588_to_3,test_from_4213589_to_3,test_from_4213590_to_3,test_from_4213591_to_3}
(4 rows)
WITH shard_1 AS (
SELECT t.* FROM colocated_with_target, (
SELECT * FROM read_intermediate_results('{test_from_4213588_to_0,test_from_4213589_to_0,test_from_4213590_to_0,test_from_4213591_to_0}'::text[], 'binary') AS res (x int)) t
WHERE colocated_with_target.a = 1
), shard_2 AS (
SELECT t.* FROM colocated_with_target, (
SELECT * FROM read_intermediate_results('{test_from_4213588_to_1,test_from_4213589_to_1,test_from_4213590_to_1,test_from_4213591_to_1}'::text[], 'binary') AS res (x int)) t
WHERE colocated_with_target.a = 26
), shard_3 AS (
SELECT t.* FROM colocated_with_target, (
SELECT * FROM read_intermediate_results('{test_from_4213588_to_2,test_from_4213589_to_2,test_from_4213590_to_2,test_from_4213591_to_2}'::text[], 'binary') AS res (x int)) t
WHERE colocated_with_target.a = 51
), shard_4 AS (
SELECT t.* FROM colocated_with_target, (
SELECT * FROM read_intermediate_results('{test_from_4213588_to_3,test_from_4213589_to_3,test_from_4213590_to_3,test_from_4213591_to_3}'::text[], 'binary') AS res (x int)) t
WHERE colocated_with_target.a = 77
), all_rows AS (
(SELECT * FROM shard_1) UNION ALL (SELECT * FROM shard_2) UNION ALL
(SELECT * FROM shard_3) UNION ALL (SELECT * FROM shard_4)
)
SELECT count(*), sum(x) FROM all_rows;
count | sum
---------------------------------------------------------------------
100 | 4550
(1 row)
ROLLBACK;
DROP TABLE source_table, target_table, colocated_with_target;
--
-- Case 3.
-- range partitioning, text format, replication factor 2 (both source and destination)
-- composite distribution column
--
-- only redistribute_task_list_results
--
CREATE TYPE composite_key_type AS (f1 int, f2 text);
SET citus.shard_replication_factor TO 2;
-- source
CREATE TABLE source_table(key composite_key_type, value int, mapped_key composite_key_type);
SELECT create_distributed_table('source_table', 'key', 'range');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CALL public.create_range_partitioned_shards('source_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}');
INSERT INTO source_table VALUES ((0, 'a'), 1, (0, 'a')); -- shard xxxxx -> shard xxxxx
INSERT INTO source_table VALUES ((1, 'b'), 2, (26, 'b')); -- shard xxxxx -> shard xxxxx
INSERT INTO source_table VALUES ((2, 'c'), 3, (3, 'c')); -- shard xxxxx -> shard xxxxx
INSERT INTO source_table VALUES ((4, 'd'), 4, (27, 'd')); -- shard xxxxx -> shard xxxxx
INSERT INTO source_table VALUES ((30, 'e'), 5, (30, 'e')); -- shard xxxxx -> shard xxxxx
INSERT INTO source_table VALUES ((31, 'f'), 6, (31, 'f')); -- shard xxxxx -> shard xxxxx
INSERT INTO source_table VALUES ((32, 'g'), 7, (8, 'g')); -- shard xxxxx -> shard xxxxx
-- target
CREATE TABLE target_table(key composite_key_type, value int);
SELECT create_distributed_table('target_table', 'key', 'range');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CALL public.create_range_partitioned_shards('target_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}');
-- colocated with target, used for routing calls to read_intermediate_results
CREATE TABLE colocated_with_target(key composite_key_type, value_sum int);
SELECT create_distributed_table('colocated_with_target', 'key', 'range');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CALL public.create_range_partitioned_shards('colocated_with_target', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}');
-- one value per shard, so we can route calls to read_intermediate_results
INSERT INTO colocated_with_target VALUES ((0,'a'), 0);
INSERT INTO colocated_with_target VALUES ((25, 'a'), 0);
BEGIN;
CREATE TABLE distributed_result_info AS
SELECT * FROM redistribute_task_list_results('test', $$ SELECT mapped_key, value FROM source_table $$, 'target_table');
SELECT * FROM distributed_result_info ORDER BY shardid;
shardid | colocated_results
---------------------------------------------------------------------
4213602 | {test_from_4213600_to_0,test_from_4213601_to_0}
4213603 | {test_from_4213600_to_1,test_from_4213601_to_1}
(2 rows)
UPDATE colocated_with_target SET value_sum=(SELECT sum(y) FROM read_intermediate_results('{test_from_4213600_to_0,test_from_4213601_to_0}'::text[], 'binary') AS res (x composite_key_type, y int))
WHERE key=(0,'a')::composite_key_type;
UPDATE colocated_with_target SET value_sum=(SELECT sum(y) FROM read_intermediate_results('{test_from_4213600_to_1,test_from_4213601_to_1}'::text[], 'binary') AS res (x composite_key_type, y int))
WHERE key=(25,'a')::composite_key_type;
SELECT * FROM colocated_with_target ORDER BY key;
key | value_sum
---------------------------------------------------------------------
(0,a) | 11
(25,a) | 17
(2 rows)
END;
DROP TABLE source_table, target_table, distributed_result_info;
-- verify that replicas of colocated_with_target are consistent (i.e. copies
-- of result files in both nodes were same when calling read_intermediate_results()
-- in the above UPDATE calls).
\c - - - :worker_1_port
SELECT * FROM distributed_intermediate_results.colocated_with_target_4213604 ORDER BY key;
key | value_sum
---------------------------------------------------------------------
(0,a) | 11
(1 row)
SELECT * FROM distributed_intermediate_results.colocated_with_target_4213605 ORDER BY key;
key | value_sum
---------------------------------------------------------------------
(25,a) | 17
(1 row)
\c - - - :worker_2_port
SELECT * FROM distributed_intermediate_results.colocated_with_target_4213604 ORDER BY key;
key | value_sum
---------------------------------------------------------------------
(0,a) | 11
(1 row)
SELECT * FROM distributed_intermediate_results.colocated_with_target_4213605 ORDER BY key;
key | value_sum
---------------------------------------------------------------------
(25,a) | 17
(1 row)
\c - - - :master_port
SET search_path TO 'distributed_intermediate_results';
DROP TABLE source_table, target_table, colocated_with_target, distributed_result_info;
DROP TYPE composite_key_type;
--
-- Case 4. target relation is a reference table or an append partitioned table
--
CREATE TABLE source_table(a int);
SELECT create_distributed_table('source_table', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO source_table SELECT * FROM generate_series(1, 100);
CREATE TABLE target_table_reference(a int);
SELECT create_reference_table('target_table_reference');
create_reference_table
---------------------------------------------------------------------
(1 row)
CREATE TABLE target_table_append(a int);
SELECT create_distributed_table('target_table_append', 'a', 'append');
create_distributed_table
---------------------------------------------------------------------
(1 row)
BEGIN;
CREATE TABLE distributed_result_info AS
SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table_reference');
ERROR: repartitioning results of a tasklist is only supported when target relation is hash or range partitioned.
ROLLBACK;
BEGIN;
CREATE TABLE distributed_result_info AS
SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table_append');
ERROR: repartitioning results of a tasklist is only supported when target relation is hash or range partitioned.
ROLLBACK;
-- clean-up
SET client_min_messages TO WARNING;
DROP SCHEMA distributed_intermediate_results CASCADE;
\set VERBOSITY default
SET client_min_messages TO DEFAULT;
SET citus.shard_count TO DEFAULT;
SET citus.shard_replication_factor TO DEFAULT;

View File

@@ -3,14 +3,29 @@ CREATE SCHEMA distributed_intermediate_results;
SET search_path TO 'distributed_intermediate_results';
SET citus.next_shard_id TO 4213581;
SET citus.shard_replication_factor TO 1;
-- redistribute_task_list_results tests the internal RedistributeTaskListResults
CREATE OR REPLACE FUNCTION pg_catalog.redistribute_task_list_results(resultIdPrefix text,
                                                                     query text,
                                                                     target_table regclass,
                                                                     binaryFormat bool DEFAULT true)
    RETURNS TABLE(shardid bigint,
                  colocated_results text[])
    LANGUAGE C STRICT VOLATILE
    AS 'citus', $$redistribute_task_list_results$$;
--
-- We don't have extensive tests for partition_task_results, since it will be
-- tested by higher level "INSERT/SELECT with repartitioning" tests anyway.
-- We don't have extensive tests for partition_task_results or
-- redistribute_task_list_results, since they will be tested by higher level
-- "INSERT/SELECT with repartitioning" tests anyway.
--
--
-- partition_task_list_results, hash partitioning, binary format
-- Case 1.
-- hash partitioning, binary format
-- * partition_task_list_results
-- * redistribute_task_list_results
--
CREATE TABLE source_table(a int);
@@ -22,6 +37,13 @@ CREATE TABLE target_table(a int);
SET citus.shard_count TO 2;
SELECT create_distributed_table('target_table', 'a');
CREATE TABLE colocated_with_target(a int);
SELECT create_distributed_table('colocated_with_target', 'a');
-- one value per shard, so we can route calls to read_intermediate_results
INSERT INTO colocated_with_target VALUES (1), (2);
-- partition_task_list_results
-- should error out
SELECT partition_task_list_results('test', $$ SELECT avg(a) FROM source_table $$, 'target_table');
SELECT partition_task_list_results('test', $$ SELECT * FROM generate_series(1, 2) $$, 'target_table');
@@ -39,12 +61,36 @@ SELECT nodeport, fetch_intermediate_results((array_agg(resultId)), 'localhost',
SELECT count(*), sum(x) FROM
read_intermediate_results((SELECT array_agg(resultId) FROM distributed_result_info),
'binary') AS res (x int);
END;
ROLLBACK;
DROP TABLE source_table, target_table, distributed_result_info;
-- redistribute_task_list_results
-- Verify that redistribute_task_list_results colocates fragments properly by reading
-- the expected colocated results on the same node as each of the two target shards.
BEGIN;
CREATE TABLE distributed_result_info AS
SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table');
SELECT * FROM distributed_result_info ORDER BY shardid;
WITH shard_1 AS (
SELECT t.* FROM colocated_with_target, (
SELECT * FROM read_intermediate_results('{test_from_4213581_to_0,test_from_4213582_to_0}'::text[], 'binary') AS res (x int)) t
WHERE colocated_with_target.a = 1
), shard_2 AS (
SELECT t.* FROM colocated_with_target, (
SELECT * FROM read_intermediate_results('{test_from_4213582_to_1,test_from_4213583_to_1}'::text[], 'binary') AS res (x int)) t
WHERE colocated_with_target.a = 2
), all_rows AS (
(SELECT * FROM shard_1) UNION (SELECT * FROM shard_2)
)
SELECT count(*), sum(x) FROM all_rows;
ROLLBACK;
DROP TABLE source_table, target_table, colocated_with_target;
--
-- partition_task_list_results, range partitioning, text format
-- Case 2.
-- range partitioning, text format
-- * partition_task_list_results
-- * redistribute_task_list_results
--
CREATE TABLE source_table(a int);
SELECT create_distributed_table('source_table', 'a', 'range');
@@ -58,7 +104,15 @@ SELECT create_distributed_table('target_table', 'a', 'range');
CALL public.create_range_partitioned_shards('target_table',
'{0,25,50,76}',
'{24,49,75,200}');
CREATE TABLE colocated_with_target(a int);
SELECT create_distributed_table('colocated_with_target', 'a', 'range');
CALL public.create_range_partitioned_shards('colocated_with_target',
'{0,25,50,76}',
'{24,49,75,200}');
-- one value per shard, so we can route calls to read_intermediate_results
INSERT INTO colocated_with_target VALUES (1), (26), (51), (77);
-- partition_task_list_results
BEGIN;
CREATE TABLE distributed_result_info AS
SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
@@ -74,9 +128,135 @@ SELECT nodeport, fetch_intermediate_results((array_agg(resultId)), 'localhost',
SELECT count(*), sum(x) FROM
read_intermediate_results((SELECT array_agg(resultId) FROM distributed_result_info),
'text') AS res (x int);
ROLLBACK;
-- redistribute_task_list_results
-- Verify that redistribute_task_list_results colocates fragments properly by reading
-- the expected colocated results on the same node as each of the four target shards.
BEGIN;
CREATE TABLE distributed_result_info AS
SELECT * FROM redistribute_task_list_results('test', $$ SELECT (3 * a * a) % 100 FROM source_table $$, 'target_table');
SELECT * FROM distributed_result_info ORDER BY shardid;
WITH shard_1 AS (
SELECT t.* FROM colocated_with_target, (
SELECT * FROM read_intermediate_results('{test_from_4213588_to_0,test_from_4213589_to_0,test_from_4213590_to_0,test_from_4213591_to_0}'::text[], 'binary') AS res (x int)) t
WHERE colocated_with_target.a = 1
), shard_2 AS (
SELECT t.* FROM colocated_with_target, (
SELECT * FROM read_intermediate_results('{test_from_4213588_to_1,test_from_4213589_to_1,test_from_4213590_to_1,test_from_4213591_to_1}'::text[], 'binary') AS res (x int)) t
WHERE colocated_with_target.a = 26
), shard_3 AS (
SELECT t.* FROM colocated_with_target, (
SELECT * FROM read_intermediate_results('{test_from_4213588_to_2,test_from_4213589_to_2,test_from_4213590_to_2,test_from_4213591_to_2}'::text[], 'binary') AS res (x int)) t
WHERE colocated_with_target.a = 51
), shard_4 AS (
SELECT t.* FROM colocated_with_target, (
SELECT * FROM read_intermediate_results('{test_from_4213588_to_3,test_from_4213589_to_3,test_from_4213590_to_3,test_from_4213591_to_3}'::text[], 'binary') AS res (x int)) t
WHERE colocated_with_target.a = 77
), all_rows AS (
(SELECT * FROM shard_1) UNION ALL (SELECT * FROM shard_2) UNION ALL
(SELECT * FROM shard_3) UNION ALL (SELECT * FROM shard_4)
)
SELECT count(*), sum(x) FROM all_rows;
ROLLBACK;
DROP TABLE source_table, target_table, colocated_with_target;
--
-- Case 3.
-- range partitioning, text format, replication factor 2 (both source and destination)
-- composite distribution column
--
-- only redistribute_task_list_results
--
CREATE TYPE composite_key_type AS (f1 int, f2 text);
SET citus.shard_replication_factor TO 2;
-- source
CREATE TABLE source_table(key composite_key_type, value int, mapped_key composite_key_type);
SELECT create_distributed_table('source_table', 'key', 'range');
CALL public.create_range_partitioned_shards('source_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}');
INSERT INTO source_table VALUES ((0, 'a'), 1, (0, 'a')); -- shard 1 -> shard 1
INSERT INTO source_table VALUES ((1, 'b'), 2, (26, 'b')); -- shard 1 -> shard 2
INSERT INTO source_table VALUES ((2, 'c'), 3, (3, 'c')); -- shard 1 -> shard 1
INSERT INTO source_table VALUES ((4, 'd'), 4, (27, 'd')); -- shard 1 -> shard 2
INSERT INTO source_table VALUES ((30, 'e'), 5, (30, 'e')); -- shard 2 -> shard 2
INSERT INTO source_table VALUES ((31, 'f'), 6, (31, 'f')); -- shard 2 -> shard 2
INSERT INTO source_table VALUES ((32, 'g'), 7, (8, 'g')); -- shard 2 -> shard 1
-- target
CREATE TABLE target_table(key composite_key_type, value int);
SELECT create_distributed_table('target_table', 'key', 'range');
CALL public.create_range_partitioned_shards('target_table', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}');
-- colocated with target, used for routing calls to read_intermediate_results
CREATE TABLE colocated_with_target(key composite_key_type, value_sum int);
SELECT create_distributed_table('colocated_with_target', 'key', 'range');
CALL public.create_range_partitioned_shards('colocated_with_target', '{"(0,a)","(25,a)"}','{"(24,z)","(49,z)"}');
-- one value per shard, so we can route calls to read_intermediate_results
INSERT INTO colocated_with_target VALUES ((0,'a'), 0);
INSERT INTO colocated_with_target VALUES ((25, 'a'), 0);
BEGIN;
CREATE TABLE distributed_result_info AS
SELECT * FROM redistribute_task_list_results('test', $$ SELECT mapped_key, value FROM source_table $$, 'target_table');
SELECT * FROM distributed_result_info ORDER BY shardid;
UPDATE colocated_with_target SET value_sum=(SELECT sum(y) FROM read_intermediate_results('{test_from_4213600_to_0,test_from_4213601_to_0}'::text[], 'binary') AS res (x composite_key_type, y int))
WHERE key=(0,'a')::composite_key_type;
UPDATE colocated_with_target SET value_sum=(SELECT sum(y) FROM read_intermediate_results('{test_from_4213600_to_1,test_from_4213601_to_1}'::text[], 'binary') AS res (x composite_key_type, y int))
WHERE key=(25,'a')::composite_key_type;
SELECT * FROM colocated_with_target ORDER BY key;
END;
DROP TABLE source_table, target_table, distributed_result_info;
-- verify that replicas of colocated_with_target are consistent (i.e. copies
-- of result files in both nodes were same when calling read_intermediate_results()
-- in the above UPDATE calls).
\c - - - :worker_1_port
SELECT * FROM distributed_intermediate_results.colocated_with_target_4213604 ORDER BY key;
SELECT * FROM distributed_intermediate_results.colocated_with_target_4213605 ORDER BY key;
\c - - - :worker_2_port
SELECT * FROM distributed_intermediate_results.colocated_with_target_4213604 ORDER BY key;
SELECT * FROM distributed_intermediate_results.colocated_with_target_4213605 ORDER BY key;
\c - - - :master_port
SET search_path TO 'distributed_intermediate_results';
DROP TABLE source_table, target_table, colocated_with_target, distributed_result_info;
DROP TYPE composite_key_type;
--
-- Case 4. target relation is a reference table or an append partitioned table
--
CREATE TABLE source_table(a int);
SELECT create_distributed_table('source_table', 'a');
INSERT INTO source_table SELECT * FROM generate_series(1, 100);
CREATE TABLE target_table_reference(a int);
SELECT create_reference_table('target_table_reference');
CREATE TABLE target_table_append(a int);
SELECT create_distributed_table('target_table_append', 'a', 'append');
BEGIN;
CREATE TABLE distributed_result_info AS
SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table_reference');
ROLLBACK;
BEGIN;
CREATE TABLE distributed_result_info AS
SELECT * FROM redistribute_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table_append');
ROLLBACK;
-- clean-up
SET client_min_messages TO WARNING;
DROP SCHEMA distributed_intermediate_results CASCADE;
@@ -84,3 +264,4 @@ DROP SCHEMA distributed_intermediate_results CASCADE;
\set VERBOSITY default
SET client_min_messages TO DEFAULT;
SET citus.shard_count TO DEFAULT;
SET citus.shard_replication_factor TO DEFAULT;