mirror of https://github.com/citusdata/citus.git
Code review feedback
parent
ff11e51a56
commit
48c43984cb
|
@ -373,7 +373,7 @@ multi_ProcessUtility(Node *parsetree,
|
|||
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
||||
}
|
||||
|
||||
/* we run VacuumStmt after standard processing to benefit from its checks */
|
||||
/* we run VacuumStmt after standard hook to benefit from its checks and locking */
|
||||
if (IsA(parsetree, VacuumStmt))
|
||||
{
|
||||
VacuumStmt *vacuumStmt = (VacuumStmt *) parsetree;
|
||||
|
@ -902,7 +902,7 @@ ProcessAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
|
|||
* table, it is propagated to all involved nodes; otherwise, this function will
|
||||
* immediately exit after some error checking.
|
||||
*
|
||||
* Unlike other Process functions within this file, this function does not
|
||||
* Unlike most other Process functions within this file, this function does not
|
||||
* return a modified parse node, as it is expected that the local VACUUM or
|
||||
* ANALYZE has already been processed.
|
||||
*/
|
||||
|
@ -915,7 +915,15 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
|
|||
|
||||
if (vacuumStmt->relation != NULL)
|
||||
{
|
||||
relationId = RangeVarGetRelid(vacuumStmt->relation, NoLock, false);
|
||||
LOCKMODE lockMode = (vacuumStmt->options & VACOPT_FULL) ?
|
||||
AccessExclusiveLock : ShareUpdateExclusiveLock;
|
||||
|
||||
relationId = RangeVarGetRelid(vacuumStmt->relation, lockMode, false);
|
||||
|
||||
if (relationId == InvalidOid)
|
||||
{
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
supportedVacuumStmt = IsSupportedDistributedVacuumStmt(relationId, vacuumStmt);
|
||||
|
@ -926,8 +934,11 @@ ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
|
|||
|
||||
taskList = VacuumTaskList(relationId, vacuumStmt);
|
||||
|
||||
/* save old commit protocol to restore at xact end */
|
||||
Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE);
|
||||
SavedMultiShardCommitProtocol = MultiShardCommitProtocol;
|
||||
MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
|
||||
|
||||
ExecuteModifyTasksWithoutResults(taskList);
|
||||
}
|
||||
|
||||
|
@ -946,9 +957,9 @@ IsSupportedDistributedVacuumStmt(Oid relationId, VacuumStmt *vacuumStmt)
|
|||
{
|
||||
const char *stmtName = (vacuumStmt->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
|
||||
|
||||
if (vacuumStmt->relation == NULL && EnableDDLPropagation)
|
||||
if (vacuumStmt->relation == NULL)
|
||||
{
|
||||
/* WARN and exit early for local unqualified VACUUM commands */
|
||||
/* WARN and exit early for unqualified VACUUM commands */
|
||||
ereport(WARNING, (errmsg("not propagating %s command to worker nodes", stmtName),
|
||||
errhint("Provide a specific table in order to %s "
|
||||
"distributed tables.", stmtName)));
|
||||
|
@ -996,7 +1007,7 @@ static List *
|
|||
VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt)
|
||||
{
|
||||
List *taskList = NIL;
|
||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||
List *shardIntervalList = NIL;
|
||||
ListCell *shardIntervalCell = NULL;
|
||||
uint64 jobId = INVALID_JOB_ID;
|
||||
int taskId = 1;
|
||||
|
@ -1006,8 +1017,13 @@ VacuumTaskList(Oid relationId, VacuumStmt *vacuumStmt)
|
|||
char *schemaName = get_namespace_name(schemaId);
|
||||
char *tableName = get_rel_name(relationId);
|
||||
|
||||
/* lock metadata before getting placement lists */
|
||||
LockShardListMetadata(shardIntervalList, ExclusiveLock);
|
||||
/* lock relation metadata before getting shard list */
|
||||
LockRelationDistributionMetadata(relationId, ShareLock);
|
||||
|
||||
shardIntervalList = LoadShardIntervalList(relationId);
|
||||
|
||||
/* grab shard lock before getting placement list */
|
||||
LockShardListMetadata(shardIntervalList, ShareLock);
|
||||
|
||||
foreach(shardIntervalCell, shardIntervalList)
|
||||
{
|
||||
|
|
|
@ -175,6 +175,8 @@ BeginTransactionOnShardPlacements(uint64 shardId, char *userName)
|
|||
* transaction to fail.
|
||||
*/
|
||||
MarkRemoteTransactionCritical(connection);
|
||||
|
||||
/* the special BARE mode (for e.g. VACUUM/ANALYZE) skips BEGIN */
|
||||
if (MultiShardCommitProtocol > COMMIT_PROTOCOL_BARE)
|
||||
{
|
||||
/* issue BEGIN */
|
||||
|
@ -276,6 +278,7 @@ ResetShardPlacementTransactionState(void)
|
|||
if (MultiShardCommitProtocol == COMMIT_PROTOCOL_BARE)
|
||||
{
|
||||
MultiShardCommitProtocol = SavedMultiShardCommitProtocol;
|
||||
SavedMultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -83,17 +83,60 @@ SELECT master_create_worker_shards('dustbunnies', 1, 2);
|
|||
|
||||
-- add some data to the distributed table
|
||||
\copy dustbunnies from stdin with csv
|
||||
-- following approach adapted from PostgreSQL's stats.sql file
|
||||
-- save relevant stat counter values in refreshable view
|
||||
\c - - - :worker_1_port
|
||||
CREATE MATERIALIZED VIEW prevcounts AS
|
||||
SELECT analyze_count, vacuum_count FROM pg_stat_user_tables
|
||||
WHERE relname='dustbunnies_990002';
|
||||
-- create function that sleeps until those counters increment
|
||||
create function wait_for_stats() returns void as $$
|
||||
declare
|
||||
start_time timestamptz := clock_timestamp();
|
||||
analyze_updated bool;
|
||||
vacuum_updated bool;
|
||||
begin
|
||||
-- we don't want to wait forever; loop will exit after 10 seconds
|
||||
for i in 1 .. 100 loop
|
||||
|
||||
-- check to see if analyze has been updated
|
||||
SELECT (st.analyze_count >= pc.analyze_count + 1) INTO analyze_updated
|
||||
FROM pg_stat_user_tables AS st, pg_class AS cl, prevcounts AS pc
|
||||
WHERE st.relname='dustbunnies_990002' AND cl.relname='dustbunnies_990002';
|
||||
|
||||
-- check to see if vacuum has been updated
|
||||
SELECT (st.vacuum_count >= pc.vacuum_count + 1) INTO vacuum_updated
|
||||
FROM pg_stat_user_tables AS st, pg_class AS cl, prevcounts AS pc
|
||||
WHERE st.relname='dustbunnies_990002' AND cl.relname='dustbunnies_990002';
|
||||
|
||||
exit when analyze_updated or vacuum_updated;
|
||||
|
||||
-- wait a little
|
||||
perform pg_sleep(0.1);
|
||||
|
||||
-- reset stats snapshot so we can test again
|
||||
perform pg_stat_clear_snapshot();
|
||||
|
||||
end loop;
|
||||
|
||||
-- report time waited in postmaster log (where it won't change test output)
|
||||
raise log 'wait_for_stats delayed % seconds',
|
||||
extract(epoch from clock_timestamp() - start_time);
|
||||
end
|
||||
$$ language plpgsql;
|
||||
-- run VACUUM and ANALYZE against the table on the master
|
||||
\c - - - :master_port
|
||||
VACUUM dustbunnies;
|
||||
ANALYZE dustbunnies;
|
||||
-- verify that the VACUUM and ANALYZE ran
|
||||
\c - - - :worker_1_port
|
||||
SELECT pg_sleep(.500);
|
||||
pg_sleep
|
||||
----------
|
||||
SELECT wait_for_stats();
|
||||
wait_for_stats
|
||||
----------------
|
||||
|
||||
(1 row)
|
||||
|
||||
REFRESH MATERIALIZED VIEW prevcounts;
|
||||
SELECT pg_stat_get_vacuum_count('dustbunnies_990002'::regclass);
|
||||
pg_stat_get_vacuum_count
|
||||
--------------------------
|
||||
|
@ -123,9 +166,9 @@ WHERE oid='dustbunnies_990002'::regclass;
|
|||
(1 row)
|
||||
|
||||
-- verify the VACUUM ANALYZE incremented both vacuum and analyze counts
|
||||
SELECT pg_sleep(.500);
|
||||
pg_sleep
|
||||
----------
|
||||
SELECT wait_for_stats();
|
||||
wait_for_stats
|
||||
----------------
|
||||
|
||||
(1 row)
|
||||
|
||||
|
|
|
@ -56,13 +56,59 @@ SELECT master_create_worker_shards('dustbunnies', 1, 2);
|
|||
4,roger
|
||||
\.
|
||||
|
||||
-- following approach adapted from PostgreSQL's stats.sql file
|
||||
|
||||
-- save relevant stat counter values in refreshable view
|
||||
\c - - - :worker_1_port
|
||||
CREATE MATERIALIZED VIEW prevcounts AS
|
||||
SELECT analyze_count, vacuum_count FROM pg_stat_user_tables
|
||||
WHERE relname='dustbunnies_990002';
|
||||
|
||||
-- create function that sleeps until those counters increment
|
||||
create function wait_for_stats() returns void as $$
|
||||
declare
|
||||
start_time timestamptz := clock_timestamp();
|
||||
analyze_updated bool;
|
||||
vacuum_updated bool;
|
||||
begin
|
||||
-- we don't want to wait forever; loop will exit after 10 seconds
|
||||
for i in 1 .. 100 loop
|
||||
|
||||
-- check to see if analyze has been updated
|
||||
SELECT (st.analyze_count >= pc.analyze_count + 1) INTO analyze_updated
|
||||
FROM pg_stat_user_tables AS st, pg_class AS cl, prevcounts AS pc
|
||||
WHERE st.relname='dustbunnies_990002' AND cl.relname='dustbunnies_990002';
|
||||
|
||||
-- check to see if vacuum has been updated
|
||||
SELECT (st.vacuum_count >= pc.vacuum_count + 1) INTO vacuum_updated
|
||||
FROM pg_stat_user_tables AS st, pg_class AS cl, prevcounts AS pc
|
||||
WHERE st.relname='dustbunnies_990002' AND cl.relname='dustbunnies_990002';
|
||||
|
||||
exit when analyze_updated or vacuum_updated;
|
||||
|
||||
-- wait a little
|
||||
perform pg_sleep(0.1);
|
||||
|
||||
-- reset stats snapshot so we can test again
|
||||
perform pg_stat_clear_snapshot();
|
||||
|
||||
end loop;
|
||||
|
||||
-- report time waited in postmaster log (where it won't change test output)
|
||||
raise log 'wait_for_stats delayed % seconds',
|
||||
extract(epoch from clock_timestamp() - start_time);
|
||||
end
|
||||
$$ language plpgsql;
|
||||
|
||||
-- run VACUUM and ANALYZE against the table on the master
|
||||
\c - - - :master_port
|
||||
VACUUM dustbunnies;
|
||||
ANALYZE dustbunnies;
|
||||
|
||||
-- verify that the VACUUM and ANALYZE ran
|
||||
\c - - - :worker_1_port
|
||||
SELECT pg_sleep(.500);
|
||||
SELECT wait_for_stats();
|
||||
REFRESH MATERIALIZED VIEW prevcounts;
|
||||
SELECT pg_stat_get_vacuum_count('dustbunnies_990002'::regclass);
|
||||
SELECT pg_stat_get_analyze_count('dustbunnies_990002'::regclass);
|
||||
|
||||
|
@ -81,7 +127,7 @@ SELECT relfilenode != :oldnode AS table_rewritten FROM pg_class
|
|||
WHERE oid='dustbunnies_990002'::regclass;
|
||||
|
||||
-- verify the VACUUM ANALYZE incremented both vacuum and analyze counts
|
||||
SELECT pg_sleep(.500);
|
||||
SELECT wait_for_stats();
|
||||
SELECT pg_stat_get_vacuum_count('dustbunnies_990002'::regclass);
|
||||
SELECT pg_stat_get_analyze_count('dustbunnies_990002'::regclass);
|
||||
|
||||
|
|
Loading…
Reference in New Issue