From 35ec9721b42097d1632d3da8dd87e8c243467cd6 Mon Sep 17 00:00:00 2001
From: Onder Kalaci
Date: Tue, 22 Feb 2022 16:13:53 +0100
Subject: [PATCH] Add a new API for enabling Citus MX for clusters upgrading
 from earlier versions

Clusters created pre-Citus 11 mostly didn't have metadata sync enabled. For
those clusters, we add a utility UDF which fixes some minor issues and syncs
the necessary objects to the workers.
---
 src/backend/distributed/commands/extension.c  |  21 ++
 .../distributed/metadata/metadata_sync.c      |   3 +-
 .../distributed/sql/citus--10.2-4--11.0-1.sql |   3 +
 .../sql/downgrades/citus--11.0-1--10.2-4.sql  |   3 +-
 .../11.0-1.sql                                | 224 ++++++++++++++++++
 .../latest.sql                                | 224 ++++++++++++++++++
 src/include/distributed/metadata_sync.h       |   1 +
 .../after_citus_upgrade_coord_schedule        |   1 +
 .../before_citus_upgrade_coord_schedule      |   1 +
 src/test/regress/expected/multi_extension.out |   5 +-
 .../expected/upgrade_list_citus_objects.out   |   3 +-
 .../upgrade_pg_dist_object_test_after.out     |  42 +++-
 .../upgrade_pg_dist_object_test_after_1.out   |  54 +++++
 .../expected/upgrade_post_11_after.out        |  76 ++++++
 .../expected/upgrade_post_11_before.out       | 184 ++++++++++++++
 src/test/regress/sql/multi_extension.sql      |   2 +-
 .../regress/sql/upgrade_post_11_after.sql     |  42 ++++
 .../regress/sql/upgrade_post_11_before.sql    | 155 ++++++++++++
 18 files changed, 1029 insertions(+), 15 deletions(-)
 create mode 100644 src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/11.0-1.sql
 create mode 100644 src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/latest.sql
 create mode 100644 src/test/regress/expected/upgrade_pg_dist_object_test_after_1.out
 create mode 100644 src/test/regress/expected/upgrade_post_11_after.out
 create mode 100644 src/test/regress/expected/upgrade_post_11_before.out
 create mode 100644 src/test/regress/sql/upgrade_post_11_after.sql
 create mode 100644 src/test/regress/sql/upgrade_post_11_before.sql

diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c
index f585b6a67..3aa782c06 100644
--- a/src/backend/distributed/commands/extension.c
+++ b/src/backend/distributed/commands/extension.c
@@ -513,6 +513,16 @@ MarkExistingObjectDependenciesDistributedIfSupported()
 		ObjectAddress tableAddress = { 0 };
 		ObjectAddressSet(tableAddress, RelationRelationId, citusTableId);
 
+		if (ShouldSyncTableMetadata(citusTableId))
+		{
+			/* we need to pass a pointer allocated on the heap */
+			ObjectAddress *addressPointer = palloc0(sizeof(ObjectAddress));
+			*addressPointer = tableAddress;
+
+			/* as of Citus 11, tables that should be synced are also considered objects */
+			resultingObjectAddresses = lappend(resultingObjectAddresses, addressPointer);
+		}
+
 		List *distributableDependencyObjectAddresses =
 			GetDistributableDependenciesForObject(&tableAddress);
 
@@ -536,11 +546,22 @@ MarkExistingObjectDependenciesDistributedIfSupported()
 	/* remove duplicates from object addresses list for efficiency */
 	List *uniqueObjectAddresses = GetUniqueDependenciesList(resultingObjectAddresses);
 
+	/*
+	 * We should sync the new dependencies during ALTER EXTENSION because
+	 * we cannot know whether the nodes have already been upgraded or not. If
+	 * the nodes are not upgraded at this point, we cannot sync the objects.
+	 * Also, when the workers are upgraded, they'd get the same objects anyway.
+	 */
+	bool prevMetadataSyncValue = EnableMetadataSync;
+	SetLocalEnableMetadataSync(false);
+
 	ObjectAddress *objectAddress = NULL;
 	foreach_ptr(objectAddress, uniqueObjectAddresses)
 	{
 		MarkObjectDistributed(objectAddress);
 	}
+
+	SetLocalEnableMetadataSync(prevMetadataSyncValue);
 }
 
diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c
index 651c70d35..ef7e64ec1 100644
--- a/src/backend/distributed/metadata/metadata_sync.c
+++ b/src/backend/distributed/metadata/metadata_sync.c
@@ -107,7 +107,6 @@ static List * GetObjectsForGrantStmt(ObjectType objectType, Oid objectId);
 static AccessPriv * GetAccessPrivObjectForGrantStmt(char *permission);
 static List * GenerateGrantOnSchemaQueriesFromAclItem(Oid schemaOid, AclItem *aclItem);
-static void SetLocalEnableMetadataSync(bool state);
 static void SetLocalReplicateReferenceTablesOnActivate(bool state);
 static char * GenerateSetRoleQuery(Oid roleOid);
 static void MetadataSyncSigTermHandler(SIGNAL_ARGS);
@@ -1948,7 +1947,7 @@ GetAccessPrivObjectForGrantStmt(char *permission)
 /*
  * SetLocalEnableMetadataSync sets the enable_metadata_sync locally
  */
-static void
+void
 SetLocalEnableMetadataSync(bool state)
 {
 	set_config_option("citus.enable_metadata_sync", state == true ? "on" : "off",
diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql
index 78156d634..d836d4b72 100644
--- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql
+++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql
@@ -80,3 +80,6 @@ BEGIN
     UPDATE pg_dist_node_metadata SET metadata=jsonb_set(metadata, '{partitioned_citus_table_exists_pre_11}', to_jsonb(partitioned_table_exists), true);
 END;
 $$;
+
+#include "udfs/citus_finalize_upgrade_to_citus11/11.0-1.sql"
+
diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql
index 21f9107da..f11a9d450 100644
--- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql
+++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql
@@ -90,7 +90,6 @@ DROP FUNCTION pg_catalog.citus_shard_indexes_on_worker();
 #include "../udfs/create_distributed_function/9.0-1.sql"
 
 ALTER TABLE citus.pg_dist_object DROP COLUMN force_delegation;
-
 SET search_path = 'pg_catalog';
@@ -347,4 +346,6 @@ JOIN
 ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog;
 GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC;
 
+DROP FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool);
+
 RESET search_path;
diff --git a/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/11.0-1.sql
new file mode 100644
index 000000000..d93dd0f93
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/11.0-1.sql
@@ -0,0 +1,224 @@
+-- citus_finalize_upgrade_to_citus11() is a helper UDF that ensures
+-- the upgrade to Citus 11 is finished successfully. Upgrading to
+-- Citus 11 requires all active primary worker nodes to get the
+-- metadata. This function's job is to sync the metadata to the
+-- nodes that do not already have it. Once the function finishes
+-- without any errors and returns true, the cluster is ready for
+-- running distributed queries from the worker nodes. When debug is
+-- enabled, the function provides more information to the user.
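+--
+-- A minimal usage sketch (hypothetical session, run from the coordinator
+-- after all nodes have been upgraded to the Citus 11 packages):
+--
+--   SET client_min_messages TO DEBUG1;  -- optional, prints per-stage progress
+--   SELECT citus_finalize_upgrade_to_citus11();
+--
+-- or, skipping the cross-node version check (as the regression tests do):
+--
+--   SELECT citus_finalize_upgrade_to_citus11(enforce_version_check := false);
+--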
+CREATE OR REPLACE FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(enforce_version_check bool default true)
+  RETURNS bool
+  LANGUAGE plpgsql
+  AS $$
+BEGIN
+
+  ---------------------------------------------
+  -- This script consists of 9 stages (0 to 8)
+  -- Each stage is documented, and if the log level
+  -- is reduced to DEBUG1, each stage is logged
+  -- as well
+  ---------------------------------------------
+
+------------------------------------------------------------------------------------------
+  -- STAGE 0: Ensure no concurrent node metadata changing operation happens while this
+  -- script is running, by acquiring a strong lock on pg_dist_node
+------------------------------------------------------------------------------------------
+BEGIN
+  LOCK TABLE pg_dist_node IN EXCLUSIVE MODE NOWAIT;
+
+  EXCEPTION WHEN OTHERS THEN
+  RAISE 'Another node metadata changing operation is in progress, try again.';
+END;
+
+------------------------------------------------------------------------------------------
+  -- STAGE 1: We want all the commands to run in the same transaction block. Without
+  -- sequential mode, metadata syncing cannot be done in a transaction block along with
+  -- other commands
+------------------------------------------------------------------------------------------
+  SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+
+------------------------------------------------------------------------------------------
+  -- STAGE 2: Ensure we have the prerequisites
+  -- (a) only superuser can run this script
+  -- (b) cannot be executed when enable_ddl_propagation is False
+  -- (c) can only be executed from the coordinator
+------------------------------------------------------------------------------------------
+DECLARE
+  is_superuser_running boolean := False;
+  enable_ddl_prop boolean:= False;
+  local_group_id int := 0;
+BEGIN
+      SELECT rolsuper INTO is_superuser_running FROM pg_roles WHERE rolname = current_user;
+      IF is_superuser_running IS NOT True THEN
+            RAISE EXCEPTION 'This operation can only be initiated by a superuser';
+      END IF;
+
+      SELECT current_setting('citus.enable_ddl_propagation') INTO enable_ddl_prop;
+      IF enable_ddl_prop IS NOT True THEN
+            RAISE EXCEPTION 'This operation cannot be completed when citus.enable_ddl_propagation is False.';
+      END IF;
+
+      SELECT groupid INTO local_group_id FROM pg_dist_local_group;
+
+      IF local_group_id != 0 THEN
+            RAISE EXCEPTION 'Operation is not allowed on this node. Connect to the coordinator and run it again.';
+      ELSE
+            RAISE DEBUG 'We are on the coordinator, continue to sync metadata';
+      END IF;
+END;
+
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 3: Ensure all primary nodes are active
+  ------------------------------------------------------------------------------------------
+  DECLARE
+    primary_disabled_worker_node_count int := 0;
+  BEGIN
+        SELECT count(*) INTO primary_disabled_worker_node_count FROM pg_dist_node
+                WHERE groupid != 0 AND noderole = 'primary' AND NOT isactive;
+
+        IF primary_disabled_worker_node_count != 0 THEN
+              RAISE EXCEPTION 'There are inactive primary worker nodes, you need to activate the nodes first. '
+                              'Use SELECT citus_activate_node() to activate the disabled nodes';
+        ELSE
+              RAISE DEBUG 'There are no disabled worker nodes, continue to sync metadata';
+        END IF;
+  END;
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 4: Ensure there are no connectivity issues in the cluster
+  ------------------------------------------------------------------------------------------
+  DECLARE
+    all_nodes_can_connect_to_each_other boolean := False;
+  BEGIN
+       SELECT bool_and(coalesce(result, false)) INTO all_nodes_can_connect_to_each_other FROM citus_check_cluster_node_health();
+
+       IF all_nodes_can_connect_to_each_other != True THEN
+             RAISE EXCEPTION 'There are unhealthy primary nodes, you need to ensure all '
+                             'nodes are up and running. Also, make sure that all nodes can connect '
+                             'to each other. Use SELECT * FROM citus_check_cluster_node_health(); '
+                             'to check the cluster health';
+       ELSE
+             RAISE DEBUG 'Cluster is healthy, all nodes can connect to each other';
+       END IF;
+  END;
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 5: Ensure all nodes are on the same version
+  ------------------------------------------------------------------------------------------
+  DECLARE
+    coordinator_version text := '';
+    worker_node_version text := '';
+    worker_node_version_count int := 0;
+
+  BEGIN
+       SELECT extversion INTO coordinator_version from pg_extension WHERE extname = 'citus';
+
+       -- first, check that all workers have the same Citus version among themselves
+       SELECT
+            count(*) INTO worker_node_version_count
+       FROM
+            run_command_on_workers('SELECT extversion from pg_extension WHERE extname = ''citus'';')
+       GROUP BY result;
+       IF enforce_version_check AND worker_node_version_count != 1 THEN
+             RAISE EXCEPTION 'All nodes should have the same Citus version installed. Currently '
+                             'some of the workers have different versions';
+       ELSE
+             RAISE DEBUG 'All worker nodes have the same Citus version';
+       END IF;
+
+       -- second, check that the workers have the same Citus version as the coordinator
+       SELECT
+            result INTO worker_node_version
+       FROM
+            run_command_on_workers('SELECT extversion from pg_extension WHERE extname = ''citus'';')
+       GROUP BY result;
+
+       IF enforce_version_check AND coordinator_version != worker_node_version THEN
+             RAISE EXCEPTION 'All nodes should have the same Citus version installed. Currently '
+                             'the coordinator has version % and the worker(s) have %',
+                             coordinator_version, worker_node_version;
+       ELSE
+             RAISE DEBUG 'All nodes have the same Citus version';
+       END IF;
+  END;
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 6: Ensure all the partitioned tables have the proper naming structure
+  -- As described on https://github.com/citusdata/citus/issues/4962
+  -- existing indexes on partitioned distributed tables can collide
+  -- with the index names that exist on the shards
+  -- luckily, we know how to fix it.
+  -- And, note that we should do this even if the cluster is a basic plan
+  -- (e.g., single node Citus) such that when the cluster is scaled out, everything
+  -- works as intended
+  -- And, this should be done only ONCE for a cluster as it can be a pretty
+  -- time-consuming operation. Thus, even if the function is called multiple times,
+  -- we keep track of it and do not re-execute this part if not needed.
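+  -- For illustration, the flag consulted below can also be inspected by hand
+  -- (a hypothetical ad-hoc query, not something this script requires):
+  --
+  --   SELECT metadata->>'partitioned_citus_table_exists_pre_11'
+  --   FROM pg_dist_node_metadata;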
+  ------------------------------------------------------------------------------------------
+  DECLARE
+    partitioned_table_exists_pre_11 boolean:=False;
+  BEGIN
+
+    -- we recorded whether partitioned tables existed during the upgrade to Citus 11
+    SELECT metadata->>'partitioned_citus_table_exists_pre_11' INTO partitioned_table_exists_pre_11
+    FROM pg_dist_node_metadata;
+
+    IF partitioned_table_exists_pre_11 IS NOT NULL AND partitioned_table_exists_pre_11 THEN
+
+      -- this might take long depending on the number of partitions and shards...
+      RAISE NOTICE 'Preparing all the existing partitioned table indexes';
+      PERFORM pg_catalog.fix_all_partition_shard_index_names();
+
+      -- great, we are done with fixing the existing wrong index names
+      -- so, let's remove this flag
+      UPDATE pg_dist_node_metadata
+      SET metadata=jsonb_delete(metadata, 'partitioned_citus_table_exists_pre_11');
+    ELSE
+      RAISE DEBUG 'There are no partitioned tables that should be fixed';
+    END IF;
+  END;
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 7: Return early if there are no primary worker nodes
+  -- We don't strictly need this step, but it gives a nicer notice message
+  ------------------------------------------------------------------------------------------
+  DECLARE
+    primary_worker_node_count bigint :=0;
+  BEGIN
+        SELECT count(*) INTO primary_worker_node_count FROM pg_dist_node WHERE groupid != 0 AND noderole = 'primary';
+
+        IF primary_worker_node_count = 0 THEN
+              RAISE NOTICE 'There are no primary worker nodes, no need to sync metadata to any node';
+              RETURN true;
+        ELSE
+              RAISE DEBUG 'There are % primary worker nodes, continue to sync metadata', primary_worker_node_count;
+        END IF;
+  END;
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 8: Do the actual metadata & object syncing to the worker nodes
+  -- For the "already synced" metadata nodes, we do not strictly need to
+  -- sync the objects & metadata, but there is no harm in doing it anyway;
+  -- it'll only cost some execution time but makes sure that we have
+  -- consistent metadata & objects across all the nodes
+  ------------------------------------------------------------------------------------------
+  DECLARE
+  BEGIN
+
+    -- this might take long depending on the number of tables & objects ...
+    RAISE NOTICE 'Preparing to sync the metadata to all nodes';
+
+    PERFORM start_metadata_sync_to_node(nodename,nodeport)
+    FROM
+      pg_dist_node WHERE groupid != 0 AND noderole = 'primary';
+  END;
+
+  RETURN true;
+END;
+$$;
+COMMENT ON FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool)
+  IS 'finalizes upgrade to Citus';
+
+REVOKE ALL ON FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool) FROM PUBLIC;
diff --git a/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/latest.sql b/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/latest.sql
new file mode 100644
index 000000000..d93dd0f93
--- /dev/null
+++ b/src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/latest.sql
@@ -0,0 +1,224 @@
+-- citus_finalize_upgrade_to_citus11() is a helper UDF that ensures
+-- the upgrade to Citus 11 is finished successfully. Upgrading to
+-- Citus 11 requires all active primary worker nodes to get the
+-- metadata. This function's job is to sync the metadata to the
+-- nodes that do not already have it. Once the function finishes
+-- without any errors and returns true, the cluster is ready for
+-- running distributed queries from the worker nodes. When debug is
+-- enabled, the function provides more information to the user.
+CREATE OR REPLACE FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(enforce_version_check bool default true)
+  RETURNS bool
+  LANGUAGE plpgsql
+  AS $$
+BEGIN
+
+  ---------------------------------------------
+  -- This script consists of 9 stages (0 to 8)
+  -- Each stage is documented, and if the log level
+  -- is reduced to DEBUG1, each stage is logged
+  -- as well
+  ---------------------------------------------
+
+------------------------------------------------------------------------------------------
+  -- STAGE 0: Ensure no concurrent node metadata changing operation happens while this
+  -- script is running, by acquiring a strong lock on pg_dist_node
+------------------------------------------------------------------------------------------
+BEGIN
+  LOCK TABLE pg_dist_node IN EXCLUSIVE MODE NOWAIT;
+
+  EXCEPTION WHEN OTHERS THEN
+  RAISE 'Another node metadata changing operation is in progress, try again.';
+END;
+
+------------------------------------------------------------------------------------------
+  -- STAGE 1: We want all the commands to run in the same transaction block. Without
+  -- sequential mode, metadata syncing cannot be done in a transaction block along with
+  -- other commands
+------------------------------------------------------------------------------------------
+  SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
+
+------------------------------------------------------------------------------------------
+  -- STAGE 2: Ensure we have the prerequisites
+  -- (a) only superuser can run this script
+  -- (b) cannot be executed when enable_ddl_propagation is False
+  -- (c) can only be executed from the coordinator
+------------------------------------------------------------------------------------------
+DECLARE
+  is_superuser_running boolean := False;
+  enable_ddl_prop boolean:= False;
+  local_group_id int := 0;
+BEGIN
+      SELECT rolsuper INTO is_superuser_running FROM pg_roles WHERE rolname = current_user;
+      IF is_superuser_running IS NOT True THEN
+            RAISE EXCEPTION 'This operation can only be initiated by a superuser';
+      END IF;
+
+      SELECT current_setting('citus.enable_ddl_propagation') INTO enable_ddl_prop;
+      IF enable_ddl_prop IS NOT True THEN
+            RAISE EXCEPTION 'This operation cannot be completed when citus.enable_ddl_propagation is False.';
+      END IF;
+
+      SELECT groupid INTO local_group_id FROM pg_dist_local_group;
+
+      IF local_group_id != 0 THEN
+            RAISE EXCEPTION 'Operation is not allowed on this node. Connect to the coordinator and run it again.';
+      ELSE
+            RAISE DEBUG 'We are on the coordinator, continue to sync metadata';
+      END IF;
+END;
+
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 3: Ensure all primary nodes are active
+  ------------------------------------------------------------------------------------------
+  DECLARE
+    primary_disabled_worker_node_count int := 0;
+  BEGIN
+        SELECT count(*) INTO primary_disabled_worker_node_count FROM pg_dist_node
+                WHERE groupid != 0 AND noderole = 'primary' AND NOT isactive;
+
+        IF primary_disabled_worker_node_count != 0 THEN
+              RAISE EXCEPTION 'There are inactive primary worker nodes, you need to activate the nodes first. '
+                              'Use SELECT citus_activate_node() to activate the disabled nodes';
+        ELSE
+              RAISE DEBUG 'There are no disabled worker nodes, continue to sync metadata';
+        END IF;
+  END;
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 4: Ensure there are no connectivity issues in the cluster
+  ------------------------------------------------------------------------------------------
+  DECLARE
+    all_nodes_can_connect_to_each_other boolean := False;
+  BEGIN
+       SELECT bool_and(coalesce(result, false)) INTO all_nodes_can_connect_to_each_other FROM citus_check_cluster_node_health();
+
+       IF all_nodes_can_connect_to_each_other != True THEN
+             RAISE EXCEPTION 'There are unhealthy primary nodes, you need to ensure all '
+                             'nodes are up and running. Also, make sure that all nodes can connect '
+                             'to each other. Use SELECT * FROM citus_check_cluster_node_health(); '
+                             'to check the cluster health';
+       ELSE
+             RAISE DEBUG 'Cluster is healthy, all nodes can connect to each other';
+       END IF;
+  END;
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 5: Ensure all nodes are on the same version
+  ------------------------------------------------------------------------------------------
+  DECLARE
+    coordinator_version text := '';
+    worker_node_version text := '';
+    worker_node_version_count int := 0;
+
+  BEGIN
+       SELECT extversion INTO coordinator_version from pg_extension WHERE extname = 'citus';
+
+       -- first, check that all workers have the same Citus version among themselves
+       SELECT
+            count(*) INTO worker_node_version_count
+       FROM
+            run_command_on_workers('SELECT extversion from pg_extension WHERE extname = ''citus'';')
+       GROUP BY result;
+       IF enforce_version_check AND worker_node_version_count != 1 THEN
+             RAISE EXCEPTION 'All nodes should have the same Citus version installed. Currently '
+                             'some of the workers have different versions';
+       ELSE
+             RAISE DEBUG 'All worker nodes have the same Citus version';
+       END IF;
+
+       -- second, check that the workers have the same Citus version as the coordinator
+       SELECT
+            result INTO worker_node_version
+       FROM
+            run_command_on_workers('SELECT extversion from pg_extension WHERE extname = ''citus'';')
+       GROUP BY result;
+
+       IF enforce_version_check AND coordinator_version != worker_node_version THEN
+             RAISE EXCEPTION 'All nodes should have the same Citus version installed. Currently '
+                             'the coordinator has version % and the worker(s) have %',
+                             coordinator_version, worker_node_version;
+       ELSE
+             RAISE DEBUG 'All nodes have the same Citus version';
+       END IF;
+  END;
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 6: Ensure all the partitioned tables have the proper naming structure
+  -- As described on https://github.com/citusdata/citus/issues/4962
+  -- existing indexes on partitioned distributed tables can collide
+  -- with the index names that exist on the shards
+  -- luckily, we know how to fix it.
+  -- And, note that we should do this even if the cluster is a basic plan
+  -- (e.g., single node Citus) such that when the cluster is scaled out, everything
+  -- works as intended
+  -- And, this should be done only ONCE for a cluster as it can be a pretty
+  -- time-consuming operation. Thus, even if the function is called multiple times,
+  -- we keep track of it and do not re-execute this part if not needed.
+  ------------------------------------------------------------------------------------------
+  DECLARE
+    partitioned_table_exists_pre_11 boolean:=False;
+  BEGIN
+
+    -- we recorded whether partitioned tables existed during the upgrade to Citus 11
+    SELECT metadata->>'partitioned_citus_table_exists_pre_11' INTO partitioned_table_exists_pre_11
+    FROM pg_dist_node_metadata;
+
+    IF partitioned_table_exists_pre_11 IS NOT NULL AND partitioned_table_exists_pre_11 THEN
+
+      -- this might take long depending on the number of partitions and shards...
+      RAISE NOTICE 'Preparing all the existing partitioned table indexes';
+      PERFORM pg_catalog.fix_all_partition_shard_index_names();
+
+      -- great, we are done with fixing the existing wrong index names
+      -- so, let's remove this flag
+      UPDATE pg_dist_node_metadata
+      SET metadata=jsonb_delete(metadata, 'partitioned_citus_table_exists_pre_11');
+    ELSE
+      RAISE DEBUG 'There are no partitioned tables that should be fixed';
+    END IF;
+  END;
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 7: Return early if there are no primary worker nodes
+  -- We don't strictly need this step, but it gives a nicer notice message
+  ------------------------------------------------------------------------------------------
+  DECLARE
+    primary_worker_node_count bigint :=0;
+  BEGIN
+        SELECT count(*) INTO primary_worker_node_count FROM pg_dist_node WHERE groupid != 0 AND noderole = 'primary';
+
+        IF primary_worker_node_count = 0 THEN
+              RAISE NOTICE 'There are no primary worker nodes, no need to sync metadata to any node';
+              RETURN true;
+        ELSE
+              RAISE DEBUG 'There are % primary worker nodes, continue to sync metadata', primary_worker_node_count;
+        END IF;
+  END;
+
+  ------------------------------------------------------------------------------------------
+  -- STAGE 8: Do the actual metadata & object syncing to the worker nodes
+  -- For the "already synced" metadata nodes, we do not strictly need to
+  -- sync the objects & metadata, but there is no harm in doing it anyway;
+  -- it'll only cost some execution time but makes sure that we have
+  -- consistent metadata & objects across all the nodes
+  ------------------------------------------------------------------------------------------
+  DECLARE
+  BEGIN
+
+    -- this might take long depending on the number of tables & objects ...
+ RAISE NOTICE 'Preparing to sync the metadata to all nodes'; + + PERFORM start_metadata_sync_to_node(nodename,nodeport) + FROM + pg_dist_node WHERE groupid != 0 AND noderole = 'primary'; + END; + + RETURN true; +END; +$$; +COMMENT ON FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool) + IS 'finalizes upgrade to Citus'; + +REVOKE ALL ON FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool) FROM PUBLIC; diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 31154a203..905d2cab5 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -75,6 +75,7 @@ extern void GetDependentSequencesWithRelation(Oid relationId, List **attnumList, attnum); extern List * GetDependentFunctionsWithRelation(Oid relationId); extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum); +extern void SetLocalEnableMetadataSync(bool state); #define DELETE_ALL_NODES "DELETE FROM pg_dist_node" #define DELETE_ALL_PLACEMENTS "DELETE FROM pg_dist_placement" diff --git a/src/test/regress/after_citus_upgrade_coord_schedule b/src/test/regress/after_citus_upgrade_coord_schedule index 49b4e73d9..e73837c59 100644 --- a/src/test/regress/after_citus_upgrade_coord_schedule +++ b/src/test/regress/after_citus_upgrade_coord_schedule @@ -4,3 +4,4 @@ test: upgrade_basic_after test: upgrade_partition_constraints_after test: upgrade_pg_dist_object_test_after test: upgrade_columnar_metapage_after +test: upgrade_post_11_after diff --git a/src/test/regress/before_citus_upgrade_coord_schedule b/src/test/regress/before_citus_upgrade_coord_schedule index 006217e8a..169a7f418 100644 --- a/src/test/regress/before_citus_upgrade_coord_schedule +++ b/src/test/regress/before_citus_upgrade_coord_schedule @@ -4,3 +4,4 @@ test: upgrade_basic_before test: upgrade_partition_constraints_before test: upgrade_pg_dist_object_test_before test: upgrade_columnar_metapage_before +test: upgrade_post_11_before diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index c7fcc73e5..79d21dd3e 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -966,7 +966,7 @@ DELETE FROM pg_dist_shard WHERE shardid = 1; CREATE TABLE e_transactions(order_id varchar(255) NULL, transaction_id int) PARTITION BY LIST(transaction_id); CREATE TABLE orders_2020_07_01 PARTITION OF e_transactions FOR VALUES IN (1,2,3); -INSERT INTO pg_dist_partition VALUES ('e_transactions'::regclass,'h', NULL, 7, 's'); +INSERT INTO pg_dist_partition VALUES ('e_transactions'::regclass,'h', '{VAR :varno 1 :varattno 1 :vartype 1043 :vartypmod 259 :varcollid 100 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}', 7, 's'); SELECT (metadata->>'partitioned_citus_table_exists_pre_11')::boolean as partitioned_citus_table_exists_pre_11, (metadata->>'partitioned_citus_table_exists_pre_11') IS NULL as is_null @@ -1012,6 +1012,7 @@ SELECT * FROM multi_extension.print_extension_changes(); | function citus_check_cluster_node_health() SETOF record | function citus_check_connection_to_node(text,integer) boolean | function citus_disable_node(text,integer,boolean) void + | function citus_finalize_upgrade_to_citus11(boolean) boolean | function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) void | function citus_internal_global_blocked_processes() SETOF record | function citus_internal_local_blocked_processes() SETOF record @@ -1025,7 +1026,7 @@ SELECT * FROM 
multi_extension.print_extension_changes(); | function worker_drop_sequence_dependency(text) void | function worker_drop_shell_table(text) void | function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean,boolean,boolean) SETOF record -(22 rows) +(23 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index e5e7b9215..5f5ea7b1e 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -57,6 +57,7 @@ ORDER BY 1; function citus_drop_trigger() function citus_executor_name(integer) function citus_extradata_container(internal) + function citus_finalize_upgrade_to_citus11(boolean) function citus_finish_pg_upgrade() function citus_get_active_worker_nodes() function citus_internal.columnar_ensure_am_depends_catalog() @@ -272,5 +273,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(256 rows) +(257 rows) diff --git a/src/test/regress/expected/upgrade_pg_dist_object_test_after.out b/src/test/regress/expected/upgrade_pg_dist_object_test_after.out index 616b4fc32..443cb6205 100644 --- a/src/test/regress/expected/upgrade_pg_dist_object_test_after.out +++ b/src/test/regress/expected/upgrade_pg_dist_object_test_after.out @@ -39,12 +39,38 @@ drop cascades to table upgrade_basic.t_range SELECT i.* FROM citus.pg_dist_object, pg_identify_object_as_address(classid, objid, objsubid) i ORDER BY 1, 2, 3; type | object_names | object_args --------------------------------------------------------------------- - database | {postgres} | {} - extension | {isn} | {} - role | {postgres} | {} - schema | {fooschema} | {} - schema | {new_schema} | {} - schema | {public} | {} - type | {fooschema.footype} | {} - (7 rows) + collation | {post_11_upgrade,german_phonebook_unpropagated} | {} + database | {postgres} | {} + extension | {isn} | {} + extension | {plpgsql} | {} + function | {post_11_upgrade,func_in_transaction_def} | {} + role | {postgres} | {} + schema | {fooschema} | {} + schema | {new_schema} | {} + schema | {post_11_upgrade} | {} + schema | {public} | {} + table | {fooschema,footable} | {} + table | {new_schema,another_dist_table} | {} + table | {post_11_upgrade,colocated_dist_table} | {} + table | {post_11_upgrade,colocated_partitioned_table} | {} + table | {post_11_upgrade,colocated_partitioned_table_2020_01_01} | {} + table | {post_11_upgrade,dist} | {} + table | {post_11_upgrade,index_backed_rep_identity} | {} + table | {post_11_upgrade,part_table} | {} + table | {post_11_upgrade,part_table_p202008} | {} + table | {post_11_upgrade,part_table_p202009} | {} + table | {post_11_upgrade,reference_table} | {} + table | {post_11_upgrade,sensors} | {} + table | {post_11_upgrade,sensors_2020_01_01} | {} + table | {post_11_upgrade,sensors_news} | {} + table | {post_11_upgrade,sensors_old} | {} + table | {post_11_upgrade,sensors_parser} | {} + table | {post_11_upgrade,sensors_parser_a_partition} | {} + table | {post_11_upgrade,test_propagate_collate} | {} + table | {public,dist_table} | {} + table | {public,isn_dist_table} | {} + text search configuration | {post_11_upgrade,partial_index_test_config} | {} + type | {fooschema.footype} | {} + type | {post_11_upgrade.my_type} | {} + (33 rows) diff --git a/src/test/regress/expected/upgrade_pg_dist_object_test_after_1.out 
b/src/test/regress/expected/upgrade_pg_dist_object_test_after_1.out
new file mode 100644
index 000000000..743164bfc
--- /dev/null
+++ b/src/test/regress/expected/upgrade_pg_dist_object_test_after_1.out
@@ -0,0 +1,54 @@
+-- run this test only when old citus version is 9.0
+\set upgrade_test_old_citus_version `echo "$CITUS_OLD_VERSION"`
+SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int = 9 AND
+       substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int = 0
+AS upgrade_test_old_citus_version_e_9_0;
+ upgrade_test_old_citus_version_e_9_0
+---------------------------------------------------------------------
+ t
+(1 row)
+
+\gset
+\if :upgrade_test_old_citus_version_e_9_0
+\else
+\q
+\endif
+-- drop objects from previous test (upgrade_basic_after.sql) for a clean test
+-- drop upgrade_basic schema and switch back to public schema
+SET search_path to public;
+DROP SCHEMA upgrade_basic CASCADE;
+NOTICE: drop cascades to 7 other objects
+DETAIL: drop cascades to table upgrade_basic.t
+drop cascades to table upgrade_basic.tp
+drop cascades to table upgrade_basic.t_ab
+drop cascades to table upgrade_basic.t2
+drop cascades to table upgrade_basic.r
+drop cascades to table upgrade_basic.tr
+drop cascades to table upgrade_basic.t_range
+-- as we updated citus to the available version,
+-- "isn" extension
+-- "new_schema" schema
+-- "public" schema
+-- "fooschema" schema
+-- "footype" type (under schema 'fooschema')
+ -- will now be marked as distributed
+ -- but,
+ -- "seg" extension
+ -- will not be marked as distributed
+-- see underlying objects
+SELECT i.* FROM citus.pg_dist_object, pg_identify_object_as_address(classid, objid, objsubid) i ORDER BY 1, 2, 3;
+ type      |          object_names           | object_args
+---------------------------------------------------------------------
+ database  | {postgres}                      | {}
+ extension | {isn}                           | {}
+ role      | {postgres}                      | {}
+ schema    | {fooschema}                     | {}
+ schema    | {new_schema}                    | {}
+ schema    | {public}                        | {}
+ table     | {fooschema,footable}            | {}
+ table     | {new_schema,another_dist_table} | {}
+ table     | {public,dist_table}             | {}
+ table     | {public,isn_dist_table}         | {}
+ type      | {fooschema.footype}             | {}
+ (11 rows)
+
diff --git a/src/test/regress/expected/upgrade_post_11_after.out b/src/test/regress/expected/upgrade_post_11_after.out
new file mode 100644
index 000000000..2da475c08
--- /dev/null
+++ b/src/test/regress/expected/upgrade_post_11_after.out
@@ -0,0 +1,76 @@
+SET search_path = post_11_upgrade;
+-- make sure that we always (re)sync the metadata
+UPDATE pg_dist_node_metadata SET metadata=jsonb_set(metadata, '{partitioned_citus_table_exists_pre_11}', to_jsonb('true'::bool), true);
+SELECT citus_finalize_upgrade_to_citus11(enforce_version_check:=false);
+NOTICE: Preparing all the existing partitioned table indexes
+NOTICE: Preparing to sync the metadata to all nodes
+ citus_finalize_upgrade_to_citus11
+---------------------------------------------------------------------
+ t
+(1 row)
+
+-- tables are objects with Citus 11+
+SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;
+ pg_identify_object_as_address
+---------------------------------------------------------------------
+ (function,"{post_11_upgrade,func_in_transaction_def}",{})
+ (schema,{post_11_upgrade},{})
+ (table,"{post_11_upgrade,part_table}",{})
+ (table,"{post_11_upgrade,sensors}",{})
+ ("text search configuration","{post_11_upgrade,partial_index_test_config}",{})
+ (type,{post_11_upgrade.my_type},{})
+(6 rows)
+
+-- on all nodes
+SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM citus.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;$$) ORDER BY 1;
+ run_command_on_workers
+---------------------------------------------------------------------
+ (localhost,57636,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}")
+ (localhost,57637,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}")
+(2 rows)
+
+-- Create the necessary test utility function
+CREATE OR REPLACE FUNCTION activate_node_snapshot()
+    RETURNS text[]
+    LANGUAGE C STRICT
+    AS 'citus';
+-- make sure that workers and the coordinator have the same datestyle
+SET datestyle = "ISO, YMD";
+SELECT 1 FROM run_command_on_workers($$ALTER SYSTEM SET datestyle = "ISO, YMD";$$);
+ ?column?
+---------------------------------------------------------------------
+        1
+        1
+(2 rows)
+
+SELECT 1 FROM run_command_on_workers($$SELECT pg_reload_conf()$$);
+ ?column?
+---------------------------------------------------------------------
+        1
+        1
+(2 rows)
+
+-- make sure that the metadata is consistent across all nodes
+-- we exclude the distributed_object_data as they are
+-- not sorted in the same order (as OIDs differ on the nodes)
+SELECT count(*) = 0 AS same_metadata_in_workers FROM
+(
+    (
+        SELECT unnest(activate_node_snapshot()) as command
+        EXCEPT
+        SELECT unnest(result::text[]) AS command
+        FROM run_command_on_workers($$SELECT post_11_upgrade.activate_node_snapshot()$$)
+    )
+UNION
+    (
+        SELECT unnest(result::text[]) AS command
+        FROM run_command_on_workers($$SELECT post_11_upgrade.activate_node_snapshot()$$)
+        EXCEPT
+        SELECT unnest(activate_node_snapshot()) as command
+    )
+) AS foo WHERE command NOT ILIKE '%distributed_object_data%';
+ same_metadata_in_workers
+---------------------------------------------------------------------
+ t
+(1 row)
+
diff --git a/src/test/regress/expected/upgrade_post_11_before.out b/src/test/regress/expected/upgrade_post_11_before.out
new file mode 100644
index 000000000..37bbab11b
--- /dev/null
+++ b/src/test/regress/expected/upgrade_post_11_before.out
@@ -0,0 +1,184 @@
+-- test cases for #3970
+SET citus.shard_count TO 32;
+SET citus.shard_replication_factor TO 1;
+CREATE SCHEMA post_11_upgrade;
+SET search_path = post_11_upgrade;
+--1. create a partitioned table, and a vanilla table that will be colocated with this table
+CREATE TABLE part_table (
+	work_ymdt timestamp without time zone NOT NULL,
+	seq bigint NOT NULL,
+	my_seq bigint NOT NULL,
+	work_memo character varying(150),
+	CONSTRAINT work_memo_check CHECK ((octet_length((work_memo)::text) <= 150)),
+	PRIMARY KEY(seq, work_ymdt)
+)
+PARTITION BY RANGE (work_ymdt);
+CREATE TABLE dist(seq bigint UNIQUE);
+--2. perform create_distributed_table
+SELECT create_distributed_table('part_table', 'seq');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT create_distributed_table('dist','seq');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+--3. add partitions
+CREATE TABLE part_table_p202008 PARTITION OF part_table FOR VALUES FROM ('2020-08-01 00:00:00') TO ('2020-09-01 00:00:00');
+CREATE TABLE part_table_p202009 PARTITION OF part_table FOR VALUES FROM ('2020-09-01 00:00:00') TO ('2020-10-01 00:00:00');
+--4. create indexes
+CREATE INDEX i_part_1 ON part_table(seq);
+CREATE INDEX i_part_2 ON part_table(my_seq, seq);
+CREATE INDEX i_part_3 ON part_table(work_memo, seq);
+CREATE TABLE sensors(
+measureid integer,
+eventdatetime date,
+measure_data jsonb,
+PRIMARY KEY (measureid, eventdatetime, measure_data))
+PARTITION BY RANGE(eventdatetime);
+CREATE TABLE sensors_old PARTITION OF sensors FOR VALUES FROM ('2000-01-01') TO ('2020-01-01');
+CREATE TABLE sensors_2020_01_01 PARTITION OF sensors FOR VALUES FROM ('2020-01-01') TO ('2020-02-01');
+CREATE TABLE sensors_news PARTITION OF sensors FOR VALUES FROM ('2020-05-01') TO ('2025-01-01');
+CREATE INDEX index_on_parent ON sensors(lower(measureid::text));
+CREATE INDEX index_on_child ON sensors_2020_01_01(lower(measure_data::text));
+CREATE INDEX hash_index ON sensors USING HASH((measure_data->'IsFailed'));
+CREATE INDEX index_with_include ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime);
+CREATE STATISTICS s1 (dependencies) ON measureid, eventdatetime FROM sensors;
+CREATE STATISTICS s2 (dependencies) ON measureid, eventdatetime FROM sensors_2020_01_01;
+ALTER INDEX index_on_parent ALTER COLUMN 1 SET STATISTICS 1000;
+ALTER INDEX index_on_child ALTER COLUMN 1 SET STATISTICS 1000;
+CLUSTER sensors_2020_01_01 USING index_on_child;
+SELECT create_distributed_table('sensors', 'measureid');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- create colocated distributed tables and create foreign keys FROM/TO
+-- the partitions
+CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
+SELECT create_distributed_table('colocated_dist_table', 'measureid');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
+WARNING: not propagating CLUSTER command to worker nodes
+CREATE TABLE colocated_partitioned_table(
+        measureid integer,
+        eventdatetime date,
+        PRIMARY KEY (measureid, eventdatetime))
+PARTITION BY RANGE(eventdatetime);
+CREATE TABLE colocated_partitioned_table_2020_01_01 PARTITION OF colocated_partitioned_table FOR VALUES FROM ('2020-01-01') TO ('2020-02-01');
+SELECT create_distributed_table('colocated_partitioned_table', 'measureid');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+CLUSTER colocated_partitioned_table_2020_01_01 USING colocated_partitioned_table_2020_01_01_pkey;
+WARNING: not propagating CLUSTER command to worker nodes
+CREATE TABLE reference_table (measureid integer PRIMARY KEY);
+SELECT create_reference_table('reference_table');
+ create_reference_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- this table is used to make sure that index backed
+-- replica identities can have clustered indexes
+-- and no index statistics
+CREATE TABLE index_backed_rep_identity(key int NOT NULL);
+CREATE UNIQUE INDEX uqx ON index_backed_rep_identity(key);
+ALTER TABLE index_backed_rep_identity REPLICA IDENTITY USING INDEX uqx;
+CLUSTER index_backed_rep_identity USING uqx;
+SELECT create_distributed_table('index_backed_rep_identity', 'key');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- from parent to regular dist
+ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_dist FOREIGN KEY (measureid) REFERENCES colocated_dist_table(measureid);
+-- from parent to parent
+ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_parent FOREIGN KEY (measureid, eventdatetime) REFERENCES colocated_partitioned_table(measureid, eventdatetime);
+-- from parent to child
+ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_child FOREIGN KEY (measureid, eventdatetime) REFERENCES colocated_partitioned_table_2020_01_01(measureid, eventdatetime);
+-- load some data
+INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
+INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
+INSERT INTO colocated_partitioned_table SELECT i, '2020-01-05' FROM generate_series(0,1000)i;
+INSERT INTO sensors SELECT i, '2020-01-05', '{}' FROM generate_series(0,1000)i;
+SET citus.enable_ddl_propagation TO off;
+CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );
+SELECT 1 FROM run_command_on_workers($$CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );$$);
+ ?column?
+--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +CREATE OR REPLACE FUNCTION post_11_upgrade.func_in_transaction_def() +RETURNS int +LANGUAGE plpgsql AS +$$ +BEGIN + return 1; +END; +$$; +SELECT run_command_on_workers('SET citus.enable_ddl_propagation TO off; +CREATE OR REPLACE FUNCTION post_11_upgrade.func_in_transaction_def() +RETURNS int +LANGUAGE plpgsql AS +$$ +BEGIN + return 1; +END; +$$;'); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57636,t,SET) + (localhost,57637,t,SET) +(2 rows) + +CREATE TYPE post_11_upgrade.my_type AS (a int); +RESET citus.enable_ddl_propagation; +CREATE TABLE sensors_parser( + measureid integer, + eventdatetime date, + measure_data jsonb, + name text, + col_with_def int DEFAULT post_11_upgrade.func_in_transaction_def(), + col_with_type post_11_upgrade.my_type, + PRIMARY KEY (measureid, eventdatetime, measure_data) +) PARTITION BY RANGE(eventdatetime); +CREATE TABLE sensors_parser_a_partition PARTITION OF sensors_parser FOR VALUES FROM ('2000-01-01') TO ('2020-01-01'); +CREATE INDEX sensors_parser_search_name ON sensors_parser USING gin (to_tsvector('partial_index_test_config'::regconfig, (COALESCE(name, ''::character varying))::text)); +SELECT create_distributed_table('sensors_parser', 'measureid'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SET citus.enable_ddl_propagation TO off; +CREATE COLLATION post_11_upgrade.german_phonebook_unpropagated (provider = icu, locale = 'de-u-co-phonebk'); +SELECT 1 FROM run_command_on_workers($$CREATE COLLATION post_11_upgrade.german_phonebook_unpropagated (provider = icu, locale = 'de-u-co-phonebk');$$); + ?column? 
+---------------------------------------------------------------------
+        1
+        1
+(2 rows)
+
+SET citus.enable_ddl_propagation TO on;
+CREATE TABLE test_propagate_collate(id int, t2 text COLLATE german_phonebook_unpropagated);
+SELECT create_distributed_table('test_propagate_collate', 'id');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql
index bd97fc961..0bac6d067 100644
--- a/src/test/regress/sql/multi_extension.sql
+++ b/src/test/regress/sql/multi_extension.sql
@@ -430,7 +430,7 @@ DELETE FROM pg_dist_shard WHERE shardid = 1;
 CREATE TABLE e_transactions(order_id varchar(255) NULL, transaction_id int) PARTITION BY LIST(transaction_id);
 CREATE TABLE orders_2020_07_01 PARTITION OF e_transactions FOR VALUES IN (1,2,3);
-INSERT INTO pg_dist_partition VALUES ('e_transactions'::regclass,'h', NULL, 7, 's');
+INSERT INTO pg_dist_partition VALUES ('e_transactions'::regclass,'h', '{VAR :varno 1 :varattno 1 :vartype 1043 :vartypmod 259 :varcollid 100 :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1}', 7, 's');
 
 SELECT
 	(metadata->>'partitioned_citus_table_exists_pre_11')::boolean as partitioned_citus_table_exists_pre_11,
diff --git a/src/test/regress/sql/upgrade_post_11_after.sql b/src/test/regress/sql/upgrade_post_11_after.sql
new file mode 100644
index 000000000..5e08ca8b1
--- /dev/null
+++ b/src/test/regress/sql/upgrade_post_11_after.sql
@@ -0,0 +1,42 @@
+SET search_path = post_11_upgrade;
+
+-- make sure that we always (re)sync the metadata
+UPDATE pg_dist_node_metadata SET metadata=jsonb_set(metadata, '{partitioned_citus_table_exists_pre_11}', to_jsonb('true'::bool), true);
+SELECT citus_finalize_upgrade_to_citus11(enforce_version_check:=false);
+
+-- tables are objects with Citus 11+
+SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;
+
+-- on all nodes
+SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM citus.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;$$) ORDER BY 1;
+
+-- Create the necessary test utility function
+CREATE OR REPLACE FUNCTION activate_node_snapshot()
+    RETURNS text[]
+    LANGUAGE C STRICT
+    AS 'citus';
+
+-- make sure that workers and the coordinator have the same datestyle
+SET datestyle = "ISO, YMD";
+SELECT 1 FROM run_command_on_workers($$ALTER SYSTEM SET datestyle = "ISO, YMD";$$);
+SELECT 1 FROM run_command_on_workers($$SELECT pg_reload_conf()$$);
+
+-- make sure that the metadata is consistent across all nodes
+-- we exclude the distributed_object_data as they are
+-- not sorted in the same order (as OIDs differ on the nodes)
+SELECT count(*) = 0 AS same_metadata_in_workers FROM
+(
+    (
+        SELECT unnest(activate_node_snapshot()) as command
+        EXCEPT
+        SELECT unnest(result::text[]) AS command
+        FROM run_command_on_workers($$SELECT post_11_upgrade.activate_node_snapshot()$$)
+    )
+UNION
+    (
+        SELECT unnest(result::text[]) AS command
+        FROM run_command_on_workers($$SELECT post_11_upgrade.activate_node_snapshot()$$)
+        EXCEPT
+        SELECT unnest(activate_node_snapshot()) as command
+    )
+) AS foo WHERE command NOT ILIKE '%distributed_object_data%';
diff --git a/src/test/regress/sql/upgrade_post_11_before.sql b/src/test/regress/sql/upgrade_post_11_before.sql
new file mode 100644
index 000000000..959b026f8
--- /dev/null
+++ b/src/test/regress/sql/upgrade_post_11_before.sql
@@ -0,0 +1,155 @@
+
+-- test cases for #3970
+SET citus.shard_count TO 32;
+SET citus.shard_replication_factor TO 1;
+
+CREATE SCHEMA post_11_upgrade;
+SET search_path = post_11_upgrade;
+
+--1. create a partitioned table, and a vanilla table that will be colocated with this table
+CREATE TABLE part_table (
+	work_ymdt timestamp without time zone NOT NULL,
+	seq bigint NOT NULL,
+	my_seq bigint NOT NULL,
+	work_memo character varying(150),
+	CONSTRAINT work_memo_check CHECK ((octet_length((work_memo)::text) <= 150)),
+	PRIMARY KEY(seq, work_ymdt)
+)
+PARTITION BY RANGE (work_ymdt);
+
+CREATE TABLE dist(seq bigint UNIQUE);
+
+--2. perform create_distributed_table
+SELECT create_distributed_table('part_table', 'seq');
+SELECT create_distributed_table('dist','seq');
+
+--3. add partitions
+CREATE TABLE part_table_p202008 PARTITION OF part_table FOR VALUES FROM ('2020-08-01 00:00:00') TO ('2020-09-01 00:00:00');
+CREATE TABLE part_table_p202009 PARTITION OF part_table FOR VALUES FROM ('2020-09-01 00:00:00') TO ('2020-10-01 00:00:00');
+
+--4. create indexes
+CREATE INDEX i_part_1 ON part_table(seq);
+CREATE INDEX i_part_2 ON part_table(my_seq, seq);
+CREATE INDEX i_part_3 ON part_table(work_memo, seq);
+
+
+CREATE TABLE sensors(
+measureid integer,
+eventdatetime date,
+measure_data jsonb,
+PRIMARY KEY (measureid, eventdatetime, measure_data))
+PARTITION BY RANGE(eventdatetime);
+
+CREATE TABLE sensors_old PARTITION OF sensors FOR VALUES FROM ('2000-01-01') TO ('2020-01-01');
+CREATE TABLE sensors_2020_01_01 PARTITION OF sensors FOR VALUES FROM ('2020-01-01') TO ('2020-02-01');
+CREATE TABLE sensors_news PARTITION OF sensors FOR VALUES FROM ('2020-05-01') TO ('2025-01-01');
+
+CREATE INDEX index_on_parent ON sensors(lower(measureid::text));
+CREATE INDEX index_on_child ON sensors_2020_01_01(lower(measure_data::text));
+CREATE INDEX hash_index ON sensors USING HASH((measure_data->'IsFailed'));
+CREATE INDEX index_with_include ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime);
+
+CREATE STATISTICS s1 (dependencies) ON measureid, eventdatetime FROM sensors;
+CREATE STATISTICS s2 (dependencies) ON measureid, eventdatetime FROM sensors_2020_01_01;
+
+ALTER INDEX index_on_parent ALTER COLUMN 1 SET STATISTICS 1000;
+ALTER INDEX index_on_child ALTER COLUMN 1 SET STATISTICS 1000;
+
+CLUSTER sensors_2020_01_01 USING index_on_child;
+SELECT create_distributed_table('sensors', 'measureid');
+
+
+-- create colocated distributed tables and create foreign keys FROM/TO
+-- the partitions
+CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
+SELECT create_distributed_table('colocated_dist_table', 'measureid');
+
+CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
+
+CREATE TABLE colocated_partitioned_table(
+        measureid integer,
+        eventdatetime date,
+        PRIMARY KEY (measureid, eventdatetime))
+PARTITION BY RANGE(eventdatetime);
+
+CREATE TABLE colocated_partitioned_table_2020_01_01 PARTITION OF colocated_partitioned_table FOR VALUES FROM ('2020-01-01') TO ('2020-02-01');
+SELECT create_distributed_table('colocated_partitioned_table', 'measureid');
+
+CLUSTER colocated_partitioned_table_2020_01_01 USING colocated_partitioned_table_2020_01_01_pkey;
+
+CREATE TABLE reference_table (measureid integer PRIMARY KEY);
+SELECT create_reference_table('reference_table');
+
+-- this table is used to make sure that index backed
+-- replica identities can have clustered indexes
+-- and no index statistics
+CREATE TABLE index_backed_rep_identity(key int NOT NULL);
+CREATE UNIQUE INDEX uqx ON index_backed_rep_identity(key);
+ALTER TABLE index_backed_rep_identity REPLICA IDENTITY USING INDEX uqx;
+CLUSTER index_backed_rep_identity USING uqx;
+SELECT create_distributed_table('index_backed_rep_identity', 'key');
+
+-- from parent to regular dist
+ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_dist FOREIGN KEY (measureid) REFERENCES colocated_dist_table(measureid);
+
+-- from parent to parent
+ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_parent FOREIGN KEY (measureid, eventdatetime) REFERENCES colocated_partitioned_table(measureid, eventdatetime);
+
+-- from parent to child
+ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_child FOREIGN KEY (measureid, eventdatetime) REFERENCES colocated_partitioned_table_2020_01_01(measureid, eventdatetime);
+
+-- load some data
+INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
+INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
+INSERT INTO colocated_partitioned_table SELECT i, '2020-01-05' FROM generate_series(0,1000)i;
+INSERT INTO sensors SELECT i, '2020-01-05', '{}' FROM generate_series(0,1000)i;
+
+
+SET citus.enable_ddl_propagation TO off;
+CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );
+SELECT 1 FROM run_command_on_workers($$CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );$$);
+
+CREATE OR REPLACE FUNCTION post_11_upgrade.func_in_transaction_def()
+RETURNS int
+LANGUAGE plpgsql AS
+$$
+BEGIN
+    return 1;
+END;
+$$;
+
+SELECT run_command_on_workers('SET citus.enable_ddl_propagation TO off;
+CREATE OR REPLACE FUNCTION post_11_upgrade.func_in_transaction_def()
+RETURNS int
+LANGUAGE plpgsql AS
+$$
+BEGIN
+    return 1;
+END;
+$$;');
+
+CREATE TYPE post_11_upgrade.my_type AS (a int);
+
+RESET citus.enable_ddl_propagation;
+
+CREATE TABLE sensors_parser(
+    measureid integer,
+    eventdatetime date,
+    measure_data jsonb,
+    name text,
+    col_with_def int DEFAULT post_11_upgrade.func_in_transaction_def(),
+    col_with_type post_11_upgrade.my_type,
+    PRIMARY KEY (measureid, eventdatetime, measure_data)
+) PARTITION BY RANGE(eventdatetime);
+CREATE TABLE sensors_parser_a_partition PARTITION OF sensors_parser FOR VALUES FROM ('2000-01-01') TO ('2020-01-01');
+CREATE INDEX sensors_parser_search_name ON sensors_parser USING gin (to_tsvector('partial_index_test_config'::regconfig, (COALESCE(name, ''::character varying))::text));
+SELECT create_distributed_table('sensors_parser', 'measureid');
+
+
+SET citus.enable_ddl_propagation TO off;
+CREATE COLLATION post_11_upgrade.german_phonebook_unpropagated (provider = icu, locale = 'de-u-co-phonebk');
+SELECT 1 FROM run_command_on_workers($$CREATE COLLATION post_11_upgrade.german_phonebook_unpropagated (provider = icu, locale = 'de-u-co-phonebk');$$);
+SET citus.enable_ddl_propagation TO on;
+
+CREATE TABLE test_propagate_collate(id int, t2 text COLLATE german_phonebook_unpropagated);
+SELECT create_distributed_table('test_propagate_collate', 'id');
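+
+-- A hypothetical spot check (not part of the test schedule): after
+-- citus_finalize_upgrade_to_citus11() runs in upgrade_post_11_after.sql,
+-- the deliberately unpropagated objects above (the collation, the text
+-- search configuration, the function, and the type) should show up in
+-- citus.pg_dist_object, e.g.:
+--
+--   SELECT i.type, i.object_names
+--   FROM citus.pg_dist_object,
+--        pg_identify_object_as_address(classid, objid, objsubid) i
+--   WHERE i.object_names @> ARRAY['german_phonebook_unpropagated'];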