Merge pull request #4395 from citusdata/propagate-create-statistics

Propagate create statistics
pull/4417/head
Ahmet Gedemenli 2020-12-18 18:18:35 +03:00 committed by GitHub
commit fb06f7e57e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 1108 additions and 3 deletions

View File

@ -261,7 +261,7 @@ ReplicateAllDependenciesToNode(const char *nodeName, int nodePort)
/* /*
* Depending on changes in the environment, such as the enable_object_propagation guc * Depending on changes in the environment, such as the enable_object_propagation guc
* there might be objects in the distributed object address list that should currently * there might be objects in the distributed object address list that should currently
* not be propagated by citus as the are 'not supported'. * not be propagated by citus as they are 'not supported'.
*/ */
dependencies = FilterObjectAddressListByPredicate(dependencies, dependencies = FilterObjectAddressListByPredicate(dependencies,
&SupportedDependencyByCitus); &SupportedDependencyByCitus);

View File

@ -155,6 +155,13 @@ static DistributeObjectOps Any_CreatePolicy = {
.postprocess = NULL, .postprocess = NULL,
.address = NULL, .address = NULL,
}; };
/*
 * Operations table for CREATE STATISTICS statements. It wires up the deparse,
 * qualify, preprocess, postprocess and address callbacks for CreateStatsStmt
 * nodes.
 */
static DistributeObjectOps Any_CreateStatistics = {
.deparse = DeparseCreateStatisticsStmt,
.qualify = QualifyCreateStatisticsStmt,
.preprocess = PreprocessCreateStatisticsStmt,
.postprocess = PostprocessCreateStatisticsStmt,
.address = CreateStatisticsStmtObjectAddress,
};
static DistributeObjectOps Any_CreateTrigger = { static DistributeObjectOps Any_CreateTrigger = {
.deparse = NULL, .deparse = NULL,
.qualify = NULL, .qualify = NULL,
@ -717,6 +724,11 @@ GetDistributeObjectOps(Node *node)
return &Any_CreatePolicy; return &Any_CreatePolicy;
} }
case T_CreateStatsStmt:
{
return &Any_CreateStatistics;
}
case T_CreateTrigStmt: case T_CreateTrigStmt:
{ {
return &Any_CreateTrigger; return &Any_CreateTrigger;

View File

@ -0,0 +1,250 @@
/*-------------------------------------------------------------------------
*
* statistics.c
* Commands for STATISTICS statements.
*
* We currently support replicating statistics definitions from the
* coordinator to all the worker nodes in the form of
*
* CREATE STATISTICS ... queries.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/namespace.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_statistic_ext.h"
#include "catalog/pg_type.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/commands.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
#include "distributed/namespace_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/resource_lock.h"
#include "distributed/worker_transaction.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/ruleutils.h"
#include "utils/syscache.h"
static List * GetExplicitStatisticsIdList(Oid relationId);
/*
 * PreprocessCreateStatisticsStmt handles CREATE STATISTICS during the planning
 * phase.
 *
 * It returns NIL when the target relation is not a Citus table or when
 * propagation is disabled. Otherwise it makes sure we are on the coordinator,
 * qualifies and deparses the statement, and returns a one-element list holding
 * the DDLJob that carries the deparsed command for the relation.
 */
List *
PreprocessCreateStatisticsStmt(Node *node, const char *queryString)
{
	CreateStatsStmt *createStatsStmt = castNode(CreateStatsStmt, node);

	/* the target relation is the first entry of the statement's relation list */
	RangeVar *targetRangeVar = (RangeVar *) linitial(createStatsStmt->relations);
	bool missingOk = false;
	Oid targetRelationId = RangeVarGetRelid(targetRangeVar, ShareUpdateExclusiveLock,
											missingOk);

	bool propagate = IsCitusTable(targetRelationId) && ShouldPropagate();
	if (!propagate)
	{
		return NIL;
	}

	EnsureCoordinator();

	/* qualify the names in the statement before turning it back into text */
	QualifyTreeNode((Node *) createStatsStmt);
	char *deparsedCommand = DeparseTreeNode((Node *) createStatsStmt);

	DDLJob *job = palloc0(sizeof(DDLJob));
	job->targetRelationId = targetRelationId;
	job->commandString = deparsedCommand;
	job->taskList = DDLTaskList(targetRelationId, deparsedCommand);
	job->concurrentIndexCmd = false;
	job->startNewTransaction = false;

	return list_make1(job);
}
/*
 * PostprocessCreateStatisticsStmt runs after standard process utility has
 * executed a CREATE STATISTICS command.
 *
 * When the target relation is a Citus table and propagation is enabled, it
 * resolves the object address of the statement and passes it to
 * EnsureDependenciesExistOnAllNodes. It always returns NIL.
 */
List *
PostprocessCreateStatisticsStmt(Node *node, const char *queryString)
{
	CreateStatsStmt *createStatsStmt = castNode(CreateStatsStmt, node);
	Assert(createStatsStmt->type == T_CreateStatsStmt);

	RangeVar *targetRangeVar = (RangeVar *) linitial(createStatsStmt->relations);
	Oid targetRelationId = RangeVarGetRelid(targetRangeVar, ShareUpdateExclusiveLock,
											false);

	if (IsCitusTable(targetRelationId) && ShouldPropagate())
	{
		/* the object was just created, so a missing object is an error here */
		ObjectAddress statisticsAddress =
			GetObjectAddressFromParseTree((Node *) createStatsStmt, false);

		EnsureDependenciesExistOnAllNodes(&statisticsAddress);
	}

	return NIL;
}
/*
 * CreateStatisticsStmtObjectAddress builds the ObjectAddress of the statistics
 * object named by the given CreateStatsStmt.
 *
 * The OID is looked up with get_statistics_object_oid, which receives
 * missingOk as is. The address is returned by value with
 * StatisticExtRelationId as its class; its object id is whatever the lookup
 * produced.
 */
ObjectAddress
CreateStatisticsStmtObjectAddress(Node *node, bool missingOk)
{
	CreateStatsStmt *createStatsStmt = castNode(CreateStatsStmt, node);
	Oid statisticsOid = get_statistics_object_oid(createStatsStmt->defnames,
												  missingOk);

	ObjectAddress statisticsAddress = { 0 };
	ObjectAddressSet(statisticsAddress, StatisticExtRelationId, statisticsOid);

	return statisticsAddress;
}
/*
 * GetExplicitStatisticsCommandList returns the DDL commands that recreate the
 * statistics objects defined on the relation with relationId, one list entry
 * per statistics object found by GetExplicitStatisticsIdList.
 */
List *
GetExplicitStatisticsCommandList(Oid relationId)
{
	List *commandList = NIL;

	/* generate the commands under an empty search_path; it is restored below */
	PushOverrideEmptySearchPath(CurrentMemoryContext);

	List *statisticsOidList = GetExplicitStatisticsIdList(relationId);

	ListCell *statisticsOidCell = NULL;
	foreach(statisticsOidCell, statisticsOidList)
	{
		Oid statisticsOid = lfirst_oid(statisticsOidCell);
		bool missingOk = false;
		char *command = pg_get_statisticsobj_worker(statisticsOid, missingOk);

		commandList = lappend(commandList, makeTableDDLCommandString(command));
	}

	/* revert back to original search_path */
	PopOverrideSearchPath();

	return commandList;
}
/*
 * GetExplicitStatisticsSchemaIdList returns the distinct schema OIDs of the
 * statistics objects defined on the relation with the given relation id. Each
 * schema appears once, in the order it is first seen in the catalog scan.
 */
List *
GetExplicitStatisticsSchemaIdList(Oid relationId)
{
	List *schemaOidList = NIL;

	Relation statExtRelation = table_open(StatisticExtRelationId, AccessShareLock);

	/* restrict the scan to pg_statistic_ext rows with stxrelid = relationId */
	ScanKeyData relidKey[1];
	ScanKeyInit(&relidKey[0], Anum_pg_statistic_ext_stxrelid,
				BTEqualStrategyNumber, F_OIDEQ, relationId);

	SysScanDesc scan = systable_beginscan(statExtRelation,
										  StatisticExtRelidIndexId,
										  true, NULL, 1, relidKey);

	HeapTuple tuple = NULL;
	for (tuple = systable_getnext(scan);
		 HeapTupleIsValid(tuple);
		 tuple = systable_getnext(scan))
	{
		Form_pg_statistic_ext statForm = (Form_pg_statistic_ext) GETSTRUCT(tuple);

		/* appends only when the schema is not in the list yet */
		schemaOidList = list_append_unique_oid(schemaOidList,
											   statForm->stxnamespace);
	}

	systable_endscan(scan);
	table_close(statExtRelation, NoLock);

	return schemaOidList;
}
/*
 * GetExplicitStatisticsIdList returns the OIDs of the pg_statistic_ext entries
 * whose stxrelid equals relationId, in catalog scan order.
 */
static List *
GetExplicitStatisticsIdList(Oid relationId)
{
	List *statisticsOidList = NIL;

	Relation statExtRelation = table_open(StatisticExtRelationId, AccessShareLock);

	/* restrict the scan to pg_statistic_ext rows with stxrelid = relationId */
	ScanKeyData relidKey[1];
	ScanKeyInit(&relidKey[0], Anum_pg_statistic_ext_stxrelid,
				BTEqualStrategyNumber, F_OIDEQ, relationId);

	SysScanDesc scan = systable_beginscan(statExtRelation,
										  StatisticExtRelidIndexId,
										  true, NULL, 1, relidKey);

	HeapTuple tuple = NULL;
	for (tuple = systable_getnext(scan);
		 HeapTupleIsValid(tuple);
		 tuple = systable_getnext(scan))
	{
		Oid statisticsOid = InvalidOid;

		/* where the OID is read from depends on the PostgreSQL version */
#if PG_VERSION_NUM >= PG_VERSION_12
		FormData_pg_statistic_ext *statForm =
			(FormData_pg_statistic_ext *) GETSTRUCT(tuple);
		statisticsOid = statForm->oid;
#else
		statisticsOid = HeapTupleGetOid(tuple);
#endif

		statisticsOidList = lappend_oid(statisticsOidList, statisticsOid);
	}

	systable_endscan(scan);
	table_close(statExtRelation, NoLock);

	return statisticsOidList;
}

View File

@ -2,8 +2,9 @@
* *
* deparse_schema_stmts.c * deparse_schema_stmts.c
* All routines to deparse schema statements. * All routines to deparse schema statements.
* This file contains all entry points specific for type statement deparsing as well as * This file contains all entry points specific for schema statement deparsing
* functions that are currently only used for deparsing of the schema statements. * as well as functions that are currently only used for deparsing of the
* schema statements.
* *
* Copyright (c) Citus Data, Inc. * Copyright (c) Citus Data, Inc.
* *

View File

@ -0,0 +1,137 @@
/*-------------------------------------------------------------------------
*
* deparse_statistics_stmts.c
* All routines to deparse statistics statements.
* This file contains all entry points specific for statistics statement deparsing
* as well as functions that are currently only used for deparsing of the statistics
* statements.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "distributed/relay_utility.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
#include "utils/builtins.h"
static void AppendCreateStatisticsStmt(StringInfo buf, CreateStatsStmt *stmt);
static void AppendStatisticsName(StringInfo buf, CreateStatsStmt *stmt);
static void AppendStatTypes(StringInfo buf, CreateStatsStmt *stmt);
static void AppendColumnNames(StringInfo buf, CreateStatsStmt *stmt);
static void AppendTableName(StringInfo buf, CreateStatsStmt *stmt);
/*
 * DeparseCreateStatisticsStmt turns a CreateStatsStmt parse tree back into the
 * text of a CREATE STATISTICS command and returns that string.
 */
char *
DeparseCreateStatisticsStmt(Node *node)
{
	StringInfoData commandBuffer;
	initStringInfo(&commandBuffer);

	AppendCreateStatisticsStmt(&commandBuffer, castNode(CreateStatsStmt, node));

	return commandBuffer.data;
}
/*
 * AppendCreateStatisticsStmt writes the complete command to buf in the form
 *
 *   CREATE STATISTICS [IF NOT EXISTS ]<name>[ (<types>)] ON <columns> FROM <table>;
 */
static void
AppendCreateStatisticsStmt(StringInfo buf, CreateStatsStmt *stmt)
{
	const char *commandPrefix = stmt->if_not_exists ?
								"CREATE STATISTICS IF NOT EXISTS " :
								"CREATE STATISTICS ";
	appendStringInfoString(buf, commandPrefix);

	AppendStatisticsName(buf, stmt);

	/* emits nothing when the statement lists no statistics types */
	AppendStatTypes(buf, stmt);

	appendStringInfoString(buf, " ON ");
	AppendColumnNames(buf, stmt);

	appendStringInfoString(buf, " FROM ");
	AppendTableName(buf, stmt);

	appendStringInfoChar(buf, ';');
}
/*
 * AppendStatisticsName appends the quoted name of the statistics object to buf.
 *
 * stmt->defnames is a list of name parts whose last element is the statistics
 * name and whose second-to-last element, when present, is the schema. Indexing
 * from the end keeps the output correct for catalog-qualified names
 * (catalog.schema.name) and avoids reading past the end of the list when the
 * name has not been schema-qualified.
 */
static void
AppendStatisticsName(StringInfo buf, CreateStatsStmt *stmt)
{
	int namePartCount = list_length(stmt->defnames);

	if (namePartCount < 1)
	{
		elog(ERROR, "statistics name is missing in CREATE STATISTICS statement");
	}

	if (namePartCount > 1)
	{
		Value *schemaNameVal =
			(Value *) list_nth(stmt->defnames, namePartCount - 2);

		appendStringInfo(buf, "%s.", quote_identifier(strVal(schemaNameVal)));
	}

	Value *statNameVal = (Value *) llast(stmt->defnames);
	appendStringInfoString(buf, quote_identifier(strVal(statNameVal)));
}
/*
 * AppendStatTypes appends the parenthesized, comma separated list of
 * statistics types, preceded by a space. It appends nothing when the statement
 * does not list any types.
 */
static void
AppendStatTypes(StringInfo buf, CreateStatsStmt *stmt)
{
	if (stmt->stat_types == NIL)
	{
		return;
	}

	/* every entry except the final one is followed by a separator */
	Value *finalStatType = (Value *) llast(stmt->stat_types);

	appendStringInfoString(buf, " (");

	ListCell *statTypeCell = NULL;
	foreach(statTypeCell, stmt->stat_types)
	{
		Value *currentStatType = (Value *) lfirst(statTypeCell);

		appendStringInfoString(buf, strVal(currentStatType));

		if (currentStatType != finalStatType)
		{
			appendStringInfoString(buf, ", ");
		}
	}

	appendStringInfoChar(buf, ')');
}
/*
 * AppendColumnNames appends the comma separated, quoted column names that the
 * statistics are defined on.
 *
 * Every entry of stmt->exprs must be a ColumnRef with a single field. Anything
 * else is rejected with an error instead of an Assert: an assertion is
 * compiled out of production builds, and the cast below would then read
 * ->fields from a node of a different type. The message mirrors the one
 * PostgreSQL raises for the same input.
 */
static void
AppendColumnNames(StringInfo buf, CreateStatsStmt *stmt)
{
	bool isFirstColumn = true;
	ListCell *exprCell = NULL;

	foreach(exprCell, stmt->exprs)
	{
		Node *expr = (Node *) lfirst(exprCell);

		if (!IsA(expr, ColumnRef) ||
			list_length(((ColumnRef *) expr)->fields) != 1)
		{
			ereport(ERROR,
					(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
					 errmsg("only simple column references are allowed in "
							"CREATE STATISTICS")));
		}

		ColumnRef *column = (ColumnRef *) expr;

		if (!isFirstColumn)
		{
			appendStringInfoString(buf, ", ");
		}
		isFirstColumn = false;

		appendStringInfoString(buf, NameListToQuotedString(column->fields));
	}
}
/*
 * AppendTableName appends the quoted, qualified name of the relation the
 * statistics are created on.
 */
static void
AppendTableName(StringInfo buf, CreateStatsStmt *stmt)
{
	/* the statement is expected to carry exactly one relation */
	Assert(list_length(stmt->relations) == 1);

	RangeVar *tableRangeVar = (RangeVar *) linitial(stmt->relations);
	char *qualifiedTableName = quote_qualified_identifier(tableRangeVar->schemaname,
														  tableRangeVar->relname);

	appendStringInfoString(buf, qualifiedTableName);
}

View File

@ -0,0 +1,50 @@
/*-------------------------------------------------------------------------
*
* qualify_statistics_stmt.c
* Functions specialized in fully qualifying all statistics statements.
* These functions are dispatched from qualify.c
*
* Goal would be that the deparser functions for these statements can
* serialize the statement without any external lookups.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/namespace.h"
#include "distributed/commands.h"
#include "distributed/deparser.h"
#include "distributed/listutils.h"
#include "nodes/parsenodes.h"
#include "nodes/value.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#include "utils/relcache.h"
/*
 * QualifyCreateStatisticsStmt fills in the missing schema names of a
 * CreateStatsStmt in place.
 *
 * The target relation gets the schema of the relation its name resolves to.
 * The statistics name gets its creation namespace, and stmt->defnames is
 * rebuilt from the qualified name. Names that already carry a schema are left
 * untouched.
 */
void
QualifyCreateStatisticsStmt(Node *node)
{
	CreateStatsStmt *createStatsStmt = castNode(CreateStatsStmt, node);

	RangeVar *tableRangeVar = (RangeVar *) linitial(createStatsStmt->relations);
	if (tableRangeVar->schemaname == NULL)
	{
		Oid relationOid = RelnameGetRelid(tableRangeVar->relname);
		Oid relationSchemaOid = get_rel_namespace(relationOid);

		tableRangeVar->schemaname = get_namespace_name(relationSchemaOid);
	}

	RangeVar *statisticsRangeVar =
		makeRangeVarFromNameList(createStatsStmt->defnames);
	if (statisticsRangeVar->schemaname != NULL)
	{
		/* statistics name is already schema-qualified */
		return;
	}

	Oid creationSchemaOid = RangeVarGetCreationNamespace(statisticsRangeVar);
	statisticsRangeVar->schemaname = get_namespace_name(creationSchemaOid);
	createStatsStmt->defnames = MakeNameListFromRangeVar(statisticsRangeVar);
}

View File

@ -429,6 +429,7 @@ static void get_from_clause_coldeflist(RangeTblFunction *rtfunc,
deparse_context *context); deparse_context *context);
static void get_tablesample_def(TableSampleClause *tablesample, static void get_tablesample_def(TableSampleClause *tablesample,
deparse_context *context); deparse_context *context);
char *pg_get_statisticsobj_worker(Oid statextid, bool missing_ok);
static char *pg_get_triggerdef_worker(Oid trigid, bool pretty); static char *pg_get_triggerdef_worker(Oid trigid, bool pretty);
static void set_simple_column_names(deparse_namespace *dpns); static void set_simple_column_names(deparse_namespace *dpns);
static void get_opclass_name(Oid opclass, Oid actual_datatype, static void get_opclass_name(Oid opclass, Oid actual_datatype,
@ -7488,6 +7489,124 @@ pg_get_triggerdef_command(Oid triggerId)
return pg_get_triggerdef_worker(triggerId, prettyOutput); return pg_get_triggerdef_worker(triggerId, prettyOutput);
} }
/*
 * pg_get_statisticsobj_worker builds a CREATE STATISTICS command for the
 * statistics object with OID statextid, based on its STATEXTOID syscache entry.
 *
 * Returns the command text. When no entry is found it returns NULL if
 * missing_ok is true and raises an error otherwise.
 *
 * NOTE(review): this looks adapted from PostgreSQL's ruleutils.c; keep the
 * code in sync with upstream rather than restyling it - confirm.
 */
char *
pg_get_statisticsobj_worker(Oid statextid, bool missing_ok)
{
StringInfoData buf;
int colno;
bool isnull;
int i;
HeapTuple statexttup = SearchSysCache1(STATEXTOID, ObjectIdGetDatum(statextid));
if (!HeapTupleIsValid(statexttup))
{
if (missing_ok)
{
return NULL;
}
elog(ERROR, "cache lookup failed for statistics object %u", statextid);
}
Form_pg_statistic_ext statextrec = (Form_pg_statistic_ext) GETSTRUCT(statexttup);
initStringInfo(&buf);
/* the statistics name is always emitted together with its schema */
char *nsp = get_namespace_name(statextrec->stxnamespace);
appendStringInfo(&buf, "CREATE STATISTICS %s",
quote_qualified_identifier(nsp,
NameStr(statextrec->stxname)));
/*
* Decode the stxkind column so that we know which stats types to print.
*/
Datum datum = SysCacheGetAttr(STATEXTOID, statexttup,
Anum_pg_statistic_ext_stxkind, &isnull);
Assert(!isnull);
ArrayType *arr = DatumGetArrayTypeP(datum);
if (ARR_NDIM(arr) != 1 ||
ARR_HASNULL(arr) ||
ARR_ELEMTYPE(arr) != CHAROID)
{
elog(ERROR, "stxkind is not a 1-D char array");
}
char *enabled = (char *) ARR_DATA_PTR(arr);
bool ndistinct_enabled = false;
bool dependencies_enabled = false;
/*
* NOTE(review): mcv_enabled is never set to true in this variant, so the
* types clause below is always emitted and never contains "mcv". Presumably
* this copy targets a server version without MCV statistics - confirm.
*/
bool mcv_enabled = false;
for (i = 0; i < ARR_DIMS(arr)[0]; i++)
{
if (enabled[i] == STATS_EXT_NDISTINCT)
{
ndistinct_enabled = true;
}
if (enabled[i] == STATS_EXT_DEPENDENCIES)
{
dependencies_enabled = true;
}
}
/*
* If any option is disabled, then we'll need to append the types clause
* to show which options are enabled. We omit the types clause on purpose
* when all options are enabled, so a pg_dump/pg_restore will create all
* statistics types on a newer postgres version, if the statistics had all
* options enabled on the original version.
*/
if (!ndistinct_enabled || !dependencies_enabled || !mcv_enabled)
{
/* gotone tracks whether a ", " separator is needed before the next type */
bool gotone = false;
appendStringInfoString(&buf, " (");
if (ndistinct_enabled)
{
appendStringInfoString(&buf, "ndistinct");
gotone = true;
}
if (dependencies_enabled)
{
appendStringInfo(&buf, "%sdependencies", gotone ? ", " : "");
gotone = true;
}
if (mcv_enabled)
{
appendStringInfo(&buf, "%smcv", gotone ? ", " : "");
}
appendStringInfoChar(&buf, ')');
}
appendStringInfoString(&buf, " ON ");
/* emit the column list by resolving each stxkeys attribute number to its name */
for (colno = 0; colno < statextrec->stxkeys.dim1; colno++)
{
AttrNumber attnum = statextrec->stxkeys.values[colno];
if (colno > 0)
{
appendStringInfoString(&buf, ", ");
}
char *attname = get_attname(statextrec->stxrelid, attnum, false);
appendStringInfoString(&buf, quote_identifier(attname));
}
appendStringInfo(&buf, " FROM %s",
generate_relation_name(statextrec->stxrelid, NIL));
/* statextrec points into the cache tuple, so release only after its last use */
ReleaseSysCache(statexttup);
return buf.data;
}
static char * static char *
pg_get_triggerdef_worker(Oid trigid, bool pretty) pg_get_triggerdef_worker(Oid trigid, bool pretty)
{ {

View File

@ -429,6 +429,7 @@ static void get_from_clause_coldeflist(RangeTblFunction *rtfunc,
deparse_context *context); deparse_context *context);
static void get_tablesample_def(TableSampleClause *tablesample, static void get_tablesample_def(TableSampleClause *tablesample,
deparse_context *context); deparse_context *context);
char *pg_get_statisticsobj_worker(Oid statextid, bool missing_ok);
static char *pg_get_triggerdef_worker(Oid trigid, bool pretty); static char *pg_get_triggerdef_worker(Oid trigid, bool pretty);
static void set_simple_column_names(deparse_namespace *dpns); static void set_simple_column_names(deparse_namespace *dpns);
static void get_opclass_name(Oid opclass, Oid actual_datatype, static void get_opclass_name(Oid opclass, Oid actual_datatype,
@ -7488,6 +7489,128 @@ pg_get_triggerdef_command(Oid triggerId)
return pg_get_triggerdef_worker(triggerId, prettyOutput); return pg_get_triggerdef_worker(triggerId, prettyOutput);
} }
/*
 * pg_get_statisticsobj_worker builds a CREATE STATISTICS command for the
 * statistics object with OID statextid, based on its STATEXTOID syscache entry.
 *
 * Returns the command text. When no entry is found it returns NULL if
 * missing_ok is true and raises an error otherwise.
 *
 * NOTE(review): this looks adapted from PostgreSQL's ruleutils.c; keep the
 * code in sync with upstream rather than restyling it - confirm.
 */
char *
pg_get_statisticsobj_worker(Oid statextid, bool missing_ok)
{
StringInfoData buf;
int colno;
bool isnull;
int i;
HeapTuple statexttup = SearchSysCache1(STATEXTOID, ObjectIdGetDatum(statextid));
if (!HeapTupleIsValid(statexttup))
{
if (missing_ok)
{
return NULL;
}
elog(ERROR, "cache lookup failed for statistics object %u", statextid);
}
Form_pg_statistic_ext statextrec = (Form_pg_statistic_ext) GETSTRUCT(statexttup);
initStringInfo(&buf);
/* the statistics name is always emitted together with its schema */
char *nsp = get_namespace_name(statextrec->stxnamespace);
appendStringInfo(&buf, "CREATE STATISTICS %s",
quote_qualified_identifier(nsp,
NameStr(statextrec->stxname)));
/*
* Decode the stxkind column so that we know which stats types to print.
*/
Datum datum = SysCacheGetAttr(STATEXTOID, statexttup,
Anum_pg_statistic_ext_stxkind, &isnull);
Assert(!isnull);
ArrayType *arr = DatumGetArrayTypeP(datum);
if (ARR_NDIM(arr) != 1 ||
ARR_HASNULL(arr) ||
ARR_ELEMTYPE(arr) != CHAROID)
{
elog(ERROR, "stxkind is not a 1-D char array");
}
char *enabled = (char *) ARR_DATA_PTR(arr);
bool ndistinct_enabled = false;
bool dependencies_enabled = false;
bool mcv_enabled = false;
/* each array element is one kind code; record which kinds are present */
for (i = 0; i < ARR_DIMS(arr)[0]; i++)
{
if (enabled[i] == STATS_EXT_NDISTINCT)
{
ndistinct_enabled = true;
}
if (enabled[i] == STATS_EXT_DEPENDENCIES)
{
dependencies_enabled = true;
}
if (enabled[i] == STATS_EXT_MCV)
{
mcv_enabled = true;
}
}
/*
* If any option is disabled, then we'll need to append the types clause
* to show which options are enabled. We omit the types clause on purpose
* when all options are enabled, so a pg_dump/pg_restore will create all
* statistics types on a newer postgres version, if the statistics had all
* options enabled on the original version.
*/
if (!ndistinct_enabled || !dependencies_enabled || !mcv_enabled)
{
/* gotone tracks whether a ", " separator is needed before the next type */
bool gotone = false;
appendStringInfoString(&buf, " (");
if (ndistinct_enabled)
{
appendStringInfoString(&buf, "ndistinct");
gotone = true;
}
if (dependencies_enabled)
{
appendStringInfo(&buf, "%sdependencies", gotone ? ", " : "");
gotone = true;
}
if (mcv_enabled)
{
appendStringInfo(&buf, "%smcv", gotone ? ", " : "");
}
appendStringInfoChar(&buf, ')');
}
appendStringInfoString(&buf, " ON ");
/* emit the column list by resolving each stxkeys attribute number to its name */
for (colno = 0; colno < statextrec->stxkeys.dim1; colno++)
{
AttrNumber attnum = statextrec->stxkeys.values[colno];
if (colno > 0)
{
appendStringInfoString(&buf, ", ");
}
char *attname = get_attname(statextrec->stxrelid, attnum, false);
appendStringInfoString(&buf, quote_identifier(attname));
}
appendStringInfo(&buf, " FROM %s",
generate_relation_name(statextrec->stxrelid, NIL));
/* statextrec points into the cache tuple, so release only after its last use */
ReleaseSysCache(statexttup);
return buf.data;
}
static char * static char *
pg_get_triggerdef_worker(Oid trigid, bool pretty) pg_get_triggerdef_worker(Oid trigid, bool pretty)
{ {

View File

@ -434,6 +434,7 @@ static void get_from_clause_coldeflist(RangeTblFunction *rtfunc,
deparse_context *context); deparse_context *context);
static void get_tablesample_def(TableSampleClause *tablesample, static void get_tablesample_def(TableSampleClause *tablesample,
deparse_context *context); deparse_context *context);
char *pg_get_statisticsobj_worker(Oid statextid, bool missing_ok);
static char *pg_get_triggerdef_worker(Oid trigid, bool pretty); static char *pg_get_triggerdef_worker(Oid trigid, bool pretty);
static void set_simple_column_names(deparse_namespace *dpns); static void set_simple_column_names(deparse_namespace *dpns);
static void get_opclass_name(Oid opclass, Oid actual_datatype, static void get_opclass_name(Oid opclass, Oid actual_datatype,
@ -7546,6 +7547,128 @@ pg_get_triggerdef_command(Oid triggerId)
return pg_get_triggerdef_worker(triggerId, prettyOutput); return pg_get_triggerdef_worker(triggerId, prettyOutput);
} }
/*
 * pg_get_statisticsobj_worker builds a CREATE STATISTICS command for the
 * statistics object with OID statextid, based on its STATEXTOID syscache entry.
 *
 * Returns the command text. When no entry is found it returns NULL if
 * missing_ok is true and raises an error otherwise.
 *
 * NOTE(review): this looks adapted from PostgreSQL's ruleutils.c; keep the
 * code in sync with upstream rather than restyling it - confirm.
 */
char *
pg_get_statisticsobj_worker(Oid statextid, bool missing_ok)
{
StringInfoData buf;
int colno;
bool isnull;
int i;
HeapTuple statexttup = SearchSysCache1(STATEXTOID, ObjectIdGetDatum(statextid));
if (!HeapTupleIsValid(statexttup))
{
if (missing_ok)
{
return NULL;
}
elog(ERROR, "cache lookup failed for statistics object %u", statextid);
}
Form_pg_statistic_ext statextrec = (Form_pg_statistic_ext) GETSTRUCT(statexttup);
initStringInfo(&buf);
/* the statistics name is always emitted together with its schema */
char *nsp = get_namespace_name(statextrec->stxnamespace);
appendStringInfo(&buf, "CREATE STATISTICS %s",
quote_qualified_identifier(nsp,
NameStr(statextrec->stxname)));
/*
* Decode the stxkind column so that we know which stats types to print.
*/
Datum datum = SysCacheGetAttr(STATEXTOID, statexttup,
Anum_pg_statistic_ext_stxkind, &isnull);
Assert(!isnull);
ArrayType *arr = DatumGetArrayTypeP(datum);
if (ARR_NDIM(arr) != 1 ||
ARR_HASNULL(arr) ||
ARR_ELEMTYPE(arr) != CHAROID)
{
elog(ERROR, "stxkind is not a 1-D char array");
}
char *enabled = (char *) ARR_DATA_PTR(arr);
bool ndistinct_enabled = false;
bool dependencies_enabled = false;
bool mcv_enabled = false;
/* each array element is one kind code; record which kinds are present */
for (i = 0; i < ARR_DIMS(arr)[0]; i++)
{
if (enabled[i] == STATS_EXT_NDISTINCT)
{
ndistinct_enabled = true;
}
if (enabled[i] == STATS_EXT_DEPENDENCIES)
{
dependencies_enabled = true;
}
if (enabled[i] == STATS_EXT_MCV)
{
mcv_enabled = true;
}
}
/*
* If any option is disabled, then we'll need to append the types clause
* to show which options are enabled. We omit the types clause on purpose
* when all options are enabled, so a pg_dump/pg_restore will create all
* statistics types on a newer postgres version, if the statistics had all
* options enabled on the original version.
*/
if (!ndistinct_enabled || !dependencies_enabled || !mcv_enabled)
{
/* gotone tracks whether a ", " separator is needed before the next type */
bool gotone = false;
appendStringInfoString(&buf, " (");
if (ndistinct_enabled)
{
appendStringInfoString(&buf, "ndistinct");
gotone = true;
}
if (dependencies_enabled)
{
appendStringInfo(&buf, "%sdependencies", gotone ? ", " : "");
gotone = true;
}
if (mcv_enabled)
{
appendStringInfo(&buf, "%smcv", gotone ? ", " : "");
}
appendStringInfoChar(&buf, ')');
}
appendStringInfoString(&buf, " ON ");
/* emit the column list by resolving each stxkeys attribute number to its name */
for (colno = 0; colno < statextrec->stxkeys.dim1; colno++)
{
AttrNumber attnum = statextrec->stxkeys.values[colno];
if (colno > 0)
{
appendStringInfoString(&buf, ", ");
}
char *attname = get_attname(statextrec->stxrelid, attnum, false);
appendStringInfoString(&buf, quote_identifier(attname));
}
appendStringInfo(&buf, " FROM %s",
generate_relation_name(statextrec->stxrelid, NIL));
/* statextrec points into the cache tuple, so release only after its last use */
ReleaseSysCache(statexttup);
return buf.data;
}
static char * static char *
pg_get_triggerdef_worker(Oid trigid, bool pretty) pg_get_triggerdef_worker(Oid trigid, bool pretty)
{ {

View File

@ -21,6 +21,7 @@
#include "catalog/indexing.h" #include "catalog/indexing.h"
#include "catalog/pg_class.h" #include "catalog/pg_class.h"
#include "catalog/pg_depend.h" #include "catalog/pg_depend.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_proc_d.h" #include "catalog/pg_proc_d.h"
#include "catalog/pg_rewrite.h" #include "catalog/pg_rewrite.h"
#include "catalog/pg_rewrite_d.h" #include "catalog/pg_rewrite_d.h"
@ -119,6 +120,7 @@ typedef struct ViewDependencyNode
static List * GetRelationTriggerFunctionDepencyList(Oid relationId); static List * GetRelationTriggerFunctionDepencyList(Oid relationId);
static List * GetRelationStatsSchemaDependencyList(Oid relationId);
static DependencyDefinition * CreateObjectAddressDependencyDef(Oid classId, Oid objectId); static DependencyDefinition * CreateObjectAddressDependencyDef(Oid classId, Oid objectId);
static ObjectAddress DependencyDefinitionObjectAddress(DependencyDefinition *definition); static ObjectAddress DependencyDefinitionObjectAddress(DependencyDefinition *definition);
@ -926,6 +928,17 @@ ExpandCitusSupportedTypes(ObjectAddressCollector *collector, ObjectAddress targe
List *triggerFunctionDepencyList = List *triggerFunctionDepencyList =
GetRelationTriggerFunctionDepencyList(relationId); GetRelationTriggerFunctionDepencyList(relationId);
result = list_concat(result, triggerFunctionDepencyList); result = list_concat(result, triggerFunctionDepencyList);
/*
* Statistics depend both on their relation and on the schema they belong
* to. pg_depend records the dependency from statistics to their schema,
* but not one from the relation to the schemas of its statistics. Hence
* we directly expand the dependencies of the relation with the schemas
* of its statistics.
*/
List *statisticsSchemaDependencyList =
GetRelationStatsSchemaDependencyList(relationId);
result = list_concat(result, statisticsSchemaDependencyList);
} }
default: default:
@ -938,6 +951,28 @@ ExpandCitusSupportedTypes(ObjectAddressCollector *collector, ObjectAddress targe
} }
/*
 * GetRelationStatsSchemaDependencyList returns a list of DependencyDefinition
 * objects, one for each schema returned by GetExplicitStatisticsSchemaIdList
 * for the relation with relationId.
 */
static List *
GetRelationStatsSchemaDependencyList(Oid relationId)
{
	List *schemaDependencyList = NIL;
	List *statisticsSchemaOidList = GetExplicitStatisticsSchemaIdList(relationId);

	ListCell *schemaOidCell = NULL;
	foreach(schemaOidCell, statisticsSchemaOidList)
	{
		Oid schemaOid = lfirst_oid(schemaOidCell);

		schemaDependencyList =
			lappend(schemaDependencyList,
					CreateObjectAddressDependencyDef(NamespaceRelationId, schemaOid));
	}

	return schemaDependencyList;
}
/* /*
* GetRelationTriggerFunctionDepencyList returns a list of DependencyDefinition * GetRelationTriggerFunctionDepencyList returns a list of DependencyDefinition
* objects for the functions that triggers of the relation with relationId depends. * objects for the functions that triggers of the relation with relationId depends.

View File

@ -571,6 +571,9 @@ GetPostLoadTableCreationCommands(Oid relationId)
List *triggerCommands = GetExplicitTriggerCommandList(relationId); List *triggerCommands = GetExplicitTriggerCommandList(relationId);
tableDDLEventList = list_concat(tableDDLEventList, triggerCommands); tableDDLEventList = list_concat(tableDDLEventList, triggerCommands);
List *statisticsCommands = GetExplicitStatisticsCommandList(relationId);
tableDDLEventList = list_concat(tableDDLEventList, statisticsCommands);
return tableDDLEventList; return tableDDLEventList;
} }

View File

@ -522,6 +522,27 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
break; break;
} }
case T_CreateStatsStmt:
{
CreateStatsStmt *createStatsStmt = (CreateStatsStmt *) parseTree;
/* because CREATE STATISTICS statements can only have one relation */
RangeVar *relation = linitial(createStatsStmt->relations);
char **relationName = &(relation->relname);
char **objectSchemaName = &(relation->schemaname);
SetSchemaNameIfNotExist(objectSchemaName, schemaName);
AppendShardIdToName(relationName, shardId);
RangeVar *stat = makeRangeVarFromNameList(createStatsStmt->defnames);
AppendShardIdToName(&stat->relname, shardId);
createStatsStmt->defnames = MakeNameListFromRangeVar(stat);
break;
}
case T_TruncateStmt: case T_TruncateStmt:
{ {
/* /*

View File

@ -48,6 +48,7 @@ char * pg_get_rule_expr(Node *expression);
extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid, extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid,
StringInfo buffer); StringInfo buffer);
extern char * pg_get_triggerdef_command(Oid triggerId); extern char * pg_get_triggerdef_command(Oid triggerId);
extern char * pg_get_statisticsobj_worker(Oid statextid, bool missing_ok);
extern char * generate_relation_name(Oid relid, List *namespaces); extern char * generate_relation_name(Oid relid, List *namespaces);
extern char * generate_qualified_relation_name(Oid relid); extern char * generate_qualified_relation_name(Oid relid);
extern char * generate_operator_name(Oid operid, Oid arg1, Oid arg2); extern char * generate_operator_name(Oid operid, Oid arg1, Oid arg2);

View File

@ -239,6 +239,12 @@ extern ObjectAddress AlterSchemaRenameStmtObjectAddress(Node *node, bool missing
extern void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt); extern void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt);
extern void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt); extern void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt);
/* statistics.c - forward declarations */
extern List * PreprocessCreateStatisticsStmt(Node *node, const char *queryString);
extern List * PostprocessCreateStatisticsStmt(Node *node, const char *queryString);
extern ObjectAddress CreateStatisticsStmtObjectAddress(Node *node, bool missingOk);
extern List * GetExplicitStatisticsCommandList(Oid relationId);
extern List * GetExplicitStatisticsSchemaIdList(Oid relationId);
/* subscription.c - forward declarations */ /* subscription.c - forward declarations */
extern Node * ProcessCreateSubscriptionStmt(CreateSubscriptionStmt *createSubStmt); extern Node * ProcessCreateSubscriptionStmt(CreateSubscriptionStmt *createSubStmt);

View File

@ -55,6 +55,11 @@ extern void QualifyAlterTableSchemaStmt(Node *stmt);
extern char * DeparseGrantOnSchemaStmt(Node *stmt); extern char * DeparseGrantOnSchemaStmt(Node *stmt);
extern char * DeparseAlterSchemaRenameStmt(Node *stmt); extern char * DeparseAlterSchemaRenameStmt(Node *stmt);
/* forward declarations for deparse_statistics_stmts.c */
extern char * DeparseCreateStatisticsStmt(Node *node);
extern void QualifyCreateStatisticsStmt(Node *node);
/* forward declarations for deparse_type_stmts.c */ /* forward declarations for deparse_type_stmts.c */
extern char * DeparseCompositeTypeStmt(Node *stmt); extern char * DeparseCompositeTypeStmt(Node *stmt);
extern char * DeparseCreateEnumStmt(Node *stmt); extern char * DeparseCreateEnumStmt(Node *stmt);

View File

@ -0,0 +1,147 @@
CREATE SCHEMA "statistics'Test";
SET search_path TO "statistics'Test";
SET citus.next_shard_id TO 980000;
SET client_min_messages TO WARNING;
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
-- test create statistics propagation
CREATE TABLE test_stats (
a int,
b int
);
SELECT create_distributed_table('test_stats', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE STATISTICS s1 (dependencies) ON a, b FROM test_stats;
-- test for distributing an already existing statistics
CREATE TABLE "test'stats2" (
a int,
b int
);
CREATE STATISTICS s2 (dependencies) ON a, b FROM "test'stats2";
SELECT create_distributed_table('test''stats2', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- test when stats is on a different schema
CREATE SCHEMA sc1;
CREATE TABLE tbl (a int, "B" text);
SELECT create_distributed_table ('tbl', 'a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE STATISTICS sc1.st1 ON a, "B" FROM tbl;
-- test distributing table with already created stats on a new schema
CREATE TABLE test_stats3 (
a int,
b int
);
CREATE SCHEMA sc2;
CREATE STATISTICS sc2."neW'Stat" ON a,b FROM test_stats3;
SELECT create_distributed_table ('test_stats3','a');
create_distributed_table
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
SELECT stxname
FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2')
)
ORDER BY stxname ASC;
stxname
---------------------------------------------------------------------
neW'Stat_980096
neW'Stat_980098
neW'Stat_980100
neW'Stat_980102
neW'Stat_980104
neW'Stat_980106
neW'Stat_980108
neW'Stat_980110
neW'Stat_980112
neW'Stat_980114
neW'Stat_980116
neW'Stat_980118
neW'Stat_980120
neW'Stat_980122
neW'Stat_980124
neW'Stat_980126
s1_980000
s1_980002
s1_980004
s1_980006
s1_980008
s1_980010
s1_980012
s1_980014
s1_980016
s1_980018
s1_980020
s1_980022
s1_980024
s1_980026
s1_980028
s1_980030
s2_980032
s2_980034
s2_980036
s2_980038
s2_980040
s2_980042
s2_980044
s2_980046
s2_980048
s2_980050
s2_980052
s2_980054
s2_980056
s2_980058
s2_980060
s2_980062
st1_980064
st1_980066
st1_980068
st1_980070
st1_980072
st1_980074
st1_980076
st1_980078
st1_980080
st1_980082
st1_980084
st1_980086
st1_980088
st1_980090
st1_980092
st1_980094
(64 rows)
SELECT count(DISTINCT stxnamespace)
FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2')
);
count
---------------------------------------------------------------------
3
(1 row)
\c - - - :master_port
SET client_min_messages TO WARNING;
DROP SCHEMA "statistics'Test" CASCADE;
DROP SCHEMA sc1 CASCADE;
DROP SCHEMA sc2 CASCADE;

View File

@ -91,6 +91,11 @@ test: cte_inline recursive_view_local_table
test: pg13 pg12 test: pg13 pg12
test: tableam test: tableam
# ----------
# Tests for statistics propagation
# ----------
test: propagate_statistics
# ---------- # ----------
# Miscellaneous tests to check our query planning behavior # Miscellaneous tests to check our query planning behavior
# ---------- # ----------

View File

@ -0,0 +1,67 @@
CREATE SCHEMA "statistics'Test";
SET search_path TO "statistics'Test";
SET citus.next_shard_id TO 980000;
SET client_min_messages TO WARNING;
SET citus.shard_count TO 32;
SET citus.shard_replication_factor TO 1;
-- test create statistics propagation
CREATE TABLE test_stats (
a int,
b int
);
SELECT create_distributed_table('test_stats', 'a');
CREATE STATISTICS s1 (dependencies) ON a, b FROM test_stats;
-- test for distributing an already existing statistics
CREATE TABLE "test'stats2" (
a int,
b int
);
CREATE STATISTICS s2 (dependencies) ON a, b FROM "test'stats2";
SELECT create_distributed_table('test''stats2', 'a');
-- test when stats is on a different schema
CREATE SCHEMA sc1;
CREATE TABLE tbl (a int, "B" text);
SELECT create_distributed_table ('tbl', 'a');
CREATE STATISTICS sc1.st1 ON a, "B" FROM tbl;
-- test distributing table with already created stats on a new schema
CREATE TABLE test_stats3 (
a int,
b int
);
CREATE SCHEMA sc2;
CREATE STATISTICS sc2."neW'Stat" ON a,b FROM test_stats3;
SELECT create_distributed_table ('test_stats3','a');
\c - - - :worker_1_port
SELECT stxname
FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2')
)
ORDER BY stxname ASC;
SELECT count(DISTINCT stxnamespace)
FROM pg_statistic_ext
WHERE stxnamespace IN (
SELECT oid
FROM pg_namespace
WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2')
);
\c - - - :master_port
SET client_min_messages TO WARNING;
DROP SCHEMA "statistics'Test" CASCADE;
DROP SCHEMA sc1 CASCADE;
DROP SCHEMA sc2 CASCADE;