mirror of https://github.com/citusdata/citus.git
Add COPY support on worker nodes for append partitioned relations
Now we can copy to an append-partitioned distributed relation from any worker node by providing master options, such as: COPY relation_name FROM file_path WITH (delimiter '|', master_host 'localhost', master_port 5432); where master_port is optional and defaults to 5432.
parent a94671138d
commit fb6b6daf9d
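As a usage illustration (the table and file names below are hypothetical; only the option names and the default port come from this commit), a COPY started on a worker node looks like a regular COPY plus the master options:

    -- run on any worker node; customers_append stands in for an append-partitioned table
    COPY customers_append FROM '/tmp/customers.data'
        WITH (delimiter '|', master_host 'localhost', master_port 5432);

    -- master_port may be omitted and defaults to 5432
    COPY customers_append FROM '/tmp/customers.data'
        WITH (delimiter '|', master_host 'localhost');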
@@ -5,7 +5,7 @@ citus_top_builddir = ../../..
 MODULE_big = citus
 EXTENSION = citus
-EXTVERSIONS = 5.0 5.0-1
+EXTVERSIONS = 5.0 5.0-1 5.0-2
 # All citus--*.sql files in the source directory
 DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
 # Generated files for each version
@@ -29,6 +29,8 @@ $(EXTENSION)--5.0.sql: $(EXTENSION).sql
 	cat $^ > $@
 $(EXTENSION)--5.0-1.sql: $(EXTENSION)--5.0.sql $(EXTENSION)--5.0--5.0-1.sql
 	cat $^ > $@
+$(EXTENSION)--5.0-2.sql: $(EXTENSION)--5.0.sql $(EXTENSION)--5.0--5.0-1.sql $(EXTENSION)--5.0-1--5.0-2.sql
+	cat $^ > $@
 
 NO_PGXS = 1
@@ -0,0 +1,8 @@
+/* citus--5.0-1--5.0-2.sql */
+
+CREATE FUNCTION master_update_shard_statistics(shard_id bigint)
+    RETURNS bigint
+    LANGUAGE C STRICT
+    AS 'MODULE_PATHNAME', $$master_update_shard_statistics$$;
+COMMENT ON FUNCTION master_update_shard_statistics(bigint)
+    IS 'updates shard statistics and returns the updated shard size';
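After upgrading the extension, the new function can also be called directly; the shard id below is hypothetical:

    -- refreshes the shard's metadata and returns the updated shard size
    SELECT master_update_shard_statistics(102010);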
@@ -1,6 +1,6 @@
 # Citus extension
 comment = 'Citus distributed database'
-default_version = '5.0-1'
+default_version = '5.0-2'
 module_pathname = '$libdir/citus'
 relocatable = false
 schema = pg_catalog
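Existing installations pick up the new catalog version through the usual extension upgrade path, which the regression test further down also exercises:

    ALTER EXTENSION citus UPDATE TO '5.0-2';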
@@ -4,22 +4,25 @@
 * This file contains implementation of COPY utility for distributed
 * tables.
 *
- * The CitusCopyFrom function should be called from the utility hook to
- * process COPY ... FROM commands on distributed tables. CitusCopyFrom
- * parses the input from stdin, a program executed on the master, or a file
- * on the master, and decides to copy new rows to existing shards or new shards
- * based on the partition method of the distributed table.
+ * The CitusCopyFrom function should be called from the utility hook to process
+ * COPY ... FROM commands on distributed tables. CitusCopyFrom parses the input
+ * from stdin, a program, or a file, and decides to copy new rows to existing
+ * shards or new shards based on the partition method of the distributed table.
+ * If copy is run on a worker node, CitusCopyFrom calls CopyFromWorkerNode, which
+ * parses the master node copy options and handles communication with the master
+ * node.
 *
 * It opens a new connection for every shard placement and uses the PQputCopyData
 * function to copy the data. Because PQputCopyData transmits data, asynchronously,
 * the workers will ingest data at least partially in parallel.
 *
- * When failing to connect to a worker, the master marks the placement for
- * which it was trying to open a connection as inactive, similar to the way
- * DML statements are handled. If a failure occurs after connecting, the
- * transaction is rolled back on all the workers. Note that, if the underlying
- * table is append-partitioned, metadata changes are rolled back on the master
- * node, but shard placements are left on the workers.
+ * For hash-partitioned tables, if it fails to connect to a worker, the master
+ * marks the placement for which it was trying to open a connection as inactive,
+ * similar to the way DML statements are handled. If a failure occurs after
+ * connecting, the transaction is rolled back on all the workers. Note that,
+ * in the case of append-partitioned tables, if a failure occurs, metadata
+ * changes are immediately rolled back on the master node, but shard placements
+ * are left on the worker nodes.
 *
 * By default, COPY uses normal transactions on the workers. In the case of
 * hash or range-partitioned tables, this can cause a problem when some of the
@@ -30,9 +33,9 @@
 * shards are created and in the case of failure, metadata changes are rolled
 * back on the master node.
 *
- * Parsing options are processed and enforced on the master, while
- * constraints are enforced on the worker. In either case, failure causes
- * the whole COPY to roll back.
+ * Parsing options are processed and enforced on the node where the copy
+ * command is run, while constraints are enforced on the worker. In either
+ * case, failure causes the whole COPY to roll back.
 *
 * Copyright (c) 2016, Citus Data, Inc.
 *
@@ -128,6 +131,9 @@
 /* constant used in binary protocol */
 static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
+/* use a global connection to the master node in order to skip passing it around */
+static PGconn *masterConnection = NULL;
+
 
 /* ShardConnections represents a set of connections for each placement of a shard */
 typedef struct ShardConnections
@@ -138,8 +144,11 @@ typedef struct ShardConnections
 
 
 /* Local functions forward declarations */
+static void CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag);
 static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag);
-static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag);
+static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId);
+static char MasterPartitionMethod(RangeVar *relation);
+static void RemoveMasterOptions(CopyStmt *copyStatement);
 static void LockAllShards(List *shardIntervalList);
 static HTAB * CreateShardConnectionHash(void);
 static int CompareShardIntervalsById(const void *leftElement, const void *rightElement);
@@ -147,7 +156,9 @@ static ShardConnections * GetShardConnections(HTAB *shardConnectionHash,
                                               int64 shardId,
                                               bool *shardConnectionsFound);
 static void OpenCopyTransactions(CopyStmt *copyStatement,
-                                 ShardConnections *shardConnections);
+                                 ShardConnections *shardConnections, bool stopOnFailure);
+static List * MasterShardPlacementList(uint64 shardId);
+static List * RemoteFinalizedShardPlacementList(uint64 shardId);
 static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList);
 static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList);
 static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
@@ -159,8 +170,13 @@ static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
 static void ReportCopyError(PGconn *connection, PGresult *result);
 static uint32 AvailableColumnCount(TupleDesc tupleDescriptor);
 static void StartCopyToNewShard(ShardConnections *shardConnections,
-                                Oid relationId, CopyStmt *copyStatement);
+                                CopyStmt *copyStatement);
+static int64 MasterCreateEmptyShard(char *relationName);
+static int64 CreateEmptyShard(char *relationName);
+static int64 RemoteCreateEmptyShard(char *relationName);
 static void FinalizeCopyToNewShard(ShardConnections *shardConnections);
+static void MasterUpdateShardStatistics(uint64 shardId);
+static void RemoteUpdateShardStatistics(uint64 shardId);
 
 /* Private functions copied and adapted from copy.c in PostgreSQL */
 static void CopySendData(CopyOutState outputState, const void *databuf, int datasize);
@@ -174,14 +190,13 @@ static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *
 
 
 /*
 * CitusCopyFrom implements the COPY table_name FROM. It dispatches the copy
- * statement to related subfunctions based on the partition method of the
- * distributed table.
+ * statement to related subfunctions based on where the copy command is run
+ * and the partition method of the distributed table.
 */
 void
 CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
 {
-	Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
-	char partitionMethod = '\0';
+	bool isCopyFromWorker = false;
 
 	/* disallow COPY to/from file or program except for superusers */
 	if (copyStatement->filename != NULL && !superuser())
@@ -204,20 +219,129 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
 		}
 	}
 
-	partitionMethod = PartitionMethod(tableId);
-	if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE)
-	{
-		CopyToExistingShards(copyStatement, completionTag);
-	}
-	else if (partitionMethod == DISTRIBUTE_BY_APPEND)
-	{
-		CopyToNewShards(copyStatement, completionTag);
-	}
-	else
-	{
-		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-						errmsg("unsupported partition method")));
-	}
+	isCopyFromWorker = IsCopyFromWorker(copyStatement);
+	if (isCopyFromWorker)
+	{
+		CopyFromWorkerNode(copyStatement, completionTag);
+	}
+	else
+	{
+		Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
+		char partitionMethod = PartitionMethod(relationId);
+
+		if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod ==
+			DISTRIBUTE_BY_RANGE)
+		{
+			CopyToExistingShards(copyStatement, completionTag);
+		}
+		else if (partitionMethod == DISTRIBUTE_BY_APPEND)
+		{
+			CopyToNewShards(copyStatement, completionTag, relationId);
+		}
+		else
+		{
+			ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+							errmsg("unsupported partition method")));
+		}
+	}
 }
 
 
+/*
+ * IsCopyFromWorker checks if the given copy statement has the master host option.
+ */
+bool
+IsCopyFromWorker(CopyStmt *copyStatement)
+{
+	ListCell *optionCell = NULL;
+	foreach(optionCell, copyStatement->options)
+	{
+		DefElem *defel = (DefElem *) lfirst(optionCell);
+		if (strncmp(defel->defname, "master_host", NAMEDATALEN) == 0)
+		{
+			return true;
+		}
+	}
+
+	return false;
+}
+
+
+/*
+ * CopyFromWorkerNode implements the COPY table_name FROM ... from worker nodes
+ * for append-partitioned tables.
+ */
+static void
+CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
+{
+	NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement);
+	char *nodeName = masterNodeAddress->nodeName;
+	int32 nodePort = masterNodeAddress->nodePort;
+	char *nodeUser = CurrentUserName();
+
+	masterConnection = ConnectToNode(nodeName, nodePort, nodeUser);
+
+	PG_TRY();
+	{
+		PGresult *queryResult = NULL;
+		Oid relationId = InvalidOid;
+		char partitionMethod = 0;
+
+		/* strip schema name for local reference */
+		char *schemaName = copyStatement->relation->schemaname;
+		copyStatement->relation->schemaname = NULL;
+
+		relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
+
+		/* put schema name back */
+		copyStatement->relation->schemaname = schemaName;
+
+		partitionMethod = MasterPartitionMethod(copyStatement->relation);
+		if (partitionMethod != DISTRIBUTE_BY_APPEND)
+		{
+			ereport(ERROR, (errmsg("copy from worker nodes is only supported "
+								   "for append-partitioned tables")));
+		}
+
+		/* run all metadata commands in a transaction */
+		queryResult = PQexec(masterConnection, "BEGIN");
+		if (PQresultStatus(queryResult) != PGRES_COMMAND_OK)
+		{
+			ereport(ERROR, (errmsg("could not start to update master node metadata")));
+		}
+
+		PQclear(queryResult);
+
+		/*
+		 * Remove master node options from the copy statement because they are not
+		 * recognized by PostgreSQL machinery.
+		 */
+		RemoveMasterOptions(copyStatement);
+
+		CopyToNewShards(copyStatement, completionTag, relationId);
+
+		/* commit metadata transactions */
+		queryResult = PQexec(masterConnection, "COMMIT");
+		if (PQresultStatus(queryResult) != PGRES_COMMAND_OK)
+		{
+			ereport(ERROR, (errmsg("could not commit master node metadata changes")));
+		}
+
+		PQclear(queryResult);
+
+		/* close the connection */
+		PQfinish(masterConnection);
+		masterConnection = NULL;
+	}
+	PG_CATCH();
+	{
+		/* close the connection */
+		PQfinish(masterConnection);
+		masterConnection = NULL;
+
+		PG_RE_THROW();
+	}
+	PG_END_TRY();
+}
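Taken together, a worker-initiated COPY drives roughly this statement sequence over its connection to the master; this is a sketch assembled from the query constants added elsewhere in this commit, with a hypothetical table name and shard id:

    SELECT part_method FROM master_get_table_metadata('customers_append');  -- must be an append-partitioned table
    BEGIN;                                                   -- keep master metadata changes transactional
    SELECT master_create_empty_shard('customers_append');    -- returns a new shard id, e.g. 102010
    SELECT nodename, nodeport FROM pg_dist_shard_placement
        WHERE shardstate = 1 AND shardid = 102010;           -- placements that receive the COPY stream
    SELECT master_update_shard_statistics(102010);           -- record shard size and min/max after the copy
    COMMIT;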
@@ -420,7 +544,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
 		if (!shardConnectionsFound)
 		{
 			/* open connections and initiate COPY on shard placements */
-			OpenCopyTransactions(copyStatement, shardConnections);
+			OpenCopyTransactions(copyStatement, shardConnections, false);
 
 			/* send copy binary headers to shard placements */
 			SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList);
@@ -492,9 +616,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
 * tables where we create new shards into which to copy rows.
 */
static void
-CopyToNewShards(CopyStmt *copyStatement, char *completionTag)
+CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
 {
-	Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
 	FmgrInfo *columnOutputFunctions = NULL;
 
 	/* allocate column values and nulls arrays */
@@ -562,7 +685,9 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
 
 		if (!nextRowFound)
 		{
+			/* switch to regular memory context and stop showing line number in errors */
 			MemoryContextSwitchTo(oldContext);
+			error_context_stack = errorCallback.previous;
 			break;
 		}
 
|
@ -581,7 +706,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag)
|
||||||
if (copiedDataSizeInBytes == 0)
|
if (copiedDataSizeInBytes == 0)
|
||||||
{
|
{
|
||||||
/* create shard and open connections to shard placements */
|
/* create shard and open connections to shard placements */
|
||||||
StartCopyToNewShard(shardConnections, relationId, copyStatement);
|
StartCopyToNewShard(shardConnections, copyStatement);
|
||||||
|
|
||||||
/* send copy binary headers to shard placements */
|
/* send copy binary headers to shard placements */
|
||||||
SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList);
|
SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList);
|
||||||
|
@@ -606,7 +731,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
 		{
 			SendCopyBinaryFooters(copyOutState, shardConnections->connectionList);
 			FinalizeCopyToNewShard(shardConnections);
-			UpdateShardStatistics(relationId, shardConnections->shardId);
+			MasterUpdateShardStatistics(shardConnections->shardId);
 
 			copiedDataSizeInBytes = 0;
 		}
@@ -624,7 +749,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
 	{
 		SendCopyBinaryFooters(copyOutState, shardConnections->connectionList);
 		FinalizeCopyToNewShard(shardConnections);
-		UpdateShardStatistics(relationId, shardConnections->shardId);
+		MasterUpdateShardStatistics(shardConnections->shardId);
 	}
 
 	EndCopyFrom(copyState);
@@ -652,6 +777,112 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
 }
 
 
+/*
+ * MasterNodeAddress gets the master node address from copy options and returns
+ * it. Note that if the master_port is not provided, we use 5432 as the default
+ * port.
+ */
+NodeAddress *
+MasterNodeAddress(CopyStmt *copyStatement)
+{
+	NodeAddress *masterNodeAddress = (NodeAddress *) palloc0(sizeof(NodeAddress));
+	char *nodeName = NULL;
+
+	/* set default port to 5432 */
+	int32 nodePort = 5432;
+
+	ListCell *optionCell = NULL;
+	foreach(optionCell, copyStatement->options)
+	{
+		DefElem *defel = (DefElem *) lfirst(optionCell);
+		if (strncmp(defel->defname, "master_host", NAMEDATALEN) == 0)
+		{
+			nodeName = defGetString(defel);
+		}
+		else if (strncmp(defel->defname, "master_port", NAMEDATALEN) == 0)
+		{
+			nodePort = defGetInt32(defel);
+		}
+	}
+
+	masterNodeAddress->nodeName = nodeName;
+	masterNodeAddress->nodePort = nodePort;
+
+	return masterNodeAddress;
+}
+
+
+/*
+ * MasterPartitionMethod gets the partition method of the given relation from
+ * the master node and returns it.
+ */
+static char
+MasterPartitionMethod(RangeVar *relation)
+{
+	char partitionMethod = '\0';
+	PGresult *queryResult = NULL;
+
+	char *relationName = relation->relname;
+	char *schemaName = relation->schemaname;
+	char *qualifiedName = quote_qualified_identifier(schemaName, relationName);
+
+	StringInfo partitionMethodCommand = makeStringInfo();
+	appendStringInfo(partitionMethodCommand, PARTITION_METHOD_QUERY, qualifiedName);
+
+	queryResult = PQexec(masterConnection, partitionMethodCommand->data);
+	if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
+	{
+		char *partitionMethodString = PQgetvalue((PGresult *) queryResult, 0, 0);
+		if (partitionMethodString == NULL || (*partitionMethodString) == '\0')
+		{
+			ereport(ERROR, (errmsg("could not find a partition method for the "
+								   "table %s", relationName)));
+		}
+
+		partitionMethod = partitionMethodString[0];
+	}
+	else
+	{
+		ReportRemoteError(masterConnection, queryResult);
+		ereport(ERROR, (errmsg("could not get the partition method of the "
+							   "distributed table")));
+	}
+
+	PQclear(queryResult);
+
+	return partitionMethod;
+}
+
+
+/*
+ * RemoveMasterOptions removes master node related copy options from the option
+ * list of the copy statement.
+ */
+static void
+RemoveMasterOptions(CopyStmt *copyStatement)
+{
+	List *newOptionList = NIL;
+	ListCell *optionCell = NULL;
+
+	/* walk over the list of all options */
+	foreach(optionCell, copyStatement->options)
+	{
+		DefElem *option = (DefElem *) lfirst(optionCell);
+
+		/* skip master related options */
+		if ((strncmp(option->defname, "master_host", NAMEDATALEN) == 0) ||
+			(strncmp(option->defname, "master_port", NAMEDATALEN) == 0))
+		{
+			continue;
+		}
+
+		newOptionList = lappend(newOptionList, option);
+	}
+
+	copyStatement->options = newOptionList;
+}
+
+
 /*
 * LockAllShards takes shared locks on the metadata and the data of all shards in
 * shardIntervalList. This prevents concurrent placement changes and concurrent
@@ -764,13 +995,15 @@ GetShardConnections(HTAB *shardConnectionHash, int64 shardId,
 * shard placements.
 */
static void
-OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections)
+OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections,
+					 bool stopOnFailure)
 {
 	List *finalizedPlacementList = NIL;
 	List *failedPlacementList = NIL;
 	ListCell *placementCell = NULL;
 	ListCell *failedPlacementCell = NULL;
 	List *connectionList = NULL;
+	int64 shardId = shardConnections->shardId;
 
 	MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
 													   "OpenCopyTransactions",
@@ -781,7 +1014,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections,
 	/* release finalized placement list at the end of this function */
 	MemoryContext oldContext = MemoryContextSwitchTo(localContext);
 
-	finalizedPlacementList = FinalizedShardPlacementList(shardConnections->shardId);
+	finalizedPlacementList = MasterShardPlacementList(shardId);
 
 	MemoryContextSwitchTo(oldContext);
 
@@ -791,17 +1024,20 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections,
 		char *nodeName = placement->nodeName;
 		int nodePort = placement->nodePort;
 		char *nodeUser = CurrentUserName();
+		PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser);
+
 		TransactionConnection *transactionConnection = NULL;
 		StringInfo copyCommand = NULL;
 		PGresult *result = NULL;
 
-		PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser);
-
-		/* release failed placement list and copy command at the end of this function */
-		oldContext = MemoryContextSwitchTo(localContext);
-
 		if (connection == NULL)
 		{
+			if (stopOnFailure)
+			{
+				ereport(ERROR, (errmsg("could not open connection to %s:%d",
+									   nodeName, nodePort)));
+			}
+
 			failedPlacementList = lappend(failedPlacementList, placement);
 			continue;
 		}
@@ -811,9 +1047,12 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections,
 		{
 			ReportRemoteError(connection, result);
 			failedPlacementList = lappend(failedPlacementList, placement);
+
+			PQclear(result);
 			continue;
 		}
 
+		PQclear(result);
 		copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId);
 
 		result = PQexec(connection, copyCommand->data);
@@ -821,11 +1060,12 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections,
 		{
 			ReportRemoteError(connection, result);
 			failedPlacementList = lappend(failedPlacementList, placement);
+
+			PQclear(result);
 			continue;
 		}
 
-		/* preserve transaction connection in regular memory context */
-		MemoryContextSwitchTo(oldContext);
+		PQclear(result);
 
 		transactionConnection = palloc0(sizeof(TransactionConnection));
 
|
||||||
ereport(ERROR, (errmsg("could not find any active placements")));
|
ereport(ERROR, (errmsg("could not find any active placements")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If stopOnFailure is true, we just error out and code execution should
|
||||||
|
* never reach to this point. This is the case for copy from worker nodes.
|
||||||
|
*/
|
||||||
|
Assert(!stopOnFailure || list_length(failedPlacementList) == 0);
|
||||||
|
|
||||||
/* otherwise, mark failed placements as inactive: they're stale */
|
/* otherwise, mark failed placements as inactive: they're stale */
|
||||||
foreach(failedPlacementCell, failedPlacementList)
|
foreach(failedPlacementCell, failedPlacementList)
|
||||||
{
|
{
|
||||||
|
@@ -860,6 +1106,71 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections,
 }
 
 
+/*
+ * MasterShardPlacementList dispatches the finalized shard placements call
+ * between local or remote master node according to the master connection state.
+ */
+static List *
+MasterShardPlacementList(uint64 shardId)
+{
+	List *finalizedPlacementList = NIL;
+	if (masterConnection == NULL)
+	{
+		finalizedPlacementList = FinalizedShardPlacementList(shardId);
+	}
+	else
+	{
+		finalizedPlacementList = RemoteFinalizedShardPlacementList(shardId);
+	}
+
+	return finalizedPlacementList;
+}
+
+
+/*
+ * RemoteFinalizedShardPlacementList gets the finalized shard placement list
+ * for the given shard id from the remote master node.
+ */
+static List *
+RemoteFinalizedShardPlacementList(uint64 shardId)
+{
+	List *finalizedPlacementList = NIL;
+	PGresult *queryResult = NULL;
+
+	StringInfo shardPlacementsCommand = makeStringInfo();
+	appendStringInfo(shardPlacementsCommand, FINALIZED_SHARD_PLACEMENTS_QUERY, shardId);
+
+	queryResult = PQexec(masterConnection, shardPlacementsCommand->data);
+	if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
+	{
+		int rowCount = PQntuples(queryResult);
+		int rowIndex = 0;
+
+		for (rowIndex = 0; rowIndex < rowCount; rowIndex++)
+		{
+			char *nodeName = PQgetvalue(queryResult, rowIndex, 0);
+
+			char *nodePortString = PQgetvalue(queryResult, rowIndex, 1);
+			uint32 nodePort = atoi(nodePortString);
+
+			ShardPlacement *shardPlacement =
+				(ShardPlacement *) palloc0(sizeof(ShardPlacement));
+
+			shardPlacement->nodeName = nodeName;
+			shardPlacement->nodePort = nodePort;
+
+			finalizedPlacementList = lappend(finalizedPlacementList, shardPlacement);
+		}
+	}
+	else
+	{
+		ereport(ERROR, (errmsg("could not get shard placements from the master node")));
+	}
+
+	return finalizedPlacementList;
+}
+
+
 /* Send copy binary headers to given connections */
 static void
 SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList)
@@ -1259,23 +1570,93 @@ AppendCopyBinaryFooters(CopyOutState footerOutputState)
 * opens connections to shard placements.
 */
static void
-StartCopyToNewShard(ShardConnections *shardConnections, Oid relationId,
-					CopyStmt *copyStatement)
+StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement)
 {
-	char *relationName = get_rel_name(relationId);
-	text *relationNameText = cstring_to_text(relationName);
-	Datum relationNameDatum = PointerGetDatum(relationNameText);
-	Datum shardIdDatum = DirectFunctionCall1(master_create_empty_shard,
-											 relationNameDatum);
+	char *relationName = copyStatement->relation->relname;
+	char *schemaName = copyStatement->relation->schemaname;
+	char *qualifiedName = quote_qualified_identifier(schemaName, relationName);
 
-	int64 shardId = DatumGetInt64(shardIdDatum);
+	int64 shardId = MasterCreateEmptyShard(qualifiedName);
+
 	shardConnections->shardId = shardId;
 
 	list_free_deep(shardConnections->connectionList);
 	shardConnections->connectionList = NIL;
 
 	/* connect to shards placements and start transactions */
-	OpenCopyTransactions(copyStatement, shardConnections);
+	OpenCopyTransactions(copyStatement, shardConnections, true);
+}
+
+
+/*
+ * MasterCreateEmptyShard dispatches the create empty shard call between local or
+ * remote master node according to the master connection state.
+ */
+static int64
+MasterCreateEmptyShard(char *relationName)
+{
+	int64 shardId = 0;
+	if (masterConnection == NULL)
+	{
+		shardId = CreateEmptyShard(relationName);
+	}
+	else
+	{
+		shardId = RemoteCreateEmptyShard(relationName);
+	}
+
+	return shardId;
+}
+
+
+/*
+ * CreateEmptyShard creates a new shard and related shard placements from the
+ * local master node.
+ */
+static int64
+CreateEmptyShard(char *relationName)
+{
+	int64 shardId = 0;
+
+	text *relationNameText = cstring_to_text(relationName);
+	Datum relationNameDatum = PointerGetDatum(relationNameText);
+	Datum shardIdDatum = DirectFunctionCall1(master_create_empty_shard,
+											 relationNameDatum);
+	shardId = DatumGetInt64(shardIdDatum);
+
+	return shardId;
+}
+
+
+/*
+ * RemoteCreateEmptyShard creates a new shard and related shard placements from
+ * the remote master node.
+ */
+static int64
+RemoteCreateEmptyShard(char *relationName)
+{
+	int64 shardId = 0;
+	PGresult *queryResult = NULL;
+
+	StringInfo createEmptyShardCommand = makeStringInfo();
+	appendStringInfo(createEmptyShardCommand, CREATE_EMPTY_SHARD_QUERY, relationName);
+
+	queryResult = PQexec(masterConnection, createEmptyShardCommand->data);
+	if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
+	{
+		char *shardIdString = PQgetvalue((PGresult *) queryResult, 0, 0);
+		char *shardIdStringEnd = NULL;
+		shardId = strtoul(shardIdString, &shardIdStringEnd, 0);
+	}
+	else
+	{
+		ReportRemoteError(masterConnection, queryResult);
+		ereport(ERROR, (errmsg("could not create a new empty shard on the remote node")));
+	}
+
+	PQclear(queryResult);
+
+	return shardId;
 }
@@ -1295,6 +1676,46 @@ FinalizeCopyToNewShard(ShardConnections *shardConnections)
 }
 
 
+/*
+ * MasterUpdateShardStatistics dispatches the update shard statistics call
+ * between local or remote master node according to the master connection state.
+ */
+static void
+MasterUpdateShardStatistics(uint64 shardId)
+{
+	if (masterConnection == NULL)
+	{
+		UpdateShardStatistics(shardId);
+	}
+	else
+	{
+		RemoteUpdateShardStatistics(shardId);
+	}
+}
+
+
+/*
+ * RemoteUpdateShardStatistics updates shard statistics on the remote master node.
+ */
+static void
+RemoteUpdateShardStatistics(uint64 shardId)
+{
+	PGresult *queryResult = NULL;
+
+	StringInfo updateShardStatisticsCommand = makeStringInfo();
+	appendStringInfo(updateShardStatisticsCommand, UPDATE_SHARD_STATISTICS_QUERY,
+					 shardId);
+
+	queryResult = PQexec(masterConnection, updateShardStatisticsCommand->data);
+	if (PQresultStatus(queryResult) != PGRES_TUPLES_OK)
+	{
+		ereport(ERROR, (errmsg("could not update shard statistics")));
+	}
+
+	PQclear(queryResult);
+}
+
+
 /* *INDENT-OFF* */
 /* Append data to the copy buffer in outputState */
 static void
@@ -11,6 +11,7 @@
 
 #include "access/htup_details.h"
 #include "access/sysattr.h"
+#include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/index.h"
 #include "catalog/namespace.h"
@@ -25,11 +26,13 @@
 #include "distributed/transmit.h"
 #include "distributed/worker_manager.h"
 #include "distributed/worker_protocol.h"
+#include "foreign/foreign.h"
 #include "executor/executor.h"
 #include "parser/parser.h"
 #include "parser/parse_utilcmd.h"
 #include "storage/lmgr.h"
 #include "tcop/pquery.h"
+#include "tcop/utility.h"
 #include "utils/builtins.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
@@ -70,6 +73,7 @@ static void ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement
 static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement);
 
 
 /* Local functions forward declarations for helper functions */
+static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort);
 static bool IsAlterTableRenameStmt(RenameStmt *renameStatement);
 static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString);
 static bool ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString,
@@ -311,9 +315,28 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
 	 */
 	if (copyStatement->relation != NULL)
 	{
-		Relation copiedRelation = NULL;
 		bool isDistributedRelation = false;
+		bool isCopyFromWorker = IsCopyFromWorker(copyStatement);
+
+		if (isCopyFromWorker)
+		{
+			RangeVar *relation = copyStatement->relation;
+			NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement);
+			char *nodeName = masterNodeAddress->nodeName;
+			int32 nodePort = masterNodeAddress->nodePort;
+
+			CreateLocalTable(relation, nodeName, nodePort);
+
+			/*
+			 * We expect copy from worker to be on a distributed table; otherwise,
+			 * it fails in CitusCopyFrom() while checking the partition method.
+			 */
+			isDistributedRelation = true;
+		}
+		else
+		{
 			bool isFrom = copyStatement->is_from;
+			Relation copiedRelation = NULL;
+
 			/* consider using RangeVarGetRelidExtended to check perms before locking */
 			copiedRelation = heap_openrv(copyStatement->relation,
@@ -326,13 +349,18 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
 										 RelationGetNamespace(copiedRelation));
 
 			heap_close(copiedRelation, NoLock);
+		}
 
 		if (isDistributedRelation)
 		{
 			if (copyStatement->is_from)
 			{
 				/* check permissions, we're bypassing postgres' normal checks */
+				if (!isCopyFromWorker)
+				{
 					CheckCopyPermissions(copyStatement);
+				}
+
 				CitusCopyFrom(copyStatement, completionTag);
 				return NULL;
 			}
@@ -834,6 +862,82 @@ ErrorIfDistributedRenameStmt(RenameStmt *renameStatement)
 }
 
 
+/*
+ * CreateLocalTable gets DDL commands from the remote node for the given
+ * relation. Then, it creates the local relation as temporary and on commit drop.
+ */
+static void
+CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort)
+{
+	List *ddlCommandList = NIL;
+	ListCell *ddlCommandCell = NULL;
+
+	char *relationName = relation->relname;
+	char *schemaName = relation->schemaname;
+	char *qualifiedName = quote_qualified_identifier(schemaName, relationName);
+
+	/* fetch the ddl commands needed to create the table */
+	StringInfo tableNameStringInfo = makeStringInfo();
+	appendStringInfoString(tableNameStringInfo, qualifiedName);
+
+	/*
+	 * The warning message created in TableDDLCommandList() is descriptive
+	 * enough; therefore, we just throw an error which says that we could not
+	 * run the copy operation.
+	 */
+	ddlCommandList = TableDDLCommandList(nodeName, nodePort, tableNameStringInfo);
+	if (ddlCommandList == NIL)
+	{
+		ereport(ERROR, (errmsg("could not run copy from the worker node")));
+	}
+
+	/* apply DDL commands against the local database */
+	foreach(ddlCommandCell, ddlCommandList)
+	{
+		StringInfo ddlCommand = (StringInfo) lfirst(ddlCommandCell);
+		Node *ddlCommandNode = ParseTreeNode(ddlCommand->data);
+		bool applyDDLCommand = false;
+
+		if (IsA(ddlCommandNode, CreateStmt) ||
+			IsA(ddlCommandNode, CreateForeignTableStmt))
+		{
+			CreateStmt *createStatement = (CreateStmt *) ddlCommandNode;
+
+			/* create the local relation as temporary and on commit drop */
+			createStatement->relation->relpersistence = RELPERSISTENCE_TEMP;
+			createStatement->oncommit = ONCOMMIT_DROP;
+
+			/* temporarily strip schema name */
+			createStatement->relation->schemaname = NULL;
+
+			applyDDLCommand = true;
+		}
+		else if (IsA(ddlCommandNode, CreateForeignServerStmt))
+		{
+			CreateForeignServerStmt *createServerStmt =
+				(CreateForeignServerStmt *) ddlCommandNode;
+			if (GetForeignServerByName(createServerStmt->servername, true) == NULL)
+			{
+				/* create server if not exists */
+				applyDDLCommand = true;
+			}
+		}
+		else if ((IsA(ddlCommandNode, CreateExtensionStmt)))
+		{
+			applyDDLCommand = true;
+		}
+
+		/* run only a selected set of DDL commands */
+		if (applyDDLCommand)
+		{
+			ProcessUtility(ddlCommandNode, CreateCommandTag(ddlCommandNode),
+						   PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
+			CommandCounterIncrement();
+		}
+	}
+}
+
+
 /*
 * IsAlterTableRenameStmt returns true if the passed in RenameStmt operates on a
 * distributed table or its objects. This includes:
@@ -54,6 +54,7 @@ static StringInfo WorkerPartitionValue(char *nodeName, uint32 nodePort, Oid rela
 /* exports for SQL callable functions */
 PG_FUNCTION_INFO_V1(master_create_empty_shard);
 PG_FUNCTION_INFO_V1(master_append_table_to_shard);
+PG_FUNCTION_INFO_V1(master_update_shard_statistics);
 
 
 /*
@@ -269,7 +270,7 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
 	RESUME_INTERRUPTS();
 
 	/* update shard statistics and get new shard size */
-	newShardSize = UpdateShardStatistics(relationId, shardId);
+	newShardSize = UpdateShardStatistics(shardId);
 
 	/* calculate ratio of current shard size compared to shard max size */
 	shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
@@ -279,6 +280,22 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * master_update_shard_statistics updates metadata (shard size and shard min/max
+ * values) of the given shard and returns the updated shard size.
+ */
+Datum
+master_update_shard_statistics(PG_FUNCTION_ARGS)
+{
+	int64 shardId = PG_GETARG_INT64(0);
+	uint64 shardSize = 0;
+
+	shardSize = UpdateShardStatistics(shardId);
+
+	PG_RETURN_INT64(shardSize);
+}
+
+
 /*
 * CheckDistributedTable checks if the given relationId corresponds to a
 * distributed table. If it does not, the function errors out.
@@ -401,15 +418,16 @@ WorkerCreateShard(char *nodeName, uint32 nodePort, uint64 shardId,
 
 
 /*
- * UpdateShardStatistics updates metadata for the given shard id and returns
- * the new shard size.
+ * UpdateShardStatistics updates metadata (shard size and shard min/max values)
+ * of the given shard and returns the updated shard size.
 */
 uint64
-UpdateShardStatistics(Oid relationId, int64 shardId)
+UpdateShardStatistics(int64 shardId)
 {
 	ShardInterval *shardInterval = LoadShardInterval(shardId);
+	Oid relationId = shardInterval->relationId;
 	char storageType = shardInterval->storageType;
-	char *shardName = NULL;
+	char *shardQualifiedName = NULL;
 	List *shardPlacementList = NIL;
 	ListCell *shardPlacementCell = NULL;
 	bool statsOK = false;
@@ -418,11 +436,17 @@ UpdateShardStatistics(int64 shardId)
 	text *maxValue = NULL;
 
 	/* if shard doesn't have an alias, extend regular table name */
-	shardName = LoadShardAlias(relationId, shardId);
-	if (shardName == NULL)
+	shardQualifiedName = LoadShardAlias(relationId, shardId);
+	if (shardQualifiedName == NULL)
 	{
-		shardName = get_rel_name(relationId);
-		AppendShardIdToName(&shardName, shardId);
+		char *relationName = get_rel_name(relationId);
+
+		Oid schemaId = get_rel_namespace(relationId);
+		char *schemaName = get_namespace_name(schemaId);
+
+		shardQualifiedName = quote_qualified_identifier(schemaName, relationName);
+
+		AppendShardIdToName(&shardQualifiedName, shardId);
 	}
 
 	shardPlacementList = FinalizedShardPlacementList(shardId);
@@ -434,7 +458,7 @@ UpdateShardStatistics(int64 shardId)
 		char *workerName = placement->nodeName;
 		uint32 workerPort = placement->nodePort;
 
-		statsOK = WorkerShardStats(workerName, workerPort, relationId, shardName,
+		statsOK = WorkerShardStats(workerName, workerPort, relationId, shardQualifiedName,
 								   &shardSize, &minValue, &maxValue);
 		if (statsOK)
 		{
|
||||||
*/
|
*/
|
||||||
if (!statsOK)
|
if (!statsOK)
|
||||||
{
|
{
|
||||||
ereport(WARNING, (errmsg("could not get statistics for shard %s", shardName),
|
ereport(WARNING, (errmsg("could not get statistics for shard %s",
|
||||||
|
shardQualifiedName),
|
||||||
errdetail("Setting shard statistics to NULL")));
|
errdetail("Setting shard statistics to NULL")));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@@ -63,8 +63,6 @@ static bool FetchForeignTable(const char *nodeName, uint32 nodePort,
                               StringInfo tableName);
 static const char * RemoteTableOwner(const char *nodeName, uint32 nodePort,
                                      StringInfo tableName);
-static List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
-                                  StringInfo tableName);
 static StringInfo ForeignFilePath(const char *nodeName, uint32 nodePort,
                                   StringInfo tableName);
 static bool check_log_statement(List *stmt_list);
@@ -870,7 +868,7 @@ RemoteTableOwner(const char *nodeName, uint32 nodePort, StringInfo tableName)
 * DDL commands used in creating the table. If an error occurs during fetching,
 * the function returns an empty list.
 */
-static List *
+List *
 TableDDLCommandList(const char *nodeName, uint32 nodePort, StringInfo tableName)
 {
 	List *ddlCommandList = NIL;
@@ -60,6 +60,12 @@
 #define DROP_REGULAR_TABLE_COMMAND "DROP TABLE IF EXISTS %s"
 #define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s"
 #define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s"
+#define CREATE_EMPTY_SHARD_QUERY "SELECT master_create_empty_shard('%s')"
+#define FINALIZED_SHARD_PLACEMENTS_QUERY \
+	"SELECT nodename, nodeport FROM pg_dist_shard_placement WHERE shardstate = 1 AND shardid = %ld"
+#define UPDATE_SHARD_STATISTICS_QUERY \
+	"SELECT master_update_shard_statistics(%ld)"
+#define PARTITION_METHOD_QUERY "SELECT part_method FROM master_get_table_metadata('%s');"
 
 
 /* Enumeration that defines the shard placement policy to use while staging */
@@ -86,7 +92,7 @@ extern void CreateShardPlacements(int64 shardId, List *ddlEventList,
                                   char *newPlacementOwner,
                                   List *workerNodeList, int workerStartIndex,
                                   int replicationFactor);
-extern uint64 UpdateShardStatistics(Oid relationId, int64 shardId);
+extern uint64 UpdateShardStatistics(int64 shardId);
 
 /* Function declarations for generating metadata for shard creation */
 extern Datum master_get_table_metadata(PG_FUNCTION_ARGS);
|
@ -99,6 +105,7 @@ extern Datum master_get_active_worker_nodes(PG_FUNCTION_ARGS);
|
||||||
/* Function declarations to help with data staging and deletion */
|
/* Function declarations to help with data staging and deletion */
|
||||||
extern Datum master_create_empty_shard(PG_FUNCTION_ARGS);
|
extern Datum master_create_empty_shard(PG_FUNCTION_ARGS);
|
||||||
extern Datum master_append_table_to_shard(PG_FUNCTION_ARGS);
|
extern Datum master_append_table_to_shard(PG_FUNCTION_ARGS);
|
||||||
|
extern Datum master_update_shard_statistics(PG_FUNCTION_ARGS);
|
||||||
extern Datum master_apply_delete_command(PG_FUNCTION_ARGS);
|
extern Datum master_apply_delete_command(PG_FUNCTION_ARGS);
|
||||||
extern Datum master_drop_all_shards(PG_FUNCTION_ARGS);
|
extern Datum master_drop_all_shards(PG_FUNCTION_ARGS);
|
||||||
|
|
||||||
|
@@ -108,4 +115,5 @@ extern Datum master_create_worker_shards(PG_FUNCTION_ARGS);
 /* function declarations for shard repair functionality */
 extern Datum master_copy_shard_placement(PG_FUNCTION_ARGS);
+
 
 #endif   /* MASTER_PROTOCOL_H */
@ -41,6 +41,13 @@ typedef struct CopyOutStateData

typedef struct CopyOutStateData *CopyOutState;

/* struct type to keep both hostname and port */
typedef struct NodeAddress
{
	char *nodeName;
	int32 nodePort;
} NodeAddress;


/* function declarations for copying into a distributed table */
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
@ -51,6 +58,8 @@ extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray,
extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState);
extern void AppendCopyBinaryFooters(CopyOutState footerOutputState);
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
extern bool IsCopyFromWorker(CopyStmt *copyStatement);
extern NodeAddress * MasterNodeAddress(CopyStmt *copyStatement);


#endif /* MULTI_COPY_H */

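IsCopyFromWorker() presumably detects the new master_host / master_port COPY options, and MasterNodeAddress() packs them into the NodeAddress struct declared above; on the SQL side this corresponds to a worker-side invocation like the following sketch (the data file path is illustrative, and master_port is optional):

-- Run on a worker node; the file path is illustrative.
COPY customer_worker_copy_append FROM '/tmp/customer.1.data'
    WITH (delimiter '|', master_host 'localhost', master_port 5432);
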
@ -119,6 +119,8 @@ extern void RemoveJobSchema(StringInfo schemaName);
extern Datum * DeconstructArrayObject(ArrayType *arrayObject);
extern int32 ArrayObjectCount(ArrayType *arrayObject);
extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedureId);
extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
                                  StringInfo tableName);

/* Function declarations shared with the master planner */
extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId);

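TableDDLCommandList() is now exported so a worker-side COPY can fetch the DDL needed to recreate the target relation from the master; as a rough sketch, the commands it retrieves are of the kind returned by the master_get_table_ddl_events UDF (an assumption, since the exact query is not shown in this diff):

-- Assumed shape of the DDL fetch; the table name is illustrative.
SELECT master_get_table_ddl_events('customer_worker_copy_append');
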
@ -11,6 +11,7 @@ DROP EXTENSION citus;
-- Create extension in oldest version, test every upgrade step
CREATE EXTENSION citus VERSION '5.0';
ALTER EXTENSION citus UPDATE TO '5.0-1';
ALTER EXTENSION citus UPDATE TO '5.0-2';
-- drop extension an re-create in newest version
DROP EXTENSION citus;
\c

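After the new upgrade step, the installed schema version can be double-checked from the catalog; a quick verification of this kind (not part of the test file):

SELECT extversion FROM pg_extension WHERE extname = 'citus';
-- expected: 5.0-2
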
@ -254,3 +254,63 @@ SET citus.shard_max_size TO '256kB';
COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|';

SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass;

-- Create customer table for the worker copy with constraint and index
CREATE TABLE customer_worker_copy_append (
	c_custkey integer ,
	c_name varchar(25) not null,
	c_address varchar(40),
	c_nationkey integer,
	c_phone char(15),
	c_acctbal decimal(15,2),
	c_mktsegment char(10),
	c_comment varchar(117),
	primary key (c_custkey));

CREATE INDEX ON customer_worker_copy_append (c_name);

SELECT master_create_distributed_table('customer_worker_copy_append', 'c_custkey', 'append');

-- Connect to the first worker node
\c - - - 57637

-- Test copy from the worker node
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);

-- Test if there is no relation to copy data with the worker copy
COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);

-- Connect back to the master node
\c - - - 57636

-- Test the content of the table
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM customer_worker_copy_append;

-- Test schema support on append partitioned tables
CREATE SCHEMA append;
CREATE TABLE append.customer_copy (
	c_custkey integer ,
	c_name varchar(25) not null,
	c_address varchar(40),
	c_nationkey integer,
	c_phone char(15),
	c_acctbal decimal(15,2),
	c_mktsegment char(10),
	c_comment varchar(117));

SELECT master_create_distributed_table('append.customer_copy', 'c_custkey', 'append');

-- Test copy from the master node
COPY append.customer_copy FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|');

-- Test copy from the worker node
\c - - - 57637

COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);

-- Connect back to the master node
\c - - - 57636

-- Test the content of the table
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM append.customer_copy;

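A follow-up check one might run on the master after the worker-side copies above, to confirm that new shards were created and their placements finalized (not part of the test file):

-- Shards created for the append-distributed table.
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_worker_copy_append'::regclass;

-- Finalized placements for those shards.
SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardstate = 1
  AND shardid IN (SELECT shardid FROM pg_dist_shard
                  WHERE logicalrelid = 'customer_worker_copy_append'::regclass);
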
@ -304,3 +304,76 @@ SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::
 5
(1 row)

-- Create customer table for the worker copy with constraint and index
CREATE TABLE customer_worker_copy_append (
	c_custkey integer ,
	c_name varchar(25) not null,
	c_address varchar(40),
	c_nationkey integer,
	c_phone char(15),
	c_acctbal decimal(15,2),
	c_mktsegment char(10),
	c_comment varchar(117),
	primary key (c_custkey));
CREATE INDEX ON customer_worker_copy_append (c_name);
SELECT master_create_distributed_table('customer_worker_copy_append', 'c_custkey', 'append');
WARNING:  table "customer_worker_copy_append" has a unique constraint
DETAIL:  Unique constraints and primary keys on append-partitioned tables cannot be enforced.
HINT:  Consider using hash partitioning.
 master_create_distributed_table
---------------------------------

(1 row)

-- Connect to the first worker node
\c - - - 57637
-- Test copy from the worker node
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Test if there is no relation to copy data with the worker copy
COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
WARNING:  could not receive query results from localhost:57636
DETAIL:  Client error: relation "lineitem_copy_none" does not exist
ERROR:  could not run copy from the worker node
-- Connect back to the master node
\c - - - 57636
-- Test the content of the table
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM customer_worker_copy_append;
 min | max  |          avg          | count
-----+------+-----------------------+-------
   1 | 7000 | 4443.8028800000000000 |  2000
(1 row)

-- Test schema support on append partitioned tables
CREATE SCHEMA append;
NOTICE:  Citus partially supports CREATE SCHEMA for distributed databases
DETAIL:  schema usage in joins and in some UDFs provided by Citus are not supported yet
CREATE TABLE append.customer_copy (
	c_custkey integer ,
	c_name varchar(25) not null,
	c_address varchar(40),
	c_nationkey integer,
	c_phone char(15),
	c_acctbal decimal(15,2),
	c_mktsegment char(10),
	c_comment varchar(117));
SELECT master_create_distributed_table('append.customer_copy', 'c_custkey', 'append');
 master_create_distributed_table
---------------------------------

(1 row)

-- Test copy from the master node
COPY append.customer_copy FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|');
-- Test copy from the worker node
\c - - - 57637
COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636);
-- Connect back to the master node
\c - - - 57636
-- Test the content of the table
SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM append.customer_copy;
 min | max  |          avg          | count
-----+------+-----------------------+-------
   1 | 7000 | 4443.8028800000000000 |  2000
(1 row)

@ -13,6 +13,7 @@ DROP EXTENSION citus;
-- Create extension in oldest version, test every upgrade step
CREATE EXTENSION citus VERSION '5.0';
ALTER EXTENSION citus UPDATE TO '5.0-1';
ALTER EXTENSION citus UPDATE TO '5.0-2';
-- drop extension an re-create in newest version
DROP EXTENSION citus;