diff --git a/src/backend/distributed/citus--8.3-1--8.4-1.sql b/src/backend/distributed/citus--8.3-1--8.4-1.sql index d21f1b5c5..633404eee 100644 --- a/src/backend/distributed/citus--8.3-1--8.4-1.sql +++ b/src/backend/distributed/citus--8.3-1--8.4-1.sql @@ -1,6 +1,102 @@ /* citus--8.3-1--8.4-1 */ /* bump version to 8.4-1 */ +CREATE SCHEMA IF NOT EXISTS citus_internal; + +-- move citus internal functions to citus_internal to make space in the citus schema for +-- our public interface +ALTER FUNCTION citus.find_groupid_for_node SET SCHEMA citus_internal; +ALTER FUNCTION citus.pg_dist_node_trigger_func SET SCHEMA citus_internal; +ALTER FUNCTION citus.pg_dist_shard_placement_trigger_func SET SCHEMA citus_internal; +ALTER FUNCTION citus.refresh_isolation_tester_prepared_statement SET SCHEMA citus_internal; +ALTER FUNCTION citus.replace_isolation_tester_func SET SCHEMA citus_internal; +ALTER FUNCTION citus.restore_isolation_tester_func SET SCHEMA citus_internal; + +CREATE OR REPLACE FUNCTION citus_internal.pg_dist_shard_placement_trigger_func() +RETURNS TRIGGER AS $$ + BEGIN + IF (TG_OP = 'DELETE') THEN + DELETE FROM pg_dist_placement WHERE placementid = OLD.placementid; + RETURN OLD; + ELSIF (TG_OP = 'UPDATE') THEN + UPDATE pg_dist_placement + SET shardid = NEW.shardid, shardstate = NEW.shardstate, + shardlength = NEW.shardlength, placementid = NEW.placementid, + groupid = citus_internal.find_groupid_for_node(NEW.nodename, NEW.nodeport) + WHERE placementid = OLD.placementid; + RETURN NEW; + ELSIF (TG_OP = 'INSERT') THEN + INSERT INTO pg_dist_placement + (placementid, shardid, shardstate, shardlength, groupid) + VALUES (NEW.placementid, NEW.shardid, NEW.shardstate, NEW.shardlength, + citus_internal.find_groupid_for_node(NEW.nodename, NEW.nodeport)); + RETURN NEW; + END IF; + END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION pg_catalog.master_unmark_object_distributed(classid oid, objid oid, objsubid int) + RETURNS void + LANGUAGE C STRICT + AS 'MODULE_PATHNAME', $$master_unmark_object_distributed$$; +COMMENT ON FUNCTION pg_catalog.master_unmark_object_distributed(classid oid, objid oid, objsubid int) + IS 'remove an object address from citus.pg_dist_object once the object has been deleted'; + +CREATE TABLE citus.pg_dist_object ( + classid oid NOT NULL, + objid oid NOT NULL, + objsubid integer NOT NULL, + + -- fields used for upgrades + type text DEFAULT NULL, + object_names text[] DEFAULT NULL, + object_args text[] DEFAULT NULL, + + CONSTRAINT pg_dist_object_pkey PRIMARY KEY (classid, objid, objsubid) +); + +CREATE OR REPLACE FUNCTION pg_catalog.citus_drop_trigger() + RETURNS event_trigger + LANGUAGE plpgsql + SET search_path = pg_catalog + AS $cdbdt$ +DECLARE + v_obj record; + sequence_names text[] := '{}'; + table_colocation_id integer; + propagate_drop boolean := false; +BEGIN + -- collect set of dropped sequences to drop on workers later + SELECT array_agg(object_identity) INTO sequence_names + FROM pg_event_trigger_dropped_objects() + WHERE object_type = 'sequence'; + + FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects() + WHERE object_type IN ('table', 'foreign table') + LOOP + -- first drop the table and metadata on the workers + -- then drop all the shards on the workers + -- finally remove the pg_dist_partition entry on the coordinator + PERFORM master_remove_distributed_table_metadata_from_workers(v_obj.objid, v_obj.schema_name, v_obj.object_name); + PERFORM master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name); + PERFORM 
master_remove_partition_metadata(v_obj.objid, v_obj.schema_name, v_obj.object_name); + END LOOP; + + IF cardinality(sequence_names) > 0 THEN + PERFORM master_drop_sequences(sequence_names); + END IF; + + -- remove entries from citus.pg_dist_object for all dropped root (objsubid = 0) objects + FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects() + LOOP + PERFORM master_unmark_object_distributed(v_obj.classid, v_obj.objid, v_obj.objsubid); + END LOOP; +END; +$cdbdt$; +COMMENT ON FUNCTION pg_catalog.citus_drop_trigger() + IS 'perform checks and actions at the end of DROP actions'; + + CREATE OR REPLACE FUNCTION pg_catalog.citus_prepare_pg_upgrade() RETURNS void LANGUAGE plpgsql @@ -21,6 +117,10 @@ BEGIN -- enterprise catalog tables CREATE TABLE public.pg_dist_authinfo AS SELECT * FROM pg_catalog.pg_dist_authinfo; CREATE TABLE public.pg_dist_poolinfo AS SELECT * FROM pg_catalog.pg_dist_poolinfo; + + -- store upgrade stable identifiers on pg_dist_object catalog + UPDATE citus.pg_dist_object + SET (type, object_names, object_args) = (SELECT * FROM pg_identify_object_as_address(classid, objid, objsubid)); END; $cppu$; @@ -102,6 +202,23 @@ BEGIN 'n' as deptype FROM pg_catalog.pg_dist_partition p; + -- restore pg_dist_object from the stable identifiers + WITH old_records AS ( + DELETE FROM + citus.pg_dist_object + RETURNING + type, + object_names, + object_args + ) + INSERT INTO citus.pg_dist_object (classid, objid, objsubid) + SELECT + address.classid, + address.objid, + address.objsubid + FROM + old_records naming, + pg_get_object_address(naming.type, naming.object_names, naming.object_args) address; END; $cppu$; diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index d144c2ada..0e5c1edd4 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -132,6 +132,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS) char distributionMethod = 0; char *colocateWithTableName = NULL; bool viaDeprecatedAPI = true; + ObjectAddress tableAddress = { 0 }; Relation relation = NULL; @@ -140,12 +141,13 @@ master_create_distributed_table(PG_FUNCTION_ARGS) EnsureTableOwner(relationId); /* - * Ensure schema exists on each worker node. We can not run this function - * transactionally, since we may create shards over separate sessions and - * shard creation depends on the schema being present and visible from all - * sessions. + * distributed tables might have dependencies on different objects, since we create + * shards for a distributed table via multiple sessions these objects will be created + * via their own connection and committed immediately so they become visible to all + * sessions creating shards. */ - EnsureSchemaExistsOnAllNodes(relationId); + ObjectAddressSet(tableAddress, RelationRelationId, relationId); + EnsureDependenciesExistsOnAllNodes(&tableAddress); /* * Lock target relation with an exclusive lock - there's no way to make @@ -193,6 +195,7 @@ create_distributed_table(PG_FUNCTION_ARGS) text *distributionColumnText = NULL; Oid distributionMethodOid = InvalidOid; text *colocateWithTableNameText = NULL; + ObjectAddress tableAddress = { 0 }; Relation relation = NULL; char *distributionColumnName = NULL; @@ -214,12 +217,13 @@ create_distributed_table(PG_FUNCTION_ARGS) EnsureTableOwner(relationId); /* - * Ensure schema exists on each worker node. 
We can not run this function - * transactionally, since we may create shards over separate sessions and - * shard creation depends on the schema being present and visible from all - * sessions. + * distributed tables might have dependencies on different objects, since we create + * shards for a distributed table via multiple sessions these objects will be created + * via their own connection and committed immediately so they become visible to all + * sessions creating shards. */ - EnsureSchemaExistsOnAllNodes(relationId); + ObjectAddressSet(tableAddress, RelationRelationId, relationId); + EnsureDependenciesExistsOnAllNodes(&tableAddress); /* * Lock target relation with an exclusive lock - there's no way to make @@ -272,6 +276,7 @@ create_reference_table(PG_FUNCTION_ARGS) List *workerNodeList = NIL; int workerCount = 0; Var *distributionColumn = NULL; + ObjectAddress tableAddress = { 0 }; bool viaDeprecatedAPI = false; @@ -280,12 +285,13 @@ create_reference_table(PG_FUNCTION_ARGS) EnsureTableOwner(relationId); /* - * Ensure schema exists on each worker node. We can not run this function - * transactionally, since we may create shards over separate sessions and - * shard creation depends on the schema being present and visible from all - * sessions. + * distributed tables might have dependencies on different objects, since we create + * shards for a distributed table via multiple sessions these objects will be created + * via their own connection and committed immediately so they become visible to all + * sessions creating shards. + */ - EnsureSchemaExistsOnAllNodes(relationId); + ObjectAddressSet(tableAddress, RelationRelationId, relationId); + EnsureDependenciesExistsOnAllNodes(&tableAddress); /* * Lock target relation with an exclusive lock - there's no way to make diff --git a/src/backend/distributed/commands/dependencies.c b/src/backend/distributed/commands/dependencies.c new file mode 100644 index 000000000..18c336b5a --- /dev/null +++ b/src/backend/distributed/commands/dependencies.c @@ -0,0 +1,240 @@ +/*------------------------------------------------------------------------- + * + * dependencies.c + * Commands to create dependencies of an object on all workers. + * + * Copyright (c) 2019, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "catalog/dependency.h" +#include "catalog/objectaddress.h" +#include "distributed/commands.h" +#include "distributed/connection_management.h" +#include "distributed/metadata/dependency.h" +#include "distributed/metadata/distobject.h" +#include "distributed/metadata_sync.h" +#include "distributed/remote_commands.h" +#include "distributed/worker_manager.h" +#include "storage/lmgr.h" + +static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency); + + +/* + * EnsureDependenciesExistsOnAllNodes finds all the dependencies that we support and makes sure + * these are available on all workers. If not available, they will be created on the + * workers via a separate session that will be committed directly so that the objects are + * visible to potentially multiple sessions creating the shards. + * + * Note: only the actual objects are created via a separate session, the local records to + * pg_dist_object are created in this session. As a side effect the objects could be + * created on the workers without a catalog entry on the coordinator.
Updates to the + * objects on the coordinator are not propagated to the workers until the record is + * visible on the coordinator. + * + * This is solved by creating the dependencies in an idempotent manner, either via + * postgres native CREATE IF NOT EXISTS, or citus helper functions. + */ +void +EnsureDependenciesExistsOnAllNodes(const ObjectAddress *target) +{ + const uint32 connectionFlag = FORCE_NEW_CONNECTION; + + /* local variables to work with dependencies */ + List *dependencies = NIL; + ListCell *dependencyCell = NULL; + + /* local variables to collect ddl commands */ + List *ddlCommands = NIL; + + /* local variables to work with worker nodes */ + List *workerNodeList = NIL; + ListCell *workerNodeCell = NULL; + List *connections = NIL; + ListCell *connectionCell = NULL; + + /* + * collect all dependencies in creation order and get their ddl commands + */ + dependencies = GetDependenciesForObject(target); + foreach(dependencyCell, dependencies) + { + ObjectAddress *dependency = (ObjectAddress *) lfirst(dependencyCell); + ddlCommands = list_concat(ddlCommands, + GetDependencyCreateDDLCommands(dependency)); + } + if (list_length(ddlCommands) <= 0) + { + /* no ddl commands to be executed */ + return; + } + + /* + * Make sure that no new nodes are added after this point until the end of the + * transaction by taking a RowShareLock on pg_dist_node, which conflicts with the + * ExclusiveLock taken by master_add_node. + * This guarantees that all active nodes will have the object, because they will + * either get it now, or get it in master_add_node after this transaction finishes and + * the pg_dist_object record becomes visible. + */ + LockRelationOid(DistNodeRelationId(), RowShareLock); + + /* + * Right after we acquire the lock we mark our objects as distributed; these changes + * will not become visible before we have successfully created all the objects on our + * workers. + * + * It is possible to create distributed tables which depend on other dependencies + * before any node is in the cluster. If we waited until we had actually connected + * to the nodes before marking the objects as distributed, these objects would never be + * created on the workers when they get added, causing shards to fail to create.
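+ *
+ * As an illustration (hypothetical query, not part of this patch), the
+ * records written here can later be inspected with:
+ *
+ *   SELECT pg_identify_object(classid, objid, objsubid)
+ *   FROM citus.pg_dist_object;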
+ */ + foreach(dependencyCell, dependencies) + { + ObjectAddress *dependency = (ObjectAddress *) lfirst(dependencyCell); + MarkObjectDistributed(dependency); + } + + /* + * collect and connect to all applicable nodes + */ + workerNodeList = ActivePrimaryNodeList(); + if (list_length(workerNodeList) <= 0) + { + /* no nodes to execute on */ + return; + } + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + MultiConnection *connection = NULL; + + char *nodeName = workerNode->workerName; + uint32 nodePort = workerNode->workerPort; + + connection = StartNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort, + CitusExtensionOwnerName(), NULL); + + connections = lappend(connections, connection); + } + FinishConnectionListEstablishment(connections); + + /* + * create dependency on all nodes + */ + foreach(connectionCell, connections) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + ExecuteCriticalRemoteCommandList(connection, ddlCommands); + } + + /* + * disconnect from nodes + */ + foreach(connectionCell, connections) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + CloseConnection(connection); + } +} + + +/* + * GetDependencyCreateDDLCommands returns a list (potentially empty or NIL) of ddl + * commands to execute on a worker to create the object. + */ +static List * +GetDependencyCreateDDLCommands(const ObjectAddress *dependency) +{ + switch (getObjectClass(dependency)) + { + case OCLASS_SCHEMA: + { + const char *schemaDDLCommand = CreateSchemaDDLCommand(dependency->objectId); + + if (schemaDDLCommand == NULL) + { + /* no schema to create */ + return NIL; + } + + return list_make1((void *) schemaDDLCommand); + } + + default: + { + break; + } + } + + /* + * make sure it fails hard when in debug mode, leave a hint for the user if this ever + * happens in production + */ + Assert(false); + ereport(ERROR, (errmsg("unsupported object %s for distribution by citus", + getObjectTypeDescription(dependency)), + errdetail( + "citus tries to recreate an unsupported object on its workers"), + errhint("please report a bug as this should not be happening"))); + return NIL; +} + + +/* + * ReplicateAllDependenciesToNode replicates all previously marked objects to a worker node + */ +void +ReplicateAllDependenciesToNode(const char *nodeName, int nodePort) +{ + const uint32 connectionFlag = FORCE_NEW_CONNECTION; + ListCell *dependencyCell = NULL; + List *dependencies = NIL; + List *ddlCommands = NIL; + MultiConnection *connection = NULL; + + /* + * collect all dependencies in creation order and get their ddl commands + */ + dependencies = GetDistributedObjectAddressList(); + + /* + * When dependency lists are getting longer we see a delay in the creation time on the + * workers. We would like to inform the user. Currently we warn for lists greater than + * 100 items, where 100 is an arbitrarily chosen number. If we find it too high or too + * low we can adjust this based on experience.
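+ *
+ * The count that drives this notice can be verified manually with an
+ * illustrative query such as:
+ *
+ *   SELECT count(*) FROM citus.pg_dist_object;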
+ */ + if (list_length(dependencies) > 100) + { + ereport(NOTICE, (errmsg("Replicating postgres objects to node %s:%d", nodeName, + nodePort), + errdetail("There are %d objects to replicate, depending on your " + "environment this might take a while", + list_length(dependencies)))); + } + + dependencies = OrderObjectAddressListInDependencyOrder(dependencies); + foreach(dependencyCell, dependencies) + { + ObjectAddress *dependency = (ObjectAddress *) lfirst(dependencyCell); + ddlCommands = list_concat(ddlCommands, + GetDependencyCreateDDLCommands(dependency)); + } + if (list_length(ddlCommands) <= 0) + { + /* no commands to replicate dependencies to the new worker */ + return; + } + + /* + * connect to the new host and create all applicable dependencies + */ + connection = GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort, + CitusExtensionOwnerName(), NULL); + ExecuteCriticalRemoteCommandList(connection, ddlCommands); + CloseConnection(connection); +} diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index 392f61141..a1b8dc258 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -138,53 +138,3 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt, return NIL; } - - -/* - * EnsureSchemaExistsOnAllNodes connects to all nodes with citus extension user - * and creates the schema of the given relationId. The function errors out if the - * command cannot be executed in any of the worker nodes. - */ -void -EnsureSchemaExistsOnAllNodes(Oid relationId) -{ - List *workerNodeList = ActivePrimaryNodeList(); - ListCell *workerNodeCell = NULL; - - foreach(workerNodeCell, workerNodeList) - { - WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); - char *nodeName = workerNode->workerName; - uint32 nodePort = workerNode->workerPort; - - EnsureSchemaExistsOnNode(relationId, nodeName, nodePort); - } -} - - -/* - * EnsureSchemaExistsOnNode connects to one node with citus extension user - * and creates the schema of the given relationId. The function errors out if the - * command cannot be executed in the node. 
- */ -void -EnsureSchemaExistsOnNode(Oid relationId, char *nodeName, int32 nodePort) -{ - uint64 connectionFlag = FORCE_NEW_CONNECTION; - MultiConnection *connection = NULL; - - /* if the schema creation command is not provided, create it */ - Oid schemaId = get_rel_namespace(relationId); - char *schemaCreationDDL = CreateSchemaDDLCommand(schemaId); - - /* if the relation lives in public namespace, no need to perform any queries in workers */ - if (schemaCreationDDL == NULL) - { - return; - } - - connection = GetNodeUserDatabaseConnection(connectionFlag, nodeName, - nodePort, CitusExtensionOwnerName(), NULL); - - ExecuteCriticalRemoteCommand(connection, schemaCreationDDL); -} diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 42831105b..ae85e3982 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -67,6 +67,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS) text *tableNameText = PG_GETARG_TEXT_P(0); int32 shardCount = PG_GETARG_INT32(1); int32 replicationFactor = PG_GETARG_INT32(2); + ObjectAddress tableAddress = { 0 }; Oid distributedTableId = ResolveRelationId(tableNameText, false); @@ -77,12 +78,13 @@ master_create_worker_shards(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); /* - * Ensure schema exists on each worker node. We can not run this function - * transactionally, since we may create shards over separate sessions and - * shard creation depends on the schema being present and visible from all - * sessions. + * distributed tables might have dependencies on different objects, since we create + * shards for a distributed table via multiple sessions these objects will be created + * via their own connection and committed immediately so they become visible to all + * sessions creating shards. */ - EnsureSchemaExistsOnAllNodes(distributedTableId); + ObjectAddressSet(tableAddress, RelationRelationId, distributedTableId); + EnsureDependenciesExistsOnAllNodes(&tableAddress); CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor, useExclusiveConnections); diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 444d1dc94..a74ad5f85 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -237,15 +237,6 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, EnsureTableOwner(distributedTableId); - /* - * Ensure schema exists on the target worker node. We can not run this - * function transactionally, since we may create shards over separate - * sessions and shard creation depends on the schema being present and - * visible from all sessions. 
- */ - EnsureSchemaExistsOnNode(distributedTableId, targetNodeName, - targetNodePort); - if (relationKind == RELKIND_FOREIGN_TABLE) { char *relationName = get_rel_name(distributedTableId); diff --git a/src/backend/distributed/master/master_stage_protocol.c b/src/backend/distributed/master/master_stage_protocol.c index 58dfda15a..140170df5 100644 --- a/src/backend/distributed/master/master_stage_protocol.c +++ b/src/backend/distributed/master/master_stage_protocol.c @@ -90,6 +90,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS) char *relationName = text_to_cstring(relationNameText); uint64 shardId = INVALID_SHARD_ID; uint32 attemptableNodeCount = 0; + ObjectAddress tableAddress = { 0 }; uint32 candidateNodeIndex = 0; List *candidateNodeList = NIL; @@ -108,12 +109,13 @@ master_create_empty_shard(PG_FUNCTION_ARGS) CheckDistributedTable(relationId); /* - * Ensure schema exists on each worker node. We can not run this function - * transactionally, since we may create shards over separate sessions and - * shard creation depends on the schema being present and visible from all - * sessions. + * distributed tables might have dependencies on different objects, since we create + * shards for a distributed table via multiple sessions these objects will be created + * via their own connection and committed immediately so they become visible to all + * sessions creating shards. */ - EnsureSchemaExistsOnAllNodes(relationId); + ObjectAddressSet(tableAddress, RelationRelationId, relationId); + EnsureDependenciesExistsOnAllNodes(&tableAddress); /* don't allow the table to be dropped */ LockRelationOid(relationId, AccessShareLock); diff --git a/src/backend/distributed/metadata/dependency.c b/src/backend/distributed/metadata/dependency.c new file mode 100644 index 000000000..06ca6b4ed --- /dev/null +++ b/src/backend/distributed/metadata/dependency.c @@ -0,0 +1,489 @@ +/*------------------------------------------------------------------------- + * + * dependency.c + * Functions to reason about distributed objects and their dependencies + * + * Copyright (c) 2019, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "access/genam.h" +#include "access/heapam.h" +#include "access/htup_details.h" +#include "access/skey.h" +#include "catalog/dependency.h" +#include "catalog/indexing.h" +#include "catalog/pg_depend.h" +#include "distributed/metadata/dependency.h" +#include "distributed/metadata/distobject.h" +#include "utils/fmgroids.h" +#include "utils/hsearch.h" +#include "utils/lsyscache.h" + +/* + * ObjectAddressCollector keeps track of collected ObjectAddresses. This can be used + * together with recurse_pg_depend. 
+ * + * We keep two different data structures for the following reasons + * - A List ordered by insert/collect order + * - A Set to quickly O(1) check if an ObjectAddress has already been collected + */ +typedef struct ObjectAddressCollector +{ + List *dependencyList; + HTAB *dependencySet; +} ObjectAddressCollector; + + +/* forward declarations for functions to interact with the ObjectAddressCollector */ +static void InitObjectAddressCollector(ObjectAddressCollector *collector); +static void CollectObjectAddress(ObjectAddressCollector *collector, const + ObjectAddress *address); +static bool IsObjectAddressCollected(const ObjectAddress *findAddress, + ObjectAddressCollector *collector); + +/* forward declaration of functions that recurse pg_depend */ +static void recurse_pg_depend(const ObjectAddress *target, + List * (*expand)(void *context, const + ObjectAddress *target), + bool (*follow)(void *context, Form_pg_depend row), + void (*apply)(void *context, Form_pg_depend row), + void *context); +static bool FollowAllSupportedDependencies(void *context, Form_pg_depend pg_depend); +static bool FollowNewSupportedDependencies(void *context, Form_pg_depend pg_depend); +static void ApplyAddToDependencyList(void *context, Form_pg_depend pg_depend); + +/* forward declaration of support functions to decide what to follow */ +static bool SupportedDependencyByCitus(const ObjectAddress *address); +static bool IsObjectAddressOwnedByExtension(const ObjectAddress *target); + + +/* + * GetDependenciesForObject returns a list of ObjectAddresses to be created in order + * before the target object could safely be created on a worker. Some of the objects might + * already exist on a worker; they should therefore be created in an idempotent way. + */ +List * +GetDependenciesForObject(const ObjectAddress *target) +{ + ObjectAddressCollector collector = { 0 }; + + InitObjectAddressCollector(&collector); + + recurse_pg_depend(target, + NULL, + &FollowNewSupportedDependencies, + &ApplyAddToDependencyList, + &collector); + + return collector.dependencyList; +} + + +/* + * OrderObjectAddressListInDependencyOrder, given a list of ObjectAddresses, returns a new + * list of the same ObjectAddresses ordered in dependency order, where dependencies + * precede the corresponding object in the list. + * + * The algorithm traverses pg_depend in a depth first order starting at the first object in + * the provided list. By traversing depth first it will put the first dependency at the + * head of the list, with objects that depend on it later. + * + * If the object is already in the list it is skipped for traversal. This happens when an + * object was already added to the target list before it occurred in the input list. + */ +List * +OrderObjectAddressListInDependencyOrder(List *objectAddressList) +{ + ObjectAddressCollector collector = { 0 }; + ListCell *objectAddressCell = NULL; + + InitObjectAddressCollector(&collector); + + foreach(objectAddressCell, objectAddressList) + { + ObjectAddress *objectAddress = (ObjectAddress *) lfirst(objectAddressCell); + + if (IsObjectAddressCollected(objectAddress, &collector)) + { + /* skip objects that are already ordered */ + continue; + } + + recurse_pg_depend(objectAddress, + NULL, + &FollowAllSupportedDependencies, + &ApplyAddToDependencyList, + &collector); + + CollectObjectAddress(&collector, objectAddress); + } + + return collector.dependencyList; +} + + +/* + * recurse_pg_depend recursively visits pg_depend entries.
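+ *
+ * For intuition, the catalog scan performed per visited object is roughly
+ * equivalent to the following query (illustrative SQL only; the function
+ * uses a systable scan on pg_depend_depender_index, not SPI):
+ *
+ *   SELECT refclassid, refobjid, deptype
+ *   FROM pg_depend
+ *   WHERE classid = $1 AND objid = $2;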
+ * + * `expand` allows extra entries to be generated based on the target ObjectAddress, for + * ease of traversal. + * + * Starting from the target ObjectAddress, the `follow` function will be called for every + * existing and generated entry. When `follow` returns true it will recursively visit + * the dependencies for that object. recurse_pg_depend will therefore visit all reachable + * pg_depend entries. + * + * Visiting will happen in depth first order, which is useful to create sorted lists of + * dependencies to create. + * + * For all pg_depend entries that should be visited the apply function will be called. + * This function is designed to be the mutating function for the context being passed, + * although nothing prevents the follow function from also mutating the context. + * + * - follow will be called on the way down, so the invocation order is top to bottom of + * the dependency tree + * - apply is called on the way back, so the invocation order is bottom to top. Apply is + * not called for entries for which follow has returned false. + */ +static void +recurse_pg_depend(const ObjectAddress *target, + List * (*expand)(void *context, const ObjectAddress *target), + bool (*follow)(void *context, Form_pg_depend row), + void (*apply)(void *context, Form_pg_depend row), + void *context) +{ + Relation depRel = NULL; + ScanKeyData key[2]; + SysScanDesc depScan = NULL; + HeapTuple depTup = NULL; + List *pgDependEntries = NIL; + ListCell *pgDependCell = NULL; + + /* + * iterate the actual pg_depend catalog + */ + depRel = heap_open(DependRelationId, AccessShareLock); + + /* scan pg_depend for classid = $1 AND objid = $2 using pg_depend_depender_index */ + ScanKeyInit(&key[0], Anum_pg_depend_classid, BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(target->classId)); + ScanKeyInit(&key[1], Anum_pg_depend_objid, BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(target->objectId)); + depScan = systable_beginscan(depRel, DependDependerIndexId, true, NULL, 2, key); + + while (HeapTupleIsValid(depTup = systable_getnext(depScan))) + { + Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup); + Form_pg_depend pg_depend_copy = palloc0(sizeof(FormData_pg_depend)); + + *pg_depend_copy = *pg_depend; + + pgDependEntries = lappend(pgDependEntries, pg_depend_copy); + } + + systable_endscan(depScan); + relation_close(depRel, AccessShareLock); + + /* + * concat expanded entries if applicable + */ + if (expand != NULL) + { + List *expandedEntries = NIL; + + expandedEntries = expand(context, target); + pgDependEntries = list_concat(pgDependEntries, expandedEntries); + } + + /* + * Iterate all entries and recurse depth first + */ + foreach(pgDependCell, pgDependEntries) + { + Form_pg_depend pg_depend = (Form_pg_depend) lfirst(pgDependCell); + ObjectAddress address = { 0 }; + ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid); + + if (follow == NULL || !follow(context, pg_depend)) + { + /* skip all pg_depend entries the user didn't want to follow */ + continue; + } + + /* + * recurse depth first, this makes sure we call apply for the deepest dependency + * first. + */ + recurse_pg_depend(&address, expand, follow, apply, context); + + /* now apply changes for current entry */ + if (apply != NULL) + { + apply(context, pg_depend); + } + } +} + + +/* + * InitObjectAddressCollector takes a pointer to an already allocated (possibly stack) + * ObjectAddressCollector struct. It makes sure this struct is ready to be used for object + * collection.
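+ *
+ * The set keys on the raw bytes of the ObjectAddress struct (HASH_BLOBS); its three
+ * fixed-width fields leave no padding, so byte-wise comparison is safe here.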
+ * + * If an already initialized collector is passed, its contents are reset so the + * collector can be reused. + */ +static void +InitObjectAddressCollector(ObjectAddressCollector *collector) +{ + int hashFlags = 0; + HASHCTL info; + + memset(&info, 0, sizeof(info)); + info.keysize = sizeof(ObjectAddress); + info.entrysize = sizeof(ObjectAddress); + info.hcxt = CurrentMemoryContext; + hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS); + + collector->dependencySet = hash_create("dependency set", 128, &info, hashFlags); + collector->dependencyList = NIL; +} + + +/* + * CollectObjectAddress adds an ObjectAddress to the collector. + */ +static void +CollectObjectAddress(ObjectAddressCollector *collector, const ObjectAddress *collect) +{ + ObjectAddress *address = NULL; + bool found = false; + + /* add to set */ + address = (ObjectAddress *) hash_search(collector->dependencySet, collect, + HASH_ENTER, &found); + + if (!found) + { + /* copy object address in */ + *address = *collect; + } + + /* add to list */ + collector->dependencyList = lappend(collector->dependencyList, address); +} + + +/* + * IsObjectAddressCollected is a helper function that checks whether an ObjectAddress has + * already been collected. + */ +static bool +IsObjectAddressCollected(const ObjectAddress *findAddress, + ObjectAddressCollector *collector) +{ + bool found = false; + + /* probe the set */ + hash_search(collector->dependencySet, findAddress, HASH_FIND, &found); + + return found; +} + + +/* + * SupportedDependencyByCitus returns whether citus has support to distribute the object + * addressed. + */ +static bool +SupportedDependencyByCitus(const ObjectAddress *address) +{ + /* + * look at the type of an object to see if we know how to create the object on the + * workers. + */ + switch (getObjectClass(address)) + { + case OCLASS_SCHEMA: + { + return true; + } + + default: + { + /* unsupported type */ + return false; + } + } +} + + +/* + * IsObjectAddressOwnedByExtension returns whether or not the object is owned by an + * extension. It is assumed that an object having a dependency on an extension is created + * by that extension and therefore owned by that extension. + */ +static bool +IsObjectAddressOwnedByExtension(const ObjectAddress *target) +{ + Relation depRel = NULL; + ScanKeyData key[2]; + SysScanDesc depScan = NULL; + HeapTuple depTup = NULL; + bool result = false; + + depRel = heap_open(DependRelationId, AccessShareLock); + + /* scan pg_depend for classid = $1 AND objid = $2 using pg_depend_depender_index */ + ScanKeyInit(&key[0], Anum_pg_depend_classid, BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(target->classId)); + ScanKeyInit(&key[1], Anum_pg_depend_objid, BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(target->objectId)); + depScan = systable_beginscan(depRel, DependDependerIndexId, true, NULL, 2, key); + + while (HeapTupleIsValid(depTup = systable_getnext(depScan))) + { + Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup); + if (pg_depend->deptype == DEPENDENCY_EXTENSION) + { + result = true; + break; + } + } + + systable_endscan(depScan); + heap_close(depRel, AccessShareLock); + + return result; +} + + +/* + * FollowNewSupportedDependencies applies filters on pg_depend entries to follow all + * objects which should be distributed before the root object can safely be created.
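+ *
+ * For example (hypothetical names), the normal dependency of a table on its
+ * schema, which this filter would follow, can be listed with:
+ *
+ *   SELECT refobjid::regnamespace
+ *   FROM pg_depend
+ *   WHERE classid = 'pg_class'::regclass
+ *     AND objid = 'my_schema.my_table'::regclass
+ *     AND refclassid = 'pg_namespace'::regclass
+ *     AND deptype = 'n';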
+ */ +static bool +FollowNewSupportedDependencies(void *context, Form_pg_depend pg_depend) +{ + ObjectAddressCollector *collector = (ObjectAddressCollector *) context; + ObjectAddress address = { 0 }; + ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid); + + /* + * Distribute only normal dependencies, other dependencies are internal dependencies + * and managed by postgres + */ + if (pg_depend->deptype != DEPENDENCY_NORMAL) + { + return false; + } + + /* + * We can only distribute dependencies that citus knows how to distribute + */ + if (!SupportedDependencyByCitus(&address)) + { + return false; + } + + /* + * If the object is already in our dependency list we do not have to follow any + * further + */ + if (IsObjectAddressCollected(&address, collector)) + { + return false; + } + + /* + * If the object is already distributed it is not a `new` object that needs to be + * distributed before we create a dependent object + */ + if (IsObjectDistributed(&address)) + { + return false; + } + + /* + * Objects owned by an extension are assumed to be created on the workers by creating + * the extension in the cluster + */ + if (IsObjectAddressOwnedByExtension(&address)) + { + return false; + } + + return true; +} + + +/* + * FollowAllSupportedDependencies applies filters on pg_depend entries to follow the + * dependency tree of objects in depth first order. We will visit all supported objects. + * This is used to sort a list of dependencies in dependency order. + */ +static bool +FollowAllSupportedDependencies(void *context, Form_pg_depend pg_depend) +{ + ObjectAddressCollector *collector = (ObjectAddressCollector *) context; + ObjectAddress address = { 0 }; + ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid); + + /* + * Distribute only normal dependencies, other dependencies are internal dependencies + * and managed by postgres + */ + if (pg_depend->deptype != DEPENDENCY_NORMAL) + { + return false; + } + + /* + * We can only distribute dependencies that citus knows how to distribute + */ + if (!SupportedDependencyByCitus(&address)) + { + return false; + } + + /* + * If the object is already in our dependency list we do not have to follow any + * further + */ + if (IsObjectAddressCollected(&address, collector)) + { + return false; + } + + /* + * Objects owned by an extension are assumed to be created on the workers by creating + * the extension in the cluster + */ + if (IsObjectAddressOwnedByExtension(&address)) + { + return false; + } + + return true; +} + + +/* + * ApplyAddToDependencyList is an apply function for recurse_pg_depend that will collect + * all the ObjectAddresses for visited pg_depend entries into the context. The context + * here is assumed to be a (ObjectAddressCollector *) to the location where all + * ObjectAddresses will be collected.
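+ *
+ * Because apply runs on the way back up the recursion, dependencies are appended
+ * before the objects that depend on them, so the final list is in a valid
+ * creation order.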
+ */ +static void +ApplyAddToDependencyList(void *context, Form_pg_depend pg_depend) +{ + ObjectAddressCollector *collector = (ObjectAddressCollector *) context; + ObjectAddress address = { 0 }; + ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid); + + CollectObjectAddress(collector, &address); +} diff --git a/src/backend/distributed/metadata/distobject.c b/src/backend/distributed/metadata/distobject.c new file mode 100644 index 000000000..41cf783c3 --- /dev/null +++ b/src/backend/distributed/metadata/distobject.c @@ -0,0 +1,282 @@ +/*------------------------------------------------------------------------- + * + * distobject.c + * Functions to interact with distributed objects by their ObjectAddress + * + * Copyright (c) 2019, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "miscadmin.h" + +#include "access/genam.h" +#include "access/heapam.h" +#include "access/htup_details.h" +#include "access/skey.h" +#include "access/xact.h" +#include "catalog/dependency.h" +#include "catalog/namespace.h" +#include "catalog/objectaddress.h" +#include "catalog/pg_namespace.h" +#include "catalog/pg_type.h" +#include "distributed/metadata/distobject.h" +#include "distributed/metadata/pg_dist_object.h" +#include "distributed/metadata_cache.h" +#include "executor/spi.h" +#include "nodes/makefuncs.h" +#include "nodes/pg_list.h" +#include "parser/parse_type.h" +#include "utils/builtins.h" +#include "utils/fmgroids.h" +#include "utils/regproc.h" +#include "utils/rel.h" + + +static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, + Datum *paramValues); + + +PG_FUNCTION_INFO_V1(master_unmark_object_distributed); + + +/* + * master_unmark_object_distributed(classid oid, objid oid, objsubid int) + * + * removes the entry for an object address from pg_dist_object. Only removes the entry if + * the object does not exist anymore. + */ +Datum +master_unmark_object_distributed(PG_FUNCTION_ARGS) +{ + Oid classid = PG_GETARG_OID(0); + Oid objid = PG_GETARG_OID(1); + int32 objsubid = PG_GETARG_INT32(2); + + ObjectAddress address = { 0 }; + ObjectAddressSubSet(address, classid, objid, objsubid); + + if (!IsObjectDistributed(&address)) + { + /* if the object is not distributed there is no need to unmark it */ + PG_RETURN_VOID(); + } + + if (ObjectExists(&address)) + { + ereport(ERROR, (errmsg("object still exists"), + errdetail("the %s \"%s\" still exists", + getObjectTypeDescription(&address), + getObjectIdentity(&address)), + errhint("drop the object via a DROP command"))); + } + + UnmarkObjectDistributed(&address); + + PG_RETURN_VOID(); +} + + +/* + * ObjectExists checks if an object given by its object address exists. + * + * This is done by opening the catalog for the object and searching the catalog for the + * object's oid. If we can find a tuple the object exists. If no tuple is found, or + * we don't have the information to find the tuple by its oid, we assume the object does + * not exist. + */ +bool +ObjectExists(const ObjectAddress *address) +{ + if (is_objectclass_supported(address->classId)) + { + HeapTuple objtup; + Relation catalog = heap_open(address->classId, AccessShareLock); + + objtup = get_catalog_object_by_oid(catalog, address->objectId); + heap_close(catalog, AccessShareLock); + if (objtup != NULL) + { + return true; + } + + return false; + } + + return false; +} + + +/* + * MarkObjectDistributed marks an object as a distributed object by citus.
Marking is done + * by adding appropriate entries to citus.pg_dist_object + */ +void +MarkObjectDistributed(const ObjectAddress *distAddress) +{ + int paramCount = 3; + Oid paramTypes[3] = { + OIDOID, + OIDOID, + INT4OID + }; + Datum paramValues[3] = { + ObjectIdGetDatum(distAddress->classId), + ObjectIdGetDatum(distAddress->objectId), + Int32GetDatum(distAddress->objectSubId) + }; + int spiStatus = 0; + + char *insertQuery = "INSERT INTO citus.pg_dist_object (classid, objid, objsubid) " + "VALUES ($1, $2, $3) ON CONFLICT DO NOTHING"; + + spiStatus = ExecuteCommandAsSuperuser(insertQuery, paramCount, paramTypes, + paramValues); + if (spiStatus < 0) + { + ereport(ERROR, (errmsg("failed to insert object into citus.pg_dist_object"))); + } +} + + +/* + * ExecuteCommandAsSuperuser executes a command via SPI as superuser. + */ +static int +ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes, + Datum *paramValues) +{ + int spiConnected = 0; + Oid savedUserId = InvalidOid; + int savedSecurityContext = 0; + int spiStatus = 0; + int spiFinished = 0; + + spiConnected = SPI_connect(); + if (spiConnected != SPI_OK_CONNECT) + { + ereport(ERROR, (errmsg("could not connect to SPI manager"))); + } + + /* make sure we have write access */ + GetUserIdAndSecContext(&savedUserId, &savedSecurityContext); + SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE); + + spiStatus = SPI_execute_with_args(query, paramCount, paramTypes, paramValues, + NULL, false, 0); + + SetUserIdAndSecContext(savedUserId, savedSecurityContext); + + spiFinished = SPI_finish(); + if (spiFinished != SPI_OK_FINISH) + { + ereport(ERROR, (errmsg("could not disconnect from SPI manager"))); + } + + return spiStatus; +} + + +/* + * UnmarkObjectDistributed removes the entry from pg_dist_object that marks this object as + * distributed. This prevents updates to that object from being propagated to the workers. + */ +void +UnmarkObjectDistributed(const ObjectAddress *address) +{ + int paramCount = 3; + Oid paramTypes[3] = { + OIDOID, + OIDOID, + INT4OID + }; + Datum paramValues[3] = { + ObjectIdGetDatum(address->classId), + ObjectIdGetDatum(address->objectId), + Int32GetDatum(address->objectSubId) + }; + int spiStatus = 0; + + char *deleteQuery = "DELETE FROM citus.pg_dist_object WHERE classid = $1 AND " + "objid = $2 AND objsubid = $3"; + + spiStatus = ExecuteCommandAsSuperuser(deleteQuery, paramCount, paramTypes, + paramValues); + if (spiStatus < 0) + { + ereport(ERROR, (errmsg("failed to delete object from citus.pg_dist_object"))); + } +} + + +/* + * IsObjectDistributed returns whether the addressed object is already distributed in the + * cluster. This performs a local indexed lookup in pg_dist_object.
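+ *
+ * Conceptually this amounts to the following check (illustrative SQL; the
+ * function uses an index scan, not SPI):
+ *
+ *   SELECT EXISTS (
+ *     SELECT 1 FROM citus.pg_dist_object
+ *     WHERE classid = $1 AND objid = $2 AND objsubid = $3);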
+ */ +bool +IsObjectDistributed(const ObjectAddress *address) +{ + Relation pgDistObjectRel = NULL; + ScanKeyData key[3]; + SysScanDesc pgDistObjectScan = NULL; + HeapTuple pgDistObjectTup = NULL; + bool result = false; + + pgDistObjectRel = heap_open(DistObjectRelationId(), AccessShareLock); + + /* scan pg_dist_object for classid = $1 AND objid = $2 AND objsubid = $3 via index */ + ScanKeyInit(&key[0], Anum_pg_dist_object_classid, BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(address->classId)); + ScanKeyInit(&key[1], Anum_pg_dist_object_objid, BTEqualStrategyNumber, F_OIDEQ, + ObjectIdGetDatum(address->objectId)); + ScanKeyInit(&key[2], Anum_pg_dist_object_objsubid, BTEqualStrategyNumber, F_INT4EQ, + ObjectIdGetDatum(address->objectSubId)); + pgDistObjectScan = systable_beginscan(pgDistObjectRel, DistObjectPrimaryKeyIndexId(), + true, NULL, 3, key); + + pgDistObjectTup = systable_getnext(pgDistObjectScan); + if (HeapTupleIsValid(pgDistObjectTup)) + { + result = true; + } + + systable_endscan(pgDistObjectScan); + relation_close(pgDistObjectRel, AccessShareLock); + + return result; +} + + +/* + * GetDistributedObjectAddressList returns a list of ObjectAddresses that contains all + * distributed objects as marked in pg_dist_object + */ +List * +GetDistributedObjectAddressList(void) +{ + Relation pgDistObjectRel = NULL; + SysScanDesc pgDistObjectScan = NULL; + HeapTuple pgDistObjectTup = NULL; + List *objectAddressList = NIL; + + pgDistObjectRel = heap_open(DistObjectRelationId(), AccessShareLock); + pgDistObjectScan = systable_beginscan(pgDistObjectRel, InvalidOid, false, NULL, 0, + NULL); + while (HeapTupleIsValid(pgDistObjectTup = systable_getnext(pgDistObjectScan))) + { + Form_pg_dist_object pg_dist_object = + (Form_pg_dist_object) GETSTRUCT(pgDistObjectTup); + ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress)); + ObjectAddressSubSet(*objectAddress, + pg_dist_object->classid, + pg_dist_object->objid, + pg_dist_object->objsubid); + objectAddressList = lappend(objectAddressList, objectAddress); + } + + systable_endscan(pgDistObjectScan); + relation_close(pgDistObjectRel, AccessShareLock); + + return objectAddressList; +} diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 64be88d1c..ad42bd7f9 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -286,18 +286,20 @@ MetadataCreateCommands(void) DistTableCacheEntry *cacheEntry = (DistTableCacheEntry *) lfirst(distributedTableCell); Oid relationId = cacheEntry->relationId; + ObjectAddress tableAddress = { 0 }; List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId); List *ddlCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults); char *tableOwnerResetCommand = TableOwnerResetCommand(relationId); /* - * Ensure schema exists on each worker node. We can not run this function - * transactionally, since we may create shards over separate sessions and - * shard creation depends on the schema being present and visible from all - * sessions. + * Distributed tables might have dependencies on different objects, since we + * create shards for a distributed table via multiple sessions these objects will + * be created via their own connection and committed immediately so they become + * visible to all sessions creating shards. 
*/ - EnsureSchemaExistsOnAllNodes(relationId); + ObjectAddressSet(tableAddress, RelationRelationId, relationId); + EnsureDependenciesExistsOnAllNodes(&tableAddress); metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, workerSequenceDDLCommands); diff --git a/src/backend/distributed/utils/metadata_cache.c b/src/backend/distributed/utils/metadata_cache.c index 492df0e7e..441c9464e 100644 --- a/src/backend/distributed/utils/metadata_cache.c +++ b/src/backend/distributed/utils/metadata_cache.c @@ -114,6 +114,8 @@ typedef struct MetadataCacheData Oid distNodeRelationId; Oid distNodeNodeIdIndexId; Oid distLocalGroupRelationId; + Oid distObjectRelationId; + Oid distObjectPrimaryKeyIndexId; Oid distColocationRelationId; Oid distColocationConfigurationIndexId; Oid distColocationColocationidIndexId; @@ -128,6 +130,7 @@ typedef struct MetadataCacheData Oid distTransactionRelationId; Oid distTransactionGroupIndexId; Oid distTransactionRecordIndexId; + Oid citusCatalogNamespaceId; Oid copyFormatTypeId; Oid readIntermediateResultFuncId; Oid extraDataContainerFuncId; @@ -214,7 +217,10 @@ static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMe static ShardInterval * TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid intervalTypeId, int32 intervalTypeMod); +static void CachedNamespaceLookup(const char *nspname, Oid *cachedOid); static void CachedRelationLookup(const char *relationName, Oid *cachedOid); +static void CachedRelationNamespaceLookup(const char *relationName, Oid relnamespace, + Oid *cachedOid); static ShardPlacement * ResolveGroupShardPlacement( GroupShardPlacement *groupShardPlacement, ShardCacheEntry *shardEntry); static WorkerNode * LookupNodeForGroup(int32 groupId); @@ -1806,6 +1812,38 @@ DistLocalGroupIdRelationId(void) } +/* return the oid of citus namespace */ +Oid +CitusCatalogNamespaceId(void) +{ + CachedNamespaceLookup("citus", &MetadataCache.citusCatalogNamespaceId); + return MetadataCache.citusCatalogNamespaceId; +} + + +/* return oid of pg_dist_object relation */ +Oid +DistObjectRelationId(void) +{ + CachedRelationNamespaceLookup("pg_dist_object", CitusCatalogNamespaceId(), + &MetadataCache.distObjectRelationId); + + return MetadataCache.distObjectRelationId; +} + + +/* return oid of pg_dist_object_pkey */ +Oid +DistObjectPrimaryKeyIndexId(void) +{ + CachedRelationNamespaceLookup("pg_dist_object_pkey", + CitusCatalogNamespaceId(), + &MetadataCache.distObjectPrimaryKeyIndexId); + + return MetadataCache.distObjectPrimaryKeyIndexId; +} + + /* return oid of pg_dist_colocation relation */ Oid DistColocationRelationId(void) @@ -3573,18 +3611,49 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid interva /* - * CachedRelationLookup performs a cached lookup for the relation - * relationName, with the result cached in *cachedOid. + * CachedNamespaceLookup performs a cached lookup for the namespace (schema), with the + * result cached in *cachedOid.
*/ static void -CachedRelationLookup(const char *relationName, Oid *cachedOid) +CachedNamespaceLookup(const char *nspname, Oid *cachedOid) { /* force callbacks to be registered, so we always get notified upon changes */ InitializeCaches(); if (*cachedOid == InvalidOid) { - *cachedOid = get_relname_relid(relationName, PG_CATALOG_NAMESPACE); + *cachedOid = get_namespace_oid(nspname, true); + + if (*cachedOid == InvalidOid) + { + ereport(ERROR, (errmsg( + "cache lookup failed for namespace %s, called too early?", + nspname))); + } + } +} + + +/* + * CachedRelationLookup performs a cached lookup for the relation + * relationName, with the result cached in *cachedOid. + */ +static void +CachedRelationLookup(const char *relationName, Oid *cachedOid) +{ + CachedRelationNamespaceLookup(relationName, PG_CATALOG_NAMESPACE, cachedOid); +} + + +static void +CachedRelationNamespaceLookup(const char *relationName, Oid relnamespace, Oid *cachedOid) +{ + /* force callbacks to be registered, so we always get notified upon changes */ + InitializeCaches(); + + if (*cachedOid == InvalidOid) + { + *cachedOid = get_relname_relid(relationName, relnamespace); if (*cachedOid == InvalidOid) { diff --git a/src/backend/distributed/utils/node_metadata.c b/src/backend/distributed/utils/node_metadata.c index acf18f225..53e4994df 100644 --- a/src/backend/distributed/utils/node_metadata.c +++ b/src/backend/distributed/utils/node_metadata.c @@ -455,6 +455,7 @@ ActivateNode(char *nodeName, int nodePort) if (WorkerNodeIsPrimary(workerNode)) { + ReplicateAllDependenciesToNode(nodeName, nodePort); ReplicateAllReferenceTablesToNode(nodeName, nodePort); } diff --git a/src/backend/distributed/utils/reference_table_utils.c b/src/backend/distributed/utils/reference_table_utils.c index 22c0b3b26..09b33d408 100644 --- a/src/backend/distributed/utils/reference_table_utils.c +++ b/src/backend/distributed/utils/reference_table_utils.c @@ -300,16 +300,6 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort) missingWorkerOk); char *tableOwner = TableOwner(shardInterval->relationId); - - /* - * Ensure schema exists on the worker node. We can not run this - * function transactionally, since we may create shards over separate - * sessions and shard creation depends on the schema being present and - * visible from all sessions. 
- */ -void -EnsureSchemaExistsOnNode(Oid relationId, char *nodeName, int32 nodePort) -{ - uint64 connectionFlag = FORCE_NEW_CONNECTION; - MultiConnection *connection = NULL; - - /* if the schema creation command is not provided, create it */ - Oid schemaId = get_rel_namespace(relationId); - char *schemaCreationDDL = CreateSchemaDDLCommand(schemaId); - - /* if the relation lives in public namespace, no need to perform any queries in workers */ - if (schemaCreationDDL == NULL) - { - return; - } - - connection = GetNodeUserDatabaseConnection(connectionFlag, nodeName, - nodePort, CitusExtensionOwnerName(), NULL); - - ExecuteCriticalRemoteCommand(connection, schemaCreationDDL); -} diff --git a/src/backend/distributed/master/master_create_shards.c b/src/backend/distributed/master/master_create_shards.c index 42831105b..ae85e3982 100644 --- a/src/backend/distributed/master/master_create_shards.c +++ b/src/backend/distributed/master/master_create_shards.c @@ -67,6 +67,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS) text *tableNameText = PG_GETARG_TEXT_P(0); int32 shardCount = PG_GETARG_INT32(1); int32 replicationFactor = PG_GETARG_INT32(2); + ObjectAddress tableAddress = { 0 }; Oid distributedTableId = ResolveRelationId(tableNameText, false); @@ -77,12 +78,13 @@ master_create_worker_shards(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); /* - * Ensure schema exists on each worker node. We can not run this function - * transactionally, since we may create shards over separate sessions and - * shard creation depends on the schema being present and visible from all - * sessions. + * distributed tables might have dependencies on different objects, since we create + * shards for a distributed table via multiple sessions these objects will be created + * via their own connection and committed immediately so they become visible to all + * sessions creating shards. */ - EnsureSchemaExistsOnAllNodes(distributedTableId); + ObjectAddressSet(tableAddress, RelationRelationId, distributedTableId); + EnsureDependenciesExistsOnAllNodes(&tableAddress); CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor, useExclusiveConnections); diff --git a/src/backend/distributed/master/master_repair_shards.c b/src/backend/distributed/master/master_repair_shards.c index 444d1dc94..a74ad5f85 100644 --- a/src/backend/distributed/master/master_repair_shards.c +++ b/src/backend/distributed/master/master_repair_shards.c @@ -237,15 +237,6 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort, EnsureTableOwner(distributedTableId); - /* - * Ensure schema exists on the target worker node. We can not run this - * function transactionally, since we may create shards over separate - * sessions and shard creation depends on the schema being present and - * visible from all sessions.
+ * + *------------------------------------------------------------------------- + */ + +#ifndef CITUS_METADATA_DISTOBJECT_H +#define CITUS_METADATA_DISTOBJECT_H + +#include "postgres.h" + +#include "catalog/objectaddress.h" + + +extern bool ObjectExists(const ObjectAddress *address); +extern bool IsObjectDistributed(const ObjectAddress *address); +extern void MarkObjectDistributed(const ObjectAddress *distAddress); +extern void UnmarkObjectDistributed(const ObjectAddress *address); + +extern List * GetDistributedObjectAddressList(void); + +#endif /* CITUS_METADATA_DISTOBJECT_H */ diff --git a/src/include/distributed/metadata/pg_dist_object.h b/src/include/distributed/metadata/pg_dist_object.h new file mode 100644 index 000000000..ffd0845e1 --- /dev/null +++ b/src/include/distributed/metadata/pg_dist_object.h @@ -0,0 +1,57 @@ +/*------------------------------------------------------------------------- + * + * pg_dist_object.h + * definition of the system distributed objects relation (pg_dist_object). + * + * This table keeps metadata on all postgres objects that are distributed + * to all the nodes in the network. Objects in this table should all be + * present on all workers and kept in sync throughout their existence. + * This also means that all nodes joining the network are assumed to + * recreate all these objects. + * + * Copyright (c) 2019, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#ifndef PG_DIST_OBJECT_H +#define PG_DIST_OBJECT_H + + +/* ---------------- + * pg_dist_object definition. + * ---------------- + */ +typedef struct FormData_pg_dist_object +{ + Oid classid; /* class of the distributed object */ + Oid objid; /* object id of the distributed object */ + int32 objsubid; /* object sub id of the distributed object, eg. attnum */ + +#ifdef CATALOG_VARLEN /* variable-length fields start here */ + text type; + text[] object_names; + text[] object_args; +#endif +} FormData_pg_dist_object; + +/* ---------------- + * Form_pg_dist_object corresponds to a pointer to a tuple with + * the format of the pg_dist_object relation.
+ * ---------------- + */ +typedef FormData_pg_dist_object *Form_pg_dist_object; + +/* ---------------- + * compiler constants for pg_dist_object + * ---------------- + */ +#define Natts_pg_dist_object 6 +#define Anum_pg_dist_object_classid 1 +#define Anum_pg_dist_object_objid 2 +#define Anum_pg_dist_object_objsubid 3 +#define Anum_pg_dist_object_type 4 +#define Anum_pg_dist_object_object_names 5 +#define Anum_pg_dist_object_object_args 6 + +#endif /* PG_DIST_OBJECT_H */ diff --git a/src/include/distributed/metadata_cache.h b/src/include/distributed/metadata_cache.h index c765ababd..b0e702b38 100644 --- a/src/include/distributed/metadata_cache.h +++ b/src/include/distributed/metadata_cache.h @@ -121,6 +121,9 @@ extern void EnsureModificationsCanRun(void); extern HTAB * GetWorkerNodeHash(void); extern WorkerNode * LookupNodeByNodeId(uint32 nodeId); +/* namespace oids */ +extern Oid CitusCatalogNamespaceId(void); + /* relation oids */ extern Oid DistColocationRelationId(void); extern Oid DistColocationConfigurationIndexId(void); @@ -130,6 +133,7 @@ extern Oid DistShardRelationId(void); extern Oid DistPlacementRelationId(void); extern Oid DistNodeRelationId(void); extern Oid DistLocalGroupIdRelationId(void); +extern Oid DistObjectRelationId(void); /* index oids */ extern Oid DistNodeNodeIdIndexId(void); @@ -143,6 +147,7 @@ extern Oid DistTransactionRelationId(void); extern Oid DistTransactionGroupIndexId(void); extern Oid DistTransactionRecordIndexId(void); extern Oid DistPlacementGroupidIndexId(void); +extern Oid DistObjectPrimaryKeyIndexId(void); /* type oids */ extern Oid CitusCopyFormatTypeId(void); diff --git a/src/test/regress/Makefile b/src/test/regress/Makefile index f6bc5d18a..d91f5faf5 100644 --- a/src/test/regress/Makefile +++ b/src/test/regress/Makefile @@ -78,6 +78,11 @@ check-failure-non-adaptive: all tempinstall-main --server-option=citus.task_executor_type=real-time \ -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/failure_schedule $(EXTRA_TESTS) +check-failure-non-adaptive-base: all tempinstall-main + $(pg_regress_multi_check) --load-extension=citus --mitmproxy \ + --server-option=citus.task_executor_type=real-time \ + -- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/failure_base_schedule $(EXTRA_TESTS) + check-isolation-non-adaptive: all tempinstall-main $(pg_regress_multi_check) --load-extension=citus --isolationtester \ --server-option=citus.task_executor_type=real-time \ diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index e74229049..6006fd820 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -55,9 +55,6 @@ s/.*Custom Plan Provider.*Citus.*/ \"Custom Plan Provider\": \"Citu s/.*Custom-Plan-Provide.*/\Citus Unified\<\/Custom-Plan-Provider\> /g s/ +$//g -# normalize shard ids in failure_vaccum -s/10209[0-9] \| 3/10209x \| 3/g - # normalize failed task ids s/ERROR: failed to execute task [0-9]+/ERROR: failed to execute task X/g diff --git a/src/test/regress/expected/failure_create_distributed_table_non_empty.out b/src/test/regress/expected/failure_create_distributed_table_non_empty.out index 0fa0d03e3..1d57554d0 100644 --- a/src/test/regress/expected/failure_create_distributed_table_non_empty.out +++ b/src/test/regress/expected/failure_create_distributed_table_non_empty.out @@ -343,6 +343,10 @@ SELECT recover_prepared_transactions(); (1 row) DROP TABLE test_table ; +-- since we want to interrupt the schema creation again we need to drop and recreate +-- for citus to redistribute 
the dependency +DROP SCHEMA create_distributed_table_non_empty_failure; +CREATE SCHEMA create_distributed_table_non_empty_failure; CREATE TABLE test_table(id int, value_1 int); INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); -- cancel as soon as the coordinator sends COMMIT @@ -572,6 +576,8 @@ SELECT citus.mitmproxy('conn.allow()'); DROP TABLE colocated_table; DROP TABLE test_table; +DROP SCHEMA create_distributed_table_non_empty_failure; +CREATE SCHEMA create_distributed_table_non_empty_failure; CREATE TABLE test_table(id int, value_1 int); INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); SET citus.multi_shard_commit_protocol TO '1pc'; diff --git a/src/test/regress/expected/failure_create_distributed_table_non_empty_9.out b/src/test/regress/expected/failure_create_distributed_table_non_empty_9.out index bea000678..7783b4c16 100644 --- a/src/test/regress/expected/failure_create_distributed_table_non_empty_9.out +++ b/src/test/regress/expected/failure_create_distributed_table_non_empty_9.out @@ -337,6 +337,10 @@ SELECT recover_prepared_transactions(); (1 row) DROP TABLE test_table ; +-- since we want to interrupt the schema creation again we need to drop and recreate +-- for citus to redistribute the dependency +DROP SCHEMA create_distributed_table_non_empty_failure; +CREATE SCHEMA create_distributed_table_non_empty_failure; CREATE TABLE test_table(id int, value_1 int); INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); -- cancel as soon as the coordinator sends COMMIT @@ -568,6 +572,8 @@ SELECT citus.mitmproxy('conn.allow()'); DROP TABLE colocated_table; DROP TABLE test_table; +DROP SCHEMA create_distributed_table_non_empty_failure; +CREATE SCHEMA create_distributed_table_non_empty_failure; CREATE TABLE test_table(id int, value_1 int); INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4); SET citus.multi_shard_commit_protocol TO '1pc'; diff --git a/src/test/regress/expected/failure_create_reference_table.out b/src/test/regress/expected/failure_create_reference_table.out index 9e5af1961..abeaa40a8 100644 --- a/src/test/regress/expected/failure_create_reference_table.out +++ b/src/test/regress/expected/failure_create_reference_table.out @@ -182,6 +182,8 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) DROP TABLE ref_table; +DROP SCHEMA failure_reference_table; +CREATE SCHEMA failure_reference_table; CREATE TABLE ref_table(id int); INSERT INTO ref_table VALUES(1),(2),(3); -- Test in transaction diff --git a/src/test/regress/expected/failure_create_reference_table_9.out b/src/test/regress/expected/failure_create_reference_table_9.out index 4c4ede61b..1946c7e4f 100644 --- a/src/test/regress/expected/failure_create_reference_table_9.out +++ b/src/test/regress/expected/failure_create_reference_table_9.out @@ -182,6 +182,8 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) DROP TABLE ref_table; +DROP SCHEMA failure_reference_table; +CREATE SCHEMA failure_reference_table; CREATE TABLE ref_table(id int); INSERT INTO ref_table VALUES(1),(2),(3); -- Test in transaction diff --git a/src/test/regress/expected/failure_create_table.out b/src/test/regress/expected/failure_create_table.out index 21b6e0d44..5d376bdd9 100644 --- a/src/test/regress/expected/failure_create_table.out +++ b/src/test/regress/expected/failure_create_table.out @@ -332,7 +332,12 @@ SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables W (localhost,57637,t,0) (2 rows) +-- drop tables and schema and recreate to start from a non-distributed schema again DROP TABLE temp_table; +DROP TABLE 
test_table; +DROP SCHEMA failure_create_table; +CREATE SCHEMA failure_create_table; +CREATE TABLE test_table(id int, value_1 int); -- Test inside transaction -- Kill connection before sending query to the worker SELECT citus.mitmproxy('conn.kill()'); @@ -444,7 +449,10 @@ SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables W (localhost,57637,t,2) (2 rows) +-- drop tables and schema and recreate to start from a non-distributed schema again DROP TABLE test_table; +DROP SCHEMA failure_create_table; +CREATE SCHEMA failure_create_table; CREATE TABLE test_table(id int, value_1 int); -- Test inside transaction and with 1PC SET citus.multi_shard_commit_protocol TO "1pc"; @@ -591,6 +599,8 @@ SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables W (2 rows) DROP TABLE test_table; +DROP SCHEMA failure_create_table; +CREATE SCHEMA failure_create_table; -- Test master_create_worker_shards with 2pc SET citus.multi_shard_commit_protocol TO "2pc"; CREATE TABLE test_table_2(id int, value_1 int); @@ -600,18 +610,18 @@ SELECT master_create_distributed_table('test_table_2', 'id', 'hash'); (1 row) --- Kill connection before sending query to the worker -SELECT citus.mitmproxy('conn.kill()'); +-- Kill connection before sending query to the worker +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()'); mitmproxy ----------- (1 row) SELECT master_create_worker_shards('test_table_2', 4, 2); +WARNING: connection not open +CONTEXT: while executing command on localhost:9060 ERROR: connection error: localhost:9060 -DETAIL: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. +DETAIL: connection not open SELECT count(*) FROM pg_dist_shard; count ------- diff --git a/src/test/regress/expected/failure_create_table_9.out b/src/test/regress/expected/failure_create_table_9.out index c1e3197b8..cc4b90e94 100644 --- a/src/test/regress/expected/failure_create_table_9.out +++ b/src/test/regress/expected/failure_create_table_9.out @@ -327,7 +327,12 @@ SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables W (localhost,57637,t,0) (2 rows) +-- drop tables and schema and recreate to start from a non-distributed schema again DROP TABLE temp_table; +DROP TABLE test_table; +DROP SCHEMA failure_create_table; +CREATE SCHEMA failure_create_table; +CREATE TABLE test_table(id int, value_1 int); -- Test inside transaction -- Kill connection before sending query to the worker SELECT citus.mitmproxy('conn.kill()'); @@ -434,7 +439,10 @@ SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables W (localhost,57637,t,0) (2 rows) +-- drop tables and schema and recreate to start from a non-distributed schema again DROP TABLE test_table; +DROP SCHEMA failure_create_table; +CREATE SCHEMA failure_create_table; CREATE TABLE test_table(id int, value_1 int); -- Test inside transaction and with 1PC SET citus.multi_shard_commit_protocol TO "1pc"; @@ -576,6 +584,8 @@ SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables W (2 rows) DROP TABLE test_table; +DROP SCHEMA failure_create_table; +CREATE SCHEMA failure_create_table; -- Test master_create_worker_shards with 2pc SET citus.multi_shard_commit_protocol TO "2pc"; CREATE TABLE test_table_2(id int, value_1 int); @@ -585,8 +595,8 @@ SELECT master_create_distributed_table('test_table_2', 'id', 'hash'); (1 row) --- Kill connection before sending query to the 
worker -SELECT citus.mitmproxy('conn.kill()'); +-- Kill connection before sending query to the worker +SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()'); mitmproxy ----------- diff --git a/src/test/regress/expected/failure_vacuum.out b/src/test/regress/expected/failure_vacuum.out index bab6490dd..d71e8194f 100644 --- a/src/test/regress/expected/failure_vacuum.out +++ b/src/test/regress/expected/failure_vacuum.out @@ -9,6 +9,7 @@ SELECT substring(:'server_version', '\d+')::int > 10 AS version_above_ten; t (1 row) +SET citus.next_shard_id TO 12000000; SELECT citus.mitmproxy('conn.allow()'); mitmproxy ----------- @@ -69,9 +70,9 @@ CONTEXT: while executing command on localhost:9060 -- show that we marked as INVALID on COMMIT FAILURE SELECT shardid, shardstate FROM pg_dist_shard_placement where shardstate != 1 AND shardid in ( SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'vacuum_test'::regclass); - shardid | shardstate ----------+------------ - 102093 | 3 + shardid | shardstate +----------+------------ + 12000000 | 3 (1 row) UPDATE pg_dist_shard_placement SET shardstate = 1 diff --git a/src/test/regress/expected/failure_vacuum_8.out b/src/test/regress/expected/failure_vacuum_8.out index c5c6af40d..a1123085b 100644 --- a/src/test/regress/expected/failure_vacuum_8.out +++ b/src/test/regress/expected/failure_vacuum_8.out @@ -9,6 +9,7 @@ SELECT substring(:'server_version', '\d+')::int > 10 AS version_above_ten; f (1 row) +SET citus.next_shard_id TO 12000000; SELECT citus.mitmproxy('conn.allow()'); mitmproxy ----------- @@ -64,9 +65,9 @@ ANALYZE vacuum_test; -- show that we marked as INVALID on COMMIT FAILURE SELECT shardid, shardstate FROM pg_dist_shard_placement where shardstate != 1 AND shardid in ( SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'vacuum_test'::regclass); - shardid | shardstate ----------+------------ - 102093 | 3 + shardid | shardstate +----------+------------ + 12000000 | 3 (1 row) UPDATE pg_dist_shard_placement SET shardstate = 1 diff --git a/src/test/regress/expected/failure_vacuum_9.out b/src/test/regress/expected/failure_vacuum_9.out index 2b6510f62..9e6813e94 100644 --- a/src/test/regress/expected/failure_vacuum_9.out +++ b/src/test/regress/expected/failure_vacuum_9.out @@ -9,6 +9,7 @@ SELECT substring(:'server_version', '\d+')::int > 10 AS version_above_ten; t (1 row) +SET citus.next_shard_id TO 12000000; SELECT citus.mitmproxy('conn.allow()'); mitmproxy ----------- @@ -64,9 +65,9 @@ ANALYZE vacuum_test; -- show that we marked as INVALID on COMMIT FAILURE SELECT shardid, shardstate FROM pg_dist_shard_placement where shardstate != 1 AND shardid in ( SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'vacuum_test'::regclass); - shardid | shardstate ----------+------------ - 102093 | 3 + shardid | shardstate +----------+------------ + 12000000 | 3 (1 row) UPDATE pg_dist_shard_placement SET shardstate = 1 diff --git a/src/test/regress/expected/foreign_key_restriction_enforcement.out b/src/test/regress/expected/foreign_key_restriction_enforcement.out index b527f5f16..3d0309031 100644 --- a/src/test/regress/expected/foreign_key_restriction_enforcement.out +++ b/src/test/regress/expected/foreign_key_restriction_enforcement.out @@ -1340,10 +1340,6 @@ CREATE TABLE reference_table(id int PRIMARY KEY); DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "reference_table_pkey" for table "reference_table" DEBUG: building index "reference_table_pkey" on table "reference_table" serially SELECT 
create_reference_table('reference_table'); -DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping -DETAIL: NOTICE from localhost:57638 -DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping -DETAIL: NOTICE from localhost:57637 create_reference_table ------------------------ @@ -1353,10 +1349,6 @@ CREATE TABLE distributed_table(id int PRIMARY KEY, value_1 int); DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "distributed_table_pkey" for table "distributed_table" DEBUG: building index "distributed_table_pkey" on table "distributed_table" serially SELECT create_distributed_table('distributed_table', 'id'); -DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping -DETAIL: NOTICE from localhost:57638 -DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping -DETAIL: NOTICE from localhost:57637 create_distributed_table -------------------------- diff --git a/src/test/regress/expected/foreign_key_restriction_enforcement_0.out b/src/test/regress/expected/foreign_key_restriction_enforcement_0.out index 07855c836..fde87b2c2 100644 --- a/src/test/regress/expected/foreign_key_restriction_enforcement_0.out +++ b/src/test/regress/expected/foreign_key_restriction_enforcement_0.out @@ -1340,10 +1340,6 @@ CREATE TABLE reference_table(id int PRIMARY KEY); DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "reference_table_pkey" for table "reference_table" DEBUG: building index "reference_table_pkey" on table "reference_table" SELECT create_reference_table('reference_table'); -DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping -DETAIL: NOTICE from localhost:57638 -DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping -DETAIL: NOTICE from localhost:57637 create_reference_table ------------------------ @@ -1353,10 +1349,6 @@ CREATE TABLE distributed_table(id int PRIMARY KEY, value_1 int); DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "distributed_table_pkey" for table "distributed_table" DEBUG: building index "distributed_table_pkey" on table "distributed_table" SELECT create_distributed_table('distributed_table', 'id'); -DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping -DETAIL: NOTICE from localhost:57638 -DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping -DETAIL: NOTICE from localhost:57637 create_distributed_table -------------------------- diff --git a/src/test/regress/expected/isolation_citus_dist_activity.out b/src/test/regress/expected/isolation_citus_dist_activity.out index e80ab62b2..3120cc577 100644 --- a/src/test/regress/expected/isolation_citus_dist_activity.out +++ b/src/test/regress/expected/isolation_citus_dist_activity.out @@ -41,16 +41,16 @@ step s3-view-worker: query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname -SELECT worker_apply_shard_ddl_command (102081, 'public', ' +SELECT worker_apply_shard_ddl_command (102141, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; ')localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression -SELECT worker_apply_shard_ddl_command (102080, 'public', ' +SELECT worker_apply_shard_ddl_command (102140, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; ')localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression -SELECT worker_apply_shard_ddl_command (102079, 'public', ' +SELECT worker_apply_shard_ddl_command (102139, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; ')localhost 57638 
coordinator_host57636 idle in transactionClient ClientRead postgres regression -SELECT worker_apply_shard_ddl_command (102078, 'public', ' +SELECT worker_apply_shard_ddl_command (102138, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; ')localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression step s2-rollback: @@ -104,7 +104,7 @@ step s3-view-worker: query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname -INSERT INTO public.test_table_102084 (column1, column2) VALUES (100, 100)localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression +INSERT INTO public.test_table_102144 (column1, column2) VALUES (100, 100)localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression step s2-rollback: ROLLBACK; @@ -118,12 +118,12 @@ step s3-rollback: starting permutation: s1-cache-connections s1-begin s2-begin s3-begin s1-select s2-sleep s2-view-dist s3-view-worker s2-rollback s1-commit s3-rollback create_distributed_table - + step s1-cache-connections: SET citus.max_cached_conns_per_worker TO 4; SET citus.force_max_query_parallelization TO on; UPDATE test_table SET column2 = 0; - + step s1-begin: BEGIN; @@ -159,10 +159,10 @@ step s3-view-worker: query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname -COPY (SELECT count(*) AS count FROM test_table_102089 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression -COPY (SELECT count(*) AS count FROM test_table_102088 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression -COPY (SELECT count(*) AS count FROM test_table_102087 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression -COPY (SELECT count(*) AS count FROM test_table_102086 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression +COPY (SELECT count(*) AS count FROM test_table_102149 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression +COPY (SELECT count(*) AS count FROM test_table_102148 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression +COPY (SELECT count(*) AS count FROM test_table_102147 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression +COPY (SELECT count(*) AS count FROM test_table_102146 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression step s2-rollback: ROLLBACK; @@ -217,7 +217,7 @@ step s3-view-worker: query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname -SELECT count(*) AS count FROM public.test_table_102091 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)localhost 57638 0 idle Client ClientRead postgres regression +SELECT count(*) AS count FROM public.test_table_102151 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)localhost 57638 0 idle Client ClientRead postgres regression step s2-rollback: ROLLBACK; diff --git 
a/src/test/regress/expected/isolation_citus_dist_activity_9.out b/src/test/regress/expected/isolation_citus_dist_activity_9.out index c66d5b172..686ad1fee 100644 --- a/src/test/regress/expected/isolation_citus_dist_activity_9.out +++ b/src/test/regress/expected/isolation_citus_dist_activity_9.out @@ -41,16 +41,16 @@ step s3-view-worker: query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname -SELECT worker_apply_shard_ddl_command (102081, 'public', ' +SELECT worker_apply_shard_ddl_command (102141, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; ')localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression -SELECT worker_apply_shard_ddl_command (102080, 'public', ' +SELECT worker_apply_shard_ddl_command (102140, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; ')localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression -SELECT worker_apply_shard_ddl_command (102079, 'public', ' +SELECT worker_apply_shard_ddl_command (102139, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; ')localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression -SELECT worker_apply_shard_ddl_command (102078, 'public', ' +SELECT worker_apply_shard_ddl_command (102138, 'public', ' ALTER TABLE test_table ADD COLUMN x INT; ')localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression step s2-rollback: @@ -104,7 +104,7 @@ step s3-view-worker: query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname -INSERT INTO public.test_table_102084 (column1, column2) VALUES (100, 100)localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression +INSERT INTO public.test_table_102144 (column1, column2) VALUES (100, 100)localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression step s2-rollback: ROLLBACK; @@ -159,10 +159,10 @@ step s3-view-worker: query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname -SELECT count(*) AS count FROM test_table_102089 test_table WHERE truelocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression -SELECT count(*) AS count FROM test_table_102088 test_table WHERE truelocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression -SELECT count(*) AS count FROM test_table_102087 test_table WHERE truelocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression -SELECT count(*) AS count FROM test_table_102086 test_table WHERE truelocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression +SELECT count(*) AS count FROM test_table_102149 test_table WHERE truelocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression +SELECT count(*) AS count FROM test_table_102148 test_table WHERE truelocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression +SELECT count(*) AS count FROM test_table_102147 test_table WHERE truelocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression +SELECT count(*) AS count FROM test_table_102146 test_table WHERE truelocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression step 
s2-rollback: ROLLBACK; @@ -217,7 +217,7 @@ step s3-view-worker: query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname -SELECT count(*) AS count FROM public.test_table_102091 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression +SELECT count(*) AS count FROM public.test_table_102151 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression step s2-rollback: ROLLBACK; diff --git a/src/test/regress/expected/isolation_distributed_transaction_id.out b/src/test/regress/expected/isolation_distributed_transaction_id.out index 00d9d9e6c..6b3cfefef 100644 --- a/src/test/regress/expected/isolation_distributed_transaction_id.out +++ b/src/test/regress/expected/isolation_distributed_transaction_id.out @@ -77,7 +77,7 @@ step s1-get-current-transaction-id: row -(0,174) +(0,229) step s2-get-first-worker-active-transactions: SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number) FROM @@ -88,7 +88,7 @@ step s2-get-first-worker-active-transactions: nodename nodeport success result -localhost 57637 t (0,174) +localhost 57637 t (0,229) step s1-commit: COMMIT; diff --git a/src/test/regress/expected/isolation_dump_global_wait_edges.out b/src/test/regress/expected/isolation_dump_global_wait_edges.out index 5331a6b61..30fb66054 100644 --- a/src/test/regress/expected/isolation_dump_global_wait_edges.out +++ b/src/test/regress/expected/isolation_dump_global_wait_edges.out @@ -29,11 +29,11 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -177 176 f +232 231 f transactionnumberwaitingtransactionnumbers -176 -177 176 +231 +232 231 step s1-abort: ABORT; @@ -77,14 +77,14 @@ step detector-dump-wait-edges: waiting_transaction_numblocking_transaction_numblocking_transaction_waiting -181 180 f -182 180 f -182 181 t +236 235 f +237 235 f +237 236 t transactionnumberwaitingtransactionnumbers -180 -181 180 -182 180,181 +235 +236 235 +237 235,236 step s1-abort: ABORT; diff --git a/src/test/regress/expected/isolation_ensure_dependency_activate_node.out b/src/test/regress/expected/isolation_ensure_dependency_activate_node.out new file mode 100644 index 000000000..2a726ed6f --- /dev/null +++ b/src/test/regress/expected/isolation_ensure_dependency_activate_node.out @@ -0,0 +1,933 @@ +Parsed test spec with 4 sessions + +starting permutation: s1-print-distributed-objects s1-begin s1-add-worker s2-public-schema s2-create-table s1-commit s2-print-distributed-objects +nodename nodeport isactive + +localhost 57637 t +step s1-print-distributed-objects: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + + SELECT master_remove_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +pg_identify_object_as_address + +count + +0 +run_command_on_workers + +(localhost,57637,t,0) +(localhost,57638,t,0) +master_remove_node + + +step s1-begin: + BEGIN; + +step s1-add-worker: + SELECT nodename, 
nodeport, isactive FROM master_add_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +step s2-public-schema: + SET search_path TO public; + +step s2-create-table: + CREATE TABLE t1 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t1', 'a'); + +step s1-commit: + COMMIT; + +step s2-create-table: <... completed> +create_distributed_table + + +step s2-print-distributed-objects: + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + +pg_identify_object_as_address + +count + +0 +run_command_on_workers + +(localhost,57637,t,0) +(localhost,57638,t,0) +master_remove_node + + + + +starting permutation: s1-print-distributed-objects s1-begin s2-begin s1-add-worker s2-public-schema s2-create-table s1-commit s2-commit s2-print-distributed-objects +nodename nodeport isactive + +localhost 57637 t +step s1-print-distributed-objects: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + + SELECT master_remove_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +pg_identify_object_as_address + +count + +0 +run_command_on_workers + +(localhost,57637,t,0) +(localhost,57638,t,0) +master_remove_node + + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-add-worker: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +step s2-public-schema: + SET search_path TO public; + +step s2-create-table: + CREATE TABLE t1 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t1', 'a'); + +step s1-commit: + COMMIT; + +step s2-create-table: <... 
completed> +create_distributed_table + + +step s2-commit: + COMMIT; + +step s2-print-distributed-objects: + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + +pg_identify_object_as_address + +count + +0 +run_command_on_workers + +(localhost,57637,t,0) +(localhost,57638,t,0) +master_remove_node + + + + +starting permutation: s1-print-distributed-objects s1-begin s2-begin s2-public-schema s2-create-table s1-add-worker s2-commit s1-commit s2-print-distributed-objects +nodename nodeport isactive + +localhost 57637 t +step s1-print-distributed-objects: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + + SELECT master_remove_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +pg_identify_object_as_address + +count + +0 +run_command_on_workers + +(localhost,57637,t,0) +(localhost,57638,t,0) +master_remove_node + + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s2-public-schema: + SET search_path TO public; + +step s2-create-table: + CREATE TABLE t1 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t1', 'a'); + +create_distributed_table + + +step s1-add-worker: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + +step s2-commit: + COMMIT; + +step s1-add-worker: <... 
completed> +nodename nodeport isactive + +localhost 57638 t +step s1-commit: + COMMIT; + +step s2-print-distributed-objects: + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + +pg_identify_object_as_address + +count + +0 +run_command_on_workers + +(localhost,57637,t,0) +(localhost,57638,t,0) +master_remove_node + + + + +starting permutation: s1-print-distributed-objects s1-begin s1-add-worker s2-create-schema s2-create-table s1-commit s2-print-distributed-objects +nodename nodeport isactive + +localhost 57637 t +step s1-print-distributed-objects: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + + SELECT master_remove_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +pg_identify_object_as_address + +count + +0 +run_command_on_workers + +(localhost,57637,t,0) +(localhost,57638,t,0) +master_remove_node + + +step s1-begin: + BEGIN; + +step s1-add-worker: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +step s2-create-schema: + CREATE SCHEMA myschema; + SET search_path TO myschema; + +step s2-create-table: + CREATE TABLE t1 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t1', 'a'); + +step s1-commit: + COMMIT; + +step s2-create-table: <... 
completed> +create_distributed_table + + +step s2-print-distributed-objects: + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + +pg_identify_object_as_address + +(schema,{myschema},{}) +count + +1 +run_command_on_workers + +(localhost,57637,t,1) +(localhost,57638,t,1) +master_remove_node + + + + +starting permutation: s1-print-distributed-objects s1-begin s2-begin s1-add-worker s2-create-schema s2-create-table s1-commit s2-commit s2-print-distributed-objects +nodename nodeport isactive + +localhost 57637 t +step s1-print-distributed-objects: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + + SELECT master_remove_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +pg_identify_object_as_address + +count + +0 +run_command_on_workers + +(localhost,57637,t,0) +(localhost,57638,t,0) +master_remove_node + + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s1-add-worker: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +step s2-create-schema: + CREATE SCHEMA myschema; + SET search_path TO myschema; + +step s2-create-table: + CREATE TABLE t1 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t1', 'a'); + +step s1-commit: + COMMIT; + +step s2-create-table: <... 
completed> +create_distributed_table + + +step s2-commit: + COMMIT; + +step s2-print-distributed-objects: + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + +pg_identify_object_as_address + +(schema,{myschema},{}) +count + +1 +run_command_on_workers + +(localhost,57637,t,1) +(localhost,57638,t,1) +master_remove_node + + + + +starting permutation: s1-print-distributed-objects s1-begin s2-begin s2-create-schema s2-create-table s1-add-worker s2-commit s1-commit s2-print-distributed-objects +nodename nodeport isactive + +localhost 57637 t +step s1-print-distributed-objects: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + + SELECT master_remove_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +pg_identify_object_as_address + +count + +0 +run_command_on_workers + +(localhost,57637,t,0) +(localhost,57638,t,0) +master_remove_node + + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s2-create-schema: + CREATE SCHEMA myschema; + SET search_path TO myschema; + +step s2-create-table: + CREATE TABLE t1 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t1', 'a'); + +create_distributed_table + + +step s1-add-worker: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + +step s2-commit: + COMMIT; + +step s1-add-worker: <... 
completed> +nodename nodeport isactive + +localhost 57638 t +step s1-commit: + COMMIT; + +step s2-print-distributed-objects: + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + +pg_identify_object_as_address + +(schema,{myschema},{}) +count + +1 +run_command_on_workers + +(localhost,57637,t,1) +(localhost,57638,t,1) +master_remove_node + + + + +starting permutation: s1-print-distributed-objects s2-create-schema s1-begin s2-begin s3-begin s1-add-worker s2-create-table s3-use-schema s3-create-table s1-commit s2-commit s3-commit s2-print-distributed-objects +nodename nodeport isactive + +localhost 57637 t +step s1-print-distributed-objects: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + + SELECT master_remove_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +pg_identify_object_as_address + +count + +0 +run_command_on_workers + +(localhost,57637,t,0) +(localhost,57638,t,0) +master_remove_node + + +step s2-create-schema: + CREATE SCHEMA myschema; + SET search_path TO myschema; + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s3-begin: + BEGIN; + +step s1-add-worker: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +step s2-create-table: + CREATE TABLE t1 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t1', 'a'); + +step s3-use-schema: + SET search_path TO myschema; + +step s3-create-table: + CREATE TABLE t2 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t2', 'a'); + +step s1-commit: + COMMIT; + +step s2-create-table: <... completed> +create_distributed_table + + +step s2-commit: + COMMIT; + +step s3-create-table: <... 
completed> +create_distributed_table + + +step s3-commit: + COMMIT; + +step s2-print-distributed-objects: + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + +pg_identify_object_as_address + +(schema,{myschema},{}) +count + +1 +run_command_on_workers + +(localhost,57637,t,1) +(localhost,57638,t,1) +master_remove_node + + + + +starting permutation: s1-print-distributed-objects s2-create-schema s1-begin s2-begin s3-begin s4-begin s1-add-worker s2-create-table s3-use-schema s3-create-table s4-use-schema s4-create-table s1-commit s2-commit s3-commit s4-commit s2-print-distributed-objects +nodename nodeport isactive + +localhost 57637 t +step s1-print-distributed-objects: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + + SELECT master_remove_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +pg_identify_object_as_address + +count + +0 +run_command_on_workers + +(localhost,57637,t,0) +(localhost,57638,t,0) +master_remove_node + + +step s2-create-schema: + CREATE SCHEMA myschema; + SET search_path TO myschema; + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s3-begin: + BEGIN; + +step s4-begin: + BEGIN; + +step s1-add-worker: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +step s2-create-table: + CREATE TABLE t1 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t1', 'a'); + +step s3-use-schema: + SET search_path TO myschema; + +step s3-create-table: + CREATE TABLE t2 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t2', 'a'); + +step s4-use-schema: + SET search_path TO myschema; + +step s4-create-table: + CREATE TABLE t3 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t3', 'a'); + +step s1-commit: + COMMIT; + +step s2-create-table: <... completed> +create_distributed_table + + +step s2-commit: + COMMIT; + +step s3-create-table: <... completed> +create_distributed_table + + +step s4-create-table: <... 
completed> +create_distributed_table + + +step s3-commit: + COMMIT; + +step s4-commit: + COMMIT; + +step s2-print-distributed-objects: + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + +pg_identify_object_as_address + +(schema,{myschema},{}) +count + +1 +run_command_on_workers + +(localhost,57637,t,1) +(localhost,57638,t,1) +master_remove_node + + + + +starting permutation: s1-print-distributed-objects s1-add-worker s2-create-schema s2-begin s3-begin s3-use-schema s2-create-table s3-create-table s2-commit s3-commit s2-print-distributed-objects +nodename nodeport isactive + +localhost 57637 t +step s1-print-distributed-objects: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + + SELECT master_remove_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +pg_identify_object_as_address + +count + +0 +run_command_on_workers + +(localhost,57637,t,0) +(localhost,57638,t,0) +master_remove_node + + +step s1-add-worker: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +step s2-create-schema: + CREATE SCHEMA myschema; + SET search_path TO myschema; + +step s2-begin: + BEGIN; + +step s3-begin: + BEGIN; + +step s3-use-schema: + SET search_path TO myschema; + +step s2-create-table: + CREATE TABLE t1 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t1', 'a'); + +create_distributed_table + + +step s3-create-table: + CREATE TABLE t2 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t2', 'a'); + +step s2-commit: + COMMIT; + +step s3-create-table: <... 
completed> +create_distributed_table + + +step s3-commit: + COMMIT; + +step s2-print-distributed-objects: + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + +pg_identify_object_as_address + +(schema,{myschema},{}) +count + +1 +run_command_on_workers + +(localhost,57637,t,1) +(localhost,57638,t,1) +master_remove_node + + + + +starting permutation: s1-print-distributed-objects s1-begin s2-begin s4-begin s1-add-worker s2-create-schema s4-create-schema2 s2-create-table s4-create-table s1-commit s2-commit s4-commit s2-print-distributed-objects +nodename nodeport isactive + +localhost 57637 t +step s1-print-distributed-objects: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + + SELECT master_remove_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +pg_identify_object_as_address + +count + +0 +run_command_on_workers + +(localhost,57637,t,0) +(localhost,57638,t,0) +master_remove_node + + +step s1-begin: + BEGIN; + +step s2-begin: + BEGIN; + +step s4-begin: + BEGIN; + +step s1-add-worker: + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + +nodename nodeport isactive + +localhost 57638 t +step s2-create-schema: + CREATE SCHEMA myschema; + SET search_path TO myschema; + +step s4-create-schema2: + CREATE SCHEMA myschema2; + SET search_path TO myschema2; + +step s2-create-table: + CREATE TABLE t1 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t1', 'a'); + +step s4-create-table: + CREATE TABLE t3 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t3', 'a'); + +step s1-commit: + COMMIT; + +step s2-create-table: <... completed> +create_distributed_table + + +step s4-create-table: <... 
completed> +create_distributed_table + + +step s2-commit: + COMMIT; + +step s4-commit: + COMMIT; + +step s2-print-distributed-objects: + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + +pg_identify_object_as_address + +(schema,{myschema},{}) +(schema,{myschema2},{}) +count + +1 +run_command_on_workers + +(localhost,57637,t,1) +(localhost,57638,t,1) +master_remove_node + + + diff --git a/src/test/regress/expected/isolation_replace_wait_function.out b/src/test/regress/expected/isolation_replace_wait_function.out index 48681f023..8cea30f25 100644 --- a/src/test/regress/expected/isolation_replace_wait_function.out +++ b/src/test/regress/expected/isolation_replace_wait_function.out @@ -16,7 +16,7 @@ step s1-commit: COMMIT; step s2-insert: <... completed> -error in steps s1-commit s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102353" +error in steps s1-commit s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102413" step s2-commit: COMMIT; diff --git a/src/test/regress/expected/multi_cluster_management.out b/src/test/regress/expected/multi_cluster_management.out index e5f1d6778..1305b232e 100644 --- a/src/test/regress/expected/multi_cluster_management.out +++ b/src/test/regress/expected/multi_cluster_management.out @@ -583,11 +583,11 @@ SELECT master_remove_node('localhost', 9999); INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole) VALUES ('localhost', 5000, :worker_1_group, 'primary'); ERROR: there cannot be two primary nodes in a group -CONTEXT: PL/pgSQL function citus.pg_dist_node_trigger_func() line 10 at RAISE +CONTEXT: PL/pgSQL function citus_internal.pg_dist_node_trigger_func() line 10 at RAISE UPDATE pg_dist_node SET noderole = 'primary' WHERE groupid = :worker_1_group AND nodeport = 9998; ERROR: there cannot be two primary nodes in a group -CONTEXT: PL/pgSQL function citus.pg_dist_node_trigger_func() line 18 at RAISE +CONTEXT: PL/pgSQL function citus_internal.pg_dist_node_trigger_func() line 18 at RAISE -- check that you can't manually add a primary to a non-default cluster INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole, nodecluster) VALUES ('localhost', 5000, 1000, 'primary', 'olap'); diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 3095bd817..ff4652bda 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -45,7 +45,7 @@ FROM pg_depend AS pgd, WHERE pgd.refclassid = 'pg_extension'::regclass AND pgd.refobjid = pge.oid AND pge.extname = 'citus' AND - pgio.schema NOT IN ('pg_catalog', 'citus', 'test'); + pgio.schema NOT IN ('pg_catalog', 'citus', 'citus_internal', 'test'); count ------- 0 @@ -123,7 +123,7 @@ FROM pg_depend AS pgd, WHERE pgd.refclassid = 'pg_extension'::regclass AND pgd.refobjid = pge.oid AND pge.extname = 'citus' AND - pgio.schema NOT IN ('pg_catalog', 'citus', 'test'); + pgio.schema NOT IN ('pg_catalog', 'citus', 'citus_internal', 'test'); count ------- 0 diff --git a/src/test/regress/expected/multi_router_planner_fast_path.out b/src/test/regress/expected/multi_router_planner_fast_path.out index 993624f87..8e204cc03 100644 --- 
a/src/test/regress/expected/multi_router_planner_fast_path.out +++ b/src/test/regress/expected/multi_router_planner_fast_path.out @@ -245,10 +245,6 @@ DEBUG: Plan is router executable CREATE TABLE company_employees (company_id int, employee_id int, manager_id int); SELECT master_create_distributed_table('company_employees', 'company_id', 'hash'); -DEBUG: schema "fast_path_router_select" already exists, skipping -DETAIL: NOTICE from localhost:57638 -DEBUG: schema "fast_path_router_select" already exists, skipping -DETAIL: NOTICE from localhost:57637 master_create_distributed_table --------------------------------- diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index 356c1f9cb..40a5a4af0 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -159,8 +159,8 @@ SELECT count(*) FROM mx_table; 5 (1 row) --- master_add_node -SELECT 1 FROM master_add_node('localhost', 5432); +-- master_add_inactive_node +SELECT 1 FROM master_add_inactive_node('localhost', 5432); ERROR: operation is not allowed on this node HINT: Connect to the coordinator and run it again. SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; @@ -172,7 +172,7 @@ SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432; -- master_remove_node \c - - - :master_port DROP INDEX mx_test_uniq_index; -SELECT 1 FROM master_add_node('localhost', 5432); +SELECT 1 FROM master_add_inactive_node('localhost', 5432); ?column? ---------- 1 diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 8d181c423..1f323ac4a 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -4,6 +4,7 @@ test: isolation_update_node_lock_writes test: isolation_add_node_vs_reference_table_operations test: isolation_create_table_vs_add_remove_node test: isolation_master_update_node +test: isolation_ensure_dependency_activate_node # tests that change node metadata should precede # isolation_cluster_management such that tests diff --git a/src/test/regress/specs/isolation_ddl_vs_all.spec b/src/test/regress/specs/isolation_ddl_vs_all.spec index 6a885879c..1b57ddef6 100644 --- a/src/test/regress/specs/isolation_ddl_vs_all.spec +++ b/src/test/regress/specs/isolation_ddl_vs_all.spec @@ -5,8 +5,8 @@ # create range distributed table to test behavior of DDL in concurrent operations setup { - SELECT citus.replace_isolation_tester_func(); - SELECT citus.refresh_isolation_tester_prepared_statement(); + SELECT citus_internal.replace_isolation_tester_func(); + SELECT citus_internal.refresh_isolation_tester_prepared_statement(); SET citus.shard_replication_factor TO 1; CREATE TABLE ddl_hash(id integer, data text); @@ -18,7 +18,7 @@ teardown { DROP TABLE IF EXISTS ddl_hash CASCADE; - SELECT citus.restore_isolation_tester_func(); + SELECT citus_internal.restore_isolation_tester_func(); } # session 1 diff --git a/src/test/regress/specs/isolation_delete_vs_all.spec b/src/test/regress/specs/isolation_delete_vs_all.spec index 558eae9d1..62b0d302a 100644 --- a/src/test/regress/specs/isolation_delete_vs_all.spec +++ b/src/test/regress/specs/isolation_delete_vs_all.spec @@ -5,8 +5,8 @@ # create range distributed table to test behavior of DELETE in concurrent operations setup { - SELECT citus.replace_isolation_tester_func(); - SELECT 
citus.refresh_isolation_tester_prepared_statement(); + SELECT citus_internal.replace_isolation_tester_func(); + SELECT citus_internal.refresh_isolation_tester_prepared_statement(); SET citus.shard_replication_factor TO 1; CREATE TABLE delete_hash(id integer, data text); @@ -18,7 +18,7 @@ teardown { DROP TABLE IF EXISTS delete_hash CASCADE; - SELECT citus.restore_isolation_tester_func(); + SELECT citus_internal.restore_isolation_tester_func(); } # session 1 diff --git a/src/test/regress/specs/isolation_distributed_deadlock_detection.spec b/src/test/regress/specs/isolation_distributed_deadlock_detection.spec index f93355207..34aa929af 100644 --- a/src/test/regress/specs/isolation_distributed_deadlock_detection.spec +++ b/src/test/regress/specs/isolation_distributed_deadlock_detection.spec @@ -1,7 +1,7 @@ setup { - SELECT citus.replace_isolation_tester_func(); - SELECT citus.refresh_isolation_tester_prepared_statement(); + SELECT citus_internal.replace_isolation_tester_func(); + SELECT citus_internal.refresh_isolation_tester_prepared_statement(); CREATE TABLE deadlock_detection_reference (user_id int UNIQUE, some_val int); SELECT create_reference_table('deadlock_detection_reference'); @@ -26,7 +26,7 @@ teardown DROP TABLE local_deadlock_table; DROP TABLE deadlock_detection_test_rep_2; DROP TABLE deadlock_detection_reference; - SELECT citus.restore_isolation_tester_func(); + SELECT citus_internal.restore_isolation_tester_func(); SET citus.shard_replication_factor = 1; } diff --git a/src/test/regress/specs/isolation_drop_vs_all.spec b/src/test/regress/specs/isolation_drop_vs_all.spec index 520d849f2..6bb7004d0 100644 --- a/src/test/regress/specs/isolation_drop_vs_all.spec +++ b/src/test/regress/specs/isolation_drop_vs_all.spec @@ -5,8 +5,8 @@ # create range distributed table to test behavior of DROP in concurrent operations setup { - SELECT citus.replace_isolation_tester_func(); - SELECT citus.refresh_isolation_tester_prepared_statement(); + SELECT citus_internal.replace_isolation_tester_func(); + SELECT citus_internal.refresh_isolation_tester_prepared_statement(); SET citus.shard_replication_factor TO 1; CREATE TABLE drop_hash(id integer, data text); @@ -18,7 +18,7 @@ teardown { DROP TABLE IF EXISTS drop_hash CASCADE; - SELECT citus.restore_isolation_tester_func(); + SELECT citus_internal.restore_isolation_tester_func(); } # session 1 diff --git a/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec b/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec new file mode 100644 index 000000000..efdef6985 --- /dev/null +++ b/src/test/regress/specs/isolation_ensure_dependency_activate_node.spec @@ -0,0 +1,180 @@ +# the test expects to have zero nodes in pg_dist_node at the beginning +# add a single node for the purpose of the test +setup +{ + SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57637); +} + +# ensure that both nodes exist for the remainder of the isolation tests +teardown +{ + -- schema drops are not cascaded + SELECT run_command_on_workers($$DROP SCHEMA IF EXISTS myschema CASCADE;$$); + DROP SCHEMA IF EXISTS myschema CASCADE; + SELECT run_command_on_workers($$DROP SCHEMA IF EXISTS myschema2 CASCADE;$$); + DROP SCHEMA IF EXISTS myschema2 CASCADE; + + RESET search_path; + DROP TABLE IF EXISTS t1 CASCADE; + DROP TABLE IF EXISTS t2 CASCADE; + DROP TABLE IF EXISTS t3 CASCADE; + + SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node; +} +
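+# A reading aid (hypothetical, not an executed step): in the expected output, a +# step that blocks on another session is printed as "step s2-create-table: <... completed>" +# only once the blocking session commits. A minimal sketch of one such +# interleaving, assuming the two-node setup of this spec: +# +# s1: BEGIN; SELECT master_add_node('localhost', 57638); +# s2: SELECT create_distributed_table('t1', 'a'); -- blocks behind s1 +# s1: COMMIT; -- s2 now completes +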
"s1" + +step "s1-begin" +{ + BEGIN; +} + +step "s1-add-worker" +{ + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); +} + +step "s1-commit" +{ + COMMIT; +} + +# printing in session 1 adds the worker node, this makes we are sure we count the objects +# on that node as well. After counting objects is done we remove the node again. +step "s1-print-distributed-objects" +{ + SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638); + + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); + + SELECT master_remove_node('localhost', 57638); +} + +session "s2" + +step "s2-public-schema" +{ + SET search_path TO public; +} + +step "s2-create-schema" +{ + CREATE SCHEMA myschema; + SET search_path TO myschema; +} + +step "s2-create-table" +{ + CREATE TABLE t1 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t1', 'a'); +} + +step "s2-begin" +{ + BEGIN; +} + +step "s2-commit" +{ + COMMIT; +} + +# prints from session 2 are run at the end when the worker has already been added by the +# test +step "s2-print-distributed-objects" +{ + -- print an overview of all distributed objects + SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object; + + -- print if the schema has been created + SELECT count(*) FROM pg_namespace where nspname = 'myschema'; + SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$); +} + +session "s3" + +step "s3-public-schema" +{ + SET search_path TO public; +} + +step "s3-use-schema" +{ + SET search_path TO myschema; +} + +step "s3-create-table" +{ + CREATE TABLE t2 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t2', 'a'); +} + +step "s3-begin" +{ + BEGIN; +} + +step "s3-commit" +{ + COMMIT; +} + +session "s4" + +step "s4-public-schema" +{ + SET search_path TO public; +} + +step "s4-use-schema" +{ + SET search_path TO myschema; +} + +step "s4-create-schema2" +{ + CREATE SCHEMA myschema2; + SET search_path TO myschema2; +} + +step "s4-create-table" +{ + CREATE TABLE t3 (a int, b int); + -- session needs to have replication factor set to 1, can't do in setup + SET citus.shard_replication_factor TO 1; + SELECT create_distributed_table('t3', 'a'); +} + +step "s4-begin" +{ + BEGIN; +} + +step "s4-commit" +{ + COMMIT; +} + +# schema only tests +permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-public-schema" "s2-create-table" "s1-commit" "s2-print-distributed-objects" +permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s1-add-worker" "s2-public-schema" "s2-create-table" "s1-commit" "s2-commit" "s2-print-distributed-objects" +permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s2-public-schema" "s2-create-table" "s1-add-worker" "s2-commit" "s1-commit" "s2-print-distributed-objects" +permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-create-schema" "s2-create-table" "s1-commit" "s2-print-distributed-objects" +permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" 
"s1-add-worker" "s2-create-schema" "s2-create-table" "s1-commit" "s2-commit" "s2-print-distributed-objects" +permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s2-create-schema" "s2-create-table" "s1-add-worker" "s2-commit" "s1-commit" "s2-print-distributed-objects" + +# concurrency tests with multi schema distribution +permutation "s1-print-distributed-objects" "s2-create-schema" "s1-begin" "s2-begin" "s3-begin" "s1-add-worker" "s2-create-table" "s3-use-schema" "s3-create-table" "s1-commit" "s2-commit" "s3-commit" "s2-print-distributed-objects" +permutation "s1-print-distributed-objects" "s2-create-schema" "s1-begin" "s2-begin" "s3-begin" "s4-begin" "s1-add-worker" "s2-create-table" "s3-use-schema" "s3-create-table" "s4-use-schema" "s4-create-table" "s1-commit" "s2-commit" "s3-commit" "s4-commit" "s2-print-distributed-objects" +permutation "s1-print-distributed-objects" "s1-add-worker" "s2-create-schema" "s2-begin" "s3-begin" "s3-use-schema" "s2-create-table" "s3-create-table" "s2-commit" "s3-commit" "s2-print-distributed-objects" +permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s4-begin" "s1-add-worker" "s2-create-schema" "s4-create-schema2" "s2-create-table" "s4-create-table" "s1-commit" "s2-commit" "s4-commit" "s2-print-distributed-objects" diff --git a/src/test/regress/specs/isolation_get_distributed_wait_queries.spec b/src/test/regress/specs/isolation_get_distributed_wait_queries.spec index 973b9a53a..085d7da62 100644 --- a/src/test/regress/specs/isolation_get_distributed_wait_queries.spec +++ b/src/test/regress/specs/isolation_get_distributed_wait_queries.spec @@ -17,8 +17,8 @@ setup LANGUAGE C STRICT VOLATILE AS 'citus', $$stop_session_level_connection_to_node$$; - SELECT citus.replace_isolation_tester_func(); - SELECT citus.refresh_isolation_tester_prepared_statement(); + SELECT citus_internal.replace_isolation_tester_func(); + SELECT citus_internal.refresh_isolation_tester_prepared_statement(); -- start_metadata_sync_to_node can not be run inside a transaction block -- following is a workaround to overcome that @@ -48,7 +48,7 @@ teardown { DROP TABLE ref_table; DROP TABLE tt1; - SELECT citus.restore_isolation_tester_func(); + SELECT citus_internal.restore_isolation_tester_func(); } session "s1" diff --git a/src/test/regress/specs/isolation_multi_shard_modify_vs_all.spec b/src/test/regress/specs/isolation_multi_shard_modify_vs_all.spec index 6e016f14f..9ab1dfabf 100644 --- a/src/test/regress/specs/isolation_multi_shard_modify_vs_all.spec +++ b/src/test/regress/specs/isolation_multi_shard_modify_vs_all.spec @@ -1,7 +1,7 @@ setup { - SELECT citus.replace_isolation_tester_func(); - SELECT citus.refresh_isolation_tester_prepared_statement(); + SELECT citus_internal.replace_isolation_tester_func(); + SELECT citus_internal.refresh_isolation_tester_prepared_statement(); SET citus.shard_replication_factor to 1; SET citus.shard_count to 32; @@ -34,7 +34,7 @@ teardown { DROP TABLE users_test_table; DROP TABLE events_test_table; - SELECT citus.restore_isolation_tester_func(); + SELECT citus_internal.restore_isolation_tester_func(); SET citus.shard_count to 4; } diff --git a/src/test/regress/specs/isolation_ref2ref_foreign_keys_on_mx.spec b/src/test/regress/specs/isolation_ref2ref_foreign_keys_on_mx.spec index cb515d91b..0436edec8 100644 --- a/src/test/regress/specs/isolation_ref2ref_foreign_keys_on_mx.spec +++ b/src/test/regress/specs/isolation_ref2ref_foreign_keys_on_mx.spec @@ -1,7 +1,7 @@ setup { - SELECT citus.replace_isolation_tester_func(); - SELECT 
diff --git a/src/test/regress/specs/isolation_get_distributed_wait_queries.spec b/src/test/regress/specs/isolation_get_distributed_wait_queries.spec
index 973b9a53a..085d7da62 100644
--- a/src/test/regress/specs/isolation_get_distributed_wait_queries.spec
+++ b/src/test/regress/specs/isolation_get_distributed_wait_queries.spec
@@ -17,8 +17,8 @@ setup
     LANGUAGE C STRICT VOLATILE
     AS 'citus', $$stop_session_level_connection_to_node$$;
 
-    SELECT citus.replace_isolation_tester_func();
-    SELECT citus.refresh_isolation_tester_prepared_statement();
+    SELECT citus_internal.replace_isolation_tester_func();
+    SELECT citus_internal.refresh_isolation_tester_prepared_statement();
 
     -- start_metadata_sync_to_node can not be run inside a transaction block
     -- following is a workaround to overcome that
@@ -48,7 +48,7 @@ teardown
 {
     DROP TABLE ref_table;
     DROP TABLE tt1;
-    SELECT citus.restore_isolation_tester_func();
+    SELECT citus_internal.restore_isolation_tester_func();
 }
 
 session "s1"
diff --git a/src/test/regress/specs/isolation_multi_shard_modify_vs_all.spec b/src/test/regress/specs/isolation_multi_shard_modify_vs_all.spec
index 6e016f14f..9ab1dfabf 100644
--- a/src/test/regress/specs/isolation_multi_shard_modify_vs_all.spec
+++ b/src/test/regress/specs/isolation_multi_shard_modify_vs_all.spec
@@ -1,7 +1,7 @@
 setup
 {
-    SELECT citus.replace_isolation_tester_func();
-    SELECT citus.refresh_isolation_tester_prepared_statement();
+    SELECT citus_internal.replace_isolation_tester_func();
+    SELECT citus_internal.refresh_isolation_tester_prepared_statement();
 
     SET citus.shard_replication_factor to 1;
     SET citus.shard_count to 32;
@@ -34,7 +34,7 @@ teardown
 {
     DROP TABLE users_test_table;
     DROP TABLE events_test_table;
-    SELECT citus.restore_isolation_tester_func();
+    SELECT citus_internal.restore_isolation_tester_func();
     SET citus.shard_count to 4;
 }
 
diff --git a/src/test/regress/specs/isolation_ref2ref_foreign_keys_on_mx.spec b/src/test/regress/specs/isolation_ref2ref_foreign_keys_on_mx.spec
index cb515d91b..0436edec8 100644
--- a/src/test/regress/specs/isolation_ref2ref_foreign_keys_on_mx.spec
+++ b/src/test/regress/specs/isolation_ref2ref_foreign_keys_on_mx.spec
@@ -1,7 +1,7 @@
 setup
 {
-    SELECT citus.replace_isolation_tester_func();
-    SELECT citus.refresh_isolation_tester_prepared_statement();
+    SELECT citus_internal.replace_isolation_tester_func();
+    SELECT citus_internal.refresh_isolation_tester_prepared_statement();
 
     -- start_metadata_sync_to_node can not be run inside a transaction block.
     -- Following is a workaround to overcome that. Port numbers are hard coded
@@ -32,7 +32,7 @@ setup
 teardown
 {
     DROP TABLE ref_table_1, ref_table_2, ref_table_3;
-    SELECT citus.restore_isolation_tester_func();
+    SELECT citus_internal.restore_isolation_tester_func();
 }
 
 session "s1"
diff --git a/src/test/regress/specs/isolation_reference_on_mx.spec b/src/test/regress/specs/isolation_reference_on_mx.spec
index df377429f..4e9db2ec0 100644
--- a/src/test/regress/specs/isolation_reference_on_mx.spec
+++ b/src/test/regress/specs/isolation_reference_on_mx.spec
@@ -17,8 +17,8 @@ setup
     LANGUAGE C STRICT VOLATILE
     AS 'citus', $$stop_session_level_connection_to_node$$;
 
-    SELECT citus.replace_isolation_tester_func();
-    SELECT citus.refresh_isolation_tester_prepared_statement();
+    SELECT citus_internal.replace_isolation_tester_func();
+    SELECT citus_internal.refresh_isolation_tester_prepared_statement();
 
     -- start_metadata_sync_to_node can not be run inside a transaction block
     -- following is a workaround to overcome that
@@ -42,7 +42,7 @@ setup
 teardown
 {
     DROP TABLE ref_table;
-    SELECT citus.restore_isolation_tester_func();
+    SELECT citus_internal.restore_isolation_tester_func();
 }
 
 session "s1"
diff --git a/src/test/regress/specs/isolation_replace_wait_function.spec b/src/test/regress/specs/isolation_replace_wait_function.spec
index 6f1b479d6..42d900e50 100644
--- a/src/test/regress/specs/isolation_replace_wait_function.spec
+++ b/src/test/regress/specs/isolation_replace_wait_function.spec
@@ -5,8 +5,8 @@
 setup
 {
-    SELECT citus.replace_isolation_tester_func();
-    SELECT citus.refresh_isolation_tester_prepared_statement();
+    SELECT citus_internal.replace_isolation_tester_func();
+    SELECT citus_internal.refresh_isolation_tester_prepared_statement();
 
     CREATE TABLE test_locking (a int unique);
     SELECT create_distributed_table('test_locking', 'a');
@@ -15,7 +15,7 @@ setup
 teardown
 {
     DROP TABLE test_locking;
-    SELECT citus.restore_isolation_tester_func();
+    SELECT citus_internal.restore_isolation_tester_func();
 }
 
 session "s1"
diff --git a/src/test/regress/specs/isolation_select_for_update.spec b/src/test/regress/specs/isolation_select_for_update.spec
index 7745c3b35..e874e47d7 100644
--- a/src/test/regress/specs/isolation_select_for_update.spec
+++ b/src/test/regress/specs/isolation_select_for_update.spec
@@ -1,8 +1,8 @@
 setup
 {
-    SELECT citus.replace_isolation_tester_func();
-    SELECT citus.refresh_isolation_tester_prepared_statement();
+    SELECT citus_internal.replace_isolation_tester_func();
+    SELECT citus_internal.refresh_isolation_tester_prepared_statement();
 
     SET citus.shard_replication_factor to 1;
 
@@ -27,7 +27,7 @@ teardown
     DROP TABLE test_table_2_rf1;
     DROP TABLE ref_table;
 
-    SELECT citus.restore_isolation_tester_func();
+    SELECT citus_internal.restore_isolation_tester_func();
 }
 
 session "s1"
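The remaining spec updates repeat the same pattern. Stripped of the spec
syntax, every one of these files now brackets its permutations like this
(a sketch of the shared skeleton, SQL only):

    -- setup: let the isolation tester detect blocking through Citus'
    -- own view of distributed locks
    SELECT citus_internal.replace_isolation_tester_func();
    SELECT citus_internal.refresh_isolation_tester_prepared_statement();

    -- ... permutations run here ...

    -- teardown: restore the stock isolation tester behaviour
    SELECT citus_internal.restore_isolation_tester_func();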
diff --git a/src/test/regress/specs/isolation_truncate_vs_all.spec b/src/test/regress/specs/isolation_truncate_vs_all.spec
index cb71eb3b1..98bf9f81f 100644
--- a/src/test/regress/specs/isolation_truncate_vs_all.spec
+++ b/src/test/regress/specs/isolation_truncate_vs_all.spec
@@ -5,8 +5,8 @@
 # create range distributed table to test behavior of TRUNCATE in concurrent operations
 setup
 {
-    SELECT citus.replace_isolation_tester_func();
-    SELECT citus.refresh_isolation_tester_prepared_statement();
+    SELECT citus_internal.replace_isolation_tester_func();
+    SELECT citus_internal.refresh_isolation_tester_prepared_statement();
 
     SET citus.shard_replication_factor TO 1;
     CREATE TABLE truncate_append(id integer, data text);
@@ -18,7 +18,7 @@ teardown
 {
     DROP TABLE IF EXISTS truncate_append CASCADE;
-    SELECT citus.restore_isolation_tester_func();
+    SELECT citus_internal.restore_isolation_tester_func();
 }
 
 # session 1
diff --git a/src/test/regress/specs/isolation_update_vs_all.spec b/src/test/regress/specs/isolation_update_vs_all.spec
index a8292dbe3..6abc8fcb9 100644
--- a/src/test/regress/specs/isolation_update_vs_all.spec
+++ b/src/test/regress/specs/isolation_update_vs_all.spec
@@ -5,8 +5,8 @@
 # create range distributed table to test behavior of UPDATE in concurrent operations
 setup
 {
-    SELECT citus.replace_isolation_tester_func();
-    SELECT citus.refresh_isolation_tester_prepared_statement();
+    SELECT citus_internal.replace_isolation_tester_func();
+    SELECT citus_internal.refresh_isolation_tester_prepared_statement();
 
     SET citus.shard_replication_factor TO 1;
     CREATE TABLE update_hash(id integer, data text);
@@ -18,7 +18,7 @@ teardown
 {
     DROP TABLE IF EXISTS update_hash CASCADE;
-    SELECT citus.restore_isolation_tester_func();
+    SELECT citus_internal.restore_isolation_tester_func();
 }
 
 # session 1
diff --git a/src/test/regress/specs/isolation_upsert_vs_all.spec b/src/test/regress/specs/isolation_upsert_vs_all.spec
index 9392c975e..0e7fe67f7 100644
--- a/src/test/regress/specs/isolation_upsert_vs_all.spec
+++ b/src/test/regress/specs/isolation_upsert_vs_all.spec
@@ -5,8 +5,8 @@
 # create range distributed table to test behavior of UPSERT in concurrent operations
 setup
 {
-    SELECT citus.replace_isolation_tester_func();
-    SELECT citus.refresh_isolation_tester_prepared_statement();
+    SELECT citus_internal.replace_isolation_tester_func();
+    SELECT citus_internal.refresh_isolation_tester_prepared_statement();
 
     SET citus.shard_replication_factor TO 1;
     CREATE TABLE upsert_hash(id integer PRIMARY KEY, data text);
@@ -18,7 +18,7 @@ teardown
 {
     DROP TABLE IF EXISTS upsert_hash CASCADE;
-    SELECT citus.restore_isolation_tester_func();
+    SELECT citus_internal.restore_isolation_tester_func();
 }
 
 # session 1
diff --git a/src/test/regress/sql/failure_create_distributed_table_non_empty.sql b/src/test/regress/sql/failure_create_distributed_table_non_empty.sql
index 7138ac15e..8303a880b 100644
--- a/src/test/regress/sql/failure_create_distributed_table_non_empty.sql
+++ b/src/test/regress/sql/failure_create_distributed_table_non_empty.sql
@@ -117,6 +117,10 @@ SELECT citus.mitmproxy('conn.allow()');
 SELECT recover_prepared_transactions();
 DROP TABLE test_table ;
 
+-- since we want to interrupt the schema creation again, we need to drop and recreate
+-- the schema so that citus distributes it as a dependency once more
+DROP SCHEMA create_distributed_table_non_empty_failure;
+CREATE SCHEMA create_distributed_table_non_empty_failure;
 CREATE TABLE test_table(id int, value_1 int);
 INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
 
@@ -202,6 +206,8 @@ SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables W
 SELECT citus.mitmproxy('conn.allow()');
 DROP TABLE colocated_table;
 DROP TABLE test_table;
+DROP SCHEMA create_distributed_table_non_empty_failure;
+CREATE SCHEMA create_distributed_table_non_empty_failure;
 CREATE TABLE test_table(id int, value_1 int);
 INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
 SET citus.multi_shard_commit_protocol TO '1pc';
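The drop-and-recreate dance is needed because a schema, once distributed,
stays recorded as a distributed object; a later create_distributed_table would
then skip creating it on the workers, and the mitmproxy failure points around
schema creation would never fire. A sketch of how to check for that state
(the WHERE clause restricts the listing to schemas):

    SELECT pg_identify_object_as_address(classid, objid, objsubid)
    FROM citus.pg_dist_object
    WHERE classid = 'pg_namespace'::regclass;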
diff --git a/src/test/regress/sql/failure_create_reference_table.sql b/src/test/regress/sql/failure_create_reference_table.sql
index 0e8176520..47a9592fb 100644
--- a/src/test/regress/sql/failure_create_reference_table.sql
+++ b/src/test/regress/sql/failure_create_reference_table.sql
@@ -75,6 +75,8 @@ SET client_min_messages TO NOTICE;
 SELECT citus.mitmproxy('conn.allow()');
 DROP TABLE ref_table;
+DROP SCHEMA failure_reference_table;
+CREATE SCHEMA failure_reference_table;
 CREATE TABLE ref_table(id int);
 INSERT INTO ref_table VALUES(1),(2),(3);
 
diff --git a/src/test/regress/sql/failure_create_table.sql b/src/test/regress/sql/failure_create_table.sql
index 11e0dd5f6..fe9790fc9 100644
--- a/src/test/regress/sql/failure_create_table.sql
+++ b/src/test/regress/sql/failure_create_table.sql
@@ -104,7 +104,12 @@ SELECT citus.mitmproxy('conn.allow()');
 SELECT count(*) FROM pg_dist_shard;
 SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'failure_create_table' and table_name LIKE 'test_table%' ORDER BY 1$$);
 
+-- drop the tables and schema and recreate them to start from a non-distributed schema again
 DROP TABLE temp_table;
+DROP TABLE test_table;
+DROP SCHEMA failure_create_table;
+CREATE SCHEMA failure_create_table;
+CREATE TABLE test_table(id int, value_1 int);
 
 -- Test inside transaction
 -- Kill connection before sending query to the worker
@@ -143,7 +148,10 @@ SELECT citus.mitmproxy('conn.allow()');
 SELECT count(*) FROM pg_dist_shard;
 SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'failure_create_table' and table_name LIKE 'test_table%' ORDER BY 1$$);
 
+-- drop the table and schema and recreate them to start from a non-distributed schema again
 DROP TABLE test_table;
+DROP SCHEMA failure_create_table;
+CREATE SCHEMA failure_create_table;
 CREATE TABLE test_table(id int, value_1 int);
 
 -- Test inside transaction and with 1PC
@@ -196,14 +204,16 @@ SELECT count(*) FROM pg_dist_shard;
 SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'failure_create_table' and table_name LIKE 'test_table%' ORDER BY 1$$);
 
 DROP TABLE test_table;
+DROP SCHEMA failure_create_table;
+CREATE SCHEMA failure_create_table;
 
 -- Test master_create_worker_shards with 2pc
 SET citus.multi_shard_commit_protocol TO "2pc";
 CREATE TABLE test_table_2(id int, value_1 int);
 SELECT master_create_distributed_table('test_table_2', 'id', 'hash');
 
--- Kill connection before sending query to the worker
-SELECT citus.mitmproxy('conn.kill()');
+-- Kill connection before sending query to the worker
+SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
 SELECT master_create_worker_shards('test_table_2', 4, 2);
 
 SELECT count(*) FROM pg_dist_shard;
diff --git a/src/test/regress/sql/failure_vacuum.sql b/src/test/regress/sql/failure_vacuum.sql
index b51163ce4..0f929eef1 100644
--- a/src/test/regress/sql/failure_vacuum.sql
+++ b/src/test/regress/sql/failure_vacuum.sql
@@ -6,6 +6,8 @@ SHOW server_version \gset
 SELECT substring(:'server_version', '\d+')::int > 10 AS version_above_ten;
 
+SET citus.next_shard_id TO 12000000;
+
 SELECT citus.mitmproxy('conn.allow()');
 
 SET citus.shard_count = 1;
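failure_vacuum now pins the shard id sequence. Shard names embed the shard id,
so without a fixed starting point the expected output would depend on how many
shards earlier tests happened to create. A sketch of the effect (the table
name here is hypothetical, not from the test):

    SET citus.next_shard_id TO 12000000;
    SET citus.shard_count = 1;
    CREATE TABLE vacuum_test (key int, value int);  -- hypothetical table
    SELECT create_distributed_table('vacuum_test', 'key');
    -- the single shard should get id 12000000 (and hence a stable name),
    -- regardless of what ran before
    SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'vacuum_test'::regclass;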
diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql
index 300569ee7..f4837f646 100644
--- a/src/test/regress/sql/multi_extension.sql
+++ b/src/test/regress/sql/multi_extension.sql
@@ -46,7 +46,7 @@ FROM pg_depend AS pgd,
 WHERE pgd.refclassid = 'pg_extension'::regclass AND
     pgd.refobjid = pge.oid AND
     pge.extname = 'citus' AND
-    pgio.schema NOT IN ('pg_catalog', 'citus', 'test');
+    pgio.schema NOT IN ('pg_catalog', 'citus', 'citus_internal', 'test');
 
 -- DROP EXTENSION pre-created by the regression suite
@@ -120,7 +120,7 @@ FROM pg_depend AS pgd,
 WHERE pgd.refclassid = 'pg_extension'::regclass AND
     pgd.refobjid = pge.oid AND
     pge.extname = 'citus' AND
-    pgio.schema NOT IN ('pg_catalog', 'citus', 'test');
+    pgio.schema NOT IN ('pg_catalog', 'citus', 'citus_internal', 'test');
 
 -- see incompatible version errors out
 RESET citus.enable_version_checks;
diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql
index dccc04c61..d6f66aba4 100644
--- a/src/test/regress/sql/multi_unsupported_worker_operations.sql
+++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql
@@ -100,15 +100,15 @@ SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE lo
 SELECT master_apply_delete_command('DELETE FROM mx_table');
 SELECT count(*) FROM mx_table;
 
--- master_add_node
+-- master_add_inactive_node
-SELECT 1 FROM master_add_node('localhost', 5432);
+SELECT 1 FROM master_add_inactive_node('localhost', 5432);
 SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
 
 -- master_remove_node
 \c - - - :master_port
 DROP INDEX mx_test_uniq_index;
-SELECT 1 FROM master_add_node('localhost', 5432);
+SELECT 1 FROM master_add_inactive_node('localhost', 5432);
 
 \c - - - :worker_1_port
 SELECT master_remove_node('localhost', 5432);
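The switch to master_add_inactive_node is presumably needed because adding an
active node now also propagates distributed objects to it, which cannot work
against localhost:5432, where no worker is actually listening in this test.
A sketch of the difference:

    -- registers the node but does not activate it, so nothing is propagated
    SELECT 1 FROM master_add_inactive_node('localhost', 5432);
    SELECT isactive FROM pg_dist_node
    WHERE nodename = 'localhost' AND nodeport = 5432;
    -- isactive is false; the node can still be removed as usual
    SELECT master_remove_node('localhost', 5432);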