/*-------------------------------------------------------------------------
 *
 * delete_protocol.c
 *
 * Routine for deleting shards in the distributed cluster. This function takes
 * in a delete command and deletes a shard if and only if all rows in the shard
 * satisfy the conditions in the delete command.
 *
 * Copyright (c) Citus Data, Inc.
 *
 * $Id$
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "distributed/pg_version_constants.h"

#include "c.h"
#include "fmgr.h"
#include "libpq-fe.h"
#include "miscadmin.h"
#include "port.h"

#include <stddef.h>

#include "access/xact.h"
#include "catalog/namespace.h"
#include "commands/dbcommands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/connection_management.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_logical_planner.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/multi_server_executor.h"
#include "distributed/pg_dist_partition.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/placement_connection.h"
#include "distributed/relay_utility.h"
#include "distributed/remote_commands.h"
#include "distributed/shard_cleaner.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"
#include "lib/stringinfo.h"
#include "nodes/nodeFuncs.h"
#include "nodes/nodes.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"
#include "nodes/primnodes.h"
#include "optimizer/clauses.h"
#include "nodes/pathnodes.h"
#include "optimizer/optimizer.h"
#include "optimizer/restrictinfo.h"
#include "storage/lock.h"
#include "storage/lmgr.h"
#include "tcop/tcopprot.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/elog.h"
#include "utils/errcodes.h"
#include "utils/lsyscache.h"
#include "utils/varlena.h"


/* Local functions forward declarations */
static int DropShards(Oid relationId, char *schemaName, char *relationName,
					  List *deletableShardIntervalList,
					  bool dropShardsMetadataOnly);
static List * DropTaskList(Oid relationId, char *schemaName, char *relationName,
						   List *deletableShardIntervalList);
static void ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement,
													 const char *shardRelationName,
													 const char *
													 dropShardPlacementCommand);
static char * CreateDropShardPlacementCommand(const char *schemaName,
											  const char *shardRelationName,
											  char storageType);


/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_apply_delete_command);
PG_FUNCTION_INFO_V1(citus_drop_all_shards);
PG_FUNCTION_INFO_V1(master_drop_all_shards);
PG_FUNCTION_INFO_V1(master_drop_sequences);


/*
 * master_apply_delete_command is a deprecated function for dropping shards
 * of append-distributed tables.
 */
Datum
master_apply_delete_command(PG_FUNCTION_ARGS)
{
	ereport(ERROR, (errmsg("master_apply_delete_command has been deprecated")));
}


/*
 * citus_drop_all_shards attempts to drop all shards for a given relation.
 * This function can be called even if the table has already been dropped.
 * In that case, the schema name and relation name arguments are used to
 * determine the table name. Otherwise, the relation ID is used and the
 * other arguments are ignored.
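 *
 * As a usage sketch (the table and schema names below are illustrative,
 * not taken from this file), the function can be invoked from SQL as:
 *
 *   SELECT citus_drop_all_shards('public.events'::regclass,
 *                                'public', 'events', false);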
 */
Datum
citus_drop_all_shards(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	Oid relationId = PG_GETARG_OID(0);
	text *schemaNameText = PG_GETARG_TEXT_P(1);
	text *relationNameText = PG_GETARG_TEXT_P(2);
	bool dropShardsMetadataOnly = PG_GETARG_BOOL(3);

	char *schemaName = text_to_cstring(schemaNameText);
	char *relationName = text_to_cstring(relationNameText);

	/*
	 * The SQL_DROP trigger calls this function even for tables that are
	 * not distributed. In that case, silently ignore and return -1.
	 */
	if (!IsCitusTableViaCatalog(relationId) || !EnableDDLPropagation)
	{
		PG_RETURN_INT32(-1);
	}

	EnsureCoordinator();
	CheckTableSchemaNameForDrop(relationId, &schemaName, &relationName);

	/*
	 * citus_drop_all_shards is typically called from the DROP TABLE trigger,
	 * but could be called by a user directly. Make sure we have an
	 * AccessExclusiveLock to prevent any other commands from running on this
	 * table concurrently.
	 */
	LockRelationOid(relationId, AccessExclusiveLock);

	List *shardIntervalList = LoadUnsortedShardIntervalListViaCatalog(relationId);
	int droppedShardCount = DropShards(relationId, schemaName, relationName,
									   shardIntervalList, dropShardsMetadataOnly);

	PG_RETURN_INT32(droppedShardCount);
}


/*
 * master_drop_all_shards is a wrapper function for the old UDF name.
 */
Datum
master_drop_all_shards(PG_FUNCTION_ARGS)
{
	Oid relationId = PG_GETARG_OID(0);
	text *schemaNameText = PG_GETARG_TEXT_P(1);
	text *relationNameText = PG_GETARG_TEXT_P(2);

	bool dropShardsMetadataOnly = false;

	LOCAL_FCINFO(local_fcinfo, 4);

	InitFunctionCallInfoData(*local_fcinfo, NULL, 4, InvalidOid, NULL, NULL);

	local_fcinfo->args[0].value = ObjectIdGetDatum(relationId);
	local_fcinfo->args[0].isnull = false;
	local_fcinfo->args[1].value = PointerGetDatum(schemaNameText);
	local_fcinfo->args[1].isnull = false;
	local_fcinfo->args[2].value = PointerGetDatum(relationNameText);
	local_fcinfo->args[2].isnull = false;
	local_fcinfo->args[3].value = BoolGetDatum(dropShardsMetadataOnly);
	local_fcinfo->args[3].isnull = false;

	return citus_drop_all_shards(local_fcinfo);
}


/*
 * master_drop_sequences was previously used to drop sequences on workers
 * when using metadata syncing.
 *
 * It may still be called when dropping objects during CREATE EXTENSION,
 * hence the function remains in place.
 */
Datum
master_drop_sequences(PG_FUNCTION_ARGS)
{
	PG_RETURN_VOID();
}


/*
 * CheckTableSchemaNameForDrop errors out if the current user does not
 * have permission to un-distribute the given relation, taking into
 * account that it may be called from the drop trigger. If the table exists,
 * the function overwrites the given schema and table name with the current
 * values.
 */
void
CheckTableSchemaNameForDrop(Oid relationId, char **schemaName, char **tableName)
{
	char *tempTableName = get_rel_name(relationId);

	if (tempTableName != NULL)
	{
		/* ensure proper values are used if the table exists */
		Oid schemaId = get_rel_namespace(relationId);
		(*schemaName) = get_namespace_name(schemaId);
		(*tableName) = tempTableName;

		EnsureTableOwner(relationId);
	}
}


/*
 * DropShards drops all given shards in a relation. The id, name and schema
 * for the relation are explicitly provided, since this function may be
 * called when the table is already dropped.
 *
 * We mark shard placements that we couldn't drop as to be deleted later, but
 * we do delete the shard metadata.
 *
 * If dropShardsMetadataOnly is true, then we don't send remote commands to
 * drop the shards: we only remove pg_dist_placement and pg_dist_shard rows.
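 *
 * In metadata-only mode, the net effect on the catalogs is roughly the
 * following sketch (the actual implementation goes through
 * DeleteShardPlacementRow and DeleteShardRow rather than plain SQL, and
 * <shard id> stands in for each dropped shard's id):
 *
 *   DELETE FROM pg_dist_placement WHERE shardid = <shard id>;
 *   DELETE FROM pg_dist_shard WHERE shardid = <shard id>;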
 */
static int
DropShards(Oid relationId, char *schemaName, char *relationName,
		   List *deletableShardIntervalList, bool dropShardsMetadataOnly)
{
	Assert(OidIsValid(relationId));
	Assert(schemaName != NULL);
	Assert(relationName != NULL);

	UseCoordinatedTransaction();

	/*
	 * We use the variable below across this function to decide whether we
	 * can use local execution.
	 */
	int32 localGroupId = GetLocalGroupId();

	/* DROP table commands are currently only supported from the coordinator */
	Assert(localGroupId == COORDINATOR_GROUP_ID);

	Use2PCForCoordinatedTransaction();

	List *dropTaskList = DropTaskList(relationId, schemaName, relationName,
									  deletableShardIntervalList);
	bool shouldExecuteTasksLocally = ShouldExecuteTasksLocally(dropTaskList);

	Task *task = NULL;
	foreach_ptr(task, dropTaskList)
	{
		uint64 shardId = task->anchorShardId;

		ShardPlacement *shardPlacement = NULL;
		foreach_ptr(shardPlacement, task->taskPlacementList)
		{
			uint64 shardPlacementId = shardPlacement->placementId;
			int32 shardPlacementGroupId = shardPlacement->groupId;

			bool isLocalShardPlacement = (shardPlacementGroupId == localGroupId);

			/*
			 * If this variable is true, that means the active
			 * DROP SCHEMA/DATABASE ... CASCADE will drop the shard. If we try
			 * to drop it over another connection, we will get into a
			 * distributed deadlock. Hence, if this variable is true we should
			 * just delete the shard placement metadata and skip dropping the
			 * shard for now.
			 */
			bool skipIfDropSchemaOrDBInProgress = isLocalShardPlacement &&
												  DropSchemaOrDBInProgress() &&
												  localGroupId == COORDINATOR_GROUP_ID;

			/*
			 * We only want to send commands to drop shards when both
			 * skipIfDropSchemaOrDBInProgress and dropShardsMetadataOnly
			 * are false.
			 */
			bool applyRemoteShardsDrop =
				!skipIfDropSchemaOrDBInProgress && !dropShardsMetadataOnly;

			if (applyRemoteShardsDrop)
			{
				/*
				 * If it is a local placement of a distributed table or a
				 * reference table, then execute the DROP command locally.
				 */
				if (isLocalShardPlacement && shouldExecuteTasksLocally)
				{
					List *singleTaskList = list_make1(task);

					ExecuteLocalUtilityTaskList(singleTaskList);
				}
				else
				{
					/*
					 * Either it was not a local placement, or we could not
					 * use local execution even though it was a local
					 * placement. The second case arises when, in the current
					 * transaction, some commands or queries already connected
					 * to the local group as well.
					 *
					 * Regardless of whether the node is a remote node or the
					 * current node, try to open a new connection (or use an
					 * existing one) to that node to drop the shard placement
					 * over that remote connection.
					 */
					const char *dropShardPlacementCommand = TaskQueryString(task);
					ExecuteDropShardPlacementCommandRemotely(shardPlacement,
															 relationName,
															 dropShardPlacementCommand);

					if (isLocalShardPlacement)
					{
						SetLocalExecutionStatus(LOCAL_EXECUTION_DISABLED);
					}
				}
			}

			DeleteShardPlacementRow(shardPlacementId);
		}

		/*
		 * Now that we deleted all placements of the shard (or their metadata),
		 * delete the shard metadata as well.
		 */
		DeleteShardRow(shardId);
	}

	int droppedShardCount = list_length(deletableShardIntervalList);

	return droppedShardCount;
}


/*
 * DropTaskList returns a list of tasks to execute a DROP command on shard
 * placements of a distributed table. This is handled separately from other
 * DDL commands because we handle it via the DROP trigger, which is called
 * whenever a drop cascades.
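 *
 * For illustration (the shard id and table name here are hypothetical), a
 * task built for shard 102008 of table public.events would carry a query
 * string such as:
 *
 *   DROP TABLE IF EXISTS public.events_102008 CASCADE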
 */
static List *
DropTaskList(Oid relationId, char *schemaName, char *relationName,
			 List *deletableShardIntervalList)
{
	/* resulting task list */
	List *taskList = NIL;

	/* enumerate the tasks when putting them to the taskList */
	int taskId = 1;

	ShardInterval *shardInterval = NULL;
	foreach_ptr(shardInterval, deletableShardIntervalList)
	{
		Assert(shardInterval->relationId == relationId);

		uint64 shardId = shardInterval->shardId;
		char storageType = shardInterval->storageType;

		char *shardRelationName = pstrdup(relationName);

		/* build shard relation name */
		AppendShardIdToName(&shardRelationName, shardId);

		char *dropShardPlacementCommand =
			CreateDropShardPlacementCommand(schemaName, shardRelationName,
											storageType);

		Task *task = CitusMakeNode(Task);
		task->jobId = INVALID_JOB_ID;
		task->taskId = taskId++;
		task->taskType = DDL_TASK;
		SetTaskQueryString(task, dropShardPlacementCommand);
		task->dependentTaskList = NULL;
		task->replicationModel = REPLICATION_MODEL_INVALID;
		task->anchorShardId = shardId;
		task->taskPlacementList = ShardPlacementListViaCatalog(shardId);

		taskList = lappend(taskList, task);
	}

	return taskList;
}


/*
 * ExecuteDropShardPlacementCommandRemotely executes the given DROP shard
 * command via a remote critical connection.
 */
static void
ExecuteDropShardPlacementCommandRemotely(ShardPlacement *shardPlacement,
										 const char *relationName,
										 const char *dropShardPlacementCommand)
{
	Assert(shardPlacement != NULL);
	Assert(relationName != NULL);
	Assert(dropShardPlacementCommand != NULL);

	uint32 connectionFlags = FOR_DDL;
	MultiConnection *connection = GetPlacementConnection(connectionFlags,
														 shardPlacement,
														 NULL);

	/*
	 * This code-path doesn't support optional connections, so we don't expect
	 * NULL connections.
	 */
	Assert(connection != NULL);

	RemoteTransactionBeginIfNecessary(connection);

	if (PQstatus(connection->pgConn) != CONNECTION_OK)
	{
		char *workerName = shardPlacement->nodeName;
		uint32 workerPort = shardPlacement->nodePort;

		/* build shard relation name */
		uint64 shardId = shardPlacement->shardId;
		char *shardRelationName = pstrdup(relationName);

		AppendShardIdToName(&shardRelationName, shardId);

		ereport(WARNING, (errmsg("could not connect to shard \"%s\" on node "
								 "\"%s:%u\"", shardRelationName, workerName,
								 workerPort),
						  errdetail("Marking this shard placement for "
									"deletion")));

		InsertCleanupRecordInCurrentTransaction(CLEANUP_OBJECT_SHARD_PLACEMENT,
												shardRelationName,
												shardPlacement->groupId,
												CLEANUP_DEFERRED_ON_SUCCESS);

		return;
	}

	MarkRemoteTransactionCritical(connection);

	ExecuteCriticalRemoteCommand(connection, dropShardPlacementCommand);
}


/*
 * CreateDropShardPlacementCommand builds the DROP command to drop the given
 * shard relation, qualifying it with the schema name, according to the shard
 * relation's storage type.
 */
static char *
CreateDropShardPlacementCommand(const char *schemaName,
								const char *shardRelationName, char storageType)
{
	Assert(schemaName != NULL);
	Assert(shardRelationName != NULL);

	StringInfo workerDropQuery = makeStringInfo();

	const char *quotedShardName = quote_qualified_identifier(schemaName,
															 shardRelationName);

	/* build workerDropQuery according to shard storage type */
	if (storageType == SHARD_STORAGE_TABLE)
	{
		appendStringInfo(workerDropQuery, DROP_REGULAR_TABLE_COMMAND,
						 quotedShardName);
	}
	else if (storageType == SHARD_STORAGE_FOREIGN)
	{
		appendStringInfo(workerDropQuery, DROP_FOREIGN_TABLE_COMMAND,
						 quotedShardName);
	}
	else
	{
		/* no other storage type is expected here */
		Assert(false);
	}

	return workerDropQuery->data;
}
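

/*
 * For reference, a sketch of the commands CreateDropShardPlacementCommand
 * builds (the schema and shard names are illustrative, and
 * quote_qualified_identifier() quotes only identifiers that require it):
 *
 *   ("public", "events_102008", SHARD_STORAGE_TABLE)
 *       -> DROP TABLE IF EXISTS public.events_102008 CASCADE
 *   ("Sales", "events_102008", SHARD_STORAGE_FOREIGN)
 *       -> DROP FOREIGN TABLE IF EXISTS "Sales".events_102008 CASCADE
 */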