Fix COPY error when using arrays of user-defined types

Fixes #463

OIDs of user-defined types may differ between master and worker nodes. This causes errors
while sending data between nodes in binary format, because the binary copy format includes
the OID of the element type when the value is an array. The code that adds the OID is in
PostgreSQL itself, therefore we cannot change it. Instead, we decided to use the text format
when we need to send an array of a user-defined type.
pull/611/head
Burak Yucesoy 2016-06-17 12:06:50 +03:00
parent 800cb26ffd
commit cab03a6274
3 changed files with 326 additions and 20 deletions

View File

@ -141,12 +141,16 @@ static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid re
static char MasterPartitionMethod(RangeVar *relation);
static void RemoveMasterOptions(CopyStmt *copyStatement);
static void OpenCopyTransactions(CopyStmt *copyStatement,
ShardConnections *shardConnections, bool stopOnFailure);
ShardConnections *shardConnections, bool stopOnFailure,
bool useBinaryCopyFormat);
static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription,
CopyOutState rowOutputState);
static List * MasterShardPlacementList(uint64 shardId);
static List * RemoteFinalizedShardPlacementList(uint64 shardId);
static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList);
static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList);
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId,
bool useBinaryCopyFormat);
static void SendCopyDataToAll(StringInfo dataBuffer, List *connectionList);
static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection,
int64 shardId);
@ -154,7 +158,7 @@ static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
static void ReportCopyError(PGconn *connection, PGresult *result);
static uint32 AvailableColumnCount(TupleDesc tupleDescriptor);
static void StartCopyToNewShard(ShardConnections *shardConnections,
CopyStmt *copyStatement);
CopyStmt *copyStatement, bool useBinaryCopyFormat);
static int64 MasterCreateEmptyShard(char *relationName);
static int64 CreateEmptyShard(char *relationName);
static int64 RemoteCreateEmptyShard(char *relationName);
@ -348,6 +352,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
FmgrInfo *compareFunction = NULL;
bool hasUniformHashDistribution = false;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId);
const char *delimiterCharacter = "\t";
const char *nullPrintCharacter = "\\N";
int shardCount = 0;
List *shardIntervalList = NULL;
@ -441,7 +447,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
executorExpressionContext = GetPerTupleExprContext(executorState);
copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
copyOutState->binary = true;
copyOutState->delim = (char *) delimiterCharacter;
copyOutState->null_print = (char *) nullPrintCharacter;
copyOutState->null_print_client = (char *) nullPrintCharacter;
copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState);
copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->rowcontext = executorTupleContext;
@ -528,10 +537,15 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
if (!shardConnectionsFound)
{
/* open connections and initiate COPY on shard placements */
OpenCopyTransactions(copyStatement, shardConnections, false);
OpenCopyTransactions(copyStatement, shardConnections, false,
copyOutState->binary);
/* send copy binary headers to shard placements */
SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList);
if (copyOutState->binary)
{
SendCopyBinaryHeaders(copyOutState,
shardConnections->connectionList);
}
}
/* replicate row to shard placements */
@ -546,7 +560,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
connectionList = ConnectionList(shardConnectionHash);
/* send copy binary footers to all shard placements */
if (copyOutState->binary)
{
SendCopyBinaryFooters(copyOutState, connectionList);
}
/* all lines have been copied, stop showing line number in errors */
error_context_stack = errorCallback.previous;
@ -615,6 +632,9 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState);
const char *delimiterCharacter = "\t";
const char *nullPrintCharacter = "\\N";
/*
* Shard connections should be initialized before the PG_TRY, since it is
* used in PG_CATCH. Otherwise, it may be undefined in the PG_CATCH
@ -631,7 +651,10 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
copyStatement->options);
CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
copyOutState->binary = true;
copyOutState->delim = (char *) delimiterCharacter;
copyOutState->null_print = (char *) nullPrintCharacter;
copyOutState->null_print_client = (char *) nullPrintCharacter;
copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState);
copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->rowcontext = executorTupleContext;
@ -690,10 +713,15 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
if (copiedDataSizeInBytes == 0)
{
/* create shard and open connections to shard placements */
StartCopyToNewShard(shardConnections, copyStatement);
StartCopyToNewShard(shardConnections, copyStatement,
copyOutState->binary);
/* send copy binary headers to shard placements */
SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList);
if (copyOutState->binary)
{
SendCopyBinaryHeaders(copyOutState,
shardConnections->connectionList);
}
}
/* replicate row to shard placements */
@ -713,7 +741,11 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
* */
if (copiedDataSizeInBytes > shardMaxSizeInBytes)
{
SendCopyBinaryFooters(copyOutState, shardConnections->connectionList);
if (copyOutState->binary)
{
SendCopyBinaryFooters(copyOutState,
shardConnections->connectionList);
}
FinalizeCopyToNewShard(shardConnections);
MasterUpdateShardStatistics(shardConnections->shardId);
@ -731,7 +763,11 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
*/
if (copiedDataSizeInBytes > 0)
{
SendCopyBinaryFooters(copyOutState, shardConnections->connectionList);
if (copyOutState->binary)
{
SendCopyBinaryFooters(copyOutState,
shardConnections->connectionList);
}
FinalizeCopyToNewShard(shardConnections);
MasterUpdateShardStatistics(shardConnections->shardId);
}
@ -875,7 +911,7 @@ RemoveMasterOptions(CopyStmt *copyStatement)
*/
static void
OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections,
bool stopOnFailure)
bool stopOnFailure, bool useBinaryCopyFormat)
{
List *finalizedPlacementList = NIL;
List *failedPlacementList = NIL;
@ -932,7 +968,8 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
}
PQclear(result);
copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId);
copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId,
useBinaryCopyFormat);
result = PQexec(connection, copyCommand->data);
if (PQresultStatus(result) != PGRES_COPY_IN)
@ -985,6 +1022,49 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
}
/*
 * CanUseBinaryCopyFormat decides whether rows described by tupleDescription
 * may be sent to worker nodes using the binary COPY format. It walks the
 * columns of the tuple descriptor and returns false as soon as it meets a
 * non-dropped column whose type Oid is at or above FirstNormalObjectId and
 * whose type category is array or composite; otherwise it returns true.
 *
 * Binary format sends the Oid of the types, and for user-defined types those
 * Oids are generally not the same in master and worker nodes, so the caller
 * has to fall back to text format when this function returns false.
 *
 * Note that rowOutputState is accepted but not read by this function.
 */
static bool
CanUseBinaryCopyFormat(TupleDesc tupleDescription, CopyOutState rowOutputState)
{
	int attributeCount = tupleDescription->natts;
	int attributeIndex = 0;

	for (attributeIndex = 0; attributeIndex < attributeCount; attributeIndex++)
	{
		Form_pg_attribute attribute = tupleDescription->attrs[attributeIndex];
		char category = '\0';
		bool preferred = false;

		/* skip dropped columns and types below FirstNormalObjectId */
		if (attribute->attisdropped || attribute->atttypid < FirstNormalObjectId)
		{
			continue;
		}

		get_type_category_preferred(attribute->atttypid, &category, &preferred);

		switch (category)
		{
			case TYPCATEGORY_ARRAY:
			case TYPCATEGORY_COMPOSITE:
			{
				/* one offending column is enough to rule out binary format */
				return false;
			}

			default:
			{
				break;
			}
		}
	}

	return true;
}
/*
* MasterShardPlacementList dispatches the finalized shard placements call
* between local or remote master node according to the master connection state.
@ -1075,7 +1155,7 @@ SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList)
* shard.
*/
static StringInfo
ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId)
ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCopyFormat)
{
StringInfo command = makeStringInfo();
@ -1084,14 +1164,22 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId)
char *shardName = pstrdup(relationName);
char *shardQualifiedName = NULL;
const char *copyFormat = NULL;
AppendShardIdToName(&shardName, shardId);
shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
appendStringInfo(command,
"COPY %s FROM STDIN WITH (FORMAT BINARY)",
shardQualifiedName);
if (useBinaryCopyFormat)
{
copyFormat = "BINARY";
}
else
{
copyFormat = "TEXT";
}
appendStringInfo(command, "COPY %s FROM STDIN WITH (FORMAT %s)", shardQualifiedName,
copyFormat);
return command;
}
@ -1430,7 +1518,8 @@ AppendCopyBinaryFooters(CopyOutState footerOutputState)
* opens connections to shard placements.
*/
static void
StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement)
StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement,
bool useBinaryCopyFormat)
{
char *relationName = copyStatement->relation->relname;
char *schemaName = copyStatement->relation->schemaname;
@ -1444,7 +1533,7 @@ StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement)
shardConnections->connectionList = NIL;
/* connect to shards placements and start transactions */
OpenCopyTransactions(copyStatement, shardConnections, true);
OpenCopyTransactions(copyStatement, shardConnections, true, useBinaryCopyFormat);
}

View File

@ -414,3 +414,99 @@ WITH (FORMAT 'csv');
-- Confirm that data was copied
SELECT count(*) FROM "1_customer";
-- Test COPY with types having different Oid at master and workers
CREATE TYPE number_pack AS (
number1 integer,
number2 integer
);
CREATE TYPE super_number_pack AS (
packed_number1 number_pack,
packed_number2 number_pack
);
-- Create same types in worker1
\c - - - :worker_1_port
CREATE TYPE number_pack AS (
number1 integer,
number2 integer
);
CREATE TYPE super_number_pack AS (
packed_number1 number_pack,
packed_number2 number_pack
);
-- Create same types in worker2
\c - - - :worker_2_port
CREATE TYPE number_pack AS (
number1 integer,
number2 integer
);
CREATE TYPE super_number_pack AS (
packed_number1 number_pack,
packed_number2 number_pack
);
-- Connect back to master
\c - - - :master_port
-- Test array of user-defined type with hash distribution
CREATE TABLE packed_numbers_hash (
id integer,
packed_numbers number_pack[]
);
SELECT master_create_distributed_table('packed_numbers_hash', 'id', 'hash');
SELECT master_create_worker_shards('packed_numbers_hash', 4, 1);
COPY (SELECT 1, ARRAY[ROW(42, 42), ROW(42, 42)]) TO '/tmp/copy_test_array_of_composite';
COPY packed_numbers_hash FROM '/tmp/copy_test_array_of_composite';
-- Verify data is actually copied
SELECT * FROM packed_numbers_hash;
-- Test composite type containing an element with different Oid with hash distribution
CREATE TABLE super_packed_numbers_hash (
id integer,
super_packed_number super_number_pack
);
SELECT master_create_distributed_table('super_packed_numbers_hash', 'id', 'hash');
SELECT master_create_worker_shards('super_packed_numbers_hash', 4, 1);
COPY (SELECT 1, ROW(ROW(42, 42), ROW(42, 42))) TO '/tmp/copy_test_composite_of_composite';
COPY super_packed_numbers_hash FROM '/tmp/copy_test_composite_of_composite';
-- Verify data is actually copied
SELECT * FROM super_packed_numbers_hash;
-- Test array of user-defined type with append distribution
CREATE TABLE packed_numbers_append (
id integer,
packed_numbers number_pack[]
);
SELECT master_create_distributed_table('packed_numbers_append', 'id', 'append');
COPY packed_numbers_append FROM '/tmp/copy_test_array_of_composite';
-- Verify data is actually copied
SELECT * FROM packed_numbers_append;
-- Test composite type containing an element with different Oid with append distribution
CREATE TABLE super_packed_numbers_append (
id integer,
super_packed_number super_number_pack
);
SELECT master_create_distributed_table('super_packed_numbers_append', 'id', 'append');
COPY super_packed_numbers_append FROM '/tmp/copy_test_composite_of_composite';
-- Verify data is actually copied
SELECT * FROM super_packed_numbers_append;

View File

@ -550,3 +550,124 @@ SELECT count(*) FROM "1_customer";
2
(1 row)
-- Test COPY with types having different Oid at master and workers
CREATE TYPE number_pack AS (
number1 integer,
number2 integer
);
CREATE TYPE super_number_pack AS (
packed_number1 number_pack,
packed_number2 number_pack
);
-- Create same types in worker1
\c - - - :worker_1_port
CREATE TYPE number_pack AS (
number1 integer,
number2 integer
);
CREATE TYPE super_number_pack AS (
packed_number1 number_pack,
packed_number2 number_pack
);
-- Create same types in worker2
\c - - - :worker_2_port
CREATE TYPE number_pack AS (
number1 integer,
number2 integer
);
CREATE TYPE super_number_pack AS (
packed_number1 number_pack,
packed_number2 number_pack
);
-- Connect back to master
\c - - - :master_port
-- Test array of user-defined type with hash distribution
CREATE TABLE packed_numbers_hash (
id integer,
packed_numbers number_pack[]
);
SELECT master_create_distributed_table('packed_numbers_hash', 'id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('packed_numbers_hash', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
COPY (SELECT 1, ARRAY[ROW(42, 42), ROW(42, 42)]) TO '/tmp/copy_test_array_of_composite';
COPY packed_numbers_hash FROM '/tmp/copy_test_array_of_composite';
-- Verify data is actually copied
SELECT * FROM packed_numbers_hash;
id | packed_numbers
----+-----------------------
1 | {"(42,42)","(42,42)"}
(1 row)
-- Test composite type containing an element with different Oid with hash distribution
CREATE TABLE super_packed_numbers_hash (
id integer,
super_packed_number super_number_pack
);
SELECT master_create_distributed_table('super_packed_numbers_hash', 'id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('super_packed_numbers_hash', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
COPY (SELECT 1, ROW(ROW(42, 42), ROW(42, 42))) TO '/tmp/copy_test_composite_of_composite';
COPY super_packed_numbers_hash FROM '/tmp/copy_test_composite_of_composite';
-- Verify data is actually copied
SELECT * FROM super_packed_numbers_hash;
id | super_packed_number
----+-----------------------
1 | ("(42,42)","(42,42)")
(1 row)
-- Test array of user-defined type with append distribution
CREATE TABLE packed_numbers_append (
id integer,
packed_numbers number_pack[]
);
SELECT master_create_distributed_table('packed_numbers_append', 'id', 'append');
master_create_distributed_table
---------------------------------
(1 row)
COPY packed_numbers_append FROM '/tmp/copy_test_array_of_composite';
-- Verify data is actually copied
SELECT * FROM packed_numbers_append;
id | packed_numbers
----+-----------------------
1 | {"(42,42)","(42,42)"}
(1 row)
-- Test composite type containing an element with different Oid with append distribution
CREATE TABLE super_packed_numbers_append (
id integer,
super_packed_number super_number_pack
);
SELECT master_create_distributed_table('super_packed_numbers_append', 'id', 'append');
master_create_distributed_table
---------------------------------
(1 row)
COPY super_packed_numbers_append FROM '/tmp/copy_test_composite_of_composite';
-- Verify data is actually copied
SELECT * FROM super_packed_numbers_append;
id | super_packed_number
----+-----------------------
1 | ("(42,42)","(42,42)")
(1 row)