mirror of https://github.com/citusdata/citus.git
Parallelize and transactionalize DDL.
parent
57f2b7c452
commit
8d42d18c81
|
@ -30,17 +30,15 @@
|
||||||
#include "commands/defrem.h"
|
#include "commands/defrem.h"
|
||||||
#include "commands/tablecmds.h"
|
#include "commands/tablecmds.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/commit_protocol.h"
|
|
||||||
#include "distributed/connection_cache.h"
|
|
||||||
#include "distributed/master_metadata_utility.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_copy.h"
|
#include "distributed/multi_copy.h"
|
||||||
#include "distributed/multi_join_order.h"
|
#include "distributed/multi_join_order.h"
|
||||||
#include "distributed/multi_shard_transaction.h"
|
|
||||||
#include "distributed/multi_utility.h" /* IWYU pragma: keep */
|
#include "distributed/multi_utility.h" /* IWYU pragma: keep */
|
||||||
#include "distributed/pg_dist_partition.h"
|
#include "distributed/pg_dist_partition.h"
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
|
#include "distributed/remote_commands.h"
|
||||||
#include "distributed/transmit.h"
|
#include "distributed/transmit.h"
|
||||||
#include "distributed/worker_protocol.h"
|
#include "distributed/worker_protocol.h"
|
||||||
#include "executor/executor.h"
|
#include "executor/executor.h"
|
||||||
|
@ -95,14 +93,13 @@ static void VerifyTransmitStmt(CopyStmt *copyStatement);
|
||||||
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
|
static Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
|
||||||
bool *commandMustRunAsOwner);
|
bool *commandMustRunAsOwner);
|
||||||
static Node * ProcessIndexStmt(IndexStmt *createIndexStatement,
|
static Node * ProcessIndexStmt(IndexStmt *createIndexStatement,
|
||||||
const char *createIndexCommand, bool isTopLevel);
|
const char *createIndexCommand);
|
||||||
static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement,
|
static Node * ProcessDropIndexStmt(DropStmt *dropIndexStatement,
|
||||||
const char *dropIndexCommand, bool isTopLevel);
|
const char *dropIndexCommand);
|
||||||
static Node * ProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
|
static Node * ProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
|
||||||
const char *alterTableCommand, bool isTopLevel);
|
const char *alterTableCommand);
|
||||||
static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
static Node * ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
||||||
const char *alterObjectSchemaCommand,
|
const char *alterObjectSchemaCommand);
|
||||||
bool isTopLevel);
|
|
||||||
|
|
||||||
/* Local functions forward declarations for unsupported command checks */
|
/* Local functions forward declarations for unsupported command checks */
|
||||||
static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement);
|
static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement);
|
||||||
|
@ -117,12 +114,8 @@ static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement);
|
||||||
/* Local functions forward declarations for helper functions */
|
/* Local functions forward declarations for helper functions */
|
||||||
static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort);
|
static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort);
|
||||||
static bool IsAlterTableRenameStmt(RenameStmt *renameStatement);
|
static bool IsAlterTableRenameStmt(RenameStmt *renameStatement);
|
||||||
static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
|
static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString);
|
||||||
bool isTopLevel);
|
|
||||||
static void ShowNoticeIfNotUsing2PC(void);
|
static void ShowNoticeIfNotUsing2PC(void);
|
||||||
static bool ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString);
|
|
||||||
static void ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId,
|
|
||||||
ShardConnections *shardConnections);
|
|
||||||
static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid,
|
static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid,
|
||||||
void *arg);
|
void *arg);
|
||||||
static void CheckCopyPermissions(CopyStmt *copyStatement);
|
static void CheckCopyPermissions(CopyStmt *copyStatement);
|
||||||
|
@ -210,12 +203,9 @@ multi_ProcessUtility(Node *parsetree,
|
||||||
/* ddl commands are propagated to workers only if EnableDDLPropagation is set */
|
/* ddl commands are propagated to workers only if EnableDDLPropagation is set */
|
||||||
if (EnableDDLPropagation)
|
if (EnableDDLPropagation)
|
||||||
{
|
{
|
||||||
bool isTopLevel = (context == PROCESS_UTILITY_TOPLEVEL);
|
|
||||||
|
|
||||||
if (IsA(parsetree, IndexStmt))
|
if (IsA(parsetree, IndexStmt))
|
||||||
{
|
{
|
||||||
parsetree = ProcessIndexStmt((IndexStmt *) parsetree, queryString,
|
parsetree = ProcessIndexStmt((IndexStmt *) parsetree, queryString);
|
||||||
isTopLevel);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IsA(parsetree, DropStmt))
|
if (IsA(parsetree, DropStmt))
|
||||||
|
@ -223,7 +213,7 @@ multi_ProcessUtility(Node *parsetree,
|
||||||
DropStmt *dropStatement = (DropStmt *) parsetree;
|
DropStmt *dropStatement = (DropStmt *) parsetree;
|
||||||
if (dropStatement->removeType == OBJECT_INDEX)
|
if (dropStatement->removeType == OBJECT_INDEX)
|
||||||
{
|
{
|
||||||
parsetree = ProcessDropIndexStmt(dropStatement, queryString, isTopLevel);
|
parsetree = ProcessDropIndexStmt(dropStatement, queryString);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -232,8 +222,7 @@ multi_ProcessUtility(Node *parsetree,
|
||||||
AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree;
|
AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree;
|
||||||
if (alterTableStmt->relkind == OBJECT_TABLE)
|
if (alterTableStmt->relkind == OBJECT_TABLE)
|
||||||
{
|
{
|
||||||
parsetree = ProcessAlterTableStmt(alterTableStmt, queryString,
|
parsetree = ProcessAlterTableStmt(alterTableStmt, queryString);
|
||||||
isTopLevel);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -257,8 +246,7 @@ multi_ProcessUtility(Node *parsetree,
|
||||||
if (IsA(parsetree, AlterObjectSchemaStmt))
|
if (IsA(parsetree, AlterObjectSchemaStmt))
|
||||||
{
|
{
|
||||||
AlterObjectSchemaStmt *setSchemaStmt = (AlterObjectSchemaStmt *) parsetree;
|
AlterObjectSchemaStmt *setSchemaStmt = (AlterObjectSchemaStmt *) parsetree;
|
||||||
parsetree = ProcessAlterObjectSchemaStmt(setSchemaStmt, queryString,
|
parsetree = ProcessAlterObjectSchemaStmt(setSchemaStmt, queryString);
|
||||||
isTopLevel);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -538,8 +526,7 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
|
||||||
* master node table.
|
* master node table.
|
||||||
*/
|
*/
|
||||||
static Node *
|
static Node *
|
||||||
ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand,
|
ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand)
|
||||||
bool isTopLevel)
|
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* We first check whether a distributed relation is affected. For that, we need to
|
* We first check whether a distributed relation is affected. For that, we need to
|
||||||
|
@ -586,7 +573,7 @@ ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand
|
||||||
ErrorIfUnsupportedIndexStmt(createIndexStatement);
|
ErrorIfUnsupportedIndexStmt(createIndexStatement);
|
||||||
|
|
||||||
/* if it is supported, go ahead and execute the command */
|
/* if it is supported, go ahead and execute the command */
|
||||||
ExecuteDistributedDDLCommand(relationId, createIndexCommand, isTopLevel);
|
ExecuteDistributedDDLCommand(relationId, createIndexCommand);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -603,8 +590,7 @@ ProcessIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand
|
||||||
* master node table.
|
* master node table.
|
||||||
*/
|
*/
|
||||||
static Node *
|
static Node *
|
||||||
ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand,
|
ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
|
||||||
bool isTopLevel)
|
|
||||||
{
|
{
|
||||||
ListCell *dropObjectCell = NULL;
|
ListCell *dropObjectCell = NULL;
|
||||||
Oid distributedIndexId = InvalidOid;
|
Oid distributedIndexId = InvalidOid;
|
||||||
|
@ -673,7 +659,7 @@ ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand,
|
||||||
ErrorIfUnsupportedDropIndexStmt(dropIndexStatement);
|
ErrorIfUnsupportedDropIndexStmt(dropIndexStatement);
|
||||||
|
|
||||||
/* if it is supported, go ahead and execute the command */
|
/* if it is supported, go ahead and execute the command */
|
||||||
ExecuteDistributedDDLCommand(distributedRelationId, dropIndexCommand, isTopLevel);
|
ExecuteDistributedDDLCommand(distributedRelationId, dropIndexCommand);
|
||||||
}
|
}
|
||||||
|
|
||||||
return (Node *) dropIndexStatement;
|
return (Node *) dropIndexStatement;
|
||||||
|
@ -689,8 +675,7 @@ ProcessDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand,
|
||||||
* master node table.
|
* master node table.
|
||||||
*/
|
*/
|
||||||
static Node *
|
static Node *
|
||||||
ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand,
|
ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTableCommand)
|
||||||
bool isTopLevel)
|
|
||||||
{
|
{
|
||||||
/* first check whether a distributed relation is affected */
|
/* first check whether a distributed relation is affected */
|
||||||
if (alterTableStatement->relation != NULL)
|
if (alterTableStatement->relation != NULL)
|
||||||
|
@ -705,7 +690,7 @@ ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTabl
|
||||||
ErrorIfUnsupportedAlterTableStmt(alterTableStatement);
|
ErrorIfUnsupportedAlterTableStmt(alterTableStatement);
|
||||||
|
|
||||||
/* if it is supported, go ahead and execute the command */
|
/* if it is supported, go ahead and execute the command */
|
||||||
ExecuteDistributedDDLCommand(relationId, alterTableCommand, isTopLevel);
|
ExecuteDistributedDDLCommand(relationId, alterTableCommand);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -722,7 +707,7 @@ ProcessAlterTableStmt(AlterTableStmt *alterTableStatement, const char *alterTabl
|
||||||
*/
|
*/
|
||||||
static Node *
|
static Node *
|
||||||
ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
||||||
const char *alterObjectSchemaCommand, bool isTopLevel)
|
const char *alterObjectSchemaCommand)
|
||||||
{
|
{
|
||||||
Oid relationId = InvalidOid;
|
Oid relationId = InvalidOid;
|
||||||
bool noWait = false;
|
bool noWait = false;
|
||||||
|
@ -1278,16 +1263,13 @@ IsAlterTableRenameStmt(RenameStmt *renameStmt)
|
||||||
* ExecuteDistributedDDLCommand applies a given DDL command to the given
|
* ExecuteDistributedDDLCommand applies a given DDL command to the given
|
||||||
* distributed table in a distributed transaction. If the multi shard commit protocol is
|
* distributed table in a distributed transaction. If the multi shard commit protocol is
|
||||||
* in its default value of '1pc', then a notice message indicating that '2pc' might be
|
* in its default value of '1pc', then a notice message indicating that '2pc' might be
|
||||||
* used for extra safety. In the commit protocol, a BEGIN is sent after connection to
|
* used for extra safety.
|
||||||
* each shard placement and COMMIT/ROLLBACK is handled by
|
*
|
||||||
* CompleteShardPlacementTransactions function.
|
* DDL is executed, via worker_apply_shard_ddl_command(), on the workers.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
|
ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString)
|
||||||
bool isTopLevel)
|
|
||||||
{
|
{
|
||||||
bool executionOK = false;
|
|
||||||
|
|
||||||
if (XactModificationLevel == XACT_MODIFICATION_DATA)
|
if (XactModificationLevel == XACT_MODIFICATION_DATA)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
ereport(ERROR, (errcode(ERRCODE_ACTIVE_SQL_TRANSACTION),
|
||||||
|
@ -1295,16 +1277,14 @@ ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString,
|
||||||
"transaction blocks containing data modifications")));
|
"transaction blocks containing data modifications")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
BeginOrContinueCoordinatedTransaction();
|
||||||
|
|
||||||
|
/* FIXME: Move into ExecuteDDLOnRelationPlacements()? */
|
||||||
ShowNoticeIfNotUsing2PC();
|
ShowNoticeIfNotUsing2PC();
|
||||||
|
|
||||||
executionOK = ExecuteCommandOnWorkerShards(relationId, ddlCommandString);
|
ExecuteDDLOnRelationPlacements(relationId, ddlCommandString);
|
||||||
|
|
||||||
/* if command could not be executed on any finalized shard placement, error out */
|
|
||||||
if (!executionOK)
|
|
||||||
{
|
|
||||||
ereport(ERROR, (errmsg("could not execute DDL command on worker node shards")));
|
|
||||||
}
|
|
||||||
|
|
||||||
|
/* FIXME: Move into ExecuteDDLOnRelationPlacements()? */
|
||||||
XactModificationLevel = XACT_MODIFICATION_SCHEMA;
|
XactModificationLevel = XACT_MODIFICATION_SCHEMA;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1328,116 +1308,6 @@ ShowNoticeIfNotUsing2PC(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ExecuteCommandOnWorkerShards executes a given command on all the finalized
|
|
||||||
* shard placements of the given table within a distributed transaction. The
|
|
||||||
* value of citus.multi_shard_commit_protocol is set to '2pc' by the caller
|
|
||||||
* ExecuteDistributedDDLCommand function so that two phase commit protocol is used.
|
|
||||||
*
|
|
||||||
* ExecuteCommandOnWorkerShards opens an individual connection for each of the
|
|
||||||
* shard placement. After all connections are opened, a BEGIN command followed by
|
|
||||||
* a proper "SELECT worker_apply_shard_ddl_command(<shardId>, <DDL Command>)" is
|
|
||||||
* sent to all open connections in a serial manner.
|
|
||||||
*
|
|
||||||
* The opened transactions are handled by the CompleteShardPlacementTransactions
|
|
||||||
* function.
|
|
||||||
*
|
|
||||||
* Note: There are certain errors which would occur on few nodes and not on the
|
|
||||||
* others. For example, adding a column with a type which exists on some nodes
|
|
||||||
* and not on the others.
|
|
||||||
*
|
|
||||||
* Note: The execution will be blocked if a prepared transaction from previous
|
|
||||||
* executions exist on the workers. In this case, those prepared transactions should
|
|
||||||
* be removed by either COMMIT PREPARED or ROLLBACK PREPARED.
|
|
||||||
*/
|
|
||||||
static bool
|
|
||||||
ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString)
|
|
||||||
{
|
|
||||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
|
||||||
char *tableOwner = TableOwner(relationId);
|
|
||||||
ListCell *shardIntervalCell = NULL;
|
|
||||||
Oid schemaId = get_rel_namespace(relationId);
|
|
||||||
char *schemaName = get_namespace_name(schemaId);
|
|
||||||
|
|
||||||
LockShards(shardIntervalList, ShareLock);
|
|
||||||
OpenTransactionsToAllShardPlacements(shardIntervalList, tableOwner);
|
|
||||||
|
|
||||||
foreach(shardIntervalCell, shardIntervalList)
|
|
||||||
{
|
|
||||||
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
|
|
||||||
uint64 shardId = shardInterval->shardId;
|
|
||||||
ShardConnections *shardConnections = NULL;
|
|
||||||
bool shardConnectionsFound = false;
|
|
||||||
char *escapedSchemaName = quote_literal_cstr(schemaName);
|
|
||||||
char *escapedCommandString = quote_literal_cstr(commandString);
|
|
||||||
StringInfo applyCommand = makeStringInfo();
|
|
||||||
|
|
||||||
shardConnections = GetShardConnections(shardId, &shardConnectionsFound);
|
|
||||||
Assert(shardConnectionsFound);
|
|
||||||
|
|
||||||
/* build the shard ddl command */
|
|
||||||
appendStringInfo(applyCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId,
|
|
||||||
escapedSchemaName, escapedCommandString);
|
|
||||||
|
|
||||||
ExecuteCommandOnShardPlacements(applyCommand, shardId, shardConnections);
|
|
||||||
|
|
||||||
FreeStringInfo(applyCommand);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* check for cancellation one last time before returning */
|
|
||||||
CHECK_FOR_INTERRUPTS();
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* ExecuteCommandOnShardPlacements executes the given ddl command on the
|
|
||||||
* placements of the given shard, using the given shard connections.
|
|
||||||
*/
|
|
||||||
static void
|
|
||||||
ExecuteCommandOnShardPlacements(StringInfo applyCommand, uint64 shardId,
|
|
||||||
ShardConnections *shardConnections)
|
|
||||||
{
|
|
||||||
List *connectionList = shardConnections->connectionList;
|
|
||||||
ListCell *connectionCell = NULL;
|
|
||||||
|
|
||||||
Assert(connectionList != NIL);
|
|
||||||
|
|
||||||
foreach(connectionCell, connectionList)
|
|
||||||
{
|
|
||||||
TransactionConnection *transactionConnection =
|
|
||||||
(TransactionConnection *) lfirst(connectionCell);
|
|
||||||
PGconn *connection = transactionConnection->connection;
|
|
||||||
PGresult *result = NULL;
|
|
||||||
|
|
||||||
/* send the query */
|
|
||||||
result = PQexec(connection, applyCommand->data);
|
|
||||||
if (PQresultStatus(result) != PGRES_TUPLES_OK)
|
|
||||||
{
|
|
||||||
WarnRemoteError(connection, result);
|
|
||||||
ereport(ERROR, (errmsg("could not execute DDL command on worker "
|
|
||||||
"node shards")));
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
char *workerName = ConnectionGetOptionValue(connection, "host");
|
|
||||||
char *workerPort = ConnectionGetOptionValue(connection, "port");
|
|
||||||
|
|
||||||
ereport(DEBUG2, (errmsg("applied command on shard " UINT64_FORMAT
|
|
||||||
" on node %s:%s", shardId, workerName,
|
|
||||||
workerPort)));
|
|
||||||
}
|
|
||||||
|
|
||||||
PQclear(result);
|
|
||||||
|
|
||||||
transactionConnection->transactionState = TRANSACTION_STATE_OPEN;
|
|
||||||
|
|
||||||
CHECK_FOR_INTERRUPTS();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Before acquiring a table lock, check whether we have sufficient rights.
|
* Before acquiring a table lock, check whether we have sufficient rights.
|
||||||
* In the case of DROP INDEX, also try to lock the table before the index.
|
* In the case of DROP INDEX, also try to lock the table before the index.
|
||||||
|
@ -1761,7 +1631,6 @@ ReplicateGrantStmt(Node *parsetree)
|
||||||
RangeVar *relvar = (RangeVar *) lfirst(objectCell);
|
RangeVar *relvar = (RangeVar *) lfirst(objectCell);
|
||||||
Oid relOid = RangeVarGetRelid(relvar, NoLock, false);
|
Oid relOid = RangeVarGetRelid(relvar, NoLock, false);
|
||||||
const char *grantOption = "";
|
const char *grantOption = "";
|
||||||
bool isTopLevel = true;
|
|
||||||
|
|
||||||
if (!IsDistributedTable(relOid))
|
if (!IsDistributedTable(relOid))
|
||||||
{
|
{
|
||||||
|
@ -1794,7 +1663,7 @@ ReplicateGrantStmt(Node *parsetree)
|
||||||
granteesString.data);
|
granteesString.data);
|
||||||
}
|
}
|
||||||
|
|
||||||
ExecuteDistributedDDLCommand(relOid, ddlString.data, isTopLevel);
|
ExecuteDistributedDDLCommand(relOid, ddlString.data);
|
||||||
resetStringInfo(&ddlString);
|
resetStringInfo(&ddlString);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,6 @@
|
||||||
|
|
||||||
#include "catalog/namespace.h"
|
#include "catalog/namespace.h"
|
||||||
#include "catalog/pg_class.h"
|
#include "catalog/pg_class.h"
|
||||||
#include "distributed/connection_cache.h"
|
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
#include "distributed/master_metadata_utility.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
|
|
|
@ -155,12 +155,12 @@ NOTICE: using one-phase commit for distributed DDL commands
|
||||||
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
||||||
WARNING: data type bigint has no default operator class for access method "gist"
|
WARNING: data type bigint has no default operator class for access method "gist"
|
||||||
HINT: You must specify an operator class for the index or define a default operator class for the data type.
|
HINT: You must specify an operator class for the index or define a default operator class for the data type.
|
||||||
CONTEXT: while executing command on localhost:57638
|
CONTEXT: while executing command on localhost:57637
|
||||||
ERROR: could not execute DDL command on worker node shards
|
ERROR: failure on connection marked as essential: localhost:57637
|
||||||
CREATE INDEX try_index ON lineitem (non_existent_column);
|
CREATE INDEX try_index ON lineitem (non_existent_column);
|
||||||
WARNING: column "non_existent_column" does not exist
|
WARNING: column "non_existent_column" does not exist
|
||||||
CONTEXT: while executing command on localhost:57638
|
CONTEXT: while executing command on localhost:57637
|
||||||
ERROR: could not execute DDL command on worker node shards
|
ERROR: failure on connection marked as essential: localhost:57637
|
||||||
CREATE INDEX ON lineitem (l_orderkey);
|
CREATE INDEX ON lineitem (l_orderkey);
|
||||||
ERROR: creating index without a name on a distributed table is currently unsupported
|
ERROR: creating index without a name on a distributed table is currently unsupported
|
||||||
-- Verify that none of failed indexes got created on the master node
|
-- Verify that none of failed indexes got created on the master node
|
||||||
|
|
|
@ -44,11 +44,7 @@ SELECT master_create_worker_shards('lineitem_hash', 2, 1);
|
||||||
CREATE INDEX lineitem_hash_time_index ON lineitem_hash (l_shipdate);
|
CREATE INDEX lineitem_hash_time_index ON lineitem_hash (l_shipdate);
|
||||||
NOTICE: using one-phase commit for distributed DDL commands
|
NOTICE: using one-phase commit for distributed DDL commands
|
||||||
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
||||||
DEBUG: applied command on shard 650000 on node localhost:57637
|
|
||||||
DEBUG: applied command on shard 650001 on node localhost:57638
|
|
||||||
DEBUG: building index "lineitem_hash_time_index" on table "lineitem_hash"
|
DEBUG: building index "lineitem_hash_time_index" on table "lineitem_hash"
|
||||||
DEBUG: sent COMMIT over connection 650000
|
|
||||||
DEBUG: sent COMMIT over connection 650001
|
|
||||||
CREATE TABLE orders_hash (
|
CREATE TABLE orders_hash (
|
||||||
o_orderkey bigint not null,
|
o_orderkey bigint not null,
|
||||||
o_custkey integer not null,
|
o_custkey integer not null,
|
||||||
|
|
|
@ -230,7 +230,9 @@ COMMIT;
|
||||||
|
|
||||||
-- Nothing from the block should have committed
|
-- Nothing from the block should have committed
|
||||||
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
|
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
|
||||||
|
SET client_min_messages = 'ERROR';
|
||||||
|
ALTER TABLE lineitem_alter DROP COLUMN first;
|
||||||
|
RESET client_min_messages;
|
||||||
-- Create single-shard table (to avoid deadlocks in the upcoming test hackery)
|
-- Create single-shard table (to avoid deadlocks in the upcoming test hackery)
|
||||||
CREATE TABLE single_shard_items (id integer, name text);
|
CREATE TABLE single_shard_items (id integer, name text);
|
||||||
SELECT master_create_distributed_table('single_shard_items', 'id', 'hash');
|
SELECT master_create_distributed_table('single_shard_items', 'id', 'hash');
|
||||||
|
@ -264,14 +266,13 @@ COMMIT;
|
||||||
-- Nothing from the block should have committed
|
-- Nothing from the block should have committed
|
||||||
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
|
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
|
||||||
|
|
||||||
-- Now try with 2pc off
|
-- Now try with 2pc off, that'll result in invalid shards
|
||||||
RESET citus.multi_shard_commit_protocol;
|
RESET citus.multi_shard_commit_protocol;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
CREATE INDEX single_index_2 ON single_shard_items(id);
|
CREATE INDEX single_index_2 ON single_shard_items(id);
|
||||||
CREATE INDEX single_index_3 ON single_shard_items(name);
|
CREATE INDEX single_index_3 ON single_shard_items(name);
|
||||||
COMMIT;
|
COMMIT;
|
||||||
|
|
||||||
-- The block should have committed with a warning
|
|
||||||
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
|
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
|
||||||
|
|
||||||
\c - - - :worker_2_port
|
\c - - - :worker_2_port
|
||||||
|
|
|
@ -262,8 +262,8 @@ NOTICE: relation "non_existent_table" does not exist, skipping
|
||||||
ALTER TABLE IF EXISTS lineitem_alter ALTER COLUMN int_column2 SET DATA TYPE INTEGER;
|
ALTER TABLE IF EXISTS lineitem_alter ALTER COLUMN int_column2 SET DATA TYPE INTEGER;
|
||||||
ALTER TABLE lineitem_alter DROP COLUMN non_existent_column;
|
ALTER TABLE lineitem_alter DROP COLUMN non_existent_column;
|
||||||
WARNING: column "non_existent_column" of relation "lineitem_alter_220000" does not exist
|
WARNING: column "non_existent_column" of relation "lineitem_alter_220000" does not exist
|
||||||
CONTEXT: while executing command on localhost:57638
|
CONTEXT: while executing command on localhost:57637
|
||||||
ERROR: could not execute DDL command on worker node shards
|
ERROR: failure on connection marked as essential: localhost:57637
|
||||||
ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS non_existent_column;
|
ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS non_existent_column;
|
||||||
NOTICE: column "non_existent_column" of relation "lineitem_alter" does not exist, skipping
|
NOTICE: column "non_existent_column" of relation "lineitem_alter" does not exist, skipping
|
||||||
ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS int_column2;
|
ALTER TABLE lineitem_alter DROP COLUMN IF EXISTS int_column2;
|
||||||
|
@ -362,16 +362,16 @@ DETAIL: Only ADD|DROP COLUMN, SET|DROP NOT NULL, SET|DROP DEFAULT and TYPE subc
|
||||||
-- types
|
-- types
|
||||||
ALTER TABLE lineitem_alter ADD COLUMN new_column non_existent_type;
|
ALTER TABLE lineitem_alter ADD COLUMN new_column non_existent_type;
|
||||||
WARNING: type "non_existent_type" does not exist
|
WARNING: type "non_existent_type" does not exist
|
||||||
CONTEXT: while executing command on localhost:57638
|
CONTEXT: while executing command on localhost:57637
|
||||||
ERROR: could not execute DDL command on worker node shards
|
ERROR: failure on connection marked as essential: localhost:57637
|
||||||
ALTER TABLE lineitem_alter ALTER COLUMN null_column SET NOT NULL;
|
ALTER TABLE lineitem_alter ALTER COLUMN null_column SET NOT NULL;
|
||||||
WARNING: column "null_column" contains null values
|
WARNING: column "null_column" contains null values
|
||||||
CONTEXT: while executing command on localhost:57638
|
CONTEXT: while executing command on localhost:57637
|
||||||
ERROR: could not execute DDL command on worker node shards
|
ERROR: failure on connection marked as essential: localhost:57637
|
||||||
ALTER TABLE lineitem_alter ALTER COLUMN l_partkey SET DEFAULT 'a';
|
ALTER TABLE lineitem_alter ALTER COLUMN l_partkey SET DEFAULT 'a';
|
||||||
WARNING: invalid input syntax for integer: "a"
|
WARNING: invalid input syntax for integer: "a"
|
||||||
CONTEXT: while executing command on localhost:57638
|
CONTEXT: while executing command on localhost:57637
|
||||||
ERROR: could not execute DDL command on worker node shards
|
ERROR: failure on connection marked as essential: localhost:57637
|
||||||
-- Verify that we error out on statements involving RENAME
|
-- Verify that we error out on statements involving RENAME
|
||||||
ALTER TABLE lineitem_alter RENAME TO lineitem_renamed;
|
ALTER TABLE lineitem_alter RENAME TO lineitem_renamed;
|
||||||
ERROR: renaming distributed tables or their objects is currently unsupported
|
ERROR: renaming distributed tables or their objects is currently unsupported
|
||||||
|
@ -520,7 +520,7 @@ HINT: You can enable two-phase commit for extra safety with: SET citus.multi_sh
|
||||||
ALTER TABLE lineitem_alter ADD COLUMN first integer;
|
ALTER TABLE lineitem_alter ADD COLUMN first integer;
|
||||||
WARNING: column "first" of relation "lineitem_alter_220000" already exists
|
WARNING: column "first" of relation "lineitem_alter_220000" already exists
|
||||||
CONTEXT: while executing command on localhost:57638
|
CONTEXT: while executing command on localhost:57638
|
||||||
ERROR: could not execute DDL command on worker node shards
|
ERROR: failure on connection marked as essential: localhost:57638
|
||||||
COMMIT;
|
COMMIT;
|
||||||
-- Nothing from the block should have committed
|
-- Nothing from the block should have committed
|
||||||
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
|
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
|
||||||
|
@ -528,6 +528,10 @@ SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
|
||||||
-----------+-----------
|
-----------+-----------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
|
SET client_min_messages = 'ERROR';
|
||||||
|
ALTER TABLE lineitem_alter DROP COLUMN first;
|
||||||
|
ERROR: failure on connection marked as essential: localhost:57637
|
||||||
|
RESET client_min_messages;
|
||||||
-- Create single-shard table (to avoid deadlocks in the upcoming test hackery)
|
-- Create single-shard table (to avoid deadlocks in the upcoming test hackery)
|
||||||
CREATE TABLE single_shard_items (id integer, name text);
|
CREATE TABLE single_shard_items (id integer, name text);
|
||||||
SELECT master_create_distributed_table('single_shard_items', 'id', 'hash');
|
SELECT master_create_distributed_table('single_shard_items', 'id', 'hash');
|
||||||
|
@ -565,14 +569,14 @@ COMMIT;
|
||||||
WARNING: duplicate key value violates unique constraint "ddl_commands_command_key"
|
WARNING: duplicate key value violates unique constraint "ddl_commands_command_key"
|
||||||
DETAIL: Key (command)=(CREATE INDEX) already exists.
|
DETAIL: Key (command)=(CREATE INDEX) already exists.
|
||||||
CONTEXT: while executing command on localhost:57638
|
CONTEXT: while executing command on localhost:57638
|
||||||
ERROR: failed to prepare transaction
|
ERROR: failure on connection marked as essential: localhost:57638
|
||||||
-- Nothing from the block should have committed
|
-- Nothing from the block should have committed
|
||||||
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
|
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
|
||||||
indexname | tablename
|
indexname | tablename
|
||||||
-----------+-----------
|
-----------+-----------
|
||||||
(0 rows)
|
(0 rows)
|
||||||
|
|
||||||
-- Now try with 2pc off
|
-- Now try with 2pc off, that'll result in invalid shards
|
||||||
RESET citus.multi_shard_commit_protocol;
|
RESET citus.multi_shard_commit_protocol;
|
||||||
BEGIN;
|
BEGIN;
|
||||||
CREATE INDEX single_index_2 ON single_shard_items(id);
|
CREATE INDEX single_index_2 ON single_shard_items(id);
|
||||||
|
@ -580,8 +584,9 @@ NOTICE: using one-phase commit for distributed DDL commands
|
||||||
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
||||||
CREATE INDEX single_index_3 ON single_shard_items(name);
|
CREATE INDEX single_index_3 ON single_shard_items(name);
|
||||||
COMMIT;
|
COMMIT;
|
||||||
WARNING: failed to commit transaction on localhost:57638
|
WARNING: duplicate key value violates unique constraint "ddl_commands_command_key"
|
||||||
-- The block should have committed with a warning
|
DETAIL: Key (command)=(CREATE INDEX) already exists.
|
||||||
|
CONTEXT: while executing command on localhost:57638
|
||||||
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
|
SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'single_shard_items';
|
||||||
indexname | tablename
|
indexname | tablename
|
||||||
----------------+--------------------
|
----------------+--------------------
|
||||||
|
@ -653,8 +658,8 @@ INSERT INTO test_ab VALUES (2, 11);
|
||||||
CREATE UNIQUE INDEX temp_unique_index_1 ON test_ab(a);
|
CREATE UNIQUE INDEX temp_unique_index_1 ON test_ab(a);
|
||||||
WARNING: could not create unique index "temp_unique_index_1_220022"
|
WARNING: could not create unique index "temp_unique_index_1_220022"
|
||||||
DETAIL: Key (a)=(2) is duplicated.
|
DETAIL: Key (a)=(2) is duplicated.
|
||||||
CONTEXT: while executing command on localhost:57638
|
CONTEXT: while executing command on localhost:57637
|
||||||
ERROR: could not execute DDL command on worker node shards
|
ERROR: failure on connection marked as essential: localhost:57637
|
||||||
SELECT shardid FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard
|
SELECT shardid FROM pg_dist_shard_placement NATURAL JOIN pg_dist_shard
|
||||||
WHERE logicalrelid='test_ab'::regclass AND shardstate=3;
|
WHERE logicalrelid='test_ab'::regclass AND shardstate=3;
|
||||||
shardid
|
shardid
|
||||||
|
|
Loading…
Reference in New Issue