mirror of https://github.com/citusdata/citus.git
Preserve colocation with procedures in alter_distributed_table (#4743)
parent
8820541fd4
commit
5ebd4eac7f
|
@ -43,6 +43,7 @@
|
|||
#include "distributed/listutils.h"
|
||||
#include "distributed/local_executor.h"
|
||||
#include "distributed/metadata/dependency.h"
|
||||
#include "distributed/metadata/distobject.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
|
@ -719,6 +720,32 @@ ConvertTable(TableConversionState *con)
|
|||
CreateCitusTableLike(con);
|
||||
}
|
||||
|
||||
/* preserve colocation with procedures/functions */
|
||||
if (con->conversionType == ALTER_DISTRIBUTED_TABLE)
|
||||
{
|
||||
/*
|
||||
* Updating the colocationId of functions is always desirable for
|
||||
* the following scenario:
|
||||
* we have shardCount or colocateWith change
|
||||
* AND entire co-location group is altered
|
||||
* The reason for the second condition is because we currently don't
|
||||
* remember the original table specified in the colocateWith when
|
||||
* distributing the function. We only remember the colocationId in
|
||||
* pg_dist_object table.
|
||||
*/
|
||||
if ((!con->shardCountIsNull || con->colocateWith != NULL) &&
|
||||
(con->cascadeToColocated == CASCADE_TO_COLOCATED_YES || list_length(
|
||||
con->colocatedTableList) == 1) && con->distributionColumn == NULL)
|
||||
{
|
||||
/*
|
||||
* Update the colocationId from the one of the old relation to the one
|
||||
* of the new relation for all tuples in citus.pg_dist_object
|
||||
*/
|
||||
UpdateDistributedObjectColocationId(TableColocationId(con->relationId),
|
||||
TableColocationId(con->newRelationId));
|
||||
}
|
||||
}
|
||||
|
||||
ReplaceTable(con->relationId, con->newRelationId, justBeforeDropCommands,
|
||||
con->suppressNoticeMessages);
|
||||
|
||||
|
|
|
@ -373,3 +373,56 @@ GetDistributedObjectAddressList(void)
|
|||
|
||||
return objectAddressList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UpdateDistributedObjectColocationId gets an old and a new colocationId
|
||||
* and updates the colocationId of all tuples in citus.pg_dist_object which
|
||||
* have the old colocationId to the new colocationId.
|
||||
*/
|
||||
void
|
||||
UpdateDistributedObjectColocationId(uint32 oldColocationId,
|
||||
uint32 newColocationId)
|
||||
{
|
||||
const bool indexOK = false;
|
||||
ScanKeyData scanKey[1];
|
||||
Relation pgDistObjectRel = table_open(DistObjectRelationId(),
|
||||
RowExclusiveLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(pgDistObjectRel);
|
||||
|
||||
/* scan pg_dist_object for colocationId equal to old colocationId */
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_object_colocationid,
|
||||
BTEqualStrategyNumber,
|
||||
F_INT4EQ, UInt32GetDatum(oldColocationId));
|
||||
|
||||
SysScanDesc scanDescriptor = systable_beginscan(pgDistObjectRel,
|
||||
InvalidOid,
|
||||
indexOK,
|
||||
NULL, 1, scanKey);
|
||||
HeapTuple heapTuple;
|
||||
while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor)))
|
||||
{
|
||||
Datum values[Natts_pg_dist_object];
|
||||
bool isnull[Natts_pg_dist_object];
|
||||
bool replace[Natts_pg_dist_object];
|
||||
|
||||
memset(replace, 0, sizeof(replace));
|
||||
|
||||
replace[Anum_pg_dist_object_colocationid - 1] = true;
|
||||
|
||||
/* update the colocationId to the new one */
|
||||
values[Anum_pg_dist_object_colocationid - 1] = UInt32GetDatum(newColocationId);
|
||||
|
||||
isnull[Anum_pg_dist_object_colocationid - 1] = false;
|
||||
|
||||
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull,
|
||||
replace);
|
||||
|
||||
CatalogTupleUpdate(pgDistObjectRel, &heapTuple->t_self, heapTuple);
|
||||
CitusInvalidateRelcacheByRelid(DistObjectRelationId());
|
||||
}
|
||||
|
||||
systable_endscan(scanDescriptor);
|
||||
table_close(pgDistObjectRel, NoLock);
|
||||
CommandCounterIncrement();
|
||||
}
|
||||
|
|
|
@ -27,5 +27,6 @@ extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target,
|
|||
ObjectAddress *extensionAddress);
|
||||
|
||||
extern List * GetDistributedObjectAddressList(void);
|
||||
|
||||
extern void UpdateDistributedObjectColocationId(uint32 oldColocationId, uint32
|
||||
newColocationId);
|
||||
#endif /* CITUS_METADATA_DISTOBJECT_H */
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
CREATE SCHEMA mx_alter_distributed_table;
|
||||
SET search_path TO mx_alter_distributed_table;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1410000;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
-- test alter_distributed_table UDF
|
||||
CREATE TABLE adt_table (a INT, b INT);
|
||||
CREATE TABLE adt_col (a INT UNIQUE, b INT);
|
||||
|
@ -159,5 +161,317 @@ SELECT table_name, citus_table_type, distribution_column, shard_count FROM publi
|
|||
adt_table | distributed | a | 6
|
||||
(1 row)
|
||||
|
||||
-- test procedure colocation is preserved with alter_distributed_table
|
||||
CREATE TABLE test_proc_colocation_0 (a float8);
|
||||
SELECT create_distributed_table('test_proc_colocation_0', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE OR REPLACE procedure proc_0(dist_key float8)
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
res INT := 0;
|
||||
BEGIN
|
||||
INSERT INTO test_proc_colocation_0 VALUES (dist_key);
|
||||
SELECT count(*) INTO res FROM test_proc_colocation_0;
|
||||
RAISE NOTICE 'Res: %', res;
|
||||
COMMIT;
|
||||
END;$$;
|
||||
SELECT create_distributed_function('proc_0(float8)', 'dist_key', 'test_proc_colocation_0' );
|
||||
create_distributed_function
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0');
|
||||
logicalrelid | colocationid
|
||||
---------------------------------------------------------------------
|
||||
test_proc_colocation_0 | 1410002
|
||||
(1 row)
|
||||
|
||||
SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0');
|
||||
proname | colocationid
|
||||
---------------------------------------------------------------------
|
||||
proc_0 | 1410002
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CALL proc_0(1.0);
|
||||
DEBUG: pushing down the procedure
|
||||
NOTICE: Res: 1
|
||||
DETAIL: from localhost:xxxxx
|
||||
RESET client_min_messages;
|
||||
-- shardCount is not null && list_length(colocatedTableList) = 1
|
||||
SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 8);
|
||||
NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_0
|
||||
alter_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CALL proc_0(1.0);
|
||||
DEBUG: pushing down the procedure
|
||||
NOTICE: Res: 2
|
||||
DETAIL: from localhost:xxxxx
|
||||
RESET client_min_messages;
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0');
|
||||
logicalrelid | colocationid
|
||||
---------------------------------------------------------------------
|
||||
test_proc_colocation_0 | 1410003
|
||||
(1 row)
|
||||
|
||||
SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0');
|
||||
proname | colocationid
|
||||
---------------------------------------------------------------------
|
||||
proc_0 | 1410003
|
||||
(1 row)
|
||||
|
||||
-- colocatewith is not null && list_length(colocatedTableList) = 1
|
||||
SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 4);
|
||||
NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_0
|
||||
alter_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE test_proc_colocation_1 (a float8);
|
||||
SELECT create_distributed_table('test_proc_colocation_1', 'a', colocate_with := 'none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT alter_distributed_table('test_proc_colocation_0', colocate_with := 'test_proc_colocation_1');
|
||||
NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_0
|
||||
alter_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CALL proc_0(1.0);
|
||||
DEBUG: pushing down the procedure
|
||||
NOTICE: Res: 3
|
||||
DETAIL: from localhost:xxxxx
|
||||
RESET client_min_messages;
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0');
|
||||
logicalrelid | colocationid
|
||||
---------------------------------------------------------------------
|
||||
test_proc_colocation_0 | 1410004
|
||||
(1 row)
|
||||
|
||||
SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0');
|
||||
proname | colocationid
|
||||
---------------------------------------------------------------------
|
||||
proc_0 | 1410004
|
||||
(1 row)
|
||||
|
||||
-- shardCount is not null && cascade_to_colocated is true
|
||||
SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 8, cascade_to_colocated := true);
|
||||
NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_1
|
||||
NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_1
|
||||
NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_1
|
||||
NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_1
|
||||
alter_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CALL proc_0(1.0);
|
||||
DEBUG: pushing down the procedure
|
||||
NOTICE: Res: 4
|
||||
DETAIL: from localhost:xxxxx
|
||||
RESET client_min_messages;
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0');
|
||||
logicalrelid | colocationid
|
||||
---------------------------------------------------------------------
|
||||
test_proc_colocation_0 | 1410003
|
||||
(1 row)
|
||||
|
||||
SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0');
|
||||
proname | colocationid
|
||||
---------------------------------------------------------------------
|
||||
proc_0 | 1410003
|
||||
(1 row)
|
||||
|
||||
-- colocatewith is not null && cascade_to_colocated is true
|
||||
SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 4, cascade_to_colocated := true);
|
||||
NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_1
|
||||
NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_1
|
||||
NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_1
|
||||
NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_1
|
||||
alter_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE test_proc_colocation_2 (a float8);
|
||||
SELECT create_distributed_table('test_proc_colocation_2', 'a', colocate_with := 'none');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT alter_distributed_table('test_proc_colocation_0', colocate_with := 'test_proc_colocation_2', cascade_to_colocated := true);
|
||||
NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_1
|
||||
NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_1
|
||||
NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_1
|
||||
NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_1
|
||||
alter_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CALL proc_0(1.0);
|
||||
DEBUG: pushing down the procedure
|
||||
NOTICE: Res: 5
|
||||
DETAIL: from localhost:xxxxx
|
||||
RESET client_min_messages;
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0');
|
||||
logicalrelid | colocationid
|
||||
---------------------------------------------------------------------
|
||||
test_proc_colocation_0 | 1410005
|
||||
(1 row)
|
||||
|
||||
SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0');
|
||||
proname | colocationid
|
||||
---------------------------------------------------------------------
|
||||
proc_0 | 1410005
|
||||
(1 row)
|
||||
|
||||
-- try a case with more than one procedure
|
||||
CREATE OR REPLACE procedure proc_1(dist_key float8)
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
res INT := 0;
|
||||
BEGIN
|
||||
INSERT INTO test_proc_colocation_0 VALUES (dist_key);
|
||||
SELECT count(*) INTO res FROM test_proc_colocation_0;
|
||||
RAISE NOTICE 'Res: %', res;
|
||||
COMMIT;
|
||||
END;$$;
|
||||
SELECT create_distributed_function('proc_1(float8)', 'dist_key', 'test_proc_colocation_0' );
|
||||
create_distributed_function
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0');
|
||||
logicalrelid | colocationid
|
||||
---------------------------------------------------------------------
|
||||
test_proc_colocation_0 | 1410005
|
||||
(1 row)
|
||||
|
||||
SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0', 'proc_1') ORDER BY proname;
|
||||
proname | colocationid
|
||||
---------------------------------------------------------------------
|
||||
proc_0 | 1410005
|
||||
proc_1 | 1410005
|
||||
(2 rows)
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CALL proc_0(1.0);
|
||||
DEBUG: pushing down the procedure
|
||||
NOTICE: Res: 6
|
||||
DETAIL: from localhost:xxxxx
|
||||
CALL proc_1(2.0);
|
||||
DEBUG: pushing down the procedure
|
||||
NOTICE: Res: 7
|
||||
DETAIL: from localhost:xxxxx
|
||||
RESET client_min_messages;
|
||||
SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 8, cascade_to_colocated := true);
|
||||
NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_2
|
||||
NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_2
|
||||
NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_2
|
||||
NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_2
|
||||
NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_1
|
||||
NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_1
|
||||
NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_1
|
||||
NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_1
|
||||
alter_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CALL proc_0(1.0);
|
||||
DEBUG: pushing down the procedure
|
||||
NOTICE: Res: 8
|
||||
DETAIL: from localhost:xxxxx
|
||||
CALL proc_1(2.0);
|
||||
DEBUG: pushing down the procedure
|
||||
NOTICE: Res: 9
|
||||
DETAIL: from localhost:xxxxx
|
||||
RESET client_min_messages;
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0');
|
||||
logicalrelid | colocationid
|
||||
---------------------------------------------------------------------
|
||||
test_proc_colocation_0 | 1410003
|
||||
(1 row)
|
||||
|
||||
SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0', 'proc_1') ORDER BY proname;
|
||||
proname | colocationid
|
||||
---------------------------------------------------------------------
|
||||
proc_0 | 1410003
|
||||
proc_1 | 1410003
|
||||
(2 rows)
|
||||
|
||||
-- case which shouldn't preserve colocation for now
|
||||
-- shardCount is not null && cascade_to_colocated is false
|
||||
SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 18, cascade_to_colocated := false);
|
||||
NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_0
|
||||
NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_0
|
||||
alter_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0');
|
||||
logicalrelid | colocationid
|
||||
---------------------------------------------------------------------
|
||||
test_proc_colocation_0 | 1410006
|
||||
(1 row)
|
||||
|
||||
SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0', 'proc_1') ORDER BY proname;
|
||||
proname | colocationid
|
||||
---------------------------------------------------------------------
|
||||
proc_0 | 1410003
|
||||
proc_1 | 1410003
|
||||
(2 rows)
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA mx_alter_distributed_table CASCADE;
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
CREATE SCHEMA mx_alter_distributed_table;
|
||||
SET search_path TO mx_alter_distributed_table;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1410000;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
|
||||
-- test alter_distributed_table UDF
|
||||
CREATE TABLE adt_table (a INT, b INT);
|
||||
|
@ -48,5 +50,114 @@ END;
|
|||
|
||||
SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text = 'adt_table';
|
||||
|
||||
-- test procedure colocation is preserved with alter_distributed_table
|
||||
CREATE TABLE test_proc_colocation_0 (a float8);
|
||||
SELECT create_distributed_table('test_proc_colocation_0', 'a');
|
||||
|
||||
CREATE OR REPLACE procedure proc_0(dist_key float8)
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
res INT := 0;
|
||||
BEGIN
|
||||
INSERT INTO test_proc_colocation_0 VALUES (dist_key);
|
||||
SELECT count(*) INTO res FROM test_proc_colocation_0;
|
||||
RAISE NOTICE 'Res: %', res;
|
||||
COMMIT;
|
||||
END;$$;
|
||||
SELECT create_distributed_function('proc_0(float8)', 'dist_key', 'test_proc_colocation_0' );
|
||||
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0');
|
||||
SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0');
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CALL proc_0(1.0);
|
||||
RESET client_min_messages;
|
||||
|
||||
-- shardCount is not null && list_length(colocatedTableList) = 1
|
||||
SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 8);
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CALL proc_0(1.0);
|
||||
RESET client_min_messages;
|
||||
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0');
|
||||
SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0');
|
||||
|
||||
-- colocatewith is not null && list_length(colocatedTableList) = 1
|
||||
SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 4);
|
||||
CREATE TABLE test_proc_colocation_1 (a float8);
|
||||
SELECT create_distributed_table('test_proc_colocation_1', 'a', colocate_with := 'none');
|
||||
SELECT alter_distributed_table('test_proc_colocation_0', colocate_with := 'test_proc_colocation_1');
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CALL proc_0(1.0);
|
||||
RESET client_min_messages;
|
||||
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0');
|
||||
SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0');
|
||||
|
||||
-- shardCount is not null && cascade_to_colocated is true
|
||||
SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 8, cascade_to_colocated := true);
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CALL proc_0(1.0);
|
||||
RESET client_min_messages;
|
||||
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0');
|
||||
SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0');
|
||||
|
||||
-- colocatewith is not null && cascade_to_colocated is true
|
||||
SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 4, cascade_to_colocated := true);
|
||||
CREATE TABLE test_proc_colocation_2 (a float8);
|
||||
SELECT create_distributed_table('test_proc_colocation_2', 'a', colocate_with := 'none');
|
||||
SELECT alter_distributed_table('test_proc_colocation_0', colocate_with := 'test_proc_colocation_2', cascade_to_colocated := true);
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CALL proc_0(1.0);
|
||||
RESET client_min_messages;
|
||||
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0');
|
||||
SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0');
|
||||
|
||||
-- try a case with more than one procedure
|
||||
CREATE OR REPLACE procedure proc_1(dist_key float8)
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
DECLARE
|
||||
res INT := 0;
|
||||
BEGIN
|
||||
INSERT INTO test_proc_colocation_0 VALUES (dist_key);
|
||||
SELECT count(*) INTO res FROM test_proc_colocation_0;
|
||||
RAISE NOTICE 'Res: %', res;
|
||||
COMMIT;
|
||||
END;$$;
|
||||
SELECT create_distributed_function('proc_1(float8)', 'dist_key', 'test_proc_colocation_0' );
|
||||
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0');
|
||||
SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0', 'proc_1') ORDER BY proname;
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CALL proc_0(1.0);
|
||||
CALL proc_1(2.0);
|
||||
RESET client_min_messages;
|
||||
|
||||
SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 8, cascade_to_colocated := true);
|
||||
|
||||
SET client_min_messages TO DEBUG1;
|
||||
CALL proc_0(1.0);
|
||||
CALL proc_1(2.0);
|
||||
RESET client_min_messages;
|
||||
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0');
|
||||
SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0', 'proc_1') ORDER BY proname;
|
||||
|
||||
-- case which shouldn't preserve colocation for now
|
||||
-- shardCount is not null && cascade_to_colocated is false
|
||||
SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 18, cascade_to_colocated := false);
|
||||
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0');
|
||||
SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0', 'proc_1') ORDER BY proname;
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
DROP SCHEMA mx_alter_distributed_table CASCADE;
|
||||
|
|
Loading…
Reference in New Issue