mirror of https://github.com/citusdata/citus.git
worker_append_table_to_shard becomes aware of partitioned tables
parent
c1b5a04f6e
commit
8520a5b432
|
@ -32,6 +32,7 @@
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_client_executor.h"
|
#include "distributed/multi_client_executor.h"
|
||||||
#include "distributed/multi_logical_optimizer.h"
|
#include "distributed/multi_logical_optimizer.h"
|
||||||
|
#include "distributed/multi_partitioning_utils.h"
|
||||||
#include "distributed/multi_server_executor.h"
|
#include "distributed/multi_server_executor.h"
|
||||||
#include "distributed/multi_utility.h"
|
#include "distributed/multi_utility.h"
|
||||||
#include "distributed/relay_utility.h"
|
#include "distributed/relay_utility.h"
|
||||||
|
@ -763,6 +764,8 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
||||||
uint64 shardId = INVALID_SHARD_ID;
|
uint64 shardId = INVALID_SHARD_ID;
|
||||||
bool received = false;
|
bool received = false;
|
||||||
StringInfo queryString = NULL;
|
StringInfo queryString = NULL;
|
||||||
|
Oid sourceShardRelationId = InvalidOid;
|
||||||
|
Oid sourceSchemaId = InvalidOid;
|
||||||
|
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
|
|
||||||
|
@ -787,7 +790,25 @@ worker_append_table_to_shard(PG_FUNCTION_ARGS)
|
||||||
|
|
||||||
sourceQualifiedName = quote_qualified_identifier(sourceSchemaName, sourceTableName);
|
sourceQualifiedName = quote_qualified_identifier(sourceSchemaName, sourceTableName);
|
||||||
sourceCopyCommand = makeStringInfo();
|
sourceCopyCommand = makeStringInfo();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Partitioned tables do not support "COPY table TO STDOUT". Thus, we use
|
||||||
|
* "COPY (SELECT * FROM table) TO STDOUT" for partitioned tables.
|
||||||
|
*
|
||||||
|
* If the schema name is not explicitly set, we use the public schema.
|
||||||
|
*/
|
||||||
|
sourceSchemaName = sourceSchemaName ? sourceSchemaName : "public";
|
||||||
|
sourceSchemaId = get_namespace_oid(sourceSchemaName, false);
|
||||||
|
sourceShardRelationId = get_relname_relid(sourceTableName, sourceSchemaId);
|
||||||
|
if (PartitionedTableNoLock(sourceShardRelationId))
|
||||||
|
{
|
||||||
|
appendStringInfo(sourceCopyCommand, COPY_SELECT_ALL_OUT_COMMAND,
|
||||||
|
sourceQualifiedName);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName);
|
appendStringInfo(sourceCopyCommand, COPY_OUT_COMMAND, sourceQualifiedName);
|
||||||
|
}
|
||||||
|
|
||||||
received = ReceiveRegularFile(sourceNodeName, sourceNodePort, NULL, sourceCopyCommand,
|
received = ReceiveRegularFile(sourceNodeName, sourceNodePort, NULL, sourceCopyCommand,
|
||||||
localFilePath);
|
localFilePath);
|
||||||
|
|
|
@ -46,6 +46,7 @@
|
||||||
/* the tablename in the overloaded COPY statement is the to-be-transferred file */
|
/* the tablename in the overloaded COPY statement is the to-be-transferred file */
|
||||||
#define TRANSMIT_REGULAR_COMMAND "COPY \"%s\" TO STDOUT WITH (format 'transmit')"
|
#define TRANSMIT_REGULAR_COMMAND "COPY \"%s\" TO STDOUT WITH (format 'transmit')"
|
||||||
#define COPY_OUT_COMMAND "COPY %s TO STDOUT"
|
#define COPY_OUT_COMMAND "COPY %s TO STDOUT"
|
||||||
|
#define COPY_SELECT_ALL_OUT_COMMAND "COPY (SELECT * FROM %s) TO STDOUT"
|
||||||
#define COPY_IN_COMMAND "COPY %s FROM '%s'"
|
#define COPY_IN_COMMAND "COPY %s FROM '%s'"
|
||||||
|
|
||||||
/* Defines that relate to creating tables */
|
/* Defines that relate to creating tables */
|
||||||
|
|
Loading…
Reference in New Issue