mirror of https://github.com/citusdata/citus.git
Adds citus_pause_node udf (#7089)
DESCRIPTION: Introduces the citus_pause_node UDF, which enables pausing a node by node_id. citus_pause_node takes a node_id parameter, fetches all the shards on that node, and takes an AccessExclusiveLock on each of those shards. While this lock is held, inserts into those shards are blocked until the citus_pause_node transaction is closed. --------- Co-authored-by: Hanefi Onaldi <Hanefi.Onaldi@microsoft.com>
parent f4075603a9
commit 713d8ebb30
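Usage sketch (not part of the commit; a minimal illustration assuming a worker node with id 2 in pg_dist_node, calling the UDF added below):

    BEGIN;
    -- AccessExclusiveLock is taken on every shard placement on node 2;
    -- writes to those shards block until this transaction ends.
    SELECT pg_catalog.citus_pause_node_within_txn(2);
    -- ... perform maintenance while the node is paused ...
    COMMIT;  -- the locks are released here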
@@ -9,7 +9,6 @@
 #include "funcapi.h"
 #include "utils/plancache.h"
-
 
 #include "access/genam.h"
 #include "access/heapam.h"
 #include "access/htup.h"
@@ -102,8 +101,8 @@ static HeapTuple GetNodeByNodeId(int32 nodeId);
 static int32 GetNextGroupId(void);
 static int GetNextNodeId(void);
 static void InsertPlaceholderCoordinatorRecord(void);
-static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
-                          *nodeMetadata);
+static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport,
+                          NodeMetadata *nodeMetadata);
 static void DeleteNodeRow(char *nodename, int32 nodeport);
 static void BlockDistributedQueriesOnMetadataNodes(void);
 static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
@@ -134,6 +133,13 @@ static void MarkNodesNotSyncedInLoopBackConnection(MetadataSyncContext *context,
 static void EnsureParentSessionHasExclusiveLockOnPgDistNode(pid_t parentSessionPid);
 static void SetNodeMetadata(MetadataSyncContext *context, bool localOnly);
 static void EnsureTransactionalMetadataSyncMode(void);
+static void LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE
+                                            lockMode);
+static BackgroundWorkerHandle * CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown);
+static BackgroundWorkerHandle * LockPlacementsWithBackgroundWorkersInPrimaryNode(
+    WorkerNode *workerNode, bool force, int32 lock_cooldown);
+
+/* Function definitions go here */
 
 /* declarations for dynamic loading */
 PG_FUNCTION_INFO_V1(citus_set_coordinator_host);
@@ -152,6 +158,7 @@ PG_FUNCTION_INFO_V1(master_disable_node);
 PG_FUNCTION_INFO_V1(citus_activate_node);
 PG_FUNCTION_INFO_V1(master_activate_node);
 PG_FUNCTION_INFO_V1(citus_update_node);
+PG_FUNCTION_INFO_V1(citus_pause_node_within_txn);
 PG_FUNCTION_INFO_V1(master_update_node);
 PG_FUNCTION_INFO_V1(get_shard_id_for_distribution_column);
 PG_FUNCTION_INFO_V1(citus_nodename_for_nodeid);
@@ -160,7 +167,6 @@ PG_FUNCTION_INFO_V1(citus_coordinator_nodeid);
 PG_FUNCTION_INFO_V1(citus_is_coordinator);
 PG_FUNCTION_INFO_V1(citus_internal_mark_node_not_synced);
-
 
 /*
  * DefaultNodeMetadata creates a NodeMetadata struct with the fields set to
  * sane defaults, e.g. nodeRack = WORKER_DEFAULT_RACK.
@@ -544,7 +550,8 @@ citus_disable_node(PG_FUNCTION_ARGS)
                         "metadata is not allowed"),
                  errhint("You can force disabling node, SELECT "
                          "citus_disable_node('%s', %d, "
-                         "synchronous:=true);", workerNode->workerName,
+                         "synchronous:=true);",
+                         workerNode->workerName,
                          nodePort),
                  errdetail("Citus uses the first worker node in the "
                            "metadata for certain internal operations when "
@@ -693,8 +700,7 @@ citus_set_node_property(PG_FUNCTION_ARGS)
     else
     {
         ereport(ERROR, (errmsg(
-                            "only the 'shouldhaveshards' property can be set using this function"
-                            )));
+                            "only the 'shouldhaveshards' property can be set using this function")));
     }
 
     TransactionModifiedNodeMetadata = true;
@@ -1160,6 +1166,100 @@ ActivateNodeList(MetadataSyncContext *context)
 }
 
 
+/*
+ * Acquires shard metadata locks on all shards residing in the given worker node
+ *
+ * TODO: This function is not compatible with query from any node feature.
+ * To ensure proper behavior, it is essential to acquire locks on placements across all nodes
+ * rather than limiting it to just the coordinator (or the specific node from which this function is called)
+ */
+void
+LockShardsInWorkerPlacementList(WorkerNode *workerNode, LOCKMODE lockMode)
+{
+    List *placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId);
+    LockShardsInPlacementListMetadata(placementList, lockMode);
+}
+
+
+/*
+ * This function is used to start a background worker to kill backends holding conflicting
+ * locks with this backend. It returns NULL if the background worker could not be started.
+ */
+BackgroundWorkerHandle *
+CheckBackgroundWorkerToObtainLocks(int32 lock_cooldown)
+{
+    BackgroundWorkerHandle *handle = StartLockAcquireHelperBackgroundWorker(MyProcPid,
+                                                                            lock_cooldown);
+    if (!handle)
+    {
+        /*
+         * We failed to start a background worker, which probably means that we exceeded
+         * max_worker_processes, and this is unlikely to be resolved by retrying. We do not want
+         * to repeatedly throw an error because if citus_update_node is called to complete a
+         * failover then finishing is the only way to bring the cluster back up. Therefore we
+         * give up on killing other backends and simply wait for the lock. We do set
+         * lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock.
+         */
+        SetLockTimeoutLocally(lock_cooldown);
+        ereport(WARNING, (errmsg(
+                              "could not start background worker to kill backends with conflicting"
+                              " locks to force the update. Degrading to acquiring locks "
+                              "with a lock time out."),
+                          errhint(
+                              "Increasing max_worker_processes might help.")));
+    }
+    return handle;
+}
+
+
+/*
+ * This function is used to lock shards in a primary node.
+ * If force is true, we start a background worker to kill backends holding
+ * conflicting locks with this backend.
+ *
+ * If the node is a primary node we block reads and writes.
+ *
+ * This lock has two purposes:
+ *
+ * - Ensure buggy code in Citus doesn't cause failures when the
+ *   nodename/nodeport of a node changes mid-query
+ *
+ * - Provide fencing during failover, after this function returns all
+ *   connections will use the new node location.
+ *
+ * Drawback:
+ *
+ * - This function blocks until all previous queries have finished. This
+ *   means that long-running queries will prevent failover.
+ *
+ *   In case of node failure said long-running queries will fail in the end
+ *   anyway as they will be unable to commit successfully on the failed
+ *   machine. To cause quick failure of these queries use force => true
+ *   during the invocation of citus_update_node to terminate conflicting
+ *   backends proactively.
+ *
+ * It might be worth blocking reads to a secondary for the same reasons,
+ * though we currently only query secondaries on follower clusters
+ * where these locks will have no effect.
+ */
+BackgroundWorkerHandle *
+LockPlacementsWithBackgroundWorkersInPrimaryNode(WorkerNode *workerNode, bool force, int32
+                                                 lock_cooldown)
+{
+    BackgroundWorkerHandle *handle = NULL;
+
+    if (NodeIsPrimary(workerNode))
+    {
+        if (force)
+        {
+            handle = CheckBackgroundWorkerToObtainLocks(lock_cooldown);
+        }
+        LockShardsInWorkerPlacementList(workerNode, AccessExclusiveLock);
+    }
+    return handle;
+}
+
+
 /*
  * citus_update_node moves the requested node to a different nodename and nodeport. It
  * locks to ensure no queries are running concurrently; and is intended for customers who
@@ -1188,8 +1288,6 @@ citus_update_node(PG_FUNCTION_ARGS)
     int32 lock_cooldown = PG_GETARG_INT32(4);
 
     char *newNodeNameString = text_to_cstring(newNodeName);
-    List *placementList = NIL;
-    BackgroundWorkerHandle *handle = NULL;
 
     WorkerNode *workerNodeWithSameAddress = FindWorkerNodeAnyCluster(newNodeNameString,
                                                                      newNodePort);
@@ -1226,64 +1324,9 @@ citus_update_node(PG_FUNCTION_ARGS)
         EnsureTransactionalMetadataSyncMode();
     }
 
-    /*
-     * If the node is a primary node we block reads and writes.
-     *
-     * This lock has two purposes:
-     *
-     * - Ensure buggy code in Citus doesn't cause failures when the
-     *   nodename/nodeport of a node changes mid-query
-     *
-     * - Provide fencing during failover, after this function returns all
-     *   connections will use the new node location.
-     *
-     * Drawback:
-     *
-     * - This function blocks until all previous queries have finished. This
-     *   means that long-running queries will prevent failover.
-     *
-     *   In case of node failure said long-running queries will fail in the end
-     *   anyway as they will be unable to commit successfully on the failed
-     *   machine. To cause quick failure of these queries use force => true
-     *   during the invocation of citus_update_node to terminate conflicting
-     *   backends proactively.
-     *
-     * It might be worth blocking reads to a secondary for the same reasons,
-     * though we currently only query secondaries on follower clusters
-     * where these locks will have no effect.
-     */
-    if (NodeIsPrimary(workerNode))
-    {
-        /*
-         * before acquiring the locks check if we want a background worker to help us to
-         * aggressively obtain the locks.
-         */
-        if (force)
-        {
-            handle = StartLockAcquireHelperBackgroundWorker(MyProcPid, lock_cooldown);
-            if (!handle)
-            {
-                /*
-                 * We failed to start a background worker, which probably means that we exceeded
-                 * max_worker_processes, and this is unlikely to be resolved by retrying. We do not want
-                 * to repeatedly throw an error because if citus_update_node is called to complete a
-                 * failover then finishing is the only way to bring the cluster back up. Therefore we
-                 * give up on killing other backends and simply wait for the lock. We do set
-                 * lock_timeout to lock_cooldown, because we don't want to wait forever to get a lock.
-                 */
-                SetLockTimeoutLocally(lock_cooldown);
-                ereport(WARNING, (errmsg(
-                                      "could not start background worker to kill backends with conflicting"
-                                      " locks to force the update. Degrading to acquiring locks "
-                                      "with a lock time out."),
-                                  errhint(
-                                      "Increasing max_worker_processes might help.")));
-            }
-        }
-
-        placementList = AllShardPlacementsOnNodeGroup(workerNode->groupId);
-        LockShardsInPlacementListMetadata(placementList, AccessExclusiveLock);
-    }
+    BackgroundWorkerHandle *handle = LockPlacementsWithBackgroundWorkersInPrimaryNode(
+        workerNode, force,
+        lock_cooldown);
 
     /*
      * if we have planned statements such as prepared statements, we should clear the cache so that
@@ -1330,6 +1373,34 @@ citus_update_node(PG_FUNCTION_ARGS)
 }
 
 
+/*
+ * This function is designed to obtain locks for all the shards in a worker placement list.
+ * Once the transaction is committed, the acquired locks will be automatically released.
+ * Therefore, it is essential to invoke this function within a transaction.
+ * This function proves beneficial when there is a need to temporarily disable writes to a specific node within a transaction.
+ */
+Datum
+citus_pause_node_within_txn(PG_FUNCTION_ARGS)
+{
+    CheckCitusVersion(ERROR);
+
+    int32 nodeId = PG_GETARG_INT32(0);
+    bool force = PG_GETARG_BOOL(1);
+    int32 lock_cooldown = PG_GETARG_INT32(2);
+
+    WorkerNode *workerNode = FindNodeAnyClusterByNodeId(nodeId);
+    if (workerNode == NULL)
+    {
+        ereport(ERROR, (errcode(ERRCODE_NO_DATA_FOUND),
+                        errmsg("node %u not found", nodeId)));
+    }
+
+    LockPlacementsWithBackgroundWorkersInPrimaryNode(workerNode, force, lock_cooldown);
+
+    PG_RETURN_VOID();
+}
+
+
 /*
  * master_update_node is a wrapper function for old UDF name.
  */
@@ -1947,7 +2018,8 @@ ErrorIfNodeContainsNonRemovablePlacements(WorkerNode *workerNode)
         ereport(ERROR, (errmsg("cannot remove or disable the node "
                                "%s:%d because because it contains "
                                "the only shard placement for "
-                               "shard " UINT64_FORMAT, workerNode->workerName,
+                               "shard " UINT64_FORMAT,
+                               workerNode->workerName,
                                workerNode->workerPort, placement->shardId),
                         errdetail("One of the table(s) that prevents the operation "
                                   "complete successfully is %s",
@@ -2499,7 +2571,8 @@ ErrorIfCoordinatorMetadataSetFalse(WorkerNode *workerNode, Datum value, char *fi
     if (!valueBool && workerNode->groupId == COORDINATOR_GROUP_ID)
     {
         ereport(ERROR, (errmsg("cannot change \"%s\" field of the "
-                               "coordinator node", field)));
+                               "coordinator node",
+                               field)));
     }
 }
 
@@ -2,6 +2,7 @@
 
 -- bump version to 12.1-1
 
+#include "udfs/citus_pause_node_within_txn/12.1-1.sql"
 #include "udfs/citus_prepare_pg_upgrade/12.1-1.sql"
 #include "udfs/citus_finish_pg_upgrade/12.1-1.sql"
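The new script is wired into the 12.0-1 to 12.1-1 upgrade path above, so it runs as part of a regular extension update (standard PostgreSQL extension mechanics; shown as an illustration, not part of the commit):

    ALTER EXTENSION citus UPDATE TO '12.1-1';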
@@ -1,5 +1,5 @@
 -- citus--12.1-1--12.0-1
+DROP FUNCTION pg_catalog.citus_pause_node_within_txn(int,bool,int);
 -- we have modified the relevant upgrade script to include any_value changes
 -- we don't need to upgrade this downgrade path for any_value changes
 -- since if we are doing a Citus downgrade, not PG downgrade, then it would be no-op.
@@ -12,3 +12,4 @@ DROP FUNCTION pg_catalog.citus_internal_update_none_dist_table_metadata(
 DROP FUNCTION pg_catalog.citus_internal_delete_placement_metadata(
     placement_id bigint
 );
+
@@ -0,0 +1,13 @@
+CREATE FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int,
+                                                       force bool DEFAULT false,
+                                                       lock_cooldown int DEFAULT 10000)
+  RETURNS void
+  LANGUAGE C STRICT
+  AS 'MODULE_PATHNAME', $$citus_pause_node_within_txn$$;
+
+COMMENT ON FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int,
+                                                           force bool ,
+                                                           lock_cooldown int )
+  IS 'pauses node with given id which leads to add lock in tables and prevent any queries to be executed on that node';
+
+REVOKE ALL ON FUNCTION pg_catalog.citus_pause_node_within_txn(int,bool,int) FROM PUBLIC;
@@ -0,0 +1,13 @@
+CREATE FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int,
+                                                       force bool DEFAULT false,
+                                                       lock_cooldown int DEFAULT 10000)
+  RETURNS void
+  LANGUAGE C STRICT
+  AS 'MODULE_PATHNAME', $$citus_pause_node_within_txn$$;
+
+COMMENT ON FUNCTION pg_catalog.citus_pause_node_within_txn(node_id int,
+                                                           force bool ,
+                                                           lock_cooldown int )
+  IS 'pauses node with given id which leads to add lock in tables and prevent any queries to be executed on that node';
+
+REVOKE ALL ON FUNCTION pg_catalog.citus_pause_node_within_txn(int,bool,int) FROM PUBLIC;
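A sketch of the optional arguments (defaults come from the CREATE FUNCTION above: force defaults to false, lock_cooldown to 10000; node id 2 is a placeholder). With force => true, a background worker terminates backends holding conflicting locks once lock_cooldown expires:

    BEGIN;
    SELECT pg_catalog.citus_pause_node_within_txn(
        node_id       => 2,
        force         => true,
        lock_cooldown => 5000);
    COMMIT;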
@@ -0,0 +1,317 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1-begin s2-begin s1-pause-node s2-insert-distributed s1-end s2-end
+step s1-begin:
+    BEGIN;
+
+step s2-begin:
+    BEGIN;
+
+s1: NOTICE:
+step s1-pause-node:
+    SET client_min_messages = 'notice';
+    DO $$
+    DECLARE
+        v_shard_id int;
+        v_node_id int;
+        v_node_name text;
+        v_node_port int;
+    BEGIN
+        --The first message in the block is being printed on the top of the code block. So adding a dummy message
+        --to make sure that the first message is printed in correct place.
+        raise notice '';
+        -- Get the shard id for the distribution column
+        SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id;
+        --Get the node id for the shard id
+        SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1;
+        -- Get the node id for the shard id
+        SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1;
+        -- Pause the node
+        perform pg_catalog.citus_pause_node_within_txn(v_node_id) ;
+    END;
+    $$
+    LANGUAGE plpgsql;
+
+step s2-insert-distributed:
+    -- Execute the INSERT statement
+    insert into employee values(11,'e11',3);
+ <waiting ...>
+step s1-end:
+    COMMIT;
+
+step s2-insert-distributed: <... completed>
+step s2-end:
+    COMMIT;
+
+
+starting permutation: s1-begin s2-begin s1-pause-node s2-delete-distributed s1-end s2-end
+step s1-begin:
+    BEGIN;
+
+step s2-begin:
+    BEGIN;
+
+s1: NOTICE:
+step s1-pause-node:
+    SET client_min_messages = 'notice';
+    DO $$
+    DECLARE
+        v_shard_id int;
+        v_node_id int;
+        v_node_name text;
+        v_node_port int;
+    BEGIN
+        --The first message in the block is being printed on the top of the code block. So adding a dummy message
+        --to make sure that the first message is printed in correct place.
+        raise notice '';
+        -- Get the shard id for the distribution column
+        SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id;
+        --Get the node id for the shard id
+        SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1;
+        -- Get the node id for the shard id
+        SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1;
+        -- Pause the node
+        perform pg_catalog.citus_pause_node_within_txn(v_node_id) ;
+    END;
+    $$
+    LANGUAGE plpgsql;
+
+step s2-delete-distributed:
+    -- Execute the DELETE statement
+    delete from employee where id = 9;
+ <waiting ...>
+step s1-end:
+    COMMIT;
+
+step s2-delete-distributed: <... completed>
+step s2-end:
+    COMMIT;
+
+
+starting permutation: s1-begin s1-pause-node s2-begin s2-select-distributed s1-end s2-end
+step s1-begin:
+    BEGIN;
+
+s1: NOTICE:
+step s1-pause-node:
+    SET client_min_messages = 'notice';
+    DO $$
+    DECLARE
+        v_shard_id int;
+        v_node_id int;
+        v_node_name text;
+        v_node_port int;
+    BEGIN
+        --The first message in the block is being printed on the top of the code block. So adding a dummy message
+        --to make sure that the first message is printed in correct place.
+        raise notice '';
+        -- Get the shard id for the distribution column
+        SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id;
+        --Get the node id for the shard id
+        SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1;
+        -- Get the node id for the shard id
+        SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1;
+        -- Pause the node
+        perform pg_catalog.citus_pause_node_within_txn(v_node_id) ;
+    END;
+    $$
+    LANGUAGE plpgsql;
+
+step s2-begin:
+    BEGIN;
+
+step s2-select-distributed:
+    select * from employee where id = 10;
+
+id|name|company_id
+---------------------------------------------------------------------
+10|e10 | 3
+(1 row)
+
+step s1-end:
+    COMMIT;
+
+step s2-end:
+    COMMIT;
+
+
+starting permutation: s1-begin s2-begin s1-pause-node s2-insert-reference s1-end s2-end
+step s1-begin:
+    BEGIN;
+
+step s2-begin:
+    BEGIN;
+
+s1: NOTICE:
+step s1-pause-node:
+    SET client_min_messages = 'notice';
+    DO $$
+    DECLARE
+        v_shard_id int;
+        v_node_id int;
+        v_node_name text;
+        v_node_port int;
+    BEGIN
+        --The first message in the block is being printed on the top of the code block. So adding a dummy message
+        --to make sure that the first message is printed in correct place.
+        raise notice '';
+        -- Get the shard id for the distribution column
+        SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id;
+        --Get the node id for the shard id
+        SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1;
+        -- Get the node id for the shard id
+        SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1;
+        -- Pause the node
+        perform pg_catalog.citus_pause_node_within_txn(v_node_id) ;
+    END;
+    $$
+    LANGUAGE plpgsql;
+
+step s2-insert-reference:
+    -- Execute the INSERT statement
+    insert into city values(3,'city3');
+ <waiting ...>
+step s1-end:
+    COMMIT;
+
+step s2-insert-reference: <... completed>
+step s2-end:
+    COMMIT;
+
+
+starting permutation: s1-begin s1-pause-node s1-pause-node s1-end
+step s1-begin:
+    BEGIN;
+
+s1: NOTICE:
+step s1-pause-node:
+    SET client_min_messages = 'notice';
+    DO $$
+    DECLARE
+        v_shard_id int;
+        v_node_id int;
+        v_node_name text;
+        v_node_port int;
+    BEGIN
+        --The first message in the block is being printed on the top of the code block. So adding a dummy message
+        --to make sure that the first message is printed in correct place.
+        raise notice '';
+        -- Get the shard id for the distribution column
+        SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id;
+        --Get the node id for the shard id
+        SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1;
+        -- Get the node id for the shard id
+        SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1;
+        -- Pause the node
+        perform pg_catalog.citus_pause_node_within_txn(v_node_id) ;
+    END;
+    $$
+    LANGUAGE plpgsql;
+
+s1: NOTICE:
+step s1-pause-node:
+    SET client_min_messages = 'notice';
+    DO $$
+    DECLARE
+        v_shard_id int;
+        v_node_id int;
+        v_node_name text;
+        v_node_port int;
+    BEGIN
+        --The first message in the block is being printed on the top of the code block. So adding a dummy message
+        --to make sure that the first message is printed in correct place.
+        raise notice '';
+        -- Get the shard id for the distribution column
+        SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id;
+        --Get the node id for the shard id
+        SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1;
+        -- Get the node id for the shard id
+        SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1;
+        -- Pause the node
+        perform pg_catalog.citus_pause_node_within_txn(v_node_id) ;
+    END;
+    $$
+    LANGUAGE plpgsql;
+
+step s1-end:
+    COMMIT;
+
+
+starting permutation: s1-begin s1-node-not-found s1-end
+step s1-begin:
+    BEGIN;
+
+s1: NOTICE: Node not found.
+step s1-node-not-found:
+    DO $$
+    DECLARE
+        v_node_id int:= -1;
+        v_node_exists boolean := true;
+        v_exception_message text;
+        v_expected_exception_message text := '';
+    BEGIN
+        select nextval('pg_dist_node_nodeid_seq')::int into v_node_id;
+        select citus_pause_node_within_txn(v_node_id) ;
+    EXCEPTION
+        WHEN SQLSTATE 'P0002' THEN
+            GET STACKED DIAGNOSTICS v_exception_message = MESSAGE_TEXT;
+            v_expected_exception_message := 'node ' || v_node_id || ' not found';
+            if v_exception_message = v_expected_exception_message then
+                RAISE NOTICE 'Node not found.';
+            end if;
+    END;
+    $$
+    LANGUAGE plpgsql;
+
+step s1-end:
+    COMMIT;
+
+
+starting permutation: s1-begin s2-begin s2-insert-distributed s1-pause-node-force s1-end s2-end
+step s1-begin:
+    BEGIN;
+
+step s2-begin:
+    BEGIN;
+
+step s2-insert-distributed:
+    -- Execute the INSERT statement
+    insert into employee values(11,'e11',3);
+
+step s1-pause-node-force:
+    SET client_min_messages = 'notice';
+    DO $$
+    DECLARE
+        v_shard_id int;
+        v_node_id int;
+        v_node_name text;
+        v_node_port int;
+        v_force boolean := true;
+        v_lock_cooldown int := 100;
+    BEGIN
+        --The first message in the block is being printed on the top of the code block. So adding a dummy message
+        --to make sure that the first message is printed in correct place.
+        raise notice '';
+        -- Get the shard id for the distribution column
+        SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id;
+        --Get the node id for the shard id
+        SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1;
+        -- Get the node id for the shard id
+        SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1;
+        -- Pause the node with force true
+        perform pg_catalog.citus_pause_node_within_txn(v_node_id,v_force,v_lock_cooldown) ;
+    END;
+    $$
+    LANGUAGE plpgsql;
+ <waiting ...>
+s1: NOTICE:
+step s1-pause-node-force: <... completed>
+step s1-end:
+    COMMIT;
+
+step s2-end:
+    COMMIT;
+
+FATAL: terminating connection due to administrator command
+SSL connection has been closed unexpectedly
@@ -0,0 +1,318 @@
+Parsed test spec with 2 sessions
+
+starting permutation: s1-begin s2-begin s1-pause-node s2-insert-distributed s1-end s2-end
+step s1-begin:
+    BEGIN;
+
+step s2-begin:
+    BEGIN;
+
+s1: NOTICE:
+step s1-pause-node:
+    SET client_min_messages = 'notice';
+    DO $$
+    DECLARE
+        v_shard_id int;
+        v_node_id int;
+        v_node_name text;
+        v_node_port int;
+    BEGIN
+        --The first message in the block is being printed on the top of the code block. So adding a dummy message
+        --to make sure that the first message is printed in correct place.
+        raise notice '';
+        -- Get the shard id for the distribution column
+        SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id;
+        --Get the node id for the shard id
+        SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1;
+        -- Get the node id for the shard id
+        SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1;
+        -- Pause the node
+        perform pg_catalog.citus_pause_node_within_txn(v_node_id) ;
+    END;
+    $$
+    LANGUAGE plpgsql;
+
+step s2-insert-distributed:
+    -- Execute the INSERT statement
+    insert into employee values(11,'e11',3);
+ <waiting ...>
+step s1-end:
+    COMMIT;
+
+step s2-insert-distributed: <... completed>
+step s2-end:
+    COMMIT;
+
+
+starting permutation: s1-begin s2-begin s1-pause-node s2-delete-distributed s1-end s2-end
+step s1-begin:
+    BEGIN;
+
+step s2-begin:
+    BEGIN;
+
+s1: NOTICE:
+step s1-pause-node:
+    SET client_min_messages = 'notice';
+    DO $$
+    DECLARE
+        v_shard_id int;
+        v_node_id int;
+        v_node_name text;
+        v_node_port int;
+    BEGIN
+        --The first message in the block is being printed on the top of the code block. So adding a dummy message
+        --to make sure that the first message is printed in correct place.
+        raise notice '';
+        -- Get the shard id for the distribution column
+        SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id;
+        --Get the node id for the shard id
+        SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1;
+        -- Get the node id for the shard id
+        SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1;
+        -- Pause the node
+        perform pg_catalog.citus_pause_node_within_txn(v_node_id) ;
+    END;
+    $$
+    LANGUAGE plpgsql;
+
+step s2-delete-distributed:
+    -- Execute the DELETE statement
+    delete from employee where id = 9;
+ <waiting ...>
+step s1-end:
+    COMMIT;
+
+step s2-delete-distributed: <... completed>
+step s2-end:
+    COMMIT;
+
+
+starting permutation: s1-begin s1-pause-node s2-begin s2-select-distributed s1-end s2-end
+step s1-begin:
+    BEGIN;
+
+s1: NOTICE:
+step s1-pause-node:
+    SET client_min_messages = 'notice';
+    DO $$
+    DECLARE
+        v_shard_id int;
+        v_node_id int;
+        v_node_name text;
+        v_node_port int;
+    BEGIN
+        --The first message in the block is being printed on the top of the code block. So adding a dummy message
+        --to make sure that the first message is printed in correct place.
+        raise notice '';
+        -- Get the shard id for the distribution column
+        SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id;
+        --Get the node id for the shard id
+        SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1;
+        -- Get the node id for the shard id
+        SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1;
+        -- Pause the node
+        perform pg_catalog.citus_pause_node_within_txn(v_node_id) ;
+    END;
+    $$
+    LANGUAGE plpgsql;
+
+step s2-begin:
+    BEGIN;
+
+step s2-select-distributed:
+    select * from employee where id = 10;
+
+id|name|company_id
+---------------------------------------------------------------------
+10|e10 | 3
+(1 row)
+
+step s1-end:
+    COMMIT;
+
+step s2-end:
+    COMMIT;
+
+
+starting permutation: s1-begin s2-begin s1-pause-node s2-insert-reference s1-end s2-end
+step s1-begin:
+    BEGIN;
+
+step s2-begin:
+    BEGIN;
+
+s1: NOTICE:
+step s1-pause-node:
+    SET client_min_messages = 'notice';
+    DO $$
+    DECLARE
+        v_shard_id int;
+        v_node_id int;
+        v_node_name text;
+        v_node_port int;
+    BEGIN
+        --The first message in the block is being printed on the top of the code block. So adding a dummy message
+        --to make sure that the first message is printed in correct place.
+        raise notice '';
+        -- Get the shard id for the distribution column
+        SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id;
+        --Get the node id for the shard id
+        SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1;
+        -- Get the node id for the shard id
+        SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1;
+        -- Pause the node
+        perform pg_catalog.citus_pause_node_within_txn(v_node_id) ;
+    END;
+    $$
+    LANGUAGE plpgsql;
+
+step s2-insert-reference:
+    -- Execute the INSERT statement
+    insert into city values(3,'city3');
+ <waiting ...>
+step s1-end:
+    COMMIT;
+
+step s2-insert-reference: <... completed>
+step s2-end:
+    COMMIT;
+
+
+starting permutation: s1-begin s1-pause-node s1-pause-node s1-end
+step s1-begin:
+    BEGIN;
+
+s1: NOTICE:
+step s1-pause-node:
+    SET client_min_messages = 'notice';
+    DO $$
+    DECLARE
+        v_shard_id int;
+        v_node_id int;
+        v_node_name text;
+        v_node_port int;
+    BEGIN
+        --The first message in the block is being printed on the top of the code block. So adding a dummy message
+        --to make sure that the first message is printed in correct place.
+        raise notice '';
+        -- Get the shard id for the distribution column
+        SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id;
+        --Get the node id for the shard id
+        SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1;
+        -- Get the node id for the shard id
+        SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1;
+        -- Pause the node
+        perform pg_catalog.citus_pause_node_within_txn(v_node_id) ;
+    END;
+    $$
+    LANGUAGE plpgsql;
+
+s1: NOTICE:
+step s1-pause-node:
+    SET client_min_messages = 'notice';
+    DO $$
+    DECLARE
+        v_shard_id int;
+        v_node_id int;
+        v_node_name text;
+        v_node_port int;
+    BEGIN
+        --The first message in the block is being printed on the top of the code block. So adding a dummy message
+        --to make sure that the first message is printed in correct place.
+        raise notice '';
+        -- Get the shard id for the distribution column
+        SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id;
+        --Get the node id for the shard id
+        SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1;
+        -- Get the node id for the shard id
+        SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1;
+        -- Pause the node
+        perform pg_catalog.citus_pause_node_within_txn(v_node_id) ;
+    END;
+    $$
+    LANGUAGE plpgsql;
+
+step s1-end:
+    COMMIT;
+
+
+starting permutation: s1-begin s1-node-not-found s1-end
+step s1-begin:
+    BEGIN;
+
+s1: NOTICE: Node not found.
+step s1-node-not-found:
+    DO $$
+    DECLARE
+        v_node_id int:= -1;
+        v_node_exists boolean := true;
+        v_exception_message text;
+        v_expected_exception_message text := '';
+    BEGIN
+        select nextval('pg_dist_node_nodeid_seq')::int into v_node_id;
+        select citus_pause_node_within_txn(v_node_id) ;
+    EXCEPTION
+        WHEN SQLSTATE 'P0002' THEN
+            GET STACKED DIAGNOSTICS v_exception_message = MESSAGE_TEXT;
+            v_expected_exception_message := 'node ' || v_node_id || ' not found';
+            if v_exception_message = v_expected_exception_message then
+                RAISE NOTICE 'Node not found.';
+            end if;
+    END;
+    $$
+    LANGUAGE plpgsql;
+
+step s1-end:
+    COMMIT;
+
+
+starting permutation: s1-begin s2-begin s2-insert-distributed s1-pause-node-force s1-end s2-end
+step s1-begin:
+    BEGIN;
+
+step s2-begin:
+    BEGIN;
+
+step s2-insert-distributed:
+    -- Execute the INSERT statement
+    insert into employee values(11,'e11',3);
+
+step s1-pause-node-force:
+    SET client_min_messages = 'notice';
+    DO $$
+    DECLARE
+        v_shard_id int;
+        v_node_id int;
+        v_node_name text;
+        v_node_port int;
+        v_force boolean := true;
+        v_lock_cooldown int := 100;
+    BEGIN
+        --The first message in the block is being printed on the top of the code block. So adding a dummy message
+        --to make sure that the first message is printed in correct place.
+        raise notice '';
+        -- Get the shard id for the distribution column
+        SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id;
+        --Get the node id for the shard id
+        SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1;
+        -- Get the node id for the shard id
+        SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1;
+        -- Pause the node with force true
+        perform pg_catalog.citus_pause_node_within_txn(v_node_id,v_force,v_lock_cooldown) ;
+    END;
+    $$
+    LANGUAGE plpgsql;
+ <waiting ...>
+s1: NOTICE:
+step s1-pause-node-force: <... completed>
+step s1-end:
+    COMMIT;
+
+step s2-end:
+    COMMIT;
+
+FATAL: terminating connection due to administrator command
+FATAL: terminating connection due to administrator command
+SSL connection has been closed unexpectedly
@@ -1403,7 +1403,8 @@ SELECT * FROM multi_extension.print_extension_changes();
 ---------------------------------------------------------------------
  | function citus_internal_delete_placement_metadata(bigint) void
  | function citus_internal_update_none_dist_table_metadata(oid,"char",bigint,boolean) void
-(2 rows)
+ | function citus_pause_node_within_txn(integer,boolean,integer) void
+(3 rows)
 
 DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
 -- show running version
@@ -105,6 +105,7 @@ ORDER BY 1;
  function citus_nodeid_for_gpid(bigint)
  function citus_nodename_for_nodeid(integer)
  function citus_nodeport_for_nodeid(integer)
+ function citus_pause_node_within_txn(integer,boolean,integer)
  function citus_pid_for_gpid(bigint)
  function citus_prepare_pg_upgrade()
  function citus_query_stats()
@@ -340,5 +341,5 @@ ORDER BY 1;
  view citus_stat_tenants_local
  view pg_dist_shard_placement
  view time_partitions
-(330 rows)
+(331 rows)
@@ -77,6 +77,7 @@ test: isolation_global_pid
 test: isolation_citus_locks
 test: isolation_reference_table
 test: isolation_schema_based_sharding
+test: isolation_citus_pause_node
 test: isolation_citus_schema_distribute_undistribute
 
 # Rebalancer
@@ -0,0 +1,185 @@
+setup
+{
+    SET citus.shard_replication_factor to 1;
+
+    create table city (id int , name text );
+    SELECT create_reference_table('city');
+
+    CREATE TABLE company(id int primary key, name text, city_id int);
+    select create_distributed_table('company', 'id');
+
+    create table employee(id int , name text, company_id int );
+    alter table employee add constraint employee_pkey primary key (id,company_id);
+
+    select create_distributed_table('employee', 'company_id');
+
+    insert into city values(1,'city1');
+    insert into city values(2,'city2');
+
+
+    insert into company values(1,'c1', 1);
+    insert into company values(2,'c2',2);
+    insert into company values(3,'c3',1);
+
+    insert into employee values(1,'e1',1);
+    insert into employee values(2,'e2',1);
+    insert into employee values(3,'e3',1);
+
+    insert into employee values(4,'e4',2);
+    insert into employee values(5,'e5',2);
+    insert into employee values(6,'e6',2);
+
+    insert into employee values(7,'e7',3);
+    insert into employee values(8,'e8',3);
+    insert into employee values(9,'e9',3);
+    insert into employee values(10,'e10',3);
+
+}
+
+teardown
+{
+    DROP TABLE employee,company,city;
+}
+
+session "s1"
+
+step "s1-begin"
+{
+    BEGIN;
+}
+
+step "s1-node-not-found"
+{
+    DO $$
+    DECLARE
+        v_node_id int:= -1;
+        v_node_exists boolean := true;
+        v_exception_message text;
+        v_expected_exception_message text := '';
+    BEGIN
+        select nextval('pg_dist_node_nodeid_seq')::int into v_node_id;
+        select citus_pause_node_within_txn(v_node_id) ;
+    EXCEPTION
+        WHEN SQLSTATE 'P0002' THEN
+            GET STACKED DIAGNOSTICS v_exception_message = MESSAGE_TEXT;
+            v_expected_exception_message := 'node ' || v_node_id || ' not found';
+            if v_exception_message = v_expected_exception_message then
+                RAISE NOTICE 'Node not found.';
+            end if;
+    END;
+    $$
+    LANGUAGE plpgsql;
+}
+
+step "s1-pause-node"
+{
+    SET client_min_messages = 'notice';
+    DO $$
+    DECLARE
+        v_shard_id int;
+        v_node_id int;
+        v_node_name text;
+        v_node_port int;
+    BEGIN
+        --The first message in the block is being printed on the top of the code block. So adding a dummy message
+        --to make sure that the first message is printed in correct place.
+        raise notice '';
+        -- Get the shard id for the distribution column
+        SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id;
+
+        --Get the node id for the shard id
+        SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1;
+
+        -- Get the node id for the shard id
+        SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1;
+
+
+        -- Pause the node
+        perform pg_catalog.citus_pause_node_within_txn(v_node_id) ;
+    END;
+    $$
+    LANGUAGE plpgsql;
+}
+
+step "s1-pause-node-force"
+{
+    SET client_min_messages = 'notice';
+    DO $$
+    DECLARE
+        v_shard_id int;
+        v_node_id int;
+        v_node_name text;
+        v_node_port int;
+        v_force boolean := true;
+        v_lock_cooldown int := 100;
+    BEGIN
+        --The first message in the block is being printed on the top of the code block. So adding a dummy message
+        --to make sure that the first message is printed in correct place.
+
+        raise notice '';
+        -- Get the shard id for the distribution column
+        SELECT get_shard_id_for_distribution_column('employee', 3) into v_shard_id;
+
+        --Get the node id for the shard id
+        SELECT nodename,nodeport into v_node_name,v_node_port FROM citus_shards WHERE shardid = v_shard_id limit 1;
+
+        -- Get the node id for the shard id
+        SELECT nodeid into v_node_id FROM pg_dist_node WHERE nodename = v_node_name and nodeport = v_node_port limit 1;
+
+
+        -- Pause the node with force true
+        perform pg_catalog.citus_pause_node_within_txn(v_node_id,v_force,v_lock_cooldown) ;
+    END;
+    $$
+    LANGUAGE plpgsql;
+}
+
+step "s1-end"
+{
+    COMMIT;
+}
+
+session "s2"
+
+
+step "s2-begin"
+{
+    BEGIN;
+}
+
+step "s2-insert-distributed"
+{
+    -- Execute the INSERT statement
+    insert into employee values(11,'e11',3);
+
+}
+
+step "s2-insert-reference"{
+    -- Execute the INSERT statement
+    insert into city values(3,'city3');
+}
+
+step "s2-select-distributed"{
+
+    select * from employee where id = 10;
+}
+
+
+step "s2-delete-distributed"{
+    -- Execute the DELETE statement
+    delete from employee where id = 9;
+}
+
+step "s2-end"
+{
+    COMMIT;
+}
+
+permutation "s1-begin" "s2-begin" "s1-pause-node" "s2-insert-distributed" "s1-end" "s2-end"
+permutation "s1-begin" "s2-begin" "s1-pause-node" "s2-delete-distributed" "s1-end" "s2-end"
+permutation "s1-begin" "s1-pause-node" "s2-begin" "s2-select-distributed" "s1-end" "s2-end"
+permutation "s1-begin" "s2-begin" "s1-pause-node" "s2-insert-reference" "s1-end" "s2-end"
+permutation "s1-begin" "s1-pause-node" "s1-pause-node" "s1-end"
+permutation "s1-begin" "s1-node-not-found" "s1-end"
+permutation "s1-begin" "s2-begin" "s2-insert-distributed" "s1-pause-node-force"(*) "s1-end" "s2-end"