diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index 0e4ab0251..f5bc4846f 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -43,6 +43,7 @@ #include "distributed/listutils.h" #include "distributed/local_executor.h" #include "distributed/metadata/dependency.h" +#include "distributed/metadata/distobject.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" #include "distributed/multi_executor.h" @@ -719,6 +720,32 @@ ConvertTable(TableConversionState *con) CreateCitusTableLike(con); } + /* preserve colocation with procedures/functions */ + if (con->conversionType == ALTER_DISTRIBUTED_TABLE) + { + /* + * Updating the colocationId of functions is always desirable for + * the following scenario: + * we have shardCount or colocateWith change + * AND entire co-location group is altered + * The reason for the second condition is because we currently don't + * remember the original table specified in the colocateWith when + * distributing the function. We only remember the colocationId in + * pg_dist_object table. + */ + if ((!con->shardCountIsNull || con->colocateWith != NULL) && + (con->cascadeToColocated == CASCADE_TO_COLOCATED_YES || list_length( + con->colocatedTableList) == 1) && con->distributionColumn == NULL) + { + /* + * Update the colocationId from the one of the old relation to the one + * of the new relation for all tuples in citus.pg_dist_object + */ + UpdateDistributedObjectColocationId(TableColocationId(con->relationId), + TableColocationId(con->newRelationId)); + } + } + ReplaceTable(con->relationId, con->newRelationId, justBeforeDropCommands, con->suppressNoticeMessages); diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c index cc08f3f9f..9f4c3ddc4 100644 --- a/src/backend/distributed/metadata/distobject.c +++ b/src/backend/distributed/metadata/distobject.c @@ -373,3 +373,56 @@ GetDistributedObjectAddressList(void) return objectAddressList; } + + +/* + * UpdateDistributedObjectColocationId gets an old and a new colocationId + * and updates the colocationId of all tuples in citus.pg_dist_object which + * have the old colocationId to the new colocationId. + */ +void +UpdateDistributedObjectColocationId(uint32 oldColocationId, + uint32 newColocationId) +{ + const bool indexOK = false; + ScanKeyData scanKey[1]; + Relation pgDistObjectRel = table_open(DistObjectRelationId(), + RowExclusiveLock); + TupleDesc tupleDescriptor = RelationGetDescr(pgDistObjectRel); + + /* scan pg_dist_object for colocationId equal to old colocationId */ + ScanKeyInit(&scanKey[0], Anum_pg_dist_object_colocationid, + BTEqualStrategyNumber, + F_INT4EQ, UInt32GetDatum(oldColocationId)); + + SysScanDesc scanDescriptor = systable_beginscan(pgDistObjectRel, + InvalidOid, + indexOK, + NULL, 1, scanKey); + HeapTuple heapTuple; + while (HeapTupleIsValid(heapTuple = systable_getnext(scanDescriptor))) + { + Datum values[Natts_pg_dist_object]; + bool isnull[Natts_pg_dist_object]; + bool replace[Natts_pg_dist_object]; + + memset(replace, 0, sizeof(replace)); + + replace[Anum_pg_dist_object_colocationid - 1] = true; + + /* update the colocationId to the new one */ + values[Anum_pg_dist_object_colocationid - 1] = UInt32GetDatum(newColocationId); + + isnull[Anum_pg_dist_object_colocationid - 1] = false; + + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, + replace); + + CatalogTupleUpdate(pgDistObjectRel, &heapTuple->t_self, heapTuple); + CitusInvalidateRelcacheByRelid(DistObjectRelationId()); + } + + systable_endscan(scanDescriptor); + table_close(pgDistObjectRel, NoLock); + CommandCounterIncrement(); +} diff --git a/src/include/distributed/metadata/distobject.h b/src/include/distributed/metadata/distobject.h index 6d857ef3f..6a61e3c67 100644 --- a/src/include/distributed/metadata/distobject.h +++ b/src/include/distributed/metadata/distobject.h @@ -27,5 +27,6 @@ extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target, ObjectAddress *extensionAddress); extern List * GetDistributedObjectAddressList(void); - +extern void UpdateDistributedObjectColocationId(uint32 oldColocationId, uint32 + newColocationId); #endif /* CITUS_METADATA_DISTOBJECT_H */ diff --git a/src/test/regress/expected/multi_mx_alter_distributed_table.out b/src/test/regress/expected/multi_mx_alter_distributed_table.out index 91e6c835f..43a3e00db 100644 --- a/src/test/regress/expected/multi_mx_alter_distributed_table.out +++ b/src/test/regress/expected/multi_mx_alter_distributed_table.out @@ -1,6 +1,8 @@ CREATE SCHEMA mx_alter_distributed_table; SET search_path TO mx_alter_distributed_table; SET citus.shard_replication_factor TO 1; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1410000; +SET citus.replication_model TO 'streaming'; -- test alter_distributed_table UDF CREATE TABLE adt_table (a INT, b INT); CREATE TABLE adt_col (a INT UNIQUE, b INT); @@ -159,5 +161,317 @@ SELECT table_name, citus_table_type, distribution_column, shard_count FROM publi adt_table | distributed | a | 6 (1 row) +-- test procedure colocation is preserved with alter_distributed_table +CREATE TABLE test_proc_colocation_0 (a float8); +SELECT create_distributed_table('test_proc_colocation_0', 'a'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE procedure proc_0(dist_key float8) +LANGUAGE plpgsql +AS $$ +DECLARE + res INT := 0; +BEGIN + INSERT INTO test_proc_colocation_0 VALUES (dist_key); + SELECT count(*) INTO res FROM test_proc_colocation_0; + RAISE NOTICE 'Res: %', res; + COMMIT; +END;$$; +SELECT create_distributed_function('proc_0(float8)', 'dist_key', 'test_proc_colocation_0' ); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); + logicalrelid | colocationid +--------------------------------------------------------------------- + test_proc_colocation_0 | 1410002 +(1 row) + +SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0'); + proname | colocationid +--------------------------------------------------------------------- + proc_0 | 1410002 +(1 row) + +SET client_min_messages TO DEBUG1; +CALL proc_0(1.0); +DEBUG: pushing down the procedure +NOTICE: Res: 1 +DETAIL: from localhost:xxxxx +RESET client_min_messages; +-- shardCount is not null && list_length(colocatedTableList) = 1 +SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 8); +NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_0 + alter_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO DEBUG1; +CALL proc_0(1.0); +DEBUG: pushing down the procedure +NOTICE: Res: 2 +DETAIL: from localhost:xxxxx +RESET client_min_messages; +SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); + logicalrelid | colocationid +--------------------------------------------------------------------- + test_proc_colocation_0 | 1410003 +(1 row) + +SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0'); + proname | colocationid +--------------------------------------------------------------------- + proc_0 | 1410003 +(1 row) + +-- colocatewith is not null && list_length(colocatedTableList) = 1 +SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 4); +NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_0 + alter_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE test_proc_colocation_1 (a float8); +SELECT create_distributed_table('test_proc_colocation_1', 'a', colocate_with := 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT alter_distributed_table('test_proc_colocation_0', colocate_with := 'test_proc_colocation_1'); +NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_0 + alter_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO DEBUG1; +CALL proc_0(1.0); +DEBUG: pushing down the procedure +NOTICE: Res: 3 +DETAIL: from localhost:xxxxx +RESET client_min_messages; +SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); + logicalrelid | colocationid +--------------------------------------------------------------------- + test_proc_colocation_0 | 1410004 +(1 row) + +SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0'); + proname | colocationid +--------------------------------------------------------------------- + proc_0 | 1410004 +(1 row) + +-- shardCount is not null && cascade_to_colocated is true +SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 8, cascade_to_colocated := true); +NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_1 +NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_1 +NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_1 +NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_1 + alter_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO DEBUG1; +CALL proc_0(1.0); +DEBUG: pushing down the procedure +NOTICE: Res: 4 +DETAIL: from localhost:xxxxx +RESET client_min_messages; +SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); + logicalrelid | colocationid +--------------------------------------------------------------------- + test_proc_colocation_0 | 1410003 +(1 row) + +SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0'); + proname | colocationid +--------------------------------------------------------------------- + proc_0 | 1410003 +(1 row) + +-- colocatewith is not null && cascade_to_colocated is true +SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 4, cascade_to_colocated := true); +NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_1 +NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_1 +NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_1 +NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_1 + alter_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE test_proc_colocation_2 (a float8); +SELECT create_distributed_table('test_proc_colocation_2', 'a', colocate_with := 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT alter_distributed_table('test_proc_colocation_0', colocate_with := 'test_proc_colocation_2', cascade_to_colocated := true); +NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_1 +NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_1 +NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_1 +NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_1 + alter_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO DEBUG1; +CALL proc_0(1.0); +DEBUG: pushing down the procedure +NOTICE: Res: 5 +DETAIL: from localhost:xxxxx +RESET client_min_messages; +SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); + logicalrelid | colocationid +--------------------------------------------------------------------- + test_proc_colocation_0 | 1410005 +(1 row) + +SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0'); + proname | colocationid +--------------------------------------------------------------------- + proc_0 | 1410005 +(1 row) + +-- try a case with more than one procedure +CREATE OR REPLACE procedure proc_1(dist_key float8) +LANGUAGE plpgsql +AS $$ +DECLARE + res INT := 0; +BEGIN + INSERT INTO test_proc_colocation_0 VALUES (dist_key); + SELECT count(*) INTO res FROM test_proc_colocation_0; + RAISE NOTICE 'Res: %', res; + COMMIT; +END;$$; +SELECT create_distributed_function('proc_1(float8)', 'dist_key', 'test_proc_colocation_0' ); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); + logicalrelid | colocationid +--------------------------------------------------------------------- + test_proc_colocation_0 | 1410005 +(1 row) + +SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0', 'proc_1') ORDER BY proname; + proname | colocationid +--------------------------------------------------------------------- + proc_0 | 1410005 + proc_1 | 1410005 +(2 rows) + +SET client_min_messages TO DEBUG1; +CALL proc_0(1.0); +DEBUG: pushing down the procedure +NOTICE: Res: 6 +DETAIL: from localhost:xxxxx +CALL proc_1(2.0); +DEBUG: pushing down the procedure +NOTICE: Res: 7 +DETAIL: from localhost:xxxxx +RESET client_min_messages; +SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 8, cascade_to_colocated := true); +NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_2 +NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_2 +NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_2 +NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_2 +NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_1 +NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_1 +NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_1 +NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_1 + alter_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO DEBUG1; +CALL proc_0(1.0); +DEBUG: pushing down the procedure +NOTICE: Res: 8 +DETAIL: from localhost:xxxxx +CALL proc_1(2.0); +DEBUG: pushing down the procedure +NOTICE: Res: 9 +DETAIL: from localhost:xxxxx +RESET client_min_messages; +SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); + logicalrelid | colocationid +--------------------------------------------------------------------- + test_proc_colocation_0 | 1410003 +(1 row) + +SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0', 'proc_1') ORDER BY proname; + proname | colocationid +--------------------------------------------------------------------- + proc_0 | 1410003 + proc_1 | 1410003 +(2 rows) + +-- case which shouldn't preserve colocation for now +-- shardCount is not null && cascade_to_colocated is false +SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 18, cascade_to_colocated := false); +NOTICE: creating a new table for mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Moving the data of mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Dropping the old mx_alter_distributed_table.test_proc_colocation_0 +NOTICE: Renaming the new table to mx_alter_distributed_table.test_proc_colocation_0 + alter_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); + logicalrelid | colocationid +--------------------------------------------------------------------- + test_proc_colocation_0 | 1410006 +(1 row) + +SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0', 'proc_1') ORDER BY proname; + proname | colocationid +--------------------------------------------------------------------- + proc_0 | 1410003 + proc_1 | 1410003 +(2 rows) + SET client_min_messages TO WARNING; DROP SCHEMA mx_alter_distributed_table CASCADE; diff --git a/src/test/regress/sql/multi_mx_alter_distributed_table.sql b/src/test/regress/sql/multi_mx_alter_distributed_table.sql index 572452496..976ffb64e 100644 --- a/src/test/regress/sql/multi_mx_alter_distributed_table.sql +++ b/src/test/regress/sql/multi_mx_alter_distributed_table.sql @@ -1,6 +1,8 @@ CREATE SCHEMA mx_alter_distributed_table; SET search_path TO mx_alter_distributed_table; SET citus.shard_replication_factor TO 1; +ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1410000; +SET citus.replication_model TO 'streaming'; -- test alter_distributed_table UDF CREATE TABLE adt_table (a INT, b INT); @@ -48,5 +50,114 @@ END; SELECT table_name, citus_table_type, distribution_column, shard_count FROM public.citus_tables WHERE table_name::text = 'adt_table'; +-- test procedure colocation is preserved with alter_distributed_table +CREATE TABLE test_proc_colocation_0 (a float8); +SELECT create_distributed_table('test_proc_colocation_0', 'a'); + +CREATE OR REPLACE procedure proc_0(dist_key float8) +LANGUAGE plpgsql +AS $$ +DECLARE + res INT := 0; +BEGIN + INSERT INTO test_proc_colocation_0 VALUES (dist_key); + SELECT count(*) INTO res FROM test_proc_colocation_0; + RAISE NOTICE 'Res: %', res; + COMMIT; +END;$$; +SELECT create_distributed_function('proc_0(float8)', 'dist_key', 'test_proc_colocation_0' ); + +SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); +SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0'); + +SET client_min_messages TO DEBUG1; +CALL proc_0(1.0); +RESET client_min_messages; + +-- shardCount is not null && list_length(colocatedTableList) = 1 +SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 8); + +SET client_min_messages TO DEBUG1; +CALL proc_0(1.0); +RESET client_min_messages; + +SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); +SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0'); + +-- colocatewith is not null && list_length(colocatedTableList) = 1 +SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 4); +CREATE TABLE test_proc_colocation_1 (a float8); +SELECT create_distributed_table('test_proc_colocation_1', 'a', colocate_with := 'none'); +SELECT alter_distributed_table('test_proc_colocation_0', colocate_with := 'test_proc_colocation_1'); + +SET client_min_messages TO DEBUG1; +CALL proc_0(1.0); +RESET client_min_messages; + +SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); +SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0'); + +-- shardCount is not null && cascade_to_colocated is true +SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 8, cascade_to_colocated := true); + +SET client_min_messages TO DEBUG1; +CALL proc_0(1.0); +RESET client_min_messages; + +SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); +SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0'); + +-- colocatewith is not null && cascade_to_colocated is true +SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 4, cascade_to_colocated := true); +CREATE TABLE test_proc_colocation_2 (a float8); +SELECT create_distributed_table('test_proc_colocation_2', 'a', colocate_with := 'none'); +SELECT alter_distributed_table('test_proc_colocation_0', colocate_with := 'test_proc_colocation_2', cascade_to_colocated := true); + +SET client_min_messages TO DEBUG1; +CALL proc_0(1.0); +RESET client_min_messages; + +SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); +SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0'); + +-- try a case with more than one procedure +CREATE OR REPLACE procedure proc_1(dist_key float8) +LANGUAGE plpgsql +AS $$ +DECLARE + res INT := 0; +BEGIN + INSERT INTO test_proc_colocation_0 VALUES (dist_key); + SELECT count(*) INTO res FROM test_proc_colocation_0; + RAISE NOTICE 'Res: %', res; + COMMIT; +END;$$; +SELECT create_distributed_function('proc_1(float8)', 'dist_key', 'test_proc_colocation_0' ); + +SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); +SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0', 'proc_1') ORDER BY proname; + +SET client_min_messages TO DEBUG1; +CALL proc_0(1.0); +CALL proc_1(2.0); +RESET client_min_messages; + +SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 8, cascade_to_colocated := true); + +SET client_min_messages TO DEBUG1; +CALL proc_0(1.0); +CALL proc_1(2.0); +RESET client_min_messages; + +SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); +SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0', 'proc_1') ORDER BY proname; + +-- case which shouldn't preserve colocation for now +-- shardCount is not null && cascade_to_colocated is false +SELECT alter_distributed_table('test_proc_colocation_0', shard_count:= 18, cascade_to_colocated := false); + +SELECT logicalrelid, colocationid FROM pg_dist_partition WHERE logicalrelid::regclass::text IN ('test_proc_colocation_0'); +SELECT proname, colocationid FROM pg_proc JOIN citus.pg_dist_object ON pg_proc.oid = citus.pg_dist_object.objid WHERE proname IN ('proc_0', 'proc_1') ORDER BY proname; + SET client_min_messages TO WARNING; DROP SCHEMA mx_alter_distributed_table CASCADE;