/*------------------------------------------------------------------------- * * reference_table_utils.c * * Declarations for public utility functions related to reference tables. * * Copyright (c) 2014-2016, Citus Data, Inc. * *------------------------------------------------------------------------- */ #include "postgres.h" #include "miscadmin.h" #include "access/heapam.h" #include "access/htup_details.h" #include "access/genam.h" #include "distributed/colocation_utils.h" #include "distributed/listutils.h" #include "distributed/master_protocol.h" #include "distributed/master_metadata_utility.h" #include "distributed/metadata_cache.h" #include "distributed/metadata_sync.h" #include "distributed/multi_logical_planner.h" #include "distributed/reference_table_utils.h" #include "distributed/resource_lock.h" #include "distributed/shardinterval_utils.h" #include "distributed/transaction_management.h" #include "distributed/worker_manager.h" #include "distributed/worker_transaction.h" #include "utils/fmgroids.h" #include "utils/lsyscache.h" #include "utils/rel.h" /* local function forward declarations */ static void ReplicateSingleShardTableToAllWorkers(Oid relationId); static void ReplicateShardToAllWorkers(ShardInterval *shardInterval); static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort); static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId); static int CompareOids(const void *leftElement, const void *rightElement); /* exports for SQL callable functions */ PG_FUNCTION_INFO_V1(upgrade_to_reference_table); /* * upgrade_to_reference_table accepts a broadcast table which has only one shard and * replicates it across all nodes to create a reference table. It also modifies related * metadata to mark the table as reference. */ Datum upgrade_to_reference_table(PG_FUNCTION_ARGS) { Oid relationId = PG_GETARG_OID(0); List *shardIntervalList = NIL; ShardInterval *shardInterval = NULL; uint64 shardId = INVALID_SHARD_ID; DistTableCacheEntry *tableEntry = NULL; EnsureCoordinator(); CheckCitusVersion(ERROR); if (!IsDistributedTable(relationId)) { char *relationName = get_rel_name(relationId); ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("cannot upgrade to reference table"), errdetail("Relation \"%s\" is not distributed.", relationName), errhint("Instead, you can use; " "create_reference_table('%s');", relationName))); } tableEntry = DistributedTableCacheEntry(relationId); if (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE) { char *relationName = get_rel_name(relationId); ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("cannot upgrade to reference table"), errdetail("Relation \"%s\" is already a reference table", relationName))); } if (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING) { char *relationName = get_rel_name(relationId); ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("cannot upgrade to reference table"), errdetail("Upgrade is only supported for statement-based " "replicated tables but \"%s\" is streaming replicated", relationName))); } shardIntervalList = LoadShardIntervalList(relationId); if (list_length(shardIntervalList) != 1) { char *relationName = get_rel_name(relationId); ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot upgrade to reference table"), errdetail("Relation \"%s\" shard count is not one. Only " "relations with one shard can be upgraded to " "reference tables.", relationName))); } shardInterval = (ShardInterval *) linitial(shardIntervalList); shardId = shardInterval->shardId; LockShardDistributionMetadata(shardId, ExclusiveLock); LockShardResource(shardId, ExclusiveLock); ReplicateSingleShardTableToAllWorkers(relationId); PG_RETURN_VOID(); } /* * ReplicateAllReferenceTablesToNode function finds all reference tables and * replicates them to the given worker node. It also modifies pg_dist_colocation * table to update the replication factor column when necessary. This function * skips reference tables if that node already has healthy placement of that * reference table to prevent unnecessary data transfer. */ void ReplicateAllReferenceTablesToNode(char *nodeName, int nodePort) { List *referenceTableList = ReferenceTableOidList(); ListCell *referenceTableCell = NULL; List *workerNodeList = ActiveWorkerNodeList(); uint32 workerCount = 0; Oid firstReferenceTableId = InvalidOid; uint32 referenceTableColocationId = INVALID_COLOCATION_ID; /* if there is no reference table, we do not need to do anything */ if (list_length(referenceTableList) == 0) { return; } /* * We sort the reference table list to prevent deadlocks in concurrent * ReplicateAllReferenceTablesToAllNodes calls. */ referenceTableList = SortList(referenceTableList, CompareOids); foreach(referenceTableCell, referenceTableList) { Oid referenceTableId = lfirst_oid(referenceTableCell); List *shardIntervalList = LoadShardIntervalList(referenceTableId); ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); uint64 shardId = shardInterval->shardId; LockShardDistributionMetadata(shardId, ExclusiveLock); ReplicateShardToNode(shardInterval, nodeName, nodePort); } /* * After replicating reference tables, we will update replication factor column for * colocation group of reference tables so that worker count will be equal to * replication factor again. */ workerCount = list_length(workerNodeList); firstReferenceTableId = linitial_oid(referenceTableList); referenceTableColocationId = TableColocationId(firstReferenceTableId); UpdateColocationGroupReplicationFactor(referenceTableColocationId, workerCount); } /* * ReplicateSingleShardTableToAllWorkers accepts a broadcast table and replicates it to * all worker nodes. It assumes that caller of this function ensures that given broadcast * table has only one shard. */ static void ReplicateSingleShardTableToAllWorkers(Oid relationId) { List *shardIntervalList = LoadShardIntervalList(relationId); ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); uint64 shardId = shardInterval->shardId; List *foreignConstraintCommandList = CopyShardForeignConstraintCommandList( shardInterval); if (foreignConstraintCommandList != NIL || TableReferenced(relationId)) { char *relationName = get_rel_name(relationId); ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg("cannot upgrade to reference table"), errdetail("Relation \"%s\" is part of a foreign constraint. " "Foreign key constraints are not allowed " "from or to reference tables.", relationName))); } /* * ReplicateShardToAllWorkers function opens separate transactions (i.e., not part * of any coordinated transactions) to each worker and replicates given shard to all * workers. If a worker already has a healthy replica of given shard, it skips that * worker to prevent copying unnecessary data. */ ReplicateShardToAllWorkers(shardInterval); /* * We need to update metadata tables to mark this table as reference table. We modify * pg_dist_partition, pg_dist_colocation and pg_dist_shard tables in * ConvertToReferenceTableMetadata function. */ ConvertToReferenceTableMetadata(relationId, shardId); /* * After the table has been officially marked as a reference table, we need to create * the reference table itself and insert its pg_dist_partition, pg_dist_shard and * existing pg_dist_shard_placement rows. */ CreateTableMetadataOnWorkers(relationId); } /* * ReplicateShardToAllWorkers function replicates given shard to the all worker nodes * in separate transactions. While replicating, it only replicates the shard to the * workers which does not have a healthy replica of the shard. However, this function * does not obtain any lock on shard resource and shard metadata. It is caller's * responsibility to take those locks. */ static void ReplicateShardToAllWorkers(ShardInterval *shardInterval) { /* we do not use pgDistNode, we only obtain a lock on it to prevent modifications */ Relation pgDistNode = heap_open(DistNodeRelationId(), AccessShareLock); List *workerNodeList = ActiveWorkerNodeList(); ListCell *workerNodeCell = NULL; /* * We will iterate over all worker nodes and if healthy placement is not exist at * given node we will copy the shard to that node. Then we will also modify * the metadata to reflect newly copied shard. */ workerNodeList = SortList(workerNodeList, CompareWorkerNodes); foreach(workerNodeCell, workerNodeList) { WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); char *nodeName = workerNode->workerName; uint32 nodePort = workerNode->workerPort; ReplicateShardToNode(shardInterval, nodeName, nodePort); } heap_close(pgDistNode, NoLock); } /* * ReplicateShardToNode function replicates given shard to the given worker node * in a separate transaction. While replicating, it only replicates the shard to the * workers which does not have a healthy replica of the shard. This function also modifies * metadata by inserting/updating related rows in pg_dist_shard_placement. */ static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) { uint64 shardId = shardInterval->shardId; bool missingOk = false; ShardPlacement *sourceShardPlacement = FinalizedShardPlacement(shardId, missingOk); char *srcNodeName = sourceShardPlacement->nodeName; uint32 srcNodePort = sourceShardPlacement->nodePort; List *ddlCommandList = CopyShardCommandList(shardInterval, srcNodeName, srcNodePort); List *shardPlacementList = ShardPlacementList(shardId); bool missingWorkerOk = true; ShardPlacement *targetPlacement = SearchShardPlacementInList(shardPlacementList, nodeName, nodePort, missingWorkerOk); char *tableOwner = TableOwner(shardInterval->relationId); /* * Although this function is used for reference tables and reference table shard * placements always have shardState = FILE_FINALIZED, in case of an upgrade of * a non-reference table to reference table, unhealty placements may exist. In * this case, we repair the shard placement and update its state in * pg_dist_shard_placement table. */ if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED) { uint64 placementId = 0; ereport(NOTICE, (errmsg("Replicating reference table \"%s\" to the node %s:%d", get_rel_name(shardInterval->relationId), nodeName, nodePort))); SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner, ddlCommandList); if (targetPlacement == NULL) { placementId = GetNextPlacementId(); InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, 0, nodeName, nodePort); } else { placementId = targetPlacement->placementId; UpdateShardPlacementState(placementId, FILE_FINALIZED); } /* * Although ReplicateShardToAllWorkers is used only for reference tables, * during the upgrade phase, the placements are created before the table is * marked as a reference table. All metadata (including the placement * metadata) will be copied to workers after all reference table changed * are finished. */ if (ShouldSyncTableMetadata(shardInterval->relationId)) { char *placementCommand = PlacementUpsertCommand(shardId, placementId, FILE_FINALIZED, 0, nodeName, nodePort); SendCommandToWorkers(WORKERS_WITH_METADATA, placementCommand); } } } /* * ConvertToReferenceTableMetadata accepts a broadcast table and modifies its metadata to * reference table metadata. To do this, this function updates pg_dist_partition, * pg_dist_colocation and pg_dist_shard. This function assumes that caller ensures that * given broadcast table has only one shard. */ static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId) { uint32 currentColocationId = TableColocationId(relationId); uint32 newColocationId = CreateReferenceTableColocationId(); Var *distributionColumn = NULL; char shardStorageType = ShardStorageType(relationId); text *shardMinValue = NULL; text *shardMaxValue = NULL; /* delete old metadata rows */ DeletePartitionRow(relationId); DeleteColocationGroupIfNoTablesBelong(currentColocationId); DeleteShardRow(shardId); /* insert new metadata rows */ InsertIntoPgDistPartition(relationId, DISTRIBUTE_BY_NONE, distributionColumn, newColocationId, REPLICATION_MODEL_2PC); InsertShardRow(relationId, shardId, shardStorageType, shardMinValue, shardMaxValue); } /* * CreateReferenceTableColocationId creates a new co-location id for reference tables and * writes it into pg_dist_colocation, then returns the created co-location id. Since there * can be only one colocation group for all kinds of reference tables, if a co-location id * is already created for reference tables, this function returns it without creating * anything. */ uint32 CreateReferenceTableColocationId() { uint32 colocationId = INVALID_COLOCATION_ID; List *workerNodeList = ActiveWorkerNodeList(); int shardCount = 1; int replicationFactor = list_length(workerNodeList); Oid distributionColumnType = InvalidOid; /* check for existing colocations */ colocationId = ColocationId(shardCount, replicationFactor, distributionColumnType); if (colocationId == INVALID_COLOCATION_ID) { colocationId = CreateColocationGroup(shardCount, replicationFactor, distributionColumnType); } return colocationId; } /* * DeleteAllReferenceTablePlacementsFromNode function iterates over list of reference * tables and deletes all reference table placements from pg_dist_shard_placement table * for given worker node. However, it does not modify replication factor of the colocation * group of reference tables. It is caller's responsibility to do that if it is necessary. */ void DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort) { List *referenceTableList = ReferenceTableOidList(); ListCell *referenceTableCell = NULL; /* if there are no reference tables, we do not need to do anything */ if (list_length(referenceTableList) == 0) { return; } /* * We sort the reference table list to prevent deadlocks in concurrent * DeleteAllReferenceTablePlacementsFromNode calls. */ referenceTableList = SortList(referenceTableList, CompareOids); foreach(referenceTableCell, referenceTableList) { Oid referenceTableId = lfirst_oid(referenceTableCell); List *shardIntervalList = LoadShardIntervalList(referenceTableId); ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); uint64 shardId = shardInterval->shardId; uint64 placementId = INVALID_PLACEMENT_ID; StringInfo deletePlacementCommand = makeStringInfo(); LockShardDistributionMetadata(shardId, ExclusiveLock); placementId = DeleteShardPlacementRow(shardId, workerName, workerPort); appendStringInfo(deletePlacementCommand, "DELETE FROM pg_dist_shard_placement WHERE placementid=%lu", placementId); SendCommandToWorkers(WORKERS_WITH_METADATA, deletePlacementCommand->data); } } /* * ReferenceTableOidList function scans pg_dist_partition to create a list of all * reference tables. To create the list, it performs sequential scan. Since it is not * expected that this function will be called frequently, it is OK not to use index scan. * If this function becomes performance bottleneck, it is possible to modify this function * to perform index scan. */ List * ReferenceTableOidList() { List *distTableOidList = DistTableOidList(); ListCell *distTableOidCell = NULL; List *referenceTableList = NIL; foreach(distTableOidCell, distTableOidList) { DistTableCacheEntry *cacheEntry = NULL; Oid relationId = lfirst_oid(distTableOidCell); cacheEntry = DistributedTableCacheEntry(relationId); if (cacheEntry->partitionMethod == DISTRIBUTE_BY_NONE) { referenceTableList = lappend_oid(referenceTableList, relationId); } } return referenceTableList; } /* CompareOids is a comparison function for sort shard oids */ static int CompareOids(const void *leftElement, const void *rightElement) { Oid *leftId = (Oid *) leftElement; Oid *rightId = (Oid *) rightElement; if (*leftId > *rightId) { return 1; } else if (*leftId < *rightId) { return -1; } else { return 0; } }