diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 3f1c3bcfa..4fe427247 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -141,12 +141,16 @@ static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid re static char MasterPartitionMethod(RangeVar *relation); static void RemoveMasterOptions(CopyStmt *copyStatement); static void OpenCopyTransactions(CopyStmt *copyStatement, - ShardConnections *shardConnections, bool stopOnFailure); + ShardConnections *shardConnections, bool stopOnFailure, + bool useBinaryCopyFormat); +static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription, + CopyOutState rowOutputState); static List * MasterShardPlacementList(uint64 shardId); static List * RemoteFinalizedShardPlacementList(uint64 shardId); static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList); static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList); -static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId); +static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, + bool useBinaryCopyFormat); static void SendCopyDataToAll(StringInfo dataBuffer, List *connectionList); static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection, int64 shardId); @@ -154,7 +158,7 @@ static void EndRemoteCopy(List *connectionList, bool stopOnFailure); static void ReportCopyError(PGconn *connection, PGresult *result); static uint32 AvailableColumnCount(TupleDesc tupleDescriptor); static void StartCopyToNewShard(ShardConnections *shardConnections, - CopyStmt *copyStatement); + CopyStmt *copyStatement, bool useBinaryCopyFormat); static int64 MasterCreateEmptyShard(char *relationName); static int64 CreateEmptyShard(char *relationName); static int64 RemoteCreateEmptyShard(char *relationName); @@ -348,6 +352,8 @@ CopyToExistingShards(CopyStmt 
*copyStatement, char *completionTag) FmgrInfo *compareFunction = NULL; bool hasUniformHashDistribution = false; DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId); + const char *delimiterCharacter = "\t"; + const char *nullPrintCharacter = "\\N"; int shardCount = 0; List *shardIntervalList = NULL; @@ -441,7 +447,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) executorExpressionContext = GetPerTupleExprContext(executorState); copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); - copyOutState->binary = true; + copyOutState->delim = (char *) delimiterCharacter; + copyOutState->null_print = (char *) nullPrintCharacter; + copyOutState->null_print_client = (char *) nullPrintCharacter; + copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState); copyOutState->fe_msgbuf = makeStringInfo(); copyOutState->rowcontext = executorTupleContext; @@ -528,10 +537,15 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) if (!shardConnectionsFound) { /* open connections and initiate COPY on shard placements */ - OpenCopyTransactions(copyStatement, shardConnections, false); + OpenCopyTransactions(copyStatement, shardConnections, false, + copyOutState->binary); /* send copy binary headers to shard placements */ - SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList); + if (copyOutState->binary) + { + SendCopyBinaryHeaders(copyOutState, + shardConnections->connectionList); + } } /* replicate row to shard placements */ @@ -546,7 +560,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) connectionList = ConnectionList(shardConnectionHash); /* send copy binary footers to all shard placements */ - SendCopyBinaryFooters(copyOutState, connectionList); + if (copyOutState->binary) + { + SendCopyBinaryFooters(copyOutState, connectionList); + } /* all lines have been copied, stop showing line number in errors */ error_context_stack = errorCallback.previous; @@ -615,6 
+632,9 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState); + const char *delimiterCharacter = "\t"; + const char *nullPrintCharacter = "\\N"; + /* * Shard connections should be initialized before the PG_TRY, since it is * used in PG_CATCH. Otherwise, it may be undefined in the PG_CATCH @@ -631,7 +651,10 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) copyStatement->options); CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); - copyOutState->binary = true; + copyOutState->delim = (char *) delimiterCharacter; + copyOutState->null_print = (char *) nullPrintCharacter; + copyOutState->null_print_client = (char *) nullPrintCharacter; + copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState); copyOutState->fe_msgbuf = makeStringInfo(); copyOutState->rowcontext = executorTupleContext; @@ -690,10 +713,15 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) if (copiedDataSizeInBytes == 0) { /* create shard and open connections to shard placements */ - StartCopyToNewShard(shardConnections, copyStatement); + StartCopyToNewShard(shardConnections, copyStatement, + copyOutState->binary); /* send copy binary headers to shard placements */ - SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList); + if (copyOutState->binary) + { + SendCopyBinaryHeaders(copyOutState, + shardConnections->connectionList); + } } /* replicate row to shard placements */ @@ -713,7 +741,11 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) * */ if (copiedDataSizeInBytes > shardMaxSizeInBytes) { - SendCopyBinaryFooters(copyOutState, shardConnections->connectionList); + if (copyOutState->binary) + { + SendCopyBinaryFooters(copyOutState, + shardConnections->connectionList); + 
} FinalizeCopyToNewShard(shardConnections); MasterUpdateShardStatistics(shardConnections->shardId); @@ -731,7 +763,11 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) */ if (copiedDataSizeInBytes > 0) { - SendCopyBinaryFooters(copyOutState, shardConnections->connectionList); + if (copyOutState->binary) + { + SendCopyBinaryFooters(copyOutState, + shardConnections->connectionList); + } FinalizeCopyToNewShard(shardConnections); MasterUpdateShardStatistics(shardConnections->shardId); } @@ -875,7 +911,7 @@ RemoveMasterOptions(CopyStmt *copyStatement) */ static void OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections, - bool stopOnFailure) + bool stopOnFailure, bool useBinaryCopyFormat) { List *finalizedPlacementList = NIL; List *failedPlacementList = NIL; @@ -932,7 +968,8 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections } PQclear(result); - copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId); + copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId, + useBinaryCopyFormat); result = PQexec(connection, copyCommand->data); if (PQresultStatus(result) != PGRES_COPY_IN) @@ -985,6 +1022,49 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections } +/* + * CanUseBinaryCopyFormat iterates over the columns described by tupleDescription + * and looks for a column whose type is array of user-defined type or composite type. + * If it finds such a column, that means we cannot use binary format for COPY, because + * binary format sends Oid of the types, which are generally not the same in master and + * worker nodes for user-defined types. rowOutputState is not read by this function.
+ */ +static bool +CanUseBinaryCopyFormat(TupleDesc tupleDescription, CopyOutState rowOutputState) +{ + bool useBinaryCopyFormat = true; + int totalColumnCount = tupleDescription->natts; + int columnIndex = 0; + + for (columnIndex = 0; columnIndex < totalColumnCount; columnIndex++) + { + Form_pg_attribute currentColumn = tupleDescription->attrs[columnIndex]; + Oid typeId = InvalidOid; + char typeCategory = '\0'; + bool typePreferred = false; + + if (currentColumn->attisdropped) /* dropped columns are skipped */ + { + continue; + } + + typeId = currentColumn->atttypid; + if (typeId >= FirstNormalObjectId) + { + get_type_category_preferred(typeId, &typeCategory, &typePreferred); + if (typeCategory == TYPCATEGORY_ARRAY || + typeCategory == TYPCATEGORY_COMPOSITE) + { + useBinaryCopyFormat = false; /* one such column rules out binary */ + break; + } + } + } + + return useBinaryCopyFormat; +} + + /* * MasterShardPlacementList dispatches the finalized shard placements call * between local or remote master node according to the master connection state. @@ -1075,7 +1155,7 @@ SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList) * shard.
*/ static StringInfo -ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId) +ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCopyFormat) { StringInfo command = makeStringInfo(); @@ -1084,14 +1164,22 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId) char *shardName = pstrdup(relationName); char *shardQualifiedName = NULL; + const char *copyFormat = NULL; AppendShardIdToName(&shardName, shardId); shardQualifiedName = quote_qualified_identifier(schemaName, shardName); - appendStringInfo(command, - "COPY %s FROM STDIN WITH (FORMAT BINARY)", - shardQualifiedName); + if (useBinaryCopyFormat) + { + copyFormat = "BINARY"; + } + else + { + copyFormat = "TEXT"; + } + appendStringInfo(command, "COPY %s FROM STDIN WITH (FORMAT %s)", shardQualifiedName, + copyFormat); return command; } @@ -1430,7 +1518,8 @@ AppendCopyBinaryFooters(CopyOutState footerOutputState) * opens connections to shard placements. */ static void -StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement) +StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement, + bool useBinaryCopyFormat) { char *relationName = copyStatement->relation->relname; char *schemaName = copyStatement->relation->schemaname; @@ -1444,7 +1533,7 @@ StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement) shardConnections->connectionList = NIL; /* connect to shards placements and start transactions */ - OpenCopyTransactions(copyStatement, shardConnections, true); + OpenCopyTransactions(copyStatement, shardConnections, true, useBinaryCopyFormat); } diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source index b518d01e2..c5454061e 100644 --- a/src/test/regress/input/multi_copy.source +++ b/src/test/regress/input/multi_copy.source @@ -414,3 +414,99 @@ WITH (FORMAT 'csv'); -- Confirm that data was copied SELECT count(*) FROM "1_customer"; + +-- Test COPY with types having 
different Oid at master and workers +CREATE TYPE number_pack AS ( + number1 integer, + number2 integer +); + +CREATE TYPE super_number_pack AS ( + packed_number1 number_pack, + packed_number2 number_pack +); + +-- Create same types in worker1 +\c - - - :worker_1_port + +CREATE TYPE number_pack AS ( + number1 integer, + number2 integer +); + +CREATE TYPE super_number_pack AS ( + packed_number1 number_pack, + packed_number2 number_pack +); + +-- Create same types in worker2 +\c - - - :worker_2_port + +CREATE TYPE number_pack AS ( + number1 integer, + number2 integer +); + +CREATE TYPE super_number_pack AS ( + packed_number1 number_pack, + packed_number2 number_pack +); + + +-- Connect back to master +\c - - - :master_port + + +-- Test array of user-defined type with hash distribution +CREATE TABLE packed_numbers_hash ( + id integer, + packed_numbers number_pack[] +); + +SELECT master_create_distributed_table('packed_numbers_hash', 'id', 'hash'); +SELECT master_create_worker_shards('packed_numbers_hash', 4, 1); +COPY (SELECT 1, ARRAY[ROW(42, 42), ROW(42, 42)]) TO '/tmp/copy_test_array_of_composite'; +COPY packed_numbers_hash FROM '/tmp/copy_test_array_of_composite'; + +-- Verify data is actually copied +SELECT * FROM packed_numbers_hash; + +-- Test composite type containing an element with different Oid with hash distribution + +CREATE TABLE super_packed_numbers_hash ( + id integer, + super_packed_number super_number_pack +); + +SELECT master_create_distributed_table('super_packed_numbers_hash', 'id', 'hash'); +SELECT master_create_worker_shards('super_packed_numbers_hash', 4, 1); +COPY (SELECT 1, ROW(ROW(42, 42), ROW(42, 42))) TO '/tmp/copy_test_composite_of_composite'; +COPY super_packed_numbers_hash FROM '/tmp/copy_test_composite_of_composite'; + +-- Verify data is actually copied +SELECT * FROM super_packed_numbers_hash; + +-- Test array of user-defined type with append distribution +CREATE TABLE packed_numbers_append ( + id integer, + packed_numbers number_pack[] 
+); + +SELECT master_create_distributed_table('packed_numbers_append', 'id', 'append'); +COPY packed_numbers_append FROM '/tmp/copy_test_array_of_composite'; + +-- Verify data is actually copied +SELECT * FROM packed_numbers_append; + +-- Test composite type containing an element with different Oid with append distribution + +CREATE TABLE super_packed_numbers_append ( + id integer, + super_packed_number super_number_pack +); + +SELECT master_create_distributed_table('super_packed_numbers_append', 'id', 'append'); +COPY super_packed_numbers_append FROM '/tmp/copy_test_composite_of_composite'; + +-- Verify data is actually copied +SELECT * FROM super_packed_numbers_append; diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index cb16b1498..326605937 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -550,3 +550,124 @@ SELECT count(*) FROM "1_customer"; 2 (1 row) +-- Test COPY with types having different Oid at master and workers +CREATE TYPE number_pack AS ( + number1 integer, + number2 integer +); +CREATE TYPE super_number_pack AS ( + packed_number1 number_pack, + packed_number2 number_pack +); +-- Create same types in worker1 +\c - - - :worker_1_port +CREATE TYPE number_pack AS ( + number1 integer, + number2 integer +); +CREATE TYPE super_number_pack AS ( + packed_number1 number_pack, + packed_number2 number_pack +); +-- Create same types in worker2 +\c - - - :worker_2_port +CREATE TYPE number_pack AS ( + number1 integer, + number2 integer +); +CREATE TYPE super_number_pack AS ( + packed_number1 number_pack, + packed_number2 number_pack +); +-- Connect back to master +\c - - - :master_port +-- Test array of user-defined type with hash distribution +CREATE TABLE packed_numbers_hash ( + id integer, + packed_numbers number_pack[] +); +SELECT master_create_distributed_table('packed_numbers_hash', 'id', 'hash'); + master_create_distributed_table 
+--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('packed_numbers_hash', 4, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +COPY (SELECT 1, ARRAY[ROW(42, 42), ROW(42, 42)]) TO '/tmp/copy_test_array_of_composite'; +COPY packed_numbers_hash FROM '/tmp/copy_test_array_of_composite'; +-- Verify data is actually copied +SELECT * FROM packed_numbers_hash; + id | packed_numbers +----+----------------------- + 1 | {"(42,42)","(42,42)"} +(1 row) + +-- Test composite type containing an element with different Oid with hash distribution +CREATE TABLE super_packed_numbers_hash ( + id integer, + super_packed_number super_number_pack +); +SELECT master_create_distributed_table('super_packed_numbers_hash', 'id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('super_packed_numbers_hash', 4, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +COPY (SELECT 1, ROW(ROW(42, 42), ROW(42, 42))) TO '/tmp/copy_test_composite_of_composite'; +COPY super_packed_numbers_hash FROM '/tmp/copy_test_composite_of_composite'; +-- Verify data is actually copied +SELECT * FROM super_packed_numbers_hash; + id | super_packed_number +----+----------------------- + 1 | ("(42,42)","(42,42)") +(1 row) + +-- Test array of user-defined type with append distribution +CREATE TABLE packed_numbers_append ( + id integer, + packed_numbers number_pack[] +); +SELECT master_create_distributed_table('packed_numbers_append', 'id', 'append'); + master_create_distributed_table +--------------------------------- + +(1 row) + +COPY packed_numbers_append FROM '/tmp/copy_test_array_of_composite'; +-- Verify data is actually copied +SELECT * FROM packed_numbers_append; + id | packed_numbers +----+----------------------- + 1 | {"(42,42)","(42,42)"} +(1 row) + +-- Test composite type containing an element with different Oid with append distribution +CREATE 
TABLE super_packed_numbers_append ( + id integer, + super_packed_number super_number_pack +); +SELECT master_create_distributed_table('super_packed_numbers_append', 'id', 'append'); + master_create_distributed_table +--------------------------------- + +(1 row) + +COPY super_packed_numbers_append FROM '/tmp/copy_test_composite_of_composite'; +-- Verify data is actually copied +SELECT * FROM super_packed_numbers_append; + id | super_packed_number +----+----------------------- + 1 | ("(42,42)","(42,42)") +(1 row) +