Refactor Ensure Schema Exists to Ensure Dependecies Exists (#2882)

DESCRIPTION: Refactor ensure schema exists to dependency exists

Historically we only supported schema's as table dependencies to be created on the workers before a table gets distributed. This PR puts infrastructure in place to walk pg_depend to figure out which dependencies to create on the workers. Currently only schema's are supported as objects to create before creating a table.

We also keep track of dependencies that have been created in the cluster. When we add a new node to the cluster we use this catalog to know which objects need to be created on the worker.

Side effect of knowing which objects are already distributed is that we don't have debug messages anymore when creating schema's that are already created on the workers.
pull/2879/head
Nils Dijk 2019-09-04 14:10:20 +02:00 committed by GitHub
parent bc97523940
commit 936d546a3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
62 changed files with 2640 additions and 232 deletions

View File

@ -1,6 +1,102 @@
/* citus--8.3-1--8.4-1 */
/* bump version to 8.4-1 */
CREATE SCHEMA IF NOT EXISTS citus_internal;
-- move citus internal functions to citus_internal to make space in the citus schema for
-- our public interface
ALTER FUNCTION citus.find_groupid_for_node SET SCHEMA citus_internal;
ALTER FUNCTION citus.pg_dist_node_trigger_func SET SCHEMA citus_internal;
ALTER FUNCTION citus.pg_dist_shard_placement_trigger_func SET SCHEMA citus_internal;
ALTER FUNCTION citus.refresh_isolation_tester_prepared_statement SET SCHEMA citus_internal;
ALTER FUNCTION citus.replace_isolation_tester_func SET SCHEMA citus_internal;
ALTER FUNCTION citus.restore_isolation_tester_func SET SCHEMA citus_internal;
CREATE OR REPLACE FUNCTION citus_internal.pg_dist_shard_placement_trigger_func()
RETURNS TRIGGER AS $$
BEGIN
IF (TG_OP = 'DELETE') THEN
DELETE FROM pg_dist_placement WHERE placementid = OLD.placementid;
RETURN OLD;
ELSIF (TG_OP = 'UPDATE') THEN
UPDATE pg_dist_placement
SET shardid = NEW.shardid, shardstate = NEW.shardstate,
shardlength = NEW.shardlength, placementid = NEW.placementid,
groupid = citus_internal.find_groupid_for_node(NEW.nodename, NEW.nodeport)
WHERE placementid = OLD.placementid;
RETURN NEW;
ELSIF (TG_OP = 'INSERT') THEN
INSERT INTO pg_dist_placement
(placementid, shardid, shardstate, shardlength, groupid)
VALUES (NEW.placementid, NEW.shardid, NEW.shardstate, NEW.shardlength,
citus_internal.find_groupid_for_node(NEW.nodename, NEW.nodeport));
RETURN NEW;
END IF;
END;
$$ LANGUAGE plpgsql;
CREATE OR REPLACE FUNCTION pg_catalog.master_unmark_object_distributed(classid oid, objid oid, objsubid int)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_unmark_object_distributed$$;
COMMENT ON FUNCTION pg_catalog.master_unmark_object_distributed(classid oid, objid oid, objsubid int)
IS 'remove an object address from citus.pg_dist_object once the object has been deleted';
CREATE TABLE citus.pg_dist_object (
classid oid NOT NULL,
objid oid NOT NULL,
objsubid integer NOT NULL,
-- fields used for upgrades
type text DEFAULT NULL,
object_names text[] DEFAULT NULL,
object_args text[] DEFAULT NULL,
CONSTRAINT pg_dist_object_pkey PRIMARY KEY (classid, objid, objsubid)
);
CREATE OR REPLACE FUNCTION pg_catalog.citus_drop_trigger()
RETURNS event_trigger
LANGUAGE plpgsql
SET search_path = pg_catalog
AS $cdbdt$
DECLARE
v_obj record;
sequence_names text[] := '{}';
table_colocation_id integer;
propagate_drop boolean := false;
BEGIN
-- collect set of dropped sequences to drop on workers later
SELECT array_agg(object_identity) INTO sequence_names
FROM pg_event_trigger_dropped_objects()
WHERE object_type = 'sequence';
FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects()
WHERE object_type IN ('table', 'foreign table')
LOOP
-- first drop the table and metadata on the workers
-- then drop all the shards on the workers
-- finally remove the pg_dist_partition entry on the coordinator
PERFORM master_remove_distributed_table_metadata_from_workers(v_obj.objid, v_obj.schema_name, v_obj.object_name);
PERFORM master_drop_all_shards(v_obj.objid, v_obj.schema_name, v_obj.object_name);
PERFORM master_remove_partition_metadata(v_obj.objid, v_obj.schema_name, v_obj.object_name);
END LOOP;
IF cardinality(sequence_names) > 0 THEN
PERFORM master_drop_sequences(sequence_names);
END IF;
-- remove entries from citus.pg_dist_object for all dropped root (objsubid = 0) objects
FOR v_obj IN SELECT * FROM pg_event_trigger_dropped_objects()
LOOP
PERFORM master_unmark_object_distributed(v_obj.classid, v_obj.objid, v_obj.objsubid);
END LOOP;
END;
$cdbdt$;
COMMENT ON FUNCTION pg_catalog.citus_drop_trigger()
IS 'perform checks and actions at the end of DROP actions';
CREATE OR REPLACE FUNCTION pg_catalog.citus_prepare_pg_upgrade()
RETURNS void
LANGUAGE plpgsql
@ -21,6 +117,10 @@ BEGIN
-- enterprise catalog tables
CREATE TABLE public.pg_dist_authinfo AS SELECT * FROM pg_catalog.pg_dist_authinfo;
CREATE TABLE public.pg_dist_poolinfo AS SELECT * FROM pg_catalog.pg_dist_poolinfo;
-- store upgrade stable identifiers on pg_dist_object catalog
UPDATE citus.pg_dist_object
SET (type, object_names, object_args) = (SELECT * FROM pg_identify_object_as_address(classid, objid, objsubid));
END;
$cppu$;
@ -102,6 +202,23 @@ BEGIN
'n' as deptype
FROM pg_catalog.pg_dist_partition p;
-- restore pg_dist_object from the stable identifiers
WITH old_records AS (
DELETE FROM
citus.pg_dist_object
RETURNING
type,
object_names,
object_args
)
INSERT INTO citus.pg_dist_object (classid, objid, objsubid)
SELECT
address.classid,
address.objid,
address.objsubid
FROM
old_records naming,
pg_get_object_address(naming.type, naming.object_names, naming.object_args) address;
END;
$cppu$;

View File

@ -132,6 +132,7 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
char distributionMethod = 0;
char *colocateWithTableName = NULL;
bool viaDeprecatedAPI = true;
ObjectAddress tableAddress = { 0 };
Relation relation = NULL;
@ -140,12 +141,13 @@ master_create_distributed_table(PG_FUNCTION_ARGS)
EnsureTableOwner(relationId);
/*
* Ensure schema exists on each worker node. We can not run this function
* transactionally, since we may create shards over separate sessions and
* shard creation depends on the schema being present and visible from all
* sessions.
* distributed tables might have dependencies on different objects, since we create
* shards for a distributed table via multiple sessions these objects will be created
* via their own connection and committed immediately so they become visible to all
* sessions creating shards.
*/
EnsureSchemaExistsOnAllNodes(relationId);
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
EnsureDependenciesExistsOnAllNodes(&tableAddress);
/*
* Lock target relation with an exclusive lock - there's no way to make
@ -193,6 +195,7 @@ create_distributed_table(PG_FUNCTION_ARGS)
text *distributionColumnText = NULL;
Oid distributionMethodOid = InvalidOid;
text *colocateWithTableNameText = NULL;
ObjectAddress tableAddress = { 0 };
Relation relation = NULL;
char *distributionColumnName = NULL;
@ -214,12 +217,13 @@ create_distributed_table(PG_FUNCTION_ARGS)
EnsureTableOwner(relationId);
/*
* Ensure schema exists on each worker node. We can not run this function
* transactionally, since we may create shards over separate sessions and
* shard creation depends on the schema being present and visible from all
* sessions.
* distributed tables might have dependencies on different objects, since we create
* shards for a distributed table via multiple sessions these objects will be created
* via their own connection and committed immediately so they become visible to all
* sessions creating shards.
*/
EnsureSchemaExistsOnAllNodes(relationId);
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
EnsureDependenciesExistsOnAllNodes(&tableAddress);
/*
* Lock target relation with an exclusive lock - there's no way to make
@ -272,6 +276,7 @@ create_reference_table(PG_FUNCTION_ARGS)
List *workerNodeList = NIL;
int workerCount = 0;
Var *distributionColumn = NULL;
ObjectAddress tableAddress = { 0 };
bool viaDeprecatedAPI = false;
@ -280,12 +285,13 @@ create_reference_table(PG_FUNCTION_ARGS)
EnsureTableOwner(relationId);
/*
* Ensure schema exists on each worker node. We can not run this function
* transactionally, since we may create shards over separate sessions and
* shard creation depends on the schema being present and visible from all
* sessions.
* distributed tables might have dependencies on different objects, since we create
* shards for a distributed table via multiple sessions these objects will be created
* via their own connection and committed immediately so they become visible to all
* sessions creating shards.
*/
EnsureSchemaExistsOnAllNodes(relationId);
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
EnsureDependenciesExistsOnAllNodes(&tableAddress);
/*
* Lock target relation with an exclusive lock - there's no way to make

View File

@ -0,0 +1,240 @@
/*-------------------------------------------------------------------------
*
* dependencies.c
* Commands to create dependencies of an object on all workers.
*
* Copyright (c) 2019, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "catalog/dependency.h"
#include "catalog/objectaddress.h"
#include "distributed/commands.h"
#include "distributed/connection_management.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata_sync.h"
#include "distributed/remote_commands.h"
#include "distributed/worker_manager.h"
#include "storage/lmgr.h"
static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency);
/*
* EnsureDependenciesExists finds all the dependencies that we support and makes sure
* these are available on all workers. If not available they will be created on the
* workers via a separate session that will be committed directly so that the objects are
* visible to potentially multiple sessions creating the shards.
*
* Note; only the actual objects are created via a separate session, the local records to
* pg_dist_object are created in this session. As a side effect the objects could be
* created on the workers without a catalog entry on the coordinator. Updates to the
* objects on the coordinator are not propagated to the workers until the record is
* visible on the coordinator.
*
* This is solved by creating the dependencies in an idempotent manner, either via
* postgres native CREATE IF NOT EXISTS, or citus helper functions.
*/
void
EnsureDependenciesExistsOnAllNodes(const ObjectAddress *target)
{
const uint32 connectionFlag = FORCE_NEW_CONNECTION;
/* local variables to work with dependencies */
List *dependencies = NIL;
ListCell *dependencyCell = NULL;
/* local variables to collect ddl commands */
List *ddlCommands = NULL;
/* local variables to work with worker nodes */
List *workerNodeList = NULL;
ListCell *workerNodeCell = NULL;
List *connections = NULL;
ListCell *connectionCell = NULL;
/*
* collect all dependencies in creation order and get their ddl commands
*/
dependencies = GetDependenciesForObject(target);
foreach(dependencyCell, dependencies)
{
ObjectAddress *dependency = (ObjectAddress *) lfirst(dependencyCell);
ddlCommands = list_concat(ddlCommands,
GetDependencyCreateDDLCommands(dependency));
}
if (list_length(ddlCommands) <= 0)
{
/* no ddl commands to be executed */
return;
}
/*
* Make sure that no new nodes are added after this point until the end of the
* transaction by taking a RowShareLock on pg_dist_node, which conflicts with the
* ExclusiveLock taken by master_add_node.
* This guarantees that all active nodes will have the object, because they will
* either get it now, or get it in master_add_node after this transaction finishes and
* the pg_dist_object record becomes visible.
*/
LockRelationOid(DistNodeRelationId(), RowShareLock);
/*
* right after we acquired the lock we mark our objects as distributed, these changes
* will not become visible before we have successfully created all the objects on our
* workers.
*
* It is possible to create distributed tables which depend on other dependencies
* before any node is in the cluster. If we would wait till we actually had connected
* to the nodes before marking the objects as distributed these objects would never be
* created on the workers when they get added, causing shards to fail to create.
*/
foreach(dependencyCell, dependencies)
{
ObjectAddress *dependency = (ObjectAddress *) lfirst(dependencyCell);
MarkObjectDistributed(dependency);
}
/*
* collect and connect to all applicable nodes
*/
workerNodeList = ActivePrimaryNodeList();
if (list_length(workerNodeList) <= 0)
{
/* no nodes to execute on */
return;
}
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
MultiConnection *connection = NULL;
char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort;
connection = StartNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort,
CitusExtensionOwnerName(), NULL);
connections = lappend(connections, connection);
}
FinishConnectionListEstablishment(connections);
/*
* create dependency on all nodes
*/
foreach(connectionCell, connections)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
ExecuteCriticalRemoteCommandList(connection, ddlCommands);
}
/*
* disconnect from nodes
*/
foreach(connectionCell, connections)
{
MultiConnection *connection = (MultiConnection *) lfirst(connectionCell);
CloseConnection(connection);
}
}
/*
* GetDependencyCreateDDLCommands returns a list (potentially empty or NIL) of ddl
* commands to execute on a worker to create the object.
*/
static List *
GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
{
switch (getObjectClass(dependency))
{
case OCLASS_SCHEMA:
{
const char *schemaDDLCommand = CreateSchemaDDLCommand(dependency->objectId);
if (schemaDDLCommand == NULL)
{
/* no schema to create */
return NIL;
}
return list_make1((void *) schemaDDLCommand);
}
default:
{
break;
}
}
/*
* make sure it fails hard when in debug mode, leave a hint for the user if this ever
* happens in production
*/
Assert(false);
ereport(ERROR, (errmsg("unsupported object %s for distribution by citus",
getObjectTypeDescription(dependency)),
errdetail(
"citus tries to recreate an unsupported object on its workers"),
errhint("please report a bug as this should not be happening")));
return NIL;
}
/*
* ReplicateAllDependenciesToNode replicate all previously marked objects to a worker node
*/
void
ReplicateAllDependenciesToNode(const char *nodeName, int nodePort)
{
const uint32 connectionFlag = FORCE_NEW_CONNECTION;
ListCell *dependencyCell = NULL;
List *dependencies = NIL;
List *ddlCommands = NIL;
MultiConnection *connection = NULL;
/*
* collect all dependencies in creation order and get their ddl commands
*/
dependencies = GetDistributedObjectAddressList();
/*
* When dependency lists are getting longer we see a delay in the creation time on the
* workers. We would like to inform the user. Currently we warn for lists greater then
* 100 items, where 100 is an arbitrarily chosen number. If we find it too high or too
* low we can adjust this based on experience.
*/
if (list_length(dependencies) > 100)
{
ereport(NOTICE, (errmsg("Replicating postgres objects to node %s:%d", nodeName,
nodePort),
errdetail("There are %d objects to replicate, depending on your "
"environment this might take a while",
list_length(dependencies))));
}
dependencies = OrderObjectAddressListInDependencyOrder(dependencies);
foreach(dependencyCell, dependencies)
{
ObjectAddress *dependency = (ObjectAddress *) lfirst(dependencyCell);
ddlCommands = list_concat(ddlCommands,
GetDependencyCreateDDLCommands(dependency));
}
if (list_length(ddlCommands) <= 0)
{
/* no commands to replicate dependencies to the new worker */
return;
}
/*
* connect to the new host and create all applicable dependencies
*/
connection = GetNodeUserDatabaseConnection(connectionFlag, nodeName, nodePort,
CitusExtensionOwnerName(), NULL);
ExecuteCriticalRemoteCommandList(connection, ddlCommands);
CloseConnection(connection);
}

View File

@ -138,53 +138,3 @@ PlanAlterObjectSchemaStmt(AlterObjectSchemaStmt *alterObjectSchemaStmt,
return NIL;
}
/*
* EnsureSchemaExistsOnAllNodes connects to all nodes with citus extension user
* and creates the schema of the given relationId. The function errors out if the
* command cannot be executed in any of the worker nodes.
*/
void
EnsureSchemaExistsOnAllNodes(Oid relationId)
{
List *workerNodeList = ActivePrimaryNodeList();
ListCell *workerNodeCell = NULL;
foreach(workerNodeCell, workerNodeList)
{
WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
char *nodeName = workerNode->workerName;
uint32 nodePort = workerNode->workerPort;
EnsureSchemaExistsOnNode(relationId, nodeName, nodePort);
}
}
/*
* EnsureSchemaExistsOnNode connects to one node with citus extension user
* and creates the schema of the given relationId. The function errors out if the
* command cannot be executed in the node.
*/
void
EnsureSchemaExistsOnNode(Oid relationId, char *nodeName, int32 nodePort)
{
uint64 connectionFlag = FORCE_NEW_CONNECTION;
MultiConnection *connection = NULL;
/* if the schema creation command is not provided, create it */
Oid schemaId = get_rel_namespace(relationId);
char *schemaCreationDDL = CreateSchemaDDLCommand(schemaId);
/* if the relation lives in public namespace, no need to perform any queries in workers */
if (schemaCreationDDL == NULL)
{
return;
}
connection = GetNodeUserDatabaseConnection(connectionFlag, nodeName,
nodePort, CitusExtensionOwnerName(), NULL);
ExecuteCriticalRemoteCommand(connection, schemaCreationDDL);
}

View File

@ -67,6 +67,7 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
text *tableNameText = PG_GETARG_TEXT_P(0);
int32 shardCount = PG_GETARG_INT32(1);
int32 replicationFactor = PG_GETARG_INT32(2);
ObjectAddress tableAddress = { 0 };
Oid distributedTableId = ResolveRelationId(tableNameText, false);
@ -77,12 +78,13 @@ master_create_worker_shards(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR);
/*
* Ensure schema exists on each worker node. We can not run this function
* transactionally, since we may create shards over separate sessions and
* shard creation depends on the schema being present and visible from all
* sessions.
* distributed tables might have dependencies on different objects, since we create
* shards for a distributed table via multiple sessions these objects will be created
* via their own connection and committed immediately so they become visible to all
* sessions creating shards.
*/
EnsureSchemaExistsOnAllNodes(distributedTableId);
ObjectAddressSet(tableAddress, RelationRelationId, distributedTableId);
EnsureDependenciesExistsOnAllNodes(&tableAddress);
CreateShardsWithRoundRobinPolicy(distributedTableId, shardCount, replicationFactor,
useExclusiveConnections);

View File

@ -237,15 +237,6 @@ RepairShardPlacement(int64 shardId, char *sourceNodeName, int32 sourceNodePort,
EnsureTableOwner(distributedTableId);
/*
* Ensure schema exists on the target worker node. We can not run this
* function transactionally, since we may create shards over separate
* sessions and shard creation depends on the schema being present and
* visible from all sessions.
*/
EnsureSchemaExistsOnNode(distributedTableId, targetNodeName,
targetNodePort);
if (relationKind == RELKIND_FOREIGN_TABLE)
{
char *relationName = get_rel_name(distributedTableId);

View File

@ -90,6 +90,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
char *relationName = text_to_cstring(relationNameText);
uint64 shardId = INVALID_SHARD_ID;
uint32 attemptableNodeCount = 0;
ObjectAddress tableAddress = { 0 };
uint32 candidateNodeIndex = 0;
List *candidateNodeList = NIL;
@ -108,12 +109,13 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
CheckDistributedTable(relationId);
/*
* Ensure schema exists on each worker node. We can not run this function
* transactionally, since we may create shards over separate sessions and
* shard creation depends on the schema being present and visible from all
* sessions.
* distributed tables might have dependencies on different objects, since we create
* shards for a distributed table via multiple sessions these objects will be created
* via their own connection and committed immediately so they become visible to all
* sessions creating shards.
*/
EnsureSchemaExistsOnAllNodes(relationId);
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
EnsureDependenciesExistsOnAllNodes(&tableAddress);
/* don't allow the table to be dropped */
LockRelationOid(relationId, AccessShareLock);

View File

@ -0,0 +1,489 @@
/*-------------------------------------------------------------------------
*
* dependency.c
* Functions to reason about distributed objects and their dependencies
*
* Copyright (c) 2019, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/skey.h"
#include "catalog/dependency.h"
#include "catalog/indexing.h"
#include "catalog/pg_depend.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "utils/fmgroids.h"
#include "utils/hsearch.h"
#include "utils/lsyscache.h"
/*
* ObjectAddressCollector keeps track of collected ObjectAddresses. This can be used
* together with recurse_pg_depend.
*
* We keep two different datastructures for the following reasons
* - A List ordered by insert/collect order
* - A Set to quickly O(1) check if an ObjectAddress has already been collected
*/
typedef struct ObjectAddressCollector
{
List *dependencyList;
HTAB *dependencySet;
} ObjectAddressCollector;
/* forward declarations for functions to interact with the ObjectAddressCollector */
static void InitObjectAddressCollector(ObjectAddressCollector *collector);
static void CollectObjectAddress(ObjectAddressCollector *collector, const
ObjectAddress *address);
static bool IsObjectAddressCollected(const ObjectAddress *findAddress,
ObjectAddressCollector *collector);
/* forward declaration of functions that recurse pg_depend */
static void recurse_pg_depend(const ObjectAddress *target,
List * (*expand)(void *context, const
ObjectAddress *target),
bool (*follow)(void *context, Form_pg_depend row),
void (*apply)(void *context, Form_pg_depend row),
void *context);
static bool FollowAllSupportedDependencies(void *context, Form_pg_depend pg_depend);
static bool FollowNewSupportedDependencies(void *context, Form_pg_depend pg_depend);
static void ApplyAddToDependencyList(void *context, Form_pg_depend pg_depend);
/* forward declaration of support functions to decide what to follow */
static bool SupportedDependencyByCitus(const ObjectAddress *address);
static bool IsObjectAddressOwnedByExtension(const ObjectAddress *target);
/*
* GetDependenciesForObject returns a list of ObjectAddesses to be created in order
* before the target object could safely be created on a worker. Some of the object might
* already be created on a worker. It should be created in an idempotent way.
*/
List *
GetDependenciesForObject(const ObjectAddress *target)
{
ObjectAddressCollector collector = { 0 };
InitObjectAddressCollector(&collector);
recurse_pg_depend(target,
NULL,
&FollowNewSupportedDependencies,
&ApplyAddToDependencyList,
&collector);
return collector.dependencyList;
}
/*
* OrderObjectAddressListInDependencyOrder given a list of ObjectAddresses return a new
* list of the same ObjectAddresses ordered on dependency order where dependencies
* precedes the corresponding object in the list.
*
* The algortihm traveses pg_depend in a depth first order starting at the first object in
* the provided list. By traversing depth first it will put the first dependency at the
* head of the list with dependencies depending on them later.
*
* If the object is already in the list it is skipped for traversal. This happens when an
* object was already added to the target list before it occurred in the input list.
*/
List *
OrderObjectAddressListInDependencyOrder(List *objectAddressList)
{
ObjectAddressCollector collector = { 0 };
ListCell *ojectAddressCell = NULL;
InitObjectAddressCollector(&collector);
foreach(ojectAddressCell, objectAddressList)
{
ObjectAddress *objectAddress = (ObjectAddress *) lfirst(ojectAddressCell);
if (IsObjectAddressCollected(objectAddress, &collector))
{
/* skip objects that are already ordered */
continue;
}
recurse_pg_depend(objectAddress,
NULL,
&FollowAllSupportedDependencies,
&ApplyAddToDependencyList,
&collector);
CollectObjectAddress(&collector, objectAddress);
}
return collector.dependencyList;
}
/*
* recurse_pg_depend recursively visits pg_depend entries.
*
* `expand` allows based on the target ObjectAddress to generate extra entries for ease of
* traversal.
*
* Starting from the target ObjectAddress. For every existing and generated entry the
* `follow` function will be called. When `follow` returns true it will recursively visit
* the dependencies for that object. recurse_pg_depend will visit therefore all pg_depend
* entries.
*
* Visiting will happen in depth first order, which is useful to create or sorted lists of
* dependencies to create.
*
* For all pg_depend entries that should be visited the apply function will be called.
* This function is designed to be the mutating function for the context being passed.
* Although nothing prevents the follow function to also mutate the context.
*
* - follow will be called on the way down, so the invocation order is top to bottom of
* the dependency tree
* - apply is called on the way back, so the invocation order is bottom to top. Apply is
* not called for entries for which follow has returned false.
*/
static void
recurse_pg_depend(const ObjectAddress *target,
List * (*expand)(void *context, const ObjectAddress *target),
bool (*follow)(void *context, Form_pg_depend row),
void (*apply)(void *context, Form_pg_depend row),
void *context)
{
Relation depRel = NULL;
ScanKeyData key[2];
SysScanDesc depScan = NULL;
HeapTuple depTup = NULL;
List *pgDependEntries = NIL;
ListCell *pgDependCell = NULL;
/*
* iterate the actual pg_depend catalog
*/
depRel = heap_open(DependRelationId, AccessShareLock);
/* scan pg_depend for classid = $1 AND objid = $2 using pg_depend_depender_index */
ScanKeyInit(&key[0], Anum_pg_depend_classid, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(target->classId));
ScanKeyInit(&key[1], Anum_pg_depend_objid, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(target->objectId));
depScan = systable_beginscan(depRel, DependDependerIndexId, true, NULL, 2, key);
while (HeapTupleIsValid(depTup = systable_getnext(depScan)))
{
Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
Form_pg_depend pg_depend_copy = palloc0(sizeof(FormData_pg_depend));
*pg_depend_copy = *pg_depend;
pgDependEntries = lappend(pgDependEntries, pg_depend_copy);
}
systable_endscan(depScan);
relation_close(depRel, AccessShareLock);
/*
* concat expended entries if applicable
*/
if (expand != NULL)
{
List *expandedEntries = NIL;
expandedEntries = expand(context, target);
pgDependEntries = list_concat(pgDependEntries, expandedEntries);
}
/*
* Iterate all entries and recurse depth first
*/
foreach(pgDependCell, pgDependEntries)
{
Form_pg_depend pg_depend = (Form_pg_depend) lfirst(pgDependCell);
ObjectAddress address = { 0 };
ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid);
if (follow == NULL || !follow(context, pg_depend))
{
/* skip all pg_depend entries the user didn't want to follow */
return;
}
/*
* recurse depth first, this makes sure we call apply for the deepest dependency
* first.
*/
recurse_pg_depend(&address, expand, follow, apply, context);
/* now apply changes for current entry */
if (apply != NULL)
{
apply(context, pg_depend);
}
}
}
/*
* InitObjectAddressCollector takes a pointer to an already allocated (possibly stack)
* ObjectAddressCollector struct. It makes sure this struct is ready to be used for object
* collection.
*
* If an already initialized collector is passed the collector will be cleared from its
* contents to be reused.
*/
static void
InitObjectAddressCollector(ObjectAddressCollector *collector)
{
int hashFlags = 0;
HASHCTL info;
memset(&info, 0, sizeof(info));
info.keysize = sizeof(ObjectAddress);
info.entrysize = sizeof(ObjectAddress);
info.hcxt = CurrentMemoryContext;
hashFlags = (HASH_ELEM | HASH_CONTEXT | HASH_BLOBS);
collector->dependencySet = hash_create("dependency set", 128, &info, hashFlags);
collector->dependencyList = NULL;
}
/*
* CollectObjectAddress adds an ObjectAddress to the collector.
*/
static void
CollectObjectAddress(ObjectAddressCollector *collector, const ObjectAddress *collect)
{
ObjectAddress *address = NULL;
bool found = false;
/* add to set */
address = (ObjectAddress *) hash_search(collector->dependencySet, collect,
HASH_ENTER, &found);
if (!found)
{
/* copy object address in */
*address = *collect;
}
/* add to list*/
collector->dependencyList = lappend(collector->dependencyList, address);
}
/*
* IsObjectAddressCollected is a helper function that can check if an ObjectAddress is
* already in a (unsorted) list of ObjectAddresses
*/
static bool
IsObjectAddressCollected(const ObjectAddress *findAddress,
ObjectAddressCollector *collector)
{
bool found = false;
/* add to set */
hash_search(collector->dependencySet, findAddress, HASH_FIND, &found);
return found;
}
/*
* SupportedDependencyByCitus returns whether citus has support to distribute the object
* addressed.
*/
static bool
SupportedDependencyByCitus(const ObjectAddress *address)
{
/*
* looking at the type of a object to see if we know how to create the object on the
* workers.
*/
switch (getObjectClass(address))
{
case OCLASS_SCHEMA:
{
return true;
}
default:
{
/* unsupported type */
return false;
}
}
}
/*
* IsObjectAddressOwnedByExtension returns whether or not the object is owned by an
* extension. It is assumed that an object having a dependency on an extension is created
* by that extension and therefore owned by that extension.
*/
static bool
IsObjectAddressOwnedByExtension(const ObjectAddress *target)
{
Relation depRel = NULL;
ScanKeyData key[2];
SysScanDesc depScan = NULL;
HeapTuple depTup = NULL;
bool result = false;
depRel = heap_open(DependRelationId, AccessShareLock);
/* scan pg_depend for classid = $1 AND objid = $2 using pg_depend_depender_index */
ScanKeyInit(&key[0], Anum_pg_depend_classid, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(target->classId));
ScanKeyInit(&key[1], Anum_pg_depend_objid, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(target->objectId));
depScan = systable_beginscan(depRel, DependDependerIndexId, true, NULL, 2, key);
while (HeapTupleIsValid(depTup = systable_getnext(depScan)))
{
Form_pg_depend pg_depend = (Form_pg_depend) GETSTRUCT(depTup);
if (pg_depend->deptype == DEPENDENCY_EXTENSION)
{
result = true;
break;
}
}
systable_endscan(depScan);
heap_close(depRel, AccessShareLock);
return result;
}
/*
* FollowNewSupportedDependencies applies filters on pg_depend entries to follow all
* objects which should be distributed before the root object can safely be created.
*/
static bool
FollowNewSupportedDependencies(void *context, Form_pg_depend pg_depend)
{
ObjectAddressCollector *collector = (ObjectAddressCollector *) context;
ObjectAddress address = { 0 };
ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid);
/*
* Distirbute only normal dependencies, other dependencies are internal dependencies
* and managed by postgres
*/
if (pg_depend->deptype != DEPENDENCY_NORMAL)
{
return false;
}
/*
* We can only distribute dependencies that citus knows how to distribute
*/
if (!SupportedDependencyByCitus(&address))
{
return false;
}
/*
* If the object is already in our dependency list we do not have to follow any
* further
*/
if (IsObjectAddressCollected(&address, collector))
{
return false;
}
/*
* If the object is already distributed it is not a `new` object that needs to be
* distributed before we create a dependant object
*/
if (IsObjectDistributed(&address))
{
return false;
}
/*
* Objects owned by an extension are assumed to be created on the workers by creating
* the extension in the cluster
*/
if (IsObjectAddressOwnedByExtension(&address))
{
return false;
}
return true;
}
/*
* FollowAllSupportedDependencies applies filters on pg_depend entries to follow the
* dependency tree of objects in depth first order. We will visit all supported objects.
* This is used to sort a list of dependencies in dependency order.
*/
static bool
FollowAllSupportedDependencies(void *context, Form_pg_depend pg_depend)
{
ObjectAddressCollector *collector = (ObjectAddressCollector *) context;
ObjectAddress address = { 0 };
ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid);
/*
* Distirbute only normal dependencies, other dependencies are internal dependencies
* and managed by postgres
*/
if (pg_depend->deptype != DEPENDENCY_NORMAL)
{
return false;
}
/*
* We can only distribute dependencies that citus knows how to distribute
*/
if (!SupportedDependencyByCitus(&address))
{
return false;
}
/*
* If the object is already in our dependency list we do not have to follow any
* further
*/
if (IsObjectAddressCollected(&address, collector))
{
return false;
}
/*
* Objects owned by an extension are assumed to be created on the workers by creating
* the extension in the cluster
*/
if (IsObjectAddressOwnedByExtension(&address))
{
return false;
}
return true;
}
/*
* ApplyAddToDependencyList is an apply function for recurse_pg_depend that will collect
* all the ObjectAddresses for pg_depend entries to the context. The context here is
* assumed to be a (ObjectAddressCollector *) to the location where all ObjectAddresses
* will be collected.
*/
static void
ApplyAddToDependencyList(void *context, Form_pg_depend pg_depend)
{
ObjectAddressCollector *collector = (ObjectAddressCollector *) context;
ObjectAddress address = { 0 };
ObjectAddressSet(address, pg_depend->refclassid, pg_depend->refobjid);
CollectObjectAddress(collector, &address);
}

View File

@ -0,0 +1,282 @@
/*-------------------------------------------------------------------------
*
* distobject.c
* Functions to interact with distributed objects by their ObjectAddress
*
* Copyright (c) 2019, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "access/genam.h"
#include "access/heapam.h"
#include "access/htup_details.h"
#include "access/skey.h"
#include "access/xact.h"
#include "catalog/dependency.h"
#include "catalog/namespace.h"
#include "catalog/objectaddress.h"
#include "catalog/pg_namespace.h"
#include "catalog/pg_type.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata/pg_dist_object.h"
#include "distributed/metadata_cache.h"
#include "executor/spi.h"
#include "nodes/makefuncs.h"
#include "nodes/pg_list.h"
#include "parser/parse_type.h"
#include "utils/builtins.h"
#include "utils/fmgroids.h"
#include "utils/regproc.h"
#include "utils/rel.h"
static int ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
Datum *paramValues);
PG_FUNCTION_INFO_V1(master_unmark_object_distributed);
/*
* master_unmark_object_distributed(classid oid, objid oid, objsubid int)
*
* removes the entry for an object address from pg_dist_object. Only removes the entry if
* the object does not exist anymore.
*/
Datum
master_unmark_object_distributed(PG_FUNCTION_ARGS)
{
Oid classid = PG_GETARG_OID(0);
Oid objid = PG_GETARG_OID(1);
int32 objsubid = PG_GETARG_INT32(2);
ObjectAddress address = { 0 };
ObjectAddressSubSet(address, classid, objid, objsubid);
if (!IsObjectDistributed(&address))
{
/* if the object is not distributed there is no need to unmark it */
PG_RETURN_VOID();
}
if (ObjectExists(&address))
{
ereport(ERROR, (errmsg("object still exists"),
errdetail("the %s \"%s\" still exists",
getObjectTypeDescription(&address),
getObjectIdentity(&address)),
errhint("drop the object via a DROP command")));
}
UnmarkObjectDistributed(&address);
PG_RETURN_VOID();
}
/*
* ObjectExists checks if an object given by its object address exists
*
* This is done by opening the catalog for the object and search the catalog for the
* objects' oid. If we can find a tuple the object is existing. If no tuple is found, or
* we don't have the information to find the tuple by its oid we assume the object does
* not exist.
*/
bool
ObjectExists(const ObjectAddress *address)
{
if (is_objectclass_supported(address->classId))
{
HeapTuple objtup;
Relation catalog = heap_open(address->classId, AccessShareLock);
objtup = get_catalog_object_by_oid(catalog, address->objectId);
heap_close(catalog, AccessShareLock);
if (objtup != NULL)
{
return true;
}
return false;
}
return false;
}
/*
* MarkObjectDistributed marks an object as a distributed object by citus. Marking is done
* by adding appropriate entries to citus.pg_dist_object
*/
void
MarkObjectDistributed(const ObjectAddress *distAddress)
{
int paramCount = 3;
Oid paramTypes[3] = {
OIDOID,
OIDOID,
INT4OID
};
Datum paramValues[3] = {
ObjectIdGetDatum(distAddress->classId),
ObjectIdGetDatum(distAddress->objectId),
Int32GetDatum(distAddress->objectSubId)
};
int spiStatus = 0;
char *insertQuery = "INSERT INTO citus.pg_dist_object (classid, objid, objsubid) "
"VALUES ($1, $2, $3) ON CONFLICT DO NOTHING";
spiStatus = ExecuteCommandAsSuperuser(insertQuery, paramCount, paramTypes,
paramValues);
if (spiStatus < 0)
{
ereport(ERROR, (errmsg("failed to insert object into citus.pg_dist_object")));
}
}
/*
* ExecuteCommandAsSuperuser executes a command via SPI as superuser.
*/
static int
ExecuteCommandAsSuperuser(char *query, int paramCount, Oid *paramTypes,
Datum *paramValues)
{
int spiConnected = 0;
Oid savedUserId = InvalidOid;
int savedSecurityContext = 0;
int spiStatus = 0;
int spiFinished = 0;
spiConnected = SPI_connect();
if (spiConnected != SPI_OK_CONNECT)
{
ereport(ERROR, (errmsg("could not connect to SPI manager")));
}
/* make sure we have write access */
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
spiStatus = SPI_execute_with_args(query, paramCount, paramTypes, paramValues,
NULL, false, 0);
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
spiFinished = SPI_finish();
if (spiFinished != SPI_OK_FINISH)
{
ereport(ERROR, (errmsg("could not disconnect from SPI manager")));
}
return spiStatus;
}
/*
* UnmarkObjectDistributed removes the entry from pg_dist_object that marks this object as
* distributed. This will prevent updates to that object to be propagated to the worker.
*/
void
UnmarkObjectDistributed(const ObjectAddress *address)
{
int paramCount = 3;
Oid paramTypes[3] = {
OIDOID,
OIDOID,
INT4OID
};
Datum paramValues[3] = {
ObjectIdGetDatum(address->classId),
ObjectIdGetDatum(address->objectId),
Int32GetDatum(address->objectSubId)
};
int spiStatus = 0;
char *deleteQuery = "DELETE FROM citus.pg_dist_object WHERE classid = $1 AND "
"objid = $2 AND objsubid = $3";
spiStatus = ExecuteCommandAsSuperuser(deleteQuery, paramCount, paramTypes,
paramValues);
if (spiStatus < 0)
{
ereport(ERROR, (errmsg("failed to delete object from citus.pg_dist_object")));
}
}
/*
* IsObjectDistributed returns if the object addressed is already distributed in the
* cluster. This performs a local indexed lookup in pg_dist_object.
*/
bool
IsObjectDistributed(const ObjectAddress *address)
{
Relation pgDistObjectRel = NULL;
ScanKeyData key[3];
SysScanDesc pgDistObjectScan = NULL;
HeapTuple pgDistObjectTup = NULL;
bool result = false;
pgDistObjectRel = heap_open(DistObjectRelationId(), AccessShareLock);
/* scan pg_dist_object for classid = $1 AND objid = $2 AND objsubid = $3 via index */
ScanKeyInit(&key[0], Anum_pg_dist_object_classid, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(address->classId));
ScanKeyInit(&key[1], Anum_pg_dist_object_objid, BTEqualStrategyNumber, F_OIDEQ,
ObjectIdGetDatum(address->objectId));
ScanKeyInit(&key[2], Anum_pg_dist_object_objsubid, BTEqualStrategyNumber, F_INT4EQ,
ObjectIdGetDatum(address->objectSubId));
pgDistObjectScan = systable_beginscan(pgDistObjectRel, DistObjectPrimaryKeyIndexId(),
true, NULL, 3, key);
pgDistObjectTup = systable_getnext(pgDistObjectScan);
if (HeapTupleIsValid(pgDistObjectTup))
{
result = true;
}
systable_endscan(pgDistObjectScan);
relation_close(pgDistObjectRel, AccessShareLock);
return result;
}
/*
* GetDistributedObjectAddressList returns a list of ObjectAddresses that contains all
* distributed objects as marked in pg_dist_object
*/
List *
GetDistributedObjectAddressList(void)
{
Relation pgDistObjectRel = NULL;
SysScanDesc pgDistObjectScan = NULL;
HeapTuple pgDistObjectTup = NULL;
List *objectAddressList = NIL;
pgDistObjectRel = heap_open(DistObjectRelationId(), AccessShareLock);
pgDistObjectScan = systable_beginscan(pgDistObjectRel, InvalidOid, false, NULL, 0,
NULL);
while (HeapTupleIsValid(pgDistObjectTup = systable_getnext(pgDistObjectScan)))
{
Form_pg_dist_object pg_dist_object =
(Form_pg_dist_object) GETSTRUCT(pgDistObjectTup);
ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSubSet(*objectAddress,
pg_dist_object->classid,
pg_dist_object->objid,
pg_dist_object->objsubid);
objectAddressList = lappend(objectAddressList, objectAddress);
}
systable_endscan(pgDistObjectScan);
relation_close(pgDistObjectRel, AccessShareLock);
return objectAddressList;
}

View File

@ -286,18 +286,20 @@ MetadataCreateCommands(void)
DistTableCacheEntry *cacheEntry =
(DistTableCacheEntry *) lfirst(distributedTableCell);
Oid relationId = cacheEntry->relationId;
ObjectAddress tableAddress = { 0 };
List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
List *ddlCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults);
char *tableOwnerResetCommand = TableOwnerResetCommand(relationId);
/*
* Ensure schema exists on each worker node. We can not run this function
* transactionally, since we may create shards over separate sessions and
* shard creation depends on the schema being present and visible from all
* sessions.
* Distributed tables might have dependencies on different objects, since we
* create shards for a distributed table via multiple sessions these objects will
* be created via their own connection and committed immediately so they become
* visible to all sessions creating shards.
*/
EnsureSchemaExistsOnAllNodes(relationId);
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
EnsureDependenciesExistsOnAllNodes(&tableAddress);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
workerSequenceDDLCommands);

View File

@ -114,6 +114,8 @@ typedef struct MetadataCacheData
Oid distNodeRelationId;
Oid distNodeNodeIdIndexId;
Oid distLocalGroupRelationId;
Oid distObjectRelationId;
Oid distObjectPrimaryKeyIndexId;
Oid distColocationRelationId;
Oid distColocationConfigurationIndexId;
Oid distColocationColocationidIndexId;
@ -128,6 +130,7 @@ typedef struct MetadataCacheData
Oid distTransactionRelationId;
Oid distTransactionGroupIndexId;
Oid distTransactionRecordIndexId;
Oid citusCatalogNamespaceId;
Oid copyFormatTypeId;
Oid readIntermediateResultFuncId;
Oid extraDataContainerFuncId;
@ -214,7 +217,10 @@ static void GetPartitionTypeInputInfo(char *partitionKeyString, char partitionMe
static ShardInterval * TupleToShardInterval(HeapTuple heapTuple,
TupleDesc tupleDescriptor, Oid intervalTypeId,
int32 intervalTypeMod);
static void CachedNamespaceLookup(const char *nspname, Oid *cachedOid);
static void CachedRelationLookup(const char *relationName, Oid *cachedOid);
static void CachedRelationNamespaceLookup(const char *relationName, Oid relnamespace,
Oid *cachedOid);
static ShardPlacement * ResolveGroupShardPlacement(
GroupShardPlacement *groupShardPlacement, ShardCacheEntry *shardEntry);
static WorkerNode * LookupNodeForGroup(int32 groupId);
@ -1806,6 +1812,38 @@ DistLocalGroupIdRelationId(void)
}
/* return the oid of citus namespace */
Oid
CitusCatalogNamespaceId(void)
{
CachedNamespaceLookup("citus", &MetadataCache.citusCatalogNamespaceId);
return MetadataCache.citusCatalogNamespaceId;
}
/* return oid of pg_dist_shard relation */
Oid
DistObjectRelationId(void)
{
CachedRelationNamespaceLookup("pg_dist_object", CitusCatalogNamespaceId(),
&MetadataCache.distObjectRelationId);
return MetadataCache.distObjectRelationId;
}
/* return oid of pg_dist_object_pkey */
Oid
DistObjectPrimaryKeyIndexId(void)
{
CachedRelationNamespaceLookup("pg_dist_object_pkey",
CitusCatalogNamespaceId(),
&MetadataCache.distObjectPrimaryKeyIndexId);
return MetadataCache.distObjectPrimaryKeyIndexId;
}
/* return oid of pg_dist_colocation relation */
Oid
DistColocationRelationId(void)
@ -3573,18 +3611,49 @@ TupleToShardInterval(HeapTuple heapTuple, TupleDesc tupleDescriptor, Oid interva
/*
* CachedRelationLookup performs a cached lookup for the relation
* relationName, with the result cached in *cachedOid.
* CachedNamespaceLookup performs a cached lookup for the namespace (schema), with the
* result cached in cachedOid.
*/
static void
CachedRelationLookup(const char *relationName, Oid *cachedOid)
CachedNamespaceLookup(const char *nspname, Oid *cachedOid)
{
/* force callbacks to be registered, so we always get notified upon changes */
InitializeCaches();
if (*cachedOid == InvalidOid)
{
*cachedOid = get_relname_relid(relationName, PG_CATALOG_NAMESPACE);
*cachedOid = get_namespace_oid(nspname, true);
if (*cachedOid == InvalidOid)
{
ereport(ERROR, (errmsg(
"cache lookup failed for namespace %s, called too early?",
nspname)));
}
}
}
/*
* CachedRelationLookup performs a cached lookup for the relation
* relationName, with the result cached in *cachedOid.
*/
static void
CachedRelationLookup(const char *relationName, Oid *cachedOid)
{
CachedRelationNamespaceLookup(relationName, PG_CATALOG_NAMESPACE, cachedOid);
}
static void
CachedRelationNamespaceLookup(const char *relationName, Oid relnamespace, Oid *cachedOid)
{
/* force callbacks to be registered, so we always get notified upon changes */
InitializeCaches();
if (*cachedOid == InvalidOid)
{
*cachedOid = get_relname_relid(relationName, relnamespace);
if (*cachedOid == InvalidOid)
{

View File

@ -455,6 +455,7 @@ ActivateNode(char *nodeName, int nodePort)
if (WorkerNodeIsPrimary(workerNode))
{
ReplicateAllDependenciesToNode(nodeName, nodePort);
ReplicateAllReferenceTablesToNode(nodeName, nodePort);
}

View File

@ -300,16 +300,6 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
missingWorkerOk);
char *tableOwner = TableOwner(shardInterval->relationId);
/*
* Ensure schema exists on the worker node. We can not run this
* function transactionally, since we may create shards over separate
* sessions and shard creation depends on the schema being present and
* visible from all sessions.
*/
EnsureSchemaExistsOnNode(shardInterval->relationId, nodeName, nodePort);
/*
* Although this function is used for reference tables and reference table shard
* placements always have shardState = FILE_FINALIZED, in case of an upgrade of

View File

@ -18,6 +18,7 @@
#include "access/htup.h"
#include "access/tupdesc.h"
#include "catalog/indexing.h"
#include "catalog/objectaddress.h"
#include "distributed/citus_nodes.h"
#include "distributed/relay_utility.h"
#include "utils/acl.h"
@ -130,9 +131,9 @@ extern void CreateDistributedTable(Oid relationId, Var *distributionColumn,
char distributionMethod, char *colocateWithTableName,
bool viaDeprecatedAPI);
extern void CreateTruncateTrigger(Oid relationId);
extern void EnsureSchemaExistsOnAllNodes(Oid relationId);
extern void EnsureSchemaExistsOnNode(Oid relationId, char *nodeName,
int32 nodePort);
extern void EnsureDependenciesExistsOnAllNodes(const ObjectAddress *target);
extern void ReplicateAllDependenciesToNode(const char *nodeName, int nodePort);
/* Remaining metadata utility functions */
extern char * TableOwner(Oid relationId);

View File

@ -0,0 +1,23 @@
/*-------------------------------------------------------------------------
*
* dependency.c
* Functions to follow and record dependencies for objects to be
* created in the right order.
*
* Copyright (c) 2019, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef CITUS_DEPENDENCY_H
#define CITUS_DEPENDENCY_H
#include "postgres.h"
#include "catalog/objectaddress.h"
#include "nodes/pg_list.h"
extern List * GetDependenciesForObject(const ObjectAddress *target);
extern List * OrderObjectAddressListInDependencyOrder(List *objectAddressList);
#endif /* CITUS_DEPENDENCY_H */

View File

@ -0,0 +1,26 @@
/*-------------------------------------------------------------------------
*
* distobject.h
* Declarations for functions to work with pg_dist_object
*
* Copyright (c) 2019, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef CITUS_METADATA_DISTOBJECT_H
#define CITUS_METADATA_DISTOBJECT_H
#include "postgres.h"
#include "catalog/objectaddress.h"
extern bool ObjectExists(const ObjectAddress *address);
extern bool IsObjectDistributed(const ObjectAddress *address);
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
extern void UnmarkObjectDistributed(const ObjectAddress *address);
extern List * GetDistributedObjectAddressList(void);
#endif /* CITUS_METADATA_DISTOBJECT_H */

View File

@ -0,0 +1,57 @@
/*-------------------------------------------------------------------------
*
* pg_dist_object.h
* definition of the system distributed objects relation (pg_dist_object).
*
* This table keeps metadata on all postgres objects that are distributed
* to all the nodes in the network. Objects in this table should all be
* present on all workers and kept in sync throughout their existance.
* This also means that all nodes joining the network are assumed to
* recreate all these objects.
*
* Copyright (c) 2019, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef PG_DIST_OBJECT_H
#define PG_DIST_OBJECT_H
/* ----------------
* pg_dist_object definition.
* ----------------
*/
typedef struct FormData_pg_dist_object
{
Oid classid; /* class of the distributed object */
Oid objid; /* object id of the distributed object */
int32 objsubid; /* object sub id of the distributed object, eg. attnum */
#ifdef CATALOG_VARLEN /* variable-length fields start here */
text type;
text[] object_names;
text[] object_arguments;
#endif
} FormData_pg_dist_object;
/* ----------------
* Form_pg_dist_partitions corresponds to a pointer to a tuple with
* the format of pg_dist_partitions relation.
* ----------------
*/
typedef FormData_pg_dist_object *Form_pg_dist_object;
/* ----------------
* compiler constants for pg_dist_object
* ----------------
*/
#define Natts_pg_dist_object 6
#define Anum_pg_dist_object_classid 1
#define Anum_pg_dist_object_objid 2
#define Anum_pg_dist_object_objsubid 3
#define Anum_pg_dist_object_type 4
#define Anum_pg_dist_object_object_names 5
#define Anum_pg_dist_object_object_args 6
#endif /* PG_DIST_OBJECT_H */

View File

@ -121,6 +121,9 @@ extern void EnsureModificationsCanRun(void);
extern HTAB * GetWorkerNodeHash(void);
extern WorkerNode * LookupNodeByNodeId(uint32 nodeId);
/* namespace oids */
extern Oid CitusCatalogNamespaceId(void);
/* relation oids */
extern Oid DistColocationRelationId(void);
extern Oid DistColocationConfigurationIndexId(void);
@ -130,6 +133,7 @@ extern Oid DistShardRelationId(void);
extern Oid DistPlacementRelationId(void);
extern Oid DistNodeRelationId(void);
extern Oid DistLocalGroupIdRelationId(void);
extern Oid DistObjectRelationId(void);
/* index oids */
extern Oid DistNodeNodeIdIndexId(void);
@ -143,6 +147,7 @@ extern Oid DistTransactionRelationId(void);
extern Oid DistTransactionGroupIndexId(void);
extern Oid DistTransactionRecordIndexId(void);
extern Oid DistPlacementGroupidIndexId(void);
extern Oid DistObjectPrimaryKeyIndexId(void);
/* type oids */
extern Oid CitusCopyFormatTypeId(void);

View File

@ -78,6 +78,11 @@ check-failure-non-adaptive: all tempinstall-main
--server-option=citus.task_executor_type=real-time \
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/failure_schedule $(EXTRA_TESTS)
check-failure-non-adaptive-base: all tempinstall-main
$(pg_regress_multi_check) --load-extension=citus --mitmproxy \
--server-option=citus.task_executor_type=real-time \
-- $(MULTI_REGRESS_OPTS) --schedule=$(citus_abs_srcdir)/failure_base_schedule $(EXTRA_TESTS)
check-isolation-non-adaptive: all tempinstall-main
$(pg_regress_multi_check) --load-extension=citus --isolationtester \
--server-option=citus.task_executor_type=real-time \

View File

@ -55,9 +55,6 @@ s/.*Custom Plan Provider.*Citus.*/ \"Custom Plan Provider\": \"Citu
s/.*Custom-Plan-Provide.*/\<Custom-Plan-Provider\>Citus Unified\<\/Custom-Plan-Provider\> /g
s/ +$//g
# normalize shard ids in failure_vaccum
s/10209[0-9] \| 3/10209x \| 3/g
# normalize failed task ids
s/ERROR: failed to execute task [0-9]+/ERROR: failed to execute task X/g

View File

@ -343,6 +343,10 @@ SELECT recover_prepared_transactions();
(1 row)
DROP TABLE test_table ;
-- since we want to interrupt the schema creation again we need to drop and recreate
-- for citus to redistribute the dependency
DROP SCHEMA create_distributed_table_non_empty_failure;
CREATE SCHEMA create_distributed_table_non_empty_failure;
CREATE TABLE test_table(id int, value_1 int);
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
-- cancel as soon as the coordinator sends COMMIT
@ -572,6 +576,8 @@ SELECT citus.mitmproxy('conn.allow()');
DROP TABLE colocated_table;
DROP TABLE test_table;
DROP SCHEMA create_distributed_table_non_empty_failure;
CREATE SCHEMA create_distributed_table_non_empty_failure;
CREATE TABLE test_table(id int, value_1 int);
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
SET citus.multi_shard_commit_protocol TO '1pc';

View File

@ -337,6 +337,10 @@ SELECT recover_prepared_transactions();
(1 row)
DROP TABLE test_table ;
-- since we want to interrupt the schema creation again we need to drop and recreate
-- for citus to redistribute the dependency
DROP SCHEMA create_distributed_table_non_empty_failure;
CREATE SCHEMA create_distributed_table_non_empty_failure;
CREATE TABLE test_table(id int, value_1 int);
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
-- cancel as soon as the coordinator sends COMMIT
@ -568,6 +572,8 @@ SELECT citus.mitmproxy('conn.allow()');
DROP TABLE colocated_table;
DROP TABLE test_table;
DROP SCHEMA create_distributed_table_non_empty_failure;
CREATE SCHEMA create_distributed_table_non_empty_failure;
CREATE TABLE test_table(id int, value_1 int);
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
SET citus.multi_shard_commit_protocol TO '1pc';

View File

@ -182,6 +182,8 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row)
DROP TABLE ref_table;
DROP SCHEMA failure_reference_table;
CREATE SCHEMA failure_reference_table;
CREATE TABLE ref_table(id int);
INSERT INTO ref_table VALUES(1),(2),(3);
-- Test in transaction

View File

@ -182,6 +182,8 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row)
DROP TABLE ref_table;
DROP SCHEMA failure_reference_table;
CREATE SCHEMA failure_reference_table;
CREATE TABLE ref_table(id int);
INSERT INTO ref_table VALUES(1),(2),(3);
-- Test in transaction

View File

@ -332,7 +332,12 @@ SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables W
(localhost,57637,t,0)
(2 rows)
-- drop tables and schema and recreate to start from a non-distributed schema again
DROP TABLE temp_table;
DROP TABLE test_table;
DROP SCHEMA failure_create_table;
CREATE SCHEMA failure_create_table;
CREATE TABLE test_table(id int, value_1 int);
-- Test inside transaction
-- Kill connection before sending query to the worker
SELECT citus.mitmproxy('conn.kill()');
@ -444,7 +449,10 @@ SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables W
(localhost,57637,t,2)
(2 rows)
-- drop tables and schema and recreate to start from a non-distributed schema again
DROP TABLE test_table;
DROP SCHEMA failure_create_table;
CREATE SCHEMA failure_create_table;
CREATE TABLE test_table(id int, value_1 int);
-- Test inside transaction and with 1PC
SET citus.multi_shard_commit_protocol TO "1pc";
@ -591,6 +599,8 @@ SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables W
(2 rows)
DROP TABLE test_table;
DROP SCHEMA failure_create_table;
CREATE SCHEMA failure_create_table;
-- Test master_create_worker_shards with 2pc
SET citus.multi_shard_commit_protocol TO "2pc";
CREATE TABLE test_table_2(id int, value_1 int);
@ -600,18 +610,18 @@ SELECT master_create_distributed_table('test_table_2', 'id', 'hash');
(1 row)
-- Kill connection before sending query to the worker
SELECT citus.mitmproxy('conn.kill()');
-- Kill connection before sending query to the worker
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
mitmproxy
-----------
(1 row)
SELECT master_create_worker_shards('test_table_2', 4, 2);
WARNING: connection not open
CONTEXT: while executing command on localhost:9060
ERROR: connection error: localhost:9060
DETAIL: server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
DETAIL: connection not open
SELECT count(*) FROM pg_dist_shard;
count
-------

View File

@ -327,7 +327,12 @@ SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables W
(localhost,57637,t,0)
(2 rows)
-- drop tables and schema and recreate to start from a non-distributed schema again
DROP TABLE temp_table;
DROP TABLE test_table;
DROP SCHEMA failure_create_table;
CREATE SCHEMA failure_create_table;
CREATE TABLE test_table(id int, value_1 int);
-- Test inside transaction
-- Kill connection before sending query to the worker
SELECT citus.mitmproxy('conn.kill()');
@ -434,7 +439,10 @@ SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables W
(localhost,57637,t,0)
(2 rows)
-- drop tables and schema and recreate to start from a non-distributed schema again
DROP TABLE test_table;
DROP SCHEMA failure_create_table;
CREATE SCHEMA failure_create_table;
CREATE TABLE test_table(id int, value_1 int);
-- Test inside transaction and with 1PC
SET citus.multi_shard_commit_protocol TO "1pc";
@ -576,6 +584,8 @@ SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables W
(2 rows)
DROP TABLE test_table;
DROP SCHEMA failure_create_table;
CREATE SCHEMA failure_create_table;
-- Test master_create_worker_shards with 2pc
SET citus.multi_shard_commit_protocol TO "2pc";
CREATE TABLE test_table_2(id int, value_1 int);
@ -585,8 +595,8 @@ SELECT master_create_distributed_table('test_table_2', 'id', 'hash');
(1 row)
-- Kill connection before sending query to the worker
SELECT citus.mitmproxy('conn.kill()');
-- Kill connection before sending query to the worker
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
mitmproxy
-----------

View File

@ -9,6 +9,7 @@ SELECT substring(:'server_version', '\d+')::int > 10 AS version_above_ten;
t
(1 row)
SET citus.next_shard_id TO 12000000;
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
@ -69,9 +70,9 @@ CONTEXT: while executing command on localhost:9060
-- show that we marked as INVALID on COMMIT FAILURE
SELECT shardid, shardstate FROM pg_dist_shard_placement where shardstate != 1 AND
shardid in ( SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'vacuum_test'::regclass);
shardid | shardstate
---------+------------
102093 | 3
shardid | shardstate
----------+------------
12000000 | 3
(1 row)
UPDATE pg_dist_shard_placement SET shardstate = 1

View File

@ -9,6 +9,7 @@ SELECT substring(:'server_version', '\d+')::int > 10 AS version_above_ten;
f
(1 row)
SET citus.next_shard_id TO 12000000;
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
@ -64,9 +65,9 @@ ANALYZE vacuum_test;
-- show that we marked as INVALID on COMMIT FAILURE
SELECT shardid, shardstate FROM pg_dist_shard_placement where shardstate != 1 AND
shardid in ( SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'vacuum_test'::regclass);
shardid | shardstate
---------+------------
102093 | 3
shardid | shardstate
----------+------------
12000000 | 3
(1 row)
UPDATE pg_dist_shard_placement SET shardstate = 1

View File

@ -9,6 +9,7 @@ SELECT substring(:'server_version', '\d+')::int > 10 AS version_above_ten;
t
(1 row)
SET citus.next_shard_id TO 12000000;
SELECT citus.mitmproxy('conn.allow()');
mitmproxy
-----------
@ -64,9 +65,9 @@ ANALYZE vacuum_test;
-- show that we marked as INVALID on COMMIT FAILURE
SELECT shardid, shardstate FROM pg_dist_shard_placement where shardstate != 1 AND
shardid in ( SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'vacuum_test'::regclass);
shardid | shardstate
---------+------------
102093 | 3
shardid | shardstate
----------+------------
12000000 | 3
(1 row)
UPDATE pg_dist_shard_placement SET shardstate = 1

View File

@ -1340,10 +1340,6 @@ CREATE TABLE reference_table(id int PRIMARY KEY);
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "reference_table_pkey" for table "reference_table"
DEBUG: building index "reference_table_pkey" on table "reference_table" serially
SELECT create_reference_table('reference_table');
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57638
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57637
create_reference_table
------------------------
@ -1353,10 +1349,6 @@ CREATE TABLE distributed_table(id int PRIMARY KEY, value_1 int);
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "distributed_table_pkey" for table "distributed_table"
DEBUG: building index "distributed_table_pkey" on table "distributed_table" serially
SELECT create_distributed_table('distributed_table', 'id');
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57638
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57637
create_distributed_table
--------------------------

View File

@ -1340,10 +1340,6 @@ CREATE TABLE reference_table(id int PRIMARY KEY);
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "reference_table_pkey" for table "reference_table"
DEBUG: building index "reference_table_pkey" on table "reference_table"
SELECT create_reference_table('reference_table');
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57638
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57637
create_reference_table
------------------------
@ -1353,10 +1349,6 @@ CREATE TABLE distributed_table(id int PRIMARY KEY, value_1 int);
DEBUG: CREATE TABLE / PRIMARY KEY will create implicit index "distributed_table_pkey" for table "distributed_table"
DEBUG: building index "distributed_table_pkey" on table "distributed_table"
SELECT create_distributed_table('distributed_table', 'id');
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57638
DEBUG: schema "test_fkey_to_ref_in_tx" already exists, skipping
DETAIL: NOTICE from localhost:57637
create_distributed_table
--------------------------

View File

@ -41,16 +41,16 @@ step s3-view-worker:
query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname
SELECT worker_apply_shard_ddl_command (102081, 'public', '
SELECT worker_apply_shard_ddl_command (102141, 'public', '
ALTER TABLE test_table ADD COLUMN x INT;
')localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
SELECT worker_apply_shard_ddl_command (102080, 'public', '
SELECT worker_apply_shard_ddl_command (102140, 'public', '
ALTER TABLE test_table ADD COLUMN x INT;
')localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
SELECT worker_apply_shard_ddl_command (102079, 'public', '
SELECT worker_apply_shard_ddl_command (102139, 'public', '
ALTER TABLE test_table ADD COLUMN x INT;
')localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
SELECT worker_apply_shard_ddl_command (102078, 'public', '
SELECT worker_apply_shard_ddl_command (102138, 'public', '
ALTER TABLE test_table ADD COLUMN x INT;
')localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
step s2-rollback:
@ -104,7 +104,7 @@ step s3-view-worker:
query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname
INSERT INTO public.test_table_102084 (column1, column2) VALUES (100, 100)localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
INSERT INTO public.test_table_102144 (column1, column2) VALUES (100, 100)localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
step s2-rollback:
ROLLBACK;
@ -118,12 +118,12 @@ step s3-rollback:
starting permutation: s1-cache-connections s1-begin s2-begin s3-begin s1-select s2-sleep s2-view-dist s3-view-worker s2-rollback s1-commit s3-rollback
create_distributed_table
step s1-cache-connections:
SET citus.max_cached_conns_per_worker TO 4;
SET citus.force_max_query_parallelization TO on;
UPDATE test_table SET column2 = 0;
step s1-begin:
BEGIN;
@ -159,10 +159,10 @@ step s3-view-worker:
query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname
COPY (SELECT count(*) AS count FROM test_table_102089 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
COPY (SELECT count(*) AS count FROM test_table_102088 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
COPY (SELECT count(*) AS count FROM test_table_102087 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
COPY (SELECT count(*) AS count FROM test_table_102086 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
COPY (SELECT count(*) AS count FROM test_table_102149 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
COPY (SELECT count(*) AS count FROM test_table_102148 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
COPY (SELECT count(*) AS count FROM test_table_102147 test_table WHERE true) TO STDOUTlocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
COPY (SELECT count(*) AS count FROM test_table_102146 test_table WHERE true) TO STDOUTlocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
step s2-rollback:
ROLLBACK;
@ -217,7 +217,7 @@ step s3-view-worker:
query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname
SELECT count(*) AS count FROM public.test_table_102091 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)localhost 57638 0 idle Client ClientRead postgres regression
SELECT count(*) AS count FROM public.test_table_102151 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)localhost 57638 0 idle Client ClientRead postgres regression
step s2-rollback:
ROLLBACK;

View File

@ -41,16 +41,16 @@ step s3-view-worker:
query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname
SELECT worker_apply_shard_ddl_command (102081, 'public', '
SELECT worker_apply_shard_ddl_command (102141, 'public', '
ALTER TABLE test_table ADD COLUMN x INT;
')localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
SELECT worker_apply_shard_ddl_command (102080, 'public', '
SELECT worker_apply_shard_ddl_command (102140, 'public', '
ALTER TABLE test_table ADD COLUMN x INT;
')localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
SELECT worker_apply_shard_ddl_command (102079, 'public', '
SELECT worker_apply_shard_ddl_command (102139, 'public', '
ALTER TABLE test_table ADD COLUMN x INT;
')localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
SELECT worker_apply_shard_ddl_command (102078, 'public', '
SELECT worker_apply_shard_ddl_command (102138, 'public', '
ALTER TABLE test_table ADD COLUMN x INT;
')localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
step s2-rollback:
@ -104,7 +104,7 @@ step s3-view-worker:
query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname
INSERT INTO public.test_table_102084 (column1, column2) VALUES (100, 100)localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
INSERT INTO public.test_table_102144 (column1, column2) VALUES (100, 100)localhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
step s2-rollback:
ROLLBACK;
@ -159,10 +159,10 @@ step s3-view-worker:
query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname
SELECT count(*) AS count FROM test_table_102089 test_table WHERE truelocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
SELECT count(*) AS count FROM test_table_102088 test_table WHERE truelocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
SELECT count(*) AS count FROM test_table_102087 test_table WHERE truelocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
SELECT count(*) AS count FROM test_table_102086 test_table WHERE truelocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
SELECT count(*) AS count FROM test_table_102149 test_table WHERE truelocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
SELECT count(*) AS count FROM test_table_102148 test_table WHERE truelocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
SELECT count(*) AS count FROM test_table_102147 test_table WHERE truelocalhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
SELECT count(*) AS count FROM test_table_102146 test_table WHERE truelocalhost 57637 coordinator_host57636 idle in transactionClient ClientRead postgres regression
step s2-rollback:
ROLLBACK;
@ -217,7 +217,7 @@ step s3-view-worker:
query query_hostname query_hostport master_query_host_namemaster_query_host_portstate wait_event_typewait_event usename datname
SELECT count(*) AS count FROM public.test_table_102091 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
SELECT count(*) AS count FROM public.test_table_102151 test_table WHERE (column1 OPERATOR(pg_catalog.=) 55)localhost 57638 coordinator_host57636 idle in transactionClient ClientRead postgres regression
step s2-rollback:
ROLLBACK;

View File

@ -77,7 +77,7 @@ step s1-get-current-transaction-id:
row
(0,174)
(0,229)
step s2-get-first-worker-active-transactions:
SELECT * FROM run_command_on_workers('SELECT row(initiator_node_identifier, transaction_number)
FROM
@ -88,7 +88,7 @@ step s2-get-first-worker-active-transactions:
nodename nodeport success result
localhost 57637 t (0,174)
localhost 57637 t (0,229)
step s1-commit:
COMMIT;

View File

@ -29,11 +29,11 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
177 176 f
232 231 f
transactionnumberwaitingtransactionnumbers
176
177 176
231
232 231
step s1-abort:
ABORT;
@ -77,14 +77,14 @@ step detector-dump-wait-edges:
waiting_transaction_numblocking_transaction_numblocking_transaction_waiting
181 180 f
182 180 f
182 181 t
236 235 f
237 235 f
237 236 t
transactionnumberwaitingtransactionnumbers
180
181 180
182 180,181
235
236 235
237 235,236
step s1-abort:
ABORT;

View File

@ -0,0 +1,933 @@
Parsed test spec with 4 sessions
starting permutation: s1-print-distributed-objects s1-begin s1-add-worker s2-public-schema s2-create-table s1-commit s2-print-distributed-objects
nodename nodeport isactive
localhost 57637 t
step s1-print-distributed-objects:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
SELECT master_remove_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
pg_identify_object_as_address
count
0
run_command_on_workers
(localhost,57637,t,0)
(localhost,57638,t,0)
master_remove_node
step s1-begin:
BEGIN;
step s1-add-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
step s2-public-schema:
SET search_path TO public;
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
step s1-commit:
COMMIT;
step s2-create-table: <... completed>
create_distributed_table
step s2-print-distributed-objects:
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
pg_identify_object_as_address
count
0
run_command_on_workers
(localhost,57637,t,0)
(localhost,57638,t,0)
master_remove_node
starting permutation: s1-print-distributed-objects s1-begin s2-begin s1-add-worker s2-public-schema s2-create-table s1-commit s2-commit s2-print-distributed-objects
nodename nodeport isactive
localhost 57637 t
step s1-print-distributed-objects:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
SELECT master_remove_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
pg_identify_object_as_address
count
0
run_command_on_workers
(localhost,57637,t,0)
(localhost,57638,t,0)
master_remove_node
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-add-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
step s2-public-schema:
SET search_path TO public;
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
step s1-commit:
COMMIT;
step s2-create-table: <... completed>
create_distributed_table
step s2-commit:
COMMIT;
step s2-print-distributed-objects:
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
pg_identify_object_as_address
count
0
run_command_on_workers
(localhost,57637,t,0)
(localhost,57638,t,0)
master_remove_node
starting permutation: s1-print-distributed-objects s1-begin s2-begin s2-public-schema s2-create-table s1-add-worker s2-commit s1-commit s2-print-distributed-objects
nodename nodeport isactive
localhost 57637 t
step s1-print-distributed-objects:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
SELECT master_remove_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
pg_identify_object_as_address
count
0
run_command_on_workers
(localhost,57637,t,0)
(localhost,57638,t,0)
master_remove_node
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s2-public-schema:
SET search_path TO public;
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
create_distributed_table
step s1-add-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
step s1-add-worker: <... completed>
nodename nodeport isactive
localhost 57638 t
step s1-commit:
COMMIT;
step s2-print-distributed-objects:
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
pg_identify_object_as_address
count
0
run_command_on_workers
(localhost,57637,t,0)
(localhost,57638,t,0)
master_remove_node
starting permutation: s1-print-distributed-objects s1-begin s1-add-worker s2-create-schema s2-create-table s1-commit s2-print-distributed-objects
nodename nodeport isactive
localhost 57637 t
step s1-print-distributed-objects:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
SELECT master_remove_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
pg_identify_object_as_address
count
0
run_command_on_workers
(localhost,57637,t,0)
(localhost,57638,t,0)
master_remove_node
step s1-begin:
BEGIN;
step s1-add-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
step s2-create-schema:
CREATE SCHEMA myschema;
SET search_path TO myschema;
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
step s1-commit:
COMMIT;
step s2-create-table: <... completed>
create_distributed_table
step s2-print-distributed-objects:
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
pg_identify_object_as_address
(schema,{myschema},{})
count
1
run_command_on_workers
(localhost,57637,t,1)
(localhost,57638,t,1)
master_remove_node
starting permutation: s1-print-distributed-objects s1-begin s2-begin s1-add-worker s2-create-schema s2-create-table s1-commit s2-commit s2-print-distributed-objects
nodename nodeport isactive
localhost 57637 t
step s1-print-distributed-objects:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
SELECT master_remove_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
pg_identify_object_as_address
count
0
run_command_on_workers
(localhost,57637,t,0)
(localhost,57638,t,0)
master_remove_node
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s1-add-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
step s2-create-schema:
CREATE SCHEMA myschema;
SET search_path TO myschema;
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
step s1-commit:
COMMIT;
step s2-create-table: <... completed>
create_distributed_table
step s2-commit:
COMMIT;
step s2-print-distributed-objects:
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
pg_identify_object_as_address
(schema,{myschema},{})
count
1
run_command_on_workers
(localhost,57637,t,1)
(localhost,57638,t,1)
master_remove_node
starting permutation: s1-print-distributed-objects s1-begin s2-begin s2-create-schema s2-create-table s1-add-worker s2-commit s1-commit s2-print-distributed-objects
nodename nodeport isactive
localhost 57637 t
step s1-print-distributed-objects:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
SELECT master_remove_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
pg_identify_object_as_address
count
0
run_command_on_workers
(localhost,57637,t,0)
(localhost,57638,t,0)
master_remove_node
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s2-create-schema:
CREATE SCHEMA myschema;
SET search_path TO myschema;
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
create_distributed_table
step s1-add-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
<waiting ...>
step s2-commit:
COMMIT;
step s1-add-worker: <... completed>
nodename nodeport isactive
localhost 57638 t
step s1-commit:
COMMIT;
step s2-print-distributed-objects:
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
pg_identify_object_as_address
(schema,{myschema},{})
count
1
run_command_on_workers
(localhost,57637,t,1)
(localhost,57638,t,1)
master_remove_node
starting permutation: s1-print-distributed-objects s2-create-schema s1-begin s2-begin s3-begin s1-add-worker s2-create-table s3-use-schema s3-create-table s1-commit s2-commit s3-commit s2-print-distributed-objects
nodename nodeport isactive
localhost 57637 t
step s1-print-distributed-objects:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
SELECT master_remove_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
pg_identify_object_as_address
count
0
run_command_on_workers
(localhost,57637,t,0)
(localhost,57638,t,0)
master_remove_node
step s2-create-schema:
CREATE SCHEMA myschema;
SET search_path TO myschema;
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s3-begin:
BEGIN;
step s1-add-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
step s3-use-schema:
SET search_path TO myschema;
step s3-create-table:
CREATE TABLE t2 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t2', 'a');
<waiting ...>
step s1-commit:
COMMIT;
step s2-create-table: <... completed>
create_distributed_table
step s2-commit:
COMMIT;
step s3-create-table: <... completed>
create_distributed_table
step s3-commit:
COMMIT;
step s2-print-distributed-objects:
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
pg_identify_object_as_address
(schema,{myschema},{})
count
1
run_command_on_workers
(localhost,57637,t,1)
(localhost,57638,t,1)
master_remove_node
starting permutation: s1-print-distributed-objects s2-create-schema s1-begin s2-begin s3-begin s4-begin s1-add-worker s2-create-table s3-use-schema s3-create-table s4-use-schema s4-create-table s1-commit s2-commit s3-commit s4-commit s2-print-distributed-objects
nodename nodeport isactive
localhost 57637 t
step s1-print-distributed-objects:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
SELECT master_remove_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
pg_identify_object_as_address
count
0
run_command_on_workers
(localhost,57637,t,0)
(localhost,57638,t,0)
master_remove_node
step s2-create-schema:
CREATE SCHEMA myschema;
SET search_path TO myschema;
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s3-begin:
BEGIN;
step s4-begin:
BEGIN;
step s1-add-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
step s3-use-schema:
SET search_path TO myschema;
step s3-create-table:
CREATE TABLE t2 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t2', 'a');
<waiting ...>
step s4-use-schema:
SET search_path TO myschema;
step s4-create-table:
CREATE TABLE t3 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t3', 'a');
<waiting ...>
step s1-commit:
COMMIT;
step s2-create-table: <... completed>
create_distributed_table
step s2-commit:
COMMIT;
step s3-create-table: <... completed>
create_distributed_table
step s4-create-table: <... completed>
create_distributed_table
step s3-commit:
COMMIT;
step s4-commit:
COMMIT;
step s2-print-distributed-objects:
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
pg_identify_object_as_address
(schema,{myschema},{})
count
1
run_command_on_workers
(localhost,57637,t,1)
(localhost,57638,t,1)
master_remove_node
starting permutation: s1-print-distributed-objects s1-add-worker s2-create-schema s2-begin s3-begin s3-use-schema s2-create-table s3-create-table s2-commit s3-commit s2-print-distributed-objects
nodename nodeport isactive
localhost 57637 t
step s1-print-distributed-objects:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
SELECT master_remove_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
pg_identify_object_as_address
count
0
run_command_on_workers
(localhost,57637,t,0)
(localhost,57638,t,0)
master_remove_node
step s1-add-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
step s2-create-schema:
CREATE SCHEMA myschema;
SET search_path TO myschema;
step s2-begin:
BEGIN;
step s3-begin:
BEGIN;
step s3-use-schema:
SET search_path TO myschema;
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
create_distributed_table
step s3-create-table:
CREATE TABLE t2 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t2', 'a');
<waiting ...>
step s2-commit:
COMMIT;
step s3-create-table: <... completed>
create_distributed_table
step s3-commit:
COMMIT;
step s2-print-distributed-objects:
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
pg_identify_object_as_address
(schema,{myschema},{})
count
1
run_command_on_workers
(localhost,57637,t,1)
(localhost,57638,t,1)
master_remove_node
starting permutation: s1-print-distributed-objects s1-begin s2-begin s4-begin s1-add-worker s2-create-schema s4-create-schema2 s2-create-table s4-create-table s1-commit s2-commit s4-commit s2-print-distributed-objects
nodename nodeport isactive
localhost 57637 t
step s1-print-distributed-objects:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
SELECT master_remove_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
pg_identify_object_as_address
count
0
run_command_on_workers
(localhost,57637,t,0)
(localhost,57638,t,0)
master_remove_node
step s1-begin:
BEGIN;
step s2-begin:
BEGIN;
step s4-begin:
BEGIN;
step s1-add-worker:
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
nodename nodeport isactive
localhost 57638 t
step s2-create-schema:
CREATE SCHEMA myschema;
SET search_path TO myschema;
step s4-create-schema2:
CREATE SCHEMA myschema2;
SET search_path TO myschema2;
step s2-create-table:
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
<waiting ...>
step s4-create-table:
CREATE TABLE t3 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t3', 'a');
<waiting ...>
step s1-commit:
COMMIT;
step s2-create-table: <... completed>
create_distributed_table
step s4-create-table: <... completed>
create_distributed_table
step s2-commit:
COMMIT;
step s4-commit:
COMMIT;
step s2-print-distributed-objects:
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
pg_identify_object_as_address
(schema,{myschema},{})
(schema,{myschema2},{})
count
1
run_command_on_workers
(localhost,57637,t,1)
(localhost,57638,t,1)
master_remove_node

View File

@ -16,7 +16,7 @@ step s1-commit:
COMMIT;
step s2-insert: <... completed>
error in steps s1-commit s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102353"
error in steps s1-commit s2-insert: ERROR: duplicate key value violates unique constraint "test_locking_a_key_102413"
step s2-commit:
COMMIT;

View File

@ -583,11 +583,11 @@ SELECT master_remove_node('localhost', 9999);
INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole)
VALUES ('localhost', 5000, :worker_1_group, 'primary');
ERROR: there cannot be two primary nodes in a group
CONTEXT: PL/pgSQL function citus.pg_dist_node_trigger_func() line 10 at RAISE
CONTEXT: PL/pgSQL function citus_internal.pg_dist_node_trigger_func() line 10 at RAISE
UPDATE pg_dist_node SET noderole = 'primary'
WHERE groupid = :worker_1_group AND nodeport = 9998;
ERROR: there cannot be two primary nodes in a group
CONTEXT: PL/pgSQL function citus.pg_dist_node_trigger_func() line 18 at RAISE
CONTEXT: PL/pgSQL function citus_internal.pg_dist_node_trigger_func() line 18 at RAISE
-- check that you can't manually add a primary to a non-default cluster
INSERT INTO pg_dist_node (nodename, nodeport, groupid, noderole, nodecluster)
VALUES ('localhost', 5000, 1000, 'primary', 'olap');

View File

@ -45,7 +45,7 @@ FROM pg_depend AS pgd,
WHERE pgd.refclassid = 'pg_extension'::regclass AND
pgd.refobjid = pge.oid AND
pge.extname = 'citus' AND
pgio.schema NOT IN ('pg_catalog', 'citus', 'test');
pgio.schema NOT IN ('pg_catalog', 'citus', 'citus_internal', 'test');
count
-------
0
@ -123,7 +123,7 @@ FROM pg_depend AS pgd,
WHERE pgd.refclassid = 'pg_extension'::regclass AND
pgd.refobjid = pge.oid AND
pge.extname = 'citus' AND
pgio.schema NOT IN ('pg_catalog', 'citus', 'test');
pgio.schema NOT IN ('pg_catalog', 'citus', 'citus_internal', 'test');
count
-------
0

View File

@ -245,10 +245,6 @@ DEBUG: Plan is router executable
CREATE TABLE company_employees (company_id int, employee_id int, manager_id int);
SELECT master_create_distributed_table('company_employees', 'company_id', 'hash');
DEBUG: schema "fast_path_router_select" already exists, skipping
DETAIL: NOTICE from localhost:57638
DEBUG: schema "fast_path_router_select" already exists, skipping
DETAIL: NOTICE from localhost:57637
master_create_distributed_table
---------------------------------

View File

@ -159,8 +159,8 @@ SELECT count(*) FROM mx_table;
5
(1 row)
-- master_add_node
SELECT 1 FROM master_add_node('localhost', 5432);
-- master_add_inactive_node
SELECT 1 FROM master_add_inactive_node('localhost', 5432);
ERROR: operation is not allowed on this node
HINT: Connect to the coordinator and run it again.
SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
@ -172,7 +172,7 @@ SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
-- master_remove_node
\c - - - :master_port
DROP INDEX mx_test_uniq_index;
SELECT 1 FROM master_add_node('localhost', 5432);
SELECT 1 FROM master_add_inactive_node('localhost', 5432);
?column?
----------
1

View File

@ -4,6 +4,7 @@ test: isolation_update_node_lock_writes
test: isolation_add_node_vs_reference_table_operations
test: isolation_create_table_vs_add_remove_node
test: isolation_master_update_node
test: isolation_ensure_dependency_activate_node
# tests that change node metadata should precede
# isolation_cluster_management such that tests

View File

@ -5,8 +5,8 @@
# create range distributed table to test behavior of DDL in concurrent operations
setup
{
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
SET citus.shard_replication_factor TO 1;
CREATE TABLE ddl_hash(id integer, data text);
@ -18,7 +18,7 @@ teardown
{
DROP TABLE IF EXISTS ddl_hash CASCADE;
SELECT citus.restore_isolation_tester_func();
SELECT citus_internal.restore_isolation_tester_func();
}
# session 1

View File

@ -5,8 +5,8 @@
# create range distributed table to test behavior of DELETE in concurrent operations
setup
{
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
SET citus.shard_replication_factor TO 1;
CREATE TABLE delete_hash(id integer, data text);
@ -18,7 +18,7 @@ teardown
{
DROP TABLE IF EXISTS delete_hash CASCADE;
SELECT citus.restore_isolation_tester_func();
SELECT citus_internal.restore_isolation_tester_func();
}
# session 1

View File

@ -1,7 +1,7 @@
setup
{
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
CREATE TABLE deadlock_detection_reference (user_id int UNIQUE, some_val int);
SELECT create_reference_table('deadlock_detection_reference');
@ -26,7 +26,7 @@ teardown
DROP TABLE local_deadlock_table;
DROP TABLE deadlock_detection_test_rep_2;
DROP TABLE deadlock_detection_reference;
SELECT citus.restore_isolation_tester_func();
SELECT citus_internal.restore_isolation_tester_func();
SET citus.shard_replication_factor = 1;
}

View File

@ -5,8 +5,8 @@
# create range distributed table to test behavior of DROP in concurrent operations
setup
{
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
SET citus.shard_replication_factor TO 1;
CREATE TABLE drop_hash(id integer, data text);
@ -18,7 +18,7 @@ teardown
{
DROP TABLE IF EXISTS drop_hash CASCADE;
SELECT citus.restore_isolation_tester_func();
SELECT citus_internal.restore_isolation_tester_func();
}
# session 1

View File

@ -0,0 +1,180 @@
# the test expects to have zero nodes in pg_dist_node at the beginning
# add single one of the nodes for the purpose of the test
setup
{
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57637);
}
# ensure that both nodes exists for the remaining of the isolation tests
teardown
{
-- schema drops are not cascaded
SELECT run_command_on_workers($$DROP SCHEMA IF EXISTS myschema CASCADE;$$);
DROP SCHEMA IF EXISTS myschema CASCADE;
SELECT run_command_on_workers($$DROP SCHEMA IF EXISTS myschema2 CASCADE;$$);
DROP SCHEMA IF EXISTS myschema2 CASCADE;
RESET search_path;
DROP TABLE IF EXISTS t1 CASCADE;
DROP TABLE IF EXISTS t2 CASCADE;
DROP TABLE IF EXISTS t3 CASCADE;
SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node;
}
session "s1"
step "s1-begin"
{
BEGIN;
}
step "s1-add-worker"
{
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
}
step "s1-commit"
{
COMMIT;
}
# printing in session 1 adds the worker node, this makes we are sure we count the objects
# on that node as well. After counting objects is done we remove the node again.
step "s1-print-distributed-objects"
{
SELECT nodename, nodeport, isactive FROM master_add_node('localhost', 57638);
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
SELECT master_remove_node('localhost', 57638);
}
session "s2"
step "s2-public-schema"
{
SET search_path TO public;
}
step "s2-create-schema"
{
CREATE SCHEMA myschema;
SET search_path TO myschema;
}
step "s2-create-table"
{
CREATE TABLE t1 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t1', 'a');
}
step "s2-begin"
{
BEGIN;
}
step "s2-commit"
{
COMMIT;
}
# prints from session 2 are run at the end when the worker has already been added by the
# test
step "s2-print-distributed-objects"
{
-- print an overview of all distributed objects
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM citus.pg_dist_object;
-- print if the schema has been created
SELECT count(*) FROM pg_namespace where nspname = 'myschema';
SELECT run_command_on_workers($$SELECT count(*) FROM pg_namespace where nspname = 'myschema';$$);
}
session "s3"
step "s3-public-schema"
{
SET search_path TO public;
}
step "s3-use-schema"
{
SET search_path TO myschema;
}
step "s3-create-table"
{
CREATE TABLE t2 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t2', 'a');
}
step "s3-begin"
{
BEGIN;
}
step "s3-commit"
{
COMMIT;
}
session "s4"
step "s4-public-schema"
{
SET search_path TO public;
}
step "s4-use-schema"
{
SET search_path TO myschema;
}
step "s4-create-schema2"
{
CREATE SCHEMA myschema2;
SET search_path TO myschema2;
}
step "s4-create-table"
{
CREATE TABLE t3 (a int, b int);
-- session needs to have replication factor set to 1, can't do in setup
SET citus.shard_replication_factor TO 1;
SELECT create_distributed_table('t3', 'a');
}
step "s4-begin"
{
BEGIN;
}
step "s4-commit"
{
COMMIT;
}
# schema only tests
permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-public-schema" "s2-create-table" "s1-commit" "s2-print-distributed-objects"
permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s1-add-worker" "s2-public-schema" "s2-create-table" "s1-commit" "s2-commit" "s2-print-distributed-objects"
permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s2-public-schema" "s2-create-table" "s1-add-worker" "s2-commit" "s1-commit" "s2-print-distributed-objects"
permutation "s1-print-distributed-objects" "s1-begin" "s1-add-worker" "s2-create-schema" "s2-create-table" "s1-commit" "s2-print-distributed-objects"
permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s1-add-worker" "s2-create-schema" "s2-create-table" "s1-commit" "s2-commit" "s2-print-distributed-objects"
permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s2-create-schema" "s2-create-table" "s1-add-worker" "s2-commit" "s1-commit" "s2-print-distributed-objects"
# concurrency tests with multi schema distribution
permutation "s1-print-distributed-objects" "s2-create-schema" "s1-begin" "s2-begin" "s3-begin" "s1-add-worker" "s2-create-table" "s3-use-schema" "s3-create-table" "s1-commit" "s2-commit" "s3-commit" "s2-print-distributed-objects"
permutation "s1-print-distributed-objects" "s2-create-schema" "s1-begin" "s2-begin" "s3-begin" "s4-begin" "s1-add-worker" "s2-create-table" "s3-use-schema" "s3-create-table" "s4-use-schema" "s4-create-table" "s1-commit" "s2-commit" "s3-commit" "s4-commit" "s2-print-distributed-objects"
permutation "s1-print-distributed-objects" "s1-add-worker" "s2-create-schema" "s2-begin" "s3-begin" "s3-use-schema" "s2-create-table" "s3-create-table" "s2-commit" "s3-commit" "s2-print-distributed-objects"
permutation "s1-print-distributed-objects" "s1-begin" "s2-begin" "s4-begin" "s1-add-worker" "s2-create-schema" "s4-create-schema2" "s2-create-table" "s4-create-table" "s1-commit" "s2-commit" "s4-commit" "s2-print-distributed-objects"

View File

@ -17,8 +17,8 @@ setup
LANGUAGE C STRICT VOLATILE
AS 'citus', $$stop_session_level_connection_to_node$$;
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
-- start_metadata_sync_to_node can not be run inside a transaction block
-- following is a workaround to overcome that
@ -48,7 +48,7 @@ teardown
{
DROP TABLE ref_table;
DROP TABLE tt1;
SELECT citus.restore_isolation_tester_func();
SELECT citus_internal.restore_isolation_tester_func();
}
session "s1"

View File

@ -1,7 +1,7 @@
setup
{
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
SET citus.shard_replication_factor to 1;
SET citus.shard_count to 32;
@ -34,7 +34,7 @@ teardown
{
DROP TABLE users_test_table;
DROP TABLE events_test_table;
SELECT citus.restore_isolation_tester_func();
SELECT citus_internal.restore_isolation_tester_func();
SET citus.shard_count to 4;
}

View File

@ -1,7 +1,7 @@
setup
{
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
-- start_metadata_sync_to_node can not be run inside a transaction block.
-- Following is a workaround to overcome that. Port numbers are hard coded
@ -32,7 +32,7 @@ setup
teardown
{
DROP TABLE ref_table_1, ref_table_2, ref_table_3;
SELECT citus.restore_isolation_tester_func();
SELECT citus_internal.restore_isolation_tester_func();
}
session "s1"

View File

@ -17,8 +17,8 @@ setup
LANGUAGE C STRICT VOLATILE
AS 'citus', $$stop_session_level_connection_to_node$$;
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
-- start_metadata_sync_to_node can not be run inside a transaction block
-- following is a workaround to overcome that
@ -42,7 +42,7 @@ setup
teardown
{
DROP TABLE ref_table;
SELECT citus.restore_isolation_tester_func();
SELECT citus_internal.restore_isolation_tester_func();
}
session "s1"

View File

@ -5,8 +5,8 @@
setup
{
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
CREATE TABLE test_locking (a int unique);
SELECT create_distributed_table('test_locking', 'a');
@ -15,7 +15,7 @@ setup
teardown
{
DROP TABLE test_locking;
SELECT citus.restore_isolation_tester_func();
SELECT citus_internal.restore_isolation_tester_func();
}
session "s1"

View File

@ -1,8 +1,8 @@
setup
{
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
SET citus.shard_replication_factor to 1;
@ -27,7 +27,7 @@ teardown
DROP TABLE test_table_2_rf1;
DROP TABLE ref_table;
SELECT citus.restore_isolation_tester_func();
SELECT citus_internal.restore_isolation_tester_func();
}
session "s1"

View File

@ -5,8 +5,8 @@
# create range distributed table to test behavior of TRUNCATE in concurrent operations
setup
{
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
SET citus.shard_replication_factor TO 1;
CREATE TABLE truncate_append(id integer, data text);
@ -18,7 +18,7 @@ teardown
{
DROP TABLE IF EXISTS truncate_append CASCADE;
SELECT citus.restore_isolation_tester_func();
SELECT citus_internal.restore_isolation_tester_func();
}
# session 1

View File

@ -5,8 +5,8 @@
# create range distributed table to test behavior of UPDATE in concurrent operations
setup
{
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
SET citus.shard_replication_factor TO 1;
CREATE TABLE update_hash(id integer, data text);
@ -18,7 +18,7 @@ teardown
{
DROP TABLE IF EXISTS update_hash CASCADE;
SELECT citus.restore_isolation_tester_func();
SELECT citus_internal.restore_isolation_tester_func();
}
# session 1

View File

@ -5,8 +5,8 @@
# create range distributed table to test behavior of UPSERT in concurrent operations
setup
{
SELECT citus.replace_isolation_tester_func();
SELECT citus.refresh_isolation_tester_prepared_statement();
SELECT citus_internal.replace_isolation_tester_func();
SELECT citus_internal.refresh_isolation_tester_prepared_statement();
SET citus.shard_replication_factor TO 1;
CREATE TABLE upsert_hash(id integer PRIMARY KEY, data text);
@ -18,7 +18,7 @@ teardown
{
DROP TABLE IF EXISTS upsert_hash CASCADE;
SELECT citus.restore_isolation_tester_func();
SELECT citus_internal.restore_isolation_tester_func();
}
# session 1

View File

@ -117,6 +117,10 @@ SELECT citus.mitmproxy('conn.allow()');
SELECT recover_prepared_transactions();
DROP TABLE test_table ;
-- since we want to interrupt the schema creation again we need to drop and recreate
-- for citus to redistribute the dependency
DROP SCHEMA create_distributed_table_non_empty_failure;
CREATE SCHEMA create_distributed_table_non_empty_failure;
CREATE TABLE test_table(id int, value_1 int);
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
@ -202,6 +206,8 @@ SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables W
SELECT citus.mitmproxy('conn.allow()');
DROP TABLE colocated_table;
DROP TABLE test_table;
DROP SCHEMA create_distributed_table_non_empty_failure;
CREATE SCHEMA create_distributed_table_non_empty_failure;
CREATE TABLE test_table(id int, value_1 int);
INSERT INTO test_table VALUES (1,1),(2,2),(3,3),(4,4);
SET citus.multi_shard_commit_protocol TO '1pc';

View File

@ -75,6 +75,8 @@ SET client_min_messages TO NOTICE;
SELECT citus.mitmproxy('conn.allow()');
DROP TABLE ref_table;
DROP SCHEMA failure_reference_table;
CREATE SCHEMA failure_reference_table;
CREATE TABLE ref_table(id int);
INSERT INTO ref_table VALUES(1),(2),(3);

View File

@ -104,7 +104,12 @@ SELECT citus.mitmproxy('conn.allow()');
SELECT count(*) FROM pg_dist_shard;
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'failure_create_table' and table_name LIKE 'test_table%' ORDER BY 1$$);
-- drop tables and schema and recreate to start from a non-distributed schema again
DROP TABLE temp_table;
DROP TABLE test_table;
DROP SCHEMA failure_create_table;
CREATE SCHEMA failure_create_table;
CREATE TABLE test_table(id int, value_1 int);
-- Test inside transaction
-- Kill connection before sending query to the worker
@ -143,7 +148,10 @@ SELECT citus.mitmproxy('conn.allow()');
SELECT count(*) FROM pg_dist_shard;
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'failure_create_table' and table_name LIKE 'test_table%' ORDER BY 1$$);
-- drop tables and schema and recreate to start from a non-distributed schema again
DROP TABLE test_table;
DROP SCHEMA failure_create_table;
CREATE SCHEMA failure_create_table;
CREATE TABLE test_table(id int, value_1 int);
-- Test inside transaction and with 1PC
@ -196,14 +204,16 @@ SELECT count(*) FROM pg_dist_shard;
SELECT run_command_on_workers($$SELECT count(*) FROM information_schema.tables WHERE table_schema = 'failure_create_table' and table_name LIKE 'test_table%' ORDER BY 1$$);
DROP TABLE test_table;
DROP SCHEMA failure_create_table;
CREATE SCHEMA failure_create_table;
-- Test master_create_worker_shards with 2pc
SET citus.multi_shard_commit_protocol TO "2pc";
CREATE TABLE test_table_2(id int, value_1 int);
SELECT master_create_distributed_table('test_table_2', 'id', 'hash');
-- Kill connection before sending query to the worker
SELECT citus.mitmproxy('conn.kill()');
-- Kill connection before sending query to the worker
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
SELECT master_create_worker_shards('test_table_2', 4, 2);
SELECT count(*) FROM pg_dist_shard;

View File

@ -6,6 +6,8 @@
SHOW server_version \gset
SELECT substring(:'server_version', '\d+')::int > 10 AS version_above_ten;
SET citus.next_shard_id TO 12000000;
SELECT citus.mitmproxy('conn.allow()');
SET citus.shard_count = 1;

View File

@ -46,7 +46,7 @@ FROM pg_depend AS pgd,
WHERE pgd.refclassid = 'pg_extension'::regclass AND
pgd.refobjid = pge.oid AND
pge.extname = 'citus' AND
pgio.schema NOT IN ('pg_catalog', 'citus', 'test');
pgio.schema NOT IN ('pg_catalog', 'citus', 'citus_internal', 'test');
-- DROP EXTENSION pre-created by the regression suite
@ -120,7 +120,7 @@ FROM pg_depend AS pgd,
WHERE pgd.refclassid = 'pg_extension'::regclass AND
pgd.refobjid = pge.oid AND
pge.extname = 'citus' AND
pgio.schema NOT IN ('pg_catalog', 'citus', 'test');
pgio.schema NOT IN ('pg_catalog', 'citus', 'citus_internal', 'test');
-- see incompatible version errors out
RESET citus.enable_version_checks;

View File

@ -100,15 +100,15 @@ SELECT count(*) FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement WHERE lo
SELECT master_apply_delete_command('DELETE FROM mx_table');
SELECT count(*) FROM mx_table;
-- master_add_node
-- master_add_inactive_node
SELECT 1 FROM master_add_node('localhost', 5432);
SELECT 1 FROM master_add_inactive_node('localhost', 5432);
SELECT count(1) FROM pg_dist_node WHERE nodename='localhost' AND nodeport=5432;
-- master_remove_node
\c - - - :master_port
DROP INDEX mx_test_uniq_index;
SELECT 1 FROM master_add_node('localhost', 5432);
SELECT 1 FROM master_add_inactive_node('localhost', 5432);
\c - - - :worker_1_port
SELECT master_remove_node('localhost', 5432);