mirror of https://github.com/citusdata/citus.git
Merge branch 'master' into issue4237
commit 04aeb6938b

@@ -14,6 +14,7 @@
#include "distributed/commands.h"
#include "distributed/deparser.h"
#include "distributed/pg_version_constants.h"

static DistributeObjectOps NoDistributeOps = {
	.deparse = NULL,
@@ -414,6 +415,29 @@ static DistributeObjectOps Schema_Rename = {
	.postprocess = NULL,
	.address = AlterSchemaRenameStmtObjectAddress,
};
#if PG_VERSION_NUM >= PG_VERSION_13
static DistributeObjectOps Statistics_Alter = {
	.deparse = DeparseAlterStatisticsStmt,
	.qualify = QualifyAlterStatisticsStmt,
	.preprocess = PreprocessAlterStatisticsStmt,
	.postprocess = NULL,
	.address = NULL,
};
#endif
static DistributeObjectOps Statistics_AlterObjectSchema = {
	.deparse = DeparseAlterStatisticsSchemaStmt,
	.qualify = QualifyAlterStatisticsSchemaStmt,
	.preprocess = PreprocessAlterStatisticsSchemaStmt,
	.postprocess = PostprocessAlterStatisticsSchemaStmt,
	.address = AlterStatisticsSchemaStmtObjectAddress,
};
static DistributeObjectOps Statistics_AlterOwner = {
	.deparse = DeparseAlterStatisticsOwnerStmt,
	.qualify = QualifyAlterStatisticsOwnerStmt,
	.preprocess = PreprocessAlterStatisticsOwnerStmt,
	.postprocess = NULL,
	.address = NULL,
};
static DistributeObjectOps Statistics_Drop = {
	.deparse = NULL,
	.qualify = QualifyDropStatisticsStmt,
@@ -421,6 +445,13 @@ static DistributeObjectOps Statistics_Drop = {
	.postprocess = NULL,
	.address = NULL,
};
static DistributeObjectOps Statistics_Rename = {
	.deparse = DeparseAlterStatisticsRenameStmt,
	.qualify = QualifyAlterStatisticsRenameStmt,
	.preprocess = PreprocessAlterStatisticsRenameStmt,
	.postprocess = NULL,
	.address = NULL,
};
static DistributeObjectOps Table_AlterTable = {
	.deparse = NULL,
	.qualify = NULL,
@@ -590,6 +621,11 @@ GetDistributeObjectOps(Node *node)
			return &Routine_AlterObjectSchema;
		}

		case OBJECT_STATISTIC_EXT:
		{
			return &Statistics_AlterObjectSchema;
		}

		case OBJECT_TABLE:
		{
			return &Table_AlterObjectSchema;
@@ -637,6 +673,11 @@ GetDistributeObjectOps(Node *node)
			return &Routine_AlterOwner;
		}

		case OBJECT_STATISTIC_EXT:
		{
			return &Statistics_AlterOwner;
		}

		case OBJECT_TYPE:
		{
			return &Type_AlterOwner;
@@ -664,6 +705,13 @@ GetDistributeObjectOps(Node *node)
			return &Any_AlterRoleSet;
		}

#if PG_VERSION_NUM >= PG_VERSION_13
		case T_AlterStatsStmt:
		{
			return &Statistics_Alter;
		}

#endif
		case T_AlterTableStmt:
		{
			AlterTableStmt *stmt = castNode(AlterTableStmt, node);
@@ -907,6 +955,11 @@ GetDistributeObjectOps(Node *node)
			return &Schema_Rename;
		}

		case OBJECT_STATISTIC_EXT:
		{
			return &Statistics_Rename;
		}

		case OBJECT_TYPE:
		{
			return &Type_Rename;
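
For orientation, a minimal sketch (not part of this commit) of how a registered ops struct is consumed; it assumes the hook signatures match the Preprocess*/Qualify* prototypes that appear later in this diff, and that parseTree and queryString come from the utility hook:

/* hypothetical caller, e.g. Citus' utility-hook dispatch */
const DistributeObjectOps *ops = GetDistributeObjectOps(parseTree);

if (ops->qualify != NULL)
{
	/* e.g. QualifyAlterStatisticsStmt: schema-qualify bare names */
	ops->qualify(parseTree);
}

if (ops->preprocess != NULL)
{
	/* e.g. PreprocessAlterStatisticsStmt: build DDLJobs for the workers */
	List *ddlJobs = ops->preprocess(parseTree, queryString);
}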

@@ -1138,12 +1138,16 @@ TriggerSyncMetadataToPrimaryNodes(void)
			triggerMetadataSync = true;
		}
		else if (!workerNode->metadataSynced)
		{
			triggerMetadataSync = true;
		}
	}

	/* let the maintenance daemon know about the metadata sync */
	if (triggerMetadataSync)
	{
-		TriggerMetadataSync(MyDatabaseId);
+		TriggerMetadataSyncOnCommit();
	}
}

@@ -37,6 +37,7 @@
#include "distributed/relation_access_tracking.h"
#include "distributed/resource_lock.h"
#include "distributed/worker_transaction.h"
#include "miscadmin.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
@@ -45,7 +46,10 @@
static List * GetExplicitStatisticsIdList(Oid relationId);
static Oid GetRelIdByStatsOid(Oid statsOid);

static char * CreateAlterCommandIfOwnerNotDefault(Oid statsOid);
#if PG_VERSION_NUM >= PG_VERSION_13
static char * CreateAlterCommandIfTargetNotDefault(Oid statsOid);
#endif

/*
 * PreprocessCreateStatisticsStmt is called during the planning phase for
@@ -190,6 +194,213 @@ PreprocessDropStatisticsStmt(Node *node, const char *queryString)
}


/*
 * PreprocessAlterStatisticsRenameStmt is called during the planning phase for
 * ALTER STATISTICS RENAME.
 */
List *
PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString)
{
	RenameStmt *renameStmt = castNode(RenameStmt, node);
	Assert(renameStmt->renameType == OBJECT_STATISTIC_EXT);

	Oid statsOid = get_statistics_object_oid((List *) renameStmt->object, false);
	Oid relationId = GetRelIdByStatsOid(statsOid);

	if (!IsCitusTable(relationId) || !ShouldPropagate())
	{
		return NIL;
	}

	EnsureCoordinator();

	QualifyTreeNode((Node *) renameStmt);

	char *ddlCommand = DeparseTreeNode((Node *) renameStmt);

	DDLJob *ddlJob = palloc0(sizeof(DDLJob));

	ddlJob->targetRelationId = relationId;
	ddlJob->concurrentIndexCmd = false;
	ddlJob->startNewTransaction = false;
	ddlJob->commandString = ddlCommand;
	ddlJob->taskList = DDLTaskList(relationId, ddlCommand);

	List *ddlJobs = list_make1(ddlJob);

	return ddlJobs;
}
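
The qualify → deparse → DDLJob sequence above repeats verbatim in the SET SCHEMA, SET STATISTICS, and OWNER TO preprocessors below; a hypothetical helper (not in the patch) that captures the shared shape:

/* hypothetical refactoring sketch; not part of the commit */
static List *
StatisticsDDLJobs(Node *stmt, Oid relationId)
{
	QualifyTreeNode(stmt);
	char *ddlCommand = DeparseTreeNode(stmt);

	DDLJob *ddlJob = palloc0(sizeof(DDLJob));
	ddlJob->targetRelationId = relationId;
	ddlJob->concurrentIndexCmd = false;
	ddlJob->startNewTransaction = false;
	ddlJob->commandString = ddlCommand;
	ddlJob->taskList = DDLTaskList(relationId, ddlCommand);

	return list_make1(ddlJob);
}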


/*
 * PreprocessAlterStatisticsSchemaStmt is called during the planning phase for
 * ALTER STATISTICS SET SCHEMA.
 */
List *
PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString)
{
	AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
	Assert(stmt->objectType == OBJECT_STATISTIC_EXT);

	Oid statsOid = get_statistics_object_oid((List *) stmt->object, false);
	Oid relationId = GetRelIdByStatsOid(statsOid);

	if (!IsCitusTable(relationId) || !ShouldPropagate())
	{
		return NIL;
	}

	EnsureCoordinator();

	QualifyTreeNode((Node *) stmt);

	char *ddlCommand = DeparseTreeNode((Node *) stmt);

	DDLJob *ddlJob = palloc0(sizeof(DDLJob));

	ddlJob->targetRelationId = relationId;
	ddlJob->concurrentIndexCmd = false;
	ddlJob->startNewTransaction = false;
	ddlJob->commandString = ddlCommand;
	ddlJob->taskList = DDLTaskList(relationId, ddlCommand);

	List *ddlJobs = list_make1(ddlJob);

	return ddlJobs;
}


/*
 * PostprocessAlterStatisticsSchemaStmt is called after an ALTER STATISTICS
 * SET SCHEMA command has been executed by standard process utility.
 */
List *
PostprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString)
{
	AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
	Assert(stmt->objectType == OBJECT_STATISTIC_EXT);

	Value *statName = llast((List *) stmt->object);
	Oid statsOid = get_statistics_object_oid(list_make2(makeString(stmt->newschema),
														statName), false);
	Oid relationId = GetRelIdByStatsOid(statsOid);

	if (!IsCitusTable(relationId) || !ShouldPropagate())
	{
		return NIL;
	}

	bool missingOk = false;
	ObjectAddress objectAddress = GetObjectAddressFromParseTree((Node *) stmt, missingOk);

	EnsureDependenciesExistOnAllNodes(&objectAddress);

	return NIL;
}


/*
 * AlterStatisticsSchemaStmtObjectAddress finds the ObjectAddress for the
 * statistics object that is altered by the given AlterObjectSchemaStmt. If
 * missingOk is false and the statistics object does not exist, it errors out.
 *
 * Never returns NULL, but the objid in the address can be invalid if
 * missingOk was set to true.
 */
ObjectAddress
AlterStatisticsSchemaStmtObjectAddress(Node *node, bool missingOk)
{
	AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);

	ObjectAddress address = { 0 };
	Value *statName = llast((List *) stmt->object);
	Oid statsOid = get_statistics_object_oid(list_make2(makeString(stmt->newschema),
														statName), missingOk);
	ObjectAddressSet(address, StatisticExtRelationId, statsOid);

	return address;
}


#if PG_VERSION_NUM >= PG_VERSION_13

/*
 * PreprocessAlterStatisticsStmt is called during the planning phase for
 * ALTER STATISTICS .. SET STATISTICS.
 */
List *
PreprocessAlterStatisticsStmt(Node *node, const char *queryString)
{
	AlterStatsStmt *stmt = castNode(AlterStatsStmt, node);

	Oid statsOid = get_statistics_object_oid(stmt->defnames, false);
	Oid relationId = GetRelIdByStatsOid(statsOid);

	if (!IsCitusTable(relationId) || !ShouldPropagate())
	{
		return NIL;
	}

	EnsureCoordinator();

	QualifyTreeNode((Node *) stmt);

	char *ddlCommand = DeparseTreeNode((Node *) stmt);

	DDLJob *ddlJob = palloc0(sizeof(DDLJob));

	ddlJob->targetRelationId = relationId;
	ddlJob->concurrentIndexCmd = false;
	ddlJob->startNewTransaction = false;
	ddlJob->commandString = ddlCommand;
	ddlJob->taskList = DDLTaskList(relationId, ddlCommand);

	List *ddlJobs = list_make1(ddlJob);

	return ddlJobs;
}


#endif

/*
 * PreprocessAlterStatisticsOwnerStmt is called during the planning phase for
 * ALTER STATISTICS .. OWNER TO.
 */
List *
PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString)
{
	AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
	Assert(stmt->objectType == OBJECT_STATISTIC_EXT);

	Oid statsOid = get_statistics_object_oid((List *) stmt->object, false);
	Oid relationId = GetRelIdByStatsOid(statsOid);

	if (!IsCitusTable(relationId) || !ShouldPropagate())
	{
		return NIL;
	}

	EnsureCoordinator();

	QualifyTreeNode((Node *) stmt);

	char *ddlCommand = DeparseTreeNode((Node *) stmt);

	DDLJob *ddlJob = palloc0(sizeof(DDLJob));

	ddlJob->targetRelationId = relationId;
	ddlJob->concurrentIndexCmd = false;
	ddlJob->startNewTransaction = false;
	ddlJob->commandString = ddlCommand;
	ddlJob->taskList = DDLTaskList(relationId, ddlCommand);

	List *ddlJobs = list_make1(ddlJob);

	return ddlJobs;
}


/*
 * GetExplicitStatisticsCommandList returns the list of DDL commands to create
 * statistics that are explicitly created for the table with relationId. See
@@ -199,6 +410,7 @@ List *
GetExplicitStatisticsCommandList(Oid relationId)
{
	List *createStatisticsCommandList = NIL;
	List *alterStatisticsCommandList = NIL;

	PushOverrideEmptySearchPath(CurrentMemoryContext);

@@ -207,16 +419,44 @@ GetExplicitStatisticsCommandList(Oid relationId)
	Oid statisticsId = InvalidOid;
	foreach_oid(statisticsId, statisticsIdList)
	{
-		char *createStatisticsCommand = pg_get_statisticsobj_worker(statisticsId, false);
+		char *createStatisticsCommand = pg_get_statisticsobj_worker(statisticsId,
+																	 false);

-		createStatisticsCommandList = lappend(
-			createStatisticsCommandList,
-			makeTableDDLCommandString(createStatisticsCommand));
+		createStatisticsCommandList =
+			lappend(createStatisticsCommandList,
+					makeTableDDLCommandString(createStatisticsCommand));
#if PG_VERSION_NUM >= PG_VERSION_13

		/* we need to alter stats' target if it's getting distributed after creation */
		char *alterStatisticsTargetCommand =
			CreateAlterCommandIfTargetNotDefault(statisticsId);

		if (alterStatisticsTargetCommand != NULL)
		{
			alterStatisticsCommandList =
				lappend(alterStatisticsCommandList,
						makeTableDDLCommandString(alterStatisticsTargetCommand));
		}
#endif

		/* we need to alter stats' owner if it's getting distributed after creation */
		char *alterStatisticsOwnerCommand =
			CreateAlterCommandIfOwnerNotDefault(statisticsId);

		if (alterStatisticsOwnerCommand != NULL)
		{
			alterStatisticsCommandList =
				lappend(alterStatisticsCommandList,
						makeTableDDLCommandString(alterStatisticsOwnerCommand));
		}
	}

	/* revert back to original search_path */
	PopOverrideSearchPath();

	createStatisticsCommandList = list_concat(createStatisticsCommandList,
											  alterStatisticsCommandList);

	return createStatisticsCommandList;
}
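
A hedged usage sketch: for a distributed table, the function above yields the CREATE STATISTICS commands first, with any non-default target/owner ALTERs concatenated at the tail. The relationId and command texts below are illustrative:

/* hypothetical OID of a distributed table */
Oid relationId = 16384;

List *commandList = GetExplicitStatisticsCommandList(relationId);

/* commandList replays, in order, something like:
 *   CREATE STATISTICS public.s1 (ndistinct) ON a, b FROM t1
 *   ALTER STATISTICS public.s1 SET STATISTICS 1000   -- only if non-default
 *   ALTER STATISTICS public.s1 OWNER TO stats_owner  -- only if non-default
 */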


@@ -332,3 +572,84 @@ GetRelIdByStatsOid(Oid statsOid)

	return statisticsForm->stxrelid;
}


/*
 * CreateAlterCommandIfOwnerNotDefault returns an ALTER STATISTICS .. OWNER TO
 * command if the stats object with the given id has an owner different than
 * the default one. Returns NULL otherwise.
 */
static char *
CreateAlterCommandIfOwnerNotDefault(Oid statsOid)
{
	HeapTuple tup = SearchSysCache1(STATEXTOID, ObjectIdGetDatum(statsOid));

	if (!HeapTupleIsValid(tup))
	{
		ereport(WARNING, (errmsg("No stats object found with id: %u", statsOid)));
		return NULL;
	}

	Form_pg_statistic_ext statisticsForm = (Form_pg_statistic_ext) GETSTRUCT(tup);
	ReleaseSysCache(tup);

	if (statisticsForm->stxowner == GetUserId())
	{
		return NULL;
	}

	char *schemaName = get_namespace_name(statisticsForm->stxnamespace);
	char *statName = NameStr(statisticsForm->stxname);
	char *ownerName = GetUserNameFromId(statisticsForm->stxowner, false);

	StringInfoData str;
	initStringInfo(&str);

	appendStringInfo(&str, "ALTER STATISTICS %s OWNER TO %s",
					 NameListToQuotedString(list_make2(makeString(schemaName),
													   makeString(statName))),
					 quote_identifier(ownerName));

	return str.data;
}


#if PG_VERSION_NUM >= PG_VERSION_13

/*
 * CreateAlterCommandIfTargetNotDefault returns an ALTER STATISTICS .. SET
 * STATISTICS command if the stats object with the given id has a target
 * different than the default one. Returns NULL otherwise.
 */
static char *
CreateAlterCommandIfTargetNotDefault(Oid statsOid)
{
	HeapTuple tup = SearchSysCache1(STATEXTOID, ObjectIdGetDatum(statsOid));

	if (!HeapTupleIsValid(tup))
	{
		ereport(WARNING, (errmsg("No stats object found with id: %u", statsOid)));
		return NULL;
	}

	Form_pg_statistic_ext statisticsForm = (Form_pg_statistic_ext) GETSTRUCT(tup);
	ReleaseSysCache(tup);

	if (statisticsForm->stxstattarget == -1)
	{
		return NULL;
	}

	AlterStatsStmt *alterStatsStmt = makeNode(AlterStatsStmt);

	char *schemaName = get_namespace_name(statisticsForm->stxnamespace);
	char *statName = NameStr(statisticsForm->stxname);

	alterStatsStmt->stxstattarget = statisticsForm->stxstattarget;
	alterStatsStmt->defnames = list_make2(makeString(schemaName), makeString(statName));

	return DeparseAlterStatisticsStmt((Node *) alterStatsStmt);
}


#endif
@@ -22,6 +22,12 @@

static void AppendCreateStatisticsStmt(StringInfo buf, CreateStatsStmt *stmt);
static void AppendDropStatisticsStmt(StringInfo buf, List *nameList, bool ifExists);
static void AppendAlterStatisticsRenameStmt(StringInfo buf, RenameStmt *stmt);
static void AppendAlterStatisticsSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt);
#if PG_VERSION_NUM >= PG_VERSION_13
static void AppendAlterStatisticsStmt(StringInfo buf, AlterStatsStmt *stmt);
#endif
static void AppendAlterStatisticsOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt);
static void AppendStatisticsName(StringInfo buf, CreateStatsStmt *stmt);
static void AppendStatTypes(StringInfo buf, CreateStatsStmt *stmt);
static void AppendColumnNames(StringInfo buf, CreateStatsStmt *stmt);
@@ -53,6 +59,65 @@ DeparseDropStatisticsStmt(List *nameList, bool ifExists)
}


char *
DeparseAlterStatisticsRenameStmt(Node *node)
{
	RenameStmt *stmt = castNode(RenameStmt, node);

	StringInfoData str;
	initStringInfo(&str);

	AppendAlterStatisticsRenameStmt(&str, stmt);

	return str.data;
}


char *
DeparseAlterStatisticsSchemaStmt(Node *node)
{
	AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);

	StringInfoData str;
	initStringInfo(&str);

	AppendAlterStatisticsSchemaStmt(&str, stmt);

	return str.data;
}


#if PG_VERSION_NUM >= PG_VERSION_13
char *
DeparseAlterStatisticsStmt(Node *node)
{
	AlterStatsStmt *stmt = castNode(AlterStatsStmt, node);

	StringInfoData str;
	initStringInfo(&str);

	AppendAlterStatisticsStmt(&str, stmt);

	return str.data;
}


#endif
char *
DeparseAlterStatisticsOwnerStmt(Node *node)
{
	AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
	Assert(stmt->objectType == OBJECT_STATISTIC_EXT);

	StringInfoData str;
	initStringInfo(&str);

	AppendAlterStatisticsOwnerStmt(&str, stmt);

	return str.data;
}


static void
AppendCreateStatisticsStmt(StringInfo buf, CreateStatsStmt *stmt)
{
@@ -74,8 +139,6 @@ AppendCreateStatisticsStmt(StringInfo buf, CreateStatsStmt *stmt)
	appendStringInfoString(buf, " FROM ");

	AppendTableName(buf, stmt);
-
-	appendStringInfoString(buf, ";");
}


@@ -93,6 +156,44 @@ AppendDropStatisticsStmt(StringInfo buf, List *nameList, bool ifExists)
}


static void
AppendAlterStatisticsRenameStmt(StringInfo buf, RenameStmt *stmt)
{
	appendStringInfo(buf, "ALTER STATISTICS %s RENAME TO %s",
					 NameListToQuotedString((List *) stmt->object),
					 quote_identifier(stmt->newname));
}


static void
AppendAlterStatisticsSchemaStmt(StringInfo buf, AlterObjectSchemaStmt *stmt)
{
	appendStringInfo(buf, "ALTER STATISTICS %s SET SCHEMA %s",
					 NameListToQuotedString((List *) stmt->object),
					 quote_identifier(stmt->newschema));
}


#if PG_VERSION_NUM >= PG_VERSION_13
static void
AppendAlterStatisticsStmt(StringInfo buf, AlterStatsStmt *stmt)
{
	appendStringInfo(buf, "ALTER STATISTICS %s SET STATISTICS %d",
					 NameListToQuotedString(stmt->defnames),
					 stmt->stxstattarget);
}


#endif
static void
AppendAlterStatisticsOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt)
{
	List *names = (List *) stmt->object;
	appendStringInfo(buf, "ALTER STATISTICS %s OWNER TO %s",
					 NameListToQuotedString(names),
					 RoleSpecString(stmt->newowner, true));
}
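
A minimal sketch (hypothetical object names, not part of the commit) of driving one of these deparsers directly; the output follows the format strings above:

/* build the parse node by hand and deparse it; names are illustrative */
RenameStmt *stmt = makeNode(RenameStmt);
stmt->renameType = OBJECT_STATISTIC_EXT;
stmt->object = (Node *) list_make2(makeString("public"), makeString("s1"));
stmt->newname = "s2";

char *sql = DeparseAlterStatisticsRenameStmt((Node *) stmt);
/* sql is now: ALTER STATISTICS public.s1 RENAME TO s2 */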


static void
AppendStatisticsName(StringInfo buf, CreateStatsStmt *stmt)
{

@@ -78,3 +78,89 @@ QualifyDropStatisticsStmt(Node *node)

	dropStatisticsStmt->objects = objectNameListWithSchema;
}


/*
 * QualifyAlterStatisticsRenameStmt qualifies RenameStmt's with schema name for
 * ALTER STATISTICS RENAME statements.
 */
void
QualifyAlterStatisticsRenameStmt(Node *node)
{
	RenameStmt *renameStmt = castNode(RenameStmt, node);
	Assert(renameStmt->renameType == OBJECT_STATISTIC_EXT);

	List *nameList = (List *) renameStmt->object;
	if (list_length(nameList) == 1)
	{
		RangeVar *stat = makeRangeVarFromNameList(nameList);
		Oid schemaOid = RangeVarGetCreationNamespace(stat);
		stat->schemaname = get_namespace_name(schemaOid);
		renameStmt->object = (Node *) MakeNameListFromRangeVar(stat);
	}
}


/*
 * QualifyAlterStatisticsSchemaStmt qualifies AlterObjectSchemaStmt's with
 * schema name for ALTER STATISTICS SET SCHEMA statements.
 */
void
QualifyAlterStatisticsSchemaStmt(Node *node)
{
	AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
	Assert(stmt->objectType == OBJECT_STATISTIC_EXT);

	List *nameList = (List *) stmt->object;
	if (list_length(nameList) == 1)
	{
		RangeVar *stat = makeRangeVarFromNameList(nameList);
		Oid schemaOid = RangeVarGetCreationNamespace(stat);
		stat->schemaname = get_namespace_name(schemaOid);
		stmt->object = (Node *) MakeNameListFromRangeVar(stat);
	}
}


#if PG_VERSION_NUM >= PG_VERSION_13

/*
 * QualifyAlterStatisticsStmt qualifies AlterStatsStmt's with schema name for
 * ALTER STATISTICS .. SET STATISTICS statements.
 */
void
QualifyAlterStatisticsStmt(Node *node)
{
	AlterStatsStmt *stmt = castNode(AlterStatsStmt, node);

	if (list_length(stmt->defnames) == 1)
	{
		RangeVar *stat = makeRangeVarFromNameList(stmt->defnames);
		Oid schemaOid = RangeVarGetCreationNamespace(stat);
		stat->schemaname = get_namespace_name(schemaOid);
		stmt->defnames = MakeNameListFromRangeVar(stat);
	}
}


#endif

/*
 * QualifyAlterStatisticsOwnerStmt qualifies AlterOwnerStmt's with schema name
 * for ALTER STATISTICS .. OWNER TO statements.
 */
void
QualifyAlterStatisticsOwnerStmt(Node *node)
{
	AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
	Assert(stmt->objectType == OBJECT_STATISTIC_EXT);

	List *nameList = (List *) stmt->object;
	if (list_length(nameList) == 1)
	{
		RangeVar *stat = makeRangeVarFromNameList(nameList);
		Oid schemaOid = RangeVarGetCreationNamespace(stat);
		stat->schemaname = get_namespace_name(schemaOid);
		stmt->object = (Node *) MakeNameListFromRangeVar(stat);
	}
}
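
Qualification is what keeps the deparsed command valid when it later runs with a restricted search_path on workers; a sketch of the effect (hypothetical statement, assuming s1 resolves to the public schema):

/* hypothetical: ALTER STATISTICS s1 SET STATISTICS 1000, unqualified */
AlterStatsStmt *stmt = makeNode(AlterStatsStmt);
stmt->defnames = list_make1(makeString("s1"));
stmt->stxstattarget = 1000;

QualifyAlterStatisticsStmt((Node *) stmt);
/* defnames is now ("public", "s1"), so the deparsed command reads
 * ALTER STATISTICS public.s1 SET STATISTICS 1000 */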


@@ -14,6 +14,7 @@
#include "postgres.h"
#include "miscadmin.h"

#include <signal.h>
#include <sys/stat.h>
#include <unistd.h>

@@ -28,6 +29,7 @@
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "commands/async.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commands.h"
#include "distributed/deparser.h"
@@ -35,6 +37,7 @@
#include "distributed/listutils.h"
#include "distributed/metadata_utility.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
@@ -48,11 +51,15 @@
#include "foreign/foreign.h"
#include "miscadmin.h"
#include "nodes/pg_list.h"
#include "pgstat.h"
#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"


@@ -76,11 +83,18 @@ static GrantStmt * GenerateGrantOnSchemaStmtForRights(Oid roleOid,
													  char *permission,
													  bool withGrantOption);
static char * GenerateSetRoleQuery(Oid roleOid);
static void MetadataSyncSigTermHandler(SIGNAL_ARGS);
static void MetadataSyncSigAlrmHandler(SIGNAL_ARGS);

PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
PG_FUNCTION_INFO_V1(worker_record_sequence_dependency);

static bool got_SIGTERM = false;
static bool got_SIGALRM = false;

#define METADATA_SYNC_APP_NAME "Citus Metadata Sync Daemon"


/*
 * start_metadata_sync_to_node function sets hasmetadata column of the given
@@ -1497,7 +1511,7 @@ DetachPartitionCommandList(void)
 * metadata workers that are out of sync. Returns the result of
 * synchronization.
 */
-MetadataSyncResult
+static MetadataSyncResult
SyncMetadataToNodes(void)
{
	MetadataSyncResult result = METADATA_SYNC_SUCCESS;
@@ -1527,6 +1541,9 @@ SyncMetadataToNodes(void)

		if (!SyncMetadataSnapshotToNode(workerNode, raiseInterrupts))
		{
			ereport(WARNING, (errmsg("failed to sync metadata to %s:%d",
									 workerNode->workerName,
									 workerNode->workerPort)));
			result = METADATA_SYNC_FAILED_SYNC;
		}
		else
@@ -1539,3 +1556,244 @@ SyncMetadataToNodes(void)

	return result;
}


/*
 * SyncMetadataToNodesMain is the main function for syncing metadata to
 * MX nodes. It retries until success and then exits.
 */
void
SyncMetadataToNodesMain(Datum main_arg)
{
	Oid databaseOid = DatumGetObjectId(main_arg);

	/* extension owner is passed via bgw_extra */
	Oid extensionOwner = InvalidOid;
	memcpy_s(&extensionOwner, sizeof(extensionOwner),
			 MyBgworkerEntry->bgw_extra, sizeof(Oid));

	pqsignal(SIGTERM, MetadataSyncSigTermHandler);
	pqsignal(SIGALRM, MetadataSyncSigAlrmHandler);
	BackgroundWorkerUnblockSignals();

	/* connect to database, after that we can actually access catalogs */
	BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0);

	/* make worker recognizable in pg_stat_activity */
	pgstat_report_appname(METADATA_SYNC_APP_NAME);

	bool syncedAllNodes = false;

	while (!syncedAllNodes)
	{
		InvalidateMetadataSystemCache();
		StartTransactionCommand();

		/*
		 * Some functions in ruleutils.c, which we use to get the DDL for
		 * metadata propagation, require an active snapshot.
		 */
		PushActiveSnapshot(GetTransactionSnapshot());

		if (!LockCitusExtension())
		{
			ereport(DEBUG1, (errmsg("could not lock the citus extension, "
									"skipping metadata sync")));
		}
		else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
		{
			UseCoordinatedTransaction();
			MetadataSyncResult result = SyncMetadataToNodes();

			syncedAllNodes = (result == METADATA_SYNC_SUCCESS);

			/* we use LISTEN/NOTIFY to wait for metadata syncing in tests */
			if (result != METADATA_SYNC_FAILED_LOCK)
			{
				Async_Notify(METADATA_SYNC_CHANNEL, NULL);
			}
		}

		PopActiveSnapshot();
		CommitTransactionCommand();
		ProcessCompletedNotifies();

		if (syncedAllNodes)
		{
			break;
		}

		/*
		 * If the backend is cancelled (e.g. because of a distributed
		 * deadlock), CHECK_FOR_INTERRUPTS() will raise a cancellation error
		 * which will result in exit(1).
		 */
		CHECK_FOR_INTERRUPTS();

		/*
		 * SIGTERM is used when the maintenance daemon tries to clean up
		 * metadata sync daemons spawned by terminated maintenance daemons.
		 */
		if (got_SIGTERM)
		{
			exit(0);
		}

		/*
		 * SIGALRM is used for testing purposes; it simulates an error in the
		 * metadata sync daemon.
		 */
		if (got_SIGALRM)
		{
			elog(ERROR, "Error in metadata sync daemon");
		}

		pg_usleep(MetadataSyncRetryInterval * 1000);
	}
}


/*
 * MetadataSyncSigTermHandler sets a flag to request termination of the
 * metadata sync daemon.
 */
static void
MetadataSyncSigTermHandler(SIGNAL_ARGS)
{
	int save_errno = errno;

	got_SIGTERM = true;
	if (MyProc != NULL)
	{
		SetLatch(&MyProc->procLatch);
	}

	errno = save_errno;
}


/*
 * MetadataSyncSigAlrmHandler sets a flag to request an error in the metadata
 * sync daemon. This is used for testing purposes.
 */
static void
MetadataSyncSigAlrmHandler(SIGNAL_ARGS)
{
	int save_errno = errno;

	got_SIGALRM = true;
	if (MyProc != NULL)
	{
		SetLatch(&MyProc->procLatch);
	}

	errno = save_errno;
}


/*
 * SpawnSyncMetadataToNodes starts a background worker which runs metadata
 * sync. On success it returns the worker's handle. Otherwise it returns NULL.
 */
BackgroundWorkerHandle *
SpawnSyncMetadataToNodes(Oid database, Oid extensionOwner)
{
	BackgroundWorker worker;
	BackgroundWorkerHandle *handle = NULL;

	/* Configure a worker. */
	memset(&worker, 0, sizeof(worker));
	SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
				 "Citus Metadata Sync: %u/%u",
				 database, extensionOwner);
	worker.bgw_flags =
		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
	worker.bgw_start_time = BgWorkerStart_ConsistentState;

	/* don't restart, we manage restarts from maintenance daemon */
	worker.bgw_restart_time = BGW_NEVER_RESTART;
	strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
	strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
			 "SyncMetadataToNodesMain");
	worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
	memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
			 sizeof(Oid));
	worker.bgw_notify_pid = MyProcPid;

	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
	{
		return NULL;
	}

	pid_t pid;
	WaitForBackgroundWorkerStartup(handle, &pid);

	return handle;
}


/*
 * SignalMetadataSyncDaemon signals metadata sync daemons belonging to
 * the given database.
 */
void
SignalMetadataSyncDaemon(Oid database, int sig)
{
	int backendCount = pgstat_fetch_stat_numbackends();
	for (int backend = 1; backend <= backendCount; backend++)
	{
		LocalPgBackendStatus *localBeEntry = pgstat_fetch_stat_local_beentry(backend);
		if (!localBeEntry)
		{
			continue;
		}

		PgBackendStatus *beStatus = &localBeEntry->backendStatus;
		if (beStatus->st_databaseid == database &&
			strncmp(beStatus->st_appname, METADATA_SYNC_APP_NAME, BGW_MAXLEN) == 0)
		{
			kill(beStatus->st_procpid, sig);
		}
	}
}


/*
 * ShouldInitiateMetadataSync returns whether the metadata sync daemon should
 * be initiated. It sets lockFailure to true if the pg_dist_node lock couldn't
 * be acquired for the check.
 */
bool
ShouldInitiateMetadataSync(bool *lockFailure)
{
	if (!IsCoordinator())
	{
		*lockFailure = false;
		return false;
	}

	Oid distNodeOid = DistNodeRelationId();
	if (!ConditionalLockRelationOid(distNodeOid, AccessShareLock))
	{
		*lockFailure = true;
		return false;
	}

	bool shouldSyncMetadata = false;

	List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
	WorkerNode *workerNode = NULL;
	foreach_ptr(workerNode, workerList)
	{
		if (workerNode->hasMetadata && !workerNode->metadataSynced)
		{
			shouldSyncMetadata = true;
			break;
		}
	}

	UnlockRelationOid(distNodeOid, AccessShareLock);

	*lockFailure = false;
	return shouldSyncMetadata;
}
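
A hedged sketch of the intended caller (the maintenance daemon loop later in this diff): check cheaply, then spawn the worker only when there is something to sync. The extensionOwner value is illustrative:

/* sketch of the maintenance-daemon side; extensionOwner is illustrative */
Oid extensionOwner = InvalidOid;

bool lockFailure = false;
bool syncMetadata = ShouldInitiateMetadataSync(&lockFailure);

if (syncMetadata)
{
	/* a NULL handle means background worker registration failed */
	BackgroundWorkerHandle *handle =
		SpawnSyncMetadataToNodes(MyDatabaseId, extensionOwner);
}

/* on lockFailure, the caller retries after MetadataSyncRetryInterval */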


@@ -444,7 +444,7 @@ SetUpDistributedTableDependencies(WorkerNode *newWorkerNode)
		{
			MarkNodeHasMetadata(newWorkerNode->workerName, newWorkerNode->workerPort,
								true);
-			TriggerMetadataSync(MyDatabaseId);
+			TriggerMetadataSyncOnCommit();
		}
	}
}
@@ -810,7 +810,7 @@ master_update_node(PG_FUNCTION_ARGS)
	 */
	if (UnsetMetadataSyncedForAll())
	{
-		TriggerMetadataSync(MyDatabaseId);
+		TriggerMetadataSyncOnCommit();
	}

	if (handle != NULL)
@@ -82,17 +82,48 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
		{
			AlterObjectSchemaStmt *alterObjectSchemaStmt =
				(AlterObjectSchemaStmt *) parseTree;
			ObjectType objectType = alterObjectSchemaStmt->objectType;

			if (objectType == OBJECT_STATISTIC_EXT)
			{
				RangeVar *stat = makeRangeVarFromNameList(
					(List *) alterObjectSchemaStmt->object);

				/* append shard id */
				AppendShardIdToName(&stat->relname, shardId);

				alterObjectSchemaStmt->object = (Node *) MakeNameListFromRangeVar(stat);
			}
			else
			{
				char **relationName = &(alterObjectSchemaStmt->relation->relname);
-				char **relationSchemaName = &(alterObjectSchemaStmt->relation->schemaname);
+				char **relationSchemaName =
+					&(alterObjectSchemaStmt->relation->schemaname);

				/* prefix with schema name if it is not added already */
				SetSchemaNameIfNotExist(relationSchemaName, schemaName);

				/* append shardId to base relation name */
				AppendShardIdToName(relationName, shardId);
			}

			break;
		}

#if PG_VERSION_NUM >= PG_VERSION_13
		case T_AlterStatsStmt:
		{
			AlterStatsStmt *alterStatsStmt = (AlterStatsStmt *) parseTree;
			RangeVar *stat = makeRangeVarFromNameList(alterStatsStmt->defnames);

			AppendShardIdToName(&stat->relname, shardId);

			alterStatsStmt->defnames = MakeNameListFromRangeVar(stat);

			break;
		}
#endif
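
An illustration of the shard-name extension above (hypothetical names and shard id):

/* hypothetical: an ALTER STATISTICS on public.s1 targeting shard 102008 */
AlterStatsStmt *alterStatsStmt = makeNode(AlterStatsStmt);
alterStatsStmt->defnames = list_make2(makeString("public"), makeString("s1"));

RelayEventExtendNames((Node *) alterStatsStmt, "public", 102008);
/* defnames is now ("public", "s1_102008") */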

		case T_AlterTableStmt:
		{
			/*
@@ -180,6 +211,22 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
			break;
		}

		case T_AlterOwnerStmt:
		{
			AlterOwnerStmt *alterOwnerStmt = castNode(AlterOwnerStmt, parseTree);

			/* we currently extend names in alter owner statements only for statistics */
			Assert(alterOwnerStmt->objectType == OBJECT_STATISTIC_EXT);

			RangeVar *stat = makeRangeVarFromNameList((List *) alterOwnerStmt->object);

			AppendShardIdToName(&stat->relname, shardId);

			alterOwnerStmt->object = (Node *) MakeNameListFromRangeVar(stat);

			break;
		}

		case T_ClusterStmt:
		{
			ClusterStmt *clusterStmt = (ClusterStmt *) parseTree;

@@ -530,6 +577,17 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
		{
			RenamePolicyEventExtendNames(renameStmt, schemaName, shardId);
		}
		else if (objectType == OBJECT_STATISTIC_EXT)
		{
			RangeVar *stat = makeRangeVarFromNameList((List *) renameStmt->object);

			AppendShardIdToName(&stat->relname, shardId);
			AppendShardIdToName(&renameStmt->newname, shardId);

			SetSchemaNameIfNotExist(&stat->schemaname, schemaName);

			renameStmt->object = (Node *) MakeNameListFromRangeVar(stat);
		}
		else
		{
			ereport(WARNING, (errmsg("unsafe object type in rename statement"),
@@ -16,6 +16,7 @@
#include "catalog/pg_type.h"
#include "distributed/connection_management.h"
#include "distributed/listutils.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata_sync.h"
#include "distributed/remote_commands.h"
#include "postmaster/postmaster.h"
@@ -28,6 +29,8 @@
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(master_metadata_snapshot);
PG_FUNCTION_INFO_V1(wait_until_metadata_sync);
PG_FUNCTION_INFO_V1(trigger_metadata_sync);
PG_FUNCTION_INFO_V1(raise_error_in_metadata_sync);


/*
@@ -124,3 +127,26 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)

	PG_RETURN_VOID();
}


/*
 * trigger_metadata_sync triggers metadata sync for testing.
 */
Datum
trigger_metadata_sync(PG_FUNCTION_ARGS)
{
	TriggerMetadataSyncOnCommit();
	PG_RETURN_VOID();
}


/*
 * raise_error_in_metadata_sync causes metadata sync to raise an error.
 */
Datum
raise_error_in_metadata_sync(PG_FUNCTION_ARGS)
{
	/* metadata sync uses SIGALRM to test errors */
	SignalMetadataSyncDaemon(MyDatabaseId, SIGALRM);
	PG_RETURN_VOID();
}
@@ -28,6 +28,7 @@
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/locally_reserved_shared_connections.h"
#include "distributed/maintenanced.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_explain.h"
#include "distributed/repartition_join_execution.h"
@@ -102,6 +103,9 @@ bool CoordinatedTransactionUses2PC = false;
/* if disabled, distributed statements in a function may run as separate transactions */
bool FunctionOpensTransactionBlock = true;

/* if true, we should trigger metadata sync on commit */
bool MetadataSyncOnCommit = false;


/* transaction management functions */
static void CoordinatedTransactionCallback(XactEvent event, void *arg);
@@ -262,6 +266,15 @@ CoordinatedTransactionCallback(XactEvent event, void *arg)
				AfterXactConnectionHandling(true);
			}

			/*
			 * Changes to catalog tables are now visible to the metadata sync
			 * daemon, so we can trigger metadata sync if necessary.
			 */
			if (MetadataSyncOnCommit)
			{
				TriggerMetadataSync(MyDatabaseId);
			}

			ResetGlobalVariables();

			/*
@@ -474,6 +487,7 @@ ResetGlobalVariables()
	activeSetStmts = NULL;
	CoordinatedTransactionUses2PC = false;
	TransactionModifiedNodeMetadata = false;
	MetadataSyncOnCommit = false;
	ResetWorkerErrorIndication();
}

@@ -728,3 +742,15 @@ MaybeExecutingUDF(void)
{
	return ExecutorLevel > 1 || (ExecutorLevel == 1 && PlannerLevel > 0);
}


/*
 * TriggerMetadataSyncOnCommit sets a flag to do metadata sync on commit.
 * This is because new metadata only becomes visible to the metadata sync
 * daemon after commit happens.
 */
void
TriggerMetadataSyncOnCommit(void)
{
	MetadataSyncOnCommit = true;
}
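
Putting the pieces together, a sketch of the intended flow (the call sites appear elsewhere in this diff; the arguments are illustrative):

/* inside a transaction that changes node metadata */
MarkNodeHasMetadata("localhost", 57638, true); /* hypothetical arguments */
TriggerMetadataSyncOnCommit();                 /* only sets the flag */

/*
 * At COMMIT, CoordinatedTransactionCallback() above observes the flag and
 * calls TriggerMetadataSync(MyDatabaseId), so the maintenance daemon only
 * wakes up once the new catalog contents are visible to it.
 */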


@@ -118,7 +118,6 @@ static size_t MaintenanceDaemonShmemSize(void);
static void MaintenanceDaemonShmemInit(void);
static void MaintenanceDaemonShmemExit(int code, Datum arg);
static void MaintenanceDaemonErrorContext(void *arg);
-static bool LockCitusExtension(void);
static bool MetadataSyncTriggeredCheckAndReset(MaintenanceDaemonDBData *dbData);
static void WarnMaintenanceDaemonNotStarted(void);

@@ -291,6 +290,13 @@ CitusMaintenanceDaemonMain(Datum main_arg)
	TimestampTz lastRecoveryTime = 0;
	TimestampTz nextMetadataSyncTime = 0;

	/*
	 * We do metadata sync in a separate background worker. We need its
	 * handle to be able to check its status.
	 */
	BackgroundWorkerHandle *metadataSyncBgwHandle = NULL;

	/*
	 * Look up this worker's configuration.
	 */
@@ -371,6 +377,12 @@ CitusMaintenanceDaemonMain(Datum main_arg)
	/* make worker recognizable in pg_stat_activity */
	pgstat_report_appname("Citus Maintenance Daemon");

	/*
	 * Terminate orphaned metadata sync daemons spawned from previously
	 * terminated or crashed maintenanced instances.
	 */
	SignalMetadataSyncDaemon(databaseOid, SIGTERM);

	/* enter main loop */
	while (!got_SIGTERM)
	{
@@ -450,21 +462,42 @@ CitusMaintenanceDaemonMain(Datum main_arg)
		}
#endif

-		if (!RecoveryInProgress() &&
+		pid_t metadataSyncBgwPid = 0;
+		BgwHandleStatus metadataSyncStatus =
+			metadataSyncBgwHandle != NULL ?
+			GetBackgroundWorkerPid(metadataSyncBgwHandle, &metadataSyncBgwPid) :
+			BGWH_STOPPED;
+
+		if (metadataSyncStatus != BGWH_STOPPED &&
+			GetCurrentTimestamp() >= nextMetadataSyncTime)
+		{
+			/* metadata sync is still running, recheck in a short while */
+			int nextTimeout = MetadataSyncRetryInterval;
+			nextMetadataSyncTime =
+				TimestampTzPlusMilliseconds(GetCurrentTimestamp(), nextTimeout);
+			timeout = Min(timeout, nextTimeout);
+		}
+		else if (!RecoveryInProgress() &&
+				 metadataSyncStatus == BGWH_STOPPED &&
			(MetadataSyncTriggeredCheckAndReset(myDbData) ||
			 GetCurrentTimestamp() >= nextMetadataSyncTime))
		{
-			bool metadataSyncFailed = false;
+			if (metadataSyncBgwHandle)
+			{
+				TerminateBackgroundWorker(metadataSyncBgwHandle);
+				pfree(metadataSyncBgwHandle);
+				metadataSyncBgwHandle = NULL;
+			}
+
			InvalidateMetadataSystemCache();
			StartTransactionCommand();

			/*
			 * Some functions in ruleutils.c, which we use to get the DDL for
			 * metadata propagation, require an active snapshot.
			 */
			PushActiveSnapshot(GetTransactionSnapshot());

+			int nextTimeout = MetadataSyncRetryInterval;
+			bool syncMetadata = false;
+
			if (!LockCitusExtension())
			{
				ereport(DEBUG1, (errmsg("could not lock the citus extension, "
@@ -472,25 +505,28 @@ CitusMaintenanceDaemonMain(Datum main_arg)
			}
			else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded())
			{
-				MetadataSyncResult result = SyncMetadataToNodes();
-				metadataSyncFailed = (result != METADATA_SYNC_SUCCESS);
+				bool lockFailure = false;
+				syncMetadata = ShouldInitiateMetadataSync(&lockFailure);

				/*
-				 * Notification means we had an attempt on synchronization
-				 * without being blocked for pg_dist_node access.
+				 * If the lock fails, we need to recheck in a short while. If
+				 * we are going to sync metadata, we should also recheck in a
+				 * short while to see if it failed. Otherwise, we can wait
+				 * longer.
				 */
-				if (result != METADATA_SYNC_FAILED_LOCK)
-				{
-					Async_Notify(METADATA_SYNC_CHANNEL, NULL);
-				}
+				nextTimeout = (lockFailure || syncMetadata) ?
+							  MetadataSyncRetryInterval :
+							  MetadataSyncInterval;
			}

			PopActiveSnapshot();
			CommitTransactionCommand();
			ProcessCompletedNotifies();

-			int64 nextTimeout = metadataSyncFailed ? MetadataSyncRetryInterval :
-								MetadataSyncInterval;
+			if (syncMetadata)
+			{
+				metadataSyncBgwHandle =
+					SpawnSyncMetadataToNodes(MyDatabaseId, myDbData->userOid);
+			}
+
			nextMetadataSyncTime =
				TimestampTzPlusMilliseconds(GetCurrentTimestamp(), nextTimeout);
			timeout = Min(timeout, nextTimeout);
@@ -626,6 +662,11 @@ CitusMaintenanceDaemonMain(Datum main_arg)
			ProcessConfigFile(PGC_SIGHUP);
		}
	}

	if (metadataSyncBgwHandle)
	{
		TerminateBackgroundWorker(metadataSyncBgwHandle);
	}
}


@@ -786,7 +827,7 @@ MaintenanceDaemonErrorContext(void *arg)
 * LockCitusExtension acquires a lock on the Citus extension or returns
 * false if the extension does not exist or is being dropped.
 */
-static bool
+bool
LockCitusExtension(void)
{
	Oid extensionOid = get_extension_oid("citus", true);
@@ -282,6 +282,12 @@ extern List * PreprocessCreateStatisticsStmt(Node *node, const char *queryString
extern List * PostprocessCreateStatisticsStmt(Node *node, const char *queryString);
extern ObjectAddress CreateStatisticsStmtObjectAddress(Node *node, bool missingOk);
extern List * PreprocessDropStatisticsStmt(Node *node, const char *queryString);
extern List * PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString);
extern List * PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString);
extern List * PostprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString);
extern ObjectAddress AlterStatisticsSchemaStmtObjectAddress(Node *node, bool missingOk);
extern List * PreprocessAlterStatisticsStmt(Node *node, const char *queryString);
extern List * PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString);
extern List * GetExplicitStatisticsCommandList(Oid relationId);
extern List * GetExplicitStatisticsSchemaIdList(Oid relationId);


@@ -58,9 +58,17 @@ extern char * DeparseAlterSchemaRenameStmt(Node *stmt);
/* forward declarations for deparse_statistics_stmts.c */
extern char * DeparseCreateStatisticsStmt(Node *node);
extern char * DeparseDropStatisticsStmt(List *nameList, bool ifExists);
extern char * DeparseAlterStatisticsRenameStmt(Node *node);
extern char * DeparseAlterStatisticsSchemaStmt(Node *node);
extern char * DeparseAlterStatisticsStmt(Node *node);
extern char * DeparseAlterStatisticsOwnerStmt(Node *node);

extern void QualifyCreateStatisticsStmt(Node *node);
extern void QualifyDropStatisticsStmt(Node *node);
extern void QualifyAlterStatisticsRenameStmt(Node *node);
extern void QualifyAlterStatisticsSchemaStmt(Node *node);
extern void QualifyAlterStatisticsStmt(Node *node);
extern void QualifyAlterStatisticsOwnerStmt(Node *node);

/* forward declarations for deparse_type_stmts.c */
extern char * DeparseCompositeTypeStmt(Node *stmt);


@@ -25,6 +25,7 @@ extern void StopMaintenanceDaemon(Oid databaseId);
extern void TriggerMetadataSync(Oid databaseId);
extern void InitializeMaintenanceDaemon(void);
extern void InitializeMaintenanceDaemonBackend(void);
extern bool LockCitusExtension(void);

extern void CitusMaintenanceDaemonMain(Datum main_arg);


@@ -50,11 +50,14 @@ extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int sha
extern void CreateTableMetadataOnWorkers(Oid relationId);
extern void MarkNodeHasMetadata(const char *nodeName, int32 nodePort, bool hasMetadata);
extern void MarkNodeMetadataSynced(const char *nodeName, int32 nodePort, bool synced);
-extern MetadataSyncResult SyncMetadataToNodes(void);
extern BackgroundWorkerHandle * SpawnSyncMetadataToNodes(Oid database, Oid owner);
extern bool SendOptionalCommandListToWorkerInTransaction(const char *nodeName, int32 nodePort,
														 const char *nodeUser,
														 List *commandList);
extern void SyncMetadataToNodesMain(Datum main_arg);
extern void SignalMetadataSyncDaemon(Oid database, int sig);
extern bool ShouldInitiateMetadataSync(bool *lockFailure);

#define DELETE_ALL_NODES "TRUNCATE pg_dist_node CASCADE"
#define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \


@@ -121,6 +121,7 @@ extern void InitializeTransactionManagement(void);
/* other functions */
extern List * ActiveSubXactContexts(void);
extern StringInfo BeginAndSetDistributedTransactionIdCommand(void);
extern void TriggerMetadataSyncOnCommit(void);


#endif   /* TRANSACTION_MANAGMENT_H */


@@ -28,11 +28,11 @@ step detector-dump-wait-edges:

waiting_transaction_numblocking_transaction_numblocking_transaction_waiting

-390            389            f
+395            394            f
transactionnumberwaitingtransactionnumbers

-389
-390            389
+394
+395            394
step s1-abort:
	ABORT;

@@ -75,14 +75,14 @@ step detector-dump-wait-edges:

waiting_transaction_numblocking_transaction_numblocking_transaction_waiting

-394            393            f
-395            393            f
-395            394            t
+399            398            f
+400            398            f
+400            399            t
transactionnumberwaitingtransactionnumbers

-393
-394            393
-395            393,394
+398
+399            398
+400            398,399
step s1-abort:
	ABORT;


@@ -0,0 +1,204 @@
Parsed test spec with 3 sessions

starting permutation: enable-deadlock-detection reload-conf s2-start-session-level-connection s1-begin s1-update-1 s2-begin-on-worker s2-update-2-on-worker s2-truncate-on-worker s3-invalidate-metadata s3-resync s3-wait s2-update-1-on-worker s1-update-2 s1-commit s2-commit-on-worker disable-deadlock-detection reload-conf s2-stop-connection
create_distributed_table


step enable-deadlock-detection:
	ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO 1.1;

step reload-conf:
	SELECT pg_reload_conf();

pg_reload_conf

t
step s2-start-session-level-connection:
	SELECT start_session_level_connection_to_node('localhost', 57638);

start_session_level_connection_to_node


step s1-begin:
	BEGIN;

step s1-update-1:
	UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;

step s2-begin-on-worker:
	SELECT run_commands_on_session_level_connection_to_node('BEGIN');

run_commands_on_session_level_connection_to_node


step s2-update-2-on-worker:
	SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2');

run_commands_on_session_level_connection_to_node


step s2-truncate-on-worker:
	SELECT run_commands_on_session_level_connection_to_node('TRUNCATE t2');

run_commands_on_session_level_connection_to_node


step s3-invalidate-metadata:
	update pg_dist_node SET metadatasynced = false;

step s3-resync:
	SELECT trigger_metadata_sync();

trigger_metadata_sync


step s3-wait:
	SELECT pg_sleep(2);

pg_sleep


step s2-update-1-on-worker:
	SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 1');
 <waiting ...>
step s1-update-2:
	UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2;
 <waiting ...>
step s1-update-2: <... completed>
step s2-update-1-on-worker: <... completed>
run_commands_on_session_level_connection_to_node


error in steps s1-update-2 s2-update-1-on-worker: ERROR:  canceling the transaction since it was involved in a distributed deadlock
step s1-commit:
	COMMIT;

step s2-commit-on-worker:
	SELECT run_commands_on_session_level_connection_to_node('COMMIT');

run_commands_on_session_level_connection_to_node


step disable-deadlock-detection:
	ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;

step reload-conf:
	SELECT pg_reload_conf();

pg_reload_conf

t
step s2-stop-connection:
	SELECT stop_session_level_connection_to_node();

stop_session_level_connection_to_node


restore_isolation_tester_func


starting permutation: increase-retry-interval reload-conf s2-start-session-level-connection s2-begin-on-worker s2-truncate-on-worker s3-invalidate-metadata s3-resync s3-wait s1-count-daemons s1-cancel-metadata-sync s1-count-daemons reset-retry-interval reload-conf s2-commit-on-worker s2-stop-connection s3-resync s3-wait
create_distributed_table


step increase-retry-interval:
	ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 20000;

step reload-conf:
	SELECT pg_reload_conf();

pg_reload_conf

t
step s2-start-session-level-connection:
	SELECT start_session_level_connection_to_node('localhost', 57638);

start_session_level_connection_to_node


step s2-begin-on-worker:
	SELECT run_commands_on_session_level_connection_to_node('BEGIN');

run_commands_on_session_level_connection_to_node


step s2-truncate-on-worker:
	SELECT run_commands_on_session_level_connection_to_node('TRUNCATE t2');

run_commands_on_session_level_connection_to_node


step s3-invalidate-metadata:
	update pg_dist_node SET metadatasynced = false;

step s3-resync:
	SELECT trigger_metadata_sync();

trigger_metadata_sync


step s3-wait:
	SELECT pg_sleep(2);

pg_sleep


step s1-count-daemons:
	SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';

count

1
step s1-cancel-metadata-sync:
	SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
	SELECT pg_sleep(2);

pg_cancel_backend

t
pg_sleep


step s1-count-daemons:
	SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';

count

0
step reset-retry-interval:
	ALTER SYSTEM RESET citus.metadata_sync_retry_interval;

step reload-conf:
	SELECT pg_reload_conf();

pg_reload_conf

t
step s2-commit-on-worker:
	SELECT run_commands_on_session_level_connection_to_node('COMMIT');

run_commands_on_session_level_connection_to_node


step s2-stop-connection:
	SELECT stop_session_level_connection_to_node();

stop_session_level_connection_to_node


step s3-resync:
	SELECT trigger_metadata_sync();

trigger_metadata_sync


step s3-wait:
	SELECT pg_sleep(2);

pg_sleep


restore_isolation_tester_func
|
@ -21,6 +21,27 @@ CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLE
    master_run_on_worker(ARRAY[hostname], ARRAY[port],
                         ARRAY['SELECT pg_reload_conf()'], false);
$$;
CREATE OR REPLACE FUNCTION trigger_metadata_sync()
    RETURNS void
    LANGUAGE C STRICT
    AS 'citus';
CREATE OR REPLACE FUNCTION raise_error_in_metadata_sync()
    RETURNS void
    LANGUAGE C STRICT
    AS 'citus';
CREATE PROCEDURE wait_until_process_count(appname text, target_count int) AS $$
declare
    counter integer := -1;
begin
    while counter != target_count loop
        -- pg_stat_activity is cached at xact level and there is no easy way to clear it.
        -- Look it up in a new connection to get latest updates.
        SELECT result::int into counter FROM
            master_run_on_worker(ARRAY['localhost'], ARRAY[57636], ARRAY[
                'SELECT count(*) FROM pg_stat_activity WHERE application_name = ' || quote_literal(appname) || ';'], false);
        PERFORM pg_sleep(0.1);
    end loop;
end$$ LANGUAGE plpgsql;
-- add a node to the cluster
SELECT master_add_node('localhost', :worker_1_port) As nodeid_1 \gset
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
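The polling procedure above works around pg_stat_activity's transaction-level caching by routing every lookup through master_run_on_worker, which opens a fresh connection for each check. As a minimal usage sketch, assuming the coordinator itself is registered at port 57636 as in the regression harness:

    -- block until exactly one maintenance daemon is visible;
    -- each loop iteration re-reads pg_stat_activity over a new connection,
    -- so the count reflects the current state rather than a cached snapshot
    CALL wait_until_process_count('Citus Maintenance Daemon', 1);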
@ -152,6 +173,142 @@ SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;
 2 | t | f
(1 row)

-- verify that metadata sync daemon has started
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
 count
---------------------------------------------------------------------
     1
(1 row)

--
-- terminate maintenance daemon, and verify that we don't spawn multiple
-- metadata sync daemons
--
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
 pg_terminate_backend
---------------------------------------------------------------------
 t
(1 row)

CALL wait_until_process_count('Citus Maintenance Daemon', 1);
select trigger_metadata_sync();
 trigger_metadata_sync
---------------------------------------------------------------------

(1 row)

select wait_until_metadata_sync();
 wait_until_metadata_sync
---------------------------------------------------------------------

(1 row)

SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
 count
---------------------------------------------------------------------
     1
(1 row)

--
-- cancel metadata sync daemon, and verify that it exits and restarts.
--
select pid as pid_before_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset
select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon';
 pg_cancel_backend
---------------------------------------------------------------------
 t
(1 row)

select wait_until_metadata_sync();
 wait_until_metadata_sync
---------------------------------------------------------------------

(1 row)

select pid as pid_after_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset
select :pid_before_cancel != :pid_after_cancel AS metadata_sync_restarted;
 metadata_sync_restarted
---------------------------------------------------------------------
 t
(1 row)

--
-- cancel metadata sync daemon so it exits and restarts, but at the
-- same time tell maintenanced to trigger a new metadata sync. One
-- of these should exit to avoid multiple metadata syncs.
--
select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon';
 pg_cancel_backend
---------------------------------------------------------------------
 t
(1 row)

select trigger_metadata_sync();
 trigger_metadata_sync
---------------------------------------------------------------------

(1 row)

select wait_until_metadata_sync();
 wait_until_metadata_sync
---------------------------------------------------------------------

(1 row)

-- we assume citus.metadata_sync_retry_interval is 500ms. Change amount we sleep to ceiling + 0.2 if it changes.
select pg_sleep(1.2);
 pg_sleep
---------------------------------------------------------------------

(1 row)

SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
 count
---------------------------------------------------------------------
     1
(1 row)

--
-- error in metadata sync daemon, and verify it exits and restarts.
--
select pid as pid_before_error from pg_stat_activity where application_name like 'Citus Met%' \gset
select raise_error_in_metadata_sync();
 raise_error_in_metadata_sync
---------------------------------------------------------------------

(1 row)

select wait_until_metadata_sync(30000);
 wait_until_metadata_sync
---------------------------------------------------------------------

(1 row)

select pid as pid_after_error from pg_stat_activity where application_name like 'Citus Met%' \gset
select :pid_before_error != :pid_after_error AS metadata_sync_restarted;
 metadata_sync_restarted
---------------------------------------------------------------------
 t
(1 row)

SELECT trigger_metadata_sync();
 trigger_metadata_sync
---------------------------------------------------------------------

(1 row)

SELECT wait_until_metadata_sync(30000);
 wait_until_metadata_sync
---------------------------------------------------------------------

(1 row)

SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';
 count
---------------------------------------------------------------------
     1
(1 row)

-- update it back to :worker_1_port, now metadata should be synced
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
 ?column?
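The restart checks above lean on psql's \gset, which stores each output column of the preceding query into a like-named client-side variable. A standalone sketch of the same pattern, assuming an interactive psql session on the coordinator:

    -- capture the daemon's pid into the psql variable :daemon_pid
    select pid as daemon_pid from pg_stat_activity
    where application_name = 'Citus Metadata Sync Daemon' \gset
    -- ... force a restart, e.g. via pg_cancel_backend ...
    -- a differing pid afterwards proves the daemon was respawned
    select pid != :daemon_pid as restarted from pg_stat_activity
    where application_name = 'Citus Metadata Sync Daemon';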
@ -594,6 +751,59 @@ SELECT verify_metadata('localhost', :worker_1_port);
 t
(1 row)

-- verify that metadata sync daemon exits
call wait_until_process_count('Citus Metadata Sync Daemon', 0);
-- verify that DROP DATABASE terminates metadata sync
SELECT current_database() datname \gset
CREATE DATABASE db_to_drop;
NOTICE:  Citus partially supports CREATE DATABASE for distributed databases
SELECT run_command_on_workers('CREATE DATABASE db_to_drop');
 run_command_on_workers
---------------------------------------------------------------------
 (localhost,57637,t,"CREATE DATABASE")
 (localhost,57638,t,"CREATE DATABASE")
(2 rows)

\c db_to_drop - - :worker_1_port
CREATE EXTENSION citus;
\c db_to_drop - - :master_port
CREATE EXTENSION citus;
SELECT master_add_node('localhost', :worker_1_port);
 master_add_node
---------------------------------------------------------------------
 1
(1 row)

UPDATE pg_dist_node SET hasmetadata = true;
SELECT master_update_node(nodeid, 'localhost', 12345) FROM pg_dist_node;
 master_update_node
---------------------------------------------------------------------

(1 row)

CREATE OR REPLACE FUNCTION trigger_metadata_sync()
    RETURNS void
    LANGUAGE C STRICT
    AS 'citus';
SELECT trigger_metadata_sync();
 trigger_metadata_sync
---------------------------------------------------------------------

(1 row)

\c :datname - - :master_port
SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
 datname
---------------------------------------------------------------------
 db_to_drop
(1 row)

DROP DATABASE db_to_drop;
SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
 datname
---------------------------------------------------------------------
(0 rows)

-- cleanup
DROP TABLE ref_table;
TRUNCATE pg_dist_colocation;
@ -26,7 +26,7 @@ WITH dist_node_summary AS (
                         ARRAY[dist_node_summary.query, dist_node_summary.query],
                         false)
 ), dist_placement_summary AS (
-    SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement)' AS query
+    SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement' AS query
 ), dist_placement_check AS (
     SELECT count(distinct result) = 1 AS matches
     FROM dist_placement_summary CROSS JOIN LATERAL
@ -0,0 +1,112 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int > 12 AS server_version_above_twelve
\gset
\if :server_version_above_twelve
\else
\q
\endif
CREATE SCHEMA "statistics'TestTarget";
SET search_path TO "statistics'TestTarget";
SET citus.next_shard_id TO 980000;
SET client_min_messages TO WARNING;
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
CREATE TABLE t1 (a int, b int);
CREATE STATISTICS s1 ON a,b FROM t1;
CREATE STATISTICS s2 ON a,b FROM t1;
CREATE STATISTICS s3 ON a,b FROM t1;
CREATE STATISTICS s4 ON a,b FROM t1;
-- test altering stats target
-- test alter target before distribution
ALTER STATISTICS s1 SET STATISTICS 3;
-- since max value for target is 10000, this will automatically be lowered
ALTER STATISTICS s2 SET STATISTICS 999999;
WARNING:  lowering statistics target to 10000
SELECT create_distributed_table('t1', 'b');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

-- test alter target after distribution
ALTER STATISTICS s3 SET STATISTICS 46;
\c - - - :worker_1_port
SELECT stxstattarget, stxrelid::regclass
FROM pg_statistic_ext
WHERE stxnamespace IN (
	SELECT oid
	FROM pg_namespace
	WHERE nspname IN ('statistics''TestTarget')
)
ORDER BY stxstattarget, stxrelid::regclass ASC;
 stxstattarget | stxrelid
---------------------------------------------------------------------
    -1 | "statistics'TestTarget".t1_980000
    -1 | "statistics'TestTarget".t1_980002
    -1 | "statistics'TestTarget".t1_980004
    -1 | "statistics'TestTarget".t1_980006
    -1 | "statistics'TestTarget".t1_980008
    -1 | "statistics'TestTarget".t1_980010
    -1 | "statistics'TestTarget".t1_980012
    -1 | "statistics'TestTarget".t1_980014
    -1 | "statistics'TestTarget".t1_980016
    -1 | "statistics'TestTarget".t1_980018
    -1 | "statistics'TestTarget".t1_980020
    -1 | "statistics'TestTarget".t1_980022
    -1 | "statistics'TestTarget".t1_980024
    -1 | "statistics'TestTarget".t1_980026
    -1 | "statistics'TestTarget".t1_980028
    -1 | "statistics'TestTarget".t1_980030
     3 | "statistics'TestTarget".t1_980000
     3 | "statistics'TestTarget".t1_980002
     3 | "statistics'TestTarget".t1_980004
     3 | "statistics'TestTarget".t1_980006
     3 | "statistics'TestTarget".t1_980008
     3 | "statistics'TestTarget".t1_980010
     3 | "statistics'TestTarget".t1_980012
     3 | "statistics'TestTarget".t1_980014
     3 | "statistics'TestTarget".t1_980016
     3 | "statistics'TestTarget".t1_980018
     3 | "statistics'TestTarget".t1_980020
     3 | "statistics'TestTarget".t1_980022
     3 | "statistics'TestTarget".t1_980024
     3 | "statistics'TestTarget".t1_980026
     3 | "statistics'TestTarget".t1_980028
     3 | "statistics'TestTarget".t1_980030
    46 | "statistics'TestTarget".t1_980000
    46 | "statistics'TestTarget".t1_980002
    46 | "statistics'TestTarget".t1_980004
    46 | "statistics'TestTarget".t1_980006
    46 | "statistics'TestTarget".t1_980008
    46 | "statistics'TestTarget".t1_980010
    46 | "statistics'TestTarget".t1_980012
    46 | "statistics'TestTarget".t1_980014
    46 | "statistics'TestTarget".t1_980016
    46 | "statistics'TestTarget".t1_980018
    46 | "statistics'TestTarget".t1_980020
    46 | "statistics'TestTarget".t1_980022
    46 | "statistics'TestTarget".t1_980024
    46 | "statistics'TestTarget".t1_980026
    46 | "statistics'TestTarget".t1_980028
    46 | "statistics'TestTarget".t1_980030
 10000 | "statistics'TestTarget".t1_980000
 10000 | "statistics'TestTarget".t1_980002
 10000 | "statistics'TestTarget".t1_980004
 10000 | "statistics'TestTarget".t1_980006
 10000 | "statistics'TestTarget".t1_980008
 10000 | "statistics'TestTarget".t1_980010
 10000 | "statistics'TestTarget".t1_980012
 10000 | "statistics'TestTarget".t1_980014
 10000 | "statistics'TestTarget".t1_980016
 10000 | "statistics'TestTarget".t1_980018
 10000 | "statistics'TestTarget".t1_980020
 10000 | "statistics'TestTarget".t1_980022
 10000 | "statistics'TestTarget".t1_980024
 10000 | "statistics'TestTarget".t1_980026
 10000 | "statistics'TestTarget".t1_980028
 10000 | "statistics'TestTarget".t1_980030
(64 rows)

\c - - - :master_port
SET client_min_messages TO WARNING;
DROP SCHEMA "statistics'TestTarget" CASCADE;
@ -0,0 +1,6 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int > 12 AS server_version_above_twelve
\gset
\if :server_version_above_twelve
\else
\q
@ -70,6 +70,27 @@ DROP STATISTICS IF EXISTS s3, sc2.s4, s6;
DROP STATISTICS s5,s6;
ERROR:  statistics object "statistics'Test.s6" does not exist
DROP STATISTICS IF EXISTS s5,s5,s6,s6;
-- test renaming statistics
CREATE STATISTICS s6 ON a,b FROM test_stats4;
DROP STATISTICS s7;
ERROR:  statistics object "statistics'Test.s7" does not exist
ALTER STATISTICS s6 RENAME TO s7;
ALTER STATISTICS sc1.st1 RENAME TO st1_new;
-- test altering stats schema
CREATE SCHEMA test_alter_schema;
ALTER STATISTICS s7 SET SCHEMA test_alter_schema;
-- test alter owner
ALTER STATISTICS sc2."neW'Stat" OWNER TO pg_monitor;
-- test alter owner before distribution
CREATE TABLE ownertest(a int, b int);
CREATE STATISTICS sc1.s9 ON a,b FROM ownertest;
ALTER STATISTICS sc1.s9 OWNER TO pg_signal_backend;
SELECT create_distributed_table('ownertest','a');
 create_distributed_table
---------------------------------------------------------------------

(1 row)

\c - - - :worker_1_port
SELECT stxname
FROM pg_statistic_ext
@ -129,23 +150,39 @@ ORDER BY stxname ASC;
 s2_980058
 s2_980060
 s2_980062
-st1_980064
-st1_980066
-st1_980068
-st1_980070
-st1_980072
-st1_980074
-st1_980076
-st1_980078
-st1_980080
-st1_980082
-st1_980084
-st1_980086
-st1_980088
-st1_980090
-st1_980092
-st1_980094
-(64 rows)
+s9_980129
+s9_980131
+s9_980133
+s9_980135
+s9_980137
+s9_980139
+s9_980141
+s9_980143
+s9_980145
+s9_980147
+s9_980149
+s9_980151
+s9_980153
+s9_980155
+s9_980157
+s9_980159
+st1_new_980064
+st1_new_980066
+st1_new_980068
+st1_new_980070
+st1_new_980072
+st1_new_980074
+st1_new_980076
+st1_new_980078
+st1_new_980080
+st1_new_980082
+st1_new_980084
+st1_new_980086
+st1_new_980088
+st1_new_980090
+st1_new_980092
+st1_new_980094
+(80 rows)

 SELECT count(DISTINCT stxnamespace)
 FROM pg_statistic_ext
@ -159,8 +196,21 @@ WHERE stxnamespace IN (
 3
(1 row)

SELECT COUNT(DISTINCT stxowner)
FROM pg_statistic_ext
WHERE stxnamespace IN (
	SELECT oid
	FROM pg_namespace
	WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2')
);
 count
---------------------------------------------------------------------
     3
(1 row)

\c - - - :master_port
SET client_min_messages TO WARNING;
DROP SCHEMA "statistics'Test" CASCADE;
DROP SCHEMA test_alter_schema CASCADE;
DROP SCHEMA sc1 CASCADE;
DROP SCHEMA sc2 CASCADE;
@ -80,3 +80,4 @@ test: isolation_insert_select_vs_all_on_mx
test: isolation_ref_select_for_update_vs_all_on_mx
test: isolation_ref_update_delete_upsert_vs_all_on_mx
test: isolation_dis2ref_foreign_keys_on_mx
test: isolation_metadata_sync_deadlock
@ -94,6 +94,7 @@ test: tableam
# Tests for statistics propagation
# ----------
test: propagate_statistics
test: pg13_propagate_statistics

# ----------
# Miscellaneous tests to check our query planning behavior
@ -0,0 +1,153 @@
#include "isolation_mx_common.include.spec"

setup
{
	CREATE OR REPLACE FUNCTION trigger_metadata_sync()
	RETURNS void
	LANGUAGE C STRICT
	AS 'citus';

	CREATE OR REPLACE FUNCTION wait_until_metadata_sync(timeout INTEGER DEFAULT 15000)
	RETURNS void
	LANGUAGE C STRICT
	AS 'citus';

	CREATE TABLE deadlock_detection_test (user_id int UNIQUE, some_val int);
	INSERT INTO deadlock_detection_test SELECT i, i FROM generate_series(1,7) i;
	SELECT create_distributed_table('deadlock_detection_test', 'user_id');

	CREATE TABLE t2(a int);
	SELECT create_distributed_table('t2', 'a');
}

teardown
{
	DROP FUNCTION trigger_metadata_sync();
	DROP TABLE deadlock_detection_test;
	DROP TABLE t2;
	SET citus.shard_replication_factor = 1;
	SELECT citus_internal.restore_isolation_tester_func();
}

session "s1"

step "increase-retry-interval"
{
	ALTER SYSTEM SET citus.metadata_sync_retry_interval TO 20000;
}

step "reset-retry-interval"
{
	ALTER SYSTEM RESET citus.metadata_sync_retry_interval;
}

step "enable-deadlock-detection"
{
	ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO 1.1;
}

step "disable-deadlock-detection"
{
	ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
}

step "reload-conf"
{
	SELECT pg_reload_conf();
}

step "s1-begin"
{
	BEGIN;
}

step "s1-update-1"
{
	UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 1;
}

step "s1-update-2"
{
	UPDATE deadlock_detection_test SET some_val = 1 WHERE user_id = 2;
}

step "s1-commit"
{
	COMMIT;
}

step "s1-count-daemons"
{
	SELECT count(*) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
}

step "s1-cancel-metadata-sync"
{
	SELECT pg_cancel_backend(pid) FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
	SELECT pg_sleep(2);
}

session "s2"

step "s2-start-session-level-connection"
{
	SELECT start_session_level_connection_to_node('localhost', 57638);
}

step "s2-stop-connection"
{
	SELECT stop_session_level_connection_to_node();
}

step "s2-begin-on-worker"
{
	SELECT run_commands_on_session_level_connection_to_node('BEGIN');
}

step "s2-update-1-on-worker"
{
	SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 1');
}

step "s2-update-2-on-worker"
{
	SELECT run_commands_on_session_level_connection_to_node('UPDATE deadlock_detection_test SET some_val = 2 WHERE user_id = 2');
}

step "s2-truncate-on-worker"
{
	SELECT run_commands_on_session_level_connection_to_node('TRUNCATE t2');
}

step "s2-commit-on-worker"
{
	SELECT run_commands_on_session_level_connection_to_node('COMMIT');
}

session "s3"

step "s3-invalidate-metadata"
{
	update pg_dist_node SET metadatasynced = false;
}

step "s3-resync"
{
	SELECT trigger_metadata_sync();
}

step "s3-wait"
{
	SELECT pg_sleep(2);
}

// Backends can block metadata sync. The following test verifies that if this happens,
// we still do distributed deadlock detection. In the test below, s2-truncate-on-worker
// causes the concurrent metadata sync to be blocked, while s1 and s2 are themselves
// involved in a distributed deadlock.
// See https://github.com/citusdata/citus/issues/4393 for more details.
permutation "enable-deadlock-detection" "reload-conf" "s2-start-session-level-connection" "s1-begin" "s1-update-1" "s2-begin-on-worker" "s2-update-2-on-worker" "s2-truncate-on-worker" "s3-invalidate-metadata" "s3-resync" "s3-wait" "s2-update-1-on-worker" "s1-update-2" "s1-commit" "s2-commit-on-worker" "disable-deadlock-detection" "reload-conf" "s2-stop-connection"

// Test that when metadata sync is waiting for locks, cancelling it terminates it.
// This is important in cases where the metadata sync daemon itself is involved in a deadlock.
permutation "increase-retry-interval" "reload-conf" "s2-start-session-level-connection" "s2-begin-on-worker" "s2-truncate-on-worker" "s3-invalidate-metadata" "s3-resync" "s3-wait" "s1-count-daemons" "s1-cancel-metadata-sync" "s1-count-daemons" "reset-retry-interval" "reload-conf" "s2-commit-on-worker" "s2-stop-connection" "s3-resync" "s3-wait"
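The enable/disable steps in this spec toggle citus.distributed_deadlock_detection_factor, which (per the Citus documentation) scales PostgreSQL's deadlock_timeout to decide when distributed deadlock detection runs; -1 disables it. A sketch of the same toggle outside the isolation framework, with the timing semantics owned by the GUC rather than defined here:

    -- run distributed deadlock detection after roughly 1.1 x deadlock_timeout
    ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO 1.1;
    SELECT pg_reload_conf();
    -- ... exercise the workload under test ...
    -- -1 turns distributed deadlock detection off again
    ALTER SYSTEM SET citus.distributed_deadlock_detection_factor TO -1;
    SELECT pg_reload_conf();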
@ -27,6 +27,30 @@ CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLE
    master_run_on_worker(ARRAY[hostname], ARRAY[port],
                         ARRAY['SELECT pg_reload_conf()'], false);
$$;

CREATE OR REPLACE FUNCTION trigger_metadata_sync()
    RETURNS void
    LANGUAGE C STRICT
    AS 'citus';

CREATE OR REPLACE FUNCTION raise_error_in_metadata_sync()
    RETURNS void
    LANGUAGE C STRICT
    AS 'citus';

CREATE PROCEDURE wait_until_process_count(appname text, target_count int) AS $$
declare
    counter integer := -1;
begin
    while counter != target_count loop
        -- pg_stat_activity is cached at xact level and there is no easy way to clear it.
        -- Look it up in a new connection to get latest updates.
        SELECT result::int into counter FROM
            master_run_on_worker(ARRAY['localhost'], ARRAY[57636], ARRAY[
                'SELECT count(*) FROM pg_stat_activity WHERE application_name = ' || quote_literal(appname) || ';'], false);
        PERFORM pg_sleep(0.1);
    end loop;
end$$ LANGUAGE plpgsql;

-- add a node to the cluster
SELECT master_add_node('localhost', :worker_1_port) As nodeid_1 \gset
SELECT nodeid, nodename, nodeport, hasmetadata, metadatasynced FROM pg_dist_node;
@ -79,6 +103,54 @@ END;
SELECT wait_until_metadata_sync(30000);
SELECT nodeid, hasmetadata, metadatasynced FROM pg_dist_node;

-- verify that metadata sync daemon has started
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';

--
-- terminate maintenance daemon, and verify that we don't spawn multiple
-- metadata sync daemons
--
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE application_name = 'Citus Maintenance Daemon';
CALL wait_until_process_count('Citus Maintenance Daemon', 1);
select trigger_metadata_sync();
select wait_until_metadata_sync();
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';

--
-- cancel metadata sync daemon, and verify that it exits and restarts.
--
select pid as pid_before_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset
select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon';
select wait_until_metadata_sync();
select pid as pid_after_cancel from pg_stat_activity where application_name like 'Citus Met%' \gset
select :pid_before_cancel != :pid_after_cancel AS metadata_sync_restarted;

--
-- cancel metadata sync daemon so it exits and restarts, but at the
-- same time tell maintenanced to trigger a new metadata sync. One
-- of these should exit to avoid multiple metadata syncs.
--
select pg_cancel_backend(pid) from pg_stat_activity where application_name = 'Citus Metadata Sync Daemon';
select trigger_metadata_sync();
select wait_until_metadata_sync();
-- we assume citus.metadata_sync_retry_interval is 500ms. Change amount we sleep to ceiling + 0.2 if it changes.
select pg_sleep(1.2);
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';

--
-- error in metadata sync daemon, and verify it exits and restarts.
--
select pid as pid_before_error from pg_stat_activity where application_name like 'Citus Met%' \gset
select raise_error_in_metadata_sync();
select wait_until_metadata_sync(30000);
select pid as pid_after_error from pg_stat_activity where application_name like 'Citus Met%' \gset
select :pid_before_error != :pid_after_error AS metadata_sync_restarted;

SELECT trigger_metadata_sync();
SELECT wait_until_metadata_sync(30000);
SELECT count(*) FROM pg_stat_activity WHERE application_name = 'Citus Metadata Sync Daemon';

-- update it back to :worker_1_port, now metadata should be synced
SELECT 1 FROM master_update_node(:nodeid_1, 'localhost', :worker_1_port);
SELECT wait_until_metadata_sync(30000);
@ -249,6 +321,39 @@ SELECT 1 FROM master_activate_node('localhost', :worker_2_port);

SELECT verify_metadata('localhost', :worker_1_port);

-- verify that metadata sync daemon exits
call wait_until_process_count('Citus Metadata Sync Daemon', 0);

-- verify that DROP DATABASE terminates metadata sync
SELECT current_database() datname \gset
CREATE DATABASE db_to_drop;
SELECT run_command_on_workers('CREATE DATABASE db_to_drop');

\c db_to_drop - - :worker_1_port
CREATE EXTENSION citus;

\c db_to_drop - - :master_port
CREATE EXTENSION citus;
SELECT master_add_node('localhost', :worker_1_port);
UPDATE pg_dist_node SET hasmetadata = true;

SELECT master_update_node(nodeid, 'localhost', 12345) FROM pg_dist_node;

CREATE OR REPLACE FUNCTION trigger_metadata_sync()
    RETURNS void
    LANGUAGE C STRICT
    AS 'citus';

SELECT trigger_metadata_sync();

\c :datname - - :master_port

SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';

DROP DATABASE db_to_drop;

SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';

-- cleanup
DROP TABLE ref_table;
TRUNCATE pg_dist_colocation;
@ -23,7 +23,7 @@ WITH dist_node_summary AS (
                         ARRAY[dist_node_summary.query, dist_node_summary.query],
                         false)
 ), dist_placement_summary AS (
-    SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement)' AS query
+    SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY shardid) FROM pg_dist_placement' AS query
 ), dist_placement_check AS (
     SELECT count(distinct result) = 1 AS matches
     FROM dist_placement_summary CROSS JOIN LATERAL
@ -0,0 +1,45 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int > 12 AS server_version_above_twelve
\gset
\if :server_version_above_twelve
\else
\q
\endif

CREATE SCHEMA "statistics'TestTarget";
SET search_path TO "statistics'TestTarget";
SET citus.next_shard_id TO 980000;
SET client_min_messages TO WARNING;
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;

CREATE TABLE t1 (a int, b int);
CREATE STATISTICS s1 ON a,b FROM t1;
CREATE STATISTICS s2 ON a,b FROM t1;
CREATE STATISTICS s3 ON a,b FROM t1;
CREATE STATISTICS s4 ON a,b FROM t1;

-- test altering stats target
-- test alter target before distribution
ALTER STATISTICS s1 SET STATISTICS 3;
-- since max value for target is 10000, this will automatically be lowered
ALTER STATISTICS s2 SET STATISTICS 999999;

SELECT create_distributed_table('t1', 'b');

-- test alter target after distribution
ALTER STATISTICS s3 SET STATISTICS 46;

\c - - - :worker_1_port
SELECT stxstattarget, stxrelid::regclass
FROM pg_statistic_ext
WHERE stxnamespace IN (
	SELECT oid
	FROM pg_namespace
	WHERE nspname IN ('statistics''TestTarget')
)
ORDER BY stxstattarget, stxrelid::regclass ASC;

\c - - - :master_port
SET client_min_messages TO WARNING;
DROP SCHEMA "statistics'TestTarget" CASCADE;
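The worker-side query in this test verifies that ALTER STATISTICS ... SET STATISTICS reached every shard-level statistics object. As a hedged spot-check sketch from the coordinator, using run_command_on_workers (a helper already used elsewhere in this diff; the literal target 46 mirrors the ALTER above):

    -- count shard-level statistics objects that picked up the altered target
    SELECT run_command_on_workers(
        $cmd$ SELECT count(*) FROM pg_statistic_ext WHERE stxstattarget = 46 $cmd$);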
@ -57,6 +57,24 @@ DROP STATISTICS IF EXISTS s3, sc2.s4, s6;
DROP STATISTICS s5,s6;
DROP STATISTICS IF EXISTS s5,s5,s6,s6;

-- test renaming statistics
CREATE STATISTICS s6 ON a,b FROM test_stats4;
DROP STATISTICS s7;
ALTER STATISTICS s6 RENAME TO s7;
ALTER STATISTICS sc1.st1 RENAME TO st1_new;

-- test altering stats schema
CREATE SCHEMA test_alter_schema;
ALTER STATISTICS s7 SET SCHEMA test_alter_schema;

-- test alter owner
ALTER STATISTICS sc2."neW'Stat" OWNER TO pg_monitor;
-- test alter owner before distribution
CREATE TABLE ownertest(a int, b int);
CREATE STATISTICS sc1.s9 ON a,b FROM ownertest;
ALTER STATISTICS sc1.s9 OWNER TO pg_signal_backend;
SELECT create_distributed_table('ownertest','a');

\c - - - :worker_1_port
SELECT stxname
FROM pg_statistic_ext

@ -75,8 +93,17 @@ WHERE stxnamespace IN (
	WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2')
);

SELECT COUNT(DISTINCT stxowner)
FROM pg_statistic_ext
WHERE stxnamespace IN (
	SELECT oid
	FROM pg_namespace
	WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2')
);

\c - - - :master_port
SET client_min_messages TO WARNING;
DROP SCHEMA "statistics'Test" CASCADE;
DROP SCHEMA test_alter_schema CASCADE;
DROP SCHEMA sc1 CASCADE;
DROP SCHEMA sc2 CASCADE;