Merge pull request #2691 from citusdata/update_changelog

Add 8.1.2 and 8.2.1 changelog entries
pull/2707/head
Hanefi Onaldi 2019-05-07 09:42:51 +03:00
commit 4030d603eb
No known key found for this signature in database
GPG Key ID: 95177DABDC09D1F5
8 changed files with 130 additions and 50 deletions

View File

@ -1,3 +1,15 @@
### citus v8.2.1 (April 03, 2019) ###
* Fixes a bug that prevents stack size to be adjusted
### citus v8.1.2 (April 03, 2019) ###
* Don't do redundant ALTER TABLE consistency checks at coordinator
* Fixes a bug that prevents stack size to be adjusted
* Fix an issue with some DECLARE .. CURSOR WITH HOLD commands
### citus v8.2.0 (March 28, 2019) ### ### citus v8.2.0 (March 28, 2019) ###
* Removes support and code for PostgreSQL 9.6 * Removes support and code for PostgreSQL 9.6

View File

@ -631,7 +631,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
shardConnections->connectionList); shardConnections->connectionList);
} }
EndRemoteCopy(currentShardId, shardConnections->connectionList, true); EndRemoteCopy(currentShardId, shardConnections->connectionList);
MasterUpdateShardStatistics(shardConnections->shardId); MasterUpdateShardStatistics(shardConnections->shardId);
copiedDataSizeInBytes = 0; copiedDataSizeInBytes = 0;
@ -655,7 +655,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
SendCopyBinaryFooters(copyOutState, currentShardId, SendCopyBinaryFooters(copyOutState, currentShardId,
shardConnections->connectionList); shardConnections->connectionList);
} }
EndRemoteCopy(currentShardId, shardConnections->connectionList, true); EndRemoteCopy(currentShardId, shardConnections->connectionList);
MasterUpdateShardStatistics(shardConnections->shardId); MasterUpdateShardStatistics(shardConnections->shardId);
} }
@ -1194,11 +1194,10 @@ SendCopyDataToPlacement(StringInfo dataBuffer, int64 shardId, MultiConnection *c
/* /*
* EndRemoteCopy ends the COPY input on all connections, and unclaims connections. * EndRemoteCopy ends the COPY input on all connections, and unclaims connections.
* If stopOnFailure is true, then EndRemoteCopy reports an error on failure, * This reports an error on failure.
* otherwise it reports a warning or continues.
*/ */
void void
EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure) EndRemoteCopy(int64 shardId, List *connectionList)
{ {
ListCell *connectionCell = NULL; ListCell *connectionCell = NULL;
@ -1211,21 +1210,14 @@ EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure)
/* end the COPY input */ /* end the COPY input */
if (!PutRemoteCopyEnd(connection, NULL)) if (!PutRemoteCopyEnd(connection, NULL))
{ {
if (!stopOnFailure)
{
continue;
}
ereport(ERROR, (errcode(ERRCODE_IO_ERROR), ereport(ERROR, (errcode(ERRCODE_IO_ERROR),
errmsg("failed to COPY to shard " INT64_FORMAT " on %s:%d", errmsg("failed to COPY to shard " INT64_FORMAT " on %s:%d",
shardId, connection->hostname, connection->port))); shardId, connection->hostname, connection->port)));
continue;
} }
/* check whether there were any COPY errors */ /* check whether there were any COPY errors */
result = GetRemoteCommandResult(connection, raiseInterrupts); result = GetRemoteCommandResult(connection, raiseInterrupts);
if (PQresultStatus(result) != PGRES_COMMAND_OK && stopOnFailure) if (PQresultStatus(result) != PGRES_COMMAND_OK)
{ {
ReportCopyError(connection, result); ReportCopyError(connection, result);
} }
@ -2456,8 +2448,7 @@ CitusCopyDestReceiverShutdown(DestReceiver *destReceiver)
} }
/* close the COPY input on all shard placements */ /* close the COPY input on all shard placements */
EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList, EndRemoteCopy(shardConnections->shardId, shardConnections->connectionList);
true);
} }
} }
PG_CATCH(); PG_CATCH();

View File

@ -448,7 +448,7 @@ RemoteFileDestReceiverShutdown(DestReceiver *destReceiver)
} }
/* close the COPY input */ /* close the COPY input */
EndRemoteCopy(0, connectionList, true); EndRemoteCopy(0, connectionList);
if (resultDest->writeLocalFile) if (resultDest->writeLocalFile)
{ {

View File

@ -2355,9 +2355,10 @@ AssignQueuedTasks(TaskTracker *taskTracker)
foreach(taskCell, tasksToAssignList) foreach(taskCell, tasksToAssignList)
{ {
taskState = (TrackerTaskState *) lfirst(taskCell);
BatchQueryStatus queryStatus = CLIENT_INVALID_BATCH_QUERY; BatchQueryStatus queryStatus = CLIENT_INVALID_BATCH_QUERY;
taskState = (TrackerTaskState *) lfirst(taskCell);
if (!batchSuccess) if (!batchSuccess)
{ {
taskState->status = TASK_CLIENT_SIDE_ASSIGN_FAILED; taskState->status = TASK_CLIENT_SIDE_ASSIGN_FAILED;
@ -2904,12 +2905,17 @@ TrackerHashCleanupJob(HTAB *taskTrackerHash, Task *jobCleanupTask)
foreach(activeTaskTrackerCell, activeTackTrackerList) foreach(activeTaskTrackerCell, activeTackTrackerList)
{ {
taskTracker = (TaskTracker *) lfirst(activeTaskTrackerCell); int32 connectionId = 0;
int32 connectionId = taskTracker->connectionId; char *nodeName = NULL;
const char *nodeName = taskTracker->workerName; uint32 nodePort = 0;
uint32 nodePort = taskTracker->workerPort; ResultStatus resultStatus = CLIENT_INVALID_RESULT_STATUS;
ResultStatus resultStatus = MultiClientResultStatus(connectionId); taskTracker = (TaskTracker *) lfirst(activeTaskTrackerCell);
connectionId = taskTracker->connectionId;
nodeName = taskTracker->workerName;
nodePort = taskTracker->workerPort;
resultStatus = MultiClientResultStatus(connectionId);
if (resultStatus == CLIENT_RESULT_READY) if (resultStatus == CLIENT_RESULT_READY)
{ {
QueryStatus queryStatus = MultiClientQueryStatus(connectionId); QueryStatus queryStatus = MultiClientQueryStatus(connectionId);

View File

@ -144,7 +144,7 @@ static bool SelectsFromDistributedTable(List *rangeTableList, Query *query);
static List * get_all_actual_clauses(List *restrictinfo_list); static List * get_all_actual_clauses(List *restrictinfo_list);
static int CompareInsertValuesByShardId(const void *leftElement, static int CompareInsertValuesByShardId(const void *leftElement,
const void *rightElement); const void *rightElement);
static uint64 GetInitialShardId(List *relationShardList); static uint64 GetAnchorShardId(List *relationShardList);
static List * TargetShardIntervalForFastPathQuery(Query *query, static List * TargetShardIntervalForFastPathQuery(Query *query,
Const **partitionValueConst, Const **partitionValueConst,
bool *isMultiShardQuery); bool *isMultiShardQuery);
@ -2004,7 +2004,7 @@ PlanRouterQuery(Query *originalQuery,
} }
/* we need anchor shard id for select queries with router planner */ /* we need anchor shard id for select queries with router planner */
shardId = GetInitialShardId(prunedRelationShardList); shardId = GetAnchorShardId(prunedRelationShardList);
/* /*
* Determine the worker that has all shard placements if a shard placement found. * Determine the worker that has all shard placements if a shard placement found.
@ -2076,13 +2076,20 @@ PlanRouterQuery(Query *originalQuery,
/* /*
* GetInitialShardId returns the initial shard id given relation shard list. If * GetAnchorShardId returns the anchor shard id given relation shard list.
* there is no relation shard exist in the list returns INAVLID_SHARD_ID. * The desired anchor shard is found as follows:
*
* - Return the first distributed table shard id in the relationShardList if
* there is any.
* - Return a random reference table shard id if all the shards belong to
* reference tables
* - Return INVALID_SHARD_ID on empty lists
*/ */
static uint64 static uint64
GetInitialShardId(List *relationShardList) GetAnchorShardId(List *relationShardList)
{ {
ListCell *prunedRelationShardListCell = NULL; ListCell *prunedRelationShardListCell = NULL;
uint64 referenceShardId = INVALID_SHARD_ID;
foreach(prunedRelationShardListCell, relationShardList) foreach(prunedRelationShardListCell, relationShardList)
{ {
@ -2096,10 +2103,18 @@ GetInitialShardId(List *relationShardList)
} }
shardInterval = linitial(prunedShardList); shardInterval = linitial(prunedShardList);
return shardInterval->shardId;
if (ReferenceTableShardId(shardInterval->shardId))
{
referenceShardId = shardInterval->shardId;
}
else
{
return shardInterval->shardId;
}
} }
return INVALID_SHARD_ID; return referenceShardId;
} }

View File

@ -131,7 +131,7 @@ extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray,
CopyCoercionData *columnCoercionPaths); CopyCoercionData *columnCoercionPaths);
extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState); extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState);
extern void AppendCopyBinaryFooters(CopyOutState footerOutputState); extern void AppendCopyBinaryFooters(CopyOutState footerOutputState);
extern void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure); extern void EndRemoteCopy(int64 shardId, List *connectionList);
extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
const char *queryString); const char *queryString);
extern void CheckCopyPermissions(CopyStmt *copyStatement); extern void CheckCopyPermissions(CopyStmt *copyStatement);

View File

@ -184,7 +184,7 @@ RESET client_min_messages;
-- which might change and we don't have any control over it. -- which might change and we don't have any control over it.
-- the important thing that we look for is that round-robin policy -- the important thing that we look for is that round-robin policy
-- should give the same output for executions in the same transaction -- should give the same output for executions in the same transaction
-- and different output for executions that are not insdie the -- and different output for executions that are not inside the
-- same transaction. To ensure that, we define a helper function -- same transaction. To ensure that, we define a helper function
BEGIN; BEGIN;
SET LOCAL citus.explain_distributed_queries TO on; SET LOCAL citus.explain_distributed_queries TO on;
@ -201,12 +201,11 @@ SELECT count(DISTINCT value) FROM explain_outputs;
1 1
(1 row) (1 row)
DROP TABLE explain_outputs; TRUNCATE explain_outputs;
COMMIT; COMMIT;
-- now test round-robin policy outside -- now test round-robin policy outside
-- a transaction, we should see the assignements -- a transaction, we should see the assignments
-- change on every execution -- change on every execution
CREATE TEMPORARY TABLE explain_outputs (value text);
SET citus.task_assignment_policy TO 'round-robin'; SET citus.task_assignment_policy TO 'round-robin';
SET citus.explain_distributed_queries TO ON; SET citus.explain_distributed_queries TO ON;
INSERT INTO explain_outputs INSERT INTO explain_outputs
@ -248,12 +247,11 @@ SELECT count(DISTINCT value) FROM explain_outputs;
1 1
(1 row) (1 row)
DROP TABLE explain_outputs; TRUNCATE explain_outputs;
COMMIT; COMMIT;
-- now test round-robin policy outside -- now test round-robin policy outside
-- a transaction, we should see the assignements -- a transaction, we should see the assignments
-- change on every execution -- change on every execution
CREATE TEMPORARY TABLE explain_outputs (value text);
SET citus.task_assignment_policy TO 'round-robin'; SET citus.task_assignment_policy TO 'round-robin';
SET citus.explain_distributed_queries TO ON; SET citus.explain_distributed_queries TO ON;
INSERT INTO explain_outputs INSERT INTO explain_outputs
@ -268,6 +266,40 @@ SELECT count(DISTINCT value) FROM explain_outputs;
2 2
(1 row) (1 row)
TRUNCATE explain_outputs;
-- test that the round robin policy detects the anchor shard correctly
-- we should not pick a reference table shard as the anchor shard when joining with a distributed table
SET citus.shard_replication_factor TO 1;
CREATE TABLE task_assignment_nonreplicated_hash (test_id integer, ref_id integer);
SELECT create_distributed_table('task_assignment_nonreplicated_hash', 'test_id');
create_distributed_table
--------------------------
(1 row)
-- run the query two times to make sure that it hits the correct worker every time
INSERT INTO explain_outputs
SELECT parse_explain_output($cmd$
EXPLAIN SELECT *
FROM (SELECT * FROM task_assignment_nonreplicated_hash WHERE test_id = 3) AS dist
LEFT JOIN task_assignment_reference_table ref
ON dist.ref_id = ref.test_id
$cmd$, 'task_assignment_nonreplicated_hash');
INSERT INTO explain_outputs
SELECT parse_explain_output($cmd$
EXPLAIN SELECT *
FROM (SELECT * FROM task_assignment_nonreplicated_hash WHERE test_id = 3) AS dist
LEFT JOIN task_assignment_reference_table ref
ON dist.ref_id = ref.test_id
$cmd$, 'task_assignment_nonreplicated_hash');
-- The count should be 1 since the shard exists in only one worker node
SELECT count(DISTINCT value) FROM explain_outputs;
count
-------
1
(1 row)
TRUNCATE explain_outputs;
RESET citus.task_assignment_policy; RESET citus.task_assignment_policy;
RESET client_min_messages; RESET client_min_messages;
-- we should be able to use round-robin with router queries that -- we should be able to use round-robin with router queries that
@ -292,4 +324,5 @@ WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1;
(0 rows) (0 rows)
ROLLBACK; ROLLBACK;
DROP TABLE task_assignment_replicated_hash, task_assignment_reference_table; DROP TABLE task_assignment_replicated_hash, task_assignment_nonreplicated_hash,
task_assignment_reference_table, explain_outputs;

View File

@ -133,7 +133,7 @@ RESET client_min_messages;
-- which might change and we don't have any control over it. -- which might change and we don't have any control over it.
-- the important thing that we look for is that round-robin policy -- the important thing that we look for is that round-robin policy
-- should give the same output for executions in the same transaction -- should give the same output for executions in the same transaction
-- and different output for executions that are not insdie the -- and different output for executions that are not inside the
-- same transaction. To ensure that, we define a helper function -- same transaction. To ensure that, we define a helper function
BEGIN; BEGIN;
@ -149,15 +149,12 @@ INSERT INTO explain_outputs
-- given that we're in the same transaction, the count should be 1 -- given that we're in the same transaction, the count should be 1
SELECT count(DISTINCT value) FROM explain_outputs; SELECT count(DISTINCT value) FROM explain_outputs;
TRUNCATE explain_outputs;
DROP TABLE explain_outputs;
COMMIT; COMMIT;
-- now test round-robin policy outside -- now test round-robin policy outside
-- a transaction, we should see the assignements -- a transaction, we should see the assignments
-- change on every execution -- change on every execution
CREATE TEMPORARY TABLE explain_outputs (value text);
SET citus.task_assignment_policy TO 'round-robin'; SET citus.task_assignment_policy TO 'round-robin';
SET citus.explain_distributed_queries TO ON; SET citus.explain_distributed_queries TO ON;
@ -192,15 +189,12 @@ INSERT INTO explain_outputs
-- given that we're in the same transaction, the count should be 1 -- given that we're in the same transaction, the count should be 1
SELECT count(DISTINCT value) FROM explain_outputs; SELECT count(DISTINCT value) FROM explain_outputs;
TRUNCATE explain_outputs;
DROP TABLE explain_outputs;
COMMIT; COMMIT;
-- now test round-robin policy outside -- now test round-robin policy outside
-- a transaction, we should see the assignements -- a transaction, we should see the assignments
-- change on every execution -- change on every execution
CREATE TEMPORARY TABLE explain_outputs (value text);
SET citus.task_assignment_policy TO 'round-robin'; SET citus.task_assignment_policy TO 'round-robin';
SET citus.explain_distributed_queries TO ON; SET citus.explain_distributed_queries TO ON;
@ -212,7 +206,35 @@ INSERT INTO explain_outputs
-- given that we're in the same transaction, the count should be 2 -- given that we're in the same transaction, the count should be 2
-- since there are two different worker nodes -- since there are two different worker nodes
SELECT count(DISTINCT value) FROM explain_outputs; SELECT count(DISTINCT value) FROM explain_outputs;
TRUNCATE explain_outputs;
-- test that the round robin policy detects the anchor shard correctly
-- we should not pick a reference table shard as the anchor shard when joining with a distributed table
SET citus.shard_replication_factor TO 1;
CREATE TABLE task_assignment_nonreplicated_hash (test_id integer, ref_id integer);
SELECT create_distributed_table('task_assignment_nonreplicated_hash', 'test_id');
-- run the query two times to make sure that it hits the correct worker every time
INSERT INTO explain_outputs
SELECT parse_explain_output($cmd$
EXPLAIN SELECT *
FROM (SELECT * FROM task_assignment_nonreplicated_hash WHERE test_id = 3) AS dist
LEFT JOIN task_assignment_reference_table ref
ON dist.ref_id = ref.test_id
$cmd$, 'task_assignment_nonreplicated_hash');
INSERT INTO explain_outputs
SELECT parse_explain_output($cmd$
EXPLAIN SELECT *
FROM (SELECT * FROM task_assignment_nonreplicated_hash WHERE test_id = 3) AS dist
LEFT JOIN task_assignment_reference_table ref
ON dist.ref_id = ref.test_id
$cmd$, 'task_assignment_nonreplicated_hash');
-- The count should be 1 since the shard exists in only one worker node
SELECT count(DISTINCT value) FROM explain_outputs;
TRUNCATE explain_outputs;
RESET citus.task_assignment_policy; RESET citus.task_assignment_policy;
RESET client_min_messages; RESET client_min_messages;
@ -228,4 +250,5 @@ SET LOCAL citus.task_assignment_policy TO 'round-robin';
WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1; WITH q1 AS (SELECT * FROM task_assignment_test_table_2) SELECT * FROM q1;
ROLLBACK; ROLLBACK;
DROP TABLE task_assignment_replicated_hash, task_assignment_reference_table; DROP TABLE task_assignment_replicated_hash, task_assignment_nonreplicated_hash,
task_assignment_reference_table, explain_outputs;