From 56d4d375c2265fdab2b1b2400fbef999dc4d90e8 Mon Sep 17 00:00:00 2001
From: Marco Slot
Date: Tue, 28 Feb 2017 17:24:24 +0100
Subject: [PATCH] Address review feedback in create_distributed_table data loading

---
 .../commands/create_distributed_table.c      | 290 +++++++++++++-----
 .../regress/expected/multi_create_table.out  | 125 +++++++-
 .../expected/multi_reference_table.out       |   1 -
 src/test/regress/sql/multi_create_table.sql  |  66 +++-
 4 files changed, 393 insertions(+), 89 deletions(-)

diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c
index 285835340..5968728ce 100644
--- a/src/backend/distributed/commands/create_distributed_table.c
+++ b/src/backend/distributed/commands/create_distributed_table.c
@@ -73,13 +74,14 @@ int ReplicationModel = REPLICATION_MODEL_COORDINATOR;
 
 /* local function forward declarations */
 static void CreateReferenceTable(Oid relationId);
 static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
                                       char distributionMethod, char replicationModel,
-                                      uint32 colocationId);
+                                      uint32 colocationId, bool requireEmpty);
 static char LookupDistributionMethod(Oid distributionMethodOid);
 static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
                                     int16 supportFunctionNumber);
+static bool LocalTableEmpty(Oid tableId);
 static void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
                                           Var *distributionColumn, uint32 colocationId);
 static void ErrorIfNotSupportedForeignConstraint(Relation relation,
@@ -90,7 +91,8 @@ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnN
                                        char *colocateWithTableName,
                                        int shardCount, int replicationFactor);
 static Oid ColumnType(Oid relationId, char *columnName);
-static void CopyLocalData(Oid relationId);
+static void CopyLocalDataIntoShards(Oid distributedRelationId);
+static List * TupleDescColumnNameList(TupleDesc tupleDescriptor);
 
 /* exports for SQL callable functions */
 PG_FUNCTION_INFO_V1(master_create_distributed_table);
@@ -114,6 +116,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
 	char *distributionColumnName = text_to_cstring(distributionColumnText);
 	char distributionMethod = LookupDistributionMethod(distributionMethodOid);
+	bool requireEmpty = true;
 
 	EnsureCoordinator();
 
@@ -129,7 +132,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
 	ConvertToDistributedTable(distributedRelationId, distributionColumnName,
 	                          distributionMethod, REPLICATION_MODEL_COORDINATOR,
-	                          INVALID_COLOCATION_ID);
+	                          INVALID_COLOCATION_ID, requireEmpty);
 
 	PG_RETURN_VOID();
 }
@@ -151,7 +154,6 @@ create_distributed_table(PG_FUNCTION_ARGS)
 	char distributionMethod = LookupDistributionMethod(distributionMethodOid);
 	text *colocateWithTableNameText = NULL;
 	char *colocateWithTableName = NULL;
-	char relationKind = 0;
 
 	EnsureCoordinator();
 
@@ -186,6 +188,8 @@ create_distributed_table(PG_FUNCTION_ARGS)
 	/* if distribution method is not hash, just create partition metadata */
 	if (distributionMethod != DISTRIBUTE_BY_HASH)
 	{
+		bool requireEmpty = true;
+
 		if (ReplicationModel != REPLICATION_MODEL_COORDINATOR)
 		{
 			ereport(NOTICE, (errmsg("using statement-based replication"),
@@ -195,7 +199,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
 
 		ConvertToDistributedTable(relationId, distributionColumnName,
 		                          distributionMethod, REPLICATION_MODEL_COORDINATOR,
-		                          INVALID_COLOCATION_ID);
+		                          INVALID_COLOCATION_ID, requireEmpty);
 
 		PG_RETURN_VOID();
 	}
 
@@ -204,13 +208,6 @@
create_distributed_table(PG_FUNCTION_ARGS)
 	                         colocateWithTableName, ShardCount,
 	                         ShardReplicationFactor);
 
-	/* copy over data from regular relations */
-	relationKind = get_rel_relkind(relationId);
-	if (relationKind == RELKIND_RELATION)
-	{
-		CopyLocalData(relationId);
-	}
-
 	if (ShouldSyncTableMetadata(relationId))
 	{
 		CreateTableMetadataOnWorkers(relationId);
@@ -229,17 +226,9 @@ Datum
 create_reference_table(PG_FUNCTION_ARGS)
 {
 	Oid relationId = PG_GETARG_OID(0);
-	char relationKind = 0;
 
 	CreateReferenceTable(relationId);
 
-	/* copy over data from regular relations */
-	relationKind = get_rel_relkind(relationId);
-	if (relationKind == RELKIND_RELATION)
-	{
-		CopyLocalData(relationId);
-	}
-
 	PG_RETURN_VOID();
 }
 
@@ -256,6 +245,8 @@ CreateReferenceTable(Oid relationId)
 	List *workerNodeList = WorkerNodeList();
 	int replicationFactor = list_length(workerNodeList);
 	char *distributionColumnName = NULL;
+	bool canLoadData = false;
+	char relationKind = 0;
 
 	EnsureCoordinator();
 
@@ -269,16 +260,30 @@ CreateReferenceTable(Oid relationId)
 				errdetail("There are no active worker nodes.")));
 	}
 
+	/* we only support data loading for regular (non-foreign) relations */
+	relationKind = get_rel_relkind(relationId);
+	if (relationKind == RELKIND_RELATION)
+	{
+		canLoadData = true;
+	}
+
 	colocationId = CreateReferenceTableColocationId();
 
 	/* first, convert the relation into distributed relation */
 	ConvertToDistributedTable(relationId, distributionColumnName,
-	                          DISTRIBUTE_BY_NONE, REPLICATION_MODEL_2PC, colocationId);
+	                          DISTRIBUTE_BY_NONE, REPLICATION_MODEL_2PC, colocationId,
+	                          !canLoadData);
 
 	/* now, create the single shard replicated to all nodes */
 	CreateReferenceTableShard(relationId);
 
 	CreateTableMetadataOnWorkers(relationId);
+
+	/* copy over data from regular relations */
+	if (canLoadData)
+	{
+		CopyLocalDataIntoShards(relationId);
+	}
 }
 
 
@@ -296,7 +301,7 @@ CreateReferenceTable(Oid relationId)
 static void
 ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
                           char distributionMethod, char replicationModel,
-                          uint32 colocationId)
+                          uint32 colocationId, bool requireEmpty)
 {
 	Relation relation = NULL;
 	TupleDesc relationDesc = NULL;
@@ -346,6 +351,17 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
 				"foreign tables.")));
 	}
 
+	/* check that the relation does not contain any rows */
+	if (requireEmpty && !LocalTableEmpty(relationId))
+	{
+		ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
+		                errmsg("cannot distribute relation \"%s\"",
+		                       relationName),
+		                errdetail("Relation \"%s\" contains data.",
+		                          relationName),
+		                errhint("Empty your table before distributing it.")));
+	}
+
 	/*
 	 * Distribution column returns NULL for reference tables,
 	 * but it is not used below for reference tables.
@@ -829,6 +845,62 @@ SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
 }
 
 
+/*
+ * LocalTableEmpty checks whether the given local table contains any rows,
+ * returning false if it contains any data. This function should only be
+ * called for local tables, never for distributed tables.
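+ *
+ * The check runs a single statement over SPI. SELECT_EXIST_QUERY is not
+ * defined within this diff; it is assumed to expand to a query of the form
+ * "SELECT EXISTS (SELECT TRUE FROM <table>)", which can stop at the first
+ * row it finds instead of scanning the whole table the way a COUNT(*) would.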
+ */
+static bool
+LocalTableEmpty(Oid tableId)
+{
+	Oid schemaId = get_rel_namespace(tableId);
+	char *schemaName = get_namespace_name(schemaId);
+	char *tableName = get_rel_name(tableId);
+	char *tableQualifiedName = quote_qualified_identifier(schemaName, tableName);
+
+	int spiConnectionResult = 0;
+	int spiQueryResult = 0;
+	StringInfo selectExistQueryString = makeStringInfo();
+
+	HeapTuple tuple = NULL;
+	Datum hasDataDatum = 0;
+	bool localTableEmpty = false;
+	bool columnNull = false;
+	bool readOnly = true;
+
+	int rowId = 0;
+	int attributeId = 1;
+
+	AssertArg(!IsDistributedTable(tableId));
+
+	spiConnectionResult = SPI_connect();
+	if (spiConnectionResult != SPI_OK_CONNECT)
+	{
+		ereport(ERROR, (errmsg("could not connect to SPI manager")));
+	}
+
+	appendStringInfo(selectExistQueryString, SELECT_EXIST_QUERY, tableQualifiedName);
+
+	spiQueryResult = SPI_execute(selectExistQueryString->data, readOnly, 0);
+	if (spiQueryResult != SPI_OK_SELECT)
+	{
+		ereport(ERROR, (errmsg("execution was not successful \"%s\"",
+		                       selectExistQueryString->data)));
+	}
+
+	/* we expect that the SELECT EXISTS query returns a single value in a single row */
+	Assert(SPI_processed == 1);
+
+	tuple = SPI_tuptable->vals[rowId];
+	hasDataDatum = SPI_getbinval(tuple, SPI_tuptable->tupdesc, attributeId, &columnNull);
+	localTableEmpty = !DatumGetBool(hasDataDatum);
+
+	SPI_finish();
+
+	return localTableEmpty;
+}
+
+
 /*
  * CreateTruncateTrigger creates a truncate trigger on table identified by relationId
  * and assigns citus_truncate_trigger() as handler.
@@ -872,6 +944,8 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
 	uint32 colocationId = INVALID_COLOCATION_ID;
 	Oid sourceRelationId = InvalidOid;
 	Oid distributionColumnType = InvalidOid;
+	bool canLoadData = false;
+	char relationKind = 0;
 
 	/* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */
 	distributedRelation = relation_open(relationId, AccessShareLock);
@@ -914,9 +988,16 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
 		colocationId = TableColocationId(sourceRelationId);
 	}
 
+	/* we only support data loading for regular (non-foreign) relations */
+	relationKind = get_rel_relkind(relationId);
+	if (relationKind == RELKIND_RELATION)
+	{
+		canLoadData = true;
+	}
+
 	/* create distributed table metadata */
 	ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH,
-	                          ReplicationModel, colocationId);
+	                          ReplicationModel, colocationId, !canLoadData);
 
 	/* create shards */
 	if (sourceRelationId != InvalidOid)
@@ -933,6 +1014,12 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
 		CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor);
 	}
 
+	/* copy over data from regular relations */
+	if (canLoadData)
+	{
+		CopyLocalDataIntoShards(relationId);
+	}
+
 	heap_close(pgDistColocation, NoLock);
 	relation_close(distributedRelation, NoLock);
}
@@ -981,16 +1068,37 @@ EnsureReplicationSettings(Oid relationId, char replicationModel)
 
 
 /*
- * CopyLocalData copies local data into the shards.
+ * CopyLocalDataIntoShards copies data from the local table, which is hidden
+ * after converting it to a distributed table, into the shards of the distributed
+ * table.
+ *
+ * This function uses CitusCopyDestReceiver to invoke the distributed COPY logic.
+ * We cannot use a regular COPY here since that cannot read from a table. Instead
+ * we read from the table and pass each tuple to the CitusCopyDestReceiver which
+ * opens a connection and starts a COPY for each shard placement that will have
+ * data.
+ *
+ * We could call the planner and executor here and send the output to the
+ * DestReceiver, but we are in a tricky spot since Citus is already
+ * intercepting queries on this table in the planner and executor hooks and we
+ * want to read from the local table. To keep it simple, we perform a heap scan
+ * directly on the table.
+ *
+ * Any writes on the table that are started during this operation will be handled
+ * as distributed queries once the current transaction commits. SELECTs will
+ * continue to read from the local table until the current transaction commits,
+ * after which new SELECTs will be handled as distributed queries.
+ *
+ * After copying local data into the distributed table, the local data remains
+ * in place and should be truncated at a later time.
+ */
 static void
-CopyLocalData(Oid relationId)
+CopyLocalDataIntoShards(Oid distributedRelationId)
 {
 	DestReceiver *copyDest = NULL;
 	List *columnNameList = NIL;
 	Relation distributedRelation = NULL;
 	TupleDesc tupleDescriptor = NULL;
-	int columnIndex = 0;
 	bool stopOnFailure = true;
 
 	EState *estate = NULL;
@@ -1001,9 +1109,86 @@ CopyLocalData(Oid relationId)
 	TupleTableSlot *slot = NULL;
 	uint64 rowsCopied = 0;
 
-	distributedRelation = heap_open(relationId, ExclusiveLock);
+	/* take an ExclusiveLock to block all operations except SELECT */
+	distributedRelation = heap_open(distributedRelationId, ExclusiveLock);
+
+	/* get the table columns */
 	tupleDescriptor = RelationGetDescr(distributedRelation);
 	slot = MakeSingleTupleTableSlot(tupleDescriptor);
+	columnNameList = TupleDescColumnNameList(tupleDescriptor);
+
+	/* initialize per-tuple memory context */
+	estate = CreateExecutorState();
+	econtext = GetPerTupleExprContext(estate);
+	econtext->ecxt_scantuple = slot;
+
+	copyDest =
+		(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
+		                                             columnNameList, estate,
+		                                             stopOnFailure);
+
+	/* initialize state for writing to shards, we'll open connections on demand */
+	copyDest->rStartup(copyDest, 0, tupleDescriptor);
+
+	/* begin reading from local table */
+	scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL);
+
+	oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
+
+	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
+	{
+		/* materialize tuple and send it to a shard */
+		ExecStoreTuple(tuple, slot, InvalidBuffer, false);
+		copyDest->receiveSlot(slot, copyDest);
+
+		/* clear tuple memory */
+		ResetPerTupleExprContext(estate);
+
+		/* make sure we roll back on cancellation */
+		CHECK_FOR_INTERRUPTS();
+
+		if (rowsCopied == 0)
+		{
+			ereport(NOTICE, (errmsg("Copying data from local table...")));
+		}
+
+		rowsCopied++;
+
+		if (rowsCopied % 1000000 == 0)
+		{
+			ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied)));
+		}
+	}
+
+	if (rowsCopied % 1000000 != 0)
+	{
+		ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied)));
+	}
+
+	MemoryContextSwitchTo(oldContext);
+
+	/* finish reading from the local table */
+	heap_endscan(scan);
+
+	/* finish writing into the shards */
+	copyDest->rShutdown(copyDest);
+
+	/* free memory and close the relation */
+	ExecDropSingleTupleTableSlot(slot);
+	FreeExecutorState(estate);
+	heap_close(distributedRelation, NoLock);
+}
+
+
+/*
+ * TupleDescColumnNameList returns a list of column names for the given tuple
+ * descriptor as plain strings.
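+ *
+ * Note: the loop below is split across the next hunk boundary; its full body
+ * is expected to skip dropped columns (attisdropped), which the "data loading
+ * after dropping a column" regression test in this patch relies on.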
+ */ +static List * +TupleDescColumnNameList(TupleDesc tupleDescriptor) +{ + List *columnNameList = NIL; + int columnIndex = 0; for (columnIndex = 0; columnIndex < tupleDescriptor->natts; columnIndex++) { @@ -1018,52 +1203,5 @@ CopyLocalData(Oid relationId) columnNameList = lappend(columnNameList, columnName); } - estate = CreateExecutorState(); - econtext = GetPerTupleExprContext(estate); - econtext->ecxt_scantuple = slot; - - copyDest = - (DestReceiver *) CreateCitusCopyDestReceiver(relationId, columnNameList, - estate, stopOnFailure); - - copyDest->rStartup(copyDest, 0, tupleDescriptor); - - scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL); - - oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate)); - - while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL) - { - ExecStoreTuple(tuple, slot, InvalidBuffer, false); - - copyDest->receiveSlot(slot, copyDest); - - CHECK_FOR_INTERRUPTS(); - - ResetPerTupleExprContext(estate); - - if (rowsCopied == 0) - { - ereport(NOTICE, (errmsg("Copying data from local table..."))); - } - - rowsCopied++; - - if (rowsCopied % 1000000 == 0) - { - ereport(NOTICE, (errmsg("Copied %ld rows", rowsCopied))); - } - } - - if (rowsCopied % 1000000 != 0) - { - ereport(NOTICE, (errmsg("Copied %ld rows", rowsCopied))); - } - - MemoryContextSwitchTo(oldContext); - heap_endscan(scan); - copyDest->rShutdown(copyDest); - ExecDropSingleTupleTableSlot(slot); - FreeExecutorState(estate); - heap_close(distributedRelation, NoLock); + return columnNameList; } diff --git a/src/test/regress/expected/multi_create_table.out b/src/test/regress/expected/multi_create_table.out index 988228c0d..8529d01d4 100644 --- a/src/test/regress/expected/multi_create_table.out +++ b/src/test/regress/expected/multi_create_table.out @@ -355,26 +355,134 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regcl DROP TABLE repmodel_test; RESET citus.replication_model; -- Test initial data loading -CREATE TABLE data_load_test (col1 int, col2 text); +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); INSERT INTO data_load_test VALUES (132, 'hello'); INSERT INTO data_load_test VALUES (243, 'world'); --- create_distributed_table copies data into the distributed table +-- table must be empty when using append- or range-partitioning +SELECT create_distributed_table('data_load_test', 'col1', 'append'); +ERROR: cannot distribute relation "data_load_test" +DETAIL: Relation "data_load_test" contains data. +HINT: Empty your table before distributing it. +SELECT create_distributed_table('data_load_test', 'col1', 'range'); +ERROR: cannot distribute relation "data_load_test" +DETAIL: Relation "data_load_test" contains data. +HINT: Empty your table before distributing it. +-- table must be empty when using master_create_distributed_table (no shards created) +SELECT master_create_distributed_table('data_load_test', 'col1', 'hash'); +ERROR: cannot distribute relation "data_load_test" +DETAIL: Relation "data_load_test" contains data. +HINT: Empty your table before distributing it. +-- create_distributed_table creates shards and copies data into the distributed table SELECT create_distributed_table('data_load_test', 'col1'); NOTICE: Copying data from local table... 
-NOTICE: Copied 2 rows create_distributed_table -------------------------- (1 row) -SELECT * FROM data_load_test; - col1 | col2 -------+------- - 132 | hello - 243 | world +SELECT * FROM data_load_test ORDER BY col1; + col1 | col2 | col3 +------+-------+------ + 132 | hello | 1 + 243 | world | 2 (2 rows) DROP TABLE data_load_test; +-- ensure writes in the same transaction as create_distributed_table are visible +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO data_load_test VALUES (243, 'world'); +END; +SELECT * FROM data_load_test ORDER BY col1; + col1 | col2 | col3 +------+-------+------ + 132 | hello | 1 + 243 | world | 2 +(2 rows) + +DROP TABLE data_load_test; +-- creating co-located distributed tables in the same transaction works +BEGIN; +CREATE TABLE data_load_test1 (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test1 VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test1', 'col1'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +CREATE TABLE data_load_test2 (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test2 VALUES (132, 'world'); +SELECT create_distributed_table('data_load_test2', 'col1'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +SELECT a.col2 ||' '|| b.col2 +FROM data_load_test1 a JOIN data_load_test2 b USING (col1) +WHERE col1 = 132; + ?column? +------------- + hello world +(1 row) + +DROP TABLE data_load_test1, data_load_test2; +END; +-- creating an index after loading data works +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +CREATE INDEX data_load_test_idx ON data_load_test (col2); +END; +DROP TABLE data_load_test; +-- popping in and out of existence in the same transaction works +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +NOTICE: Copying data from local table... + create_distributed_table +-------------------------- + +(1 row) + +DROP TABLE data_load_test; +END; +-- but dropping after a write on the distributed table is currently disallowed +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +NOTICE: Copying data from local table... 
+ create_distributed_table +-------------------------- + +(1 row) + +INSERT INTO data_load_test VALUES (243, 'world'); +DROP TABLE data_load_test; +ERROR: shard drop operations must not appear in transaction blocks containing other distributed modifications +CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)" +PL/pgSQL function citus_drop_trigger() line 21 at PERFORM +END; -- Test data loading after dropping a column CREATE TABLE data_load_test (col1 int, col2 text, col3 text); INSERT INTO data_load_test VALUES (132, 'hello', 'world'); @@ -382,7 +490,6 @@ INSERT INTO data_load_test VALUES (243, 'world', 'hello'); ALTER TABLE data_load_test DROP COLUMN col2; SELECT create_distributed_table('data_load_test', 'col1'); NOTICE: Copying data from local table... -NOTICE: Copied 2 rows create_distributed_table -------------------------- diff --git a/src/test/regress/expected/multi_reference_table.out b/src/test/regress/expected/multi_reference_table.out index a7da8812c..6544b9f40 100644 --- a/src/test/regress/expected/multi_reference_table.out +++ b/src/test/regress/expected/multi_reference_table.out @@ -6,7 +6,6 @@ INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01'); -- create the reference table SELECT create_reference_table('reference_table_test'); NOTICE: Copying data from local table... -NOTICE: Copied 1 rows create_reference_table ------------------------ diff --git a/src/test/regress/sql/multi_create_table.sql b/src/test/regress/sql/multi_create_table.sql index 07b063fcf..da20b263e 100644 --- a/src/test/regress/sql/multi_create_table.sql +++ b/src/test/regress/sql/multi_create_table.sql @@ -191,15 +191,75 @@ DROP TABLE repmodel_test; RESET citus.replication_model; -- Test initial data loading -CREATE TABLE data_load_test (col1 int, col2 text); +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); INSERT INTO data_load_test VALUES (132, 'hello'); INSERT INTO data_load_test VALUES (243, 'world'); --- create_distributed_table copies data into the distributed table +-- table must be empty when using append- or range-partitioning +SELECT create_distributed_table('data_load_test', 'col1', 'append'); +SELECT create_distributed_table('data_load_test', 'col1', 'range'); + +-- table must be empty when using master_create_distributed_table (no shards created) +SELECT master_create_distributed_table('data_load_test', 'col1', 'hash'); + +-- create_distributed_table creates shards and copies data into the distributed table SELECT create_distributed_table('data_load_test', 'col1'); -SELECT * FROM data_load_test; +SELECT * FROM data_load_test ORDER BY col1; DROP TABLE data_load_test; +-- ensure writes in the same transaction as create_distributed_table are visible +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +INSERT INTO data_load_test VALUES (243, 'world'); +END; +SELECT * FROM data_load_test ORDER BY col1; +DROP TABLE data_load_test; + +-- creating co-located distributed tables in the same transaction works +BEGIN; +CREATE TABLE data_load_test1 (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test1 VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test1', 'col1'); + +CREATE TABLE data_load_test2 (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test2 VALUES (132, 'world'); +SELECT create_distributed_table('data_load_test2', 'col1'); + +SELECT a.col2 ||' 
'|| b.col2 +FROM data_load_test1 a JOIN data_load_test2 b USING (col1) +WHERE col1 = 132; + +DROP TABLE data_load_test1, data_load_test2; +END; + +-- creating an index after loading data works +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +CREATE INDEX data_load_test_idx ON data_load_test (col2); +END; +DROP TABLE data_load_test; + +-- popping in and out of existence in the same transaction works +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +DROP TABLE data_load_test; +END; + +-- but dropping after a write on the distributed table is currently disallowed +BEGIN; +CREATE TABLE data_load_test (col1 int, col2 text, col3 serial); +INSERT INTO data_load_test VALUES (132, 'hello'); +SELECT create_distributed_table('data_load_test', 'col1'); +INSERT INTO data_load_test VALUES (243, 'world'); +DROP TABLE data_load_test; +END; + -- Test data loading after dropping a column CREATE TABLE data_load_test (col1 int, col2 text, col3 text); INSERT INTO data_load_test VALUES (132, 'hello', 'world');
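
Note (not part of the patch): the scan-and-forward loop inside
CopyLocalDataIntoShards can be distilled into the self-contained sketch
below. It assumes PostgreSQL 9.6-era internals (four-argument
heap_beginscan(), pre-PG 12 ExecStoreTuple()) and works against any
DestReceiver; FeedHeapToDestReceiver and rowsSent are illustrative names,
not identifiers that appear in this patch.

#include "postgres.h"

#include "access/heapam.h"
#include "executor/executor.h"
#include "miscadmin.h"
#include "tcop/dest.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"

/* read every tuple visible to the active snapshot and hand it to dest */
static uint64
FeedHeapToDestReceiver(Relation relation, DestReceiver *dest)
{
	TupleDesc tupleDescriptor = RelationGetDescr(relation);
	TupleTableSlot *slot = MakeSingleTupleTableSlot(tupleDescriptor);
	EState *estate = CreateExecutorState();
	MemoryContext oldContext = NULL;
	HeapScanDesc scan = NULL;
	HeapTuple tuple = NULL;
	uint64 rowsSent = 0;

	/* let the receiver set up its own state (connections, buffers, ...) */
	dest->rStartup(dest, 0, tupleDescriptor);

	scan = heap_beginscan(relation, GetActiveSnapshot(), 0, NULL);

	/* per-tuple allocations land here and are freed after every tuple */
	oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));

	while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
	{
		/* store the tuple in the slot without transferring ownership */
		ExecStoreTuple(tuple, slot, InvalidBuffer, false);
		dest->receiveSlot(slot, dest);

		ResetPerTupleExprContext(estate);
		CHECK_FOR_INTERRUPTS();
		rowsSent++;
	}

	MemoryContextSwitchTo(oldContext);
	heap_endscan(scan);

	dest->rShutdown(dest);
	ExecDropSingleTupleTableSlot(slot);
	FreeExecutorState(estate);

	return rowsSent;
}

In the patch, CopyLocalDataIntoShards takes an ExclusiveLock on the relation
before running this loop, which blocks concurrent writes for the duration of
the transaction while still letting plain SELECTs (AccessShareLock) read the
local table.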