From 6f7c3bd73b538b5f8c0b8e4b1906daf5a6d56a6b Mon Sep 17 00:00:00 2001 From: Marco Slot Date: Mon, 22 Jan 2018 13:03:48 +0100 Subject: [PATCH] Skip JSON validation on coordinator during COPY --- src/backend/distributed/Makefile | 4 +- .../distributed/citus--7.3-1--7.3-2.sql | 6 + src/backend/distributed/citus.control | 2 +- src/backend/distributed/commands/multi_copy.c | 208 ++++++++++++++++-- src/backend/distributed/shared_library_init.c | 16 ++ .../distributed/utils/metadata_cache.c | 19 ++ src/include/distributed/metadata_cache.h | 1 + src/include/distributed/multi_copy.h | 4 + src/test/regress/input/multi_copy.source | 43 ++++ src/test/regress/output/multi_copy.source | 63 ++++++ 10 files changed, 343 insertions(+), 23 deletions(-) create mode 100644 src/backend/distributed/citus--7.3-1--7.3-2.sql diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index b7980666e..a1f9bbca9 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -14,7 +14,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7 7.0-8 7.0-9 7.0-10 7.0-11 7.0-12 7.0-13 7.0-14 7.0-15 \ 7.1-1 7.1-2 7.1-3 7.1-4 \ 7.2-1 7.2-2 7.2-3 \ - 7.3-1 + 7.3-1 7.3-2 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -188,6 +188,8 @@ $(EXTENSION)--7.2-3.sql: $(EXTENSION)--7.2-2.sql $(EXTENSION)--7.2-2--7.2-3.sql cat $^ > $@ $(EXTENSION)--7.3-1.sql: $(EXTENSION)--7.2-3.sql $(EXTENSION)--7.2-3--7.3-1.sql cat $^ > $@ +$(EXTENSION)--7.3-2.sql: $(EXTENSION)--7.3-1.sql $(EXTENSION)--7.3-1--7.3-2.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--7.3-1--7.3-2.sql b/src/backend/distributed/citus--7.3-1--7.3-2.sql new file mode 100644 index 000000000..a9f8776ff --- /dev/null +++ b/src/backend/distributed/citus--7.3-1--7.3-2.sql @@ -0,0 +1,6 @@ +/* citus--7.3-1--7.3-2 */ + +CREATE FUNCTION 
pg_catalog.citus_text_send_as_jsonb(text) +RETURNS bytea +LANGUAGE C IMMUTABLE PARALLEL SAFE STRICT +AS 'MODULE_PATHNAME', $$citus_text_send_as_jsonb$$; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 2bc9267b3..43095a349 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '7.3-1' +default_version = '7.3-2' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 7e0a3a994..69702ebbe 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -59,9 +59,11 @@ #include "catalog/pg_type.h" #include "commands/copy.h" #include "commands/defrem.h" +#include "distributed/listutils.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_copy.h" +#include "distributed/multi_executor.h" #include "distributed/multi_partitioning_utils.h" #include "distributed/multi_physical_planner.h" #include "distributed/multi_shard_transaction.h" @@ -71,6 +73,7 @@ #include "distributed/shard_pruning.h" #include "distributed/version_compat.h" #include "executor/executor.h" +#include "libpq/pqformat.h" #include "nodes/makefuncs.h" #include "tsearch/ts_locale.h" #include "utils/builtins.h" @@ -86,10 +89,16 @@ static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0"; /* use a global connection to the master node in order to skip passing it around */ static MultiConnection *masterConnection = NULL; +/* if true, skip validation of JSONB columns during COPY */ +bool SkipJsonbValidationInCopy = true; + /* Local functions forward declarations */ static void CopyFromWorkerNode(CopyStmt *copyStatement, char *completionTag); static void CopyToExistingShards(CopyStmt *copyStatement, 
char *completionTag); +static bool IsCopyInBinaryFormat(CopyStmt *copyStatement); +static List * FindJsonbInputColumns(TupleDesc tupleDescriptor, + List *inputColumnNameList); static void CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId); static char MasterPartitionMethod(RangeVar *relation); static void RemoveMasterOptions(CopyStmt *copyStatement); @@ -148,6 +157,10 @@ static void CitusCopyDestReceiverShutdown(DestReceiver *destReceiver); static void CitusCopyDestReceiverDestroy(DestReceiver *destReceiver); +/* exports for SQL callable functions */ +PG_FUNCTION_INFO_V1(citus_text_send_as_jsonb); + + /* * CitusCopyFrom implements the COPY table_name FROM. It dispacthes the copy * statement to related subfunctions based on where the copy command is run @@ -313,6 +326,7 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) char partitionMethod = 0; bool stopOnFailure = false; + bool isInputFormatBinary = IsCopyInBinaryFormat(copyStatement); CopyState copyState = NULL; uint64 processedRowCount = 0; @@ -369,39 +383,95 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) dest = (DestReceiver *) copyDest; dest->rStartup(dest, 0, tupleDescriptor); + /* + * Below, we change a few fields in the Relation to control the behaviour + * of BeginCopyFrom. However, we obviously should not do this in relcache + * and therefore make a copy of the Relation. + */ + copiedDistributedRelation = (Relation) palloc0(sizeof(RelationData)); + copiedDistributedRelationTuple = (Form_pg_class) palloc(CLASS_TUPLE_SIZE); + + /* + * There is no need to deep copy everything. We will just deep copy of the fields + * we will change. 
+ */ + memcpy(copiedDistributedRelation, distributedRelation, sizeof(RelationData)); + memcpy(copiedDistributedRelationTuple, distributedRelation->rd_rel, + CLASS_TUPLE_SIZE); + + copiedDistributedRelation->rd_rel = copiedDistributedRelationTuple; + copiedDistributedRelation->rd_att = CreateTupleDescCopyConstr(tupleDescriptor); + /* * BeginCopyFrom opens all partitions of given partitioned table with relation_open * and it expects its caller to close those relations. We do not have direct access * to opened relations, thus we are changing relkind of partitioned tables so that * Postgres will treat those tables as regular relations and will not open its * partitions. - * - * We will make this change on copied version of distributed relation to not change - * anything in relcache. */ if (PartitionedTable(tableId)) { - copiedDistributedRelation = (Relation) palloc0(sizeof(RelationData)); - copiedDistributedRelationTuple = (Form_pg_class) palloc(CLASS_TUPLE_SIZE); - - /* - * There is no need to deep copy everything. We will just deep copy of the fields - * we will change. - */ - memcpy(copiedDistributedRelation, distributedRelation, sizeof(RelationData)); - memcpy(copiedDistributedRelationTuple, distributedRelation->rd_rel, - CLASS_TUPLE_SIZE); - copiedDistributedRelationTuple->relkind = RELKIND_RELATION; - copiedDistributedRelation->rd_rel = copiedDistributedRelationTuple; } - else + + /* + * We make an optimisation to skip JSON parsing for JSONB columns, because many + * Citus users have large objects in this column and parsing it on the coordinator + * causes significant CPU overhead. We do this by forcing BeginCopyFrom and + * NextCopyFrom to parse the column as text and then encoding it as JSON again + * by using citus_text_send_as_jsonb as the binary output function. + * + * The main downside of enabling this optimisation is that it defers validation + * until the object is parsed by the worker, which is unable to give an accurate + * line number. 
+ */ + if (SkipJsonbValidationInCopy && !isInputFormatBinary) { - /* - * If we are not dealing with partitioned table, copiedDistributedRelation is same - * as distributedRelation. - */ - copiedDistributedRelation = distributedRelation; + CopyOutState copyOutState = copyDest->copyOutState; + List *jsonbColumnIndexList = NIL; + ListCell *jsonbColumnIndexCell = NULL; + + /* get the column indices for all JSONB columns that appear in the input */ + jsonbColumnIndexList = FindJsonbInputColumns(copiedDistributedRelation->rd_att, + copyStatement->attlist); + + foreach(jsonbColumnIndexCell, jsonbColumnIndexList) + { + int columnIndex = lfirst_int(jsonbColumnIndexCell); + Form_pg_attribute currentColumn = + TupleDescAttr(copiedDistributedRelation->rd_att, columnIndex); + + if (columnIndex == partitionColumnIndex) + { + /* + * In the curious case of using a JSONB column as partition column, + * we leave it as is because we want to make sure the hashing works + * correctly. + */ + continue; + } + + ereport(DEBUG1, (errmsg("parsing JSONB column %s as text", + NameStr(currentColumn->attname)))); + + /* parse the column as text instead of JSONB */ + currentColumn->atttypid = TEXTOID; + + if (copyOutState->binary) + { + Oid textSendAsJsonbFunctionId = CitusTextSendAsJsonbFunctionId(); + + /* + * If we're using binary encoding between coordinator and workers + * then we should honour the format expected by jsonb_recv, which + * is a version number followed by text. We therefore use an output + * function which sends the text as if it were jsonb, namely by + * prepending a version number. + */ + fmgr_info(textSendAsJsonbFunctionId, + &copyDest->columnOutputFunctions[columnIndex]); + } + } } /* initialize copy state to read from COPY data source */ @@ -480,6 +550,83 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag) } +/* + * IsCopyInBinaryFormat determines whether the given COPY statement has the + * WITH (format binary) option. 
+ */ +static bool +IsCopyInBinaryFormat(CopyStmt *copyStatement) +{ + ListCell *optionCell = NULL; + + foreach(optionCell, copyStatement->options) + { + DefElem *defel = lfirst_node(DefElem, optionCell); + if (strcmp(defel->defname, "format") == 0 && + strcmp(defGetString(defel), "binary") == 0) + { + return true; + } + } + + return false; +} + + +/* + * FindJsonbInputColumns finds columns in the tuple descriptor that have + * the JSONB type and appear in inputColumnNameList. If the list is empty then + * all JSONB columns are returned. + */ +static List * +FindJsonbInputColumns(TupleDesc tupleDescriptor, List *inputColumnNameList) +{ + List *jsonbColumnIndexList = NIL; + int columnCount = tupleDescriptor->natts; + int columnIndex = 0; + + for (columnIndex = 0; columnIndex < columnCount; columnIndex++) + { + Form_pg_attribute currentColumn = TupleDescAttr(tupleDescriptor, columnIndex); + if (currentColumn->attisdropped) + { + continue; + } + + if (currentColumn->atttypid != JSONBOID) + { + continue; + } + + if (inputColumnNameList != NIL) + { + ListCell *inputColumnCell = NULL; + bool isInputColumn = false; + + foreach(inputColumnCell, inputColumnNameList) + { + char *inputColumnName = strVal(lfirst(inputColumnCell)); + + if (namestrcmp(&currentColumn->attname, inputColumnName) == 0) + { + isInputColumn = true; + break; + } + } + + if (!isInputColumn) + { + continue; + } + } + + jsonbColumnIndexList = lappend_int(jsonbColumnIndexList, columnIndex); + } + + return jsonbColumnIndexList; +} + + /* * CopyToNewShards implements the COPY table_name FROM ... for append-partitioned * tables where we create new shards into which to copy rows. @@ -1474,6 +1621,25 @@ ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat) } +/* + * citus_text_send_as_jsonb sends a text as if it was a JSONB. This should only + * be used if the text is indeed valid JSON. 
+ */ +Datum +citus_text_send_as_jsonb(PG_FUNCTION_ARGS) +{ + text *inputText = PG_GETARG_TEXT_PP(0); + StringInfoData buf; + int version = 1; + + pq_begintypsend(&buf); + pq_sendint(&buf, version, 1); + pq_sendtext(&buf, VARDATA_ANY(inputText), VARSIZE_ANY_EXHDR(inputText)); + + PG_RETURN_BYTEA_P(pq_endtypsend(&buf)); +} + + /* * AppendCopyRowData serializes one row using the column output functions, * and appends the data to the row output state object's message buffer. diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index fe852ff1b..3be790b4c 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -508,6 +508,22 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomBoolVariable( + "citus.skip_jsonb_validation_in_copy", + gettext_noop("Skip validation of JSONB columns on the coordinator during COPY " + "into a distributed table"), + gettext_noop("Parsing large JSON objects may incur significant CPU overhead, " + "which can lower COPY throughput. If this GUC is set (the default), " + "JSON parsing is skipped on the coordinator, which means you cannot " + "see the line number in case of malformed JSON, but throughput will " + "be higher. 
This setting does not apply if the input format is " + "binary."), + &SkipJsonbValidationInCopy, + true, + PGC_USERSET, + 0, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.shard_count", gettext_noop("Sets the number of shards for a new hash-partitioned table" diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index bc0043a20..e77a96510 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -121,6 +121,7 @@ typedef struct MetadataCacheData Oid readIntermediateResultFuncId; Oid extraDataContainerFuncId; Oid workerHashFunctionId; + Oid textSendAsJsonbFunctionId; Oid extensionOwner; Oid binaryCopyFormatId; Oid textCopyFormatId; @@ -1941,6 +1942,24 @@ CitusWorkerHashFunctionId(void) } +/* return oid of the citus_text_send_as_jsonb(text) function */ +Oid +CitusTextSendAsJsonbFunctionId(void) +{ + if (MetadataCache.textSendAsJsonbFunctionId == InvalidOid) + { + List *nameList = list_make2(makeString("pg_catalog"), + makeString("citus_text_send_as_jsonb")); + Oid paramOids[1] = { TEXTOID }; + + MetadataCache.textSendAsJsonbFunctionId = + LookupFuncName(nameList, 1, paramOids, false); + } + + return MetadataCache.textSendAsJsonbFunctionId; +} + + /* * CitusExtensionOwner() returns the owner of the 'citus' extension. 
That user * is, amongst others, used to perform actions a normal user might not be diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 29350bbe7..e5dacfc7a 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -130,6 +130,7 @@ extern Oid CitusCopyFormatTypeId(void); extern Oid CitusReadIntermediateResultFuncId(void); extern Oid CitusExtraDataContainerFuncId(void); extern Oid CitusWorkerHashFunctionId(void); +extern Oid CitusTextSendAsJsonbFunctionId(void); /* enum oids */ extern Oid PrimaryNodeRoleId(void); diff --git a/src/include/distributed/multi_copy.h b/src/include/distributed/multi_copy.h index 5269a61f1..fa05b2518 100644 --- a/src/include/distributed/multi_copy.h +++ b/src/include/distributed/multi_copy.h @@ -108,6 +108,10 @@ typedef struct CitusCopyDestReceiver } CitusCopyDestReceiver; +/* GUCs */ +extern bool SkipJsonbValidationInCopy; + + /* function declarations for copying into a distributed table */ extern CitusCopyDestReceiver * CreateCitusCopyDestReceiver(Oid relationId, List *columnNameList, diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source index 093757713..b5fe7dda5 100644 --- a/src/test/regress/input/multi_copy.source +++ b/src/test/regress/input/multi_copy.source @@ -812,3 +812,46 @@ CREATE UNLOGGED TABLE trigger_flush AS SELECT 1 AS a, s AS b, s AS c, s AS d, s AS e, s AS f, s AS g, s AS h FROM generate_series(1,150000) s; SELECT create_distributed_table('trigger_flush','a'); ABORT; + +-- copy into a table with a JSONB column +CREATE TABLE copy_jsonb (key text, value jsonb, extra jsonb default '["default"]'::jsonb); +SELECT create_distributed_table('copy_jsonb', 'key'); + +-- JSONB from text should work +\COPY copy_jsonb (key, value) FROM STDIN +blue {"r":0,"g":0,"b":255} +green {"r":0,"g":255,"b":0} +\. 
+SELECT * FROM copy_jsonb ORDER BY key; + +-- JSONB from binary should work +\COPY copy_jsonb TO '/tmp/copy_jsonb.pgcopy' WITH (format binary) +\COPY copy_jsonb FROM '/tmp/copy_jsonb.pgcopy' WITH (format binary) +SELECT * FROM copy_jsonb ORDER BY key; + +-- JSONB parsing error without validation: no line number +\COPY copy_jsonb (key, value) FROM STDIN +red {"r":255,"g":0,"b":0 +\. + +TRUNCATE copy_jsonb; +SET citus.skip_jsonb_validation_in_copy TO off; + +-- JSONB from text should work +\COPY copy_jsonb (key, value) FROM STDIN +blue {"r":0,"g":0,"b":255} +green {"r":0,"g":255,"b":0} +\. +SELECT * FROM copy_jsonb ORDER BY key; + +-- JSONB from binary should work +\COPY copy_jsonb TO '/tmp/copy_jsonb.pgcopy' WITH (format binary) +\COPY copy_jsonb FROM '/tmp/copy_jsonb.pgcopy' WITH (format binary) +SELECT * FROM copy_jsonb ORDER BY key; + +-- JSONB parsing error with validation: should see line number +\COPY copy_jsonb (key, value) FROM STDIN +red {"r":255,"g":0,"b":0 +\. + +DROP TABLE copy_jsonb; diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index 24bef3af1..a03aa7ebc 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -1087,3 +1087,66 @@ NOTICE: Copying data from local table... 
(1 row) ABORT; +-- copy into a table with a JSONB column +CREATE TABLE copy_jsonb (key text, value jsonb, extra jsonb default '["default"]'::jsonb); +SELECT create_distributed_table('copy_jsonb', 'key'); + create_distributed_table +-------------------------- + +(1 row) + +-- JSONB from text should work +\COPY copy_jsonb (key, value) FROM STDIN +SELECT * FROM copy_jsonb ORDER BY key; + key | value | extra +-------+----------------------------+------------- + blue | {"b": 255, "g": 0, "r": 0} | ["default"] + green | {"b": 0, "g": 255, "r": 0} | ["default"] +(2 rows) + +-- JSONB from binary should work +\COPY copy_jsonb TO '/tmp/copy_jsonb.pgcopy' WITH (format binary) +\COPY copy_jsonb FROM '/tmp/copy_jsonb.pgcopy' WITH (format binary) +SELECT * FROM copy_jsonb ORDER BY key; + key | value | extra +-------+----------------------------+------------- + blue | {"b": 255, "g": 0, "r": 0} | ["default"] + blue | {"b": 255, "g": 0, "r": 0} | ["default"] + green | {"b": 0, "g": 255, "r": 0} | ["default"] + green | {"b": 0, "g": 255, "r": 0} | ["default"] +(4 rows) + +-- JSONB parsing error without validation: no line number +\COPY copy_jsonb (key, value) FROM STDIN +ERROR: invalid input syntax for type json +DETAIL: The input string ended unexpectedly. 
+TRUNCATE copy_jsonb; +SET citus.skip_jsonb_validation_in_copy TO off; +-- JSONB from text should work +\COPY copy_jsonb (key, value) FROM STDIN +SELECT * FROM copy_jsonb ORDER BY key; + key | value | extra +-------+----------------------------+------------- + blue | {"b": 255, "g": 0, "r": 0} | ["default"] + green | {"b": 0, "g": 255, "r": 0} | ["default"] +(2 rows) + +-- JSONB from binary should work +\COPY copy_jsonb TO '/tmp/copy_jsonb.pgcopy' WITH (format binary) +\COPY copy_jsonb FROM '/tmp/copy_jsonb.pgcopy' WITH (format binary) +SELECT * FROM copy_jsonb ORDER BY key; + key | value | extra +-------+----------------------------+------------- + blue | {"b": 255, "g": 0, "r": 0} | ["default"] + blue | {"b": 255, "g": 0, "r": 0} | ["default"] + green | {"b": 0, "g": 255, "r": 0} | ["default"] + green | {"b": 0, "g": 255, "r": 0} | ["default"] +(4 rows) + +-- JSONB parsing error with validation: should see line number +\COPY copy_jsonb (key, value) FROM STDIN +ERROR: invalid input syntax for type json +DETAIL: The input string ended unexpectedly. +CONTEXT: JSON data, line 1: {"r":255,"g":0,"b":0 +COPY copy_jsonb, line 1, column value: "{"r":255,"g":0,"b":0" +DROP TABLE copy_jsonb;