diff --git a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index dda2aa34d..b610cc3b4 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -80,26 +80,18 @@ static bool DistributedTableSizeOnWorker(WorkerNode *workerNode, Oid relationId, SizeQueryType sizeQueryType, bool failOnError, uint64 *tableSize); static List * ShardIntervalsOnWorkerGroup(WorkerNode *workerNode, Oid relationId); -static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool - useShardMinMaxQuery); +static char * GenerateShardStatisticsQueryForShardList(List *shardIntervalList); static char * GetWorkerPartitionedSizeUDFNameBySizeQueryType(SizeQueryType sizeQueryType); static char * GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType); static char * GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, - List *citusTableIds, bool - useShardMinMaxQuery); -static List * GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds, - bool useShardMinMaxQuery); + List *citusTableIds); +static List * GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds); static void ErrorIfNotSuitableToGetSize(Oid relationId); static List * OpenConnectionToNodes(List *workerNodeList); static void ReceiveShardNameAndSizeResults(List *connectionList, Tuplestorestate *tupleStore, TupleDesc tupleDescriptor); -static void AppendShardSizeMinMaxQuery(StringInfo selectQuery, uint64 shardId, - ShardInterval * - shardInterval, char *shardName, - char *quotedShardName); -static void AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval, - char *quotedShardName); +static void AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval); static HeapTuple CreateDiskSpaceTuple(TupleDesc tupleDesc, uint64 availableBytes, uint64 totalBytes); @@ -245,11 +237,8 @@ citus_shard_sizes(PG_FUNCTION_ARGS) /* we don't need a distributed transaction here */ bool useDistributedTransaction = false; - /* we only want the shard sizes here so useShardMinMaxQuery parameter is false */ - bool useShardMinMaxQuery = false; - List *connectionList = SendShardStatisticsQueriesInParallel(allCitusTableIds, - useDistributedTransaction, - useShardMinMaxQuery); + List *connectionList = + SendShardStatisticsQueriesInParallel(allCitusTableIds, useDistributedTransaction); TupleDesc tupleDescriptor = NULL; Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); @@ -342,15 +331,12 @@ citus_relation_size(PG_FUNCTION_ARGS) * to available nodes. It returns the connection list. */ List * -SendShardStatisticsQueriesInParallel(List *citusTableIds, bool useDistributedTransaction, - bool - useShardMinMaxQuery) +SendShardStatisticsQueriesInParallel(List *citusTableIds, bool useDistributedTransaction) { List *workerNodeList = ActivePrimaryNodeList(NoLock); List *shardSizesQueryList = GenerateShardStatisticsQueryList(workerNodeList, - citusTableIds, - useShardMinMaxQuery); + citusTableIds); List *connectionList = OpenConnectionToNodes(workerNodeList); FinishConnectionListEstablishment(connectionList); @@ -415,20 +401,18 @@ OpenConnectionToNodes(List *workerNodeList) /* * GenerateShardStatisticsQueryList generates a query per node that will return: - * - all shard_name, shard_size pairs from the node (if includeShardMinMax is false) - * - all shard_id, shard_minvalue, shard_maxvalue, shard_size quartuples from the node (if true) + * shard_id, shard_name, shard_size for all shard placements on the node */ static List * -GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds, bool - useShardMinMaxQuery) +GenerateShardStatisticsQueryList(List *workerNodeList, List *citusTableIds) { List *shardStatisticsQueryList = NIL; WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) { - char *shardStatisticsQuery = GenerateAllShardStatisticsQueryForNode(workerNode, - citusTableIds, - useShardMinMaxQuery); + char *shardStatisticsQuery = + GenerateAllShardStatisticsQueryForNode(workerNode, citusTableIds); + shardStatisticsQueryList = lappend(shardStatisticsQueryList, shardStatisticsQuery); } @@ -479,12 +463,13 @@ ReceiveShardNameAndSizeResults(List *connectionList, Tuplestorestate *tupleStore memset(values, 0, sizeof(values)); memset(isNulls, false, sizeof(isNulls)); - char *tableName = PQgetvalue(result, rowIndex, 0); + /* format is [0] shard id, [1] shard name, [2] size */ + char *tableName = PQgetvalue(result, rowIndex, 1); Datum resultStringDatum = CStringGetDatum(tableName); Datum textDatum = DirectFunctionCall1(textin, resultStringDatum); values[0] = textDatum; - values[1] = ParseIntField(result, rowIndex, 1); + values[1] = ParseIntField(result, rowIndex, 2); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); } @@ -858,12 +843,10 @@ GetSizeQueryBySizeQueryType(SizeQueryType sizeQueryType) /* * GenerateAllShardStatisticsQueryForNode generates a query that returns: - * - all shard_name, shard_size pairs for the given node (if useShardMinMaxQuery is false) - * - all shard_id, shard_minvalue, shard_maxvalue, shard_size quartuples (if true) + * shard_id, shard_name, shard_size for all shard placements on the node */ static char * -GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableIds, bool - useShardMinMaxQuery) +GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableIds) { StringInfo allShardStatisticsQuery = makeStringInfo(); @@ -881,61 +864,32 @@ GenerateAllShardStatisticsQueryForNode(WorkerNode *workerNode, List *citusTableI List *shardIntervalsOnNode = ShardIntervalsOnWorkerGroup(workerNode, relationId); char *shardStatisticsQuery = - GenerateShardStatisticsQueryForShardList(shardIntervalsOnNode, - useShardMinMaxQuery); + GenerateShardStatisticsQueryForShardList(shardIntervalsOnNode); appendStringInfoString(allShardStatisticsQuery, shardStatisticsQuery); relation_close(relation, AccessShareLock); } } /* Add a dummy entry so that UNION ALL doesn't complain */ - if (useShardMinMaxQuery) - { - /* 0 for shard_id, NULL for min, NULL for text, 0 for shard_size */ - appendStringInfo(allShardStatisticsQuery, - "SELECT 0::bigint, NULL::text, NULL::text, 0::bigint;"); - } - else - { - /* NULL for shard_name, 0 for shard_size */ - appendStringInfo(allShardStatisticsQuery, "SELECT NULL::text, 0::bigint;"); - } + appendStringInfo(allShardStatisticsQuery, "SELECT 0::bigint, NULL::text, 0::bigint;"); + return allShardStatisticsQuery->data; } /* - * GenerateShardStatisticsQueryForShardList generates one of the two types of queries: - * - SELECT shard_name - shard_size (if useShardMinMaxQuery is false) - * - SELECT shard_id, shard_minvalue, shard_maxvalue, shard_size (if true) + * GenerateShardStatisticsQueryForShardList generates a query that returns: + * SELECT shard_id, shard_name, shard_size for all shards in the list */ static char * -GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool - useShardMinMaxQuery) +GenerateShardStatisticsQueryForShardList(List *shardIntervalList) { StringInfo selectQuery = makeStringInfo(); ShardInterval *shardInterval = NULL; foreach_ptr(shardInterval, shardIntervalList) { - uint64 shardId = shardInterval->shardId; - Oid schemaId = get_rel_namespace(shardInterval->relationId); - char *schemaName = get_namespace_name(schemaId); - char *shardName = get_rel_name(shardInterval->relationId); - AppendShardIdToName(&shardName, shardId); - - char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName); - char *quotedShardName = quote_literal_cstr(shardQualifiedName); - - if (useShardMinMaxQuery) - { - AppendShardSizeMinMaxQuery(selectQuery, shardId, shardInterval, shardName, - quotedShardName); - } - else - { - AppendShardSizeQuery(selectQuery, shardInterval, quotedShardName); - } + AppendShardSizeQuery(selectQuery, shardInterval); appendStringInfo(selectQuery, " UNION ALL "); } @@ -943,50 +897,25 @@ GenerateShardStatisticsQueryForShardList(List *shardIntervalList, bool } -/* - * AppendShardSizeMinMaxQuery appends a query in the following form to selectQuery - * SELECT shard_id, shard_minvalue, shard_maxvalue, shard_size - */ -static void -AppendShardSizeMinMaxQuery(StringInfo selectQuery, uint64 shardId, - ShardInterval *shardInterval, char *shardName, - char *quotedShardName) -{ - if (IsCitusTableType(shardInterval->relationId, APPEND_DISTRIBUTED)) - { - /* fill in the partition column name */ - const uint32 unusedTableId = 1; - Var *partitionColumn = PartitionColumn(shardInterval->relationId, - unusedTableId); - char *partitionColumnName = get_attname(shardInterval->relationId, - partitionColumn->varattno, false); - appendStringInfo(selectQuery, - "SELECT " UINT64_FORMAT - " AS shard_id, min(%s)::text AS shard_minvalue, max(%s)::text AS shard_maxvalue, pg_relation_size(%s) AS shard_size FROM %s ", - shardId, partitionColumnName, - partitionColumnName, - quotedShardName, shardName); - } - else - { - /* we don't need to update min/max for non-append distributed tables because they don't change */ - appendStringInfo(selectQuery, - "SELECT " UINT64_FORMAT - " AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size(%s) AS shard_size ", - shardId, quotedShardName); - } -} - - /* * AppendShardSizeQuery appends a query in the following form to selectQuery - * SELECT shard_name, shard_size + * SELECT shard_id, shard_name, shard_size */ static void -AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval, - char *quotedShardName) +AppendShardSizeQuery(StringInfo selectQuery, ShardInterval *shardInterval) { - appendStringInfo(selectQuery, "SELECT %s AS shard_name, ", quotedShardName); + uint64 shardId = shardInterval->shardId; + Oid schemaId = get_rel_namespace(shardInterval->relationId); + char *schemaName = get_namespace_name(schemaId); + char *shardName = get_rel_name(shardInterval->relationId); + + AppendShardIdToName(&shardName, shardId); + + char *shardQualifiedName = quote_qualified_identifier(schemaName, shardName); + char *quotedShardName = quote_literal_cstr(shardQualifiedName); + + appendStringInfo(selectQuery, "SELECT " UINT64_FORMAT " AS shard_id, ", shardId); + appendStringInfo(selectQuery, "%s AS shard_name, ", quotedShardName); appendStringInfo(selectQuery, PG_RELATION_SIZE_FUNCTION, quotedShardName); } diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index 9a7937d89..3cbccaf79 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -65,16 +65,13 @@ /* Local functions forward declarations */ static List * RelationShardListForShardCreate(ShardInterval *shardInterval); static bool WorkerShardStats(ShardPlacement *placement, Oid relationId, - const char *shardName, uint64 *shardSize, - text **shardMinValue, text **shardMaxValue); + const char *shardName, uint64 *shardSize); static void UpdateTableStatistics(Oid relationId); -static void ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList); -static void UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid - relationId, List *shardPlacementList, uint64 - shardSize, text *shardMinValue, - text *shardMaxValue); +static void ReceiveAndUpdateShardsSizes(List *connectionList); +static void UpdateShardSize(uint64 shardId, ShardInterval *shardInterval, + Oid relationId, List *shardPlacementList, + uint64 shardSize); static bool ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId, - text **shardMinValue, text **shardMaxValue, uint64 *shardSize); /* exports for SQL callable functions */ @@ -707,8 +704,6 @@ UpdateShardStatistics(int64 shardId) Oid relationId = shardInterval->relationId; bool statsOK = false; uint64 shardSize = 0; - text *minValue = NULL; - text *maxValue = NULL; /* Build shard qualified name. */ char *shardName = get_rel_name(relationId); @@ -726,7 +721,7 @@ UpdateShardStatistics(int64 shardId) foreach_ptr(placement, shardPlacementList) { statsOK = WorkerShardStats(placement, relationId, shardQualifiedName, - &shardSize, &minValue, &maxValue); + &shardSize); if (statsOK) { break; @@ -747,8 +742,9 @@ UpdateShardStatistics(int64 shardId) errdetail("Setting shard statistics to NULL"))); } - UpdateShardSizeAndMinMax(shardId, shardInterval, relationId, shardPlacementList, - shardSize, minValue, maxValue); + UpdateShardSize(shardId, shardInterval, relationId, shardPlacementList, + shardSize); + return shardSize; } @@ -766,24 +762,20 @@ UpdateTableStatistics(Oid relationId) /* we want to use a distributed transaction here to detect distributed deadlocks */ bool useDistributedTransaction = true; - /* we also want shard min/max values for append distributed tables */ - bool useShardMinMaxQuery = true; + List *connectionList = + SendShardStatisticsQueriesInParallel(citusTableIds, useDistributedTransaction); - List *connectionList = SendShardStatisticsQueriesInParallel(citusTableIds, - useDistributedTransaction, - useShardMinMaxQuery); - - ReceiveAndUpdateShardsSizeAndMinMax(connectionList); + ReceiveAndUpdateShardsSizes(connectionList); } /* - * ReceiveAndUpdateShardsSizeAndMinMax receives shard id, size - * and min max results from the given connection list, and updates - * respective entries in pg_dist_placement and pg_dist_shard + * ReceiveAndUpdateShardsSizes receives shard id and size + * results from the given connection list, and updates + * respective entries in pg_dist_placement. */ static void -ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList) +ReceiveAndUpdateShardsSizes(List *connectionList) { /* * From the connection list, we will not get all the shards, but @@ -812,7 +804,7 @@ ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList) int64 colCount = PQnfields(result); /* Although it is not expected */ - if (colCount != UPDATE_SHARD_STATISTICS_COLUMN_COUNT) + if (colCount != SHARD_SIZES_COLUMN_COUNT) { ereport(WARNING, (errmsg("unexpected number of columns from " "citus_update_table_statistics"))); @@ -822,12 +814,9 @@ ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList) for (int64 rowIndex = 0; rowIndex < rowCount; rowIndex++) { uint64 shardId = 0; - text *shardMinValue = NULL; - text *shardMaxValue = NULL; uint64 shardSize = 0; - if (!ProcessShardStatisticsRow(result, rowIndex, &shardId, &shardMinValue, - &shardMaxValue, &shardSize)) + if (!ProcessShardStatisticsRow(result, rowIndex, &shardId, &shardSize)) { /* this row has no valid shard statistics */ continue; @@ -845,9 +834,8 @@ ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList) Oid relationId = shardInterval->relationId; List *shardPlacementList = ActiveShardPlacementList(shardId); - UpdateShardSizeAndMinMax(shardId, shardInterval, relationId, - shardPlacementList, shardSize, shardMinValue, - shardMaxValue); + UpdateShardSize(shardId, shardInterval, relationId, shardPlacementList, + shardSize); } PQclear(result); ForgetResults(connection); @@ -860,10 +848,13 @@ ReceiveAndUpdateShardsSizeAndMinMax(List *connectionList) * ProcessShardStatisticsRow processes a row of shard statistics of the input PGresult * - it returns true if this row belongs to a valid shard * - it returns false if this row has no valid shard statistics (shardId = INVALID_SHARD_ID) + * + * Input tuples are assumed to be of the form: + * (shard_id bigint, shard_name text, shard_size bigint) */ static bool ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId, - text **shardMinValue, text **shardMaxValue, uint64 *shardSize) + uint64 *shardSize) { *shardId = ParseIntField(result, rowIndex, 0); @@ -874,28 +865,19 @@ ProcessShardStatisticsRow(PGresult *result, int64 rowIndex, uint64 *shardId, return false; } - char *minValueResult = PQgetvalue(result, rowIndex, 1); - char *maxValueResult = PQgetvalue(result, rowIndex, 2); - *shardMinValue = cstring_to_text(minValueResult); - *shardMaxValue = cstring_to_text(maxValueResult); - *shardSize = ParseIntField(result, rowIndex, 3); + *shardSize = ParseIntField(result, rowIndex, 2); return true; } /* - * UpdateShardSizeAndMinMax updates the shardlength (shard size) of the given - * shard and its placements in pg_dist_placement, and updates the shard min value - * and shard max value of the given shard in pg_dist_shard if the relationId belongs - * to an append-distributed table + * UpdateShardSize updates the shardlength (shard size) of the given + * shard and its placements in pg_dist_placement. */ static void -UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid relationId, - List *shardPlacementList, uint64 shardSize, text *shardMinValue, - text *shardMaxValue) +UpdateShardSize(uint64 shardId, ShardInterval *shardInterval, Oid relationId, + List *shardPlacementList, uint64 shardSize) { - char storageType = shardInterval->storageType; - ShardPlacement *placement = NULL; /* update metadata for each shard placement */ @@ -906,16 +888,7 @@ UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid relat DeleteShardPlacementRow(placementId); InsertShardPlacementRow(shardId, placementId, SHARD_STATE_ACTIVE, - shardSize, - groupId); - } - - /* only update shard min/max values for append-partitioned tables */ - if (IsCitusTableType(relationId, APPEND_DISTRIBUTED)) - { - DeleteShardRow(shardId); - InsertShardRow(relationId, shardId, storageType, shardMinValue, - shardMaxValue); + shardSize, groupId); } } @@ -926,17 +899,10 @@ UpdateShardSizeAndMinMax(uint64 shardId, ShardInterval *shardInterval, Oid relat */ static bool WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardName, - uint64 *shardSize, text **shardMinValue, text **shardMaxValue) + uint64 *shardSize) { StringInfo tableSizeQuery = makeStringInfo(); - - const uint32 unusedTableId = 1; - StringInfo partitionValueQuery = makeStringInfo(); - PGresult *queryResult = NULL; - const int minValueIndex = 0; - const int maxValueIndex = 1; - char *tableSizeStringEnd = NULL; int connectionFlags = 0; @@ -951,8 +917,6 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam Assert(connection != NULL); *shardSize = 0; - *shardMinValue = NULL; - *shardMaxValue = NULL; char *quotedShardName = quote_literal_cstr(shardName); appendStringInfo(tableSizeQuery, SHARD_TABLE_SIZE_QUERY, quotedShardName); @@ -986,40 +950,6 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam PQclear(queryResult); ForgetResults(connection); - if (!IsCitusTableType(relationId, APPEND_DISTRIBUTED)) - { - /* we don't need min/max for non-append distributed tables */ - return true; - } - - /* fill in the partition column name and shard name in the query. */ - Var *partitionColumn = PartitionColumn(relationId, unusedTableId); - char *partitionColumnName = get_attname(relationId, partitionColumn->varattno, false); - appendStringInfo(partitionValueQuery, SHARD_RANGE_QUERY, - partitionColumnName, partitionColumnName, shardName); - - executeCommand = ExecuteOptionalRemoteCommand(connection, partitionValueQuery->data, - &queryResult); - if (executeCommand != 0) - { - return false; - } - - bool minValueIsNull = PQgetisnull(queryResult, 0, minValueIndex); - bool maxValueIsNull = PQgetisnull(queryResult, 0, maxValueIndex); - - if (!minValueIsNull && !maxValueIsNull) - { - char *minValueResult = PQgetvalue(queryResult, 0, minValueIndex); - char *maxValueResult = PQgetvalue(queryResult, 0, maxValueIndex); - - *shardMinValue = cstring_to_text(minValueResult); - *shardMaxValue = cstring_to_text(maxValueResult); - } - - PQclear(queryResult); - ForgetResults(connection); - return true; } diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 4e7f0a743..51978a53a 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -39,8 +39,7 @@ #define WORKER_PARTITIONED_RELATION_TOTAL_SIZE_FUNCTION \ "worker_partitioned_relation_total_size(%s)" -#define SHARD_SIZES_COLUMN_COUNT 2 -#define UPDATE_SHARD_STATISTICS_COLUMN_COUNT 4 +#define SHARD_SIZES_COLUMN_COUNT (3) /* In-memory representation of a typed tuple in pg_dist_shard. */ typedef struct ShardInterval @@ -281,9 +280,8 @@ extern ShardInterval * DeformedDistShardTupleToShardInterval(Datum *datumArray, int32 intervalTypeMod); extern void GetIntervalTypeInfo(char partitionMethod, Var *partitionColumn, Oid *intervalTypeId, int32 *intervalTypeMod); -extern List * SendShardStatisticsQueriesInParallel(List *citusTableIds, bool - useDistributedTransaction, bool - useShardMinMaxQuery); +extern List * SendShardStatisticsQueriesInParallel(List *citusTableIds, + bool useDistributedTransaction); extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection, uint64 *availableBytes, uint64 *totalBytes); diff --git a/src/test/regress/expected/citus_update_table_statistics.out b/src/test/regress/expected/citus_update_table_statistics.out index d512369d3..69676c1bf 100644 --- a/src/test/regress/expected/citus_update_table_statistics.out +++ b/src/test/regress/expected/citus_update_table_statistics.out @@ -64,11 +64,11 @@ SET citus.multi_shard_modify_mode TO sequential; SELECT citus_update_table_statistics('test_table_statistics_hash'); NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT 981000 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981000') AS shard_size UNION ALL SELECT 981001 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981001') AS shard_size UNION ALL SELECT 981002 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981002') AS shard_size UNION ALL SELECT 981003 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981003') AS shard_size UNION ALL SELECT 981004 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981004') AS shard_size UNION ALL SELECT 981005 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981005') AS shard_size UNION ALL SELECT 981006 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981006') AS shard_size UNION ALL SELECT 981007 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981007') AS shard_size UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint; +NOTICE: issuing SELECT 981000 AS shard_id, 'public.test_table_statistics_hash_981000' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, 'public.test_table_statistics_hash_981001' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, 'public.test_table_statistics_hash_981002' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, 'public.test_table_statistics_hash_981003' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, 'public.test_table_statistics_hash_981004' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, 'public.test_table_statistics_hash_981005' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, 'public.test_table_statistics_hash_981006' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, 'public.test_table_statistics_hash_981007' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT 981000 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981000') AS shard_size UNION ALL SELECT 981001 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981001') AS shard_size UNION ALL SELECT 981002 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981002') AS shard_size UNION ALL SELECT 981003 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981003') AS shard_size UNION ALL SELECT 981004 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981004') AS shard_size UNION ALL SELECT 981005 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981005') AS shard_size UNION ALL SELECT 981006 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981006') AS shard_size UNION ALL SELECT 981007 AS shard_id, NULL::text AS shard_minvalue, NULL::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_hash_981007') AS shard_size UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint; +NOTICE: issuing SELECT 981000 AS shard_id, 'public.test_table_statistics_hash_981000' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981000') UNION ALL SELECT 981001 AS shard_id, 'public.test_table_statistics_hash_981001' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981001') UNION ALL SELECT 981002 AS shard_id, 'public.test_table_statistics_hash_981002' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981002') UNION ALL SELECT 981003 AS shard_id, 'public.test_table_statistics_hash_981003' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981003') UNION ALL SELECT 981004 AS shard_id, 'public.test_table_statistics_hash_981004' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981004') UNION ALL SELECT 981005 AS shard_id, 'public.test_table_statistics_hash_981005' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981005') UNION ALL SELECT 981006 AS shard_id, 'public.test_table_statistics_hash_981006' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981006') UNION ALL SELECT 981007 AS shard_id, 'public.test_table_statistics_hash_981007' AS shard_name, pg_relation_size('public.test_table_statistics_hash_981007') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -152,11 +152,11 @@ SET citus.multi_shard_modify_mode TO sequential; SELECT citus_update_table_statistics('test_table_statistics_append'); NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT 981008 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981008') AS shard_size FROM test_table_statistics_append_981008 UNION ALL SELECT 981009 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981009') AS shard_size FROM test_table_statistics_append_981009 UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint; +NOTICE: issuing SELECT 981008 AS shard_id, 'public.test_table_statistics_append_981008' AS shard_name, pg_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, 'public.test_table_statistics_append_981009' AS shard_name, pg_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;SELECT assign_distributed_transaction_id(xx, xx, 'xxxxxxx'); DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx -NOTICE: issuing SELECT 981008 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981008') AS shard_size FROM test_table_statistics_append_981008 UNION ALL SELECT 981009 AS shard_id, min(id)::text AS shard_minvalue, max(id)::text AS shard_maxvalue, pg_relation_size('public.test_table_statistics_append_981009') AS shard_size FROM test_table_statistics_append_981009 UNION ALL SELECT 0::bigint, NULL::text, NULL::text, 0::bigint; +NOTICE: issuing SELECT 981008 AS shard_id, 'public.test_table_statistics_append_981008' AS shard_name, pg_relation_size('public.test_table_statistics_append_981008') UNION ALL SELECT 981009 AS shard_id, 'public.test_table_statistics_append_981009' AS shard_name, pg_relation_size('public.test_table_statistics_append_981009') UNION ALL SELECT 0::bigint, NULL::text, 0::bigint; DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx NOTICE: issuing COMMIT DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx @@ -181,10 +181,10 @@ WHERE ds.logicalrelid::regclass::text in ('test_table_statistics_append') ORDER BY 2, 3; tablename | shardid | placementid | shardname | shardminvalue | shardmaxvalue --------------------------------------------------------------------- - test_table_statistics_append | 981008 | 982016 | test_table_statistics_append_981008 | 1 | 3 - test_table_statistics_append | 981008 | 982017 | test_table_statistics_append_981008 | 1 | 3 - test_table_statistics_append | 981009 | 982018 | test_table_statistics_append_981009 | 5 | 7 - test_table_statistics_append | 981009 | 982019 | test_table_statistics_append_981009 | 5 | 7 + test_table_statistics_append | 981008 | 982016 | test_table_statistics_append_981008 | | + test_table_statistics_append | 981008 | 982017 | test_table_statistics_append_981008 | | + test_table_statistics_append | 981009 | 982018 | test_table_statistics_append_981009 | | + test_table_statistics_append | 981009 | 982019 | test_table_statistics_append_981009 | | (4 rows) DROP TABLE test_table_statistics_hash, test_table_statistics_append; diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source index 058b693f6..527f0802d 100644 --- a/src/test/regress/input/multi_copy.source +++ b/src/test/regress/input/multi_copy.source @@ -112,11 +112,8 @@ SELECT count(*) FROM customer_copy_hash; -- Confirm that data was copied SELECT count(*) FROM customer_copy_hash; --- Make sure that master_update_shard_statistics() only updates shard length for --- hash-partitioned tables -SELECT master_update_shard_statistics(560000); - -SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560000; +-- Update shard statistics for hash-partitioned table +SELECT citus_update_shard_statistics(560000); SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560000; @@ -206,24 +203,13 @@ FROM customer_copy_range WHERE c_custkey <= 500; -- Check whether data was copied SELECT count(*) FROM customer_copy_range; --- Manipulate min/max values and check shard statistics for new shard -UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2000 -WHERE shardid = :new_shard_id; - -SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id; SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id; --- Update shard statistics for range-partitioned shard and check that only the --- shard length is updated. -SELECT master_update_shard_statistics(:new_shard_id); +-- Update shard statistics for range-partitioned shard +SELECT citus_update_shard_statistics(:new_shard_id); -SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id; SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id; --- Revert back min/max value updates -UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000 -WHERE shardid = :new_shard_id; - -- Create a new append-partitioned table into which to COPY CREATE TABLE customer_copy_append ( c_custkey integer, @@ -272,16 +258,13 @@ END; SELECT * FROM customer_copy_append; -- Manipulate manipulate and check shard statistics for append-partitioned table shard -UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2000 WHERE shardid = 560132; UPDATE pg_dist_shard_placement SET shardlength = 0 WHERE shardid = 560132; -SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560132; SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132; -- Update shard statistics for append-partitioned shard -SELECT master_update_shard_statistics(560132); +SELECT citus_update_shard_statistics(560132); -SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560132; SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132; -- Create lineitem table diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index 1d2e3107e..f122d15df 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -14,9 +14,9 @@ CREATE TABLE customer_copy_hash ( c_comment varchar(117), primary key (c_custkey)); SELECT master_create_distributed_table('customer_copy_hash', 'c_custkey', 'hash'); - master_create_distributed_table ---------------------------------- - + master_create_distributed_table +--------------------------------------------------------------------- + (1 row) -- Test COPY into empty hash-partitioned table @@ -25,24 +25,24 @@ ERROR: could not find any shards into which to copy DETAIL: No shards exist for distributed table "customer_copy_hash". HINT: Run master_create_worker_shards to create shards and try again. SELECT master_create_worker_shards('customer_copy_hash', 64, 1); - master_create_worker_shards ------------------------------ - + master_create_worker_shards +--------------------------------------------------------------------- + (1 row) -- Test empty copy COPY customer_copy_hash FROM STDIN; -- Test syntax error COPY customer_copy_hash (c_custkey,c_name) FROM STDIN; -ERROR: invalid input syntax for type integer: "1,customer1" +ERROR: invalid input syntax for integer: "1,customer1" CONTEXT: COPY customer_copy_hash, line 1, column c_custkey: "1,customer1" -- Test invalid option -COPY customer_copy_hash (c_custkey,c_name) FROM STDIN (append_to_shard 1); +COPY customer_copy_hash (c_custkey,c_name) FROM STDIN (append_to_shard xxxxx); ERROR: append_to_shard is only valid for append-distributed tables -- Confirm that no data was copied SELECT count(*) FROM customer_copy_hash; - count -------- + count +--------------------------------------------------------------------- 0 (1 row) @@ -53,8 +53,8 @@ ERROR: duplicate key value violates unique constraint "customer_copy_hash_pkey_ DETAIL: Key (c_custkey)=(2) already exists. -- Confirm that no data was copied SELECT count(*) FROM customer_copy_hash; - count -------- + count +--------------------------------------------------------------------- 0 (1 row) @@ -63,8 +63,8 @@ COPY customer_copy_hash (c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv', HEADER true, FORCE_NULL (c_custkey)); -- Confirm that only first row was skipped SELECT count(*) FROM customer_copy_hash; - count -------- + count +--------------------------------------------------------------------- 3 (1 row) @@ -73,8 +73,8 @@ COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN WITH (FORMAT 'csv', QUOTE '"', FORCE_NOT_NULL (c_address)); -- Confirm that value is not null SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 4; - count -------- + count +--------------------------------------------------------------------- 1 (1 row) @@ -83,20 +83,20 @@ COPY customer_copy_hash (c_custkey, c_name, c_address) FROM STDIN WITH (FORMAT 'csv', QUOTE '"', FORCE_NULL (c_address)); -- Confirm that value is null SELECT count(c_address) FROM customer_copy_hash WHERE c_custkey = 5; - count -------- + count +--------------------------------------------------------------------- 0 (1 row) -- Test null violation COPY customer_copy_hash (c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv'); -ERROR: null value in column "c_name" of relation "customer_copy_hash_560001" violates not-null constraint +ERROR: null value in column "c_name" violates not-null constraint DETAIL: Failing row contains (8, null, null, null, null, null, null, null). -- Confirm that no data was copied SELECT count(*) FROM customer_copy_hash; - count -------- + count +--------------------------------------------------------------------- 5 (1 row) @@ -105,8 +105,8 @@ COPY customer_copy_hash (c_custkey, c_name) FROM PROGRAM 'echo 9 customer9' WITH (DELIMITER ' '); -- Confirm that data was copied SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9; - count -------- + count +--------------------------------------------------------------------- 1 (1 row) @@ -114,8 +114,8 @@ SELECT count(*) FROM customer_copy_hash WHERE c_custkey = 9; COPY customer_copy_hash FROM '@abs_srcdir@/data/customer.2.data' WITH (DELIMITER '|'); -- Confirm that data was copied SELECT count(*) FROM customer_copy_hash; - count -------- + count +--------------------------------------------------------------------- 1006 (1 row) @@ -123,28 +123,21 @@ SELECT count(*) FROM customer_copy_hash; \copy customer_copy_hash FROM '@abs_srcdir@/data/customer.3.data' WITH (DELIMITER '|'); -- Confirm that data was copied SELECT count(*) FROM customer_copy_hash; - count -------- + count +--------------------------------------------------------------------- 2006 (1 row) --- Make sure that master_update_shard_statistics() only updates shard length for --- hash-partitioned tables -SELECT master_update_shard_statistics(560000); - master_update_shard_statistics --------------------------------- - 8192 -(1 row) - -SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560000; - shardid | shardminvalue | shardmaxvalue ----------+---------------+--------------- - 560000 | -2147483648 | -2080374785 +-- Update shard statistics for hash-partitioned table +SELECT citus_update_shard_statistics(560000); + citus_update_shard_statistics +--------------------------------------------------------------------- + 8192 (1 row) SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560000; - shardid | shardlength ----------+------------- + shardid | shardlength +--------------------------------------------------------------------- 560000 | 8192 (1 row) @@ -154,15 +147,15 @@ CREATE TABLE customer_with_default( c_name varchar(25) not null, c_time timestamp default now()); SELECT master_create_distributed_table('customer_with_default', 'c_custkey', 'hash'); - master_create_distributed_table ---------------------------------- - + master_create_distributed_table +--------------------------------------------------------------------- + (1 row) SELECT master_create_worker_shards('customer_with_default', 64, 1); - master_create_worker_shards ------------------------------ - + master_create_worker_shards +--------------------------------------------------------------------- + (1 row) -- Test with default values for now() function @@ -170,8 +163,8 @@ COPY customer_with_default (c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv'); -- Confirm that data was copied with now() function SELECT count(*) FROM customer_with_default where c_time IS NOT NULL; - count -------- + count +--------------------------------------------------------------------- 2 (1 row) @@ -180,8 +173,8 @@ ALTER TABLE customer_copy_hash ADD COLUMN extra1 INT DEFAULT 0; ALTER TABLE customer_copy_hash ADD COLUMN extra2 INT DEFAULT 0; COPY customer_copy_hash (c_custkey, c_name, extra1, extra2) FROM STDIN CSV; SELECT * FROM customer_copy_hash WHERE extra1 = 1; - c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra1 | extra2 ------------+------------+-----------+-------------+---------+-----------+--------------+-----------+--------+-------- + c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra1 | extra2 +--------------------------------------------------------------------- 10 | customer10 | | | | | | | 1 | 5 (1 row) @@ -189,8 +182,8 @@ SELECT * FROM customer_copy_hash WHERE extra1 = 1; ALTER TABLE customer_copy_hash DROP COLUMN extra1; COPY customer_copy_hash (c_custkey, c_name, extra2) FROM STDIN CSV; SELECT * FROM customer_copy_hash WHERE c_custkey = 11; - c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra2 ------------+------------+-----------+-------------+---------+-----------+--------------+-----------+-------- + c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment | extra2 +--------------------------------------------------------------------- 11 | customer11 | | | | | | | 5 (1 row) @@ -198,9 +191,9 @@ SELECT * FROM customer_copy_hash WHERE c_custkey = 11; ALTER TABLE customer_copy_hash DROP COLUMN extra2; COPY customer_copy_hash (c_custkey, c_name) FROM STDIN CSV; SELECT * FROM customer_copy_hash WHERE c_custkey = 12; - c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment ------------+------------+-----------+-------------+---------+-----------+--------------+----------- - 12 | customer12 | | | | | | + c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment +--------------------------------------------------------------------- + 12 | customer12 | | | | | | (1 row) -- Create a new range-partitioned table into which to COPY @@ -215,9 +208,9 @@ CREATE TABLE customer_copy_range ( c_comment varchar(117), primary key (c_custkey)); SELECT master_create_distributed_table('customer_copy_range', 'c_custkey', 'range'); - master_create_distributed_table ---------------------------------- - + master_create_distributed_table +--------------------------------------------------------------------- + (1 row) -- Test COPY into empty range-partitioned table @@ -237,58 +230,39 @@ COPY customer_copy_range FROM '@abs_srcdir@/data/customer.1.data' WITH (DELIMITE -- Check whether data went into the right shard (maybe) SELECT min(c_custkey), max(c_custkey), avg(c_custkey), count(*) FROM customer_copy_range WHERE c_custkey <= 500; - min | max | avg | count ------+-----+----------------------+------- + min | max | avg | count +--------------------------------------------------------------------- 1 | 500 | 250.5000000000000000 | 500 (1 row) -- Check whether data was copied SELECT count(*) FROM customer_copy_range; - count -------- + count +--------------------------------------------------------------------- 1000 (1 row) --- Manipulate min/max values and check shard statistics for new shard -UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2000 -WHERE shardid = :new_shard_id; -SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id; - shardid | shardminvalue | shardmaxvalue ----------+---------------+--------------- - 560129 | 1501 | 2000 -(1 row) - SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id; - shardid | shardlength ----------+------------- + shardid | shardlength +--------------------------------------------------------------------- 560129 | 0 560129 | 0 (2 rows) --- Update shard statistics for range-partitioned shard and check that only the --- shard length is updated. -SELECT master_update_shard_statistics(:new_shard_id); - master_update_shard_statistics --------------------------------- - 131072 -(1 row) - -SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = :new_shard_id; - shardid | shardminvalue | shardmaxvalue ----------+---------------+--------------- - 560129 | 1501 | 2000 +-- Update shard statistics for range-partitioned shard +SELECT citus_update_shard_statistics(:new_shard_id); + citus_update_shard_statistics +--------------------------------------------------------------------- + 131072 (1 row) SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = :new_shard_id; - shardid | shardlength ----------+------------- + shardid | shardlength +--------------------------------------------------------------------- 560129 | 131072 560129 | 131072 (2 rows) --- Revert back min/max value updates -UPDATE pg_dist_shard SET shardminvalue = 501, shardmaxvalue = 1000 -WHERE shardid = :new_shard_id; -- Create a new append-partitioned table into which to COPY CREATE TABLE customer_copy_append ( c_custkey integer, @@ -300,22 +274,22 @@ CREATE TABLE customer_copy_append ( c_mktsegment char(10), c_comment varchar(117)); SELECT create_distributed_table('customer_copy_append', 'c_custkey', 'append'); - create_distributed_table --------------------------- - + create_distributed_table +--------------------------------------------------------------------- + (1 row) -- Test syntax error BEGIN; SELECT master_create_empty_shard('customer_copy_append') AS shardid \gset COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv', append_to_shard :shardid); -ERROR: invalid input syntax for type integer: "notinteger" +ERROR: invalid input syntax for integer: "notinteger" CONTEXT: COPY customer_copy_append, line 3, column c_custkey: "notinteger" END; -- Test that no shard is created for failing copy SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass; - count -------- + count +--------------------------------------------------------------------- 0 (1 row) @@ -326,8 +300,8 @@ COPY customer_copy_append FROM STDIN WITH (append_to_shard :shardid); END; -- Test that a shard is created SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'customer_copy_append'::regclass; - count -------- + count +--------------------------------------------------------------------- 1 (1 row) @@ -338,44 +312,31 @@ COPY customer_copy_append(c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv', appe END; -- Check whether data was copied properly SELECT * FROM customer_copy_append; - c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment ------------+-----------+-----------+-------------+---------+-----------+--------------+----------- - 1 | customer1 | | | | | | - 2 | customer2 | | | | | | + c_custkey | c_name | c_address | c_nationkey | c_phone | c_acctbal | c_mktsegment | c_comment +--------------------------------------------------------------------- + 1 | customer1 | | | | | | + 2 | customer2 | | | | | | (2 rows) -- Manipulate manipulate and check shard statistics for append-partitioned table shard -UPDATE pg_dist_shard SET shardminvalue = 1501, shardmaxvalue = 2000 WHERE shardid = 560132; UPDATE pg_dist_shard_placement SET shardlength = 0 WHERE shardid = 560132; -SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560132; - shardid | shardminvalue | shardmaxvalue ----------+---------------+--------------- - 560132 | 1501 | 2000 -(1 row) - SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132; - shardid | shardlength ----------+------------- + shardid | shardlength +--------------------------------------------------------------------- 560132 | 0 560132 | 0 (2 rows) -- Update shard statistics for append-partitioned shard -SELECT master_update_shard_statistics(560132); - master_update_shard_statistics --------------------------------- - 8192 -(1 row) - -SELECT shardid, shardminvalue, shardmaxvalue FROM pg_dist_shard WHERE shardid = 560132; - shardid | shardminvalue | shardmaxvalue ----------+---------------+--------------- - 560132 | 1 | 2 +SELECT citus_update_shard_statistics(560132); + citus_update_shard_statistics +--------------------------------------------------------------------- + 8192 (1 row) SELECT shardid, shardlength FROM pg_dist_shard_placement WHERE shardid = 560132; - shardid | shardlength ----------+------------- + shardid | shardlength +--------------------------------------------------------------------- 560132 | 8192 560132 | 8192 (2 rows) @@ -399,9 +360,9 @@ CREATE TABLE lineitem_copy_append ( l_shipmode char(10) not null, l_comment varchar(44) not null); SELECT create_distributed_table('lineitem_copy_append', 'l_orderkey', 'append'); - create_distributed_table --------------------------- - + create_distributed_table +--------------------------------------------------------------------- + (1 row) BEGIN; @@ -409,16 +370,16 @@ SELECT master_create_empty_shard('lineitem_copy_append') AS shardid \gset COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', append_to_shard :shardid); END; SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'lineitem_copy_append'::regclass; - count -------- + count +--------------------------------------------------------------------- 1 (1 row) -- trigger some errors on the append_to_shard option -COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', append_to_shard 1); -ERROR: could not find valid entry for shard 1 -COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', append_to_shard 560000); -ERROR: shard 560000 does not belong to table lineitem_copy_append +COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', append_to_shard xxxxx); +ERROR: could not find valid entry for shard xxxxx +COPY lineitem_copy_append FROM '@abs_srcdir@/data/lineitem.1.data' with (delimiter '|', append_to_shard xxxxx); +ERROR: shard xxxxx does not belong to table lineitem_copy_append -- Test schema support on append partitioned tables CREATE SCHEMA append; CREATE TABLE append.customer_copy ( @@ -431,9 +392,9 @@ CREATE TABLE append.customer_copy ( c_mktsegment char(10), c_comment varchar(117)); SELECT create_distributed_table('append.customer_copy', 'c_custkey', 'append'); - create_distributed_table --------------------------- - + create_distributed_table +--------------------------------------------------------------------- + (1 row) SELECT master_create_empty_shard('append.customer_copy') AS shardid1 \gset @@ -443,8 +404,8 @@ COPY append.customer_copy FROM '@abs_srcdir@/data/customer.1.data' with (delimit COPY append.customer_copy FROM '@abs_srcdir@/data/customer.2.data' with (delimiter '|', append_to_shard :shardid2); -- Test the content of the table SELECT min(c_custkey), max(c_custkey), avg(c_acctbal), count(*) FROM append.customer_copy; - min | max | avg | count ------+------+-----------------------+------- + min | max | avg | count +--------------------------------------------------------------------- 1 | 7000 | 4443.8028800000000000 | 2000 (1 row) @@ -453,23 +414,23 @@ CREATE TABLE "customer_with_special_\\_character"( c_custkey integer, c_name varchar(25) not null); SELECT master_create_distributed_table('"customer_with_special_\\_character"', 'c_custkey', 'hash'); - master_create_distributed_table ---------------------------------- - + master_create_distributed_table +--------------------------------------------------------------------- + (1 row) SELECT master_create_worker_shards('"customer_with_special_\\_character"', 4, 1); - master_create_worker_shards ------------------------------ - + master_create_worker_shards +--------------------------------------------------------------------- + (1 row) COPY "customer_with_special_\\_character" (c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv'); -- Confirm that data was copied SELECT count(*) FROM "customer_with_special_\\_character"; - count -------- + count +--------------------------------------------------------------------- 2 (1 row) @@ -478,23 +439,23 @@ CREATE TABLE "1_customer"( c_custkey integer, c_name varchar(25) not null); SELECT master_create_distributed_table('"1_customer"', 'c_custkey', 'hash'); - master_create_distributed_table ---------------------------------- - + master_create_distributed_table +--------------------------------------------------------------------- + (1 row) SELECT master_create_worker_shards('"1_customer"', 4, 1); - master_create_worker_shards ------------------------------ - + master_create_worker_shards +--------------------------------------------------------------------- + (1 row) COPY "1_customer" (c_custkey, c_name) FROM STDIN WITH (FORMAT 'csv'); -- Confirm that data was copied SELECT count(*) FROM "1_customer"; - count -------- + count +--------------------------------------------------------------------- 2 (1 row) @@ -513,23 +474,23 @@ CREATE TABLE packed_numbers_hash ( packed_numbers number_pack[] ); SELECT master_create_distributed_table('packed_numbers_hash', 'id', 'hash'); - master_create_distributed_table ---------------------------------- - + master_create_distributed_table +--------------------------------------------------------------------- + (1 row) SELECT master_create_worker_shards('packed_numbers_hash', 4, 1); - master_create_worker_shards ------------------------------ - + master_create_worker_shards +--------------------------------------------------------------------- + (1 row) COPY (SELECT 1, ARRAY[ROW(42, 42), ROW(42, 42)]) TO :'temp_dir''copy_test_array_of_composite'; COPY packed_numbers_hash FROM :'temp_dir''copy_test_array_of_composite'; -- Verify data is actually copied SELECT * FROM packed_numbers_hash; - id | packed_numbers -----+----------------------- + id | packed_numbers +--------------------------------------------------------------------- 1 | {"(42,42)","(42,42)"} (1 row) @@ -539,23 +500,23 @@ CREATE TABLE super_packed_numbers_hash ( super_packed_number super_number_pack ); SELECT master_create_distributed_table('super_packed_numbers_hash', 'id', 'hash'); - master_create_distributed_table ---------------------------------- - + master_create_distributed_table +--------------------------------------------------------------------- + (1 row) SELECT master_create_worker_shards('super_packed_numbers_hash', 4, 1); - master_create_worker_shards ------------------------------ - + master_create_worker_shards +--------------------------------------------------------------------- + (1 row) COPY (SELECT 1, ROW(ROW(42, 42), ROW(42, 42))) TO :'temp_dir''copy_test_composite_of_composite'; COPY super_packed_numbers_hash FROM :'temp_dir''copy_test_composite_of_composite'; -- Verify data is actually copied SELECT * FROM super_packed_numbers_hash; - id | super_packed_number -----+----------------------- + id | super_packed_number +--------------------------------------------------------------------- 1 | ("(42,42)","(42,42)") (1 row) @@ -565,17 +526,17 @@ CREATE TABLE packed_numbers_append ( packed_numbers number_pack[] ); SELECT create_distributed_table('packed_numbers_append', 'id', 'append'); - create_distributed_table --------------------------- - + create_distributed_table +--------------------------------------------------------------------- + (1 row) SELECT master_create_empty_shard('packed_numbers_append') AS shardid \gset COPY packed_numbers_append FROM :'temp_dir''copy_test_array_of_composite' WITH (append_to_shard :shardid); -- Verify data is actually copied SELECT * FROM packed_numbers_append; - id | packed_numbers -----+----------------------- + id | packed_numbers +--------------------------------------------------------------------- 1 | {"(42,42)","(42,42)"} (1 row) @@ -585,17 +546,17 @@ CREATE TABLE super_packed_numbers_append ( super_packed_number super_number_pack ); SELECT create_distributed_table('super_packed_numbers_append', 'id', 'append'); - create_distributed_table --------------------------- - + create_distributed_table +--------------------------------------------------------------------- + (1 row) SELECT master_create_empty_shard('super_packed_numbers_append') AS shardid \gset COPY super_packed_numbers_append FROM :'temp_dir''copy_test_composite_of_composite' WITH (append_to_shard :shardid); -- Verify data is actually copied SELECT * FROM super_packed_numbers_append; - id | super_packed_number -----+----------------------- + id | super_packed_number +--------------------------------------------------------------------- 1 | ("(42,42)","(42,42)") (1 row) @@ -605,9 +566,9 @@ CREATE TABLE composite_partition_column_table( composite_column number_pack ); SELECT create_distributed_table('composite_partition_column_table', 'composite_column', 'append'); - create_distributed_table --------------------------- - + create_distributed_table +--------------------------------------------------------------------- + (1 row) SELECT master_create_empty_shard('composite_partition_column_table') AS shardid \gset @@ -615,17 +576,17 @@ COPY composite_partition_column_table FROM STDIN WITH (FORMAT 'csv', append_to_s -- Test copy on append distributed tables do not create shards on removed workers CREATE TABLE numbers_append (a int, b int); SELECT create_distributed_table('numbers_append', 'a', 'append'); - create_distributed_table --------------------------- - + create_distributed_table +--------------------------------------------------------------------- + (1 row) -- no shards is created yet SELECT shardid, nodename, nodeport FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_append'::regclass order by placementid; - shardid | nodename | nodeport ----------+----------+---------- + shardid | nodename | nodeport +--------------------------------------------------------------------- (0 rows) SELECT master_create_empty_shard('numbers_append') AS shardid1 \gset @@ -636,8 +597,8 @@ COPY numbers_append FROM STDIN WITH (FORMAT 'csv', append_to_shard :shardid2); SELECT shardid, nodename, nodeport FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_append'::regclass order by placementid; - shardid | nodename | nodeport ----------+-----------+---------- + shardid | nodename | nodeport +--------------------------------------------------------------------- 560155 | localhost | 57637 560155 | localhost | 57638 560156 | localhost | 57638 @@ -646,10 +607,10 @@ SELECT shardid, nodename, nodeport -- disable the first node SELECT master_disable_node('localhost', :worker_1_port); -NOTICE: Node localhost:57637 has active shard placements. Some queries may fail after this operation. Use SELECT citus_activate_node('localhost', 57637) to activate this node back. - master_disable_node ---------------------- - +NOTICE: Node localhost:xxxxx has active shard placements. Some queries may fail after this operation. Use SELECT citus_activate_node('localhost', 57637) to activate this node back. + master_disable_node +--------------------------------------------------------------------- + (1 row) -- set replication factor to 1 so that copy will @@ -663,8 +624,8 @@ COPY numbers_append FROM STDIN WITH (FORMAT 'csv', append_to_shard :shardid2); SELECT shardid, nodename, nodeport FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_append'::regclass order by placementid; - shardid | nodename | nodeport ----------+-----------+---------- + shardid | nodename | nodeport +--------------------------------------------------------------------- 560155 | localhost | 57637 560155 | localhost | 57638 560156 | localhost | 57638 @@ -676,8 +637,8 @@ SELECT shardid, nodename, nodeport -- add the node back SET client_min_messages TO ERROR; SELECT 1 FROM master_activate_node('localhost', :worker_1_port); - ?column? ----------- + ?column? +--------------------------------------------------------------------- 1 (1 row) @@ -691,8 +652,8 @@ COPY numbers_append FROM STDIN WITH (FORMAT 'csv', append_to_shard :shardid2); SELECT shardid, nodename, nodeport FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_append'::regclass order by placementid; - shardid | nodename | nodeport ----------+-----------+---------- + shardid | nodename | nodeport +--------------------------------------------------------------------- 560155 | localhost | 57637 560155 | localhost | 57638 560156 | localhost | 57638 @@ -712,8 +673,8 @@ CREATE USER test_user; NOTICE: not propagating CREATE ROLE/USER commands to worker nodes HINT: Connect to worker nodes directly to manually create all necessary users and roles. SELECT * FROM run_command_on_workers('CREATE USER test_user'); - nodename | nodeport | success | result ------------+----------+---------+------------- + nodename | nodeport | success | result +--------------------------------------------------------------------- localhost | 57637 | t | CREATE ROLE localhost | 57638 | t | CREATE ROLE (2 rows) @@ -722,9 +683,9 @@ SELECT * FROM run_command_on_workers('CREATE USER test_user'); SET citus.shard_count to 4; CREATE TABLE numbers_hash (a int, b int); SELECT create_distributed_table('numbers_hash', 'a'); - create_distributed_table --------------------------- - + create_distributed_table +--------------------------------------------------------------------- + (1 row) COPY numbers_hash FROM STDIN WITH (FORMAT 'csv'); @@ -732,8 +693,8 @@ COPY numbers_hash FROM STDIN WITH (FORMAT 'csv'); SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport; - shardid | shardstate | nodename | nodeport ----------+------------+-----------+---------- + shardid | shardstate | nodename | nodeport +--------------------------------------------------------------------- 560161 | 1 | localhost | 57637 560161 | 1 | localhost | 57638 560162 | 1 | localhost | 57637 @@ -747,25 +708,25 @@ SELECT shardid, shardstate, nodename, nodeport -- create a reference table CREATE TABLE numbers_reference(a int, b int); SELECT create_reference_table('numbers_reference'); - create_reference_table ------------------------- - + create_reference_table +--------------------------------------------------------------------- + (1 row) COPY numbers_reference FROM STDIN WITH (FORMAT 'csv'); -- create another hash distributed table CREATE TABLE numbers_hash_other(a int, b int); SELECT create_distributed_table('numbers_hash_other', 'a'); - create_distributed_table --------------------------- - + create_distributed_table +--------------------------------------------------------------------- + (1 row) SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_hash_other'::regclass order by shardid, nodeport; - shardid | shardstate | nodename | nodeport ----------+------------+-----------+---------- + shardid | shardstate | nodename | nodeport +--------------------------------------------------------------------- 560166 | 1 | localhost | 57637 560166 | 1 | localhost | 57638 560167 | 1 | localhost | 57637 @@ -790,14 +751,14 @@ ALTER USER test_user WITH nologin; \c - test_user - :master_port -- reissue copy, and it should fail COPY numbers_hash FROM STDIN WITH (FORMAT 'csv'); -ERROR: connection to the remote node localhost:57637 failed with the following error: FATAL: role "test_user" is not permitted to log in +ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in CONTEXT: COPY numbers_hash, line 1: "1,1" -- verify shards in the none of the workers as marked invalid SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport; - shardid | shardstate | nodename | nodeport ----------+------------+-----------+---------- + shardid | shardstate | nodename | nodeport +--------------------------------------------------------------------- 560161 | 1 | localhost | 57637 560161 | 1 | localhost | 57638 560162 | 1 | localhost | 57637 @@ -810,14 +771,14 @@ SELECT shardid, shardstate, nodename, nodeport -- try to insert into a reference table copy should fail COPY numbers_reference FROM STDIN WITH (FORMAT 'csv'); -ERROR: connection to the remote node localhost:57637 failed with the following error: FATAL: role "test_user" is not permitted to log in +ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in CONTEXT: COPY numbers_reference, line 1: "3,1" -- verify shards for reference table are still valid SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_reference'::regclass order by placementid; - shardid | shardstate | nodename | nodeport ----------+------------+-----------+---------- + shardid | shardstate | nodename | nodeport +--------------------------------------------------------------------- 560165 | 1 | localhost | 57637 560165 | 1 | localhost | 57638 (2 rows) @@ -826,15 +787,15 @@ SELECT shardid, shardstate, nodename, nodeport -- since it can not insert into either copies of a shard. shards are expected to -- stay valid since the operation is rolled back. COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv'); -ERROR: connection to the remote node localhost:57637 failed with the following error: FATAL: role "test_user" is not permitted to log in +ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in CONTEXT: COPY numbers_hash_other, line 1: "1,1" -- verify shards for numbers_hash_other are still valid -- since copy has failed altogether SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_hash_other'::regclass order by shardid, nodeport; - shardid | shardstate | nodename | nodeport ----------+------------+-----------+---------- + shardid | shardstate | nodename | nodeport +--------------------------------------------------------------------- 560166 | 1 | localhost | 57637 560166 | 1 | localhost | 57638 560167 | 1 | localhost | 57637 @@ -858,9 +819,9 @@ DROP TABLE numbers_reference; SET citus.shard_count to 4; CREATE TABLE numbers_hash(a int, b int); SELECT create_distributed_table('numbers_hash', 'a'); - create_distributed_table --------------------------- - + create_distributed_table +--------------------------------------------------------------------- + (1 row) \c - - - :worker_1_port @@ -869,12 +830,12 @@ ALTER TABLE numbers_hash_560170 DROP COLUMN b; -- operation will fail to modify a shard and roll back COPY numbers_hash FROM STDIN WITH (FORMAT 'csv'); ERROR: column "b" of relation "numbers_hash_560170" does not exist -CONTEXT: while executing command on localhost:57637 +CONTEXT: while executing command on localhost:xxxxx COPY numbers_hash, line 1: "1,1" -- verify no row is inserted SELECT count(a) FROM numbers_hash; - count -------- + count +--------------------------------------------------------------------- 0 (1 row) @@ -882,8 +843,8 @@ SELECT count(a) FROM numbers_hash; SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement join pg_dist_shard using(shardid) WHERE logicalrelid = 'numbers_hash'::regclass order by shardid, nodeport; - shardid | shardstate | nodename | nodeport ----------+------------+-----------+---------- + shardid | shardstate | nodename | nodeport +--------------------------------------------------------------------- 560170 | 1 | localhost | 57637 560170 | 1 | localhost | 57638 560171 | 1 | localhost | 57637 @@ -896,8 +857,8 @@ SELECT shardid, shardstate, nodename, nodeport DROP TABLE numbers_hash; SELECT * FROM run_command_on_workers('DROP USER test_user'); - nodename | nodeport | success | result ------------+----------+---------+----------- + nodename | nodeport | success | result +--------------------------------------------------------------------- localhost | 57637 | t | DROP ROLE localhost | 57638 | t | DROP ROLE (2 rows) @@ -909,15 +870,15 @@ col1 aclitem NOT NULL, col2 character varying(255) NOT NULL ); SELECT create_reference_table('test_binaryless_builtin'); - create_reference_table ------------------------- - + create_reference_table +--------------------------------------------------------------------- + (1 row) \COPY test_binaryless_builtin FROM STDIN WITH (format CSV) SELECT * FROM test_binaryless_builtin; - col1 | col2 ----------------------+------- + col1 | col2 +--------------------------------------------------------------------- postgres=r/postgres | test (1 row) @@ -926,9 +887,9 @@ DROP TABLE test_binaryless_builtin; BEGIN; CREATE TABLE tt1(id int); SELECT create_distributed_table('tt1','id'); - create_distributed_table --------------------------- - + create_distributed_table +--------------------------------------------------------------------- + (1 row) \copy tt1 from STDIN; @@ -937,24 +898,24 @@ END; -- Test dropping a column in front of the partition column CREATE TABLE drop_copy_test_table (col1 int, col2 int, col3 int, col4 int); SELECT create_distributed_table('drop_copy_test_table','col3'); - create_distributed_table --------------------------- - + create_distributed_table +--------------------------------------------------------------------- + (1 row) ALTER TABLE drop_copy_test_table drop column col1; COPY drop_copy_test_table (col2,col3,col4) from STDIN with CSV; SELECT * FROM drop_copy_test_table WHERE col3 = 1; - col2 | col3 | col4 -------+------+------ - | 1 | + col2 | col3 | col4 +--------------------------------------------------------------------- + | 1 | (1 row) ALTER TABLE drop_copy_test_table drop column col4; COPY drop_copy_test_table (col2,col3) from STDIN with CSV; SELECT * FROM drop_copy_test_table WHERE col3 = 1; - col2 | col3 -------+------ + col2 | col3 +--------------------------------------------------------------------- | 1 | 1 (2 rows) @@ -963,8 +924,8 @@ DROP TABLE drop_copy_test_table; -- There should be no "tt1" shard on the worker nodes \c - - - :worker_1_port SELECT relname FROM pg_class WHERE relname LIKE 'tt1%'; - relname ---------- + relname +--------------------------------------------------------------------- (0 rows) \c - - - :master_port @@ -977,9 +938,9 @@ NOTICE: Copying data from local table... NOTICE: copying the data has completed DETAIL: The local data in the table is no longer visible, but is still on disk. HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$public.trigger_flush$$) - create_distributed_table --------------------------- - + create_distributed_table +--------------------------------------------------------------------- + (1 row) ABORT; @@ -989,9 +950,9 @@ SET citus.shard_count TO 3; SET citus.multi_shard_modify_mode TO 'sequential'; CREATE UNLOGGED TABLE trigger_switchover(a int, b int, c int, d int, e int, f int, g int, h int); SELECT create_distributed_table('trigger_switchover','a'); - create_distributed_table --------------------------- - + create_distributed_table +--------------------------------------------------------------------- + (1 row) INSERT INTO trigger_switchover @@ -1000,16 +961,16 @@ ABORT; -- copy into a table with a JSONB column CREATE TABLE copy_jsonb (key text, value jsonb, extra jsonb default '["default"]'::jsonb); SELECT create_distributed_table('copy_jsonb', 'key', colocate_with => 'none'); - create_distributed_table --------------------------- - + create_distributed_table +--------------------------------------------------------------------- + (1 row) -- JSONB from text should work \COPY copy_jsonb (key, value) FROM STDIN SELECT * FROM copy_jsonb ORDER BY key; - key | value | extra --------+----------------------------+------------- + key | value | extra +--------------------------------------------------------------------- blue | {"b": 255, "g": 0, "r": 0} | ["default"] green | {"b": 0, "g": 255, "r": 0} | ["default"] (2 rows) @@ -1018,8 +979,8 @@ SELECT * FROM copy_jsonb ORDER BY key; COPY copy_jsonb TO :'temp_dir''copy_jsonb.pgcopy' WITH (format binary); COPY copy_jsonb FROM :'temp_dir''copy_jsonb.pgcopy' WITH (format binary); SELECT * FROM copy_jsonb ORDER BY key; - key | value | extra --------+----------------------------+------------- + key | value | extra +--------------------------------------------------------------------- blue | {"b": 255, "g": 0, "r": 0} | ["default"] blue | {"b": 255, "g": 0, "r": 0} | ["default"] green | {"b": 0, "g": 255, "r": 0} | ["default"] @@ -1028,7 +989,7 @@ SELECT * FROM copy_jsonb ORDER BY key; -- JSONB parsing error without validation: no line number \COPY copy_jsonb (key, value) FROM STDIN -ERROR: invalid input syntax for type json +ERROR: invalid input syntax for json DETAIL: The input string ended unexpectedly. CONTEXT: JSON data, line 1: {"r":255,"g":0,"b":0 COPY copy_jsonb, line 1, column value: "{"r":255,"g":0,"b":0" @@ -1037,8 +998,8 @@ SET citus.skip_jsonb_validation_in_copy TO off; -- JSONB from text should work \COPY copy_jsonb (key, value) FROM STDIN SELECT * FROM copy_jsonb ORDER BY key; - key | value | extra --------+----------------------------+------------- + key | value | extra +--------------------------------------------------------------------- blue | {"b": 255, "g": 0, "r": 0} | ["default"] green | {"b": 0, "g": 255, "r": 0} | ["default"] (2 rows) @@ -1047,8 +1008,8 @@ SELECT * FROM copy_jsonb ORDER BY key; COPY copy_jsonb TO :'temp_dir''copy_jsonb.pgcopy' WITH (format binary); COPY copy_jsonb FROM :'temp_dir''copy_jsonb.pgcopy' WITH (format binary); SELECT * FROM copy_jsonb ORDER BY key; - key | value | extra --------+----------------------------+------------- + key | value | extra +--------------------------------------------------------------------- blue | {"b": 255, "g": 0, "r": 0} | ["default"] blue | {"b": 255, "g": 0, "r": 0} | ["default"] green | {"b": 0, "g": 255, "r": 0} | ["default"] @@ -1057,7 +1018,7 @@ SELECT * FROM copy_jsonb ORDER BY key; -- JSONB parsing error with validation: should see line number \COPY copy_jsonb (key, value) FROM STDIN -ERROR: invalid input syntax for type json +ERROR: invalid input syntax for json DETAIL: The input string ended unexpectedly. CONTEXT: JSON data, line 1: {"r":255,"g":0,"b":0 COPY copy_jsonb, line 1, column value: "{"r":255,"g":0,"b":0"