diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index 390849e39..1f4cad680 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -8,7 +8,7 @@ EXTENSION = citus EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.2-1 5.2-2 5.2-3 5.2-4 \ - 6.0-1 6.0-2 6.0-3 6.0-4 + 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -66,6 +66,8 @@ $(EXTENSION)--6.0-3.sql: $(EXTENSION)--6.0-2.sql $(EXTENSION)--6.0-2--6.0-3.sql cat $^ > $@ $(EXTENSION)--6.0-4.sql: $(EXTENSION)--6.0-3.sql $(EXTENSION)--6.0-3--6.0-4.sql cat $^ > $@ +$(EXTENSION)--6.0-5.sql: $(EXTENSION)--6.0-4.sql $(EXTENSION)--6.0-4--6.0-5.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--6.0-4--6.0-5.sql b/src/backend/distributed/citus--6.0-4--6.0-5.sql new file mode 100644 index 000000000..27f974428 --- /dev/null +++ b/src/backend/distributed/citus--6.0-4--6.0-5.sql @@ -0,0 +1,28 @@ +/* citus--6.0-4--6.0-5.sql */ + +/* + * Replace oid column in pg_dist_shard_placement with a sequence column. + */ +CREATE SEQUENCE citus.pg_dist_shard_placement_placementid_seq + NO CYCLE; +ALTER SEQUENCE citus.pg_dist_shard_placement_placementid_seq + SET SCHEMA pg_catalog; +ALTER TABLE pg_catalog.pg_dist_shard_placement + ADD COLUMN placementid bigint; +-- keep existing oids, and update sequence to match max. +UPDATE pg_catalog.pg_dist_shard_placement SET placementid = oid; +ALTER TABLE pg_catalog.pg_dist_shard_placement + ALTER COLUMN placementid SET DEFAULT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq'), + ALTER COLUMN placementid SET NOT NULL, + SET WITHOUT OIDS; +CREATE UNIQUE INDEX pg_dist_shard_placement_placementid_index +ON pg_catalog.pg_dist_shard_placement using btree(placementid); +SELECT setval('pg_catalog.pg_dist_shard_placement_placementid_seq', max(placementid)) +FROM pg_catalog.pg_dist_shard_placement; + +CREATE FUNCTION master_get_new_placementid() + RETURNS bigint + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_get_new_placementid$$; +COMMENT ON FUNCTION master_get_new_placementid() + IS 'fetch unique placementid'; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 732479937..52c36064b 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '6.0-4' +default_version = '6.0-5' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index bf762cc7e..aa7e4c51d 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -1030,7 +1030,8 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections DeleteShardPlacementRow(failedPlacement->shardId, failedPlacement->nodeName, failedPlacement->nodePort); - InsertShardPlacementRow(failedPlacement->shardId, FILE_INACTIVE, shardLength, + InsertShardPlacementRow(failedPlacement->shardId, failedPlacement->placementId, + FILE_INACTIVE, shardLength, failedPlacement->nodeName, failedPlacement->nodePort); } diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c index 3f052870b..0eb4ad310
100644 --- a/src/backend/distributed/executor/multi_router_executor.c +++ b/src/backend/distributed/executor/multi_router_executor.c @@ -609,7 +609,9 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task, DeleteShardPlacementRow(failedPlacement->shardId, failedPlacement->nodeName, failedPlacement->nodePort); - InsertShardPlacementRow(failedPlacement->shardId, FILE_INACTIVE, shardLength, + InsertShardPlacementRow(failedPlacement->shardId, + failedPlacement->placementId, + FILE_INACTIVE, shardLength, failedPlacement->nodeName, failedPlacement->nodePort); } @@ -1406,9 +1408,11 @@ MarkRemainingInactivePlacements(void) uint64 shardId = shardConnSet->shardId; NodeConnectionKey *nodeKey = &participant->cacheKey; uint64 shardLength = 0; + uint64 placementId = INVALID_PLACEMENT_ID; - DeleteShardPlacementRow(shardId, nodeKey->nodeName, nodeKey->nodePort); - InsertShardPlacementRow(shardId, FILE_INACTIVE, shardLength, + placementId = DeleteShardPlacementRow(shardId, nodeKey->nodeName, + nodeKey->nodePort); + InsertShardPlacementRow(shardId, placementId, FILE_INACTIVE, shardLength, nodeKey->nodeName, nodeKey->nodePort); } } diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c index 292c7cf55..0ed584ed8 100644 --- a/src/backend/distributed/master/master_delete_protocol.c +++ b/src/backend/distributed/master/master_delete_protocol.c @@ -376,12 +376,13 @@ DropShards(Oid relationId, char *schemaName, char *relationName, foreach(lingeringPlacementCell, lingeringPlacementList) { ShardPlacement *placement = (ShardPlacement *) lfirst(lingeringPlacementCell); + uint64 placementId = placement->placementId; char *workerName = placement->nodeName; uint32 workerPort = placement->nodePort; uint64 oldShardLength = placement->shardLength; DeleteShardPlacementRow(shardId, workerName, workerPort); - InsertShardPlacementRow(shardId, FILE_TO_DELETE, oldShardLength, + InsertShardPlacementRow(shardId, placementId, FILE_TO_DELETE, oldShardLength, workerName, workerPort); ereport(WARNING, (errmsg("could not delete shard \"%s\" on node \"%s:%u\"", diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c index 769fa2d81..4d625671b 100644 --- a/src/backend/distributed/master/master_metadata_utility.c +++ b/src/backend/distributed/master/master_metadata_utility.c @@ -20,6 +20,7 @@ #include "catalog/pg_type.h" #include "distributed/citus_nodes.h" #include "distributed/master_metadata_utility.h" +#include "distributed/master_protocol.h" #include "distributed/metadata_cache.h" #include "distributed/multi_join_order.h" #include "distributed/multi_logical_optimizer.h" @@ -315,9 +316,9 @@ ShardPlacementList(uint64 shardId) /* - * TupleToShardPlacement takes in a heap tuple from pg_dist_shard_placement, and - * converts this tuple to an equivalent struct in memory. The function assumes - * the caller already has locks on the tuple, and doesn't perform any locking. + * TupleToShardPlacement takes in a heap tuple from pg_dist_shard_placement, + * and converts this tuple to in-memory struct. The function assumes the + * caller already has locks on the tuple, and doesn't perform any locking. 
*/ ShardPlacement * TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple) @@ -325,7 +326,8 @@ TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple) ShardPlacement *shardPlacement = NULL; bool isNull = false; - Oid tupleOid = HeapTupleGetOid(heapTuple); + Datum placementId = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_placementid, + tupleDescriptor, &isNull); Datum shardId = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_shardid, tupleDescriptor, &isNull); Datum shardLength = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_shardlength, @@ -336,11 +338,14 @@ TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple) tupleDescriptor, &isNull); Datum nodePort = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_nodeport, tupleDescriptor, &isNull); - - Assert(!HeapTupleHasNulls(heapTuple)); + if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_shard_placement || + HeapTupleHasNulls(heapTuple)) + { + ereport(ERROR, (errmsg("unexpected null in pg_dist_shard_placement_tuple"))); + } shardPlacement = CitusMakeNode(ShardPlacement); - shardPlacement->tupleOid = tupleOid; + shardPlacement->placementId = DatumGetInt64(placementId); shardPlacement->shardId = DatumGetInt64(shardId); shardPlacement->shardLength = DatumGetInt64(shardLength); shardPlacement->shardState = DatumGetUInt32(shardState); @@ -408,10 +413,12 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType, /* * InsertShardPlacementRow opens the shard placement system catalog, and inserts - * a new row with the given values into that system catalog. + * a new row with the given values into that system catalog. If placementId is + * INVALID_PLACEMENT_ID, a new placement id will be assigned. */ void -InsertShardPlacementRow(uint64 shardId, char shardState, uint64 shardLength, +InsertShardPlacementRow(uint64 shardId, uint64 placementId, + char shardState, uint64 shardLength, char *nodeName, uint32 nodePort) { Relation pgDistShardPlacement = NULL; @@ -424,11 +431,16 @@ InsertShardPlacementRow(uint64 shardId, char shardState, uint64 shardLength, memset(values, 0, sizeof(values)); memset(isNulls, false, sizeof(isNulls)); + if (placementId == INVALID_PLACEMENT_ID) + { + placementId = master_get_new_placementid(NULL); + } values[Anum_pg_dist_shard_placement_shardid - 1] = Int64GetDatum(shardId); values[Anum_pg_dist_shard_placement_shardstate - 1] = CharGetDatum(shardState); values[Anum_pg_dist_shard_placement_shardlength - 1] = Int64GetDatum(shardLength); values[Anum_pg_dist_shard_placement_nodename - 1] = CStringGetTextDatum(nodeName); values[Anum_pg_dist_shard_placement_nodeport - 1] = Int64GetDatum(nodePort); + values[Anum_pg_dist_shard_placement_placementid - 1] = Int64GetDatum(placementId); /* open shard placement relation and insert new tuple */ pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock); @@ -496,7 +508,7 @@ DeleteShardRow(uint64 shardId) * first (unique) row that corresponds to the given shardId and worker node, and * deletes this row. 
*/ -void +uint64 DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort) { Relation pgDistShardPlacement = NULL; @@ -506,8 +518,12 @@ DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort) bool indexOK = true; HeapTuple heapTuple = NULL; bool heapTupleFound = false; + TupleDesc tupleDescriptor = NULL; + int64 placementId = INVALID_PLACEMENT_ID; + bool isNull = false; pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock); + tupleDescriptor = RelationGetDescr(pgDistShardPlacement); ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_placement_shardid, BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId)); @@ -519,8 +535,6 @@ DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort) heapTuple = systable_getnext(scanDescriptor); while (HeapTupleIsValid(heapTuple)) { - TupleDesc tupleDescriptor = RelationGetDescr(pgDistShardPlacement); - ShardPlacement *placement = TupleToShardPlacement(tupleDescriptor, heapTuple); if (strncmp(placement->nodeName, workerName, WORKER_LENGTH) == 0 && placement->nodePort == workerPort) @@ -540,11 +554,73 @@ DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort) shardId, workerName, workerPort))); } + placementId = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_placementid, + tupleDescriptor, &isNull); + if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_shard_placement || + HeapTupleHasNulls(heapTuple)) + { + ereport(ERROR, (errmsg("unexpected null in pg_dist_shard_placement_tuple"))); + } + simple_heap_delete(pgDistShardPlacement, &heapTuple->t_self); systable_endscan(scanDescriptor); CommandCounterIncrement(); heap_close(pgDistShardPlacement, RowExclusiveLock); + + return placementId; +} + + +/* + * UpdateShardPlacementState sets the shardState for the placement identified + * by placementId. 
+ */ +void +UpdateShardPlacementState(uint64 placementId, char shardState) +{ + Relation pgDistShardPlacement = NULL; + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + bool indexOK = true; + HeapTuple heapTuple = NULL; + TupleDesc tupleDescriptor = NULL; + Datum values[Natts_pg_dist_shard_placement]; + bool isnull[Natts_pg_dist_shard_placement]; + bool replace[Natts_pg_dist_shard_placement]; + + pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock); + tupleDescriptor = RelationGetDescr(pgDistShardPlacement); + ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_placement_placementid, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(placementId)); + + scanDescriptor = systable_beginscan(pgDistShardPlacement, + DistShardPlacementPlacementidIndexId(), indexOK, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find valid entry for shard placement " + UINT64_FORMAT, + placementId))); + } + + memset(replace, 0, sizeof(replace)); + + values[Anum_pg_dist_shard_placement_shardstate - 1] = CharGetDatum(shardState); + isnull[Anum_pg_dist_shard_placement_shardstate - 1] = false; + replace[Anum_pg_dist_shard_placement_shardstate - 1] = true; + + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); + simple_heap_update(pgDistShardPlacement, &heapTuple->t_self, heapTuple); + + CatalogUpdateIndexes(pgDistShardPlacement, heapTuple); + CommandCounterIncrement(); + + systable_endscan(scanDescriptor); + heap_close(pgDistShardPlacement, NoLock); } @@ -794,7 +870,8 @@ master_stage_shard_placement_row(PG_FUNCTION_ARGS) EnsureTablePermissions(distributedRelationId, ACL_INSERT); /* finally insert placement */ - InsertShardPlacementRow(shardId, shardState, shardLength, nodeName, nodePort); + InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardLength, + nodeName, nodePort); heap_close(relation, NoLock); heap_close(pgDistShard, NoLock); diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index 72543659a..625f9202a 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -67,6 +67,7 @@ static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescripto PG_FUNCTION_INFO_V1(master_get_table_metadata); PG_FUNCTION_INFO_V1(master_get_table_ddl_events); PG_FUNCTION_INFO_V1(master_get_new_shardid); +PG_FUNCTION_INFO_V1(master_get_new_placementid); PG_FUNCTION_INFO_V1(master_get_local_first_candidate_nodes); PG_FUNCTION_INFO_V1(master_get_round_robin_candidate_nodes); PG_FUNCTION_INFO_V1(master_get_active_worker_nodes); @@ -251,8 +252,7 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS) * ahead logs; writing to logs avoids the risk of having shardId collisions. * * Please note that the caller is still responsible for finalizing shard data - * and the shardId with the master node. Further note that this function relies - * on an internal sequence created in initdb to generate unique identifiers. + * and the shardId with the master node. * * NB: This can be called by any user; for now we have decided that that's * ok. 
We might want to restrict this to users part of a specific role or such @@ -280,6 +280,38 @@ master_get_new_shardid(PG_FUNCTION_ARGS) } +/* + * master_get_new_placementid allocates and returns a unique placementId for + * the placement to be created. This allocation occurs both in shared memory + * and in write ahead logs; writing to logs avoids the risk of having placementId + * collisions. + * + * NB: This can be called by any user; for now we have decided that that's + * ok. We might want to restrict this to users part of a specific role or such + * at some later point. + */ +Datum +master_get_new_placementid(PG_FUNCTION_ARGS) +{ + text *sequenceName = cstring_to_text(PLACEMENTID_SEQUENCE_NAME); + Oid sequenceId = ResolveRelationId(sequenceName); + Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId); + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + Datum placementIdDatum = 0; + + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + + /* generate new and unique placementId from sequence */ + placementIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum); + + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + + PG_RETURN_DATUM(placementIdDatum); +} + + /* * master_get_local_first_candidate_nodes returns a set of candidate host names * and port numbers on which to place new shards. The function makes sure to diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 62d6fd44a..5c0c30629 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -292,12 +292,14 @@ master_append_table_to_shard(PG_FUNCTION_ARGS) foreach(failedPlacementCell, failedPlacementList) { ShardPlacement *placement = (ShardPlacement *) lfirst(failedPlacementCell); + uint64 placementId = placement->placementId; char *workerName = placement->nodeName; uint32 workerPort = placement->nodePort; uint64 oldShardLength = placement->shardLength; DeleteShardPlacementRow(shardId, workerName, workerPort); - InsertShardPlacementRow(shardId, FILE_INACTIVE, oldShardLength, + InsertShardPlacementRow(shardId, placementId, + FILE_INACTIVE, oldShardLength, workerName, workerPort); ereport(WARNING, (errmsg("could not append table to shard \"%s\" on node " @@ -398,7 +400,8 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList, const RelayFileState shardState = FILE_FINALIZED; const uint64 shardSize = 0; - InsertShardPlacementRow(shardId, shardState, shardSize, nodeName, nodePort); + InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize, + nodeName, nodePort); placementsCreated++; } else @@ -537,11 +540,12 @@ UpdateShardStatistics(int64 shardId) foreach(shardPlacementCell, shardPlacementList) { ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell); + uint64 placementId = placement->placementId; char *workerName = placement->nodeName; uint32 workerPort = placement->nodePort; DeleteShardPlacementRow(shardId, workerName, workerPort); - InsertShardPlacementRow(shardId, FILE_FINALIZED, shardSize, + InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, shardSize, workerName, workerPort); } diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index c0c7b833a..5842513b6 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++
b/src/backend/distributed/planner/multi_physical_planner.c @@ -5101,7 +5101,7 @@ CompareTasksByShardId(const void *leftElement, const void *rightElement) /* * ActiveShardPlacementLists finds the active shard placement list for each task in - * the given task list, sorts each shard placement list by tuple insertion time, + * the given task list, sorts each shard placement list by shard creation time, * and adds the sorted placement list into a new list of lists. The function also * ensures a one-to-one mapping between each placement list in the new list of * lists and each task in the given task list. @@ -5122,7 +5122,7 @@ ActiveShardPlacementLists(List *taskList) /* filter out shard placements that reside in inactive nodes */ List *activeShardPlacementList = ActivePlacementList(shardPlacementList); - /* sort shard placements by their insertion time */ + /* sort shard placements by their creation time */ activeShardPlacementList = SortList(activeShardPlacementList, CompareShardPlacements); shardPlacementLists = lappend(shardPlacementLists, activeShardPlacementList); @@ -5142,12 +5142,21 @@ CompareShardPlacements(const void *leftElement, const void *rightElement) const ShardPlacement *leftPlacement = *((const ShardPlacement **) leftElement); const ShardPlacement *rightPlacement = *((const ShardPlacement **) rightElement); - Oid leftTupleOid = leftPlacement->tupleOid; - Oid rightTupleOid = rightPlacement->tupleOid; + uint64 leftPlacementId = leftPlacement->placementId; + uint64 rightPlacementId = rightPlacement->placementId; - /* tuples that are inserted earlier appear first */ - int tupleOidDiff = leftTupleOid - rightTupleOid; - return tupleOidDiff; + if (leftPlacementId < rightPlacementId) + { + return -1; + } + else if (leftPlacementId > rightPlacementId) + { + return 1; + } + else + { + return 0; + } } diff --git a/src/backend/distributed/test/distribution_metadata.c b/src/backend/distributed/test/distribution_metadata.c index 8c6f9e85a..b86035d0b 100644 --- a/src/backend/distributed/test/distribution_metadata.c +++ b/src/backend/distributed/test/distribution_metadata.c @@ -309,7 +309,8 @@ create_healthy_local_shard_placement_row(PG_FUNCTION_ARGS) int64 shardId = PG_GETARG_INT64(0); int64 shardLength = 0; - InsertShardPlacementRow(shardId, FILE_FINALIZED, shardLength, "localhost", 5432); + InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, FILE_FINALIZED, shardLength, + "localhost", 5432); PG_RETURN_VOID(); } @@ -347,9 +348,11 @@ update_shard_placement_row_state(PG_FUNCTION_ARGS) bool successful = true; char *hostNameString = text_to_cstring(hostName); uint64 shardLength = 0; + uint64 placementId = INVALID_PLACEMENT_ID; - DeleteShardPlacementRow(shardId, hostNameString, hostPort); - InsertShardPlacementRow(shardId, shardState, shardLength, hostNameString, hostPort); + placementId = DeleteShardPlacementRow(shardId, hostNameString, hostPort); + InsertShardPlacementRow(shardId, placementId, shardState, shardLength, + hostNameString, hostPort); PG_RETURN_BOOL(successful); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 124185ff4..9109a114e 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -431,7 +431,7 @@ _outShardPlacement(StringInfo str, const ShardPlacement *node) { WRITE_NODE_TYPE("SHARDPLACEMENT"); - WRITE_OID_FIELD(tupleOid); + WRITE_UINT64_FIELD(placementId); WRITE_UINT64_FIELD(shardId); WRITE_UINT64_FIELD(shardLength); WRITE_ENUM_FIELD(shardState, 
RelayFileState); diff --git a/src/backend/distributed/utils/citus_readfuncs_95.c b/src/backend/distributed/utils/citus_readfuncs_95.c index caf223c00..e0609f602 100644 --- a/src/backend/distributed/utils/citus_readfuncs_95.c +++ b/src/backend/distributed/utils/citus_readfuncs_95.c @@ -1478,7 +1478,7 @@ _readShardPlacement(void) { READ_LOCALS(ShardPlacement); - READ_OID_FIELD(tupleOid); + READ_UINT64_FIELD(placementId); READ_UINT64_FIELD(shardId); READ_UINT64_FIELD(shardLength); READ_ENUM_FIELD(shardState, RelayFileState); diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 01e37c7db..34c127f8a 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -58,6 +58,7 @@ static Oid distPartitionColocationidIndexId = InvalidOid; static Oid distShardLogicalRelidIndexId = InvalidOid; static Oid distShardShardidIndexId = InvalidOid; static Oid distShardPlacementShardidIndexId = InvalidOid; +static Oid distShardPlacementPlacementidIndexId = InvalidOid; static Oid distShardPlacementNodeidIndexId = InvalidOid; static Oid extraDataContainerFuncId = InvalidOid; @@ -702,6 +703,17 @@ DistShardPlacementShardidIndexId(void) } +/* return oid of pg_dist_shard_placement_placementid_index */ +Oid +DistShardPlacementPlacementidIndexId(void) +{ + CachedRelationLookup("pg_dist_shard_placement_placementid_index", + &distShardPlacementPlacementidIndexId); + + return distShardPlacementPlacementidIndexId; +} + + /* return oid of pg_dist_shard_placement_nodeid_index */ Oid DistShardPlacementNodeidIndexId(void) @@ -1234,6 +1246,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId) distShardLogicalRelidIndexId = InvalidOid; distShardShardidIndexId = InvalidOid; distShardPlacementShardidIndexId = InvalidOid; + distShardPlacementPlacementidIndexId = InvalidOid; distNodeRelationId = InvalidOid; extraDataContainerFuncId = InvalidOid; } diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h index 01a6d7c4a..17ee5f539 100644 --- a/src/include/distributed/master_metadata_utility.h +++ b/src/include/distributed/master_metadata_utility.h @@ -47,7 +47,7 @@ typedef struct ShardInterval typedef struct ShardPlacement { CitusNodeTag type; - Oid tupleOid; /* unique oid that implies this row's insertion order */ + uint64 placementId; /* sequence that implies this placement's creation order */ uint64 shardId; uint64 shardLength; RelayFileState shardState; @@ -71,10 +71,14 @@ extern ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc, /* Function declarations to modify shard and shard placement data */ extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType, text *shardMinValue, text *shardMaxValue); -extern void InsertShardPlacementRow(uint64 shardId, char shardState, uint64 shardLength, +extern void DeleteShardRow(uint64 shardId); +extern void InsertShardPlacementRow(uint64 shardId, uint64 placementId, + char shardState, uint64 shardLength, char *nodeName, uint32 nodePort); extern void DeleteShardRow(uint64 shardId); -extern void DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort); +extern void UpdateShardPlacementState(uint64 placementId, char shardState); +extern uint64 DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 + workerPort); /* Remaining metadata utility functions */ extern Node * BuildDistributionKeyFromColumnName(Relation distributedRelation, diff --git
a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 060f4c546..18c5451a4 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -48,8 +48,8 @@ /* Name of columnar foreign data wrapper */ #define CSTORE_FDW_NAME "cstore_fdw" -/* ShardId sequence name as defined in initdb.c */ #define SHARDID_SEQUENCE_NAME "pg_dist_shardid_seq" +#define PLACEMENTID_SEQUENCE_NAME "pg_dist_shard_placement_placementid_seq" /* Remote call definitions to help with data staging and deletion */ #define WORKER_APPLY_SHARD_DDL_COMMAND \ @@ -97,10 +97,11 @@ extern void CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventL int workerStartIndex, int replicationFactor); extern uint64 UpdateShardStatistics(int64 shardId); -/* Function declarations for generating metadata for shard creation */ +/* Function declarations for generating metadata for shard and placement creation */ extern Datum master_get_table_metadata(PG_FUNCTION_ARGS); extern Datum master_get_table_ddl_events(PG_FUNCTION_ARGS); extern Datum master_get_new_shardid(PG_FUNCTION_ARGS); +extern Datum master_get_new_placementid(PG_FUNCTION_ARGS); extern Datum master_get_local_first_candidate_nodes(PG_FUNCTION_ARGS); extern Datum master_get_round_robin_candidate_nodes(PG_FUNCTION_ARGS); extern Datum master_get_active_worker_nodes(PG_FUNCTION_ARGS); diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 20731eca8..47fed3d95 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -75,6 +75,7 @@ extern Oid DistPartitionColocationidIndexId(void); extern Oid DistShardLogicalRelidIndexId(void); extern Oid DistShardShardidIndexId(void); extern Oid DistShardPlacementShardidIndexId(void); +extern Oid DistShardPlacementPlacementidIndexId(void); extern Oid DistShardPlacementNodeidIndexId(void); /* function oids */ diff --git a/src/include/distributed/pg_dist_shard_placement.h b/src/include/distributed/pg_dist_shard_placement.h index 41bc17218..8fdf74835 100644 --- a/src/include/distributed/pg_dist_shard_placement.h +++ b/src/include/distributed/pg_dist_shard_placement.h @@ -29,6 +29,7 @@ typedef struct FormData_pg_dist_shard_placement #ifdef CATALOG_VARLEN /* variable-length fields start here */ text nodename; /* remote node's host name */ int32 nodeport; /* remote node's port number */ + int64 placementid; /* global placementId on remote node (added later) */ #endif } FormData_pg_dist_shard_placement; @@ -43,12 +44,13 @@ typedef FormData_pg_dist_shard_placement *Form_pg_dist_shard_placement; * compiler constants for pg_dist_shard_placement * ---------------- */ -#define Natts_pg_dist_shard_placement 5 +#define Natts_pg_dist_shard_placement 6 #define Anum_pg_dist_shard_placement_shardid 1 #define Anum_pg_dist_shard_placement_shardstate 2 #define Anum_pg_dist_shard_placement_shardlength 3 #define Anum_pg_dist_shard_placement_nodename 4 #define Anum_pg_dist_shard_placement_nodeport 5 +#define Anum_pg_dist_shard_placement_placementid 6 #endif /* PG_DIST_SHARD_PLACEMENT_H */ diff --git a/src/include/distributed/relay_utility.h b/src/include/distributed/relay_utility.h index ba93ad429..ac7aeff56 100644 --- a/src/include/distributed/relay_utility.h +++ b/src/include/distributed/relay_utility.h @@ -23,6 +23,7 @@ /* Shard name and identifier related defines */ #define SHARD_NAME_SEPARATOR '_' #define INVALID_SHARD_ID 0 +#define INVALID_PLACEMENT_ID 0 /* * RelayFileState represents 
last known states of shards on a given node. We diff --git a/src/test/regress/expected/multi_array_agg.out b/src/test/regress/expected/multi_array_agg.out index b3450b767..b473b7967 100644 --- a/src/test/regress/expected/multi_array_agg.out +++ b/src/test/regress/expected/multi_array_agg.out @@ -98,10 +98,10 @@ SELECT l_quantity, count(*), avg(l_extendedprice), array_agg(l_orderkey) FROM li GROUP BY l_quantity ORDER BY l_quantity; l_quantity | count | avg | array_agg ------------+-------+-----------------------+-------------------------------------------------------------------------------------------------- - 1.00 | 17 | 1477.1258823529411765 | {8997,9026,9158,9184,9220,9222,9348,9383,9476,5543,5633,5634,5698,5766,5856,5857,5986} - 2.00 | 19 | 3078.4242105263157895 | {9030,9058,9123,9124,9188,9344,9441,9476,5506,5540,5573,5669,5703,5730,5798,5831,5893,5920,5923} - 3.00 | 14 | 4714.0392857142857143 | {9124,9157,9184,9223,9254,9349,9414,9475,9477,5509,5543,5605,5606,5827} - 4.00 | 19 | 5929.7136842105263158 | {9091,9120,9281,9347,9382,9440,9473,5504,5507,5508,5511,5538,5764,5766,5826,5829,5862,5959,5985} + 1.00 | 17 | 1477.1258823529411765 | {5543,5633,5634,5698,5766,5856,5857,5986,8997,9026,9158,9184,9220,9222,9348,9383,9476} + 2.00 | 19 | 3078.4242105263157895 | {5506,5540,5573,5669,5703,5730,5798,5831,5893,5920,5923,9030,9058,9123,9124,9188,9344,9441,9476} + 3.00 | 14 | 4714.0392857142857143 | {5509,5543,5605,5606,5827,9124,9157,9184,9223,9254,9349,9414,9475,9477} + 4.00 | 19 | 5929.7136842105263158 | {5504,5507,5508,5511,5538,5764,5766,5826,5829,5862,5959,5985,9091,9120,9281,9347,9382,9440,9473} (4 rows) SELECT l_quantity, array_agg(extract (month FROM o_orderdate)) AS my_month @@ -109,10 +109,10 @@ SELECT l_quantity, array_agg(extract (month FROM o_orderdate)) AS my_month AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity; l_quantity | my_month ------------+------------------------------------------------ - 1.00 | {7,7,4,7,4,2,6,3,5,9,5,7,5,9,11,11,4} - 2.00 | {7,6,6,10,1,12,6,5,11,10,8,5,5,12,3,11,7,11,5} - 3.00 | {10,6,7,8,5,8,9,11,3,4,9,8,11,7} - 4.00 | {11,6,2,8,2,6,10,1,5,6,11,12,10,9,6,1,2,5,1} + 1.00 | {9,5,7,5,9,11,11,4,7,7,4,7,4,2,6,3,5} + 2.00 | {11,10,8,5,5,12,3,11,7,11,5,7,6,6,10,1,12,6,5} + 3.00 | {4,9,8,11,7,10,6,7,8,5,8,9,11,3} + 4.00 | {1,5,6,11,12,10,9,6,1,2,5,1,11,6,2,8,2,6,10} (4 rows) SELECT l_quantity, array_agg(l_orderkey * 2 + 1) FROM lineitem WHERE l_quantity < 5 @@ -120,10 +120,10 @@ SELECT l_quantity, array_agg(l_orderkey * 2 + 1) FROM lineitem WHERE l_quantity AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity; l_quantity | array_agg ------------+--------------------------------------------- - 1.00 | {18317,18445,11269,11397,11713,11715,11973} - 2.00 | {18061,18247,18953,11847} + 1.00 | {11269,11397,11713,11715,11973,18317,18445} + 2.00 | {11847,18061,18247,18953} 3.00 | {18249,18315,18699,18951,18955} - 4.00 | {18241,18765,11653,11659} + 4.00 | {11653,11659,18241,18765} (4 rows) -- Check that we can execute array_agg() with an expression containing NULL values diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index 45c143cfc..84e753c2d 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -72,17 +72,17 @@ SELECT master_create_worker_shards('cluster_management_test', 16, 1); (1 row) -- see that there are some active placements in the candidate 
node -SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1220001 | 1 | 0 | localhost | 57638 - 1220003 | 1 | 0 | localhost | 57638 - 1220005 | 1 | 0 | localhost | 57638 - 1220007 | 1 | 0 | localhost | 57638 - 1220009 | 1 | 0 | localhost | 57638 - 1220011 | 1 | 0 | localhost | 57638 - 1220013 | 1 | 0 | localhost | 57638 - 1220015 | 1 | 0 | localhost | 57638 +SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; + shardid | shardstate | nodename | nodeport +---------+------------+-----------+---------- + 1220001 | 1 | localhost | 57638 + 1220003 | 1 | localhost | 57638 + 1220005 | 1 | localhost | 57638 + 1220007 | 1 | localhost | 57638 + 1220009 | 1 | localhost | 57638 + 1220011 | 1 | localhost | 57638 + 1220013 | 1 | localhost | 57638 + 1220015 | 1 | localhost | 57638 (8 rows) -- try to remove a node with active placements and see that node removal is failed @@ -97,17 +97,17 @@ SELECT master_get_active_worker_nodes(); -- mark all placements in the candidate node as inactive UPDATE pg_dist_shard_placement SET shardstate=3 WHERE nodeport=:worker_2_port; -SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1220001 | 3 | 0 | localhost | 57638 - 1220003 | 3 | 0 | localhost | 57638 - 1220005 | 3 | 0 | localhost | 57638 - 1220007 | 3 | 0 | localhost | 57638 - 1220009 | 3 | 0 | localhost | 57638 - 1220011 | 3 | 0 | localhost | 57638 - 1220013 | 3 | 0 | localhost | 57638 - 1220015 | 3 | 0 | localhost | 57638 +SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; + shardid | shardstate | nodename | nodeport +---------+------------+-----------+---------- + 1220001 | 3 | localhost | 57638 + 1220003 | 3 | localhost | 57638 + 1220005 | 3 | localhost | 57638 + 1220007 | 3 | localhost | 57638 + 1220009 | 3 | localhost | 57638 + 1220011 | 3 | localhost | 57638 + 1220013 | 3 | localhost | 57638 + 1220015 | 3 | localhost | 57638 (8 rows) -- try to remove a node with only inactive placements and see that node is removed diff --git a/src/test/regress/expected/multi_complex_expressions.out b/src/test/regress/expected/multi_complex_expressions.out index 2a46f4ea7..09ce8c246 100644 --- a/src/test/regress/expected/multi_complex_expressions.out +++ b/src/test/regress/expected/multi_complex_expressions.out @@ -444,16 +444,16 @@ LIMIT 10 OFFSET 20; DEBUG: push down of limit count: 30 l_partkey | o_custkey | l_quantity -----------+-----------+------------ - 135912 | 509 | 26.00 - 75351 | 1261 | 26.00 - 199475 | 1052 | 26.00 - 91309 | 8 | 26.00 - 53624 | 400 | 26.00 - 182736 | 1048 | 26.00 - 59694 | 163 | 26.00 - 20481 | 173 | 26.00 - 78748 | 1499 | 26.00 - 7614 | 1397 | 26.00 + 154380 | 421 | 26.00 + 103981 | 325 | 26.00 + 77886 | 817 | 26.00 + 147369 | 755 | 26.00 + 78175 | 1075 | 26.00 + 109784 | 1268 | 26.00 + 28635 | 1207 | 26.00 + 188845 | 554 | 26.00 + 189398 | 844 | 26.00 + 71383 | 1397 | 26.00 (10 rows) RESET client_min_messages; diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index f3f60a4ed..cc26c6c15 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -40,7 +40,7 @@ Distributed Query into pg_merge_job_570000 Node: 
host=localhost port=57637 dbname=regression -> HashAggregate Group Key: l_quantity - -> Seq Scan on lineitem_290000 lineitem + -> Seq Scan on lineitem_290001 lineitem Master Query -> Sort Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570000_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570000_0 @@ -71,7 +71,7 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Seq Scan", "Parent Relationship": "Outer", - "Relation Name": "lineitem_290000", + "Relation Name": "lineitem_290001", "Alias": "lineitem" } ] @@ -140,7 +140,7 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Seq Scan Outer - lineitem_290000 + lineitem_290001 lineitem @@ -206,7 +206,7 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Plans: - Node Type: "Seq Scan" Parent Relationship: "Outer" - Relation Name: "lineitem_290000" + Relation Name: "lineitem_290001" Alias: "lineitem" Master Query: @@ -238,7 +238,7 @@ Distributed Query into pg_merge_job_570006 Node: host=localhost port=57637 dbname=regression -> HashAggregate Group Key: l_quantity - -> Seq Scan on lineitem_290000 lineitem + -> Seq Scan on lineitem_290001 lineitem Master Query -> Sort Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570006_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570006_0 @@ -256,7 +256,7 @@ Distributed Query into pg_merge_job_570007 Node: host=localhost port=57637 dbname=regression -> Aggregate Output: sum(l_quantity), sum(l_quantity), count(l_quantity) - -> Seq Scan on public.lineitem_290000 lineitem + -> Seq Scan on public.lineitem_290001 lineitem Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment Master Query -> Aggregate @@ -279,7 +279,7 @@ Distributed Query into pg_merge_job_570008 Sort Key: lineitem.l_quantity -> Hash Join Hash Cond: (lineitem.l_orderkey = orders.o_orderkey) - -> Seq Scan on lineitem_290000 lineitem + -> Seq Scan on lineitem_290001 lineitem Filter: (l_quantity < 5.0) -> Hash -> Seq Scan on orders_290008 orders @@ -296,7 +296,7 @@ Distributed Query Task Count: 1 Tasks Shown: All -> Task - Node: host=localhost port=57637 dbname=regression + Node: host=localhost port=57638 dbname=regression -> Insert on lineitem_290000 -> Result -- Test update @@ -309,7 +309,7 @@ Distributed Query Task Count: 1 Tasks Shown: All -> Task - Node: host=localhost port=57637 dbname=regression + Node: host=localhost port=57638 dbname=regression -> Update on lineitem_290000 -> Bitmap Heap Scan on lineitem_290000 Recheck Cond: (l_orderkey = 1) @@ -325,7 +325,7 @@ Distributed Query Task Count: 1 Tasks Shown: All -> Task - Node: host=localhost port=57637 dbname=regression + Node: host=localhost port=57638 dbname=regression -> Delete on lineitem_290000 -> Bitmap Heap Scan on lineitem_290000 Recheck Cond: (l_orderkey = 1) @@ -361,7 +361,7 @@ Distributed Query into pg_merge_job_570012 Tasks Shown: One of 8 -> Task Node: host=localhost port=57637 dbname=regression - -> Seq Scan on lineitem_290000 lineitem + -> Seq Scan on lineitem_290001 lineitem Master Query -> Seq Scan on pg_merge_job_570012 -- Test all tasks output @@ -375,22 +375,22 @@ Distributed Query into pg_merge_job_570013 -> Task Node: host=localhost port=57637 dbname=regression -> Aggregate - -> Seq Scan on lineitem_290004 lineitem + -> Seq Scan on lineitem_290005 lineitem Filter: (l_orderkey > 9030) -> Task Node: host=localhost port=57638 dbname=regression -> Aggregate - -> Seq Scan on 
lineitem_290005 lineitem + -> Seq Scan on lineitem_290004 lineitem Filter: (l_orderkey > 9030) -> Task Node: host=localhost port=57637 dbname=regression -> Aggregate - -> Seq Scan on lineitem_290006 lineitem + -> Seq Scan on lineitem_290007 lineitem Filter: (l_orderkey > 9030) -> Task Node: host=localhost port=57638 dbname=regression -> Aggregate - -> Seq Scan on lineitem_290007 lineitem + -> Seq Scan on lineitem_290006 lineitem Filter: (l_orderkey > 9030) Master Query -> Aggregate @@ -413,7 +413,7 @@ Distributed Query into pg_merge_job_570016 -> Task Node: host=localhost port=57637 dbname=regression -> Aggregate - -> Seq Scan on lineitem_290004 lineitem + -> Seq Scan on lineitem_290005 lineitem Filter: (l_orderkey > 9030) Master Query -> Aggregate diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 2084a1d94..0f0f9d0c5 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -30,6 +30,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-1'; ALTER EXTENSION citus UPDATE TO '6.0-2'; ALTER EXTENSION citus UPDATE TO '6.0-3'; ALTER EXTENSION citus UPDATE TO '6.0-4'; +ALTER EXTENSION citus UPDATE TO '6.0-5'; -- drop extension an re-create in newest version DROP EXTENSION citus; \c diff --git a/src/test/regress/expected/multi_large_table_join_planning.out b/src/test/regress/expected/multi_large_table_join_planning.out index b11ea9162..b4ea8abe8 100644 --- a/src/test/regress/expected/multi_large_table_join_planning.out +++ b/src/test/regress/expected/multi_large_table_join_planning.out @@ -69,14 +69,14 @@ DEBUG: generated sql query for job 1250 and task 21 DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290006 lineitem JOIN orders_290009 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))" DEBUG: generated sql query for job 1250 and task 24 DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290007 lineitem JOIN orders_290009 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))" -DEBUG: assigned task 3 to node localhost:57637 -DEBUG: assigned task 6 to node localhost:57638 -DEBUG: assigned task 9 to node localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: assigned task 15 to node localhost:57637 -DEBUG: assigned task 18 to node localhost:57638 -DEBUG: assigned task 21 to node localhost:57637 -DEBUG: assigned task 24 to node localhost:57638 +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 3 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 18 to node localhost:57637 +DEBUG: assigned task 15 to node localhost:57638 +DEBUG: assigned task 24 to node localhost:57637 +DEBUG: assigned task 21 to node localhost:57638 DEBUG: join prunable for intervals [1,1000] and [6001,7000] DEBUG: join prunable for intervals [6001,7000] and [1,1000] DEBUG: generated sql query for job 1251 and task 3 @@ -107,8 +107,8 @@ DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 10 DEBUG: pruning merge fetch taskId 7 DETAIL: Creating dependency on merge taskId 13 -DEBUG: assigned task 9 to node localhost:57637 -DEBUG: assigned 
task 6 to node localhost:57638 +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 DEBUG: assigned task 3 to node localhost:57637 DEBUG: completed cleanup query for job 1252 DEBUG: completed cleanup query for job 1252 @@ -178,20 +178,20 @@ DEBUG: generated sql query for job 1253 and task 14 DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290006 lineitem WHERE (l_quantity < 5.0)" DEBUG: generated sql query for job 1253 and task 16 DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290007 lineitem WHERE (l_quantity < 5.0)" -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 -DEBUG: assigned task 6 to node localhost:57637 -DEBUG: assigned task 8 to node localhost:57638 -DEBUG: assigned task 10 to node localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: assigned task 14 to node localhost:57637 -DEBUG: assigned task 16 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 8 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 10 to node localhost:57638 +DEBUG: assigned task 16 to node localhost:57637 +DEBUG: assigned task 14 to node localhost:57638 DEBUG: generated sql query for job 1254 and task 2 DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290008 orders WHERE (o_totalprice <> 4.0)" DEBUG: generated sql query for job 1254 and task 4 DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290009 orders WHERE (o_totalprice <> 4.0)" -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 DEBUG: join prunable for task partitionId 0 and 1 DEBUG: join prunable for task partitionId 0 and 2 DEBUG: join prunable for task partitionId 0 and 3 diff --git a/src/test/regress/expected/multi_large_table_task_assignment.out b/src/test/regress/expected/multi_large_table_task_assignment.out index ed3c6d634..bb7e3a1c5 100644 --- a/src/test/regress/expected/multi_large_table_task_assignment.out +++ b/src/test/regress/expected/multi_large_table_task_assignment.out @@ -27,8 +27,8 @@ FROM WHERE o_custkey = c_custkey; DEBUG: StartTransactionCommand -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 DEBUG: join prunable for intervals [1,1000] and [1001,2000] DEBUG: join prunable for intervals [1,1000] and [6001,7000] DEBUG: join prunable for intervals [1001,2000] and [1,1000] @@ -41,8 +41,8 @@ DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 8 DEBUG: pruning merge fetch taskId 7 DETAIL: Creating dependency on merge taskId 11 -DEBUG: assigned task 9 to node localhost:57637 -DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 DEBUG: assigned task 3 to node localhost:57637 DEBUG: CommitTransactionCommand count @@ -66,10 +66,10 @@ WHERE o_custkey = c_custkey AND o_orderkey = l_orderkey; DEBUG: StartTransactionCommand -DEBUG: assigned task 15 to node localhost:57637 -DEBUG: assigned task 9 to node localhost:57638 -DEBUG: assigned task 18 to node localhost:57637 -DEBUG: assigned task 
12 to node localhost:57638 +DEBUG: assigned task 9 to node localhost:57637 +DEBUG: assigned task 15 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 18 to node localhost:57638 DEBUG: assigned task 3 to node localhost:57637 DEBUG: assigned task 6 to node localhost:57638 DEBUG: join prunable for intervals [1,1509] and [2951,4455] @@ -156,12 +156,12 @@ DEBUG: pruning merge fetch taskId 55 DETAIL: Creating dependency on merge taskId 68 DEBUG: pruning merge fetch taskId 58 DETAIL: Creating dependency on merge taskId 68 -DEBUG: assigned task 3 to node localhost:57637 -DEBUG: assigned task 21 to node localhost:57638 -DEBUG: assigned task 9 to node localhost:57637 -DEBUG: assigned task 27 to node localhost:57638 -DEBUG: assigned task 33 to node localhost:57637 -DEBUG: assigned task 48 to node localhost:57638 +DEBUG: assigned task 21 to node localhost:57637 +DEBUG: assigned task 3 to node localhost:57638 +DEBUG: assigned task 27 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 48 to node localhost:57637 +DEBUG: assigned task 33 to node localhost:57638 DEBUG: assigned task 39 to node localhost:57637 DEBUG: assigned task 57 to node localhost:57638 DEBUG: propagating assignment from merge task 19 to constrained sql task 6 @@ -208,16 +208,16 @@ FROM WHERE l_partkey = c_nationkey; DEBUG: StartTransactionCommand -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 -DEBUG: assigned task 6 to node localhost:57637 -DEBUG: assigned task 8 to node localhost:57638 -DEBUG: assigned task 10 to node localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: assigned task 14 to node localhost:57637 -DEBUG: assigned task 16 to node localhost:57638 -DEBUG: assigned task 6 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 8 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 10 to node localhost:57638 +DEBUG: assigned task 16 to node localhost:57637 +DEBUG: assigned task 14 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 DEBUG: assigned task 2 to node localhost:57637 DEBUG: join prunable for task partitionId 0 and 1 DEBUG: join prunable for task partitionId 0 and 2 diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out index 536178675..a79212f5b 100644 --- a/src/test/regress/expected/multi_schema_support.out +++ b/src/test/regress/expected/multi_schema_support.out @@ -790,11 +790,11 @@ SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localh (1 row) -- verify shardstate -SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1190000 | 1 | 8192 | localhost | 57638 - 1190000 | 1 | 0 | localhost | 57637 +SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = 1190000; + shardstate | nodename | nodeport +------------+-----------+---------- + 1 | localhost | 57638 + 1 | localhost | 57637 (2 rows) --test with search_path is set @@ -808,11 +808,11 @@ SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localh (1 row) -- 
verify shardstate -SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1190000 | 1 | 8192 | localhost | 57638 - 1190000 | 1 | 0 | localhost | 57637 +SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = 1190000; + shardstate | nodename | nodeport +------------+-----------+---------- + 1 | localhost | 57638 + 1 | localhost | 57637 (2 rows) -- test master_apply_delete_command with schemas diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out index 52940388d..7da6ce247 100644 --- a/src/test/regress/expected/multi_table_ddl.out +++ b/src/test/regress/expected/multi_table_ddl.out @@ -58,8 +58,8 @@ SELECT * FROM pg_dist_shard; (0 rows) SELECT * FROM pg_dist_shard_placement; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+----------+---------- + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+----------+----------+------------- (0 rows) -- check that the extension now can be dropped (and recreated) diff --git a/src/test/regress/output/multi_outer_join.source b/src/test/regress/output/multi_outer_join.source index 24365317b..fa1296c80 100644 --- a/src/test/regress/output/multi_outer_join.source +++ b/src/test/regress/output/multi_outer_join.source @@ -353,16 +353,6 @@ FROM LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ] l_custkey | r_custkey | t_custkey -----------+-----------+----------- - 21 | 21 | 21 - 22 | 22 | 22 - 23 | 23 | 23 - 24 | 24 | 24 - 25 | 25 | 25 - 26 | 26 | 26 - 27 | 27 | 27 - 28 | 28 | 28 - 29 | 29 | 29 - 30 | 30 | 30 1 | | 2 | | 3 | | @@ -378,6 +368,16 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer 13 | 13 | 13 14 | 14 | 14 15 | 15 | 15 + 21 | 21 | 21 + 22 | 22 | 22 + 23 | 23 | 23 + 24 | 24 | 24 + 25 | 25 | 25 + 26 | 26 | 26 + 27 | 27 | 27 + 28 | 28 | 28 + 29 | 29 | 29 + 30 | 30 | 30 (25 rows) -- Right join with single shard right most table should error out @@ -399,16 +399,6 @@ FROM LOG: join order: [ "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_left" ] t_custkey | r_custkey | l_custkey -----------+-----------+----------- - 21 | 21 | 21 - 22 | 22 | 22 - 23 | 23 | 23 - 24 | 24 | 24 - 25 | 25 | 25 - 26 | 26 | 26 - 27 | 27 | 27 - 28 | 28 | 28 - 29 | 29 | 29 - 30 | 30 | 30 11 | 11 | 11 12 | 12 | 12 13 | 13 | 13 @@ -419,6 +409,16 @@ LOG: join order: [ "multi_outer_join_right" ][ broadcast join "multi_outer_join 18 | 18 | 19 | 19 | 20 | 20 | + 21 | 21 | 21 + 22 | 22 | 22 + 23 | 23 | 23 + 24 | 24 | 24 + 25 | 25 | 25 + 26 | 26 | 26 + 27 | 27 | 27 + 28 | 28 | 28 + 29 | 29 | 29 + 30 | 30 | 30 (20 rows) -- Make it anti-join, should display values with l_custkey is null @@ -458,16 +458,6 @@ FROM LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ] l_custkey | r_custkey -----------+----------- - 21 | 21 - 22 | 22 - 23 | 23 - 24 | 24 - 25 | 25 - 26 | 26 - 27 | 27 - 28 | 28 - 29 | 29 - 30 | 30 1 | 2 | 3 | @@ -488,6 +478,16 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer | 18 | 19 | 16 + 21 | 21 + 22 | 22 + 23 | 23 + 24 | 24 + 25 | 25 + 26 | 26 + 27 | 27 + 28 | 28 + 29 | 29 + 30 | 30 (30 rows) -- full outer join + 
anti (right) should work with 1-1 matched shards @@ -577,6 +577,11 @@ FROM LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ] l_custkey | r_custkey | t_custkey -----------+-----------+----------- + 11 | 11 | 11 + 12 | 12 | 12 + 13 | 13 | 13 + 14 | 14 | 14 + 15 | 15 | 15 21 | 21 | 21 22 | 22 | 22 23 | 23 | 23 @@ -587,11 +592,6 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer 28 | 28 | 28 29 | 29 | 29 30 | 30 | 30 - 11 | 11 | 11 - 12 | 12 | 12 - 13 | 13 | 13 - 14 | 14 | 14 - 15 | 15 | 15 (15 rows) -- inner (broadcast) join + 2 shards left (local) join should work @@ -604,16 +604,6 @@ FROM LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_right" ] l_custkey | t_custkey | r_custkey -----------+-----------+----------- - 21 | 21 | 21 - 22 | 22 | 22 - 23 | 23 | 23 - 24 | 24 | 24 - 25 | 25 | 25 - 26 | 26 | 26 - 27 | 27 | 27 - 28 | 28 | 28 - 29 | 29 | 29 - 30 | 30 | 30 1 | 1 | 2 | 2 | 3 | 3 | @@ -629,6 +619,16 @@ LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_ 13 | 13 | 13 14 | 14 | 14 15 | 15 | 15 + 21 | 21 | 21 + 22 | 22 | 22 + 23 | 23 | 23 + 24 | 24 | 24 + 25 | 25 | 25 + 26 | 26 | 26 + 27 | 27 | 27 + 28 | 28 | 28 + 29 | 29 | 29 + 30 | 30 | 30 (25 rows) -- inner (local) join + 2 shards left (dual partition) join should error out @@ -650,16 +650,6 @@ FROM LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_right" ] l_custkey | t_custkey | r_custkey -----------+-----------+----------- - 21 | 21 | 21 - 22 | 22 | 22 - 23 | 23 | 23 - 24 | 24 | 24 - 25 | 25 | 25 - 26 | 26 | 26 - 27 | 27 | 27 - 28 | 28 | 28 - 29 | 29 | 29 - 30 | 30 | 30 1 | 1 | 2 | 2 | 3 | 3 | @@ -675,6 +665,16 @@ LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_ 13 | 13 | 13 14 | 14 | 14 15 | 15 | 15 + 21 | 21 | 21 + 22 | 22 | 22 + 23 | 23 | 23 + 24 | 24 | 24 + 25 | 25 | 25 + 26 | 26 | 26 + 27 | 27 | 27 + 28 | 28 | 28 + 29 | 29 | 29 + 30 | 30 | 30 (25 rows) -- inner (broadcast) join + 2 shards left (local) + anti join should work @@ -712,16 +712,6 @@ FROM LOG: join order: [ "multi_outer_join_right" ][ local partition join "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ] t_custkey ----------- - 21 - 22 - 23 - 24 - 25 - 26 - 27 - 28 - 29 - 30 11 12 13 @@ -732,6 +722,16 @@ LOG: join order: [ "multi_outer_join_right" ][ local partition join "multi_oute 18 19 20 + 21 + 22 + 23 + 24 + 25 + 26 + 27 + 28 + 29 + 30 (20 rows) -- flattened out subqueries with outer joins are not supported diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index b85188896..79e44d757 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -29,7 +29,7 @@ SELECT master_create_distributed_table('cluster_management_test', 'col_1', 'hash SELECT master_create_worker_shards('cluster_management_test', 16, 1); -- see that there are some active placements in the candidate node -SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; +SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; -- try to remove a node with active placements and see that node removal is failed SELECT master_remove_node('localhost', :worker_2_port); @@ -37,7 
+37,7 @@ SELECT master_get_active_worker_nodes(); -- mark all placements in the candidate node as inactive UPDATE pg_dist_shard_placement SET shardstate=3 WHERE nodeport=:worker_2_port; -SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; +SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; -- try to remove a node with only inactive placements and see that node is removed SELECT master_remove_node('localhost', :worker_2_port); @@ -46,4 +46,4 @@ SELECT master_get_active_worker_nodes(); -- clean-up SELECT master_add_node('localhost', :worker_2_port); UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port; -DROP TABLE cluster_management_test; \ No newline at end of file +DROP TABLE cluster_management_test; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index ddcbb2cf5..53e4cd68b 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -35,6 +35,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-1'; ALTER EXTENSION citus UPDATE TO '6.0-2'; ALTER EXTENSION citus UPDATE TO '6.0-3'; ALTER EXTENSION citus UPDATE TO '6.0-4'; +ALTER EXTENSION citus UPDATE TO '6.0-5'; -- drop extension an re-create in newest version DROP EXTENSION citus; diff --git a/src/test/regress/sql/multi_schema_support.sql b/src/test/regress/sql/multi_schema_support.sql index 384814fa3..cf88c63c0 100644 --- a/src/test/regress/sql/multi_schema_support.sql +++ b/src/test/regress/sql/multi_schema_support.sql @@ -504,7 +504,7 @@ UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1190000 and no SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localhost', :worker_1_port); -- verify shardstate -SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000; +SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = 1190000; --test with search_path is set @@ -515,7 +515,7 @@ UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1190000 and no SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localhost', :worker_1_port); -- verify shardstate -SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000; +SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = 1190000; -- test master_apply_delete_command with schemas
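
Reviewer's note (not part of the patch): a minimal sanity-check sketch, assuming a cluster that has already run ALTER EXTENSION citus UPDATE TO '6.0-5' and has at least one distributed table, would be to exercise the new placementid path directly from psql on the master:

-- allocate a fresh id from the sequence introduced in citus--6.0-4--6.0-5.sql
SELECT master_get_new_placementid();

-- every pre-existing placement should have inherited its old oid as its placementid
SELECT shardid, placementid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement
ORDER BY placementid;

-- pg_dist_shard_placement_placementid_index should guarantee there are no duplicates
SELECT placementid, count(*)
FROM pg_dist_shard_placement
GROUP BY placementid
HAVING count(*) > 1;

Ordering by placementid above mirrors what CompareShardPlacements now does in C: placement ids replace the dropped tuple oids as the creation-order tiebreaker, which is also why several regression test outputs in this diff change row order.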