Merge pull request #611 from citusdata/fix/fix_463_copy_on_array_of_user_defined_types

Fix COPY produces error when using array of user-defined types
pull/648/head
Burak Yücesoy 2016-07-13 11:43:21 +03:00 committed by GitHub
commit b01d19db3d
3 changed files with 326 additions and 20 deletions

View File

@ -141,12 +141,16 @@ static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid re
static char MasterPartitionMethod(RangeVar *relation);
static void RemoveMasterOptions(CopyStmt *copyStatement);
static void OpenCopyTransactions(CopyStmt *copyStatement,
ShardConnections *shardConnections, bool stopOnFailure);
ShardConnections *shardConnections, bool stopOnFailure,
bool useBinaryCopyFormat);
static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription,
CopyOutState rowOutputState);
static List * MasterShardPlacementList(uint64 shardId);
static List * RemoteFinalizedShardPlacementList(uint64 shardId);
static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList);
static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList);
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId);
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId,
bool useBinaryCopyFormat);
static void SendCopyDataToAll(StringInfo dataBuffer, List *connectionList);
static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection,
int64 shardId);
@ -154,7 +158,7 @@ static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
static void ReportCopyError(PGconn *connection, PGresult *result);
static uint32 AvailableColumnCount(TupleDesc tupleDescriptor);
static void StartCopyToNewShard(ShardConnections *shardConnections,
CopyStmt *copyStatement);
CopyStmt *copyStatement, bool useBinaryCopyFormat);
static int64 MasterCreateEmptyShard(char *relationName);
static int64 CreateEmptyShard(char *relationName);
static int64 RemoteCreateEmptyShard(char *relationName);
@ -348,6 +352,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
FmgrInfo *compareFunction = NULL;
bool hasUniformHashDistribution = false;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId);
const char *delimiterCharacter = "\t";
const char *nullPrintCharacter = "\\N";
int shardCount = 0;
List *shardIntervalList = NULL;
@ -441,7 +447,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
executorExpressionContext = GetPerTupleExprContext(executorState);
copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
copyOutState->binary = true;
copyOutState->delim = (char *) delimiterCharacter;
copyOutState->null_print = (char *) nullPrintCharacter;
copyOutState->null_print_client = (char *) nullPrintCharacter;
copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState);
copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->rowcontext = executorTupleContext;
@ -528,10 +537,15 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
if (!shardConnectionsFound)
{
/* open connections and initiate COPY on shard placements */
OpenCopyTransactions(copyStatement, shardConnections, false);
OpenCopyTransactions(copyStatement, shardConnections, false,
copyOutState->binary);
/* send copy binary headers to shard placements */
SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList);
if (copyOutState->binary)
{
SendCopyBinaryHeaders(copyOutState,
shardConnections->connectionList);
}
}
/* replicate row to shard placements */
@ -546,7 +560,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
connectionList = ConnectionList(shardConnectionHash);
/* send copy binary footers to all shard placements */
if (copyOutState->binary)
{
SendCopyBinaryFooters(copyOutState, connectionList);
}
/* all lines have been copied, stop showing line number in errors */
error_context_stack = errorCallback.previous;
@ -615,6 +632,9 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState);
const char *delimiterCharacter = "\t";
const char *nullPrintCharacter = "\\N";
/*
* Shard connections should be initialized before the PG_TRY, since it is
* used in PG_CATCH. Otherwise, it may be undefined in the PG_CATCH
@ -631,7 +651,10 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
copyStatement->options);
CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
copyOutState->binary = true;
copyOutState->delim = (char *) delimiterCharacter;
copyOutState->null_print = (char *) nullPrintCharacter;
copyOutState->null_print_client = (char *) nullPrintCharacter;
copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState);
copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->rowcontext = executorTupleContext;
@ -690,10 +713,15 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
if (copiedDataSizeInBytes == 0)
{
/* create shard and open connections to shard placements */
StartCopyToNewShard(shardConnections, copyStatement);
StartCopyToNewShard(shardConnections, copyStatement,
copyOutState->binary);
/* send copy binary headers to shard placements */
SendCopyBinaryHeaders(copyOutState, shardConnections->connectionList);
if (copyOutState->binary)
{
SendCopyBinaryHeaders(copyOutState,
shardConnections->connectionList);
}
}
/* replicate row to shard placements */
@ -713,7 +741,11 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
* */
if (copiedDataSizeInBytes > shardMaxSizeInBytes)
{
SendCopyBinaryFooters(copyOutState, shardConnections->connectionList);
if (copyOutState->binary)
{
SendCopyBinaryFooters(copyOutState,
shardConnections->connectionList);
}
FinalizeCopyToNewShard(shardConnections);
MasterUpdateShardStatistics(shardConnections->shardId);
@ -731,7 +763,11 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
*/
if (copiedDataSizeInBytes > 0)
{
SendCopyBinaryFooters(copyOutState, shardConnections->connectionList);
if (copyOutState->binary)
{
SendCopyBinaryFooters(copyOutState,
shardConnections->connectionList);
}
FinalizeCopyToNewShard(shardConnections);
MasterUpdateShardStatistics(shardConnections->shardId);
}
@ -875,7 +911,7 @@ RemoveMasterOptions(CopyStmt *copyStatement)
*/
static void
OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections,
bool stopOnFailure)
bool stopOnFailure, bool useBinaryCopyFormat)
{
List *finalizedPlacementList = NIL;
List *failedPlacementList = NIL;
@ -932,7 +968,8 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
}
PQclear(result);
copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId);
copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId,
useBinaryCopyFormat);
result = PQexec(connection, copyCommand->data);
if (PQresultStatus(result) != PGRES_COPY_IN)
@ -985,6 +1022,49 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
}
/*
* CanUseBinaryCopyFormat iterates over columns of the relation given in rowOutputState
* and looks for a column whose type is array of user-defined type or composite type.
* If it finds such column, that means we cannot use binary format for COPY, because
* binary format sends Oid of the types, which are generally not same in master and
* worker nodes for user-defined types.
*/
static bool
CanUseBinaryCopyFormat(TupleDesc tupleDescription, CopyOutState rowOutputState)
{
bool useBinaryCopyFormat = true;
int totalColumnCount = tupleDescription->natts;
int columnIndex = 0;
for (columnIndex = 0; columnIndex < totalColumnCount; columnIndex++)
{
Form_pg_attribute currentColumn = tupleDescription->attrs[columnIndex];
Oid typeId = InvalidOid;
char typeCategory = '\0';
bool typePreferred = false;
if (currentColumn->attisdropped)
{
continue;
}
typeId = currentColumn->atttypid;
if (typeId >= FirstNormalObjectId)
{
get_type_category_preferred(typeId, &typeCategory, &typePreferred);
if (typeCategory == TYPCATEGORY_ARRAY ||
typeCategory == TYPCATEGORY_COMPOSITE)
{
useBinaryCopyFormat = false;
break;
}
}
}
return useBinaryCopyFormat;
}
/*
* MasterShardPlacementList dispatches the finalized shard placements call
* between local or remote master node according to the master connection state.
@ -1075,7 +1155,7 @@ SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList)
* shard.
*/
static StringInfo
ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId)
ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCopyFormat)
{
StringInfo command = makeStringInfo();
@ -1084,14 +1164,22 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId)
char *shardName = pstrdup(relationName);
char *shardQualifiedName = NULL;
const char *copyFormat = NULL;
AppendShardIdToName(&shardName, shardId);
shardQualifiedName = quote_qualified_identifier(schemaName, shardName);
appendStringInfo(command,
"COPY %s FROM STDIN WITH (FORMAT BINARY)",
shardQualifiedName);
if (useBinaryCopyFormat)
{
copyFormat = "BINARY";
}
else
{
copyFormat = "TEXT";
}
appendStringInfo(command, "COPY %s FROM STDIN WITH (FORMAT %s)", shardQualifiedName,
copyFormat);
return command;
}
@ -1430,7 +1518,8 @@ AppendCopyBinaryFooters(CopyOutState footerOutputState)
* opens connections to shard placements.
*/
static void
StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement)
StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement,
bool useBinaryCopyFormat)
{
char *relationName = copyStatement->relation->relname;
char *schemaName = copyStatement->relation->schemaname;
@ -1444,7 +1533,7 @@ StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement)
shardConnections->connectionList = NIL;
/* connect to shards placements and start transactions */
OpenCopyTransactions(copyStatement, shardConnections, true);
OpenCopyTransactions(copyStatement, shardConnections, true, useBinaryCopyFormat);
}

View File

@ -414,3 +414,99 @@ WITH (FORMAT 'csv');
-- Confirm that data was copied
SELECT count(*) FROM "1_customer";
-- Test COPY with types having different Oid at master and workers
CREATE TYPE number_pack AS (
number1 integer,
number2 integer
);
CREATE TYPE super_number_pack AS (
packed_number1 number_pack,
packed_number2 number_pack
);
-- Create same types in worker1
\c - - - :worker_1_port
CREATE TYPE number_pack AS (
number1 integer,
number2 integer
);
CREATE TYPE super_number_pack AS (
packed_number1 number_pack,
packed_number2 number_pack
);
-- Create same types in worker2
\c - - - :worker_2_port
CREATE TYPE number_pack AS (
number1 integer,
number2 integer
);
CREATE TYPE super_number_pack AS (
packed_number1 number_pack,
packed_number2 number_pack
);
-- Connect back to master
\c - - - :master_port
-- Test array of user-defined type with hash distribution
CREATE TABLE packed_numbers_hash (
id integer,
packed_numbers number_pack[]
);
SELECT master_create_distributed_table('packed_numbers_hash', 'id', 'hash');
SELECT master_create_worker_shards('packed_numbers_hash', 4, 1);
COPY (SELECT 1, ARRAY[ROW(42, 42), ROW(42, 42)]) TO '/tmp/copy_test_array_of_composite';
COPY packed_numbers_hash FROM '/tmp/copy_test_array_of_composite';
-- Verify data is actually copied
SELECT * FROM packed_numbers_hash;
-- Test composite type containing an element with different Oid with hash distribution
CREATE TABLE super_packed_numbers_hash (
id integer,
super_packed_number super_number_pack
);
SELECT master_create_distributed_table('super_packed_numbers_hash', 'id', 'hash');
SELECT master_create_worker_shards('super_packed_numbers_hash', 4, 1);
COPY (SELECT 1, ROW(ROW(42, 42), ROW(42, 42))) TO '/tmp/copy_test_composite_of_composite';
COPY super_packed_numbers_hash FROM '/tmp/copy_test_composite_of_composite';
-- Verify data is actually copied
SELECT * FROM super_packed_numbers_hash;
-- Test array of user-defined type with append distribution
CREATE TABLE packed_numbers_append (
id integer,
packed_numbers number_pack[]
);
SELECT master_create_distributed_table('packed_numbers_append', 'id', 'append');
COPY packed_numbers_append FROM '/tmp/copy_test_array_of_composite';
-- Verify data is actually copied
SELECT * FROM packed_numbers_append;
-- Test composite type containing an element with different Oid with append distribution
CREATE TABLE super_packed_numbers_append (
id integer,
super_packed_number super_number_pack
);
SELECT master_create_distributed_table('super_packed_numbers_append', 'id', 'append');
COPY super_packed_numbers_append FROM '/tmp/copy_test_composite_of_composite';
-- Verify data is actually copied
SELECT * FROM super_packed_numbers_append;

View File

@ -550,3 +550,124 @@ SELECT count(*) FROM "1_customer";
2
(1 row)
-- Test COPY with types having different Oid at master and workers
CREATE TYPE number_pack AS (
number1 integer,
number2 integer
);
CREATE TYPE super_number_pack AS (
packed_number1 number_pack,
packed_number2 number_pack
);
-- Create same types in worker1
\c - - - :worker_1_port
CREATE TYPE number_pack AS (
number1 integer,
number2 integer
);
CREATE TYPE super_number_pack AS (
packed_number1 number_pack,
packed_number2 number_pack
);
-- Create same types in worker2
\c - - - :worker_2_port
CREATE TYPE number_pack AS (
number1 integer,
number2 integer
);
CREATE TYPE super_number_pack AS (
packed_number1 number_pack,
packed_number2 number_pack
);
-- Connect back to master
\c - - - :master_port
-- Test array of user-defined type with hash distribution
CREATE TABLE packed_numbers_hash (
id integer,
packed_numbers number_pack[]
);
SELECT master_create_distributed_table('packed_numbers_hash', 'id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('packed_numbers_hash', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
COPY (SELECT 1, ARRAY[ROW(42, 42), ROW(42, 42)]) TO '/tmp/copy_test_array_of_composite';
COPY packed_numbers_hash FROM '/tmp/copy_test_array_of_composite';
-- Verify data is actually copied
SELECT * FROM packed_numbers_hash;
id | packed_numbers
----+-----------------------
1 | {"(42,42)","(42,42)"}
(1 row)
-- Test composite type containing an element with different Oid with hash distribution
CREATE TABLE super_packed_numbers_hash (
id integer,
super_packed_number super_number_pack
);
SELECT master_create_distributed_table('super_packed_numbers_hash', 'id', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('super_packed_numbers_hash', 4, 1);
master_create_worker_shards
-----------------------------
(1 row)
COPY (SELECT 1, ROW(ROW(42, 42), ROW(42, 42))) TO '/tmp/copy_test_composite_of_composite';
COPY super_packed_numbers_hash FROM '/tmp/copy_test_composite_of_composite';
-- Verify data is actually copied
SELECT * FROM super_packed_numbers_hash;
id | super_packed_number
----+-----------------------
1 | ("(42,42)","(42,42)")
(1 row)
-- Test array of user-defined type with append distribution
CREATE TABLE packed_numbers_append (
id integer,
packed_numbers number_pack[]
);
SELECT master_create_distributed_table('packed_numbers_append', 'id', 'append');
master_create_distributed_table
---------------------------------
(1 row)
COPY packed_numbers_append FROM '/tmp/copy_test_array_of_composite';
-- Verify data is actually copied
SELECT * FROM packed_numbers_append;
id | packed_numbers
----+-----------------------
1 | {"(42,42)","(42,42)"}
(1 row)
-- Test composite type containing an element with different Oid with append distribution
CREATE TABLE super_packed_numbers_append (
id integer,
super_packed_number super_number_pack
);
SELECT master_create_distributed_table('super_packed_numbers_append', 'id', 'append');
master_create_distributed_table
---------------------------------
(1 row)
COPY super_packed_numbers_append FROM '/tmp/copy_test_composite_of_composite';
-- Verify data is actually copied
SELECT * FROM super_packed_numbers_append;
id | super_packed_number
----+-----------------------
1 | ("(42,42)","(42,42)")
(1 row)