From bf3541cb244fad848bb68b24146d2574bee25b17 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Sun, 8 Jan 2017 18:50:50 -0800 Subject: [PATCH 1/6] Add CitusCopyDestReceiver infrastructure --- src/backend/distributed/commands/multi_copy.c | 428 +++++++++++++++++- src/include/distributed/multi_copy.h | 50 ++ 2 files changed, 472 insertions(+), 6 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index bfc73c676..d9cf8eade 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -68,6 +68,7 @@ #include "distributed/remote_commands.h" #include "distributed/resource_lock.h" #include "executor/executor.h" +#include "nodes/makefuncs.h" #include "tsearch/ts_locale.h" #include "utils/builtins.h" #include "utils/lsyscache.h" @@ -127,6 +128,19 @@ static void CopySendInt16(CopyOutState outputState, int16 val); static void CopyAttributeOutText(CopyOutState outputState, char *string); static inline void CopyFlushOutput(CopyOutState outputState, char *start, char *pointer); +/* CitusCopyDestReceiver functions */ +static void CitusCopyDestReceiverStartup(DestReceiver *copyDest, int operation, + TupleDesc inputTupleDesc); +#if PG_VERSION_NUM >= 90600 +static bool CitusCopyDestReceiverReceive(TupleTableSlot *slot, + DestReceiver *copyDest); +#else +static void CitusCopyDestReceiverReceive(TupleTableSlot *slot, + DestReceiver *copyDest); +#endif +static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver); +static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver); + /* * CitusCopyFrom implements the COPY table_name FROM. It dispacthes the copy @@ -406,6 +420,13 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) /* create a mapping of shard id to a connection for each of its placements */ shardConnectionHash = CreateShardConnectionHash(TopTransactionContext); + /* + * From here on we use copyStatement as the template for the command + * that we send to workers. This command does not have an attribute + * list since NextCopyFrom will generate a value for all columns. + */ + copyStatement->attlist = NIL; + /* set up callback to identify error line number */ errorCallback.callback = CopyFromErrorCallback; errorCallback.arg = (void *) copyState; @@ -604,6 +625,13 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) errorCallback.arg = (void *) copyState; errorCallback.previous = error_context_stack; + /* + * From here on we use copyStatement as the template for the command + * that we send to workers. This command does not have an attribute + * list since NextCopyFrom will generate a value for all columns. + */ + copyStatement->attlist = NIL; + while (true) { bool nextRowFound = false; @@ -1074,22 +1102,46 @@ ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCop char *shardName = pstrdup(relationName); char *shardQualifiedName = NULL; - const char *copyFormat = NULL; AppendShardIdToName(&shardName, shardId); shardQualifiedName = quote_qualified_identifier(schemaName, shardName); + appendStringInfo(command, "COPY %s ", shardQualifiedName); + + if (copyStatement->attlist != NIL) + { + ListCell *columnNameCell = NULL; + bool appendedFirstName = false; + + foreach(columnNameCell, copyStatement->attlist) + { + char *columnName = (char *) lfirst(columnNameCell); + + if (!appendedFirstName) + { + appendStringInfo(command, "(%s", columnName); + appendedFirstName = true; + } + else + { + appendStringInfo(command, ", %s", columnName); + } + } + + appendStringInfoString(command, ") "); + } + + appendStringInfo(command, "FROM STDIN WITH "); + if (useBinaryCopyFormat) { - copyFormat = "BINARY"; + appendStringInfoString(command, "(FORMAT BINARY)"); } else { - copyFormat = "TEXT"; + appendStringInfoString(command, "(FORMAT TEXT)"); } - appendStringInfo(command, "COPY %s FROM STDIN WITH (FORMAT %s)", shardQualifiedName, - copyFormat); return command; } @@ -1277,7 +1329,6 @@ AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, { CopySendInt16(rowOutputState, availableColumnCount); } - for (columnIndex = 0; columnIndex < totalColumnCount; columnIndex++) { Form_pg_attribute currentColumn = rowDescriptor->attrs[columnIndex]; @@ -1694,3 +1745,368 @@ CopyFlushOutput(CopyOutState cstate, char *start, char *pointer) CopySendData(cstate, start, pointer - start); } } + + +/* + * CreateCitusCopyDestReceiver creates a DestReceiver that copies into + * a distributed table. + */ +CitusCopyDestReceiver * +CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorState, + bool stopOnFailure) +{ + CitusCopyDestReceiver *copyDest = NULL; + + copyDest = (CitusCopyDestReceiver *) palloc0(sizeof(CitusCopyDestReceiver)); + + /* set up the DestReceiver function pointers */ + copyDest->pub.receiveSlot = CitusCopyDestReceiverReceive; + copyDest->pub.rStartup = CitusCopyDestReceiverStartup; + copyDest->pub.rShutdown = CitusCopyDestReceiverShutdown; + copyDest->pub.rDestroy = CitusCopyDestReceiverDestroy; + copyDest->pub.mydest = DestCopyOut; + + /* set up output parameters */ + copyDest->distributedRelationId = tableId; + copyDest->columnNameList = columnNameList; + copyDest->executorState = executorState; + copyDest->stopOnFailure = stopOnFailure; + copyDest->memoryContext = CurrentMemoryContext; + + return copyDest; +} + + +static void +CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, + TupleDesc inputTupleDescriptor) +{ + CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest; + + Oid tableId = copyDest->distributedRelationId; + + char *relationName = get_rel_name(tableId); + Oid schemaOid = get_rel_namespace(tableId); + char *schemaName = get_namespace_name(schemaOid); + + Relation distributedRelation = NULL; + int columnIndex = 0; + List *columnNameList = copyDest->columnNameList; + + ListCell *columnNameCell = NULL; + + char partitionMethod = '\0'; + Var *partitionColumn = NULL; + int partitionColumnIndex = -1; + DistTableCacheEntry *cacheEntry = NULL; + + CopyStmt *copyStatement = NULL; + + List *shardIntervalList = NULL; + + CopyOutState copyOutState = NULL; + const char *delimiterCharacter = "\t"; + const char *nullPrintCharacter = "\\N"; + + /* look up table properties */ + distributedRelation = heap_open(tableId, RowExclusiveLock); + cacheEntry = DistributedTableCacheEntry(tableId); + partitionMethod = cacheEntry->partitionMethod; + + copyDest->distributedRelation = distributedRelation; + copyDest->partitionMethod = partitionMethod; + + if (partitionMethod == DISTRIBUTE_BY_NONE) + { + /* we don't support copy to reference tables from workers */ + EnsureSchemaNode(); + } + else + { + partitionColumn = PartitionColumn(tableId, 0); + } + + /* load the list of shards and verify that we have shards to copy into */ + shardIntervalList = LoadShardIntervalList(tableId); + if (shardIntervalList == NIL) + { + if (partitionMethod == DISTRIBUTE_BY_HASH) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not find any shards into which to copy"), + errdetail("No shards exist for distributed table \"%s\".", + relationName), + errhint("Run master_create_worker_shards to create shards " + "and try again."))); + } + else + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not find any shards into which to copy"), + errdetail("No shards exist for distributed table \"%s\".", + relationName))); + } + } + + /* prevent concurrent placement changes and non-commutative DML statements */ + LockShardListMetadata(shardIntervalList, ShareLock); + LockShardListResources(shardIntervalList, ShareLock); + + /* error if any shard missing min/max values */ + if (partitionMethod != DISTRIBUTE_BY_NONE && + cacheEntry->hasUninitializedShardInterval) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not start copy"), + errdetail("Distributed relation \"%s\" has shards " + "with missing shardminvalue/shardmaxvalue.", + relationName))); + } + + copyDest->hashFunction = cacheEntry->hashFunction; + copyDest->compareFunction = cacheEntry->shardIntervalCompareFunction; + + /* initialize the shard interval cache */ + copyDest->shardCount = cacheEntry->shardIntervalArrayLength; + copyDest->shardIntervalCache = cacheEntry->sortedShardIntervalArray; + + /* determine whether to use binary search */ + if (partitionMethod != DISTRIBUTE_BY_HASH || !cacheEntry->hasUniformHashDistribution) + { + copyDest->useBinarySearch = true; + } + + /* define how tuples will be serialised */ + copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); + copyOutState->delim = (char *) delimiterCharacter; + copyOutState->null_print = (char *) nullPrintCharacter; + copyOutState->null_print_client = (char *) nullPrintCharacter; + copyOutState->binary = CanUseBinaryCopyFormat(inputTupleDescriptor, copyOutState); + copyOutState->fe_msgbuf = makeStringInfo(); + copyOutState->rowcontext = GetPerTupleMemoryContext(copyDest->executorState); + copyDest->copyOutState = copyOutState; + + copyDest->tupleDescriptor = inputTupleDescriptor; + + /* prepare output functions */ + copyDest->columnOutputFunctions = + ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); + + foreach(columnNameCell, columnNameList) + { + char *columnName = (char *) lfirst(columnNameCell); + + /* load the column information from pg_attribute */ + AttrNumber attrNumber = get_attnum(tableId, columnName); + + /* check whether this is the partition column */ + if (partitionColumn != NULL && attrNumber == partitionColumn->varattno) + { + Assert(partitionColumnIndex == -1); + + partitionColumnIndex = columnIndex; + } + + columnIndex++; + } + + if (partitionMethod != DISTRIBUTE_BY_NONE && partitionColumnIndex == -1) + { + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("the partition column of table %s should have a value", + quote_qualified_identifier(schemaName, relationName)))); + } + + copyDest->partitionColumnIndex = partitionColumnIndex; + + /* define the template for the COPY statement that is sent to workers */ + copyStatement = makeNode(CopyStmt); + copyStatement->relation = makeRangeVar(schemaName, relationName, -1); + copyStatement->query = NULL; + copyStatement->attlist = columnNameList; + copyStatement->is_from = true; + copyStatement->is_program = false; + copyStatement->filename = NULL; + copyStatement->options = NIL; + copyDest->copyStatement = copyStatement; + + copyDest->copyConnectionHash = CreateShardConnectionHash(TopTransactionContext); +} + + +#if PG_VERSION_NUM >= 90600 +static bool +#else +static void +#endif +CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) +{ + CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest; + + char partitionMethod = copyDest->partitionMethod; + int partitionColumnIndex = copyDest->partitionColumnIndex; + TupleDesc tupleDescriptor = copyDest->tupleDescriptor; + CopyStmt *copyStatement = copyDest->copyStatement; + + int shardCount = copyDest->shardCount; + ShardInterval **shardIntervalCache = copyDest->shardIntervalCache; + + bool useBinarySearch = copyDest->useBinarySearch; + FmgrInfo *hashFunction = copyDest->hashFunction; + FmgrInfo *compareFunction = copyDest->compareFunction; + + HTAB *copyConnectionHash = copyDest->copyConnectionHash; + CopyOutState copyOutState = copyDest->copyOutState; + FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions; + + bool stopOnFailure = copyDest->stopOnFailure; + + Datum *columnValues = NULL; + bool *columnNulls = NULL; + + Datum partitionColumnValue = 0; + ShardInterval *shardInterval = NULL; + int64 shardId = 0; + + bool shardConnectionsFound = false; + ShardConnections *shardConnections = NULL; + + EState *executorState = copyDest->executorState; + MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); + MemoryContext oldContext = MemoryContextSwitchTo(executorTupleContext); + + slot_getallattrs(slot); + + columnValues = slot->tts_values; + columnNulls = slot->tts_isnull; + + /* + * Find the partition column value and corresponding shard interval + * for non-reference tables. + * Get the existing (and only a single) shard interval for the reference + * tables. Note that, reference tables has NULL partition column values so + * skip the check. + */ + if (partitionColumnIndex >= 0) + { + if (columnNulls[partitionColumnIndex]) + { + Oid relationId = copyDest->distributedRelationId; + char *relationName = get_rel_name(relationId); + Oid schemaOid = get_rel_namespace(relationId); + char *schemaName = get_namespace_name(schemaOid); + char *qualifiedTableName = quote_qualified_identifier(schemaName, + relationName); + + ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), + errmsg("the partition column of table %s should have a value", + qualifiedTableName))); + } + + /* find the partition column value */ + partitionColumnValue = columnValues[partitionColumnIndex]; + } + + /* + * Find the shard interval and id for the partition column value for + * non-reference tables. + * + * For reference table, this function blindly returns the tables single + * shard. + */ + shardInterval = FindShardInterval(partitionColumnValue, shardIntervalCache, + shardCount, partitionMethod, + compareFunction, hashFunction, + useBinarySearch); + if (shardInterval == NULL) + { + ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("could not find shard for partition column " + "value"))); + } + + shardId = shardInterval->shardId; + + /* connections hash is kept in memory context */ + MemoryContextSwitchTo(copyDest->memoryContext); + + /* get existing connections to the shard placements, if any */ + shardConnections = GetShardHashConnections(copyConnectionHash, shardId, + &shardConnectionsFound); + if (!shardConnectionsFound) + { + /* open connections and initiate COPY on shard placements */ + OpenCopyConnections(copyStatement, shardConnections, stopOnFailure, + copyOutState->binary); + + /* send copy binary headers to shard placements */ + if (copyOutState->binary) + { + SendCopyBinaryHeaders(copyOutState, shardId, + shardConnections->connectionList); + } + } + + /* replicate row to shard placements */ + resetStringInfo(copyOutState->fe_msgbuf); + AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, + copyOutState, columnOutputFunctions); + SendCopyDataToAll(copyOutState->fe_msgbuf, shardId, shardConnections->connectionList); + + MemoryContextSwitchTo(oldContext); + +#if PG_VERSION_NUM >= 90600 + return true; +#endif +} + + +static void +CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) +{ + CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) destReceiver; + + HTAB *shardConnectionHash = copyDest->copyConnectionHash; + List *shardConnectionsList = NIL; + ListCell *shardConnectionsCell = NULL; + CopyOutState copyOutState = copyDest->copyOutState; + Relation distributedRelation = copyDest->distributedRelation; + + shardConnectionsList = ShardConnectionList(shardConnectionHash); + foreach(shardConnectionsCell, shardConnectionsList) + { + ShardConnections *shardConnections = (ShardConnections *) lfirst( + shardConnectionsCell); + + /* send copy binary footers to all shard placements */ + if (copyOutState->binary) + { + SendCopyBinaryFooters(copyOutState, shardConnections->shardId, + shardConnections->connectionList); + } + + /* close the COPY input on all shard placements */ + EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, true); + } + + heap_close(distributedRelation, NoLock); +} + + +static void +CitusCopyDestReceiverDestroy(DestReceiver *destReceiver) +{ + CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) destReceiver; + + if (copyDest->copyOutState) + { + pfree(copyDest->copyOutState); + } + + if (copyDest->columnOutputFunctions) + { + pfree(copyDest->columnOutputFunctions); + } + + pfree(copyDest); +} diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index d27926bbf..7b1c6dd04 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -13,7 +13,11 @@ #define MULTI_COPY_H +#include "distributed/master_metadata_utility.h" +#include "nodes/execnodes.h" #include "nodes/parsenodes.h" +#include "tcop/dest.h" + /* * A smaller version of copy.c's CopyStateData, trimmed to the elements @@ -43,8 +47,54 @@ typedef struct NodeAddress int32 nodePort; } NodeAddress; +/* CopyDestReceiver can be used to stream results into a distributed table */ +typedef struct CitusCopyDestReceiver +{ + DestReceiver pub; + + /* relation and columns to which to copy */ + Oid distributedRelationId; + List *columnNameList; + + /* EState for per-tuple memory allocation */ + EState *executorState; + + /* MemoryContext for DestReceiver session */ + MemoryContext memoryContext; + + /* distributed relation details */ + Relation distributedRelation; + char partitionMethod; + int partitionColumnIndex; + + /* descriptor of the tuples that are sent to the worker */ + TupleDesc tupleDescriptor; + + /* template for COPY statement to send to workers */ + CopyStmt *copyStatement; + + /* cached shard metadata for pruning */ + int shardCount; + ShardInterval **shardIntervalCache; + bool useBinarySearch; + FmgrInfo *hashFunction; + FmgrInfo *compareFunction; + + /* cached shard metadata for pruning */ + HTAB *copyConnectionHash; + bool stopOnFailure; + + /* state on how to copy out data types */ + CopyOutState copyOutState; + FmgrInfo *columnOutputFunctions; +} CitusCopyDestReceiver; + /* function declarations for copying into a distributed table */ +extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId, + List *columnNameList, + EState *executorState, + bool stopOnFailure); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray, TupleDesc rowDescriptor, From d11eca7d4a104da991dd062c0942c1babed81905 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Tue, 10 Jan 2017 07:51:40 +0100 Subject: [PATCH 2/6] Load data into distributed table on creation --- .../commands/create_distributed_table.c | 182 +++++++++++------- src/backend/distributed/commands/multi_copy.c | 2 +- .../regress/expected/multi_create_table.out | 48 ++++- .../expected/multi_reference_table.out | 20 +- src/test/regress/sql/multi_create_table.sql | 32 +-- .../regress/sql/multi_reference_table.sql | 13 +- 6 files changed, 193 insertions(+), 104 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 43dd3a7ee..285835340 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -9,6 +9,7 @@ */ #include "postgres.h" +#include "miscadmin.h" #include "access/genam.h" #include "access/hash.h" @@ -39,11 +40,14 @@ #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" +#include "distributed/multi_copy.h" #include "distributed/multi_logical_planner.h" #include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_partition.h" #include "distributed/reference_table_utils.h" +#include "distributed/worker_protocol.h" #include "distributed/worker_transaction.h" +#include "executor/executor.h" #include "executor/spi.h" #include "nodes/execnodes.h" #include "nodes/nodeFuncs.h" @@ -52,10 +56,14 @@ #include "parser/parse_node.h" #include "parser/parse_relation.h" #include "parser/parser.h" +#include "tcop/pquery.h" +#include "tcop/tcopprot.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" +#include "utils/memutils.h" #include "utils/rel.h" +#include "utils/snapmgr.h" #include "utils/syscache.h" #include "utils/inval.h" @@ -72,7 +80,6 @@ static void ConvertToDistributedTable(Oid relationId, char *distributionColumnNa static char LookupDistributionMethod(Oid distributionMethodOid); static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, int16 supportFunctionNumber); -static bool LocalTableEmpty(Oid tableId); static void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod, Var *distributionColumn, uint32 colocationId); static void ErrorIfNotSupportedForeignConstraint(Relation relation, @@ -83,6 +90,7 @@ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnN char *colocateWithTableName, int shardCount, int replicationFactor); static Oid ColumnType(Oid relationId, char *columnName); +static void CopyLocalData(Oid relationId); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_create_distributed_table); @@ -143,6 +151,7 @@ create_distributed_table(PG_FUNCTION_ARGS) char distributionMethod = LookupDistributionMethod(distributionMethodOid); text *colocateWithTableNameText = NULL; char *colocateWithTableName = NULL; + char relationKind = 0; EnsureCoordinator(); @@ -195,6 +204,13 @@ create_distributed_table(PG_FUNCTION_ARGS) colocateWithTableName, ShardCount, ShardReplicationFactor); + /* copy over data from regular relations */ + relationKind = get_rel_relkind(relationId); + if (relationKind == RELKIND_RELATION) + { + CopyLocalData(relationId); + } + if (ShouldSyncTableMetadata(relationId)) { CreateTableMetadataOnWorkers(relationId); @@ -213,9 +229,17 @@ Datum create_reference_table(PG_FUNCTION_ARGS) { Oid relationId = PG_GETARG_OID(0); + char relationKind = 0; CreateReferenceTable(relationId); + /* copy over data from regular relations */ + relationKind = get_rel_relkind(relationId); + if (relationKind == RELKIND_RELATION) + { + CopyLocalData(relationId); + } + PG_RETURN_VOID(); } @@ -322,17 +346,6 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName, "foreign tables."))); } - /* check that the relation does not contain any rows */ - if (!LocalTableEmpty(relationId)) - { - ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), - errmsg("cannot distribute relation \"%s\"", - relationName), - errdetail("Relation \"%s\" contains data.", - relationName), - errhint("Empty your table before distributing it."))); - } - /* * Distribution column returns NULL for reference tables, * but it is not used below for reference tables. @@ -816,62 +829,6 @@ SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, } -/* - * LocalTableEmpty function checks whether given local table contains any row and - * returns false if there is any data. This function is only for local tables and - * should not be called for distributed tables. - */ -static bool -LocalTableEmpty(Oid tableId) -{ - Oid schemaId = get_rel_namespace(tableId); - char *schemaName = get_namespace_name(schemaId); - char *tableName = get_rel_name(tableId); - char *tableQualifiedName = quote_qualified_identifier(schemaName, tableName); - - int spiConnectionResult = 0; - int spiQueryResult = 0; - StringInfo selectExistQueryString = makeStringInfo(); - - HeapTuple tuple = NULL; - Datum hasDataDatum = 0; - bool localTableEmpty = false; - bool columnNull = false; - bool readOnly = true; - - int rowId = 0; - int attributeId = 1; - - AssertArg(!IsDistributedTable(tableId)); - - spiConnectionResult = SPI_connect(); - if (spiConnectionResult != SPI_OK_CONNECT) - { - ereport(ERROR, (errmsg("could not connect to SPI manager"))); - } - - appendStringInfo(selectExistQueryString, SELECT_EXIST_QUERY, tableQualifiedName); - - spiQueryResult = SPI_execute(selectExistQueryString->data, readOnly, 0); - if (spiQueryResult != SPI_OK_SELECT) - { - ereport(ERROR, (errmsg("execution was not successful \"%s\"", - selectExistQueryString->data))); - } - - /* we expect that SELECT EXISTS query will return single value in a single row */ - Assert(SPI_processed == 1); - - tuple = SPI_tuptable->vals[rowId]; - hasDataDatum = SPI_getbinval(tuple, SPI_tuptable->tupdesc, attributeId, &columnNull); - localTableEmpty = !DatumGetBool(hasDataDatum); - - SPI_finish(); - - return localTableEmpty; -} - - /* * CreateTruncateTrigger creates a truncate trigger on table identified by relationId * and assigns citus_truncate_trigger() as handler. @@ -1021,3 +978,92 @@ EnsureReplicationSettings(Oid relationId, char replicationModel) "factor\" to one%s.", extraHint))); } } + + +/* + * CopyLocalData copies local data into the shards. + */ +static void +CopyLocalData(Oid relationId) +{ + DestReceiver *copyDest = NULL; + List *columnNameList = NIL; + Relation distributedRelation = NULL; + TupleDesc tupleDescriptor = NULL; + int columnIndex = 0; + bool stopOnFailure = true; + + EState *estate = NULL; + HeapScanDesc scan = NULL; + HeapTuple tuple = NULL; + ExprContext *econtext = NULL; + MemoryContext oldContext = NULL; + TupleTableSlot *slot = NULL; + uint64 rowsCopied = 0; + + distributedRelation = heap_open(relationId, ExclusiveLock); + tupleDescriptor = RelationGetDescr(distributedRelation); + slot = MakeSingleTupleTableSlot(tupleDescriptor); + + for (columnIndex = 0; columnIndex < tupleDescriptor->natts; columnIndex++) + { + Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex]; + char *columnName = NameStr(currentColumn->attname); + + if (currentColumn->attisdropped) + { + continue; + } + + columnNameList = lappend(columnNameList, columnName); + } + + estate = CreateExecutorState(); + econtext = GetPerTupleExprContext(estate); + econtext->ecxt_scantuple = slot; + + copyDest = + (DestReceiver *) CreateCitusCopyDestReceiver(relationId, columnNameList, + estate, stopOnFailure); + + copyDest->rStartup(copyDest, 0, tupleDescriptor); + + scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL); + + oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + ExecStoreTuple(tuple, slot, InvalidBuffer, false); + + copyDest->receiveSlot(slot, copyDest); + + CHECK_FOR_INTERRUPTS(); + + ResetPerTupleExprContext(estate); + + if (rowsCopied == 0) + { + ereport(NOTICE, (errmsg("Copying data from local table..."))); + } + + rowsCopied++; + + if (rowsCopied % 1000000 == 0) + { + ereport(NOTICE, (errmsg("Copied %ld rows", rowsCopied))); + } + } + + if (rowsCopied % 1000000 != 0) + { + ereport(NOTICE, (errmsg("Copied %ld rows", rowsCopied))); + } + + MemoryContextSwitchTo(oldContext); + heap_endscan(scan); + copyDest->rShutdown(copyDest); + ExecDropSingleTupleTableSlot(slot); + FreeExecutorState(estate); + heap_close(distributedRelation, NoLock); +} diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index d9cf8eade..dfdd7c1da 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -1819,7 +1819,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, if (partitionMethod == DISTRIBUTE_BY_NONE) { /* we don't support copy to reference tables from workers */ - EnsureSchemaNode(); + EnsureCoordinator(); } else { diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index b23c2d26b..988228c0d 100644 --- a/src/test/regress/expected/multi_create_table.out +++ b/src/test/regress/expected/multi_create_table.out @@ -75,12 +75,6 @@ CREATE TABLE nation ( n_name char(25) not null, n_regionkey integer not null, n_comment varchar(152)); -\COPY nation FROM STDIN WITH CSV -SELECT master_create_distributed_table('nation', 'n_nationkey', 'append'); -ERROR: cannot distribute relation "nation" -DETAIL: Relation "nation" contains data. -HINT: Empty your table before distributing it. -TRUNCATE nation; SELECT create_reference_table('nation'); create_reference_table ------------------------ @@ -360,6 +354,48 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regcl DROP TABLE repmodel_test; RESET citus.replication_model; +-- Test initial data loading +CREATE TABLE data_load_test (col1 int, col2 text); +INSERT INTO data_load_test VALUES (132, 'hello'); +INSERT INTO data_load_test VALUES (243, 'world'); +-- create_distributed_table copies data into the distributed table +SELECT create_distributed_table('data_load_test', 'col1'); +NOTICE: Copying data from local table... +NOTICE: Copied 2 rows + create_distributed_table +-------------------------- + +(1 row) + +SELECT * FROM data_load_test; + col1 | col2 +------+------- + 132 | hello + 243 | world +(2 rows) + +DROP TABLE data_load_test; +-- Test data loading after dropping a column +CREATE TABLE data_load_test (col1 int, col2 text, col3 text); +INSERT INTO data_load_test VALUES (132, 'hello', 'world'); +INSERT INTO data_load_test VALUES (243, 'world', 'hello'); +ALTER TABLE data_load_test DROP COLUMN col2; +SELECT create_distributed_table('data_load_test', 'col1'); +NOTICE: Copying data from local table... +NOTICE: Copied 2 rows + create_distributed_table +-------------------------- + +(1 row) + +SELECT * FROM data_load_test; + col1 | col3 +------+------- + 132 | world + 243 | hello +(2 rows) + +DROP TABLE data_load_test; SET citus.shard_replication_factor TO default; SET citus.shard_count to 4; CREATE TABLE lineitem_hash_part (like lineitem); diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index 7ff726ce1..a7da8812c 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -2,15 +2,11 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1250000; ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1250000; CREATE TABLE reference_table_test (value_1 int, value_2 float, value_3 text, value_4 timestamp); -- insert some data, and make sure that cannot be create_distributed_table -INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-05'); --- should error out given that there exists data -SELECT create_reference_table('reference_table_test'); -ERROR: cannot distribute relation "reference_table_test" -DETAIL: Relation "reference_table_test" contains data. -HINT: Empty your table before distributing it. -TRUNCATE reference_table_test; --- now should be able to create the reference table +INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01'); +-- create the reference table SELECT create_reference_table('reference_table_test'); +NOTICE: Copying data from local table... +NOTICE: Copied 1 rows create_reference_table ------------------------ @@ -52,8 +48,14 @@ WHERE 1250000 | 1 | localhost | 57638 (2 rows) +-- check whether data was copied into distributed table +SELECT * FROM reference_table_test; + value_1 | value_2 | value_3 | value_4 +---------+---------+---------+-------------------------- + 1 | 1 | 1 | Thu Dec 01 00:00:00 2016 +(1 row) + -- now, execute some modification queries -INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01'); INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02'); INSERT INTO reference_table_test VALUES (3, 3.0, '3', '2016-12-03'); INSERT INTO reference_table_test VALUES (4, 4.0, '4', '2016-12-04'); diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index 2aa46b9a4..07b063fcf 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -60,18 +60,6 @@ CREATE TABLE nation ( n_regionkey integer not null, n_comment varchar(152)); -\COPY nation FROM STDIN WITH CSV -1,'name',1,'comment_1' -2,'name',2,'comment_2' -3,'name',3,'comment_3' -4,'name',4,'comment_4' -5,'name',5,'comment_5' -\. - -SELECT master_create_distributed_table('nation', 'n_nationkey', 'append'); - -TRUNCATE nation; - SELECT create_reference_table('nation'); CREATE TABLE part ( @@ -201,6 +189,26 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regcl DROP TABLE repmodel_test; RESET citus.replication_model; + +-- Test initial data loading +CREATE TABLE data_load_test (col1 int, col2 text); +INSERT INTO data_load_test VALUES (132, 'hello'); +INSERT INTO data_load_test VALUES (243, 'world'); + +-- create_distributed_table copies data into the distributed table +SELECT create_distributed_table('data_load_test', 'col1'); +SELECT * FROM data_load_test; +DROP TABLE data_load_test; + +-- Test data loading after dropping a column +CREATE TABLE data_load_test (col1 int, col2 text, col3 text); +INSERT INTO data_load_test VALUES (132, 'hello', 'world'); +INSERT INTO data_load_test VALUES (243, 'world', 'hello'); +ALTER TABLE data_load_test DROP COLUMN col2; +SELECT create_distributed_table('data_load_test', 'col1'); +SELECT * FROM data_load_test; +DROP TABLE data_load_test; + SET citus.shard_replication_factor TO default; SET citus.shard_count to 4; diff --git a/src/test/regress/sql/multi_reference_table.sql b/src/test/regress/sql/multi_reference_table.sql index ddae075a7..96af1c710 100644 --- a/src/test/regress/sql/multi_reference_table.sql +++ b/src/test/regress/sql/multi_reference_table.sql @@ -4,14 +4,9 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1250000; CREATE TABLE reference_table_test (value_1 int, value_2 float, value_3 text, value_4 timestamp); -- insert some data, and make sure that cannot be create_distributed_table -INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-05'); +INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01'); --- should error out given that there exists data -SELECT create_reference_table('reference_table_test'); - -TRUNCATE reference_table_test; - --- now should be able to create the reference table +-- create the reference table SELECT create_reference_table('reference_table_test'); -- see that partkey is NULL @@ -36,8 +31,10 @@ FROM WHERE shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'reference_table_test'::regclass); +-- check whether data was copied into distributed table +SELECT * FROM reference_table_test; + -- now, execute some modification queries -INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01'); INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02'); INSERT INTO reference_table_test VALUES (3, 3.0, '3', '2016-12-03'); INSERT INTO reference_table_test VALUES (4, 4.0, '4', '2016-12-04'); From d74fb764b13b23b6d753e343f535bedf16942d1c Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Wed, 25 Jan 2017 14:14:17 +0100 Subject: [PATCH 3/6] Use CitusCopyDestReceiver for regular COPY --- src/backend/distributed/commands/multi_copy.c | 244 ++++-------------- src/test/regress/input/multi_copy.source | 4 +- src/test/regress/output/multi_copy.source | 16 +- 3 files changed, 58 insertions(+), 206 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index dfdd7c1da..f98dd83a7 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -287,49 +287,31 @@ static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) { Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false); - char *relationName = get_rel_name(tableId); + + CitusCopyDestReceiver *copyDest = NULL; + DestReceiver *dest = NULL; + Relation distributedRelation = NULL; TupleDesc tupleDescriptor = NULL; uint32 columnCount = 0; Datum *columnValues = NULL; bool *columnNulls = NULL; - FmgrInfo *hashFunction = NULL; - FmgrInfo *compareFunction = NULL; - bool hasUniformHashDistribution = false; - DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId); - const char *delimiterCharacter = "\t"; - const char *nullPrintCharacter = "\\N"; - - int shardCount = 0; - List *shardIntervalList = NULL; - ShardInterval **shardIntervalCache = NULL; - bool useBinarySearch = false; - - HTAB *shardConnectionHash = NULL; - ShardConnections *shardConnections = NULL; - List *shardConnectionsList = NIL; - ListCell *shardConnectionsCell = NULL; + int columnIndex = 0; + List *columnNameList = NIL; + TupleTableSlot *tupleTableSlot = NULL; EState *executorState = NULL; MemoryContext executorTupleContext = NULL; ExprContext *executorExpressionContext = NULL; + char partitionMethod = 0; + bool stopOnFailure = false; + CopyState copyState = NULL; - CopyOutState copyOutState = NULL; - FmgrInfo *columnOutputFunctions = NULL; uint64 processedRowCount = 0; - Var *partitionColumn = PartitionColumn(tableId, 0); - char partitionMethod = PartitionMethod(tableId); - ErrorContextCallback errorCallback; - /* get hash function for partition column */ - hashFunction = cacheEntry->hashFunction; - - /* get compare function for shard intervals */ - compareFunction = cacheEntry->shardIntervalCompareFunction; - /* allocate column values and nulls arrays */ distributedRelation = heap_open(tableId, RowExclusiveLock); tupleDescriptor = RelationGetDescr(distributedRelation); @@ -337,64 +319,40 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) columnValues = palloc0(columnCount * sizeof(Datum)); columnNulls = palloc0(columnCount * sizeof(bool)); - /* we don't support copy to reference tables from workers */ + /* set up a virtual tuple table slot */ + tupleTableSlot = MakeSingleTupleTableSlot(tupleDescriptor); + tupleTableSlot->tts_nvalid = columnCount; + tupleTableSlot->tts_values = columnValues; + tupleTableSlot->tts_isnull = columnNulls; + + for (columnIndex = 0; columnIndex < columnCount; columnIndex++) + { + Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex]; + char *columnName = NameStr(currentColumn->attname); + + if (currentColumn->attisdropped) + { + continue; + } + + columnNameList = lappend(columnNameList, columnName); + } + + executorState = CreateExecutorState(); + executorTupleContext = GetPerTupleMemoryContext(executorState); + executorExpressionContext = GetPerTupleExprContext(executorState); + + partitionMethod = PartitionMethod(tableId); if (partitionMethod == DISTRIBUTE_BY_NONE) { - EnsureCoordinator(); + stopOnFailure = true; } - /* load the list of shards and verify that we have shards to copy into */ - shardIntervalList = LoadShardIntervalList(tableId); - if (shardIntervalList == NIL) - { - if (partitionMethod == DISTRIBUTE_BY_HASH) - { - ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not find any shards into which to copy"), - errdetail("No shards exist for distributed table \"%s\".", - relationName), - errhint("Run master_create_worker_shards to create shards " - "and try again."))); - } - else - { - ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not find any shards into which to copy"), - errdetail("No shards exist for distributed table \"%s\".", - relationName))); - } - } - - /* error if any shard missing min/max values for non reference tables */ - if (partitionMethod != DISTRIBUTE_BY_NONE && - cacheEntry->hasUninitializedShardInterval) - { - ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not start copy"), - errdetail("Distributed relation \"%s\" has shards " - "with missing shardminvalue/shardmaxvalue.", - relationName))); - } - - /* prevent concurrent placement changes and non-commutative DML statements */ - LockShardListMetadata(shardIntervalList, ShareLock); - LockShardListResources(shardIntervalList, ShareLock); - - /* initialize the shard interval cache */ - shardCount = cacheEntry->shardIntervalArrayLength; - shardIntervalCache = cacheEntry->sortedShardIntervalArray; - hasUniformHashDistribution = cacheEntry->hasUniformHashDistribution; - - /* determine whether to use binary search */ - if (partitionMethod != DISTRIBUTE_BY_HASH || !hasUniformHashDistribution) - { - useBinarySearch = true; - } - - if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC) - { - CoordinatedTransactionUse2PC(); - } + /* set up the destination for the COPY */ + copyDest = CreateCitusCopyDestReceiver(tableId, columnNameList, executorState, + stopOnFailure); + dest = (DestReceiver *) copyDest; + dest->rStartup(dest, 0, tupleDescriptor); /* initialize copy state to read from COPY data source */ copyState = BeginCopyFrom(distributedRelation, @@ -403,30 +361,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) copyStatement->attlist, copyStatement->options); - executorState = CreateExecutorState(); - executorTupleContext = GetPerTupleMemoryContext(executorState); - executorExpressionContext = GetPerTupleExprContext(executorState); - - copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); - copyOutState->delim = (char *) delimiterCharacter; - copyOutState->null_print = (char *) nullPrintCharacter; - copyOutState->null_print_client = (char *) nullPrintCharacter; - copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState); - copyOutState->fe_msgbuf = makeStringInfo(); - copyOutState->rowcontext = executorTupleContext; - - columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary); - - /* create a mapping of shard id to a connection for each of its placements */ - shardConnectionHash = CreateShardConnectionHash(TopTransactionContext); - - /* - * From here on we use copyStatement as the template for the command - * that we send to workers. This command does not have an attribute - * list since NextCopyFrom will generate a value for all columns. - */ - copyStatement->attlist = NIL; - /* set up callback to identify error line number */ errorCallback.callback = CopyFromErrorCallback; errorCallback.arg = (void *) copyState; @@ -436,10 +370,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) while (true) { bool nextRowFound = false; - Datum partitionColumnValue = 0; - ShardInterval *shardInterval = NULL; - int64 shardId = 0; - bool shardConnectionsFound = false; MemoryContext oldContext = NULL; ResetPerTupleExprContext(executorState); @@ -458,103 +388,23 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) CHECK_FOR_INTERRUPTS(); - /* - * Find the partition column value and corresponding shard interval - * for non-reference tables. - * Get the existing (and only a single) shard interval for the reference - * tables. Note that, reference tables has NULL partition column values so - * skip the check. - */ - if (partitionColumn != NULL) - { - if (columnNulls[partitionColumn->varattno - 1]) - { - ereport(ERROR, (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), - errmsg("cannot copy row with NULL value " - "in partition column"))); - } - - partitionColumnValue = columnValues[partitionColumn->varattno - 1]; - } - - /* - * Find the shard interval and id for the partition column value for - * non-reference tables. - * For reference table, this function blindly returns the tables single - * shard. - */ - shardInterval = FindShardInterval(partitionColumnValue, - shardIntervalCache, - shardCount, partitionMethod, - compareFunction, hashFunction, - useBinarySearch); - - if (shardInterval == NULL) - { - ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), - errmsg("could not find shard for partition column " - "value"))); - } - - shardId = shardInterval->shardId; - MemoryContextSwitchTo(oldContext); - /* get existing connections to the shard placements, if any */ - shardConnections = GetShardHashConnections(shardConnectionHash, shardId, - &shardConnectionsFound); - if (!shardConnectionsFound) - { - bool stopOnFailure = false; - - if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) - { - stopOnFailure = true; - } - - /* open connections and initiate COPY on shard placements */ - OpenCopyConnections(copyStatement, shardConnections, stopOnFailure, - copyOutState->binary); - - /* send copy binary headers to shard placements */ - if (copyOutState->binary) - { - SendCopyBinaryHeaders(copyOutState, shardId, - shardConnections->connectionList); - } - } - - /* replicate row to shard placements */ - resetStringInfo(copyOutState->fe_msgbuf); - AppendCopyRowData(columnValues, columnNulls, tupleDescriptor, - copyOutState, columnOutputFunctions); - SendCopyDataToAll(copyOutState->fe_msgbuf, shardId, - shardConnections->connectionList); + dest->receiveSlot(tupleTableSlot, dest); processedRowCount += 1; } + EndCopyFrom(copyState); + /* all lines have been copied, stop showing line number in errors */ error_context_stack = errorCallback.previous; - shardConnectionsList = ShardConnectionList(shardConnectionHash); - foreach(shardConnectionsCell, shardConnectionsList) - { - ShardConnections *shardConnections = (ShardConnections *) lfirst( - shardConnectionsCell); + /* finish the COPY commands */ + dest->rShutdown(dest); - /* send copy binary footers to all shard placements */ - if (copyOutState->binary) - { - SendCopyBinaryFooters(copyOutState, shardConnections->shardId, - shardConnections->connectionList); - } - - /* close the COPY input on all shard placements */ - EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, true); - } - - EndCopyFrom(copyState); + ExecDropSingleTupleTableSlot(tupleTableSlot); + FreeExecutorState(executorState); heap_close(distributedRelation, NoLock); /* mark failed placements as inactive */ diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source index 16703c580..f96e53361 100644 --- a/src/test/regress/input/multi_copy.source +++ b/src/test/regress/input/multi_copy.source @@ -723,7 +723,7 @@ CREATE TABLE numbers_hash(a int, b int); SELECT create_distributed_table('numbers_hash', 'a'); \c - - - :worker_1_port -ALTER TABLE numbers_hash_560180 ADD COLUMN c int; +ALTER TABLE numbers_hash_560180 DROP COLUMN b; \c - - - :master_port -- operation will fail to modify a shard and roll back @@ -739,7 +739,7 @@ COPY numbers_hash FROM STDIN WITH (FORMAT 'csv'); \. -- verify no row is inserted -SELECT * FROM numbers_hash; +SELECT count(a) FROM numbers_hash; -- verify shard is still marked as valid SELECT shardid, shardstate, nodename, nodeport diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index 9b02124f6..e04c99149 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -978,17 +978,19 @@ SELECT create_distributed_table('numbers_hash', 'a'); (1 row) \c - - - :worker_1_port -ALTER TABLE numbers_hash_560180 ADD COLUMN c int; +ALTER TABLE numbers_hash_560180 DROP COLUMN b; \c - - - :master_port -- operation will fail to modify a shard and roll back COPY numbers_hash FROM STDIN WITH (FORMAT 'csv'); -ERROR: row field count is 2, expected 3 -DETAIL: (null) +ERROR: column "b" of relation "numbers_hash_560180" does not exist +CONTEXT: while executing command on localhost:57637 +COPY numbers_hash, line 1: "1,1" -- verify no row is inserted -SELECT * FROM numbers_hash; - a | b ----+--- -(0 rows) +SELECT count(a) FROM numbers_hash; + count +------- + 0 +(1 row) -- verify shard is still marked as valid SELECT shardid, shardstate, nodename, nodeport From db98c2835432885b911515112ce0341c76ca3cde Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Tue, 28 Feb 2017 17:23:56 +0100 Subject: [PATCH 4/6] Address review feedback in COPY refactoring --- src/backend/distributed/commands/multi_copy.c | 65 +++++++++++-------- src/include/distributed/multi_copy.h | 30 ++++----- 2 files changed, 52 insertions(+), 43 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index f98dd83a7..68fc0b848 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -1627,6 +1627,10 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, EState *executorS } +/* + * CitusCopyDestReceiverStartup implements the rStartup interface of + * CitusCopyDestReceiver. It opens the relation + */ static void CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor) @@ -1646,7 +1650,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, ListCell *columnNameCell = NULL; char partitionMethod = '\0'; - Var *partitionColumn = NULL; + Var *partitionColumn = PartitionColumn(tableId, 0); int partitionColumnIndex = -1; DistTableCacheEntry *cacheEntry = NULL; @@ -1664,17 +1668,13 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, partitionMethod = cacheEntry->partitionMethod; copyDest->distributedRelation = distributedRelation; - copyDest->partitionMethod = partitionMethod; + copyDest->tupleDescriptor = inputTupleDescriptor; + /* we don't support copy to reference tables from workers */ if (partitionMethod == DISTRIBUTE_BY_NONE) { - /* we don't support copy to reference tables from workers */ EnsureCoordinator(); } - else - { - partitionColumn = PartitionColumn(tableId, 0); - } /* load the list of shards and verify that we have shards to copy into */ shardIntervalList = LoadShardIntervalList(tableId); @@ -1698,10 +1698,6 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, } } - /* prevent concurrent placement changes and non-commutative DML statements */ - LockShardListMetadata(shardIntervalList, ShareLock); - LockShardListResources(shardIntervalList, ShareLock); - /* error if any shard missing min/max values */ if (partitionMethod != DISTRIBUTE_BY_NONE && cacheEntry->hasUninitializedShardInterval) @@ -1713,12 +1709,12 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, relationName))); } - copyDest->hashFunction = cacheEntry->hashFunction; - copyDest->compareFunction = cacheEntry->shardIntervalCompareFunction; + /* prevent concurrent placement changes and non-commutative DML statements */ + LockShardListMetadata(shardIntervalList, ShareLock); + LockShardListResources(shardIntervalList, ShareLock); - /* initialize the shard interval cache */ - copyDest->shardCount = cacheEntry->shardIntervalArrayLength; - copyDest->shardIntervalCache = cacheEntry->sortedShardIntervalArray; + /* keep the table metadata to avoid looking it up for every tuple */ + copyDest->tableMetadata = cacheEntry; /* determine whether to use binary search */ if (partitionMethod != DISTRIBUTE_BY_HASH || !cacheEntry->hasUniformHashDistribution) @@ -1726,6 +1722,11 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, copyDest->useBinarySearch = true; } + if (cacheEntry->replicationModel == REPLICATION_MODEL_2PC) + { + CoordinatedTransactionUse2PC(); + } + /* define how tuples will be serialised */ copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData)); copyOutState->delim = (char *) delimiterCharacter; @@ -1736,12 +1737,11 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, copyOutState->rowcontext = GetPerTupleMemoryContext(copyDest->executorState); copyDest->copyOutState = copyOutState; - copyDest->tupleDescriptor = inputTupleDescriptor; - /* prepare output functions */ copyDest->columnOutputFunctions = ColumnOutputFunctions(inputTupleDescriptor, copyOutState->binary); + /* find the partition column index in the column list */ foreach(columnNameCell, columnNameList) { char *columnName = (char *) lfirst(columnNameCell); @@ -1780,10 +1780,15 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, copyStatement->options = NIL; copyDest->copyStatement = copyStatement; - copyDest->copyConnectionHash = CreateShardConnectionHash(TopTransactionContext); + copyDest->shardConnectionHash = CreateShardConnectionHash(TopTransactionContext); } +/* + * CitusCopyDestReceiverReceive implements the receiveSlot function of + * CitusCopyDestReceiver. It takes a TupleTableSlot and sends the contents to + * the appropriate shard placement(s). + */ #if PG_VERSION_NUM >= 90600 static bool #else @@ -1793,19 +1798,20 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest; - char partitionMethod = copyDest->partitionMethod; + DistTableCacheEntry *tableMetadata = copyDest->tableMetadata; + char partitionMethod = tableMetadata->partitionMethod; int partitionColumnIndex = copyDest->partitionColumnIndex; TupleDesc tupleDescriptor = copyDest->tupleDescriptor; CopyStmt *copyStatement = copyDest->copyStatement; - int shardCount = copyDest->shardCount; - ShardInterval **shardIntervalCache = copyDest->shardIntervalCache; + int shardCount = tableMetadata->shardIntervalArrayLength; + ShardInterval **shardIntervalCache = tableMetadata->sortedShardIntervalArray; bool useBinarySearch = copyDest->useBinarySearch; - FmgrInfo *hashFunction = copyDest->hashFunction; - FmgrInfo *compareFunction = copyDest->compareFunction; + FmgrInfo *hashFunction = tableMetadata->hashFunction; + FmgrInfo *compareFunction = tableMetadata->shardIntervalCompareFunction; - HTAB *copyConnectionHash = copyDest->copyConnectionHash; + HTAB *shardConnectionHash = copyDest->shardConnectionHash; CopyOutState copyOutState = copyDest->copyOutState; FmgrInfo *columnOutputFunctions = copyDest->columnOutputFunctions; @@ -1881,7 +1887,7 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) MemoryContextSwitchTo(copyDest->memoryContext); /* get existing connections to the shard placements, if any */ - shardConnections = GetShardHashConnections(copyConnectionHash, shardId, + shardConnections = GetShardHashConnections(shardConnectionHash, shardId, &shardConnectionsFound); if (!shardConnectionsFound) { @@ -1911,12 +1917,17 @@ CitusCopyDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) } +/* + * CitusCopyDestReceiverShutdown implements the rShutdown interface of + * CitusCopyDestReceiver. It ends the COPY on all the open connections and closes + * the relation. + */ static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) destReceiver; - HTAB *shardConnectionHash = copyDest->copyConnectionHash; + HTAB *shardConnectionHash = copyDest->shardConnectionHash; List *shardConnectionsList = NIL; ListCell *shardConnectionsCell = NULL; CopyOutState copyOutState = copyDest->copyOutState; diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index 7b1c6dd04..cd3b56b73 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -14,6 +14,7 @@ #include "distributed/master_metadata_utility.h" +#include "distributed/metadata_cache.h" #include "nodes/execnodes.h" #include "nodes/parsenodes.h" #include "tcop/dest.h" @@ -50,11 +51,23 @@ typedef struct NodeAddress /* CopyDestReceiver can be used to stream results into a distributed table */ typedef struct CitusCopyDestReceiver { + /* public DestReceiver interface */ DestReceiver pub; /* relation and columns to which to copy */ Oid distributedRelationId; List *columnNameList; + int partitionColumnIndex; + + /* distributed table metadata */ + DistTableCacheEntry *tableMetadata; + bool useBinarySearch; + + /* open relation handle */ + Relation distributedRelation; + + /* descriptor of the tuples that are sent to the worker */ + TupleDesc tupleDescriptor; /* EState for per-tuple memory allocation */ EState *executorState; @@ -62,26 +75,11 @@ typedef struct CitusCopyDestReceiver /* MemoryContext for DestReceiver session */ MemoryContext memoryContext; - /* distributed relation details */ - Relation distributedRelation; - char partitionMethod; - int partitionColumnIndex; - - /* descriptor of the tuples that are sent to the worker */ - TupleDesc tupleDescriptor; - /* template for COPY statement to send to workers */ CopyStmt *copyStatement; /* cached shard metadata for pruning */ - int shardCount; - ShardInterval **shardIntervalCache; - bool useBinarySearch; - FmgrInfo *hashFunction; - FmgrInfo *compareFunction; - - /* cached shard metadata for pruning */ - HTAB *copyConnectionHash; + HTAB *shardConnectionHash; bool stopOnFailure; /* state on how to copy out data types */ From 56d4d375c2265fdab2b1b2400fbef999dc4d90e8 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Tue, 28 Feb 2017 17:24:24 +0100 Subject: [PATCH 5/6] Address review feedback in create_distributed_table data loading --- .../commands/create_distributed_table.c | 290 +++++++++++++----- .../regress/expected/multi_create_table.out | 125 +++++++- .../expected/multi_reference_table.out | 1 - src/test/regress/sql/multi_create_table.sql | 66 +++- 4 files changed, 393 insertions(+), 89 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 285835340..5968728ce 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -73,13 +73,14 @@ int ReplicationModel = REPLICATION_MODEL_COORDINATOR; /* local function forward declarations */ -static void CreateReferenceTable(Oid relationId); +static void CreateReferenceTable(Oid distributedRelationId); static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName, char distributionMethod, char replicationModel, - uint32 colocationId); + uint32 colocationId, bool allowEmpty); static char LookupDistributionMethod(Oid distributionMethodOid); static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, int16 supportFunctionNumber); +static bool LocalTableEmpty(Oid tableId); static void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod, Var *distributionColumn, uint32 colocationId); static void ErrorIfNotSupportedForeignConstraint(Relation relation, @@ -90,7 +91,8 @@ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnN char *colocateWithTableName, int shardCount, int replicationFactor); static Oid ColumnType(Oid relationId, char *columnName); -static void CopyLocalData(Oid relationId); +static void CopyLocalDataIntoShards(Oid relationId); +static List * TupleDescColumnNameList(TupleDesc tupleDescriptor); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(master_create_distributed_table); @@ -114,6 +116,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS) char *distributionColumnName = text_to_cstring(distributionColumnText); char distributionMethod = LookupDistributionMethod(distributionMethodOid); + bool allowEmpty = false; EnsureCoordinator(); @@ -129,7 +132,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS) ConvertToDistributedTable(distributedRelationId, distributionColumnName, distributionMethod, REPLICATION_MODEL_COORDINATOR, - INVALID_COLOCATION_ID); + INVALID_COLOCATION_ID, allowEmpty); PG_RETURN_VOID(); } @@ -151,7 +154,6 @@ create_distributed_table(PG_FUNCTION_ARGS) char distributionMethod = LookupDistributionMethod(distributionMethodOid); text *colocateWithTableNameText = NULL; char *colocateWithTableName = NULL; - char relationKind = 0; EnsureCoordinator(); @@ -186,6 +188,8 @@ create_distributed_table(PG_FUNCTION_ARGS) /* if distribution method is not hash, just create partition metadata */ if (distributionMethod != DISTRIBUTE_BY_HASH) { + bool allowEmpty = false; + if (ReplicationModel != REPLICATION_MODEL_COORDINATOR) { ereport(NOTICE, (errmsg("using statement-based replication"), @@ -195,7 +199,7 @@ create_distributed_table(PG_FUNCTION_ARGS) ConvertToDistributedTable(relationId, distributionColumnName, distributionMethod, REPLICATION_MODEL_COORDINATOR, - INVALID_COLOCATION_ID); + INVALID_COLOCATION_ID, allowEmpty); PG_RETURN_VOID(); } @@ -204,13 +208,6 @@ create_distributed_table(PG_FUNCTION_ARGS) colocateWithTableName, ShardCount, ShardReplicationFactor); - /* copy over data from regular relations */ - relationKind = get_rel_relkind(relationId); - if (relationKind == RELKIND_RELATION) - { - CopyLocalData(relationId); - } - if (ShouldSyncTableMetadata(relationId)) { CreateTableMetadataOnWorkers(relationId); @@ -229,17 +226,9 @@ Datum create_reference_table(PG_FUNCTION_ARGS) { Oid relationId = PG_GETARG_OID(0); - char relationKind = 0; CreateReferenceTable(relationId); - /* copy over data from regular relations */ - relationKind = get_rel_relkind(relationId); - if (relationKind == RELKIND_RELATION) - { - CopyLocalData(relationId); - } - PG_RETURN_VOID(); } @@ -256,6 +245,8 @@ CreateReferenceTable(Oid relationId) List *workerNodeList = WorkerNodeList(); int replicationFactor = list_length(workerNodeList); char *distributionColumnName = NULL; + bool canLoadData = false; + char relationKind = 0; EnsureCoordinator(); @@ -269,16 +260,30 @@ CreateReferenceTable(Oid relationId) errdetail("There are no active worker nodes."))); } + /* we only support data loading for regular (non-foreign) relations */ + relationKind = get_rel_relkind(relationId); + if (relationKind == RELKIND_RELATION) + { + canLoadData = true; + } + colocationId = CreateReferenceTableColocationId(); /* first, convert the relation into distributed relation */ ConvertToDistributedTable(relationId, distributionColumnName, - DISTRIBUTE_BY_NONE, REPLICATION_MODEL_2PC, colocationId); + DISTRIBUTE_BY_NONE, REPLICATION_MODEL_2PC, colocationId, + canLoadData); /* now, create the single shard replicated to all nodes */ CreateReferenceTableShard(relationId); CreateTableMetadataOnWorkers(relationId); + + /* copy over data from regular relations */ + if (canLoadData) + { + CopyLocalDataIntoShards(relationId); + } } @@ -296,7 +301,7 @@ CreateReferenceTable(Oid relationId) static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName, char distributionMethod, char replicationModel, - uint32 colocationId) + uint32 colocationId, bool allowEmpty) { Relation relation = NULL; TupleDesc relationDesc = NULL; @@ -346,6 +351,17 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName, "foreign tables."))); } + /* check that the relation does not contain any rows */ + if (!allowEmpty && !LocalTableEmpty(relationId)) + { + ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), + errmsg("cannot distribute relation \"%s\"", + relationName), + errdetail("Relation \"%s\" contains data.", + relationName), + errhint("Empty your table before distributing it."))); + } + /* * Distribution column returns NULL for reference tables, * but it is not used below for reference tables. @@ -829,6 +845,62 @@ SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, } +/* + * LocalTableEmpty function checks whether given local table contains any row and + * returns false if there is any data. This function is only for local tables and + * should not be called for distributed tables. + */ +static bool +LocalTableEmpty(Oid tableId) +{ + Oid schemaId = get_rel_namespace(tableId); + char *schemaName = get_namespace_name(schemaId); + char *tableName = get_rel_name(tableId); + char *tableQualifiedName = quote_qualified_identifier(schemaName, tableName); + + int spiConnectionResult = 0; + int spiQueryResult = 0; + StringInfo selectExistQueryString = makeStringInfo(); + + HeapTuple tuple = NULL; + Datum hasDataDatum = 0; + bool localTableEmpty = false; + bool columnNull = false; + bool readOnly = true; + + int rowId = 0; + int attributeId = 1; + + AssertArg(!IsDistributedTable(tableId)); + + spiConnectionResult = SPI_connect(); + if (spiConnectionResult != SPI_OK_CONNECT) + { + ereport(ERROR, (errmsg("could not connect to SPI manager"))); + } + + appendStringInfo(selectExistQueryString, SELECT_EXIST_QUERY, tableQualifiedName); + + spiQueryResult = SPI_execute(selectExistQueryString->data, readOnly, 0); + if (spiQueryResult != SPI_OK_SELECT) + { + ereport(ERROR, (errmsg("execution was not successful \"%s\"", + selectExistQueryString->data))); + } + + /* we expect that SELECT EXISTS query will return single value in a single row */ + Assert(SPI_processed == 1); + + tuple = SPI_tuptable->vals[rowId]; + hasDataDatum = SPI_getbinval(tuple, SPI_tuptable->tupdesc, attributeId, &columnNull); + localTableEmpty = !DatumGetBool(hasDataDatum); + + SPI_finish(); + + return localTableEmpty; +} + + /* * CreateTruncateTrigger creates a truncate trigger on table identified by relationId * and assigns citus_truncate_trigger() as handler. @@ -872,6 +944,8 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, uint32 colocationId = INVALID_COLOCATION_ID; Oid sourceRelationId = InvalidOid; Oid distributionColumnType = InvalidOid; + bool canLoadData = false; + char relationKind = 0; /* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */ distributedRelation = relation_open(relationId, AccessShareLock); @@ -914,9 +988,16 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, colocationId = TableColocationId(sourceRelationId); } + /* we only support data loading for regular (non-foreign) relations */ + relationKind = get_rel_relkind(relationId); + if (relationKind == RELKIND_RELATION) + { + canLoadData = true; + } + /* create distributed table metadata */ ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH, - ReplicationModel, colocationId); + ReplicationModel, colocationId, canLoadData); /* create shards */ if (sourceRelationId != InvalidOid) @@ -933,6 +1014,12 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor); } + /* copy over data from regular relations */ + if (canLoadData) + { + CopyLocalDataIntoShards(relationId); + } + heap_close(pgDistColocation, NoLock); relation_close(distributedRelation, NoLock); } @@ -981,16 +1068,37 @@ EnsureReplicationSettings(Oid relationId, char replicationModel) /* - * CopyLocalData copies local data into the shards. + * CopyLocalDataIntoShards copies data from the local table, which is hidden + * after converting it to a distributed table, into the shards of the distributed + * table. + * + * This function uses CitusCopyDestReceiver to invoke the distributed COPY logic. + * We cannot use a regular COPY here since that cannot read from a table. Instead + * we read from the table and pass each tuple to the CitusCopyDestReceiver which + * opens a connection and starts a COPY for each shard placement that will have + * data. + * + * We could call the planner and executor here and send the output to the + * DestReceiver, but we are in a tricky spot here since Citus is already + * intercepting queries on this table in the planner and executor hooks and we + * want to read from the local table. To keep it simple, we perform a heap scan + * directly on the table. + * + * Any writes on the table that are started during this operation will be handled + * as distributed queries once the current transaction commits. SELECTs will + * continue to read from the local table until the current transaction commits, + * after which new SELECTs will be handled as distributed queries. + * + * After copying local data into the distributed table, the local data remains + * in place and should be truncated at a later time. */ static void -CopyLocalData(Oid relationId) +CopyLocalDataIntoShards(Oid distributedRelationId) { DestReceiver *copyDest = NULL; List *columnNameList = NIL; Relation distributedRelation = NULL; TupleDesc tupleDescriptor = NULL; - int columnIndex = 0; bool stopOnFailure = true; EState *estate = NULL; @@ -1001,9 +1109,86 @@ CopyLocalData(Oid relationId) TupleTableSlot *slot = NULL; uint64 rowsCopied = 0; - distributedRelation = heap_open(relationId, ExclusiveLock); + /* take an ExclusiveLock to block all operations except SELECT */ + distributedRelation = heap_open(distributedRelationId, ExclusiveLock); + + /* get the table columns */ tupleDescriptor = RelationGetDescr(distributedRelation); slot = MakeSingleTupleTableSlot(tupleDescriptor); + columnNameList = TupleDescColumnNameList(tupleDescriptor); + + /* initialise per-tuple memory context */ + estate = CreateExecutorState(); + econtext = GetPerTupleExprContext(estate); + econtext->ecxt_scantuple = slot; + + copyDest = + (DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId, + columnNameList, estate, + stopOnFailure); + + /* initialise state for writing to shards, we'll open connections on demand */ + copyDest->rStartup(copyDest, 0, tupleDescriptor); + + /* begin reading from local table */ + scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL); + + oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); + + while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) + { + /* materialize tuple and send it to a shard */ + ExecStoreTuple(tuple, slot, InvalidBuffer, false); + copyDest->receiveSlot(slot, copyDest); + + /* clear tuple memory */ + ResetPerTupleExprContext(estate); + + /* make sure we roll back on cancellation */ + CHECK_FOR_INTERRUPTS(); + + if (rowsCopied == 0) + { + ereport(NOTICE, (errmsg("Copying data from local table..."))); + } + + rowsCopied++; + + if (rowsCopied % 1000000 == 0) + { + ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied))); + } + } + + if (rowsCopied % 1000000 != 0) + { + ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied))); + } + + MemoryContextSwitchTo(oldContext); + + /* finish reading from the local table */ + heap_endscan(scan); + + /* finish writing into the shards */ + copyDest->rShutdown(copyDest); + + /* free memory and close the relation */ + ExecDropSingleTupleTableSlot(slot); + FreeExecutorState(estate); + heap_close(distributedRelation, NoLock); +} + + +/* + * TupleDescColumnNameList returns a list of column names for the given tuple + * descriptor as plain strings. + */ +static List * +TupleDescColumnNameList(TupleDesc tupleDescriptor) +{ + List *columnNameList = NIL; + int columnIndex = 0; for (columnIndex = 0; columnIndex < tupleDescriptor->natts; columnIndex++) { @@ -1018,52 +1203,5 @@ CopyLocalData(Oid relationId) columnNameList = lappend(columnNameList, columnName); } - estate = CreateExecutorState(); - econtext = GetPerTupleExprContext(estate); - econtext->ecxt_scantuple = slot; - - copyDest = - (DestReceiver *) CreateCitusCopyDestReceiver(relationId, columnNameList, - estate, stopOnFailure); - - copyDest->rStartup(copyDest, 0, tupleDescriptor); - - scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL); - - oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) - { - ExecStoreTuple(tuple, slot, InvalidBuffer, false); - - copyDest->receiveSlot(slot, copyDest); - - CHECK_FOR_INTERRUPTS(); - - ResetPerTupleExprContext(estate); - - if (rowsCopied == 0) - { - ereport(NOTICE, (errmsg("Copying data from local table..."))); - } - - rowsCopied++; - - if (rowsCopied % 1000000 == 0) - { - ereport(NOTICE, (errmsg("Copied %ld rows", rowsCopied))); - } - } - - if (rowsCopied % 1000000 != 0) - { - ereport(NOTICE, (errmsg("Copied %ld rows", rowsCopied))); - } - - MemoryContextSwitchTo(oldContext); - heap_endscan(scan); - copyDest->rShutdown(copyDest); - ExecDropSingleTupleTableSlot(slot); - FreeExecutorState(estate); - heap_close(distributedRelation, NoLock); + return columnNameList; } diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index 988228c0d..8529d01d4 100644 --- a/src/test/regress/expected/multi_create_table.out +++ b/src/test/regress/expected/multi_create_table.out @@ -355,26 +355,134 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regcl DROP TABLE repmodel_test; RESET citus.replication_model; -- Test initial data loading -CREATE TABLE data_load_test (col1 int, col2 text); +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); INSERT INTO data_load_test VALUES (132, 'hello'); INSERT INTO data_load_test VALUES (243, 'world'); --- create_distributed_table copies data into the distributed table +-- table must be empty when using append- or range-partitioning +SELECT create_distributed_table('data_load_test', 'col1', 'append'); +ERROR: cannot distribute relation "data_load_test" +DETAIL: Relation "data_load_test" contains data. +HINT: Empty your table before distributing it. +SELECT create_distributed_table('data_load_test', 'col1', 'range'); +ERROR: cannot distribute relation "data_load_test" +DETAIL: Relation "data_load_test" contains data. +HINT: Empty your table before distributing it. +-- table must be empty when using master_create_distributed_table (no shards created) +SELECT master_create_distributed_table('data_load_test', 'col1', 'hash'); +ERROR: cannot distribute relation "data_load_test" +DETAIL: Relation "data_load_test" contains data. +HINT: Empty your table before distributing it. +-- create_distributed_table creates shards and copies data into the distributed table SELECT create_distributed_table('data_load_test', 'col1'); NOTICE: Copying data from local table... -NOTICE: Copied 2 rows create_distributed_table -------------------------- (1 row) -SELECT * FROM data_load_test; - col1 | col2 -------+------- - 132 | hello - 243 | world +SELECT * FROM data_load_test ORDER BY col1; + col1 | col2 | col3 +------+-------+------ + 132 | hello | 1 + 243 | world | 2 (2 rows) DROP TABLE data_load_test; +-- ensure writes in the same transaction as create_distributed_table are visible +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO data_load_test VALUES (243, 'world'); +END; +SELECT * FROM data_load_test ORDER BY col1; + col1 | col2 | col3 +------+-------+------ + 132 | hello | 1 + 243 | world | 2 +(2 rows) + +DROP TABLE data_load_test; +-- creating co-located distributed tables in the same transaction works +BEGIN; +CREATE TABLE data_load_test1 (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test1 VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test1', 'col1'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE data_load_test2 (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test2 VALUES (132, 'world'); +SELECT create_distributed_table('data_load_test2', 'col1'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +SELECT a.col2 ||' '|| b.col2 +FROM data_load_test1 a JOIN data_load_test2 b USING (col1) +WHERE col1 = 132; + ?column? +------------- + hello world +(1 row) + +DROP TABLE data_load_test1, data_load_test2; +END; +-- creating an index after loading data works +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +CREATE INDEX data_load_test_idx ON data_load_test (col2); +END; +DROP TABLE data_load_test; +-- popping in and out of existence in the same transaction works +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +DROP TABLE data_load_test; +END; +-- but dropping after a write on the distributed table is currently disallowed +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO data_load_test VALUES (243, 'world'); +DROP TABLE data_load_test; +ERROR: shard drop operations must not appear in transaction blocks containing other distributed modifications +CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" +PL/pgSQL function citus_drop_trigger() line 21 at PERFORM +END; -- Test data loading after dropping a column CREATE TABLE data_load_test (col1 int, col2 text, col3 text); INSERT INTO data_load_test VALUES (132, 'hello', 'world'); @@ -382,7 +490,6 @@ INSERT INTO data_load_test VALUES (243, 'world', 'hello'); ALTER TABLE data_load_test DROP COLUMN col2; SELECT create_distributed_table('data_load_test', 'col1'); NOTICE: Copying data from local table... -NOTICE: Copied 2 rows create_distributed_table -------------------------- diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index a7da8812c..6544b9f40 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -6,7 +6,6 @@ INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01'); -- create the reference table SELECT create_reference_table('reference_table_test'); NOTICE: Copying data from local table... -NOTICE: Copied 1 rows create_reference_table ------------------------ diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index 07b063fcf..da20b263e 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -191,15 +191,75 @@ DROP TABLE repmodel_test; RESET citus.replication_model; -- Test initial data loading -CREATE TABLE data_load_test (col1 int, col2 text); +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); INSERT INTO data_load_test VALUES (132, 'hello'); INSERT INTO data_load_test VALUES (243, 'world'); --- create_distributed_table copies data into the distributed table +-- table must be empty when using append- or range-partitioning +SELECT create_distributed_table('data_load_test', 'col1', 'append'); +SELECT create_distributed_table('data_load_test', 'col1', 'range'); + +-- table must be empty when using master_create_distributed_table (no shards created) +SELECT master_create_distributed_table('data_load_test', 'col1', 'hash'); + +-- create_distributed_table creates shards and copies data into the distributed table SELECT create_distributed_table('data_load_test', 'col1'); -SELECT * FROM data_load_test; +SELECT * FROM data_load_test ORDER BY col1; DROP TABLE data_load_test; +-- ensure writes in the same transaction as create_distributed_table are visible +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +INSERT INTO data_load_test VALUES (243, 'world'); +END; +SELECT * FROM data_load_test ORDER BY col1; +DROP TABLE data_load_test; + +-- creating co-located distributed tables in the same transaction works +BEGIN; +CREATE TABLE data_load_test1 (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test1 VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test1', 'col1'); + +CREATE TABLE data_load_test2 (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test2 VALUES (132, 'world'); +SELECT create_distributed_table('data_load_test2', 'col1'); + +SELECT a.col2 ||' '|| b.col2 +FROM data_load_test1 a JOIN data_load_test2 b USING (col1) +WHERE col1 = 132; + +DROP TABLE data_load_test1, data_load_test2; +END; + +-- creating an index after loading data works +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +CREATE INDEX data_load_test_idx ON data_load_test (col2); +END; +DROP TABLE data_load_test; + +-- popping in and out of existence in the same transaction works +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +DROP TABLE data_load_test; +END; + +-- but dropping after a write on the distributed table is currently disallowed +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +INSERT INTO data_load_test VALUES (243, 'world'); +DROP TABLE data_load_test; +END; + -- Test data loading after dropping a column CREATE TABLE data_load_test (col1 int, col2 text, col3 text); INSERT INTO data_load_test VALUES (132, 'hello', 'world'); From 047825c6ca9f6368f8c7bc8d39d4361880189716 Mon Sep 17 00:00:00 2001 From: Jason Petersen Date: Tue, 28 Feb 2017 22:48:00 -0700 Subject: [PATCH 6/6] Rename misleading allowEmpty parameter Last bit of PR feedback. --- .../commands/create_distributed_table.c | 43 ++++++++++--------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index 5968728ce..4198837b2 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -76,7 +76,7 @@ int ReplicationModel = REPLICATION_MODEL_COORDINATOR; static void CreateReferenceTable(Oid distributedRelationId); static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName, char distributionMethod, char replicationModel, - uint32 colocationId, bool allowEmpty); + uint32 colocationId, bool requireEmpty); static char LookupDistributionMethod(Oid distributionMethodOid); static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, int16 supportFunctionNumber); @@ -116,7 +116,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS) char *distributionColumnName = text_to_cstring(distributionColumnText); char distributionMethod = LookupDistributionMethod(distributionMethodOid); - bool allowEmpty = false; + bool requireEmpty = true; EnsureCoordinator(); @@ -132,7 +132,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS) ConvertToDistributedTable(distributedRelationId, distributionColumnName, distributionMethod, REPLICATION_MODEL_COORDINATOR, - INVALID_COLOCATION_ID, allowEmpty); + INVALID_COLOCATION_ID, requireEmpty); PG_RETURN_VOID(); } @@ -188,7 +188,7 @@ create_distributed_table(PG_FUNCTION_ARGS) /* if distribution method is not hash, just create partition metadata */ if (distributionMethod != DISTRIBUTE_BY_HASH) { - bool allowEmpty = false; + bool requireEmpty = true; if (ReplicationModel != REPLICATION_MODEL_COORDINATOR) { @@ -199,7 +199,7 @@ create_distributed_table(PG_FUNCTION_ARGS) ConvertToDistributedTable(relationId, distributionColumnName, distributionMethod, REPLICATION_MODEL_COORDINATOR, - INVALID_COLOCATION_ID, allowEmpty); + INVALID_COLOCATION_ID, requireEmpty); PG_RETURN_VOID(); } @@ -245,7 +245,7 @@ CreateReferenceTable(Oid relationId) List *workerNodeList = WorkerNodeList(); int replicationFactor = list_length(workerNodeList); char *distributionColumnName = NULL; - bool canLoadData = false; + bool requireEmpty = true; char relationKind = 0; EnsureCoordinator(); @@ -260,11 +260,11 @@ CreateReferenceTable(Oid relationId) errdetail("There are no active worker nodes."))); } - /* we only support data loading for regular (non-foreign) relations */ + /* relax empty table requirement for regular (non-foreign) tables */ relationKind = get_rel_relkind(relationId); if (relationKind == RELKIND_RELATION) { - canLoadData = true; + requireEmpty = false; } colocationId = CreateReferenceTableColocationId(); @@ -272,15 +272,15 @@ CreateReferenceTable(Oid relationId) /* first, convert the relation into distributed relation */ ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_NONE, REPLICATION_MODEL_2PC, colocationId, - canLoadData); + requireEmpty); /* now, create the single shard replicated to all nodes */ CreateReferenceTableShard(relationId); CreateTableMetadataOnWorkers(relationId); - /* copy over data from regular relations */ - if (canLoadData) + /* copy over data for regular relations */ + if (relationKind == RELKIND_RELATION) { CopyLocalDataIntoShards(relationId); } @@ -290,7 +290,8 @@ CreateReferenceTable(Oid relationId) /* * ConvertToDistributedTable converts the given regular PostgreSQL table into a * distributed table. First, it checks if the given table can be distributed, - * then it creates related tuple in pg_dist_partition. + * then it creates related tuple in pg_dist_partition. If requireEmpty is true, + * this function errors out when presented with a relation containing rows. * * XXX: We should perform more checks here to see if this table is fit for * partitioning. At a minimum, we should validate the following: (i) this node @@ -301,7 +302,7 @@ CreateReferenceTable(Oid relationId) static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName, char distributionMethod, char replicationModel, - uint32 colocationId, bool allowEmpty) + uint32 colocationId, bool requireEmpty) { Relation relation = NULL; TupleDesc relationDesc = NULL; @@ -351,8 +352,8 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName, "foreign tables."))); } - /* check that the relation does not contain any rows */ - if (!allowEmpty && !LocalTableEmpty(relationId)) + /* check that table is empty if that is required */ + if (requireEmpty && !LocalTableEmpty(relationId)) { ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION), errmsg("cannot distribute relation \"%s\"", @@ -944,7 +945,7 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, uint32 colocationId = INVALID_COLOCATION_ID; Oid sourceRelationId = InvalidOid; Oid distributionColumnType = InvalidOid; - bool canLoadData = false; + bool requireEmpty = true; char relationKind = 0; /* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */ @@ -988,16 +989,16 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, colocationId = TableColocationId(sourceRelationId); } - /* we only support data loading for regular (non-foreign) relations */ + /* relax empty table requirement for regular (non-foreign) tables */ relationKind = get_rel_relkind(relationId); if (relationKind == RELKIND_RELATION) { - canLoadData = true; + requireEmpty = false; } /* create distributed table metadata */ ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH, - ReplicationModel, colocationId, canLoadData); + ReplicationModel, colocationId, requireEmpty); /* create shards */ if (sourceRelationId != InvalidOid) @@ -1014,8 +1015,8 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName, CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor); } - /* copy over data from regular relations */ - if (canLoadData) + /* copy over data for regular relations */ + if (relationKind == RELKIND_RELATION) { CopyLocalDataIntoShards(relationId); }