mirror of https://github.com/citusdata/citus.git
Remove skip_jsonb_validation_in_copy GUC
parent
1440caeef2
commit
3b7b64a8b6
|
@ -59,11 +59,9 @@
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
#include "commands/copy.h"
|
#include "commands/copy.h"
|
||||||
#include "commands/defrem.h"
|
#include "commands/defrem.h"
|
||||||
#include "distributed/listutils.h"
|
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
#include "distributed/metadata_cache.h"
|
#include "distributed/metadata_cache.h"
|
||||||
#include "distributed/multi_copy.h"
|
#include "distributed/multi_copy.h"
|
||||||
#include "distributed/multi_executor.h"
|
|
||||||
#include "distributed/multi_partitioning_utils.h"
|
#include "distributed/multi_partitioning_utils.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
#include "distributed/multi_shard_transaction.h"
|
#include "distributed/multi_shard_transaction.h"
|
||||||
|
@ -89,16 +87,10 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
|
||||||
/* use a global connection to the master node in order to skip passing it around */
|
/* use a global connection to the master node in order to skip passing it around */
|
||||||
static MultiConnection *masterConnection = NULL;
|
static MultiConnection *masterConnection = NULL;
|
||||||
|
|
||||||
/* if true, skip validation of JSONB columns during COPY */
|
|
||||||
bool SkipJsonbValidationInCopy = true;
|
|
||||||
|
|
||||||
|
|
||||||
/* Local functions forward declarations */
|
/* Local functions forward declarations */
|
||||||
static void CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag);
|
static void CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag);
|
||||||
static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag);
|
static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag);
|
||||||
static bool IsCopyInBinaryFormat(CopyStmt *copyStatement);
|
|
||||||
static List * FindJsonbInputColumns(TupleDesc tupleDescriptor,
|
|
||||||
List *inputColumnNameList);
|
|
||||||
static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId);
|
static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId);
|
||||||
static char MasterPartitionMethod(RangeVar *relation);
|
static char MasterPartitionMethod(RangeVar *relation);
|
||||||
static void RemoveMasterOptions(CopyStmt *copyStatement);
|
static void RemoveMasterOptions(CopyStmt *copyStatement);
|
||||||
|
@ -326,7 +318,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
||||||
|
|
||||||
char partitionMethod = 0;
|
char partitionMethod = 0;
|
||||||
bool stopOnFailure = false;
|
bool stopOnFailure = false;
|
||||||
bool isInputFormatBinary = IsCopyInBinaryFormat(copyStatement);
|
|
||||||
|
|
||||||
CopyState copyState = NULL;
|
CopyState copyState = NULL;
|
||||||
uint64 processedRowCount = 0;
|
uint64 processedRowCount = 0;
|
||||||
|
@ -414,66 +405,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
||||||
copiedDistributedRelationTuple->relkind = RELKIND_RELATION;
|
copiedDistributedRelationTuple->relkind = RELKIND_RELATION;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
* We make an optimisation to skip JSON parsing for JSONB columns, because many
|
|
||||||
* Citus users have large objects in this column and parsing it on the coordinator
|
|
||||||
* causes significant CPU overhead. We do this by forcing BeginCopyFrom and
|
|
||||||
* NextCopyFrom to parse the column as text and then encoding it as JSON again
|
|
||||||
* by using citus_text_send_as_jsonb as the binary output function.
|
|
||||||
*
|
|
||||||
* The main downside of enabling this optimisation is that it defers validation
|
|
||||||
* until the object is parsed by the worker, which is unable to give an accurate
|
|
||||||
* line number.
|
|
||||||
*/
|
|
||||||
if (SkipJsonbValidationInCopy && !isInputFormatBinary)
|
|
||||||
{
|
|
||||||
CopyOutState copyOutState = copyDest->copyOutState;
|
|
||||||
List *jsonbColumnIndexList = NIL;
|
|
||||||
ListCell *jsonbColumnIndexCell = NULL;
|
|
||||||
|
|
||||||
/* get the column indices for all JSONB columns that appear in the input */
|
|
||||||
jsonbColumnIndexList = FindJsonbInputColumns(copiedDistributedRelation->rd_att,
|
|
||||||
copyStatement->attlist);
|
|
||||||
|
|
||||||
foreach(jsonbColumnIndexCell, jsonbColumnIndexList)
|
|
||||||
{
|
|
||||||
int columnIndex = lfirst_int(jsonbColumnIndexCell);
|
|
||||||
Form_pg_attribute currentColumn =
|
|
||||||
TupleDescAttr(copiedDistributedRelation->rd_att, columnIndex);
|
|
||||||
|
|
||||||
if (columnIndex == partitionColumnIndex)
|
|
||||||
{
|
|
||||||
/*
|
|
||||||
* In the curious case of using a JSONB column as partition column,
|
|
||||||
* we leave it as is because we want to make sure the hashing works
|
|
||||||
* correctly.
|
|
||||||
*/
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
ereport(DEBUG1, (errmsg("parsing JSONB column %s as text",
|
|
||||||
NameStr(currentColumn->attname))));
|
|
||||||
|
|
||||||
/* parse the column as text instead of JSONB */
|
|
||||||
currentColumn->atttypid = TEXTOID;
|
|
||||||
|
|
||||||
if (copyOutState->binary)
|
|
||||||
{
|
|
||||||
Oid textSendAsJsonbFunctionId = CitusTextSendAsJsonbFunctionId();
|
|
||||||
|
|
||||||
/*
|
|
||||||
* If we're using binary encoding between coordinator and workers
|
|
||||||
* then we should honour the format expected by jsonb_recv, which
|
|
||||||
* is a version number followed by text. We therefore use an output
|
|
||||||
* function which sends the text as if it were jsonb, namely by
|
|
||||||
* prepending a version number.
|
|
||||||
*/
|
|
||||||
fmgr_info(textSendAsJsonbFunctionId,
|
|
||||||
©Dest->columnOutputFunctions[columnIndex]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* initialize copy state to read from COPY data source */
|
/* initialize copy state to read from COPY data source */
|
||||||
#if (PG_VERSION_NUM >= 100000)
|
#if (PG_VERSION_NUM >= 100000)
|
||||||
copyState = BeginCopyFrom(NULL,
|
copyState = BeginCopyFrom(NULL,
|
||||||
|
@ -550,83 +481,6 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* IsCopyInBinaryFormat determines whether the given COPY statement has the
|
|
||||||
* WITH (format binary) option.
|
|
||||||
*/
|
|
||||||
static bool
|
|
||||||
IsCopyInBinaryFormat(CopyStmt *copyStatement)
|
|
||||||
{
|
|
||||||
ListCell *optionCell = NULL;
|
|
||||||
|
|
||||||
foreach(optionCell, copyStatement->options)
|
|
||||||
{
|
|
||||||
DefElem *defel = lfirst_node(DefElem, optionCell);
|
|
||||||
if (strcmp(defel->defname, "format") == 0 &&
|
|
||||||
strcmp(defGetString(defel), "binary") == 0)
|
|
||||||
{
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
|
||||||
* FindJsonbInputColumns finds columns in the tuple descriptor that have
|
|
||||||
* the JSONB type and appear in inputColumnNameList. If the list is empty then
|
|
||||||
* all JSONB columns are returned.
|
|
||||||
*/
|
|
||||||
static List *
|
|
||||||
FindJsonbInputColumns(TupleDesc tupleDescriptor, List *inputColumnNameList)
|
|
||||||
{
|
|
||||||
List *jsonbColumnIndexList = NIL;
|
|
||||||
int columnCount = tupleDescriptor->natts;
|
|
||||||
int columnIndex = 0;
|
|
||||||
|
|
||||||
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
|
|
||||||
{
|
|
||||||
Form_pg_attribute currentColumn = TupleDescAttr(tupleDescriptor, columnIndex);
|
|
||||||
if (currentColumn->attisdropped)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (currentColumn->atttypid != JSONBOID)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (inputColumnNameList != NIL)
|
|
||||||
{
|
|
||||||
ListCell *inputColumnCell = NULL;
|
|
||||||
bool isInputColumn = false;
|
|
||||||
|
|
||||||
foreach(inputColumnCell, inputColumnNameList)
|
|
||||||
{
|
|
||||||
char *inputColumnName = strVal(lfirst(inputColumnCell));
|
|
||||||
|
|
||||||
if (namestrcmp(¤tColumn->attname, inputColumnName) == 0)
|
|
||||||
{
|
|
||||||
isInputColumn = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!isInputColumn)
|
|
||||||
{
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
jsonbColumnIndexList = lappend_int(jsonbColumnIndexList, columnIndex);
|
|
||||||
}
|
|
||||||
|
|
||||||
return jsonbColumnIndexList;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* CopyToNewShards implements the COPY table_name FROM ... for append-partitioned
|
* CopyToNewShards implements the COPY table_name FROM ... for append-partitioned
|
||||||
* tables where we create new shards into which to copy rows.
|
* tables where we create new shards into which to copy rows.
|
||||||
|
|
|
@ -508,22 +508,6 @@ RegisterCitusConfigVariables(void)
|
||||||
GUC_NO_SHOW_ALL,
|
GUC_NO_SHOW_ALL,
|
||||||
NULL, NULL, NULL);
|
NULL, NULL, NULL);
|
||||||
|
|
||||||
DefineCustomBoolVariable(
|
|
||||||
"citus.skip_jsonb_validation_in_copy",
|
|
||||||
gettext_noop("Skip validation of JSONB columns on the coordinator during COPY "
|
|
||||||
"into a distributed table"),
|
|
||||||
gettext_noop("Parsing large JSON objects may incur significant CPU overhead, "
|
|
||||||
"which can lower COPY throughput. If this GUC is set (the default), "
|
|
||||||
"JSON parsing is skipped on the coordinator, which means you cannot "
|
|
||||||
"see the line number in case of malformed JSON, but throughput will "
|
|
||||||
"be higher. This setting does not apply if the input format is "
|
|
||||||
"binary."),
|
|
||||||
&SkipJsonbValidationInCopy,
|
|
||||||
false,
|
|
||||||
PGC_USERSET,
|
|
||||||
GUC_NO_SHOW_ALL,
|
|
||||||
NULL, NULL, NULL);
|
|
||||||
|
|
||||||
DefineCustomIntVariable(
|
DefineCustomIntVariable(
|
||||||
"citus.shard_count",
|
"citus.shard_count",
|
||||||
gettext_noop("Sets the number of shards for a new hash-partitioned table"
|
gettext_noop("Sets the number of shards for a new hash-partitioned table"
|
||||||
|
|
|
@ -108,10 +108,6 @@ typedef struct CitusCopyDestReceiver
|
||||||
} CitusCopyDestReceiver;
|
} CitusCopyDestReceiver;
|
||||||
|
|
||||||
|
|
||||||
/* GUCs */
|
|
||||||
extern bool SkipJsonbValidationInCopy;
|
|
||||||
|
|
||||||
|
|
||||||
/* function declarations for copying into a distributed table */
|
/* function declarations for copying into a distributed table */
|
||||||
extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
|
extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
|
||||||
List *columnNameList,
|
List *columnNameList,
|
||||||
|
|
Loading…
Reference in New Issue