mirror of https://github.com/citusdata/citus.git
Merge pull request #5556 from citusdata/add_udf_for_upgrades
Add a new API for enabling Citus MX for clusters upgrading from earli…pull/5748/head
commit
b36c58f231
|
@ -513,6 +513,16 @@ MarkExistingObjectDependenciesDistributedIfSupported()
|
|||
ObjectAddress tableAddress = { 0 };
|
||||
ObjectAddressSet(tableAddress, RelationRelationId, citusTableId);
|
||||
|
||||
if (ShouldSyncTableMetadata(citusTableId))
|
||||
{
|
||||
/* we need to pass pointer allocated in the heap */
|
||||
ObjectAddress *addressPointer = palloc0(sizeof(ObjectAddress));
|
||||
*addressPointer = tableAddress;
|
||||
|
||||
/* as of Citus 11, tables that should be synced are also considered object */
|
||||
resultingObjectAddresses = lappend(resultingObjectAddresses, addressPointer);
|
||||
}
|
||||
|
||||
List *distributableDependencyObjectAddresses =
|
||||
GetDistributableDependenciesForObject(&tableAddress);
|
||||
|
||||
|
@ -536,11 +546,22 @@ MarkExistingObjectDependenciesDistributedIfSupported()
|
|||
/* remove duplicates from object addresses list for efficiency */
|
||||
List *uniqueObjectAddresses = GetUniqueDependenciesList(resultingObjectAddresses);
|
||||
|
||||
/*
|
||||
* We should sync the new dependencies during ALTER EXTENSION because
|
||||
* we cannot know whether the nodes has already been upgraded or not. If
|
||||
* the nodes are not upgraded at this point, we cannot sync the object. Also,
|
||||
* when the workers upgraded, they'd get the same objects anyway.
|
||||
*/
|
||||
bool prevMetadataSyncValue = EnableMetadataSync;
|
||||
SetLocalEnableMetadataSync(false);
|
||||
|
||||
ObjectAddress *objectAddress = NULL;
|
||||
foreach_ptr(objectAddress, uniqueObjectAddresses)
|
||||
{
|
||||
MarkObjectDistributed(objectAddress);
|
||||
}
|
||||
|
||||
SetLocalEnableMetadataSync(prevMetadataSyncValue);
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -107,7 +107,6 @@ static List * GetObjectsForGrantStmt(ObjectType objectType, Oid objectId);
|
|||
static AccessPriv * GetAccessPrivObjectForGrantStmt(char *permission);
|
||||
static List * GenerateGrantOnSchemaQueriesFromAclItem(Oid schemaOid,
|
||||
AclItem *aclItem);
|
||||
static void SetLocalEnableMetadataSync(bool state);
|
||||
static void SetLocalReplicateReferenceTablesOnActivate(bool state);
|
||||
static char * GenerateSetRoleQuery(Oid roleOid);
|
||||
static void MetadataSyncSigTermHandler(SIGNAL_ARGS);
|
||||
|
@ -1948,7 +1947,7 @@ GetAccessPrivObjectForGrantStmt(char *permission)
|
|||
/*
|
||||
* SetLocalEnableMetadataSync sets the enable_metadata_sync locally
|
||||
*/
|
||||
static void
|
||||
void
|
||||
SetLocalEnableMetadataSync(bool state)
|
||||
{
|
||||
set_config_option("citus.enable_metadata_sync", state == true ? "on" : "off",
|
||||
|
|
|
@ -80,3 +80,6 @@ BEGIN
|
|||
UPDATE pg_dist_node_metadata SET metadata=jsonb_set(metadata, '{partitioned_citus_table_exists_pre_11}', to_jsonb(partitioned_table_exists), true);
|
||||
END;
|
||||
$$;
|
||||
|
||||
#include "udfs/citus_finalize_upgrade_to_citus11/11.0-1.sql"
|
||||
|
||||
|
|
|
@ -90,7 +90,6 @@ DROP FUNCTION pg_catalog.citus_shard_indexes_on_worker();
|
|||
#include "../udfs/create_distributed_function/9.0-1.sql"
|
||||
ALTER TABLE citus.pg_dist_object DROP COLUMN force_delegation;
|
||||
|
||||
|
||||
SET search_path = 'pg_catalog';
|
||||
|
||||
|
||||
|
@ -347,4 +346,6 @@ JOIN
|
|||
ALTER VIEW citus.citus_lock_waits SET SCHEMA pg_catalog;
|
||||
GRANT SELECT ON pg_catalog.citus_lock_waits TO PUBLIC;
|
||||
|
||||
DROP FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool);
|
||||
|
||||
RESET search_path;
|
||||
|
|
224
src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/11.0-1.sql
generated
Normal file
224
src/backend/distributed/sql/udfs/citus_finalize_upgrade_to_citus11/11.0-1.sql
generated
Normal file
|
@ -0,0 +1,224 @@
|
|||
-- citus_finalize_upgrade_to_citus11() is a helper UDF ensures
|
||||
-- the upgrade to Citus 11 is finished successfully. Upgrade to
|
||||
-- Citus 11 requires all active primary worker nodes to get the
|
||||
-- metadata. And, this function's job is to sync the metadata to
|
||||
-- the nodes that does not already have
|
||||
-- once the function finishes without any errors and returns true
|
||||
-- the cluster is ready for running distributed queries from
|
||||
-- the worker nodes. When debug is enabled, the function provides
|
||||
-- more information to the user.
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(enforce_version_check bool default true)
|
||||
RETURNS bool
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
BEGIN
|
||||
|
||||
---------------------------------------------
|
||||
-- This script consists of N stages
|
||||
-- Each step is documented, and if log level
|
||||
-- is reduced to DEBUG1, each step is logged
|
||||
-- as well
|
||||
---------------------------------------------
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 0: Ensure no concurrent node metadata changing operation happens while this
|
||||
-- script is running via acquiring a strong lock on the pg_dist_node
|
||||
------------------------------------------------------------------------------------------
|
||||
BEGIN
|
||||
LOCK TABLE pg_dist_node IN EXCLUSIVE MODE NOWAIT;
|
||||
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
RAISE 'Another node metadata changing operation is in progress, try again.';
|
||||
END;
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 1: We want all the commands to run in the same transaction block. Without
|
||||
-- sequential mode, metadata syncing cannot be done in a transaction block along with
|
||||
-- other commands
|
||||
------------------------------------------------------------------------------------------
|
||||
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 2: Ensure we have the prerequisites
|
||||
-- (a) only superuser can run this script
|
||||
-- (b) cannot be executed when enable_ddl_propagation is False
|
||||
-- (c) can only be executed from the coordinator
|
||||
------------------------------------------------------------------------------------------
|
||||
DECLARE
|
||||
is_superuser_running boolean := False;
|
||||
enable_ddl_prop boolean:= False;
|
||||
local_group_id int := 0;
|
||||
BEGIN
|
||||
SELECT rolsuper INTO is_superuser_running FROM pg_roles WHERE rolname = current_user;
|
||||
IF is_superuser_running IS NOT True THEN
|
||||
RAISE EXCEPTION 'This operation can only be initiated by superuser';
|
||||
END IF;
|
||||
|
||||
SELECT current_setting('citus.enable_ddl_propagation') INTO enable_ddl_prop;
|
||||
IF enable_ddl_prop IS NOT True THEN
|
||||
RAISE EXCEPTION 'This operation cannot be completed when citus.enable_ddl_propagation is False.';
|
||||
END IF;
|
||||
|
||||
SELECT groupid INTO local_group_id FROM pg_dist_local_group;
|
||||
|
||||
IF local_group_id != 0 THEN
|
||||
RAISE EXCEPTION 'Operation is not allowed on this node. Connect to the coordinator and run it again.';
|
||||
ELSE
|
||||
RAISE DEBUG 'We are on the coordinator, continue to sync metadata';
|
||||
END IF;
|
||||
END;
|
||||
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 3: Ensure all primary nodes are active
|
||||
------------------------------------------------------------------------------------------
|
||||
DECLARE
|
||||
primary_disabled_worker_node_count int := 0;
|
||||
BEGIN
|
||||
SELECT count(*) INTO primary_disabled_worker_node_count FROM pg_dist_node
|
||||
WHERE groupid != 0 AND noderole = 'primary' AND NOT isactive;
|
||||
|
||||
IF primary_disabled_worker_node_count != 0 THEN
|
||||
RAISE EXCEPTION 'There are inactive primary worker nodes, you need to activate the nodes first.'
|
||||
'Use SELECT citus_activate_node() to activate the disabled nodes';
|
||||
ELSE
|
||||
RAISE DEBUG 'There are no disabled worker nodes, continue to sync metadata';
|
||||
END IF;
|
||||
END;
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 4: Ensure there is no connectivity issues in the cluster
|
||||
------------------------------------------------------------------------------------------
|
||||
DECLARE
|
||||
all_nodes_can_connect_to_each_other boolean := False;
|
||||
BEGIN
|
||||
SELECT bool_and(coalesce(result, false)) INTO all_nodes_can_connect_to_each_other FROM citus_check_cluster_node_health();
|
||||
|
||||
IF all_nodes_can_connect_to_each_other != True THEN
|
||||
RAISE EXCEPTION 'There are unhealth primary nodes, you need to ensure all '
|
||||
'nodes are up and runnnig. Also, make sure that all nodes can connect '
|
||||
'to each other. Use SELECT * FROM citus_check_cluster_node_health(); '
|
||||
'to check the cluster health';
|
||||
ELSE
|
||||
RAISE DEBUG 'Cluster is healthy, all nodes can connect to each other';
|
||||
END IF;
|
||||
END;
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 5: Ensure all nodes are on the same version
|
||||
------------------------------------------------------------------------------------------
|
||||
DECLARE
|
||||
coordinator_version text := '';
|
||||
worker_node_version text := '';
|
||||
worker_node_version_count int := 0;
|
||||
|
||||
BEGIN
|
||||
SELECT extversion INTO coordinator_version from pg_extension WHERE extname = 'citus';
|
||||
|
||||
-- first, check if all nodes have the same versions
|
||||
SELECT
|
||||
count(*) INTO worker_node_version_count
|
||||
FROM
|
||||
run_command_on_workers('SELECT extversion from pg_extension WHERE extname = ''citus'';')
|
||||
GROUP BY result;
|
||||
IF enforce_version_check AND worker_node_version_count != 1 THEN
|
||||
RAISE EXCEPTION 'All nodes should have the same Citus version installed. Currently '
|
||||
'the some of the workers has version different versions';
|
||||
ELSE
|
||||
RAISE DEBUG 'All worker nodes have the same Citus version';
|
||||
END IF;
|
||||
|
||||
-- second, check if all nodes have the same versions
|
||||
SELECT
|
||||
result INTO worker_node_version
|
||||
FROM
|
||||
run_command_on_workers('SELECT extversion from pg_extension WHERE extname = ''citus'';')
|
||||
GROUP BY result;
|
||||
|
||||
IF enforce_version_check AND coordinator_version != worker_node_version THEN
|
||||
RAISE EXCEPTION 'All nodes should have the same Citus version installed. Currently '
|
||||
'the coordinator has version % and the worker(s) has %',
|
||||
coordinator_version, worker_node_version;
|
||||
ELSE
|
||||
RAISE DEBUG 'All nodes have the same Citus version';
|
||||
END IF;
|
||||
END;
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 6: Ensure all the partitioned tables have the proper naming structure
|
||||
-- As described on https://github.com/citusdata/citus/issues/4962
|
||||
-- existing indexes on partitioned distributed tables can collide
|
||||
-- with the index names exists on the shards
|
||||
-- luckily, we know how to fix it.
|
||||
-- And, note that we should do this even if the cluster is a basic plan
|
||||
-- (e.g., single node Citus) such that when cluster scaled out, everything
|
||||
-- works as intended
|
||||
-- And, this should be done only ONCE for a cluster as it can be a pretty
|
||||
-- time consuming operation. Thus, even if the function is called multiple time,
|
||||
-- we keep track of it and do not re-execute this part if not needed.
|
||||
------------------------------------------------------------------------------------------
|
||||
DECLARE
|
||||
partitioned_table_exists_pre_11 boolean:=False;
|
||||
BEGIN
|
||||
|
||||
-- we recorded if partitioned tables exists during upgrade to Citus 11
|
||||
SELECT metadata->>'partitioned_citus_table_exists_pre_11' INTO partitioned_table_exists_pre_11
|
||||
FROM pg_dist_node_metadata;
|
||||
|
||||
IF partitioned_table_exists_pre_11 IS NOT NULL AND partitioned_table_exists_pre_11 THEN
|
||||
|
||||
-- this might take long depending on the number of partitions and shards...
|
||||
RAISE NOTICE 'Preparing all the existing partitioned table indexes';
|
||||
PERFORM pg_catalog.fix_all_partition_shard_index_names();
|
||||
|
||||
-- great, we are done with fixing the existing wrong index names
|
||||
-- so, lets remove this
|
||||
UPDATE pg_dist_node_metadata
|
||||
SET metadata=jsonb_delete(metadata, 'partitioned_citus_table_exists_pre_11');
|
||||
ELSE
|
||||
RAISE DEBUG 'There are no partitioned tables that should be fixed';
|
||||
END IF;
|
||||
END;
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 7: Return early if there are no primary worker nodes
|
||||
-- We don't strictly need this step, but it gives a nicer notice message
|
||||
------------------------------------------------------------------------------------------
|
||||
DECLARE
|
||||
primary_worker_node_count bigint :=0;
|
||||
BEGIN
|
||||
SELECT count(*) INTO primary_worker_node_count FROM pg_dist_node WHERE groupid != 0 AND noderole = 'primary';
|
||||
|
||||
IF primary_worker_node_count = 0 THEN
|
||||
RAISE NOTICE 'There are no primary worker nodes, no need to sync metadata to any node';
|
||||
RETURN true;
|
||||
ELSE
|
||||
RAISE DEBUG 'There are % primary worker nodes, continue to sync metadata', primary_worker_node_count;
|
||||
END IF;
|
||||
END;
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 8: Do the actual metadata & object syncing to the worker nodes
|
||||
-- For the "already synced" metadata nodes, we do not strictly need to
|
||||
-- sync the objects & metadata, but there is no harm to do it anyway
|
||||
-- it'll only cost some execution time but makes sure that we have a
|
||||
-- a consistent metadata & objects across all the nodes
|
||||
------------------------------------------------------------------------------------------
|
||||
DECLARE
|
||||
BEGIN
|
||||
|
||||
-- this might take long depending on the number of tables & objects ...
|
||||
RAISE NOTICE 'Preparing to sync the metadata to all nodes';
|
||||
|
||||
PERFORM start_metadata_sync_to_node(nodename,nodeport)
|
||||
FROM
|
||||
pg_dist_node WHERE groupid != 0 AND noderole = 'primary';
|
||||
END;
|
||||
|
||||
RETURN true;
|
||||
END;
|
||||
$$;
|
||||
COMMENT ON FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool)
|
||||
IS 'finalizes upgrade to Citus';
|
||||
|
||||
REVOKE ALL ON FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool) FROM PUBLIC;
|
|
@ -0,0 +1,224 @@
|
|||
-- citus_finalize_upgrade_to_citus11() is a helper UDF ensures
|
||||
-- the upgrade to Citus 11 is finished successfully. Upgrade to
|
||||
-- Citus 11 requires all active primary worker nodes to get the
|
||||
-- metadata. And, this function's job is to sync the metadata to
|
||||
-- the nodes that does not already have
|
||||
-- once the function finishes without any errors and returns true
|
||||
-- the cluster is ready for running distributed queries from
|
||||
-- the worker nodes. When debug is enabled, the function provides
|
||||
-- more information to the user.
|
||||
CREATE OR REPLACE FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(enforce_version_check bool default true)
|
||||
RETURNS bool
|
||||
LANGUAGE plpgsql
|
||||
AS $$
|
||||
BEGIN
|
||||
|
||||
---------------------------------------------
|
||||
-- This script consists of N stages
|
||||
-- Each step is documented, and if log level
|
||||
-- is reduced to DEBUG1, each step is logged
|
||||
-- as well
|
||||
---------------------------------------------
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 0: Ensure no concurrent node metadata changing operation happens while this
|
||||
-- script is running via acquiring a strong lock on the pg_dist_node
|
||||
------------------------------------------------------------------------------------------
|
||||
BEGIN
|
||||
LOCK TABLE pg_dist_node IN EXCLUSIVE MODE NOWAIT;
|
||||
|
||||
EXCEPTION WHEN OTHERS THEN
|
||||
RAISE 'Another node metadata changing operation is in progress, try again.';
|
||||
END;
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 1: We want all the commands to run in the same transaction block. Without
|
||||
-- sequential mode, metadata syncing cannot be done in a transaction block along with
|
||||
-- other commands
|
||||
------------------------------------------------------------------------------------------
|
||||
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 2: Ensure we have the prerequisites
|
||||
-- (a) only superuser can run this script
|
||||
-- (b) cannot be executed when enable_ddl_propagation is False
|
||||
-- (c) can only be executed from the coordinator
|
||||
------------------------------------------------------------------------------------------
|
||||
DECLARE
|
||||
is_superuser_running boolean := False;
|
||||
enable_ddl_prop boolean:= False;
|
||||
local_group_id int := 0;
|
||||
BEGIN
|
||||
SELECT rolsuper INTO is_superuser_running FROM pg_roles WHERE rolname = current_user;
|
||||
IF is_superuser_running IS NOT True THEN
|
||||
RAISE EXCEPTION 'This operation can only be initiated by superuser';
|
||||
END IF;
|
||||
|
||||
SELECT current_setting('citus.enable_ddl_propagation') INTO enable_ddl_prop;
|
||||
IF enable_ddl_prop IS NOT True THEN
|
||||
RAISE EXCEPTION 'This operation cannot be completed when citus.enable_ddl_propagation is False.';
|
||||
END IF;
|
||||
|
||||
SELECT groupid INTO local_group_id FROM pg_dist_local_group;
|
||||
|
||||
IF local_group_id != 0 THEN
|
||||
RAISE EXCEPTION 'Operation is not allowed on this node. Connect to the coordinator and run it again.';
|
||||
ELSE
|
||||
RAISE DEBUG 'We are on the coordinator, continue to sync metadata';
|
||||
END IF;
|
||||
END;
|
||||
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 3: Ensure all primary nodes are active
|
||||
------------------------------------------------------------------------------------------
|
||||
DECLARE
|
||||
primary_disabled_worker_node_count int := 0;
|
||||
BEGIN
|
||||
SELECT count(*) INTO primary_disabled_worker_node_count FROM pg_dist_node
|
||||
WHERE groupid != 0 AND noderole = 'primary' AND NOT isactive;
|
||||
|
||||
IF primary_disabled_worker_node_count != 0 THEN
|
||||
RAISE EXCEPTION 'There are inactive primary worker nodes, you need to activate the nodes first.'
|
||||
'Use SELECT citus_activate_node() to activate the disabled nodes';
|
||||
ELSE
|
||||
RAISE DEBUG 'There are no disabled worker nodes, continue to sync metadata';
|
||||
END IF;
|
||||
END;
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 4: Ensure there is no connectivity issues in the cluster
|
||||
------------------------------------------------------------------------------------------
|
||||
DECLARE
|
||||
all_nodes_can_connect_to_each_other boolean := False;
|
||||
BEGIN
|
||||
SELECT bool_and(coalesce(result, false)) INTO all_nodes_can_connect_to_each_other FROM citus_check_cluster_node_health();
|
||||
|
||||
IF all_nodes_can_connect_to_each_other != True THEN
|
||||
RAISE EXCEPTION 'There are unhealth primary nodes, you need to ensure all '
|
||||
'nodes are up and runnnig. Also, make sure that all nodes can connect '
|
||||
'to each other. Use SELECT * FROM citus_check_cluster_node_health(); '
|
||||
'to check the cluster health';
|
||||
ELSE
|
||||
RAISE DEBUG 'Cluster is healthy, all nodes can connect to each other';
|
||||
END IF;
|
||||
END;
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 5: Ensure all nodes are on the same version
|
||||
------------------------------------------------------------------------------------------
|
||||
DECLARE
|
||||
coordinator_version text := '';
|
||||
worker_node_version text := '';
|
||||
worker_node_version_count int := 0;
|
||||
|
||||
BEGIN
|
||||
SELECT extversion INTO coordinator_version from pg_extension WHERE extname = 'citus';
|
||||
|
||||
-- first, check if all nodes have the same versions
|
||||
SELECT
|
||||
count(*) INTO worker_node_version_count
|
||||
FROM
|
||||
run_command_on_workers('SELECT extversion from pg_extension WHERE extname = ''citus'';')
|
||||
GROUP BY result;
|
||||
IF enforce_version_check AND worker_node_version_count != 1 THEN
|
||||
RAISE EXCEPTION 'All nodes should have the same Citus version installed. Currently '
|
||||
'the some of the workers has version different versions';
|
||||
ELSE
|
||||
RAISE DEBUG 'All worker nodes have the same Citus version';
|
||||
END IF;
|
||||
|
||||
-- second, check if all nodes have the same versions
|
||||
SELECT
|
||||
result INTO worker_node_version
|
||||
FROM
|
||||
run_command_on_workers('SELECT extversion from pg_extension WHERE extname = ''citus'';')
|
||||
GROUP BY result;
|
||||
|
||||
IF enforce_version_check AND coordinator_version != worker_node_version THEN
|
||||
RAISE EXCEPTION 'All nodes should have the same Citus version installed. Currently '
|
||||
'the coordinator has version % and the worker(s) has %',
|
||||
coordinator_version, worker_node_version;
|
||||
ELSE
|
||||
RAISE DEBUG 'All nodes have the same Citus version';
|
||||
END IF;
|
||||
END;
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 6: Ensure all the partitioned tables have the proper naming structure
|
||||
-- As described on https://github.com/citusdata/citus/issues/4962
|
||||
-- existing indexes on partitioned distributed tables can collide
|
||||
-- with the index names exists on the shards
|
||||
-- luckily, we know how to fix it.
|
||||
-- And, note that we should do this even if the cluster is a basic plan
|
||||
-- (e.g., single node Citus) such that when cluster scaled out, everything
|
||||
-- works as intended
|
||||
-- And, this should be done only ONCE for a cluster as it can be a pretty
|
||||
-- time consuming operation. Thus, even if the function is called multiple time,
|
||||
-- we keep track of it and do not re-execute this part if not needed.
|
||||
------------------------------------------------------------------------------------------
|
||||
DECLARE
|
||||
partitioned_table_exists_pre_11 boolean:=False;
|
||||
BEGIN
|
||||
|
||||
-- we recorded if partitioned tables exists during upgrade to Citus 11
|
||||
SELECT metadata->>'partitioned_citus_table_exists_pre_11' INTO partitioned_table_exists_pre_11
|
||||
FROM pg_dist_node_metadata;
|
||||
|
||||
IF partitioned_table_exists_pre_11 IS NOT NULL AND partitioned_table_exists_pre_11 THEN
|
||||
|
||||
-- this might take long depending on the number of partitions and shards...
|
||||
RAISE NOTICE 'Preparing all the existing partitioned table indexes';
|
||||
PERFORM pg_catalog.fix_all_partition_shard_index_names();
|
||||
|
||||
-- great, we are done with fixing the existing wrong index names
|
||||
-- so, lets remove this
|
||||
UPDATE pg_dist_node_metadata
|
||||
SET metadata=jsonb_delete(metadata, 'partitioned_citus_table_exists_pre_11');
|
||||
ELSE
|
||||
RAISE DEBUG 'There are no partitioned tables that should be fixed';
|
||||
END IF;
|
||||
END;
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 7: Return early if there are no primary worker nodes
|
||||
-- We don't strictly need this step, but it gives a nicer notice message
|
||||
------------------------------------------------------------------------------------------
|
||||
DECLARE
|
||||
primary_worker_node_count bigint :=0;
|
||||
BEGIN
|
||||
SELECT count(*) INTO primary_worker_node_count FROM pg_dist_node WHERE groupid != 0 AND noderole = 'primary';
|
||||
|
||||
IF primary_worker_node_count = 0 THEN
|
||||
RAISE NOTICE 'There are no primary worker nodes, no need to sync metadata to any node';
|
||||
RETURN true;
|
||||
ELSE
|
||||
RAISE DEBUG 'There are % primary worker nodes, continue to sync metadata', primary_worker_node_count;
|
||||
END IF;
|
||||
END;
|
||||
|
||||
------------------------------------------------------------------------------------------
|
||||
-- STAGE 8: Do the actual metadata & object syncing to the worker nodes
|
||||
-- For the "already synced" metadata nodes, we do not strictly need to
|
||||
-- sync the objects & metadata, but there is no harm to do it anyway
|
||||
-- it'll only cost some execution time but makes sure that we have a
|
||||
-- a consistent metadata & objects across all the nodes
|
||||
------------------------------------------------------------------------------------------
|
||||
DECLARE
|
||||
BEGIN
|
||||
|
||||
-- this might take long depending on the number of tables & objects ...
|
||||
RAISE NOTICE 'Preparing to sync the metadata to all nodes';
|
||||
|
||||
PERFORM start_metadata_sync_to_node(nodename,nodeport)
|
||||
FROM
|
||||
pg_dist_node WHERE groupid != 0 AND noderole = 'primary';
|
||||
END;
|
||||
|
||||
RETURN true;
|
||||
END;
|
||||
$$;
|
||||
COMMENT ON FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool)
|
||||
IS 'finalizes upgrade to Citus';
|
||||
|
||||
REVOKE ALL ON FUNCTION pg_catalog.citus_finalize_upgrade_to_citus11(bool) FROM PUBLIC;
|
|
@ -75,6 +75,7 @@ extern void GetDependentSequencesWithRelation(Oid relationId, List **attnumList,
|
|||
attnum);
|
||||
extern List * GetDependentFunctionsWithRelation(Oid relationId);
|
||||
extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum);
|
||||
extern void SetLocalEnableMetadataSync(bool state);
|
||||
|
||||
#define DELETE_ALL_NODES "DELETE FROM pg_dist_node"
|
||||
#define DELETE_ALL_PLACEMENTS "DELETE FROM pg_dist_placement"
|
||||
|
|
|
@ -4,3 +4,4 @@ test: upgrade_basic_after
|
|||
test: upgrade_partition_constraints_after
|
||||
test: upgrade_pg_dist_object_test_after
|
||||
test: upgrade_columnar_metapage_after
|
||||
test: upgrade_post_11_after
|
||||
|
|
|
@ -4,3 +4,4 @@ test: upgrade_basic_before
|
|||
test: upgrade_partition_constraints_before
|
||||
test: upgrade_pg_dist_object_test_before
|
||||
test: upgrade_columnar_metapage_before
|
||||
test: upgrade_post_11_before
|
||||
|
|
|
@ -966,7 +966,7 @@ DELETE FROM pg_dist_shard WHERE shardid = 1;
|
|||
CREATE TABLE e_transactions(order_id varchar(255) NULL, transaction_id int) PARTITION BY LIST(transaction_id);
|
||||
CREATE TABLE orders_2020_07_01
|
||||
PARTITION OF e_transactions FOR VALUES IN (1,2,3);
|
||||
INSERT INTO pg_dist_partition VALUES ('e_transactions'::regclass,'h', NULL, 7, 's');
|
||||
INSERT INTO pg_dist_partition VALUES ('e_transactions'::regclass,'h', '{VAR :varno 1 :varattno 1 :vartype 1043 :vartypmod 259 :varcollid 100 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}', 7, 's');
|
||||
SELECT
|
||||
(metadata->>'partitioned_citus_table_exists_pre_11')::boolean as partitioned_citus_table_exists_pre_11,
|
||||
(metadata->>'partitioned_citus_table_exists_pre_11') IS NULL as is_null
|
||||
|
@ -1012,6 +1012,7 @@ SELECT * FROM multi_extension.print_extension_changes();
|
|||
| function citus_check_cluster_node_health() SETOF record
|
||||
| function citus_check_connection_to_node(text,integer) boolean
|
||||
| function citus_disable_node(text,integer,boolean) void
|
||||
| function citus_finalize_upgrade_to_citus11(boolean) boolean
|
||||
| function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) void
|
||||
| function citus_internal_global_blocked_processes() SETOF record
|
||||
| function citus_internal_local_blocked_processes() SETOF record
|
||||
|
@ -1025,7 +1026,7 @@ SELECT * FROM multi_extension.print_extension_changes();
|
|||
| function worker_drop_sequence_dependency(text) void
|
||||
| function worker_drop_shell_table(text) void
|
||||
| function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean,boolean,boolean) SETOF record
|
||||
(22 rows)
|
||||
(23 rows)
|
||||
|
||||
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
||||
-- show running version
|
||||
|
|
|
@ -57,6 +57,7 @@ ORDER BY 1;
|
|||
function citus_drop_trigger()
|
||||
function citus_executor_name(integer)
|
||||
function citus_extradata_container(internal)
|
||||
function citus_finalize_upgrade_to_citus11(boolean)
|
||||
function citus_finish_pg_upgrade()
|
||||
function citus_get_active_worker_nodes()
|
||||
function citus_internal.columnar_ensure_am_depends_catalog()
|
||||
|
@ -272,5 +273,5 @@ ORDER BY 1;
|
|||
view citus_worker_stat_activity
|
||||
view pg_dist_shard_placement
|
||||
view time_partitions
|
||||
(256 rows)
|
||||
(257 rows)
|
||||
|
||||
|
|
|
@ -39,12 +39,38 @@ drop cascades to table upgrade_basic.t_range
|
|||
SELECT i.* FROM citus.pg_dist_object, pg_identify_object_as_address(classid, objid, objsubid) i ORDER BY 1, 2, 3;
|
||||
type | object_names | object_args
|
||||
---------------------------------------------------------------------
|
||||
database | {postgres} | {}
|
||||
extension | {isn} | {}
|
||||
role | {postgres} | {}
|
||||
schema | {fooschema} | {}
|
||||
schema | {new_schema} | {}
|
||||
schema | {public} | {}
|
||||
type | {fooschema.footype} | {}
|
||||
(7 rows)
|
||||
collation | {post_11_upgrade,german_phonebook_unpropagated} | {}
|
||||
database | {postgres} | {}
|
||||
extension | {isn} | {}
|
||||
extension | {plpgsql} | {}
|
||||
function | {post_11_upgrade,func_in_transaction_def} | {}
|
||||
role | {postgres} | {}
|
||||
schema | {fooschema} | {}
|
||||
schema | {new_schema} | {}
|
||||
schema | {post_11_upgrade} | {}
|
||||
schema | {public} | {}
|
||||
table | {fooschema,footable} | {}
|
||||
table | {new_schema,another_dist_table} | {}
|
||||
table | {post_11_upgrade,colocated_dist_table} | {}
|
||||
table | {post_11_upgrade,colocated_partitioned_table} | {}
|
||||
table | {post_11_upgrade,colocated_partitioned_table_2020_01_01} | {}
|
||||
table | {post_11_upgrade,dist} | {}
|
||||
table | {post_11_upgrade,index_backed_rep_identity} | {}
|
||||
table | {post_11_upgrade,part_table} | {}
|
||||
table | {post_11_upgrade,part_table_p202008} | {}
|
||||
table | {post_11_upgrade,part_table_p202009} | {}
|
||||
table | {post_11_upgrade,reference_table} | {}
|
||||
table | {post_11_upgrade,sensors} | {}
|
||||
table | {post_11_upgrade,sensors_2020_01_01} | {}
|
||||
table | {post_11_upgrade,sensors_news} | {}
|
||||
table | {post_11_upgrade,sensors_old} | {}
|
||||
table | {post_11_upgrade,sensors_parser} | {}
|
||||
table | {post_11_upgrade,sensors_parser_a_partition} | {}
|
||||
table | {post_11_upgrade,test_propagate_collate} | {}
|
||||
table | {public,dist_table} | {}
|
||||
table | {public,isn_dist_table} | {}
|
||||
text search configuration | {post_11_upgrade,partial_index_test_config} | {}
|
||||
type | {fooschema.footype} | {}
|
||||
type | {post_11_upgrade.my_type} | {}
|
||||
(33 rows)
|
||||
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
-- run this test only when old citus version is 9.0
|
||||
\set upgrade_test_old_citus_version `echo "$CITUS_OLD_VERSION"`
|
||||
SELECT substring(:'upgrade_test_old_citus_version', 'v(\d+)\.\d+\.\d+')::int = 9 AND
|
||||
substring(:'upgrade_test_old_citus_version', 'v\d+\.(\d+)\.\d+')::int = 0
|
||||
AS upgrade_test_old_citus_version_e_9_0;
|
||||
upgrade_test_old_citus_version_e_9_0
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
\gset
|
||||
\if :upgrade_test_old_citus_version_e_9_0
|
||||
\else
|
||||
\q
|
||||
\endif
|
||||
-- drop objects from previous test (uprade_basic_after.sql) for a clean test
|
||||
-- drop upgrade_basic schema and switch back to public schema
|
||||
SET search_path to public;
|
||||
DROP SCHEMA upgrade_basic CASCADE;
|
||||
NOTICE: drop cascades to 7 other objects
|
||||
DETAIL: drop cascades to table upgrade_basic.t
|
||||
drop cascades to table upgrade_basic.tp
|
||||
drop cascades to table upgrade_basic.t_ab
|
||||
drop cascades to table upgrade_basic.t2
|
||||
drop cascades to table upgrade_basic.r
|
||||
drop cascades to table upgrade_basic.tr
|
||||
drop cascades to table upgrade_basic.t_range
|
||||
-- as we updated citus to available version,
|
||||
-- "isn" extension
|
||||
-- "new_schema" schema
|
||||
-- "public" schema
|
||||
-- "fooschema" schema
|
||||
-- "footype" type (under schema 'fooschema')
|
||||
-- will now be marked as distributed
|
||||
-- but,
|
||||
-- "seg" extension
|
||||
-- will not be marked as distributed
|
||||
-- see underlying objects
|
||||
SELECT i.* FROM citus.pg_dist_object, pg_identify_object_as_address(classid, objid, objsubid) i ORDER BY 1, 2, 3;
|
||||
type | object_names | object_args
|
||||
---------------------------------------------------------------------
|
||||
database | {postgres} | {}
|
||||
extension | {isn} | {}
|
||||
role | {postgres} | {}
|
||||
schema | {fooschema} | {}
|
||||
schema | {new_schema} | {}
|
||||
schema | {public} | {}
|
||||
table | {fooschema,footable} | {}
|
||||
table | {new_schema,another_dist_table} | {}
|
||||
table | {public,dist_table} | {}
|
||||
table | {public,isn_dist_table} | {}
|
||||
type | {fooschema.footype} | {}
|
||||
(11 rows)
|
||||
|
|
@ -0,0 +1,76 @@
|
|||
SET search_path = post_11_upgrade;
|
||||
-- make sure that we always (re)sync the metadata
|
||||
UPDATE pg_dist_node_metadata SET metadata=jsonb_set(metadata, '{partitioned_citus_table_exists_pre_11}', to_jsonb('true'::bool), true);
|
||||
SELECT citus_finalize_upgrade_to_citus11(enforce_version_check:=false);
|
||||
NOTICE: Preparing all the existing partitioned table indexes
|
||||
NOTICE: Preparing to sync the metadata to all nodes
|
||||
citus_finalize_upgrade_to_citus11
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
-- tables are objects with Citus 11+
|
||||
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;
|
||||
pg_identify_object_as_address
|
||||
---------------------------------------------------------------------
|
||||
(function,"{post_11_upgrade,func_in_transaction_def}",{})
|
||||
(schema,{post_11_upgrade},{})
|
||||
(table,"{post_11_upgrade,part_table}",{})
|
||||
(table,"{post_11_upgrade,sensors}",{})
|
||||
("text search configuration","{post_11_upgrade,partial_index_test_config}",{})
|
||||
(type,{post_11_upgrade.my_type},{})
|
||||
(6 rows)
|
||||
|
||||
-- on all nodes
|
||||
SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM citus.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;$$) ORDER BY 1;
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57636,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}")
|
||||
(localhost,57637,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}")
|
||||
(2 rows)
|
||||
|
||||
-- Create the necessary test utility function
|
||||
CREATE OR REPLACE FUNCTION activate_node_snapshot()
|
||||
RETURNS text[]
|
||||
LANGUAGE C STRICT
|
||||
AS 'citus';
|
||||
-- make sure that workers and the coordinator has the same datesyle
|
||||
SET datestyle = "ISO, YMD";
|
||||
SELECT 1 FROM run_command_on_workers($$ALTER SYSTEM SET datestyle = "ISO, YMD";$$);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
1
|
||||
(2 rows)
|
||||
|
||||
SELECT 1 FROM run_command_on_workers($$SELECT pg_reload_conf()$$);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
1
|
||||
(2 rows)
|
||||
|
||||
-- make sure that the metadata is consistent across all nodes
|
||||
-- we exclude the distributed_object_data as they are
|
||||
-- not sorted in the same order (as OIDs differ on the nodes)
|
||||
SELECT count(*) = 0 AS same_metadata_in_workers FROM
|
||||
(
|
||||
(
|
||||
SELECT unnest(activate_node_snapshot()) as command
|
||||
EXCEPT
|
||||
SELECT unnest(result::text[]) AS command
|
||||
FROM run_command_on_workers($$SELECT post_11_upgrade.activate_node_snapshot()$$)
|
||||
)
|
||||
UNION
|
||||
(
|
||||
SELECT unnest(result::text[]) AS command
|
||||
FROM run_command_on_workers($$SELECT post_11_upgrade.activate_node_snapshot()$$)
|
||||
EXCEPT
|
||||
SELECT unnest(activate_node_snapshot()) as command
|
||||
)
|
||||
) AS foo WHERE command NOT ILIKE '%distributed_object_data%';
|
||||
same_metadata_in_workers
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
|
@ -0,0 +1,184 @@
|
|||
-- test cases for #3970
|
||||
SET citus.shard_count TO 32;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE SCHEMA post_11_upgrade;
|
||||
SET search_path = post_11_upgrade;
|
||||
--1. create a partitioned table, and a vanilla table that will be colocated with this table
|
||||
CREATE TABLE part_table (
|
||||
work_ymdt timestamp without time zone NOT NULL,
|
||||
seq bigint NOT NULL,
|
||||
my_seq bigint NOT NULL,
|
||||
work_memo character varying(150),
|
||||
CONSTRAINT work_memo_check CHECK ((octet_length((work_memo)::text) <= 150)),
|
||||
PRIMARY KEY(seq, work_ymdt)
|
||||
)
|
||||
PARTITION BY RANGE (work_ymdt);
|
||||
CREATE TABLE dist(seq bigint UNIQUE);
|
||||
--2. perform create_distributed_table
|
||||
SELECT create_distributed_table('part_table', 'seq');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('dist','seq');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
--3. add a partitions
|
||||
CREATE TABLE part_table_p202008 PARTITION OF part_table FOR VALUES FROM ('2020-08-01 00:00:00') TO ('2020-09-01 00:00:00');
|
||||
CREATE TABLE part_table_p202009 PARTITION OF part_table FOR VALUES FROM ('2020-09-01 00:00:00') TO ('2020-10-01 00:00:00');
|
||||
--3. create indexes
|
||||
CREATE INDEX i_part_1 ON part_table(seq);
|
||||
CREATE INDEX i_part_2 ON part_table(my_seq, seq);
|
||||
CREATE INDEX i_part_3 ON part_table(work_memo, seq);
|
||||
CREATE TABLE sensors(
|
||||
measureid integer,
|
||||
eventdatetime date,
|
||||
measure_data jsonb,
|
||||
PRIMARY KEY (measureid, eventdatetime, measure_data))
|
||||
PARTITION BY RANGE(eventdatetime);
|
||||
CREATE TABLE sensors_old PARTITION OF sensors FOR VALUES FROM ('2000-01-01') TO ('2020-01-01');
|
||||
CREATE TABLE sensors_2020_01_01 PARTITION OF sensors FOR VALUES FROM ('2020-01-01') TO ('2020-02-01');
|
||||
CREATE TABLE sensors_news PARTITION OF sensors FOR VALUES FROM ('2020-05-01') TO ('2025-01-01');
|
||||
CREATE INDEX index_on_parent ON sensors(lower(measureid::text));
|
||||
CREATE INDEX index_on_child ON sensors_2020_01_01(lower(measure_data::text));
|
||||
CREATE INDEX hash_index ON sensors USING HASH((measure_data->'IsFailed'));
|
||||
CREATE INDEX index_with_include ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime);
|
||||
CREATE STATISTICS s1 (dependencies) ON measureid, eventdatetime FROM sensors;
|
||||
CREATE STATISTICS s2 (dependencies) ON measureid, eventdatetime FROM sensors_2020_01_01;
|
||||
ALTER INDEX index_on_parent ALTER COLUMN 1 SET STATISTICS 1000;
|
||||
ALTER INDEX index_on_child ALTER COLUMN 1 SET STATISTICS 1000;
|
||||
CLUSTER sensors_2020_01_01 USING index_on_child;
|
||||
SELECT create_distributed_table('sensors', 'measureid');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- create a colocated distributed tables and create foreign keys FROM/TO
|
||||
-- the partitions
|
||||
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
|
||||
SELECT create_distributed_table('colocated_dist_table', 'measureid');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
|
||||
WARNING: not propagating CLUSTER command to worker nodes
|
||||
CREATE TABLE colocated_partitioned_table(
|
||||
measureid integer,
|
||||
eventdatetime date,
|
||||
PRIMARY KEY (measureid, eventdatetime))
|
||||
PARTITION BY RANGE(eventdatetime);
|
||||
CREATE TABLE colocated_partitioned_table_2020_01_01 PARTITION OF colocated_partitioned_table FOR VALUES FROM ('2020-01-01') TO ('2020-02-01');
|
||||
SELECT create_distributed_table('colocated_partitioned_table', 'measureid');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CLUSTER colocated_partitioned_table_2020_01_01 USING colocated_partitioned_table_2020_01_01_pkey;
|
||||
WARNING: not propagating CLUSTER command to worker nodes
|
||||
CREATE TABLE reference_table (measureid integer PRIMARY KEY);
|
||||
SELECT create_reference_table('reference_table');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- this table is used to make sure that index backed
|
||||
-- replica identites can have clustered indexes
|
||||
-- and no index statistics
|
||||
CREATE TABLE index_backed_rep_identity(key int NOT NULL);
|
||||
CREATE UNIQUE INDEX uqx ON index_backed_rep_identity(key);
|
||||
ALTER TABLE index_backed_rep_identity REPLICA IDENTITY USING INDEX uqx;
|
||||
CLUSTER index_backed_rep_identity USING uqx;
|
||||
SELECT create_distributed_table('index_backed_rep_identity', 'key');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- from parent to regular dist
|
||||
ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_dist FOREIGN KEY (measureid) REFERENCES colocated_dist_table(measureid);
|
||||
-- from parent to parent
|
||||
ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_parent FOREIGN KEY (measureid, eventdatetime) REFERENCES colocated_partitioned_table(measureid, eventdatetime);
|
||||
-- from parent to child
|
||||
ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_child FOREIGN KEY (measureid, eventdatetime) REFERENCES colocated_partitioned_table_2020_01_01(measureid, eventdatetime);
|
||||
-- load some data
|
||||
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
|
||||
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
|
||||
INSERT INTO colocated_partitioned_table SELECT i, '2020-01-05' FROM generate_series(0,1000)i;
|
||||
INSERT INTO sensors SELECT i, '2020-01-05', '{}' FROM generate_series(0,1000)i;
|
||||
SET citus.enable_ddl_propagation TO off;
|
||||
CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );
|
||||
SELECT 1 FROM run_command_on_workers($$CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );$$);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
1
|
||||
(2 rows)
|
||||
|
||||
CREATE OR REPLACE FUNCTION post_11_upgrade.func_in_transaction_def()
|
||||
RETURNS int
|
||||
LANGUAGE plpgsql AS
|
||||
$$
|
||||
BEGIN
|
||||
return 1;
|
||||
END;
|
||||
$$;
|
||||
SELECT run_command_on_workers('SET citus.enable_ddl_propagation TO off;
|
||||
CREATE OR REPLACE FUNCTION post_11_upgrade.func_in_transaction_def()
|
||||
RETURNS int
|
||||
LANGUAGE plpgsql AS
|
||||
$$
|
||||
BEGIN
|
||||
return 1;
|
||||
END;
|
||||
$$;');
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57636,t,SET)
|
||||
(localhost,57637,t,SET)
|
||||
(2 rows)
|
||||
|
||||
CREATE TYPE post_11_upgrade.my_type AS (a int);
|
||||
RESET citus.enable_ddl_propagation;
|
||||
CREATE TABLE sensors_parser(
|
||||
measureid integer,
|
||||
eventdatetime date,
|
||||
measure_data jsonb,
|
||||
name text,
|
||||
col_with_def int DEFAULT post_11_upgrade.func_in_transaction_def(),
|
||||
col_with_type post_11_upgrade.my_type,
|
||||
PRIMARY KEY (measureid, eventdatetime, measure_data)
|
||||
) PARTITION BY RANGE(eventdatetime);
|
||||
CREATE TABLE sensors_parser_a_partition PARTITION OF sensors_parser FOR VALUES FROM ('2000-01-01') TO ('2020-01-01');
|
||||
CREATE INDEX sensors_parser_search_name ON sensors_parser USING gin (to_tsvector('partial_index_test_config'::regconfig, (COALESCE(name, ''::character varying))::text));
|
||||
SELECT create_distributed_table('sensors_parser', 'measureid');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SET citus.enable_ddl_propagation TO off;
|
||||
CREATE COLLATION post_11_upgrade.german_phonebook_unpropagated (provider = icu, locale = 'de-u-co-phonebk');
|
||||
SELECT 1 FROM run_command_on_workers($$CREATE COLLATION post_11_upgrade.german_phonebook_unpropagated (provider = icu, locale = 'de-u-co-phonebk');$$);
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
1
|
||||
(2 rows)
|
||||
|
||||
SET citus.enable_ddl_propagation TO on;
|
||||
CREATE TABLE test_propagate_collate(id int, t2 text COLLATE german_phonebook_unpropagated);
|
||||
SELECT create_distributed_table('test_propagate_collate', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
|
@ -430,7 +430,7 @@ DELETE FROM pg_dist_shard WHERE shardid = 1;
|
|||
CREATE TABLE e_transactions(order_id varchar(255) NULL, transaction_id int) PARTITION BY LIST(transaction_id);
|
||||
CREATE TABLE orders_2020_07_01
|
||||
PARTITION OF e_transactions FOR VALUES IN (1,2,3);
|
||||
INSERT INTO pg_dist_partition VALUES ('e_transactions'::regclass,'h', NULL, 7, 's');
|
||||
INSERT INTO pg_dist_partition VALUES ('e_transactions'::regclass,'h', '{VAR :varno 1 :varattno 1 :vartype 1043 :vartypmod 259 :varcollid 100 :varlevelsup 0 :varnosyn 1 :varattnosyn 1 :location -1}', 7, 's');
|
||||
|
||||
SELECT
|
||||
(metadata->>'partitioned_citus_table_exists_pre_11')::boolean as partitioned_citus_table_exists_pre_11,
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
SET search_path = post_11_upgrade;
|
||||
|
||||
-- make sure that we always (re)sync the metadata
|
||||
UPDATE pg_dist_node_metadata SET metadata=jsonb_set(metadata, '{partitioned_citus_table_exists_pre_11}', to_jsonb('true'::bool), true);
|
||||
SELECT citus_finalize_upgrade_to_citus11(enforce_version_check:=false);
|
||||
|
||||
-- tables are objects with Citus 11+
|
||||
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;
|
||||
|
||||
-- on all nodes
|
||||
SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM citus.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype) ORDER BY 1;$$) ORDER BY 1;
|
||||
|
||||
-- Create the necessary test utility function
|
||||
CREATE OR REPLACE FUNCTION activate_node_snapshot()
|
||||
RETURNS text[]
|
||||
LANGUAGE C STRICT
|
||||
AS 'citus';
|
||||
|
||||
-- make sure that workers and the coordinator has the same datesyle
|
||||
SET datestyle = "ISO, YMD";
|
||||
SELECT 1 FROM run_command_on_workers($$ALTER SYSTEM SET datestyle = "ISO, YMD";$$);
|
||||
SELECT 1 FROM run_command_on_workers($$SELECT pg_reload_conf()$$);
|
||||
|
||||
-- make sure that the metadata is consistent across all nodes
|
||||
-- we exclude the distributed_object_data as they are
|
||||
-- not sorted in the same order (as OIDs differ on the nodes)
|
||||
SELECT count(*) = 0 AS same_metadata_in_workers FROM
|
||||
(
|
||||
(
|
||||
SELECT unnest(activate_node_snapshot()) as command
|
||||
EXCEPT
|
||||
SELECT unnest(result::text[]) AS command
|
||||
FROM run_command_on_workers($$SELECT post_11_upgrade.activate_node_snapshot()$$)
|
||||
)
|
||||
UNION
|
||||
(
|
||||
SELECT unnest(result::text[]) AS command
|
||||
FROM run_command_on_workers($$SELECT post_11_upgrade.activate_node_snapshot()$$)
|
||||
EXCEPT
|
||||
SELECT unnest(activate_node_snapshot()) as command
|
||||
)
|
||||
) AS foo WHERE command NOT ILIKE '%distributed_object_data%';
|
|
@ -0,0 +1,155 @@
|
|||
|
||||
-- test cases for #3970
|
||||
SET citus.shard_count TO 32;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
||||
CREATE SCHEMA post_11_upgrade;
|
||||
SET search_path = post_11_upgrade;
|
||||
|
||||
--1. create a partitioned table, and a vanilla table that will be colocated with this table
|
||||
CREATE TABLE part_table (
|
||||
work_ymdt timestamp without time zone NOT NULL,
|
||||
seq bigint NOT NULL,
|
||||
my_seq bigint NOT NULL,
|
||||
work_memo character varying(150),
|
||||
CONSTRAINT work_memo_check CHECK ((octet_length((work_memo)::text) <= 150)),
|
||||
PRIMARY KEY(seq, work_ymdt)
|
||||
)
|
||||
PARTITION BY RANGE (work_ymdt);
|
||||
|
||||
CREATE TABLE dist(seq bigint UNIQUE);
|
||||
|
||||
--2. perform create_distributed_table
|
||||
SELECT create_distributed_table('part_table', 'seq');
|
||||
SELECT create_distributed_table('dist','seq');
|
||||
|
||||
--3. add a partitions
|
||||
CREATE TABLE part_table_p202008 PARTITION OF part_table FOR VALUES FROM ('2020-08-01 00:00:00') TO ('2020-09-01 00:00:00');
|
||||
CREATE TABLE part_table_p202009 PARTITION OF part_table FOR VALUES FROM ('2020-09-01 00:00:00') TO ('2020-10-01 00:00:00');
|
||||
|
||||
--3. create indexes
|
||||
CREATE INDEX i_part_1 ON part_table(seq);
|
||||
CREATE INDEX i_part_2 ON part_table(my_seq, seq);
|
||||
CREATE INDEX i_part_3 ON part_table(work_memo, seq);
|
||||
|
||||
|
||||
CREATE TABLE sensors(
|
||||
measureid integer,
|
||||
eventdatetime date,
|
||||
measure_data jsonb,
|
||||
PRIMARY KEY (measureid, eventdatetime, measure_data))
|
||||
PARTITION BY RANGE(eventdatetime);
|
||||
|
||||
CREATE TABLE sensors_old PARTITION OF sensors FOR VALUES FROM ('2000-01-01') TO ('2020-01-01');
|
||||
CREATE TABLE sensors_2020_01_01 PARTITION OF sensors FOR VALUES FROM ('2020-01-01') TO ('2020-02-01');
|
||||
CREATE TABLE sensors_news PARTITION OF sensors FOR VALUES FROM ('2020-05-01') TO ('2025-01-01');
|
||||
|
||||
CREATE INDEX index_on_parent ON sensors(lower(measureid::text));
|
||||
CREATE INDEX index_on_child ON sensors_2020_01_01(lower(measure_data::text));
|
||||
CREATE INDEX hash_index ON sensors USING HASH((measure_data->'IsFailed'));
|
||||
CREATE INDEX index_with_include ON sensors ((measure_data->'IsFailed')) INCLUDE (measure_data, eventdatetime);
|
||||
|
||||
CREATE STATISTICS s1 (dependencies) ON measureid, eventdatetime FROM sensors;
|
||||
CREATE STATISTICS s2 (dependencies) ON measureid, eventdatetime FROM sensors_2020_01_01;
|
||||
|
||||
ALTER INDEX index_on_parent ALTER COLUMN 1 SET STATISTICS 1000;
|
||||
ALTER INDEX index_on_child ALTER COLUMN 1 SET STATISTICS 1000;
|
||||
|
||||
CLUSTER sensors_2020_01_01 USING index_on_child;
|
||||
SELECT create_distributed_table('sensors', 'measureid');
|
||||
|
||||
|
||||
-- create a colocated distributed tables and create foreign keys FROM/TO
|
||||
-- the partitions
|
||||
CREATE TABLE colocated_dist_table (measureid integer PRIMARY KEY);
|
||||
SELECT create_distributed_table('colocated_dist_table', 'measureid');
|
||||
|
||||
CLUSTER colocated_dist_table USING colocated_dist_table_pkey;
|
||||
|
||||
CREATE TABLE colocated_partitioned_table(
|
||||
measureid integer,
|
||||
eventdatetime date,
|
||||
PRIMARY KEY (measureid, eventdatetime))
|
||||
PARTITION BY RANGE(eventdatetime);
|
||||
|
||||
CREATE TABLE colocated_partitioned_table_2020_01_01 PARTITION OF colocated_partitioned_table FOR VALUES FROM ('2020-01-01') TO ('2020-02-01');
|
||||
SELECT create_distributed_table('colocated_partitioned_table', 'measureid');
|
||||
|
||||
CLUSTER colocated_partitioned_table_2020_01_01 USING colocated_partitioned_table_2020_01_01_pkey;
|
||||
|
||||
CREATE TABLE reference_table (measureid integer PRIMARY KEY);
|
||||
SELECT create_reference_table('reference_table');
|
||||
|
||||
-- this table is used to make sure that index backed
|
||||
-- replica identites can have clustered indexes
|
||||
-- and no index statistics
|
||||
CREATE TABLE index_backed_rep_identity(key int NOT NULL);
|
||||
CREATE UNIQUE INDEX uqx ON index_backed_rep_identity(key);
|
||||
ALTER TABLE index_backed_rep_identity REPLICA IDENTITY USING INDEX uqx;
|
||||
CLUSTER index_backed_rep_identity USING uqx;
|
||||
SELECT create_distributed_table('index_backed_rep_identity', 'key');
|
||||
|
||||
-- from parent to regular dist
|
||||
ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_dist FOREIGN KEY (measureid) REFERENCES colocated_dist_table(measureid);
|
||||
|
||||
-- from parent to parent
|
||||
ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_parent FOREIGN KEY (measureid, eventdatetime) REFERENCES colocated_partitioned_table(measureid, eventdatetime);
|
||||
|
||||
-- from parent to child
|
||||
ALTER TABLE sensors ADD CONSTRAINT fkey_from_parent_to_child FOREIGN KEY (measureid, eventdatetime) REFERENCES colocated_partitioned_table_2020_01_01(measureid, eventdatetime);
|
||||
|
||||
-- load some data
|
||||
INSERT INTO reference_table SELECT i FROM generate_series(0,1000)i;
|
||||
INSERT INTO colocated_dist_table SELECT i FROM generate_series(0,1000)i;
|
||||
INSERT INTO colocated_partitioned_table SELECT i, '2020-01-05' FROM generate_series(0,1000)i;
|
||||
INSERT INTO sensors SELECT i, '2020-01-05', '{}' FROM generate_series(0,1000)i;
|
||||
|
||||
|
||||
SET citus.enable_ddl_propagation TO off;
|
||||
CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );
|
||||
SELECT 1 FROM run_command_on_workers($$CREATE TEXT SEARCH CONFIGURATION post_11_upgrade.partial_index_test_config ( parser = default );$$);
|
||||
|
||||
CREATE OR REPLACE FUNCTION post_11_upgrade.func_in_transaction_def()
|
||||
RETURNS int
|
||||
LANGUAGE plpgsql AS
|
||||
$$
|
||||
BEGIN
|
||||
return 1;
|
||||
END;
|
||||
$$;
|
||||
|
||||
SELECT run_command_on_workers('SET citus.enable_ddl_propagation TO off;
|
||||
CREATE OR REPLACE FUNCTION post_11_upgrade.func_in_transaction_def()
|
||||
RETURNS int
|
||||
LANGUAGE plpgsql AS
|
||||
$$
|
||||
BEGIN
|
||||
return 1;
|
||||
END;
|
||||
$$;');
|
||||
|
||||
CREATE TYPE post_11_upgrade.my_type AS (a int);
|
||||
|
||||
RESET citus.enable_ddl_propagation;
|
||||
|
||||
CREATE TABLE sensors_parser(
|
||||
measureid integer,
|
||||
eventdatetime date,
|
||||
measure_data jsonb,
|
||||
name text,
|
||||
col_with_def int DEFAULT post_11_upgrade.func_in_transaction_def(),
|
||||
col_with_type post_11_upgrade.my_type,
|
||||
PRIMARY KEY (measureid, eventdatetime, measure_data)
|
||||
) PARTITION BY RANGE(eventdatetime);
|
||||
CREATE TABLE sensors_parser_a_partition PARTITION OF sensors_parser FOR VALUES FROM ('2000-01-01') TO ('2020-01-01');
|
||||
CREATE INDEX sensors_parser_search_name ON sensors_parser USING gin (to_tsvector('partial_index_test_config'::regconfig, (COALESCE(name, ''::character varying))::text));
|
||||
SELECT create_distributed_table('sensors_parser', 'measureid');
|
||||
|
||||
|
||||
SET citus.enable_ddl_propagation TO off;
|
||||
CREATE COLLATION post_11_upgrade.german_phonebook_unpropagated (provider = icu, locale = 'de-u-co-phonebk');
|
||||
SELECT 1 FROM run_command_on_workers($$CREATE COLLATION post_11_upgrade.german_phonebook_unpropagated (provider = icu, locale = 'de-u-co-phonebk');$$);
|
||||
SET citus.enable_ddl_propagation TO on;
|
||||
|
||||
CREATE TABLE test_propagate_collate(id int, t2 text COLLATE german_phonebook_unpropagated);
|
||||
SELECT create_distributed_table('test_propagate_collate', 'id');
|
Loading…
Reference in New Issue