Support use of binary protocol in between nodes (#3877)

This can greatly reduce the amount of data sent over the network in some
cases, improving performance for queries where inter-node bandwidth is the
bottleneck. There are still some issues with enabling this by default, so
for now it is opt-in.
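
For anyone trying this out, a minimal way to opt in per session (this uses the
citus.enable_binary_protocol GUC added by this commit; the table name is
hypothetical):

SET citus.enable_binary_protocol = true;
-- queries sent to workers now request binary results where possible
SELECT count(*) FROM my_distributed_table;  -- hypothetical distributed table
RESET citus.enable_binary_protocol;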
Jelte Fennema 2020-06-12 15:02:51 +02:00 committed by GitHub
parent da8f2b0134
commit 0e12d045b1
18 changed files with 1025 additions and 133 deletions


@ -217,6 +217,7 @@ static void OpenCopyConnectionsForNewShards(CopyStmt *copyStatement,
bool useBinaryCopyFormat);
static List * RemoveOptionFromList(List *optionList, char *optionName);
static bool BinaryOutputFunctionDefined(Oid typeId);
static bool BinaryInputFunctionDefined(Oid typeId);
static void SendCopyBinaryHeaders(CopyOutState copyOutState, int64 shardId,
List *connectionList);
static void SendCopyBinaryFooters(CopyOutState copyOutState, int64 shardId,
@ -952,6 +953,11 @@ CanUseBinaryCopyFormatForType(Oid typeId)
return false;
}
if (!BinaryInputFunctionDefined(typeId))
{
return false;
}
if (typeId >= FirstNormalObjectId)
{
char typeCategory = '\0';
@ -986,12 +992,28 @@ BinaryOutputFunctionDefined(Oid typeId)
get_type_io_data(typeId, IOFunc_send, &typeLength, &typeByVal,
&typeAlign, &typeDelim, &typeIoParam, &typeFunctionId);
if (OidIsValid(typeFunctionId))
{
return true;
}
return false;
return OidIsValid(typeFunctionId);
}
/*
* BinaryInputFunctionDefined checks whether a binary input function is defined
* for the given type.
*/
static bool
BinaryInputFunctionDefined(Oid typeId)
{
Oid typeFunctionId = InvalidOid;
Oid typeIoParam = InvalidOid;
int16 typeLength = 0;
bool typeByVal = false;
char typeAlign = 0;
char typeDelim = 0;
get_type_io_data(typeId, IOFunc_receive, &typeLength, &typeByVal,
&typeAlign, &typeDelim, &typeIoParam, &typeFunctionId);
return OidIsValid(typeFunctionId);
}
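
These two checks mirror whether pg_type has send/receive functions registered
for the type. As a catalog-level sketch (plain PostgreSQL, not part of this
commit), the types that force a fallback to the text format can be listed
directly:

-- built-in types without a binary input ("receive") function
SELECT typname
FROM pg_type
WHERE typreceive = 0
ORDER BY typname;
-- aclitem is among these, which is why the tests below call it "binaryless"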


@ -474,7 +474,7 @@ ExecuteOptionalRemoteCommand(MultiConnection *connection, const char *command,
int
SendRemoteCommandParams(MultiConnection *connection, const char *command,
int parameterCount, const Oid *parameterTypes,
const char *const *parameterValues)
const char *const *parameterValues, bool binaryResults)
{
PGconn *pgConn = connection->pgConn;
@ -492,7 +492,7 @@ SendRemoteCommandParams(MultiConnection *connection, const char *command,
Assert(PQisnonblocking(pgConn));
int rc = PQsendQueryParams(pgConn, command, parameterCount, parameterTypes,
parameterValues, NULL, NULL, 0);
parameterValues, NULL, NULL, binaryResults ? 1 : 0);
return rc;
}


@ -129,6 +129,7 @@
#include "access/transam.h"
#include "access/xact.h"
#include "access/htup_details.h"
#include "catalog/pg_type.h"
#include "commands/dbcommands.h"
#include "commands/schemacmds.h"
@ -137,6 +138,7 @@
#include "distributed/citus_custom_scan.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/connection_management.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/distributed_execution_locks.h"
#include "distributed/listutils.h"
@ -163,9 +165,11 @@
#include "portability/instr_time.h"
#include "storage/fd.h"
#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/int8.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "utils/timestamp.h"
#define SLOW_START_DISABLED 0
@ -277,12 +281,13 @@ typedef struct DistributedExecution
* The following fields are used while receiving results from remote nodes.
* We store this information here to avoid re-allocating it every time.
*
* columnArray field is reset/calculated per row, so might be useless for other
* contexts. The benefit of keeping it here is to avoid allocating the array
* over and over again.
* columnArray field is reset/calculated per row, so might be useless for
* other contexts. The benefit of keeping it here is to avoid allocating
* the array over and over again.
*/
uint32 allocatedColumnCount;
char **columnArray;
void **columnArray;
StringInfoData *stringInfoDataArray;
/*
* jobIdList contains all jobs in the job tree, this is used to
@ -437,6 +442,7 @@ struct TaskPlacementExecution;
/* GUC, determining whether Citus opens 1 connection per task */
bool ForceMaxQueryParallelization = false;
int MaxAdaptiveExecutorPoolSize = 16;
bool EnableBinaryProtocol = false;
/* GUC, number of ms to wait between opening connections to the same worker */
int ExecutorSlowStartInterval = 10;
@ -478,6 +484,10 @@ typedef struct ShardCommandExecution
/* cached AttInMetadata for task */
AttInMetadata **attributeInputMetadata;
/* indicates whether the attributeInputMetadata has binary or text
* encoding/decoding functions */
bool binaryResults;
/* order in which the command should be replicated on replicas */
PlacementExecutionOrder executionOrder;
@ -632,6 +642,10 @@ static int RebuildWaitEventSet(DistributedExecution *execution);
static void ProcessWaitEvents(DistributedExecution *execution, WaitEvent *events, int
eventCount, bool *cancellationReceived);
static long MillisecondsBetweenTimestamps(instr_time startTime, instr_time endTime);
static HeapTuple BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values);
static AttInMetadata * TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc);
static void SetAttributeInputMetadata(DistributedExecution *execution,
ShardCommandExecution *shardCommandExecution);
/*
* AdaptiveExecutorPreExecutorRun gets called right before postgres starts its executor
@ -1089,7 +1103,28 @@ CreateDistributedExecution(RowModifyLevel modLevel, List *taskList,
* allocate for. We start with 16, and reallocate when we need more.
*/
execution->allocatedColumnCount = 16;
execution->columnArray = palloc0(execution->allocatedColumnCount * sizeof(char *));
execution->columnArray = palloc0(execution->allocatedColumnCount * sizeof(void *));
if (EnableBinaryProtocol)
{
/*
* Initialize enough StringInfos for each column. These StringInfos
* (and thus the backing buffers) will be reused for each row.
* We will reference these StringInfos in the columnArray if the value
* is not NULL.
*
* NOTE: StringInfos are always grown in the memory context in which
* they were initially created. So appending in any memory context will
* result in buffers that are still valid after removing that memory
* context.
*/
execution->stringInfoDataArray = palloc0(
execution->allocatedColumnCount *
sizeof(StringInfoData));
for (int i = 0; i < execution->allocatedColumnCount; i++)
{
initStringInfo(&execution->stringInfoDataArray[i]);
}
}
if (ShouldExecuteTasksLocally(taskList))
{
@ -1735,23 +1770,7 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
sizeof(TaskPlacementExecution *));
shardCommandExecution->placementExecutionCount = placementExecutionCount;
TupleDestination *tupleDest = task->tupleDest ?
task->tupleDest :
execution->defaultTupleDest;
uint32 queryCount = task->queryCount;
shardCommandExecution->attributeInputMetadata = palloc0(queryCount *
sizeof(AttInMetadata *));
for (uint32 queryIndex = 0; queryIndex < queryCount; queryIndex++)
{
TupleDesc tupleDescriptor = tupleDest->tupleDescForQuery(tupleDest,
queryIndex);
shardCommandExecution->attributeInputMetadata[queryIndex] =
tupleDescriptor ?
TupleDescGetAttInMetadata(tupleDescriptor) :
NULL;
}
SetAttributeInputMetadata(execution, shardCommandExecution);
ShardPlacement *taskPlacement = NULL;
foreach_ptr(taskPlacement, task->taskPlacementList)
{
@ -1895,6 +1914,53 @@ AssignTasksToConnectionsOrWorkerPool(DistributedExecution *execution)
}
/*
* SetAttributeInputMetadata sets attributeInputMetadata in
* shardCommandExecution for all the queries that are part of its task.
* This contains the deserialization functions for the tuples that will be
* received. It also sets binaryResults when applicable.
*/
static void
SetAttributeInputMetadata(DistributedExecution *execution,
ShardCommandExecution *shardCommandExecution)
{
TupleDestination *tupleDest = shardCommandExecution->task->tupleDest ?
shardCommandExecution->task->tupleDest :
execution->defaultTupleDest;
uint32 queryCount = shardCommandExecution->task->queryCount;
shardCommandExecution->attributeInputMetadata = palloc0(queryCount *
sizeof(AttInMetadata *));
for (uint32 queryIndex = 0; queryIndex < queryCount; queryIndex++)
{
AttInMetadata *attInMetadata = NULL;
TupleDesc tupleDescriptor = tupleDest->tupleDescForQuery(tupleDest,
queryIndex);
if (tupleDescriptor == NULL)
{
attInMetadata = NULL;
}
/*
* We only allow binary results when queryCount is 1, because we
* cannot use binary results with SendRemoteCommand, which must be
* used when queryCount is larger than 1.
*/
else if (EnableBinaryProtocol && queryCount == 1 &&
CanUseBinaryCopyFormat(tupleDescriptor))
{
attInMetadata = TupleDescGetAttBinaryInMetadata(tupleDescriptor);
shardCommandExecution->binaryResults = true;
}
else
{
attInMetadata = TupleDescGetAttInMetadata(tupleDescriptor);
}
shardCommandExecution->attributeInputMetadata[queryIndex] = attInMetadata;
}
}
/*
* UseConnectionPerPlacement returns whether we should use a separate connection
* per placement even if another connection is idle. We mostly use this in testing
@ -3382,6 +3448,7 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
MultiConnection *connection = session->connection;
ShardCommandExecution *shardCommandExecution =
placementExecution->shardCommandExecution;
bool binaryResults = shardCommandExecution->binaryResults;
Task *task = shardCommandExecution->task;
ShardPlacement *taskPlacement = placementExecution->shardPlacement;
List *placementAccessList = PlacementAccessListForTask(task, taskPlacement);
@ -3428,11 +3495,32 @@ StartPlacementExecutionOnSession(TaskPlacementExecution *placementExecution,
ExtractParametersForRemoteExecution(paramListInfo, &parameterTypes,
&parameterValues);
querySent = SendRemoteCommandParams(connection, queryString, parameterCount,
parameterTypes, parameterValues);
parameterTypes, parameterValues,
binaryResults);
}
else
{
querySent = SendRemoteCommand(connection, queryString);
/*
* We only need to use SendRemoteCommandParams when we want
* binaryResults. One downside of SendRemoteCommandParams is that it
* only supports a single query in the query string. In some cases we
* have more than one query; in those cases we have already made sure
* that binaryResults is false.
*
* XXX: SendRemoteCommandParams also seems to do something strange/
* incorrect with SELECT statements: in isolation_select_vs_all.spec,
* an s1-router-select in one session blocked an
* s2-ddl-create-index-concurrently in another.
*/
if (!binaryResults)
{
querySent = SendRemoteCommand(connection, queryString);
}
else
{
querySent = SendRemoteCommandParams(connection, queryString, 0, NULL, NULL,
binaryResults);
}
}
if (querySent == 0)
@ -3478,11 +3566,11 @@ ReceiveResults(WorkerSession *session, bool storeRows)
* into a tuple. The context is reset on every row, thus we create it at the
* start of the loop and reset on every iteration.
*/
MemoryContext ioContext = AllocSetContextCreate(CurrentMemoryContext,
"IoContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
MemoryContext rowContext = AllocSetContextCreate(CurrentMemoryContext,
"RowContext",
ALLOCSET_DEFAULT_MINSIZE,
ALLOCSET_DEFAULT_INITSIZE,
ALLOCSET_DEFAULT_MAXSIZE);
while (!PQisBusy(connection->pgConn))
{
@ -3567,16 +3655,48 @@ ReceiveResults(WorkerSession *session, bool storeRows)
if (columnCount > execution->allocatedColumnCount)
{
pfree(execution->columnArray);
int oldColumnCount = execution->allocatedColumnCount;
execution->allocatedColumnCount = columnCount;
execution->columnArray = palloc0(execution->allocatedColumnCount *
sizeof(char *));
sizeof(void *));
if (EnableBinaryProtocol)
{
/*
* Using repalloc here, to not throw away any previously
* created StringInfos.
*/
execution->stringInfoDataArray = repalloc(
execution->stringInfoDataArray,
execution->allocatedColumnCount *
sizeof(StringInfoData));
for (int i = oldColumnCount; i < columnCount; i++)
{
initStringInfo(&execution->stringInfoDataArray[i]);
}
}
}
char **columnArray = execution->columnArray;
void **columnArray = execution->columnArray;
StringInfoData *stringInfoDataArray = execution->stringInfoDataArray;
bool binaryResults = shardCommandExecution->binaryResults;
/*
* stringInfoDataArray is NULL when EnableBinaryProtocol is false, so
* we make sure binaryResults is also false in that case; otherwise we
* would have nowhere to store the binary values.
*/
Assert(EnableBinaryProtocol || !binaryResults);
for (uint32 rowIndex = 0; rowIndex < rowsProcessed; rowIndex++)
{
memset(columnArray, 0, columnCount * sizeof(char *));
/*
* Switch to a temporary memory context that we reset after each
* tuple. This protects us from any memory leaks that might be
* present in anything we do to parse a tuple.
*/
MemoryContext oldContext = MemoryContextSwitchTo(rowContext);
memset(columnArray, 0, columnCount * sizeof(void *));
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
@ -3586,34 +3706,55 @@ ReceiveResults(WorkerSession *session, bool storeRows)
}
else
{
columnArray[columnIndex] = PQgetvalue(result, rowIndex, columnIndex);
int valueLength = PQgetlength(result, rowIndex, columnIndex);
char *value = PQgetvalue(result, rowIndex, columnIndex);
if (binaryResults)
{
if (PQfformat(result, columnIndex) == 0)
{
ereport(ERROR, (errmsg("unexpected text result")));
}
resetStringInfo(&stringInfoDataArray[columnIndex]);
appendBinaryStringInfo(&stringInfoDataArray[columnIndex],
value, valueLength);
columnArray[columnIndex] = &stringInfoDataArray[columnIndex];
}
else
{
if (PQfformat(result, columnIndex) == 1)
{
ereport(ERROR, (errmsg("unexpected binary result")));
}
columnArray[columnIndex] = value;
}
if (SubPlanLevel > 0 && executionStats != NULL)
{
executionStats->totalIntermediateResultSize += PQgetlength(result,
rowIndex,
columnIndex);
executionStats->totalIntermediateResultSize += valueLength;
}
}
}
/*
* Switch to a temporary memory context that we reset after each tuple. This
* protects us from any memory leaks that might be present in I/O functions
* called by BuildTupleFromCStrings.
*/
MemoryContext oldContextPerRow = MemoryContextSwitchTo(ioContext);
AttInMetadata *attInMetadata =
shardCommandExecution->attributeInputMetadata[queryIndex];
HeapTuple heapTuple = BuildTupleFromCStrings(attInMetadata, columnArray);
HeapTuple heapTuple;
if (binaryResults)
{
heapTuple = BuildTupleFromBytes(attInMetadata,
(fmStringInfo *) columnArray);
}
else
{
heapTuple = BuildTupleFromCStrings(attInMetadata,
(char **) columnArray);
}
MemoryContextSwitchTo(oldContextPerRow);
MemoryContextSwitchTo(oldContext);
tupleDest->putTuple(tupleDest, task,
placementExecution->placementExecutionIndex, queryIndex,
heapTuple);
MemoryContextReset(ioContext);
MemoryContextReset(rowContext);
execution->rowsProcessed++;
}
@ -3627,12 +3768,126 @@ ReceiveResults(WorkerSession *session, bool storeRows)
}
/* the context is local to the function, so not needed anymore */
MemoryContextDelete(ioContext);
MemoryContextDelete(rowContext);
return fetchDone;
}
/*
* TupleDescGetAttBinaryInMetadata - Build an AttInMetadata structure based on
* the supplied TupleDesc. AttInMetadata can be used in conjunction with
* fmStringInfos containing binary encoded types to produce a properly formed
* tuple.
*
* NOTE: This function is a copy of the PG function TupleDescGetAttInMetadata,
* except that it uses getTypeBinaryInputInfo instead of getTypeInputInfo.
*/
static AttInMetadata *
TupleDescGetAttBinaryInMetadata(TupleDesc tupdesc)
{
int natts = tupdesc->natts;
int i;
Oid atttypeid;
Oid attinfuncid;
AttInMetadata *attinmeta = (AttInMetadata *) palloc(sizeof(AttInMetadata));
/* "Bless" the tupledesc so that we can make rowtype datums with it */
attinmeta->tupdesc = BlessTupleDesc(tupdesc);
/*
* Gather info needed later to call the binary-input ("receive") function
* for each attribute
*/
FmgrInfo *attinfuncinfo = (FmgrInfo *) palloc0(natts * sizeof(FmgrInfo));
Oid *attioparams = (Oid *) palloc0(natts * sizeof(Oid));
int32 *atttypmods = (int32 *) palloc0(natts * sizeof(int32));
for (i = 0; i < natts; i++)
{
Form_pg_attribute att = TupleDescAttr(tupdesc, i);
/* Ignore dropped attributes */
if (!att->attisdropped)
{
atttypeid = att->atttypid;
getTypeBinaryInputInfo(atttypeid, &attinfuncid, &attioparams[i]);
fmgr_info(attinfuncid, &attinfuncinfo[i]);
atttypmods[i] = att->atttypmod;
}
}
attinmeta->attinfuncs = attinfuncinfo;
attinmeta->attioparams = attioparams;
attinmeta->atttypmods = atttypmods;
return attinmeta;
}
/*
* BuildTupleFromBytes - build a HeapTuple given user data in binary form.
* values is an array of StringInfos, one for each attribute of the return
* tuple. A NULL StringInfo pointer indicates we want to create a NULL field.
*
* NOTE: This function is a copy of the PG function BuildTupleFromCStrings,
* except that it uses ReceiveFunctionCall instead of InputFunctionCall.
*/
static HeapTuple
BuildTupleFromBytes(AttInMetadata *attinmeta, fmStringInfo *values)
{
TupleDesc tupdesc = attinmeta->tupdesc;
int natts = tupdesc->natts;
int i;
Datum *dvalues = (Datum *) palloc(natts * sizeof(Datum));
bool *nulls = (bool *) palloc(natts * sizeof(bool));
/*
* Call the "receive" function for each non-dropped attribute, even for
* nulls, to support domains.
*/
for (i = 0; i < natts; i++)
{
if (!TupleDescAttr(tupdesc, i)->attisdropped)
{
/* Non-dropped attributes */
dvalues[i] = ReceiveFunctionCall(&attinmeta->attinfuncs[i],
values[i],
attinmeta->attioparams[i],
attinmeta->atttypmods[i]);
if (values[i] != NULL)
{
nulls[i] = false;
}
else
{
nulls[i] = true;
}
}
else
{
/* Handle dropped attributes by setting to NULL */
dvalues[i] = (Datum) 0;
nulls[i] = true;
}
}
/*
* Form a tuple
*/
HeapTuple tuple = heap_form_tuple(tupdesc, dvalues, nulls);
/*
* Release locally palloc'd space. XXX would probably be good to pfree
* values of pass-by-reference datums, as well.
*/
pfree(dvalues);
pfree(nulls);
return tuple;
}
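
BuildTupleFromBytes hands each column's bytes to the type's receive function,
the counterpart of the send function the worker used to serialize the value.
The wire format being parsed here can be inspected from plain SQL via the send
functions (an illustration only, not part of this commit):

SELECT int4send(42);    -- \x0000002a: 4 bytes, big endian
SELECT int8send(42);    -- \x000000000000002a
SELECT textsend('ab');  -- \x6162: raw bytes; libpq carries the length separately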
/*
* WorkerPoolFailed marks a worker pool and all the placement executions scheduled
* on it as failed.


@ -217,7 +217,7 @@ WrapTasksForPartitioning(const char *resultIdPrefix, List *selectTaskList,
{
StringInfo wrappedQuery = makeStringInfo();
appendStringInfo(wrappedQuery,
"SELECT %u, partition_index"
"SELECT %u::int, partition_index"
", %s || '_' || partition_index::text "
", rows_written "
"FROM worker_partition_query_result"
@ -334,7 +334,7 @@ ExecutePartitionTaskList(List *taskList, CitusTableCacheEntry *targetRelation)
#endif
TupleDescInitEntry(resultDescriptor, (AttrNumber) 1, "node_id",
INT8OID, -1, 0);
INT4OID, -1, 0);
TupleDescInitEntry(resultDescriptor, (AttrNumber) 2, "partition_index",
INT4OID, -1, 0);
TupleDescInitEntry(resultDescriptor, (AttrNumber) 3, "result_id",


@ -166,7 +166,7 @@ CreateRemoteRestorePoints(char *restoreName, List *connectionList)
{
int querySent = SendRemoteCommandParams(connection, CREATE_RESTORE_POINT_COMMAND,
parameterCount, parameterTypes,
parameterValues);
parameterValues, false);
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);


@ -543,6 +543,17 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.enable_binary_protocol",
gettext_noop(
"Enables communication between nodes using binary protocol when possible"),
NULL,
&EnableBinaryProtocol,
false,
PGC_USERSET,
GUC_STANDARD,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.override_table_visibility",
gettext_noop("Enables replacing occurencens of pg_catalog.pg_table_visible() "


@ -504,7 +504,7 @@ SendCommandToWorkersParamsInternal(TargetWorkerSet targetWorkerSet, const char *
foreach_ptr(connection, connectionList)
{
int querySent = SendRemoteCommandParams(connection, command, parameterCount,
parameterTypes, parameterValues);
parameterTypes, parameterValues, false);
if (querySent == 0)
{
ReportConnectionError(connection, ERROR);


@ -6,6 +6,7 @@
/* GUC, determining whether Citus opens 1 connection per task */
extern bool ForceMaxQueryParallelization;
extern int MaxAdaptiveExecutorPoolSize;
extern bool EnableBinaryProtocol;
/* GUC, number of ms to wait between opening connections to the same worker */
extern int ExecutorSlowStartInterval;


@ -48,7 +48,8 @@ extern int ExecuteOptionalRemoteCommand(MultiConnection *connection,
extern int SendRemoteCommand(MultiConnection *connection, const char *command);
extern int SendRemoteCommandParams(MultiConnection *connection, const char *command,
int parameterCount, const Oid *parameterTypes,
const char *const *parameterValues);
const char *const *parameterValues,
bool binaryResults);
extern List * ReadFirstColumnAsText(PGresult *queryResult);
extern PGresult * GetRemoteCommandResult(MultiConnection *connection,
bool raiseInterrupts);


@ -123,3 +123,6 @@ s/Citus.*currently supports/Citus currently supports/g
# Warnings in multi_explain
s/prepared transaction with identifier .* does not exist/prepared transaction with identifier "citus_x_yyyyyy_zzz_w" does not exist/g
s/failed to roll back prepared transaction '.*'/failed to roll back prepared transaction 'citus_x_yyyyyy_zzz_w'/g
# Errors with binary decoding where OIDs should be normalized
s/wrong data type: [0-9]+, expected [0-9]+/wrong data type: XXXX, expected XXXX/g


@ -0,0 +1,169 @@
SET citus.shard_count = 4;
SET citus.next_shard_id TO 4754000;
CREATE SCHEMA binary_protocol;
SET search_path TO binary_protocol;
SET citus.enable_binary_protocol = TRUE;
CREATE TABLE t(id int);
SELECT create_distributed_table('t', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO t (SELECT i FROM generate_series(1, 10) i);
SELECT * FROM t ORDER BY id;
id
---------------------------------------------------------------------
1
2
3
4
5
6
7
8
9
10
(10 rows)
-- Select more than 16 columns to trigger growing of columns
SELECT id, id, id, id, id,
id, id, id, id, id,
id, id, id, id, id,
id, id, id, id, id,
id, id, id, id, id,
id, id, id, id, id
FROM t ORDER BY id;
id | id | id | id | id | id | id | id | id | id | id | id | id | id | id | id | id | id | id | id | id | id | id | id | id | id | id | id | id | id
---------------------------------------------------------------------
1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1 | 1
2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2 | 2
3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3 | 3
4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4 | 4
5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5 | 5
6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6 | 6
7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7 | 7
8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8 | 8
9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9 | 9
10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10 | 10
(10 rows)
INSERT INTO t SELECT count(*) from t;
INSERT INTO t (SELECT id+1 from t);
SELECT * FROM t ORDER BY id;
id
---------------------------------------------------------------------
1
2
2
3
3
4
4
5
5
6
6
7
7
8
8
9
9
10
10
10
11
11
(22 rows)
CREATE TYPE composite_type AS (
i integer,
i2 integer
);
CREATE TABLE composite_type_table
(
id bigserial,
col composite_type[]
);
SELECT create_distributed_table('composite_type_table', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO composite_type_table(col) VALUES (ARRAY[(1, 2)::composite_type]);
SELECT * FROM composite_type_table;
id | col
---------------------------------------------------------------------
1 | {"(1,2)"}
(1 row)
CREATE TYPE nested_composite_type AS (
a composite_type,
b composite_type
);
CREATE TABLE nested_composite_type_table
(
id bigserial,
col nested_composite_type
);
SELECT create_distributed_table('nested_composite_type_table', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO nested_composite_type_table(col) VALUES (((1, 2), (3,4))::nested_composite_type);
SELECT * FROM nested_composite_type_table;
id | col
---------------------------------------------------------------------
1 | ("(1,2)","(3,4)")
(1 row)
CREATE TABLE binaryless_builtin (
col1 aclitem NOT NULL,
col2 character varying(255) NOT NULL
);
SELECT create_reference_table('binaryless_builtin');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO binaryless_builtin VALUES ('user postgres=r/postgres', 'test');
SELECT * FROM binaryless_builtin;
col1 | col2
---------------------------------------------------------------------
postgres=r/postgres | test
(1 row)
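The aclitem column is what makes this table take the text fallback: aclitem is
one of the few built-in types without binary I/O functions, which
CanUseBinaryCopyFormatForType now detects. A plain catalog query (not part of
the test) confirms this:

SELECT typname, typsend, typreceive FROM pg_type WHERE typname = 'aclitem';
-- typname | typsend | typreceive
-- aclitem |    -    |     -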
CREATE TABLE test_table_1(id int, val1 int);
CREATE TABLE test_table_2(id int, val1 bigint);
SELECT create_distributed_table('test_table_1', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('test_table_2', 'id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
INSERT INTO test_table_1 VALUES(1,1),(2,4),(3,3);
INSERT INTO test_table_2 VALUES(1,1),(3,3),(4,5);
SELECT id, val1
FROM test_table_1 LEFT JOIN test_table_2 USING(id, val1)
ORDER BY 1, 2;
id | val1
---------------------------------------------------------------------
1 | 1
2 | 4
3 | 3
(3 rows)
\set VERBOSITY terse
DROP SCHEMA binary_protocol CASCADE;
NOTICE: drop cascades to 8 other objects

File diff suppressed because it is too large


@ -387,7 +387,7 @@ restore_isolation_tester_func
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-select s2-coordinator-create-index-concurrently s1-commit-worker s1-stop-connection
starting permutation: s1-start-session-level-connection s1-begin-on-worker s1-disable-binary-protocol-on-worker s1-select s2-coordinator-create-index-concurrently s1-commit-worker s1-stop-connection
step s1-start-session-level-connection:
SELECT start_session_level_connection_to_node('localhost', 57637);
@ -400,6 +400,13 @@ step s1-begin-on-worker:
run_commands_on_session_level_connection_to_node
step s1-disable-binary-protocol-on-worker:
-- Workaround for router-select blocking create-index-concurrently
SELECT run_commands_on_session_level_connection_to_node('SET citus.enable_binary_protocol TO false');
run_commands_on_session_level_connection_to_node
step s1-select:
SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM select_table');


@ -232,6 +232,17 @@ DEBUG: pushing down the function call
(S,S)
(1 row)
-- This is currently an undetected failure when using the binary protocol.
-- It should not be enabled by default until this is resolved. The tests above
-- will also fail when the default is changed to TRUE.
SET citus.enable_binary_protocol = TRUE;
select mx_call_func_custom_types('S', 'A');
DEBUG: pushing down the function call
ERROR: wrong data type: XXXX, expected XXXX
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
DEBUG: pushing down the function call
ERROR: wrong data type: XXXX, expected XXXX
RESET citus.enable_binary_protocol;
-- We don't allow distributing calls inside transactions
begin;
select mx_call_func(2, 0);


@ -5,6 +5,9 @@
// create range distributed table to test behavior of SELECT in concurrent operations
setup
{
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
SET citus.shard_replication_factor TO 1;
CREATE TABLE select_append(id integer, data text, int_data int);
SELECT create_distributed_table('select_append', 'id', 'append');
@ -14,12 +17,18 @@ setup
teardown
{
DROP TABLE IF EXISTS select_append CASCADE;
SELECT citus_internal.restore_isolation_tester_func();
}
// session 1
session "s1"
step "s1-initialize" { COPY select_append FROM PROGRAM 'echo 0, a, 0 && echo 1, b, 1 && echo 2, c, 2 && echo 3, d, 3 && echo 4, e, 4' WITH CSV; }
step "s1-begin" { BEGIN; }
step "s1-disable-binary-protocol" {
-- Workaround for router-select blocking create-index-concurrently
SET citus.enable_binary_protocol TO false;
}
step "s1-router-select" { SELECT * FROM select_append WHERE id = 1; }
step "s1-real-time-select" { SELECT * FROM select_append ORDER BY 1, 2; }
step "s1-task-tracker-select"
@ -96,7 +105,7 @@ permutation "s1-initialize" "s1-begin" "s1-router-select" "s2-truncate" "s1-comm
permutation "s1-initialize" "s1-begin" "s1-router-select" "s2-drop" "s1-commit" "s1-select-count"
permutation "s1-initialize" "s1-begin" "s1-router-select" "s2-ddl-create-index" "s1-commit" "s1-select-count" "s1-show-indexes"
permutation "s1-initialize" "s1-ddl-create-index" "s1-begin" "s1-router-select" "s2-ddl-drop-index" "s1-commit" "s1-select-count" "s1-show-indexes"
permutation "s1-initialize" "s1-begin" "s1-router-select" "s2-ddl-create-index-concurrently" "s1-commit" "s1-select-count" "s1-show-indexes"
permutation "s1-initialize" "s1-begin" "s1-disable-binary-protocol" "s1-router-select" "s2-ddl-create-index-concurrently" "s1-commit" "s1-select-count" "s1-show-indexes"
permutation "s1-initialize" "s1-begin" "s1-router-select" "s2-ddl-add-column" "s1-commit" "s1-select-count" "s1-show-columns"
permutation "s1-initialize" "s1-ddl-add-column" "s1-begin" "s1-router-select" "s2-ddl-drop-column" "s1-commit" "s1-select-count" "s1-show-columns"
permutation "s1-initialize" "s1-begin" "s1-router-select" "s2-ddl-rename-column" "s1-commit" "s1-select-count" "s1-show-columns"


@ -29,6 +29,11 @@ step "s1-begin-on-worker"
SELECT run_commands_on_session_level_connection_to_node('BEGIN');
}
step "s1-disable-binary-protocol-on-worker" {
-- Workaround for router-select blocking create-index-concurrently
SELECT run_commands_on_session_level_connection_to_node('SET citus.enable_binary_protocol TO false');
}
step "s1-select"
{
SELECT run_commands_on_session_level_connection_to_node('SELECT * FROM select_table');
@ -135,4 +140,4 @@ permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-copy" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection" "s3-select-count"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select" "s2-begin" "s2-index" "s1-commit-worker" "s2-commit" "s1-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-select-for-update" "s1-commit-worker" "s2-commit-worker" "s1-stop-connection" "s2-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-select" "s2-coordinator-create-index-concurrently" "s1-commit-worker" "s1-stop-connection"
permutation "s1-start-session-level-connection" "s1-begin-on-worker" "s1-disable-binary-protocol-on-worker" "s1-select" "s2-coordinator-create-index-concurrently" "s1-commit-worker" "s1-stop-connection"


@ -0,0 +1,84 @@
SET citus.shard_count = 4;
SET citus.next_shard_id TO 4754000;
CREATE SCHEMA binary_protocol;
SET search_path TO binary_protocol;
SET citus.enable_binary_protocol = TRUE;
CREATE TABLE t(id int);
SELECT create_distributed_table('t', 'id');
INSERT INTO t (SELECT i FROM generate_series(1, 10) i);
SELECT * FROM t ORDER BY id;
-- Select more than 16 columns to trigger growing of columns
SELECT id, id, id, id, id,
id, id, id, id, id,
id, id, id, id, id,
id, id, id, id, id,
id, id, id, id, id,
id, id, id, id, id
FROM t ORDER BY id;
INSERT INTO t SELECT count(*) from t;
INSERT INTO t (SELECT id+1 from t);
SELECT * FROM t ORDER BY id;
CREATE TYPE composite_type AS (
i integer,
i2 integer
);
CREATE TABLE composite_type_table
(
id bigserial,
col composite_type[]
);
SELECT create_distributed_table('composite_type_table', 'id');
INSERT INTO composite_type_table(col) VALUES (ARRAY[(1, 2)::composite_type]);
SELECT * FROM composite_type_table;
CREATE TYPE nested_composite_type AS (
a composite_type,
b composite_type
);
CREATE TABLE nested_composite_type_table
(
id bigserial,
col nested_composite_type
);
SELECT create_distributed_table('nested_composite_type_table', 'id');
INSERT INTO nested_composite_type_table(col) VALUES (((1, 2), (3,4))::nested_composite_type);
SELECT * FROM nested_composite_type_table;
CREATE TABLE binaryless_builtin (
col1 aclitem NOT NULL,
col2 character varying(255) NOT NULL
);
SELECT create_reference_table('binaryless_builtin');
INSERT INTO binaryless_builtin VALUES ('user postgres=r/postgres', 'test');
SELECT * FROM binaryless_builtin;
CREATE TABLE test_table_1(id int, val1 int);
CREATE TABLE test_table_2(id int, val1 bigint);
SELECT create_distributed_table('test_table_1', 'id');
SELECT create_distributed_table('test_table_2', 'id');
INSERT INTO test_table_1 VALUES(1,1),(2,4),(3,3);
INSERT INTO test_table_2 VALUES(1,1),(3,3),(4,5);
SELECT id, val1
FROM test_table_1 LEFT JOIN test_table_2 USING(id, val1)
ORDER BY 1, 2;
\set VERBOSITY terse
DROP SCHEMA binary_protocol CASCADE;


@ -104,6 +104,16 @@ select squares(4);
select multi_mx_function_call_delegation.mx_call_func(2, 0);
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
-- This is currently an undetected failure when using the binary protocol.
-- It should not be enabled by default until this is resolved. The tests above
-- will also fail when the default is changed to TRUE.
SET citus.enable_binary_protocol = TRUE;
select mx_call_func_custom_types('S', 'A');
select multi_mx_function_call_delegation.mx_call_func_custom_types('S', 'A');
RESET citus.enable_binary_protocol;
-- We don't allow distributing calls inside transactions
begin;
select mx_call_func(2, 0);