Load data into distributed table on creation

pull/1117/head
Marco Slot 2017-01-10 07:51:40 +01:00
parent bf3541cb24
commit d11eca7d4a
6 changed files with 193 additions and 104 deletions

View File

@ -9,6 +9,7 @@
*/
#include "postgres.h"
#include "miscadmin.h"
#include "access/genam.h"
#include "access/hash.h"
@ -39,11 +40,14 @@
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/reference_table_utils.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "executor/executor.h"
#include "executor/spi.h"
#include "nodes/execnodes.h"
#include "nodes/nodeFuncs.h"
@ -52,10 +56,14 @@
#include "parser/parse_node.h"
#include "parser/parse_relation.h"
#include "parser/parser.h"
#include "tcop/pquery.h"
#include "tcop/tcopprot.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"
#include "utils/inval.h"
@ -72,7 +80,6 @@ static void ConvertToDistributedTable(Oid relationId, char *distributionColumnNa
static char LookupDistributionMethod(Oid distributionMethodOid);
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
int16 supportFunctionNumber);
static bool LocalTableEmpty(Oid tableId);
static void ErrorIfNotSupportedConstraint(Relation relation, char distributionMethod,
Var *distributionColumn, uint32 colocationId);
static void ErrorIfNotSupportedForeignConstraint(Relation relation,
@ -83,6 +90,7 @@ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnN
char *colocateWithTableName,
int shardCount, int replicationFactor);
static Oid ColumnType(Oid relationId, char *columnName);
static void CopyLocalData(Oid relationId);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table);
@ -143,6 +151,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
char distributionMethod = LookupDistributionMethod(distributionMethodOid);
text *colocateWithTableNameText = NULL;
char *colocateWithTableName = NULL;
char relationKind = 0;
EnsureCoordinator();
@ -195,6 +204,13 @@ create_distributed_table(PG_FUNCTION_ARGS)
colocateWithTableName, ShardCount,
ShardReplicationFactor);
/* copy over data from regular relations */
relationKind = get_rel_relkind(relationId);
if (relationKind == RELKIND_RELATION)
{
CopyLocalData(relationId);
}
if (ShouldSyncTableMetadata(relationId))
{
CreateTableMetadataOnWorkers(relationId);
@ -213,9 +229,17 @@ Datum
create_reference_table(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
char relationKind = 0;
CreateReferenceTable(relationId);
/* copy over data from regular relations */
relationKind = get_rel_relkind(relationId);
if (relationKind == RELKIND_RELATION)
{
CopyLocalData(relationId);
}
PG_RETURN_VOID();
}
@ -322,17 +346,6 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
"foreign tables.")));
}
/* check that the relation does not contain any rows */
if (!LocalTableEmpty(relationId))
{
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot distribute relation \"%s\"",
relationName),
errdetail("Relation \"%s\" contains data.",
relationName),
errhint("Empty your table before distributing it.")));
}
/*
* Distribution column returns NULL for reference tables,
* but it is not used below for reference tables.
@ -816,62 +829,6 @@ SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
}
/*
* LocalTableEmpty function checks whether given local table contains any row and
* returns false if there is any data. This function is only for local tables and
* should not be called for distributed tables.
*/
static bool
LocalTableEmpty(Oid tableId)
{
Oid schemaId = get_rel_namespace(tableId);
char *schemaName = get_namespace_name(schemaId);
char *tableName = get_rel_name(tableId);
char *tableQualifiedName = quote_qualified_identifier(schemaName, tableName);
int spiConnectionResult = 0;
int spiQueryResult = 0;
StringInfo selectExistQueryString = makeStringInfo();
HeapTuple tuple = NULL;
Datum hasDataDatum = 0;
bool localTableEmpty = false;
bool columnNull = false;
bool readOnly = true;
int rowId = 0;
int attributeId = 1;
AssertArg(!IsDistributedTable(tableId));
spiConnectionResult = SPI_connect();
if (spiConnectionResult != SPI_OK_CONNECT)
{
ereport(ERROR, (errmsg("could not connect to SPI manager")));
}
appendStringInfo(selectExistQueryString, SELECT_EXIST_QUERY, tableQualifiedName);
spiQueryResult = SPI_execute(selectExistQueryString->data, readOnly, 0);
if (spiQueryResult != SPI_OK_SELECT)
{
ereport(ERROR, (errmsg("execution was not successful \"%s\"",
selectExistQueryString->data)));
}
/* we expect that SELECT EXISTS query will return single value in a single row */
Assert(SPI_processed == 1);
tuple = SPI_tuptable->vals[rowId];
hasDataDatum = SPI_getbinval(tuple, SPI_tuptable->tupdesc, attributeId, &columnNull);
localTableEmpty = !DatumGetBool(hasDataDatum);
SPI_finish();
return localTableEmpty;
}
/*
* CreateTruncateTrigger creates a truncate trigger on table identified by relationId
* and assigns citus_truncate_trigger() as handler.
@ -1021,3 +978,92 @@ EnsureReplicationSettings(Oid relationId, char replicationModel)
"factor\" to one%s.", extraHint)));
}
}
/*
* CopyLocalData copies local data into the shards.
*/
static void
CopyLocalData(Oid relationId)
{
DestReceiver *copyDest = NULL;
List *columnNameList = NIL;
Relation distributedRelation = NULL;
TupleDesc tupleDescriptor = NULL;
int columnIndex = 0;
bool stopOnFailure = true;
EState *estate = NULL;
HeapScanDesc scan = NULL;
HeapTuple tuple = NULL;
ExprContext *econtext = NULL;
MemoryContext oldContext = NULL;
TupleTableSlot *slot = NULL;
uint64 rowsCopied = 0;
distributedRelation = heap_open(relationId, ExclusiveLock);
tupleDescriptor = RelationGetDescr(distributedRelation);
slot = MakeSingleTupleTableSlot(tupleDescriptor);
for (columnIndex = 0; columnIndex < tupleDescriptor->natts; columnIndex++)
{
Form_pg_attribute currentColumn = tupleDescriptor->attrs[columnIndex];
char *columnName = NameStr(currentColumn->attname);
if (currentColumn->attisdropped)
{
continue;
}
columnNameList = lappend(columnNameList, columnName);
}
estate = CreateExecutorState();
econtext = GetPerTupleExprContext(estate);
econtext->ecxt_scantuple = slot;
copyDest =
(DestReceiver *) CreateCitusCopyDestReceiver(relationId, columnNameList,
estate, stopOnFailure);
copyDest->rStartup(copyDest, 0, tupleDescriptor);
scan = heap_beginscan(distributedRelation, GetActiveSnapshot(), 0, NULL);
oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
while ((tuple = heap_getnext(scan, ForwardScanDirection)) != NULL)
{
ExecStoreTuple(tuple, slot, InvalidBuffer, false);
copyDest->receiveSlot(slot, copyDest);
CHECK_FOR_INTERRUPTS();
ResetPerTupleExprContext(estate);
if (rowsCopied == 0)
{
ereport(NOTICE, (errmsg("Copying data from local table...")));
}
rowsCopied++;
if (rowsCopied % 1000000 == 0)
{
ereport(NOTICE, (errmsg("Copied %ld rows", rowsCopied)));
}
}
if (rowsCopied % 1000000 != 0)
{
ereport(NOTICE, (errmsg("Copied %ld rows", rowsCopied)));
}
MemoryContextSwitchTo(oldContext);
heap_endscan(scan);
copyDest->rShutdown(copyDest);
ExecDropSingleTupleTableSlot(slot);
FreeExecutorState(estate);
heap_close(distributedRelation, NoLock);
}

View File

@ -1819,7 +1819,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
/* we don't support copy to reference tables from workers */
EnsureSchemaNode();
EnsureCoordinator();
}
else
{

View File

@ -75,12 +75,6 @@ CREATE TABLE nation (
n_name char(25) not null,
n_regionkey integer not null,
n_comment varchar(152));
\COPY nation FROM STDIN WITH CSV
SELECT master_create_distributed_table('nation', 'n_nationkey', 'append');
ERROR: cannot distribute relation "nation"
DETAIL: Relation "nation" contains data.
HINT: Empty your table before distributing it.
TRUNCATE nation;
SELECT create_reference_table('nation');
create_reference_table
------------------------
@ -360,6 +354,48 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regcl
DROP TABLE repmodel_test;
RESET citus.replication_model;
-- Test initial data loading
CREATE TABLE data_load_test (col1 int, col2 text);
INSERT INTO data_load_test VALUES (132, 'hello');
INSERT INTO data_load_test VALUES (243, 'world');
-- create_distributed_table copies data into the distributed table
SELECT create_distributed_table('data_load_test', 'col1');
NOTICE: Copying data from local table...
NOTICE: Copied 2 rows
create_distributed_table
--------------------------
(1 row)
SELECT * FROM data_load_test;
col1 | col2
------+-------
132 | hello
243 | world
(2 rows)
DROP TABLE data_load_test;
-- Test data loading after dropping a column
CREATE TABLE data_load_test (col1 int, col2 text, col3 text);
INSERT INTO data_load_test VALUES (132, 'hello', 'world');
INSERT INTO data_load_test VALUES (243, 'world', 'hello');
ALTER TABLE data_load_test DROP COLUMN col2;
SELECT create_distributed_table('data_load_test', 'col1');
NOTICE: Copying data from local table...
NOTICE: Copied 2 rows
create_distributed_table
--------------------------
(1 row)
SELECT * FROM data_load_test;
col1 | col3
------+-------
132 | world
243 | hello
(2 rows)
DROP TABLE data_load_test;
SET citus.shard_replication_factor TO default;
SET citus.shard_count to 4;
CREATE TABLE lineitem_hash_part (like lineitem);

View File

@ -2,15 +2,11 @@ ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1250000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1250000;
CREATE TABLE reference_table_test (value_1 int, value_2 float, value_3 text, value_4 timestamp);
-- insert some data, and make sure that cannot be create_distributed_table
INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-05');
-- should error out given that there exists data
SELECT create_reference_table('reference_table_test');
ERROR: cannot distribute relation "reference_table_test"
DETAIL: Relation "reference_table_test" contains data.
HINT: Empty your table before distributing it.
TRUNCATE reference_table_test;
-- now should be able to create the reference table
INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
-- create the reference table
SELECT create_reference_table('reference_table_test');
NOTICE: Copying data from local table...
NOTICE: Copied 1 rows
create_reference_table
------------------------
@ -52,8 +48,14 @@ WHERE
1250000 | 1 | localhost | 57638
(2 rows)
-- check whether data was copied into distributed table
SELECT * FROM reference_table_test;
value_1 | value_2 | value_3 | value_4
---------+---------+---------+--------------------------
1 | 1 | 1 | Thu Dec 01 00:00:00 2016
(1 row)
-- now, execute some modification queries
INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
INSERT INTO reference_table_test VALUES (3, 3.0, '3', '2016-12-03');
INSERT INTO reference_table_test VALUES (4, 4.0, '4', '2016-12-04');

View File

@ -60,18 +60,6 @@ CREATE TABLE nation (
n_regionkey integer not null,
n_comment varchar(152));
\COPY nation FROM STDIN WITH CSV
1,'name',1,'comment_1'
2,'name',2,'comment_2'
3,'name',3,'comment_3'
4,'name',4,'comment_4'
5,'name',5,'comment_5'
\.
SELECT master_create_distributed_table('nation', 'n_nationkey', 'append');
TRUNCATE nation;
SELECT create_reference_table('nation');
CREATE TABLE part (
@ -201,6 +189,26 @@ SELECT repmodel FROM pg_dist_partition WHERE logicalrelid='repmodel_test'::regcl
DROP TABLE repmodel_test;
RESET citus.replication_model;
-- Test initial data loading
CREATE TABLE data_load_test (col1 int, col2 text);
INSERT INTO data_load_test VALUES (132, 'hello');
INSERT INTO data_load_test VALUES (243, 'world');
-- create_distributed_table copies data into the distributed table
SELECT create_distributed_table('data_load_test', 'col1');
SELECT * FROM data_load_test;
DROP TABLE data_load_test;
-- Test data loading after dropping a column
CREATE TABLE data_load_test (col1 int, col2 text, col3 text);
INSERT INTO data_load_test VALUES (132, 'hello', 'world');
INSERT INTO data_load_test VALUES (243, 'world', 'hello');
ALTER TABLE data_load_test DROP COLUMN col2;
SELECT create_distributed_table('data_load_test', 'col1');
SELECT * FROM data_load_test;
DROP TABLE data_load_test;
SET citus.shard_replication_factor TO default;
SET citus.shard_count to 4;

View File

@ -4,14 +4,9 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1250000;
CREATE TABLE reference_table_test (value_1 int, value_2 float, value_3 text, value_4 timestamp);
-- insert some data, and make sure that cannot be create_distributed_table
INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-05');
INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
-- should error out given that there exists data
SELECT create_reference_table('reference_table_test');
TRUNCATE reference_table_test;
-- now should be able to create the reference table
-- create the reference table
SELECT create_reference_table('reference_table_test');
-- see that partkey is NULL
@ -36,8 +31,10 @@ FROM
WHERE
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'reference_table_test'::regclass);
-- check whether data was copied into distributed table
SELECT * FROM reference_table_test;
-- now, execute some modification queries
INSERT INTO reference_table_test VALUES (1, 1.0, '1', '2016-12-01');
INSERT INTO reference_table_test VALUES (2, 2.0, '2', '2016-12-02');
INSERT INTO reference_table_test VALUES (3, 3.0, '3', '2016-12-03');
INSERT INTO reference_table_test VALUES (4, 4.0, '4', '2016-12-04');