mirror of https://github.com/citusdata/citus.git
Merge pull request #1117 from citusdata/create_table_data_migration
Migrate data on create_distributed_table (cr: @jasonmp85)
commit 33b58f8e26
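In short: create_distributed_table and create_reference_table now migrate any existing rows of a regular (non-foreign) table into the newly created shards, instead of rejecting non-empty tables. A minimal session, mirroring the regression test included at the end of this diff:

    CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
    INSERT INTO data_load_test VALUES (132, 'hello');
    SELECT create_distributed_table('data_load_test', 'col1');
    -- NOTICE:  Copying data from local table...
    SELECT * FROM data_load_test ORDER BY col1;
    -- both rows are now served from the shards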
@@ -9,6 +9,7 @@
  */

 #include "postgres.h"
+#include "miscadmin.h"

 #include "access/genam.h"
 #include "access/hash.h"
@@ -39,11 +40,14 @@
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/metadata_sync.h"
+#include "distributed/multi_copy.h"
 #include "distributed/multi_logical_planner.h"
 #include "distributed/pg_dist_colocation.h"
 #include "distributed/pg_dist_partition.h"
 #include "distributed/reference_table_utils.h"
+#include "distributed/worker_protocol.h"
 #include "distributed/worker_transaction.h"
+#include "executor/executor.h"
 #include "executor/spi.h"
 #include "nodes/execnodes.h"
 #include "nodes/nodeFuncs.h"
@@ -52,10 +56,14 @@
 #include "parser/parse_node.h"
 #include "parser/parse_relation.h"
 #include "parser/parser.h"
+#include "tcop/pquery.h"
+#include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
+#include "utils/memutils.h"
 #include "utils/rel.h"
+#include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/inval.h"

@@ -65,10 +73,10 @@ int ReplicationModel = REPLICATION_MODEL_COORDINATOR;


 /* local function forward declarations */
-static void CreateReferenceTable(Oid relationId);
+static void CreateReferenceTable(Oid distributedRelationId);
 static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
                                       char distributionMethod, char replicationModel,
-                                      uint32 colocationId);
+                                      uint32 colocationId, bool requireEmpty);
 static char LookupDistributionMethod(Oid distributionMethodOid);
 static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
                                     int16 supportFunctionNumber);
@@ -83,6 +91,8 @@ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
                                        char *colocateWithTableName,
                                        int shardCount, int replicationFactor);
 static Oid ColumnType(Oid relationId, char *columnName);
+static void CopyLocalDataIntoShards(Oid relationId);
+static List * TupleDescColumnNameList(TupleDesc tupleDescriptor);

 /* exports for SQL callable functions */
 PG_FUNCTION_INFO_V1(master_create_distributed_table);
@@ -106,6 +116,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)

     char *distributionColumnName = text_to_cstring(distributionColumnText);
     char distributionMethod = LookupDistributionMethod(distributionMethodOid);
+    bool requireEmpty = true;

     EnsureCoordinator();

@@ -121,7 +132,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)

     ConvertToDistributedTable(distributedRelationId, distributionColumnName,
                               distributionMethod, REPLICATION_MODEL_COORDINATOR,
-                              INVALID_COLOCATION_ID);
+                              INVALID_COLOCATION_ID, requireEmpty);

     PG_RETURN_VOID();
 }
@@ -177,6 +188,8 @@ create_distributed_table(PG_FUNCTION_ARGS)
     /* if distribution method is not hash, just create partition metadata */
     if (distributionMethod != DISTRIBUTE_BY_HASH)
     {
+        bool requireEmpty = true;
+
         if (ReplicationModel != REPLICATION_MODEL_COORDINATOR)
         {
             ereport(NOTICE, (errmsg("using statement-based replication"),
@@ -186,7 +199,7 @@ create_distributed_table(PG_FUNCTION_ARGS)

         ConvertToDistributedTable(relationId, distributionColumnName,
                                   distributionMethod, REPLICATION_MODEL_COORDINATOR,
-                                  INVALID_COLOCATION_ID);
+                                  INVALID_COLOCATION_ID, requireEmpty);
         PG_RETURN_VOID();
     }

@@ -232,6 +245,8 @@ CreateReferenceTable(Oid relationId)
     List *workerNodeList = WorkerNodeList();
     int replicationFactor = list_length(workerNodeList);
     char *distributionColumnName = NULL;
+    bool requireEmpty = true;
+    char relationKind = 0;

     EnsureCoordinator();

@@ -245,23 +260,38 @@ CreateReferenceTable(Oid relationId)
                         errdetail("There are no active worker nodes.")));
     }

+    /* relax empty table requirement for regular (non-foreign) tables */
+    relationKind = get_rel_relkind(relationId);
+    if (relationKind == RELKIND_RELATION)
+    {
+        requireEmpty = false;
+    }
+
     colocationId = CreateReferenceTableColocationId();

     /* first, convert the relation into distributed relation */
     ConvertToDistributedTable(relationId, distributionColumnName,
-                              DISTRIBUTE_BY_NONE, REPLICATION_MODEL_2PC, colocationId);
+                              DISTRIBUTE_BY_NONE, REPLICATION_MODEL_2PC, colocationId,
+                              requireEmpty);

     /* now, create the single shard replicated to all nodes */
     CreateReferenceTableShard(relationId);

     CreateTableMetadataOnWorkers(relationId);
+
+    /* copy over data for regular relations */
+    if (relationKind == RELKIND_RELATION)
+    {
+        CopyLocalDataIntoShards(relationId);
+    }
 }


 /*
  * ConvertToDistributedTable converts the given regular PostgreSQL table into a
  * distributed table. First, it checks if the given table can be distributed,
- * then it creates related tuple in pg_dist_partition.
+ * then it creates related tuple in pg_dist_partition. If requireEmpty is true,
+ * this function errors out when presented with a relation containing rows.
  *
  * XXX: We should perform more checks here to see if this table is fit for
  * partitioning. At a minimum, we should validate the following: (i) this node
@@ -272,7 +302,7 @@ CreateReferenceTable(Oid relationId)
 static void
 ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
                           char distributionMethod, char replicationModel,
-                          uint32 colocationId)
+                          uint32 colocationId, bool requireEmpty)
 {
     Relation relation = NULL;
     TupleDesc relationDesc = NULL;
@@ -322,8 +352,8 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
                                "foreign tables.")));
     }

-    /* check that the relation does not contain any rows */
-    if (!LocalTableEmpty(relationId))
+    /* check that table is empty if that is required */
+    if (requireEmpty && !LocalTableEmpty(relationId))
     {
         ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
                         errmsg("cannot distribute relation \"%s\"",
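The callers only relax requireEmpty for hash-distributed and reference tables backed by regular relations; append- and range-partitioned tables, foreign tables, and the legacy master_create_distributed_table path still insist on an empty table. Verbatim from the regression test output later in this diff:

    SELECT create_distributed_table('data_load_test', 'col1', 'append');
    ERROR:  cannot distribute relation "data_load_test"
    DETAIL:  Relation "data_load_test" contains data.
    HINT:  Empty your table before distributing it.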
@@ -915,6 +945,8 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
     uint32 colocationId = INVALID_COLOCATION_ID;
     Oid sourceRelationId = InvalidOid;
     Oid distributionColumnType = InvalidOid;
+    bool requireEmpty = true;
+    char relationKind = 0;

     /* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */
     distributedRelation = relation_open(relationId, AccessShareLock);
@@ -957,9 +989,16 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
         colocationId = TableColocationId(sourceRelationId);
     }

+    /* relax empty table requirement for regular (non-foreign) tables */
+    relationKind = get_rel_relkind(relationId);
+    if (relationKind == RELKIND_RELATION)
+    {
+        requireEmpty = false;
+    }
+
     /* create distributed table metadata */
     ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH,
-                              ReplicationModel, colocationId);
+                              ReplicationModel, colocationId, requireEmpty);

     /* create shards */
     if (sourceRelationId != InvalidOid)
@@ -976,6 +1015,12 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
         CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor);
     }

+    /* copy over data for regular relations */
+    if (relationKind == RELKIND_RELATION)
+    {
+        CopyLocalDataIntoShards(relationId);
+    }
+
     heap_close(pgDistColocation, NoLock);
     relation_close(distributedRelation, NoLock);
 }
@@ -1021,3 +1066,143 @@ EnsureReplicationSettings(Oid relationId, char replicationModel)
                              "factor\" to one%s.", extraHint)));
     }
 }
+
+
+/*
+ * CopyLocalDataIntoShards copies data from the local table, which is hidden
+ * after converting it to a distributed table, into the shards of the distributed
+ * table.
+ *
+ * This function uses CitusCopyDestReceiver to invoke the distributed COPY logic.
+ * We cannot use a regular COPY here since that cannot read from a table. Instead
+ * we read from the table and pass each tuple to the CitusCopyDestReceiver which
+ * opens a connection and starts a COPY for each shard placement that will have
+ * data.
+ *
+ * We could call the planner and executor here and send the output to the
+ * DestReceiver, but we are in a tricky spot here since Citus is already
+ * intercepting queries on this table in the planner and executor hooks and we
+ * want to read from the local table. To keep it simple, we perform a heap scan
+ * directly on the table.
+ *
+ * Any writes on the table that are started during this operation will be handled
+ * as distributed queries once the current transaction commits. SELECTs will
+ * continue to read from the local table until the current transaction commits,
+ * after which new SELECTs will be handled as distributed queries.
+ *
+ * After copying local data into the distributed table, the local data remains
+ * in place and should be truncated at a later time.
+ */
+static void
+CopyLocalDataIntoShards(Oid distributedRelationId)
+{
+    DestReceiver *copyDest = NULL;
+    List *columnNameList = NIL;
+    Relation distributedRelation = NULL;
+    TupleDesc tupleDescriptor = NULL;
+    bool stopOnFailure = true;
+
+    EState *estate = NULL;
+    HeapScanDesc scan = NULL;
+    HeapTuple tuple = NULL;
+    ExprContext *econtext = NULL;
+    MemoryContext oldContext = NULL;
+    TupleTableSlot *slot = NULL;
+    uint64 rowsCopied = 0;
+
+    /* take an ExclusiveLock to block all operations except SELECT */
+    distributedRelation = heap_open(distributedRelationId, ExclusiveLock);
+
+    /* get the table columns */
+    tupleDescriptor = RelationGetDescr(distributedRelation);
+    slot = MakeSingleTupleTableSlot(tupleDescriptor);
+    columnNameList = TupleDescColumnNameList(tupleDescriptor);
+
+    /* initialise per-tuple memory context */
+    estate = CreateExecutorState();
+    econtext = GetPerTupleExprContext(estate);
+    econtext->ecxt_scantuple = slot;
+
+    copyDest =
+        (DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
+                                                     columnNameList, estate,
+                                                     stopOnFailure);
+
+    /* initialise state for writing to shards, we'll open connections on demand */
+    copyDest->rStartup(copyDest, 0, tupleDescriptor);
+
+    /* begin reading from local table */
+    scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL);
+
+    oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+    {
+        /* materialize tuple and send it to a shard */
+        ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+        copyDest->receiveSlot(slot, copyDest);
+
+        /* clear tuple memory */
+        ResetPerTupleExprContext(estate);
+
+        /* make sure we roll back on cancellation */
+        CHECK_FOR_INTERRUPTS();
+
+        if (rowsCopied == 0)
+        {
+            ereport(NOTICE, (errmsg("Copying data from local table...")));
+        }
+
+        rowsCopied++;
+
+        if (rowsCopied % 1000000 == 0)
+        {
+            ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied)));
+        }
+    }
+
+    if (rowsCopied % 1000000 != 0)
+    {
+        ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied)));
+    }
+
+    MemoryContextSwitchTo(oldContext);
+
+    /* finish reading from the local table */
+    heap_endscan(scan);
+
+    /* finish writing into the shards */
+    copyDest->rShutdown(copyDest);
+
+    /* free memory and close the relation */
+    ExecDropSingleTupleTableSlot(slot);
+    FreeExecutorState(estate);
+    heap_close(distributedRelation, NoLock);
+}
+
+
+/*
+ * TupleDescColumnNameList returns a list of column names for the given tuple
+ * descriptor as plain strings.
+ */
+static List *
+TupleDescColumnNameList(TupleDesc tupleDescriptor)
+{
+    List *columnNameList = NIL;
+    int columnIndex = 0;
+
+    for (columnIndex = 0; columnIndex < tupleDescriptor->natts; columnIndex++)
+    {
+        Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex];
+        char *columnName = NameStr(currentColumn->attname);
+
+        if (currentColumn->attisdropped)
+        {
+            continue;
+        }
+
+        columnNameList = lappend(columnNameList, columnName);
+    }
+
+    return columnNameList;
+}
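The comment above implies the copy is transactional: rows inserted before create_distributed_table in the same transaction are picked up by the heap scan, and rows inserted after it are routed to the shards as distributed queries. The regression test at the end of this diff exercises exactly this:

    BEGIN;
    CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
    INSERT INTO data_load_test VALUES (132, 'hello');
    SELECT create_distributed_table('data_load_test', 'col1');
    -- NOTICE:  Copying data from local table...
    INSERT INTO data_load_test VALUES (243, 'world');
    END;
    SELECT * FROM data_load_test ORDER BY col1;  -- returns both rows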
@@ -68,6 +68,7 @@
 #include "distributed/remote_commands.h"
 #include "distributed/resource_lock.h"
 #include "executor/executor.h"
+#include "nodes/makefuncs.h"
 #include "tsearch/ts_locale.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -127,6 +128,19 @@ static void CopySendInt16(CopyOutState outputState, int16 val);
 static void CopyAttributeOutText(CopyOutState outputState, char *string);
 static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *pointer);

+/* CitusCopyDestReceiver functions */
+static void CitusCopyDestReceiverStartup(DestReceiver *copyDest, int operation,
+                                         TupleDesc inputTupleDesc);
+#if PG_VERSION_NUM >= 90600
+static bool CitusCopyDestReceiverReceive(TupleTableSlot *slot,
+                                         DestReceiver *copyDest);
+#else
+static void CitusCopyDestReceiverReceive(TupleTableSlot *slot,
+                                         DestReceiver *copyDest);
+#endif
+static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver);
+static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver);
+

 /*
  * CitusCopyFrom implements the COPY table_name FROM. It dispacthes the copy
@@ -273,49 +287,31 @@ static void
 CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
 {
     Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
-    char *relationName = get_rel_name(tableId);
+    CitusCopyDestReceiver *copyDest = NULL;
+    DestReceiver *dest = NULL;
+
     Relation distributedRelation = NULL;
     TupleDesc tupleDescriptor = NULL;
     uint32 columnCount = 0;
     Datum *columnValues = NULL;
     bool *columnNulls = NULL;
-    FmgrInfo *hashFunction = NULL;
-    FmgrInfo *compareFunction = NULL;
-    bool hasUniformHashDistribution = false;
-    DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId);
-    const char *delimiterCharacter = "\t";
-    const char *nullPrintCharacter = "\\N";
+    int columnIndex = 0;
+    List *columnNameList = NIL;
+    TupleTableSlot *tupleTableSlot = NULL;

-    int shardCount = 0;
-    List *shardIntervalList = NULL;
-    ShardInterval **shardIntervalCache = NULL;
-    bool useBinarySearch = false;
-
-    HTAB *shardConnectionHash = NULL;
-    ShardConnections *shardConnections = NULL;
-    List *shardConnectionsList = NIL;
-    ListCell *shardConnectionsCell = NULL;
-
     EState *executorState = NULL;
     MemoryContext executorTupleContext = NULL;
     ExprContext *executorExpressionContext = NULL;

+    char partitionMethod = 0;
+    bool stopOnFailure = false;
+
     CopyState copyState = NULL;
-    CopyOutState copyOutState = NULL;
-    FmgrInfo *columnOutputFunctions = NULL;
     uint64 processedRowCount = 0;

-    Var *partitionColumn = PartitionColumn(tableId, 0);
-    char partitionMethod = PartitionMethod(tableId);
-
     ErrorContextCallback errorCallback;

-    /* get hash function for partition column */
-    hashFunction = cacheEntry->hashFunction;
-
-    /* get compare function for shard intervals */
-    compareFunction = cacheEntry->shardIntervalCompareFunction;
-
     /* allocate column values and nulls arrays */
     distributedRelation = heap_open(tableId, RowExclusiveLock);
     tupleDescriptor = RelationGetDescr(distributedRelation);
@@ -323,64 +319,40 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
     columnValues = palloc0(columnCount * sizeof(Datum));
     columnNulls = palloc0(columnCount * sizeof(bool));

-    /* we don't support copy to reference tables from workers */
+    /* set up a virtual tuple table slot */
+    tupleTableSlot = MakeSingleTupleTableSlot(tupleDescriptor);
+    tupleTableSlot->tts_nvalid = columnCount;
+    tupleTableSlot->tts_values = columnValues;
+    tupleTableSlot->tts_isnull = columnNulls;
+
+    for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
+    {
+        Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex];
+        char *columnName = NameStr(currentColumn->attname);
+
+        if (currentColumn->attisdropped)
+        {
+            continue;
+        }
+
+        columnNameList = lappend(columnNameList, columnName);
+    }
+
+    executorState = CreateExecutorState();
+    executorTupleContext = GetPerTupleMemoryContext(executorState);
+    executorExpressionContext = GetPerTupleExprContext(executorState);
+
+    partitionMethod = PartitionMethod(tableId);
     if (partitionMethod == DISTRIBUTE_BY_NONE)
     {
-        EnsureCoordinator();
+        stopOnFailure = true;
     }

-    /* load the list of shards and verify that we have shards to copy into */
-    shardIntervalList = LoadShardIntervalList(tableId);
-    if (shardIntervalList == NIL)
-    {
-        if (partitionMethod == DISTRIBUTE_BY_HASH)
-        {
-            ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                            errmsg("could not find any shards into which to copy"),
-                            errdetail("No shards exist for distributed table \"%s\".",
-                                      relationName),
-                            errhint("Run master_create_worker_shards to create shards "
-                                    "and try again.")));
-        }
-        else
-        {
-            ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                            errmsg("could not find any shards into which to copy"),
-                            errdetail("No shards exist for distributed table \"%s\".",
-                                      relationName)));
-        }
-    }
-
-    /* error if any shard missing min/max values for non reference tables */
-    if (partitionMethod != DISTRIBUTE_BY_NONE &&
-        cacheEntry->hasUninitializedShardInterval)
-    {
-        ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                        errmsg("could not start copy"),
-                        errdetail("Distributed relation \"%s\" has shards "
-                                  "with missing shardminvalue/shardmaxvalue.",
-                                  relationName)));
-    }
-
-    /* prevent concurrent placement changes and non-commutative DML statements */
-    LockShardListMetadata(shardIntervalList, ShareLock);
-    LockShardListResources(shardIntervalList, ShareLock);
-
-    /* initialize the shard interval cache */
-    shardCount = cacheEntry->shardIntervalArrayLength;
-    shardIntervalCache = cacheEntry->sortedShardIntervalArray;
-    hasUniformHashDistribution = cacheEntry->hasUniformHashDistribution;
-
-    /* determine whether to use binary search */
-    if (partitionMethod != DISTRIBUTE_BY_HASH || !hasUniformHashDistribution)
-    {
-        useBinarySearch = true;
-    }
-
-    if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC)
-    {
-        CoordinatedTransactionUse2PC();
-    }
+    /* set up the destination for the COPY */
+    copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, executorState,
+                                           stopOnFailure);
+    dest = (DestReceiver *) copyDest;
+    dest->rStartup(dest, 0, tupleDescriptor);

     /* initialize copy state to read from COPY data source */
     copyState = BeginCopyFrom(distributedRelation,
@@ -389,23 +361,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
                               copyStatement->attlist,
                               copyStatement->options);

-    executorState = CreateExecutorState();
-    executorTupleContext = GetPerTupleMemoryContext(executorState);
-    executorExpressionContext = GetPerTupleExprContext(executorState);
-
-    copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
-    copyOutState->delim = (char *) delimiterCharacter;
-    copyOutState->null_print = (char *) nullPrintCharacter;
-    copyOutState->null_print_client = (char *) nullPrintCharacter;
-    copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState);
-    copyOutState->fe_msgbuf = makeStringInfo();
-    copyOutState->rowcontext = executorTupleContext;
-
-    columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary);
-
-    /* create a mapping of shard id to a connection for each of its placements */
-    shardConnectionHash = CreateShardConnectionHash(TopTransactionContext);
-
     /* set up callback to identify error line number */
     errorCallback.callback = CopyFromErrorCallback;
     errorCallback.arg = (void *) copyState;
@@ -415,10 +370,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
     while (true)
     {
         bool nextRowFound = false;
-        Datum partitionColumnValue = 0;
-        ShardInterval *shardInterval = NULL;
-        int64 shardId = 0;
-        bool shardConnectionsFound = false;
        MemoryContext oldContext = NULL;

        ResetPerTupleExprContext(executorState);
@@ -437,103 +388,23 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)

         CHECK_FOR_INTERRUPTS();

-        /*
-         * Find the partition column value and corresponding shard interval
-         * for non-reference tables.
-         * Get the existing (and only a single) shard interval for the reference
-         * tables. Note that, reference tables has NULL partition column values so
-         * skip the check.
-         */
-        if (partitionColumn != NULL)
-        {
-            if (columnNulls[partitionColumn->varattno - 1])
-            {
-                ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
-                                errmsg("cannot copy row with NULL value "
-                                       "in partition column")));
-            }
-
-            partitionColumnValue = columnValues[partitionColumn->varattno - 1];
-        }
-
-        /*
-         * Find the shard interval and id for the partition column value for
-         * non-reference tables.
-         * For reference table, this function blindly returns the tables single
-         * shard.
-         */
-        shardInterval = FindShardInterval(partitionColumnValue,
-                                          shardIntervalCache,
-                                          shardCount, partitionMethod,
-                                          compareFunction, hashFunction,
-                                          useBinarySearch);
-
-        if (shardInterval == NULL)
-        {
-            ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-                            errmsg("could not find shard for partition column "
-                                   "value")));
-        }
-
-        shardId = shardInterval->shardId;
-
         MemoryContextSwitchTo(oldContext);

-        /* get existing connections to the shard placements, if any */
-        shardConnections = GetShardHashConnections(shardConnectionHash, shardId,
-                                                   &shardConnectionsFound);
-        if (!shardConnectionsFound)
-        {
-            bool stopOnFailure = false;
-
-            if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
-            {
-                stopOnFailure = true;
-            }
-
-            /* open connections and initiate COPY on shard placements */
-            OpenCopyConnections(copyStatement, shardConnections, stopOnFailure,
-                                copyOutState->binary);
-
-            /* send copy binary headers to shard placements */
-            if (copyOutState->binary)
-            {
-                SendCopyBinaryHeaders(copyOutState, shardId,
-                                      shardConnections->connectionList);
-            }
-        }
-
-        /* replicate row to shard placements */
-        resetStringInfo(copyOutState->fe_msgbuf);
-        AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
-                          copyOutState, columnOutputFunctions);
-        SendCopyDataToAll(copyOutState->fe_msgbuf, shardId,
-                          shardConnections->connectionList);
+        dest->receiveSlot(tupleTableSlot, dest);

         processedRowCount += 1;
     }

+    EndCopyFrom(copyState);
+
     /* all lines have been copied, stop showing line number in errors */
     error_context_stack = errorCallback.previous;

-    shardConnectionsList = ShardConnectionList(shardConnectionHash);
-    foreach(shardConnectionsCell, shardConnectionsList)
-    {
-        ShardConnections *shardConnections = (ShardConnections *) lfirst(
-            shardConnectionsCell);
-
-        /* send copy binary footers to all shard placements */
-        if (copyOutState->binary)
-        {
-            SendCopyBinaryFooters(copyOutState, shardConnections->shardId,
-                                  shardConnections->connectionList);
-        }
-
-        /* close the COPY input on all shard placements */
-        EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, true);
-    }
-
-    EndCopyFrom(copyState);
+    /* finish the COPY commands */
+    dest->rShutdown(dest);
+
+    ExecDropSingleTupleTableSlot(tupleTableSlot);
+    FreeExecutorState(executorState);
+
     heap_close(distributedRelation, NoLock);

     /* mark failed placements as inactive */
@@ -604,6 +475,13 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
     errorCallback.arg = (void *) copyState;
     errorCallback.previous = error_context_stack;

+    /*
+     * From here on we use copyStatement as the template for the command
+     * that we send to workers. This command does not have an attribute
+     * list since NextCopyFrom will generate a value for all columns.
+     */
+    copyStatement->attlist = NIL;
+
     while (true)
     {
         bool nextRowFound = false;
@@ -1074,22 +952,46 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCopyFormat)

     char *shardName = pstrdup(relationName);
     char *shardQualifiedName = NULL;
-    const char *copyFormat = NULL;

     AppendShardIdToName(&shardName, shardId);

     shardQualifiedName = quote_qualified_identifier(schemaName, shardName);

+    appendStringInfo(command, "COPY %s ", shardQualifiedName);
+
+    if (copyStatement->attlist != NIL)
+    {
+        ListCell *columnNameCell = NULL;
+        bool appendedFirstName = false;
+
+        foreach(columnNameCell, copyStatement->attlist)
+        {
+            char *columnName = (char *) lfirst(columnNameCell);
+
+            if (!appendedFirstName)
+            {
+                appendStringInfo(command, "(%s", columnName);
+                appendedFirstName = true;
+            }
+            else
+            {
+                appendStringInfo(command, ", %s", columnName);
+            }
+        }
+
+        appendStringInfoString(command, ") ");
+    }
+
+    appendStringInfo(command, "FROM STDIN WITH ");
+
     if (useBinaryCopyFormat)
     {
-        copyFormat = "BINARY";
+        appendStringInfoString(command, "(FORMAT BINARY)");
     }
     else
     {
-        copyFormat = "TEXT";
+        appendStringInfoString(command, "(FORMAT TEXT)");
     }
-    appendStringInfo(command, "COPY %s FROM STDIN WITH (FORMAT %s)", shardQualifiedName,
-                     copyFormat);

     return command;
 }
@@ -1277,7 +1179,6 @@ AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
     {
         CopySendInt16(rowOutputState, availableColumnCount);
     }
-
     for (columnIndex = 0; columnIndex < totalColumnCount; columnIndex++)
     {
         Form_pg_attribute currentColumn = rowDescriptor->attrs[columnIndex];
@@ -1694,3 +1595,379 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer)
         CopySendData(cstate, start, pointer - start);
     }
 }
+
+
+/*
+ * CreateCitusCopyDestReceiver creates a DestReceiver that copies into
+ * a distributed table.
+ */
+CitusCopyDestReceiver *
+CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorState,
+                            bool stopOnFailure)
+{
+    CitusCopyDestReceiver *copyDest = NULL;
+
+    copyDest = (CitusCopyDestReceiver *) palloc0(sizeof(CitusCopyDestReceiver));
+
+    /* set up the DestReceiver function pointers */
+    copyDest->pub.receiveSlot = CitusCopyDestReceiverReceive;
+    copyDest->pub.rStartup = CitusCopyDestReceiverStartup;
+    copyDest->pub.rShutdown = CitusCopyDestReceiverShutdown;
+    copyDest->pub.rDestroy = CitusCopyDestReceiverDestroy;
+    copyDest->pub.mydest = DestCopyOut;
+
+    /* set up output parameters */
+    copyDest->distributedRelationId = tableId;
+    copyDest->columnNameList = columnNameList;
+    copyDest->executorState = executorState;
+    copyDest->stopOnFailure = stopOnFailure;
+    copyDest->memoryContext = CurrentMemoryContext;
+
+    return copyDest;
+}
+
+
+/*
+ * CitusCopyDestReceiverStartup implements the rStartup interface of
+ * CitusCopyDestReceiver. It opens the relation
+ */
+static void
+CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
+                             TupleDesc inputTupleDescriptor)
+{
+    CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest;
+
+    Oid tableId = copyDest->distributedRelationId;
+
+    char *relationName = get_rel_name(tableId);
+    Oid schemaOid = get_rel_namespace(tableId);
+    char *schemaName = get_namespace_name(schemaOid);
+
+    Relation distributedRelation = NULL;
+    int columnIndex = 0;
+    List *columnNameList = copyDest->columnNameList;
+
+    ListCell *columnNameCell = NULL;
+
+    char partitionMethod = '\0';
+    Var *partitionColumn = PartitionColumn(tableId, 0);
+    int partitionColumnIndex = -1;
+    DistTableCacheEntry *cacheEntry = NULL;
+
+    CopyStmt *copyStatement = NULL;
+
+    List *shardIntervalList = NULL;
+
+    CopyOutState copyOutState = NULL;
+    const char *delimiterCharacter = "\t";
+    const char *nullPrintCharacter = "\\N";
+
+    /* look up table properties */
+    distributedRelation = heap_open(tableId, RowExclusiveLock);
+    cacheEntry = DistributedTableCacheEntry(tableId);
+    partitionMethod = cacheEntry->partitionMethod;
+
+    copyDest->distributedRelation = distributedRelation;
+    copyDest->tupleDescriptor = inputTupleDescriptor;
+
+    /* we don't support copy to reference tables from workers */
+    if (partitionMethod == DISTRIBUTE_BY_NONE)
+    {
+        EnsureCoordinator();
+    }
+
+    /* load the list of shards and verify that we have shards to copy into */
+    shardIntervalList = LoadShardIntervalList(tableId);
+    if (shardIntervalList == NIL)
+    {
+        if (partitionMethod == DISTRIBUTE_BY_HASH)
+        {
+            ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                            errmsg("could not find any shards into which to copy"),
+                            errdetail("No shards exist for distributed table \"%s\".",
+                                      relationName),
+                            errhint("Run master_create_worker_shards to create shards "
+                                    "and try again.")));
+        }
+        else
+        {
+            ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                            errmsg("could not find any shards into which to copy"),
+                            errdetail("No shards exist for distributed table \"%s\".",
+                                      relationName)));
+        }
+    }
+
+    /* error if any shard missing min/max values */
+    if (partitionMethod != DISTRIBUTE_BY_NONE &&
+        cacheEntry->hasUninitializedShardInterval)
+    {
+        ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                        errmsg("could not start copy"),
+                        errdetail("Distributed relation \"%s\" has shards "
+                                  "with missing shardminvalue/shardmaxvalue.",
+                                  relationName)));
+    }
+
+    /* prevent concurrent placement changes and non-commutative DML statements */
+    LockShardListMetadata(shardIntervalList, ShareLock);
+    LockShardListResources(shardIntervalList, ShareLock);
+
+    /* keep the table metadata to avoid looking it up for every tuple */
+    copyDest->tableMetadata = cacheEntry;
+
+    /* determine whether to use binary search */
+    if (partitionMethod != DISTRIBUTE_BY_HASH || !cacheEntry->hasUniformHashDistribution)
+    {
+        copyDest->useBinarySearch = true;
+    }
+
+    if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC)
+    {
+        CoordinatedTransactionUse2PC();
+    }
+
+    /* define how tuples will be serialised */
+    copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
+    copyOutState->delim = (char *) delimiterCharacter;
+    copyOutState->null_print = (char *) nullPrintCharacter;
+    copyOutState->null_print_client = (char *) nullPrintCharacter;
+    copyOutState->binary = CanUseBinaryCopyFormat(inputTupleDescriptor, copyOutState);
+    copyOutState->fe_msgbuf = makeStringInfo();
+    copyOutState->rowcontext = GetPerTupleMemoryContext(copyDest->executorState);
+    copyDest->copyOutState = copyOutState;
+
+    /* prepare output functions */
+    copyDest->columnOutputFunctions =
+        ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary);
+
+    /* find the partition column index in the column list */
+    foreach(columnNameCell, columnNameList)
+    {
+        char *columnName = (char *) lfirst(columnNameCell);
+
+        /* load the column information from pg_attribute */
+        AttrNumber attrNumber = get_attnum(tableId, columnName);
+
+        /* check whether this is the partition column */
+        if (partitionColumn != NULL && attrNumber == partitionColumn->varattno)
+        {
+            Assert(partitionColumnIndex == -1);
+
+            partitionColumnIndex = columnIndex;
+        }
+
+        columnIndex++;
+    }
+
+    if (partitionMethod != DISTRIBUTE_BY_NONE && partitionColumnIndex == -1)
+    {
+        ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+                        errmsg("the partition column of table %s should have a value",
+                               quote_qualified_identifier(schemaName, relationName))));
+    }
+
+    copyDest->partitionColumnIndex = partitionColumnIndex;
+
+    /* define the template for the COPY statement that is sent to workers */
+    copyStatement = makeNode(CopyStmt);
+    copyStatement->relation = makeRangeVar(schemaName, relationName, -1);
+    copyStatement->query = NULL;
+    copyStatement->attlist = columnNameList;
+    copyStatement->is_from = true;
+    copyStatement->is_program = false;
+    copyStatement->filename = NULL;
+    copyStatement->options = NIL;
+    copyDest->copyStatement = copyStatement;
+
+    copyDest->shardConnectionHash = CreateShardConnectionHash(TopTransactionContext);
+}
+
+
+/*
+ * CitusCopyDestReceiverReceive implements the receiveSlot function of
+ * CitusCopyDestReceiver. It takes a TupleTableSlot and sends the contents to
+ * the appropriate shard placement(s).
+ */
+#if PG_VERSION_NUM >= 90600
+static bool
+#else
+static void
+#endif
+CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
+{
+    CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest;
+
+    DistTableCacheEntry *tableMetadata = copyDest->tableMetadata;
+    char partitionMethod = tableMetadata->partitionMethod;
+    int partitionColumnIndex = copyDest->partitionColumnIndex;
+    TupleDesc tupleDescriptor = copyDest->tupleDescriptor;
+    CopyStmt *copyStatement = copyDest->copyStatement;
+
+    int shardCount = tableMetadata->shardIntervalArrayLength;
+    ShardInterval **shardIntervalCache = tableMetadata->sortedShardIntervalArray;
+
+    bool useBinarySearch = copyDest->useBinarySearch;
+    FmgrInfo *hashFunction = tableMetadata->hashFunction;
+    FmgrInfo *compareFunction = tableMetadata->shardIntervalCompareFunction;
+
+    HTAB *shardConnectionHash = copyDest->shardConnectionHash;
+    CopyOutState copyOutState = copyDest->copyOutState;
+    FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions;
+
+    bool stopOnFailure = copyDest->stopOnFailure;
+
+    Datum *columnValues = NULL;
+    bool *columnNulls = NULL;
+
+    Datum partitionColumnValue = 0;
+    ShardInterval *shardInterval = NULL;
+    int64 shardId = 0;
+
+    bool shardConnectionsFound = false;
+    ShardConnections *shardConnections = NULL;
+
+    EState *executorState = copyDest->executorState;
+    MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
+    MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
+
+    slot_getallattrs(slot);
+
+    columnValues = slot->tts_values;
+    columnNulls = slot->tts_isnull;
+
+    /*
+     * Find the partition column value and corresponding shard interval
+     * for non-reference tables.
+     * Get the existing (and only a single) shard interval for the reference
+     * tables. Note that, reference tables has NULL partition column values so
+     * skip the check.
+     */
+    if (partitionColumnIndex >= 0)
+    {
+        if (columnNulls[partitionColumnIndex])
+        {
+            Oid relationId = copyDest->distributedRelationId;
+            char *relationName = get_rel_name(relationId);
+            Oid schemaOid = get_rel_namespace(relationId);
+            char *schemaName = get_namespace_name(schemaOid);
+            char *qualifiedTableName = quote_qualified_identifier(schemaName,
+                                                                  relationName);
+
+            ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+                            errmsg("the partition column of table %s should have a value",
+                                   qualifiedTableName)));
+        }
+
+        /* find the partition column value */
+        partitionColumnValue = columnValues[partitionColumnIndex];
+    }
+
+    /*
+     * Find the shard interval and id for the partition column value for
+     * non-reference tables.
+     *
+     * For reference table, this function blindly returns the tables single
+     * shard.
+     */
+    shardInterval = FindShardInterval(partitionColumnValue, shardIntervalCache,
+                                      shardCount, partitionMethod,
+                                      compareFunction, hashFunction,
+                                      useBinarySearch);
+    if (shardInterval == NULL)
+    {
+        ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                        errmsg("could not find shard for partition column "
+                               "value")));
+    }
+
+    shardId = shardInterval->shardId;
+
+    /* connections hash is kept in memory context */
+    MemoryContextSwitchTo(copyDest->memoryContext);
+
+    /* get existing connections to the shard placements, if any */
+    shardConnections = GetShardHashConnections(shardConnectionHash, shardId,
+                                               &shardConnectionsFound);
+    if (!shardConnectionsFound)
+    {
+        /* open connections and initiate COPY on shard placements */
+        OpenCopyConnections(copyStatement, shardConnections, stopOnFailure,
+                            copyOutState->binary);
+
+        /* send copy binary headers to shard placements */
+        if (copyOutState->binary)
+        {
+            SendCopyBinaryHeaders(copyOutState, shardId,
+                                  shardConnections->connectionList);
+        }
+    }
+
+    /* replicate row to shard placements */
+    resetStringInfo(copyOutState->fe_msgbuf);
+    AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
+                      copyOutState, columnOutputFunctions);
+    SendCopyDataToAll(copyOutState->fe_msgbuf, shardId, shardConnections->connectionList);
+
+    MemoryContextSwitchTo(oldContext);
+
+#if PG_VERSION_NUM >= 90600
+    return true;
+#endif
+}
+
+
+/*
+ * CitusCopyDestReceiverShutdown implements the rShutdown interface of
+ * CitusCopyDestReceiver. It ends the COPY on all the open connections and closes
+ * the relation.
+ */
+static void
+CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
+{
+    CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) destReceiver;
+
+    HTAB *shardConnectionHash = copyDest->shardConnectionHash;
+    List *shardConnectionsList = NIL;
+    ListCell *shardConnectionsCell = NULL;
+    CopyOutState copyOutState = copyDest->copyOutState;
+    Relation distributedRelation = copyDest->distributedRelation;
+
+    shardConnectionsList = ShardConnectionList(shardConnectionHash);
+    foreach(shardConnectionsCell, shardConnectionsList)
+    {
+        ShardConnections *shardConnections = (ShardConnections *) lfirst(
+            shardConnectionsCell);
+
+        /* send copy binary footers to all shard placements */
+        if (copyOutState->binary)
+        {
+            SendCopyBinaryFooters(copyOutState, shardConnections->shardId,
+                                  shardConnections->connectionList);
+        }
+
+        /* close the COPY input on all shard placements */
+        EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, true);
+    }
+
+    heap_close(distributedRelation, NoLock);
+}
+
+
+static void
+CitusCopyDestReceiverDestroy(DestReceiver *destReceiver)
+{
+    CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) destReceiver;
+
+    if (copyDest->copyOutState)
+    {
+        pfree(copyDest->copyOutState);
+    }
+
+    if (copyDest->columnOutputFunctions)
+    {
+        pfree(copyDest->columnOutputFunctions);
+    }
+
+    pfree(copyDest);
+}
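One behavioral consequence of the new partitionColumnIndex bookkeeping above: a tuple whose partition column is NULL is rejected with the schema-qualified table name in the message. A hypothetical session (table name illustrative):

    COPY data_load_test FROM STDIN;
    -- an input row with NULL in col1 now fails with:
    ERROR:  the partition column of table public.data_load_test should have a value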
@@ -13,7 +13,12 @@
 #define MULTI_COPY_H


+#include "distributed/master_metadata_utility.h"
+#include "distributed/metadata_cache.h"
+#include "nodes/execnodes.h"
 #include "nodes/parsenodes.h"
+#include "tcop/dest.h"
+

 /*
  * A smaller version of copy.c's CopyStateData, trimmed to the elements
@@ -43,8 +48,51 @@ typedef struct NodeAddress
     int32 nodePort;
 } NodeAddress;

+/* CopyDestReceiver can be used to stream results into a distributed table */
+typedef struct CitusCopyDestReceiver
+{
+    /* public DestReceiver interface */
+    DestReceiver pub;
+
+    /* relation and columns to which to copy */
+    Oid distributedRelationId;
+    List *columnNameList;
+    int partitionColumnIndex;
+
+    /* distributed table metadata */
+    DistTableCacheEntry *tableMetadata;
+    bool useBinarySearch;
+
+    /* open relation handle */
+    Relation distributedRelation;
+
+    /* descriptor of the tuples that are sent to the worker */
+    TupleDesc tupleDescriptor;
+
+    /* EState for per-tuple memory allocation */
+    EState *executorState;
+
+    /* MemoryContext for DestReceiver session */
+    MemoryContext memoryContext;
+
+    /* template for COPY statement to send to workers */
+    CopyStmt *copyStatement;
+
+    /* cached shard metadata for pruning */
+    HTAB *shardConnectionHash;
+    bool stopOnFailure;
+
+    /* state on how to copy out data types */
+    CopyOutState copyOutState;
+    FmgrInfo *columnOutputFunctions;
+} CitusCopyDestReceiver;
+

 /* function declarations for copying into a distributed table */
+extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
+                                                           List *columnNameList,
+                                                           EState *executorState,
+                                                           bool stopOnFailure);
 extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
 extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray,
                               TupleDesc rowDescriptor,
@@ -75,12 +75,6 @@ CREATE TABLE nation (
     n_name char(25) not null,
     n_regionkey integer not null,
     n_comment varchar(152));
-\COPY nation FROM STDIN WITH CSV
-SELECT master_create_distributed_table('nation', 'n_nationkey', 'append');
-ERROR:  cannot distribute relation "nation"
-DETAIL:  Relation "nation" contains data.
-HINT:  Empty your table before distributing it.
-TRUNCATE nation;
 SELECT create_reference_table('nation');
  create_reference_table
 ------------------------
@@ -360,6 +354,155 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regcl

 DROP TABLE repmodel_test;
 RESET citus.replication_model;
+-- Test initial data loading
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+INSERT INTO data_load_test VALUES (243, 'world');
+-- table must be empty when using append- or range-partitioning
+SELECT create_distributed_table('data_load_test', 'col1', 'append');
+ERROR:  cannot distribute relation "data_load_test"
+DETAIL:  Relation "data_load_test" contains data.
+HINT:  Empty your table before distributing it.
+SELECT create_distributed_table('data_load_test', 'col1', 'range');
+ERROR:  cannot distribute relation "data_load_test"
+DETAIL:  Relation "data_load_test" contains data.
+HINT:  Empty your table before distributing it.
+-- table must be empty when using master_create_distributed_table (no shards created)
+SELECT master_create_distributed_table('data_load_test', 'col1', 'hash');
+ERROR:  cannot distribute relation "data_load_test"
+DETAIL:  Relation "data_load_test" contains data.
+HINT:  Empty your table before distributing it.
+-- create_distributed_table creates shards and copies data into the distributed table
+SELECT create_distributed_table('data_load_test', 'col1');
+NOTICE:  Copying data from local table...
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+SELECT * FROM data_load_test ORDER BY col1;
+ col1 | col2  | col3
+------+-------+------
+  132 | hello |    1
+  243 | world |    2
+(2 rows)
+
+DROP TABLE data_load_test;
+-- ensure writes in the same transaction as create_distributed_table are visible
+BEGIN;
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test', 'col1');
+NOTICE:  Copying data from local table...
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+INSERT INTO data_load_test VALUES (243, 'world');
+END;
+SELECT * FROM data_load_test ORDER BY col1;
+ col1 | col2  | col3
+------+-------+------
+  132 | hello |    1
+  243 | world |    2
+(2 rows)
+
+DROP TABLE data_load_test;
+-- creating co-located distributed tables in the same transaction works
+BEGIN;
+CREATE TABLE data_load_test1 (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test1 VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test1', 'col1');
+NOTICE:  Copying data from local table...
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+CREATE TABLE data_load_test2 (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test2 VALUES (132, 'world');
+SELECT create_distributed_table('data_load_test2', 'col1');
+NOTICE:  Copying data from local table...
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+SELECT a.col2 ||' '|| b.col2
+FROM data_load_test1 a JOIN data_load_test2 b USING (col1)
+WHERE col1 = 132;
+  ?column?
+-------------
+ hello world
+(1 row)
+
+DROP TABLE data_load_test1, data_load_test2;
+END;
+-- creating an index after loading data works
+BEGIN;
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test', 'col1');
+NOTICE:  Copying data from local table...
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+CREATE INDEX data_load_test_idx ON data_load_test (col2);
+END;
+DROP TABLE data_load_test;
+-- popping in and out of existence in the same transaction works
+BEGIN;
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test', 'col1');
+NOTICE:  Copying data from local table...
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+DROP TABLE data_load_test;
+END;
+-- but dropping after a write on the distributed table is currently disallowed
+BEGIN;
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test', 'col1');
+NOTICE:  Copying data from local table...
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+INSERT INTO data_load_test VALUES (243, 'world');
+DROP TABLE data_load_test;
+ERROR:  shard drop operations must not appear in transaction blocks containing other distributed modifications
+CONTEXT:  SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)"
+PL/pgSQL function citus_drop_trigger() line 21 at PERFORM
+END;
+-- Test data loading after dropping a column
+CREATE TABLE data_load_test (col1 int, col2 text, col3 text);
+INSERT INTO data_load_test VALUES (132, 'hello', 'world');
+INSERT INTO data_load_test VALUES (243, 'world', 'hello');
+ALTER TABLE data_load_test DROP COLUMN col2;
+SELECT create_distributed_table('data_load_test', 'col1');
+NOTICE:  Copying data from local table...
+ create_distributed_table
+--------------------------
+
+(1 row)
+
+SELECT * FROM data_load_test;
+ col1 | col3
+------+-------
+  132 | world
+  243 | hello
+(2 rows)
+
+DROP TABLE data_load_test;
 SET citus.shard_replication_factor TO default;
 SET citus.shard_count to 4;
 CREATE TABLE lineitem_hash_part (like lineitem);
@@ -2,15 +2,10 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1250000;
 ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1250000;
 CREATE TABLE reference_table_test (value_1 int, value_2 float, value_3 text, value_4 timestamp);
 -- insert some data, and make sure that it cannot be distributed with create_distributed_table
-INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-05');
--- should error out given that there exists data
-SELECT create_reference_table('reference_table_test');
-ERROR:  cannot distribute relation "reference_table_test"
-DETAIL:  Relation "reference_table_test" contains data.
-HINT:  Empty your table before distributing it.
-TRUNCATE reference_table_test;
--- now should be able to create the reference table
+INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
+-- create the reference table
 SELECT create_reference_table('reference_table_test');
+NOTICE:  Copying data from local table...
  create_reference_table
 ------------------------
@@ -52,8 +47,14 @@ WHERE
  1250000 | 1 | localhost | 57638
 (2 rows)

+-- check whether data was copied into distributed table
+SELECT * FROM reference_table_test;
+ value_1 | value_2 | value_3 |         value_4
+---------+---------+---------+--------------------------
+       1 |       1 | 1       | Thu Dec 01 00:00:00 2016
+(1 row)
+
 -- now, execute some modification queries
-INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
 INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
 INSERT INTO reference_table_test VALUES (3, 3.0, '3', '2016-12-03');
 INSERT INTO reference_table_test VALUES (4, 4.0, '4', '2016-12-04');
@@ -723,7 +723,7 @@ CREATE TABLE numbers_hash(a int, b int);
 SELECT create_distributed_table('numbers_hash', 'a');

 \c - - - :worker_1_port
-ALTER TABLE numbers_hash_560180 ADD COLUMN c int;
+ALTER TABLE numbers_hash_560180 DROP COLUMN b;
 \c - - - :master_port

 -- operation will fail to modify a shard and roll back
@@ -739,7 +739,7 @@ COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
 \.

 -- verify no row is inserted
-SELECT * FROM numbers_hash;
+SELECT count(a) FROM numbers_hash;

 -- verify shard is still marked as valid
 SELECT shardid, shardstate, nodename, nodeport
@@ -978,17 +978,19 @@ SELECT create_distributed_table('numbers_hash', 'a');
 (1 row)

 \c - - - :worker_1_port
-ALTER TABLE numbers_hash_560180 ADD COLUMN c int;
+ALTER TABLE numbers_hash_560180 DROP COLUMN b;
 \c - - - :master_port
 -- operation will fail to modify a shard and roll back
 COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
-ERROR:  row field count is 2, expected 3
-DETAIL:  (null)
+ERROR:  column "b" of relation "numbers_hash_560180" does not exist
+CONTEXT:  while executing command on localhost:57637
+COPY numbers_hash, line 1: "1,1"
 -- verify no row is inserted
-SELECT * FROM numbers_hash;
- a | b
----+---
-(0 rows)
+SELECT count(a) FROM numbers_hash;
+ count
+-------
+     0
+(1 row)

 -- verify shard is still marked as valid
 SELECT shardid, shardstate, nodename, nodeport
@@ -60,18 +60,6 @@ CREATE TABLE nation (
     n_regionkey integer not null,
     n_comment varchar(152));

-\COPY nation FROM STDIN WITH CSV
-1,'name',1,'comment_1'
-2,'name',2,'comment_2'
-3,'name',3,'comment_3'
-4,'name',4,'comment_4'
-5,'name',5,'comment_5'
-\.
-
-SELECT master_create_distributed_table('nation', 'n_nationkey', 'append');
-
-TRUNCATE nation;
-
 SELECT create_reference_table('nation');

 CREATE TABLE part (
@@ -201,6 +189,86 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regcl
 DROP TABLE repmodel_test;

 RESET citus.replication_model;

+-- Test initial data loading
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+INSERT INTO data_load_test VALUES (243, 'world');
+
+-- table must be empty when using append- or range-partitioning
+SELECT create_distributed_table('data_load_test', 'col1', 'append');
+SELECT create_distributed_table('data_load_test', 'col1', 'range');
+
+-- table must be empty when using master_create_distributed_table (no shards created)
+SELECT master_create_distributed_table('data_load_test', 'col1', 'hash');
+
+-- create_distributed_table creates shards and copies data into the distributed table
+SELECT create_distributed_table('data_load_test', 'col1');
+SELECT * FROM data_load_test ORDER BY col1;
+DROP TABLE data_load_test;
+
+-- ensure writes in the same transaction as create_distributed_table are visible
+BEGIN;
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test', 'col1');
+INSERT INTO data_load_test VALUES (243, 'world');
+END;
+SELECT * FROM data_load_test ORDER BY col1;
+DROP TABLE data_load_test;
+
+-- creating co-located distributed tables in the same transaction works
+BEGIN;
+CREATE TABLE data_load_test1 (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test1 VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test1', 'col1');
+
+CREATE TABLE data_load_test2 (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test2 VALUES (132, 'world');
+SELECT create_distributed_table('data_load_test2', 'col1');
+
+SELECT a.col2 ||' '|| b.col2
+FROM data_load_test1 a JOIN data_load_test2 b USING (col1)
+WHERE col1 = 132;
+
+DROP TABLE data_load_test1, data_load_test2;
+END;
+
+-- creating an index after loading data works
+BEGIN;
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test', 'col1');
+CREATE INDEX data_load_test_idx ON data_load_test (col2);
+END;
+DROP TABLE data_load_test;
+
+-- popping in and out of existence in the same transaction works
+BEGIN;
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test', 'col1');
+DROP TABLE data_load_test;
+END;
+
+-- but dropping after a write on the distributed table is currently disallowed
+BEGIN;
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test', 'col1');
+INSERT INTO data_load_test VALUES (243, 'world');
+DROP TABLE data_load_test;
+END;
+
+-- Test data loading after dropping a column
+CREATE TABLE data_load_test (col1 int, col2 text, col3 text);
+INSERT INTO data_load_test VALUES (132, 'hello', 'world');
+INSERT INTO data_load_test VALUES (243, 'world', 'hello');
+ALTER TABLE data_load_test DROP COLUMN col2;
+SELECT create_distributed_table('data_load_test', 'col1');
+SELECT * FROM data_load_test;
+DROP TABLE data_load_test;
+
 SET citus.shard_replication_factor TO default;

 SET citus.shard_count to 4;
@@ -4,14 +4,9 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1250000;
 CREATE TABLE reference_table_test (value_1 int, value_2 float, value_3 text, value_4 timestamp);

 -- insert some data, and make sure that it cannot be distributed with create_distributed_table
-INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-05');
-
--- should error out given that there exists data
-SELECT create_reference_table('reference_table_test');
-
-TRUNCATE reference_table_test;
-
--- now should be able to create the reference table
+INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
+
+-- create the reference table
 SELECT create_reference_table('reference_table_test');

 -- see that partkey is NULL
@@ -36,8 +31,10 @@ FROM
 WHERE
     shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'reference_table_test'::regclass);

+-- check whether data was copied into distributed table
+SELECT * FROM reference_table_test;
+
 -- now, execute some modification queries
-INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
 INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
 INSERT INTO reference_table_test VALUES (3, 3.0, '3', '2016-12-03');
 INSERT INTO reference_table_test VALUES (4, 4.0, '4', '2016-12-04');