diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 91c8e8903..d41c737a4 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -8,9 +8,6 @@ * COPY ... FROM commands on distributed tables. CitusCopyFrom parses the input * from stdin, a program, or a file, and decides to copy new rows to existing * shards or new shards based on the partition method of the distributed table. - * If copy is run a worker node, CitusCopyFrom calls CopyFromWorkerNode which - * parses the master node copy options and handles communication with the master - * node. * * If this is the first command in the transaction, we open a new connection for * every shard placement. Otherwise we open as many connections as we can to @@ -102,9 +99,6 @@ /* constant used in binary protocol */ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; -/* use a global connection to the master node in order to skip passing it around */ -static MultiConnection *masterConnection = NULL; - /* * Data size threshold to switch over the active placement for a connection. * If this is too low, overhead of starting COPY commands will hurt the @@ -196,19 +190,14 @@ typedef struct ShardConnections /* Local functions forward declarations */ -static void CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag); static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag); static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId); -static char MasterPartitionMethod(RangeVar *relation); -static void RemoveMasterOptions(CopyStmt *copyStatement); static void OpenCopyConnectionsForNewShards(CopyStmt *copyStatement, ShardConnections *shardConnections, bool stopOnFailure, bool useBinaryCopyFormat); static bool BinaryOutputFunctionDefined(Oid typeId); -static List * MasterShardPlacementList(uint64 shardId); -static List * RemoteActiveShardPlacementList(uint64 shardId); static void SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId, List *connectionList); static void SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId, @@ -222,11 +211,7 @@ static void ReportCopyError(MultiConnection *connection, PGresult *result); static uint32 AvailableColumnCount(TupleDesc tupleDescriptor); static int64 StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement, bool useBinaryCopyFormat); -static int64 MasterCreateEmptyShard(char *relationName); static int64 CreateEmptyShard(char *relationName); -static int64 RemoteCreateEmptyShard(char *relationName); -static void MasterUpdateShardStatistics(uint64 shardId); -static void RemoteUpdateShardStatistics(uint64 shardId); static Oid TypeForColumnName(Oid relationId, TupleDesc tupleDescriptor, char *columnName); static Oid * TypeArrayFromTupleDescriptor(TupleDesc tupleDescriptor); @@ -236,11 +221,8 @@ static CopyCoercionData * ColumnCoercionPaths(TupleDesc destTupleDescriptor, Oid *finalColumnTypeArray); static FmgrInfo * TypeOutputFunctions(uint32 columnCount, Oid *typeIdArray, bool binaryFormat); -static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort); static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist); static bool CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName); -static bool IsCopyFromWorker(CopyStmt *copyStatement); -static NodeAddress * MasterNodeAddress(CopyStmt *copyStatement); static void CitusCopyFrom(CopyStmt 
*copyStatement, char *completionTag); static HTAB * CreateConnectionStateHash(MemoryContext memoryContext); static HTAB * CreateShardStateHash(MemoryContext memoryContext); @@ -321,106 +303,31 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) } } - masterConnection = NULL; /* reset, might still be set after error */ - bool isCopyFromWorker = IsCopyFromWorker(copyStatement); - if (isCopyFromWorker) + Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); + char partitionMethod = PartitionMethod(relationId); + + /* disallow modifications to a partition table which have rep. factor > 1 */ + EnsurePartitionTableNotReplicated(relationId); + + if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE || + partitionMethod == DISTRIBUTE_BY_NONE) { - CopyFromWorkerNode(copyStatement, completionTag); + CopyToExistingShards(copyStatement, completionTag); + } + else if (partitionMethod == DISTRIBUTE_BY_APPEND) + { + CopyToNewShards(copyStatement, completionTag, relationId); } else { - Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); - char partitionMethod = PartitionMethod(relationId); - - /* disallow modifications to a partition table which have rep. factor > 1 */ - EnsurePartitionTableNotReplicated(relationId); - - if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == - DISTRIBUTE_BY_RANGE || partitionMethod == DISTRIBUTE_BY_NONE) - { - CopyToExistingShards(copyStatement, completionTag); - } - else if (partitionMethod == DISTRIBUTE_BY_APPEND) - { - CopyToNewShards(copyStatement, completionTag, relationId); - } - else - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("unsupported partition method"))); - } + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("unsupported partition method"))); } XactModificationLevel = XACT_MODIFICATION_DATA; } -/* - * IsCopyFromWorker checks if the given copy statement has the master host option. - */ -static bool -IsCopyFromWorker(CopyStmt *copyStatement) -{ - ListCell *optionCell = NULL; - foreach(optionCell, copyStatement->options) - { - DefElem *defel = (DefElem *) lfirst(optionCell); - if (strncmp(defel->defname, "master_host", NAMEDATALEN) == 0) - { - return true; - } - } - - return false; -} - - -/* - * CopyFromWorkerNode implements the COPY table_name FROM ... from worker nodes - * for append-partitioned tables. 
- */ -static void -CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag) -{ - NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement); - char *nodeName = masterNodeAddress->nodeName; - int32 nodePort = masterNodeAddress->nodePort; - uint32 connectionFlags = FOR_DML; - - masterConnection = GetNodeConnection(connectionFlags, nodeName, nodePort); - MarkRemoteTransactionCritical(masterConnection); - ClaimConnectionExclusively(masterConnection); - - RemoteTransactionBeginIfNecessary(masterConnection); - - /* strip schema name for local reference */ - char *schemaName = copyStatement->relation->schemaname; - copyStatement->relation->schemaname = NULL; - - Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); - - /* put schema name back */ - copyStatement->relation->schemaname = schemaName; - char partitionMethod = MasterPartitionMethod(copyStatement->relation); - if (partitionMethod != DISTRIBUTE_BY_APPEND) - { - ereport(ERROR, (errmsg("copy from worker nodes is only supported " - "for append-partitioned tables"))); - } - - /* - * Remove master node options from the copy statement because they are not - * recognized by PostgreSQL machinery. - */ - RemoveMasterOptions(copyStatement); - - CopyToNewShards(copyStatement, completionTag, relationId); - - UnclaimConnection(masterConnection); - masterConnection = NULL; -} - - /* * CopyToExistingShards implements the COPY table_name FROM ... for hash or * range-partitioned tables where there are already shards into which to copy @@ -737,7 +644,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) } EndRemoteCopy(currentShardId, shardConnections->connectionList); - MasterUpdateShardStatistics(shardConnections->shardId); + UpdateShardStatistics(shardConnections->shardId); copiedDataSizeInBytes = 0; currentShardId = INVALID_SHARD_ID; @@ -761,7 +668,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) shardConnections->connectionList); } EndRemoteCopy(currentShardId, shardConnections->connectionList); - MasterUpdateShardStatistics(shardConnections->shardId); + UpdateShardStatistics(shardConnections->shardId); } EndCopyFrom(copyState); @@ -778,119 +685,6 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) } -/* - * MasterNodeAddress gets the master node address from copy options and returns - * it. Note that if the master_port is not provided, we use 5432 as the default - * port. - */ -static NodeAddress * -MasterNodeAddress(CopyStmt *copyStatement) -{ - NodeAddress *masterNodeAddress = (NodeAddress *) palloc0(sizeof(NodeAddress)); - char *nodeName = NULL; - - /* set default port to 5432 */ - int32 nodePort = 5432; - - ListCell *optionCell = NULL; - foreach(optionCell, copyStatement->options) - { - DefElem *defel = (DefElem *) lfirst(optionCell); - if (strncmp(defel->defname, "master_host", NAMEDATALEN) == 0) - { - nodeName = defGetString(defel); - } - else if (strncmp(defel->defname, "master_port", NAMEDATALEN) == 0) - { - nodePort = defGetInt32(defel); - } - } - - masterNodeAddress->nodeName = nodeName; - masterNodeAddress->nodePort = nodePort; - - return masterNodeAddress; -} - - -/* - * MasterPartitionMethod gets the partition method of the given relation from - * the master node and returns it. 
- */ -static char -MasterPartitionMethod(RangeVar *relation) -{ - char partitionMethod = '\0'; - bool raiseInterrupts = true; - - char *relationName = relation->relname; - char *schemaName = relation->schemaname; - char *qualifiedName = quote_qualified_identifier(schemaName, relationName); - - StringInfo partitionMethodCommand = makeStringInfo(); - appendStringInfo(partitionMethodCommand, PARTITION_METHOD_QUERY, qualifiedName); - - if (!SendRemoteCommand(masterConnection, partitionMethodCommand->data)) - { - ReportConnectionError(masterConnection, ERROR); - } - PGresult *queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); - if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) - { - char *partitionMethodString = PQgetvalue((PGresult *) queryResult, 0, 0); - if (partitionMethodString == NULL || (*partitionMethodString) == '\0') - { - ereport(ERROR, (errmsg("could not find a partition method for the " - "table %s", relationName))); - } - - partitionMethod = partitionMethodString[0]; - } - else - { - ReportResultError(masterConnection, queryResult, WARNING); - ereport(ERROR, (errmsg("could not get the partition method of the " - "distributed table"))); - } - - PQclear(queryResult); - - queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); - Assert(!queryResult); - - return partitionMethod; -} - - -/* - * RemoveMasterOptions removes master node related copy options from the option - * list of the copy statement. - */ -static void -RemoveMasterOptions(CopyStmt *copyStatement) -{ - List *newOptionList = NIL; - ListCell *optionCell = NULL; - - /* walk over the list of all options */ - foreach(optionCell, copyStatement->options) - { - DefElem *option = (DefElem *) lfirst(optionCell); - - /* skip master related options */ - if ((strncmp(option->defname, "master_host", NAMEDATALEN) == 0) || - (strncmp(option->defname, "master_port", NAMEDATALEN) == 0)) - { - continue; - } - - newOptionList = lappend(newOptionList, option); - } - - copyStatement->options = newOptionList; -} - - /* * OpenCopyConnectionsForNewShards opens a connection for each placement of a shard and * starts a COPY transaction if necessary. If a connection cannot be opened, @@ -918,7 +712,7 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement, /* release active placement list at the end of this function */ MemoryContext oldContext = MemoryContextSwitchTo(localContext); - List *activePlacementList = MasterShardPlacementList(shardId); + List *activePlacementList = ActiveShardPlacementList(shardId); MemoryContextSwitchTo(oldContext); @@ -988,8 +782,7 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement, /* * If stopOnFailure is true, we just error out and code execution should - * never reach to this point. This is the case for reference tables and - * copy from worker nodes. + * never reach to this point. This is the case for reference tables. */ Assert(!stopOnFailure || failedPlacementCount == 0); @@ -1096,87 +889,6 @@ BinaryOutputFunctionDefined(Oid typeId) } -/* - * MasterShardPlacementList dispatches the active shard placements call - * between local or remote master node according to the master connection state. 
- */ -static List * -MasterShardPlacementList(uint64 shardId) -{ - List *activePlacementList = NIL; - if (masterConnection == NULL) - { - activePlacementList = ActiveShardPlacementList(shardId); - } - else - { - activePlacementList = RemoteActiveShardPlacementList(shardId); - } - - return activePlacementList; -} - - -/* - * RemoteActiveShardPlacementList gets the active shard placement list - * for the given shard id from the remote master node. - */ -static List * -RemoteActiveShardPlacementList(uint64 shardId) -{ - List *activePlacementList = NIL; - bool raiseInterrupts = true; - - StringInfo shardPlacementsCommand = makeStringInfo(); - appendStringInfo(shardPlacementsCommand, ACTIVE_SHARD_PLACEMENTS_QUERY, shardId); - - if (!SendRemoteCommand(masterConnection, shardPlacementsCommand->data)) - { - ReportConnectionError(masterConnection, ERROR); - } - PGresult *queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); - if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) - { - int rowCount = PQntuples(queryResult); - - for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) - { - char *placementIdString = PQgetvalue(queryResult, rowIndex, 0); - char *nodeName = pstrdup(PQgetvalue(queryResult, rowIndex, 1)); - char *nodePortString = pstrdup(PQgetvalue(queryResult, rowIndex, 2)); - uint32 nodePort = atoi(nodePortString); - uint64 placementId = atoll(placementIdString); - - ShardPlacement *shardPlacement = - (ShardPlacement *) palloc0(sizeof(ShardPlacement)); - - shardPlacement->placementId = placementId; - shardPlacement->nodeName = nodeName; - shardPlacement->nodePort = nodePort; - - /* - * We cannot know the nodeId, but it is not necessary at this point either. - * This is only used to to look up the connection for a group of co-located - * placements, but append-distributed tables are never co-located. - */ - shardPlacement->nodeId = -1; - - activePlacementList = lappend(activePlacementList, shardPlacement); - } - } - else - { - ereport(ERROR, (errmsg("could not get shard placements from the master node"))); - } - - PQclear(queryResult); - queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); - Assert(!queryResult); - - return activePlacementList; -} - - /* Send copy binary headers to given connections */ static void SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId, List *connectionList) @@ -1838,7 +1550,7 @@ StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement, char *relationName = copyStatement->relation->relname; char *schemaName = copyStatement->relation->schemaname; char *qualifiedName = quote_qualified_identifier(schemaName, relationName); - int64 shardId = MasterCreateEmptyShard(qualifiedName); + int64 shardId = CreateEmptyShard(qualifiedName); bool stopOnFailure = true; shardConnections->shardId = shardId; @@ -1853,27 +1565,6 @@ StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement, } -/* - * MasterCreateEmptyShard dispatches the create empty shard call between local or - * remote master node according to the master connection state. - */ -static int64 -MasterCreateEmptyShard(char *relationName) -{ - int64 shardId = 0; - if (masterConnection == NULL) - { - shardId = CreateEmptyShard(relationName); - } - else - { - shardId = RemoteCreateEmptyShard(relationName); - } - - return shardId; -} - - /* * CreateEmptyShard creates a new shard and related shard placements from the * local master node. 
@@ -1891,90 +1582,6 @@ CreateEmptyShard(char *relationName) } -/* - * RemoteCreateEmptyShard creates a new shard and related shard placements from - * the remote master node. - */ -static int64 -RemoteCreateEmptyShard(char *relationName) -{ - int64 shardId = 0; - bool raiseInterrupts = true; - - StringInfo createEmptyShardCommand = makeStringInfo(); - appendStringInfo(createEmptyShardCommand, CREATE_EMPTY_SHARD_QUERY, relationName); - - if (!SendRemoteCommand(masterConnection, createEmptyShardCommand->data)) - { - ReportConnectionError(masterConnection, ERROR); - } - PGresult *queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); - if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) - { - char *shardIdString = PQgetvalue((PGresult *) queryResult, 0, 0); - char *shardIdStringEnd = NULL; - shardId = strtoul(shardIdString, &shardIdStringEnd, 0); - } - else - { - ReportResultError(masterConnection, queryResult, WARNING); - ereport(ERROR, (errmsg("could not create a new empty shard on the remote node"))); - } - - PQclear(queryResult); - queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); - Assert(!queryResult); - - return shardId; -} - - -/* - * MasterUpdateShardStatistics dispatches the update shard statistics call - * between local or remote master node according to the master connection state. - */ -static void -MasterUpdateShardStatistics(uint64 shardId) -{ - if (masterConnection == NULL) - { - UpdateShardStatistics(shardId); - } - else - { - RemoteUpdateShardStatistics(shardId); - } -} - - -/* - * RemoteUpdateShardStatistics updates shard statistics on the remote master node. - */ -static void -RemoteUpdateShardStatistics(uint64 shardId) -{ - bool raiseInterrupts = true; - - StringInfo updateShardStatisticsCommand = makeStringInfo(); - appendStringInfo(updateShardStatisticsCommand, UPDATE_SHARD_STATISTICS_QUERY, - shardId); - - if (!SendRemoteCommand(masterConnection, updateShardStatisticsCommand->data)) - { - ReportConnectionError(masterConnection, ERROR); - } - PGresult *queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); - if (PQresultStatus(queryResult) != PGRES_TUPLES_OK) - { - ereport(ERROR, (errmsg("could not update shard statistics"))); - } - - PQclear(queryResult); - queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts); - Assert(!queryResult); -} - - /* *INDENT-OFF* */ /* Append data to the copy buffer in outputState */ static void @@ -2754,46 +2361,25 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS */ if (copyStatement->relation != NULL) { - bool isDistributedRelation = false; - bool isCopyFromWorker = IsCopyFromWorker(copyStatement); + bool isFrom = copyStatement->is_from; - if (isCopyFromWorker) - { - RangeVar *relation = copyStatement->relation; - NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement); - char *nodeName = masterNodeAddress->nodeName; - int32 nodePort = masterNodeAddress->nodePort; + /* consider using RangeVarGetRelidExtended to check perms before locking */ + Relation copiedRelation = heap_openrv(copyStatement->relation, + isFrom ? RowExclusiveLock : + AccessShareLock); - CreateLocalTable(relation, nodeName, nodePort); + bool isDistributedRelation = IsDistributedTable(RelationGetRelid(copiedRelation)); - /* - * We expect copy from worker to be on a distributed table; otherwise, - * it fails in CitusCopyFrom() while checking the partition method. 
- */ - isDistributedRelation = true; - } - else - { - bool isFrom = copyStatement->is_from; + /* ensure future lookups hit the same relation */ + char *schemaName = get_namespace_name(RelationGetNamespace(copiedRelation)); - /* consider using RangeVarGetRelidExtended to check perms before locking */ - Relation copiedRelation = heap_openrv(copyStatement->relation, - isFrom ? RowExclusiveLock : - AccessShareLock); + /* ensure we copy string into proper context */ + MemoryContext relationContext = GetMemoryChunkContext( + copyStatement->relation); + schemaName = MemoryContextStrdup(relationContext, schemaName); + copyStatement->relation->schemaname = schemaName; - isDistributedRelation = IsDistributedTable(RelationGetRelid(copiedRelation)); - - /* ensure future lookups hit the same relation */ - char *schemaName = get_namespace_name(RelationGetNamespace(copiedRelation)); - - /* ensure we copy string into proper context */ - MemoryContext relationContext = GetMemoryChunkContext( - copyStatement->relation); - schemaName = MemoryContextStrdup(relationContext, schemaName); - copyStatement->relation->schemaname = schemaName; - - heap_close(copiedRelation, NoLock); - } + heap_close(copiedRelation, NoLock); if (isDistributedRelation) { @@ -2808,11 +2394,7 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS #endif /* check permissions, we're bypassing postgres' normal checks */ - if (!isCopyFromWorker) - { - CheckCopyPermissions(copyStatement); - } - + CheckCopyPermissions(copyStatement); CitusCopyFrom(copyStatement, completionTag); return NULL; } @@ -2898,85 +2480,6 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS } -/* - * CreateLocalTable gets DDL commands from the remote node for the given - * relation. Then, it creates the local relation as temporary and on commit drop. - */ -static void -CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort) -{ - ListCell *ddlCommandCell = NULL; - - char *relationName = relation->relname; - char *schemaName = relation->schemaname; - char *qualifiedRelationName = quote_qualified_identifier(schemaName, relationName); - - /* - * The warning message created in TableDDLCommandList() is descriptive - * enough; therefore, we just throw an error which says that we could not - * run the copy operation. 
- */ - List *ddlCommandList = TableDDLCommandList(nodeName, nodePort, qualifiedRelationName); - if (ddlCommandList == NIL) - { - ereport(ERROR, (errmsg("could not run copy from the worker node"))); - } - - /* apply DDL commands against the local database */ - foreach(ddlCommandCell, ddlCommandList) - { - StringInfo ddlCommand = (StringInfo) lfirst(ddlCommandCell); - Node *ddlCommandNode = ParseTreeNode(ddlCommand->data); - bool applyDDLCommand = false; - - if (IsA(ddlCommandNode, CreateStmt) || - IsA(ddlCommandNode, CreateForeignTableStmt)) - { - CreateStmt *createStatement = (CreateStmt *) ddlCommandNode; - - /* create the local relation as temporary and on commit drop */ - createStatement->relation->relpersistence = RELPERSISTENCE_TEMP; - createStatement->oncommit = ONCOMMIT_DROP; - - /* temporarily strip schema name */ - createStatement->relation->schemaname = NULL; - - applyDDLCommand = true; - } - else if (IsA(ddlCommandNode, CreateForeignServerStmt)) - { - CreateForeignServerStmt *createServerStmt = - (CreateForeignServerStmt *) ddlCommandNode; - if (GetForeignServerByName(createServerStmt->servername, true) == NULL) - { - /* create server if not exists */ - applyDDLCommand = true; - } - } - else if ((IsA(ddlCommandNode, CreateExtensionStmt))) - { - applyDDLCommand = true; - } - else if ((IsA(ddlCommandNode, CreateSeqStmt))) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot copy to table with serial column from worker"), - errhint("Connect to the master node to COPY to tables which " - "use serial column types."))); - } - - /* run only a selected set of DDL commands */ - if (applyDDLCommand) - { - CitusProcessUtility(ddlCommandNode, CreateCommandTag(ddlCommandNode), - PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); - - CommandCounterIncrement(); - } - } -} - - /* * Check whether the current user has the permission to execute a COPY * statement, raise ERROR if not. In some cases we have to do this separately @@ -3251,7 +2754,7 @@ InitializeCopyShardState(CopyShardState *shardState, /* release active placement list at the end of this function */ MemoryContext oldContext = MemoryContextSwitchTo(localContext); - List *activePlacementList = MasterShardPlacementList(shardId); + List *activePlacementList = ActiveShardPlacementList(shardId); MemoryContextSwitchTo(oldContext); @@ -3307,8 +2810,7 @@ InitializeCopyShardState(CopyShardState *shardState, /* * If stopOnFailure is true, we just error out and code execution should - * never reach to this point. This is the case for reference tables and - * copy from worker nodes. + * never reach to this point. This is the case for reference tables. 
*/ Assert(!stopOnFailure || failedPlacementCount == 0); diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 30f703039..e88e27e47 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -70,13 +70,6 @@ #define DROP_REGULAR_TABLE_COMMAND "DROP TABLE IF EXISTS %s CASCADE" #define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s CASCADE" #define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s AUTHORIZATION %s" -#define CREATE_EMPTY_SHARD_QUERY "SELECT master_create_empty_shard('%s')" -#define ACTIVE_SHARD_PLACEMENTS_QUERY \ - "SELECT placementid, nodename, nodeport FROM pg_dist_shard_placement WHERE shardstate = 1 AND shardid = " \ - INT64_FORMAT -#define UPDATE_SHARD_STATISTICS_QUERY \ - "SELECT master_update_shard_statistics(" INT64_FORMAT ")" -#define PARTITION_METHOD_QUERY "SELECT part_method FROM master_get_table_metadata('%s');" /* Enumeration that defines the shard placement policy to use while staging */ typedef enum diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source index 432dfc87f..16b7a0e1b 100644 --- a/src/test/regress/input/multi_copy.source +++ b/src/test/regress/input/multi_copy.source @@ -318,56 +318,6 @@ ORDER BY LIMIT 5; --- Ensure that copy from worker node of table with serial column fails -CREATE TABLE customer_worker_copy_append_seq (id integer, seq serial); -SELECT master_create_distributed_table('customer_worker_copy_append_seq', 'id', 'append'); - --- Connect to the first worker node -\c - - - 57637 - --- Test copy from the worker node -COPY customer_worker_copy_append_seq FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); - --- Connect back to the master node -\c - - - 57636 - --- Create customer table for the worker copy with constraint and index -CREATE TABLE customer_worker_copy_append ( - c_custkey integer , - c_name varchar(25) not null, - c_address varchar(40), - c_nationkey integer, - c_phone char(15), - c_acctbal decimal(15,2), - c_mktsegment char(10), - c_comment varchar(117), - primary key (c_custkey)); - -CREATE INDEX ON customer_worker_copy_append (c_name); - -SELECT master_create_distributed_table('customer_worker_copy_append', 'c_custkey', 'append'); - --- Connect to the first worker node -\c - - - 57637 - --- Test copy from the worker node -COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); - --- Make sure we don't use 2PC when connecting to master, even if requested -BEGIN; -SET LOCAL citus.multi_shard_commit_protocol TO '2pc'; -COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636); -COMMIT; - --- Test if there is no relation to copy data with the worker copy -COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); - --- Connect back to the master node -\c - - - 57636 - --- Test the content of the table -SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM customer_worker_copy_append; - -- Test schema support on append partitioned tables CREATE SCHEMA append; CREATE TABLE append.customer_copy ( @@ -384,14 +334,7 @@ SELECT master_create_distributed_table('append.customer_copy', 'c_custkey', 'app -- Test copy from the master node COPY append.customer_copy FROM 
'@abs_srcdir@/data/customer.1.data' with (delimiter '|'); - --- Test copy from the worker node -\c - - - 57637 - -COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636); - --- Connect back to the master node -\c - - - 57636 +COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|'); -- Test the content of the table SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM append.customer_copy; diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index ac3405042..99b3aa124 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -32,7 +32,7 @@ SELECT master_create_worker_shards('customer_copy_hash', 64, 1); -- Test empty copy COPY customer_copy_hash FROM STDIN; --- Test syntax error +-- Test syntax error COPY customer_copy_hash (c_custkey,c_name) FROM STDIN; ERROR: invalid input syntax for integer: "1,customer1" CONTEXT: COPY customer_copy_hash, line 1, column c_custkey: "1,customer1" @@ -426,65 +426,6 @@ LIMIT 560137 | 57637 (5 rows) --- Ensure that copy from worker node of table with serial column fails -CREATE TABLE customer_worker_copy_append_seq (id integer, seq serial); -SELECT master_create_distributed_table('customer_worker_copy_append_seq', 'id', 'append'); - master_create_distributed_table ---------------------------------- - -(1 row) - --- Connect to the first worker node -\c - - - 57637 --- Test copy from the worker node -COPY customer_worker_copy_append_seq FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); -ERROR: relation "public.customer_worker_copy_append_seq_seq_seq" does not exist --- Connect back to the master node -\c - - - 57636 --- Create customer table for the worker copy with constraint and index -CREATE TABLE customer_worker_copy_append ( - c_custkey integer , - c_name varchar(25) not null, - c_address varchar(40), - c_nationkey integer, - c_phone char(15), - c_acctbal decimal(15,2), - c_mktsegment char(10), - c_comment varchar(117), - primary key (c_custkey)); -CREATE INDEX ON customer_worker_copy_append (c_name); -SELECT master_create_distributed_table('customer_worker_copy_append', 'c_custkey', 'append'); -WARNING: table "customer_worker_copy_append" has a UNIQUE or EXCLUDE constraint -DETAIL: UNIQUE constraints, EXCLUDE constraints, and PRIMARY KEYs on append-partitioned tables cannot be enforced. -HINT: Consider using hash partitioning. 
- master_create_distributed_table ---------------------------------- - -(1 row) - --- Connect to the first worker node -\c - - - 57637 --- Test copy from the worker node -COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); --- Make sure we don't use 2PC when connecting to master, even if requested -BEGIN; -SET LOCAL citus.multi_shard_commit_protocol TO '2pc'; -COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636); -COMMIT; --- Test if there is no relation to copy data with the worker copy -COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); -WARNING: relation "lineitem_copy_none" does not exist -CONTEXT: while executing command on localhost:57636 -ERROR: could not run copy from the worker node --- Connect back to the master node -\c - - - 57636 --- Test the content of the table -SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM customer_worker_copy_append; - min | max | avg | count ------+------+-----------------------+------- - 1 | 7000 | 4443.8028800000000000 | 2000 -(1 row) - -- Test schema support on append partitioned tables CREATE SCHEMA append; CREATE TABLE append.customer_copy ( @@ -504,11 +445,7 @@ SELECT master_create_distributed_table('append.customer_copy', 'c_custkey', 'app -- Test copy from the master node COPY append.customer_copy FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|'); --- Test copy from the worker node -\c - - - 57637 -COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636); --- Connect back to the master node -\c - - - 57636 +COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|'); -- Test the content of the table SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM append.customer_copy; min | max | avg | count @@ -679,13 +616,10 @@ SELECT master_create_distributed_table('composite_partition_column_table', 'comp \COPY composite_partition_column_table FROM STDIN WITH (FORMAT 'csv'); WARNING: function min(number_pack) does not exist HINT: No function matches the given name and argument types. You might need to add explicit type casts. -CONTEXT: while executing command on localhost:57637 -WARNING: function min(number_pack) does not exist -HINT: No function matches the given name and argument types. You might need to add explicit type casts. 
CONTEXT: while executing command on localhost:57638 -WARNING: could not get statistics for shard public.composite_partition_column_table_560164 +WARNING: could not get statistics for shard public.composite_partition_column_table_560162 DETAIL: Setting shard statistics to NULL -ERROR: failure on connection marked as essential: localhost:57637 +ERROR: failure on connection marked as essential: localhost:57638 -- Test copy on append distributed tables do not create shards on removed workers CREATE TABLE numbers_append (a int, b int); SELECT master_create_distributed_table('numbers_append', 'a', 'append'); @@ -696,7 +630,7 @@ SELECT master_create_distributed_table('numbers_append', 'a', 'append'); -- no shards is created yet SELECT shardid, nodename, nodeport - FROM pg_dist_shard_placement join pg_dist_shard using(shardid) + FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_append'::regclass order by placementid; shardid | nodename | nodeport ---------+----------+---------- @@ -706,15 +640,13 @@ COPY numbers_append FROM STDIN WITH (FORMAT 'csv'); COPY numbers_append FROM STDIN WITH (FORMAT 'csv'); -- verify there are shards at both workers SELECT shardid, nodename, nodeport - FROM pg_dist_shard_placement join pg_dist_shard using(shardid) + FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_append'::regclass order by placementid; shardid | nodename | nodeport ---------+-----------+---------- - 560165 | localhost | 57637 - 560165 | localhost | 57638 - 560166 | localhost | 57638 - 560166 | localhost | 57637 -(4 rows) + 560163 | localhost | 57637 + 560164 | localhost | 57638 +(2 rows) -- disable the first node SELECT master_disable_node('localhost', :worker_1_port); @@ -725,23 +657,21 @@ NOTICE: Node localhost:57637 has active shard placements. 
Some queries may fail (1 row) -- set replication factor to 1 so that copy will --- succeed without replication count error +-- succeed without replication count error SET citus.shard_replication_factor TO 1; -- add two new shards and verify they are created at the other node COPY numbers_append FROM STDIN WITH (FORMAT 'csv'); COPY numbers_append FROM STDIN WITH (FORMAT 'csv'); SELECT shardid, nodename, nodeport - FROM pg_dist_shard_placement join pg_dist_shard using(shardid) + FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_append'::regclass order by placementid; shardid | nodename | nodeport ---------+-----------+---------- - 560165 | localhost | 57637 + 560163 | localhost | 57637 + 560164 | localhost | 57638 560165 | localhost | 57638 560166 | localhost | 57638 - 560166 | localhost | 57637 - 560167 | localhost | 57638 - 560168 | localhost | 57638 -(6 rows) +(4 rows) -- add the node back SELECT 1 FROM master_activate_node('localhost', :worker_1_port); @@ -762,21 +692,19 @@ RESET citus.shard_replication_factor; COPY numbers_append FROM STDIN WITH (FORMAT 'csv'); COPY numbers_append FROM STDIN WITH (FORMAT 'csv'); SELECT shardid, nodename, nodeport - FROM pg_dist_shard_placement join pg_dist_shard using(shardid) + FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_append'::regclass order by placementid; shardid | nodename | nodeport ---------+-----------+---------- - 560165 | localhost | 57637 + 560163 | localhost | 57637 + 560164 | localhost | 57638 560165 | localhost | 57638 560166 | localhost | 57638 - 560166 | localhost | 57637 + 560167 | localhost | 57637 560167 | localhost | 57638 560168 | localhost | 57638 - 560169 | localhost | 57637 - 560169 | localhost | 57638 - 560170 | localhost | 57638 - 560170 | localhost | 57637 -(10 rows) + 560168 | localhost | 57637 +(8 rows) DROP TABLE numbers_append; -- Test copy failures against connection failures @@ -803,18 +731,18 @@ SELECT create_distributed_table('numbers_hash', 'a'); COPY numbers_hash FROM STDIN WITH (FORMAT 'csv'); -- verify each placement is active SELECT shardid, shardstate, nodename, nodeport - FROM pg_dist_shard_placement join pg_dist_shard using(shardid) + FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport; shardid | shardstate | nodename | nodeport ---------+------------+-----------+---------- + 560169 | 1 | localhost | 57637 + 560169 | 1 | localhost | 57638 + 560170 | 1 | localhost | 57637 + 560170 | 1 | localhost | 57638 560171 | 1 | localhost | 57637 560171 | 1 | localhost | 57638 560172 | 1 | localhost | 57637 560172 | 1 | localhost | 57638 - 560173 | 1 | localhost | 57637 - 560173 | 1 | localhost | 57638 - 560174 | 1 | localhost | 57637 - 560174 | 1 | localhost | 57638 (8 rows) -- create a reference table @@ -835,18 +763,18 @@ SELECT create_distributed_table('numbers_hash_other', 'a'); (1 row) SELECT shardid, shardstate, nodename, nodeport - FROM pg_dist_shard_placement join pg_dist_shard using(shardid) + FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_hash_other'::regclass order by shardid, nodeport; shardid | shardstate | nodename | nodeport ---------+------------+-----------+---------- + 560174 | 1 | localhost | 57637 + 560174 | 1 | localhost | 57638 + 560175 | 1 | localhost | 57637 + 560175 | 1 | localhost | 57638 560176 | 1 | localhost | 57637 560176 | 1 | localhost | 57638 560177 | 1 | localhost | 
57637 560177 | 1 | localhost | 57638 - 560178 | 1 | localhost | 57637 - 560178 | 1 | localhost | 57638 - 560179 | 1 | localhost | 57637 - 560179 | 1 | localhost | 57638 (8 rows) -- manually corrupt pg_dist_shard such that both copies of one shard is placed in @@ -874,18 +802,18 @@ DETAIL: FATAL: role "test_user" is not permitted to log in CONTEXT: COPY numbers_hash, line 6: "6,6" -- verify shards in the first worker as marked invalid SELECT shardid, shardstate, nodename, nodeport - FROM pg_dist_shard_placement join pg_dist_shard using(shardid) + FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport; shardid | shardstate | nodename | nodeport ---------+------------+-----------+---------- + 560169 | 3 | localhost | 57637 + 560169 | 1 | localhost | 57638 + 560170 | 3 | localhost | 57637 + 560170 | 1 | localhost | 57638 560171 | 3 | localhost | 57637 560171 | 1 | localhost | 57638 560172 | 3 | localhost | 57637 560172 | 1 | localhost | 57638 - 560173 | 3 | localhost | 57637 - 560173 | 1 | localhost | 57638 - 560174 | 3 | localhost | 57637 - 560174 | 1 | localhost | 57638 (8 rows) -- try to insert into a reference table copy should fail @@ -895,12 +823,12 @@ DETAIL: FATAL: role "test_user" is not permitted to log in CONTEXT: COPY numbers_reference, line 1: "3,1" -- verify shards for reference table are still valid SELECT shardid, shardstate, nodename, nodeport - FROM pg_dist_shard_placement join pg_dist_shard using(shardid) + FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_reference'::regclass order by placementid; shardid | shardstate | nodename | nodeport ---------+------------+-----------+---------- - 560175 | 1 | localhost | 57637 - 560175 | 1 | localhost | 57638 + 560173 | 1 | localhost | 57637 + 560173 | 1 | localhost | 57638 (2 rows) -- try to insert into numbers_hash_other. 
copy should fail and rollback @@ -912,25 +840,25 @@ DETAIL: FATAL: role "test_user" is not permitted to log in CONTEXT: COPY numbers_hash_other, line 1: "1,1" WARNING: connection error: localhost:57637 DETAIL: FATAL: role "test_user" is not permitted to log in -CONTEXT: COPY numbers_hash_other, line 1: "1,1" -ERROR: connection error: localhost:57637 +CONTEXT: COPY numbers_hash_other, line 2: "2,2" +WARNING: connection error: localhost:57637 DETAIL: FATAL: role "test_user" is not permitted to log in -CONTEXT: COPY numbers_hash_other, line 1: "1,1" +CONTEXT: COPY numbers_hash_other, line 3: "3,3" -- verify shards for numbers_hash_other are still valid -- since copy has failed altogether SELECT shardid, shardstate, nodename, nodeport - FROM pg_dist_shard_placement join pg_dist_shard using(shardid) + FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_hash_other'::regclass order by shardid, nodeport; shardid | shardstate | nodename | nodeport ---------+------------+-----------+---------- + 560174 | 3 | localhost | 57637 + 560174 | 1 | localhost | 57638 + 560175 | 3 | localhost | 57637 + 560175 | 1 | localhost | 57638 560176 | 1 | localhost | 57637 560176 | 1 | localhost | 57637 - 560177 | 1 | localhost | 57637 + 560177 | 3 | localhost | 57637 560177 | 1 | localhost | 57638 - 560178 | 1 | localhost | 57637 - 560178 | 1 | localhost | 57638 - 560179 | 1 | localhost | 57637 - 560179 | 1 | localhost | 57638 (8 rows) -- re-enable test_user on the first worker @@ -961,7 +889,7 @@ ALTER TABLE numbers_hash_560180 DROP COLUMN b; COPY numbers_hash FROM STDIN WITH (FORMAT 'csv'); ERROR: column "b" of relation "numbers_hash_560180" does not exist CONTEXT: while executing command on localhost:57637 -COPY numbers_hash, line 1: "1,1" +COPY numbers_hash, line 6: "6,6" -- verify no row is inserted SELECT count(a) FROM numbers_hash; count @@ -971,18 +899,18 @@ SELECT count(a) FROM numbers_hash; -- verify shard is still marked as valid SELECT shardid, shardstate, nodename, nodeport - FROM pg_dist_shard_placement join pg_dist_shard using(shardid) + FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport; shardid | shardstate | nodename | nodeport ---------+------------+-----------+---------- + 560178 | 1 | localhost | 57637 + 560178 | 1 | localhost | 57638 + 560179 | 1 | localhost | 57637 + 560179 | 1 | localhost | 57638 560180 | 1 | localhost | 57637 560180 | 1 | localhost | 57638 560181 | 1 | localhost | 57637 560181 | 1 | localhost | 57638 - 560182 | 1 | localhost | 57637 - 560182 | 1 | localhost | 57638 - 560183 | 1 | localhost | 57637 - 560183 | 1 | localhost | 57638 (8 rows) DROP TABLE numbers_hash;