Merge branch 'master' into velioglu/table_wo_seq_prototype

velioglu/wo_seq_test_1
Burak Velioglu 2021-12-22 11:24:40 +03:00
commit 81a6cb47d6
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
106 changed files with 1626 additions and 363 deletions

6
configure vendored
View File

@ -4543,7 +4543,9 @@ if test "${with_lz4+set}" = set; then :
withval=$with_lz4; withval=$with_lz4;
case $withval in case $withval in
yes) yes)
:
$as_echo "#define HAVE_CITUS_LIBLZ4 1" >>confdefs.h
;; ;;
no) no)
: :
@ -4556,6 +4558,8 @@ if test "${with_lz4+set}" = set; then :
else else
with_lz4=yes with_lz4=yes
$as_echo "#define HAVE_CITUS_LIBLZ4 1" >>confdefs.h
fi fi

View File

@ -220,7 +220,8 @@ AC_DEFINE_UNQUOTED(REPORTS_BASE_URL, "$REPORTS_BASE_URL",
# LZ4 # LZ4
# #
PGAC_ARG_BOOL(with, lz4, yes, PGAC_ARG_BOOL(with, lz4, yes,
[do not use lz4]) [do not use lz4],
[AC_DEFINE([HAVE_CITUS_LIBLZ4], 1, [Define to 1 to build with lz4 support. (--with-lz4)])])
AC_SUBST(with_lz4) AC_SUBST(with_lz4)
if test "$with_lz4" = yes; then if test "$with_lz4" = yes; then

View File

@ -29,7 +29,7 @@
#if HAVE_LIBZSTD #if HAVE_LIBZSTD
#define DEFAULT_COMPRESSION_TYPE COMPRESSION_ZSTD #define DEFAULT_COMPRESSION_TYPE COMPRESSION_ZSTD
#elif HAVE_LIBLZ4 #elif HAVE_CITUS_LIBLZ4
#define DEFAULT_COMPRESSION_TYPE COMPRESSION_LZ4 #define DEFAULT_COMPRESSION_TYPE COMPRESSION_LZ4
#else #else
#define DEFAULT_COMPRESSION_TYPE COMPRESSION_PG_LZ #define DEFAULT_COMPRESSION_TYPE COMPRESSION_PG_LZ
@ -44,7 +44,7 @@ static const struct config_enum_entry columnar_compression_options[] =
{ {
{ "none", COMPRESSION_NONE, false }, { "none", COMPRESSION_NONE, false },
{ "pglz", COMPRESSION_PG_LZ, false }, { "pglz", COMPRESSION_PG_LZ, false },
#if HAVE_LIBLZ4 #if HAVE_CITUS_LIBLZ4
{ "lz4", COMPRESSION_LZ4, false }, { "lz4", COMPRESSION_LZ4, false },
#endif #endif
#if HAVE_LIBZSTD #if HAVE_LIBZSTD

View File

@ -19,7 +19,7 @@
#include "columnar/columnar_compression.h" #include "columnar/columnar_compression.h"
#if HAVE_LIBLZ4 #if HAVE_CITUS_LIBLZ4
#include <lz4.h> #include <lz4.h>
#endif #endif
@ -63,7 +63,7 @@ CompressBuffer(StringInfo inputBuffer,
{ {
switch (compressionType) switch (compressionType)
{ {
#if HAVE_LIBLZ4 #if HAVE_CITUS_LIBLZ4
case COMPRESSION_LZ4: case COMPRESSION_LZ4:
{ {
int maximumLength = LZ4_compressBound(inputBuffer->len); int maximumLength = LZ4_compressBound(inputBuffer->len);
@ -170,7 +170,7 @@ DecompressBuffer(StringInfo buffer,
return buffer; return buffer;
} }
#if HAVE_LIBLZ4 #if HAVE_CITUS_LIBLZ4
case COMPRESSION_LZ4: case COMPRESSION_LZ4:
{ {
StringInfo decompressedBuffer = makeStringInfo(); StringInfo decompressedBuffer = makeStringInfo();

View File

@ -70,8 +70,6 @@ columnar_store_memory_stats(PG_FUNCTION_ARGS)
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls); tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls);
tuplestore_donestoring(tupleStore);
PG_RETURN_DATUM(0); PG_RETURN_DATUM(0);
} }

View File

@ -80,6 +80,7 @@ static void EnsureSequentialModeForFunctionDDL(void);
static void TriggerSyncMetadataToPrimaryNodes(void); static void TriggerSyncMetadataToPrimaryNodes(void);
static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt); static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt);
static bool ShouldPropagateAlterFunction(const ObjectAddress *address); static bool ShouldPropagateAlterFunction(const ObjectAddress *address);
static bool ShouldAddFunctionSignature(FunctionParameterMode mode);
static ObjectAddress FunctionToObjectAddress(ObjectType objectType, static ObjectAddress FunctionToObjectAddress(ObjectType objectType,
ObjectWithArgs *objectWithArgs, ObjectWithArgs *objectWithArgs,
bool missing_ok); bool missing_ok);
@ -1328,7 +1329,11 @@ CreateFunctionStmtObjectAddress(Node *node, bool missing_ok)
FunctionParameter *funcParam = NULL; FunctionParameter *funcParam = NULL;
foreach_ptr(funcParam, stmt->parameters) foreach_ptr(funcParam, stmt->parameters)
{ {
objectWithArgs->objargs = lappend(objectWithArgs->objargs, funcParam->argType); if (ShouldAddFunctionSignature(funcParam->mode))
{
objectWithArgs->objargs = lappend(objectWithArgs->objargs,
funcParam->argType);
}
} }
return FunctionToObjectAddress(objectType, objectWithArgs, missing_ok); return FunctionToObjectAddress(objectType, objectWithArgs, missing_ok);
@ -1885,8 +1890,7 @@ ObjectWithArgsFromOid(Oid funcOid)
for (int i = 0; i < numargs; i++) for (int i = 0; i < numargs; i++)
{ {
if (argModes == NULL || if (argModes == NULL || ShouldAddFunctionSignature(argModes[i]))
argModes[i] != PROARGMODE_OUT || argModes[i] != PROARGMODE_TABLE)
{ {
objargs = lappend(objargs, makeTypeNameFromOid(argTypes[i], -1)); objargs = lappend(objargs, makeTypeNameFromOid(argTypes[i], -1));
} }
@ -1899,6 +1903,35 @@ ObjectWithArgsFromOid(Oid funcOid)
} }
/*
* ShouldAddFunctionSignature takes a FunctionParameterMode and returns true if it should
* be included in the function signature. Returns false otherwise.
*/
static bool
ShouldAddFunctionSignature(FunctionParameterMode mode)
{
/* only input parameters should be added to the generated signature */
switch (mode)
{
case FUNC_PARAM_IN:
case FUNC_PARAM_INOUT:
case FUNC_PARAM_VARIADIC:
{
return true;
}
case FUNC_PARAM_OUT:
case FUNC_PARAM_TABLE:
{
return false;
}
default:
return true;
}
}
/* /*
* FunctionToObjectAddress returns the ObjectAddress of a Function or Procedure based on * FunctionToObjectAddress returns the ObjectAddress of a Function or Procedure based on
* its type and ObjectWithArgs describing the Function/Procedure. If missing_ok is set to * its type and ObjectWithArgs describing the Function/Procedure. If missing_ok is set to

View File

@ -24,6 +24,7 @@
#include "distributed/commands.h" #include "distributed/commands.h"
#include "distributed/commands/utility_hook.h" #include "distributed/commands/utility_hook.h"
#include "distributed/deparse_shard_query.h" #include "distributed/deparse_shard_query.h"
#include "distributed/deparser.h"
#include "distributed/distributed_planner.h" #include "distributed/distributed_planner.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/local_executor.h" #include "distributed/local_executor.h"
@ -71,6 +72,7 @@ static void RangeVarCallbackForReindexIndex(const RangeVar *rel, Oid relOid, Oid
oldRelOid, oldRelOid,
void *arg); void *arg);
static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement); static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement);
static void ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement);
static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt); static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt);
@ -676,21 +678,9 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand,
bool isCitusRelation = IsCitusTable(relationId); bool isCitusRelation = IsCitusTable(relationId);
if (isCitusRelation) if (isCitusRelation)
{ {
if (OidIsValid(distributedIndexId))
{
/*
* We already have a distributed index in the list, and Citus
* currently only support dropping a single distributed index.
*/
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot drop multiple distributed objects in "
"a single command"),
errhint("Try dropping each object in a separate DROP "
"command.")));
}
distributedIndexId = indexId; distributedIndexId = indexId;
distributedRelationId = relationId; distributedRelationId = relationId;
break;
} }
} }
@ -698,6 +688,8 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand,
{ {
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ErrorIfUnsupportedDropIndexStmt(dropIndexStatement);
if (AnyForeignKeyDependsOnIndex(distributedIndexId)) if (AnyForeignKeyDependsOnIndex(distributedIndexId))
{ {
MarkInvalidateForeignKeyGraph(); MarkInvalidateForeignKeyGraph();
@ -1159,6 +1151,26 @@ ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement)
} }
/*
* ErrorIfUnsupportedDropIndexStmt checks if the corresponding drop index statement is
* supported for distributed tables and errors out if it is not.
*/
static void
ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement)
{
Assert(dropIndexStatement->removeType == OBJECT_INDEX);
if (list_length(dropIndexStatement->objects) > 1)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot drop multiple distributed objects in a "
"single command"),
errhint("Try dropping each object in a separate DROP "
"command.")));
}
}
/* /*
* DropIndexTaskList builds a list of tasks to execute a DROP INDEX command * DropIndexTaskList builds a list of tasks to execute a DROP INDEX command
* against a specified distributed table. * against a specified distributed table.

View File

@ -387,7 +387,10 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
parsetree = ProcessCreateSubscriptionStmt(createSubStmt); parsetree = ProcessCreateSubscriptionStmt(createSubStmt);
} }
/* process SET LOCAL stmts of allowed GUCs in multi-stmt xacts */ /*
* Process SET LOCAL and SET TRANSACTION statements in multi-statement
* transactions.
*/
if (IsA(parsetree, VariableSetStmt)) if (IsA(parsetree, VariableSetStmt))
{ {
VariableSetStmt *setStmt = (VariableSetStmt *) parsetree; VariableSetStmt *setStmt = (VariableSetStmt *) parsetree;

View File

@ -35,6 +35,7 @@ static bool IsSettingSafeToPropagate(char *name);
* *
* We currently propagate: * We currently propagate:
* - SET LOCAL (for allowed settings) * - SET LOCAL (for allowed settings)
* - SET TRANSACTION
* - RESET (for allowed settings) * - RESET (for allowed settings)
* - RESET ALL * - RESET ALL
*/ */
@ -72,8 +73,8 @@ ShouldPropagateSetCommand(VariableSetStmt *setStmt)
case VAR_SET_MULTI: case VAR_SET_MULTI:
default: default:
{ {
/* SET (LOCAL) TRANSACTION should be handled locally */ /* SET TRANSACTION is similar to SET LOCAL */
return false; return strcmp(setStmt->name, "TRANSACTION") == 0;
} }
} }
} }
@ -121,7 +122,7 @@ PostprocessVariableSetStmt(VariableSetStmt *setStmt, const char *setStmtString)
const bool raiseInterrupts = true; const bool raiseInterrupts = true;
List *connectionList = NIL; List *connectionList = NIL;
/* at present we only support SET LOCAL */ /* at present we only support SET LOCAL and SET TRANSACTION */
AssertArg(ShouldPropagateSetCommand(setStmt)); AssertArg(ShouldPropagateSetCommand(setStmt));
/* haven't seen any SET stmts so far in this (sub-)xact: initialize StringInfo */ /* haven't seen any SET stmts so far in this (sub-)xact: initialize StringInfo */

View File

@ -118,9 +118,6 @@ citus_reserved_connection_stats(PG_FUNCTION_ARGS)
StoreAllReservedConnections(tupleStore, tupleDescriptor); StoreAllReservedConnections(tupleStore, tupleDescriptor);
/* clean up and return the tuplestore */
tuplestore_donestoring(tupleStore);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }

View File

@ -22,6 +22,8 @@
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "storage/latch.h" #include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/fmgrprotos.h"
#include "utils/palloc.h" #include "utils/palloc.h"
@ -34,6 +36,7 @@ int RemoteCopyFlushThreshold = 8 * 1024 * 1024;
/* GUC, determining whether statements sent to remote nodes are logged */ /* GUC, determining whether statements sent to remote nodes are logged */
bool LogRemoteCommands = false; bool LogRemoteCommands = false;
char *GrepRemoteCommands = "";
static bool ClearResultsInternal(MultiConnection *connection, bool raiseErrors, static bool ClearResultsInternal(MultiConnection *connection, bool raiseErrors,
@ -328,7 +331,6 @@ ReportResultError(MultiConnection *connection, PGresult *result, int elevel)
/* *INDENT-ON* */ /* *INDENT-ON* */
/* /*
* LogRemoteCommand logs commands send to remote nodes if * LogRemoteCommand logs commands send to remote nodes if
* citus.log_remote_commands wants us to do so. * citus.log_remote_commands wants us to do so.
@ -341,6 +343,11 @@ LogRemoteCommand(MultiConnection *connection, const char *command)
return; return;
} }
if (!CommandMatchesLogGrepPattern(command))
{
return;
}
ereport(NOTICE, (errmsg("issuing %s", ApplyLogRedaction(command)), ereport(NOTICE, (errmsg("issuing %s", ApplyLogRedaction(command)),
errdetail("on server %s@%s:%d connectionId: %ld", connection->user, errdetail("on server %s@%s:%d connectionId: %ld", connection->user,
connection->hostname, connection->hostname,
@ -348,6 +355,29 @@ LogRemoteCommand(MultiConnection *connection, const char *command)
} }
/*
* CommandMatchesLogGrepPattern returns true of the input command matches
* the pattern specified by citus.grep_remote_commands.
*
* If citus.grep_remote_commands set to an empty string, all commands are
* considered as a match.
*/
bool
CommandMatchesLogGrepPattern(const char *command)
{
if (GrepRemoteCommands && strnlen(GrepRemoteCommands, NAMEDATALEN) > 0)
{
Datum boolDatum =
DirectFunctionCall2(textlike, CStringGetTextDatum(command),
CStringGetTextDatum(GrepRemoteCommands));
return DatumGetBool(boolDatum);
}
return true;
}
/* wrappers around libpq functions, with command logging support */ /* wrappers around libpq functions, with command logging support */

View File

@ -143,9 +143,6 @@ citus_remote_connection_stats(PG_FUNCTION_ARGS)
StoreAllRemoteConnectionStats(tupleStore, tupleDescriptor); StoreAllRemoteConnectionStats(tupleStore, tupleDescriptor);
/* clean up and return the tuplestore */
tuplestore_donestoring(tupleStore);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }

View File

@ -861,8 +861,6 @@ ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat,
tupleStore); tupleStore);
} }
} }
tuplestore_donestoring(tupleStore);
} }

View File

@ -513,8 +513,14 @@ LogLocalCommand(Task *task)
return; return;
} }
const char *command = TaskQueryString(task);
if (!CommandMatchesLogGrepPattern(command))
{
return;
}
ereport(NOTICE, (errmsg("executing the command locally: %s", ereport(NOTICE, (errmsg("executing the command locally: %s",
ApplyLogRedaction(TaskQueryString(task))))); ApplyLogRedaction(command))));
} }

View File

@ -262,8 +262,6 @@ worker_partition_query_result(PG_FUNCTION_ARGS)
tuplestore_putvalues(tupleStore, returnTupleDesc, values, nulls); tuplestore_putvalues(tupleStore, returnTupleDesc, values, nulls);
} }
tuplestore_donestoring(tupleStore);
PortalDrop(portal, false); PortalDrop(portal, false);
FreeExecutorState(estate); FreeExecutorState(estate);

View File

@ -245,9 +245,6 @@ citus_shard_sizes(PG_FUNCTION_ARGS)
ReceiveShardNameAndSizeResults(connectionList, tupleStore, tupleDescriptor); ReceiveShardNameAndSizeResults(connectionList, tupleStore, tupleDescriptor);
/* clean up and return the tuplestore */
tuplestore_donestoring(tupleStore);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }

View File

@ -14,7 +14,6 @@
#include "access/htup_details.h" #include "access/htup_details.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "distributed/argutils.h"
#include "distributed/connection_management.h" #include "distributed/connection_management.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/multi_client_executor.h" #include "distributed/multi_client_executor.h"
@ -28,13 +27,9 @@
#include "miscadmin.h" #include "miscadmin.h"
#include "utils/builtins.h" #include "utils/builtins.h"
/* simple query to run on workers to check connectivity */
#define CONNECTIVITY_CHECK_QUERY "SELECT 1"
PG_FUNCTION_INFO_V1(citus_check_connection_to_node);
PG_FUNCTION_INFO_V1(master_run_on_worker); PG_FUNCTION_INFO_V1(master_run_on_worker);
static bool CheckConnectionToNode(char *nodeName, uint32 nodePort);
static int ParseCommandParameters(FunctionCallInfo fcinfo, StringInfo **nodeNameArray, static int ParseCommandParameters(FunctionCallInfo fcinfo, StringInfo **nodeNameArray,
int **nodePortsArray, StringInfo **commandStringArray, int **nodePortsArray, StringInfo **commandStringArray,
bool *parallel); bool *parallel);
@ -63,36 +58,6 @@ static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor,
StringInfo *resultArray, int commandCount); StringInfo *resultArray, int commandCount);
/*
* citus_check_connection_to_node sends a simple query from a worker node to another
* node, and returns success status.
*/
Datum
citus_check_connection_to_node(PG_FUNCTION_ARGS)
{
char *nodeName = PG_GETARG_TEXT_TO_CSTRING(0);
uint32 nodePort = PG_GETARG_UINT32(1);
bool success = CheckConnectionToNode(nodeName, nodePort);
PG_RETURN_BOOL(success);
}
/*
* CheckConnectionToNode sends a simple query to a node and returns success status
*/
static bool
CheckConnectionToNode(char *nodeName, uint32 nodePort)
{
int connectionFlags = 0;
MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
int responseStatus = ExecuteOptionalRemoteCommand(connection,
CONNECTIVITY_CHECK_QUERY, NULL);
return responseStatus == RESPONSE_OKAY;
}
/* /*
* master_run_on_worker executes queries/commands to run on specified worker and * master_run_on_worker executes queries/commands to run on specified worker and
* returns success status and query/command result. Expected input is 3 arrays * returns success status and query/command result. Expected input is 3 arrays
@ -592,8 +557,5 @@ CreateTupleStore(TupleDesc tupleDescriptor,
pfree(nodeNameText); pfree(nodeNameText);
pfree(resultText); pfree(resultText);
} }
tuplestore_donestoring(tupleStore);
return tupleStore; return tupleStore;
} }

View File

@ -0,0 +1,198 @@
/*-------------------------------------------------------------------------
*
* health_check.c
*
* UDFs to run health check operations by coordinating simple queries to test connectivity
* between connection pairs in the cluster.
*
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "distributed/argutils.h"
#include "distributed/listutils.h"
#include "distributed/lock_graph.h"
#include "distributed/remote_commands.h"
#include "distributed/tuplestore.h"
#include "utils/builtins.h"
/* simple query to run on workers to check connectivity */
#define CONNECTIVITY_CHECK_QUERY "SELECT 1"
#define CONNECTIVITY_CHECK_COLUMNS 5
PG_FUNCTION_INFO_V1(citus_check_connection_to_node);
PG_FUNCTION_INFO_V1(citus_check_cluster_node_health);
static bool CheckConnectionToNode(char *nodeName, uint32 nodePort);
static void StoreAllConnectivityChecks(Tuplestorestate *tupleStore,
TupleDesc tupleDescriptor);
static char * GetConnectivityCheckCommand(const char *nodeName, const uint32 nodePort);
/*
* citus_check_connection_to_node sends a simple query from a worker node to another
* node, and returns success status.
*/
Datum
citus_check_connection_to_node(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
char *nodeName = PG_GETARG_TEXT_TO_CSTRING(0);
uint32 nodePort = PG_GETARG_UINT32(1);
bool success = CheckConnectionToNode(nodeName, nodePort);
PG_RETURN_BOOL(success);
}
/*
* CheckConnectionToNode sends a simple query to a node and returns success status
*/
static bool
CheckConnectionToNode(char *nodeName, uint32 nodePort)
{
int connectionFlags = 0;
MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort);
int responseStatus = ExecuteOptionalRemoteCommand(connection,
CONNECTIVITY_CHECK_QUERY, NULL);
return responseStatus == RESPONSE_OKAY;
}
/*
* citus_check_cluster_node_health UDF performs connectivity checks from all the nodes to
* all the nodes, and report success status
*/
Datum
citus_check_cluster_node_health(PG_FUNCTION_ARGS)
{
CheckCitusVersion(ERROR);
TupleDesc tupleDescriptor = NULL;
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
StoreAllConnectivityChecks(tupleStore, tupleDescriptor);
PG_RETURN_VOID();
}
/*
* StoreAllConnectivityChecks performs connectivity checks from all the nodes to all the
* nodes, and report success status.
*
* Algorithm is:
* for sourceNode in activeReadableNodeList:
* c = connectToNode(sourceNode)
* for targetNode in activeReadableNodeList:
* result = c.execute("SELECT citus_check_connection_to_node(targetNode.name, targetNode.port")
* emit sourceNode.name, sourceNode.port, targetNode.name, targetNode.port, result
*
* -- result -> true -> connection attempt from source to target succeeded
* -- result -> false -> connection attempt from source to target failed
* -- result -> NULL -> connection attempt from the current node to source node failed
*/
static void
StoreAllConnectivityChecks(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor)
{
Datum values[CONNECTIVITY_CHECK_COLUMNS];
bool isNulls[CONNECTIVITY_CHECK_COLUMNS];
/*
* Get all the readable node list so that we will check connectivity to followers in
* the cluster as well.
*/
List *workerNodeList = ActiveReadableNodeList();
/* we want to check for connectivity in a deterministic order */
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
/*
* We iterate over the workerNodeList twice, for source and target worker nodes. This
* operation is safe for foreach_ptr macro, as long as we use different variables for
* each iteration.
*/
WorkerNode *sourceWorkerNode = NULL;
foreach_ptr(sourceWorkerNode, workerNodeList)
{
const char *sourceNodeName = sourceWorkerNode->workerName;
const int sourceNodePort = sourceWorkerNode->workerPort;
int32 connectionFlags = 0;
/* open a connection to the source node using the synchronous api */
MultiConnection *connectionToSourceNode =
GetNodeConnection(connectionFlags, sourceNodeName, sourceNodePort);
/* the second iteration over workerNodeList for the target worker nodes. */
WorkerNode *targetWorkerNode = NULL;
foreach_ptr(targetWorkerNode, workerNodeList)
{
const char *targetNodeName = targetWorkerNode->workerName;
const int targetNodePort = targetWorkerNode->workerPort;
char *connectivityCheckCommandToTargetNode =
GetConnectivityCheckCommand(targetNodeName, targetNodePort);
PGresult *result = NULL;
int executionResult =
ExecuteOptionalRemoteCommand(connectionToSourceNode,
connectivityCheckCommandToTargetNode,
&result);
/* get ready for the next tuple */
memset(values, 0, sizeof(values));
memset(isNulls, false, sizeof(isNulls));
values[0] = PointerGetDatum(cstring_to_text(sourceNodeName));
values[1] = Int32GetDatum(sourceNodePort);
values[2] = PointerGetDatum(cstring_to_text(targetNodeName));
values[3] = Int32GetDatum(targetNodePort);
/*
* If we could not send the query or the result was not ok, set success field
* to NULL. This may indicate connection errors to a worker node, however that
* node can potentially connect to other nodes.
*
* Therefore, we mark the success as NULL to indicate that the connectivity
* status is unknown.
*/
if (executionResult != RESPONSE_OKAY)
{
isNulls[4] = true;
}
else
{
int rowIndex = 0;
int columnIndex = 0;
values[4] = BoolGetDatum(ParseBoolField(result, rowIndex, columnIndex));
}
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
PQclear(result);
ForgetResults(connectionToSourceNode);
}
}
}
/*
* GetConnectivityCheckCommand returns the command to check connections to a node
*/
static char *
GetConnectivityCheckCommand(const char *nodeName, const uint32 nodePort)
{
StringInfo connectivityCheckCommand = makeStringInfo();
appendStringInfo(connectivityCheckCommand,
"SELECT citus_check_connection_to_node('%s', %d)",
nodeName, nodePort);
return connectivityCheckCommand->data;
}

View File

@ -1058,8 +1058,6 @@ get_rebalance_table_shards_plan(PG_FUNCTION_ARGS)
tuplestore_putvalues(tupstore, tupdesc, values, nulls); tuplestore_putvalues(tupstore, tupdesc, values, nulls);
} }
tuplestore_donestoring(tupstore);
return (Datum) 0; return (Datum) 0;
} }
@ -1132,8 +1130,6 @@ get_rebalance_progress(PG_FUNCTION_ARGS)
} }
} }
tuplestore_donestoring(tupstore);
DetachFromDSMSegments(segmentList); DetachFromDSMSegments(segmentList);
return (Datum) 0; return (Datum) 0;

View File

@ -1000,8 +1000,6 @@ worker_last_saved_explain_analyze(PG_FUNCTION_ARGS)
tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls); tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls);
} }
tuplestore_donestoring(tupleStore);
PG_RETURN_DATUM(0); PG_RETURN_DATUM(0);
} }
@ -1087,8 +1085,6 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
ExplainEndOutput(es); ExplainEndOutput(es);
tuplestore_donestoring(tupleStore);
/* save EXPLAIN ANALYZE result to be fetched later */ /* save EXPLAIN ANALYZE result to be fetched later */
MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext);
FreeSavedExplainPlan(); FreeSavedExplainPlan();

View File

@ -1088,6 +1088,18 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL, GUC_NO_SHOW_ALL,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomStringVariable(
"citus.grep_remote_commands",
gettext_noop(
"Applies \"command\" like citus.grep_remote_commands, if returns "
"true, the command is logged."),
NULL,
&GrepRemoteCommands,
"",
PGC_USERSET,
GUC_NO_SHOW_ALL,
NULL, NULL, NULL);
DefineCustomIntVariable( DefineCustomIntVariable(
"citus.isolation_test_session_process_id", "citus.isolation_test_session_process_id",
NULL, NULL,

View File

@ -4,6 +4,8 @@
#include "udfs/citus_disable_node/11.0-1.sql" #include "udfs/citus_disable_node/11.0-1.sql"
#include "udfs/citus_check_connection_to_node/11.0-1.sql" #include "udfs/citus_check_connection_to_node/11.0-1.sql"
#include "udfs/citus_check_cluster_node_health/11.0-1.sql"
#include "udfs/citus_internal_add_object_metadata/11.0-1.sql" #include "udfs/citus_internal_add_object_metadata/11.0-1.sql"
DROP FUNCTION IF EXISTS pg_catalog.master_apply_delete_command(text); DROP FUNCTION IF EXISTS pg_catalog.master_apply_delete_command(text);

View File

@ -41,4 +41,6 @@ COMMENT ON FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport intege
IS 'removes node from the cluster temporarily'; IS 'removes node from the cluster temporarily';
DROP FUNCTION pg_catalog.citus_check_connection_to_node (text, integer); DROP FUNCTION pg_catalog.citus_check_connection_to_node (text, integer);
DROP FUNCTION pg_catalog.citus_check_cluster_node_health ();
DROP FUNCTION pg_catalog.citus_internal_add_object_metadata(text, text[], text[], integer, integer); DROP FUNCTION pg_catalog.citus_internal_add_object_metadata(text, text[], text[], integer, integer);

View File

@ -0,0 +1,13 @@
CREATE FUNCTION pg_catalog.citus_check_cluster_node_health (
OUT from_nodename text,
OUT from_nodeport int,
OUT to_nodename text,
OUT to_nodeport int,
OUT result bool )
RETURNS SETOF RECORD
LANGUAGE C
STRICT
AS 'MODULE_PATHNAME', $$citus_check_cluster_node_health$$;
COMMENT ON FUNCTION pg_catalog.citus_check_cluster_node_health ()
IS 'checks connections between all nodes in the cluster';

View File

@ -0,0 +1,13 @@
CREATE FUNCTION pg_catalog.citus_check_cluster_node_health (
OUT from_nodename text,
OUT from_nodeport int,
OUT to_nodename text,
OUT to_nodeport int,
OUT result bool )
RETURNS SETOF RECORD
LANGUAGE C
STRICT
AS 'MODULE_PATHNAME', $$citus_check_cluster_node_health$$;
COMMENT ON FUNCTION pg_catalog.citus_check_cluster_node_health ()
IS 'checks connections between all nodes in the cluster';

View File

@ -67,8 +67,5 @@ get_adjacency_list_wait_graph(PG_FUNCTION_ARGS)
tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls);
} }
/* clean up and return the tuplestore */
tuplestore_donestoring(tupleStore);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }

View File

@ -101,9 +101,6 @@ partition_task_list_results(PG_FUNCTION_ARGS)
tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls); tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls);
} }
tuplestore_donestoring(tupleStore);
PG_RETURN_DATUM(0); PG_RETURN_DATUM(0);
} }
@ -186,8 +183,5 @@ redistribute_task_list_results(PG_FUNCTION_ARGS)
tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls); tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls);
} }
tuplestore_donestoring(tupleStore);
PG_RETURN_DATUM(0); PG_RETURN_DATUM(0);
} }

View File

@ -215,8 +215,5 @@ get_foreign_key_connected_relations(PG_FUNCTION_ARGS)
tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls); tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls);
} }
tuplestore_donestoring(tupleStore);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }

View File

@ -114,8 +114,6 @@ show_progress(PG_FUNCTION_ARGS)
} }
} }
tuplestore_donestoring(tupstore);
DetachFromDSMSegments(attachedDSMSegments); DetachFromDSMSegments(attachedDSMSegments);
return (Datum) 0; return (Datum) 0;

View File

@ -323,9 +323,6 @@ get_global_active_transactions(PG_FUNCTION_ARGS)
ForgetResults(connection); ForgetResults(connection);
} }
/* clean up and return the tuplestore */
tuplestore_donestoring(tupleStore);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -344,9 +341,6 @@ get_all_active_transactions(PG_FUNCTION_ARGS)
StoreAllActiveTransactions(tupleStore, tupleDescriptor); StoreAllActiveTransactions(tupleStore, tupleDescriptor);
/* clean up and return the tuplestore */
tuplestore_donestoring(tupleStore);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }

View File

@ -1100,7 +1100,4 @@ ReturnCitusDistStats(List *citusStatsList, FunctionCallInfo fcinfo)
tuplestore_putvalues(tupleStore, tupleDesc, values, nulls); tuplestore_putvalues(tupleStore, tupleDesc, values, nulls);
} }
/* clean up and return the tuplestore */
tuplestore_donestoring(tupleStore);
} }

View File

@ -322,9 +322,6 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo)
tuplestore_putvalues(tupleStore, tupleDesc, values, nulls); tuplestore_putvalues(tupleStore, tupleDesc, values, nulls);
} }
/* clean up and return the tuplestore */
tuplestore_donestoring(tupleStore);
} }

View File

@ -34,6 +34,7 @@
#define PREPARED_TRANSACTION_NAME_FORMAT "citus_%u_%u_"UINT64_FORMAT "_%u" #define PREPARED_TRANSACTION_NAME_FORMAT "citus_%u_%u_"UINT64_FORMAT "_%u"
static char * AssignDistributedTransactionIdCommand(void);
static void StartRemoteTransactionSavepointBegin(MultiConnection *connection, static void StartRemoteTransactionSavepointBegin(MultiConnection *connection,
SubTransactionId subId); SubTransactionId subId);
static void FinishRemoteTransactionSavepointBegin(MultiConnection *connection, static void FinishRemoteTransactionSavepointBegin(MultiConnection *connection,
@ -68,8 +69,15 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
transaction->transactionState = REMOTE_TRANS_STARTING; transaction->transactionState = REMOTE_TRANS_STARTING;
StringInfo beginAndSetDistributedTransactionId = StringInfo beginAndSetDistributedTransactionId = makeStringInfo();
BeginAndSetDistributedTransactionIdCommand();
/*
* Explicitly specify READ COMMITTED, the default on the remote
* side might have been changed, and that would cause problematic
* behaviour.
*/
appendStringInfoString(beginAndSetDistributedTransactionId,
"BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;");
/* append context for in-progress SAVEPOINTs for this transaction */ /* append context for in-progress SAVEPOINTs for this transaction */
List *activeSubXacts = ActiveSubXactContexts(); List *activeSubXacts = ActiveSubXactContexts();
@ -98,6 +106,10 @@ StartRemoteTransactionBegin(struct MultiConnection *connection)
appendStringInfoString(beginAndSetDistributedTransactionId, activeSetStmts->data); appendStringInfoString(beginAndSetDistributedTransactionId, activeSetStmts->data);
} }
/* add SELECT assign_distributed_transaction_id ... */
appendStringInfoString(beginAndSetDistributedTransactionId,
AssignDistributedTransactionIdCommand());
if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data)) if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data))
{ {
const bool raiseErrors = true; const bool raiseErrors = true;
@ -126,6 +138,22 @@ BeginAndSetDistributedTransactionIdCommand(void)
appendStringInfoString(beginAndSetDistributedTransactionId, appendStringInfoString(beginAndSetDistributedTransactionId,
"BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;"); "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;");
appendStringInfoString(beginAndSetDistributedTransactionId,
AssignDistributedTransactionIdCommand());
return beginAndSetDistributedTransactionId;
}
/*
* AssignDistributedTransactionIdCommand returns a command to set the local
* distributed transaction ID on a remote transaction.
*/
static char *
AssignDistributedTransactionIdCommand(void)
{
StringInfo assignDistributedTransactionId = makeStringInfo();
/* /*
* Append BEGIN and assign_distributed_transaction_id() statements into a single command * Append BEGIN and assign_distributed_transaction_id() statements into a single command
* and send both in one step. The reason is purely performance, we don't want * and send both in one step. The reason is purely performance, we don't want
@ -134,14 +162,14 @@ BeginAndSetDistributedTransactionIdCommand(void)
DistributedTransactionId *distributedTransactionId = DistributedTransactionId *distributedTransactionId =
GetCurrentDistributedTransactionId(); GetCurrentDistributedTransactionId();
const char *timestamp = timestamptz_to_str(distributedTransactionId->timestamp); const char *timestamp = timestamptz_to_str(distributedTransactionId->timestamp);
appendStringInfo(beginAndSetDistributedTransactionId, appendStringInfo(assignDistributedTransactionId,
"SELECT assign_distributed_transaction_id(%d, " UINT64_FORMAT "SELECT assign_distributed_transaction_id(%d, " UINT64_FORMAT
", '%s');", ", '%s');",
distributedTransactionId->initiatorNodeIdentifier, distributedTransactionId->initiatorNodeIdentifier,
distributedTransactionId->transactionNumber, distributedTransactionId->transactionNumber,
timestamp); timestamp);
return beginAndSetDistributedTransactionId; return assignDistributedTransactionId->data;
} }

View File

@ -61,7 +61,7 @@ WrapCreateOrReplace(const char *sql)
* have this functionality or where their implementation is not sufficient. * have this functionality or where their implementation is not sufficient.
* *
* Besides checking if an object of said name exists it tries to compare the object to be * Besides checking if an object of said name exists it tries to compare the object to be
* created with the one in the local catalog. If there is a difference the on in the local * created with the one in the local catalog. If there is a difference the one in the local
* catalog will be renamed after which the statement can be executed on this worker to * catalog will be renamed after which the statement can be executed on this worker to
* create the object. * create the object.
* *

View File

@ -38,7 +38,7 @@
#undef HAVE_LIBCURL #undef HAVE_LIBCURL
/* Define to 1 if you have the `lz4' library (-llz4). */ /* Define to 1 if you have the `lz4' library (-llz4). */
#undef HAVE_LIBLZ4 #undef HAVE_CITUS_LIBLZ4
/* Define to 1 if you have the `zstd' library (-lzstd). */ /* Define to 1 if you have the `zstd' library (-lzstd). */
#undef HAVE_LIBZSTD #undef HAVE_LIBZSTD

View File

@ -25,7 +25,7 @@
#undef HAVE_LIBCURL #undef HAVE_LIBCURL
/* Define to 1 if you have the `liblz4' library (-llz4). */ /* Define to 1 if you have the `liblz4' library (-llz4). */
#undef HAVE_LIBLZ4 #undef HAVE_CITUS_LIBLZ4
/* Define to 1 if you have the `libzstd' library (-lzstd). */ /* Define to 1 if you have the `libzstd' library (-lzstd). */
#undef HAVE_LIBZSTD #undef HAVE_LIBZSTD

View File

@ -20,6 +20,7 @@
/* GUC, determining whether statements sent to remote nodes are logged */ /* GUC, determining whether statements sent to remote nodes are logged */
extern bool LogRemoteCommands; extern bool LogRemoteCommands;
extern char *GrepRemoteCommands;
/* GUC that determines the number of bytes after which remote COPY is flushed */ /* GUC that determines the number of bytes after which remote COPY is flushed */
extern int RemoteCopyFlushThreshold; extern int RemoteCopyFlushThreshold;
@ -38,6 +39,7 @@ extern void ReportResultError(MultiConnection *connection, PGresult *result,
int elevel); int elevel);
extern char * pchomp(const char *in); extern char * pchomp(const char *in);
extern void LogRemoteCommand(MultiConnection *connection, const char *command); extern void LogRemoteCommand(MultiConnection *connection, const char *command);
extern bool CommandMatchesLogGrepPattern(const char *command);
/* wrappers around libpq functions, with command logging support */ /* wrappers around libpq functions, with command logging support */
extern void ExecuteCriticalRemoteCommandList(MultiConnection *connection, extern void ExecuteCriticalRemoteCommandList(MultiConnection *connection,

View File

@ -1,6 +1,5 @@
# this schedule is to be run only on coordinators # this schedule is to be run only on coordinators
test: turn_mx_off
test: upgrade_basic_after test: upgrade_basic_after
test: upgrade_partition_constraints_after test: upgrade_partition_constraints_after
test: upgrade_pg_dist_object_test_after test: upgrade_pg_dist_object_test_after

View File

@ -1,7 +1,6 @@
# ---------- # ----------
# Only run few basic tests to set up a testing environment # Only run few basic tests to set up a testing environment
# ---------- # ----------
test: turn_mx_off
test: multi_cluster_management test: multi_cluster_management
test: multi_test_helpers multi_test_helpers_superuser multi_create_fdw columnar_test_helpers test: multi_test_helpers multi_test_helpers_superuser multi_create_fdw columnar_test_helpers
test: multi_test_catalog_views test: multi_test_catalog_views

View File

@ -1,5 +1,4 @@
# The basic tests runs analyze which depends on shard numbers # The basic tests runs analyze which depends on shard numbers
test: turn_mx_off
test: multi_test_helpers multi_test_helpers_superuser test: multi_test_helpers multi_test_helpers_superuser
test: multi_test_catalog_views test: multi_test_catalog_views
test: upgrade_basic_before test: upgrade_basic_before

View File

@ -65,6 +65,9 @@ s/"(raw_events_second_user_id_value_1_key_|agg_events_user_id_value_1_agg_key_)[
# ignore WAL warnings # ignore WAL warnings
/DEBUG: .+creating and filling new WAL file/d /DEBUG: .+creating and filling new WAL file/d
# normalize debug connection failure
s/DEBUG: connection to the remote node/WARNING: connection to the remote node/g
# normalize file names for partitioned files # normalize file names for partitioned files
s/(task_[0-9]+\.)[0-9]+/\1xxxx/g s/(task_[0-9]+\.)[0-9]+/\1xxxx/g
s/(job_[0-9]+\/task_[0-9]+\/p_[0-9]+\.)[0-9]+/\1xxxx/g s/(job_[0-9]+\/task_[0-9]+\/p_[0-9]+\.)[0-9]+/\1xxxx/g

View File

@ -272,16 +272,6 @@ class CitusUnusualExecutorConfig(CitusMXBaseClusterConfig):
} }
class CitusCacheManyConnectionsConfig(CitusMXBaseClusterConfig):
def __init__(self, arguments):
super().__init__(arguments)
self.new_settings = {
"citus.copy_switchover_threshold": "1B",
"citus.local_copy_flush_threshold": "1B",
"citus.remote_copy_flush_threshold": "1B",
}
class CitusSmallCopyBuffersConfig(CitusMXBaseClusterConfig): class CitusSmallCopyBuffersConfig(CitusMXBaseClusterConfig):
def __init__(self, arguments): def __init__(self, arguments):
super().__init__(arguments) super().__init__(arguments)

View File

@ -3,3 +3,4 @@ test: prepared_statements_create_load ch_benchmarks_create_load
test: dropped_columns_create_load distributed_planning_create_load test: dropped_columns_create_load distributed_planning_create_load
test: local_dist_join_load test: local_dist_join_load
test: partitioned_indexes_create test: partitioned_indexes_create
test: connectivity_checks

View File

@ -2,6 +2,7 @@
-- ADD_COORDINATOR -- ADD_COORDINATOR
-- --
SELECT master_add_node('localhost', :master_port, groupid => 0) AS master_nodeid \gset SELECT master_add_node('localhost', :master_port, groupid => 0) AS master_nodeid \gset
NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata
-- adding the same node again should return the existing nodeid -- adding the same node again should return the existing nodeid
SELECT master_add_node('localhost', :master_port, groupid => 0) = :master_nodeid; SELECT master_add_node('localhost', :master_port, groupid => 0) = :master_nodeid;
?column? ?column?

View File

@ -0,0 +1,6 @@
SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health();
bool_and
---------------------------------------------------------------------
t
(1 row)

View File

@ -610,7 +610,7 @@ SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', dist
-- show that we are able to propagate objects with multiple item on address arrays -- show that we are able to propagate objects with multiple item on address arrays
SELECT * FROM (SELECT unnest(master_metadata_snapshot()) as metadata_command order by 1) as innerResult WHERE metadata_command like '%distributed_object_data%'; SELECT * FROM (SELECT unnest(master_metadata_snapshot()) as metadata_command order by 1) as innerResult WHERE metadata_command like '%distributed_object_data%';
metadata_command metadata_command
--------------------------------------------------------------------- ---------------------------------------------------------------------
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('type', ARRAY['public.usage_access_type']::text[], ARRAY[]::text[], -1, 0), ('type', ARRAY['function_tests.dup_result']::text[], ARRAY[]::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func']::text[], ARRAY['public.usage_access_type', 'integer[]']::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func_third']::text[], ARRAY['integer', 'integer[]']::text[], 0, 50), ('function', ARRAY['function_tests', 'notice']::text[], ARRAY['pg_catalog.text']::text[], -1, 0), ('function', ARRAY['function_tests', 'dup']::text[], ARRAY['pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_with_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_mi''xed_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_sfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_invfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_finalfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'my_rank']::text[], ARRAY['pg_catalog."any"']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_sfunc']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_finalfunc']::text[], ARRAY['function_tests.dup_result']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'agg_names']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['schema_colocation']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests2']::text[], ARRAY[]::text[], -1, 0), ('extension', ARRAY['plpgsql']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data; WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('type', ARRAY['public.usage_access_type']::text[], ARRAY[]::text[], -1, 0), ('type', ARRAY['function_tests.dup_result']::text[], ARRAY[]::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func']::text[], ARRAY['public.usage_access_type', 'integer[]']::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func_third']::text[], ARRAY['integer', 'integer[]']::text[], 0, 50), ('function', ARRAY['function_tests', 'notice']::text[], ARRAY['pg_catalog.text']::text[], -1, 0), ('function', ARRAY['function_tests', 'dup']::text[], ARRAY['pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_with_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_mi''xed_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_sfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_invfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_finalfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'my_rank']::text[], ARRAY['pg_catalog."any"']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_sfunc']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_finalfunc']::text[], ARRAY['function_tests.dup_result']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'agg_names']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['schema_colocation']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests2']::text[], ARRAY[]::text[], -1, 0), ('extension', ARRAY['plpgsql']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data;
(1 row) (1 row)
@ -829,6 +829,266 @@ SELECT * FROM test ORDER BY id;
(2 rows) (2 rows)
DROP TABLE test; DROP TABLE test;
-- verify that recreating distributed functions with TABLE params gets propagated to workers
CREATE OR REPLACE FUNCTION func_with_return_table(int)
RETURNS TABLE (date date)
LANGUAGE plpgsql AS $$
BEGIN
RETURN query SELECT '2011-01-01'::date;
END;
$$;
SELECT create_distributed_function('func_with_return_table(int)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE FUNCTION func_with_return_table(int)
RETURNS TABLE (date date)
LANGUAGE plpgsql AS $$
BEGIN
RETURN query SELECT '2011-01-02'::date;
END;
$$;
SELECT count(*) FROM
(SELECT result FROM
run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_with_return_table';$$)
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_with_return_table')
as test;
count
---------------------------------------------------------------------
1
(1 row)
-- verify that recreating distributed functions with OUT params gets propagated to workers
CREATE OR REPLACE FUNCTION func_with_out_param(a int, out b int)
RETURNS int
LANGUAGE sql AS $$ select 1; $$;
SELECT create_distributed_function('func_with_out_param(int)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
SET client_min_messages TO ERROR;
CREATE ROLE r1;
SELECT 1 FROM run_command_on_workers($$CREATE ROLE r1;$$);
?column?
---------------------------------------------------------------------
1
1
(2 rows)
GRANT EXECUTE ON FUNCTION func_with_out_param TO r1;
SELECT 1 FROM run_command_on_workers($$GRANT EXECUTE ON FUNCTION func_with_out_param TO r1;$$);
?column?
---------------------------------------------------------------------
1
1
(2 rows)
RESET client_min_messages;
CREATE OR REPLACE FUNCTION func_with_out_param(a int, out b int)
RETURNS int
LANGUAGE sql AS $$ select 2; $$;
SELECT count(*) FROM
(SELECT result FROM
run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc, pg_proc.proowner) from pg_proc where proname = 'func_with_out_param';$$)
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc, pg_proc.proowner)::text from pg_proc where proname = 'func_with_out_param')
as test;
count
---------------------------------------------------------------------
1
(1 row)
-- verify that recreating distributed functions with INOUT params gets propagated to workers
CREATE OR REPLACE FUNCTION func_with_inout_param(a int, inout b int)
RETURNS int
LANGUAGE sql AS $$ select 1; $$;
-- this should error out
SELECT create_distributed_function('func_with_inout_param(int)');
ERROR: function "func_with_inout_param(int)" does not exist
-- this should work
SELECT create_distributed_function('func_with_inout_param(int,int)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE FUNCTION func_with_inout_param(a int, inout b int)
RETURNS int
LANGUAGE sql AS $$ select 2; $$;
SELECT count(*) FROM
(SELECT result FROM
run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_with_inout_param';$$)
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_with_inout_param')
as test;
count
---------------------------------------------------------------------
1
(1 row)
-- verify that recreating distributed functions with VARIADIC params gets propagated to workers
CREATE OR REPLACE FUNCTION func_with_variadic_param(a int, variadic b int[])
RETURNS int
LANGUAGE sql AS $$ select 1; $$;
-- this should work
SELECT create_distributed_function('func_with_variadic_param(int,int[])');
create_distributed_function
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE FUNCTION func_with_variadic_param(a int, variadic b int[])
RETURNS int
LANGUAGE sql AS $$ select 2; $$;
SELECT count(*) FROM
(SELECT result FROM
run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_with_variadic_param';$$)
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_with_variadic_param')
as test;
count
---------------------------------------------------------------------
1
(1 row)
-- verify that recreating distributed functions returning setof records gets propagated to workers
CREATE OR REPLACE FUNCTION func_returning_setof_int(IN parm1 date, IN parm2 interval)
RETURNS SETOF integer AS
$BODY$
BEGIN
RETURN QUERY
SELECT 1;
END;
$BODY$
LANGUAGE plpgsql VOLATILE
COST 100;
SELECT create_distributed_function('func_returning_setof_int(date,interval)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE FUNCTION func_returning_setof_int(IN parm1 date, IN parm2 interval)
RETURNS SETOF integer AS
$BODY$
BEGIN
RETURN QUERY
SELECT 2;
END;
$BODY$
LANGUAGE plpgsql VOLATILE
COST 100;
SELECT count(*) FROM
(SELECT result FROM
run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_returning_setof_int';$$)
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_returning_setof_int')
as test;
count
---------------------------------------------------------------------
1
(1 row)
-- verify that recreating distributed functions with variadic param returning setof records gets propagated to workers
CREATE OR REPLACE FUNCTION func_returning_setof_int_with_variadic_param(IN parm1 date, VARIADIC parm2 int[])
RETURNS SETOF integer AS
$BODY$
BEGIN
RETURN QUERY
SELECT 1;
END;
$BODY$
LANGUAGE plpgsql VOLATILE
COST 100;
SELECT create_distributed_function('func_returning_setof_int_with_variadic_param(date,int[])');
create_distributed_function
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE FUNCTION func_returning_setof_int_with_variadic_param(IN parm1 date, VARIADIC parm2 int[])
RETURNS SETOF integer AS
$BODY$
BEGIN
RETURN QUERY
SELECT 2;
END;
$BODY$
LANGUAGE plpgsql VOLATILE
COST 100;
SELECT count(*) FROM
(SELECT result FROM
run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_returning_setof_int_with_variadic_param';$$)
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_returning_setof_int_with_variadic_param')
as test;
count
---------------------------------------------------------------------
1
(1 row)
-- verify that recreating distributed procedures with out params gets propagated to workers
CREATE OR REPLACE PROCEDURE proc_with_variadic_param(IN parm1 date, VARIADIC parm2 int[])
LANGUAGE SQL
AS $$
SELECT 1;
$$;
-- this should error out
SELECT create_distributed_function('proc_with_variadic_param(date)');
ERROR: function "proc_with_variadic_param(date)" does not exist
-- this should work
SELECT create_distributed_function('proc_with_variadic_param(date,int[])');
create_distributed_function
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE PROCEDURE proc_with_variadic_param(IN parm1 date, VARIADIC parm2 int[])
LANGUAGE SQL
AS $$
SELECT 2;
$$;
SELECT count(*) FROM
(SELECT result FROM
run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'proc_with_variadic_param';$$)
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'proc_with_variadic_param')
as test;
count
---------------------------------------------------------------------
1
(1 row)
-- verify that recreating distributed procedures with INOUT param gets propagated to workers
CREATE OR REPLACE PROCEDURE proc_with_inout_param(IN parm1 date, INOUT parm2 int)
LANGUAGE SQL
AS $$
SELECT 1;
$$;
-- this should error out
SELECT create_distributed_function('proc_with_inout_param(date)');
ERROR: function "proc_with_inout_param(date)" does not exist
-- this should work
SELECT create_distributed_function('proc_with_inout_param(date,int)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE PROCEDURE proc_with_inout_param(IN parm1 date, INOUT parm2 int)
LANGUAGE SQL
AS $$
SELECT 2;
$$;
SELECT count(*) FROM
(SELECT result FROM
run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'proc_with_inout_param';$$)
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'proc_with_inout_param')
as test;
count
---------------------------------------------------------------------
1
(1 row)
SET client_min_messages TO error; -- suppress cascading objects dropping SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA function_tests CASCADE; DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE; DROP SCHEMA function_tests2 CASCADE;

View File

@ -139,6 +139,27 @@ SELECT worker_create_or_replace_object('CREATE AGGREGATE proc_conflict.existing_
f f
(1 row) (1 row)
-- test worker_create_or_replace_object with a function that returns table
CREATE OR REPLACE FUNCTION func_with_return_table(int)
RETURNS TABLE (date date)
LANGUAGE plpgsql AS $$
BEGIN
RETURN query SELECT '2011-01-01'::date;
END;
$$;
SELECT worker_create_or_replace_object('CREATE OR REPLACE FUNCTION func_with_return_table(int) RETURNS TABLE (date date) LANGUAGE plpgsql AS $$ BEGIN RETURN query SELECT ''2011-01-01''::date; END; $$;');
worker_create_or_replace_object
---------------------------------------------------------------------
t
(1 row)
-- verify that a backup function is created
SELECT COUNT(*)=2 FROM pg_proc WHERE proname LIKE 'func_with_return_table%';
?column?
---------------------------------------------------------------------
t
(1 row)
-- hide cascades -- hide cascades
SET client_min_messages TO error; SET client_min_messages TO error;
DROP SCHEMA proc_conflict CASCADE; DROP SCHEMA proc_conflict CASCADE;

View File

@ -44,7 +44,8 @@ SELECT citus.mitmproxy('conn.delay(500)');
(1 row) (1 row)
ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no); ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no);
ERROR: could not establish any connections to the node localhost:xxxxx after 400 ms WARNING: could not establish connection after 400 ms
ERROR: connection to the remote node localhost:xxxxx failed
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
mitmproxy mitmproxy
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -140,14 +141,6 @@ SELECT count(*) FROM products;
0 0
(1 row) (1 row)
-- use OFFSET 1 to prevent printing the line where source
-- is the worker, and LIMIT 1 in case there were multiple connections
SELECT citus.dump_network_traffic() ORDER BY 1 LIMIT 1 OFFSET 1;
dump_network_traffic
---------------------------------------------------------------------
(1,coordinator,"[initial message]")
(1 row)
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
mitmproxy mitmproxy
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -190,7 +183,7 @@ FROM
pg_dist_shard_placement pg_dist_shard_placement
WHERE WHERE
shardstate = 3 AND shardstate = 3 AND
shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'products'::regclass); shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'single_replicatated'::regclass);
invalid_placement_count invalid_placement_count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
@ -202,7 +195,7 @@ SELECT citus.mitmproxy('conn.delay(500)');
(1 row) (1 row)
INSERT INTO products VALUES (100, '100', 100); INSERT INTO single_replicatated VALUES (100);
ERROR: could not establish any connections to the node localhost:xxxxx after 400 ms ERROR: could not establish any connections to the node localhost:xxxxx after 400 ms
COMMIT; COMMIT;
SELECT SELECT
@ -211,14 +204,20 @@ FROM
pg_dist_shard_placement pg_dist_shard_placement
WHERE WHERE
shardstate = 3 AND shardstate = 3 AND
shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'products'::regclass); shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'single_replicatated'::regclass);
invalid_placement_count invalid_placement_count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
(1 row) (1 row)
-- show that INSERT failed -- show that INSERT failed
SELECT count(*) FROM products WHERE product_no = 100; SELECT citus.mitmproxy('conn.allow()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT count(*) FROM single_replicatated WHERE key = 100;
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
0 0
@ -227,8 +226,6 @@ SELECT count(*) FROM products WHERE product_no = 100;
RESET client_min_messages; RESET client_min_messages;
-- verify get_global_active_transactions works when a timeout happens on a connection -- verify get_global_active_transactions works when a timeout happens on a connection
SELECT get_global_active_transactions(); SELECT get_global_active_transactions();
WARNING: could not establish connection after 400 ms
WARNING: connection to the remote node localhost:xxxxx failed
get_global_active_transactions get_global_active_transactions
--------------------------------------------------------------------- ---------------------------------------------------------------------
(0 rows) (0 rows)
@ -314,6 +311,121 @@ SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
f f
(1 row) (1 row)
-- tests for citus_check_cluster_node_health
-- kill all connectivity checks that originate from this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_check_cluster_node_health();
from_nodename | from_nodeport | to_nodename | to_nodeport | result
---------------------------------------------------------------------
localhost | 9060 | localhost | 9060 |
localhost | 9060 | localhost | 57637 |
localhost | 57637 | localhost | 9060 | t
localhost | 57637 | localhost | 57637 | t
(4 rows)
-- suggested summary queries for connectivity checks
SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health();
bool_and
---------------------------------------------------------------------
f
(1 row)
SELECT result, count(*) FROM citus_check_cluster_node_health() GROUP BY result ORDER BY 1;
result | count
---------------------------------------------------------------------
t | 2
| 2
(2 rows)
-- cancel all connectivity checks that originate from this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_check_cluster_node_health();
ERROR: canceling statement due to user request
-- kill all but first connectivity checks that originate from this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_check_cluster_node_health();
from_nodename | from_nodeport | to_nodename | to_nodeport | result
---------------------------------------------------------------------
localhost | 9060 | localhost | 9060 | t
localhost | 9060 | localhost | 57637 |
localhost | 57637 | localhost | 9060 | t
localhost | 57637 | localhost | 57637 | t
(4 rows)
-- cancel all but first connectivity checks that originate from this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_check_cluster_node_health();
ERROR: canceling statement due to user request
-- kill all connections to this node
SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_check_cluster_node_health();
from_nodename | from_nodeport | to_nodename | to_nodeport | result
---------------------------------------------------------------------
localhost | 9060 | localhost | 9060 |
localhost | 9060 | localhost | 57637 |
localhost | 57637 | localhost | 9060 | f
localhost | 57637 | localhost | 57637 | t
(4 rows)
-- cancel all connections to this node
SELECT citus.mitmproxy('conn.onAuthenticationOk().cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_check_cluster_node_health();
ERROR: canceling statement due to user request
-- kill connection checks to this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").kill()');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_check_cluster_node_health();
from_nodename | from_nodeport | to_nodename | to_nodeport | result
---------------------------------------------------------------------
localhost | 9060 | localhost | 9060 | f
localhost | 9060 | localhost | 57637 | t
localhost | 57637 | localhost | 9060 | f
localhost | 57637 | localhost | 57637 | t
(4 rows)
-- cancel connection checks to this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").cancel(' || pg_backend_pid() || ')');
mitmproxy
---------------------------------------------------------------------
(1 row)
SELECT * FROM citus_check_cluster_node_health();
ERROR: canceling statement due to user request
RESET client_min_messages; RESET client_min_messages;
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
mitmproxy mitmproxy

View File

@ -282,7 +282,6 @@ SELECT citus.mitmproxy('conn.kill()');
ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
This probably means the server terminated abnormally This probably means the server terminated abnormally
before or while processing the request. before or while processing the request.
CONTEXT: COPY test_table_2, line 1: "1,2"
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
mitmproxy mitmproxy
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -36,7 +36,6 @@ SELECT citus.mitmproxy('conn.kill()');
\copy test_table FROM STDIN DELIMITER ',' \copy test_table FROM STDIN DELIMITER ','
ERROR: failure on connection marked as essential: localhost:xxxxx ERROR: failure on connection marked as essential: localhost:xxxxx
CONTEXT: COPY test_table, line 1: "1,2"
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
mitmproxy mitmproxy
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -64,7 +63,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R
\copy test_table FROM STDIN DELIMITER ',' \copy test_table FROM STDIN DELIMITER ','
ERROR: failure on connection marked as essential: localhost:xxxxx ERROR: failure on connection marked as essential: localhost:xxxxx
CONTEXT: COPY test_table, line 1: "1,2"
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
mitmproxy mitmproxy
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -92,7 +90,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R
\copy test_table FROM STDIN DELIMITER ',' \copy test_table FROM STDIN DELIMITER ','
ERROR: canceling statement due to user request ERROR: canceling statement due to user request
CONTEXT: COPY test_table, line 1: "1,2"
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
mitmproxy mitmproxy
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -38,7 +38,7 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
-- verify index is not created -- verify index is not created
SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test%' $$) SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test_%' $$)
WHERE nodeport = :worker_2_proxy_port; WHERE nodeport = :worker_2_proxy_port;
nodename | nodeport | success | result nodename | nodeport | success | result
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -153,7 +153,7 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row) (1 row)
-- verify index is not dropped at worker 2 -- verify index is not dropped at worker 2
SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test%' $$) SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test_%' $$)
WHERE nodeport = :worker_2_proxy_port; WHERE nodeport = :worker_2_proxy_port;
nodename | nodeport | success | result nodename | nodeport | success | result
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -186,7 +186,7 @@ NOTICE: drop cascades to 2 other objects
DETAIL: drop cascades to table index_schema.index_test DETAIL: drop cascades to table index_schema.index_test
drop cascades to table index_schema.index_test_2 drop cascades to table index_schema.index_test_2
-- verify index is not at worker 2 upon cleanup -- verify index is not at worker 2 upon cleanup
SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test%' $$) SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test_%' $$)
WHERE nodeport = :worker_2_proxy_port; WHERE nodeport = :worker_2_proxy_port;
nodename | nodeport | success | result nodename | nodeport | success | result
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -9,7 +9,7 @@ SET search_path TO 'ddl_failure';
-- do not cache any connections -- do not cache any connections
SET citus.max_cached_conns_per_worker TO 0; SET citus.max_cached_conns_per_worker TO 0;
-- we don't want to see the prepared transaction numbers in the warnings -- we don't want to see the prepared transaction numbers in the warnings
SET client_min_messages TO ERROR; SET client_min_messages TO WARNING;
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
mitmproxy mitmproxy
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -69,9 +69,12 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R
(1 row) (1 row)
ALTER TABLE test_table ADD COLUMN new_column INT; ALTER TABLE test_table ADD COLUMN new_column INT;
ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly WARNING: server closed the connection unexpectedly
This probably means the server terminated abnormally This probably means the server terminated abnormally
before or while processing the request. before or while processing the request.
connection not open
CONTEXT: while executing command on localhost:xxxxx
ERROR: failure on connection marked as essential: localhost:xxxxx
SELECT array_agg(name::text ORDER BY name::text) FROM public.table_attrs where relid = 'test_table'::regclass; SELECT array_agg(name::text ORDER BY name::text) FROM public.table_attrs where relid = 'test_table'::regclass;
array_agg array_agg
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -153,6 +156,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").cancel(' || pg_backend_pi
(1 row) (1 row)
ALTER TABLE test_table ADD COLUMN new_column INT; ALTER TABLE test_table ADD COLUMN new_column INT;
WARNING:
CONTEXT: while executing command on localhost:xxxxx
WARNING: failed to commit transaction on localhost:xxxxx
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
mitmproxy mitmproxy
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -365,9 +371,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R
(1 row) (1 row)
ALTER TABLE test_table DROP COLUMN new_column; ALTER TABLE test_table DROP COLUMN new_column;
ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly ERROR: failure on connection marked as essential: localhost:xxxxx
This probably means the server terminated abnormally
before or while processing the request.
SELECT array_agg(name::text ORDER BY name::text) FROM public.table_attrs where relid = 'test_table'::regclass; SELECT array_agg(name::text ORDER BY name::text) FROM public.table_attrs where relid = 'test_table'::regclass;
array_agg array_agg
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -735,9 +739,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R
(1 row) (1 row)
ALTER TABLE test_table ADD COLUMN new_column INT; ALTER TABLE test_table ADD COLUMN new_column INT;
ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly ERROR: failure on connection marked as essential: localhost:xxxxx
This probably means the server terminated abnormally
before or while processing the request.
SELECT array_agg(name::text ORDER BY name::text) FROM public.table_attrs where relid = 'test_table'::regclass; SELECT array_agg(name::text ORDER BY name::text) FROM public.table_attrs where relid = 'test_table'::regclass;
array_agg array_agg
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1030,9 +1032,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R
(1 row) (1 row)
ALTER TABLE test_table ADD COLUMN new_column INT; ALTER TABLE test_table ADD COLUMN new_column INT;
ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly ERROR: failure on connection marked as essential: localhost:xxxxx
This probably means the server terminated abnormally
before or while processing the request.
SELECT array_agg(name::text ORDER BY name::text) FROM public.table_attrs where relid = 'test_table'::regclass; SELECT array_agg(name::text ORDER BY name::text) FROM public.table_attrs where relid = 'test_table'::regclass;
array_agg array_agg
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -86,7 +86,7 @@ CREATE TABLE distributed_result_info AS
SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex
FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table') FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table')
NATURAL JOIN pg_dist_node; NATURAL JOIN pg_dist_node;
DEBUG: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly
This probably means the server terminated abnormally This probably means the server terminated abnormally
before or while processing the request. before or while processing the request.
SELECT * FROM distributed_result_info ORDER BY resultId; SELECT * FROM distributed_result_info ORDER BY resultId;

View File

@ -21,7 +21,14 @@ SELECT create_distributed_table('t1', 'id');
(1 row) (1 row)
INSERT INTO t1 SELECT x FROM generate_series(1,100) AS f(x); INSERT INTO t1 SELECT x FROM generate_series(1,100) AS f(x);
-- Initial metadata status -- Initially turn metadata sync off because we'll ingest errors to start/stop metadata sync operations
SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port);
NOTICE: dropping metadata on the node (localhost,9060)
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port; SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;
hasmetadata hasmetadata
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -365,6 +372,13 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;
f f
(1 row) (1 row)
-- turn metadata sync back on
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SET SEARCH_PATH = mx_metadata_sync; SET SEARCH_PATH = mx_metadata_sync;
DROP TABLE t1; DROP TABLE t1;
DROP TABLE t2; DROP TABLE t2;

View File

@ -100,9 +100,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R
(1 row) (1 row)
TRUNCATE test_table; TRUNCATE test_table;
ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly ERROR: failure on connection marked as essential: localhost:xxxxx
This probably means the server terminated abnormally
before or while processing the request.
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
mitmproxy mitmproxy
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -649,9 +647,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R
(1 row) (1 row)
TRUNCATE test_table; TRUNCATE test_table;
ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly ERROR: failure on connection marked as essential: localhost:xxxxx
This probably means the server terminated abnormally
before or while processing the request.
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
mitmproxy mitmproxy
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -1027,9 +1023,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R
(1 row) (1 row)
TRUNCATE test_table; TRUNCATE test_table;
ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly ERROR: failure on connection marked as essential: localhost:xxxxx
This probably means the server terminated abnormally
before or while processing the request.
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
mitmproxy mitmproxy
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -8,6 +8,7 @@ SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 93630500; SET citus.next_shard_id TO 93630500;
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1

View File

@ -629,6 +629,45 @@ SELECT citus_check_connection_to_node('localhost', :worker_2_port);
(1 row) (1 row)
\c - - - :master_port \c - - - :master_port
SELECT * FROM citus_check_cluster_node_health() ORDER BY 1,2,3,4;
from_nodename | from_nodeport | to_nodename | to_nodeport | result
---------------------------------------------------------------------
localhost | 57637 | localhost | 57637 | t
localhost | 57637 | localhost | 57638 | t
localhost | 57638 | localhost | 57637 | t
localhost | 57638 | localhost | 57638 | t
(4 rows)
-- test cluster connectivity when we have broken nodes
SET client_min_messages TO ERROR;
SET citus.node_connection_timeout TO 10;
BEGIN;
INSERT INTO pg_dist_node VALUES
(123456789, 123456789, 'localhost', 123456789),
(1234567890, 1234567890, 'www.citusdata.com', 5432);
SELECT * FROM citus_check_cluster_node_health() ORDER BY 5,1,2,3,4;
from_nodename | from_nodeport | to_nodename | to_nodeport | result
---------------------------------------------------------------------
localhost | 57637 | localhost | 123456789 | f
localhost | 57637 | www.citusdata.com | 5432 | f
localhost | 57638 | localhost | 123456789 | f
localhost | 57638 | www.citusdata.com | 5432 | f
localhost | 57637 | localhost | 57637 | t
localhost | 57637 | localhost | 57638 | t
localhost | 57638 | localhost | 57637 | t
localhost | 57638 | localhost | 57638 | t
localhost | 123456789 | localhost | 57637 |
localhost | 123456789 | localhost | 57638 |
localhost | 123456789 | localhost | 123456789 |
localhost | 123456789 | www.citusdata.com | 5432 |
www.citusdata.com | 5432 | localhost | 57637 |
www.citusdata.com | 5432 | localhost | 57638 |
www.citusdata.com | 5432 | localhost | 123456789 |
www.citusdata.com | 5432 | www.citusdata.com | 5432 |
(16 rows)
ROLLBACK;
RESET citus.node_connection_timeout;
RESET client_min_messages; RESET client_min_messages;
DROP SCHEMA tools CASCADE; DROP SCHEMA tools CASCADE;
RESET SEARCH_PATH; RESET SEARCH_PATH;

View File

@ -493,6 +493,7 @@ SELECT * FROM multi_extension.print_extension_changes();
ALTER EXTENSION citus UPDATE TO '9.5-1'; ALTER EXTENSION citus UPDATE TO '9.5-1';
BEGIN; BEGIN;
SELECT master_add_node('localhost', :master_port, groupId=>0); SELECT master_add_node('localhost', :master_port, groupId=>0);
NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata
master_add_node master_add_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1
@ -972,10 +973,11 @@ SELECT * FROM multi_extension.print_extension_changes();
function master_append_table_to_shard(bigint,text,text,integer) real | function master_append_table_to_shard(bigint,text,text,integer) real |
function master_apply_delete_command(text) integer | function master_apply_delete_command(text) integer |
function master_get_table_metadata(text) record | function master_get_table_metadata(text) record |
| function citus_check_cluster_node_health() SETOF record
| function citus_check_connection_to_node(text,integer) boolean | function citus_check_connection_to_node(text,integer) boolean
| function citus_disable_node(text,integer,boolean) void | function citus_disable_node(text,integer,boolean) void
| function citus_internal_add_object_metadata(text,text[],text[],integer,integer) void | function citus_internal_add_object_metadata(text,text[],text[],integer,integer) void
(7 rows) (8 rows)
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version -- show running version
@ -1222,6 +1224,9 @@ HINT: You can manually create a database and its extensions on workers.
\c - - - :master_port \c - - - :master_port
\c another \c another
CREATE EXTENSION citus; CREATE EXTENSION citus;
\c - - - :worker_1_port
CREATE EXTENSION citus;
\c - - - :master_port
SET citus.enable_object_propagation TO off; -- prevent distributed transactions during add node SET citus.enable_object_propagation TO off; -- prevent distributed transactions during add node
SELECT FROM master_add_node('localhost', :worker_1_port); SELECT FROM master_add_node('localhost', :worker_1_port);
WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker
@ -1230,7 +1235,6 @@ DETAIL: distributed objects are only kept in sync when citus.enable_object_prop
(1 row) (1 row)
\c - - - :worker_1_port \c - - - :worker_1_port
CREATE EXTENSION citus;
ALTER FUNCTION assign_distributed_transaction_id(initiator_node_identifier integer, transaction_number bigint, transaction_stamp timestamp with time zone) ALTER FUNCTION assign_distributed_transaction_id(initiator_node_identifier integer, transaction_number bigint, transaction_stamp timestamp with time zone)
RENAME TO dummy_assign_function; RENAME TO dummy_assign_function;
\c - - - :master_port \c - - - :master_port

View File

@ -16,6 +16,7 @@ SELECT create_distributed_table('the_table', 'a');
(1 row) (1 row)
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1
@ -281,11 +282,14 @@ DELETE FROM the_table;
ERROR: cannot assign TransactionIds during recovery ERROR: cannot assign TransactionIds during recovery
-- DDL is not possible -- DDL is not possible
TRUNCATE the_table; TRUNCATE the_table;
ERROR: cannot execute TRUNCATE TABLE in a read-only transaction ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress
HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery.
TRUNCATE reference_table; TRUNCATE reference_table;
ERROR: cannot execute TRUNCATE TABLE in a read-only transaction ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress
HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery.
TRUNCATE citus_local_table; TRUNCATE citus_local_table;
ERROR: cannot execute TRUNCATE TABLE in a read-only transaction ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress
HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery.
ALTER TABLE the_table ADD COLUMN c int; ALTER TABLE the_table ADD COLUMN c int;
ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress
HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery. HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery.

View File

@ -159,6 +159,16 @@ SELECT * FROM the_table;
1 | 2 1 | 2
(2 rows) (2 rows)
-- Check for connectivity in the cluster
SELECT * FROM citus_check_cluster_node_health();
from_nodename | from_nodeport | to_nodename | to_nodeport | result
---------------------------------------------------------------------
localhost | 9071 | localhost | 9071 | t
localhost | 9071 | localhost | 9072 | t
localhost | 9072 | localhost | 9071 | t
localhost | 9072 | localhost | 9072 | t
(4 rows)
-- clean up after ourselves -- clean up after ourselves
\c -reuse-previous=off regression - - :master_port \c -reuse-previous=off regression - - :master_port
DROP TABLE the_table; DROP TABLE the_table;

View File

@ -255,16 +255,11 @@ DROP INDEX lineitem_orderkey_index, lineitem_partial_index;
ERROR: cannot drop multiple distributed objects in a single command ERROR: cannot drop multiple distributed objects in a single command
HINT: Try dropping each object in a separate DROP command. HINT: Try dropping each object in a separate DROP command.
-- Verify that we can succesfully drop indexes -- Verify that we can succesfully drop indexes
DROP INDEX lineitem_orderkey_index;
DROP INDEX lineitem_orderkey_index_new;
DROP INDEX lineitem_partkey_desc_index; DROP INDEX lineitem_partkey_desc_index;
DROP INDEX lineitem_partial_index; DROP INDEX lineitem_partial_index;
DROP INDEX lineitem_colref_index; DROP INDEX lineitem_colref_index;
-- Verify that we can drop distributed indexes with local indexes
CREATE TABLE local_table(a int, b int);
CREATE INDEX local_index ON local_table(a);
CREATE INDEX local_index2 ON local_table(b);
DROP INDEX lineitem_orderkey_index, local_index;
DROP INDEX IF EXISTS lineitem_orderkey_index_new, local_index2, non_existing_index;
NOTICE: index "non_existing_index" does not exist, skipping
-- Verify that we handle if exists statements correctly -- Verify that we handle if exists statements correctly
DROP INDEX non_existent_index; DROP INDEX non_existent_index;
ERROR: index "non_existent_index" does not exist ERROR: index "non_existent_index" does not exist
@ -301,14 +296,15 @@ SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname;
(5 rows) (5 rows)
\c - - - :worker_1_port \c - - - :worker_1_port
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%' ORDER BY 1,2; SET citus.override_table_visibility TO FALSE;
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname SIMILAR TO 'lineitem%\d' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%' ORDER BY 1,2;
indrelid | indexrelid indrelid | indexrelid
--------------------------------------------------------------------- ---------------------------------------------------------------------
lineitem_360000 | lineitem_l_orderkey_idx_360000 lineitem_360000 | lineitem_l_orderkey_idx_360000
lineitem_360000 | lineitem_l_shipdate_idx_360000 lineitem_360000 | lineitem_l_shipdate_idx_360000
(2 rows) (2 rows)
SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname; SELECT * FROM pg_indexes WHERE tablename SIMILAR TO 'index_test_%\d' ORDER BY indexname;
schemaname | tablename | indexname | tablespace | indexdef schemaname | tablename | indexname | tablespace | indexdef
--------------------------------------------------------------------- ---------------------------------------------------------------------
multi_index_statements | index_test_hash_102082 | index_test_hash_a_idx_102082 | | CREATE UNIQUE INDEX index_test_hash_a_idx_102082 ON multi_index_statements.index_test_hash_102082 USING btree (a) multi_index_statements | index_test_hash_102082 | index_test_hash_a_idx_102082 | | CREATE UNIQUE INDEX index_test_hash_a_idx_102082 ON multi_index_statements.index_test_hash_102082 USING btree (a)
@ -413,6 +409,10 @@ CREATE INDEX ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creat
ON test_index_creation1 USING btree ON test_index_creation1 USING btree
(tenant_id, timeperiod); (tenant_id, timeperiod);
NOTICE: identifier "ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1" will be truncated to "ix_test_index_creation1_ix_test_index_creation1_ix_test_index_c" NOTICE: identifier "ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1" will be truncated to "ix_test_index_creation1_ix_test_index_creation1_ix_test_index_c"
DEBUG: identifier "ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1" will be truncated to "ix_test_index_creation1_ix_test_index_creation1_ix_test_index_c"
DETAIL: from localhost:xxxxx
DEBUG: identifier "ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1" will be truncated to "ix_test_index_creation1_ix_test_index_creation1_ix_test_index_c"
DETAIL: from localhost:xxxxx
RESET client_min_messages; RESET client_min_messages;
CREATE TABLE test_index_creation1_p2020_09_26 PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-26 00:00:00') TO ('2020-09-27 00:00:00'); CREATE TABLE test_index_creation1_p2020_09_26 PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-26 00:00:00') TO ('2020-09-27 00:00:00');
CREATE TABLE test_index_creation1_p2020_09_27 PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-27 00:00:00') TO ('2020-09-28 00:00:00'); CREATE TABLE test_index_creation1_p2020_09_27 PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-27 00:00:00') TO ('2020-09-28 00:00:00');

View File

@ -1,9 +1,5 @@
CREATE SCHEMA mx_add_coordinator; CREATE SCHEMA mx_add_coordinator;
SET search_path TO mx_add_coordinator,public; SET search_path TO mx_add_coordinator,public;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 8;
SET citus.next_shard_id TO 7000000;
SET citus.next_placement_id TO 7000000;
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
CREATE USER reprefuser WITH LOGIN; CREATE USER reprefuser WITH LOGIN;
SELECT run_command_on_workers('CREATE USER reprefuser WITH LOGIN'); SELECT run_command_on_workers('CREATE USER reprefuser WITH LOGIN');
@ -16,12 +12,43 @@ SELECT run_command_on_workers('CREATE USER reprefuser WITH LOGIN');
SET citus.enable_alter_role_propagation TO ON; SET citus.enable_alter_role_propagation TO ON;
-- alter role for other than the extension owner works in enterprise, output differs accordingly -- alter role for other than the extension owner works in enterprise, output differs accordingly
ALTER ROLE reprefuser WITH CREATEDB; ALTER ROLE reprefuser WITH CREATEDB;
-- check connectivity in the cluster
-- verify that we test for 4 node pairs before we add coordinator to metadata
SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();
bool_and | count
---------------------------------------------------------------------
t | 4
(1 row)
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
?column? ?column?
--------------------------------------------------------------------- ---------------------------------------------------------------------
1 1
(1 row) (1 row)
-- verify that we test for 9 node pairs after we add coordinator to metadata
SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();
bool_and | count
---------------------------------------------------------------------
t | 9
(1 row)
-- test that we can test for connectivity when connected to worker nodes as well
\c - - - :worker_1_port
SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();
bool_and | count
---------------------------------------------------------------------
t | 9
(1 row)
\c - - - :master_port
-- set the configs after reconnecting to coordinator
SET search_path TO mx_add_coordinator,public;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 8;
SET citus.next_shard_id TO 7000000;
SET citus.next_placement_id TO 7000000;
SET client_min_messages TO WARNING;
-- test that coordinator pg_dist_node entry is synced to the workers -- test that coordinator pg_dist_node entry is synced to the workers
SELECT wait_until_metadata_sync(30000); SELECT wait_until_metadata_sync(30000);
wait_until_metadata_sync wait_until_metadata_sync

View File

@ -669,7 +669,7 @@ ORDER BY shardid, nodeport;
(0 rows) (0 rows)
-- verify constraints have been created on the new node -- verify constraints have been created on the new node
SELECT run_command_on_workers('select count(*) from pg_constraint where contype=''f'' AND conname like ''ref_table%'';'); SELECT run_command_on_workers('select count(*) from pg_constraint where contype=''f'' AND conname similar to ''ref_table%\d'';');
run_command_on_workers run_command_on_workers
--------------------------------------------------------------------- ---------------------------------------------------------------------
(localhost,57637,t,2) (localhost,57637,t,2)
@ -950,12 +950,6 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port);
(1 row) (1 row)
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT master_copy_shard_placement( SELECT master_copy_shard_placement(
:ref_table_shard, :ref_table_shard,
'localhost', :worker_1_port, 'localhost', :worker_1_port,
@ -1027,13 +1021,6 @@ WHERE ref_table.a = dist_table.a;
\c - - - :master_port \c - - - :master_port
SET search_path TO replicate_reference_table; SET search_path TO replicate_reference_table;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- --
-- The following case used to get stuck on create_distributed_table() instead -- The following case used to get stuck on create_distributed_table() instead
-- of detecting the distributed deadlock. -- of detecting the distributed deadlock.

View File

@ -45,12 +45,6 @@ ORDER BY logicalrelid;
mx_table_2 | s | 150000 mx_table_2 | s | 150000
(2 rows) (2 rows)
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv'); COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv');
INSERT INTO mx_ref_table VALUES (-37, 'morbi'); INSERT INTO mx_ref_table VALUES (-37, 'morbi');
INSERT INTO mx_ref_table VALUES (-78, 'sapien'); INSERT INTO mx_ref_table VALUES (-78, 'sapien');
@ -232,7 +226,7 @@ HINT: Connect to the coordinator and run it again.
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port;
hasmetadata hasmetadata
--------------------------------------------------------------------- ---------------------------------------------------------------------
f t
(1 row) (1 row)
-- stop_metadata_sync_to_node -- stop_metadata_sync_to_node
@ -351,20 +345,5 @@ HINT: Connect to the coordinator and run it again.
\c - - - :master_port \c - - - :master_port
DROP TABLE mx_table; DROP TABLE mx_table;
DROP TABLE mx_table_2; DROP TABLE mx_table_2;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
\c - - - :worker_1_port
DELETE FROM pg_dist_node;
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx\_%table%';
worker_drop_distributed_table
---------------------------------------------------------------------
(0 rows)
\c - - - :master_port
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
RESET citus.shard_replication_factor; RESET citus.shard_replication_factor;

View File

@ -7,19 +7,6 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
1 1
(1 row) (1 row)
-- sync the metadata to both nodes
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- create a role and give access one each node separately -- create a role and give access one each node separately
-- and increase the error level to prevent enterprise to diverge -- and increase the error level to prevent enterprise to diverge
SET client_min_messages TO ERROR; SET client_min_messages TO ERROR;
@ -688,6 +675,19 @@ NOTICE: dropping metadata on the node (localhost,57638)
(1 row) (1 row)
-- finally sync metadata again so it doesn't break later tests
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
DROP SCHEMA "Mx Regular User" CASCADE; DROP SCHEMA "Mx Regular User" CASCADE;
NOTICE: drop cascades to 10 other objects NOTICE: drop cascades to 10 other objects
DETAIL: drop cascades to table "Mx Regular User".partitioned_table DETAIL: drop cascades to table "Mx Regular User".partitioned_table

View File

@ -38,6 +38,7 @@ WHERE stxnamespace IN (
FROM pg_namespace FROM pg_namespace
WHERE nspname IN ('statistics''TestTarget') WHERE nspname IN ('statistics''TestTarget')
) )
AND stxname SIMILAR TO '%\_\d+'
ORDER BY stxstattarget, stxrelid::regclass ASC; ORDER BY stxstattarget, stxrelid::regclass ASC;
stxstattarget | stxrelid stxstattarget | stxrelid
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -1221,5 +1221,54 @@ select * from nummultirange_test natural join nummultirange_test2 order by nmr;
{[1.1,2.2)} {[1.1,2.2)}
(7 rows) (7 rows)
-- verify that recreating distributed procedures with OUT param gets propagated to workers
CREATE OR REPLACE PROCEDURE proc_with_out_param(IN parm1 date, OUT parm2 int)
LANGUAGE SQL
AS $$
SELECT 1;
$$;
-- this should error out
SELECT create_distributed_function('proc_with_out_param(date,int)');
ERROR: function "proc_with_out_param(date,int)" does not exist
-- this should work
SELECT create_distributed_function('proc_with_out_param(date)');
create_distributed_function
---------------------------------------------------------------------
(1 row)
SET client_min_messages TO ERROR;
CREATE ROLE r1;
SELECT 1 FROM run_command_on_workers($$CREATE ROLE r1;$$);
?column?
---------------------------------------------------------------------
1
1
(2 rows)
GRANT EXECUTE ON PROCEDURE proc_with_out_param TO r1;
SELECT 1 FROM run_command_on_workers($$GRANT EXECUTE ON PROCEDURE proc_with_out_param TO r1;$$);
?column?
---------------------------------------------------------------------
1
1
(2 rows)
RESET client_min_messages;
CREATE OR REPLACE PROCEDURE proc_with_out_param(IN parm1 date, OUT parm2 int)
LANGUAGE SQL
AS $$
SELECT 2;
$$;
SELECT count(*) FROM
(SELECT result FROM
run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc, pg_proc.proowner) from pg_proc where proname = 'proc_with_out_param';$$)
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc, pg_proc.proowner)::text from pg_proc where proname = 'proc_with_out_param')
as test;
count
---------------------------------------------------------------------
1
(1 row)
set client_min_messages to error; set client_min_messages to error;
drop schema pg14 cascade; drop schema pg14 cascade;

View File

@ -19,9 +19,94 @@ SELECT current_setting('enable_hashagg') FROM test WHERE id = 1;
on on
(1 row) (1 row)
-- should not be propagated, error should be coming from coordinator -- triggers an error on the worker
SET LOCAL TRANSACTION ISOLATION LEVEL REPEATABLE READ; SET LOCAL TRANSACTION ISOLATION LEVEL REPEATABLE READ;
ERROR: SET TRANSACTION ISOLATION LEVEL must be called before any query WARNING: SET TRANSACTION ISOLATION LEVEL must be called before any query
CONTEXT: while executing command on localhost:xxxxx
ERROR: failure on connection marked as essential: localhost:xxxxx
END;
BEGIN;
SET TRANSACTION READ ONLY;
-- should fail after setting transaction to read only
INSERT INTO test VALUES (2,2);
ERROR: cannot execute INSERT in a read-only transaction
END;
BEGIN;
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
-- should reflect new isolation level
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
repeatable read
(1 row)
END;
BEGIN;
SET TRANSACTION READ ONLY;
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
on
(1 row)
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
repeatable read
(1 row)
END;
BEGIN;
SET LOCAL transaction_read_only TO on;
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
on
(1 row)
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
repeatable read
(1 row)
END;
BEGIN;
SET TRANSACTION READ ONLY;
SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
on
(1 row)
SELECT current_setting('transaction_isolation') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
repeatable read
(1 row)
END;
BEGIN;
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SAVEPOINT goback;
SET TRANSACTION READ ONLY;
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
on
(1 row)
ROLLBACK TO SAVEPOINT goback;
SELECT current_setting('transaction_read_only') FROM test WHERE id = 1;
current_setting
---------------------------------------------------------------------
off
(1 row)
END; END;
BEGIN; BEGIN;
-- set session commands are not propagated -- set session commands are not propagated

View File

@ -108,6 +108,7 @@ WHERE stxnamespace IN (
FROM pg_namespace FROM pg_namespace
WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2') WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2')
) )
AND stxname SIMILAR TO '%\_\d+'
ORDER BY stxname ASC; ORDER BY stxname ASC;
stxname stxname
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -215,7 +216,8 @@ WHERE stxnamespace IN (
SELECT oid SELECT oid
FROM pg_namespace FROM pg_namespace
WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2') WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2')
); )
AND stxname SIMILAR TO '%\_\d+';
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
3 3
@ -227,7 +229,8 @@ WHERE stxnamespace IN (
SELECT oid SELECT oid
FROM pg_namespace FROM pg_namespace
WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2') WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2')
); )
AND stxname SIMILAR TO '%\_\d+';
count count
--------------------------------------------------------------------- ---------------------------------------------------------------------
3 3

View File

@ -506,6 +506,7 @@ SELECT master_remove_node('localhost', :master_port);
SELECT master_add_node('localhost', :master_port, groupid => 0) AS master_nodeid \gset SELECT master_add_node('localhost', :master_port, groupid => 0) AS master_nodeid \gset
NOTICE: Replicating reference table "squares" to the node localhost:xxxxx NOTICE: Replicating reference table "squares" to the node localhost:xxxxx
NOTICE: Replicating reference table "numbers" to the node localhost:xxxxx NOTICE: Replicating reference table "numbers" to the node localhost:xxxxx
NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata
-- clean-up -- clean-up
SET client_min_messages TO ERROR; SET client_min_messages TO ERROR;
DROP SCHEMA replicate_ref_to_coordinator CASCADE; DROP SCHEMA replicate_ref_to_coordinator CASCADE;

View File

@ -267,6 +267,68 @@ WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER
5 | 6 5 | 6
(5 rows) (5 rows)
-- show that we can filter remote commands
-- given that citus.grep_remote_commands, we log all commands
SET citus.log_local_commands to true;
SELECT count(*) FROM public.another_schema_table WHERE a = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM public.another_schema_table_90630515 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 1)
count
---------------------------------------------------------------------
0
(1 row)
-- grep matches all commands
SET citus.grep_remote_commands TO "%%";
SELECT count(*) FROM public.another_schema_table WHERE a = 1;
NOTICE: executing the command locally: SELECT count(*) AS count FROM public.another_schema_table_90630515 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 1)
count
---------------------------------------------------------------------
0
(1 row)
-- only filter a specific shard for the local execution
BEGIN;
SET LOCAL citus.grep_remote_commands TO "%90630515%";
SELECT count(*) FROM public.another_schema_table;
NOTICE: executing the command locally: SELECT count(*) AS count FROM public.another_schema_table_90630515 another_schema_table WHERE true
count
---------------------------------------------------------------------
0
(1 row)
-- match nothing
SET LOCAL citus.grep_remote_commands TO '%nothing%';
SELECT count(*) FROM public.another_schema_table;
count
---------------------------------------------------------------------
0
(1 row)
COMMIT;
-- only filter a specific shard for the remote execution
BEGIN;
SET LOCAL citus.enable_local_execution TO FALSE;
SET LOCAL citus.grep_remote_commands TO '%90630515%';
SET LOCAL citus.log_remote_commands TO ON;
SELECT count(*) FROM public.another_schema_table;
NOTICE: issuing SELECT count(*) AS count FROM public.another_schema_table_90630515 another_schema_table WHERE true
DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx
count
---------------------------------------------------------------------
0
(1 row)
-- match nothing
SET LOCAL citus.grep_remote_commands TO '%nothing%';
SELECT count(*) FROM public.another_schema_table;
count
---------------------------------------------------------------------
0
(1 row)
COMMIT;
RESET citus.log_local_commands;
RESET citus.grep_remote_commands;
-- Test upsert with constraint -- Test upsert with constraint
CREATE TABLE upsert_test CREATE TABLE upsert_test
( (

View File

@ -38,6 +38,7 @@ ORDER BY 1;
function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real)
function citus_add_secondary_node(text,integer,text,integer,name) function citus_add_secondary_node(text,integer,text,integer,name)
function citus_blocking_pids(integer) function citus_blocking_pids(integer)
function citus_check_cluster_node_health()
function citus_check_connection_to_node(text,integer) function citus_check_connection_to_node(text,integer)
function citus_cleanup_orphaned_shards() function citus_cleanup_orphaned_shards()
function citus_conninfo_cache_invalidate() function citus_conninfo_cache_invalidate()
@ -261,5 +262,5 @@ ORDER BY 1;
view citus_worker_stat_activity view citus_worker_stat_activity
view pg_dist_shard_placement view pg_dist_shard_placement
view time_partitions view time_partitions
(245 rows) (246 rows)

View File

@ -1,5 +1,4 @@
# import this file (from psql you can use \i) to use mitmproxy manually # import this file (from psql you can use \i) to use mitmproxy manually
test: turn_mx_off
test: failure_test_helpers test: failure_test_helpers
# this should only be run by pg_regress_multi, you don't need it # this should only be run by pg_regress_multi, you don't need it

View File

@ -1,5 +1,4 @@
# import this file (from psql you can use \i) to use mitmproxy manually # import this file (from psql you can use \i) to use mitmproxy manually
test: turn_mx_off
test: failure_test_helpers test: failure_test_helpers
# this should only be run by pg_regress_multi, you don't need it # this should only be run by pg_regress_multi, you don't need it
@ -17,15 +16,18 @@ test: failure_add_disable_node
test: failure_copy_to_reference test: failure_copy_to_reference
test: failure_copy_on_hash test: failure_copy_on_hash
test: failure_create_reference_table test: failure_create_reference_table
test: check_mx
test: turn_mx_off
test: failure_create_distributed_table_non_empty test: failure_create_distributed_table_non_empty
test: failure_create_table test: failure_create_table
test: failure_single_select
test: turn_mx_on
test: failure_multi_shard_update_delete test: failure_multi_shard_update_delete
test: failure_cte_subquery test: failure_cte_subquery
test: failure_insert_select_via_coordinator test: failure_insert_select_via_coordinator
test: failure_multi_dml test: failure_multi_dml
test: failure_vacuum test: failure_vacuum
test: failure_single_select
test: failure_ref_tables test: failure_ref_tables
test: failure_insert_select_pushdown test: failure_insert_select_pushdown
test: failure_single_mod test: failure_single_mod
@ -45,3 +47,4 @@ test: ensure_no_intermediate_data_leak
# in the shared memory # in the shared memory
# -------- # --------
test: ensure_no_shared_connection_leak test: ensure_no_shared_connection_leak
test: check_mx

View File

@ -572,20 +572,12 @@ SET session_replication_role = DEFAULT;
-- disable test_user on the first worker -- disable test_user on the first worker
\c - :default_user - :worker_1_port \c - :default_user - :worker_1_port
SET citus.enable_object_propagation TO off;
ALTER USER test_user WITH nologin; ALTER USER test_user WITH nologin;
\c - test_user - :master_port \c - test_user - :master_port
-- reissue copy, and it should fail -- reissue copy, and it should fail
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv'); COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
1,1
2,2
3,3
4,4
5,5
6,6
7,7
8,8
\.
-- verify shards in the none of the workers as marked invalid -- verify shards in the none of the workers as marked invalid
SELECT shardid, shardstate, nodename, nodeport SELECT shardid, shardstate, nodename, nodeport
@ -594,9 +586,6 @@ SELECT shardid, shardstate, nodename, nodeport
-- try to insert into a reference table copy should fail -- try to insert into a reference table copy should fail
COPY numbers_reference FROM STDIN WITH (FORMAT 'csv'); COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
3,1
4,2
\.
-- verify shards for reference table are still valid -- verify shards for reference table are still valid
SELECT shardid, shardstate, nodename, nodeport SELECT shardid, shardstate, nodename, nodeport
@ -608,10 +597,6 @@ SELECT shardid, shardstate, nodename, nodeport
-- since it can not insert into either copies of a shard. shards are expected to -- since it can not insert into either copies of a shard. shards are expected to
-- stay valid since the operation is rolled back. -- stay valid since the operation is rolled back.
COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv'); COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv');
1,1
2,2
3,3
\.
-- verify shards for numbers_hash_other are still valid -- verify shards for numbers_hash_other are still valid
-- since copy has failed altogether -- since copy has failed altogether
@ -621,6 +606,7 @@ SELECT shardid, shardstate, nodename, nodeport
-- re-enable test_user on the first worker -- re-enable test_user on the first worker
\c - :default_user - :worker_1_port \c - :default_user - :worker_1_port
SET citus.enable_object_propagation TO off;
ALTER USER test_user WITH login; ALTER USER test_user WITH login;
\c - test_user - :master_port \c - test_user - :master_port

View File

@ -1,4 +1,3 @@
test: turn_mx_off
test: multi_cluster_management test: multi_cluster_management
test: multi_test_helpers multi_test_helpers_superuser columnar_test_helpers test: multi_test_helpers multi_test_helpers_superuser columnar_test_helpers
test: multi_test_catalog_views test: multi_test_catalog_views

View File

@ -1,3 +1,2 @@
test: turn_mx_off
test: upgrade_basic_after test: upgrade_basic_after
test: upgrade_pg_dist_object_test_after test: upgrade_pg_dist_object_test_after

View File

@ -15,8 +15,8 @@
# --- # ---
# Tests around schema changes, these are run first, so there's no preexisting objects. # Tests around schema changes, these are run first, so there's no preexisting objects.
# --- # ---
test: turn_mx_off
test: multi_extension test: multi_extension
test: turn_mx_off
test: single_node test: single_node
test: single_node_truncate test: single_node_truncate
test: turn_mx_on test: turn_mx_on
@ -48,8 +48,6 @@ test: multi_read_from_secondaries
# ---------- # ----------
# multi_citus_tools tests utility functions written for citus tools # multi_citus_tools tests utility functions written for citus tools
# ---------- # ----------
test: check_mx
test: turn_mx_off
test: multi_citus_tools test: multi_citus_tools
# ---------- # ----------
@ -57,7 +55,6 @@ test: multi_citus_tools
# multi_remove_node_reference_table tests metadata changes after master_remove_node # multi_remove_node_reference_table tests metadata changes after master_remove_node
# ---------- # ----------
test: multi_replicate_reference_table test: multi_replicate_reference_table
test: turn_mx_on
test: multi_remove_node_reference_table test: multi_remove_node_reference_table
# ---------- # ----------
@ -103,11 +100,8 @@ test: multi_create_fdw
# ---------- # ----------
# Tests for statistics propagation # Tests for statistics propagation
# ---------- # ----------
test: check_mx
test: turn_mx_off
test: propagate_statistics test: propagate_statistics
test: pg13_propagate_statistics test: pg13_propagate_statistics
test: turn_mx_on
# ---------- # ----------
# Test for updating table statistics # Test for updating table statistics
@ -153,6 +147,7 @@ test: with_executors with_join with_partitioning with_transactions with_dml
# ---------- # ----------
# Tests around DDL statements run on distributed tables # Tests around DDL statements run on distributed tables
# ---------- # ----------
test: multi_index_statements
test: multi_alter_table_statements test: multi_alter_table_statements
test: multi_alter_table_add_constraints test: multi_alter_table_add_constraints
@ -196,11 +191,9 @@ test: multi_repartition_udt multi_repartitioned_subquery_udf multi_subtransactio
test: multi_modifying_xacts test: multi_modifying_xacts
test: check_mx test: check_mx
test: turn_mx_off test: turn_mx_off
test: multi_index_statements
test: multi_generate_ddl_commands multi_repair_shards test: multi_generate_ddl_commands multi_repair_shards
test: multi_create_shards test: multi_create_shards
test: multi_transaction_recovery test: multi_transaction_recovery
test: multi_copy
test: turn_mx_on test: turn_mx_on
test: local_dist_join_modifications test: local_dist_join_modifications
@ -212,7 +205,7 @@ test: citus_local_dist_joins
# multi_copy creates hash and range-partitioned tables and performs COPY # multi_copy creates hash and range-partitioned tables and performs COPY
# multi_router_planner creates hash partitioned tables. # multi_router_planner creates hash partitioned tables.
# --------- # ---------
test: fast_path_router_modify pg_dump test: multi_copy fast_path_router_modify pg_dump
test: multi_router_planner test: multi_router_planner
# These 2 tests have prepared statements which sometimes get invalidated by concurrent tests, # These 2 tests have prepared statements which sometimes get invalidated by concurrent tests,
# changing the debug output. We should not run them in parallel with others # changing the debug output. We should not run them in parallel with others
@ -224,8 +217,6 @@ test: multi_router_planner_fast_path
# ---------- # ----------
test: multi_large_shardid test: multi_large_shardid
test: check_mx
test: turn_mx_off
# ---------- # ----------
# multi_size_queries tests various size commands on distributed tables # multi_size_queries tests various size commands on distributed tables
# ---------- # ----------
@ -240,9 +231,11 @@ test: multi_drop_extension
# multi_metadata_sync tests the propagation of mx-related metadata changes to metadata workers # multi_metadata_sync tests the propagation of mx-related metadata changes to metadata workers
# multi_unsupported_worker_operations tests that unsupported operations error out on metadata workers # multi_unsupported_worker_operations tests that unsupported operations error out on metadata workers
# ---------- # ----------
test: check_mx
test: turn_mx_off
test: multi_metadata_sync test: multi_metadata_sync
test: multi_unsupported_worker_operations
test: turn_mx_on test: turn_mx_on
test: multi_unsupported_worker_operations
# ---------- # ----------
# grant_on_schema_propagation tests if the GRANT ... ON SCHEMA queries are propagated correctly # grant_on_schema_propagation tests if the GRANT ... ON SCHEMA queries are propagated correctly
@ -287,10 +280,10 @@ test: multi_foreign_key_relation_graph
# and rerun some of the tests. # and rerun some of the tests.
# -------- # --------
test: check_mx test: check_mx
test: turn_mx_off
test: add_coordinator test: add_coordinator
test: foreign_key_to_reference_table test: foreign_key_to_reference_table
test: replicate_reference_tables_to_coordinator test: replicate_reference_tables_to_coordinator
test: turn_mx_off
test: citus_local_tables test: citus_local_tables
test: multi_row_router_insert mixed_relkind_tests test: multi_row_router_insert mixed_relkind_tests
test: turn_mx_on test: turn_mx_on
@ -303,11 +296,8 @@ test: undistribute_table_cascade
test: create_citus_local_table_cascade test: create_citus_local_table_cascade
test: fkeys_between_local_ref test: fkeys_between_local_ref
test: auto_undist_citus_local test: auto_undist_citus_local
test: check_mx
test: turn_mx_off
test: mx_regular_user test: mx_regular_user
test: remove_coordinator test: remove_coordinator
test: turn_mx_on
# ---------- # ----------
# multi_transactional_drop_shards tests for dropping shards using connection API # multi_transactional_drop_shards tests for dropping shards using connection API

View File

@ -1,4 +1,3 @@
test: turn_mx_off
test: multi_follower_sanity_check test: multi_follower_sanity_check
test: follower_single_node test: follower_single_node
test: multi_follower_select_statements test: multi_follower_select_statements
@ -7,3 +6,4 @@ test: multi_follower_configure_followers
# test that no tests leaked intermediate results. This should always be last # test that no tests leaked intermediate results. This should always be last
test: ensure_no_intermediate_data_leak test: ensure_no_intermediate_data_leak
test: check_mx

View File

@ -13,10 +13,8 @@
# --- # ---
# Tests around schema changes, these are run first, so there's no preexisting objects. # Tests around schema changes, these are run first, so there's no preexisting objects.
# --- # ---
test: turn_mx_off
test: multi_extension test: multi_extension
test: multi_test_helpers multi_test_helpers_superuser test: multi_test_helpers multi_test_helpers_superuser
test: turn_mx_on
test: multi_mx_node_metadata test: multi_mx_node_metadata
test: multi_cluster_management test: multi_cluster_management
test: multi_mx_function_table_reference test: multi_mx_function_table_reference

View File

@ -1,7 +1,6 @@
# ---------- # ----------
# Only run few basic tests to set up a testing environment # Only run few basic tests to set up a testing environment
# ---------- # ----------
test: turn_mx_off
test: multi_cluster_management test: multi_cluster_management
test: multi_test_helpers multi_test_helpers_superuser test: multi_test_helpers multi_test_helpers_superuser
test: multi_test_catalog_views test: multi_test_catalog_views

View File

@ -1,7 +1,6 @@
# ---------- # ----------
# Only run few basic tests to set up a testing environment # Only run few basic tests to set up a testing environment
# ---------- # ----------
test: turn_mx_off
test: multi_cluster_management test: multi_cluster_management
test: multi_test_helpers multi_test_helpers_superuser test: multi_test_helpers multi_test_helpers_superuser
test: multi_test_catalog_views test: multi_test_catalog_views

View File

@ -737,12 +737,12 @@ UPDATE pg_dist_shard_placement SET nodeport = :worker_1_port+10 WHERE shardid =
SET session_replication_role = DEFAULT; SET session_replication_role = DEFAULT;
-- disable test_user on the first worker -- disable test_user on the first worker
\c - :default_user - :worker_1_port \c - :default_user - :worker_1_port
SET citus.enable_object_propagation TO off;
ALTER USER test_user WITH nologin; ALTER USER test_user WITH nologin;
\c - test_user - :master_port \c - test_user - :master_port
-- reissue copy, and it should fail -- reissue copy, and it should fail
COPY numbers_hash FROM STDIN WITH (FORMAT 'csv'); COPY numbers_hash FROM STDIN WITH (FORMAT 'csv');
ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash, line 1: "1,1"
-- verify shards in the none of the workers as marked invalid -- verify shards in the none of the workers as marked invalid
SELECT shardid, shardstate, nodename, nodeport SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid) FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
@ -762,7 +762,6 @@ SELECT shardid, shardstate, nodename, nodeport
-- try to insert into a reference table copy should fail -- try to insert into a reference table copy should fail
COPY numbers_reference FROM STDIN WITH (FORMAT 'csv'); COPY numbers_reference FROM STDIN WITH (FORMAT 'csv');
ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_reference, line 1: "3,1"
-- verify shards for reference table are still valid -- verify shards for reference table are still valid
SELECT shardid, shardstate, nodename, nodeport SELECT shardid, shardstate, nodename, nodeport
FROM pg_dist_shard_placement join pg_dist_shard using(shardid) FROM pg_dist_shard_placement join pg_dist_shard using(shardid)
@ -778,7 +777,6 @@ SELECT shardid, shardstate, nodename, nodeport
-- stay valid since the operation is rolled back. -- stay valid since the operation is rolled back.
COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv'); COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv');
ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in
CONTEXT: COPY numbers_hash_other, line 1: "1,1"
-- verify shards for numbers_hash_other are still valid -- verify shards for numbers_hash_other are still valid
-- since copy has failed altogether -- since copy has failed altogether
SELECT shardid, shardstate, nodename, nodeport SELECT shardid, shardstate, nodename, nodeport
@ -798,6 +796,7 @@ SELECT shardid, shardstate, nodename, nodeport
-- re-enable test_user on the first worker -- re-enable test_user on the first worker
\c - :default_user - :worker_1_port \c - :default_user - :worker_1_port
SET citus.enable_object_propagation TO off;
ALTER USER test_user WITH login; ALTER USER test_user WITH login;
\c - test_user - :master_port \c - test_user - :master_port
DROP TABLE numbers_hash; DROP TABLE numbers_hash;

View File

@ -0,0 +1 @@
SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health();

View File

@ -482,6 +482,204 @@ SELECT * FROM test ORDER BY id;
DROP TABLE test; DROP TABLE test;
-- verify that recreating distributed functions with TABLE params gets propagated to workers
CREATE OR REPLACE FUNCTION func_with_return_table(int)
RETURNS TABLE (date date)
LANGUAGE plpgsql AS $$
BEGIN
RETURN query SELECT '2011-01-01'::date;
END;
$$;
SELECT create_distributed_function('func_with_return_table(int)');
CREATE OR REPLACE FUNCTION func_with_return_table(int)
RETURNS TABLE (date date)
LANGUAGE plpgsql AS $$
BEGIN
RETURN query SELECT '2011-01-02'::date;
END;
$$;
SELECT count(*) FROM
(SELECT result FROM
run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_with_return_table';$$)
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_with_return_table')
as test;
-- verify that recreating distributed functions with OUT params gets propagated to workers
CREATE OR REPLACE FUNCTION func_with_out_param(a int, out b int)
RETURNS int
LANGUAGE sql AS $$ select 1; $$;
SELECT create_distributed_function('func_with_out_param(int)');
SET client_min_messages TO ERROR;
CREATE ROLE r1;
SELECT 1 FROM run_command_on_workers($$CREATE ROLE r1;$$);
GRANT EXECUTE ON FUNCTION func_with_out_param TO r1;
SELECT 1 FROM run_command_on_workers($$GRANT EXECUTE ON FUNCTION func_with_out_param TO r1;$$);
RESET client_min_messages;
CREATE OR REPLACE FUNCTION func_with_out_param(a int, out b int)
RETURNS int
LANGUAGE sql AS $$ select 2; $$;
SELECT count(*) FROM
(SELECT result FROM
run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc, pg_proc.proowner) from pg_proc where proname = 'func_with_out_param';$$)
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc, pg_proc.proowner)::text from pg_proc where proname = 'func_with_out_param')
as test;
-- verify that recreating distributed functions with INOUT params gets propagated to workers
CREATE OR REPLACE FUNCTION func_with_inout_param(a int, inout b int)
RETURNS int
LANGUAGE sql AS $$ select 1; $$;
-- this should error out
SELECT create_distributed_function('func_with_inout_param(int)');
-- this should work
SELECT create_distributed_function('func_with_inout_param(int,int)');
CREATE OR REPLACE FUNCTION func_with_inout_param(a int, inout b int)
RETURNS int
LANGUAGE sql AS $$ select 2; $$;
SELECT count(*) FROM
(SELECT result FROM
run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_with_inout_param';$$)
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_with_inout_param')
as test;
-- verify that recreating distributed functions with VARIADIC params gets propagated to workers
CREATE OR REPLACE FUNCTION func_with_variadic_param(a int, variadic b int[])
RETURNS int
LANGUAGE sql AS $$ select 1; $$;
-- this should work
SELECT create_distributed_function('func_with_variadic_param(int,int[])');
CREATE OR REPLACE FUNCTION func_with_variadic_param(a int, variadic b int[])
RETURNS int
LANGUAGE sql AS $$ select 2; $$;
SELECT count(*) FROM
(SELECT result FROM
run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_with_variadic_param';$$)
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_with_variadic_param')
as test;
-- verify that recreating distributed functions returning setof records gets propagated to workers
CREATE OR REPLACE FUNCTION func_returning_setof_int(IN parm1 date, IN parm2 interval)
RETURNS SETOF integer AS
$BODY$
BEGIN
RETURN QUERY
SELECT 1;
END;
$BODY$
LANGUAGE plpgsql VOLATILE
COST 100;
SELECT create_distributed_function('func_returning_setof_int(date,interval)');
CREATE OR REPLACE FUNCTION func_returning_setof_int(IN parm1 date, IN parm2 interval)
RETURNS SETOF integer AS
$BODY$
BEGIN
RETURN QUERY
SELECT 2;
END;
$BODY$
LANGUAGE plpgsql VOLATILE
COST 100;
SELECT count(*) FROM
(SELECT result FROM
run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_returning_setof_int';$$)
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_returning_setof_int')
as test;
-- verify that recreating distributed functions with variadic param returning setof records gets propagated to workers
CREATE OR REPLACE FUNCTION func_returning_setof_int_with_variadic_param(IN parm1 date, VARIADIC parm2 int[])
RETURNS SETOF integer AS
$BODY$
BEGIN
RETURN QUERY
SELECT 1;
END;
$BODY$
LANGUAGE plpgsql VOLATILE
COST 100;
SELECT create_distributed_function('func_returning_setof_int_with_variadic_param(date,int[])');
CREATE OR REPLACE FUNCTION func_returning_setof_int_with_variadic_param(IN parm1 date, VARIADIC parm2 int[])
RETURNS SETOF integer AS
$BODY$
BEGIN
RETURN QUERY
SELECT 2;
END;
$BODY$
LANGUAGE plpgsql VOLATILE
COST 100;
SELECT count(*) FROM
(SELECT result FROM
run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_returning_setof_int_with_variadic_param';$$)
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_returning_setof_int_with_variadic_param')
as test;
-- verify that recreating distributed procedures with out params gets propagated to workers
CREATE OR REPLACE PROCEDURE proc_with_variadic_param(IN parm1 date, VARIADIC parm2 int[])
LANGUAGE SQL
AS $$
SELECT 1;
$$;
-- this should error out
SELECT create_distributed_function('proc_with_variadic_param(date)');
-- this should work
SELECT create_distributed_function('proc_with_variadic_param(date,int[])');
CREATE OR REPLACE PROCEDURE proc_with_variadic_param(IN parm1 date, VARIADIC parm2 int[])
LANGUAGE SQL
AS $$
SELECT 2;
$$;
SELECT count(*) FROM
(SELECT result FROM
run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'proc_with_variadic_param';$$)
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'proc_with_variadic_param')
as test;
-- verify that recreating distributed procedures with INOUT param gets propagated to workers
CREATE OR REPLACE PROCEDURE proc_with_inout_param(IN parm1 date, INOUT parm2 int)
LANGUAGE SQL
AS $$
SELECT 1;
$$;
-- this should error out
SELECT create_distributed_function('proc_with_inout_param(date)');
-- this should work
SELECT create_distributed_function('proc_with_inout_param(date,int)');
CREATE OR REPLACE PROCEDURE proc_with_inout_param(IN parm1 date, INOUT parm2 int)
LANGUAGE SQL
AS $$
SELECT 2;
$$;
SELECT count(*) FROM
(SELECT result FROM
run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'proc_with_inout_param';$$)
UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'proc_with_inout_param')
as test;
SET client_min_messages TO error; -- suppress cascading objects dropping SET client_min_messages TO error; -- suppress cascading objects dropping
DROP SCHEMA function_tests CASCADE; DROP SCHEMA function_tests CASCADE;
DROP SCHEMA function_tests2 CASCADE; DROP SCHEMA function_tests2 CASCADE;

View File

@ -115,6 +115,20 @@ $$ LANGUAGE plpgsql STRICT IMMUTABLE;
SELECT worker_create_or_replace_object('CREATE AGGREGATE proc_conflict.existing_agg(integer) (STYPE = integer,SFUNC = proc_conflict.existing_func2)'); SELECT worker_create_or_replace_object('CREATE AGGREGATE proc_conflict.existing_agg(integer) (STYPE = integer,SFUNC = proc_conflict.existing_func2)');
SELECT worker_create_or_replace_object('CREATE AGGREGATE proc_conflict.existing_agg(integer) (STYPE = integer,SFUNC = proc_conflict.existing_func2)'); SELECT worker_create_or_replace_object('CREATE AGGREGATE proc_conflict.existing_agg(integer) (STYPE = integer,SFUNC = proc_conflict.existing_func2)');
-- test worker_create_or_replace_object with a function that returns table
CREATE OR REPLACE FUNCTION func_with_return_table(int)
RETURNS TABLE (date date)
LANGUAGE plpgsql AS $$
BEGIN
RETURN query SELECT '2011-01-01'::date;
END;
$$;
SELECT worker_create_or_replace_object('CREATE OR REPLACE FUNCTION func_with_return_table(int) RETURNS TABLE (date date) LANGUAGE plpgsql AS $$ BEGIN RETURN query SELECT ''2011-01-01''::date; END; $$;');
-- verify that a backup function is created
SELECT COUNT(*)=2 FROM pg_proc WHERE proname LIKE 'func_with_return_table%';
-- hide cascades -- hide cascades
SET client_min_messages TO error; SET client_min_messages TO error;
DROP SCHEMA proc_conflict CASCADE; DROP SCHEMA proc_conflict CASCADE;

View File

@ -80,10 +80,6 @@ SELECT citus.mitmproxy('conn.delay(500)');
SELECT count(*) FROM products; SELECT count(*) FROM products;
SELECT count(*) FROM products; SELECT count(*) FROM products;
-- use OFFSET 1 to prevent printing the line where source
-- is the worker, and LIMIT 1 in case there were multiple connections
SELECT citus.dump_network_traffic() ORDER BY 1 LIMIT 1 OFFSET 1;
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
CREATE TABLE single_replicatated(key int); CREATE TABLE single_replicatated(key int);
@ -108,9 +104,9 @@ FROM
pg_dist_shard_placement pg_dist_shard_placement
WHERE WHERE
shardstate = 3 AND shardstate = 3 AND
shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'products'::regclass); shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'single_replicatated'::regclass);
SELECT citus.mitmproxy('conn.delay(500)'); SELECT citus.mitmproxy('conn.delay(500)');
INSERT INTO products VALUES (100, '100', 100); INSERT INTO single_replicatated VALUES (100);
COMMIT; COMMIT;
SELECT SELECT
count(*) as invalid_placement_count count(*) as invalid_placement_count
@ -118,10 +114,11 @@ FROM
pg_dist_shard_placement pg_dist_shard_placement
WHERE WHERE
shardstate = 3 AND shardstate = 3 AND
shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'products'::regclass); shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'single_replicatated'::regclass);
-- show that INSERT failed -- show that INSERT failed
SELECT count(*) FROM products WHERE product_no = 100; SELECT citus.mitmproxy('conn.allow()');
SELECT count(*) FROM single_replicatated WHERE key = 100;
RESET client_min_messages; RESET client_min_messages;
@ -160,6 +157,45 @@ SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
SELECT citus.mitmproxy('conn.delay(500)'); SELECT citus.mitmproxy('conn.delay(500)');
SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port);
-- tests for citus_check_cluster_node_health
-- kill all connectivity checks that originate from this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").kill()');
SELECT * FROM citus_check_cluster_node_health();
-- suggested summary queries for connectivity checks
SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health();
SELECT result, count(*) FROM citus_check_cluster_node_health() GROUP BY result ORDER BY 1;
-- cancel all connectivity checks that originate from this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").cancel(' || pg_backend_pid() || ')');
SELECT * FROM citus_check_cluster_node_health();
-- kill all but first connectivity checks that originate from this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).kill()');
SELECT * FROM citus_check_cluster_node_health();
-- cancel all but first connectivity checks that originate from this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).cancel(' || pg_backend_pid() || ')');
SELECT * FROM citus_check_cluster_node_health();
-- kill all connections to this node
SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()');
SELECT * FROM citus_check_cluster_node_health();
-- cancel all connections to this node
SELECT citus.mitmproxy('conn.onAuthenticationOk().cancel(' || pg_backend_pid() || ')');
SELECT * FROM citus_check_cluster_node_health();
-- kill connection checks to this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").kill()');
SELECT * FROM citus_check_cluster_node_health();
-- cancel connection checks to this node
SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").cancel(' || pg_backend_pid() || ')');
SELECT * FROM citus_check_cluster_node_health();
RESET client_min_messages; RESET client_min_messages;
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
SET citus.node_connection_timeout TO DEFAULT; SET citus.node_connection_timeout TO DEFAULT;

View File

@ -142,14 +142,6 @@ SELECT create_distributed_table('test_table_2','id');
SELECT citus.mitmproxy('conn.kill()'); SELECT citus.mitmproxy('conn.kill()');
\COPY test_table_2 FROM stdin delimiter ','; \COPY test_table_2 FROM stdin delimiter ',';
1,2
3,4
6,7
8,9
9,10
11,12
13,14
\.
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate

View File

@ -25,10 +25,6 @@ CREATE VIEW unhealthy_shard_count AS
-- response we get from the worker -- response we get from the worker
SELECT citus.mitmproxy('conn.kill()'); SELECT citus.mitmproxy('conn.kill()');
\copy test_table FROM STDIN DELIMITER ',' \copy test_table FROM STDIN DELIMITER ','
1,2
2,3
3,4
\.
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
SELECT * FROM unhealthy_shard_count; SELECT * FROM unhealthy_shard_count;
SELECT count(*) FROM test_table; SELECT count(*) FROM test_table;
@ -36,10 +32,6 @@ SELECT count(*) FROM test_table;
-- kill as soon as the coordinator sends begin -- kill as soon as the coordinator sends begin
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()'); SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()');
\copy test_table FROM STDIN DELIMITER ',' \copy test_table FROM STDIN DELIMITER ','
1,2
2,3
3,4
\.
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
SELECT * FROM unhealthy_shard_count; SELECT * FROM unhealthy_shard_count;
SELECT count(*) FROM test_table; SELECT count(*) FROM test_table;
@ -47,10 +39,6 @@ SELECT count(*) FROM test_table;
-- cancel as soon as the coordinator sends begin -- cancel as soon as the coordinator sends begin
SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')'); SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')');
\copy test_table FROM STDIN DELIMITER ',' \copy test_table FROM STDIN DELIMITER ','
1,2
2,3
3,4
\.
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
SELECT * FROM unhealthy_shard_count; SELECT * FROM unhealthy_shard_count;
SELECT count(*) FROM test_table; SELECT count(*) FROM test_table;

View File

@ -20,7 +20,7 @@ CREATE INDEX CONCURRENTLY idx_index_test ON index_test(id, value_1);
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
-- verify index is not created -- verify index is not created
SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test%' $$) SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test_%' $$)
WHERE nodeport = :worker_2_proxy_port; WHERE nodeport = :worker_2_proxy_port;
@ -78,7 +78,7 @@ DROP INDEX CONCURRENTLY IF EXISTS idx_index_test;
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');
-- verify index is not dropped at worker 2 -- verify index is not dropped at worker 2
SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test%' $$) SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test_%' $$)
WHERE nodeport = :worker_2_proxy_port; WHERE nodeport = :worker_2_proxy_port;
-- test unique concurrent index creation failure when there are duplicates -- test unique concurrent index creation failure when there are duplicates
@ -97,5 +97,5 @@ RESET SEARCH_PATH;
DROP SCHEMA index_schema CASCADE; DROP SCHEMA index_schema CASCADE;
-- verify index is not at worker 2 upon cleanup -- verify index is not at worker 2 upon cleanup
SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test%' $$) SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test_%' $$)
WHERE nodeport = :worker_2_proxy_port; WHERE nodeport = :worker_2_proxy_port;

View File

@ -14,7 +14,7 @@ SET search_path TO 'ddl_failure';
SET citus.max_cached_conns_per_worker TO 0; SET citus.max_cached_conns_per_worker TO 0;
-- we don't want to see the prepared transaction numbers in the warnings -- we don't want to see the prepared transaction numbers in the warnings
SET client_min_messages TO ERROR; SET client_min_messages TO WARNING;
SELECT citus.mitmproxy('conn.allow()'); SELECT citus.mitmproxy('conn.allow()');

View File

@ -14,7 +14,8 @@ CREATE TABLE t1 (id int PRIMARY KEY);
SELECT create_distributed_table('t1', 'id'); SELECT create_distributed_table('t1', 'id');
INSERT INTO t1 SELECT x FROM generate_series(1,100) AS f(x); INSERT INTO t1 SELECT x FROM generate_series(1,100) AS f(x);
-- Initial metadata status -- Initially turn metadata sync off because we'll ingest errors to start/stop metadata sync operations
SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port);
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port; SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;
-- Failure to set groupid in the worker -- Failure to set groupid in the worker
@ -95,6 +96,10 @@ SELECT count(*) FROM pg_dist_node;
\c - - - :master_port \c - - - :master_port
SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port; SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port;
-- turn metadata sync back on
SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port);
SET SEARCH_PATH = mx_metadata_sync; SET SEARCH_PATH = mx_metadata_sync;
DROP TABLE t1; DROP TABLE t1;
DROP TABLE t2; DROP TABLE t2;

View File

@ -319,6 +319,20 @@ SELECT citus_check_connection_to_node('localhost', :worker_2_port);
\c - - - :master_port \c - - - :master_port
SELECT * FROM citus_check_cluster_node_health() ORDER BY 1,2,3,4;
-- test cluster connectivity when we have broken nodes
SET client_min_messages TO ERROR;
SET citus.node_connection_timeout TO 10;
BEGIN;
INSERT INTO pg_dist_node VALUES
(123456789, 123456789, 'localhost', 123456789),
(1234567890, 1234567890, 'www.citusdata.com', 5432);
SELECT * FROM citus_check_cluster_node_health() ORDER BY 5,1,2,3,4;
ROLLBACK;
RESET citus.node_connection_timeout;
RESET client_min_messages; RESET client_min_messages;
DROP SCHEMA tools CASCADE; DROP SCHEMA tools CASCADE;
RESET SEARCH_PATH; RESET SEARCH_PATH;

View File

@ -634,11 +634,14 @@ CREATE DATABASE another;
\c another \c another
CREATE EXTENSION citus; CREATE EXTENSION citus;
\c - - - :worker_1_port
CREATE EXTENSION citus;
\c - - - :master_port
SET citus.enable_object_propagation TO off; -- prevent distributed transactions during add node SET citus.enable_object_propagation TO off; -- prevent distributed transactions during add node
SELECT FROM master_add_node('localhost', :worker_1_port); SELECT FROM master_add_node('localhost', :worker_1_port);
\c - - - :worker_1_port \c - - - :worker_1_port
CREATE EXTENSION citus;
ALTER FUNCTION assign_distributed_transaction_id(initiator_node_identifier integer, transaction_number bigint, transaction_stamp timestamp with time zone) ALTER FUNCTION assign_distributed_transaction_id(initiator_node_identifier integer, transaction_number bigint, transaction_stamp timestamp with time zone)
RENAME TO dummy_assign_function; RENAME TO dummy_assign_function;

View File

@ -111,6 +111,9 @@ UPDATE pg_dist_node SET nodecluster = 'second-cluster' WHERE noderole = 'seconda
\c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'" \c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\ citus.cluster_name=second-cluster'"
SELECT * FROM the_table; SELECT * FROM the_table;
-- Check for connectivity in the cluster
SELECT * FROM citus_check_cluster_node_health();
-- clean up after ourselves -- clean up after ourselves
\c -reuse-previous=off regression - - :master_port \c -reuse-previous=off regression - - :master_port
DROP TABLE the_table; DROP TABLE the_table;

View File

@ -159,17 +159,12 @@ REINDEX SYSTEM regression;
DROP INDEX lineitem_orderkey_index, lineitem_partial_index; DROP INDEX lineitem_orderkey_index, lineitem_partial_index;
-- Verify that we can succesfully drop indexes -- Verify that we can succesfully drop indexes
DROP INDEX lineitem_orderkey_index;
DROP INDEX lineitem_orderkey_index_new;
DROP INDEX lineitem_partkey_desc_index; DROP INDEX lineitem_partkey_desc_index;
DROP INDEX lineitem_partial_index; DROP INDEX lineitem_partial_index;
DROP INDEX lineitem_colref_index; DROP INDEX lineitem_colref_index;
-- Verify that we can drop distributed indexes with local indexes
CREATE TABLE local_table(a int, b int);
CREATE INDEX local_index ON local_table(a);
CREATE INDEX local_index2 ON local_table(b);
DROP INDEX lineitem_orderkey_index, local_index;
DROP INDEX IF EXISTS lineitem_orderkey_index_new, local_index2, non_existing_index;
-- Verify that we handle if exists statements correctly -- Verify that we handle if exists statements correctly
DROP INDEX non_existent_index; DROP INDEX non_existent_index;
@ -193,8 +188,9 @@ DROP INDEX CONCURRENTLY lineitem_concurrently_index;
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%' ORDER BY 1,2; SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%' ORDER BY 1,2;
SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname; SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname;
\c - - - :worker_1_port \c - - - :worker_1_port
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%' ORDER BY 1,2; SET citus.override_table_visibility TO FALSE;
SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname; SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname SIMILAR TO 'lineitem%\d' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%' ORDER BY 1,2;
SELECT * FROM pg_indexes WHERE tablename SIMILAR TO 'index_test_%\d' ORDER BY indexname;
-- create index that will conflict with master operations -- create index that will conflict with master operations
CREATE INDEX CONCURRENTLY ith_b_idx_102089 ON multi_index_statements.index_test_hash_102089(b); CREATE INDEX CONCURRENTLY ith_b_idx_102089 ON multi_index_statements.index_test_hash_102089(b);

View File

@ -1,9 +1,5 @@
CREATE SCHEMA mx_add_coordinator; CREATE SCHEMA mx_add_coordinator;
SET search_path TO mx_add_coordinator,public; SET search_path TO mx_add_coordinator,public;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 8;
SET citus.next_shard_id TO 7000000;
SET citus.next_placement_id TO 7000000;
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
CREATE USER reprefuser WITH LOGIN; CREATE USER reprefuser WITH LOGIN;
@ -12,8 +8,29 @@ SET citus.enable_alter_role_propagation TO ON;
-- alter role for other than the extension owner works in enterprise, output differs accordingly -- alter role for other than the extension owner works in enterprise, output differs accordingly
ALTER ROLE reprefuser WITH CREATEDB; ALTER ROLE reprefuser WITH CREATEDB;
-- check connectivity in the cluster
-- verify that we test for 4 node pairs before we add coordinator to metadata
SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
-- verify that we test for 9 node pairs after we add coordinator to metadata
SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();
-- test that we can test for connectivity when connected to worker nodes as well
\c - - - :worker_1_port
SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health();
\c - - - :master_port
-- set the configs after reconnecting to coordinator
SET search_path TO mx_add_coordinator,public;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 8;
SET citus.next_shard_id TO 7000000;
SET citus.next_placement_id TO 7000000;
SET client_min_messages TO WARNING;
-- test that coordinator pg_dist_node entry is synced to the workers -- test that coordinator pg_dist_node entry is synced to the workers
SELECT wait_until_metadata_sync(30000); SELECT wait_until_metadata_sync(30000);

View File

@ -451,7 +451,7 @@ WHERE
ORDER BY shardid, nodeport; ORDER BY shardid, nodeport;
-- verify constraints have been created on the new node -- verify constraints have been created on the new node
SELECT run_command_on_workers('select count(*) from pg_constraint where contype=''f'' AND conname like ''ref_table%'';'); SELECT run_command_on_workers('select count(*) from pg_constraint where contype=''f'' AND conname similar to ''ref_table%\d'';');
DROP TABLE ref_table_1, ref_table_2, ref_table_3; DROP TABLE ref_table_1, ref_table_2, ref_table_3;
@ -591,7 +591,6 @@ SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
SELECT 1 FROM master_add_node('localhost', :worker_2_port); SELECT 1 FROM master_add_node('localhost', :worker_2_port);
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SELECT master_copy_shard_placement( SELECT master_copy_shard_placement(
:ref_table_shard, :ref_table_shard,
@ -637,8 +636,6 @@ WHERE ref_table.a = dist_table.a;
SET search_path TO replicate_reference_table; SET search_path TO replicate_reference_table;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
-- --
-- The following case used to get stuck on create_distributed_table() instead -- The following case used to get stuck on create_distributed_table() instead
-- of detecting the distributed deadlock. -- of detecting the distributed deadlock.

View File

@ -33,8 +33,6 @@ FROM pg_dist_partition
WHERE logicalrelid IN ('mx_table'::regclass, 'mx_table_2'::regclass) WHERE logicalrelid IN ('mx_table'::regclass, 'mx_table_2'::regclass)
ORDER BY logicalrelid; ORDER BY logicalrelid;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv'); COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv');
-37, 'lorem' -37, 'lorem'
65536, 'ipsum' 65536, 'ipsum'
@ -219,11 +217,6 @@ DROP SEQUENCE mx_table_col_3_seq CASCADE;
\c - - - :master_port \c - - - :master_port
DROP TABLE mx_table; DROP TABLE mx_table;
DROP TABLE mx_table_2; DROP TABLE mx_table_2;
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
\c - - - :worker_1_port
DELETE FROM pg_dist_node;
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx\_%table%';
\c - - - :master_port
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id;
RESET citus.shard_replication_factor; RESET citus.shard_replication_factor;

View File

@ -3,9 +3,6 @@ SET search_path TO "Mx Regular User";
-- add coordinator in idempotent way -- add coordinator in idempotent way
SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
-- sync the metadata to both nodes
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
-- create a role and give access one each node separately -- create a role and give access one each node separately
-- and increase the error level to prevent enterprise to diverge -- and increase the error level to prevent enterprise to diverge
@ -355,4 +352,8 @@ SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
-- finally sync metadata again so it doesn't break later tests
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
DROP SCHEMA "Mx Regular User" CASCADE; DROP SCHEMA "Mx Regular User" CASCADE;

Some files were not shown because too many files have changed in this diff Show More