From 2b39c52f0b32ef7b77b2f3724d94caa6da0fa8f4 Mon Sep 17 00:00:00 2001 From: Furkan Sahin Date: Thu, 26 Oct 2017 11:58:26 +0300 Subject: [PATCH] Replica identity on create_distributed_table With this commit, Citus preserves the replica identity of a table when the table is distributed, so the shards of the distributed table have the same replica identity as the local table. --- .../distributed/master/master_node_protocol.c | 39 +++++- .../distributed/utils/citus_ruleutils.c | 46 ++++++ src/include/distributed/citus_ruleutils.h | 1 + .../regress/expected/multi_create_table.out | 131 ++++++++++++++++++ src/test/regress/sql/multi_create_table.sql | 74 ++++++++++ 5 files changed, 289 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index bcc89b281..481f21a96 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -69,10 +69,9 @@ int ShardReplicationFactor = 1; /* desired replication factor for shards */ int ShardMaxSize = 1048576; /* maximum size in KB one shard can grow to */ int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN; - +static List * GetTableReplicaIdentityCommand(Oid relationId); static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor); - /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_get_table_metadata); PG_FUNCTION_INFO_V1(master_get_table_ddl_events); @@ -474,6 +473,7 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults) List *tableDDLEventList = NIL; List *tableCreationCommandList = NIL; List *indexAndConstraintCommandList = NIL; + List *replicaIdentityEvents = NIL; tableCreationCommandList = GetTableCreationCommands(relationId, includeSequenceDefaults); @@ -482,10 +482,45 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults) indexAndConstraintCommandList = GetTableIndexAndConstraintCommands(relationId); 
tableDDLEventList = list_concat(tableDDLEventList, indexAndConstraintCommandList); + replicaIdentityEvents = GetTableReplicaIdentityCommand(relationId); + tableDDLEventList = list_concat(tableDDLEventList, replicaIdentityEvents); + return tableDDLEventList; } +/* + * GetTableReplicaIdentityCommand returns the list of DDL commands to (re)define + * the replica identity of the given table, or NIL when there is nothing to emit. + */ +static List * +GetTableReplicaIdentityCommand(Oid relationId) +{ + List *replicaIdentityCreateCommandList = NIL; + char *replicaIdentityCreateCommand = NULL; + + /* + * We skip non-relations because postgres does not support + * ALTER TABLE .. REPLICA IDENTITY on non-relations. + */ + char relationKind = get_rel_relkind(relationId); + if (relationKind != RELKIND_RELATION) + { + return NIL; + } + + replicaIdentityCreateCommand = pg_get_replica_identity_command(relationId); + + if (replicaIdentityCreateCommand) + { + replicaIdentityCreateCommandList = lappend(replicaIdentityCreateCommandList, + replicaIdentityCreateCommand); + } + + return replicaIdentityCreateCommandList; +} + + /* * GetTableCreationCommands takes in a relationId, and returns the list of DDL * commands needed to reconstruct the relation, excluding indexes and diff --git a/src/backend/distributed/utils/citus_ruleutils.c b/src/backend/distributed/utils/citus_ruleutils.c index d8e1a4236..3dbe27801 100644 --- a/src/backend/distributed/utils/citus_ruleutils.c +++ b/src/backend/distributed/utils/citus_ruleutils.c @@ -1066,3 +1066,49 @@ contain_nextval_expression_walker(Node *node, void *context) } return expression_tree_walker(node, contain_nextval_expression_walker, context); } + + +/* + * pg_get_replica_identity_command returns the ALTER TABLE .. REPLICA IDENTITY + * command that reproduces the replica identity of the table, or NULL if none is built. 
+ */ +char * +pg_get_replica_identity_command(Oid tableRelationId) +{ + Relation relation = NULL; + StringInfo buf = makeStringInfo(); + char *relationName = NULL; + char replicaIdentity = 0; + + relation = heap_open(tableRelationId, AccessShareLock); + + replicaIdentity = relation->rd_rel->relreplident; + + relationName = generate_qualified_relation_name(tableRelationId); + + if (replicaIdentity == REPLICA_IDENTITY_INDEX) + { + Oid indexId = RelationGetReplicaIndex(relation); + + if (OidIsValid(indexId)) + { + appendStringInfo(buf, "ALTER TABLE %s REPLICA IDENTITY USING INDEX %s ", + relationName, + quote_identifier(get_rel_name(indexId))); + } + } + else if (replicaIdentity == REPLICA_IDENTITY_NOTHING) + { + appendStringInfo(buf, "ALTER TABLE %s REPLICA IDENTITY NOTHING", + relationName); + } + else if (replicaIdentity == REPLICA_IDENTITY_FULL) + { + appendStringInfo(buf, "ALTER TABLE %s REPLICA IDENTITY FULL", + relationName); + } + + heap_close(relation, AccessShareLock); + + return (buf->len > 0) ? 
buf->data : NULL; /* NULL when no command was built above */ +} diff --git a/src/include/distributed/citus_ruleutils.h b/src/include/distributed/citus_ruleutils.h index 0e14c3ed7..4f1413e2f 100644 --- a/src/include/distributed/citus_ruleutils.h +++ b/src/include/distributed/citus_ruleutils.h @@ -41,6 +41,7 @@ extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid, extern char * pg_get_indexclusterdef_string(Oid indexRelationId); extern List * pg_get_table_grants(Oid relationId); extern bool contain_nextval_expression_walker(Node *node, void *context); +extern char * pg_get_replica_identity_command(Oid tableRelationId); /* Function declarations for version dependent PostgreSQL ruleutils functions */ extern void pg_get_query_def(Query *query, StringInfo buffer); diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index f498325a5..e71b6c120 100644 --- a/src/test/regress/expected/multi_create_table.out +++ b/src/test/regress/expected/multi_create_table.out @@ -920,8 +920,131 @@ NOTICE: Copying data from local table... 
(1 row) COMMIT; +SET citus.shard_count TO 4; +BEGIN; +CREATE SCHEMA sc3; +CREATE TABLE sc3.alter_replica_table +( + name text, + id int PRIMARY KEY +); +ALTER TABLE sc3.alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey; +SELECT create_distributed_table('sc3.alter_replica_table', 'id'); + create_distributed_table +-------------------------- + +(1 row) + +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc3' LIMIT 1$$); + run_command_on_workers +------------------------ + (localhost,57637,t,i) + (localhost,57638,t,i) +(2 rows) + +BEGIN; +CREATE SCHEMA sc4; +CREATE TABLE sc4.alter_replica_table +( + name text, + id int PRIMARY KEY +); +INSERT INTO sc4.alter_replica_table(id) SELECT generate_series(1,100); +SET search_path = 'sc4'; +ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey; +SELECT create_distributed_table('alter_replica_table', 'id'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc4' LIMIT 1$$); + run_command_on_workers +------------------------ + (localhost,57637,t,i) + (localhost,57638,t,i) +(2 rows) + +SET search_path = 'public'; +BEGIN; +CREATE SCHEMA sc5; +CREATE TABLE sc5.alter_replica_table +( + name text, + id int NOT NULL +); +INSERT INTO sc5.alter_replica_table(id) SELECT generate_series(1,100); +ALTER TABLE sc5.alter_replica_table REPLICA IDENTITY FULL; +SELECT create_distributed_table('sc5.alter_replica_table', 'id'); +NOTICE: Copying data from local table... 
+ create_distributed_table +-------------------------- + +(1 row) + +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc5' LIMIT 1$$); + run_command_on_workers +------------------------ + (localhost,57637,t,f) + (localhost,57638,t,f) +(2 rows) + +BEGIN; +CREATE SCHEMA sc6; +CREATE TABLE sc6.alter_replica_table +( + name text, + id int NOT NULL +); +INSERT INTO sc6.alter_replica_table(id) SELECT generate_series(1,100); +CREATE UNIQUE INDEX unique_idx ON sc6.alter_replica_table(id); +ALTER TABLE sc6.alter_replica_table REPLICA IDENTITY USING INDEX unique_idx; +SELECT create_distributed_table('sc6.alter_replica_table', 'id'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc6' LIMIT 1$$); + run_command_on_workers +------------------------ + (localhost,57637,t,i) + (localhost,57638,t,i) +(2 rows) + +BEGIN; +CREATE TABLE alter_replica_table +( + name text, + id int NOT NULL +); +INSERT INTO alter_replica_table(id) SELECT generate_series(1,100); +CREATE UNIQUE INDEX unique_idx ON alter_replica_table(id); +ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX unique_idx; +SELECT create_distributed_table('alter_replica_table', 'id'); +NOTICE: Copying data from local table... 
+ create_distributed_table +-------------------------- + +(1 row) + +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='public' LIMIT 1$$); + run_command_on_workers +------------------------ + (localhost,57637,t,i) + (localhost,57638,t,i) +(2 rows) + DROP TABLE tt1; DROP TABLE tt2; +DROP TABLE alter_replica_table; DROP SCHEMA sc CASCADE; NOTICE: drop cascades to 2 other objects DETAIL: drop cascades to table sc.ref @@ -930,3 +1053,11 @@ DROP SCHEMA sc2 CASCADE; NOTICE: drop cascades to 2 other objects DETAIL: drop cascades to table sc2.hash drop cascades to table sc2.ref +DROP SCHEMA sc3 CASCADE; +NOTICE: drop cascades to table sc3.alter_replica_table +DROP SCHEMA sc4 CASCADE; +NOTICE: drop cascades to table sc4.alter_replica_table +DROP SCHEMA sc5 CASCADE; +NOTICE: drop cascades to table sc5.alter_replica_table +DROP SCHEMA sc6 CASCADE; +NOTICE: drop cascades to table sc6.alter_replica_table diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index d054eb314..17c9914a9 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -516,7 +516,81 @@ SELECT create_reference_table('sc2.ref'); COMMIT; +SET citus.shard_count TO 4; + +BEGIN; +CREATE SCHEMA sc3; +CREATE TABLE sc3.alter_replica_table +( + name text, + id int PRIMARY KEY +); +ALTER TABLE sc3.alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey; +SELECT create_distributed_table('sc3.alter_replica_table', 'id'); +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc3' LIMIT 1$$); + +BEGIN; +CREATE SCHEMA sc4; +CREATE TABLE sc4.alter_replica_table +( + name text, + 
id int PRIMARY KEY +); +INSERT INTO sc4.alter_replica_table(id) SELECT generate_series(1,100); +SET search_path = 'sc4'; +ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey; +SELECT create_distributed_table('alter_replica_table', 'id'); +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc4' LIMIT 1$$); +SET search_path = 'public'; + +BEGIN; +CREATE SCHEMA sc5; +CREATE TABLE sc5.alter_replica_table +( + name text, + id int NOT NULL +); +INSERT INTO sc5.alter_replica_table(id) SELECT generate_series(1,100); +ALTER TABLE sc5.alter_replica_table REPLICA IDENTITY FULL; +SELECT create_distributed_table('sc5.alter_replica_table', 'id'); +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc5' LIMIT 1$$); + +BEGIN; +CREATE SCHEMA sc6; +CREATE TABLE sc6.alter_replica_table +( + name text, + id int NOT NULL +); +INSERT INTO sc6.alter_replica_table(id) SELECT generate_series(1,100); +CREATE UNIQUE INDEX unique_idx ON sc6.alter_replica_table(id); +ALTER TABLE sc6.alter_replica_table REPLICA IDENTITY USING INDEX unique_idx; +SELECT create_distributed_table('sc6.alter_replica_table', 'id'); +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc6' LIMIT 1$$); + +BEGIN; +CREATE TABLE alter_replica_table +( + name text, + id int NOT NULL +); +INSERT INTO alter_replica_table(id) SELECT generate_series(1,100); +CREATE UNIQUE INDEX unique_idx ON alter_replica_table(id); +ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX unique_idx; +SELECT 
create_distributed_table('alter_replica_table', 'id'); +COMMIT; +SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='public' LIMIT 1$$); + DROP TABLE tt1; DROP TABLE tt2; +DROP TABLE alter_replica_table; DROP SCHEMA sc CASCADE; DROP SCHEMA sc2 CASCADE; +DROP SCHEMA sc3 CASCADE; +DROP SCHEMA sc4 CASCADE; +DROP SCHEMA sc5 CASCADE; +DROP SCHEMA sc6 CASCADE;