Not undistribute Citus local table when converting it to a reference table / single-shard table

pull/7131/head
Onur Tirtir 2023-08-18 12:45:12 +03:00
parent 34e3119b48
commit a830862717
42 changed files with 2916 additions and 123 deletions

View File

@ -60,6 +60,7 @@
#include "distributed/reference_table_utils.h"
#include "distributed/relation_access_tracking.h"
#include "distributed/remote_commands.h"
#include "distributed/replicate_none_dist_table_shard.h"
#include "distributed/resource_lock.h"
#include "distributed/shard_cleaner.h"
#include "distributed/shard_rebalancer.h"
@ -139,6 +140,10 @@ static CitusTableParams DecideCitusTableParams(CitusTableType tableType,
distributedTableParams);
static void CreateCitusTable(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams);
static void ConvertCitusLocalTableToTableType(Oid relationId,
CitusTableType tableType,
DistributedTableParams *
distributedTableParams);
static void CreateHashDistributedTableShards(Oid relationId, int shardCount,
Oid colocatedTableId, bool localTableEmpty);
static void CreateSingleShardTableShard(Oid relationId, Oid colocatedTableId,
@ -159,7 +164,7 @@ static void EnsureCitusTableCanBeCreated(Oid relationOid);
static void PropagatePrerequisiteObjectsForDistributedTable(Oid relationId);
static void EnsureDistributedSequencesHaveOneType(Oid relationId,
List *seqInfoList);
static void CopyLocalDataIntoShards(Oid distributedTableId);
static List * TupleDescColumnNameList(TupleDesc tupleDescriptor);
#if (PG_VERSION_NUM >= PG_VERSION_15)
@ -172,10 +177,10 @@ static bool is_valid_numeric_typmod(int32 typmod);
static bool DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc,
Var *distributionColumn);
static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty);
static uint64 DoCopyFromLocalTableIntoShards(Relation distributedRelation,
DestReceiver *copyDest,
TupleTableSlot *slot,
EState *estate);
static void ErrorIfTemporaryTable(Oid relationId);
static void ErrorIfForeignTable(Oid relationOid);
static void SendAddLocalTableToMetadataCommandOutsideTransaction(Oid relationId);
@ -1019,19 +1024,29 @@ CreateDistributedTable(Oid relationId, char *distributionColumnName,
/*
* CreateReferenceTable creates a reference table.
*/
void
CreateReferenceTable(Oid relationId)
{
if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
/*
* Create the shard of given Citus local table on workers to convert
* it into a reference table.
*/
ConvertCitusLocalTableToTableType(relationId, REFERENCE_TABLE, NULL);
}
else
{
CreateCitusTable(relationId, REFERENCE_TABLE, NULL);
}
}
/*
* CreateSingleShardTable creates a single shard distributed table that
* doesn't have a shard key.
*/
void
CreateSingleShardTable(Oid relationId, ColocationParam colocationParam)
@ -1042,7 +1057,21 @@ CreateSingleShardTable(Oid relationId, ColocationParam colocationParam)
.shardCountIsStrict = true,
.distributionColumnName = NULL
};
if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
/*
* Create the shard of given Citus local table on appropriate node
* and drop the local one to convert it into a single-shard distributed
* table.
*/
ConvertCitusLocalTableToTableType(relationId, SINGLE_SHARD_DISTRIBUTED,
&distributedTableParams);
}
else
{
CreateCitusTable(relationId, SINGLE_SHARD_DISTRIBUTED, &distributedTableParams);
}
}
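For context, the user-visible effect of the two branches above is that converting a Citus local table to a reference table or to a single-shard table no longer goes through UndistributeTable. A minimal SQL sketch (table names are illustrative):

CREATE TABLE ref_candidate (a int PRIMARY KEY);
SELECT citus_add_local_table_to_metadata('ref_candidate');
-- converted in place; the existing shard is replicated to the workers
SELECT create_reference_table('ref_candidate');

CREATE TABLE dist_candidate (a int PRIMARY KEY);
SELECT citus_add_local_table_to_metadata('dist_candidate');
-- converted in place; the shard is moved to the appropriate node
SELECT create_distributed_table('dist_candidate', null, colocate_with => 'none');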
@ -1097,7 +1126,7 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
/*
* EnsureTableNotDistributed errors out when relation is a citus table but
* we don't want to ask user to first undistribute their citus local tables
* when creating distributed tables from them.
* For this reason, here we undistribute citus local tables beforehand.
* But since UndistributeTable does not support undistributing relations
* involved in foreign key relationships, we first drop foreign keys that
@ -1107,6 +1136,13 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
List *originalForeignKeyRecreationCommands = NIL;
if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
/*
* We use ConvertCitusLocalTableToTableType instead of CreateCitusTable
* to create a reference table or a single-shard table from a Citus
* local table.
*/
Assert(tableType != REFERENCE_TABLE && tableType != SINGLE_SHARD_DISTRIBUTED);
/* store foreign key creation commands that relation is involved */
originalForeignKeyRecreationCommands =
GetFKeyCreationCommandsRelationInvolvedWithTableType(relationId,
@ -1319,6 +1355,206 @@ CreateCitusTable(Oid relationId, CitusTableType tableType,
}
/*
* ConvertCitusLocalTableToTableType converts given Citus local table to
* given table type.
*
* This only supports converting Citus local tables to reference tables
* (by replicating the shard to workers) and single-shard distributed
* tables (by replicating the shard to the appropriate worker and dropping
* the local one).
*/
static void
ConvertCitusLocalTableToTableType(Oid relationId, CitusTableType tableType,
DistributedTableParams *distributedTableParams)
{
if (!IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
{
ereport(ERROR, (errmsg("table is not a local table added to metadata")));
}
if (tableType != REFERENCE_TABLE && tableType != SINGLE_SHARD_DISTRIBUTED)
{
ereport(ERROR, (errmsg("table type is not supported for conversion")));
}
if ((tableType == SINGLE_SHARD_DISTRIBUTED) != (distributedTableParams != NULL))
{
ereport(ERROR, (errmsg("distributed table params must be provided "
"when creating a distributed table and must "
"not be otherwise")));
}
EnsureCitusTableCanBeCreated(relationId);
Relation relation = try_relation_open(relationId, ExclusiveLock);
if (relation == NULL)
{
ereport(ERROR, (errmsg("could not create Citus table: "
"relation does not exist")));
}
relation_close(relation, NoLock);
if (tableType == SINGLE_SHARD_DISTRIBUTED && ShardReplicationFactor > 1)
{
ereport(ERROR, (errmsg("could not create single shard table: "
"citus.shard_replication_factor is greater than 1"),
errhint("Consider setting citus.shard_replication_factor to 1 "
"and try again")));
}
LockRelationOid(relationId, ExclusiveLock);
Var *distributionColumn = NULL;
CitusTableParams citusTableParams = DecideCitusTableParams(tableType,
distributedTableParams);
uint32 colocationId = INVALID_COLOCATION_ID;
if (distributedTableParams &&
distributedTableParams->colocationParam.colocationParamType ==
COLOCATE_WITH_COLOCATION_ID)
{
colocationId = distributedTableParams->colocationParam.colocationId;
}
else
{
colocationId = ColocationIdForNewTable(relationId, tableType,
distributedTableParams,
distributionColumn);
}
/* check constraints etc. on table based on new distribution params */
EnsureRelationCanBeDistributed(relationId, distributionColumn,
citusTableParams.distributionMethod,
colocationId, citusTableParams.replicationModel);
/*
* Regarding the foreign key relationships that given relation is involved,
* EnsureRelationCanBeDistributed() only checks the ones where the relation
* is the referencing table. And given that the table at hand is a Citus
* local table, right now it may only be referenced by a reference table
* or a Citus local table. However, given that neither of those two cases
* is applicable for a distributed table, here we throw an error if
* that's the case.
*
* Note that we don't need to check the same if we're creating a reference
* table from a Citus local table because all the foreign keys referencing
* Citus local tables are supported by reference tables.
*/
if (tableType == SINGLE_SHARD_DISTRIBUTED)
{
EnsureNoFKeyFromTableType(relationId, INCLUDE_CITUS_LOCAL_TABLES |
INCLUDE_REFERENCE_TABLES);
}
EnsureReferenceTablesExistOnAllNodes();
LockColocationId(colocationId, ShareLock);
/*
* When converting to a single shard table, we want to drop the placement
* on the coordinator, but only if transferring to a different node. In that
* case, shouldDropLocalPlacement is true. When converting to a reference
* table, we always keep the placement on the coordinator, so for reference
* tables shouldDropLocalPlacement is always false.
*/
bool shouldDropLocalPlacement = false;
List *targetNodeList = NIL;
if (tableType == SINGLE_SHARD_DISTRIBUTED)
{
uint32 targetNodeId = SingleShardTableColocationNodeId(colocationId);
if (targetNodeId != CoordinatorNodeIfAddedAsWorkerOrError()->nodeId)
{
bool missingOk = false;
WorkerNode *targetNode = FindNodeWithNodeId(targetNodeId, missingOk);
targetNodeList = list_make1(targetNode);
shouldDropLocalPlacement = true;
}
}
else if (tableType == REFERENCE_TABLE)
{
targetNodeList = ActivePrimaryNonCoordinatorNodeList(ShareLock);
targetNodeList = SortList(targetNodeList, CompareWorkerNodes);
}
bool autoConverted = false;
UpdateNoneDistTableMetadataGlobally(
relationId, citusTableParams.replicationModel,
colocationId, autoConverted);
/* create the shard placement on workers and insert into pg_dist_placement globally */
if (list_length(targetNodeList) > 0)
{
NoneDistTableReplicateCoordinatorPlacement(relationId, targetNodeList);
}
if (shouldDropLocalPlacement)
{
/*
* We don't yet drop the local placement before handling partitions.
* Otherwise, local shard placements of the partitions will be gone
* before we create them on workers.
*
* However, we need to delete the related entry from pg_dist_placement
* before distributing partitions (if any) because we need a sane metadata
* state before doing so.
*/
NoneDistTableDeleteCoordinatorPlacement(relationId);
}
/* if this table is partitioned table, distribute its partitions too */
if (PartitionedTable(relationId))
{
/* right now we don't allow partitioned reference tables */
Assert(tableType == SINGLE_SHARD_DISTRIBUTED);
List *partitionList = PartitionList(relationId);
char *parentRelationName = generate_qualified_relation_name(relationId);
/*
* When there are many partitions, each call to
* ConvertCitusLocalTableToTableType accumulates used memory.
* Create and free citus_per_partition_context for each call.
*/
MemoryContext citusPartitionContext =
AllocSetContextCreate(CurrentMemoryContext,
"citus_per_partition_context",
ALLOCSET_DEFAULT_SIZES);
MemoryContext oldContext = MemoryContextSwitchTo(citusPartitionContext);
Oid partitionRelationId = InvalidOid;
foreach_oid(partitionRelationId, partitionList)
{
MemoryContextReset(citusPartitionContext);
DistributedTableParams childDistributedTableParams = {
.colocationParam = {
.colocationParamType = COLOCATE_WITH_TABLE_LIKE_OPT,
.colocateWithTableName = parentRelationName,
},
.shardCount = distributedTableParams->shardCount,
.shardCountIsStrict = false,
.distributionColumnName = distributedTableParams->distributionColumnName,
};
ConvertCitusLocalTableToTableType(partitionRelationId, tableType,
&childDistributedTableParams);
}
MemoryContextSwitchTo(oldContext);
MemoryContextDelete(citusPartitionContext);
}
if (shouldDropLocalPlacement)
{
NoneDistTableDropCoordinatorPlacementTable(relationId);
}
}
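To see what the conversion did to the placements, one can inspect the metadata; a hedged sketch, reusing the illustrative tables from the earlier example:

SELECT shardid, nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid IN (
    SELECT shardid FROM pg_dist_shard
    WHERE logicalrelid = 'ref_candidate'::regclass
);
-- reference table: one placement on the coordinator plus one per worker;
-- single-shard table: a single placement on the chosen node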
/*
* DecideCitusTableParams decides CitusTableParams based on given CitusTableType
* and DistributedTableParams if it's a distributed table.
@ -2381,9 +2617,37 @@ RegularTable(Oid relationId)
/*
* CopyLocalDataIntoShards is a wrapper around CopyFromLocalTableIntoDistTable
* to copy data from the local table, which is hidden after converting it to a
* distributed table, into the shards of the distributed table.
*
* After copying local data into the distributed table, the local data remains
* in place and should be truncated at a later time.
*/
static void
CopyLocalDataIntoShards(Oid distributedTableId)
{
uint64 rowsCopied = CopyFromLocalTableIntoDistTable(distributedTableId,
distributedTableId);
if (rowsCopied > 0)
{
char *qualifiedRelationName =
generate_qualified_relation_name(distributedTableId);
ereport(NOTICE, (errmsg("copying the data has completed"),
errdetail("The local data in the table is no longer visible, "
"but is still on disk."),
errhint("To remove the local data, run: SELECT "
"truncate_local_data_after_distributing_table($$%s$$)",
qualifiedRelationName)));
}
}
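The hint emitted above can be run verbatim once the user is ready to drop the hidden local data; a sketch with an illustrative qualified table name:

SELECT truncate_local_data_after_distributing_table($$public.dist_tbl$$);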
/*
* CopyFromLocalTableIntoDistTable copies data from given local table into
* the shards of given distributed table.
*
* For partitioned tables, this function returns without copying the data
* because we call this function for both partitioned tables and its partitions.
* Returning early saves us from copying data to workers twice.
*
@ -2393,35 +2657,30 @@ RegularTable(Oid relationId)
* opens a connection and starts a COPY for each shard placement that will have
* data.
*
* We assume that the local table might indeed be a distributed table and the
* caller would want to read the local data from the shell table in that case.
* For this reason, to keep it simple, we perform a heap scan directly on the
* table instead of using SELECT.
*
* Any writes on the table that are started during this operation will be handled
* as distributed queries once the current transaction commits. SELECTs will
* continue to read from the local table until the current transaction commits,
* after which new SELECTs will be handled as distributed queries.
*/
uint64
CopyFromLocalTableIntoDistTable(Oid localTableId, Oid distributedTableId)
{
/* take an ExclusiveLock to block all operations except SELECT */
Relation localRelation = table_open(localTableId, ExclusiveLock);
/*
* Skip copying from partitioned tables, we will copy the data from
* partition to partition's shards.
*/
if (PartitionedTable(distributedTableId))
{
table_close(localRelation, NoLock);
return 0;
}
/*
@ -2435,35 +2694,43 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
*/
PushActiveSnapshot(GetLatestSnapshot());
Relation distributedRelation = RelationIdGetRelation(distributedTableId);
/* get the table columns for distributed table */
TupleDesc destTupleDescriptor = RelationGetDescr(distributedRelation);
List *columnNameList = TupleDescColumnNameList(destTupleDescriptor);
RelationClose(distributedRelation);
int partitionColumnIndex = INVALID_PARTITION_COLUMN_INDEX;
/* determine the partition column in the tuple descriptor */
Var *partitionColumn = PartitionColumn(distributedTableId, 0);
if (partitionColumn != NULL)
{
partitionColumnIndex = partitionColumn->varattno - 1;
}
/* create tuple slot for local relation */
TupleDesc sourceTupleDescriptor = RelationGetDescr(localRelation);
TupleTableSlot *slot = table_slot_create(localRelation, NULL);
/* initialise per-tuple memory context */
EState *estate = CreateExecutorState();
ExprContext *econtext = GetPerTupleExprContext(estate);
econtext->ecxt_scantuple = slot;
const bool nonPublishableData = false;
DestReceiver *copyDest =
(DestReceiver *) CreateCitusCopyDestReceiver(distributedTableId,
columnNameList,
partitionColumnIndex,
estate, NULL, nonPublishableData);
/* initialise state for writing to shards, we'll open connections on demand */
copyDest->rStartup(copyDest, 0, sourceTupleDescriptor);
uint64 rowsCopied = DoCopyFromLocalTableIntoShards(localRelation, copyDest, slot,
estate);
/* finish writing into the shards */
copyDest->rShutdown(copyDest);
@ -2472,24 +2739,28 @@ CopyLocalDataIntoShards(Oid distributedRelationId)
/* free memory and close the relation */
ExecDropSingleTupleTableSlot(slot);
FreeExecutorState(estate);
table_close(localRelation, NoLock);
PopActiveSnapshot();
return rowsCopied;
}
/*
* DoCopyFromLocalTableIntoShards performs a copy operation
* from local tables into shards.
*
* Returns the number of rows copied.
*/
static uint64
DoCopyFromLocalTableIntoShards(Relation localRelation,
DestReceiver *copyDest,
TupleTableSlot *slot,
EState *estate)
{
/* begin reading from local table */
TableScanDesc scan = table_beginscan(localRelation, GetActiveSnapshot(), 0,
NULL);
MemoryContext oldContext = MemoryContextSwitchTo(GetPerTupleMemoryContext(estate));
@ -2524,22 +2795,12 @@ DoCopyFromLocalTableIntoShards(Relation distributedRelation,
ereport(DEBUG1, (errmsg("Copied " UINT64_FORMAT " rows", rowsCopied)));
}
MemoryContextSwitchTo(oldContext);
/* finish reading from the local table */
table_endscan(scan);
return rowsCopied;
}

View File

@ -839,6 +839,22 @@ GetForeignConstraintToReferenceTablesCommands(Oid relationId)
}
/*
* GetForeignConstraintFromOtherReferenceTablesCommands takes in a relationId,
* and returns the list of foreign constraint commands needed to reconstruct
* the foreign key constraints in which the table is the "referenced" one and
* the "referencing" table is a reference table.
*/
List *
GetForeignConstraintFromOtherReferenceTablesCommands(Oid relationId)
{
int flags = INCLUDE_REFERENCED_CONSTRAINTS |
EXCLUDE_SELF_REFERENCES |
INCLUDE_REFERENCE_TABLES;
return GetForeignConstraintCommandsInternal(relationId, flags);
}
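The returned commands are ALTER TABLE statements of the following shape; a hypothetical example in which a reference table references the given relation, with names borrowed from the regression test further down:

ALTER TABLE reference_table_1
    ADD CONSTRAINT ref_1_col_1_fkey_test_int_col_1
    FOREIGN KEY (col_1) REFERENCES fkey_test(int_col_1);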
/*
* GetForeignConstraintToDistributedTablesCommands takes in a relationId, and
* returns the list of foreign constraint commands needed to reconstruct

View File

@ -103,8 +103,6 @@ static List * GetRelationIdListFromRangeVarList(List *rangeVarList, LOCKMODE loc
static bool AlterTableCommandTypeIsTrigger(AlterTableType alterTableType);
static bool AlterTableDropsForeignKey(AlterTableStmt *alterTableStatement);
static void ErrorIfUnsupportedAlterTableStmt(AlterTableStmt *alterTableStatement);
static bool AlterInvolvesPartitionColumn(AlterTableStmt *alterTableStatement,
AlterTableCmd *command);
static bool AlterColumnInvolvesIdentityColumn(AlterTableStmt *alterTableStatement,
@ -3858,7 +3856,7 @@ SetupExecutionModeForAlterTable(Oid relationId, AlterTableCmd *command)
* applied. rightRelationId is the relation id of either index or distributed table which
* given command refers to.
*/
List *
InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
const char *commandString)
{

View File

@ -150,6 +150,7 @@ static char * RemoteSchemaIdExpressionById(Oid schemaId);
static char * RemoteSchemaIdExpressionByName(char *schemaName);
static char * RemoteTypeIdExpression(Oid typeId);
static char * RemoteCollationIdExpression(Oid colocationId);
static char * RemoteTableIdExpression(Oid relationId);
PG_FUNCTION_INFO_V1(start_metadata_sync_to_all_nodes);
@ -167,6 +168,7 @@ PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_partition_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata_legacy);
PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata);
@ -176,6 +178,7 @@ PG_FUNCTION_INFO_V1(citus_internal_add_colocation_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_delete_tenant_schema);
PG_FUNCTION_INFO_V1(citus_internal_update_none_dist_table_metadata);
static bool got_SIGTERM = false;
@ -3449,6 +3452,28 @@ citus_internal_add_placement_metadata(PG_FUNCTION_ARGS)
}
/*
* citus_internal_delete_placement_metadata is an internal UDF to
* delete a row from pg_dist_placement.
*/
Datum
citus_internal_delete_placement_metadata(PG_FUNCTION_ARGS)
{
PG_ENSURE_ARGNOTNULL(0, "placement_id");
int64 placementId = PG_GETARG_INT64(0);
if (!ShouldSkipMetadataChecks())
{
/* this UDF is not allowed to be executed as a separate command */
EnsureCoordinatorInitiatedOperation();
}
DeleteShardPlacementRow(placementId);
PG_RETURN_VOID();
}
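As the regression test further down exercises, calling the UDF outside a coordinator-initiated operation fails unless the user is allow-listed via citus.enable_manual_metadata_changes_for_user; a sketch with an illustrative placement id:

SELECT pg_catalog.citus_internal_delete_placement_metadata(8510000);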
/*
* citus_internal_add_placement_metadata_legacy is the old function that will be dropped.
*/
@ -3836,6 +3861,40 @@ citus_internal_delete_tenant_schema(PG_FUNCTION_ARGS)
}
/*
* citus_internal_update_none_dist_table_metadata is an internal UDF to
* update a row in pg_dist_partition that belongs to given none-distributed
* table.
*/
Datum
citus_internal_update_none_dist_table_metadata(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
PG_ENSURE_ARGNOTNULL(0, "relation_id");
Oid relationId = PG_GETARG_OID(0);
PG_ENSURE_ARGNOTNULL(1, "replication_model");
char replicationModel = PG_GETARG_CHAR(1);
PG_ENSURE_ARGNOTNULL(2, "colocation_id");
uint32 colocationId = PG_GETARG_INT32(2);
PG_ENSURE_ARGNOTNULL(3, "auto_converted");
bool autoConverted = PG_GETARG_BOOL(3);
if (!ShouldSkipMetadataChecks())
{
EnsureCoordinatorInitiatedOperation();
}
UpdateNoneDistTableMetadata(relationId, replicationModel,
colocationId, autoConverted);
PG_RETURN_VOID();
}
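A sketch of a direct call, mirroring the regression test below (the relation, replication model, and colocation id are illustrative):

SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(
    'udf_test'::regclass, 'k', 99999, true);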
/*
* SyncNewColocationGroup synchronizes a new pg_dist_colocation entry to a worker.
*/
@ -4017,6 +4076,55 @@ TenantSchemaDeleteCommand(char *schemaName)
}
/*
* UpdateNoneDistTableMetadataCommand returns a command to call
* citus_internal_update_none_dist_table_metadata().
*/
char *
UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel,
uint32 colocationId, bool autoConverted)
{
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(%s, '%c', %u, %s)",
RemoteTableIdExpression(relationId), replicationModel, colocationId,
autoConverted ? "true" : "false");
return command->data;
}
/*
* AddPlacementMetadataCommand returns a command to call
* citus_internal_add_placement_metadata().
*/
char *
AddPlacementMetadataCommand(uint64 shardId, uint64 placementId,
uint64 shardLength, int32 groupId)
{
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT citus_internal_add_placement_metadata(%ld, %ld, %d, %ld)",
shardId, shardLength, groupId, placementId);
return command->data;
}
/*
* DeletePlacementMetadataCommand returns a command to call
* citus_internal_delete_placement_metadata().
*/
char *
DeletePlacementMetadataCommand(uint64 placementId)
{
StringInfo command = makeStringInfo();
appendStringInfo(command,
"SELECT pg_catalog.citus_internal_delete_placement_metadata(%ld)",
placementId);
return command->data;
}
/*
* RemoteSchemaIdExpressionById returns an expression in text form that
* can be used to obtain the OID of the schema with given schema id on a
@ -4051,6 +4159,22 @@ RemoteSchemaIdExpressionByName(char *schemaName)
}
/*
* RemoteTableIdExpression returns an expression in text form that
* can be used to obtain the OID of given table on a different node
* when included in a query string.
*/
static char *
RemoteTableIdExpression(Oid relationId)
{
StringInfo regclassExpr = makeStringInfo();
appendStringInfo(regclassExpr, "%s::regclass",
quote_literal_cstr(generate_qualified_relation_name(relationId)));
return regclassExpr->data;
}
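Given the format string above, for a hypothetical table public.my_table the expression built here renders as the following snippet when embedded in a remote query:

'public.my_table'::regclass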
/*
* SetMetadataSyncNodesFromNodeList sets list of nodes that needs to be metadata
* synced among given node list into metadataSyncContext.

View File

@ -1398,6 +1398,17 @@ IsActiveShardPlacement(ShardPlacement *shardPlacement)
}
/*
* IsRemoteShardPlacement returns true if the shard placement is on a remote
* node.
*/
bool
IsRemoteShardPlacement(ShardPlacement *shardPlacement)
{
return shardPlacement->groupId != GetLocalGroupId();
}
/*
* IsPlacementOnWorkerNode checks if the shard placement is on the given
* worker node.
@ -1783,6 +1794,24 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,
}
/*
* InsertShardPlacementRowGlobally inserts shard placement that has given
* parameters into pg_dist_placement globally.
*/
ShardPlacement *
InsertShardPlacementRowGlobally(uint64 shardId, uint64 placementId,
uint64 shardLength, int32 groupId)
{
InsertShardPlacementRow(shardId, placementId, shardLength, groupId);
char *insertPlacementCommand =
AddPlacementMetadataCommand(shardId, placementId, shardLength, groupId);
SendCommandToWorkersWithMetadata(insertPlacementCommand);
return LoadShardPlacement(shardId, placementId);
}
/*
* InsertShardPlacementRow opens the shard placement system catalog, and inserts
* a new row with the given values into that system catalog. If placementId is
@ -1999,6 +2028,21 @@ DeleteShardRow(uint64 shardId)
}
/*
* DeleteShardPlacementRowGlobally deletes shard placement that has given
* parameters from pg_dist_placement globally.
*/
void
DeleteShardPlacementRowGlobally(uint64 placementId)
{
DeleteShardPlacementRow(placementId);
char *deletePlacementCommand =
DeletePlacementMetadataCommand(placementId);
SendCommandToWorkersWithMetadata(deletePlacementCommand);
}
/*
* DeleteShardPlacementRow opens the shard placement system catalog, finds the placement
* with the given placementId, and deletes it.
@ -2243,6 +2287,93 @@ UpdateDistributionColumn(Oid relationId, char distributionMethod, Var *distribut
}
/*
* UpdateNoneDistTableMetadataGlobally globally updates pg_dist_partition for
* given none-distributed table.
*/
void
UpdateNoneDistTableMetadataGlobally(Oid relationId, char replicationModel,
uint32 colocationId, bool autoConverted)
{
UpdateNoneDistTableMetadata(relationId, replicationModel,
colocationId, autoConverted);
if (ShouldSyncTableMetadata(relationId))
{
char *metadataCommand =
UpdateNoneDistTableMetadataCommand(relationId,
replicationModel,
colocationId,
autoConverted);
SendCommandToWorkersWithMetadata(metadataCommand);
}
}
/*
* UpdateNoneDistTableMetadata locally updates pg_dist_partition for given
* none-distributed table.
*/
void
UpdateNoneDistTableMetadata(Oid relationId, char replicationModel, uint32 colocationId,
bool autoConverted)
{
if (HasDistributionKey(relationId))
{
ereport(ERROR, (errmsg("cannot update metadata for a distributed "
"table that has a distribution column")));
}
ScanKeyData scanKey[1];
int scanKeyCount = 1;
bool indexOK = true;
Datum values[Natts_pg_dist_partition];
bool isnull[Natts_pg_dist_partition];
bool replace[Natts_pg_dist_partition];
Relation pgDistPartition = table_open(DistPartitionRelationId(), RowExclusiveLock);
TupleDesc tupleDescriptor = RelationGetDescr(pgDistPartition);
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_logicalrelid,
BTEqualStrategyNumber, F_OIDEQ, ObjectIdGetDatum(relationId));
SysScanDesc scanDescriptor = systable_beginscan(pgDistPartition,
DistPartitionLogicalRelidIndexId(),
indexOK,
NULL, scanKeyCount, scanKey);
HeapTuple heapTuple = systable_getnext(scanDescriptor);
if (!HeapTupleIsValid(heapTuple))
{
ereport(ERROR, (errmsg("could not find valid entry for Citus table with oid: %u",
relationId)));
}
memset(replace, 0, sizeof(replace));
values[Anum_pg_dist_partition_colocationid - 1] = UInt32GetDatum(colocationId);
isnull[Anum_pg_dist_partition_colocationid - 1] = false;
replace[Anum_pg_dist_partition_colocationid - 1] = true;
values[Anum_pg_dist_partition_repmodel - 1] = CharGetDatum(replicationModel);
isnull[Anum_pg_dist_partition_repmodel - 1] = false;
replace[Anum_pg_dist_partition_repmodel - 1] = true;
values[Anum_pg_dist_partition_autoconverted - 1] = BoolGetDatum(autoConverted);
isnull[Anum_pg_dist_partition_autoconverted - 1] = false;
replace[Anum_pg_dist_partition_autoconverted - 1] = true;
heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace);
CatalogTupleUpdate(pgDistPartition, &heapTuple->t_self, heapTuple);
CitusInvalidateRelcacheByRelid(relationId);
CommandCounterIncrement();
systable_endscan(scanDescriptor);
table_close(pgDistPartition, NoLock);
}
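Conceptually this matches the following catalog update, shown only as a hedged SQL sketch (the C path above is the supported one and additionally invalidates the relcache entry for the table; names and values are illustrative):

UPDATE pg_dist_partition
SET repmodel = 's', colocationid = 1234, autoconverted = false
WHERE logicalrelid = 'my_table'::regclass;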
/*
* Check that the current user has `mode` permissions on relationId, error out
* if not. Superusers always have such permissions.

View File

@ -397,13 +397,8 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio
List *workerNodeList = DistributedTablePlacementNodeList(RowShareLock);
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
int roundRobinNodeIdx =
EmptySingleShardTableColocationDecideNodeId(colocationId);
char shardStorageType = ShardStorageType(relationId);
text *minHashTokenText = NULL;
@ -412,9 +407,6 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio
InsertShardRow(relationId, shardId, shardStorageType,
minHashTokenText, maxHashTokenText);
int replicationFactor = 1;
List *insertedShardPlacements = InsertShardPlacementRows(
relationId,
@ -433,6 +425,30 @@ CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId, uint32 colocatio
}
/*
* EmptySingleShardTableColocationDecideNodeId returns the index of the node
* that the first shard to be created in the given "single-shard table
* colocation group" should be placed on.
*
* This is determined by taking the colocation id modulo the length of the
* list returned by DistributedTablePlacementNodeList().
*/
int
EmptySingleShardTableColocationDecideNodeId(uint32 colocationId)
{
List *workerNodeList = DistributedTablePlacementNodeList(RowShareLock);
int32 workerNodeCount = list_length(workerNodeList);
if (workerNodeCount == 0)
{
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("couldn't find any worker nodes"),
errhint("Add more worker nodes")));
}
return colocationId % workerNodeCount;
}
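A worked example of the mapping (counts are illustrative): with 3 candidate nodes, colocation id 20005 yields index 20005 % 3 = 1, so the first shard of that colocation group lands on the second node in the sorted node list.

SELECT 20005 % 3 AS round_robin_node_index; -- returns 1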
/*
* CheckHashPartitionedTable looks up the partition information for the given
* tableId and checks if the table is hash partitioned. If not, the function

View File

@ -0,0 +1,301 @@
/*-------------------------------------------------------------------------
*
* replicate_none_dist_table_shard.c
* Routines to replicate shard of none-distributed table to
* a remote node.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "nodes/pg_list.h"
#include "distributed/adaptive_executor.h"
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/deparse_shard_query.h"
#include "distributed/listutils.h"
#include "distributed/replicate_none_dist_table_shard.h"
#include "distributed/shard_utils.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
static void CreateForeignKeysFromReferenceTablesOnShards(Oid noneDistTableId);
static Oid ForeignConstraintGetReferencingTableId(const char *queryString);
static void EnsureNoneDistTableWithCoordinatorPlacement(Oid noneDistTableId);
static void SetLocalEnableManualChangesToShard(bool state);
/*
* NoneDistTableReplicateCoordinatorPlacement replicates local (presumably
* coordinator) shard placement of given none-distributed table to given
* target nodes and inserts records for new placements into pg_dist_placement.
*/
void
NoneDistTableReplicateCoordinatorPlacement(Oid noneDistTableId,
List *targetNodeList)
{
EnsureCoordinator();
EnsureNoneDistTableWithCoordinatorPlacement(noneDistTableId);
/*
* We don't expect callers to try to replicate the shard to remote nodes
* if some of the remote nodes have a placement for the shard already.
*/
int64 shardId = GetFirstShardId(noneDistTableId);
List *remoteShardPlacementList =
FilterShardPlacementList(ActiveShardPlacementList(shardId),
IsRemoteShardPlacement);
if (list_length(remoteShardPlacementList) > 0)
{
ereport(ERROR, (errmsg("table already has a remote shard placement")));
}
uint64 shardLength = ShardLength(shardId);
/* insert new placements to pg_dist_placement */
List *insertedPlacementList = NIL;
WorkerNode *targetNode = NULL;
foreach_ptr(targetNode, targetNodeList)
{
ShardPlacement *shardPlacement =
InsertShardPlacementRowGlobally(shardId, GetNextPlacementId(),
shardLength, targetNode->groupId);
/* and save the placement for shard creation on workers */
insertedPlacementList = lappend(insertedPlacementList, shardPlacement);
}
/* create new placements */
bool useExclusiveConnection = false;
CreateShardsOnWorkers(noneDistTableId, insertedPlacementList,
useExclusiveConnection);
/* fetch coordinator placement before deleting it */
Oid localPlacementTableId = GetTableLocalShardOid(noneDistTableId, shardId);
ShardPlacement *coordinatorPlacement =
linitial(ActiveShardPlacementListOnGroup(shardId, COORDINATOR_GROUP_ID));
/*
* CreateForeignKeysFromReferenceTablesOnShards and CopyFromLocalTableIntoDistTable
* need to ignore the local placement, hence we temporarily delete it before
* calling them.
*/
DeleteShardPlacementRowGlobally(coordinatorPlacement->placementId);
/* and copy data from local placement to new placements */
CopyFromLocalTableIntoDistTable(
localPlacementTableId, noneDistTableId
);
/*
* CreateShardsOnWorkers only creates the foreign keys where given relation
* is the referencing one, so we need to create the foreign keys where given
* relation is the referenced one as well. We're only interested in the cases
* where the referencing relation is a reference table because the other
* possible table types --i.e., Citus local tables atm-- cannot have placements
* on remote nodes.
*
* Note that we need to create the foreign keys where given relation is the
* referenced one after copying the data so that constraint checks can pass.
*/
CreateForeignKeysFromReferenceTablesOnShards(noneDistTableId);
/* using the same placement id, re-insert the deleted placement */
InsertShardPlacementRowGlobally(shardId, coordinatorPlacement->placementId,
shardLength, COORDINATOR_GROUP_ID);
}
/*
* NoneDistTableDeleteCoordinatorPlacement deletes pg_dist_placement record for
* local (presumably coordinator) shard placement of given none-distributed table.
*/
void
NoneDistTableDeleteCoordinatorPlacement(Oid noneDistTableId)
{
EnsureCoordinator();
EnsureNoneDistTableWithCoordinatorPlacement(noneDistTableId);
int64 shardId = GetFirstShardId(noneDistTableId);
/* we've already verified that table has a coordinator placement */
ShardPlacement *coordinatorPlacement =
linitial(ActiveShardPlacementListOnGroup(shardId, COORDINATOR_GROUP_ID));
/* remove the old placement from metadata of local node, i.e., coordinator */
DeleteShardPlacementRowGlobally(coordinatorPlacement->placementId);
}
/*
* NoneDistTableDropCoordinatorPlacementTable drops local (presumably coordinator)
* shard placement table of given none-distributed table.
*/
void
NoneDistTableDropCoordinatorPlacementTable(Oid noneDistTableId)
{
EnsureCoordinator();
if (HasDistributionKey(noneDistTableId))
{
ereport(ERROR, (errmsg("table is not a none-distributed table")));
}
/*
* We undistribute Citus local tables that are not chained with any reference
* tables via foreign keys at the end of the utility hook.
* Here we temporarily set the related GUC to off to disable the logic for
* internally executed DDLs that might invoke this mechanism unnecessarily.
*
* We also temporarily enable the citus.enable_manual_changes_to_shards GUC
* to allow the given command to modify the shard. Note that we enable it only
* for the local session because changes made to shards are allowed for Citus
* internal backends anyway.
*/
int save_nestlevel = NewGUCNestLevel();
SetLocalEnableLocalReferenceForeignKeys(false);
SetLocalEnableManualChangesToShard(true);
StringInfo dropShardCommand = makeStringInfo();
int64 shardId = GetFirstShardId(noneDistTableId);
ShardInterval *shardInterval = LoadShardInterval(shardId);
appendStringInfo(dropShardCommand, DROP_REGULAR_TABLE_COMMAND,
ConstructQualifiedShardName(shardInterval));
Task *task = CitusMakeNode(Task);
task->jobId = INVALID_JOB_ID;
task->taskId = INVALID_TASK_ID;
task->taskType = DDL_TASK;
task->replicationModel = REPLICATION_MODEL_INVALID;
SetTaskQueryString(task, dropShardCommand->data);
ShardPlacement *targetPlacement = CitusMakeNode(ShardPlacement);
SetPlacementNodeMetadata(targetPlacement, CoordinatorNodeIfAddedAsWorkerOrError());
task->taskPlacementList = list_make1(targetPlacement);
bool localExecutionSupported = true;
ExecuteUtilityTaskList(list_make1(task), localExecutionSupported);
AtEOXact_GUC(true, save_nestlevel);
}
/*
* CreateForeignKeysFromReferenceTablesOnShards creates foreign keys on shards
* where given none-distributed table is the referenced table and the referencing
* one is a reference table.
*/
static void
CreateForeignKeysFromReferenceTablesOnShards(Oid noneDistTableId)
{
EnsureCoordinator();
if (HasDistributionKey(noneDistTableId))
{
ereport(ERROR, (errmsg("table is not a none-distributed table")));
}
List *ddlCommandList =
GetForeignConstraintFromOtherReferenceTablesCommands(noneDistTableId);
if (list_length(ddlCommandList) == 0)
{
return;
}
List *taskList = NIL;
char *command = NULL;
foreach_ptr(command, ddlCommandList)
{
List *commandTaskList = InterShardDDLTaskList(
ForeignConstraintGetReferencingTableId(command),
noneDistTableId, command
);
taskList = list_concat(taskList, commandTaskList);
}
if (list_length(taskList) == 0)
{
return;
}
bool localExecutionSupported = true;
ExecuteUtilityTaskList(taskList, localExecutionSupported);
}
/*
* ForeignConstraintGetReferencingTableId parses given foreign constraint command
* and extracts the referencing table id from it.
*/
static Oid
ForeignConstraintGetReferencingTableId(const char *queryString)
{
Node *queryNode = ParseTreeNode(queryString);
if (!IsA(queryNode, AlterTableStmt))
{
ereport(ERROR, (errmsg("command is not an ALTER TABLE statement")));
}
AlterTableStmt *foreignConstraintStmt = (AlterTableStmt *) queryNode;
if (list_length(foreignConstraintStmt->cmds) != 1)
{
ereport(ERROR, (errmsg("command does not contain a single command")));
}
AlterTableCmd *command = (AlterTableCmd *) linitial(foreignConstraintStmt->cmds);
if (command->subtype == AT_AddConstraint)
{
Constraint *constraint = (Constraint *) command->def;
if (constraint && constraint->contype == CONSTR_FOREIGN)
{
bool missingOk = false;
return RangeVarGetRelid(foreignConstraintStmt->relation, NoLock,
missingOk);
}
}
ereport(ERROR, (errmsg("command does not contain a foreign constraint")));
}
/*
* EnsureNoneDistTableWithCoordinatorPlacement throws an error if given
* table is not a none-distributed table that has a coordinator placement.
*/
static void
EnsureNoneDistTableWithCoordinatorPlacement(Oid noneDistTableId)
{
if (HasDistributionKey(noneDistTableId))
{
ereport(ERROR, (errmsg("table is not a none-distributed table")));
}
int64 shardId = GetFirstShardId(noneDistTableId);
if (!ActiveShardPlacementListOnGroup(shardId, COORDINATOR_GROUP_ID))
{
ereport(ERROR, (errmsg("table does not have a coordinator placement")));
}
}
/*
* SetLocalEnableManualChangesToShard locally sets the
* citus.enable_manual_changes_to_shards GUC to the given state.
*/
static void
SetLocalEnableManualChangesToShard(bool state)
{
set_config_option("citus.enable_manual_changes_to_shards",
state ? "on" : "off",
(superuser() ? PGC_SUSET : PGC_USERSET), PGC_S_SESSION,
GUC_ACTION_LOCAL, true, 0, false);
}
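The GUC_ACTION_LOCAL flag makes the setting transaction-local; at the SQL level, the equivalent of calling it with true is a sketch like:

SET LOCAL citus.enable_manual_changes_to_shards TO on;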

View File

@ -4,3 +4,6 @@
#include "udfs/citus_prepare_pg_upgrade/12.1-1.sql"
#include "udfs/citus_finish_pg_upgrade/12.1-1.sql"
#include "udfs/citus_internal_update_none_dist_table_metadata/12.1-1.sql"
#include "udfs/citus_internal_delete_placement_metadata/12.1-1.sql"

View File

@ -3,3 +3,12 @@
-- we have modified the relevant upgrade script to include any_value changes
-- we don't need to upgrade this downgrade path for any_value changes
-- since if we are doing a Citus downgrade, not PG downgrade, then it would be no-op.
DROP FUNCTION pg_catalog.citus_internal_update_none_dist_table_metadata(
relation_id oid, replication_model "char", colocation_id bigint,
auto_converted boolean
);
DROP FUNCTION pg_catalog.citus_internal_delete_placement_metadata(
placement_id bigint
);

View File

@ -0,0 +1,9 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_placement_metadata(
placement_id bigint)
RETURNS void
LANGUAGE C
VOLATILE
AS 'MODULE_PATHNAME',
$$citus_internal_delete_placement_metadata$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_delete_placement_metadata(bigint)
IS 'Delete placement with given id from pg_dist_placement metadata table.';

View File

@ -0,0 +1,9 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_placement_metadata(
placement_id bigint)
RETURNS void
LANGUAGE C
VOLATILE
AS 'MODULE_PATHNAME',
$$citus_internal_delete_placement_metadata$$;
COMMENT ON FUNCTION pg_catalog.citus_internal_delete_placement_metadata(bigint)
IS 'Delete placement with given id from pg_dist_placement metadata table.';

View File

@ -0,0 +1,11 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_none_dist_table_metadata(
relation_id oid,
replication_model "char",
colocation_id bigint,
auto_converted boolean)
RETURNS void
LANGUAGE C
VOLATILE
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_update_none_dist_table_metadata(oid, "char", bigint, boolean)
IS 'Update pg_dist_partition metadata table for given none-distributed table, to convert it to another type of none-distributed table.';

View File

@ -0,0 +1,11 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_update_none_dist_table_metadata(
relation_id oid,
replication_model "char",
colocation_id bigint,
auto_converted boolean)
RETURNS void
LANGUAGE C
VOLATILE
AS 'MODULE_PATHNAME';
COMMENT ON FUNCTION pg_catalog.citus_internal_update_none_dist_table_metadata(oid, "char", bigint, boolean)
IS 'Update pg_dist_partition metadata table for given none-distributed table, to convert it to another type of none-distributed table.';

View File

@ -53,6 +53,7 @@ static int CompareShardPlacementsByNode(const void *leftElement,
const void *rightElement);
static uint32 CreateColocationGroupForRelation(Oid sourceRelationId);
static void BreakColocation(Oid sourceRelationId);
static uint32 SingleShardTableGetNodeId(Oid relationId);
/* exports for SQL callable functions */
@ -1231,6 +1232,56 @@ ColocatedTableId(int32 colocationId)
}
/*
* SingleShardTableColocationNodeId takes a colocation id that presumably
* belongs to a colocation group used to colocate a set of single-shard
* tables and returns the id of the node that stores / is expected to store
* the shards within the colocation group.
*/
uint32
SingleShardTableColocationNodeId(uint32 colocationId)
{
List *tablesInColocationGroup = ColocationGroupTableList(colocationId, 0);
if (list_length(tablesInColocationGroup) == 0)
{
int workerNodeIndex =
EmptySingleShardTableColocationDecideNodeId(colocationId);
List *workerNodeList = DistributedTablePlacementNodeList(RowShareLock);
WorkerNode *workerNode = (WorkerNode *) list_nth(workerNodeList, workerNodeIndex);
return workerNode->nodeId;
}
else
{
Oid colocatedTableId = ColocatedTableId(colocationId);
return SingleShardTableGetNodeId(colocatedTableId);
}
}
/*
* SingleShardTableGetNodeId returns the id of the node that stores the shard
* of the given single-shard table.
*/
static uint32
SingleShardTableGetNodeId(Oid relationId)
{
if (!IsCitusTableType(relationId, SINGLE_SHARD_DISTRIBUTED))
{
ereport(ERROR, (errmsg("table is not a single-shard distributed table")));
}
int64 shardId = GetFirstShardId(relationId);
List *shardPlacementList = ShardPlacementList(shardId);
if (list_length(shardPlacementList) != 1)
{
ereport(ERROR, (errmsg("table shard does not have a single shard placement")));
}
return ((ShardPlacement *) linitial(shardPlacementList))->nodeId;
}
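A hedged SQL sketch of the same lookup done here via the shard placement metadata (table name illustrative):

SELECT nodename, nodeport
FROM pg_dist_shard_placement
WHERE shardid = (
    SELECT shardid FROM pg_dist_shard
    WHERE logicalrelid = 'single_shard_tbl'::regclass
);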
/*
* ColocatedShardIdInRelation returns shardId of the shard from given relation, so that
* returned shard is co-located with given shard.

View File

@ -25,6 +25,7 @@ extern List * ColocatedTableList(Oid distributedTableId);
extern List * ColocatedShardIntervalList(ShardInterval *shardInterval);
extern List * ColocatedNonPartitionShardIntervalList(ShardInterval *shardInterval);
extern Oid ColocatedTableId(int32 colocationId);
extern uint32 SingleShardTableColocationNodeId(uint32 colocationId);
extern uint64 ColocatedShardIdInRelation(Oid relationId, int shardIndex);
uint32 ColocationId(int shardCount, int replicationFactor, Oid distributionColumnType,
Oid distributionColumnCollation);

View File

@ -292,6 +292,7 @@ extern bool ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid
relationId);
extern List * GetReferencingForeignConstaintCommands(Oid relationOid);
extern List * GetForeignConstraintToReferenceTablesCommands(Oid relationId);
extern List * GetForeignConstraintFromOtherReferenceTablesCommands(Oid relationId);
extern List * GetForeignConstraintToDistributedTablesCommands(Oid relationId);
extern List * GetForeignConstraintFromDistributedTablesCommands(Oid relationId);
extern List * GetForeignConstraintCommandsInternal(Oid relationId, int flags);
@ -605,6 +606,8 @@ extern void ErrorUnsupportedAlterTableAddColumn(Oid relationId, AlterTableCmd *c
extern void ErrorIfUnsupportedConstraint(Relation relation, char distributionMethod,
char referencingReplicationModel,
Var *distributionColumn, uint32 colocationId);
extern List * InterShardDDLTaskList(Oid leftRelationId, Oid rightRelationId,
const char *commandString);
extern List * AlterTableSchemaStmtObjectAddress(Node *stmt,
bool missing_ok, bool isPostprocess);
extern List * MakeNameListFromRangeVar(const RangeVar *rel);

View File

@ -263,6 +263,7 @@ extern void CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId,
extern void CreateReferenceTableShard(Oid distributedTableId);
extern void CreateSingleShardTableShardWithRoundRobinPolicy(Oid relationId,
uint32 colocationId);
extern int EmptySingleShardTableColocationDecideNodeId(uint32 colocationId);
extern List * WorkerCreateShardCommandList(Oid relationId, uint64 shardId,
List *ddlCommandList);
extern Oid ForeignConstraintGetReferencedTableId(const char *queryString);

View File

@ -139,6 +139,11 @@ extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount,
extern void SyncDeleteColocationGroupToNodes(uint32 colocationId);
extern char * TenantSchemaInsertCommand(Oid schemaId, uint32 colocationId);
extern char * TenantSchemaDeleteCommand(char *schemaName);
extern char * UpdateNoneDistTableMetadataCommand(Oid relationId, char replicationModel,
uint32 colocationId, bool autoConverted);
extern char * AddPlacementMetadataCommand(uint64 shardId, uint64 placementId,
uint64 shardLength, int32 groupId);
extern char * DeletePlacementMetadataCommand(uint64 placementId);
extern MetadataSyncContext * CreateMetadataSyncContext(List *nodeList,
bool collectCommands,

View File

@ -324,6 +324,7 @@ extern ShardInterval * CopyShardInterval(ShardInterval *srcInterval);
extern uint64 ShardLength(uint64 shardId);
extern bool NodeGroupHasShardPlacements(int32 groupId);
extern bool IsActiveShardPlacement(ShardPlacement *ShardPlacement);
extern bool IsRemoteShardPlacement(ShardPlacement *shardPlacement);
extern bool IsPlacementOnWorkerNode(ShardPlacement *placement, WorkerNode *workerNode);
extern List * FilterShardPlacementList(List *shardPlacementList, bool (*filter)(
ShardPlacement *));
@ -349,6 +350,10 @@ extern List * RemoveCoordinatorPlacementIfNotSingleNode(List *placementList);
extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
text *shardMinValue, text *shardMaxValue);
extern void DeleteShardRow(uint64 shardId);
extern ShardPlacement * InsertShardPlacementRowGlobally(uint64 shardId,
uint64 placementId,
uint64 shardLength,
int32 groupId);
extern uint64 InsertShardPlacementRow(uint64 shardId, uint64 placementId,
uint64 shardLength, int32 groupId);
extern void InsertIntoPgDistPartition(Oid relationId, char distributionMethod,
@ -360,8 +365,13 @@ extern void UpdateDistributionColumnGlobally(Oid relationId, char distributionMe
extern void UpdateDistributionColumn(Oid relationId, char distributionMethod,
Var *distributionColumn, int colocationId);
extern void DeletePartitionRow(Oid distributedRelationId);
extern void UpdateNoneDistTableMetadataGlobally(Oid relationId, char replicationModel,
uint32 colocationId, bool autoConverted);
extern void UpdateNoneDistTableMetadata(Oid relationId, char replicationModel,
uint32 colocationId, bool autoConverted);
extern void DeleteShardRow(uint64 shardId);
extern void UpdatePlacementGroupId(uint64 placementId, int groupId);
extern void DeleteShardPlacementRowGlobally(uint64 placementId);
extern void DeleteShardPlacementRow(uint64 placementId);
extern void CreateSingleShardTable(Oid relationId, ColocationParam colocationParam);
extern void CreateDistributedTable(Oid relationId, char *distributionColumnName,
@ -369,6 +379,7 @@ extern void CreateDistributedTable(Oid relationId, char *distributionColumnName,
bool shardCountIsStrict, char *colocateWithTableName);
extern void CreateReferenceTable(Oid relationId);
extern void CreateTruncateTrigger(Oid relationId);
extern uint64 CopyFromLocalTableIntoDistTable(Oid localTableId, Oid distributedTableId);
extern void EnsureUndistributeTenantTableSafe(Oid relationId, const char *operationName);
extern TableConversionReturn * UndistributeTable(TableConversionParameters *params);
extern void UndistributeTables(List *relationIdList);

View File

@ -0,0 +1,20 @@
/*-------------------------------------------------------------------------
*
* replicate_none_dist_table_shard.h
* Routines to replicate shard of none-distributed table to
* a remote node.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef REPLICA_LOCAL_TABLE_SHARD_H
#define REPLICA_LOCAL_TABLE_SHARD_H
extern void NoneDistTableReplicateCoordinatorPlacement(Oid noneDistTableId,
List *targetNodeList);
extern void NoneDistTableDeleteCoordinatorPlacement(Oid noneDistTableId);
extern void NoneDistTableDropCoordinatorPlacementTable(Oid noneDistTableId);
#endif /* REPLICA_LOCAL_TABLE_SHARD_H */

View File

@ -110,7 +110,7 @@ DEPS = {
),
"create_role_propagation": TestDeps(None, ["multi_cluster_management"]),
"single_node_enterprise": TestDeps(None),
"single_node": TestDeps(None),
"single_node": TestDeps(None, ["multi_test_helpers"]),
"single_node_truncate": TestDeps(None),
"multi_explain": TestDeps(
"base_schedule", ["multi_insert_select_non_pushable_queries"]

View File

@ -1,9 +1,11 @@
SET search_path TO truncate_cascade_tests_schema;
-- Hide detail of truncate error because it might either reference
-- table_with_fk_1 or table_with_fk_2 in the error message.
\set VERBOSITY TERSE
-- Test truncate error on table with dependencies
TRUNCATE table_with_pk;
ERROR: cannot truncate a table referenced in a foreign key constraint
DETAIL: Table "table_with_fk_1" references "table_with_pk".
HINT: Truncate table "table_with_fk_1" at the same time, or use TRUNCATE ... CASCADE.
\set VERBOSITY DEFAULT
-- Test truncate rollback on table with dependencies
SELECT COUNT(*) FROM table_with_fk_1;
count

View File

@ -1201,9 +1201,9 @@ ALTER TABLE reference_table_1 OWNER TO another_user;
SELECT run_command_on_placements('reference_table_1', 'ALTER TABLE %s OWNER TO another_user');
run_command_on_placements
---------------------------------------------------------------------
(localhost,57636,1810093,t,"ALTER TABLE")
(localhost,57637,1810093,t,"ALTER TABLE")
(localhost,57638,1810093,t,"ALTER TABLE")
(localhost,57636,1810092,t,"ALTER TABLE")
(localhost,57637,1810092,t,"ALTER TABLE")
(localhost,57638,1810092,t,"ALTER TABLE")
(3 rows)
BEGIN;

View File

@ -366,5 +366,701 @@ BEGIN;
(1 row)
ROLLBACK;
\set VERBOSITY DEFAULT
-- Test the UDFs that we use to convert Citus local tables to single-shard tables and
-- reference tables.
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(1, 't', 1, true);
ERROR: This is an internal Citus function can only be used in a distributed transaction
SELECT pg_catalog.citus_internal_delete_placement_metadata(1);
ERROR: This is an internal Citus function can only be used in a distributed transaction
CREATE ROLE test_user_create_ref_dist WITH LOGIN;
GRANT ALL ON SCHEMA create_ref_dist_from_citus_local TO test_user_create_ref_dist;
ALTER SYSTEM SET citus.enable_manual_metadata_changes_for_user TO 'test_user_create_ref_dist';
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
SET ROLE test_user_create_ref_dist;
SET citus.next_shard_id TO 1850000;
SET citus.next_placement_id TO 8510000;
SET citus.shard_replication_factor TO 1;
SET search_path TO create_ref_dist_from_citus_local;
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(null, 't', 1, true);
ERROR: relation_id cannot be NULL
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(1, null, 1, true);
ERROR: replication_model cannot be NULL
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(1, 't', null, true);
ERROR: colocation_id cannot be NULL
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(1, 't', 1, null);
ERROR: auto_converted cannot be NULL
SELECT pg_catalog.citus_internal_delete_placement_metadata(null);
ERROR: placement_id cannot be NULL
CREATE TABLE udf_test (col_1 int);
SELECT citus_add_local_table_to_metadata('udf_test');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
BEGIN;
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata('create_ref_dist_from_citus_local.udf_test'::regclass, 'k', 99999, true);
citus_internal_update_none_dist_table_metadata
---------------------------------------------------------------------
(1 row)
SELECT COUNT(*)=1 FROM pg_dist_partition
WHERE logicalrelid = 'create_ref_dist_from_citus_local.udf_test'::regclass AND repmodel = 'k' AND colocationid = 99999 AND autoconverted = true;
?column?
---------------------------------------------------------------------
t
(1 row)
SELECT placementid AS udf_test_placementid FROM pg_dist_shard_placement
WHERE shardid = get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.udf_test') \gset
SELECT pg_catalog.citus_internal_delete_placement_metadata(:udf_test_placementid);
citus_internal_delete_placement_metadata
---------------------------------------------------------------------
(1 row)
SELECT COUNT(*)=0 FROM pg_dist_placement WHERE placementid = :udf_test_placementid;
?column?
---------------------------------------------------------------------
t
(1 row)
ROLLBACK;
RESET ROLE;
DROP TABLE udf_test;
REVOKE ALL ON SCHEMA create_ref_dist_from_citus_local FROM test_user_create_ref_dist;
DROP USER test_user_create_ref_dist;
ALTER SYSTEM RESET citus.enable_manual_metadata_changes_for_user;
SELECT pg_reload_conf();
pg_reload_conf
---------------------------------------------------------------------
t
(1 row)
SELECT pg_sleep(0.1);
pg_sleep
---------------------------------------------------------------------
(1 row)
-- Test lazy conversion from Citus local to single-shard tables and reference tables.
SET citus.next_shard_id TO 1860000;
SET citus.next_placement_id TO 8520000;
SET citus.shard_replication_factor TO 1;
SET search_path TO create_ref_dist_from_citus_local;
SET client_min_messages to ERROR;
INSERT INTO reference_table_1 VALUES (1, 1), (2, 2), (201, 201), (202, 202);
CREATE TABLE citus_local_table_7 (col_1 int UNIQUE);
INSERT INTO citus_local_table_7 VALUES (1), (2), (201), (202);
SELECT citus_add_local_table_to_metadata('citus_local_table_7');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
CREATE TABLE fkey_test (
int_col_1 int PRIMARY KEY,
text_col_1 text UNIQUE,
int_col_2 int
);
INSERT INTO fkey_test VALUES (1, '1', 1), (2, '2', 2), (201, '201', 201), (202, '202', 202);
SELECT citus_add_local_table_to_metadata('fkey_test');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
-- check unsupported foreign key constraints
ALTER TABLE reference_table_1 ADD CONSTRAINT ref_1_col_1_fkey_test_int_col_1 FOREIGN KEY (col_1) REFERENCES fkey_test(int_col_1);
SELECT create_distributed_table('fkey_test', null, colocate_with=>'none');
ERROR: relation fkey_test is referenced by a foreign key from reference_table_1
DETAIL: foreign keys from a reference table to a distributed table are not supported.
ALTER TABLE reference_table_1 DROP CONSTRAINT ref_1_col_1_fkey_test_int_col_1;
ALTER TABLE citus_local_table_7 ADD CONSTRAINT citus_local_1_col_1_fkey_test_int_col_1 FOREIGN KEY (col_1) REFERENCES fkey_test(int_col_1);
SELECT create_distributed_table('fkey_test', null, colocate_with=>'none');
ERROR: relation fkey_test is referenced by a foreign key from citus_local_table_7
DETAIL: foreign keys from a citus local table to a distributed table are not supported.
ALTER TABLE citus_local_table_7 DROP CONSTRAINT citus_local_1_col_1_fkey_test_int_col_1;
ALTER TABLE fkey_test ADD CONSTRAINT fkey_test_int_col_1_citus_local_1_col_1 FOREIGN KEY (int_col_1) REFERENCES citus_local_table_7(col_1);
SELECT create_distributed_table('fkey_test', null, colocate_with=>'none');
ERROR: cannot create foreign key constraint since relations are not colocated or not referencing a reference table
DETAIL: A distributed table can only have foreign keys if it is referencing another colocated hash distributed table or a reference table
ALTER TABLE fkey_test DROP CONSTRAINT fkey_test_int_col_1_citus_local_1_col_1;
CREATE TABLE tbl_1 (
int_col_1 int PRIMARY KEY,
text_col_1 text UNIQUE,
int_col_2 int
);
CREATE INDEX tbl_1_int_col_2_idx ON tbl_1 (int_col_2);
INSERT INTO tbl_1 VALUES (1, '1', 1), (2, '2', 2), (201, '201', 201), (202, '202', 202);
ALTER TABLE tbl_1 ADD CONSTRAINT tbl_1_int_col_1_ref_1_col_1 FOREIGN KEY (int_col_1) REFERENCES reference_table_1(col_1);
ALTER TABLE tbl_1 ADD CONSTRAINT tbl_1_int_col_2_ref_1_col_1 FOREIGN KEY (int_col_2) REFERENCES reference_table_1(col_1);
ALTER TABLE tbl_1 ADD CONSTRAINT tbl_1_int_col_2_tbl_1_int_col_1 FOREIGN KEY (int_col_2) REFERENCES tbl_1(int_col_1);
SELECT citus_add_local_table_to_metadata('tbl_1');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
-- save old shardid
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.tbl_1') AS tbl_1_old_shard_id \gset
SELECT create_distributed_table('tbl_1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
-- check data
SELECT * FROM tbl_1 ORDER BY int_col_1;
int_col_1 | text_col_1 | int_col_2
---------------------------------------------------------------------
1 | 1 | 1
2 | 2 | 2
201 | 201 | 201
202 | 202 | 202
(4 rows)
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.tbl_1');
verify_pg_dist_partition_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.tbl_1', :tbl_1_old_shard_id, false);
verify_shard_placement_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_index_count_on_shard_placements('create_ref_dist_from_citus_local.tbl_1', 3);
verify_index_count_on_shard_placements
---------------------------------------------------------------------
t
(1 row)
SELECT cardinality(fkey_names) = 3 AS verify_fkey_count_on_shards FROM public.get_fkey_names_on_placements('create_ref_dist_from_citus_local.tbl_1');
verify_fkey_count_on_shards
---------------------------------------------------------------------
t
(1 row)
-- test partitioning
CREATE TABLE tbl_2 (
int_col_1 int PRIMARY KEY,
text_col_1 text,
int_col_2 int
) PARTITION BY RANGE (int_col_1);
CREATE TABLE tbl_2_child_1 PARTITION OF tbl_2 FOR VALUES FROM (0) TO (100);
CREATE TABLE tbl_2_child_2 PARTITION OF tbl_2 FOR VALUES FROM (200) TO (300);
INSERT INTO tbl_2 VALUES (1, '1', 1), (2, '2', 2), (201, '201', 201), (202, '202', 202);
SELECT citus_add_local_table_to_metadata('tbl_2');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
ALTER TABLE tbl_2 ADD CONSTRAINT tbl_2_int_col_1_ref_1_col_1 FOREIGN KEY (int_col_1) REFERENCES reference_table_1(col_1);
ALTER TABLE tbl_2 ADD CONSTRAINT tbl_2_int_col_2_ref_1_col_1 FOREIGN KEY (int_col_2) REFERENCES reference_table_1(col_1);
-- save old shardid
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.tbl_2') AS tbl_2_old_shard_id \gset
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.tbl_2_child_1') AS tbl_2_child_1_old_shard_id \gset
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.tbl_2_child_2') AS tbl_2_child_2_old_shard_id \gset
SELECT create_distributed_table('tbl_2', null, colocate_with=>'tbl_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.tbl_2');
verify_pg_dist_partition_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.tbl_2', :tbl_2_old_shard_id, false);
verify_shard_placement_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_index_count_on_shard_placements('create_ref_dist_from_citus_local.tbl_2', 1);
verify_index_count_on_shard_placements
---------------------------------------------------------------------
t
(1 row)
SELECT cardinality(fkey_names) = 2 AS verify_fkey_count_on_shards FROM public.get_fkey_names_on_placements('create_ref_dist_from_citus_local.tbl_2');
verify_fkey_count_on_shards
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_partition_count_on_placements('create_ref_dist_from_citus_local.tbl_2', 2);
verify_partition_count_on_placements
---------------------------------------------------------------------
t
(1 row)
-- verify the same for children
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.tbl_2_child_1');
verify_pg_dist_partition_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.tbl_2_child_1', :tbl_2_child_1_old_shard_id, false);
verify_shard_placement_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_index_count_on_shard_placements('create_ref_dist_from_citus_local.tbl_2_child_1', 1);
verify_index_count_on_shard_placements
---------------------------------------------------------------------
t
(1 row)
SELECT cardinality(fkey_names) = 2 AS verify_fkey_count_on_shards FROM public.get_fkey_names_on_placements('create_ref_dist_from_citus_local.tbl_2_child_1');
verify_fkey_count_on_shards
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.tbl_2_child_2');
verify_pg_dist_partition_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.tbl_2_child_2', :tbl_2_child_2_old_shard_id, false);
verify_shard_placement_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_index_count_on_shard_placements('create_ref_dist_from_citus_local.tbl_2_child_2', 1);
verify_index_count_on_shard_placements
---------------------------------------------------------------------
t
(1 row)
SELECT cardinality(fkey_names) = 2 AS verify_fkey_count_on_shards FROM public.get_fkey_names_on_placements('create_ref_dist_from_citus_local.tbl_2_child_2');
verify_fkey_count_on_shards
---------------------------------------------------------------------
t
(1 row)
-- verify that placements of all 4 tables are on the same node
SELECT COUNT(DISTINCT(groupid)) = 1 FROM pg_dist_placement WHERE shardid IN (
:tbl_1_old_shard_id, :tbl_2_old_shard_id, :tbl_2_child_1_old_shard_id, :tbl_2_child_2_old_shard_id
);
?column?
---------------------------------------------------------------------
t
(1 row)
-- verify the same by executing a router query that targets both tables
SET client_min_messages to DEBUG2;
SELECT COUNT(*) FROM tbl_1, tbl_2;
DEBUG: Creating router plan
count
---------------------------------------------------------------------
16
(1 row)
SET client_min_messages to ERROR;
CREATE TABLE reference_table_3(col_1 INT UNIQUE, col_2 INT UNIQUE);
INSERT INTO reference_table_3 VALUES (1, 1), (2, 2), (201, 201), (202, 202);
CREATE TABLE tbl_3 (
int_col_1 int PRIMARY KEY,
text_col_1 text,
int_col_2 int
) PARTITION BY RANGE (int_col_1);
CREATE TABLE tbl_3_child_1 PARTITION OF tbl_3 FOR VALUES FROM (0) TO (100);
ALTER TABLE tbl_3 ADD CONSTRAINT tbl_3_int_col_1_ref_1_col_1 FOREIGN KEY (int_col_1) REFERENCES reference_table_3(col_1);
SELECT create_reference_table('reference_table_3');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO tbl_3 VALUES (1, '1', 1), (2, '2', 2);
-- save old shardid
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.tbl_3') AS tbl_3_old_shard_id \gset
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.tbl_3_child_1') AS tbl_3_child_1_old_shard_id \gset
SELECT create_distributed_table('tbl_3', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.tbl_3');
verify_pg_dist_partition_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.tbl_3', :tbl_3_old_shard_id, false);
verify_shard_placement_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_index_count_on_shard_placements('create_ref_dist_from_citus_local.tbl_3', 1);
verify_index_count_on_shard_placements
---------------------------------------------------------------------
t
(1 row)
SELECT cardinality(fkey_names) = 1 AS verify_fkey_count_on_shards FROM public.get_fkey_names_on_placements('create_ref_dist_from_citus_local.tbl_3');
verify_fkey_count_on_shards
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_partition_count_on_placements('create_ref_dist_from_citus_local.tbl_3', 1);
verify_partition_count_on_placements
---------------------------------------------------------------------
t
(1 row)
-- verify the same for children
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.tbl_3_child_1');
verify_pg_dist_partition_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.tbl_3_child_1', :tbl_3_child_1_old_shard_id, false);
verify_shard_placement_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_index_count_on_shard_placements('create_ref_dist_from_citus_local.tbl_3_child_1', 1);
verify_index_count_on_shard_placements
---------------------------------------------------------------------
t
(1 row)
SELECT cardinality(fkey_names) = 1 AS verify_fkey_count_on_shards FROM public.get_fkey_names_on_placements('create_ref_dist_from_citus_local.tbl_3_child_1');
verify_fkey_count_on_shards
---------------------------------------------------------------------
t
(1 row)
-- verify that placements of both tables are on the same node
SELECT COUNT(DISTINCT(groupid)) = 1 FROM pg_dist_placement WHERE shardid IN (
:tbl_3_old_shard_id, :tbl_3_child_1_old_shard_id
);
?column?
---------------------------------------------------------------------
t
(1 row)
-- verify the same by executing a router query that targets the table
SET client_min_messages to DEBUG2;
SELECT COUNT(*) FROM tbl_3;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
count
---------------------------------------------------------------------
2
(1 row)
SET client_min_messages to ERROR;
CREATE TABLE single_shard_conversion_colocated_1 (
int_col_1 int PRIMARY KEY,
text_col_1 text UNIQUE,
int_col_2 int
);
SELECT citus_add_local_table_to_metadata('single_shard_conversion_colocated_1');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
-- save old shardid
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.single_shard_conversion_colocated_1') AS single_shard_conversion_colocated_1_old_shard_id \gset
SELECT create_distributed_table('single_shard_conversion_colocated_1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.single_shard_conversion_colocated_1');
verify_pg_dist_partition_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.single_shard_conversion_colocated_1', :single_shard_conversion_colocated_1_old_shard_id, false);
verify_shard_placement_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
CREATE TABLE single_shard_conversion_colocated_2 (
int_col_1 int
);
SELECT citus_add_local_table_to_metadata('single_shard_conversion_colocated_2');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
-- save old shardid
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.single_shard_conversion_colocated_2') AS single_shard_conversion_colocated_2_old_shard_id \gset
SELECT create_distributed_table('single_shard_conversion_colocated_2', null, colocate_with=>'single_shard_conversion_colocated_1');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.single_shard_conversion_colocated_2');
verify_pg_dist_partition_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.single_shard_conversion_colocated_2', :single_shard_conversion_colocated_2_old_shard_id, false);
verify_shard_placement_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
-- make sure that they're created in the same colocation group
SELECT
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'create_ref_dist_from_citus_local.single_shard_conversion_colocated_1'::regclass
)
=
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'create_ref_dist_from_citus_local.single_shard_conversion_colocated_2'::regclass
);
?column?
---------------------------------------------------------------------
t
(1 row)
-- verify that the placements of the two tables are on the same node
SELECT COUNT(DISTINCT(groupid)) = 1 FROM pg_dist_placement WHERE shardid IN (
:single_shard_conversion_colocated_1_old_shard_id, :single_shard_conversion_colocated_2_old_shard_id
);
?column?
---------------------------------------------------------------------
t
(1 row)
CREATE TABLE single_shard_conversion_noncolocated_1 (
int_col_1 int
);
SELECT citus_add_local_table_to_metadata('single_shard_conversion_noncolocated_1');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
-- save old shardid
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.single_shard_conversion_noncolocated_1') AS single_shard_conversion_noncolocated_1_old_shard_id \gset
SELECT create_distributed_table('single_shard_conversion_noncolocated_1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.single_shard_conversion_noncolocated_1');
verify_pg_dist_partition_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.single_shard_conversion_noncolocated_1', :single_shard_conversion_noncolocated_1_old_shard_id, false);
verify_shard_placement_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
-- make sure that they're created in different colocation groups
SELECT
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'create_ref_dist_from_citus_local.single_shard_conversion_colocated_1'::regclass
)
!=
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'create_ref_dist_from_citus_local.single_shard_conversion_noncolocated_1'::regclass
);
?column?
---------------------------------------------------------------------
t
(1 row)
-- Test creating a reference table from a Citus local table
-- (ref_table_conversion_test) that has foreign keys from/to Citus
-- local tables and reference tables:
--
-- citus_local_referencing ----------       ----------> citus_local_referenced
--                                   |      ^
--                                   v      |
--                         ref_table_conversion_test
--                                   ^      |
--                                   |      v
-- reference_table_referencing ------       ----------> reference_table_referenced
--
CREATE TABLE citus_local_referenced(a int PRIMARY KEY);
SELECT citus_add_local_table_to_metadata('citus_local_referenced');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
INSERT INTO citus_local_referenced VALUES (1), (2), (3), (4);
CREATE TABLE reference_table_referenced(a int PRIMARY KEY);
SELECT create_reference_table('reference_table_referenced');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO reference_table_referenced VALUES (1), (2), (3), (4);
CREATE TABLE ref_table_conversion_test (
a int PRIMARY KEY
);
SELECT citus_add_local_table_to_metadata('ref_table_conversion_test');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
ALTER TABLE ref_table_conversion_test ADD CONSTRAINT ref_table_a_citus_local_referenced_a FOREIGN KEY (a) REFERENCES citus_local_referenced(a);
ALTER TABLE ref_table_conversion_test ADD CONSTRAINT ref_table_a_reference_table_referenced_a FOREIGN KEY (a) REFERENCES reference_table_referenced(a);
INSERT INTO ref_table_conversion_test VALUES (1), (2), (3), (4);
CREATE INDEX ref_table_conversion_test_a_idx1 ON ref_table_conversion_test (a);
CREATE INDEX ref_table_conversion_test_a_idx2 ON ref_table_conversion_test (a);
CREATE TABLE citus_local_referencing(a int);
ALTER TABLE citus_local_referencing ADD CONSTRAINT citus_local_referencing_a_ref_table_a FOREIGN KEY (a) REFERENCES ref_table_conversion_test(a);
SELECT citus_add_local_table_to_metadata('citus_local_referencing');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
INSERT INTO citus_local_referencing VALUES (1), (2), (3), (4);
CREATE TABLE reference_table_referencing(a int);
ALTER TABLE reference_table_referencing ADD CONSTRAINT reference_table_referencing_a_ref_table_a FOREIGN KEY (a) REFERENCES ref_table_conversion_test(a);
SELECT create_reference_table('reference_table_referencing');
create_reference_table
---------------------------------------------------------------------
(1 row)
INSERT INTO reference_table_referencing VALUES (1), (2), (3), (4);
-- save old shardid and placementid
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.ref_table_conversion_test') AS ref_table_conversion_test_old_shard_id \gset
SELECT placementid AS ref_table_conversion_test_old_coord_placement_id FROM pg_dist_placement WHERE shardid = :ref_table_conversion_test_old_shard_id \gset
SELECT create_reference_table('ref_table_conversion_test');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- check data on all placements
SELECT result FROM run_command_on_all_nodes(
$$SELECT COUNT(*)=4 FROM create_ref_dist_from_citus_local.ref_table_conversion_test$$
);
result
---------------------------------------------------------------------
t
t
t
(3 rows)
SELECT public.verify_pg_dist_partition_for_reference_table('create_ref_dist_from_citus_local.ref_table_conversion_test');
verify_pg_dist_partition_for_reference_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_shard_placements_for_reference_table('create_ref_dist_from_citus_local.ref_table_conversion_test',
:ref_table_conversion_test_old_shard_id,
:ref_table_conversion_test_old_coord_placement_id);
verify_shard_placements_for_reference_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_index_count_on_shard_placements('create_ref_dist_from_citus_local.ref_table_conversion_test', 3);
verify_index_count_on_shard_placements
---------------------------------------------------------------------
t
(1 row)
SELECT on_node, fkey_names FROM public.get_fkey_names_on_placements('create_ref_dist_from_citus_local.ref_table_conversion_test') ORDER BY 1,2;
on_node | fkey_names
---------------------------------------------------------------------
on_coordinator | {citus_local_referencing_a_ref_table_a_1860015,ref_table_a_citus_local_referenced_a_1860014,ref_table_a_reference_table_referenced_a_1860014,reference_table_referencing_a_ref_table_a_1860016}
on_worker | {ref_table_a_reference_table_referenced_a_1860014,reference_table_referencing_a_ref_table_a_1860016}
on_worker | {ref_table_a_reference_table_referenced_a_1860014,reference_table_referencing_a_ref_table_a_1860016}
(3 rows)
CREATE TABLE dropped_column_test(a int, b int, c text not null, d text not null);
INSERT INTO dropped_column_test VALUES(1, null, 'text_1', 'text_2');
ALTER TABLE dropped_column_test DROP column b;
SELECT citus_add_local_table_to_metadata('dropped_column_test');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
SELECT create_reference_table('dropped_column_test');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- check data on all placements
SELECT result FROM run_command_on_all_nodes(
$$
SELECT jsonb_agg(q.*) FROM (
SELECT * FROM create_ref_dist_from_citus_local.dropped_column_test
) q
$$
);
result
---------------------------------------------------------------------
[{"a": 1, "c": "text_1", "d": "text_2"}]
[{"a": 1, "c": "text_1", "d": "text_2"}]
[{"a": 1, "c": "text_1", "d": "text_2"}]
(3 rows)
SET citus.shard_replication_factor TO 2;
CREATE TABLE replication_factor_test(a int);
SELECT citus_add_local_table_to_metadata('replication_factor_test');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
SELECT create_distributed_table('replication_factor_test', null);
ERROR: could not create single shard table: citus.shard_replication_factor is greater than 1
HINT: Consider setting citus.shard_replication_factor to 1 and try again
SET citus.shard_replication_factor TO 1;
-- cleanup at exit
DROP SCHEMA create_ref_dist_from_citus_local CASCADE;

@ -614,11 +614,11 @@ INSERT INTO "Table?!.1Table" VALUES (10, 15, (150, row_to_json(row(4,8)))::int_j
INSERT INTO "Table?!.1Table" VALUES (5, 5, (5, row_to_json(row(5,5)))::int_jsonb_type, row_to_json(row(5,5), true));
-- tuples that are supposed to violate different data type / check constraints
INSERT INTO "Table?!.1Table"(id, jsondata, name) VALUES (101, '{"a": 1}', 'text_1');
ERROR: conflicting key value violates exclusion constraint "Table?!.1Table_name_excl_1730043"
ERROR: conflicting key value violates exclusion constraint "Table?!.1Table_name_excl_1730042"
DETAIL: Key (name)=(text_1) conflicts with existing key (name)=(text_1).
CONTEXT: while executing command on localhost:xxxxx
INSERT INTO "Table?!.1Table"(id, jsondata, price) VALUES (101, '{"a": 1}', -1);
ERROR: new row for relation "Table?!.1Table_1730043" violates check constraint "Table?!.1Table_price_check"
ERROR: new row for relation "Table?!.1Table_1730042" violates check constraint "Table?!.1Table_price_check"
DETAIL: Failing row contains (101, null, null, {"a": 1}, null, -1, 0, null, 5, 14, 74).
CONTEXT: while executing command on localhost:xxxxx
INSERT INTO "Table?!.1Table"(id, jsondata, age_with_default_col) VALUES (101, '{"a": 1}', -1);
@ -863,7 +863,7 @@ CREATE INDEX "my!Index2New" ON "NULL_!_dist_key"."nullKeyTable.1!?!9012345678901
CREATE UNIQUE INDEX uniqueIndex2New ON "NULL_!_dist_key"."nullKeyTable.1!?!9012345678901234567890123456789012345678901234567890123456789"(id);
-- error out for already existing, because of the unique index
INSERT INTO "NULL_!_dist_key"."nullKeyTable.1!?!9012345678901234567890123456789012345678901234567890123456789" VALUES (1, 1, row_to_json(row(1,1), true));
ERROR: duplicate key value violates unique constraint "partition1_nullKeyTable.1!?!901234567890123456_bf4a8ac1_1730056"
ERROR: duplicate key value violates unique constraint "partition1_nullKeyTable.1!?!901234567890123456_bf4a8ac1_1730054"
DETAIL: Key (id)=(X) already exists.
CONTEXT: while executing command on localhost:xxxxx
-- verify all 4 shard indexes are created on the same node
@ -895,8 +895,8 @@ DETAIL: Reference tables and local tables can only have foreign keys to referen
ROLLBACK;
-- errors out because of foreign key violation
INSERT INTO "NULL_!_dist_key"."nullKeyTable.1!?!9012345678901234567890123456789012345678901234567890123456789" VALUES (100, 1, row_to_json(row(1,1), true));
ERROR: insert or update on table "partition100_nullKeyTable.1!?!9012345678901234_0aba0bf3_1730058" violates foreign key constraint "fkey_to_dummy_ref_1730055"
DETAIL: Key (id)=(X) is not present in table "dummy_reference_table_1730059".
ERROR: insert or update on table "partition100_nullKeyTable.1!?!9012345678901234_0aba0bf3_1730056" violates foreign key constraint "fkey_to_dummy_ref_1730053"
DETAIL: Key (id)=(X) is not present in table "dummy_reference_table_1730057".
CONTEXT: while executing command on localhost:xxxxx
-- now inserts successfully
INSERT INTO dummy_reference_table VALUES (100);
@ -1163,7 +1163,7 @@ BEGIN;
INSERT INTO referencing_table VALUES (1, 2);
-- fails
INSERT INTO referencing_table VALUES (2, 2);
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730100"
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730098"
DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx".
CONTEXT: while executing command on localhost:xxxxx
ROLLBACK;
@ -1209,7 +1209,7 @@ BEGIN;
INSERT INTO referencing_table VALUES (1, 2);
-- fails
INSERT INTO referencing_table VALUES (2, 2);
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730136"
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730133"
DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx".
CONTEXT: while executing command on localhost:xxxxx
ROLLBACK;
@ -1239,8 +1239,8 @@ BEGIN;
(1 row)
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null);
ERROR: cannot create foreign key constraint since foreign keys from reference tables and local tables to distributed tables are not supported
DETAIL: Reference tables and local tables can only have foreign keys to reference tables and local tables
ERROR: relation referenced_table is referenced by a foreign key from referencing_table
DETAIL: foreign keys from a reference table to a distributed table are not supported.
ROLLBACK;
BEGIN;
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null);
@ -1274,8 +1274,8 @@ BEGIN;
(1 row)
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null);
ERROR: cannot create foreign key constraint since foreign keys from reference tables and local tables to distributed tables are not supported
DETAIL: Reference tables and local tables can only have foreign keys to reference tables and local tables
ERROR: relation referenced_table is referenced by a foreign key from referencing_table
DETAIL: foreign keys from a citus local table to a distributed table are not supported.
ROLLBACK;
BEGIN;
SELECT create_distributed_table('referenced_table', NULL, distribution_type=>null);
@ -1327,8 +1327,8 @@ SELECT result, success FROM run_command_on_workers($$
$$);
result | success
---------------------------------------------------------------------
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730153" | f
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730153" | f
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730146" | f
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730146" | f
(2 rows)
DROP TABLE referencing_table, referenced_table;
@ -1343,8 +1343,8 @@ SELECT create_distributed_table('self_fkey_test', NULL, distribution_type=>null)
INSERT INTO self_fkey_test VALUES (1, 1); -- ok
INSERT INTO self_fkey_test VALUES (2, 3); -- fails
ERROR: insert or update on table "self_fkey_test_1730154" violates foreign key constraint "self_fkey_test_b_fkey_1730154"
DETAIL: Key (b)=(3) is not present in table "self_fkey_test_1730154".
ERROR: insert or update on table "self_fkey_test_1730147" violates foreign key constraint "self_fkey_test_b_fkey_1730147"
DETAIL: Key (b)=(3) is not present in table "self_fkey_test_1730147".
CONTEXT: while executing command on localhost:xxxxx
-- similar foreign key tests but this time create the referencing table later on
-- referencing table is a single-shard table
@ -1368,7 +1368,7 @@ BEGIN;
INSERT INTO referencing_table VALUES (1, 2);
-- fails
INSERT INTO referencing_table VALUES (2, 2);
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730156"
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730149"
DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx".
CONTEXT: while executing command on localhost:xxxxx
ROLLBACK;
@ -1391,7 +1391,7 @@ BEGIN;
INSERT INTO referencing_table VALUES (2, 1);
-- fails
INSERT INTO referencing_table VALUES (1, 2);
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_b_fkey_1730158"
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_b_fkey_1730151"
DETAIL: Key (a, b)=(1, 2) is not present in table "referenced_table_xxxxxxx".
CONTEXT: while executing command on localhost:xxxxx
ROLLBACK;
@ -1498,7 +1498,7 @@ BEGIN;
INSERT INTO referencing_table VALUES (1, 2);
-- fails
INSERT INTO referencing_table VALUES (2, 2);
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730199"
ERROR: insert or update on table "referencing_table_xxxxxxx" violates foreign key constraint "referencing_table_a_fkey_1730191"
DETAIL: Key (a)=(2) is not present in table "referenced_table_xxxxxxx".
CONTEXT: while executing command on localhost:xxxxx
ROLLBACK;

@ -196,9 +196,6 @@ SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").kill()');
SELECT create_distributed_table('test_table', 'id');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$create_distributed_table_non_empty_failure.test_table$$)
ERROR: failed to COPY to shard xxxxx on localhost:xxxxx
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
count
@ -231,9 +228,6 @@ SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY").cancel(' || pg_b
SELECT create_distributed_table('test_table', 'id');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$create_distributed_table_non_empty_failure.test_table$$)
ERROR: canceling statement due to user request
SELECT count(*) FROM pg_dist_shard WHERE logicalrelid='create_distributed_table_non_empty_failure.test_table'::regclass;
count

@ -107,9 +107,6 @@ SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY 3").kill()');
SELECT create_reference_table('ref_table');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$failure_reference_table.ref_table$$)
ERROR: failed to COPY to shard xxxxx on localhost:xxxxx
SELECT count(*) FROM pg_dist_shard_placement;
count
@ -126,9 +123,6 @@ SELECT citus.mitmproxy('conn.onCommandComplete(command="COPY 3").cancel(' || pg_
SELECT create_reference_table('ref_table');
NOTICE: Copying data from local table...
NOTICE: copying the data has completed
DETAIL: The local data in the table is no longer visible, but is still on disk.
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$failure_reference_table.ref_table$$)
ERROR: canceling statement due to user request
SELECT count(*) FROM pg_dist_shard_placement;
count

@ -1399,9 +1399,11 @@ SELECT * FROM multi_extension.print_extension_changes();
-- Snapshot of state at 12.1-1
ALTER EXTENSION citus UPDATE TO '12.1-1';
SELECT * FROM multi_extension.print_extension_changes();
previous_object | current_object
previous_object | current_object
---------------------------------------------------------------------
(0 rows)
| function citus_internal_delete_placement_metadata(bigint) void
| function citus_internal_update_none_dist_table_metadata(oid,"char",bigint,boolean) void
(2 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version

@ -856,7 +856,7 @@ SELECT create_reference_table('reference_table_second');
CREATE TABLE referenced_local_table(id int PRIMARY KEY, other_column int);
DROP TABLE reference_table CASCADE;
NOTICE: drop cascades to constraint reference_table_second_referencing_column_fkey on table reference_table_second
NOTICE: drop cascades to constraint reference_table_second_referencing_column_fkey_1350654 on table public.reference_table_second_1350654
NOTICE: drop cascades to constraint reference_table_second_referencing_column_fkey_1350653 on table public.reference_table_second_1350653
CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name, drop_shards_metadata_only := false)"
PL/pgSQL function citus_drop_trigger() line XX at PERFORM
CREATE TABLE reference_table(id int, referencing_column int REFERENCES referenced_local_table(id));
@ -917,7 +917,7 @@ DROP TABLE reference_table CASCADE;
NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to constraint fk on table references_to_reference_table
drop cascades to constraint fk on table reference_table_second
NOTICE: drop cascades to constraint fk_1350663 on table public.reference_table_second_1350663
NOTICE: drop cascades to constraint fk_1350662 on table public.reference_table_second_1350662
CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name, drop_shards_metadata_only := false)"
PL/pgSQL function citus_drop_trigger() line XX at PERFORM
CREATE TABLE reference_table(id int PRIMARY KEY, referencing_column int);
@ -1277,6 +1277,6 @@ ERROR: cannot create foreign key constraint since Citus does not support ON DEL
-- we no longer need those tables
DROP TABLE referenced_by_reference_table, references_to_reference_table, reference_table, reference_table_second, referenced_local_table, self_referencing_reference_table, dropfkeytest2,
set_on_default_test_referenced, set_on_default_test_referencing;
NOTICE: drop cascades to constraint fk_1350664 on table public.reference_table_1350664
NOTICE: drop cascades to constraint fk_1350663 on table public.reference_table_1350663
CONTEXT: SQL statement "SELECT citus_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name, drop_shards_metadata_only := false)"
PL/pgSQL function citus_drop_trigger() line XX at PERFORM

@ -793,7 +793,7 @@ WHERE
ORDER BY 1,4,5;
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1370021 | 1 | 0 | localhost | 57637
1370019 | 1 | 0 | localhost | 57637
(1 row)
-- we should see the two shard placements after activation
@ -818,7 +818,7 @@ WHERE
ORDER BY 1,4,5;
shardid | shardstate | shardlength | nodename | nodeport
---------------------------------------------------------------------
1370021 | 1 | 0 | localhost | 57637
1370019 | 1 | 0 | localhost | 57637
(1 row)
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);

@ -299,3 +299,230 @@ RETURNS jsonb AS $func$
RETURN result;
END;
$func$ LANGUAGE plpgsql;
-- Returns true if all shard placements of the given table have the given number of indexes.
CREATE OR REPLACE FUNCTION verify_index_count_on_shard_placements(
qualified_table_name text,
n_expected_indexes int)
RETURNS BOOLEAN
AS $func$
DECLARE
v_result boolean;
BEGIN
SELECT n_expected_indexes = ALL(
SELECT result::int
FROM run_command_on_placements(
qualified_table_name,
$$SELECT COUNT(*) FROM pg_index WHERE indrelid::regclass = '%s'::regclass$$
)
) INTO v_result;
RETURN v_result;
END;
$func$ LANGUAGE plpgsql;
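-- Illustrative usage (hypothetical table name), mirroring how the conversion tests
-- in this commit call it:
-- SELECT public.verify_index_count_on_shard_placements('some_schema.some_table', 2);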
-- Returns the names of the foreign keys that the shards of the given table are
-- involved in (as either the referencing or the referenced side).
CREATE OR REPLACE FUNCTION get_fkey_names_on_placements(
qualified_table_name text)
RETURNS TABLE (
on_node text,
shard_id bigint,
fkey_names text[]
)
AS $func$
BEGIN
RETURN QUERY SELECT
CASE WHEN groupid = 0 THEN 'on_coordinator' ELSE 'on_worker' END AS on_node_col,
shardid,
(CASE WHEN result = '' THEN '{}' ELSE result END)::text[] AS fkey_names_col
FROM run_command_on_placements(
qualified_table_name,
$$SELECT array_agg(conname ORDER BY conname) FROM pg_constraint WHERE '%s'::regclass IN (conrelid, confrelid) AND contype = 'f'$$
)
JOIN pg_dist_node USING (nodename, nodeport);
END;
$func$ LANGUAGE plpgsql;
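-- Illustrative usage (hypothetical table name), e.g. to assert how many foreign
-- keys each shard placement is involved in:
-- SELECT cardinality(fkey_names) = 2 FROM public.get_fkey_names_on_placements('some_schema.some_table');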
-- Returns true if all shard placements of the given table have the given number of partitions.
CREATE OR REPLACE FUNCTION verify_partition_count_on_placements(
qualified_table_name text,
n_expected_partitions int)
RETURNS BOOLEAN
AS $func$
DECLARE
v_result boolean;
BEGIN
SELECT n_expected_partitions = ALL(
SELECT result::int
FROM run_command_on_placements(
qualified_table_name,
$$SELECT COUNT(*) FROM pg_inherits WHERE inhparent = '%s'::regclass;$$
)
) INTO v_result;
RETURN v_result;
END;
$func$ LANGUAGE plpgsql;
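-- Illustrative usage (hypothetical table name):
-- SELECT public.verify_partition_count_on_placements('some_schema.some_partitioned_table', 2);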
-- This function checks pg_dist_placement on all nodes and returns true if all of the following hold:
-- The shard placement is on the coordinator or on an active primary worker node, matching expect_placement_on_coord.
-- The given shard id is used for the shard placement of the table.
-- The placement metadata is correct on all nodes.
CREATE OR REPLACE FUNCTION verify_shard_placement_for_single_shard_table(
qualified_table_name text,
expected_shard_id bigint,
expect_placement_on_coord boolean)
RETURNS BOOLEAN
AS $func$
DECLARE
verify_workers_query text;
nodename_nodeport_groupid record;
result boolean;
BEGIN
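-- look up the expected shard's placement; only active primary nodes that should
-- have shards qualify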
SELECT nodename, nodeport, groupid INTO nodename_nodeport_groupid
FROM pg_dist_shard
JOIN pg_dist_placement USING (shardid)
JOIN pg_dist_node USING (groupid)
WHERE noderole = 'primary' AND shouldhaveshards AND isactive AND
logicalrelid = qualified_table_name::regclass AND shardid = expected_shard_id;
IF nodename_nodeport_groupid IS NULL
THEN
RAISE NOTICE 'Shard placement is not on a primary worker node';
RETURN false;
END IF;
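-- groupid 0 denotes the coordinator's node group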
IF (nodename_nodeport_groupid.groupid = 0) != expect_placement_on_coord
THEN
RAISE NOTICE 'Shard placement is on an unexpected node';
RETURN false;
END IF;
-- verify that metadata on workers is correct too
SELECT format(
'SELECT true = ALL(
SELECT result::boolean FROM run_command_on_workers($$
SELECT COUNT(*) = 1
FROM pg_dist_shard
JOIN pg_dist_placement USING (shardid)
JOIN pg_dist_node USING (groupid)
WHERE logicalrelid = ''%s''::regclass AND
shardid = %s AND
nodename = ''%s'' AND
nodeport = %s AND
groupid = %s
$$)
);',
qualified_table_name, expected_shard_id,
nodename_nodeport_groupid.nodename,
nodename_nodeport_groupid.nodeport,
nodename_nodeport_groupid.groupid
)
INTO verify_workers_query;
EXECUTE verify_workers_query INTO result;
RETURN result;
END;
$func$ LANGUAGE plpgsql;
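-- Illustrative usage (hypothetical identifiers); the last argument is whether the
-- placement is expected to be on the coordinator:
-- SELECT public.verify_shard_placement_for_single_shard_table('some_schema.some_table', 102008, false);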
-- This function checks pg_dist_placement on all nodes and returns true if all of the following hold:
-- A shard placement exists on the coordinator and on every primary worker node.
-- The given shard id is used for the shard placements of the table.
-- The given placement id is used for the coordinator's shard placement.
-- The placement metadata is correct on all nodes.
CREATE OR REPLACE FUNCTION verify_shard_placements_for_reference_table(
qualified_table_name text,
expected_shard_id bigint,
expected_coord_placement_id bigint)
RETURNS BOOLEAN
AS $func$
DECLARE
verify_workers_query text;
result boolean;
BEGIN
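-- build a query that runs on every node and checks that the expected shard has
-- one placement per active primary node, with the coordinator placement
-- (groupid 0) using the expected placement id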
SELECT format(
'SELECT true = ALL(
SELECT result::boolean FROM run_command_on_all_nodes($$
SELECT
(SELECT COUNT(*) FROM pg_dist_node WHERE noderole = ''primary'' AND isactive) =
(SELECT COUNT(*)
FROM pg_dist_shard
JOIN pg_dist_placement USING (shardid)
JOIN pg_dist_node USING (groupid)
WHERE noderole = ''primary'' AND isactive AND
logicalrelid = ''%s''::regclass AND shardid = %s)
AND
(SELECT COUNT(*) = 1
FROM pg_dist_shard
JOIN pg_dist_placement USING (shardid)
JOIN pg_dist_node USING (groupid)
WHERE noderole = ''primary'' AND isactive AND
logicalrelid = ''%s''::regclass AND shardid = %s AND
placementid = %s AND groupid = 0)
$$)
);',
qualified_table_name, expected_shard_id,
qualified_table_name, expected_shard_id,
expected_coord_placement_id
)
INTO verify_workers_query;
EXECUTE verify_workers_query INTO result;
RETURN result;
END;
$func$ LANGUAGE plpgsql;
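-- Illustrative usage (hypothetical identifiers):
-- SELECT public.verify_shard_placements_for_reference_table('some_schema.some_ref_table', 102010, 202010);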
-- This function checks pg_dist_partition on all nodes and returns true if the metadata
-- record for the given single-shard table is correct.
CREATE OR REPLACE FUNCTION verify_pg_dist_partition_for_single_shard_table(
qualified_table_name text)
RETURNS BOOLEAN
AS $func$
DECLARE
verify_workers_query text;
result boolean;
BEGIN
SELECT format(
'SELECT true = ALL(
SELECT result::boolean FROM run_command_on_all_nodes($$
SELECT COUNT(*) = 1
FROM pg_dist_partition
WHERE logicalrelid = ''%s''::regclass AND
partmethod = ''n'' AND
partkey IS NULL AND
colocationid > 0 AND
repmodel = ''s'' AND
autoconverted = false
$$)
);',
qualified_table_name)
INTO verify_workers_query;
EXECUTE verify_workers_query INTO result;
RETURN result;
END;
$func$ LANGUAGE plpgsql;
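-- Note: partmethod 'n' with a NULL partkey marks a table distributed without a
-- shard key, and repmodel 's' (streaming replication) is what single-shard
-- tables use.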
-- This function checks pg_dist_partition on all nodes and returns true if the metadata
-- record for the given reference table is correct.
CREATE OR REPLACE FUNCTION verify_pg_dist_partition_for_reference_table(
qualified_table_name text)
RETURNS BOOLEAN
AS $func$
DECLARE
verify_workers_query text;
result boolean;
BEGIN
SELECT format(
'SELECT true = ALL(
SELECT result::boolean FROM run_command_on_all_nodes($$
SELECT COUNT(*) = 1
FROM pg_dist_partition
WHERE logicalrelid = ''%s''::regclass AND
partmethod = ''n'' AND
partkey IS NULL AND
colocationid > 0 AND
repmodel = ''t'' AND
autoconverted = false
$$)
);',
qualified_table_name)
INTO verify_workers_query;
EXECUTE verify_workers_query INTO result;
RETURN result;
END;
$func$ LANGUAGE plpgsql;
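-- Note: the only difference from the single-shard check above is the replication
-- model; reference tables use repmodel 't' (two-phase commit).
-- Illustrative usage (hypothetical table name):
-- SELECT public.verify_pg_dist_partition_for_reference_table('some_schema.some_ref_table');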

@ -868,7 +868,7 @@ SELECT create_reference_table('FKTABLE');
(1 row)
-- show that the definition is expected
SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = 'fktable'::regclass::oid ORDER BY oid;
SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = 'fktable'::regclass::oid ORDER BY 1;
pg_get_constraintdef
---------------------------------------------------------------------
FOREIGN KEY (tid, fk_id_del_set_default) REFERENCES pktable(tid, id) ON DELETE SET DEFAULT (fk_id_del_set_default)
@ -881,8 +881,8 @@ SET search_path TO pg15;
SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = 'fktable'::regclass::oid ORDER BY oid;
pg_get_constraintdef
---------------------------------------------------------------------
FOREIGN KEY (tid, fk_id_del_set_default) REFERENCES pktable(tid, id) ON DELETE SET DEFAULT (fk_id_del_set_default)
FOREIGN KEY (tid, fk_id_del_set_null) REFERENCES pktable(tid, id) ON DELETE SET NULL (fk_id_del_set_null)
FOREIGN KEY (tid, fk_id_del_set_default) REFERENCES pktable(tid, id) ON DELETE SET DEFAULT (fk_id_del_set_default)
(2 rows)
-- also, make sure that it works as expected

@ -391,8 +391,8 @@ SELECT EXISTS(
(1 row)
INSERT INTO tenant_4.another_partitioned_table VALUES (1, 'a');
ERROR: insert or update on table "another_partitioned_table_child_1920090" violates foreign key constraint "another_partitioned_table_a_fkey_1920089"
DETAIL: Key (a)=(1) is not present in table "partitioned_table_1920087".
ERROR: insert or update on table "another_partitioned_table_child_1920088" violates foreign key constraint "another_partitioned_table_a_fkey_1920087"
DETAIL: Key (a)=(1) is not present in table "partitioned_table_1920085".
CONTEXT: while executing command on localhost:xxxxx
INSERT INTO tenant_4.partitioned_table VALUES (1, 'a');
INSERT INTO tenant_4.another_partitioned_table VALUES (1, 'a');

@ -177,8 +177,119 @@ WHERE logicalrelid = 'tenant_1.tbl_1'::regclass AND
(1 row)
RESET citus.enable_schema_based_sharding;
-- Test lazy conversion from Citus local to single-shard tables
-- and reference tables, on a single node. This means that no shard
-- replication should be needed.
CREATE TABLE ref_table_conversion_test (
a int PRIMARY KEY
);
SELECT citus_add_local_table_to_metadata('ref_table_conversion_test');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
-- save old shardid and placementid
SELECT get_shard_id_for_distribution_column('single_node.ref_table_conversion_test') AS ref_table_conversion_test_old_shard_id \gset
SELECT placementid AS ref_table_conversion_test_old_coord_placement_id FROM pg_dist_placement WHERE shardid = :ref_table_conversion_test_old_shard_id \gset
SELECT create_reference_table('ref_table_conversion_test');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT public.verify_pg_dist_partition_for_reference_table('single_node.ref_table_conversion_test');
verify_pg_dist_partition_for_reference_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_shard_placements_for_reference_table('single_node.ref_table_conversion_test',
:ref_table_conversion_test_old_shard_id,
:ref_table_conversion_test_old_coord_placement_id);
verify_shard_placements_for_reference_table
---------------------------------------------------------------------
t
(1 row)
CREATE TABLE single_shard_conversion_test_1 (
int_col_1 int PRIMARY KEY,
text_col_1 text UNIQUE,
int_col_2 int
);
SELECT citus_add_local_table_to_metadata('single_shard_conversion_test_1');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
-- save old shardid
SELECT get_shard_id_for_distribution_column('single_node.single_shard_conversion_test_1') AS single_shard_conversion_test_1_old_shard_id \gset
SELECT create_distributed_table('single_shard_conversion_test_1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT public.verify_pg_dist_partition_for_single_shard_table('single_node.single_shard_conversion_test_1');
verify_pg_dist_partition_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_shard_placement_for_single_shard_table('single_node.single_shard_conversion_test_1', :single_shard_conversion_test_1_old_shard_id, true);
verify_shard_placement_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
CREATE TABLE single_shard_conversion_test_2 (
int_col_1 int
);
SELECT citus_add_local_table_to_metadata('single_shard_conversion_test_2');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
-- save old shardid
SELECT get_shard_id_for_distribution_column('single_node.single_shard_conversion_test_2') AS single_shard_conversion_test_2_old_shard_id \gset
SELECT create_distributed_table('single_shard_conversion_test_2', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT public.verify_pg_dist_partition_for_single_shard_table('single_node.single_shard_conversion_test_2');
verify_pg_dist_partition_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_shard_placement_for_single_shard_table('single_node.single_shard_conversion_test_2', :single_shard_conversion_test_2_old_shard_id, true);
verify_shard_placement_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
-- make sure that they're created in different colocation groups
SELECT
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'single_node.single_shard_conversion_test_1'::regclass
)
!=
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'single_node.single_shard_conversion_test_2'::regclass
);
?column?
---------------------------------------------------------------------
t
(1 row)
SET client_min_messages TO WARNING;
DROP TABLE failover_to_local, single_node_nullkey_c1, single_node_nullkey_c2;
DROP TABLE failover_to_local, single_node_nullkey_c1, single_node_nullkey_c2, ref_table_conversion_test, single_shard_conversion_test_1, single_shard_conversion_test_2;
DROP SCHEMA tenant_1 CASCADE;
RESET client_min_messages;
-- so that we don't have to update the rest of the test output

@ -177,8 +177,119 @@ WHERE logicalrelid = 'tenant_1.tbl_1'::regclass AND
(1 row)
RESET citus.enable_schema_based_sharding;
-- Test lazy conversion from Citus local to single-shard tables
-- and reference tables, on a single node. This means that no shard
-- replication should be needed.
CREATE TABLE ref_table_conversion_test (
a int PRIMARY KEY
);
SELECT citus_add_local_table_to_metadata('ref_table_conversion_test');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
-- save old shardid and placementid
SELECT get_shard_id_for_distribution_column('single_node.ref_table_conversion_test') AS ref_table_conversion_test_old_shard_id \gset
SELECT placementid AS ref_table_conversion_test_old_coord_placement_id FROM pg_dist_placement WHERE shardid = :ref_table_conversion_test_old_shard_id \gset
SELECT create_reference_table('ref_table_conversion_test');
create_reference_table
---------------------------------------------------------------------
(1 row)
SELECT public.verify_pg_dist_partition_for_reference_table('single_node.ref_table_conversion_test');
verify_pg_dist_partition_for_reference_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_shard_placements_for_reference_table('single_node.ref_table_conversion_test',
:ref_table_conversion_test_old_shard_id,
:ref_table_conversion_test_old_coord_placement_id);
verify_shard_placements_for_reference_table
---------------------------------------------------------------------
t
(1 row)
CREATE TABLE single_shard_conversion_test_1 (
int_col_1 int PRIMARY KEY,
text_col_1 text UNIQUE,
int_col_2 int
);
SELECT citus_add_local_table_to_metadata('single_shard_conversion_test_1');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
-- save old shardid
SELECT get_shard_id_for_distribution_column('single_node.single_shard_conversion_test_1') AS single_shard_conversion_test_1_old_shard_id \gset
SELECT create_distributed_table('single_shard_conversion_test_1', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT public.verify_pg_dist_partition_for_single_shard_table('single_node.single_shard_conversion_test_1');
verify_pg_dist_partition_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_shard_placement_for_single_shard_table('single_node.single_shard_conversion_test_1', :single_shard_conversion_test_1_old_shard_id, true);
verify_shard_placement_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
CREATE TABLE single_shard_conversion_test_2 (
int_col_1 int
);
SELECT citus_add_local_table_to_metadata('single_shard_conversion_test_2');
citus_add_local_table_to_metadata
---------------------------------------------------------------------
(1 row)
-- save old shardid
SELECT get_shard_id_for_distribution_column('single_node.single_shard_conversion_test_2') AS single_shard_conversion_test_2_old_shard_id \gset
SELECT create_distributed_table('single_shard_conversion_test_2', null, colocate_with=>'none');
create_distributed_table
---------------------------------------------------------------------
(1 row)
SELECT public.verify_pg_dist_partition_for_single_shard_table('single_node.single_shard_conversion_test_2');
verify_pg_dist_partition_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
SELECT public.verify_shard_placement_for_single_shard_table('single_node.single_shard_conversion_test_2', :single_shard_conversion_test_2_old_shard_id, true);
verify_shard_placement_for_single_shard_table
---------------------------------------------------------------------
t
(1 row)
-- make sure that they're created on different colocation groups
SELECT
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'single_node.single_shard_conversion_test_1'::regclass
)
!=
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'single_node.single_shard_conversion_test_2'::regclass
);
?column?
---------------------------------------------------------------------
t
(1 row)
SET client_min_messages TO WARNING;
DROP TABLE failover_to_local, single_node_nullkey_c1, single_node_nullkey_c2;
DROP TABLE failover_to_local, single_node_nullkey_c1, single_node_nullkey_c2, ref_table_conversion_test, single_shard_conversion_test_1, single_shard_conversion_test_2;
DROP SCHEMA tenant_1 CASCADE;
RESET client_min_messages;
-- so that we don't have to update the rest of the test output
@ -73,6 +73,7 @@ ORDER BY 1;
function citus_internal_adjust_local_clock_to_remote(cluster_clock)
function citus_internal_delete_colocation_metadata(integer)
function citus_internal_delete_partition_metadata(regclass)
function citus_internal_delete_placement_metadata(bigint)
function citus_internal_delete_shard_metadata(bigint)
function citus_internal_delete_tenant_schema(oid)
function citus_internal_global_blocked_processes()
@ -82,6 +83,7 @@ ORDER BY 1;
function citus_internal_start_replication_origin_tracking()
function citus_internal_stop_replication_origin_tracking()
function citus_internal_unregister_tenant_schema_globally(oid,text)
function citus_internal_update_none_dist_table_metadata(oid,"char",bigint,boolean)
function citus_internal_update_placement_metadata(bigint,integer,integer)
function citus_internal_update_relation_colocation(oid,integer)
function citus_is_clock_after(cluster_clock,cluster_clock)
@ -338,5 +340,5 @@ ORDER BY 1;
view citus_stat_tenants_local
view pg_dist_shard_placement
view time_partitions
(328 rows)
(330 rows)
@ -16,10 +16,10 @@
# Tests around schema changes, these are run first, so there's no preexisting objects.
# ---
test: multi_extension
test: multi_test_helpers multi_test_helpers_superuser multi_create_fdw
test: single_node
test: relation_access_tracking_single_node
test: single_node_truncate
test: multi_test_helpers multi_test_helpers_superuser multi_create_fdw
test: multi_cluster_management
# below tests are placed right after multi_cluster_management as we do
@ -1,8 +1,14 @@
SET search_path TO truncate_cascade_tests_schema;
-- Hide the detail of the truncate error because it might reference either
-- table_with_fk_1 or table_with_fk_2 in the error message.
\set VERBOSITY TERSE
-- Test truncate error on table with dependencies
TRUNCATE table_with_pk;
\set VERBOSITY DEFAULT
-- Test truncate rollback on table with dependencies
SELECT COUNT(*) FROM table_with_fk_1;
SELECT COUNT(*) FROM table_with_fk_2;
@ -214,5 +214,368 @@ BEGIN;
SELECT create_distributed_table('citus_local_table_5', 'col_1', 'append');
ROLLBACK;
\set VERBOSITY DEFAULT
-- Test the UDFs that we use to convert Citus local tables to single-shard tables and
-- reference tables.
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(1, 't', 1, true);
SELECT pg_catalog.citus_internal_delete_placement_metadata(1);
CREATE ROLE test_user_create_ref_dist WITH LOGIN;
GRANT ALL ON SCHEMA create_ref_dist_from_citus_local TO test_user_create_ref_dist;
ALTER SYSTEM SET citus.enable_manual_metadata_changes_for_user TO 'test_user_create_ref_dist';
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
SET ROLE test_user_create_ref_dist;
SET citus.next_shard_id TO 1850000;
SET citus.next_placement_id TO 8510000;
SET citus.shard_replication_factor TO 1;
SET search_path TO create_ref_dist_from_citus_local;
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(null, 't', 1, true);
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(1, null, 1, true);
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(1, 't', null, true);
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata(1, 't', 1, null);
SELECT pg_catalog.citus_internal_delete_placement_metadata(null);
CREATE TABLE udf_test (col_1 int);
SELECT citus_add_local_table_to_metadata('udf_test');
BEGIN;
SELECT pg_catalog.citus_internal_update_none_dist_table_metadata('create_ref_dist_from_citus_local.udf_test'::regclass, 'k', 99999, true);
SELECT COUNT(*)=1 FROM pg_dist_partition
WHERE logicalrelid = 'create_ref_dist_from_citus_local.udf_test'::regclass AND repmodel = 'k' AND colocationid = 99999 AND autoconverted = true;
SELECT placementid AS udf_test_placementid FROM pg_dist_shard_placement
WHERE shardid = get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.udf_test') \gset
SELECT pg_catalog.citus_internal_delete_placement_metadata(:udf_test_placementid);
SELECT COUNT(*)=0 FROM pg_dist_placement WHERE placementid = :udf_test_placementid;
ROLLBACK;
RESET ROLE;
DROP TABLE udf_test;
REVOKE ALL ON SCHEMA create_ref_dist_from_citus_local FROM test_user_create_ref_dist;
DROP USER test_user_create_ref_dist;
ALTER SYSTEM RESET citus.enable_manual_metadata_changes_for_user;
SELECT pg_reload_conf();
SELECT pg_sleep(0.1);
-- Test lazy conversion from Citus local to single-shard tables and reference tables.
SET citus.next_shard_id TO 1860000;
SET citus.next_placement_id TO 8520000;
SET citus.shard_replication_factor TO 1;
SET search_path TO create_ref_dist_from_citus_local;
SET client_min_messages to ERROR;
INSERT INTO reference_table_1 VALUES (1, 1), (2, 2), (201, 201), (202, 202);
CREATE TABLE citus_local_table_7 (col_1 int UNIQUE);
INSERT INTO citus_local_table_7 VALUES (1), (2), (201), (202);
SELECT citus_add_local_table_to_metadata('citus_local_table_7');
CREATE TABLE fkey_test (
int_col_1 int PRIMARY KEY,
text_col_1 text UNIQUE,
int_col_2 int
);
INSERT INTO fkey_test VALUES (1, '1', 1), (2, '2', 2), (201, '201', 201), (202, '202', 202);
SELECT citus_add_local_table_to_metadata('fkey_test');
-- check unsupported foreign key constraints
ALTER TABLE reference_table_1 ADD CONSTRAINT ref_1_col_1_fkey_test_int_col_1 FOREIGN KEY (col_1) REFERENCES fkey_test(int_col_1);
SELECT create_distributed_table('fkey_test', null, colocate_with=>'none');
ALTER TABLE reference_table_1 DROP CONSTRAINT ref_1_col_1_fkey_test_int_col_1;
ALTER TABLE citus_local_table_7 ADD CONSTRAINT citus_local_1_col_1_fkey_test_int_col_1 FOREIGN KEY (col_1) REFERENCES fkey_test(int_col_1);
SELECT create_distributed_table('fkey_test', null, colocate_with=>'none');
ALTER TABLE citus_local_table_7 DROP CONSTRAINT citus_local_1_col_1_fkey_test_int_col_1;
ALTER TABLE fkey_test ADD CONSTRAINT fkey_test_int_col_1_citus_local_1_col_1 FOREIGN KEY (int_col_1) REFERENCES citus_local_table_7(col_1);
SELECT create_distributed_table('fkey_test', null, colocate_with=>'none');
ALTER TABLE fkey_test DROP CONSTRAINT fkey_test_int_col_1_citus_local_1_col_1;
CREATE TABLE tbl_1 (
int_col_1 int PRIMARY KEY,
text_col_1 text UNIQUE,
int_col_2 int
);
CREATE INDEX tbl_1_int_col_2_idx ON tbl_1 (int_col_2);
INSERT INTO tbl_1 VALUES (1, '1', 1), (2, '2', 2), (201, '201', 201), (202, '202', 202);
ALTER TABLE tbl_1 ADD CONSTRAINT tbl_1_int_col_1_ref_1_col_1 FOREIGN KEY (int_col_1) REFERENCES reference_table_1(col_1);
ALTER TABLE tbl_1 ADD CONSTRAINT tbl_1_int_col_2_ref_1_col_1 FOREIGN KEY (int_col_2) REFERENCES reference_table_1(col_1);
ALTER TABLE tbl_1 ADD CONSTRAINT tbl_1_int_col_2_tbl_1_int_col_1 FOREIGN KEY (int_col_2) REFERENCES tbl_1(int_col_1);
SELECT citus_add_local_table_to_metadata('tbl_1');
-- save old shardid
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.tbl_1') AS tbl_1_old_shard_id \gset
SELECT create_distributed_table('tbl_1', null, colocate_with=>'none');
-- check data
SELECT * FROM tbl_1 ORDER BY int_col_1;
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.tbl_1');
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.tbl_1', :tbl_1_old_shard_id, false);
SELECT public.verify_index_count_on_shard_placements('create_ref_dist_from_citus_local.tbl_1', 3);
SELECT cardinality(fkey_names) = 3 AS verify_fkey_count_on_shards FROM public.get_fkey_names_on_placements('create_ref_dist_from_citus_local.tbl_1');
-- test partitioning
CREATE TABLE tbl_2 (
int_col_1 int PRIMARY KEY,
text_col_1 text,
int_col_2 int
) PARTITION BY RANGE (int_col_1);
CREATE TABLE tbl_2_child_1 PARTITION OF tbl_2 FOR VALUES FROM (0) TO (100);
CREATE TABLE tbl_2_child_2 PARTITION OF tbl_2 FOR VALUES FROM (200) TO (300);
INSERT INTO tbl_2 VALUES (1, '1', 1), (2, '2', 2), (201, '201', 201), (202, '202', 202);
SELECT citus_add_local_table_to_metadata('tbl_2');
ALTER TABLE tbl_2 ADD CONSTRAINT tbl_2_int_col_1_ref_1_col_1 FOREIGN KEY (int_col_1) REFERENCES reference_table_1(col_1);
ALTER TABLE tbl_2 ADD CONSTRAINT tbl_2_int_col_2_ref_1_col_1 FOREIGN KEY (int_col_2) REFERENCES reference_table_1(col_1);
-- save old shardid
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.tbl_2') AS tbl_2_old_shard_id \gset
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.tbl_2_child_1') AS tbl_2_child_1_old_shard_id \gset
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.tbl_2_child_2') AS tbl_2_child_2_old_shard_id \gset
SELECT create_distributed_table('tbl_2', null, colocate_with=>'tbl_1');
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.tbl_2');
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.tbl_2', :tbl_2_old_shard_id, false);
SELECT public.verify_index_count_on_shard_placements('create_ref_dist_from_citus_local.tbl_2', 1);
SELECT cardinality(fkey_names) = 2 AS verify_fkey_count_on_shards FROM public.get_fkey_names_on_placements('create_ref_dist_from_citus_local.tbl_2');
SELECT public.verify_partition_count_on_placements('create_ref_dist_from_citus_local.tbl_2', 2);
-- verify the same for children
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.tbl_2_child_1');
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.tbl_2_child_1', :tbl_2_child_1_old_shard_id, false);
SELECT public.verify_index_count_on_shard_placements('create_ref_dist_from_citus_local.tbl_2_child_1', 1);
SELECT cardinality(fkey_names) = 2 AS verify_fkey_count_on_shards FROM public.get_fkey_names_on_placements('create_ref_dist_from_citus_local.tbl_2_child_1');
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.tbl_2_child_2');
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.tbl_2_child_2', :tbl_2_child_2_old_shard_id, false);
SELECT public.verify_index_count_on_shard_placements('create_ref_dist_from_citus_local.tbl_2_child_2', 1);
SELECT cardinality(fkey_names) = 2 AS verify_fkey_count_on_shards FROM public.get_fkey_names_on_placements('create_ref_dist_from_citus_local.tbl_2_child_2');
-- verify that placements of all 4 tables are on the same node
SELECT COUNT(DISTINCT(groupid)) = 1 FROM pg_dist_placement WHERE shardid IN (
:tbl_1_old_shard_id, :tbl_2_old_shard_id, :tbl_2_child_1_old_shard_id, :tbl_2_child_2_old_shard_id
);
-- verify the same by executing a router query that targets both tables
SET client_min_messages to DEBUG2;
SELECT COUNT(*) FROM tbl_1, tbl_2;
SET client_min_messages to ERROR;
CREATE TABLE reference_table_3(col_1 INT UNIQUE, col_2 INT UNIQUE);
INSERT INTO reference_table_3 VALUES (1, 1), (2, 2), (201, 201), (202, 202);
CREATE TABLE tbl_3 (
int_col_1 int PRIMARY KEY,
text_col_1 text,
int_col_2 int
) PARTITION BY RANGE (int_col_1);
CREATE TABLE tbl_3_child_1 PARTITION OF tbl_3 FOR VALUES FROM (0) TO (100);
ALTER TABLE tbl_3 ADD CONSTRAINT tbl_3_int_col_1_ref_1_col_1 FOREIGN KEY (int_col_1) REFERENCES reference_table_3(col_1);
SELECT create_reference_table('reference_table_3');
INSERT INTO tbl_3 VALUES (1, '1', 1), (2, '2', 2);
-- save old shardid
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.tbl_3') AS tbl_3_old_shard_id \gset
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.tbl_3_child_1') AS tbl_3_child_1_old_shard_id \gset
SELECT create_distributed_table('tbl_3', null, colocate_with=>'none');
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.tbl_3');
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.tbl_3', :tbl_3_old_shard_id, false);
SELECT public.verify_index_count_on_shard_placements('create_ref_dist_from_citus_local.tbl_3', 1);
SELECT cardinality(fkey_names) = 1 AS verify_fkey_count_on_shards FROM public.get_fkey_names_on_placements('create_ref_dist_from_citus_local.tbl_3');
SELECT public.verify_partition_count_on_placements('create_ref_dist_from_citus_local.tbl_3', 1);
-- verify the same for children
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.tbl_3_child_1');
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.tbl_3_child_1', :tbl_3_child_1_old_shard_id, false);
SELECT public.verify_index_count_on_shard_placements('create_ref_dist_from_citus_local.tbl_3_child_1', 1);
SELECT cardinality(fkey_names) = 1 AS verify_fkey_count_on_shards FROM public.get_fkey_names_on_placements('create_ref_dist_from_citus_local.tbl_3_child_1');
-- verify that placements of both tables are on the same node
SELECT COUNT(DISTINCT(groupid)) = 1 FROM pg_dist_placement WHERE shardid IN (
:tbl_3_old_shard_id, :tbl_3_child_1_old_shard_id
);
-- verify the same by executing a router query that targets the table
SET client_min_messages to DEBUG2;
SELECT COUNT(*) FROM tbl_3;
SET client_min_messages to ERROR;
CREATE TABLE single_shard_conversion_colocated_1 (
int_col_1 int PRIMARY KEY,
text_col_1 text UNIQUE,
int_col_2 int
);
SELECT citus_add_local_table_to_metadata('single_shard_conversion_colocated_1');
-- save old shardid
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.single_shard_conversion_colocated_1') AS single_shard_conversion_colocated_1_old_shard_id \gset
SELECT create_distributed_table('single_shard_conversion_colocated_1', null, colocate_with=>'none');
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.single_shard_conversion_colocated_1');
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.single_shard_conversion_colocated_1', :single_shard_conversion_colocated_1_old_shard_id, false);
CREATE TABLE single_shard_conversion_colocated_2 (
int_col_1 int
);
SELECT citus_add_local_table_to_metadata('single_shard_conversion_colocated_2');
-- save old shardid
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.single_shard_conversion_colocated_2') AS single_shard_conversion_colocated_2_old_shard_id \gset
SELECT create_distributed_table('single_shard_conversion_colocated_2', null, colocate_with=>'single_shard_conversion_colocated_1');
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.single_shard_conversion_colocated_2');
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.single_shard_conversion_colocated_2', :single_shard_conversion_colocated_2_old_shard_id, false);
-- make sure that they're created on the same colocation group
SELECT
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'create_ref_dist_from_citus_local.single_shard_conversion_colocated_1'::regclass
)
=
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'create_ref_dist_from_citus_local.single_shard_conversion_colocated_2'::regclass
);
-- verify that placements of both tables are on the same node
SELECT COUNT(DISTINCT(groupid)) = 1 FROM pg_dist_placement WHERE shardid IN (
:single_shard_conversion_colocated_1_old_shard_id, :single_shard_conversion_colocated_2_old_shard_id
);
CREATE TABLE single_shard_conversion_noncolocated_1 (
int_col_1 int
);
SELECT citus_add_local_table_to_metadata('single_shard_conversion_noncolocated_1');
-- save old shardid
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.single_shard_conversion_noncolocated_1') AS single_shard_conversion_noncolocated_1_old_shard_id \gset
SELECT create_distributed_table('single_shard_conversion_noncolocated_1', null, colocate_with=>'none');
SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.single_shard_conversion_noncolocated_1');
SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.single_shard_conversion_noncolocated_1', :single_shard_conversion_noncolocated_1_old_shard_id, false);
-- make sure that they're created on different colocation groups
SELECT
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'create_ref_dist_from_citus_local.single_shard_conversion_colocated_1'::regclass
)
!=
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'create_ref_dist_from_citus_local.single_shard_conversion_noncolocated_1'::regclass
);
-- Test creating a reference table from a Citus local table
-- (ref_table_conversion_test) that has foreign keys from/to Citus
-- local tables and reference tables:
--
-- citus_local_referencing ---------+     +----------> citus_local_referenced
--                                  |     ^
--                                  v     |
--                        ref_table_conversion_test
--                                  ^     |
--                                  |     v
-- reference_table_referencing ----+      +----------> reference_table_referenced
--
CREATE TABLE citus_local_referenced(a int PRIMARY KEY);
SELECT citus_add_local_table_to_metadata('citus_local_referenced');
INSERT INTO citus_local_referenced VALUES (1), (2), (3), (4);
CREATE TABLE reference_table_referenced(a int PRIMARY KEY);
SELECT create_reference_table('reference_table_referenced');
INSERT INTO reference_table_referenced VALUES (1), (2), (3), (4);
CREATE TABLE ref_table_conversion_test (
a int PRIMARY KEY
);
SELECT citus_add_local_table_to_metadata('ref_table_conversion_test');
ALTER TABLE ref_table_conversion_test ADD CONSTRAINT ref_table_a_citus_local_referenced_a FOREIGN KEY (a) REFERENCES citus_local_referenced(a);
ALTER TABLE ref_table_conversion_test ADD CONSTRAINT ref_table_a_reference_table_referenced_a FOREIGN KEY (a) REFERENCES reference_table_referenced(a);
INSERT INTO ref_table_conversion_test VALUES (1), (2), (3), (4);
CREATE INDEX ref_table_conversion_test_a_idx1 ON ref_table_conversion_test (a);
CREATE INDEX ref_table_conversion_test_a_idx2 ON ref_table_conversion_test (a);
CREATE TABLE citus_local_referencing(a int);
ALTER TABLE citus_local_referencing ADD CONSTRAINT citus_local_referencing_a_ref_table_a FOREIGN KEY (a) REFERENCES ref_table_conversion_test(a);
SELECT citus_add_local_table_to_metadata('citus_local_referencing');
INSERT INTO citus_local_referencing VALUES (1), (2), (3), (4);
CREATE TABLE reference_table_referencing(a int);
ALTER TABLE reference_table_referencing ADD CONSTRAINT reference_table_referencing_a_ref_table_a FOREIGN KEY (a) REFERENCES ref_table_conversion_test(a);
SELECT create_reference_table('reference_table_referencing');
INSERT INTO reference_table_referencing VALUES (1), (2), (3), (4);
-- save old shardid and placementid
SELECT get_shard_id_for_distribution_column('create_ref_dist_from_citus_local.ref_table_conversion_test') AS ref_table_conversion_test_old_shard_id \gset
SELECT placementid AS ref_table_conversion_test_old_coord_placement_id FROM pg_dist_placement WHERE shardid = :ref_table_conversion_test_old_shard_id \gset
SELECT create_reference_table('ref_table_conversion_test');
-- check data on all placements
SELECT result FROM run_command_on_all_nodes(
$$SELECT COUNT(*)=4 FROM create_ref_dist_from_citus_local.ref_table_conversion_test$$
);
SELECT public.verify_pg_dist_partition_for_reference_table('create_ref_dist_from_citus_local.ref_table_conversion_test');
SELECT public.verify_shard_placements_for_reference_table('create_ref_dist_from_citus_local.ref_table_conversion_test',
:ref_table_conversion_test_old_shard_id,
:ref_table_conversion_test_old_coord_placement_id);
SELECT public.verify_index_count_on_shard_placements('create_ref_dist_from_citus_local.ref_table_conversion_test', 3);
SELECT on_node, fkey_names FROM public.get_fkey_names_on_placements('create_ref_dist_from_citus_local.ref_table_conversion_test') ORDER BY 1,2;
CREATE TABLE dropped_column_test(a int, b int, c text not null, d text not null);
INSERT INTO dropped_column_test VALUES(1, null, 'text_1', 'text_2');
ALTER TABLE dropped_column_test DROP column b;
SELECT citus_add_local_table_to_metadata('dropped_column_test');
SELECT create_reference_table('dropped_column_test');
-- check data on all placements
SELECT result FROM run_command_on_all_nodes(
$$
SELECT jsonb_agg(q.*) FROM (
SELECT * FROM create_ref_dist_from_citus_local.dropped_column_test
) q
$$
);
SET citus.shard_replication_factor TO 2;
CREATE TABLE replication_factor_test(a int);
SELECT citus_add_local_table_to_metadata('replication_factor_test');
SELECT create_distributed_table('replication_factor_test', null);
SET citus.shard_replication_factor TO 1;
-- cleanup at exit
DROP SCHEMA create_ref_dist_from_citus_local CASCADE;
@ -316,3 +316,237 @@ RETURNS jsonb AS $func$
RETURN result;
END;
$func$ LANGUAGE plpgsql;
-- Returns true if all shard placements of given table have given number of indexes.
CREATE OR REPLACE FUNCTION verify_index_count_on_shard_placements(
qualified_table_name text,
n_expected_indexes int)
RETURNS BOOLEAN
AS $func$
DECLARE
v_result boolean;
BEGIN
SELECT n_expected_indexes = ALL(
    SELECT result::int
    FROM run_command_on_placements(
        qualified_table_name,
        $$SELECT COUNT(*) FROM pg_index WHERE indrelid::regclass = '%s'::regclass$$
    )
) INTO v_result;
RETURN v_result;
END;
$func$ LANGUAGE plpgsql;
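-- A typical invocation (this exact call appears in the conversion tests), kept
-- as a comment so that sourcing this helper file stays side-effect free:
--   SELECT public.verify_index_count_on_shard_placements('create_ref_dist_from_citus_local.tbl_1', 3);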
-- Returns names of the foreign keys that shards of given table are involved in
-- (as referencing or referenced one).
CREATE OR REPLACE FUNCTION get_fkey_names_on_placements(
qualified_table_name text)
RETURNS TABLE (
on_node text,
shard_id bigint,
fkey_names text[]
)
AS $func$
BEGIN
RETURN QUERY SELECT
CASE WHEN groupid = 0 THEN 'on_coordinator' ELSE 'on_worker' END AS on_node_col,
shardid,
(CASE WHEN result = '' THEN '{}' ELSE result END)::text[] AS fkey_names_col
FROM run_command_on_placements(
qualified_table_name,
$$SELECT array_agg(conname ORDER BY conname) FROM pg_constraint WHERE '%s'::regclass IN (conrelid, confrelid) AND contype = 'f'$$
)
JOIN pg_dist_node USING (nodename, nodeport);
END;
$func$ LANGUAGE plpgsql;
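-- Example (commented out; mirrors the conversion tests), listing per-placement
-- foreign key names along with the node each placement lives on:
--   SELECT on_node, fkey_names
--   FROM public.get_fkey_names_on_placements('create_ref_dist_from_citus_local.ref_table_conversion_test')
--   ORDER BY 1, 2;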
-- Returns true if all shard placements of given table have given number of partitions.
CREATE OR REPLACE FUNCTION verify_partition_count_on_placements(
qualified_table_name text,
n_expected_partitions int)
RETURNS BOOLEAN
AS $func$
DECLARE
v_result boolean;
BEGIN
SELECT n_expected_partitions = ALL(
    SELECT result::int
    FROM run_command_on_placements(
        qualified_table_name,
        $$SELECT COUNT(*) FROM pg_inherits WHERE inhparent = '%s'::regclass;$$
    )
) INTO v_result;
RETURN v_result;
END;
$func$ LANGUAGE plpgsql;
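-- Example (commented out; mirrors the conversion tests), checking that each
-- placement of the partitioned table carries both of its partitions:
--   SELECT public.verify_partition_count_on_placements('create_ref_dist_from_citus_local.tbl_2', 2);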
-- This function checks pg_dist_placement on all nodes and returns true if the following holds:
-- The shard placement is either on the coordinator or on a primary worker node, as dictated by expect_placement_on_coord.
-- Given shardid is used for the shard placement of the table.
-- Placement metadata is correct on all nodes.
CREATE OR REPLACE FUNCTION verify_shard_placement_for_single_shard_table(
qualified_table_name text,
expected_shard_id bigint,
expect_placement_on_coord boolean)
RETURNS BOOLEAN
AS $func$
DECLARE
verify_workers_query text;
nodename_nodeport_groupid record;
result boolean;
BEGIN
SELECT nodename, nodeport, groupid INTO nodename_nodeport_groupid
FROM pg_dist_shard
JOIN pg_dist_placement USING (shardid)
JOIN pg_dist_node USING (groupid)
WHERE noderole = 'primary' AND shouldhaveshards AND isactive AND
logicalrelid = qualified_table_name::regclass AND shardid = expected_shard_id;
IF nodename_nodeport_groupid IS NULL
THEN
RAISE NOTICE 'Shard placement is not on a primary worker node';
RETURN false;
END IF;
IF (nodename_nodeport_groupid.groupid = 0) != expect_placement_on_coord
THEN
RAISE NOTICE 'Shard placement is on an unexpected node';
RETURN false;
END IF;
-- verify that metadata on workers is correct too
SELECT format(
'SELECT true = ALL(
SELECT result::boolean FROM run_command_on_workers($$
SELECT COUNT(*) = 1
FROM pg_dist_shard
JOIN pg_dist_placement USING (shardid)
JOIN pg_dist_node USING (groupid)
WHERE logicalrelid = ''%s''::regclass AND
shardid = %s AND
nodename = ''%s'' AND
nodeport = %s AND
groupid = %s
$$)
);',
qualified_table_name, expected_shard_id,
nodename_nodeport_groupid.nodename,
nodename_nodeport_groupid.nodeport,
nodename_nodeport_groupid.groupid
)
INTO verify_workers_query;
EXECUTE verify_workers_query INTO result;
RETURN result;
END;
$func$ LANGUAGE plpgsql;
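-- Example (commented out): :tbl_1_old_shard_id is a psql variable captured
-- earlier via \gset; passing false means the placement is expected on a
-- worker rather than on the coordinator:
--   SELECT public.verify_shard_placement_for_single_shard_table('create_ref_dist_from_citus_local.tbl_1', :tbl_1_old_shard_id, false);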
-- This function checks pg_dist_placement on all nodes and returns true if the following holds:
-- A shard placement exists on the coordinator and on every primary worker node.
-- Given shardid is used for all shard placements of the table.
-- Given placementid is used for the coordinator shard placement.
-- Placement metadata is correct on all nodes.
CREATE OR REPLACE FUNCTION verify_shard_placements_for_reference_table(
qualified_table_name text,
expected_shard_id bigint,
expected_coord_placement_id bigint)
RETURNS BOOLEAN
AS $func$
DECLARE
verify_workers_query text;
result boolean;
BEGIN
SELECT format(
'SELECT true = ALL(
SELECT result::boolean FROM run_command_on_all_nodes($$
SELECT
(SELECT COUNT(*) FROM pg_dist_node WHERE noderole = ''primary'' AND isactive) =
(SELECT COUNT(*)
FROM pg_dist_shard
JOIN pg_dist_placement USING (shardid)
JOIN pg_dist_node USING (groupid)
WHERE noderole = ''primary'' AND isactive AND
logicalrelid = ''%s''::regclass AND shardid = %s)
AND
(SELECT COUNT(*) = 1
FROM pg_dist_shard
JOIN pg_dist_placement USING (shardid)
JOIN pg_dist_node USING (groupid)
WHERE noderole = ''primary'' AND isactive AND
logicalrelid = ''%s''::regclass AND shardid = %s AND
placementid = %s AND groupid = 0)
$$)
);',
qualified_table_name, expected_shard_id,
qualified_table_name, expected_shard_id,
expected_coord_placement_id
)
INTO verify_workers_query;
EXECUTE verify_workers_query INTO result;
RETURN result;
END;
$func$ LANGUAGE plpgsql;
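-- Example (commented out): both psql variables are captured via \gset before
-- calling create_reference_table(), so a true result proves the conversion
-- reused the old shard id and coordinator placement id:
--   SELECT public.verify_shard_placements_for_reference_table('create_ref_dist_from_citus_local.ref_table_conversion_test',
--                                                             :ref_table_conversion_test_old_shard_id,
--                                                             :ref_table_conversion_test_old_coord_placement_id);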
-- This function checks pg_dist_partition on all nodes and returns true if the metadata
-- record for given single-shard table is correct.
CREATE OR REPLACE FUNCTION verify_pg_dist_partition_for_single_shard_table(
qualified_table_name text)
RETURNS BOOLEAN
AS $func$
DECLARE
verify_workers_query text;
result boolean;
BEGIN
SELECT format(
'SELECT true = ALL(
SELECT result::boolean FROM run_command_on_all_nodes($$
SELECT COUNT(*) = 1
FROM pg_dist_partition
WHERE logicalrelid = ''%s''::regclass AND
partmethod = ''n'' AND
partkey IS NULL AND
colocationid > 0 AND
repmodel = ''s'' AND
autoconverted = false
$$)
);',
qualified_table_name)
INTO verify_workers_query;
EXECUTE verify_workers_query INTO result;
RETURN result;
END;
$func$ LANGUAGE plpgsql;
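-- Example (commented out): expected to return true right after converting a
-- Citus local table via create_distributed_table(..., null, ...), since that
-- sets partmethod 'n' with repmodel 's':
--   SELECT public.verify_pg_dist_partition_for_single_shard_table('create_ref_dist_from_citus_local.tbl_1');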
-- This function checks pg_dist_partition on all nodes and returns true if the metadata
-- record for given reference table is correct.
CREATE OR REPLACE FUNCTION verify_pg_dist_partition_for_reference_table(
qualified_table_name text)
RETURNS BOOLEAN
AS $func$
DECLARE
verify_workers_query text;
result boolean;
BEGIN
SELECT format(
'SELECT true = ALL(
SELECT result::boolean FROM run_command_on_all_nodes($$
SELECT COUNT(*) = 1
FROM pg_dist_partition
WHERE logicalrelid = ''%s''::regclass AND
partmethod = ''n'' AND
partkey IS NULL AND
colocationid > 0 AND
repmodel = ''t'' AND
autoconverted = false
$$)
);',
qualified_table_name)
INTO verify_workers_query;
EXECUTE verify_workers_query INTO result;
RETURN result;
END;
$func$ LANGUAGE plpgsql;
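-- Example (commented out): the reference-table counterpart of the check above,
-- expecting repmodel 't' instead of 's':
--   SELECT public.verify_pg_dist_partition_for_reference_table('create_ref_dist_from_citus_local.ref_table_conversion_test');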
@ -530,7 +530,7 @@ SELECT create_distributed_table('FKTABLE', 'tid');
SELECT create_reference_table('FKTABLE');
-- show that the definition is expected
SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = 'fktable'::regclass::oid ORDER BY oid;
SELECT pg_get_constraintdef(oid) FROM pg_constraint WHERE conrelid = 'fktable'::regclass::oid ORDER BY 1;
\c - - - :worker_1_port
@ -114,8 +114,68 @@ WHERE logicalrelid = 'tenant_1.tbl_1'::regclass AND
RESET citus.enable_schema_based_sharding;
-- Test lazy conversion from Citus local to single-shard tables
-- and reference tables, on single node. This means that no shard
-- replication should be needed.
CREATE TABLE ref_table_conversion_test (
a int PRIMARY KEY
);
SELECT citus_add_local_table_to_metadata('ref_table_conversion_test');
-- save old shardid and placementid
SELECT get_shard_id_for_distribution_column('single_node.ref_table_conversion_test') AS ref_table_conversion_test_old_shard_id \gset
SELECT placementid AS ref_table_conversion_test_old_coord_placement_id FROM pg_dist_placement WHERE shardid = :ref_table_conversion_test_old_shard_id \gset
SELECT create_reference_table('ref_table_conversion_test');
SELECT public.verify_pg_dist_partition_for_reference_table('single_node.ref_table_conversion_test');
SELECT public.verify_shard_placements_for_reference_table('single_node.ref_table_conversion_test',
:ref_table_conversion_test_old_shard_id,
:ref_table_conversion_test_old_coord_placement_id);
CREATE TABLE single_shard_conversion_test_1 (
int_col_1 int PRIMARY KEY,
text_col_1 text UNIQUE,
int_col_2 int
);
SELECT citus_add_local_table_to_metadata('single_shard_conversion_test_1');
-- save old shardid
SELECT get_shard_id_for_distribution_column('single_node.single_shard_conversion_test_1') AS single_shard_conversion_test_1_old_shard_id \gset
SELECT create_distributed_table('single_shard_conversion_test_1', null, colocate_with=>'none');
SELECT public.verify_pg_dist_partition_for_single_shard_table('single_node.single_shard_conversion_test_1');
SELECT public.verify_shard_placement_for_single_shard_table('single_node.single_shard_conversion_test_1', :single_shard_conversion_test_1_old_shard_id, true);
CREATE TABLE single_shard_conversion_test_2 (
int_col_1 int
);
SELECT citus_add_local_table_to_metadata('single_shard_conversion_test_2');
-- save old shardid
SELECT get_shard_id_for_distribution_column('single_node.single_shard_conversion_test_2') AS single_shard_conversion_test_2_old_shard_id \gset
SELECT create_distributed_table('single_shard_conversion_test_2', null, colocate_with=>'none');
SELECT public.verify_pg_dist_partition_for_single_shard_table('single_node.single_shard_conversion_test_2');
SELECT public.verify_shard_placement_for_single_shard_table('single_node.single_shard_conversion_test_2', :single_shard_conversion_test_2_old_shard_id, true);
-- make sure that they're created on different colocation groups
SELECT
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'single_node.single_shard_conversion_test_1'::regclass
)
!=
(
SELECT colocationid FROM pg_dist_partition
WHERE logicalrelid = 'single_node.single_shard_conversion_test_2'::regclass
);
SET client_min_messages TO WARNING;
DROP TABLE failover_to_local, single_node_nullkey_c1, single_node_nullkey_c2;
DROP TABLE failover_to_local, single_node_nullkey_c1, single_node_nullkey_c2, ref_table_conversion_test, single_shard_conversion_test_1, single_shard_conversion_test_2;
DROP SCHEMA tenant_1 CASCADE;
RESET client_min_messages;