From fb6b6daf9dbb04f61c67724735b9b8c7e22367d3 Mon Sep 17 00:00:00 2001
From: Metin Doslu
Date: Fri, 22 Apr 2016 11:27:56 +0300
Subject: [PATCH] Add COPY support on worker nodes for append-partitioned
 relations

Now, we can copy to an append-partitioned distributed relation from any
worker node by providing master options such as:

COPY relation_name FROM file_path WITH (delimiter '|', master_host 'localhost', master_port 5432);

where master_port is optional and defaults to 5432.
---
 src/backend/distributed/Makefile              |   4 +-
 .../distributed/citus--5.0-1--5.0-2.sql       |   8 +
 src/backend/distributed/citus.control         |   2 +-
 src/backend/distributed/commands/multi_copy.c | 529 ++++++++++++++++--
 .../distributed/executor/multi_utility.c      | 126 ++++-
 .../master/master_stage_protocol.c            |  47 +-
 .../worker/worker_data_fetch_protocol.c       |   4 +-
 src/include/distributed/master_protocol.h     |  10 +-
 src/include/distributed/multi_copy.h          |   9 +
 src/include/distributed/worker_protocol.h     |   2 +
 src/test/regress/expected/multi_extension.out |   1 +
 src/test/regress/input/multi_copy.source     |  60 ++
 src/test/regress/output/multi_copy.source    |  73 +++
 src/test/regress/sql/multi_extension.sql     |   1 +
 14 files changed, 794 insertions(+), 82 deletions(-)
 create mode 100644 src/backend/distributed/citus--5.0-1--5.0-2.sql

diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile
index fd9b920e9..e64c5404f 100644
--- a/src/backend/distributed/Makefile
+++ b/src/backend/distributed/Makefile
@@ -5,7 +5,7 @@ citus_top_builddir = ../../..
 
 MODULE_big = citus
 EXTENSION = citus
-EXTVERSIONS = 5.0 5.0-1
+EXTVERSIONS = 5.0 5.0-1 5.0-2
 # All citus--*.sql files in the source directory
 DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
 # Generated files for each version
@@ -29,6 +29,8 @@ $(EXTENSION)--5.0.sql: $(EXTENSION).sql
 	cat $^ > $@
 $(EXTENSION)--5.0-1.sql: $(EXTENSION)--5.0.sql $(EXTENSION)--5.0--5.0-1.sql
 	cat $^ > $@
+$(EXTENSION)--5.0-2.sql: $(EXTENSION)--5.0.sql $(EXTENSION)--5.0--5.0-1.sql $(EXTENSION)--5.0-1--5.0-2.sql
+	cat $^ > $@
 
 NO_PGXS = 1
 
diff --git a/src/backend/distributed/citus--5.0-1--5.0-2.sql b/src/backend/distributed/citus--5.0-1--5.0-2.sql
new file mode 100644
index 000000000..0f6a32f37
--- /dev/null
+++ b/src/backend/distributed/citus--5.0-1--5.0-2.sql
@@ -0,0 +1,8 @@
+/* citus--5.0-1--5.0-2.sql */
+
+CREATE FUNCTION master_update_shard_statistics(shard_id bigint)
+	RETURNS bigint
+	LANGUAGE C STRICT
+	AS 'MODULE_PATHNAME', $$master_update_shard_statistics$$;
+COMMENT ON FUNCTION master_update_shard_statistics(bigint)
+	IS 'updates shard statistics and returns the updated shard size';
diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control
index 7db6e73fd..f5bbc9634 100644
--- a/src/backend/distributed/citus.control
+++ b/src/backend/distributed/citus.control
@@ -1,6 +1,6 @@
 # Citus extension
 comment = 'Citus distributed database'
-default_version = '5.0-1'
+default_version = '5.0-2'
 module_pathname = '$libdir/citus'
 relocatable = false
 schema = pg_catalog
diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c
index 03b612837..ed4242305 100644
--- a/src/backend/distributed/commands/multi_copy.c
+++ b/src/backend/distributed/commands/multi_copy.c
@@ -4,22 +4,25 @@
  * This file contains implementation of COPY utility for distributed
  * tables.
  *
- * The CitusCopyFrom function should be called from the utility hook to
- * process COPY ... FROM commands on distributed tables. CitusCopyFrom
- * parses the input from stdin, a program executed on the master, or a file
- * on the master, and decides to copy new rows to existing shards or new shards
- * based on the partition method of the distributed table.
+ * The CitusCopyFrom function should be called from the utility hook to process
+ * COPY ... FROM commands on distributed tables. CitusCopyFrom parses the input
+ * from stdin, a program, or a file, and decides to copy new rows to existing
+ * shards or new shards based on the partition method of the distributed table.
+ * If the copy is run on a worker node, CitusCopyFrom calls CopyFromWorkerNode,
+ * which parses the master node copy options and handles communication with the
+ * master node.
  *
  * It opens a new connection for every shard placement and uses the PQputCopyData
  * function to copy the data. Because PQputCopyData transmits data asynchronously,
  * the workers will ingest data at least partially in parallel.
  *
- * When failing to connect to a worker, the master marks the placement for
- * which it was trying to open a connection as inactive, similar to the way
- * DML statements are handled. If a failure occurs after connecting, the
- * transaction is rolled back on all the workers. Note that, if the underlying
- * table is append-partitioned, metadata changes are rolled back on the master
- * node, but shard placements are left on the workers.
+ * For hash-partitioned tables, if it fails to connect to a worker, the master
+ * marks the placement for which it was trying to open a connection as inactive,
+ * similar to the way DML statements are handled. If a failure occurs after
+ * connecting, the transaction is rolled back on all the workers. Note that,
+ * in the case of append-partitioned tables, if a failure occurs, metadata
+ * changes are immediately rolled back on the master node, but shard placements
+ * are left on the worker nodes.
  *
  * By default, COPY uses normal transactions on the workers. In the case of
  * hash or range-partitioned tables, this can cause a problem when some of the
@@ -30,9 +33,9 @@
  * shards are created and in the case of failure, metadata changes are rolled
  * back on the master node.
  *
- * Parsing options are processed and enforced on the master, while
- * constraints are enforced on the worker. In either case, failure causes
- * the whole COPY to roll back.
+ * Parsing options are processed and enforced on the node where the copy
+ * command is run, while constraints are enforced on the worker. In either
+ * case, failure causes the whole COPY to roll back.
 *
 * Copyright (c) 2016, Citus Data, Inc.
 *
@@ -128,6 +131,9 @@
 /* constant used in binary protocol */
 static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
 
+/* use a global connection to the master node in order to skip passing it around */
+static PGconn *masterConnection = NULL;
+
 
 /* ShardConnections represents a set of connections for each placement of a shard */
 typedef struct ShardConnections
@@ -138,8 +144,11 @@ typedef struct ShardConnections
 
 
 /* Local functions forward declarations */
+static void CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag);
 static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag);
-static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag);
+static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId);
+static char MasterPartitionMethod(RangeVar *relation);
+static void RemoveMasterOptions(CopyStmt *copyStatement);
 static void LockAllShards(List *shardIntervalList);
 static HTAB * CreateShardConnectionHash(void);
 static int CompareShardIntervalsById(const void *leftElement, const void *rightElement);
@@ -147,7 +156,9 @@ static ShardConnections * GetShardConnections(HTAB *shardConnectionHash,
 											  int64 shardId,
 											  bool *shardConnectionsFound);
 static void OpenCopyTransactions(CopyStmt *copyStatement,
-								 ShardConnections *shardConnections);
+								 ShardConnections *shardConnections, bool stopOnFailure);
+static List * MasterShardPlacementList(uint64 shardId);
+static List * RemoteFinalizedShardPlacementList(uint64 shardId);
 static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList);
 static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList);
 static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
@@ -159,8 +170,13 @@ static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
 static void ReportCopyError(PGconn *connection, PGresult *result);
 static uint32 AvailableColumnCount(TupleDesc tupleDescriptor);
 static void StartCopyToNewShard(ShardConnections *shardConnections,
-								Oid relationId, CopyStmt *copyStatement);
+								CopyStmt *copyStatement);
+static int64 MasterCreateEmptyShard(char *relationName);
+static int64 CreateEmptyShard(char *relationName);
+static int64 RemoteCreateEmptyShard(char *relationName);
 static void FinalizeCopyToNewShard(ShardConnections *shardConnections);
+static void MasterUpdateShardStatistics(uint64 shardId);
+static void RemoteUpdateShardStatistics(uint64 shardId);
 
 /* Private functions copied and adapted from copy.c in PostgreSQL */
 static void CopySendData(CopyOutState outputState, const void *databuf, int datasize);
@@ -174,14 +190,13 @@ static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *
 
 /*
 * CitusCopyFrom implements the COPY table_name FROM. It dispatches the copy
- * statement to related subfunctions based on the partition method of the
- * distributed table.
+ * statement to related subfunctions based on where the copy command is run
+ * and the partition method of the distributed table.
*/ void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) { - Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false); - char partitionMethod = '\0'; + bool isCopyFromWorker = false; /* disallow COPY to/from file or program except for superusers */ if (copyStatement->filename != NULL && !superuser()) @@ -204,23 +219,132 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) } } - partitionMethod = PartitionMethod(tableId); - if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE) + isCopyFromWorker = IsCopyFromWorker(copyStatement); + if (isCopyFromWorker) { - CopyToExistingShards(copyStatement, completionTag); - } - else if (partitionMethod == DISTRIBUTE_BY_APPEND) - { - CopyToNewShards(copyStatement, completionTag); + CopyFromWorkerNode(copyStatement, completionTag); } else { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("unsupported partition method"))); + Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); + char partitionMethod = PartitionMethod(relationId); + + if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == + DISTRIBUTE_BY_RANGE) + { + CopyToExistingShards(copyStatement, completionTag); + } + else if (partitionMethod == DISTRIBUTE_BY_APPEND) + { + CopyToNewShards(copyStatement, completionTag, relationId); + } + else + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("unsupported partition method"))); + } } } +/* + * IsCopyFromWorker checks if the given copy statement has the master host option. + */ +bool +IsCopyFromWorker(CopyStmt *copyStatement) +{ + ListCell *optionCell = NULL; + foreach(optionCell, copyStatement->options) + { + DefElem *defel = (DefElem *) lfirst(optionCell); + if (strncmp(defel->defname, "master_host", NAMEDATALEN) == 0) + { + return true; + } + } + + return false; +} + + +/* + * CopyFromWorkerNode implements the COPY table_name FROM ... from worker nodes + * for append-partitioned tables. + */ +static void +CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag) +{ + NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement); + char *nodeName = masterNodeAddress->nodeName; + int32 nodePort = masterNodeAddress->nodePort; + char *nodeUser = CurrentUserName(); + + masterConnection = ConnectToNode(nodeName, nodePort, nodeUser); + + PG_TRY(); + { + PGresult *queryResult = NULL; + Oid relationId = InvalidOid; + char partitionMethod = 0; + + /* strip schema name for local reference */ + char *schemaName = copyStatement->relation->schemaname; + copyStatement->relation->schemaname = NULL; + + relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); + + /* put schema name back */ + copyStatement->relation->schemaname = schemaName; + + partitionMethod = MasterPartitionMethod(copyStatement->relation); + if (partitionMethod != DISTRIBUTE_BY_APPEND) + { + ereport(ERROR, (errmsg("copy from worker nodes is only supported " + "for append-partitioned tables"))); + } + + /* run all metadata commands in a transaction */ + queryResult = PQexec(masterConnection, "BEGIN"); + if (PQresultStatus(queryResult) != PGRES_COMMAND_OK) + { + ereport(ERROR, (errmsg("could not start to update master node metadata"))); + } + + PQclear(queryResult); + + /* + * Remove master node options from the copy statement because they are not + * recognized by PostgreSQL machinery. 
+ */ + RemoveMasterOptions(copyStatement); + + CopyToNewShards(copyStatement, completionTag, relationId); + + /* commit metadata transactions */ + queryResult = PQexec(masterConnection, "COMMIT"); + if (PQresultStatus(queryResult) != PGRES_COMMAND_OK) + { + ereport(ERROR, (errmsg("could not commit master node metadata changes"))); + } + + PQclear(queryResult); + + /* close the connection */ + PQfinish(masterConnection); + masterConnection = NULL; + } + PG_CATCH(); + { + /* close the connection */ + PQfinish(masterConnection); + masterConnection = NULL; + + PG_RE_THROW(); + } + PG_END_TRY(); +} + + /* * CopyToExistingShards implements the COPY table_name FROM ... for hash or * range-partitioned tables where there are already shards into which to copy @@ -420,7 +544,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) if (!shardConnectionsFound) { /* open connections and initiate COPY on shard placements */ - OpenCopyTransactions(copyStatement, shardConnections); + OpenCopyTransactions(copyStatement, shardConnections, false); /* send copy binary headers to shard placements */ SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList); @@ -492,9 +616,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) * tables where we create new shards into which to copy rows. */ static void -CopyToNewShards(CopyStmt *copyStatement, char *completionTag) +CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) { - Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); FmgrInfo *columnOutputFunctions = NULL; /* allocate column values and nulls arrays */ @@ -562,7 +685,9 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag) if (!nextRowFound) { + /* switch to regular memory context and stop showing line number in errors */ MemoryContextSwitchTo(oldContext); + error_context_stack = errorCallback.previous; break; } @@ -581,7 +706,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag) if (copiedDataSizeInBytes == 0) { /* create shard and open connections to shard placements */ - StartCopyToNewShard(shardConnections, relationId, copyStatement); + StartCopyToNewShard(shardConnections, copyStatement); /* send copy binary headers to shard placements */ SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList); @@ -606,7 +731,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag) { SendCopyBinaryFooters(copyOutState, shardConnections->connectionList); FinalizeCopyToNewShard(shardConnections); - UpdateShardStatistics(relationId, shardConnections->shardId); + MasterUpdateShardStatistics(shardConnections->shardId); copiedDataSizeInBytes = 0; } @@ -624,7 +749,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag) { SendCopyBinaryFooters(copyOutState, shardConnections->connectionList); FinalizeCopyToNewShard(shardConnections); - UpdateShardStatistics(relationId, shardConnections->shardId); + MasterUpdateShardStatistics(shardConnections->shardId); } EndCopyFrom(copyState); @@ -652,6 +777,112 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag) } +/* + * MasterNodeAddress gets the master node address from copy options and returns + * it. Note that if the master_port is not provided, we use 5432 as the default + * port. 
+ */ +NodeAddress * +MasterNodeAddress(CopyStmt *copyStatement) +{ + NodeAddress *masterNodeAddress = (NodeAddress *) palloc0(sizeof(NodeAddress)); + char *nodeName = NULL; + + /* set default port to 5432 */ + int32 nodePort = 5432; + + ListCell *optionCell = NULL; + foreach(optionCell, copyStatement->options) + { + DefElem *defel = (DefElem *) lfirst(optionCell); + if (strncmp(defel->defname, "master_host", NAMEDATALEN) == 0) + { + nodeName = defGetString(defel); + } + else if (strncmp(defel->defname, "master_port", NAMEDATALEN) == 0) + { + nodePort = defGetInt32(defel); + } + } + + masterNodeAddress->nodeName = nodeName; + masterNodeAddress->nodePort = nodePort; + + return masterNodeAddress; +} + + +/* + * MasterPartitionMethod gets the partition method of the given relation from + * the master node and returns it. + */ +static char +MasterPartitionMethod(RangeVar *relation) +{ + char partitionMethod = '\0'; + PGresult *queryResult = NULL; + + char *relationName = relation->relname; + char *schemaName = relation->schemaname; + char *qualifiedName = quote_qualified_identifier(schemaName, relationName); + + StringInfo partitionMethodCommand = makeStringInfo(); + appendStringInfo(partitionMethodCommand, PARTITION_METHOD_QUERY, qualifiedName); + + queryResult = PQexec(masterConnection, partitionMethodCommand->data); + if (PQresultStatus(queryResult) == PGRES_TUPLES_OK) + { + char *partitionMethodString = PQgetvalue((PGresult *) queryResult, 0, 0); + if (partitionMethodString == NULL || (*partitionMethodString) == '\0') + { + ereport(ERROR, (errmsg("could not find a partition method for the " + "table %s", relationName))); + } + + partitionMethod = partitionMethodString[0]; + } + else + { + ReportRemoteError(masterConnection, queryResult); + ereport(ERROR, (errmsg("could not get the partition method of the " + "distributed table"))); + } + + PQclear(queryResult); + + return partitionMethod; +} + + +/* + * RemoveMasterOptions removes master node related copy options from the option + * list of the copy statement. + */ +static void +RemoveMasterOptions(CopyStmt *copyStatement) +{ + List *newOptionList = NIL; + ListCell *optionCell = NULL; + + /* walk over the list of all options */ + foreach(optionCell, copyStatement->options) + { + DefElem *option = (DefElem *) lfirst(optionCell); + + /* skip master related options */ + if ((strncmp(option->defname, "master_host", NAMEDATALEN) == 0) || + (strncmp(option->defname, "master_port", NAMEDATALEN) == 0)) + { + continue; + } + + newOptionList = lappend(newOptionList, option); + } + + copyStatement->options = newOptionList; +} + + /* * LockAllShards takes shared locks on the metadata and the data of all shards in * shardIntervalList. This prevents concurrent placement changes and concurrent @@ -764,13 +995,15 @@ GetShardConnections(HTAB *shardConnectionHash, int64 shardId, * shard placements. 
 */
 static void
-OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections)
+OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections,
+					 bool stopOnFailure)
 {
 	List *finalizedPlacementList = NIL;
 	List *failedPlacementList = NIL;
 	ListCell *placementCell = NULL;
 	ListCell *failedPlacementCell = NULL;
 	List *connectionList = NULL;
+	int64 shardId = shardConnections->shardId;
 
 	MemoryContext localContext = AllocSetContextCreate(CurrentMemoryContext,
 													   "OpenCopyTransactions",
@@ -781,7 +1014,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
 	/* release finalized placement list at the end of this function */
 	MemoryContext oldContext = MemoryContextSwitchTo(localContext);
 
-	finalizedPlacementList = FinalizedShardPlacementList(shardConnections->shardId);
+	finalizedPlacementList = MasterShardPlacementList(shardId);
 
 	MemoryContextSwitchTo(oldContext);
 
@@ -791,17 +1024,20 @@
 		char *nodeName = placement->nodeName;
 		int nodePort = placement->nodePort;
 		char *nodeUser = CurrentUserName();
+		PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser);
+
 		TransactionConnection *transactionConnection = NULL;
 		StringInfo copyCommand = NULL;
 		PGresult *result = NULL;
 
-		PGconn *connection = ConnectToNode(nodeName, nodePort, nodeUser);
-
-		/* release failed placement list and copy command at the end of this function */
-		oldContext = MemoryContextSwitchTo(localContext);
-
 		if (connection == NULL)
 		{
+			if (stopOnFailure)
+			{
+				ereport(ERROR, (errmsg("could not open connection to %s:%d",
+									   nodeName, nodePort)));
+			}
+
 			failedPlacementList = lappend(failedPlacementList, placement);
 			continue;
 		}
@@ -811,9 +1047,12 @@
 		{
 			ReportRemoteError(connection, result);
 			failedPlacementList = lappend(failedPlacementList, placement);
+
+			PQclear(result);
 			continue;
 		}
 
+		PQclear(result);
 		copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId);
 
 		result = PQexec(connection, copyCommand->data);
@@ -821,11 +1060,12 @@
 		{
 			ReportRemoteError(connection, result);
 			failedPlacementList = lappend(failedPlacementList, placement);
+
+			PQclear(result);
 			continue;
 		}
 
-		/* preserve transaction connection in regular memory context */
-		MemoryContextSwitchTo(oldContext);
+		PQclear(result);
 
 		transactionConnection = palloc0(sizeof(TransactionConnection));
 
@@ -842,6 +1082,12 @@
 		ereport(ERROR, (errmsg("could not find any active placements")));
 	}
 
+	/*
+	 * If stopOnFailure is true, we just error out and code execution should
+	 * never reach this point. This is the case for copy from worker nodes.
+	 */
+	Assert(!stopOnFailure || list_length(failedPlacementList) == 0);
+
 	/* otherwise, mark failed placements as inactive: they're stale */
 	foreach(failedPlacementCell, failedPlacementList)
 	{
@@ -860,6 +1106,73 @@
 }
 
 
+/*
+ * MasterShardPlacementList routes the finalized shard placement lookup to
+ * either the local or the remote master node, depending on the master
+ * connection state.
+ */
+static List *
+MasterShardPlacementList(uint64 shardId)
+{
+	List *finalizedPlacementList = NIL;
+	if (masterConnection == NULL)
+	{
+		finalizedPlacementList = FinalizedShardPlacementList(shardId);
+	}
+	else
+	{
+		finalizedPlacementList = RemoteFinalizedShardPlacementList(shardId);
+	}
+
+	return finalizedPlacementList;
+}
+
+
+/*
+ * RemoteFinalizedShardPlacementList gets the finalized shard placement list
+ * for the given shard id from the remote master node.
+ */
+static List *
+RemoteFinalizedShardPlacementList(uint64 shardId)
+{
+	List *finalizedPlacementList = NIL;
+	PGresult *queryResult = NULL;
+
+	StringInfo shardPlacementsCommand = makeStringInfo();
+	appendStringInfo(shardPlacementsCommand, FINALIZED_SHARD_PLACEMENTS_QUERY, shardId);
+
+	queryResult = PQexec(masterConnection, shardPlacementsCommand->data);
+	if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
+	{
+		int rowCount = PQntuples(queryResult);
+		int rowIndex = 0;
+
+		for (rowIndex = 0; rowIndex < rowCount; rowIndex++)
+		{
+			char *nodeName = PQgetvalue(queryResult, rowIndex, 0);
+
+			char *nodePortString = PQgetvalue(queryResult, rowIndex, 1);
+			uint32 nodePort = atoi(nodePortString);
+
+			ShardPlacement *shardPlacement =
+				(ShardPlacement *) palloc0(sizeof(ShardPlacement));
+
+			shardPlacement->nodeName = pstrdup(nodeName);	/* copy out of the PGresult */
+			shardPlacement->nodePort = nodePort;
+
+			finalizedPlacementList = lappend(finalizedPlacementList, shardPlacement);
+		}
+	}
+	else
+	{
+		ereport(ERROR, (errmsg("could not get shard placements from the master node")));
+	}
+
+	PQclear(queryResult);
+
+	return finalizedPlacementList;
+}
+
+
 /* Send copy binary headers to given connections */
 static void
 SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList)
@@ -1259,23 +1570,93 @@
  * opens connections to shard placements.
  */
 static void
-StartCopyToNewShard(ShardConnections *shardConnections, Oid relationId,
-					CopyStmt *copyStatement)
+StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement)
 {
-	char *relationName = get_rel_name(relationId);
-	text *relationNameText = cstring_to_text(relationName);
-	Datum relationNameDatum = PointerGetDatum(relationNameText);
-	Datum shardIdDatum = DirectFunctionCall1(master_create_empty_shard,
-											 relationNameDatum);
+	char *relationName = copyStatement->relation->relname;
+	char *schemaName = copyStatement->relation->schemaname;
+	char *qualifiedName = quote_qualified_identifier(schemaName, relationName);
+
+	int64 shardId = MasterCreateEmptyShard(qualifiedName);
 
-	int64 shardId = DatumGetInt64(shardIdDatum);
 	shardConnections->shardId = shardId;
 
 	list_free_deep(shardConnections->connectionList);
 	shardConnections->connectionList = NIL;
 
 	/* connect to shard placements and start transactions */
-	OpenCopyTransactions(copyStatement, shardConnections);
+	OpenCopyTransactions(copyStatement, shardConnections, true);
+}
+
+
+/*
+ * MasterCreateEmptyShard routes the create empty shard call to either the
+ * local or the remote master node, depending on the master connection state.
+ */
+static int64
+MasterCreateEmptyShard(char *relationName)
+{
+	int64 shardId = 0;
+	if (masterConnection == NULL)
+	{
+		shardId = CreateEmptyShard(relationName);
+	}
+	else
+	{
+		shardId = RemoteCreateEmptyShard(relationName);
+	}
+
+	return shardId;
+}
+
+
+/*
+ * CreateEmptyShard creates a new shard and related shard placements from the
+ * local master node.
+ */
+static int64
+CreateEmptyShard(char *relationName)
+{
+	int64 shardId = 0;
+
+	text *relationNameText = cstring_to_text(relationName);
+	Datum relationNameDatum = PointerGetDatum(relationNameText);
+	Datum shardIdDatum = DirectFunctionCall1(master_create_empty_shard,
+											 relationNameDatum);
+	shardId = DatumGetInt64(shardIdDatum);
+
+	return shardId;
+}
+
+
+/*
+ * RemoteCreateEmptyShard creates a new shard and related shard placements from
+ * the remote master node.
+ */
+static int64
+RemoteCreateEmptyShard(char *relationName)
+{
+	int64 shardId = 0;
+	PGresult *queryResult = NULL;
+
+	StringInfo createEmptyShardCommand = makeStringInfo();
+	appendStringInfo(createEmptyShardCommand, CREATE_EMPTY_SHARD_QUERY, relationName);
+
+	queryResult = PQexec(masterConnection, createEmptyShardCommand->data);
+	if (PQresultStatus(queryResult) == PGRES_TUPLES_OK)
+	{
+		char *shardIdString = PQgetvalue((PGresult *) queryResult, 0, 0);
+		char *shardIdStringEnd = NULL;
+		shardId = strtoul(shardIdString, &shardIdStringEnd, 0);
+	}
+	else
+	{
+		ReportRemoteError(masterConnection, queryResult);
+		ereport(ERROR, (errmsg("could not create a new empty shard on the remote node")));
+	}
+
+	PQclear(queryResult);
+
+	return shardId;
 }
 
 
@@ -1295,6 +1676,46 @@ FinalizeCopyToNewShard(ShardConnections *shardConnections)
 }
 
 
+/*
+ * MasterUpdateShardStatistics routes the update shard statistics call to
+ * either the local or the remote master node, depending on the master
+ * connection state.
+ */
+static void
+MasterUpdateShardStatistics(uint64 shardId)
+{
+	if (masterConnection == NULL)
+	{
+		UpdateShardStatistics(shardId);
+	}
+	else
+	{
+		RemoteUpdateShardStatistics(shardId);
+	}
+}
+
+
+/*
+ * RemoteUpdateShardStatistics updates shard statistics on the remote master node.
+ */
+static void
+RemoteUpdateShardStatistics(uint64 shardId)
+{
+	PGresult *queryResult = NULL;
+
+	StringInfo updateShardStatisticsCommand = makeStringInfo();
+	appendStringInfo(updateShardStatisticsCommand, UPDATE_SHARD_STATISTICS_QUERY,
+					 shardId);
+
+	queryResult = PQexec(masterConnection, updateShardStatisticsCommand->data);
+	if (PQresultStatus(queryResult) != PGRES_TUPLES_OK)
+	{
+		ereport(ERROR, (errmsg("could not update shard statistics")));
+	}
+
+	PQclear(queryResult);
+}
+
+
 /* *INDENT-OFF* */
 /* Append data to the copy buffer in outputState */
 static void
diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c
index 389bc15da..5a22904e1 100644
--- a/src/backend/distributed/executor/multi_utility.c
+++ b/src/backend/distributed/executor/multi_utility.c
@@ -11,6 +11,7 @@
 
 #include "access/htup_details.h"
 #include "access/sysattr.h"
+#include "access/xact.h"
 #include "catalog/catalog.h"
 #include "catalog/index.h"
 #include "catalog/namespace.h"
@@ -25,11 +26,13 @@
 #include "distributed/transmit.h"
 #include "distributed/worker_manager.h"
 #include "distributed/worker_protocol.h"
+#include "foreign/foreign.h"
 #include "executor/executor.h"
 #include "parser/parser.h"
 #include "parser/parse_utilcmd.h"
 #include "storage/lmgr.h"
 #include "tcop/pquery.h"
+#include "tcop/utility.h"
 #include "utils/builtins.h"
 #include "utils/inval.h"
 #include "utils/lsyscache.h"
@@ -70,6 +73,7 @@
 static void ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement);
 static void ErrorIfDistributedRenameStmt(RenameStmt *renameStatement);
 
 /* Local functions forward declarations for helper functions */
+static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort);
 static bool
IsAlterTableRenameStmt(RenameStmt *renameStatement); static void ExecuteDistributedDDLCommand(Oid relationId, const char *ddlCommandString); static bool ExecuteCommandOnWorkerShards(Oid relationId, const char *commandString, @@ -311,28 +315,52 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR */ if (copyStatement->relation != NULL) { - Relation copiedRelation = NULL; bool isDistributedRelation = false; - bool isFrom = copyStatement->is_from; + bool isCopyFromWorker = IsCopyFromWorker(copyStatement); - /* consider using RangeVarGetRelidExtended to check perms before locking */ - copiedRelation = heap_openrv(copyStatement->relation, - isFrom ? RowExclusiveLock : AccessShareLock); + if (isCopyFromWorker) + { + RangeVar *relation = copyStatement->relation; + NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement); + char *nodeName = masterNodeAddress->nodeName; + int32 nodePort = masterNodeAddress->nodePort; - isDistributedRelation = IsDistributedTable(RelationGetRelid(copiedRelation)); + CreateLocalTable(relation, nodeName, nodePort); - /* ensure future lookups hit the same relation */ - copyStatement->relation->schemaname = get_namespace_name( - RelationGetNamespace(copiedRelation)); + /* + * We expect copy from worker to be on a distributed table; otherwise, + * it fails in CitusCopyFrom() while checking the partition method. + */ + isDistributedRelation = true; + } + else + { + bool isFrom = copyStatement->is_from; + Relation copiedRelation = NULL; - heap_close(copiedRelation, NoLock); + /* consider using RangeVarGetRelidExtended to check perms before locking */ + copiedRelation = heap_openrv(copyStatement->relation, + isFrom ? RowExclusiveLock : AccessShareLock); + + isDistributedRelation = IsDistributedTable(RelationGetRelid(copiedRelation)); + + /* ensure future lookups hit the same relation */ + copyStatement->relation->schemaname = get_namespace_name( + RelationGetNamespace(copiedRelation)); + + heap_close(copiedRelation, NoLock); + } if (isDistributedRelation) { if (copyStatement->is_from) { /* check permissions, we're bypassing postgres' normal checks */ - CheckCopyPermissions(copyStatement); + if (!isCopyFromWorker) + { + CheckCopyPermissions(copyStatement); + } + CitusCopyFrom(copyStatement, completionTag); return NULL; } @@ -834,6 +862,82 @@ ErrorIfDistributedRenameStmt(RenameStmt *renameStatement) } +/* + * CreateLocalTable gets DDL commands from the remote node for the given + * relation. Then, it creates the local relation as temporary and on commit drop. + */ +static void +CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort) +{ + List *ddlCommandList = NIL; + ListCell *ddlCommandCell = NULL; + + char *relationName = relation->relname; + char *schemaName = relation->schemaname; + char *qualifiedName = quote_qualified_identifier(schemaName, relationName); + + /* fetch the ddl commands needed to create the table */ + StringInfo tableNameStringInfo = makeStringInfo(); + appendStringInfoString(tableNameStringInfo, qualifiedName); + + /* + * The warning message created in TableDDLCommandList() is descriptive + * enough; therefore, we just throw an error which says that we could not + * run the copy operation. 
+ */ + ddlCommandList = TableDDLCommandList(nodeName, nodePort, tableNameStringInfo); + if (ddlCommandList == NIL) + { + ereport(ERROR, (errmsg("could not run copy from the worker node"))); + } + + /* apply DDL commands against the local database */ + foreach(ddlCommandCell, ddlCommandList) + { + StringInfo ddlCommand = (StringInfo) lfirst(ddlCommandCell); + Node *ddlCommandNode = ParseTreeNode(ddlCommand->data); + bool applyDDLCommand = false; + + if (IsA(ddlCommandNode, CreateStmt) || + IsA(ddlCommandNode, CreateForeignTableStmt)) + { + CreateStmt *createStatement = (CreateStmt *) ddlCommandNode; + + /* create the local relation as temporary and on commit drop */ + createStatement->relation->relpersistence = RELPERSISTENCE_TEMP; + createStatement->oncommit = ONCOMMIT_DROP; + + /* temporarily strip schema name */ + createStatement->relation->schemaname = NULL; + + applyDDLCommand = true; + } + else if (IsA(ddlCommandNode, CreateForeignServerStmt)) + { + CreateForeignServerStmt *createServerStmt = + (CreateForeignServerStmt *) ddlCommandNode; + if (GetForeignServerByName(createServerStmt->servername, true) == NULL) + { + /* create server if not exists */ + applyDDLCommand = true; + } + } + else if ((IsA(ddlCommandNode, CreateExtensionStmt))) + { + applyDDLCommand = true; + } + + /* run only a selected set of DDL commands */ + if (applyDDLCommand) + { + ProcessUtility(ddlCommandNode, CreateCommandTag(ddlCommandNode), + PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL); + CommandCounterIncrement(); + } + } +} + + /* * IsAlterTableRenameStmt returns true if the passed in RenameStmt operates on a * distributed table or its objects. This includes: diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 80785ea7e..04dfbd815 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -54,6 +54,7 @@ static StringInfo WorkerPartitionValue(char *nodeName, uint32 nodePort, Oid rela /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_create_empty_shard); PG_FUNCTION_INFO_V1(master_append_table_to_shard); +PG_FUNCTION_INFO_V1(master_update_shard_statistics); /* @@ -269,7 +270,7 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) RESUME_INTERRUPTS(); /* update shard statistics and get new shard size */ - newShardSize = UpdateShardStatistics(relationId, shardId); + newShardSize = UpdateShardStatistics(shardId); /* calculate ratio of current shard size compared to shard max size */ shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L; @@ -279,6 +280,22 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) } +/* + * master_update_shard_statistics updates metadata (shard size and shard min/max + * values) of the given shard and returns the updated shard size. + */ +Datum +master_update_shard_statistics(PG_FUNCTION_ARGS) +{ + int64 shardId = PG_GETARG_INT64(0); + uint64 shardSize = 0; + + shardSize = UpdateShardStatistics(shardId); + + PG_RETURN_INT64(shardSize); +} + + /* * CheckDistributedTable checks if the given relationId corresponds to a * distributed table. If it does not, the function errors out. @@ -401,15 +418,16 @@ WorkerCreateShard(char *nodeName, uint32 nodePort, uint64 shardId, /* - * UpdateShardStatistics updates metadata for the given shard id and returns - * the new shard size. + * UpdateShardStatistics updates metadata (shard size and shard min/max values) + * of the given shard and returns the updated shard size. 
*/ uint64 -UpdateShardStatistics(Oid relationId, int64 shardId) +UpdateShardStatistics(int64 shardId) { ShardInterval *shardInterval = LoadShardInterval(shardId); + Oid relationId = shardInterval->relationId; char storageType = shardInterval->storageType; - char *shardName = NULL; + char *shardQualifiedName = NULL; List *shardPlacementList = NIL; ListCell *shardPlacementCell = NULL; bool statsOK = false; @@ -418,11 +436,17 @@ UpdateShardStatistics(Oid relationId, int64 shardId) text *maxValue = NULL; /* if shard doesn't have an alias, extend regular table name */ - shardName = LoadShardAlias(relationId, shardId); - if (shardName == NULL) + shardQualifiedName = LoadShardAlias(relationId, shardId); + if (shardQualifiedName == NULL) { - shardName = get_rel_name(relationId); - AppendShardIdToName(&shardName, shardId); + char *relationName = get_rel_name(relationId); + + Oid schemaId = get_rel_namespace(relationId); + char *schemaName = get_namespace_name(schemaId); + + shardQualifiedName = quote_qualified_identifier(schemaName, relationName); + + AppendShardIdToName(&shardQualifiedName, shardId); } shardPlacementList = FinalizedShardPlacementList(shardId); @@ -434,7 +458,7 @@ UpdateShardStatistics(Oid relationId, int64 shardId) char *workerName = placement->nodeName; uint32 workerPort = placement->nodePort; - statsOK = WorkerShardStats(workerName, workerPort, relationId, shardName, + statsOK = WorkerShardStats(workerName, workerPort, relationId, shardQualifiedName, &shardSize, &minValue, &maxValue); if (statsOK) { @@ -451,7 +475,8 @@ UpdateShardStatistics(Oid relationId, int64 shardId) */ if (!statsOK) { - ereport(WARNING, (errmsg("could not get statistics for shard %s", shardName), + ereport(WARNING, (errmsg("could not get statistics for shard %s", + shardQualifiedName), errdetail("Setting shard statistics to NULL"))); } diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 7e927f75c..2fe1ca67f 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -63,8 +63,6 @@ static bool FetchForeignTable(const char *nodeName, uint32 nodePort, StringInfo tableName); static const char * RemoteTableOwner(const char *nodeName, uint32 nodePort, StringInfo tableName); -static List * TableDDLCommandList(const char *nodeName, uint32 nodePort, - StringInfo tableName); static StringInfo ForeignFilePath(const char *nodeName, uint32 nodePort, StringInfo tableName); static bool check_log_statement(List *stmt_list); @@ -870,7 +868,7 @@ RemoteTableOwner(const char *nodeName, uint32 nodePort, StringInfo tableName) * DDL commands used in creating the table. If an error occurs during fetching, * the function returns an empty list. 
*/ -static List * +List * TableDDLCommandList(const char *nodeName, uint32 nodePort, StringInfo tableName) { List *ddlCommandList = NIL; diff --git a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 95ecb5cbf..136d55f76 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -60,6 +60,12 @@ #define DROP_REGULAR_TABLE_COMMAND "DROP TABLE IF EXISTS %s" #define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s" #define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s" +#define CREATE_EMPTY_SHARD_QUERY "SELECT master_create_empty_shard('%s')" +#define FINALIZED_SHARD_PLACEMENTS_QUERY \ + "SELECT nodename, nodeport FROM pg_dist_shard_placement WHERE shardstate = 1 AND shardid = %ld" +#define UPDATE_SHARD_STATISTICS_QUERY \ + "SELECT master_update_shard_statistics(%ld)" +#define PARTITION_METHOD_QUERY "SELECT part_method FROM master_get_table_metadata('%s');" /* Enumeration that defines the shard placement policy to use while staging */ @@ -86,7 +92,7 @@ extern void CreateShardPlacements(int64 shardId, List *ddlEventList, char *newPlacementOwner, List *workerNodeList, int workerStartIndex, int replicationFactor); -extern uint64 UpdateShardStatistics(Oid relationId, int64 shardId); +extern uint64 UpdateShardStatistics(int64 shardId); /* Function declarations for generating metadata for shard creation */ extern Datum master_get_table_metadata(PG_FUNCTION_ARGS); @@ -99,6 +105,7 @@ extern Datum master_get_active_worker_nodes(PG_FUNCTION_ARGS); /* Function declarations to help with data staging and deletion */ extern Datum master_create_empty_shard(PG_FUNCTION_ARGS); extern Datum master_append_table_to_shard(PG_FUNCTION_ARGS); +extern Datum master_update_shard_statistics(PG_FUNCTION_ARGS); extern Datum master_apply_delete_command(PG_FUNCTION_ARGS); extern Datum master_drop_all_shards(PG_FUNCTION_ARGS); @@ -108,4 +115,5 @@ extern Datum master_create_worker_shards(PG_FUNCTION_ARGS); /* function declarations for shard repair functionality */ extern Datum master_copy_shard_placement(PG_FUNCTION_ARGS); + #endif /* MASTER_PROTOCOL_H */ diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index fa6ce2af8..3cc44092b 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -41,6 +41,13 @@ typedef struct CopyOutStateData typedef struct CopyOutStateData *CopyOutState; +/* struct type to keep both hostname and port */ +typedef struct NodeAddress +{ + char *nodeName; + int32 nodePort; +} NodeAddress; + /* function declarations for copying into a distributed table */ extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); @@ -51,6 +58,8 @@ extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray, extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState); extern void AppendCopyBinaryFooters(CopyOutState footerOutputState); extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag); +extern bool IsCopyFromWorker(CopyStmt *copyStatement); +extern NodeAddress * MasterNodeAddress(CopyStmt *copyStatement); #endif /* MULTI_COPY_H */ diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h index b74fa0158..8b2e69d8b 100644 --- a/src/include/distributed/worker_protocol.h +++ b/src/include/distributed/worker_protocol.h @@ -119,6 +119,8 @@ extern void RemoveJobSchema(StringInfo schemaName); extern Datum * DeconstructArrayObject(ArrayType 
*arrayObject); extern int32 ArrayObjectCount(ArrayType *arrayObject); extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedureId); +extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort, + StringInfo tableName); /* Function declarations shared with the master planner */ extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 6175881af..1bd3b8dcd 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -11,6 +11,7 @@ DROP EXTENSION citus; -- Create extension in oldest version, test every upgrade step CREATE EXTENSION citus VERSION '5.0'; ALTER EXTENSION citus UPDATE TO '5.0-1'; +ALTER EXTENSION citus UPDATE TO '5.0-2'; -- drop extension an re-create in newest version DROP EXTENSION citus; \c diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source index a59bdf8b2..1c3d6acb0 100644 --- a/src/test/regress/input/multi_copy.source +++ b/src/test/regress/input/multi_copy.source @@ -254,3 +254,63 @@ SET citus.shard_max_size TO '256kB'; COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with delimiter '|'; SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass; + +-- Create customer table for the worker copy with constraint and index +CREATE TABLE customer_worker_copy_append ( + c_custkey integer , + c_name varchar(25) not null, + c_address varchar(40), + c_nationkey integer, + c_phone char(15), + c_acctbal decimal(15,2), + c_mktsegment char(10), + c_comment varchar(117), + primary key (c_custkey)); + +CREATE INDEX ON customer_worker_copy_append (c_name); + +SELECT master_create_distributed_table('customer_worker_copy_append', 'c_custkey', 'append'); + +-- Connect to the first worker node +\c - - - 57637 + +-- Test copy from the worker node +COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); +COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636); + +-- Test if there is no relation to copy data with the worker copy +COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); + +-- Connect back to the master node +\c - - - 57636 + +-- Test the content of the table +SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM customer_worker_copy_append; + +-- Test schema support on append partitioned tables +CREATE SCHEMA append; +CREATE TABLE append.customer_copy ( + c_custkey integer , + c_name varchar(25) not null, + c_address varchar(40), + c_nationkey integer, + c_phone char(15), + c_acctbal decimal(15,2), + c_mktsegment char(10), + c_comment varchar(117)); + +SELECT master_create_distributed_table('append.customer_copy', 'c_custkey', 'append'); + +-- Test copy from the master node +COPY append.customer_copy FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|'); + +-- Test copy from the worker node +\c - - - 57637 + +COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636); + +-- Connect back to the master node +\c - - - 57636 + +-- Test the content of the table +SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM 
append.customer_copy; diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index 804dffb3a..3b26ff0e0 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -304,3 +304,76 @@ SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append':: 5 (1 row) +-- Create customer table for the worker copy with constraint and index +CREATE TABLE customer_worker_copy_append ( + c_custkey integer , + c_name varchar(25) not null, + c_address varchar(40), + c_nationkey integer, + c_phone char(15), + c_acctbal decimal(15,2), + c_mktsegment char(10), + c_comment varchar(117), + primary key (c_custkey)); +CREATE INDEX ON customer_worker_copy_append (c_name); +SELECT master_create_distributed_table('customer_worker_copy_append', 'c_custkey', 'append'); +WARNING: table "customer_worker_copy_append" has a unique constraint +DETAIL: Unique constraints and primary keys on append-partitioned tables cannot be enforced. +HINT: Consider using hash partitioning. + master_create_distributed_table +--------------------------------- + +(1 row) + +-- Connect to the first worker node +\c - - - 57637 +-- Test copy from the worker node +COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); +COPY customer_worker_copy_append FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636); +-- Test if there is no relation to copy data with the worker copy +COPY lineitem_copy_none FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); +WARNING: could not receive query results from localhost:57636 +DETAIL: Client error: relation "lineitem_copy_none" does not exist +ERROR: could not run copy from the worker node +-- Connect back to the master node +\c - - - 57636 +-- Test the content of the table +SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM customer_worker_copy_append; + min | max | avg | count +-----+------+-----------------------+------- + 1 | 7000 | 4443.8028800000000000 | 2000 +(1 row) + +-- Test schema support on append partitioned tables +CREATE SCHEMA append; +NOTICE: Citus partially supports CREATE SCHEMA for distributed databases +DETAIL: schema usage in joins and in some UDFs provided by Citus are not supported yet +CREATE TABLE append.customer_copy ( + c_custkey integer , + c_name varchar(25) not null, + c_address varchar(40), + c_nationkey integer, + c_phone char(15), + c_acctbal decimal(15,2), + c_mktsegment char(10), + c_comment varchar(117)); +SELECT master_create_distributed_table('append.customer_copy', 'c_custkey', 'append'); + master_create_distributed_table +--------------------------------- + +(1 row) + +-- Test copy from the master node +COPY append.customer_copy FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|'); +-- Test copy from the worker node +\c - - - 57637 +COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', master_host 'localhost', master_port 57636); +-- Connect back to the master node +\c - - - 57636 +-- Test the content of the table +SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM append.customer_copy; + min | max | avg | count +-----+------+-----------------------+------- + 1 | 7000 | 4443.8028800000000000 | 2000 +(1 row) + diff --git a/src/test/regress/sql/multi_extension.sql 
b/src/test/regress/sql/multi_extension.sql index 7c851e1e1..b6b260490 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -13,6 +13,7 @@ DROP EXTENSION citus; -- Create extension in oldest version, test every upgrade step CREATE EXTENSION citus VERSION '5.0'; ALTER EXTENSION citus UPDATE TO '5.0-1'; +ALTER EXTENSION citus UPDATE TO '5.0-2'; -- drop extension an re-create in newest version DROP EXTENSION citus;
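
--
A minimal usage sketch of the feature added above; the table name, file path,
and shard id below are hypothetical, while master_host and master_port follow
the commit message's example (master_port defaults to 5432):

    -- on the master node: create an append-partitioned distributed table
    CREATE TABLE events (event_id bigint, payload text);
    SELECT master_create_distributed_table('events', 'event_id', 'append');

    -- on any worker node: copy a local file into new shards; shard creation
    -- and metadata updates are routed to the master through master_host/port
    COPY events FROM '/tmp/events.data' WITH (delimiter '|', master_host 'localhost', master_port 5432);

    -- the new UDF added in citus--5.0-1--5.0-2.sql can also be called directly;
    -- it refreshes a shard's size and min/max metadata and returns the new size
    SELECT master_update_shard_statistics(102008);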