/*-------------------------------------------------------------------------
*
* metadata_sync.c
*
* Routines for synchronizing metadata to all workers.
*
* Copyright (c) 2016, Citus Data, Inc.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include <sys/stat.h>
#include <unistd.h>
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/pg_foreign_server.h"
#include "distributed/citus_ruleutils.h"
#include "distributed/distribution_column.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_join_order.h"
#include "distributed/pg_dist_node.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_transaction.h"
#include "foreign/foreign.h"
#include "nodes/pg_list.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/lsyscache.h"
#include "utils/memutils.h"

static char * LocalGroupIdUpdateCommand(uint32 groupId);
static void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata);
static char * TruncateTriggerCreateCommand(Oid relationId);


PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);

/*
* start_metadata_sync_to_node creates the metadata on a worker, preparing the
* worker for accepting MX-table queries. The function first sets the localGroupId
* of the worker so that the worker knows which tuple in the pg_dist_node table
* represents itself. After that, the SQL statements for re-creating metadata about
* MX distributed tables are sent to the worker. Finally, the hasmetadata column of
* the target node in pg_dist_node is marked as true.
*/
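/*
* Illustrative usage (a sketch; the node name and port below are made-up values,
* not taken from this file):
*
*     SELECT start_metadata_sync_to_node('worker-1', 5432);
*/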
Datum
start_metadata_sync_to_node(PG_FUNCTION_ARGS)
{
text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName);
char *extensionOwner = CitusExtensionOwnerName();
WorkerNode *workerNode = NULL;
char *localGroupIdUpdateCommand = NULL;
List *recreateMetadataSnapshotCommandList = NIL;
List *dropMetadataCommandList = NIL;
List *createMetadataCommandList = NIL;
EnsureSchemaNode();
EnsureSuperUser();
PreventTransactionChain(true, "start_metadata_sync_to_node");
workerNode = FindWorkerNode(nodeNameString, nodePort);
if (workerNode == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("you cannot sync metadata to a non-existent node"),
errhint("First, add the node with SELECT master_add_node(%s,%d)",
nodeNameString, nodePort)));
}
/* generate and add the local group id's update query */
localGroupIdUpdateCommand = LocalGroupIdUpdateCommand(workerNode->groupId);
/* generate the queries which drop the metadata */
dropMetadataCommandList = MetadataDropCommands();
/* generate the queries which create the metadata from scratch */
createMetadataCommandList = MetadataCreateCommands();
recreateMetadataSnapshotCommandList = lappend(recreateMetadataSnapshotCommandList,
localGroupIdUpdateCommand);
recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
dropMetadataCommandList);
recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,
createMetadataCommandList);

/*
* Send the snapshot recreation commands in a single remote transaction and
* error out on any kind of failure. Note that it is not required to send
* recreateMetadataSnapshotCommandList in the same transaction that we send
* nodeDeleteCommand and nodeInsertCommand commands below.
*/
SendCommandListToWorkerInSingleTransaction(nodeNameString, nodePort, extensionOwner,
recreateMetadataSnapshotCommandList);
MarkNodeHasMetadata(nodeNameString, nodePort, true);
PG_RETURN_VOID();
}

/*
* stop_metadata_sync_to_node sets the hasmetadata column of the specified node
* to false in the pg_dist_node table, indicating that the specified worker node
* no longer receives DDL changes and cannot be used for issuing MX queries.
*/
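/*
* Illustrative usage (hypothetical node name and port):
*
*     SELECT stop_metadata_sync_to_node('worker-1', 5432);
*/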
Datum
stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
{
text *nodeName = PG_GETARG_TEXT_P(0);
int32 nodePort = PG_GETARG_INT32(1);
char *nodeNameString = text_to_cstring(nodeName);
WorkerNode *workerNode = NULL;
EnsureSchemaNode();
EnsureSuperUser();
workerNode = FindWorkerNode(nodeNameString, nodePort);
if (workerNode == NULL)
{
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("node (%s,%d) does not exist", nodeNameString, nodePort)));
}
MarkNodeHasMetadata(nodeNameString, nodePort, false);
PG_RETURN_VOID();
}

/*
* ShouldSyncTableMetadata checks if a distributed table has streaming replication model
* and hash distribution. In that case the distributed table is considered an MX table,
* and its metadata is required to exist on the worker nodes.
*/
bool
ShouldSyncTableMetadata(Oid relationId)
{
DistTableCacheEntry *tableEntry = DistributedTableCacheEntry(relationId);
bool usesHashDistribution = (tableEntry->partitionMethod == DISTRIBUTE_BY_HASH);
bool usesStreamingReplication =
(tableEntry->replicationModel == REPLICATION_MODEL_STREAMING);
if (usesStreamingReplication && usesHashDistribution)
{
return true;
}
else
{
return false;
}
}

/*
* MetadataCreateCommands returns the list of queries that are required to create
* the current metadata snapshot of the node on which the function is called. The
* metadata snapshot commands include the following queries:
*
* (i)   Query that populates pg_dist_node table
* (ii)  Queries that create the clustered tables
* (iii) Queries that populate pg_dist_partition table referenced by (ii)
* (iv)  Queries that populate pg_dist_shard table referenced by (iii)
* (v)   Queries that populate pg_dist_shard_placement table referenced by (iv)
*/
List *
MetadataCreateCommands(void)
{
List *metadataSnapshotCommandList = NIL;
List *distributedTableList = DistributedTableList();
List *mxTableList = NIL;
List *workerNodeList = WorkerNodeList();
ListCell *distributedTableCell = NULL;
char *nodeListInsertCommand = NULL;
/* generate insert command for pg_dist_node table */
nodeListInsertCommand = NodeListInsertCommand(workerNodeList);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
nodeListInsertCommand);
/* create the list of mx tables */
foreach(distributedTableCell, distributedTableList)
{
DistTableCacheEntry *cacheEntry =
(DistTableCacheEntry *) lfirst(distributedTableCell);
if (ShouldSyncTableMetadata(cacheEntry->relationId))
{
mxTableList = lappend(mxTableList, cacheEntry);
}
}
/* create the mx tables, but not the metadata */
foreach(distributedTableCell, mxTableList)
{
DistTableCacheEntry *cacheEntry =
(DistTableCacheEntry *) lfirst(distributedTableCell);
Oid relationId = cacheEntry->relationId;
List *commandList = GetTableDDLEvents(relationId);
char *tableOwnerResetCommand = TableOwnerResetCommand(relationId);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
commandList);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
tableOwnerResetCommand);
}
/* construct the foreign key constraints after all tables are created */
foreach(distributedTableCell, mxTableList)
{
DistTableCacheEntry *cacheEntry =
(DistTableCacheEntry *) lfirst(distributedTableCell);
List *foreignConstraintCommands =
GetTableForeignConstraintCommands(cacheEntry->relationId);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
foreignConstraintCommands);
}
/* after all tables are created, create the metadata */
foreach(distributedTableCell, mxTableList)
{
DistTableCacheEntry *cacheEntry =
(DistTableCacheEntry *) lfirst(distributedTableCell);
List *shardIntervalList = NIL;
List *shardCreateCommandList = NIL;
char *metadataCommand = NULL;
char *truncateTriggerCreateCommand = NULL;
Oid clusteredTableId = cacheEntry->relationId;
/* add the table metadata command first */
metadataCommand = DistributionCreateCommand(cacheEntry);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
metadataCommand);
/* add the truncate trigger command after the table becomes distributed */
truncateTriggerCreateCommand =
TruncateTriggerCreateCommand(cacheEntry->relationId);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
truncateTriggerCreateCommand);
/* add the pg_dist_shard{,placement} entries */
shardIntervalList = LoadShardIntervalList(clusteredTableId);
shardCreateCommandList = ShardListInsertCommand(shardIntervalList);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
shardCreateCommandList);
}
return metadataSnapshotCommandList;
}

/*
* GetDistributedTableDDLEvents returns the full set of DDL commands necessary to
* create the given distributed table on a worker. The list includes setting up any
* sequences, setting the owner of the table, inserting table and shard metadata,
* setting up the truncate trigger, and creating the foreign key constraints.
*/
List *
GetDistributedTableDDLEvents(Oid relationId)
{
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
List *shardIntervalList = NIL;
List *commandList = NIL;
List *foreignConstraintCommands = NIL;
List *shardMetadataInsertCommandList = NIL;
char *tableOwnerResetCommand = NULL;
char *metadataCommand = NULL;
char *truncateTriggerCreateCommand = NULL;
/* commands to create the table */
commandList = GetTableDDLEvents(relationId);
/* command to reset the table owner */
tableOwnerResetCommand = TableOwnerResetCommand(relationId);
commandList = lappend(commandList, tableOwnerResetCommand);
/* command to insert pg_dist_partition entry */
metadataCommand = DistributionCreateCommand(cacheEntry);
commandList = lappend(commandList, metadataCommand);
/* commands to create the truncate trigger of the mx table */
truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
commandList = lappend(commandList, truncateTriggerCreateCommand);
/* commands to insert pg_dist_shard & pg_dist_shard_placement entries */
shardIntervalList = LoadShardIntervalList(relationId);
shardMetadataInsertCommandList = ShardListInsertCommand(shardIntervalList);
commandList = list_concat(commandList, shardMetadataInsertCommandList);
/* commands to create foreign key constraints */
foreignConstraintCommands = GetTableForeignConstraintCommands(relationId);
commandList = list_concat(commandList, foreignConstraintCommands);
return commandList;
}

/*
* MetadataDropCommands returns the list of queries that are required to drop all
* of the metadata on the node that is related to clustered tables. The drop
* metadata snapshot commands include the following queries:
*
* (i)   Queries that delete all the rows from pg_dist_node table
* (ii)  Queries that drop the clustered tables and remove their references from
*       pg_dist_partition. Note that distributed relation ids are gathered
*       from the worker itself to prevent dropping any non-distributed tables
*       with the same name.
* (iii) Queries that delete all the rows from pg_dist_shard table referenced by (ii)
* (iv)  Queries that delete all the rows from pg_dist_shard_placement table
*       referenced by (iii)
*/
List *
MetadataDropCommands(void)
{
List *dropSnapshotCommandList = NIL;
char *removeTablesCommand = NULL;
char *removeNodesCommand = NULL;
removeNodesCommand = DELETE_ALL_NODES;
dropSnapshotCommandList = lappend(dropSnapshotCommandList,
removeNodesCommand);
removeTablesCommand = REMOVE_ALL_CLUSTERED_TABLES_COMMAND;
dropSnapshotCommandList = lappend(dropSnapshotCommandList,
removeTablesCommand);
return dropSnapshotCommandList;
}

/*
* NodeListInsertCommand generates a single multi-row INSERT command that can be
* executed to insert the nodes in workerNodeList into the pg_dist_node table.
*/
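/*
* For illustration, with two hypothetical workers ('worker-1' in group 1 that has
* metadata and 'worker-2' in group 2 that does not), the generated command would
* look similar to:
*
*     INSERT INTO pg_dist_node
*     (nodeid, groupid, nodename, nodeport, noderack, hasmetadata)
*     VALUES (1, 1, 'worker-1', 5432, 'default', TRUE),
*            (2, 2, 'worker-2', 5432, 'default', FALSE)
*
* The node names, ports, and rack value are made-up example values.
*/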
char *
NodeListInsertCommand(List *workerNodeList)
{
ListCell *workerNodeCell = NULL;
StringInfo nodeListInsertCommand = makeStringInfo();
int workerCount = list_length(workerNodeList);
int processedWorkerNodeCount = 0;
/* if there are no workers, return an empty command string */
if (workerCount == 0)
{
return nodeListInsertCommand->data;
}
/* generate the query without any values yet */
appendStringInfo(nodeListInsertCommand,
"INSERT INTO pg_dist_node "
"(nodeid, groupid, nodename, nodeport, noderack, hasmetadata) "
"VALUES ");
/* iterate over the worker nodes, add the values */
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
char *hasMetadataString = workerNode->hasMetadata ? "TRUE" : "FALSE";
appendStringInfo(nodeListInsertCommand,
"(%d, %d, %s, %d, '%s', %s)",
workerNode->nodeId,
workerNode->groupId,
quote_literal_cstr(workerNode->workerName),
workerNode->workerPort,
workerNode->workerRack,
hasMetadataString);
processedWorkerNodeCount++;
if (processedWorkerNodeCount != workerCount)
{
appendStringInfo(nodeListInsertCommand, ",");
}
}
return nodeListInsertCommand->data;
}

/*
* DistributionCreateCommand generates a command that can be
* executed to replicate the metadata for a distributed table.
*/
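/*
* For illustration, for a hypothetical hash-distributed table public.orders with
* distribution column o_custkey, colocation id 2, and streaming replication
* (assuming 'h' and 's' are the respective catalog codes), the generated command
* would look similar to:
*
*     INSERT INTO pg_dist_partition
*     (logicalrelid, partmethod, partkey, colocationid, repmodel)
*     VALUES ('public.orders'::regclass, 'h',
*             column_name_to_column('public.orders','o_custkey'), 2, 's')
*/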
char *
DistributionCreateCommand(DistTableCacheEntry *cacheEntry)
{
StringInfo insertDistributionCommand = makeStringInfo();
Oid relationId = cacheEntry->relationId;
char distributionMethod = cacheEntry->partitionMethod;
char *partitionKeyString = cacheEntry->partitionKeyString;
char *qualifiedRelationName =
generate_qualified_relation_name(relationId);
char *partitionKeyColumnName = ColumnNameToColumn(relationId, partitionKeyString);
uint32 colocationId = cacheEntry->colocationId;
char replicationModel = cacheEntry->replicationModel;
appendStringInfo(insertDistributionCommand,
"INSERT INTO pg_dist_partition "
"(logicalrelid, partmethod, partkey, colocationid, repmodel) "
"VALUES "
"(%s::regclass, '%c', column_name_to_column(%s,%s), %d, '%c')",
quote_literal_cstr(qualifiedRelationName),
distributionMethod,
quote_literal_cstr(qualifiedRelationName),
quote_literal_cstr(partitionKeyColumnName),
colocationId,
replicationModel);
return insertDistributionCommand->data;
}

/*
* DistributionDeleteCommand generates a command that can be executed
* to drop a distributed table and its metadata on a remote node.
*/
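/*
* For illustration, DistributionDeleteCommand("public", "orders") would return
* (the table name is a made-up example):
*
*     SELECT worker_drop_distributed_table('public.orders'::regclass)
*/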
char *
DistributionDeleteCommand(char *schemaName, char *tableName)
{
char *distributedRelationName = NULL;
StringInfo deleteDistributionCommand = makeStringInfo();
distributedRelationName = quote_qualified_identifier(schemaName, tableName);
appendStringInfo(deleteDistributionCommand,
"SELECT worker_drop_distributed_table(%s::regclass)",
quote_literal_cstr(distributedRelationName));
return deleteDistributionCommand->data;
}

/*
* TableOwnerResetCommand generates a command that can be executed
* to reset the table owner.
*/
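/*
* For illustration, for a hypothetical table public.orders owned by the role
* orders_owner, the generated command would be:
*
*     ALTER TABLE public.orders OWNER TO orders_owner
*/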
char *
TableOwnerResetCommand(Oid relationId)
{
StringInfo ownerResetCommand = makeStringInfo();
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
char *tableOwnerName = TableOwner(relationId);
appendStringInfo(ownerResetCommand,
"ALTER TABLE %s OWNER TO %s",
qualifiedRelationName,
quote_identifier(tableOwnerName));
return ownerResetCommand->data;
}

/*
* ShardListInsertCommand generates the commands that can be
* executed to replicate shard and shard placement metadata for the
* given shard intervals. The function assumes that each shard has a
* single placement, and asserts this assumption.
*/
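/*
* For illustration, for a single hypothetical shard 102008 of public.orders placed
* on 'worker-1', the returned list would contain two commands similar to:
*
*     INSERT INTO pg_dist_shard_placement
*     (shardid, shardstate, shardlength, nodename, nodeport, placementid)
*     VALUES (102008, 1, 0, 'worker-1', 5432, 1)
*
*     INSERT INTO pg_dist_shard
*     (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue)
*     VALUES ('public.orders'::regclass, 102008, 't', '-2147483648', '2147483647')
*
* The shard id, node, hash range, and storage type character are made-up examples.
*/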
List *
ShardListInsertCommand(List *shardIntervalList)
{
List *commandList = NIL;
ListCell *shardIntervalCell = NULL;
StringInfo insertPlacementCommand = makeStringInfo();
StringInfo insertShardCommand = makeStringInfo();
int shardCount = list_length(shardIntervalList);
int processedShardCount = 0;
int processedShardPlacementCount = 0;
/* if there are no shards, return empty list */
if (shardCount == 0)
{
return commandList;
}
/* generate the shard placement query without any values yet */
appendStringInfo(insertPlacementCommand,
"INSERT INTO pg_dist_shard_placement "
"(shardid, shardstate, shardlength,"
" nodename, nodeport, placementid) "
"VALUES ");
/* add placements to insertPlacementCommand */
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
List *shardPlacementList = FinalizedShardPlacementList(shardId);
ShardPlacement *placement = NULL;
/* the function only handles a single placement per shard */
Assert(list_length(shardPlacementList) == 1);
placement = (ShardPlacement *) linitial(shardPlacementList);
appendStringInfo(insertPlacementCommand,
"(%lu, 1, %lu, %s, %d, %lu)",
shardId,
placement->shardLength,
quote_literal_cstr(placement->nodeName),
placement->nodePort,
placement->placementId);
processedShardPlacementCount++;
if (processedShardPlacementCount != shardCount)
{
appendStringInfo(insertPlacementCommand, ",");
}
}
/* add the command to the list that we'll return */
commandList = lappend(commandList, insertPlacementCommand->data);
/* now, generate the shard query without any values yet */
appendStringInfo(insertShardCommand,
"INSERT INTO pg_dist_shard "
"(logicalrelid, shardid, shardstorage,"
" shardminvalue, shardmaxvalue) "
"VALUES ");
/* now add shards to insertShardCommand */
foreach(shardIntervalCell, shardIntervalList)
{
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
uint64 shardId = shardInterval->shardId;
Oid distributedRelationId = shardInterval->relationId;
char *qualifiedRelationName = generate_qualified_relation_name(
distributedRelationId);
int minHashToken = DatumGetInt32(shardInterval->minValue);
int maxHashToken = DatumGetInt32(shardInterval->maxValue);
appendStringInfo(insertShardCommand,
"(%s::regclass, %lu, '%c', '%d', '%d')",
quote_literal_cstr(qualifiedRelationName),
shardId,
shardInterval->storageType,
minHashToken,
maxHashToken);
processedShardCount++;
if (processedShardCount != shardCount)
{
appendStringInfo(insertShardCommand, ",");
}
}
/* finally add the command to the list that we'll return */
commandList = lappend(commandList, insertShardCommand->data);
return commandList;
}

/*
* NodeDeleteCommand generates a command that can be
* executed to delete the metadata for a worker node.
*/
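/*
* For illustration, NodeDeleteCommand(3) would return:
*
*     DELETE FROM pg_dist_node WHERE nodeid = 3
*/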
char *
NodeDeleteCommand(uint32 nodeId)
{
StringInfo nodeDeleteCommand = makeStringInfo();
appendStringInfo(nodeDeleteCommand,
"DELETE FROM pg_dist_node "
"WHERE nodeid = %u", nodeId);
return nodeDeleteCommand->data;
}

/*
* ColocationIdUpdateCommand creates the SQL command that sets the colocationid of
* the given relation to the given colocationId in the pg_dist_partition table.
*/
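/*
* For illustration, for a hypothetical table public.orders and colocation id 4,
* the generated command would be:
*
*     UPDATE pg_dist_partition SET colocationid = 4
*     WHERE logicalrelid = 'public.orders'::regclass
*/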
char *
ColocationIdUpdateCommand(Oid relationId, uint32 colocationId)
{
StringInfo command = makeStringInfo();
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
appendStringInfo(command, "UPDATE pg_dist_partition "
"SET colocationid = %d "
"WHERE logicalrelid = %s::regclass",
colocationId, quote_literal_cstr(qualifiedRelationName));
return command->data;
}

/*
* LocalGroupIdUpdateCommand creates the SQL command required to set the local group id
* of a worker and returns the command in a string.
*/
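/*
* For illustration, LocalGroupIdUpdateCommand(14) would return:
*
*     UPDATE pg_dist_local_group SET groupid = 14
*/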
static char *
LocalGroupIdUpdateCommand(uint32 groupId)
{
StringInfo updateCommand = makeStringInfo();
appendStringInfo(updateCommand, "UPDATE pg_dist_local_group SET groupid = %d",
groupId);
return updateCommand->data;
}

/*
* MarkNodeHasMetadata sets the hasmetadata column of the specified worker in
* pg_dist_node to the given value.
*/
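/*
* The catalog scan and heap update below are logically equivalent to the following
* SQL (the node name and port are made-up examples):
*
*     UPDATE pg_dist_node SET hasmetadata = <value>
*     WHERE nodename = 'worker-1' AND nodeport = 5432
*/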
static void
MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata)
{
const bool indexOK = false;
const int scanKeyCount = 2;
Relation pgDistNode = NULL;
TupleDesc tupleDescriptor = NULL;
ScanKeyData scanKey[scanKeyCount];
SysScanDesc scanDescriptor = NULL;
HeapTuple heapTuple = NULL;
Datum values[Natts_pg_dist_node];
bool isnull[Natts_pg_dist_node];
bool replace[Natts_pg_dist_node];
pgDistNode = heap_open(DistNodeRelationId(), RowExclusiveLock);
tupleDescriptor = RelationGetDescr(pgDistNode);
ScanKeyInit(&scanKey[0], Anum_pg_dist_node_nodename,
BTEqualStrategyNumber, F_TEXTEQ, CStringGetTextDatum(nodeName));
ScanKeyInit(&scanKey[1], Anum_pg_dist_node_nodeport,
BTEqualStrategyNumber, F_INT8EQ, Int32GetDatum(nodePort));
scanDescriptor = systable_beginscan(pgDistNode, InvalidOid, indexOK,
NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",
nodeName, nodePort)));
}
memset(replace, 0, sizeof(replace));
values[Anum_pg_dist_node_hasmetadata - 1] = BoolGetDatum(hasMetadata);
isnull[Anum_pg_dist_node_hasmetadata - 1] = false;
replace[Anum_pg_dist_node_hasmetadata - 1] = true;
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
simple_heap_update(pgDistNode, &heapTuple->t_self, heapTuple);
CatalogUpdateIndexes(pgDistNode, heapTuple);
CitusInvalidateRelcacheByRelid(DistNodeRelationId());
CommandCounterIncrement();
systable_endscan(scanDescriptor);
heap_close(pgDistNode, NoLock);
}

/*
* TruncateTriggerCreateCommand creates a SQL query calling worker_create_truncate_trigger
* function, which creates the truncate trigger on the worker.
*/
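/*
* For illustration, for a hypothetical table public.orders the generated command
* would be:
*
*     SELECT worker_create_truncate_trigger('public.orders')
*/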
static char *
TruncateTriggerCreateCommand(Oid relationId)
{
StringInfo triggerCreateCommand = makeStringInfo();
char *tableName = generate_qualified_relation_name(relationId);
appendStringInfo(triggerCreateCommand,
"SELECT worker_create_truncate_trigger(%s)",
quote_literal_cstr(tableName));
return triggerCreateCommand->data;
}