diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index 43dd3a7ee..285835340 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -9,6 +9,7 @@
  */
 
 #include "postgres.h"
+#include "miscadmin.h"
 
 #include "access/genam.h"
 #include "access/hash.h"
@@ -39,11 +40,14 @@
 #include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/metadata_sync.h"
+#include "distributed/multi_copy.h"
 #include "distributed/multi_logical_planner.h"
 #include "distributed/pg_dist_colocation.h"
 #include "distributed/pg_dist_partition.h"
 #include "distributed/reference_table_utils.h"
+#include "distributed/worker_protocol.h"
 #include "distributed/worker_transaction.h"
+#include "executor/executor.h"
 #include "executor/spi.h"
 #include "nodes/execnodes.h"
 #include "nodes/nodeFuncs.h"
@@ -52,10 +56,14 @@
 #include "parser/parse_node.h"
 #include "parser/parse_relation.h"
 #include "parser/parser.h"
+#include "tcop/pquery.h"
+#include "tcop/tcopprot.h"
 #include "utils/builtins.h"
 #include "utils/fmgroids.h"
 #include "utils/lsyscache.h"
+#include "utils/memutils.h"
 #include "utils/rel.h"
+#include "utils/snapmgr.h"
 #include "utils/syscache.h"
 #include "utils/inval.h"
@@ -72,7 +80,6 @@ static void ConvertToDistributedTable(Oid relationId, char *distributionColumnNa
 static char LookupDistributionMethod(Oid distributionMethodOid);
 static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
                                     int16 supportFunctionNumber);
-static bool LocalTableEmpty(Oid tableId);
 static void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
                                           Var *distributionColumn, uint32 colocationId);
 static void ErrorIfNotSupportedForeignConstraint(Relation relation,
@@ -83,6 +90,7 @@ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnN
                                        char *colocateWithTableName,
                                        int shardCount, int replicationFactor);
 static Oid ColumnType(Oid relationId, char *columnName);
+static void CopyLocalData(Oid relationId);
 
 /* exports for SQL callable functions */
 PG_FUNCTION_INFO_V1(master_create_distributed_table);
@@ -143,6 +151,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
     char distributionMethod = LookupDistributionMethod(distributionMethodOid);
     text *colocateWithTableNameText = NULL;
     char *colocateWithTableName = NULL;
+    char relationKind = 0;
 
     EnsureCoordinator();
@@ -195,6 +204,13 @@ create_distributed_table(PG_FUNCTION_ARGS)
                               colocateWithTableName, ShardCount,
                               ShardReplicationFactor);
 
+    /* copy over data from regular relations */
+    relationKind = get_rel_relkind(relationId);
+    if (relationKind == RELKIND_RELATION)
+    {
+        CopyLocalData(relationId);
+    }
+
     if (ShouldSyncTableMetadata(relationId))
     {
         CreateTableMetadataOnWorkers(relationId);
@@ -213,9 +229,17 @@
 Datum
 create_reference_table(PG_FUNCTION_ARGS)
 {
     Oid relationId = PG_GETARG_OID(0);
+    char relationKind = 0;
 
     CreateReferenceTable(relationId);
 
+    /* copy over data from regular relations */
+    relationKind = get_rel_relkind(relationId);
+    if (relationKind == RELKIND_RELATION)
+    {
+        CopyLocalData(relationId);
+    }
+
     PG_RETURN_VOID();
 }
@@ -322,17 +346,6 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
                            "foreign tables.")));
     }
 
-    /* check that the relation does not contain any rows */
-    if (!LocalTableEmpty(relationId))
-    {
-        ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
-                        errmsg("cannot distribute relation \"%s\"",
\"%s\"", - relationName), - errdetail("Relation \"%s\" contains data.", - relationName), - errhint("Empty your table before distributing it."))); - } - /* * Distribution column returns NULL for reference tables, * but it is not used below for reference tables. @@ -816,62 +829,6 @@ SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, } -/* - * LocalTableEmpty function checks whether given local table contains any row and - * returns false if there is any data. This function is only for local tables and - * should not be called for distributed tables. - */ -static bool -LocalTableEmpty(Oid tableId) -{ - Oid schemaId = get_rel_namespace(tableId); - char *schemaName = get_namespace_name(schemaId); - char *tableName = get_rel_name(tableId); - char *tableQualifiedName = quote_qualified_identifier(schemaName, tableName); - - int spiConnectionResult = 0; - int spiQueryResult = 0; - StringInfo selectExistQueryString = makeStringInfo(); - - HeapTuple tuple = NULL; - Datum hasDataDatum = 0; - bool localTableEmpty = false; - bool columnNull = false; - bool readOnly = true; - - int rowId = 0; - int attributeId = 1; - - AssertArg(!IsDistributedTable(tableId)); - - spiConnectionResult = SPI_connect(); - if (spiConnectionResult != SPI_OK_CONNECT) - { - ereport(ERROR, (errmsg("could not connect to SPI manager"))); - } - - appendStringInfo(selectExistQueryString, SELECT_EXIST_QUERY, tableQualifiedName); - - spiQueryResult = SPI_execute(selectExistQueryString->data, readOnly, 0); - if (spiQueryResult != SPI_OK_SELECT) - { - ereport(ERROR, (errmsg("execution was not successful \"%s\"", - selectExistQueryString->data))); - } - - /* we expect that SELECT EXISTS query will return single value in a single row */ - Assert(SPI_processed == 1); - - tuple = SPI_tuptable->vals[rowId]; - hasDataDatum = SPI_getbinval(tuple, SPI_tuptable->tupdesc, attributeId, &columnNull); - localTableEmpty = !DatumGetBool(hasDataDatum); - - SPI_finish(); - - return localTableEmpty; -} - - /* * CreateTruncateTrigger creates a truncate trigger on table identified by relationId * and assigns citus_truncate_trigger() as handler. @@ -1021,3 +978,92 @@ EnsureReplicationSettings(Oid relationId, char replicationModel) "factor\" to one%s.", extraHint))); } } + + +/* + * CopyLocalData copies local data into the shards. 
+ */
+static void
+CopyLocalData(Oid relationId)
+{
+    DestReceiver *copyDest = NULL;
+    List *columnNameList = NIL;
+    Relation distributedRelation = NULL;
+    TupleDesc tupleDescriptor = NULL;
+    int columnIndex = 0;
+    bool stopOnFailure = true;
+
+    EState *estate = NULL;
+    HeapScanDesc scan = NULL;
+    HeapTuple tuple = NULL;
+    ExprContext *econtext = NULL;
+    MemoryContext oldContext = NULL;
+    TupleTableSlot *slot = NULL;
+    uint64 rowsCopied = 0;
+
+    distributedRelation = heap_open(relationId, ExclusiveLock);
+    tupleDescriptor = RelationGetDescr(distributedRelation);
+    slot = MakeSingleTupleTableSlot(tupleDescriptor);
+
+    for (columnIndex = 0; columnIndex < tupleDescriptor->natts; columnIndex++)
+    {
+        Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex];
+        char *columnName = NameStr(currentColumn->attname);
+
+        if (currentColumn->attisdropped)
+        {
+            continue;
+        }
+
+        columnNameList = lappend(columnNameList, columnName);
+    }
+
+    estate = CreateExecutorState();
+    econtext = GetPerTupleExprContext(estate);
+    econtext->ecxt_scantuple = slot;
+
+    copyDest =
+        (DestReceiver *) CreateCitusCopyDestReceiver(relationId, columnNameList,
+                                                     estate, stopOnFailure);
+
+    copyDest->rStartup(copyDest, 0, tupleDescriptor);
+
+    scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL);
+
+    oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+    while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+    {
+        ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+
+        copyDest->receiveSlot(slot, copyDest);
+
+        CHECK_FOR_INTERRUPTS();
+
+        ResetPerTupleExprContext(estate);
+
+        if (rowsCopied == 0)
+        {
+            ereport(NOTICE, (errmsg("Copying data from local table...")));
+        }
+
+        rowsCopied++;
+
+        if (rowsCopied % 1000000 == 0)
+        {
+            ereport(NOTICE, (errmsg("Copied %ld rows", rowsCopied)));
+        }
+    }
+
+    if (rowsCopied % 1000000 != 0)
+    {
+        ereport(NOTICE, (errmsg("Copied %ld rows", rowsCopied)));
+    }
+
+    MemoryContextSwitchTo(oldContext);
+    heap_endscan(scan);
+    copyDest->rShutdown(copyDest);
+    ExecDropSingleTupleTableSlot(slot);
+    FreeExecutorState(estate);
+    heap_close(distributedRelation, NoLock);
+}
diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c
index d9cf8eade..dfdd7c1da 100644
--- a/src/backend/distributed/commands/multi_copy.c
+++ b/src/backend/distributed/commands/multi_copy.c
@@ -1819,7 +1819,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
     if (partitionMethod == DISTRIBUTE_BY_NONE)
     {
         /* we don't support copy to reference tables from workers */
-        EnsureSchemaNode();
+        EnsureCoordinator();
     }
     else
     {
diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out
index b23c2d26b..988228c0d 100644
--- a/src/test/regress/expected/multi_create_table.out
+++ b/src/test/regress/expected/multi_create_table.out
@@ -75,12 +75,6 @@ CREATE TABLE nation (
     n_name char(25) not null,
     n_regionkey integer not null,
     n_comment varchar(152));
-\COPY nation FROM STDIN WITH CSV
-SELECT master_create_distributed_table('nation', 'n_nationkey', 'append');
-ERROR: cannot distribute relation "nation"
-DETAIL: Relation "nation" contains data.
-HINT: Empty your table before distributing it.
-TRUNCATE nation;
 SELECT create_reference_table('nation');
  create_reference_table 
 ------------------------
 
 (1 row)
 
@@ -360,6 +354,48 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regcl
 DROP TABLE repmodel_test;
 RESET citus.replication_model;
+-- Test initial data loading
+CREATE TABLE data_load_test (col1 int, col2 text);
+INSERT INTO data_load_test VALUES (132, 'hello');
+INSERT INTO data_load_test VALUES (243, 'world');
+-- create_distributed_table copies data into the distributed table
+SELECT create_distributed_table('data_load_test', 'col1');
+NOTICE: Copying data from local table...
+NOTICE: Copied 2 rows
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+SELECT * FROM data_load_test;
+ col1 | col2  
+------+-------
+  132 | hello
+  243 | world
+(2 rows)
+
+DROP TABLE data_load_test;
+-- Test data loading after dropping a column
+CREATE TABLE data_load_test (col1 int, col2 text, col3 text);
+INSERT INTO data_load_test VALUES (132, 'hello', 'world');
+INSERT INTO data_load_test VALUES (243, 'world', 'hello');
+ALTER TABLE data_load_test DROP COLUMN col2;
+SELECT create_distributed_table('data_load_test', 'col1');
+NOTICE: Copying data from local table...
+NOTICE: Copied 2 rows
+ create_distributed_table 
+--------------------------
+ 
+(1 row)
+
+SELECT * FROM data_load_test;
+ col1 | col3  
+------+-------
+  132 | world
+  243 | hello
+(2 rows)
+
+DROP TABLE data_load_test;
 SET citus.shard_replication_factor TO default;
 SET citus.shard_count to 4;
 CREATE TABLE lineitem_hash_part (like lineitem);
diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out
index 7ff726ce1..a7da8812c 100644
--- a/src/test/regress/expected/multi_reference_table.out
+++ b/src/test/regress/expected/multi_reference_table.out
@@ -2,15 +2,11 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1250000;
 ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1250000;
 CREATE TABLE reference_table_test (value_1 int, value_2 float, value_3 text, value_4 timestamp);
 -- insert some data, and make sure that cannot be create_distributed_table
-INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-05');
--- should error out given that there exists data
-SELECT create_reference_table('reference_table_test');
-ERROR: cannot distribute relation "reference_table_test"
-DETAIL: Relation "reference_table_test" contains data.
-HINT: Empty your table before distributing it.
-TRUNCATE reference_table_test;
--- now should be able to create the reference table
+INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
+-- create the reference table
 SELECT create_reference_table('reference_table_test');
+NOTICE: Copying data from local table...
+NOTICE: Copied 1 rows
  create_reference_table 
 ------------------------
 
 (1 row)
 
@@ -52,8 +48,14 @@ WHERE
  1250000 | 1 | localhost | 57638
 (2 rows)
 
+-- check whether data was copied into distributed table
+SELECT * FROM reference_table_test;
+ value_1 | value_2 | value_3 |         value_4          
+---------+---------+---------+--------------------------
+       1 |       1 | 1       | Thu Dec 01 00:00:00 2016
+(1 row)
+
 -- now, execute some modification queries
-INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
 INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
 INSERT INTO reference_table_test VALUES (3, 3.0, '3', '2016-12-03');
 INSERT INTO reference_table_test VALUES (4, 4.0, '4', '2016-12-04');
diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql
index 2aa46b9a4..07b063fcf 100644
--- a/src/test/regress/sql/multi_create_table.sql
+++ b/src/test/regress/sql/multi_create_table.sql
@@ -60,18 +60,6 @@ CREATE TABLE nation (
     n_regionkey integer not null,
     n_comment varchar(152));
 
-\COPY nation FROM STDIN WITH CSV
-1,'name',1,'comment_1'
-2,'name',2,'comment_2'
-3,'name',3,'comment_3'
-4,'name',4,'comment_4'
-5,'name',5,'comment_5'
-\.
-
-SELECT master_create_distributed_table('nation', 'n_nationkey', 'append');
-
-TRUNCATE nation;
-
 SELECT create_reference_table('nation');
 
 CREATE TABLE part (
@@ -201,6 +189,26 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regcl
 DROP TABLE repmodel_test;
 
 RESET citus.replication_model;
+
+-- Test initial data loading
+CREATE TABLE data_load_test (col1 int, col2 text);
+INSERT INTO data_load_test VALUES (132, 'hello');
+INSERT INTO data_load_test VALUES (243, 'world');
+
+-- create_distributed_table copies data into the distributed table
+SELECT create_distributed_table('data_load_test', 'col1');
+SELECT * FROM data_load_test;
+DROP TABLE data_load_test;
+
+-- Test data loading after dropping a column
+CREATE TABLE data_load_test (col1 int, col2 text, col3 text);
+INSERT INTO data_load_test VALUES (132, 'hello', 'world');
+INSERT INTO data_load_test VALUES (243, 'world', 'hello');
+ALTER TABLE data_load_test DROP COLUMN col2;
+SELECT create_distributed_table('data_load_test', 'col1');
+SELECT * FROM data_load_test;
+DROP TABLE data_load_test;
+
 SET citus.shard_replication_factor TO default;
 SET citus.shard_count to 4;
diff --git a/src/test/regress/sql/multi_reference_table.sql b/src/test/regress/sql/multi_reference_table.sql
index ddae075a7..96af1c710 100644
--- a/src/test/regress/sql/multi_reference_table.sql
+++ b/src/test/regress/sql/multi_reference_table.sql
@@ -4,14 +4,9 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1250000;
 CREATE TABLE reference_table_test (value_1 int, value_2 float, value_3 text, value_4 timestamp);
 
 -- insert some data, and make sure that cannot be create_distributed_table
-INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-05');
+INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
 
--- should error out given that there exists data
-SELECT create_reference_table('reference_table_test');
-
-TRUNCATE reference_table_test;
-
--- now should be able to create the reference table
+-- create the reference table
 SELECT create_reference_table('reference_table_test');
 
 -- see that partkey is NULL
@@ -36,8 +31,10 @@ FROM
     pg_dist_shard_placement
 WHERE
     shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'reference_table_test'::regclass);
 
+-- check whether data was copied into distributed table
+SELECT * FROM reference_table_test;
+
 -- now, execute some modification queries
-INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
 INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
 INSERT INTO reference_table_test VALUES (3, 3.0, '3', '2016-12-03');
 INSERT INTO reference_table_test VALUES (4, 4.0, '4', '2016-12-04');