Allow local execution for intermediate results in COPY

When COPY is used for copying into co-located files, it was
not allowed to use local execution. The primary reason was
Citus treating co-located intermediate results as co-located
shards, and COPY into the distributed table was done via
"format result". And, local execution of such COPY commands
was not implemented.

With this change, we implement support for local execution with
"format result". To do that, we use the buffer for every file
on shardState->copyOutState, similar to how local copy on
shards are implemented. In fact, the logic is similar to
local copy on shards, but instead of writing to the shards,
Citus writes the results to a file.

The logic relies on LOCAL_COPY_FLUSH_THRESHOLD, and flushes
only when the size exceeds the threshold. But, unlike local
copy on shards, in this case we write the headers and footers
just once.
pull/4666/head
Onder Kalaci 2021-02-05 18:20:56 +01:00
parent 1d3b866df5
commit c804c9aa21
12 changed files with 472 additions and 71 deletions

View File

@ -30,20 +30,25 @@
#include "distributed/transmit.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/intermediate_results.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/local_executor.h"
#include "distributed/local_multi_copy.h"
#include "distributed/shard_utils.h"
#include "distributed/version_compat.h"
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
/* managed via GUC, default is 512 kB */
int LocalCopyFlushThresholdByte = 512 * 1024;
static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
CopyOutState localCopyOutState);
static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary);
static bool ShouldSendCopyNow(StringInfo buffer);
static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId,
CopyStmt *copyStatement, bool isEndOfCopy);
static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary);
static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead);
/*
* LocalCopyBuffer is used in copy callback to return the copied rows.
@ -52,6 +57,7 @@ static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary);
*/
static StringInfo LocalCopyBuffer;
/*
* WriteTupleToLocalShard adds the given slot and does a local copy if
* this is the end of copy, or the buffer size exceeds the threshold.
@ -94,6 +100,25 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in
}
/*
* WriteTupleToLocalFile adds the given slot and does a local copy to the
* file if the buffer size exceeds the threshold.
*/
void
WriteTupleToLocalFile(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
int64 shardId, CopyOutState localFileCopyOutState,
FileCompat *fileCompat)
{
AddSlotToBuffer(slot, copyDest, localFileCopyOutState);
if (ShouldSendCopyNow(localFileCopyOutState->fe_msgbuf))
{
WriteToLocalFile(localFileCopyOutState->fe_msgbuf, fileCompat);
resetStringInfo(localFileCopyOutState->fe_msgbuf);
}
}
/*
* FinishLocalCopyToShard finishes local copy for the given shard with the shard id.
*/
@ -112,6 +137,26 @@ FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId,
}
/*
* FinishLocalCopyToFile finishes local copy for the given file.
*/
void
FinishLocalCopyToFile(CopyOutState localFileCopyOutState, FileCompat *fileCompat)
{
StringInfo data = localFileCopyOutState->fe_msgbuf;
bool isBinaryCopy = localFileCopyOutState->binary;
if (isBinaryCopy)
{
AppendCopyBinaryFooters(localFileCopyOutState);
}
WriteToLocalFile(data, fileCompat);
resetStringInfo(localFileCopyOutState->fe_msgbuf);
FileClose(fileCompat->fd);
}
/*
* AddSlotToBuffer serializes the given slot and adds it to
* the buffer in localCopyOutState.
@ -138,7 +183,8 @@ AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, CopyOutSt
static bool
ShouldSendCopyNow(StringInfo buffer)
{
return buffer->len > LOCAL_COPY_FLUSH_THRESHOLD;
/* LocalCopyFlushThreshold is in bytes */
return buffer->len > LocalCopyFlushThresholdByte;
}

View File

@ -92,6 +92,7 @@
#include "distributed/worker_protocol.h"
#include "distributed/local_multi_copy.h"
#include "distributed/hash_helpers.h"
#include "distributed/transmit.h"
#include "executor/executor.h"
#include "foreign/foreign.h"
@ -122,6 +123,8 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
*/
#define COPY_SWITCH_OVER_THRESHOLD (4 * 1024 * 1024)
#define FILE_IS_OPEN(x) (x > -1)
typedef struct CopyShardState CopyShardState;
typedef struct CopyPlacementState CopyPlacementState;
@ -195,9 +198,12 @@ struct CopyShardState
/* Used as hash key. */
uint64 shardId;
/* used for doing local copy */
/* used for doing local copy, either for a shard or a co-located file */
CopyOutState copyOutState;
/* used when copy is targeting co-located file */
FileCompat fileDest;
/* containsLocalPlacement is true if we have a local placement for the shard id of this state */
bool containsLocalPlacement;
@ -271,7 +277,7 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash,
static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash,
HTAB *connectionStateHash, bool stopOnFailure,
bool *found, bool shouldUseLocalCopy, CopyOutState
copyOutState, bool isCopyToIntermediateFile);
copyOutState, bool isColocatedIntermediateResult);
static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash,
ShardPlacement *placement,
bool stopOnFailure);
@ -283,10 +289,10 @@ static List * ConnectionStateListToNode(HTAB *connectionStateHash,
const char *hostname, int32 port);
static void InitializeCopyShardState(CopyShardState *shardState,
HTAB *connectionStateHash,
uint64 shardId, bool stopOnFailure, bool
canUseLocalCopy,
uint64 shardId, bool stopOnFailure,
bool canUseLocalCopy,
CopyOutState copyOutState,
bool isCopyToIntermediateFile);
bool colocatedIntermediateResult);
static void StartPlacementStateCopyCommand(CopyPlacementState *placementState,
CopyStmt *copyStatement,
CopyOutState copyOutState);
@ -335,11 +341,14 @@ static bool ContainsLocalPlacement(int64 shardId);
static void CompleteCopyQueryTagCompat(QueryCompletionCompat *completionTag, uint64
processedRowCount);
static void FinishLocalCopy(CitusCopyDestReceiver *copyDest);
static void CreateLocalColocatedIntermediateFile(CitusCopyDestReceiver *copyDest,
CopyShardState *shardState);
static void FinishLocalColocatedIntermediateFiles(CitusCopyDestReceiver *copyDest);
static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to);
static LocalCopyStatus GetLocalCopyStatus(List *shardIntervalList, bool
isIntermediateResult);
static LocalCopyStatus GetLocalCopyStatus(void);
static bool ShardIntervalListHasLocalPlacements(List *shardIntervalList);
static void LogLocalCopyExecution(uint64 shardId);
static void LogLocalCopyToRelationExecution(uint64 shardId);
static void LogLocalCopyToFileExecution(uint64 shardId);
/* exports for SQL callable functions */
@ -2083,7 +2092,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu
copyDest->partitionColumnIndex = partitionColumnIndex;
copyDest->executorState = executorState;
copyDest->stopOnFailure = stopOnFailure;
copyDest->intermediateResultIdPrefix = intermediateResultIdPrefix;
copyDest->colocatedIntermediateResultIdPrefix = intermediateResultIdPrefix;
copyDest->memoryContext = CurrentMemoryContext;
return copyDest;
@ -2097,22 +2106,13 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu
* execution depending on other information.
*/
static LocalCopyStatus
GetLocalCopyStatus(List *shardIntervalList, bool isIntermediateResult)
GetLocalCopyStatus(void)
{
if (!EnableLocalExecution ||
GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED)
{
return LOCAL_COPY_DISABLED;
}
else if (isIntermediateResult)
{
/*
* Intermediate files are written to a file, and files are visible to all
* transactions, and we use a custom copy format for copy therefore we will
* use the existing logic for that.
*/
return LOCAL_COPY_DISABLED;
}
else if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED)
{
/*
@ -2291,9 +2291,11 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
/* define the template for the COPY statement that is sent to workers */
CopyStmt *copyStatement = makeNode(CopyStmt);
if (copyDest->intermediateResultIdPrefix != NULL)
if (copyDest->colocatedIntermediateResultIdPrefix != NULL)
{
copyStatement->relation = makeRangeVar(NULL, copyDest->intermediateResultIdPrefix,
copyStatement->relation = makeRangeVar(NULL,
copyDest->
colocatedIntermediateResultIdPrefix,
-1);
DefElem *formatResultOption = makeDefElem("format", (Node *) makeString("result"),
@ -2342,9 +2344,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
*/
EnsureConnectionPossibilityForRemotePrimaryNodes();
bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL;
LocalCopyStatus localCopyStatus =
GetLocalCopyStatus(shardIntervalList, isIntermediateResult);
LocalCopyStatus localCopyStatus = GetLocalCopyStatus();
if (localCopyStatus == LOCAL_COPY_DISABLED)
{
copyDest->shouldUseLocalCopy = false;
@ -2443,15 +2443,16 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
/* connections hash is kept in memory context */
MemoryContextSwitchTo(copyDest->memoryContext);
bool isColocatedIntermediateResult =
copyDest->colocatedIntermediateResultIdPrefix != NULL;
bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL;
CopyShardState *shardState = GetShardState(shardId, copyDest->shardStateHash,
copyDest->connectionStateHash,
stopOnFailure,
&cachedShardStateFound,
copyDest->shouldUseLocalCopy,
copyDest->copyOutState,
isIntermediateResult);
isColocatedIntermediateResult);
if (!cachedShardStateFound)
{
firstTupleInShard = true;
@ -2472,12 +2473,22 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest
}
}
if (copyDest->shouldUseLocalCopy && shardState->containsLocalPlacement)
if (isColocatedIntermediateResult && copyDest->shouldUseLocalCopy &&
shardState->containsLocalPlacement)
{
if (firstTupleInShard)
{
CreateLocalColocatedIntermediateFile(copyDest, shardState);
}
WriteTupleToLocalFile(slot, copyDest, shardId,
shardState->copyOutState, &shardState->fileDest);
}
else if (copyDest->shouldUseLocalCopy && shardState->containsLocalPlacement)
{
WriteTupleToLocalShard(slot, copyDest, shardId, shardState->copyOutState);
}
foreach(placementStateCell, shardState->placementStateList)
{
CopyPlacementState *currentPlacementState = lfirst(placementStateCell);
@ -2693,6 +2704,8 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
Relation distributedRelation = copyDest->distributedRelation;
List *connectionStateList = ConnectionStateList(connectionStateHash);
FinishLocalColocatedIntermediateFiles(copyDest);
FinishLocalCopy(copyDest);
PG_TRY();
@ -2743,6 +2756,61 @@ FinishLocalCopy(CitusCopyDestReceiver *copyDest)
}
/*
* CreateLocalColocatedIntermediateFile creates a co-located file for the given
* shard, and appends the binary headers if needed. The function also modifies
* shardState to set the fileDest and copyOutState.
*/
static void
CreateLocalColocatedIntermediateFile(CitusCopyDestReceiver *copyDest,
CopyShardState *shardState)
{
/* make sure the directory exists */
CreateIntermediateResultsDirectory();
const int fileFlags = (O_CREAT | O_RDWR | O_TRUNC);
const int fileMode = (S_IRUSR | S_IWUSR);
StringInfo filePath = makeStringInfo();
appendStringInfo(filePath, "%s_%ld", copyDest->colocatedIntermediateResultIdPrefix,
shardState->shardId);
const char *fileName = QueryResultFileName(filePath->data);
shardState->fileDest =
FileCompatFromFileStart(FileOpenForTransmit(fileName, fileFlags, fileMode));
CopyOutState localFileCopyOutState = shardState->copyOutState;
bool isBinaryCopy = localFileCopyOutState->binary;
if (isBinaryCopy)
{
AppendCopyBinaryHeaders(localFileCopyOutState);
}
}
/*
* FinishLocalColocatedIntermediateFiles iterates over all the colocated
* intermediate files and finishes the COPY on all of them.
*/
static void
FinishLocalColocatedIntermediateFiles(CitusCopyDestReceiver *copyDest)
{
HTAB *shardStateHash = copyDest->shardStateHash;
HASH_SEQ_STATUS status;
CopyShardState *copyShardState;
foreach_htab(copyShardState, &status, shardStateHash)
{
if (copyShardState->copyOutState != NULL &&
FILE_IS_OPEN(copyShardState->fileDest.fd))
{
FinishLocalCopyToFile(copyShardState->copyOutState,
&copyShardState->fileDest);
}
}
}
/*
* ShutdownCopyConnectionState ends the copy command for the current active
* placement on connection, and then sends the rest of the buffers over the
@ -3438,8 +3506,8 @@ ConnectionStateListToNode(HTAB *connectionStateHash, const char *hostname, int32
static CopyShardState *
GetShardState(uint64 shardId, HTAB *shardStateHash,
HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool
shouldUseLocalCopy, CopyOutState copyOutState, bool
isCopyToIntermediateFile)
shouldUseLocalCopy, CopyOutState copyOutState,
bool isColocatedIntermediateResult)
{
CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId,
HASH_ENTER, found);
@ -3447,7 +3515,7 @@ GetShardState(uint64 shardId, HTAB *shardStateHash,
{
InitializeCopyShardState(shardState, connectionStateHash,
shardId, stopOnFailure, shouldUseLocalCopy,
copyOutState, isCopyToIntermediateFile);
copyOutState, isColocatedIntermediateResult);
}
return shardState;
@ -3462,8 +3530,9 @@ GetShardState(uint64 shardId, HTAB *shardStateHash,
static void
InitializeCopyShardState(CopyShardState *shardState,
HTAB *connectionStateHash, uint64 shardId,
bool stopOnFailure, bool shouldUseLocalCopy, CopyOutState
copyOutState, bool isCopyToIntermediateFile)
bool stopOnFailure, bool shouldUseLocalCopy,
CopyOutState copyOutState,
bool colocatedIntermediateResult)
{
ListCell *placementCell = NULL;
int failedPlacementCount = 0;
@ -3487,7 +3556,7 @@ InitializeCopyShardState(CopyShardState *shardState,
shardState->placementStateList = NIL;
shardState->copyOutState = NULL;
shardState->containsLocalPlacement = ContainsLocalPlacement(shardId);
shardState->fileDest.fd = -1;
foreach(placementCell, activePlacementList)
{
@ -3497,22 +3566,17 @@ InitializeCopyShardState(CopyShardState *shardState,
{
shardState->copyOutState = (CopyOutState) palloc0(sizeof(*copyOutState));
CloneCopyOutStateForLocalCopy(copyOutState, shardState->copyOutState);
LogLocalCopyExecution(shardId);
continue;
}
if (placement->groupId == GetLocalGroupId())
{
/*
* if we are copying into an intermediate file we won't use local copy.
* Files are visible to all transactions so we can still use local execution, therefore
* we shouldn't restrict only using connection in this case.
*/
if (!isCopyToIntermediateFile)
if (colocatedIntermediateResult)
{
SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED);
LogLocalCopyToFileExecution(shardId);
}
else
{
LogLocalCopyToRelationExecution(shardId);
}
continue;
}
MultiConnection *connection =
@ -3591,11 +3655,11 @@ CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to)
/*
* LogLocalCopyExecution logs that the copy will be done locally for
* the given shard.
* LogLocalCopyToRelationExecution logs that the copy will be done
* locally for the given shard.
*/
static void
LogLocalCopyExecution(uint64 shardId)
LogLocalCopyToRelationExecution(uint64 shardId)
{
if (!(LogRemoteCommands || LogLocalCommands))
{
@ -3605,6 +3669,22 @@ LogLocalCopyExecution(uint64 shardId)
}
/*
* LogLocalCopyToFileExecution logs that the copy will be done locally for
* a file colocated to the given shard.
*/
static void
LogLocalCopyToFileExecution(uint64 shardId)
{
if (!(LogRemoteCommands || LogLocalCommands))
{
return;
}
ereport(NOTICE, (errmsg("executing the copy locally for colocated file with "
"shard %lu", shardId)));
}
/*
* CopyGetPlacementConnection assigns a connection to the given placement. If
* a connection has already been assigned the placement in the current transaction

View File

@ -86,7 +86,6 @@ static void RemoteFileDestReceiverStartup(DestReceiver *dest, int operation,
TupleDesc inputTupleDescriptor);
static void PrepareIntermediateResultBroadcast(RemoteFileDestReceiver *resultDest);
static StringInfo ConstructCopyResultStatement(const char *resultId);
static void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat);
static bool RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest);
static void BroadcastCopyData(StringInfo dataBuffer, List *connectionList);
static void SendCopyDataOverConnection(StringInfo dataBuffer,
@ -438,7 +437,7 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest)
/*
* WriteToLocalResultsFile writes the bytes in a StringInfo to a local file.
*/
static void
void
WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat)
{
int bytesWritten = FileWriteCompat(fileCompat, copyData->data,

View File

@ -37,6 +37,7 @@
#include "distributed/distributed_deadlock_detection.h"
#include "distributed/insert_select_executor.h"
#include "distributed/intermediate_result_pruning.h"
#include "distributed/local_multi_copy.h"
#include "distributed/local_executor.h"
#include "distributed/local_distributed_join_planner.h"
#include "distributed/locally_reserved_shared_connections.h"
@ -684,6 +685,16 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL,
NoticeIfSubqueryPushdownEnabled, NULL, NULL);
DefineCustomIntVariable(
"citus.local_copy_flush_threshold",
gettext_noop("Sets the threshold for local copy to be flushed."),
NULL,
&LocalCopyFlushThresholdByte,
512 * 1024, 1, INT_MAX,
PGC_USERSET,
GUC_UNIT_BYTE | GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.local_shared_pool_size",
gettext_noop(

View File

@ -131,8 +131,13 @@ typedef struct CitusCopyDestReceiver
/* if true, should copy to local placements in the current session */
bool shouldUseLocalCopy;
/* copy into intermediate result */
char *intermediateResultIdPrefix;
/*
* Copy into colocated intermediate result. When this is set, the
* COPY assumes there are hypothetical colocated shards to the
* relation that are files. And, COPY writes the data to the
* files as if they are shards.
*/
char *colocatedIntermediateResultIdPrefix;
} CitusCopyDestReceiver;

View File

@ -53,6 +53,7 @@ extern DestReceiver * CreateRemoteFileDestReceiver(const char *resultId,
EState *executorState,
List *initialNodeList, bool
writeLocalFile);
extern void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat);
extern uint64 RemoteFileDestReceiverBytesSent(DestReceiver *destReceiver);
extern void SendQueryResultViaCopy(const char *resultId);
extern void ReceiveQueryResultViaCopy(const char *resultId);

View File

@ -3,17 +3,24 @@
#define LOCAL_MULTI_COPY
/*
* LOCAL_COPY_FLUSH_THRESHOLD is the threshold for local copy to be flushed.
* LocalCopyFlushThresholdByte is the threshold for local copy to be flushed.
* There will be one buffer for each local placement, when the buffer size
* exceeds this threshold, it will be flushed.
*
* Managed via GUC, the default is 512 kB.
*/
#define LOCAL_COPY_FLUSH_THRESHOLD (1 * 512 * 1024)
extern int LocalCopyFlushThresholdByte;
extern void WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
int64
shardId,
CopyOutState localCopyOutState);
extern void WriteTupleToLocalFile(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest,
int64 shardId, CopyOutState localFileCopyOutState,
FileCompat *fileCompat);
extern void FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId,
CopyOutState localCopyOutState);
extern void FinishLocalCopyToFile(CopyOutState localFileCopyOutState,
FileCompat *fileCompat);
#endif /* LOCAL_MULTI_COPY */

View File

@ -401,6 +401,7 @@ BEGIN;
WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref SELECT *,* FROM generate_series(1,10) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b;
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true
NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_1503020'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b
NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x)))
NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b
@ -531,7 +532,7 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1
ROLLBACK;
RESET citus.enable_cte_inlining;
CREATE table ref_table(x int, y int);
CREATE table ref_table(x int PRIMARY KEY, y int);
-- this will be replicated to the coordinator because of add_coordinator test
SELECT create_reference_table('ref_table');
create_reference_table
@ -555,6 +556,22 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinat
(1 row)
ROLLBACK;
-- writing to local file and remote intermediate files
-- at the same time
INSERT INTO ref_table SELECT *, * FROM generate_series(1, 100);
NOTICE: executing the copy locally for shard xxxxx
WITH cte_1 AS (
INSERT INTO ref_table SELECT * FROM ref_table LIMIT 10000 ON CONFLICT (x) DO UPDATE SET y = EXCLUDED.y + 1 RETURNING *)
SELECT count(*) FROM cte_1;
NOTICE: executing the command locally: SELECT x, y FROM coordinator_shouldhaveshards.ref_table_1503039 ref_table LIMIT 10000
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_table_1503039 AS citus_table_alias (x, y) SELECT x, y FROM read_intermediate_result('insert_select_XXX_1503039'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) ON CONFLICT(x) DO UPDATE SET y = (excluded.y OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.x, citus_table_alias.y
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) cte_1
count
---------------------------------------------------------------------
100
(1 row)
-- issue #4237: preventing empty placement creation on coordinator
CREATE TABLE test_append_table(a int);
SELECT create_distributed_table('test_append_table', 'a', 'append');

View File

@ -283,6 +283,7 @@ RETURNING *;
-- can be executed locally
INSERT INTO distributed_table SELECT * FROM distributed_table WHERE key = 1 OFFSET 0 ON CONFLICT DO NOTHING;
NOTICE: executing the command locally: SELECT key, value, age FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) SELECT key, value, age FROM read_intermediate_result('insert_select_XXX_1470001'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, age bigint) ON CONFLICT DO NOTHING
INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key = 2 LIMIT 1 ON CONFLICT DO NOTHING;
-- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution

View File

@ -167,6 +167,14 @@ SELECT create_distributed_table('public.another_schema_table', 'a');
(1 row)
CREATE TABLE non_binary_copy_test (key int PRIMARY KEY, value new_type);
SELECT create_distributed_table('non_binary_copy_test', 'key');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO non_binary_copy_test SELECT i, (i, 'citus9.5')::new_type FROM generate_series(0,1000)i;
-- Confirm the basics work
INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5);
SELECT * FROM test WHERE x = 1;
@ -236,21 +244,21 @@ INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2) RETURNING *;
SET citus.log_remote_commands to true;
-- observe that there is a conflict and the following query does nothing
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING *;
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630515 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING part_key, other_col, third_col
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630519 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING part_key, other_col, third_col
part_key | other_col | third_col
---------------------------------------------------------------------
(0 rows)
-- same as the above with different syntax
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO NOTHING RETURNING *;
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630515 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT(part_key) DO NOTHING RETURNING part_key, other_col, third_col
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630519 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT(part_key) DO NOTHING RETURNING part_key, other_col, third_col
part_key | other_col | third_col
---------------------------------------------------------------------
(0 rows)
-- again the same query with another syntax
INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *;
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630515 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key_90630515 DO NOTHING RETURNING part_key, other_col, third_col
NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630519 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key_90630519 DO NOTHING RETURNING part_key, other_col, third_col
part_key | other_col | third_col
---------------------------------------------------------------------
(0 rows)
@ -258,7 +266,7 @@ NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_9063
BEGIN;
-- force local execution
SELECT count(*) FROM upsert_test WHERE part_key = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.upsert_test_90630515 upsert_test WHERE (part_key OPERATOR(pg_catalog.=) 1)
NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.upsert_test_90630519 upsert_test WHERE (part_key OPERATOR(pg_catalog.=) 1)
count
---------------------------------------------------------------------
1
@ -344,10 +352,10 @@ SET search_path TO single_node;
DROP SCHEMA "Quoed.Schema" CASCADE;
NOTICE: drop cascades to 5 other objects
DETAIL: drop cascades to table "Quoed.Schema".simple_table_name
drop cascades to table "Quoed.Schema".simple_table_name_90630520
drop cascades to table "Quoed.Schema".simple_table_name_90630521
drop cascades to table "Quoed.Schema".simple_table_name_90630522
drop cascades to table "Quoed.Schema".simple_table_name_90630523
drop cascades to table "Quoed.Schema".simple_table_name_90630524
drop cascades to table "Quoed.Schema".simple_table_name_90630525
drop cascades to table "Quoed.Schema".simple_table_name_90630526
drop cascades to table "Quoed.Schema".simple_table_name_90630527
-- test partitioned index creation with long name
CREATE TABLE test_index_creation1
(
@ -1591,6 +1599,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT in
7
(1 row)
-- this is to get ready for the next tests
TRUNCATE another_schema_table;
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
-- copy can use local execution even if there is no connection available
COPY another_schema_table(a) FROM PROGRAM 'seq 32';
NOTICE: executing the copy locally for shard xxxxx
@ -1601,6 +1615,176 @@ NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY another_schema_table, line 3: "3"
NOTICE: executing the copy locally for shard xxxxx
CONTEXT: COPY another_schema_table, line 6: "6"
-- INSERT .. SELECT with co-located intermediate results
SET citus.log_remote_commands to false;
CREATE UNIQUE INDEX another_schema_table_pk ON another_schema_table(a);
SET citus.log_local_commands to true;
INSERT INTO another_schema_table SELECT * FROM another_schema_table LIMIT 10000 ON CONFLICT(a) DO NOTHING;
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true LIMIT '10000'::bigint
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630511'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630512'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630513'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630514'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING
INSERT INTO another_schema_table SELECT * FROM another_schema_table ORDER BY a LIMIT 10 ON CONFLICT(a) DO UPDATE SET b = EXCLUDED.b + 1 RETURNING *;
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630511'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630512'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630513'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630514'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b
a | b
---------------------------------------------------------------------
1 |
2 |
3 |
4 |
5 |
6 |
7 |
8 |
9 |
10 |
(10 rows)
-- INSERT .. SELECT with co-located intermediate result for non-binary input
WITH cte_1 AS
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING value)
SELECT count(*) FROM cte_1;
NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630515 non_binary_copy_test WHERE true LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630516 non_binary_copy_test WHERE true LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630517 non_binary_copy_test WHERE true LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630518 non_binary_copy_test WHERE true LIMIT '10000'::bigint
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630515 AS citus_table_alias (key, value) SELECT key, value FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630516 AS citus_table_alias (key, value) SELECT key, value FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630517 AS citus_table_alias (key, value) SELECT key, value FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630518 AS citus_table_alias (key, value) SELECT key, value FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'text'::citus_copy_format) intermediate_result(value single_node.new_type)) cte_1
count
---------------------------------------------------------------------
1001
(1 row)
-- test with NULL columns
ALTER TABLE non_binary_copy_test ADD COLUMN z INT;
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630515, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z INT;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630516, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z INT;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630517, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z INT;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630518, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z INT;')
WITH cte_1 AS
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING z)
SELECT bool_and(z is null) FROM cte_1;
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630515 non_binary_copy_test WHERE true LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630516 non_binary_copy_test WHERE true LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630517 non_binary_copy_test WHERE true LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630518 non_binary_copy_test WHERE true LIMIT '10000'::bigint
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630515 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630516 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630517 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630518 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
NOTICE: executing the command locally: SELECT bool_and((z IS NULL)) AS bool_and FROM (SELECT intermediate_result.z FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(z integer)) cte_1
bool_and
---------------------------------------------------------------------
t
(1 row)
-- test with type coersion (int -> text) and also NULL values with coersion
WITH cte_1 AS
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING key, z)
SELECT count(DISTINCT key::text), count(DISTINCT z::text) FROM cte_1;
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630515 non_binary_copy_test WHERE true LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630516 non_binary_copy_test WHERE true LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630517 non_binary_copy_test WHERE true LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630518 non_binary_copy_test WHERE true LIMIT '10000'::bigint
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630515 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630516 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630517 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630518 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z
NOTICE: executing the command locally: SELECT count(DISTINCT (key)::text) AS count, count(DISTINCT (z)::text) AS count FROM (SELECT intermediate_result.key, intermediate_result.z FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, z integer)) cte_1
count | count
---------------------------------------------------------------------
1001 | 0
(1 row)
-- lets flush the copy often to make sure everyhing is fine
SET citus.local_copy_flush_threshold TO 1;
TRUNCATE another_schema_table;
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE
INSERT INTO another_schema_table(a) SELECT i from generate_Series(0,10000)i;
NOTICE: executing the copy locally for shard xxxxx
NOTICE: executing the copy locally for shard xxxxx
NOTICE: executing the copy locally for shard xxxxx
NOTICE: executing the copy locally for shard xxxxx
WITH cte_1 AS
(INSERT INTO another_schema_table SELECT * FROM another_schema_table ORDER BY a LIMIT 10000 ON CONFLICT(a) DO NOTHING RETURNING *)
SELECT count(*) FROM cte_1;
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630511'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630512'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630513'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b
NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630514'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b
NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1
count
---------------------------------------------------------------------
0
(1 row)
WITH cte_1 AS
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING z)
SELECT bool_and(z is null) FROM cte_1;
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630515 non_binary_copy_test WHERE true LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630516 non_binary_copy_test WHERE true LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630517 non_binary_copy_test WHERE true LIMIT '10000'::bigint
NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630518 non_binary_copy_test WHERE true LIMIT '10000'::bigint
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the copy locally for colocated file with shard xxxxx
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630515 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630516 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630517 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630518 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z
NOTICE: executing the command locally: SELECT bool_and((z IS NULL)) AS bool_and FROM (SELECT intermediate_result.z FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(z integer)) cte_1
bool_and
---------------------------------------------------------------------
t
(1 row)
RESET citus.local_copy_flush_threshold;
-- if the local execution is disabled, we cannot failover to
-- local execution and the queries would fail
SET citus.enable_local_execution TO false;

View File

@ -221,7 +221,7 @@ SELECT create_distributed_table('dist_table1', 'a');
ROLLBACK;
RESET citus.enable_cte_inlining;
CREATE table ref_table(x int, y int);
CREATE table ref_table(x int PRIMARY KEY, y int);
-- this will be replicated to the coordinator because of add_coordinator test
SELECT create_reference_table('ref_table');
@ -233,6 +233,14 @@ INSERT INTO ref_table SELECT *, * FROM generate_series(1, 100);
SELECT COUNT(*) FROM test JOIN ref_table USING(x);
ROLLBACK;
-- writing to local file and remote intermediate files
-- at the same time
INSERT INTO ref_table SELECT *, * FROM generate_series(1, 100);
WITH cte_1 AS (
INSERT INTO ref_table SELECT * FROM ref_table LIMIT 10000 ON CONFLICT (x) DO UPDATE SET y = EXCLUDED.y + 1 RETURNING *)
SELECT count(*) FROM cte_1;
-- issue #4237: preventing empty placement creation on coordinator
CREATE TABLE test_append_table(a int);
SELECT create_distributed_table('test_append_table', 'a', 'append');

View File

@ -79,6 +79,10 @@ CREATE TABLE local(c int, d int);
CREATE TABLE public.another_schema_table(a int, b int);
SELECT create_distributed_table('public.another_schema_table', 'a');
CREATE TABLE non_binary_copy_test (key int PRIMARY KEY, value new_type);
SELECT create_distributed_table('non_binary_copy_test', 'key');
INSERT INTO non_binary_copy_test SELECT i, (i, 'citus9.5')::new_type FROM generate_series(0,1000)i;
-- Confirm the basics work
INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5);
SELECT * FROM test WHERE x = 1;
@ -783,7 +787,6 @@ ROLLBACK;
\c - - - :master_port
SET search_path TO single_node;
-- simulate that even if there is no connection slots
-- to connect, Citus can switch to local execution
SET citus.force_max_query_parallelization TO false;
@ -817,9 +820,48 @@ ROLLBACK;
WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000)
SELECT count(*) FROM cte_1;
-- this is to get ready for the next tests
TRUNCATE another_schema_table;
-- copy can use local execution even if there is no connection available
COPY another_schema_table(a) FROM PROGRAM 'seq 32';
-- INSERT .. SELECT with co-located intermediate results
SET citus.log_remote_commands to false;
CREATE UNIQUE INDEX another_schema_table_pk ON another_schema_table(a);
SET citus.log_local_commands to true;
INSERT INTO another_schema_table SELECT * FROM another_schema_table LIMIT 10000 ON CONFLICT(a) DO NOTHING;
INSERT INTO another_schema_table SELECT * FROM another_schema_table ORDER BY a LIMIT 10 ON CONFLICT(a) DO UPDATE SET b = EXCLUDED.b + 1 RETURNING *;
-- INSERT .. SELECT with co-located intermediate result for non-binary input
WITH cte_1 AS
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING value)
SELECT count(*) FROM cte_1;
-- test with NULL columns
ALTER TABLE non_binary_copy_test ADD COLUMN z INT;
WITH cte_1 AS
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING z)
SELECT bool_and(z is null) FROM cte_1;
-- test with type coersion (int -> text) and also NULL values with coersion
WITH cte_1 AS
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING key, z)
SELECT count(DISTINCT key::text), count(DISTINCT z::text) FROM cte_1;
-- lets flush the copy often to make sure everyhing is fine
SET citus.local_copy_flush_threshold TO 1;
TRUNCATE another_schema_table;
INSERT INTO another_schema_table(a) SELECT i from generate_Series(0,10000)i;
WITH cte_1 AS
(INSERT INTO another_schema_table SELECT * FROM another_schema_table ORDER BY a LIMIT 10000 ON CONFLICT(a) DO NOTHING RETURNING *)
SELECT count(*) FROM cte_1;
WITH cte_1 AS
(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING z)
SELECT bool_and(z is null) FROM cte_1;
RESET citus.local_copy_flush_threshold;
-- if the local execution is disabled, we cannot failover to
-- local execution and the queries would fail
SET citus.enable_local_execution TO false;