Address review feedback in create_distributed_table data loading

pull/1117/head
Marco Slot 2017-02-28 17:24:24 +01:00
parent db98c28354
commit 56d4d375c2
4 changed files with 393 additions and 89 deletions
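For orientation, a hedged SQL sketch of the behavior this change implements, distilled from the regression tests further down: hash-distributed and reference tables now copy any existing local rows into their shards at creation time, while append/range distribution and master_create_distributed_table still require an empty table. The table and column names below are illustrative.

CREATE TABLE items (key int, value text);
INSERT INTO items VALUES (1, 'hello'), (2, 'world');

-- append/range distribution and master_create_distributed_table still reject non-empty tables
SELECT create_distributed_table('items', 'key', 'append');  -- ERROR: cannot distribute relation "items"

-- hash distribution succeeds and copies the two existing local rows into the new shards
SELECT create_distributed_table('items', 'key');            -- NOTICE: Copying data from local table...
SELECT * FROM items ORDER BY key;                            -- both rows are now read from the shards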

View File

@ -73,13 +73,14 @@ int ReplicationModel = REPLICATION_MODEL_COORDINATOR;
/* local function forward declarations */
static void CreateReferenceTable(Oid relationId);
static void CreateReferenceTable(Oid distributedRelationId);
static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, char replicationModel,
uint32 colocationId);
uint32 colocationId, bool allowEmpty);
static char LookupDistributionMethod(Oid distributionMethodOid);
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
int16 supportFunctionNumber);
static bool LocalTableEmpty(Oid tableId);
static void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
Var *distributionColumn, uint32 colocationId);
static void ErrorIfNotSupportedForeignConstraint(Relation relation,
@ -90,7 +91,8 @@ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnN
char *colocateWithTableName,
int shardCount, int replicationFactor);
static Oid ColumnType(Oid relationId, char *columnName);
static void CopyLocalData(Oid relationId);
static void CopyLocalDataIntoShards(Oid relationId);
static List * TupleDescColumnNameList(TupleDesc tupleDescriptor);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table);
@ -114,6 +116,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
char *distributionColumnName = text_to_cstring(distributionColumnText);
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
bool allowEmpty = false;
EnsureCoordinator();
@ -129,7 +132,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
ConvertToDistributedTable(distributedRelationId, distributionColumnName,
distributionMethod, REPLICATION_MODEL_COORDINATOR,
INVALID_COLOCATION_ID);
INVALID_COLOCATION_ID, allowEmpty);
PG_RETURN_VOID();
}
@ -151,7 +154,6 @@ create_distributed_table(PG_FUNCTION_ARGS)
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
text *colocateWithTableNameText = NULL;
char *colocateWithTableName = NULL;
char relationKind = 0;
EnsureCoordinator();
@ -186,6 +188,8 @@ create_distributed_table(PG_FUNCTION_ARGS)
/* if distribution method is not hash, just create partition metadata */
if (distributionMethod != DISTRIBUTE_BY_HASH)
{
bool allowEmpty = false;
if (ReplicationModel != REPLICATION_MODEL_COORDINATOR)
{
ereport(NOTICE, (errmsg("using statement-based replication"),
@ -195,7 +199,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
ConvertToDistributedTable(relationId, distributionColumnName,
distributionMethod, REPLICATION_MODEL_COORDINATOR,
INVALID_COLOCATION_ID);
INVALID_COLOCATION_ID, allowEmpty);
PG_RETURN_VOID();
}
@ -204,13 +208,6 @@ create_distributed_table(PG_FUNCTION_ARGS)
colocateWithTableName, ShardCount,
ShardReplicationFactor);
/* copy over data from regular relations */
relationKind = get_rel_relkind(relationId);
if (relationKind == RELKIND_RELATION)
{
CopyLocalData(relationId);
}
if (ShouldSyncTableMetadata(relationId))
{
CreateTableMetadataOnWorkers(relationId);
@ -229,17 +226,9 @@ Datum
create_reference_table(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
char relationKind = 0;
CreateReferenceTable(relationId);
/* copy over data from regular relations */
relationKind = get_rel_relkind(relationId);
if (relationKind == RELKIND_RELATION)
{
CopyLocalData(relationId);
}
PG_RETURN_VOID();
}
@ -256,6 +245,8 @@ CreateReferenceTable(Oid relationId)
List *workerNodeList = WorkerNodeList();
int replicationFactor = list_length(workerNodeList);
char *distributionColumnName = NULL;
bool canLoadData = false;
char relationKind = 0;
EnsureCoordinator();
@ -269,16 +260,30 @@ CreateReferenceTable(Oid relationId)
errdetail("There are no active worker nodes.")));
}
/* we only support data loading for regular (non-foreign) relations */
relationKind = get_rel_relkind(relationId);
if (relationKind == RELKIND_RELATION)
{
canLoadData = true;
}
colocationId = CreateReferenceTableColocationId();
/* first, convert the relation into distributed relation */
ConvertToDistributedTable(relationId, distributionColumnName,
DISTRIBUTE_BY_NONE, REPLICATION_MODEL_2PC, colocationId);
DISTRIBUTE_BY_NONE, REPLICATION_MODEL_2PC, colocationId,
canLoadData);
/* now, create the single shard replicated to all nodes */
CreateReferenceTableShard(relationId);
CreateTableMetadataOnWorkers(relationId);
/* copy over data from regular relations */
if (canLoadData)
{
CopyLocalDataIntoShards(relationId);
}
}
@ -296,7 +301,7 @@ CreateReferenceTable(Oid relationId)
static void
ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, char replicationModel,
uint32 colocationId)
uint32 colocationId, bool allowEmpty)
{
Relation relation = NULL;
TupleDesc relationDesc = NULL;
@ -346,6 +351,17 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
"foreign tables.")));
}
/* check that the relation does not contain any rows */
if (!allowEmpty && !LocalTableEmpty(relationId))
{
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot distribute relation \"%s\"",
relationName),
errdetail("Relation \"%s\" contains data.",
relationName),
errhint("Empty your table before distributing it.")));
}
/*
* Distribution column returns NULL for reference tables,
* but it is not used below for reference tables.
@ -829,6 +845,62 @@ SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
}
/*
* LocalTableEmpty function checks whether the given local table contains any row and
* returns false if there is any data. This function is only for local tables and
* should not be called for distributed tables.
*/
static bool
LocalTableEmpty(Oid tableId)
{
Oid schemaId = get_rel_namespace(tableId);
char *schemaName = get_namespace_name(schemaId);
char *tableName = get_rel_name(tableId);
char *tableQualifiedName = quote_qualified_identifier(schemaName, tableName);
int spiConnectionResult = 0;
int spiQueryResult = 0;
StringInfo selectExistQueryString = makeStringInfo();
HeapTuple tuple = NULL;
Datum hasDataDatum = 0;
bool localTableEmpty = false;
bool columnNull = false;
bool readOnly = true;
int rowId = 0;
int attributeId = 1;
AssertArg(!IsDistributedTable(tableId));
spiConnectionResult = SPI_connect();
if (spiConnectionResult != SPI_OK_CONNECT)
{
ereport(ERROR, (errmsg("could not connect to SPI manager")));
}
appendStringInfo(selectExistQueryString, SELECT_EXIST_QUERY, tableQualifiedName);
spiQueryResult = SPI_execute(selectExistQueryString->data, readOnly, 0);
if (spiQueryResult != SPI_OK_SELECT)
{
ereport(ERROR, (errmsg("execution was not successful \"%s\"",
selectExistQueryString->data)));
}
/* we expect the SELECT EXISTS query to return a single value in a single row */
Assert(SPI_processed == 1);
tuple = SPI_tuptable->vals[rowId];
hasDataDatum = SPI_getbinval(tuple, SPI_tuptable->tupdesc, attributeId, &columnNull);
localTableEmpty = !DatumGetBool(hasDataDatum);
SPI_finish();
return localTableEmpty;
}
/*
* CreateTruncateTrigger creates a truncate trigger on table identified by relationId
* and assigns citus_truncate_trigger() as handler.
@ -872,6 +944,8 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
uint32 colocationId = INVALID_COLOCATION_ID;
Oid sourceRelationId = InvalidOid;
Oid distributionColumnType = InvalidOid;
bool canLoadData = false;
char relationKind = 0;
/* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */
distributedRelation = relation_open(relationId, AccessShareLock);
@ -914,9 +988,16 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
colocationId = TableColocationId(sourceRelationId);
}
/* we only support data loading for regular (non-foreign) relations */
relationKind = get_rel_relkind(relationId);
if (relationKind == RELKIND_RELATION)
{
canLoadData = true;
}
/* create distributed table metadata */
ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH,
ReplicationModel, colocationId);
ReplicationModel, colocationId, canLoadData);
/* create shards */
if (sourceRelationId != InvalidOid)
@ -933,6 +1014,12 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor);
}
/* copy over data from regular relations */
if (canLoadData)
{
CopyLocalDataIntoShards(relationId);
}
heap_close(pgDistColocation, NoLock);
relation_close(distributedRelation, NoLock);
}
@ -981,16 +1068,37 @@ EnsureReplicationSettings(Oid relationId, char replicationModel)
/*
* CopyLocalData copies local data into the shards.
* CopyLocalDataIntoShards copies data from the local table, which is hidden
* after converting it to a distributed table, into the shards of the distributed
* table.
*
* This function uses CitusCopyDestReceiver to invoke the distributed COPY logic.
* We cannot use a regular COPY here since that cannot read from a table. Instead
* we read from the table and pass each tuple to the CitusCopyDestReceiver which
* opens a connection and starts a COPY for each shard placement that will have
* data.
*
* We could call the planner and executor here and send the output to the
* DestReceiver, but we are in a tricky spot since Citus is already
* intercepting queries on this table in the planner and executor hooks, and we
* want to read from the local table. To keep it simple, we perform a heap scan
* directly on the table.
*
* Any writes on the table that are started during this operation will be handled
* as distributed queries once the current transaction commits. SELECTs will
* continue to read from the local table until the current transaction commits,
* after which new SELECTs will be handled as distributed queries.
*
* After copying local data into the distributed table, the local data remains
* in place and should be truncated at a later time.
*/
static void
CopyLocalData(Oid relationId)
CopyLocalDataIntoShards(Oid distributedRelationId)
{
DestReceiver *copyDest = NULL;
List *columnNameList = NIL;
Relation distributedRelation = NULL;
TupleDesc tupleDescriptor = NULL;
int columnIndex = 0;
bool stopOnFailure = true;
EState *estate = NULL;
@ -1001,9 +1109,86 @@ CopyLocalData(Oid relationId)
TupleTableSlot *slot = NULL;
uint64 rowsCopied = 0;
distributedRelation = heap_open(relationId, ExclusiveLock);
/* take an ExclusiveLock to block all operations except SELECT */
distributedRelation = heap_open(distributedRelationId, ExclusiveLock);
/* get the table columns */
tupleDescriptor = RelationGetDescr(distributedRelation);
slot = MakeSingleTupleTableSlot(tupleDescriptor);
columnNameList = TupleDescColumnNameList(tupleDescriptor);
/* initialise per-tuple memory context */
estate = CreateExecutorState();
econtext = GetPerTupleExprContext(estate);
econtext->ecxt_scantuple = slot;
copyDest =
(DestReceiver *) CreateCitusCopyDestReceiver(distributedRelationId,
columnNameList, estate,
stopOnFailure);
/* initialise state for writing to shards, we'll open connections on demand */
copyDest->rStartup(copyDest, 0, tupleDescriptor);
/* begin reading from local table */
scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL);
oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
/* materialize tuple and send it to a shard */
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
copyDest->receiveSlot(slot, copyDest);
/* clear tuple memory */
ResetPerTupleExprContext(estate);
/* make sure we roll back on cancellation */
CHECK_FOR_INTERRUPTS();
if (rowsCopied == 0)
{
ereport(NOTICE, (errmsg("Copying data from local table...")));
}
rowsCopied++;
if (rowsCopied % 1000000 == 0)
{
ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied)));
}
}
if (rowsCopied % 1000000 != 0)
{
ereport(DEBUG1, (errmsg("Copied %ld rows", rowsCopied)));
}
MemoryContextSwitchTo(oldContext);
/* finish reading from the local table */
heap_endscan(scan);
/* finish writing into the shards */
copyDest->rShutdown(copyDest);
/* free memory and close the relation */
ExecDropSingleTupleTableSlot(slot);
FreeExecutorState(estate);
heap_close(distributedRelation, NoLock);
}
/*
* TupleDescColumnNameList returns a list of column names for the given tuple
* descriptor as plain strings.
*/
static List *
TupleDescColumnNameList(TupleDesc tupleDescriptor)
{
List *columnNameList = NIL;
int columnIndex = 0;
for (columnIndex = 0; columnIndex < tupleDescriptor->natts; columnIndex++)
{
@ -1018,52 +1203,5 @@ CopyLocalData(Oid relationId)
columnNameList = lappend(columnNameList, columnName);
}
estate = CreateExecutorState();
econtext = GetPerTupleExprContext(estate);
econtext->ecxt_scantuple = slot;
copyDest =
(DestReceiver *) CreateCitusCopyDestReceiver(relationId, columnNameList,
estate, stopOnFailure);
copyDest->rStartup(copyDest, 0, tupleDescriptor);
scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL);
oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
copyDest->receiveSlot(slot, copyDest);
CHECK_FOR_INTERRUPTS();
ResetPerTupleExprContext(estate);
if (rowsCopied == 0)
{
ereport(NOTICE, (errmsg("Copying data from local table...")));
}
rowsCopied++;
if (rowsCopied % 1000000 == 0)
{
ereport(NOTICE, (errmsg("Copied %ld rows", rowsCopied)));
}
}
if (rowsCopied % 1000000 != 0)
{
ereport(NOTICE, (errmsg("Copied %ld rows", rowsCopied)));
}
MemoryContextSwitchTo(oldContext);
heap_endscan(scan);
copyDest->rShutdown(copyDest);
ExecDropSingleTupleTableSlot(slot);
FreeExecutorState(estate);
heap_close(distributedRelation, NoLock);
return columnNameList;
}
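A note on LocalTableEmpty above: it performs its emptiness check through SPI using the SELECT_EXIST_QUERY format string, which is defined elsewhere and does not appear in this diff. The expansion below is an assumption about its likely shape, shown only to clarify what the SPI call executes; the table name is illustrative.

-- hypothetical expansion of SELECT_EXIST_QUERY for a table named data_load_test
SELECT EXISTS (SELECT TRUE FROM data_load_test);
-- returns true when the table has at least one row; LocalTableEmpty negates the result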

View File

@ -355,26 +355,134 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regcl
DROP TABLE repmodel_test;
RESET citus.replication_model;
-- Test initial data loading
CREATE TABLE data_load_test (col1 int, col2 text);
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
INSERT INTO data_load_test VALUES (243, 'world');
-- create_distributed_table copies data into the distributed table
-- table must be empty when using append- or range-partitioning
SELECT create_distributed_table('data_load_test', 'col1', 'append');
ERROR: cannot distribute relation "data_load_test"
DETAIL: Relation "data_load_test" contains data.
HINT: Empty your table before distributing it.
SELECT create_distributed_table('data_load_test', 'col1', 'range');
ERROR: cannot distribute relation "data_load_test"
DETAIL: Relation "data_load_test" contains data.
HINT: Empty your table before distributing it.
-- table must be empty when using master_create_distributed_table (no shards created)
SELECT master_create_distributed_table('data_load_test', 'col1', 'hash');
ERROR: cannot distribute relation "data_load_test"
DETAIL: Relation "data_load_test" contains data.
HINT: Empty your table before distributing it.
-- create_distributed_table creates shards and copies data into the distributed table
SELECT create_distributed_table('data_load_test', 'col1');
NOTICE: Copying data from local table...
NOTICE: Copied 2 rows
create_distributed_table
--------------------------
(1 row)
SELECT * FROM data_load_test;
col1 | col2
------+-------
132 | hello
243 | world
SELECT * FROM data_load_test ORDER BY col1;
col1 | col2 | col3
------+-------+------
132 | hello | 1
243 | world | 2
(2 rows)
DROP TABLE data_load_test;
-- ensure writes in the same transaction as create_distributed_table are visible
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
INSERT INTO data_load_test VALUES (243, 'world');
END;
SELECT * FROM data_load_test ORDER BY col1;
col1 | col2 | col3
------+-------+------
132 | hello | 1
243 | world | 2
(2 rows)
DROP TABLE data_load_test;
-- creating co-located distributed tables in the same transaction works
BEGIN;
CREATE TABLE data_load_test1 (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test1 VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test1', 'col1');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
CREATE TABLE data_load_test2 (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test2 VALUES (132, 'world');
SELECT create_distributed_table('data_load_test2', 'col1');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
SELECT a.col2 ||' '|| b.col2
FROM data_load_test1 a JOIN data_load_test2 b USING (col1)
WHERE col1 = 132;
?column?
-------------
hello world
(1 row)
DROP TABLE data_load_test1, data_load_test2;
END;
-- creating an index after loading data works
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
CREATE INDEX data_load_test_idx ON data_load_test (col2);
END;
DROP TABLE data_load_test;
-- popping in and out of existence in the same transaction works
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
DROP TABLE data_load_test;
END;
-- but dropping after a write on the distributed table is currently disallowed
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
INSERT INTO data_load_test VALUES (243, 'world');
DROP TABLE data_load_test;
ERROR: shard drop operations must not appear in transaction blocks containing other distributed modifications
CONTEXT: SQL statement "SELECT master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name)"
PL/pgSQL function citus_drop_trigger() line 21 at PERFORM
END;
-- Test data loading after dropping a column
CREATE TABLE data_load_test (col1 int, col2 text, col3 text);
INSERT INTO data_load_test VALUES (132, 'hello', 'world');
@ -382,7 +490,6 @@ INSERT INTO data_load_test VALUES (243, 'world', 'hello');
ALTER TABLE data_load_test DROP COLUMN col2;
SELECT create_distributed_table('data_load_test', 'col1');
NOTICE: Copying data from local table...
NOTICE: Copied 2 rows
create_distributed_table
--------------------------

View File

@ -6,7 +6,6 @@ INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
-- create the reference table
SELECT create_reference_table('reference_table_test');
NOTICE: Copying data from local table...
NOTICE: Copied 1 rows
create_reference_table
------------------------

View File

@ -191,15 +191,75 @@ DROP TABLE repmodel_test;
RESET citus.replication_model;
-- Test initial data loading
CREATE TABLE data_load_test (col1 int, col2 text);
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
INSERT INTO data_load_test VALUES (243, 'world');
-- create_distributed_table copies data into the distributed table
-- table must be empty when using append- or range-partitioning
SELECT create_distributed_table('data_load_test', 'col1', 'append');
SELECT create_distributed_table('data_load_test', 'col1', 'range');
-- table must be empty when using master_create_distributed_table (no shards created)
SELECT master_create_distributed_table('data_load_test', 'col1', 'hash');
-- create_distributed_table creates shards and copies data into the distributed table
SELECT create_distributed_table('data_load_test', 'col1');
SELECT * FROM data_load_test;
SELECT * FROM data_load_test ORDER BY col1;
DROP TABLE data_load_test;
-- ensure writes in the same transaction as create_distributed_table are visible
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
INSERT INTO data_load_test VALUES (243, 'world');
END;
SELECT * FROM data_load_test ORDER BY col1;
DROP TABLE data_load_test;
-- creating co-located distributed tables in the same transaction works
BEGIN;
CREATE TABLE data_load_test1 (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test1 VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test1', 'col1');
CREATE TABLE data_load_test2 (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test2 VALUES (132, 'world');
SELECT create_distributed_table('data_load_test2', 'col1');
SELECT a.col2 ||' '|| b.col2
FROM data_load_test1 a JOIN data_load_test2 b USING (col1)
WHERE col1 = 132;
DROP TABLE data_load_test1, data_load_test2;
END;
-- creating an index after loading data works
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
CREATE INDEX data_load_test_idx ON data_load_test (col2);
END;
DROP TABLE data_load_test;
-- popping in and out of existence in the same transaction works
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
DROP TABLE data_load_test;
END;
-- but dropping after a write on the distributed table is currently disallowed
BEGIN;
CREATE TABLE data_load_test (col1 int, col2 text, col3 serial);
INSERT INTO data_load_test VALUES (132, 'hello');
SELECT create_distributed_table('data_load_test', 'col1');
INSERT INTO data_load_test VALUES (243, 'world');
DROP TABLE data_load_test;
END;
-- Test data loading after dropping a column
CREATE TABLE data_load_test (col1 int, col2 text, col3 text);
INSERT INTO data_load_test VALUES (132, 'hello', 'world');