Merge pull request #1734 from citusdata/create_table_replica_identity

create_distributed_table minds the replica identity
pull/1743/head
Mehmet Furkan ŞAHİN 2017-10-31 13:31:28 +03:00 committed by GitHub
commit d85a3952f5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 289 additions and 2 deletions

View File

@ -69,10 +69,9 @@ int ShardReplicationFactor = 1; /* desired replication factor for shards */
int ShardMaxSize = 1048576; /* maximum size in KB one shard can grow to */ int ShardMaxSize = 1048576; /* maximum size in KB one shard can grow to */
int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN; int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN;
static List * GetTableReplicaIdentityCommand(Oid relationId);
static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor); static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor);
/* exports for SQL callable functions */ /* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_get_table_metadata); PG_FUNCTION_INFO_V1(master_get_table_metadata);
PG_FUNCTION_INFO_V1(master_get_table_ddl_events); PG_FUNCTION_INFO_V1(master_get_table_ddl_events);
@ -474,6 +473,7 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
List *tableDDLEventList = NIL; List *tableDDLEventList = NIL;
List *tableCreationCommandList = NIL; List *tableCreationCommandList = NIL;
List *indexAndConstraintCommandList = NIL; List *indexAndConstraintCommandList = NIL;
List *replicaIdentityEvents = NIL;
tableCreationCommandList = GetTableCreationCommands(relationId, tableCreationCommandList = GetTableCreationCommands(relationId,
includeSequenceDefaults); includeSequenceDefaults);
@ -482,10 +482,45 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
indexAndConstraintCommandList = GetTableIndexAndConstraintCommands(relationId); indexAndConstraintCommandList = GetTableIndexAndConstraintCommands(relationId);
tableDDLEventList = list_concat(tableDDLEventList, indexAndConstraintCommandList); tableDDLEventList = list_concat(tableDDLEventList, indexAndConstraintCommandList);
replicaIdentityEvents = GetTableReplicaIdentityCommand(relationId);
tableDDLEventList = list_concat(tableDDLEventList, replicaIdentityEvents);
return tableDDLEventList; return tableDDLEventList;
} }
/*
* GetTableReplicaIdentityCommand returns the list of DDL commands to
* (re)define the replica identity choice for a given table.
*/
static List *
GetTableReplicaIdentityCommand(Oid relationId)
{
List *replicaIdentityCreateCommandList = NIL;
char *replicaIdentityCreateCommand = NULL;
/*
* We skip non-relations because postgres does not support
* ALTER TABLE .. REPLICA IDENTITY on non-relations.
*/
char relationKind = get_rel_relkind(relationId);
if (relationKind != RELKIND_RELATION)
{
return NIL;
}
replicaIdentityCreateCommand = pg_get_replica_identity_command(relationId);
if (replicaIdentityCreateCommand)
{
replicaIdentityCreateCommandList = lappend(replicaIdentityCreateCommandList,
replicaIdentityCreateCommand);
}
return replicaIdentityCreateCommandList;
}
/* /*
* GetTableCreationCommands takes in a relationId, and returns the list of DDL * GetTableCreationCommands takes in a relationId, and returns the list of DDL
* commands needed to reconstruct the relation, excluding indexes and * commands needed to reconstruct the relation, excluding indexes and

View File

@ -1066,3 +1066,49 @@ contain_nextval_expression_walker(Node *node, void *context)
} }
return expression_tree_walker(node, contain_nextval_expression_walker, context); return expression_tree_walker(node, contain_nextval_expression_walker, context);
} }
/*
* pg_get_replica_identity_command function returns the required ALTER .. TABLE
* command to define the replica identity.
*/
char *
pg_get_replica_identity_command(Oid tableRelationId)
{
Relation relation = NULL;
StringInfo buf = makeStringInfo();
char *relationName = NULL;
char replicaIdentity = 0;
relation = heap_open(tableRelationId, AccessShareLock);
replicaIdentity = relation->rd_rel->relreplident;
relationName = generate_qualified_relation_name(tableRelationId);
if (replicaIdentity == REPLICA_IDENTITY_INDEX)
{
Oid indexId = RelationGetReplicaIndex(relation);
if (OidIsValid(indexId))
{
appendStringInfo(buf, "ALTER TABLE %s REPLICA IDENTITY USING INDEX %s ",
relationName,
quote_identifier(get_rel_name(indexId)));
}
}
else if (replicaIdentity == REPLICA_IDENTITY_NOTHING)
{
appendStringInfo(buf, "ALTER TABLE %s REPLICA IDENTITY NOTHING",
relationName);
}
else if (replicaIdentity == REPLICA_IDENTITY_FULL)
{
appendStringInfo(buf, "ALTER TABLE %s REPLICA IDENTITY FULL",
relationName);
}
heap_close(relation, AccessShareLock);
return (buf->len > 0) ? buf->data : NULL;
}

View File

@ -41,6 +41,7 @@ extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid,
extern char * pg_get_indexclusterdef_string(Oid indexRelationId); extern char * pg_get_indexclusterdef_string(Oid indexRelationId);
extern List * pg_get_table_grants(Oid relationId); extern List * pg_get_table_grants(Oid relationId);
extern bool contain_nextval_expression_walker(Node *node, void *context); extern bool contain_nextval_expression_walker(Node *node, void *context);
extern char * pg_get_replica_identity_command(Oid tableRelationId);
/* Function declarations for version dependent PostgreSQL ruleutils functions */ /* Function declarations for version dependent PostgreSQL ruleutils functions */
extern void pg_get_query_def(Query *query, StringInfo buffer); extern void pg_get_query_def(Query *query, StringInfo buffer);

View File

@ -920,8 +920,131 @@ NOTICE: Copying data from local table...
(1 row) (1 row)
COMMIT; COMMIT;
SET citus.shard_count TO 4;
BEGIN;
CREATE SCHEMA sc3;
CREATE TABLE sc3.alter_replica_table
(
name text,
id int PRIMARY KEY
);
ALTER TABLE sc3.alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey;
SELECT create_distributed_table('sc3.alter_replica_table', 'id');
create_distributed_table
--------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc3' LIMIT 1$$);
run_command_on_workers
------------------------
(localhost,57637,t,i)
(localhost,57638,t,i)
(2 rows)
BEGIN;
CREATE SCHEMA sc4;
CREATE TABLE sc4.alter_replica_table
(
name text,
id int PRIMARY KEY
);
INSERT INTO sc4.alter_replica_table(id) SELECT generate_series(1,100);
SET search_path = 'sc4';
ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey;
SELECT create_distributed_table('alter_replica_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc4' LIMIT 1$$);
run_command_on_workers
------------------------
(localhost,57637,t,i)
(localhost,57638,t,i)
(2 rows)
SET search_path = 'public';
BEGIN;
CREATE SCHEMA sc5;
CREATE TABLE sc5.alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO sc5.alter_replica_table(id) SELECT generate_series(1,100);
ALTER TABLE sc5.alter_replica_table REPLICA IDENTITY FULL;
SELECT create_distributed_table('sc5.alter_replica_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc5' LIMIT 1$$);
run_command_on_workers
------------------------
(localhost,57637,t,f)
(localhost,57638,t,f)
(2 rows)
BEGIN;
CREATE SCHEMA sc6;
CREATE TABLE sc6.alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO sc6.alter_replica_table(id) SELECT generate_series(1,100);
CREATE UNIQUE INDEX unique_idx ON sc6.alter_replica_table(id);
ALTER TABLE sc6.alter_replica_table REPLICA IDENTITY USING INDEX unique_idx;
SELECT create_distributed_table('sc6.alter_replica_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc6' LIMIT 1$$);
run_command_on_workers
------------------------
(localhost,57637,t,i)
(localhost,57638,t,i)
(2 rows)
BEGIN;
CREATE TABLE alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO alter_replica_table(id) SELECT generate_series(1,100);
CREATE UNIQUE INDEX unique_idx ON alter_replica_table(id);
ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX unique_idx;
SELECT create_distributed_table('alter_replica_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='public' LIMIT 1$$);
run_command_on_workers
------------------------
(localhost,57637,t,i)
(localhost,57638,t,i)
(2 rows)
DROP TABLE tt1; DROP TABLE tt1;
DROP TABLE tt2; DROP TABLE tt2;
DROP TABLE alter_replica_table;
DROP SCHEMA sc CASCADE; DROP SCHEMA sc CASCADE;
NOTICE: drop cascades to 2 other objects NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table sc.ref DETAIL: drop cascades to table sc.ref
@ -930,3 +1053,11 @@ DROP SCHEMA sc2 CASCADE;
NOTICE: drop cascades to 2 other objects NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table sc2.hash DETAIL: drop cascades to table sc2.hash
drop cascades to table sc2.ref drop cascades to table sc2.ref
DROP SCHEMA sc3 CASCADE;
NOTICE: drop cascades to table sc3.alter_replica_table
DROP SCHEMA sc4 CASCADE;
NOTICE: drop cascades to table sc4.alter_replica_table
DROP SCHEMA sc5 CASCADE;
NOTICE: drop cascades to table sc5.alter_replica_table
DROP SCHEMA sc6 CASCADE;
NOTICE: drop cascades to table sc6.alter_replica_table

View File

@ -516,7 +516,81 @@ SELECT create_reference_table('sc2.ref');
COMMIT; COMMIT;
SET citus.shard_count TO 4;
BEGIN;
CREATE SCHEMA sc3;
CREATE TABLE sc3.alter_replica_table
(
name text,
id int PRIMARY KEY
);
ALTER TABLE sc3.alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey;
SELECT create_distributed_table('sc3.alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc3' LIMIT 1$$);
BEGIN;
CREATE SCHEMA sc4;
CREATE TABLE sc4.alter_replica_table
(
name text,
id int PRIMARY KEY
);
INSERT INTO sc4.alter_replica_table(id) SELECT generate_series(1,100);
SET search_path = 'sc4';
ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX alter_replica_table_pkey;
SELECT create_distributed_table('alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc4' LIMIT 1$$);
SET search_path = 'public';
BEGIN;
CREATE SCHEMA sc5;
CREATE TABLE sc5.alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO sc5.alter_replica_table(id) SELECT generate_series(1,100);
ALTER TABLE sc5.alter_replica_table REPLICA IDENTITY FULL;
SELECT create_distributed_table('sc5.alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc5' LIMIT 1$$);
BEGIN;
CREATE SCHEMA sc6;
CREATE TABLE sc6.alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO sc6.alter_replica_table(id) SELECT generate_series(1,100);
CREATE UNIQUE INDEX unique_idx ON sc6.alter_replica_table(id);
ALTER TABLE sc6.alter_replica_table REPLICA IDENTITY USING INDEX unique_idx;
SELECT create_distributed_table('sc6.alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='sc6' LIMIT 1$$);
BEGIN;
CREATE TABLE alter_replica_table
(
name text,
id int NOT NULL
);
INSERT INTO alter_replica_table(id) SELECT generate_series(1,100);
CREATE UNIQUE INDEX unique_idx ON alter_replica_table(id);
ALTER TABLE alter_replica_table REPLICA IDENTITY USING INDEX unique_idx;
SELECT create_distributed_table('alter_replica_table', 'id');
COMMIT;
SELECT run_command_on_workers($$SELECT relreplident FROM pg_class join information_schema.tables AS tables ON (pg_class.relname=tables.table_name) WHERE relname LIKE 'alter_replica_table_%' AND table_schema='public' LIMIT 1$$);
DROP TABLE tt1; DROP TABLE tt1;
DROP TABLE tt2; DROP TABLE tt2;
DROP TABLE alter_replica_table;
DROP SCHEMA sc CASCADE; DROP SCHEMA sc CASCADE;
DROP SCHEMA sc2 CASCADE; DROP SCHEMA sc2 CASCADE;
DROP SCHEMA sc3 CASCADE;
DROP SCHEMA sc4 CASCADE;
DROP SCHEMA sc5 CASCADE;
DROP SCHEMA sc6 CASCADE;