mirror of https://github.com/citusdata/citus.git
Add Metadata Snapshot Infrastructure
This change adds the required metadata snapshot infrastructure from the MX codebase into Citus, mainly the metadata_sync.c file and the master_metadata_snapshot UDF.
pull/799/head
parent 14315f05a5
commit ed3af403fd
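A minimal usage sketch of the snapshot machinery this commit introduces (the regression test at the end of this diff exercises the same path; the values shown there are illustrative):

    -- on the coordinator: list the commands that make up the metadata snapshot
    SELECT unnest(master_metadata_snapshot());
    -- each returned command (TRUNCATE pg_dist_node, the worker_drop_distributed_table
    -- call, the pg_dist_node/pg_dist_partition/pg_dist_shard INSERTs and the table DDL)
    -- is meant to be replayed on a worker to reproduce the coordinator's metadata there.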
@@ -8,7 +8,7 @@ EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
	5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
	5.2-1 5.2-2 5.2-3 5.2-4 \
	6.0-1 6.0-2 6.0-3 6.0-4 6.0-5
	6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6

# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))

@@ -16,7 +16,7 @@ DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)
DATA_built = $(foreach v,$(EXTVERSIONS),$(EXTENSION)--$(v).sql)

# directories with source files
SUBDIRS = . commands executor master planner relay test transaction utils worker
SUBDIRS = . commands executor master metadata planner relay test transaction utils worker

# That patsubst rule searches all directories listed in SUBDIRS for .c
# files, and adds the corresponding .o files to OBJS

@@ -68,6 +68,8 @@ $(EXTENSION)--6.0-4.sql: $(EXTENSION)--6.0-3.sql $(EXTENSION)--6.0-3--6.0-4.sql
	cat $^ > $@
$(EXTENSION)--6.0-5.sql: $(EXTENSION)--6.0-4.sql $(EXTENSION)--6.0-4--6.0-5.sql
	cat $^ > $@
$(EXTENSION)--6.0-6.sql: $(EXTENSION)--6.0-5.sql $(EXTENSION)--6.0-5--6.0-6.sql
	cat $^ > $@

NO_PGXS = 1

@@ -0,0 +1,14 @@
CREATE FUNCTION worker_drop_distributed_table(logicalrelid Oid)
    RETURNS VOID
    LANGUAGE C STRICT
    AS 'MODULE_PATHNAME', $$worker_drop_distributed_table$$;

COMMENT ON FUNCTION worker_drop_distributed_table(logicalrelid Oid)
    IS 'drop the clustered table and its reference from metadata tables';

CREATE FUNCTION column_name_to_column(table_name regclass, column_name text)
    RETURNS text
    LANGUAGE C STRICT
    AS 'MODULE_PATHNAME', $$column_name_to_column$$;
COMMENT ON FUNCTION column_name_to_column(table_name regclass, column_name text)
    IS 'convert a column name to its textual Var representation';
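A rough example of the new column_name_to_column UDF (table and column names here are illustrative, not from the commit); worker_drop_distributed_table is illustrated later, next to its C implementation:

    -- textual Var representation of a distribution column, as stored in
    -- pg_dist_partition.partkey
    SELECT column_name_to_column('my_distributed_table', 'tenant_id');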
@@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '6.0-5'
default_version = '6.0-6'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog
@@ -29,6 +29,7 @@
#include "commands/extension.h"
#include "commands/trigger.h"
#include "distributed/colocation_utils.h"
#include "distributed/distribution_column.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h"
#include "distributed/pg_dist_partition.h"
@ -624,53 +624,6 @@ UpdateShardPlacementState(uint64 placementId, char shardState)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* BuildDistributionKeyFromColumnName builds a simple distribution key consisting
|
||||
* only out of a reference to the column of name columnName. Errors out if the
|
||||
* specified column does not exist or is not suitable to be used as a
|
||||
* distribution column.
|
||||
*/
|
||||
Node *
|
||||
BuildDistributionKeyFromColumnName(Relation distributedRelation, char *columnName)
|
||||
{
|
||||
HeapTuple columnTuple = NULL;
|
||||
Form_pg_attribute columnForm = NULL;
|
||||
Var *column = NULL;
|
||||
char *tableName = RelationGetRelationName(distributedRelation);
|
||||
|
||||
/* it'd probably better to downcase identifiers consistent with SQL case folding */
|
||||
truncate_identifier(columnName, strlen(columnName), true);
|
||||
|
||||
/* lookup column definition */
|
||||
columnTuple = SearchSysCacheAttName(RelationGetRelid(distributedRelation),
|
||||
columnName);
|
||||
if (!HeapTupleIsValid(columnTuple))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN),
|
||||
errmsg("column \"%s\" of relation \"%s\" does not exist",
|
||||
columnName, tableName)));
|
||||
}
|
||||
|
||||
columnForm = (Form_pg_attribute) GETSTRUCT(columnTuple);
|
||||
|
||||
/* check if the column may be referenced in the distribution key */
|
||||
if (columnForm->attnum <= 0)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot reference system column \"%s\" in relation \"%s\"",
|
||||
columnName, tableName)));
|
||||
}
|
||||
|
||||
/* build Var referencing only the chosen distribution column */
|
||||
column = makeVar(1, columnForm->attnum, columnForm->atttypid,
|
||||
columnForm->atttypmod, columnForm->attcollation, 0);
|
||||
|
||||
ReleaseSysCache(columnTuple);
|
||||
|
||||
return (Node *) column;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* Check that the current user has `mode` permissions on relationId, error out
|
||||
* if not. Superusers always have such permissions.
|
||||
|
|
|
@ -0,0 +1,416 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* metadata_sync.c
|
||||
*
|
||||
* Routines for synchronizing metadata to all workers.
|
||||
*
|
||||
* Copyright (c) 2016, Citus Data, Inc.
|
||||
*
|
||||
* $Id$
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
#include "miscadmin.h"
|
||||
|
||||
#include <sys/stat.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "access/heapam.h"
|
||||
#include "catalog/dependency.h"
|
||||
#include "catalog/pg_foreign_server.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/distribution_column.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/master_protocol.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/multi_join_order.h"
|
||||
#include "distributed/worker_manager.h"
|
||||
#include "foreign/foreign.h"
|
||||
#include "nodes/pg_list.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/memutils.h"
|
||||
|
||||
|
||||
/*
|
||||
* ShouldSyncTableMetadata checks if a distributed table has streaming replication model
|
||||
* and hash distribution. In that case the distributed table is considered an MX table,
|
||||
* and its metadata is required to exist on the worker nodes.
|
||||
*/
|
||||
bool
|
||||
ShouldSyncTableMetadata(Oid relationId)
|
||||
{
|
||||
DistTableCacheEntry *tableEntry = DistributedTableCacheEntry(relationId);
|
||||
bool usesHashDistribution = (tableEntry->partitionMethod == DISTRIBUTE_BY_HASH);
|
||||
bool usesStreamingReplication =
|
||||
(tableEntry->replicationModel == REPLICATION_MODEL_STREAMING);
|
||||
|
||||
if (usesStreamingReplication && usesHashDistribution)
|
||||
{
|
||||
return true;
|
||||
}
|
||||
else
|
||||
{
|
||||
return false;
|
||||
}
|
||||
}
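/*
 * Illustration: the tables for which ShouldSyncTableMetadata() returns true are
 * the ones matched by the following catalog query (the same predicate the
 * multi_metadata_snapshot regression test uses):
 *
 *   SELECT logicalrelid FROM pg_dist_partition WHERE partmethod = 'h' AND repmodel = 's';
 */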
|
||||
|
||||
|
||||
/*
|
||||
 * MetadataCreateCommands returns the list of queries that are
 * required to create the current metadata snapshot of the node on which
 * the function is invoked. The metadata snapshot commands include the
 * following queries:
|
||||
*
|
||||
* (i) Query that populates pg_dist_node table
|
||||
* (ii) Queries that create the clustered tables
|
||||
* (iii) Queries that populate pg_dist_partition table referenced by (ii)
|
||||
* (iv) Queries that populate pg_dist_shard table referenced by (iii)
|
||||
* (v) Queries that populate pg_dist_shard_placement table referenced by (iv)
|
||||
*/
|
||||
List *
|
||||
MetadataCreateCommands(void)
|
||||
{
|
||||
List *metadataSnapshotCommandList = NIL;
|
||||
List *distributedTableList = DistributedTableList();
|
||||
List *workerNodeList = WorkerNodeList();
|
||||
ListCell *distributedTableCell = NULL;
|
||||
char *nodeListInsertCommand = NULL;
|
||||
|
||||
/* generate insert command for pg_dist_node table */
|
||||
nodeListInsertCommand = NodeListInsertCommand(workerNodeList);
|
||||
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
|
||||
nodeListInsertCommand);
|
||||
|
||||
/* iterate over the distributed tables */
|
||||
foreach(distributedTableCell, distributedTableList)
|
||||
{
|
||||
DistTableCacheEntry *cacheEntry =
|
||||
(DistTableCacheEntry *) lfirst(distributedTableCell);
|
||||
List *clusteredTableDDLEvents = NIL;
|
||||
List *shardIntervalList = NIL;
|
||||
List *shardCreateCommandList = NIL;
|
||||
Oid clusteredTableId = cacheEntry->relationId;
|
||||
|
||||
/* add only clustered tables */
|
||||
if (!ShouldSyncTableMetadata(clusteredTableId))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
/* add the DDL events first */
|
||||
clusteredTableDDLEvents = GetDistributedTableDDLEvents(cacheEntry);
|
||||
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
|
||||
clusteredTableDDLEvents);
|
||||
|
||||
/* add the pg_dist_shard{,placement} entries */
|
||||
shardIntervalList = LoadShardIntervalList(clusteredTableId);
|
||||
shardCreateCommandList = ShardListInsertCommand(shardIntervalList);
|
||||
|
||||
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
|
||||
shardCreateCommandList);
|
||||
}
|
||||
|
||||
return metadataSnapshotCommandList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
 * MetadataDropCommands returns the list of queries required to drop all
 * metadata on the node related to clustered tables. The drop metadata
 * snapshot commands include the following queries:
|
||||
*
|
||||
* (i) Queries that delete all the rows from pg_dist_node table
|
||||
 * (ii) Queries that drop the clustered tables and remove their references from
|
||||
* the pg_dist_partition. Note that distributed relation ids are gathered
|
||||
* from the worker itself to prevent dropping any non-distributed tables
|
||||
* with the same name.
|
||||
* (iii) Queries that delete all the rows from pg_dist_shard table referenced by (ii)
|
||||
* (iv) Queries that delete all the rows from pg_dist_shard_placement table
|
||||
* referenced by (iii)
|
||||
*/
|
||||
List *
|
||||
MetadataDropCommands(void)
|
||||
{
|
||||
List *dropSnapshotCommandList = NIL;
|
||||
char *removeTablesCommand = NULL;
|
||||
char *removeNodesCommand = NULL;
|
||||
|
||||
removeNodesCommand = DELETE_ALL_NODES;
|
||||
dropSnapshotCommandList = lappend(dropSnapshotCommandList,
|
||||
removeNodesCommand);
|
||||
|
||||
removeTablesCommand = REMOVE_ALL_CLUSTERED_TABLES_COMMAND;
|
||||
dropSnapshotCommandList = lappend(dropSnapshotCommandList,
|
||||
removeTablesCommand);
|
||||
|
||||
return dropSnapshotCommandList;
|
||||
}
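/*
 * Illustration: with the current macros the returned list contains exactly these
 * two commands:
 *
 *   TRUNCATE pg_dist_node
 *   SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
 */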
|
||||
|
||||
|
||||
/*
|
||||
* NodeListInsertCommand generates a single multi-row INSERT command that can be
|
||||
 * executed to insert the nodes in workerNodeList into the pg_dist_node table.
|
||||
*/
|
||||
char *
|
||||
NodeListInsertCommand(List *workerNodeList)
|
||||
{
|
||||
ListCell *workerNodeCell = NULL;
|
||||
StringInfo nodeListInsertCommand = makeStringInfo();
|
||||
int workerCount = list_length(workerNodeList);
|
||||
int processedWorkerNodeCount = 0;
|
||||
|
||||
	/* if there are no workers, return an empty command string */
|
||||
if (workerCount == 0)
|
||||
{
|
||||
return nodeListInsertCommand->data;
|
||||
}
|
||||
|
||||
/* generate the query without any values yet */
|
||||
appendStringInfo(nodeListInsertCommand,
|
||||
"INSERT INTO pg_dist_node "
|
||||
"(nodeid, groupid, nodename, nodeport, noderack) "
|
||||
"VALUES ");
|
||||
|
||||
/* iterate over the worker nodes, add the values */
|
||||
foreach(workerNodeCell, workerNodeList)
|
||||
{
|
||||
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
|
||||
|
||||
appendStringInfo(nodeListInsertCommand,
|
||||
"(%d, %d, %s, %d, '%s')",
|
||||
workerNode->nodeId,
|
||||
workerNode->groupId,
|
||||
quote_literal_cstr(workerNode->workerName),
|
||||
workerNode->workerPort,
|
||||
workerNode->workerRack);
|
||||
|
||||
processedWorkerNodeCount++;
|
||||
if (processedWorkerNodeCount != workerCount)
|
||||
{
|
||||
appendStringInfo(nodeListInsertCommand, ",");
|
||||
}
|
||||
}
|
||||
|
||||
return nodeListInsertCommand->data;
|
||||
}
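/*
 * Illustration: for a two-worker cluster, NodeListInsertCommand() produces a
 * command of roughly this shape (node ids, names, ports and racks are examples):
 *
 *   INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack)
 *   VALUES (1, 1, 'localhost', 57637, 'default'), (2, 2, 'localhost', 57638, 'default')
 */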
|
||||
|
||||
|
||||
/*
|
||||
 * DistributionCreateCommand generates a command that can be
 * executed to replicate the metadata for a distributed table.
|
||||
*/
|
||||
char *
|
||||
DistributionCreateCommand(DistTableCacheEntry *cacheEntry)
|
||||
{
|
||||
StringInfo insertDistributionCommand = makeStringInfo();
|
||||
Oid relationId = cacheEntry->relationId;
|
||||
char distributionMethod = cacheEntry->partitionMethod;
|
||||
char *partitionKeyString = cacheEntry->partitionKeyString;
|
||||
char *qualifiedRelationName =
|
||||
generate_qualified_relation_name(relationId);
|
||||
char *partitionKeyColumnName = ColumnNameToColumn(relationId, partitionKeyString);
|
||||
uint64 colocationId = cacheEntry->colocationId;
|
||||
char replicationModel = cacheEntry->replicationModel;
|
||||
|
||||
appendStringInfo(insertDistributionCommand,
|
||||
"INSERT INTO pg_dist_partition "
|
||||
"(logicalrelid, partmethod, partkey, colocationid, repmodel) "
|
||||
"VALUES "
|
||||
"(%s::regclass, '%c', column_name_to_column(%s,%s), %lu, '%c')",
|
||||
quote_literal_cstr(qualifiedRelationName),
|
||||
distributionMethod,
|
||||
quote_literal_cstr(qualifiedRelationName),
|
||||
quote_literal_cstr(partitionKeyColumnName),
|
||||
colocationId,
|
||||
replicationModel);
|
||||
|
||||
return insertDistributionCommand->data;
|
||||
}
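/*
 * Illustration: for a hash-distributed table with streaming replication, the
 * generated command looks roughly like this (names and colocation id are examples):
 *
 *   INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel)
 *   VALUES ('public.mx_test_table'::regclass, 'h',
 *           column_name_to_column('public.mx_test_table', 'col_1'), 0, 's')
 */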
|
||||
|
||||
|
||||
/*
|
||||
* DistributionDeleteCommand generates a command that can be executed
|
||||
* to drop a distributed table and its metadata on a remote node.
|
||||
*/
|
||||
char *
|
||||
DistributionDeleteCommand(char *schemaName, char *tableName)
|
||||
{
|
||||
char *distributedRelationName = NULL;
|
||||
StringInfo deleteDistributionCommand = makeStringInfo();
|
||||
|
||||
distributedRelationName = quote_qualified_identifier(schemaName, tableName);
|
||||
|
||||
appendStringInfo(deleteDistributionCommand,
|
||||
"SELECT worker_drop_distributed_table(%s::regclass)",
|
||||
quote_literal_cstr(distributedRelationName));
|
||||
|
||||
return deleteDistributionCommand->data;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
 * TableOwnerResetCommand generates a command that can be executed
|
||||
* to reset the table owner.
|
||||
*/
|
||||
char *
|
||||
TableOwnerResetCommand(Oid relationId)
|
||||
{
|
||||
StringInfo ownerResetCommand = makeStringInfo();
|
||||
char *qualifiedRelationName = generate_qualified_relation_name(relationId);
|
||||
char *tableOwnerName = TableOwner(relationId);
|
||||
|
||||
appendStringInfo(ownerResetCommand,
|
||||
"ALTER TABLE %s OWNER TO %s",
|
||||
qualifiedRelationName,
|
||||
quote_identifier(tableOwnerName));
|
||||
|
||||
return ownerResetCommand->data;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
 * ShardListInsertCommand generates the commands that can be
|
||||
* executed to replicate shard and shard placement metadata for the
|
||||
* given shard intervals. The function assumes that each shard has a
|
||||
* single placement, and asserts this information.
|
||||
*/
|
||||
List *
|
||||
ShardListInsertCommand(List *shardIntervalList)
|
||||
{
|
||||
List *commandList = NIL;
|
||||
ListCell *shardIntervalCell = NULL;
|
||||
StringInfo insertPlacementCommand = makeStringInfo();
|
||||
StringInfo insertShardCommand = makeStringInfo();
|
||||
int shardCount = list_length(shardIntervalList);
|
||||
int processedShardCount = 0;
|
||||
int processedShardPlacementCount = 0;
|
||||
|
||||
/* if there are no shards, return empty list */
|
||||
if (shardCount == 0)
|
||||
{
|
||||
return commandList;
|
||||
}
|
||||
|
||||
/* generate the shard placement query without any values yet */
|
||||
appendStringInfo(insertPlacementCommand,
|
||||
"INSERT INTO pg_dist_shard_placement "
|
||||
"(shardid, shardstate, shardlength,"
|
||||
" nodename, nodeport, placementid) "
|
||||
"VALUES ");
|
||||
|
||||
/* add placements to insertPlacementCommand */
|
||||
foreach(shardIntervalCell, shardIntervalList)
|
||||
{
|
||||
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
|
||||
uint64 shardId = shardInterval->shardId;
|
||||
|
||||
List *shardPlacementList = FinalizedShardPlacementList(shardId);
|
||||
ShardPlacement *placement = NULL;
|
||||
|
||||
/* the function only handles single placement per shard */
|
||||
Assert(list_length(shardPlacementList) == 1);
|
||||
|
||||
placement = (ShardPlacement *) linitial(shardPlacementList);
|
||||
|
||||
appendStringInfo(insertPlacementCommand,
|
||||
"(%lu, 1, %lu, %s, %d, %lu)",
|
||||
shardId,
|
||||
placement->shardLength,
|
||||
quote_literal_cstr(placement->nodeName),
|
||||
placement->nodePort,
|
||||
placement->placementId);
|
||||
|
||||
processedShardPlacementCount++;
|
||||
if (processedShardPlacementCount != shardCount)
|
||||
{
|
||||
appendStringInfo(insertPlacementCommand, ",");
|
||||
}
|
||||
}
|
||||
|
||||
/* add the command to the list that we'll return */
|
||||
commandList = lappend(commandList, insertPlacementCommand->data);
|
||||
|
||||
/* now, generate the shard query without any values yet */
|
||||
appendStringInfo(insertShardCommand,
|
||||
"INSERT INTO pg_dist_shard "
|
||||
"(logicalrelid, shardid, shardstorage,"
|
||||
" shardminvalue, shardmaxvalue) "
|
||||
"VALUES ");
|
||||
|
||||
/* now add shards to insertShardCommand */
|
||||
foreach(shardIntervalCell, shardIntervalList)
|
||||
{
|
||||
ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
|
||||
uint64 shardId = shardInterval->shardId;
|
||||
Oid distributedRelationId = shardInterval->relationId;
|
||||
char *qualifiedRelationName = generate_qualified_relation_name(
|
||||
distributedRelationId);
|
||||
|
||||
int minHashToken = DatumGetInt32(shardInterval->minValue);
|
||||
int maxHashToken = DatumGetInt32(shardInterval->maxValue);
|
||||
|
||||
appendStringInfo(insertShardCommand,
|
||||
"(%s::regclass, %lu, '%c', '%d', '%d')",
|
||||
quote_literal_cstr(qualifiedRelationName),
|
||||
shardId,
|
||||
shardInterval->storageType,
|
||||
minHashToken,
|
||||
maxHashToken);
|
||||
|
||||
processedShardCount++;
|
||||
if (processedShardCount != shardCount)
|
||||
{
|
||||
appendStringInfo(insertShardCommand, ",");
|
||||
}
|
||||
}
|
||||
|
||||
/* finally add the command to the list that we'll return */
|
||||
commandList = lappend(commandList, insertShardCommand->data);
|
||||
|
||||
return commandList;
|
||||
}
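/*
 * Illustration: the returned list holds two multi-row INSERT commands, roughly
 * of this shape (shard ids, nodes and hash ranges are examples):
 *
 *   INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename,
 *                                        nodeport, placementid)
 *   VALUES (1310000, 1, 0, 'localhost', 57637, 550), ...
 *
 *   INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue,
 *                              shardmaxvalue)
 *   VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'), ...
 */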
|
||||
|
||||
|
||||
/*
|
||||
 * NodeDeleteCommand generates a command that can be
|
||||
* executed to delete the metadata for a worker node.
|
||||
*/
|
||||
char *
|
||||
NodeDeleteCommand(uint32 nodeId)
|
||||
{
|
||||
StringInfo nodeDeleteCommand = makeStringInfo();
|
||||
|
||||
appendStringInfo(nodeDeleteCommand,
|
||||
"DELETE FROM pg_dist_node "
|
||||
"WHERE nodeid = %u", nodeId);
|
||||
|
||||
return nodeDeleteCommand->data;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetDistributedTableDDLEvents returns the full set of DDL commands necessary to
|
||||
* create this relation on a worker. This includes setting up any sequences,
|
||||
* setting the owner of the table, and inserting into metadata tables.
|
||||
*/
|
||||
List *
|
||||
GetDistributedTableDDLEvents(DistTableCacheEntry *cacheEntry)
|
||||
{
|
||||
char *ownerResetCommand = NULL;
|
||||
char *metadataCommand = NULL;
|
||||
Oid relationId = cacheEntry->relationId;
|
||||
|
||||
List *commandList = GetTableDDLEvents(relationId);
|
||||
|
||||
ownerResetCommand = TableOwnerResetCommand(relationId);
|
||||
commandList = lappend(commandList, ownerResetCommand);
|
||||
|
||||
metadataCommand = DistributionCreateCommand(cacheEntry);
|
||||
commandList = lappend(commandList, metadataCommand);
|
||||
|
||||
return commandList;
|
||||
}
|
|
@@ -19,6 +19,7 @@

#include "access/heapam.h"
#include "catalog/pg_type.h"
#include "distributed/distribution_column.h"
#include "distributed/listutils.h"
#include "distributed/master_metadata_utility.h"
#include "distributed/master_protocol.h"
@@ -46,8 +47,6 @@ PG_FUNCTION_INFO_V1(load_shard_placement_array);
PG_FUNCTION_INFO_V1(partition_column_id);
PG_FUNCTION_INFO_V1(partition_type);
PG_FUNCTION_INFO_V1(is_distributed_table);
PG_FUNCTION_INFO_V1(column_name_to_column);
PG_FUNCTION_INFO_V1(column_name_to_column_id);
PG_FUNCTION_INFO_V1(create_monolithic_shard_row);
PG_FUNCTION_INFO_V1(create_healthy_local_shard_placement_row);
PG_FUNCTION_INFO_V1(delete_shard_placement_row);
@ -214,61 +213,6 @@ is_distributed_table(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* column_name_to_column is an internal UDF to obtain a textual representation
|
||||
* of a particular column node (Var), given a relation identifier and column
|
||||
* name. There is no requirement that the table be distributed; this function
|
||||
* simply returns the textual representation of a Var representing a column.
|
||||
* This function will raise an ERROR if no such column can be found or if the
|
||||
* provided name refers to a system column.
|
||||
*/
|
||||
Datum
|
||||
column_name_to_column(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
text *columnText = PG_GETARG_TEXT_P(1);
|
||||
Relation relation = NULL;
|
||||
char *columnName = text_to_cstring(columnText);
|
||||
Var *column = NULL;
|
||||
char *columnNodeString = NULL;
|
||||
text *columnNodeText = NULL;
|
||||
|
||||
relation = relation_open(relationId, AccessExclusiveLock);
|
||||
|
||||
column = (Var *) BuildDistributionKeyFromColumnName(relation, columnName);
|
||||
columnNodeString = nodeToString(column);
|
||||
columnNodeText = cstring_to_text(columnNodeString);
|
||||
|
||||
relation_close(relation, NoLock);
|
||||
|
||||
PG_RETURN_TEXT_P(columnNodeText);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* column_name_to_column_id takes a relation identifier and a name of a column
|
||||
* in that relation and returns the index of that column in the relation. If
|
||||
* the provided name is a system column or no column at all, this function will
|
||||
* throw an error instead.
|
||||
*/
|
||||
Datum
|
||||
column_name_to_column_id(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid distributedTableId = PG_GETARG_OID(0);
|
||||
char *columnName = PG_GETARG_CSTRING(1);
|
||||
Relation relation = NULL;
|
||||
Var *column = NULL;
|
||||
|
||||
relation = relation_open(distributedTableId, AccessExclusiveLock);
|
||||
|
||||
column = (Var *) BuildDistributionKeyFromColumnName(relation, columnName);
|
||||
|
||||
relation_close(relation, NoLock);
|
||||
|
||||
PG_RETURN_INT16((int16) column->varattno);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* create_monolithic_shard_row creates a single shard covering all possible
|
||||
* hash values for a given table and inserts a row representing that shard
|
||||
|
|
|
@ -0,0 +1,64 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* test/src/metadata_sync.c
|
||||
*
|
||||
 * This file contains functions to exercise the metadata snapshot
|
||||
* generation functionality within Citus.
|
||||
*
|
||||
* Copyright (c) 2014-2016, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
#include "postgres.h"
|
||||
#include "c.h"
|
||||
#include "fmgr.h"
|
||||
|
||||
#include "catalog/pg_type.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/test_helper_functions.h" /* IWYU pragma: keep */
|
||||
#include "utils/array.h"
|
||||
#include "utils/builtins.h"
|
||||
|
||||
|
||||
/* declarations for dynamic loading */
|
||||
PG_FUNCTION_INFO_V1(master_metadata_snapshot);
|
||||
|
||||
|
||||
/*
|
||||
 * master_metadata_snapshot returns all the queries that are required
 * to generate a metadata snapshot.
|
||||
*/
|
||||
Datum
|
||||
master_metadata_snapshot(PG_FUNCTION_ARGS)
|
||||
{
|
||||
List *dropSnapshotCommands = MetadataDropCommands();
|
||||
List *createSnapshotCommands = MetadataCreateCommands();
|
||||
List *snapshotCommandList = NIL;
|
||||
ListCell *snapshotCommandCell = NULL;
|
||||
int snapshotCommandCount = 0;
|
||||
Datum *snapshotCommandDatumArray = NULL;
|
||||
ArrayType *snapshotCommandArrayType = NULL;
|
||||
int snapshotCommandIndex = 0;
|
||||
Oid ddlCommandTypeId = TEXTOID;
|
||||
|
||||
snapshotCommandList = list_concat(snapshotCommandList, dropSnapshotCommands);
|
||||
snapshotCommandList = list_concat(snapshotCommandList, createSnapshotCommands);
|
||||
|
||||
snapshotCommandCount = list_length(snapshotCommandList);
|
||||
snapshotCommandDatumArray = palloc0(snapshotCommandCount * sizeof(Datum));
|
||||
|
||||
foreach(snapshotCommandCell, snapshotCommandList)
|
||||
{
|
||||
char *metadataSnapshotCommand = (char *) lfirst(snapshotCommandCell);
|
||||
Datum metadataSnapshotCommandDatum = CStringGetTextDatum(metadataSnapshotCommand);
|
||||
|
||||
snapshotCommandDatumArray[snapshotCommandIndex] = metadataSnapshotCommandDatum;
|
||||
snapshotCommandIndex++;
|
||||
}
|
||||
|
||||
snapshotCommandArrayType = DatumArrayToArrayType(snapshotCommandDatumArray,
|
||||
snapshotCommandCount,
|
||||
ddlCommandTypeId);
|
||||
|
||||
PG_RETURN_ARRAYTYPE_P(snapshotCommandArrayType);
|
||||
}
|
|
@ -174,41 +174,6 @@ pg_get_serverdef_string(Oid tableRelationId)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* AppendOptionListToString converts the option list to its textual format, and
|
||||
* appends this text to the given string buffer.
|
||||
*/
|
||||
static void
|
||||
AppendOptionListToString(StringInfo stringBuffer, List *optionList)
|
||||
{
|
||||
if (optionList != NIL)
|
||||
{
|
||||
ListCell *optionCell = NULL;
|
||||
bool firstOptionPrinted = false;
|
||||
|
||||
appendStringInfo(stringBuffer, " OPTIONS (");
|
||||
|
||||
foreach(optionCell, optionList)
|
||||
{
|
||||
DefElem *option = (DefElem *) lfirst(optionCell);
|
||||
char *optionName = option->defname;
|
||||
char *optionValue = defGetString(option);
|
||||
|
||||
if (firstOptionPrinted)
|
||||
{
|
||||
appendStringInfo(stringBuffer, ", ");
|
||||
}
|
||||
firstOptionPrinted = true;
|
||||
|
||||
appendStringInfo(stringBuffer, "%s ", quote_identifier(optionName));
|
||||
appendStringInfo(stringBuffer, "%s", quote_literal_cstr(optionValue));
|
||||
}
|
||||
|
||||
appendStringInfo(stringBuffer, ")");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* pg_get_sequencedef_string returns the definition of a given sequence. This
|
||||
* definition includes explicit values for all CREATE SEQUENCE options.
|
||||
|
@ -774,6 +739,77 @@ pg_get_table_grants(Oid relationId)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* generate_qualified_relation_name computes the schema-qualified name to display for a
|
||||
* relation specified by OID.
|
||||
*/
|
||||
char *
|
||||
generate_qualified_relation_name(Oid relid)
|
||||
{
|
||||
HeapTuple tp;
|
||||
Form_pg_class reltup;
|
||||
char *relname;
|
||||
char *nspname;
|
||||
char *result;
|
||||
|
||||
tp = SearchSysCache1(RELOID, ObjectIdGetDatum(relid));
|
||||
if (!HeapTupleIsValid(tp))
|
||||
{
|
||||
elog(ERROR, "cache lookup failed for relation %u", relid);
|
||||
}
|
||||
reltup = (Form_pg_class) GETSTRUCT(tp);
|
||||
relname = NameStr(reltup->relname);
|
||||
|
||||
nspname = get_namespace_name(reltup->relnamespace);
|
||||
if (!nspname)
|
||||
{
|
||||
elog(ERROR, "cache lookup failed for namespace %u",
|
||||
reltup->relnamespace);
|
||||
}
|
||||
|
||||
result = quote_qualified_identifier(nspname, relname);
|
||||
|
||||
ReleaseSysCache(tp);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* AppendOptionListToString converts the option list to its textual format, and
|
||||
* appends this text to the given string buffer.
|
||||
*/
|
||||
static void
|
||||
AppendOptionListToString(StringInfo stringBuffer, List *optionList)
|
||||
{
|
||||
if (optionList != NIL)
|
||||
{
|
||||
ListCell *optionCell = NULL;
|
||||
bool firstOptionPrinted = false;
|
||||
|
||||
appendStringInfo(stringBuffer, " OPTIONS (");
|
||||
|
||||
foreach(optionCell, optionList)
|
||||
{
|
||||
DefElem *option = (DefElem *) lfirst(optionCell);
|
||||
char *optionName = option->defname;
|
||||
char *optionValue = defGetString(option);
|
||||
|
||||
if (firstOptionPrinted)
|
||||
{
|
||||
appendStringInfo(stringBuffer, ", ");
|
||||
}
|
||||
firstOptionPrinted = true;
|
||||
|
||||
appendStringInfo(stringBuffer, "%s ", quote_identifier(optionName));
|
||||
appendStringInfo(stringBuffer, "%s", quote_literal_cstr(optionValue));
|
||||
}
|
||||
|
||||
appendStringInfo(stringBuffer, ")");
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/* copy of postgresql's function, which is static as well */
|
||||
static const char *
|
||||
convert_aclright_to_string(int aclright)
|
||||
|
|
|
@ -0,0 +1,205 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* distribution_column.c
|
||||
*
|
||||
* This file contains functions for translating distribution columns in
|
||||
* metadata tables.
|
||||
*
|
||||
* Copyright (c) 2016, Citus Data, Inc.
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
|
||||
#include "access/attnum.h"
|
||||
#include "access/heapam.h"
|
||||
#include "access/htup_details.h"
|
||||
#include "distributed/distribution_column.h"
|
||||
#include "nodes/makefuncs.h"
|
||||
#include "nodes/nodes.h"
|
||||
#include "nodes/primnodes.h"
|
||||
#include "parser/scansup.h"
|
||||
#include "utils/builtins.h"
|
||||
#include "utils/elog.h"
|
||||
#include "utils/errcodes.h"
|
||||
#include "utils/lsyscache.h"
|
||||
#include "utils/rel.h"
|
||||
#include "utils/relcache.h"
|
||||
#include "utils/syscache.h"
|
||||
|
||||
|
||||
/* exports for SQL callable functions */
|
||||
PG_FUNCTION_INFO_V1(column_name_to_column);
|
||||
PG_FUNCTION_INFO_V1(column_name_to_column_id);
|
||||
PG_FUNCTION_INFO_V1(column_to_column_name);
|
||||
|
||||
|
||||
/*
|
||||
* column_name_to_column is an internal UDF to obtain a textual representation
|
||||
* of a particular column node (Var), given a relation identifier and column
|
||||
* name. There is no requirement that the table be distributed; this function
|
||||
* simply returns the textual representation of a Var representing a column.
|
||||
* This function will raise an ERROR if no such column can be found or if the
|
||||
* provided name refers to a system column.
|
||||
*/
|
||||
Datum
|
||||
column_name_to_column(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
text *columnText = PG_GETARG_TEXT_P(1);
|
||||
Relation relation = NULL;
|
||||
char *columnName = text_to_cstring(columnText);
|
||||
Var *column = NULL;
|
||||
char *columnNodeString = NULL;
|
||||
text *columnNodeText = NULL;
|
||||
|
||||
relation = relation_open(relationId, AccessShareLock);
|
||||
|
||||
column = (Var *) BuildDistributionKeyFromColumnName(relation, columnName);
|
||||
columnNodeString = nodeToString(column);
|
||||
columnNodeText = cstring_to_text(columnNodeString);
|
||||
|
||||
relation_close(relation, AccessShareLock);
|
||||
|
||||
PG_RETURN_TEXT_P(columnNodeText);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* column_name_to_column_id takes a relation identifier and a name of a column
|
||||
* in that relation and returns the index of that column in the relation. If
|
||||
* the provided name is a system column or no column at all, this function will
|
||||
* throw an error instead.
|
||||
*/
|
||||
Datum
|
||||
column_name_to_column_id(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid distributedTableId = PG_GETARG_OID(0);
|
||||
char *columnName = PG_GETARG_CSTRING(1);
|
||||
Relation relation = NULL;
|
||||
Var *column = NULL;
|
||||
|
||||
relation = relation_open(distributedTableId, AccessExclusiveLock);
|
||||
|
||||
column = (Var *) BuildDistributionKeyFromColumnName(relation, columnName);
|
||||
|
||||
relation_close(relation, NoLock);
|
||||
|
||||
PG_RETURN_INT16((int16) column->varattno);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* column_to_column_name is an internal UDF to obtain the human-readable name
|
||||
* of a column given a relation identifier and the column's internal textual
|
||||
* (Var) representation. This function will raise an ERROR if no such column
|
||||
* can be found or if the provided Var refers to a system column.
|
||||
*/
|
||||
Datum
|
||||
column_to_column_name(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
text *columnNodeText = PG_GETARG_TEXT_P(1);
|
||||
|
||||
char *columnNodeString = text_to_cstring(columnNodeText);
|
||||
char *columnName = NULL;
|
||||
text *columnText = NULL;
|
||||
|
||||
columnName = ColumnNameToColumn(relationId, columnNodeString);
|
||||
|
||||
columnText = cstring_to_text(columnName);
|
||||
|
||||
PG_RETURN_TEXT_P(columnText);
|
||||
}
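/*
 * Illustration of the round trip between the two representations, assuming both
 * UDFs are exposed at the SQL level (this commit only adds column_name_to_column
 * to the extension script); table and column names are examples:
 *
 *   SELECT column_to_column_name('my_table'::regclass,
 *                                column_name_to_column('my_table', 'tenant_id'));
 *   -- returns 'tenant_id'
 */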
|
||||
|
||||
|
||||
/*
|
||||
* BuildDistributionKeyFromColumnName builds a simple distribution key consisting
|
||||
* only out of a reference to the column of name columnName. Errors out if the
|
||||
* specified column does not exist or is not suitable to be used as a
|
||||
* distribution column.
|
||||
*/
|
||||
Node *
|
||||
BuildDistributionKeyFromColumnName(Relation distributedRelation, char *columnName)
|
||||
{
|
||||
HeapTuple columnTuple = NULL;
|
||||
Form_pg_attribute columnForm = NULL;
|
||||
Var *column = NULL;
|
||||
char *tableName = RelationGetRelationName(distributedRelation);
|
||||
|
||||
	/* it'd probably be better to downcase identifiers consistent with SQL case folding */
|
||||
truncate_identifier(columnName, strlen(columnName), true);
|
||||
|
||||
/* lookup column definition */
|
||||
columnTuple = SearchSysCacheAttName(RelationGetRelid(distributedRelation),
|
||||
columnName);
|
||||
if (!HeapTupleIsValid(columnTuple))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN),
|
||||
errmsg("column \"%s\" of relation \"%s\" does not exist",
|
||||
columnName, tableName)));
|
||||
}
|
||||
|
||||
columnForm = (Form_pg_attribute) GETSTRUCT(columnTuple);
|
||||
|
||||
/* check if the column may be referenced in the distribution key */
|
||||
if (columnForm->attnum <= 0)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot reference system column \"%s\" in relation \"%s\"",
|
||||
columnName, tableName)));
|
||||
}
|
||||
|
||||
/* build Var referencing only the chosen distribution column */
|
||||
column = makeVar(1, columnForm->attnum, columnForm->atttypid,
|
||||
columnForm->atttypmod, columnForm->attcollation, 0);
|
||||
|
||||
ReleaseSysCache(columnTuple);
|
||||
|
||||
return (Node *) column;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ColumnNameToColumn returns the human-readable name of a column given a
|
||||
* relation identifier and the column's internal textual (Var) representation.
|
||||
* This function will raise an ERROR if no such column can be found or if the
|
||||
* provided Var refers to a system column.
|
||||
*/
|
||||
char *
|
||||
ColumnNameToColumn(Oid relationId, char *columnNodeString)
|
||||
{
|
||||
Node *columnNode = NULL;
|
||||
Var *column = NULL;
|
||||
AttrNumber columnNumber = InvalidAttrNumber;
|
||||
char *columnName = NULL;
|
||||
|
||||
columnNode = stringToNode(columnNodeString);
|
||||
|
||||
Assert(IsA(columnNode, Var));
|
||||
column = (Var *) columnNode;
|
||||
|
||||
columnNumber = column->varattno;
|
||||
if (!AttrNumberIsForUserDefinedAttr(columnNumber))
|
||||
{
|
||||
char *relationName = get_rel_name(relationId);
|
||||
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_COLUMN_REFERENCE),
|
||||
errmsg("attribute %d of relation \"%s\" is a system column",
|
||||
columnNumber, relationName)));
|
||||
}
|
||||
|
||||
columnName = get_attname(relationId, column->varattno);
|
||||
if (columnName == NULL)
|
||||
{
|
||||
char *relationName = get_rel_name(relationId);
|
||||
|
||||
ereport(ERROR, (errcode(ERRCODE_UNDEFINED_COLUMN),
|
||||
errmsg("attribute %d of relation \"%s\" does not exist",
|
||||
columnNumber, relationName)));
|
||||
}
|
||||
|
||||
return columnName;
|
||||
}
|
|
@ -92,6 +92,7 @@ static uint32 WorkerNodeHashCode(const void *key, Size keySize);
|
|||
static void ResetDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
|
||||
static void InvalidateDistRelationCacheCallback(Datum argument, Oid relationId);
|
||||
static void InvalidateNodeRelationCacheCallback(Datum argument, Oid relationId);
|
||||
static List * DistTableOidList(void);
|
||||
static HeapTuple LookupDistPartitionTuple(Relation pgDistPartition, Oid relationId);
|
||||
static List * LookupDistShardTuples(Oid relationId);
|
||||
static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMethod,
|
||||
|
@ -133,6 +134,34 @@ IsDistributedTable(Oid relationId)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* DistributedTableList returns a list that includes all the valid distributed table
|
||||
* cache entries.
|
||||
*/
|
||||
List *
|
||||
DistributedTableList(void)
|
||||
{
|
||||
List *distTableOidList = NIL;
|
||||
List *distributedTableList = NIL;
|
||||
ListCell *distTableOidCell = NULL;
|
||||
|
||||
/* first, we need to iterate over pg_dist_partition */
|
||||
distTableOidList = DistTableOidList();
|
||||
|
||||
foreach(distTableOidCell, distTableOidList)
|
||||
{
|
||||
DistTableCacheEntry *cacheEntry = NULL;
|
||||
Oid relationId = lfirst_oid(distTableOidCell);
|
||||
|
||||
cacheEntry = DistributedTableCacheEntry(relationId);
|
||||
|
||||
distributedTableList = lappend(distributedTableList, cacheEntry);
|
||||
}
|
||||
|
||||
return distributedTableList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* LoadShardInterval reads shard metadata for given shardId from pg_dist_shard,
|
||||
* and converts min/max values in these metadata to their properly typed datum
|
||||
|
@ -1091,6 +1120,7 @@ InitializeWorkerNodeCache(void)
|
|||
strlcpy(workerNode->workerName, currentNode->workerName, WORKER_LENGTH);
|
||||
workerNode->workerPort = currentNode->workerPort;
|
||||
workerNode->groupId = currentNode->groupId;
|
||||
workerNode->nodeId = currentNode->nodeId;
|
||||
strlcpy(workerNode->workerRack, currentNode->workerRack, WORKER_LENGTH);
|
||||
|
||||
if (handleFound)
|
||||
|
@ -1253,6 +1283,51 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* DistTableOidList iterates over the pg_dist_partition table and returns
|
||||
* a list that consists of the logicalrelids.
|
||||
*/
|
||||
static List *
|
||||
DistTableOidList(void)
|
||||
{
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 0;
|
||||
HeapTuple heapTuple = NULL;
|
||||
List *distTableOidList = NIL;
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
|
||||
Relation pgDistPartition = heap_open(DistPartitionRelationId(), AccessShareLock);
|
||||
|
||||
scanDescriptor = systable_beginscan(pgDistPartition,
|
||||
InvalidOid, false,
|
||||
NULL, scanKeyCount, scanKey);
|
||||
|
||||
tupleDescriptor = RelationGetDescr(pgDistPartition);
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
while (HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
bool isNull = false;
|
||||
Oid relationId = InvalidOid;
|
||||
Datum relationIdDatum = heap_getattr(heapTuple,
|
||||
Anum_pg_dist_partition_logicalrelid,
|
||||
tupleDescriptor, &isNull);
|
||||
|
||||
relationId = DatumGetObjectId(relationIdDatum);
|
||||
|
||||
distTableOidList = lappend_oid(distTableOidList, relationId);
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
}
|
||||
|
||||
systable_endscan(scanDescriptor);
|
||||
heap_close(pgDistPartition, AccessShareLock);
|
||||
|
||||
return distTableOidList;
|
||||
}
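/*
 * Illustration: the list built here corresponds to this catalog query
 * (run over the local pg_dist_partition):
 *
 *   SELECT logicalrelid FROM pg_dist_partition;
 */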
|
||||
|
||||
|
||||
/*
|
||||
* InvalidateNodeRelationCacheCallback destroys the WorkerNodeHash when
|
||||
 * any change happens on the pg_dist_node table. It also sets WorkerNodeHash to
|
||||
|
|
|
@ -0,0 +1,174 @@
|
|||
/*-------------------------------------------------------------------------
|
||||
*
|
||||
* worker_drop_protocol.c
|
||||
*
|
||||
* Routines for dropping distributed tables and their metadata on worker nodes.
|
||||
*
|
||||
* Copyright (c) 2016, Citus Data, Inc.
|
||||
*
|
||||
* $Id$
|
||||
*
|
||||
*-------------------------------------------------------------------------
|
||||
*/
|
||||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "access/genam.h"
|
||||
#include "access/heapam.h"
|
||||
#include "access/xact.h"
|
||||
#include "catalog/dependency.h"
|
||||
#include "catalog/pg_foreign_server.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/distribution_column.h"
|
||||
#include "distributed/master_metadata_utility.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "foreign/foreign.h"
|
||||
#include "utils/fmgroids.h"
|
||||
|
||||
|
||||
PG_FUNCTION_INFO_V1(worker_drop_distributed_table);
|
||||
|
||||
|
||||
static void DeletePartitionRow(Oid distributedRelationId);
|
||||
|
||||
|
||||
/*
|
||||
* worker_drop_distributed_table drops the distributed table with the given oid,
|
||||
 * then removes the associated rows from pg_dist_partition, pg_dist_shard and
|
||||
* pg_dist_shard_placement. The function also drops the server for foreign tables.
|
||||
*
|
||||
* Note that drop fails if any dependent objects are present for any of the
|
||||
* distributed tables. Also, shard placements of the distributed tables are
|
||||
* not dropped as in the case of "DROP TABLE distributed_table;" command.
|
||||
*
|
||||
* The function errors out if the input relation Oid is not a regular or foreign table.
|
||||
*/
|
||||
Datum
|
||||
worker_drop_distributed_table(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Datum relationIdDatum = PG_GETARG_OID(0);
|
||||
Oid relationId = DatumGetObjectId(relationIdDatum);
|
||||
|
||||
ObjectAddress distributedTableObject = { InvalidOid, InvalidOid, 0 };
|
||||
Relation distributedRelation = NULL;
|
||||
List *shardList = LoadShardList(relationId);
|
||||
ListCell *shardCell = NULL;
|
||||
char relationKind = '\0';
|
||||
|
||||
/* first check the relation type */
|
||||
distributedRelation = relation_open(relationId, AccessShareLock);
|
||||
relationKind = distributedRelation->rd_rel->relkind;
|
||||
if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
char *relationName = generate_relation_name(relationId, NIL);
|
||||
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
|
||||
errmsg("%s is not a regular or foreign table", relationName)));
|
||||
}
|
||||
|
||||
	/* close the relation since we do not need it anymore */
|
||||
relation_close(distributedRelation, AccessShareLock);
|
||||
|
||||
/* prepare distributedTableObject for dropping the table */
|
||||
distributedTableObject.classId = RelationRelationId;
|
||||
distributedTableObject.objectId = relationId;
|
||||
distributedTableObject.objectSubId = 0;
|
||||
|
||||
/* drop the server for the foreign relations */
|
||||
if (relationKind == RELKIND_FOREIGN_TABLE)
|
||||
{
|
||||
ObjectAddresses *objects = new_object_addresses();
|
||||
ObjectAddress foreignServerObject = { InvalidOid, InvalidOid, 0 };
|
||||
ForeignTable *foreignTable = GetForeignTable(relationId);
|
||||
Oid serverId = foreignTable->serverid;
|
||||
|
||||
/* prepare foreignServerObject for dropping the server */
|
||||
foreignServerObject.classId = ForeignServerRelationId;
|
||||
foreignServerObject.objectId = serverId;
|
||||
foreignServerObject.objectSubId = 0;
|
||||
|
||||
/* add the addresses that are going to be dropped */
|
||||
add_exact_object_address(&distributedTableObject, objects);
|
||||
add_exact_object_address(&foreignServerObject, objects);
|
||||
|
||||
/* drop both the table and the server */
|
||||
performMultipleDeletions(objects, DROP_RESTRICT,
|
||||
PERFORM_DELETION_INTERNAL);
|
||||
}
|
||||
else
|
||||
{
|
||||
/* drop the table only */
|
||||
performDeletion(&distributedTableObject, DROP_RESTRICT,
|
||||
PERFORM_DELETION_INTERNAL);
|
||||
}
|
||||
|
||||
/* iterate over shardList to delete the corresponding rows */
|
||||
foreach(shardCell, shardList)
|
||||
{
|
||||
List *shardPlacementList = NIL;
|
||||
ListCell *shardPlacementCell = NULL;
|
||||
uint64 *shardIdPointer = (uint64 *) lfirst(shardCell);
|
||||
uint64 shardId = (*shardIdPointer);
|
||||
|
||||
shardPlacementList = ShardPlacementList(shardId);
|
||||
foreach(shardPlacementCell, shardPlacementList)
|
||||
{
|
||||
ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
|
||||
char *workerName = placement->nodeName;
|
||||
uint32 workerPort = placement->nodePort;
|
||||
|
||||
/* delete the row from pg_dist_shard_placement */
|
||||
DeleteShardPlacementRow(shardId, workerName, workerPort);
|
||||
}
|
||||
|
||||
/* delete the row from pg_dist_shard */
|
||||
DeleteShardRow(shardId);
|
||||
}
|
||||
|
||||
/* delete the row from pg_dist_partition */
|
||||
DeletePartitionRow(relationId);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
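/*
 * Illustration (run on a worker node; the table name is an example):
 *
 *   SELECT worker_drop_distributed_table('public.mx_test_table'::regclass);
 *
 * This drops the table itself and its pg_dist_partition, pg_dist_shard and
 * pg_dist_shard_placement rows on that node, which is how the command produced
 * by MetadataDropCommands() clears worker metadata before a new snapshot is applied.
 */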
|
||||
|
||||
|
||||
/*
|
||||
* DeletePartitionRow removes the row from pg_dist_partition where the logicalrelid
|
||||
* field equals to distributedRelationId. Then, the function invalidates the
|
||||
* metadata cache.
|
||||
*/
|
||||
void
|
||||
DeletePartitionRow(Oid distributedRelationId)
|
||||
{
|
||||
Relation pgDistPartition = NULL;
|
||||
HeapTuple heapTuple = NULL;
|
||||
SysScanDesc scanDescriptor = NULL;
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
|
||||
pgDistPartition = heap_open(DistPartitionRelationId(), RowExclusiveLock);
|
||||
|
||||
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
|
||||
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(distributedRelationId));
|
||||
|
||||
scanDescriptor = systable_beginscan(pgDistPartition, InvalidOid, false, NULL,
|
||||
scanKeyCount, scanKey);
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
if (!HeapTupleIsValid(heapTuple))
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not find valid entry for partition %d",
|
||||
distributedRelationId)));
|
||||
}
|
||||
|
||||
simple_heap_delete(pgDistPartition, &heapTuple->t_self);
|
||||
|
||||
systable_endscan(scanDescriptor);
|
||||
|
||||
/* invalidate the cache */
|
||||
CitusInvalidateRelcacheByRelid(distributedRelationId);
|
||||
|
||||
/* increment the counter so that next command can see the row */
|
||||
CommandCounterIncrement();
|
||||
|
||||
heap_close(pgDistPartition, RowExclusiveLock);
|
||||
}
|
|
@@ -38,6 +38,7 @@ extern void pg_get_query_def(Query *query, StringInfo buffer);
extern void deparse_shard_query(Query *query, Oid distrelid, int64 shardid, StringInfo
								buffer);
extern char * generate_relation_name(Oid relid, List *namespaces);
extern char * generate_qualified_relation_name(Oid relid);


#endif /* CITUS_RULEUTILS_H */
@@ -0,0 +1,26 @@
/*-------------------------------------------------------------------------
 *
 * distribution_column.h
 *	  Type and function declarations used for handling the distribution
 *	  column of distributed tables.
 *
 * Copyright (c) 2016, Citus Data, Inc.
 *
 * $Id$
 *
 *-------------------------------------------------------------------------
 */

#ifndef DISTRIBUTION_COLUMN_H
#define DISTRIBUTION_COLUMN_H


#include "utils/rel.h"


/* Remaining metadata utility functions */
extern Node * BuildDistributionKeyFromColumnName(Relation distributedRelation,
												 char *columnName);
extern char * ColumnNameToColumn(Oid relationId, char *columnNodeString);

#endif /* DISTRIBUTION_COLUMN_H */
@@ -81,8 +81,6 @@ extern uint64 DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32
									  workerPort);

/* Remaining metadata utility functions */
extern Node * BuildDistributionKeyFromColumnName(Relation distributedRelation,
												 char *columnName);
extern char * TableOwner(Oid relationId);
extern void EnsureTablePermissions(Oid relationId, AclMode mode);
extern void EnsureTableOwner(Oid relationId);
@@ -53,6 +53,7 @@ typedef struct


extern bool IsDistributedTable(Oid relationId);
extern List * DistributedTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern void CitusInvalidateRelcacheByRelid(Oid relationId);
@@ -0,0 +1,39 @@
/*-------------------------------------------------------------------------
 *
 * metadata_sync.h
 *	  Type and function declarations used to sync metadata across all
 *	  workers.
 *
 * Copyright (c) 2016, Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#ifndef METADATA_SYNC_H
#define METADATA_SYNC_H


#include "distributed/metadata_cache.h"
#include "nodes/pg_list.h"


/* Function declarations for metadata syncing */
extern bool ShouldSyncTableMetadata(Oid relationId);
extern List * MetadataCreateCommands(void);
extern List * MetadataDropCommands(void);
extern char * DistributionCreateCommand(DistTableCacheEntry *cacheEntry);
extern char * DistributionDeleteCommand(char *schemaName,
										char *tableName);
extern char * TableOwnerResetCommand(Oid distributedRelationId);
extern char * NodeListInsertCommand(List *workerNodeList);
extern List * ShardListInsertCommand(List *shardIntervalList);
extern char * NodeDeleteCommand(uint32 nodeId);
extern List * GetDistributedTableDDLEvents(DistTableCacheEntry *cacheEntry);


#define DELETE_ALL_NODES "TRUNCATE pg_dist_node"
#define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \
	"SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition"


#endif /* METADATA_SYNC_H */
@@ -51,10 +51,6 @@ CREATE FUNCTION acquire_shared_shard_lock(bigint)
	RETURNS void
	AS 'citus'
	LANGUAGE C STRICT;
CREATE FUNCTION column_name_to_column(regclass, text)
	RETURNS text
	AS 'citus'
	LANGUAGE C STRICT;
-- ===================================================================
-- test distribution metadata functionality
-- ===================================================================
@@ -31,6 +31,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-2';
ALTER EXTENSION citus UPDATE TO '6.0-3';
ALTER EXTENSION citus UPDATE TO '6.0-4';
ALTER EXTENSION citus UPDATE TO '6.0-5';
ALTER EXTENSION citus UPDATE TO '6.0-6';
-- drop extension and re-create in newest version
DROP EXTENSION citus;
\c
@ -0,0 +1,151 @@
|
|||
--
|
||||
-- MULTI_METADATA_SNAPSHOT
|
||||
--
|
||||
-- Tests for metadata snapshot functions.
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1310000;
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1310000;
|
||||
-- Create the necessary test utility function
|
||||
CREATE FUNCTION master_metadata_snapshot()
|
||||
RETURNS text[]
|
||||
LANGUAGE C STRICT
|
||||
AS 'citus';
|
||||
|
||||
COMMENT ON FUNCTION master_metadata_snapshot()
|
||||
IS 'commands to create the metadata snapshot';
|
||||
|
||||
-- Show that none of the existing tables are qualified to be MX tables
|
||||
SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s';
|
||||
logicalrelid | partmethod | partkey | colocationid | repmodel
|
||||
--------------+------------+---------+--------------+----------
|
||||
(0 rows)
|
||||
|
||||
-- Show that, with no MX tables, metadata snapshot contains only the delete commands and
|
||||
-- pg_dist_node entries
|
||||
SELECT unnest(master_metadata_snapshot());
|
||||
unnest
|
||||
-------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
TRUNCATE pg_dist_node
|
||||
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
||||
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default')
|
||||
(3 rows)
|
||||
|
||||
-- Create a test table with constraints and SERIAL
|
||||
CREATE TABLE mx_test_table (col_1 int UNIQUE, col_2 text NOT NULL, col_3 SERIAL);
|
||||
SELECT master_create_distributed_table('mx_test_table', 'col_1', 'hash');
|
||||
master_create_distributed_table
|
||||
---------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT master_create_worker_shards('mx_test_table', 8, 1);
|
||||
master_create_worker_shards
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Set the replication model of the test table to streaming replication so that it is
|
||||
-- considered as an MX table
|
||||
UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='mx_test_table'::regclass;
|
||||
-- Show that the created MX table is included in the metadata snapshot
|
||||
SELECT unnest(master_metadata_snapshot());
|
||||
unnest
|
||||
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|
||||
TRUNCATE pg_dist_node
|
||||
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
||||
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default')
|
||||
CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
|
||||
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL)
|
||||
ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
|
||||
ALTER TABLE public.mx_test_table OWNER TO postgres
|
||||
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's')
|
||||
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 550),(1310001, 1, 0, 'localhost', 57638, 551),(1310002, 1, 0, 'localhost', 57637, 552),(1310003, 1, 0, 'localhost', 57638, 553),(1310004, 1, 0, 'localhost', 57637, 554),(1310005, 1, 0, 'localhost', 57638, 555),(1310006, 1, 0, 'localhost', 57637, 556),(1310007, 1, 0, 'localhost', 57638, 557)
|
||||
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
|
||||
(10 rows)
|
||||
|
||||
-- Show that CREATE INDEX commands are included in the metadata snapshot
|
||||
CREATE INDEX mx_index ON mx_test_table(col_2);
|
||||
NOTICE: using one-phase commit for distributed DDL commands
|
||||
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
|
||||
SELECT unnest(master_metadata_snapshot());
unnest
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default')
CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL)
CREATE INDEX mx_index ON public.mx_test_table USING btree (col_2)
ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
ALTER TABLE public.mx_test_table OWNER TO postgres
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 550),(1310001, 1, 0, 'localhost', 57638, 551),(1310002, 1, 0, 'localhost', 57637, 552),(1310003, 1, 0, 'localhost', 57638, 553),(1310004, 1, 0, 'localhost', 57637, 554),(1310005, 1, 0, 'localhost', 57638, 555),(1310006, 1, 0, 'localhost', 57637, 556),(1310007, 1, 0, 'localhost', 57638, 557)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(11 rows)
-- Show that schema changes are included in the metadata snapshot
CREATE SCHEMA mx_testing_schema;
ALTER TABLE mx_test_table SET SCHEMA mx_testing_schema;
WARNING: not propagating ALTER ... SET SCHEMA commands to worker nodes
HINT: Connect to worker nodes directly to manually change schemas of affected objects.
SELECT unnest(master_metadata_snapshot());
unnest
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default')
CREATE SCHEMA IF NOT EXISTS mx_testing_schema
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 550),(1310001, 1, 0, 'localhost', 57638, 551),(1310002, 1, 0, 'localhost', 57637, 552),(1310003, 1, 0, 'localhost', 57638, 553),(1310004, 1, 0, 'localhost', 57637, 554),(1310005, 1, 0, 'localhost', 57638, 555),(1310006, 1, 0, 'localhost', 57637, 556),(1310007, 1, 0, 'localhost', 57638, 557)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(12 rows)
-- Show that append distributed tables are not included in the metadata snapshot
CREATE TABLE non_mx_test_table (col_1 int, col_2 text);
SELECT master_create_distributed_table('non_mx_test_table', 'col_1', 'append');
master_create_distributed_table
---------------------------------
(1 row)
UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='non_mx_test_table'::regclass;
SELECT unnest(master_metadata_snapshot());
unnest
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default')
CREATE SCHEMA IF NOT EXISTS mx_testing_schema
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 550),(1310001, 1, 0, 'localhost', 57638, 551),(1310002, 1, 0, 'localhost', 57637, 552),(1310003, 1, 0, 'localhost', 57638, 553),(1310004, 1, 0, 'localhost', 57637, 554),(1310005, 1, 0, 'localhost', 57638, 555),(1310006, 1, 0, 'localhost', 57637, 556),(1310007, 1, 0, 'localhost', 57638, 557)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(12 rows)
-- Show that range distributed tables are not included in the metadata snapshot
UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass;
SELECT unnest(master_metadata_snapshot());
unnest
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
TRUNCATE pg_dist_node
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack) VALUES (2, 2, 'localhost', 57638, 'default'),(1, 1, 'localhost', 57637, 'default')
CREATE SCHEMA IF NOT EXISTS mx_testing_schema
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 550),(1310001, 1, 0, 'localhost', 57638, 551),(1310002, 1, 0, 'localhost', 57637, 552),(1310003, 1, 0, 'localhost', 57638, 553),(1310004, 1, 0, 'localhost', 57637, 554),(1310005, 1, 0, 'localhost', 57638, 555),(1310006, 1, 0, 'localhost', 57637, 556),(1310007, 1, 0, 'localhost', 57638, 557)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(12 rows)
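
Taken together, the outputs above pin down the eligibility rule the snapshot applies: a table is reflected in master_metadata_snapshot() only when it is hash partitioned (partmethod 'h') and uses streaming replication (repmodel 's'). As a hedged aside rather than part of this commit, the same predicate can be checked directly against the catalog; the query below is only a sketch built from the pg_dist_partition columns that appear in this diff:

-- Sketch: relations that would currently be included in the metadata snapshot
SELECT logicalrelid::regclass
FROM pg_dist_partition
WHERE partmethod = 'h' AND repmodel = 's';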
@ -133,6 +133,7 @@ test: multi_data_types
test: multi_repartition_udt
test: multi_repartitioned_subquery_udf
test: multi_modifying_xacts
test: multi_metadata_snapshot
# ---------
# multi_copy creates hash and range-partitioned tables and performs COPY
@ -67,11 +67,6 @@ CREATE FUNCTION acquire_shared_shard_lock(bigint)
AS 'citus'
LANGUAGE C STRICT;
CREATE FUNCTION column_name_to_column(regclass, text)
RETURNS text
AS 'citus'
LANGUAGE C STRICT;
-- ===================================================================
-- test distribution metadata functionality
-- ===================================================================
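
The test-local definition of column_name_to_column removed here is no longer needed, presumably because the function now ships with the extension itself; the snapshot's generated INSERT INTO pg_dist_partition commands call it to rebuild the partition key. A minimal usage sketch, assuming the mx_test_table created by the new regression test:

-- Sketch: textual Var representation of the distribution column, as stored in partkey
SELECT column_name_to_column('mx_test_table'::regclass, 'col_1');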
@ -36,6 +36,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-2';
ALTER EXTENSION citus UPDATE TO '6.0-3';
ALTER EXTENSION citus UPDATE TO '6.0-4';
ALTER EXTENSION citus UPDATE TO '6.0-5';
ALTER EXTENSION citus UPDATE TO '6.0-6';
-- drop extension and re-create in newest version
DROP EXTENSION citus;
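
As a side note (a sketch, not part of the test file), the schema version reached by the ALTER EXTENSION chain above can be confirmed from the catalog before the extension is dropped and re-created:

-- Sketch: should report 6.0-6 once the upgrade chain has run
SELECT extversion FROM pg_extension WHERE extname = 'citus';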
@ -0,0 +1,57 @@
--
-- MULTI_METADATA_SNAPSHOT
--
-- Tests for metadata snapshot functions.
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1310000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1310000;
-- Create the necessary test utility function
CREATE FUNCTION master_metadata_snapshot()
RETURNS text[]
LANGUAGE C STRICT
AS 'citus';
COMMENT ON FUNCTION master_metadata_snapshot()
IS 'commands to create the metadata snapshot';
-- Show that none of the existing tables are qualified to be MX tables
SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s';
-- Show that, with no MX tables, metadata snapshot contains only the delete commands and
-- pg_dist_node entries
SELECT unnest(master_metadata_snapshot());
-- Create a test table with constraints and SERIAL
CREATE TABLE mx_test_table (col_1 int UNIQUE, col_2 text NOT NULL, col_3 SERIAL);
SELECT master_create_distributed_table('mx_test_table', 'col_1', 'hash');
SELECT master_create_worker_shards('mx_test_table', 8, 1);
-- Set the replication model of the test table to streaming replication so that it is
-- considered as an MX table
UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='mx_test_table'::regclass;
-- Show that the created MX table is included in the metadata snapshot
SELECT unnest(master_metadata_snapshot());
-- Show that CREATE INDEX commands are included in the metadata snapshot
CREATE INDEX mx_index ON mx_test_table(col_2);
SELECT unnest(master_metadata_snapshot());
-- Show that schema changes are included in the metadata snapshot
CREATE SCHEMA mx_testing_schema;
ALTER TABLE mx_test_table SET SCHEMA mx_testing_schema;
SELECT unnest(master_metadata_snapshot());
-- Show that append distributed tables are not included in the metadata snapshot
CREATE TABLE non_mx_test_table (col_1 int, col_2 text);
SELECT master_create_distributed_table('non_mx_test_table', 'col_1', 'append');
UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='non_mx_test_table'::regclass;
SELECT unnest(master_metadata_snapshot());
-- Show that range distributed tables are not included in the metadata snapshot
UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass;
SELECT unnest(master_metadata_snapshot());
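
The new test only inspects the snapshot on the coordinator. As a hedged sketch of how the returned commands could be consumed (nothing in this commit automates this), note that each array element is a complete SQL statement, so the snapshot can be exported and replayed against a worker; the file name, host, and port below are placeholders:

-- Coordinator: write the snapshot as executable SQL, one statement per line
\copy (SELECT unnest(master_metadata_snapshot()) || ';') TO 'metadata_snapshot.sql'
-- Worker: replay it, for example  psql -h <worker-host> -p <worker-port> -f metadata_snapshot.sql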