diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 465e900ef..2d5bf7019 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -8,7 +8,7 @@ EXTENSION = citus EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ - 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 + 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -70,6 +70,8 @@ $(EXTENSION)--6.0-5.sql: $(EXTENSION)--6.0-4.sql $(EXTENSION)--6.0-4--6.0-5.sql cat $^ > $@ $(EXTENSION)--6.0-6.sql: $(EXTENSION)--6.0-5.sql $(EXTENSION)--6.0-5--6.0-6.sql cat $^ > $@ +$(EXTENSION)--6.0-7.sql: $(EXTENSION)--6.0-6.sql $(EXTENSION)--6.0-6--6.0-7.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.0-4--6.0-5.sql b/src/backend/distributed/citus--6.0-4--6.0-5.sql index 27f974428..08ee51a78 100644 --- a/src/backend/distributed/citus--6.0-4--6.0-5.sql +++ b/src/backend/distributed/citus--6.0-4--6.0-5.sql @@ -26,3 +26,4 @@ CREATE FUNCTION master_get_new_placementid() AS 'MODULE_PATHNAME', $$master_get_new_placementid$$; COMMENT ON FUNCTION master_get_new_placementid() IS 'fetch unique placementid'; + diff --git a/src/backend/distributed/citus--6.0-5--6.0-6.sql b/src/backend/distributed/citus--6.0-5--6.0-6.sql index 33456f43e..ec4a7cd86 100644 --- a/src/backend/distributed/citus--6.0-5--6.0-6.sql +++ b/src/backend/distributed/citus--6.0-5--6.0-6.sql @@ -12,3 +12,4 @@ CREATE FUNCTION column_name_to_column(table_name regclass, column_name text) AS 'MODULE_PATHNAME', $$column_name_to_column$$; COMMENT ON FUNCTION column_name_to_column(table_name regclass, column_name text) IS 'convert a column name to its textual Var representation'; + diff --git a/src/backend/distributed/citus--6.0-6--6.0-7.sql b/src/backend/distributed/citus--6.0-6--6.0-7.sql new file mode 100644 index 000000000..9fe34e250 --- /dev/null +++ b/src/backend/distributed/citus--6.0-6--6.0-7.sql @@ -0,0 +1,21 @@ +/* citus--6.0-6--6.0-7.sql */ + +CREATE FUNCTION pg_catalog.get_colocated_table_array(regclass) + RETURNS regclass[] + AS 'citus' + LANGUAGE C STRICT; + +CREATE OR REPLACE FUNCTION pg_catalog.master_move_shard_placement(shard_id bigint, + source_node_name text, + source_node_port integer, + target_node_name text, + target_node_port integer) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_move_shard_placement$$; +COMMENT ON FUNCTION pg_catalog.master_move_shard_placement(shard_id bigint, + source_node_name text, + source_node_port integer, + target_node_name text, + target_node_port integer) + IS 'move shard from remote node'; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index d417266d3..3b26d61b3 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.0-6' +default_version = '6.0-7' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index f7e94d51d..3ae2e8e10 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -18,7 +18,9 @@ #include #include "catalog/pg_class.h" +#include
"distributed/colocation_utils.h" #include "distributed/connection_cache.h" +#include "distributed/listutils.h" #include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_router_executor.h" @@ -36,68 +38,163 @@ /* local function forward declarations */ +static void EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName, + int32 sourceNodePort, char *targetNodeName, + int32 targetNodePort); +static void EnsureShardCanBeMoved(int64 shardId, char *sourceNodeName, + int32 sourceNodePort); static ShardPlacement * SearchShardPlacementInList(List *shardPlacementList, - text *nodeName, uint32 nodePort); + char *nodeName, uint32 nodePort, + bool missingOk); +static void CopyShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, + char *targetNodeName, int32 targetNodePort, + bool doRepair); +static List * CopyShardCommandList(ShardInterval *shardInterval, char *sourceNodeName, + int32 sourceNodePort); +static char * ConstructQualifiedShardName(ShardInterval *shardInterval); static List * RecreateTableDDLCommandList(Oid relationId); -static bool CopyDataFromFinalizedPlacement(Oid distributedTableId, int64 shardId, - ShardPlacement *healthyPlacement, - ShardPlacement *placementToRepair); - +static void SendCommandListInSingleTransaction(char *nodeName, int32 nodePort, + List *commandList); +static char * CitusExtensionOwnerName(void); /* declarations for dynamic loading */ PG_FUNCTION_INFO_V1(master_copy_shard_placement); +PG_FUNCTION_INFO_V1(master_move_shard_placement); /* - * master_copy_shard_placement implements a user-facing UDF to copy data from + * master_copy_shard_placement implements a user-facing UDF to repair data from * a healthy (source) node to an inactive (target) node. To accomplish this it * entirely recreates the table structure before copying all data. During this * time all modifications are paused to the shard. After successful repair, the * inactive placement is marked healthy and modifications may continue. If the * repair fails at any point, this function throws an error, leaving the node - * in an unhealthy state. + * in an unhealthy state. Please note that master_copy_shard_placement copies + * given shard along with its co-located shards. 
*/ Datum master_copy_shard_placement(PG_FUNCTION_ARGS) { int64 shardId = PG_GETARG_INT64(0); - text *sourceNodeName = PG_GETARG_TEXT_P(1); + text *sourceNodeNameText = PG_GETARG_TEXT_P(1); int32 sourceNodePort = PG_GETARG_INT32(2); - text *targetNodeName = PG_GETARG_TEXT_P(3); + text *targetNodeNameText = PG_GETARG_TEXT_P(3); int32 targetNodePort = PG_GETARG_INT32(4); + bool doRepair = true; + + char *sourceNodeName = text_to_cstring(sourceNodeNameText); + char *targetNodeName = text_to_cstring(targetNodeNameText); ShardInterval *shardInterval = LoadShardInterval(shardId); Oid distributedTableId = shardInterval->relationId; + List *colocatedTableList = ColocatedTableList(distributedTableId); + ListCell *colocatedTableCell = NULL; + List *colocatedShardList = ColocatedShardIntervalList(shardInterval); + ListCell *colocatedShardCell = NULL; - char *relationOwner = NULL; - List *shardPlacementList = NIL; + foreach(colocatedTableCell, colocatedTableList) + { + Oid colocatedTableId = lfirst_oid(colocatedTableCell); + char relationKind = '\0'; + + /* check that user has owner rights in all co-located tables */ + EnsureTableOwner(colocatedTableId); + + relationKind = get_rel_relkind(colocatedTableId); + if (relationKind == RELKIND_FOREIGN_TABLE) + { + char *relationName = get_rel_name(colocatedTableId); + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot repair shard"), + errdetail("Table %s is a foreign table. Repairing " + "shards backed by foreign tables is " + "not supported.", relationName))); + } + } + + /* we sort colocatedShardList so that lock operations will not cause any deadlocks */ + colocatedShardList = SortList(colocatedShardList, CompareShardIntervalsById); + foreach(colocatedShardCell, colocatedShardList) + { + ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); + uint64 colocatedShardId = colocatedShard->shardId; + + /* + * We've stopped data modifications of this shard, but we plan to move + * a placement to the healthy state, so we need to grab a shard metadata + * lock (in exclusive mode) as well. + */ + LockShardDistributionMetadata(colocatedShardId, ExclusiveLock); + + /* + * If our aim is repairing, we should be sure that there is an unhealthy + * placement in target node. We use EnsureShardCanBeRepaired function + * to be sure that there is an unhealthy placement in target node. If + * we just want to copy the shard without any repair, it is enough to use + * EnsureShardCanBeCopied which just checks there is a placement in source + * and no placement in target node. + */ + if (doRepair) + { + /* + * After #810 is fixed, we should remove this check and call EnsureShardCanBeRepaired + * for all shard ids + */ + if (colocatedShardId == shardId) + { + EnsureShardCanBeRepaired(colocatedShardId, sourceNodeName, sourceNodePort, + targetNodeName, targetNodePort); + } + else + { + EnsureShardCanBeMoved(colocatedShardId, sourceNodeName, sourceNodePort); + } + } + else + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("master_copy_shard_placement() without repair option " + "is only supported on Citus Enterprise"))); + } + } + + + /* CopyShardPlacement function copies given shard with its co-located shards */ + CopyShardPlacement(shardId, sourceNodeName, sourceNodePort, targetNodeName, + targetNodePort, doRepair); + + PG_RETURN_VOID(); +} + + +/* + * master_move_shard_placement moves given shard (and its co-located shards) from one + * node to the other node. 
+ */ +Datum +master_move_shard_placement(PG_FUNCTION_ARGS) +{ + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("master_move_shard_placement() is only supported on " + "Citus Enterprise"))); +} + + +/* + * EnsureShardCanBeRepaired checks if the given shard has a healthy placement in the source + * node and inactive node on the target node. + */ +static void +EnsureShardCanBeRepaired(int64 shardId, char *sourceNodeName, int32 sourceNodePort, + char *targetNodeName, int32 targetNodePort) +{ + List *shardPlacementList = ShardPlacementList(shardId); ShardPlacement *sourcePlacement = NULL; ShardPlacement *targetPlacement = NULL; - WorkerNode *targetNode = NULL; - List *ddlCommandList = NIL; - bool dataCopied = false; - char relationKind = '\0'; + bool missingSourceOk = false; + bool missingTargetOk = false; - EnsureTableOwner(distributedTableId); - - /* - * By taking an exclusive lock on the shard, we both stop all modifications - * (INSERT, UPDATE, or DELETE) and prevent concurrent repair operations from - * being able to operate on this shard. - */ - LockShardResource(shardId, ExclusiveLock); - - /* - * We've stopped data modifications of this shard, but we plan to move - * a placement to the healthy state, so we need to grab a shard metadata - * lock (in exclusive mode) as well. - */ - LockShardDistributionMetadata(shardId, ExclusiveLock); - - relationOwner = TableOwner(distributedTableId); - - shardPlacementList = ShardPlacementList(shardId); sourcePlacement = SearchShardPlacementInList(shardPlacementList, sourceNodeName, - sourceNodePort); + sourceNodePort, missingSourceOk); if (sourcePlacement->shardState != FILE_FINALIZED) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), @@ -105,65 +202,44 @@ master_copy_shard_placement(PG_FUNCTION_ARGS) } targetPlacement = SearchShardPlacementInList(shardPlacementList, targetNodeName, - targetNodePort); + targetNodePort, missingTargetOk); if (targetPlacement->shardState != FILE_INACTIVE) { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("target placement must be in inactive state"))); } - - relationKind = get_rel_relkind(distributedTableId); - if (relationKind == RELKIND_FOREIGN_TABLE) - { - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot repair shard"), - errdetail("Repairing shards backed by foreign tables is " - "not supported."))); - } - - targetNode = palloc0(sizeof(WorkerNode)); - strlcpy(targetNode->workerName, targetPlacement->nodeName, WORKER_LENGTH); - targetNode->workerPort = targetPlacement->nodePort; - - /* retrieve DDL commands needed to drop and recreate table*/ - ddlCommandList = RecreateTableDDLCommandList(distributedTableId); - - /* remove existing (unhealthy) placement row; CreateShardPlacements will recreate */ - DeleteShardPlacementRow(targetPlacement->shardId, targetPlacement->nodeName, - targetPlacement->nodePort); - - /* finally, drop/recreate remote table and add back row (in healthy state) */ - CreateShardPlacements(distributedTableId, shardId, ddlCommandList, relationOwner, - list_make1(targetNode), 0, 1); - - HOLD_INTERRUPTS(); - - dataCopied = CopyDataFromFinalizedPlacement(distributedTableId, shardId, - sourcePlacement, targetPlacement); - if (!dataCopied) - { - ereport(ERROR, (errmsg("could not copy shard data"), - errhint("Consult recent messages in the server logs for " - "details."))); - } - - RESUME_INTERRUPTS(); - - PG_RETURN_VOID(); } /* - * SearchShardPlacementInList searches a provided list for a shard placement - * with the specified node name 
and port. This function throws an error if no - * such placement exists in the provided list. + * EnsureShardCanBeMoved checks if the given shard has a placement in the source node but + * not on the target node. It is important to note that SearchShardPlacementInList + * function already generates error if given shard does not have a placement in the + * source node. Therefore we do not perform extra check. + */ +static void +EnsureShardCanBeMoved(int64 shardId, char *sourceNodeName, int32 sourceNodePort) +{ + List *shardPlacementList = ShardPlacementList(shardId); + bool missingSourceOk = false; + + /* Actual check is done in SearchShardPlacementInList */ + SearchShardPlacementInList(shardPlacementList, sourceNodeName, sourceNodePort, + missingSourceOk); +} + + +/* + * SearchShardPlacementInList searches a provided list for a shard placement with the + * specified node name and port. If missingOk is set to true, this function returns NULL + * if no such placement exists in the provided list, otherwise it throws an error. */ static ShardPlacement * -SearchShardPlacementInList(List *shardPlacementList, text *nodeNameText, uint32 nodePort) +SearchShardPlacementInList(List *shardPlacementList, char *nodeName, uint32 nodePort, bool + missingOk) { ListCell *shardPlacementCell = NULL; ShardPlacement *matchingPlacement = NULL; - char *nodeName = text_to_cstring(nodeNameText); foreach(shardPlacementCell, shardPlacementList) { @@ -179,6 +255,11 @@ SearchShardPlacementInList(List *shardPlacementList, text *nodeNameText, uint32 if (matchingPlacement == NULL) { + if (missingOk) + { + return NULL; + } + ereport(ERROR, (errcode(ERRCODE_DATA_EXCEPTION), errmsg("could not find placement matching \"%s:%d\"", nodeName, nodePort), @@ -189,6 +270,134 @@ SearchShardPlacementInList(List *shardPlacementList, text *nodeNameText, uint32 } +/* + * CopyShardPlacement copies a shard along with its co-located shards from a source node + * to target node. CopyShardPlacement does not make any checks about state of the shards. + * It is caller's responsibility to make those checks if they are necessary. + */ +static void +CopyShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, + char *targetNodeName, int32 targetNodePort, bool doRepair) +{ + ShardInterval *shardInterval = LoadShardInterval(shardId); + List *colocatedShardList = ColocatedShardIntervalList(shardInterval); + ListCell *colocatedShardCell = NULL; + List *ddlCommandList = NIL; + + foreach(colocatedShardCell, colocatedShardList) + { + ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); + List *colocatedShardDdlList = NIL; + + colocatedShardDdlList = CopyShardCommandList(colocatedShard, sourceNodeName, + sourceNodePort); + + ddlCommandList = list_concat(ddlCommandList, colocatedShardDdlList); + } + + HOLD_INTERRUPTS(); + + SendCommandListInSingleTransaction(targetNodeName, targetNodePort, ddlCommandList); + + foreach(colocatedShardCell, colocatedShardList) + { + ShardInterval *colocatedShard = (ShardInterval *) lfirst(colocatedShardCell); + uint64 colocatedShardId = colocatedShard->shardId; + + /* + * If we call this function for repair purposes, the caller should have + * removed the old shard placement metadata. 
+ */ + if (doRepair) + { + List *shardPlacementList = ShardPlacementList(colocatedShardId); + bool missingSourceOk = false; + + ShardPlacement *placement = SearchShardPlacementInList(shardPlacementList, + targetNodeName, + targetNodePort, + missingSourceOk); + + UpdateShardPlacementState(placement->placementId, FILE_FINALIZED); + } + else + { + InsertShardPlacementRow(colocatedShardId, INVALID_PLACEMENT_ID, + FILE_FINALIZED, ShardLength(colocatedShardId), + targetNodeName, + targetNodePort); + } + } + + RESUME_INTERRUPTS(); +} + + +/* + * CopyShardCommandList generates a command list to copy the given shard placement + * from the source node to the target node. + */ +static List * +CopyShardCommandList(ShardInterval *shardInterval, + char *sourceNodeName, int32 sourceNodePort) +{ + char *shardName = ConstructQualifiedShardName(shardInterval); + List *ddlCommandList = NIL; + ListCell *ddlCommandCell = NULL; + List *copyShardToNodeCommandsList = NIL; + StringInfo copyShardDataCommand = makeStringInfo(); + + ddlCommandList = RecreateTableDDLCommandList(shardInterval->relationId); + + foreach(ddlCommandCell, ddlCommandList) + { + char *ddlCommand = lfirst(ddlCommandCell); + char *escapedDdlCommand = quote_literal_cstr(ddlCommand); + + StringInfo applyDdlCommand = makeStringInfo(); + appendStringInfo(applyDdlCommand, + WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA, + shardInterval->shardId, escapedDdlCommand); + + copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList, + applyDdlCommand->data); + } + + appendStringInfo(copyShardDataCommand, WORKER_APPEND_TABLE_TO_SHARD, + quote_literal_cstr(shardName), /* table to append */ + quote_literal_cstr(shardName), /* remote table name */ + quote_literal_cstr(sourceNodeName), /* remote host */ + sourceNodePort); /* remote port */ + + copyShardToNodeCommandsList = lappend(copyShardToNodeCommandsList, + copyShardDataCommand->data); + + return copyShardToNodeCommandsList; +} + + +/* + * ConstructQualifiedShardName creates the fully qualified name string of the + * given shard in <schema>.<table_name>_<shard_id> format. + * + * FIXME: Copied from Citus-MX, should be removed once those changes checked-in to Citus. + */ +static char * +ConstructQualifiedShardName(ShardInterval *shardInterval) +{ + Oid schemaId = get_rel_namespace(shardInterval->relationId); + char *schemaName = get_namespace_name(schemaId); + char *tableName = get_rel_name(shardInterval->relationId); + char *shardName = NULL; + + shardName = pstrdup(tableName); + AppendShardIdToName(&shardName, shardInterval->shardId); + shardName = quote_qualified_identifier(schemaName, shardName); + + return shardName; +} + + /* * RecreateTableDDLCommandList returns a list of DDL statements similar to that * returned by GetTableDDLEvents except that the list begins with a "DROP TABLE" @@ -237,45 +446,84 @@ RecreateTableDDLCommandList(Oid relationId) /* - * CopyDataFromFinalizedPlacement copies a the data for a shard (identified by - * a relation and shard identifier) from a healthy placement to one needing - * repair. The unhealthy placement must already have an empty relation in place - * to receive rows from the healthy placement. This function returns a boolean - * indicating success or failure. + * SendCommandListInSingleTransaction opens a connection to the node with the given + * nodeName and nodePort. Then, the connection starts a transaction on the remote + * node and executes the commands in the transaction. The function raises an error if + * any of the queries fails.
+ * + * FIXME: Copied from Citus-MX, should be removed once those changes checked-in to Citus. */ -static bool -CopyDataFromFinalizedPlacement(Oid distributedTableId, int64 shardId, - ShardPlacement *healthyPlacement, - ShardPlacement *placementToRepair) +static void +SendCommandListInSingleTransaction(char *nodeName, int32 nodePort, List *commandList) { - const char *shardTableName = NULL; - const char *shardQualifiedName = NULL; - StringInfo copyRelationQuery = makeStringInfo(); - List *queryResultList = NIL; - bool copySuccessful = false; + char *nodeUser = CitusExtensionOwnerName(); + PGconn *workerConnection = NULL; + PGresult *queryResult = NULL; + ListCell *commandCell = NULL; - char *relationName = get_rel_name(distributedTableId); - Oid shardSchemaOid = get_rel_namespace(distributedTableId); - const char *shardSchemaName = get_namespace_name(shardSchemaOid); - - AppendShardIdToName(&relationName, shardId); - shardTableName = quote_identifier(relationName); - shardQualifiedName = quote_qualified_identifier(shardSchemaName, shardTableName); - - appendStringInfo(copyRelationQuery, WORKER_APPEND_TABLE_TO_SHARD, - quote_literal_cstr(shardQualifiedName), /* table to append */ - quote_literal_cstr(shardQualifiedName), /* remote table name */ - quote_literal_cstr(healthyPlacement->nodeName), /* remote host */ - healthyPlacement->nodePort); /* remote port */ - - queryResultList = ExecuteRemoteQuery(placementToRepair->nodeName, - placementToRepair->nodePort, - NULL, /* current user, just data manipulation */ - copyRelationQuery); - if (queryResultList != NIL) + workerConnection = ConnectToNode(nodeName, nodePort, nodeUser); + if (workerConnection == NULL) { - copySuccessful = true; + ereport(ERROR, (errmsg("could not open connection to %s:%d as %s", + nodeName, nodePort, nodeUser))); } - return copySuccessful; + /* start the transaction on the worker node */ + queryResult = PQexec(workerConnection, "BEGIN"); + if (PQresultStatus(queryResult) != PGRES_COMMAND_OK) + { + ReraiseRemoteError(workerConnection, queryResult); + } + + PQclear(queryResult); + + /* iterate over the commands and execute them in the same connection */ + foreach(commandCell, commandList) + { + char *commandString = lfirst(commandCell); + ExecStatusType resultStatus = PGRES_EMPTY_QUERY; + + queryResult = PQexec(workerConnection, commandString); + resultStatus = PQresultStatus(queryResult); + if (!(resultStatus == PGRES_SINGLE_TUPLE || resultStatus == PGRES_TUPLES_OK || + resultStatus == PGRES_COMMAND_OK)) + { + ReraiseRemoteError(workerConnection, queryResult); + } + + PQclear(queryResult); + } + + /* commit the transaction on the worker node */ + queryResult = PQexec(workerConnection, "COMMIT"); + if (PQresultStatus(queryResult) != PGRES_COMMAND_OK) + { + ReraiseRemoteError(workerConnection, queryResult); + } + + PQclear(queryResult); + + /* clear NULL result */ + PQgetResult(workerConnection); + + /* we no longer need this connection */ + PQfinish(workerConnection); +} + + +/* + * CitusExtensionOwnerName returns the name of the owner of the extension. + * + * FIXME: Copied from Citus-MX, should be removed once those changes checked-in to Citus. 
+ */ +static char * +CitusExtensionOwnerName(void) +{ + Oid superUserId = CitusExtensionOwner(); + +#if (PG_VERSION_NUM < 90500) + return GetUserNameFromId(superUserId); +#else + return GetUserNameFromId(superUserId, false); +#endif } diff --git a/src/test/regress/expected/multi_colocated_shard_transfer.out b/src/test/regress/expected/multi_colocated_shard_transfer.out new file mode 100644 index 000000000..dc7c7e125 --- /dev/null +++ b/src/test/regress/expected/multi_colocated_shard_transfer.out @@ -0,0 +1,231 @@ +-- +-- MULTI_COLOCATED_SHARD_TRANSFER +-- +-- These tables are created in multi_colocation_utils test +-- test repair +-- manually set shardstate as inactive +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_2_port AND (shardid = 1300000 OR shardid = 1300004); +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_2_port AND shardid = 1300016; +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_2_port AND shardid = 1300020; +-- test repairing colocated shards +-- status before shard repair +SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate +FROM + pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp +WHERE + p.logicalrelid = s.logicalrelid AND + s.shardid = sp.shardid AND + colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass) +ORDER BY s.shardid; + shardid | logicalrelid | nodeport | colocationid | shardstate +---------+---------------+----------+--------------+------------ + 1300000 | table1_group1 | 57638 | 1 | 3 + 1300000 | table1_group1 | 57637 | 1 | 1 + 1300001 | table1_group1 | 57637 | 1 | 1 + 1300001 | table1_group1 | 57638 | 1 | 1 + 1300002 | table1_group1 | 57638 | 1 | 1 + 1300002 | table1_group1 | 57637 | 1 | 1 + 1300003 | table1_group1 | 57637 | 1 | 1 + 1300003 | table1_group1 | 57638 | 1 | 1 + 1300004 | table2_group1 | 57638 | 1 | 3 + 1300004 | table2_group1 | 57637 | 1 | 1 + 1300005 | table2_group1 | 57637 | 1 | 1 + 1300005 | table2_group1 | 57638 | 1 | 1 + 1300006 | table2_group1 | 57638 | 1 | 1 + 1300006 | table2_group1 | 57637 | 1 | 1 + 1300007 | table2_group1 | 57637 | 1 | 1 + 1300007 | table2_group1 | 57638 | 1 | 1 +(16 rows) + +-- repair colocated shards +SELECT master_copy_shard_placement(1300000, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + master_copy_shard_placement +----------------------------- + +(1 row) + +-- status after shard repair +SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate +FROM + pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp +WHERE + p.logicalrelid = s.logicalrelid AND + s.shardid = sp.shardid AND + colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass) +ORDER BY s.shardid; + shardid | logicalrelid | nodeport | colocationid | shardstate +---------+---------------+----------+--------------+------------ + 1300000 | table1_group1 | 57638 | 1 | 1 + 1300000 | table1_group1 | 57637 | 1 | 1 + 1300001 | table1_group1 | 57637 | 1 | 1 + 1300001 | table1_group1 | 57638 | 1 | 1 + 1300002 | table1_group1 | 57638 | 1 | 1 + 1300002 | table1_group1 | 57637 | 1 | 1 + 1300003 | table1_group1 | 57637 | 1 | 1 + 1300003 | table1_group1 | 57638 | 1 | 1 + 1300004 | table2_group1 | 57638 | 1 | 1 + 1300004 | table2_group1 | 57637 | 1 | 1 + 1300005 | table2_group1 | 57637 | 1 | 1 + 1300005 | table2_group1 | 57638 | 1 | 1 + 1300006 | table2_group1 | 57638 | 1 | 1 + 1300006 | 
table2_group1 | 57637 | 1 | 1 + 1300007 | table2_group1 | 57637 | 1 | 1 + 1300007 | table2_group1 | 57638 | 1 | 1 +(16 rows) + +-- test repairing NOT colocated shard +-- status before shard repair +SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate +FROM + pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp +WHERE + p.logicalrelid = s.logicalrelid AND + s.shardid = sp.shardid AND + p.logicalrelid = 'table5_groupX'::regclass +ORDER BY s.shardid; + shardid | logicalrelid | nodeport | colocationid | shardstate +---------+---------------+----------+--------------+------------ + 1300016 | table5_groupx | 57637 | 0 | 1 + 1300016 | table5_groupx | 57638 | 0 | 3 + 1300017 | table5_groupx | 57637 | 0 | 1 + 1300017 | table5_groupx | 57638 | 0 | 1 + 1300018 | table5_groupx | 57637 | 0 | 1 + 1300018 | table5_groupx | 57638 | 0 | 1 + 1300019 | table5_groupx | 57638 | 0 | 1 + 1300019 | table5_groupx | 57637 | 0 | 1 +(8 rows) + +-- repair NOT colocated shard +SELECT master_copy_shard_placement(1300016, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + master_copy_shard_placement +----------------------------- + +(1 row) + +-- status after shard repair +SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate +FROM + pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp +WHERE + p.logicalrelid = s.logicalrelid AND + s.shardid = sp.shardid AND + p.logicalrelid = 'table5_groupX'::regclass +ORDER BY s.shardid; + shardid | logicalrelid | nodeport | colocationid | shardstate +---------+---------------+----------+--------------+------------ + 1300016 | table5_groupx | 57637 | 0 | 1 + 1300016 | table5_groupx | 57638 | 0 | 1 + 1300017 | table5_groupx | 57637 | 0 | 1 + 1300017 | table5_groupx | 57638 | 0 | 1 + 1300018 | table5_groupx | 57637 | 0 | 1 + 1300018 | table5_groupx | 57638 | 0 | 1 + 1300019 | table5_groupx | 57638 | 0 | 1 + 1300019 | table5_groupx | 57637 | 0 | 1 +(8 rows) + +-- test repairing shard in append distributed table +-- status before shard repair +SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate +FROM + pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp +WHERE + p.logicalrelid = s.logicalrelid AND + s.shardid = sp.shardid AND + p.logicalrelid = 'table6_append'::regclass +ORDER BY s.shardid; + shardid | logicalrelid | nodeport | colocationid | shardstate +---------+---------------+----------+--------------+------------ + 1300020 | table6_append | 57637 | 0 | 1 + 1300020 | table6_append | 57638 | 0 | 3 + 1300021 | table6_append | 57637 | 0 | 1 + 1300021 | table6_append | 57638 | 0 | 1 +(4 rows) + +-- repair shard in append distributed table +SELECT master_copy_shard_placement(1300020, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + master_copy_shard_placement +----------------------------- + +(1 row) + +-- status after shard repair +SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate +FROM + pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp +WHERE + p.logicalrelid = s.logicalrelid AND + s.shardid = sp.shardid AND + p.logicalrelid = 'table6_append'::regclass +ORDER BY s.shardid; + shardid | logicalrelid | nodeport | colocationid | shardstate +---------+---------------+----------+--------------+------------ + 1300020 | table6_append | 57637 | 0 | 1 + 1300020 | table6_append | 57638 | 0 | 1 + 1300021 | table6_append | 57637 | 0 | 1 + 1300021 | table6_append | 57638 | 0 | 1 +(4 rows) + 
+-- test repair while all placements of one shard in colocation group is unhealthy +-- manually set shardstate as inactive +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1300000; +-- status before shard repair +SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate +FROM + pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp +WHERE + p.logicalrelid = s.logicalrelid AND + s.shardid = sp.shardid AND + colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass) +ORDER BY s.shardid; + shardid | logicalrelid | nodeport | colocationid | shardstate +---------+---------------+----------+--------------+------------ + 1300000 | table1_group1 | 57638 | 1 | 3 + 1300000 | table1_group1 | 57637 | 1 | 3 + 1300001 | table1_group1 | 57637 | 1 | 1 + 1300001 | table1_group1 | 57638 | 1 | 1 + 1300002 | table1_group1 | 57638 | 1 | 1 + 1300002 | table1_group1 | 57637 | 1 | 1 + 1300003 | table1_group1 | 57637 | 1 | 1 + 1300003 | table1_group1 | 57638 | 1 | 1 + 1300004 | table2_group1 | 57638 | 1 | 1 + 1300004 | table2_group1 | 57637 | 1 | 1 + 1300005 | table2_group1 | 57637 | 1 | 1 + 1300005 | table2_group1 | 57638 | 1 | 1 + 1300006 | table2_group1 | 57638 | 1 | 1 + 1300006 | table2_group1 | 57637 | 1 | 1 + 1300007 | table2_group1 | 57637 | 1 | 1 + 1300007 | table2_group1 | 57638 | 1 | 1 +(16 rows) + +-- repair while all placements of one shard in colocation group is unhealthy +SELECT master_copy_shard_placement(1300000, 'localhost', :worker_1_port, 'localhost', :worker_2_port); +ERROR: source placement must be in finalized state +-- status after shard repair +SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate +FROM + pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp +WHERE + p.logicalrelid = s.logicalrelid AND + s.shardid = sp.shardid AND + colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass) +ORDER BY s.shardid; + shardid | logicalrelid | nodeport | colocationid | shardstate +---------+---------------+----------+--------------+------------ + 1300000 | table1_group1 | 57638 | 1 | 3 + 1300000 | table1_group1 | 57637 | 1 | 3 + 1300001 | table1_group1 | 57637 | 1 | 1 + 1300001 | table1_group1 | 57638 | 1 | 1 + 1300002 | table1_group1 | 57638 | 1 | 1 + 1300002 | table1_group1 | 57637 | 1 | 1 + 1300003 | table1_group1 | 57637 | 1 | 1 + 1300003 | table1_group1 | 57638 | 1 | 1 + 1300004 | table2_group1 | 57638 | 1 | 1 + 1300004 | table2_group1 | 57637 | 1 | 1 + 1300005 | table2_group1 | 57637 | 1 | 1 + 1300005 | table2_group1 | 57638 | 1 | 1 + 1300006 | table2_group1 | 57638 | 1 | 1 + 1300006 | table2_group1 | 57637 | 1 | 1 + 1300007 | table2_group1 | 57637 | 1 | 1 + 1300007 | table2_group1 | 57638 | 1 | 1 +(16 rows) + diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index 33493d68d..1a15044b9 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -48,10 +48,6 @@ CREATE FUNCTION shards_colocated(bigint, bigint) RETURNS bool AS 'citus' LANGUAGE C STRICT; -CREATE FUNCTION get_colocated_table_array(regclass) - RETURNS regclass[] - AS 'citus' - LANGUAGE C STRICT; CREATE FUNCTION get_colocated_shard_array(bigint) RETURNS BIGINT[] AS 'citus' @@ -71,7 +67,7 @@ SELECT master_create_distributed_table('table1_group1', 'id', 'hash'); (1 row) -SELECT 
master_create_worker_shards('table1_group1', 4, 1); +SELECT master_create_worker_shards('table1_group1', 4, 2); master_create_worker_shards ----------------------------- @@ -84,7 +80,7 @@ SELECT master_create_distributed_table('table2_group1', 'id', 'hash'); (1 row) -SELECT master_create_worker_shards('table2_group1', 4, 1); +SELECT master_create_worker_shards('table2_group1', 4, 2); master_create_worker_shards ----------------------------- @@ -97,7 +93,7 @@ SELECT master_create_distributed_table('table3_group2', 'id', 'hash'); (1 row) -SELECT master_create_worker_shards('table3_group2', 4, 1); +SELECT master_create_worker_shards('table3_group2', 4, 2); master_create_worker_shards ----------------------------- @@ -110,7 +106,7 @@ SELECT master_create_distributed_table('table4_group2', 'id', 'hash'); (1 row) -SELECT master_create_worker_shards('table4_group2', 4, 1); +SELECT master_create_worker_shards('table4_group2', 4, 2); master_create_worker_shards ----------------------------- @@ -123,7 +119,7 @@ SELECT master_create_distributed_table('table5_groupX', 'id', 'hash'); (1 row) -SELECT master_create_worker_shards('table5_groupX', 4, 1); +SELECT master_create_worker_shards('table5_groupX', 4, 2); master_create_worker_shards ----------------------------- diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 16423e901..60d14aa4d 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -32,6 +32,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-3'; ALTER EXTENSION citus UPDATE TO '6.0-4'; ALTER EXTENSION citus UPDATE TO '6.0-5'; ALTER EXTENSION citus UPDATE TO '6.0-6'; +ALTER EXTENSION citus UPDATE TO '6.0-7'; -- drop extension an re-create in newest version DROP EXTENSION citus; \c diff --git a/src/test/regress/expected/multi_metadata_snapshot.out b/src/test/regress/expected/multi_metadata_snapshot.out index 329bb8fb0..26db19cc5 100644 --- a/src/test/regress/expected/multi_metadata_snapshot.out +++ b/src/test/regress/expected/multi_metadata_snapshot.out @@ -58,7 +58,7 @@ SELECT unnest(master_metadata_snapshot()); ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE public.mx_test_table OWNER TO postgres INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's') - INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 550),(1310001, 1, 0, 'localhost', 57638, 551),(1310002, 1, 0, 'localhost', 57637, 552),(1310003, 1, 0, 'localhost', 57638, 553),(1310004, 1, 0, 'localhost', 57637, 554),(1310005, 1, 0, 'localhost', 57638, 555),(1310006, 1, 0, 'localhost', 57637, 556),(1310007, 1, 0, 'localhost', 57638, 557) + INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 549),(1310001, 1, 0, 'localhost', 57638, 550),(1310002, 1, 0, 'localhost', 57637, 551),(1310003, 1, 0, 'localhost', 57638, 552),(1310004, 1, 0, 'localhost', 57637, 553),(1310005, 1, 0, 'localhost', 57638, 554),(1310006, 1, 0, 'localhost', 57637, 555),(1310007, 1, 0, 'localhost', 57638, 556) INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', 
'-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') (10 rows) @@ -78,7 +78,7 @@ SELECT unnest(master_metadata_snapshot()); ALTER TABLE public.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE public.mx_test_table OWNER TO postgres INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's') - INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 550),(1310001, 1, 0, 'localhost', 57638, 551),(1310002, 1, 0, 'localhost', 57637, 552),(1310003, 1, 0, 'localhost', 57638, 553),(1310004, 1, 0, 'localhost', 57637, 554),(1310005, 1, 0, 'localhost', 57638, 555),(1310006, 1, 0, 'localhost', 57637, 556),(1310007, 1, 0, 'localhost', 57638, 557) + INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 549),(1310001, 1, 0, 'localhost', 57638, 550),(1310002, 1, 0, 'localhost', 57637, 551),(1310003, 1, 0, 'localhost', 57638, 552),(1310004, 1, 0, 'localhost', 57637, 553),(1310005, 1, 0, 'localhost', 57638, 554),(1310006, 1, 0, 'localhost', 57637, 555),(1310007, 1, 0, 'localhost', 57638, 556) INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') (11 rows) @@ -100,7 +100,7 @@ SELECT unnest(master_metadata_snapshot()); ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's') - INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 550),(1310001, 1, 0, 'localhost', 57638, 551),(1310002, 1, 0, 'localhost', 57637, 552),(1310003, 1, 0, 'localhost', 57638, 553),(1310004, 1, 0, 'localhost', 57637, 554),(1310005, 1, 0, 'localhost', 57638, 555),(1310006, 1, 0, 'localhost', 57637, 556),(1310007, 1, 0, 'localhost', 57638, 557) + INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 
0, 'localhost', 57637, 549),(1310001, 1, 0, 'localhost', 57638, 550),(1310002, 1, 0, 'localhost', 57637, 551),(1310003, 1, 0, 'localhost', 57638, 552),(1310004, 1, 0, 'localhost', 57637, 553),(1310005, 1, 0, 'localhost', 57638, 554),(1310006, 1, 0, 'localhost', 57637, 555),(1310007, 1, 0, 'localhost', 57638, 556) INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') (12 rows) @@ -126,7 +126,7 @@ SELECT unnest(master_metadata_snapshot()); ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's') - INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 550),(1310001, 1, 0, 'localhost', 57638, 551),(1310002, 1, 0, 'localhost', 57637, 552),(1310003, 1, 0, 'localhost', 57638, 553),(1310004, 1, 0, 'localhost', 57637, 554),(1310005, 1, 0, 'localhost', 57638, 555),(1310006, 1, 0, 'localhost', 57637, 556),(1310007, 1, 0, 'localhost', 57638, 557) + INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 549),(1310001, 1, 0, 'localhost', 57638, 550),(1310002, 1, 0, 'localhost', 57637, 551),(1310003, 1, 0, 'localhost', 57638, 552),(1310004, 1, 0, 'localhost', 57637, 553),(1310005, 1, 0, 'localhost', 57638, 554),(1310006, 1, 0, 'localhost', 57637, 555),(1310007, 1, 0, 'localhost', 57638, 556) INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') (12 rows) @@ -145,7 +145,7 @@ SELECT unnest(master_metadata_snapshot()); ALTER TABLE mx_testing_schema.mx_test_table ADD CONSTRAINT mx_test_table_col_1_key UNIQUE (col_1) ALTER TABLE mx_testing_schema.mx_test_table OWNER TO postgres INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, 
repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's') - INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 550),(1310001, 1, 0, 'localhost', 57638, 551),(1310002, 1, 0, 'localhost', 57637, 552),(1310003, 1, 0, 'localhost', 57638, 553),(1310004, 1, 0, 'localhost', 57637, 554),(1310005, 1, 0, 'localhost', 57638, 555),(1310006, 1, 0, 'localhost', 57637, 556),(1310007, 1, 0, 'localhost', 57638, 557) + INSERT INTO pg_dist_shard_placement (shardid, shardstate, shardlength, nodename, nodeport, placementid) VALUES (1310000, 1, 0, 'localhost', 57637, 549),(1310001, 1, 0, 'localhost', 57638, 550),(1310002, 1, 0, 'localhost', 57637, 551),(1310003, 1, 0, 'localhost', 57638, 552),(1310004, 1, 0, 'localhost', 57637, 553),(1310005, 1, 0, 'localhost', 57638, 554),(1310006, 1, 0, 'localhost', 57637, 555),(1310007, 1, 0, 'localhost', 57638, 556) INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647') (12 rows) diff --git a/src/test/regress/expected/multi_repair_shards.out b/src/test/regress/expected/multi_repair_shards.out index 29a5f96ec..e39e1e185 100644 --- a/src/test/regress/expected/multi_repair_shards.out +++ b/src/test/regress/expected/multi_repair_shards.out @@ -55,6 +55,8 @@ SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'lo -- now, update first placement as unhealthy (and raise a notice) so that queries are not routed to there UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = :newshardid AND nodeport = :worker_1_port; +-- we are done with dummyhost, it is safe to remove it +DELETE FROM pg_dist_shard_placement WHERE nodename = 'dummyhost'; -- get the data from the second placement SELECT * FROM customer_engagements; id | created_at | event_data @@ -93,4 +95,4 @@ UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = :remotenewshar -- oops! we don't support repairing shards backed by foreign tables SELECT master_copy_shard_placement(:remotenewshardid, 'localhost', :worker_1_port, 'localhost', :worker_2_port); ERROR: cannot repair shard -DETAIL: Repairing shards backed by foreign tables is not supported. +DETAIL: Table remote_engagements is a foreign table. Repairing shards backed by foreign tables is not supported. 
diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule index d42857cc5..0bcc143ad 100644 --- a/src/test/regress/multi_schedule +++ b/src/test/regress/multi_schedule @@ -177,5 +177,7 @@ test: multi_expire_table_cache # ---------- # multi_colocation_utils tests utility functions written for co-location feature & internal API +# multi_colocated_shard_transfer tests master_copy_shard_placement with colocated tables. # ---------- test: multi_colocation_utils +test: multi_colocated_shard_transfer diff --git a/src/test/regress/sql/multi_colocated_shard_transfer.sql b/src/test/regress/sql/multi_colocated_shard_transfer.sql new file mode 100644 index 000000000..5ce35e94e --- /dev/null +++ b/src/test/regress/sql/multi_colocated_shard_transfer.sql @@ -0,0 +1,114 @@ +-- +-- MULTI_COLOCATED_SHARD_TRANSFER +-- + +-- These tables are created in multi_colocation_utils test + +-- test repair +-- manually set shardstate as inactive +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_2_port AND (shardid = 1300000 OR shardid = 1300004); +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_2_port AND shardid = 1300016; +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE nodeport = :worker_2_port AND shardid = 1300020; + + +-- test repairing colocated shards +-- status before shard repair +SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate +FROM + pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp +WHERE + p.logicalrelid = s.logicalrelid AND + s.shardid = sp.shardid AND + colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass) +ORDER BY s.shardid; + +-- repair colocated shards +SELECT master_copy_shard_placement(1300000, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + +-- status after shard repair +SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate +FROM + pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp +WHERE + p.logicalrelid = s.logicalrelid AND + s.shardid = sp.shardid AND + colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass) +ORDER BY s.shardid; + + +-- test repairing NOT colocated shard +-- status before shard repair +SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate +FROM + pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp +WHERE + p.logicalrelid = s.logicalrelid AND + s.shardid = sp.shardid AND + p.logicalrelid = 'table5_groupX'::regclass +ORDER BY s.shardid; + +-- repair NOT colocated shard +SELECT master_copy_shard_placement(1300016, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + +-- status after shard repair +SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate +FROM + pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp +WHERE + p.logicalrelid = s.logicalrelid AND + s.shardid = sp.shardid AND + p.logicalrelid = 'table5_groupX'::regclass +ORDER BY s.shardid; + + +-- test repairing shard in append distributed table +-- status before shard repair +SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate +FROM + pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp +WHERE + p.logicalrelid = s.logicalrelid AND + s.shardid = sp.shardid AND + p.logicalrelid = 'table6_append'::regclass +ORDER BY s.shardid; + +-- repair shard in append distributed table +SELECT 
master_copy_shard_placement(1300020, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + +-- status after shard repair +SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate +FROM + pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp +WHERE + p.logicalrelid = s.logicalrelid AND + s.shardid = sp.shardid AND + p.logicalrelid = 'table6_append'::regclass +ORDER BY s.shardid; + + +-- test repair while all placements of one shard in colocation group is unhealthy +-- manually set shardstate as inactive +UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1300000; + +-- status before shard repair +SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate +FROM + pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp +WHERE + p.logicalrelid = s.logicalrelid AND + s.shardid = sp.shardid AND + colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass) +ORDER BY s.shardid; + +-- repair while all placements of one shard in colocation group is unhealthy +SELECT master_copy_shard_placement(1300000, 'localhost', :worker_1_port, 'localhost', :worker_2_port); + +-- status after shard repair +SELECT s.shardid, s.logicalrelid::regclass, sp.nodeport, p.colocationid, sp.shardstate +FROM + pg_dist_partition p, pg_dist_shard s, pg_dist_shard_placement sp +WHERE + p.logicalrelid = s.logicalrelid AND + s.shardid = sp.shardid AND + colocationid = (SELECT colocationid FROM pg_dist_partition WHERE logicalrelid = 'table1_group1'::regclass) +ORDER BY s.shardid; diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index 5d0afe27b..93684c6b5 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -57,11 +57,6 @@ CREATE FUNCTION shards_colocated(bigint, bigint) AS 'citus' LANGUAGE C STRICT; -CREATE FUNCTION get_colocated_table_array(regclass) - RETURNS regclass[] - AS 'citus' - LANGUAGE C STRICT; - CREATE FUNCTION get_colocated_shard_array(bigint) RETURNS BIGINT[] AS 'citus' @@ -79,23 +74,23 @@ CREATE FUNCTION find_shard_interval_index(bigint) -- create distributed table observe shard pruning CREATE TABLE table1_group1 ( id int ); SELECT master_create_distributed_table('table1_group1', 'id', 'hash'); -SELECT master_create_worker_shards('table1_group1', 4, 1); +SELECT master_create_worker_shards('table1_group1', 4, 2); CREATE TABLE table2_group1 ( id int ); SELECT master_create_distributed_table('table2_group1', 'id', 'hash'); -SELECT master_create_worker_shards('table2_group1', 4, 1); +SELECT master_create_worker_shards('table2_group1', 4, 2); CREATE TABLE table3_group2 ( id int ); SELECT master_create_distributed_table('table3_group2', 'id', 'hash'); -SELECT master_create_worker_shards('table3_group2', 4, 1); +SELECT master_create_worker_shards('table3_group2', 4, 2); CREATE TABLE table4_group2 ( id int ); SELECT master_create_distributed_table('table4_group2', 'id', 'hash'); -SELECT master_create_worker_shards('table4_group2', 4, 1); +SELECT master_create_worker_shards('table4_group2', 4, 2); CREATE TABLE table5_groupX ( id int ); SELECT master_create_distributed_table('table5_groupX', 'id', 'hash'); -SELECT master_create_worker_shards('table5_groupX', 4, 1); +SELECT master_create_worker_shards('table5_groupX', 4, 2); CREATE TABLE table6_append ( id int ); SELECT master_create_distributed_table('table6_append', 'id', 'append'); diff --git 
a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index f5fbd6357..77131a7c7 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -37,6 +37,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-3'; ALTER EXTENSION citus UPDATE TO '6.0-4'; ALTER EXTENSION citus UPDATE TO '6.0-5'; ALTER EXTENSION citus UPDATE TO '6.0-6'; +ALTER EXTENSION citus UPDATE TO '6.0-7'; -- drop extension an re-create in newest version DROP EXTENSION citus; diff --git a/src/test/regress/sql/multi_repair_shards.sql b/src/test/regress/sql/multi_repair_shards.sql index e7647a350..0fe2a1a62 100644 --- a/src/test/regress/sql/multi_repair_shards.sql +++ b/src/test/regress/sql/multi_repair_shards.sql @@ -55,6 +55,9 @@ SELECT master_copy_shard_placement(:newshardid, 'localhost', :worker_1_port, 'lo -- now, update first placement as unhealthy (and raise a notice) so that queries are not routed to there UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = :newshardid AND nodeport = :worker_1_port; +-- we are done with dummyhost, it is safe to remove it +DELETE FROM pg_dist_shard_placement WHERE nodename = 'dummyhost'; + -- get the data from the second placement SELECT * FROM customer_engagements;
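For reference, a minimal usage sketch of the behavior exercised by the new multi_colocated_shard_transfer test: repairing one placement with master_copy_shard_placement() now also repairs the placements of its co-located shards in the same call. The shard IDs and the :worker_*_port psql variables below are the values used by the regression tests above; outside that test environment they are placeholders only.

-- Mark the worker-2 placements of two co-located shards inactive (shardstate = 3).
UPDATE pg_dist_shard_placement SET shardstate = 3
WHERE nodeport = :worker_2_port AND shardid IN (1300000, 1300004);

-- Repairing shard 1300000 copies it from worker 1 to worker 2 and, because
-- table1_group1 and table2_group1 share a colocation group, also repairs the
-- co-located shard 1300004 in the same call.
SELECT master_copy_shard_placement(1300000, 'localhost', :worker_1_port,
                                   'localhost', :worker_2_port);

-- Both placements on worker 2 should be back in the healthy state (shardstate = 1).
SELECT shardid, nodeport, shardstate
FROM pg_dist_shard_placement
WHERE nodeport = :worker_2_port AND shardid IN (1300000, 1300004)
ORDER BY shardid;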