Record non-distributed table accesses in local executor (#4139)

pull/4154/head
Onur Tirtir 2020-09-07 18:19:08 +03:00 committed by GitHub
parent 959629d3f3
commit ba208eae4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 164 additions and 30 deletions

View File

@ -1183,23 +1183,43 @@ CanUseExclusiveConnections(Oid relationId, bool localTableEmpty)
}
else if (shouldRunSequential && ParallelQueryExecutedInTransaction())
{
/*
* We decided to use sequential execution. It's either because relation
* has a pre-existing foreign key to a reference table or because we
* decided to use sequential execution due to a query executed in the
* current xact beforehand.
* We have specific error messages for either cases.
*/
char *relationName = get_rel_name(relationId);
/*
* If there has already been a parallel query executed, the sequential mode
* would still use the already opened parallel connections to the workers,
* thus contradicting our purpose of using sequential mode.
*/
ereport(ERROR, (errmsg("cannot distribute relation \"%s\" in this "
"transaction because it has a foreign key to "
"a reference table", relationName),
errdetail("If a hash distributed table has a foreign key "
"to a reference table, it has to be created "
"in sequential mode before any parallel commands "
"have been executed in the same transaction"),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
if (hasForeignKeyToReferenceTable)
{
/*
* If there has already been a parallel query executed, the sequential mode
* would still use the already opened parallel connections to the workers,
* thus contradicting our purpose of using sequential mode.
*/
ereport(ERROR, (errmsg("cannot distribute relation \"%s\" in this "
"transaction because it has a foreign key to "
"a reference table", relationName),
errdetail("If a hash distributed table has a foreign key "
"to a reference table, it has to be created "
"in sequential mode before any parallel commands "
"have been executed in the same transaction"),
errhint("Try re-running the transaction with "
"\"SET LOCAL citus.multi_shard_modify_mode TO "
"\'sequential\';\"")));
}
else if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
ereport(ERROR, (errmsg("cannot distribute \"%s\" in sequential mode because "
"a parallel query was executed in this transaction",
relationName),
errhint("If you have manually set "
"citus.multi_shard_modify_mode to 'sequential', "
"try with 'parallel' option. ")));
}
}
else if (shouldRunSequential)
{

View File

@ -2288,7 +2288,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
copyDest->shardStateHash = CreateShardStateHash(TopTransactionContext);
copyDest->connectionStateHash = CreateConnectionStateHash(TopTransactionContext);
RecordRelationAccessIfReferenceTable(tableId, PLACEMENT_ACCESS_DML);
RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML);
/*
* For all the primary (e.g., writable) nodes, reserve a shared connection.

View File

@ -466,7 +466,7 @@ AssignPlacementListToConnection(List *placementAccessList, MultiConnection *conn
/* record the relation access */
Oid relationId = RelationIdForShard(placement->shardId);
RecordRelationAccessIfReferenceTable(relationId, accessType);
RecordRelationAccessIfNonDistTable(relationId, accessType);
}
}

View File

@ -118,6 +118,7 @@ static void SplitLocalAndRemotePlacements(List *taskPlacementList,
static uint64 ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
TupleDestination *tupleDest, Task *task,
ParamListInfo paramListInfo);
static void RecordNonDistTableAccessesForTask(Task *task);
static void LogLocalCommand(Task *task);
static uint64 LocallyPlanAndExecuteMultipleQueries(List *queryStrings,
TupleDestination *tupleDest,
@ -549,6 +550,8 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
int eflags = 0;
uint64 totalRowsProcessed = 0;
RecordNonDistTableAccessesForTask(task);
/*
* Use the tupleStore provided by the scanState because it is shared accross
* the other task executions and the adaptive executor.
@ -585,6 +588,59 @@ ExecuteLocalTaskPlan(PlannedStmt *taskPlan, char *queryString,
}
/*
* RecordNonDistTableAccessesForTask records relation accesses for the non-distributed
* relations that given task will access (if any).
*/
static void
RecordNonDistTableAccessesForTask(Task *task)
{
List *taskPlacementList = task->taskPlacementList;
if (list_length(taskPlacementList) == 0)
{
/*
* We need at least one task placement to record relation access.
* FIXME: Unfortunately, it is possible due to
* https://github.com/citusdata/citus/issues/4104.
* We can safely remove this check when above bug is fixed.
*/
return;
}
/*
* We use only the first placement to find the relation accesses. It is
* sufficient as PlacementAccessListForTask iterates relationShardList
* field of the task and generates accesses per relation in the task.
* As we are only interested in relations, not the placements, we can
* skip rest of the placements.
* Also, here we don't need to iterate relationShardList field of task
* to mark each accessed relation because PlacementAccessListForTask
* already computes and returns relations that task accesses.
*/
ShardPlacement *taskPlacement = linitial(taskPlacementList);
List *placementAccessList = PlacementAccessListForTask(task, taskPlacement);
ShardPlacementAccess *placementAccess = NULL;
foreach_ptr(placementAccess, placementAccessList)
{
uint64 placementAccessShardId = placementAccess->placement->shardId;
if (placementAccessShardId == INVALID_SHARD_ID)
{
/*
* When a SELECT prunes down to 0 shard, we still may pass through
* the local executor. In that case, we don't need to record any
* relation access as we don't actually access any shard placement.
*/
continue;
}
Oid accessedRelationId = RelationIdForShard(placementAccessShardId);
ShardPlacementAccessType shardPlacementAccessType = placementAccess->accessType;
RecordRelationAccessIfNonDistTable(accessedRelationId, shardPlacementAccessType);
}
}
/*
* SetLocalExecutionStatus sets the local execution status to
* the given status, it errors if the transition is not possible from the

View File

@ -153,13 +153,13 @@ AllocateRelationAccessHash(void)
/*
* RecordRelationAccessIfReferenceTable marks the relation accessed if it is a
* RecordRelationAccessIfNonDistTable marks the relation accessed if it is a
* reference relation.
*
* The function is a wrapper around RecordRelationAccessBase().
*/
void
RecordRelationAccessIfReferenceTable(Oid relationId, ShardPlacementAccessType accessType)
RecordRelationAccessIfNonDistTable(Oid relationId, ShardPlacementAccessType accessType)
{
if (!ShouldRecordRelationAccess())
{
@ -492,6 +492,17 @@ RecordParallelRelationAccess(Oid relationId, ShardPlacementAccessType placementA
/* act accordingly if it's a conflicting access */
CheckConflictingParallelRelationAccesses(relationId, placementAccess);
/*
* CheckConflictingParallelRelationAccesses might switch to sequential
* execution. If that's the case, no need to continue because the executor
* would take the necessary actions to switch to sequential execution
* immediately.
*/
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION)
{
return;
}
/* If a relation is partitioned, record accesses to all of its partitions as well. */
if (PartitionedTable(relationId))
{

View File

@ -36,8 +36,8 @@ typedef enum RelationAccessMode
extern void AllocateRelationAccessHash(void);
extern void ResetRelationAccessHash(void);
extern void RecordRelationAccessIfReferenceTable(Oid relationId,
ShardPlacementAccessType accessType);
extern void RecordRelationAccessIfNonDistTable(Oid relationId,
ShardPlacementAccessType accessType);
extern void RecordParallelRelationAccessForTaskList(List *taskList);
extern void RecordParallelSelectAccess(Oid relationId);
extern void RecordParallelModifyAccess(Oid relationId);

View File

@ -1268,9 +1268,8 @@ BEGIN;
SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
CREATE TABLE test_table_2(id int PRIMARY KEY, value_1 int);
SELECT create_distributed_table('test_table_2', 'id');
ERROR: cannot distribute relation "test_table_2" in this transaction because it has a foreign key to a reference table
DETAIL: If a hash distributed table has a foreign key to a reference table, it has to be created in sequential mode before any parallel commands have been executed in the same transaction
HINT: Try re-running the transaction with "SET LOCAL citus.multi_shard_modify_mode TO 'sequential';"
ERROR: cannot distribute "test_table_2" in sequential mode because a parallel query was executed in this transaction
HINT: If you have manually set citus.multi_shard_modify_mode to 'sequential', try with 'parallel' option.
CREATE TABLE test_table_1(id int PRIMARY KEY);
ERROR: current transaction is aborted, commands ignored until end of transaction block
SELECT create_reference_table('test_table_1');

View File

@ -629,7 +629,6 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1
(2 rows)
ROLLBACK;
-- Try a bunch of commands and expect failure at SELECT create_distributed_table
BEGIN;
-- here this SELECT will enforce the whole block for local execution
SELECT COUNT(*) FROM ref_table;
@ -664,8 +663,6 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500124, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500127, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500130, 'local_commands_test_schema', 'ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;')
-- as we create table via remote connections, below SELECT create_distributed_table
-- would error out
CREATE TABLE another_dist_table(a int);
SELECT create_distributed_table('another_dist_table', 'a', colocate_with:='dist_table');
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500133, 'local_commands_test_schema', 'CREATE TABLE local_commands_test_schema.another_dist_table (a integer)');SELECT worker_apply_shard_ddl_command (1500133, 'local_commands_test_schema', 'ALTER TABLE local_commands_test_schema.another_dist_table OWNER TO postgres')
@ -685,6 +682,44 @@ NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1
(1 row)
COMMIT;
-- add a foreign key for next test
ALTER TABLE dist_table ADD CONSTRAINT fkey_dist_to_ref FOREIGN KEY (b) REFERENCES ref_table(a);
BEGIN;
SELECT count(*) FROM ref_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM local_commands_test_schema.ref_table_1500035 ref_table
count
---------------------------------------------------------------------
0
(1 row)
-- should show parallel
SHOW citus.multi_shard_modify_mode ;
citus.multi_shard_modify_mode
---------------------------------------------------------------------
parallel
(1 row)
-- wants to do parallel execution but will switch to sequential mode
ALTER TABLE dist_table DROP COLUMN c;
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500100, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500103, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500106, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500109, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500112, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500115, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500118, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500121, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500124, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500127, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1500130, 'local_commands_test_schema', 'ALTER TABLE dist_table DROP COLUMN c;')
-- should show sequential
SHOW citus.multi_shard_modify_mode;
citus.multi_shard_modify_mode
---------------------------------------------------------------------
sequential
(1 row)
ROLLBACK;
---------------------------------------------------------------------
------------ partitioned tables -------------
---------------------------------------------------------------------

View File

@ -278,7 +278,6 @@ BEGIN;
SELECT tablename FROM pg_tables where schemaname='foo_schema' ORDER BY tablename;
ROLLBACK;
-- Try a bunch of commands and expect failure at SELECT create_distributed_table
BEGIN;
-- here this SELECT will enforce the whole block for local execution
SELECT COUNT(*) FROM ref_table;
@ -287,12 +286,26 @@ BEGIN;
ALTER TABLE dist_table ADD column c int;
ALTER TABLE dist_table ALTER COLUMN c SET NOT NULL;
-- as we create table via remote connections, below SELECT create_distributed_table
-- would error out
CREATE TABLE another_dist_table(a int);
SELECT create_distributed_table('another_dist_table', 'a', colocate_with:='dist_table');
COMMIT;
-- add a foreign key for next test
ALTER TABLE dist_table ADD CONSTRAINT fkey_dist_to_ref FOREIGN KEY (b) REFERENCES ref_table(a);
BEGIN;
SELECT count(*) FROM ref_table;
-- should show parallel
SHOW citus.multi_shard_modify_mode ;
-- wants to do parallel execution but will switch to sequential mode
ALTER TABLE dist_table DROP COLUMN c;
-- should show sequential
SHOW citus.multi_shard_modify_mode;
ROLLBACK;
---------------------------------------------
------------ partitioned tables -------------
---------------------------------------------