Replica identity on create_distributed_table

By this commit, citus minds the replica identity of the table when
we distribute the table. So the shards of the distributed table
have the same replica identity with the local table.
pull/1734/head
Furkan Sahin 2017-10-26 11:58:26 +03:00 committed by mehmet furkan şahin
parent 5661062a69
commit 2b39c52f0b
5 changed files with 289 additions and 2 deletions

View File

@ -69,10 +69,9 @@ int ShardReplicationFactor = 1; /* desired replication factor for shards */
int ShardMaxSize = 1048576; /* maximum size in KB one shard can grow to */ int ShardMaxSize = 1048576; /* maximum size in KB one shard can grow to */
int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN; int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN;
static List * GetTableReplicaIdentityCommand(Oid relationId);
static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor); static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor);
/* exports for SQL callable functions */ /* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_get_table_metadata); PG_FUNCTION_INFO_V1(master_get_table_metadata);
PG_FUNCTION_INFO_V1(master_get_table_ddl_events); PG_FUNCTION_INFO_V1(master_get_table_ddl_events);
@ -474,6 +473,7 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
List *tableDDLEventList = NIL; List *tableDDLEventList = NIL;
List *tableCreationCommandList = NIL; List *tableCreationCommandList = NIL;
List *indexAndConstraintCommandList = NIL; List *indexAndConstraintCommandList = NIL;
List *replicaIdentityEvents = NIL;
tableCreationCommandList = GetTableCreationCommands(relationId, tableCreationCommandList = GetTableCreationCommands(relationId,
includeSequenceDefaults); includeSequenceDefaults);
@ -482,10 +482,45 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
indexAndConstraintCommandList = GetTableIndexAndConstraintCommands(relationId); indexAndConstraintCommandList = GetTableIndexAndConstraintCommands(relationId);
tableDDLEventList = list_concat(tableDDLEventList, indexAndConstraintCommandList); tableDDLEventList = list_concat(tableDDLEventList, indexAndConstraintCommandList);
replicaIdentityEvents = GetTableReplicaIdentityCommand(relationId);
tableDDLEventList = list_concat(tableDDLEventList, replicaIdentityEvents);
return tableDDLEventList; return tableDDLEventList;
} }
/*
* GetTableReplicaIdentityCommand returns the list of DDL commands to
* (re)define the replica identity choice for a given table.
*/
static List *
GetTableReplicaIdentityCommand(Oid relationId)
{
List *replicaIdentityCreateCommandList = NIL;
char *replicaIdentityCreateCommand = NULL;
/*
* We skip non-relations because postgres does not support
* ALTER TABLE .. REPLICA IDENTITY on non-relations.
*/
char relationKind = get_rel_relkind(relationId);
if (relationKind != RELKIND_RELATION)
{
return NIL;
}
replicaIdentityCreateCommand = pg_get_replica_identity_command(relationId);
if (replicaIdentityCreateCommand)
{
replicaIdentityCreateCommandList = lappend(replicaIdentityCreateCommandList,
replicaIdentityCreateCommand);
}
return replicaIdentityCreateCommandList;
}
/* /*
* GetTableCreationCommands takes in a relationId, and returns the list of DDL * GetTableCreationCommands takes in a relationId, and returns the list of DDL
* commands needed to reconstruct the relation, excluding indexes and * commands needed to reconstruct the relation, excluding indexes and

View File

@ -1066,3 +1066,49 @@ contain_nextval_expression_walker(Node *node, void *context)
} }
return expression_tree_walker(node, contain_nextval_expression_walker, context); return expression_tree_walker(node, contain_nextval_expression_walker, context);
} }
/*
* pg_get_replica_identity_command function returns the required ALTER .. TABLE
* command to define the replica identity.
*/
char *
pg_get_replica_identity_command(Oid tableRelationId)
{
Relation relation = NULL;
StringInfo buf = makeStringInfo();
char *relationName = NULL;
char replicaIdentity = 0;
relation = heap_open(tableRelationId, AccessShareLock);
replicaIdentity = relation->rd_rel->relreplident;
relationName = generate_qualified_relation_name(tableRelationId);
if (replicaIdentity == REPLICA_IDENTITY_INDEX)
{
Oid indexId = RelationGetReplicaIndex(relation);
if (OidIsValid(indexId))
{
appendStringInfo(buf, "ALTER TABLE %s REPLICA IDENTITY USING INDEX %s ",
relationName,
quote_identifier(get_rel_name(indexId)));
}
}
else if (replicaIdentity == REPLICA_IDENTITY_NOTHING)
{
appendStringInfo(buf, "ALTER TABLE %s REPLICA IDENTITY NOTHING",
relationName);
}
else if (replicaIdentity == REPLICA_IDENTITY_FULL)
{
appendStringInfo(buf, "ALTER TABLE %s REPLICA IDENTITY FULL",
relationName);
}
heap_close(relation, AccessShareLock);
return (buf->len > 0) ? buf->data : NULL;
}

View File

@ -41,6 +41,7 @@ extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid,
extern char * pg_get_indexclusterdef_string(Oid indexRelationId); extern char * pg_get_indexclusterdef_string(Oid indexRelationId);
extern List * pg_get_table_grants(Oid relationId); extern List * pg_get_table_grants(Oid relationId);
extern bool contain_nextval_expression_walker(Node *node, void *context); extern bool contain_nextval_expression_walker(Node *node, void *context);
extern char * pg_get_replica_identity_command(Oid tableRelationId);
/* Function declarations for version dependent PostgreSQL ruleutils functions */ /* Function declarations for version dependent PostgreSQL ruleutils functions */
extern void pg_get_query_def(Query *query, StringInfo buffer); extern void pg_get_query_def(Query *query, StringInfo buffer);

View File

@ -920,8 +920,131 @@ NOTICE: Copying data from local table...
(1 row) (1 row)
COMMIT; COMMIT;
SET citus.shard_count TO 4;
BEGIN;
CREATE SCHEMA sc3;
CREATE TABLE sc3.alter_replica_table
(
name text,
id int PRIMARY KEY
);
ALTER TABLE sc3.alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey;
SELECT create_distributed_table('sc3.alter_replica_table', 'id');
create_distributed_table
--------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc3' LIMIT 1$$);
run_command_on_workers
------------------------
(localhost,57637,t,i)
(localhost,57638,t,i)
(2 rows)
BEGIN;
CREATE SCHEMA sc4;
CREATE TABLE sc4.alter_replica_table
(
name text,
id int PRIMARY KEY
);
INSERT INTO sc4.alter_replica_table(id) SELECT generate_series(1,100);
SET search_path = 'sc4';
ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey;
SELECT create_distributed_table('alter_replica_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc4' LIMIT 1$$);
run_command_on_workers
------------------------
(localhost,57637,t,i)
(localhost,57638,t,i)
(2 rows)
SET search_path = 'public';
BEGIN;
CREATE SCHEMA sc5;
CREATE TABLE sc5.alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO sc5.alter_replica_table(id) SELECT generate_series(1,100);
ALTER TABLE sc5.alter_replica_table REPLICA IDENTITY FULL;
SELECT create_distributed_table('sc5.alter_replica_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc5' LIMIT 1$$);
run_command_on_workers
------------------------
(localhost,57637,t,f)
(localhost,57638,t,f)
(2 rows)
BEGIN;
CREATE SCHEMA sc6;
CREATE TABLE sc6.alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO sc6.alter_replica_table(id) SELECT generate_series(1,100);
CREATE UNIQUE INDEX unique_idx ON sc6.alter_replica_table(id);
ALTER TABLE sc6.alter_replica_table REPLICA IDENTITY USING INDEX unique_idx;
SELECT create_distributed_table('sc6.alter_replica_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc6' LIMIT 1$$);
run_command_on_workers
------------------------
(localhost,57637,t,i)
(localhost,57638,t,i)
(2 rows)
BEGIN;
CREATE TABLE alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO alter_replica_table(id) SELECT generate_series(1,100);
CREATE UNIQUE INDEX unique_idx ON alter_replica_table(id);
ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX unique_idx;
SELECT create_distributed_table('alter_replica_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='public' LIMIT 1$$);
run_command_on_workers
------------------------
(localhost,57637,t,i)
(localhost,57638,t,i)
(2 rows)
DROP TABLE tt1; DROP TABLE tt1;
DROP TABLE tt2; DROP TABLE tt2;
DROP TABLE alter_replica_table;
DROP SCHEMA sc CASCADE; DROP SCHEMA sc CASCADE;
NOTICE: drop cascades to 2 other objects NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table sc.ref DETAIL: drop cascades to table sc.ref
@ -930,3 +1053,11 @@ DROP SCHEMA sc2 CASCADE;
NOTICE: drop cascades to 2 other objects NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table sc2.hash DETAIL: drop cascades to table sc2.hash
drop cascades to table sc2.ref drop cascades to table sc2.ref
DROP SCHEMA sc3 CASCADE;
NOTICE: drop cascades to table sc3.alter_replica_table
DROP SCHEMA sc4 CASCADE;
NOTICE: drop cascades to table sc4.alter_replica_table
DROP SCHEMA sc5 CASCADE;
NOTICE: drop cascades to table sc5.alter_replica_table
DROP SCHEMA sc6 CASCADE;
NOTICE: drop cascades to table sc6.alter_replica_table

View File

@ -516,7 +516,81 @@ SELECT create_reference_table('sc2.ref');
COMMIT; COMMIT;
SET citus.shard_count TO 4;
BEGIN;
CREATE SCHEMA sc3;
CREATE TABLE sc3.alter_replica_table
(
name text,
id int PRIMARY KEY
);
ALTER TABLE sc3.alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey;
SELECT create_distributed_table('sc3.alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc3' LIMIT 1$$);
BEGIN;
CREATE SCHEMA sc4;
CREATE TABLE sc4.alter_replica_table
(
name text,
id int PRIMARY KEY
);
INSERT INTO sc4.alter_replica_table(id) SELECT generate_series(1,100);
SET search_path = 'sc4';
ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey;
SELECT create_distributed_table('alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc4' LIMIT 1$$);
SET search_path = 'public';
BEGIN;
CREATE SCHEMA sc5;
CREATE TABLE sc5.alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO sc5.alter_replica_table(id) SELECT generate_series(1,100);
ALTER TABLE sc5.alter_replica_table REPLICA IDENTITY FULL;
SELECT create_distributed_table('sc5.alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc5' LIMIT 1$$);
BEGIN;
CREATE SCHEMA sc6;
CREATE TABLE sc6.alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO sc6.alter_replica_table(id) SELECT generate_series(1,100);
CREATE UNIQUE INDEX unique_idx ON sc6.alter_replica_table(id);
ALTER TABLE sc6.alter_replica_table REPLICA IDENTITY USING INDEX unique_idx;
SELECT create_distributed_table('sc6.alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc6' LIMIT 1$$);
BEGIN;
CREATE TABLE alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO alter_replica_table(id) SELECT generate_series(1,100);
CREATE UNIQUE INDEX unique_idx ON alter_replica_table(id);
ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX unique_idx;
SELECT create_distributed_table('alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='public' LIMIT 1$$);
DROP TABLE tt1; DROP TABLE tt1;
DROP TABLE tt2; DROP TABLE tt2;
DROP TABLE alter_replica_table;
DROP SCHEMA sc CASCADE; DROP SCHEMA sc CASCADE;
DROP SCHEMA sc2 CASCADE; DROP SCHEMA sc2 CASCADE;
DROP SCHEMA sc3 CASCADE;
DROP SCHEMA sc4 CASCADE;
DROP SCHEMA sc5 CASCADE;
DROP SCHEMA sc6 CASCADE;