diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index bcc89b281..481f21a96 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -69,10 +69,9 @@ int ShardReplicationFactor = 1; /* desired replication factor for shards */ int ShardMaxSize = 1048576; /* maximum size in KB one shard can grow to */ int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN; - +static List * GetTableReplicaIdentityCommand(Oid relationId); static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor); - /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_get_table_metadata); PG_FUNCTION_INFO_V1(master_get_table_ddl_events); @@ -474,6 +473,7 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults) List *tableDDLEventList = NIL; List *tableCreationCommandList = NIL; List *indexAndConstraintCommandList = NIL; + List *replicaIdentityEvents = NIL; tableCreationCommandList = GetTableCreationCommands(relationId, includeSequenceDefaults); @@ -482,10 +482,45 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults) indexAndConstraintCommandList = GetTableIndexAndConstraintCommands(relationId); tableDDLEventList = list_concat(tableDDLEventList, indexAndConstraintCommandList); + replicaIdentityEvents = GetTableReplicaIdentityCommand(relationId); + tableDDLEventList = list_concat(tableDDLEventList, replicaIdentityEvents); + return tableDDLEventList; } +/* + * GetTableReplicaIdentityCommand returns the list of DDL commands to + * (re)define the replica identity choice for a given table. + */ +static List * +GetTableReplicaIdentityCommand(Oid relationId) +{ + List *replicaIdentityCreateCommandList = NIL; + char *replicaIdentityCreateCommand = NULL; + + /* + * We skip non-relations because postgres does not support + * ALTER TABLE .. REPLICA IDENTITY on non-relations. + */ + char relationKind = get_rel_relkind(relationId); + if (relationKind != RELKIND_RELATION) + { + return NIL; + } + + replicaIdentityCreateCommand = pg_get_replica_identity_command(relationId); + + if (replicaIdentityCreateCommand) + { + replicaIdentityCreateCommandList = lappend(replicaIdentityCreateCommandList, + replicaIdentityCreateCommand); + } + + return replicaIdentityCreateCommandList; +} + + /* * GetTableCreationCommands takes in a relationId, and returns the list of DDL * commands needed to reconstruct the relation, excluding indexes and diff --git a/src/backend/distributed/utils/citus_ruleutils.c b/src/backend/distributed/utils/citus_ruleutils.c index d8e1a4236..3dbe27801 100644 --- a/src/backend/distributed/utils/citus_ruleutils.c +++ b/src/backend/distributed/utils/citus_ruleutils.c @@ -1066,3 +1066,49 @@ contain_nextval_expression_walker(Node *node, void *context) } return expression_tree_walker(node, contain_nextval_expression_walker, context); } + + +/* + * pg_get_replica_identity_command function returns the required ALTER .. TABLE + * command to define the replica identity. + */ +char * +pg_get_replica_identity_command(Oid tableRelationId) +{ + Relation relation = NULL; + StringInfo buf = makeStringInfo(); + char *relationName = NULL; + char replicaIdentity = 0; + + relation = heap_open(tableRelationId, AccessShareLock); + + replicaIdentity = relation->rd_rel->relreplident; + + relationName = generate_qualified_relation_name(tableRelationId); + + if (replicaIdentity == REPLICA_IDENTITY_INDEX) + { + Oid indexId = RelationGetReplicaIndex(relation); + + if (OidIsValid(indexId)) + { + appendStringInfo(buf, "ALTER TABLE %s REPLICA IDENTITY USING INDEX %s ", + relationName, + quote_identifier(get_rel_name(indexId))); + } + } + else if (replicaIdentity == REPLICA_IDENTITY_NOTHING) + { + appendStringInfo(buf, "ALTER TABLE %s REPLICA IDENTITY NOTHING", + relationName); + } + else if (replicaIdentity == REPLICA_IDENTITY_FULL) + { + appendStringInfo(buf, "ALTER TABLE %s REPLICA IDENTITY FULL", + relationName); + } + + heap_close(relation, AccessShareLock); + + return (buf->len > 0) ? buf->data : NULL; +} diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index 0e14c3ed7..4f1413e2f 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -41,6 +41,7 @@ extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, extern char * pg_get_indexclusterdef_string(Oid indexRelationId); extern List * pg_get_table_grants(Oid relationId); extern bool contain_nextval_expression_walker(Node *node, void *context); +extern char * pg_get_replica_identity_command(Oid tableRelationId); /* Function declarations for version dependent PostgreSQL ruleutils functions */ extern void pg_get_query_def(Query *query, StringInfo buffer); diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index f498325a5..e71b6c120 100644 --- a/src/test/regress/expected/multi_create_table.out +++ b/src/test/regress/expected/multi_create_table.out @@ -920,8 +920,131 @@ NOTICE: Copying data from local table... (1 row) COMMIT; +SET citus.shard_count TO 4; +BEGIN; +CREATE SCHEMA sc3; +CREATE TABLE sc3.alter_replica_table +( + name text, + id int PRIMARY KEY +); +ALTER TABLE sc3.alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey; +SELECT create_distributed_table('sc3.alter_replica_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc3' LIMIT 1$$); + run_command_on_workers +------------------------ + (localhost,57637,t,i) + (localhost,57638,t,i) +(2 rows) + +BEGIN; +CREATE SCHEMA sc4; +CREATE TABLE sc4.alter_replica_table +( + name text, + id int PRIMARY KEY +); +INSERT INTO sc4.alter_replica_table(id) SELECT generate_series(1,100); +SET search_path = 'sc4'; +ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey; +SELECT create_distributed_table('alter_replica_table', 'id'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc4' LIMIT 1$$); + run_command_on_workers +------------------------ + (localhost,57637,t,i) + (localhost,57638,t,i) +(2 rows) + +SET search_path = 'public'; +BEGIN; +CREATE SCHEMA sc5; +CREATE TABLE sc5.alter_replica_table +( + name text, + id int NOT NULL +); +INSERT INTO sc5.alter_replica_table(id) SELECT generate_series(1,100); +ALTER TABLE sc5.alter_replica_table REPLICA IDENTITY FULL; +SELECT create_distributed_table('sc5.alter_replica_table', 'id'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc5' LIMIT 1$$); + run_command_on_workers +------------------------ + (localhost,57637,t,f) + (localhost,57638,t,f) +(2 rows) + +BEGIN; +CREATE SCHEMA sc6; +CREATE TABLE sc6.alter_replica_table +( + name text, + id int NOT NULL +); +INSERT INTO sc6.alter_replica_table(id) SELECT generate_series(1,100); +CREATE UNIQUE INDEX unique_idx ON sc6.alter_replica_table(id); +ALTER TABLE sc6.alter_replica_table REPLICA IDENTITY USING INDEX unique_idx; +SELECT create_distributed_table('sc6.alter_replica_table', 'id'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc6' LIMIT 1$$); + run_command_on_workers +------------------------ + (localhost,57637,t,i) + (localhost,57638,t,i) +(2 rows) + +BEGIN; +CREATE TABLE alter_replica_table +( + name text, + id int NOT NULL +); +INSERT INTO alter_replica_table(id) SELECT generate_series(1,100); +CREATE UNIQUE INDEX unique_idx ON alter_replica_table(id); +ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX unique_idx; +SELECT create_distributed_table('alter_replica_table', 'id'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='public' LIMIT 1$$); + run_command_on_workers +------------------------ + (localhost,57637,t,i) + (localhost,57638,t,i) +(2 rows) + DROP TABLE tt1; DROP TABLE tt2; +DROP TABLE alter_replica_table; DROP SCHEMA sc CASCADE; NOTICE: drop cascades to 2 other objects DETAIL: drop cascades to table sc.ref @@ -930,3 +1053,11 @@ DROP SCHEMA sc2 CASCADE; NOTICE: drop cascades to 2 other objects DETAIL: drop cascades to table sc2.hash drop cascades to table sc2.ref +DROP SCHEMA sc3 CASCADE; +NOTICE: drop cascades to table sc3.alter_replica_table +DROP SCHEMA sc4 CASCADE; +NOTICE: drop cascades to table sc4.alter_replica_table +DROP SCHEMA sc5 CASCADE; +NOTICE: drop cascades to table sc5.alter_replica_table +DROP SCHEMA sc6 CASCADE; +NOTICE: drop cascades to table sc6.alter_replica_table diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index d054eb314..17c9914a9 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -516,7 +516,81 @@ SELECT create_reference_table('sc2.ref'); COMMIT; +SET citus.shard_count TO 4; + +BEGIN; +CREATE SCHEMA sc3; +CREATE TABLE sc3.alter_replica_table +( + name text, + id int PRIMARY KEY +); +ALTER TABLE sc3.alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey; +SELECT create_distributed_table('sc3.alter_replica_table', 'id'); +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc3' LIMIT 1$$); + +BEGIN; +CREATE SCHEMA sc4; +CREATE TABLE sc4.alter_replica_table +( + name text, + id int PRIMARY KEY +); +INSERT INTO sc4.alter_replica_table(id) SELECT generate_series(1,100); +SET search_path = 'sc4'; +ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey; +SELECT create_distributed_table('alter_replica_table', 'id'); +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc4' LIMIT 1$$); +SET search_path = 'public'; + +BEGIN; +CREATE SCHEMA sc5; +CREATE TABLE sc5.alter_replica_table +( + name text, + id int NOT NULL +); +INSERT INTO sc5.alter_replica_table(id) SELECT generate_series(1,100); +ALTER TABLE sc5.alter_replica_table REPLICA IDENTITY FULL; +SELECT create_distributed_table('sc5.alter_replica_table', 'id'); +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc5' LIMIT 1$$); + +BEGIN; +CREATE SCHEMA sc6; +CREATE TABLE sc6.alter_replica_table +( + name text, + id int NOT NULL +); +INSERT INTO sc6.alter_replica_table(id) SELECT generate_series(1,100); +CREATE UNIQUE INDEX unique_idx ON sc6.alter_replica_table(id); +ALTER TABLE sc6.alter_replica_table REPLICA IDENTITY USING INDEX unique_idx; +SELECT create_distributed_table('sc6.alter_replica_table', 'id'); +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc6' LIMIT 1$$); + +BEGIN; +CREATE TABLE alter_replica_table +( + name text, + id int NOT NULL +); +INSERT INTO alter_replica_table(id) SELECT generate_series(1,100); +CREATE UNIQUE INDEX unique_idx ON alter_replica_table(id); +ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX unique_idx; +SELECT create_distributed_table('alter_replica_table', 'id'); +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='public' LIMIT 1$$); + DROP TABLE tt1; DROP TABLE tt2; +DROP TABLE alter_replica_table; DROP SCHEMA sc CASCADE; DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; +DROP SCHEMA sc4 CASCADE; +DROP SCHEMA sc5 CASCADE; +DROP SCHEMA sc6 CASCADE;