Merge pull request #1965 from citusdata/fast_jsonb_copy

Skip JSON validation on coordinator during COPY
pull/1988/head
Marco Slot 2018-02-04 14:56:56 +01:00 committed by GitHub
commit 00f9082cd4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 343 additions and 23 deletions

View File

@ -14,7 +14,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7 7.0-8 7.0-9 7.0-10 7.0-11 7.0-12 7.0-13 7.0-14 7.0-15 \
7.1-1 7.1-2 7.1-3 7.1-4 \
7.2-1 7.2-2 7.2-3 \
7.3-1
7.3-1 7.3-2
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -188,6 +188,8 @@ $(EXTENSION)--7.2-3.sql: $(EXTENSION)--7.2-2.sql $(EXTENSION)--7.2-2--7.2-3.sql
cat $^ > $@
$(EXTENSION)--7.3-1.sql: $(EXTENSION)--7.2-3.sql $(EXTENSION)--7.2-3--7.3-1.sql
cat $^ > $@
$(EXTENSION)--7.3-2.sql: $(EXTENSION)--7.3-1.sql $(EXTENSION)--7.3-1--7.3-2.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,6 @@
/* citus--7.3-1--7.3-2 */
CREATE FUNCTION pg_catalog.citus_text_send_as_jsonb(text)
RETURNS bytea
LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT
AS 'MODULE_PATHNAME', $$citus_text_send_as_jsonb$$;

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '7.3-1'
default_version = '7.3-2'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -59,9 +59,11 @@
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "commands/defrem.h"
#include "distributed/listutils.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_shard_transaction.h"
@ -71,6 +73,7 @@
#include "distributed/shard_pruning.h"
#include "distributed/version_compat.h"
#include "executor/executor.h"
#include "libpq/pqformat.h"
#include "nodes/makefuncs.h"
#include "tsearch/ts_locale.h"
#include "utils/builtins.h"
@ -86,10 +89,16 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/* use a global connection to the master node in order to skip passing it around */
static MultiConnection *masterConnection = NULL;
/* if true, skip validation of JSONB columns during COPY */
bool SkipJsonbValidationInCopy = true;
/* Local functions forward declarations */
static void CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag);
static void CopyToExistingShards(CopyStmt *copyStatement, char *completionTag);
static bool IsCopyInBinaryFormat(CopyStmt *copyStatement);
static List * FindJsonbInputColumns(TupleDesc tupleDescriptor,
List *inputColumnNameList);
static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId);
static char MasterPartitionMethod(RangeVar *relation);
static void RemoveMasterOptions(CopyStmt *copyStatement);
@ -148,6 +157,10 @@ static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver);
static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(citus_text_send_as_jsonb);
/*
* CitusCopyFrom implements the COPY table_name FROM. It dispacthes the copy
* statement to related subfunctions based on where the copy command is run
@ -313,6 +326,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
char partitionMethod = 0;
bool stopOnFailure = false;
bool isInputFormatBinary = IsCopyInBinaryFormat(copyStatement);
CopyState copyState = NULL;
uint64 processedRowCount = 0;
@ -370,17 +384,10 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
dest->rStartup(dest, 0, tupleDescriptor);
/*
* BeginCopyFrom opens all partitions of given partitioned table with relation_open
* and it expects its caller to close those relations. We do not have direct access
* to opened relations, thus we are changing relkind of partitioned tables so that
* Postgres will treat those tables as regular relations and will not open its
* partitions.
*
* We will make this change on copied version of distributed relation to not change
* anything in relcache.
* Below, we change a few fields in the Relation to control the behaviour
* of BeginCopyFrom. However, we obviously should not do this in relcache
* and therefore make a copy of the Relation.
*/
if (PartitionedTable(tableId))
{
copiedDistributedRelation = (Relation) palloc0(sizeof(RelationData));
copiedDistributedRelationTuple = (Form_pg_class) palloc(CLASS_TUPLE_SIZE);
@ -392,16 +399,79 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
memcpy(copiedDistributedRelationTuple, distributedRelation->rd_rel,
CLASS_TUPLE_SIZE);
copiedDistributedRelationTuple->relkind = RELKIND_RELATION;
copiedDistributedRelation->rd_rel = copiedDistributedRelationTuple;
copiedDistributedRelation->rd_att = CreateTupleDescCopyConstr(tupleDescriptor);
/*
* BeginCopyFrom opens all partitions of given partitioned table with relation_open
* and it expects its caller to close those relations. We do not have direct access
* to opened relations, thus we are changing relkind of partitioned tables so that
* Postgres will treat those tables as regular relations and will not open its
* partitions.
*/
if (PartitionedTable(tableId))
{
copiedDistributedRelationTuple->relkind = RELKIND_RELATION;
}
else
/*
* We make an optimisation to skip JSON parsing for JSONB columns, because many
* Citus users have large objects in this column and parsing it on the coordinator
* causes significant CPU overhead. We do this by forcing BeginCopyFrom and
* NextCopyFrom to parse the column as text and then encoding it as JSON again
* by using citus_text_send_as_jsonb as the binary output function.
*
* The main downside of enabling this optimisation is that it defers validation
* until the object is parsed by the worker, which is unable to give an accurate
* line number.
*/
if (SkipJsonbValidationInCopy && !isInputFormatBinary)
{
CopyOutState copyOutState = copyDest->copyOutState;
List *jsonbColumnIndexList = NIL;
ListCell *jsonbColumnIndexCell = NULL;
/* get the column indices for all JSONB columns that appear in the input */
jsonbColumnIndexList = FindJsonbInputColumns(copiedDistributedRelation->rd_att,
copyStatement->attlist);
foreach(jsonbColumnIndexCell, jsonbColumnIndexList)
{
int columnIndex = lfirst_int(jsonbColumnIndexCell);
Form_pg_attribute currentColumn =
TupleDescAttr(copiedDistributedRelation->rd_att, columnIndex);
if (columnIndex == partitionColumnIndex)
{
/*
* If we are not dealing with partitioned table, copiedDistributedRelation is same
* as distributedRelation.
* In the curious case of using a JSONB column as partition column,
* we leave it as is because we want to make sure the hashing works
* correctly.
*/
copiedDistributedRelation = distributedRelation;
continue;
}
ereport(DEBUG1, (errmsg("parsing JSONB column %s as text",
NameStr(currentColumn->attname))));
/* parse the column as text instead of JSONB */
currentColumn->atttypid = TEXTOID;
if (copyOutState->binary)
{
Oid textSendAsJsonbFunctionId = CitusTextSendAsJsonbFunctionId();
/*
* If we're using binary encoding between coordinator and workers
* then we should honour the format expected by jsonb_recv, which
* is a version number followed by text. We therefore use an output
* function which sends the text as if it were jsonb, namely by
* prepending a version number.
*/
fmgr_info(textSendAsJsonbFunctionId,
&copyDest->columnOutputFunctions[columnIndex]);
}
}
}
/* initialize copy state to read from COPY data source */
@ -480,6 +550,83 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
}
/*
* IsCopyInBinaryFormat determines whether the given COPY statement has the
* WITH (format binary) option.
*/
static bool
IsCopyInBinaryFormat(CopyStmt *copyStatement)
{
ListCell *optionCell = NULL;
foreach(optionCell, copyStatement->options)
{
DefElem *defel = lfirst_node(DefElem, optionCell);
if (strcmp(defel->defname, "format") == 0 &&
strcmp(defGetString(defel), "binary") == 0)
{
return true;
}
}
return false;
}
/*
* FindJsonbInputColumns finds columns in the tuple descriptor that have
* the JSONB type and appear in inputColumnNameList. If the list is empty then
* all JSONB columns are returned.
*/
static List *
FindJsonbInputColumns(TupleDesc tupleDescriptor, List *inputColumnNameList)
{
List *jsonbColumnIndexList = NIL;
int columnCount = tupleDescriptor->natts;
int columnIndex = 0;
for (columnIndex = 0; columnIndex < columnCount; columnIndex++)
{
Form_pg_attribute currentColumn = TupleDescAttr(tupleDescriptor, columnIndex);
if (currentColumn->attisdropped)
{
continue;
}
if (currentColumn->atttypid != JSONBOID)
{
continue;
}
if (inputColumnNameList != NIL)
{
ListCell *inputColumnCell = NULL;
bool isInputColumn = false;
foreach(inputColumnCell, inputColumnNameList)
{
char *inputColumnName = strVal(lfirst(inputColumnCell));
if (namestrcmp(&currentColumn->attname, inputColumnName) == 0)
{
isInputColumn = true;
break;
}
}
if (!isInputColumn)
{
continue;
}
}
jsonbColumnIndexList = lappend_int(jsonbColumnIndexList, columnIndex);
}
return jsonbColumnIndexList;
}
/*
* CopyToNewShards implements the COPY table_name FROM ... for append-partitioned
* tables where we create new shards into which to copy rows.
@ -1474,6 +1621,25 @@ ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat)
}
/*
* citus_text_send_as_jsonb sends a text as if it was a JSONB. This should only
* be used if the text is indeed valid JSON.
*/
Datum
citus_text_send_as_jsonb(PG_FUNCTION_ARGS)
{
text *inputText = PG_GETARG_TEXT_PP(0);
StringInfoData buf;
int version = 1;
pq_begintypsend(&buf);
pq_sendint(&buf, version, 1);
pq_sendtext(&buf, VARDATA_ANY(inputText), VARSIZE_ANY_EXHDR(inputText));
PG_RETURN_BYTEA_P(pq_endtypsend(&buf));
}
/*
* AppendCopyRowData serializes one row using the column output functions,
* and appends the data to the row output state object's message buffer.

View File

@ -508,6 +508,22 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomBoolVariable(
"citus.skip_jsonb_validation_in_copy",
gettext_noop("Skip validation of JSONB columns on the coordinator during COPY "
"into a distributed table"),
gettext_noop("Parsing large JSON objects may incur significant CPU overhead, "
"which can lower COPY throughput. If this GUC is set (the default), "
"JSON parsing is skipped on the coordinator, which means you cannot "
"see the line number in case of malformed JSON, but throughput will "
"be higher. This setting does not apply if the input format is "
"binary."),
&SkipJsonbValidationInCopy,
true,
PGC_USERSET,
0,
NULL, NULL, NULL);
DefineCustomIntVariable(
"citus.shard_count",
gettext_noop("Sets the number of shards for a new hash-partitioned table"

View File

@ -121,6 +121,7 @@ typedef struct MetadataCacheData
Oid readIntermediateResultFuncId;
Oid extraDataContainerFuncId;
Oid workerHashFunctionId;
Oid textSendAsJsonbFunctionId;
Oid extensionOwner;
Oid binaryCopyFormatId;
Oid textCopyFormatId;
@ -1941,6 +1942,24 @@ CitusWorkerHashFunctionId(void)
}
/* return oid of the citus_text_send_as_jsonb(text) function */
Oid
CitusTextSendAsJsonbFunctionId(void)
{
if (MetadataCache.textSendAsJsonbFunctionId == InvalidOid)
{
List *nameList = list_make2(makeString("pg_catalog"),
makeString("citus_text_send_as_jsonb"));
Oid paramOids[1] = { TEXTOID };
MetadataCache.textSendAsJsonbFunctionId =
LookupFuncName(nameList, 1, paramOids, false);
}
return MetadataCache.textSendAsJsonbFunctionId;
}
/*
* CitusExtensionOwner() returns the owner of the 'citus' extension. That user
* is, amongst others, used to perform actions a normal user might not be

View File

@ -130,6 +130,7 @@ extern Oid CitusCopyFormatTypeId(void);
extern Oid CitusReadIntermediateResultFuncId(void);
extern Oid CitusExtraDataContainerFuncId(void);
extern Oid CitusWorkerHashFunctionId(void);
extern Oid CitusTextSendAsJsonbFunctionId(void);
/* enum oids */
extern Oid PrimaryNodeRoleId(void);

View File

@ -108,6 +108,10 @@ typedef struct CitusCopyDestReceiver
} CitusCopyDestReceiver;
/* GUCs */
extern bool SkipJsonbValidationInCopy;
/* function declarations for copying into a distributed table */
extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId,
List *columnNameList,

View File

@ -812,3 +812,46 @@ CREATE UNLOGGED TABLE trigger_flush AS
SELECT 1 AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,150000) s;
SELECT create_distributed_table('trigger_flush','a');
ABORT;
-- copy into a table with a JSONB column
CREATE TABLE copy_jsonb (key text, value jsonb, extra jsonb default '["default"]'::jsonb);
SELECT create_distributed_table('copy_jsonb', 'key');
-- JSONB from text should work
\COPY copy_jsonb (key, value) FROM STDIN
blue {"r":0,"g":0,"b":255}
green {"r":0,"g":255,"b":0}
\.
SELECT * FROM copy_jsonb ORDER BY key;
-- JSONB from binary should work
\COPY copy_jsonb TO '/tmp/copy_jsonb.pgcopy' WITH (format binary)
\COPY copy_jsonb FROM '/tmp/copy_jsonb.pgcopy' WITH (format binary)
SELECT * FROM copy_jsonb ORDER BY key;
-- JSONB parsing error without validation: no line number
\COPY copy_jsonb (key, value) FROM STDIN
red {"r":255,"g":0,"b":0
\.
TRUNCATE copy_jsonb;
SET citus.skip_jsonb_validation_in_copy TO off;
-- JSONB from text should work
\COPY copy_jsonb (key, value) FROM STDIN
blue {"r":0,"g":0,"b":255}
green {"r":0,"g":255,"b":0}
\.
SELECT * FROM copy_jsonb ORDER BY key;
-- JSONB from binary should work
\COPY copy_jsonb TO '/tmp/copy_jsonb.pgcopy' WITH (format binary)
\COPY copy_jsonb FROM '/tmp/copy_jsonb.pgcopy' WITH (format binary)
SELECT * FROM copy_jsonb ORDER BY key;
-- JSONB parsing error with validation: should see line number
\COPY copy_jsonb (key, value) FROM STDIN
red {"r":255,"g":0,"b":0
\.
DROP TABLE copy_jsonb;

View File

@ -1087,3 +1087,66 @@ NOTICE: Copying data from local table...
(1 row)
ABORT;
-- copy into a table with a JSONB column
CREATE TABLE copy_jsonb (key text, value jsonb, extra jsonb default '["default"]'::jsonb);
SELECT create_distributed_table('copy_jsonb', 'key');
create_distributed_table
--------------------------
(1 row)
-- JSONB from text should work
\COPY copy_jsonb (key, value) FROM STDIN
SELECT * FROM copy_jsonb ORDER BY key;
key | value | extra
-------+----------------------------+-------------
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
green | {"b": 0, "g": 255, "r": 0} | ["default"]
(2 rows)
-- JSONB from binary should work
\COPY copy_jsonb TO '/tmp/copy_jsonb.pgcopy' WITH (format binary)
\COPY copy_jsonb FROM '/tmp/copy_jsonb.pgcopy' WITH (format binary)
SELECT * FROM copy_jsonb ORDER BY key;
key | value | extra
-------+----------------------------+-------------
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
green | {"b": 0, "g": 255, "r": 0} | ["default"]
green | {"b": 0, "g": 255, "r": 0} | ["default"]
(4 rows)
-- JSONB parsing error without validation: no line number
\COPY copy_jsonb (key, value) FROM STDIN
ERROR: invalid input syntax for type json
DETAIL: The input string ended unexpectedly.
TRUNCATE copy_jsonb;
SET citus.skip_jsonb_validation_in_copy TO off;
-- JSONB from text should work
\COPY copy_jsonb (key, value) FROM STDIN
SELECT * FROM copy_jsonb ORDER BY key;
key | value | extra
-------+----------------------------+-------------
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
green | {"b": 0, "g": 255, "r": 0} | ["default"]
(2 rows)
-- JSONB from binary should work
\COPY copy_jsonb TO '/tmp/copy_jsonb.pgcopy' WITH (format binary)
\COPY copy_jsonb FROM '/tmp/copy_jsonb.pgcopy' WITH (format binary)
SELECT * FROM copy_jsonb ORDER BY key;
key | value | extra
-------+----------------------------+-------------
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
blue | {"b": 255, "g": 0, "r": 0} | ["default"]
green | {"b": 0, "g": 255, "r": 0} | ["default"]
green | {"b": 0, "g": 255, "r": 0} | ["default"]
(4 rows)
-- JSONB parsing error with validation: should see line number
\COPY copy_jsonb (key, value) FROM STDIN
ERROR: invalid input syntax for type json
DETAIL: The input string ended unexpectedly.
CONTEXT: JSON data, line 1: {"r":255,"g":0,"b":0
COPY copy_jsonb, line 1, column value: "{"r":255,"g":0,"b":0"
DROP TABLE copy_jsonb;