/*-------------------------------------------------------------------------
 *
 * metadata_sync.c
 *
 * Routines for synchronizing metadata to all workers.
 *
 * Copyright (c) Citus Data, Inc.
 *
 * $Id$
 *
 *-------------------------------------------------------------------------
 */

/* the three system header names below are assumed; the original #include targets were lost */
#include <signal.h>
#include <sys/stat.h>
#include <unistd.h>

#include "postgres.h"

#include "miscadmin.h"
#include "pgstat.h"

#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/nbtree.h"
#include "access/sysattr.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/pg_am.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_depend.h"
#include "catalog/pg_foreign_server.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_proc.h"
#include "catalog/pg_type.h"
#include "commands/async.h"
#include "executor/spi.h"
#include "foreign/foreign.h"
#include "nodes/makefuncs.h"
#include "nodes/pg_list.h"
#include "parser/parse_type.h"
#include "postmaster/bgworker.h"
#include "postmaster/postmaster.h"
#include "storage/lmgr.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"
#include "utils/snapmgr.h"
#include "utils/syscache.h"

#include "distributed/argutils.h"
#include "distributed/backend_data.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/colocation_utils.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/deparser.h"
#include "distributed/distribution_column.h"
#include "distributed/listutils.h"
#include "distributed/maintenanced.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata/pg_dist_object.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata_utility.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_node.h"
#include "distributed/pg_dist_schema.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/tenant_schema_metadata.h"
#include "distributed/utils/array_type.h"
#include "distributed/utils/function.h"
#include "distributed/version_compat.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h"

/* managed via a GUC */
char *EnableManualMetadataChangesForUser = "";
int MetadataSyncTransMode = METADATA_SYNC_TRANSACTIONAL;

static void EnsureObjectMetadataIsSane(int distributionArgumentIndex, int colocationId);
static List * GetFunctionDependenciesForObjects(ObjectAddress *objectAddress);
static char * SchemaOwnerName(Oid objectId);
static bool HasMetadataWorkers(void);
static void CreateShellTableOnWorkers(Oid relationId);
static void CreateTableMetadataOnWorkers(Oid relationId);
static void CreateDependingViewsOnWorkers(Oid relationId);
static void AddTableToPublications(Oid relationId);
static NodeMetadataSyncResult SyncNodeMetadataToNodesOptional(void);
static bool ShouldSyncTableMetadataInternal(bool hashDistributed, bool citusTableWithNoDistKey);
static
bool SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError); static void DropMetadataSnapshotOnNode(WorkerNode *workerNode); static char * CreateSequenceDependencyCommand(Oid relationId, Oid sequenceId, char *columnName); static GrantStmt * GenerateGrantStmtForRights(ObjectType objectType, Oid roleOid, Oid objectId, char *permission, bool withGrantOption); static List * GetObjectsForGrantStmt(ObjectType objectType, Oid objectId); static AccessPriv * GetAccessPrivObjectForGrantStmt(char *permission); static List * GenerateGrantOnSchemaQueriesFromAclItem(Oid schemaOid, AclItem *aclItem); static List * GenerateGrantOnFunctionQueriesFromAclItem(Oid schemaOid, AclItem *aclItem); static List * GrantOnSequenceDDLCommands(Oid sequenceOid); static List * GenerateGrantOnSequenceQueriesFromAclItem(Oid sequenceOid, AclItem *aclItem); static char * GenerateSetRoleQuery(Oid roleOid); static void MetadataSyncSigTermHandler(SIGNAL_ARGS); static void MetadataSyncSigAlrmHandler(SIGNAL_ARGS); static bool ShouldSkipMetadataChecks(void); static void EnsurePartitionMetadataIsSane(Oid relationId, char distributionMethod, int colocationId, char replicationModel, Var *distributionKey); static void EnsureCoordinatorInitiatedOperation(void); static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType, text *shardMinValue, text *shardMaxValue); static void EnsureShardPlacementMetadataIsSane(Oid relationId, int64 shardId, int64 placementId, int64 shardLength, int32 groupId); static char * ColocationGroupCreateCommand(uint32 colocationId, int shardCount, int replicationFactor, Oid distributionColumnType, Oid distributionColumnCollation); static char * ColocationGroupDeleteCommand(uint32 colocationId); static char * RemoteSchemaIdExpressionById(Oid schemaId); static char * RemoteSchemaIdExpressionByName(char *schemaName); static char * RemoteTypeIdExpression(Oid typeId); static char * RemoteCollationIdExpression(Oid colocationId); static char * RemoteTableIdExpression(Oid relationId); PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes); PG_FUNCTION_INFO_V1(start_metadata_sync_to_node); PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node); PG_FUNCTION_INFO_V1(worker_record_sequence_dependency); /* * Functions to modify metadata. Normally modifying metadata requires * superuser. However, these functions can be called with superusers * or regular users as long as the regular user owns the input object. 
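 *
 * For illustration, during metadata sync the coordinator issues commands of
 * roughly the following shape on the workers (the table name and values here
 * are hypothetical):
 *
 *   SELECT citus_internal_add_partition_metadata(
 *            'public.dist_table'::regclass, 'h', 'a', 1, 's');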
*/ PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_partition_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_placement_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata_legacy); PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata); PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation); PG_FUNCTION_INFO_V1(citus_internal_add_object_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_colocation_metadata); PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata); PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema); PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema); PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata); static bool got_SIGTERM = false; static bool got_SIGALRM = false; #define METADATA_SYNC_APP_NAME "Citus Metadata Sync Daemon" /* * start_metadata_sync_to_node function sets hasmetadata column of the given * node to true, and then activate node without replicating reference tables. */ Datum start_metadata_sync_to_node(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); EnsureSuperUser(); EnsureCoordinator(); char *nodeNameString = text_to_cstring(nodeName); WorkerNode *workerNode = ModifiableWorkerNode(nodeNameString, nodePort); /* * Create MetadataSyncContext which is used throughout nodes' activation. * It contains activated nodes, bare connections if the mode is nontransactional, * and a memory context for allocation. */ bool collectCommands = false; bool nodesAddedInSameTransaction = false; MetadataSyncContext *context = CreateMetadataSyncContext(list_make1(workerNode), collectCommands, nodesAddedInSameTransaction); ActivateNodeList(context); TransactionModifiedNodeMetadata = true; PG_RETURN_VOID(); } /* * start_metadata_sync_to_all_nodes function sets hasmetadata column of * all the primary worker nodes to true, and then activate nodes without * replicating reference tables. */ Datum start_metadata_sync_to_all_nodes(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); EnsureSuperUser(); EnsureCoordinator(); List *nodeList = ActivePrimaryNonCoordinatorNodeList(RowShareLock); /* * Create MetadataSyncContext which is used throughout nodes' activation. * It contains activated nodes, bare connections if the mode is nontransactional, * and a memory context for allocation. */ bool collectCommands = false; bool nodesAddedInSameTransaction = false; MetadataSyncContext *context = CreateMetadataSyncContext(nodeList, collectCommands, nodesAddedInSameTransaction); ActivateNodeList(context); TransactionModifiedNodeMetadata = true; PG_RETURN_BOOL(true); } /* * SyncCitusTableMetadata syncs citus table metadata to worker nodes with metadata. * Our definition of metadata includes the shell table and its inter relations with * other shell tables, corresponding pg_dist_object, pg_dist_partiton, pg_dist_shard * and pg_dist_shard placement entries. This function also propagates the views that * depend on the given relation, to the metadata workers, and adds the relation to * the appropriate publications. 
*/ void SyncCitusTableMetadata(Oid relationId) { CreateShellTableOnWorkers(relationId); CreateTableMetadataOnWorkers(relationId); CreateInterTableRelationshipOfRelationOnWorkers(relationId); if (!IsTableOwnedByExtension(relationId)) { ObjectAddress relationAddress = { 0 }; ObjectAddressSet(relationAddress, RelationRelationId, relationId); MarkObjectDistributed(&relationAddress); } CreateDependingViewsOnWorkers(relationId); AddTableToPublications(relationId); } /* * CreateDependingViewsOnWorkers takes a relationId and creates the views that depend on * that relation on workers with metadata. Propagated views are marked as distributed. */ static void CreateDependingViewsOnWorkers(Oid relationId) { List *views = GetDependingViews(relationId); if (list_length(views) < 1) { /* no view to propagate */ return; } SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); Oid viewOid = InvalidOid; foreach_oid(viewOid, views) { if (!ShouldMarkRelationDistributed(viewOid)) { continue; } ObjectAddress *viewAddress = palloc0(sizeof(ObjectAddress)); ObjectAddressSet(*viewAddress, RelationRelationId, viewOid); EnsureAllObjectDependenciesExistOnAllNodes(list_make1(viewAddress)); char *createViewCommand = CreateViewDDLCommand(viewOid); char *alterViewOwnerCommand = AlterViewOwnerCommand(viewOid); SendCommandToWorkersWithMetadata(createViewCommand); SendCommandToWorkersWithMetadata(alterViewOwnerCommand); MarkObjectDistributed(viewAddress); } SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION); } /* * AddTableToPublications adds the table to a publication on workers with metadata. */ static void AddTableToPublications(Oid relationId) { List *publicationIds = GetRelationPublications(relationId); if (publicationIds == NIL) { return; } Oid publicationId = InvalidOid; SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); foreach_oid(publicationId, publicationIds) { ObjectAddress *publicationAddress = palloc0(sizeof(ObjectAddress)); ObjectAddressSet(*publicationAddress, PublicationRelationId, publicationId); List *addresses = list_make1(publicationAddress); if (!ShouldPropagateAnyObject(addresses)) { /* skip non-distributed publications */ continue; } /* ensure schemas exist */ EnsureAllObjectDependenciesExistOnAllNodes(addresses); bool isAdd = true; char *alterPublicationCommand = GetAlterPublicationTableDDLCommand(publicationId, relationId, isAdd); /* send ALTER PUBLICATION .. ADD to workers with metadata */ SendCommandToWorkersWithMetadata(alterPublicationCommand); } SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION); } /* * EnsureSequentialModeMetadataOperations makes sure that the current transaction is * already in sequential mode, or can still safely be put in sequential mode, * it errors if that is not possible. The error contains information for the user to * retry the transaction with sequential mode set from the beginning. * * Metadata objects (e.g., distributed table on the workers) exists only 1 instance of * the type used by potentially multiple other shards/connections. To make sure all * shards/connections in the transaction can interact with the metadata needs to be * visible on all connections used by the transaction, meaning we can only use 1 * connection per node. 
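 *
 * As the hint in the error below suggests, a client that hits this error can
 * retry with sequential mode set from the start of the transaction, along
 * these lines (sketch):
 *
 *   BEGIN;
 *   SET LOCAL citus.multi_shard_modify_mode TO 'sequential';
 *   -- ... the original commands, including the metadata operation ...
 *   COMMIT;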
*/ void EnsureSequentialModeMetadataOperations(void) { if (!IsTransactionBlock()) { /* we do not need to switch to sequential mode if we are not in a transaction */ return; } if (ParallelQueryExecutedInTransaction()) { ereport(ERROR, (errmsg( "cannot execute metadata syncing operation because there was a " "parallel operation on a distributed table in the " "transaction"), errdetail("When modifying metadata, Citus needs to " "perform all operations over a single connection per " "node to ensure consistency."), errhint("Try re-running the transaction with " "\"SET LOCAL citus.multi_shard_modify_mode TO " "\'sequential\';\""))); } ereport(DEBUG1, (errmsg("switching to sequential query execution mode"), errdetail("Metadata synced or stopped syncing. To make " "sure subsequent commands see the metadata correctly " "we need to make sure to use only one connection for " "all future commands"))); SetLocalMultiShardModifyModeToSequential(); } /* * stop_metadata_sync_to_node function sets the hasmetadata column of the specified node * to false in pg_dist_node table, thus indicating that the specified worker node does not * receive DDL changes anymore and cannot be used for issuing queries. */ Datum stop_metadata_sync_to_node(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); EnsureCoordinator(); EnsureSuperUser(); text *nodeName = PG_GETARG_TEXT_P(0); int32 nodePort = PG_GETARG_INT32(1); bool clearMetadata = PG_GETARG_BOOL(2); char *nodeNameString = text_to_cstring(nodeName); LockRelationOid(DistNodeRelationId(), ExclusiveLock); WorkerNode *workerNode = FindWorkerNodeAnyCluster(nodeNameString, nodePort); if (workerNode == NULL) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("node (%s,%d) does not exist", nodeNameString, nodePort))); } if (NodeIsCoordinator(workerNode)) { ereport(NOTICE, (errmsg("node (%s,%d) is the coordinator and should have " "metadata, skipping stopping the metadata sync", nodeNameString, nodePort))); PG_RETURN_VOID(); } if (clearMetadata) { if (NodeIsPrimary(workerNode)) { ereport(NOTICE, (errmsg("dropping metadata on the node (%s,%d)", nodeNameString, nodePort))); DropMetadataSnapshotOnNode(workerNode); } else { /* * If this is a secondary node we can't actually clear metadata from it, * we assume the primary node is cleared. */ ereport(NOTICE, (errmsg("(%s,%d) is a secondary node: to clear the metadata," " you should clear metadata from the primary node", nodeNameString, nodePort))); } } workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_hasmetadata, BoolGetDatum( false)); workerNode = SetWorkerColumn(workerNode, Anum_pg_dist_node_metadatasynced, BoolGetDatum(false)); TransactionModifiedNodeMetadata = true; PG_RETURN_VOID(); } /* * ClusterHasKnownMetadataWorkers returns true if the node executing the function * knows at least one worker with metadata. We do it * (a) by checking the node that executes the function is a worker with metadata * (b) the coordinator knows at least one worker with metadata. */ bool ClusterHasKnownMetadataWorkers() { bool workerWithMetadata = false; if (!IsCoordinator()) { workerWithMetadata = true; } if (workerWithMetadata || HasMetadataWorkers()) { return true; } return false; } /* * ShouldSyncUserCommandForObject checks if the user command should be synced to the * worker nodes for the given object. 
*/ bool ShouldSyncUserCommandForObject(ObjectAddress objectAddress) { if (objectAddress.classId == RelationRelationId) { Oid relOid = objectAddress.objectId; return ShouldSyncTableMetadata(relOid) || ShouldSyncSequenceMetadata(relOid) || get_rel_relkind(relOid) == RELKIND_VIEW; } return false; } /* * ShouldSyncTableMetadata checks if the metadata of a distributed table should be * propagated to metadata workers, i.e. the table is a hash distributed table or * a Citus table that doesn't have shard key. */ bool ShouldSyncTableMetadata(Oid relationId) { if (!EnableMetadataSync || !OidIsValid(relationId) || !IsCitusTable(relationId)) { return false; } CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId); bool hashDistributed = IsCitusTableTypeCacheEntry(tableEntry, HASH_DISTRIBUTED); bool citusTableWithNoDistKey = !HasDistributionKeyCacheEntry(tableEntry); return ShouldSyncTableMetadataInternal(hashDistributed, citusTableWithNoDistKey); } /* * ShouldSyncTableMetadataViaCatalog checks if the metadata of a Citus table should * be propagated to metadata workers, i.e. the table is an MX table or Citus table * that doesn't have shard key. * Tables with streaming replication model (which means RF=1) and hash distribution are * considered as MX tables. * * ShouldSyncTableMetadataViaCatalog does not use the CitusTableCache and instead reads * from catalog tables directly. */ bool ShouldSyncTableMetadataViaCatalog(Oid relationId) { if (!OidIsValid(relationId) || !IsCitusTableViaCatalog(relationId)) { return false; } char partitionMethod = PartitionMethodViaCatalog(relationId); bool hashDistributed = partitionMethod == DISTRIBUTE_BY_HASH; bool citusTableWithNoDistKey = partitionMethod == DISTRIBUTE_BY_NONE; return ShouldSyncTableMetadataInternal(hashDistributed, citusTableWithNoDistKey); } /* * FetchRelationIdFromPgPartitionHeapTuple returns relation id from given heap tuple. */ Oid FetchRelationIdFromPgPartitionHeapTuple(HeapTuple heapTuple, TupleDesc tupleDesc) { Assert(heapTuple->t_tableOid == DistPartitionRelationId()); bool isNullArray[Natts_pg_dist_partition]; Datum datumArray[Natts_pg_dist_partition]; heap_deform_tuple(heapTuple, tupleDesc, datumArray, isNullArray); Datum relationIdDatum = datumArray[Anum_pg_dist_partition_logicalrelid - 1]; Oid relationId = DatumGetObjectId(relationIdDatum); return relationId; } /* * ShouldSyncTableMetadataInternal decides whether we should sync the metadata for a table * based on whether it is a hash distributed table, or a citus table with no distribution * key. * * This function is here to make sure that ShouldSyncTableMetadata and * ShouldSyncTableMetadataViaCatalog behaves the same way. */ static bool ShouldSyncTableMetadataInternal(bool hashDistributed, bool citusTableWithNoDistKey) { return hashDistributed || citusTableWithNoDistKey; } /* * ShouldSyncSequenceMetadata checks if the metadata of a sequence should be * propagated to metadata workers, i.e. the sequence is marked as distributed */ bool ShouldSyncSequenceMetadata(Oid relationId) { if (!OidIsValid(relationId) || !(get_rel_relkind(relationId) == RELKIND_SEQUENCE)) { return false; } ObjectAddress *sequenceAddress = palloc0(sizeof(ObjectAddress)); ObjectAddressSet(*sequenceAddress, RelationRelationId, relationId); return IsAnyObjectDistributed(list_make1(sequenceAddress)); } /* * SyncMetadataSnapshotToNode does the following: * SyncNodeMetadataSnapshotToNode does the following: * 1. 
Sets the localGroupId on the worker so the worker knows which tuple in * pg_dist_node represents itself. * 2. Recreates the node metadata on the given worker. * If raiseOnError is true, it errors out if synchronization fails. */ static bool SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError) { char *currentUser = CurrentUserName(); /* generate and add the local group id's update query */ char *localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId); /* generate the queries which drop the node metadata */ List *dropMetadataCommandList = NodeMetadataDropCommands(); /* generate the queries which create the node metadata from scratch */ List *createMetadataCommandList = NodeMetadataCreateCommands(); List *recreateMetadataSnapshotCommandList = list_make1(localGroupIdUpdateCommand); recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList, dropMetadataCommandList); recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList, createMetadataCommandList); /* * Send the snapshot recreation commands in a single remote transaction and * if requested, error out in any kind of failure. Note that it is not * required to send createMetadataSnapshotCommandList in the same transaction * that we send nodeDeleteCommand and nodeInsertCommand commands below. */ if (raiseOnError) { SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1( workerNode), currentUser, recreateMetadataSnapshotCommandList); return true; } else { bool success = SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction( workerNode->workerName, workerNode->workerPort, currentUser, recreateMetadataSnapshotCommandList); return success; } } /* * DropMetadataSnapshotOnNode creates the queries which drop the metadata and sends them * to the worker given as parameter. */ static void DropMetadataSnapshotOnNode(WorkerNode *workerNode) { EnsureSequentialModeMetadataOperations(); char *userName = CurrentUserName(); /* * Detach partitions, break dependencies between sequences and table then * remove shell tables first. */ bool singleTransaction = true; List *dropMetadataCommandList = DetachPartitionCommandList(); dropMetadataCommandList = lappend(dropMetadataCommandList, BREAK_ALL_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND); dropMetadataCommandList = lappend(dropMetadataCommandList, WorkerDropAllShellTablesCommand(singleTransaction)); dropMetadataCommandList = list_concat(dropMetadataCommandList, NodeMetadataDropCommands()); dropMetadataCommandList = lappend(dropMetadataCommandList, LocalGroupIdUpdateCommand(0)); /* remove all dist table and object/table related metadata afterwards */ dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_PARTITIONS); dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_SHARDS); dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_PLACEMENTS); dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_DISTRIBUTED_OBJECTS); dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_COLOCATION); Assert(superuser()); SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction( workerNode->workerName, workerNode->workerPort, userName, dropMetadataCommandList); } /* * NodeMetadataCreateCommands returns list of queries that are * required to create the current metadata snapshot of the node that the * function is called. 
The metadata snapshot commands includes the * following queries: * * (i) Query that populates pg_dist_node table */ List * NodeMetadataCreateCommands(void) { List *metadataSnapshotCommandList = NIL; bool includeNodesFromOtherClusters = true; List *workerNodeList = ReadDistNode(includeNodesFromOtherClusters); /* make sure we have deterministic output for our tests */ workerNodeList = SortList(workerNodeList, CompareWorkerNodes); /* generate insert command for pg_dist_node table */ char *nodeListInsertCommand = NodeListInsertCommand(workerNodeList); metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, nodeListInsertCommand); return metadataSnapshotCommandList; } /* * CitusTableMetadataCreateCommandList returns the set of commands necessary to * create the given distributed table metadata on a worker. */ List * CitusTableMetadataCreateCommandList(Oid relationId) { CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); List *commandList = NIL; /* command to insert pg_dist_partition entry */ char *metadataCommand = DistributionCreateCommand(cacheEntry); commandList = lappend(commandList, metadataCommand); /* commands to insert pg_dist_shard & pg_dist_placement entries */ List *shardIntervalList = LoadShardIntervalList(relationId); List *shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList); commandList = list_concat(commandList, shardMetadataInsertCommandList); return commandList; } /* * NodeMetadataDropCommands returns list of queries that are required to * drop all the metadata of the node that are not related to clustered tables. * The drop metadata snapshot commands includes the following queries: * * (i) Queries that delete all the rows from pg_dist_node table */ List * NodeMetadataDropCommands(void) { List *dropSnapshotCommandList = NIL; dropSnapshotCommandList = lappend(dropSnapshotCommandList, DELETE_ALL_NODES); return dropSnapshotCommandList; } /* * NodeListInsertCommand generates a single multi-row INSERT command that can be * executed to insert the nodes that are in workerNodeList to pg_dist_node table. */ char * NodeListInsertCommand(List *workerNodeList) { StringInfo nodeListInsertCommand = makeStringInfo(); int workerCount = list_length(workerNodeList); int processedWorkerNodeCount = 0; Oid primaryRole = PrimaryNodeRoleId(); /* if there are no workers, return NULL */ if (workerCount == 0) { return nodeListInsertCommand->data; } if (primaryRole == InvalidOid) { ereport(ERROR, (errmsg("bad metadata, noderole does not exist"), errdetail("you should never see this, please submit " "a bug report"), errhint("run ALTER EXTENSION citus UPDATE and try again"))); } /* generate the query without any values yet */ appendStringInfo(nodeListInsertCommand, "INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, " "noderack, hasmetadata, metadatasynced, isactive, noderole, " "nodecluster, shouldhaveshards) VALUES "); /* iterate over the worker nodes, add the values */ WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) { char *hasMetadataString = workerNode->hasMetadata ? "TRUE" : "FALSE"; char *metadataSyncedString = workerNode->metadataSynced ? "TRUE" : "FALSE"; char *isActiveString = workerNode->isActive ? "TRUE" : "FALSE"; char *shouldHaveShards = workerNode->shouldHaveShards ? 
"TRUE" : "FALSE"; Datum nodeRoleOidDatum = ObjectIdGetDatum(workerNode->nodeRole); Datum nodeRoleStringDatum = DirectFunctionCall1(enum_out, nodeRoleOidDatum); char *nodeRoleString = DatumGetCString(nodeRoleStringDatum); appendStringInfo(nodeListInsertCommand, "(%d, %d, %s, %d, %s, %s, %s, %s, '%s'::noderole, %s, %s)", workerNode->nodeId, workerNode->groupId, quote_literal_cstr(workerNode->workerName), workerNode->workerPort, quote_literal_cstr(workerNode->workerRack), hasMetadataString, metadataSyncedString, isActiveString, nodeRoleString, quote_literal_cstr(workerNode->nodeCluster), shouldHaveShards); processedWorkerNodeCount++; if (processedWorkerNodeCount != workerCount) { appendStringInfo(nodeListInsertCommand, ","); } } return nodeListInsertCommand->data; } /* * NodeListIdempotentInsertCommand generates an idempotent multi-row INSERT command that * can be executed to insert the nodes that are in workerNodeList to pg_dist_node table. * It would insert new nodes or replace current nodes with new nodes if nodename-nodeport * pairs already exist. */ char * NodeListIdempotentInsertCommand(List *workerNodeList) { StringInfo nodeInsertIdempotentCommand = makeStringInfo(); char *nodeInsertStr = NodeListInsertCommand(workerNodeList); appendStringInfoString(nodeInsertIdempotentCommand, nodeInsertStr); char *onConflictStr = " ON CONFLICT ON CONSTRAINT pg_dist_node_nodename_nodeport_key " "DO UPDATE SET nodeid = EXCLUDED.nodeid, " "groupid = EXCLUDED.groupid, " "nodename = EXCLUDED.nodename, " "nodeport = EXCLUDED.nodeport, " "noderack = EXCLUDED.noderack, " "hasmetadata = EXCLUDED.hasmetadata, " "isactive = EXCLUDED.isactive, " "noderole = EXCLUDED.noderole, " "nodecluster = EXCLUDED.nodecluster ," "metadatasynced = EXCLUDED.metadatasynced, " "shouldhaveshards = EXCLUDED.shouldhaveshards"; appendStringInfoString(nodeInsertIdempotentCommand, onConflictStr); return nodeInsertIdempotentCommand->data; } /* * MarkObjectsDistributedCreateCommand generates a command that can be executed to * insert or update the provided objects into pg_dist_object on a worker node. 
*/ char * MarkObjectsDistributedCreateCommand(List *addresses, List *distributionArgumentIndexes, List *colocationIds, List *forceDelegations) { StringInfo insertDistributedObjectsCommand = makeStringInfo(); Assert(list_length(addresses) == list_length(distributionArgumentIndexes)); Assert(list_length(distributionArgumentIndexes) == list_length(colocationIds)); appendStringInfo(insertDistributedObjectsCommand, "WITH distributed_object_data(typetext, objnames, " "objargs, distargumentindex, colocationid, force_delegation) AS (VALUES "); bool isFirstObject = true; for (int currentObjectCounter = 0; currentObjectCounter < list_length(addresses); currentObjectCounter++) { ObjectAddress *address = list_nth(addresses, currentObjectCounter); int distributionArgumentIndex = list_nth_int(distributionArgumentIndexes, currentObjectCounter); int colocationId = list_nth_int(colocationIds, currentObjectCounter); int forceDelegation = list_nth_int(forceDelegations, currentObjectCounter); List *names = NIL; List *args = NIL; char *objectType = getObjectTypeDescription(address, false); getObjectIdentityParts(address, &names, &args, false); if (!isFirstObject) { appendStringInfo(insertDistributedObjectsCommand, ", "); } isFirstObject = false; appendStringInfo(insertDistributedObjectsCommand, "(%s, ARRAY[", quote_literal_cstr(objectType)); char *name = NULL; bool firstInNameLoop = true; foreach_ptr(name, names) { if (!firstInNameLoop) { appendStringInfo(insertDistributedObjectsCommand, ", "); } firstInNameLoop = false; appendStringInfoString(insertDistributedObjectsCommand, quote_literal_cstr(name)); } appendStringInfo(insertDistributedObjectsCommand, "]::text[], ARRAY["); char *arg; bool firstInArgLoop = true; foreach_ptr(arg, args) { if (!firstInArgLoop) { appendStringInfo(insertDistributedObjectsCommand, ", "); } firstInArgLoop = false; appendStringInfoString(insertDistributedObjectsCommand, quote_literal_cstr(arg)); } appendStringInfo(insertDistributedObjectsCommand, "]::text[], "); appendStringInfo(insertDistributedObjectsCommand, "%d, ", distributionArgumentIndex); appendStringInfo(insertDistributedObjectsCommand, "%d, ", colocationId); appendStringInfo(insertDistributedObjectsCommand, "%s)", forceDelegation ? "true" : "false"); } appendStringInfo(insertDistributedObjectsCommand, ") "); appendStringInfo(insertDistributedObjectsCommand, "SELECT citus_internal_add_object_metadata(" "typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) " "FROM distributed_object_data;"); return insertDistributedObjectsCommand->data; } /* * citus_internal_add_object_metadata is an internal UDF to * add a row to pg_dist_object. */ Datum citus_internal_add_object_metadata(PG_FUNCTION_ARGS) { char *textType = TextDatumGetCString(PG_GETARG_DATUM(0)); ArrayType *nameArray = PG_GETARG_ARRAYTYPE_P(1); ArrayType *argsArray = PG_GETARG_ARRAYTYPE_P(2); int distributionArgumentIndex = PG_GETARG_INT32(3); int colocationId = PG_GETARG_INT32(4); bool forceDelegation = PG_GETARG_INT32(5); if (!ShouldSkipMetadataChecks()) { /* this UDF is not allowed for executing as a separate command */ EnsureCoordinatorInitiatedOperation(); /* * Ensure given distributionArgumentIndex and colocationId values are * sane. Since we check sanity of object related parameters within * PgGetObjectAddress below, we are not checking them here. */ EnsureObjectMetadataIsSane(distributionArgumentIndex, colocationId); } /* * We check the acl/ownership while getting the object address. 
That * funtion also checks the sanity of given textType, nameArray and * argsArray parameters */ ObjectAddress objectAddress = PgGetObjectAddress(textType, nameArray, argsArray); /* First, disable propagation off to not to cause infinite propagation */ bool prevDependencyCreationValue = EnableMetadataSync; SetLocalEnableMetadataSync(false); MarkObjectDistributed(&objectAddress); if (distributionArgumentIndex != INVALID_DISTRIBUTION_ARGUMENT_INDEX || colocationId != INVALID_COLOCATION_ID) { int *distributionArgumentIndexAddress = distributionArgumentIndex == INVALID_DISTRIBUTION_ARGUMENT_INDEX ? NULL : &distributionArgumentIndex; int *colocationIdAddress = colocationId == INVALID_COLOCATION_ID ? NULL : &colocationId; bool *forceDelegationAddress = forceDelegation == false ? NULL : &forceDelegation; UpdateFunctionDistributionInfo(&objectAddress, distributionArgumentIndexAddress, colocationIdAddress, forceDelegationAddress); } SetLocalEnableMetadataSync(prevDependencyCreationValue); PG_RETURN_VOID(); } /* * EnsureObjectMetadataIsSane checks whether the distribution argument index and * colocation id metadata params for distributed object is sane. You can look * PgGetObjectAddress to find checks related to object sanity. */ static void EnsureObjectMetadataIsSane(int distributionArgumentIndex, int colocationId) { if (distributionArgumentIndex != INVALID_DISTRIBUTION_ARGUMENT_INDEX) { if (distributionArgumentIndex < 0 || distributionArgumentIndex > FUNC_MAX_ARGS) { ereport(ERROR, errmsg("distribution_argument_index must be between" " 0 and %d", FUNC_MAX_ARGS)); } } if (colocationId != INVALID_COLOCATION_ID) { if (colocationId < 0) { ereport(ERROR, errmsg("colocationId must be a positive number")); } } } /* * DistributionCreateCommands generates a commands that can be * executed to replicate the metadata for a Citus table. */ char * DistributionCreateCommand(CitusTableCacheEntry *cacheEntry) { StringInfo insertDistributionCommand = makeStringInfo(); Oid relationId = cacheEntry->relationId; char distributionMethod = cacheEntry->partitionMethod; char *qualifiedRelationName = generate_qualified_relation_name(relationId); uint32 colocationId = cacheEntry->colocationId; char replicationModel = cacheEntry->replicationModel; StringInfo tablePartitionKeyNameString = makeStringInfo(); if (!HasDistributionKeyCacheEntry(cacheEntry)) { appendStringInfo(tablePartitionKeyNameString, "NULL"); } else { char *partitionKeyColumnName = ColumnToColumnName(relationId, (Node *) cacheEntry->partitionColumn); appendStringInfo(tablePartitionKeyNameString, "%s", quote_literal_cstr(partitionKeyColumnName)); } appendStringInfo(insertDistributionCommand, "SELECT citus_internal_add_partition_metadata " "(%s::regclass, '%c', %s, %d, '%c')", quote_literal_cstr(qualifiedRelationName), distributionMethod, tablePartitionKeyNameString->data, colocationId, replicationModel); return insertDistributionCommand->data; } /* * DistributionDeleteCommand generates a command that can be executed * to drop a distributed table and its metadata on a remote node. 
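 *
 * For a hypothetical table public.dist_table, the generated command has the
 * form:
 *
 *   SELECT worker_drop_distributed_table('public.dist_table')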
*/ char * DistributionDeleteCommand(const char *schemaName, const char *tableName) { StringInfo deleteDistributionCommand = makeStringInfo(); char *distributedRelationName = quote_qualified_identifier(schemaName, tableName); appendStringInfo(deleteDistributionCommand, "SELECT worker_drop_distributed_table(%s)", quote_literal_cstr(distributedRelationName)); return deleteDistributionCommand->data; } /* * DistributionDeleteMetadataCommand returns a query to delete pg_dist_partition * metadata from a worker node for a given table. */ char * DistributionDeleteMetadataCommand(Oid relationId) { StringInfo deleteCommand = makeStringInfo(); char *qualifiedRelationName = generate_qualified_relation_name(relationId); appendStringInfo(deleteCommand, "SELECT pg_catalog.citus_internal_delete_partition_metadata(%s)", quote_literal_cstr(qualifiedRelationName)); return deleteCommand->data; } /* * TableOwnerResetCommand generates a commands that can be executed * to reset the table owner. */ char * TableOwnerResetCommand(Oid relationId) { StringInfo ownerResetCommand = makeStringInfo(); char *qualifiedRelationName = generate_qualified_relation_name(relationId); char *tableOwnerName = TableOwner(relationId); appendStringInfo(ownerResetCommand, "ALTER TABLE %s OWNER TO %s", qualifiedRelationName, quote_identifier(tableOwnerName)); return ownerResetCommand->data; } /* * ShardListInsertCommand generates a single command that can be * executed to replicate shard and shard placement metadata for the * given shard intervals. The function assumes that each shard has a * single placement, and asserts this information. */ List * ShardListInsertCommand(List *shardIntervalList) { List *commandList = NIL; int shardCount = list_length(shardIntervalList); /* if there are no shards, return empty list */ if (shardCount == 0) { return commandList; } /* add placements to insertPlacementCommand */ StringInfo insertPlacementCommand = makeStringInfo(); appendStringInfo(insertPlacementCommand, "WITH placement_data(shardid, " "shardlength, groupid, placementid) AS (VALUES "); ShardInterval *shardInterval = NULL; bool firstPlacementProcessed = false; foreach_ptr(shardInterval, shardIntervalList) { uint64 shardId = shardInterval->shardId; List *shardPlacementList = ActiveShardPlacementList(shardId); ShardPlacement *placement = NULL; foreach_ptr(placement, shardPlacementList) { if (firstPlacementProcessed) { /* * As long as this is not the first placement of the first shard, * append the comma. 
*/ appendStringInfo(insertPlacementCommand, ", "); } firstPlacementProcessed = true; appendStringInfo(insertPlacementCommand, "(%ld, %ld, %d, %ld)", shardId, placement->shardLength, placement->groupId, placement->placementId); } } appendStringInfo(insertPlacementCommand, ") "); appendStringInfo(insertPlacementCommand, "SELECT citus_internal_add_placement_metadata(" "shardid, shardlength, groupid, placementid) " "FROM placement_data;"); /* now add shards to insertShardCommand */ StringInfo insertShardCommand = makeStringInfo(); appendStringInfo(insertShardCommand, "WITH shard_data(relationname, shardid, storagetype, " "shardminvalue, shardmaxvalue) AS (VALUES "); foreach_ptr(shardInterval, shardIntervalList) { uint64 shardId = shardInterval->shardId; Oid distributedRelationId = shardInterval->relationId; char *qualifiedRelationName = generate_qualified_relation_name( distributedRelationId); StringInfo minHashToken = makeStringInfo(); StringInfo maxHashToken = makeStringInfo(); if (shardInterval->minValueExists) { appendStringInfo(minHashToken, "'%d'", DatumGetInt32( shardInterval->minValue)); } else { appendStringInfo(minHashToken, "NULL"); } if (shardInterval->maxValueExists) { appendStringInfo(maxHashToken, "'%d'", DatumGetInt32( shardInterval->maxValue)); } else { appendStringInfo(maxHashToken, "NULL"); } appendStringInfo(insertShardCommand, "(%s::regclass, %ld, '%c'::\"char\", %s, %s)", quote_literal_cstr(qualifiedRelationName), shardId, shardInterval->storageType, minHashToken->data, maxHashToken->data); if (llast(shardIntervalList) != shardInterval) { appendStringInfo(insertShardCommand, ", "); } } appendStringInfo(insertShardCommand, ") "); appendStringInfo(insertShardCommand, "SELECT citus_internal_add_shard_metadata(relationname, shardid, " "storagetype, shardminvalue, shardmaxvalue) " "FROM shard_data;"); /* * There are no active placements for the table, so do not create the * command as it'd lead to syntax error. * * This is normally not an expected situation, however the current * implementation of citus_disable_node allows to disable nodes with * the only active placements. So, for example a single shard/placement * distributed table on a disabled node might trigger zero placement * case. * * TODO: remove this check once citus_disable_node errors out for * the above scenario. */ if (firstPlacementProcessed) { /* first insert shards, than the placements */ commandList = lappend(commandList, insertShardCommand->data); commandList = lappend(commandList, insertPlacementCommand->data); } return commandList; } /* * ShardListDeleteCommand generates a command list that can be executed to delete * shard and shard placement metadata for the given shard. */ List * ShardDeleteCommandList(ShardInterval *shardInterval) { uint64 shardId = shardInterval->shardId; StringInfo deleteShardCommand = makeStringInfo(); appendStringInfo(deleteShardCommand, "SELECT citus_internal_delete_shard_metadata(%ld);", shardId); return list_make1(deleteShardCommand->data); } /* * NodeDeleteCommand generate a command that can be * executed to delete the metadata for a worker node. */ char * NodeDeleteCommand(uint32 nodeId) { StringInfo nodeDeleteCommand = makeStringInfo(); appendStringInfo(nodeDeleteCommand, "DELETE FROM pg_dist_node " "WHERE nodeid = %u", nodeId); return nodeDeleteCommand->data; } /* * NodeStateUpdateCommand generates a command that can be executed to update * isactive column of a node in pg_dist_node table. 
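 *
 * For example, deactivating the (hypothetical) node with id 3 produces:
 *
 *   UPDATE pg_dist_node SET isactive = FALSE WHERE nodeid = 3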
*/ char * NodeStateUpdateCommand(uint32 nodeId, bool isActive) { StringInfo nodeStateUpdateCommand = makeStringInfo(); char *isActiveString = isActive ? "TRUE" : "FALSE"; appendStringInfo(nodeStateUpdateCommand, "UPDATE pg_dist_node SET isactive = %s " "WHERE nodeid = %u", isActiveString, nodeId); return nodeStateUpdateCommand->data; } /* * ShouldHaveShardsUpdateCommand generates a command that can be executed to * update the shouldhaveshards column of a node in pg_dist_node table. */ char * ShouldHaveShardsUpdateCommand(uint32 nodeId, bool shouldHaveShards) { StringInfo nodeStateUpdateCommand = makeStringInfo(); char *shouldHaveShardsString = shouldHaveShards ? "TRUE" : "FALSE"; appendStringInfo(nodeStateUpdateCommand, "UPDATE pg_catalog.pg_dist_node SET shouldhaveshards = %s " "WHERE nodeid = %u", shouldHaveShardsString, nodeId); return nodeStateUpdateCommand->data; } /* * ColocationIdUpdateCommand creates the SQL command to change the colocationId * of the table with the given name to the given colocationId in pg_dist_partition * table. */ char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId) { StringInfo command = makeStringInfo(); char *qualifiedRelationName = generate_qualified_relation_name(relationId); appendStringInfo(command, "SELECT citus_internal_update_relation_colocation(%s::regclass, %d)", quote_literal_cstr(qualifiedRelationName), colocationId); return command->data; } /* * PlacementUpsertCommand creates a SQL command for upserting a pg_dist_placment * entry with the given properties. In the case of a conflict on placementId, the command * updates all properties (excluding the placementId) with the given ones. */ char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, uint64 shardLength, int32 groupId) { StringInfo command = makeStringInfo(); appendStringInfo(command, UPSERT_PLACEMENT, shardId, shardLength, groupId, placementId); return command->data; } /* * LocalGroupIdUpdateCommand creates the SQL command required to set the local group id * of a worker and returns the command in a string. */ char * LocalGroupIdUpdateCommand(int32 groupId) { StringInfo updateCommand = makeStringInfo(); appendStringInfo(updateCommand, "UPDATE pg_dist_local_group SET groupid = %d", groupId); return updateCommand->data; } /* * DDLCommandsForSequence returns the DDL commands needs to be run to create the * sequence and alter the owner to the given owner name. 
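 *
 * For a hypothetical sequence public.user_id_seq owned by role seq_owner, the
 * list contains commands roughly like the following (the first is produced
 * from the WORKER_APPLY_SEQUENCE_COMMAND template), followed by any GRANT
 * commands on the sequence:
 *
 *   SELECT worker_apply_sequence_command('CREATE SEQUENCE public.user_id_seq ...', 'bigint');
 *   ALTER SEQUENCE public.user_id_seq OWNER TO seq_owner;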
*/ List * DDLCommandsForSequence(Oid sequenceOid, char *ownerName) { List *sequenceDDLList = NIL; char *sequenceDef = pg_get_sequencedef_string(sequenceOid); char *escapedSequenceDef = quote_literal_cstr(sequenceDef); StringInfo wrappedSequenceDef = makeStringInfo(); StringInfo sequenceGrantStmt = makeStringInfo(); char *sequenceName = generate_qualified_relation_name(sequenceOid); Form_pg_sequence sequenceData = pg_get_sequencedef(sequenceOid); Oid sequenceTypeOid = sequenceData->seqtypid; char *typeName = format_type_be(sequenceTypeOid); /* create schema if needed */ appendStringInfo(wrappedSequenceDef, WORKER_APPLY_SEQUENCE_COMMAND, escapedSequenceDef, quote_literal_cstr(typeName)); appendStringInfo(sequenceGrantStmt, "ALTER SEQUENCE %s OWNER TO %s", sequenceName, quote_identifier(ownerName)); sequenceDDLList = lappend(sequenceDDLList, wrappedSequenceDef->data); sequenceDDLList = lappend(sequenceDDLList, sequenceGrantStmt->data); sequenceDDLList = list_concat(sequenceDDLList, GrantOnSequenceDDLCommands( sequenceOid)); return sequenceDDLList; } /* * GetAttributeTypeOid returns the OID of the type of the attribute of * provided relationId that has the provided attnum */ Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum) { Oid resultOid = InvalidOid; ScanKeyData key[2]; /* Grab an appropriate lock on the pg_attribute relation */ Relation attrel = table_open(AttributeRelationId, AccessShareLock); /* Use the index to scan only system attributes of the target relation */ ScanKeyInit(&key[0], Anum_pg_attribute_attrelid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId)); ScanKeyInit(&key[1], Anum_pg_attribute_attnum, BTLessEqualStrategyNumber, F_INT2LE, Int16GetDatum(attnum)); SysScanDesc scan = systable_beginscan(attrel, AttributeRelidNumIndexId, true, NULL, 2, key); HeapTuple attributeTuple; while (HeapTupleIsValid(attributeTuple = systable_getnext(scan))) { Form_pg_attribute att = (Form_pg_attribute) GETSTRUCT(attributeTuple); resultOid = att->atttypid; } systable_endscan(scan); table_close(attrel, AccessShareLock); return resultOid; } /* * GetDependentSequencesWithRelation appends the attnum and id of sequences that * have direct (owned sequences) or indirect dependency with the given relationId, * to the lists passed as NIL initially. * For both cases, we use the intermediate AttrDefault object from pg_depend. * If attnum is specified, we only return the sequences related to that * attribute of the relationId. * See DependencyType for the possible values of depType. * We use DEPENDENCY_INTERNAL for sequences created by identity column. * DEPENDENCY_AUTO for regular sequences. */ void GetDependentSequencesWithRelation(Oid relationId, List **seqInfoList, AttrNumber attnum, char depType) { Assert(*seqInfoList == NIL); List *attrdefResult = NIL; List *attrdefAttnumResult = NIL; ScanKeyData key[3]; HeapTuple tup; Relation depRel = table_open(DependRelationId, AccessShareLock); ScanKeyInit(&key[0], Anum_pg_depend_refclassid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(RelationRelationId)); ScanKeyInit(&key[1], Anum_pg_depend_refobjid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId)); if (attnum) { ScanKeyInit(&key[2], Anum_pg_depend_refobjsubid, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(attnum)); } SysScanDesc scan = systable_beginscan(depRel, DependReferenceIndexId, true, NULL, attnum ? 
3 : 2, key); while (HeapTupleIsValid(tup = systable_getnext(scan))) { Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup); if (deprec->classid == AttrDefaultRelationId && deprec->objsubid == 0 && deprec->refobjsubid != 0 && deprec->deptype == depType) { /* * We are going to generate corresponding SequenceInfo * in the following loop. */ attrdefResult = lappend_oid(attrdefResult, deprec->objid); attrdefAttnumResult = lappend_int(attrdefAttnumResult, deprec->refobjsubid); } else if (deprec->deptype == depType && deprec->refobjsubid != 0 && deprec->classid == RelationRelationId && get_rel_relkind(deprec->objid) == RELKIND_SEQUENCE) { SequenceInfo *seqInfo = (SequenceInfo *) palloc(sizeof(SequenceInfo)); seqInfo->sequenceOid = deprec->objid; seqInfo->attributeNumber = deprec->refobjsubid; seqInfo->isNextValDefault = false; *seqInfoList = lappend(*seqInfoList, seqInfo); } } systable_endscan(scan); table_close(depRel, AccessShareLock); AttrNumber attrdefAttnum = InvalidAttrNumber; Oid attrdefOid = InvalidOid; forboth_int_oid(attrdefAttnum, attrdefAttnumResult, attrdefOid, attrdefResult) { List *sequencesFromAttrDef = GetSequencesFromAttrDef(attrdefOid); /* to simplify and eliminate cases like "DEFAULT nextval('..') - nextval('..')" */ if (list_length(sequencesFromAttrDef) > 1) { ereport(ERROR, (errmsg( "More than one sequence in a column default" " is not supported for distribution " "or for adding local tables to metadata"))); } if (list_length(sequencesFromAttrDef) == 1) { SequenceInfo *seqInfo = (SequenceInfo *) palloc(sizeof(SequenceInfo)); seqInfo->sequenceOid = linitial_oid(sequencesFromAttrDef); seqInfo->attributeNumber = attrdefAttnum; seqInfo->isNextValDefault = true; *seqInfoList = lappend(*seqInfoList, seqInfo); } } } /* * GetDependentDependentRelationsWithSequence returns a list of oids of * relations that have have a dependency on the given sequence. * There are three types of dependencies: * 1. direct auto (owned sequences), created using SERIAL or BIGSERIAL * 2. indirect auto (through an AttrDef), created using DEFAULT nextval('..') * 3. internal, created using GENERATED ALWAYS AS IDENTITY * * Depending on the passed deptype, we return the relations that have the * given type(s): * - DEPENDENCY_AUTO returns both 1 and 2 * - DEPENDENCY_INTERNAL returns 3 * * The returned list can contain duplicates, as the same relation can have * multiple dependencies on the sequence. 
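 *
 * For illustration, the three dependency types correspond to column
 * definitions such as (hypothetical tables):
 *
 *   CREATE TABLE t1 (a bigserial);                            -- 1: direct auto
 *   CREATE TABLE t2 (a bigint DEFAULT nextval('some_seq'));   -- 2: indirect auto
 *   CREATE TABLE t3 (a bigint GENERATED ALWAYS AS IDENTITY);  -- 3: internal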
*/ List * GetDependentRelationsWithSequence(Oid sequenceOid, char depType) { List *relations = NIL; ScanKeyData key[2]; HeapTuple tup; Relation depRel = table_open(DependRelationId, AccessShareLock); ScanKeyInit(&key[0], Anum_pg_depend_classid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(RelationRelationId)); ScanKeyInit(&key[1], Anum_pg_depend_objid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(sequenceOid)); SysScanDesc scan = systable_beginscan(depRel, DependDependerIndexId, true, NULL, lengthof(key), key); while (HeapTupleIsValid(tup = systable_getnext(scan))) { Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup); if ( deprec->refclassid == RelationRelationId && deprec->refobjsubid != 0 && deprec->deptype == depType) { relations = lappend_oid(relations, deprec->refobjid); } } systable_endscan(scan); table_close(depRel, AccessShareLock); if (depType == DEPENDENCY_AUTO) { Oid attrDefOid; List *attrDefOids = GetAttrDefsFromSequence(sequenceOid); foreach_oid(attrDefOid, attrDefOids) { ObjectAddress columnAddress = GetAttrDefaultColumnAddress(attrDefOid); relations = lappend_oid(relations, columnAddress.objectId); } } return relations; } /* * GetSequencesFromAttrDef returns a list of sequence OIDs that have * dependency with the given attrdefOid in pg_depend */ List * GetSequencesFromAttrDef(Oid attrdefOid) { List *sequencesResult = NIL; ScanKeyData key[2]; HeapTuple tup; Relation depRel = table_open(DependRelationId, AccessShareLock); ScanKeyInit(&key[0], Anum_pg_depend_classid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(AttrDefaultRelationId)); ScanKeyInit(&key[1], Anum_pg_depend_objid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(attrdefOid)); SysScanDesc scan = systable_beginscan(depRel, DependDependerIndexId, true, NULL, 2, key); while (HeapTupleIsValid(tup = systable_getnext(scan))) { Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup); if (deprec->refclassid == RelationRelationId && deprec->deptype == DEPENDENCY_NORMAL && get_rel_relkind(deprec->refobjid) == RELKIND_SEQUENCE) { sequencesResult = lappend_oid(sequencesResult, deprec->refobjid); } } systable_endscan(scan); table_close(depRel, AccessShareLock); return sequencesResult; } #if PG_VERSION_NUM < PG_VERSION_15 /* * Given a pg_attrdef OID, return the relation OID and column number of * the owning column (represented as an ObjectAddress for convenience). * * Returns InvalidObjectAddress if there is no such pg_attrdef entry. 
 */
*/ ObjectAddress GetAttrDefaultColumnAddress(Oid attrdefoid) { ObjectAddress result = InvalidObjectAddress; ScanKeyData skey[1]; HeapTuple tup; Relation attrdef = table_open(AttrDefaultRelationId, AccessShareLock); ScanKeyInit(&skey[0], Anum_pg_attrdef_oid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(attrdefoid)); SysScanDesc scan = systable_beginscan(attrdef, AttrDefaultOidIndexId, true, NULL, 1, skey); if (HeapTupleIsValid(tup = systable_getnext(scan))) { Form_pg_attrdef atdform = (Form_pg_attrdef) GETSTRUCT(tup); result.classId = RelationRelationId; result.objectId = atdform->adrelid; result.objectSubId = atdform->adnum; } systable_endscan(scan); table_close(attrdef, AccessShareLock); return result; } #endif /* * GetAttrDefsFromSequence returns a list of attrdef OIDs that have * a dependency on the given sequence */ List * GetAttrDefsFromSequence(Oid seqOid) { List *attrDefsResult = NIL; ScanKeyData key[2]; HeapTuple tup; Relation depRel = table_open(DependRelationId, AccessShareLock); ScanKeyInit(&key[0], Anum_pg_depend_refclassid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(RelationRelationId)); ScanKeyInit(&key[1], Anum_pg_depend_refobjid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(seqOid)); SysScanDesc scan = systable_beginscan(depRel, DependReferenceIndexId, true, NULL, lengthof(key), key); while (HeapTupleIsValid(tup = systable_getnext(scan))) { Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup); if (deprec->classid == AttrDefaultRelationId && deprec->deptype == DEPENDENCY_NORMAL) { attrDefsResult = lappend_oid(attrDefsResult, deprec->objid); } } systable_endscan(scan); table_close(depRel, AccessShareLock); return attrDefsResult; } /* * GetDependentFunctionsWithRelation returns the dependent functions for the * given relation id. */ List * GetDependentFunctionsWithRelation(Oid relationId) { List *referencingObjects = NIL; List *functionOids = NIL; ScanKeyData key[2]; HeapTuple tup; Relation depRel = table_open(DependRelationId, AccessShareLock); ScanKeyInit(&key[0], Anum_pg_depend_refclassid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(RelationRelationId)); ScanKeyInit(&key[1], Anum_pg_depend_refobjid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId)); SysScanDesc scan = systable_beginscan(depRel, DependReferenceIndexId, true, NULL, 2, key); while (HeapTupleIsValid(tup = systable_getnext(scan))) { Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup); /* * objsubid is nonzero only for table columns and zero for anything else. * Since we are trying to find a dependency from the column of a table to * function we've added deprec->refobjsubid != 0 check. * * We are following DEPENDENCY_AUTO for dependencies via column and * DEPENDENCY_NORMAL anything else. Since only procedure dependencies * for those dependencies will be obtained in GetFunctionDependenciesForObjects * following both dependency types are not harmful. 
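 *
 * For illustration (hypothetical objects): given
 *   CREATE TABLE events (code text DEFAULT make_code());
 * the pg_attrdef entry for column "code" depends on the column
 * (DEPENDENCY_AUTO) and on make_code() (DEPENDENCY_NORMAL), so make_code's
 * OID ends up in the result via GetFunctionDependenciesForObjects.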
*/ if ((deprec->refobjsubid != 0 && deprec->deptype == DEPENDENCY_AUTO) || deprec->deptype == DEPENDENCY_NORMAL) { ObjectAddress *refAddress = palloc(sizeof(ObjectAddress)); ObjectAddressSubSet(*refAddress, deprec->classid, deprec->objid, deprec->objsubid); referencingObjects = lappend(referencingObjects, refAddress); } } systable_endscan(scan); table_close(depRel, AccessShareLock); ObjectAddress *referencingObject = NULL; foreach_ptr(referencingObject, referencingObjects) { functionOids = list_concat(functionOids, GetFunctionDependenciesForObjects(referencingObject)); } return functionOids; } /* * GetFunctionDependenciesForObjects returns a list of function OIDs that have * dependency with the given object */ static List * GetFunctionDependenciesForObjects(ObjectAddress *objectAddress) { List *functionOids = NIL; ScanKeyData key[3]; HeapTuple tup; Relation depRel = table_open(DependRelationId, AccessShareLock); ScanKeyInit(&key[0], Anum_pg_depend_classid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(objectAddress->classId)); ScanKeyInit(&key[1], Anum_pg_depend_objid, BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(objectAddress->objectId)); ScanKeyInit(&key[2], Anum_pg_depend_objsubid, BTEqualStrategyNumber, F_INT4EQ, Int32GetDatum(objectAddress->objectSubId)); SysScanDesc scan = systable_beginscan(depRel, DependDependerIndexId, true, NULL, 3, key); while (HeapTupleIsValid(tup = systable_getnext(scan))) { Form_pg_depend deprec = (Form_pg_depend) GETSTRUCT(tup); if (deprec->refclassid == ProcedureRelationId) { functionOids = lappend_oid(functionOids, deprec->refobjid); } } systable_endscan(scan); table_close(depRel, AccessShareLock); return functionOids; } /* * SequenceDependencyCommandList generates commands to record the dependency * of sequences on tables on the worker. This dependency does not exist by * default since the sequences and table are created separately, but it is * necessary to ensure that the sequence is dropped when the table is * dropped. */ List * SequenceDependencyCommandList(Oid relationId) { List *sequenceCommandList = NIL; List *columnNameList = NIL; List *sequenceIdList = NIL; ExtractDefaultColumnsAndOwnedSequences(relationId, &columnNameList, &sequenceIdList); char *columnName = NULL; Oid sequenceId = InvalidOid; forboth_ptr_oid(columnName, columnNameList, sequenceId, sequenceIdList) { if (!OidIsValid(sequenceId)) { /* * ExtractDefaultColumnsAndOwnedSequences returns entries for all columns, * but with 0 sequence ID unless there is default nextval(..). */ continue; } char *sequenceDependencyCommand = CreateSequenceDependencyCommand(relationId, sequenceId, columnName); sequenceCommandList = lappend(sequenceCommandList, makeTableDDLCommandString( sequenceDependencyCommand)); } return sequenceCommandList; } /* * IdentitySequenceDependencyCommandList generate a command to execute * a UDF (WORKER_ADJUST_IDENTITY_COLUMN_SEQ_RANGES) on workers to modify the identity * columns min/max values to produce unique values on workers. 
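 *
 * If the (hypothetical) table public.items has an identity column, the
 * returned list contains a single command built from the
 * WORKER_ADJUST_IDENTITY_COLUMN_SEQ_RANGES template, roughly:
 *
 *   SELECT pg_catalog.worker_adjust_identity_column_seq_ranges('public.items');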
*/ List * IdentitySequenceDependencyCommandList(Oid targetRelationId) { List *commandList = NIL; Relation relation = relation_open(targetRelationId, AccessShareLock); TupleDesc tupleDescriptor = RelationGetDescr(relation); bool tableHasIdentityColumn = false; for (int attributeIndex = 0; attributeIndex < tupleDescriptor->natts; attributeIndex++) { Form_pg_attribute attributeForm = TupleDescAttr(tupleDescriptor, attributeIndex); if (attributeForm->attidentity) { tableHasIdentityColumn = true; break; } } relation_close(relation, NoLock); if (tableHasIdentityColumn) { StringInfo stringInfo = makeStringInfo(); char *tableName = generate_qualified_relation_name(targetRelationId); appendStringInfo(stringInfo, WORKER_ADJUST_IDENTITY_COLUMN_SEQ_RANGES, quote_literal_cstr(tableName)); commandList = lappend(commandList, makeTableDDLCommandString( stringInfo->data)); } return commandList; } /* * CreateSequenceDependencyCommand generates a query string for calling * worker_record_sequence_dependency on the worker to recreate a sequence->table * dependency. */ static char * CreateSequenceDependencyCommand(Oid relationId, Oid sequenceId, char *columnName) { char *relationName = generate_qualified_relation_name(relationId); char *sequenceName = generate_qualified_relation_name(sequenceId); StringInfo sequenceDependencyCommand = makeStringInfo(); appendStringInfo(sequenceDependencyCommand, "SELECT pg_catalog.worker_record_sequence_dependency" "(%s::regclass,%s::regclass,%s)", quote_literal_cstr(sequenceName), quote_literal_cstr(relationName), quote_literal_cstr(columnName)); return sequenceDependencyCommand->data; } /* * worker_record_sequence_dependency records the fact that the sequence depends on * the table in pg_depend, such that it will be automatically dropped. */ Datum worker_record_sequence_dependency(PG_FUNCTION_ARGS) { Oid sequenceOid = PG_GETARG_OID(0); Oid relationOid = PG_GETARG_OID(1); Name columnName = PG_GETARG_NAME(2); const char *columnNameStr = NameStr(*columnName); /* lookup column definition */ HeapTuple columnTuple = SearchSysCacheAttName(relationOid, columnNameStr); if (!HeapTupleIsValid(columnTuple)) { ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN), errmsg("column \"%s\" does not exist", columnNameStr))); } Form_pg_attribute columnForm = (Form_pg_attribute) GETSTRUCT(columnTuple); if (columnForm->attnum <= 0) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot create dependency on system column \"%s\"", columnNameStr))); } ObjectAddress sequenceAddr = { .classId = RelationRelationId, .objectId = sequenceOid, .objectSubId = 0 }; ObjectAddress relationAddr = { .classId = RelationRelationId, .objectId = relationOid, .objectSubId = columnForm->attnum }; EnsureTableOwner(sequenceOid); EnsureTableOwner(relationOid); /* dependency from sequence to table */ recordDependencyOn(&sequenceAddr, &relationAddr, DEPENDENCY_AUTO); ReleaseSysCache(columnTuple); PG_RETURN_VOID(); } /* * CreateSchemaDDLCommand returns a "CREATE SCHEMA..." SQL string for creating the given * schema if not exists and with proper authorization. 
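 *
 * For illustration (hypothetical names, assuming CREATE_SCHEMA_COMMAND is the
 * IF NOT EXISTS / AUTHORIZATION template described above): for a schema
 * "analytics" owned by role "alice", the returned command is roughly
 *
 *   CREATE SCHEMA IF NOT EXISTS analytics AUTHORIZATION alice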
*/ char * CreateSchemaDDLCommand(Oid schemaId) { char *schemaName = get_namespace_name(schemaId); StringInfo schemaNameDef = makeStringInfo(); const char *quotedSchemaName = quote_identifier(schemaName); const char *ownerName = quote_identifier(SchemaOwnerName(schemaId)); appendStringInfo(schemaNameDef, CREATE_SCHEMA_COMMAND, quotedSchemaName, ownerName); return schemaNameDef->data; } /* * GrantOnSchemaDDLCommands creates a list of ddl command for replicating the permissions * of roles on schemas. */ List * GrantOnSchemaDDLCommands(Oid schemaOid) { HeapTuple schemaTuple = SearchSysCache1(NAMESPACEOID, ObjectIdGetDatum(schemaOid)); bool isNull = true; Datum aclDatum = SysCacheGetAttr(NAMESPACEOID, schemaTuple, Anum_pg_namespace_nspacl, &isNull); if (isNull) { ReleaseSysCache(schemaTuple); return NIL; } Acl *acl = DatumGetAclPCopy(aclDatum); AclItem *aclDat = ACL_DAT(acl); int aclNum = ACL_NUM(acl); List *commands = NIL; ReleaseSysCache(schemaTuple); for (int i = 0; i < aclNum; i++) { commands = list_concat(commands, GenerateGrantOnSchemaQueriesFromAclItem( schemaOid, &aclDat[i])); } return commands; } /* * GenerateGrantOnSchemaQueryFromACLItem generates a query string for replicating a users permissions * on a schema. */ List * GenerateGrantOnSchemaQueriesFromAclItem(Oid schemaOid, AclItem *aclItem) { AclMode permissions = ACLITEM_GET_PRIVS(*aclItem) & ACL_ALL_RIGHTS_SCHEMA; AclMode grants = ACLITEM_GET_GOPTIONS(*aclItem) & ACL_ALL_RIGHTS_SCHEMA; /* * seems unlikely but we check if there is a grant option in the list without the actual permission */ Assert(!(grants & ACL_USAGE) || (permissions & ACL_USAGE)); Assert(!(grants & ACL_CREATE) || (permissions & ACL_CREATE)); Oid granteeOid = aclItem->ai_grantee; List *queries = NIL; queries = lappend(queries, GenerateSetRoleQuery(aclItem->ai_grantor)); if (permissions & ACL_USAGE) { char *query = DeparseTreeNode((Node *) GenerateGrantStmtForRights( OBJECT_SCHEMA, granteeOid, schemaOid, "USAGE", grants & ACL_USAGE)); queries = lappend(queries, query); } if (permissions & ACL_CREATE) { char *query = DeparseTreeNode((Node *) GenerateGrantStmtForRights( OBJECT_SCHEMA, granteeOid, schemaOid, "CREATE", grants & ACL_CREATE)); queries = lappend(queries, query); } queries = lappend(queries, "RESET ROLE"); return queries; } /* * GenerateGrantStmtForRights is the function for creating GrantStmt's for all * types of objects that are supported. It takes parameters to fill a GrantStmt's * fields and returns the GrantStmt. * The field `objects` of GrantStmt doesn't have a common structure for all types. * Make sure you have added your object type to GetObjectsForGrantStmt. */ static GrantStmt * GenerateGrantStmtForRights(ObjectType objectType, Oid roleOid, Oid objectId, char *permission, bool withGrantOption) { GrantStmt *stmt = makeNode(GrantStmt); stmt->is_grant = true; stmt->targtype = ACL_TARGET_OBJECT; stmt->objtype = objectType; stmt->objects = GetObjectsForGrantStmt(objectType, objectId); stmt->privileges = list_make1(GetAccessPrivObjectForGrantStmt(permission)); stmt->grantees = list_make1(GetRoleSpecObjectForUser(roleOid)); stmt->grant_option = withGrantOption; return stmt; } /* * GetObjectsForGrantStmt takes an object type and object id and returns the 'objects' * field to be used when creating GrantStmt. We have only one object here (the one with * the oid = objectId) but we pass it into the GrantStmt as a list with one element, * as GrantStmt->objects field is actually a list. 
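 *
 * For example, for OBJECT_SCHEMA the returned list holds a single String node
 * with the schema name, while for OBJECT_SEQUENCE it holds a single RangeVar;
 * see the switch statement below.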
*/ static List * GetObjectsForGrantStmt(ObjectType objectType, Oid objectId) { switch (objectType) { /* supported object types */ case OBJECT_SCHEMA: { return list_make1(makeString(get_namespace_name(objectId))); } /* enterprise supported object types */ case OBJECT_FUNCTION: case OBJECT_AGGREGATE: case OBJECT_PROCEDURE: { ObjectWithArgs *owa = ObjectWithArgsFromOid(objectId); return list_make1(owa); } case OBJECT_FDW: { ForeignDataWrapper *fdw = GetForeignDataWrapper(objectId); return list_make1(makeString(fdw->fdwname)); } case OBJECT_FOREIGN_SERVER: { ForeignServer *server = GetForeignServer(objectId); return list_make1(makeString(server->servername)); } case OBJECT_SEQUENCE: { Oid namespaceOid = get_rel_namespace(objectId); RangeVar *sequence = makeRangeVar(get_namespace_name(namespaceOid), get_rel_name(objectId), -1); return list_make1(sequence); } default: { elog(ERROR, "unsupported object type for GRANT"); } } return NIL; } /* * GrantOnFunctionDDLCommands creates a list of ddl command for replicating the permissions * of roles on distributed functions. */ List * GrantOnFunctionDDLCommands(Oid functionOid) { HeapTuple proctup = SearchSysCache1(PROCOID, ObjectIdGetDatum(functionOid)); bool isNull = true; Datum aclDatum = SysCacheGetAttr(PROCOID, proctup, Anum_pg_proc_proacl, &isNull); if (isNull) { ReleaseSysCache(proctup); return NIL; } Acl *acl = DatumGetAclPCopy(aclDatum); AclItem *aclDat = ACL_DAT(acl); int aclNum = ACL_NUM(acl); List *commands = NIL; ReleaseSysCache(proctup); for (int i = 0; i < aclNum; i++) { commands = list_concat(commands, GenerateGrantOnFunctionQueriesFromAclItem( functionOid, &aclDat[i])); } return commands; } /* * GrantOnForeignServerDDLCommands creates a list of ddl command for replicating the * permissions of roles on distributed foreign servers. */ List * GrantOnForeignServerDDLCommands(Oid serverId) { HeapTuple servertup = SearchSysCache1(FOREIGNSERVEROID, ObjectIdGetDatum(serverId)); bool isNull = true; Datum aclDatum = SysCacheGetAttr(FOREIGNSERVEROID, servertup, Anum_pg_foreign_server_srvacl, &isNull); if (isNull) { ReleaseSysCache(servertup); return NIL; } Acl *aclEntry = DatumGetAclPCopy(aclDatum); AclItem *privileges = ACL_DAT(aclEntry); int numberOfPrivsGranted = ACL_NUM(aclEntry); List *commands = NIL; ReleaseSysCache(servertup); for (int i = 0; i < numberOfPrivsGranted; i++) { commands = list_concat(commands, GenerateGrantOnForeignServerQueriesFromAclItem( serverId, &privileges[i])); } return commands; } /* * GenerateGrantOnForeignServerQueriesFromAclItem generates a query string for * replicating a users permissions on a foreign server. 
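 *
 * For illustration (hypothetical role and server names): if role "alice"
 * granted USAGE WITH GRANT OPTION to role "bob" on server "my_server", the
 * returned list of queries is roughly
 *
 *   SET ROLE alice
 *   GRANT USAGE ON FOREIGN SERVER my_server TO bob WITH GRANT OPTION
 *   RESET ROLE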
*/ List * GenerateGrantOnForeignServerQueriesFromAclItem(Oid serverId, AclItem *aclItem) { /* privileges to be granted */ AclMode permissions = ACLITEM_GET_PRIVS(*aclItem) & ACL_ALL_RIGHTS_FOREIGN_SERVER; /* WITH GRANT OPTION clause */ AclMode grants = ACLITEM_GET_GOPTIONS(*aclItem) & ACL_ALL_RIGHTS_FOREIGN_SERVER; /* * seems unlikely but we check if there is a grant option in the list without the actual permission */ Assert(!(grants & ACL_USAGE) || (permissions & ACL_USAGE)); Oid granteeOid = aclItem->ai_grantee; List *queries = NIL; /* switch to the role which had granted acl */ queries = lappend(queries, GenerateSetRoleQuery(aclItem->ai_grantor)); /* generate the GRANT stmt that will be executed by the grantor role */ if (permissions & ACL_USAGE) { char *query = DeparseTreeNode((Node *) GenerateGrantStmtForRights( OBJECT_FOREIGN_SERVER, granteeOid, serverId, "USAGE", grants & ACL_USAGE)); queries = lappend(queries, query); } /* reset the role back */ queries = lappend(queries, "RESET ROLE"); return queries; } /* * GenerateGrantOnFunctionQueryFromACLItem generates a query string for replicating a users permissions * on a distributed function. */ List * GenerateGrantOnFunctionQueriesFromAclItem(Oid functionOid, AclItem *aclItem) { AclMode permissions = ACLITEM_GET_PRIVS(*aclItem) & ACL_ALL_RIGHTS_FUNCTION; AclMode grants = ACLITEM_GET_GOPTIONS(*aclItem) & ACL_ALL_RIGHTS_FUNCTION; /* * seems unlikely but we check if there is a grant option in the list without the actual permission */ Assert(!(grants & ACL_EXECUTE) || (permissions & ACL_EXECUTE)); Oid granteeOid = aclItem->ai_grantee; List *queries = NIL; queries = lappend(queries, GenerateSetRoleQuery(aclItem->ai_grantor)); if (permissions & ACL_EXECUTE) { char prokind = get_func_prokind(functionOid); ObjectType objectType; if (prokind == PROKIND_FUNCTION) { objectType = OBJECT_FUNCTION; } else if (prokind == PROKIND_PROCEDURE) { objectType = OBJECT_PROCEDURE; } else if (prokind == PROKIND_AGGREGATE) { objectType = OBJECT_AGGREGATE; } else { ereport(ERROR, (errmsg("unsupported prokind"), errdetail("GRANT commands on procedures are propagated only " "for procedures, functions, and aggregates."))); } char *query = DeparseTreeNode((Node *) GenerateGrantStmtForRights( objectType, granteeOid, functionOid, "EXECUTE", grants & ACL_EXECUTE)); queries = lappend(queries, query); } queries = lappend(queries, "RESET ROLE"); return queries; } /* * GenerateGrantOnFDWQueriesFromAclItem generates a query string for * replicating a users permissions on a foreign data wrapper. 
*/ List * GenerateGrantOnFDWQueriesFromAclItem(Oid FDWId, AclItem *aclItem) { /* privileges to be granted */ AclMode permissions = ACLITEM_GET_PRIVS(*aclItem) & ACL_ALL_RIGHTS_FDW; /* WITH GRANT OPTION clause */ AclMode grants = ACLITEM_GET_GOPTIONS(*aclItem) & ACL_ALL_RIGHTS_FDW; /* * seems unlikely but we check if there is a grant option in the list without the actual permission */ Assert(!(grants & ACL_USAGE) || (permissions & ACL_USAGE)); Oid granteeOid = aclItem->ai_grantee; List *queries = NIL; /* switch to the role which had granted acl */ queries = lappend(queries, GenerateSetRoleQuery(aclItem->ai_grantor)); /* generate the GRANT stmt that will be executed by the grantor role */ if (permissions & ACL_USAGE) { char *query = DeparseTreeNode((Node *) GenerateGrantStmtForRights( OBJECT_FDW, granteeOid, FDWId, "USAGE", grants & ACL_USAGE)); queries = lappend(queries, query); } /* reset the role back */ queries = lappend(queries, "RESET ROLE"); return queries; } /* * GetAccessPrivObjectForGrantStmt creates an AccessPriv object for the given permission. * It will be used when creating GrantStmt objects. */ static AccessPriv * GetAccessPrivObjectForGrantStmt(char *permission) { AccessPriv *accessPriv = makeNode(AccessPriv); accessPriv->priv_name = pstrdup(permission); accessPriv->cols = NULL; return accessPriv; } /* * GrantOnSequenceDDLCommands creates a list of ddl command for replicating the permissions * of roles on distributed sequences. */ static List * GrantOnSequenceDDLCommands(Oid sequenceOid) { HeapTuple seqtup = SearchSysCache1(RELOID, ObjectIdGetDatum(sequenceOid)); bool isNull = false; Datum aclDatum = SysCacheGetAttr(RELOID, seqtup, Anum_pg_class_relacl, &isNull); if (isNull) { ReleaseSysCache(seqtup); return NIL; } Acl *acl = DatumGetAclPCopy(aclDatum); AclItem *aclDat = ACL_DAT(acl); int aclNum = ACL_NUM(acl); List *commands = NIL; ReleaseSysCache(seqtup); for (int i = 0; i < aclNum; i++) { commands = list_concat(commands, GenerateGrantOnSequenceQueriesFromAclItem( sequenceOid, &aclDat[i])); } return commands; } /* * GenerateGrantOnSequenceQueriesFromAclItem generates a query string for replicating a users permissions * on a distributed sequence. 
*/ static List * GenerateGrantOnSequenceQueriesFromAclItem(Oid sequenceOid, AclItem *aclItem) { AclMode permissions = ACLITEM_GET_PRIVS(*aclItem) & ACL_ALL_RIGHTS_SEQUENCE; AclMode grants = ACLITEM_GET_GOPTIONS(*aclItem) & ACL_ALL_RIGHTS_SEQUENCE; /* * seems unlikely but we check if there is a grant option in the list without the actual permission */ Assert(!(grants & ACL_USAGE) || (permissions & ACL_USAGE)); Assert(!(grants & ACL_SELECT) || (permissions & ACL_SELECT)); Assert(!(grants & ACL_UPDATE) || (permissions & ACL_UPDATE)); Oid granteeOid = aclItem->ai_grantee; List *queries = NIL; queries = lappend(queries, GenerateSetRoleQuery(aclItem->ai_grantor)); if (permissions & ACL_USAGE) { char *query = DeparseTreeNode((Node *) GenerateGrantStmtForRights( OBJECT_SEQUENCE, granteeOid, sequenceOid, "USAGE", grants & ACL_USAGE)); queries = lappend(queries, query); } if (permissions & ACL_SELECT) { char *query = DeparseTreeNode((Node *) GenerateGrantStmtForRights( OBJECT_SEQUENCE, granteeOid, sequenceOid, "SELECT", grants & ACL_SELECT)); queries = lappend(queries, query); } if (permissions & ACL_UPDATE) { char *query = DeparseTreeNode((Node *) GenerateGrantStmtForRights( OBJECT_SEQUENCE, granteeOid, sequenceOid, "UPDATE", grants & ACL_UPDATE)); queries = lappend(queries, query); } queries = lappend(queries, "RESET ROLE"); return queries; } /* * SetLocalEnableMetadataSync sets the enable_metadata_sync locally */ void SetLocalEnableMetadataSync(bool state) { set_config_option("citus.enable_metadata_sync", state == true ? "on" : "off", (superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION, GUC_ACTION_LOCAL, true, 0, false); } static char * GenerateSetRoleQuery(Oid roleOid) { StringInfo buf = makeStringInfo(); appendStringInfo(buf, "SET ROLE %s", quote_identifier(GetUserNameFromId(roleOid, false))); return buf->data; } /* * TruncateTriggerCreateCommand creates a SQL query calling worker_create_truncate_trigger * function, which creates the truncate trigger on the worker. */ TableDDLCommand * TruncateTriggerCreateCommand(Oid relationId) { StringInfo triggerCreateCommand = makeStringInfo(); char *tableName = generate_qualified_relation_name(relationId); appendStringInfo(triggerCreateCommand, "SELECT worker_create_truncate_trigger(%s)", quote_literal_cstr(tableName)); TableDDLCommand *triggerDDLCommand = makeTableDDLCommandString( triggerCreateCommand->data); return triggerDDLCommand; } /* * SchemaOwnerName returns the name of the owner of the specified schema. */ static char * SchemaOwnerName(Oid objectId) { Oid ownerId = InvalidOid; HeapTuple tuple = SearchSysCache1(NAMESPACEOID, ObjectIdGetDatum(objectId)); if (HeapTupleIsValid(tuple)) { ownerId = ((Form_pg_namespace) GETSTRUCT(tuple))->nspowner; } else { ownerId = GetUserId(); } char *ownerName = GetUserNameFromId(ownerId, false); ReleaseSysCache(tuple); return ownerName; } /* * HasMetadataWorkers returns true if any of the workers in the cluster has its * hasmetadata column set to true, which happens when start_metadata_sync_to_node * command is run. */ static bool HasMetadataWorkers(void) { List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) { if (workerNode->hasMetadata) { return true; } } return false; } /* * CreateInterTableRelationshipOfRelationOnWorkers create inter table relationship * for the the given relation id on each worker node with metadata. 
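 *
 * The commands sent are the ones returned by
 * InterTableRelationshipOfRelationCommandList() below, i.e. the foreign key
 * creation commands and, for partitions, the ALTER TABLE ... ATTACH PARTITION
 * command, after first sending DISABLE_DDL_PROPAGATION to prevent recursive
 * propagation.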
*/ void CreateInterTableRelationshipOfRelationOnWorkers(Oid relationId) { /* if the table is owned by an extension we don't create */ bool tableOwnedByExtension = IsTableOwnedByExtension(relationId); if (tableOwnedByExtension) { return; } List *commandList = InterTableRelationshipOfRelationCommandList(relationId); /* prevent recursive propagation */ SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); const char *command = NULL; foreach_ptr(command, commandList) { SendCommandToWorkersWithMetadata(command); } } /* * InterTableRelationshipOfRelationCommandList returns the command list to create * inter table relationship for the given relation. */ List * InterTableRelationshipOfRelationCommandList(Oid relationId) { /* commands to create foreign key constraints */ List *commandList = GetReferencingForeignConstaintCommands(relationId); /* commands to create partitioning hierarchy */ if (PartitionTable(relationId)) { char *alterTableAttachPartitionCommands = GenerateAlterTableAttachPartitionCommand(relationId); commandList = lappend(commandList, alterTableAttachPartitionCommands); } return commandList; } /* * CreateShellTableOnWorkers creates the shell table on each worker node with metadata * including sequence dependency and truncate triggers. */ static void CreateShellTableOnWorkers(Oid relationId) { if (IsTableOwnedByExtension(relationId)) { return; } List *commandList = list_make1(DISABLE_DDL_PROPAGATION); IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS; IncludeIdentities includeIdentityDefaults = INCLUDE_IDENTITY; bool creatingShellTableOnRemoteNode = true; List *tableDDLCommands = GetFullTableCreationCommands(relationId, includeSequenceDefaults, includeIdentityDefaults, creatingShellTableOnRemoteNode); TableDDLCommand *tableDDLCommand = NULL; foreach_ptr(tableDDLCommand, tableDDLCommands) { Assert(CitusIsA(tableDDLCommand, TableDDLCommand)); commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand)); } const char *command = NULL; foreach_ptr(command, commandList) { SendCommandToWorkersWithMetadata(command); } } /* * CreateTableMetadataOnWorkers creates the list of commands needed to create the * metadata of the given distributed table and sends these commands to all metadata * workers i.e. workers with hasmetadata=true. Before sending the commands, in order * to prevent recursive propagation, DDL propagation on workers are disabled with a * `SET citus.enable_ddl_propagation TO off;` command. */ static void CreateTableMetadataOnWorkers(Oid relationId) { List *commandList = CitusTableMetadataCreateCommandList(relationId); /* prevent recursive propagation */ SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); /* send the commands one by one */ const char *command = NULL; foreach_ptr(command, commandList) { SendCommandToWorkersWithMetadata(command); } } /* * DetachPartitionCommandList returns list of DETACH commands to detach partitions * of all distributed tables. This function is used for detaching partitions in MX * workers before DROPping distributed partitioned tables in them. Thus, we are * disabling DDL propagation to the beginning of the commands (we are also enabling * DDL propagation at the end of command list to swtich back to original state). As * an extra step, if there are no partitions to DETACH, this function simply returns * empty list to not disable/enable DDL propagation for nothing. 
*/ List * DetachPartitionCommandList(void) { List *detachPartitionCommandList = NIL; List *distributedTableList = CitusTableList(); /* we iterate over all distributed partitioned tables and DETACH their partitions */ CitusTableCacheEntry *cacheEntry = NULL; foreach_ptr(cacheEntry, distributedTableList) { if (!PartitionedTable(cacheEntry->relationId)) { continue; } List *partitionList = PartitionList(cacheEntry->relationId); List *detachCommands = GenerateDetachPartitionCommandRelationIdList(partitionList); detachPartitionCommandList = list_concat(detachPartitionCommandList, detachCommands); } if (list_length(detachPartitionCommandList) == 0) { return NIL; } detachPartitionCommandList = lcons(DISABLE_DDL_PROPAGATION, detachPartitionCommandList); /* * We probably do not need this but as an extra precaution, we are enabling * DDL propagation to switch back to original state. */ detachPartitionCommandList = lappend(detachPartitionCommandList, ENABLE_DDL_PROPAGATION); return detachPartitionCommandList; } /* * SyncNodeMetadataToNodesOptional tries recreating the metadata * snapshot in the metadata workers that are out of sync. * Returns the result of synchronization. * * This function must be called within coordinated transaction * since updates on the pg_dist_node metadata must be rollbacked if anything * goes wrong. */ static NodeMetadataSyncResult SyncNodeMetadataToNodesOptional(void) { NodeMetadataSyncResult result = NODE_METADATA_SYNC_SUCCESS; if (!IsCoordinator()) { return NODE_METADATA_SYNC_SUCCESS; } /* * Request a RowExclusiveLock so we don't run concurrently with other * functions updating pg_dist_node, but allow concurrency with functions * which are just reading from pg_dist_node. */ if (!ConditionalLockRelationOid(DistNodeRelationId(), RowExclusiveLock)) { return NODE_METADATA_SYNC_FAILED_LOCK; } List *syncedWorkerList = NIL; List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerList) { if (workerNode->hasMetadata && !workerNode->metadataSynced) { bool raiseInterrupts = false; if (!SyncNodeMetadataSnapshotToNode(workerNode, raiseInterrupts)) { ereport(WARNING, (errmsg("failed to sync metadata to %s:%d", workerNode->workerName, workerNode->workerPort))); result = NODE_METADATA_SYNC_FAILED_SYNC; } else { /* we add successfully synced nodes to set metadatasynced column later */ syncedWorkerList = lappend(syncedWorkerList, workerNode); } } } foreach_ptr(workerNode, syncedWorkerList) { SetWorkerColumnOptional(workerNode, Anum_pg_dist_node_metadatasynced, BoolGetDatum(true)); /* we fetch the same node again to check if it's synced or not */ WorkerNode *nodeUpdated = FindWorkerNode(workerNode->workerName, workerNode->workerPort); if (!nodeUpdated->metadataSynced) { /* set the result to FAILED to trigger the sync again */ result = NODE_METADATA_SYNC_FAILED_SYNC; } } return result; } /* * SyncNodeMetadataToNodes recreates the node metadata snapshot in all the * metadata workers. * * This function runs within a coordinated transaction since updates on * the pg_dist_node metadata must be rollbacked if anything * goes wrong. */ void SyncNodeMetadataToNodes(void) { EnsureCoordinator(); /* * Request a RowExclusiveLock so we don't run concurrently with other * functions updating pg_dist_node, but allow concurrency with functions * which are just reading from pg_dist_node. 
*/ if (!ConditionalLockRelationOid(DistNodeRelationId(), RowExclusiveLock)) { ereport(ERROR, (errmsg("cannot sync metadata because a concurrent " "metadata syncing operation is in progress"))); } List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerList) { if (workerNode->hasMetadata) { SetWorkerColumnLocalOnly(workerNode, Anum_pg_dist_node_metadatasynced, BoolGetDatum(true)); bool raiseOnError = true; SyncNodeMetadataSnapshotToNode(workerNode, raiseOnError); } } } /* * SyncNodeMetadataToNodesMain is the main function for syncing node metadata to * MX nodes. It retries until success and then exits. */ void SyncNodeMetadataToNodesMain(Datum main_arg) { Oid databaseOid = DatumGetObjectId(main_arg); /* extension owner is passed via bgw_extra */ Oid extensionOwner = InvalidOid; memcpy_s(&extensionOwner, sizeof(extensionOwner), MyBgworkerEntry->bgw_extra, sizeof(Oid)); pqsignal(SIGTERM, MetadataSyncSigTermHandler); pqsignal(SIGALRM, MetadataSyncSigAlrmHandler); BackgroundWorkerUnblockSignals(); /* connect to database, after that we can actually access catalogs */ BackgroundWorkerInitializeConnectionByOid(databaseOid, extensionOwner, 0); /* make worker recognizable in pg_stat_activity */ pgstat_report_appname(METADATA_SYNC_APP_NAME); bool syncedAllNodes = false; while (!syncedAllNodes) { InvalidateMetadataSystemCache(); StartTransactionCommand(); /* * Some functions in ruleutils.c, which we use to get the DDL for * metadata propagation, require an active snapshot. */ PushActiveSnapshot(GetTransactionSnapshot()); if (!LockCitusExtension()) { ereport(DEBUG1, (errmsg("could not lock the citus extension, " "skipping metadata sync"))); } else if (CheckCitusVersion(DEBUG1) && CitusHasBeenLoaded()) { UseCoordinatedTransaction(); NodeMetadataSyncResult result = SyncNodeMetadataToNodesOptional(); syncedAllNodes = (result == NODE_METADATA_SYNC_SUCCESS); /* we use LISTEN/NOTIFY to wait for metadata syncing in tests */ if (result != NODE_METADATA_SYNC_FAILED_LOCK) { Async_Notify(METADATA_SYNC_CHANNEL, NULL); } } PopActiveSnapshot(); CommitTransactionCommand(); ProcessCompletedNotifies(); if (syncedAllNodes) { break; } /* * If backend is cancelled (e.g. bacause of distributed deadlock), * CHECK_FOR_INTERRUPTS() will raise a cancellation error which will * result in exit(1). */ CHECK_FOR_INTERRUPTS(); /* * SIGTERM is used for when maintenance daemon tries to clean-up * metadata sync daemons spawned by terminated maintenance daemons. */ if (got_SIGTERM) { exit(0); } /* * SIGALRM is used for testing purposes and it simulates an error in metadata * sync daemon. */ if (got_SIGALRM) { elog(ERROR, "Error in metadata sync daemon"); } pg_usleep(MetadataSyncRetryInterval * 1000); } } /* * MetadataSyncSigTermHandler set a flag to request termination of metadata * sync daemon. */ static void MetadataSyncSigTermHandler(SIGNAL_ARGS) { int save_errno = errno; got_SIGTERM = true; if (MyProc != NULL) { SetLatch(&MyProc->procLatch); } errno = save_errno; } /* * MetadataSyncSigAlrmHandler set a flag to request error at metadata * sync daemon. This is used for testing purposes. */ static void MetadataSyncSigAlrmHandler(SIGNAL_ARGS) { int save_errno = errno; got_SIGALRM = true; if (MyProc != NULL) { SetLatch(&MyProc->procLatch); } errno = save_errno; } /* * SpawnSyncNodeMetadataToNodes starts a background worker which runs node metadata * sync. On success it returns workers' handle. Otherwise it returns NULL. 
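 *
 * Illustrative usage (hypothetical caller, not a call site in this file):
 *
 *   BackgroundWorkerHandle *handle =
 *       SpawnSyncNodeMetadataToNodes(MyDatabaseId, extensionOwner);
 *
 * A NULL result means the background worker could not be registered, e.g.
 * because no free background worker slots were available.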
*/
BackgroundWorkerHandle *
SpawnSyncNodeMetadataToNodes(Oid database, Oid extensionOwner)
{
	BackgroundWorker worker;
	BackgroundWorkerHandle *handle = NULL;

	/* Configure a worker. */
	memset(&worker, 0, sizeof(worker));
	SafeSnprintf(worker.bgw_name, BGW_MAXLEN,
				 "Citus Metadata Sync: %u/%u", database, extensionOwner);
	worker.bgw_flags =
		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
	worker.bgw_start_time = BgWorkerStart_ConsistentState;

	/* don't restart, we manage restarts from maintenance daemon */
	worker.bgw_restart_time = BGW_NEVER_RESTART;
	strcpy_s(worker.bgw_library_name, sizeof(worker.bgw_library_name), "citus");
	strcpy_s(worker.bgw_function_name, sizeof(worker.bgw_function_name),
			 "SyncNodeMetadataToNodesMain");
	worker.bgw_main_arg = ObjectIdGetDatum(MyDatabaseId);
	memcpy_s(worker.bgw_extra, sizeof(worker.bgw_extra), &extensionOwner,
			 sizeof(Oid));
	worker.bgw_notify_pid = MyProcPid;

	if (!RegisterDynamicBackgroundWorker(&worker, &handle))
	{
		return NULL;
	}

	pid_t pid;
	WaitForBackgroundWorkerStartup(handle, &pid);

	return handle;
}


/*
 * SignalMetadataSyncDaemon signals metadata sync daemons belonging to
 * the given database.
 */
void
SignalMetadataSyncDaemon(Oid database, int sig)
{
	int backendCount = pgstat_fetch_stat_numbackends();
	for (int backend = 1; backend <= backendCount; backend++)
	{
		LocalPgBackendStatus *localBeEntry = pgstat_fetch_stat_local_beentry(backend);
		if (!localBeEntry)
		{
			continue;
		}

		PgBackendStatus *beStatus = &localBeEntry->backendStatus;
		if (beStatus->st_databaseid == database &&
			strncmp(beStatus->st_appname, METADATA_SYNC_APP_NAME, BGW_MAXLEN) == 0)
		{
			kill(beStatus->st_procpid, sig);
		}
	}
}


/*
 * ShouldInitiateMetadataSync returns whether the metadata sync daemon should
 * be initiated. It sets lockFailure to true if the pg_dist_node lock couldn't
 * be acquired for the check.
 */
bool
ShouldInitiateMetadataSync(bool *lockFailure)
{
	if (!IsCoordinator())
	{
		*lockFailure = false;
		return false;
	}

	Oid distNodeOid = DistNodeRelationId();
	if (!ConditionalLockRelationOid(distNodeOid, AccessShareLock))
	{
		*lockFailure = true;
		return false;
	}

	bool shouldSyncMetadata = false;

	List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
	WorkerNode *workerNode = NULL;
	foreach_ptr(workerNode, workerList)
	{
		if (workerNode->hasMetadata && !workerNode->metadataSynced)
		{
			shouldSyncMetadata = true;
			break;
		}
	}

	UnlockRelationOid(distNodeOid, AccessShareLock);

	*lockFailure = false;
	return shouldSyncMetadata;
}


/*
 * citus_internal_add_partition_metadata is an internal UDF to
 * add a row to pg_dist_partition.
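 *
 * For illustration (hypothetical values): during metadata sync the coordinator
 * issues a command of roughly this shape, assuming 'h' denotes hash
 * distribution and 's' the streaming replication model:
 *
 *   SELECT pg_catalog.citus_internal_add_partition_metadata(
 *       'public.events'::regclass, 'h', 'user_id', 1234, 's')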
*/ Datum citus_internal_add_partition_metadata(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); PG_ENSURE_ARGNOTNULL(0, "relation"); Oid relationId = PG_GETARG_OID(0); PG_ENSURE_ARGNOTNULL(1, "distribution method"); char distributionMethod = PG_GETARG_CHAR(1); PG_ENSURE_ARGNOTNULL(3, "Colocation ID"); int colocationId = PG_GETARG_INT32(3); PG_ENSURE_ARGNOTNULL(4, "replication model"); char replicationModel = PG_GETARG_CHAR(4); text *distributionColumnText = NULL; char *distributionColumnString = NULL; Var *distributionColumnVar = NULL; /* this flag is only valid for citus local tables, so set it to false */ bool autoConverted = false; /* only owner of the table (or superuser) is allowed to add the Citus metadata */ EnsureTableOwner(relationId); /* we want to serialize all the metadata changes to this table */ LockRelationOid(relationId, ShareUpdateExclusiveLock); if (!PG_ARGISNULL(2)) { distributionColumnText = PG_GETARG_TEXT_P(2); distributionColumnString = text_to_cstring(distributionColumnText); distributionColumnVar = BuildDistributionKeyFromColumnName(relationId, distributionColumnString, AccessShareLock); Assert(distributionColumnVar != NULL); } if (!ShouldSkipMetadataChecks()) { /* this UDF is not allowed allowed for executing as a separate command */ EnsureCoordinatorInitiatedOperation(); if (distributionMethod == DISTRIBUTE_BY_NONE && distributionColumnVar != NULL) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("Reference or local tables cannot have " "distribution columns"))); } else if (distributionMethod != DISTRIBUTE_BY_NONE && distributionColumnVar == NULL) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("Distribution column cannot be NULL for " "relation \"%s\"", get_rel_name(relationId)))); } /* * Even if the table owner is a malicious user and the partition * metadata is not sane, the user can only affect its own tables. * Given that the user is owner of the table, we should allow. */ EnsurePartitionMetadataIsSane(relationId, distributionMethod, colocationId, replicationModel, distributionColumnVar); } InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumnVar, colocationId, replicationModel, autoConverted); PG_RETURN_VOID(); } /* * EnsurePartitionMetadataIsSane ensures that the input values are safe * for inserting into pg_dist_partition metadata. */ static void EnsurePartitionMetadataIsSane(Oid relationId, char distributionMethod, int colocationId, char replicationModel, Var *distributionColumnVar) { if (!(distributionMethod == DISTRIBUTE_BY_HASH || distributionMethod == DISTRIBUTE_BY_NONE)) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("Metadata syncing is only allowed for hash, reference " "and local tables:%c", distributionMethod))); } if (colocationId < INVALID_COLOCATION_ID) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("Metadata syncing is only allowed for valid " "colocation id values."))); } else if (colocationId != INVALID_COLOCATION_ID && distributionMethod == DISTRIBUTE_BY_HASH) { int count = 1; List *targetColocatedTableList = ColocationGroupTableList(colocationId, count); /* * If we have any colocated hash tables, ensure if they share the * same distribution key properties. 
*/ if (list_length(targetColocatedTableList) >= 1) { Oid targetRelationId = linitial_oid(targetColocatedTableList); EnsureColumnTypeEquality(relationId, targetRelationId, distributionColumnVar, DistPartitionKeyOrError(targetRelationId)); } } if (!(replicationModel == REPLICATION_MODEL_2PC || replicationModel == REPLICATION_MODEL_STREAMING || replicationModel == REPLICATION_MODEL_COORDINATOR)) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("Metadata syncing is only allowed for " "known replication models."))); } if (distributionMethod == DISTRIBUTE_BY_NONE && !(replicationModel == REPLICATION_MODEL_STREAMING || replicationModel == REPLICATION_MODEL_2PC)) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("Local or references tables can only have '%c' or '%c' " "as the replication model.", REPLICATION_MODEL_STREAMING, REPLICATION_MODEL_2PC))); } } /* * citus_internal_delete_partition_metadata is an internal UDF to * delete a row in pg_dist_partition. */ Datum citus_internal_delete_partition_metadata(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); PG_ENSURE_ARGNOTNULL(0, "relation"); Oid relationId = PG_GETARG_OID(0); /* only owner of the table (or superuser) is allowed to add the Citus metadata */ EnsureTableOwner(relationId); /* we want to serialize all the metadata changes to this table */ LockRelationOid(relationId, ShareUpdateExclusiveLock); if (!ShouldSkipMetadataChecks()) { EnsureCoordinatorInitiatedOperation(); } DeletePartitionRow(relationId); PG_RETURN_VOID(); } /* * citus_internal_add_shard_metadata is an internal UDF to * add a row to pg_dist_shard. */ Datum citus_internal_add_shard_metadata(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); PG_ENSURE_ARGNOTNULL(0, "relation"); Oid relationId = PG_GETARG_OID(0); PG_ENSURE_ARGNOTNULL(1, "shard id"); int64 shardId = PG_GETARG_INT64(1); PG_ENSURE_ARGNOTNULL(2, "storage type"); char storageType = PG_GETARG_CHAR(2); text *shardMinValue = NULL; if (!PG_ARGISNULL(3)) { shardMinValue = PG_GETARG_TEXT_P(3); } text *shardMaxValue = NULL; if (!PG_ARGISNULL(4)) { shardMaxValue = PG_GETARG_TEXT_P(4); } /* only owner of the table (or superuser) is allowed to add the Citus metadata */ EnsureTableOwner(relationId); /* we want to serialize all the metadata changes to this table */ LockRelationOid(relationId, ShareUpdateExclusiveLock); if (!ShouldSkipMetadataChecks()) { /* this UDF is not allowed allowed for executing as a separate command */ EnsureCoordinatorInitiatedOperation(); /* * Even if the table owner is a malicious user and the shard metadata is * not sane, the user can only affect its own tables. Given that the * user is owner of the table, we should allow. */ EnsureShardMetadataIsSane(relationId, shardId, storageType, shardMinValue, shardMaxValue); } InsertShardRow(relationId, shardId, storageType, shardMinValue, shardMaxValue); PG_RETURN_VOID(); } /* * EnsureCoordinatorInitiatedOperation is a helper function which ensures that * the execution is initiated by the coordinator on a worker node. */ static void EnsureCoordinatorInitiatedOperation(void) { /* * We are restricting the operation to only MX workers with the local group id * check. The other two checks are to ensure that the operation is initiated * by the coordinator. 
*/ if (!(IsCitusInternalBackend() || IsRebalancerInternalBackend()) || GetLocalGroupId() == COORDINATOR_GROUP_ID) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("This is an internal Citus function can only be " "used in a distributed transaction"))); } } /* * EnsureShardMetadataIsSane ensures that the input values are safe * for inserting into pg_dist_shard metadata. */ static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storageType, text *shardMinValue, text *shardMaxValue) { if (shardId <= INVALID_SHARD_ID) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("Invalid shard id: %ld", shardId))); } if (!(storageType == SHARD_STORAGE_TABLE || storageType == SHARD_STORAGE_FOREIGN)) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("Invalid shard storage type: %c", storageType))); } char partitionMethod = PartitionMethodViaCatalog(relationId); if (partitionMethod == DISTRIBUTE_BY_INVALID) { /* connection from the coordinator operating on a shard */ ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("The relation \"%s\" does not have a valid " "entry in pg_dist_partition.", get_rel_name(relationId)))); } else if (!(partitionMethod == DISTRIBUTE_BY_HASH || partitionMethod == DISTRIBUTE_BY_NONE)) { /* connection from the coordinator operating on a shard */ ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("Metadata syncing is only allowed for hash, " "reference and local tables: %c", partitionMethod))); } List *distShardTupleList = LookupDistShardTuples(relationId); if (partitionMethod == DISTRIBUTE_BY_NONE) { if (shardMinValue != NULL || shardMaxValue != NULL) { char *relationName = get_rel_name(relationId); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("Shards of reference or local table \"%s\" should " "have NULL shard ranges", relationName))); } else if (list_length(distShardTupleList) != 0) { char *relationName = get_rel_name(relationId); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("relation \"%s\" has already at least one shard, " "adding more is not allowed", relationName))); } } else if (partitionMethod == DISTRIBUTE_BY_HASH) { if (shardMinValue == NULL || shardMaxValue == NULL) { char *relationName = get_rel_name(relationId); ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("Shards of has distributed table \"%s\" " "cannot have NULL shard ranges", relationName))); } char *shardMinValueString = text_to_cstring(shardMinValue); char *shardMaxValueString = text_to_cstring(shardMaxValue); /* pg_strtoint32 does the syntax and out of bound checks for us */ int32 shardMinValueInt = pg_strtoint32(shardMinValueString); int32 shardMaxValueInt = pg_strtoint32(shardMaxValueString); if (shardMinValueInt > shardMaxValueInt) { ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), errmsg("shardMinValue=%d is greater than " "shardMaxValue=%d for table \"%s\", which is " "not allowed", shardMinValueInt, shardMaxValueInt, get_rel_name(relationId)))); } /* * We are only dealing with hash distributed tables, that's why we * can hard code data type and typemod. 
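 *
 * In other words, the shardminvalue/shardmaxvalue strings parsed above are
 * int4 hash-range bounds, e.g. '-2147483648' and '-1' (illustrative values).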
*/
		const int intervalTypeId = INT4OID;
		const int intervalTypeMod = -1;

		Relation distShardRelation = table_open(DistShardRelationId(), AccessShareLock);
		TupleDesc distShardTupleDesc = RelationGetDescr(distShardRelation);

		FmgrInfo *shardIntervalCompareFunction =
			GetFunctionInfo(intervalTypeId, BTREE_AM_OID, BTORDER_PROC);

		HeapTuple shardTuple = NULL;
		foreach_ptr(shardTuple, distShardTupleList)
		{
			ShardInterval *shardInterval =
				TupleToShardInterval(shardTuple, distShardTupleDesc,
									 intervalTypeId, intervalTypeMod);

			Datum firstMin = Int32GetDatum(shardMinValueInt);
			Datum firstMax = Int32GetDatum(shardMaxValueInt);
			Datum secondMin = shardInterval->minValue;
			Datum secondMax = shardInterval->maxValue;
			Oid collationId = InvalidOid;

			/*
			 * This is an unexpected case as we are reading the metadata, which has
			 * already been verified for being not NULL. Still, let's be extra
			 * cautious to avoid any crashes.
			 */
			if (!shardInterval->minValueExists || !shardInterval->maxValueExists)
			{
				char *relationName = get_rel_name(relationId);
				ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
								errmsg("Shards of hash distributed table \"%s\" "
									   "cannot have NULL shard ranges",
									   relationName)));
			}

			if (ShardIntervalsOverlapWithParams(firstMin, firstMax,
												secondMin, secondMax,
												shardIntervalCompareFunction,
												collationId))
			{
				ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
								errmsg("Shard intervals overlap for table \"%s\": "
									   "%ld and %ld", get_rel_name(relationId),
									   shardId, shardInterval->shardId)));
			}
		}

		table_close(distShardRelation, NoLock);
	}
}


/*
 * citus_internal_add_placement_metadata is an internal UDF to
 * add a row to pg_dist_placement.
 */
Datum
citus_internal_add_placement_metadata(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	int64 shardId = PG_GETARG_INT64(0);
	int64 shardLength = PG_GETARG_INT64(1);
	int32 groupId = PG_GETARG_INT32(2);
	int64 placementId = PG_GETARG_INT64(3);

	citus_internal_add_placement_metadata_internal(shardId, shardLength,
													groupId, placementId);
	PG_RETURN_VOID();
}


/*
 * citus_internal_delete_placement_metadata is an internal UDF to
 * delete a row from pg_dist_placement.
 */
Datum
citus_internal_delete_placement_metadata(PG_FUNCTION_ARGS)
{
	PG_ENSURE_ARGNOTNULL(0, "placement_id");
	int64 placementId = PG_GETARG_INT64(0);

	if (!ShouldSkipMetadataChecks())
	{
		/* this UDF is not allowed to be executed as a separate command */
		EnsureCoordinatorInitiatedOperation();
	}

	DeleteShardPlacementRow(placementId);

	PG_RETURN_VOID();
}


/*
 * citus_internal_add_placement_metadata_legacy is the old function that will be dropped.
*/
Datum
citus_internal_add_placement_metadata_legacy(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	int64 shardId = PG_GETARG_INT64(0);
	int64 shardLength = PG_GETARG_INT64(2);
	int32 groupId = PG_GETARG_INT32(3);
	int64 placementId = PG_GETARG_INT64(4);

	citus_internal_add_placement_metadata_internal(shardId, shardLength,
													groupId, placementId);
	PG_RETURN_VOID();
}


/*
 * citus_internal_add_placement_metadata_internal is the internal function
 * to insert a row into pg_dist_placement.
 */
void
citus_internal_add_placement_metadata_internal(int64 shardId, int64 shardLength,
												int32 groupId, int64 placementId)
{
	bool missingOk = false;
	Oid relationId = LookupShardRelationFromCatalog(shardId, missingOk);

	/* only owner of the table is allowed to modify the metadata */
	EnsureTableOwner(relationId);

	/* we want to serialize all the metadata changes to this table */
	LockRelationOid(relationId, ShareUpdateExclusiveLock);

	if (!ShouldSkipMetadataChecks())
	{
		/* this UDF is not allowed to be executed as a separate command */
		EnsureCoordinatorInitiatedOperation();

		/*
		 * Even if the table owner is a malicious user, as long as the shard placements
		 * fit into basic requirements of Citus metadata, the user can only affect their
		 * own tables. Given that the user is owner of the table, we should allow.
		 */
		EnsureShardPlacementMetadataIsSane(relationId, shardId, placementId,
										   shardLength, groupId);
	}

	InsertShardPlacementRow(shardId, placementId, shardLength, groupId);
}


/*
 * EnsureShardPlacementMetadataIsSane ensures that the input parameters for
 * the shard placement metadata are sane.
 */
static void
EnsureShardPlacementMetadataIsSane(Oid relationId, int64 shardId, int64 placementId,
								   int64 shardLength, int32 groupId)
{
	/* we have just read the metadata, so we are sure that the shard exists */
	Assert(ShardExists(shardId));

	if (placementId <= INVALID_PLACEMENT_ID)
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Shard placement has invalid placement id "
							   "(%ld) for shard(%ld)", placementId, shardId)));
	}

	bool nodeIsInMetadata = false;
	WorkerNode *workerNode =
		PrimaryNodeForGroup(groupId, &nodeIsInMetadata);
	if (!workerNode)
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Node with group id %d for shard placement "
							   "%ld does not exist", groupId, shardId)));
	}
}


/*
 * ShouldSkipMetadataChecks returns true if the current user is allowed to
 * make any manual metadata changes, in which case the metadata sanity checks
 * are skipped.
 */
static bool
ShouldSkipMetadataChecks(void)
{
	if (strcmp(EnableManualMetadataChangesForUser, "") != 0)
	{
		/*
		 * EnableManualMetadataChangesForUser is a GUC which
		 * can be changed by a super user. We use this GUC as
		 * a safety belt in case the current metadata checks are
		 * too restrictive, so that the operator can allow users to skip
		 * the checks.
		 */

		/*
		 * Make sure that the user exists, and print it to prevent any
		 * optimization skipping the get_role_oid call.
		 */
		bool missingOK = false;
		Oid allowedUserId = get_role_oid(EnableManualMetadataChangesForUser, missingOK);
		if (allowedUserId == GetUserId())
		{
			return true;
		}
	}

	return false;
}


/*
 * citus_internal_update_placement_metadata is an internal UDF to
 * update a row in pg_dist_placement.
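 *
 * For illustration (hypothetical values): moving the placement of shard
 * 102008 from group 1 to group 2 is recorded with
 *
 *   SELECT pg_catalog.citus_internal_update_placement_metadata(102008, 1, 2)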
*/
Datum
citus_internal_update_placement_metadata(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	int64 shardId = PG_GETARG_INT64(0);
	int32 sourceGroupId = PG_GETARG_INT32(1);
	int32 targetGroupId = PG_GETARG_INT32(2);

	ShardPlacement *placement = NULL;
	if (!ShouldSkipMetadataChecks())
	{
		/* this UDF is not allowed to be executed as a separate command */
		EnsureCoordinatorInitiatedOperation();

		if (!ShardExists(shardId))
		{
			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							errmsg("Shard id does not exist: %ld", shardId)));
		}

		bool missingOk = false;
		EnsureShardOwner(shardId, missingOk);

		/*
		 * This function ensures that the source group exists, hence we
		 * call it from this code block.
		 */
		placement = ActiveShardPlacementOnGroup(sourceGroupId, shardId);

		bool nodeIsInMetadata = false;
		WorkerNode *workerNode =
			PrimaryNodeForGroup(targetGroupId, &nodeIsInMetadata);
		if (!workerNode)
		{
			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							errmsg("Node with group id %d for shard placement "
								   "%ld does not exist", targetGroupId, shardId)));
		}
	}
	else
	{
		placement = ActiveShardPlacementOnGroup(sourceGroupId, shardId);
	}

	/*
	 * Updating pg_dist_placement ensures that the node with targetGroupId
	 * exists and this is the only placement on that group.
	 */
	if (placement == NULL)
	{
		ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("Active placement for shard %ld is not "
							   "found on group:%d", shardId, targetGroupId)));
	}

	UpdatePlacementGroupId(placement->placementId, targetGroupId);

	PG_RETURN_VOID();
}


/*
 * citus_internal_delete_shard_metadata is an internal UDF to
 * delete a row in pg_dist_shard and corresponding placement rows
 * from pg_dist_shard_placement.
 */
Datum
citus_internal_delete_shard_metadata(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	int64 shardId = PG_GETARG_INT64(0);

	if (!ShouldSkipMetadataChecks())
	{
		/* this UDF is not allowed to be executed as a separate command */
		EnsureCoordinatorInitiatedOperation();

		if (!ShardExists(shardId))
		{
			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							errmsg("Shard id does not exist: %ld", shardId)));
		}

		bool missingOk = false;
		EnsureShardOwner(shardId, missingOk);
	}

	List *shardPlacementList = ShardPlacementList(shardId);
	ShardPlacement *shardPlacement = NULL;
	foreach_ptr(shardPlacement, shardPlacementList)
	{
		DeleteShardPlacementRow(shardPlacement->placementId);
	}

	DeleteShardRow(shardId);

	PG_RETURN_VOID();
}


/*
 * citus_internal_update_relation_colocation is an internal UDF to
 * update the colocation id of the given relation in pg_dist_partition.
*/
Datum
citus_internal_update_relation_colocation(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);

	Oid relationId = PG_GETARG_OID(0);
	uint32 targetColocationId = PG_GETARG_UINT32(1);

	EnsureTableOwner(relationId);

	if (!ShouldSkipMetadataChecks())
	{
		/* this UDF is not allowed to be executed as a separate command */
		EnsureCoordinatorInitiatedOperation();

		/* ensure that the table is in pg_dist_partition */
		char partitionMethod = PartitionMethodViaCatalog(relationId);
		if (partitionMethod == DISTRIBUTE_BY_INVALID)
		{
			/* connection from the coordinator operating on a shard */
			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							errmsg("The relation \"%s\" does not have a valid "
								   "entry in pg_dist_partition.",
								   get_rel_name(relationId))));
		}
		else if (!IsCitusTableType(relationId, HASH_DISTRIBUTED) &&
				 !IsCitusTableType(relationId, SINGLE_SHARD_DISTRIBUTED))
		{
			/* connection from the coordinator operating on a shard */
			ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							errmsg("Updating colocation ids is only allowed for hash "
								   "and single shard distributed tables: %c",
								   partitionMethod)));
		}

		int count = 1;
		List *targetColocatedTableList =
			ColocationGroupTableList(targetColocationId, count);

		if (list_length(targetColocatedTableList) == 0)
		{
			/* the table is colocated with none, so nothing to check */
		}
		else
		{
			Oid targetRelationId = linitial_oid(targetColocatedTableList);

			ErrorIfShardPlacementsNotColocated(relationId, targetRelationId);
			CheckReplicationModel(relationId, targetRelationId);
			CheckDistributionColumnType(relationId, targetRelationId);
		}
	}

	bool localOnly = true;
	UpdateRelationColocationGroup(relationId, targetColocationId, localOnly);

	PG_RETURN_VOID();
}


/*
 * citus_internal_add_colocation_metadata is an internal UDF to
 * add a row to pg_dist_colocation.
 */
Datum
citus_internal_add_colocation_metadata(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	EnsureSuperUser();

	int colocationId = PG_GETARG_INT32(0);
	int shardCount = PG_GETARG_INT32(1);
	int replicationFactor = PG_GETARG_INT32(2);
	Oid distributionColumnType = PG_GETARG_INT32(3);
	Oid distributionColumnCollation = PG_GETARG_INT32(4);

	if (!ShouldSkipMetadataChecks())
	{
		/* this UDF is not allowed to be executed as a separate command */
		EnsureCoordinatorInitiatedOperation();
	}

	InsertColocationGroupLocally(colocationId, shardCount, replicationFactor,
								 distributionColumnType,
								 distributionColumnCollation);

	PG_RETURN_VOID();
}


/*
 * citus_internal_delete_colocation_metadata is an internal UDF to
 * delete a row from pg_dist_colocation.
 */
Datum
citus_internal_delete_colocation_metadata(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	EnsureSuperUser();

	int colocationId = PG_GETARG_INT32(0);

	if (!ShouldSkipMetadataChecks())
	{
		/* this UDF is not allowed to be executed as a separate command */
		EnsureCoordinatorInitiatedOperation();
	}

	DeleteColocationGroupLocally(colocationId);

	PG_RETURN_VOID();
}


/*
 * citus_internal_add_tenant_schema is an internal UDF to
 * call InsertTenantSchemaLocally on a remote node.
 *
 * None of the parameters are allowed to be NULL. To set the colocation
 * id to NULL in metadata, use INVALID_COLOCATION_ID.
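 *
 * For illustration (hypothetical values), the command generated by
 * TenantSchemaInsertCommand() below looks roughly like
 *
 *   SELECT pg_catalog.citus_internal_add_tenant_schema('tenant_1'::regnamespace, 1234)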
*/ Datum citus_internal_add_tenant_schema(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); PG_ENSURE_ARGNOTNULL(0, "schema_id"); Oid schemaId = PG_GETARG_OID(0); PG_ENSURE_ARGNOTNULL(1, "colocation_id"); uint32 colocationId = PG_GETARG_INT32(1); InsertTenantSchemaLocally(schemaId, colocationId); PG_RETURN_VOID(); } /* * citus_internal_delete_tenant_schema is an internal UDF to * call DeleteTenantSchemaLocally on a remote node. * * The schemaId parameter is not allowed to be NULL. Morever, input schema is * expected to be dropped already because this function is called from Citus * drop hook and only used to clean up metadata after the schema is dropped. */ Datum citus_internal_delete_tenant_schema(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); PG_ENSURE_ARGNOTNULL(0, "schema_id"); Oid schemaId = PG_GETARG_OID(0); DeleteTenantSchemaLocally(schemaId); PG_RETURN_VOID(); } /* * citus_internal_update_none_dist_table_metadata is an internal UDF to * update a row in pg_dist_partition that belongs to given none-distributed * table. */ Datum citus_internal_update_none_dist_table_metadata(PG_FUNCTION_ARGS) { CheckCitusVersion(ERROR); PG_ENSURE_ARGNOTNULL(0, "relation_id"); Oid relationId = PG_GETARG_OID(0); PG_ENSURE_ARGNOTNULL(1, "replication_model"); char replicationModel = PG_GETARG_CHAR(1); PG_ENSURE_ARGNOTNULL(2, "colocation_id"); uint32 colocationId = PG_GETARG_INT32(2); PG_ENSURE_ARGNOTNULL(3, "auto_converted"); bool autoConverted = PG_GETARG_BOOL(3); if (!ShouldSkipMetadataChecks()) { EnsureCoordinatorInitiatedOperation(); } UpdateNoneDistTableMetadata(relationId, replicationModel, colocationId, autoConverted); PG_RETURN_VOID(); } /* * SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker. */ void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount, int replicationFactor, Oid distributionColumnType, Oid distributionColumnCollation) { char *command = ColocationGroupCreateCommand(colocationId, shardCount, replicationFactor, distributionColumnType, distributionColumnCollation); /* * We require superuser for all pg_dist_colocation operations because we have * no reasonable way of restricting access. */ SendCommandToWorkersWithMetadataViaSuperUser(command); } /* * ColocationGroupCreateCommand returns a command for creating a colocation group. */ static char * ColocationGroupCreateCommand(uint32 colocationId, int shardCount, int replicationFactor, Oid distributionColumnType, Oid distributionColumnCollation) { StringInfo insertColocationCommand = makeStringInfo(); appendStringInfo(insertColocationCommand, "SELECT pg_catalog.citus_internal_add_colocation_metadata(" "%d, %d, %d, %s, %s)", colocationId, shardCount, replicationFactor, RemoteTypeIdExpression(distributionColumnType), RemoteCollationIdExpression(distributionColumnCollation)); return insertColocationCommand->data; } /* * RemoteTypeIdExpression returns an expression in text form that can * be used to obtain the OID of a type on a different node when included * in a query string. */ static char * RemoteTypeIdExpression(Oid typeId) { /* by default, use 0 (InvalidOid) */ char *expression = "0"; /* we also have pg_dist_colocation entries for reference tables */ if (typeId != InvalidOid) { char *typeName = format_type_extended(typeId, -1, FORMAT_TYPE_FORCE_QUALIFY | FORMAT_TYPE_ALLOW_INVALID); /* format_type_extended returns ??? 
in case of an unknown type */ if (strcmp(typeName, "???") != 0) { StringInfo regtypeExpression = makeStringInfo(); appendStringInfo(regtypeExpression, "%s::regtype", quote_literal_cstr(typeName)); expression = regtypeExpression->data; } } return expression; } /* * RemoteCollationIdExpression returns an expression in text form that can * be used to obtain the OID of a collation on a different node when included * in a query string. */ static char * RemoteCollationIdExpression(Oid colocationId) { /* by default, use 0 (InvalidOid) */ char *expression = "0"; if (colocationId != InvalidOid) { Datum collationIdDatum = ObjectIdGetDatum(colocationId); HeapTuple collationTuple = SearchSysCache1(COLLOID, collationIdDatum); if (HeapTupleIsValid(collationTuple)) { Form_pg_collation collationform = (Form_pg_collation) GETSTRUCT(collationTuple); char *collationName = NameStr(collationform->collname); char *collationSchemaName = get_namespace_name(collationform->collnamespace); char *qualifiedCollationName = quote_qualified_identifier(collationSchemaName, collationName); StringInfo regcollationExpression = makeStringInfo(); appendStringInfo(regcollationExpression, "%s::regcollation", quote_literal_cstr(qualifiedCollationName)); expression = regcollationExpression->data; } ReleaseSysCache(collationTuple); } return expression; } /* * SyncDeleteColocationGroupToNodes deletes a pg_dist_colocation record from workers. */ void SyncDeleteColocationGroupToNodes(uint32 colocationId) { char *command = ColocationGroupDeleteCommand(colocationId); /* * We require superuser for all pg_dist_colocation operations because we have * no reasonable way of restricting access. */ SendCommandToWorkersWithMetadataViaSuperUser(command); } /* * ColocationGroupDeleteCommand returns a command for deleting a colocation group. */ static char * ColocationGroupDeleteCommand(uint32 colocationId) { StringInfo deleteColocationCommand = makeStringInfo(); appendStringInfo(deleteColocationCommand, "SELECT pg_catalog.citus_internal_delete_colocation_metadata(%d)", colocationId); return deleteColocationCommand->data; } /* * TenantSchemaInsertCommand returns a command to call * citus_internal_add_tenant_schema(). */ char * TenantSchemaInsertCommand(Oid schemaId, uint32 colocationId) { StringInfo command = makeStringInfo(); appendStringInfo(command, "SELECT pg_catalog.citus_internal_add_tenant_schema(%s, %u)", RemoteSchemaIdExpressionById(schemaId), colocationId); return command->data; } /* * TenantSchemaDeleteCommand returns a command to call * citus_internal_delete_tenant_schema(). */ char * TenantSchemaDeleteCommand(char *schemaName) { StringInfo command = makeStringInfo(); appendStringInfo(command, "SELECT pg_catalog.citus_internal_delete_tenant_schema(%s)", RemoteSchemaIdExpressionByName(schemaName)); return command->data; } /* * UpdateNoneDistTableMetadataCommand returns a command to call * citus_internal_update_none_dist_table_metadata(). */ char * UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel, uint32 colocationId, bool autoConverted) { StringInfo command = makeStringInfo(); appendStringInfo(command, "SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(%s, '%c', %u, %s)", RemoteTableIdExpression(relationId), replicationModel, colocationId, autoConverted ? "true" : "false"); return command->data; } /* * AddPlacementMetadataCommand returns a command to call * citus_internal_add_placement_metadata(). 
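 *
 * For illustration (hypothetical values), the returned string looks like
 *
 *   SELECT citus_internal_add_placement_metadata(102008, 0, 2, 100070)
 *
 * i.e. shard id, shard length, group id and placement id, in that order.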
*/ char * AddPlacementMetadataCommand(uint64 shardId, uint64 placementId, uint64 shardLength, int32 groupId) { StringInfo command = makeStringInfo(); appendStringInfo(command, "SELECT citus_internal_add_placement_metadata(%ld, %ld, %d, %ld)", shardId, shardLength, groupId, placementId); return command->data; } /* * DeletePlacementMetadataCommand returns a command to call * citus_internal_delete_placement_metadata(). */ char * DeletePlacementMetadataCommand(uint64 placementId) { StringInfo command = makeStringInfo(); appendStringInfo(command, "SELECT pg_catalog.citus_internal_delete_placement_metadata(%ld)", placementId); return command->data; } /* * RemoteSchemaIdExpressionById returns an expression in text form that * can be used to obtain the OID of the schema with given schema id on a * different node when included in a query string. */ static char * RemoteSchemaIdExpressionById(Oid schemaId) { char *schemaName = get_namespace_name(schemaId); if (schemaName == NULL) { ereport(ERROR, (errmsg("schema with OID %u does not exist", schemaId))); } return RemoteSchemaIdExpressionByName(schemaName); } /* * RemoteSchemaIdExpressionByName returns an expression in text form that * can be used to obtain the OID of the schema with given schema name on a * different node when included in a query string. */ static char * RemoteSchemaIdExpressionByName(char *schemaName) { StringInfo regnamespaceExpr = makeStringInfo(); appendStringInfo(regnamespaceExpr, "%s::regnamespace", quote_literal_cstr(quote_identifier(schemaName))); return regnamespaceExpr->data; } /* * RemoteTableIdExpression returns an expression in text form that * can be used to obtain the OID of given table on a different node * when included in a query string. */ static char * RemoteTableIdExpression(Oid relationId) { StringInfo regclassExpr = makeStringInfo(); appendStringInfo(regclassExpr, "%s::regclass", quote_literal_cstr(generate_qualified_relation_name(relationId))); return regclassExpr->data; } /* * SetMetadataSyncNodesFromNodeList sets list of nodes that needs to be metadata * synced among given node list into metadataSyncContext. */ void SetMetadataSyncNodesFromNodeList(MetadataSyncContext *context, List *nodeList) { /* sync is disabled, then no nodes to sync */ if (!EnableMetadataSync) { return; } List *activatedWorkerNodeList = NIL; WorkerNode *node = NULL; foreach_ptr(node, nodeList) { if (NodeIsPrimary(node)) { /* warn if we have coordinator in nodelist */ if (NodeIsCoordinator(node)) { ereport(NOTICE, (errmsg("%s:%d is the coordinator and already contains " "metadata, skipping syncing the metadata", node->workerName, node->workerPort))); continue; } activatedWorkerNodeList = lappend(activatedWorkerNodeList, node); } } context->activatedWorkerNodeList = activatedWorkerNodeList; } /* * EstablishAndSetMetadataSyncBareConnections establishes and sets * connections used throughout nontransactional metadata sync. 
*/ void EstablishAndSetMetadataSyncBareConnections(MetadataSyncContext *context) { Assert(MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL); int connectionFlags = REQUIRE_METADATA_CONNECTION; /* establish bare connections to activated worker nodes */ List *bareConnectionList = NIL; WorkerNode *node = NULL; foreach_ptr(node, context->activatedWorkerNodeList) { MultiConnection *connection = GetNodeUserDatabaseConnection(connectionFlags, node->workerName, node->workerPort, CurrentUserName(), NULL); Assert(connection != NULL); ForceConnectionCloseAtTransactionEnd(connection); bareConnectionList = lappend(bareConnectionList, connection); } context->activatedWorkerBareConnections = bareConnectionList; } /* * CreateMetadataSyncContext creates a context which contains worker connections * and a MemoryContext to be used throughout the metadata sync. * * If we collect commands, connections will not be established as caller's intent * is to collect sync commands. * * If the nodes are newly added before activation, we would not try to unset * metadatasynced in separate transaction during nontransactional metadatasync. */ MetadataSyncContext * CreateMetadataSyncContext(List *nodeList, bool collectCommands, bool nodesAddedInSameTransaction) { /* should be alive during local transaction during the sync */ MemoryContext context = AllocSetContextCreate(TopTransactionContext, "metadata_sync_context", ALLOCSET_DEFAULT_SIZES); MetadataSyncContext *metadataSyncContext = (MetadataSyncContext *) palloc0( sizeof(MetadataSyncContext)); metadataSyncContext->context = context; metadataSyncContext->transactionMode = MetadataSyncTransMode; metadataSyncContext->collectCommands = collectCommands; metadataSyncContext->collectedCommands = NIL; metadataSyncContext->nodesAddedInSameTransaction = nodesAddedInSameTransaction; /* filter the nodes that needs to be activated from given node list */ SetMetadataSyncNodesFromNodeList(metadataSyncContext, nodeList); /* * establish connections only for nontransactional mode to prevent connection * open-close for each command */ if (!collectCommands && MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL) { EstablishAndSetMetadataSyncBareConnections(metadataSyncContext); } /* use 2PC coordinated transactions if we operate in transactional mode */ if (MetadataSyncTransMode == METADATA_SYNC_TRANSACTIONAL) { Use2PCForCoordinatedTransaction(); } return metadataSyncContext; } /* * ResetMetadataSyncMemoryContext resets memory context inside metadataSyncContext, if * we are not collecting commands. */ void ResetMetadataSyncMemoryContext(MetadataSyncContext *context) { if (!MetadataSyncCollectsCommands(context)) { MemoryContextReset(context->context); } } /* * MetadataSyncCollectsCommands returns whether context is used for collecting * commands instead of sending them to workers. */ bool MetadataSyncCollectsCommands(MetadataSyncContext *context) { return context->collectCommands; } /* * SendOrCollectCommandListToActivatedNodes sends the commands to the activated nodes with * bare connections inside metadatacontext or via coordinated connections. * Note that when context only collects commands, we add commands into the context * without sending the commands. */ void SendOrCollectCommandListToActivatedNodes(MetadataSyncContext *context, List *commands) { /* do nothing if no commands */ if (commands == NIL) { return; } /* * do not send any command to workers if we collect commands. * Collect commands into metadataSyncContext's collected command * list. 
	 */
	if (MetadataSyncCollectsCommands(context))
	{
		context->collectedCommands = list_concat(context->collectedCommands, commands);
		return;
	}

	/* send commands to new workers, the current user should be a superuser */
	Assert(superuser());

	if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL)
	{
		List *workerNodes = context->activatedWorkerNodeList;
		SendMetadataCommandListToWorkerListInCoordinatedTransaction(workerNodes,
																	CurrentUserName(),
																	commands);
	}
	else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL)
	{
		List *workerConnections = context->activatedWorkerBareConnections;
		SendCommandListToWorkerListWithBareConnections(workerConnections, commands);
	}
	else
	{
		pg_unreachable();
	}
}


/*
 * SendOrCollectCommandListToMetadataNodes sends the commands to the metadata nodes with
 * bare connections inside metadataSyncContext or via coordinated connections.
 * Note that when context only collects commands, we add commands into the context
 * without sending the commands.
 */
void
SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context, List *commands)
{
	/*
	 * Do not send any command to workers if we collect commands.
	 * Collect commands into metadataSyncContext's collected command
	 * list.
	 */
	if (MetadataSyncCollectsCommands(context))
	{
		context->collectedCommands = list_concat(context->collectedCommands, commands);
		return;
	}

	/* send commands to new workers, the current user should be a superuser */
	Assert(superuser());

	if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL)
	{
		List *metadataNodes = TargetWorkerSetNodeList(NON_COORDINATOR_METADATA_NODES,
													  RowShareLock);
		SendMetadataCommandListToWorkerListInCoordinatedTransaction(metadataNodes,
																	CurrentUserName(),
																	commands);
	}
	else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL)
	{
		SendBareCommandListToMetadataWorkers(commands);
	}
	else
	{
		pg_unreachable();
	}
}


/*
 * SendOrCollectCommandListToSingleNode sends the commands to the specific worker
 * indexed by nodeIdx with a bare connection inside metadataSyncContext or via a
 * coordinated connection. Note that when context only collects commands, we add
 * commands into the context without sending the commands.
 */
void
SendOrCollectCommandListToSingleNode(MetadataSyncContext *context, List *commands,
									 int nodeIdx)
{
	/*
	 * Do not send any command to workers if we collect commands.
	 * Collect commands into metadataSyncContext's collected command
	 * list.
	 */
	if (MetadataSyncCollectsCommands(context))
	{
		context->collectedCommands = list_concat(context->collectedCommands, commands);
		return;
	}

	/* send commands to new workers, the current user should be a superuser */
	Assert(superuser());

	if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL)
	{
		List *workerNodes = context->activatedWorkerNodeList;
		Assert(nodeIdx < list_length(workerNodes));

		WorkerNode *node = list_nth(workerNodes, nodeIdx);
		SendMetadataCommandListToWorkerListInCoordinatedTransaction(list_make1(node),
																	CurrentUserName(),
																	commands);
	}
	else if (context->transactionMode == METADATA_SYNC_NON_TRANSACTIONAL)
	{
		List *workerConnections = context->activatedWorkerBareConnections;
		Assert(nodeIdx < list_length(workerConnections));

		MultiConnection *workerConnection = list_nth(workerConnections, nodeIdx);
		List *connectionList = list_make1(workerConnection);
		SendCommandListToWorkerListWithBareConnections(connectionList, commands);
	}
	else
	{
		pg_unreachable();
	}
}


/*
 * WorkerDropAllShellTablesCommand returns the command required to drop shell tables
 * from workers. When singleTransaction is false, we create a transaction per shell
 * table. Otherwise, we drop all shell tables within a single transaction.
 */
char *
WorkerDropAllShellTablesCommand(bool singleTransaction)
{
	char *singleTransactionString = (singleTransaction) ? "true" : "false";
	StringInfo removeAllShellTablesCommand = makeStringInfo();
	appendStringInfo(removeAllShellTablesCommand, WORKER_DROP_ALL_SHELL_TABLES,
					 singleTransactionString);
	return removeAllShellTablesCommand->data;
}


/*
 * WorkerDropSequenceDependencyCommand returns the command to drop sequence
 * dependencies for the given table.
 */
char *
WorkerDropSequenceDependencyCommand(Oid relationId)
{
	char *qualifiedTableName = generate_qualified_relation_name(relationId);
	StringInfo breakSequenceDepCommand = makeStringInfo();
	appendStringInfo(breakSequenceDepCommand,
					 BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND,
					 quote_literal_cstr(qualifiedTableName));
	return breakSequenceDepCommand->data;
}


/*
 * PropagateNodeWideObjectsCommandList is called during node activation to
 * propagate any object that should be propagated for every node. These are
 * generally not linked to any distributed object but change system-wide behaviour.
 */
static List *
PropagateNodeWideObjectsCommandList(void)
{
	/* collect all commands */
	List *ddlCommands = NIL;

	if (EnableAlterRoleSetPropagation)
	{
		/*
		 * Get commands for database and postgres wide settings. Since these settings
		 * are not linked to any role that can be distributed, we need to distribute
		 * them separately.
		 */
		List *alterRoleSetCommands = GenerateAlterRoleSetCommandForRole(InvalidOid);
		ddlCommands = list_concat(ddlCommands, alterRoleSetCommands);
	}

	return ddlCommands;
}


/*
 * SyncDistributedObjects syncs the distributed objects to the nodes in
 * metadataSyncContext with transactional or nontransactional mode according to
 * transactionMode inside metadataSyncContext.
 *
 * Transactions should be ordered like below:
 * - Nodewide objects (only roles for now)
 * - Deletion of sequence and shell tables and metadata entries
 * - All dependencies (e.g., types, schemas, sequences) and all shell distributed
 *   tables and their pg_dist_xx metadata entries
 * - Inter-relations between those shell tables
 *
 * Note that we do not create the distributed dependencies on the coordinator
 * since all the dependencies should be present in the coordinator already.
 */
void
SyncDistributedObjects(MetadataSyncContext *context)
{
	if (context->activatedWorkerNodeList == NIL)
	{
		return;
	}

	EnsureSequentialModeMetadataOperations();

	Assert(ShouldPropagate());

	/* Send systemwide objects, only roles for now */
	SendNodeWideObjectsSyncCommands(context);

	/*
	 * Break dependencies between sequences and shell tables, then remove shell
	 * tables and metadata entries, respectively.
	 * We should delete shell tables before metadata entries as we look inside
	 * pg_dist_partition to figure out shell tables.
	 */
	SendShellTableDeletionCommands(context);
	SendMetadataDeletionCommands(context);

	/*
	 * Commands to insert pg_dist_colocation entries.
	 * Replicating dist objects and their metadata depends on this step.
	 */
	SendColocationMetadataCommands(context);

	/*
	 * Replicate all objects of the pg_dist_object to the remote node and
	 * create metadata entries for Citus tables (pg_dist_shard, pg_dist_shard_placement,
	 * pg_dist_partition, pg_dist_object).
	 */
	SendDependencyCreationCommands(context);
	SendDistTableMetadataCommands(context);
	SendDistObjectCommands(context);

	/*
	 * Commands to insert pg_dist_schema entries.
	 *
	 * Need to be done after syncing distributed objects because the schemas
	 * need to exist on the worker.
*/ SendTenantSchemaMetadataCommands(context); /* * After creating each table, handle the inter table relationship between * those tables. */ SendInterTableRelationshipCommands(context); } /* * SendNodeWideObjectsSyncCommands sends systemwide objects to workers with * transactional or nontransactional mode according to transactionMode inside * metadataSyncContext. */ void SendNodeWideObjectsSyncCommands(MetadataSyncContext *context) { /* propagate node wide objects. It includes only roles for now. */ List *commandList = PropagateNodeWideObjectsCommandList(); if (commandList == NIL) { return; } commandList = lcons(DISABLE_DDL_PROPAGATION, commandList); commandList = lappend(commandList, ENABLE_DDL_PROPAGATION); SendOrCollectCommandListToActivatedNodes(context, commandList); } /* * SendShellTableDeletionCommands sends sequence, and shell table deletion * commands to workers with transactional or nontransactional mode according to * transactionMode inside metadataSyncContext. */ void SendShellTableDeletionCommands(MetadataSyncContext *context) { /* break all sequence deps for citus tables */ char *breakSeqDepsCommand = BREAK_ALL_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND; SendOrCollectCommandListToActivatedNodes(context, list_make1(breakSeqDepsCommand)); /* remove shell tables */ bool singleTransaction = (context->transactionMode == METADATA_SYNC_TRANSACTIONAL); char *dropShellTablesCommand = WorkerDropAllShellTablesCommand(singleTransaction); SendOrCollectCommandListToActivatedNodes(context, list_make1(dropShellTablesCommand)); } /* * SendMetadataDeletionCommands sends metadata entry deletion commands to workers * with transactional or nontransactional mode according to transactionMode inside * metadataSyncContext. */ void SendMetadataDeletionCommands(MetadataSyncContext *context) { /* remove pg_dist_partition entries */ SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_PARTITIONS)); /* remove pg_dist_shard entries */ SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_SHARDS)); /* remove pg_dist_placement entries */ SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_PLACEMENTS)); /* remove pg_dist_object entries */ SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_DISTRIBUTED_OBJECTS)); /* remove pg_dist_colocation entries */ SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_COLOCATION)); /* remove pg_dist_schema entries */ SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_TENANT_SCHEMAS)); } /* * SendColocationMetadataCommands sends colocation metadata with transactional or * nontransactional mode according to transactionMode inside metadataSyncContext. 
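 *
 * For illustration, each pg_dist_colocation row is turned into a command of
 * roughly the following shape (the values here are made up):
 *
 *   WITH colocation_group_data (colocationid, shardcount, replicationfactor,
 *                               distributioncolumntype,
 *                               distributioncolumncollationname,
 *                               distributioncolumncollationschema)
 *     AS (VALUES (3, 32, 1, 'integer'::regtype, NULL, NULL))
 *   SELECT pg_catalog.citus_internal_add_colocation_metadata(
 *            colocationid, shardcount, replicationfactor,
 *            distributioncolumntype, coalesce(c.oid, 0))
 *   FROM colocation_group_data d LEFT JOIN pg_collation c
 *     ON (d.distributioncolumncollationname = c.collname
 *         AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)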
*/ void SendColocationMetadataCommands(MetadataSyncContext *context) { ScanKeyData scanKey[1]; int scanKeyCount = 0; Relation relation = table_open(DistColocationRelationId(), AccessShareLock); SysScanDesc scanDesc = systable_beginscan(relation, InvalidOid, false, NULL, scanKeyCount, scanKey); MemoryContext oldContext = MemoryContextSwitchTo(context->context); HeapTuple nextTuple = NULL; while (true) { ResetMetadataSyncMemoryContext(context); nextTuple = systable_getnext(scanDesc); if (!HeapTupleIsValid(nextTuple)) { break; } StringInfo colocationGroupCreateCommand = makeStringInfo(); appendStringInfo(colocationGroupCreateCommand, "WITH colocation_group_data (colocationid, shardcount, " "replicationfactor, distributioncolumntype, " "distributioncolumncollationname, " "distributioncolumncollationschema) AS (VALUES "); Form_pg_dist_colocation colocationForm = (Form_pg_dist_colocation) GETSTRUCT(nextTuple); appendStringInfo(colocationGroupCreateCommand, "(%d, %d, %d, %s, ", colocationForm->colocationid, colocationForm->shardcount, colocationForm->replicationfactor, RemoteTypeIdExpression(colocationForm->distributioncolumntype)); /* * For collations, include the names in the VALUES section and then * join with pg_collation. */ Oid distributionColumCollation = colocationForm->distributioncolumncollation; if (distributionColumCollation != InvalidOid) { Datum collationIdDatum = ObjectIdGetDatum(distributionColumCollation); HeapTuple collationTuple = SearchSysCache1(COLLOID, collationIdDatum); if (HeapTupleIsValid(collationTuple)) { Form_pg_collation collationform = (Form_pg_collation) GETSTRUCT(collationTuple); char *collationName = NameStr(collationform->collname); char *collationSchemaName = get_namespace_name(collationform->collnamespace); appendStringInfo(colocationGroupCreateCommand, "%s, %s)", quote_literal_cstr(collationName), quote_literal_cstr(collationSchemaName)); ReleaseSysCache(collationTuple); } else { appendStringInfo(colocationGroupCreateCommand, "NULL, NULL)"); } } else { appendStringInfo(colocationGroupCreateCommand, "NULL, NULL)"); } appendStringInfo(colocationGroupCreateCommand, ") SELECT pg_catalog.citus_internal_add_colocation_metadata(" "colocationid, shardcount, replicationfactor, " "distributioncolumntype, coalesce(c.oid, 0)) " "FROM colocation_group_data d LEFT JOIN pg_collation c " "ON (d.distributioncolumncollationname = c.collname " "AND d.distributioncolumncollationschema::regnamespace" " = c.collnamespace)"); List *commandList = list_make1(colocationGroupCreateCommand->data); SendOrCollectCommandListToActivatedNodes(context, commandList); } MemoryContextSwitchTo(oldContext); systable_endscan(scanDesc); table_close(relation, AccessShareLock); } /* * SendTenantSchemaMetadataCommands sends tenant schema metadata entries with * transactional or nontransactional mode according to transactionMode inside * metadataSyncContext. 
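 *
 * For illustration (made-up schema name and colocation id), each pg_dist_schema
 * row is turned into a command of the form:
 *
 *   SELECT pg_catalog.citus_internal_add_tenant_schema('tenant1'::regnamespace, 5)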
*/ void SendTenantSchemaMetadataCommands(MetadataSyncContext *context) { ScanKeyData scanKey[1]; int scanKeyCount = 0; Relation pgDistTenantSchema = table_open(DistTenantSchemaRelationId(), AccessShareLock); SysScanDesc scanDesc = systable_beginscan(pgDistTenantSchema, InvalidOid, false, NULL, scanKeyCount, scanKey); MemoryContext oldContext = MemoryContextSwitchTo(context->context); HeapTuple heapTuple = NULL; while (true) { ResetMetadataSyncMemoryContext(context); heapTuple = systable_getnext(scanDesc); if (!HeapTupleIsValid(heapTuple)) { break; } Form_pg_dist_schema tenantSchemaForm = (Form_pg_dist_schema) GETSTRUCT(heapTuple); StringInfo insertTenantSchemaCommand = makeStringInfo(); appendStringInfo(insertTenantSchemaCommand, "SELECT pg_catalog.citus_internal_add_tenant_schema(%s, %u)", RemoteSchemaIdExpressionById(tenantSchemaForm->schemaid), tenantSchemaForm->colocationid); List *commandList = list_make1(insertTenantSchemaCommand->data); SendOrCollectCommandListToActivatedNodes(context, commandList); } MemoryContextSwitchTo(oldContext); systable_endscan(scanDesc); table_close(pgDistTenantSchema, AccessShareLock); } /* * SendDependencyCreationCommands sends dependency creation commands to workers * with transactional or nontransactional mode according to transactionMode * inside metadataSyncContext. */ void SendDependencyCreationCommands(MetadataSyncContext *context) { /* disable ddl propagation */ SendOrCollectCommandListToActivatedNodes(context, list_make1(DISABLE_DDL_PROPAGATION)); MemoryContext oldContext = MemoryContextSwitchTo(context->context); /* collect all dependencies in creation order and get their ddl commands */ List *dependencies = GetDistributedObjectAddressList(); /* * Depending on changes in the environment, such as the enable_metadata_sync guc * there might be objects in the distributed object address list that should currently * not be propagated by citus as they are 'not supported'. */ dependencies = FilterObjectAddressListByPredicate(dependencies, &SupportedDependencyByCitus); dependencies = OrderObjectAddressListInDependencyOrder(dependencies); /* * We need to create a subcontext as we reset the context after each dependency * creation but we want to preserve all dependency objects at metadataSyncContext. */ MemoryContext commandsContext = AllocSetContextCreate(context->context, "dependency commands context", ALLOCSET_DEFAULT_SIZES); MemoryContextSwitchTo(commandsContext); ObjectAddress *dependency = NULL; foreach_ptr(dependency, dependencies) { if (!MetadataSyncCollectsCommands(context)) { MemoryContextReset(commandsContext); } if (IsAnyObjectAddressOwnedByExtension(list_make1(dependency), NULL)) { /* * We expect extension-owned objects to be created as a result * of the extension being created. */ continue; } /* dependency creation commands */ List *ddlCommands = GetAllDependencyCreateDDLCommands(list_make1(dependency)); SendOrCollectCommandListToActivatedNodes(context, ddlCommands); } MemoryContextSwitchTo(oldContext); if (!MetadataSyncCollectsCommands(context)) { MemoryContextDelete(commandsContext); } ResetMetadataSyncMemoryContext(context); /* enable ddl propagation */ SendOrCollectCommandListToActivatedNodes(context, list_make1(ENABLE_DDL_PROPAGATION)); } /* * SendDistTableMetadataCommands sends commands related to pg_dist_shard and, * pg_dist_shard_placement entries to workers with transactional or nontransactional * mode according to transactionMode inside metadataSyncContext. 
*/ void SendDistTableMetadataCommands(MetadataSyncContext *context) { ScanKeyData scanKey[1]; int scanKeyCount = 0; Relation relation = table_open(DistPartitionRelationId(), AccessShareLock); TupleDesc tupleDesc = RelationGetDescr(relation); SysScanDesc scanDesc = systable_beginscan(relation, InvalidOid, false, NULL, scanKeyCount, scanKey); MemoryContext oldContext = MemoryContextSwitchTo(context->context); HeapTuple nextTuple = NULL; while (true) { ResetMetadataSyncMemoryContext(context); nextTuple = systable_getnext(scanDesc); if (!HeapTupleIsValid(nextTuple)) { break; } /* * Create Citus table metadata commands (pg_dist_shard, pg_dist_shard_placement, * pg_dist_partition). Only Citus tables have shard metadata. */ Oid relationId = FetchRelationIdFromPgPartitionHeapTuple(nextTuple, tupleDesc); if (!ShouldSyncTableMetadata(relationId)) { continue; } List *commandList = CitusTableMetadataCreateCommandList(relationId); SendOrCollectCommandListToActivatedNodes(context, commandList); } MemoryContextSwitchTo(oldContext); systable_endscan(scanDesc); table_close(relation, AccessShareLock); } /* * SendDistObjectCommands sends commands related to pg_dist_object entries to * workers with transactional or nontransactional mode according to transactionMode * inside metadataSyncContext. */ void SendDistObjectCommands(MetadataSyncContext *context) { ScanKeyData scanKey[1]; int scanKeyCount = 0; Relation relation = table_open(DistObjectRelationId(), AccessShareLock); TupleDesc tupleDesc = RelationGetDescr(relation); SysScanDesc scanDesc = systable_beginscan(relation, InvalidOid, false, NULL, scanKeyCount, scanKey); MemoryContext oldContext = MemoryContextSwitchTo(context->context); HeapTuple nextTuple = NULL; while (true) { ResetMetadataSyncMemoryContext(context); nextTuple = systable_getnext(scanDesc); if (!HeapTupleIsValid(nextTuple)) { break; } Form_pg_dist_object pg_dist_object = (Form_pg_dist_object) GETSTRUCT(nextTuple); ObjectAddress *address = palloc(sizeof(ObjectAddress)); ObjectAddressSubSet(*address, pg_dist_object->classid, pg_dist_object->objid, pg_dist_object->objsubid); bool distributionArgumentIndexIsNull = false; Datum distributionArgumentIndexDatum = heap_getattr(nextTuple, Anum_pg_dist_object_distribution_argument_index, tupleDesc, &distributionArgumentIndexIsNull); int32 distributionArgumentIndex = DatumGetInt32(distributionArgumentIndexDatum); bool colocationIdIsNull = false; Datum colocationIdDatum = heap_getattr(nextTuple, Anum_pg_dist_object_colocationid, tupleDesc, &colocationIdIsNull); int32 colocationId = DatumGetInt32(colocationIdDatum); bool forceDelegationIsNull = false; Datum forceDelegationDatum = heap_getattr(nextTuple, Anum_pg_dist_object_force_delegation, tupleDesc, &forceDelegationIsNull); bool forceDelegation = DatumGetBool(forceDelegationDatum); if (distributionArgumentIndexIsNull) { distributionArgumentIndex = INVALID_DISTRIBUTION_ARGUMENT_INDEX; } if (colocationIdIsNull) { colocationId = INVALID_COLOCATION_ID; } if (forceDelegationIsNull) { forceDelegation = NO_FORCE_PUSHDOWN; } char *workerMetadataUpdateCommand = MarkObjectsDistributedCreateCommand(list_make1(address), list_make1_int(distributionArgumentIndex), list_make1_int(colocationId), list_make1_int(forceDelegation)); SendOrCollectCommandListToActivatedNodes(context, list_make1(workerMetadataUpdateCommand)); } MemoryContextSwitchTo(oldContext); systable_endscan(scanDesc); relation_close(relation, NoLock); } /* * SendInterTableRelationshipCommands sends inter-table relationship commands * (e.g. 
constraints, attach partitions) to workers with transactional or * nontransactional mode per inter table relationship according to transactionMode * inside metadataSyncContext. */ void SendInterTableRelationshipCommands(MetadataSyncContext *context) { /* disable ddl propagation */ SendOrCollectCommandListToActivatedNodes(context, list_make1(DISABLE_DDL_PROPAGATION)); ScanKeyData scanKey[1]; int scanKeyCount = 0; Relation relation = table_open(DistPartitionRelationId(), AccessShareLock); TupleDesc tupleDesc = RelationGetDescr(relation); SysScanDesc scanDesc = systable_beginscan(relation, InvalidOid, false, NULL, scanKeyCount, scanKey); MemoryContext oldContext = MemoryContextSwitchTo(context->context); HeapTuple nextTuple = NULL; while (true) { ResetMetadataSyncMemoryContext(context); nextTuple = systable_getnext(scanDesc); if (!HeapTupleIsValid(nextTuple)) { break; } Oid relationId = FetchRelationIdFromPgPartitionHeapTuple(nextTuple, tupleDesc); if (!ShouldSyncTableMetadata(relationId)) { continue; } /* * Skip foreign key and partition creation when the Citus table is * owned by an extension. */ if (IsTableOwnedByExtension(relationId)) { continue; } List *commandList = InterTableRelationshipOfRelationCommandList(relationId); SendOrCollectCommandListToActivatedNodes(context, commandList); } MemoryContextSwitchTo(oldContext); systable_endscan(scanDesc); table_close(relation, AccessShareLock); /* enable ddl propagation */ SendOrCollectCommandListToActivatedNodes(context, list_make1(ENABLE_DDL_PROPAGATION)); }
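

/*
 * ExampleSyncMetadataToWorkerNodes is an illustrative sketch, not part of the
 * original file: it shows how a caller could drive a full metadata sync with
 * the helpers above by building a MetadataSyncContext for the given worker
 * nodes and then replicating node-wide objects, dependencies, shell tables and
 * pg_dist_* metadata to them. The workerNodeList argument is assumed to be
 * provided by the caller (e.g. the primary nodes being activated).
 */
static void
ExampleSyncMetadataToWorkerNodes(List *workerNodeList)
{
	/* send the commands rather than merely collecting them */
	bool collectCommands = false;

	/* assume the nodes were added in an earlier transaction */
	bool nodesAddedInSameTransaction = false;

	MetadataSyncContext *context =
		CreateMetadataSyncContext(workerNodeList, collectCommands,
								  nodesAddedInSameTransaction);

	/*
	 * Depending on MetadataSyncTransMode, this runs either inside the current
	 * 2PC-coordinated transaction or over the bare connections established by
	 * CreateMetadataSyncContext.
	 */
	SyncDistributedObjects(context);
}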