Propagate `mark_tables_colocated` changes in `pg_dist_partition` table to metadata workers.

pull/1045/head
Eren Basak 2016-12-19 15:09:23 +03:00
parent 71d73ec5ff
commit bed2e353db
5 changed files with 181 additions and 0 deletions

View File

@ -594,6 +594,25 @@ NodeDeleteCommand(uint32 nodeId)
} }
/*
* ColocationIdUpdateCommand creates the SQL command to change the colocationId
* of the table with the given name to the given colocationId in pg_dist_partition
* table.
*/
char *
ColocationIdUpdateCommand(Oid relationId, uint32 colocationId)
{
StringInfo command = makeStringInfo();
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
appendStringInfo(command, "UPDATE pg_dist_partition "
"SET colocationid = %d "
"WHERE logicalrelid = %s::regclass",
colocationId, quote_literal_cstr(qualifiedRelationName));
return command->data;
}
/* /*
* LocalGroupIdUpdateCommand creates the SQL command required to set the local group id * LocalGroupIdUpdateCommand creates the SQL command required to set the local group id
* of a worker and returns the command in a string. * of a worker and returns the command in a string.

View File

@ -22,11 +22,13 @@
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_colocation.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
@ -617,6 +619,7 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId)
HeapTuple heapTuple = NULL; HeapTuple heapTuple = NULL;
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
SysScanDesc scanDescriptor = NULL; SysScanDesc scanDescriptor = NULL;
bool shouldSyncMetadata = false;
bool indexOK = true; bool indexOK = true;
int scanKeyCount = 1; int scanKeyCount = 1;
ScanKeyData scanKey[scanKeyCount]; ScanKeyData scanKey[scanKeyCount];
@ -660,6 +663,15 @@ UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colocationId)
systable_endscan(scanDescriptor); systable_endscan(scanDescriptor);
heap_close(pgDistPartition, NoLock); heap_close(pgDistPartition, NoLock);
shouldSyncMetadata = ShouldSyncTableMetadata(distributedRelationId);
if (shouldSyncMetadata)
{
char *updateColocationIdCommand = ColocationIdUpdateCommand(distributedRelationId,
colocationId);
SendCommandToWorkers(WORKERS_WITH_METADATA, updateColocationIdCommand);
}
} }

View File

@ -29,6 +29,7 @@ extern char * TableOwnerResetCommand(Oid distributedRelationId);
extern char * NodeListInsertCommand(List *workerNodeList); extern char * NodeListInsertCommand(List *workerNodeList);
extern List * ShardListInsertCommand(List *shardIntervalList); extern List * ShardListInsertCommand(List *shardIntervalList);
extern char * NodeDeleteCommand(uint32 nodeId); extern char * NodeDeleteCommand(uint32 nodeId);
extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId);
#define DELETE_ALL_NODES "TRUNCATE pg_dist_node" #define DELETE_ALL_NODES "TRUNCATE pg_dist_node"

View File

@ -737,6 +737,94 @@ Foreign-key constraints:
Referenced by: Referenced by:
TABLE "mx_test_schema_2.mx_table_2" CONSTRAINT "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1) TABLE "mx_test_schema_2.mx_table_2" CONSTRAINT "mx_fk_constraint" FOREIGN KEY (col1) REFERENCES mx_test_schema_1.mx_table_1(col1)
-- Check that mark_tables_colocated call propagates the changes to the workers
\c - - - :master_port
SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 10000;
SET citus.shard_count TO 7;
SET citus.shard_replication_factor TO 1;
CREATE TABLE mx_colocation_test_1 (a int);
SELECT create_distributed_table('mx_colocation_test_1', 'a');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE mx_colocation_test_2 (a int);
SELECT create_distributed_table('mx_colocation_test_2', 'a');
create_distributed_table
--------------------------
(1 row)
-- Check the colocation IDs of the created tables
SELECT
logicalrelid, colocationid
FROM
pg_dist_partition
WHERE
logicalrelid = 'mx_colocation_test_1'::regclass
OR logicalrelid = 'mx_colocation_test_2'::regclass
ORDER BY logicalrelid;
logicalrelid | colocationid
----------------------+--------------
mx_colocation_test_1 | 10000
mx_colocation_test_2 | 10000
(2 rows)
-- Reset the colocation IDs of the test tables
DELETE FROM
pg_dist_colocation
WHERE EXISTS (
SELECT 1
FROM pg_dist_partition
WHERE
colocationid = pg_dist_partition.colocationid
AND pg_dist_partition.logicalrelid = 'mx_colocation_test_1'::regclass);
UPDATE
pg_dist_partition
SET
colocationid = 0
WHERE
logicalrelid = 'mx_colocation_test_1'::regclass
OR logicalrelid = 'mx_colocation_test_2'::regclass;
-- Mark tables colocated and see the changes on the master and the worker
SELECT mark_tables_colocated('mx_colocation_test_1', ARRAY['mx_colocation_test_2']);
mark_tables_colocated
-----------------------
(1 row)
SELECT
logicalrelid, colocationid
FROM
pg_dist_partition
WHERE
logicalrelid = 'mx_colocation_test_1'::regclass
OR logicalrelid = 'mx_colocation_test_2'::regclass;
logicalrelid | colocationid
----------------------+--------------
mx_colocation_test_1 | 10001
mx_colocation_test_2 | 10001
(2 rows)
\c - - - :worker_1_port
SELECT
logicalrelid, colocationid
FROM
pg_dist_partition
WHERE
logicalrelid = 'mx_colocation_test_1'::regclass
OR logicalrelid = 'mx_colocation_test_2'::regclass;
logicalrelid | colocationid
----------------------+--------------
mx_colocation_test_1 | 10001
mx_colocation_test_2 | 10001
(2 rows)
\c - - - :master_port
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
-- Cleanup -- Cleanup
\c - - - :worker_1_port \c - - - :worker_1_port
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;

View File

@ -298,6 +298,67 @@ NOT VALID;
\c - - - :worker_1_port \c - - - :worker_1_port
\d mx_test_schema_1.mx_table_1 \d mx_test_schema_1.mx_table_1
-- Check that mark_tables_colocated call propagates the changes to the workers
\c - - - :master_port
SELECT nextval('pg_catalog.pg_dist_colocationid_seq') AS last_colocation_id \gset
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 10000;
SET citus.shard_count TO 7;
SET citus.shard_replication_factor TO 1;
CREATE TABLE mx_colocation_test_1 (a int);
SELECT create_distributed_table('mx_colocation_test_1', 'a');
CREATE TABLE mx_colocation_test_2 (a int);
SELECT create_distributed_table('mx_colocation_test_2', 'a');
-- Check the colocation IDs of the created tables
SELECT
logicalrelid, colocationid
FROM
pg_dist_partition
WHERE
logicalrelid = 'mx_colocation_test_1'::regclass
OR logicalrelid = 'mx_colocation_test_2'::regclass
ORDER BY logicalrelid;
-- Reset the colocation IDs of the test tables
DELETE FROM
pg_dist_colocation
WHERE EXISTS (
SELECT 1
FROM pg_dist_partition
WHERE
colocationid = pg_dist_partition.colocationid
AND pg_dist_partition.logicalrelid = 'mx_colocation_test_1'::regclass);
UPDATE
pg_dist_partition
SET
colocationid = 0
WHERE
logicalrelid = 'mx_colocation_test_1'::regclass
OR logicalrelid = 'mx_colocation_test_2'::regclass;
-- Mark tables colocated and see the changes on the master and the worker
SELECT mark_tables_colocated('mx_colocation_test_1', ARRAY['mx_colocation_test_2']);
SELECT
logicalrelid, colocationid
FROM
pg_dist_partition
WHERE
logicalrelid = 'mx_colocation_test_1'::regclass
OR logicalrelid = 'mx_colocation_test_2'::regclass;
\c - - - :worker_1_port
SELECT
logicalrelid, colocationid
FROM
pg_dist_partition
WHERE
logicalrelid = 'mx_colocation_test_1'::regclass
OR logicalrelid = 'mx_colocation_test_2'::regclass;
\c - - - :master_port
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
-- Cleanup -- Cleanup
\c - - - :worker_1_port \c - - - :worker_1_port