mirror of https://github.com/citusdata/citus.git
Adds support for table undistribution
parent
7378ab6bf8
commit
375310b7f1
|
@ -12,6 +12,7 @@
|
|||
#include "miscadmin.h"
|
||||
|
||||
#include "distributed/pg_version_constants.h"
|
||||
#include "distributed/commands/utility_hook.h"
|
||||
|
||||
#include "access/genam.h"
|
||||
#include "access/hash.h"
|
||||
|
@ -33,15 +34,19 @@
|
|||
#include "catalog/pg_trigger.h"
|
||||
#include "commands/defrem.h"
|
||||
#include "commands/extension.h"
|
||||
#include "commands/tablecmds.h"
|
||||
#include "commands/trigger.h"
|
||||
#include "distributed/commands/multi_copy.h"
|
||||
#include "distributed/citus_ruleutils.h"
|
||||
#include "distributed/colocation_utils.h"
|
||||
#include "distributed/commands.h"
|
||||
#include "distributed/deparser.h"
|
||||
#include "distributed/distribution_column.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata_utility.h"
|
||||
#include "distributed/coordinator_protocol.h"
|
||||
#include "distributed/metadata/dependency.h"
|
||||
#include "distributed/metadata/distobject.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/metadata_sync.h"
|
||||
#include "distributed/multi_executor.h"
|
||||
|
@ -119,11 +124,15 @@ static void DoCopyFromLocalTableIntoShards(Relation distributedRelation,
|
|||
DestReceiver *copyDest,
|
||||
TupleTableSlot *slot,
|
||||
EState *estate);
|
||||
static void UndistributeTable(Oid relationId);
|
||||
static List * GetViewCreationCommandsOfTable(Oid relationId);
|
||||
static void ReplaceTable(Oid sourceId, Oid targetId);
|
||||
|
||||
/* exports for SQL callable functions */
|
||||
PG_FUNCTION_INFO_V1(master_create_distributed_table);
|
||||
PG_FUNCTION_INFO_V1(create_distributed_table);
|
||||
PG_FUNCTION_INFO_V1(create_reference_table);
|
||||
PG_FUNCTION_INFO_V1(undistribute_table);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -304,6 +313,25 @@ create_reference_table(PG_FUNCTION_ARGS)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* undistribute_table gets a distributed table name and
|
||||
* udistributes it.
|
||||
*/
|
||||
Datum
|
||||
undistribute_table(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
EnsureCoordinator();
|
||||
EnsureTableOwner(relationId);
|
||||
|
||||
UndistributeTable(relationId);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* EnsureCitusTableCanBeCreated checks if
|
||||
* - we are on the coordinator
|
||||
|
@ -1502,3 +1530,231 @@ RelationUsesHeapAccessMethodOrNone(Relation relation)
|
|||
return true;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* UndistributeTable undistributes the given table. The undistribution is done by
|
||||
* creating a new table, moving everything to the new table and dropping the old one.
|
||||
* So the oid of the table is not preserved.
|
||||
*
|
||||
* The undistributed table will have the same name, columns and rows. It will also have
|
||||
* partitions, views, sequences of the old table. Finally it will have everything created
|
||||
* by GetTableConstructionCommands function, which include indexes. These will be
|
||||
* re-created during undistribution, so their oids are not preserved either (except for
|
||||
* sequences). However, their names are preserved.
|
||||
*
|
||||
* The tables with references are not supported. The function gives an error if there are
|
||||
* any references to or from the table.
|
||||
*
|
||||
* The dropping of old table is done with CASCADE. Anything not mentioned here will
|
||||
* be dropped.
|
||||
*/
|
||||
void
|
||||
UndistributeTable(Oid relationId)
|
||||
{
|
||||
Relation relation = try_relation_open(relationId, ExclusiveLock);
|
||||
if (relation == NULL)
|
||||
{
|
||||
ereport(ERROR, (errmsg("Cannot undistribute table"),
|
||||
errdetail("No such distributed table exists. "
|
||||
"Might have already been undistributed.")));
|
||||
}
|
||||
|
||||
relation_close(relation, NoLock);
|
||||
|
||||
if (!IsCitusTable(relationId))
|
||||
{
|
||||
ereport(ERROR, (errmsg("Cannot undistribute table."),
|
||||
errdetail("The table is not distributed.")));
|
||||
}
|
||||
|
||||
if (TableReferencing(relationId))
|
||||
{
|
||||
ereport(ERROR, (errmsg("Cannot undistribute table "
|
||||
"because it has a foreign key.")));
|
||||
}
|
||||
|
||||
if (TableReferenced(relationId))
|
||||
{
|
||||
ereport(ERROR, (errmsg("Cannot undistribute table "
|
||||
"because a foreign key references to it.")));
|
||||
}
|
||||
|
||||
|
||||
List *tableBuildingCommands = GetTableBuildingCommands(relationId, true);
|
||||
List *tableConstructionCommands = GetTableConstructionCommands(relationId);
|
||||
|
||||
tableConstructionCommands = list_concat(tableConstructionCommands,
|
||||
GetViewCreationCommandsOfTable(relationId));
|
||||
|
||||
int spiResult = SPI_connect();
|
||||
if (spiResult != SPI_OK_CONNECT)
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not connect to SPI manager")));
|
||||
}
|
||||
|
||||
char *relationName = get_rel_name(relationId);
|
||||
Oid schemaId = get_rel_namespace(relationId);
|
||||
char *schemaName = get_namespace_name(schemaId);
|
||||
|
||||
if (PartitionedTable(relationId))
|
||||
{
|
||||
ereport(NOTICE, (errmsg("Undistributing the partitions of %s",
|
||||
quote_qualified_identifier(schemaName, relationName))));
|
||||
List *partitionList = PartitionList(relationId);
|
||||
Oid partitionRelationId = InvalidOid;
|
||||
foreach_oid(partitionRelationId, partitionList)
|
||||
{
|
||||
char *detachPartitionCommand = GenerateDetachPartitionCommand(
|
||||
partitionRelationId);
|
||||
char *attachPartitionCommand = GenerateAlterTableAttachPartitionCommand(
|
||||
partitionRelationId);
|
||||
|
||||
/*
|
||||
* We first detach the partitions to be able to undistribute them separately.
|
||||
*/
|
||||
spiResult = SPI_execute(detachPartitionCommand, false, 0);
|
||||
if (spiResult != SPI_OK_UTILITY)
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not run SPI query")));
|
||||
}
|
||||
tableBuildingCommands = lappend(tableBuildingCommands,
|
||||
attachPartitionCommand);
|
||||
UndistributeTable(partitionRelationId);
|
||||
}
|
||||
}
|
||||
|
||||
char *tempName = pstrdup(relationName);
|
||||
uint32 hashOfName = hash_any((unsigned char *) tempName, strlen(tempName));
|
||||
AppendShardIdToName(&tempName, hashOfName);
|
||||
|
||||
char *tableCreationCommand = NULL;
|
||||
|
||||
ereport(NOTICE, (errmsg("Creating a new local table for %s",
|
||||
quote_qualified_identifier(schemaName, relationName))));
|
||||
|
||||
foreach_ptr(tableCreationCommand, tableBuildingCommands)
|
||||
{
|
||||
Node *parseTree = ParseTreeNode(tableCreationCommand);
|
||||
|
||||
RelayEventExtendNames(parseTree, schemaName, hashOfName);
|
||||
CitusProcessUtility(parseTree, tableCreationCommand, PROCESS_UTILITY_TOPLEVEL,
|
||||
NULL, None_Receiver, NULL);
|
||||
}
|
||||
|
||||
ReplaceTable(relationId, get_relname_relid(tempName, schemaId));
|
||||
|
||||
char *tableConstructionCommand = NULL;
|
||||
foreach_ptr(tableConstructionCommand, tableConstructionCommands)
|
||||
{
|
||||
spiResult = SPI_execute(tableConstructionCommand, false, 0);
|
||||
if (spiResult != SPI_OK_UTILITY)
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not run SPI query")));
|
||||
}
|
||||
}
|
||||
|
||||
spiResult = SPI_finish();
|
||||
if (spiResult != SPI_OK_FINISH)
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not finish SPI connection")));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetViewCreationCommandsOfTable takes a table oid generates the CREATE VIEW
|
||||
* commands for views that depend to the given table. This includes the views
|
||||
* that recursively depend on the table too.
|
||||
*/
|
||||
List *
|
||||
GetViewCreationCommandsOfTable(Oid relationId)
|
||||
{
|
||||
List *views = GetDependingViews(relationId);
|
||||
List *commands = NIL;
|
||||
|
||||
Oid viewOid = InvalidOid;
|
||||
foreach_oid(viewOid, views)
|
||||
{
|
||||
Datum viewDefinitionDatum = DirectFunctionCall1(pg_get_viewdef,
|
||||
ObjectIdGetDatum(viewOid));
|
||||
char *viewDefinition = TextDatumGetCString(viewDefinitionDatum);
|
||||
StringInfo query = makeStringInfo();
|
||||
char *viewName = get_rel_name(viewOid);
|
||||
char *schemaName = get_namespace_name(get_rel_namespace(viewOid));
|
||||
char *qualifiedViewName = quote_qualified_identifier(schemaName, viewName);
|
||||
appendStringInfo(query,
|
||||
"CREATE VIEW %s AS %s",
|
||||
qualifiedViewName,
|
||||
viewDefinition);
|
||||
commands = lappend(commands, query->data);
|
||||
}
|
||||
return commands;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReplaceTable replaces the source table with the target table.
|
||||
* It moves all the rows of the source table to target table with INSERT SELECT.
|
||||
* Changes the dependencies of the sequences owned by source table to target table.
|
||||
* Then drops the source table and renames the target table to source tables name.
|
||||
*
|
||||
* Source and target tables need to be in the same schema and have the same columns.
|
||||
*/
|
||||
void
|
||||
ReplaceTable(Oid sourceId, Oid targetId)
|
||||
{
|
||||
char *sourceName = get_rel_name(sourceId);
|
||||
char *targetName = get_rel_name(targetId);
|
||||
Oid schemaId = get_rel_namespace(sourceId);
|
||||
char *schemaName = get_namespace_name(schemaId);
|
||||
|
||||
StringInfo query = makeStringInfo();
|
||||
|
||||
ereport(NOTICE, (errmsg("Moving the data of %s",
|
||||
quote_qualified_identifier(schemaName, sourceName))));
|
||||
|
||||
appendStringInfo(query, "INSERT INTO %s SELECT * FROM %s",
|
||||
quote_qualified_identifier(schemaName, targetName),
|
||||
quote_qualified_identifier(schemaName, sourceName));
|
||||
int spiResult = SPI_execute(query->data, false, 0);
|
||||
if (spiResult != SPI_OK_INSERT)
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not run SPI query")));
|
||||
}
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
List *ownedSequences = getOwnedSequences(sourceId);
|
||||
#else
|
||||
List *ownedSequences = getOwnedSequences(sourceId, InvalidAttrNumber);
|
||||
#endif
|
||||
Oid sequenceOid = InvalidOid;
|
||||
foreach_oid(sequenceOid, ownedSequences)
|
||||
{
|
||||
changeDependencyFor(RelationRelationId, sequenceOid,
|
||||
RelationRelationId, sourceId, targetId);
|
||||
}
|
||||
|
||||
ereport(NOTICE, (errmsg("Dropping the old %s",
|
||||
quote_qualified_identifier(schemaName, sourceName))));
|
||||
|
||||
resetStringInfo(query);
|
||||
appendStringInfo(query, "DROP TABLE %s CASCADE",
|
||||
quote_qualified_identifier(schemaName, sourceName));
|
||||
spiResult = SPI_execute(query->data, false, 0);
|
||||
if (spiResult != SPI_OK_UTILITY)
|
||||
{
|
||||
ereport(ERROR, (errmsg("could not run SPI query")));
|
||||
}
|
||||
|
||||
ereport(NOTICE, (errmsg("Renaming the new table to %s",
|
||||
quote_qualified_identifier(schemaName, sourceName))));
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_12
|
||||
RenameRelationInternal(targetId,
|
||||
sourceName, false, false);
|
||||
#else
|
||||
RenameRelationInternal(targetId,
|
||||
sourceName, false);
|
||||
#endif
|
||||
}
|
||||
|
|
|
@ -10,16 +10,24 @@
|
|||
|
||||
#include "postgres.h"
|
||||
|
||||
#include "distributed/pg_version_constants.h"
|
||||
|
||||
#include "access/genam.h"
|
||||
#include "access/heapam.h"
|
||||
#include "access/htup_details.h"
|
||||
#include "access/skey.h"
|
||||
#include "access/sysattr.h"
|
||||
#include "catalog/dependency.h"
|
||||
#include "catalog/indexing.h"
|
||||
#include "catalog/pg_class.h"
|
||||
#include "catalog/pg_depend.h"
|
||||
#include "catalog/pg_rewrite.h"
|
||||
#include "catalog/pg_rewrite_d.h"
|
||||
#include "catalog/pg_shdepend.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#if PG_VERSION_NUM >= PG_VERSION_13
|
||||
#include "common/hashfn.h"
|
||||
#endif
|
||||
#include "distributed/commands/utility_hook.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata/dependency.h"
|
||||
|
@ -96,6 +104,17 @@ typedef struct DependencyDefinition
|
|||
} data;
|
||||
} DependencyDefinition;
|
||||
|
||||
/*
|
||||
* ViewDependencyNode represents a view (or possibly a table) in a dependency graph of
|
||||
* views.
|
||||
*/
|
||||
typedef struct ViewDependencyNode
|
||||
{
|
||||
Oid id;
|
||||
int remainingDependencyCount;
|
||||
List *dependingNodes;
|
||||
}ViewDependencyNode;
|
||||
|
||||
|
||||
static ObjectAddress DependencyDefinitionObjectAddress(DependencyDefinition *definition);
|
||||
|
||||
|
@ -130,6 +149,8 @@ static void ApplyAddToDependencyList(ObjectAddressCollector *collector,
|
|||
DependencyDefinition *definition);
|
||||
static List * ExpandCitusSupportedTypes(ObjectAddressCollector *collector,
|
||||
ObjectAddress target);
|
||||
static ViewDependencyNode * BuildViewDependencyGraph(Oid relationId, HTAB *nodeMap);
|
||||
static Oid GetDependingView(Form_pg_depend pg_depend);
|
||||
|
||||
|
||||
/*
|
||||
|
@ -910,3 +931,152 @@ DependencyDefinitionObjectAddress(DependencyDefinition *definition)
|
|||
|
||||
ereport(ERROR, (errmsg("unsupported dependency definition mode")));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* BuildViewDependencyGraph gets a relation (or a view) and builds a dependency graph for the
|
||||
* depending views.
|
||||
*/
|
||||
static ViewDependencyNode *
|
||||
BuildViewDependencyGraph(Oid relationId, HTAB *nodeMap)
|
||||
{
|
||||
bool found = false;
|
||||
ViewDependencyNode *node = (ViewDependencyNode *) hash_search(nodeMap, &relationId,
|
||||
HASH_ENTER, &found);
|
||||
|
||||
if (found)
|
||||
{
|
||||
return node;
|
||||
}
|
||||
|
||||
node->id = relationId;
|
||||
node->remainingDependencyCount = 0;
|
||||
node->dependingNodes = NIL;
|
||||
|
||||
ObjectAddress target = { 0 };
|
||||
ObjectAddressSet(target, RelationRelationId, relationId);
|
||||
|
||||
ScanKeyData key[2];
|
||||
HeapTuple depTup = NULL;
|
||||
|
||||
/*
|
||||
* iterate the actual pg_depend catalog
|
||||
*/
|
||||
Relation depRel = table_open(DependRelationId, AccessShareLock);
|
||||
|
||||
ScanKeyInit(&key[0], Anum_pg_depend_refclassid, BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(target.classId));
|
||||
ScanKeyInit(&key[1], Anum_pg_depend_refobjid, BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(target.objectId));
|
||||
SysScanDesc depScan = systable_beginscan(depRel, DependReferenceIndexId,
|
||||
true, NULL, 2, key);
|
||||
|
||||
while (HeapTupleIsValid(depTup = systable_getnext(depScan)))
|
||||
{
|
||||
Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
|
||||
|
||||
Oid dependingView = GetDependingView(pg_depend);
|
||||
if (dependingView != InvalidOid)
|
||||
{
|
||||
ViewDependencyNode *dependingNode = BuildViewDependencyGraph(dependingView,
|
||||
nodeMap);
|
||||
|
||||
node->dependingNodes = lappend(node->dependingNodes, dependingNode);
|
||||
dependingNode->remainingDependencyCount++;
|
||||
}
|
||||
}
|
||||
|
||||
systable_endscan(depScan);
|
||||
relation_close(depRel, AccessShareLock);
|
||||
|
||||
return node;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetDependingViews takes a relation id, finds the views that depend on the relation
|
||||
* and returns list of the oids of those views. It recurses on the pg_depend table to
|
||||
* find the views that recursively depend on the table.
|
||||
*
|
||||
* The returned views will have the correct order for creating them, from the point of
|
||||
* dependencies between.
|
||||
*/
|
||||
List *
|
||||
GetDependingViews(Oid relationId)
|
||||
{
|
||||
HASHCTL info;
|
||||
memset(&info, 0, sizeof(info));
|
||||
info.keysize = sizeof(Oid);
|
||||
info.entrysize = sizeof(ViewDependencyNode);
|
||||
info.hash = oid_hash;
|
||||
uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION);
|
||||
HTAB *nodeMap = hash_create("view dependency map (oid)", 32, &info, hashFlags);
|
||||
|
||||
ViewDependencyNode *tableNode = BuildViewDependencyGraph(relationId, nodeMap);
|
||||
|
||||
List *dependingViews = NIL;
|
||||
List *nodeQueue = list_make1(tableNode);
|
||||
ViewDependencyNode *node = NULL;
|
||||
foreach_ptr(node, nodeQueue)
|
||||
{
|
||||
ViewDependencyNode *dependingNode = NULL;
|
||||
foreach_ptr(dependingNode, node->dependingNodes)
|
||||
{
|
||||
dependingNode->remainingDependencyCount--;
|
||||
if (dependingNode->remainingDependencyCount == 0)
|
||||
{
|
||||
nodeQueue = lappend(nodeQueue, dependingNode);
|
||||
dependingViews = lappend_oid(dependingViews, dependingNode->id);
|
||||
}
|
||||
}
|
||||
}
|
||||
return dependingViews;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetDependingView gets a row of pg_depend and returns the oid of the view that is depended.
|
||||
* If the depended object is not a rewrite object, the object to rewrite is not a view or it
|
||||
* is the same view with the depending one InvalidOid is returned.
|
||||
*/
|
||||
Oid
|
||||
GetDependingView(Form_pg_depend pg_depend)
|
||||
{
|
||||
if (pg_depend->classid != RewriteRelationId)
|
||||
{
|
||||
return InvalidOid;
|
||||
}
|
||||
|
||||
Relation rewriteRel = table_open(RewriteRelationId, AccessShareLock);
|
||||
ScanKeyData rkey[1];
|
||||
|
||||
#if PG_VERSION_NUM >= PG_VERSION_12
|
||||
ScanKeyInit(&rkey[0],
|
||||
Anum_pg_rewrite_oid,
|
||||
BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(pg_depend->objid));
|
||||
#else
|
||||
ScanKeyInit(&rkey[0],
|
||||
ObjectIdAttributeNumber,
|
||||
BTEqualStrategyNumber, F_OIDEQ,
|
||||
ObjectIdGetDatum(pg_depend->objid));
|
||||
#endif
|
||||
|
||||
SysScanDesc rscan = systable_beginscan(rewriteRel, RewriteOidIndexId,
|
||||
true, NULL, 1, rkey);
|
||||
|
||||
HeapTuple rewriteTup = systable_getnext(rscan);
|
||||
Form_pg_rewrite pg_rewrite = (Form_pg_rewrite) GETSTRUCT(rewriteTup);
|
||||
|
||||
bool isView = get_rel_relkind(pg_rewrite->ev_class) == RELKIND_VIEW;
|
||||
bool isDifferentThanRef = pg_rewrite->ev_class != pg_depend->refobjid;
|
||||
|
||||
systable_endscan(rscan);
|
||||
relation_close(rewriteRel, AccessShareLock);
|
||||
|
||||
if (isView && isDifferentThanRef)
|
||||
{
|
||||
return pg_rewrite->ev_class;
|
||||
}
|
||||
return InvalidOid;
|
||||
}
|
||||
|
|
|
@ -539,6 +539,23 @@ GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
|
|||
includeSequenceDefaults);
|
||||
tableDDLEventList = list_concat(tableDDLEventList, tableCreationCommandList);
|
||||
|
||||
List *otherCommands = GetTableConstructionCommands(relationId);
|
||||
tableDDLEventList = list_concat(tableDDLEventList, otherCommands);
|
||||
|
||||
return tableDDLEventList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetTableConstructionCommands takes in a relationId and returns the list
|
||||
* of DDL commands needed to reconstruct the relation except the ones that actually
|
||||
* create the table.
|
||||
*/
|
||||
List *
|
||||
GetTableConstructionCommands(Oid relationId)
|
||||
{
|
||||
List *tableDDLEventList = NIL;
|
||||
|
||||
List *indexAndConstraintCommandList = GetTableIndexAndConstraintCommands(relationId);
|
||||
tableDDLEventList = list_concat(tableDDLEventList, indexAndConstraintCommandList);
|
||||
|
||||
|
@ -612,6 +629,30 @@ GetTableCreationCommands(Oid relationId, bool includeSequenceDefaults)
|
|||
tableDDLEventList = lappend(tableDDLEventList, serverDef);
|
||||
}
|
||||
|
||||
List *tableBuildingCommands = GetTableBuildingCommands(relationId,
|
||||
includeSequenceDefaults);
|
||||
tableDDLEventList = list_concat(tableDDLEventList,
|
||||
tableBuildingCommands);
|
||||
|
||||
/* revert back to original search_path */
|
||||
PopOverrideSearchPath();
|
||||
|
||||
return tableDDLEventList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetTableBuildingCommands takes in a relationId, and returns the list of DDL
|
||||
* commands needed to rebuild the relation. This does not include the schema
|
||||
* and the server commands.
|
||||
*/
|
||||
List *
|
||||
GetTableBuildingCommands(Oid relationId, bool includeSequenceDefaults)
|
||||
{
|
||||
List *tableDDLEventList = NIL;
|
||||
|
||||
PushOverrideEmptySearchPath(CurrentMemoryContext);
|
||||
|
||||
/* fetch table schema and column option definitions */
|
||||
char *tableSchemaDef = pg_get_tableschemadef_string(relationId,
|
||||
includeSequenceDefaults);
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
-- citus--9.4-1--9.5-1
|
||||
|
||||
-- bump version to 9.5-1
|
||||
#include "udfs/undistribute_table/9.5-1.sql"
|
||||
|
||||
SET search_path = 'pg_catalog';
|
||||
|
||||
DROP FUNCTION task_tracker_assign_task(bigint, integer, text);
|
||||
|
|
|
@ -57,3 +57,5 @@ CREATE TRIGGER dist_authinfo_task_tracker_cache_invalidate
|
|||
FOR EACH STATEMENT EXECUTE PROCEDURE task_tracker_conninfo_cache_invalidate();
|
||||
|
||||
RESET search_path;
|
||||
|
||||
DROP FUNCTION pg_catalog.undistribute_table(table_name regclass);
|
||||
|
|
|
@ -0,0 +1,9 @@
|
|||
CREATE OR REPLACE FUNCTION pg_catalog.undistribute_table(
|
||||
table_name regclass)
|
||||
RETURNS VOID
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$undistribute_table$$;
|
||||
|
||||
COMMENT ON FUNCTION pg_catalog.undistribute_table(
|
||||
table_name regclass)
|
||||
IS 'undistributes a distributed table';
|
|
@ -0,0 +1,9 @@
|
|||
CREATE OR REPLACE FUNCTION pg_catalog.undistribute_table(
|
||||
table_name regclass)
|
||||
RETURNS VOID
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$undistribute_table$$;
|
||||
|
||||
COMMENT ON FUNCTION pg_catalog.undistribute_table(
|
||||
table_name regclass)
|
||||
IS 'undistributes a distributed table';
|
|
@ -101,7 +101,9 @@ extern uint64 GetNextShardId(void);
|
|||
extern uint64 GetNextPlacementId(void);
|
||||
extern Oid ResolveRelationId(text *relationName, bool missingOk);
|
||||
extern List * GetTableDDLEvents(Oid relationId, bool forShardCreation);
|
||||
extern List * GetTableConstructionCommands(Oid relationId);
|
||||
extern List * GetTableCreationCommands(Oid relationId, bool forShardCreation);
|
||||
extern List * GetTableBuildingCommands(Oid relationId, bool includeSequenceDefaults);
|
||||
extern List * GetTableIndexAndConstraintCommands(Oid relationId);
|
||||
extern bool IndexImpliedByAConstraint(Form_pg_index indexForm);
|
||||
extern char ShardStorageType(Oid relationId);
|
||||
|
@ -150,6 +152,7 @@ extern Datum master_drop_sequences(PG_FUNCTION_ARGS);
|
|||
extern Datum master_modify_multiple_shards(PG_FUNCTION_ARGS);
|
||||
extern Datum lock_relation_if_exists(PG_FUNCTION_ARGS);
|
||||
extern Datum master_drop_all_shards(PG_FUNCTION_ARGS);
|
||||
extern int MasterDropAllShards(Oid relationId, char *schemaName, char *relationName);
|
||||
|
||||
/* function declarations for shard creation functionality */
|
||||
extern Datum master_create_worker_shards(PG_FUNCTION_ARGS);
|
||||
|
|
|
@ -21,5 +21,6 @@ extern List * GetUniqueDependenciesList(List *objectAddressesList);
|
|||
extern List * GetDependenciesForObject(const ObjectAddress *target);
|
||||
extern List * OrderObjectAddressListInDependencyOrder(List *objectAddressList);
|
||||
extern bool SupportedDependencyByCitus(const ObjectAddress *address);
|
||||
extern List * GetDependingViews(Oid relationId);
|
||||
|
||||
#endif /* CITUS_DEPENDENCY_H */
|
||||
|
|
|
@ -0,0 +1,237 @@
|
|||
Parsed test spec with 2 sessions
|
||||
|
||||
starting permutation: s1-begin s1-undistribute s2-undistribute s1-commit
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-undistribute:
|
||||
SELECT undistribute_table('dist_table');
|
||||
|
||||
undistribute_table
|
||||
|
||||
|
||||
step s2-undistribute:
|
||||
SELECT undistribute_table('dist_table');
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-undistribute: <... completed>
|
||||
error in steps s1-commit s2-undistribute: ERROR: Cannot undistribute table
|
||||
|
||||
starting permutation: s1-begin s1-undistribute s2-select s1-commit
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-undistribute:
|
||||
SELECT undistribute_table('dist_table');
|
||||
|
||||
undistribute_table
|
||||
|
||||
|
||||
step s2-select:
|
||||
SELECT * FROM dist_table ORDER BY 1, 2;
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-select: <... completed>
|
||||
a b
|
||||
|
||||
1 2
|
||||
3 4
|
||||
5 6
|
||||
|
||||
starting permutation: s1-begin s1-undistribute s2-insert s1-commit s2-select
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-undistribute:
|
||||
SELECT undistribute_table('dist_table');
|
||||
|
||||
undistribute_table
|
||||
|
||||
|
||||
step s2-insert:
|
||||
INSERT INTO dist_table VALUES (7, 8), (9, 10);
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-insert: <... completed>
|
||||
step s2-select:
|
||||
SELECT * FROM dist_table ORDER BY 1, 2;
|
||||
|
||||
a b
|
||||
|
||||
1 2
|
||||
3 4
|
||||
5 6
|
||||
7 8
|
||||
9 10
|
||||
|
||||
starting permutation: s1-begin s1-undistribute s2-insert-select s1-commit s2-select
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-undistribute:
|
||||
SELECT undistribute_table('dist_table');
|
||||
|
||||
undistribute_table
|
||||
|
||||
|
||||
step s2-insert-select:
|
||||
INSERT INTO dist_table SELECT * FROM dist_table;
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-insert-select: <... completed>
|
||||
step s2-select:
|
||||
SELECT * FROM dist_table ORDER BY 1, 2;
|
||||
|
||||
a b
|
||||
|
||||
1 2
|
||||
1 2
|
||||
3 4
|
||||
3 4
|
||||
5 6
|
||||
5 6
|
||||
|
||||
starting permutation: s1-begin s1-undistribute s2-delete s1-commit s2-select
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-undistribute:
|
||||
SELECT undistribute_table('dist_table');
|
||||
|
||||
undistribute_table
|
||||
|
||||
|
||||
step s2-delete:
|
||||
DELETE FROM dist_table WHERE a = 3;
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-delete: <... completed>
|
||||
step s2-select:
|
||||
SELECT * FROM dist_table ORDER BY 1, 2;
|
||||
|
||||
a b
|
||||
|
||||
1 2
|
||||
5 6
|
||||
|
||||
starting permutation: s1-begin s1-undistribute s2-copy s1-commit s2-select
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-undistribute:
|
||||
SELECT undistribute_table('dist_table');
|
||||
|
||||
undistribute_table
|
||||
|
||||
|
||||
step s2-copy:
|
||||
COPY dist_table FROM PROGRAM 'echo 11, 12 && echo 13, 14' WITH CSV;
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-copy: <... completed>
|
||||
step s2-select:
|
||||
SELECT * FROM dist_table ORDER BY 1, 2;
|
||||
|
||||
a b
|
||||
|
||||
1 2
|
||||
3 4
|
||||
5 6
|
||||
11 12
|
||||
13 14
|
||||
|
||||
starting permutation: s1-begin s1-undistribute s2-drop s1-commit s2-select
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-undistribute:
|
||||
SELECT undistribute_table('dist_table');
|
||||
|
||||
undistribute_table
|
||||
|
||||
|
||||
step s2-drop:
|
||||
DROP TABLE dist_table;
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-drop: <... completed>
|
||||
step s2-select:
|
||||
SELECT * FROM dist_table ORDER BY 1, 2;
|
||||
|
||||
ERROR: relation "dist_table" does not exist
|
||||
|
||||
starting permutation: s1-begin s1-undistribute s2-truncate s1-commit s2-select
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-undistribute:
|
||||
SELECT undistribute_table('dist_table');
|
||||
|
||||
undistribute_table
|
||||
|
||||
|
||||
step s2-truncate:
|
||||
TRUNCATE dist_table;
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-truncate: <... completed>
|
||||
step s2-select:
|
||||
SELECT * FROM dist_table ORDER BY 1, 2;
|
||||
|
||||
a b
|
||||
|
||||
|
||||
starting permutation: s1-begin s1-undistribute s2-select-for-update s1-commit
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-undistribute:
|
||||
SELECT undistribute_table('dist_table');
|
||||
|
||||
undistribute_table
|
||||
|
||||
|
||||
step s2-select-for-update:
|
||||
SELECT * FROM dist_table WHERE a = 5 FOR UPDATE;
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-select-for-update: <... completed>
|
||||
a b
|
||||
|
||||
5 6
|
||||
|
||||
starting permutation: s1-begin s1-undistribute s2-create-index-concurrently s1-commit
|
||||
step s1-begin:
|
||||
BEGIN;
|
||||
|
||||
step s1-undistribute:
|
||||
SELECT undistribute_table('dist_table');
|
||||
|
||||
undistribute_table
|
||||
|
||||
|
||||
step s2-create-index-concurrently:
|
||||
CREATE INDEX CONCURRENTLY idx ON dist_table (a);
|
||||
<waiting ...>
|
||||
step s1-commit:
|
||||
COMMIT;
|
||||
|
||||
step s2-create-index-concurrently: <... completed>
|
|
@ -418,7 +418,7 @@ SELECT * FROM print_extension_changes();
|
|||
-- Snapshot of state at 9.5-1
|
||||
ALTER EXTENSION citus UPDATE TO '9.5-1';
|
||||
SELECT * FROM print_extension_changes();
|
||||
previous_object | current_object
|
||||
previous_object | current_object
|
||||
---------------------------------------------------------------------
|
||||
function task_tracker_assign_task(bigint,integer,text) |
|
||||
function task_tracker_cleanup_job(bigint) |
|
||||
|
@ -426,7 +426,8 @@ SELECT * FROM print_extension_changes();
|
|||
function task_tracker_task_status(bigint,integer) |
|
||||
function worker_execute_sql_task(bigint,integer,text,boolean) |
|
||||
function worker_merge_files_and_run_query(bigint,integer,text,text) |
|
||||
(6 rows)
|
||||
| function undistribute_table(regclass)
|
||||
(7 rows)
|
||||
|
||||
DROP TABLE prev_objects, extension_diff;
|
||||
-- show running version
|
||||
|
|
|
@ -2724,6 +2724,7 @@ SELECT * FROM dist_table_with_sequence ORDER BY user_id, value_1;
|
|||
5 | 5
|
||||
(6 rows)
|
||||
|
||||
DROP TABLE dist_table_with_sequence;
|
||||
-- Select from distributed table into reference table
|
||||
CREATE TABLE ref_table (user_id serial, value_1 int);
|
||||
SELECT create_reference_table('ref_table');
|
||||
|
|
|
@ -0,0 +1,362 @@
|
|||
CREATE SCHEMA undistribute_table;
|
||||
SET search_path TO undistribute_table;
|
||||
SET citus.shard_count TO 4;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE dist_table (id INT, a INT, b TEXT);
|
||||
SELECT create_distributed_table('dist_table', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO dist_table VALUES (1, 2, 'abc'), (2, 3, 'abcd'), (1, 3, 'abc');
|
||||
SELECT logicalrelid FROM pg_dist_partition WHERE logicalrelid = 'dist_table'::regclass;
|
||||
logicalrelid
|
||||
---------------------------------------------------------------------
|
||||
dist_table
|
||||
(1 row)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT COUNT(*) FROM pg_catalog.pg_class WHERE relname LIKE 'dist\_table\_%'$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,2)
|
||||
(localhost,57638,t,2)
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM dist_table ORDER BY 1, 2, 3;
|
||||
id | a | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 2 | abc
|
||||
1 | 3 | abc
|
||||
2 | 3 | abcd
|
||||
(3 rows)
|
||||
|
||||
SELECT undistribute_table('dist_table');
|
||||
NOTICE: Creating a new local table for undistribute_table.dist_table
|
||||
NOTICE: Moving the data of undistribute_table.dist_table
|
||||
NOTICE: Dropping the old undistribute_table.dist_table
|
||||
NOTICE: Renaming the new table to undistribute_table.dist_table
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT logicalrelid FROM pg_dist_partition WHERE logicalrelid = 'dist_table'::regclass;
|
||||
logicalrelid
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT COUNT(*) FROM pg_catalog.pg_class WHERE relname LIKE 'dist\_table\_%'$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,0)
|
||||
(localhost,57638,t,0)
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM dist_table ORDER BY 1, 2, 3;
|
||||
id | a | b
|
||||
---------------------------------------------------------------------
|
||||
1 | 2 | abc
|
||||
1 | 3 | abc
|
||||
2 | 3 | abcd
|
||||
(3 rows)
|
||||
|
||||
DROP TABLE dist_table;
|
||||
-- test indexes
|
||||
CREATE TABLE dist_table (id INT, a INT, b TEXT);
|
||||
SELECT create_distributed_table('dist_table', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO dist_table VALUES (1, 2, 'abc'), (2, 3, 'abcd'), (1, 3, 'abc');
|
||||
CREATE INDEX index1 ON dist_table (id);
|
||||
SELECT * FROM pg_indexes WHERE tablename = 'dist_table';
|
||||
schemaname | tablename | indexname | tablespace | indexdef
|
||||
---------------------------------------------------------------------
|
||||
undistribute_table | dist_table | index1 | | CREATE INDEX index1 ON undistribute_table.dist_table USING btree (id)
|
||||
(1 row)
|
||||
|
||||
SELECT undistribute_table('dist_table');
|
||||
NOTICE: Creating a new local table for undistribute_table.dist_table
|
||||
NOTICE: Moving the data of undistribute_table.dist_table
|
||||
NOTICE: Dropping the old undistribute_table.dist_table
|
||||
NOTICE: Renaming the new table to undistribute_table.dist_table
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM pg_indexes WHERE tablename = 'dist_table';
|
||||
schemaname | tablename | indexname | tablespace | indexdef
|
||||
---------------------------------------------------------------------
|
||||
undistribute_table | dist_table | index1 | | CREATE INDEX index1 ON undistribute_table.dist_table USING btree (id)
|
||||
(1 row)
|
||||
|
||||
DROP TABLE dist_table;
|
||||
-- test tables with references
|
||||
-- we expect errors
|
||||
CREATE TABLE referenced_table (id INT PRIMARY KEY, a INT, b TEXT);
|
||||
SELECT create_distributed_table('referenced_table', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO referenced_table VALUES (1, 2, 'abc'), (2, 3, 'abcd'), (4, 3, 'abc');
|
||||
CREATE TABLE referencing_table (id INT REFERENCES referenced_table (id), a INT, b TEXT);
|
||||
SELECT create_distributed_table('referencing_table', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO referencing_table VALUES (4, 6, 'cba'), (1, 1, 'dcba'), (2, 3, 'aaa');
|
||||
SELECT undistribute_table('referenced_table');
|
||||
ERROR: Cannot undistribute table because a foreign key references to it.
|
||||
SELECT undistribute_table('referencing_table');
|
||||
ERROR: Cannot undistribute table because it has a foreign key.
|
||||
DROP TABLE referenced_table, referencing_table;
|
||||
-- test partitioned tables
|
||||
CREATE TABLE partitioned_table (id INT, a INT) PARTITION BY RANGE (id);
|
||||
CREATE TABLE partitioned_table_1_5 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (5);
|
||||
CREATE TABLE partitioned_table_6_10 PARTITION OF partitioned_table FOR VALUES FROM (6) TO (10);
|
||||
SELECT create_distributed_table('partitioned_table', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO partitioned_table VALUES (2, 12), (7, 2);
|
||||
SELECT logicalrelid FROM pg_dist_partition WHERE logicalrelid::regclass::text LIKE 'partitioned\_table%' ORDER BY 1;
|
||||
logicalrelid
|
||||
---------------------------------------------------------------------
|
||||
partitioned_table
|
||||
partitioned_table_1_5
|
||||
partitioned_table_6_10
|
||||
(3 rows)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT COUNT(*) FROM pg_catalog.pg_class WHERE relname LIKE 'partitioned\_table%'$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,6)
|
||||
(localhost,57638,t,6)
|
||||
(2 rows)
|
||||
|
||||
SELECT inhrelid::regclass FROM pg_catalog.pg_inherits WHERE inhparent = 'partitioned_table'::regclass ORDER BY 1;
|
||||
inhrelid
|
||||
---------------------------------------------------------------------
|
||||
partitioned_table_1_5
|
||||
partitioned_table_6_10
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM partitioned_table ORDER BY 1, 2;
|
||||
id | a
|
||||
---------------------------------------------------------------------
|
||||
2 | 12
|
||||
7 | 2
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM partitioned_table_1_5 ORDER BY 1, 2;
|
||||
id | a
|
||||
---------------------------------------------------------------------
|
||||
2 | 12
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM partitioned_table_6_10 ORDER BY 1, 2;
|
||||
id | a
|
||||
---------------------------------------------------------------------
|
||||
7 | 2
|
||||
(1 row)
|
||||
|
||||
SELECT undistribute_table('partitioned_table');
|
||||
NOTICE: Undistributing the partitions of undistribute_table.partitioned_table
|
||||
NOTICE: Creating a new local table for undistribute_table.partitioned_table_1_5
|
||||
NOTICE: Moving the data of undistribute_table.partitioned_table_1_5
|
||||
NOTICE: Dropping the old undistribute_table.partitioned_table_1_5
|
||||
NOTICE: Renaming the new table to undistribute_table.partitioned_table_1_5
|
||||
NOTICE: Creating a new local table for undistribute_table.partitioned_table_6_10
|
||||
NOTICE: Moving the data of undistribute_table.partitioned_table_6_10
|
||||
NOTICE: Dropping the old undistribute_table.partitioned_table_6_10
|
||||
NOTICE: Renaming the new table to undistribute_table.partitioned_table_6_10
|
||||
NOTICE: Creating a new local table for undistribute_table.partitioned_table
|
||||
NOTICE: Moving the data of undistribute_table.partitioned_table
|
||||
NOTICE: Dropping the old undistribute_table.partitioned_table
|
||||
NOTICE: Renaming the new table to undistribute_table.partitioned_table
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT logicalrelid FROM pg_dist_partition WHERE logicalrelid::regclass::text LIKE 'partitioned\_table%' ORDER BY 1;
|
||||
logicalrelid
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
SELECT run_command_on_workers($$SELECT COUNT(*) FROM pg_catalog.pg_class WHERE relname LIKE 'partitioned\_table%'$$);
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,0)
|
||||
(localhost,57638,t,0)
|
||||
(2 rows)
|
||||
|
||||
SELECT inhrelid::regclass FROM pg_catalog.pg_inherits WHERE inhparent = 'partitioned_table'::regclass ORDER BY 1;
|
||||
inhrelid
|
||||
---------------------------------------------------------------------
|
||||
partitioned_table_1_5
|
||||
partitioned_table_6_10
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM partitioned_table ORDER BY 1, 2;
|
||||
id | a
|
||||
---------------------------------------------------------------------
|
||||
2 | 12
|
||||
7 | 2
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM partitioned_table_1_5 ORDER BY 1, 2;
|
||||
id | a
|
||||
---------------------------------------------------------------------
|
||||
2 | 12
|
||||
(1 row)
|
||||
|
||||
SELECT * FROM partitioned_table_6_10 ORDER BY 1, 2;
|
||||
id | a
|
||||
---------------------------------------------------------------------
|
||||
7 | 2
|
||||
(1 row)
|
||||
|
||||
DROP TABLE partitioned_table;
|
||||
-- test tables with sequences
|
||||
CREATE TABLE seq_table (id INT, a bigserial);
|
||||
SELECT create_distributed_table('seq_table', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT objid::regclass AS "Sequence Name" FROM pg_depend WHERE refobjid = 'seq_table'::regclass::oid AND classid = 'pg_class'::regclass::oid;
|
||||
Sequence Name
|
||||
---------------------------------------------------------------------
|
||||
seq_table_a_seq
|
||||
(1 row)
|
||||
|
||||
INSERT INTO seq_table (id) VALUES (5), (9), (3);
|
||||
SELECT * FROM seq_table ORDER BY a;
|
||||
id | a
|
||||
---------------------------------------------------------------------
|
||||
5 | 1
|
||||
9 | 2
|
||||
3 | 3
|
||||
(3 rows)
|
||||
|
||||
SELECT undistribute_table('seq_table');
|
||||
NOTICE: Creating a new local table for undistribute_table.seq_table
|
||||
NOTICE: Moving the data of undistribute_table.seq_table
|
||||
NOTICE: Dropping the old undistribute_table.seq_table
|
||||
NOTICE: Renaming the new table to undistribute_table.seq_table
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT objid::regclass AS "Sequence Name" FROM pg_depend WHERE refobjid = 'seq_table'::regclass::oid AND classid = 'pg_class'::regclass::oid;
|
||||
Sequence Name
|
||||
---------------------------------------------------------------------
|
||||
seq_table_a_seq
|
||||
(1 row)
|
||||
|
||||
INSERT INTO seq_table (id) VALUES (7), (1), (8);
|
||||
SELECT * FROM seq_table ORDER BY a;
|
||||
id | a
|
||||
---------------------------------------------------------------------
|
||||
5 | 1
|
||||
9 | 2
|
||||
3 | 3
|
||||
7 | 4
|
||||
1 | 5
|
||||
8 | 6
|
||||
(6 rows)
|
||||
|
||||
DROP TABLE seq_table;
|
||||
--test tables with views
|
||||
CREATE TABLE view_table (a int, b int, c int);
|
||||
SELECT create_distributed_table('view_table', 'a');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO view_table VALUES (1, 2, 3), (2, 4, 6), (3, 6, 9);
|
||||
CREATE SCHEMA another_schema;
|
||||
CREATE VIEW undis_view1 AS SELECT a, b FROM view_table;
|
||||
CREATE VIEW undis_view2 AS SELECT a, c FROM view_table;
|
||||
CREATE VIEW another_schema.undis_view3 AS SELECT b, c FROM undis_view1 JOIN undis_view2 ON undis_view1.a = undis_view2.a;
|
||||
SELECT schemaname, viewname, viewowner, definition FROM pg_views WHERE viewname LIKE 'undis\_view%' ORDER BY viewname;
|
||||
schemaname | viewname | viewowner | definition
|
||||
---------------------------------------------------------------------
|
||||
undistribute_table | undis_view1 | postgres | SELECT view_table.a, +
|
||||
| | | view_table.b +
|
||||
| | | FROM view_table;
|
||||
undistribute_table | undis_view2 | postgres | SELECT view_table.a, +
|
||||
| | | view_table.c +
|
||||
| | | FROM view_table;
|
||||
another_schema | undis_view3 | postgres | SELECT undis_view1.b, +
|
||||
| | | undis_view2.c +
|
||||
| | | FROM (undis_view1 +
|
||||
| | | JOIN undis_view2 ON ((undis_view1.a = undis_view2.a)));
|
||||
(3 rows)
|
||||
|
||||
SELECT * FROM another_schema.undis_view3 ORDER BY 1, 2;
|
||||
b | c
|
||||
---------------------------------------------------------------------
|
||||
2 | 3
|
||||
4 | 6
|
||||
6 | 9
|
||||
(3 rows)
|
||||
|
||||
SELECT undistribute_table('view_table');
|
||||
NOTICE: Creating a new local table for undistribute_table.view_table
|
||||
NOTICE: Moving the data of undistribute_table.view_table
|
||||
NOTICE: Dropping the old undistribute_table.view_table
|
||||
NOTICE: drop cascades to 3 other objects
|
||||
DETAIL: drop cascades to view undis_view1
|
||||
drop cascades to view undis_view2
|
||||
drop cascades to view another_schema.undis_view3
|
||||
CONTEXT: SQL statement "DROP TABLE undistribute_table.view_table CASCADE"
|
||||
NOTICE: Renaming the new table to undistribute_table.view_table
|
||||
undistribute_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT schemaname, viewname, viewowner, definition FROM pg_views WHERE viewname LIKE 'undis\_view%' ORDER BY viewname;
|
||||
schemaname | viewname | viewowner | definition
|
||||
---------------------------------------------------------------------
|
||||
undistribute_table | undis_view1 | postgres | SELECT view_table.a, +
|
||||
| | | view_table.b +
|
||||
| | | FROM view_table;
|
||||
undistribute_table | undis_view2 | postgres | SELECT view_table.a, +
|
||||
| | | view_table.c +
|
||||
| | | FROM view_table;
|
||||
another_schema | undis_view3 | postgres | SELECT undis_view1.b, +
|
||||
| | | undis_view2.c +
|
||||
| | | FROM (undis_view1 +
|
||||
| | | JOIN undis_view2 ON ((undis_view1.a = undis_view2.a)));
|
||||
(3 rows)
|
||||
|
||||
SELECT * FROM another_schema.undis_view3 ORDER BY 1, 2;
|
||||
b | c
|
||||
---------------------------------------------------------------------
|
||||
2 | 3
|
||||
4 | 6
|
||||
6 | 9
|
||||
(3 rows)
|
||||
|
||||
DROP TABLE view_table CASCADE;
|
||||
NOTICE: drop cascades to 3 other objects
|
||||
DETAIL: drop cascades to view undis_view1
|
||||
drop cascades to view undis_view2
|
||||
drop cascades to view another_schema.undis_view3
|
||||
DROP SCHEMA undistribute_table, another_schema CASCADE;
|
|
@ -63,6 +63,7 @@ test: isolation_ref2ref_foreign_keys
|
|||
test: isolation_multiuser_locking
|
||||
test: shared_connection_waits
|
||||
test: isolation_cancellation
|
||||
test: isolation_undistribute_table
|
||||
|
||||
# MX tests
|
||||
test: isolation_reference_on_mx
|
||||
|
|
|
@ -89,7 +89,7 @@ test: multi_deparse_shard_query multi_distributed_transaction_id intermediate_re
|
|||
test: multi_explain hyperscale_tutorial partitioned_intermediate_results distributed_intermediate_results multi_real_time_transaction
|
||||
test: multi_basic_queries multi_complex_expressions multi_subquery multi_subquery_complex_queries multi_subquery_behavioral_analytics
|
||||
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_view multi_sql_function multi_prepare_sql
|
||||
test: sql_procedure multi_function_in_join row_types materialized_view
|
||||
test: sql_procedure multi_function_in_join row_types materialized_view undistribute_table
|
||||
test: multi_subquery_in_where_reference_clause full_join adaptive_executor propagate_set_commands
|
||||
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc statement_cancel_error_message
|
||||
test: multi_agg_distinct multi_agg_approximate_distinct multi_limit_clause_approximate multi_outer_join_reference multi_single_relation_subquery multi_prepare_plsql set_role_in_transaction
|
||||
|
|
|
@ -64,7 +64,7 @@ test: multi_basic_queries multi_complex_expressions multi_subquery_complex_queri
|
|||
test: multi_subquery_complex_reference_clause multi_subquery_window_functions multi_sql_function
|
||||
test: multi_function_in_join row_types
|
||||
test: multi_subquery_in_where_reference_clause full_join adaptive_executor propagate_set_commands
|
||||
test: rollback_to_savepoint
|
||||
test: rollback_to_savepoint insert_select_into_local_table undistribute_table
|
||||
test: multi_subquery_union multi_subquery_in_where_clause multi_subquery_misc
|
||||
test: multi_limit_clause_approximate multi_single_relation_subquery set_role_in_transaction
|
||||
test: multi_select_for_update
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
setup
|
||||
{
|
||||
CREATE TABLE dist_table(a INT, b INT);
|
||||
SELECT create_distributed_table('dist_table', 'a');
|
||||
INSERT INTO dist_table VALUES (1, 2), (3, 4), (5, 6);
|
||||
}
|
||||
|
||||
teardown
|
||||
{
|
||||
DROP TABLE IF EXISTS dist_table;
|
||||
}
|
||||
|
||||
session "s1"
|
||||
|
||||
step "s1-begin"
|
||||
{
|
||||
BEGIN;
|
||||
}
|
||||
|
||||
step "s1-undistribute"
|
||||
{
|
||||
SELECT undistribute_table('dist_table');
|
||||
}
|
||||
|
||||
step "s1-commit"
|
||||
{
|
||||
COMMIT;
|
||||
}
|
||||
|
||||
session "s2"
|
||||
|
||||
step "s2-undistribute"
|
||||
{
|
||||
SELECT undistribute_table('dist_table');
|
||||
}
|
||||
|
||||
step "s2-insert"
|
||||
{
|
||||
INSERT INTO dist_table VALUES (7, 8), (9, 10);
|
||||
}
|
||||
|
||||
step "s2-select"
|
||||
{
|
||||
SELECT * FROM dist_table ORDER BY 1, 2;
|
||||
}
|
||||
|
||||
step "s2-insert-select"
|
||||
{
|
||||
INSERT INTO dist_table SELECT * FROM dist_table;
|
||||
}
|
||||
|
||||
step "s2-delete"
|
||||
{
|
||||
DELETE FROM dist_table WHERE a = 3;
|
||||
}
|
||||
|
||||
step "s2-copy"
|
||||
{
|
||||
COPY dist_table FROM PROGRAM 'echo 11, 12 && echo 13, 14' WITH CSV;
|
||||
}
|
||||
|
||||
step "s2-drop"
|
||||
{
|
||||
DROP TABLE dist_table;
|
||||
}
|
||||
|
||||
step "s2-truncate"
|
||||
{
|
||||
TRUNCATE dist_table;
|
||||
}
|
||||
|
||||
step "s2-select-for-update"
|
||||
{
|
||||
SELECT * FROM dist_table WHERE a = 5 FOR UPDATE;
|
||||
}
|
||||
|
||||
step "s2-create-index-concurrently"
|
||||
{
|
||||
CREATE INDEX CONCURRENTLY idx ON dist_table (a);
|
||||
}
|
||||
|
||||
|
||||
permutation "s1-begin" "s1-undistribute" "s2-undistribute" "s1-commit"
|
||||
|
||||
permutation "s1-begin" "s1-undistribute" "s2-select" "s1-commit"
|
||||
permutation "s1-begin" "s1-undistribute" "s2-insert" "s1-commit" "s2-select"
|
||||
permutation "s1-begin" "s1-undistribute" "s2-insert-select" "s1-commit" "s2-select"
|
||||
permutation "s1-begin" "s1-undistribute" "s2-delete" "s1-commit" "s2-select"
|
||||
permutation "s1-begin" "s1-undistribute" "s2-copy" "s1-commit" "s2-select"
|
||||
permutation "s1-begin" "s1-undistribute" "s2-drop" "s1-commit" "s2-select"
|
||||
permutation "s1-begin" "s1-undistribute" "s2-truncate" "s1-commit" "s2-select"
|
||||
permutation "s1-begin" "s1-undistribute" "s2-select-for-update" "s1-commit"
|
||||
permutation "s1-begin" "s1-undistribute" "s2-create-index-concurrently" "s1-commit"
|
|
@ -2019,6 +2019,8 @@ SELECT user_id FROM dist_table_with_sequence WHERE user_id = 1;
|
|||
|
||||
SELECT * FROM dist_table_with_sequence ORDER BY user_id, value_1;
|
||||
|
||||
DROP TABLE dist_table_with_sequence;
|
||||
|
||||
-- Select from distributed table into reference table
|
||||
CREATE TABLE ref_table (user_id serial, value_1 int);
|
||||
SELECT create_reference_table('ref_table');
|
||||
|
|
|
@ -0,0 +1,115 @@
|
|||
CREATE SCHEMA undistribute_table;
|
||||
SET search_path TO undistribute_table;
|
||||
SET citus.shard_count TO 4;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
||||
CREATE TABLE dist_table (id INT, a INT, b TEXT);
|
||||
SELECT create_distributed_table('dist_table', 'id');
|
||||
INSERT INTO dist_table VALUES (1, 2, 'abc'), (2, 3, 'abcd'), (1, 3, 'abc');
|
||||
|
||||
SELECT logicalrelid FROM pg_dist_partition WHERE logicalrelid = 'dist_table'::regclass;
|
||||
SELECT run_command_on_workers($$SELECT COUNT(*) FROM pg_catalog.pg_class WHERE relname LIKE 'dist\_table\_%'$$);
|
||||
SELECT * FROM dist_table ORDER BY 1, 2, 3;
|
||||
|
||||
SELECT undistribute_table('dist_table');
|
||||
|
||||
SELECT logicalrelid FROM pg_dist_partition WHERE logicalrelid = 'dist_table'::regclass;
|
||||
SELECT run_command_on_workers($$SELECT COUNT(*) FROM pg_catalog.pg_class WHERE relname LIKE 'dist\_table\_%'$$);
|
||||
SELECT * FROM dist_table ORDER BY 1, 2, 3;
|
||||
|
||||
DROP TABLE dist_table;
|
||||
|
||||
-- test indexes
|
||||
CREATE TABLE dist_table (id INT, a INT, b TEXT);
|
||||
SELECT create_distributed_table('dist_table', 'id');
|
||||
INSERT INTO dist_table VALUES (1, 2, 'abc'), (2, 3, 'abcd'), (1, 3, 'abc');
|
||||
|
||||
CREATE INDEX index1 ON dist_table (id);
|
||||
SELECT * FROM pg_indexes WHERE tablename = 'dist_table';
|
||||
|
||||
SELECT undistribute_table('dist_table');
|
||||
|
||||
SELECT * FROM pg_indexes WHERE tablename = 'dist_table';
|
||||
|
||||
DROP TABLE dist_table;
|
||||
|
||||
-- test tables with references
|
||||
-- we expect errors
|
||||
CREATE TABLE referenced_table (id INT PRIMARY KEY, a INT, b TEXT);
|
||||
SELECT create_distributed_table('referenced_table', 'id');
|
||||
INSERT INTO referenced_table VALUES (1, 2, 'abc'), (2, 3, 'abcd'), (4, 3, 'abc');
|
||||
|
||||
CREATE TABLE referencing_table (id INT REFERENCES referenced_table (id), a INT, b TEXT);
|
||||
SELECT create_distributed_table('referencing_table', 'id');
|
||||
INSERT INTO referencing_table VALUES (4, 6, 'cba'), (1, 1, 'dcba'), (2, 3, 'aaa');
|
||||
|
||||
SELECT undistribute_table('referenced_table');
|
||||
SELECT undistribute_table('referencing_table');
|
||||
|
||||
DROP TABLE referenced_table, referencing_table;
|
||||
|
||||
-- test partitioned tables
|
||||
CREATE TABLE partitioned_table (id INT, a INT) PARTITION BY RANGE (id);
|
||||
CREATE TABLE partitioned_table_1_5 PARTITION OF partitioned_table FOR VALUES FROM (1) TO (5);
|
||||
CREATE TABLE partitioned_table_6_10 PARTITION OF partitioned_table FOR VALUES FROM (6) TO (10);
|
||||
SELECT create_distributed_table('partitioned_table', 'id');
|
||||
INSERT INTO partitioned_table VALUES (2, 12), (7, 2);
|
||||
|
||||
SELECT logicalrelid FROM pg_dist_partition WHERE logicalrelid::regclass::text LIKE 'partitioned\_table%' ORDER BY 1;
|
||||
SELECT run_command_on_workers($$SELECT COUNT(*) FROM pg_catalog.pg_class WHERE relname LIKE 'partitioned\_table%'$$);
|
||||
SELECT inhrelid::regclass FROM pg_catalog.pg_inherits WHERE inhparent = 'partitioned_table'::regclass ORDER BY 1;
|
||||
SELECT * FROM partitioned_table ORDER BY 1, 2;
|
||||
SELECT * FROM partitioned_table_1_5 ORDER BY 1, 2;
|
||||
SELECT * FROM partitioned_table_6_10 ORDER BY 1, 2;
|
||||
|
||||
SELECT undistribute_table('partitioned_table');
|
||||
|
||||
SELECT logicalrelid FROM pg_dist_partition WHERE logicalrelid::regclass::text LIKE 'partitioned\_table%' ORDER BY 1;
|
||||
SELECT run_command_on_workers($$SELECT COUNT(*) FROM pg_catalog.pg_class WHERE relname LIKE 'partitioned\_table%'$$);
|
||||
SELECT inhrelid::regclass FROM pg_catalog.pg_inherits WHERE inhparent = 'partitioned_table'::regclass ORDER BY 1;
|
||||
SELECT * FROM partitioned_table ORDER BY 1, 2;
|
||||
SELECT * FROM partitioned_table_1_5 ORDER BY 1, 2;
|
||||
SELECT * FROM partitioned_table_6_10 ORDER BY 1, 2;
|
||||
|
||||
DROP TABLE partitioned_table;
|
||||
|
||||
|
||||
-- test tables with sequences
|
||||
CREATE TABLE seq_table (id INT, a bigserial);
|
||||
SELECT create_distributed_table('seq_table', 'id');
|
||||
|
||||
SELECT objid::regclass AS "Sequence Name" FROM pg_depend WHERE refobjid = 'seq_table'::regclass::oid AND classid = 'pg_class'::regclass::oid;
|
||||
INSERT INTO seq_table (id) VALUES (5), (9), (3);
|
||||
SELECT * FROM seq_table ORDER BY a;
|
||||
|
||||
SELECT undistribute_table('seq_table');
|
||||
|
||||
SELECT objid::regclass AS "Sequence Name" FROM pg_depend WHERE refobjid = 'seq_table'::regclass::oid AND classid = 'pg_class'::regclass::oid;
|
||||
INSERT INTO seq_table (id) VALUES (7), (1), (8);
|
||||
SELECT * FROM seq_table ORDER BY a;
|
||||
|
||||
DROP TABLE seq_table;
|
||||
|
||||
|
||||
--test tables with views
|
||||
CREATE TABLE view_table (a int, b int, c int);
|
||||
SELECT create_distributed_table('view_table', 'a');
|
||||
INSERT INTO view_table VALUES (1, 2, 3), (2, 4, 6), (3, 6, 9);
|
||||
|
||||
CREATE SCHEMA another_schema;
|
||||
|
||||
CREATE VIEW undis_view1 AS SELECT a, b FROM view_table;
|
||||
CREATE VIEW undis_view2 AS SELECT a, c FROM view_table;
|
||||
CREATE VIEW another_schema.undis_view3 AS SELECT b, c FROM undis_view1 JOIN undis_view2 ON undis_view1.a = undis_view2.a;
|
||||
|
||||
SELECT schemaname, viewname, viewowner, definition FROM pg_views WHERE viewname LIKE 'undis\_view%' ORDER BY viewname;
|
||||
SELECT * FROM another_schema.undis_view3 ORDER BY 1, 2;
|
||||
|
||||
SELECT undistribute_table('view_table');
|
||||
|
||||
SELECT schemaname, viewname, viewowner, definition FROM pg_views WHERE viewname LIKE 'undis\_view%' ORDER BY viewname;
|
||||
SELECT * FROM another_schema.undis_view3 ORDER BY 1, 2;
|
||||
|
||||
DROP TABLE view_table CASCADE;
|
||||
|
||||
DROP SCHEMA undistribute_table, another_schema CASCADE;
|
Loading…
Reference in New Issue