From 0655ac7a8d9f10b610fc4cc94a88ee70fb6ca9b6 Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 12 Sep 2016 00:22:03 +0200 Subject: [PATCH] Add master_insert_query_result UDF to do basic INSERT/SELECT queries (with DML support) --- src/backend/distributed/Makefile | 4 +- .../distributed/citus--5.2-1--5.2-2.sql | 8 + src/backend/distributed/citus.control | 2 +- .../distributed/commands/insert_query.c | 311 ++++++++++++++++++ src/test/regress/expected/multi_extension.out | 1 + .../regress/expected/multi_insert_query.out | 123 +++++++ .../regress/expected/multi_router_planner.out | 2 +- src/test/regress/multi_schedule | 5 + src/test/regress/sql/multi_extension.sql | 1 + src/test/regress/sql/multi_insert_query.sql | 71 ++++ 10 files changed, 525 insertions(+), 3 deletions(-) create mode 100644 src/backend/distributed/citus--5.2-1--5.2-2.sql create mode 100644 src/backend/distributed/commands/insert_query.c create mode 100644 src/test/regress/expected/multi_insert_query.out create mode 100644 src/test/regress/sql/multi_insert_query.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index fbaca79aa..3bb21df5c 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -7,7 +7,7 @@ MODULE_big = citus EXTENSION = citus EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ - 5.2-1 + 5.2-1 5.2-2 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -52,6 +52,8 @@ $(EXTENSION)--5.1-8.sql: $(EXTENSION)--5.1-7.sql $(EXTENSION)--5.1-7--5.1-8.sql cat $^ > $@ $(EXTENSION)--5.2-1.sql: $(EXTENSION)--5.1-8.sql $(EXTENSION)--5.1-8--5.2-1.sql cat $^ > $@ +$(EXTENSION)--5.2-2.sql: $(EXTENSION)--5.2-1.sql $(EXTENSION)--5.2-1--5.2-2.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--5.2-1--5.2-2.sql b/src/backend/distributed/citus--5.2-1--5.2-2.sql new file mode 100644 index 000000000..adb1ef898 --- /dev/null +++ b/src/backend/distributed/citus--5.2-1--5.2-2.sql @@ -0,0 +1,8 @@ +/* citus--5.2-1--5.2-2.sql */ + +CREATE FUNCTION master_insert_query_result(distributed_table regclass, query text) + RETURNS bigint + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_insert_query_result$$; +COMMENT ON FUNCTION master_insert_query_result(regclass, text) + IS 'append the results of a query to a distributed table' diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index a6881d90b..f8dcc6c7e 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '5.2-1' +default_version = '5.2-2' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/insert_query.c b/src/backend/distributed/commands/insert_query.c new file mode 100644 index 000000000..9521c65ea --- /dev/null +++ b/src/backend/distributed/commands/insert_query.c @@ -0,0 +1,311 @@ +/*------------------------------------------------------------------------- + * + * insert_query.c + * + * Routines for inserting query results into a distributed table. + * + * Copyright (c) 2016, Citus Data, Inc. 
+ *
+ * $Id$
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "funcapi.h"
+
+#include
+#include
+#include
+#include
+
+#include "access/htup_details.h"
+#include "distributed/multi_copy.h"
+#include "distributed/worker_protocol.h"
+#include "executor/spi.h"
+#include "nodes/makefuncs.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+#include "utils/memutils.h"
+#include "utils/rel.h"
+
+
+/* struct type to use a local query executed using SPI as a tuple source */
+typedef struct LocalQueryContext
+{
+	const char *query;
+	Portal queryPortal;
+	int indexInBatch;
+	MemoryContext spiContext;
+} LocalQueryContext;
+
+
+/* Local functions forward declarations */
+static uint64 InsertQueryResult(Oid relationId, const char *query);
+static CopyTupleSource * CreateLocalQueryTupleSource(const char *query);
+static void LocalQueryOpen(void *context, Relation relation,
+						   ErrorContextCallback *errorCallback);
+static void VerifyTupleDescriptorsMatch(TupleDesc tableDescriptor,
+										TupleDesc resultDescriptor);
+static bool LocalQueryNextTuple(void *context, Datum *columnValues,
+								bool *columnNulls);
+static void LocalQueryClose(void *context);
+
+
+/* exports for SQL callable functions */
+PG_FUNCTION_INFO_V1(master_insert_query_result);
+
+
+/*
+ * master_insert_query_result runs a query using SPI and inserts the results
+ * into a distributed table.
+ */
+Datum
+master_insert_query_result(PG_FUNCTION_ARGS)
+{
+	Oid relationId = PG_GETARG_OID(0);
+	char *query = text_to_cstring(PG_GETARG_TEXT_P(1));
+
+	uint64 processedRowCount = 0;
+
+	processedRowCount = InsertQueryResult(relationId, query);
+
+	PG_RETURN_INT64(processedRowCount);
+}
+
+
+/*
+ * InsertQueryResult runs a query using SPI and inserts the results into
+ * a distributed table.
+ */
+static uint64
+InsertQueryResult(Oid relationId, const char *query)
+{
+	CopyTupleSource *tupleSource = CreateLocalQueryTupleSource(query);
+
+	/*
+	 * Unfortunately, COPY requires a RangeVar * instead of an Oid to deal
+	 * with non-existent tables (COPY from worker). Translate relationId
+	 * into RangeVar *.
+	 */
+	char *relationName = get_rel_name(relationId);
+	Oid relationSchemaId = get_rel_namespace(relationId);
+	char *relationSchemaName = get_namespace_name(relationSchemaId);
+	RangeVar *relation = makeRangeVar(relationSchemaName, relationName, -1);
+
+	uint64 processedRowCount = CopyTupleSourceToShards(tupleSource, relation);
+
+	return processedRowCount;
+}
+
+
+/*
+ * CreateLocalQueryTupleSource creates and returns a tuple source for a local
+ * query executed using SPI.
+ */
+static CopyTupleSource *
+CreateLocalQueryTupleSource(const char *query)
+{
+	LocalQueryContext *localQueryContext = palloc0(sizeof(LocalQueryContext));
+	CopyTupleSource *tupleSource = palloc0(sizeof(CopyTupleSource));
+
+	localQueryContext->query = query;
+	localQueryContext->queryPortal = NULL;
+	localQueryContext->indexInBatch = 0;
+	localQueryContext->spiContext = NULL;
+
+	tupleSource->context = localQueryContext;
+	tupleSource->rowContext = AllocSetContextCreate(CurrentMemoryContext,
+													"InsertQueryRowContext",
+													ALLOCSET_DEFAULT_MINSIZE,
+													ALLOCSET_DEFAULT_INITSIZE,
+													ALLOCSET_DEFAULT_MAXSIZE);
+
+	tupleSource->Open = LocalQueryOpen;
+	tupleSource->NextTuple = LocalQueryNextTuple;
+	tupleSource->Close = LocalQueryClose;
+
+	return tupleSource;
+}
+
+
+/*
+ * LocalQueryOpen starts query execution through SPI.
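+ *
+ * It connects to SPI, opens a cursor for the query, prefetches the first
+ * batch of rows, and checks that the result's tuple descriptor matches the
+ * target table before any rows are copied into the shards.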
+ */ +static void +LocalQueryOpen(void *context, Relation relation, ErrorContextCallback *errorCallback) +{ + LocalQueryContext *localQueryContext = (LocalQueryContext *) context; + + const char *query = localQueryContext->query; + TupleDesc tableDescriptor = RelationGetDescr(relation); + Portal queryPortal = NULL; + int connected = 0; + + const char *noPortalName = NULL; + const bool readOnly = false; + const bool fetchForward = true; + const int noCursorOptions = 0; + const int prefetchCount = ROW_PREFETCH_COUNT; + + MemoryContext oldContext = CurrentMemoryContext; + + /* for now, don't provide any special context */ + errorCallback->callback = NULL; + errorCallback->arg = NULL; + errorCallback->previous = error_context_stack; + + /* initialize SPI */ + connected = SPI_connect(); + if (connected != SPI_OK_CONNECT) + { + ereport(ERROR, (errmsg("could not connect to SPI manager"))); + } + + queryPortal = SPI_cursor_open_with_args(noPortalName, query, + 0, NULL, NULL, NULL, /* no arguments */ + readOnly, noCursorOptions); + if (queryPortal == NULL) + { + ereport(ERROR, (errmsg("could not open cursor for query \"%s\"", query))); + } + + localQueryContext->queryPortal = queryPortal; + + /* fetch the first batch */ + SPI_cursor_fetch(queryPortal, fetchForward, prefetchCount); + if (SPI_processed > 0) + { + TupleDesc resultDescriptor = SPI_tuptable->tupdesc; + + VerifyTupleDescriptorsMatch(tableDescriptor, resultDescriptor); + } + + localQueryContext->spiContext = MemoryContextSwitchTo(oldContext); +} + + +/* + * VerifyTupleDescriptorsMatch throws an error if the tuple descriptor does not + * match that of the table. + */ +static void +VerifyTupleDescriptorsMatch(TupleDesc tableDescriptor, TupleDesc resultDescriptor) +{ + int tableColumnCount = tableDescriptor->natts; + int tupleColumnCount = resultDescriptor->natts; + int tableColumnIndex = 0; + int tupleColumnIndex = 0; + + for (tableColumnIndex = 0; tableColumnIndex < tableColumnCount; tableColumnIndex++) + { + Form_pg_attribute tableColumn = NULL; + Form_pg_attribute tupleColumn = NULL; + + tableColumn = tableDescriptor->attrs[tableColumnIndex]; + if (tableColumn->attisdropped) + { + continue; + } + + if (tupleColumnIndex >= tupleColumnCount) + { + ereport(ERROR, (errmsg("query result has fewer columns than table"))); + } + + tupleColumn = resultDescriptor->attrs[tupleColumnIndex]; + + if (tableColumn->atttypid != tupleColumn->atttypid) + { + char *columnName = NameStr(tableColumn->attname); + ereport(ERROR, (errmsg("query result does not match the type of " + "column \"%s\"", columnName))); + } + + tupleColumnIndex++; + } + + if (tupleColumnIndex < tupleColumnCount) + { + ereport(ERROR, (errmsg("query result has more columns than table"))); + } +} + + +/* + * LocalQueryNextTuple reads a tuple from SPI and evaluates any missing + * default values. + */ +static bool +LocalQueryNextTuple(void *context, Datum *columnValues, bool *columnNulls) +{ + LocalQueryContext *localQueryContext = (LocalQueryContext *) context; + + Portal queryPortal = localQueryContext->queryPortal; + HeapTuple tuple = NULL; + TupleDesc resultDescriptor = NULL; + const bool fetchForward = true; + const int prefetchCount = ROW_PREFETCH_COUNT; + + /* + * Check if we reached the end of our current batch. It would look slightly nicer + * if we did this after reading a tuple, but we still need to use the tuple after + * this function completes. 
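+	 * Concretely, the datums handed back by the previous call may still point
+	 * into the previous batch's tuples, so the old SPI_tuptable is only freed
+	 * here, right before the next batch is fetched.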
+ */ + if (SPI_processed > 0 && localQueryContext->indexInBatch >= SPI_processed) + { + MemoryContext oldContext = MemoryContextSwitchTo(localQueryContext->spiContext); + + /* release the current batch */ + SPI_freetuptable(SPI_tuptable); + + /* fetch a new batch */ + SPI_cursor_fetch(queryPortal, fetchForward, prefetchCount); + + MemoryContextSwitchTo(oldContext); + + localQueryContext->indexInBatch = 0; + } + + if (SPI_processed == 0) + { + return false; + } + + /* "read" the tuple */ + tuple = SPI_tuptable->vals[localQueryContext->indexInBatch]; + localQueryContext->indexInBatch++; + + /* extract the column values and nulls */ + resultDescriptor = SPI_tuptable->tupdesc; + heap_deform_tuple(tuple, resultDescriptor, columnValues, columnNulls); + + return true; +} + + +/* + * LocalQueryClose closes the SPI cursor. + */ +static void +LocalQueryClose(void *context) +{ + LocalQueryContext *localQueryContext = (LocalQueryContext *) context; + + Portal queryPortal = localQueryContext->queryPortal; + int finished = 0; + + MemoryContext oldContext = MemoryContextSwitchTo(localQueryContext->spiContext); + + SPI_freetuptable(SPI_tuptable); + SPI_cursor_close(queryPortal); + + /* will restore memory context to what it was when SPI_connect was called */ + finished = SPI_finish(); + if (finished != SPI_OK_FINISH) + { + ereport(ERROR, (errmsg("could not disconnect from SPI manager"))); + } + + MemoryContextSwitchTo(oldContext); +} diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 5a583be41..023a39146 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -23,6 +23,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-6'; ALTER EXTENSION citus UPDATE TO '5.1-7'; ALTER EXTENSION citus UPDATE TO '5.1-8'; ALTER EXTENSION citus UPDATE TO '5.2-1'; +ALTER EXTENSION citus UPDATE TO '5.2-2'; -- drop extension an re-create in newest version DROP EXTENSION citus; \c diff --git a/src/test/regress/expected/multi_insert_query.out b/src/test/regress/expected/multi_insert_query.out new file mode 100644 index 000000000..82b3f819c --- /dev/null +++ b/src/test/regress/expected/multi_insert_query.out @@ -0,0 +1,123 @@ +-- +-- MULTI_INSERT_QUERY +-- +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1300000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1300000; +CREATE SCHEMA insert_query +CREATE TABLE hash_table ( + key int, + value text, + attributes text[] +) +CREATE TABLE append_table ( + LIKE hash_table +) +CREATE TABLE staging_table ( + LIKE hash_table +); +SELECT master_create_distributed_table('insert_query.hash_table', 'key', 'hash'); + master_create_distributed_table +--------------------------------- + +(1 row) + +SELECT master_create_worker_shards('insert_query.hash_table', 4, 2); + master_create_worker_shards +----------------------------- + +(1 row) + +SELECT master_create_distributed_table('insert_query.append_table', 'key', 'append'); + master_create_distributed_table +--------------------------------- + +(1 row) + +-- Try to insert into both distributed tables from staging table +SELECT master_insert_query_result('insert_query.hash_table', $$ + SELECT s, 'value-'||s, ARRAY['a-'||s,'b-'||s] FROM generate_series(1, 1000) s +$$); + master_insert_query_result +---------------------------- + 1000 +(1 row) + +SELECT count(*) FROM insert_query.hash_table; + count +------- + 1000 +(1 row) + +SELECT * FROM insert_query.hash_table LIMIT 1; + key | value | attributes +-----+---------+------------ 
+ 1 | value-1 | {a-1,b-1} +(1 row) + +-- Try to insert into both distributed tables from staging table +SELECT master_insert_query_result('insert_query.append_table', $$ + SELECT s, 'value-'||s, ARRAY['a-'||s,'b-'||s] FROM generate_series(1001, 2000) s +$$); + master_insert_query_result +---------------------------- + 1000 +(1 row) + +SELECT count(*) FROM insert_query.append_table; + count +------- + 1000 +(1 row) + +SELECT * FROM insert_query.append_table LIMIT 1; + key | value | attributes +------+------------+----------------- + 1001 | value-1001 | {a-1001,b-1001} +(1 row) + +-- Load 1000 rows in to staging table +INSERT INTO insert_query.staging_table +SELECT s, 'value-'||s, ARRAY['a-'||s,'b-'||s] FROM generate_series(2001, 3000) s; +-- Move all the rows into target table +SELECT master_insert_query_result('insert_query.hash_table', + 'DELETE FROM insert_query.staging_table RETURNING *'); + master_insert_query_result +---------------------------- + 1000 +(1 row) + +SELECT count(*) FROM insert_query.hash_table; + count +------- + 2000 +(1 row) + +-- Copy from a distributed table to a distributed table +SELECT master_insert_query_result('insert_query.append_table', + 'SELECT * FROM insert_query.hash_table LIMIT 10'); + master_insert_query_result +---------------------------- + 10 +(1 row) + +SELECT count(*) FROM insert_query.append_table; + count +------- + 1010 +(1 row) + +-- Too many columns +SELECT master_insert_query_result('insert_query.hash_table', $$ + SELECT key, value, attributes, attributes FROM insert_query.append_table +$$); +ERROR: query result has more columns than table +-- Too few columns +SELECT master_insert_query_result('insert_query.hash_table', $$ + SELECT key, value FROM insert_query.append_table +$$); +ERROR: query result has fewer columns than table +-- Non-matching data type +SELECT master_insert_query_result('insert_query.hash_table', $$ + SELECT key, attributes, value FROM insert_query.append_table +$$); +ERROR: query result does not match the type of column "value" diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out index e6053d972..a0f71243a 100644 --- a/src/test/regress/expected/multi_router_planner.out +++ b/src/test/regress/expected/multi_router_planner.out @@ -1534,7 +1534,7 @@ DROP MATERIALIZED VIEW mv_articles_hash; DEBUG: drop auto-cascades to type mv_articles_hash DEBUG: drop auto-cascades to type mv_articles_hash[] DEBUG: drop auto-cascades to rule _RETURN on materialized view mv_articles_hash -DEBUG: EventTriggerInvoke 16727 +DEBUG: EventTriggerInvoke 16729 CREATE MATERIALIZED VIEW mv_articles_hash_error AS SELECT * FROM articles_hash WHERE author_id in (1,2); NOTICE: cannot use shard pruning with ANY/ALL (array expression) diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index c97da7190..b40f60336 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -159,3 +159,8 @@ test: multi_schema_support # multi_function_evaluation tests edge-cases in master-side function pre-evaluation # ---------- test: multi_function_evaluation + +# --------- +# multi_insert_query tests the master_insert_query_result UDF +# --------- +test: multi_insert_query diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 745cc5f0a..6d4fb1491 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -28,6 +28,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-6'; ALTER 
EXTENSION citus UPDATE TO '5.1-7'; ALTER EXTENSION citus UPDATE TO '5.1-8'; ALTER EXTENSION citus UPDATE TO '5.2-1'; +ALTER EXTENSION citus UPDATE TO '5.2-2'; -- drop extension an re-create in newest version DROP EXTENSION citus; diff --git a/src/test/regress/sql/multi_insert_query.sql b/src/test/regress/sql/multi_insert_query.sql new file mode 100644 index 000000000..f635ea67e --- /dev/null +++ b/src/test/regress/sql/multi_insert_query.sql @@ -0,0 +1,71 @@ +-- +-- MULTI_INSERT_QUERY +-- + +ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1300000; +ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1300000; + +CREATE SCHEMA insert_query +CREATE TABLE hash_table ( + key int, + value text, + attributes text[] +) +CREATE TABLE append_table ( + LIKE hash_table +) +CREATE TABLE staging_table ( + LIKE hash_table +); + +SELECT master_create_distributed_table('insert_query.hash_table', 'key', 'hash'); +SELECT master_create_worker_shards('insert_query.hash_table', 4, 2); + +SELECT master_create_distributed_table('insert_query.append_table', 'key', 'append'); + +-- Try to insert into both distributed tables from staging table +SELECT master_insert_query_result('insert_query.hash_table', $$ + SELECT s, 'value-'||s, ARRAY['a-'||s,'b-'||s] FROM generate_series(1, 1000) s +$$); + +SELECT count(*) FROM insert_query.hash_table; +SELECT * FROM insert_query.hash_table LIMIT 1; + +-- Try to insert into both distributed tables from staging table +SELECT master_insert_query_result('insert_query.append_table', $$ + SELECT s, 'value-'||s, ARRAY['a-'||s,'b-'||s] FROM generate_series(1001, 2000) s +$$); + +SELECT count(*) FROM insert_query.append_table; +SELECT * FROM insert_query.append_table LIMIT 1; + +-- Load 1000 rows in to staging table +INSERT INTO insert_query.staging_table +SELECT s, 'value-'||s, ARRAY['a-'||s,'b-'||s] FROM generate_series(2001, 3000) s; + +-- Move all the rows into target table +SELECT master_insert_query_result('insert_query.hash_table', + 'DELETE FROM insert_query.staging_table RETURNING *'); + +SELECT count(*) FROM insert_query.hash_table; + +-- Copy from a distributed table to a distributed table +SELECT master_insert_query_result('insert_query.append_table', + 'SELECT * FROM insert_query.hash_table LIMIT 10'); + +SELECT count(*) FROM insert_query.append_table; + +-- Too many columns +SELECT master_insert_query_result('insert_query.hash_table', $$ + SELECT key, value, attributes, attributes FROM insert_query.append_table +$$); + +-- Too few columns +SELECT master_insert_query_result('insert_query.hash_table', $$ + SELECT key, value FROM insert_query.append_table +$$); + +-- Non-matching data type +SELECT master_insert_query_result('insert_query.hash_table', $$ + SELECT key, attributes, value FROM insert_query.append_table +$$);
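
Usage sketch (illustrative only, based on the regression tests above; the schema, table, and query strings are placeholders rather than part of the patch): after updating the extension to 5.2-2, master_insert_query_result takes a distributed table and a query string, routes the query's result rows to the table's shards, and returns the number of rows inserted. Because the query may contain DML, a DELETE ... RETURNING can be used to move rows out of a regular staging table.

-- append the result of a SELECT to a distributed table
SELECT master_insert_query_result('insert_query.hash_table', $$
    SELECT s, 'value-'||s, ARRAY['a-'||s, 'b-'||s] FROM generate_series(1, 100) s
$$);

-- move rows from a regular staging table into the distributed table
SELECT master_insert_query_result('insert_query.hash_table',
    'DELETE FROM insert_query.staging_table RETURNING *');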