Address MX CONCURRENTLY problems

Adds a non-transactional multi-command method to propagate DDLs to all
MX/metadata-synced nodes.
pull/1938/head
Jason Petersen 2017-03-30 00:38:30 -06:00
parent afa9bd4840
commit d128ad723a
5 changed files with 117 additions and 20 deletions

View File

@ -1993,24 +1993,27 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE);
SavedMultiShardCommitProtocol = MultiShardCommitProtocol;
MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
}
else
{
ShowNoticeIfNotUsing2PC();
}
if (shouldSyncMetadata)
{
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString);
}
if (shouldSyncMetadata)
{
List *commandList = list_make2(DISABLE_DDL_PROPAGATION,
(char *) ddlJob->commandString);
SendBareCommandListToWorkers(WORKERS_WITH_METADATA, commandList);
}
if (ddlJob->preventTransaction)
{
ExecuteSequentialTasksWithoutResults(ddlJob->taskList);
}
else
{
ShowNoticeIfNotUsing2PC();
if (shouldSyncMetadata)
{
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString);
}
ExecuteModifyTasksWithoutResults(ddlJob->taskList);
}
}

View File

@ -68,6 +68,51 @@ SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command)
}
void
SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet, List *commandList)
{
List *workerNodeList = WorkerNodeList();
ListCell *workerNodeCell = NULL;
char *nodeUser = CitusExtensionOwnerName();
ListCell *commandCell = NULL;
if (XactModificationLevel > XACT_MODIFICATION_NONE)
{
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
errmsg("cannot open new connections after the first modification "
"command within a transaction")));
}
/* run commands serially */
foreach(workerNodeCell, workerNodeList)
{
MultiConnection *workerConnection = NULL;
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort;
int connectionFlags = FORCE_NEW_CONNECTION;
if (targetWorkerSet == WORKERS_WITH_METADATA && !workerNode->hasMetadata)
{
continue;
}
workerConnection = GetNodeUserDatabaseConnection(connectionFlags, nodeName,
nodePort, nodeUser, NULL);
/* iterate over the commands and execute them in the same connection */
foreach(commandCell, commandList)
{
char *commandString = lfirst(commandCell);
ExecuteCriticalRemoteCommand(workerConnection, commandString);
}
CloseConnection(workerConnection);
}
}
/*
* SendCommandToWorkersParams sends a command to all workers in parallel.
* Commands are committed on the workers when the local transaction commits. The

View File

@ -30,6 +30,8 @@ typedef enum TargetWorkerSet
extern List * GetWorkerTransactions(void);
extern void SendCommandToWorker(char *nodeName, int32 nodePort, char *command);
extern void SendCommandToWorkers(TargetWorkerSet targetWorkerSet, char *command);
extern void SendBareCommandListToWorkers(TargetWorkerSet targetWorkerSet,
List *commandList);
extern void SendCommandToWorkersParams(TargetWorkerSet targetWorkerSet, char *command,
int parameterCount, const Oid *parameterTypes,
const char *const *parameterValues);

View File

@ -18,6 +18,7 @@ SELECT * FROM mx_ddl_table ORDER BY key;
CREATE INDEX ddl_test_index ON mx_ddl_table(value);
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
CREATE INDEX CONCURRENTLY ddl_test_concurrent_index ON mx_ddl_table(value);
-- ADD COLUMN
ALTER TABLE mx_ddl_table ADD COLUMN version INTEGER;
-- SET DEFAULT
@ -40,6 +41,7 @@ ALTER TABLE mx_ddl_table ALTER COLUMN version SET NOT NULL;
version | integer | not null default 1
Indexes:
"mx_ddl_table_pkey" PRIMARY KEY, btree (key)
"ddl_test_concurrent_index" btree (value)
"ddl_test_index" btree (value)
\c - - - :worker_1_port
@ -52,9 +54,21 @@ Indexes:
version | integer | not null default 1
Indexes:
"mx_ddl_table_pkey" PRIMARY KEY, btree (key)
"ddl_test_concurrent_index" btree (value)
"ddl_test_index" btree (value)
\d mx_ddl_table_1600000
\d mx_ddl_table_1220088
Table "public.mx_ddl_table_1220088"
Column | Type | Modifiers
---------+---------+--------------------
key | integer | not null
value | integer |
version | integer | not null default 1
Indexes:
"mx_ddl_table_pkey_1220088" PRIMARY KEY, btree (key)
"ddl_test_concurrent_index_1220088" btree (value)
"ddl_test_index_1220088" btree (value)
\c - - - :worker_2_port
\d mx_ddl_table
Table "public.mx_ddl_table"
@ -65,9 +79,21 @@ Indexes:
version | integer | not null default 1
Indexes:
"mx_ddl_table_pkey" PRIMARY KEY, btree (key)
"ddl_test_concurrent_index" btree (value)
"ddl_test_index" btree (value)
\d mx_ddl_table_1600001
\d mx_ddl_table_1220089
Table "public.mx_ddl_table_1220089"
Column | Type | Modifiers
---------+---------+--------------------
key | integer | not null
value | integer |
version | integer | not null default 1
Indexes:
"mx_ddl_table_pkey_1220089" PRIMARY KEY, btree (key)
"ddl_test_concurrent_index_1220089" btree (value)
"ddl_test_index_1220089" btree (value)
INSERT INTO mx_ddl_table VALUES (37, 78, 2);
INSERT INTO mx_ddl_table VALUES (38, 78);
-- Switch to the coordinator
@ -100,6 +126,7 @@ SELECT * FROM mx_ddl_table ORDER BY key;
DROP INDEX ddl_test_index;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
DROP INDEX CONCURRENTLY ddl_test_concurrent_index;
-- DROP DEFAULT
ALTER TABLE mx_ddl_table ALTER COLUMN version DROP DEFAULT;
-- DROP NOT NULL
@ -126,7 +153,15 @@ Indexes:
Indexes:
"mx_ddl_table_pkey" PRIMARY KEY, btree (key)
\d mx_ddl_table_1600000
\d mx_ddl_table_1220088
Table "public.mx_ddl_table_1220088"
Column | Type | Modifiers
--------+---------+-----------
key | integer | not null
value | integer |
Indexes:
"mx_ddl_table_pkey_1220088" PRIMARY KEY, btree (key)
\c - - - :worker_2_port
\d mx_ddl_table
Table "public.mx_ddl_table"
@ -137,7 +172,15 @@ Indexes:
Indexes:
"mx_ddl_table_pkey" PRIMARY KEY, btree (key)
\d mx_ddl_table_1600001
\d mx_ddl_table_1220089
Table "public.mx_ddl_table_1220089"
Column | Type | Modifiers
--------+---------+-----------
key | integer | not null
value | integer |
Indexes:
"mx_ddl_table_pkey_1220089" PRIMARY KEY, btree (key)
-- Show that DDL commands are done within a two-phase commit transaction
\c - - - :master_port
SET client_min_messages TO debug2;

View File

@ -8,6 +8,8 @@ SELECT * FROM mx_ddl_table ORDER BY key;
-- CREATE INDEX
CREATE INDEX ddl_test_index ON mx_ddl_table(value);
CREATE INDEX CONCURRENTLY ddl_test_concurrent_index ON mx_ddl_table(value);
-- ADD COLUMN
ALTER TABLE mx_ddl_table ADD COLUMN version INTEGER;
@ -27,13 +29,13 @@ ALTER TABLE mx_ddl_table ALTER COLUMN version SET NOT NULL;
\d mx_ddl_table
\d mx_ddl_table_1600000
\d mx_ddl_table_1220088
\c - - - :worker_2_port
\d mx_ddl_table
\d mx_ddl_table_1600001
\d mx_ddl_table_1220089
INSERT INTO mx_ddl_table VALUES (37, 78, 2);
INSERT INTO mx_ddl_table VALUES (38, 78);
@ -56,6 +58,8 @@ SELECT * FROM mx_ddl_table ORDER BY key;
-- DROP INDEX
DROP INDEX ddl_test_index;
DROP INDEX CONCURRENTLY ddl_test_concurrent_index;
-- DROP DEFAULT
ALTER TABLE mx_ddl_table ALTER COLUMN version DROP DEFAULT;
@ -73,13 +77,13 @@ ALTER TABLE mx_ddl_table DROP COLUMN version;
\d mx_ddl_table
\d mx_ddl_table_1600000
\d mx_ddl_table_1220088
\c - - - :worker_2_port
\d mx_ddl_table
\d mx_ddl_table_1600001
\d mx_ddl_table_1220089
-- Show that DDL commands are done within a two-phase commit transaction
\c - - - :master_port