From bdce4a7e67ff73f14eb344abb9f8bed7deb1a38d Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Wed, 23 Dec 2020 14:46:46 +0300 Subject: [PATCH 1/8] Propagate rename statistics --- .../commands/distribute_object_ops.c | 12 ++++++ src/backend/distributed/commands/statistics.c | 41 ++++++++++++++++++- .../deparser/deparse_statistics_stmts.c | 25 ++++++++++- .../deparser/qualify_statistics_stmt.c | 21 ++++++++++ .../distributed/relay/relay_event_utility.c | 11 +++++ src/include/distributed/commands.h | 1 + src/include/distributed/deparser.h | 2 + .../regress/expected/propagate_statistics.out | 6 +++ src/test/regress/sql/propagate_statistics.sql | 6 +++ 9 files changed, 122 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index 38cee4041..77d9b2659 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -414,6 +414,13 @@ static DistributeObjectOps Schema_Rename = { .postprocess = NULL, .address = AlterSchemaRenameStmtObjectAddress, }; +static DistributeObjectOps Statistics_Rename = { + .deparse = DeparseAlterStatisticsRenameStmt, + .qualify = QualifyAlterStatisticsRenameStmt, + .preprocess = PreprocessAlterStatisticsRenameStmt, + .postprocess = NULL, + .address = NULL, +}; static DistributeObjectOps Statistics_Drop = { .deparse = NULL, .qualify = QualifyDropStatisticsStmt, @@ -907,6 +914,11 @@ GetDistributeObjectOps(Node *node) return &Schema_Rename; } + case OBJECT_STATISTIC_EXT: + { + return &Statistics_Rename; + } + case OBJECT_TYPE: { return &Type_Rename; diff --git a/src/backend/distributed/commands/statistics.c b/src/backend/distributed/commands/statistics.c index 46fd2ed33..2d214bfc7 100644 --- a/src/backend/distributed/commands/statistics.c +++ b/src/backend/distributed/commands/statistics.c @@ -190,6 +190,44 @@ PreprocessDropStatisticsStmt(Node *node, const char 
*queryString) } +/* + * PreprocessAlterStatisticsRenameStmt is called during the planning phase for + * ALTER STATISTICS RENAME. + */ +List * +PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString) +{ + RenameStmt *renameStmt = castNode(RenameStmt, node); + Assert(renameStmt->renameType == OBJECT_STATISTIC_EXT); + + Oid statsOid = get_statistics_object_oid((List *) renameStmt->object, false); + Oid relationId = GetRelIdByStatsOid(statsOid); + + if (!IsCitusTable(relationId) || !ShouldPropagate()) + { + return NIL; + } + + EnsureCoordinator(); + + QualifyTreeNode((Node *) renameStmt); + + char *ddlCommand = DeparseTreeNode((Node *) renameStmt); + + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + + ddlJob->targetRelationId = relationId; + ddlJob->concurrentIndexCmd = false; + ddlJob->startNewTransaction = false; + ddlJob->commandString = ddlCommand; + ddlJob->taskList = DDLTaskList(relationId, ddlCommand); + + List *ddlJobs = list_make1(ddlJob); + + return ddlJobs; +} + + /* * GetExplicitStatisticsCommandList returns the list of DDL commands to create * statistics that are explicitly created for the table with relationId. 
See @@ -207,7 +245,8 @@ GetExplicitStatisticsCommandList(Oid relationId) Oid statisticsId = InvalidOid; foreach_oid(statisticsId, statisticsIdList) { - char *createStatisticsCommand = pg_get_statisticsobj_worker(statisticsId, false); + char *createStatisticsCommand = pg_get_statisticsobj_worker(statisticsId, + false); createStatisticsCommandList = lappend( createStatisticsCommandList, diff --git a/src/backend/distributed/deparser/deparse_statistics_stmts.c b/src/backend/distributed/deparser/deparse_statistics_stmts.c index 58fd8f736..117b19709 100644 --- a/src/backend/distributed/deparser/deparse_statistics_stmts.c +++ b/src/backend/distributed/deparser/deparse_statistics_stmts.c @@ -26,6 +26,7 @@ static void AppendStatisticsName(StringInfo buf, CreateStatsStmt *stmt); static void AppendStatTypes(StringInfo buf, CreateStatsStmt *stmt); static void AppendColumnNames(StringInfo buf, CreateStatsStmt *stmt); static void AppendTableName(StringInfo buf, CreateStatsStmt *stmt); +static void AppendAlterStatisticsRenameStmt(StringInfo buf, RenameStmt *stmt); char * DeparseCreateStatisticsStmt(Node *node) @@ -53,6 +54,20 @@ DeparseDropStatisticsStmt(List *nameList, bool ifExists) } +char * +DeparseAlterStatisticsRenameStmt(Node *node) +{ + RenameStmt *stmt = castNode(RenameStmt, node); + + StringInfoData str; + initStringInfo(&str); + + AppendAlterStatisticsRenameStmt(&str, stmt); + + return str.data; +} + + static void AppendCreateStatisticsStmt(StringInfo buf, CreateStatsStmt *stmt) { @@ -74,8 +89,6 @@ AppendCreateStatisticsStmt(StringInfo buf, CreateStatsStmt *stmt) appendStringInfoString(buf, " FROM "); AppendTableName(buf, stmt); - - appendStringInfoString(buf, ";"); } @@ -93,6 +106,14 @@ AppendDropStatisticsStmt(StringInfo buf, List *nameList, bool ifExists) } +static void +AppendAlterStatisticsRenameStmt(StringInfo buf, RenameStmt *stmt) +{ + appendStringInfo(buf, "ALTER STATISTICS %s RENAME TO %s", + NameListToQuotedString((List *) stmt->object), stmt->newname); +} + 
+ static void AppendStatisticsName(StringInfo buf, CreateStatsStmt *stmt) { diff --git a/src/backend/distributed/deparser/qualify_statistics_stmt.c b/src/backend/distributed/deparser/qualify_statistics_stmt.c index 9e6d97fa8..8947f2f5d 100644 --- a/src/backend/distributed/deparser/qualify_statistics_stmt.c +++ b/src/backend/distributed/deparser/qualify_statistics_stmt.c @@ -78,3 +78,24 @@ QualifyDropStatisticsStmt(Node *node) dropStatisticsStmt->objects = objectNameListWithSchema; } + + +/* + * QualifyAlterStatisticsRenameStmt qualifies RenameStmt's with schema name for + * ALTER STATISTICS RENAME statements. + */ +void +QualifyAlterStatisticsRenameStmt(Node *node) +{ + RenameStmt *renameStmt = castNode(RenameStmt, node); + Assert(renameStmt->renameType == OBJECT_STATISTIC_EXT); + + List *nameList = (List *) renameStmt->object; + if (list_length(nameList) == 1) + { + RangeVar *stat = makeRangeVarFromNameList(nameList); + Oid schemaOid = RangeVarGetCreationNamespace(stat); + stat->schemaname = get_namespace_name(schemaOid); + renameStmt->object = (Node *) MakeNameListFromRangeVar(stat); + } +} diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index fd9f4de71..5483966b9 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -530,6 +530,17 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) { RenamePolicyEventExtendNames(renameStmt, schemaName, shardId); } + else if (objectType == OBJECT_STATISTIC_EXT) + { + RangeVar *stat = makeRangeVarFromNameList((List *) renameStmt->object); + + AppendShardIdToName(&stat->relname, shardId); + AppendShardIdToName(&renameStmt->newname, shardId); + + SetSchemaNameIfNotExist(&stat->schemaname, schemaName); + + renameStmt->object = (Node *) MakeNameListFromRangeVar(stat); + } else { ereport(WARNING, (errmsg("unsafe object type in rename statement"), diff --git 
a/src/include/distributed/commands.h b/src/include/distributed/commands.h index b33e5486e..cde9193d2 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -282,6 +282,7 @@ extern List * PreprocessCreateStatisticsStmt(Node *node, const char *queryString extern List * PostprocessCreateStatisticsStmt(Node *node, const char *queryString); extern ObjectAddress CreateStatisticsStmtObjectAddress(Node *node, bool missingOk); extern List * PreprocessDropStatisticsStmt(Node *node, const char *queryString); +extern List * PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString); extern List * GetExplicitStatisticsCommandList(Oid relationId); extern List * GetExplicitStatisticsSchemaIdList(Oid relationId); diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index d49797f05..9f152e880 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -58,9 +58,11 @@ extern char * DeparseAlterSchemaRenameStmt(Node *stmt); /* forward declarations for deparse_statistics_stmts.c */ extern char * DeparseCreateStatisticsStmt(Node *node); extern char * DeparseDropStatisticsStmt(List *nameList, bool ifExists); +extern char * DeparseAlterStatisticsRenameStmt(Node *node); extern void QualifyCreateStatisticsStmt(Node *node); extern void QualifyDropStatisticsStmt(Node *node); +extern void QualifyAlterStatisticsRenameStmt(Node *node); /* forward declarations for deparse_type_stmts.c */ extern char * DeparseCompositeTypeStmt(Node *stmt); diff --git a/src/test/regress/expected/propagate_statistics.out b/src/test/regress/expected/propagate_statistics.out index 2a69cec89..0cd0825a4 100644 --- a/src/test/regress/expected/propagate_statistics.out +++ b/src/test/regress/expected/propagate_statistics.out @@ -70,6 +70,12 @@ DROP STATISTICS IF EXISTS s3, sc2.s4, s6; DROP STATISTICS s5,s6; ERROR: statistics object "statistics'Test.s6" does not exist DROP STATISTICS IF EXISTS s5,s5,s6,s6; +-- 
test renaming statistics +CREATE STATISTICS s6 ON a,b FROM test_stats4; +DROP STATISTICS s7; +ERROR: statistics object "statistics'Test.s7" does not exist +ALTER STATISTICS s6 RENAME TO s7; +DROP STATISTICS s7; \c - - - :worker_1_port SELECT stxname FROM pg_statistic_ext diff --git a/src/test/regress/sql/propagate_statistics.sql b/src/test/regress/sql/propagate_statistics.sql index 1c46b9644..7a4dbcc94 100644 --- a/src/test/regress/sql/propagate_statistics.sql +++ b/src/test/regress/sql/propagate_statistics.sql @@ -57,6 +57,12 @@ DROP STATISTICS IF EXISTS s3, sc2.s4, s6; DROP STATISTICS s5,s6; DROP STATISTICS IF EXISTS s5,s5,s6,s6; +-- test renaming statistics +CREATE STATISTICS s6 ON a,b FROM test_stats4; +DROP STATISTICS s7; +ALTER STATISTICS s6 RENAME TO s7; +DROP STATISTICS s7; + \c - - - :worker_1_port SELECT stxname FROM pg_statistic_ext From 5a1607b6c082cca1e60071eec1632f027c8566b3 Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Wed, 23 Dec 2020 15:56:33 +0300 Subject: [PATCH 2/8] Propagate alter stats schema --- .../commands/distribute_object_ops.c | 24 +++-- src/backend/distributed/commands/statistics.c | 90 +++++++++++++++++++ .../deparser/deparse_statistics_stmts.c | 29 +++++- .../deparser/qualify_statistics_stmt.c | 21 +++++ .../distributed/relay/relay_event_utility.c | 30 +++++-- src/include/distributed/commands.h | 3 + src/include/distributed/deparser.h | 2 + .../regress/expected/propagate_statistics.out | 6 +- src/test/regress/sql/propagate_statistics.sql | 7 +- 9 files changed, 196 insertions(+), 16 deletions(-) diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index 77d9b2659..7e140a13d 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -414,12 +414,12 @@ static DistributeObjectOps Schema_Rename = { .postprocess = NULL, .address = AlterSchemaRenameStmtObjectAddress, }; -static 
DistributeObjectOps Statistics_Rename = { - .deparse = DeparseAlterStatisticsRenameStmt, - .qualify = QualifyAlterStatisticsRenameStmt, - .preprocess = PreprocessAlterStatisticsRenameStmt, - .postprocess = NULL, - .address = NULL, +static DistributeObjectOps Statistics_AlterObjectSchema = { + .deparse = DeparseAlterStatisticsSchemaStmt, + .qualify = QualifyAlterStatisticsSchemaStmt, + .preprocess = PreprocessAlterStatisticsSchemaStmt, + .postprocess = PostprocessAlterStatisticsSchemaStmt, + .address = AlterStatisticsSchemaStmtObjectAddress, }; static DistributeObjectOps Statistics_Drop = { .deparse = NULL, @@ -428,6 +428,13 @@ static DistributeObjectOps Statistics_Drop = { .postprocess = NULL, .address = NULL, }; +static DistributeObjectOps Statistics_Rename = { + .deparse = DeparseAlterStatisticsRenameStmt, + .qualify = QualifyAlterStatisticsRenameStmt, + .preprocess = PreprocessAlterStatisticsRenameStmt, + .postprocess = NULL, + .address = NULL, +}; static DistributeObjectOps Table_AlterTable = { .deparse = NULL, .qualify = NULL, @@ -597,6 +604,11 @@ GetDistributeObjectOps(Node *node) return &Routine_AlterObjectSchema; } + case OBJECT_STATISTIC_EXT: + { + return &Statistics_AlterObjectSchema; + } + case OBJECT_TABLE: { return &Table_AlterObjectSchema; diff --git a/src/backend/distributed/commands/statistics.c b/src/backend/distributed/commands/statistics.c index 2d214bfc7..7db7e06c4 100644 --- a/src/backend/distributed/commands/statistics.c +++ b/src/backend/distributed/commands/statistics.c @@ -228,6 +228,96 @@ PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString) } +/* + * PreprocessAlterStatisticsSchemaStmt is called during the planning phase for + * ALTER STATISTICS SET SCHEMA. 
+ */ +List * +PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString) +{ + AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); + Assert(stmt->objectType == OBJECT_STATISTIC_EXT); + + Oid statsOid = get_statistics_object_oid((List *) stmt->object, false); + Oid relationId = GetRelIdByStatsOid(statsOid); + + if (!IsCitusTable(relationId) || !ShouldPropagate()) + { + return NIL; + } + + EnsureCoordinator(); + + QualifyTreeNode((Node *) stmt); + + char *ddlCommand = DeparseTreeNode((Node *) stmt); + + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + + ddlJob->targetRelationId = relationId; + ddlJob->concurrentIndexCmd = false; + ddlJob->startNewTransaction = false; + ddlJob->commandString = ddlCommand; + ddlJob->taskList = DDLTaskList(relationId, ddlCommand); + + List *ddlJobs = list_make1(ddlJob); + + return ddlJobs; +} + + +/* + * PostprocessAlterStatisticsSchemaStmt is called after an ALTER STATISTICS SET SCHEMA + * command has been executed by standard process utility. + */ +List * +PostprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString) +{ + AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); + Assert(stmt->objectType == OBJECT_STATISTIC_EXT); + + Value *statName = llast((List *) stmt->object); + Oid statsOid = get_statistics_object_oid(list_make2(makeString(stmt->newschema), + statName), false); + Oid relationId = GetRelIdByStatsOid(statsOid); + + if (!IsCitusTable(relationId) || !ShouldPropagate()) + { + return NIL; + } + + bool missingOk = false; + ObjectAddress objectAddress = GetObjectAddressFromParseTree((Node *) stmt, missingOk); + + EnsureDependenciesExistOnAllNodes(&objectAddress); + + return NIL; +} + + +/* + * AlterStatisticsSchemaStmtObjectAddress finds the ObjectAddress for the statistics + * that is altered by given AlterObjectSchemaStmt. If missingOk is false and if + * the statistics does not exist, then it errors out.
+ * + * Never returns NULL, but the objid in the address can be invalid if missingOk + * was set to true. + */ +ObjectAddress +AlterStatisticsSchemaStmtObjectAddress(Node *node, bool missingOk) +{ + AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); + + ObjectAddress address = { 0 }; + Value *statName = llast((List *) stmt->object); + Oid statsOid = get_statistics_object_oid(list_make2(makeString(stmt->newschema), + statName), false); + ObjectAddressSet(address, StatisticExtRelationId, statsOid); + + return address; +} + + /* * GetExplicitStatisticsCommandList returns the list of DDL commands to create * statistics that are explicitly created for the table with relationId. See diff --git a/src/backend/distributed/deparser/deparse_statistics_stmts.c b/src/backend/distributed/deparser/deparse_statistics_stmts.c index 117b19709..b085f027f 100644 --- a/src/backend/distributed/deparser/deparse_statistics_stmts.c +++ b/src/backend/distributed/deparser/deparse_statistics_stmts.c @@ -22,11 +22,12 @@ static void AppendCreateStatisticsStmt(StringInfo buf, CreateStatsStmt *stmt); static void AppendDropStatisticsStmt(StringInfo buf, List *nameList, bool ifExists); +static void AppendAlterStatisticsRenameStmt(StringInfo buf, RenameStmt *stmt); +static void AppendAlterStatisticsSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt); static void AppendStatisticsName(StringInfo buf, CreateStatsStmt *stmt); static void AppendStatTypes(StringInfo buf, CreateStatsStmt *stmt); static void AppendColumnNames(StringInfo buf, CreateStatsStmt *stmt); static void AppendTableName(StringInfo buf, CreateStatsStmt *stmt); -static void AppendAlterStatisticsRenameStmt(StringInfo buf, RenameStmt *stmt); char * DeparseCreateStatisticsStmt(Node *node) @@ -68,6 +69,20 @@ DeparseAlterStatisticsRenameStmt(Node *node) } +char * +DeparseAlterStatisticsSchemaStmt(Node *node) +{ + AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); + + StringInfoData str; + 
initStringInfo(&str); + + AppendAlterStatisticsSchemaStmt(&str, stmt); + + return str.data; +} + + static void AppendCreateStatisticsStmt(StringInfo buf, CreateStatsStmt *stmt) { @@ -110,7 +125,17 @@ static void AppendAlterStatisticsRenameStmt(StringInfo buf, RenameStmt *stmt) { appendStringInfo(buf, "ALTER STATISTICS %s RENAME TO %s", - NameListToQuotedString((List *) stmt->object), stmt->newname); + NameListToQuotedString((List *) stmt->object), quote_identifier( + stmt->newname)); +} + + +static void +AppendAlterStatisticsSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt) +{ + appendStringInfo(buf, "ALTER STATISTICS %s SET SCHEMA %s", + NameListToQuotedString((List *) stmt->object), quote_identifier( + stmt->newschema)); } diff --git a/src/backend/distributed/deparser/qualify_statistics_stmt.c b/src/backend/distributed/deparser/qualify_statistics_stmt.c index 8947f2f5d..eb7d19080 100644 --- a/src/backend/distributed/deparser/qualify_statistics_stmt.c +++ b/src/backend/distributed/deparser/qualify_statistics_stmt.c @@ -99,3 +99,24 @@ QualifyAlterStatisticsRenameStmt(Node *node) renameStmt->object = (Node *) MakeNameListFromRangeVar(stat); } } + + +/* + * QualifyAlterStatisticsSchemaStmt qualifies AlterObjectSchemaStmt's with schema name + * for ALTER STATISTICS SET SCHEMA statements.
+ */ +void +QualifyAlterStatisticsSchemaStmt(Node *node) +{ + AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); + Assert(stmt->objectType == OBJECT_STATISTIC_EXT); + + List *nameList = (List *) stmt->object; + if (list_length(nameList) == 1) + { + RangeVar *stat = makeRangeVarFromNameList(nameList); + Oid schemaOid = RangeVarGetCreationNamespace(stat); + stat->schemaname = get_namespace_name(schemaOid); + stmt->object = (Node *) MakeNameListFromRangeVar(stat); + } +} diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index 5483966b9..4d0a9104e 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -82,14 +82,32 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) { AlterObjectSchemaStmt *alterObjectSchemaStmt = (AlterObjectSchemaStmt *) parseTree; - char **relationName = &(alterObjectSchemaStmt->relation->relname); - char **relationSchemaName = &(alterObjectSchemaStmt->relation->schemaname); + ObjectType objectType = alterObjectSchemaStmt->objectType; - /* prefix with schema name if it is not added already */ - SetSchemaNameIfNotExist(relationSchemaName, schemaName); + if (objectType == OBJECT_STATISTIC_EXT) + { + RangeVar *stat = makeRangeVarFromNameList( + (List *) alterObjectSchemaStmt->object); + + /* set schema name and append shard id */ + SetSchemaNameIfNotExist(&stat->schemaname, schemaName); + AppendShardIdToName(&stat->relname, shardId); + + alterObjectSchemaStmt->object = (Node *) MakeNameListFromRangeVar(stat); + } + else + { + char **relationName = &(alterObjectSchemaStmt->relation->relname); + char **relationSchemaName = + &(alterObjectSchemaStmt->relation->schemaname); + + /* prefix with schema name if it is not added already */ + SetSchemaNameIfNotExist(relationSchemaName, schemaName); + + /* append shardId to base relation name */ + AppendShardIdToName(relationName, 
shardId); + } - /* append shardId to base relation name */ - AppendShardIdToName(relationName, shardId); break; } diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index cde9193d2..054ea5fc0 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -283,6 +283,9 @@ extern List * PostprocessCreateStatisticsStmt(Node *node, const char *queryStrin extern ObjectAddress CreateStatisticsStmtObjectAddress(Node *node, bool missingOk); extern List * PreprocessDropStatisticsStmt(Node *node, const char *queryString); extern List * PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString); +extern List * PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString); +extern List * PostprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString); +extern ObjectAddress AlterStatisticsSchemaStmtObjectAddress(Node *node, bool missingOk); extern List * GetExplicitStatisticsCommandList(Oid relationId); extern List * GetExplicitStatisticsSchemaIdList(Oid relationId); diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index 9f152e880..59227a658 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -59,10 +59,12 @@ extern char * DeparseAlterSchemaRenameStmt(Node *stmt); extern char * DeparseCreateStatisticsStmt(Node *node); extern char * DeparseDropStatisticsStmt(List *nameList, bool ifExists); extern char * DeparseAlterStatisticsRenameStmt(Node *node); +extern char * DeparseAlterStatisticsSchemaStmt(Node *node); extern void QualifyCreateStatisticsStmt(Node *node); extern void QualifyDropStatisticsStmt(Node *node); extern void QualifyAlterStatisticsRenameStmt(Node *node); +extern void QualifyAlterStatisticsSchemaStmt(Node *node); /* forward declarations for deparse_type_stmts.c */ extern char * DeparseCompositeTypeStmt(Node *stmt); diff --git a/src/test/regress/expected/propagate_statistics.out 
b/src/test/regress/expected/propagate_statistics.out index 0cd0825a4..9516d0b4a 100644 --- a/src/test/regress/expected/propagate_statistics.out +++ b/src/test/regress/expected/propagate_statistics.out @@ -75,7 +75,10 @@ CREATE STATISTICS s6 ON a,b FROM test_stats4; DROP STATISTICS s7; ERROR: statistics object "statistics'Test.s7" does not exist ALTER STATISTICS s6 RENAME TO s7; -DROP STATISTICS s7; +-- test altering stats schema +CREATE SCHEMA test_alter_schema; +ALTER STATISTICS s7 SET SCHEMA test_alter_schema; +DROP STATISTICS test_alter_schema.s7; \c - - - :worker_1_port SELECT stxname FROM pg_statistic_ext @@ -168,5 +171,6 @@ WHERE stxnamespace IN ( \c - - - :master_port SET client_min_messages TO WARNING; DROP SCHEMA "statistics'Test" CASCADE; +DROP SCHEMA test_alter_schema CASCADE; DROP SCHEMA sc1 CASCADE; DROP SCHEMA sc2 CASCADE; diff --git a/src/test/regress/sql/propagate_statistics.sql b/src/test/regress/sql/propagate_statistics.sql index 7a4dbcc94..ffdd3dd29 100644 --- a/src/test/regress/sql/propagate_statistics.sql +++ b/src/test/regress/sql/propagate_statistics.sql @@ -61,7 +61,11 @@ DROP STATISTICS IF EXISTS s5,s5,s6,s6; CREATE STATISTICS s6 ON a,b FROM test_stats4; DROP STATISTICS s7; ALTER STATISTICS s6 RENAME TO s7; -DROP STATISTICS s7; + +-- test altering stats schema +CREATE SCHEMA test_alter_schema; +ALTER STATISTICS s7 SET SCHEMA test_alter_schema; +DROP STATISTICS test_alter_schema.s7; \c - - - :worker_1_port SELECT stxname @@ -84,5 +88,6 @@ WHERE stxnamespace IN ( \c - - - :master_port SET client_min_messages TO WARNING; DROP SCHEMA "statistics'Test" CASCADE; +DROP SCHEMA test_alter_schema CASCADE; DROP SCHEMA sc1 CASCADE; DROP SCHEMA sc2 CASCADE; From f7c70f9a63f53efb988eef0389946b1c38248ff3 Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Wed, 23 Dec 2020 17:45:55 +0300 Subject: [PATCH 3/8] Propagate alter stats target --- .../commands/distribute_object_ops.c | 17 ++++ src/backend/distributed/commands/statistics.c | 43 ++++++++++ 
.../deparser/deparse_statistics_stmts.c | 33 ++++++++ .../deparser/qualify_statistics_stmt.c | 24 ++++++ .../distributed/relay/relay_event_utility.c | 15 ++++ src/include/distributed/commands.h | 1 + src/include/distributed/deparser.h | 2 + .../regress/expected/propagate_statistics.out | 81 +++++++++++++++++++ src/test/regress/sql/propagate_statistics.sql | 14 ++++ 9 files changed, 230 insertions(+) diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index 7e140a13d..258986327 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -14,6 +14,7 @@ #include "distributed/commands.h" #include "distributed/deparser.h" +#include "distributed/pg_version_constants.h" static DistributeObjectOps NoDistributeOps = { .deparse = NULL, @@ -414,6 +415,15 @@ static DistributeObjectOps Schema_Rename = { .postprocess = NULL, .address = AlterSchemaRenameStmtObjectAddress, }; +#if PG_VERSION_NUM >= PG_VERSION_13 +static DistributeObjectOps Statistics_Alter = { + .deparse = DeparseAlterStatisticsStmt, + .qualify = QualifyAlterStatisticsStmt, + .preprocess = PreprocessAlterStatisticsStmt, + .postprocess = NULL, + .address = NULL, +}; +#endif static DistributeObjectOps Statistics_AlterObjectSchema = { .deparse = DeparseAlterStatisticsSchemaStmt, .qualify = QualifyAlterStatisticsSchemaStmt, @@ -683,6 +693,13 @@ GetDistributeObjectOps(Node *node) return &Any_AlterRoleSet; } +#if PG_VERSION_NUM >= PG_VERSION_13 + case T_AlterStatsStmt: + { + return &Statistics_Alter; + } + +#endif case T_AlterTableStmt: { AlterTableStmt *stmt = castNode(AlterTableStmt, node); diff --git a/src/backend/distributed/commands/statistics.c b/src/backend/distributed/commands/statistics.c index 7db7e06c4..95c63d900 100644 --- a/src/backend/distributed/commands/statistics.c +++ b/src/backend/distributed/commands/statistics.c @@ -318,6 +318,49 @@ 
AlterStatisticsSchemaStmtObjectAddress(Node *node, bool missingOk) } +#if PG_VERSION_NUM >= PG_VERSION_13 + +/* + * PreprocessAlterStatisticsStmt is called during the planning phase for + * ALTER STATISTICS .. SET STATISTICS, before the command has been executed + * by standard process utility. + */ +List * +PreprocessAlterStatisticsStmt(Node *node, const char *queryString) +{ + AlterStatsStmt *stmt = castNode(AlterStatsStmt, node); + + Oid statsOid = get_statistics_object_oid(stmt->defnames, false); + Oid relationId = GetRelIdByStatsOid(statsOid); + + if (!IsCitusTable(relationId) || !ShouldPropagate()) + { + return NIL; + } + + EnsureCoordinator(); + + QualifyTreeNode((Node *) stmt); + + char *ddlCommand = DeparseTreeNode((Node *) stmt); + + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + + ddlJob->targetRelationId = relationId; + ddlJob->concurrentIndexCmd = false; + ddlJob->startNewTransaction = false; + ddlJob->commandString = ddlCommand; + ddlJob->taskList = DDLTaskList(relationId, ddlCommand); + + List *ddlJobs = list_make1(ddlJob); + + return ddlJobs; +} + + +#endif + + /* * GetExplicitStatisticsCommandList returns the list of DDL commands to create * statistics that are explicitly created for the table with relationId.
See diff --git a/src/backend/distributed/deparser/deparse_statistics_stmts.c b/src/backend/distributed/deparser/deparse_statistics_stmts.c index b085f027f..1473980db 100644 --- a/src/backend/distributed/deparser/deparse_statistics_stmts.c +++ b/src/backend/distributed/deparser/deparse_statistics_stmts.c @@ -24,6 +24,9 @@ static void AppendCreateStatisticsStmt(StringInfo buf, CreateStatsStmt *stmt); static void AppendDropStatisticsStmt(StringInfo buf, List *nameList, bool ifExists); static void AppendAlterStatisticsRenameStmt(StringInfo buf, RenameStmt *stmt); static void AppendAlterStatisticsSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt); +#if PG_VERSION_NUM >= PG_VERSION_13 +static void AppendAlterStatisticsStmt(StringInfo buf, AlterStatsStmt *stmt); +#endif static void AppendStatisticsName(StringInfo buf, CreateStatsStmt *stmt); static void AppendStatTypes(StringInfo buf, CreateStatsStmt *stmt); static void AppendColumnNames(StringInfo buf, CreateStatsStmt *stmt); @@ -83,6 +86,24 @@ DeparseAlterStatisticsSchemaStmt(Node *node) } +#if PG_VERSION_NUM >= PG_VERSION_13 +char * +DeparseAlterStatisticsStmt(Node *node) +{ + AlterStatsStmt *stmt = castNode(AlterStatsStmt, node); + + StringInfoData str; + initStringInfo(&str); + + AppendAlterStatisticsStmt(&str, stmt); + + return str.data; +} + + +#endif + + static void AppendCreateStatisticsStmt(StringInfo buf, CreateStatsStmt *stmt) { @@ -139,6 +160,18 @@ AppendAlterStatisticsSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt) } +#if PG_VERSION_NUM >= PG_VERSION_13 +static void +AppendAlterStatisticsStmt(StringInfo buf, AlterStatsStmt *stmt) +{ + appendStringInfo(buf, "ALTER STATISTICS %s SET STATISTICS %d", NameListToQuotedString( + stmt->defnames), stmt->stxstattarget); +} + + +#endif + + static void AppendStatisticsName(StringInfo buf, CreateStatsStmt *stmt) { diff --git a/src/backend/distributed/deparser/qualify_statistics_stmt.c b/src/backend/distributed/deparser/qualify_statistics_stmt.c index 
eb7d19080..ad60b42a3 100644 --- a/src/backend/distributed/deparser/qualify_statistics_stmt.c +++ b/src/backend/distributed/deparser/qualify_statistics_stmt.c @@ -120,3 +120,27 @@ QualifyAlterStatisticsSchemaStmt(Node *node) stmt->object = (Node *) MakeNameListFromRangeVar(stat); } } + + +#if PG_VERSION_NUM >= PG_VERSION_13 + +/* + * QualifyAlterStatisticsStmt qualifies AlterStatsStmt's with schema name for + * ALTER STATISTICS .. SET STATISTICS statements. + */ +void +QualifyAlterStatisticsStmt(Node *node) +{ + AlterStatsStmt *stmt = castNode(AlterStatsStmt, node); + + if (list_length(stmt->defnames) == 1) + { + RangeVar *stat = makeRangeVarFromNameList(stmt->defnames); + Oid schemaOid = RangeVarGetCreationNamespace(stat); + stat->schemaname = get_namespace_name(schemaOid); + stmt->defnames = MakeNameListFromRangeVar(stat); + } +} + + +#endif diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index 4d0a9104e..6db69297b 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -111,6 +111,21 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) break; } +#if PG_VERSION_NUM >= PG_VERSION_13 + case T_AlterStatsStmt: + { + AlterStatsStmt *alterStatsStmt = (AlterStatsStmt *) parseTree; + RangeVar *stat = makeRangeVarFromNameList(alterStatsStmt->defnames); + + AppendShardIdToName(&stat->relname, shardId); + SetSchemaNameIfNotExist(&stat->schemaname, schemaName); + + alterStatsStmt->defnames = MakeNameListFromRangeVar(stat); + + break; + } +#endif + case T_AlterTableStmt: { /* diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index 054ea5fc0..e25aee2d2 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -286,6 +286,7 @@ extern List * PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryS extern List *
PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString); extern List * PostprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString); extern ObjectAddress AlterStatisticsSchemaStmtObjectAddress(Node *node, bool missingOk); +extern List * PreprocessAlterStatisticsStmt(Node *node, const char *queryString); extern List * GetExplicitStatisticsCommandList(Oid relationId); extern List * GetExplicitStatisticsSchemaIdList(Oid relationId); diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index 59227a658..7ff42b6e3 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -60,11 +60,13 @@ extern char * DeparseCreateStatisticsStmt(Node *node); extern char * DeparseDropStatisticsStmt(List *nameList, bool ifExists); extern char * DeparseAlterStatisticsRenameStmt(Node *node); extern char * DeparseAlterStatisticsSchemaStmt(Node *node); +extern char * DeparseAlterStatisticsStmt(Node *node); extern void QualifyCreateStatisticsStmt(Node *node); extern void QualifyDropStatisticsStmt(Node *node); extern void QualifyAlterStatisticsRenameStmt(Node *node); extern void QualifyAlterStatisticsSchemaStmt(Node *node); +extern void QualifyAlterStatisticsStmt(Node *node); /* forward declarations for deparse_type_stmts.c */ extern char * DeparseCompositeTypeStmt(Node *stmt); diff --git a/src/test/regress/expected/propagate_statistics.out b/src/test/regress/expected/propagate_statistics.out index 9516d0b4a..18367b5d0 100644 --- a/src/test/regress/expected/propagate_statistics.out +++ b/src/test/regress/expected/propagate_statistics.out @@ -79,6 +79,11 @@ ALTER STATISTICS s6 RENAME TO s7; CREATE SCHEMA test_alter_schema; ALTER STATISTICS s7 SET SCHEMA test_alter_schema; DROP STATISTICS test_alter_schema.s7; +-- test altering stats target +ALTER STATISTICS s1 SET STATISTICS 3; +-- since max value for target is 10000, this will automatically be lowered +ALTER STATISTICS s2 SET STATISTICS 999999; 
+WARNING: lowering statistics target to 10000 \c - - - :worker_1_port SELECT stxname FROM pg_statistic_ext @@ -168,6 +173,82 @@ WHERE stxnamespace IN ( 3 (1 row) +SELECT stxstattarget +FROM pg_statistic_ext +WHERE stxnamespace IN ( + SELECT oid + FROM pg_namespace + WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2') +) +ORDER BY stxstattarget ASC; + stxstattarget +--------------------------------------------------------------------- + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + -1 + 3 + 3 + 3 + 3 + 3 + 3 + 3 + 3 + 3 + 3 + 3 + 3 + 3 + 3 + 3 + 3 + 10000 + 10000 + 10000 + 10000 + 10000 + 10000 + 10000 + 10000 + 10000 + 10000 + 10000 + 10000 + 10000 + 10000 + 10000 + 10000 +(64 rows) + \c - - - :master_port SET client_min_messages TO WARNING; DROP SCHEMA "statistics'Test" CASCADE; diff --git a/src/test/regress/sql/propagate_statistics.sql b/src/test/regress/sql/propagate_statistics.sql index ffdd3dd29..dda73c368 100644 --- a/src/test/regress/sql/propagate_statistics.sql +++ b/src/test/regress/sql/propagate_statistics.sql @@ -67,6 +67,11 @@ CREATE SCHEMA test_alter_schema; ALTER STATISTICS s7 SET SCHEMA test_alter_schema; DROP STATISTICS test_alter_schema.s7; +-- test altering stats target +ALTER STATISTICS s1 SET STATISTICS 3; +-- since max value for target is 10000, this will automatically be lowered +ALTER STATISTICS s2 SET STATISTICS 999999; + \c - - - :worker_1_port SELECT stxname FROM pg_statistic_ext @@ -85,6 +90,15 @@ WHERE stxnamespace IN ( WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2') ); +SELECT stxstattarget +FROM pg_statistic_ext +WHERE stxnamespace IN ( + SELECT oid + FROM pg_namespace + WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2') +) +ORDER BY stxstattarget ASC; + \c - - - :master_port SET client_min_messages TO WARNING; DROP SCHEMA "statistics'Test" CASCADE; From 48ca1637a465062f8c5a68e5f7b9205c6eddbbca 
Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Wed, 23 Dec 2020 19:28:41 +0300 Subject: [PATCH 4/8] Propagate alter stats owner --- .../commands/distribute_object_ops.c | 12 ++++++ src/backend/distributed/commands/statistics.c | 40 ++++++++++++++++++- .../deparser/deparse_statistics_stmts.c | 22 ++++++++++ .../deparser/qualify_statistics_stmt.c | 24 ++++++++++- .../distributed/relay/relay_event_utility.c | 17 ++++++++ src/include/distributed/commands.h | 1 + src/include/distributed/deparser.h | 2 + 7 files changed, 114 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/commands/distribute_object_ops.c b/src/backend/distributed/commands/distribute_object_ops.c index 258986327..dd456ec4d 100644 --- a/src/backend/distributed/commands/distribute_object_ops.c +++ b/src/backend/distributed/commands/distribute_object_ops.c @@ -431,6 +431,13 @@ static DistributeObjectOps Statistics_AlterObjectSchema = { .postprocess = PostprocessAlterStatisticsSchemaStmt, .address = AlterStatisticsSchemaStmtObjectAddress, }; +static DistributeObjectOps Statistics_AlterOwner = { + .deparse = DeparseAlterStatisticsOwnerStmt, + .qualify = QualifyAlterStatisticsOwnerStmt, + .preprocess = PreprocessAlterStatisticsOwnerStmt, + .postprocess = NULL, + .address = NULL, +}; static DistributeObjectOps Statistics_Drop = { .deparse = NULL, .qualify = QualifyDropStatisticsStmt, @@ -666,6 +673,11 @@ GetDistributeObjectOps(Node *node) return &Routine_AlterOwner; } + case OBJECT_STATISTIC_EXT: + { + return &Statistics_AlterOwner; + } + case OBJECT_TYPE: { return &Type_AlterOwner; diff --git a/src/backend/distributed/commands/statistics.c b/src/backend/distributed/commands/statistics.c index 95c63d900..7476b8ca9 100644 --- a/src/backend/distributed/commands/statistics.c +++ b/src/backend/distributed/commands/statistics.c @@ -322,8 +322,7 @@ AlterStatisticsSchemaStmtObjectAddress(Node *node, bool missingOk) /* * PreprocessAlterStatisticsStmt is called during the planning phase for - * 
ALTER STATISTICS .. SET STATISTICS command has been executed - * by standard process utility. + * ALTER STATISTICS .. SET STATISTICS. */ List * PreprocessAlterStatisticsStmt(Node *node, const char *queryString) @@ -360,6 +359,43 @@ PreprocessAlterStatisticsStmt(Node *node, const char *queryString) #endif +/* + * PreprocessAlterStatisticsOwnerStmt is called during the planning phase for + * ALTER STATISTICS .. OWNER TO. + */ +List * +PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString) +{ + AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); + Assert(stmt->objectType == OBJECT_STATISTIC_EXT); + + Oid statsOid = get_statistics_object_oid((List *) stmt->object, false); + Oid relationId = GetRelIdByStatsOid(statsOid); + + if (!IsCitusTable(relationId) || !ShouldPropagate()) + { + return NIL; + } + + EnsureCoordinator(); + + QualifyTreeNode((Node *) stmt); + + char *ddlCommand = DeparseTreeNode((Node *) stmt); + + DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + + ddlJob->targetRelationId = relationId; + ddlJob->concurrentIndexCmd = false; + ddlJob->startNewTransaction = false; + ddlJob->commandString = ddlCommand; + ddlJob->taskList = DDLTaskList(relationId, ddlCommand); + + List *ddlJobs = list_make1(ddlJob); + + return ddlJobs; +} + /* * GetExplicitStatisticsCommandList returns the list of DDL commands to create diff --git a/src/backend/distributed/deparser/deparse_statistics_stmts.c b/src/backend/distributed/deparser/deparse_statistics_stmts.c index 1473980db..74350c766 100644 --- a/src/backend/distributed/deparser/deparse_statistics_stmts.c +++ b/src/backend/distributed/deparser/deparse_statistics_stmts.c @@ -27,6 +27,7 @@ static void AppendAlterStatisticsSchemaStmt(StringInfo buf, AlterObjectSchemaStm #if PG_VERSION_NUM >= PG_VERSION_13 static void AppendAlterStatisticsStmt(StringInfo buf, AlterStatsStmt *stmt); #endif +static void AppendAlterStatisticsOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt); static void AppendStatisticsName(StringInfo 
buf, CreateStatsStmt *stmt); static void AppendStatTypes(StringInfo buf, CreateStatsStmt *stmt); static void AppendColumnNames(StringInfo buf, CreateStatsStmt *stmt); @@ -102,6 +103,19 @@ DeparseAlterStatisticsStmt(Node *node) #endif +char * +DeparseAlterStatisticsOwnerStmt(Node *node) +{ + AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); + Assert(stmt->objectType == OBJECT_STATISTIC_EXT); + + StringInfoData str; + initStringInfo(&str); + + AppendAlterStatisticsOwnerStmt(&str, stmt); + + return str.data; +} static void @@ -170,6 +184,14 @@ AppendAlterStatisticsStmt(StringInfo buf, AlterStatsStmt *stmt) #endif +static void +AppendAlterStatisticsOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt) +{ + List *names = (List *) stmt->object; + appendStringInfo(buf, "ALTER STATISTICS %s OWNER TO %s;", NameListToQuotedString( + names), + RoleSpecString(stmt->newowner, true)); +} static void diff --git a/src/backend/distributed/deparser/qualify_statistics_stmt.c b/src/backend/distributed/deparser/qualify_statistics_stmt.c index ad60b42a3..2a8cb1eaa 100644 --- a/src/backend/distributed/deparser/qualify_statistics_stmt.c +++ b/src/backend/distributed/deparser/qualify_statistics_stmt.c @@ -102,7 +102,7 @@ QualifyAlterStatisticsRenameStmt(Node *node) /* - * QualifyAlterStatisticsSchemaStmt qualifies RenameStmt's with schema name for + * QualifyAlterStatisticsSchemaStmt qualifies AlterObjectSchemaStmt's with schema name for * ALTER STATISTICS RENAME statements. */ void @@ -125,7 +125,7 @@ QualifyAlterStatisticsSchemaStmt(Node *node) #if PG_VERSION_NUM >= PG_VERSION_13 /* - * QualifyAlterStatisticsStmt qualifies AlterObjectSchemaStmt's with schema name for + * QualifyAlterStatisticsStmt qualifies AlterStatsStmt's with schema name for * ALTER STATISTICS .. SET STATISTICS statements. */ void @@ -144,3 +144,23 @@ QualifyAlterStatisticsStmt(Node *node) #endif + +/* + * QualifyAlterStatisticsOwnerStmt qualifies AlterOwnerStmt's with schema name for + * ALTER STATISTICS ..
OWNER TO statements. + */ +void +QualifyAlterStatisticsOwnerStmt(Node *node) +{ + AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); + Assert(stmt->objectType == OBJECT_STATISTIC_EXT); + + List *nameList = (List *) stmt->object; + if (list_length(nameList) == 1) + { + RangeVar *stat = makeRangeVarFromNameList(nameList); + Oid schemaOid = RangeVarGetCreationNamespace(stat); + stat->schemaname = get_namespace_name(schemaOid); + stmt->object = (Node *) MakeNameListFromRangeVar(stat); + } +} diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index 6db69297b..6cc8ab1ca 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -213,6 +213,23 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) break; } + case T_AlterOwnerStmt: + { + AlterOwnerStmt *alterOwnerStmt = castNode(AlterOwnerStmt, parseTree); + + /* we currently extend names in alter owner statements only for statistics */ + Assert(alterOwnerStmt->objectType == OBJECT_STATISTIC_EXT); + + RangeVar *stat = makeRangeVarFromNameList((List *) alterOwnerStmt->object); + + AppendShardIdToName(&stat->relname, shardId); + SetSchemaNameIfNotExist(&stat->schemaname, schemaName); + + alterOwnerStmt->object = (Node *) MakeNameListFromRangeVar(stat); + + break; + } + case T_ClusterStmt: { ClusterStmt *clusterStmt = (ClusterStmt *) parseTree; diff --git a/src/include/distributed/commands.h b/src/include/distributed/commands.h index e25aee2d2..338f3f25c 100644 --- a/src/include/distributed/commands.h +++ b/src/include/distributed/commands.h @@ -287,6 +287,7 @@ extern List * PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryS extern List * PostprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString); extern ObjectAddress AlterStatisticsSchemaStmtObjectAddress(Node *node, bool missingOk); extern List * PreprocessAlterStatisticsStmt(Node 
*node, const char *queryString); +extern List * PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString); extern List * GetExplicitStatisticsCommandList(Oid relationId); extern List * GetExplicitStatisticsSchemaIdList(Oid relationId); diff --git a/src/include/distributed/deparser.h b/src/include/distributed/deparser.h index 7ff42b6e3..7e264544c 100644 --- a/src/include/distributed/deparser.h +++ b/src/include/distributed/deparser.h @@ -61,12 +61,14 @@ extern char * DeparseDropStatisticsStmt(List *nameList, bool ifExists); extern char * DeparseAlterStatisticsRenameStmt(Node *node); extern char * DeparseAlterStatisticsSchemaStmt(Node *node); extern char * DeparseAlterStatisticsStmt(Node *node); +extern char * DeparseAlterStatisticsOwnerStmt(Node *node); extern void QualifyCreateStatisticsStmt(Node *node); extern void QualifyDropStatisticsStmt(Node *node); extern void QualifyAlterStatisticsRenameStmt(Node *node); extern void QualifyAlterStatisticsSchemaStmt(Node *node); extern void QualifyAlterStatisticsStmt(Node *node); +extern void QualifyAlterStatisticsOwnerStmt(Node *node); /* forward declarations for deparse_type_stmts.c */ extern char * DeparseCompositeTypeStmt(Node *stmt); From d4bc17f6f0c8160695d576983d4969376a808a7e Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Thu, 24 Dec 2020 13:55:35 +0300 Subject: [PATCH 5/8] Propagate statistics with altered targets --- src/backend/distributed/commands/statistics.c | 63 ++++++++++++- .../deparser/deparse_statistics_stmts.c | 2 +- .../distributed/relay/relay_event_utility.c | 1 - .../regress/expected/propagate_statistics.out | 88 +++++++++++++++---- src/test/regress/sql/propagate_statistics.sql | 12 ++- 5 files changed, 142 insertions(+), 24 deletions(-) diff --git a/src/backend/distributed/commands/statistics.c b/src/backend/distributed/commands/statistics.c index 7476b8ca9..9a1797fc8 100644 --- a/src/backend/distributed/commands/statistics.c +++ b/src/backend/distributed/commands/statistics.c @@ 
-45,7 +45,9 @@ static List * GetExplicitStatisticsIdList(Oid relationId); static Oid GetRelIdByStatsOid(Oid statsOid); - +#if PG_VERSION_NUM >= PG_VERSION_13 +static char * CreateAlterCommandIfTargetNotDefault(Oid statsOid); +#endif /* * PreprocessCreateStatisticsStmt is called during the planning phase for @@ -311,7 +313,7 @@ AlterStatisticsSchemaStmtObjectAddress(Node *node, bool missingOk) ObjectAddress address = { 0 }; Value *statName = llast((List *) stmt->object); Oid statsOid = get_statistics_object_oid(list_make2(makeString(stmt->newschema), - statName), false); + statName), missingOk); ObjectAddressSet(address, StatisticExtRelationId, statsOid); return address; @@ -406,6 +408,7 @@ List * GetExplicitStatisticsCommandList(Oid relationId) { List *createStatisticsCommandList = NIL; + List *alterStatisticsCommandList = NIL; PushOverrideEmptySearchPath(CurrentMemoryContext); @@ -420,11 +423,25 @@ GetExplicitStatisticsCommandList(Oid relationId) createStatisticsCommandList = lappend( createStatisticsCommandList, makeTableDDLCommandString(createStatisticsCommand)); +#if PG_VERSION_NUM >= PG_VERSION_13 + char *alterStatisticsTargetCommand = CreateAlterCommandIfTargetNotDefault( + statisticsId); + + if (alterStatisticsTargetCommand != NULL) + { + alterStatisticsCommandList = lappend(alterStatisticsCommandList, + makeTableDDLCommandString( + alterStatisticsTargetCommand)); + } +#endif } /* revert back to original search_path */ PopOverrideSearchPath(); + createStatisticsCommandList = list_concat(createStatisticsCommandList, + alterStatisticsCommandList); + return createStatisticsCommandList; } @@ -540,3 +557,45 @@ GetRelIdByStatsOid(Oid statsOid) return statisticsForm->stxrelid; } + + +#if PG_VERSION_NUM >= PG_VERSION_13 + +/* + * CreateAlterCommandIfTargetNotDefault returns an ALTER STATISTICS .. SET STATISTICS + * command if the stats object with given id has a target different than the default one. + * Returns NULL otherwise. 
+ */ +static char * +CreateAlterCommandIfTargetNotDefault(Oid statsOid) +{ + HeapTuple tup = SearchSysCacheCopy1(STATEXTOID, ObjectIdGetDatum(statsOid)); + + if (!HeapTupleIsValid(tup)) + { + ereport(WARNING, (errmsg("No stats object found with id: %u", statsOid))); + return NULL; + } + + Form_pg_statistic_ext statisticsForm = (Form_pg_statistic_ext) GETSTRUCT(tup); + /* tup is a private copy, so statisticsForm stays valid for the reads below */ + + if (statisticsForm->stxstattarget == -1) + { + return NULL; + } + + AlterStatsStmt *alterStatsStmt = palloc0(sizeof(AlterStatsStmt)); + + char *schemaName = get_namespace_name(statisticsForm->stxnamespace); + char *statName = NameStr(statisticsForm->stxname); + + alterStatsStmt->type = T_AlterStatsStmt; + alterStatsStmt->stxstattarget = statisticsForm->stxstattarget; + alterStatsStmt->defnames = list_make2(makeString(schemaName), makeString(statName)); + + return DeparseAlterStatisticsStmt((Node *) alterStatsStmt); +} + + +#endif diff --git a/src/backend/distributed/deparser/deparse_statistics_stmts.c b/src/backend/distributed/deparser/deparse_statistics_stmts.c index 74350c766..f4658b0d7 100644 --- a/src/backend/distributed/deparser/deparse_statistics_stmts.c +++ b/src/backend/distributed/deparser/deparse_statistics_stmts.c @@ -188,7 +188,7 @@ static void AppendAlterStatisticsOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt) { List *names = (List *) stmt->object; - appendStringInfo(buf, "ALTER STATISTICS %s OWNER TO %s;", NameListToQuotedString( + appendStringInfo(buf, "ALTER STATISTICS %s OWNER TO %s", NameListToQuotedString( names), RoleSpecString(stmt->newowner, true)); } diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index 6cc8ab1ca..c1f114d79 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -223,7 +223,6 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) RangeVar *stat = makeRangeVarFromNameList((List *)
alterOwnerStmt->object); AppendShardIdToName(&stat->relname, shardId); - SetSchemaNameIfNotExist(&stat->schemaname, schemaName); alterOwnerStmt->object = (Node *) MakeNameListFromRangeVar(stat); diff --git a/src/test/regress/expected/propagate_statistics.out b/src/test/regress/expected/propagate_statistics.out index 18367b5d0..470a92826 100644 --- a/src/test/regress/expected/propagate_statistics.out +++ b/src/test/regress/expected/propagate_statistics.out @@ -75,15 +75,27 @@ CREATE STATISTICS s6 ON a,b FROM test_stats4; DROP STATISTICS s7; ERROR: statistics object "statistics'Test.s7" does not exist ALTER STATISTICS s6 RENAME TO s7; +ALTER STATISTICS sc1.st1 RENAME TO st1_new; -- test altering stats schema CREATE SCHEMA test_alter_schema; ALTER STATISTICS s7 SET SCHEMA test_alter_schema; -DROP STATISTICS test_alter_schema.s7; -- test altering stats target ALTER STATISTICS s1 SET STATISTICS 3; -- since max value for target is 10000, this will automatically be lowered ALTER STATISTICS s2 SET STATISTICS 999999; WARNING: lowering statistics target to 10000 +-- test alter target before distribution +CREATE TABLE targettable(a int, b int); +CREATE STATISTICS s8 ON a,b FROM targettable; +ALTER STATISTICS s8 SET STATISTICS 46; +SELECT create_distributed_table('targettable', 'b'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- test alter owner +ALTER STATISTICS s8 OWNER TO pg_monitor; \c - - - :worker_1_port SELECT stxname FROM pg_statistic_ext @@ -143,23 +155,39 @@ ORDER BY stxname ASC; s2_980058 s2_980060 s2_980062 - st1_980064 - st1_980066 - st1_980068 - st1_980070 - st1_980072 - st1_980074 - st1_980076 - st1_980078 - st1_980080 - st1_980082 - st1_980084 - st1_980086 - st1_980088 - st1_980090 - st1_980092 - st1_980094 -(64 rows) + s8_980129 + s8_980131 + s8_980133 + s8_980135 + s8_980137 + s8_980139 + s8_980141 + s8_980143 + s8_980145 + s8_980147 + s8_980149 + s8_980151 + s8_980153 + s8_980155 + 
s8_980157 + s8_980159 + st1_new_980064 + st1_new_980066 + st1_new_980068 + st1_new_980070 + st1_new_980072 + st1_new_980074 + st1_new_980076 + st1_new_980078 + st1_new_980080 + st1_new_980082 + st1_new_980084 + st1_new_980086 + st1_new_980088 + st1_new_980090 + st1_new_980092 + st1_new_980094 +(80 rows) SELECT count(DISTINCT stxnamespace) FROM pg_statistic_ext @@ -231,6 +259,22 @@ ORDER BY stxstattarget ASC; 3 3 3 + 46 + 46 + 46 + 46 + 46 + 46 + 46 + 46 + 46 + 46 + 46 + 46 + 46 + 46 + 46 + 46 10000 10000 10000 @@ -247,7 +291,13 @@ ORDER BY stxstattarget ASC; 10000 10000 10000 -(64 rows) +(80 rows) + +SELECT COUNT(DISTINCT stxowner) FROM pg_statistic_ext; + count +--------------------------------------------------------------------- + 2 +(1 row) \c - - - :master_port SET client_min_messages TO WARNING; diff --git a/src/test/regress/sql/propagate_statistics.sql b/src/test/regress/sql/propagate_statistics.sql index dda73c368..e9b78ee57 100644 --- a/src/test/regress/sql/propagate_statistics.sql +++ b/src/test/regress/sql/propagate_statistics.sql @@ -61,16 +61,24 @@ DROP STATISTICS IF EXISTS s5,s5,s6,s6; CREATE STATISTICS s6 ON a,b FROM test_stats4; DROP STATISTICS s7; ALTER STATISTICS s6 RENAME TO s7; +ALTER STATISTICS sc1.st1 RENAME TO st1_new; -- test altering stats schema CREATE SCHEMA test_alter_schema; ALTER STATISTICS s7 SET SCHEMA test_alter_schema; -DROP STATISTICS test_alter_schema.s7; -- test altering stats target ALTER STATISTICS s1 SET STATISTICS 3; -- since max value for target is 10000, this will automatically be lowered ALTER STATISTICS s2 SET STATISTICS 999999; +-- test alter target before distribution +CREATE TABLE targettable(a int, b int); +CREATE STATISTICS s8 ON a,b FROM targettable; +ALTER STATISTICS s8 SET STATISTICS 46; +SELECT create_distributed_table('targettable', 'b'); + +-- test alter owner +ALTER STATISTICS s8 OWNER TO pg_monitor; \c - - - :worker_1_port SELECT stxname @@ -99,6 +107,8 @@ WHERE stxnamespace IN ( ) ORDER BY stxstattarget 
ASC; +SELECT COUNT(DISTINCT stxowner) FROM pg_statistic_ext; + \c - - - :master_port SET client_min_messages TO WARNING; DROP SCHEMA "statistics'Test" CASCADE; From 5af585269a8de0119ff711d34d8b891e95c42b90 Mon Sep 17 00:00:00 2001 From: Ahmet Gedemenli Date: Thu, 24 Dec 2020 17:09:59 +0300 Subject: [PATCH 6/8] Add separate pg13 test for stats targets --- src/backend/distributed/commands/statistics.c | 74 +++++++-- .../deparser/deparse_statistics_stmts.c | 4 +- .../distributed/relay/relay_event_utility.c | 4 +- .../expected/pg13_propagate_statistics.out | 112 ++++++++++++++ .../expected/pg13_propagate_statistics_0.out | 6 + .../regress/expected/propagate_statistics.out | 143 ++++-------------- src/test/regress/multi_schedule | 1 + .../regress/sql/pg13_propagate_statistics.sql | 45 ++++++ src/test/regress/sql/propagate_statistics.sql | 24 +-- 9 files changed, 265 insertions(+), 148 deletions(-) create mode 100644 src/test/regress/expected/pg13_propagate_statistics.out create mode 100644 src/test/regress/expected/pg13_propagate_statistics_0.out create mode 100644 src/test/regress/sql/pg13_propagate_statistics.sql diff --git a/src/backend/distributed/commands/statistics.c b/src/backend/distributed/commands/statistics.c index 9a1797fc8..c922a6f31 100644 --- a/src/backend/distributed/commands/statistics.c +++ b/src/backend/distributed/commands/statistics.c @@ -37,6 +37,7 @@ #include "distributed/relation_access_tracking.h" #include "distributed/resource_lock.h" #include "distributed/worker_transaction.h" +#include "miscadmin.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" @@ -45,6 +46,7 @@ static List * GetExplicitStatisticsIdList(Oid relationId); static Oid GetRelIdByStatsOid(Oid statsOid); +static char * CreateAlterCommandIfOwnerNotDefault(Oid statsOid); #if PG_VERSION_NUM >= PG_VERSION_13 static char * CreateAlterCommandIfTargetNotDefault(Oid statsOid); #endif @@ -420,20 +422,33 @@ GetExplicitStatisticsCommandList(Oid 
relationId) char *createStatisticsCommand = pg_get_statisticsobj_worker(statisticsId, false); - createStatisticsCommandList = lappend( - createStatisticsCommandList, - makeTableDDLCommandString(createStatisticsCommand)); + createStatisticsCommandList = + lappend(createStatisticsCommandList, + makeTableDDLCommandString(createStatisticsCommand)); #if PG_VERSION_NUM >= PG_VERSION_13 - char *alterStatisticsTargetCommand = CreateAlterCommandIfTargetNotDefault( - statisticsId); + + /* we need to alter stats' target if it's getting distributed after creation */ + char *alterStatisticsTargetCommand = + CreateAlterCommandIfTargetNotDefault(statisticsId); if (alterStatisticsTargetCommand != NULL) { - alterStatisticsCommandList = lappend(alterStatisticsCommandList, - makeTableDDLCommandString( - alterStatisticsTargetCommand)); + alterStatisticsCommandList = + lappend(alterStatisticsCommandList, + makeTableDDLCommandString(alterStatisticsTargetCommand)); } #endif + + /* we need to alter stats' owner if it's getting distributed after creation */ + char *alterStatisticsOwnerCommand = + CreateAlterCommandIfOwnerNotDefault(statisticsId); + + if (alterStatisticsOwnerCommand != NULL) + { + alterStatisticsCommandList = + lappend(alterStatisticsCommandList, + makeTableDDLCommandString(alterStatisticsOwnerCommand)); + } } /* revert back to original search_path */ @@ -559,6 +574,46 @@ GetRelIdByStatsOid(Oid statsOid) } +/* + * CreateAlterCommandIfOwnerNotDefault returns an ALTER STATISTICS .. OWNER TO + * command if the stats object with given id has an owner different than the default one. + * Returns NULL otherwise. 
+ */ +static char * +CreateAlterCommandIfOwnerNotDefault(Oid statsOid) +{ + HeapTuple tup = SearchSysCacheCopy1(STATEXTOID, ObjectIdGetDatum(statsOid)); + + if (!HeapTupleIsValid(tup)) + { + ereport(WARNING, (errmsg("No stats object found with id: %u", statsOid))); + return NULL; + } + + Form_pg_statistic_ext statisticsForm = (Form_pg_statistic_ext) GETSTRUCT(tup); + /* tup is a private copy, so statisticsForm stays valid for the reads below */ + + if (statisticsForm->stxowner == GetUserId()) + { + return NULL; + } + + char *schemaName = get_namespace_name(statisticsForm->stxnamespace); + char *statName = NameStr(statisticsForm->stxname); + char *ownerName = GetUserNameFromId(statisticsForm->stxowner, false); + + StringInfoData str; + initStringInfo(&str); + + appendStringInfo(&str, "ALTER STATISTICS %s OWNER TO %s", + NameListToQuotedString(list_make2(makeString(schemaName), + makeString(statName))), + quote_identifier(ownerName)); + + return str.data; +} + + #if PG_VERSION_NUM >= PG_VERSION_13 /* @@ -585,12 +640,11 @@ CreateAlterCommandIfTargetNotDefault(Oid statsOid) return NULL; } - AlterStatsStmt *alterStatsStmt = palloc0(sizeof(AlterStatsStmt)); + AlterStatsStmt *alterStatsStmt = makeNode(AlterStatsStmt); char *schemaName = get_namespace_name(statisticsForm->stxnamespace); char *statName = NameStr(statisticsForm->stxname); - alterStatsStmt->type = T_AlterStatsStmt; alterStatsStmt->stxstattarget = statisticsForm->stxstattarget; alterStatsStmt->defnames = list_make2(makeString(schemaName), makeString(statName)); diff --git a/src/backend/distributed/deparser/deparse_statistics_stmts.c b/src/backend/distributed/deparser/deparse_statistics_stmts.c index f4658b0d7..125d4494f 100644 --- a/src/backend/distributed/deparser/deparse_statistics_stmts.c +++ b/src/backend/distributed/deparser/deparse_statistics_stmts.c @@ -188,8 +188,8 @@ static void AppendAlterStatisticsOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt) { List *names = (List *) stmt->object; - appendStringInfo(buf, "ALTER STATISTICS %s OWNER TO %s",
NameListToQuotedString( - names), + appendStringInfo(buf, "ALTER STATISTICS %s OWNER TO %s", + NameListToQuotedString(names), RoleSpecString(stmt->newowner, true)); } diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index c1f114d79..9e1ca2865 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -89,8 +89,7 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) RangeVar *stat = makeRangeVarFromNameList( (List *) alterObjectSchemaStmt->object); - /* set schema name and append shard id */ - SetSchemaNameIfNotExist(&stat->schemaname, schemaName); + /* append shard id */ AppendShardIdToName(&stat->relname, shardId); alterObjectSchemaStmt->object = (Node *) MakeNameListFromRangeVar(stat); @@ -118,7 +117,6 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) RangeVar *stat = makeRangeVarFromNameList(alterStatsStmt->defnames); AppendShardIdToName(&stat->relname, shardId); - SetSchemaNameIfNotExist(&stat->schemaname, schemaName); alterStatsStmt->defnames = MakeNameListFromRangeVar(stat); diff --git a/src/test/regress/expected/pg13_propagate_statistics.out b/src/test/regress/expected/pg13_propagate_statistics.out new file mode 100644 index 000000000..c75eb333f --- /dev/null +++ b/src/test/regress/expected/pg13_propagate_statistics.out @@ -0,0 +1,112 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int > 12 AS server_version_above_twelve +\gset +\if :server_version_above_twelve +\else +\q +\endif +CREATE SCHEMA "statistics'TestTarget"; +SET search_path TO "statistics'TestTarget"; +SET citus.next_shard_id TO 980000; +SET client_min_messages TO WARNING; +SET citus.shard_count TO 32; +SET citus.shard_replication_factor TO 1; +CREATE TABLE t1 (a int, b int); +CREATE STATISTICS s1 ON a,b FROM t1; +CREATE STATISTICS s2 ON a,b FROM t1; +CREATE STATISTICS s3 ON a,b FROM t1; 
+CREATE STATISTICS s4 ON a,b FROM t1; +-- test altering stats target +-- test alter target before distribution +ALTER STATISTICS s1 SET STATISTICS 3; +-- since max value for target is 10000, this will automatically be lowered +ALTER STATISTICS s2 SET STATISTICS 999999; +WARNING: lowering statistics target to 10000 +SELECT create_distributed_table('t1', 'b'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +-- test alter target before distribution +ALTER STATISTICS s3 SET STATISTICS 46; +\c - - - :worker_1_port +SELECT stxstattarget, stxrelid::regclass +FROM pg_statistic_ext +WHERE stxnamespace IN ( + SELECT oid + FROM pg_namespace + WHERE nspname IN ('statistics''TestTarget') +) +ORDER BY stxstattarget, stxrelid::regclass ASC; + stxstattarget | stxrelid +--------------------------------------------------------------------- + -1 | "statistics'TestTarget".t1_980000 + -1 | "statistics'TestTarget".t1_980002 + -1 | "statistics'TestTarget".t1_980004 + -1 | "statistics'TestTarget".t1_980006 + -1 | "statistics'TestTarget".t1_980008 + -1 | "statistics'TestTarget".t1_980010 + -1 | "statistics'TestTarget".t1_980012 + -1 | "statistics'TestTarget".t1_980014 + -1 | "statistics'TestTarget".t1_980016 + -1 | "statistics'TestTarget".t1_980018 + -1 | "statistics'TestTarget".t1_980020 + -1 | "statistics'TestTarget".t1_980022 + -1 | "statistics'TestTarget".t1_980024 + -1 | "statistics'TestTarget".t1_980026 + -1 | "statistics'TestTarget".t1_980028 + -1 | "statistics'TestTarget".t1_980030 + 3 | "statistics'TestTarget".t1_980000 + 3 | "statistics'TestTarget".t1_980002 + 3 | "statistics'TestTarget".t1_980004 + 3 | "statistics'TestTarget".t1_980006 + 3 | "statistics'TestTarget".t1_980008 + 3 | "statistics'TestTarget".t1_980010 + 3 | "statistics'TestTarget".t1_980012 + 3 | "statistics'TestTarget".t1_980014 + 3 | "statistics'TestTarget".t1_980016 + 3 | "statistics'TestTarget".t1_980018 + 3 | "statistics'TestTarget".t1_980020 + 3 | 
"statistics'TestTarget".t1_980022 + 3 | "statistics'TestTarget".t1_980024 + 3 | "statistics'TestTarget".t1_980026 + 3 | "statistics'TestTarget".t1_980028 + 3 | "statistics'TestTarget".t1_980030 + 46 | "statistics'TestTarget".t1_980000 + 46 | "statistics'TestTarget".t1_980002 + 46 | "statistics'TestTarget".t1_980004 + 46 | "statistics'TestTarget".t1_980006 + 46 | "statistics'TestTarget".t1_980008 + 46 | "statistics'TestTarget".t1_980010 + 46 | "statistics'TestTarget".t1_980012 + 46 | "statistics'TestTarget".t1_980014 + 46 | "statistics'TestTarget".t1_980016 + 46 | "statistics'TestTarget".t1_980018 + 46 | "statistics'TestTarget".t1_980020 + 46 | "statistics'TestTarget".t1_980022 + 46 | "statistics'TestTarget".t1_980024 + 46 | "statistics'TestTarget".t1_980026 + 46 | "statistics'TestTarget".t1_980028 + 46 | "statistics'TestTarget".t1_980030 + 10000 | "statistics'TestTarget".t1_980000 + 10000 | "statistics'TestTarget".t1_980002 + 10000 | "statistics'TestTarget".t1_980004 + 10000 | "statistics'TestTarget".t1_980006 + 10000 | "statistics'TestTarget".t1_980008 + 10000 | "statistics'TestTarget".t1_980010 + 10000 | "statistics'TestTarget".t1_980012 + 10000 | "statistics'TestTarget".t1_980014 + 10000 | "statistics'TestTarget".t1_980016 + 10000 | "statistics'TestTarget".t1_980018 + 10000 | "statistics'TestTarget".t1_980020 + 10000 | "statistics'TestTarget".t1_980022 + 10000 | "statistics'TestTarget".t1_980024 + 10000 | "statistics'TestTarget".t1_980026 + 10000 | "statistics'TestTarget".t1_980028 + 10000 | "statistics'TestTarget".t1_980030 +(64 rows) + +\c - - - :master_port +SET client_min_messages TO WARNING; +DROP SCHEMA "statistics'TestTarget" CASCADE; diff --git a/src/test/regress/expected/pg13_propagate_statistics_0.out b/src/test/regress/expected/pg13_propagate_statistics_0.out new file mode 100644 index 000000000..e25fbb82d --- /dev/null +++ b/src/test/regress/expected/pg13_propagate_statistics_0.out @@ -0,0 +1,6 @@ +SHOW server_version \gset +SELECT 
substring(:'server_version', '\d+')::int > 12 AS server_version_above_twelve +\gset +\if :server_version_above_twelve +\else +\q diff --git a/src/test/regress/expected/propagate_statistics.out b/src/test/regress/expected/propagate_statistics.out index 470a92826..80e501fff 100644 --- a/src/test/regress/expected/propagate_statistics.out +++ b/src/test/regress/expected/propagate_statistics.out @@ -79,23 +79,18 @@ ALTER STATISTICS sc1.st1 RENAME TO st1_new; -- test altering stats schema CREATE SCHEMA test_alter_schema; ALTER STATISTICS s7 SET SCHEMA test_alter_schema; --- test altering stats target -ALTER STATISTICS s1 SET STATISTICS 3; --- since max value for target is 10000, this will automatically be lowered -ALTER STATISTICS s2 SET STATISTICS 999999; -WARNING: lowering statistics target to 10000 --- test alter target before distribution -CREATE TABLE targettable(a int, b int); -CREATE STATISTICS s8 ON a,b FROM targettable; -ALTER STATISTICS s8 SET STATISTICS 46; -SELECT create_distributed_table('targettable', 'b'); +-- test alter owner +ALTER STATISTICS sc2."neW'Stat" OWNER TO pg_monitor; +-- test alter owner before distribution +CREATE TABLE ownertest(a int, b int); +CREATE STATISTICS sc1.s9 ON a,b FROM ownertest; +ALTER STATISTICS sc1.s9 OWNER TO pg_signal_backend; +SELECT create_distributed_table('ownertest','a'); create_distributed_table --------------------------------------------------------------------- (1 row) --- test alter owner -ALTER STATISTICS s8 OWNER TO pg_monitor; \c - - - :worker_1_port SELECT stxname FROM pg_statistic_ext @@ -155,22 +150,22 @@ ORDER BY stxname ASC; s2_980058 s2_980060 s2_980062 - s8_980129 - s8_980131 - s8_980133 - s8_980135 - s8_980137 - s8_980139 - s8_980141 - s8_980143 - s8_980145 - s8_980147 - s8_980149 - s8_980151 - s8_980153 - s8_980155 - s8_980157 - s8_980159 + s9_980129 + s9_980131 + s9_980133 + s9_980135 + s9_980137 + s9_980139 + s9_980141 + s9_980143 + s9_980145 + s9_980147 + s9_980149 + s9_980151 + s9_980153 + s9_980155 
+ s9_980157 + s9_980159 st1_new_980064 st1_new_980066 st1_new_980068 @@ -201,102 +196,16 @@ WHERE stxnamespace IN ( 3 (1 row) -SELECT stxstattarget +SELECT COUNT(DISTINCT stxowner) FROM pg_statistic_ext WHERE stxnamespace IN ( SELECT oid FROM pg_namespace WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2') -) -ORDER BY stxstattarget ASC; - stxstattarget ---------------------------------------------------------------------- - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - -1 - 3 - 3 - 3 - 3 - 3 - 3 - 3 - 3 - 3 - 3 - 3 - 3 - 3 - 3 - 3 - 3 - 46 - 46 - 46 - 46 - 46 - 46 - 46 - 46 - 46 - 46 - 46 - 46 - 46 - 46 - 46 - 46 - 10000 - 10000 - 10000 - 10000 - 10000 - 10000 - 10000 - 10000 - 10000 - 10000 - 10000 - 10000 - 10000 - 10000 - 10000 - 10000 -(80 rows) - -SELECT COUNT(DISTINCT stxowner) FROM pg_statistic_ext; +); count --------------------------------------------------------------------- - 2 + 3 (1 row) \c - - - :master_port diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index 5f862dc71..8a65fb475 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -94,6 +94,7 @@ test: tableam # Tests for statistics propagation # ---------- test: propagate_statistics +test: pg13_propagate_statistics # ---------- # Miscellaneous tests to check our query planning behavior diff --git a/src/test/regress/sql/pg13_propagate_statistics.sql b/src/test/regress/sql/pg13_propagate_statistics.sql new file mode 100644 index 000000000..9716d3289 --- /dev/null +++ b/src/test/regress/sql/pg13_propagate_statistics.sql @@ -0,0 +1,45 @@ +SHOW server_version \gset +SELECT substring(:'server_version', '\d+')::int > 12 AS server_version_above_twelve +\gset +\if :server_version_above_twelve +\else +\q +\endif + +CREATE SCHEMA "statistics'TestTarget"; +SET search_path TO "statistics'TestTarget"; +SET citus.next_shard_id TO 
980000; +SET client_min_messages TO WARNING; +SET citus.shard_count TO 32; +SET citus.shard_replication_factor TO 1; + +CREATE TABLE t1 (a int, b int); +CREATE STATISTICS s1 ON a,b FROM t1; +CREATE STATISTICS s2 ON a,b FROM t1; +CREATE STATISTICS s3 ON a,b FROM t1; +CREATE STATISTICS s4 ON a,b FROM t1; + +-- test altering stats target +-- test alter target before distribution +ALTER STATISTICS s1 SET STATISTICS 3; +-- since max value for target is 10000, this will automatically be lowered +ALTER STATISTICS s2 SET STATISTICS 999999; + +SELECT create_distributed_table('t1', 'b'); + +-- test alter target before distribution +ALTER STATISTICS s3 SET STATISTICS 46; + +\c - - - :worker_1_port +SELECT stxstattarget, stxrelid::regclass +FROM pg_statistic_ext +WHERE stxnamespace IN ( + SELECT oid + FROM pg_namespace + WHERE nspname IN ('statistics''TestTarget') +) +ORDER BY stxstattarget, stxrelid::regclass ASC; + +\c - - - :master_port +SET client_min_messages TO WARNING; +DROP SCHEMA "statistics'TestTarget" CASCADE; diff --git a/src/test/regress/sql/propagate_statistics.sql b/src/test/regress/sql/propagate_statistics.sql index e9b78ee57..8a0372cf7 100644 --- a/src/test/regress/sql/propagate_statistics.sql +++ b/src/test/regress/sql/propagate_statistics.sql @@ -67,18 +67,13 @@ ALTER STATISTICS sc1.st1 RENAME TO st1_new; CREATE SCHEMA test_alter_schema; ALTER STATISTICS s7 SET SCHEMA test_alter_schema; --- test altering stats target -ALTER STATISTICS s1 SET STATISTICS 3; --- since max value for target is 10000, this will automatically be lowered -ALTER STATISTICS s2 SET STATISTICS 999999; --- test alter target before distribution -CREATE TABLE targettable(a int, b int); -CREATE STATISTICS s8 ON a,b FROM targettable; -ALTER STATISTICS s8 SET STATISTICS 46; -SELECT create_distributed_table('targettable', 'b'); - -- test alter owner -ALTER STATISTICS s8 OWNER TO pg_monitor; +ALTER STATISTICS sc2."neW'Stat" OWNER TO pg_monitor; +-- test alter owner before distribution +CREATE 
TABLE ownertest(a int, b int); +CREATE STATISTICS sc1.s9 ON a,b FROM ownertest; +ALTER STATISTICS sc1.s9 OWNER TO pg_signal_backend; +SELECT create_distributed_table('ownertest','a'); \c - - - :worker_1_port SELECT stxname @@ -98,16 +93,13 @@ WHERE stxnamespace IN ( WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2') ); -SELECT stxstattarget +SELECT COUNT(DISTINCT stxowner) FROM pg_statistic_ext WHERE stxnamespace IN ( SELECT oid FROM pg_namespace WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2') -) -ORDER BY stxstattarget ASC; - -SELECT COUNT(DISTINCT stxowner) FROM pg_statistic_ext; +); \c - - - :master_port SET client_min_messages TO WARNING; From 4df723cf9bdb1ec952bf4bdf1a83bb0a5f0586d9 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Mon, 14 Dec 2020 23:17:22 -0800 Subject: [PATCH 7/8] Do metadata sync in a separate background worker. --- src/backend/distributed/commands/function.c | 4 + .../distributed/metadata/metadata_sync.c | 260 +++++++++++++++++- src/backend/distributed/test/metadata_sync.c | 26 ++ src/backend/distributed/utils/maintenanced.c | 85 ++++-- src/include/distributed/maintenanced.h | 1 + src/include/distributed/metadata_sync.h | 5 +- .../isolation_dump_global_wait_edges.out | 18 +- .../isolation_metadata_sync_deadlock.out | 198 +++++++++++++ .../expected/multi_mx_node_metadata.out | 210 ++++++++++++++ .../expected/multi_test_helpers_superuser.out | 2 +- src/test/regress/isolation_schedule | 1 + .../isolation_metadata_sync_deadlock.spec | 149 ++++++++++ .../regress/sql/multi_mx_node_metadata.sql | 105 +++++++ .../sql/multi_test_helpers_superuser.sql | 2 +- 14 files changed, 1031 insertions(+), 35 deletions(-) create mode 100644 src/test/regress/expected/isolation_metadata_sync_deadlock.out create mode 100644 src/test/regress/spec/isolation_metadata_sync_deadlock.spec diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 62b7cc880..398d803d6 100644 --- 
a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -1138,6 +1138,10 @@ TriggerSyncMetadataToPrimaryNodes(void) triggerMetadataSync = true; } + else if (!workerNode->metadataSynced) + { + triggerMetadataSync = true; + } } /* let the maintanince deamon know about the metadata sync */ diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 93a222409..c80e2d8bc 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -14,6 +14,7 @@ #include "postgres.h" #include "miscadmin.h" +#include #include #include @@ -28,6 +29,7 @@ #include "catalog/pg_foreign_server.h" #include "catalog/pg_namespace.h" #include "catalog/pg_type.h" +#include "commands/async.h" #include "distributed/citus_ruleutils.h" #include "distributed/commands.h" #include "distributed/deparser.h" @@ -35,6 +37,7 @@ #include "distributed/listutils.h" #include "distributed/metadata_utility.h" #include "distributed/coordinator_protocol.h" +#include "distributed/maintenanced.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" #include "distributed/metadata/distobject.h" @@ -48,11 +51,15 @@ #include "foreign/foreign.h" #include "miscadmin.h" #include "nodes/pg_list.h" +#include "pgstat.h" +#include "postmaster/bgworker.h" +#include "postmaster/postmaster.h" #include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/memutils.h" +#include "utils/snapmgr.h" #include "utils/syscache.h" @@ -76,11 +83,18 @@ static GrantStmt * GenerateGrantOnSchemaStmtForRights(Oid roleOid, char *permission, bool withGrantOption); static char * GenerateSetRoleQuery(Oid roleOid); +static void MetadataSyncSigTermHandler(SIGNAL_ARGS); +static void MetadataSyncSigAlrmHandler(SIGNAL_ARGS); PG_FUNCTION_INFO_V1(start_metadata_sync_to_node); 
PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node); PG_FUNCTION_INFO_V1(worker_record_sequence_dependency); +static volatile sig_atomic_t got_SIGTERM = false; /* set by MetadataSyncSigTermHandler */ +static volatile sig_atomic_t got_SIGALRM = false; /* set by MetadataSyncSigAlrmHandler */ + +#define METADATA_SYNC_APP_NAME "Citus Metadata Sync Daemon" + /* * start_metadata_sync_to_node function sets hasmetadata column of the given @@ -1497,7 +1511,7 @@ DetachPartitionCommandList(void) * metadata workers that are out of sync. Returns the result of * synchronization. */ -MetadataSyncResult +static MetadataSyncResult SyncMetadataToNodes(void) { MetadataSyncResult result = METADATA_SYNC_SUCCESS; @@ -1527,6 +1541,9 @@ SyncMetadataToNodes(void) if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts)) { + ereport(WARNING, (errmsg("failed to sync metadata to %s:%d", + workerNode->workerName, + workerNode->workerPort))); result = METADATA_SYNC_FAILED_SYNC; } else @@ -1539,3 +1556,244 @@ SyncMetadataToNodes(void) return result; } + + +/* + * SyncMetadataToNodesMain is the main function for syncing metadata to + * MX nodes. It retries until success and then exits. + */ +void +SyncMetadataToNodesMain(Datum main_arg) +{ + Oid databaseOid = DatumGetObjectId(main_arg); + + /* extension owner is passed via bgw_extra */ + Oid extensionOwner = InvalidOid; + memcpy_s(&extensionOwner, sizeof(extensionOwner), + MyBgworkerEntry->bgw_extra, sizeof(Oid)); + + pqsignal(SIGTERM, MetadataSyncSigTermHandler); + pqsignal(SIGALRM, MetadataSyncSigAlrmHandler); + BackgroundWorkerUnblockSignals(); + + /* connect to database, after that we can actually access catalogs */ + BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0); + + /* make worker recognizable in pg_stat_activity */ + pgstat_report_appname(METADATA_SYNC_APP_NAME); + + bool syncedAllNodes = false; + + while (!syncedAllNodes) + { + InvalidateMetadataSystemCache(); + StartTransactionCommand(); + + /* + * Some functions in ruleutils.c, which we use to get the DDL for + * metadata propagation, require an active snapshot.
+ */ +PushActiveSnapshot(GetTransactionSnapshot()); + + if (!LockCitusExtension()) + { + ereport(DEBUG1, (errmsg("could not lock the citus extension, " + "skipping metadata sync"))); + } + else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded()) + { + UseCoordinatedTransaction(); + MetadataSyncResult result = SyncMetadataToNodes(); + + syncedAllNodes = (result == METADATA_SYNC_SUCCESS); + + /* we use LISTEN/NOTIFY to wait for metadata syncing in tests */ + if (result != METADATA_SYNC_FAILED_LOCK) + { + Async_Notify(METADATA_SYNC_CHANNEL, NULL); + } + } + + PopActiveSnapshot(); + CommitTransactionCommand(); + ProcessCompletedNotifies(); + + if (syncedAllNodes) + { + break; + } + + /* + * If backend is cancelled (e.g. because of distributed deadlock), + * CHECK_FOR_INTERRUPTS() will raise a cancellation error which will + * result in exit(1). + */ + CHECK_FOR_INTERRUPTS(); + + /* + * SIGTERM is used for when maintenance daemon tries to clean-up + * metadata sync daemons spawned by terminated maintenance daemons. + */ + if (got_SIGTERM) + { + exit(0); + } + + /* + * SIGALRM is used for testing purposes and it simulates an error in metadata + * sync daemon. + */ + if (got_SIGALRM) + { + elog(ERROR, "Error in metadata sync daemon"); + } + + pg_usleep((long) MetadataSyncRetryInterval * 1000L); /* ms to us; widen first so large intervals cannot overflow int */ + } +} + + +/* + * MetadataSyncSigTermHandler sets a flag to request termination of metadata + * sync daemon. + */ +static void +MetadataSyncSigTermHandler(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGTERM = true; + if (MyProc != NULL) + { + SetLatch(&MyProc->procLatch); + } + + errno = save_errno; +} + + +/* + * MetadataSyncSigAlrmHandler sets a flag to request an error in the metadata + * sync daemon. This is used for testing purposes.
+ */ +static void +MetadataSyncSigAlrmHandler(SIGNAL_ARGS) +{ + int save_errno = errno; + + got_SIGALRM = true; + if (MyProc != NULL) + { + SetLatch(&MyProc->procLatch); + } + + errno = save_errno; +} + + +/* + * SpawnSyncMetadataToNodes starts a background worker which runs metadata + * sync. On success it returns the worker's handle. Otherwise it returns NULL. + */ +BackgroundWorkerHandle * +SpawnSyncMetadataToNodes(Oid database, Oid extensionOwner) +{ + BackgroundWorker worker; + BackgroundWorkerHandle *handle = NULL; + + /* Configure a worker. */ + memset(&worker, 0, sizeof(worker)); + SafeSnprintf(worker.bgw_name, BGW_MAXLEN, + "Citus Metadata Sync: %u/%u", + database, extensionOwner); + worker.bgw_flags = + BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION; + worker.bgw_start_time = BgWorkerStart_ConsistentState; + + /* don't restart, we manage restarts from maintenance daemon */ + worker.bgw_restart_time = BGW_NEVER_RESTART; + strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus"); + strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name), + "SyncMetadataToNodesMain"); + worker.bgw_main_arg = ObjectIdGetDatum(database); + memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner, + sizeof(Oid)); + worker.bgw_notify_pid = MyProcPid; + + if (!RegisterDynamicBackgroundWorker(&worker, &handle)) + { + return NULL; + } + + pid_t pid; + WaitForBackgroundWorkerStartup(handle, &pid); + + return handle; +} + + +/* + * SignalMetadataSyncDaemon signals metadata sync daemons belonging to + * the given database.
+ */ +void +SignalMetadataSyncDaemon(Oid database, int sig) +{ + int backendCount = pgstat_fetch_stat_numbackends(); + for (int backend = 1; backend <= backendCount; backend++) + { + LocalPgBackendStatus *localBeEntry = pgstat_fetch_stat_local_beentry(backend); + if (!localBeEntry) + { + continue; + } + + PgBackendStatus *beStatus = &localBeEntry->backendStatus; + if (beStatus->st_databaseid == database && + strncmp(beStatus->st_appname, METADATA_SYNC_APP_NAME, BGW_MAXLEN) == 0) + { + kill(beStatus->st_procpid, sig); + } + } +} + + +/* + * ShouldInitiateMetadataSync returns if metadata sync daemon should be initiated. + * It sets lockFailure to true if pg_dist_node lock couldn't be acquired for the + * check. + */ +bool +ShouldInitiateMetadataSync(bool *lockFailure) +{ + if (!IsCoordinator()) + { + *lockFailure = false; + return false; + } + + Oid distNodeOid = DistNodeRelationId(); + if (!ConditionalLockRelationOid(distNodeOid, AccessShareLock)) + { + *lockFailure = true; + return false; + } + + bool shouldSyncMetadata = false; + + List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock); + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, workerList) + { + if (workerNode->hasMetadata && !workerNode->metadataSynced) + { + shouldSyncMetadata = true; + break; + } + } + + UnlockRelationOid(distNodeOid, AccessShareLock); + + *lockFailure = false; + return shouldSyncMetadata; +} diff --git a/src/backend/distributed/test/metadata_sync.c b/src/backend/distributed/test/metadata_sync.c index 2a1b6c290..c2dc1ce4f 100644 --- a/src/backend/distributed/test/metadata_sync.c +++ b/src/backend/distributed/test/metadata_sync.c @@ -16,6 +16,7 @@ #include "catalog/pg_type.h" #include "distributed/connection_management.h" #include "distributed/listutils.h" +#include "distributed/maintenanced.h" #include "distributed/metadata_sync.h" #include "distributed/remote_commands.h" #include "postmaster/postmaster.h" @@ -28,6 +29,8 @@ /* declarations for dynamic loading */ 
PG_FUNCTION_INFO_V1(master_metadata_snapshot); PG_FUNCTION_INFO_V1(wait_until_metadata_sync); +PG_FUNCTION_INFO_V1(trigger_metadata_sync); +PG_FUNCTION_INFO_V1(raise_error_in_metadata_sync); /* @@ -124,3 +127,26 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS) PG_RETURN_VOID(); } + + +/* + * trigger_metadata_sync triggers metadata sync for testing. + */ +Datum +trigger_metadata_sync(PG_FUNCTION_ARGS) +{ + TriggerMetadataSync(MyDatabaseId); + PG_RETURN_VOID(); +} + + +/* + * raise_error_in_metadata_sync causes metadata sync to raise an error. + */ +Datum +raise_error_in_metadata_sync(PG_FUNCTION_ARGS) +{ + /* metadata sync uses SIGALRM to test errors */ + SignalMetadataSyncDaemon(MyDatabaseId, SIGALRM); + PG_RETURN_VOID(); +} diff --git a/src/backend/distributed/utils/maintenanced.c b/src/backend/distributed/utils/maintenanced.c index 74ac7fbe5..66c58d30d 100644 --- a/src/backend/distributed/utils/maintenanced.c +++ b/src/backend/distributed/utils/maintenanced.c @@ -118,7 +118,6 @@ static size_t MaintenanceDaemonShmemSize(void); static void MaintenanceDaemonShmemInit(void); static void MaintenanceDaemonShmemExit(int code, Datum arg); static void MaintenanceDaemonErrorContext(void *arg); -static bool LockCitusExtension(void); static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData); static void WarnMaintenanceDaemonNotStarted(void); @@ -291,6 +290,13 @@ CitusMaintenanceDaemonMain(Datum main_arg) TimestampTz lastRecoveryTime = 0; TimestampTz nextMetadataSyncTime = 0; + + /* + * We do metadata sync in a separate background worker. We need its + * handle to be able to check its status. + */ + BackgroundWorkerHandle *metadataSyncBgwHandle = NULL; + /* * Look up this worker's configuration. 
*/ @@ -371,6 +377,12 @@ CitusMaintenanceDaemonMain(Datum main_arg) /* make worker recognizable in pg_stat_activity */ pgstat_report_appname("Citus Maintenance Daemon"); + /* + * Terminate orphaned metadata sync daemons spawned from previously terminated + * or crashed maintenanced instances. + */ + SignalMetadataSyncDaemon(databaseOid, SIGTERM); + /* enter main loop */ while (!got_SIGTERM) { @@ -450,21 +462,42 @@ CitusMaintenanceDaemonMain(Datum main_arg) } #endif - if (!RecoveryInProgress() && - (MetadataSyncTriggeredCheckAndReset(myDbData) || - GetCurrentTimestamp() >= nextMetadataSyncTime)) + pid_t metadataSyncBgwPid = 0; + BgwHandleStatus metadataSyncStatus = + metadataSyncBgwHandle != NULL ? + GetBackgroundWorkerPid(metadataSyncBgwHandle, &metadataSyncBgwPid) : + BGWH_STOPPED; + + if (metadataSyncStatus != BGWH_STOPPED && + GetCurrentTimestamp() >= nextMetadataSyncTime) { - bool metadataSyncFailed = false; + /* + * Metadata sync is still running, recheck in a short while. + */ + int nextTimeout = MetadataSyncRetryInterval; + nextMetadataSyncTime = + TimestampTzPlusMilliseconds(GetCurrentTimestamp(), nextTimeout); + timeout = Min(timeout, nextTimeout); + } + else if (!RecoveryInProgress() && + metadataSyncStatus == BGWH_STOPPED && + (MetadataSyncTriggeredCheckAndReset(myDbData) || + GetCurrentTimestamp() >= nextMetadataSyncTime)) + { + if (metadataSyncBgwHandle) + { + TerminateBackgroundWorker(metadataSyncBgwHandle); + pfree(metadataSyncBgwHandle); + metadataSyncBgwHandle = NULL; + } InvalidateMetadataSystemCache(); StartTransactionCommand(); - - /* - * Some functions in ruleutils.c, which we use to get the DDL for - * metadata propagation, require an active snapshot. 
- */ PushActiveSnapshot(GetTransactionSnapshot()); + int nextTimeout = MetadataSyncRetryInterval; + bool syncMetadata = false; + if (!LockCitusExtension()) { ereport(DEBUG1, (errmsg("could not lock the citus extension, " @@ -472,25 +505,28 @@ CitusMaintenanceDaemonMain(Datum main_arg) } else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded()) { - MetadataSyncResult result = SyncMetadataToNodes(); - metadataSyncFailed = (result != METADATA_SYNC_SUCCESS); + bool lockFailure = false; + syncMetadata = ShouldInitiateMetadataSync(&lockFailure); /* - * Notification means we had an attempt on synchronization - * without being blocked for pg_dist_node access. + * If lock fails, we need to recheck in a short while. If we are + * going to sync metadata, we should recheck in a short while to + * see if it failed. Otherwise, we can wait longer. */ - if (result != METADATA_SYNC_FAILED_LOCK) - { - Async_Notify(METADATA_SYNC_CHANNEL, NULL); - } + nextTimeout = (lockFailure || syncMetadata) ? + MetadataSyncRetryInterval : + MetadataSyncInterval; } PopActiveSnapshot(); CommitTransactionCommand(); - ProcessCompletedNotifies(); - int64 nextTimeout = metadataSyncFailed ? MetadataSyncRetryInterval : - MetadataSyncInterval; + if (syncMetadata) + { + metadataSyncBgwHandle = + SpawnSyncMetadataToNodes(MyDatabaseId, myDbData->userOid); + } + nextMetadataSyncTime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), nextTimeout); timeout = Min(timeout, nextTimeout); @@ -626,6 +662,11 @@ CitusMaintenanceDaemonMain(Datum main_arg) ProcessConfigFile(PGC_SIGHUP); } } + + if (metadataSyncBgwHandle) + { + TerminateBackgroundWorker(metadataSyncBgwHandle); + } } @@ -786,7 +827,7 @@ MaintenanceDaemonErrorContext(void *arg) * LockCitusExtension acquires a lock on the Citus extension or returns * false if the extension does not exist or is being dropped. 
*/ -static bool +bool LockCitusExtension(void) { Oid extensionOid = get_extension_oid("citus", true); diff --git a/src/include/distributed/maintenanced.h b/src/include/distributed/maintenanced.h index ee9662047..7ff42674f 100644 --- a/src/include/distributed/maintenanced.h +++ b/src/include/distributed/maintenanced.h @@ -25,6 +25,7 @@ extern void StopMaintenanceDaemon(Oid databaseId); extern void TriggerMetadataSync(Oid databaseId); extern void InitializeMaintenanceDaemon(void); extern void InitializeMaintenanceDaemonBackend(void); +extern bool LockCitusExtension(void); extern void CitusMaintenanceDaemonMain(Datum main_arg); diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index 88f1e63a9..8538830ba 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -50,11 +50,14 @@ extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int sha extern void CreateTableMetadataOnWorkers(Oid relationId); extern void MarkNodeHasMetadata(const char *nodeName, int32 nodePort, bool hasMetadata); extern void MarkNodeMetadataSynced(const char *nodeName, int32 nodePort, bool synced); -extern MetadataSyncResult SyncMetadataToNodes(void); +extern BackgroundWorkerHandle * SpawnSyncMetadataToNodes(Oid database, Oid owner); extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32 nodePort, const char *nodeUser, List *commandList); +extern void SyncMetadataToNodesMain(Datum main_arg); +extern void SignalMetadataSyncDaemon(Oid database, int sig); +extern bool ShouldInitiateMetadataSync(bool *lockFailure); #define DELETE_ALL_NODES "TRUNCATE pg_dist_node CASCADE" #define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \ diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges.out b/src/test/regress/expected/isolation_dump_global_wait_edges.out index 71a7f1a7a..b0343b982 100644 --- a/src/test/regress/expected/isolation_dump_global_wait_edges.out +++ 
b/src/test/regress/expected/isolation_dump_global_wait_edges.out @@ -28,11 +28,11 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -390 389 f +395 394 f transactionnumberwaitingtransactionnumbers -389 -390 389 +394 +395 394 step s1-abort: ABORT; @@ -75,14 +75,14 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -394 393 f -395 393 f -395 394 t +399 398 f +400 398 f +400 399 t transactionnumberwaitingtransactionnumbers -393 -394 393 -395 393,394 +398 +399 398 +400 398,399 step s1-abort: ABORT; diff --git a/src/test/regress/expected/isolation_metadata_sync_deadlock.out b/src/test/regress/expected/isolation_metadata_sync_deadlock.out new file mode 100644 index 000000000..0977caeea --- /dev/null +++ b/src/test/regress/expected/isolation_metadata_sync_deadlock.out @@ -0,0 +1,198 @@ +Parsed test spec with 3 sessions + +starting permutation: enable-deadlock-detection reload-conf s2-start-session-level-connection s1-begin s1-update-1 s2-begin-on-worker s2-update-2-on-worker s2-truncate-on-worker s3-invalidate-metadata s3-resync s2-update-1-on-worker s1-update-2 s1-commit s2-commit-on-worker disable-deadlock-detection reload-conf s2-stop-connection +create_distributed_table + + +step enable-deadlock-detection: + ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO 1.1; + +step reload-conf: + SELECT pg_reload_conf(); + +pg_reload_conf + +t +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node + + +step s1-begin: + BEGIN; + +step s1-update-1: + UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1; + +step s2-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s2-update-2-on-worker: + SELECT 
run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2'); + +run_commands_on_session_level_connection_to_node + + +step s2-truncate-on-worker: + SELECT run_commands_on_session_level_connection_to_node('TRUNCATE t2'); + +run_commands_on_session_level_connection_to_node + + +step s3-invalidate-metadata: + update pg_dist_node SET metadatasynced = false; + +step s3-resync: + SELECT trigger_metadata_sync(); + SELECT pg_sleep(2); + +trigger_metadata_sync + + +pg_sleep + + +step s2-update-1-on-worker: + SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 1'); + +step s1-update-2: + UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2; + +step s1-update-2: <... completed> +step s2-update-1-on-worker: <... completed> +run_commands_on_session_level_connection_to_node + + +error in steps s1-update-2 s2-update-1-on-worker: ERROR: canceling the transaction since it was involved in a distributed deadlock +step s1-commit: + COMMIT; + +step s2-commit-on-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step disable-deadlock-detection: + ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; + +step reload-conf: + SELECT pg_reload_conf(); + +pg_reload_conf + +t +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +restore_isolation_tester_func + + + +starting permutation: increase-retry-interval reload-conf s2-start-session-level-connection s2-begin-on-worker s2-truncate-on-worker s3-invalidate-metadata s3-resync s1-count-daemons s1-cancel-metadata-sync s1-count-daemons reset-retry-interval reload-conf s2-commit-on-worker s2-stop-connection s3-resync +create_distributed_table + + +step increase-retry-interval: + ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 20000; + 
+step reload-conf: + SELECT pg_reload_conf(); + +pg_reload_conf + +t +step s2-start-session-level-connection: + SELECT start_session_level_connection_to_node('localhost', 57638); + +start_session_level_connection_to_node + + +step s2-begin-on-worker: + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); + +run_commands_on_session_level_connection_to_node + + +step s2-truncate-on-worker: + SELECT run_commands_on_session_level_connection_to_node('TRUNCATE t2'); + +run_commands_on_session_level_connection_to_node + + +step s3-invalidate-metadata: + update pg_dist_node SET metadatasynced = false; + +step s3-resync: + SELECT trigger_metadata_sync(); + SELECT pg_sleep(2); + +trigger_metadata_sync + + +pg_sleep + + +step s1-count-daemons: + SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%'; + +count + +1 +step s1-cancel-metadata-sync: + SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%'; + SELECT pg_sleep(2); + +pg_cancel_backend + +t +pg_sleep + + +step s1-count-daemons: + SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%'; + +count + +0 +step reset-retry-interval: + ALTER SYSTEM RESET citus.metadata_sync_retry_interval; + +step reload-conf: + SELECT pg_reload_conf(); + +pg_reload_conf + +t +step s2-commit-on-worker: + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); + +run_commands_on_session_level_connection_to_node + + +step s2-stop-connection: + SELECT stop_session_level_connection_to_node(); + +stop_session_level_connection_to_node + + +step s3-resync: + SELECT trigger_metadata_sync(); + SELECT pg_sleep(2); + +trigger_metadata_sync + + +pg_sleep + + +restore_isolation_tester_func + + diff --git a/src/test/regress/expected/multi_mx_node_metadata.out b/src/test/regress/expected/multi_mx_node_metadata.out index e1321e549..bf473c310 100644 --- a/src/test/regress/expected/multi_mx_node_metadata.out +++ 
b/src/test/regress/expected/multi_mx_node_metadata.out @@ -21,6 +21,27 @@ CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLE master_run_on_worker(ARRAY[hostname], ARRAY[port], ARRAY['SELECT pg_reload_conf()'], false); $$; +CREATE OR REPLACE FUNCTION trigger_metadata_sync() + RETURNS void + LANGUAGE C STRICT + AS 'citus'; +CREATE OR REPLACE FUNCTION raise_error_in_metadata_sync() + RETURNS void + LANGUAGE C STRICT + AS 'citus'; +CREATE PROCEDURE wait_until_process_count(appname text, target_count int) AS $$ +declare + counter integer := -1; +begin + while counter != target_count loop + -- pg_stat_activity is cached at xact level and there is no easy way to clear it. + -- Look it up in a new connection to get latest updates. + SELECT result::int into counter FROM + master_run_on_worker(ARRAY['localhost'], ARRAY[57636], ARRAY[ + 'SELECT count(*) FROM pg_stat_activity WHERE application_name = ' || quote_literal(appname) || ';'], false); + PERFORM pg_sleep(0.1); + end loop; +end$$ LANGUAGE plpgsql; -- add a node to the cluster SELECT master_add_node('localhost', :worker_1_port) As nodeid_1 \gset SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node; @@ -152,6 +173,142 @@ SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node; 2 | t | f (1 row) +-- verify that metadata sync daemon has started +SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon'; + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- +-- terminate maintenance daemon, and verify that we don't spawn multiple +-- metadata sync daemons +-- +SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon'; + pg_terminate_backend +--------------------------------------------------------------------- + t +(1 row) + +CALL wait_until_process_count('Citus Maintenance Daemon', 1); +select trigger_metadata_sync(); + 
trigger_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +select wait_until_metadata_sync(); + wait_until_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon'; + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- +-- cancel metadata sync daemon, and verify that it exits and restarts. +-- +select pid as pid_before_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset +select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon'; + pg_cancel_backend +--------------------------------------------------------------------- + t +(1 row) + +select wait_until_metadata_sync(); + wait_until_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +select pid as pid_after_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset +select :pid_before_cancel != :pid_after_cancel AS metadata_sync_restarted; + metadata_sync_restarted +--------------------------------------------------------------------- + t +(1 row) + +-- +-- cancel metadata sync daemon so it exits and restarts, but at the +-- same time tell maintenanced to trigger a new metadata sync. One +-- of these should exit to avoid multiple metadata syncs. 
+-- +select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon'; + pg_cancel_backend +--------------------------------------------------------------------- + t +(1 row) + +select trigger_metadata_sync(); + trigger_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +select wait_until_metadata_sync(); + wait_until_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +-- we assume citus.metadata_sync_retry_interval is 500ms. Change amount we sleep to ceiling + 0.2 if it changes. +select pg_sleep(1.2); + pg_sleep +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon'; + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- +-- error in metadata sync daemon, and verify it exits and restarts. +-- +select pid as pid_before_error from pg_stat_activity where application_name like 'Citus Met%' \gset +select raise_error_in_metadata_sync(); + raise_error_in_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +select wait_until_metadata_sync(30000); + wait_until_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +select pid as pid_after_error from pg_stat_activity where application_name like 'Citus Met%' \gset +select :pid_before_error != :pid_after_error AS metadata_sync_restarted; + metadata_sync_restarted +--------------------------------------------------------------------- + t +(1 row) + +SELECT trigger_metadata_sync(); + trigger_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +SELECT wait_until_metadata_sync(30000); + wait_until_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +SELECT 
count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon'; + count +--------------------------------------------------------------------- + 1 +(1 row) + -- update it back to :worker_1_port, now metadata should be synced SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port); ?column? @@ -594,6 +751,59 @@ SELECT verify_metadata('localhost', :worker_1_port); t (1 row) +-- verify that metadata sync daemon exits +call wait_until_process_count('Citus Metadata Sync Daemon', 0); +-- verify that DROP DATABASE terminates metadata sync +SELECT current_database() datname \gset +CREATE DATABASE db_to_drop; +NOTICE: Citus partially supports CREATE DATABASE for distributed databases +SELECT run_command_on_workers('CREATE DATABASE db_to_drop'); + run_command_on_workers +--------------------------------------------------------------------- + (localhost,57637,t,"CREATE DATABASE") + (localhost,57638,t,"CREATE DATABASE") +(2 rows) + +\c db_to_drop - - :worker_1_port +CREATE EXTENSION citus; +\c db_to_drop - - :master_port +CREATE EXTENSION citus; +SELECT master_add_node('localhost', :worker_1_port); + master_add_node +--------------------------------------------------------------------- + 1 +(1 row) + +UPDATE pg_dist_node SET hasmetadata = true; +SELECT master_update_node(nodeid, 'localhost', 12345) FROM pg_dist_node; + master_update_node +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION trigger_metadata_sync() + RETURNS void + LANGUAGE C STRICT + AS 'citus'; +SELECT trigger_metadata_sync(); + trigger_metadata_sync +--------------------------------------------------------------------- + +(1 row) + +\c :datname - - :master_port +SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%'; + datname +--------------------------------------------------------------------- + db_to_drop +(1 row) + +DROP DATABASE db_to_drop; +SELECT datname FROM pg_stat_activity WHERE 
application_name LIKE 'Citus Met%'; + datname +--------------------------------------------------------------------- +(0 rows) + -- cleanup DROP TABLE ref_table; TRUNCATE pg_dist_colocation; diff --git a/src/test/regress/expected/multi_test_helpers_superuser.out b/src/test/regress/expected/multi_test_helpers_superuser.out index b631814f8..01676131c 100644 --- a/src/test/regress/expected/multi_test_helpers_superuser.out +++ b/src/test/regress/expected/multi_test_helpers_superuser.out @@ -26,7 +26,7 @@ WITH dist_node_summary AS ( ARRAY[dist_node_summary.query, dist_node_summary.query], false) ), dist_placement_summary AS ( - SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement)' AS query + SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement' AS query ), dist_placement_check AS ( SELECT count(distinct result) = 1 AS matches FROM dist_placement_summary CROSS JOIN LATERAL diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index e734870ee..3277de0b2 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -80,3 +80,4 @@ test: isolation_insert_select_vs_all_on_mx test: isolation_ref_select_for_update_vs_all_on_mx test: isolation_ref_update_delete_upsert_vs_all_on_mx test: isolation_dis2ref_foreign_keys_on_mx +test: isolation_metadata_sync_deadlock diff --git a/src/test/regress/spec/isolation_metadata_sync_deadlock.spec b/src/test/regress/spec/isolation_metadata_sync_deadlock.spec new file mode 100644 index 000000000..9aa9a05b2 --- /dev/null +++ b/src/test/regress/spec/isolation_metadata_sync_deadlock.spec @@ -0,0 +1,149 @@ +#include "isolation_mx_common.include.spec" + +setup +{ + CREATE OR REPLACE FUNCTION trigger_metadata_sync() + RETURNS void + LANGUAGE C STRICT + AS 'citus'; + + CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000) + RETURNS void + LANGUAGE C STRICT + AS 'citus'; + + CREATE TABLE 
deadlock_detection_test (user_id int UNIQUE, some_val int); + INSERT INTO deadlock_detection_test SELECT i, i FROM generate_series(1,7) i; + SELECT create_distributed_table('deadlock_detection_test', 'user_id'); + + CREATE TABLE t2(a int); + SELECT create_distributed_table('t2', 'a'); +} + +teardown +{ + DROP FUNCTION trigger_metadata_sync(); + DROP TABLE deadlock_detection_test; + DROP TABLE t2; + SET citus.shard_replication_factor = 1; + SELECT citus_internal.restore_isolation_tester_func(); +} + +session "s1" + +step "increase-retry-interval" +{ + ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 20000; +} + +step "reset-retry-interval" +{ + ALTER SYSTEM RESET citus.metadata_sync_retry_interval; +} + +step "enable-deadlock-detection" +{ + ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO 1.1; +} + +step "disable-deadlock-detection" +{ + ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1; +} + +step "reload-conf" +{ + SELECT pg_reload_conf(); +} + +step "s1-begin" +{ + BEGIN; +} + +step "s1-update-1" +{ + UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1; +} + +step "s1-update-2" +{ + UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2; +} + +step "s1-commit" +{ + COMMIT; +} + +step "s1-count-daemons" +{ + SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%'; +} + +step "s1-cancel-metadata-sync" +{ + SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%'; + SELECT pg_sleep(2); +} + +session "s2" + +step "s2-start-session-level-connection" +{ + SELECT start_session_level_connection_to_node('localhost', 57638); +} + +step "s2-stop-connection" +{ + SELECT stop_session_level_connection_to_node(); +} + +step "s2-begin-on-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('BEGIN'); +} + +step "s2-update-1-on-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 
WHERE user_id = 1'); +} + +step "s2-update-2-on-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2'); +} + +step "s2-truncate-on-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('TRUNCATE t2'); +} + +step "s2-commit-on-worker" +{ + SELECT run_commands_on_session_level_connection_to_node('COMMIT'); +} + +session "s3" + +step "s3-invalidate-metadata" +{ + update pg_dist_node SET metadatasynced = false; +} + +step "s3-resync" +{ + SELECT trigger_metadata_sync(); + SELECT pg_sleep(2); +} + +// Backends can block metadata sync. The following test verifies that if this happens, +// we still do distributed deadlock detection. In the following, s2-truncate-on-worker +// causes the concurrent metadata sync to be blocked. But s2 and s1 themselves are +// themselves involved in a distributed deadlock. +// See https://github.com/citusdata/citus/issues/4393 for more details. +permutation "enable-deadlock-detection" "reload-conf" "s2-start-session-level-connection" "s1-begin" "s1-update-1" "s2-begin-on-worker" "s2-update-2-on-worker" "s2-truncate-on-worker" "s3-invalidate-metadata" "s3-resync" "s2-update-1-on-worker" "s1-update-2" "s1-commit" "s2-commit-on-worker" "disable-deadlock-detection" "reload-conf" "s2-stop-connection" + +// Test that when metadata sync is waiting for locks, cancelling it terminates it. +// This is important in cases where the metadata sync daemon itself is involved in a deadlock. 
+permutation "increase-retry-interval" "reload-conf" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate-on-worker" "s3-invalidate-metadata" "s3-resync" "s1-count-daemons" "s1-cancel-metadata-sync" "s1-count-daemons" "reset-retry-interval" "reload-conf" "s2-commit-on-worker" "s2-stop-connection" "s3-resync" diff --git a/src/test/regress/sql/multi_mx_node_metadata.sql b/src/test/regress/sql/multi_mx_node_metadata.sql index 2f6b22872..834baa09e 100644 --- a/src/test/regress/sql/multi_mx_node_metadata.sql +++ b/src/test/regress/sql/multi_mx_node_metadata.sql @@ -27,6 +27,30 @@ CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLE ARRAY['SELECT pg_reload_conf()'], false); $$; +CREATE OR REPLACE FUNCTION trigger_metadata_sync() + RETURNS void + LANGUAGE C STRICT + AS 'citus'; + +CREATE OR REPLACE FUNCTION raise_error_in_metadata_sync() + RETURNS void + LANGUAGE C STRICT + AS 'citus'; + +CREATE PROCEDURE wait_until_process_count(appname text, target_count int) AS $$ +declare + counter integer := -1; +begin + while counter != target_count loop + -- pg_stat_activity is cached at xact level and there is no easy way to clear it. + -- Look it up in a new connection to get latest updates. 
+ SELECT result::int into counter FROM + master_run_on_worker(ARRAY['localhost'], ARRAY[57636], ARRAY[ + 'SELECT count(*) FROM pg_stat_activity WHERE application_name = ' || quote_literal(appname) || ';'], false); + PERFORM pg_sleep(0.1); + end loop; +end$$ LANGUAGE plpgsql; + -- add a node to the cluster SELECT master_add_node('localhost', :worker_1_port) As nodeid_1 \gset SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node; @@ -79,6 +103,54 @@ END; SELECT wait_until_metadata_sync(30000); SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node; +-- verify that metadata sync daemon has started +SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon'; + +-- +-- terminate maintenance daemon, and verify that we don't spawn multiple +-- metadata sync daemons +-- +SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon'; +CALL wait_until_process_count('Citus Maintenance Daemon', 1); +select trigger_metadata_sync(); +select wait_until_metadata_sync(); +SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon'; + +-- +-- cancel metadata sync daemon, and verify that it exits and restarts. +-- +select pid as pid_before_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset +select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon'; +select wait_until_metadata_sync(); +select pid as pid_after_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset +select :pid_before_cancel != :pid_after_cancel AS metadata_sync_restarted; + +-- +-- cancel metadata sync daemon so it exits and restarts, but at the +-- same time tell maintenanced to trigger a new metadata sync. One +-- of these should exit to avoid multiple metadata syncs. 
+-- +select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon'; +select trigger_metadata_sync(); +select wait_until_metadata_sync(); +-- we assume citus.metadata_sync_retry_interval is 500ms. Change amount we sleep to ceiling + 0.2 if it changes. +select pg_sleep(1.2); +SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon'; + +-- +-- error in metadata sync daemon, and verify it exits and restarts. +-- +select pid as pid_before_error from pg_stat_activity where application_name like 'Citus Met%' \gset +select raise_error_in_metadata_sync(); +select wait_until_metadata_sync(30000); +select pid as pid_after_error from pg_stat_activity where application_name like 'Citus Met%' \gset +select :pid_before_error != :pid_after_error AS metadata_sync_restarted; + + +SELECT trigger_metadata_sync(); +SELECT wait_until_metadata_sync(30000); +SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon'; + -- update it back to :worker_1_port, now metadata should be synced SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port); SELECT wait_until_metadata_sync(30000); @@ -249,6 +321,39 @@ SELECT 1 FROM master_activate_node('localhost', :worker_2_port); SELECT verify_metadata('localhost', :worker_1_port); +-- verify that metadata sync daemon exits +call wait_until_process_count('Citus Metadata Sync Daemon', 0); + +-- verify that DROP DATABASE terminates metadata sync +SELECT current_database() datname \gset +CREATE DATABASE db_to_drop; +SELECT run_command_on_workers('CREATE DATABASE db_to_drop'); + +\c db_to_drop - - :worker_1_port +CREATE EXTENSION citus; + +\c db_to_drop - - :master_port +CREATE EXTENSION citus; +SELECT master_add_node('localhost', :worker_1_port); +UPDATE pg_dist_node SET hasmetadata = true; + +SELECT master_update_node(nodeid, 'localhost', 12345) FROM pg_dist_node; + +CREATE OR REPLACE FUNCTION trigger_metadata_sync() + RETURNS 
void + LANGUAGE C STRICT + AS 'citus'; + +SELECT trigger_metadata_sync(); + +\c :datname - - :master_port + +SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%'; + +DROP DATABASE db_to_drop; + +SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%'; + -- cleanup DROP TABLE ref_table; TRUNCATE pg_dist_colocation; diff --git a/src/test/regress/sql/multi_test_helpers_superuser.sql b/src/test/regress/sql/multi_test_helpers_superuser.sql index aa7b3ee66..0bd360b12 100644 --- a/src/test/regress/sql/multi_test_helpers_superuser.sql +++ b/src/test/regress/sql/multi_test_helpers_superuser.sql @@ -23,7 +23,7 @@ WITH dist_node_summary AS ( ARRAY[dist_node_summary.query, dist_node_summary.query], false) ), dist_placement_summary AS ( - SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement)' AS query + SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement' AS query ), dist_placement_check AS ( SELECT count(distinct result) = 1 AS matches FROM dist_placement_summary CROSS JOIN LATERAL From a2c73bef27baf07351aa0a3bc0e22e4636dd7052 Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Wed, 23 Dec 2020 13:27:22 -0800 Subject: [PATCH 8/8] Trigger metadata sync at transaction commit --- src/backend/distributed/commands/function.c | 2 +- .../distributed/metadata/node_metadata.c | 4 +-- src/backend/distributed/test/metadata_sync.c | 2 +- .../transaction/transaction_management.c | 26 +++++++++++++++++++ .../distributed/transaction_management.h | 1 + .../isolation_metadata_sync_deadlock.out | 16 ++++++++---- .../isolation_metadata_sync_deadlock.spec | 8 ++++-- 7 files changed, 48 insertions(+), 11 deletions(-) diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 398d803d6..a9c29b3e1 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -1147,7 +1147,7 @@ 
TriggerSyncMetadataToPrimaryNodes(void) /* let the maintanince deamon know about the metadata sync */ if (triggerMetadataSync) { - TriggerMetadataSync(MyDatabaseId); + TriggerMetadataSyncOnCommit(); } } diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 65176152a..335cb115b 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -444,7 +444,7 @@ SetUpDistributedTableDependencies(WorkerNode *newWorkerNode) { MarkNodeHasMetadata(newWorkerNode->workerName, newWorkerNode->workerPort, true); - TriggerMetadataSync(MyDatabaseId); + TriggerMetadataSyncOnCommit(); } } } @@ -810,7 +810,7 @@ master_update_node(PG_FUNCTION_ARGS) */ if (UnsetMetadataSyncedForAll()) { - TriggerMetadataSync(MyDatabaseId); + TriggerMetadataSyncOnCommit(); } if (handle != NULL) diff --git a/src/backend/distributed/test/metadata_sync.c b/src/backend/distributed/test/metadata_sync.c index c2dc1ce4f..47e12ce7a 100644 --- a/src/backend/distributed/test/metadata_sync.c +++ b/src/backend/distributed/test/metadata_sync.c @@ -135,7 +135,7 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS) Datum trigger_metadata_sync(PG_FUNCTION_ARGS) { - TriggerMetadataSync(MyDatabaseId); + TriggerMetadataSyncOnCommit(); PG_RETURN_VOID(); } diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index 1999c836c..96a4180a4 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -28,6 +28,7 @@ #include "distributed/listutils.h" #include "distributed/local_executor.h" #include "distributed/locally_reserved_shared_connections.h" +#include "distributed/maintenanced.h" #include "distributed/multi_executor.h" #include "distributed/multi_explain.h" #include "distributed/repartition_join_execution.h" @@ -102,6 +103,9 @@ bool 
CoordinatedTransactionUses2PC = false; /* if disabled, distributed statements in a function may run as separate transactions */ bool FunctionOpensTransactionBlock = true; +/* if true, we should trigger metadata sync on commit */ +bool MetadataSyncOnCommit = false; + /* transaction management functions */ static void CoordinatedTransactionCallback(XactEvent event, void *arg); @@ -262,6 +266,15 @@ CoordinatedTransactionCallback(XactEvent event, void *arg) AfterXactConnectionHandling(true); } + /* + * Changes to catalog tables are now visible to the metadata sync + * daemon, so we can trigger metadata sync if necessary. + */ + if (MetadataSyncOnCommit) + { + TriggerMetadataSync(MyDatabaseId); + } + ResetGlobalVariables(); /* @@ -474,6 +487,7 @@ ResetGlobalVariables() activeSetStmts = NULL; CoordinatedTransactionUses2PC = false; TransactionModifiedNodeMetadata = false; + MetadataSyncOnCommit = false; ResetWorkerErrorIndication(); } @@ -728,3 +742,15 @@ MaybeExecutingUDF(void) { return ExecutorLevel > 1 || (ExecutorLevel == 1 && PlannerLevel > 0); } + + +/* + * TriggerMetadataSyncOnCommit sets a flag to do metadata sync on commit. + * This is because new metadata only becomes visible to the metadata sync + * daemon after commit happens. 
+ */ +void +TriggerMetadataSyncOnCommit(void) +{ + MetadataSyncOnCommit = true; +} diff --git a/src/include/distributed/transaction_management.h b/src/include/distributed/transaction_management.h index a0a595fac..f6bac9100 100644 --- a/src/include/distributed/transaction_management.h +++ b/src/include/distributed/transaction_management.h @@ -121,6 +121,7 @@ extern void InitializeTransactionManagement(void); /* other functions */ extern List * ActiveSubXactContexts(void); extern StringInfo BeginAndSetDistributedTransactionIdCommand(void); +extern void TriggerMetadataSyncOnCommit(void); #endif /* TRANSACTION_MANAGMENT_H */ diff --git a/src/test/regress/expected/isolation_metadata_sync_deadlock.out b/src/test/regress/expected/isolation_metadata_sync_deadlock.out index 0977caeea..e19498f6f 100644 --- a/src/test/regress/expected/isolation_metadata_sync_deadlock.out +++ b/src/test/regress/expected/isolation_metadata_sync_deadlock.out @@ -1,6 +1,6 @@ Parsed test spec with 3 sessions -starting permutation: enable-deadlock-detection reload-conf s2-start-session-level-connection s1-begin s1-update-1 s2-begin-on-worker s2-update-2-on-worker s2-truncate-on-worker s3-invalidate-metadata s3-resync s2-update-1-on-worker s1-update-2 s1-commit s2-commit-on-worker disable-deadlock-detection reload-conf s2-stop-connection +starting permutation: enable-deadlock-detection reload-conf s2-start-session-level-connection s1-begin s1-update-1 s2-begin-on-worker s2-update-2-on-worker s2-truncate-on-worker s3-invalidate-metadata s3-resync s3-wait s2-update-1-on-worker s1-update-2 s1-commit s2-commit-on-worker disable-deadlock-detection reload-conf s2-stop-connection create_distributed_table @@ -48,11 +48,13 @@ step s3-invalidate-metadata: step s3-resync: SELECT trigger_metadata_sync(); - SELECT pg_sleep(2); trigger_metadata_sync +step s3-wait: + SELECT pg_sleep(2); + pg_sleep @@ -96,7 +98,7 @@ restore_isolation_tester_func -starting permutation: increase-retry-interval reload-conf 
s2-start-session-level-connection s2-begin-on-worker s2-truncate-on-worker s3-invalidate-metadata s3-resync s1-count-daemons s1-cancel-metadata-sync s1-count-daemons reset-retry-interval reload-conf s2-commit-on-worker s2-stop-connection s3-resync +starting permutation: increase-retry-interval reload-conf s2-start-session-level-connection s2-begin-on-worker s2-truncate-on-worker s3-invalidate-metadata s3-resync s3-wait s1-count-daemons s1-cancel-metadata-sync s1-count-daemons reset-retry-interval reload-conf s2-commit-on-worker s2-stop-connection s3-resync s3-wait create_distributed_table @@ -132,11 +134,13 @@ step s3-invalidate-metadata: step s3-resync: SELECT trigger_metadata_sync(); - SELECT pg_sleep(2); trigger_metadata_sync +step s3-wait: + SELECT pg_sleep(2); + pg_sleep @@ -185,11 +189,13 @@ stop_session_level_connection_to_node step s3-resync: SELECT trigger_metadata_sync(); - SELECT pg_sleep(2); trigger_metadata_sync +step s3-wait: + SELECT pg_sleep(2); + pg_sleep diff --git a/src/test/regress/spec/isolation_metadata_sync_deadlock.spec b/src/test/regress/spec/isolation_metadata_sync_deadlock.spec index 9aa9a05b2..c5cabfd84 100644 --- a/src/test/regress/spec/isolation_metadata_sync_deadlock.spec +++ b/src/test/regress/spec/isolation_metadata_sync_deadlock.spec @@ -134,6 +134,10 @@ step "s3-invalidate-metadata" step "s3-resync" { SELECT trigger_metadata_sync(); +} + +step "s3-wait" +{ SELECT pg_sleep(2); } @@ -142,8 +146,8 @@ step "s3-resync" // causes the concurrent metadata sync to be blocked. But s2 and s1 themselves are // themselves involved in a distributed deadlock. // See https://github.com/citusdata/citus/issues/4393 for more details. 
-permutation "enable-deadlock-detection" "reload-conf" "s2-start-session-level-connection" "s1-begin" "s1-update-1" "s2-begin-on-worker" "s2-update-2-on-worker" "s2-truncate-on-worker" "s3-invalidate-metadata" "s3-resync" "s2-update-1-on-worker" "s1-update-2" "s1-commit" "s2-commit-on-worker" "disable-deadlock-detection" "reload-conf" "s2-stop-connection" +permutation "enable-deadlock-detection" "reload-conf" "s2-start-session-level-connection" "s1-begin" "s1-update-1" "s2-begin-on-worker" "s2-update-2-on-worker" "s2-truncate-on-worker" "s3-invalidate-metadata" "s3-resync" "s3-wait" "s2-update-1-on-worker" "s1-update-2" "s1-commit" "s2-commit-on-worker" "disable-deadlock-detection" "reload-conf" "s2-stop-connection" // Test that when metadata sync is waiting for locks, cancelling it terminates it. // This is important in cases where the metadata sync daemon itself is involved in a deadlock. -permutation "increase-retry-interval" "reload-conf" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate-on-worker" "s3-invalidate-metadata" "s3-resync" "s1-count-daemons" "s1-cancel-metadata-sync" "s1-count-daemons" "reset-retry-interval" "reload-conf" "s2-commit-on-worker" "s2-stop-connection" "s3-resync" +permutation "increase-retry-interval" "reload-conf" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate-on-worker" "s3-invalidate-metadata" "s3-resync" "s3-wait" "s1-count-daemons" "s1-cancel-metadata-sync" "s1-count-daemons" "reset-retry-interval" "reload-conf" "s2-commit-on-worker" "s2-stop-connection" "s3-resync" "s3-wait"