break sequence dependency during table creation (#6889)

We need to break sequence dependency for a table while creating the
table during non-transactional metadata sync to ensure idempotency of
the creation of the table.

**Problem:**
When we send `SELECT
pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text)
FROM pg_dist_partition` to workers during the non-transactional sync,
table might not be in `pg_dist_partition` at worker, and sequence
dependency is not broken at the worker.

**Solution:** 
We break sequence dependency via `SELECT
pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text)`
for each table while creating it at the workers. It is safe to send
since the udf is a no-op when there is no sequence dependency.

DESCRIPTION: Fixes a bug related to sequence idempotency at
non-transactional sync.

Fixes https://github.com/citusdata/citus/issues/6888.
pull/6887/head^2
aykut-bozkurt 2023-04-28 15:09:09 +03:00 committed by GitHub
parent 135aaf45ca
commit 8cb69cfd13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 522 additions and 37 deletions

View File

@ -1710,20 +1710,13 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands,
}
else if (ShouldSyncTableMetadata(sourceId))
{
char *qualifiedTableName = quote_qualified_identifier(schemaName, sourceName);
/*
* We are converting a citus local table to a distributed/reference table,
* so we should prevent dropping the sequence on the table. Otherwise, we'd
* lose track of the previous changes in the sequence.
*/
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.worker_drop_sequence_dependency(%s);",
quote_literal_cstr(qualifiedTableName));
SendCommandToWorkersWithMetadata(command->data);
char *command = WorkerDropSequenceDependencyCommand(sourceId);
SendCommandToWorkersWithMetadata(command);
}
}

View File

@ -393,9 +393,17 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
tableDDLCommand));
}
/* we need to drop table, if exists, first to make table creation idempotent */
/*
* We need to drop table, if exists, first to make table creation
* idempotent. Before dropping the table, we should also break
* dependencies with sequences since `drop cascade table` would also
* drop depended sequences. This is safe as we still record dependency
* with the sequence during table creation.
*/
commandList = lcons(DropTableIfExistsCommand(relationId),
commandList);
commandList = lcons(WorkerDropSequenceDependencyCommand(relationId),
commandList);
}
return commandList;

View File

@ -686,7 +686,7 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode)
bool singleTransaction = true;
List *dropMetadataCommandList = DetachPartitionCommandList();
dropMetadataCommandList = lappend(dropMetadataCommandList,
BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
BREAK_ALL_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND);
dropMetadataCommandList = lappend(dropMetadataCommandList,
WorkerDropAllShellTablesCommand(singleTransaction));
dropMetadataCommandList = list_concat(dropMetadataCommandList,
@ -4235,6 +4235,22 @@ WorkerDropAllShellTablesCommand(bool singleTransaction)
}
/*
* WorkerDropSequenceDependencyCommand returns command to drop sequence dependencies for
* given table.
*/
char *
WorkerDropSequenceDependencyCommand(Oid relationId)
{
char *qualifiedTableName = generate_qualified_relation_name(relationId);
StringInfo breakSequenceDepCommand = makeStringInfo();
appendStringInfo(breakSequenceDepCommand,
BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND,
quote_literal_cstr(qualifiedTableName));
return breakSequenceDepCommand->data;
}
/*
* PropagateNodeWideObjectsCommandList is called during node activation to
* propagate any object that should be propagated for every node. These are
@ -4352,8 +4368,8 @@ SendNodeWideObjectsSyncCommands(MetadataSyncContext *context)
void
SendShellTableDeletionCommands(MetadataSyncContext *context)
{
/* break all sequence deps for citus tables and remove all shell tables */
char *breakSeqDepsCommand = BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND;
/* break all sequence deps for citus tables */
char *breakSeqDepsCommand = BREAK_ALL_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND;
SendOrCollectCommandListToActivatedNodes(context, list_make1(breakSeqDepsCommand));
/* remove shell tables */

View File

@ -156,6 +156,7 @@ extern void SendOrCollectCommandListToSingleNode(MetadataSyncContext *context,
extern void ActivateNodeList(MetadataSyncContext *context);
extern char * WorkerDropAllShellTablesCommand(bool singleTransaction);
extern char * WorkerDropSequenceDependencyCommand(Oid relationId);
extern void SyncDistributedObjects(MetadataSyncContext *context);
extern void SendNodeWideObjectsSyncCommands(MetadataSyncContext *context);
@ -180,8 +181,10 @@ extern void SendInterTableRelationshipCommands(MetadataSyncContext *context);
#define REMOVE_ALL_CITUS_TABLES_COMMAND \
"SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition"
#define BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND \
#define BREAK_ALL_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND \
"SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition"
#define BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND \
"SELECT pg_catalog.worker_drop_sequence_dependency(%s);"
#define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'"
#define ENABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'on'"

View File

@ -19,10 +19,30 @@ SET client_min_messages TO ERROR;
-- Create roles
CREATE ROLE foo1;
CREATE ROLE foo2;
-- Create collation
CREATE COLLATION german_phonebook (provider = icu, locale = 'de-u-co-phonebk');
-- Create type
CREATE TYPE pair_type AS (a int, b int);
-- Create function
CREATE FUNCTION one_as_result() RETURNS INT LANGUAGE SQL AS
$$
SELECT 1;
$$;
-- Create text search dictionary
CREATE TEXT SEARCH DICTIONARY my_german_dict (
template = snowball,
language = german,
stopwords = german
);
-- Create text search config
CREATE TEXT SEARCH CONFIGURATION my_ts_config ( parser = default );
ALTER TEXT SEARCH CONFIGURATION my_ts_config ALTER MAPPING FOR asciiword WITH my_german_dict;
-- Create sequence
CREATE SEQUENCE seq;
-- Create colocated distributed tables
CREATE TABLE dist1 (id int PRIMARY KEY default nextval('seq'));
CREATE TABLE dist1 (id int PRIMARY KEY default nextval('seq'), col int default (one_as_result()), myserial serial, phone text COLLATE german_phonebook, initials pair_type);
CREATE SEQUENCE seq_owned OWNED BY dist1.id;
CREATE INDEX dist1_search_phone_idx ON dist1 USING gin (to_tsvector('my_ts_config'::regconfig, (COALESCE(phone, ''::text))::text));
SELECT create_distributed_table('dist1', 'id');
create_distributed_table
---------------------------------------------------------------------
@ -52,6 +72,8 @@ CREATE TABLE loc1 (id int PRIMARY KEY);
INSERT INTO loc1 SELECT i FROM generate_series(1,100) i;
CREATE TABLE loc2 (id int REFERENCES loc1(id));
INSERT INTO loc2 SELECT i FROM generate_series(1,100) i;
-- Create publication
CREATE PUBLICATION pub_all;
-- citus_set_coordinator_host with wrong port
SELECT citus_set_coordinator_host('localhost', 9999);
citus_set_coordinator_host
@ -168,8 +190,8 @@ SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO pg_dist_node").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to drop sequence
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency").cancel(' || :pid || ')');
-- Failure to drop sequence dependency for all tables
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency.*FROM pg_dist_partition").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
@ -177,7 +199,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequen
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency.*FROM pg_dist_partition").kill()');
mitmproxy
---------------------------------------------------------------------
@ -321,7 +343,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="ALTER DATABASE.*OWNER TO").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Filure to create schema
-- Failure to create schema
SELECT citus.mitmproxy('conn.onQuery(query="CREATE SCHEMA IF NOT EXISTS mx_metadata_sync_multi_trans AUTHORIZATION").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
@ -336,6 +358,108 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE SCHEMA IF NOT EXISTS mx_metad
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create collation
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*CREATE COLLATION mx_metadata_sync_multi_trans.german_phonebook").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*CREATE COLLATION mx_metadata_sync_multi_trans.german_phonebook").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create function
SELECT citus.mitmproxy('conn.onQuery(query="CREATE OR REPLACE FUNCTION mx_metadata_sync_multi_trans.one_as_result").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="CREATE OR REPLACE FUNCTION mx_metadata_sync_multi_trans.one_as_result").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create text search dictionary
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*my_german_dict").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*my_german_dict").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create text search config
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*my_ts_config").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*my_ts_config").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create type
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*pair_type").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*pair_type").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create publication
SELECT citus.mitmproxy('conn.onQuery(query="CREATE PUBLICATION.*pub_all").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="CREATE PUBLICATION.*pub_all").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create sequence
@ -353,6 +477,40 @@ SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_apply_sequence_command
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to drop sequence dependency for distributed table
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency.*mx_metadata_sync_multi_trans.dist1").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency.*mx_metadata_sync_multi_trans.dist1").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to drop distributed table if exists
SELECT citus.mitmproxy('conn.onQuery(query="DROP TABLE IF EXISTS mx_metadata_sync_multi_trans.dist1").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="DROP TABLE IF EXISTS mx_metadata_sync_multi_trans.dist1").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create distributed table
@ -370,6 +528,40 @@ SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE mx_metadata_sync_multi_
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to record sequence dependency for table
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_record_sequence_dependency").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_record_sequence_dependency").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create index for table
SELECT citus.mitmproxy('conn.onQuery(query="CREATE INDEX dist1_search_phone_idx ON mx_metadata_sync_multi_trans.dist1 USING gin").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="CREATE INDEX dist1_search_phone_idx ON mx_metadata_sync_multi_trans.dist1 USING gin").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to create reference table
@ -540,6 +732,125 @@ SELECT citus.mitmproxy('conn.onQuery(query="SELECT citus_internal_add_object_met
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to mark function as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*one_as_result").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*one_as_result").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to mark collation as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*german_phonebook").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*german_phonebook").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to mark text search dictionary as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*my_german_dict").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*my_german_dict").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to mark text search configuration as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*my_ts_config").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*my_ts_config").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to mark type as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*pair_type").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*pair_type").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to mark sequence as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*seq_owned").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*seq_owned").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to mark publication as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*pub_all").cancel(' || :pid || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: canceling statement due to user request
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*pub_all").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
ERROR: connection to the remote node localhost:xxxxx failed with the following error: connection not open
-- Failure to set isactive to true
@ -680,9 +991,10 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row)
RESET citus.metadata_sync_mode;
DROP PUBLICATION pub_all;
DROP SCHEMA dummy;
DROP SCHEMA mx_metadata_sync_multi_trans CASCADE;
NOTICE: drop cascades to 10 other objects
NOTICE: drop cascades to 15 other objects
DROP ROLE foo1;
DROP ROLE foo2;
SELECT citus_remove_node('localhost', :master_port);

View File

@ -149,6 +149,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('public.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('public.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -174,7 +175,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['public', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(49 rows)
(50 rows)
-- Show that CREATE INDEX commands are included in the activate node snapshot
CREATE INDEX mx_index ON mx_test_table(col_2);
@ -206,6 +207,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('public.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('public.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -231,7 +233,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['public', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(50 rows)
(51 rows)
-- Show that schema changes are included in the activate node snapshot
CREATE SCHEMA mx_testing_schema;
@ -265,6 +267,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('mx_testing_schema.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -291,7 +294,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(52 rows)
(53 rows)
-- Show that append distributed tables are not included in the activate node snapshot
CREATE TABLE non_mx_test_table (col_1 int, col_2 text);
@ -331,6 +334,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('mx_testing_schema.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -357,7 +361,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(52 rows)
(53 rows)
-- Show that range distributed tables are not included in the activate node snapshot
UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass;
@ -390,6 +394,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('mx_testing_schema.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -416,7 +421,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(52 rows)
(53 rows)
-- Test start_metadata_sync_to_node and citus_activate_node UDFs
-- Ensure that hasmetadata=false for all nodes
@ -1943,6 +1948,12 @@ SELECT unnest(activate_node_snapshot()) order by 1;
SELECT citus_internal_add_partition_metadata ('public.dist_table_1'::regclass, 'h', 'a', 10010, 's')
SELECT citus_internal_add_partition_metadata ('public.mx_ref'::regclass, 'n', NULL, 10009, 't')
SELECT citus_internal_add_partition_metadata ('public.test_table'::regclass, 'h', 'id', 10010, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('mx_test_schema_1.mx_table_1');
SELECT pg_catalog.worker_drop_sequence_dependency('mx_test_schema_2.mx_table_2');
SELECT pg_catalog.worker_drop_sequence_dependency('mx_testing_schema.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency('public.dist_table_1');
SELECT pg_catalog.worker_drop_sequence_dependency('public.mx_ref');
SELECT pg_catalog.worker_drop_sequence_dependency('public.test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -1997,7 +2008,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.dist_table_1'::regclass, 1310074, 't'::"char", '-2147483648', '-1073741825'), ('public.dist_table_1'::regclass, 1310075, 't'::"char", '-1073741824', '-1'), ('public.dist_table_1'::regclass, 1310076, 't'::"char", '0', '1073741823'), ('public.dist_table_1'::regclass, 1310077, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_ref'::regclass, 1310073, 't'::"char", NULL, NULL)) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.test_table'::regclass, 1310083, 't'::"char", '-2147483648', '-1073741825'), ('public.test_table'::regclass, 1310084, 't'::"char", '-1073741824', '-1'), ('public.test_table'::regclass, 1310085, 't'::"char", '0', '1073741823'), ('public.test_table'::regclass, 1310086, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(111 rows)
(117 rows)
-- shouldn't work since test_table is MX
ALTER TABLE test_table ADD COLUMN id3 bigserial;

View File

@ -149,6 +149,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('public.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('public.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -174,7 +175,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['public', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(49 rows)
(50 rows)
-- Show that CREATE INDEX commands are included in the activate node snapshot
CREATE INDEX mx_index ON mx_test_table(col_2);
@ -206,6 +207,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('public.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('public.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -231,7 +233,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['public', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(50 rows)
(51 rows)
-- Show that schema changes are included in the activate node snapshot
CREATE SCHEMA mx_testing_schema;
@ -265,6 +267,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('mx_testing_schema.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -291,7 +294,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(52 rows)
(53 rows)
-- Show that append distributed tables are not included in the activate node snapshot
CREATE TABLE non_mx_test_table (col_1 int, col_2 text);
@ -331,6 +334,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('mx_testing_schema.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -357,7 +361,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(52 rows)
(53 rows)
-- Show that range distributed tables are not included in the activate node snapshot
UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass;
@ -390,6 +394,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
RESET ROLE
SELECT alter_role_if_exists('postgres', 'ALTER ROLE postgres SET lc_messages = ''C''')
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 2, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('mx_testing_schema.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -416,7 +421,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardlength, groupid, placementid) AS (VALUES (1310000, 0, 1, 100000), (1310001, 0, 2, 100001), (1310002, 0, 1, 100002), (1310003, 0, 2, 100003), (1310004, 0, 1, 100004), (1310005, 0, 2, 100005), (1310006, 0, 1, 100006), (1310007, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(52 rows)
(53 rows)
-- Test start_metadata_sync_to_node and citus_activate_node UDFs
-- Ensure that hasmetadata=false for all nodes
@ -1943,6 +1948,12 @@ SELECT unnest(activate_node_snapshot()) order by 1;
SELECT citus_internal_add_partition_metadata ('public.dist_table_1'::regclass, 'h', 'a', 10010, 's')
SELECT citus_internal_add_partition_metadata ('public.mx_ref'::regclass, 'n', NULL, 10009, 't')
SELECT citus_internal_add_partition_metadata ('public.test_table'::regclass, 'h', 'id', 10010, 's')
SELECT pg_catalog.worker_drop_sequence_dependency('mx_test_schema_1.mx_table_1');
SELECT pg_catalog.worker_drop_sequence_dependency('mx_test_schema_2.mx_table_2');
SELECT pg_catalog.worker_drop_sequence_dependency('mx_testing_schema.mx_test_table');
SELECT pg_catalog.worker_drop_sequence_dependency('public.dist_table_1');
SELECT pg_catalog.worker_drop_sequence_dependency('public.mx_ref');
SELECT pg_catalog.worker_drop_sequence_dependency('public.test_table');
SELECT pg_catalog.worker_drop_sequence_dependency(logicalrelid::regclass::text) FROM pg_dist_partition
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
@ -1997,7 +2008,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.dist_table_1'::regclass, 1310074, 't'::"char", '-2147483648', '-1073741825'), ('public.dist_table_1'::regclass, 1310075, 't'::"char", '-1073741824', '-1'), ('public.dist_table_1'::regclass, 1310076, 't'::"char", '0', '1073741823'), ('public.dist_table_1'::regclass, 1310077, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_ref'::regclass, 1310073, 't'::"char", NULL, NULL)) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.test_table'::regclass, 1310083, 't'::"char", '-2147483648', '-1073741825'), ('public.test_table'::regclass, 1310084, 't'::"char", '-1073741824', '-1'), ('public.test_table'::regclass, 1310085, 't'::"char", '0', '1073741823'), ('public.test_table'::regclass, 1310086, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(111 rows)
(117 rows)
-- shouldn't work since test_table is MX
ALTER TABLE test_table ADD COLUMN id3 bigserial;

View File

@ -18,11 +18,36 @@ SET client_min_messages TO ERROR;
CREATE ROLE foo1;
CREATE ROLE foo2;
-- Create collation
CREATE COLLATION german_phonebook (provider = icu, locale = 'de-u-co-phonebk');
-- Create type
CREATE TYPE pair_type AS (a int, b int);
-- Create function
CREATE FUNCTION one_as_result() RETURNS INT LANGUAGE SQL AS
$$
SELECT 1;
$$;
-- Create text search dictionary
CREATE TEXT SEARCH DICTIONARY my_german_dict (
template = snowball,
language = german,
stopwords = german
);
-- Create text search config
CREATE TEXT SEARCH CONFIGURATION my_ts_config ( parser = default );
ALTER TEXT SEARCH CONFIGURATION my_ts_config ALTER MAPPING FOR asciiword WITH my_german_dict;
-- Create sequence
CREATE SEQUENCE seq;
-- Create colocated distributed tables
CREATE TABLE dist1 (id int PRIMARY KEY default nextval('seq'));
CREATE TABLE dist1 (id int PRIMARY KEY default nextval('seq'), col int default (one_as_result()), myserial serial, phone text COLLATE german_phonebook, initials pair_type);
CREATE SEQUENCE seq_owned OWNED BY dist1.id;
CREATE INDEX dist1_search_phone_idx ON dist1 USING gin (to_tsvector('my_ts_config'::regconfig, (COALESCE(phone, ''::text))::text));
SELECT create_distributed_table('dist1', 'id');
INSERT INTO dist1 SELECT i FROM generate_series(1,100) i;
@ -42,6 +67,9 @@ INSERT INTO loc1 SELECT i FROM generate_series(1,100) i;
CREATE TABLE loc2 (id int REFERENCES loc1(id));
INSERT INTO loc2 SELECT i FROM generate_series(1,100) i;
-- Create publication
CREATE PUBLICATION pub_all;
-- citus_set_coordinator_host with wrong port
SELECT citus_set_coordinator_host('localhost', 9999);
-- citus_set_coordinator_host with correct port
@ -88,10 +116,10 @@ SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="INSERT INTO pg_dist_node").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to drop sequence
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency").cancel(' || :pid || ')');
-- Failure to drop sequence dependency for all tables
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency.*FROM pg_dist_partition").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency").kill()');
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency.*FROM pg_dist_partition").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to drop shell table
@ -142,24 +170,84 @@ SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="ALTER DATABASE.*OWNER TO").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Filure to create schema
-- Failure to create schema
SELECT citus.mitmproxy('conn.onQuery(query="CREATE SCHEMA IF NOT EXISTS mx_metadata_sync_multi_trans AUTHORIZATION").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="CREATE SCHEMA IF NOT EXISTS mx_metadata_sync_multi_trans AUTHORIZATION").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create collation
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*CREATE COLLATION mx_metadata_sync_multi_trans.german_phonebook").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*CREATE COLLATION mx_metadata_sync_multi_trans.german_phonebook").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create function
SELECT citus.mitmproxy('conn.onQuery(query="CREATE OR REPLACE FUNCTION mx_metadata_sync_multi_trans.one_as_result").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="CREATE OR REPLACE FUNCTION mx_metadata_sync_multi_trans.one_as_result").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create text search dictionary
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*my_german_dict").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*my_german_dict").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create text search config
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*my_ts_config").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*my_ts_config").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create type
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*pair_type").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_create_or_replace_object.*pair_type").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create publication
SELECT citus.mitmproxy('conn.onQuery(query="CREATE PUBLICATION.*pub_all").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="CREATE PUBLICATION.*pub_all").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create sequence
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_apply_sequence_command").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT worker_apply_sequence_command").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to drop sequence dependency for distributed table
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency.*mx_metadata_sync_multi_trans.dist1").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_drop_sequence_dependency.*mx_metadata_sync_multi_trans.dist1").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to drop distributed table if exists
SELECT citus.mitmproxy('conn.onQuery(query="DROP TABLE IF EXISTS mx_metadata_sync_multi_trans.dist1").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="DROP TABLE IF EXISTS mx_metadata_sync_multi_trans.dist1").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create distributed table
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE mx_metadata_sync_multi_trans.dist1").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE mx_metadata_sync_multi_trans.dist1").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to record sequence dependency for table
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_record_sequence_dependency").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT pg_catalog.worker_record_sequence_dependency").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create index for table
SELECT citus.mitmproxy('conn.onQuery(query="CREATE INDEX dist1_search_phone_idx ON mx_metadata_sync_multi_trans.dist1 USING gin").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="CREATE INDEX dist1_search_phone_idx ON mx_metadata_sync_multi_trans.dist1 USING gin").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to create reference table
SELECT citus.mitmproxy('conn.onQuery(query="CREATE TABLE mx_metadata_sync_multi_trans.ref").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
@ -220,6 +308,48 @@ SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="SELECT citus_internal_add_object_metadata").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to mark function as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*one_as_result").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*one_as_result").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to mark collation as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*german_phonebook").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*german_phonebook").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to mark text search dictionary as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*my_german_dict").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*my_german_dict").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to mark text search configuration as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*my_ts_config").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*my_ts_config").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to mark type as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*pair_type").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*pair_type").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to mark sequence as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*seq_owned").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*seq_owned").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to mark publication as distributed
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*pub_all").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.onQuery(query="WITH distributed_object_data.*pub_all").kill()');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
-- Failure to set isactive to true
SELECT citus.mitmproxy('conn.onQuery(query="UPDATE pg_dist_node SET isactive = TRUE").cancel(' || :pid || ')');
SELECT citus_activate_node('localhost', :worker_2_proxy_port);
@ -277,6 +407,7 @@ SELECT * FROM pg_dist_node ORDER BY nodeport;
SELECT citus.mitmproxy('conn.allow()');
RESET citus.metadata_sync_mode;
DROP PUBLICATION pub_all;
DROP SCHEMA dummy;
DROP SCHEMA mx_metadata_sync_multi_trans CASCADE;
DROP ROLE foo1;