mirror of https://github.com/citusdata/citus.git
Merge pull request #1300 from citusdata/ltree_copy_branch
Change copy format of ltreepull/1318/head
commit
cfc0992137
|
@ -93,8 +93,8 @@ static void OpenCopyConnections(CopyStmt *copyStatement,
|
||||||
ShardConnections *shardConnections, bool stopOnFailure,
|
ShardConnections *shardConnections, bool stopOnFailure,
|
||||||
bool useBinaryCopyFormat);
|
bool useBinaryCopyFormat);
|
||||||
|
|
||||||
static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription,
|
static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription);
|
||||||
CopyOutState rowOutputState);
|
static bool BinaryOutputFunctionDefined(Oid typeId);
|
||||||
static List * MasterShardPlacementList(uint64 shardId);
|
static List * MasterShardPlacementList(uint64 shardId);
|
||||||
static List * RemoteFinalizedShardPlacementList(uint64 shardId);
|
static List * RemoteFinalizedShardPlacementList(uint64 shardId);
|
||||||
|
|
||||||
|
@ -464,7 +464,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
|
||||||
copyOutState->delim = (char *) delimiterCharacter;
|
copyOutState->delim = (char *) delimiterCharacter;
|
||||||
copyOutState->null_print = (char *) nullPrintCharacter;
|
copyOutState->null_print = (char *) nullPrintCharacter;
|
||||||
copyOutState->null_print_client = (char *) nullPrintCharacter;
|
copyOutState->null_print_client = (char *) nullPrintCharacter;
|
||||||
copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState);
|
copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor);
|
||||||
copyOutState->fe_msgbuf = makeStringInfo();
|
copyOutState->fe_msgbuf = makeStringInfo();
|
||||||
copyOutState->rowcontext = executorTupleContext;
|
copyOutState->rowcontext = executorTupleContext;
|
||||||
|
|
||||||
|
@ -809,14 +809,15 @@ OpenCopyConnections(CopyStmt *copyStatement, ShardConnections *shardConnections,
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CanUseBinaryCopyFormat iterates over columns of the relation given in rowOutputState
|
* CanUseBinaryCopyFormat iterates over columns of the relation and looks for a
|
||||||
* and looks for a column whose type is array of user-defined type or composite type.
|
* column whose type is array of user-defined type or composite type. If it finds
|
||||||
* If it finds such column, that means we cannot use binary format for COPY, because
|
* such column, that means we cannot use binary format for COPY, because binary
|
||||||
* binary format sends Oid of the types, which are generally not same in master and
|
* format sends Oid of the types, which are generally not same in master and
|
||||||
* worker nodes for user-defined types.
|
* worker nodes for user-defined types. If the function can not detect a binary
|
||||||
|
* output function for any of the column, it returns false.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
CanUseBinaryCopyFormat(TupleDesc tupleDescription, CopyOutState rowOutputState)
|
CanUseBinaryCopyFormat(TupleDesc tupleDescription)
|
||||||
{
|
{
|
||||||
bool useBinaryCopyFormat = true;
|
bool useBinaryCopyFormat = true;
|
||||||
int totalColumnCount = tupleDescription->natts;
|
int totalColumnCount = tupleDescription->natts;
|
||||||
|
@ -828,6 +829,7 @@ CanUseBinaryCopyFormat(TupleDesc tupleDescription, CopyOutState rowOutputState)
|
||||||
Oid typeId = InvalidOid;
|
Oid typeId = InvalidOid;
|
||||||
char typeCategory = '\0';
|
char typeCategory = '\0';
|
||||||
bool typePreferred = false;
|
bool typePreferred = false;
|
||||||
|
bool binaryOutputFunctionDefined = false;
|
||||||
|
|
||||||
if (currentColumn->attisdropped)
|
if (currentColumn->attisdropped)
|
||||||
{
|
{
|
||||||
|
@ -835,6 +837,15 @@ CanUseBinaryCopyFormat(TupleDesc tupleDescription, CopyOutState rowOutputState)
|
||||||
}
|
}
|
||||||
|
|
||||||
typeId = currentColumn->atttypid;
|
typeId = currentColumn->atttypid;
|
||||||
|
|
||||||
|
/* built-in types may also don't have binary output function */
|
||||||
|
binaryOutputFunctionDefined = BinaryOutputFunctionDefined(typeId);
|
||||||
|
if (!binaryOutputFunctionDefined)
|
||||||
|
{
|
||||||
|
useBinaryCopyFormat = false;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
if (typeId >= FirstNormalObjectId)
|
if (typeId >= FirstNormalObjectId)
|
||||||
{
|
{
|
||||||
get_type_category_preferred(typeId, &typeCategory, &typePreferred);
|
get_type_category_preferred(typeId, &typeCategory, &typePreferred);
|
||||||
|
@ -851,6 +862,32 @@ CanUseBinaryCopyFormat(TupleDesc tupleDescription, CopyOutState rowOutputState)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* BinaryOutputFunctionDefined checks whether binary output function is defined
|
||||||
|
* for the given type.
|
||||||
|
*/
|
||||||
|
static bool
|
||||||
|
BinaryOutputFunctionDefined(Oid typeId)
|
||||||
|
{
|
||||||
|
Oid typeFunctionId = InvalidOid;
|
||||||
|
Oid typeIoParam = InvalidOid;
|
||||||
|
int16 typeLength = 0;
|
||||||
|
bool typeByVal = false;
|
||||||
|
char typeAlign = 0;
|
||||||
|
char typeDelim = 0;
|
||||||
|
|
||||||
|
get_type_io_data(typeId, IOFunc_send, &typeLength, &typeByVal,
|
||||||
|
&typeAlign, &typeDelim, &typeIoParam, &typeFunctionId);
|
||||||
|
|
||||||
|
if (OidIsValid(typeFunctionId))
|
||||||
|
{
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* MasterShardPlacementList dispatches the finalized shard placements call
|
* MasterShardPlacementList dispatches the finalized shard placements call
|
||||||
* between local or remote master node according to the master connection state.
|
* between local or remote master node according to the master connection state.
|
||||||
|
@ -1732,7 +1769,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
copyOutState->delim = (char *) delimiterCharacter;
|
copyOutState->delim = (char *) delimiterCharacter;
|
||||||
copyOutState->null_print = (char *) nullPrintCharacter;
|
copyOutState->null_print = (char *) nullPrintCharacter;
|
||||||
copyOutState->null_print_client = (char *) nullPrintCharacter;
|
copyOutState->null_print_client = (char *) nullPrintCharacter;
|
||||||
copyOutState->binary = CanUseBinaryCopyFormat(inputTupleDescriptor, copyOutState);
|
copyOutState->binary = CanUseBinaryCopyFormat(inputTupleDescriptor);
|
||||||
copyOutState->fe_msgbuf = makeStringInfo();
|
copyOutState->fe_msgbuf = makeStringInfo();
|
||||||
copyOutState->rowcontext = GetPerTupleMemoryContext(copyDest->executorState);
|
copyOutState->rowcontext = GetPerTupleMemoryContext(copyDest->executorState);
|
||||||
copyDest->copyOutState = copyOutState;
|
copyDest->copyOutState = copyOutState;
|
||||||
|
|
|
@ -46,11 +46,11 @@ check-worker: all
|
||||||
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/worker_schedule $(EXTRA_TESTS)
|
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/worker_schedule $(EXTRA_TESTS)
|
||||||
|
|
||||||
check-multi: all tempinstall-main
|
check-multi: all tempinstall-main
|
||||||
$(pg_regress_multi_check) --load-extension=citus \
|
$(pg_regress_multi_check) --load-extension=citus --load-extension=ltree \
|
||||||
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_schedule $(EXTRA_TESTS)
|
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_schedule $(EXTRA_TESTS)
|
||||||
|
|
||||||
check-multi-vg: all tempinstall-main
|
check-multi-vg: all tempinstall-main
|
||||||
$(pg_regress_multi_check) --load-extension=citus --valgrind \
|
$(pg_regress_multi_check) --load-extension=citus --valgrind --load-extension=ltree \
|
||||||
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_schedule $(EXTRA_TESTS)
|
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_schedule $(EXTRA_TESTS)
|
||||||
|
|
||||||
check-isolation: all tempinstall-main
|
check-isolation: all tempinstall-main
|
||||||
|
@ -65,14 +65,14 @@ check-multi-mx: all tempinstall-main
|
||||||
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_mx_schedule $(EXTRA_TESTS)
|
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_mx_schedule $(EXTRA_TESTS)
|
||||||
|
|
||||||
check-multi-task-tracker-extra: all tempinstall-main
|
check-multi-task-tracker-extra: all tempinstall-main
|
||||||
$(pg_regress_multi_check) --load-extension=citus \
|
$(pg_regress_multi_check) --load-extension=citus --load-extension=ltree \
|
||||||
--server-option=citus.task_executor_type=task-tracker \
|
--server-option=citus.task_executor_type=task-tracker \
|
||||||
--server-option=citus.large_table_shard_count=1 \
|
--server-option=citus.large_table_shard_count=1 \
|
||||||
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_task_tracker_extra_schedule $(EXTRA_TESTS)
|
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_task_tracker_extra_schedule $(EXTRA_TESTS)
|
||||||
|
|
||||||
|
|
||||||
check-multi-binary: all tempinstall-main
|
check-multi-binary: all tempinstall-main
|
||||||
$(pg_regress_multi_check) --load-extension=citus \
|
$(pg_regress_multi_check) --load-extension=citus --load-extension=ltree \
|
||||||
--server-option=citus.binary_worker_copy_format=on \
|
--server-option=citus.binary_worker_copy_format=on \
|
||||||
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_binary_schedule $(EXTRA_TESTS)
|
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/multi_binary_schedule $(EXTRA_TESTS)
|
||||||
|
|
||||||
|
|
|
@ -749,3 +749,17 @@ SELECT shardid, shardstate, nodename, nodeport
|
||||||
DROP TABLE numbers_hash;
|
DROP TABLE numbers_hash;
|
||||||
SELECT * FROM run_command_on_workers('DROP USER test_user');
|
SELECT * FROM run_command_on_workers('DROP USER test_user');
|
||||||
DROP USER test_user;
|
DROP USER test_user;
|
||||||
|
|
||||||
|
-- Test with ltree column
|
||||||
|
CREATE TABLE test_ltree (path ltree);
|
||||||
|
SELECT create_reference_table('test_ltree');
|
||||||
|
|
||||||
|
COPY test_ltree FROM STDIN DELIMITER AS ',';
|
||||||
|
Top
|
||||||
|
Top.Science
|
||||||
|
Top.Science.Astronomy
|
||||||
|
\.
|
||||||
|
|
||||||
|
SELECT path, nlevel(path) AS depth FROM test_ltree ORDER BY depth;
|
||||||
|
|
||||||
|
DROP TABLE test_ltree;
|
||||||
|
|
|
@ -1017,3 +1017,21 @@ SELECT * FROM run_command_on_workers('DROP USER test_user');
|
||||||
(2 rows)
|
(2 rows)
|
||||||
|
|
||||||
DROP USER test_user;
|
DROP USER test_user;
|
||||||
|
-- Test with ltree column
|
||||||
|
CREATE TABLE test_ltree (path ltree);
|
||||||
|
SELECT create_reference_table('test_ltree');
|
||||||
|
create_reference_table
|
||||||
|
------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
COPY test_ltree FROM STDIN DELIMITER AS ',';
|
||||||
|
SELECT path, nlevel(path) AS depth FROM test_ltree ORDER BY depth;
|
||||||
|
path | depth
|
||||||
|
-----------------------+-------
|
||||||
|
Top | 1
|
||||||
|
Top.Science | 2
|
||||||
|
Top.Science.Astronomy | 3
|
||||||
|
(3 rows)
|
||||||
|
|
||||||
|
DROP TABLE test_ltree;
|
||||||
|
|
Loading…
Reference in New Issue