Merge pull request #468 from citusdata/feature/worker-copy-for-append-partitioning

Add COPY support on worker nodes for append partitioned relations

CR: @marcocitus
pull/485/head
Metin Döşlü 2016-05-03 16:08:27 +03:00
commit 2db30af07f
14 changed files with 794 additions and 82 deletions

View File

@ -5,7 +5,7 @@ citus_top_builddir = ../../..
MODULE_big = citus
EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1
EXTVERSIONS = 5.0 5.0-1 5.0-2
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
# Generated files for each version
@ -29,6 +29,8 @@ $(EXTENSION)--5.0.sql: $(EXTENSION).sql
cat $^ > $@
$(EXTENSION)--5.0-1.sql: $(EXTENSION)--5.0.sql $(EXTENSION)--5.0--5.0-1.sql
cat $^ > $@
$(EXTENSION)--5.0-2.sql: $(EXTENSION)--5.0.sql $(EXTENSION)--5.0--5.0-1.sql $(EXTENSION)--5.0-1--5.0-2.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,8 @@
/* citus--5.0-1--5.0-2.sql */
CREATE FUNCTION master_update_shard_statistics(shard_id bigint)
RETURNS bigint
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_update_shard_statistics$$;
COMMENT ON FUNCTION master_update_shard_statistics(bigint)
IS 'updates shard statistics and returns the updated shard size';
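
The new UDF can also be invoked directly on the master. A minimal usage sketch (the shard id 102010 is only illustrative):

SELECT master_update_shard_statistics(102010);

The call refreshes the shard's size and min/max metadata and returns the updated shard size, as the COMMENT above states.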

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '5.0-1'
default_version = '5.0-2'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -4,22 +4,25 @@
* This file contains implementation of COPY utility for distributed
* tables.
*
* The CitusCopyFrom function should be called from the utility hook to
* process COPY ... FROM commands on distributed tables. CitusCopyFrom
* parses the input from stdin, a program executed on the master, or a file
* on the master, and decides to copy new rows to existing shards or new shards
* based on the partition method of the distributed table.
* The CitusCopyFrom function should be called from the utility hook to process
* COPY ... FROM commands on distributed tables. CitusCopyFrom parses the input
* from stdin, a program, or a file, and decides to copy new rows to existing
* shards or new shards based on the partition method of the distributed table.
* If COPY is run on a worker node, CitusCopyFrom calls CopyFromWorkerNode, which
* parses the master node copy options and handles communication with the master
* node.
*
* It opens a new connection for every shard placement and uses the PQputCopyData
* function to copy the data. Because PQputCopyData transmits data asynchronously,
* the workers will ingest data at least partially in parallel.
*
* When failing to connect to a worker, the master marks the placement for
* which it was trying to open a connection as inactive, similar to the way
* DML statements are handled. If a failure occurs after connecting, the
* transaction is rolled back on all the workers. Note that, if the underlying
* table is append-partitioned, metadata changes are rolled back on the master
* node, but shard placements are left on the workers.
* For hash-partitioned tables, if the master fails to connect to a worker, it
* marks the placement for which it was trying to open a connection as inactive,
* similar to the way DML statements are handled. If a failure occurs after
* connecting, the transaction is rolled back on all the workers. Note that,
* in the case of append-partitioned tables, if a failure occurs, metadata
* changes are immediately rolled back on the master node, but shard placements
* are left on the worker nodes.
*
* By default, COPY uses normal transactions on the workers. In the case of
* hash or range-partitioned tables, this can cause a problem when some of the
@ -30,9 +33,9 @@
* shards are created and in the case of failure, metadata changes are rolled
* back on the master node.
*
* Parsing options are processed and enforced on the master, while
* constraints are enforced on the worker. In either case, failure causes
* the whole COPY to roll back.
* Parsing options are processed and enforced on the node where the COPY command
* is run, while constraints are enforced on the worker. In either case,
* failure causes the whole COPY to roll back.
*
* Copyright (c) 2016, Citus Data, Inc.
*
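
To make the worker-side flow described above concrete, a COPY issued on a worker names the master explicitly through the new master_host and master_port options. A sketch modeled on the regression tests further down in this diff (host, port, and data file path are illustrative):

COPY customer_worker_copy_append FROM '/data/customer.1.data'
    WITH (delimiter '|', master_host 'localhost', master_port 57636);

Without these options the statement is treated as a regular master-side COPY.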
@ -128,6 +131,9 @@
/* constant used in binary protocol */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/* use a global connection to the master node in order to skip passing it around */
static PGconn *masterConnection = NULL;
/* ShardConnections represents a set of connections for each placement of a shard */
typedef struct ShardConnections
@ -138,8 +144,11 @@ typedef struct ShardConnections
/* Local functions forward declarations */
static void CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag);
static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag);
static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag);
static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId);
static char MasterPartitionMethod(RangeVar *relation);
static void RemoveMasterOptions(CopyStmt *copyStatement);
static void LockAllShards(List *shardIntervalList);
static HTAB * CreateShardConnectionHash(void);
static int CompareShardIntervalsById(const void *leftElement, const void *rightElement);
@ -147,7 +156,9 @@ static ShardConnections * GetShardConnections(HTAB *shardConnectionHash,
int64 shardId,
bool *shardConnectionsFound);
static void OpenCopyTransactions(CopyStmt *copyStatement,
ShardConnections *shardConnections);
ShardConnections *shardConnections, bool stopOnFailure);
static List * MasterShardPlacementList(uint64 shardId);
static List * RemoteFinalizedShardPlacementList(uint64 shardId);
static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList);
static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList);
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
@ -159,8 +170,13 @@ static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
static void ReportCopyError(PGconn *connection, PGresult *result);
static uint32 AvailableColumnCount(TupleDesc tupleDescriptor);
static void StartCopyToNewShard(ShardConnections *shardConnections,
Oid relationId, CopyStmt *copyStatement);
CopyStmt *copyStatement);
static int64 MasterCreateEmptyShard(char *relationName);
static int64 CreateEmptyShard(char *relationName);
static int64 RemoteCreateEmptyShard(char *relationName);
static void FinalizeCopyToNewShard(ShardConnections *shardConnections);
static void MasterUpdateShardStatistics(uint64 shardId);
static void RemoteUpdateShardStatistics(uint64 shardId);
/* Private functions copied and adapted from copy.c in PostgreSQL */
static void CopySendData(CopyOutState outputState, const void *databuf, int datasize);
@ -174,14 +190,13 @@ static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *
/*
* CitusCopyFrom implements the COPY table_name FROM. It dispatches the copy
* statement to related subfunctions based on the partition method of the
* distributed table.
* statement to related subfunctions based on where the copy command is run
* and the partition method of the distributed table.
*/
void
CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
{
Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
char partitionMethod = '\0';
bool isCopyFromWorker = false;
/* disallow COPY to/from file or program except for superusers */
if (copyStatement->filename != NULL && !superuser())
@ -204,23 +219,132 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
}
}
partitionMethod = PartitionMethod(tableId);
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE)
isCopyFromWorker = IsCopyFromWorker(copyStatement);
if (isCopyFromWorker)
{
CopyToExistingShards(copyStatement, completionTag);
}
else if (partitionMethod == DISTRIBUTE_BY_APPEND)
{
CopyToNewShards(copyStatement, completionTag);
CopyFromWorkerNode(copyStatement, completionTag);
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("unsupported partition method")));
Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
char partitionMethod = PartitionMethod(relationId);
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod ==
DISTRIBUTE_BY_RANGE)
{
CopyToExistingShards(copyStatement, completionTag);
}
else if (partitionMethod == DISTRIBUTE_BY_APPEND)
{
CopyToNewShards(copyStatement, completionTag, relationId);
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("unsupported partition method")));
}
}
}
/*
* IsCopyFromWorker checks if the given copy statement has the master host option.
*/
bool
IsCopyFromWorker(CopyStmt *copyStatement)
{
ListCell *optionCell = NULL;
foreach(optionCell, copyStatement->options)
{
DefElem *defel = (DefElem *) lfirst(optionCell);
if (strncmp(defel->defname, "master_host", NAMEDATALEN) == 0)
{
return true;
}
}
return false;
}
/*
* CopyFromWorkerNode implements the COPY table_name FROM ... from worker nodes
* for append-partitioned tables.
*/
static void
CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
{
NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement);
char *nodeName = masterNodeAddress->nodeName;
int32 nodePort = masterNodeAddress->nodePort;
char *nodeUser = CurrentUserName();
masterConnection = ConnectToNode(nodeName, nodePort, nodeUser);
PG_TRY();
{
PGresult *queryResult = NULL;
Oid relationId = InvalidOid;
char partitionMethod = 0;
/* strip schema name for local reference */
char *schemaName = copyStatement->relation->schemaname;
copyStatement->relation->schemaname = NULL;
relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
/* put schema name back */
copyStatement->relation->schemaname = schemaName;
partitionMethod = MasterPartitionMethod(copyStatement->relation);
if (partitionMethod != DISTRIBUTE_BY_APPEND)
{
ereport(ERROR, (errmsg("copy from worker nodes is only supported "
"for append-partitioned tables")));
}
/* run all metadata commands in a transaction */
queryResult = PQexec(masterConnection, "BEGIN");
if (PQresultStatus(queryResult) != PGRES_COMMAND_OK)
{
ereport(ERROR, (errmsg("could not start to update master node metadata")));
}
PQclear(queryResult);
/*
* Remove master node options from the copy statement because they are not
* recognized by PostgreSQL machinery.
*/
RemoveMasterOptions(copyStatement);
CopyToNewShards(copyStatement, completionTag, relationId);
/* commit metadata transactions */
queryResult = PQexec(masterConnection, "COMMIT");
if (PQresultStatus(queryResult) != PGRES_COMMAND_OK)
{
ereport(ERROR, (errmsg("could not commit master node metadata changes")));
}
PQclear(queryResult);
/* close the connection */
PQfinish(masterConnection);
masterConnection = NULL;
}
PG_CATCH();
{
/* close the connection */
PQfinish(masterConnection);
masterConnection = NULL;
PG_RE_THROW();
}
PG_END_TRY();
}
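
Seen from the master node, a worker-side COPY therefore reduces to a short sequence of statements sent over masterConnection. A sketch of that conversation, with query text taken from the query constants added later in this diff (the shard id 102010 is illustrative):

SELECT part_method FROM master_get_table_metadata('customer_worker_copy_append');
BEGIN;
SELECT master_create_empty_shard('customer_worker_copy_append');
SELECT nodename, nodeport FROM pg_dist_shard_placement
    WHERE shardstate = 1 AND shardid = 102010;
SELECT master_update_shard_statistics(102010);
COMMIT;

The create-shard, placement-lookup, and statistics-update steps repeat for every additional shard the COPY fills once citus.shard_max_size is reached.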
/*
* CopyToExistingShards implements the COPY table_name FROM ... for hash or
* range-partitioned tables where there are already shards into which to copy
@ -420,7 +544,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
if (!shardConnectionsFound)
{
/* open connections and initiate COPY on shard placements */
OpenCopyTransactions(copyStatement, shardConnections);
OpenCopyTransactions(copyStatement, shardConnections, false);
/* send copy binary headers to shard placements */
SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList);
@ -492,9 +616,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
* tables where we create new shards into which to copy rows.
*/
static void
CopyToNewShards(CopyStmt *copyStatement, char *completionTag)
CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
{
Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
FmgrInfo *columnOutputFunctions = NULL;
/* allocate column values and nulls arrays */
@ -562,7 +685,9 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag)
if (!nextRowFound)
{
/* switch to regular memory context and stop showing line number in errors */
MemoryContextSwitchTo(oldContext);
error_context_stack = errorCallback.previous;
break;
}
@ -581,7 +706,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag)
if (copiedDataSizeInBytes == 0)
{
/* create shard and open connections to shard placements */
StartCopyToNewShard(shardConnections, relationId, copyStatement);
StartCopyToNewShard(shardConnections, copyStatement);
/* send copy binary headers to shard placements */
SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList);
@ -606,7 +731,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag)
{
SendCopyBinaryFooters(copyOutState, shardConnections->connectionList);
FinalizeCopyToNewShard(shardConnections);
UpdateShardStatistics(relationId, shardConnections->shardId);
MasterUpdateShardStatistics(shardConnections->shardId);
copiedDataSizeInBytes = 0;
}
@ -624,7 +749,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag)
{
SendCopyBinaryFooters(copyOutState, shardConnections->connectionList);
FinalizeCopyToNewShard(shardConnections);
UpdateShardStatistics(relationId, shardConnections->shardId);
MasterUpdateShardStatistics(shardConnections->shardId);
}
EndCopyFrom(copyState);
@ -652,6 +777,112 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag)
}
/*
* MasterNodeAddress gets the master node address from copy options and returns
* it. Note that if the master_port is not provided, we use 5432 as the default
* port.
*/
NodeAddress *
MasterNodeAddress(CopyStmt *copyStatement)
{
NodeAddress *masterNodeAddress = (NodeAddress *) palloc0(sizeof(NodeAddress));
char *nodeName = NULL;
/* set default port to 5432 */
int32 nodePort = 5432;
ListCell *optionCell = NULL;
foreach(optionCell, copyStatement->options)
{
DefElem *defel = (DefElem *) lfirst(optionCell);
if (strncmp(defel->defname, "master_host", NAMEDATALEN) == 0)
{
nodeName = defGetString(defel);
}
else if (strncmp(defel->defname, "master_port", NAMEDATALEN) == 0)
{
nodePort = defGetInt32(defel);
}
}
masterNodeAddress->nodeName = nodeName;
masterNodeAddress->nodePort = nodePort;
return masterNodeAddress;
}
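
A small usage sketch of the default-port behaviour (the host name is illustrative):

COPY customer_worker_copy_append FROM STDIN
    WITH (delimiter '|', master_host 'master-host');
-- equivalent to also specifying master_port 5432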
/*
* MasterPartitionMethod gets the partition method of the given relation from
* the master node and returns it.
*/
static char
MasterPartitionMethod(RangeVar *relation)
{
char partitionMethod = '\0';
PGresult *queryResult = NULL;
char *relationName = relation->relname;
char *schemaName = relation->schemaname;
char *qualifiedName = quote_qualified_identifier(schemaName, relationName);
StringInfo partitionMethodCommand = makeStringInfo();
appendStringInfo(partitionMethodCommand, PARTITION_METHOD_QUERY, qualifiedName);
queryResult = PQexec(masterConnection, partitionMethodCommand->data);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{
char *partitionMethodString = PQgetvalue((PGresult *) queryResult, 0, 0);
if (partitionMethodString == NULL || (*partitionMethodString) == '\0')
{
ereport(ERROR, (errmsg("could not find a partition method for the "
"table %s", relationName)));
}
partitionMethod = partitionMethodString[0];
}
else
{
ReportRemoteError(masterConnection, queryResult);
ereport(ERROR, (errmsg("could not get the partition method of the "
"distributed table")));
}
PQclear(queryResult);
return partitionMethod;
}
/*
* RemoveMasterOptions removes master node related copy options from the option
* list of the copy statement.
*/
static void
RemoveMasterOptions(CopyStmt *copyStatement)
{
List *newOptionList = NIL;
ListCell *optionCell = NULL;
/* walk over the list of all options */
foreach(optionCell, copyStatement->options)
{
DefElem *option = (DefElem *) lfirst(optionCell);
/* skip master related options */
if ((strncmp(option->defname, "master_host", NAMEDATALEN) == 0) ||
(strncmp(option->defname, "master_port", NAMEDATALEN) == 0))
{
continue;
}
newOptionList = lappend(newOptionList, option);
}
copyStatement->options = newOptionList;
}
/*
* LockAllShards takes shared locks on the metadata and the data of all shards in
* shardIntervalList. This prevents concurrent placement changes and concurrent
@ -764,13 +995,15 @@ GetShardConnections(HTAB *shardConnectionHash, int64 shardId,
* shard placements.
*/
static void
OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections)
OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections,
bool stopOnFailure)
{
List *finalizedPlacementList = NIL;
List *failedPlacementList = NIL;
ListCell *placementCell = NULL;
ListCell *failedPlacementCell = NULL;
List *connectionList = NULL;
int64 shardId = shardConnections->shardId;
MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
"OpenCopyTransactions",
@ -781,7 +1014,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
/* release finalized placement list at the end of this function */
MemoryContext oldContext = MemoryContextSwitchTo(localContext);
finalizedPlacementList = FinalizedShardPlacementList(shardConnections->shardId);
finalizedPlacementList = MasterShardPlacementList(shardId);
MemoryContextSwitchTo(oldContext);
@ -791,17 +1024,20 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
char *nodeName = placement->nodeName;
int nodePort = placement->nodePort;
char *nodeUser = CurrentUserName();
PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser);
TransactionConnection *transactionConnection = NULL;
StringInfo copyCommand = NULL;
PGresult *result = NULL;
PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser);
/* release failed placement list and copy command at the end of this function */
oldContext = MemoryContextSwitchTo(localContext);
if (connection == NULL)
{
if (stopOnFailure)
{
ereport(ERROR, (errmsg("could not open connection to %s:%d",
nodeName, nodePort)));
}
failedPlacementList = lappend(failedPlacementList, placement);
continue;
}
@ -811,9 +1047,12 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
{
ReportRemoteError(connection, result);
failedPlacementList = lappend(failedPlacementList, placement);
PQclear(result);
continue;
}
PQclear(result);
copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId);
result = PQexec(connection, copyCommand->data);
@ -821,11 +1060,12 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
{
ReportRemoteError(connection, result);
failedPlacementList = lappend(failedPlacementList, placement);
PQclear(result);
continue;
}
/* preserve transaction connection in regular memory context */
MemoryContextSwitchTo(oldContext);
PQclear(result);
transactionConnection = palloc0(sizeof(TransactionConnection));
@ -842,6 +1082,12 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
ereport(ERROR, (errmsg("could not find any active placements")));
}
/*
* If stopOnFailure is true, we just error out and code execution should
* never reach this point. This is the case for copy from worker nodes.
*/
Assert(!stopOnFailure || list_length(failedPlacementList) == 0);
/* otherwise, mark failed placements as inactive: they're stale */
foreach(failedPlacementCell, failedPlacementList)
{
@ -860,6 +1106,71 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
}
/*
* MasterShardPlacementList dispatches the finalized shard placements call to
* the local or remote master node according to the master connection state.
*/
static List *
MasterShardPlacementList(uint64 shardId)
{
List *finalizedPlacementList = NIL;
if (masterConnection == NULL)
{
finalizedPlacementList = FinalizedShardPlacementList(shardId);
}
else
{
finalizedPlacementList = RemoteFinalizedShardPlacementList(shardId);
}
return finalizedPlacementList;
}
/*
* RemoteFinalizedShardPlacementList gets the finalized shard placement list
* for the given shard id from the remote master node.
*/
static List *
RemoteFinalizedShardPlacementList(uint64 shardId)
{
List *finalizedPlacementList = NIL;
PGresult *queryResult = NULL;
StringInfo shardPlacementsCommand = makeStringInfo();
appendStringInfo(shardPlacementsCommand, FINALIZED_SHARD_PLACEMENTS_QUERY, shardId);
queryResult = PQexec(masterConnection, shardPlacementsCommand->data);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{
int rowCount = PQntuples(queryResult);
int rowIndex = 0;
for (rowIndex = 0; rowIndex < rowCount; rowIndex++)
{
char *nodeName = PQgetvalue(queryResult, rowIndex, 0);
char *nodePortString = PQgetvalue(queryResult, rowIndex, 1);
uint32 nodePort = atoi(nodePortString);
ShardPlacement *shardPlacement =
(ShardPlacement *) palloc0(sizeof(ShardPlacement));
shardPlacement->nodeName = nodeName;
shardPlacement->nodePort = nodePort;
finalizedPlacementList = lappend(finalizedPlacementList, shardPlacement);
}
}
else
{
ereport(ERROR, (errmsg("could not get shard placements from the master node")));
}
return finalizedPlacementList;
}
/* Send copy binary headers to given connections */
static void
SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList)
@ -1259,23 +1570,93 @@ AppendCopyBinaryFooters(CopyOutState footerOutputState)
* opens connections to shard placements.
*/
static void
StartCopyToNewShard(ShardConnections *shardConnections, Oid relationId,
CopyStmt *copyStatement)
StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement)
{
char *relationName = get_rel_name(relationId);
text *relationNameText = cstring_to_text(relationName);
Datum relationNameDatum = PointerGetDatum(relationNameText);
Datum shardIdDatum = DirectFunctionCall1(master_create_empty_shard,
relationNameDatum);
char *relationName = copyStatement->relation->relname;
char *schemaName = copyStatement->relation->schemaname;
char *qualifiedName = quote_qualified_identifier(schemaName, relationName);
int64 shardId = MasterCreateEmptyShard(qualifiedName);
int64 shardId = DatumGetInt64(shardIdDatum);
shardConnections->shardId = shardId;
list_free_deep(shardConnections->connectionList);
shardConnections->connectionList = NIL;
/* connect to shards placements and start transactions */
OpenCopyTransactions(copyStatement, shardConnections);
OpenCopyTransactions(copyStatement, shardConnections, true);
}
/*
* MasterCreateEmptyShard dispatches the create empty shard call to the local
* or remote master node according to the master connection state.
*/
static int64
MasterCreateEmptyShard(char *relationName)
{
int64 shardId = 0;
if (masterConnection == NULL)
{
shardId = CreateEmptyShard(relationName);
}
else
{
shardId = RemoteCreateEmptyShard(relationName);
}
return shardId;
}
/*
* CreateEmptyShard creates a new shard and related shard placements from the
* local master node.
*/
static int64
CreateEmptyShard(char *relationName)
{
int64 shardId = 0;
text *relationNameText = cstring_to_text(relationName);
Datum relationNameDatum = PointerGetDatum(relationNameText);
Datum shardIdDatum = DirectFunctionCall1(master_create_empty_shard,
relationNameDatum);
shardId = DatumGetInt64(shardIdDatum);
return shardId;
}
/*
* RemoteCreateEmptyShard creates a new shard and related shard placements from
* the remote master node.
*/
static int64
RemoteCreateEmptyShard(char *relationName)
{
int64 shardId = 0;
PGresult *queryResult = NULL;
StringInfo createEmptyShardCommand = makeStringInfo();
appendStringInfo(createEmptyShardCommand, CREATE_EMPTY_SHARD_QUERY, relationName);
queryResult = PQexec(masterConnection, createEmptyShardCommand->data);
if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
{
char *shardIdString = PQgetvalue((PGresult *) queryResult, 0, 0);
char *shardIdStringEnd = NULL;
shardId = strtoul(shardIdString, &shardIdStringEnd, 0);
}
else
{
ReportRemoteError(masterConnection, queryResult);
ereport(ERROR, (errmsg("could not create a new empty shard on the remote node")));
}
PQclear(queryResult);
return shardId;
}
@ -1295,6 +1676,46 @@ FinalizeCopyToNewShard(ShardConnections *shardConnections)
}
/*
* MasterUpdateShardStatistics dispatches the update shard statistics call to
* the local or remote master node according to the master connection state.
*/
static void
MasterUpdateShardStatistics(uint64 shardId)
{
if (masterConnection == NULL)
{
UpdateShardStatistics(shardId);
}
else
{
RemoteUpdateShardStatistics(shardId);
}
}
/*
* RemoteUpdateShardStatistics updates shard statistics on the remote master node.
*/
static void
RemoteUpdateShardStatistics(uint64 shardId)
{
PGresult *queryResult = NULL;
StringInfo updateShardStatisticsCommand = makeStringInfo();
appendStringInfo(updateShardStatisticsCommand, UPDATE_SHARD_STATISTICS_QUERY,
shardId);
queryResult = PQexec(masterConnection, updateShardStatisticsCommand->data);
if (PQresultStatus(queryResult) != PGRES_TUPLES_OK)
{
ereport(ERROR, (errmsg("could not update shard statistics")));
}
PQclear(queryResult);
}
/* *INDENT-OFF* */
/* Append data to the copy buffer in outputState */
static void

View File

@ -11,6 +11,7 @@
#include "access/htup_details.h"
#include "access/sysattr.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
@ -25,11 +26,13 @@
#include "distributed/transmit.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "foreign/foreign.h"
#include "executor/executor.h"
#include "parser/parser.h"
#include "parser/parse_utilcmd.h"
#include "storage/lmgr.h"
#include "tcop/pquery.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
@ -70,6 +73,7 @@ static void ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement
static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement);
/* Local functions forward declarations for helper functions */
static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort);
static bool IsAlterTableRenameStmt(RenameStmt *renameStatement);
static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString);
static bool ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString,
@ -311,28 +315,52 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
*/
if (copyStatement->relation != NULL)
{
Relation copiedRelation = NULL;
bool isDistributedRelation = false;
bool isFrom = copyStatement->is_from;
bool isCopyFromWorker = IsCopyFromWorker(copyStatement);
/* consider using RangeVarGetRelidExtended to check perms before locking */
copiedRelation = heap_openrv(copyStatement->relation,
isFrom ? RowExclusiveLock : AccessShareLock);
if (isCopyFromWorker)
{
RangeVar *relation = copyStatement->relation;
NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement);
char *nodeName = masterNodeAddress->nodeName;
int32 nodePort = masterNodeAddress->nodePort;
isDistributedRelation = IsDistributedTable(RelationGetRelid(copiedRelation));
CreateLocalTable(relation, nodeName, nodePort);
/* ensure future lookups hit the same relation */
copyStatement->relation->schemaname = get_namespace_name(
RelationGetNamespace(copiedRelation));
/*
* We expect copy from worker to be on a distributed table; otherwise,
* it fails in CitusCopyFrom() while checking the partition method.
*/
isDistributedRelation = true;
}
else
{
bool isFrom = copyStatement->is_from;
Relation copiedRelation = NULL;
heap_close(copiedRelation, NoLock);
/* consider using RangeVarGetRelidExtended to check perms before locking */
copiedRelation = heap_openrv(copyStatement->relation,
isFrom ? RowExclusiveLock : AccessShareLock);
isDistributedRelation = IsDistributedTable(RelationGetRelid(copiedRelation));
/* ensure future lookups hit the same relation */
copyStatement->relation->schemaname = get_namespace_name(
RelationGetNamespace(copiedRelation));
heap_close(copiedRelation, NoLock);
}
if (isDistributedRelation)
{
if (copyStatement->is_from)
{
/* check permissions, we're bypassing postgres' normal checks */
CheckCopyPermissions(copyStatement);
if (!isCopyFromWorker)
{
CheckCopyPermissions(copyStatement);
}
CitusCopyFrom(copyStatement, completionTag);
return NULL;
}
@ -834,6 +862,82 @@ ErrorIfDistributedRenameStmt(RenameStmt *renameStatement)
}
/*
* CreateLocalTable gets DDL commands from the remote node for the given
* relation. Then, it creates the local relation as a temporary, on-commit-drop
* table.
*/
static void
CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort)
{
List *ddlCommandList = NIL;
ListCell *ddlCommandCell = NULL;
char *relationName = relation->relname;
char *schemaName = relation->schemaname;
char *qualifiedName = quote_qualified_identifier(schemaName, relationName);
/* fetch the ddl commands needed to create the table */
StringInfo tableNameStringInfo = makeStringInfo();
appendStringInfoString(tableNameStringInfo, qualifiedName);
/*
* The warning message created in TableDDLCommandList() is descriptive
* enough; therefore, we just throw an error which says that we could not
* run the copy operation.
*/
ddlCommandList = TableDDLCommandList(nodeName, nodePort, tableNameStringInfo);
if (ddlCommandList == NIL)
{
ereport(ERROR, (errmsg("could not run copy from the worker node")));
}
/* apply DDL commands against the local database */
foreach(ddlCommandCell, ddlCommandList)
{
StringInfo ddlCommand = (StringInfo) lfirst(ddlCommandCell);
Node *ddlCommandNode = ParseTreeNode(ddlCommand->data);
bool applyDDLCommand = false;
if (IsA(ddlCommandNode, CreateStmt) ||
IsA(ddlCommandNode, CreateForeignTableStmt))
{
CreateStmt *createStatement = (CreateStmt *) ddlCommandNode;
/* create the local relation as temporary and on commit drop */
createStatement->relation->relpersistence = RELPERSISTENCE_TEMP;
createStatement->oncommit = ONCOMMIT_DROP;
/* temporarily strip schema name */
createStatement->relation->schemaname = NULL;
applyDDLCommand = true;
}
else if (IsA(ddlCommandNode, CreateForeignServerStmt))
{
CreateForeignServerStmt *createServerStmt =
(CreateForeignServerStmt *) ddlCommandNode;
if (GetForeignServerByName(createServerStmt->servername, true) == NULL)
{
/* create server if not exists */
applyDDLCommand = true;
}
}
else if ((IsA(ddlCommandNode, CreateExtensionStmt)))
{
applyDDLCommand = true;
}
/* run only a selected set of DDL commands */
if (applyDDLCommand)
{
ProcessUtility(ddlCommandNode, CreateCommandTag(ddlCommandNode),
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
CommandCounterIncrement();
}
}
}
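
For the customer table used in the regression tests below, the statement that CreateLocalTable ends up replaying on the worker is, in effect, the following (a sketch: the column list mirrors the test table defined later in this diff, and the TEMPORARY / ON COMMIT DROP parts reflect the relpersistence and oncommit overrides above):

CREATE TEMPORARY TABLE customer_worker_copy_append (
    c_custkey integer,
    c_name varchar(25) not null,
    c_address varchar(40),
    c_nationkey integer,
    c_phone char(15),
    c_acctbal decimal(15,2),
    c_mktsegment char(10),
    c_comment varchar(117)
) ON COMMIT DROP;

Only CREATE TABLE, CREATE FOREIGN TABLE, CREATE SERVER, and CREATE EXTENSION statements from the fetched DDL list are applied, so index and constraint commands such as the CREATE INDEX on c_name are not replayed locally.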
/*
* IsAlterTableRenameStmt returns true if the passed in RenameStmt operates on a
* distributed table or its objects. This includes:

View File

@ -54,6 +54,7 @@ static StringInfo WorkerPartitionValue(char *nodeName, uint32 nodePort, Oid rela
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_empty_shard);
PG_FUNCTION_INFO_V1(master_append_table_to_shard);
PG_FUNCTION_INFO_V1(master_update_shard_statistics);
/*
@ -269,7 +270,7 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
RESUME_INTERRUPTS();
/* update shard statistics and get new shard size */
newShardSize = UpdateShardStatistics(relationId, shardId);
newShardSize = UpdateShardStatistics(shardId);
/* calculate ratio of current shard size compared to shard max size */
shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
@ -279,6 +280,22 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
}
/*
* master_update_shard_statistics updates metadata (shard size and shard min/max
* values) of the given shard and returns the updated shard size.
*/
Datum
master_update_shard_statistics(PG_FUNCTION_ARGS)
{
int64 shardId = PG_GETARG_INT64(0);
uint64 shardSize = 0;
shardSize = UpdateShardStatistics(shardId);
PG_RETURN_INT64(shardSize);
}
/*
* CheckDistributedTable checks if the given relationId corresponds to a
* distributed table. If it does not, the function errors out.
@ -401,15 +418,16 @@ WorkerCreateShard(char *nodeName, uint32 nodePort, uint64 shardId,
/*
* UpdateShardStatistics updates metadata for the given shard id and returns
* the new shard size.
* UpdateShardStatistics updates metadata (shard size and shard min/max values)
* of the given shard and returns the updated shard size.
*/
uint64
UpdateShardStatistics(Oid relationId, int64 shardId)
UpdateShardStatistics(int64 shardId)
{
ShardInterval *shardInterval = LoadShardInterval(shardId);
Oid relationId = shardInterval->relationId;
char storageType = shardInterval->storageType;
char *shardName = NULL;
char *shardQualifiedName = NULL;
List *shardPlacementList = NIL;
ListCell *shardPlacementCell = NULL;
bool statsOK = false;
@ -418,11 +436,17 @@ UpdateShardStatistics(Oid relationId, int64 shardId)
text *maxValue = NULL;
/* if shard doesn't have an alias, extend regular table name */
shardName = LoadShardAlias(relationId, shardId);
if (shardName == NULL)
shardQualifiedName = LoadShardAlias(relationId, shardId);
if (shardQualifiedName == NULL)
{
shardName = get_rel_name(relationId);
AppendShardIdToName(&shardName, shardId);
char *relationName = get_rel_name(relationId);
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
shardQualifiedName = quote_qualified_identifier(schemaName, relationName);
AppendShardIdToName(&shardQualifiedName, shardId);
}
shardPlacementList = FinalizedShardPlacementList(shardId);
@ -434,7 +458,7 @@ UpdateShardStatistics(Oid relationId, int64 shardId)
char *workerName = placement->nodeName;
uint32 workerPort = placement->nodePort;
statsOK = WorkerShardStats(workerName, workerPort, relationId, shardName,
statsOK = WorkerShardStats(workerName, workerPort, relationId, shardQualifiedName,
&shardSize, &minValue, &maxValue);
if (statsOK)
{
@ -451,7 +475,8 @@ UpdateShardStatistics(Oid relationId, int64 shardId)
*/
if (!statsOK)
{
ereport(WARNING, (errmsg("could not get statistics for shard %s", shardName),
ereport(WARNING, (errmsg("could not get statistics for shard %s",
shardQualifiedName),
errdetail("Setting shard statistics to NULL")));
}

View File

@ -63,8 +63,6 @@ static bool FetchForeignTable(const char *nodeName, uint32 nodePort,
StringInfo tableName);
static const char * RemoteTableOwner(const char *nodeName, uint32 nodePort,
StringInfo tableName);
static List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
StringInfo tableName);
static StringInfo ForeignFilePath(const char *nodeName, uint32 nodePort,
StringInfo tableName);
static bool check_log_statement(List *stmt_list);
@ -870,7 +868,7 @@ RemoteTableOwner(const char *nodeName, uint32 nodePort, StringInfo tableName)
* DDL commands used in creating the table. If an error occurs during fetching,
* the function returns an empty list.
*/
static List *
List *
TableDDLCommandList(const char *nodeName, uint32 nodePort, StringInfo tableName)
{
List *ddlCommandList = NIL;

View File

@ -60,6 +60,12 @@
#define DROP_REGULAR_TABLE_COMMAND "DROP TABLE IF EXISTS %s"
#define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s"
#define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s"
#define CREATE_EMPTY_SHARD_QUERY "SELECT master_create_empty_shard('%s')"
#define FINALIZED_SHARD_PLACEMENTS_QUERY \
"SELECT nodename, nodeport FROM pg_dist_shard_placement WHERE shardstate = 1 AND shardid = %ld"
#define UPDATE_SHARD_STATISTICS_QUERY \
"SELECT master_update_shard_statistics(%ld)"
#define PARTITION_METHOD_QUERY "SELECT part_method FROM master_get_table_metadata('%s');"
/* Enumeration that defines the shard placement policy to use while staging */
@ -86,7 +92,7 @@ extern void CreateShardPlacements(int64 shardId, List *ddlEventList,
char *newPlacementOwner,
List *workerNodeList, int workerStartIndex,
int replicationFactor);
extern uint64 UpdateShardStatistics(Oid relationId, int64 shardId);
extern uint64 UpdateShardStatistics(int64 shardId);
/* Function declarations for generating metadata for shard creation */
extern Datum master_get_table_metadata(PG_FUNCTION_ARGS);
@ -99,6 +105,7 @@ extern Datum master_get_active_worker_nodes(PG_FUNCTION_ARGS);
/* Function declarations to help with data staging and deletion */
extern Datum master_create_empty_shard(PG_FUNCTION_ARGS);
extern Datum master_append_table_to_shard(PG_FUNCTION_ARGS);
extern Datum master_update_shard_statistics(PG_FUNCTION_ARGS);
extern Datum master_apply_delete_command(PG_FUNCTION_ARGS);
extern Datum master_drop_all_shards(PG_FUNCTION_ARGS);
@ -108,4 +115,5 @@ extern Datum master_create_worker_shards(PG_FUNCTION_ARGS);
/* function declarations for shard repair functionality */
extern Datum master_copy_shard_placement(PG_FUNCTION_ARGS);
#endif /* MASTER_PROTOCOL_H */

View File

@ -41,6 +41,13 @@ typedef struct CopyOutStateData
typedef struct CopyOutStateData *CopyOutState;
/* struct type to keep both hostname and port */
typedef struct NodeAddress
{
char *nodeName;
int32 nodePort;
} NodeAddress;
/* function declarations for copying into a distributed table */
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
@ -51,6 +58,8 @@ extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray,
extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState);
extern void AppendCopyBinaryFooters(CopyOutState footerOutputState);
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
extern bool IsCopyFromWorker(CopyStmt *copyStatement);
extern NodeAddress * MasterNodeAddress(CopyStmt *copyStatement);
#endif /* MULTI_COPY_H */

View File

@ -119,6 +119,8 @@ extern void RemoveJobSchema(StringInfo schemaName);
extern Datum * DeconstructArrayObject(ArrayType *arrayObject);
extern int32 ArrayObjectCount(ArrayType *arrayObject);
extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedureId);
extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
StringInfo tableName);
/* Function declarations shared with the master planner */
extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId);

View File

@ -11,6 +11,7 @@ DROP EXTENSION citus;
-- Create extension in oldest version, test every upgrade step
CREATE EXTENSION citus VERSION '5.0';
ALTER EXTENSION citus UPDATE TO '5.0-1';
ALTER EXTENSION citus UPDATE TO '5.0-2';
-- drop extension and re-create in newest version
DROP EXTENSION citus;
\c

View File

@ -254,3 +254,63 @@ SET citus.shard_max_size TO '256kB';
COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|';
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;
-- Create customer table for the worker copy with constraint and index
CREATE TABLE customer_worker_copy_append (
c_custkey integer ,
c_name varchar(25) not null,
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117),
primary key (c_custkey));
CREATE INDEX ON customer_worker_copy_append (c_name);
SELECT master_create_distributed_table('customer_worker_copy_append', 'c_custkey', 'append');
-- Connect to the first worker node
\c - - - 57637
-- Test copy from the worker node
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Test that the worker copy errors out when there is no relation to copy data into
COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Connect back to the master node
\c - - - 57636
-- Test the content of the table
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM customer_worker_copy_append;
-- Test schema support on append partitioned tables
CREATE SCHEMA append;
CREATE TABLE append.customer_copy (
c_custkey integer ,
c_name varchar(25) not null,
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117));
SELECT master_create_distributed_table('append.customer_copy', 'c_custkey', 'append');
-- Test copy from the master node
COPY append.customer_copy FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|');
-- Test copy from the worker node
\c - - - 57637
COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Connect back to the master node
\c - - - 57636
-- Test the content of the table
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM append.customer_copy;

View File

@ -304,3 +304,76 @@ SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::
5
(1 row)
-- Create customer table for the worker copy with constraint and index
CREATE TABLE customer_worker_copy_append (
c_custkey integer ,
c_name varchar(25) not null,
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117),
primary key (c_custkey));
CREATE INDEX ON customer_worker_copy_append (c_name);
SELECT master_create_distributed_table('customer_worker_copy_append', 'c_custkey', 'append');
WARNING: table "customer_worker_copy_append" has a unique constraint
DETAIL: Unique constraints and primary keys on append-partitioned tables cannot be enforced.
HINT: Consider using hash partitioning.
master_create_distributed_table
---------------------------------
(1 row)
-- Connect to the first worker node
\c - - - 57637
-- Test copy from the worker node
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Test that the worker copy errors out when there is no relation to copy data into
COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
WARNING: could not receive query results from localhost:57636
DETAIL: Client error: relation "lineitem_copy_none" does not exist
ERROR: could not run copy from the worker node
-- Connect back to the master node
\c - - - 57636
-- Test the content of the table
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM customer_worker_copy_append;
min | max | avg | count
-----+------+-----------------------+-------
1 | 7000 | 4443.8028800000000000 | 2000
(1 row)
-- Test schema support on append partitioned tables
CREATE SCHEMA append;
NOTICE: Citus partially supports CREATE SCHEMA for distributed databases
DETAIL: schema usage in joins and in some UDFs provided by Citus are not supported yet
CREATE TABLE append.customer_copy (
c_custkey integer ,
c_name varchar(25) not null,
c_address varchar(40),
c_nationkey integer,
c_phone char(15),
c_acctbal decimal(15,2),
c_mktsegment char(10),
c_comment varchar(117));
SELECT master_create_distributed_table('append.customer_copy', 'c_custkey', 'append');
master_create_distributed_table
---------------------------------
(1 row)
-- Test copy from the master node
COPY append.customer_copy FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|');
-- Test copy from the worker node
\c - - - 57637
COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Connect back to the master node
\c - - - 57636
-- Test the content of the table
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM append.customer_copy;
min | max | avg | count
-----+------+-----------------------+-------
1 | 7000 | 4443.8028800000000000 | 2000
(1 row)

View File

@ -13,6 +13,7 @@ DROP EXTENSION citus;
-- Create extension in oldest version, test every upgrade step
CREATE EXTENSION citus VERSION '5.0';
ALTER EXTENSION citus UPDATE TO '5.0-1';
ALTER EXTENSION citus UPDATE TO '5.0-2';
-- drop extension and re-create in newest version
DROP EXTENSION citus;