Description: Refactor code that handles DDL commands from one file into a module

The file handling the utility functions (DDL) for citus organically grew over time and became unreasonably large. This refactor takes that file and refactored the functionality into separate files per command. Initially modeled after the directory and file layout that can be found in postgres.

Although the size of the change is quite big there are barely any code changes. Only one two functions have been added for readability purposes:

- PostProcessIndexStmt which is extracted from PostProcessUtility
- PostProcessAlterTableStmt which is extracted from multi_ProcessUtility

A README.md has been added to `src/backend/distributed/commands` describing the contents of the module and every file in the module.
We need more documentation around the overloading of the COPY command, for now the boilerplate has been added for people with better knowledge to fill out.
pull/2469/head
Marco Slot 2018-10-11 00:52:19 +02:00 committed by Nils Dijk
parent ce463e9812
commit f383e4f307
No known key found for this signature in database
GPG Key ID: CA1177EF9434F241
47 changed files with 4945 additions and 4779 deletions

View File

@ -5,16 +5,28 @@ EXTENSION = citus
MODULE_big = citus
OBJS = src/backend/distributed/shared_library_init.o \
src/backend/distributed/commands/cluster.o \
src/backend/distributed/commands/create_distributed_table.o \
src/backend/distributed/commands/drop_distributed_table.o \
src/backend/distributed/commands/extension.o \
src/backend/distributed/commands/foreign_constraint.o \
src/backend/distributed/commands/grant.o \
src/backend/distributed/commands/index.o \
src/backend/distributed/commands/multi_copy.o \
src/backend/distributed/commands/policy.o \
src/backend/distributed/commands/rename.o \
src/backend/distributed/commands/schema.o \
src/backend/distributed/commands/sequence.o \
src/backend/distributed/commands/subscription.o \
src/backend/distributed/commands/table.o \
src/backend/distributed/commands/transmit.o \
src/backend/distributed/commands/truncate.o \
src/backend/distributed/commands/utility_hook.o \
src/backend/distributed/commands/vacuum.o \
src/backend/distributed/connection/connection_configuration.o \
src/backend/distributed/connection/connection_management.o \
src/backend/distributed/connection/placement_connection.o \
src/backend/distributed/connection/remote_commands.o \
src/backend/distributed/ddl/foreign_constraint.o \
src/backend/distributed/ddl/policy.o \
src/backend/distributed/executor/citus_custom_scan.o \
src/backend/distributed/executor/insert_select_executor.o \
src/backend/distributed/executor/intermediate_results.o \
@ -24,7 +36,6 @@ OBJS = src/backend/distributed/shared_library_init.o \
src/backend/distributed/executor/multi_router_executor.o \
src/backend/distributed/executor/multi_server_executor.o \
src/backend/distributed/executor/multi_task_tracker_executor.o \
src/backend/distributed/executor/multi_utility.o \
src/backend/distributed/executor/query_stats.o \
src/backend/distributed/executor/subplan_execution.o \
src/backend/distributed/master/citus_create_restore_point.o \

View File

@ -0,0 +1,54 @@
# Commands
The commands module is modeled after `backend/commands` from the postgres repository and
contains the logic for Citus on how to run these commands on distributed objects. Even
though the structure of the directory has some resemblence to its postgres relative files
here are somewhat more finegrained. This is due to the nature of citus commands that are
heavily focused on distributed tables. Instead of having all commands in `tablecmds.c`
they are often moved to files that are named after the command.
| File | Description |
|------------------------------|-------------|
| `create_distributed_table.c` | Implementation of UDF's for creating distributed tables |
| `drop_distributed_table.c` | Implementation for dropping metadata for partitions of distributed tables |
| `extension.c` | Implementation of `CREATE EXTENSION` commands for citus specific checks |
| `foreign_constraint.c` | Implementation of and helper functions for foreign key constraints |
| `grant.c` | Placeholder for code granting users access to relations, implemented as enterprise feature |
| `index.c` | Implementation of commands specific to indices on distributed tables |
| `multi_copy.c` | Implementation of `COPY` command. There are multiple different copy modes which are described in detail below |
| `policy.c` | Implementation of `CREATE\ALTER POLICY` commands. |
| `rename.c` | Implementation of `ALTER ... RENAME ...` commands. It implements the renaming of applicable objects, otherwise provides the user with a warning |
| `schema.c` | |
| `sequence.c` | Implementation of `CREATE/ALTER SEQUENCE` commands. Primarily checks correctness of sequence statements as they are not propagated to the worker nodes |
| `table.c` | |
| `transmit.c` | Implementation of `COPY` commands with `format transmit` set in the options. This format is used to transfer files from one node to another node |
| `truncate.c` | Implementation of `TRUNCATE` commands on distributed tables |
| `utility_hook.c` | This is the entry point from postgres into the commands module of citus. It contains the implementation that gets registered in postgres' `ProcessUtility_hook` callback to extends the functionality of the original ProcessUtility. This code is used to route the incomming commands to their respective implementation in Citus |
| `vacuum.c` | Implementation of `VACUUM` commands on distributed tables |
# COPY
The copy command is overloaded for a couple of use-cases specific to citus. The syntax of
the command stays the same, however the implementation might slightly differ from the
stock implementation. The overloading is mostly done via extra options that Citus uses to
indicate how to operate the copy. The options used are described below.
## FORMAT transmit
Implemented in `transmit.c`
TODO: to be written by someone with enough knowledge to write how, when and why it is used.
## FORMAT result
Implemented in `multi_copy.c`
TODO: to be written by someone with enough knowledge to write how, when and why it is used.
## MASTER_HOST host
Implemented in `multi_copy.c`
Triggered by the `MASTER_HOST` option being set on the copy command. Also accepts `MASTER_PORT`
TODO: to be written by someone with enough knowledge to write how, when and why it is used.

View File

@ -0,0 +1,49 @@
/*-------------------------------------------------------------------------
*
* cluster.c
* Commands for CLUSTER statement
*
* Copyright (c) 2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/namespace.h"
#include "distributed/commands.h"
#include "distributed/metadata_cache.h"
/* placeholder for PlanClusterStmt */
List *
PlanClusterStmt(ClusterStmt *clusterStmt, const char *clusterCommand)
{
bool showPropagationWarning = false;
/* CLUSTER all */
if (clusterStmt->relation == NULL)
{
showPropagationWarning = true;
}
else
{
Oid relationId = InvalidOid;
bool missingOK = false;
relationId = RangeVarGetRelid(clusterStmt->relation, AccessShareLock,
missingOK);
if (OidIsValid(relationId))
{
showPropagationWarning = IsDistributedTable(relationId);
}
}
if (showPropagationWarning)
{
ereport(WARNING, (errmsg("not propagating CLUSTER command to worker nodes")));
}
return NIL;
}

View File

@ -31,22 +31,20 @@
#include "commands/defrem.h"
#include "commands/extension.h"
#include "commands/trigger.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/distribution_column.h"
#include "distributed/foreign_constraint.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_utility.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/policy.h"
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"

View File

@ -11,10 +11,10 @@
#include "postgres.h"
#include "miscadmin.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_utility.h"
#include "distributed/worker_transaction.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
@ -112,7 +112,7 @@ master_remove_distributed_table_metadata_from_workers(PG_FUNCTION_ARGS)
/*
* MasterRemoveDistributedTableMetadataFromWorkers drops the table and removes
* all the metadata beloning the distributed table in the worker nodes
* all the metadata belonging the distributed table in the worker nodes
* with metadata. The function doesn't drop the tables that are
* the shards on the workers.
*

View File

@ -0,0 +1,117 @@
/*-------------------------------------------------------------------------
*
* extension.c
* Commands for creating and altering extensions.
*
* Copyright (c) 2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "citus_version.h"
#include "distributed/commands.h"
#include "distributed/metadata_cache.h"
#include "nodes/parsenodes.h"
/* Local functions forward declarations for helper functions */
static char * ExtractNewExtensionVersion(Node *parsetree);
/*
* IsCitusExtensionStmt returns whether a given utility is a CREATE or ALTER
* EXTENSION statement which references the citus extension. This function
* returns false for all other inputs.
*/
bool
IsCitusExtensionStmt(Node *parsetree)
{
char *extensionName = "";
if (IsA(parsetree, CreateExtensionStmt))
{
extensionName = ((CreateExtensionStmt *) parsetree)->extname;
}
else if (IsA(parsetree, AlterExtensionStmt))
{
extensionName = ((AlterExtensionStmt *) parsetree)->extname;
}
return (strcmp(extensionName, "citus") == 0);
}
/*
* ErrorIfUnstableCreateOrAlterExtensionStmt compares CITUS_EXTENSIONVERSION
* and version given CREATE/ALTER EXTENSION statement will create/update to. If
* they are not same in major or minor version numbers, this function errors
* out. It ignores the schema version.
*/
void
ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree)
{
char *newExtensionVersion = ExtractNewExtensionVersion(parsetree);
if (newExtensionVersion != NULL)
{
/* explicit version provided in CREATE or ALTER EXTENSION UPDATE; verify */
if (!MajorVersionsCompatible(newExtensionVersion, CITUS_EXTENSIONVERSION))
{
ereport(ERROR, (errmsg("specified version incompatible with loaded "
"Citus library"),
errdetail("Loaded library requires %s, but %s was specified.",
CITUS_MAJORVERSION, newExtensionVersion),
errhint("If a newer library is present, restart the database "
"and try the command again.")));
}
}
else
{
/*
* No version was specified, so PostgreSQL will use the default_version
* from the citus.control file.
*/
CheckAvailableVersion(ERROR);
}
}
/*
* ExtractNewExtensionVersion returns the new extension version specified by
* a CREATE or ALTER EXTENSION statement. Other inputs are not permitted. This
* function returns NULL for statements with no explicit version specified.
*/
static char *
ExtractNewExtensionVersion(Node *parsetree)
{
char *newVersion = NULL;
List *optionsList = NIL;
ListCell *optionsCell = NULL;
if (IsA(parsetree, CreateExtensionStmt))
{
optionsList = ((CreateExtensionStmt *) parsetree)->options;
}
else if (IsA(parsetree, AlterExtensionStmt))
{
optionsList = ((AlterExtensionStmt *) parsetree)->options;
}
else
{
/* input must be one of the two above types */
Assert(false);
}
foreach(optionsCell, optionsList)
{
DefElem *defElement = (DefElem *) lfirst(optionsCell);
if (strncmp(defElement->defname, "new_version", NAMEDATALEN) == 0)
{
newVersion = strVal(defElement->arg);
break;
}
}
return newVersion;
}

View File

@ -20,7 +20,7 @@
#endif
#include "catalog/pg_type.h"
#include "distributed/colocation_utils.h"
#include "distributed/foreign_constraint.h"
#include "distributed/commands.h"
#include "distributed/master_protocol.h"
#include "distributed/multi_join_order.h"
#include "distributed/version_compat.h"

View File

@ -0,0 +1,19 @@
/*-------------------------------------------------------------------------
*
* grant.c
* Commands for granting access to distributed tables.
*
* Copyright (c) 2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "distributed/commands.h"
/* placeholder for PlanGrantStmt */
List *
PlanGrantStmt(GrantStmt *grantStmt)
{
return NIL;
}

View File

@ -0,0 +1,678 @@
/*-------------------------------------------------------------------------
*
* index.c
* Commands for creating and altering indices on distributed tables.
*
* Copyright (c) 2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/catalog.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "catalog/pg_class.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/distributed_planner.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/resource_lock.h"
#include "distributed/version_compat.h"
#include "lib/stringinfo.h"
#include "miscadmin.h"
#include "nodes/parsenodes.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
/* Local functions forward declarations for helper functions */
static List * CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt);
static void RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid,
void *arg);
static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement);
static void ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement);
static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt);
/*
* This struct defines the state for the callback for drop statements.
* It is copied as it is from commands/tablecmds.c in Postgres source.
*/
struct DropRelationCallbackState
{
char relkind;
Oid heapOid;
bool concurrent;
};
/*
* IsIndexRenameStmt returns whether the passed-in RenameStmt is the following
* form:
*
* - ALTER INDEX RENAME
*/
bool
IsIndexRenameStmt(RenameStmt *renameStmt)
{
bool isIndexRenameStmt = false;
if (renameStmt->renameType == OBJECT_INDEX)
{
isIndexRenameStmt = true;
}
return isIndexRenameStmt;
}
/*
* PlanIndexStmt determines whether a given CREATE INDEX statement involves
* a distributed table. If so (and if the statement does not use unsupported
* options), it modifies the input statement to ensure proper execution against
* the master node table and creates a DDLJob to encapsulate information needed
* during the worker node portion of DDL execution before returning that DDLJob
* in a List. If no distributed table is involved, this function returns NIL.
*/
List *
PlanIndexStmt(IndexStmt *createIndexStatement, const char *createIndexCommand)
{
List *ddlJobs = NIL;
/*
* We first check whether a distributed relation is affected. For that, we need to
* open the relation. To prevent race conditions with later lookups, lock the table,
* and modify the rangevar to include the schema.
*/
if (createIndexStatement->relation != NULL)
{
Relation relation = NULL;
Oid relationId = InvalidOid;
bool isDistributedRelation = false;
char *namespaceName = NULL;
LOCKMODE lockmode = ShareLock;
MemoryContext relationContext = NULL;
/*
* We don't support concurrently creating indexes for distributed
* tables, but till this point, we don't know if it is a regular or a
* distributed table.
*/
if (createIndexStatement->concurrent)
{
lockmode = ShareUpdateExclusiveLock;
}
/*
* XXX: Consider using RangeVarGetRelidExtended with a permission
* checking callback. Right now we'll acquire the lock before having
* checked permissions, and will only fail when executing the actual
* index statements.
*/
relation = heap_openrv(createIndexStatement->relation, lockmode);
relationId = RelationGetRelid(relation);
isDistributedRelation = IsDistributedTable(relationId);
/*
* Before we do any further processing, fix the schema name to make sure
* that a (distributed) table with the same name does not appear on the
* search path in front of the current schema. We do this even if the
* table is not distributed, since a distributed table may appear on the
* search path by the time postgres starts processing this statement.
*/
namespaceName = get_namespace_name(RelationGetNamespace(relation));
/* ensure we copy string into proper context */
relationContext = GetMemoryChunkContext(createIndexStatement->relation);
namespaceName = MemoryContextStrdup(relationContext, namespaceName);
createIndexStatement->relation->schemaname = namespaceName;
heap_close(relation, NoLock);
if (isDistributedRelation)
{
Oid namespaceId = InvalidOid;
Oid indexRelationId = InvalidOid;
char *indexName = createIndexStatement->idxname;
ErrorIfUnsupportedIndexStmt(createIndexStatement);
namespaceId = get_namespace_oid(namespaceName, false);
indexRelationId = get_relname_relid(indexName, namespaceId);
/* if index does not exist, send the command to workers */
if (!OidIsValid(indexRelationId))
{
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId;
ddlJob->concurrentIndexCmd = createIndexStatement->concurrent;
ddlJob->commandString = createIndexCommand;
ddlJob->taskList = CreateIndexTaskList(relationId, createIndexStatement);
ddlJobs = list_make1(ddlJob);
}
}
}
return ddlJobs;
}
/*
* PlanDropIndexStmt determines whether a given DROP INDEX statement involves
* a distributed table. If so (and if the statement does not use unsupported
* options), it modifies the input statement to ensure proper execution against
* the master node table and creates a DDLJob to encapsulate information needed
* during the worker node portion of DDL execution before returning that DDLJob
* in a List. If no distributed table is involved, this function returns NIL.
*/
List *
PlanDropIndexStmt(DropStmt *dropIndexStatement, const char *dropIndexCommand)
{
List *ddlJobs = NIL;
ListCell *dropObjectCell = NULL;
Oid distributedIndexId = InvalidOid;
Oid distributedRelationId = InvalidOid;
Assert(dropIndexStatement->removeType == OBJECT_INDEX);
/* check if any of the indexes being dropped belong to a distributed table */
foreach(dropObjectCell, dropIndexStatement->objects)
{
Oid indexId = InvalidOid;
Oid relationId = InvalidOid;
bool isDistributedRelation = false;
struct DropRelationCallbackState state;
uint32 rvrFlags = RVR_MISSING_OK;
LOCKMODE lockmode = AccessExclusiveLock;
List *objectNameList = (List *) lfirst(dropObjectCell);
RangeVar *rangeVar = makeRangeVarFromNameList(objectNameList);
/*
* We don't support concurrently dropping indexes for distributed
* tables, but till this point, we don't know if it is a regular or a
* distributed table.
*/
if (dropIndexStatement->concurrent)
{
lockmode = ShareUpdateExclusiveLock;
}
/*
* The next few statements are based on RemoveRelations() in
* commands/tablecmds.c in Postgres source.
*/
AcceptInvalidationMessages();
state.relkind = RELKIND_INDEX;
state.heapOid = InvalidOid;
state.concurrent = dropIndexStatement->concurrent;
indexId = RangeVarGetRelidInternal(rangeVar, lockmode, rvrFlags,
RangeVarCallbackForDropIndex,
(void *) &state);
/*
* If the index does not exist, we don't do anything here, and allow
* postgres to throw appropriate error or notice message later.
*/
if (!OidIsValid(indexId))
{
continue;
}
relationId = IndexGetRelation(indexId, false);
isDistributedRelation = IsDistributedTable(relationId);
if (isDistributedRelation)
{
distributedIndexId = indexId;
distributedRelationId = relationId;
break;
}
}
if (OidIsValid(distributedIndexId))
{
DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ErrorIfUnsupportedDropIndexStmt(dropIndexStatement);
ddlJob->targetRelationId = distributedRelationId;
ddlJob->concurrentIndexCmd = dropIndexStatement->concurrent;
ddlJob->commandString = dropIndexCommand;
ddlJob->taskList = DropIndexTaskList(distributedRelationId, distributedIndexId,
dropIndexStatement);
ddlJobs = list_make1(ddlJob);
}
return ddlJobs;
}
/*
* PostProcessIndexStmt marks new indexes invalid if they were created using the
* CONCURRENTLY flag. This (non-transactional) change provides the fallback
* state if an error is raised, otherwise a sub-sequent change to valid will be
* committed.
*/
void
PostProcessIndexStmt(IndexStmt *indexStmt)
{
Relation relation = NULL;
Oid indexRelationId = InvalidOid;
Relation indexRelation = NULL;
Relation pg_index = NULL;
HeapTuple indexTuple = NULL;
Form_pg_index indexForm = NULL;
/* we are only processing CONCURRENT index statements */
if (!indexStmt->concurrent)
{
return;
}
/* this logic only applies to the coordinator */
if (!IsCoordinator())
{
return;
}
/* commit the current transaction and start anew */
CommitTransactionCommand();
StartTransactionCommand();
/* get the affected relation and index */
relation = heap_openrv(indexStmt->relation, ShareUpdateExclusiveLock);
indexRelationId = get_relname_relid(indexStmt->idxname,
RelationGetNamespace(relation));
indexRelation = index_open(indexRelationId, RowExclusiveLock);
/* close relations but retain locks */
heap_close(relation, NoLock);
index_close(indexRelation, NoLock);
/* mark index as invalid, in-place (cannot be rolled back) */
index_set_state_flags(indexRelationId, INDEX_DROP_CLEAR_VALID);
/* re-open a transaction command from here on out */
CommitTransactionCommand();
StartTransactionCommand();
/* now, update index's validity in a way that can roll back */
pg_index = heap_open(IndexRelationId, RowExclusiveLock);
indexTuple = SearchSysCacheCopy1(INDEXRELID, ObjectIdGetDatum(indexRelationId));
Assert(HeapTupleIsValid(indexTuple)); /* better be present, we have lock! */
/* mark as valid, save, and update pg_index indexes */
indexForm = (Form_pg_index) GETSTRUCT(indexTuple);
indexForm->indisvalid = true;
CatalogTupleUpdate(pg_index, &indexTuple->t_self, indexTuple);
/* clean up; index now marked valid, but ROLLBACK will mark invalid */
heap_freetuple(indexTuple);
heap_close(pg_index, RowExclusiveLock);
}
/*
* ErrorIfUnsupportedAlterIndexStmt checks if the corresponding alter index
* statement is supported for distributed tables and errors out if it is not.
* Currently, only the following commands are supported.
*
* ALTER INDEX SET ()
* ALTER INDEX RESET ()
*/
void
ErrorIfUnsupportedAlterIndexStmt(AlterTableStmt *alterTableStatement)
{
List *commandList = alterTableStatement->cmds;
ListCell *commandCell = NULL;
/* error out if any of the subcommands are unsupported */
foreach(commandCell, commandList)
{
AlterTableCmd *command = (AlterTableCmd *) lfirst(commandCell);
AlterTableType alterTableType = command->subtype;
switch (alterTableType)
{
case AT_SetRelOptions: /* SET (...) */
case AT_ResetRelOptions: /* RESET (...) */
case AT_ReplaceRelOptions: /* replace entire option list */
{
/* this command is supported by Citus */
break;
}
/* unsupported create index statements */
case AT_SetTableSpace:
default:
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("alter index ... set tablespace ... "
"is currently unsupported"),
errdetail("Only RENAME TO, SET (), and RESET () "
"are supported.")));
return; /* keep compiler happy */
}
}
}
}
/*
* CreateIndexTaskList builds a list of tasks to execute a CREATE INDEX command
* against a specified distributed table.
*/
static List *
CreateIndexTaskList(Oid relationId, IndexStmt *indexStmt)
{
List *taskList = NIL;
List *shardIntervalList = LoadShardIntervalList(relationId);
ListCell *shardIntervalCell = NULL;
StringInfoData ddlString;
uint64 jobId = INVALID_JOB_ID;
int taskId = 1;
initStringInfo(&ddlString);
/* lock metadata before getting placement lists */
LockShardListMetadata(shardIntervalList, ShareLock);
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
Task *task = NULL;
deparse_shard_index_statement(indexStmt, relationId, shardId, &ddlString);
task = CitusMakeNode(Task);
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = DDL_TASK;
task->queryString = pstrdup(ddlString.data);
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependedTaskList = NULL;
task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId);
taskList = lappend(taskList, task);
resetStringInfo(&ddlString);
}
return taskList;
}
/*
* Before acquiring a table lock, check whether we have sufficient rights.
* In the case of DROP INDEX, also try to lock the table before the index.
*
* This code is heavily borrowed from RangeVarCallbackForDropRelation() in
* commands/tablecmds.c in Postgres source. We need this to ensure the right
* order of locking while dealing with DROP INDEX statements. Because we are
* exclusively using this callback for INDEX processing, the PARTITION-related
* logic from PostgreSQL's similar callback has been omitted as unneeded.
*/
static void
RangeVarCallbackForDropIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, void *arg)
{
/* *INDENT-OFF* */
HeapTuple tuple;
struct DropRelationCallbackState *state;
char relkind;
char expected_relkind;
Form_pg_class classform;
LOCKMODE heap_lockmode;
state = (struct DropRelationCallbackState *) arg;
relkind = state->relkind;
heap_lockmode = state->concurrent ?
ShareUpdateExclusiveLock : AccessExclusiveLock;
Assert(relkind == RELKIND_INDEX);
/*
* If we previously locked some other index's heap, and the name we're
* looking up no longer refers to that relation, release the now-useless
* lock.
*/
if (relOid != oldRelOid && OidIsValid(state->heapOid))
{
UnlockRelationOid(state->heapOid, heap_lockmode);
state->heapOid = InvalidOid;
}
/* Didn't find a relation, so no need for locking or permission checks. */
if (!OidIsValid(relOid))
return;
tuple = SearchSysCache1(RELOID, ObjectIdGetDatum(relOid));
if (!HeapTupleIsValid(tuple))
return; /* concurrently dropped, so nothing to do */
classform = (Form_pg_class) GETSTRUCT(tuple);
/*
* PG 11 sends relkind as partitioned index for an index
* on partitioned table. It is handled the same
* as regular index as far as we are concerned here.
*
* See tablecmds.c:RangeVarCallbackForDropRelation()
*/
expected_relkind = classform->relkind;
#if PG_VERSION_NUM >= 110000
if (expected_relkind == RELKIND_PARTITIONED_INDEX)
expected_relkind = RELKIND_INDEX;
#endif
if (expected_relkind != relkind)
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("\"%s\" is not an index", rel->relname)));
/* Allow DROP to either table owner or schema owner */
if (!pg_class_ownercheck(relOid, GetUserId()) &&
!pg_namespace_ownercheck(classform->relnamespace, GetUserId()))
{
aclcheck_error(ACLCHECK_NOT_OWNER, ACLCHECK_OBJECT_INDEX, rel->relname);
}
if (!allowSystemTableMods && IsSystemClass(relOid, classform))
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
errmsg("permission denied: \"%s\" is a system catalog",
rel->relname)));
ReleaseSysCache(tuple);
/*
* In DROP INDEX, attempt to acquire lock on the parent table before
* locking the index. index_drop() will need this anyway, and since
* regular queries lock tables before their indexes, we risk deadlock if
* we do it the other way around. No error if we don't find a pg_index
* entry, though --- the relation may have been dropped.
*/
if (relkind == RELKIND_INDEX && relOid != oldRelOid)
{
state->heapOid = IndexGetRelation(relOid, true);
if (OidIsValid(state->heapOid))
LockRelationOid(state->heapOid, heap_lockmode);
}
/* *INDENT-ON* */
}
/*
* ErrorIfUnsupportedIndexStmt checks if the corresponding index statement is
* supported for distributed tables and errors out if it is not.
*/
static void
ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement)
{
char *indexRelationName = createIndexStatement->idxname;
if (indexRelationName == NULL)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("creating index without a name on a distributed table is "
"currently unsupported")));
}
if (createIndexStatement->tableSpace != NULL)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("specifying tablespaces with CREATE INDEX statements is "
"currently unsupported")));
}
if (createIndexStatement->unique)
{
RangeVar *relation = createIndexStatement->relation;
bool missingOk = false;
/* caller uses ShareLock for non-concurrent indexes, use the same lock here */
LOCKMODE lockMode = ShareLock;
Oid relationId = RangeVarGetRelid(relation, lockMode, missingOk);
Var *partitionKey = DistPartitionKey(relationId);
char partitionMethod = PartitionMethod(relationId);
List *indexParameterList = NIL;
ListCell *indexParameterCell = NULL;
bool indexContainsPartitionColumn = false;
/*
* Reference tables do not have partition key, and unique constraints
* are allowed for them. Thus, we added a short-circuit for reference tables.
*/
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
return;
}
if (partitionMethod == DISTRIBUTE_BY_APPEND)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("creating unique indexes on append-partitioned tables "
"is currently unsupported")));
}
indexParameterList = createIndexStatement->indexParams;
foreach(indexParameterCell, indexParameterList)
{
IndexElem *indexElement = (IndexElem *) lfirst(indexParameterCell);
char *columnName = indexElement->name;
AttrNumber attributeNumber = InvalidAttrNumber;
/* column name is null for index expressions, skip it */
if (columnName == NULL)
{
continue;
}
attributeNumber = get_attnum(relationId, columnName);
if (attributeNumber == partitionKey->varattno)
{
indexContainsPartitionColumn = true;
}
}
if (!indexContainsPartitionColumn)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("creating unique indexes on non-partition "
"columns is currently unsupported")));
}
}
}
/*
* ErrorIfUnsupportedDropIndexStmt checks if the corresponding drop index statement is
* supported for distributed tables and errors out if it is not.
*/
static void
ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement)
{
Assert(dropIndexStatement->removeType == OBJECT_INDEX);
if (list_length(dropIndexStatement->objects) > 1)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot drop multiple distributed objects in a "
"single command"),
errhint("Try dropping each object in a separate DROP "
"command.")));
}
}
/*
* DropIndexTaskList builds a list of tasks to execute a DROP INDEX command
* against a specified distributed table.
*/
static List *
DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt)
{
List *taskList = NIL;
List *shardIntervalList = LoadShardIntervalList(relationId);
ListCell *shardIntervalCell = NULL;
char *indexName = get_rel_name(indexId);
Oid schemaId = get_rel_namespace(indexId);
char *schemaName = get_namespace_name(schemaId);
StringInfoData ddlString;
uint64 jobId = INVALID_JOB_ID;
int taskId = 1;
initStringInfo(&ddlString);
/* lock metadata before getting placement lists */
LockShardListMetadata(shardIntervalList, ShareLock);
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
char *shardIndexName = pstrdup(indexName);
Task *task = NULL;
AppendShardIdToName(&shardIndexName, shardId);
/* deparse shard-specific DROP INDEX command */
appendStringInfo(&ddlString, "DROP INDEX %s %s %s %s",
(dropStmt->concurrent ? "CONCURRENTLY" : ""),
(dropStmt->missing_ok ? "IF EXISTS" : ""),
quote_qualified_identifier(schemaName, shardIndexName),
(dropStmt->behavior == DROP_RESTRICT ? "RESTRICT" : "CASCADE"));
task = CitusMakeNode(Task);
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = DDL_TASK;
task->queryString = pstrdup(ddlString.data);
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependedTaskList = NULL;
task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId);
taskList = lappend(taskList, task);
resetStringInfo(&ddlString);
}
return taskList;
}

View File

@ -55,13 +55,17 @@
#include "access/htup_details.h"
#include "access/htup.h"
#include "access/sdir.h"
#include "access/sysattr.h"
#include "access/xact.h"
#include "catalog/namespace.h"
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "commands/defrem.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/intermediate_results.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_router_planner.h"
@ -73,7 +77,9 @@
#include "distributed/resource_lock.h"
#include "distributed/shard_pruning.h"
#include "distributed/version_compat.h"
#include "distributed/worker_protocol.h"
#include "executor/executor.h"
#include "foreign/foreign.h"
#include "libpq/pqformat.h"
#include "nodes/makefuncs.h"
#include "tsearch/ts_locale.h"
@ -133,6 +139,13 @@ static CopyCoercionData * ColumnCoercionPaths(TupleDesc destTupleDescriptor,
static FmgrInfo * TypeOutputFunctions(uint32 columnCount, Oid *typeIdArray,
bool binaryFormat);
static Datum CoerceColumnValue(Datum inputValue, CopyCoercionData *coercionPath);
static void CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort);
static void CheckCopyPermissions(CopyStmt *copyStatement);
static List * CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist);
static bool IsCopyResultStmt(CopyStmt *copyStatement);
static bool IsCopyFromWorker(CopyStmt *copyStatement);
static NodeAddress * MasterNodeAddress(CopyStmt *copyStatement);
static void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
/* Private functions copied and adapted from copy.c in PostgreSQL */
static void CopySendData(CopyOutState outputState, const void *databuf, int datasize);
@ -161,7 +174,7 @@ PG_FUNCTION_INFO_V1(citus_text_send_as_jsonb);
* statement to related subfunctions based on where the copy command is run
* and the partition method of the distributed table.
*/
void
static void
CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
{
bool isCopyFromWorker = false;
@ -200,7 +213,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
Oid relationId = RangeVarGetRelid(copyStatement->relation, NoLock, false);
char partitionMethod = PartitionMethod(relationId);
/* disallow modifications to a partition table which have rep. factpr > 1 */
/* disallow modifications to a partition table which have rep. factor > 1 */
EnsurePartitionTableNotReplicated(relationId);
if (partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod ==
@ -226,7 +239,7 @@ CitusCopyFrom(CopyStmt *copyStatement, char *completionTag)
/*
* IsCopyFromWorker checks if the given copy statement has the master host option.
*/
bool
static bool
IsCopyFromWorker(CopyStmt *copyStatement)
{
ListCell *optionCell = NULL;
@ -680,7 +693,7 @@ CopyToNewShards(CopyStmt *copyStatement, char *completionTag, Oid relationId)
* it. Note that if the master_port is not provided, we use 5432 as the default
* port.
*/
NodeAddress *
static NodeAddress *
MasterNodeAddress(CopyStmt *copyStatement)
{
NodeAddress *masterNodeAddress = (NodeAddress *) palloc0(sizeof(NodeAddress));
@ -2423,3 +2436,414 @@ CitusCopyDestReceiverDestroy(DestReceiver *destReceiver)
pfree(copyDest);
}
/*
* IsCopyResultStmt determines whether the given copy statement is a
* COPY "resultkey" FROM STDIN WITH (format result) statement, which is used
* to copy query results from the coordinator into workers.
*/
static bool
IsCopyResultStmt(CopyStmt *copyStatement)
{
ListCell *optionCell = NULL;
bool hasFormatReceive = false;
/* extract WITH (...) options from the COPY statement */
foreach(optionCell, copyStatement->options)
{
DefElem *defel = (DefElem *) lfirst(optionCell);
if (strncmp(defel->defname, "format", NAMEDATALEN) == 0 &&
strncmp(defGetString(defel), "result", NAMEDATALEN) == 0)
{
hasFormatReceive = true;
break;
}
}
return hasFormatReceive;
}
/*
* ProcessCopyStmt handles Citus specific concerns for COPY like supporting
* COPYing from distributed tables and preventing unsupported actions. The
* function returns a modified COPY statement to be executed, or NULL if no
* further processing is needed.
*
* commandMustRunAsOwner is an output parameter used to communicate to the caller whether
* the copy statement should be executed using elevated privileges. If
* ProcessCopyStmt that is required, a call to CheckCopyPermissions will take
* care of verifying the current user's permissions before ProcessCopyStmt
* returns.
*/
Node *
ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustRunAsOwner)
{
*commandMustRunAsOwner = false; /* make sure variable is initialized */
/*
* Handle special COPY "resultid" FROM STDIN WITH (format result) commands
* for sending intermediate results to workers.
*/
if (IsCopyResultStmt(copyStatement))
{
const char *resultId = copyStatement->relation->relname;
ReceiveQueryResultViaCopy(resultId);
return NULL;
}
/*
* We check whether a distributed relation is affected. For that, we need to open the
* relation. To prevent race conditions with later lookups, lock the table, and modify
* the rangevar to include the schema.
*/
if (copyStatement->relation != NULL)
{
bool isDistributedRelation = false;
bool isCopyFromWorker = IsCopyFromWorker(copyStatement);
if (isCopyFromWorker)
{
RangeVar *relation = copyStatement->relation;
NodeAddress *masterNodeAddress = MasterNodeAddress(copyStatement);
char *nodeName = masterNodeAddress->nodeName;
int32 nodePort = masterNodeAddress->nodePort;
CreateLocalTable(relation, nodeName, nodePort);
/*
* We expect copy from worker to be on a distributed table; otherwise,
* it fails in CitusCopyFrom() while checking the partition method.
*/
isDistributedRelation = true;
}
else
{
bool isFrom = copyStatement->is_from;
Relation copiedRelation = NULL;
char *schemaName = NULL;
MemoryContext relationContext = NULL;
/* consider using RangeVarGetRelidExtended to check perms before locking */
copiedRelation = heap_openrv(copyStatement->relation,
isFrom ? RowExclusiveLock : AccessShareLock);
isDistributedRelation = IsDistributedTable(RelationGetRelid(copiedRelation));
/* ensure future lookups hit the same relation */
schemaName = get_namespace_name(RelationGetNamespace(copiedRelation));
/* ensure we copy string into proper context */
relationContext = GetMemoryChunkContext(copyStatement->relation);
schemaName = MemoryContextStrdup(relationContext, schemaName);
copyStatement->relation->schemaname = schemaName;
heap_close(copiedRelation, NoLock);
}
if (isDistributedRelation)
{
if (copyStatement->is_from)
{
/* check permissions, we're bypassing postgres' normal checks */
if (!isCopyFromWorker)
{
CheckCopyPermissions(copyStatement);
}
CitusCopyFrom(copyStatement, completionTag);
return NULL;
}
else if (!copyStatement->is_from)
{
/*
* The copy code only handles SELECTs in COPY ... TO on master tables,
* as that can be done non-invasively. To handle COPY master_rel TO
* the copy statement is replaced by a generated select statement.
*/
ColumnRef *allColumns = makeNode(ColumnRef);
SelectStmt *selectStmt = makeNode(SelectStmt);
ResTarget *selectTarget = makeNode(ResTarget);
allColumns->fields = list_make1(makeNode(A_Star));
allColumns->location = -1;
selectTarget->name = NULL;
selectTarget->indirection = NIL;
selectTarget->val = (Node *) allColumns;
selectTarget->location = -1;
selectStmt->targetList = list_make1(selectTarget);
selectStmt->fromClause = list_make1(copyObject(copyStatement->relation));
/* replace original statement */
copyStatement = copyObject(copyStatement);
copyStatement->relation = NULL;
copyStatement->query = (Node *) selectStmt;
}
}
}
if (copyStatement->filename != NULL && !copyStatement->is_program)
{
const char *filename = copyStatement->filename;
if (CacheDirectoryElement(filename))
{
/*
* Only superusers are allowed to copy from a file, so we have to
* become superuser to execute copies to/from files used by citus'
* query execution.
*
* XXX: This is a decidedly suboptimal solution, as that means
* that triggers, input functions, etc. run with elevated
* privileges. But this is better than not being able to run
* queries as normal user.
*/
*commandMustRunAsOwner = true;
/*
* Have to manually check permissions here as the COPY is will be
* run as a superuser.
*/
if (copyStatement->relation != NULL)
{
CheckCopyPermissions(copyStatement);
}
/*
* Check if we have a "COPY (query) TO filename". If we do, copy
* doesn't accept relative file paths. However, SQL tasks that get
* assigned to worker nodes have relative paths. We therefore
* convert relative paths to absolute ones here.
*/
if (copyStatement->relation == NULL &&
!copyStatement->is_from &&
!is_absolute_path(filename))
{
copyStatement->filename = make_absolute_path(filename);
}
}
}
return (Node *) copyStatement;
}
/*
* CreateLocalTable gets DDL commands from the remote node for the given
* relation. Then, it creates the local relation as temporary and on commit drop.
*/
static void
CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort)
{
List *ddlCommandList = NIL;
ListCell *ddlCommandCell = NULL;
char *relationName = relation->relname;
char *schemaName = relation->schemaname;
char *qualifiedRelationName = quote_qualified_identifier(schemaName, relationName);
/*
* The warning message created in TableDDLCommandList() is descriptive
* enough; therefore, we just throw an error which says that we could not
* run the copy operation.
*/
ddlCommandList = TableDDLCommandList(nodeName, nodePort, qualifiedRelationName);
if (ddlCommandList == NIL)
{
ereport(ERROR, (errmsg("could not run copy from the worker node")));
}
/* apply DDL commands against the local database */
foreach(ddlCommandCell, ddlCommandList)
{
StringInfo ddlCommand = (StringInfo) lfirst(ddlCommandCell);
Node *ddlCommandNode = ParseTreeNode(ddlCommand->data);
bool applyDDLCommand = false;
if (IsA(ddlCommandNode, CreateStmt) ||
IsA(ddlCommandNode, CreateForeignTableStmt))
{
CreateStmt *createStatement = (CreateStmt *) ddlCommandNode;
/* create the local relation as temporary and on commit drop */
createStatement->relation->relpersistence = RELPERSISTENCE_TEMP;
createStatement->oncommit = ONCOMMIT_DROP;
/* temporarily strip schema name */
createStatement->relation->schemaname = NULL;
applyDDLCommand = true;
}
else if (IsA(ddlCommandNode, CreateForeignServerStmt))
{
CreateForeignServerStmt *createServerStmt =
(CreateForeignServerStmt *) ddlCommandNode;
if (GetForeignServerByName(createServerStmt->servername, true) == NULL)
{
/* create server if not exists */
applyDDLCommand = true;
}
}
else if ((IsA(ddlCommandNode, CreateExtensionStmt)))
{
applyDDLCommand = true;
}
else if ((IsA(ddlCommandNode, CreateSeqStmt)))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot copy to table with serial column from worker"),
errhint("Connect to the master node to COPY to tables which "
"use serial column types.")));
}
/* run only a selected set of DDL commands */
if (applyDDLCommand)
{
CitusProcessUtility(ddlCommandNode, CreateCommandTag(ddlCommandNode),
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
CommandCounterIncrement();
}
}
}
/*
* Check whether the current user has the permission to execute a COPY
* statement, raise ERROR if not. In some cases we have to do this separately
* from postgres' copy.c, because we have to execute the copy with elevated
* privileges.
*
* Copied from postgres, where it's part of DoCopy().
*/
static void
CheckCopyPermissions(CopyStmt *copyStatement)
{
/* *INDENT-OFF* */
bool is_from = copyStatement->is_from;
Relation rel;
Oid relid;
List *range_table = NIL;
TupleDesc tupDesc;
AclMode required_access = (is_from ? ACL_INSERT : ACL_SELECT);
List *attnums;
ListCell *cur;
RangeTblEntry *rte;
rel = heap_openrv(copyStatement->relation,
is_from ? RowExclusiveLock : AccessShareLock);
relid = RelationGetRelid(rel);
rte = makeNode(RangeTblEntry);
rte->rtekind = RTE_RELATION;
rte->relid = relid;
rte->relkind = rel->rd_rel->relkind;
rte->requiredPerms = required_access;
range_table = list_make1(rte);
tupDesc = RelationGetDescr(rel);
attnums = CopyGetAttnums(tupDesc, rel, copyStatement->attlist);
foreach(cur, attnums)
{
int attno = lfirst_int(cur) - FirstLowInvalidHeapAttributeNumber;
if (is_from)
{
rte->insertedCols = bms_add_member(rte->insertedCols, attno);
}
else
{
rte->selectedCols = bms_add_member(rte->selectedCols, attno);
}
}
ExecCheckRTPerms(range_table, true);
/* TODO: Perform RLS checks once supported */
heap_close(rel, NoLock);
/* *INDENT-ON* */
}
/* Helper for CheckCopyPermissions(), copied from postgres */
static List *
CopyGetAttnums(TupleDesc tupDesc, Relation rel, List *attnamelist)
{
/* *INDENT-OFF* */
List *attnums = NIL;
if (attnamelist == NIL)
{
/* Generate default column list */
int attr_count = tupDesc->natts;
int i;
for (i = 0; i < attr_count; i++)
{
if (TupleDescAttr(tupDesc, i)->attisdropped)
continue;
attnums = lappend_int(attnums, i + 1);
}
}
else
{
/* Validate the user-supplied list and extract attnums */
ListCell *l;
foreach(l, attnamelist)
{
char *name = strVal(lfirst(l));
int attnum;
int i;
/* Lookup column name */
attnum = InvalidAttrNumber;
for (i = 0; i < tupDesc->natts; i++)
{
Form_pg_attribute att = TupleDescAttr(tupDesc, i);
if (att->attisdropped)
continue;
if (namestrcmp(&(att->attname), name) == 0)
{
attnum = att->attnum;
break;
}
}
if (attnum == InvalidAttrNumber)
{
if (rel != NULL)
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" of relation \"%s\" does not exist",
name, RelationGetRelationName(rel))));
else
ereport(ERROR,
(errcode(ERRCODE_UNDEFINED_COLUMN),
errmsg("column \"%s\" does not exist",
name)));
}
/* Check for duplicates */
if (list_member_int(attnums, attnum))
ereport(ERROR,
(errcode(ERRCODE_DUPLICATE_COLUMN),
errmsg("column \"%s\" specified more than once",
name)));
attnums = lappend_int(attnums, attnum);
}
}
return attnums;
/* *INDENT-ON* */
}

View File

@ -12,8 +12,8 @@
#include "catalog/namespace.h"
#include "commands/policy.h"
#include "distributed/commands.h"
#include "distributed/metadata_cache.h"
#include "distributed/policy.h"
#include "utils/builtins.h"

View File

@ -0,0 +1,139 @@
/*-------------------------------------------------------------------------
*
* rename.c
* Commands for renaming objects related to distributed tables
*
* Copyright (c) 2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/metadata_cache.h"
#include "nodes/parsenodes.h"
/*
* PlanRenameStmt first determines whether a given rename statement involves
* a distributed table. If so (and if it is supported, i.e. renames a column),
* it creates a DDLJob to encapsulate information needed during the worker node
* portion of DDL execution before returning that DDLJob in a List. If no dis-
* tributed table is involved, this function returns NIL.
*/
List *
PlanRenameStmt(RenameStmt *renameStmt, const char *renameCommand)
{
Oid objectRelationId = InvalidOid; /* SQL Object OID */
Oid tableRelationId = InvalidOid; /* Relation OID, maybe not the same. */
bool isDistributedRelation = false;
DDLJob *ddlJob = NULL;
/*
* We only support some of the PostgreSQL supported RENAME statements, and
* our list include only renaming table and index (related) objects.
*/
if (!IsAlterTableRenameStmt(renameStmt) &&
!IsIndexRenameStmt(renameStmt) &&
!IsPolicyRenameStmt(renameStmt))
{
return NIL;
}
/*
* The lock levels here should be same as the ones taken in
* RenameRelation(), renameatt() and RenameConstraint(). However, since all
* three statements have identical lock levels, we just use a single statement.
*/
objectRelationId = RangeVarGetRelid(renameStmt->relation,
AccessExclusiveLock,
renameStmt->missing_ok);
/*
* If the table does not exist, don't do anything here to allow PostgreSQL
* to throw the appropriate error or notice message later.
*/
if (!OidIsValid(objectRelationId))
{
return NIL;
}
/* we have no planning to do unless the table is distributed */
switch (renameStmt->renameType)
{
case OBJECT_TABLE:
case OBJECT_COLUMN:
case OBJECT_TABCONSTRAINT:
case OBJECT_POLICY:
{
/* the target object is our tableRelationId. */
tableRelationId = objectRelationId;
break;
}
case OBJECT_INDEX:
{
/*
* here, objRelationId points to the index relation entry, and we
* are interested into the entry of the table on which the index is
* defined.
*/
tableRelationId = IndexGetRelation(objectRelationId, false);
break;
}
default:
/*
* Nodes that are not supported by Citus: we pass-through to the
* main PostgreSQL executor. Any Citus-supported RenameStmt
* renameType must appear above in the switch, explicitly.
*/
return NIL;
}
isDistributedRelation = IsDistributedTable(tableRelationId);
if (!isDistributedRelation)
{
return NIL;
}
/*
* We might ERROR out on some commands, but only for Citus tables where
* isDistributedRelation is true. That's why this test comes this late in
* the function.
*/
ErrorIfUnsupportedRenameStmt(renameStmt);
ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = tableRelationId;
ddlJob->concurrentIndexCmd = false;
ddlJob->commandString = renameCommand;
ddlJob->taskList = DDLTaskList(tableRelationId, renameCommand);
return list_make1(ddlJob);
}
/*
* ErrorIfDistributedRenameStmt errors out if the corresponding rename statement
* operates on any part of a distributed table other than a column.
*
* Note: This function handles RenameStmt applied to relations handed by Citus.
* At the moment of writing this comment, it could be either tables or indexes.
*/
void
ErrorIfUnsupportedRenameStmt(RenameStmt *renameStmt)
{
if (IsAlterTableRenameStmt(renameStmt) &&
renameStmt->renameType == OBJECT_TABCONSTRAINT)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("renaming constraints belonging to distributed tables is "
"currently unsupported")));
}
}

View File

@ -0,0 +1,159 @@
/*-------------------------------------------------------------------------
*
* schema.c
* Commands for creating and altering schemas for distributed tables.
*
* Copyright (c) 2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup.h"
#include "access/htup_details.h"
#include "catalog/namespace.h"
#include "catalog/pg_class.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/metadata_cache.h"
#include "nodes/parsenodes.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/relcache.h"
/* Local functions forward declarations for helper functions */
static char * GetSchemaNameFromDropObject(ListCell *dropSchemaCell);
/*
* ProcessDropSchemaStmt invalidates the foreign key cache if any table created
* under dropped schema involved in any foreign key relationship.
*/
void
ProcessDropSchemaStmt(DropStmt *dropStatement)
{
Relation pgClass = NULL;
HeapTuple heapTuple = NULL;
SysScanDesc scanDescriptor = NULL;
ScanKeyData scanKey[1];
int scanKeyCount = 1;
Oid scanIndexId = InvalidOid;
bool useIndex = false;
ListCell *dropSchemaCell;
if (dropStatement->behavior != DROP_CASCADE)
{
return;
}
foreach(dropSchemaCell, dropStatement->objects)
{
char *schemaString = GetSchemaNameFromDropObject(dropSchemaCell);
Oid namespaceOid = get_namespace_oid(schemaString, true);
if (namespaceOid == InvalidOid)
{
continue;
}
pgClass = heap_open(RelationRelationId, AccessShareLock);
ScanKeyInit(&scanKey[0], Anum_pg_class_relnamespace, BTEqualStrategyNumber,
F_OIDEQ, namespaceOid);
scanDescriptor = systable_beginscan(pgClass, scanIndexId, useIndex, NULL,
scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
while (HeapTupleIsValid(heapTuple))
{
Form_pg_class relationForm = (Form_pg_class) GETSTRUCT(heapTuple);
char *relationName = NameStr(relationForm->relname);
Oid relationId = get_relname_relid(relationName, namespaceOid);
/* we're not interested in non-valid, non-distributed relations */
if (relationId == InvalidOid || !IsDistributedTable(relationId))
{
heapTuple = systable_getnext(scanDescriptor);
continue;
}
/* invalidate foreign key cache if the table involved in any foreign key */
if (TableReferenced(relationId) || TableReferencing(relationId))
{
MarkInvalidateForeignKeyGraph();
systable_endscan(scanDescriptor);
heap_close(pgClass, NoLock);
return;
}
heapTuple = systable_getnext(scanDescriptor);
}
systable_endscan(scanDescriptor);
heap_close(pgClass, NoLock);
}
}
/*
* PlanAlterObjectSchemaStmt determines whether a given ALTER ... SET SCHEMA
* statement involves a distributed table and issues a warning if so. Because
* we do not support distributed ALTER ... SET SCHEMA, this function always
* returns NIL.
*/
List *
PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
const char *alterObjectSchemaCommand)
{
Oid relationId = InvalidOid;
if (alterObjectSchemaStmt->relation == NULL)
{
return NIL;
}
relationId = RangeVarGetRelid(alterObjectSchemaStmt->relation,
AccessExclusiveLock,
alterObjectSchemaStmt->missing_ok);
/* first check whether a distributed relation is affected */
if (!OidIsValid(relationId) || !IsDistributedTable(relationId))
{
return NIL;
}
/* emit a warning if a distributed relation is affected */
ereport(WARNING, (errmsg("not propagating ALTER ... SET SCHEMA commands to "
"worker nodes"),
errhint("Connect to worker nodes directly to manually "
"change schemas of affected objects.")));
return NIL;
}
/*
* GetSchemaNameFromDropObject gets the name of the drop schema from given
* list cell. This function is defined due to API change between PG 9.6 and
* PG 10.
*/
static char *
GetSchemaNameFromDropObject(ListCell *dropSchemaCell)
{
char *schemaString = NULL;
#if (PG_VERSION_NUM >= 100000)
Value *schemaValue = (Value *) lfirst(dropSchemaCell);
schemaString = strVal(schemaValue);
#else
List *schemaNameList = (List *) lfirst(dropSchemaCell);
schemaString = NameListToString(schemaNameList);
#endif
return schemaString;
}

View File

@ -0,0 +1,155 @@
/*-------------------------------------------------------------------------
*
* sequence.c
* This file contains implementation of CREATE and ALTER SEQUENCE
* statement functions to run in a distributed setting
*
* Copyright (c) 2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/dependency.h"
#include "catalog/namespace.h"
#include "commands/defrem.h"
#include "distributed/commands.h"
#include "distributed/metadata_cache.h"
#include "nodes/parsenodes.h"
/* Local functions forward declarations for helper functions */
static bool OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId);
/*
* ErrorIfUnsupportedSeqStmt errors out if the provided create sequence
* statement specifies a distributed table in its OWNED BY clause.
*/
void
ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt)
{
Oid ownedByTableId = InvalidOid;
/* create is easy: just prohibit any distributed OWNED BY */
if (OptionsSpecifyOwnedBy(createSeqStmt->options, &ownedByTableId))
{
if (IsDistributedTable(ownedByTableId))
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot create sequences that specify a distributed "
"table in their OWNED BY option"),
errhint("Use a sequence in a distributed table by specifying "
"a serial column type before creating any shards.")));
}
}
}
/*
* ErrorIfDistributedAlterSeqOwnedBy errors out if the provided alter sequence
* statement attempts to change the owned by property of a distributed sequence
* or attempt to change a local sequence to be owned by a distributed table.
*/
void
ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt)
{
Oid sequenceId = RangeVarGetRelid(alterSeqStmt->sequence, AccessShareLock,
alterSeqStmt->missing_ok);
bool sequenceOwned = false;
Oid ownedByTableId = InvalidOid;
Oid newOwnedByTableId = InvalidOid;
int32 ownedByColumnId = 0;
bool hasDistributedOwner = false;
/* alter statement referenced nonexistent sequence; return */
if (sequenceId == InvalidOid)
{
return;
}
#if (PG_VERSION_NUM >= 100000)
sequenceOwned = sequenceIsOwned(sequenceId, DEPENDENCY_AUTO, &ownedByTableId,
&ownedByColumnId);
if (!sequenceOwned)
{
sequenceOwned = sequenceIsOwned(sequenceId, DEPENDENCY_INTERNAL, &ownedByTableId,
&ownedByColumnId);
}
#else
sequenceOwned = sequenceIsOwned(sequenceId, &ownedByTableId, &ownedByColumnId);
#endif
/* see whether the sequence is already owned by a distributed table */
if (sequenceOwned)
{
hasDistributedOwner = IsDistributedTable(ownedByTableId);
}
if (OptionsSpecifyOwnedBy(alterSeqStmt->options, &newOwnedByTableId))
{
/* if a distributed sequence tries to change owner, error */
if (hasDistributedOwner && ownedByTableId != newOwnedByTableId)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot alter OWNED BY option of a sequence "
"already owned by a distributed table")));
}
else if (!hasDistributedOwner && IsDistributedTable(newOwnedByTableId))
{
/* and don't let local sequences get a distributed OWNED BY */
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot associate an existing sequence with a "
"distributed table"),
errhint("Use a sequence in a distributed table by specifying "
"a serial column type before creating any shards.")));
}
}
}
/*
* OptionsSpecifyOwnedBy processes the options list of either a CREATE or ALTER
* SEQUENCE command, extracting the first OWNED BY option it encounters. The
* identifier for the specified table is placed in the Oid out parameter before
* returning true. Returns false if no such option is found. Still returns true
* for OWNED BY NONE, but leaves the out paramter set to InvalidOid.
*/
static bool
OptionsSpecifyOwnedBy(List *optionList, Oid *ownedByTableId)
{
ListCell *optionCell = NULL;
foreach(optionCell, optionList)
{
DefElem *defElem = (DefElem *) lfirst(optionCell);
if (strcmp(defElem->defname, "owned_by") == 0)
{
List *ownedByNames = defGetQualifiedName(defElem);
int nameCount = list_length(ownedByNames);
/* if only one name is present, this is OWNED BY NONE */
if (nameCount == 1)
{
*ownedByTableId = InvalidOid;
return true;
}
else
{
/*
* Otherwise, we have a list of schema, table, column, which we
* need to truncate to simply the schema and table to determine
* the relevant relation identifier.
*/
List *relNameList = list_truncate(list_copy(ownedByNames), nameCount - 1);
RangeVar *rangeVar = makeRangeVarFromNameList(relNameList);
bool failOK = true;
*ownedByTableId = RangeVarGetRelid(rangeVar, NoLock, failOK);
return true;
}
}
}
return false;
}

View File

@ -0,0 +1,27 @@
/*-------------------------------------------------------------------------
*
* subscription.c
* Commands for creating subscriptions
*
* Copyright (c) 2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#if (PG_VERSION_NUM >= 100000)
#include "distributed/commands.h"
#include "nodes/parsenodes.h"
/* placeholder for ProcessCreateSubscriptionStmt */
Node *
ProcessCreateSubscriptionStmt(CreateSubscriptionStmt *createSubStmt)
{
return (Node *) createSubStmt;
}
#endif /* PG_VERSION_NUM >= 100000 */

File diff suppressed because it is too large Load Diff

View File

@ -15,6 +15,7 @@
#include <sys/stat.h>
#include <unistd.h>
#include "commands/defrem.h"
#include "distributed/relay_utility.h"
#include "distributed/transmit.h"
#include "distributed/worker_protocol.h"
@ -330,3 +331,83 @@ ReceiveCopyData(StringInfo copyData)
return copyDone;
}
/* Is the passed in statement a transmit statement? */
bool
IsTransmitStmt(Node *parsetree)
{
if (IsA(parsetree, CopyStmt))
{
CopyStmt *copyStatement = (CopyStmt *) parsetree;
ListCell *optionCell = NULL;
/* Extract options from the statement node tree */
foreach(optionCell, copyStatement->options)
{
DefElem *defel = (DefElem *) lfirst(optionCell);
if (strncmp(defel->defname, "format", NAMEDATALEN) == 0 &&
strncmp(defGetString(defel), "transmit", NAMEDATALEN) == 0)
{
return true;
}
}
}
return false;
}
/*
* VerifyTransmitStmt checks that the passed in command is a valid transmit
* statement. Raise ERROR if not.
*
* Note that only 'toplevel' options in the CopyStmt struct are checked, and
* that verification of the target files existance is not done here.
*/
void
VerifyTransmitStmt(CopyStmt *copyStatement)
{
char *fileName = NULL;
EnsureSuperUser();
/* do some minimal option verification */
if (copyStatement->relation == NULL ||
copyStatement->relation->relname == NULL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("FORMAT 'transmit' requires a target file")));
}
fileName = copyStatement->relation->relname;
if (is_absolute_path(fileName))
{
ereport(ERROR, (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("absolute path not allowed"))));
}
else if (!path_is_relative_and_below_cwd(fileName))
{
ereport(ERROR,
(errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
(errmsg("path must be in or below the current directory"))));
}
if (copyStatement->filename != NULL)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("FORMAT 'transmit' only accepts STDIN/STDOUT"
" as input/output")));
}
if (copyStatement->query != NULL ||
copyStatement->attlist != NULL ||
copyStatement->is_program)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("FORMAT 'transmit' does not accept query, attribute list"
" or PROGRAM parameters ")));
}
}

View File

@ -0,0 +1,287 @@
/*-------------------------------------------------------------------------
*
* truncate.c
* Commands for truncating distributed tables.
*
* Copyright (c) 2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/namespace.h"
#include "catalog/pg_class.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commands.h"
#include "distributed/distributed_planner.h"
#include "distributed/listutils.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_join_order.h"
#include "distributed/reference_table_utils.h"
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_transaction.h"
#include "storage/lmgr.h"
#include "utils/lsyscache.h"
#include "utils/rel.h"
#define LOCK_RELATION_IF_EXISTS "SELECT lock_relation_if_exists('%s', '%s');"
/* Local functions forward declarations for unsupported command checks */
static void ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement);
static void ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command);
static void EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement);
static void LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement);
static void AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode);
/*
* ProcessTruncateStatement handles few things that should be
* done before standard process utility is called for truncate
* command.
*/
void
ProcessTruncateStatement(TruncateStmt *truncateStatement)
{
ErrorIfUnsupportedTruncateStmt(truncateStatement);
EnsurePartitionTableNotReplicatedForTruncate(truncateStatement);
ExecuteTruncateStmtSequentialIfNecessary(truncateStatement);
LockTruncatedRelationMetadataInWorkers(truncateStatement);
}
/*
* ErrorIfUnsupportedTruncateStmt errors out if the command attempts to
* truncate a distributed foreign table.
*/
static void
ErrorIfUnsupportedTruncateStmt(TruncateStmt *truncateStatement)
{
List *relationList = truncateStatement->relations;
ListCell *relationCell = NULL;
foreach(relationCell, relationList)
{
RangeVar *rangeVar = (RangeVar *) lfirst(relationCell);
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, true);
char relationKind = get_rel_relkind(relationId);
if (IsDistributedTable(relationId) &&
relationKind == RELKIND_FOREIGN_TABLE)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("truncating distributed foreign tables is "
"currently unsupported"),
errhint("Use master_drop_all_shards to remove "
"foreign table's shards.")));
}
}
}
/*
* EnsurePartitionTableNotReplicatedForTruncate a simple wrapper around
* EnsurePartitionTableNotReplicated for TRUNCATE command.
*/
static void
EnsurePartitionTableNotReplicatedForTruncate(TruncateStmt *truncateStatement)
{
ListCell *relationCell = NULL;
foreach(relationCell, truncateStatement->relations)
{
RangeVar *relationRV = (RangeVar *) lfirst(relationCell);
Relation relation = heap_openrv(relationRV, NoLock);
Oid relationId = RelationGetRelid(relation);
if (!IsDistributedTable(relationId))
{
heap_close(relation, NoLock);
continue;
}
EnsurePartitionTableNotReplicated(relationId);
heap_close(relation, NoLock);
}
}
/*
* ExecuteTruncateStmtSequentialIfNecessary decides if the TRUNCATE stmt needs
* to run sequential. If so, it calls SetLocalMultiShardModifyModeToSequential().
*
* If a reference table which has a foreign key from a distributed table is truncated
* we need to execute the command sequentially to avoid self-deadlock.
*/
static void
ExecuteTruncateStmtSequentialIfNecessary(TruncateStmt *command)
{
List *relationList = command->relations;
ListCell *relationCell = NULL;
bool failOK = false;
foreach(relationCell, relationList)
{
RangeVar *rangeVar = (RangeVar *) lfirst(relationCell);
Oid relationId = RangeVarGetRelid(rangeVar, NoLock, failOK);
if (IsDistributedTable(relationId) &&
PartitionMethod(relationId) == DISTRIBUTE_BY_NONE &&
TableReferenced(relationId))
{
char *relationName = get_rel_name(relationId);
ereport(DEBUG1, (errmsg("switching to sequential query execution mode"),
errdetail(
"Reference relation \"%s\" is modified, which might lead "
"to data inconsistencies or distributed deadlocks via "
"parallel accesses to hash distributed relations due to "
"foreign keys. Any parallel modification to "
"those hash distributed relations in the same "
"transaction can only be executed in sequential query "
"execution mode", relationName)));
SetLocalMultiShardModifyModeToSequential();
/* nothing to do more */
return;
}
}
}
/*
* LockTruncatedRelationMetadataInWorkers determines if distributed
* lock is necessary for truncated relations, and acquire locks.
*
* LockTruncatedRelationMetadataInWorkers handles distributed locking
* of truncated tables before standard utility takes over.
*
* Actual distributed truncation occurs inside truncate trigger.
*
* This is only for distributed serialization of truncate commands.
* The function assumes that there is no foreign key relation between
* non-distributed and distributed relations.
*/
static void
LockTruncatedRelationMetadataInWorkers(TruncateStmt *truncateStatement)
{
List *distributedRelationList = NIL;
ListCell *relationCell = NULL;
/* nothing to do if there is no metadata at worker nodes */
if (!ClusterHasKnownMetadataWorkers())
{
return;
}
foreach(relationCell, truncateStatement->relations)
{
RangeVar *relationRV = (RangeVar *) lfirst(relationCell);
Relation relation = heap_openrv(relationRV, NoLock);
Oid relationId = RelationGetRelid(relation);
DistTableCacheEntry *cacheEntry = NULL;
List *referencingTableList = NIL;
ListCell *referencingTableCell = NULL;
if (!IsDistributedTable(relationId))
{
heap_close(relation, NoLock);
continue;
}
if (list_member_oid(distributedRelationList, relationId))
{
heap_close(relation, NoLock);
continue;
}
distributedRelationList = lappend_oid(distributedRelationList, relationId);
cacheEntry = DistributedTableCacheEntry(relationId);
Assert(cacheEntry != NULL);
referencingTableList = cacheEntry->referencingRelationsViaForeignKey;
foreach(referencingTableCell, referencingTableList)
{
Oid referencingRelationId = lfirst_oid(referencingTableCell);
distributedRelationList = list_append_unique_oid(distributedRelationList,
referencingRelationId);
}
heap_close(relation, NoLock);
}
if (distributedRelationList != NIL)
{
AcquireDistributedLockOnRelations(distributedRelationList, AccessExclusiveLock);
}
}
/*
* AcquireDistributedLockOnRelations acquire a distributed lock on worker nodes
* for given list of relations ids. Relation id list and worker node list
* sorted so that the lock is acquired in the same order regardless of which
* node it was run on. Notice that no lock is acquired on coordinator node.
*
* Notice that the locking functions is sent to all workers regardless of if
* it has metadata or not. This is because a worker node only knows itself
* and previous workers that has metadata sync turned on. The node does not
* know about other nodes that have metadata sync turned on afterwards.
*/
static void
AcquireDistributedLockOnRelations(List *relationIdList, LOCKMODE lockMode)
{
ListCell *relationIdCell = NULL;
List *workerNodeList = ActivePrimaryNodeList();
const char *lockModeText = LockModeToLockModeText(lockMode);
/*
* We want to acquire locks in the same order accross the nodes.
* Although relation ids may change, their ordering will not.
*/
relationIdList = SortList(relationIdList, CompareOids);
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
BeginOrContinueCoordinatedTransaction();
foreach(relationIdCell, relationIdList)
{
Oid relationId = lfirst_oid(relationIdCell);
/*
* We only acquire distributed lock on relation if
* the relation is sync'ed between mx nodes.
*/
if (ShouldSyncTableMetadata(relationId))
{
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
StringInfo lockRelationCommand = makeStringInfo();
ListCell *workerNodeCell = NULL;
appendStringInfo(lockRelationCommand, LOCK_RELATION_IF_EXISTS,
qualifiedRelationName, lockModeText);
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
char *nodeName = workerNode->workerName;
int nodePort = workerNode->workerPort;
/* if local node is one of the targets, acquire the lock locally */
if (workerNode->groupId == GetLocalGroupId())
{
LockRelationOid(relationId, lockMode);
continue;
}
SendCommandToWorker(nodeName, nodePort, lockRelationCommand->data);
}
}
}
}

View File

@ -0,0 +1,801 @@
/*-------------------------------------------------------------------------
* utility_hook.c
* Citus utility hook and related functionality.
*
* The utility hook is called by PostgreSQL when processing any command
* that is not SELECT, UPDATE, DELETE, INSERT, in place of the regular
* ProcessUtility function. We use this primarily to implement (or in
* some cases prevent) DDL commands and COPY on distributed tables.
*
* For DDL commands that affect distributed tables, we check whether
* they are valid (and implemented) for the distributed table and then
* propagate the command to all shards and, in case of MX, to distributed
* tables on other nodes. We still call the original ProcessUtility
* function to apply catalog changes on the coordinator.
*
* For COPY into a distributed table, we provide an alternative
* implementation in ProcessCopyStmt that sends rows to shards based
* on their distribution column value instead of writing it to the local
* table on the coordinator. For COPY from a distributed table, we
* replace the table with a SELECT * FROM table and pass it back to
* ProcessUtility, which will plan the query via the distributed planner
* hook.
*
* Copyright (c) 2012-2018, Citus Data, Inc.
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "access/attnum.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "catalog/catalog.h"
#include "catalog/dependency.h"
#include "commands/dbcommands.h"
#include "commands/tablecmds.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/commands/utility_hook.h" /* IWYU pragma: keep */
#include "distributed/maintenanced.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_router_executor.h"
#include "distributed/resource_lock.h"
#include "distributed/transmit.h"
#include "distributed/version_compat.h"
#include "distributed/worker_transaction.h"
#include "lib/stringinfo.h"
#include "tcop/utility.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/syscache.h"
bool EnableDDLPropagation = true; /* ddl propagation is enabled */
static bool shouldInvalidateForeignKeyGraph = false;
/* Local functions forward declarations for helper functions */
static void ExecuteDistributedDDLJob(DDLJob *ddlJob);
static char * SetSearchPathToCurrentSearchPathCommand(void);
static char * CurrentSearchPath(void);
static void PostProcessUtility(Node *parsetree);
/*
* multi_ProcessUtility9x is the 9.x-compatible wrapper for Citus' main utility
* hook. It simply adapts the old-style hook to call into the new-style (10+)
* hook, which is what now houses all actual logic.
*/
void
multi_ProcessUtility9x(Node *parsetree,
const char *queryString,
ProcessUtilityContext context,
ParamListInfo params,
DestReceiver *dest,
char *completionTag)
{
PlannedStmt *plannedStmt = makeNode(PlannedStmt);
plannedStmt->commandType = CMD_UTILITY;
plannedStmt->utilityStmt = parsetree;
multi_ProcessUtility(plannedStmt, queryString, context, params, NULL, dest,
completionTag);
}
/*
* CitusProcessUtility is a version-aware wrapper of ProcessUtility to account
* for argument differences between the 9.x and 10+ PostgreSQL versions.
*/
void
CitusProcessUtility(Node *node, const char *queryString, ProcessUtilityContext context,
ParamListInfo params, DestReceiver *dest, char *completionTag)
{
#if (PG_VERSION_NUM >= 100000)
PlannedStmt *plannedStmt = makeNode(PlannedStmt);
plannedStmt->commandType = CMD_UTILITY;
plannedStmt->utilityStmt = node;
ProcessUtility(plannedStmt, queryString, context, params, NULL, dest,
completionTag);
#else
ProcessUtility(node, queryString, context, params, dest, completionTag);
#endif
}
/*
* multi_ProcessUtility is the main entry hook for implementing Citus-specific
* utility behavior. Its primary responsibilities are intercepting COPY and DDL
* commands and augmenting the coordinator's command with corresponding tasks
* to be run on worker nodes, after suitably ensuring said commands' options
* are fully supported by Citus. Much of the DDL behavior is toggled by Citus'
* enable_ddl_propagation GUC. In addition to DDL and COPY, utilities such as
* TRUNCATE and VACUUM are also supported.
*/
void
multi_ProcessUtility(PlannedStmt *pstmt,
const char *queryString,
ProcessUtilityContext context,
ParamListInfo params,
struct QueryEnvironment *queryEnv,
DestReceiver *dest,
char *completionTag)
{
Node *parsetree = pstmt->utilityStmt;
bool commandMustRunAsOwner = false;
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
List *ddlJobs = NIL;
bool checkExtensionVersion = false;
if (IsA(parsetree, TransactionStmt))
{
/*
* Transaction statements (e.g. ABORT, COMMIT) can be run in aborted
* transactions in which case a lot of checks cannot be done safely in
* that state. Since we never need to intercept transaction statements,
* skip our checks and immediately fall into standard_ProcessUtility.
*/
#if (PG_VERSION_NUM >= 100000)
standard_ProcessUtility(pstmt, queryString, context,
params, queryEnv, dest, completionTag);
#else
standard_ProcessUtility(parsetree, queryString, context,
params, dest, completionTag);
#endif
return;
}
checkExtensionVersion = IsCitusExtensionStmt(parsetree);
if (EnableVersionChecks && checkExtensionVersion)
{
ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree);
}
if (!CitusHasBeenLoaded())
{
/*
* Ensure that utility commands do not behave any differently until CREATE
* EXTENSION is invoked.
*/
#if (PG_VERSION_NUM >= 100000)
standard_ProcessUtility(pstmt, queryString, context,
params, queryEnv, dest, completionTag);
#else
standard_ProcessUtility(parsetree, queryString, context,
params, dest, completionTag);
#endif
return;
}
#if (PG_VERSION_NUM >= 100000)
if (IsA(parsetree, CreateSubscriptionStmt))
{
CreateSubscriptionStmt *createSubStmt = (CreateSubscriptionStmt *) parsetree;
parsetree = ProcessCreateSubscriptionStmt(createSubStmt);
}
#endif
#if (PG_VERSION_NUM >= 110000)
if (IsA(parsetree, CallStmt))
{
/*
* Stored procedures are a bit strange in the sense that some statements
* are not in a transaction block, but can be rolled back. We need to
* make sure we send all statements in a transaction block. The
* StoredProcedureLevel variable signals this to the router executor
* and indicates how deep in the call stack we are in case of nested
* stored procedures.
*/
StoredProcedureLevel += 1;
PG_TRY();
{
standard_ProcessUtility(pstmt, queryString, context,
params, queryEnv, dest, completionTag);
StoredProcedureLevel -= 1;
}
PG_CATCH();
{
StoredProcedureLevel -= 1;
PG_RE_THROW();
}
PG_END_TRY();
return;
}
#endif
/*
* TRANSMIT used to be separate command, but to avoid patching the grammar
* it's no overlaid onto COPY, but with FORMAT = 'transmit' instead of the
* normal FORMAT options.
*/
if (IsTransmitStmt(parsetree))
{
CopyStmt *copyStatement = (CopyStmt *) parsetree;
VerifyTransmitStmt(copyStatement);
/* ->relation->relname is the target file in our overloaded COPY */
if (copyStatement->is_from)
{
RedirectCopyDataToRegularFile(copyStatement->relation->relname);
}
else
{
SendRegularFile(copyStatement->relation->relname);
}
/* Don't execute the faux copy statement */
return;
}
if (IsA(parsetree, CopyStmt))
{
MemoryContext planContext = GetMemoryChunkContext(parsetree);
MemoryContext previousContext;
parsetree = copyObject(parsetree);
parsetree = ProcessCopyStmt((CopyStmt *) parsetree, completionTag,
&commandMustRunAsOwner);
previousContext = MemoryContextSwitchTo(planContext);
parsetree = copyObject(parsetree);
MemoryContextSwitchTo(previousContext);
if (parsetree == NULL)
{
return;
}
}
/* we're mostly in DDL (and VACUUM/TRUNCATE) territory at this point... */
if (IsA(parsetree, CreateSeqStmt))
{
ErrorIfUnsupportedSeqStmt((CreateSeqStmt *) parsetree);
}
if (IsA(parsetree, AlterSeqStmt))
{
ErrorIfDistributedAlterSeqOwnedBy((AlterSeqStmt *) parsetree);
}
if (IsA(parsetree, TruncateStmt))
{
ProcessTruncateStatement((TruncateStmt *) parsetree);
}
/* only generate worker DDLJobs if propagation is enabled */
if (EnableDDLPropagation)
{
if (IsA(parsetree, IndexStmt))
{
MemoryContext oldContext = MemoryContextSwitchTo(GetMemoryChunkContext(
parsetree));
/* copy parse tree since we might scribble on it to fix the schema name */
parsetree = copyObject(parsetree);
MemoryContextSwitchTo(oldContext);
ddlJobs = PlanIndexStmt((IndexStmt *) parsetree, queryString);
}
if (IsA(parsetree, DropStmt))
{
DropStmt *dropStatement = (DropStmt *) parsetree;
if (dropStatement->removeType == OBJECT_INDEX)
{
ddlJobs = PlanDropIndexStmt(dropStatement, queryString);
}
if (dropStatement->removeType == OBJECT_TABLE)
{
ProcessDropTableStmt(dropStatement);
}
if (dropStatement->removeType == OBJECT_SCHEMA)
{
ProcessDropSchemaStmt(dropStatement);
}
if (dropStatement->removeType == OBJECT_POLICY)
{
ddlJobs = PlanDropPolicyStmt(dropStatement, queryString);
}
}
if (IsA(parsetree, AlterTableStmt))
{
AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree;
if (alterTableStmt->relkind == OBJECT_TABLE ||
alterTableStmt->relkind == OBJECT_INDEX)
{
ddlJobs = PlanAlterTableStmt(alterTableStmt, queryString);
}
}
/*
* ALTER TABLE ... RENAME statements have their node type as RenameStmt and
* not AlterTableStmt. So, we intercept RenameStmt to tackle these commands.
*/
if (IsA(parsetree, RenameStmt))
{
ddlJobs = PlanRenameStmt((RenameStmt *) parsetree, queryString);
}
/* handle distributed CLUSTER statements */
if (IsA(parsetree, ClusterStmt))
{
ddlJobs = PlanClusterStmt((ClusterStmt *) parsetree, queryString);
}
/*
* ALTER ... SET SCHEMA statements have their node type as AlterObjectSchemaStmt.
* So, we intercept AlterObjectSchemaStmt to tackle these commands.
*/
if (IsA(parsetree, AlterObjectSchemaStmt))
{
AlterObjectSchemaStmt *setSchemaStmt = (AlterObjectSchemaStmt *) parsetree;
ddlJobs = PlanAlterObjectSchemaStmt(setSchemaStmt, queryString);
}
if (IsA(parsetree, GrantStmt))
{
ddlJobs = PlanGrantStmt((GrantStmt *) parsetree);
}
if (IsA(parsetree, CreatePolicyStmt))
{
ddlJobs = PlanCreatePolicyStmt((CreatePolicyStmt *) parsetree);
}
if (IsA(parsetree, AlterPolicyStmt))
{
ddlJobs = PlanAlterPolicyStmt((AlterPolicyStmt *) parsetree);
}
/*
* ALTER TABLE ALL IN TABLESPACE statements have their node type as
* AlterTableMoveAllStmt. At the moment we do not support this functionality in
* the distributed environment. We warn out here.
*/
if (IsA(parsetree, AlterTableMoveAllStmt))
{
ereport(WARNING, (errmsg("not propagating ALTER TABLE ALL IN TABLESPACE "
"commands to worker nodes"),
errhint("Connect to worker nodes directly to manually "
"move all tables.")));
}
}
else
{
/*
* citus.enable_ddl_propagation is disabled, which means that PostgreSQL
* should handle the DDL command on a distributed table directly, without
* Citus intervening. The only exception is partition column drop, in
* which case we error out. Advanced Citus users use this to implement their
* own DDL propagation. We also use it to avoid re-propagating DDL commands
* when changing MX tables on workers. Below, we also make sure that DDL
* commands don't run queries that might get intercepted by Citus and error
* out, specifically we skip validation in foreign keys.
*/
if (IsA(parsetree, AlterTableStmt))
{
AlterTableStmt *alterTableStmt = (AlterTableStmt *) parsetree;
if (alterTableStmt->relkind == OBJECT_TABLE)
{
ErrorIfAlterDropsPartitionColumn(alterTableStmt);
/*
* When issuing an ALTER TABLE ... ADD FOREIGN KEY command, the
* the validation step should be skipped on the distributed table.
* Therefore, we check whether the given ALTER TABLE statement is a
* FOREIGN KEY constraint and if so disable the validation step.
* Note that validation is done on the shard level when DDL
* propagation is enabled. Unlike the preceeding Plan* calls, the
* following eagerly executes some tasks on workers.
*/
parsetree = WorkerProcessAlterTableStmt(alterTableStmt, queryString);
}
}
}
/* inform the user about potential caveats */
if (IsA(parsetree, CreatedbStmt))
{
ereport(NOTICE, (errmsg("Citus partially supports CREATE DATABASE for "
"distributed databases"),
errdetail("Citus does not propagate CREATE DATABASE "
"command to workers"),
errhint("You can manually create a database and its "
"extensions on workers.")));
}
else if (IsA(parsetree, CreateRoleStmt))
{
ereport(NOTICE, (errmsg("not propagating CREATE ROLE/USER commands to worker"
" nodes"),
errhint("Connect to worker nodes directly to manually create all"
" necessary users and roles.")));
}
/*
* Make sure that on DROP DATABASE we terminate the background daemon
* associated with it.
*/
if (IsA(parsetree, DropdbStmt))
{
const bool missingOK = true;
DropdbStmt *dropDbStatement = (DropdbStmt *) parsetree;
char *dbname = dropDbStatement->dbname;
Oid databaseOid = get_database_oid(dbname, missingOK);
if (OidIsValid(databaseOid))
{
StopMaintenanceDaemon(databaseOid);
}
}
/* set user if needed and go ahead and run local utility using standard hook */
if (commandMustRunAsOwner)
{
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
}
#if (PG_VERSION_NUM >= 100000)
pstmt->utilityStmt = parsetree;
standard_ProcessUtility(pstmt, queryString, context,
params, queryEnv, dest, completionTag);
#else
standard_ProcessUtility(parsetree, queryString, context,
params, dest, completionTag);
#endif
/*
* We only process CREATE TABLE ... PARTITION OF commands in the function below
* to handle the case when user creates a table as a partition of distributed table.
*/
if (IsA(parsetree, CreateStmt))
{
CreateStmt *createStatement = (CreateStmt *) parsetree;
ProcessCreateTableStmtPartitionOf(createStatement);
}
/*
* We only process ALTER TABLE ... ATTACH PARTITION commands in the function below
* and distribute the partition if necessary.
*/
if (IsA(parsetree, AlterTableStmt))
{
AlterTableStmt *alterTableStatement = (AlterTableStmt *) parsetree;
ProcessAlterTableStmtAttachPartition(alterTableStatement);
}
/* don't run post-process code for local commands */
if (ddlJobs != NIL)
{
PostProcessUtility(parsetree);
}
if (commandMustRunAsOwner)
{
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
}
/*
* Re-forming the foreign key graph relies on the command being executed
* on the local table first. However, in order to decide whether the
* command leads to an invalidation, we need to check before the command
* is being executed since we read pg_constraint table. Thus, we maintain a
* local flag and do the invalidation after multi_ProcessUtility,
* before ExecuteDistributedDDLJob().
*/
InvalidateForeignKeyGraphForDDL();
/* after local command has completed, finish by executing worker DDLJobs, if any */
if (ddlJobs != NIL)
{
ListCell *ddlJobCell = NULL;
if (IsA(parsetree, AlterTableStmt))
{
PostProcessAlterTableStmt(castNode(AlterTableStmt, parsetree));
}
foreach(ddlJobCell, ddlJobs)
{
DDLJob *ddlJob = (DDLJob *) lfirst(ddlJobCell);
ExecuteDistributedDDLJob(ddlJob);
}
}
/* TODO: fold VACUUM's processing into the above block */
if (IsA(parsetree, VacuumStmt))
{
VacuumStmt *vacuumStmt = (VacuumStmt *) parsetree;
ProcessVacuumStmt(vacuumStmt, queryString);
}
/*
* Ensure value is valid, we can't do some checks during CREATE
* EXTENSION. This is important to register some invalidation callbacks.
*/
CitusHasBeenLoaded();
}
/*
* ExecuteDistributedDDLJob simply executes a provided DDLJob in a distributed trans-
* action, including metadata sync if needed. If the multi shard commit protocol is
* in its default value of '1pc', then a notice message indicating that '2pc' might be
* used for extra safety. In the commit protocol, a BEGIN is sent after connection to
* each shard placement and COMMIT/ROLLBACK is handled by
* CoordinatedTransactionCallback function.
*
* The function errors out if the node is not the coordinator or if the DDL is on
* a partitioned table which has replication factor > 1.
*
*/
static void
ExecuteDistributedDDLJob(DDLJob *ddlJob)
{
bool shouldSyncMetadata = ShouldSyncTableMetadata(ddlJob->targetRelationId);
EnsureCoordinator();
EnsurePartitionTableNotReplicated(ddlJob->targetRelationId);
if (!ddlJob->concurrentIndexCmd)
{
if (shouldSyncMetadata)
{
char *setSearchPathCommand = SetSearchPathToCurrentSearchPathCommand();
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
/*
* Given that we're relaying the query to the worker nodes directly,
* we should set the search path exactly the same when necessary.
*/
if (setSearchPathCommand != NULL)
{
SendCommandToWorkers(WORKERS_WITH_METADATA, setSearchPathCommand);
}
SendCommandToWorkers(WORKERS_WITH_METADATA, (char *) ddlJob->commandString);
}
if (MultiShardConnectionType == SEQUENTIAL_CONNECTION ||
ddlJob->executeSequentially)
{
ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList, CMD_UTILITY);
}
else
{
ExecuteModifyTasksWithoutResults(ddlJob->taskList);
}
}
else
{
/* save old commit protocol to restore at xact end */
Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE);
SavedMultiShardCommitProtocol = MultiShardCommitProtocol;
MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
PG_TRY();
{
ExecuteModifyTasksSequentiallyWithoutResults(ddlJob->taskList, CMD_UTILITY);
if (shouldSyncMetadata)
{
List *commandList = list_make1(DISABLE_DDL_PROPAGATION);
char *setSearchPathCommand = SetSearchPathToCurrentSearchPathCommand();
/*
* Given that we're relaying the query to the worker nodes directly,
* we should set the search path exactly the same when necessary.
*/
if (setSearchPathCommand != NULL)
{
commandList = lappend(commandList, setSearchPathCommand);
}
commandList = lappend(commandList, (char *) ddlJob->commandString);
SendBareCommandListToWorkers(WORKERS_WITH_METADATA, commandList);
}
}
PG_CATCH();
{
ereport(ERROR,
(errmsg("CONCURRENTLY-enabled index command failed"),
errdetail("CONCURRENTLY-enabled index commands can fail partially, "
"leaving behind an INVALID index."),
errhint("Use DROP INDEX CONCURRENTLY IF EXISTS to remove the "
"invalid index, then retry the original command.")));
}
PG_END_TRY();
}
}
/*
* SetSearchPathToCurrentSearchPathCommand generates a command which can
* set the search path to the exact same search path that the issueing node
* has.
*
* If the current search path is null (or doesn't have any valid schemas),
* the function returns NULL.
*/
static char *
SetSearchPathToCurrentSearchPathCommand(void)
{
StringInfo setCommand = NULL;
char *currentSearchPath = CurrentSearchPath();
if (currentSearchPath == NULL)
{
return NULL;
}
setCommand = makeStringInfo();
appendStringInfo(setCommand, "SET search_path TO %s;", currentSearchPath);
return setCommand->data;
}
/*
* CurrentSearchPath is a C interface for calling current_schemas(bool) that
* PostgreSQL exports.
*
* CurrentSchemas returns all the schemas in the seach_path that are seperated
* with comma (,) sign. The returned string can be used to set the search_path.
*
* The function omits implicit schemas.
*
* The function returns NULL if there are no valid schemas in the search_path,
* mimicing current_schemas(false) function.
*/
static char *
CurrentSearchPath(void)
{
StringInfo currentSearchPath = makeStringInfo();
List *searchPathList = fetch_search_path(false);
ListCell *searchPathCell;
bool schemaAdded = false;
foreach(searchPathCell, searchPathList)
{
char *schemaName = get_namespace_name(lfirst_oid(searchPathCell));
/* watch out for deleted namespace */
if (schemaName)
{
if (schemaAdded)
{
appendStringInfoString(currentSearchPath, ",");
schemaAdded = false;
}
appendStringInfoString(currentSearchPath, quote_identifier(schemaName));
schemaAdded = true;
}
}
/* fetch_search_path() returns a palloc'd list that we should free now */
list_free(searchPathList);
return (currentSearchPath->len > 0 ? currentSearchPath->data : NULL);
}
/*
* PostProcessUtility performs additional tasks after a utility's local portion
* has been completed.
*/
static void
PostProcessUtility(Node *parsetree)
{
if (IsA(parsetree, IndexStmt))
{
PostProcessIndexStmt(castNode(IndexStmt, parsetree));
}
}
/*
* MarkInvalidateForeignKeyGraph marks whether the foreign key graph should be
* invalidated due to a DDL.
*/
void
MarkInvalidateForeignKeyGraph()
{
shouldInvalidateForeignKeyGraph = true;
}
/*
* InvalidateForeignKeyGraphForDDL simply keeps track of whether
* the foreign key graph should be invalidated due to a DDL.
*/
void
InvalidateForeignKeyGraphForDDL(void)
{
if (shouldInvalidateForeignKeyGraph)
{
InvalidateForeignKeyGraph();
shouldInvalidateForeignKeyGraph = false;
}
}
/*
* DDLTaskList builds a list of tasks to execute a DDL command on a
* given list of shards.
*/
List *
DDLTaskList(Oid relationId, const char *commandString)
{
List *taskList = NIL;
List *shardIntervalList = LoadShardIntervalList(relationId);
ListCell *shardIntervalCell = NULL;
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
char *escapedSchemaName = quote_literal_cstr(schemaName);
char *escapedCommandString = quote_literal_cstr(commandString);
uint64 jobId = INVALID_JOB_ID;
int taskId = 1;
/* lock metadata before getting placement lists */
LockShardListMetadata(shardIntervalList, ShareLock);
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
StringInfo applyCommand = makeStringInfo();
Task *task = NULL;
/*
* If rightRelationId is not InvalidOid, instead of worker_apply_shard_ddl_command
* we use worker_apply_inter_shard_ddl_command.
*/
appendStringInfo(applyCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId,
escapedSchemaName, escapedCommandString);
task = CitusMakeNode(Task);
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = DDL_TASK;
task->queryString = applyCommand->data;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->dependedTaskList = NULL;
task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId);
taskList = lappend(taskList, task);
}
return taskList;
}

View File

@ -0,0 +1,341 @@
/*-------------------------------------------------------------------------
*
* vacuum.c
* Commands for vacuuming distributed tables.
*
* Copyright (c) 2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "c.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_router_executor.h"
#include "distributed/resource_lock.h"
#include "distributed/transaction_management.h"
#include "distributed/version_compat.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
/* Local functions forward declarations for processing distributed table commands */
static bool IsDistributedVacuumStmt(VacuumStmt *vacuumStmt, List *vacuumRelationIdList);
static List * VacuumTaskList(Oid relationId, int vacuumOptions, List *vacuumColumnList);
static StringInfo DeparseVacuumStmtPrefix(int vacuumFlags);
static char * DeparseVacuumColumnNames(List *columnNameList);
/*
* ProcessVacuumStmt processes vacuum statements that may need propagation to
* distributed tables. If a VACUUM or ANALYZE command references a distributed
* table, it is propagated to all involved nodes; otherwise, this function will
* immediately exit after some error checking.
*
* Unlike most other Process functions within this file, this function does not
* return a modified parse node, as it is expected that the local VACUUM or
* ANALYZE has already been processed.
*/
void
ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand)
{
int relationIndex = 0;
bool distributedVacuumStmt = false;
List *vacuumRelationList = ExtractVacuumTargetRels(vacuumStmt);
ListCell *vacuumRelationCell = NULL;
List *relationIdList = NIL;
ListCell *relationIdCell = NULL;
LOCKMODE lockMode = (vacuumStmt->options & VACOPT_FULL) ? AccessExclusiveLock :
ShareUpdateExclusiveLock;
int executedVacuumCount = 0;
foreach(vacuumRelationCell, vacuumRelationList)
{
RangeVar *vacuumRelation = (RangeVar *) lfirst(vacuumRelationCell);
Oid relationId = RangeVarGetRelid(vacuumRelation, lockMode, false);
relationIdList = lappend_oid(relationIdList, relationId);
}
distributedVacuumStmt = IsDistributedVacuumStmt(vacuumStmt, relationIdList);
if (!distributedVacuumStmt)
{
return;
}
/* execute vacuum on distributed tables */
foreach(relationIdCell, relationIdList)
{
Oid relationId = lfirst_oid(relationIdCell);
if (IsDistributedTable(relationId))
{
List *vacuumColumnList = NIL;
List *taskList = NIL;
/*
* VACUUM commands cannot run inside a transaction block, so we use
* the "bare" commit protocol without BEGIN/COMMIT. However, ANALYZE
* commands can run inside a transaction block. Notice that we do this
* once even if there are multiple distributed tables to be vacuumed.
*/
if (executedVacuumCount == 0 && (vacuumStmt->options & VACOPT_VACUUM) != 0)
{
/* save old commit protocol to restore at xact end */
Assert(SavedMultiShardCommitProtocol == COMMIT_PROTOCOL_BARE);
SavedMultiShardCommitProtocol = MultiShardCommitProtocol;
MultiShardCommitProtocol = COMMIT_PROTOCOL_BARE;
}
vacuumColumnList = VacuumColumnList(vacuumStmt, relationIndex);
taskList = VacuumTaskList(relationId, vacuumStmt->options, vacuumColumnList);
ExecuteModifyTasksWithoutResults(taskList);
executedVacuumCount++;
}
relationIndex++;
}
}
/*
* IsSupportedDistributedVacuumStmt returns whether distributed execution of a
* given VacuumStmt is supported. The provided relationId list represents
* the list of tables targeted by the provided statement.
*
* Returns true if the statement requires distributed execution and returns
* false otherwise.
*/
static bool
IsDistributedVacuumStmt(VacuumStmt *vacuumStmt, List *vacuumRelationIdList)
{
const char *stmtName = (vacuumStmt->options & VACOPT_VACUUM) ? "VACUUM" : "ANALYZE";
bool distributeStmt = false;
ListCell *relationIdCell = NULL;
int distributedRelationCount = 0;
int vacuumedRelationCount = 0;
/*
* No table in the vacuum statement means vacuuming all relations
* which is not supported by citus.
*/
vacuumedRelationCount = list_length(vacuumRelationIdList);
if (vacuumedRelationCount == 0)
{
/* WARN for unqualified VACUUM commands */
ereport(WARNING, (errmsg("not propagating %s command to worker nodes", stmtName),
errhint("Provide a specific table in order to %s "
"distributed tables.", stmtName)));
}
foreach(relationIdCell, vacuumRelationIdList)
{
Oid relationId = lfirst_oid(relationIdCell);
if (OidIsValid(relationId) && IsDistributedTable(relationId))
{
distributedRelationCount++;
}
}
if (distributedRelationCount == 0)
{
/* nothing to do here */
}
else if (!EnableDDLPropagation)
{
/* WARN if DDL propagation is not enabled */
ereport(WARNING, (errmsg("not propagating %s command to worker nodes", stmtName),
errhint("Set citus.enable_ddl_propagation to true in order to "
"send targeted %s commands to worker nodes.",
stmtName)));
}
else
{
distributeStmt = true;
}
return distributeStmt;
}
/*
* VacuumTaskList returns a list of tasks to be executed as part of processing
* a VacuumStmt which targets a distributed relation.
*/
static List *
VacuumTaskList(Oid relationId, int vacuumOptions, List *vacuumColumnList)
{
List *taskList = NIL;
List *shardIntervalList = NIL;
ListCell *shardIntervalCell = NULL;
uint64 jobId = INVALID_JOB_ID;
int taskId = 1;
StringInfo vacuumString = DeparseVacuumStmtPrefix(vacuumOptions);
const char *columnNames = NULL;
const int vacuumPrefixLen = vacuumString->len;
Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId);
char *tableName = get_rel_name(relationId);
columnNames = DeparseVacuumColumnNames(vacuumColumnList);
/*
* We obtain ShareUpdateExclusiveLock here to not conflict with INSERT's
* RowExclusiveLock. However if VACUUM FULL is used, we already obtain
* AccessExclusiveLock before reaching to that point and INSERT's will be
* blocked anyway. This is inline with PostgreSQL's own behaviour.
*/
LockRelationOid(relationId, ShareUpdateExclusiveLock);
shardIntervalList = LoadShardIntervalList(relationId);
/* grab shard lock before getting placement list */
LockShardListMetadata(shardIntervalList, ShareLock);
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
Task *task = NULL;
char *shardName = pstrdup(tableName);
AppendShardIdToName(&shardName, shardInterval->shardId);
shardName = quote_qualified_identifier(schemaName, shardName);
vacuumString->len = vacuumPrefixLen;
appendStringInfoString(vacuumString, shardName);
appendStringInfoString(vacuumString, columnNames);
task = CitusMakeNode(Task);
task->jobId = jobId;
task->taskId = taskId++;
task->taskType = VACUUM_ANALYZE_TASK;
task->queryString = pstrdup(vacuumString->data);
task->dependedTaskList = NULL;
task->replicationModel = REPLICATION_MODEL_INVALID;
task->anchorShardId = shardId;
task->taskPlacementList = FinalizedShardPlacementList(shardId);
taskList = lappend(taskList, task);
}
return taskList;
}
/*
* DeparseVacuumStmtPrefix returns a StringInfo appropriate for use as a prefix
* during distributed execution of a VACUUM or ANALYZE statement. Callers may
* reuse this prefix within a loop to generate shard-specific VACUUM or ANALYZE
* statements.
*/
static StringInfo
DeparseVacuumStmtPrefix(int vacuumFlags)
{
StringInfo vacuumPrefix = makeStringInfo();
const int unsupportedFlags PG_USED_FOR_ASSERTS_ONLY = ~(
VACOPT_ANALYZE |
VACOPT_DISABLE_PAGE_SKIPPING |
VACOPT_FREEZE |
VACOPT_FULL |
VACOPT_VERBOSE
);
/* determine actual command and block out its bit */
if (vacuumFlags & VACOPT_VACUUM)
{
appendStringInfoString(vacuumPrefix, "VACUUM ");
vacuumFlags &= ~VACOPT_VACUUM;
}
else
{
appendStringInfoString(vacuumPrefix, "ANALYZE ");
vacuumFlags &= ~VACOPT_ANALYZE;
if (vacuumFlags & VACOPT_VERBOSE)
{
appendStringInfoString(vacuumPrefix, "VERBOSE ");
vacuumFlags &= ~VACOPT_VERBOSE;
}
}
/* unsupported flags should have already been rejected */
Assert((vacuumFlags & unsupportedFlags) == 0);
/* if no flags remain, exit early */
if (vacuumFlags == 0)
{
return vacuumPrefix;
}
/* otherwise, handle options */
appendStringInfoChar(vacuumPrefix, '(');
if (vacuumFlags & VACOPT_ANALYZE)
{
appendStringInfoString(vacuumPrefix, "ANALYZE,");
}
if (vacuumFlags & VACOPT_DISABLE_PAGE_SKIPPING)
{
appendStringInfoString(vacuumPrefix, "DISABLE_PAGE_SKIPPING,");
}
if (vacuumFlags & VACOPT_FREEZE)
{
appendStringInfoString(vacuumPrefix, "FREEZE,");
}
if (vacuumFlags & VACOPT_FULL)
{
appendStringInfoString(vacuumPrefix, "FULL,");
}
if (vacuumFlags & VACOPT_VERBOSE)
{
appendStringInfoString(vacuumPrefix, "VERBOSE,");
}
vacuumPrefix->data[vacuumPrefix->len - 1] = ')';
appendStringInfoChar(vacuumPrefix, ' ');
return vacuumPrefix;
}
/*
* DeparseVacuumColumnNames joins the list of strings using commas as a
* delimiter. The whole thing is placed in parenthesis and set off with a
* single space in order to facilitate appending it to the end of any VACUUM
* or ANALYZE command which uses explicit column names. If the provided list
* is empty, this function returns an empty string to keep the calling code
* simplest.
*/
static char *
DeparseVacuumColumnNames(List *columnNameList)
{
StringInfo columnNames = makeStringInfo();
ListCell *columnNameCell = NULL;
if (columnNameList == NIL)
{
return columnNames->data;
}
appendStringInfoString(columnNames, " (");
foreach(columnNameCell, columnNameList)
{
char *columnName = strVal(lfirst(columnNameCell));
appendStringInfo(columnNames, "%s,", columnName);
}
columnNames->data[columnNames->len - 1] = ')';
return columnNames->data;
}

View File

@ -10,9 +10,9 @@
#include "postgres.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"

View File

@ -18,11 +18,11 @@
#include "catalog/pg_enum.h"
#include "commands/copy.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/connection_management.h"
#include "distributed/intermediate_results.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_executor.h"
#include "distributed/remote_commands.h"
#include "distributed/transmit.h"

View File

@ -16,9 +16,9 @@
#include "catalog/dependency.h"
#include "catalog/namespace.h"
#include "distributed/citus_custom_scan.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/insert_select_executor.h"
#include "distributed/insert_select_planner.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_master_planner.h"
#include "distributed/distributed_planner.h"
@ -26,7 +26,6 @@
#include "distributed/multi_router_planner.h"
#include "distributed/multi_resowner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_utility.h"
#include "distributed/resource_lock.h"
#include "distributed/worker_protocol.h"
#include "executor/execdebug.h"

File diff suppressed because it is too large Load Diff

View File

@ -25,6 +25,7 @@
#include "access/xact.h"
#include "catalog/namespace.h"
#include "commands/dbcommands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/connection_management.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_sync.h"
@ -33,7 +34,6 @@
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_utility.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/placement_connection.h"

View File

@ -25,7 +25,7 @@
#include "commands/event_trigger.h"
#include "distributed/citus_clauses.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/foreign_constraint.h"
#include "distributed/commands.h"
#include "distributed/listutils.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"

View File

@ -41,12 +41,12 @@
#include "catalog/pg_namespace.h"
#include "commands/sequence.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commands.h"
#include "distributed/listutils.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/policy.h"
#include "distributed/worker_manager.h"
#include "foreign/foreign.h"
#include "lib/stringinfo.h"

View File

@ -19,8 +19,8 @@
#include "catalog/pg_class.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/connection_management.h"
#include "distributed/foreign_constraint.h"
#include "distributed/listutils.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"

View File

@ -29,9 +29,9 @@
#endif
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/connection_management.h"
#include "distributed/distributed_planner.h"
#include "distributed/foreign_constraint.h"
#include "distributed/multi_client_executor.h"
#include "distributed/multi_router_executor.h"
#include "distributed/master_metadata_utility.h"

View File

@ -15,9 +15,10 @@
#include "commands/tablecmds.h"
#include "commands/trigger.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_utility.h"
#include "distributed/pg_dist_partition.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"

View File

@ -28,8 +28,8 @@
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commands.h"
#include "distributed/distribution_column.h"
#include "distributed/foreign_constraint.h"
#include "distributed/listutils.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"

View File

@ -55,10 +55,10 @@
#include "catalog/pg_class.h"
#include "distributed/citus_nodes.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/distributed_planner.h"
#include "distributed/errormessage.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_physical_planner.h"

View File

@ -29,8 +29,8 @@
#include "catalog/namespace.h"
#include "catalog/pg_class.h"
#include "catalog/pg_constraint.h"
#include "distributed/commands.h"
#include "distributed/metadata_cache.h"
#include "distributed/policy.h"
#include "distributed/relay_utility.h"
#include "distributed/version_compat.h"
#include "lib/stringinfo.h"

View File

@ -21,13 +21,14 @@
#include "executor/executor.h"
#include "distributed/backend_data.h"
#include "distributed/citus_nodefuncs.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/connection_management.h"
#include "distributed/distributed_deadlock_detection.h"
#include "distributed/maintenanced.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_copy.h"
#include "distributed/multi_explain.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_optimizer.h"
@ -35,7 +36,6 @@
#include "distributed/multi_router_executor.h"
#include "distributed/multi_router_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_utility.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/placement_connection.h"
#include "distributed/relation_access_tracking.h"

View File

@ -1308,3 +1308,43 @@ simple_quote_literal(StringInfo buf, const char *val)
}
appendStringInfoChar(buf, '\'');
}
/*
* RoleSpecString resolves the role specification to its string form that is suitable for transport to a worker node.
* This function resolves the following identifiers from the current context so they are safe to transfer.
*
* CURRENT_USER - resolved to the user name of the current role being used
* SESSION_USER - resolved to the user name of the user that opened the session
*/
const char *
RoleSpecString(RoleSpec *spec)
{
switch (spec->roletype)
{
case ROLESPEC_CSTRING:
{
return quote_identifier(spec->rolename);
}
case ROLESPEC_CURRENT_USER:
{
return quote_identifier(GetUserNameFromId(GetUserId(), false));
}
case ROLESPEC_SESSION_USER:
{
return quote_identifier(GetUserNameFromId(GetSessionUserId(), false));
}
case ROLESPEC_PUBLIC:
{
return "PUBLIC";
}
default:
{
elog(ERROR, "unexpected role type %d", spec->roletype);
}
}
}

View File

@ -16,7 +16,7 @@
#include "access/htup_details.h"
#include "access/genam.h"
#include "distributed/colocation_utils.h"
#include "distributed/foreign_constraint.h"
#include "distributed/commands.h"
#include "distributed/listutils.h"
#include "distributed/master_protocol.h"
#include "distributed/master_metadata_utility.h"

View File

@ -27,6 +27,7 @@
#include "commands/extension.h"
#include "commands/sequence.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/connection_management.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
@ -34,7 +35,6 @@
#include "distributed/multi_logical_optimizer.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_server_executor.h"
#include "distributed/multi_utility.h"
#include "distributed/relay_utility.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"

View File

@ -33,7 +33,7 @@
#include "catalog/pg_type.h"
#include "commands/copy.h"
#include "commands/defrem.h"
#include "distributed/multi_copy.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/resource_lock.h"
#include "distributed/transmit.h"

View File

@ -42,6 +42,7 @@ extern char * pg_get_indexclusterdef_string(Oid indexRelationId);
extern List * pg_get_table_grants(Oid relationId);
extern bool contain_nextval_expression_walker(Node *node, void *context);
extern char * pg_get_replica_identity_command(Oid tableRelationId);
extern const char * RoleSpecString(RoleSpec *spec);
/* Function declarations for version dependent PostgreSQL ruleutils functions */
extern void pg_get_query_def(Query *query, StringInfo buffer);

View File

@ -0,0 +1,127 @@
/*-------------------------------------------------------------------------
*
* commands.h
* Declarations for public functions and variables used for all commands
* and DDL operations for citus. All declarations are grouped by the
* file that implements them.
*
* Copyright (c) 2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef CITUS_COMMANDS_H
#define CITUS_COMMANDS_H
#include "postgres.h"
#include "utils/rel.h"
#include "nodes/parsenodes.h"
/* cluster.c - forward declarations */
extern List * PlanClusterStmt(ClusterStmt *clusterStmt, const char *clusterCommand);
/* extension.c - forward declarations */
extern bool IsCitusExtensionStmt(Node *parsetree);
extern void ErrorIfUnstableCreateOrAlterExtensionStmt(Node *parsetree);
/* foreign_constraint.c - forward declarations */
extern bool ConstraintIsAForeignKeyToReferenceTable(char *constraintName,
Oid leftRelationId);
extern void ErrorIfUnsupportedForeignConstraint(Relation relation, char
distributionMethod,
Var *distributionColumn, uint32
colocationId);
extern bool ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid
relationId);
extern List * GetTableForeignConstraintCommands(Oid relationId);
extern bool HasForeignKeyToReferenceTable(Oid relationId);
extern bool TableReferenced(Oid relationId);
extern bool TableReferencing(Oid relationId);
extern bool ConstraintIsAForeignKey(char *constraintName, Oid relationId);
/* grant.c - forward declarations */
extern List * PlanGrantStmt(GrantStmt *grantStmt);
/* index.c - forward declarations */
extern bool IsIndexRenameStmt(RenameStmt *renameStmt);
extern List * PlanIndexStmt(IndexStmt *createIndexStatement,
const char *createIndexCommand);
extern List * PlanDropIndexStmt(DropStmt *dropIndexStatement,
const char *dropIndexCommand);
extern void PostProcessIndexStmt(IndexStmt *indexStmt);
extern void ErrorIfUnsupportedAlterIndexStmt(AlterTableStmt *alterTableStatement);
/* policy.c - forward declarations */
extern List * CreatePolicyCommands(Oid relationId);
extern void ErrorIfUnsupportedPolicy(Relation relation);
extern void ErrorIfUnsupportedPolicyExpr(Node *expr);
extern List * PlanCreatePolicyStmt(CreatePolicyStmt *stmt);
extern List * PlanAlterPolicyStmt(AlterPolicyStmt *stmt);
extern List * PlanDropPolicyStmt(DropStmt *stmt, const char *queryString);
extern bool IsPolicyRenameStmt(RenameStmt *stmt);
extern void CreatePolicyEventExtendNames(CreatePolicyStmt *stmt, const char *schemaName,
uint64 shardId);
extern void AlterPolicyEventExtendNames(AlterPolicyStmt *stmt, const char *schemaName,
uint64 shardId);
extern void RenamePolicyEventExtendNames(RenameStmt *stmt, const char *schemaName, uint64
shardId);
extern void DropPolicyEventExtendNames(DropStmt *stmt, const char *schemaName, uint64
shardId);
/* rename.c - forward declarations*/
extern List * PlanRenameStmt(RenameStmt *renameStmt, const char *renameCommand);
extern void ErrorIfUnsupportedRenameStmt(RenameStmt *renameStmt);
/* schema.c - forward declarations */
extern void ProcessDropSchemaStmt(DropStmt *dropSchemaStatement);
extern List * PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
const char *alterObjectSchemaCommand);
/* sequence.c - forward declarations */
extern void ErrorIfUnsupportedSeqStmt(CreateSeqStmt *createSeqStmt);
extern void ErrorIfDistributedAlterSeqOwnedBy(AlterSeqStmt *alterSeqStmt);
#if (PG_VERSION_NUM >= 100000)
/* subscription.c - forward declarations */
extern Node * ProcessCreateSubscriptionStmt(CreateSubscriptionStmt *createSubStmt);
#endif /* PG_VERSION_NUM >= 100000 */
/* table.c - forward declarations */
extern void ProcessDropTableStmt(DropStmt *dropTableStatement);
extern void ProcessCreateTableStmtPartitionOf(CreateStmt *createStatement);
extern void ProcessAlterTableStmtAttachPartition(AlterTableStmt *alterTableStatement);
extern List * PlanAlterTableStmt(AlterTableStmt *alterTableStatement,
const char *alterTableCommand);
extern Node * WorkerProcessAlterTableStmt(AlterTableStmt *alterTableStatement,
const char *alterTableCommand);
extern bool IsAlterTableRenameStmt(RenameStmt *renameStmt);
extern void ErrorIfAlterDropsPartitionColumn(AlterTableStmt *alterTableStatement);
extern void PostProcessAlterTableStmt(AlterTableStmt *pStmt);
extern void ErrorUnsupportedAlterTableAddColumn(Oid relationId, AlterTableCmd *command,
Constraint *constraint);
extern void ErrorIfUnsupportedConstraint(Relation relation, char distributionMethod,
Var *distributionColumn, uint32 colocationId);
/* truncate.c - forward declarations */
extern void ProcessTruncateStatement(TruncateStmt *truncateStatement);
/* vacuum.c - froward declarations */
extern void ProcessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);
#endif /*CITUS_COMMANDS_H */

View File

@ -128,9 +128,8 @@ extern void AppendCopyRowData(Datum *valueArray, bool *isNullArray,
extern void AppendCopyBinaryHeaders(CopyOutState headerOutputState);
extern void AppendCopyBinaryFooters(CopyOutState footerOutputState);
extern void EndRemoteCopy(int64 shardId, List *connectionList, bool stopOnFailure);
extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
extern bool IsCopyFromWorker(CopyStmt *copyStatement);
extern NodeAddress * MasterNodeAddress(CopyStmt *copyStatement);
extern Node * ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag,
bool *commandMustRunAsOwner);
#endif /* MULTI_COPY_H */

View File

@ -1,6 +1,6 @@
/*-------------------------------------------------------------------------
*
* multi_utility.h
* utility_hook.h
* Citus utility hook and related functionality.
*
* Copyright (c) 2012-2016, Citus Data, Inc.
@ -10,10 +10,14 @@
#ifndef MULTI_UTILITY_H
#define MULTI_UTILITY_H
#include "postgres.h"
#include "distributed/version_compat.h"
#include "utils/relcache.h"
#include "tcop/utility.h"
extern bool EnableDDLPropagation;
extern bool EnableVersionChecks;
/*
* A DDLJob encapsulates the remote tasks and commands needed to process all or
@ -30,9 +34,6 @@ typedef struct DDLJob
List *taskList; /* worker DDL tasks to execute */
} DDLJob;
#if (PG_VERSION_NUM < 100000)
struct QueryEnvironment; /* forward-declare to appease compiler */
#endif
extern void multi_ProcessUtility(PlannedStmt *pstmt, const char *queryString,
ProcessUtilityContext context, ParamListInfo params,
@ -44,14 +45,8 @@ extern void multi_ProcessUtility9x(Node *parsetree, const char *queryString,
extern void CitusProcessUtility(Node *node, const char *queryString,
ProcessUtilityContext context, ParamListInfo params,
DestReceiver *dest, char *completionTag);
extern List * PlanGrantStmt(GrantStmt *grantStmt);
extern void ErrorIfUnsupportedConstraint(Relation relation, char distributionMethod,
Var *distributionColumn, uint32 colocationId);
extern Datum master_drop_all_shards(PG_FUNCTION_ARGS);
extern Datum master_modify_multiple_shards(PG_FUNCTION_ARGS);
extern void MarkInvalidateForeignKeyGraph(void);
extern void InvalidateForeignKeyGraphForDDL(void);
extern List * DDLTaskList(Oid relationId, const char *commandString);
extern const char * RoleSpecString(RoleSpec *spec);
#endif /* MULTI_UTILITY_H */

View File

@ -1,32 +0,0 @@
/*-------------------------------------------------------------------------
* foreign_constraint.h
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef FOREIGN_CONSTRAINT_H
#define FOREIGN_CONSTRAINT_H
#include "postgres.h"
#include "postgres_ext.h"
#include "utils/relcache.h"
#include "utils/hsearch.h"
#include "nodes/primnodes.h"
extern bool ConstraintIsAForeignKeyToReferenceTable(char *constraintName,
Oid leftRelationId);
extern void ErrorIfUnsupportedForeignConstraint(Relation relation, char
distributionMethod,
Var *distributionColumn, uint32
colocationId);
extern bool ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid
relationId);
extern List * GetTableForeignConstraintCommands(Oid relationId);
extern bool HasForeignKeyToReferenceTable(Oid relationId);
extern bool TableReferenced(Oid relationId);
extern bool TableReferencing(Oid relationId);
extern bool ConstraintIsAForeignKey(char *constraintName, Oid relationId);
#endif

View File

@ -14,7 +14,7 @@
#include "fmgr.h"
#include "distributed/multi_copy.h"
#include "distributed/commands/multi_copy.h"
#include "nodes/execnodes.h"
#include "nodes/pg_list.h"
#include "tcop/dest.h"

View File

@ -1,32 +0,0 @@
/*-------------------------------------------------------------------------
* policy.h
*
* Copyright (c) 2018, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef CITUS_POLICY_H
#define CITUS_POLICY_H
#include "utils/rel.h"
extern List * CreatePolicyCommands(Oid relationId);
extern void ErrorIfUnsupportedPolicy(Relation relation);
extern void ErrorIfUnsupportedPolicyExpr(Node *expr);
extern List * PlanCreatePolicyStmt(CreatePolicyStmt *stmt);
extern List * PlanAlterPolicyStmt(AlterPolicyStmt *stmt);
extern List * PlanDropPolicyStmt(DropStmt *stmt, const char *queryString);
extern bool IsPolicyRenameStmt(RenameStmt *stmt);
extern void CreatePolicyEventExtendNames(CreatePolicyStmt *stmt, const char *schemaName,
uint64 shardId);
extern void AlterPolicyEventExtendNames(AlterPolicyStmt *stmt, const char *schemaName,
uint64 shardId);
extern void RenamePolicyEventExtendNames(RenameStmt *stmt, const char *schemaName, uint64
shardId);
extern void DropPolicyEventExtendNames(DropStmt *stmt, const char *schemaName, uint64
shardId);
#endif /*CITUS_POLICY_H */

View File

@ -11,7 +11,10 @@
#ifndef TRANSMIT_H
#define TRANSMIT_H
#include "c.h"
#include "lib/stringinfo.h"
#include "nodes/parsenodes.h"
#include "storage/fd.h"
@ -23,5 +26,9 @@ extern File FileOpenForTransmit(const char *filename, int fileFlags, int fileMod
/* Function declaration local to commands and worker modules */
extern void FreeStringInfo(StringInfo stringInfo);
/* Local functions forward declarations for Transmit statement */
extern bool IsTransmitStmt(Node *parsetree);
extern void VerifyTransmitStmt(CopyStmt *copyStatement);
#endif /* TRANSMIT_H */

View File

@ -24,11 +24,16 @@
#endif
#if (PG_VERSION_NUM < 100000)
struct QueryEnvironment; /* forward-declare to appease compiler */
#endif
#if (PG_VERSION_NUM >= 90600 && PG_VERSION_NUM < 110000)
#include "access/hash.h"
#include "storage/fd.h"
#include "optimizer/prep.h"
#include "postmaster/bgworker.h"
#include "utils/memutils.h"
/* PostgreSQL 11 splits hash procs into "standard" and "extended" */