diff --git a/src/backend/distributed/commands/local_multi_copy.c b/src/backend/distributed/commands/local_multi_copy.c index e90426d0f..ca532ef70 100644 --- a/src/backend/distributed/commands/local_multi_copy.c +++ b/src/backend/distributed/commands/local_multi_copy.c @@ -30,20 +30,25 @@ #include "distributed/transmit.h" #include "distributed/commands/multi_copy.h" +#include "distributed/intermediate_results.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/local_executor.h" #include "distributed/local_multi_copy.h" #include "distributed/shard_utils.h" #include "distributed/version_compat.h" -static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); +/* managed via GUC, default is 512 kB */ +int LocalCopyFlushThresholdByte = 512 * 1024; + + static void AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, CopyOutState localCopyOutState); - +static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary); static bool ShouldSendCopyNow(StringInfo buffer); static void DoLocalCopy(StringInfo buffer, Oid relationId, int64 shardId, CopyStmt *copyStatement, bool isEndOfCopy); -static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary); +static int ReadFromLocalBufferCallback(void *outBuf, int minRead, int maxRead); + /* * LocalCopyBuffer is used in copy callback to return the copied rows. @@ -52,6 +57,7 @@ static bool ShouldAddBinaryHeaders(StringInfo buffer, bool isBinary); */ static StringInfo LocalCopyBuffer; + /* * WriteTupleToLocalShard adds the given slot and does a local copy if * this is the end of copy, or the buffer size exceeds the threshold. @@ -94,6 +100,25 @@ WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, in } +/* + * WriteTupleToLocalFile adds the given slot and does a local copy to the + * file if the buffer size exceeds the threshold. + */ +void +WriteTupleToLocalFile(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, + int64 shardId, CopyOutState localFileCopyOutState, + FileCompat *fileCompat) +{ + AddSlotToBuffer(slot, copyDest, localFileCopyOutState); + + if (ShouldSendCopyNow(localFileCopyOutState->fe_msgbuf)) + { + WriteToLocalFile(localFileCopyOutState->fe_msgbuf, fileCompat); + resetStringInfo(localFileCopyOutState->fe_msgbuf); + } +} + + /* * FinishLocalCopyToShard finishes local copy for the given shard with the shard id. */ @@ -112,6 +137,26 @@ FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId, } +/* + * FinishLocalCopyToFile finishes local copy for the given file. + */ +void +FinishLocalCopyToFile(CopyOutState localFileCopyOutState, FileCompat *fileCompat) +{ + StringInfo data = localFileCopyOutState->fe_msgbuf; + + bool isBinaryCopy = localFileCopyOutState->binary; + if (isBinaryCopy) + { + AppendCopyBinaryFooters(localFileCopyOutState); + } + WriteToLocalFile(data, fileCompat); + resetStringInfo(localFileCopyOutState->fe_msgbuf); + + FileClose(fileCompat->fd); +} + + /* * AddSlotToBuffer serializes the given slot and adds it to * the buffer in localCopyOutState. @@ -138,7 +183,8 @@ AddSlotToBuffer(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, CopyOutSt static bool ShouldSendCopyNow(StringInfo buffer) { - return buffer->len > LOCAL_COPY_FLUSH_THRESHOLD; + /* LocalCopyFlushThreshold is in bytes */ + return buffer->len > LocalCopyFlushThresholdByte; } diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 4ee5ce7e8..9824a1fe5 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -92,6 +92,7 @@ #include "distributed/worker_protocol.h" #include "distributed/local_multi_copy.h" #include "distributed/hash_helpers.h" +#include "distributed/transmit.h" #include "executor/executor.h" #include "foreign/foreign.h" @@ -122,6 +123,8 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; */ #define COPY_SWITCH_OVER_THRESHOLD (4 * 1024 * 1024) +#define FILE_IS_OPEN(x) (x > -1) + typedef struct CopyShardState CopyShardState; typedef struct CopyPlacementState CopyPlacementState; @@ -195,9 +198,12 @@ struct CopyShardState /* Used as hash key. */ uint64 shardId; - /* used for doing local copy */ + /* used for doing local copy, either for a shard or a co-located file */ CopyOutState copyOutState; + /* used when copy is targeting co-located file */ + FileCompat fileDest; + /* containsLocalPlacement is true if we have a local placement for the shard id of this state */ bool containsLocalPlacement; @@ -271,7 +277,7 @@ static CopyConnectionState * GetConnectionState(HTAB *connectionStateHash, static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool shouldUseLocalCopy, CopyOutState - copyOutState, bool isCopyToIntermediateFile); + copyOutState, bool isColocatedIntermediateResult); static MultiConnection * CopyGetPlacementConnection(HTAB *connectionStateHash, ShardPlacement *placement, bool stopOnFailure); @@ -283,10 +289,10 @@ static List * ConnectionStateListToNode(HTAB *connectionStateHash, const char *hostname, int32 port); static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, - uint64 shardId, bool stopOnFailure, bool - canUseLocalCopy, + uint64 shardId, bool stopOnFailure, + bool canUseLocalCopy, CopyOutState copyOutState, - bool isCopyToIntermediateFile); + bool colocatedIntermediateResult); static void StartPlacementStateCopyCommand(CopyPlacementState *placementState, CopyStmt *copyStatement, CopyOutState copyOutState); @@ -335,11 +341,14 @@ static bool ContainsLocalPlacement(int64 shardId); static void CompleteCopyQueryTagCompat(QueryCompletionCompat *completionTag, uint64 processedRowCount); static void FinishLocalCopy(CitusCopyDestReceiver *copyDest); +static void CreateLocalColocatedIntermediateFile(CitusCopyDestReceiver *copyDest, + CopyShardState *shardState); +static void FinishLocalColocatedIntermediateFiles(CitusCopyDestReceiver *copyDest); static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to); -static LocalCopyStatus GetLocalCopyStatus(List *shardIntervalList, bool - isIntermediateResult); +static LocalCopyStatus GetLocalCopyStatus(void); static bool ShardIntervalListHasLocalPlacements(List *shardIntervalList); -static void LogLocalCopyExecution(uint64 shardId); +static void LogLocalCopyToRelationExecution(uint64 shardId); +static void LogLocalCopyToFileExecution(uint64 shardId); /* exports for SQL callable functions */ @@ -2083,7 +2092,7 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu copyDest->partitionColumnIndex = partitionColumnIndex; copyDest->executorState = executorState; copyDest->stopOnFailure = stopOnFailure; - copyDest->intermediateResultIdPrefix = intermediateResultIdPrefix; + copyDest->colocatedIntermediateResultIdPrefix = intermediateResultIdPrefix; copyDest->memoryContext = CurrentMemoryContext; return copyDest; @@ -2097,22 +2106,13 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu * execution depending on other information. */ static LocalCopyStatus -GetLocalCopyStatus(List *shardIntervalList, bool isIntermediateResult) +GetLocalCopyStatus(void) { if (!EnableLocalExecution || GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED) { return LOCAL_COPY_DISABLED; } - else if (isIntermediateResult) - { - /* - * Intermediate files are written to a file, and files are visible to all - * transactions, and we use a custom copy format for copy therefore we will - * use the existing logic for that. - */ - return LOCAL_COPY_DISABLED; - } else if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) { /* @@ -2291,9 +2291,11 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, /* define the template for the COPY statement that is sent to workers */ CopyStmt *copyStatement = makeNode(CopyStmt); - if (copyDest->intermediateResultIdPrefix != NULL) + if (copyDest->colocatedIntermediateResultIdPrefix != NULL) { - copyStatement->relation = makeRangeVar(NULL, copyDest->intermediateResultIdPrefix, + copyStatement->relation = makeRangeVar(NULL, + copyDest-> + colocatedIntermediateResultIdPrefix, -1); DefElem *formatResultOption = makeDefElem("format", (Node *) makeString("result"), @@ -2342,9 +2344,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, */ EnsureConnectionPossibilityForRemotePrimaryNodes(); - bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL; - LocalCopyStatus localCopyStatus = - GetLocalCopyStatus(shardIntervalList, isIntermediateResult); + LocalCopyStatus localCopyStatus = GetLocalCopyStatus(); if (localCopyStatus == LOCAL_COPY_DISABLED) { copyDest->shouldUseLocalCopy = false; @@ -2443,15 +2443,16 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest /* connections hash is kept in memory context */ MemoryContextSwitchTo(copyDest->memoryContext); + bool isColocatedIntermediateResult = + copyDest->colocatedIntermediateResultIdPrefix != NULL; - bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL; CopyShardState *shardState = GetShardState(shardId, copyDest->shardStateHash, copyDest->connectionStateHash, stopOnFailure, &cachedShardStateFound, copyDest->shouldUseLocalCopy, copyDest->copyOutState, - isIntermediateResult); + isColocatedIntermediateResult); if (!cachedShardStateFound) { firstTupleInShard = true; @@ -2472,12 +2473,22 @@ CitusSendTupleToPlacements(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest } } - if (copyDest->shouldUseLocalCopy && shardState->containsLocalPlacement) + if (isColocatedIntermediateResult && copyDest->shouldUseLocalCopy && + shardState->containsLocalPlacement) + { + if (firstTupleInShard) + { + CreateLocalColocatedIntermediateFile(copyDest, shardState); + } + + WriteTupleToLocalFile(slot, copyDest, shardId, + shardState->copyOutState, &shardState->fileDest); + } + else if (copyDest->shouldUseLocalCopy && shardState->containsLocalPlacement) { WriteTupleToLocalShard(slot, copyDest, shardId, shardState->copyOutState); } - foreach(placementStateCell, shardState->placementStateList) { CopyPlacementState *currentPlacementState = lfirst(placementStateCell); @@ -2693,6 +2704,8 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) Relation distributedRelation = copyDest->distributedRelation; List *connectionStateList = ConnectionStateList(connectionStateHash); + + FinishLocalColocatedIntermediateFiles(copyDest); FinishLocalCopy(copyDest); PG_TRY(); @@ -2743,6 +2756,61 @@ FinishLocalCopy(CitusCopyDestReceiver *copyDest) } +/* + * CreateLocalColocatedIntermediateFile creates a co-located file for the given + * shard, and appends the binary headers if needed. The function also modifies + * shardState to set the fileDest and copyOutState. + */ +static void +CreateLocalColocatedIntermediateFile(CitusCopyDestReceiver *copyDest, + CopyShardState *shardState) +{ + /* make sure the directory exists */ + CreateIntermediateResultsDirectory(); + + const int fileFlags = (O_CREAT | O_RDWR | O_TRUNC); + const int fileMode = (S_IRUSR | S_IWUSR); + + StringInfo filePath = makeStringInfo(); + appendStringInfo(filePath, "%s_%ld", copyDest->colocatedIntermediateResultIdPrefix, + shardState->shardId); + + const char *fileName = QueryResultFileName(filePath->data); + shardState->fileDest = + FileCompatFromFileStart(FileOpenForTransmit(fileName, fileFlags, fileMode)); + + CopyOutState localFileCopyOutState = shardState->copyOutState; + bool isBinaryCopy = localFileCopyOutState->binary; + if (isBinaryCopy) + { + AppendCopyBinaryHeaders(localFileCopyOutState); + } +} + + +/* + * FinishLocalColocatedIntermediateFiles iterates over all the colocated + * intermediate files and finishes the COPY on all of them. + */ +static void +FinishLocalColocatedIntermediateFiles(CitusCopyDestReceiver *copyDest) +{ + HTAB *shardStateHash = copyDest->shardStateHash; + HASH_SEQ_STATUS status; + CopyShardState *copyShardState; + + foreach_htab(copyShardState, &status, shardStateHash) + { + if (copyShardState->copyOutState != NULL && + FILE_IS_OPEN(copyShardState->fileDest.fd)) + { + FinishLocalCopyToFile(copyShardState->copyOutState, + ©ShardState->fileDest); + } + } +} + + /* * ShutdownCopyConnectionState ends the copy command for the current active * placement on connection, and then sends the rest of the buffers over the @@ -3438,8 +3506,8 @@ ConnectionStateListToNode(HTAB *connectionStateHash, const char *hostname, int32 static CopyShardState * GetShardState(uint64 shardId, HTAB *shardStateHash, HTAB *connectionStateHash, bool stopOnFailure, bool *found, bool - shouldUseLocalCopy, CopyOutState copyOutState, bool - isCopyToIntermediateFile) + shouldUseLocalCopy, CopyOutState copyOutState, + bool isColocatedIntermediateResult) { CopyShardState *shardState = (CopyShardState *) hash_search(shardStateHash, &shardId, HASH_ENTER, found); @@ -3447,7 +3515,7 @@ GetShardState(uint64 shardId, HTAB *shardStateHash, { InitializeCopyShardState(shardState, connectionStateHash, shardId, stopOnFailure, shouldUseLocalCopy, - copyOutState, isCopyToIntermediateFile); + copyOutState, isColocatedIntermediateResult); } return shardState; @@ -3462,8 +3530,9 @@ GetShardState(uint64 shardId, HTAB *shardStateHash, static void InitializeCopyShardState(CopyShardState *shardState, HTAB *connectionStateHash, uint64 shardId, - bool stopOnFailure, bool shouldUseLocalCopy, CopyOutState - copyOutState, bool isCopyToIntermediateFile) + bool stopOnFailure, bool shouldUseLocalCopy, + CopyOutState copyOutState, + bool colocatedIntermediateResult) { ListCell *placementCell = NULL; int failedPlacementCount = 0; @@ -3487,7 +3556,7 @@ InitializeCopyShardState(CopyShardState *shardState, shardState->placementStateList = NIL; shardState->copyOutState = NULL; shardState->containsLocalPlacement = ContainsLocalPlacement(shardId); - + shardState->fileDest.fd = -1; foreach(placementCell, activePlacementList) { @@ -3497,22 +3566,17 @@ InitializeCopyShardState(CopyShardState *shardState, { shardState->copyOutState = (CopyOutState) palloc0(sizeof(*copyOutState)); CloneCopyOutStateForLocalCopy(copyOutState, shardState->copyOutState); - LogLocalCopyExecution(shardId); - continue; - } - - if (placement->groupId == GetLocalGroupId()) - { - /* - * if we are copying into an intermediate file we won't use local copy. - * Files are visible to all transactions so we can still use local execution, therefore - * we shouldn't restrict only using connection in this case. - */ - if (!isCopyToIntermediateFile) + if (colocatedIntermediateResult) { - SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED); + LogLocalCopyToFileExecution(shardId); } + else + { + LogLocalCopyToRelationExecution(shardId); + } + + continue; } MultiConnection *connection = @@ -3591,11 +3655,11 @@ CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to) /* - * LogLocalCopyExecution logs that the copy will be done locally for - * the given shard. + * LogLocalCopyToRelationExecution logs that the copy will be done + * locally for the given shard. */ static void -LogLocalCopyExecution(uint64 shardId) +LogLocalCopyToRelationExecution(uint64 shardId) { if (!(LogRemoteCommands || LogLocalCommands)) { @@ -3605,6 +3669,22 @@ LogLocalCopyExecution(uint64 shardId) } +/* + * LogLocalCopyToFileExecution logs that the copy will be done locally for + * a file colocated to the given shard. + */ +static void +LogLocalCopyToFileExecution(uint64 shardId) +{ + if (!(LogRemoteCommands || LogLocalCommands)) + { + return; + } + ereport(NOTICE, (errmsg("executing the copy locally for colocated file with " + "shard %lu", shardId))); +} + + /* * CopyGetPlacementConnection assigns a connection to the given placement. If * a connection has already been assigned the placement in the current transaction diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 12695f86b..5d9c8ee27 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -86,7 +86,6 @@ static void RemoteFileDestReceiverStartup(DestReceiver *dest, int operation, TupleDesc inputTupleDescriptor); static void PrepareIntermediateResultBroadcast(RemoteFileDestReceiver *resultDest); static StringInfo ConstructCopyResultStatement(const char *resultId); -static void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat); static bool RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest); static void BroadcastCopyData(StringInfo dataBuffer, List *connectionList); static void SendCopyDataOverConnection(StringInfo dataBuffer, @@ -438,7 +437,7 @@ RemoteFileDestReceiverReceive(TupleTableSlot *slot, DestReceiver *dest) /* * WriteToLocalResultsFile writes the bytes in a StringInfo to a local file. */ -static void +void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat) { int bytesWritten = FileWriteCompat(fileCompat, copyData->data, diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index af2996bc0..eec8356db 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -37,6 +37,7 @@ #include "distributed/distributed_deadlock_detection.h" #include "distributed/insert_select_executor.h" #include "distributed/intermediate_result_pruning.h" +#include "distributed/local_multi_copy.h" #include "distributed/local_executor.h" #include "distributed/local_distributed_join_planner.h" #include "distributed/locally_reserved_shared_connections.h" @@ -684,6 +685,16 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NoticeIfSubqueryPushdownEnabled, NULL, NULL); + DefineCustomIntVariable( + "citus.local_copy_flush_threshold", + gettext_noop("Sets the threshold for local copy to be flushed."), + NULL, + &LocalCopyFlushThresholdByte, + 512 * 1024, 1, INT_MAX, + PGC_USERSET, + GUC_UNIT_BYTE | GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.local_shared_pool_size", gettext_noop( diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index c1aed58ee..996cb8c97 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -131,8 +131,13 @@ typedef struct CitusCopyDestReceiver /* if true, should copy to local placements in the current session */ bool shouldUseLocalCopy; - /* copy into intermediate result */ - char *intermediateResultIdPrefix; + /* + * Copy into colocated intermediate result. When this is set, the + * COPY assumes there are hypothetical colocated shards to the + * relation that are files. And, COPY writes the data to the + * files as if they are shards. + */ + char *colocatedIntermediateResultIdPrefix; } CitusCopyDestReceiver; diff --git a/src/include/distributed/intermediate_results.h b/src/include/distributed/intermediate_results.h index d7ebf15bf..6e5a7f640 100644 --- a/src/include/distributed/intermediate_results.h +++ b/src/include/distributed/intermediate_results.h @@ -53,6 +53,7 @@ extern DestReceiver * CreateRemoteFileDestReceiver(const char *resultId, EState *executorState, List *initialNodeList, bool writeLocalFile); +extern void WriteToLocalFile(StringInfo copyData, FileCompat *fileCompat); extern uint64 RemoteFileDestReceiverBytesSent(DestReceiver *destReceiver); extern void SendQueryResultViaCopy(const char *resultId); extern void ReceiveQueryResultViaCopy(const char *resultId); diff --git a/src/include/distributed/local_multi_copy.h b/src/include/distributed/local_multi_copy.h index a4e46f015..76fb674bb 100644 --- a/src/include/distributed/local_multi_copy.h +++ b/src/include/distributed/local_multi_copy.h @@ -3,17 +3,24 @@ #define LOCAL_MULTI_COPY /* - * LOCAL_COPY_FLUSH_THRESHOLD is the threshold for local copy to be flushed. + * LocalCopyFlushThresholdByte is the threshold for local copy to be flushed. * There will be one buffer for each local placement, when the buffer size * exceeds this threshold, it will be flushed. + * + * Managed via GUC, the default is 512 kB. */ -#define LOCAL_COPY_FLUSH_THRESHOLD (1 * 512 * 1024) +extern int LocalCopyFlushThresholdByte; extern void WriteTupleToLocalShard(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, int64 shardId, CopyOutState localCopyOutState); +extern void WriteTupleToLocalFile(TupleTableSlot *slot, CitusCopyDestReceiver *copyDest, + int64 shardId, CopyOutState localFileCopyOutState, + FileCompat *fileCompat); extern void FinishLocalCopyToShard(CitusCopyDestReceiver *copyDest, int64 shardId, CopyOutState localCopyOutState); +extern void FinishLocalCopyToFile(CopyOutState localFileCopyOutState, + FileCompat *fileCompat); #endif /* LOCAL_MULTI_COPY */ diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 7f8e03834..46b71ffc1 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -401,6 +401,7 @@ BEGIN; WITH a AS (SELECT count(*) FROM test), b AS (INSERT INTO local VALUES (3,2) RETURNING *), c AS (INSERT INTO ref SELECT *,* FROM generate_series(1,10) RETURNING *), d AS (SELECT count(*) FROM ref JOIN local ON (a = x)) SELECT * FROM a, b, c, d ORDER BY x,y,a,b; NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503000 test WHERE true NOTICE: executing the command locally: SELECT count(*) AS count FROM coordinator_shouldhaveshards.test_1503003 test WHERE true +NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_1503020 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_1503020'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) RETURNING citus_table_alias.a, citus_table_alias.b NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinator_shouldhaveshards.ref_1503020 ref JOIN coordinator_shouldhaveshards.local ON ((ref.a OPERATOR(pg_catalog.=) local.x))) NOTICE: executing the command locally: SELECT a.count, b.x, b.y, c.a, c.b, d.count FROM (SELECT intermediate_result.count FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) a, (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) b, (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_3'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) c, (SELECT intermediate_result.count FROM read_intermediate_result('XXX_4'::text, 'binary'::citus_copy_format) intermediate_result(count bigint)) d ORDER BY b.x, b.y, c.a, c.b @@ -531,7 +532,7 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1 ROLLBACK; RESET citus.enable_cte_inlining; -CREATE table ref_table(x int, y int); +CREATE table ref_table(x int PRIMARY KEY, y int); -- this will be replicated to the coordinator because of add_coordinator test SELECT create_reference_table('ref_table'); create_reference_table @@ -555,6 +556,22 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (coordinat (1 row) ROLLBACK; +-- writing to local file and remote intermediate files +-- at the same time +INSERT INTO ref_table SELECT *, * FROM generate_series(1, 100); +NOTICE: executing the copy locally for shard xxxxx +WITH cte_1 AS ( +INSERT INTO ref_table SELECT * FROM ref_table LIMIT 10000 ON CONFLICT (x) DO UPDATE SET y = EXCLUDED.y + 1 RETURNING *) +SELECT count(*) FROM cte_1; +NOTICE: executing the command locally: SELECT x, y FROM coordinator_shouldhaveshards.ref_table_1503039 ref_table LIMIT 10000 +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.ref_table_1503039 AS citus_table_alias (x, y) SELECT x, y FROM read_intermediate_result('insert_select_XXX_1503039'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer) ON CONFLICT(x) DO UPDATE SET y = (excluded.y OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.x, citus_table_alias.y +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.x, intermediate_result.y FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(x integer, y integer)) cte_1 + count +--------------------------------------------------------------------- + 100 +(1 row) + -- issue #4237: preventing empty placement creation on coordinator CREATE TABLE test_append_table(a int); SELECT create_distributed_table('test_append_table', 'a', 'append'); diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 56ddff6bd..805d69569 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -283,6 +283,7 @@ RETURNING *; -- can be executed locally INSERT INTO distributed_table SELECT * FROM distributed_table WHERE key = 1 OFFSET 0 ON CONFLICT DO NOTHING; NOTICE: executing the command locally: SELECT key, value, age FROM local_shard_execution.distributed_table_1470001 distributed_table WHERE (key OPERATOR(pg_catalog.=) 1) OFFSET 0 +NOTICE: executing the copy locally for colocated file with shard xxxxx NOTICE: executing the command locally: INSERT INTO local_shard_execution.distributed_table_1470001 AS citus_table_alias (key, value, age) SELECT key, value, age FROM read_intermediate_result('insert_select_XXX_1470001'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text, age bigint) ON CONFLICT DO NOTHING INSERT INTO distributed_table SELECT 1, '1',15 FROM distributed_table WHERE key = 2 LIMIT 1 ON CONFLICT DO NOTHING; -- sanity check: multi-shard INSERT..SELECT pushdown goes through distributed execution diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index bdaafa689..68dcbcf97 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -167,6 +167,14 @@ SELECT create_distributed_table('public.another_schema_table', 'a'); (1 row) +CREATE TABLE non_binary_copy_test (key int PRIMARY KEY, value new_type); +SELECT create_distributed_table('non_binary_copy_test', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO non_binary_copy_test SELECT i, (i, 'citus9.5')::new_type FROM generate_series(0,1000)i; -- Confirm the basics work INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5); SELECT * FROM test WHERE x = 1; @@ -236,21 +244,21 @@ INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1), (2, 2) RETURNING *; SET citus.log_remote_commands to true; -- observe that there is a conflict and the following query does nothing INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING *; -NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630515 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING part_key, other_col, third_col +NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630519 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT DO NOTHING RETURNING part_key, other_col, third_col part_key | other_col | third_col --------------------------------------------------------------------- (0 rows) -- same as the above with different syntax INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT (part_key) DO NOTHING RETURNING *; -NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630515 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT(part_key) DO NOTHING RETURNING part_key, other_col, third_col +NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630519 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT(part_key) DO NOTHING RETURNING part_key, other_col, third_col part_key | other_col | third_col --------------------------------------------------------------------- (0 rows) -- again the same query with another syntax INSERT INTO upsert_test (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key DO NOTHING RETURNING *; -NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630515 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key_90630515 DO NOTHING RETURNING part_key, other_col, third_col +NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_90630519 AS citus_table_alias (part_key, other_col) VALUES (1, 1) ON CONFLICT ON CONSTRAINT upsert_test_part_key_key_90630519 DO NOTHING RETURNING part_key, other_col, third_col part_key | other_col | third_col --------------------------------------------------------------------- (0 rows) @@ -258,7 +266,7 @@ NOTICE: executing the command locally: INSERT INTO single_node.upsert_test_9063 BEGIN; -- force local execution SELECT count(*) FROM upsert_test WHERE part_key = 1; -NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.upsert_test_90630515 upsert_test WHERE (part_key OPERATOR(pg_catalog.=) 1) +NOTICE: executing the command locally: SELECT count(*) AS count FROM single_node.upsert_test_90630519 upsert_test WHERE (part_key OPERATOR(pg_catalog.=) 1) count --------------------------------------------------------------------- 1 @@ -344,10 +352,10 @@ SET search_path TO single_node; DROP SCHEMA "Quoed.Schema" CASCADE; NOTICE: drop cascades to 5 other objects DETAIL: drop cascades to table "Quoed.Schema".simple_table_name -drop cascades to table "Quoed.Schema".simple_table_name_90630520 -drop cascades to table "Quoed.Schema".simple_table_name_90630521 -drop cascades to table "Quoed.Schema".simple_table_name_90630522 -drop cascades to table "Quoed.Schema".simple_table_name_90630523 +drop cascades to table "Quoed.Schema".simple_table_name_90630524 +drop cascades to table "Quoed.Schema".simple_table_name_90630525 +drop cascades to table "Quoed.Schema".simple_table_name_90630526 +drop cascades to table "Quoed.Schema".simple_table_name_90630527 -- test partitioned index creation with long name CREATE TABLE test_index_creation1 ( @@ -1591,6 +1599,12 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT in 7 (1 row) +-- this is to get ready for the next tests +TRUNCATE another_schema_table; +NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE +NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE +NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE +NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE -- copy can use local execution even if there is no connection available COPY another_schema_table(a) FROM PROGRAM 'seq 32'; NOTICE: executing the copy locally for shard xxxxx @@ -1601,6 +1615,176 @@ NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY another_schema_table, line 3: "3" NOTICE: executing the copy locally for shard xxxxx CONTEXT: COPY another_schema_table, line 6: "6" +-- INSERT .. SELECT with co-located intermediate results +SET citus.log_remote_commands to false; +CREATE UNIQUE INDEX another_schema_table_pk ON another_schema_table(a); +SET citus.log_local_commands to true; +INSERT INTO another_schema_table SELECT * FROM another_schema_table LIMIT 10000 ON CONFLICT(a) DO NOTHING; +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true LIMIT '10000'::bigint +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630511'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630512'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630513'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630514'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING +INSERT INTO another_schema_table SELECT * FROM another_schema_table ORDER BY a LIMIT 10 ON CONFLICT(a) DO UPDATE SET b = EXCLUDED.b + 1 RETURNING *; +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true ORDER BY a LIMIT '10'::bigint +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630511'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630512'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630513'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630514'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO UPDATE SET b = (excluded.b OPERATOR(pg_catalog.+) 1) RETURNING citus_table_alias.a, citus_table_alias.b + a | b +--------------------------------------------------------------------- + 1 | + 2 | + 3 | + 4 | + 5 | + 6 | + 7 | + 8 | + 9 | + 10 | +(10 rows) + +-- INSERT .. SELECT with co-located intermediate result for non-binary input +WITH cte_1 AS +(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING value) +SELECT count(*) FROM cte_1; +NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630515 non_binary_copy_test WHERE true LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630516 non_binary_copy_test WHERE true LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630517 non_binary_copy_test WHERE true LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT key, value FROM single_node.non_binary_copy_test_90630518 non_binary_copy_test WHERE true LIMIT '10000'::bigint +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630515 AS citus_table_alias (key, value) SELECT key, value FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value +NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630516 AS citus_table_alias (key, value) SELECT key, value FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value +NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630517 AS citus_table_alias (key, value) SELECT key, value FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value +NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630518 AS citus_table_alias (key, value) SELECT key, value FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.value +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'text'::citus_copy_format) intermediate_result(value single_node.new_type)) cte_1 + count +--------------------------------------------------------------------- + 1001 +(1 row) + +-- test with NULL columns +ALTER TABLE non_binary_copy_test ADD COLUMN z INT; +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630515, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z INT;') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630516, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z INT;') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630517, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z INT;') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (90630518, 'single_node', 'ALTER TABLE non_binary_copy_test ADD COLUMN z INT;') +WITH cte_1 AS +(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING z) +SELECT bool_and(z is null) FROM cte_1; +NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630515 non_binary_copy_test WHERE true LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630516 non_binary_copy_test WHERE true LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630517 non_binary_copy_test WHERE true LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630518 non_binary_copy_test WHERE true LIMIT '10000'::bigint +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630515 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z +NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630516 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z +NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630517 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z +NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630518 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z +NOTICE: executing the command locally: SELECT bool_and((z IS NULL)) AS bool_and FROM (SELECT intermediate_result.z FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(z integer)) cte_1 + bool_and +--------------------------------------------------------------------- + t +(1 row) + +-- test with type coersion (int -> text) and also NULL values with coersion +WITH cte_1 AS +(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING key, z) +SELECT count(DISTINCT key::text), count(DISTINCT z::text) FROM cte_1; +NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630515 non_binary_copy_test WHERE true LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630516 non_binary_copy_test WHERE true LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630517 non_binary_copy_test WHERE true LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630518 non_binary_copy_test WHERE true LIMIT '10000'::bigint +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630515 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z +NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630516 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z +NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630517 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z +NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630518 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.key, citus_table_alias.z +NOTICE: executing the command locally: SELECT count(DISTINCT (key)::text) AS count, count(DISTINCT (z)::text) AS count FROM (SELECT intermediate_result.key, intermediate_result.z FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, z integer)) cte_1 + count | count +--------------------------------------------------------------------- + 1001 | 0 +(1 row) + +-- lets flush the copy often to make sure everyhing is fine +SET citus.local_copy_flush_threshold TO 1; +TRUNCATE another_schema_table; +NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE +NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE +NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE +NOTICE: executing the command locally: TRUNCATE TABLE single_node.another_schema_table_xxxxx CASCADE +INSERT INTO another_schema_table(a) SELECT i from generate_Series(0,10000)i; +NOTICE: executing the copy locally for shard xxxxx +NOTICE: executing the copy locally for shard xxxxx +NOTICE: executing the copy locally for shard xxxxx +NOTICE: executing the copy locally for shard xxxxx +WITH cte_1 AS +(INSERT INTO another_schema_table SELECT * FROM another_schema_table ORDER BY a LIMIT 10000 ON CONFLICT(a) DO NOTHING RETURNING *) +SELECT count(*) FROM cte_1; +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630511 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630512 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630513 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT a, b FROM single_node.another_schema_table_90630514 another_schema_table WHERE true ORDER BY a LIMIT '10000'::bigint +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630511 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630511'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630512 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630512'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630513 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630513'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b +NOTICE: executing the command locally: INSERT INTO single_node.another_schema_table_90630514 AS citus_table_alias (a, b) SELECT a, b FROM read_intermediate_result('insert_select_XXX_90630514'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer) ON CONFLICT(a) DO NOTHING RETURNING citus_table_alias.a, citus_table_alias.b +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.a, intermediate_result.b FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(a integer, b integer)) cte_1 + count +--------------------------------------------------------------------- + 0 +(1 row) + +WITH cte_1 AS +(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING z) +SELECT bool_and(z is null) FROM cte_1; +NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630515 non_binary_copy_test WHERE true LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630516 non_binary_copy_test WHERE true LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630517 non_binary_copy_test WHERE true LIMIT '10000'::bigint +NOTICE: executing the command locally: SELECT key, value, z FROM single_node.non_binary_copy_test_90630518 non_binary_copy_test WHERE true LIMIT '10000'::bigint +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the copy locally for colocated file with shard xxxxx +NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630515 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630515'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z +NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630516 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630516'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z +NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630517 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630517'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z +NOTICE: executing the command locally: INSERT INTO single_node.non_binary_copy_test_90630518 AS citus_table_alias (key, value, z) SELECT key, value, z FROM read_intermediate_result('insert_select_XXX_90630518'::text, 'text'::citus_copy_format) intermediate_result(key integer, value single_node.new_type, z integer) ON CONFLICT(key) DO UPDATE SET value = ROW(0, 'citus0'::text)::single_node.new_type RETURNING citus_table_alias.z +NOTICE: executing the command locally: SELECT bool_and((z IS NULL)) AS bool_and FROM (SELECT intermediate_result.z FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(z integer)) cte_1 + bool_and +--------------------------------------------------------------------- + t +(1 row) + +RESET citus.local_copy_flush_threshold; -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false; diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 7501be265..8dd0b197c 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -221,7 +221,7 @@ SELECT create_distributed_table('dist_table1', 'a'); ROLLBACK; RESET citus.enable_cte_inlining; -CREATE table ref_table(x int, y int); +CREATE table ref_table(x int PRIMARY KEY, y int); -- this will be replicated to the coordinator because of add_coordinator test SELECT create_reference_table('ref_table'); @@ -233,6 +233,14 @@ INSERT INTO ref_table SELECT *, * FROM generate_series(1, 100); SELECT COUNT(*) FROM test JOIN ref_table USING(x); ROLLBACK; +-- writing to local file and remote intermediate files +-- at the same time +INSERT INTO ref_table SELECT *, * FROM generate_series(1, 100); + +WITH cte_1 AS ( +INSERT INTO ref_table SELECT * FROM ref_table LIMIT 10000 ON CONFLICT (x) DO UPDATE SET y = EXCLUDED.y + 1 RETURNING *) +SELECT count(*) FROM cte_1; + -- issue #4237: preventing empty placement creation on coordinator CREATE TABLE test_append_table(a int); SELECT create_distributed_table('test_append_table', 'a', 'append'); diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 4d7343668..fb3208c26 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -79,6 +79,10 @@ CREATE TABLE local(c int, d int); CREATE TABLE public.another_schema_table(a int, b int); SELECT create_distributed_table('public.another_schema_table', 'a'); +CREATE TABLE non_binary_copy_test (key int PRIMARY KEY, value new_type); +SELECT create_distributed_table('non_binary_copy_test', 'key'); +INSERT INTO non_binary_copy_test SELECT i, (i, 'citus9.5')::new_type FROM generate_series(0,1000)i; + -- Confirm the basics work INSERT INTO test VALUES (1, 2), (3, 4), (5, 6), (2, 7), (4, 5); SELECT * FROM test WHERE x = 1; @@ -783,7 +787,6 @@ ROLLBACK; \c - - - :master_port SET search_path TO single_node; - -- simulate that even if there is no connection slots -- to connect, Citus can switch to local execution SET citus.force_max_query_parallelization TO false; @@ -817,9 +820,48 @@ ROLLBACK; WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000) SELECT count(*) FROM cte_1; +-- this is to get ready for the next tests +TRUNCATE another_schema_table; + -- copy can use local execution even if there is no connection available COPY another_schema_table(a) FROM PROGRAM 'seq 32'; +-- INSERT .. SELECT with co-located intermediate results +SET citus.log_remote_commands to false; +CREATE UNIQUE INDEX another_schema_table_pk ON another_schema_table(a); + +SET citus.log_local_commands to true; +INSERT INTO another_schema_table SELECT * FROM another_schema_table LIMIT 10000 ON CONFLICT(a) DO NOTHING; +INSERT INTO another_schema_table SELECT * FROM another_schema_table ORDER BY a LIMIT 10 ON CONFLICT(a) DO UPDATE SET b = EXCLUDED.b + 1 RETURNING *; + +-- INSERT .. SELECT with co-located intermediate result for non-binary input +WITH cte_1 AS +(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING value) +SELECT count(*) FROM cte_1; + +-- test with NULL columns +ALTER TABLE non_binary_copy_test ADD COLUMN z INT; +WITH cte_1 AS +(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING z) +SELECT bool_and(z is null) FROM cte_1; + +-- test with type coersion (int -> text) and also NULL values with coersion +WITH cte_1 AS +(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING key, z) +SELECT count(DISTINCT key::text), count(DISTINCT z::text) FROM cte_1; + +-- lets flush the copy often to make sure everyhing is fine +SET citus.local_copy_flush_threshold TO 1; +TRUNCATE another_schema_table; +INSERT INTO another_schema_table(a) SELECT i from generate_Series(0,10000)i; +WITH cte_1 AS +(INSERT INTO another_schema_table SELECT * FROM another_schema_table ORDER BY a LIMIT 10000 ON CONFLICT(a) DO NOTHING RETURNING *) +SELECT count(*) FROM cte_1; +WITH cte_1 AS +(INSERT INTO non_binary_copy_test SELECT * FROM non_binary_copy_test LIMIT 10000 ON CONFLICT (key) DO UPDATE SET value = (0, 'citus0')::new_type RETURNING z) +SELECT bool_and(z is null) FROM cte_1; + +RESET citus.local_copy_flush_threshold; -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false;