pull/782/merge
Marco Slot 2016-09-22 13:30:34 +00:00 committed by GitHub
commit 8f8e23b82a
12 changed files with 779 additions and 140 deletions

View File

@ -7,7 +7,7 @@ MODULE_big = citus
EXTENSION = citus EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-1 5.2-2
# All citus--*.sql files in the source directory # All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -52,6 +52,8 @@ $(EXTENSION)--5.1-8.sql: $(EXTENSION)--5.1-7.sql $(EXTENSION)--5.1-7--5.1-8.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--5.2-1.sql: $(EXTENSION)--5.1-8.sql $(EXTENSION)--5.1-8--5.2-1.sql $(EXTENSION)--5.2-1.sql: $(EXTENSION)--5.1-8.sql $(EXTENSION)--5.1-8--5.2-1.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--5.2-2.sql: $(EXTENSION)--5.2-1.sql $(EXTENSION)--5.2-1--5.2-2.sql
cat $^ > $@
NO_PGXS = 1 NO_PGXS = 1

View File

@ -0,0 +1,8 @@
/* citus--5.2-1--5.2-2.sql */
CREATE FUNCTION master_insert_query_result(distributed_table regclass, query text)
RETURNS bigint
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_insert_query_result$$;
COMMENT ON FUNCTION master_insert_query_result(regclass, text)
IS 'append the results of a query to a distributed table'

View File

@ -1,6 +1,6 @@
# Citus extension # Citus extension
comment = 'Citus distributed database' comment = 'Citus distributed database'
default_version = '5.2-1' default_version = '5.2-2'
module_pathname = '$libdir/citus' module_pathname = '$libdir/citus'
relocatable = false relocatable = false
schema = pg_catalog schema = pg_catalog

View File

@ -0,0 +1,311 @@
/*-------------------------------------------------------------------------
*
* insert_query.c
*
* Routines for inserting query results into a distributed table.
*
* Copyright (c) 2016, Citus Data, Inc.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/stat.h>
#include <unistd.h>
#include "access/htup_details.h"
#include "distributed/multi_copy.h"
#include "distributed/worker_protocol.h"
#include "executor/spi.h"
#include "nodes/makefuncs.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/rel.h"
/* struct type to use a local query executed using SPI as a tuple source */
typedef struct LocalQueryContext
{
const char *query;
Portal queryPortal;
int indexInBatch;
MemoryContext spiContext;
} LocalQueryContext;
/* Local functions forward declarations */
static uint64 InsertQueryResult(Oid relationId, const char *query);
static CopyTupleSource * CreateLocalQueryTupleSource(const char *query);
static void LocalQueryOpen(void *context, Relation relation,
ErrorContextCallback *errorCallback);
static void VerifyTupleDescriptorsMatch(TupleDesc tableDescriptor,
TupleDesc resultDescriptor);
static bool LocalQueryNextTuple(void *context, Datum *columnValues,
bool *columnNulls);
static void LocalQueryClose(void *context);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_insert_query_result);
/*
* master_isnert_query_result runs a query using SPI and inserts the results
* into a distributed table.
*/
Datum
master_insert_query_result(PG_FUNCTION_ARGS)
{
Oid relationId = PG_GETARG_OID(0);
char *query = text_to_cstring(PG_GETARG_TEXT_P(1));
uint64 processedRowCount = 0;
processedRowCount = InsertQueryResult(relationId, query);
PG_RETURN_INT64(processedRowCount);
}
/*
* InsertQueryResult runs a query using SPI and inserts the results into
* a distributed table.
*/
static uint64
InsertQueryResult(Oid relationId, const char *query)
{
CopyTupleSource *tupleSource = CreateLocalQueryTupleSource(query);
/*
* Unfortunately, COPY requires a RangeVar * instead of an Oid to deal
* with non-existent tables (COPY from worker). Translate relationId
* into RangeVar *.
*/
char *relationName = get_rel_name(relationId);
Oid relationSchemaId = get_rel_namespace(relationId);
char *relationSchemaName = get_namespace_name(relationSchemaId);
RangeVar *relation = makeRangeVar(relationSchemaName, relationName, -1);
uint64 processedRowCount = CopyTupleSourceToShards(tupleSource, relation);
return processedRowCount;
}
/*
* CreateLocalQueryTupleSource creates and returns a tuple source for a local
* query executed using SPI.
*/
static CopyTupleSource *
CreateLocalQueryTupleSource(const char *query)
{
LocalQueryContext *localQueryContext = palloc0(sizeof(LocalQueryContext));
CopyTupleSource *tupleSource = palloc0(sizeof(CopyTupleSource));
localQueryContext->query = query;
localQueryContext->queryPortal = NULL;
localQueryContext->indexInBatch = 0;
localQueryContext->spiContext = NULL;
tupleSource->context = localQueryContext;
tupleSource->rowContext = AllocSetContextCreate(CurrentMemoryContext,
"InsertQueryRowContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
tupleSource->Open = LocalQueryOpen;
tupleSource->NextTuple = LocalQueryNextTuple;
tupleSource->Close = LocalQueryClose;
return tupleSource;
}
/*
* LocalQueryOpen starts query execution through SPI.
*/
static void
LocalQueryOpen(void *context, Relation relation, ErrorContextCallback *errorCallback)
{
LocalQueryContext *localQueryContext = (LocalQueryContext *) context;
const char *query = localQueryContext->query;
TupleDesc tableDescriptor = RelationGetDescr(relation);
Portal queryPortal = NULL;
int connected = 0;
const char *noPortalName = NULL;
const bool readOnly = false;
const bool fetchForward = true;
const int noCursorOptions = 0;
const int prefetchCount = ROW_PREFETCH_COUNT;
MemoryContext oldContext = CurrentMemoryContext;
/* for now, don't provide any special context */
errorCallback->callback = NULL;
errorCallback->arg = NULL;
errorCallback->previous = error_context_stack;
/* initialize SPI */
connected = SPI_connect();
if (connected != SPI_OK_CONNECT)
{
ereport(ERROR, (errmsg("could not connect to SPI manager")));
}
queryPortal = SPI_cursor_open_with_args(noPortalName, query,
0, NULL, NULL, NULL, /* no arguments */
readOnly, noCursorOptions);
if (queryPortal == NULL)
{
ereport(ERROR, (errmsg("could not open cursor for query \"%s\"", query)));
}
localQueryContext->queryPortal = queryPortal;
/* fetch the first batch */
SPI_cursor_fetch(queryPortal, fetchForward, prefetchCount);
if (SPI_processed > 0)
{
TupleDesc resultDescriptor = SPI_tuptable->tupdesc;
VerifyTupleDescriptorsMatch(tableDescriptor, resultDescriptor);
}
localQueryContext->spiContext = MemoryContextSwitchTo(oldContext);
}
/*
* VerifyTupleDescriptorsMatch throws an error if the tuple descriptor does not
* match that of the table.
*/
static void
VerifyTupleDescriptorsMatch(TupleDesc tableDescriptor, TupleDesc resultDescriptor)
{
int tableColumnCount = tableDescriptor->natts;
int tupleColumnCount = resultDescriptor->natts;
int tableColumnIndex = 0;
int tupleColumnIndex = 0;
for (tableColumnIndex = 0; tableColumnIndex < tableColumnCount; tableColumnIndex++)
{
Form_pg_attribute tableColumn = NULL;
Form_pg_attribute tupleColumn = NULL;
tableColumn = tableDescriptor->attrs[tableColumnIndex];
if (tableColumn->attisdropped)
{
continue;
}
if (tupleColumnIndex >= tupleColumnCount)
{
ereport(ERROR, (errmsg("query result has fewer columns than table")));
}
tupleColumn = resultDescriptor->attrs[tupleColumnIndex];
if (tableColumn->atttypid != tupleColumn->atttypid)
{
char *columnName = NameStr(tableColumn->attname);
ereport(ERROR, (errmsg("query result does not match the type of "
"column \"%s\"", columnName)));
}
tupleColumnIndex++;
}
if (tupleColumnIndex < tupleColumnCount)
{
ereport(ERROR, (errmsg("query result has more columns than table")));
}
}
/*
* LocalQueryNextTuple reads a tuple from SPI and evaluates any missing
* default values.
*/
static bool
LocalQueryNextTuple(void *context, Datum *columnValues, bool *columnNulls)
{
LocalQueryContext *localQueryContext = (LocalQueryContext *) context;
Portal queryPortal = localQueryContext->queryPortal;
HeapTuple tuple = NULL;
TupleDesc resultDescriptor = NULL;
const bool fetchForward = true;
const int prefetchCount = ROW_PREFETCH_COUNT;
/*
* Check if we reached the end of our current batch. It would look slightly nicer
* if we did this after reading a tuple, but we still need to use the tuple after
* this function completes.
*/
if (SPI_processed > 0 && localQueryContext->indexInBatch >= SPI_processed)
{
MemoryContext oldContext = MemoryContextSwitchTo(localQueryContext->spiContext);
/* release the current batch */
SPI_freetuptable(SPI_tuptable);
/* fetch a new batch */
SPI_cursor_fetch(queryPortal, fetchForward, prefetchCount);
MemoryContextSwitchTo(oldContext);
localQueryContext->indexInBatch = 0;
}
if (SPI_processed == 0)
{
return false;
}
/* "read" the tuple */
tuple = SPI_tuptable->vals[localQueryContext->indexInBatch];
localQueryContext->indexInBatch++;
/* extract the column values and nulls */
resultDescriptor = SPI_tuptable->tupdesc;
heap_deform_tuple(tuple, resultDescriptor, columnValues, columnNulls);
return true;
}
/*
* LocalQueryClose closes the SPI cursor.
*/
static void
LocalQueryClose(void *context)
{
LocalQueryContext *localQueryContext = (LocalQueryContext *) context;
Portal queryPortal = localQueryContext->queryPortal;
int finished = 0;
MemoryContext oldContext = MemoryContextSwitchTo(localQueryContext->spiContext);
SPI_freetuptable(SPI_tuptable);
SPI_cursor_close(queryPortal);
/* will restore memory context to what it was when SPI_connect was called */
finished = SPI_finish();
if (finished != SPI_OK_FINISH)
{
ereport(ERROR, (errmsg("could not disconnect from SPI manager")));
}
MemoryContextSwitchTo(oldContext);
}

View File

@ -8,9 +8,9 @@
* COPY ... FROM commands on distributed tables. CitusCopyFrom parses the input * COPY ... FROM commands on distributed tables. CitusCopyFrom parses the input
* from stdin, a program, or a file, and decides to copy new rows to existing * from stdin, a program, or a file, and decides to copy new rows to existing
* shards or new shards based on the partition method of the distributed table. * shards or new shards based on the partition method of the distributed table.
* If copy is run a worker node, CitusCopyFrom calls CopyFromWorkerNode which * If copy is run a worker node, CitusCopyFrom calls LocalCopyFromWorkerNode
* parses the master node copy options and handles communication with the master * which parses the master node copy options and handles communication with
* node. * the master node.
* *
* It opens a new connection for every shard placement and uses the PQputCopyData * It opens a new connection for every shard placement and uses the PQputCopyData
* function to copy the data. Because PQputCopyData transmits data, asynchronously, * function to copy the data. Because PQputCopyData transmits data, asynchronously,
@ -128,6 +128,15 @@
#include "utils/memutils.h" #include "utils/memutils.h"
/* struct type to use a local COPY .. FROM statement as a tuple source */
typedef struct LocalCopyContext
{
CopyStmt *copyStatement;
CopyState copyState;
EState *executorState;
} LocalCopyContext;
/* constant used in binary protocol */ /* constant used in binary protocol */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
@ -136,21 +145,28 @@ static PGconn *masterConnection = NULL;
/* Local functions forward declarations */ /* Local functions forward declarations */
static void CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag); static CopyTupleSource * CreateLocalCopyTupleSource(CopyStmt *copyStatement);
static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag); static void LocalCopyOpen(void *context, Relation relation,
static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId); ErrorContextCallback *errorCallback);
static bool LocalCopyNextTuple(void *context, Datum *columnValues, bool *columnNulls);
static void LocalCopyClose(void *context);
static uint64 LocalCopyFromWorkerNode(CopyStmt *copyStatement);
static uint64 CopyToExistingShards(CopyTupleSource *tupleSource, RangeVar *relation);
static uint64 CopyToNewShards(CopyTupleSource *tupleSource, RangeVar *relation,
Oid relationId);
static char MasterPartitionMethod(RangeVar *relation); static char MasterPartitionMethod(RangeVar *relation);
static void RemoveMasterOptions(CopyStmt *copyStatement); static void RemoveMasterOptions(CopyStmt *copyStatement);
static void OpenCopyTransactions(CopyStmt *copyStatement, static void OpenCopyTransactions(RangeVar *relation, ShardConnections *shardConnections,
ShardConnections *shardConnections, bool stopOnFailure, bool stopOnFailure, bool useBinaryCopyFormat);
bool useBinaryCopyFormat); static CopyOutState CreateCopyOutState(TupleDesc tupleDescriptor,
MemoryContext rowContext);
static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription, static bool CanUseBinaryCopyFormat(TupleDesc tupleDescription,
CopyOutState rowOutputState); CopyOutState rowOutputState);
static List * MasterShardPlacementList(uint64 shardId); static List * MasterShardPlacementList(uint64 shardId);
static List * RemoteFinalizedShardPlacementList(uint64 shardId); static List * RemoteFinalizedShardPlacementList(uint64 shardId);
static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList); static void SendCopyBinaryHeaders(CopyOutState copyOutState, List *connectionList);
static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList); static void SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList);
static StringInfo ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, static StringInfo ConstructCopyStatement(RangeVar *relation, int64 shardId,
bool useBinaryCopyFormat); bool useBinaryCopyFormat);
static void SendCopyDataToAll(StringInfo dataBuffer, List *connectionList); static void SendCopyDataToAll(StringInfo dataBuffer, List *connectionList);
static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection, static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection,
@ -158,8 +174,8 @@ static void SendCopyDataToPlacement(StringInfo dataBuffer, PGconn *connection,
static void EndRemoteCopy(List *connectionList, bool stopOnFailure); static void EndRemoteCopy(List *connectionList, bool stopOnFailure);
static void ReportCopyError(PGconn *connection, PGresult *result); static void ReportCopyError(PGconn *connection, PGresult *result);
static uint32 AvailableColumnCount(TupleDesc tupleDescriptor); static uint32 AvailableColumnCount(TupleDesc tupleDescriptor);
static void StartCopyToNewShard(ShardConnections *shardConnections, static void StartCopyToNewShard(RangeVar *relation, ShardConnections *shardConnections,
CopyStmt *copyStatement, bool useBinaryCopyFormat); bool useBinaryCopyFormat);
static int64 MasterCreateEmptyShard(char *relationName); static int64 MasterCreateEmptyShard(char *relationName);
static int64 CreateEmptyShard(char *relationName); static int64 CreateEmptyShard(char *relationName);
static int64 RemoteCreateEmptyShard(char *relationName); static int64 RemoteCreateEmptyShard(char *relationName);
@ -186,6 +202,7 @@ void
CitusCopyFrom(CopyStmt *copyStatement, char *completionTag) CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
{ {
bool isCopyFromWorker = false; bool isCopyFromWorker = false;
uint64 processedRowCount = 0;
/* disallow COPY to/from file or program except for superusers */ /* disallow COPY to/from file or program except for superusers */
if (copyStatement->filename != NULL && !superuser()) if (copyStatement->filename != NULL && !superuser())
@ -211,33 +228,143 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
isCopyFromWorker = IsCopyFromWorker(copyStatement); isCopyFromWorker = IsCopyFromWorker(copyStatement);
if (isCopyFromWorker) if (isCopyFromWorker)
{ {
CopyFromWorkerNode(copyStatement, completionTag); processedRowCount = LocalCopyFromWorkerNode(copyStatement);
} }
else else
{ {
Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); CopyTupleSource *tupleSource = CreateLocalCopyTupleSource(copyStatement);
char partitionMethod = PartitionMethod(relationId); RangeVar *relation = copyStatement->relation;
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == processedRowCount = CopyTupleSourceToShards(tupleSource, relation);
DISTRIBUTE_BY_RANGE) }
{
CopyToExistingShards(copyStatement, completionTag); if (completionTag != NULL)
} {
else if (partitionMethod == DISTRIBUTE_BY_APPEND) snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
{ "COPY " UINT64_FORMAT, processedRowCount);
CopyToNewShards(copyStatement, completionTag, relationId);
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("unsupported partition method")));
}
} }
XactModificationLevel = XACT_MODIFICATION_DATA; XactModificationLevel = XACT_MODIFICATION_DATA;
} }
/*
* CopyTupleSourceToShards copies the tuples provided by tupleSource to
* the shards of the given relation. For hash- and range-distributed
* tables it copies to the existing shards and for append-distributed
* tables it adds a new shard.
*/
uint64
CopyTupleSourceToShards(CopyTupleSource *tupleSource, RangeVar *relation)
{
uint64 processedRowCount = 0;
Oid relationId = RangeVarGetRelid(relation, NoLock, false);
char partitionMethod = PartitionMethod(relationId);
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_RANGE)
{
processedRowCount = CopyToExistingShards(tupleSource, relation);
}
else if (partitionMethod == DISTRIBUTE_BY_APPEND)
{
processedRowCount = CopyToNewShards(tupleSource, relation, relationId);
}
else
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("unsupported partition method")));
}
return processedRowCount;
}
/*
* CreateLocalCopyTupleSource creates and returns a tuple source for a local
* COPY .. FROM command.
*/
static CopyTupleSource *
CreateLocalCopyTupleSource(CopyStmt *copyStatement)
{
LocalCopyContext *localCopyContext = palloc0(sizeof(LocalCopyContext));
CopyTupleSource *tupleSource = palloc0(sizeof(CopyTupleSource));
EState *executorState = CreateExecutorState();
localCopyContext->copyStatement = copyStatement;
localCopyContext->copyState = NULL;
localCopyContext->executorState = executorState;
tupleSource->context = localCopyContext;
tupleSource->rowContext = GetPerTupleMemoryContext(executorState);
tupleSource->Open = LocalCopyOpen;
tupleSource->NextTuple = LocalCopyNextTuple;
tupleSource->Close = LocalCopyClose;
return tupleSource;
}
/*
* LocalCopyOpen opens the COPY input for copying into the given relation.
*/
static void
LocalCopyOpen(void *context, Relation relation, ErrorContextCallback *errorCallback)
{
LocalCopyContext *localCopyContext = (LocalCopyContext *) context;
CopyStmt *copyStatement = localCopyContext->copyStatement;
CopyState copyState = NULL;
copyState = BeginCopyFrom(relation,
copyStatement->filename,
copyStatement->is_program,
copyStatement->attlist,
copyStatement->options);
localCopyContext->copyState = copyState;
errorCallback->callback = CopyFromErrorCallback;
errorCallback->arg = (void *) copyState;
errorCallback->previous = error_context_stack;
}
/*
* LocalCopyNextTuple reads a tuple from the COPY input. columnValeus
* will contain values for all columns, using the default values for
* columns that were not provided by the COPY command. columnNulls
* is true for columns that contain NULL. LocalCopyNextTuple returns
* if the end of the input is reached and true otherwise.
*/
static bool
LocalCopyNextTuple(void *context, Datum *columnValues, bool *columnNulls)
{
LocalCopyContext *localCopyContext = (LocalCopyContext *) context;
CopyState copyState = localCopyContext->copyState;
EState *executorState = localCopyContext->executorState;
ExprContext *expressionContext = GetPerTupleExprContext(executorState);
bool nextRowFound = NextCopyFrom(copyState, expressionContext, columnValues,
columnNulls, NULL);
return nextRowFound;
}
/*
* LocalCopyClose closes the COPY input.
*/
static void
LocalCopyClose(void *context)
{
LocalCopyContext *localCopyContext = (LocalCopyContext *) context;
EndCopyFrom(localCopyContext->copyState);
}
/* /*
* IsCopyFromWorker checks if the given copy statement has the master host option. * IsCopyFromWorker checks if the given copy statement has the master host option.
*/ */
@ -259,16 +386,17 @@ IsCopyFromWorker(CopyStmt *copyStatement)
/* /*
* CopyFromWorkerNode implements the COPY table_name FROM ... from worker nodes * LocalCopyFromWorkerNode implements the COPY table_name FROM ... from worker
* for append-partitioned tables. * nodes for append-partitioned tables.
*/ */
static void static uint64
CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag) LocalCopyFromWorkerNode(CopyStmt *copyStatement)
{ {
NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement); NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement);
char *nodeName = masterNodeAddress->nodeName; char *nodeName = masterNodeAddress->nodeName;
int32 nodePort = masterNodeAddress->nodePort; int32 nodePort = masterNodeAddress->nodePort;
char *nodeUser = CurrentUserName(); char *nodeUser = CurrentUserName();
uint64 processedRowCount = 0;
if (XactModificationLevel > XACT_MODIFICATION_NONE) if (XactModificationLevel > XACT_MODIFICATION_NONE)
{ {
@ -285,17 +413,19 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
PGresult *queryResult = NULL; PGresult *queryResult = NULL;
Oid relationId = InvalidOid; Oid relationId = InvalidOid;
char partitionMethod = 0; char partitionMethod = 0;
CopyTupleSource *tupleSource = NULL;
/* strip schema name for local reference */ /* strip schema name for local reference */
char *schemaName = copyStatement->relation->schemaname; RangeVar *relation = copyStatement->relation;
copyStatement->relation->schemaname = NULL; char *schemaName = relation->schemaname;
relation->schemaname = NULL;
relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false); relationId = RangeVarGetRelid(relation, NoLock, false);
/* put schema name back */ /* put schema name back */
copyStatement->relation->schemaname = schemaName; relation->schemaname = schemaName;
partitionMethod = MasterPartitionMethod(copyStatement->relation); partitionMethod = MasterPartitionMethod(relation);
if (partitionMethod != DISTRIBUTE_BY_APPEND) if (partitionMethod != DISTRIBUTE_BY_APPEND)
{ {
ereport(ERROR, (errmsg("copy from worker nodes is only supported " ereport(ERROR, (errmsg("copy from worker nodes is only supported "
@ -317,7 +447,9 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
*/ */
RemoveMasterOptions(copyStatement); RemoveMasterOptions(copyStatement);
CopyToNewShards(copyStatement, completionTag, relationId); tupleSource = CreateLocalCopyTupleSource(copyStatement);
processedRowCount = CopyToNewShards(tupleSource, relation, relationId);
/* commit metadata transactions */ /* commit metadata transactions */
queryResult = PQexec(masterConnection, "COMMIT"); queryResult = PQexec(masterConnection, "COMMIT");
@ -341,6 +473,8 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
PG_RE_THROW(); PG_RE_THROW();
} }
PG_END_TRY(); PG_END_TRY();
return processedRowCount;
} }
@ -349,10 +483,10 @@ CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag)
* range-partitioned tables where there are already shards into which to copy * range-partitioned tables where there are already shards into which to copy
* rows. * rows.
*/ */
static void static uint64
CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) CopyToExistingShards(CopyTupleSource *tupleSource, RangeVar *relation)
{ {
Oid tableId = RangeVarGetRelid(copyStatement->relation, NoLock, false); Oid tableId = RangeVarGetRelid(relation, NoLock, false);
char *relationName = get_rel_name(tableId); char *relationName = get_rel_name(tableId);
Relation distributedRelation = NULL; Relation distributedRelation = NULL;
TupleDesc tupleDescriptor = NULL; TupleDesc tupleDescriptor = NULL;
@ -363,8 +497,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
FmgrInfo *compareFunction = NULL; FmgrInfo *compareFunction = NULL;
bool hasUniformHashDistribution = false; bool hasUniformHashDistribution = false;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId); DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(tableId);
const char *delimiterCharacter = "\t";
const char *nullPrintCharacter = "\\N";
int shardCount = 0; int shardCount = 0;
List *shardIntervalList = NULL; List *shardIntervalList = NULL;
@ -375,11 +507,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
ShardConnections *shardConnections = NULL; ShardConnections *shardConnections = NULL;
List *connectionList = NIL; List *connectionList = NIL;
EState *executorState = NULL;
MemoryContext executorTupleContext = NULL;
ExprContext *executorExpressionContext = NULL;
CopyState copyState = NULL;
CopyOutState copyOutState = NULL; CopyOutState copyOutState = NULL;
FmgrInfo *columnOutputFunctions = NULL; FmgrInfo *columnOutputFunctions = NULL;
uint64 processedRowCount = 0; uint64 processedRowCount = 0;
@ -446,25 +573,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
useBinarySearch = true; useBinarySearch = true;
} }
/* initialize copy state to read from COPY data source */ copyOutState = CreateCopyOutState(tupleDescriptor, tupleSource->rowContext);
copyState = BeginCopyFrom(distributedRelation,
copyStatement->filename,
copyStatement->is_program,
copyStatement->attlist,
copyStatement->options);
executorState = CreateExecutorState();
executorTupleContext = GetPerTupleMemoryContext(executorState);
executorExpressionContext = GetPerTupleExprContext(executorState);
copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
copyOutState->delim = (char *) delimiterCharacter;
copyOutState->null_print = (char *) nullPrintCharacter;
copyOutState->null_print_client = (char *) nullPrintCharacter;
copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState);
copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->rowcontext = executorTupleContext;
columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary); columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary);
/* /*
@ -480,10 +589,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
{ {
ErrorContextCallback errorCallback; ErrorContextCallback errorCallback;
/* open the tuple source */
tupleSource->Open(tupleSource->context, distributedRelation, &errorCallback);
/* set up callback to identify error line number */ /* set up callback to identify error line number */
errorCallback.callback = CopyFromErrorCallback;
errorCallback.arg = (void *) copyState;
errorCallback.previous = error_context_stack;
error_context_stack = &errorCallback; error_context_stack = &errorCallback;
/* ensure transactions have unique names on worker nodes */ /* ensure transactions have unique names on worker nodes */
@ -498,14 +607,13 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
bool shardConnectionsFound = false; bool shardConnectionsFound = false;
MemoryContext oldContext = NULL; MemoryContext oldContext = NULL;
ResetPerTupleExprContext(executorState); MemoryContextReset(tupleSource->rowContext);
oldContext = MemoryContextSwitchTo(executorTupleContext); oldContext = MemoryContextSwitchTo(tupleSource->rowContext);
/* parse a row from the input */ /* parse a row from the input */
nextRowFound = NextCopyFrom(copyState, executorExpressionContext, nextRowFound = tupleSource->NextTuple(tupleSource->context, columnValues,
columnValues, columnNulls, NULL); columnNulls);
if (!nextRowFound) if (!nextRowFound)
{ {
MemoryContextSwitchTo(oldContext); MemoryContextSwitchTo(oldContext);
@ -547,7 +655,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
if (!shardConnectionsFound) if (!shardConnectionsFound)
{ {
/* open connections and initiate COPY on shard placements */ /* open connections and initiate COPY on shard placements */
OpenCopyTransactions(copyStatement, shardConnections, false, OpenCopyTransactions(relation, shardConnections, false,
copyOutState->binary); copyOutState->binary);
/* send copy binary headers to shard placements */ /* send copy binary headers to shard placements */
@ -586,7 +694,8 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
PrepareRemoteTransactions(connectionList); PrepareRemoteTransactions(connectionList);
} }
EndCopyFrom(copyState); tupleSource->Close(tupleSource->context);
heap_close(distributedRelation, NoLock); heap_close(distributedRelation, NoLock);
/* check for cancellation one last time before committing */ /* check for cancellation one last time before committing */
@ -614,11 +723,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
CommitRemoteTransactions(connectionList, false); CommitRemoteTransactions(connectionList, false);
CloseConnections(connectionList); CloseConnections(connectionList);
if (completionTag != NULL) return processedRowCount;
{
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
"COPY " UINT64_FORMAT, processedRowCount);
}
} }
@ -626,10 +731,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
* CopyToNewShards implements the COPY table_name FROM ... for append-partitioned * CopyToNewShards implements the COPY table_name FROM ... for append-partitioned
* tables where we create new shards into which to copy rows. * tables where we create new shards into which to copy rows.
*/ */
static void static uint64
CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) CopyToNewShards(CopyTupleSource *tupleSource, RangeVar *relation, Oid relationId)
{ {
FmgrInfo *columnOutputFunctions = NULL; uint64 processedRowCount = 0;
/* allocate column values and nulls arrays */ /* allocate column values and nulls arrays */
Relation distributedRelation = heap_open(relationId, RowExclusiveLock); Relation distributedRelation = heap_open(relationId, RowExclusiveLock);
@ -638,36 +743,18 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
Datum *columnValues = palloc0(columnCount * sizeof(Datum)); Datum *columnValues = palloc0(columnCount * sizeof(Datum));
bool *columnNulls = palloc0(columnCount * sizeof(bool)); bool *columnNulls = palloc0(columnCount * sizeof(bool));
EState *executorState = CreateExecutorState(); ShardConnections *shardConnections = NULL;
MemoryContext executorTupleContext = GetPerTupleMemoryContext(executorState); CopyOutState copyOutState = NULL;
ExprContext *executorExpressionContext = GetPerTupleExprContext(executorState); FmgrInfo *columnOutputFunctions = NULL;
const char *delimiterCharacter = "\t";
const char *nullPrintCharacter = "\\N";
/* /*
* Shard connections should be initialized before the PG_TRY, since it is * Shard connections should be initialized before the PG_TRY, since it is
* used in PG_CATCH. Otherwise, it may be undefined in the PG_CATCH * used in PG_CATCH. Otherwise, it may be undefined in the PG_CATCH
* (see sigsetjmp documentation). * (see sigsetjmp documentation).
*/ */
ShardConnections *shardConnections = shardConnections = palloc0(sizeof(ShardConnections));
(ShardConnections *) palloc0(sizeof(ShardConnections));
/* initialize copy state to read from COPY data source */
CopyState copyState = BeginCopyFrom(distributedRelation,
copyStatement->filename,
copyStatement->is_program,
copyStatement->attlist,
copyStatement->options);
CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
copyOutState->delim = (char *) delimiterCharacter;
copyOutState->null_print = (char *) nullPrintCharacter;
copyOutState->null_print_client = (char *) nullPrintCharacter;
copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState);
copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->rowcontext = executorTupleContext;
copyOutState = CreateCopyOutState(tupleDescriptor, tupleSource->rowContext);
columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary); columnOutputFunctions = ColumnOutputFunctions(tupleDescriptor, copyOutState->binary);
/* we use a PG_TRY block to close connections on errors (e.g. in NextCopyFrom) */ /* we use a PG_TRY block to close connections on errors (e.g. in NextCopyFrom) */
@ -675,14 +762,12 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
{ {
uint64 shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L; uint64 shardMaxSizeInBytes = (int64) ShardMaxSize * 1024L;
uint64 copiedDataSizeInBytes = 0; uint64 copiedDataSizeInBytes = 0;
uint64 processedRowCount = 0;
/* set up callback to identify error line number */ /* set up callback to identify error line number */
ErrorContextCallback errorCallback; ErrorContextCallback errorCallback;
errorCallback.callback = CopyFromErrorCallback; /* open the tuple source */
errorCallback.arg = (void *) copyState; tupleSource->Open(tupleSource->context, distributedRelation, &errorCallback);
errorCallback.previous = error_context_stack;
while (true) while (true)
{ {
@ -690,16 +775,18 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
MemoryContext oldContext = NULL; MemoryContext oldContext = NULL;
uint64 messageBufferSize = 0; uint64 messageBufferSize = 0;
ResetPerTupleExprContext(executorState); /* clear the previous tuple's memory */
MemoryContextReset(tupleSource->rowContext);
/* switch to tuple memory context and start showing line number in errors */ /* start showing the line number in errors */
error_context_stack = &errorCallback; error_context_stack = &errorCallback;
oldContext = MemoryContextSwitchTo(executorTupleContext);
/* switch to tuple memory context */
oldContext = MemoryContextSwitchTo(tupleSource->rowContext);
/* parse a row from the input */ /* parse a row from the input */
nextRowFound = NextCopyFrom(copyState, executorExpressionContext, nextRowFound = tupleSource->NextTuple(tupleSource->context, columnValues,
columnValues, columnNulls, NULL); columnNulls);
if (!nextRowFound) if (!nextRowFound)
{ {
/* switch to regular memory context and stop showing line number in errors */ /* switch to regular memory context and stop showing line number in errors */
@ -723,8 +810,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
if (copiedDataSizeInBytes == 0) if (copiedDataSizeInBytes == 0)
{ {
/* create shard and open connections to shard placements */ /* create shard and open connections to shard placements */
StartCopyToNewShard(shardConnections, copyStatement, StartCopyToNewShard(relation, shardConnections, copyOutState->binary);
copyOutState->binary);
/* send copy binary headers to shard placements */ /* send copy binary headers to shard placements */
if (copyOutState->binary) if (copyOutState->binary)
@ -782,17 +868,12 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
MasterUpdateShardStatistics(shardConnections->shardId); MasterUpdateShardStatistics(shardConnections->shardId);
} }
EndCopyFrom(copyState); tupleSource->Close(tupleSource->context);
heap_close(distributedRelation, NoLock); heap_close(distributedRelation, NoLock);
/* check for cancellation one last time before returning */ /* check for cancellation one last time before returning */
CHECK_FOR_INTERRUPTS(); CHECK_FOR_INTERRUPTS();
if (completionTag != NULL)
{
snprintf(completionTag, COMPLETION_TAG_BUFSIZE,
"COPY " UINT64_FORMAT, processedRowCount);
}
} }
PG_CATCH(); PG_CATCH();
{ {
@ -804,6 +885,8 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
PG_RE_THROW(); PG_RE_THROW();
} }
PG_END_TRY(); PG_END_TRY();
return processedRowCount;
} }
@ -920,7 +1003,7 @@ RemoveMasterOptions(CopyStmt *copyStatement)
* shard placements. * shard placements.
*/ */
static void static void
OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections, OpenCopyTransactions(RangeVar *relation, ShardConnections *shardConnections,
bool stopOnFailure, bool useBinaryCopyFormat) bool stopOnFailure, bool useBinaryCopyFormat)
{ {
List *finalizedPlacementList = NIL; List *finalizedPlacementList = NIL;
@ -986,7 +1069,7 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
} }
PQclear(result); PQclear(result);
copyCommand = ConstructCopyStatement(copyStatement, shardConnections->shardId, copyCommand = ConstructCopyStatement(relation, shardConnections->shardId,
useBinaryCopyFormat); useBinaryCopyFormat);
result = PQexec(connection, copyCommand->data); result = PQexec(connection, copyCommand->data);
@ -1040,6 +1123,28 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections
} }
/*
* CreateCopyOutState creates a copy output state to pass to the COPY
* functions.
*/
static CopyOutState
CreateCopyOutState(TupleDesc tupleDescriptor, MemoryContext rowContext)
{
static const char *delimiterCharacter = "\t";
static const char *nullPrintCharacter = "\\N";
CopyOutState copyOutState = (CopyOutState) palloc0(sizeof(CopyOutStateData));
copyOutState->delim = (char *) delimiterCharacter;
copyOutState->null_print = (char *) nullPrintCharacter;
copyOutState->null_print_client = (char *) nullPrintCharacter;
copyOutState->binary = CanUseBinaryCopyFormat(tupleDescriptor, copyOutState);
copyOutState->fe_msgbuf = makeStringInfo();
copyOutState->rowcontext = rowContext;
return copyOutState;
}
/* /*
* CanUseBinaryCopyFormat iterates over columns of the relation given in rowOutputState * CanUseBinaryCopyFormat iterates over columns of the relation given in rowOutputState
* and looks for a column whose type is array of user-defined type or composite type. * and looks for a column whose type is array of user-defined type or composite type.
@ -1173,12 +1278,12 @@ SendCopyBinaryFooters(CopyOutState copyOutState, List *connectionList)
* shard. * shard.
*/ */
static StringInfo static StringInfo
ConstructCopyStatement(CopyStmt *copyStatement, int64 shardId, bool useBinaryCopyFormat) ConstructCopyStatement(RangeVar *relation, int64 shardId, bool useBinaryCopyFormat)
{ {
StringInfo command = makeStringInfo(); StringInfo command = makeStringInfo();
char *schemaName = copyStatement->relation->schemaname; char *schemaName = relation->schemaname;
char *relationName = copyStatement->relation->relname; char *relationName = relation->relname;
char *shardName = pstrdup(relationName); char *shardName = pstrdup(relationName);
char *shardQualifiedName = NULL; char *shardQualifiedName = NULL;
@ -1536,11 +1641,11 @@ AppendCopyBinaryFooters(CopyOutState footerOutputState)
* opens connections to shard placements. * opens connections to shard placements.
*/ */
static void static void
StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement, StartCopyToNewShard(RangeVar *relation, ShardConnections *shardConnections,
bool useBinaryCopyFormat) bool useBinaryCopyFormat)
{ {
char *relationName = copyStatement->relation->relname; char *relationName = relation->relname;
char *schemaName = copyStatement->relation->schemaname; char *schemaName = relation->schemaname;
char *qualifiedName = quote_qualified_identifier(schemaName, relationName); char *qualifiedName = quote_qualified_identifier(schemaName, relationName);
int64 shardId = MasterCreateEmptyShard(qualifiedName); int64 shardId = MasterCreateEmptyShard(qualifiedName);
@ -1551,7 +1656,7 @@ StartCopyToNewShard(ShardConnections *shardConnections, CopyStmt *copyStatement,
shardConnections->connectionList = NIL; shardConnections->connectionList = NIL;
/* connect to shards placements and start transactions */ /* connect to shards placements and start transactions */
OpenCopyTransactions(copyStatement, shardConnections, true, useBinaryCopyFormat); OpenCopyTransactions(relation, shardConnections, true, useBinaryCopyFormat);
} }

View File

@ -43,8 +43,20 @@ typedef struct NodeAddress
int32 nodePort; int32 nodePort;
} NodeAddress; } NodeAddress;
/* struct type for a generic source of tuples to copy to shards */
typedef struct CopyTupleSource
{
void *context;
MemoryContext rowContext;
void (*Open)(void *context, Relation relation, ErrorContextCallback *errorCallback);
bool (*NextTuple)(void *context, Datum *columnValues, bool *columnNulls);
void (*Close)(void *context);
} CopyTupleSource;
/* function declarations for copying into a distributed table */ /* function declarations for copying into a distributed table */
extern uint64 CopyTupleSourceToShards(CopyTupleSource *tupleSource, RangeVar *relation);
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray, extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray,
TupleDesc rowDescriptor, TupleDesc rowDescriptor,

View File

@ -23,6 +23,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-6';
ALTER EXTENSION citus UPDATE TO '5.1-7'; ALTER EXTENSION citus UPDATE TO '5.1-7';
ALTER EXTENSION citus UPDATE TO '5.1-8'; ALTER EXTENSION citus UPDATE TO '5.1-8';
ALTER EXTENSION citus UPDATE TO '5.2-1'; ALTER EXTENSION citus UPDATE TO '5.2-1';
ALTER EXTENSION citus UPDATE TO '5.2-2';
-- drop extension an re-create in newest version -- drop extension an re-create in newest version
DROP EXTENSION citus; DROP EXTENSION citus;
\c \c

View File

@ -0,0 +1,123 @@
--
-- MULTI_INSERT_QUERY
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1300000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1300000;
CREATE SCHEMA insert_query
CREATE TABLE hash_table (
key int,
value text,
attributes text[]
)
CREATE TABLE append_table (
LIKE hash_table
)
CREATE TABLE staging_table (
LIKE hash_table
);
SELECT master_create_distributed_table('insert_query.hash_table', 'key', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('insert_query.hash_table', 4, 2);
master_create_worker_shards
-----------------------------
(1 row)
SELECT master_create_distributed_table('insert_query.append_table', 'key', 'append');
master_create_distributed_table
---------------------------------
(1 row)
-- Try to insert into both distributed tables from staging table
SELECT master_insert_query_result('insert_query.hash_table', $$
SELECT s, 'value-'||s, ARRAY['a-'||s,'b-'||s] FROM generate_series(1, 1000) s
$$);
master_insert_query_result
----------------------------
1000
(1 row)
SELECT count(*) FROM insert_query.hash_table;
count
-------
1000
(1 row)
SELECT * FROM insert_query.hash_table LIMIT 1;
key | value | attributes
-----+---------+------------
1 | value-1 | {a-1,b-1}
(1 row)
-- Try to insert into both distributed tables from staging table
SELECT master_insert_query_result('insert_query.append_table', $$
SELECT s, 'value-'||s, ARRAY['a-'||s,'b-'||s] FROM generate_series(1001, 2000) s
$$);
master_insert_query_result
----------------------------
1000
(1 row)
SELECT count(*) FROM insert_query.append_table;
count
-------
1000
(1 row)
SELECT * FROM insert_query.append_table LIMIT 1;
key | value | attributes
------+------------+-----------------
1001 | value-1001 | {a-1001,b-1001}
(1 row)
-- Load 1000 rows in to staging table
INSERT INTO insert_query.staging_table
SELECT s, 'value-'||s, ARRAY['a-'||s,'b-'||s] FROM generate_series(2001, 3000) s;
-- Move all the rows into target table
SELECT master_insert_query_result('insert_query.hash_table',
'DELETE FROM insert_query.staging_table RETURNING *');
master_insert_query_result
----------------------------
1000
(1 row)
SELECT count(*) FROM insert_query.hash_table;
count
-------
2000
(1 row)
-- Copy from a distributed table to a distributed table
SELECT master_insert_query_result('insert_query.append_table',
'SELECT * FROM insert_query.hash_table LIMIT 10');
master_insert_query_result
----------------------------
10
(1 row)
SELECT count(*) FROM insert_query.append_table;
count
-------
1010
(1 row)
-- Too many columns
SELECT master_insert_query_result('insert_query.hash_table', $$
SELECT key, value, attributes, attributes FROM insert_query.append_table
$$);
ERROR: query result has more columns than table
-- Too few columns
SELECT master_insert_query_result('insert_query.hash_table', $$
SELECT key, value FROM insert_query.append_table
$$);
ERROR: query result has fewer columns than table
-- Non-matching data type
SELECT master_insert_query_result('insert_query.hash_table', $$
SELECT key, attributes, value FROM insert_query.append_table
$$);
ERROR: query result does not match the type of column "value"

View File

@ -1534,7 +1534,7 @@ DROP MATERIALIZED VIEW mv_articles_hash;
DEBUG: drop auto-cascades to type mv_articles_hash DEBUG: drop auto-cascades to type mv_articles_hash
DEBUG: drop auto-cascades to type mv_articles_hash[] DEBUG: drop auto-cascades to type mv_articles_hash[]
DEBUG: drop auto-cascades to rule _RETURN on materialized view mv_articles_hash DEBUG: drop auto-cascades to rule _RETURN on materialized view mv_articles_hash
DEBUG: EventTriggerInvoke 16727 DEBUG: EventTriggerInvoke 16729
CREATE MATERIALIZED VIEW mv_articles_hash_error AS CREATE MATERIALIZED VIEW mv_articles_hash_error AS
SELECT * FROM articles_hash WHERE author_id in (1,2); SELECT * FROM articles_hash WHERE author_id in (1,2);
NOTICE: cannot use shard pruning with ANY/ALL (array expression) NOTICE: cannot use shard pruning with ANY/ALL (array expression)

View File

@ -160,3 +160,8 @@ test: multi_schema_support
# multi_function_evaluation tests edge-cases in master-side function pre-evaluation # multi_function_evaluation tests edge-cases in master-side function pre-evaluation
# ---------- # ----------
test: multi_function_evaluation test: multi_function_evaluation
# ---------
# multi_insert_query tests the master_insert_query_result UDF
# ---------
test: multi_insert_query

View File

@ -28,6 +28,7 @@ ALTER EXTENSION citus UPDATE TO '5.1-6';
ALTER EXTENSION citus UPDATE TO '5.1-7'; ALTER EXTENSION citus UPDATE TO '5.1-7';
ALTER EXTENSION citus UPDATE TO '5.1-8'; ALTER EXTENSION citus UPDATE TO '5.1-8';
ALTER EXTENSION citus UPDATE TO '5.2-1'; ALTER EXTENSION citus UPDATE TO '5.2-1';
ALTER EXTENSION citus UPDATE TO '5.2-2';
-- drop extension an re-create in newest version -- drop extension an re-create in newest version
DROP EXTENSION citus; DROP EXTENSION citus;

View File

@ -0,0 +1,71 @@
--
-- MULTI_INSERT_QUERY
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1300000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1300000;
CREATE SCHEMA insert_query
CREATE TABLE hash_table (
key int,
value text,
attributes text[]
)
CREATE TABLE append_table (
LIKE hash_table
)
CREATE TABLE staging_table (
LIKE hash_table
);
SELECT master_create_distributed_table('insert_query.hash_table', 'key', 'hash');
SELECT master_create_worker_shards('insert_query.hash_table', 4, 2);
SELECT master_create_distributed_table('insert_query.append_table', 'key', 'append');
-- Try to insert into both distributed tables from staging table
SELECT master_insert_query_result('insert_query.hash_table', $$
SELECT s, 'value-'||s, ARRAY['a-'||s,'b-'||s] FROM generate_series(1, 1000) s
$$);
SELECT count(*) FROM insert_query.hash_table;
SELECT * FROM insert_query.hash_table LIMIT 1;
-- Try to insert into both distributed tables from staging table
SELECT master_insert_query_result('insert_query.append_table', $$
SELECT s, 'value-'||s, ARRAY['a-'||s,'b-'||s] FROM generate_series(1001, 2000) s
$$);
SELECT count(*) FROM insert_query.append_table;
SELECT * FROM insert_query.append_table LIMIT 1;
-- Load 1000 rows in to staging table
INSERT INTO insert_query.staging_table
SELECT s, 'value-'||s, ARRAY['a-'||s,'b-'||s] FROM generate_series(2001, 3000) s;
-- Move all the rows into target table
SELECT master_insert_query_result('insert_query.hash_table',
'DELETE FROM insert_query.staging_table RETURNING *');
SELECT count(*) FROM insert_query.hash_table;
-- Copy from a distributed table to a distributed table
SELECT master_insert_query_result('insert_query.append_table',
'SELECT * FROM insert_query.hash_table LIMIT 10');
SELECT count(*) FROM insert_query.append_table;
-- Too many columns
SELECT master_insert_query_result('insert_query.hash_table', $$
SELECT key, value, attributes, attributes FROM insert_query.append_table
$$);
-- Too few columns
SELECT master_insert_query_result('insert_query.hash_table', $$
SELECT key, value FROM insert_query.append_table
$$);
-- Non-matching data type
SELECT master_insert_query_result('insert_query.hash_table', $$
SELECT key, attributes, value FROM insert_query.append_table
$$);