Remove COPY from worker for append-partitioned tables

pull/3261/head
Marco Slot 2019-12-01 23:17:37 +01:00 committed by Hadi Moshayedi
parent 5ec644c691
commit 90056f7d3c
4 changed files with 91 additions and 725 deletions
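Before this change, COPY into an append-distributed table could be started on a worker node: the statement carried master_host and master_port options, and the worker created a temporary local copy of the table, asked the master for the partition method, and created shards and updated shard statistics over a dedicated master connection. This commit removes that path, so COPY always runs on the master (coordinator) node. Roughly, using the table name, data file, and ports that appear in the regression tests further down:

    -- Removed by this commit: COPY started on a worker (port 57637), reaching
    -- back to the master on port 57636 through the master_host/master_port options.
    \c - - - 57637
    COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data'
        WITH (delimiter '|', master_host 'localhost', master_port 57636);

    -- Still supported: the same COPY run on the master node.
    \c - - - 57636
    COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data'
        WITH (delimiter '|');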


@ -8,9 +8,6 @@
* COPY ... FROM commands on distributed tables. CitusCopyFrom parses the input
* from stdin, a program, or a file, and decides to copy new rows to existing
* shards or new shards based on the partition method of the distributed table.
* If copy is run a worker node, CitusCopyFrom calls CopyFromWorkerNode which
* parses the master node copy options and handles communication with the master
* node.
*
* If this is the first command in the transaction, we open a new connection for
* every shard placement. Otherwise we open as many connections as we can to
@ -102,9 +99,6 @@
/* constant used in binary protocol */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/* use a global connection to the master node in order to skip passing it around */
static MultiConnection *masterConnection = NULL;
/*
* Data size threshold to switch over the active placement for a connection.
* If this is too low, overhead of starting COPY commands will hurt the
@ -196,19 +190,14 @@ typedef struct ShardConnections
/* Local functions forward declarations */
static void CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag);
static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag);
static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId);
static char MasterPartitionMethod(RangeVar *relation);
static void RemoveMasterOptions(CopyStmt *copyStatement);
static void OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
ShardConnections *shardConnections, bool
stopOnFailure,
bool useBinaryCopyFormat);
static bool BinaryOutputFunctionDefined(Oid typeId);
static List * MasterShardPlacementList(uint64 shardId);
static List * RemoteActiveShardPlacementList(uint64 shardId);
static void SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId,
List *connectionList);
static void SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId,
@ -222,11 +211,7 @@ static void ReportCopyError(MultiConnection *connection, PGresult *result);
static uint32 AvailableColumnCount(TupleDesc tupleDescriptor);
static int64 StartCopyToNewShard(ShardConnections *shardConnections,
CopyStmt *copyStatement, bool useBinaryCopyFormat);
static int64 MasterCreateEmptyShard(char *relationName);
static int64 CreateEmptyShard(char *relationName);
static int64 RemoteCreateEmptyShard(char *relationName);
static void MasterUpdateShardStatistics(uint64 shardId);
static void RemoteUpdateShardStatistics(uint64 shardId);
static Oid TypeForColumnName(Oid relationId, TupleDesc tupleDescriptor, char *columnName);
static Oid * TypeArrayFromTupleDescriptor(TupleDesc tupleDescriptor);
@ -236,11 +221,8 @@ static CopyCoercionData * ColumnCoercionPaths(TupleDesc destTupleDescriptor,
Oid *finalColumnTypeArray);
static FmgrInfo * TypeOutputFunctions(uint32 columnCount, Oid *typeIdArray,
bool binaryFormat);
static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort);
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist);
static bool CopyStatementHasFormat(CopyStmt *copyStatement, char *formatName);
static bool IsCopyFromWorker(CopyStmt *copyStatement);
static NodeAddress * MasterNodeAddress(CopyStmt *copyStatement);
static void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
static HTAB * CreateConnectionStateHash(MemoryContext memoryContext);
static HTAB * CreateShardStateHash(MemoryContext memoryContext);
@ -321,106 +303,31 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
}
}
masterConnection = NULL; /* reset, might still be set after error */
bool isCopyFromWorker = IsCopyFromWorker(copyStatement);
if (isCopyFromWorker)
Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
char partitionMethod = PartitionMethod(relationId);
/* disallow modifications to a partition table which have rep. factor > 1 */
EnsurePartitionTableNotReplicated(relationId);
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE ||
partitionMethod == DISTRIBUTE_BY_NONE)
{
CopyFromWorkerNode(copyStatement, completionTag);
CopyToExistingShards(copyStatement, completionTag);
}
else if (partitionMethod == DISTRIBUTE_BY_APPEND)
{
CopyToNewShards(copyStatement, completionTag, relationId);
}
else
{
Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
char partitionMethod = PartitionMethod(relationId);
/* disallow modifications to a partition table which have rep. factor > 1 */
EnsurePartitionTableNotReplicated(relationId);
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod ==
DISTRIBUTE_BY_RANGE || partitionMethod == DISTRIBUTE_BY_NONE)
{
CopyToExistingShards(copyStatement, completionTag);
}
else if (partitionMethod == DISTRIBUTE_BY_APPEND)
{
CopyToNewShards(copyStatement, completionTag, relationId);
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("unsupported partition method")));
}
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("unsupported partition method")));
}
XactModificationLevel = XACT_MODIFICATION_DATA;
}
/*
* IsCopyFromWorker checks if the given copy statement has the master host option.
*/
static bool
IsCopyFromWorker(CopyStmt *copyStatement)
{
ListCell *optionCell = NULL;
foreach(optionCell, copyStatement->options)
{
DefElem *defel = (DefElem *) lfirst(optionCell);
if (strncmp(defel->defname, "master_host", NAMEDATALEN) == 0)
{
return true;
}
}
return false;
}
/*
* CopyFromWorkerNode implements the COPY table_name FROM ... from worker nodes
* for append-partitioned tables.
*/
static void
CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
{
NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement);
char *nodeName = masterNodeAddress->nodeName;
int32 nodePort = masterNodeAddress->nodePort;
uint32 connectionFlags = FOR_DML;
masterConnection = GetNodeConnection(connectionFlags, nodeName, nodePort);
MarkRemoteTransactionCritical(masterConnection);
ClaimConnectionExclusively(masterConnection);
RemoteTransactionBeginIfNecessary(masterConnection);
/* strip schema name for local reference */
char *schemaName = copyStatement->relation->schemaname;
copyStatement->relation->schemaname = NULL;
Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
/* put schema name back */
copyStatement->relation->schemaname = schemaName;
char partitionMethod = MasterPartitionMethod(copyStatement->relation);
if (partitionMethod != DISTRIBUTE_BY_APPEND)
{
ereport(ERROR, (errmsg("copy from worker nodes is only supported "
"for append-partitioned tables")));
}
/*
* Remove master node options from the copy statement because they are not
* recognized by PostgreSQL machinery.
*/
RemoveMasterOptions(copyStatement);
CopyToNewShards(copyStatement, completionTag, relationId);
UnclaimConnection(masterConnection);
masterConnection = NULL;
}
/*
* CopyToExistingShards implements the COPY table_name FROM ... for hash or
* range-partitioned tables where there are already shards into which to copy
@ -737,7 +644,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
}
EndRemoteCopy(currentShardId, shardConnections->connectionList);
MasterUpdateShardStatistics(shardConnections->shardId);
UpdateShardStatistics(shardConnections->shardId);
copiedDataSizeInBytes = 0;
currentShardId = INVALID_SHARD_ID;
@ -761,7 +668,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
shardConnections->connectionList);
}
EndRemoteCopy(currentShardId, shardConnections->connectionList);
MasterUpdateShardStatistics(shardConnections->shardId);
UpdateShardStatistics(shardConnections->shardId);
}
EndCopyFrom(copyState);
@ -778,119 +685,6 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
}
/*
* MasterNodeAddress gets the master node address from copy options and returns
* it. Note that if the master_port is not provided, we use 5432 as the default
* port.
*/
static NodeAddress *
MasterNodeAddress(CopyStmt *copyStatement)
{
NodeAddress *masterNodeAddress = (NodeAddress *) palloc0(sizeof(NodeAddress));
char *nodeName = NULL;
/* set default port to 5432 */
int32 nodePort = 5432;
ListCell *optionCell = NULL;
foreach(optionCell, copyStatement->options)
{
DefElem *defel = (DefElem *) lfirst(optionCell);
if (strncmp(defel->defname, "master_host", NAMEDATALEN) == 0)
{
nodeName = defGetString(defel);
}
else if (strncmp(defel->defname, "master_port", NAMEDATALEN) == 0)
{
nodePort = defGetInt32(defel);
}
}
masterNodeAddress->nodeName = nodeName;
masterNodeAddress->nodePort = nodePort;
return masterNodeAddress;
}
/*
* MasterPartitionMethod gets the partition method of the given relation from
* the master node and returns it.
*/
static char
MasterPartitionMethod(RangeVar *relation)
{
char partitionMethod = '\0';
bool raiseInterrupts = true;
char *relationName = relation->relname;
char *schemaName = relation->schemaname;
char *qualifiedName = quote_qualified_identifier(schemaName, relationName);
StringInfo partitionMethodCommand = makeStringInfo();
appendStringInfo(partitionMethodCommand, PARTITION_METHOD_QUERY, qualifiedName);
if (!SendRemoteCommand(masterConnection, partitionMethodCommand->data))
{
ReportConnectionError(masterConnection, ERROR);
}
PGresult *queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{
char *partitionMethodString = PQgetvalue((PGresult *) queryResult, 0, 0);
if (partitionMethodString == NULL || (*partitionMethodString) == '\0')
{
ereport(ERROR, (errmsg("could not find a partition method for the "
"table %s", relationName)));
}
partitionMethod = partitionMethodString[0];
}
else
{
ReportResultError(masterConnection, queryResult, WARNING);
ereport(ERROR, (errmsg("could not get the partition method of the "
"distributed table")));
}
PQclear(queryResult);
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
Assert(!queryResult);
return partitionMethod;
}
/*
* RemoveMasterOptions removes master node related copy options from the option
* list of the copy statement.
*/
static void
RemoveMasterOptions(CopyStmt *copyStatement)
{
List *newOptionList = NIL;
ListCell *optionCell = NULL;
/* walk over the list of all options */
foreach(optionCell, copyStatement->options)
{
DefElem *option = (DefElem *) lfirst(optionCell);
/* skip master related options */
if ((strncmp(option->defname, "master_host", NAMEDATALEN) == 0) ||
(strncmp(option->defname, "master_port", NAMEDATALEN) == 0))
{
continue;
}
newOptionList = lappend(newOptionList, option);
}
copyStatement->options = newOptionList;
}
/*
* OpenCopyConnectionsForNewShards opens a connection for each placement of a shard and
* starts a COPY transaction if necessary. If a connection cannot be opened,
@ -918,7 +712,7 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
/* release active placement list at the end of this function */
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
List *activePlacementList = MasterShardPlacementList(shardId);
List *activePlacementList = ActiveShardPlacementList(shardId);
MemoryContextSwitchTo(oldContext);
@ -988,8 +782,7 @@ OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
/*
* If stopOnFailure is true, we just error out and code execution should
* never reach to this point. This is the case for reference tables and
* copy from worker nodes.
* never reach to this point. This is the case for reference tables.
*/
Assert(!stopOnFailure || failedPlacementCount == 0);
@ -1096,87 +889,6 @@ BinaryOutputFunctionDefined(Oid typeId)
}
/*
* MasterShardPlacementList dispatches the active shard placements call
* between local or remote master node according to the master connection state.
*/
static List *
MasterShardPlacementList(uint64 shardId)
{
List *activePlacementList = NIL;
if (masterConnection == NULL)
{
activePlacementList = ActiveShardPlacementList(shardId);
}
else
{
activePlacementList = RemoteActiveShardPlacementList(shardId);
}
return activePlacementList;
}
/*
* RemoteActiveShardPlacementList gets the active shard placement list
* for the given shard id from the remote master node.
*/
static List *
RemoteActiveShardPlacementList(uint64 shardId)
{
List *activePlacementList = NIL;
bool raiseInterrupts = true;
StringInfo shardPlacementsCommand = makeStringInfo();
appendStringInfo(shardPlacementsCommand, ACTIVE_SHARD_PLACEMENTS_QUERY, shardId);
if (!SendRemoteCommand(masterConnection, shardPlacementsCommand->data))
{
ReportConnectionError(masterConnection, ERROR);
}
PGresult *queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{
int rowCount = PQntuples(queryResult);
for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
char *placementIdString = PQgetvalue(queryResult, rowIndex, 0);
char *nodeName = pstrdup(PQgetvalue(queryResult, rowIndex, 1));
char *nodePortString = pstrdup(PQgetvalue(queryResult, rowIndex, 2));
uint32 nodePort = atoi(nodePortString);
uint64 placementId = atoll(placementIdString);
ShardPlacement *shardPlacement =
(ShardPlacement *) palloc0(sizeof(ShardPlacement));
shardPlacement->placementId = placementId;
shardPlacement->nodeName = nodeName;
shardPlacement->nodePort = nodePort;
/*
* We cannot know the nodeId, but it is not necessary at this point either.
* This is only used to to look up the connection for a group of co-located
* placements, but append-distributed tables are never co-located.
*/
shardPlacement->nodeId = -1;
activePlacementList = lappend(activePlacementList, shardPlacement);
}
}
else
{
ereport(ERROR, (errmsg("could not get shard placements from the master node")));
}
PQclear(queryResult);
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
Assert(!queryResult);
return activePlacementList;
}
/* Send copy binary headers to given connections */
static void
SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId, List *connectionList)
@ -1838,7 +1550,7 @@ StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement,
char *relationName = copyStatement->relation->relname;
char *schemaName = copyStatement->relation->schemaname;
char *qualifiedName = quote_qualified_identifier(schemaName, relationName);
int64 shardId = MasterCreateEmptyShard(qualifiedName);
int64 shardId = CreateEmptyShard(qualifiedName);
bool stopOnFailure = true;
shardConnections->shardId = shardId;
@ -1853,27 +1565,6 @@ StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement,
}
/*
* MasterCreateEmptyShard dispatches the create empty shard call between local or
* remote master node according to the master connection state.
*/
static int64
MasterCreateEmptyShard(char *relationName)
{
int64 shardId = 0;
if (masterConnection == NULL)
{
shardId = CreateEmptyShard(relationName);
}
else
{
shardId = RemoteCreateEmptyShard(relationName);
}
return shardId;
}
/*
* CreateEmptyShard creates a new shard and related shard placements from the
* local master node.
@ -1891,90 +1582,6 @@ CreateEmptyShard(char *relationName)
}
/*
* RemoteCreateEmptyShard creates a new shard and related shard placements from
* the remote master node.
*/
static int64
RemoteCreateEmptyShard(char *relationName)
{
int64 shardId = 0;
bool raiseInterrupts = true;
StringInfo createEmptyShardCommand = makeStringInfo();
appendStringInfo(createEmptyShardCommand, CREATE_EMPTY_SHARD_QUERY, relationName);
if (!SendRemoteCommand(masterConnection, createEmptyShardCommand->data))
{
ReportConnectionError(masterConnection, ERROR);
}
PGresult *queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{
char *shardIdString = PQgetvalue((PGresult *) queryResult, 0, 0);
char *shardIdStringEnd = NULL;
shardId = strtoul(shardIdString, &shardIdStringEnd, 0);
}
else
{
ReportResultError(masterConnection, queryResult, WARNING);
ereport(ERROR, (errmsg("could not create a new empty shard on the remote node")));
}
PQclear(queryResult);
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
Assert(!queryResult);
return shardId;
}
/*
* MasterUpdateShardStatistics dispatches the update shard statistics call
* between local or remote master node according to the master connection state.
*/
static void
MasterUpdateShardStatistics(uint64 shardId)
{
if (masterConnection == NULL)
{
UpdateShardStatistics(shardId);
}
else
{
RemoteUpdateShardStatistics(shardId);
}
}
/*
* RemoteUpdateShardStatistics updates shard statistics on the remote master node.
*/
static void
RemoteUpdateShardStatistics(uint64 shardId)
{
bool raiseInterrupts = true;
StringInfo updateShardStatisticsCommand = makeStringInfo();
appendStringInfo(updateShardStatisticsCommand, UPDATE_SHARD_STATISTICS_QUERY,
shardId);
if (!SendRemoteCommand(masterConnection, updateShardStatisticsCommand->data))
{
ReportConnectionError(masterConnection, ERROR);
}
PGresult *queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
if (PQresultStatus(queryResult) != PGRES_TUPLES_OK)
{
ereport(ERROR, (errmsg("could not update shard statistics")));
}
PQclear(queryResult);
queryResult = GetRemoteCommandResult(masterConnection, raiseInterrupts);
Assert(!queryResult);
}
/* *INDENT-OFF* */
/* Append data to the copy buffer in outputState */
static void
@ -2754,46 +2361,25 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS
*/
if (copyStatement->relation != NULL)
{
bool isDistributedRelation = false;
bool isCopyFromWorker = IsCopyFromWorker(copyStatement);
bool isFrom = copyStatement->is_from;
if (isCopyFromWorker)
{
RangeVar *relation = copyStatement->relation;
NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement);
char *nodeName = masterNodeAddress->nodeName;
int32 nodePort = masterNodeAddress->nodePort;
/* consider using RangeVarGetRelidExtended to check perms before locking */
Relation copiedRelation = heap_openrv(copyStatement->relation,
isFrom ? RowExclusiveLock :
AccessShareLock);
CreateLocalTable(relation, nodeName, nodePort);
bool isDistributedRelation = IsDistributedTable(RelationGetRelid(copiedRelation));
/*
* We expect copy from worker to be on a distributed table; otherwise,
* it fails in CitusCopyFrom() while checking the partition method.
*/
isDistributedRelation = true;
}
else
{
bool isFrom = copyStatement->is_from;
/* ensure future lookups hit the same relation */
char *schemaName = get_namespace_name(RelationGetNamespace(copiedRelation));
/* consider using RangeVarGetRelidExtended to check perms before locking */
Relation copiedRelation = heap_openrv(copyStatement->relation,
isFrom ? RowExclusiveLock :
AccessShareLock);
/* ensure we copy string into proper context */
MemoryContext relationContext = GetMemoryChunkContext(
copyStatement->relation);
schemaName = MemoryContextStrdup(relationContext, schemaName);
copyStatement->relation->schemaname = schemaName;
isDistributedRelation = IsDistributedTable(RelationGetRelid(copiedRelation));
/* ensure future lookups hit the same relation */
char *schemaName = get_namespace_name(RelationGetNamespace(copiedRelation));
/* ensure we copy string into proper context */
MemoryContext relationContext = GetMemoryChunkContext(
copyStatement->relation);
schemaName = MemoryContextStrdup(relationContext, schemaName);
copyStatement->relation->schemaname = schemaName;
heap_close(copiedRelation, NoLock);
}
heap_close(copiedRelation, NoLock);
if (isDistributedRelation)
{
@ -2808,11 +2394,7 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS
#endif
/* check permissions, we're bypassing postgres' normal checks */
if (!isCopyFromWorker)
{
CheckCopyPermissions(copyStatement);
}
CheckCopyPermissions(copyStatement);
CitusCopyFrom(copyStatement, completionTag);
return NULL;
}
@ -2898,85 +2480,6 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryS
}
/*
* CreateLocalTable gets DDL commands from the remote node for the given
* relation. Then, it creates the local relation as temporary and on commit drop.
*/
static void
CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort)
{
ListCell *ddlCommandCell = NULL;
char *relationName = relation->relname;
char *schemaName = relation->schemaname;
char *qualifiedRelationName = quote_qualified_identifier(schemaName, relationName);
/*
* The warning message created in TableDDLCommandList() is descriptive
* enough; therefore, we just throw an error which says that we could not
* run the copy operation.
*/
List *ddlCommandList = TableDDLCommandList(nodeName, nodePort, qualifiedRelationName);
if (ddlCommandList == NIL)
{
ereport(ERROR, (errmsg("could not run copy from the worker node")));
}
/* apply DDL commands against the local database */
foreach(ddlCommandCell, ddlCommandList)
{
StringInfo ddlCommand = (StringInfo) lfirst(ddlCommandCell);
Node *ddlCommandNode = ParseTreeNode(ddlCommand->data);
bool applyDDLCommand = false;
if (IsA(ddlCommandNode, CreateStmt) ||
IsA(ddlCommandNode, CreateForeignTableStmt))
{
CreateStmt *createStatement = (CreateStmt *) ddlCommandNode;
/* create the local relation as temporary and on commit drop */
createStatement->relation->relpersistence = RELPERSISTENCE_TEMP;
createStatement->oncommit = ONCOMMIT_DROP;
/* temporarily strip schema name */
createStatement->relation->schemaname = NULL;
applyDDLCommand = true;
}
else if (IsA(ddlCommandNode, CreateForeignServerStmt))
{
CreateForeignServerStmt *createServerStmt =
(CreateForeignServerStmt *) ddlCommandNode;
if (GetForeignServerByName(createServerStmt->servername, true) == NULL)
{
/* create server if not exists */
applyDDLCommand = true;
}
}
else if ((IsA(ddlCommandNode, CreateExtensionStmt)))
{
applyDDLCommand = true;
}
else if ((IsA(ddlCommandNode, CreateSeqStmt)))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot copy to table with serial column from worker"),
errhint("Connect to the master node to COPY to tables which "
"use serial column types.")));
}
/* run only a selected set of DDL commands */
if (applyDDLCommand)
{
CitusProcessUtility(ddlCommandNode, CreateCommandTag(ddlCommandNode),
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
CommandCounterIncrement();
}
}
}
/*
* Check whether the current user has the permission to execute a COPY
* statement, raise ERROR if not. In some cases we have to do this separately
@ -3251,7 +2754,7 @@ InitializeCopyShardState(CopyShardState *shardState,
/* release active placement list at the end of this function */
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
List *activePlacementList = MasterShardPlacementList(shardId);
List *activePlacementList = ActiveShardPlacementList(shardId);
MemoryContextSwitchTo(oldContext);
@ -3307,8 +2810,7 @@ InitializeCopyShardState(CopyShardState *shardState,
/*
* If stopOnFailure is true, we just error out and code execution should
* never reach to this point. This is the case for reference tables and
* copy from worker nodes.
* never reach to this point. This is the case for reference tables.
*/
Assert(!stopOnFailure || failedPlacementCount == 0);
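With IsCopyFromWorker, CopyFromWorkerNode, and RemoveMasterOptions gone, the master_host and master_port options are no longer stripped from the statement before it reaches PostgreSQL's COPY machinery, so a statement that still passes them should now fail on option parsing rather than being routed through a master connection. A hedged sketch of that behavior (the exact error text comes from PostgreSQL's COPY option handling, not from this diff):

    COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data'
        WITH (delimiter '|', master_host 'localhost', master_port 57636);
    -- expected to fail with something along the lines of:
    -- ERROR:  option "master_host" not recognized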


@ -70,13 +70,6 @@
#define DROP_REGULAR_TABLE_COMMAND "DROP TABLE IF EXISTS %s CASCADE"
#define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s CASCADE"
#define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s AUTHORIZATION %s"
#define CREATE_EMPTY_SHARD_QUERY "SELECT master_create_empty_shard('%s')"
#define ACTIVE_SHARD_PLACEMENTS_QUERY \
"SELECT placementid, nodename, nodeport FROM pg_dist_shard_placement WHERE shardstate = 1 AND shardid = " \
INT64_FORMAT
#define UPDATE_SHARD_STATISTICS_QUERY \
"SELECT master_update_shard_statistics(" INT64_FORMAT ")"
#define PARTITION_METHOD_QUERY "SELECT part_method FROM master_get_table_metadata('%s');"
/* Enumeration that defines the shard placement policy to use while staging */
typedef enum
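The query templates removed above are the statements a worker used to send over its master connection while copying into an append-distributed table; after this commit the coordinator calls the corresponding local functions (CreateEmptyShard, ActiveShardPlacementList, UpdateShardStatistics, and PartitionMethod) directly. For reference, the removed macros expanded to SQL along these lines, where the table name and shard id are illustrative values borrowed from the tests:

    SELECT master_create_empty_shard('public.numbers_append');
    SELECT placementid, nodename, nodeport
        FROM pg_dist_shard_placement
        WHERE shardstate = 1 AND shardid = 560165;
    SELECT master_update_shard_statistics(560165);
    SELECT part_method FROM master_get_table_metadata('public.numbers_append');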


@ -318,56 +318,6 @@ ORDER BY
LIMIT
5;
-- Ensure that copy from worker node of table with serial column fails
CREATE TABLE customer_worker_copy_append_seq (id integer, seq serial);
SELECT master_create_distributed_table('customer_worker_copy_append_seq', 'id', 'append');
-- Connect to the first worker node
\c - - - 57637
-- Test copy from the worker node
COPY customer_worker_copy_append_seq FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Connect back to the master node
\c - - - 57636
-- Create customer table for the worker copy with constraint and index
CREATE TABLE customer_worker_copy_append (
c_custkey integer ,
c_name varchar(25) not null,
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117),
primary key (c_custkey));
CREATE INDEX ON customer_worker_copy_append (c_name);
SELECT master_create_distributed_table('customer_worker_copy_append', 'c_custkey', 'append');
-- Connect to the first worker node
\c - - - 57637
-- Test copy from the worker node
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Make sure we don't use 2PC when connecting to master, even if requested
BEGIN;
SET LOCAL citus.multi_shard_commit_protocol TO '2pc';
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
COMMIT;
-- Test if there is no relation to copy data with the worker copy
COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Connect back to the master node
\c - - - 57636
-- Test the content of the table
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM customer_worker_copy_append;
-- Test schema support on append partitioned tables
CREATE SCHEMA append;
CREATE TABLE append.customer_copy (
@ -384,14 +334,7 @@ SELECT master_create_distributed_table('append.customer_copy', 'c_custkey', 'app
-- Test copy from the master node
COPY append.customer_copy FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|');
-- Test copy from the worker node
\c - - - 57637
COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Connect back to the master node
\c - - - 57636
COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|');
-- Test the content of the table
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM append.customer_copy;
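With the worker-side COPY tests removed, these tests now load append-distributed tables only from the master, and each COPY creates new shards as needed. One way to check where those shards ended up, following the placement query the expected output below uses, here pointed at append.customer_copy for illustration:

    SELECT shardid, nodename, nodeport
        FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
        WHERE logicalrelid = 'append.customer_copy'::regclass
        ORDER BY placementid;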


@ -32,7 +32,7 @@ SELECT master_create_worker_shards('customer_copy_hash', 64, 1);
-- Test empty copy
COPY customer_copy_hash FROM STDIN;
-- Test syntax error
-- Test syntax error
COPY customer_copy_hash (c_custkey,c_name) FROM STDIN;
ERROR: invalid input syntax for integer: "1,customer1"
CONTEXT: COPY customer_copy_hash, line 1, column c_custkey: "1,customer1"
@ -426,65 +426,6 @@ LIMIT
560137 | 57637
(5 rows)
-- Ensure that copy from worker node of table with serial column fails
CREATE TABLE customer_worker_copy_append_seq (id integer, seq serial);
SELECT master_create_distributed_table('customer_worker_copy_append_seq', 'id', 'append');
master_create_distributed_table
---------------------------------
(1 row)
-- Connect to the first worker node
\c - - - 57637
-- Test copy from the worker node
COPY customer_worker_copy_append_seq FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
ERROR: relation "public.customer_worker_copy_append_seq_seq_seq" does not exist
-- Connect back to the master node
\c - - - 57636
-- Create customer table for the worker copy with constraint and index
CREATE TABLE customer_worker_copy_append (
c_custkey integer ,
c_name varchar(25) not null,
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117),
primary key (c_custkey));
CREATE INDEX ON customer_worker_copy_append (c_name);
SELECT master_create_distributed_table('customer_worker_copy_append', 'c_custkey', 'append');
WARNING: table "customer_worker_copy_append" has a UNIQUE or EXCLUDE constraint
DETAIL: UNIQUE constraints, EXCLUDE constraints, and PRIMARY KEYs on append-partitioned tables cannot be enforced.
HINT: Consider using hash partitioning.
master_create_distributed_table
---------------------------------
(1 row)
-- Connect to the first worker node
\c - - - 57637
-- Test copy from the worker node
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Make sure we don't use 2PC when connecting to master, even if requested
BEGIN;
SET LOCAL citus.multi_shard_commit_protocol TO '2pc';
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
COMMIT;
-- Test if there is no relation to copy data with the worker copy
COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
WARNING: relation "lineitem_copy_none" does not exist
CONTEXT: while executing command on localhost:57636
ERROR: could not run copy from the worker node
-- Connect back to the master node
\c - - - 57636
-- Test the content of the table
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM customer_worker_copy_append;
min | max | avg | count
-----+------+-----------------------+-------
1 | 7000 | 4443.8028800000000000 | 2000
(1 row)
-- Test schema support on append partitioned tables
CREATE SCHEMA append;
CREATE TABLE append.customer_copy (
@ -504,11 +445,7 @@ SELECT master_create_distributed_table('append.customer_copy', 'c_custkey', 'app
-- Test copy from the master node
COPY append.customer_copy FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|');
-- Test copy from the worker node
\c - - - 57637
COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Connect back to the master node
\c - - - 57636
COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|');
-- Test the content of the table
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM append.customer_copy;
min | max | avg | count
@ -679,13 +616,10 @@ SELECT master_create_distributed_table('composite_partition_column_table', 'comp
\COPY composite_partition_column_table FROM STDIN WITH (FORMAT 'csv');
WARNING: function min(number_pack) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
CONTEXT: while executing command on localhost:57637
WARNING: function min(number_pack) does not exist
HINT: No function matches the given name and argument types. You might need to add explicit type casts.
CONTEXT: while executing command on localhost:57638
WARNING: could not get statistics for shard public.composite_partition_column_table_560164
WARNING: could not get statistics for shard public.composite_partition_column_table_560162
DETAIL: Setting shard statistics to NULL
ERROR: failure on connection marked as essential: localhost:57637
ERROR: failure on connection marked as essential: localhost:57638
-- Test copy on append distributed tables do not create shards on removed workers
CREATE TABLE numbers_append (a int, b int);
SELECT master_create_distributed_table('numbers_append', 'a', 'append');
@ -696,7 +630,7 @@ SELECT master_create_distributed_table('numbers_append', 'a', 'append');
-- no shards is created yet
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
shardid | nodename | nodeport
---------+----------+----------
@ -706,15 +640,13 @@ COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
-- verify there are shards at both workers
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
shardid | nodename | nodeport
---------+-----------+----------
560165 | localhost | 57637
560165 | localhost | 57638
560166 | localhost | 57638
560166 | localhost | 57637
(4 rows)
560163 | localhost | 57637
560164 | localhost | 57638
(2 rows)
-- disable the first node
SELECT master_disable_node('localhost', :worker_1_port);
@ -725,23 +657,21 @@ NOTICE: Node localhost:57637 has active shard placements. Some queries may fail
(1 row)
-- set replication factor to 1 so that copy will
-- succeed without replication count error
-- succeed without replication count error
SET citus.shard_replication_factor TO 1;
-- add two new shards and verify they are created at the other node
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
shardid | nodename | nodeport
---------+-----------+----------
560165 | localhost | 57637
560163 | localhost | 57637
560164 | localhost | 57638
560165 | localhost | 57638
560166 | localhost | 57638
560166 | localhost | 57637
560167 | localhost | 57638
560168 | localhost | 57638
(6 rows)
(4 rows)
-- add the node back
SELECT 1 FROM master_activate_node('localhost', :worker_1_port);
@ -762,21 +692,19 @@ RESET citus.shard_replication_factor;
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
COPY numbers_append FROM STDIN WITH (FORMAT 'csv');
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_append'::regclass order by placementid;
shardid | nodename | nodeport
---------+-----------+----------
560165 | localhost | 57637
560163 | localhost | 57637
560164 | localhost | 57638
560165 | localhost | 57638
560166 | localhost | 57638
560166 | localhost | 57637
560167 | localhost | 57637
560167 | localhost | 57638
560168 | localhost | 57638
560169 | localhost | 57637
560169 | localhost | 57638
560170 | localhost | 57638
560170 | localhost | 57637
(10 rows)
560168 | localhost | 57637
(8 rows)
DROP TABLE numbers_append;
-- Test copy failures against connection failures
@ -803,18 +731,18 @@ SELECT create_distributed_table('numbers_hash', 'a');
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
-- verify each placement is active
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560169 | 1 | localhost | 57637
560169 | 1 | localhost | 57638
560170 | 1 | localhost | 57637
560170 | 1 | localhost | 57638
560171 | 1 | localhost | 57637
560171 | 1 | localhost | 57638
560172 | 1 | localhost | 57637
560172 | 1 | localhost | 57638
560173 | 1 | localhost | 57637
560173 | 1 | localhost | 57638
560174 | 1 | localhost | 57637
560174 | 1 | localhost | 57638
(8 rows)
-- create a reference table
@ -835,18 +763,18 @@ SELECT create_distributed_table('numbers_hash_other', 'a');
(1 row)
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash_other'::regclass order by shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560174 | 1 | localhost | 57637
560174 | 1 | localhost | 57638
560175 | 1 | localhost | 57637
560175 | 1 | localhost | 57638
560176 | 1 | localhost | 57637
560176 | 1 | localhost | 57638
560177 | 1 | localhost | 57637
560177 | 1 | localhost | 57638
560178 | 1 | localhost | 57637
560178 | 1 | localhost | 57638
560179 | 1 | localhost | 57637
560179 | 1 | localhost | 57638
(8 rows)
-- manually corrupt pg_dist_shard such that both copies of one shard is placed in
@ -874,18 +802,18 @@ DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 6: "6,6"
-- verify shards in the first worker as marked invalid
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560169 | 3 | localhost | 57637
560169 | 1 | localhost | 57638
560170 | 3 | localhost | 57637
560170 | 1 | localhost | 57638
560171 | 3 | localhost | 57637
560171 | 1 | localhost | 57638
560172 | 3 | localhost | 57637
560172 | 1 | localhost | 57638
560173 | 3 | localhost | 57637
560173 | 1 | localhost | 57638
560174 | 3 | localhost | 57637
560174 | 1 | localhost | 57638
(8 rows)
-- try to insert into a reference table copy should fail
@ -895,12 +823,12 @@ DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_reference, line 1: "3,1"
-- verify shards for reference table are still valid
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_reference'::regclass order by placementid;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560175 | 1 | localhost | 57637
560175 | 1 | localhost | 57638
560173 | 1 | localhost | 57637
560173 | 1 | localhost | 57638
(2 rows)
-- try to insert into numbers_hash_other. copy should fail and rollback
@ -912,25 +840,25 @@ DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
ERROR: connection error: localhost:57637
CONTEXT: COPY numbers_hash_other, line 2: "2,2"
WARNING: connection error: localhost:57637
DETAIL: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
CONTEXT: COPY numbers_hash_other, line 3: "3,3"
-- verify shards for numbers_hash_other are still valid
-- since copy has failed altogether
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash_other'::regclass order by shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560174 | 3 | localhost | 57637
560174 | 1 | localhost | 57638
560175 | 3 | localhost | 57637
560175 | 1 | localhost | 57638
560176 | 1 | localhost | 57637
560176 | 1 | localhost | 57637
560177 | 1 | localhost | 57637
560177 | 3 | localhost | 57637
560177 | 1 | localhost | 57638
560178 | 1 | localhost | 57637
560178 | 1 | localhost | 57638
560179 | 1 | localhost | 57637
560179 | 1 | localhost | 57638
(8 rows)
-- re-enable test_user on the first worker
@ -961,7 +889,7 @@ ALTER TABLE numbers_hash_560180 DROP COLUMN b;
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
ERROR: column "b" of relation "numbers_hash_560180" does not exist
CONTEXT: while executing command on localhost:57637
COPY numbers_hash, line 1: "1,1"
COPY numbers_hash, line 6: "6,6"
-- verify no row is inserted
SELECT count(a) FROM numbers_hash;
count
@ -971,18 +899,18 @@ SELECT count(a) FROM numbers_hash;
-- verify shard is still marked as valid
SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport;
shardid | shardstate | nodename | nodeport
---------+------------+-----------+----------
560178 | 1 | localhost | 57637
560178 | 1 | localhost | 57638
560179 | 1 | localhost | 57637
560179 | 1 | localhost | 57638
560180 | 1 | localhost | 57637
560180 | 1 | localhost | 57638
560181 | 1 | localhost | 57637
560181 | 1 | localhost | 57638
560182 | 1 | localhost | 57637
560182 | 1 | localhost | 57638
560183 | 1 | localhost | 57637
560183 | 1 | localhost | 57638
(8 rows)
DROP TABLE numbers_hash;
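In the placement listings above, shardstate 1 marks a healthy (active) placement, while shardstate 3 marks a placement that was invalidated by a failed write, as in the "marked invalid" checks earlier in this output. A query in the same style that lists only the invalidated placements of a table, with numbers_hash_other used as an illustrative table name:

    SELECT shardid, shardstate, nodename, nodeport
        FROM pg_dist_shard_placement JOIN pg_dist_shard USING (shardid)
        WHERE logicalrelid = 'numbers_hash_other'::regclass AND shardstate = 3
        ORDER BY shardid, nodeport;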