diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index 43dd3a7ee..4198837b2 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -9,6 +9,7 @@
  */
 #include "postgres.h"
+#include "miscadmin.h"

 #include "access/genam.h"
 #include "access/hash.h"
@@ -39,11 +40,14 @@
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/metadata_sync.h"
+#include "distributed/multi_copy.h"
 #include "distributed/multi_logical_planner.h"
 #include "distributed/pg_dist_colocation.h"
 #include "distributed/pg_dist_partition.h"
 #include "distributed/reference_table_utils.h"
+#include "distributed/worker_protocol.h"
 #include "distributed/worker_transaction.h"
+#include "executor/executor.h"
 #include "executor/spi.h"
 #include "nodes/execnodes.h"
 #include "nodes/nodeFuncs.h"
@@ -52,10 +56,14 @@
 #include "parser/parse_node.h"
 #include "parser/parse_relation.h"
 #include "parser/parser.h"
+#include "tcop/pquery.h"
+#include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
+#include "utils/memutils.h"
 #include "utils/rel.h"
+#include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/inval.h"

@@ -65,10 +73,10 @@ int ReplicationModel = REPLICATION_MODEL_COORDINATOR;

 /* local function forward declarations */
-static void CreateReferenceTable(Oid relationId);
+static void CreateReferenceTable(Oid distributedRelationId);
 static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
                                       char distributionMethod, char replicationModel,
-                                      uint32 colocationId);
+                                      uint32 colocationId, bool requireEmpty);
 static char LookupDistributionMethod(Oid distributionMethodOid);
 static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
                                     int16 supportFunctionNumber);
@@ -83,6 +91,8 @@ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
                                        char *colocateWithTableName, int shardCount,
                                        int replicationFactor);
 static Oid ColumnType(Oid relationId, char *columnName);
+static void CopyLocalDataIntoShards(Oid relationId);
+static List * TupleDescColumnNameList(TupleDesc tupleDescriptor);

 /* exports for SQL callable functions */
 PG_FUNCTION_INFO_V1(master_create_distributed_table);
@@ -106,6 +116,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
 	char *distributionColumnName = text_to_cstring(distributionColumnText);
 	char distributionMethod = LookupDistributionMethod(distributionMethodOid);
+	bool requireEmpty = true;

 	EnsureCoordinator();

@@ -121,7 +132,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)

 	ConvertToDistributedTable(distributedRelationId, distributionColumnName,
 	                          distributionMethod, REPLICATION_MODEL_COORDINATOR,
-	                          INVALID_COLOCATION_ID);
+	                          INVALID_COLOCATION_ID, requireEmpty);

 	PG_RETURN_VOID();
 }
@@ -177,6 +188,8 @@ create_distributed_table(PG_FUNCTION_ARGS)
 	/* if distribution method is not hash, just create partition metadata */
 	if (distributionMethod != DISTRIBUTE_BY_HASH)
 	{
+		bool requireEmpty = true;
+
 		if (ReplicationModel != REPLICATION_MODEL_COORDINATOR)
 		{
 			ereport(NOTICE, (errmsg("using statement-based replication"),
@@ -186,7 +199,7 @@ create_distributed_table(PG_FUNCTION_ARGS)

 		ConvertToDistributedTable(relationId, distributionColumnName,
 		                          distributionMethod, REPLICATION_MODEL_COORDINATOR,
-		                          INVALID_COLOCATION_ID);
+		                          INVALID_COLOCATION_ID, requireEmpty);

 		PG_RETURN_VOID();
 	}
@@ -232,6 +245,8 @@ CreateReferenceTable(Oid relationId)
 	List *workerNodeList = WorkerNodeList();
 	int replicationFactor = list_length(workerNodeList);
 	char *distributionColumnName = NULL;
+	bool requireEmpty = true;
+	char relationKind = 0;

 	EnsureCoordinator();

@@ -245,23 +260,38 @@ CreateReferenceTable(Oid relationId)
 		                errdetail("There are no active worker nodes.")));
 	}

+	/* relax empty table requirement for regular (non-foreign) tables */
+	relationKind = get_rel_relkind(relationId);
+	if (relationKind == RELKIND_RELATION)
+	{
+		requireEmpty = false;
+	}
+
 	colocationId = CreateReferenceTableColocationId();

 	/* first, convert the relation into distributed relation */
 	ConvertToDistributedTable(relationId, distributionColumnName,
-	                          DISTRIBUTE_BY_NONE, REPLICATION_MODEL_2PC, colocationId);
+	                          DISTRIBUTE_BY_NONE, REPLICATION_MODEL_2PC, colocationId,
+	                          requireEmpty);

 	/* now, create the single shard replicated to all nodes */
 	CreateReferenceTableShard(relationId);

 	CreateTableMetadataOnWorkers(relationId);
+
+	/* copy over data for regular relations */
+	if (relationKind == RELKIND_RELATION)
+	{
+		CopyLocalDataIntoShards(relationId);
+	}
 }


 /*
  * ConvertToDistributedTable converts the given regular PostgreSQL table into a
  * distributed table. First, it checks if the given table can be distributed,
- * then it creates related tuple in pg_dist_partition.
+ * then it creates the related tuple in pg_dist_partition. If requireEmpty is
+ * true, this function errors out when presented with a relation containing rows.
  *
  * XXX: We should perform more checks here to see if this table is fit for
  * partitioning. At a minimum, we should validate the following: (i) this node
@@ -272,7 +302,7 @@ CreateReferenceTable(Oid relationId)
 static void
 ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
                           char distributionMethod, char replicationModel,
-                          uint32 colocationId)
+                          uint32 colocationId, bool requireEmpty)
 {
 	Relation relation = NULL;
 	TupleDesc relationDesc = NULL;
@@ -322,8 +352,8 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
 		                       "foreign tables.")));
 	}

-	/* check that the relation does not contain any rows */
-	if (!LocalTableEmpty(relationId))
+	/* check that table is empty if that is required */
+	if (requireEmpty && !LocalTableEmpty(relationId))
 	{
 		ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
 		                errmsg("cannot distribute relation \"%s\"",
@@ -915,6 +945,8 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
 	uint32 colocationId = INVALID_COLOCATION_ID;
 	Oid sourceRelationId = InvalidOid;
 	Oid distributionColumnType = InvalidOid;
+	bool requireEmpty = true;
+	char relationKind = 0;

 	/* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */
 	distributedRelation = relation_open(relationId, AccessShareLock);
@@ -957,9 +989,16 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
 		colocationId = TableColocationId(sourceRelationId);
 	}

+	/* relax empty table requirement for regular (non-foreign) tables */
+	relationKind = get_rel_relkind(relationId);
+	if (relationKind == RELKIND_RELATION)
+	{
+		requireEmpty = false;
+	}
+
 	/* create distributed table metadata */
 	ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH,
-	                          ReplicationModel, colocationId);
+	                          ReplicationModel, colocationId, requireEmpty);

 	/* create shards */
 	if (sourceRelationId != InvalidOid)
 	{
@@ -976,6 +1015,12 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
 		CreateShardsWithRoundRobinPolicy(relationId, shardCount,
 		                                 replicationFactor);
 	}

+	/* copy over data for regular relations */
+	if (relationKind == RELKIND_RELATION)
+	{
+		CopyLocalDataIntoShards(relationId);
+	}
+
 	heap_close(pgDistColocation, NoLock);
 	relation_close(distributedRelation, NoLock);
 }
@@ -1021,3 +1066,143 @@ EnsureReplicationSettings(Oid relationId, char replicationModel)
 		                     "factor\" to one%s.", extraHint)));
 	}
 }
+
+
+/*
+ * CopyLocalDataIntoShards copies data from the local table, which is hidden
+ * after converting it to a distributed table, into the shards of the
+ * distributed table.
+ *
+ * This function uses CitusCopyDestReceiver to invoke the distributed COPY
+ * logic. We cannot use a regular COPY here since that cannot read from a
+ * table. Instead we read from the table and pass each tuple to the
+ * CitusCopyDestReceiver which opens a connection and starts a COPY for each
+ * shard placement that will have data.
+ *
+ * We could call the planner and executor here and send the output to the
+ * DestReceiver, but we are in a tricky spot here since Citus is already
+ * intercepting queries on this table in the planner and executor hooks and we
+ * want to read from the local table. To keep it simple, we perform a heap scan
+ * directly on the table.
+ *
+ * Any writes on the table that are started during this operation will be
+ * handled as distributed queries once the current transaction commits.
+ * SELECTs will continue to read from the local table until the current
+ * transaction commits, after which new SELECTs will be handled as distributed
+ * queries.
+ *
+ * After copying local data into the distributed table, the local data remains
+ * in place and should be truncated at a later time.
+ */
+static void
+CopyLocalDataIntoShards(Oid distributedRelationId)
+{
+	DestReceiver *copyDest = NULL;
+	List *columnNameList = NIL;
+	Relation distributedRelation = NULL;
+	TupleDesc tupleDescriptor = NULL;
+	bool stopOnFailure = true;
+
+	EState *estate = NULL;
+	HeapScanDesc scan = NULL;
+	HeapTuple tuple = NULL;
+	ExprContext *econtext = NULL;
+	MemoryContext oldContext = NULL;
+	TupleTableSlot *slot = NULL;
+	uint64 rowsCopied = 0;
+
+	/* take an ExclusiveLock to block all operations except SELECT */
+	distributedRelation = heap_open(distributedRelationId, ExclusiveLock);
+
+	/* get the table columns */
+	tupleDescriptor = RelationGetDescr(distributedRelation);
+	slot = MakeSingleTupleTableSlot(tupleDescriptor);
+	columnNameList = TupleDescColumnNameList(tupleDescriptor);
+
+	/* initialize per-tuple memory context */
+	estate = CreateExecutorState();
+	econtext = GetPerTupleExprContext(estate);
+	econtext->ecxt_scantuple = slot;
+
+	copyDest =
+		(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
+		                                             columnNameList, estate,
+		                                             stopOnFailure);
+
+	/* initialize state for writing to shards; we'll open connections on demand */
+	copyDest->rStartup(copyDest, 0, tupleDescriptor);
+
+	/* begin reading from local table */
+	scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL);
+
+	oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+	{
+		/* materialize tuple and send it to a shard */
+		ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+		copyDest->receiveSlot(slot, copyDest);
+
+		/* clear tuple memory */
+		ResetPerTupleExprContext(estate);
+
+		/* make sure we roll back on cancellation */
+		CHECK_FOR_INTERRUPTS();
+
+		if (rowsCopied == 0)
+		{
+			ereport(NOTICE, (errmsg("Copying data from local table...")));
table..."))); + } + + rowsCopied++; + + if (rowsCopied % 1000000 == 0) + { + ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied))); + } + } + + if (rowsCopied % 1000000 != 0) + { + ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied))); + } + + MemoryContextSwitchTo(oldContext); + + /* finish reading from the local table */ + heap_endscan(scan); + + /* finish writing into the shards */ + copyDest->rShutdown(copyDest); + + /* free memory and close the relation */ + ExecDropSingleTupleTableSlot(slot); + FreeExecutorState(estate); + heap_close(distributedRelation, NoLock); +} + + +/* + * TupleDescColumnNameList returns a list of column names for the given tuple + * descriptor as plain strings. + */ +static List * +TupleDescColumnNameList(TupleDesc tupleDescriptor) +{ + List *columnNameList = NIL; + int columnIndex = 0; + + for (columnIndex = 0; columnIndex < tupleDescriptor->natts; columnIndex++) + { + Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex]; + char *columnName = NameStr(currentColumn->attname); + + if (currentColumn->attisdropped) + { + continue; + } + + columnNameList = lappend(columnNameList, columnName); + } + + return columnNameList; +} diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index bfc73c676..68fc0b848 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -68,6 +68,7 @@ #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "executor/executor.h" +#include "nodes/makefuncs.h" #include "tsearch/ts_locale.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -127,6 +128,19 @@ static void CopySendInt16(CopyOutState outputState, int16 val); static void CopyAttributeOutText(CopyOutState outputState, char *string); static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *pointer); +/* CitusCopyDestReceiver functions */ +static void CitusCopyDestReceiverStartup(DestReceiver *copyDest, int operation, + TupleDesc inputTupleDesc); +#if PG_VERSION_NUM >= 90600 +static bool CitusCopyDestReceiverReceive(TupleTableSlot *slot, + DestReceiver *copyDest); +#else +static void CitusCopyDestReceiverReceive(TupleTableSlot *slot, + DestReceiver *copyDest); +#endif +static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver); +static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver); + /* * CitusCopyFrom implements the COPY table_name FROM. 
diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c
index bfc73c676..68fc0b848 100644
--- a/src/backend/distributed/commands/multi_copy.c
+++ b/src/backend/distributed/commands/multi_copy.c
@@ -68,6 +68,7 @@
 #include "distributed/remote_commands.h"
 #include "distributed/resource_lock.h"
 #include "executor/executor.h"
+#include "nodes/makefuncs.h"
 #include "tsearch/ts_locale.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -127,6 +128,19 @@ static void CopySendInt16(CopyOutState outputState, int16 val);
 static void CopyAttributeOutText(CopyOutState outputState, char *string);
 static inline void CopyFlushOutput(CopyOutState outputState, char *start,
                                    char *pointer);
+
+/* CitusCopyDestReceiver functions */
+static void CitusCopyDestReceiverStartup(DestReceiver *copyDest, int operation,
+                                         TupleDesc inputTupleDesc);
+#if PG_VERSION_NUM >= 90600
+static bool CitusCopyDestReceiverReceive(TupleTableSlot *slot,
+                                         DestReceiver *copyDest);
+#else
+static void CitusCopyDestReceiverReceive(TupleTableSlot *slot,
+                                         DestReceiver *copyDest);
+#endif
+static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver);
+static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver);


 /*
  * CitusCopyFrom implements the COPY table_name FROM. It dispatches the copy
@@ -273,49 +287,31 @@ static void
 CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
 {
 	Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
-	char *relationName = get_rel_name(tableId);
+
+	CitusCopyDestReceiver *copyDest = NULL;
+	DestReceiver *dest = NULL;
+
 	Relation distributedRelation = NULL;
 	TupleDesc tupleDescriptor = NULL;
 	uint32 columnCount = 0;
 	Datum *columnValues = NULL;
 	bool *columnNulls = NULL;
-	FmgrInfo *hashFunction = NULL;
-	FmgrInfo *compareFunction = NULL;
-	bool hasUniformHashDistribution = false;
-	DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId);
-	const char *delimiterCharacter = "\t";
-	const char *nullPrintCharacter = "\\N";
-
-	int shardCount = 0;
-	List *shardIntervalList = NULL;
-	ShardInterval **shardIntervalCache = NULL;
-	bool useBinarySearch = false;
-
-	HTAB *shardConnectionHash = NULL;
-	ShardConnections *shardConnections = NULL;
-	List *shardConnectionsList = NIL;
-	ListCell *shardConnectionsCell = NULL;
+	int columnIndex = 0;
+	List *columnNameList = NIL;
+	TupleTableSlot *tupleTableSlot = NULL;

 	EState *executorState = NULL;
 	MemoryContext executorTupleContext = NULL;
 	ExprContext *executorExpressionContext = NULL;

+	char partitionMethod = 0;
+	bool stopOnFailure = false;
+
 	CopyState copyState = NULL;
-	CopyOutState copyOutState = NULL;
-	FmgrInfo *columnOutputFunctions = NULL;
 	uint64 processedRowCount = 0;

-	Var *partitionColumn = PartitionColumn(tableId, 0);
-	char partitionMethod = PartitionMethod(tableId);
-
 	ErrorContextCallback errorCallback;

-	/* get hash function for partition column */
-	hashFunction = cacheEntry->hashFunction;
-
-	/* get compare function for shard intervals */
-	compareFunction = cacheEntry->shardIntervalCompareFunction;
-
 	/* allocate column values and nulls arrays */
 	distributedRelation = heap_open(tableId, RowExclusiveLock);
 	tupleDescriptor = RelationGetDescr(distributedRelation);
@@ -323,64 +319,40 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
 	columnValues = palloc0(columnCount * sizeof(Datum));
 	columnNulls = palloc0(columnCount * sizeof(bool));

-	/* we don't support copy to reference tables from workers */
+	/* set up a virtual tuple table slot */
+	tupleTableSlot = MakeSingleTupleTableSlot(tupleDescriptor);
+	tupleTableSlot->tts_nvalid = columnCount;
+	tupleTableSlot->tts_values = columnValues;
+	tupleTableSlot->tts_isnull = columnNulls;
+
+	for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
+	{
+		Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex];
+		char *columnName = NameStr(currentColumn->attname);
+
+		if (currentColumn->attisdropped)
+		{
+			continue;
+		}
+
+		columnNameList = lappend(columnNameList, columnName);
+	}
+
+	executorState = CreateExecutorState();
+	executorTupleContext = GetPerTupleMemoryContext(executorState);
+	executorExpressionContext = GetPerTupleExprContext(executorState);
+
+	partitionMethod = PartitionMethod(tableId);
 	if (partitionMethod == DISTRIBUTE_BY_NONE)
 	{
-		EnsureCoordinator();
+		stopOnFailure = true;
 	}

-	/* load the list of shards and verify that we have shards to copy into */
-	shardIntervalList = LoadShardIntervalList(tableId);
-	if (shardIntervalList == NIL)
-	{
-		if (partitionMethod == DISTRIBUTE_BY_HASH)
-		{
-			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-			                errmsg("could not find any shards into which to copy"),
-			                errdetail("No shards exist for distributed table \"%s\".",
-			                          relationName),
-			                errhint("Run master_create_worker_shards to create shards "
create shards " - "and try again."))); - } - else - { - ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not find any shards into which to copy"), - errdetail("No shards exist for distributed table \"%s\".", - relationName))); - } - } - - /* error if any shard missing min/max values for non reference tables */ - if (partitionMethod != DISTRIBUTE_BY_NONE && - cacheEntry->hasUninitializedShardInterval) - { - ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not start copy"), - errdetail("Distributed relation \"%s\" has shards " - "with missing shardminvalue/shardmaxvalue.", - relationName))); - } - - /* prevent concurrent placement changes and non-commutative DML statements */ - LockShardListMetadata(shardIntervalList, ShareLock); - LockShardListResources(shardIntervalList, ShareLock); - - /* initialize the shard interval cache */ - shardCount = cacheEntry->shardIntervalArrayLength; - shardIntervalCache = cacheEntry->sortedShardIntervalArray; - hasUniformHashDistribution = cacheEntry->hasUniformHashDistribution; - - /* determine whether to use binary search */ - if (partitionMethod != DISTRIBUTE_BY_HASH || !hasUniformHashDistribution) - { - useBinarySearch = true; - } - - if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC) - { - CoordinatedTransactionUse2PC(); - } + /* set up the destination for the COPY */ + copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, executorState, + stopOnFailure); + dest = (DestReceiver *) copyDest; + dest->rStartup(dest, 0, tupleDescriptor); /* initialize copy state to read from COPY data source */ copyState = BeginCopyFrom(distributedRelation, @@ -389,23 +361,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) copyStatement->attlist, copyStatement->options); - executorState = CreateExecutorState(); - executorTupleContext = GetPerTupleMemoryContext(executorState); - executorExpressionContext = GetPerTupleExprContext(executorState); - - copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); - copyOutState->delim = (char *) delimiterCharacter; - copyOutState->null_print = (char *) nullPrintCharacter; - copyOutState->null_print_client = (char *) nullPrintCharacter; - copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState); - copyOutState->fe_msgbuf = makeStringInfo(); - copyOutState->rowcontext = executorTupleContext; - - columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary); - - /* create a mapping of shard id to a connection for each of its placements */ - shardConnectionHash = CreateShardConnectionHash(TopTransactionContext); - /* set up callback to identify error line number */ errorCallback.callback = CopyFromErrorCallback; errorCallback.arg = (void *) copyState; @@ -415,10 +370,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) while (true) { bool nextRowFound = false; - Datum partitionColumnValue = 0; - ShardInterval *shardInterval = NULL; - int64 shardId = 0; - bool shardConnectionsFound = false; MemoryContext oldContext = NULL; ResetPerTupleExprContext(executorState); @@ -437,103 +388,23 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) CHECK_FOR_INTERRUPTS(); - /* - * Find the partition column value and corresponding shard interval - * for non-reference tables. - * Get the existing (and only a single) shard interval for the reference - * tables. Note that, reference tables has NULL partition column values so - * skip the check. 
-		 */
-		if (partitionColumn != NULL)
-		{
-			if (columnNulls[partitionColumn->varattno - 1])
-			{
-				ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
-				                errmsg("cannot copy row with NULL value "
-				                       "in partition column")));
-			}
-
-			partitionColumnValue = columnValues[partitionColumn->varattno - 1];
-		}
-
-		/*
-		 * Find the shard interval and id for the partition column value for
-		 * non-reference tables.
-		 * For reference table, this function blindly returns the tables single
-		 * shard.
-		 */
-		shardInterval = FindShardInterval(partitionColumnValue,
-		                                  shardIntervalCache,
-		                                  shardCount, partitionMethod,
-		                                  compareFunction, hashFunction,
-		                                  useBinarySearch);
-
-		if (shardInterval == NULL)
-		{
-			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-			                errmsg("could not find shard for partition column "
-			                       "value")));
-		}
-
-		shardId = shardInterval->shardId;
-
 		MemoryContextSwitchTo(oldContext);

-		/* get existing connections to the shard placements, if any */
-		shardConnections = GetShardHashConnections(shardConnectionHash, shardId,
-		                                           &shardConnectionsFound);
-		if (!shardConnectionsFound)
-		{
-			bool stopOnFailure = false;
-
-			if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE)
-			{
-				stopOnFailure = true;
-			}
-
-			/* open connections and initiate COPY on shard placements */
-			OpenCopyConnections(copyStatement, shardConnections, stopOnFailure,
-			                    copyOutState->binary);
-
-			/* send copy binary headers to shard placements */
-			if (copyOutState->binary)
-			{
-				SendCopyBinaryHeaders(copyOutState, shardId,
-				                      shardConnections->connectionList);
-			}
-		}
-
-		/* replicate row to shard placements */
-		resetStringInfo(copyOutState->fe_msgbuf);
-		AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
-		                  copyOutState, columnOutputFunctions);
-		SendCopyDataToAll(copyOutState->fe_msgbuf, shardId,
-		                  shardConnections->connectionList);
+		dest->receiveSlot(tupleTableSlot, dest);

 		processedRowCount += 1;
 	}

+	EndCopyFrom(copyState);
+
 	/* all lines have been copied, stop showing line number in errors */
 	error_context_stack = errorCallback.previous;

-	shardConnectionsList = ShardConnectionList(shardConnectionHash);
-	foreach(shardConnectionsCell, shardConnectionsList)
-	{
-		ShardConnections *shardConnections = (ShardConnections *) lfirst(
-			shardConnectionsCell);
+	/* finish the COPY commands */
+	dest->rShutdown(dest);

-		/* send copy binary footers to all shard placements */
-		if (copyOutState->binary)
-		{
-			SendCopyBinaryFooters(copyOutState, shardConnections->shardId,
-			                      shardConnections->connectionList);
-		}
-
-		/* close the COPY input on all shard placements */
-		EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, true);
-	}
-
-	EndCopyFrom(copyState);
+	ExecDropSingleTupleTableSlot(tupleTableSlot);
+	FreeExecutorState(executorState);

 	heap_close(distributedRelation, NoLock);

 	/* mark failed placements as inactive */
@@ -604,6 +475,13 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
 	errorCallback.arg = (void *) copyState;
 	errorCallback.previous = error_context_stack;

+	/*
+	 * From here on we use copyStatement as the template for the command
+	 * that we send to workers. This command does not have an attribute
+	 * list since NextCopyFrom will generate a value for all columns.
+	 */
+	copyStatement->attlist = NIL;
+
 	while (true)
 	{
 		bool nextRowFound = false;
@@ -1074,22 +952,46 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCopyFormat)
 	char *shardName = pstrdup(relationName);
 	char *shardQualifiedName = NULL;

-	const char *copyFormat = NULL;
-
 	AppendShardIdToName(&shardName, shardId);

 	shardQualifiedName = quote_qualified_identifier(schemaName, shardName);

+	appendStringInfo(command, "COPY %s ", shardQualifiedName);
+
+	if (copyStatement->attlist != NIL)
+	{
+		ListCell *columnNameCell = NULL;
+		bool appendedFirstName = false;
+
+		foreach(columnNameCell, copyStatement->attlist)
+		{
+			char *columnName = (char *) lfirst(columnNameCell);
+
+			if (!appendedFirstName)
+			{
+				appendStringInfo(command, "(%s", columnName);
+				appendedFirstName = true;
+			}
+			else
+			{
+				appendStringInfo(command, ", %s", columnName);
+			}
+		}
+
+		appendStringInfoString(command, ") ");
+	}
+
+	appendStringInfo(command, "FROM STDIN WITH ");
+
 	if (useBinaryCopyFormat)
 	{
-		copyFormat = "BINARY";
+		appendStringInfoString(command, "(FORMAT BINARY)");
 	}
 	else
 	{
-		copyFormat = "TEXT";
+		appendStringInfoString(command, "(FORMAT TEXT)");
 	}

-	appendStringInfo(command, "COPY %s FROM STDIN WITH (FORMAT %s)", shardQualifiedName,
-	                 copyFormat);
-
 	return command;
 }
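The explicit attribute list matters because a shard can have a different physical column layout than the coordinator's table once columns have been dropped, so the worker-side COPY has to name the surviving columns. As a hedged illustration, the patched ConstructCopyStatement logic condenses to the sketch below; BuildShardCopyCommand is a hypothetical standalone helper, and the shard name in the final comment is made up.

```c
#include "postgres.h"

#include "lib/stringinfo.h"
#include "nodes/pg_list.h"

static StringInfo
BuildShardCopyCommand(const char *shardQualifiedName, List *columnNameList,
                      bool useBinaryCopyFormat)
{
	StringInfo command = makeStringInfo();
	ListCell *columnNameCell = NULL;
	bool appendedFirstName = false;

	appendStringInfo(command, "COPY %s ", shardQualifiedName);

	/* name the surviving columns explicitly, if an attribute list was given */
	foreach(columnNameCell, columnNameList)
	{
		char *columnName = (char *) lfirst(columnNameCell);

		if (!appendedFirstName)
		{
			appendStringInfo(command, "(%s", columnName);
			appendedFirstName = true;
		}
		else
		{
			appendStringInfo(command, ", %s", columnName);
		}
	}

	if (appendedFirstName)
	{
		appendStringInfoString(command, ") ");
	}

	appendStringInfo(command, "FROM STDIN WITH (FORMAT %s)",
	                 useBinaryCopyFormat ? "BINARY" : "TEXT");

	/*
	 * For a table with a dropped middle column this yields, for example:
	 * COPY public.data_load_test_102010 (col1, col3) FROM STDIN WITH (FORMAT TEXT)
	 */
	return command;
}
```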
@@ -1277,7 +1179,6 @@ AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor,
 	{
 		CopySendInt16(rowOutputState, availableColumnCount);
 	}
-
 	for (columnIndex = 0; columnIndex < totalColumnCount; columnIndex++)
 	{
 		Form_pg_attribute currentColumn = rowDescriptor->attrs[columnIndex];
@@ -1694,3 +1595,379 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer)
 		CopySendData(cstate, start, pointer - start);
 	}
 }
+
+
+/*
+ * CreateCitusCopyDestReceiver creates a DestReceiver that copies into
+ * a distributed table.
+ */
+CitusCopyDestReceiver *
+CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorState,
+                            bool stopOnFailure)
+{
+	CitusCopyDestReceiver *copyDest = NULL;
+
+	copyDest = (CitusCopyDestReceiver *) palloc0(sizeof(CitusCopyDestReceiver));
+
+	/* set up the DestReceiver function pointers */
+	copyDest->pub.receiveSlot = CitusCopyDestReceiverReceive;
+	copyDest->pub.rStartup = CitusCopyDestReceiverStartup;
+	copyDest->pub.rShutdown = CitusCopyDestReceiverShutdown;
+	copyDest->pub.rDestroy = CitusCopyDestReceiverDestroy;
+	copyDest->pub.mydest = DestCopyOut;
+
+	/* set up output parameters */
+	copyDest->distributedRelationId = tableId;
+	copyDest->columnNameList = columnNameList;
+	copyDest->executorState = executorState;
+	copyDest->stopOnFailure = stopOnFailure;
+	copyDest->memoryContext = CurrentMemoryContext;
+
+	return copyDest;
+}
+
+
+/*
+ * CitusCopyDestReceiverStartup implements the rStartup interface of
+ * CitusCopyDestReceiver. It opens the relation, validates and locks the
+ * shards, and prepares the state needed to start COPY commands on the
+ * shard placements.
+ */
+static void
+CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
+                             TupleDesc inputTupleDescriptor)
+{
+	CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest;
+
+	Oid tableId = copyDest->distributedRelationId;
+
+	char *relationName = get_rel_name(tableId);
+	Oid schemaOid = get_rel_namespace(tableId);
+	char *schemaName = get_namespace_name(schemaOid);
+
+	Relation distributedRelation = NULL;
+	int columnIndex = 0;
+	List *columnNameList = copyDest->columnNameList;
+
+	ListCell *columnNameCell = NULL;
+
+	char partitionMethod = '\0';
+	Var *partitionColumn = PartitionColumn(tableId, 0);
+	int partitionColumnIndex = -1;
+	DistTableCacheEntry *cacheEntry = NULL;
+
+	CopyStmt *copyStatement = NULL;
+
+	List *shardIntervalList = NULL;
+
+	CopyOutState copyOutState = NULL;
+	const char *delimiterCharacter = "\t";
+	const char *nullPrintCharacter = "\\N";
+
+	/* look up table properties */
+	distributedRelation = heap_open(tableId, RowExclusiveLock);
+	cacheEntry = DistributedTableCacheEntry(tableId);
+	partitionMethod = cacheEntry->partitionMethod;
+
+	copyDest->distributedRelation = distributedRelation;
+	copyDest->tupleDescriptor = inputTupleDescriptor;
+
+	/* we don't support copy to reference tables from workers */
+	if (partitionMethod == DISTRIBUTE_BY_NONE)
+	{
+		EnsureCoordinator();
+	}
+
+	/* load the list of shards and verify that we have shards to copy into */
+	shardIntervalList = LoadShardIntervalList(tableId);
+	if (shardIntervalList == NIL)
+	{
+		if (partitionMethod == DISTRIBUTE_BY_HASH)
+		{
+			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			                errmsg("could not find any shards into which to copy"),
+			                errdetail("No shards exist for distributed table \"%s\".",
+			                          relationName),
+			                errhint("Run master_create_worker_shards to create shards "
+			                        "and try again.")));
+		}
+		else
+		{
+			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+			                errmsg("could not find any shards into which to copy"),
+			                errdetail("No shards exist for distributed table \"%s\".",
+			                          relationName)));
+		}
+	}
+
+	/* error if any shard missing min/max values */
+	if (partitionMethod != DISTRIBUTE_BY_NONE &&
+		cacheEntry->hasUninitializedShardInterval)
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+		                errmsg("could not start copy"),
+		                errdetail("Distributed relation \"%s\" has shards "
+		                          "with missing shardminvalue/shardmaxvalue.",
+		                          relationName)));
+	}
+
+	/* prevent concurrent placement changes and non-commutative DML statements */
+	LockShardListMetadata(shardIntervalList, ShareLock);
+	LockShardListResources(shardIntervalList, ShareLock);
+
+	/* keep the table metadata to avoid looking it up for every tuple */
+	copyDest->tableMetadata = cacheEntry;
+
+	/* determine whether to use binary search */
+	if (partitionMethod != DISTRIBUTE_BY_HASH ||
+		!cacheEntry->hasUniformHashDistribution)
+	{
+		copyDest->useBinarySearch = true;
+	}
+
+	if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC)
+	{
+		CoordinatedTransactionUse2PC();
+	}
+
+	/* define how tuples will be serialized */
+	copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
+	copyOutState->delim = (char *) delimiterCharacter;
+	copyOutState->null_print = (char *) nullPrintCharacter;
+	copyOutState->null_print_client = (char *) nullPrintCharacter;
+	copyOutState->binary = CanUseBinaryCopyFormat(inputTupleDescriptor, copyOutState);
+	copyOutState->fe_msgbuf = makeStringInfo();
+	copyOutState->rowcontext = GetPerTupleMemoryContext(copyDest->executorState);
+	copyDest->copyOutState = copyOutState;
+
+	/* prepare output functions */
+	copyDest->columnOutputFunctions =
+		ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary);
+
+	/* find the partition column index in the column list */
+	foreach(columnNameCell, columnNameList)
+	{
+		char *columnName = (char *) lfirst(columnNameCell);
+
+		/* load the column information from pg_attribute */
+		AttrNumber attrNumber = get_attnum(tableId, columnName);
+
+		/* check whether this is the partition column */
+		if (partitionColumn != NULL && attrNumber == partitionColumn->varattno)
+		{
+			Assert(partitionColumnIndex == -1);
+
+			partitionColumnIndex = columnIndex;
+		}
+
+		columnIndex++;
+	}
+
+	if (partitionMethod != DISTRIBUTE_BY_NONE && partitionColumnIndex == -1)
+	{
+		ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+		                errmsg("the partition column of table %s should have a value",
+		                       quote_qualified_identifier(schemaName, relationName))));
+	}
+
+	copyDest->partitionColumnIndex = partitionColumnIndex;
+
+	/* define the template for the COPY statement that is sent to workers */
+	copyStatement = makeNode(CopyStmt);
+	copyStatement->relation = makeRangeVar(schemaName, relationName, -1);
+	copyStatement->query = NULL;
+	copyStatement->attlist = columnNameList;
+	copyStatement->is_from = true;
+	copyStatement->is_program = false;
+	copyStatement->filename = NULL;
+	copyStatement->options = NIL;
+	copyDest->copyStatement = copyStatement;
+
+	copyDest->shardConnectionHash = CreateShardConnectionHash(TopTransactionContext);
+}
+
+
+/*
+ * CitusCopyDestReceiverReceive implements the receiveSlot function of
+ * CitusCopyDestReceiver. It takes a TupleTableSlot and sends the contents to
+ * the appropriate shard placement(s).
+ */
+#if PG_VERSION_NUM >= 90600
+static bool
+#else
+static void
+#endif
+CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
+{
+	CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest;
+
+	DistTableCacheEntry *tableMetadata = copyDest->tableMetadata;
+	char partitionMethod = tableMetadata->partitionMethod;
+	int partitionColumnIndex = copyDest->partitionColumnIndex;
+	TupleDesc tupleDescriptor = copyDest->tupleDescriptor;
+	CopyStmt *copyStatement = copyDest->copyStatement;
+
+	int shardCount = tableMetadata->shardIntervalArrayLength;
+	ShardInterval **shardIntervalCache = tableMetadata->sortedShardIntervalArray;
+
+	bool useBinarySearch = copyDest->useBinarySearch;
+	FmgrInfo *hashFunction = tableMetadata->hashFunction;
+	FmgrInfo *compareFunction = tableMetadata->shardIntervalCompareFunction;
+
+	HTAB *shardConnectionHash = copyDest->shardConnectionHash;
+	CopyOutState copyOutState = copyDest->copyOutState;
+	FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions;
+
+	bool stopOnFailure = copyDest->stopOnFailure;
+
+	Datum *columnValues = NULL;
+	bool *columnNulls = NULL;
+
+	Datum partitionColumnValue = 0;
+	ShardInterval *shardInterval = NULL;
+	int64 shardId = 0;
+
+	bool shardConnectionsFound = false;
+	ShardConnections *shardConnections = NULL;
+
+	EState *executorState = copyDest->executorState;
+	MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState);
+	MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext);
+
+	slot_getallattrs(slot);
+
+	columnValues = slot->tts_values;
+	columnNulls = slot->tts_isnull;
+
+	/*
+	 * Find the partition column value and corresponding shard interval
+	 * for non-reference tables.
+	 * Get the existing (and only a single) shard interval for the reference
+	 * tables. Note that reference tables have NULL partition column values,
+	 * so we skip the check.
+	 */
+	if (partitionColumnIndex >= 0)
+	{
+		if (columnNulls[partitionColumnIndex])
+		{
+			Oid relationId = copyDest->distributedRelationId;
+			char *relationName = get_rel_name(relationId);
+			Oid schemaOid = get_rel_namespace(relationId);
+			char *schemaName = get_namespace_name(schemaOid);
+			char *qualifiedTableName = quote_qualified_identifier(schemaName,
+			                                                      relationName);
+
+			ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
+			                errmsg("the partition column of table %s should have a value",
+			                       qualifiedTableName)));
+		}
+
+		/* find the partition column value */
+		partitionColumnValue = columnValues[partitionColumnIndex];
+	}
+
+	/*
+	 * Find the shard interval and id for the partition column value for
+	 * non-reference tables.
+	 *
+	 * For reference tables, this function blindly returns the table's single
+	 * shard.
+	 */
+	shardInterval = FindShardInterval(partitionColumnValue, shardIntervalCache,
+	                                  shardCount, partitionMethod,
+	                                  compareFunction, hashFunction,
+	                                  useBinarySearch);
+	if (shardInterval == NULL)
+	{
+		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+		                errmsg("could not find shard for partition column "
+		                       "value")));
+	}
+
+	shardId = shardInterval->shardId;
+
+	/* the connections hash is kept in the receiver's memory context */
+	MemoryContextSwitchTo(copyDest->memoryContext);
+
+	/* get existing connections to the shard placements, if any */
+	shardConnections = GetShardHashConnections(shardConnectionHash, shardId,
+	                                           &shardConnectionsFound);
+	if (!shardConnectionsFound)
+	{
+		/* open connections and initiate COPY on shard placements */
+		OpenCopyConnections(copyStatement, shardConnections, stopOnFailure,
+		                    copyOutState->binary);
+
+		/* send copy binary headers to shard placements */
+		if (copyOutState->binary)
+		{
+			SendCopyBinaryHeaders(copyOutState, shardId,
+			                      shardConnections->connectionList);
+		}
+	}
+
+	/* replicate row to shard placements */
+	resetStringInfo(copyOutState->fe_msgbuf);
+	AppendCopyRowData(columnValues, columnNulls, tupleDescriptor,
+	                  copyOutState, columnOutputFunctions);
+	SendCopyDataToAll(copyOutState->fe_msgbuf, shardId, shardConnections->connectionList);
+
+	MemoryContextSwitchTo(oldContext);
+
+#if PG_VERSION_NUM >= 90600
+	return true;
+#endif
+}
+
+
+/*
+ * CitusCopyDestReceiverShutdown implements the rShutdown interface of
+ * CitusCopyDestReceiver. It ends the COPY on all the open connections and
+ * closes the relation.
+ */ +static void +CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) +{ + CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) destReceiver; + + HTAB *shardConnectionHash = copyDest->shardConnectionHash; + List *shardConnectionsList = NIL; + ListCell *shardConnectionsCell = NULL; + CopyOutState copyOutState = copyDest->copyOutState; + Relation distributedRelation = copyDest->distributedRelation; + + shardConnectionsList = ShardConnectionList(shardConnectionHash); + foreach(shardConnectionsCell, shardConnectionsList) + { + ShardConnections *shardConnections = (ShardConnections *) lfirst( + shardConnectionsCell); + + /* send copy binary footers to all shard placements */ + if (copyOutState->binary) + { + SendCopyBinaryFooters(copyOutState, shardConnections->shardId, + shardConnections->connectionList); + } + + /* close the COPY input on all shard placements */ + EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, true); + } + + heap_close(distributedRelation, NoLock); +} + + +static void +CitusCopyDestReceiverDestroy(DestReceiver *destReceiver) +{ + CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) destReceiver; + + if (copyDest->copyOutState) + { + pfree(copyDest->copyOutState); + } + + if (copyDest->columnOutputFunctions) + { + pfree(copyDest->columnOutputFunctions); + } + + pfree(copyDest); +} diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index d27926bbf..cd3b56b73 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -13,7 +13,12 @@ #define MULTI_COPY_H +#include "distributed/master_metadata_utility.h" +#include "distributed/metadata_cache.h" +#include "nodes/execnodes.h" #include "nodes/parsenodes.h" +#include "tcop/dest.h" + /* * A smaller version of copy.c's CopyStateData, trimmed to the elements @@ -43,8 +48,51 @@ typedef struct NodeAddress int32 nodePort; } NodeAddress; +/* CopyDestReceiver can be used to stream results into a distributed table */ +typedef struct CitusCopyDestReceiver +{ + /* public DestReceiver interface */ + DestReceiver pub; + + /* relation and columns to which to copy */ + Oid distributedRelationId; + List *columnNameList; + int partitionColumnIndex; + + /* distributed table metadata */ + DistTableCacheEntry *tableMetadata; + bool useBinarySearch; + + /* open relation handle */ + Relation distributedRelation; + + /* descriptor of the tuples that are sent to the worker */ + TupleDesc tupleDescriptor; + + /* EState for per-tuple memory allocation */ + EState *executorState; + + /* MemoryContext for DestReceiver session */ + MemoryContext memoryContext; + + /* template for COPY statement to send to workers */ + CopyStmt *copyStatement; + + /* cached shard metadata for pruning */ + HTAB *shardConnectionHash; + bool stopOnFailure; + + /* state on how to copy out data types */ + CopyOutState copyOutState; + FmgrInfo *columnOutputFunctions; +} CitusCopyDestReceiver; + /* function declarations for copying into a distributed table */ +extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId, + List *columnNameList, + EState *executorState, + bool stopOnFailure); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index b23c2d26b..8529d01d4 100644 --- 
diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out
index b23c2d26b..8529d01d4 100644
--- a/src/test/regress/expected/multi_create_table.out
+++ b/src/test/regress/expected/multi_create_table.out
@@ -75,12 +75,6 @@ CREATE TABLE nation (
     n_name char(25) not null,
     n_regionkey integer not null,
     n_comment varchar(152));
-\COPY nation FROM STDIN WITH CSV
-SELECT master_create_distributed_table('nation', 'n_nationkey', 'append');
-ERROR:  cannot distribute relation "nation"
-DETAIL:  Relation "nation" contains data.
-HINT:  Empty your table before distributing it.
-TRUNCATE nation;
 SELECT create_reference_table('nation');
  create_reference_table 
 ------------------------
@@ -360,6 +354,155 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
 DROP TABLE repmodel_test;
 RESET citus.replication_model;
+-- Test initial data loading
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+INSERT INTO data_load_test VALUES (243, 'world');
+-- table must be empty when using append- or range-partitioning
+SELECT create_distributed_table('data_load_test', 'col1', 'append');
+ERROR:  cannot distribute relation "data_load_test"
+DETAIL:  Relation "data_load_test" contains data.
+HINT:  Empty your table before distributing it.
+SELECT create_distributed_table('data_load_test', 'col1', 'range');
+ERROR:  cannot distribute relation "data_load_test"
+DETAIL:  Relation "data_load_test" contains data.
+HINT:  Empty your table before distributing it.
+-- table must be empty when using master_create_distributed_table (no shards created)
+SELECT master_create_distributed_table('data_load_test', 'col1', 'hash');
+ERROR:  cannot distribute relation "data_load_test"
+DETAIL:  Relation "data_load_test" contains data.
+HINT:  Empty your table before distributing it.
+-- create_distributed_table creates shards and copies data into the distributed table
+SELECT create_distributed_table('data_load_test', 'col1');
+NOTICE:  Copying data from local table...
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+SELECT * FROM data_load_test ORDER BY col1;
+ col1 | col2  | col3 
+------+-------+------
+  132 | hello |    1
+  243 | world |    2
+(2 rows)
+
+DROP TABLE data_load_test;
+-- ensure writes in the same transaction as create_distributed_table are visible
+BEGIN;
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test', 'col1');
+NOTICE:  Copying data from local table...
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+INSERT INTO data_load_test VALUES (243, 'world');
+END;
+SELECT * FROM data_load_test ORDER BY col1;
+ col1 | col2  | col3 
+------+-------+------
+  132 | hello |    1
+  243 | world |    2
+(2 rows)
+
+DROP TABLE data_load_test;
+-- creating co-located distributed tables in the same transaction works
+BEGIN;
+CREATE TABLE data_load_test1 (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test1 VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test1', 'col1');
+NOTICE:  Copying data from local table...
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+CREATE TABLE data_load_test2 (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test2 VALUES (132, 'world');
+SELECT create_distributed_table('data_load_test2', 'col1');
+NOTICE:  Copying data from local table...
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+SELECT a.col2 ||' '|| b.col2
+FROM data_load_test1 a JOIN data_load_test2 b USING (col1)
+WHERE col1 = 132;
+  ?column?   
+-------------
+ hello world
+(1 row)
+
+DROP TABLE data_load_test1, data_load_test2;
+END;
+-- creating an index after loading data works
+BEGIN;
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test', 'col1');
+NOTICE:  Copying data from local table...
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+CREATE INDEX data_load_test_idx ON data_load_test (col2);
+END;
+DROP TABLE data_load_test;
+-- popping in and out of existence in the same transaction works
+BEGIN;
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test', 'col1');
+NOTICE:  Copying data from local table...
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+DROP TABLE data_load_test;
+END;
+-- but dropping after a write on the distributed table is currently disallowed
+BEGIN;
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test', 'col1');
+NOTICE:  Copying data from local table...
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+INSERT INTO data_load_test VALUES (243, 'world');
+DROP TABLE data_load_test;
+ERROR:  shard drop operations must not appear in transaction blocks containing other distributed modifications
+CONTEXT:  SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)"
+PL/pgSQL function citus_drop_trigger() line 21 at PERFORM
+END;
+-- Test data loading after dropping a column
+CREATE TABLE data_load_test (col1 int, col2 text, col3 text);
+INSERT INTO data_load_test VALUES (132, 'hello', 'world');
+INSERT INTO data_load_test VALUES (243, 'world', 'hello');
+ALTER TABLE data_load_test DROP COLUMN col2;
+SELECT create_distributed_table('data_load_test', 'col1');
+NOTICE:  Copying data from local table...
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+SELECT * FROM data_load_test;
+ col1 | col3  
+------+-------
+  132 | world
+  243 | hello
+(2 rows)
+
+DROP TABLE data_load_test;
 SET citus.shard_replication_factor TO default;
 SET citus.shard_count to 4;
 CREATE TABLE lineitem_hash_part (like lineitem);
diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out
index 7ff726ce1..6544b9f40 100644
--- a/src/test/regress/expected/multi_reference_table.out
+++ b/src/test/regress/expected/multi_reference_table.out
@@ -2,15 +2,10 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1250000;
 ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1250000;
 CREATE TABLE reference_table_test (value_1 int, value_2 float, value_3 text, value_4 timestamp);
 -- insert some data, and make sure that cannot be create_distributed_table
-INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-05');
--- should error out given that there exists data
-SELECT create_reference_table('reference_table_test');
-ERROR:  cannot distribute relation "reference_table_test"
-DETAIL:  Relation "reference_table_test" contains data.
-HINT:  Empty your table before distributing it.
-TRUNCATE reference_table_test;
--- now should be able to create the reference table
+INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
+-- create the reference table
 SELECT create_reference_table('reference_table_test');
+NOTICE:  Copying data from local table...
  create_reference_table 
 ------------------------
 
@@ -52,8 +47,14 @@ WHERE
  1250000 | 1 | localhost | 57638
 (2 rows)
 
+-- check whether data was copied into the distributed table
+SELECT * FROM reference_table_test;
+ value_1 | value_2 | value_3 |         value_4          
+---------+---------+---------+--------------------------
+       1 |       1 | 1       | Thu Dec 01 00:00:00 2016
+(1 row)
+
 -- now, execute some modification queries
-INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
 INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
 INSERT INTO reference_table_test VALUES (3, 3.0, '3', '2016-12-03');
 INSERT INTO reference_table_test VALUES (4, 4.0, '4', '2016-12-04');
diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source
index 16703c580..f96e53361 100644
--- a/src/test/regress/input/multi_copy.source
+++ b/src/test/regress/input/multi_copy.source
@@ -723,7 +723,7 @@ CREATE TABLE numbers_hash(a int, b int);
 SELECT create_distributed_table('numbers_hash', 'a');
 
 \c - - - :worker_1_port
-ALTER TABLE numbers_hash_560180 ADD COLUMN c int;
+ALTER TABLE numbers_hash_560180 DROP COLUMN b;
 \c - - - :master_port
 
 -- operation will fail to modify a shard and roll back
@@ -739,7 +739,7 @@ COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
 \.
 
 -- verify no row is inserted
-SELECT * FROM numbers_hash;
+SELECT count(a) FROM numbers_hash;
 
 -- verify shard is still marked as valid
 SELECT shardid, shardstate, nodename, nodeport
diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source
index 9b02124f6..e04c99149 100644
--- a/src/test/regress/output/multi_copy.source
+++ b/src/test/regress/output/multi_copy.source
@@ -978,17 +978,19 @@ SELECT create_distributed_table('numbers_hash', 'a');
 
 (1 row)
 
 \c - - - :worker_1_port
-ALTER TABLE numbers_hash_560180 ADD COLUMN c int;
+ALTER TABLE numbers_hash_560180 DROP COLUMN b;
 \c - - - :master_port
 -- operation will fail to modify a shard and roll back
 COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
-ERROR:  row field count is 2, expected 3
-DETAIL:  (null)
+ERROR:  column "b" of relation "numbers_hash_560180" does not exist
+CONTEXT:  while executing command on localhost:57637
+COPY numbers_hash, line 1: "1,1"
 -- verify no row is inserted
-SELECT * FROM numbers_hash;
- a | b 
----+---
-(0 rows)
+SELECT count(a) FROM numbers_hash;
+ count 
+-------
+     0
+(1 row)
 
 -- verify shard is still marked as valid
 SELECT shardid, shardstate, nodename, nodeport
diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql
index 2aa46b9a4..da20b263e 100644
--- a/src/test/regress/sql/multi_create_table.sql
+++ b/src/test/regress/sql/multi_create_table.sql
@@ -60,18 +60,6 @@ CREATE TABLE nation (
     n_regionkey integer not null,
     n_comment varchar(152));
 
-\COPY nation FROM STDIN WITH CSV
-1,'name',1,'comment_1'
-2,'name',2,'comment_2'
-3,'name',3,'comment_3'
-4,'name',4,'comment_4'
-5,'name',5,'comment_5'
-\.
-
-SELECT master_create_distributed_table('nation', 'n_nationkey', 'append');
-
-TRUNCATE nation;
-
 SELECT create_reference_table('nation');
 
 CREATE TABLE part (
@@ -201,6 +189,86 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regclass;
 DROP TABLE repmodel_test;
 
 RESET citus.replication_model;
+
+-- Test initial data loading
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+INSERT INTO data_load_test VALUES (243, 'world');
+
+-- table must be empty when using append- or range-partitioning
+SELECT create_distributed_table('data_load_test', 'col1', 'append');
+SELECT create_distributed_table('data_load_test', 'col1', 'range');
+
+-- table must be empty when using master_create_distributed_table (no shards created)
+SELECT master_create_distributed_table('data_load_test', 'col1', 'hash');
+
+-- create_distributed_table creates shards and copies data into the distributed table
+SELECT create_distributed_table('data_load_test', 'col1');
+SELECT * FROM data_load_test ORDER BY col1;
+DROP TABLE data_load_test;
+
+-- ensure writes in the same transaction as create_distributed_table are visible
+BEGIN;
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test', 'col1');
+INSERT INTO data_load_test VALUES (243, 'world');
+END;
+SELECT * FROM data_load_test ORDER BY col1;
+DROP TABLE data_load_test;
+
+-- creating co-located distributed tables in the same transaction works
+BEGIN;
+CREATE TABLE data_load_test1 (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test1 VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test1', 'col1');
+
+CREATE TABLE data_load_test2 (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test2 VALUES (132, 'world');
+SELECT create_distributed_table('data_load_test2', 'col1');
+
+SELECT a.col2 ||' '|| b.col2
+FROM data_load_test1 a JOIN data_load_test2 b USING (col1)
+WHERE col1 = 132;
+
+DROP TABLE data_load_test1, data_load_test2;
+END;
+
+-- creating an index after loading data works
+BEGIN;
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test', 'col1');
+CREATE INDEX data_load_test_idx ON data_load_test (col2);
+END;
+DROP TABLE data_load_test;
+
+-- popping in and out of existence in the same transaction works
+BEGIN;
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test', 'col1');
+DROP TABLE data_load_test;
+END;
+
+-- but dropping after a write on the distributed table is currently disallowed
+BEGIN;
+CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
+INSERT INTO data_load_test VALUES (132, 'hello');
+SELECT create_distributed_table('data_load_test', 'col1');
+INSERT INTO data_load_test VALUES (243, 'world');
+DROP TABLE data_load_test;
+END;
+
+-- Test data loading after dropping a column
+CREATE TABLE data_load_test (col1 int, col2 text, col3 text);
+INSERT INTO data_load_test VALUES (132, 'hello', 'world');
+INSERT INTO data_load_test VALUES (243, 'world', 'hello');
+ALTER TABLE data_load_test DROP COLUMN col2;
+SELECT create_distributed_table('data_load_test', 'col1');
+SELECT * FROM data_load_test;
+DROP TABLE data_load_test;
+
 SET citus.shard_replication_factor TO default;
 SET citus.shard_count to 4;
diff --git a/src/test/regress/sql/multi_reference_table.sql b/src/test/regress/sql/multi_reference_table.sql
index ddae075a7..96af1c710 100644
--- a/src/test/regress/sql/multi_reference_table.sql
+++ b/src/test/regress/sql/multi_reference_table.sql
@@ -4,14 +4,9 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1250000;
 CREATE TABLE reference_table_test (value_1 int, value_2 float, value_3 text, value_4 timestamp);
 
 -- insert some data, and make sure that cannot be create_distributed_table
-INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-05');
+INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
 
--- should error out given that there exists data
-SELECT create_reference_table('reference_table_test');
-
-TRUNCATE reference_table_test;
-
--- now should be able to create the reference table
+-- create the reference table
 SELECT create_reference_table('reference_table_test');
 
 -- see that partkey is NULL
@@ -36,8 +31,10 @@ FROM
 WHERE
 	shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'reference_table_test'::regclass);
 
+-- check whether data was copied into the distributed table
+SELECT * FROM reference_table_test;
+
 -- now, execute some modification queries
-INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
 INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
 INSERT INTO reference_table_test VALUES (3, 3.0, '3', '2016-12-03');
 INSERT INTO reference_table_test VALUES (4, 4.0, '4', '2016-12-04');