mirror of https://github.com/citusdata/citus.git
Add a utility to process each table index (#4382)
A utility function is added so that each caller can implement a handler for each index on a given table. This means that the caller doesn't need to worry about how to access each index, the only thing that it needs to do each to implement a function to which each index on the table is passed iteratively.pull/4309/head
parent
746b36103e
commit
f164575524
|
@ -32,6 +32,7 @@
|
||||||
#include "distributed/multi_executor.h"
|
#include "distributed/multi_executor.h"
|
||||||
#include "distributed/multi_physical_planner.h"
|
#include "distributed/multi_physical_planner.h"
|
||||||
#include "distributed/multi_partitioning_utils.h"
|
#include "distributed/multi_partitioning_utils.h"
|
||||||
|
#include "distributed/namespace_utils.h"
|
||||||
#include "distributed/resource_lock.h"
|
#include "distributed/resource_lock.h"
|
||||||
#include "distributed/relation_access_tracking.h"
|
#include "distributed/relation_access_tracking.h"
|
||||||
#include "distributed/relation_utils.h"
|
#include "distributed/relation_utils.h"
|
||||||
|
@ -41,6 +42,7 @@
|
||||||
#include "nodes/parsenodes.h"
|
#include "nodes/parsenodes.h"
|
||||||
#include "storage/lmgr.h"
|
#include "storage/lmgr.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
|
#include "utils/fmgroids.h"
|
||||||
#include "utils/inval.h"
|
#include "utils/inval.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
#include "utils/syscache.h"
|
#include "utils/syscache.h"
|
||||||
|
@ -248,6 +250,50 @@ CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ExecuteFunctionOnEachTableIndex executes the given pgIndexProcessor function on each
|
||||||
|
* index of the given relation.
|
||||||
|
* It returns a list that is filled by the pgIndexProcessor.
|
||||||
|
*/
|
||||||
|
List *
|
||||||
|
ExecuteFunctionOnEachTableIndex(Oid relationId, PGIndexProcessor pgIndexProcessor)
|
||||||
|
{
|
||||||
|
List *result = NIL;
|
||||||
|
ScanKeyData scanKey[1];
|
||||||
|
int scanKeyCount = 1;
|
||||||
|
|
||||||
|
PushOverrideEmptySearchPath(CurrentMemoryContext);
|
||||||
|
|
||||||
|
/* open system catalog and scan all indexes that belong to this table */
|
||||||
|
Relation pgIndex = table_open(IndexRelationId, AccessShareLock);
|
||||||
|
|
||||||
|
ScanKeyInit(&scanKey[0], Anum_pg_index_indrelid,
|
||||||
|
BTEqualStrategyNumber, F_OIDEQ, relationId);
|
||||||
|
|
||||||
|
SysScanDesc scanDescriptor = systable_beginscan(pgIndex,
|
||||||
|
IndexIndrelidIndexId, true, /* indexOK */
|
||||||
|
NULL, scanKeyCount, scanKey);
|
||||||
|
|
||||||
|
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
||||||
|
while (HeapTupleIsValid(heapTuple))
|
||||||
|
{
|
||||||
|
Form_pg_index indexForm = (Form_pg_index) GETSTRUCT(heapTuple);
|
||||||
|
pgIndexProcessor(indexForm, &result);
|
||||||
|
|
||||||
|
heapTuple = systable_getnext(scanDescriptor);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* clean up scan and close system catalog */
|
||||||
|
systable_endscan(scanDescriptor);
|
||||||
|
table_close(pgIndex, AccessShareLock);
|
||||||
|
|
||||||
|
/* revert back to original search_path */
|
||||||
|
PopOverrideSearchPath();
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* SwitchToSequentialExecutionIfIndexNameTooLong generates the longest index name
|
* SwitchToSequentialExecutionIfIndexNameTooLong generates the longest index name
|
||||||
* on the shards of the partitions, and if exceeds the limit switches to the
|
* on the shards of the partitions, and if exceeds the limit switches to the
|
||||||
|
|
|
@ -76,7 +76,8 @@ int NextPlacementId = 0;
|
||||||
|
|
||||||
static List * GetTableReplicaIdentityCommand(Oid relationId);
|
static List * GetTableReplicaIdentityCommand(Oid relationId);
|
||||||
static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor);
|
static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor);
|
||||||
|
static void GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm,
|
||||||
|
List **indexDDLEventList);
|
||||||
|
|
||||||
/* exports for SQL callable functions */
|
/* exports for SQL callable functions */
|
||||||
PG_FUNCTION_INFO_V1(master_get_table_metadata);
|
PG_FUNCTION_INFO_V1(master_get_table_metadata);
|
||||||
|
@ -684,27 +685,17 @@ GetPreLoadTableCreationCommands(Oid relationId, bool includeSequenceDefaults)
|
||||||
List *
|
List *
|
||||||
GetTableIndexAndConstraintCommands(Oid relationId)
|
GetTableIndexAndConstraintCommands(Oid relationId)
|
||||||
{
|
{
|
||||||
List *indexDDLEventList = NIL;
|
return ExecuteFunctionOnEachTableIndex(relationId,
|
||||||
ScanKeyData scanKey[1];
|
GatherIndexAndConstraintDefinitionList);
|
||||||
int scanKeyCount = 1;
|
}
|
||||||
|
|
||||||
PushOverrideEmptySearchPath(CurrentMemoryContext);
|
|
||||||
|
|
||||||
/* open system catalog and scan all indexes that belong to this table */
|
/*
|
||||||
Relation pgIndex = table_open(IndexRelationId, AccessShareLock);
|
* GatherIndexAndConstraintDefinitionList adds the DDL command for the given index.
|
||||||
|
*/
|
||||||
ScanKeyInit(&scanKey[0], Anum_pg_index_indrelid,
|
static void
|
||||||
BTEqualStrategyNumber, F_OIDEQ, relationId);
|
GatherIndexAndConstraintDefinitionList(Form_pg_index indexForm, List **indexDDLEventList)
|
||||||
|
{
|
||||||
SysScanDesc scanDescriptor = systable_beginscan(pgIndex,
|
|
||||||
IndexIndrelidIndexId,
|
|
||||||
true, /* indexOK */
|
|
||||||
NULL, scanKeyCount, scanKey);
|
|
||||||
|
|
||||||
HeapTuple heapTuple = systable_getnext(scanDescriptor);
|
|
||||||
while (HeapTupleIsValid(heapTuple))
|
|
||||||
{
|
|
||||||
Form_pg_index indexForm = (Form_pg_index) GETSTRUCT(heapTuple);
|
|
||||||
Oid indexId = indexForm->indexrelid;
|
Oid indexId = indexForm->indexrelid;
|
||||||
char *statementDef = NULL;
|
char *statementDef = NULL;
|
||||||
|
|
||||||
|
@ -724,7 +715,7 @@ GetTableIndexAndConstraintCommands(Oid relationId)
|
||||||
}
|
}
|
||||||
|
|
||||||
/* append found constraint or index definition to the list */
|
/* append found constraint or index definition to the list */
|
||||||
indexDDLEventList = lappend(indexDDLEventList, makeTableDDLCommandString(
|
*indexDDLEventList = lappend(*indexDDLEventList, makeTableDDLCommandString(
|
||||||
statementDef));
|
statementDef));
|
||||||
|
|
||||||
/* if table is clustered on this index, append definition to the list */
|
/* if table is clustered on this index, append definition to the list */
|
||||||
|
@ -733,21 +724,9 @@ GetTableIndexAndConstraintCommands(Oid relationId)
|
||||||
char *clusteredDef = pg_get_indexclusterdef_string(indexId);
|
char *clusteredDef = pg_get_indexclusterdef_string(indexId);
|
||||||
Assert(clusteredDef != NULL);
|
Assert(clusteredDef != NULL);
|
||||||
|
|
||||||
indexDDLEventList = lappend(indexDDLEventList, makeTableDDLCommandString(
|
*indexDDLEventList = lappend(*indexDDLEventList, makeTableDDLCommandString(
|
||||||
clusteredDef));
|
clusteredDef));
|
||||||
}
|
}
|
||||||
|
|
||||||
heapTuple = systable_getnext(scanDescriptor);
|
|
||||||
}
|
|
||||||
|
|
||||||
/* clean up scan and close system catalog */
|
|
||||||
systable_endscan(scanDescriptor);
|
|
||||||
table_close(pgIndex, AccessShareLock);
|
|
||||||
|
|
||||||
/* revert back to original search_path */
|
|
||||||
PopOverrideSearchPath();
|
|
||||||
|
|
||||||
return indexDDLEventList;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -67,6 +67,8 @@ typedef enum ExtractForeignKeyConstraintsMode
|
||||||
/* cluster.c - forward declarations */
|
/* cluster.c - forward declarations */
|
||||||
extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand);
|
extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand);
|
||||||
|
|
||||||
|
/* index.c */
|
||||||
|
typedef void (*PGIndexProcessor)(Form_pg_index, List **);
|
||||||
|
|
||||||
/* call.c */
|
/* call.c */
|
||||||
extern bool CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest);
|
extern bool CallDistributedProcedureRemotely(CallStmt *callStmt, DestReceiver *dest);
|
||||||
|
@ -183,6 +185,8 @@ extern List * PostprocessIndexStmt(Node *node,
|
||||||
const char *queryString);
|
const char *queryString);
|
||||||
extern void ErrorIfUnsupportedAlterIndexStmt(AlterTableStmt *alterTableStatement);
|
extern void ErrorIfUnsupportedAlterIndexStmt(AlterTableStmt *alterTableStatement);
|
||||||
extern void MarkIndexValid(IndexStmt *indexStmt);
|
extern void MarkIndexValid(IndexStmt *indexStmt);
|
||||||
|
extern List * ExecuteFunctionOnEachTableIndex(Oid relationId, PGIndexProcessor
|
||||||
|
pgIndexProcessor);
|
||||||
|
|
||||||
/* objectaddress.c - forward declarations */
|
/* objectaddress.c - forward declarations */
|
||||||
extern ObjectAddress CreateExtensionStmtObjectAddress(Node *stmt, bool missing_ok);
|
extern ObjectAddress CreateExtensionStmtObjectAddress(Node *stmt, bool missing_ok);
|
||||||
|
|
Loading…
Reference in New Issue