From cab03a6274b77087ec5ec9b7d4e13a5506b7f422 Mon Sep 17 00:00:00 2001 From: Burak Yucesoy Date: Fri, 17 Jun 2016 12:06:50 +0300 Subject: [PATCH] Fix COPY produces error when using array of user-defined types Fixes #463 OID of user-defined types may be different in master and worker nodes. This causes errors while sending data between nodes with binary nodes. Because binary copy format adds OID of the element if it is in an array. The code adding OID is in PostgreSQL code, therefore we cannot change it. Instead we decided to use text format if we try to send array of user-defined type. --- src/backend/distributed/commands/multi_copy.c | 129 +++++++++++++++--- src/test/regress/input/multi_copy.source | 96 +++++++++++++ src/test/regress/output/multi_copy.source | 121 ++++++++++++++++ 3 files changed, 326 insertions(+), 20 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 3f1c3bcfa..4fe427247 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -141,12 +141,16 @@ static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid re static char MasterPartitionMethod(RangeVar *relation); static void RemoveMasterOptions(CopyStmt *copyStatement); static void OpenCopyTransactions(CopyStmt *copyStatement, - ShardConnections *shardConnections, bool stopOnFailure); + ShardConnections *shardConnections, bool stopOnFailure, + bool useBinaryCopyFormat); +static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription, + CopyOutState rowOutputState); static List * MasterShardPlacementList(uint64 shardId); static List * RemoteFinalizedShardPlacementList(uint64 shardId); static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList); static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList); -static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId); +static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, + bool useBinaryCopyFormat); static void SendCopyDataToAll(StringInfo dataBuffer, List *connectionList); static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection, int64 shardId); @@ -154,7 +158,7 @@ static void EndRemoteCopy(List *connectionList, bool stopOnFailure); static void ReportCopyError(PGconn *connection, PGresult *result); static uint32 AvailableColumnCount(TupleDesc tupleDescriptor); static void StartCopyToNewShard(ShardConnections *shardConnections, - CopyStmt *copyStatement); + CopyStmt *copyStatement, bool useBinaryCopyFormat); static int64 MasterCreateEmptyShard(char *relationName); static int64 CreateEmptyShard(char *relationName); static int64 RemoteCreateEmptyShard(char *relationName); @@ -348,6 +352,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) FmgrInfo *compareFunction = NULL; bool hasUniformHashDistribution = false; DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId); + const char *delimiterCharacter = "\t"; + const char *nullPrintCharacter = "\\N"; int shardCount = 0; List *shardIntervalList = NULL; @@ -441,7 +447,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) executorExpressionContext = GetPerTupleExprContext(executorState); copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); - copyOutState->binary = true; + copyOutState->delim = (char *) delimiterCharacter; + copyOutState->null_print = (char *) nullPrintCharacter; + copyOutState->null_print_client = (char *) nullPrintCharacter; + copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState); copyOutState->fe_msgbuf = makeStringInfo(); copyOutState->rowcontext = executorTupleContext; @@ -528,10 +537,15 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) if (!shardConnectionsFound) { /* open connections and initiate COPY on shard placements */ - OpenCopyTransactions(copyStatement, shardConnections, false); + OpenCopyTransactions(copyStatement, shardConnections, false, + copyOutState->binary); /* send copy binary headers to shard placements */ - SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList); + if (copyOutState->binary) + { + SendCopyBinaryHeaders(copyOutState, + shardConnections->connectionList); + } } /* replicate row to shard placements */ @@ -546,7 +560,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) connectionList = ConnectionList(shardConnectionHash); /* send copy binary footers to all shard placements */ - SendCopyBinaryFooters(copyOutState, connectionList); + if (copyOutState->binary) + { + SendCopyBinaryFooters(copyOutState, connectionList); + } /* all lines have been copied, stop showing line number in errors */ error_context_stack = errorCallback.previous; @@ -615,6 +632,9 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState); + const char *delimiterCharacter = "\t"; + const char *nullPrintCharacter = "\\N"; + /* * Shard connections should be initialized before the PG_TRY, since it is * used in PG_CATCH. Otherwise, it may be undefined in the PG_CATCH @@ -631,7 +651,10 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) copyStatement->options); CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); - copyOutState->binary = true; + copyOutState->delim = (char *) delimiterCharacter; + copyOutState->null_print = (char *) nullPrintCharacter; + copyOutState->null_print_client = (char *) nullPrintCharacter; + copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState); copyOutState->fe_msgbuf = makeStringInfo(); copyOutState->rowcontext = executorTupleContext; @@ -690,10 +713,15 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) if (copiedDataSizeInBytes == 0) { /* create shard and open connections to shard placements */ - StartCopyToNewShard(shardConnections, copyStatement); + StartCopyToNewShard(shardConnections, copyStatement, + copyOutState->binary); /* send copy binary headers to shard placements */ - SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList); + if (copyOutState->binary) + { + SendCopyBinaryHeaders(copyOutState, + shardConnections->connectionList); + } } /* replicate row to shard placements */ @@ -713,7 +741,11 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) * */ if (copiedDataSizeInBytes > shardMaxSizeInBytes) { - SendCopyBinaryFooters(copyOutState, shardConnections->connectionList); + if (copyOutState->binary) + { + SendCopyBinaryFooters(copyOutState, + shardConnections->connectionList); + } FinalizeCopyToNewShard(shardConnections); MasterUpdateShardStatistics(shardConnections->shardId); @@ -731,7 +763,11 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) */ if (copiedDataSizeInBytes > 0) { - SendCopyBinaryFooters(copyOutState, shardConnections->connectionList); + if (copyOutState->binary) + { + SendCopyBinaryFooters(copyOutState, + shardConnections->connectionList); + } FinalizeCopyToNewShard(shardConnections); MasterUpdateShardStatistics(shardConnections->shardId); } @@ -875,7 +911,7 @@ RemoveMasterOptions(CopyStmt *copyStatement) */ static void OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections, - bool stopOnFailure) + bool stopOnFailure, bool useBinaryCopyFormat) { List *finalizedPlacementList = NIL; List *failedPlacementList = NIL; @@ -932,7 +968,8 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections } PQclear(result); - copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId); + copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId, + useBinaryCopyFormat); result = PQexec(connection, copyCommand->data); if (PQresultStatus(result) != PGRES_COPY_IN) @@ -985,6 +1022,49 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections } +/* + * CanUseBinaryCopyFormat iterates over columns of the relation given in rowOutputState + * and looks for a column whose type is array of user-defined type or composite type. + * If it finds such column, that means we cannot use binary format for COPY, because + * binary format sends Oid of the types, which are generally not same in master and + * worker nodes for user-defined types. + */ +static bool +CanUseBinaryCopyFormat(TupleDesc tupleDescription, CopyOutState rowOutputState) +{ + bool useBinaryCopyFormat = true; + int totalColumnCount = tupleDescription->natts; + int columnIndex = 0; + + for (columnIndex = 0; columnIndex < totalColumnCount; columnIndex++) + { + Form_pg_attribute currentColumn = tupleDescription->attrs[columnIndex]; + Oid typeId = InvalidOid; + char typeCategory = '\0'; + bool typePreferred = false; + + if (currentColumn->attisdropped) + { + continue; + } + + typeId = currentColumn->atttypid; + if (typeId >= FirstNormalObjectId) + { + get_type_category_preferred(typeId, &typeCategory, &typePreferred); + if (typeCategory == TYPCATEGORY_ARRAY || + typeCategory == TYPCATEGORY_COMPOSITE) + { + useBinaryCopyFormat = false; + break; + } + } + } + + return useBinaryCopyFormat; +} + + /* * MasterShardPlacementList dispatches the finalized shard placements call * between local or remote master node according to the master connection state. @@ -1075,7 +1155,7 @@ SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList) * shard. */ static StringInfo -ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId) +ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCopyFormat) { StringInfo command = makeStringInfo(); @@ -1084,14 +1164,22 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId) char *shardName = pstrdup(relationName); char *shardQualifiedName = NULL; + const char *copyFormat = NULL; AppendShardIdToName(&shardName, shardId); shardQualifiedName = quote_qualified_identifier(schemaName, shardName); - appendStringInfo(command, - "COPY %s FROM STDIN WITH (FORMAT BINARY)", - shardQualifiedName); + if (useBinaryCopyFormat) + { + copyFormat = "BINARY"; + } + else + { + copyFormat = "TEXT"; + } + appendStringInfo(command, "COPY %s FROM STDIN WITH (FORMAT %s)", shardQualifiedName, + copyFormat); return command; } @@ -1430,7 +1518,8 @@ AppendCopyBinaryFooters(CopyOutState footerOutputState) * opens connections to shard placements. */ static void -StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement) +StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement, + bool useBinaryCopyFormat) { char *relationName = copyStatement->relation->relname; char *schemaName = copyStatement->relation->schemaname; @@ -1444,7 +1533,7 @@ StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement) shardConnections->connectionList = NIL; /* connect to shards placements and start transactions */ - OpenCopyTransactions(copyStatement, shardConnections, true); + OpenCopyTransactions(copyStatement, shardConnections, true, useBinaryCopyFormat); } diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source index b518d01e2..c5454061e 100644 --- a/src/test/regress/input/multi_copy.source +++ b/src/test/regress/input/multi_copy.source @@ -414,3 +414,99 @@ WITH (FORMAT 'csv'); -- Confirm that data was copied SELECT count(*) FROM "1_customer"; + +-- Test COPY with types having different Oid at master and workers +CREATE TYPE number_pack AS ( + number1 integer, + number2 integer +); + +CREATE TYPE super_number_pack AS ( + packed_number1 number_pack, + packed_number2 number_pack +); + +-- Create same types in worker1 +\c - - - :worker_1_port + +CREATE TYPE number_pack AS ( + number1 integer, + number2 integer +); + +CREATE TYPE super_number_pack AS ( + packed_number1 number_pack, + packed_number2 number_pack +); + +-- Create same types in worker2 +\c - - - :worker_2_port + +CREATE TYPE number_pack AS ( + number1 integer, + number2 integer +); + +CREATE TYPE super_number_pack AS ( + packed_number1 number_pack, + packed_number2 number_pack +); + + +-- Connect back to master +\c - - - :master_port + + +-- Test array of user-defined type with hash distribution +CREATE TABLE packed_numbers_hash ( + id integer, + packed_numbers number_pack[] +); + +SELECT master_create_distributed_table('packed_numbers_hash', 'id', 'hash'); +SELECT master_create_worker_shards('packed_numbers_hash', 4, 1); +COPY (SELECT 1, ARRAY[ROW(42, 42), ROW(42, 42)]) TO '/tmp/copy_test_array_of_composite'; +COPY packed_numbers_hash FROM '/tmp/copy_test_array_of_composite'; + +-- Verify data is actually copied +SELECT * FROM packed_numbers_hash; + +-- Test composite type containing an element with different Oid with hash distribution + +CREATE TABLE super_packed_numbers_hash ( + id integer, + super_packed_number super_number_pack +); + +SELECT master_create_distributed_table('super_packed_numbers_hash', 'id', 'hash'); +SELECT master_create_worker_shards('super_packed_numbers_hash', 4, 1); +COPY (SELECT 1, ROW(ROW(42, 42), ROW(42, 42))) TO '/tmp/copy_test_composite_of_composite'; +COPY super_packed_numbers_hash FROM '/tmp/copy_test_composite_of_composite'; + +-- Verify data is actually copied +SELECT * FROM super_packed_numbers_hash; + +-- Test array of user-defined type with append distribution +CREATE TABLE packed_numbers_append ( + id integer, + packed_numbers number_pack[] +); + +SELECT master_create_distributed_table('packed_numbers_append', 'id', 'append'); +COPY packed_numbers_append FROM '/tmp/copy_test_array_of_composite'; + +-- Verify data is actually copied +SELECT * FROM packed_numbers_append; + +-- Test composite type containing an element with different Oid with append distribution + +CREATE TABLE super_packed_numbers_append ( + id integer, + super_packed_number super_number_pack +); + +SELECT master_create_distributed_table('super_packed_numbers_append', 'id', 'append'); +COPY super_packed_numbers_append FROM '/tmp/copy_test_composite_of_composite'; + +-- Verify data is actually copied +SELECT * FROM super_packed_numbers_append; diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index cb16b1498..326605937 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -550,3 +550,124 @@ SELECT count(*) FROM "1_customer"; 2 (1 row) +-- Test COPY with types having different Oid at master and workers +CREATE TYPE number_pack AS ( + number1 integer, + number2 integer +); +CREATE TYPE super_number_pack AS ( + packed_number1 number_pack, + packed_number2 number_pack +); +-- Create same types in worker1 +\c - - - :worker_1_port +CREATE TYPE number_pack AS ( + number1 integer, + number2 integer +); +CREATE TYPE super_number_pack AS ( + packed_number1 number_pack, + packed_number2 number_pack +); +-- Create same types in worker2 +\c - - - :worker_2_port +CREATE TYPE number_pack AS ( + number1 integer, + number2 integer +); +CREATE TYPE super_number_pack AS ( + packed_number1 number_pack, + packed_number2 number_pack +); +-- Connect back to master +\c - - - :master_port +-- Test array of user-defined type with hash distribution +CREATE TABLE packed_numbers_hash ( + id integer, + packed_numbers number_pack[] +); +SELECT master_create_distributed_table('packed_numbers_hash', 'id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('packed_numbers_hash', 4, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +COPY (SELECT 1, ARRAY[ROW(42, 42), ROW(42, 42)]) TO '/tmp/copy_test_array_of_composite'; +COPY packed_numbers_hash FROM '/tmp/copy_test_array_of_composite'; +-- Verify data is actually copied +SELECT * FROM packed_numbers_hash; + id | packed_numbers +----+----------------------- + 1 | {"(42,42)","(42,42)"} +(1 row) + +-- Test composite type containing an element with different Oid with hash distribution +CREATE TABLE super_packed_numbers_hash ( + id integer, + super_packed_number super_number_pack +); +SELECT master_create_distributed_table('super_packed_numbers_hash', 'id', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('super_packed_numbers_hash', 4, 1); + master_create_worker_shards +----------------------------- + +(1 row) + +COPY (SELECT 1, ROW(ROW(42, 42), ROW(42, 42))) TO '/tmp/copy_test_composite_of_composite'; +COPY super_packed_numbers_hash FROM '/tmp/copy_test_composite_of_composite'; +-- Verify data is actually copied +SELECT * FROM super_packed_numbers_hash; + id | super_packed_number +----+----------------------- + 1 | ("(42,42)","(42,42)") +(1 row) + +-- Test array of user-defined type with append distribution +CREATE TABLE packed_numbers_append ( + id integer, + packed_numbers number_pack[] +); +SELECT master_create_distributed_table('packed_numbers_append', 'id', 'append'); + master_create_distributed_table +--------------------------------- + +(1 row) + +COPY packed_numbers_append FROM '/tmp/copy_test_array_of_composite'; +-- Verify data is actually copied +SELECT * FROM packed_numbers_append; + id | packed_numbers +----+----------------------- + 1 | {"(42,42)","(42,42)"} +(1 row) + +-- Test composite type containing an element with different Oid with append distribution +CREATE TABLE super_packed_numbers_append ( + id integer, + super_packed_number super_number_pack +); +SELECT master_create_distributed_table('super_packed_numbers_append', 'id', 'append'); + master_create_distributed_table +--------------------------------- + +(1 row) + +COPY super_packed_numbers_append FROM '/tmp/copy_test_composite_of_composite'; +-- Verify data is actually copied +SELECT * FROM super_packed_numbers_append; + id | super_packed_number +----+----------------------- + 1 | ("(42,42)","(42,42)") +(1 row) +