Make create_distributed_table transactional

pull/1436/head
velioglu 2017-05-29 11:39:59 +03:00
parent fd72cca6c8
commit 6ea15fbb25
22 changed files with 862 additions and 245 deletions
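
A minimal sketch of the behavior this commit enables, adapted from the regression tests added below: create_distributed_table can now run inside a transaction block, and rolling back the transaction also rolls back shard creation on the workers.

BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table', 'id');
-- shards are created over connections that participate in the coordinated transaction
ROLLBACK;
-- neither the coordinator metadata nor the worker shards remain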


@ -41,6 +41,7 @@
#include "distributed/pg_dist_colocation.h" #include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_partition.h" #include "distributed/pg_dist_partition.h"
#include "distributed/reference_table_utils.h" #include "distributed/reference_table_utils.h"
#include "distributed/remote_commands.h"
#include "distributed/worker_protocol.h" #include "distributed/worker_protocol.h"
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
#include "executor/executor.h" #include "executor/executor.h"
@ -71,7 +72,7 @@ int ReplicationModel = REPLICATION_MODEL_COORDINATOR;
static void CreateReferenceTable(Oid distributedRelationId); static void CreateReferenceTable(Oid distributedRelationId);
static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName, static void ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, char replicationModel, char distributionMethod, char replicationModel,
uint32 colocationId, bool requireEmpty); uint32 colocationId);
static char LookupDistributionMethod(Oid distributionMethodOid); static char LookupDistributionMethod(Oid distributionMethodOid);
static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId, static Oid SupportFunctionForColumn(Var *partitionColumn, Oid accessMethodId,
int16 supportFunctionNumber); int16 supportFunctionNumber);
@ -86,6 +87,11 @@ static List * TupleDescColumnNameList(TupleDesc tupleDescriptor);
static bool RelationUsesIdentityColumns(TupleDesc relationDesc); static bool RelationUsesIdentityColumns(TupleDesc relationDesc);
#endif #endif
static void EnsureSchemaExistsOnAllNodes(Oid relationId);
static void EnsureLocalTableEmpty(Oid relationId);
static void EnsureTableNotDistributed(Oid relationId);
static void EnsureIsTableId(Oid relationId);
/* exports for SQL callable functions */ /* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table); PG_FUNCTION_INFO_V1(master_create_distributed_table);
PG_FUNCTION_INFO_V1(create_distributed_table); PG_FUNCTION_INFO_V1(create_distributed_table);
@ -108,8 +114,9 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
char *distributionColumnName = text_to_cstring(distributionColumnText); char *distributionColumnName = text_to_cstring(distributionColumnText);
char distributionMethod = LookupDistributionMethod(distributionMethodOid); char distributionMethod = LookupDistributionMethod(distributionMethodOid);
bool requireEmpty = true;
EnsureTableNotDistributed(distributedRelationId);
EnsureLocalTableEmpty(distributedRelationId);
EnsureCoordinator(); EnsureCoordinator();
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
@ -125,7 +132,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
ConvertToDistributedTable(distributedRelationId, distributionColumnName, ConvertToDistributedTable(distributedRelationId, distributionColumnName,
distributionMethod, REPLICATION_MODEL_COORDINATOR, distributionMethod, REPLICATION_MODEL_COORDINATOR,
INVALID_COLOCATION_ID, requireEmpty); INVALID_COLOCATION_ID);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -182,7 +189,8 @@ create_distributed_table(PG_FUNCTION_ARGS)
/* if distribution method is not hash, just create partition metadata */ /* if distribution method is not hash, just create partition metadata */
if (distributionMethod != DISTRIBUTE_BY_HASH) if (distributionMethod != DISTRIBUTE_BY_HASH)
{ {
bool requireEmpty = true; EnsureTableNotDistributed(relationId);
EnsureLocalTableEmpty(relationId);
if (ReplicationModel != REPLICATION_MODEL_COORDINATOR) if (ReplicationModel != REPLICATION_MODEL_COORDINATOR)
{ {
@ -193,7 +201,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
ConvertToDistributedTable(relationId, distributionColumnName, ConvertToDistributedTable(relationId, distributionColumnName,
distributionMethod, REPLICATION_MODEL_COORDINATOR, distributionMethod, REPLICATION_MODEL_COORDINATOR,
INVALID_COLOCATION_ID, requireEmpty); INVALID_COLOCATION_ID);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -239,7 +247,6 @@ CreateReferenceTable(Oid relationId)
List *workerNodeList = NIL; List *workerNodeList = NIL;
int replicationFactor = 0; int replicationFactor = 0;
char *distributionColumnName = NULL; char *distributionColumnName = NULL;
bool requireEmpty = true;
char relationKind = 0; char relationKind = 0;
EnsureCoordinator(); EnsureCoordinator();
@ -262,15 +269,19 @@ CreateReferenceTable(Oid relationId)
relationKind = get_rel_relkind(relationId); relationKind = get_rel_relkind(relationId);
if (relationKind == RELKIND_RELATION) if (relationKind == RELKIND_RELATION)
{ {
requireEmpty = false; EnsureTableNotDistributed(relationId);
}
else
{
EnsureTableNotDistributed(relationId);
EnsureLocalTableEmpty(relationId);
} }
colocationId = CreateReferenceTableColocationId(); colocationId = CreateReferenceTableColocationId();
/* first, convert the relation into distributed relation */ /* first, convert the relation into distributed relation */
ConvertToDistributedTable(relationId, distributionColumnName, ConvertToDistributedTable(relationId, distributionColumnName,
DISTRIBUTE_BY_NONE, REPLICATION_MODEL_2PC, colocationId, DISTRIBUTE_BY_NONE, REPLICATION_MODEL_2PC, colocationId);
requireEmpty);
/* now, create the single shard replicated to all nodes */ /* now, create the single shard replicated to all nodes */
CreateReferenceTableShard(relationId); CreateReferenceTableShard(relationId);
@ -288,19 +299,17 @@ CreateReferenceTable(Oid relationId)
/* /*
* ConvertToDistributedTable converts the given regular PostgreSQL table into a * ConvertToDistributedTable converts the given regular PostgreSQL table into a
* distributed table. First, it checks if the given table can be distributed, * distributed table. First, it checks if the given table can be distributed,
* then it creates related tuple in pg_dist_partition. If requireEmpty is true, * then it creates related tuple in pg_dist_partition.
* this function errors out when presented with a relation containing rows.
* *
* XXX: We should perform more checks here to see if this table is fit for * XXX: We should perform more checks here to see if this table is fit for
* partitioning. At a minimum, we should validate the following: (i) this node * partitioning. At a minimum, we should validate the following: (i) this node
* runs as the master node, (ii) table does not make use of the inheritance * runs as the master node, (ii) table does not make use of the inheritance
* mechanism, (iii) table does not own columns that are sequences, and (iv) * mechanism and (iii) table does not have collated columns.
* table does not have collated columns.
*/ */
static void static void
ConvertToDistributedTable(Oid relationId, char *distributionColumnName, ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
char distributionMethod, char replicationModel, char distributionMethod, char replicationModel,
uint32 colocationId, bool requireEmpty) uint32 colocationId)
{ {
Relation relation = NULL; Relation relation = NULL;
TupleDesc relationDesc = NULL; TupleDesc relationDesc = NULL;
@ -367,17 +376,6 @@ ConvertToDistributedTable(Oid relationId, char *distributionColumnName,
} }
#endif #endif
/* check that table is empty if that is required */
if (requireEmpty && !LocalTableEmpty(relationId))
{
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot distribute relation \"%s\"",
relationName),
errdetail("Relation \"%s\" contains data.",
relationName),
errhint("Empty your table before distributing it.")));
}
/* /*
* Distribution column returns NULL for reference tables, * Distribution column returns NULL for reference tables,
* but it is not used below for reference tables. * but it is not used below for reference tables.
@ -618,7 +616,7 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
uint32 colocationId = INVALID_COLOCATION_ID; uint32 colocationId = INVALID_COLOCATION_ID;
Oid sourceRelationId = InvalidOid; Oid sourceRelationId = InvalidOid;
Oid distributionColumnType = InvalidOid; Oid distributionColumnType = InvalidOid;
bool requireEmpty = true; bool useExclusiveConnection = false;
char relationKind = 0; char relationKind = 0;
/* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */ /* get an access lock on the relation to prevent DROP TABLE and ALTER TABLE */
@ -666,12 +664,26 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
relationKind = get_rel_relkind(relationId); relationKind = get_rel_relkind(relationId);
if (relationKind == RELKIND_RELATION) if (relationKind == RELKIND_RELATION)
{ {
requireEmpty = false; EnsureTableNotDistributed(relationId);
useExclusiveConnection = IsTransactionBlock() || !LocalTableEmpty(relationId);
}
else
{
EnsureTableNotDistributed(relationId);
EnsureLocalTableEmpty(relationId);
} }
/* create distributed table metadata */ /* create distributed table metadata */
ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH, ConvertToDistributedTable(relationId, distributionColumnName, DISTRIBUTE_BY_HASH,
ReplicationModel, colocationId, requireEmpty); ReplicationModel, colocationId);
/*
* Ensure schema exists on each worker node. We cannot run this function
* transactionally, since we may create shards over separate sessions and
* shard creation depends on the schema being present and visible from all
* sessions.
*/
EnsureSchemaExistsOnAllNodes(relationId);
/* create shards */ /* create shards */
if (sourceRelationId != InvalidOid) if (sourceRelationId != InvalidOid)
@ -681,11 +693,12 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
CheckDistributionColumnType(sourceRelationId, relationId); CheckDistributionColumnType(sourceRelationId, relationId);
CreateColocatedShards(relationId, sourceRelationId); CreateColocatedShards(relationId, sourceRelationId, useExclusiveConnection);
} }
else else
{ {
CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor); CreateShardsWithRoundRobinPolicy(relationId, shardCount, replicationFactor,
useExclusiveConnection);
} }
/* copy over data for regular relations */ /* copy over data for regular relations */
@ -699,6 +712,109 @@ CreateHashDistributedTable(Oid relationId, char *distributionColumnName,
} }
/*
* EnsureSchemaExistsOnAllNodes connects to all worker nodes as the citus
* extension user and creates the schema of the given relationId. The function
* errors out if the command cannot be executed on any of the worker nodes.
*/
static void
EnsureSchemaExistsOnAllNodes(Oid relationId)
{
List *workerNodeList = ActiveWorkerNodeList();
ListCell *workerNodeCell = NULL;
StringInfo applySchemaCreationDDL = makeStringInfo();
Oid schemaId = get_rel_namespace(relationId);
const char *createSchemaDDL = CreateSchemaDDLCommand(schemaId);
uint64 connectionFlag = FORCE_NEW_CONNECTION;
if (createSchemaDDL == NULL)
{
return;
}
appendStringInfo(applySchemaCreationDDL, "%s", createSchemaDDL);
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort;
MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort, NULL,
NULL);
ExecuteCriticalRemoteCommand(connection, applySchemaCreationDDL->data);
}
}
/*
* EnsureLocalTableEmpty errors out if the local table is not empty.
*/
static void
EnsureLocalTableEmpty(Oid relationId)
{
bool localTableEmpty = false;
char *relationName = get_rel_name(relationId);
localTableEmpty = LocalTableEmpty(relationId);
if (!localTableEmpty)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("cannot distribute relation \"%s\"", relationName),
errdetail("Relation \"%s\" contains data.", relationName),
errhint("Empty your table before distributing it.")));
}
}
/*
* EnsureIsTableId errors out if the given id does not belong to a regular or foreign table.
*/
static void
EnsureIsTableId(Oid relationId)
{
Relation relation = relation_open(relationId, AccessShareLock);
char *relationName = get_rel_name(relationId);
char relationKind = 0;
/* verify target relation is either regular or foreign table */
relationKind = relation->rd_rel->relkind;
if (relationKind != RELKIND_RELATION && relationKind != RELKIND_FOREIGN_TABLE)
{
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("%s is not a regular or foreign table",
relationName)));
}
relation_close(relation, NoLock);
}
/*
* EnsureTableNotDistributed errors out if the relationId does not belong to a
* regular or foreign table, or if the table is already distributed.
*/
static void
EnsureTableNotDistributed(Oid relationId)
{
char *relationName = get_rel_name(relationId);
bool isDistributedTable = false;
EnsureIsTableId(relationId);
isDistributedTable = IsDistributedTable(relationId);
if (isDistributedTable)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_TABLE_DEFINITION),
errmsg("table \"%s\" is already distributed",
relationName)));
}
}
/* /*
* ColumnType returns the column type of the given column. * ColumnType returns the column type of the given column.
*/ */


@ -36,6 +36,7 @@
#include "distributed/reference_table_utils.h" #include "distributed/reference_table_utils.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"
#include "distributed/transaction_management.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
@ -68,10 +69,14 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
Oid distributedTableId = ResolveRelationId(tableNameText); Oid distributedTableId = ResolveRelationId(tableNameText);
/* do not add any data */
bool useExclusiveConnections = false;
EnsureCoordinator(); EnsureCoordinator();
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor); CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor,
useExclusiveConnections);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -79,28 +84,27 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
/* /*
* CreateShardsWithRoundRobinPolicy creates empty shards for the given table * CreateShardsWithRoundRobinPolicy creates empty shards for the given table
* based on the specified number of initial shards. The function first gets a * based on the specified number of initial shards. The function first updates
* list of candidate nodes and issues DDL commands on the nodes to create empty * metadata on the coordinator node to make this shard (and its placements)
* shard placements on those nodes. The function then updates metadata on the * visible. Note that the function assumes the table is hash partitioned and
* master node to make this shard (and its placements) visible. Note that the * calculates the min/max hash token ranges for each shard, giving them an equal
* function assumes the table is hash partitioned and calculates the min/max * split of the hash space. Finally, function creates empty shard placements on
* hash token ranges for each shard, giving them an equal split of the hash space. * split of the hash space. Finally, the function creates empty shard placements on
*/ */
void void
CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor) int32 replicationFactor, bool useExclusiveConnections)
{ {
char *relationOwner = NULL;
char shardStorageType = 0; char shardStorageType = 0;
List *workerNodeList = NIL; List *workerNodeList = NIL;
List *ddlCommandList = NIL;
int32 workerNodeCount = 0; int32 workerNodeCount = 0;
uint32 placementAttemptCount = 0; uint32 placementAttemptCount = 0;
uint64 hashTokenIncrement = 0; uint64 hashTokenIncrement = 0;
List *existingShardList = NIL; List *existingShardList = NIL;
int64 shardIndex = 0; int64 shardIndex = 0;
bool includeSequenceDefaults = false;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
bool colocatedShard = false;
List *insertedShardPlacements = NIL;
/* make sure table is hash partitioned */ /* make sure table is hash partitioned */
CheckHashPartitionedTable(distributedTableId); CheckHashPartitionedTable(distributedTableId);
@ -116,8 +120,6 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
/* we plan to add shards: get an exclusive lock on relation oid */ /* we plan to add shards: get an exclusive lock on relation oid */
LockRelationOid(distributedTableId, ExclusiveLock); LockRelationOid(distributedTableId, ExclusiveLock);
relationOwner = TableOwner(distributedTableId);
/* validate that shards haven't already been created for this table */ /* validate that shards haven't already been created for this table */
existingShardList = LoadShardList(distributedTableId); existingShardList = LoadShardList(distributedTableId);
if (existingShardList != NIL) if (existingShardList != NIL)
@ -167,9 +169,6 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
/* make sure we don't process cancel signals until all shards are created */ /* make sure we don't process cancel signals until all shards are created */
HOLD_INTERRUPTS(); HOLD_INTERRUPTS();
/* retrieve the DDL commands for the table */
ddlCommandList = GetTableDDLEvents(distributedTableId, includeSequenceDefaults);
workerNodeCount = list_length(workerNodeList); workerNodeCount = list_length(workerNodeList);
if (replicationFactor > workerNodeCount) if (replicationFactor > workerNodeCount)
{ {
@ -200,6 +199,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement); int32 shardMinHashToken = INT32_MIN + (shardIndex * hashTokenIncrement);
int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1); int32 shardMaxHashToken = shardMinHashToken + (hashTokenIncrement - 1);
uint64 shardId = GetNextShardId(); uint64 shardId = GetNextShardId();
List *currentInsertedShardPlacements = NIL;
/* if we are at the last shard, make sure the max token value is INT_MAX */ /* if we are at the last shard, make sure the max token value is INT_MAX */
if (shardIndex == (shardCount - 1)) if (shardIndex == (shardCount - 1))
@ -219,13 +219,21 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
*/ */
LockShardDistributionMetadata(shardId, ExclusiveLock); LockShardDistributionMetadata(shardId, ExclusiveLock);
CreateShardPlacements(distributedTableId, shardId, ddlCommandList, relationOwner,
workerNodeList, roundRobinNodeIndex, replicationFactor);
InsertShardRow(distributedTableId, shardId, shardStorageType, InsertShardRow(distributedTableId, shardId, shardStorageType,
minHashTokenText, maxHashTokenText); minHashTokenText, maxHashTokenText);
currentInsertedShardPlacements = InsertShardPlacementRows(distributedTableId,
shardId,
workerNodeList,
roundRobinNodeIndex,
replicationFactor);
insertedShardPlacements = list_concat(insertedShardPlacements,
currentInsertedShardPlacements);
} }
CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
useExclusiveConnections, colocatedShard);
if (QueryCancelPending) if (QueryCancelPending)
{ {
ereport(WARNING, (errmsg("cancel requests are ignored during shard creation"))); ereport(WARNING, (errmsg("cancel requests are ignored during shard creation")));
@ -241,16 +249,15 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
* the source relation. * the source relation.
*/ */
void void
CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId) CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId, bool
useExclusiveConnections)
{ {
char *targetTableRelationOwner = NULL;
char targetShardStorageType = 0; char targetShardStorageType = 0;
List *existingShardList = NIL; List *existingShardList = NIL;
List *sourceShardIntervalList = NIL; List *sourceShardIntervalList = NIL;
List *targetTableDDLEvents = NIL;
List *targetTableForeignConstraintCommands = NIL;
ListCell *sourceShardCell = NULL; ListCell *sourceShardCell = NULL;
bool includeSequenceDefaults = false; bool colocatedShard = true;
List *insertedShardPlacements = NIL;
/* make sure that tables are hash partitioned */ /* make sure that tables are hash partitioned */
CheckHashPartitionedTable(targetRelationId); CheckHashPartitionedTable(targetRelationId);
@ -284,10 +291,6 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
targetRelationName))); targetRelationName)));
} }
targetTableRelationOwner = TableOwner(targetRelationId);
targetTableDDLEvents = GetTableDDLEvents(targetRelationId, includeSequenceDefaults);
targetTableForeignConstraintCommands = GetTableForeignConstraintCommands(
targetRelationId);
targetShardStorageType = ShardStorageType(targetRelationId); targetShardStorageType = ShardStorageType(targetRelationId);
foreach(sourceShardCell, sourceShardIntervalList) foreach(sourceShardCell, sourceShardIntervalList)
@ -296,47 +299,40 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
uint64 sourceShardId = sourceShardInterval->shardId; uint64 sourceShardId = sourceShardInterval->shardId;
uint64 newShardId = GetNextShardId(); uint64 newShardId = GetNextShardId();
ListCell *sourceShardPlacementCell = NULL; ListCell *sourceShardPlacementCell = NULL;
int sourceShardIndex = ShardIndex(sourceShardInterval);
int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue); int32 shardMinValue = DatumGetInt32(sourceShardInterval->minValue);
int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue); int32 shardMaxValue = DatumGetInt32(sourceShardInterval->maxValue);
text *shardMinValueText = IntegerToText(shardMinValue); text *shardMinValueText = IntegerToText(shardMinValue);
text *shardMaxValueText = IntegerToText(shardMaxValue); text *shardMaxValueText = IntegerToText(shardMaxValue);
List *sourceShardPlacementList = ShardPlacementList(sourceShardId); List *sourceShardPlacementList = ShardPlacementList(sourceShardId);
InsertShardRow(targetRelationId, newShardId, targetShardStorageType,
shardMinValueText, shardMaxValueText);
foreach(sourceShardPlacementCell, sourceShardPlacementList) foreach(sourceShardPlacementCell, sourceShardPlacementList)
{ {
ShardPlacement *sourcePlacement = ShardPlacement *sourcePlacement =
(ShardPlacement *) lfirst(sourceShardPlacementCell); (ShardPlacement *) lfirst(sourceShardPlacementCell);
char *sourceNodeName = sourcePlacement->nodeName; uint32 groupId = sourcePlacement->groupId;
int32 sourceNodePort = sourcePlacement->nodePort;
bool created = WorkerCreateShard(targetRelationId, sourceNodeName,
sourceNodePort, sourceShardIndex, newShardId,
targetTableRelationOwner,
targetTableDDLEvents,
targetTableForeignConstraintCommands);
if (created)
{
const RelayFileState shardState = FILE_FINALIZED; const RelayFileState shardState = FILE_FINALIZED;
const uint64 shardSize = 0; const uint64 shardSize = 0;
uint64 shardPlacementId = 0;
ShardPlacement *shardPlacement = NULL;
InsertShardPlacementRow(newShardId, INVALID_PLACEMENT_ID, shardState, /*
shardSize, sourcePlacement->groupId); * Optimistically add the shard placement row to pg_dist_shard_placement; in case
} * of any error it will be rolled back.
else */
{ shardPlacementId = InsertShardPlacementRow(newShardId, INVALID_PLACEMENT_ID,
char *targetRelationName = get_rel_name(targetRelationId); shardState, shardSize, groupId);
char *sourceRelationName = get_rel_name(sourceRelationId);
ereport(ERROR, (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), shardPlacement = LoadShardPlacement(newShardId, shardPlacementId);
errmsg("table \"%s\" could not be colocated with %s", insertedShardPlacements = lappend(insertedShardPlacements, shardPlacement);
targetRelationName, sourceRelationName)));
} }
} }
InsertShardRow(targetRelationId, newShardId, targetShardStorageType, CreateShardsOnWorkers(targetRelationId, insertedShardPlacements,
shardMinValueText, shardMaxValueText); useExclusiveConnections, colocatedShard);
}
} }
@ -348,10 +344,8 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
void void
CreateReferenceTableShard(Oid distributedTableId) CreateReferenceTableShard(Oid distributedTableId)
{ {
char *relationOwner = NULL;
char shardStorageType = 0; char shardStorageType = 0;
List *workerNodeList = NIL; List *workerNodeList = NIL;
List *ddlCommandList = NIL;
int32 workerNodeCount = 0; int32 workerNodeCount = 0;
List *existingShardList = NIL; List *existingShardList = NIL;
uint64 shardId = INVALID_SHARD_ID; uint64 shardId = INVALID_SHARD_ID;
@ -359,7 +353,9 @@ CreateReferenceTableShard(Oid distributedTableId)
int replicationFactor = 0; int replicationFactor = 0;
text *shardMinValue = NULL; text *shardMinValue = NULL;
text *shardMaxValue = NULL; text *shardMaxValue = NULL;
bool includeSequenceDefaults = false; bool useExclusiveConnection = false;
bool colocatedShard = false;
List *insertedShardPlacements = NIL;
/* /*
* In contrast to append/range partitioned tables it makes more sense to * In contrast to append/range partitioned tables it makes more sense to
@ -372,8 +368,6 @@ CreateReferenceTableShard(Oid distributedTableId)
/* we plan to add shards: get an exclusive lock on relation oid */ /* we plan to add shards: get an exclusive lock on relation oid */
LockRelationOid(distributedTableId, ExclusiveLock); LockRelationOid(distributedTableId, ExclusiveLock);
relationOwner = TableOwner(distributedTableId);
/* set shard storage type according to relation type */ /* set shard storage type according to relation type */
shardStorageType = ShardStorageType(distributedTableId); shardStorageType = ShardStorageType(distributedTableId);
@ -394,9 +388,6 @@ CreateReferenceTableShard(Oid distributedTableId)
/* get the next shard id */ /* get the next shard id */
shardId = GetNextShardId(); shardId = GetNextShardId();
/* retrieve the DDL commands for the table */
ddlCommandList = GetTableDDLEvents(distributedTableId, includeSequenceDefaults);
/* set the replication factor equal to the number of worker nodes */ /* set the replication factor equal to the number of worker nodes */
workerNodeCount = list_length(workerNodeList); workerNodeCount = list_length(workerNodeList);
replicationFactor = workerNodeCount; replicationFactor = workerNodeCount;
@ -409,11 +400,15 @@ CreateReferenceTableShard(Oid distributedTableId)
*/ */
LockShardDistributionMetadata(shardId, ExclusiveLock); LockShardDistributionMetadata(shardId, ExclusiveLock);
CreateShardPlacements(distributedTableId, shardId, ddlCommandList, relationOwner,
workerNodeList, workerStartIndex, replicationFactor);
InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue, InsertShardRow(distributedTableId, shardId, shardStorageType, shardMinValue,
shardMaxValue); shardMaxValue);
insertedShardPlacements = InsertShardPlacementRows(distributedTableId, shardId,
workerNodeList, workerStartIndex,
replicationFactor);
CreateShardsOnWorkers(distributedTableId, insertedShardPlacements,
useExclusiveConnection, colocatedShard);
} }


@ -847,9 +847,10 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
/* /*
* InsertShardPlacementRow opens the shard placement system catalog, and inserts * InsertShardPlacementRow opens the shard placement system catalog, and inserts
* a new row with the given values into that system catalog. If placementId is * a new row with the given values into that system catalog. If placementId is
* INVALID_PLACEMENT_ID, a new placement id will be assigned. * INVALID_PLACEMENT_ID, a new placement id will be assigned. The function then
* returns the placement id of the added shard placement.
*/ */
void uint64
InsertShardPlacementRow(uint64 shardId, uint64 placementId, InsertShardPlacementRow(uint64 shardId, uint64 placementId,
char shardState, uint64 shardLength, char shardState, uint64 shardLength,
uint32 groupId) uint32 groupId)
@ -886,6 +887,8 @@ InsertShardPlacementRow(uint64 shardId, uint64 placementId,
CommandCounterIncrement(); CommandCounterIncrement();
heap_close(pgDistPlacement, NoLock); heap_close(pgDistPlacement, NoLock);
return placementId;
} }


@ -496,12 +496,6 @@ GetTableCreationCommands(Oid relationId, bool includeSequenceDefaults)
{ {
List *tableDDLEventList = NIL; List *tableDDLEventList = NIL;
char tableType = 0; char tableType = 0;
#if (PG_VERSION_NUM >= 100000)
List *sequenceIdlist = getOwnedSequences(relationId, InvalidAttrNumber);
#else
List *sequenceIdlist = getOwnedSequences(relationId);
#endif
ListCell *sequenceIdCell;
char *tableSchemaDef = NULL; char *tableSchemaDef = NULL;
char *tableColumnOptionsDef = NULL; char *tableColumnOptionsDef = NULL;
char *createSchemaCommand = NULL; char *createSchemaCommand = NULL;
@ -539,15 +533,6 @@ GetTableCreationCommands(Oid relationId, bool includeSequenceDefaults)
tableDDLEventList = lappend(tableDDLEventList, createSchemaCommand); tableDDLEventList = lappend(tableDDLEventList, createSchemaCommand);
} }
/* create sequences if needed */
foreach(sequenceIdCell, sequenceIdlist)
{
Oid sequenceRelid = lfirst_oid(sequenceIdCell);
char *sequenceDef = pg_get_sequencedef_string(sequenceRelid);
tableDDLEventList = lappend(tableDDLEventList, sequenceDef);
}
/* fetch table schema and column option definitions */ /* fetch table schema and column option definitions */
tableSchemaDef = pg_get_tableschemadef_string(relationId, includeSequenceDefaults); tableSchemaDef = pg_get_tableschemadef_string(relationId, includeSequenceDefaults);
tableColumnOptionsDef = pg_get_tablecolumnoptionsdef_string(relationId); tableColumnOptionsDef = pg_get_tablecolumnoptionsdef_string(relationId);


@ -61,10 +61,9 @@ PG_FUNCTION_INFO_V1(master_update_shard_statistics);
/* /*
* master_create_empty_shard creates an empty shard for the given distributed * master_create_empty_shard creates an empty shard for the given distributed
* table. For this, the function first gets a list of candidate nodes, connects * table. The function first updates metadata on the coordinator node to make
* to these nodes, and issues DDL commands on the nodes to create empty shard * this shard visible. Then it creates the empty shard on the worker nodes and
* placements. The function then updates metadata on the master node to make * adds the shard placement rows to the metadata table.
* this shard (and its placements) visible.
*/ */
Datum Datum
master_create_empty_shard(PG_FUNCTION_ARGS) master_create_empty_shard(PG_FUNCTION_ARGS)
@ -73,7 +72,6 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
char *relationName = text_to_cstring(relationNameText); char *relationName = text_to_cstring(relationNameText);
List *workerNodeList = NIL; List *workerNodeList = NIL;
uint64 shardId = INVALID_SHARD_ID; uint64 shardId = INVALID_SHARD_ID;
List *ddlEventList = NULL;
uint32 attemptableNodeCount = 0; uint32 attemptableNodeCount = 0;
uint32 liveNodeCount = 0; uint32 liveNodeCount = 0;
@ -86,9 +84,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
Oid relationId = ResolveRelationId(relationNameText); Oid relationId = ResolveRelationId(relationNameText);
char relationKind = get_rel_relkind(relationId); char relationKind = get_rel_relkind(relationId);
char *relationOwner = TableOwner(relationId);
char replicationModel = REPLICATION_MODEL_INVALID; char replicationModel = REPLICATION_MODEL_INVALID;
bool includeSequenceDefaults = false;
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
@ -140,9 +136,6 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
/* generate new and unique shardId from sequence */ /* generate new and unique shardId from sequence */
shardId = GetNextShardId(); shardId = GetNextShardId();
/* get table DDL commands to replay on the worker node */
ddlEventList = GetTableDDLEvents(relationId, includeSequenceDefaults);
/* if enough live nodes, add an extra candidate node as backup */ /* if enough live nodes, add an extra candidate node as backup */
attemptableNodeCount = ShardReplicationFactor; attemptableNodeCount = ShardReplicationFactor;
liveNodeCount = WorkerGetLiveNodeCount(); liveNodeCount = WorkerGetLiveNodeCount();
@ -184,11 +177,11 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
candidateNodeIndex++; candidateNodeIndex++;
} }
CreateShardPlacements(relationId, shardId, ddlEventList, relationOwner,
candidateNodeList, 0, ShardReplicationFactor);
InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue); InsertShardRow(relationId, shardId, storageType, nullMinValue, nullMaxValue);
CreateAppendDistributedShardPlacements(relationId, shardId, candidateNodeList,
ShardReplicationFactor);
PG_RETURN_INT64(shardId); PG_RETURN_INT64(shardId);
} }
@ -362,23 +355,23 @@ CheckDistributedTable(Oid relationId)
/* /*
* CreateShardPlacements attempts to create a certain number of placements * CreateAppendDistributedShardPlacements creates shards for append distributed
* (provided by the replicationFactor argument) on the provided list of worker * tables on worker nodes. After successfully creating a shard on a worker,
* nodes. Beginning at the provided start index, DDL commands are attempted on * shard placement rows are added to the metadata.
* worker nodes (via WorkerCreateShards). If there are more worker nodes than
* required for replication, one remote failure is tolerated. If the provided
* replication factor is not attained, an error is raised (placements remain on
* nodes if some DDL commands had been successful).
*/ */
void void
CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList, CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
char *newPlacementOwner, List *workerNodeList, List *workerNodeList, int replicationFactor)
int workerStartIndex, int replicationFactor)
{ {
int attemptCount = replicationFactor; int attemptCount = replicationFactor;
int workerNodeCount = list_length(workerNodeList); int workerNodeCount = list_length(workerNodeList);
int placementsCreated = 0; int placementsCreated = 0;
int attemptNumber = 0; int attemptNumber = 0;
List *foreignConstraintCommandList = GetTableForeignConstraintCommands(relationId);
bool includeSequenceDefaults = false;
List *ddlCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults);
uint32 connectionFlag = FOR_DDL;
char *relationOwner = TableOwner(relationId);
/* if we have enough nodes, add an extra placement attempt for backup */ /* if we have enough nodes, add an extra placement attempt for backup */
if (workerNodeCount > replicationFactor) if (workerNodeCount > replicationFactor)
@ -388,32 +381,32 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
for (attemptNumber = 0; attemptNumber < attemptCount; attemptNumber++) for (attemptNumber = 0; attemptNumber < attemptCount; attemptNumber++)
{ {
int workerNodeIndex = (workerStartIndex + attemptNumber) % workerNodeCount; int workerNodeIndex = attemptNumber % workerNodeCount;
WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, workerNodeIndex); WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, workerNodeIndex);
uint32 nodeGroupId = workerNode->groupId;
char *nodeName = workerNode->workerName; char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort; uint32 nodePort = workerNode->workerPort;
List *foreignConstraintCommandList = GetTableForeignConstraintCommands(
relationId);
int shardIndex = -1; /* not used in this code path */ int shardIndex = -1; /* not used in this code path */
bool created = false;
created = WorkerCreateShard(relationId, nodeName, nodePort, shardIndex,
shardId, newPlacementOwner, ddlEventList,
foreignConstraintCommandList);
if (created)
{
const RelayFileState shardState = FILE_FINALIZED; const RelayFileState shardState = FILE_FINALIZED;
const uint64 shardSize = 0; const uint64 shardSize = 0;
MultiConnection *connection =
GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort,
relationOwner, NULL);
if (PQstatus(connection->pgConn) != CONNECTION_OK)
{
ereport(WARNING, (errmsg("could not connect to node \"%s:%u\"", nodeName,
nodePort)));
continue;
}
WorkerCreateShard(relationId, shardIndex, shardId, ddlCommandList,
foreignConstraintCommandList, connection);
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize, InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize,
workerNode->groupId); nodeGroupId);
placementsCreated++; placementsCreated++;
}
else
{
ereport(WARNING, (errmsg("could not create shard on \"%s:%u\"",
nodeName, nodePort)));
}
if (placementsCreated >= replicationFactor) if (placementsCreated >= replicationFactor)
{ {
@ -431,19 +424,121 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
/* /*
* WorkerCreateShard applies DDL commands for the given shardId to create the * InsertShardPlacementRows inserts shard placement rows into the metadata table on
* shard on the worker node. Note that this function opens a new connection for * the coordinator node. It then returns the list of added shard placements.
* each DDL command, and could leave the shard in an half-initialized state.
*/ */
bool List *
WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, InsertShardPlacementRows(Oid relationId, int64 shardId, List *workerNodeList,
int shardIndex, uint64 shardId, char *newShardOwner, int workerStartIndex, int replicationFactor)
List *ddlCommandList, List *foreignConstraintCommandList) {
int workerNodeCount = list_length(workerNodeList);
int attemptNumber = 0;
int placementsInserted = 0;
List *insertedShardPlacements = NIL;
for (attemptNumber = 0; attemptNumber < replicationFactor; attemptNumber++)
{
int workerNodeIndex = (workerStartIndex + attemptNumber) % workerNodeCount;
WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, workerNodeIndex);
uint32 nodeGroupId = workerNode->groupId;
const RelayFileState shardState = FILE_FINALIZED;
const uint64 shardSize = 0;
uint64 shardPlacementId = 0;
ShardPlacement *shardPlacement = NULL;
shardPlacementId = InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID,
shardState, shardSize, nodeGroupId);
shardPlacement = LoadShardPlacement(shardId, shardPlacementId);
insertedShardPlacements = lappend(insertedShardPlacements, shardPlacement);
placementsInserted++;
if (placementsInserted >= replicationFactor)
{
break;
}
}
return insertedShardPlacements;
}
/*
* CreateShardsOnWorkers creates shards on the worker nodes for the given shard
* placements. The function opens connections in a transactional way. If the
* caller needs an exclusive connection (when distributing a local table that
* already contains data, or when creating shards inside a transaction block),
* a separate connection is opened and claimed for each placement.
*/
void
CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
bool useExclusiveConnection, bool colocatedShard)
{
char *placementOwner = TableOwner(distributedRelationId);
bool includeSequenceDefaults = false;
List *ddlCommandList = GetTableDDLEvents(distributedRelationId,
includeSequenceDefaults);
List *foreignConstraintCommandList = GetTableForeignConstraintCommands(
distributedRelationId);
List *claimedConnectionList = NIL;
ListCell *connectionCell = NULL;
ListCell *shardPlacementCell = NULL;
BeginOrContinueCoordinatedTransaction();
foreach(shardPlacementCell, shardPlacements)
{
ShardPlacement *shardPlacement = (ShardPlacement *) lfirst(shardPlacementCell);
uint64 shardId = shardPlacement->shardId;
ShardInterval *shardInterval = LoadShardInterval(shardId);
MultiConnection *connection = NULL;
int shardIndex = -1;
if (colocatedShard)
{
shardIndex = ShardIndex(shardInterval);
}
connection = GetPlacementConnection(FOR_DDL, shardPlacement,
placementOwner);
if (useExclusiveConnection)
{
ClaimConnectionExclusively(connection);
claimedConnectionList = lappend(claimedConnectionList, connection);
}
RemoteTransactionBeginIfNecessary(connection);
MarkRemoteTransactionCritical(connection);
WorkerCreateShard(distributedRelationId, shardIndex, shardId,
ddlCommandList, foreignConstraintCommandList,
connection);
}
/*
* We need to unclaim all connections to make them usable again for the copy
* command; otherwise the copy would open new connections to the placements
* and could not see the uncommitted changes.
*/
foreach(connectionCell, claimedConnectionList)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
UnclaimConnection(connection);
}
}
/*
* WorkerCreateShard applies DDL commands for the given shardId to create the
* shard on the worker node. Commands are sent to the worker node over the
* given connection.
*/
void
WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId, List *ddlCommandList,
List *foreignConstraintCommandList, MultiConnection *connection)
{ {
Oid schemaId = get_rel_namespace(relationId); Oid schemaId = get_rel_namespace(relationId);
char *schemaName = get_namespace_name(schemaId); char *schemaName = get_namespace_name(schemaId);
char *escapedSchemaName = quote_literal_cstr(schemaName); char *escapedSchemaName = quote_literal_cstr(schemaName);
bool shardCreated = true;
ListCell *ddlCommandCell = NULL; ListCell *ddlCommandCell = NULL;
ListCell *foreignConstraintCommandCell = NULL; ListCell *foreignConstraintCommandCell = NULL;
@ -451,7 +546,6 @@ WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
{ {
char *ddlCommand = (char *) lfirst(ddlCommandCell); char *ddlCommand = (char *) lfirst(ddlCommandCell);
char *escapedDDLCommand = quote_literal_cstr(ddlCommand); char *escapedDDLCommand = quote_literal_cstr(ddlCommand);
List *queryResultList = NIL;
StringInfo applyDDLCommand = makeStringInfo(); StringInfo applyDDLCommand = makeStringInfo();
if (strcmp(schemaName, "public") != 0) if (strcmp(schemaName, "public") != 0)
@ -466,13 +560,7 @@ WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
escapedDDLCommand); escapedDDLCommand);
} }
queryResultList = ExecuteRemoteQuery(nodeName, nodePort, newShardOwner, ExecuteCriticalRemoteCommand(connection, applyDDLCommand->data);
applyDDLCommand);
if (queryResultList == NIL)
{
shardCreated = false;
break;
}
} }
foreach(foreignConstraintCommandCell, foreignConstraintCommandList) foreach(foreignConstraintCommandCell, foreignConstraintCommandList)
@ -486,7 +574,6 @@ WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
char *escapedReferencedSchemaName = NULL; char *escapedReferencedSchemaName = NULL;
uint64 referencedShardId = INVALID_SHARD_ID; uint64 referencedShardId = INVALID_SHARD_ID;
List *queryResultList = NIL;
StringInfo applyForeignConstraintCommand = makeStringInfo(); StringInfo applyForeignConstraintCommand = makeStringInfo();
/* we need to parse the foreign constraint command to get referencing table id */ /* we need to parse the foreign constraint command to get referencing table id */
@ -522,16 +609,9 @@ WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
WORKER_APPLY_INTER_SHARD_DDL_COMMAND, shardId, escapedSchemaName, WORKER_APPLY_INTER_SHARD_DDL_COMMAND, shardId, escapedSchemaName,
referencedShardId, escapedReferencedSchemaName, escapedCommand); referencedShardId, escapedReferencedSchemaName, escapedCommand);
queryResultList = ExecuteRemoteQuery(nodeName, nodePort, newShardOwner,
applyForeignConstraintCommand);
if (queryResultList == NIL)
{
shardCreated = false;
break;
}
}
return shardCreated; ExecuteCriticalRemoteCommand(connection, applyForeignConstraintCommand->data);
}
} }


@ -358,6 +358,24 @@ LoadGroupShardPlacement(uint64 shardId, uint64 placementId)
} }
/*
* LoadShardPlacement returns a shard placement for the primary node.
*/
ShardPlacement *
LoadShardPlacement(uint64 shardId, uint64 placementId)
{
ShardCacheEntry *shardEntry = NULL;
GroupShardPlacement *groupPlacement = NULL;
ShardPlacement *nodePlacement = NULL;
shardEntry = LookupShardCacheEntry(shardId);
groupPlacement = LoadGroupShardPlacement(shardId, placementId);
nodePlacement = ResolveGroupShardPlacement(groupPlacement, shardEntry);
return nodePlacement;
}
/* /*
* FindShardPlacementOnGroup returns the shard placement for the given shard * FindShardPlacementOnGroup returns the shard placement for the given shard
* on the given group, or returns NULL if no placement for the shard exists * on the given group, or returns NULL if no placement for the shard exists


@ -126,7 +126,7 @@ extern List * GroupShardPlacementsForTableOnGroup(Oid relationId, uint32 groupId
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
text *shardMinValue, text *shardMaxValue); text *shardMinValue, text *shardMaxValue);
extern void DeleteShardRow(uint64 shardId); extern void DeleteShardRow(uint64 shardId);
extern void InsertShardPlacementRow(uint64 shardId, uint64 placementId, extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
char shardState, uint64 shardLength, char shardState, uint64 shardLength,
uint32 groupId); uint32 groupId);
extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod, extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,


@ -18,6 +18,7 @@
#include "c.h" #include "c.h"
#include "fmgr.h" #include "fmgr.h"
#include "distributed/connection_management.h"
#include "distributed/shardinterval_utils.h" #include "distributed/shardinterval_utils.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
@ -106,17 +107,25 @@ extern List * GetTableIndexAndConstraintCommands(Oid relationId);
extern List * GetTableForeignConstraintCommands(Oid relationId); extern List * GetTableForeignConstraintCommands(Oid relationId);
extern char ShardStorageType(Oid relationId); extern char ShardStorageType(Oid relationId);
extern void CheckDistributedTable(Oid relationId); extern void CheckDistributedTable(Oid relationId);
extern void CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList, extern void CreateAppendDistributedShardPlacements(Oid relationId, int64 shardId,
char *newPlacementOwner, List *workerNodeList, List *workerNodeList, int
int workerStartIndex, int replicationFactor); replicationFactor);
extern void CreateShardsOnWorkers(Oid distributedRelationId, List *shardPlacements,
bool useExclusiveConnection,
bool colocatedShard);
extern List * InsertShardPlacementRows(Oid relationId, int64 shardId,
List *workerNodeList, int workerStartIndex,
int replicationFactor);
extern uint64 UpdateShardStatistics(int64 shardId); extern uint64 UpdateShardStatistics(int64 shardId);
extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount, extern void CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
int32 replicationFactor); int32 replicationFactor,
extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId); bool useExclusiveConnections);
extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
bool useExclusiveConnections);
extern void CreateReferenceTableShard(Oid distributedTableId); extern void CreateReferenceTableShard(Oid distributedTableId);
extern bool WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort, extern void WorkerCreateShard(Oid relationId, int shardIndex, uint64 shardId,
int shardIndex, uint64 shardId, char *newShardOwner, List *ddlCommandList, List *foreignConstraintCommandList,
List *ddlCommandList, List *foreignConstraintCommadList); MultiConnection *connection);
extern Oid ForeignConstraintGetReferencedTableId(char *queryString); extern Oid ForeignConstraintGetReferencedTableId(char *queryString);
extern void CheckHashPartitionedTable(Oid distributedTableId); extern void CheckHashPartitionedTable(Oid distributedTableId);
extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName, extern void CheckTableSchemaNameForDrop(Oid relationId, char **schemaName,


@ -71,6 +71,7 @@ extern List * DistributedTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId); extern ShardInterval * LoadShardInterval(uint64 shardId);
extern ShardPlacement * FindShardPlacementOnGroup(uint32 groupId, uint64 shardId); extern ShardPlacement * FindShardPlacementOnGroup(uint32 groupId, uint64 shardId);
extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId); extern GroupShardPlacement * LoadGroupShardPlacement(uint64 shardId, uint64 placementId);
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern int GetLocalGroupId(void); extern int GetLocalGroupId(void);
extern List * DistTableOidList(void); extern List * DistTableOidList(void);


@ -48,8 +48,7 @@ DETAIL: Distributed relations must not specify the WITH (OIDS) option in their
ALTER TABLE table_to_distribute SET WITHOUT OIDS; ALTER TABLE table_to_distribute SET WITHOUT OIDS;
-- use an index instead of table name -- use an index instead of table name
SELECT master_create_distributed_table('table_to_distribute_pkey', 'id', 'hash'); SELECT master_create_distributed_table('table_to_distribute_pkey', 'id', 'hash');
ERROR: cannot distribute relation: table_to_distribute_pkey ERROR: table_to_distribute_pkey is not a regular or foreign table
DETAIL: Distributed relations must be regular or foreign tables.
-- use a bad column name -- use a bad column name
SELECT master_create_distributed_table('table_to_distribute', 'bad_column', 'hash'); SELECT master_create_distributed_table('table_to_distribute', 'bad_column', 'hash');
ERROR: column "bad_column" of relation "table_to_distribute" does not exist ERROR: column "bad_column" of relation "table_to_distribute" does not exist


@ -552,3 +552,279 @@ SELECT relpersistence FROM pg_class WHERE relname LIKE 'unlogged_table_%';
u u
(4 rows) (4 rows)
\c - - - :master_port
-- Test rollback of create table
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
create_distributed_table
--------------------------
(1 row)
ROLLBACK;
-- Table should not exist on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
Column | Type | Modifiers
--------+------+-----------
(0 rows)
\c - - - :master_port
-- Insert 3 rows to make sure that copy after shard creation touches the same
-- worker node twice.
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
INSERT INTO rollback_table VALUES(1, 'Name_1');
INSERT INTO rollback_table VALUES(2, 'Name_2');
INSERT INTO rollback_table VALUES(3, 'Name_3');
SELECT create_distributed_table('rollback_table','id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
ROLLBACK;
-- Table should not exist on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
Column | Type | Modifiers
--------+------+-----------
(0 rows)
\c - - - :master_port
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
create_distributed_table
--------------------------
(1 row)
\copy rollback_table from stdin delimiter ','
CREATE INDEX rollback_index ON rollback_table(id);
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
COMMIT;
-- Check the table is created
SELECT count(*) FROM rollback_table;
count
-------
3
(1 row)
DROP TABLE rollback_table;
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
create_distributed_table
--------------------------
(1 row)
\copy rollback_table from stdin delimiter ','
ROLLBACK;
-- Table should not exist on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
Column | Type | Modifiers
--------+------+-----------
(0 rows)
\c - - - :master_port
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
create_distributed_table
--------------------------
(1 row)
CREATE TABLE tt2(id int);
SELECT create_distributed_table('tt2','id');
create_distributed_table
--------------------------
(1 row)
INSERT INTO tt1 VALUES(1);
INSERT INTO tt2 SELECT * FROM tt1 WHERE id = 1;
COMMIT;
-- Table should exist on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt1_360430'::regclass;
Column | Type | Modifiers
--------+---------+-----------
id | integer |
(1 row)
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt2_360462'::regclass;
Column | Type | Modifiers
--------+---------+-----------
id | integer |
(1 row)
\c - - - :master_port
DROP TABLE tt1;
DROP TABLE tt2;
-- It is known that creating a table with master_create_empty_shard is not
-- transactional, so the table remains on the worker node after the rollback
BEGIN;
CREATE TABLE append_tt1(id int);
SELECT create_distributed_table('append_tt1','id','append');
create_distributed_table
--------------------------
(1 row)
SELECT master_create_empty_shard('append_tt1');
master_create_empty_shard
---------------------------
360494
(1 row)
ROLLBACK;
-- Table exists on the worker node.
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.append_tt1_360494'::regclass;
Column | Type | Modifiers
--------+---------+-----------
id | integer |
(1 row)
\c - - - :master_port
-- There should be no table on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid from pg_class WHERE relname LIKE 'public.tt1%');
Column | Type | Modifiers
--------+------+-----------
(0 rows)
\c - - - :master_port
-- Queries executed by the router executor are allowed in the same transaction
-- with create_distributed_table
BEGIN;
CREATE TABLE tt1(id int);
INSERT INTO tt1 VALUES(1);
SELECT create_distributed_table('tt1','id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
INSERT INTO tt1 VALUES(2);
SELECT * FROM tt1 WHERE id = 1;
id
----
1
(1 row)
COMMIT;
-- Placements should be created on the worker
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt1_360495'::regclass;
Column | Type | Modifiers
--------+---------+-----------
id | integer |
(1 row)
\c - - - :master_port
DROP TABLE tt1;
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
create_distributed_table
--------------------------
(1 row)
DROP TABLE tt1;
COMMIT;
-- There should be no table on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid from pg_class WHERE relname LIKE 'tt1%');
Column | Type | Modifiers
--------+------+-----------
(0 rows)
\c - - - :master_port
-- Tests with create_distributed_table & DDL & DML commands
-- Test should pass since GetPlacementListConnection can provide connections
-- in this order of execution
CREATE TABLE sample_table(id int);
SELECT create_distributed_table('sample_table','id');
create_distributed_table
--------------------------
(1 row)
BEGIN;
CREATE TABLE stage_table (LIKE sample_table);
\COPY stage_table FROM stdin; -- Note that this operation is a local copy
SELECT create_distributed_table('stage_table', 'id');
NOTICE: Copying data from local table...
create_distributed_table
--------------------------
(1 row)
INSERT INTO sample_table SELECT * FROM stage_table;
DROP TABLE stage_table;
SELECT * FROM sample_table WHERE id = 3;
id
----
3
(1 row)
COMMIT;
-- Show that rows of sample_table are updated
SELECT count(*) FROM sample_table;
count
-------
4
(1 row)
DROP table sample_table;
-- Test the sequence create_distributed_table - copy - create_distributed_table - copy
-- This combination is used by tests written by some ORMs.
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
create_distributed_table
--------------------------
(1 row)
\COPY tt1 from stdin;
CREATE TABLE tt2(like tt1);
SELECT create_distributed_table('tt2','id');
create_distributed_table
--------------------------
(1 row)
\COPY tt2 from stdin;
INSERT INTO tt1 SELECT * FROM tt2;
SELECT * FROM tt1 WHERE id = 3;
id
----
3
(1 row)
SELECT * FROM tt2 WHERE id = 6;
id
----
6
(1 row)
END;
SELECT count(*) FROM tt1;
count
-------
6
(1 row)
DROP TABLE tt1;
DROP TABLE tt2;
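-- (Illustrative sketch, not part of the expected output above.) Since
-- create_distributed_table is now transactional, a coordinator-side sanity
-- check is that no Citus metadata row points at a relation that no longer
-- exists after a ROLLBACK; both counts are expected to be 0.
SELECT count(*) AS orphaned_partitions
FROM pg_dist_partition
WHERE NOT EXISTS (SELECT 1 FROM pg_class WHERE oid = logicalrelid);
SELECT count(*) AS orphaned_shards
FROM pg_dist_shard
WHERE NOT EXISTS (SELECT 1 FROM pg_class WHERE oid = logicalrelid);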

View File

@ -978,14 +978,6 @@ SELECT create_distributed_table('explain_table', 'id');
ALTER TABLE explain_table ADD COLUMN value int; ALTER TABLE explain_table ADD COLUMN value int;
NOTICE: using one-phase commit for distributed DDL commands NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
EXPLAIN (COSTS FALSE) SELECT value FROM explain_table WHERE id = 1;
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Seq Scan on explain_table_570001 explain_table
Filter: (id = 1)
ROLLBACK; ROLLBACK;
-- test explain with local INSERT ... SELECT -- test explain with local INSERT ... SELECT
EXPLAIN (COSTS OFF) EXPLAIN (COSTS OFF)

View File

@ -978,14 +978,6 @@ SELECT create_distributed_table('explain_table', 'id');
ALTER TABLE explain_table ADD COLUMN value int; ALTER TABLE explain_table ADD COLUMN value int;
NOTICE: using one-phase commit for distributed DDL commands NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc' HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
EXPLAIN (COSTS FALSE) SELECT value FROM explain_table WHERE id = 1;
Custom Scan (Citus Router)
Task Count: 1
Tasks Shown: All
-> Task
Node: host=localhost port=57637 dbname=regression
-> Seq Scan on explain_table_570001 explain_table
Filter: (id = 1)
ROLLBACK; ROLLBACK;
-- test explain with local INSERT ... SELECT -- test explain with local INSERT ... SELECT
EXPLAIN (COSTS OFF) EXPLAIN (COSTS OFF)

View File

@ -60,7 +60,6 @@ SELECT unnest(master_metadata_snapshot());
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE) INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres
CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL)
ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
ALTER TABLE public.mx_test_table OWNER TO postgres ALTER TABLE public.mx_test_table OWNER TO postgres
@ -68,7 +67,7 @@ SELECT unnest(master_metadata_snapshot());
SELECT worker_create_truncate_trigger('public.mx_test_table') SELECT worker_create_truncate_trigger('public.mx_test_table')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007) INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(13 rows) (12 rows)
-- Show that CREATE INDEX commands are included in the metadata snapshot -- Show that CREATE INDEX commands are included in the metadata snapshot
CREATE INDEX mx_index ON mx_test_table(col_2); CREATE INDEX mx_index ON mx_test_table(col_2);
@ -82,7 +81,6 @@ SELECT unnest(master_metadata_snapshot());
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE) INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, isactive) VALUES (2, 2, 'localhost', 57638, 'default', FALSE, TRUE),(1, 1, 'localhost', 57637, 'default', FALSE, TRUE)
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres
CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL)
CREATE INDEX mx_index ON public.mx_test_table USING btree (col_2) TABLESPACE pg_default CREATE INDEX mx_index ON public.mx_test_table USING btree (col_2) TABLESPACE pg_default
ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
@ -91,7 +89,7 @@ SELECT unnest(master_metadata_snapshot());
SELECT worker_create_truncate_trigger('public.mx_test_table') SELECT worker_create_truncate_trigger('public.mx_test_table')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007) INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(14 rows) (13 rows)
-- Show that schema changes are included in the metadata snapshot -- Show that schema changes are included in the metadata snapshot
CREATE SCHEMA mx_testing_schema; CREATE SCHEMA mx_testing_schema;
@ -108,7 +106,6 @@ SELECT unnest(master_metadata_snapshot());
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default
ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
@ -117,7 +114,7 @@ SELECT unnest(master_metadata_snapshot());
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table') SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007) INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(16 rows) (15 rows)
-- Show that append distributed tables are not included in the metadata snapshot -- Show that append distributed tables are not included in the metadata snapshot
CREATE TABLE non_mx_test_table (col_1 int, col_2 text); CREATE TABLE non_mx_test_table (col_1 int, col_2 text);
@ -138,7 +135,6 @@ SELECT unnest(master_metadata_snapshot());
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default
ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
@ -147,7 +143,7 @@ SELECT unnest(master_metadata_snapshot());
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table') SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007) INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(16 rows) (15 rows)
-- Show that range distributed tables are not included in the metadata snapshot -- Show that range distributed tables are not included in the metadata snapshot
UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass; UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass;
@ -161,7 +157,6 @@ SELECT unnest(master_metadata_snapshot());
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE') SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL) CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2) TABLESPACE pg_default
ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1)
@ -170,7 +165,7 @@ SELECT unnest(master_metadata_snapshot());
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table') SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007) INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
(16 rows) (15 rows)
-- Test start_metadata_sync_to_node UDF -- Test start_metadata_sync_to_node UDF
-- Ensure that hasmetadata=false for all nodes -- Ensure that hasmetadata=false for all nodes

View File

@ -224,11 +224,11 @@ SELECT count(*) FROM pg_tables WHERE tablename = 'objects_for_xacts2' and schema
0 0
(1 row) (1 row)
-- but the shard exists since we do not create shards in a transaction -- shard also does not exist since we create shards in a transaction
SELECT count(*) FROM pg_tables WHERE tablename LIKE 'objects_for_xacts2_%' and schemaname = 'citus_mx_schema_for_xacts'; SELECT count(*) FROM pg_tables WHERE tablename LIKE 'objects_for_xacts2_%' and schemaname = 'citus_mx_schema_for_xacts';
count count
------- -------
1 0
(1 row) (1 row)
-- make sure that master_drop_all_shards does not work from the worker nodes -- make sure that master_drop_all_shards does not work from the worker nodes

View File

@ -119,15 +119,6 @@ ERROR: cannot associate an existing sequence with a distributed table
HINT: Use a sequence in a distributed table by specifying a serial column type before creating any shards. HINT: Use a sequence in a distributed table by specifying a serial column type before creating any shards.
-- an edge case, but it's OK to change an owner to the same distributed table -- an edge case, but it's OK to change an owner to the same distributed table
ALTER SEQUENCE testserialtable_id_seq OWNED BY testserialtable.id; ALTER SEQUENCE testserialtable_id_seq OWNED BY testserialtable.id;
-- verify sequence was created on worker
\c - - - :worker_1_port
\ds
List of relations
Schema | Name | Type | Owner
--------+------------------------+----------+----------
public | testserialtable_id_seq | sequence | postgres
(1 row)
-- drop distributed table -- drop distributed table
\c - - - :master_port \c - - - :master_port
DROP TABLE testserialtable; DROP TABLE testserialtable;

View File

@ -504,9 +504,8 @@ ORDER BY
\ds transactional_drop_serial_column2_seq \ds transactional_drop_serial_column2_seq
List of relations List of relations
Schema | Name | Type | Owner Schema | Name | Type | Owner
--------+---------------------------------------+----------+---------- --------+------+------+-------
public | transactional_drop_serial_column2_seq | sequence | postgres (0 rows)
(1 row)
\c - - - :master_port \c - - - :master_port
-- test DROP TABLE(ergo master_drop_all_shards) in transaction, then COMMIT -- test DROP TABLE(ergo master_drop_all_shards) in transaction, then COMMIT

View File

@ -440,8 +440,7 @@ SELECT master_create_distributed_table('customer_worker_copy_append_seq', 'id',
\c - - - 57637 \c - - - 57637
-- Test copy from the worker node -- Test copy from the worker node
COPY customer_worker_copy_append_seq FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636); COPY customer_worker_copy_append_seq FROM '@abs_srcdir@/data/customer.1.data' with (delimiter '|', master_host 'localhost', master_port 57636);
ERROR: cannot copy to table with serial column from worker ERROR: relation "public.customer_worker_copy_append_seq_seq_seq" does not exist
HINT: Connect to the master node to COPY to tables which use serial column types.
-- Connect back to the master node -- Connect back to the master node
\c - - - 57636 \c - - - 57636
-- Create customer table for the worker copy with constraint and index -- Create customer table for the worker copy with constraint and index

View File

@ -274,7 +274,6 @@ SELECT * FROM data_load_test;
DROP TABLE data_load_test; DROP TABLE data_load_test;
SET citus.shard_replication_factor TO default; SET citus.shard_replication_factor TO default;
SET citus.shard_count to 4; SET citus.shard_count to 4;
CREATE TABLE lineitem_hash_part (like lineitem); CREATE TABLE lineitem_hash_part (like lineitem);
@ -293,3 +292,177 @@ SELECT * FROM master_get_table_ddl_events('unlogged_table');
\c - - - :worker_1_port \c - - - :worker_1_port
SELECT relpersistence FROM pg_class WHERE relname LIKE 'unlogged_table_%'; SELECT relpersistence FROM pg_class WHERE relname LIKE 'unlogged_table_%';
\c - - - :master_port
-- Test rollback of create table
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
ROLLBACK;
-- Table should not exist on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
\c - - - :master_port
-- Insert 3 rows to make sure that copy after shard creation touches the same
-- worker node twice.
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
INSERT INTO rollback_table VALUES(1, 'Name_1');
INSERT INTO rollback_table VALUES(2, 'Name_2');
INSERT INTO rollback_table VALUES(3, 'Name_3');
SELECT create_distributed_table('rollback_table','id');
ROLLBACK;
-- Table should not exist on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
\c - - - :master_port
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
\copy rollback_table from stdin delimiter ','
1, 'name_1'
2, 'name_2'
3, 'name_3'
\.
CREATE INDEX rollback_index ON rollback_table(id);
COMMIT;
-- Check the table is created
SELECT count(*) FROM rollback_table;
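-- (Illustrative, not part of the original test file.) citus.shard_count was
-- set to 4 earlier in this file, so the committed table is expected to end up
-- with 4 shards; the coordinator metadata can confirm that:
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid = 'rollback_table'::regclass;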
DROP TABLE rollback_table;
BEGIN;
CREATE TABLE rollback_table(id int, name varchar(20));
SELECT create_distributed_table('rollback_table','id');
\copy rollback_table from stdin delimiter ','
1, 'name_1'
2, 'name_2'
3, 'name_3'
\.
ROLLBACK;
-- Table should not exist on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid FROM pg_class WHERE relname LIKE 'rollback_table%');
\c - - - :master_port
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
CREATE TABLE tt2(id int);
SELECT create_distributed_table('tt2','id');
INSERT INTO tt1 VALUES(1);
INSERT INTO tt2 SELECT * FROM tt1 WHERE id = 1;
COMMIT;
-- Table should exist on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt1_360430'::regclass;
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt2_360462'::regclass;
\c - - - :master_port
DROP TABLE tt1;
DROP TABLE tt2;
-- It is known that creating a shard with master_create_empty_shard is not
-- transactional, so the shard stays on the worker node after the rollback
BEGIN;
CREATE TABLE append_tt1(id int);
SELECT create_distributed_table('append_tt1','id','append');
SELECT master_create_empty_shard('append_tt1');
ROLLBACK;
-- Table exists on the worker node.
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.append_tt1_360494'::regclass;
\c - - - :master_port
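-- (Illustrative sketch, not part of the original test file.) The shard left
-- behind by the rolled-back master_create_empty_shard call would have to be
-- cleaned up by hand, for example by dropping the leftover shard table
-- directly on the worker; the shard name matches the one checked above.
\c - - - :worker_1_port
DROP TABLE IF EXISTS public.append_tt1_360494;
\c - - - :master_port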
-- There should be no table on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid from pg_class WHERE relname LIKE 'public.tt1%');
\c - - - :master_port
-- Queries executed by the router executor are allowed in the same transaction
-- as create_distributed_table
BEGIN;
CREATE TABLE tt1(id int);
INSERT INTO tt1 VALUES(1);
SELECT create_distributed_table('tt1','id');
INSERT INTO tt1 VALUES(2);
SELECT * FROM tt1 WHERE id = 1;
COMMIT;
-- Placements should be created on the worker
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = 'public.tt1_360495'::regclass;
\c - - - :master_port
DROP TABLE tt1;
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
DROP TABLE tt1;
COMMIT;
-- There should be no table on the worker node
\c - - - :worker_1_port
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid = (SELECT oid from pg_class WHERE relname LIKE 'tt1%');
\c - - - :master_port
-- Tests with create_distributed_table & DDL & DML commands
-- Test should pass since GetPlacementListConnection can provide connections
-- in this order of execution
CREATE TABLE sample_table(id int);
SELECT create_distributed_table('sample_table','id');
BEGIN;
CREATE TABLE stage_table (LIKE sample_table);
\COPY stage_table FROM stdin; -- Note that this operation is a local copy
1
2
3
4
\.
SELECT create_distributed_table('stage_table', 'id');
INSERT INTO sample_table SELECT * FROM stage_table;
DROP TABLE stage_table;
SELECT * FROM sample_table WHERE id = 3;
COMMIT;
-- Show that the rows from stage_table were inserted into sample_table
SELECT count(*) FROM sample_table;
DROP TABLE sample_table;
-- Test the sequence create_distributed_table - copy - create_distributed_table - copy
-- This combination is used by tests written by some ORMs.
BEGIN;
CREATE TABLE tt1(id int);
SELECT create_distributed_table('tt1','id');
\COPY tt1 from stdin;
1
2
3
\.
CREATE TABLE tt2(like tt1);
SELECT create_distributed_table('tt2','id');
\COPY tt2 from stdin;
4
5
6
\.
INSERT INTO tt1 SELECT * FROM tt2;
SELECT * FROM tt1 WHERE id = 3;
SELECT * FROM tt2 WHERE id = 6;
END;
SELECT count(*) FROM tt1;
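-- (Illustrative, not part of the original test file.) tt1 and tt2 were
-- distributed with the same shard count and the same distribution column type,
-- so they are expected to land in the same colocation group:
SELECT logicalrelid, colocationid
FROM pg_dist_partition
WHERE logicalrelid IN ('tt1'::regclass, 'tt2'::regclass)
ORDER BY logicalrelid;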
DROP TABLE tt1;
DROP TABLE tt2;

View File

@ -463,8 +463,6 @@ SELECT create_distributed_table('explain_table', 'id');
ALTER TABLE explain_table ADD COLUMN value int; ALTER TABLE explain_table ADD COLUMN value int;
EXPLAIN (COSTS FALSE) SELECT value FROM explain_table WHERE id = 1;
ROLLBACK; ROLLBACK;
-- test explain with local INSERT ... SELECT -- test explain with local INSERT ... SELECT

View File

@ -128,7 +128,7 @@ SELECT count(*) FROM pg_tables WHERE tablename = 'objects_for_xacts2' and schema
-- the distributed table does not exist on the worker node -- the distributed table does not exist on the worker node
SELECT count(*) FROM pg_tables WHERE tablename = 'objects_for_xacts2' and schemaname = 'citus_mx_schema_for_xacts'; SELECT count(*) FROM pg_tables WHERE tablename = 'objects_for_xacts2' and schemaname = 'citus_mx_schema_for_xacts';
-- but the shard exists since we do not create shards in a transaction -- shard also does not exist since we create shards in a transaction
SELECT count(*) FROM pg_tables WHERE tablename LIKE 'objects_for_xacts2_%' and schemaname = 'citus_mx_schema_for_xacts'; SELECT count(*) FROM pg_tables WHERE tablename LIKE 'objects_for_xacts2_%' and schemaname = 'citus_mx_schema_for_xacts';
-- make sure that master_drop_all_shards does not work from the worker nodes -- make sure that master_drop_all_shards does not work from the worker nodes

View File

@ -80,10 +80,6 @@ ALTER SEQUENCE standalone_sequence OWNED BY testserialtable.group_id;
-- an edge case, but it's OK to change an owner to the same distributed table -- an edge case, but it's OK to change an owner to the same distributed table
ALTER SEQUENCE testserialtable_id_seq OWNED BY testserialtable.id; ALTER SEQUENCE testserialtable_id_seq OWNED BY testserialtable.id;
-- verify sequence was created on worker
\c - - - :worker_1_port
\ds
-- drop distributed table -- drop distributed table
\c - - - :master_port \c - - - :master_port
DROP TABLE testserialtable; DROP TABLE testserialtable;