From 982ad667535ffefaa45cdda5ef55eaf70592bbb9 Mon Sep 17 00:00:00 2001
From: Andres Freund
Date: Tue, 6 Sep 2016 19:23:17 -0700
Subject: [PATCH] Introduce placement IDs.

So far placements were assigned an Oid, but that was used only to track
insertion order. Even that was done incompletely, as the oid was not
preserved across changes of the shard state. The behaviour around oid
wraparound was also not entirely as intended.

The newly introduced, explicitly assigned IDs are preserved across
shard-state changes. The prime goal of this change is not to improve the
ordering used by task assignment policies, but to make it easier to
reference individual shard placements.

The newly introduced UpdateShardPlacementState() makes use of the new IDs,
and so will the in-progress connection and transaction management changes.
(A short SQL usage sketch follows the patch, after the signature separator.)
---
 src/backend/distributed/Makefile              |   4 +-
 .../distributed/citus--6.0-4--6.0-5.sql       |  28 ++++
 src/backend/distributed/citus.control         |   2 +-
 src/backend/distributed/commands/multi_copy.c |   3 +-
 .../executor/multi_router_executor.c          |  10 +-
 .../master/master_delete_protocol.c           |   3 +-
 .../master/master_metadata_utility.c          | 103 ++++++++++++--
 .../distributed/master/master_node_protocol.c |  36 ++++-
 .../master/master_stage_protocol.c            |  10 +-
 .../planner/multi_physical_planner.c          |  23 +++-
 .../distributed/test/distribution_metadata.c  |   9 +-
 .../distributed/utils/citus_outfuncs.c        |   2 +-
 .../distributed/utils/citus_readfuncs_95.c    |   2 +-
 .../distributed/utils/metadata_cache.c        |  13 ++
 .../distributed/master_metadata_utility.h     |  10 +-
 src/include/distributed/master_protocol.h     |   5 +-
 src/include/distributed/metadata_cache.h      |   1 +
 .../distributed/pg_dist_shard_placement.h     |   4 +-
 src/include/distributed/relay_utility.h       |   1 +
 src/test/regress/expected/multi_array_agg.out |  22 +--
 .../expected/multi_cluster_management.out     |  44 +++---
 .../expected/multi_complex_expressions.out    |  20 +--
 src/test/regress/expected/multi_explain.out   |  32 ++---
 src/test/regress/expected/multi_extension.out |   1 +
 .../multi_large_table_join_planning.out       |  40 +++---
 .../multi_large_table_task_assignment.out     |  48 +++----
 .../regress/expected/multi_schema_support.out |  20 +--
 src/test/regress/expected/multi_table_ddl.out |   4 +-
 .../regress/output/multi_outer_join.source    | 130 +++++++++---------
 .../regress/sql/multi_cluster_management.sql  |   6 +-
 src/test/regress/sql/multi_extension.sql      |   1 +
 src/test/regress/sql/multi_schema_support.sql |   4 +-
 32 files changed, 413 insertions(+), 228 deletions(-)
 create mode 100644 src/backend/distributed/citus--6.0-4--6.0-5.sql

diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile
index 390849e39..1f4cad680 100644
--- a/src/backend/distributed/Makefile
+++ b/src/backend/distributed/Makefile
@@ -8,7 +8,7 @@ EXTENSION = citus
 EXTVERSIONS = 5.0 5.0-1 5.0-2 \
 	5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
 	5.2-1 5.2-2 5.2-3 5.2-4 \
-	6.0-1 6.0-2 6.0-3 6.0-4
+	6.0-1 6.0-2 6.0-3 6.0-4 6.0-5

 # All citus--*.sql files in the source directory
 DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@@ -66,6 +66,8 @@ $(EXTENSION)--6.0-3.sql: $(EXTENSION)--6.0-2.sql $(EXTENSION)--6.0-2--6.0-3.sql
 	cat $^ > $@
 $(EXTENSION)--6.0-4.sql: $(EXTENSION)--6.0-3.sql $(EXTENSION)--6.0-3--6.0-4.sql
 	cat $^ > $@
+$(EXTENSION)--6.0-5.sql: $(EXTENSION)--6.0-4.sql $(EXTENSION)--6.0-4--6.0-5.sql
+	cat $^ > $@

 NO_PGXS = 1

diff --git a/src/backend/distributed/citus--6.0-4--6.0-5.sql b/src/backend/distributed/citus--6.0-4--6.0-5.sql
new file mode 100644
index 000000000..27f974428
--- /dev/null
+++ b/src/backend/distributed/citus--6.0-4--6.0-5.sql
@@ -0,0 +1,28 @@
+/* citus--6.0-4--6.0-5.sql */
+
+/*
+ * Replace the oid column in pg_dist_shard_placement with a sequence column.
+ */
+CREATE SEQUENCE citus.pg_dist_shard_placement_placementid_seq
+    NO CYCLE;
+ALTER SEQUENCE citus.pg_dist_shard_placement_placementid_seq
+    SET SCHEMA pg_catalog;
+ALTER TABLE pg_catalog.pg_dist_shard_placement
+    ADD COLUMN placementid bigint;
+-- keep existing oids, and update the sequence to match the maximum
+UPDATE pg_catalog.pg_dist_shard_placement SET placementid = oid;
+ALTER TABLE pg_catalog.pg_dist_shard_placement
+    ALTER COLUMN placementid SET DEFAULT nextval('pg_catalog.pg_dist_shard_placement_placementid_seq'),
+    ALTER COLUMN placementid SET NOT NULL,
+    SET WITHOUT OIDS;
+CREATE UNIQUE INDEX pg_dist_shard_placement_placementid_index
+ON pg_catalog.pg_dist_shard_placement USING btree(placementid);
+SELECT setval('pg_catalog.pg_dist_shard_placement_placementid_seq', max(placementid))
+FROM pg_catalog.pg_dist_shard_placement;
+
+CREATE FUNCTION master_get_new_placementid()
+    RETURNS bigint
+    LANGUAGE C STRICT
+    AS 'MODULE_PATHNAME', $$master_get_new_placementid$$;
+COMMENT ON FUNCTION master_get_new_placementid()
+    IS 'fetch unique placementid';
diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control
index 732479937..52c36064b 100644
--- a/src/backend/distributed/citus.control
+++ b/src/backend/distributed/citus.control
@@ -1,6 +1,6 @@
 # Citus extension
 comment = 'Citus distributed database'
-default_version = '6.0-4'
+default_version = '6.0-5'
 module_pathname = '$libdir/citus'
 relocatable = false
 schema = pg_catalog
diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c
index bf762cc7e..aa7e4c51d 100644
--- a/src/backend/distributed/commands/multi_copy.c
+++ b/src/backend/distributed/commands/multi_copy.c
@@ -1030,7 +1030,8 @@ OpenCopyTransactions(CopyStmt *copyStatement, ShardConnections *shardConnections

 		DeleteShardPlacementRow(failedPlacement->shardId, failedPlacement->nodeName,
 								failedPlacement->nodePort);
-		InsertShardPlacementRow(failedPlacement->shardId, FILE_INACTIVE, shardLength,
+		InsertShardPlacementRow(failedPlacement->shardId, failedPlacement->placementId,
+								FILE_INACTIVE, shardLength,
 								failedPlacement->nodeName, failedPlacement->nodePort);
 	}

diff --git a/src/backend/distributed/executor/multi_router_executor.c b/src/backend/distributed/executor/multi_router_executor.c
index 3f052870b..0eb4ad310 100644
--- a/src/backend/distributed/executor/multi_router_executor.c
+++ b/src/backend/distributed/executor/multi_router_executor.c
@@ -609,7 +609,9 @@ ExecuteTaskAndStoreResults(QueryDesc *queryDesc, Task *task,

 		DeleteShardPlacementRow(failedPlacement->shardId, failedPlacement->nodeName,
 								failedPlacement->nodePort);
-		InsertShardPlacementRow(failedPlacement->shardId, FILE_INACTIVE, shardLength,
+		InsertShardPlacementRow(failedPlacement->shardId,
+								failedPlacement->placementId,
+								FILE_INACTIVE, shardLength,
 								failedPlacement->nodeName, failedPlacement->nodePort);
 	}

@@ -1406,9 +1408,11 @@ MarkRemainingInactivePlacements(void)
 			uint64 shardId = shardConnSet->shardId;
 			NodeConnectionKey *nodeKey = &participant->cacheKey;
 			uint64 shardLength = 0;
+			uint64 placementId = INVALID_PLACEMENT_ID;

-			DeleteShardPlacementRow(shardId, nodeKey->nodeName, nodeKey->nodePort);
-			InsertShardPlacementRow(shardId, FILE_INACTIVE, shardLength,
+			placementId = DeleteShardPlacementRow(shardId, nodeKey->nodeName,
+												  nodeKey->nodePort);
+			
InsertShardPlacementRow(shardId, placementId, FILE_INACTIVE, shardLength,
 									nodeKey->nodeName, nodeKey->nodePort);
 		}
 	}
diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c
index 292c7cf55..0ed584ed8 100644
--- a/src/backend/distributed/master/master_delete_protocol.c
+++ b/src/backend/distributed/master/master_delete_protocol.c
@@ -376,12 +376,13 @@ DropShards(Oid relationId, char *schemaName, char *relationName,
 	foreach(lingeringPlacementCell, lingeringPlacementList)
 	{
 		ShardPlacement *placement = (ShardPlacement *) lfirst(lingeringPlacementCell);
+		uint64 placementId = placement->placementId;
 		char *workerName = placement->nodeName;
 		uint32 workerPort = placement->nodePort;
 		uint64 oldShardLength = placement->shardLength;

 		DeleteShardPlacementRow(shardId, workerName, workerPort);
-		InsertShardPlacementRow(shardId, FILE_TO_DELETE, oldShardLength,
+		InsertShardPlacementRow(shardId, placementId, FILE_TO_DELETE, oldShardLength,
 								workerName, workerPort);

 		ereport(WARNING, (errmsg("could not delete shard \"%s\" on node \"%s:%u\"",
diff --git a/src/backend/distributed/master/master_metadata_utility.c b/src/backend/distributed/master/master_metadata_utility.c
index 769fa2d81..4d625671b 100644
--- a/src/backend/distributed/master/master_metadata_utility.c
+++ b/src/backend/distributed/master/master_metadata_utility.c
@@ -20,6 +20,7 @@
 #include "catalog/pg_type.h"
 #include "distributed/citus_nodes.h"
 #include "distributed/master_metadata_utility.h"
+#include "distributed/master_protocol.h"
 #include "distributed/metadata_cache.h"
 #include "distributed/multi_join_order.h"
 #include "distributed/multi_logical_optimizer.h"
@@ -315,9 +316,9 @@ ShardPlacementList(uint64 shardId)


 /*
- * TupleToShardPlacement takes in a heap tuple from pg_dist_shard_placement, and
- * converts this tuple to an equivalent struct in memory. The function assumes
- * the caller already has locks on the tuple, and doesn't perform any locking.
+ * TupleToShardPlacement takes in a heap tuple from pg_dist_shard_placement,
+ * and converts this tuple to an in-memory struct. The function assumes the
+ * caller already has locks on the tuple, and doesn't perform any locking.
 */
 ShardPlacement *
 TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
@@ -325,7 +326,8 @@ TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
 	ShardPlacement *shardPlacement = NULL;
 	bool isNull = false;

-	Oid tupleOid = HeapTupleGetOid(heapTuple);
+	Datum placementId = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_placementid,
+									 tupleDescriptor, &isNull);
 	Datum shardId = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_shardid,
 								 tupleDescriptor, &isNull);
 	Datum shardLength = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_shardlength,
@@ -336,11 +338,14 @@ TupleToShardPlacement(TupleDesc tupleDescriptor, HeapTuple heapTuple)
 								   tupleDescriptor, &isNull);
 	Datum nodePort = heap_getattr(heapTuple, Anum_pg_dist_shard_placement_nodeport,
 								  tupleDescriptor, &isNull);
-
-	Assert(!HeapTupleHasNulls(heapTuple));
+	if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_shard_placement ||
+		HeapTupleHasNulls(heapTuple))
+	{
+		ereport(ERROR, (errmsg("unexpected null in pg_dist_shard_placement tuple")));
+	}

 	shardPlacement = CitusMakeNode(ShardPlacement);
-	shardPlacement->tupleOid = tupleOid;
+	shardPlacement->placementId = DatumGetInt64(placementId);
 	shardPlacement->shardId = DatumGetInt64(shardId);
 	shardPlacement->shardLength = DatumGetInt64(shardLength);
 	shardPlacement->shardState = DatumGetUInt32(shardState);
@@ -408,10 +413,12 @@ InsertShardRow(Oid relationId, uint64 shardId, char storageType,

 /*
  * InsertShardPlacementRow opens the shard placement system catalog, and inserts
- * a new row with the given values into that system catalog.
+ * a new row with the given values into that system catalog. If placementId is
+ * INVALID_PLACEMENT_ID, a new placement id will be assigned.
  */
 void
-InsertShardPlacementRow(uint64 shardId, char shardState, uint64 shardLength,
+InsertShardPlacementRow(uint64 shardId, uint64 placementId,
+						char shardState, uint64 shardLength,
 						char *nodeName, uint32 nodePort)
 {
 	Relation pgDistShardPlacement = NULL;
@@ -424,11 +431,16 @@ InsertShardPlacementRow(uint64 shardId, char shardState, uint64 shardLength,
 	memset(values, 0, sizeof(values));
 	memset(isNulls, false, sizeof(isNulls));

+	if (placementId == INVALID_PLACEMENT_ID)
+	{
+		placementId = DatumGetInt64(master_get_new_placementid(NULL));
+	}
 	values[Anum_pg_dist_shard_placement_shardid - 1] = Int64GetDatum(shardId);
 	values[Anum_pg_dist_shard_placement_shardstate - 1] = CharGetDatum(shardState);
 	values[Anum_pg_dist_shard_placement_shardlength - 1] = Int64GetDatum(shardLength);
 	values[Anum_pg_dist_shard_placement_nodename - 1] = CStringGetTextDatum(nodeName);
 	values[Anum_pg_dist_shard_placement_nodeport - 1] = Int64GetDatum(nodePort);
+	values[Anum_pg_dist_shard_placement_placementid - 1] = Int64GetDatum(placementId);

 	/* open shard placement relation and insert new tuple */
 	pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock);
@@ -496,7 +508,7 @@ DeleteShardRow(uint64 shardId)
  * first (unique) row that corresponds to the given shardId and worker node, and
- * deletes this row.
+ * deletes this row. Returns the placementId of the deleted placement.
 */
-void
+uint64
 DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
 {
 	Relation pgDistShardPlacement = NULL;
@@ -506,8 +518,12 @@ DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
 	bool indexOK = true;
 	HeapTuple heapTuple = NULL;
 	bool heapTupleFound = false;
+	TupleDesc tupleDescriptor = NULL;
+	int64 placementId = INVALID_PLACEMENT_ID;
+	bool isNull = false;

 	pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock);
+	tupleDescriptor = RelationGetDescr(pgDistShardPlacement);

 	ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_placement_shardid,
 				BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(shardId));
@@ -519,8 +535,6 @@ DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
 	heapTuple = systable_getnext(scanDescriptor);
 	while (HeapTupleIsValid(heapTuple))
 	{
-		TupleDesc tupleDescriptor = RelationGetDescr(pgDistShardPlacement);
-
 		ShardPlacement *placement = TupleToShardPlacement(tupleDescriptor, heapTuple);
 		if (strncmp(placement->nodeName, workerName, WORKER_LENGTH) == 0 &&
 			placement->nodePort == workerPort)
@@ -540,11 +554,73 @@ DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort)
 				shardId, workerName, workerPort)));
 	}

+	placementId = DatumGetInt64(heap_getattr(heapTuple, Anum_pg_dist_shard_placement_placementid,
+											 tupleDescriptor, &isNull));
+	if (HeapTupleHeaderGetNatts(heapTuple->t_data) != Natts_pg_dist_shard_placement ||
+		HeapTupleHasNulls(heapTuple))
+	{
+		ereport(ERROR, (errmsg("unexpected null in pg_dist_shard_placement tuple")));
+	}
+
 	simple_heap_delete(pgDistShardPlacement, &heapTuple->t_self);
 	systable_endscan(scanDescriptor);

 	CommandCounterIncrement();
 	heap_close(pgDistShardPlacement, RowExclusiveLock);
+
+	return placementId;
+}
+
+
+/*
+ * UpdateShardPlacementState sets the shardState for the placement identified
+ * by placementId.
+ */ +void +UpdateShardPlacementState(uint64 placementId, char shardState) +{ + Relation pgDistShardPlacement = NULL; + SysScanDesc scanDescriptor = NULL; + ScanKeyData scanKey[1]; + int scanKeyCount = 1; + bool indexOK = true; + HeapTuple heapTuple = NULL; + TupleDesc tupleDescriptor = NULL; + Datum values[Natts_pg_dist_shard_placement]; + bool isnull[Natts_pg_dist_shard_placement]; + bool replace[Natts_pg_dist_shard_placement]; + + pgDistShardPlacement = heap_open(DistShardPlacementRelationId(), RowExclusiveLock); + tupleDescriptor = RelationGetDescr(pgDistShardPlacement); + ScanKeyInit(&scanKey[0], Anum_pg_dist_shard_placement_placementid, + BTEqualStrategyNumber, F_INT8EQ, Int64GetDatum(placementId)); + + scanDescriptor = systable_beginscan(pgDistShardPlacement, + DistShardPlacementPlacementidIndexId(), indexOK, + NULL, scanKeyCount, scanKey); + + heapTuple = systable_getnext(scanDescriptor); + if (!HeapTupleIsValid(heapTuple)) + { + ereport(ERROR, (errmsg("could not find valid entry for shard placement " + UINT64_FORMAT, + placementId))); + } + + memset(replace, 0, sizeof(replace)); + + values[Anum_pg_dist_shard_placement_shardstate - 1] = CharGetDatum(shardState); + isnull[Anum_pg_dist_shard_placement_shardstate - 1] = false; + replace[Anum_pg_dist_shard_placement_shardstate - 1] = true; + + heapTuple = heap_modify_tuple(heapTuple, tupleDescriptor, values, isnull, replace); + simple_heap_update(pgDistShardPlacement, &heapTuple->t_self, heapTuple); + + CatalogUpdateIndexes(pgDistShardPlacement, heapTuple); + CommandCounterIncrement(); + + systable_endscan(scanDescriptor); + heap_close(pgDistShardPlacement, NoLock); } @@ -794,7 +870,8 @@ master_stage_shard_placement_row(PG_FUNCTION_ARGS) EnsureTablePermissions(distributedRelationId, ACL_INSERT); /* finally insert placement */ - InsertShardPlacementRow(shardId, shardState, shardLength, nodeName, nodePort); + InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardLength, + nodeName, nodePort); heap_close(relation, NoLock); heap_close(pgDistShard, NoLock); diff --git a/src/backend/distributed/master/master_node_protocol.c b/src/backend/distributed/master/master_node_protocol.c index 72543659a..625f9202a 100644 --- a/src/backend/distributed/master/master_node_protocol.c +++ b/src/backend/distributed/master/master_node_protocol.c @@ -67,6 +67,7 @@ static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescripto PG_FUNCTION_INFO_V1(master_get_table_metadata); PG_FUNCTION_INFO_V1(master_get_table_ddl_events); PG_FUNCTION_INFO_V1(master_get_new_shardid); +PG_FUNCTION_INFO_V1(master_get_new_placementid); PG_FUNCTION_INFO_V1(master_get_local_first_candidate_nodes); PG_FUNCTION_INFO_V1(master_get_round_robin_candidate_nodes); PG_FUNCTION_INFO_V1(master_get_active_worker_nodes); @@ -251,8 +252,7 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS) * ahead logs; writing to logs avoids the risk of having shardId collisions. * * Please note that the caller is still responsible for finalizing shard data - * and the shardId with the master node. Further note that this function relies - * on an internal sequence created in initdb to generate unique identifiers. + * and the shardId with the master node. * * NB: This can be called by any user; for now we have decided that that's * ok. 
We might want to restrict this to users part of a specific role or such
@@ -280,6 +280,38 @@ master_get_new_shardid(PG_FUNCTION_ARGS)
 }


+/*
+ * master_get_new_placementid allocates and returns a unique placementId for
+ * the placement to be created. This allocation occurs both in shared memory
+ * and in write ahead logs; writing to logs avoids the risk of having
+ * placementId collisions.
+ *
+ * NB: This can be called by any user; for now we have decided that that's
+ * ok. We might want to restrict this to users part of a specific role or such
+ * at some later point.
+ */
+Datum
+master_get_new_placementid(PG_FUNCTION_ARGS)
+{
+	text *sequenceName = cstring_to_text(PLACEMENTID_SEQUENCE_NAME);
+	Oid sequenceId = ResolveRelationId(sequenceName);
+	Datum sequenceIdDatum = ObjectIdGetDatum(sequenceId);
+	Oid savedUserId = InvalidOid;
+	int savedSecurityContext = 0;
+	Datum placementIdDatum = 0;
+
+	GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
+	SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
+
+	/* generate new and unique placementId from sequence */
+	placementIdDatum = DirectFunctionCall1(nextval_oid, sequenceIdDatum);
+
+	SetUserIdAndSecContext(savedUserId, savedSecurityContext);
+
+	PG_RETURN_DATUM(placementIdDatum);
+}
+
+
 /*
  * master_get_local_first_candidate_nodes returns a set of candidate host names
  * and port numbers on which to place new shards. The function makes sure to
diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c
index 62d6fd44a..5c0c30629 100644
--- a/src/backend/distributed/master/master_stage_protocol.c
+++ b/src/backend/distributed/master/master_stage_protocol.c
@@ -292,12 +292,14 @@ master_append_table_to_shard(PG_FUNCTION_ARGS)
 	foreach(failedPlacementCell, failedPlacementList)
 	{
 		ShardPlacement *placement = (ShardPlacement *) lfirst(failedPlacementCell);
+		uint64 placementId = placement->placementId;
 		char *workerName = placement->nodeName;
 		uint32 workerPort = placement->nodePort;
 		uint64 oldShardLength = placement->shardLength;

 		DeleteShardPlacementRow(shardId, workerName, workerPort);
-		InsertShardPlacementRow(shardId, FILE_INACTIVE, oldShardLength,
+		InsertShardPlacementRow(shardId, placementId,
+								FILE_INACTIVE, oldShardLength,
 								workerName, workerPort);

 		ereport(WARNING, (errmsg("could not append table to shard \"%s\" on node "
@@ -398,7 +400,8 @@ CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventList,
 			const RelayFileState shardState = FILE_FINALIZED;
 			const uint64 shardSize = 0;

-			InsertShardPlacementRow(shardId, shardState, shardSize, nodeName, nodePort);
+			InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, shardState, shardSize,
+									nodeName, nodePort);
 			placementsCreated++;
 		}
 		else
@@ -537,11 +540,12 @@ UpdateShardStatistics(int64 shardId)
 	foreach(shardPlacementCell, shardPlacementList)
 	{
 		ShardPlacement *placement = (ShardPlacement *) lfirst(shardPlacementCell);
+		uint64 placementId = placement->placementId;
 		char *workerName = placement->nodeName;
 		uint32 workerPort = placement->nodePort;

 		DeleteShardPlacementRow(shardId, workerName, workerPort);
-		InsertShardPlacementRow(shardId, FILE_FINALIZED, shardSize,
+		InsertShardPlacementRow(shardId, placementId, FILE_FINALIZED, shardSize,
 								workerName, workerPort);
 	}

diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c
index c0c7b833a..5842513b6 100644
--- a/src/backend/distributed/planner/multi_physical_planner.c
+++ 
b/src/backend/distributed/planner/multi_physical_planner.c @@ -5101,7 +5101,7 @@ CompareTasksByShardId(const void *leftElement, const void *rightElement) /* * ActiveShardPlacementLists finds the active shard placement list for each task in - * the given task list, sorts each shard placement list by tuple insertion time, + * the given task list, sorts each shard placement list by shard creation time, * and adds the sorted placement list into a new list of lists. The function also * ensures a one-to-one mapping between each placement list in the new list of * lists and each task in the given task list. @@ -5122,7 +5122,7 @@ ActiveShardPlacementLists(List *taskList) /* filter out shard placements that reside in inactive nodes */ List *activeShardPlacementList = ActivePlacementList(shardPlacementList); - /* sort shard placements by their insertion time */ + /* sort shard placements by their creation time */ activeShardPlacementList = SortList(activeShardPlacementList, CompareShardPlacements); shardPlacementLists = lappend(shardPlacementLists, activeShardPlacementList); @@ -5142,12 +5142,21 @@ CompareShardPlacements(const void *leftElement, const void *rightElement) const ShardPlacement *leftPlacement = *((const ShardPlacement **) leftElement); const ShardPlacement *rightPlacement = *((const ShardPlacement **) rightElement); - Oid leftTupleOid = leftPlacement->tupleOid; - Oid rightTupleOid = rightPlacement->tupleOid; + uint64 leftPlacementId = leftPlacement->placementId; + uint64 rightPlacementId = rightPlacement->placementId; - /* tuples that are inserted earlier appear first */ - int tupleOidDiff = leftTupleOid - rightTupleOid; - return tupleOidDiff; + if (leftPlacementId < rightPlacementId) + { + return -1; + } + else if (leftPlacementId > rightPlacementId) + { + return 1; + } + else + { + return 0; + } } diff --git a/src/backend/distributed/test/distribution_metadata.c b/src/backend/distributed/test/distribution_metadata.c index 8c6f9e85a..b86035d0b 100644 --- a/src/backend/distributed/test/distribution_metadata.c +++ b/src/backend/distributed/test/distribution_metadata.c @@ -309,7 +309,8 @@ create_healthy_local_shard_placement_row(PG_FUNCTION_ARGS) int64 shardId = PG_GETARG_INT64(0); int64 shardLength = 0; - InsertShardPlacementRow(shardId, FILE_FINALIZED, shardLength, "localhost", 5432); + InsertShardPlacementRow(shardId, INVALID_PLACEMENT_ID, FILE_FINALIZED, shardLength, + "localhost", 5432); PG_RETURN_VOID(); } @@ -347,9 +348,11 @@ update_shard_placement_row_state(PG_FUNCTION_ARGS) bool successful = true; char *hostNameString = text_to_cstring(hostName); uint64 shardLength = 0; + uint64 placementId = INVALID_PLACEMENT_ID; - DeleteShardPlacementRow(shardId, hostNameString, hostPort); - InsertShardPlacementRow(shardId, shardState, shardLength, hostNameString, hostPort); + placementId = DeleteShardPlacementRow(shardId, hostNameString, hostPort); + InsertShardPlacementRow(shardId, placementId, shardState, shardLength, + hostNameString, hostPort); PG_RETURN_BOOL(successful); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 124185ff4..9109a114e 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -431,7 +431,7 @@ _outShardPlacement(StringInfo str, const ShardPlacement *node) { WRITE_NODE_TYPE("SHARDPLACEMENT"); - WRITE_OID_FIELD(tupleOid); + WRITE_UINT64_FIELD(placementId); WRITE_UINT64_FIELD(shardId); WRITE_UINT64_FIELD(shardLength); WRITE_ENUM_FIELD(shardState, 
RelayFileState);
diff --git a/src/backend/distributed/utils/citus_readfuncs_95.c b/src/backend/distributed/utils/citus_readfuncs_95.c
index caf223c00..e0609f602 100644
--- a/src/backend/distributed/utils/citus_readfuncs_95.c
+++ b/src/backend/distributed/utils/citus_readfuncs_95.c
@@ -1478,7 +1478,7 @@ _readShardPlacement(void)
 {
 	READ_LOCALS(ShardPlacement);

-	READ_OID_FIELD(tupleOid);
+	READ_UINT64_FIELD(placementId);
 	READ_UINT64_FIELD(shardId);
 	READ_UINT64_FIELD(shardLength);
 	READ_ENUM_FIELD(shardState, RelayFileState);
diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c
index 01e37c7db..34c127f8a 100644
--- a/src/backend/distributed/utils/metadata_cache.c
+++ b/src/backend/distributed/utils/metadata_cache.c
@@ -58,6 +58,7 @@ static Oid distPartitionColocationidIndexId = InvalidOid;
 static Oid distShardLogicalRelidIndexId = InvalidOid;
 static Oid distShardShardidIndexId = InvalidOid;
 static Oid distShardPlacementShardidIndexId = InvalidOid;
+static Oid distShardPlacementPlacementidIndexId = InvalidOid;
 static Oid distShardPlacementNodeidIndexId = InvalidOid;

 static Oid extraDataContainerFuncId = InvalidOid;
@@ -702,6 +703,17 @@ DistShardPlacementShardidIndexId(void)
 }


+/* return oid of pg_dist_shard_placement_placementid_index */
+Oid
+DistShardPlacementPlacementidIndexId(void)
+{
+	CachedRelationLookup("pg_dist_shard_placement_placementid_index",
+						 &distShardPlacementPlacementidIndexId);
+
+	return distShardPlacementPlacementidIndexId;
+}
+
+
 /* return oid of pg_dist_shard_placement_nodeid_index */
 Oid
 DistShardPlacementNodeidIndexId(void)
@@ -1234,6 +1246,7 @@ InvalidateDistRelationCacheCallback(Datum argument, Oid relationId)
 		distShardLogicalRelidIndexId = InvalidOid;
 		distShardShardidIndexId = InvalidOid;
 		distShardPlacementShardidIndexId = InvalidOid;
+		distShardPlacementPlacementidIndexId = InvalidOid;
 		distNodeRelationId = InvalidOid;
 		extraDataContainerFuncId = InvalidOid;
 	}
diff --git a/src/include/distributed/master_metadata_utility.h b/src/include/distributed/master_metadata_utility.h
index 01a6d7c4a..17ee5f539 100644
--- a/src/include/distributed/master_metadata_utility.h
+++ b/src/include/distributed/master_metadata_utility.h
@@ -47,7 +47,7 @@ typedef struct ShardInterval
 typedef struct ShardPlacement
 {
 	CitusNodeTag type;
-	Oid tupleOid;       /* unique oid that implies this row's insertion order */
+	uint64 placementId; /* sequence value that implies this placement's creation order */
 	uint64 shardId;
 	uint64 shardLength;
 	RelayFileState shardState;
@@ -71,10 +71,13 @@ extern ShardPlacement * TupleToShardPlacement(TupleDesc tupleDesc,

 /* Function declarations to modify shard and shard placement data */
 extern void InsertShardRow(Oid relationId, uint64 shardId, char storageType,
 						   text *shardMinValue, text *shardMaxValue);
-extern void InsertShardPlacementRow(uint64 shardId, char shardState, uint64 shardLength,
+extern void InsertShardPlacementRow(uint64 shardId, uint64 placementId,
+									char shardState, uint64 shardLength,
 									char *nodeName, uint32 nodePort);
 extern void DeleteShardRow(uint64 shardId);
-extern void DeleteShardPlacementRow(uint64 shardId, char *workerName, uint32 workerPort);
+extern void UpdateShardPlacementState(uint64 placementId, char shardState);
+extern uint64 DeleteShardPlacementRow(uint64 shardId, char *workerName,
+									  uint32 workerPort);

 /* Remaining metadata utility functions */
 extern Node * BuildDistributionKeyFromColumnName(Relation distributedRelation,
diff --git 
a/src/include/distributed/master_protocol.h b/src/include/distributed/master_protocol.h index 060f4c546..18c5451a4 100644 --- a/src/include/distributed/master_protocol.h +++ b/src/include/distributed/master_protocol.h @@ -48,8 +48,8 @@ /* Name of columnar foreign data wrapper */ #define CSTORE_FDW_NAME "cstore_fdw" -/* ShardId sequence name as defined in initdb.c */ #define SHARDID_SEQUENCE_NAME "pg_dist_shardid_seq" +#define PLACEMENTID_SEQUENCE_NAME "pg_dist_shard_placement_placementid_seq" /* Remote call definitions to help with data staging and deletion */ #define WORKER_APPLY_SHARD_DDL_COMMAND \ @@ -97,10 +97,11 @@ extern void CreateShardPlacements(Oid relationId, int64 shardId, List *ddlEventL int workerStartIndex, int replicationFactor); extern uint64 UpdateShardStatistics(int64 shardId); -/* Function declarations for generating metadata for shard creation */ +/* Function declarations for generating metadata for shard and placement creation */ extern Datum master_get_table_metadata(PG_FUNCTION_ARGS); extern Datum master_get_table_ddl_events(PG_FUNCTION_ARGS); extern Datum master_get_new_shardid(PG_FUNCTION_ARGS); +extern Datum master_get_new_placementid(PG_FUNCTION_ARGS); extern Datum master_get_local_first_candidate_nodes(PG_FUNCTION_ARGS); extern Datum master_get_round_robin_candidate_nodes(PG_FUNCTION_ARGS); extern Datum master_get_active_worker_nodes(PG_FUNCTION_ARGS); diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index 20731eca8..47fed3d95 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -75,6 +75,7 @@ extern Oid DistPartitionColocationidIndexId(void); extern Oid DistShardLogicalRelidIndexId(void); extern Oid DistShardShardidIndexId(void); extern Oid DistShardPlacementShardidIndexId(void); +extern Oid DistShardPlacementPlacementidIndexId(void); extern Oid DistShardPlacementNodeidIndexId(void); /* function oids */ diff --git a/src/include/distributed/pg_dist_shard_placement.h b/src/include/distributed/pg_dist_shard_placement.h index 41bc17218..8fdf74835 100644 --- a/src/include/distributed/pg_dist_shard_placement.h +++ b/src/include/distributed/pg_dist_shard_placement.h @@ -29,6 +29,7 @@ typedef struct FormData_pg_dist_shard_placement #ifdef CATALOG_VARLEN /* variable-length fields start here */ text nodename; /* remote node's host name */ int32 nodeport; /* remote node's port number */ + int64 placementid; /* global placementId on remote node (added later) */ #endif } FormData_pg_dist_shard_placement; @@ -43,12 +44,13 @@ typedef FormData_pg_dist_shard_placement *Form_pg_dist_shard_placement; * compiler constants for pg_dist_shard_placement * ---------------- */ -#define Natts_pg_dist_shard_placement 5 +#define Natts_pg_dist_shard_placement 6 #define Anum_pg_dist_shard_placement_shardid 1 #define Anum_pg_dist_shard_placement_shardstate 2 #define Anum_pg_dist_shard_placement_shardlength 3 #define Anum_pg_dist_shard_placement_nodename 4 #define Anum_pg_dist_shard_placement_nodeport 5 +#define Anum_pg_dist_shard_placement_placementid 6 #endif /* PG_DIST_SHARD_PLACEMENT_H */ diff --git a/src/include/distributed/relay_utility.h b/src/include/distributed/relay_utility.h index ba93ad429..ac7aeff56 100644 --- a/src/include/distributed/relay_utility.h +++ b/src/include/distributed/relay_utility.h @@ -23,6 +23,7 @@ /* Shard name and identifier related defines */ #define SHARD_NAME_SEPARATOR '_' #define INVALID_SHARD_ID 0 +#define INVALID_PLACEMENT_ID 0 /* * RelayFileState represents 
last known states of shards on a given node. We diff --git a/src/test/regress/expected/multi_array_agg.out b/src/test/regress/expected/multi_array_agg.out index b3450b767..b473b7967 100644 --- a/src/test/regress/expected/multi_array_agg.out +++ b/src/test/regress/expected/multi_array_agg.out @@ -98,10 +98,10 @@ SELECT l_quantity, count(*), avg(l_extendedprice), array_agg(l_orderkey) FROM li GROUP BY l_quantity ORDER BY l_quantity; l_quantity | count | avg | array_agg ------------+-------+-----------------------+-------------------------------------------------------------------------------------------------- - 1.00 | 17 | 1477.1258823529411765 | {8997,9026,9158,9184,9220,9222,9348,9383,9476,5543,5633,5634,5698,5766,5856,5857,5986} - 2.00 | 19 | 3078.4242105263157895 | {9030,9058,9123,9124,9188,9344,9441,9476,5506,5540,5573,5669,5703,5730,5798,5831,5893,5920,5923} - 3.00 | 14 | 4714.0392857142857143 | {9124,9157,9184,9223,9254,9349,9414,9475,9477,5509,5543,5605,5606,5827} - 4.00 | 19 | 5929.7136842105263158 | {9091,9120,9281,9347,9382,9440,9473,5504,5507,5508,5511,5538,5764,5766,5826,5829,5862,5959,5985} + 1.00 | 17 | 1477.1258823529411765 | {5543,5633,5634,5698,5766,5856,5857,5986,8997,9026,9158,9184,9220,9222,9348,9383,9476} + 2.00 | 19 | 3078.4242105263157895 | {5506,5540,5573,5669,5703,5730,5798,5831,5893,5920,5923,9030,9058,9123,9124,9188,9344,9441,9476} + 3.00 | 14 | 4714.0392857142857143 | {5509,5543,5605,5606,5827,9124,9157,9184,9223,9254,9349,9414,9475,9477} + 4.00 | 19 | 5929.7136842105263158 | {5504,5507,5508,5511,5538,5764,5766,5826,5829,5862,5959,5985,9091,9120,9281,9347,9382,9440,9473} (4 rows) SELECT l_quantity, array_agg(extract (month FROM o_orderdate)) AS my_month @@ -109,10 +109,10 @@ SELECT l_quantity, array_agg(extract (month FROM o_orderdate)) AS my_month AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity; l_quantity | my_month ------------+------------------------------------------------ - 1.00 | {7,7,4,7,4,2,6,3,5,9,5,7,5,9,11,11,4} - 2.00 | {7,6,6,10,1,12,6,5,11,10,8,5,5,12,3,11,7,11,5} - 3.00 | {10,6,7,8,5,8,9,11,3,4,9,8,11,7} - 4.00 | {11,6,2,8,2,6,10,1,5,6,11,12,10,9,6,1,2,5,1} + 1.00 | {9,5,7,5,9,11,11,4,7,7,4,7,4,2,6,3,5} + 2.00 | {11,10,8,5,5,12,3,11,7,11,5,7,6,6,10,1,12,6,5} + 3.00 | {4,9,8,11,7,10,6,7,8,5,8,9,11,3} + 4.00 | {1,5,6,11,12,10,9,6,1,2,5,1,11,6,2,8,2,6,10} (4 rows) SELECT l_quantity, array_agg(l_orderkey * 2 + 1) FROM lineitem WHERE l_quantity < 5 @@ -120,10 +120,10 @@ SELECT l_quantity, array_agg(l_orderkey * 2 + 1) FROM lineitem WHERE l_quantity AND l_orderkey > 5500 AND l_orderkey < 9500 GROUP BY l_quantity ORDER BY l_quantity; l_quantity | array_agg ------------+--------------------------------------------- - 1.00 | {18317,18445,11269,11397,11713,11715,11973} - 2.00 | {18061,18247,18953,11847} + 1.00 | {11269,11397,11713,11715,11973,18317,18445} + 2.00 | {11847,18061,18247,18953} 3.00 | {18249,18315,18699,18951,18955} - 4.00 | {18241,18765,11653,11659} + 4.00 | {11653,11659,18241,18765} (4 rows) -- Check that we can execute array_agg() with an expression containing NULL values diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index 45c143cfc..84e753c2d 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -72,17 +72,17 @@ SELECT master_create_worker_shards('cluster_management_test', 16, 1); (1 row) -- see that there are some active placements in the candidate 
node -SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1220001 | 1 | 0 | localhost | 57638 - 1220003 | 1 | 0 | localhost | 57638 - 1220005 | 1 | 0 | localhost | 57638 - 1220007 | 1 | 0 | localhost | 57638 - 1220009 | 1 | 0 | localhost | 57638 - 1220011 | 1 | 0 | localhost | 57638 - 1220013 | 1 | 0 | localhost | 57638 - 1220015 | 1 | 0 | localhost | 57638 +SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; + shardid | shardstate | nodename | nodeport +---------+------------+-----------+---------- + 1220001 | 1 | localhost | 57638 + 1220003 | 1 | localhost | 57638 + 1220005 | 1 | localhost | 57638 + 1220007 | 1 | localhost | 57638 + 1220009 | 1 | localhost | 57638 + 1220011 | 1 | localhost | 57638 + 1220013 | 1 | localhost | 57638 + 1220015 | 1 | localhost | 57638 (8 rows) -- try to remove a node with active placements and see that node removal is failed @@ -97,17 +97,17 @@ SELECT master_get_active_worker_nodes(); -- mark all placements in the candidate node as inactive UPDATE pg_dist_shard_placement SET shardstate=3 WHERE nodeport=:worker_2_port; -SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1220001 | 3 | 0 | localhost | 57638 - 1220003 | 3 | 0 | localhost | 57638 - 1220005 | 3 | 0 | localhost | 57638 - 1220007 | 3 | 0 | localhost | 57638 - 1220009 | 3 | 0 | localhost | 57638 - 1220011 | 3 | 0 | localhost | 57638 - 1220013 | 3 | 0 | localhost | 57638 - 1220015 | 3 | 0 | localhost | 57638 +SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; + shardid | shardstate | nodename | nodeport +---------+------------+-----------+---------- + 1220001 | 3 | localhost | 57638 + 1220003 | 3 | localhost | 57638 + 1220005 | 3 | localhost | 57638 + 1220007 | 3 | localhost | 57638 + 1220009 | 3 | localhost | 57638 + 1220011 | 3 | localhost | 57638 + 1220013 | 3 | localhost | 57638 + 1220015 | 3 | localhost | 57638 (8 rows) -- try to remove a node with only inactive placements and see that node is removed diff --git a/src/test/regress/expected/multi_complex_expressions.out b/src/test/regress/expected/multi_complex_expressions.out index 2a46f4ea7..09ce8c246 100644 --- a/src/test/regress/expected/multi_complex_expressions.out +++ b/src/test/regress/expected/multi_complex_expressions.out @@ -444,16 +444,16 @@ LIMIT 10 OFFSET 20; DEBUG: push down of limit count: 30 l_partkey | o_custkey | l_quantity -----------+-----------+------------ - 135912 | 509 | 26.00 - 75351 | 1261 | 26.00 - 199475 | 1052 | 26.00 - 91309 | 8 | 26.00 - 53624 | 400 | 26.00 - 182736 | 1048 | 26.00 - 59694 | 163 | 26.00 - 20481 | 173 | 26.00 - 78748 | 1499 | 26.00 - 7614 | 1397 | 26.00 + 154380 | 421 | 26.00 + 103981 | 325 | 26.00 + 77886 | 817 | 26.00 + 147369 | 755 | 26.00 + 78175 | 1075 | 26.00 + 109784 | 1268 | 26.00 + 28635 | 1207 | 26.00 + 188845 | 554 | 26.00 + 189398 | 844 | 26.00 + 71383 | 1397 | 26.00 (10 rows) RESET client_min_messages; diff --git a/src/test/regress/expected/multi_explain.out b/src/test/regress/expected/multi_explain.out index f3f60a4ed..cc26c6c15 100644 --- a/src/test/regress/expected/multi_explain.out +++ b/src/test/regress/expected/multi_explain.out @@ -40,7 +40,7 @@ Distributed Query into pg_merge_job_570000 Node: 
host=localhost port=57637 dbname=regression -> HashAggregate Group Key: l_quantity - -> Seq Scan on lineitem_290000 lineitem + -> Seq Scan on lineitem_290001 lineitem Master Query -> Sort Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570000_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570000_0 @@ -71,7 +71,7 @@ EXPLAIN (COSTS FALSE, FORMAT JSON) { "Node Type": "Seq Scan", "Parent Relationship": "Outer", - "Relation Name": "lineitem_290000", + "Relation Name": "lineitem_290001", "Alias": "lineitem" } ] @@ -140,7 +140,7 @@ EXPLAIN (COSTS FALSE, FORMAT XML) Seq Scan Outer - lineitem_290000 + lineitem_290001 lineitem @@ -206,7 +206,7 @@ EXPLAIN (COSTS FALSE, FORMAT YAML) Plans: - Node Type: "Seq Scan" Parent Relationship: "Outer" - Relation Name: "lineitem_290000" + Relation Name: "lineitem_290001" Alias: "lineitem" Master Query: @@ -238,7 +238,7 @@ Distributed Query into pg_merge_job_570006 Node: host=localhost port=57637 dbname=regression -> HashAggregate Group Key: l_quantity - -> Seq Scan on lineitem_290000 lineitem + -> Seq Scan on lineitem_290001 lineitem Master Query -> Sort Sort Key: COALESCE((sum((COALESCE((sum(intermediate_column_570006_1))::bigint, '0'::bigint))))::bigint, '0'::bigint), intermediate_column_570006_0 @@ -256,7 +256,7 @@ Distributed Query into pg_merge_job_570007 Node: host=localhost port=57637 dbname=regression -> Aggregate Output: sum(l_quantity), sum(l_quantity), count(l_quantity) - -> Seq Scan on public.lineitem_290000 lineitem + -> Seq Scan on public.lineitem_290001 lineitem Output: l_orderkey, l_partkey, l_suppkey, l_linenumber, l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus, l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode, l_comment Master Query -> Aggregate @@ -279,7 +279,7 @@ Distributed Query into pg_merge_job_570008 Sort Key: lineitem.l_quantity -> Hash Join Hash Cond: (lineitem.l_orderkey = orders.o_orderkey) - -> Seq Scan on lineitem_290000 lineitem + -> Seq Scan on lineitem_290001 lineitem Filter: (l_quantity < 5.0) -> Hash -> Seq Scan on orders_290008 orders @@ -296,7 +296,7 @@ Distributed Query Task Count: 1 Tasks Shown: All -> Task - Node: host=localhost port=57637 dbname=regression + Node: host=localhost port=57638 dbname=regression -> Insert on lineitem_290000 -> Result -- Test update @@ -309,7 +309,7 @@ Distributed Query Task Count: 1 Tasks Shown: All -> Task - Node: host=localhost port=57637 dbname=regression + Node: host=localhost port=57638 dbname=regression -> Update on lineitem_290000 -> Bitmap Heap Scan on lineitem_290000 Recheck Cond: (l_orderkey = 1) @@ -325,7 +325,7 @@ Distributed Query Task Count: 1 Tasks Shown: All -> Task - Node: host=localhost port=57637 dbname=regression + Node: host=localhost port=57638 dbname=regression -> Delete on lineitem_290000 -> Bitmap Heap Scan on lineitem_290000 Recheck Cond: (l_orderkey = 1) @@ -361,7 +361,7 @@ Distributed Query into pg_merge_job_570012 Tasks Shown: One of 8 -> Task Node: host=localhost port=57637 dbname=regression - -> Seq Scan on lineitem_290000 lineitem + -> Seq Scan on lineitem_290001 lineitem Master Query -> Seq Scan on pg_merge_job_570012 -- Test all tasks output @@ -375,22 +375,22 @@ Distributed Query into pg_merge_job_570013 -> Task Node: host=localhost port=57637 dbname=regression -> Aggregate - -> Seq Scan on lineitem_290004 lineitem + -> Seq Scan on lineitem_290005 lineitem Filter: (l_orderkey > 9030) -> Task Node: host=localhost port=57638 dbname=regression -> Aggregate - -> Seq Scan on 
lineitem_290005 lineitem + -> Seq Scan on lineitem_290004 lineitem Filter: (l_orderkey > 9030) -> Task Node: host=localhost port=57637 dbname=regression -> Aggregate - -> Seq Scan on lineitem_290006 lineitem + -> Seq Scan on lineitem_290007 lineitem Filter: (l_orderkey > 9030) -> Task Node: host=localhost port=57638 dbname=regression -> Aggregate - -> Seq Scan on lineitem_290007 lineitem + -> Seq Scan on lineitem_290006 lineitem Filter: (l_orderkey > 9030) Master Query -> Aggregate @@ -413,7 +413,7 @@ Distributed Query into pg_merge_job_570016 -> Task Node: host=localhost port=57637 dbname=regression -> Aggregate - -> Seq Scan on lineitem_290004 lineitem + -> Seq Scan on lineitem_290005 lineitem Filter: (l_orderkey > 9030) Master Query -> Aggregate diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 2084a1d94..0f0f9d0c5 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -30,6 +30,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-1'; ALTER EXTENSION citus UPDATE TO '6.0-2'; ALTER EXTENSION citus UPDATE TO '6.0-3'; ALTER EXTENSION citus UPDATE TO '6.0-4'; +ALTER EXTENSION citus UPDATE TO '6.0-5'; -- drop extension an re-create in newest version DROP EXTENSION citus; \c diff --git a/src/test/regress/expected/multi_large_table_join_planning.out b/src/test/regress/expected/multi_large_table_join_planning.out index b11ea9162..b4ea8abe8 100644 --- a/src/test/regress/expected/multi_large_table_join_planning.out +++ b/src/test/regress/expected/multi_large_table_join_planning.out @@ -69,14 +69,14 @@ DEBUG: generated sql query for job 1250 and task 21 DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290006 lineitem JOIN orders_290009 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))" DEBUG: generated sql query for job 1250 and task 24 DETAIL: query string: "SELECT lineitem.l_partkey, orders.o_orderkey, lineitem.l_quantity, lineitem.l_extendedprice, orders.o_custkey FROM (lineitem_290007 lineitem JOIN orders_290009 orders ON ((lineitem.l_orderkey = orders.o_orderkey))) WHERE ((lineitem.l_partkey < 1000) AND (orders.o_totalprice > 10.0))" -DEBUG: assigned task 3 to node localhost:57637 -DEBUG: assigned task 6 to node localhost:57638 -DEBUG: assigned task 9 to node localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: assigned task 15 to node localhost:57637 -DEBUG: assigned task 18 to node localhost:57638 -DEBUG: assigned task 21 to node localhost:57637 -DEBUG: assigned task 24 to node localhost:57638 +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 3 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 18 to node localhost:57637 +DEBUG: assigned task 15 to node localhost:57638 +DEBUG: assigned task 24 to node localhost:57637 +DEBUG: assigned task 21 to node localhost:57638 DEBUG: join prunable for intervals [1,1000] and [6001,7000] DEBUG: join prunable for intervals [6001,7000] and [1,1000] DEBUG: generated sql query for job 1251 and task 3 @@ -107,8 +107,8 @@ DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 10 DEBUG: pruning merge fetch taskId 7 DETAIL: Creating dependency on merge taskId 13 -DEBUG: assigned task 9 to node localhost:57637 -DEBUG: assigned 
task 6 to node localhost:57638 +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 DEBUG: assigned task 3 to node localhost:57637 DEBUG: completed cleanup query for job 1252 DEBUG: completed cleanup query for job 1252 @@ -178,20 +178,20 @@ DEBUG: generated sql query for job 1253 and task 14 DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290006 lineitem WHERE (l_quantity < 5.0)" DEBUG: generated sql query for job 1253 and task 16 DETAIL: query string: "SELECT l_partkey, l_suppkey FROM lineitem_290007 lineitem WHERE (l_quantity < 5.0)" -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 -DEBUG: assigned task 6 to node localhost:57637 -DEBUG: assigned task 8 to node localhost:57638 -DEBUG: assigned task 10 to node localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: assigned task 14 to node localhost:57637 -DEBUG: assigned task 16 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 8 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 10 to node localhost:57638 +DEBUG: assigned task 16 to node localhost:57637 +DEBUG: assigned task 14 to node localhost:57638 DEBUG: generated sql query for job 1254 and task 2 DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290008 orders WHERE (o_totalprice <> 4.0)" DEBUG: generated sql query for job 1254 and task 4 DETAIL: query string: "SELECT o_orderkey, o_shippriority FROM orders_290009 orders WHERE (o_totalprice <> 4.0)" -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 DEBUG: join prunable for task partitionId 0 and 1 DEBUG: join prunable for task partitionId 0 and 2 DEBUG: join prunable for task partitionId 0 and 3 diff --git a/src/test/regress/expected/multi_large_table_task_assignment.out b/src/test/regress/expected/multi_large_table_task_assignment.out index ed3c6d634..bb7e3a1c5 100644 --- a/src/test/regress/expected/multi_large_table_task_assignment.out +++ b/src/test/regress/expected/multi_large_table_task_assignment.out @@ -27,8 +27,8 @@ FROM WHERE o_custkey = c_custkey; DEBUG: StartTransactionCommand -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 DEBUG: join prunable for intervals [1,1000] and [1001,2000] DEBUG: join prunable for intervals [1,1000] and [6001,7000] DEBUG: join prunable for intervals [1001,2000] and [1,1000] @@ -41,8 +41,8 @@ DEBUG: pruning merge fetch taskId 4 DETAIL: Creating dependency on merge taskId 8 DEBUG: pruning merge fetch taskId 7 DETAIL: Creating dependency on merge taskId 11 -DEBUG: assigned task 9 to node localhost:57637 -DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 6 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 DEBUG: assigned task 3 to node localhost:57637 DEBUG: CommitTransactionCommand count @@ -66,10 +66,10 @@ WHERE o_custkey = c_custkey AND o_orderkey = l_orderkey; DEBUG: StartTransactionCommand -DEBUG: assigned task 15 to node localhost:57637 -DEBUG: assigned task 9 to node localhost:57638 -DEBUG: assigned task 18 to node localhost:57637 -DEBUG: assigned task 
12 to node localhost:57638 +DEBUG: assigned task 9 to node localhost:57637 +DEBUG: assigned task 15 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 18 to node localhost:57638 DEBUG: assigned task 3 to node localhost:57637 DEBUG: assigned task 6 to node localhost:57638 DEBUG: join prunable for intervals [1,1509] and [2951,4455] @@ -156,12 +156,12 @@ DEBUG: pruning merge fetch taskId 55 DETAIL: Creating dependency on merge taskId 68 DEBUG: pruning merge fetch taskId 58 DETAIL: Creating dependency on merge taskId 68 -DEBUG: assigned task 3 to node localhost:57637 -DEBUG: assigned task 21 to node localhost:57638 -DEBUG: assigned task 9 to node localhost:57637 -DEBUG: assigned task 27 to node localhost:57638 -DEBUG: assigned task 33 to node localhost:57637 -DEBUG: assigned task 48 to node localhost:57638 +DEBUG: assigned task 21 to node localhost:57637 +DEBUG: assigned task 3 to node localhost:57638 +DEBUG: assigned task 27 to node localhost:57637 +DEBUG: assigned task 9 to node localhost:57638 +DEBUG: assigned task 48 to node localhost:57637 +DEBUG: assigned task 33 to node localhost:57638 DEBUG: assigned task 39 to node localhost:57637 DEBUG: assigned task 57 to node localhost:57638 DEBUG: propagating assignment from merge task 19 to constrained sql task 6 @@ -208,16 +208,16 @@ FROM WHERE l_partkey = c_nationkey; DEBUG: StartTransactionCommand -DEBUG: assigned task 2 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 -DEBUG: assigned task 6 to node localhost:57637 -DEBUG: assigned task 8 to node localhost:57638 -DEBUG: assigned task 10 to node localhost:57637 -DEBUG: assigned task 12 to node localhost:57638 -DEBUG: assigned task 14 to node localhost:57637 -DEBUG: assigned task 16 to node localhost:57638 -DEBUG: assigned task 6 to node localhost:57637 -DEBUG: assigned task 4 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 2 to node localhost:57638 +DEBUG: assigned task 8 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 +DEBUG: assigned task 12 to node localhost:57637 +DEBUG: assigned task 10 to node localhost:57638 +DEBUG: assigned task 16 to node localhost:57637 +DEBUG: assigned task 14 to node localhost:57638 +DEBUG: assigned task 4 to node localhost:57637 +DEBUG: assigned task 6 to node localhost:57638 DEBUG: assigned task 2 to node localhost:57637 DEBUG: join prunable for task partitionId 0 and 1 DEBUG: join prunable for task partitionId 0 and 2 diff --git a/src/test/regress/expected/multi_schema_support.out b/src/test/regress/expected/multi_schema_support.out index 536178675..a79212f5b 100644 --- a/src/test/regress/expected/multi_schema_support.out +++ b/src/test/regress/expected/multi_schema_support.out @@ -790,11 +790,11 @@ SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localh (1 row) -- verify shardstate -SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1190000 | 1 | 8192 | localhost | 57638 - 1190000 | 1 | 0 | localhost | 57637 +SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = 1190000; + shardstate | nodename | nodeport +------------+-----------+---------- + 1 | localhost | 57638 + 1 | localhost | 57637 (2 rows) --test with search_path is set @@ -808,11 +808,11 @@ SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localh (1 row) -- 
verify shardstate -SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+-----------+---------- - 1190000 | 1 | 8192 | localhost | 57638 - 1190000 | 1 | 0 | localhost | 57637 +SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = 1190000; + shardstate | nodename | nodeport +------------+-----------+---------- + 1 | localhost | 57638 + 1 | localhost | 57637 (2 rows) -- test master_apply_delete_command with schemas diff --git a/src/test/regress/expected/multi_table_ddl.out b/src/test/regress/expected/multi_table_ddl.out index 52940388d..7da6ce247 100644 --- a/src/test/regress/expected/multi_table_ddl.out +++ b/src/test/regress/expected/multi_table_ddl.out @@ -58,8 +58,8 @@ SELECT * FROM pg_dist_shard; (0 rows) SELECT * FROM pg_dist_shard_placement; - shardid | shardstate | shardlength | nodename | nodeport ----------+------------+-------------+----------+---------- + shardid | shardstate | shardlength | nodename | nodeport | placementid +---------+------------+-------------+----------+----------+------------- (0 rows) -- check that the extension now can be dropped (and recreated) diff --git a/src/test/regress/output/multi_outer_join.source b/src/test/regress/output/multi_outer_join.source index 24365317b..fa1296c80 100644 --- a/src/test/regress/output/multi_outer_join.source +++ b/src/test/regress/output/multi_outer_join.source @@ -353,16 +353,6 @@ FROM LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ] l_custkey | r_custkey | t_custkey -----------+-----------+----------- - 21 | 21 | 21 - 22 | 22 | 22 - 23 | 23 | 23 - 24 | 24 | 24 - 25 | 25 | 25 - 26 | 26 | 26 - 27 | 27 | 27 - 28 | 28 | 28 - 29 | 29 | 29 - 30 | 30 | 30 1 | | 2 | | 3 | | @@ -378,6 +368,16 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer 13 | 13 | 13 14 | 14 | 14 15 | 15 | 15 + 21 | 21 | 21 + 22 | 22 | 22 + 23 | 23 | 23 + 24 | 24 | 24 + 25 | 25 | 25 + 26 | 26 | 26 + 27 | 27 | 27 + 28 | 28 | 28 + 29 | 29 | 29 + 30 | 30 | 30 (25 rows) -- Right join with single shard right most table should error out @@ -399,16 +399,6 @@ FROM LOG: join order: [ "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_left" ] t_custkey | r_custkey | l_custkey -----------+-----------+----------- - 21 | 21 | 21 - 22 | 22 | 22 - 23 | 23 | 23 - 24 | 24 | 24 - 25 | 25 | 25 - 26 | 26 | 26 - 27 | 27 | 27 - 28 | 28 | 28 - 29 | 29 | 29 - 30 | 30 | 30 11 | 11 | 11 12 | 12 | 12 13 | 13 | 13 @@ -419,6 +409,16 @@ LOG: join order: [ "multi_outer_join_right" ][ broadcast join "multi_outer_join 18 | 18 | 19 | 19 | 20 | 20 | + 21 | 21 | 21 + 22 | 22 | 22 + 23 | 23 | 23 + 24 | 24 | 24 + 25 | 25 | 25 + 26 | 26 | 26 + 27 | 27 | 27 + 28 | 28 | 28 + 29 | 29 | 29 + 30 | 30 | 30 (20 rows) -- Make it anti-join, should display values with l_custkey is null @@ -458,16 +458,6 @@ FROM LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ] l_custkey | r_custkey -----------+----------- - 21 | 21 - 22 | 22 - 23 | 23 - 24 | 24 - 25 | 25 - 26 | 26 - 27 | 27 - 28 | 28 - 29 | 29 - 30 | 30 1 | 2 | 3 | @@ -488,6 +478,16 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer | 18 | 19 | 16 + 21 | 21 + 22 | 22 + 23 | 23 + 24 | 24 + 25 | 25 + 26 | 26 + 27 | 27 + 28 | 28 + 29 | 29 + 30 | 30 (30 rows) -- full outer join + 
anti (right) should work with 1-1 matched shards @@ -577,6 +577,11 @@ FROM LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer_join_right" ][ broadcast join "multi_outer_join_third" ] l_custkey | r_custkey | t_custkey -----------+-----------+----------- + 11 | 11 | 11 + 12 | 12 | 12 + 13 | 13 | 13 + 14 | 14 | 14 + 15 | 15 | 15 21 | 21 | 21 22 | 22 | 22 23 | 23 | 23 @@ -587,11 +592,6 @@ LOG: join order: [ "multi_outer_join_left" ][ local partition join "multi_outer 28 | 28 | 28 29 | 29 | 29 30 | 30 | 30 - 11 | 11 | 11 - 12 | 12 | 12 - 13 | 13 | 13 - 14 | 14 | 14 - 15 | 15 | 15 (15 rows) -- inner (broadcast) join + 2 shards left (local) join should work @@ -604,16 +604,6 @@ FROM LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_right" ] l_custkey | t_custkey | r_custkey -----------+-----------+----------- - 21 | 21 | 21 - 22 | 22 | 22 - 23 | 23 | 23 - 24 | 24 | 24 - 25 | 25 | 25 - 26 | 26 | 26 - 27 | 27 | 27 - 28 | 28 | 28 - 29 | 29 | 29 - 30 | 30 | 30 1 | 1 | 2 | 2 | 3 | 3 | @@ -629,6 +619,16 @@ LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_ 13 | 13 | 13 14 | 14 | 14 15 | 15 | 15 + 21 | 21 | 21 + 22 | 22 | 22 + 23 | 23 | 23 + 24 | 24 | 24 + 25 | 25 | 25 + 26 | 26 | 26 + 27 | 27 | 27 + 28 | 28 | 28 + 29 | 29 | 29 + 30 | 30 | 30 (25 rows) -- inner (local) join + 2 shards left (dual partition) join should error out @@ -650,16 +650,6 @@ FROM LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ][ local partition join "multi_outer_join_right" ] l_custkey | t_custkey | r_custkey -----------+-----------+----------- - 21 | 21 | 21 - 22 | 22 | 22 - 23 | 23 | 23 - 24 | 24 | 24 - 25 | 25 | 25 - 26 | 26 | 26 - 27 | 27 | 27 - 28 | 28 | 28 - 29 | 29 | 29 - 30 | 30 | 30 1 | 1 | 2 | 2 | 3 | 3 | @@ -675,6 +665,16 @@ LOG: join order: [ "multi_outer_join_left" ][ broadcast join "multi_outer_join_ 13 | 13 | 13 14 | 14 | 14 15 | 15 | 15 + 21 | 21 | 21 + 22 | 22 | 22 + 23 | 23 | 23 + 24 | 24 | 24 + 25 | 25 | 25 + 26 | 26 | 26 + 27 | 27 | 27 + 28 | 28 | 28 + 29 | 29 | 29 + 30 | 30 | 30 (25 rows) -- inner (broadcast) join + 2 shards left (local) + anti join should work @@ -712,16 +712,6 @@ FROM LOG: join order: [ "multi_outer_join_right" ][ local partition join "multi_outer_join_left" ][ broadcast join "multi_outer_join_third" ] t_custkey ----------- - 21 - 22 - 23 - 24 - 25 - 26 - 27 - 28 - 29 - 30 11 12 13 @@ -732,6 +722,16 @@ LOG: join order: [ "multi_outer_join_right" ][ local partition join "multi_oute 18 19 20 + 21 + 22 + 23 + 24 + 25 + 26 + 27 + 28 + 29 + 30 (20 rows) -- flattened out subqueries with outer joins are not supported diff --git a/src/test/regress/sql/multi_cluster_management.sql b/src/test/regress/sql/multi_cluster_management.sql index b85188896..79e44d757 100644 --- a/src/test/regress/sql/multi_cluster_management.sql +++ b/src/test/regress/sql/multi_cluster_management.sql @@ -29,7 +29,7 @@ SELECT master_create_distributed_table('cluster_management_test', 'col_1', 'hash SELECT master_create_worker_shards('cluster_management_test', 16, 1); -- see that there are some active placements in the candidate node -SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; +SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; -- try to remove a node with active placements and see that node removal is failed SELECT master_remove_node('localhost', :worker_2_port); @@ -37,7 
+37,7 @@ SELECT master_get_active_worker_nodes(); -- mark all placements in the candidate node as inactive UPDATE pg_dist_shard_placement SET shardstate=3 WHERE nodeport=:worker_2_port; -SELECT * FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; +SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE nodeport=:worker_2_port; -- try to remove a node with only inactive placements and see that node is removed SELECT master_remove_node('localhost', :worker_2_port); @@ -46,4 +46,4 @@ SELECT master_get_active_worker_nodes(); -- clean-up SELECT master_add_node('localhost', :worker_2_port); UPDATE pg_dist_shard_placement SET shardstate=1 WHERE nodeport=:worker_2_port; -DROP TABLE cluster_management_test; \ No newline at end of file +DROP TABLE cluster_management_test; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index ddcbb2cf5..53e4cd68b 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -35,6 +35,7 @@ ALTER EXTENSION citus UPDATE TO '6.0-1'; ALTER EXTENSION citus UPDATE TO '6.0-2'; ALTER EXTENSION citus UPDATE TO '6.0-3'; ALTER EXTENSION citus UPDATE TO '6.0-4'; +ALTER EXTENSION citus UPDATE TO '6.0-5'; -- drop extension an re-create in newest version DROP EXTENSION citus; diff --git a/src/test/regress/sql/multi_schema_support.sql b/src/test/regress/sql/multi_schema_support.sql index 384814fa3..cf88c63c0 100644 --- a/src/test/regress/sql/multi_schema_support.sql +++ b/src/test/regress/sql/multi_schema_support.sql @@ -504,7 +504,7 @@ UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1190000 and no SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localhost', :worker_1_port); -- verify shardstate -SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000; +SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = 1190000; --test with search_path is set @@ -515,7 +515,7 @@ UPDATE pg_dist_shard_placement SET shardstate = 3 WHERE shardid = 1190000 and no SELECT master_copy_shard_placement(1190000, 'localhost', :worker_2_port, 'localhost', :worker_1_port); -- verify shardstate -SELECT * FROM pg_dist_shard_placement WHERE shardid = 1190000; +SELECT shardstate, nodename, nodeport FROM pg_dist_shard_placement WHERE shardid = 1190000; -- test master_apply_delete_command with schemas
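-- 
Illustrative note, not part of the patch proper (placed after the signature
separator so git am/apply ignore it): a minimal SQL sketch of what the new
machinery looks like once ALTER EXTENSION citus UPDATE TO '6.0-5' has run.
The catalog rows shown are whatever the local cluster happens to contain.

    -- Every placement row now carries a placementid drawn from
    -- pg_dist_shard_placement_placementid_seq; ordering by it reproduces
    -- creation order, which previously relied on the row oid.
    SELECT placementid, shardid, shardstate, nodename, nodeport
    FROM pg_dist_shard_placement
    ORDER BY placementid;

    -- Reserve a new, globally unique placementid, as the C code does via
    -- InsertShardPlacementRow() when it is passed INVALID_PLACEMENT_ID.
    SELECT master_get_new_placementid();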