diff --git a/configure b/configure index ea3374c64..5766ce8d6 100755 --- a/configure +++ b/configure @@ -4543,7 +4543,9 @@ if test "${with_lz4+set}" = set; then : withval=$with_lz4; case $withval in yes) - : + +$as_echo "#define HAVE_CITUS_LIBLZ4 1" >>confdefs.h + ;; no) : @@ -4556,6 +4558,8 @@ if test "${with_lz4+set}" = set; then : else with_lz4=yes +$as_echo "#define HAVE_CITUS_LIBLZ4 1" >>confdefs.h + fi diff --git a/configure.in b/configure.in index 8a5192f58..cb1d15b6b 100644 --- a/configure.in +++ b/configure.in @@ -220,7 +220,8 @@ AC_DEFINE_UNQUOTED(REPORTS_BASE_URL, "$REPORTS_BASE_URL", # LZ4 # PGAC_ARG_BOOL(with, lz4, yes, - [do not use lz4]) + [do not use lz4], + [AC_DEFINE([HAVE_CITUS_LIBLZ4], 1, [Define to 1 to build with lz4 support. (--with-lz4)])]) AC_SUBST(with_lz4) if test "$with_lz4" = yes; then diff --git a/src/backend/columnar/columnar.c b/src/backend/columnar/columnar.c index d555a565c..35a6f6da9 100644 --- a/src/backend/columnar/columnar.c +++ b/src/backend/columnar/columnar.c @@ -29,7 +29,7 @@ #if HAVE_LIBZSTD #define DEFAULT_COMPRESSION_TYPE COMPRESSION_ZSTD -#elif HAVE_LIBLZ4 +#elif HAVE_CITUS_LIBLZ4 #define DEFAULT_COMPRESSION_TYPE COMPRESSION_LZ4 #else #define DEFAULT_COMPRESSION_TYPE COMPRESSION_PG_LZ @@ -44,7 +44,7 @@ static const struct config_enum_entry columnar_compression_options[] = { { "none", COMPRESSION_NONE, false }, { "pglz", COMPRESSION_PG_LZ, false }, -#if HAVE_LIBLZ4 +#if HAVE_CITUS_LIBLZ4 { "lz4", COMPRESSION_LZ4, false }, #endif #if HAVE_LIBZSTD diff --git a/src/backend/columnar/columnar_compression.c b/src/backend/columnar/columnar_compression.c index 961048018..b73c1dac6 100644 --- a/src/backend/columnar/columnar_compression.c +++ b/src/backend/columnar/columnar_compression.c @@ -19,7 +19,7 @@ #include "columnar/columnar_compression.h" -#if HAVE_LIBLZ4 +#if HAVE_CITUS_LIBLZ4 #include <lz4.h> #endif @@ -63,7 +63,7 @@ CompressBuffer(StringInfo inputBuffer, { switch (compressionType) { -#if HAVE_LIBLZ4 +#if HAVE_CITUS_LIBLZ4 case COMPRESSION_LZ4: { int maximumLength = LZ4_compressBound(inputBuffer->len); @@ -170,7 +170,7 @@ DecompressBuffer(StringInfo buffer, return buffer; } -#if HAVE_LIBLZ4 +#if HAVE_CITUS_LIBLZ4 case COMPRESSION_LZ4: { StringInfo decompressedBuffer = makeStringInfo(); diff --git a/src/backend/columnar/columnar_debug.c b/src/backend/columnar/columnar_debug.c index 5525bb032..83af86ee5 100644 --- a/src/backend/columnar/columnar_debug.c +++ b/src/backend/columnar/columnar_debug.c @@ -70,8 +70,6 @@ columnar_store_memory_stats(PG_FUNCTION_ARGS) Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls); - tuplestore_donestoring(tupleStore); - PG_RETURN_DATUM(0); } diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 13ec391d2..f7bc29c34 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -80,6 +80,7 @@ static void EnsureSequentialModeForFunctionDDL(void); static void TriggerSyncMetadataToPrimaryNodes(void); static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt); static bool ShouldPropagateAlterFunction(const ObjectAddress *address); +static bool ShouldAddFunctionSignature(FunctionParameterMode mode); static ObjectAddress FunctionToObjectAddress(ObjectType objectType, ObjectWithArgs *objectWithArgs, bool missing_ok); @@ -1328,7 +1329,11 @@ CreateFunctionStmtObjectAddress(Node *node, bool missing_ok) FunctionParameter *funcParam = NULL;
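/*
 * A hedged illustration of the rule implemented in the loop below, using a
 * hypothetical function f: PostgreSQL identifies a function by its input
 * parameters only, so OUT and TABLE parameters are excluded from the
 * generated signature.
 *
 *   CREATE FUNCTION f(a int, OUT b int) RETURNS int LANGUAGE sql AS $$ SELECT 1; $$;
 *   DROP FUNCTION f(int);  -- the OUT parameter is not part of the signature
 */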
foreach_ptr(funcParam, stmt->parameters) { - objectWithArgs->objargs = lappend(objectWithArgs->objargs, funcParam->argType); + if (ShouldAddFunctionSignature(funcParam->mode)) + { + objectWithArgs->objargs = lappend(objectWithArgs->objargs, + funcParam->argType); + } } return FunctionToObjectAddress(objectType, objectWithArgs, missing_ok); @@ -1885,8 +1890,7 @@ ObjectWithArgsFromOid(Oid funcOid) for (int i = 0; i < numargs; i++) { - if (argModes == NULL || - argModes[i] != PROARGMODE_OUT || argModes[i] != PROARGMODE_TABLE) + if (argModes == NULL || ShouldAddFunctionSignature(argModes[i])) { objargs = lappend(objargs, makeTypeNameFromOid(argTypes[i], -1)); } @@ -1899,6 +1903,35 @@ ObjectWithArgsFromOid(Oid funcOid) } +/* + * ShouldAddFunctionSignature takes a FunctionParameterMode and returns true if it should + * be included in the function signature. Returns false otherwise. + */ +static bool +ShouldAddFunctionSignature(FunctionParameterMode mode) +{ + /* only input parameters should be added to the generated signature */ + switch (mode) + { + case FUNC_PARAM_IN: + case FUNC_PARAM_INOUT: + case FUNC_PARAM_VARIADIC: + { + return true; + } + + case FUNC_PARAM_OUT: + case FUNC_PARAM_TABLE: + { + return false; + } + + default: + return true; + } +} + + /* * FunctionToObjectAddress returns the ObjectAddress of a Function or Procedure based on * its type and ObjectWithArgs describing the Function/Procedure. If missing_ok is set to diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index b43090d45..cfdd6ad63 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -24,6 +24,7 @@ #include "distributed/commands.h" #include "distributed/commands/utility_hook.h" #include "distributed/deparse_shard_query.h" +#include "distributed/deparser.h" #include "distributed/distributed_planner.h" #include "distributed/listutils.h" #include "distributed/local_executor.h" @@ -71,6 +72,7 @@ static void RangeVarCallbackForReindexIndex(const RangeVar *rel, Oid relOid, Oid oldRelOid, void *arg); static void ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement); +static void ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement); static List * DropIndexTaskList(Oid relationId, Oid indexId, DropStmt *dropStmt); @@ -676,21 +678,9 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand, bool isCitusRelation = IsCitusTable(relationId); if (isCitusRelation) { - if (OidIsValid(distributedIndexId)) - { - /* - * We already have a distributed index in the list, and Citus - * currently only support dropping a single distributed index. - */ - ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), - errmsg("cannot drop multiple distributed objects in " - "a single command"), - errhint("Try dropping each object in a separate DROP " - "command."))); - } - distributedIndexId = indexId; distributedRelationId = relationId; + break; } } @@ -698,6 +688,8 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand, { DDLJob *ddlJob = palloc0(sizeof(DDLJob)); + ErrorIfUnsupportedDropIndexStmt(dropIndexStatement); + if (AnyForeignKeyDependsOnIndex(distributedIndexId)) { MarkInvalidateForeignKeyGraph(); @@ -1159,6 +1151,26 @@ ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement) } +/* + * ErrorIfUnsupportedDropIndexStmt checks if the corresponding drop index statement is + * supported for distributed tables and errors out if it is not. 
+ */ +static void +ErrorIfUnsupportedDropIndexStmt(DropStmt *dropIndexStatement) +{ + Assert(dropIndexStatement->removeType == OBJECT_INDEX); + + if (list_length(dropIndexStatement->objects) > 1) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot drop multiple distributed objects in a " + "single command"), + errhint("Try dropping each object in a separate DROP " + "command."))); + } +} + + /* * DropIndexTaskList builds a list of tasks to execute a DROP INDEX command * against a specified distributed table. diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index efa4de221..aae37b7d4 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -387,7 +387,10 @@ ProcessUtilityInternal(PlannedStmt *pstmt, parsetree = ProcessCreateSubscriptionStmt(createSubStmt); } - /* process SET LOCAL stmts of allowed GUCs in multi-stmt xacts */ + /* + * Process SET LOCAL and SET TRANSACTION statements in multi-statement + * transactions. + */ if (IsA(parsetree, VariableSetStmt)) { VariableSetStmt *setStmt = (VariableSetStmt *) parsetree; diff --git a/src/backend/distributed/commands/variableset.c b/src/backend/distributed/commands/variableset.c index 9f93e7c65..c7ad22df2 100644 --- a/src/backend/distributed/commands/variableset.c +++ b/src/backend/distributed/commands/variableset.c @@ -35,6 +35,7 @@ static bool IsSettingSafeToPropagate(char *name); * * We currently propagate: * - SET LOCAL (for allowed settings) + * - SET TRANSACTION * - RESET (for allowed settings) * - RESET ALL */ @@ -72,8 +73,8 @@ ShouldPropagateSetCommand(VariableSetStmt *setStmt) case VAR_SET_MULTI: default: { - /* SET (LOCAL) TRANSACTION should be handled locally */ - return false; + /* SET TRANSACTION is similar to SET LOCAL */ + return strcmp(setStmt->name, "TRANSACTION") == 0; } } } @@ -121,7 +122,7 @@ PostprocessVariableSetStmt(VariableSetStmt *setStmt, const char *setStmtString) const bool raiseInterrupts = true; List *connectionList = NIL; - /* at present we only support SET LOCAL */ + /* at present we only support SET LOCAL and SET TRANSACTION */ AssertArg(ShouldPropagateSetCommand(setStmt)); /* haven't seen any SET stmts so far in this (sub-)xact: initialize StringInfo */ diff --git a/src/backend/distributed/connection/locally_reserved_shared_connections.c b/src/backend/distributed/connection/locally_reserved_shared_connections.c index 44e1340c2..a26983104 100644 --- a/src/backend/distributed/connection/locally_reserved_shared_connections.c +++ b/src/backend/distributed/connection/locally_reserved_shared_connections.c @@ -118,9 +118,6 @@ citus_reserved_connection_stats(PG_FUNCTION_ARGS) StoreAllReservedConnections(tupleStore, tupleDescriptor); - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupleStore); - PG_RETURN_VOID(); } diff --git a/src/backend/distributed/connection/remote_commands.c b/src/backend/distributed/connection/remote_commands.c index b566b53da..6511a675c 100644 --- a/src/backend/distributed/connection/remote_commands.c +++ b/src/backend/distributed/connection/remote_commands.c @@ -22,6 +22,8 @@ #include "lib/stringinfo.h" #include "miscadmin.h" #include "storage/latch.h" +#include "utils/builtins.h" +#include "utils/fmgrprotos.h" #include "utils/palloc.h" @@ -34,6 +36,7 @@ int RemoteCopyFlushThreshold = 8 * 1024 * 1024; /* GUC, determining whether statements sent to remote nodes are logged */ bool LogRemoteCommands = false; +char 
*GrepRemoteCommands = ""; static bool ClearResultsInternal(MultiConnection *connection, bool raiseErrors, @@ -328,7 +331,6 @@ ReportResultError(MultiConnection *connection, PGresult *result, int elevel) /* *INDENT-ON* */ - /* * LogRemoteCommand logs commands send to remote nodes if * citus.log_remote_commands wants us to do so. @@ -341,6 +343,11 @@ LogRemoteCommand(MultiConnection *connection, const char *command) return; } + if (!CommandMatchesLogGrepPattern(command)) + { + return; + } + ereport(NOTICE, (errmsg("issuing %s", ApplyLogRedaction(command)), errdetail("on server %s@%s:%d connectionId: %ld", connection->user, connection->hostname, @@ -348,6 +355,29 @@ LogRemoteCommand(MultiConnection *connection, const char *command) } + +/* + * CommandMatchesLogGrepPattern returns true if the input command matches + * the pattern specified by citus.grep_remote_commands. + * + * If citus.grep_remote_commands is set to an empty string, all commands are + * considered a match. + */ +bool +CommandMatchesLogGrepPattern(const char *command) +{ + if (GrepRemoteCommands && strnlen(GrepRemoteCommands, NAMEDATALEN) > 0) + { + Datum boolDatum = + DirectFunctionCall2(textlike, CStringGetTextDatum(command), + CStringGetTextDatum(GrepRemoteCommands)); + + return DatumGetBool(boolDatum); + } + + return true; +} + + /* wrappers around libpq functions, with command logging support */ diff --git a/src/backend/distributed/connection/shared_connection_stats.c b/src/backend/distributed/connection/shared_connection_stats.c index 89fb1cd19..7592a6feb 100644 --- a/src/backend/distributed/connection/shared_connection_stats.c +++ b/src/backend/distributed/connection/shared_connection_stats.c @@ -143,9 +143,6 @@ citus_remote_connection_stats(PG_FUNCTION_ARGS) StoreAllRemoteConnectionStats(tupleStore, tupleDescriptor); - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupleStore); - PG_RETURN_VOID(); } diff --git a/src/backend/distributed/executor/intermediate_results.c b/src/backend/distributed/executor/intermediate_results.c index c1b8f86dc..3442e23a3 100644 --- a/src/backend/distributed/executor/intermediate_results.c +++ b/src/backend/distributed/executor/intermediate_results.c @@ -861,8 +861,6 @@ ReadIntermediateResultsIntoFuncOutput(FunctionCallInfo fcinfo, char *copyFormat, tupleStore); } } - - tuplestore_donestoring(tupleStore); } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index eefef65c2..26bf12ba0 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -513,8 +513,14 @@ LogLocalCommand(Task *task) return; } + const char *command = TaskQueryString(task); + if (!CommandMatchesLogGrepPattern(command)) + { + return; + } + ereport(NOTICE, (errmsg("executing the command locally: %s", - ApplyLogRedaction(TaskQueryString(task))))); + ApplyLogRedaction(command)))); } diff --git a/src/backend/distributed/executor/partitioned_intermediate_results.c b/src/backend/distributed/executor/partitioned_intermediate_results.c index 21949feed..8feb31a95 100644 --- a/src/backend/distributed/executor/partitioned_intermediate_results.c +++ b/src/backend/distributed/executor/partitioned_intermediate_results.c @@ -262,8 +262,6 @@ worker_partition_query_result(PG_FUNCTION_ARGS) tuplestore_putvalues(tupleStore, returnTupleDesc, values, nulls); } - - tuplestore_donestoring(tupleStore); PortalDrop(portal, false); FreeExecutorState(estate);
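The tuplestore_donestoring() calls removed throughout this diff have long been a no-op in PostgreSQL (tuplestore.h defines the macro as ((void) 0)), so dropping them changes no behavior. For the new citus.grep_remote_commands GUC wired up above, a minimal usage sketch follows; the pattern below is illustrative, matching uses LIKE semantics (via textlike), and an empty pattern matches every command:

  SET citus.log_remote_commands TO on;
  SET citus.grep_remote_commands TO '%COMMIT%';
  -- from now on, only remote commands (and locally executed tasks)
  -- matching the pattern are logged

diff --git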
a/src/backend/distributed/metadata/metadata_utility.c b/src/backend/distributed/metadata/metadata_utility.c index c19917ebc..4568c29ad 100644 --- a/src/backend/distributed/metadata/metadata_utility.c +++ b/src/backend/distributed/metadata/metadata_utility.c @@ -245,9 +245,6 @@ citus_shard_sizes(PG_FUNCTION_ARGS) ReceiveShardNameAndSizeResults(connectionList, tupleStore, tupleDescriptor); - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupleStore); - PG_RETURN_VOID(); } diff --git a/src/backend/distributed/operations/citus_tools.c b/src/backend/distributed/operations/citus_tools.c index 77f5190e3..cb87e70b1 100644 --- a/src/backend/distributed/operations/citus_tools.c +++ b/src/backend/distributed/operations/citus_tools.c @@ -14,7 +14,6 @@ #include "access/htup_details.h" #include "catalog/pg_type.h" -#include "distributed/argutils.h" #include "distributed/connection_management.h" #include "distributed/metadata_cache.h" #include "distributed/multi_client_executor.h" @@ -28,13 +27,9 @@ #include "miscadmin.h" #include "utils/builtins.h" -/* simple query to run on workers to check connectivity */ -#define CONNECTIVITY_CHECK_QUERY "SELECT 1" -PG_FUNCTION_INFO_V1(citus_check_connection_to_node); PG_FUNCTION_INFO_V1(master_run_on_worker); -static bool CheckConnectionToNode(char *nodeName, uint32 nodePort); static int ParseCommandParameters(FunctionCallInfo fcinfo, StringInfo **nodeNameArray, int **nodePortsArray, StringInfo **commandStringArray, bool *parallel); @@ -63,36 +58,6 @@ static Tuplestorestate * CreateTupleStore(TupleDesc tupleDescriptor, StringInfo *resultArray, int commandCount); -/* - * citus_check_connection_to_node sends a simple query from a worker node to another - * node, and returns success status. - */ -Datum -citus_check_connection_to_node(PG_FUNCTION_ARGS) -{ - char *nodeName = PG_GETARG_TEXT_TO_CSTRING(0); - uint32 nodePort = PG_GETARG_UINT32(1); - - bool success = CheckConnectionToNode(nodeName, nodePort); - PG_RETURN_BOOL(success); -} - - -/* - * CheckConnectionToNode sends a simple query to a node and returns success status - */ -static bool -CheckConnectionToNode(char *nodeName, uint32 nodePort) -{ - int connectionFlags = 0; - MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); - int responseStatus = ExecuteOptionalRemoteCommand(connection, - CONNECTIVITY_CHECK_QUERY, NULL); - - return responseStatus == RESPONSE_OKAY; -} - - /* * master_run_on_worker executes queries/commands to run on specified worker and * returns success status and query/command result. Expected input is 3 arrays @@ -592,8 +557,5 @@ CreateTupleStore(TupleDesc tupleDescriptor, pfree(nodeNameText); pfree(resultText); } - - tuplestore_donestoring(tupleStore); - return tupleStore; } diff --git a/src/backend/distributed/operations/health_check.c b/src/backend/distributed/operations/health_check.c new file mode 100644 index 000000000..db640cc64 --- /dev/null +++ b/src/backend/distributed/operations/health_check.c @@ -0,0 +1,198 @@ +/*------------------------------------------------------------------------- + * + * health_check.c + * + * UDFs to run health check operations by coordinating simple queries to test connectivity + * between connection pairs in the cluster. + * + * + * Copyright (c) Citus Data, Inc. 
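+ *
+ * An illustrative usage sketch of the two UDFs defined in this file, with
+ * placeholder host and port values:
+ *
+ *   SELECT citus_check_connection_to_node('localhost', 5432);
+ *   SELECT * FROM citus_check_cluster_node_health();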
+ * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" + +#include "distributed/argutils.h" +#include "distributed/listutils.h" +#include "distributed/lock_graph.h" +#include "distributed/remote_commands.h" +#include "distributed/tuplestore.h" +#include "utils/builtins.h" + +/* simple query to run on workers to check connectivity */ +#define CONNECTIVITY_CHECK_QUERY "SELECT 1" +#define CONNECTIVITY_CHECK_COLUMNS 5 + +PG_FUNCTION_INFO_V1(citus_check_connection_to_node); +PG_FUNCTION_INFO_V1(citus_check_cluster_node_health); + +static bool CheckConnectionToNode(char *nodeName, uint32 nodePort); + +static void StoreAllConnectivityChecks(Tuplestorestate *tupleStore, + TupleDesc tupleDescriptor); +static char * GetConnectivityCheckCommand(const char *nodeName, const uint32 nodePort); + + +/* + * citus_check_connection_to_node sends a simple query from a worker node to another + * node, and returns success status. + */ +Datum +citus_check_connection_to_node(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + char *nodeName = PG_GETARG_TEXT_TO_CSTRING(0); + uint32 nodePort = PG_GETARG_UINT32(1); + + bool success = CheckConnectionToNode(nodeName, nodePort); + PG_RETURN_BOOL(success); +} + + +/* + * CheckConnectionToNode sends a simple query to a node and returns success status + */ +static bool +CheckConnectionToNode(char *nodeName, uint32 nodePort) +{ + int connectionFlags = 0; + MultiConnection *connection = GetNodeConnection(connectionFlags, nodeName, nodePort); + int responseStatus = ExecuteOptionalRemoteCommand(connection, + CONNECTIVITY_CHECK_QUERY, NULL); + + return responseStatus == RESPONSE_OKAY; +} + + +/* + * citus_check_cluster_node_health UDF performs connectivity checks from all the nodes to + * all the nodes, and reports success status + */ +Datum +citus_check_cluster_node_health(PG_FUNCTION_ARGS) +{ + CheckCitusVersion(ERROR); + + TupleDesc tupleDescriptor = NULL; + Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor); + + StoreAllConnectivityChecks(tupleStore, tupleDescriptor); + + PG_RETURN_VOID(); +} + + +/* + * StoreAllConnectivityChecks performs connectivity checks from all the nodes to all the + * nodes, and reports success status. + * + * Algorithm is: + * for sourceNode in activeReadableNodeList: + * c = connectToNode(sourceNode) + * for targetNode in activeReadableNodeList: + * result = c.execute("SELECT citus_check_connection_to_node(targetNode.name, targetNode.port)") + * emit sourceNode.name, sourceNode.port, targetNode.name, targetNode.port, result + * + * -- result -> true -> connection attempt from source to target succeeded + * -- result -> false -> connection attempt from source to target failed + * -- result -> NULL -> connection attempt from the current node to source node failed + */ +static void +StoreAllConnectivityChecks(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor) +{ + Datum values[CONNECTIVITY_CHECK_COLUMNS]; + bool isNulls[CONNECTIVITY_CHECK_COLUMNS]; + + /* + * Get the list of all readable nodes so that we check connectivity to followers in + * the cluster as well. + */ + List *workerNodeList = ActiveReadableNodeList(); + + /* we want to check for connectivity in a deterministic order */ + workerNodeList = SortList(workerNodeList, CompareWorkerNodes); + + /* + * We iterate over the workerNodeList twice, for source and target worker nodes. This + * operation is safe for the foreach_ptr macro, as long as we use different variables + * for each iteration.
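+ *
+ * For two nodes A and B, the nested loops below emit one row per ordered pair:
+ * (A,A), (A,B), (B,A), (B,B). A cluster-wide health summary can then be
+ * computed with the query used in this PR's regression tests:
+ *
+ *   SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health();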
+ */ + WorkerNode *sourceWorkerNode = NULL; + foreach_ptr(sourceWorkerNode, workerNodeList) + { + const char *sourceNodeName = sourceWorkerNode->workerName; + const int sourceNodePort = sourceWorkerNode->workerPort; + int32 connectionFlags = 0; + + /* open a connection to the source node using the synchronous api */ + MultiConnection *connectionToSourceNode = + GetNodeConnection(connectionFlags, sourceNodeName, sourceNodePort); + + /* the second iteration over workerNodeList for the target worker nodes. */ + WorkerNode *targetWorkerNode = NULL; + foreach_ptr(targetWorkerNode, workerNodeList) + { + const char *targetNodeName = targetWorkerNode->workerName; + const int targetNodePort = targetWorkerNode->workerPort; + + char *connectivityCheckCommandToTargetNode = + GetConnectivityCheckCommand(targetNodeName, targetNodePort); + + PGresult *result = NULL; + int executionResult = + ExecuteOptionalRemoteCommand(connectionToSourceNode, + connectivityCheckCommandToTargetNode, + &result); + + /* get ready for the next tuple */ + memset(values, 0, sizeof(values)); + memset(isNulls, false, sizeof(isNulls)); + + values[0] = PointerGetDatum(cstring_to_text(sourceNodeName)); + values[1] = Int32GetDatum(sourceNodePort); + values[2] = PointerGetDatum(cstring_to_text(targetNodeName)); + values[3] = Int32GetDatum(targetNodePort); + + /* + * If we could not send the query or the result was not ok, set success field + * to NULL. This may indicate connection errors to a worker node, however that + * node can potentially connect to other nodes. + * + * Therefore, we mark the success as NULL to indicate that the connectivity + * status is unknown. + */ + if (executionResult != RESPONSE_OKAY) + { + isNulls[4] = true; + } + else + { + int rowIndex = 0; + int columnIndex = 0; + values[4] = BoolGetDatum(ParseBoolField(result, rowIndex, columnIndex)); + } + + tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); + + PQclear(result); + ForgetResults(connectionToSourceNode); + } + } +} + + +/* + * GetConnectivityCheckCommand returns the command to check connections to a node + */ +static char * +GetConnectivityCheckCommand(const char *nodeName, const uint32 nodePort) +{ + StringInfo connectivityCheckCommand = makeStringInfo(); + appendStringInfo(connectivityCheckCommand, + "SELECT citus_check_connection_to_node('%s', %d)", + nodeName, nodePort); + + return connectivityCheckCommand->data; +} diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 773c9e8b6..166423cf0 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -1058,8 +1058,6 @@ get_rebalance_table_shards_plan(PG_FUNCTION_ARGS) tuplestore_putvalues(tupstore, tupdesc, values, nulls); } - tuplestore_donestoring(tupstore); - return (Datum) 0; } @@ -1132,8 +1130,6 @@ get_rebalance_progress(PG_FUNCTION_ARGS) } } - tuplestore_donestoring(tupstore); - DetachFromDSMSegments(segmentList); return (Datum) 0; diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index f4c38a3bd..bca9bbaa1 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -1000,8 +1000,6 @@ worker_last_saved_explain_analyze(PG_FUNCTION_ARGS) tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls); } - - tuplestore_donestoring(tupleStore); PG_RETURN_DATUM(0); } @@ -1087,8 +1085,6 @@ 
worker_save_query_explain_analyze(PG_FUNCTION_ARGS) ExplainEndOutput(es); - tuplestore_donestoring(tupleStore); - /* save EXPLAIN ANALYZE result to be fetched later */ MemoryContext oldContext = MemoryContextSwitchTo(TopTransactionContext); FreeSavedExplainPlan(); diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index d6a1005e9..7ab700af6 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -1088,6 +1088,18 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); + DefineCustomStringVariable( + "citus.grep_remote_commands", + gettext_noop( + "Only log remote commands that match this pattern, evaluated as " + "\"command\" LIKE citus.grep_remote_commands; an empty pattern " + "matches all commands."), + NULL, + &GrepRemoteCommands, + "", + PGC_USERSET, + GUC_NO_SHOW_ALL, + NULL, NULL, NULL); + DefineCustomIntVariable( "citus.isolation_test_session_process_id", NULL, diff --git a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql index 3b7814c63..c5f333152 100644 --- a/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql +++ b/src/backend/distributed/sql/citus--10.2-4--11.0-1.sql @@ -4,6 +4,8 @@ #include "udfs/citus_disable_node/11.0-1.sql" #include "udfs/citus_check_connection_to_node/11.0-1.sql" +#include "udfs/citus_check_cluster_node_health/11.0-1.sql" + #include "udfs/citus_internal_add_object_metadata/11.0-1.sql" DROP FUNCTION IF EXISTS pg_catalog.master_apply_delete_command(text); diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql index 0087ecf1c..e5a7cb2bd 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-1--10.2-4.sql @@ -41,4 +41,6 @@ COMMENT ON FUNCTION pg_catalog.citus_disable_node(nodename text, nodeport intege IS 'removes node from the cluster temporarily'; DROP FUNCTION pg_catalog.citus_check_connection_to_node (text, integer); +DROP FUNCTION pg_catalog.citus_check_cluster_node_health (); + DROP FUNCTION pg_catalog.citus_internal_add_object_metadata(text, text[], text[], integer, integer); diff --git a/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/11.0-1.sql b/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/11.0-1.sql new file mode 100644 index 000000000..378e510ed --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/11.0-1.sql @@ -0,0 +1,13 @@ +CREATE FUNCTION pg_catalog.citus_check_cluster_node_health ( + OUT from_nodename text, + OUT from_nodeport int, + OUT to_nodename text, + OUT to_nodeport int, + OUT result bool ) + RETURNS SETOF RECORD + LANGUAGE C + STRICT + AS 'MODULE_PATHNAME', $$citus_check_cluster_node_health$$; + +COMMENT ON FUNCTION pg_catalog.citus_check_cluster_node_health () + IS 'checks connections between all nodes in the cluster'; diff --git a/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/latest.sql b/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/latest.sql new file mode 100644 index 000000000..378e510ed --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_check_cluster_node_health/latest.sql @@ -0,0 +1,13 @@ +CREATE FUNCTION pg_catalog.citus_check_cluster_node_health ( + OUT from_nodename text, + OUT from_nodeport int, + OUT to_nodename text, + OUT to_nodeport int, + OUT result bool ) + RETURNS SETOF RECORD + LANGUAGE C + STRICT + AS
'MODULE_PATHNAME', $$citus_check_cluster_node_health$$; + +COMMENT ON FUNCTION pg_catalog.citus_check_cluster_node_health () + IS 'checks connections between all nodes in the cluster'; diff --git a/src/backend/distributed/test/distributed_deadlock_detection.c b/src/backend/distributed/test/distributed_deadlock_detection.c index 84739cb49..448228158 100644 --- a/src/backend/distributed/test/distributed_deadlock_detection.c +++ b/src/backend/distributed/test/distributed_deadlock_detection.c @@ -67,8 +67,5 @@ get_adjacency_list_wait_graph(PG_FUNCTION_ARGS) tuplestore_putvalues(tupleStore, tupleDescriptor, values, isNulls); } - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupleStore); - PG_RETURN_VOID(); } diff --git a/src/backend/distributed/test/distributed_intermediate_results.c b/src/backend/distributed/test/distributed_intermediate_results.c index 5c450d88b..d99e11474 100644 --- a/src/backend/distributed/test/distributed_intermediate_results.c +++ b/src/backend/distributed/test/distributed_intermediate_results.c @@ -101,9 +101,6 @@ partition_task_list_results(PG_FUNCTION_ARGS) tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls); } - - tuplestore_donestoring(tupleStore); - PG_RETURN_DATUM(0); } @@ -186,8 +183,5 @@ redistribute_task_list_results(PG_FUNCTION_ARGS) tuplestore_putvalues(tupleStore, tupleDescriptor, columnValues, columnNulls); } - - tuplestore_donestoring(tupleStore); - PG_RETURN_DATUM(0); } diff --git a/src/backend/distributed/test/foreign_key_relationship_query.c b/src/backend/distributed/test/foreign_key_relationship_query.c index bae2e77e0..f6266d709 100644 --- a/src/backend/distributed/test/foreign_key_relationship_query.c +++ b/src/backend/distributed/test/foreign_key_relationship_query.c @@ -215,8 +215,5 @@ get_foreign_key_connected_relations(PG_FUNCTION_ARGS) tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls); } - - tuplestore_donestoring(tupleStore); - PG_RETURN_VOID(); } diff --git a/src/backend/distributed/test/progress_utils.c b/src/backend/distributed/test/progress_utils.c index c50e2dd5e..42b065dae 100644 --- a/src/backend/distributed/test/progress_utils.c +++ b/src/backend/distributed/test/progress_utils.c @@ -114,8 +114,6 @@ show_progress(PG_FUNCTION_ARGS) } } - tuplestore_donestoring(tupstore); - DetachFromDSMSegments(attachedDSMSegments); return (Datum) 0; diff --git a/src/backend/distributed/transaction/backend_data.c b/src/backend/distributed/transaction/backend_data.c index 26d853507..c76a80460 100644 --- a/src/backend/distributed/transaction/backend_data.c +++ b/src/backend/distributed/transaction/backend_data.c @@ -323,9 +323,6 @@ get_global_active_transactions(PG_FUNCTION_ARGS) ForgetResults(connection); } - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupleStore); - PG_RETURN_VOID(); } @@ -344,9 +341,6 @@ get_all_active_transactions(PG_FUNCTION_ARGS) StoreAllActiveTransactions(tupleStore, tupleDescriptor); - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupleStore); - PG_RETURN_VOID(); } diff --git a/src/backend/distributed/transaction/citus_dist_stat_activity.c b/src/backend/distributed/transaction/citus_dist_stat_activity.c index ec631204d..d1aa9a034 100644 --- a/src/backend/distributed/transaction/citus_dist_stat_activity.c +++ b/src/backend/distributed/transaction/citus_dist_stat_activity.c @@ -1100,7 +1100,4 @@ ReturnCitusDistStats(List *citusStatsList, FunctionCallInfo fcinfo) tuplestore_putvalues(tupleStore, tupleDesc, values, nulls); } - - /* 
clean up and return the tuplestore */ - tuplestore_donestoring(tupleStore); } diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index 30dec5d80..aa37e4371 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -322,9 +322,6 @@ ReturnWaitGraph(WaitGraph *waitGraph, FunctionCallInfo fcinfo) tuplestore_putvalues(tupleStore, tupleDesc, values, nulls); } - - /* clean up and return the tuplestore */ - tuplestore_donestoring(tupleStore); } diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index ba67843de..2859ec4c9 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -34,6 +34,7 @@ #define PREPARED_TRANSACTION_NAME_FORMAT "citus_%u_%u_"UINT64_FORMAT "_%u" +static char * AssignDistributedTransactionIdCommand(void); static void StartRemoteTransactionSavepointBegin(MultiConnection *connection, SubTransactionId subId); static void FinishRemoteTransactionSavepointBegin(MultiConnection *connection, @@ -68,8 +69,15 @@ StartRemoteTransactionBegin(struct MultiConnection *connection) transaction->transactionState = REMOTE_TRANS_STARTING; - StringInfo beginAndSetDistributedTransactionId = - BeginAndSetDistributedTransactionIdCommand(); + StringInfo beginAndSetDistributedTransactionId = makeStringInfo(); + + /* + * Explicitly specify READ COMMITTED, the default on the remote + * side might have been changed, and that would cause problematic + * behaviour. + */ + appendStringInfoString(beginAndSetDistributedTransactionId, + "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;"); /* append context for in-progress SAVEPOINTs for this transaction */ List *activeSubXacts = ActiveSubXactContexts(); @@ -98,6 +106,10 @@ StartRemoteTransactionBegin(struct MultiConnection *connection) appendStringInfoString(beginAndSetDistributedTransactionId, activeSetStmts->data); } + /* add SELECT assign_distributed_transaction_id ... */ + appendStringInfoString(beginAndSetDistributedTransactionId, + AssignDistributedTransactionIdCommand()); + if (!SendRemoteCommand(connection, beginAndSetDistributedTransactionId->data)) { const bool raiseErrors = true; @@ -126,6 +138,22 @@ BeginAndSetDistributedTransactionIdCommand(void) appendStringInfoString(beginAndSetDistributedTransactionId, "BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED;"); + appendStringInfoString(beginAndSetDistributedTransactionId, + AssignDistributedTransactionIdCommand()); + + return beginAndSetDistributedTransactionId; +} + + +/* + * AssignDistributedTransactionIdCommand returns a command to set the local + * distributed transaction ID on a remote transaction. + */ +static char * +AssignDistributedTransactionIdCommand(void) +{ + StringInfo assignDistributedTransactionId = makeStringInfo(); + /* * Append BEGIN and assign_distributed_transaction_id() statements into a single command * and send both in one step. 
The reason is purely performance, we don't want + * separate round trips for these two statements. + */ @@ -134,14 +162,14 @@ BeginAndSetDistributedTransactionIdCommand(void) DistributedTransactionId *distributedTransactionId = GetCurrentDistributedTransactionId(); const char *timestamp = timestamptz_to_str(distributedTransactionId->timestamp); - appendStringInfo(beginAndSetDistributedTransactionId, + appendStringInfo(assignDistributedTransactionId, "SELECT assign_distributed_transaction_id(%d, " UINT64_FORMAT ", '%s');", distributedTransactionId->initiatorNodeIdentifier, distributedTransactionId->transactionNumber, timestamp); - return beginAndSetDistributedTransactionId; + return assignDistributedTransactionId->data; } diff --git a/src/backend/distributed/worker/worker_create_or_replace.c b/src/backend/distributed/worker/worker_create_or_replace.c index 2bb34ea8c..c067abc11 100644 --- a/src/backend/distributed/worker/worker_create_or_replace.c +++ b/src/backend/distributed/worker/worker_create_or_replace.c @@ -61,7 +61,7 @@ WrapCreateOrReplace(const char *sql) * have this functionality or where their implementation is not sufficient. * * Besides checking if an object of said name exists it tries to compare the object to be - * created with the one in the local catalog. If there is a difference the on in the local + * created with the one in the local catalog. If there is a difference the one in the local * catalog will be renamed after which the statement can be executed on this worker to * create the object. * diff --git a/src/include/citus_config.h.in b/src/include/citus_config.h.in index 30868d4ba..b9b639ef6 100644 --- a/src/include/citus_config.h.in +++ b/src/include/citus_config.h.in @@ -38,7 +38,7 @@ #undef HAVE_LIBCURL /* Define to 1 if you have the `lz4' library (-llz4). */ -#undef HAVE_LIBLZ4 +#undef HAVE_CITUS_LIBLZ4 /* Define to 1 if you have the `zstd' library (-lzstd). */ #undef HAVE_LIBZSTD diff --git a/src/include/citus_version.h.in b/src/include/citus_version.h.in index 29e64adb7..68670dc34 100644 --- a/src/include/citus_version.h.in +++ b/src/include/citus_version.h.in @@ -25,7 +25,7 @@ #undef HAVE_LIBCURL /* Define to 1 if you have the `liblz4' library (-llz4). */ -#undef HAVE_LIBLZ4 +#undef HAVE_CITUS_LIBLZ4 /* Define to 1 if you have the `libzstd' library (-lzstd).
*/ #undef HAVE_LIBZSTD diff --git a/src/include/distributed/remote_commands.h b/src/include/distributed/remote_commands.h index 4463ad8f0..7e2c4852f 100644 --- a/src/include/distributed/remote_commands.h +++ b/src/include/distributed/remote_commands.h @@ -20,6 +20,7 @@ /* GUC, determining whether statements sent to remote nodes are logged */ extern bool LogRemoteCommands; +extern char *GrepRemoteCommands; /* GUC that determines the number of bytes after which remote COPY is flushed */ extern int RemoteCopyFlushThreshold; @@ -38,6 +39,7 @@ extern void ReportResultError(MultiConnection *connection, PGresult *result, int elevel); extern char * pchomp(const char *in); extern void LogRemoteCommand(MultiConnection *connection, const char *command); +extern bool CommandMatchesLogGrepPattern(const char *command); /* wrappers around libpq functions, with command logging support */ extern void ExecuteCriticalRemoteCommandList(MultiConnection *connection, diff --git a/src/test/regress/after_citus_upgrade_coord_schedule b/src/test/regress/after_citus_upgrade_coord_schedule index 7bca08db2..49b4e73d9 100644 --- a/src/test/regress/after_citus_upgrade_coord_schedule +++ b/src/test/regress/after_citus_upgrade_coord_schedule @@ -1,6 +1,5 @@ # this schedule is to be run only on coordinators -test: turn_mx_off test: upgrade_basic_after test: upgrade_partition_constraints_after test: upgrade_pg_dist_object_test_after diff --git a/src/test/regress/base_schedule b/src/test/regress/base_schedule index 97c6902f6..e3ea4de24 100644 --- a/src/test/regress/base_schedule +++ b/src/test/regress/base_schedule @@ -1,7 +1,6 @@ # ---------- # Only run few basic tests to set up a testing environment # ---------- -test: turn_mx_off test: multi_cluster_management test: multi_test_helpers multi_test_helpers_superuser multi_create_fdw columnar_test_helpers test: multi_test_catalog_views diff --git a/src/test/regress/before_pg_upgrade_schedule b/src/test/regress/before_pg_upgrade_schedule index 3af949a47..a5292e0ce 100644 --- a/src/test/regress/before_pg_upgrade_schedule +++ b/src/test/regress/before_pg_upgrade_schedule @@ -1,5 +1,4 @@ # The basic tests runs analyze which depends on shard numbers -test: turn_mx_off test: multi_test_helpers multi_test_helpers_superuser test: multi_test_catalog_views test: upgrade_basic_before diff --git a/src/test/regress/bin/normalize.sed b/src/test/regress/bin/normalize.sed index d4a0db225..25f4388e1 100644 --- a/src/test/regress/bin/normalize.sed +++ b/src/test/regress/bin/normalize.sed @@ -65,6 +65,9 @@ s/"(raw_events_second_user_id_value_1_key_|agg_events_user_id_value_1_agg_key_)[ # ignore WAL warnings /DEBUG: .+creating and filling new WAL file/d +# normalize debug connection failure +s/DEBUG: connection to the remote node/WARNING: connection to the remote node/g + # normalize file names for partitioned files s/(task_[0-9]+\.)[0-9]+/\1xxxx/g s/(job_[0-9]+\/task_[0-9]+\/p_[0-9]+\.)[0-9]+/\1xxxx/g diff --git a/src/test/regress/citus_tests/config.py b/src/test/regress/citus_tests/config.py index c80b3d14c..e783b3a46 100644 --- a/src/test/regress/citus_tests/config.py +++ b/src/test/regress/citus_tests/config.py @@ -272,16 +272,6 @@ class CitusUnusualExecutorConfig(CitusMXBaseClusterConfig): } -class CitusCacheManyConnectionsConfig(CitusMXBaseClusterConfig): - def __init__(self, arguments): - super().__init__(arguments) - self.new_settings = { - "citus.copy_switchover_threshold": "1B", - "citus.local_copy_flush_threshold": "1B", - "citus.remote_copy_flush_threshold": "1B", - } - - class 
CitusSmallCopyBuffersConfig(CitusMXBaseClusterConfig): def __init__(self, arguments): super().__init__(arguments) diff --git a/src/test/regress/create_schedule b/src/test/regress/create_schedule index 50a299d7e..17fc6559b 100644 --- a/src/test/regress/create_schedule +++ b/src/test/regress/create_schedule @@ -3,3 +3,4 @@ test: prepared_statements_create_load ch_benchmarks_create_load test: dropped_columns_create_load distributed_planning_create_load test: local_dist_join_load test: partitioned_indexes_create +test: connectivity_checks diff --git a/src/test/regress/expected/add_coordinator.out b/src/test/regress/expected/add_coordinator.out index d1e5d7cd8..51808515b 100644 --- a/src/test/regress/expected/add_coordinator.out +++ b/src/test/regress/expected/add_coordinator.out @@ -2,6 +2,7 @@ -- ADD_COORDINATOR -- SELECT master_add_node('localhost', :master_port, groupid => 0) AS master_nodeid \gset +NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata -- adding the same node again should return the existing nodeid SELECT master_add_node('localhost', :master_port, groupid => 0) = :master_nodeid; ?column? diff --git a/src/test/regress/expected/connectivity_checks.out b/src/test/regress/expected/connectivity_checks.out new file mode 100644 index 000000000..9fbc61482 --- /dev/null +++ b/src/test/regress/expected/connectivity_checks.out @@ -0,0 +1,6 @@ +SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health(); + bool_and +--------------------------------------------------------------------- + t +(1 row) + diff --git a/src/test/regress/expected/distributed_functions.out b/src/test/regress/expected/distributed_functions.out index 23b06782e..637441b46 100644 --- a/src/test/regress/expected/distributed_functions.out +++ b/src/test/regress/expected/distributed_functions.out @@ -610,7 +610,7 @@ SELECT create_distributed_function('eq_with_param_names(macaddr, macaddr)', dist -- show that we are able to propagate objects with multiple item on address arrays SELECT * FROM (SELECT unnest(master_metadata_snapshot()) as metadata_command order by 1) as innerResult WHERE metadata_command like '%distributed_object_data%'; - metadata_command + metadata_command --------------------------------------------------------------------- WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid) AS (VALUES ('type', ARRAY['public.usage_access_type']::text[], ARRAY[]::text[], -1, 0), ('type', ARRAY['function_tests.dup_result']::text[], ARRAY[]::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func']::text[], ARRAY['public.usage_access_type', 'integer[]']::text[], -1, 0), ('function', ARRAY['public', 'usage_access_func_third']::text[], ARRAY['integer', 'integer[]']::text[], 0, 50), ('function', ARRAY['function_tests', 'notice']::text[], ARRAY['pg_catalog.text']::text[], -1, 0), ('function', ARRAY['function_tests', 'dup']::text[], ARRAY['pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_with_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], 0, 52), ('function', ARRAY['function_tests', 'eq_mi''xed_param_names']::text[], ARRAY['pg_catalog.macaddr', 'pg_catalog.macaddr']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_sfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_invfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('function', ARRAY['function_tests', 
'agg_finalfunc']::text[], ARRAY['integer', 'integer']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'my_rank']::text[], ARRAY['pg_catalog."any"']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_sfunc']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('function', ARRAY['function_tests', 'agg_names_finalfunc']::text[], ARRAY['function_tests.dup_result']::text[], -1, 0), ('aggregate', ARRAY['function_tests', 'agg_names']::text[], ARRAY['function_tests.dup_result', 'function_tests.dup_result']::text[], -1, 0), ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_testing_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['mx_test_schema_2']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['schema_colocation']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests']::text[], ARRAY[]::text[], -1, 0), ('schema', ARRAY['function_tests2']::text[], ARRAY[]::text[], -1, 0), ('extension', ARRAY['plpgsql']::text[], ARRAY[]::text[], -1, 0)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int) FROM distributed_object_data; (1 row) @@ -829,6 +829,266 @@ SELECT * FROM test ORDER BY id; (2 rows) DROP TABLE test; +-- verify that recreating distributed functions with TABLE params gets propagated to workers +CREATE OR REPLACE FUNCTION func_with_return_table(int) +RETURNS TABLE (date date) +LANGUAGE plpgsql AS $$ +BEGIN + RETURN query SELECT '2011-01-01'::date; +END; +$$; +SELECT create_distributed_function('func_with_return_table(int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION func_with_return_table(int) +RETURNS TABLE (date date) +LANGUAGE plpgsql AS $$ +BEGIN + RETURN query SELECT '2011-01-02'::date; +END; +$$; +SELECT count(*) FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_with_return_table';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_with_return_table') + as test; + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- verify that recreating distributed functions with OUT params gets propagated to workers +CREATE OR REPLACE FUNCTION func_with_out_param(a int, out b int) + RETURNS int +LANGUAGE sql AS $$ select 1; $$; +SELECT create_distributed_function('func_with_out_param(int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO ERROR; +CREATE ROLE r1; +SELECT 1 FROM run_command_on_workers($$CREATE ROLE r1;$$); + ?column? +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +GRANT EXECUTE ON FUNCTION func_with_out_param TO r1; +SELECT 1 FROM run_command_on_workers($$GRANT EXECUTE ON FUNCTION func_with_out_param TO r1;$$); + ?column? 
+--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +RESET client_min_messages; +CREATE OR REPLACE FUNCTION func_with_out_param(a int, out b int) + RETURNS int +LANGUAGE sql AS $$ select 2; $$; +SELECT count(*) FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc, pg_proc.proowner) from pg_proc where proname = 'func_with_out_param';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc, pg_proc.proowner)::text from pg_proc where proname = 'func_with_out_param') + as test; + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- verify that recreating distributed functions with INOUT params gets propagated to workers +CREATE OR REPLACE FUNCTION func_with_inout_param(a int, inout b int) + RETURNS int +LANGUAGE sql AS $$ select 1; $$; +-- this should error out +SELECT create_distributed_function('func_with_inout_param(int)'); +ERROR: function "func_with_inout_param(int)" does not exist +-- this should work +SELECT create_distributed_function('func_with_inout_param(int,int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION func_with_inout_param(a int, inout b int) + RETURNS int +LANGUAGE sql AS $$ select 2; $$; +SELECT count(*) FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_with_inout_param';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_with_inout_param') + as test; + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- verify that recreating distributed functions with VARIADIC params gets propagated to workers +CREATE OR REPLACE FUNCTION func_with_variadic_param(a int, variadic b int[]) + RETURNS int +LANGUAGE sql AS $$ select 1; $$; +-- this should work +SELECT create_distributed_function('func_with_variadic_param(int,int[])'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION func_with_variadic_param(a int, variadic b int[]) + RETURNS int +LANGUAGE sql AS $$ select 2; $$; +SELECT count(*) FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_with_variadic_param';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_with_variadic_param') + as test; + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- verify that recreating distributed functions returning setof records gets propagated to workers +CREATE OR REPLACE FUNCTION func_returning_setof_int(IN parm1 date, IN parm2 interval) + RETURNS SETOF integer AS +$BODY$ +BEGIN + RETURN QUERY + SELECT 1; +END; +$BODY$ + LANGUAGE plpgsql VOLATILE + COST 100; +SELECT create_distributed_function('func_returning_setof_int(date,interval)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION func_returning_setof_int(IN parm1 date, IN parm2 interval) + RETURNS SETOF integer AS +$BODY$ +BEGIN + RETURN QUERY + SELECT 2; + +END; +$BODY$ + LANGUAGE plpgsql VOLATILE + COST 100; +SELECT count(*) 
FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_returning_setof_int';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_returning_setof_int') + as test; + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- verify that recreating distributed functions with variadic param returning setof records gets propagated to workers +CREATE OR REPLACE FUNCTION func_returning_setof_int_with_variadic_param(IN parm1 date, VARIADIC parm2 int[]) + RETURNS SETOF integer AS +$BODY$ +BEGIN + RETURN QUERY + SELECT 1; +END; +$BODY$ + LANGUAGE plpgsql VOLATILE + COST 100; +SELECT create_distributed_function('func_returning_setof_int_with_variadic_param(date,int[])'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE FUNCTION func_returning_setof_int_with_variadic_param(IN parm1 date, VARIADIC parm2 int[]) + RETURNS SETOF integer AS +$BODY$ +BEGIN + RETURN QUERY + SELECT 2; +END; +$BODY$ + LANGUAGE plpgsql VOLATILE + COST 100; +SELECT count(*) FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_returning_setof_int_with_variadic_param';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_returning_setof_int_with_variadic_param') + as test; + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- verify that recreating distributed procedures with out params gets propagated to workers +CREATE OR REPLACE PROCEDURE proc_with_variadic_param(IN parm1 date, VARIADIC parm2 int[]) + LANGUAGE SQL +AS $$ + SELECT 1; +$$; +-- this should error out +SELECT create_distributed_function('proc_with_variadic_param(date)'); +ERROR: function "proc_with_variadic_param(date)" does not exist +-- this should work +SELECT create_distributed_function('proc_with_variadic_param(date,int[])'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE PROCEDURE proc_with_variadic_param(IN parm1 date, VARIADIC parm2 int[]) + LANGUAGE SQL +AS $$ + SELECT 2; +$$; +SELECT count(*) FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'proc_with_variadic_param';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'proc_with_variadic_param') + as test; + count +--------------------------------------------------------------------- + 1 +(1 row) + +-- verify that recreating distributed procedures with INOUT param gets propagated to workers +CREATE OR REPLACE PROCEDURE proc_with_inout_param(IN parm1 date, INOUT parm2 int) + LANGUAGE SQL +AS $$ + SELECT 1; +$$; +-- this should error out +SELECT create_distributed_function('proc_with_inout_param(date)'); +ERROR: function "proc_with_inout_param(date)" does not exist +-- this should work +SELECT create_distributed_function('proc_with_inout_param(date,int)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +CREATE OR REPLACE PROCEDURE proc_with_inout_param(IN parm1 date, INOUT parm2 int) + LANGUAGE SQL +AS $$ + 
SELECT 2; +$$; +SELECT count(*) FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'proc_with_inout_param';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'proc_with_inout_param') + as test; + count +--------------------------------------------------------------------- + 1 +(1 row) + SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA function_tests CASCADE; DROP SCHEMA function_tests2 CASCADE; diff --git a/src/test/regress/expected/distributed_functions_conflict.out b/src/test/regress/expected/distributed_functions_conflict.out index 995668e64..537da8c4e 100644 --- a/src/test/regress/expected/distributed_functions_conflict.out +++ b/src/test/regress/expected/distributed_functions_conflict.out @@ -139,6 +139,27 @@ SELECT worker_create_or_replace_object('CREATE AGGREGATE proc_conflict.existing_ f (1 row) +-- test worker_create_or_replace_object with a function that returns table +CREATE OR REPLACE FUNCTION func_with_return_table(int) +RETURNS TABLE (date date) +LANGUAGE plpgsql AS $$ +BEGIN + RETURN query SELECT '2011-01-01'::date; +END; +$$; +SELECT worker_create_or_replace_object('CREATE OR REPLACE FUNCTION func_with_return_table(int) RETURNS TABLE (date date) LANGUAGE plpgsql AS $$ BEGIN RETURN query SELECT ''2011-01-01''::date; END; $$;'); + worker_create_or_replace_object +--------------------------------------------------------------------- + t +(1 row) + +-- verify that a backup function is created +SELECT COUNT(*)=2 FROM pg_proc WHERE proname LIKE 'func_with_return_table%'; + ?column? +--------------------------------------------------------------------- + t +(1 row) + -- hide cascades SET client_min_messages TO error; DROP SCHEMA proc_conflict CASCADE; diff --git a/src/test/regress/expected/failure_connection_establishment.out b/src/test/regress/expected/failure_connection_establishment.out index bb9b27681..99b9efcda 100644 --- a/src/test/regress/expected/failure_connection_establishment.out +++ b/src/test/regress/expected/failure_connection_establishment.out @@ -44,7 +44,8 @@ SELECT citus.mitmproxy('conn.delay(500)'); (1 row) ALTER TABLE products ADD CONSTRAINT p_key PRIMARY KEY(product_no); -ERROR: could not establish any connections to the node localhost:xxxxx after 400 ms +WARNING: could not establish connection after 400 ms +ERROR: connection to the remote node localhost:xxxxx failed SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- @@ -140,14 +141,6 @@ SELECT count(*) FROM products; 0 (1 row) --- use OFFSET 1 to prevent printing the line where source --- is the worker, and LIMIT 1 in case there were multiple connections -SELECT citus.dump_network_traffic() ORDER BY 1 LIMIT 1 OFFSET 1; - dump_network_traffic ---------------------------------------------------------------------- - (1,coordinator,"[initial message]") -(1 row) - SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- @@ -190,7 +183,7 @@ FROM pg_dist_shard_placement WHERE shardstate = 3 AND - shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'products'::regclass); + shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'single_replicatated'::regclass); invalid_placement_count --------------------------------------------------------------------- 0 @@ -202,7 
+195,7 @@ SELECT citus.mitmproxy('conn.delay(500)'); (1 row) -INSERT INTO products VALUES (100, '100', 100); +INSERT INTO single_replicatated VALUES (100); ERROR: could not establish any connections to the node localhost:xxxxx after 400 ms COMMIT; SELECT @@ -211,14 +204,20 @@ FROM pg_dist_shard_placement WHERE shardstate = 3 AND - shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'products'::regclass); + shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'single_replicatated'::regclass); invalid_placement_count --------------------------------------------------------------------- 0 (1 row) -- show that INSERT failed -SELECT count(*) FROM products WHERE product_no = 100; +SELECT citus.mitmproxy('conn.allow()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT count(*) FROM single_replicatated WHERE key = 100; count --------------------------------------------------------------------- 0 @@ -227,8 +226,6 @@ SELECT count(*) FROM products WHERE product_no = 100; RESET client_min_messages; -- verify get_global_active_transactions works when a timeout happens on a connection SELECT get_global_active_transactions(); -WARNING: could not establish connection after 400 ms -WARNING: connection to the remote node localhost:xxxxx failed get_global_active_transactions --------------------------------------------------------------------- (0 rows) @@ -314,6 +311,121 @@ SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); f (1 row) +-- tests for citus_check_cluster_node_health +-- kill all connectivity checks that originate from this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_cluster_node_health(); + from_nodename | from_nodeport | to_nodename | to_nodeport | result +--------------------------------------------------------------------- + localhost | 9060 | localhost | 9060 | + localhost | 9060 | localhost | 57637 | + localhost | 57637 | localhost | 9060 | t + localhost | 57637 | localhost | 57637 | t +(4 rows) + +-- suggested summary queries for connectivity checks +SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health(); + bool_and +--------------------------------------------------------------------- + f +(1 row) + +SELECT result, count(*) FROM citus_check_cluster_node_health() GROUP BY result ORDER BY 1; + result | count +--------------------------------------------------------------------- + t | 2 + | 2 +(2 rows) + +-- cancel all connectivity checks that originate from this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").cancel(' || pg_backend_pid() || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_cluster_node_health(); +ERROR: canceling statement due to user request +-- kill all but first connectivity checks that originate from this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_cluster_node_health(); + from_nodename | from_nodeport | to_nodename | to_nodeport | result +--------------------------------------------------------------------- + localhost | 9060 | localhost | 9060 | t + 
localhost | 9060 | localhost | 57637 | + localhost | 57637 | localhost | 9060 | t + localhost | 57637 | localhost | 57637 | t +(4 rows) + +-- cancel all but first connectivity checks that originate from this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).cancel(' || pg_backend_pid() || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_cluster_node_health(); +ERROR: canceling statement due to user request +-- kill all connections to this node +SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_cluster_node_health(); + from_nodename | from_nodeport | to_nodename | to_nodeport | result +--------------------------------------------------------------------- + localhost | 9060 | localhost | 9060 | + localhost | 9060 | localhost | 57637 | + localhost | 57637 | localhost | 9060 | f + localhost | 57637 | localhost | 57637 | t +(4 rows) + +-- cancel all connections to this node +SELECT citus.mitmproxy('conn.onAuthenticationOk().cancel(' || pg_backend_pid() || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_cluster_node_health(); +ERROR: canceling statement due to user request +-- kill connection checks to this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").kill()'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_cluster_node_health(); + from_nodename | from_nodeport | to_nodename | to_nodeport | result +--------------------------------------------------------------------- + localhost | 9060 | localhost | 9060 | f + localhost | 9060 | localhost | 57637 | t + localhost | 57637 | localhost | 9060 | f + localhost | 57637 | localhost | 57637 | t +(4 rows) + +-- cancel connection checks to this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").cancel(' || pg_backend_pid() || ')'); + mitmproxy +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM citus_check_cluster_node_health(); +ERROR: canceling statement due to user request RESET client_min_messages; SELECT citus.mitmproxy('conn.allow()'); mitmproxy diff --git a/src/test/regress/expected/failure_copy_on_hash.out b/src/test/regress/expected/failure_copy_on_hash.out index 749345392..9707d2d2e 100644 --- a/src/test/regress/expected/failure_copy_on_hash.out +++ b/src/test/regress/expected/failure_copy_on_hash.out @@ -282,7 +282,6 @@ SELECT citus.mitmproxy('conn.kill()'); ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. 
-CONTEXT: COPY test_table_2, line 1: "1,2" SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_copy_to_reference.out b/src/test/regress/expected/failure_copy_to_reference.out index a3cb3a2d8..a26e7290f 100644 --- a/src/test/regress/expected/failure_copy_to_reference.out +++ b/src/test/regress/expected/failure_copy_to_reference.out @@ -36,7 +36,6 @@ SELECT citus.mitmproxy('conn.kill()'); \copy test_table FROM STDIN DELIMITER ',' ERROR: failure on connection marked as essential: localhost:xxxxx -CONTEXT: COPY test_table, line 1: "1,2" SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- @@ -64,7 +63,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R \copy test_table FROM STDIN DELIMITER ',' ERROR: failure on connection marked as essential: localhost:xxxxx -CONTEXT: COPY test_table, line 1: "1,2" SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- @@ -92,7 +90,6 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R \copy test_table FROM STDIN DELIMITER ',' ERROR: canceling statement due to user request -CONTEXT: COPY test_table, line 1: "1,2" SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_create_index_concurrently.out b/src/test/regress/expected/failure_create_index_concurrently.out index 0a040c17c..1a0dc4dec 100644 --- a/src/test/regress/expected/failure_create_index_concurrently.out +++ b/src/test/regress/expected/failure_create_index_concurrently.out @@ -38,7 +38,7 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) -- verify index is not created -SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test%' $$) +SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test_%' $$) WHERE nodeport = :worker_2_proxy_port; nodename | nodeport | success | result --------------------------------------------------------------------- @@ -153,7 +153,7 @@ SELECT citus.mitmproxy('conn.allow()'); (1 row) -- verify index is not dropped at worker 2 -SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test%' $$) +SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test_%' $$) WHERE nodeport = :worker_2_proxy_port; nodename | nodeport | success | result --------------------------------------------------------------------- @@ -186,7 +186,7 @@ NOTICE: drop cascades to 2 other objects DETAIL: drop cascades to table index_schema.index_test drop cascades to table index_schema.index_test_2 -- verify index is not at worker 2 upon cleanup -SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test%' $$) +SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test_%' $$) WHERE nodeport = :worker_2_proxy_port; nodename | nodeport | success | result --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_ddl.out b/src/test/regress/expected/failure_ddl.out index 20082792a..9a30156c4 100644 --- a/src/test/regress/expected/failure_ddl.out +++ 
b/src/test/regress/expected/failure_ddl.out @@ -9,7 +9,7 @@ SET search_path TO 'ddl_failure'; -- do not cache any connections SET citus.max_cached_conns_per_worker TO 0; -- we don't want to see the prepared transaction numbers in the warnings -SET client_min_messages TO ERROR; +SET client_min_messages TO WARNING; SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- @@ -69,9 +69,12 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R (1 row) ALTER TABLE test_table ADD COLUMN new_column INT; -ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly +WARNING: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. +connection not open +CONTEXT: while executing command on localhost:xxxxx +ERROR: failure on connection marked as essential: localhost:xxxxx SELECT array_agg(name::text ORDER BY name::text) FROM public.table_attrs where relid = 'test_table'::regclass; array_agg --------------------------------------------------------------------- @@ -153,6 +156,9 @@ SELECT citus.mitmproxy('conn.onQuery(query="^COMMIT").cancel(' || pg_backend_pi (1 row) ALTER TABLE test_table ADD COLUMN new_column INT; +WARNING: +CONTEXT: while executing command on localhost:xxxxx +WARNING: failed to commit transaction on localhost:xxxxx SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- @@ -365,9 +371,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R (1 row) ALTER TABLE test_table DROP COLUMN new_column; -ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. +ERROR: failure on connection marked as essential: localhost:xxxxx SELECT array_agg(name::text ORDER BY name::text) FROM public.table_attrs where relid = 'test_table'::regclass; array_agg --------------------------------------------------------------------- @@ -735,9 +739,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R (1 row) ALTER TABLE test_table ADD COLUMN new_column INT; -ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. +ERROR: failure on connection marked as essential: localhost:xxxxx SELECT array_agg(name::text ORDER BY name::text) FROM public.table_attrs where relid = 'test_table'::regclass; array_agg --------------------------------------------------------------------- @@ -1030,9 +1032,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R (1 row) ALTER TABLE test_table ADD COLUMN new_column INT; -ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
+ERROR: failure on connection marked as essential: localhost:xxxxx SELECT array_agg(name::text ORDER BY name::text) FROM public.table_attrs where relid = 'test_table'::regclass; array_agg --------------------------------------------------------------------- diff --git a/src/test/regress/expected/failure_distributed_results.out b/src/test/regress/expected/failure_distributed_results.out index bc1cfca00..fa2fa5abc 100644 --- a/src/test/regress/expected/failure_distributed_results.out +++ b/src/test/regress/expected/failure_distributed_results.out @@ -86,7 +86,7 @@ CREATE TABLE distributed_result_info AS SELECT resultId, nodeport, rowcount, targetShardId, targetShardIndex FROM partition_task_list_results('test', $$ SELECT * FROM source_table $$, 'target_table') NATURAL JOIN pg_dist_node; -DEBUG: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly +WARNING: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly This probably means the server terminated abnormally before or while processing the request. SELECT * FROM distributed_result_info ORDER BY resultId; diff --git a/src/test/regress/expected/failure_mx_metadata_sync.out b/src/test/regress/expected/failure_mx_metadata_sync.out index 85180dcc1..f3856410a 100644 --- a/src/test/regress/expected/failure_mx_metadata_sync.out +++ b/src/test/regress/expected/failure_mx_metadata_sync.out @@ -21,7 +21,14 @@ SELECT create_distributed_table('t1', 'id'); (1 row) INSERT INTO t1 SELECT x FROM generate_series(1,100) AS f(x); --- Initial metadata status +-- Initially turn metadata sync off because we'll inject errors to start/stop metadata sync operations +SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); +NOTICE: dropping metadata on the node (localhost,9060) + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port; hasmetadata --------------------------------------------------------------------- @@ -365,6 +372,13 @@ SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port; f (1 row) +-- turn metadata sync back on +SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + SET SEARCH_PATH = mx_metadata_sync; DROP TABLE t1; DROP TABLE t2; diff --git a/src/test/regress/expected/failure_truncate.out b/src/test/regress/expected/failure_truncate.out index 645ec0392..4d9c0d6d6 100644 --- a/src/test/regress/expected/failure_truncate.out +++ b/src/test/regress/expected/failure_truncate.out @@ -100,9 +100,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R (1 row) TRUNCATE test_table; -ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. 
+ERROR: failure on connection marked as essential: localhost:xxxxx SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- @@ -649,9 +647,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R (1 row) TRUNCATE test_table; -ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. +ERROR: failure on connection marked as essential: localhost:xxxxx SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- @@ -1027,9 +1023,7 @@ SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL R (1 row) TRUNCATE test_table; -ERROR: connection to the remote node localhost:xxxxx failed with the following error: server closed the connection unexpectedly - This probably means the server terminated abnormally - before or while processing the request. +ERROR: failure on connection marked as essential: localhost:xxxxx SELECT citus.mitmproxy('conn.allow()'); mitmproxy --------------------------------------------------------------------- diff --git a/src/test/regress/expected/follower_single_node.out b/src/test/regress/expected/follower_single_node.out index efc855824..7fbc40280 100644 --- a/src/test/regress/expected/follower_single_node.out +++ b/src/test/regress/expected/follower_single_node.out @@ -8,6 +8,7 @@ SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; SET citus.next_shard_id TO 93630500; SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); +NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata ?column? 
--------------------------------------------------------------------- 1 diff --git a/src/test/regress/expected/multi_citus_tools.out b/src/test/regress/expected/multi_citus_tools.out index 7c091a0ba..792839d87 100644 --- a/src/test/regress/expected/multi_citus_tools.out +++ b/src/test/regress/expected/multi_citus_tools.out @@ -629,6 +629,45 @@ SELECT citus_check_connection_to_node('localhost', :worker_2_port); (1 row) \c - - - :master_port +SELECT * FROM citus_check_cluster_node_health() ORDER BY 1,2,3,4; + from_nodename | from_nodeport | to_nodename | to_nodeport | result +--------------------------------------------------------------------- + localhost | 57637 | localhost | 57637 | t + localhost | 57637 | localhost | 57638 | t + localhost | 57638 | localhost | 57637 | t + localhost | 57638 | localhost | 57638 | t +(4 rows) + +-- test cluster connectivity when we have broken nodes +SET client_min_messages TO ERROR; +SET citus.node_connection_timeout TO 10; +BEGIN; +INSERT INTO pg_dist_node VALUES + (123456789, 123456789, 'localhost', 123456789), + (1234567890, 1234567890, 'www.citusdata.com', 5432); +SELECT * FROM citus_check_cluster_node_health() ORDER BY 5,1,2,3,4; + from_nodename | from_nodeport | to_nodename | to_nodeport | result +--------------------------------------------------------------------- + localhost | 57637 | localhost | 123456789 | f + localhost | 57637 | www.citusdata.com | 5432 | f + localhost | 57638 | localhost | 123456789 | f + localhost | 57638 | www.citusdata.com | 5432 | f + localhost | 57637 | localhost | 57637 | t + localhost | 57637 | localhost | 57638 | t + localhost | 57638 | localhost | 57637 | t + localhost | 57638 | localhost | 57638 | t + localhost | 123456789 | localhost | 57637 | + localhost | 123456789 | localhost | 57638 | + localhost | 123456789 | localhost | 123456789 | + localhost | 123456789 | www.citusdata.com | 5432 | + www.citusdata.com | 5432 | localhost | 57637 | + www.citusdata.com | 5432 | localhost | 57638 | + www.citusdata.com | 5432 | localhost | 123456789 | + www.citusdata.com | 5432 | www.citusdata.com | 5432 | +(16 rows) + +ROLLBACK; +RESET citus.node_connection_timeout; RESET client_min_messages; DROP SCHEMA tools CASCADE; RESET SEARCH_PATH; diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 8b1135812..fe48f175d 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -493,6 +493,7 @@ SELECT * FROM multi_extension.print_extension_changes(); ALTER EXTENSION citus UPDATE TO '9.5-1'; BEGIN; SELECT master_add_node('localhost', :master_port, groupId=>0); +NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata master_add_node --------------------------------------------------------------------- 1 @@ -972,10 +973,11 @@ SELECT * FROM multi_extension.print_extension_changes(); function master_append_table_to_shard(bigint,text,text,integer) real | function master_apply_delete_command(text) integer | function master_get_table_metadata(text) record | + | function citus_check_cluster_node_health() SETOF record | function citus_check_connection_to_node(text,integer) boolean | function citus_disable_node(text,integer,boolean) void | function citus_internal_add_object_metadata(text,text[],text[],integer,integer) void -(7 rows) +(8 rows) DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff; -- show running version @@ -1222,6 +1224,9 @@ HINT: You can manually create 
a database and its extensions on workers. \c - - - :master_port \c another CREATE EXTENSION citus; +\c - - - :worker_1_port +CREATE EXTENSION citus; +\c - - - :master_port SET citus.enable_object_propagation TO off; -- prevent distributed transactions during add node SELECT FROM master_add_node('localhost', :worker_1_port); WARNING: citus.enable_object_propagation is off, not creating distributed objects on worker @@ -1230,7 +1235,6 @@ DETAIL: distributed objects are only kept in sync when citus.enable_object_prop (1 row) \c - - - :worker_1_port -CREATE EXTENSION citus; ALTER FUNCTION assign_distributed_transaction_id(initiator_node_identifier integer, transaction_number bigint, transaction_stamp timestamp with time zone) RENAME TO dummy_assign_function; \c - - - :master_port diff --git a/src/test/regress/expected/multi_follower_dml.out b/src/test/regress/expected/multi_follower_dml.out index 08e47e5c6..d1c714647 100644 --- a/src/test/regress/expected/multi_follower_dml.out +++ b/src/test/regress/expected/multi_follower_dml.out @@ -16,6 +16,7 @@ SELECT create_distributed_table('the_table', 'a'); (1 row) SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); +NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata ?column? --------------------------------------------------------------------- 1 @@ -281,11 +282,14 @@ DELETE FROM the_table; ERROR: cannot assign TransactionIds during recovery -- DDL is not possible TRUNCATE the_table; -ERROR: cannot execute TRUNCATE TABLE in a read-only transaction +ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress +HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery. TRUNCATE reference_table; -ERROR: cannot execute TRUNCATE TABLE in a read-only transaction +ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress +HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery. TRUNCATE citus_local_table; -ERROR: cannot execute TRUNCATE TABLE in a read-only transaction +ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress +HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery. ALTER TABLE the_table ADD COLUMN c int; ERROR: cannot acquire lock mode AccessExclusiveLock on database objects while recovery is in progress HINT: Only RowExclusiveLock or less can be acquired on database objects during recovery. 
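A minimal usage sketch of the new citus_check_cluster_node_health() UDF, distilled from the expected outputs above; the summary queries are the ones the tests themselves suggest, and all names are as they appear in the test files:
-- one row per (from_node, to_node) pair; a NULL result means the check could
-- not even be started from that node
SELECT * FROM citus_check_cluster_node_health() ORDER BY 1, 2, 3, 4;
-- collapse the matrix into a single health flag, counting unreachable pairs
-- (NULL results) as failures
SELECT bool_and(coalesce(result, false)) AS all_healthy
FROM citus_check_cluster_node_health();
-- break the outcomes down per result value
SELECT result, count(*) FROM citus_check_cluster_node_health()
GROUP BY result ORDER BY 1;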
diff --git a/src/test/regress/expected/multi_follower_select_statements.out b/src/test/regress/expected/multi_follower_select_statements.out index 13e09a6ca..9f92db197 100644 --- a/src/test/regress/expected/multi_follower_select_statements.out +++ b/src/test/regress/expected/multi_follower_select_statements.out @@ -159,6 +159,16 @@ SELECT * FROM the_table; 1 | 2 (2 rows) +-- Check for connectivity in the cluster +SELECT * FROM citus_check_cluster_node_health(); + from_nodename | from_nodeport | to_nodename | to_nodeport | result +--------------------------------------------------------------------- + localhost | 9071 | localhost | 9071 | t + localhost | 9071 | localhost | 9072 | t + localhost | 9072 | localhost | 9071 | t + localhost | 9072 | localhost | 9072 | t +(4 rows) + -- clean up after ourselves \c -reuse-previous=off regression - - :master_port DROP TABLE the_table; diff --git a/src/test/regress/expected/multi_index_statements.out b/src/test/regress/expected/multi_index_statements.out index 875eb3634..aab013acf 100644 --- a/src/test/regress/expected/multi_index_statements.out +++ b/src/test/regress/expected/multi_index_statements.out @@ -255,16 +255,11 @@ DROP INDEX lineitem_orderkey_index, lineitem_partial_index; ERROR: cannot drop multiple distributed objects in a single command HINT: Try dropping each object in a separate DROP command. -- Verify that we can succesfully drop indexes +DROP INDEX lineitem_orderkey_index; +DROP INDEX lineitem_orderkey_index_new; DROP INDEX lineitem_partkey_desc_index; DROP INDEX lineitem_partial_index; DROP INDEX lineitem_colref_index; --- Verify that we can drop distributed indexes with local indexes -CREATE TABLE local_table(a int, b int); -CREATE INDEX local_index ON local_table(a); -CREATE INDEX local_index2 ON local_table(b); -DROP INDEX lineitem_orderkey_index, local_index; -DROP INDEX IF EXISTS lineitem_orderkey_index_new, local_index2, non_existing_index; -NOTICE: index "non_existing_index" does not exist, skipping -- Verify that we handle if exists statements correctly DROP INDEX non_existent_index; ERROR: index "non_existent_index" does not exist @@ -301,14 +296,15 @@ SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname; (5 rows) \c - - - :worker_1_port -SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%' ORDER BY 1,2; +SET citus.override_table_visibility TO FALSE; +SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname SIMILAR TO 'lineitem%\d' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%' ORDER BY 1,2; indrelid | indexrelid --------------------------------------------------------------------- lineitem_360000 | lineitem_l_orderkey_idx_360000 lineitem_360000 | lineitem_l_shipdate_idx_360000 (2 rows) -SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname; +SELECT * FROM pg_indexes WHERE tablename SIMILAR TO 'index_test_%\d' ORDER BY indexname; schemaname | tablename | indexname | tablespace | indexdef --------------------------------------------------------------------- multi_index_statements | index_test_hash_102082 | index_test_hash_a_idx_102082 | | CREATE UNIQUE INDEX index_test_hash_a_idx_102082 ON 
multi_index_statements.index_test_hash_102082 USING btree (a) @@ -413,6 +409,10 @@ CREATE INDEX ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creat ON test_index_creation1 USING btree (tenant_id, timeperiod); NOTICE: identifier "ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1" will be truncated to "ix_test_index_creation1_ix_test_index_creation1_ix_test_index_c" +DEBUG: identifier "ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1" will be truncated to "ix_test_index_creation1_ix_test_index_creation1_ix_test_index_c" +DETAIL: from localhost:xxxxx +DEBUG: identifier "ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1" will be truncated to "ix_test_index_creation1_ix_test_index_creation1_ix_test_index_c" +DETAIL: from localhost:xxxxx RESET client_min_messages; CREATE TABLE test_index_creation1_p2020_09_26 PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-26 00:00:00') TO ('2020-09-27 00:00:00'); CREATE TABLE test_index_creation1_p2020_09_27 PARTITION OF test_index_creation1 FOR VALUES FROM ('2020-09-27 00:00:00') TO ('2020-09-28 00:00:00'); diff --git a/src/test/regress/expected/multi_mx_add_coordinator.out b/src/test/regress/expected/multi_mx_add_coordinator.out index 88ca84b9c..f08fc78b9 100644 --- a/src/test/regress/expected/multi_mx_add_coordinator.out +++ b/src/test/regress/expected/multi_mx_add_coordinator.out @@ -1,9 +1,5 @@ CREATE SCHEMA mx_add_coordinator; SET search_path TO mx_add_coordinator,public; -SET citus.shard_replication_factor TO 1; -SET citus.shard_count TO 8; -SET citus.next_shard_id TO 7000000; -SET citus.next_placement_id TO 7000000; SET client_min_messages TO WARNING; CREATE USER reprefuser WITH LOGIN; SELECT run_command_on_workers('CREATE USER reprefuser WITH LOGIN'); @@ -16,12 +12,43 @@ SELECT run_command_on_workers('CREATE USER reprefuser WITH LOGIN'); SET citus.enable_alter_role_propagation TO ON; -- alter role for other than the extension owner works in enterprise, output differs accordingly ALTER ROLE reprefuser WITH CREATEDB; +-- check connectivity in the cluster +-- verify that we test for 4 node pairs before we add coordinator to metadata +SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health(); + bool_and | count +--------------------------------------------------------------------- + t | 4 +(1 row) + SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); ?column? 
--------------------------------------------------------------------- 1 (1 row) +-- verify that we test for 9 node pairs after we add coordinator to metadata +SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health(); + bool_and | count +--------------------------------------------------------------------- + t | 9 +(1 row) + +-- test that we can test for connectivity when connected to worker nodes as well +\c - - - :worker_1_port +SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health(); + bool_and | count +--------------------------------------------------------------------- + t | 9 +(1 row) + +\c - - - :master_port +-- set the configs after reconnecting to coordinator +SET search_path TO mx_add_coordinator,public; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 8; +SET citus.next_shard_id TO 7000000; +SET citus.next_placement_id TO 7000000; +SET client_min_messages TO WARNING; -- test that coordinator pg_dist_node entry is synced to the workers SELECT wait_until_metadata_sync(30000); wait_until_metadata_sync diff --git a/src/test/regress/expected/multi_replicate_reference_table.out b/src/test/regress/expected/multi_replicate_reference_table.out index e239a8c78..e77ce2df1 100644 --- a/src/test/regress/expected/multi_replicate_reference_table.out +++ b/src/test/regress/expected/multi_replicate_reference_table.out @@ -669,7 +669,7 @@ ORDER BY shardid, nodeport; (0 rows) -- verify constraints have been created on the new node -SELECT run_command_on_workers('select count(*) from pg_constraint where contype=''f'' AND conname like ''ref_table%'';'); +SELECT run_command_on_workers('select count(*) from pg_constraint where contype=''f'' AND conname similar to ''ref_table%\d'';'); run_command_on_workers --------------------------------------------------------------------- (localhost,57637,t,2) @@ -950,12 +950,6 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port); (1 row) SET citus.shard_replication_factor TO 1; -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); - start_metadata_sync_to_node ---------------------------------------------------------------------- - -(1 row) - SELECT master_copy_shard_placement( :ref_table_shard, 'localhost', :worker_1_port, @@ -1027,13 +1021,6 @@ WHERE ref_table.a = dist_table.a; \c - - - :master_port SET search_path TO replicate_reference_table; -SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); -NOTICE: dropping metadata on the node (localhost,57637) - stop_metadata_sync_to_node ---------------------------------------------------------------------- - -(1 row) - -- -- The following case used to get stuck on create_distributed_table() instead -- of detecting the distributed deadlock. 
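The test edits above and below drop or reposition most explicit metadata-sync calls because these schedules now run with worker metadata synced. Where a test still needs to flip syncing, the pattern is the one sketched here, assuming a worker registered at the regression harness's default address (localhost:57637, purely illustrative):
-- ship the distributed catalog to the worker and flag it in pg_dist_node
SELECT start_metadata_sync_to_node('localhost', 57637);
-- the flag the tests assert on
SELECT hasmetadata FROM pg_dist_node WHERE nodeport = 57637;
-- undo: drops the synced metadata from the worker again, emitting the
-- "dropping metadata on the node" NOTICE seen in the expected outputs
SELECT stop_metadata_sync_to_node('localhost', 57637);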
diff --git a/src/test/regress/expected/multi_unsupported_worker_operations.out b/src/test/regress/expected/multi_unsupported_worker_operations.out index 2cb91b719..fc51e202e 100644 --- a/src/test/regress/expected/multi_unsupported_worker_operations.out +++ b/src/test/regress/expected/multi_unsupported_worker_operations.out @@ -45,12 +45,6 @@ ORDER BY logicalrelid; mx_table_2 | s | 150000 (2 rows) -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); - start_metadata_sync_to_node ---------------------------------------------------------------------- - -(1 row) - COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv'); INSERT INTO mx_ref_table VALUES (-37, 'morbi'); INSERT INTO mx_ref_table VALUES (-78, 'sapien'); @@ -232,7 +226,7 @@ HINT: Connect to the coordinator and run it again. SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_port; hasmetadata --------------------------------------------------------------------- - f + t (1 row) -- stop_metadata_sync_to_node @@ -351,20 +345,5 @@ HINT: Connect to the coordinator and run it again. \c - - - :master_port DROP TABLE mx_table; DROP TABLE mx_table_2; -SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); -NOTICE: dropping metadata on the node (localhost,57637) - stop_metadata_sync_to_node ---------------------------------------------------------------------- - -(1 row) - -\c - - - :worker_1_port -DELETE FROM pg_dist_node; -SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx\_%table%'; - worker_drop_distributed_table ---------------------------------------------------------------------- -(0 rows) - -\c - - - :master_port ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; RESET citus.shard_replication_factor; diff --git a/src/test/regress/expected/mx_regular_user.out b/src/test/regress/expected/mx_regular_user.out index e57bf8a99..2538929cc 100644 --- a/src/test/regress/expected/mx_regular_user.out +++ b/src/test/regress/expected/mx_regular_user.out @@ -7,19 +7,6 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); 1 (1 row) --- sync the metadata to both nodes -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); - start_metadata_sync_to_node ---------------------------------------------------------------------- - -(1 row) - -SELECT start_metadata_sync_to_node('localhost', :worker_2_port); - start_metadata_sync_to_node ---------------------------------------------------------------------- - -(1 row) - -- create a role and give access one each node separately -- and increase the error level to prevent enterprise to diverge SET client_min_messages TO ERROR; @@ -688,6 +675,19 @@ NOTICE: dropping metadata on the node (localhost,57638) (1 row) +-- finally sync metadata again so it doesn't break later tests +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + DROP SCHEMA "Mx Regular User" CASCADE; NOTICE: drop cascades to 10 other objects DETAIL: drop cascades to table "Mx Regular User".partitioned_table diff --git a/src/test/regress/expected/pg13_propagate_statistics.out b/src/test/regress/expected/pg13_propagate_statistics.out index b490e6c55..583a17d86 100644 --- 
a/src/test/regress/expected/pg13_propagate_statistics.out +++ b/src/test/regress/expected/pg13_propagate_statistics.out @@ -38,6 +38,7 @@ WHERE stxnamespace IN ( FROM pg_namespace WHERE nspname IN ('statistics''TestTarget') ) +AND stxname SIMILAR TO '%\_\d+' ORDER BY stxstattarget, stxrelid::regclass ASC; stxstattarget | stxrelid --------------------------------------------------------------------- diff --git a/src/test/regress/expected/pg14.out b/src/test/regress/expected/pg14.out index e5257c5a0..cbdd70e05 100644 --- a/src/test/regress/expected/pg14.out +++ b/src/test/regress/expected/pg14.out @@ -1221,5 +1221,54 @@ select * from nummultirange_test natural join nummultirange_test2 order by nmr; {[1.1,2.2)} (7 rows) +-- verify that recreating distributed procedures with OUT param gets propagated to workers +CREATE OR REPLACE PROCEDURE proc_with_out_param(IN parm1 date, OUT parm2 int) + LANGUAGE SQL +AS $$ + SELECT 1; +$$; +-- this should error out +SELECT create_distributed_function('proc_with_out_param(date,int)'); +ERROR: function "proc_with_out_param(date,int)" does not exist +-- this should work +SELECT create_distributed_function('proc_with_out_param(date)'); + create_distributed_function +--------------------------------------------------------------------- + +(1 row) + +SET client_min_messages TO ERROR; +CREATE ROLE r1; +SELECT 1 FROM run_command_on_workers($$CREATE ROLE r1;$$); + ?column? +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +GRANT EXECUTE ON PROCEDURE proc_with_out_param TO r1; +SELECT 1 FROM run_command_on_workers($$GRANT EXECUTE ON PROCEDURE proc_with_out_param TO r1;$$); + ?column? +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + +RESET client_min_messages; +CREATE OR REPLACE PROCEDURE proc_with_out_param(IN parm1 date, OUT parm2 int) + LANGUAGE SQL +AS $$ + SELECT 2; +$$; +SELECT count(*) FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc, pg_proc.proowner) from pg_proc where proname = 'proc_with_out_param';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc, pg_proc.proowner)::text from pg_proc where proname = 'proc_with_out_param') + as test; + count +--------------------------------------------------------------------- + 1 +(1 row) + set client_min_messages to error; drop schema pg14 cascade; diff --git a/src/test/regress/expected/propagate_set_commands.out b/src/test/regress/expected/propagate_set_commands.out index 1f122c55d..4a3731f7d 100644 --- a/src/test/regress/expected/propagate_set_commands.out +++ b/src/test/regress/expected/propagate_set_commands.out @@ -19,9 +19,94 @@ SELECT current_setting('enable_hashagg') FROM test WHERE id = 1; on (1 row) --- should not be propagated, error should be coming from coordinator +-- triggers an error on the worker SET LOCAL TRANSACTION ISOLATION LEVEL REPEATABLE READ; -ERROR: SET TRANSACTION ISOLATION LEVEL must be called before any query +WARNING: SET TRANSACTION ISOLATION LEVEL must be called before any query +CONTEXT: while executing command on localhost:xxxxx +ERROR: failure on connection marked as essential: localhost:xxxxx +END; +BEGIN; +SET TRANSACTION READ ONLY; +-- should fail after setting transaction to read only +INSERT INTO test VALUES (2,2); +ERROR: cannot execute INSERT in a read-only transaction +END; +BEGIN; +SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; +-- should reflect new isolation level +SELECT 
current_setting('transaction_isolation') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + repeatable read +(1 row) + +END; +BEGIN; +SET TRANSACTION READ ONLY; +SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + on +(1 row) + +SELECT current_setting('transaction_isolation') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + repeatable read +(1 row) + +END; +BEGIN; +SET LOCAL transaction_read_only TO on; +SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + on +(1 row) + +SELECT current_setting('transaction_isolation') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + repeatable read +(1 row) + +END; +BEGIN; +SET TRANSACTION READ ONLY; +SET TRANSACTION ISOLATION LEVEL SERIALIZABLE; +SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + on +(1 row) + +SELECT current_setting('transaction_isolation') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + repeatable read +(1 row) + +END; +BEGIN; +SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; +SAVEPOINT goback; +SET TRANSACTION READ ONLY; +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + on +(1 row) + +ROLLBACK TO SAVEPOINT goback; +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; + current_setting +--------------------------------------------------------------------- + off +(1 row) + END; BEGIN; -- set session commands are not propagated diff --git a/src/test/regress/expected/propagate_statistics.out b/src/test/regress/expected/propagate_statistics.out index fd4f8e6e2..1068fa6d2 100644 --- a/src/test/regress/expected/propagate_statistics.out +++ b/src/test/regress/expected/propagate_statistics.out @@ -108,6 +108,7 @@ WHERE stxnamespace IN ( FROM pg_namespace WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2') ) +AND stxname SIMILAR TO '%\_\d+' ORDER BY stxname ASC; stxname --------------------------------------------------------------------- @@ -215,7 +216,8 @@ WHERE stxnamespace IN ( SELECT oid FROM pg_namespace WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2') -); +) +AND stxname SIMILAR TO '%\_\d+'; count --------------------------------------------------------------------- 3 @@ -227,7 +229,8 @@ WHERE stxnamespace IN ( SELECT oid FROM pg_namespace WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2') -); +) +AND stxname SIMILAR TO '%\_\d+'; count --------------------------------------------------------------------- 3 diff --git a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out index c7067451a..987c52bdc 100644 --- a/src/test/regress/expected/replicate_reference_tables_to_coordinator.out +++ 
b/src/test/regress/expected/replicate_reference_tables_to_coordinator.out @@ -506,6 +506,7 @@ SELECT master_remove_node('localhost', :master_port); SELECT master_add_node('localhost', :master_port, groupid => 0) AS master_nodeid \gset NOTICE: Replicating reference table "squares" to the node localhost:xxxxx NOTICE: Replicating reference table "numbers" to the node localhost:xxxxx +NOTICE: localhost:xxxxx is the coordinator and already contains metadata, skipping syncing the metadata -- clean-up SET client_min_messages TO ERROR; DROP SCHEMA replicate_ref_to_coordinator CASCADE; diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index f6f48fdaf..75a55e05f 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -267,6 +267,68 @@ WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER 5 | 6 (5 rows) +-- show that we can filter remote commands +-- given that citus.grep_remote_commands is not set, we log all commands +SET citus.log_local_commands to true; +SELECT count(*) FROM public.another_schema_table WHERE a = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM public.another_schema_table_90630515 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- grep matches all commands +SET citus.grep_remote_commands TO "%%"; +SELECT count(*) FROM public.another_schema_table WHERE a = 1; +NOTICE: executing the command locally: SELECT count(*) AS count FROM public.another_schema_table_90630515 another_schema_table WHERE (a OPERATOR(pg_catalog.=) 1) + count +--------------------------------------------------------------------- + 0 +(1 row) + +-- only filter a specific shard for the local execution +BEGIN; + SET LOCAL citus.grep_remote_commands TO "%90630515%"; + SELECT count(*) FROM public.another_schema_table; +NOTICE: executing the command locally: SELECT count(*) AS count FROM public.another_schema_table_90630515 another_schema_table WHERE true + count +--------------------------------------------------------------------- + 0 +(1 row) + + -- match nothing + SET LOCAL citus.grep_remote_commands TO '%nothing%'; + SELECT count(*) FROM public.another_schema_table; + count +--------------------------------------------------------------------- + 0 +(1 row) + +COMMIT; +-- only filter a specific shard for the remote execution +BEGIN; + SET LOCAL citus.enable_local_execution TO FALSE; + SET LOCAL citus.grep_remote_commands TO '%90630515%'; + SET LOCAL citus.log_remote_commands TO ON; + SELECT count(*) FROM public.another_schema_table; +NOTICE: issuing SELECT count(*) AS count FROM public.another_schema_table_90630515 another_schema_table WHERE true +DETAIL: on server postgres@localhost:xxxxx connectionId: xxxxxxx + count +--------------------------------------------------------------------- + 0 +(1 row) + + -- match nothing + SET LOCAL citus.grep_remote_commands TO '%nothing%'; + SELECT count(*) FROM public.another_schema_table; + count +--------------------------------------------------------------------- + 0 +(1 row) + +COMMIT; +RESET citus.log_local_commands; +RESET citus.grep_remote_commands; -- Test upsert with constraint CREATE TABLE upsert_test ( diff --git a/src/test/regress/expected/upgrade_list_citus_objects.out b/src/test/regress/expected/upgrade_list_citus_objects.out index 53992aa93..9ffef1f70 100644 --- a/src/test/regress/expected/upgrade_list_citus_objects.out +++ 
b/src/test/regress/expected/upgrade_list_citus_objects.out @@ -38,6 +38,7 @@ ORDER BY 1; function citus_add_rebalance_strategy(name,regproc,regproc,regproc,real,real,real) function citus_add_secondary_node(text,integer,text,integer,name) function citus_blocking_pids(integer) + function citus_check_cluster_node_health() function citus_check_connection_to_node(text,integer) function citus_cleanup_orphaned_shards() function citus_conninfo_cache_invalidate() @@ -261,5 +262,5 @@ ORDER BY 1; view citus_worker_stat_activity view pg_dist_shard_placement view time_partitions -(245 rows) +(246 rows) diff --git a/src/test/regress/failure_base_schedule b/src/test/regress/failure_base_schedule index b0c5a8d01..03ee96ff7 100644 --- a/src/test/regress/failure_base_schedule +++ b/src/test/regress/failure_base_schedule @@ -1,5 +1,4 @@ # import this file (from psql you can use \i) to use mitmproxy manually -test: turn_mx_off test: failure_test_helpers # this should only be run by pg_regress_multi, you don't need it diff --git a/src/test/regress/failure_schedule b/src/test/regress/failure_schedule index 17b4d0c14..550544a7f 100644 --- a/src/test/regress/failure_schedule +++ b/src/test/regress/failure_schedule @@ -1,5 +1,4 @@ # import this file (from psql you can use \i) to use mitmproxy manually -test: turn_mx_off test: failure_test_helpers # this should only be run by pg_regress_multi, you don't need it @@ -17,15 +16,18 @@ test: failure_add_disable_node test: failure_copy_to_reference test: failure_copy_on_hash test: failure_create_reference_table +test: check_mx +test: turn_mx_off test: failure_create_distributed_table_non_empty test: failure_create_table +test: failure_single_select +test: turn_mx_on test: failure_multi_shard_update_delete test: failure_cte_subquery test: failure_insert_select_via_coordinator test: failure_multi_dml test: failure_vacuum -test: failure_single_select test: failure_ref_tables test: failure_insert_select_pushdown test: failure_single_mod @@ -45,3 +47,4 @@ test: ensure_no_intermediate_data_leak # in the shared memory # -------- test: ensure_no_shared_connection_leak +test: check_mx diff --git a/src/test/regress/input/multi_copy.source b/src/test/regress/input/multi_copy.source index 19be89a25..dfd094943 100644 --- a/src/test/regress/input/multi_copy.source +++ b/src/test/regress/input/multi_copy.source @@ -572,20 +572,12 @@ SET session_replication_role = DEFAULT; -- disable test_user on the first worker \c - :default_user - :worker_1_port +SET citus.enable_object_propagation TO off; ALTER USER test_user WITH nologin; \c - test_user - :master_port -- reissue copy, and it should fail COPY numbers_hash FROM STDIN WITH (FORMAT 'csv'); -1,1 -2,2 -3,3 -4,4 -5,5 -6,6 -7,7 -8,8 -\. -- verify shards in the none of the workers as marked invalid SELECT shardid, shardstate, nodename, nodeport @@ -594,9 +586,6 @@ SELECT shardid, shardstate, nodename, nodeport -- try to insert into a reference table copy should fail COPY numbers_reference FROM STDIN WITH (FORMAT 'csv'); -3,1 -4,2 -\. -- verify shards for reference table are still valid SELECT shardid, shardstate, nodename, nodeport @@ -608,10 +597,6 @@ SELECT shardid, shardstate, nodename, nodeport -- since it can not insert into either copies of a shard. shards are expected to -- stay valid since the operation is rolled back. COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv'); -1,1 -2,2 -3,3 -\. 
-- verify shards for numbers_hash_other are still valid -- since copy has failed altogether @@ -621,6 +606,7 @@ SELECT shardid, shardstate, nodename, nodeport -- re-enable test_user on the first worker \c - :default_user - :worker_1_port +SET citus.enable_object_propagation TO off; ALTER USER test_user WITH login; \c - test_user - :master_port diff --git a/src/test/regress/minimal_schedule b/src/test/regress/minimal_schedule index 908b5f561..c5d15ff52 100644 --- a/src/test/regress/minimal_schedule +++ b/src/test/regress/minimal_schedule @@ -1,4 +1,3 @@ -test: turn_mx_off test: multi_cluster_management test: multi_test_helpers multi_test_helpers_superuser columnar_test_helpers test: multi_test_catalog_views diff --git a/src/test/regress/mixed_after_citus_upgrade_schedule b/src/test/regress/mixed_after_citus_upgrade_schedule index f7d21b6fd..223d7f349 100644 --- a/src/test/regress/mixed_after_citus_upgrade_schedule +++ b/src/test/regress/mixed_after_citus_upgrade_schedule @@ -1,3 +1,2 @@ -test: turn_mx_off test: upgrade_basic_after test: upgrade_pg_dist_object_test_after diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index ab8c8be6f..d7cf50e14 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -15,8 +15,8 @@ # --- # Tests around schema changes, these are run first, so there's no preexisting objects. # --- -test: turn_mx_off test: multi_extension +test: turn_mx_off test: single_node test: single_node_truncate test: turn_mx_on @@ -48,8 +48,6 @@ test: multi_read_from_secondaries # ---------- # multi_citus_tools tests utility functions written for citus tools # ---------- -test: check_mx -test: turn_mx_off test: multi_citus_tools # ---------- @@ -57,7 +55,6 @@ test: multi_citus_tools # multi_remove_node_reference_table tests metadata changes after master_remove_node # ---------- test: multi_replicate_reference_table -test: turn_mx_on test: multi_remove_node_reference_table # ---------- @@ -103,11 +100,8 @@ test: multi_create_fdw # ---------- # Tests for statistics propagation # ---------- -test: check_mx -test: turn_mx_off test: propagate_statistics test: pg13_propagate_statistics -test: turn_mx_on # ---------- # Test for updating table statistics @@ -153,6 +147,7 @@ test: with_executors with_join with_partitioning with_transactions with_dml # ---------- # Tests around DDL statements run on distributed tables # ---------- +test: multi_index_statements test: multi_alter_table_statements test: multi_alter_table_add_constraints @@ -196,11 +191,9 @@ test: multi_repartition_udt multi_repartitioned_subquery_udf multi_subtransactio test: multi_modifying_xacts test: check_mx test: turn_mx_off -test: multi_index_statements test: multi_generate_ddl_commands multi_repair_shards test: multi_create_shards test: multi_transaction_recovery -test: multi_copy test: turn_mx_on test: local_dist_join_modifications @@ -212,7 +205,7 @@ test: citus_local_dist_joins # multi_copy creates hash and range-partitioned tables and performs COPY # multi_router_planner creates hash partitioned tables. # --------- -test: fast_path_router_modify pg_dump +test: multi_copy fast_path_router_modify pg_dump test: multi_router_planner # These 2 tests have prepared statements which sometimes get invalidated by concurrent tests, # changing the debug output. 
We should not run them in parallel with others @@ -224,8 +217,6 @@ test: multi_router_planner_fast_path # ---------- test: multi_large_shardid -test: check_mx -test: turn_mx_off # ---------- # multi_size_queries tests various size commands on distributed tables # ---------- @@ -240,9 +231,11 @@ test: multi_drop_extension # multi_metadata_sync tests the propagation of mx-related metadata changes to metadata workers # multi_unsupported_worker_operations tests that unsupported operations error out on metadata workers # ---------- +test: check_mx +test: turn_mx_off test: multi_metadata_sync -test: multi_unsupported_worker_operations test: turn_mx_on +test: multi_unsupported_worker_operations # ---------- # grant_on_schema_propagation tests if the GRANT ... ON SCHEMA queries are propagated correctly @@ -287,10 +280,10 @@ test: multi_foreign_key_relation_graph # and rerun some of the tests. # -------- test: check_mx -test: turn_mx_off test: add_coordinator test: foreign_key_to_reference_table test: replicate_reference_tables_to_coordinator +test: turn_mx_off test: citus_local_tables test: multi_row_router_insert mixed_relkind_tests test: turn_mx_on @@ -303,11 +296,8 @@ test: undistribute_table_cascade test: create_citus_local_table_cascade test: fkeys_between_local_ref test: auto_undist_citus_local -test: check_mx -test: turn_mx_off test: mx_regular_user test: remove_coordinator -test: turn_mx_on # ---------- # multi_transactional_drop_shards tests for dropping shards using connection API diff --git a/src/test/regress/multi_follower_schedule b/src/test/regress/multi_follower_schedule index b46aba3d1..c1f0ac6cb 100644 --- a/src/test/regress/multi_follower_schedule +++ b/src/test/regress/multi_follower_schedule @@ -1,4 +1,3 @@ -test: turn_mx_off test: multi_follower_sanity_check test: follower_single_node test: multi_follower_select_statements @@ -7,3 +6,4 @@ test: multi_follower_configure_followers # test that no tests leaked intermediate results. This should always be last test: ensure_no_intermediate_data_leak +test: check_mx diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index cda1c0c53..08828af4b 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -13,10 +13,8 @@ # --- # Tests around schema changes, these are run first, so there's no preexisting objects. 
# --- -test: turn_mx_off test: multi_extension test: multi_test_helpers multi_test_helpers_superuser -test: turn_mx_on test: multi_mx_node_metadata test: multi_cluster_management test: multi_mx_function_table_reference diff --git a/src/test/regress/mx_base_schedule b/src/test/regress/mx_base_schedule index 1c614c4b0..7cea6d739 100644 --- a/src/test/regress/mx_base_schedule +++ b/src/test/regress/mx_base_schedule @@ -1,7 +1,6 @@ # ---------- # Only run few basic tests to set up a testing environment # ---------- -test: turn_mx_off test: multi_cluster_management test: multi_test_helpers multi_test_helpers_superuser test: multi_test_catalog_views diff --git a/src/test/regress/mx_minimal_schedule b/src/test/regress/mx_minimal_schedule index 5b6a82a4a..c697f79dc 100644 --- a/src/test/regress/mx_minimal_schedule +++ b/src/test/regress/mx_minimal_schedule @@ -1,7 +1,6 @@ # ---------- # Only run few basic tests to set up a testing environment # ---------- -test: turn_mx_off test: multi_cluster_management test: multi_test_helpers multi_test_helpers_superuser test: multi_test_catalog_views diff --git a/src/test/regress/output/multi_copy.source b/src/test/regress/output/multi_copy.source index c521acc34..fb2a33d20 100644 --- a/src/test/regress/output/multi_copy.source +++ b/src/test/regress/output/multi_copy.source @@ -737,12 +737,12 @@ UPDATE pg_dist_shard_placement SET nodeport = :worker_1_port+10 WHERE shardid = SET session_replication_role = DEFAULT; -- disable test_user on the first worker \c - :default_user - :worker_1_port +SET citus.enable_object_propagation TO off; ALTER USER test_user WITH nologin; \c - test_user - :master_port -- reissue copy, and it should fail COPY numbers_hash FROM STDIN WITH (FORMAT 'csv'); ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in -CONTEXT: COPY numbers_hash, line 1: "1,1" -- verify shards in the none of the workers as marked invalid SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement join pg_dist_shard using(shardid) @@ -762,7 +762,6 @@ SELECT shardid, shardstate, nodename, nodeport -- try to insert into a reference table copy should fail COPY numbers_reference FROM STDIN WITH (FORMAT 'csv'); ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in -CONTEXT: COPY numbers_reference, line 1: "3,1" -- verify shards for reference table are still valid SELECT shardid, shardstate, nodename, nodeport FROM pg_dist_shard_placement join pg_dist_shard using(shardid) @@ -778,7 +777,6 @@ SELECT shardid, shardstate, nodename, nodeport -- stay valid since the operation is rolled back. 
COPY numbers_hash_other FROM STDIN WITH (FORMAT 'csv'); ERROR: connection to the remote node localhost:xxxxx failed with the following error: FATAL: role "test_user" is not permitted to log in -CONTEXT: COPY numbers_hash_other, line 1: "1,1" -- verify shards for numbers_hash_other are still valid -- since copy has failed altogether SELECT shardid, shardstate, nodename, nodeport @@ -798,6 +796,7 @@ SELECT shardid, shardstate, nodename, nodeport -- re-enable test_user on the first worker \c - :default_user - :worker_1_port +SET citus.enable_object_propagation TO off; ALTER USER test_user WITH login; \c - test_user - :master_port DROP TABLE numbers_hash; diff --git a/src/test/regress/sql/connectivity_checks.sql b/src/test/regress/sql/connectivity_checks.sql new file mode 100644 index 000000000..070cf8024 --- /dev/null +++ b/src/test/regress/sql/connectivity_checks.sql @@ -0,0 +1 @@ +SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health(); diff --git a/src/test/regress/sql/distributed_functions.sql b/src/test/regress/sql/distributed_functions.sql index 8444344ee..c33ae5893 100644 --- a/src/test/regress/sql/distributed_functions.sql +++ b/src/test/regress/sql/distributed_functions.sql @@ -482,6 +482,204 @@ SELECT * FROM test ORDER BY id; DROP TABLE test; +-- verify that recreating distributed functions with TABLE params gets propagated to workers +CREATE OR REPLACE FUNCTION func_with_return_table(int) +RETURNS TABLE (date date) +LANGUAGE plpgsql AS $$ +BEGIN + RETURN query SELECT '2011-01-01'::date; +END; +$$; + +SELECT create_distributed_function('func_with_return_table(int)'); + +CREATE OR REPLACE FUNCTION func_with_return_table(int) +RETURNS TABLE (date date) +LANGUAGE plpgsql AS $$ +BEGIN + RETURN query SELECT '2011-01-02'::date; +END; +$$; + +SELECT count(*) FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_with_return_table';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_with_return_table') + as test; + +-- verify that recreating distributed functions with OUT params gets propagated to workers +CREATE OR REPLACE FUNCTION func_with_out_param(a int, out b int) + RETURNS int +LANGUAGE sql AS $$ select 1; $$; + +SELECT create_distributed_function('func_with_out_param(int)'); + +SET client_min_messages TO ERROR; +CREATE ROLE r1; +SELECT 1 FROM run_command_on_workers($$CREATE ROLE r1;$$); +GRANT EXECUTE ON FUNCTION func_with_out_param TO r1; +SELECT 1 FROM run_command_on_workers($$GRANT EXECUTE ON FUNCTION func_with_out_param TO r1;$$); +RESET client_min_messages; + +CREATE OR REPLACE FUNCTION func_with_out_param(a int, out b int) + RETURNS int +LANGUAGE sql AS $$ select 2; $$; + +SELECT count(*) FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc, pg_proc.proowner) from pg_proc where proname = 'func_with_out_param';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc, pg_proc.proowner)::text from pg_proc where proname = 'func_with_out_param') + as test; + +-- verify that recreating distributed functions with INOUT params gets propagated to workers +CREATE OR REPLACE FUNCTION func_with_inout_param(a int, inout b int) + RETURNS int +LANGUAGE sql AS $$ select 1; $$; + +-- this should error out +SELECT create_distributed_function('func_with_inout_param(int)'); +-- this should work +SELECT 
create_distributed_function('func_with_inout_param(int,int)'); + +CREATE OR REPLACE FUNCTION func_with_inout_param(a int, inout b int) + RETURNS int +LANGUAGE sql AS $$ select 2; $$; + +SELECT count(*) FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_with_inout_param';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_with_inout_param') + as test; + +-- verify that recreating distributed functions with VARIADIC params gets propagated to workers +CREATE OR REPLACE FUNCTION func_with_variadic_param(a int, variadic b int[]) + RETURNS int +LANGUAGE sql AS $$ select 1; $$; + +-- this should work +SELECT create_distributed_function('func_with_variadic_param(int,int[])'); + +CREATE OR REPLACE FUNCTION func_with_variadic_param(a int, variadic b int[]) + RETURNS int +LANGUAGE sql AS $$ select 2; $$; + +SELECT count(*) FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_with_variadic_param';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_with_variadic_param') + as test; + +-- verify that recreating distributed functions returning setof records gets propagated to workers +CREATE OR REPLACE FUNCTION func_returning_setof_int(IN parm1 date, IN parm2 interval) + RETURNS SETOF integer AS +$BODY$ +BEGIN + RETURN QUERY + SELECT 1; +END; +$BODY$ + LANGUAGE plpgsql VOLATILE + COST 100; + +SELECT create_distributed_function('func_returning_setof_int(date,interval)'); + +CREATE OR REPLACE FUNCTION func_returning_setof_int(IN parm1 date, IN parm2 interval) + RETURNS SETOF integer AS +$BODY$ +BEGIN + RETURN QUERY + SELECT 2; + +END; +$BODY$ + LANGUAGE plpgsql VOLATILE + COST 100; + +SELECT count(*) FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_returning_setof_int';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_returning_setof_int') + as test; + +-- verify that recreating distributed functions with variadic param returning setof records gets propagated to workers +CREATE OR REPLACE FUNCTION func_returning_setof_int_with_variadic_param(IN parm1 date, VARIADIC parm2 int[]) + RETURNS SETOF integer AS +$BODY$ +BEGIN + RETURN QUERY + SELECT 1; +END; +$BODY$ + LANGUAGE plpgsql VOLATILE + COST 100; + +SELECT create_distributed_function('func_returning_setof_int_with_variadic_param(date,int[])'); + +CREATE OR REPLACE FUNCTION func_returning_setof_int_with_variadic_param(IN parm1 date, VARIADIC parm2 int[]) + RETURNS SETOF integer AS +$BODY$ +BEGIN + RETURN QUERY + SELECT 2; +END; +$BODY$ + LANGUAGE plpgsql VOLATILE + COST 100; + +SELECT count(*) FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'func_returning_setof_int_with_variadic_param';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'func_returning_setof_int_with_variadic_param') + as test; + +-- verify that recreating distributed procedures with VARIADIC params gets propagated to workers +CREATE OR REPLACE PROCEDURE proc_with_variadic_param(IN parm1 date, VARIADIC parm2 int[]) + LANGUAGE SQL
+AS $$ + SELECT 1; +$$; + +-- this should error out +SELECT create_distributed_function('proc_with_variadic_param(date)'); +-- this should work +SELECT create_distributed_function('proc_with_variadic_param(date,int[])'); + +CREATE OR REPLACE PROCEDURE proc_with_variadic_param(IN parm1 date, VARIADIC parm2 int[]) + LANGUAGE SQL +AS $$ + SELECT 2; +$$; + +SELECT count(*) FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'proc_with_variadic_param';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'proc_with_variadic_param') + as test; + +-- verify that recreating distributed procedures with INOUT param gets propagated to workers +CREATE OR REPLACE PROCEDURE proc_with_inout_param(IN parm1 date, INOUT parm2 int) + LANGUAGE SQL +AS $$ + SELECT 1; +$$; + +-- this should error out +SELECT create_distributed_function('proc_with_inout_param(date)'); +-- this should work +SELECT create_distributed_function('proc_with_inout_param(date,int)'); + +CREATE OR REPLACE PROCEDURE proc_with_inout_param(IN parm1 date, INOUT parm2 int) + LANGUAGE SQL +AS $$ + SELECT 2; +$$; + +SELECT count(*) FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc) from pg_proc where proname = 'proc_with_inout_param';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc)::text from pg_proc where proname = 'proc_with_inout_param') + as test; + SET client_min_messages TO error; -- suppress cascading objects dropping DROP SCHEMA function_tests CASCADE; DROP SCHEMA function_tests2 CASCADE; diff --git a/src/test/regress/sql/distributed_functions_conflict.sql b/src/test/regress/sql/distributed_functions_conflict.sql index f4fdeddd0..ebbb6c6aa 100644 --- a/src/test/regress/sql/distributed_functions_conflict.sql +++ b/src/test/regress/sql/distributed_functions_conflict.sql @@ -115,6 +115,20 @@ $$ LANGUAGE plpgsql STRICT IMMUTABLE; SELECT worker_create_or_replace_object('CREATE AGGREGATE proc_conflict.existing_agg(integer) (STYPE = integer,SFUNC = proc_conflict.existing_func2)'); SELECT worker_create_or_replace_object('CREATE AGGREGATE proc_conflict.existing_agg(integer) (STYPE = integer,SFUNC = proc_conflict.existing_func2)'); +-- test worker_create_or_replace_object with a function that returns table +CREATE OR REPLACE FUNCTION func_with_return_table(int) +RETURNS TABLE (date date) +LANGUAGE plpgsql AS $$ +BEGIN + RETURN query SELECT '2011-01-01'::date; +END; +$$; + +SELECT worker_create_or_replace_object('CREATE OR REPLACE FUNCTION func_with_return_table(int) RETURNS TABLE (date date) LANGUAGE plpgsql AS $$ BEGIN RETURN query SELECT ''2011-01-01''::date; END; $$;'); + +-- verify that a backup function is created +SELECT COUNT(*)=2 FROM pg_proc WHERE proname LIKE 'func_with_return_table%'; + -- hide cascades SET client_min_messages TO error; DROP SCHEMA proc_conflict CASCADE; diff --git a/src/test/regress/sql/failure_connection_establishment.sql b/src/test/regress/sql/failure_connection_establishment.sql index 70812e56a..76e699132 100644 --- a/src/test/regress/sql/failure_connection_establishment.sql +++ b/src/test/regress/sql/failure_connection_establishment.sql @@ -80,10 +80,6 @@ SELECT citus.mitmproxy('conn.delay(500)'); SELECT count(*) FROM products; SELECT count(*) FROM products; --- use OFFSET 1 to prevent printing the line where source --- is the worker, and LIMIT 1 in case there were 
multiple connections -SELECT citus.dump_network_traffic() ORDER BY 1 LIMIT 1 OFFSET 1; - SELECT citus.mitmproxy('conn.allow()'); SET citus.shard_replication_factor TO 1; CREATE TABLE single_replicatated(key int); @@ -108,9 +104,9 @@ FROM pg_dist_shard_placement WHERE shardstate = 3 AND - shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'products'::regclass); + shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'single_replicatated'::regclass); SELECT citus.mitmproxy('conn.delay(500)'); -INSERT INTO products VALUES (100, '100', 100); +INSERT INTO single_replicatated VALUES (100); COMMIT; SELECT count(*) as invalid_placement_count @@ -118,10 +114,11 @@ FROM pg_dist_shard_placement WHERE shardstate = 3 AND - shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'products'::regclass); + shardid IN (SELECT shardid from pg_dist_shard where logicalrelid = 'single_replicatated'::regclass); -- show that INSERT failed -SELECT count(*) FROM products WHERE product_no = 100; +SELECT citus.mitmproxy('conn.allow()'); +SELECT count(*) FROM single_replicatated WHERE key = 100; RESET client_min_messages; @@ -160,6 +157,45 @@ SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); SELECT citus.mitmproxy('conn.delay(500)'); SELECT * FROM citus_check_connection_to_node('localhost', :worker_2_proxy_port); +-- tests for citus_check_cluster_node_health + +-- kill all connectivity checks that originate from this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").kill()'); +SELECT * FROM citus_check_cluster_node_health(); + +-- suggested summary queries for connectivity checks +SELECT bool_and(coalesce(result, false)) FROM citus_check_cluster_node_health(); +SELECT result, count(*) FROM citus_check_cluster_node_health() GROUP BY result ORDER BY 1; + +-- cancel all connectivity checks that originate from this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").cancel(' || pg_backend_pid() || ')'); +SELECT * FROM citus_check_cluster_node_health(); + +-- kill all but first connectivity checks that originate from this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).kill()'); +SELECT * FROM citus_check_cluster_node_health(); + +-- cancel all but first connectivity checks that originate from this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT citus_check_connection_to_node").after(1).cancel(' || pg_backend_pid() || ')'); +SELECT * FROM citus_check_cluster_node_health(); + +-- kill all connections to this node +SELECT citus.mitmproxy('conn.onAuthenticationOk().kill()'); +SELECT * FROM citus_check_cluster_node_health(); + +-- cancel all connections to this node +SELECT citus.mitmproxy('conn.onAuthenticationOk().cancel(' || pg_backend_pid() || ')'); +SELECT * FROM citus_check_cluster_node_health(); + +-- kill connection checks to this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").kill()'); +SELECT * FROM citus_check_cluster_node_health(); + +-- cancel connection checks to this node +SELECT citus.mitmproxy('conn.onQuery(query="^SELECT 1$").cancel(' || pg_backend_pid() || ')'); +SELECT * FROM citus_check_cluster_node_health(); + + RESET client_min_messages; SELECT citus.mitmproxy('conn.allow()'); SET citus.node_connection_timeout TO DEFAULT; diff --git a/src/test/regress/sql/failure_copy_on_hash.sql b/src/test/regress/sql/failure_copy_on_hash.sql index c9f98bc5c..d60242803 100644 --- 
a/src/test/regress/sql/failure_copy_on_hash.sql +++ b/src/test/regress/sql/failure_copy_on_hash.sql @@ -142,14 +142,6 @@ SELECT create_distributed_table('test_table_2','id'); SELECT citus.mitmproxy('conn.kill()'); \COPY test_table_2 FROM stdin delimiter ','; -1,2 -3,4 -6,7 -8,9 -9,10 -11,12 -13,14 -\. SELECT citus.mitmproxy('conn.allow()'); SELECT pds.logicalrelid, pdsd.shardid, pdsd.shardstate diff --git a/src/test/regress/sql/failure_copy_to_reference.sql b/src/test/regress/sql/failure_copy_to_reference.sql index 15b5037b9..b6c8d6c81 100644 --- a/src/test/regress/sql/failure_copy_to_reference.sql +++ b/src/test/regress/sql/failure_copy_to_reference.sql @@ -25,10 +25,6 @@ CREATE VIEW unhealthy_shard_count AS -- response we get from the worker SELECT citus.mitmproxy('conn.kill()'); \copy test_table FROM STDIN DELIMITER ',' -1,2 -2,3 -3,4 -\. SELECT citus.mitmproxy('conn.allow()'); SELECT * FROM unhealthy_shard_count; SELECT count(*) FROM test_table; @@ -36,10 +32,6 @@ SELECT count(*) FROM test_table; -- kill as soon as the coordinator sends begin SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").kill()'); \copy test_table FROM STDIN DELIMITER ',' -1,2 -2,3 -3,4 -\. SELECT citus.mitmproxy('conn.allow()'); SELECT * FROM unhealthy_shard_count; SELECT count(*) FROM test_table; @@ -47,10 +39,6 @@ SELECT count(*) FROM test_table; -- cancel as soon as the coordinator sends begin SELECT citus.mitmproxy('conn.onQuery(query="^BEGIN TRANSACTION ISOLATION LEVEL READ COMMITTED").cancel(' || pg_backend_pid() || ')'); \copy test_table FROM STDIN DELIMITER ',' -1,2 -2,3 -3,4 -\. SELECT citus.mitmproxy('conn.allow()'); SELECT * FROM unhealthy_shard_count; SELECT count(*) FROM test_table; diff --git a/src/test/regress/sql/failure_create_index_concurrently.sql b/src/test/regress/sql/failure_create_index_concurrently.sql index 4f624bd22..23b07b4b4 100644 --- a/src/test/regress/sql/failure_create_index_concurrently.sql +++ b/src/test/regress/sql/failure_create_index_concurrently.sql @@ -20,7 +20,7 @@ CREATE INDEX CONCURRENTLY idx_index_test ON index_test(id, value_1); SELECT citus.mitmproxy('conn.allow()'); -- verify index is not created -SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test%' $$) +SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test_%' $$) WHERE nodeport = :worker_2_proxy_port; @@ -78,7 +78,7 @@ DROP INDEX CONCURRENTLY IF EXISTS idx_index_test; SELECT citus.mitmproxy('conn.allow()'); -- verify index is not dropped at worker 2 -SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test%' $$) +SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test_%' $$) WHERE nodeport = :worker_2_proxy_port; -- test unique concurrent index creation failure when there are duplicates @@ -97,5 +97,5 @@ RESET SEARCH_PATH; DROP SCHEMA index_schema CASCADE; -- verify index is not at worker 2 upon cleanup -SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test%' $$) +SELECT * FROM run_command_on_workers($$SELECT count(*) FROM pg_indexes WHERE indexname LIKE 'idx_index_test_%' $$) WHERE nodeport = :worker_2_proxy_port; diff --git a/src/test/regress/sql/failure_ddl.sql b/src/test/regress/sql/failure_ddl.sql index 2618771ab..a13d563e0 100644 --- a/src/test/regress/sql/failure_ddl.sql +++ 
b/src/test/regress/sql/failure_ddl.sql @@ -14,7 +14,7 @@ SET search_path TO 'ddl_failure'; SET citus.max_cached_conns_per_worker TO 0; -- we don't want to see the prepared transaction numbers in the warnings -SET client_min_messages TO ERROR; +SET client_min_messages TO WARNING; SELECT citus.mitmproxy('conn.allow()'); diff --git a/src/test/regress/sql/failure_mx_metadata_sync.sql b/src/test/regress/sql/failure_mx_metadata_sync.sql index 9c7db9a68..171d49a1a 100644 --- a/src/test/regress/sql/failure_mx_metadata_sync.sql +++ b/src/test/regress/sql/failure_mx_metadata_sync.sql @@ -14,7 +14,8 @@ CREATE TABLE t1 (id int PRIMARY KEY); SELECT create_distributed_table('t1', 'id'); INSERT INTO t1 SELECT x FROM generate_series(1,100) AS f(x); --- Initial metadata status +-- Initially turn metadata sync off because we'll inject errors to start/stop metadata sync operations +SELECT stop_metadata_sync_to_node('localhost', :worker_2_proxy_port); SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port; -- Failure to set groupid in the worker @@ -95,6 +96,10 @@ SELECT count(*) FROM pg_dist_node; \c - - - :master_port SELECT hasmetadata FROM pg_dist_node WHERE nodeport=:worker_2_proxy_port; + +-- turn metadata sync back on +SELECT start_metadata_sync_to_node('localhost', :worker_2_proxy_port); + SET SEARCH_PATH = mx_metadata_sync; DROP TABLE t1; DROP TABLE t2; diff --git a/src/test/regress/sql/multi_citus_tools.sql b/src/test/regress/sql/multi_citus_tools.sql index 6db60c8d4..76a4a93b1 100644 --- a/src/test/regress/sql/multi_citus_tools.sql +++ b/src/test/regress/sql/multi_citus_tools.sql @@ -319,6 +319,20 @@ SELECT citus_check_connection_to_node('localhost', :worker_2_port); \c - - - :master_port +SELECT * FROM citus_check_cluster_node_health() ORDER BY 1,2,3,4; + +-- test cluster connectivity when we have broken nodes +SET client_min_messages TO ERROR; +SET citus.node_connection_timeout TO 10; + +BEGIN; +INSERT INTO pg_dist_node VALUES + (123456789, 123456789, 'localhost', 123456789), + (1234567890, 1234567890, 'www.citusdata.com', 5432); +SELECT * FROM citus_check_cluster_node_health() ORDER BY 5,1,2,3,4; +ROLLBACK; + +RESET citus.node_connection_timeout; RESET client_min_messages; DROP SCHEMA tools CASCADE; RESET SEARCH_PATH; diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index 01702c793..d237bf5a1 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -634,11 +634,14 @@ CREATE DATABASE another; \c another CREATE EXTENSION citus; +\c - - - :worker_1_port +CREATE EXTENSION citus; +\c - - - :master_port + SET citus.enable_object_propagation TO off; -- prevent distributed transactions during add node SELECT FROM master_add_node('localhost', :worker_1_port); \c - - - :worker_1_port -CREATE EXTENSION citus; ALTER FUNCTION assign_distributed_transaction_id(initiator_node_identifier integer, transaction_number bigint, transaction_stamp timestamp with time zone) RENAME TO dummy_assign_function; diff --git a/src/test/regress/sql/multi_follower_select_statements.sql b/src/test/regress/sql/multi_follower_select_statements.sql index f0e7bd404..edaccc869 100644 --- a/src/test/regress/sql/multi_follower_select_statements.sql +++ b/src/test/regress/sql/multi_follower_select_statements.sql @@ -111,6 +111,9 @@ UPDATE pg_dist_node SET nodecluster = 'second-cluster' WHERE noderole = 'seconda \c "port=9070 dbname=regression options='-c\ citus.use_secondary_nodes=always\ -c\
citus.cluster_name=second-cluster'" SELECT * FROM the_table; +-- Check for connectivity in the cluster +SELECT * FROM citus_check_cluster_node_health(); + -- clean up after ourselves \c -reuse-previous=off regression - - :master_port DROP TABLE the_table; diff --git a/src/test/regress/sql/multi_index_statements.sql b/src/test/regress/sql/multi_index_statements.sql index e6ae776a6..6c22a8403 100644 --- a/src/test/regress/sql/multi_index_statements.sql +++ b/src/test/regress/sql/multi_index_statements.sql @@ -159,17 +159,12 @@ REINDEX SYSTEM regression; DROP INDEX lineitem_orderkey_index, lineitem_partial_index; -- Verify that we can successfully drop indexes +DROP INDEX lineitem_orderkey_index; +DROP INDEX lineitem_orderkey_index_new; DROP INDEX lineitem_partkey_desc_index; DROP INDEX lineitem_partial_index; DROP INDEX lineitem_colref_index; --- Verify that we can drop distributed indexes with local indexes -CREATE TABLE local_table(a int, b int); -CREATE INDEX local_index ON local_table(a); -CREATE INDEX local_index2 ON local_table(b); -DROP INDEX lineitem_orderkey_index, local_index; -DROP INDEX IF EXISTS lineitem_orderkey_index_new, local_index2, non_existing_index; - -- Verify that we handle if exists statements correctly DROP INDEX non_existent_index; @@ -193,8 +188,9 @@ DROP INDEX CONCURRENTLY lineitem_concurrently_index; SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%' ORDER BY 1,2; SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname; \c - - - :worker_1_port -SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%' ORDER BY 1,2; -SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname; +SET citus.override_table_visibility TO FALSE; +SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname SIMILAR TO 'lineitem%\d' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%' ORDER BY 1,2; +SELECT * FROM pg_indexes WHERE tablename SIMILAR TO 'index_test_%\d' ORDER BY indexname; -- create index that will conflict with master operations CREATE INDEX CONCURRENTLY ith_b_idx_102089 ON multi_index_statements.index_test_hash_102089(b); diff --git a/src/test/regress/sql/multi_mx_add_coordinator.sql b/src/test/regress/sql/multi_mx_add_coordinator.sql index 86e17b3db..c318e78aa 100644 --- a/src/test/regress/sql/multi_mx_add_coordinator.sql +++ b/src/test/regress/sql/multi_mx_add_coordinator.sql @@ -1,9 +1,5 @@ CREATE SCHEMA mx_add_coordinator; SET search_path TO mx_add_coordinator,public; -SET citus.shard_replication_factor TO 1; -SET citus.shard_count TO 8; -SET citus.next_shard_id TO 7000000; -SET citus.next_placement_id TO 7000000; SET client_min_messages TO WARNING; CREATE USER reprefuser WITH LOGIN; @@ -12,8 +8,29 @@ SET citus.enable_alter_role_propagation TO ON; -- alter role for other than the extension owner works in enterprise, output differs accordingly ALTER ROLE reprefuser WITH CREATEDB; +-- check connectivity in the cluster +-- verify that we test for 4 node pairs before we add coordinator to metadata
+SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health(); + SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0); +-- verify that we test for 9 node pairs after we add coordinator to metadata +SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health(); + +-- verify that connectivity can also be checked when connected to a worker node +\c - - - :worker_1_port +SELECT bool_and(coalesce(result, false)), count(*) FROM citus_check_cluster_node_health(); + +\c - - - :master_port + +-- set the configs after reconnecting to coordinator +SET search_path TO mx_add_coordinator,public; +SET citus.shard_replication_factor TO 1; +SET citus.shard_count TO 8; +SET citus.next_shard_id TO 7000000; +SET citus.next_placement_id TO 7000000; +SET client_min_messages TO WARNING; + -- test that coordinator pg_dist_node entry is synced to the workers SELECT wait_until_metadata_sync(30000); diff --git a/src/test/regress/sql/multi_replicate_reference_table.sql b/src/test/regress/sql/multi_replicate_reference_table.sql index 497f38619..121e35c0f 100644 --- a/src/test/regress/sql/multi_replicate_reference_table.sql +++ b/src/test/regress/sql/multi_replicate_reference_table.sql @@ -451,7 +451,7 @@ WHERE ORDER BY shardid, nodeport; -- verify constraints have been created on the new node -SELECT run_command_on_workers('select count(*) from pg_constraint where contype=''f'' AND conname like ''ref_table%'';'); +SELECT run_command_on_workers('select count(*) from pg_constraint where contype=''f'' AND conname similar to ''ref_table%\d'';'); DROP TABLE ref_table_1, ref_table_2, ref_table_3; @@ -591,7 +591,6 @@ SELECT 1 FROM master_remove_node('localhost', :worker_2_port); SELECT 1 FROM master_add_node('localhost', :worker_2_port); SET citus.shard_replication_factor TO 1; -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); SELECT master_copy_shard_placement( :ref_table_shard, @@ -637,8 +636,6 @@ WHERE ref_table.a = dist_table.a; SET search_path TO replicate_reference_table; -SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); - -- -- The following case used to get stuck on create_distributed_table() instead -- of detecting the distributed deadlock.
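[Reviewer note] The pair counts asserted in multi_mx_add_coordinator.sql above follow from citus_check_cluster_node_health() probing every (source, target) combination of nodes in pg_dist_node, self-checks included: n nodes yield n^2 rows, so 2 workers give 4 pairs and adding the coordinator to the metadata makes it 3 nodes and 9 pairs. Below is a minimal sketch of the per-pair breakdown that the bool_and(...)/count(*) summaries aggregate away; the column names are assumed from the ORDER BY 1,2,3,4 and ORDER BY 5,1,2,3,4 usage in multi_citus_tools.sql and are not spelled out anywhere in this diff.

-- one row per node pair; result is NULL when a check could not be attempted,
-- which is why the tests wrap it in coalesce(result, false)
SELECT from_nodename, from_nodeport, to_nodename, to_nodeport,
       coalesce(result, false) AS reachable
FROM citus_check_cluster_node_health()
ORDER BY 1, 2, 3, 4;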
diff --git a/src/test/regress/sql/multi_unsupported_worker_operations.sql b/src/test/regress/sql/multi_unsupported_worker_operations.sql index 162268108..b9f4ee337 100644 --- a/src/test/regress/sql/multi_unsupported_worker_operations.sql +++ b/src/test/regress/sql/multi_unsupported_worker_operations.sql @@ -33,8 +33,6 @@ FROM pg_dist_partition WHERE logicalrelid IN ('mx_table'::regclass, 'mx_table_2'::regclass) ORDER BY logicalrelid; -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); - COPY mx_table (col_1, col_2) FROM STDIN WITH (FORMAT 'csv'); -37, 'lorem' 65536, 'ipsum' @@ -219,11 +217,6 @@ DROP SEQUENCE mx_table_col_3_seq CASCADE; \c - - - :master_port DROP TABLE mx_table; DROP TABLE mx_table_2; -SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); -\c - - - :worker_1_port -DELETE FROM pg_dist_node; -SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition WHERE logicalrelid::text LIKE 'mx\_%table%'; -\c - - - :master_port ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART :last_colocation_id; RESET citus.shard_replication_factor; diff --git a/src/test/regress/sql/mx_regular_user.sql b/src/test/regress/sql/mx_regular_user.sql index ed8c50b07..1c52ebc80 100644 --- a/src/test/regress/sql/mx_regular_user.sql +++ b/src/test/regress/sql/mx_regular_user.sql @@ -3,9 +3,6 @@ SET search_path TO "Mx Regular User"; -- add coordinator in idempotent way SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); --- sync the metadata to both nodes -SELECT start_metadata_sync_to_node('localhost', :worker_1_port); -SELECT start_metadata_sync_to_node('localhost', :worker_2_port); -- create a role and give access on each node separately -- and increase the error level to prevent the enterprise output from diverging @@ -355,4 +352,8 @@ SELECT start_metadata_sync_to_node('localhost', :worker_2_port); SELECT stop_metadata_sync_to_node('localhost', :worker_1_port); SELECT stop_metadata_sync_to_node('localhost', :worker_2_port); +-- finally sync metadata again so it doesn't break later tests +SELECT start_metadata_sync_to_node('localhost', :worker_1_port); +SELECT start_metadata_sync_to_node('localhost', :worker_2_port); + DROP SCHEMA "Mx Regular User" CASCADE; diff --git a/src/test/regress/sql/pg13_propagate_statistics.sql b/src/test/regress/sql/pg13_propagate_statistics.sql index f32cbcb77..e47b111fb 100644 --- a/src/test/regress/sql/pg13_propagate_statistics.sql +++ b/src/test/regress/sql/pg13_propagate_statistics.sql @@ -38,6 +38,7 @@ WHERE stxnamespace IN ( FROM pg_namespace WHERE nspname IN ('statistics''TestTarget') ) +AND stxname SIMILAR TO '%\_\d+' ORDER BY stxstattarget, stxrelid::regclass ASC; \c - - - :master_port diff --git a/src/test/regress/sql/pg14.sql b/src/test/regress/sql/pg14.sql index 7998d33f1..8bc422f5d 100644 --- a/src/test/regress/sql/pg14.sql +++ b/src/test/regress/sql/pg14.sql @@ -609,5 +609,36 @@ set enable_hashjoin=f; set enable_mergejoin=f; select * from nummultirange_test natural join nummultirange_test2 order by nmr; +-- verify that recreating distributed procedures with OUT param gets propagated to workers +CREATE OR REPLACE PROCEDURE proc_with_out_param(IN parm1 date, OUT parm2 int) + LANGUAGE SQL +AS $$ + SELECT 1; +$$; + +-- this should error out +SELECT create_distributed_function('proc_with_out_param(date,int)'); +-- this should work +SELECT create_distributed_function('proc_with_out_param(date)'); + +SET client_min_messages TO ERROR; +CREATE ROLE r1; +SELECT 1 FROM run_command_on_workers($$CREATE ROLE
r1;$$); +GRANT EXECUTE ON PROCEDURE proc_with_out_param TO r1; +SELECT 1 FROM run_command_on_workers($$GRANT EXECUTE ON PROCEDURE proc_with_out_param TO r1;$$); +RESET client_min_messages; + +CREATE OR REPLACE PROCEDURE proc_with_out_param(IN parm1 date, OUT parm2 int) + LANGUAGE SQL +AS $$ + SELECT 2; +$$; + +SELECT count(*) FROM + (SELECT result FROM + run_command_on_workers($$select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc, pg_proc.proowner) from pg_proc where proname = 'proc_with_out_param';$$) + UNION select row(pg_proc.pronargs, pg_proc.proargtypes, pg_proc.prosrc, pg_proc.proowner)::text from pg_proc where proname = 'proc_with_out_param') + as test; + set client_min_messages to error; drop schema pg14 cascade; diff --git a/src/test/regress/sql/propagate_set_commands.sql b/src/test/regress/sql/propagate_set_commands.sql index 3c7bef3b0..68b13547b 100644 --- a/src/test/regress/sql/propagate_set_commands.sql +++ b/src/test/regress/sql/propagate_set_commands.sql @@ -13,10 +13,53 @@ SET citus.select_opens_transaction_block TO on; BEGIN; SELECT current_setting('enable_hashagg') FROM test WHERE id = 1; --- should not be propagated, error should be coming from coordinator +-- triggers an error on the worker SET LOCAL TRANSACTION ISOLATION LEVEL REPEATABLE READ; END; +BEGIN; +SET TRANSACTION READ ONLY; +-- should fail after setting transaction to read only +INSERT INTO test VALUES (2,2); +END; + +BEGIN; +SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; +-- should reflect new isolation level +SELECT current_setting('transaction_isolation') FROM test WHERE id = 1; +END; + +BEGIN; +SET TRANSACTION READ ONLY; +SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; +SELECT current_setting('transaction_isolation') FROM test WHERE id = 1; +END; + +BEGIN; +SET LOCAL transaction_read_only TO on; +SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; +SELECT current_setting('transaction_isolation') FROM test WHERE id = 1; +END; + +BEGIN; +SET TRANSACTION READ ONLY; +SET TRANSACTION ISOLATION LEVEL SERIALIZABLE; +SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; +SELECT current_setting('transaction_isolation') FROM test WHERE id = 1; +END; + +BEGIN; +SET TRANSACTION ISOLATION LEVEL REPEATABLE READ; +SAVEPOINT goback; +SET TRANSACTION READ ONLY; +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; +ROLLBACK TO SAVEPOINT goback; +SELECT current_setting('transaction_read_only') FROM test WHERE id = 1; +END; + BEGIN; -- set session commands are not propagated SET enable_hashagg TO false; diff --git a/src/test/regress/sql/propagate_statistics.sql b/src/test/regress/sql/propagate_statistics.sql index 898387eaa..a66a108c6 100644 --- a/src/test/regress/sql/propagate_statistics.sql +++ b/src/test/regress/sql/propagate_statistics.sql @@ -89,6 +89,7 @@ WHERE stxnamespace IN ( FROM pg_namespace WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2') ) +AND stxname SIMILAR TO '%\_\d+' ORDER BY stxname ASC; SELECT count(DISTINCT stxnamespace) @@ -97,7 +98,8 @@ WHERE stxnamespace IN ( SELECT oid FROM pg_namespace WHERE nspname IN ('public', 'statistics''Test', 'sc1', 'sc2') -); +) +AND stxname SIMILAR TO '%\_\d+'; SELECT COUNT(DISTINCT stxowner) FROM pg_statistic_ext @@ -105,7 +107,8 @@ WHERE stxnamespace IN ( SELECT oid FROM pg_namespace WHERE nspname IN ('public', 
'statistics''Test', 'sc1', 'sc2') -); +) +AND stxname SIMILAR TO '%\_\d+'; \c - - - :master_port SET client_min_messages TO WARNING; diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 772434599..0305cbd48 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -110,6 +110,38 @@ SELECT * FROM test ORDER BY x; UPDATE test SET y = y + 1 RETURNING *; WITH cte_1 AS (UPDATE test SET y = y - 1 RETURNING *) SELECT * FROM cte_1 ORDER BY 1,2; +-- show that we can filter remote commands +-- given the default citus.grep_remote_commands, we log all commands +SET citus.log_local_commands to true; +SELECT count(*) FROM public.another_schema_table WHERE a = 1; + +-- grep matches all commands +SET citus.grep_remote_commands TO "%%"; +SELECT count(*) FROM public.another_schema_table WHERE a = 1; + +-- only filter a specific shard for the local execution +BEGIN; + SET LOCAL citus.grep_remote_commands TO "%90630515%"; + SELECT count(*) FROM public.another_schema_table; + -- match nothing + SET LOCAL citus.grep_remote_commands TO '%nothing%'; + SELECT count(*) FROM public.another_schema_table; +COMMIT; + +-- only filter a specific shard for the remote execution +BEGIN; + SET LOCAL citus.enable_local_execution TO FALSE; + SET LOCAL citus.grep_remote_commands TO '%90630515%'; + SET LOCAL citus.log_remote_commands TO ON; + SELECT count(*) FROM public.another_schema_table; + -- match nothing + SET LOCAL citus.grep_remote_commands TO '%nothing%'; + SELECT count(*) FROM public.another_schema_table; +COMMIT; + +RESET citus.log_local_commands; +RESET citus.grep_remote_commands; + -- Test upsert with constraint CREATE TABLE upsert_test ( @@ -1023,6 +1055,8 @@ ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; ALTER SYSTEM RESET citus.local_shared_pool_size; SELECT pg_reload_conf(); + + -- suppress notices SET client_min_messages TO error; diff --git a/src/test/regress/sql_schedule b/src/test/regress/sql_schedule index 40395bcbf..a8073825a 100644 --- a/src/test/regress/sql_schedule +++ b/src/test/regress/sql_schedule @@ -5,3 +5,4 @@ test: ch_benchmarks_4 ch_benchmarks_5 ch_benchmarks_6 test: intermediate_result_pruning_queries_1 intermediate_result_pruning_queries_2 test: dropped_columns_1 distributed_planning test: local_dist_join +test: connectivity_checks
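[Reviewer note] connectivity_checks is deliberately appended last in sql_schedule, so every arbitrary-config run ends with a cluster-wide health probe; because the test reduces the health rows with bool_and(coalesce(result, false)), it only passes when every node-to-node check both ran and succeeded. If a schedule ever needs a louder failure mode, an equivalent guard could look like the sketch below; it assumes only the citus_check_cluster_node_health() function exercised in this diff, and the DO block with its error text is illustrative, not part of the change.

-- count node pairs that were unreachable or whose check never ran (NULL result)
DO $do$
DECLARE
    broken_pairs int;
BEGIN
    SELECT count(*) INTO broken_pairs
    FROM citus_check_cluster_node_health()
    WHERE NOT coalesce(result, false);
    IF broken_pairs > 0 THEN
        RAISE EXCEPTION 'cluster health check failed for % node pair(s)', broken_pairs;
    END IF;
END;
$do$;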