diff --git a/CHANGELOG.md b/CHANGELOG.md index f4efc32d2..c24d35aee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,15 @@ +### citus v8.2.1 (April 03, 2019) ### + +* Fixes a bug that prevents stack size to be adjusted + +### citus v8.1.2 (April 03, 2019) ### + +* Don't do redundant ALTER TABLE consistency checks at coordinator + +* Fixes a bug that prevents stack size to be adjusted + +* Fix an issue with some DECLARE .. CURSOR WITH HOLD commands + ### citus v8.2.0 (March 28, 2019) ### * Removes support and code for PostgreSQL 9.6 diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 6cc0c5dca..d12ae4354 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -631,7 +631,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) shardConnections->connectionList); } - EndRemoteCopy(currentShardId, shardConnections->connectionList, true); + EndRemoteCopy(currentShardId, shardConnections->connectionList); MasterUpdateShardStatistics(shardConnections->shardId); copiedDataSizeInBytes = 0; @@ -655,7 +655,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId) SendCopyBinaryFooters(copyOutState, currentShardId, shardConnections->connectionList); } - EndRemoteCopy(currentShardId, shardConnections->connectionList, true); + EndRemoteCopy(currentShardId, shardConnections->connectionList); MasterUpdateShardStatistics(shardConnections->shardId); } @@ -1194,11 +1194,10 @@ SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *c /* * EndRemoteCopy ends the COPY input on all connections, and unclaims connections. - * If stopOnFailure is true, then EndRemoteCopy reports an error on failure, - * otherwise it reports a warning or continues. + * This reports an error on failure. */ void -EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure) +EndRemoteCopy(int64 shardId, List *connectionList) { ListCell *connectionCell = NULL; @@ -1211,21 +1210,14 @@ EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure) /* end the COPY input */ if (!PutRemoteCopyEnd(connection, NULL)) { - if (!stopOnFailure) - { - continue; - } - ereport(ERROR, (errcode(ERRCODE_IO_ERROR), errmsg("failed to COPY to shard " INT64_FORMAT " on %s:%d", shardId, connection->hostname, connection->port))); - - continue; } /* check whether there were any COPY errors */ result = GetRemoteCommandResult(connection, raiseInterrupts); - if (PQresultStatus(result) != PGRES_COMMAND_OK && stopOnFailure) + if (PQresultStatus(result) != PGRES_COMMAND_OK) { ReportCopyError(connection, result); } @@ -2456,8 +2448,7 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver) } /* close the COPY input on all shard placements */ - EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, - true); + EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList); } } PG_CATCH(); diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index 8827fa79a..65ea09fb0 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -448,7 +448,7 @@ RemoteFileDestReceiverShutdown(DestReceiver *destReceiver) } /* close the COPY input */ - EndRemoteCopy(0, connectionList, true); + EndRemoteCopy(0, connectionList); if (resultDest->writeLocalFile) { diff --git a/src/backend/distributed/executor/multi_task_tracker_executor.c b/src/backend/distributed/executor/multi_task_tracker_executor.c index 1a81ada48..244973e59 100644 --- a/src/backend/distributed/executor/multi_task_tracker_executor.c +++ b/src/backend/distributed/executor/multi_task_tracker_executor.c @@ -2355,9 +2355,10 @@ AssignQueuedTasks(TaskTracker *taskTracker) foreach(taskCell, tasksToAssignList) { - taskState = (TrackerTaskState *) lfirst(taskCell); BatchQueryStatus queryStatus = CLIENT_INVALID_BATCH_QUERY; + taskState = (TrackerTaskState *) lfirst(taskCell); + if (!batchSuccess) { taskState->status = TASK_CLIENT_SIDE_ASSIGN_FAILED; @@ -2904,12 +2905,17 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask) foreach(activeTaskTrackerCell, activeTackTrackerList) { - taskTracker = (TaskTracker *) lfirst(activeTaskTrackerCell); - int32 connectionId = taskTracker->connectionId; - const char *nodeName = taskTracker->workerName; - uint32 nodePort = taskTracker->workerPort; + int32 connectionId = 0; + char *nodeName = NULL; + uint32 nodePort = 0; + ResultStatus resultStatus = CLIENT_INVALID_RESULT_STATUS; - ResultStatus resultStatus = MultiClientResultStatus(connectionId); + taskTracker = (TaskTracker *) lfirst(activeTaskTrackerCell); + connectionId = taskTracker->connectionId; + nodeName = taskTracker->workerName; + nodePort = taskTracker->workerPort; + + resultStatus = MultiClientResultStatus(connectionId); if (resultStatus == CLIENT_RESULT_READY) { QueryStatus queryStatus = MultiClientQueryStatus(connectionId); diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 0c5541995..02d14c58b 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -144,7 +144,7 @@ static bool SelectsFromDistributedTable(List *rangeTableList, Query *query); static List * get_all_actual_clauses(List *restrictinfo_list); static int CompareInsertValuesByShardId(const void *leftElement, const void *rightElement); -static uint64 GetInitialShardId(List *relationShardList); +static uint64 GetAnchorShardId(List *relationShardList); static List * TargetShardIntervalForFastPathQuery(Query *query, Const **partitionValueConst, bool *isMultiShardQuery); @@ -2004,7 +2004,7 @@ PlanRouterQuery(Query *originalQuery, } /* we need anchor shard id for select queries with router planner */ - shardId = GetInitialShardId(prunedRelationShardList); + shardId = GetAnchorShardId(prunedRelationShardList); /* * Determine the worker that has all shard placements if a shard placement found. @@ -2076,13 +2076,20 @@ PlanRouterQuery(Query *originalQuery, /* - * GetInitialShardId returns the initial shard id given relation shard list. If - * there is no relation shard exist in the list returns INAVLID_SHARD_ID. + * GetAnchorShardId returns the anchor shard id given relation shard list. + * The desired anchor shard is found as follows: + * + * - Return the first distributed table shard id in the relationShardList if + * there is any. + * - Return a random reference table shard id if all the shards belong to + * reference tables + * - Return INVALID_SHARD_ID on empty lists */ static uint64 -GetInitialShardId(List *relationShardList) +GetAnchorShardId(List *relationShardList) { ListCell *prunedRelationShardListCell = NULL; + uint64 referenceShardId = INVALID_SHARD_ID; foreach(prunedRelationShardListCell, relationShardList) { @@ -2096,10 +2103,18 @@ GetInitialShardId(List *relationShardList) } shardInterval = linitial(prunedShardList); - return shardInterval->shardId; + + if (ReferenceTableShardId(shardInterval->shardId)) + { + referenceShardId = shardInterval->shardId; + } + else + { + return shardInterval->shardId; + } } - return INVALID_SHARD_ID; + return referenceShardId; } diff --git a/src/include/distributed/commands/multi_copy.h b/src/include/distributed/commands/multi_copy.h index 36a7cf978..907d19a10 100644 --- a/src/include/distributed/commands/multi_copy.h +++ b/src/include/distributed/commands/multi_copy.h @@ -131,7 +131,7 @@ extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray, CopyCoercionData *columnCoercionPaths); extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState); extern void AppendCopyBinaryFooters(CopyOutState footerOutputState); -extern void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure); +extern void EndRemoteCopy(int64 shardId, List *connectionList); extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, const char *queryString); extern void CheckCopyPermissions(CopyStmt *copyStatement); diff --git a/src/test/regress/expected/multi_task_assignment_policy.out b/src/test/regress/expected/multi_task_assignment_policy.out index 686b1f9cc..70bae799b 100644 --- a/src/test/regress/expected/multi_task_assignment_policy.out +++ b/src/test/regress/expected/multi_task_assignment_policy.out @@ -184,7 +184,7 @@ RESET client_min_messages; -- which might change and we don't have any control over it. -- the important thing that we look for is that round-robin policy -- should give the same output for executions in the same transaction --- and different output for executions that are not insdie the +-- and different output for executions that are not inside the -- same transaction. To ensure that, we define a helper function BEGIN; SET LOCAL citus.explain_distributed_queries TO on; @@ -201,12 +201,11 @@ SELECT count(DISTINCT value) FROM explain_outputs; 1 (1 row) -DROP TABLE explain_outputs; +TRUNCATE explain_outputs; COMMIT; -- now test round-robin policy outside --- a transaction, we should see the assignements +-- a transaction, we should see the assignments -- change on every execution -CREATE TEMPORARY TABLE explain_outputs (value text); SET citus.task_assignment_policy TO 'round-robin'; SET citus.explain_distributed_queries TO ON; INSERT INTO explain_outputs @@ -248,12 +247,11 @@ SELECT count(DISTINCT value) FROM explain_outputs; 1 (1 row) -DROP TABLE explain_outputs; +TRUNCATE explain_outputs; COMMIT; -- now test round-robin policy outside --- a transaction, we should see the assignements +-- a transaction, we should see the assignments -- change on every execution -CREATE TEMPORARY TABLE explain_outputs (value text); SET citus.task_assignment_policy TO 'round-robin'; SET citus.explain_distributed_queries TO ON; INSERT INTO explain_outputs @@ -268,6 +266,40 @@ SELECT count(DISTINCT value) FROM explain_outputs; 2 (1 row) +TRUNCATE explain_outputs; +-- test that the round robin policy detects the anchor shard correctly +-- we should not pick a reference table shard as the anchor shard when joining with a distributed table +SET citus.shard_replication_factor TO 1; +CREATE TABLE task_assignment_nonreplicated_hash (test_id integer, ref_id integer); +SELECT create_distributed_table('task_assignment_nonreplicated_hash', 'test_id'); + create_distributed_table +-------------------------- + +(1 row) + +-- run the query two times to make sure that it hits the correct worker every time +INSERT INTO explain_outputs +SELECT parse_explain_output($cmd$ +EXPLAIN SELECT * +FROM (SELECT * FROM task_assignment_nonreplicated_hash WHERE test_id = 3) AS dist + LEFT JOIN task_assignment_reference_table ref + ON dist.ref_id = ref.test_id +$cmd$, 'task_assignment_nonreplicated_hash'); +INSERT INTO explain_outputs +SELECT parse_explain_output($cmd$ +EXPLAIN SELECT * +FROM (SELECT * FROM task_assignment_nonreplicated_hash WHERE test_id = 3) AS dist + LEFT JOIN task_assignment_reference_table ref + ON dist.ref_id = ref.test_id +$cmd$, 'task_assignment_nonreplicated_hash'); +-- The count should be 1 since the shard exists in only one worker node +SELECT count(DISTINCT value) FROM explain_outputs; + count +------- + 1 +(1 row) + +TRUNCATE explain_outputs; RESET citus.task_assignment_policy; RESET client_min_messages; -- we should be able to use round-robin with router queries that @@ -292,4 +324,5 @@ WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1; (0 rows) ROLLBACK; -DROP TABLE task_assignment_replicated_hash, task_assignment_reference_table; +DROP TABLE task_assignment_replicated_hash, task_assignment_nonreplicated_hash, + task_assignment_reference_table, explain_outputs; diff --git a/src/test/regress/sql/multi_task_assignment_policy.sql b/src/test/regress/sql/multi_task_assignment_policy.sql index ccbc72018..67341b8cc 100644 --- a/src/test/regress/sql/multi_task_assignment_policy.sql +++ b/src/test/regress/sql/multi_task_assignment_policy.sql @@ -133,7 +133,7 @@ RESET client_min_messages; -- which might change and we don't have any control over it. -- the important thing that we look for is that round-robin policy -- should give the same output for executions in the same transaction --- and different output for executions that are not insdie the +-- and different output for executions that are not inside the -- same transaction. To ensure that, we define a helper function BEGIN; @@ -149,15 +149,12 @@ INSERT INTO explain_outputs -- given that we're in the same transaction, the count should be 1 SELECT count(DISTINCT value) FROM explain_outputs; - -DROP TABLE explain_outputs; +TRUNCATE explain_outputs; COMMIT; -- now test round-robin policy outside --- a transaction, we should see the assignements +-- a transaction, we should see the assignments -- change on every execution -CREATE TEMPORARY TABLE explain_outputs (value text); - SET citus.task_assignment_policy TO 'round-robin'; SET citus.explain_distributed_queries TO ON; @@ -192,15 +189,12 @@ INSERT INTO explain_outputs -- given that we're in the same transaction, the count should be 1 SELECT count(DISTINCT value) FROM explain_outputs; - -DROP TABLE explain_outputs; +TRUNCATE explain_outputs; COMMIT; -- now test round-robin policy outside --- a transaction, we should see the assignements +-- a transaction, we should see the assignments -- change on every execution -CREATE TEMPORARY TABLE explain_outputs (value text); - SET citus.task_assignment_policy TO 'round-robin'; SET citus.explain_distributed_queries TO ON; @@ -212,7 +206,35 @@ INSERT INTO explain_outputs -- given that we're in the same transaction, the count should be 2 -- since there are two different worker nodes SELECT count(DISTINCT value) FROM explain_outputs; +TRUNCATE explain_outputs; +-- test that the round robin policy detects the anchor shard correctly +-- we should not pick a reference table shard as the anchor shard when joining with a distributed table +SET citus.shard_replication_factor TO 1; + +CREATE TABLE task_assignment_nonreplicated_hash (test_id integer, ref_id integer); +SELECT create_distributed_table('task_assignment_nonreplicated_hash', 'test_id'); + +-- run the query two times to make sure that it hits the correct worker every time +INSERT INTO explain_outputs +SELECT parse_explain_output($cmd$ +EXPLAIN SELECT * +FROM (SELECT * FROM task_assignment_nonreplicated_hash WHERE test_id = 3) AS dist + LEFT JOIN task_assignment_reference_table ref + ON dist.ref_id = ref.test_id +$cmd$, 'task_assignment_nonreplicated_hash'); + +INSERT INTO explain_outputs +SELECT parse_explain_output($cmd$ +EXPLAIN SELECT * +FROM (SELECT * FROM task_assignment_nonreplicated_hash WHERE test_id = 3) AS dist + LEFT JOIN task_assignment_reference_table ref + ON dist.ref_id = ref.test_id +$cmd$, 'task_assignment_nonreplicated_hash'); + +-- The count should be 1 since the shard exists in only one worker node +SELECT count(DISTINCT value) FROM explain_outputs; +TRUNCATE explain_outputs; RESET citus.task_assignment_policy; RESET client_min_messages; @@ -228,4 +250,5 @@ SET LOCAL citus.task_assignment_policy TO 'round-robin'; WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1; ROLLBACK; -DROP TABLE task_assignment_replicated_hash, task_assignment_reference_table; +DROP TABLE task_assignment_replicated_hash, task_assignment_nonreplicated_hash, + task_assignment_reference_table, explain_outputs;