diff --git a/src/backend/distributed/executor/multi_utility.c b/src/backend/distributed/executor/multi_utility.c
index d3e287b06..1f2b7a802 100644
--- a/src/backend/distributed/executor/multi_utility.c
+++ b/src/backend/distributed/executor/multi_utility.c
@@ -373,7 +373,7 @@ multi_ProcessUtility(Node *parsetree,
         SetUserIdAndSecContext(savedUserId, savedSecurityContext);
     }
 
-    /* we run VacuumStmt after standard processing to benefit from its checks */
+    /* we run VacuumStmt after the standard hook to benefit from its checks and locking */
     if (IsA(parsetree, VacuumStmt))
     {
         VacuumStmt *vacuumStmt = (VacuumStmt *) parsetree;
@@ -902,7 +902,7 @@ ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
  * table, it is propagated to all involved nodes; otherwise, this function will
  * immediately exit after some error checking.
  *
- * Unlike other Process functions within this file, this function does not
+ * Unlike most other Process functions within this file, this function does not
  * return a modified parse node, as it is expected that the local VACUUM or
  * ANALYZE has already been processed.
  */
@@ -915,7 +915,15 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
 
     if (vacuumStmt->relation != NULL)
     {
-        relationId = RangeVarGetRelid(vacuumStmt->relation, NoLock, false);
+        LOCKMODE lockMode = (vacuumStmt->options & VACOPT_FULL) ?
+                            AccessExclusiveLock : ShareUpdateExclusiveLock;
+
+        relationId = RangeVarGetRelid(vacuumStmt->relation, lockMode, false);
+
+        if (relationId == InvalidOid)
+        {
+            return;
+        }
     }
 
     supportedVacuumStmt = IsSupportedDistributedVacuumStmt(relationId, vacuumStmt);
@@ -926,8 +934,11 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
 
     taskList = VacuumTaskList(relationId, vacuumStmt);
 
+    /* save old commit protocol to restore at xact end */
+    Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE);
     SavedMultiShardCommitProtocol = MultiShardCommitProtocol;
     MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
+
     ExecuteModifyTasksWithoutResults(taskList);
 }
 
@@ -946,9 +957,9 @@ IsSupportedDistributedVacuumStmt(Oid relationId, VacuumStmt *vacuumStmt)
 {
     const char *stmtName = (vacuumStmt->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
 
-    if (vacuumStmt->relation == NULL && EnableDDLPropagation)
+    if (vacuumStmt->relation == NULL)
     {
-        /* WARN and exit early for local unqualified VACUUM commands */
+        /* WARN and exit early for unqualified VACUUM commands */
         ereport(WARNING, (errmsg("not propagating %s command to worker nodes", stmtName),
                           errhint("Provide a specific table in order to %s "
                                   "distributed tables.", stmtName)));
@@ -996,7 +1007,7 @@ static List *
 VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt)
 {
     List *taskList = NIL;
-    List *shardIntervalList = LoadShardIntervalList(relationId);
+    List *shardIntervalList = NIL;
     ListCell *shardIntervalCell = NULL;
     uint64 jobId = INVALID_JOB_ID;
     int taskId = 1;
@@ -1006,8 +1017,13 @@ VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt)
     char *schemaName = get_namespace_name(schemaId);
     char *tableName = get_rel_name(relationId);
 
-    /* lock metadata before getting placement lists */
-    LockShardListMetadata(shardIntervalList, ExclusiveLock);
+    /* lock relation metadata before getting shard list */
+    LockRelationDistributionMetadata(relationId, ShareLock);
+
+    shardIntervalList = LoadShardIntervalList(relationId);
+
+    /* grab shard lock before getting placement list */
+    LockShardListMetadata(shardIntervalList, ShareLock);
 
     foreach(shardIntervalCell, shardIntervalList)
     {
diff --git a/src/backend/distributed/transaction/multi_shard_transaction.c b/src/backend/distributed/transaction/multi_shard_transaction.c
index a581fa241..f5ebd1351 100644
--- a/src/backend/distributed/transaction/multi_shard_transaction.c
+++ b/src/backend/distributed/transaction/multi_shard_transaction.c
@@ -175,6 +175,8 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName)
          * transaction to fail.
          */
         MarkRemoteTransactionCritical(connection);
+
+        /* the special BARE mode (e.g. for VACUUM/ANALYZE) skips BEGIN */
         if (MultiShardCommitProtocol > COMMIT_PROTOCOL_BARE)
         {
             /* issue BEGIN */
@@ -276,6 +278,7 @@ ResetShardPlacementTransactionState(void)
     if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE)
     {
         MultiShardCommitProtocol = SavedMultiShardCommitProtocol;
+        SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
     }
 }
 
diff --git a/src/test/regress/expected/multi_utilities.out b/src/test/regress/expected/multi_utilities.out
index 4f8f72ec4..ac68fe85b 100644
--- a/src/test/regress/expected/multi_utilities.out
+++ b/src/test/regress/expected/multi_utilities.out
@@ -83,17 +83,60 @@ SELECT master_create_worker_shards('dustbunnies', 1, 2);
 
 -- add some data to the distributed table
 \copy dustbunnies from stdin with csv
+-- following approach adapted from PostgreSQL's stats.sql file
+-- save relevant stat counter values in refreshable view
+\c - - - :worker_1_port
+CREATE MATERIALIZED VIEW prevcounts AS
+SELECT analyze_count, vacuum_count FROM pg_stat_user_tables
+WHERE relname='dustbunnies_990002';
+-- create function that sleeps until those counters increment
+create function wait_for_stats() returns void as $$
+declare
+  start_time timestamptz := clock_timestamp();
+  analyze_updated bool;
+  vacuum_updated bool;
+begin
+  -- we don't want to wait forever; loop will exit after 10 seconds
+  for i in 1 .. 100 loop
+
+    -- check to see if analyze has been updated
+    SELECT (st.analyze_count >= pc.analyze_count + 1) INTO analyze_updated
+    FROM pg_stat_user_tables AS st, pg_class AS cl, prevcounts AS pc
+    WHERE st.relname='dustbunnies_990002' AND cl.relname='dustbunnies_990002';
+
+    -- check to see if vacuum has been updated
+    SELECT (st.vacuum_count >= pc.vacuum_count + 1) INTO vacuum_updated
+    FROM pg_stat_user_tables AS st, pg_class AS cl, prevcounts AS pc
+    WHERE st.relname='dustbunnies_990002' AND cl.relname='dustbunnies_990002';
+
+    exit when analyze_updated or vacuum_updated;
+
+    -- wait a little
+    perform pg_sleep(0.1);
+
+    -- reset stats snapshot so we can test again
+    perform pg_stat_clear_snapshot();
+
+  end loop;
+
+  -- report time waited in postmaster log (where it won't change test output)
+  raise log 'wait_for_stats delayed % seconds',
+    extract(epoch from clock_timestamp() - start_time);
+end
+$$ language plpgsql;
 -- run VACUUM and ANALYZE against the table on the master
+\c - - - :master_port
 VACUUM dustbunnies;
 ANALYZE dustbunnies;
 -- verify that the VACUUM and ANALYZE ran
 \c - - - :worker_1_port
-SELECT pg_sleep(.500);
- pg_sleep
-----------
+SELECT wait_for_stats();
+ wait_for_stats
+----------------
 
 (1 row)
 
+REFRESH MATERIALIZED VIEW prevcounts;
 SELECT pg_stat_get_vacuum_count('dustbunnies_990002'::regclass);
  pg_stat_get_vacuum_count
 --------------------------
@@ -123,9 +166,9 @@ WHERE oid='dustbunnies_990002'::regclass;
 (1 row)
 
 -- verify the VACUUM ANALYZE incremented both vacuum and analyze counts
-SELECT pg_sleep(.500);
- pg_sleep
-----------
+SELECT wait_for_stats();
+ wait_for_stats
+----------------
 
 (1 row)
 
diff --git a/src/test/regress/sql/multi_utilities.sql b/src/test/regress/sql/multi_utilities.sql
index 54a55992a..3cdfd91d7 100644
--- a/src/test/regress/sql/multi_utilities.sql
+++ b/src/test/regress/sql/multi_utilities.sql
@@ -56,13 +56,59 @@ SELECT master_create_worker_shards('dustbunnies', 1, 2);
 4,roger
 \.
 
+-- following approach adapted from PostgreSQL's stats.sql file
+
+-- save relevant stat counter values in refreshable view
+\c - - - :worker_1_port
+CREATE MATERIALIZED VIEW prevcounts AS
+SELECT analyze_count, vacuum_count FROM pg_stat_user_tables
+WHERE relname='dustbunnies_990002';
+
+-- create function that sleeps until those counters increment
+create function wait_for_stats() returns void as $$
+declare
+  start_time timestamptz := clock_timestamp();
+  analyze_updated bool;
+  vacuum_updated bool;
+begin
+  -- we don't want to wait forever; loop will exit after 10 seconds
+  for i in 1 .. 100 loop
+
+    -- check to see if analyze has been updated
+    SELECT (st.analyze_count >= pc.analyze_count + 1) INTO analyze_updated
+    FROM pg_stat_user_tables AS st, pg_class AS cl, prevcounts AS pc
+    WHERE st.relname='dustbunnies_990002' AND cl.relname='dustbunnies_990002';
+
+    -- check to see if vacuum has been updated
+    SELECT (st.vacuum_count >= pc.vacuum_count + 1) INTO vacuum_updated
+    FROM pg_stat_user_tables AS st, pg_class AS cl, prevcounts AS pc
+    WHERE st.relname='dustbunnies_990002' AND cl.relname='dustbunnies_990002';
+
+    exit when analyze_updated or vacuum_updated;
+
+    -- wait a little
+    perform pg_sleep(0.1);
+
+    -- reset stats snapshot so we can test again
+    perform pg_stat_clear_snapshot();
+
+  end loop;
+
+  -- report time waited in postmaster log (where it won't change test output)
+  raise log 'wait_for_stats delayed % seconds',
+    extract(epoch from clock_timestamp() - start_time);
+end
+$$ language plpgsql;
+
 -- run VACUUM and ANALYZE against the table on the master
+\c - - - :master_port
 VACUUM dustbunnies;
 ANALYZE dustbunnies;
 
 -- verify that the VACUUM and ANALYZE ran
 \c - - - :worker_1_port
-SELECT pg_sleep(.500);
+SELECT wait_for_stats();
+REFRESH MATERIALIZED VIEW prevcounts;
 
 SELECT pg_stat_get_vacuum_count('dustbunnies_990002'::regclass);
 SELECT pg_stat_get_analyze_count('dustbunnies_990002'::regclass);
@@ -81,7 +127,7 @@ SELECT relfilenode != :oldnode AS table_rewritten FROM pg_class
 WHERE oid='dustbunnies_990002'::regclass;
 
 -- verify the VACUUM ANALYZE incremented both vacuum and analyze counts
-SELECT pg_sleep(.500);
+SELECT wait_for_stats();
 
 SELECT pg_stat_get_vacuum_count('dustbunnies_990002'::regclass);
 SELECT pg_stat_get_analyze_count('dustbunnies_990002'::regclass);
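
Note on the commit-protocol hunks above: ProcessVacuumStmt switches MultiShardCommitProtocol to COMMIT_PROTOCOL_BARE so that the per-shard commands are sent without BEGIN/COMMIT wrappers (plain VACUUM cannot run inside a transaction block on the workers), and ResetShardPlacementTransactionState restores the saved protocol at transaction end. The standalone C sketch below only illustrates that save/restore pattern; the enum values, globals, and function names here are simplified stand-ins for illustration, not the actual Citus definitions.

/* save_restore_sketch.c: illustrative only; types and globals are stand-ins */
#include <assert.h>
#include <stdio.h>

typedef enum
{
	COMMIT_PROTOCOL_BARE = 0,   /* no BEGIN/COMMIT wrapper around remote commands */
	COMMIT_PROTOCOL_1PC = 1,
	COMMIT_PROTOCOL_2PC = 2
} CommitProtocolType;

/* stand-ins for the globals the patch touches */
static CommitProtocolType MultiShardCommitProtocol = COMMIT_PROTOCOL_1PC;
static CommitProtocolType SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;

/* mirrors the ProcessVacuumStmt hunk: stash the protocol, then go BARE */
static void
RunVacuumLikeCommand(void)
{
	/* BARE in the saved slot means no save/restore is already in flight */
	assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE);

	SavedMultiShardCommitProtocol = MultiShardCommitProtocol;
	MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;

	printf("executing shard commands with protocol %d (BARE skips BEGIN)\n",
		   (int) MultiShardCommitProtocol);
}

/* mirrors the ResetShardPlacementTransactionState hunk: restore at xact end */
static void
ResetAtTransactionEnd(void)
{
	if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE)
	{
		MultiShardCommitProtocol = SavedMultiShardCommitProtocol;
		SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
	}
}

int
main(void)
{
	RunVacuumLikeCommand();
	ResetAtTransactionEnd();
	printf("restored protocol %d\n", (int) MultiShardCommitProtocol);
	return 0;
}

Resetting SavedMultiShardCommitProtocol back to BARE after the restore is what keeps the Assert added in ProcessVacuumStmt valid for the next VACUUM or ANALYZE in the same session.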