From 8ab696f7e2ce9d479370cbaa7c547b5e9424d81a Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 13 Jul 2022 14:08:49 +0200 Subject: [PATCH 01/12] LOCK COMMAND does not require primaries at the start --- src/backend/distributed/metadata/metadata_sync.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 2034c2e6f..aa6294ca2 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -2505,7 +2505,7 @@ SchemaOwnerName(Oid objectId) static bool HasMetadataWorkers(void) { - List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(NoLock); + List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) From b2e9a5baf15e6fbb461bd8f5476a947c6512be03 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 13 Jul 2022 14:11:18 +0200 Subject: [PATCH 02/12] Make sure citus_is_coordinator works on read replicas --- src/backend/distributed/metadata/node_metadata.c | 2 +- src/backend/distributed/operations/worker_node_manager.c | 6 +++--- src/include/distributed/worker_manager.h | 2 +- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 23d4d3f4d..5603189c5 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -1687,7 +1687,7 @@ citus_is_coordinator(PG_FUNCTION_ARGS) bool isCoordinator = false; if (GetLocalGroupId() == COORDINATOR_GROUP_ID && - ActivePrimaryNodeCount() > 0) + ActiveReadableNodeCount() > 0) { isCoordinator = true; } diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index 16c0afb54..c516e27ef 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -94,12 +94,12 @@ ActivePrimaryNonCoordinatorNodeCount(void) /* - * ActivePrimaryNodeCount returns the number of groups with a primary in the cluster. + * ActiveReadableNodeCount returns the number of nodes in the cluster. 
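+ *
+ * (Assumed intent, per the commit message above: unlike ActivePrimaryNodeCount,
+ * this also counts secondary nodes, so the count stays non-zero on a read
+ * replica running with citus.use_secondary_nodes set to 'always', which is
+ * what citus_is_coordinator() relies on.)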
*/ uint32 -ActivePrimaryNodeCount(void) +ActiveReadableNodeCount(void) { - List *nodeList = ActivePrimaryNodeList(NoLock); + List *nodeList = ActiveReadableNodeList(); return list_length(nodeList); } diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index e861b8a65..cf1458047 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -72,7 +72,7 @@ extern WorkerNode * WorkerGetRoundRobinCandidateNode(List *workerNodeList, uint64 shardId, uint32 placementIndex); extern uint32 ActivePrimaryNonCoordinatorNodeCount(void); -extern uint32 ActivePrimaryNodeCount(void); +extern uint32 ActiveReadableNodeCount(void); extern List * ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode); extern List * ActivePrimaryNodeList(LOCKMODE lockMode); extern List * ActivePrimaryRemoteNodeList(LOCKMODE lockMode); From 3c343d45633c95e249dfcb419dabc659e2b78066 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 13 Jul 2022 14:27:11 +0200 Subject: [PATCH 03/12] Add regression tests for LOCK command citus.use_secondary_nodes=always mode --- .../regress/expected/multi_follower_dml.out | 27 +++++++++++++++++++ src/test/regress/sql/multi_follower_dml.sql | 14 ++++++++++ 2 files changed, 41 insertions(+) diff --git a/src/test/regress/expected/multi_follower_dml.out b/src/test/regress/expected/multi_follower_dml.out index d6a5acd65..e343f0a6d 100644 --- a/src/test/regress/expected/multi_follower_dml.out +++ b/src/test/regress/expected/multi_follower_dml.out @@ -354,6 +354,33 @@ ERROR: writing to worker nodes is not currently allowed DETAIL: citus.use_secondary_nodes is set to 'always' SELECT * FROM citus_local_table ORDER BY a; ERROR: there is a shard placement in node group 0 but there are no nodes in that group +\c "port=57636 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'" +-- when an existing read-replica is forked to become +-- another primary node, we sometimes have to use citus.use_secondary_nodes=always +-- even if the node is not in recovery mode. 
In those cases, allow LOCK +-- command on local / metadata tables, and also certain UDFs +SHOW citus.use_secondary_nodes; + citus.use_secondary_nodes +--------------------------------------------------------------------- + always +(1 row) + +SELECT pg_is_in_recovery(); + pg_is_in_recovery +--------------------------------------------------------------------- + f +(1 row) + +SELECT citus_is_coordinator(); + citus_is_coordinator +--------------------------------------------------------------------- + t +(1 row) + +BEGIN; + LOCK TABLE pg_dist_node IN SHARE ROW EXCLUSIVE MODE; + LOCK TABLE local IN SHARE ROW EXCLUSIVE MODE; +COMMIT; \c -reuse-previous=off regression - - :master_port DROP TABLE the_table; DROP TABLE reference_table; diff --git a/src/test/regress/sql/multi_follower_dml.sql b/src/test/regress/sql/multi_follower_dml.sql index dc03f258c..a3d548b12 100644 --- a/src/test/regress/sql/multi_follower_dml.sql +++ b/src/test/regress/sql/multi_follower_dml.sql @@ -163,6 +163,20 @@ SELECT * FROM reference_table ORDER BY a; INSERT INTO citus_local_table (a, b, z) VALUES (1, 2, 3); SELECT * FROM citus_local_table ORDER BY a; +\c "port=57636 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'" + +-- when an existing read-replica is forked to become +-- another primary node, we sometimes have to use citus.use_secondary_nodes=always +-- even if the node is not in recovery mode. In those cases, allow LOCK +-- command on local / metadata tables, and also certain UDFs +SHOW citus.use_secondary_nodes; +SELECT pg_is_in_recovery(); +SELECT citus_is_coordinator(); +BEGIN; + LOCK TABLE pg_dist_node IN SHARE ROW EXCLUSIVE MODE; + LOCK TABLE local IN SHARE ROW EXCLUSIVE MODE; +COMMIT; + \c -reuse-previous=off regression - - :master_port DROP TABLE the_table; DROP TABLE reference_table; From 6cd7319f12701afd0c20301f35066e9477bccfd6 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Wed, 13 Jul 2022 14:58:30 +0200 Subject: [PATCH 04/12] Add more generic read-replica tests --- .../multi_follower_select_statements.out | 19 +++++++++++++++++++ .../sql/multi_follower_select_statements.sql | 6 ++++++ 2 files changed, 25 insertions(+) diff --git a/src/test/regress/expected/multi_follower_select_statements.out b/src/test/regress/expected/multi_follower_select_statements.out index 42c3058ee..3d0550c85 100644 --- a/src/test/regress/expected/multi_follower_select_statements.out +++ b/src/test/regress/expected/multi_follower_select_statements.out @@ -141,6 +141,25 @@ ORDER BY localhost | 9072 (2 rows) +-- basic helper utilities should work fine +SELECT citus_is_coordinator(); + citus_is_coordinator +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) FROM citus_lock_waits; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT count(*) FROM citus_dist_stat_activity WHERE global_pid = citus_backend_gpid(); + count +--------------------------------------------------------------------- + 1 +(1 row) + -- okay, now let's play with nodecluster. 
If we change the cluster of our follower node -- queries should stat failing again, since there are no worker nodes in the new cluster \c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'" diff --git a/src/test/regress/sql/multi_follower_select_statements.sql b/src/test/regress/sql/multi_follower_select_statements.sql index edaccc869..3b12ea467 100644 --- a/src/test/regress/sql/multi_follower_select_statements.sql +++ b/src/test/regress/sql/multi_follower_select_statements.sql @@ -89,6 +89,12 @@ FROM ORDER BY node_name, node_port; +-- basic helper utilities should work fine +SELECT citus_is_coordinator(); +SELECT count(*) FROM citus_lock_waits; +SELECT count(*) FROM citus_dist_stat_activity WHERE global_pid = citus_backend_gpid(); + + -- okay, now let's play with nodecluster. If we change the cluster of our follower node -- queries should stat failing again, since there are no worker nodes in the new cluster From 968bba1a7ebb01faecc866acafde847b074bc709 Mon Sep 17 00:00:00 2001 From: Hanefi Onaldi Date: Wed, 13 Jul 2022 18:12:01 +0300 Subject: [PATCH 05/12] Add changelog entry for 11.0.4 (#6060) --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index e27f92824..a628cefd9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,7 @@ +### citus v11.0.4 (July 13, 2022) ### + +* Fixes a bug that prevents promoting read-replicas as primaries + ### citus v11.0.3 (July 5, 2022) ### * Fixes a bug that prevents adding local tables with materialized views to From 1675519f9329dd8c784f2c9f36c441f168548d23 Mon Sep 17 00:00:00 2001 From: ywj Date: Wed, 13 Jul 2022 21:08:29 -0700 Subject: [PATCH 06/12] Support citus_columnar as separate extension (#5911) * Support upgrade and downgrade and separate columnar as citus_columnar extension Co-authored-by: Yanwen Jin Co-authored-by: Jeff Davis --- src/backend/columnar/Makefile | 12 + src/backend/columnar/citus_columnar.control | 6 + src/backend/columnar/columnar_tableam.c | 91 ++++ .../sql/citus_columnar--11.1-0--11.1-1.sql | 32 ++ .../columnar/sql/citus_columnar--11.1-0.sql | 1 + .../columnar/sql/citus_columnar--11.1-1.sql | 435 ++++++++++++++++++ .../columnar/sql/columnar--10.1-1--10.2-1.sql | 2 +- .../columnar/sql/columnar--10.2-3--10.2-4.sql | 2 +- .../citus_columnar--11.1-1--11.1-0.sql | 116 +++++ src/backend/distributed/citus--11.1-1.control | 1 + src/backend/distributed/commands/extension.c | 228 +++++++++ .../distributed/commands/utility_hook.c | 28 ++ .../distributed/sql/citus--10.0-1--10.0-2.sql | 14 +- .../distributed/sql/citus--10.0-4--10.1-1.sql | 14 +- .../distributed/sql/citus--10.1-1--10.2-1.sql | 16 +- .../distributed/sql/citus--10.2-1--10.2-2.sql | 14 +- .../distributed/sql/citus--10.2-2--10.2-3.sql | 13 +- .../distributed/sql/citus--10.2-3--10.2-4.sql | 13 +- .../distributed/sql/citus--11.0-3--11.1-1.sql | 54 ++- .../distributed/sql/citus--9.5-1--10.0-4.sql | 25 +- .../sql/downgrades/citus--11.1-1--11.0-3.sql | 18 +- .../udfs/citus_finish_pg_upgrade/11.1-1.sql | 151 ++++++ .../udfs/citus_finish_pg_upgrade/latest.sql | 2 +- src/include/columnar/columnar.h | 3 + src/include/columnar/columnar_tableam.h | 5 +- src/include/distributed/commands.h | 7 +- src/test/regress/Makefile | 2 +- src/test/regress/expected/columnar_drop.out | 6 +- src/test/regress/expected/multi_extension.out | 28 +- .../multi_fix_partition_shard_index_names.out | 24 +- .../upgrade_columnar_metapage_after.out | 2 +- .../expected/upgrade_list_citus_objects.out | 22 
+- src/test/regress/sql/columnar_drop.sql | 6 +-
 src/test/regress/sql/multi_extension.sql | 4 +
 .../sql/upgrade_columnar_metapage_after.sql | 2 +-
 35 files changed, 1338 insertions(+), 61 deletions(-)
 create mode 100644 src/backend/columnar/citus_columnar.control
 create mode 100644 src/backend/columnar/sql/citus_columnar--11.1-0--11.1-1.sql
 create mode 100644 src/backend/columnar/sql/citus_columnar--11.1-0.sql
 create mode 100644 src/backend/columnar/sql/citus_columnar--11.1-1.sql
 create mode 100644 src/backend/columnar/sql/downgrades/citus_columnar--11.1-1--11.1-0.sql
 create mode 100644 src/backend/distributed/citus--11.1-1.control
 create mode 100644 src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/11.1-1.sql

diff --git a/src/backend/columnar/Makefile b/src/backend/columnar/Makefile
index abda9c90d..bf17b98bf 100644
--- a/src/backend/columnar/Makefile
+++ b/src/backend/columnar/Makefile
@@ -8,10 +8,22 @@ OBJS += \
 $(patsubst $(citus_abs_srcdir)/%.c,%.o,$(foreach dir,$(SUBDIRS), $(sort $(wildcard $(citus_abs_srcdir)/$(dir)/*.c))))

 MODULE_big = citus_columnar
+EXTENSION = citus_columnar
+
+columnar_sql_files = $(patsubst $(citus_abs_srcdir)/%,%,$(wildcard $(citus_abs_srcdir)/sql/*.sql))
+columnar_downgrade_sql_files = $(patsubst $(citus_abs_srcdir)/%,%,$(wildcard $(citus_abs_srcdir)/sql/downgrades/*.sql))
+DATA = $(columnar_sql_files) \
+ $(columnar_downgrade_sql_files)

 PG_CPPFLAGS += -I$(libpq_srcdir) -I$(safestringlib_srcdir)/include

+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
 include $(citus_top_builddir)/Makefile.global

 .PHONY: install-all
 install-all: install
+endif
diff --git a/src/backend/columnar/citus_columnar.control b/src/backend/columnar/citus_columnar.control
new file mode 100644
index 000000000..eaa349136
--- /dev/null
+++ b/src/backend/columnar/citus_columnar.control
@@ -0,0 +1,6 @@
+# Columnar extension
+comment = 'Citus Columnar extension'
+default_version = '11.1-1'
+module_pathname = '$libdir/citus_columnar'
+relocatable = false
+schema = pg_catalog
diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c
index ad6689087..333231dec 100644
--- a/src/backend/columnar/columnar_tableam.c
+++ b/src/backend/columnar/columnar_tableam.c
@@ -28,6 +28,7 @@
 #include "catalog/pg_extension.h"
 #include "catalog/storage.h"
 #include "catalog/storage_xlog.h"
+#include "commands/defrem.h"
 #include "commands/progress.h"
 #include "commands/vacuum.h"
 #include "commands/extension.h"
@@ -2363,6 +2364,16 @@ ColumnarProcessUtility(PlannedStmt *pstmt,
 (errmsg("columnar storage parameters specified on non-columnar table")));
 }

+ if (IsA(parsetree, CreateExtensionStmt))
+ {
+ CheckCitusColumnarCreateExtensionStmt(parsetree);
+ }
+
+ if (IsA(parsetree, AlterExtensionStmt))
+ {
+ CheckCitusColumnarAlterExtensionStmt(parsetree);
+ }
+
 PrevProcessUtilityHook_compat(pstmt, queryString, false, context, params, queryEnv, dest, completionTag);
@@ -2409,6 +2420,66 @@ IsColumnarTableAmTable(Oid relationId)
 }


+/*
+ * CheckCitusColumnarCreateExtensionStmt determines whether citus_columnar
+ * can be installed for a given CREATE EXTENSION statement
+ */
+void
+CheckCitusColumnarCreateExtensionStmt(Node *parseTree)
+{
+ CreateExtensionStmt *createExtensionStmt = castNode(CreateExtensionStmt,
+ parseTree);
+ if (get_extension_oid("citus_columnar", true) == InvalidOid)
+ {
+ if (strcmp(createExtensionStmt->extname, "citus_columnar") == 0)
+ {
+ DefElem *newVersionValue = GetExtensionOption(
+ createExtensionStmt->options,
+ "new_version");
+
+ /* the user is not allowed to install citus_columnar as internal version 11.1-0 */
+ if (newVersionValue)
+ {
+ const char *newVersion = defGetString(newVersionValue);
+ if (strcmp(newVersion, CITUS_COLUMNAR_INTERNAL_VERSION) == 0)
+ {
+ ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg(
+ "unsupported citus_columnar version 11.1-0")));
+ }
+ }
+ }
+ }
+}
+
+
+/*
+ * CheckCitusColumnarAlterExtensionStmt determines whether citus_columnar
+ * can be altered for a given ALTER EXTENSION statement
+ */
+void
+CheckCitusColumnarAlterExtensionStmt(Node *parseTree)
+{
+ AlterExtensionStmt *alterExtensionStmt = castNode(AlterExtensionStmt, parseTree);
+ if (strcmp(alterExtensionStmt->extname, "citus_columnar") == 0)
+ {
+ DefElem *newVersionValue = GetExtensionOption(alterExtensionStmt->options,
+ "new_version");
+
+ /* the user is not allowed to downgrade citus_columnar to internal version 11.1-0 */
+ if (newVersionValue)
+ {
+ const char *newVersion = defGetString(newVersionValue);
+ if (strcmp(newVersion, CITUS_COLUMNAR_INTERNAL_VERSION) == 0)
+ {
+ ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("unsupported citus_columnar version 11.1-0")));
+ }
+ }
+ }
+}
+
+
 static const TableAmRoutine columnar_am_methods = {
 .type = T_TableAmRoutine,
@@ -2984,3 +3055,23 @@ InstalledExtensionVersionColumnar(void)

 return installedExtensionVersion;
 }
+
+
+/*
+ * GetExtensionOption returns DefElem * node with "defname" from "options" list
+ */
+DefElem *
+GetExtensionOption(List *extensionOptions, const char *defname)
+{
+ DefElem *defElement = NULL;
+ foreach_ptr(defElement, extensionOptions)
+ {
+ if (IsA(defElement, DefElem) &&
+ strncmp(defElement->defname, defname, NAMEDATALEN) == 0)
+ {
+ return defElement;
+ }
+ }
+
+ return NULL;
+}
diff --git a/src/backend/columnar/sql/citus_columnar--11.1-0--11.1-1.sql b/src/backend/columnar/sql/citus_columnar--11.1-0--11.1-1.sql
new file mode 100644
index 000000000..f7d318ae9
--- /dev/null
+++ b/src/backend/columnar/sql/citus_columnar--11.1-0--11.1-1.sql
@@ -0,0 +1,32 @@
+-- add columnar objects back
+ALTER EXTENSION citus_columnar ADD SCHEMA columnar;
+ALTER EXTENSION citus_columnar ADD SCHEMA columnar_internal;
+ALTER EXTENSION citus_columnar ADD SEQUENCE columnar_internal.storageid_seq;
+ALTER EXTENSION citus_columnar ADD TABLE columnar_internal.options;
+ALTER EXTENSION citus_columnar ADD TABLE columnar_internal.stripe;
+ALTER EXTENSION citus_columnar ADD TABLE columnar_internal.chunk_group;
+ALTER EXTENSION citus_columnar ADD TABLE columnar_internal.chunk;
+
+ALTER EXTENSION citus_columnar ADD FUNCTION columnar_internal.columnar_handler;
+ALTER EXTENSION citus_columnar ADD ACCESS METHOD columnar;
+ALTER EXTENSION citus_columnar ADD FUNCTION pg_catalog.alter_columnar_table_set;
+ALTER EXTENSION citus_columnar ADD FUNCTION pg_catalog.alter_columnar_table_reset;
+
+ALTER EXTENSION citus_columnar ADD FUNCTION citus_internal.upgrade_columnar_storage;
+ALTER EXTENSION citus_columnar ADD FUNCTION citus_internal.downgrade_columnar_storage;
+ALTER EXTENSION citus_columnar ADD FUNCTION citus_internal.columnar_ensure_am_depends_catalog;
+
+ALTER EXTENSION citus_columnar ADD FUNCTION columnar.get_storage_id;
+ALTER EXTENSION citus_columnar ADD VIEW columnar.storage;
+ALTER EXTENSION citus_columnar ADD VIEW columnar.options;
+ALTER EXTENSION citus_columnar ADD VIEW columnar.stripe;
+ALTER EXTENSION citus_columnar ADD VIEW columnar.chunk_group;
+ALTER EXTENSION citus_columnar ADD VIEW columnar.chunk;
+
+-- move citus_internal functions to columnar_internal
+
+ALTER FUNCTION citus_internal.upgrade_columnar_storage(regclass) SET SCHEMA columnar_internal; +ALTER FUNCTION citus_internal.downgrade_columnar_storage(regclass) SET SCHEMA columnar_internal; +ALTER FUNCTION citus_internal.columnar_ensure_am_depends_catalog() SET SCHEMA columnar_internal; + + diff --git a/src/backend/columnar/sql/citus_columnar--11.1-0.sql b/src/backend/columnar/sql/citus_columnar--11.1-0.sql new file mode 100644 index 000000000..cf39d9c06 --- /dev/null +++ b/src/backend/columnar/sql/citus_columnar--11.1-0.sql @@ -0,0 +1 @@ +-- fake sql file 'Y' diff --git a/src/backend/columnar/sql/citus_columnar--11.1-1.sql b/src/backend/columnar/sql/citus_columnar--11.1-1.sql new file mode 100644 index 000000000..c1081aba2 --- /dev/null +++ b/src/backend/columnar/sql/citus_columnar--11.1-1.sql @@ -0,0 +1,435 @@ +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION citus_columnar" to load this file. \quit + +-- columnar--9.5-1--10.0-1.sql + +CREATE SCHEMA IF NOT EXISTS columnar; +SET search_path TO columnar; + + +CREATE SEQUENCE IF NOT EXISTS storageid_seq MINVALUE 10000000000 NO CYCLE; + +CREATE TABLE IF NOT EXISTS options ( + regclass regclass NOT NULL PRIMARY KEY, + chunk_group_row_limit int NOT NULL, + stripe_row_limit int NOT NULL, + compression_level int NOT NULL, + compression name NOT NULL +) WITH (user_catalog_table = true); + +COMMENT ON TABLE options IS 'columnar table specific options, maintained by alter_columnar_table_set'; + +CREATE TABLE IF NOT EXISTS stripe ( + storage_id bigint NOT NULL, + stripe_num bigint NOT NULL, + file_offset bigint NOT NULL, + data_length bigint NOT NULL, + column_count int NOT NULL, + chunk_row_count int NOT NULL, + row_count bigint NOT NULL, + chunk_group_count int NOT NULL, + first_row_number bigint NOT NULL, + PRIMARY KEY (storage_id, stripe_num), + CONSTRAINT stripe_first_row_number_idx UNIQUE (storage_id, first_row_number) +) WITH (user_catalog_table = true); + +COMMENT ON TABLE stripe IS 'Columnar per stripe metadata'; + +CREATE TABLE IF NOT EXISTS chunk_group ( + storage_id bigint NOT NULL, + stripe_num bigint NOT NULL, + chunk_group_num int NOT NULL, + row_count bigint NOT NULL, + PRIMARY KEY (storage_id, stripe_num, chunk_group_num) +); + +COMMENT ON TABLE chunk_group IS 'Columnar chunk group metadata'; + +CREATE TABLE IF NOT EXISTS chunk ( + storage_id bigint NOT NULL, + stripe_num bigint NOT NULL, + attr_num int NOT NULL, + chunk_group_num int NOT NULL, + minimum_value bytea, + maximum_value bytea, + value_stream_offset bigint NOT NULL, + value_stream_length bigint NOT NULL, + exists_stream_offset bigint NOT NULL, + exists_stream_length bigint NOT NULL, + value_compression_type int NOT NULL, + value_compression_level int NOT NULL, + value_decompressed_length bigint NOT NULL, + value_count bigint NOT NULL, + PRIMARY KEY (storage_id, stripe_num, attr_num, chunk_group_num) +) WITH (user_catalog_table = true); + +COMMENT ON TABLE chunk IS 'Columnar per chunk metadata'; + +DO $proc$ +BEGIN + +-- from version 12 and up we have support for tableam's if installed on pg11 we can't +-- create the objects here. 
Instead we rely on citus_finish_pg_upgrade to be called by the +-- user instead to add the missing objects +IF substring(current_Setting('server_version'), '\d+')::int >= 12 THEN + EXECUTE $$ +--#include "udfs/columnar_handler/10.0-1.sql" +CREATE OR REPLACE FUNCTION columnar.columnar_handler(internal) + RETURNS table_am_handler + LANGUAGE C +AS 'MODULE_PATHNAME', 'columnar_handler'; +COMMENT ON FUNCTION columnar.columnar_handler(internal) + IS 'internal function returning the handler for columnar tables'; + +-- postgres 11.8 does not support the syntax for table am, also it is seemingly trying +-- to parse the upgrade file and erroring on unknown syntax. +-- normally this section would not execute on postgres 11 anyway. To trick it to pass on +-- 11.8 we wrap the statement in a plpgsql block together with an EXECUTE. This is valid +-- syntax on 11.8 and will execute correctly in 12 +DO $create_table_am$ +BEGIN + EXECUTE 'CREATE ACCESS METHOD columnar TYPE TABLE HANDLER columnar.columnar_handler'; +END $create_table_am$; + +--#include "udfs/alter_columnar_table_set/10.0-1.sql" +CREATE OR REPLACE FUNCTION pg_catalog.alter_columnar_table_set( + table_name regclass, + chunk_group_row_limit int DEFAULT NULL, + stripe_row_limit int DEFAULT NULL, + compression name DEFAULT null, + compression_level int DEFAULT NULL) + RETURNS void + LANGUAGE C +AS 'MODULE_PATHNAME', 'alter_columnar_table_set'; + +COMMENT ON FUNCTION pg_catalog.alter_columnar_table_set( + table_name regclass, + chunk_group_row_limit int, + stripe_row_limit int, + compression name, + compression_level int) +IS 'set one or more options on a columnar table, when set to NULL no change is made'; + + +--#include "udfs/alter_columnar_table_reset/10.0-1.sql" +CREATE OR REPLACE FUNCTION pg_catalog.alter_columnar_table_reset( + table_name regclass, + chunk_group_row_limit bool DEFAULT false, + stripe_row_limit bool DEFAULT false, + compression bool DEFAULT false, + compression_level bool DEFAULT false) + RETURNS void + LANGUAGE C +AS 'MODULE_PATHNAME', 'alter_columnar_table_reset'; + +COMMENT ON FUNCTION pg_catalog.alter_columnar_table_reset( + table_name regclass, + chunk_group_row_limit bool, + stripe_row_limit bool, + compression bool, + compression_level bool) +IS 'reset on or more options on a columnar table to the system defaults'; + + $$; +END IF; +END$proc$; + +-- (this function being dropped in 10.0.3)->#include "udfs/columnar_ensure_objects_exist/10.0-1.sql" + +RESET search_path; + +-- columnar--10.0.-1 --10.0.2 +GRANT USAGE ON SCHEMA columnar TO PUBLIC; +GRANT SELECT ON ALL tables IN SCHEMA columnar TO PUBLIC ; + +-- columnar--10.0-3--10.1-1.sql + +-- Drop foreign keys between columnar metadata tables. + + +-- columnar--10.1-1--10.2-1.sql + +-- For a proper mapping between tid & (stripe, row_num), add a new column to +-- columnar.stripe and define a BTREE index on this column. +-- Also include storage_id column for per-relation scans. + + +-- Populate first_row_number column of columnar.stripe table. +-- +-- For simplicity, we calculate MAX(row_count) value across all the stripes +-- of all the columanar tables and then use it to populate first_row_number +-- column. This would introduce some gaps however we are okay with that since +-- it's already the case with regular INSERT/COPY's. 
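+--
+-- A worked example with illustrative numbers (not taken from any real
+-- table): if max_row_count comes out as 150000, stripe_num 1 is assigned
+-- first_row_number 1, stripe_num 2 gets 150001, stripe_num 3 gets 300001,
+-- and so on, following the formula in the DO block below.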
+DO $$ +DECLARE + max_row_count bigint; + -- this should be equal to columnar_storage.h/COLUMNAR_FIRST_ROW_NUMBER + COLUMNAR_FIRST_ROW_NUMBER constant bigint := 1; +BEGIN + SELECT MAX(row_count) INTO max_row_count FROM columnar.stripe; + UPDATE columnar.stripe SET first_row_number = COLUMNAR_FIRST_ROW_NUMBER + + (stripe_num - 1) * max_row_count; +END; +$$; + +-- columnar--10.2-1--10.2-2.sql + +-- revoke read access for columnar.chunk from unprivileged +-- user as it contains chunk min/max values +REVOKE SELECT ON columnar.chunk FROM PUBLIC; + + +-- columnar--10.2-2--10.2-3.sql + +-- Since stripe_first_row_number_idx is required to scan a columnar table, we +-- need to make sure that it is created before doing anything with columnar +-- tables during pg upgrades. +-- +-- However, a plain btree index is not a dependency of a table, so pg_upgrade +-- cannot guarantee that stripe_first_row_number_idx gets created when +-- creating columnar.stripe, unless we make it a unique "constraint". +-- +-- To do that, drop stripe_first_row_number_idx and create a unique +-- constraint with the same name to keep the code change at minimum. + +-- columnar--10.2-3--10.2-4.sql + + +-- columnar--11.0-2--11.1-1.sql + +CREATE OR REPLACE FUNCTION pg_catalog.alter_columnar_table_set( + table_name regclass, + chunk_group_row_limit int DEFAULT NULL, + stripe_row_limit int DEFAULT NULL, + compression name DEFAULT null, + compression_level int DEFAULT NULL) + RETURNS void + LANGUAGE plpgsql AS +$alter_columnar_table_set$ +declare + noop BOOLEAN := true; + cmd TEXT := 'ALTER TABLE ' || table_name::text || ' SET ('; +begin + if (chunk_group_row_limit is not null) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.chunk_group_row_limit=' || chunk_group_row_limit; + noop := false; + end if; + if (stripe_row_limit is not null) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.stripe_row_limit=' || stripe_row_limit; + noop := false; + end if; + if (compression is not null) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.compression=' || compression; + noop := false; + end if; + if (compression_level is not null) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.compression_level=' || compression_level; + noop := false; + end if; + cmd := cmd || ')'; + if (not noop) then + execute cmd; + end if; + return; +end; +$alter_columnar_table_set$; + +COMMENT ON FUNCTION pg_catalog.alter_columnar_table_set( + table_name regclass, + chunk_group_row_limit int, + stripe_row_limit int, + compression name, + compression_level int) +IS 'set one or more options on a columnar table, when set to NULL no change is made'; + +CREATE OR REPLACE FUNCTION pg_catalog.alter_columnar_table_reset( + table_name regclass, + chunk_group_row_limit bool DEFAULT false, + stripe_row_limit bool DEFAULT false, + compression bool DEFAULT false, + compression_level bool DEFAULT false) + RETURNS void + LANGUAGE plpgsql AS +$alter_columnar_table_reset$ +declare + noop BOOLEAN := true; + cmd TEXT := 'ALTER TABLE ' || table_name::text || ' RESET ('; +begin + if (chunk_group_row_limit) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.chunk_group_row_limit'; + noop := false; + end if; + if (stripe_row_limit) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.stripe_row_limit'; + noop := false; + end if; + if (compression) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 
'columnar.compression'; + noop := false; + end if; + if (compression_level) then + if (not noop) then cmd := cmd || ', '; end if; + cmd := cmd || 'columnar.compression_level'; + noop := false; + end if; + cmd := cmd || ')'; + if (not noop) then + execute cmd; + end if; + return; +end; +$alter_columnar_table_reset$; + +COMMENT ON FUNCTION pg_catalog.alter_columnar_table_reset( + table_name regclass, + chunk_group_row_limit bool, + stripe_row_limit bool, + compression bool, + compression_level bool) +IS 'reset on or more options on a columnar table to the system defaults'; + +-- rename columnar schema to columnar_internal and tighten security + +REVOKE ALL PRIVILEGES ON ALL TABLES IN SCHEMA columnar FROM PUBLIC; +ALTER SCHEMA columnar RENAME TO columnar_internal; +REVOKE ALL PRIVILEGES ON SCHEMA columnar_internal FROM PUBLIC; + +-- create columnar schema with public usage privileges + +CREATE SCHEMA columnar; +GRANT USAGE ON SCHEMA columnar TO PUBLIC; + +--#include "udfs/upgrade_columnar_storage/10.2-1.sql" +CREATE OR REPLACE FUNCTION columnar_internal.upgrade_columnar_storage(rel regclass) + RETURNS VOID + STRICT + LANGUAGE c AS 'MODULE_PATHNAME', $$upgrade_columnar_storage$$; + +COMMENT ON FUNCTION columnar_internal.upgrade_columnar_storage(regclass) + IS 'function to upgrade the columnar storage, if necessary'; + + +--#include "udfs/downgrade_columnar_storage/10.2-1.sql" + +CREATE OR REPLACE FUNCTION columnar_internal.downgrade_columnar_storage(rel regclass) + RETURNS VOID + STRICT + LANGUAGE c AS 'MODULE_PATHNAME', $$downgrade_columnar_storage$$; + +COMMENT ON FUNCTION columnar_internal.downgrade_columnar_storage(regclass) + IS 'function to downgrade the columnar storage, if necessary'; + +-- update UDF to account for columnar_internal schema +CREATE OR REPLACE FUNCTION columnar_internal.columnar_ensure_am_depends_catalog() + RETURNS void + LANGUAGE plpgsql + SET search_path = pg_catalog +AS $func$ +BEGIN + INSERT INTO pg_depend + WITH columnar_schema_members(relid) AS ( + SELECT pg_class.oid AS relid FROM pg_class + WHERE relnamespace = + COALESCE( + (SELECT pg_namespace.oid FROM pg_namespace WHERE nspname = 'columnar_internal'), + (SELECT pg_namespace.oid FROM pg_namespace WHERE nspname = 'columnar') + ) + AND relname IN ('chunk', + 'chunk_group', + 'chunk_group_pkey', + 'chunk_pkey', + 'options', + 'options_pkey', + 'storageid_seq', + 'stripe', + 'stripe_first_row_number_idx', + 'stripe_pkey') + ) + SELECT -- Define a dependency edge from "columnar table access method" .. + 'pg_am'::regclass::oid as classid, + (select oid from pg_am where amname = 'columnar') as objid, + 0 as objsubid, + -- ... to each object that is registered to pg_class and that lives + -- in "columnar" schema. That contains catalog tables, indexes + -- created on them and the sequences created in "columnar" schema. + -- + -- Given the possibility of user might have created their own objects + -- in columnar schema, we explicitly specify list of objects that we + -- are interested in. + 'pg_class'::regclass::oid as refclassid, + columnar_schema_members.relid as refobjid, + 0 as refobjsubid, + 'n' as deptype + FROM columnar_schema_members + -- Avoid inserting duplicate entries into pg_depend. 
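+ -- (TABLE pg_depend is shorthand for SELECT * FROM pg_depend, so the
+ -- EXCEPT below keeps only the dependency rows that are not already present)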
+ EXCEPT TABLE pg_depend; +END; +$func$; +COMMENT ON FUNCTION columnar_internal.columnar_ensure_am_depends_catalog() + IS 'internal function responsible for creating dependencies from columnar ' + 'table access method to the rel objects in columnar schema'; + +SELECT columnar_internal.columnar_ensure_am_depends_catalog(); + +-- add utility function + +CREATE FUNCTION columnar.get_storage_id(regclass) RETURNS bigint + LANGUAGE C STRICT + AS 'citus_columnar', $$columnar_relation_storageid$$; + +-- create views for columnar table information + +CREATE VIEW columnar.storage WITH (security_barrier) AS + SELECT c.oid::regclass AS relation, + columnar.get_storage_id(c.oid) AS storage_id + FROM pg_class c, pg_am am + WHERE c.relam = am.oid AND am.amname = 'columnar' + AND pg_has_role(c.relowner, 'USAGE'); +COMMENT ON VIEW columnar.storage IS 'Columnar relation ID to storage ID mapping.'; +GRANT SELECT ON columnar.storage TO PUBLIC; + +CREATE VIEW columnar.options WITH (security_barrier) AS + SELECT regclass AS relation, chunk_group_row_limit, + stripe_row_limit, compression, compression_level + FROM columnar_internal.options o, pg_class c + WHERE o.regclass = c.oid + AND pg_has_role(c.relowner, 'USAGE'); +COMMENT ON VIEW columnar.options + IS 'Columnar options for tables on which the current user has ownership privileges.'; +GRANT SELECT ON columnar.options TO PUBLIC; + +CREATE VIEW columnar.stripe WITH (security_barrier) AS + SELECT relation, storage.storage_id, stripe_num, file_offset, data_length, + column_count, chunk_row_count, row_count, chunk_group_count, first_row_number + FROM columnar_internal.stripe stripe, columnar.storage storage + WHERE stripe.storage_id = storage.storage_id; +COMMENT ON VIEW columnar.stripe + IS 'Columnar stripe information for tables on which the current user has ownership privileges.'; +GRANT SELECT ON columnar.stripe TO PUBLIC; + +CREATE VIEW columnar.chunk_group WITH (security_barrier) AS + SELECT relation, storage.storage_id, stripe_num, chunk_group_num, row_count + FROM columnar_internal.chunk_group cg, columnar.storage storage + WHERE cg.storage_id = storage.storage_id; +COMMENT ON VIEW columnar.chunk_group + IS 'Columnar chunk group information for tables on which the current user has ownership privileges.'; +GRANT SELECT ON columnar.chunk_group TO PUBLIC; + +CREATE VIEW columnar.chunk WITH (security_barrier) AS + SELECT relation, storage.storage_id, stripe_num, attr_num, chunk_group_num, + minimum_value, maximum_value, value_stream_offset, value_stream_length, + exists_stream_offset, exists_stream_length, value_compression_type, + value_compression_level, value_decompressed_length, value_count + FROM columnar_internal.chunk chunk, columnar.storage storage + WHERE chunk.storage_id = storage.storage_id; +COMMENT ON VIEW columnar.chunk + IS 'Columnar chunk information for tables on which the current user has ownership privileges.'; +GRANT SELECT ON columnar.chunk TO PUBLIC; + diff --git a/src/backend/columnar/sql/columnar--10.1-1--10.2-1.sql b/src/backend/columnar/sql/columnar--10.1-1--10.2-1.sql index 1ee471117..4334a7a45 100644 --- a/src/backend/columnar/sql/columnar--10.1-1--10.2-1.sql +++ b/src/backend/columnar/sql/columnar--10.1-1--10.2-1.sql @@ -28,5 +28,5 @@ $$; #include "udfs/downgrade_columnar_storage/10.2-1.sql" -- upgrade storage for all columnar relations -SELECT citus_internal.upgrade_columnar_storage(c.oid) FROM pg_class c, pg_am a +PERFORM citus_internal.upgrade_columnar_storage(c.oid) FROM pg_class c, pg_am a WHERE c.relam = a.oid AND amname = 
'columnar'; diff --git a/src/backend/columnar/sql/columnar--10.2-3--10.2-4.sql b/src/backend/columnar/sql/columnar--10.2-3--10.2-4.sql index b4600a4bf..bdd8fb0c7 100644 --- a/src/backend/columnar/sql/columnar--10.2-3--10.2-4.sql +++ b/src/backend/columnar/sql/columnar--10.2-3--10.2-4.sql @@ -2,4 +2,4 @@ #include "udfs/columnar_ensure_am_depends_catalog/10.2-4.sql" -SELECT citus_internal.columnar_ensure_am_depends_catalog(); +PERFORM citus_internal.columnar_ensure_am_depends_catalog(); diff --git a/src/backend/columnar/sql/downgrades/citus_columnar--11.1-1--11.1-0.sql b/src/backend/columnar/sql/downgrades/citus_columnar--11.1-1--11.1-0.sql new file mode 100644 index 000000000..cd454a2e5 --- /dev/null +++ b/src/backend/columnar/sql/downgrades/citus_columnar--11.1-1--11.1-0.sql @@ -0,0 +1,116 @@ +CREATE OR REPLACE FUNCTION pg_catalog.alter_columnar_table_set( + table_name regclass, + chunk_group_row_limit int DEFAULT NULL, + stripe_row_limit int DEFAULT NULL, + compression name DEFAULT null, + compression_level int DEFAULT NULL) + RETURNS void + LANGUAGE C +AS 'MODULE_PATHNAME', 'alter_columnar_table_set'; + +COMMENT ON FUNCTION pg_catalog.alter_columnar_table_set( + table_name regclass, + chunk_group_row_limit int, + stripe_row_limit int, + compression name, + compression_level int) +IS 'set one or more options on a columnar table, when set to NULL no change is made'; +CREATE OR REPLACE FUNCTION pg_catalog.alter_columnar_table_reset( + table_name regclass, + chunk_group_row_limit bool DEFAULT false, + stripe_row_limit bool DEFAULT false, + compression bool DEFAULT false, + compression_level bool DEFAULT false) + RETURNS void + LANGUAGE C +AS 'MODULE_PATHNAME', 'alter_columnar_table_reset'; + +COMMENT ON FUNCTION pg_catalog.alter_columnar_table_reset( + table_name regclass, + chunk_group_row_limit bool, + stripe_row_limit bool, + compression bool, + compression_level bool) +IS 'reset on or more options on a columnar table to the system defaults'; + +CREATE OR REPLACE FUNCTION columnar_internal.columnar_ensure_am_depends_catalog() + RETURNS void + LANGUAGE plpgsql + SET search_path = pg_catalog +AS $func$ +BEGIN + INSERT INTO pg_depend + SELECT -- Define a dependency edge from "columnar table access method" .. + 'pg_am'::regclass::oid as classid, + (select oid from pg_am where amname = 'columnar') as objid, + 0 as objsubid, + -- ... to each object that is registered to pg_class and that lives + -- in "columnar" schema. That contains catalog tables, indexes + -- created on them and the sequences created in "columnar" schema. + -- + -- Given the possibility of user might have created their own objects + -- in columnar schema, we explicitly specify list of objects that we + -- are interested in. + 'pg_class'::regclass::oid as refclassid, + columnar_schema_members.relname::regclass::oid as refobjid, + 0 as refobjsubid, + 'n' as deptype + FROM (VALUES ('columnar.chunk'), + ('columnar.chunk_group'), + ('columnar.chunk_group_pkey'), + ('columnar.chunk_pkey'), + ('columnar.options'), + ('columnar.options_pkey'), + ('columnar.storageid_seq'), + ('columnar.stripe'), + ('columnar.stripe_first_row_number_idx'), + ('columnar.stripe_pkey') + ) columnar_schema_members(relname) + -- Avoid inserting duplicate entries into pg_depend. 
+ EXCEPT TABLE pg_depend;
+END;
+$func$;
+COMMENT ON FUNCTION columnar_internal.columnar_ensure_am_depends_catalog()
+ IS 'internal function responsible for creating dependencies from columnar '
+ 'table access method to the rel objects in columnar schema';
+
+DROP VIEW columnar.options;
+DROP VIEW columnar.stripe;
+DROP VIEW columnar.chunk_group;
+DROP VIEW columnar.chunk;
+DROP VIEW columnar.storage;
+DROP FUNCTION columnar.get_storage_id(regclass);
+
+DROP SCHEMA columnar;
+
+-- move columnar_internal functions back to citus_internal
+
+ALTER FUNCTION columnar_internal.upgrade_columnar_storage(regclass) SET SCHEMA citus_internal;
+ALTER FUNCTION columnar_internal.downgrade_columnar_storage(regclass) SET SCHEMA citus_internal;
+ALTER FUNCTION columnar_internal.columnar_ensure_am_depends_catalog() SET SCHEMA citus_internal;
+
+ALTER SCHEMA columnar_internal RENAME TO columnar;
+GRANT USAGE ON SCHEMA columnar TO PUBLIC;
+GRANT SELECT ON columnar.options TO PUBLIC;
+GRANT SELECT ON columnar.stripe TO PUBLIC;
+GRANT SELECT ON columnar.chunk_group TO PUBLIC;
+
+-- detach relations from citus_columnar
+
+ALTER EXTENSION citus_columnar DROP SCHEMA columnar;
+ALTER EXTENSION citus_columnar DROP SEQUENCE columnar.storageid_seq;
+-- columnar tables
+ALTER EXTENSION citus_columnar DROP TABLE columnar.options;
+ALTER EXTENSION citus_columnar DROP TABLE columnar.stripe;
+ALTER EXTENSION citus_columnar DROP TABLE columnar.chunk_group;
+ALTER EXTENSION citus_columnar DROP TABLE columnar.chunk;
+
+ALTER EXTENSION citus_columnar DROP FUNCTION columnar.columnar_handler;
+ALTER EXTENSION citus_columnar DROP ACCESS METHOD columnar;
+ALTER EXTENSION citus_columnar DROP FUNCTION pg_catalog.alter_columnar_table_set;
+ALTER EXTENSION citus_columnar DROP FUNCTION pg_catalog.alter_columnar_table_reset;
+
+-- functions under citus_internal for columnar
+ALTER EXTENSION citus_columnar DROP FUNCTION citus_internal.upgrade_columnar_storage;
+ALTER EXTENSION citus_columnar DROP FUNCTION citus_internal.downgrade_columnar_storage;
+ALTER EXTENSION citus_columnar DROP FUNCTION citus_internal.columnar_ensure_am_depends_catalog;
diff --git a/src/backend/distributed/citus--11.1-1.control b/src/backend/distributed/citus--11.1-1.control
new file mode 100644
index 000000000..93c69fc63
--- /dev/null
+++ b/src/backend/distributed/citus--11.1-1.control
@@ -0,0 +1 @@
+requires = 'citus_columnar'
diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c
index f18671c75..6f45cec5b 100644
--- a/src/backend/distributed/commands/extension.c
+++ b/src/backend/distributed/commands/extension.c
@@ -11,10 +11,12 @@
 #include "postgres.h"

 #include "access/genam.h"
+#include "access/xact.h"
 #include "citus_version.h"
 #include "catalog/dependency.h"
 #include "catalog/pg_depend.h"
 #include "catalog/pg_extension_d.h"
+#include "columnar/columnar.h"
 #include "catalog/pg_foreign_data_wrapper.h"
 #include "commands/defrem.h"
 #include "commands/extension.h"
@@ -745,6 +747,60 @@ IsCreateAlterExtensionUpdateCitusStmt(Node *parseTree)
 }


+/*
+ * PreprocessCreateExtensionStmtForCitusColumnar determines whether
+ * citus_columnar needs to be installed first, and whether citus_columnar is
+ * supported on the current citus version, when the given utility is a
+ * CREATE EXTENSION statement
+ */
+void
+PreprocessCreateExtensionStmtForCitusColumnar(Node *parsetree)
+{
+ /* CREATE EXTENSION citus (version Z) */
+ CreateExtensionStmt *createExtensionStmt = castNode(CreateExtensionStmt,
+ parsetree);
+
+ if (strcmp(createExtensionStmt->extname, "citus") == 0)
+ {
+ int versionNumber = (int) (100 * strtod(CITUS_MAJORVERSION, NULL));
+ DefElem *newVersionValue = GetExtensionOption(createExtensionStmt->options,
+ "new_version");
+
+ /* create extension citus version xxx */
+ if (newVersionValue)
+ {
+ char *newVersion = strdup(defGetString(newVersionValue));
+ versionNumber = GetExtensionVersionNumber(newVersion);
+ }
+
+ /* citus version >= 11.1 requires citus_columnar to be installed first */
+ if (versionNumber >= 1110)
+ {
+ if (get_extension_oid("citus_columnar", true) == InvalidOid)
+ {
+ CreateExtensionWithVersion("citus_columnar", NULL);
+ }
+ }
+ }
+
+ /* edge case check: citus_columnar is only supported on citus version >= 11.1 */
+ if (strcmp(createExtensionStmt->extname, "citus_columnar") == 0)
+ {
+ Oid citusOid = get_extension_oid("citus", true);
+ if (citusOid != InvalidOid)
+ {
+ char *curCitusVersion = strdup(get_extension_version(citusOid));
+ int curCitusVersionNum = GetExtensionVersionNumber(curCitusVersion);
+ if (curCitusVersionNum < 1110)
+ {
+ ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg(
+ "must upgrade citus to version 11.1-1 first before install citus_columnar")));
+ }
+ }
+ }
+}
+
+
 /*
 * IsDropCitusExtensionStmt iterates the objects to be dropped in a drop statement
 * and try to find citus extension there.
@@ -812,6 +868,101 @@ IsAlterExtensionSetSchemaCitus(Node *parseTree)
 }


+/*
+ * PreprocessAlterExtensionCitusStmtForCitusColumnar pre-processes the cases of
+ * upgrading citus to a version that supports citus_columnar, and of
+ * downgrading citus to an older version that still ships columnar inside the
+ * citus extension
+ */
+void
+PreprocessAlterExtensionCitusStmtForCitusColumnar(Node *parseTree)
+{
+ /* upgrade citus: alter extension citus update to 'xxx' */
+ DefElem *newVersionValue = GetExtensionOption(
+ ((AlterExtensionStmt *) parseTree)->options, "new_version");
+ Oid citusColumnarOid = get_extension_oid("citus_columnar", true);
+ if (newVersionValue)
+ {
+ char *newVersion = defGetString(newVersionValue);
+ double newVersionNumber = GetExtensionVersionNumber(strdup(newVersion));
+
+ /* alter extension citus update to version >= 11.1-1, and no citus_columnar installed */
+ if (newVersionNumber >= 1110 && citusColumnarOid == InvalidOid)
+ {
+ /* upgrading citus to 11.1-1 or a later version */
+ CreateExtensionWithVersion("citus_columnar", CITUS_COLUMNAR_INTERNAL_VERSION);
+ }
+ else if (newVersionNumber < 1110 && citusColumnarOid != InvalidOid)
+ {
+ /* downgrading citus; citus_columnar must first be downgraded to internal version Y */
+ AlterExtensionUpdateStmt("citus_columnar", CITUS_COLUMNAR_INTERNAL_VERSION);
+ }
+ }
+ else
+ {
+ /* alter extension citus update without specifying the version */
+ int versionNumber = (int) (100 * strtod(CITUS_MAJORVERSION, NULL));
+ if (versionNumber >= 1110)
+ {
+ if (citusColumnarOid == InvalidOid)
+ {
+ CreateExtensionWithVersion("citus_columnar",
+ CITUS_COLUMNAR_INTERNAL_VERSION);
+ }
+ }
+ }
+}
+
+
+/*
+ * PostprocessAlterExtensionCitusStmtForCitusColumnar post-processes the cases
+ * of upgrading citus to a version that supports citus_columnar, and of
+ * downgrading citus to an older version that ships columnar inside the citus
+ * extension
+ */
+void
+PostprocessAlterExtensionCitusStmtForCitusColumnar(Node *parseTree)
+{
+ DefElem *newVersionValue = GetExtensionOption(
+ ((AlterExtensionStmt *) parseTree)->options, "new_version");
+ Oid citusColumnarOid = get_extension_oid("citus_columnar", true);
+ if (newVersionValue)
+ {
+ char *newVersion = defGetString(newVersionValue);
+ double newVersionNumber = GetExtensionVersionNumber(strdup(newVersion));
+ if (newVersionNumber >= 1110 && citusColumnarOid != InvalidOid)
+ {
+ /* upgrading citus: after "ALTER EXTENSION citus update to xxx", update citus_columnar from internal version Y to version Z */
+ char *curColumnarVersion = get_extension_version(citusColumnarOid);
+ if (strcmp(curColumnarVersion, CITUS_COLUMNAR_INTERNAL_VERSION) == 0)
+ {
+ AlterExtensionUpdateStmt("citus_columnar", "11.1-1");
+ }
+ }
+ else if (newVersionNumber < 1110 && citusColumnarOid != InvalidOid)
+ {
+ /* downgrading citus: after "ALTER EXTENSION citus update to xxx", drop the citus_columnar extension */
+ char *curColumnarVersion = get_extension_version(citusColumnarOid);
+ if (strcmp(curColumnarVersion, CITUS_COLUMNAR_INTERNAL_VERSION) == 0)
+ {
+ RemoveExtensionById(citusColumnarOid);
+ }
+ }
+ }
+ else
+ {
+ /* alter extension citus update without a version: upgrade citus_columnar from Y to Z */
+ int versionNumber = (int) (100 * strtod(CITUS_MAJORVERSION, NULL));
+ if (versionNumber >= 1110)
+ {
+ char *curColumnarVersion = get_extension_version(citusColumnarOid);
+ if (strcmp(curColumnarVersion, CITUS_COLUMNAR_INTERNAL_VERSION) == 0)
+ {
+ AlterExtensionUpdateStmt("citus_columnar", "11.1-1");
+ }
+ }
+ }
+}
+
+
 /*
 * CreateExtensionDDLCommand returns a list of DDL statements (const char *) to be
 * executed on a node to recreate the extension addressed by the extensionAddress.
@@ -1025,3 +1176,80 @@ AlterExtensionUpdateStmtObjectAddress(Node *node, bool missing_ok)

 return address;
 }
+
+
+/*
+ * CreateExtensionWithVersion builds and executes a CREATE EXTENSION statement
+ * for the given extension name and extension version
+ */
+void
+CreateExtensionWithVersion(char *extname, char *extVersion)
+{
+ CreateExtensionStmt *createExtensionStmt = makeNode(CreateExtensionStmt);
+
+ /* set location to -1 as it is unknown */
+ int location = -1;
+
+ /* set extension name and if_not_exists fields */
+ createExtensionStmt->extname = extname;
+ createExtensionStmt->if_not_exists = true;
+
+ if (extVersion == NULL)
+ {
+ createExtensionStmt->options = NIL;
+ }
+ else
+ {
+ Node *extensionVersionArg = (Node *) makeString(extVersion);
+ DefElem *extensionVersionElement = makeDefElem("new_version", extensionVersionArg,
+ location);
+ createExtensionStmt->options = lappend(createExtensionStmt->options,
+ extensionVersionElement);
+ }
+
+ CreateExtension(NULL, createExtensionStmt);
+ CommandCounterIncrement();
+}
+
+
+/*
+ * GetExtensionVersionNumber converts an extension version string such as
+ * "11.1-1" to its numeric value (here, 1110)
+ */
+int
+GetExtensionVersionNumber(char *extVersion)
+{
+ char *strtokPosition = NULL;
+ char *versionVal = strtok_r(extVersion, "-", &strtokPosition);
+ double versionNumber = strtod(versionVal, NULL);
+ return (int) (versionNumber * 100);
+}
+
+
+/*
+ * AlterExtensionUpdateStmt builds and executes an ALTER EXTENSION ... UPDATE
+ * statement for the given extension name and target version
+ */
+void
+AlterExtensionUpdateStmt(char *extname, char *extVersion)
+{
+ AlterExtensionStmt *alterExtensionStmt = makeNode(AlterExtensionStmt);
+
+ /* set location to -1 as it is unknown */
+ int location = -1;
+ alterExtensionStmt->extname = extname;
+
+ if (extVersion == NULL)
+ {
+ ereport(ERROR, (errcode(ERRCODE_UNDEFINED_OBJECT),
+ errmsg("new version for extension \"%s\" should not be empty",
+ extname)));
+ }
+
+ Node *extensionVersionArg = (Node *) makeString(extVersion);
+ DefElem *extensionVersionElement = makeDefElem("new_version", extensionVersionArg,
+ location);
+ alterExtensionStmt->options = lappend(alterExtensionStmt->options,
+ 
extensionVersionElement); + ExecAlterExtensionStmt(NULL, alterExtensionStmt); + CommandCounterIncrement(); +} diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index cb6ab86a3..aae9c6104 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -38,8 +38,10 @@ #endif #include "catalog/catalog.h" #include "catalog/dependency.h" +#include "citus_version.h" #include "commands/dbcommands.h" #include "commands/defrem.h" +#include "commands/extension.h" #include "commands/tablecmds.h" #include "distributed/adaptive_executor.h" #include "distributed/backend_data.h" @@ -74,8 +76,10 @@ #include "lib/stringinfo.h" #include "nodes/parsenodes.h" #include "nodes/pg_list.h" +#include "nodes/makefuncs.h" #include "tcop/utility.h" #include "utils/builtins.h" +#include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/syscache.h" @@ -197,6 +201,15 @@ multi_ProcessUtility(PlannedStmt *pstmt, ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree); } + if (IsA(parsetree, CreateExtensionStmt)) + { + /* + * Postgres forbids creating/altering other extensions from within an extension script, so we use a utility hook instead + * This preprocess check whether citus_columnar should be installed first before citus + */ + PreprocessCreateExtensionStmtForCitusColumnar(parsetree); + } + if (!CitusHasBeenLoaded()) { /* @@ -662,11 +675,26 @@ ProcessUtilityInternal(PlannedStmt *pstmt, if (isAlterExtensionUpdateCitusStmt) { citusCanBeUpdatedToAvailableVersion = !InstalledAndAvailableVersionsSame(); + + /* + * Check whether need to install/drop citus_columnar when upgrade/downgrade citus + */ + PreprocessAlterExtensionCitusStmtForCitusColumnar(parsetree); } PrevProcessUtility_compat(pstmt, queryString, false, context, params, queryEnv, dest, completionTag); + if (isAlterExtensionUpdateCitusStmt) + { + /* + * Post process, upgrade citus_columnar from fake internal version to normal version if upgrade citus + * or drop citus_columnar fake version when downgrade citus to older version that do not support + * citus_columnar + */ + PostprocessAlterExtensionCitusStmtForCitusColumnar(parsetree); + } + /* * if we are running ALTER EXTENSION citus UPDATE (to "") command, we may need * to mark existing objects as distributed depending on the "version" parameter if diff --git a/src/backend/distributed/sql/citus--10.0-1--10.0-2.sql b/src/backend/distributed/sql/citus--10.0-1--10.0-2.sql index 746674a5a..d5d1c10cf 100644 --- a/src/backend/distributed/sql/citus--10.0-1--10.0-2.sql +++ b/src/backend/distributed/sql/citus--10.0-1--10.0-2.sql @@ -1,5 +1,17 @@ -- citus--10.0-1--10.0-2 -#include "../../columnar/sql/columnar--10.0-1--10.0-2.sql" +--#include "../../columnar/sql/columnar--10.0-1--10.0-2.sql" +DO $check_columnar$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_extension AS e + INNER JOIN pg_catalog.pg_depend AS d ON (d.refobjid = e.oid) + INNER JOIN pg_catalog.pg_proc AS p ON (p.oid = d.objid) + WHERE e.extname='citus_columnar' and p.proname = 'columnar_handler' + ) THEN + #include "../../columnar/sql/columnar--10.0-1--10.0-2.sql" + END IF; +END; +$check_columnar$; GRANT SELECT ON public.citus_tables TO public; + diff --git a/src/backend/distributed/sql/citus--10.0-4--10.1-1.sql b/src/backend/distributed/sql/citus--10.0-4--10.1-1.sql index 1b9b32e34..33cb46107 100644 --- a/src/backend/distributed/sql/citus--10.0-4--10.1-1.sql +++ b/src/backend/distributed/sql/citus--10.0-4--10.1-1.sql @@ -3,13 
+3,25 @@ -- add the current database to the distributed objects if not already in there. -- this is to reliably propagate some of the alter database commands that might be -- supported. + INSERT INTO citus.pg_dist_object SELECT 'pg_catalog.pg_database'::regclass::oid AS oid, (SELECT oid FROM pg_database WHERE datname = current_database()) as objid, 0 as objsubid ON CONFLICT DO NOTHING; -#include "../../columnar/sql/columnar--10.0-3--10.1-1.sql" +--#include "../../columnar/sql/columnar--10.0-3--10.1-1.sql" +DO $check_columnar$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_extension AS e + INNER JOIN pg_catalog.pg_depend AS d ON (d.refobjid = e.oid) + INNER JOIN pg_catalog.pg_proc AS p ON (p.oid = d.objid) + WHERE e.extname='citus_columnar' and p.proname = 'columnar_handler' + ) THEN + #include "../../columnar/sql/columnar--10.0-3--10.1-1.sql" + END IF; +END; +$check_columnar$; #include "udfs/create_distributed_table/10.1-1.sql"; #include "udfs/worker_partitioned_relation_total_size/10.1-1.sql" #include "udfs/worker_partitioned_relation_size/10.1-1.sql" diff --git a/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql b/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql index 507a141c5..2dac31069 100644 --- a/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql +++ b/src/backend/distributed/sql/citus--10.1-1--10.2-1.sql @@ -9,7 +9,19 @@ GRANT ALL ON FUNCTION pg_catalog.worker_record_sequence_dependency(regclass,regc ALTER TABLE pg_catalog.pg_dist_placement ADD CONSTRAINT placement_shardid_groupid_unique_index UNIQUE (shardid, groupid); #include "udfs/stop_metadata_sync_to_node/10.2-1.sql" -#include "../../columnar/sql/columnar--10.1-1--10.2-1.sql" +--#include "../../columnar/sql/columnar--10.1-1--10.2-1.sql" +DO $check_columnar$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_extension AS e + INNER JOIN pg_catalog.pg_depend AS d ON (d.refobjid = e.oid) + INNER JOIN pg_catalog.pg_proc AS p ON (p.oid = d.objid) + WHERE e.extname='citus_columnar' and p.proname = 'columnar_handler' + ) THEN + #include "../../columnar/sql/columnar--10.1-1--10.2-1.sql" + END IF; +END; +$check_columnar$; + #include "udfs/citus_internal_add_partition_metadata/10.2-1.sql"; #include "udfs/citus_internal_add_shard_metadata/10.2-1.sql"; #include "udfs/citus_internal_add_placement_metadata/10.2-1.sql"; @@ -21,6 +33,7 @@ ALTER TABLE pg_catalog.pg_dist_placement ADD CONSTRAINT placement_shardid_groupi #include "udfs/get_missing_time_partition_ranges/10.2-1.sql" #include "udfs/worker_nextval/10.2-1.sql" + DROP FUNCTION pg_catalog.citus_drop_all_shards(regclass, text, text); CREATE FUNCTION pg_catalog.citus_drop_all_shards(logicalrelid regclass, schema_name text, @@ -34,3 +47,4 @@ COMMENT ON FUNCTION pg_catalog.citus_drop_all_shards(regclass, text, text, boole #include "udfs/citus_drop_trigger/10.2-1.sql"; #include "udfs/citus_prepare_pg_upgrade/10.2-1.sql" #include "udfs/citus_finish_pg_upgrade/10.2-1.sql" + diff --git a/src/backend/distributed/sql/citus--10.2-1--10.2-2.sql b/src/backend/distributed/sql/citus--10.2-1--10.2-2.sql index df2b65f87..eef1152a9 100644 --- a/src/backend/distributed/sql/citus--10.2-1--10.2-2.sql +++ b/src/backend/distributed/sql/citus--10.2-1--10.2-2.sql @@ -2,4 +2,16 @@ -- bump version to 10.2-2 -#include "../../columnar/sql/columnar--10.2-1--10.2-2.sql" +--#include "../../columnar/sql/columnar--10.2-1--10.2-2.sql" +DO $check_columnar$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_extension AS e + INNER JOIN pg_catalog.pg_depend AS d ON (d.refobjid = e.oid) + INNER JOIN 
pg_catalog.pg_proc AS p ON (p.oid = d.objid) + WHERE e.extname='citus_columnar' and p.proname = 'columnar_handler' + ) THEN + #include "../../columnar/sql/columnar--10.2-1--10.2-2.sql" + END IF; +END; +$check_columnar$; + diff --git a/src/backend/distributed/sql/citus--10.2-2--10.2-3.sql b/src/backend/distributed/sql/citus--10.2-2--10.2-3.sql index bbfb59ae9..66316eaf0 100644 --- a/src/backend/distributed/sql/citus--10.2-2--10.2-3.sql +++ b/src/backend/distributed/sql/citus--10.2-2--10.2-3.sql @@ -2,4 +2,15 @@ -- bump version to 10.2-3 -#include "../../columnar/sql/columnar--10.2-2--10.2-3.sql" +--#include "../../columnar/sql/columnar--10.2-2--10.2-3.sql" +DO $check_columnar$ +BEGIN + IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_extension AS e + INNER JOIN pg_catalog.pg_depend AS d ON (d.refobjid = e.oid) + INNER JOIN pg_catalog.pg_proc AS p ON (p.oid = d.objid) + WHERE e.extname='citus_columnar' and p.proname = 'columnar_handler' + ) THEN + #include "../../columnar/sql/columnar--10.2-2--10.2-3.sql" + END IF; +END; +$check_columnar$; diff --git a/src/backend/distributed/sql/citus--10.2-3--10.2-4.sql b/src/backend/distributed/sql/citus--10.2-3--10.2-4.sql index b32b12066..3c1f8afb0 100644 --- a/src/backend/distributed/sql/citus--10.2-3--10.2-4.sql +++ b/src/backend/distributed/sql/citus--10.2-3--10.2-4.sql @@ -2,9 +2,20 @@ -- bump version to 10.2-4 -#include "../../columnar/sql/columnar--10.2-3--10.2-4.sql" +DO $check_columnar$ +BEGIN +IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_extension AS e + INNER JOIN pg_catalog.pg_depend AS d ON (d.refobjid = e.oid) + INNER JOIN pg_catalog.pg_proc AS p ON (p.oid = d.objid) + WHERE e.extname='citus_columnar' and p.proname = 'columnar_handler' + ) THEN + #include "../../columnar/sql/columnar--10.2-3--10.2-4.sql" +END IF; +END; +$check_columnar$; #include "udfs/fix_partition_shard_index_names/10.2-4.sql" #include "udfs/fix_all_partition_shard_index_names/10.2-4.sql" #include "udfs/worker_fix_partition_shard_index_names/10.2-4.sql" #include "udfs/citus_finish_pg_upgrade/10.2-4.sql" + diff --git a/src/backend/distributed/sql/citus--11.0-3--11.1-1.sql b/src/backend/distributed/sql/citus--11.0-3--11.1-1.sql index 20273e914..625a62ca7 100644 --- a/src/backend/distributed/sql/citus--11.0-3--11.1-1.sql +++ b/src/backend/distributed/sql/citus--11.0-3--11.1-1.sql @@ -7,7 +7,59 @@ DROP FUNCTION pg_catalog.worker_merge_files_into_table(bigint, integer, text[], DROP FUNCTION pg_catalog.worker_range_partition_table(bigint, integer, text, text, oid, anyarray); DROP FUNCTION pg_catalog.worker_repartition_cleanup(bigint); -#include "../../columnar/sql/columnar--11.0-3--11.1-1.sql" +DO $check_columnar$ +BEGIN +IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_extension AS e + INNER JOIN pg_catalog.pg_depend AS d ON (d.refobjid = e.oid) + INNER JOIN pg_catalog.pg_proc AS p ON (p.oid = d.objid) + WHERE e.extname='citus_columnar' and p.proname = 'columnar_handler' + ) THEN + #include "../../columnar/sql/columnar--11.0-3--11.1-1.sql" +END IF; +END; +$check_columnar$; + +-- If upgrading citus, the columnar objects are already being a part of the +-- citus extension, and must be detached so that they can be attached +-- to the citus_columnar extension. 
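+--
+-- The pg_depend probe below is what distinguishes the two cases: if
+-- columnar_handler is still owned by the citus extension, this is an upgrade
+-- from a pre-11.1 citus and the columnar objects must be detached here before
+-- citus_columnar can adopt them.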
+DO $check_citus$ +BEGIN + IF EXISTS (SELECT 1 FROM pg_catalog.pg_extension AS e + INNER JOIN pg_catalog.pg_depend AS d ON (d.refobjid = e.oid) + INNER JOIN pg_catalog.pg_proc AS p ON (p.oid = d.objid) + WHERE e.extname='citus' and p.proname = 'columnar_handler' + ) THEN + ALTER EXTENSION citus DROP SCHEMA columnar; + ALTER EXTENSION citus DROP SCHEMA columnar_internal; + ALTER EXTENSION citus DROP SEQUENCE columnar_internal.storageid_seq; + + -- columnar tables + ALTER EXTENSION citus DROP TABLE columnar_internal.options; + ALTER EXTENSION citus DROP TABLE columnar_internal.stripe; + ALTER EXTENSION citus DROP TABLE columnar_internal.chunk_group; + ALTER EXTENSION citus DROP TABLE columnar_internal.chunk; + + ALTER EXTENSION citus DROP FUNCTION columnar_internal.columnar_handler; + ALTER EXTENSION citus DROP ACCESS METHOD columnar; + ALTER EXTENSION citus DROP FUNCTION pg_catalog.alter_columnar_table_set; + ALTER EXTENSION citus DROP FUNCTION pg_catalog.alter_columnar_table_reset; + ALTER EXTENSION citus DROP FUNCTION columnar.get_storage_id; + + -- columnar view + ALTER EXTENSION citus DROP VIEW columnar.storage; + ALTER EXTENSION citus DROP VIEW columnar.options; + ALTER EXTENSION citus DROP VIEW columnar.stripe; + ALTER EXTENSION citus DROP VIEW columnar.chunk_group; + ALTER EXTENSION citus DROP VIEW columnar.chunk; + + -- functions under citus_internal for columnar + ALTER EXTENSION citus DROP FUNCTION citus_internal.upgrade_columnar_storage; + ALTER EXTENSION citus DROP FUNCTION citus_internal.downgrade_columnar_storage; + ALTER EXTENSION citus DROP FUNCTION citus_internal.columnar_ensure_am_depends_catalog; + + END IF; +END $check_citus$; +#include "udfs/citus_finish_pg_upgrade/11.1-1.sql" DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, diff --git a/src/backend/distributed/sql/citus--9.5-1--10.0-4.sql b/src/backend/distributed/sql/citus--9.5-1--10.0-4.sql index 3f035f3a6..f6d0198da 100644 --- a/src/backend/distributed/sql/citus--9.5-1--10.0-4.sql +++ b/src/backend/distributed/sql/citus--9.5-1--10.0-4.sql @@ -6,7 +6,6 @@ -- cat citus--9.5-1--10.0-1.sql citus--10.0-1--10.0-2.sql citus--10.0-2--10.0-3.sql > citus--9.5-1--10.0-4.sql -- copy of citus--9.5-1--10.0-1 - DROP FUNCTION pg_catalog.upgrade_to_reference_table(regclass); DROP FUNCTION IF EXISTS pg_catalog.citus_total_relation_size(regclass); @@ -35,7 +34,18 @@ DROP FUNCTION IF EXISTS pg_catalog.citus_total_relation_size(regclass); #include "udfs/worker_change_sequence_dependency/10.0-1.sql" #include "udfs/remove_local_tables_from_metadata/10.0-1.sql" +--#include "../../columnar/sql/columnar--9.5-1--10.0-1.sql" +DO $check_columnar$ +BEGIN +IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_extension AS e + INNER JOIN pg_catalog.pg_depend AS d ON (d.refobjid = e.oid) + INNER JOIN pg_catalog.pg_proc AS p ON (p.oid = d.objid) + WHERE e.extname='citus_columnar' and p.proname = 'columnar_handler' + ) THEN #include "../../columnar/sql/columnar--9.5-1--10.0-1.sql" +END IF; +END; +$check_columnar$; #include "udfs/time_partition_range/10.0-1.sql" #include "udfs/time_partitions/10.0-1.sql" @@ -172,7 +182,19 @@ GRANT SELECT ON pg_catalog.citus_worker_stat_activity TO PUBLIC; -- copy of citus--10.0-1--10.0-2 +--#include "../../columnar/sql/columnar--10.0-1--10.0-2.sql" +DO $check_columnar$ +BEGIN +IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_extension AS e + INNER JOIN pg_catalog.pg_depend AS 
d ON (d.refobjid = e.oid) + INNER JOIN pg_catalog.pg_proc AS p ON (p.oid = d.objid) + WHERE e.extname='citus_columnar' and p.proname = 'columnar_handler' + ) THEN #include "../../columnar/sql/columnar--10.0-1--10.0-2.sql" +END IF; +END; +$check_columnar$; + -- copy of citus--10.0-2--10.0-3 @@ -215,3 +237,4 @@ COMMENT ON FUNCTION pg_catalog.citus_get_active_worker_nodes() RESET search_path; + diff --git a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql index e832d9730..5c0390061 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.1-1--11.0-3.sql @@ -46,7 +46,23 @@ CREATE FUNCTION pg_catalog.worker_repartition_cleanup(bigint) STRICT AS 'MODULE_PATHNAME', $function$worker_repartition_cleanup$function$; -#include "../../../columnar/sql/downgrades/columnar--11.1-1--11.0-3.sql" +-- add relations to citus +ALTER EXTENSION citus ADD SCHEMA columnar; +ALTER EXTENSION citus ADD SEQUENCE columnar.storageid_seq; +ALTER EXTENSION citus ADD TABLE columnar.options; +ALTER EXTENSION citus ADD TABLE columnar.stripe; +ALTER EXTENSION citus ADD TABLE columnar.chunk_group; +ALTER EXTENSION citus ADD TABLE columnar.chunk; + +ALTER EXTENSION citus ADD FUNCTION columnar.columnar_handler; +ALTER EXTENSION citus ADD ACCESS METHOD columnar; +ALTER EXTENSION citus ADD FUNCTION pg_catalog.alter_columnar_table_set; +ALTER EXTENSION citus ADD FUNCTION pg_catalog.alter_columnar_table_reset; + +ALTER EXTENSION citus ADD FUNCTION citus_internal.upgrade_columnar_storage; +ALTER EXTENSION citus ADD FUNCTION citus_internal.downgrade_columnar_storage; +ALTER EXTENSION citus ADD FUNCTION citus_internal.columnar_ensure_am_depends_catalog; + DROP FUNCTION pg_catalog.get_all_active_transactions(OUT datid oid, OUT process_id int, OUT initiator_node_identifier int4, OUT worker_query BOOL, OUT transaction_number int8, OUT transaction_stamp timestamptz, diff --git a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/11.1-1.sql b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/11.1-1.sql new file mode 100644 index 000000000..caa80d51e --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/11.1-1.sql @@ -0,0 +1,151 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_finish_pg_upgrade() + RETURNS void + LANGUAGE plpgsql + SET search_path = pg_catalog + AS $cppu$ +DECLARE + table_name regclass; + command text; + trigger_name text; +BEGIN + + + IF substring(current_Setting('server_version'), '\d+')::int >= 14 THEN + EXECUTE $cmd$ + -- disable propagation to prevent EnsureCoordinator errors + -- the aggregate created here does not depend on Citus extension (yet) + -- since we add the dependency with the next command + SET citus.enable_ddl_propagation TO OFF; + CREATE AGGREGATE array_cat_agg(anycompatiblearray) (SFUNC = array_cat, STYPE = anycompatiblearray); + COMMENT ON AGGREGATE array_cat_agg(anycompatiblearray) + IS 'concatenate input arrays into a single array'; + RESET citus.enable_ddl_propagation; + $cmd$; + ELSE + EXECUTE $cmd$ + SET citus.enable_ddl_propagation TO OFF; + CREATE AGGREGATE array_cat_agg(anyarray) (SFUNC = array_cat, STYPE = anyarray); + COMMENT ON AGGREGATE array_cat_agg(anyarray) + IS 'concatenate input arrays into a single array'; + RESET citus.enable_ddl_propagation; + $cmd$; + END IF; + + -- + -- Citus creates the array_cat_agg but because of a compatibility + -- issue between pg13-pg14, we drop and create it during 
upgrade.
+    -- And as Citus creates it, there needs to be a dependency to the
+    -- Citus extension, so we create that dependency here.
+    -- We are not using:
+    --  ALTER EXTENSION citus DROP/CREATE AGGREGATE array_cat_agg
+    -- because we don't have an easy way to check if the aggregate
+    -- exists with anyarray type or anycompatiblearray type.
+
+    INSERT INTO pg_depend
+    SELECT
+        'pg_proc'::regclass::oid as classid,
+        (SELECT oid FROM pg_proc WHERE proname = 'array_cat_agg') as objid,
+        0 as objsubid,
+        'pg_extension'::regclass::oid as refclassid,
+        (select oid from pg_extension where extname = 'citus') as refobjid,
+        0 as refobjsubid ,
+        'e' as deptype;
+
+    --
+    -- restore citus catalog tables
+    --
+    INSERT INTO pg_catalog.pg_dist_partition SELECT * FROM public.pg_dist_partition;
+    INSERT INTO pg_catalog.pg_dist_shard SELECT * FROM public.pg_dist_shard;
+    INSERT INTO pg_catalog.pg_dist_placement SELECT * FROM public.pg_dist_placement;
+    INSERT INTO pg_catalog.pg_dist_node_metadata SELECT * FROM public.pg_dist_node_metadata;
+    INSERT INTO pg_catalog.pg_dist_node SELECT * FROM public.pg_dist_node;
+    INSERT INTO pg_catalog.pg_dist_local_group SELECT * FROM public.pg_dist_local_group;
+    INSERT INTO pg_catalog.pg_dist_transaction SELECT * FROM public.pg_dist_transaction;
+    INSERT INTO pg_catalog.pg_dist_colocation SELECT * FROM public.pg_dist_colocation;
+    -- enterprise catalog tables
+    INSERT INTO pg_catalog.pg_dist_authinfo SELECT * FROM public.pg_dist_authinfo;
+    INSERT INTO pg_catalog.pg_dist_poolinfo SELECT * FROM public.pg_dist_poolinfo;
+
+    INSERT INTO pg_catalog.pg_dist_rebalance_strategy SELECT
+        name,
+        default_strategy,
+        shard_cost_function::regprocedure::regproc,
+        node_capacity_function::regprocedure::regproc,
+        shard_allowed_on_node_function::regprocedure::regproc,
+        default_threshold,
+        minimum_threshold,
+        improvement_threshold
+    FROM public.pg_dist_rebalance_strategy;
+
+    --
+    -- drop backup tables
+    --
+    DROP TABLE public.pg_dist_authinfo;
+    DROP TABLE public.pg_dist_colocation;
+    DROP TABLE public.pg_dist_local_group;
+    DROP TABLE public.pg_dist_node;
+    DROP TABLE public.pg_dist_node_metadata;
+    DROP TABLE public.pg_dist_partition;
+    DROP TABLE public.pg_dist_placement;
+    DROP TABLE public.pg_dist_poolinfo;
+    DROP TABLE public.pg_dist_shard;
+    DROP TABLE public.pg_dist_transaction;
+    DROP TABLE public.pg_dist_rebalance_strategy;
+
+    --
+    -- reset sequences
+    --
+    PERFORM setval('pg_catalog.pg_dist_shardid_seq', (SELECT MAX(shardid)+1 AS max_shard_id FROM pg_dist_shard), false);
+    PERFORM setval('pg_catalog.pg_dist_placement_placementid_seq', (SELECT MAX(placementid)+1 AS max_placement_id FROM pg_dist_placement), false);
+    PERFORM setval('pg_catalog.pg_dist_groupid_seq', (SELECT MAX(groupid)+1 AS max_group_id FROM pg_dist_node), false);
+    PERFORM setval('pg_catalog.pg_dist_node_nodeid_seq', (SELECT MAX(nodeid)+1 AS max_node_id FROM pg_dist_node), false);
+    PERFORM setval('pg_catalog.pg_dist_colocationid_seq', (SELECT MAX(colocationid)+1 AS max_colocation_id FROM pg_dist_colocation), false);
+
+    --
+    -- register triggers
+    --
+    FOR table_name IN SELECT logicalrelid FROM pg_catalog.pg_dist_partition
+    LOOP
+        trigger_name := 'truncate_trigger_' || table_name::oid;
+        command := 'create trigger ' || trigger_name || ' after truncate on ' || table_name || ' execute procedure pg_catalog.citus_truncate_trigger()';
+        EXECUTE command;
+        command := 'update pg_trigger set tgisinternal = true where tgname = ' || quote_literal(trigger_name);
+        EXECUTE command;
+    END LOOP;
+
+    --
+    -- set dependencies
+    --
+    INSERT INTO pg_depend
+    SELECT
+        'pg_class'::regclass::oid as classid,
+        p.logicalrelid::regclass::oid as objid,
+        0 as objsubid,
+        'pg_extension'::regclass::oid as refclassid,
+        (select oid from pg_extension where extname = 'citus') as refobjid,
+        0 as refobjsubid ,
+        'n' as deptype
+    FROM pg_catalog.pg_dist_partition p;
+
+    -- set dependencies for columnar table access method
+    PERFORM columnar_internal.columnar_ensure_am_depends_catalog();
+
+    -- restore pg_dist_object from the stable identifiers
+    TRUNCATE pg_catalog.pg_dist_object;
+    INSERT INTO pg_catalog.pg_dist_object (classid, objid, objsubid, distribution_argument_index, colocationid)
+    SELECT
+        address.classid,
+        address.objid,
+        address.objsubid,
+        naming.distribution_argument_index,
+        naming.colocationid
+    FROM
+        public.pg_dist_object naming,
+        pg_catalog.pg_get_object_address(naming.type, naming.object_names, naming.object_args) address;
+
+    DROP TABLE public.pg_dist_object;
+END;
+$cppu$;
+
+COMMENT ON FUNCTION pg_catalog.citus_finish_pg_upgrade()
+    IS 'perform tasks to restore citus settings from a location that has been prepared before pg_upgrade';
diff --git a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql
index 2c2635687..caa80d51e 100644
--- a/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql
+++ b/src/backend/distributed/sql/udfs/citus_finish_pg_upgrade/latest.sql
@@ -128,7 +128,7 @@ BEGIN
     FROM pg_catalog.pg_dist_partition p;

     -- set dependencies for columnar table access method
-    PERFORM citus_internal.columnar_ensure_am_depends_catalog();
+    PERFORM columnar_internal.columnar_ensure_am_depends_catalog();

     -- restore pg_dist_object from the stable identifiers
     TRUNCATE pg_catalog.pg_dist_object;
diff --git a/src/include/columnar/columnar.h b/src/include/columnar/columnar.h
index e8868808e..7b2924576 100644
--- a/src/include/columnar/columnar.h
+++ b/src/include/columnar/columnar.h
@@ -53,6 +53,9 @@ #define COLUMNAR_POSTSCRIPT_SIZE_MAX 256
 #define COLUMNAR_BYTES_PER_PAGE (BLCKSZ - SizeOfPageHeaderData)

+/* internal ("fake") version constant for the citus_columnar extension */
+#define CITUS_COLUMNAR_INTERNAL_VERSION "11.1-0"
+
 /*
  * ColumnarOptions holds the option values to be used when reading or writing
  * a columnar table. 
To resolve these values, we first check foreign table's options, diff --git a/src/include/columnar/columnar_tableam.h b/src/include/columnar/columnar_tableam.h index 2f06e0972..04f93fe30 100644 --- a/src/include/columnar/columnar_tableam.h +++ b/src/include/columnar/columnar_tableam.h @@ -58,6 +58,9 @@ extern TableScanDesc columnar_beginscan_extended(Relation relation, Snapshot sna extern int64 ColumnarScanChunkGroupsFiltered(ColumnarScanDesc columnarScanDesc); extern bool ColumnarSupportsIndexAM(char *indexAMName); extern bool IsColumnarTableAmTable(Oid relationId); - +extern void CheckCitusColumnarCreateExtensionStmt(Node *parseTree); +extern void CheckCitusColumnarAlterExtensionStmt(Node *parseTree); +extern DefElem * GetExtensionOption(List *extensionOptions, + const char *defname); #endif /* COLUMNAR_TABLEAM_H */ diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index ad5c4eb5d..370810cfa 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -184,6 +184,9 @@ extern Oid get_constraint_typid(Oid conoid); extern bool IsDropCitusExtensionStmt(Node *parsetree); extern List * GetDependentFDWsToExtension(Oid extensionId); extern bool IsCreateAlterExtensionUpdateCitusStmt(Node *parsetree); +extern void PreprocessCreateExtensionStmtForCitusColumnar(Node *parsetree); +extern void PreprocessAlterExtensionCitusStmtForCitusColumnar(Node *parsetree); +extern void PostprocessAlterExtensionCitusStmtForCitusColumnar(Node *parsetree); extern bool ShouldMarkRelationDistributed(Oid relationId); extern void ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree); extern List * PostprocessCreateExtensionStmt(Node *stmt, const char *queryString); @@ -209,7 +212,9 @@ extern ObjectAddress AlterExtensionSchemaStmtObjectAddress(Node *stmt, bool missing_ok); extern ObjectAddress AlterExtensionUpdateStmtObjectAddress(Node *stmt, bool missing_ok); - +extern void CreateExtensionWithVersion(char *extname, char *extVersion); +extern void AlterExtensionUpdateStmt(char *extname, char *extVersion); +extern int GetExtensionVersionNumber(char *extVersion); /* foreign_constraint.c - forward declarations */ extern bool ConstraintIsAForeignKeyToReferenceTable(char *constraintName, diff --git a/src/test/regress/Makefile b/src/test/regress/Makefile index ec42e0556..3847e67ea 100644 --- a/src/test/regress/Makefile +++ b/src/test/regress/Makefile @@ -209,7 +209,7 @@ check-operations: all -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/operations_schedule $(EXTRA_TESTS) check-columnar: - $(pg_regress_multi_check) --load-extension=citus \ + $(pg_regress_multi_check) --load-extension=citus_columnar --load-extension=citus \ -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/columnar_schedule $(EXTRA_TESTS) check-columnar-isolation: all $(isolation_test_files) diff --git a/src/test/regress/expected/columnar_drop.out b/src/test/regress/expected/columnar_drop.out index 8d2c519a1..75333c1e8 100644 --- a/src/test/regress/expected/columnar_drop.out +++ b/src/test/regress/expected/columnar_drop.out @@ -42,13 +42,13 @@ NOTICE: Citus partially supports CREATE DATABASE for distributed databases DETAIL: Citus does not propagate CREATE DATABASE command to workers HINT: You can manually create a database and its extensions on workers. 
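Aside (not part of the patch): the \gset used just below is the psql idiom
these tests rely on. It stores each column of the preceding query's
single-row result into a psql variable named after the column, so the
database OID captured before the drop can be referenced later as
:databaseoid. A minimal sketch of the idiom:

    SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database() \gset
    \echo :databaseoid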
\c db_to_drop -CREATE EXTENSION citus; +CREATE EXTENSION citus_columnar; SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database() \gset CREATE TABLE test_table(data int) USING columnar; -DROP EXTENSION citus CASCADE; +DROP EXTENSION citus_columnar CASCADE; NOTICE: drop cascades to table test_table -- test database drop -CREATE EXTENSION citus; +CREATE EXTENSION citus_columnar; SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database() \gset CREATE TABLE test_table(data int) USING columnar; \c :datname diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 3f1482715..33cab2776 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -103,6 +103,7 @@ ORDER BY 1, 2; -- DROP EXTENSION pre-created by the regression suite DROP EXTENSION citus; +DROP EXTENSION citus_columnar; \c -- these tests switch between citus versions and call ddl's that require pg_dist_object to be created SET citus.enable_metadata_sync TO 'false'; @@ -1079,8 +1080,14 @@ SELECT * FROM multi_extension.print_extension_changes(); -- Snapshot of state at 11.1-1 ALTER EXTENSION citus UPDATE TO '11.1-1'; SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- + access method columnar | + function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean) void | + function alter_columnar_table_set(regclass,integer,integer,name,integer) void | + function citus_internal.columnar_ensure_am_depends_catalog() void | + function citus_internal.downgrade_columnar_storage(regclass) void | + function citus_internal.upgrade_columnar_storage(regclass) void | function columnar.columnar_handler(internal) table_am_handler | function worker_cleanup_job_schema_cache() void | function worker_create_schema(bigint,text) void | @@ -1090,25 +1097,13 @@ SELECT * FROM multi_extension.print_extension_changes(); function worker_merge_files_into_table(bigint,integer,text[],text[]) void | function worker_range_partition_table(bigint,integer,text,text,oid,anyarray) void | function worker_repartition_cleanup(bigint) void | + schema columnar | sequence columnar.storageid_seq | table columnar.chunk | table columnar.chunk_group | table columnar.options | table columnar.stripe | - | function columnar.get_storage_id(regclass) bigint - | function columnar_internal.columnar_handler(internal) table_am_handler - | schema columnar_internal - | sequence columnar_internal.storageid_seq - | table columnar_internal.chunk - | table columnar_internal.chunk_group - | table columnar_internal.options - | table columnar_internal.stripe - | view columnar.chunk - | view columnar.chunk_group - | view columnar.options - | view columnar.storage - | view columnar.stripe -(27 rows) +(21 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version @@ -1137,6 +1132,7 @@ ORDER BY 1, 2; RESET citus.enable_version_checks; RESET columnar.enable_version_checks; DROP EXTENSION citus; +DROP EXTENSION citus_columnar; CREATE EXTENSION citus VERSION '8.0-1'; ERROR: specified version incompatible with loaded Citus library DETAIL: Loaded library requires 11.1, but 8.0-1 was specified. 
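Aside (not part of the patch): with columnar split into its own extension,
teardown in these tests now drops both extensions, and order matters since
citus depends on citus_columnar. Re-creating needs no extra step: the
CREATE EXTENSION preprocessing added in this series appears to install
citus_columnar implicitly, as the "CREATE EXTENSION IF NOT EXISTS
citus_columnar" NOTICE lines in a later test diff show. A sketch of the
session shape, assuming a stock 11.1 build:

    DROP EXTENSION citus;            -- must come first; citus requires citus_columnar
    DROP EXTENSION citus_columnar;
    CREATE EXTENSION citus;          -- citus_columnar expected to appear implicitly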
@@ -1213,11 +1209,13 @@ NOTICE: version "9.1-1" of extension "citus" is already installed ALTER EXTENSION citus UPDATE; -- re-create in newest version DROP EXTENSION citus; +DROP EXTENSION citus_columnar; \c CREATE EXTENSION citus; -- test cache invalidation in workers \c - - - :worker_1_port DROP EXTENSION citus; +DROP EXTENSION citus_columnar; SET citus.enable_version_checks TO 'false'; SET columnar.enable_version_checks TO 'false'; CREATE EXTENSION citus VERSION '8.0-1'; diff --git a/src/test/regress/expected/multi_fix_partition_shard_index_names.out b/src/test/regress/expected/multi_fix_partition_shard_index_names.out index 090ebdd4e..fa1b2c1ff 100644 --- a/src/test/regress/expected/multi_fix_partition_shard_index_names.out +++ b/src/test/regress/expected/multi_fix_partition_shard_index_names.out @@ -646,14 +646,34 @@ CREATE TABLE p2(dist_col int NOT NULL, another_col int, partition_col timestamp ALTER TABLE parent_table ATTACH PARTITION p2 FOR VALUES FROM ('2019-01-01') TO ('2020-01-01'); NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET citus.enable_ddl_propagation TO 'off' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing CREATE EXTENSION IF NOT EXISTS citus_columnar WITH SCHEMA pg_catalog VERSION "11.1-1"; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing SET citus.enable_ddl_propagation TO 'off' +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing CREATE EXTENSION IF NOT EXISTS citus_columnar WITH SCHEMA pg_catalog VERSION "11.1-1"; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing COMMIT +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('extension', ARRAY['citus_columnar']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx +NOTICE: issuing WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('extension', ARRAY['citus_columnar']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data; +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT worker_apply_shard_ddl_command (915002, 'fix_idx_names', 'CREATE TABLE 
fix_idx_names.p2 (dist_col integer NOT NULL, another_col integer, partition_col timestamp without time zone NOT NULL, name text) USING columnar') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing ALTER TABLE fix_idx_names.p2_915002 SET (columnar.chunk_group_row_limit = 10000, columnar.stripe_row_limit = 150000, columnar.compression_level = 3, columnar.compression = 'zstd'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SELECT worker_apply_shard_ddl_command (915002, 'fix_idx_names', 'ALTER TABLE fix_idx_names.p2 OWNER TO postgres') DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); -DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET citus.enable_ddl_propagation TO 'off' DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing SET citus.enable_ddl_propagation TO 'off' diff --git a/src/test/regress/expected/upgrade_columnar_metapage_after.out b/src/test/regress/expected/upgrade_columnar_metapage_after.out index 7268167dd..87e2424cb 100644 --- a/src/test/regress/expected/upgrade_columnar_metapage_after.out +++ b/src/test/regress/expected/upgrade_columnar_metapage_after.out @@ -94,7 +94,7 @@ SELECT version_major, version_minor, reserved_stripe_id, reserved_row_number (1 row) -- table is already upgraded, make sure that upgrade_columnar_metapage is no-op -SELECT citus_internal.upgrade_columnar_storage(c.oid) +SELECT columnar_internal.upgrade_columnar_storage(c.oid) FROM pg_class c, pg_am a WHERE c.relam = a.oid AND amname = 'columnar' and relname = 'columnar_table_2'; upgrade_columnar_storage diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index e361a7040..e7e601f3f 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -16,10 +16,7 @@ WHERE refclassid = 'pg_catalog.pg_extension'::pg_catalog.regclass ORDER BY 1; description --------------------------------------------------------------------- - access method columnar event trigger citus_cascade_to_partition - function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean) - function alter_columnar_table_set(regclass,integer,integer,name,integer) function alter_distributed_table(regclass,text,integer,text,boolean) function alter_old_partitions_set_access_method(regclass,timestamp with time zone,name) function alter_role_if_exists(text,text) @@ -63,8 +60,6 @@ ORDER BY 1; function citus_finish_citus_upgrade() function citus_finish_pg_upgrade() function citus_get_active_worker_nodes() - function citus_internal.columnar_ensure_am_depends_catalog() - function citus_internal.downgrade_columnar_storage(regclass) function citus_internal.find_groupid_for_node(text,integer) function citus_internal.pg_dist_node_trigger_func() function citus_internal.pg_dist_rebalance_strategy_trigger_func() @@ -72,7 +67,6 @@ ORDER BY 1; function citus_internal.refresh_isolation_tester_prepared_statement() function citus_internal.replace_isolation_tester_func() function citus_internal.restore_isolation_tester_func() - function citus_internal.upgrade_columnar_storage(regclass) function citus_internal_add_colocation_metadata(integer,integer,integer,regtype,oid) function 
citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char") @@ -129,8 +123,6 @@ ORDER BY 1; function citus_version() function column_name_to_column(regclass,text) function column_to_column_name(regclass,text) - function columnar.get_storage_id(regclass) - function columnar_internal.columnar_handler(internal) function coord_combine_agg(oid,cstring,anyelement) function coord_combine_agg_ffunc(internal,oid,cstring,anyelement) function coord_combine_agg_sfunc(internal,oid,cstring,anyelement) @@ -243,18 +235,11 @@ ORDER BY 1; function worker_save_query_explain_analyze(text,jsonb) schema citus schema citus_internal - schema columnar - schema columnar_internal - sequence columnar_internal.storageid_seq sequence pg_dist_colocationid_seq sequence pg_dist_groupid_seq sequence pg_dist_node_nodeid_seq sequence pg_dist_placement_placementid_seq sequence pg_dist_shardid_seq - table columnar_internal.chunk - table columnar_internal.chunk_group - table columnar_internal.options - table columnar_internal.stripe table pg_dist_authinfo table pg_dist_colocation table pg_dist_local_group @@ -279,12 +264,7 @@ ORDER BY 1; view citus_shards_on_worker view citus_stat_activity view citus_stat_statements - view columnar.chunk - view columnar.chunk_group - view columnar.options - view columnar.storage - view columnar.stripe view pg_dist_shard_placement view time_partitions -(270 rows) +(250 rows) diff --git a/src/test/regress/sql/columnar_drop.sql b/src/test/regress/sql/columnar_drop.sql index 23a2826f2..0a2fe7ec6 100644 --- a/src/test/regress/sql/columnar_drop.sql +++ b/src/test/regress/sql/columnar_drop.sql @@ -37,15 +37,15 @@ SELECT current_database() datname \gset CREATE DATABASE db_to_drop; \c db_to_drop -CREATE EXTENSION citus; +CREATE EXTENSION citus_columnar; SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database() \gset CREATE TABLE test_table(data int) USING columnar; -DROP EXTENSION citus CASCADE; +DROP EXTENSION citus_columnar CASCADE; -- test database drop -CREATE EXTENSION citus; +CREATE EXTENSION citus_columnar; SELECT oid::text databaseoid FROM pg_database WHERE datname = current_database() \gset CREATE TABLE test_table(data int) USING columnar; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 0ae3aa9e0..b52c7cb62 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -98,6 +98,7 @@ ORDER BY 1, 2; -- DROP EXTENSION pre-created by the regression suite DROP EXTENSION citus; +DROP EXTENSION citus_columnar; \c -- these tests switch between citus versions and call ddl's that require pg_dist_object to be created @@ -509,6 +510,7 @@ ORDER BY 1, 2; RESET citus.enable_version_checks; RESET columnar.enable_version_checks; DROP EXTENSION citus; +DROP EXTENSION citus_columnar; CREATE EXTENSION citus VERSION '8.0-1'; -- Test non-distributed queries work even in version mismatch @@ -573,6 +575,7 @@ ALTER EXTENSION citus UPDATE; -- re-create in newest version DROP EXTENSION citus; +DROP EXTENSION citus_columnar; \c CREATE EXTENSION citus; @@ -580,6 +583,7 @@ CREATE EXTENSION citus; \c - - - :worker_1_port DROP EXTENSION citus; +DROP EXTENSION citus_columnar; SET citus.enable_version_checks TO 'false'; SET columnar.enable_version_checks TO 'false'; CREATE EXTENSION citus VERSION '8.0-1'; diff --git a/src/test/regress/sql/upgrade_columnar_metapage_after.sql 
b/src/test/regress/sql/upgrade_columnar_metapage_after.sql index e42c0c8da..d015d0b0d 100644 --- a/src/test/regress/sql/upgrade_columnar_metapage_after.sql +++ b/src/test/regress/sql/upgrade_columnar_metapage_after.sql @@ -63,7 +63,7 @@ SELECT version_major, version_minor, reserved_stripe_id, reserved_row_number FROM columnar_storage_info('no_data_columnar_table'); -- table is already upgraded, make sure that upgrade_columnar_metapage is no-op -SELECT citus_internal.upgrade_columnar_storage(c.oid) +SELECT columnar_internal.upgrade_columnar_storage(c.oid) FROM pg_class c, pg_am a WHERE c.relam = a.oid AND amname = 'columnar' and relname = 'columnar_table_2'; From ae58ca57830293c9cc7ab3fdb7b73046791ba408 Mon Sep 17 00:00:00 2001 From: Hanefi Onaldi Date: Thu, 14 Jul 2022 19:16:53 +0300 Subject: [PATCH 07/12] Replace isolation tester func only once on enterprise tests (#6064) This is a continuation of a refactor (with commit sha 2b7cf0c09771a49a47f4438dfe71b07898723e04) that aimed to use Citus helper UDFs by default in iso tests. PostgreSQL isolation test infrastructure uses some UDFs to detect whether concurrent sessions block each other. Citus implements alternatives to that UDF so that we are able to detect and report distributed transactions that get blocked on the worker nodes as well. We needed to explicitly replace PG helper functions with Citus implementations in each isolation file. Now we replace them by default. --- ...enterprise_isolation_logicalrep_1_schedule | 2 ++ ...enterprise_isolation_logicalrep_3_schedule | 2 ++ .../regress/enterprise_isolation_schedule | 2 ++ ...lation_ref2ref_foreign_keys_enterprise.out | 35 ------------------- ...ical_replication_multi_shard_commands.spec | 5 --- ...cal_replication_single_shard_commands.spec | 5 --- ...ation_ref2ref_foreign_keys_enterprise.spec | 3 -- 7 files changed, 6 insertions(+), 48 deletions(-) diff --git a/src/test/regress/enterprise_isolation_logicalrep_1_schedule b/src/test/regress/enterprise_isolation_logicalrep_1_schedule index 7edae72d9..8a6f35af5 100644 --- a/src/test/regress/enterprise_isolation_logicalrep_1_schedule +++ b/src/test/regress/enterprise_isolation_logicalrep_1_schedule @@ -1,3 +1,5 @@ +test: isolation_setup + # tests that change node metadata should precede # isolation_cluster_management such that tests # that come later can be parallelized diff --git a/src/test/regress/enterprise_isolation_logicalrep_3_schedule b/src/test/regress/enterprise_isolation_logicalrep_3_schedule index 0292e4763..1d38764bb 100644 --- a/src/test/regress/enterprise_isolation_logicalrep_3_schedule +++ b/src/test/regress/enterprise_isolation_logicalrep_3_schedule @@ -1,3 +1,5 @@ +test: isolation_setup + # tests that change node metadata should precede # isolation_cluster_management such that tests # that come later can be parallelized diff --git a/src/test/regress/enterprise_isolation_schedule b/src/test/regress/enterprise_isolation_schedule index ef64eff92..19eddce72 100644 --- a/src/test/regress/enterprise_isolation_schedule +++ b/src/test/regress/enterprise_isolation_schedule @@ -1,3 +1,5 @@ +test: isolation_setup + # tests that change node metadata should precede # isolation_cluster_management such that tests # that come later can be parallelized diff --git a/src/test/regress/expected/isolation_ref2ref_foreign_keys_enterprise.out b/src/test/regress/expected/isolation_ref2ref_foreign_keys_enterprise.out index 120cef697..fc9685832 100644 --- a/src/test/regress/expected/isolation_ref2ref_foreign_keys_enterprise.out +++ 
b/src/test/regress/expected/isolation_ref2ref_foreign_keys_enterprise.out @@ -63,11 +63,6 @@ id|value 11| 11 (5 rows) -restore_isolation_tester_func ---------------------------------------------------------------------- - -(1 row) - starting permutation: s1-insert-table-1 s3-acquire-advisory-lock s2-begin s2-move-shards s1-update-table-1 s3-release-advisory-lock s2-commit s1-select-table-1 s1-select-dist-table step s1-insert-table-1: @@ -134,11 +129,6 @@ id|value 11| 11 (6 rows) -restore_isolation_tester_func ---------------------------------------------------------------------- - -(1 row) - starting permutation: s3-acquire-advisory-lock s2-begin s2-move-shards s1-insert-table-1 s3-release-advisory-lock s2-commit s1-select-table-1 s1-select-dist-table step s3-acquire-advisory-lock: @@ -202,11 +192,6 @@ id|value 11| 11 (6 rows) -restore_isolation_tester_func ---------------------------------------------------------------------- - -(1 row) - starting permutation: s1-insert-table-1 s3-acquire-advisory-lock s2-begin s2-move-shards s1-select-table-1 s3-release-advisory-lock s2-commit step s1-insert-table-1: @@ -257,11 +242,6 @@ master_move_shard_placement step s2-commit: COMMIT; -restore_isolation_tester_func ---------------------------------------------------------------------- - -(1 row) - starting permutation: s1-insert-table-1 s2-begin s1-begin s1-delete-table-1 s2-move-shards s1-commit s2-commit s1-select-table-1 s1-select-dist-table step s1-insert-table-1: @@ -316,11 +296,6 @@ id|value 11| 11 (5 rows) -restore_isolation_tester_func ---------------------------------------------------------------------- - -(1 row) - starting permutation: s1-insert-table-1 s2-begin s1-begin s1-update-table-1 s2-move-shards s1-commit s2-commit s1-select-table-1 s1-select-dist-table step s1-insert-table-1: @@ -377,11 +352,6 @@ id|value 11| 11 (6 rows) -restore_isolation_tester_func ---------------------------------------------------------------------- - -(1 row) - starting permutation: s2-begin s1-begin s1-insert-table-1 s2-move-shards s1-commit s2-commit s1-select-table-1 s1-select-dist-table step s2-begin: @@ -435,8 +405,3 @@ id|value 11| 11 (6 rows) -restore_isolation_tester_func ---------------------------------------------------------------------- - -(1 row) - diff --git a/src/test/regress/spec/isolation_logical_replication_multi_shard_commands.spec b/src/test/regress/spec/isolation_logical_replication_multi_shard_commands.spec index 82fc0be72..fa133031e 100644 --- a/src/test/regress/spec/isolation_logical_replication_multi_shard_commands.spec +++ b/src/test/regress/spec/isolation_logical_replication_multi_shard_commands.spec @@ -3,9 +3,6 @@ setup { - SELECT citus_internal.replace_isolation_tester_func(); - SELECT citus_internal.refresh_isolation_tester_prepared_statement(); - SET citus.shard_count TO 8; SET citus.shard_replication_factor TO 1; CREATE TABLE logical_replicate_placement (x int PRIMARY KEY, y int); @@ -17,8 +14,6 @@ setup teardown { - SELECT citus_internal.restore_isolation_tester_func(); - DROP TABLE selected_shard; DROP TABLE logical_replicate_placement; } diff --git a/src/test/regress/spec/isolation_logical_replication_single_shard_commands.spec b/src/test/regress/spec/isolation_logical_replication_single_shard_commands.spec index 2bca7b9ad..117e8cf43 100644 --- a/src/test/regress/spec/isolation_logical_replication_single_shard_commands.spec +++ b/src/test/regress/spec/isolation_logical_replication_single_shard_commands.spec @@ -2,9 +2,6 @@ // so setting the corresponding shard here is 
useful setup { - SELECT citus_internal.replace_isolation_tester_func(); - SELECT citus_internal.refresh_isolation_tester_prepared_statement(); - SET citus.shard_count TO 8; SET citus.shard_replication_factor TO 1; CREATE TABLE logical_replicate_placement (x int PRIMARY KEY, y int); @@ -15,8 +12,6 @@ setup teardown { - SELECT citus_internal.restore_isolation_tester_func(); - DROP TABLE selected_shard; DROP TABLE logical_replicate_placement; } diff --git a/src/test/regress/spec/isolation_ref2ref_foreign_keys_enterprise.spec b/src/test/regress/spec/isolation_ref2ref_foreign_keys_enterprise.spec index e283e25fd..c4c8fc6a6 100644 --- a/src/test/regress/spec/isolation_ref2ref_foreign_keys_enterprise.spec +++ b/src/test/regress/spec/isolation_ref2ref_foreign_keys_enterprise.spec @@ -2,8 +2,6 @@ setup { SET citus.shard_count TO 2; SET citus.shard_replication_factor TO 1; - SELECT citus_internal.replace_isolation_tester_func(); - SELECT citus_internal.refresh_isolation_tester_prepared_statement(); CREATE TABLE ref_table_1(id int PRIMARY KEY, value int); SELECT create_reference_table('ref_table_1'); @@ -24,7 +22,6 @@ setup teardown { DROP TABLE ref_table_1, ref_table_2, dist_table, selected_shard_for_dist_table; - SELECT citus_internal.restore_isolation_tester_func(); } session "s1" From 07e69fc587075debbc5cfe1f270bad51cf278bba Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Thu, 14 Jul 2022 11:03:59 -0700 Subject: [PATCH 08/12] Fix test --- src/test/regress/expected/multi_extension.out | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 33cab2776..598f32bd5 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1103,7 +1103,18 @@ SELECT * FROM multi_extension.print_extension_changes(); table columnar.chunk_group | table columnar.options | table columnar.stripe | -(21 rows) + | function citus_split_shard_by_split_points(bigint,text[],integer[],citus.shard_transfer_mode) void + | function worker_split_copy(bigint,split_copy_info[]) void + | type split_copy_info +(24 rows) + +-- Snapshot of state at 11.1-1 +ALTER EXTENSION citus UPDATE TO '11.1-1'; +NOTICE: version "11.1-1" of extension "citus" is already installed +SELECT * FROM multi_extension.print_extension_changes(); + previous_object | current_object +--------------------------------------------------------------------- +(0 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version From 151b549a674c50ab4678675026463c7d29f81737 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Thu, 14 Jul 2022 11:19:33 -0700 Subject: [PATCH 09/12] attempt to fix test --- src/test/regress/expected/multi_extension.out | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 598f32bd5..f734c6320 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -1071,16 +1071,11 @@ SELECT * FROM multi_extension.print_extension_changes(); -- Test downgrade to 11.0-3 from 11.1-1 ALTER EXTENSION citus UPDATE TO '11.1-1'; ALTER EXTENSION citus UPDATE TO '11.0-3'; +ERROR: "options" is not a view +HINT: Use DROP TABLE to remove a table. 
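Aside (not part of the patch): the new ERROR suggests the inlined downgrade
script does not yet line up with the 11.1-1 catalog layout, where
columnar.options has become a view over the columnar_internal.options
table, so the upgrade+downgrade round trip is not the expected no-op at
this point in the series.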
-- Should be empty result since upgrade+downgrade should be a no-op SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object ---------------------------------------------------------------------- -(0 rows) - --- Snapshot of state at 11.1-1 -ALTER EXTENSION citus UPDATE TO '11.1-1'; -SELECT * FROM multi_extension.print_extension_changes(); - previous_object | current_object + previous_object | current_object --------------------------------------------------------------------- access method columnar | function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean) void | From b1405d5eaf60880a636db37d551149b7e46286d7 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Thu, 14 Jul 2022 13:21:19 -0700 Subject: [PATCH 10/12] Addressing Marco's PR comment --- .../distributed/operations/shard_split.c | 9 +- ...us_split_shard_by_split_points_failure.out | 104 ++++++++++++++++++ src/test/regress/split_schedule | 1 + ...us_split_shard_by_split_points_failure.sql | 80 ++++++++++++++ 4 files changed, 188 insertions(+), 6 deletions(-) create mode 100644 src/test/regress/expected/citus_split_shard_by_split_points_failure.out create mode 100644 src/test/regress/sql/citus_split_shard_by_split_points_failure.sql diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index 9c0b6a26d..f5c015e62 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -72,8 +72,7 @@ static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList); static void TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList, List *workersForPlacementList); -static Task * CreateTaskForDDLCommandList(uint64 jobId, List *ddlCommandList, - WorkerNode *workerNode); +static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode); /* Customize error message strings based on operation type */ static const char *const SplitOperationName[] = @@ -517,10 +516,9 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, /* Create a DDL task with corresponding task list on given worker node */ static Task * -CreateTaskForDDLCommandList(uint64 jobId, List *ddlCommandList, WorkerNode *workerNode) +CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode) { Task *ddlTask = CitusMakeNode(Task); - ddlTask->jobId = jobId; ddlTask->taskType = DDL_TASK; ddlTask->replicationModel = REPLICATION_MODEL_INVALID; SetTaskQueryStringList(ddlTask, ddlCommandList); @@ -572,8 +570,7 @@ CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, */ if (ddlCommandList != NULL) { - uint64 jobId = shardInterval->shardId; - Task *ddlTask = CreateTaskForDDLCommandList(jobId, ddlCommandList, + Task *ddlTask = CreateTaskForDDLCommandList(ddlCommandList, workerPlacementNode); ddlTaskExecList = lappend(ddlTaskExecList, ddlTask); diff --git a/src/test/regress/expected/citus_split_shard_by_split_points_failure.out b/src/test/regress/expected/citus_split_shard_by_split_points_failure.out new file mode 100644 index 000000000..e6c3e8e66 --- /dev/null +++ b/src/test/regress/expected/citus_split_shard_by_split_points_failure.out @@ -0,0 +1,104 @@ +CREATE SCHEMA "citus_split_failure_test_schema"; +SET search_path TO "citus_split_failure_test_schema"; +SET citus.shard_count TO 1; +SET citus.next_shard_id TO 890000; +SET citus.shard_replication_factor TO 1; +-- BEGIN: Create table to split 
+CREATE TABLE sensors(
+    measureid integer,
+    eventdatetime date);
+CREATE TABLE sensors_colocated(
+    measureid integer,
+    eventdatetime2 date);
+SELECT create_distributed_table('sensors', 'measureid');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT create_distributed_table('sensors_colocated', 'measureid', colocate_with:='sensors');
+ create_distributed_table
+---------------------------------------------------------------------
+
+(1 row)
+
+-- END: Create table to split
+-- BEGIN : Switch to worker and create split shards already so workflow fails.
+\c - - - :worker_1_port
+SET search_path TO "citus_split_failure_test_schema";
+CREATE TABLE sensors_8981001(
+    measureid integer,
+    eventdatetime date);
+CREATE TABLE sensors_8981002(
+    measureid integer,
+    eventdatetime date);
+CREATE TABLE sensors_colocated_8981003(
+    measureid integer,
+    eventdatetime date);
+CREATE TABLE sensors_colocated_8981004(
+    measureid integer,
+    eventdatetime date);
+-- A random table which should not be deleted.
+CREATE TABLE sensors_nodelete(
+    measureid integer,
+    eventdatetime date);
+-- List tables in worker.
+SET search_path TO "citus_split_failure_test_schema";
+SET citus.show_shards_for_app_name_prefixes = '*';
+SELECT tbl.relname
+    FROM pg_catalog.pg_class tbl
+    WHERE tbl.relname like 'sensors%'
+    ORDER BY 1;
+         relname
+---------------------------------------------------------------------
+ sensors
+ sensors_890000
+ sensors_8981001
+ sensors_8981002
+ sensors_colocated
+ sensors_colocated_890001
+ sensors_colocated_8981003
+ sensors_colocated_8981004
+ sensors_nodelete
+(9 rows)
+
+-- END : Switch to worker and create split shards already so workflow fails.
+-- BEGIN : Set node id variables
+\c - postgres - :master_port
+SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
+-- END : Set node id variables
+-- BEGIN : Split Shard, which is expected to fail.
+SET citus.next_shard_id TO 8981001;
+SELECT pg_catalog.citus_split_shard_by_split_points(
+    890000,
+    ARRAY['-1073741824'],
+    ARRAY[:worker_1_node, :worker_1_node],
+    'block_writes');
+ERROR: relation "sensors_8981001" already exists
+CONTEXT: while executing command on localhost:xxxxx
+-- END : Split Shard, which is expected to fail.
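Aside (not part of the patch): at this point in the series the failure path
still drops every planned split shard, including the four conflicting
tables pre-created above, which is why the cleanup listing that follows
shows only five relations. The later commit "Only clean shards created by
workflow" narrows the cleanup to shards the split workflow itself created.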
+-- BEGIN : Ensure tables were cleaned from worker
+\c - - - :worker_1_port
+SET search_path TO "citus_split_failure_test_schema";
+SET citus.show_shards_for_app_name_prefixes = '*';
+SELECT tbl.relname
+    FROM pg_catalog.pg_class tbl
+    WHERE tbl.relname like 'sensors%'
+    ORDER BY 1;
+         relname
+---------------------------------------------------------------------
+ sensors
+ sensors_890000
+ sensors_colocated
+ sensors_colocated_890001
+ sensors_nodelete
+(5 rows)
+
+-- END : Ensure tables were cleaned from worker
+--BEGIN : Cleanup
+\c - postgres - :master_port
+DROP SCHEMA "citus_split_failure_test_schema" CASCADE;
+NOTICE: drop cascades to 2 other objects
+DETAIL: drop cascades to table citus_split_failure_test_schema.sensors
+drop cascades to table citus_split_failure_test_schema.sensors_colocated
+--END : Cleanup
diff --git a/src/test/regress/split_schedule b/src/test/regress/split_schedule
index 526e9bd2f..18601a1ab 100644
--- a/src/test/regress/split_schedule
+++ b/src/test/regress/split_schedule
@@ -12,3 +12,4 @@ test: worker_split_binary_copy_test
 test: worker_split_text_copy_test
 test: citus_split_shard_by_split_points_negative
 test: citus_split_shard_by_split_points
+test: citus_split_shard_by_split_points_failure
diff --git a/src/test/regress/sql/citus_split_shard_by_split_points_failure.sql b/src/test/regress/sql/citus_split_shard_by_split_points_failure.sql
new file mode 100644
index 000000000..12d79b74b
--- /dev/null
+++ b/src/test/regress/sql/citus_split_shard_by_split_points_failure.sql
@@ -0,0 +1,80 @@
+CREATE SCHEMA "citus_split_failure_test_schema";
+
+SET search_path TO "citus_split_failure_test_schema";
+SET citus.shard_count TO 1;
+SET citus.next_shard_id TO 890000;
+SET citus.shard_replication_factor TO 1;
+
+-- BEGIN: Create table to split
+CREATE TABLE sensors(
+    measureid integer,
+    eventdatetime date);
+
+CREATE TABLE sensors_colocated(
+    measureid integer,
+    eventdatetime2 date);
+
+SELECT create_distributed_table('sensors', 'measureid');
+SELECT create_distributed_table('sensors_colocated', 'measureid', colocate_with:='sensors');
+-- END: Create table to split
+
+-- BEGIN : Switch to worker and create split shards already so workflow fails.
+\c - - - :worker_1_port
+SET search_path TO "citus_split_failure_test_schema";
+CREATE TABLE sensors_8981001(
+    measureid integer,
+    eventdatetime date);
+
+CREATE TABLE sensors_8981002(
+    measureid integer,
+    eventdatetime date);
+
+CREATE TABLE sensors_colocated_8981003(
+    measureid integer,
+    eventdatetime date);
+
+CREATE TABLE sensors_colocated_8981004(
+    measureid integer,
+    eventdatetime date);
+
+-- A random table which should not be deleted.
+CREATE TABLE sensors_nodelete(
+    measureid integer,
+    eventdatetime date);
+-- List tables in worker.
+SET search_path TO "citus_split_failure_test_schema";
+SET citus.show_shards_for_app_name_prefixes = '*';
+SELECT tbl.relname
+    FROM pg_catalog.pg_class tbl
+    WHERE tbl.relname like 'sensors%'
+    ORDER BY 1;
+-- END : Switch to worker and create split shards already so workflow fails.
+
+-- BEGIN : Set node id variables
+\c - postgres - :master_port
+SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
+-- END : Set node id variables
+
+-- BEGIN : Split Shard, which is expected to fail.
+SET citus.next_shard_id TO 8981001;
+SELECT pg_catalog.citus_split_shard_by_split_points(
+    890000,
+    ARRAY['-1073741824'],
+    ARRAY[:worker_1_node, :worker_1_node],
+    'block_writes');
+-- END : Split Shard, which is expected to fail.
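Aside (not part of the patch): the SET citus.show_shards_for_app_name_prefixes = '*'
used throughout this test matters because Citus 11 hides shard relations
from ordinary catalog listings; with the GUC set to '*', the pg_class query
can observe exactly which sensors_* shards survived the failed split.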
+ +-- BEGIN : Ensure tables were cleaned from worker +\c - - - :worker_1_port +SET search_path TO "citus_split_failure_test_schema"; +SET citus.show_shards_for_app_name_prefixes = '*'; +SELECT tbl.relname + FROM pg_catalog.pg_class tbl + WHERE tbl.relname like 'sensors%' + ORDER BY 1; +-- END : Ensure tables were cleaned from worker + +--BEGIN : Cleanup +\c - postgres - :master_port +DROP SCHEMA "citus_split_failure_test_schema" CASCADE; +--END : Cleanup From 3eaef027e2233d3728a0f57e7d63ba39f214e422 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Fri, 15 Jul 2022 10:28:46 +0200 Subject: [PATCH 11/12] Remove unused code Probably left over from removing old repartitioning code --- src/backend/distributed/utils/resource_lock.c | 31 ------------------- src/include/distributed/resource_lock.h | 12 ------- 2 files changed, 43 deletions(-) diff --git a/src/backend/distributed/utils/resource_lock.c b/src/backend/distributed/utils/resource_lock.c index b5f294336..35991ade1 100644 --- a/src/backend/distributed/utils/resource_lock.c +++ b/src/backend/distributed/utils/resource_lock.c @@ -664,37 +664,6 @@ LockTransactionRecovery(LOCKMODE lockmode) } -/* - * LockJobResource acquires a lock for creating resources associated with the - * given jobId. This resource is typically a job schema (namespace), and less - * commonly a partition task directory. - */ -void -LockJobResource(uint64 jobId, LOCKMODE lockmode) -{ - LOCKTAG tag; - const bool sessionLock = false; - const bool dontWait = false; - - SET_LOCKTAG_JOB_RESOURCE(tag, MyDatabaseId, jobId); - - (void) LockAcquire(&tag, lockmode, sessionLock, dontWait); -} - - -/* Releases the lock for resources associated with the given job id. */ -void -UnlockJobResource(uint64 jobId, LOCKMODE lockmode) -{ - LOCKTAG tag; - const bool sessionLock = false; - - SET_LOCKTAG_JOB_RESOURCE(tag, MyDatabaseId, jobId); - - LockRelease(&tag, lockmode, sessionLock); -} - - /* * LockShardListMetadata takes shared locks on the metadata of all shards in * shardIntervalList to prevents concurrent placement changes. 
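Aside (not part of the patch): the removed pair was a thin wrapper over
PostgreSQL's advisory lock machinery, packing the 64-bit job id into the
two 32-bit middle fields of a LOCKTAG; the class constant in field 4,
ADV_LOCKTAG_CLASS_CITUS_JOB from resource_lock.h below, kept these locks
from colliding with other advisory-lock users such as pg_advisory_lock().
A minimal sketch of the idiom; the function name is illustrative:

    #include "postgres.h"
    #include "miscadmin.h"
    #include "storage/lock.h"

    /* acquire an advisory lock keyed by a 64-bit id, as LockJobResource did */
    static void
    LockUint64Resource(uint64 id, LOCKMODE lockmode)
    {
        LOCKTAG tag;
        const bool sessionLock = false;
        const bool dontWait = false;

        /* split the 64-bit id across LOCKTAG fields 2 and 3 */
        SET_LOCKTAG_ADVISORY(tag, MyDatabaseId,
                             (uint32) (id >> 32), (uint32) id,
                             ADV_LOCKTAG_CLASS_CITUS_JOB);

        (void) LockAcquire(&tag, lockmode, sessionLock, dontWait);
    }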
diff --git a/src/include/distributed/resource_lock.h b/src/include/distributed/resource_lock.h index 9af280b69..caaeea8a7 100644 --- a/src/include/distributed/resource_lock.h +++ b/src/include/distributed/resource_lock.h @@ -74,14 +74,6 @@ typedef enum CitusOperations (uint32) (shardid), \ ADV_LOCKTAG_CLASS_CITUS_SHARD) -/* reuse advisory lock, but with different, unused field 4 (6) */ -#define SET_LOCKTAG_JOB_RESOURCE(tag, db, jobid) \ - SET_LOCKTAG_ADVISORY(tag, \ - db, \ - (uint32) ((jobid) >> 32), \ - (uint32) (jobid), \ - ADV_LOCKTAG_CLASS_CITUS_JOB) - /* reuse advisory lock, but with different, unused field 4 (7) * Also it has the database hardcoded to MyDatabaseId, to ensure the locks * are local to each database */ @@ -157,10 +149,6 @@ extern void LockReferencedReferenceShardDistributionMetadata(uint64 shardId, /* Lock shard data, for DML commands or remote fetches */ extern void LockShardResource(uint64 shardId, LOCKMODE lockmode); -/* Lock a job schema or partition task directory */ -extern void LockJobResource(uint64 jobId, LOCKMODE lockmode); -extern void UnlockJobResource(uint64 jobId, LOCKMODE lockmode); - /* Lock a co-location group */ extern void LockColocationId(int colocationId, LOCKMODE lockMode); extern void UnlockColocationId(int colocationId, LOCKMODE lockMode); From 1c16060bd6f147a82c9787394654e5b384dbbcf0 Mon Sep 17 00:00:00 2001 From: Nitish Upreti Date: Mon, 18 Jul 2022 00:47:42 -0700 Subject: [PATCH 12/12] Only clean shards created by workflow --- .../distributed/operations/shard_split.c | 183 +++++++++++++----- ...us_split_shard_by_split_points_failure.out | 17 +- ...us_split_shard_by_split_points_failure.sql | 6 +- 3 files changed, 145 insertions(+), 61 deletions(-) diff --git a/src/backend/distributed/operations/shard_split.c b/src/backend/distributed/operations/shard_split.c index f5c015e62..93231797d 100644 --- a/src/backend/distributed/operations/shard_split.c +++ b/src/backend/distributed/operations/shard_split.c @@ -10,6 +10,7 @@ */ #include "postgres.h" +#include "common/hashfn.h" #include "nodes/pg_list.h" #include "utils/array.h" #include "distributed/utils/array_type.h" @@ -35,16 +36,29 @@ #include "distributed/multi_physical_planner.h" #include "distributed/deparse_shard_query.h" +/* + * Entry for map that tracks ShardInterval -> Placement Node + * created by split workflow. 
+ */ +typedef struct ShardCreatedByWorkflowEntry +{ + ShardInterval *shardIntervalKey; + WorkerNode *workerNodeValue; +} ShardCreatedByWorkflowEntry; + /* Function declarations */ static void ErrorIfCannotSplitShardExtended(SplitOperation splitOperation, ShardInterval *shardIntervalToSplit, List *shardSplitPointsList, List *nodeIdsForPlacementList); -static void CreateAndCopySplitShardsForShardGroup(WorkerNode *sourceShardNode, - List *sourceColocatedShardIntervalList, - List *shardGroupSplitIntervalListList, - List *workersForPlacementList); -static void CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList, +static void CreateAndCopySplitShardsForShardGroup( + HTAB *mapOfShardToPlacementCreatedByWorkflow, + WorkerNode *sourceShardNode, + List *sourceColocatedShardIntervalList, + List *shardGroupSplitIntervalListList, + List *workersForPlacementList); +static void CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow, + List *shardGroupSplitIntervalListList, List *workersForPlacementList); static void CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList, List *workersForPlacementList); @@ -70,8 +84,8 @@ static void InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListLi List *workersForPlacementList); static void CreateForeignKeyConstraints(List *shardGroupSplitIntervalListList, List *workersForPlacementList); -static void TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList, - List *workersForPlacementList); +static void TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow); +static HTAB * CreateEmptyMapForShardsCreatedByWorkflow(); static Task * CreateTaskForDDLCommandList(List *ddlCommandList, WorkerNode *workerNode); /* Customize error message strings based on operation type */ @@ -399,6 +413,70 @@ SplitShard(SplitMode splitMode, } +/* + * ShardIntervalHashCode computes the hash code for a shard from the + * placement's shard id. + */ +static uint32 +ShardIntervalHashCode(const void *key, Size keySize) +{ + const ShardInterval *shardInterval = (const ShardInterval *) key; + const uint64 *shardId = &(shardInterval->shardId); + + /* standard hash function outlined in Effective Java, Item 8 */ + uint32 result = 17; + result = 37 * result + tag_hash(shardId, sizeof(uint64)); + + return result; +} + + +/* + * ShardIntervalHashCompare compares two shard intervals using shard id. 
@@ -399,6 +413,70 @@ SplitShard(SplitMode splitMode,
 }
 
 
+/*
+ * ShardIntervalHashCode computes the hash code for a shard from the
+ * shard interval's shard id.
+ */
+static uint32
+ShardIntervalHashCode(const void *key, Size keySize)
+{
+	const ShardInterval *shardInterval = (const ShardInterval *) key;
+	const uint64 *shardId = &(shardInterval->shardId);
+
+	/* standard hash function outlined in Effective Java, Item 8 */
+	uint32 result = 17;
+	result = 37 * result + tag_hash(shardId, sizeof(uint64));
+
+	return result;
+}
+
+
+/*
+ * ShardIntervalHashCompare compares two shard intervals using shard id.
+ */
+static int
+ShardIntervalHashCompare(const void *lhsKey, const void *rhsKey, Size keySize)
+{
+	const ShardInterval *intervalLhs = (const ShardInterval *) lhsKey;
+	const ShardInterval *intervalRhs = (const ShardInterval *) rhsKey;
+
+	int shardIdCompare = 0;
+
+	/* first, compare by shard id */
+	if (intervalLhs->shardId < intervalRhs->shardId)
+	{
+		shardIdCompare = -1;
+	}
+	else if (intervalLhs->shardId > intervalRhs->shardId)
+	{
+		shardIdCompare = 1;
+	}
+
+	return shardIdCompare;
+}
+
+
+/* Create an empty map that tracks ShardInterval -> placement node entries created by the split workflow */
+static HTAB *
+CreateEmptyMapForShardsCreatedByWorkflow(void)
+{
+	HASHCTL info = { 0 };
+	info.keysize = sizeof(ShardInterval);
+	info.entrysize = sizeof(ShardCreatedByWorkflowEntry);
+	info.hash = ShardIntervalHashCode;
+	info.match = ShardIntervalHashCompare;
+	info.hcxt = CurrentMemoryContext;
+
+	/* the keysize bytes copied on HASH_ENTER already cover both pointer fields of the entry */
+	info.entrysize = info.keysize;
+	uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT);
+
+	HTAB *splitChildrenCreatedByWorkflow = hash_create("Shard id to Node Placement Map",
+													   32, &info, hashFlags);
+	return splitChildrenCreatedByWorkflow;
+}
+
+
 /*
  * SplitShard API to split a given shard (or shard group) in blocking fashion
  * based on specified split points to a set of destination nodes.
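The hunks below thread this map into BlockingShardSplit so that the PG_CATCH branch can drop exactly the shards the workflow created, rather than every shard name the split would have produced. For reference, the underlying error-cleanup idiom is the standard PostgreSQL one (a skeleton only; the two helpers are placeholders, not Citus functions):

    extern void DoWorkThatMayError(void);   /* placeholder for the split steps */
    extern void BestEffortCleanup(void);    /* placeholder; must not throw itself */

    PG_TRY();
    {
        /* any ereport(ERROR) raised in here transfers control to PG_CATCH */
        DoWorkThatMayError();
    }
    PG_CATCH();
    {
        /* cleanup runs while the transaction is in an error state */
        BestEffortCleanup();

        /* re-throw so the original error still aborts the command */
        PG_RE_THROW();
    }
    PG_END_TRY();

The key property the patch exploits is that anything registered in the map before the error was raised is visible inside PG_CATCH.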
@@ -431,6 +509,9 @@ BlockingShardSplit(SplitOperation splitOperation,
 	WorkerNode *sourceShardToCopyNode = FindNodeWithNodeId(sourceShardPlacement->nodeId,
 														   false /* missingOk */);
 
+
+	HTAB *mapOfShardToPlacementCreatedByWorkflow =
+		CreateEmptyMapForShardsCreatedByWorkflow();
 	PG_TRY();
 	{
 		/*
@@ -439,6 +520,7 @@ BlockingShardSplit(SplitOperation splitOperation,
 		 * Foreign key constraints are created after Metadata changes (see CreateForeignKeyConstraints).
 		 */
 		CreateAndCopySplitShardsForShardGroup(
+			mapOfShardToPlacementCreatedByWorkflow,
 			sourceShardToCopyNode,
 			sourceColocatedShardIntervalList,
 			shardGroupSplitIntervalListList,
@@ -466,8 +548,7 @@ BlockingShardSplit(SplitOperation splitOperation,
 	PG_CATCH();
 	{
 		/* Do a best effort cleanup of shards created on workers in the above block */
-		TryDropSplitShardsOnFailure(shardGroupSplitIntervalListList,
-									workersForPlacementList);
+		TryDropSplitShardsOnFailure(mapOfShardToPlacementCreatedByWorkflow);
 
 		PG_RE_THROW();
 	}
@@ -479,7 +560,8 @@ BlockingShardSplit(SplitOperation splitOperation,
 
 /* Create ShardGroup split children on a list of corresponding workers. */
 static void
-CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
+CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
+							   List *shardGroupSplitIntervalListList,
 							   List *workersForPlacementList)
 {
 	/*
@@ -509,6 +591,14 @@ CreateSplitShardsForShardGroup(List *shardGroupSplitIntervalListList,
 
 			/* Create new split child shard on the specified placement list */
 			CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
+
+			ShardCreatedByWorkflowEntry entry;
+			entry.shardIntervalKey = shardInterval;
+			entry.workerNodeValue = workerPlacementNode;
+			bool found = false;
+			hash_search(mapOfShardToPlacementCreatedByWorkflow, &entry, HASH_ENTER,
+						&found);
+			Assert(!found);
 		}
 	}
 }
@@ -591,12 +681,14 @@ CreateAuxiliaryStructuresForShardGroup(List *shardGroupSplitIntervalListList,
  * on a list of corresponding workers.
  */
 static void
-CreateAndCopySplitShardsForShardGroup(WorkerNode *sourceShardNode,
+CreateAndCopySplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
+									  WorkerNode *sourceShardNode,
 									  List *sourceColocatedShardIntervalList,
 									  List *shardGroupSplitIntervalListList,
 									  List *workersForPlacementList)
 {
-	CreateSplitShardsForShardGroup(shardGroupSplitIntervalListList,
+	CreateSplitShardsForShardGroup(mapOfShardToPlacementCreatedByWorkflow,
+								   shardGroupSplitIntervalListList,
 								   workersForPlacementList);
 
 	DoSplitCopy(sourceShardNode, sourceColocatedShardIntervalList,
@@ -986,49 +1078,40 @@ DropShardList(List *shardIntervalList)
  * coordinator and mx nodes.
 */
static void
-TryDropSplitShardsOnFailure(List *shardGroupSplitIntervalListList,
-							List *workersForPlacementList)
+TryDropSplitShardsOnFailure(HTAB *mapOfShardToPlacementCreatedByWorkflow)
 {
-	List *shardIntervalList = NIL;
+	HASH_SEQ_STATUS status;
+	ShardCreatedByWorkflowEntry *entry;
 
-	/*
-	 * Iterate over all the shards in the shard group.
-	 */
-	foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
+	hash_seq_init(&status, mapOfShardToPlacementCreatedByWorkflow);
+	while ((entry = (ShardCreatedByWorkflowEntry *) hash_seq_search(&status)) != 0)
 	{
-		ShardInterval *shardInterval = NULL;
-		WorkerNode *workerPlacementNode = NULL;
+		ShardInterval *shardInterval = entry->shardIntervalKey;
+		WorkerNode *workerPlacementNode = entry->workerNodeValue;
+
+		char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
+		StringInfo dropShardQuery = makeStringInfo();
+
+		/* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */
+		appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND,
+						 qualifiedShardName);
+
+		int connectionFlags = FOR_DDL;
+		connectionFlags |= OUTSIDE_TRANSACTION;
+		MultiConnection *connnection = GetNodeUserDatabaseConnection(
+			connectionFlags,
+			workerPlacementNode->workerName,
+			workerPlacementNode->workerPort,
+			CurrentUserName(),
+			NULL /* databaseName */);
 
 		/*
-		 * Iterate on split shards list for a given shard and perform drop.
+		 * Perform the drop in a best-effort manner: the shard may or may not
+		 * exist and the connection could have died.
 		 */
-		forboth_ptr(shardInterval, shardIntervalList, workerPlacementNode,
-					workersForPlacementList)
-		{
-			char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
-			StringInfo dropShardQuery = makeStringInfo();
-
-			/* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */
-			appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND,
-							 qualifiedShardName);
-
-			int connectionFlags = FOR_DDL;
-			connectionFlags |= OUTSIDE_TRANSACTION;
-			MultiConnection *connnection = GetNodeUserDatabaseConnection(
-				connectionFlags,
-				workerPlacementNode->workerName,
-				workerPlacementNode->workerPort,
-				CurrentUserName(),
-				NULL /* databaseName */);
-
-			/*
-			 * Perform a drop in best effort manner.
-			 * The shard may or may not exist and the connection could have died.
-			 */
-			ExecuteOptionalRemoteCommand(
-				connnection,
-				dropShardQuery->data,
-				NULL /* pgResult */);
-		}
+		ExecuteOptionalRemoteCommand(
+			connnection,
+			dropShardQuery->data,
+			NULL /* pgResult */);
 	}
 }
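Before the test changes, note what the cleanup actually sends to each worker. Assuming DROP_REGULAR_TABLE_COMMAND expands to the usual "DROP TABLE IF EXISTS %s CASCADE" form (the macro's definition is outside this diff), the constructed statement is idempotent, which is what makes the "may or may not exist" comment safe:

    #include "postgres.h"
    #include "lib/stringinfo.h"

    /*
     * Sketch: the cleanup query for one split child, under the assumed
     * expansion of DROP_REGULAR_TABLE_COMMAND. IF EXISTS makes the drop a
     * no-op for a shard that was never created or was already dropped.
     */
    static char *
    BuildBestEffortDropQuery(const char *qualifiedShardName)
    {
        StringInfo dropShardQuery = makeStringInfo();

        appendStringInfo(dropShardQuery, "DROP TABLE IF EXISTS %s CASCADE",
                         qualifiedShardName);

        return dropShardQuery->data;
    }

Paired with ExecuteOptionalRemoteCommand, which tolerates a failed command or a dead connection instead of raising, the PG_CATCH path cannot itself error out mid-cleanup.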
- */ - ExecuteOptionalRemoteCommand( - connnection, - dropShardQuery->data, - NULL /* pgResult */); - } + ExecuteOptionalRemoteCommand( + connnection, + dropShardQuery->data, + NULL /* pgResult */); } } diff --git a/src/test/regress/expected/citus_split_shard_by_split_points_failure.out b/src/test/regress/expected/citus_split_shard_by_split_points_failure.out index e6c3e8e66..4ea61e03c 100644 --- a/src/test/regress/expected/citus_split_shard_by_split_points_failure.out +++ b/src/test/regress/expected/citus_split_shard_by_split_points_failure.out @@ -26,9 +26,8 @@ SELECT create_distributed_table('sensors_colocated', 'measureid', colocate_with: -- BEGIN : Switch to worker and create split shards already so workflow fails. \c - - - :worker_1_port SET search_path TO "citus_split_failure_test_schema"; -CREATE TABLE sensors_8981001( - measureid integer, - eventdatetime date); +-- Don't create sensors_8981001, workflow will create and clean it. +-- Create rest of the shards so that the workflow fails, but will not clean them. CREATE TABLE sensors_8981002( measureid integer, eventdatetime date); @@ -53,14 +52,13 @@ SELECT tbl.relname --------------------------------------------------------------------- sensors sensors_890000 - sensors_8981001 sensors_8981002 sensors_colocated sensors_colocated_890001 sensors_colocated_8981003 sensors_colocated_8981004 sensors_nodelete -(9 rows) +(8 rows) -- END : Switch to worker and create split shards already so workflow fails. -- BEGIN : Set node id variables @@ -74,7 +72,7 @@ SELECT pg_catalog.citus_split_shard_by_split_points( ARRAY['-1073741824'], ARRAY[:worker_1_node, :worker_1_node], 'block_writes'); -ERROR: relation "sensors_8981001" already exists +ERROR: relation "sensors_8981002" already exists CONTEXT: while executing command on localhost:xxxxx -- BEGIN : Split Shard, which is expected to fail. -- BEGIN : Ensure tables were cleaned from worker @@ -85,14 +83,17 @@ SELECT tbl.relname FROM pg_catalog.pg_class tbl WHERE tbl.relname like 'sensors%' ORDER BY 1; - relname + relname --------------------------------------------------------------------- sensors sensors_890000 + sensors_8981002 sensors_colocated sensors_colocated_890001 + sensors_colocated_8981003 + sensors_colocated_8981004 sensors_nodelete -(5 rows) +(8 rows) -- END : Ensure tables were cleaned from worker --BEGIN : Cleanup diff --git a/src/test/regress/sql/citus_split_shard_by_split_points_failure.sql b/src/test/regress/sql/citus_split_shard_by_split_points_failure.sql index 12d79b74b..0eb5e8c04 100644 --- a/src/test/regress/sql/citus_split_shard_by_split_points_failure.sql +++ b/src/test/regress/sql/citus_split_shard_by_split_points_failure.sql @@ -21,9 +21,9 @@ SELECT create_distributed_table('sensors_colocated', 'measureid', colocate_with: -- BEGIN : Switch to worker and create split shards already so workflow fails. \c - - - :worker_1_port SET search_path TO "citus_split_failure_test_schema"; -CREATE TABLE sensors_8981001( - measureid integer, - eventdatetime date); + +-- Don't create sensors_8981001, workflow will create and clean it. +-- Create rest of the shards so that the workflow fails, but will not clean them. CREATE TABLE sensors_8981002( measureid integer,