Merge pull request #1103 from citusdata/mx/reference_table_support

MX Support for Reference Tables
pull/1143/head
Eren Başak 2017-01-18 17:15:12 +02:00 committed by GitHub
commit c0f1a7609f
22 changed files with 1635 additions and 168 deletions

View File

@ -80,7 +80,6 @@ static void CreateHashDistributedTable(Oid relationId, char *distributionColumnN
int shardCount, int replicationFactor); int shardCount, int replicationFactor);
static Oid ColumnType(Oid relationId, char *columnName); static Oid ColumnType(Oid relationId, char *columnName);
/* exports for SQL callable functions */ /* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(master_create_distributed_table); PG_FUNCTION_INFO_V1(master_create_distributed_table);
PG_FUNCTION_INFO_V1(create_distributed_table); PG_FUNCTION_INFO_V1(create_distributed_table);
@ -177,18 +176,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
if (ShouldSyncTableMetadata(relationId)) if (ShouldSyncTableMetadata(relationId))
{ {
List *commandList = GetDistributedTableDDLEvents(relationId); CreateTableMetadataOnWorkers(relationId);
ListCell *commandCell = NULL;
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
/* send the commands one by one */
foreach(commandCell, commandList)
{
char *command = (char *) lfirst(commandCell);
SendCommandToWorkers(WORKERS_WITH_METADATA, command);
}
} }
PG_RETURN_VOID(); PG_RETURN_VOID();
@ -244,6 +232,8 @@ CreateReferenceTable(Oid relationId)
/* now, create the single shard replicated to all nodes */ /* now, create the single shard replicated to all nodes */
CreateReferenceTableShard(relationId); CreateReferenceTableShard(relationId);
CreateTableMetadataOnWorkers(relationId);
} }

View File

@ -400,6 +400,12 @@ CopyToExistingShards(CopyStmt *copyStatement, char *completionTag)
columnValues = palloc0(columnCount * sizeof(Datum)); columnValues = palloc0(columnCount * sizeof(Datum));
columnNulls = palloc0(columnCount * sizeof(bool)); columnNulls = palloc0(columnCount * sizeof(bool));
/* we don't support copy to reference tables from workers */
if (partitionMethod == DISTRIBUTE_BY_NONE)
{
EnsureSchemaNode();
}
/* load the list of shards and verify that we have shards to copy into */ /* load the list of shards and verify that we have shards to copy into */
shardIntervalList = LoadShardIntervalList(tableId); shardIntervalList = LoadShardIntervalList(tableId);
if (shardIntervalList == NIL) if (shardIntervalList == NIL)

View File

@ -307,10 +307,9 @@ GetNextShardId()
/* /*
* master_get_new_placementid allocates and returns a unique placementId for * master_get_new_placementid is a user facing wrapper function around
* the placement to be created. This allocation occurs both in shared memory * GetNextPlacementId() which allocates and returns a unique placement id for the
* and in write ahead logs; writing to logs avoids the risk of having shardId * placement to be created.
* collisions.
* *
* NB: This can be called by any user; for now we have decided that that's * NB: This can be called by any user; for now we have decided that that's
* ok. We might want to restrict this to users part of a specific role or such * ok. We might want to restrict this to users part of a specific role or such
@ -318,25 +317,51 @@ GetNextShardId()
*/ */
Datum Datum
master_get_new_placementid(PG_FUNCTION_ARGS) master_get_new_placementid(PG_FUNCTION_ARGS)
{
uint64 placementId = 0;
Datum placementIdDatum = 0;
EnsureSchemaNode();
placementId = GetNextPlacementId();
placementIdDatum = Int64GetDatum(placementId);
PG_RETURN_DATUM(placementIdDatum);
}
/*
* GetNextPlacementId allocates and returns a unique placementId for
* the placement to be created. This allocation occurs both in shared memory
* and in write ahead logs; writing to logs avoids the risk of having shardId
* collisions.
*
* NB: This can be called by any user; for now we have decided that that's
* ok. We might want to restrict this to users part of a specific role or such
* at some later point.
*/
uint64
GetNextPlacementId(void)
{ {
text *sequenceName = cstring_to_text(PLACEMENTID_SEQUENCE_NAME); text *sequenceName = cstring_to_text(PLACEMENTID_SEQUENCE_NAME);
Oid sequenceId = ResolveRelationId(sequenceName); Oid sequenceId = ResolveRelationId(sequenceName);
Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
Oid savedUserId = InvalidOid; Oid savedUserId = InvalidOid;
int savedSecurityContext = 0; int savedSecurityContext = 0;
Datum shardIdDatum = 0; Datum placementIdDatum = 0;
uint64 placementId = 0;
EnsureSchemaNode();
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
/* generate new and unique shardId from sequence */ /* generate new and unique placement id from sequence */
shardIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); placementIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
SetUserIdAndSecContext(savedUserId, savedSecurityContext); SetUserIdAndSecContext(savedUserId, savedSecurityContext);
PG_RETURN_DATUM(shardIdDatum); placementId = DatumGetInt64(placementIdDatum);
return placementId;
} }

View File

@ -63,11 +63,11 @@ PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
/* /*
* start_metadata_sync_to_node function creates the metadata in a worker for preparing the * start_metadata_sync_to_node function creates the metadata in a worker for preparing the
* worker for accepting MX-table queries. The function first sets the localGroupId of the * worker for accepting queries. The function first sets the localGroupId of the worker
* worker so that the worker knows which tuple in pg_dist_node table represents itself. * so that the worker knows which tuple in pg_dist_node table represents itself. After
* After that, SQL statetemens for re-creating metadata about mx distributed * that, SQL statetemens for re-creating metadata of MX-eligible distributed tables are
* tables are sent to the worker. Finally, the hasmetadata column of the target node in * sent to the worker. Finally, the hasmetadata column of the target node in pg_dist_node
* pg_dist_node is marked as true. * is marked as true.
*/ */
Datum Datum
start_metadata_sync_to_node(PG_FUNCTION_ARGS) start_metadata_sync_to_node(PG_FUNCTION_ARGS)
@ -132,7 +132,7 @@ start_metadata_sync_to_node(PG_FUNCTION_ARGS)
/* /*
* stop_metadata_sync_to_node function sets the hasmetadata column of the specified node * stop_metadata_sync_to_node function sets the hasmetadata column of the specified node
* to false in pg_dist_node table, thus indicating that the specified worker node does not * to false in pg_dist_node table, thus indicating that the specified worker node does not
* receive DDL changes anymore and cannot be used for issuing mx queries. * receive DDL changes anymore and cannot be used for issuing queries.
*/ */
Datum Datum
stop_metadata_sync_to_node(PG_FUNCTION_ARGS) stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
@ -159,19 +159,24 @@ stop_metadata_sync_to_node(PG_FUNCTION_ARGS)
/* /*
* ShouldSyncTableMetadata checks if a distributed table has streaming replication model * ShouldSyncTableMetadata checks if the metadata of a distributed table should be
* and hash distribution. In that case the distributed table is considered an MX table, * propagated to metadata workers, i.e. the table is an MX table or reference table.
* and its metadata is required to exist on the worker nodes. * Tables with streaming replication model (which means RF=1) and hash distribution are
* considered as MX tables while tables with none distribution are reference tables.
*/ */
bool bool
ShouldSyncTableMetadata(Oid relationId) ShouldSyncTableMetadata(Oid relationId)
{ {
DistTableCacheEntry *tableEntry = DistributedTableCacheEntry(relationId); DistTableCacheEntry *tableEntry = DistributedTableCacheEntry(relationId);
bool usesHashDistribution = (tableEntry->partitionMethod == DISTRIBUTE_BY_HASH);
bool usesStreamingReplication = bool hashDistributed = (tableEntry->partitionMethod == DISTRIBUTE_BY_HASH);
bool streamingReplicated =
(tableEntry->replicationModel == REPLICATION_MODEL_STREAMING); (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING);
if (usesStreamingReplication && usesHashDistribution) bool mxTable = (streamingReplicated && hashDistributed);
bool referenceTable = (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE);
if (mxTable || referenceTable)
{ {
return true; return true;
} }
@ -199,7 +204,7 @@ MetadataCreateCommands(void)
{ {
List *metadataSnapshotCommandList = NIL; List *metadataSnapshotCommandList = NIL;
List *distributedTableList = DistributedTableList(); List *distributedTableList = DistributedTableList();
List *mxTableList = NIL; List *propagatedTableList = NIL;
List *workerNodeList = WorkerNodeList(); List *workerNodeList = WorkerNodeList();
ListCell *distributedTableCell = NULL; ListCell *distributedTableCell = NULL;
char *nodeListInsertCommand = NULL; char *nodeListInsertCommand = NULL;
@ -209,19 +214,19 @@ MetadataCreateCommands(void)
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList, metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
nodeListInsertCommand); nodeListInsertCommand);
/* create the list of mx tables */ /* create the list of tables whose metadata will be created */
foreach(distributedTableCell, distributedTableList) foreach(distributedTableCell, distributedTableList)
{ {
DistTableCacheEntry *cacheEntry = DistTableCacheEntry *cacheEntry =
(DistTableCacheEntry *) lfirst(distributedTableCell); (DistTableCacheEntry *) lfirst(distributedTableCell);
if (ShouldSyncTableMetadata(cacheEntry->relationId)) if (ShouldSyncTableMetadata(cacheEntry->relationId))
{ {
mxTableList = lappend(mxTableList, cacheEntry); propagatedTableList = lappend(propagatedTableList, cacheEntry);
} }
} }
/* create the mx tables, but not the metadata */ /* create the tables, but not the metadata */
foreach(distributedTableCell, mxTableList) foreach(distributedTableCell, propagatedTableList)
{ {
DistTableCacheEntry *cacheEntry = DistTableCacheEntry *cacheEntry =
(DistTableCacheEntry *) lfirst(distributedTableCell); (DistTableCacheEntry *) lfirst(distributedTableCell);
@ -240,7 +245,7 @@ MetadataCreateCommands(void)
} }
/* construct the foreign key constraints after all tables are created */ /* construct the foreign key constraints after all tables are created */
foreach(distributedTableCell, mxTableList) foreach(distributedTableCell, propagatedTableList)
{ {
DistTableCacheEntry *cacheEntry = DistTableCacheEntry *cacheEntry =
(DistTableCacheEntry *) lfirst(distributedTableCell); (DistTableCacheEntry *) lfirst(distributedTableCell);
@ -253,7 +258,7 @@ MetadataCreateCommands(void)
} }
/* after all tables are created, create the metadata */ /* after all tables are created, create the metadata */
foreach(distributedTableCell, mxTableList) foreach(distributedTableCell, propagatedTableList)
{ {
DistTableCacheEntry *cacheEntry = DistTableCacheEntry *cacheEntry =
(DistTableCacheEntry *) lfirst(distributedTableCell); (DistTableCacheEntry *) lfirst(distributedTableCell);
@ -323,7 +328,7 @@ GetDistributedTableDDLEvents(Oid relationId)
metadataCommand = DistributionCreateCommand(cacheEntry); metadataCommand = DistributionCreateCommand(cacheEntry);
commandList = lappend(commandList, metadataCommand); commandList = lappend(commandList, metadataCommand);
/* commands to create the truncate trigger of the mx table */ /* commands to create the truncate trigger of the table */
truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId); truncateTriggerCreateCommand = TruncateTriggerCreateCommand(relationId);
commandList = lappend(commandList, truncateTriggerCreateCommand); commandList = lappend(commandList, truncateTriggerCreateCommand);
@ -436,19 +441,30 @@ DistributionCreateCommand(DistTableCacheEntry *cacheEntry)
char *partitionKeyString = cacheEntry->partitionKeyString; char *partitionKeyString = cacheEntry->partitionKeyString;
char *qualifiedRelationName = char *qualifiedRelationName =
generate_qualified_relation_name(relationId); generate_qualified_relation_name(relationId);
char *partitionKeyColumnName = ColumnNameToColumn(relationId, partitionKeyString);
uint32 colocationId = cacheEntry->colocationId; uint32 colocationId = cacheEntry->colocationId;
char replicationModel = cacheEntry->replicationModel; char replicationModel = cacheEntry->replicationModel;
StringInfo tablePartitionKeyString = makeStringInfo();
if (distributionMethod == DISTRIBUTE_BY_NONE)
{
appendStringInfo(tablePartitionKeyString, "NULL");
}
else
{
char *partitionKeyColumnName = ColumnNameToColumn(relationId, partitionKeyString);
appendStringInfo(tablePartitionKeyString, "column_name_to_column(%s,%s)",
quote_literal_cstr(qualifiedRelationName),
quote_literal_cstr(partitionKeyColumnName));
}
appendStringInfo(insertDistributionCommand, appendStringInfo(insertDistributionCommand,
"INSERT INTO pg_dist_partition " "INSERT INTO pg_dist_partition "
"(logicalrelid, partmethod, partkey, colocationid, repmodel) " "(logicalrelid, partmethod, partkey, colocationid, repmodel) "
"VALUES " "VALUES "
"(%s::regclass, '%c', column_name_to_column(%s,%s), %d, '%c')", "(%s::regclass, '%c', %s, %d, '%c')",
quote_literal_cstr(qualifiedRelationName), quote_literal_cstr(qualifiedRelationName),
distributionMethod, distributionMethod,
quote_literal_cstr(qualifiedRelationName), tablePartitionKeyString->data,
quote_literal_cstr(partitionKeyColumnName),
colocationId, colocationId,
replicationModel); replicationModel);
@ -511,7 +527,6 @@ ShardListInsertCommand(List *shardIntervalList)
StringInfo insertShardCommand = makeStringInfo(); StringInfo insertShardCommand = makeStringInfo();
int shardCount = list_length(shardIntervalList); int shardCount = list_length(shardIntervalList);
int processedShardCount = 0; int processedShardCount = 0;
int processedShardPlacementCount = 0;
/* if there are no shards, return empty list */ /* if there are no shards, return empty list */
if (shardCount == 0) if (shardCount == 0)
@ -519,13 +534,6 @@ ShardListInsertCommand(List *shardIntervalList)
return commandList; return commandList;
} }
/* generate the shard placement query without any values yet */
appendStringInfo(insertPlacementCommand,
"INSERT INTO pg_dist_shard_placement "
"(shardid, shardstate, shardlength,"
" nodename, nodeport, placementid) "
"VALUES ");
/* add placements to insertPlacementCommand */ /* add placements to insertPlacementCommand */
foreach(shardIntervalCell, shardIntervalList) foreach(shardIntervalCell, shardIntervalList)
{ {
@ -533,12 +541,25 @@ ShardListInsertCommand(List *shardIntervalList)
uint64 shardId = shardInterval->shardId; uint64 shardId = shardInterval->shardId;
List *shardPlacementList = FinalizedShardPlacementList(shardId); List *shardPlacementList = FinalizedShardPlacementList(shardId);
ShardPlacement *placement = NULL; ListCell *shardPlacementCell = NULL;
/* the function only handles single placement per shard */ foreach(shardPlacementCell, shardPlacementList)
Assert(list_length(shardPlacementList) == 1); {
ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
placement = (ShardPlacement *) linitial(shardPlacementList); if (insertPlacementCommand->len == 0)
{
/* generate the shard placement query without any values yet */
appendStringInfo(insertPlacementCommand,
"INSERT INTO pg_dist_shard_placement "
"(shardid, shardstate, shardlength,"
" nodename, nodeport, placementid) "
"VALUES ");
}
else
{
appendStringInfo(insertPlacementCommand, ",");
}
appendStringInfo(insertPlacementCommand, appendStringInfo(insertPlacementCommand,
"(%lu, 1, %lu, %s, %d, %lu)", "(%lu, 1, %lu, %s, %d, %lu)",
@ -547,11 +568,6 @@ ShardListInsertCommand(List *shardIntervalList)
quote_literal_cstr(placement->nodeName), quote_literal_cstr(placement->nodeName),
placement->nodePort, placement->nodePort,
placement->placementId); placement->placementId);
processedShardPlacementCount++;
if (processedShardPlacementCount != shardCount)
{
appendStringInfo(insertPlacementCommand, ",");
} }
} }
@ -573,17 +589,36 @@ ShardListInsertCommand(List *shardIntervalList)
Oid distributedRelationId = shardInterval->relationId; Oid distributedRelationId = shardInterval->relationId;
char *qualifiedRelationName = generate_qualified_relation_name( char *qualifiedRelationName = generate_qualified_relation_name(
distributedRelationId); distributedRelationId);
StringInfo minHashToken = makeStringInfo();
StringInfo maxHashToken = makeStringInfo();
int minHashToken = DatumGetInt32(shardInterval->minValue); if (shardInterval->minValueExists)
int maxHashToken = DatumGetInt32(shardInterval->maxValue); {
appendStringInfo(minHashToken, "'%d'", DatumGetInt32(
shardInterval->minValue));
}
else
{
appendStringInfo(minHashToken, "NULL");
}
if (shardInterval->maxValueExists)
{
appendStringInfo(maxHashToken, "'%d'", DatumGetInt32(
shardInterval->maxValue));
}
else
{
appendStringInfo(maxHashToken, "NULL");
}
appendStringInfo(insertShardCommand, appendStringInfo(insertShardCommand,
"(%s::regclass, %lu, '%c', '%d', '%d')", "(%s::regclass, %lu, '%c', %s, %s)",
quote_literal_cstr(qualifiedRelationName), quote_literal_cstr(qualifiedRelationName),
shardId, shardId,
shardInterval->storageType, shardInterval->storageType,
minHashToken, minHashToken->data,
maxHashToken); maxHashToken->data);
processedShardCount++; processedShardCount++;
if (processedShardCount != shardCount) if (processedShardCount != shardCount)
@ -635,6 +670,24 @@ ColocationIdUpdateCommand(Oid relationId, uint32 colocationId)
} }
/*
* PlacementUpsertCommand creates a SQL command for upserting a pg_dist_shard_placment
* entry with the given properties. In the case of a conflict on placementId, the command
* updates all properties (excluding the placementId) with the given ones.
*/
char *
PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
uint64 shardLength, char *nodeName, uint32 nodePort)
{
StringInfo command = makeStringInfo();
appendStringInfo(command, UPSERT_PLACEMENT, shardId, shardState, shardLength,
quote_literal_cstr(nodeName), nodePort, placementId);
return command->data;
}
/* /*
* LocalGroupIdUpdateCommand creates the SQL command required to set the local group id * LocalGroupIdUpdateCommand creates the SQL command required to set the local group id
* of a worker and returns the command in a string. * of a worker and returns the command in a string.
@ -899,3 +952,29 @@ HasMetadataWorkers(void)
return false; return false;
} }
/*
* CreateTableMetadataOnWorkers creates the list of commands needed to create the
* given distributed table and sends these commands to all metadata workers i.e. workers
* with hasmetadata=true. Before sending the commands, in order to prevent recursive
* propagation, DDL propagation on workers are disabled with a
* `SET citus.enable_ddl_propagation TO off;` command.
*/
void
CreateTableMetadataOnWorkers(Oid relationId)
{
List *commandList = GetDistributedTableDDLEvents(relationId);
ListCell *commandCell = NULL;
/* prevent recursive propagation */
SendCommandToWorkers(WORKERS_WITH_METADATA, DISABLE_DDL_PROPAGATION);
/* send the commands one by one */
foreach(commandCell, commandList)
{
char *command = (char *) lfirst(commandCell);
SendCommandToWorkers(WORKERS_WITH_METADATA, command);
}
}

View File

@ -364,12 +364,18 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove)
bool hasShardPlacements = false; bool hasShardPlacements = false;
WorkerNode *workerNode = NULL; WorkerNode *workerNode = NULL;
List *referenceTableList = NIL; List *referenceTableList = NIL;
uint32 deletedNodeId = INVALID_PLACEMENT_ID;
EnsureSchemaNode(); EnsureSchemaNode();
EnsureSuperUser(); EnsureSuperUser();
workerNode = FindWorkerNode(nodeName, nodePort); workerNode = FindWorkerNode(nodeName, nodePort);
if (workerNode != NULL)
{
deletedNodeId = workerNode->nodeId;
}
DeleteNodeRow(nodeName, nodePort); DeleteNodeRow(nodeName, nodePort);
DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort); DeleteAllReferenceTablePlacementsFromNode(nodeName, nodePort);
@ -410,7 +416,7 @@ RemoveNodeFromCluster(char *nodeName, int32 nodePort, bool forceRemove)
} }
} }
nodeDeleteCommand = NodeDeleteCommand(workerNode->nodeId); nodeDeleteCommand = NodeDeleteCommand(deletedNodeId);
/* make sure we don't have any lingering session lifespan connections */ /* make sure we don't have any lingering session lifespan connections */
CloseNodeConnectionsAfterTransaction(nodeName, nodePort); CloseNodeConnectionsAfterTransaction(nodeName, nodePort);
@ -728,6 +734,7 @@ DeleteNodeRow(char *nodeName, int32 nodePort)
NULL, scanKeyCount, scanKey); NULL, scanKeyCount, scanKey);
heapTuple = systable_getnext(heapScan); heapTuple = systable_getnext(heapScan);
if (!HeapTupleIsValid(heapTuple)) if (!HeapTupleIsValid(heapTuple))
{ {
ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"", ereport(ERROR, (errmsg("could not find valid entry for node \"%s:%d\"",

View File

@ -20,6 +20,7 @@
#include "distributed/master_protocol.h" #include "distributed/master_protocol.h"
#include "distributed/master_metadata_utility.h" #include "distributed/master_metadata_utility.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/multi_logical_planner.h" #include "distributed/multi_logical_planner.h"
#include "distributed/reference_table_utils.h" #include "distributed/reference_table_utils.h"
#include "distributed/resource_lock.h" #include "distributed/resource_lock.h"
@ -54,6 +55,7 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
List *shardIntervalList = NIL; List *shardIntervalList = NIL;
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
uint64 shardId = INVALID_SHARD_ID; uint64 shardId = INVALID_SHARD_ID;
DistTableCacheEntry *tableEntry = NULL;
EnsureSchemaNode(); EnsureSchemaNode();
@ -67,7 +69,9 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
"create_reference_table('%s');", relationName))); "create_reference_table('%s');", relationName)));
} }
if (PartitionMethod(relationId) == DISTRIBUTE_BY_NONE) tableEntry = DistributedTableCacheEntry(relationId);
if (tableEntry->partitionMethod == DISTRIBUTE_BY_NONE)
{ {
char *relationName = get_rel_name(relationId); char *relationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
@ -76,6 +80,16 @@ upgrade_to_reference_table(PG_FUNCTION_ARGS)
relationName))); relationName)));
} }
if (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING)
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("cannot upgrade to reference table"),
errdetail("Upgrade is only supported for statement-based "
"replicated tables but \"%s\" is streaming replicated",
relationName)));
}
shardIntervalList = LoadShardIntervalList(relationId); shardIntervalList = LoadShardIntervalList(relationId);
if (list_length(shardIntervalList) != 1) if (list_length(shardIntervalList) != 1)
{ {
@ -198,11 +212,18 @@ ReplicateSingleShardTableToAllWorkers(Oid relationId)
ReplicateShardToAllWorkers(shardInterval); ReplicateShardToAllWorkers(shardInterval);
/* /*
* After copying the shards, we need to update metadata tables to mark this table as * We need to update metadata tables to mark this table as reference table. We modify
* reference table. We modify pg_dist_partition, pg_dist_colocation and pg_dist_shard * pg_dist_partition, pg_dist_colocation and pg_dist_shard tables in
* tables in ConvertToReferenceTableMetadata function. * ConvertToReferenceTableMetadata function.
*/ */
ConvertToReferenceTableMetadata(relationId, shardId); ConvertToReferenceTableMetadata(relationId, shardId);
/*
* After the table has been officially marked as a reference table, we need to create
* the reference table itself and insert its pg_dist_partition, pg_dist_shard and
* existing pg_dist_shard_placement rows.
*/
CreateTableMetadataOnWorkers(relationId);
} }
@ -248,18 +269,45 @@ ReplicateShardToAllWorkers(ShardInterval *shardInterval)
nodeName, nodePort, nodeName, nodePort,
missingWorkerOk); missingWorkerOk);
/*
* Although this function is used for reference tables and reference table shard
* placements always have shardState = FILE_FINALIZED, in case of an upgrade of
* a non-reference table to reference table, unhealty placements may exist. In
* this case, we repair the shard placement and update its state in
* pg_dist_shard_placement table.
*/
if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED) if (targetPlacement == NULL || targetPlacement->shardState != FILE_FINALIZED)
{ {
uint64 placementId = 0;
SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner, SendCommandListToWorkerInSingleTransaction(nodeName, nodePort, tableOwner,
ddlCommandList); ddlCommandList);
if (targetPlacement == NULL) if (targetPlacement == NULL)
{ {
InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, FILE_FINALIZED, 0, placementId = GetNextPlacementId();
InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, 0,
nodeName, nodePort); nodeName, nodePort);
} }
else else
{ {
UpdateShardPlacementState(targetPlacement->placementId, FILE_FINALIZED); placementId = targetPlacement->placementId;
UpdateShardPlacementState(placementId, FILE_FINALIZED);
}
/*
* Although ReplicateShardToAllWorkers is used only for reference tables,
* during the upgrade phase, the placements are created before the table is
* marked as a reference table. All metadata (including the placement
* metadata) will be copied to workers after all reference table changed
* are finished.
*/
if (ShouldSyncTableMetadata(shardInterval->relationId))
{
char *placementCommand = PlacementUpsertCommand(shardId, placementId,
FILE_FINALIZED, 0,
nodeName, nodePort);
SendCommandToWorkers(WORKERS_WITH_METADATA, placementCommand);
} }
} }
} }
@ -354,10 +402,17 @@ DeleteAllReferenceTablePlacementsFromNode(char *workerName, uint32 workerPort)
List *shardIntervalList = LoadShardIntervalList(referenceTableId); List *shardIntervalList = LoadShardIntervalList(referenceTableId);
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList); ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
uint64 shardId = shardInterval->shardId; uint64 shardId = shardInterval->shardId;
uint64 placementId = INVALID_PLACEMENT_ID;
StringInfo deletePlacementCommand = makeStringInfo();
LockShardDistributionMetadata(shardId, ExclusiveLock); LockShardDistributionMetadata(shardId, ExclusiveLock);
DeleteShardPlacementRow(shardId, workerName, workerPort); placementId = DeleteShardPlacementRow(shardId, workerName, workerPort);
appendStringInfo(deletePlacementCommand,
"DELETE FROM pg_dist_shard_placement WHERE placementid=%lu",
placementId);
SendCommandToWorkers(WORKERS_WITH_METADATA, deletePlacementCommand->data);
} }
} }

View File

@ -97,6 +97,7 @@ extern bool SchemaNode(void);
/* Function declarations local to the distributed module */ /* Function declarations local to the distributed module */
extern bool CStoreTable(Oid relationId); extern bool CStoreTable(Oid relationId);
extern uint64 GetNextShardId(void); extern uint64 GetNextShardId(void);
extern uint64 GetNextPlacementId(void);
extern Oid ResolveRelationId(text *relationName); extern Oid ResolveRelationId(text *relationName);
extern List * GetTableDDLEvents(Oid relationId); extern List * GetTableDDLEvents(Oid relationId);
extern List * GetTableForeignConstraintCommands(Oid relationId); extern List * GetTableForeignConstraintCommands(Oid relationId);

View File

@ -31,6 +31,9 @@ extern List * ShardListInsertCommand(List *shardIntervalList);
extern char * NodeDeleteCommand(uint32 nodeId); extern char * NodeDeleteCommand(uint32 nodeId);
extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId); extern char * ColocationIdUpdateCommand(Oid relationId, uint32 colocationId);
extern char * CreateSchemaDDLCommand(Oid schemaId); extern char * CreateSchemaDDLCommand(Oid schemaId);
extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
uint64 shardLength, char *nodeName, uint32 nodePort);
extern void CreateTableMetadataOnWorkers(Oid relationId);
#define DELETE_ALL_NODES "TRUNCATE pg_dist_node" #define DELETE_ALL_NODES "TRUNCATE pg_dist_node"
@ -38,6 +41,16 @@ extern char * CreateSchemaDDLCommand(Oid schemaId);
"SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition" "SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition"
#define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'" #define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'"
#define WORKER_APPLY_SEQUENCE_COMMAND "SELECT worker_apply_sequence_command (%s)" #define WORKER_APPLY_SEQUENCE_COMMAND "SELECT worker_apply_sequence_command (%s)"
#define UPSERT_PLACEMENT "INSERT INTO pg_dist_shard_placement " \
"(shardid, shardstate, shardlength, " \
"nodename, nodeport, placementid) " \
"VALUES (%lu, %d, %lu, %s, %d, %lu) " \
"ON CONFLICT (placementid) DO UPDATE SET " \
"shardid = EXCLUDED.shardid, " \
"shardstate = EXCLUDED.shardstate, " \
"shardlength = EXCLUDED.shardlength, " \
"nodename = EXCLUDED.nodename, " \
"nodeport = EXCLUDED.nodeport"
#endif /* METADATA_SYNC_H */ #endif /* METADATA_SYNC_H */

View File

@ -8,6 +8,8 @@ ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1310000;
SELECT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq') AS last_placement_id SELECT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq') AS last_placement_id
\gset \gset
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART 100000; ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART 100000;
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset
-- Create the necessary test utility function -- Create the necessary test utility function
CREATE FUNCTION master_metadata_snapshot() CREATE FUNCTION master_metadata_snapshot()
RETURNS text[] RETURNS text[]
@ -23,8 +25,8 @@ SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s';
--------------+------------+---------+--------------+---------- --------------+------------+---------+--------------+----------
(0 rows) (0 rows)
-- Show that, with no MX tables, metadata snapshot contains only the delete commands and -- Show that, with no MX tables, metadata snapshot contains only the delete commands,
-- pg_dist_node entries -- pg_dist_node entries and reference tables
SELECT unnest(master_metadata_snapshot()); SELECT unnest(master_metadata_snapshot());
unnest unnest
---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
@ -1120,7 +1122,7 @@ SELECT create_distributed_table('mx_table', 'a');
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
master_add_node master_add_node
--------------------------------- ---------------------------------
(3,3,localhost,57638,default,f) (4,4,localhost,57638,default,f)
(1 row) (1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_2_port); SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
@ -1148,19 +1150,19 @@ SELECT * FROM mx_table ORDER BY a;
\c - mx_user - :worker_2_port \c - mx_user - :worker_2_port
SELECT nextval('mx_table_b_seq'); SELECT nextval('mx_table_b_seq');
nextval nextval
----------------- ------------------
844424930131969 1125899906842625
(1 row) (1 row)
INSERT INTO mx_table (a) VALUES (39); INSERT INTO mx_table (a) VALUES (39);
INSERT INTO mx_table (a) VALUES (40); INSERT INTO mx_table (a) VALUES (40);
SELECT * FROM mx_table ORDER BY a; SELECT * FROM mx_table ORDER BY a;
a | b a | b
----+----------------- ----+------------------
37 | 281474976710658 37 | 281474976710658
38 | 281474976710659 38 | 281474976710659
39 | 844424930131970 39 | 1125899906842626
40 | 844424930131971 40 | 1125899906842627
(4 rows) (4 rows)
\c - mx_user - :master_port \c - mx_user - :master_port
@ -1181,12 +1183,161 @@ DROP USER mx_user;
DROP USER mx_user; DROP USER mx_user;
\c - - - :worker_2_port \c - - - :worker_2_port
DROP USER mx_user; DROP USER mx_user;
-- Check that create_reference_table creates the metadata on workers
\c - - - :master_port
CREATE TABLE mx_ref (col_1 int, col_2 text);
SELECT create_reference_table('mx_ref');
create_reference_table
------------------------
(1 row)
\d mx_ref
Table "public.mx_ref"
Column | Type | Modifiers
--------+---------+-----------
col_1 | integer |
col_2 | text |
\c - - - :worker_1_port
\d mx_ref
Table "public.mx_ref"
Column | Type | Modifiers
--------+---------+-----------
col_1 | integer |
col_2 | text |
SELECT
logicalrelid, partmethod, repmodel, shardid, placementid, nodename, nodeport
FROM
pg_dist_partition
NATURAL JOIN pg_dist_shard
NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'mx_ref'::regclass;
logicalrelid | partmethod | repmodel | shardid | placementid | nodename | nodeport
--------------+------------+----------+---------+-------------+-----------+----------
mx_ref | n | t | 1310183 | 100184 | localhost | 57638
mx_ref | n | t | 1310183 | 100183 | localhost | 57637
(2 rows)
SELECT shardid AS ref_table_shardid FROM pg_dist_shard WHERE logicalrelid='mx_ref'::regclass \gset
-- Check that DDL commands are propagated to reference tables on workers
\c - - - :master_port
ALTER TABLE mx_ref ADD COLUMN col_3 NUMERIC DEFAULT 0;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
CREATE INDEX mx_ref_index ON mx_ref(col_1);
\d mx_ref
Table "public.mx_ref"
Column | Type | Modifiers
--------+---------+-----------
col_1 | integer |
col_2 | text |
col_3 | numeric | default 0
Indexes:
"mx_ref_index" btree (col_1)
\c - - - :worker_1_port
\d mx_ref
Table "public.mx_ref"
Column | Type | Modifiers
--------+---------+-----------
col_1 | integer |
col_2 | text |
col_3 | numeric | default 0
Indexes:
"mx_ref_index" btree (col_1)
-- Check that metada is cleaned successfully upon drop table
\c - - - :master_port
DROP TABLE mx_ref;
\d mx_ref
\c - - - :worker_1_port
\d mx_ref
SELECT * FROM pg_dist_shard WHERE shardid=:ref_table_shardid;
logicalrelid | shardid | shardstorage | shardminvalue | shardmaxvalue
--------------+---------+--------------+---------------+---------------
(0 rows)
SELECT * FROM pg_dist_shard_placement WHERE shardid=:ref_table_shardid;
shardid | shardstate | shardlength | nodename | nodeport | placementid
---------+------------+-------------+----------+----------+-------------
(0 rows)
-- Check that master_add_node propagates the metadata about new placements of a reference table
\c - - - :master_port
CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node
--------------------
(1 row)
CREATE TABLE mx_ref (col_1 int, col_2 text);
SELECT create_reference_table('mx_ref');
create_reference_table
------------------------
(1 row)
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
shardid | nodename | nodeport
---------+-----------+----------
1310184 | localhost | 57637
(1 row)
\c - - - :worker_1_port
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
shardid | nodename | nodeport
---------+-----------+----------
1310184 | localhost | 57637
(1 row)
\c - - - :master_port
SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "mx_ref" to all workers
master_add_node
---------------------------------
(5,5,localhost,57638,default,f)
(1 row)
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
shardid | nodename | nodeport
---------+-----------+----------
1310184 | localhost | 57637
1310184 | localhost | 57638
(2 rows)
\c - - - :worker_1_port
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
shardid | nodename | nodeport
---------+-----------+----------
1310184 | localhost | 57637
1310184 | localhost | 57638
(2 rows)
\c - - - :master_port
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;
-- Cleanup -- Cleanup
\c - - - :master_port \c - - - :master_port
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1 NOTICE: drop cascades to constraint mx_fk_constraint_2 on table mx_test_schema_1.mx_table_1
DROP TABLE mx_test_schema_1.mx_table_1 CASCADE; DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table; DROP TABLE mx_testing_schema.mx_test_table;
DROP TABLE mx_ref;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node stop_metadata_sync_to_node
---------------------------- ----------------------------
@ -1202,5 +1353,7 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
RESET citus.shard_count; RESET citus.shard_count;
RESET citus.shard_replication_factor; RESET citus.shard_replication_factor;
RESET citus.multi_shard_commit_protocol; RESET citus.multi_shard_commit_protocol;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id; ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;

View File

@ -1018,6 +1018,9 @@ ORDER BY s.logicalrelid, sp.shardstate;
reference_failure_test | 1 | 2 reference_failure_test | 1 | 2
(1 row) (1 row)
-- connect back to the worker and set rename the test_user back
\c - :default_user - :worker_1_port
ALTER USER test_user_new RENAME TO test_user;
-- connect back to the master with the proper user to continue the tests -- connect back to the master with the proper user to continue the tests
\c - :default_user - :master_port \c - :default_user - :master_port
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second; DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second;

View File

@ -10,6 +10,13 @@ ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1380000;
-- create copy of pg_dist_shard_placement to reload after the test -- create copy of pg_dist_shard_placement to reload after the test
CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
-- make worker 1 receive metadata changes
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
-- remove non-existing node -- remove non-existing node
SELECT master_remove_node('localhost', 55555); SELECT master_remove_node('localhost', 55555);
ERROR: could not find valid entry for node "localhost:55555" ERROR: could not find valid entry for node "localhost:55555"
@ -78,6 +85,27 @@ WHERE colocationid IN
1380000 | 1 | 2 | 0 1380000 | 1 | 2 | 0
(1 row) (1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
\c - - - :master_port
SELECT master_remove_node('localhost', :worker_2_port); SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node master_remove_node
-------------------- --------------------
@ -112,6 +140,25 @@ WHERE colocationid IN
1380000 | 1 | 1 | 0 1380000 | 1 | 1 | 0
(1 row) (1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
\c - - - :master_port
-- remove same node twice -- remove same node twice
SELECT master_remove_node('localhost', :worker_2_port); SELECT master_remove_node('localhost', :worker_2_port);
ERROR: could not find valid entry for node "localhost:57638" ERROR: could not find valid entry for node "localhost:57638"
@ -153,6 +200,27 @@ WHERE colocationid IN
1380000 | 1 | 2 | 0 1380000 | 1 | 2 | 0
(1 row) (1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
\c - - - :master_port
BEGIN; BEGIN;
SELECT master_remove_node('localhost', :worker_2_port); SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node master_remove_node
@ -190,6 +258,26 @@ WHERE colocationid IN
1380000 | 1 | 2 | 0 1380000 | 1 | 2 | 0
(1 row) (1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
\c - - - :master_port
-- remove node in a transaction and COMMIT -- remove node in a transaction and COMMIT
-- status before master_remove_node -- status before master_remove_node
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
@ -220,6 +308,27 @@ WHERE colocationid IN
1380000 | 1 | 2 | 0 1380000 | 1 | 2 | 0
(1 row) (1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
\c - - - :master_port
BEGIN; BEGIN;
SELECT master_remove_node('localhost', :worker_2_port); SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node master_remove_node
@ -256,6 +365,26 @@ WHERE colocationid IN
1380000 | 1 | 1 | 0 1380000 | 1 | 1 | 0
(1 row) (1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
\c - - - :master_port
-- re-add the node for next tests -- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers NOTICE: Replicating reference table "remove_node_reference_table" to all workers
@ -294,6 +423,27 @@ WHERE colocationid IN
1380000 | 1 | 2 | 0 1380000 | 1 | 2 | 0
(1 row) (1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
\c - - - :master_port
BEGIN; BEGIN;
INSERT INTO remove_node_reference_table VALUES(1); INSERT INTO remove_node_reference_table VALUES(1);
SELECT master_remove_node('localhost', :worker_2_port); SELECT master_remove_node('localhost', :worker_2_port);
@ -338,6 +488,32 @@ SELECT * FROM remove_node_reference_table;
1 1
(1 row) (1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
SELECT * FROM remove_node_reference_table;
column1
---------
1
(1 row)
\c - - - :master_port
-- re-add the node for next tests -- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers NOTICE: Replicating reference table "remove_node_reference_table" to all workers
@ -376,6 +552,26 @@ WHERE colocationid IN
1380000 | 1 | 2 | 0 1380000 | 1 | 2 | 0
(1 row) (1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
(1 row)
\c - - - :master_port
BEGIN; BEGIN;
ALTER TABLE remove_node_reference_table ADD column2 int; ALTER TABLE remove_node_reference_table ADD column2 int;
NOTICE: using one-phase commit for distributed DDL commands NOTICE: using one-phase commit for distributed DDL commands
@ -415,6 +611,26 @@ WHERE colocationid IN
1380000 | 1 | 1 | 0 1380000 | 1 | 1 | 0
(1 row) (1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
\c - - - :master_port
-- verify table structure is changed -- verify table structure is changed
\d remove_node_reference_table \d remove_node_reference_table
Table "public.remove_node_reference_table" Table "public.remove_node_reference_table"
@ -485,6 +701,28 @@ WHERE colocationid IN
1380000 | 1 | 2 | 0 1380000 | 1 | 2 | 0
(1 row) (1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
1380001 | 1 | 0 | localhost | 57638
(2 rows)
\c - - - :master_port
SELECT master_remove_node('localhost', :worker_2_port); SELECT master_remove_node('localhost', :worker_2_port);
master_remove_node master_remove_node
-------------------- --------------------
@ -519,6 +757,26 @@ WHERE colocationid IN
1380000 | 1 | 1 | 0 1380000 | 1 | 1 | 0
(1 row) (1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
\c - - - :master_port
-- re-add the node for next tests -- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers NOTICE: Replicating reference table "remove_node_reference_table" to all workers
@ -561,6 +819,28 @@ WHERE colocationid IN
1380000 | 1 | 2 | 0 1380000 | 1 | 2 | 0
(1 row) (1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
1
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1380000 | 1 | 0 | localhost | 57638
1380001 | 1 | 0 | localhost | 57638
(2 rows)
\c - - - :master_port
SELECT master_disable_node('localhost', :worker_2_port); SELECT master_disable_node('localhost', :worker_2_port);
master_disable_node master_disable_node
--------------------- ---------------------
@ -595,6 +875,25 @@ WHERE colocationid IN
1380000 | 1 | 1 | 0 1380000 | 1 | 1 | 0
(1 row) (1 row)
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
count
-------
0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+----------+----------
(0 rows)
\c - - - :master_port
-- re-add the node for next tests -- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
NOTICE: Replicating reference table "remove_node_reference_table" to all workers NOTICE: Replicating reference table "remove_node_reference_table" to all workers
@ -608,6 +907,12 @@ NOTICE: Replicating reference table "table1" to all workers
DROP TABLE remove_node_reference_table; DROP TABLE remove_node_reference_table;
DROP TABLE remove_node_reference_table_schema.table1; DROP TABLE remove_node_reference_table_schema.table1;
DROP SCHEMA remove_node_reference_table_schema CASCADE; DROP SCHEMA remove_node_reference_table_schema CASCADE;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
----------------------------
(1 row)
-- reload pg_dist_shard_placement table -- reload pg_dist_shard_placement table
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement; DROP TABLE tmp_shard_placement;

View File

@ -360,6 +360,8 @@ SELECT create_distributed_table('replicate_reference_table_hash', 'column1');
(1 row) (1 row)
-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='replicate_reference_table_hash'::regclass;
CREATE TABLE replicate_reference_table_reference_two(column1 int); CREATE TABLE replicate_reference_table_reference_two(column1 int);
-- status before master_add_node -- status before master_add_node
SELECT SELECT
@ -388,11 +390,12 @@ SELECT
FROM FROM
pg_dist_partition pg_dist_partition
WHERE WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
ORDER BY logicalrelid;
logicalrelid | partmethod | colocationid | repmodel logicalrelid | partmethod | colocationid | repmodel
-----------------------------------------+------------+--------------+---------- -----------------------------------------+------------+--------------+----------
replicate_reference_table_reference_one | n | 1370004 | t replicate_reference_table_reference_one | n | 1370004 | t
replicate_reference_table_hash | h | 1370005 | s replicate_reference_table_hash | h | 1370005 | c
(2 rows) (2 rows)
BEGIN; BEGIN;
@ -448,7 +451,9 @@ SELECT
FROM FROM
pg_dist_partition pg_dist_partition
WHERE WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
ORDER BY
logicalrelid;
logicalrelid | partmethod | colocationid | repmodel logicalrelid | partmethod | colocationid | repmodel
-----------------------------------------+------------+--------------+---------- -----------------------------------------+------------+--------------+----------
replicate_reference_table_reference_one | n | 1370004 | t replicate_reference_table_reference_one | n | 1370004 | t

View File

@ -46,7 +46,7 @@ SELECT master_create_empty_shard('test_truncate_append');
(1 row) (1 row)
-- verify 3 shards are presents -- verify 3 shards are presents
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass; SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass ORDER BY shardid;
shardid shardid
--------- ---------
1210000 1210000
@ -113,7 +113,7 @@ SELECT count(*) FROM test_truncate_range;
(1 row) (1 row)
-- verify 3 shards are presents -- verify 3 shards are presents
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass; SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass ORDER BY shardid;
shardid shardid
--------- ---------
1210003 1210003
@ -130,7 +130,7 @@ SELECT count(*) FROM test_truncate_range;
(1 row) (1 row)
-- verify 3 shards are still present -- verify 3 shards are still present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass; SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass ORDER BY shardid;
shardid shardid
--------- ---------
1210003 1210003
@ -190,7 +190,7 @@ SELECT count(*) FROM test_truncate_hash;
(1 row) (1 row)
-- verify 4 shards are present -- verify 4 shards are present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass; SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass ORDER BY shardid;
shardid shardid
--------- ---------
(0 rows) (0 rows)
@ -221,7 +221,7 @@ SELECT count(*) FROM test_truncate_hash;
(1 row) (1 row)
-- verify 4 shards are still presents -- verify 4 shards are still presents
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass; SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass ORDER BY shardid;
shardid shardid
--------- ---------
1210006 1210006

View File

@ -28,6 +28,13 @@ SELECT create_distributed_table('mx_table_2', 'col_1');
(1 row) (1 row)
CREATE TABLE mx_ref_table (col_1 int, col_2 text);
SELECT create_reference_table('mx_ref_table');
create_reference_table
------------------------
(1 row)
-- Check that the created tables are colocated MX tables -- Check that the created tables are colocated MX tables
SELECT logicalrelid, repmodel, colocationid SELECT logicalrelid, repmodel, colocationid
FROM pg_dist_partition FROM pg_dist_partition
@ -46,6 +53,9 @@ SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
(1 row) (1 row)
COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv'); COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv');
INSERT INTO mx_ref_table VALUES (-37, 'morbi');
INSERT INTO mx_ref_table VALUES (-78, 'sapien');
INSERT INTO mx_ref_table VALUES (-34, 'augue');
SELECT * FROM mx_table ORDER BY col_1; SELECT * FROM mx_table ORDER BY col_1;
col_1 | col_2 | col_3 col_1 | col_2 | col_3
-------+----------+------- -------+----------+-------
@ -98,6 +108,38 @@ SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass;
5 5
(1 row) (1 row)
-- INSERT/UPDATE/DELETE/COPY on reference tables
SELECT * FROM mx_ref_table ORDER BY col_1;
col_1 | col_2
-------+--------
-78 | sapien
-37 | morbi
-34 | augue
(3 rows)
INSERT INTO mx_ref_table (col_1, col_2) VALUES (-6, 'vestibulum');
ERROR: cannot perform distributed planning for the given modification
DETAIL: Modifications to reference tables are supported only from the schema node.
UPDATE mx_ref_table SET col_2 = 'habitant' WHERE col_1 = -37;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Modifications to reference tables are supported only from the schema node.
DELETE FROM mx_ref_table WHERE col_1 = -78;
ERROR: cannot perform distributed planning for the given modification
DETAIL: Modifications to reference tables are supported only from the schema node.
COPY mx_ref_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv');
ERROR: operation is not allowed on this node
HINT: Connect to the schema node and run it again.
SELECT * FROM mx_ref_table ORDER BY col_1;
col_1 | col_2
-------+--------
-78 | sapien
-37 | morbi
-34 | augue
(3 rows)
\c - - - :master_port
DROP TABLE mx_ref_table;
\c - - - :worker_1_port
-- DDL commands -- DDL commands
\d mx_table \d mx_table
Table "public.mx_table" Table "public.mx_table"

View File

@ -12,6 +12,7 @@ SELECT upgrade_to_reference_table('upgrade_reference_table_local');
ERROR: cannot upgrade to reference table ERROR: cannot upgrade to reference table
DETAIL: Relation "upgrade_reference_table_local" is not distributed. DETAIL: Relation "upgrade_reference_table_local" is not distributed.
HINT: Instead, you can use; create_reference_table('upgrade_reference_table_local'); HINT: Instead, you can use; create_reference_table('upgrade_reference_table_local');
DROP TABLE upgrade_reference_table_local;
-- test with table which has more than one shard -- test with table which has more than one shard
SET citus.shard_count TO 4; SET citus.shard_count TO 4;
CREATE TABLE upgrade_reference_table_multiple_shard(column1 int); CREATE TABLE upgrade_reference_table_multiple_shard(column1 int);
@ -24,6 +25,7 @@ SELECT create_distributed_table('upgrade_reference_table_multiple_shard', 'colum
SELECT upgrade_to_reference_table('upgrade_reference_table_multiple_shard'); SELECT upgrade_to_reference_table('upgrade_reference_table_multiple_shard');
ERROR: cannot upgrade to reference table ERROR: cannot upgrade to reference table
DETAIL: Relation "upgrade_reference_table_multiple_shard" shard count is not one. Only relations with one shard can be upgraded to reference tables. DETAIL: Relation "upgrade_reference_table_multiple_shard" shard count is not one. Only relations with one shard can be upgraded to reference tables.
DROP TABLE upgrade_reference_table_multiple_shard;
-- test with table which has no shard -- test with table which has no shard
CREATE TABLE upgrade_reference_table_no_shard(column1 int); CREATE TABLE upgrade_reference_table_no_shard(column1 int);
SELECT create_distributed_table('upgrade_reference_table_no_shard', 'column1', 'append'); SELECT create_distributed_table('upgrade_reference_table_no_shard', 'column1', 'append');
@ -35,6 +37,7 @@ SELECT create_distributed_table('upgrade_reference_table_no_shard', 'column1', '
SELECT upgrade_to_reference_table('upgrade_reference_table_no_shard'); SELECT upgrade_to_reference_table('upgrade_reference_table_no_shard');
ERROR: cannot upgrade to reference table ERROR: cannot upgrade to reference table
DETAIL: Relation "upgrade_reference_table_no_shard" shard count is not one. Only relations with one shard can be upgraded to reference tables. DETAIL: Relation "upgrade_reference_table_no_shard" shard count is not one. Only relations with one shard can be upgraded to reference tables.
DROP TABLE upgrade_reference_table_no_shard;
-- test with table with foreign keys -- test with table with foreign keys
SET citus.shard_count TO 1; SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
@ -52,12 +55,17 @@ SELECT create_distributed_table('upgrade_reference_table_referencing', 'column1'
(1 row) (1 row)
-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_referenced'::regclass;
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_referencing'::regclass;
SELECT upgrade_to_reference_table('upgrade_reference_table_referenced'); SELECT upgrade_to_reference_table('upgrade_reference_table_referenced');
ERROR: cannot upgrade to reference table ERROR: cannot upgrade to reference table
DETAIL: Relation "upgrade_reference_table_referenced" is part of a foreign constraint. Foreign key constraints are not allowed from or to reference tables. DETAIL: Relation "upgrade_reference_table_referenced" is part of a foreign constraint. Foreign key constraints are not allowed from or to reference tables.
SELECT upgrade_to_reference_table('upgrade_reference_table_referencing'); SELECT upgrade_to_reference_table('upgrade_reference_table_referencing');
ERROR: cannot upgrade to reference table ERROR: cannot upgrade to reference table
DETAIL: Relation "upgrade_reference_table_referencing" is part of a foreign constraint. Foreign key constraints are not allowed from or to reference tables. DETAIL: Relation "upgrade_reference_table_referencing" is part of a foreign constraint. Foreign key constraints are not allowed from or to reference tables.
DROP TABLE upgrade_reference_table_referencing;
DROP TABLE upgrade_reference_table_referenced;
-- test with no healthy placements -- test with no healthy placements
CREATE TABLE upgrade_reference_table_unhealthy(column1 int); CREATE TABLE upgrade_reference_table_unhealthy(column1 int);
SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1'); SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1');
@ -66,9 +74,11 @@ SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1');
(1 row) (1 row)
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_unhealthy'::regclass;
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360006; UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360006;
SELECT upgrade_to_reference_table('upgrade_reference_table_unhealthy'); SELECT upgrade_to_reference_table('upgrade_reference_table_unhealthy');
ERROR: could not find any healthy placement for shard 1360006 ERROR: could not find any healthy placement for shard 1360006
DROP TABLE upgrade_reference_table_unhealthy;
-- test with table containing composite type -- test with table containing composite type
CREATE TYPE upgrade_test_composite_type AS (key1 text, key2 text); CREATE TYPE upgrade_test_composite_type AS (key1 text, key2 text);
\c - - - :worker_1_port \c - - - :worker_1_port
@ -83,9 +93,11 @@ SELECT create_distributed_table('upgrade_reference_table_composite', 'column1');
(1 row) (1 row)
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_composite'::regclass;
SELECT upgrade_to_reference_table('upgrade_reference_table_composite'); SELECT upgrade_to_reference_table('upgrade_reference_table_composite');
ERROR: type "public.upgrade_test_composite_type" does not exist ERROR: type "public.upgrade_test_composite_type" does not exist
CONTEXT: while executing command on localhost:57638 CONTEXT: while executing command on localhost:57638
DROP TABLE upgrade_reference_table_composite;
-- test with reference table -- test with reference table
CREATE TABLE upgrade_reference_table_reference(column1 int); CREATE TABLE upgrade_reference_table_reference(column1 int);
SELECT create_reference_table('upgrade_reference_table_reference'); SELECT create_reference_table('upgrade_reference_table_reference');
@ -97,6 +109,7 @@ SELECT create_reference_table('upgrade_reference_table_reference');
SELECT upgrade_to_reference_table('upgrade_reference_table_reference'); SELECT upgrade_to_reference_table('upgrade_reference_table_reference');
ERROR: cannot upgrade to reference table ERROR: cannot upgrade to reference table
DETAIL: Relation "upgrade_reference_table_reference" is already a reference table DETAIL: Relation "upgrade_reference_table_reference" is already a reference table
DROP TABLE upgrade_reference_table_reference;
-- test valid cases, append distributed table -- test valid cases, append distributed table
CREATE TABLE upgrade_reference_table_append(column1 int); CREATE TABLE upgrade_reference_table_append(column1 int);
SELECT create_distributed_table('upgrade_reference_table_append', 'column1', 'append'); SELECT create_distributed_table('upgrade_reference_table_append', 'column1', 'append');
@ -166,7 +179,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_append'::regclass; logicalrelid = 'upgrade_reference_table_append'::regclass;
partmethod | partkeyisnull | colocationid | repmodel partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+---------- ------------+---------------+--------------+----------
n | t | 1360002 | t n | t | 1360005 | t
(1 row) (1 row)
SELECT SELECT
@ -188,7 +201,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass); WHERE logicalrelid = 'upgrade_reference_table_append'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------
1360002 | 1 | 2 | 0 1360005 | 1 | 2 | 0
(1 row) (1 row)
SELECT SELECT
@ -206,6 +219,8 @@ ORDER BY
1360009 | 1 | 0 | localhost | 57638 1360009 | 1 | 0 | localhost | 57638
(2 rows) (2 rows)
DROP TABLE upgrade_reference_table_append;
-- test valid cases, shard exists at one worker -- test valid cases, shard exists at one worker
CREATE TABLE upgrade_reference_table_one_worker(column1 int); CREATE TABLE upgrade_reference_table_one_worker(column1 int);
SELECT create_distributed_table('upgrade_reference_table_one_worker', 'column1'); SELECT create_distributed_table('upgrade_reference_table_one_worker', 'column1');
@ -214,6 +229,7 @@ SELECT create_distributed_table('upgrade_reference_table_one_worker', 'column1')
(1 row) (1 row)
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_one_worker'::regclass;
-- situation before upgrade_reference_table -- situation before upgrade_reference_table
SELECT SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
@ -223,7 +239,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_one_worker'::regclass; logicalrelid = 'upgrade_reference_table_one_worker'::regclass;
partmethod | partkeyisnull | colocationid | repmodel partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+---------- ------------+---------------+--------------+----------
h | f | 1360001 | s h | f | 1360006 | c
(1 row) (1 row)
SELECT SELECT
@ -245,7 +261,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------
1360001 | 1 | 1 | 23 1360006 | 1 | 1 | 23
(1 row) (1 row)
SELECT SELECT
@ -275,7 +291,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_one_worker'::regclass; logicalrelid = 'upgrade_reference_table_one_worker'::regclass;
partmethod | partkeyisnull | colocationid | repmodel partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+---------- ------------+---------------+--------------+----------
n | t | 1360002 | t n | t | 1360007 | t
(1 row) (1 row)
SELECT SELECT
@ -297,7 +313,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass); WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------
1360002 | 1 | 2 | 0 1360007 | 1 | 2 | 0
(1 row) (1 row)
SELECT SELECT
@ -315,6 +331,8 @@ ORDER BY
1360010 | 1 | 0 | localhost | 57638 1360010 | 1 | 0 | localhost | 57638
(2 rows) (2 rows)
DROP TABLE upgrade_reference_table_one_worker;
-- test valid cases, shard exists at both workers but one is unhealthy -- test valid cases, shard exists at both workers but one is unhealthy
SET citus.shard_replication_factor TO 2; SET citus.shard_replication_factor TO 2;
CREATE TABLE upgrade_reference_table_one_unhealthy(column1 int); CREATE TABLE upgrade_reference_table_one_unhealthy(column1 int);
@ -334,7 +352,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass; logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass;
partmethod | partkeyisnull | colocationid | repmodel partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+---------- ------------+---------------+--------------+----------
h | f | 1360003 | c h | f | 1360008 | c
(1 row) (1 row)
SELECT SELECT
@ -356,7 +374,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------
1360003 | 1 | 2 | 23 1360008 | 1 | 2 | 23
(1 row) (1 row)
SELECT SELECT
@ -389,7 +407,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass; logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass;
partmethod | partkeyisnull | colocationid | repmodel partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+---------- ------------+---------------+--------------+----------
n | t | 1360002 | t n | t | 1360009 | t
(1 row) (1 row)
SELECT SELECT
@ -411,7 +429,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass); WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------
1360002 | 1 | 2 | 0 1360009 | 1 | 2 | 0
(1 row) (1 row)
SELECT SELECT
@ -429,6 +447,8 @@ ORDER BY
1360011 | 1 | 0 | localhost | 57638 1360011 | 1 | 0 | localhost | 57638
(2 rows) (2 rows)
DROP TABLE upgrade_reference_table_one_unhealthy;
-- test valid cases, shard exists at both workers and both are healthy -- test valid cases, shard exists at both workers and both are healthy
CREATE TABLE upgrade_reference_table_both_healthy(column1 int); CREATE TABLE upgrade_reference_table_both_healthy(column1 int);
SELECT create_distributed_table('upgrade_reference_table_both_healthy', 'column1'); SELECT create_distributed_table('upgrade_reference_table_both_healthy', 'column1');
@ -446,7 +466,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_both_healthy'::regclass; logicalrelid = 'upgrade_reference_table_both_healthy'::regclass;
partmethod | partkeyisnull | colocationid | repmodel partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+---------- ------------+---------------+--------------+----------
h | f | 1360004 | c h | f | 1360010 | c
(1 row) (1 row)
SELECT SELECT
@ -468,7 +488,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------
1360004 | 1 | 2 | 23 1360010 | 1 | 2 | 23
(1 row) (1 row)
SELECT SELECT
@ -501,7 +521,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_both_healthy'::regclass; logicalrelid = 'upgrade_reference_table_both_healthy'::regclass;
partmethod | partkeyisnull | colocationid | repmodel partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+---------- ------------+---------------+--------------+----------
n | t | 1360002 | t n | t | 1360011 | t
(1 row) (1 row)
SELECT SELECT
@ -523,7 +543,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass); WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------
1360002 | 1 | 2 | 0 1360011 | 1 | 2 | 0
(1 row) (1 row)
SELECT SELECT
@ -541,6 +561,8 @@ ORDER BY
1360012 | 1 | 0 | localhost | 57638 1360012 | 1 | 0 | localhost | 57638
(2 rows) (2 rows)
DROP TABLE upgrade_reference_table_both_healthy;
-- test valid cases, do it in transaction and ROLLBACK -- test valid cases, do it in transaction and ROLLBACK
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_transaction_rollback(column1 int); CREATE TABLE upgrade_reference_table_transaction_rollback(column1 int);
@ -550,6 +572,7 @@ SELECT create_distributed_table('upgrade_reference_table_transaction_rollback',
(1 row) (1 row)
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_transaction_rollback'::regclass;
-- situation before upgrade_reference_table -- situation before upgrade_reference_table
SELECT SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
@ -559,7 +582,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass;
partmethod | partkeyisnull | colocationid | repmodel partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+---------- ------------+---------------+--------------+----------
h | f | 1360001 | s h | f | 1360012 | c
(1 row) (1 row)
SELECT SELECT
@ -581,7 +604,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------
1360001 | 1 | 1 | 23 1360012 | 1 | 1 | 23
(1 row) (1 row)
SELECT SELECT
@ -613,7 +636,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass;
partmethod | partkeyisnull | colocationid | repmodel partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+---------- ------------+---------------+--------------+----------
h | f | 1360001 | s h | f | 1360012 | c
(1 row) (1 row)
SELECT SELECT
@ -627,6 +650,9 @@ WHERE
1360013 | f | f 1360013 | f | f
(1 row) (1 row)
-- eliminate the duplicate intermediate duplicate rows in pg_dist_colocation
VACUUM ANALYZE pg_dist_colocation;
SELECT * SELECT *
FROM pg_dist_colocation FROM pg_dist_colocation
WHERE colocationid IN WHERE colocationid IN
@ -635,7 +661,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------
1360001 | 1 | 1 | 23 1360012 | 1 | 1 | 23
(1 row) (1 row)
SELECT SELECT
@ -650,6 +676,8 @@ WHERE shardid IN
1360013 | 1 | 0 | localhost | 57637 1360013 | 1 | 0 | localhost | 57637
(1 row) (1 row)
DROP TABLE upgrade_reference_table_transaction_rollback;
-- test valid cases, do it in transaction and COMMIT -- test valid cases, do it in transaction and COMMIT
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_transaction_commit(column1 int); CREATE TABLE upgrade_reference_table_transaction_commit(column1 int);
@ -659,6 +687,7 @@ SELECT create_distributed_table('upgrade_reference_table_transaction_commit', 'c
(1 row) (1 row)
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_transaction_commit'::regclass;
-- situation before upgrade_reference_table -- situation before upgrade_reference_table
SELECT SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
@ -668,7 +697,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass; logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass;
partmethod | partkeyisnull | colocationid | repmodel partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+---------- ------------+---------------+--------------+----------
h | f | 1360001 | s h | f | 1360014 | c
(1 row) (1 row)
SELECT SELECT
@ -690,7 +719,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------
1360001 | 1 | 1 | 23 1360014 | 1 | 1 | 23
(1 row) (1 row)
SELECT SELECT
@ -722,7 +751,7 @@ WHERE
logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass; logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass;
partmethod | partkeyisnull | colocationid | repmodel partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+---------- ------------+---------------+--------------+----------
n | t | 1360002 | t n | t | 1360015 | t
(1 row) (1 row)
SELECT SELECT
@ -744,7 +773,7 @@ WHERE colocationid IN
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass); WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------ --------------+------------+-------------------+------------------------
1360002 | 1 | 2 | 0 1360015 | 1 | 2 | 0
(1 row) (1 row)
SELECT SELECT
@ -771,19 +800,286 @@ Table "public.upgrade_reference_table_transaction_commit_1360014"
column1 | integer | column1 | integer |
\c - - - :master_port \c - - - :master_port
-- drop used tables to clean the workspace
DROP TABLE upgrade_reference_table_local;
DROP TABLE upgrade_reference_table_multiple_shard;
DROP TABLE upgrade_reference_table_no_shard;
DROP TABLE upgrade_reference_table_referencing;
DROP TABLE upgrade_reference_table_referenced;
DROP TABLE upgrade_reference_table_unhealthy;
DROP TABLE upgrade_reference_table_composite;
DROP TYPE upgrade_test_composite_type;
DROP TABLE upgrade_reference_table_reference;
DROP TABLE upgrade_reference_table_append;
DROP TABLE upgrade_reference_table_one_worker;
DROP TABLE upgrade_reference_table_one_unhealthy;
DROP TABLE upgrade_reference_table_both_healthy;
DROP TABLE upgrade_reference_table_transaction_rollback;
DROP TABLE upgrade_reference_table_transaction_commit; DROP TABLE upgrade_reference_table_transaction_commit;
-- create an mx table
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_mx(column1 int);
SELECT create_distributed_table('upgrade_reference_table_mx', 'column1');
create_distributed_table
--------------------------
(1 row)
-- verify that streaming replicated tables cannot be upgraded to reference tables
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 1360016 | s
(1 row)
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
shardid | shardminvalueisnull | shardmaxvalueisnull
---------+---------------------+---------------------
1360015 | f | f
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360016 | 1 | 1 | 23
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360015 | 1 | 0 | localhost | 57637
(1 row)
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
ERROR: cannot upgrade to reference table
DETAIL: Upgrade is only supported for statement-based replicated tables but "upgrade_reference_table_mx" is streaming replicated
-- situation after upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 1360016 | s
(1 row)
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
shardid | shardminvalueisnull | shardmaxvalueisnull
---------+---------------------+---------------------
1360015 | f | f
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360016 | 1 | 1 | 23
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360015 | 1 | 0 | localhost | 57637
(1 row)
DROP TABLE upgrade_reference_table_mx;
-- test valid cases, do it with MX
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 2;
CREATE TABLE upgrade_reference_table_mx(column1 int);
SELECT create_distributed_table('upgrade_reference_table_mx', 'column1');
create_distributed_table
--------------------------
(1 row)
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE nodeport = :worker_2_port AND
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='upgrade_reference_table_mx'::regclass);
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
-----------------------------
(1 row)
-- situation before upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
h | f | 1360017 | c
(1 row)
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
shardid | shardminvalueisnull | shardmaxvalueisnull
---------+---------------------+---------------------
1360016 | f | f
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360017 | 1 | 2 | 23
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360016 | 1 | 0 | localhost | 57637
1360016 | 3 | 0 | localhost | 57638
(2 rows)
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
upgrade_to_reference_table
----------------------------
(1 row)
-- situation after upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
n | t | 1360018 | t
(1 row)
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
shardid | shardminvalueisnull | shardmaxvalueisnull
---------+---------------------+---------------------
1360016 | t | t
(1 row)
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
colocationid | shardcount | replicationfactor | distributioncolumntype
--------------+------------+-------------------+------------------------
1360018 | 1 | 2 | 0
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360016 | 1 | 0 | localhost | 57637
1360016 | 1 | 0 | localhost | 57638
(2 rows)
-- situation on metadata worker
\c - - - :worker_1_port
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
partmethod | partkeyisnull | colocationid | repmodel
------------+---------------+--------------+----------
n | t | 1360018 | t
(1 row)
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
shardid | shardminvalueisnull | shardmaxvalueisnull
---------+---------------------+---------------------
1360016 | t | t
(1 row)
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
shardid | shardstate | shardlength | nodename | nodeport
---------+------------+-------------+-----------+----------
1360016 | 1 | 0 | localhost | 57637
1360016 | 1 | 0 | localhost | 57638
(2 rows)
\c - - - :master_port
DROP TABLE upgrade_reference_table_mx;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
stop_metadata_sync_to_node
----------------------------
(1 row)

View File

@ -13,6 +13,9 @@ SELECT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq') AS last_pla
\gset \gset
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART 100000; ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART 100000;
SELECT nextval('pg_catalog.pg_dist_groupid_seq') AS last_group_id \gset
SELECT nextval('pg_catalog.pg_dist_node_nodeid_seq') AS last_node_id \gset
-- Create the necessary test utility function -- Create the necessary test utility function
CREATE FUNCTION master_metadata_snapshot() CREATE FUNCTION master_metadata_snapshot()
RETURNS text[] RETURNS text[]
@ -25,8 +28,8 @@ COMMENT ON FUNCTION master_metadata_snapshot()
-- Show that none of the existing tables are qualified to be MX tables -- Show that none of the existing tables are qualified to be MX tables
SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s'; SELECT * FROM pg_dist_partition WHERE partmethod='h' AND repmodel='s';
-- Show that, with no MX tables, metadata snapshot contains only the delete commands and -- Show that, with no MX tables, metadata snapshot contains only the delete commands,
-- pg_dist_node entries -- pg_dist_node entries and reference tables
SELECT unnest(master_metadata_snapshot()); SELECT unnest(master_metadata_snapshot());
-- Create a test table with constraints and SERIAL -- Create a test table with constraints and SERIAL
@ -506,11 +509,83 @@ DROP USER mx_user;
\c - - - :worker_2_port \c - - - :worker_2_port
DROP USER mx_user; DROP USER mx_user;
-- Check that create_reference_table creates the metadata on workers
\c - - - :master_port
CREATE TABLE mx_ref (col_1 int, col_2 text);
SELECT create_reference_table('mx_ref');
\d mx_ref
\c - - - :worker_1_port
\d mx_ref
SELECT
logicalrelid, partmethod, repmodel, shardid, placementid, nodename, nodeport
FROM
pg_dist_partition
NATURAL JOIN pg_dist_shard
NATURAL JOIN pg_dist_shard_placement
WHERE
logicalrelid = 'mx_ref'::regclass;
SELECT shardid AS ref_table_shardid FROM pg_dist_shard WHERE logicalrelid='mx_ref'::regclass \gset
-- Check that DDL commands are propagated to reference tables on workers
\c - - - :master_port
ALTER TABLE mx_ref ADD COLUMN col_3 NUMERIC DEFAULT 0;
CREATE INDEX mx_ref_index ON mx_ref(col_1);
\d mx_ref
\c - - - :worker_1_port
\d mx_ref
-- Check that metada is cleaned successfully upon drop table
\c - - - :master_port
DROP TABLE mx_ref;
\d mx_ref
\c - - - :worker_1_port
\d mx_ref
SELECT * FROM pg_dist_shard WHERE shardid=:ref_table_shardid;
SELECT * FROM pg_dist_shard_placement WHERE shardid=:ref_table_shardid;
-- Check that master_add_node propagates the metadata about new placements of a reference table
\c - - - :master_port
CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
SELECT master_remove_node('localhost', :worker_2_port);
CREATE TABLE mx_ref (col_1 int, col_2 text);
SELECT create_reference_table('mx_ref');
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
\c - - - :worker_1_port
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
\c - - - :master_port
SELECT master_add_node('localhost', :worker_2_port);
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
\c - - - :worker_1_port
SELECT shardid, nodename, nodeport
FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
WHERE logicalrelid='mx_ref'::regclass;
\c - - - :master_port
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);
DROP TABLE tmp_shard_placement;
-- Cleanup -- Cleanup
\c - - - :master_port \c - - - :master_port
DROP TABLE mx_test_schema_2.mx_table_2 CASCADE; DROP TABLE mx_test_schema_2.mx_table_2 CASCADE;
DROP TABLE mx_test_schema_1.mx_table_1 CASCADE; DROP TABLE mx_test_schema_1.mx_table_1 CASCADE;
DROP TABLE mx_testing_schema.mx_test_table; DROP TABLE mx_testing_schema.mx_test_table;
DROP TABLE mx_ref;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
@ -518,5 +593,7 @@ RESET citus.shard_count;
RESET citus.shard_replication_factor; RESET citus.shard_replication_factor;
RESET citus.multi_shard_commit_protocol; RESET citus.multi_shard_commit_protocol;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id; ALTER SEQUENCE pg_catalog.pg_dist_shard_placement_placementid_seq RESTART :last_placement_id;

View File

@ -758,6 +758,10 @@ AND s.logicalrelid = 'reference_failure_test'::regclass
GROUP BY s.logicalrelid, sp.shardstate GROUP BY s.logicalrelid, sp.shardstate
ORDER BY s.logicalrelid, sp.shardstate; ORDER BY s.logicalrelid, sp.shardstate;
-- connect back to the worker and set rename the test_user back
\c - :default_user - :worker_1_port
ALTER USER test_user_new RENAME TO test_user;
-- connect back to the master with the proper user to continue the tests -- connect back to the master with the proper user to continue the tests
\c - :default_user - :master_port \c - :default_user - :master_port
DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second; DROP TABLE reference_modifying_xacts, hash_modifying_xacts, hash_modifying_xacts_second;

View File

@ -14,6 +14,8 @@ ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART 1380000;
CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; CREATE TABLE tmp_shard_placement AS SELECT * FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port; DELETE FROM pg_dist_shard_placement WHERE nodeport = :worker_2_port;
-- make worker 1 receive metadata changes
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- remove non-existing node -- remove non-existing node
SELECT master_remove_node('localhost', 55555); SELECT master_remove_node('localhost', 55555);
@ -53,6 +55,19 @@ WHERE colocationid IN
FROM pg_dist_partition FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass); WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
SELECT master_remove_node('localhost', :worker_2_port); SELECT master_remove_node('localhost', :worker_2_port);
-- status after master_remove_node -- status after master_remove_node
@ -72,6 +87,18 @@ WHERE colocationid IN
FROM pg_dist_partition FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass); WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
-- remove same node twice -- remove same node twice
SELECT master_remove_node('localhost', :worker_2_port); SELECT master_remove_node('localhost', :worker_2_port);
@ -98,6 +125,19 @@ WHERE colocationid IN
FROM pg_dist_partition FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass); WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
BEGIN; BEGIN;
SELECT master_remove_node('localhost', :worker_2_port); SELECT master_remove_node('localhost', :worker_2_port);
ROLLBACK; ROLLBACK;
@ -119,6 +159,18 @@ WHERE colocationid IN
FROM pg_dist_partition FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass); WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
-- remove node in a transaction and COMMIT -- remove node in a transaction and COMMIT
@ -139,6 +191,19 @@ WHERE colocationid IN
FROM pg_dist_partition FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass); WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
BEGIN; BEGIN;
SELECT master_remove_node('localhost', :worker_2_port); SELECT master_remove_node('localhost', :worker_2_port);
COMMIT; COMMIT;
@ -160,6 +225,19 @@ WHERE colocationid IN
FROM pg_dist_partition FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass); WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
-- re-add the node for next tests -- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
@ -182,6 +260,19 @@ WHERE colocationid IN
FROM pg_dist_partition FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass); WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
BEGIN; BEGIN;
INSERT INTO remove_node_reference_table VALUES(1); INSERT INTO remove_node_reference_table VALUES(1);
SELECT master_remove_node('localhost', :worker_2_port); SELECT master_remove_node('localhost', :worker_2_port);
@ -207,6 +298,21 @@ WHERE colocationid IN
--verify the data is inserted --verify the data is inserted
SELECT * FROM remove_node_reference_table; SELECT * FROM remove_node_reference_table;
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
SELECT * FROM remove_node_reference_table;
\c - - - :master_port
-- re-add the node for next tests -- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
@ -230,6 +336,19 @@ WHERE colocationid IN
FROM pg_dist_partition FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass); WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
BEGIN; BEGIN;
ALTER TABLE remove_node_reference_table ADD column2 int; ALTER TABLE remove_node_reference_table ADD column2 int;
SELECT master_remove_node('localhost', :worker_2_port); SELECT master_remove_node('localhost', :worker_2_port);
@ -252,6 +371,19 @@ WHERE colocationid IN
FROM pg_dist_partition FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass); WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
-- verify table structure is changed -- verify table structure is changed
\d remove_node_reference_table \d remove_node_reference_table
@ -290,6 +422,19 @@ WHERE colocationid IN
FROM pg_dist_partition FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass); WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
SELECT master_remove_node('localhost', :worker_2_port); SELECT master_remove_node('localhost', :worker_2_port);
-- status after master_remove_node -- status after master_remove_node
@ -309,6 +454,19 @@ WHERE colocationid IN
FROM pg_dist_partition FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass); WHERE logicalrelid = 'remove_node_reference_table_schema.table1'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
-- re-add the node for next tests -- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
@ -334,6 +492,19 @@ WHERE colocationid IN
FROM pg_dist_partition FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass); WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
SELECT master_disable_node('localhost', :worker_2_port); SELECT master_disable_node('localhost', :worker_2_port);
-- status after master_disable_node -- status after master_disable_node
@ -353,6 +524,19 @@ WHERE colocationid IN
FROM pg_dist_partition FROM pg_dist_partition
WHERE logicalrelid = 'remove_node_reference_table'::regclass); WHERE logicalrelid = 'remove_node_reference_table'::regclass);
\c - - - :worker_1_port
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM
pg_dist_shard_placement
WHERE
nodeport = :worker_2_port;
\c - - - :master_port
-- re-add the node for next tests -- re-add the node for next tests
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
@ -362,6 +546,7 @@ DROP TABLE remove_node_reference_table;
DROP TABLE remove_node_reference_table_schema.table1; DROP TABLE remove_node_reference_table_schema.table1;
DROP SCHEMA remove_node_reference_table_schema CASCADE; DROP SCHEMA remove_node_reference_table_schema CASCADE;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
-- reload pg_dist_shard_placement table -- reload pg_dist_shard_placement table
INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement); INSERT INTO pg_dist_shard_placement (SELECT * FROM tmp_shard_placement);

View File

@ -231,6 +231,9 @@ SET citus.shard_replication_factor TO 1;
CREATE TABLE replicate_reference_table_hash(column1 int); CREATE TABLE replicate_reference_table_hash(column1 int);
SELECT create_distributed_table('replicate_reference_table_hash', 'column1'); SELECT create_distributed_table('replicate_reference_table_hash', 'column1');
-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='replicate_reference_table_hash'::regclass;
CREATE TABLE replicate_reference_table_reference_two(column1 int); CREATE TABLE replicate_reference_table_reference_two(column1 int);
-- status before master_add_node -- status before master_add_node
@ -253,7 +256,8 @@ SELECT
FROM FROM
pg_dist_partition pg_dist_partition
WHERE WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
ORDER BY logicalrelid;
BEGIN; BEGIN;
SELECT master_add_node('localhost', :worker_2_port); SELECT master_add_node('localhost', :worker_2_port);
@ -283,7 +287,9 @@ SELECT
FROM FROM
pg_dist_partition pg_dist_partition
WHERE WHERE
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two'); logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
ORDER BY
logicalrelid;
DROP TABLE replicate_reference_table_reference_one; DROP TABLE replicate_reference_table_reference_one;
DROP TABLE replicate_reference_table_hash; DROP TABLE replicate_reference_table_hash;

View File

@ -31,7 +31,7 @@ SELECT master_create_empty_shard('test_truncate_append');
SELECT master_create_empty_shard('test_truncate_append'); SELECT master_create_empty_shard('test_truncate_append');
-- verify 3 shards are presents -- verify 3 shards are presents
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass; SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_append'::regclass ORDER BY shardid;
TRUNCATE TABLE test_truncate_append; TRUNCATE TABLE test_truncate_append;
@ -79,7 +79,7 @@ INSERT INTO test_truncate_range values (100);
SELECT count(*) FROM test_truncate_range; SELECT count(*) FROM test_truncate_range;
-- verify 3 shards are presents -- verify 3 shards are presents
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass; SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass ORDER BY shardid;
TRUNCATE TABLE test_truncate_range; TRUNCATE TABLE test_truncate_range;
@ -87,7 +87,7 @@ TRUNCATE TABLE test_truncate_range;
SELECT count(*) FROM test_truncate_range; SELECT count(*) FROM test_truncate_range;
-- verify 3 shards are still present -- verify 3 shards are still present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass; SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_range'::regclass ORDER BY shardid;
-- verify that truncate can be aborted -- verify that truncate can be aborted
INSERT INTO test_truncate_range VALUES (1); INSERT INTO test_truncate_range VALUES (1);
@ -117,7 +117,7 @@ INSERT INTO test_truncate_hash values (100);
SELECT count(*) FROM test_truncate_hash; SELECT count(*) FROM test_truncate_hash;
-- verify 4 shards are present -- verify 4 shards are present
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass; SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass ORDER BY shardid;
TRUNCATE TABLE test_truncate_hash; TRUNCATE TABLE test_truncate_hash;
@ -136,7 +136,7 @@ TRUNCATE TABLE test_truncate_hash;
SELECT count(*) FROM test_truncate_hash; SELECT count(*) FROM test_truncate_hash;
-- verify 4 shards are still presents -- verify 4 shards are still presents
SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass; SELECT shardid FROM pg_dist_shard where logicalrelid = 'test_truncate_hash'::regclass ORDER BY shardid;
-- verify that truncate can be aborted -- verify that truncate can be aborted
INSERT INTO test_truncate_hash VALUES (1); INSERT INTO test_truncate_hash VALUES (1);

View File

@ -25,6 +25,9 @@ SELECT create_distributed_table('mx_table', 'col_1');
CREATE TABLE mx_table_2 (col_1 int, col_2 text, col_3 BIGSERIAL); CREATE TABLE mx_table_2 (col_1 int, col_2 text, col_3 BIGSERIAL);
SELECT create_distributed_table('mx_table_2', 'col_1'); SELECT create_distributed_table('mx_table_2', 'col_1');
CREATE TABLE mx_ref_table (col_1 int, col_2 text);
SELECT create_reference_table('mx_ref_table');
-- Check that the created tables are colocated MX tables -- Check that the created tables are colocated MX tables
SELECT logicalrelid, repmodel, colocationid SELECT logicalrelid, repmodel, colocationid
FROM pg_dist_partition FROM pg_dist_partition
@ -41,6 +44,10 @@ COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv');
65832, 'amet' 65832, 'amet'
\. \.
INSERT INTO mx_ref_table VALUES (-37, 'morbi');
INSERT INTO mx_ref_table VALUES (-78, 'sapien');
INSERT INTO mx_ref_table VALUES (-34, 'augue');
SELECT * FROM mx_table ORDER BY col_1; SELECT * FROM mx_table ORDER BY col_1;
-- Try commands from metadata worker -- Try commands from metadata worker
@ -73,6 +80,18 @@ INSERT INTO pg_dist_shard SELECT * FROM pg_dist_shard_temp;
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass; SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='mx_table'::regclass;
-- INSERT/UPDATE/DELETE/COPY on reference tables
SELECT * FROM mx_ref_table ORDER BY col_1;
INSERT INTO mx_ref_table (col_1, col_2) VALUES (-6, 'vestibulum');
UPDATE mx_ref_table SET col_2 = 'habitant' WHERE col_1 = -37;
DELETE FROM mx_ref_table WHERE col_1 = -78;
COPY mx_ref_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv');
SELECT * FROM mx_ref_table ORDER BY col_1;
\c - - - :master_port
DROP TABLE mx_ref_table;
\c - - - :worker_1_port
-- DDL commands -- DDL commands
\d mx_table \d mx_table
CREATE INDEX mx_test_index ON mx_table(col_1); CREATE INDEX mx_test_index ON mx_table(col_1);

View File

@ -11,17 +11,20 @@ ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1360000;
-- test with not distributed table -- test with not distributed table
CREATE TABLE upgrade_reference_table_local(column1 int); CREATE TABLE upgrade_reference_table_local(column1 int);
SELECT upgrade_to_reference_table('upgrade_reference_table_local'); SELECT upgrade_to_reference_table('upgrade_reference_table_local');
DROP TABLE upgrade_reference_table_local;
-- test with table which has more than one shard -- test with table which has more than one shard
SET citus.shard_count TO 4; SET citus.shard_count TO 4;
CREATE TABLE upgrade_reference_table_multiple_shard(column1 int); CREATE TABLE upgrade_reference_table_multiple_shard(column1 int);
SELECT create_distributed_table('upgrade_reference_table_multiple_shard', 'column1'); SELECT create_distributed_table('upgrade_reference_table_multiple_shard', 'column1');
SELECT upgrade_to_reference_table('upgrade_reference_table_multiple_shard'); SELECT upgrade_to_reference_table('upgrade_reference_table_multiple_shard');
DROP TABLE upgrade_reference_table_multiple_shard;
-- test with table which has no shard -- test with table which has no shard
CREATE TABLE upgrade_reference_table_no_shard(column1 int); CREATE TABLE upgrade_reference_table_no_shard(column1 int);
SELECT create_distributed_table('upgrade_reference_table_no_shard', 'column1', 'append'); SELECT create_distributed_table('upgrade_reference_table_no_shard', 'column1', 'append');
SELECT upgrade_to_reference_table('upgrade_reference_table_no_shard'); SELECT upgrade_to_reference_table('upgrade_reference_table_no_shard');
DROP TABLE upgrade_reference_table_no_shard;
-- test with table with foreign keys -- test with table with foreign keys
SET citus.shard_count TO 1; SET citus.shard_count TO 1;
@ -32,14 +35,23 @@ SELECT create_distributed_table('upgrade_reference_table_referenced', 'column1')
CREATE TABLE upgrade_reference_table_referencing(column1 int REFERENCES upgrade_reference_table_referenced(column1)); CREATE TABLE upgrade_reference_table_referencing(column1 int REFERENCES upgrade_reference_table_referenced(column1));
SELECT create_distributed_table('upgrade_reference_table_referencing', 'column1'); SELECT create_distributed_table('upgrade_reference_table_referencing', 'column1');
-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_referenced'::regclass;
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_referencing'::regclass;
SELECT upgrade_to_reference_table('upgrade_reference_table_referenced'); SELECT upgrade_to_reference_table('upgrade_reference_table_referenced');
SELECT upgrade_to_reference_table('upgrade_reference_table_referencing'); SELECT upgrade_to_reference_table('upgrade_reference_table_referencing');
DROP TABLE upgrade_reference_table_referencing;
DROP TABLE upgrade_reference_table_referenced;
-- test with no healthy placements -- test with no healthy placements
CREATE TABLE upgrade_reference_table_unhealthy(column1 int); CREATE TABLE upgrade_reference_table_unhealthy(column1 int);
SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1'); SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_unhealthy'::regclass;
UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360006; UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1360006;
SELECT upgrade_to_reference_table('upgrade_reference_table_unhealthy'); SELECT upgrade_to_reference_table('upgrade_reference_table_unhealthy');
DROP TABLE upgrade_reference_table_unhealthy;
-- test with table containing composite type -- test with table containing composite type
CREATE TYPE upgrade_test_composite_type AS (key1 text, key2 text); CREATE TYPE upgrade_test_composite_type AS (key1 text, key2 text);
@ -52,12 +64,15 @@ SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_composite(column1 int, column2 upgrade_test_composite_type); CREATE TABLE upgrade_reference_table_composite(column1 int, column2 upgrade_test_composite_type);
SELECT create_distributed_table('upgrade_reference_table_composite', 'column1'); SELECT create_distributed_table('upgrade_reference_table_composite', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_composite'::regclass;
SELECT upgrade_to_reference_table('upgrade_reference_table_composite'); SELECT upgrade_to_reference_table('upgrade_reference_table_composite');
DROP TABLE upgrade_reference_table_composite;
-- test with reference table -- test with reference table
CREATE TABLE upgrade_reference_table_reference(column1 int); CREATE TABLE upgrade_reference_table_reference(column1 int);
SELECT create_reference_table('upgrade_reference_table_reference'); SELECT create_reference_table('upgrade_reference_table_reference');
SELECT upgrade_to_reference_table('upgrade_reference_table_reference'); SELECT upgrade_to_reference_table('upgrade_reference_table_reference');
DROP TABLE upgrade_reference_table_reference;
-- test valid cases, append distributed table -- test valid cases, append distributed table
CREATE TABLE upgrade_reference_table_append(column1 int); CREATE TABLE upgrade_reference_table_append(column1 int);
@ -134,9 +149,12 @@ WHERE shardid IN
ORDER BY ORDER BY
nodeport; nodeport;
DROP TABLE upgrade_reference_table_append;
-- test valid cases, shard exists at one worker -- test valid cases, shard exists at one worker
CREATE TABLE upgrade_reference_table_one_worker(column1 int); CREATE TABLE upgrade_reference_table_one_worker(column1 int);
SELECT create_distributed_table('upgrade_reference_table_one_worker', 'column1'); SELECT create_distributed_table('upgrade_reference_table_one_worker', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_one_worker'::regclass;
-- situation before upgrade_reference_table -- situation before upgrade_reference_table
SELECT SELECT
@ -202,6 +220,8 @@ WHERE shardid IN
ORDER BY ORDER BY
nodeport; nodeport;
DROP TABLE upgrade_reference_table_one_worker;
-- test valid cases, shard exists at both workers but one is unhealthy -- test valid cases, shard exists at both workers but one is unhealthy
SET citus.shard_replication_factor TO 2; SET citus.shard_replication_factor TO 2;
CREATE TABLE upgrade_reference_table_one_unhealthy(column1 int); CREATE TABLE upgrade_reference_table_one_unhealthy(column1 int);
@ -274,6 +294,8 @@ WHERE shardid IN
ORDER BY ORDER BY
nodeport; nodeport;
DROP TABLE upgrade_reference_table_one_unhealthy;
-- test valid cases, shard exists at both workers and both are healthy -- test valid cases, shard exists at both workers and both are healthy
CREATE TABLE upgrade_reference_table_both_healthy(column1 int); CREATE TABLE upgrade_reference_table_both_healthy(column1 int);
SELECT create_distributed_table('upgrade_reference_table_both_healthy', 'column1'); SELECT create_distributed_table('upgrade_reference_table_both_healthy', 'column1');
@ -344,10 +366,13 @@ WHERE shardid IN
ORDER BY ORDER BY
nodeport; nodeport;
DROP TABLE upgrade_reference_table_both_healthy;
-- test valid cases, do it in transaction and ROLLBACK -- test valid cases, do it in transaction and ROLLBACK
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_transaction_rollback(column1 int); CREATE TABLE upgrade_reference_table_transaction_rollback(column1 int);
SELECT create_distributed_table('upgrade_reference_table_transaction_rollback', 'column1'); SELECT create_distributed_table('upgrade_reference_table_transaction_rollback', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_transaction_rollback'::regclass;
-- situation before upgrade_reference_table -- situation before upgrade_reference_table
SELECT SELECT
@ -396,8 +421,11 @@ SELECT
FROM FROM
pg_dist_shard pg_dist_shard
WHERE WHERE
logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass; logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass;
-- eliminate the duplicate intermediate duplicate rows in pg_dist_colocation
VACUUM ANALYZE pg_dist_colocation;
SELECT * SELECT *
FROM pg_dist_colocation FROM pg_dist_colocation
WHERE colocationid IN WHERE colocationid IN
@ -413,10 +441,13 @@ WHERE shardid IN
FROM pg_dist_shard FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass); WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
DROP TABLE upgrade_reference_table_transaction_rollback;
-- test valid cases, do it in transaction and COMMIT -- test valid cases, do it in transaction and COMMIT
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_transaction_commit(column1 int); CREATE TABLE upgrade_reference_table_transaction_commit(column1 int);
SELECT create_distributed_table('upgrade_reference_table_transaction_commit', 'column1'); SELECT create_distributed_table('upgrade_reference_table_transaction_commit', 'column1');
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_transaction_commit'::regclass;
-- situation before upgrade_reference_table -- situation before upgrade_reference_table
SELECT SELECT
@ -489,20 +520,185 @@ ORDER BY
\d upgrade_reference_table_transaction_commit_* \d upgrade_reference_table_transaction_commit_*
\c - - - :master_port \c - - - :master_port
-- drop used tables to clean the workspace
DROP TABLE upgrade_reference_table_local;
DROP TABLE upgrade_reference_table_multiple_shard;
DROP TABLE upgrade_reference_table_no_shard;
DROP TABLE upgrade_reference_table_referencing;
DROP TABLE upgrade_reference_table_referenced;
DROP TABLE upgrade_reference_table_unhealthy;
DROP TABLE upgrade_reference_table_composite;
DROP TYPE upgrade_test_composite_type;
DROP TABLE upgrade_reference_table_reference;
DROP TABLE upgrade_reference_table_append;
DROP TABLE upgrade_reference_table_one_worker;
DROP TABLE upgrade_reference_table_one_unhealthy;
DROP TABLE upgrade_reference_table_both_healthy;
DROP TABLE upgrade_reference_table_transaction_rollback;
DROP TABLE upgrade_reference_table_transaction_commit; DROP TABLE upgrade_reference_table_transaction_commit;
-- create an mx table
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 1;
CREATE TABLE upgrade_reference_table_mx(column1 int);
SELECT create_distributed_table('upgrade_reference_table_mx', 'column1');
-- verify that streaming replicated tables cannot be upgraded to reference tables
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
-- situation after upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
DROP TABLE upgrade_reference_table_mx;
-- test valid cases, do it with MX
SET citus.shard_count TO 1;
SET citus.shard_replication_factor TO 2;
CREATE TABLE upgrade_reference_table_mx(column1 int);
SELECT create_distributed_table('upgrade_reference_table_mx', 'column1');
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE nodeport = :worker_2_port AND
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='upgrade_reference_table_mx'::regclass);
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
-- situation before upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
-- situation after upgrade_reference_table
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT *
FROM pg_dist_colocation
WHERE colocationid IN
(SELECT colocationid
FROM pg_dist_partition
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
-- situation on metadata worker
\c - - - :worker_1_port
SELECT
partmethod, (partkey IS NULL) as partkeyisnull, colocationid, repmodel
FROM
pg_dist_partition
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
FROM
pg_dist_shard
WHERE
logicalrelid = 'upgrade_reference_table_mx'::regclass;
SELECT
shardid, shardstate, shardlength, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN
(SELECT shardid
FROM pg_dist_shard
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
ORDER BY nodeport;
\c - - - :master_port
DROP TABLE upgrade_reference_table_mx;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);