Merge branch 'main' into grant_role_2pc

grant_database_2pc_onur_1
Gürkan İndibay 2024-01-18 13:04:29 +03:00 committed by GitHub
commit 4d9828a67c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
27 changed files with 225 additions and 120 deletions

View File

@ -471,14 +471,30 @@ jobs:
run: |- run: |-
detected_changes=$(git diff origin/main... --name-only --diff-filter=AM | (grep 'src/test/regress/sql/.*\.sql\|src/test/regress/spec/.*\.spec\|src/test/regress/citus_tests/test/test_.*\.py' || true)) detected_changes=$(git diff origin/main... --name-only --diff-filter=AM | (grep 'src/test/regress/sql/.*\.sql\|src/test/regress/spec/.*\.spec\|src/test/regress/citus_tests/test/test_.*\.py' || true))
tests=${detected_changes} tests=${detected_changes}
if [ -z "$tests" ]; then
echo "No test found." # split the tests to be skipped --today we only skip upgrade tests
skipped_tests=""
not_skipped_tests=""
for test in $tests; do
if [[ $test =~ ^src/test/regress/sql/upgrade_ ]]; then
skipped_tests="$skipped_tests $test"
else
not_skipped_tests="$not_skipped_tests $test"
fi
done
if [ ! -z "$skipped_tests" ]; then
echo "Skipped tests " $skipped_tests
fi
if [ -z "$not_skipped_tests" ]; then
echo "Not detected any tests that flaky test detection should run"
else else
echo "Detected tests " $tests echo "Detected tests " $not_skipped_tests
fi fi
echo 'tests<<EOF' >> $GITHUB_OUTPUT echo 'tests<<EOF' >> $GITHUB_OUTPUT
echo "$tests" >> "$GITHUB_OUTPUT" echo "$not_skipped_tests" >> "$GITHUB_OUTPUT"
echo 'EOF' >> $GITHUB_OUTPUT echo 'EOF' >> $GITHUB_OUTPUT
test-flakyness: test-flakyness:
if: ${{ needs.test-flakyness-pre.outputs.tests != ''}} if: ${{ needs.test-flakyness-pre.outputs.tests != ''}}

View File

@ -401,7 +401,7 @@ typedef struct WorkerPool
/* /*
* Placement executions destined for worker node, but not assigned to any * Placement executions destined for worker node, but not assigned to any
* connection and not ready to start. * connection and ready to start.
*/ */
dlist_head readyTaskQueue; dlist_head readyTaskQueue;
int readyTaskCount; int readyTaskCount;
@ -492,8 +492,6 @@ typedef struct WorkerSession
} WorkerSession; } WorkerSession;
struct TaskPlacementExecution;
/* GUC, determining whether Citus opens 1 connection per task */ /* GUC, determining whether Citus opens 1 connection per task */
bool ForceMaxQueryParallelization = false; bool ForceMaxQueryParallelization = false;
int MaxAdaptiveExecutorPoolSize = 16; int MaxAdaptiveExecutorPoolSize = 16;
@ -585,7 +583,7 @@ typedef enum TaskPlacementExecutionState
} TaskPlacementExecutionState; } TaskPlacementExecutionState;
/* /*
* TaskPlacementExecution represents the an execution of a command * TaskPlacementExecution represents the execution of a command
* on a shard placement. * on a shard placement.
*/ */
typedef struct TaskPlacementExecution typedef struct TaskPlacementExecution
@ -1908,7 +1906,7 @@ RunDistributedExecution(DistributedExecution *execution)
/* /*
* Iterate until all the tasks are finished. Once all the tasks * Iterate until all the tasks are finished. Once all the tasks
* are finished, ensure that that all the connection initializations * are finished, ensure that all the connection initializations
* are also finished. Otherwise, those connections are terminated * are also finished. Otherwise, those connections are terminated
* abruptly before they are established (or failed). Instead, we let * abruptly before they are established (or failed). Instead, we let
* the ConnectionStateMachine() to properly handle them. * the ConnectionStateMachine() to properly handle them.
@ -3118,7 +3116,7 @@ ConnectionStateMachine(WorkerSession *session)
* *
* We can only retry connection when the remote transaction has * We can only retry connection when the remote transaction has
* not started over the connection. Otherwise, we'd have to deal * not started over the connection. Otherwise, we'd have to deal
* with restoring the transaction state, which iis beyond our * with restoring the transaction state, which is beyond our
* purpose at this time. * purpose at this time.
*/ */
RemoteTransaction *transaction = &connection->remoteTransaction; RemoteTransaction *transaction = &connection->remoteTransaction;

View File

@ -168,7 +168,7 @@ CitusExecutorRun(QueryDesc *queryDesc,
executorBoundParams = queryDesc->params; executorBoundParams = queryDesc->params;
/* /*
* We do some potentially time consuming operations our self now before we hand of * We do some potentially time consuming operations ourself now before we hand off
* control to postgres' executor. To make sure that time spent is accurately measured * control to postgres' executor. To make sure that time spent is accurately measured
* we remove the totaltime instrumentation from the queryDesc. Instead we will start * we remove the totaltime instrumentation from the queryDesc. Instead we will start
* and stop the instrumentation of the total time and put it back on the queryDesc * and stop the instrumentation of the total time and put it back on the queryDesc

View File

@ -31,6 +31,7 @@
#include "nodes/makefuncs.h" #include "nodes/makefuncs.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#include "parser/parse_type.h" #include "parser/parse_type.h"
#include "postmaster/postmaster.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
@ -80,7 +81,14 @@ mark_object_distributed(PG_FUNCTION_ARGS)
Oid objectId = PG_GETARG_OID(2); Oid objectId = PG_GETARG_OID(2);
ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress)); ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress));
ObjectAddressSet(*objectAddress, classId, objectId); ObjectAddressSet(*objectAddress, classId, objectId);
MarkObjectDistributedWithName(objectAddress, objectName);
/*
* This function is called when a query is run from a Citus non-main database.
* We need to insert into local pg_dist_object over a connection to make sure
* 2PC still works.
*/
bool useConnectionForLocalQuery = true;
MarkObjectDistributedWithName(objectAddress, objectName, useConnectionForLocalQuery);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }
@ -184,7 +192,8 @@ ObjectExists(const ObjectAddress *address)
void void
MarkObjectDistributed(const ObjectAddress *distAddress) MarkObjectDistributed(const ObjectAddress *distAddress)
{ {
MarkObjectDistributedWithName(distAddress, ""); bool useConnectionForLocalQuery = false;
MarkObjectDistributedWithName(distAddress, "", useConnectionForLocalQuery);
} }
@ -194,13 +203,32 @@ MarkObjectDistributed(const ObjectAddress *distAddress)
* that is used in case the object does not exists for the current transaction. * that is used in case the object does not exists for the current transaction.
*/ */
void void
MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName) MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName,
bool useConnectionForLocalQuery)
{ {
if (!CitusHasBeenLoaded()) if (!CitusHasBeenLoaded())
{ {
elog(ERROR, "Cannot mark object distributed because Citus has not been loaded."); elog(ERROR, "Cannot mark object distributed because Citus has not been loaded.");
} }
MarkObjectDistributedLocally(distAddress);
/*
* When a query is run from a Citus non-main database we need to insert into pg_dist_object
* over a connection to make sure 2PC still works.
*/
if (useConnectionForLocalQuery)
{
StringInfo insertQuery = makeStringInfo();
appendStringInfo(insertQuery,
"INSERT INTO pg_catalog.pg_dist_object (classid, objid, objsubid)"
"VALUES (%d, %d, %d) ON CONFLICT DO NOTHING",
distAddress->classId, distAddress->objectId,
distAddress->objectSubId);
SendCommandToWorker(LocalHostName, PostPortNumber, insertQuery->data);
}
else
{
MarkObjectDistributedLocally(distAddress);
}
if (EnableMetadataSync) if (EnableMetadataSync)
{ {

View File

@ -1751,6 +1751,10 @@ citus_internal_mark_node_not_synced(PG_FUNCTION_ARGS)
/* /*
* FindWorkerNode searches over the worker nodes and returns the workerNode * FindWorkerNode searches over the worker nodes and returns the workerNode
* if it already exists. Else, the function returns NULL. * if it already exists. Else, the function returns NULL.
*
* NOTE: A special case that this handles is when nodeName and nodePort are set
* to LocalHostName and PostPortNumber. In that case we return the primary node
* for the local group.
*/ */
WorkerNode * WorkerNode *
FindWorkerNode(const char *nodeName, int32 nodePort) FindWorkerNode(const char *nodeName, int32 nodePort)
@ -1773,6 +1777,11 @@ FindWorkerNode(const char *nodeName, int32 nodePort)
return workerNode; return workerNode;
} }
if (strcmp(LocalHostName, nodeName) == 0 && nodePort == PostPortNumber)
{
return PrimaryNodeForGroup(GetLocalGroupId(), NULL);
}
return NULL; return NULL;
} }

View File

@ -252,7 +252,7 @@ FastPathRouterQuery(Query *query, Node **distributionKeyValue)
/* /*
* Distribution column must be used in a simple equality match check and it must be * Distribution column must be used in a simple equality match check and it must be
* place at top level conjustion operator. In simple words, we should have * place at top level conjunction operator. In simple words, we should have
* WHERE dist_key = VALUE [AND ....]; * WHERE dist_key = VALUE [AND ....];
* *
* We're also not allowing any other appearances of the distribution key in the quals. * We're also not allowing any other appearances of the distribution key in the quals.

View File

@ -197,9 +197,7 @@ CitusExplainScan(CustomScanState *node, List *ancestors, struct ExplainState *es
if (!ExplainDistributedQueries) if (!ExplainDistributedQueries)
{ {
appendStringInfoSpaces(es->str, es->indent * 2); ExplainPropertyBool("citus.explain_distributed_queries", false, es);
appendStringInfo(es->str, "explain statements for distributed queries ");
appendStringInfo(es->str, "are not enabled\n");
return; return;
} }

View File

@ -715,8 +715,8 @@ MultiNodeTree(Query *queryTree)
/* /*
* ContainsReadIntermediateResultFunction determines whether an expresion tree contains * ContainsReadIntermediateResultFunction determines whether an expression tree
* a call to the read_intermediate_result function. * contains a call to the read_intermediate_result function.
*/ */
bool bool
ContainsReadIntermediateResultFunction(Node *node) ContainsReadIntermediateResultFunction(Node *node)
@ -726,7 +726,7 @@ ContainsReadIntermediateResultFunction(Node *node)
/* /*
* ContainsReadIntermediateResultArrayFunction determines whether an expresion * ContainsReadIntermediateResultArrayFunction determines whether an expression
* tree contains a call to the read_intermediate_results(result_ids, format) * tree contains a call to the read_intermediate_results(result_ids, format)
* function. * function.
*/ */

View File

@ -434,7 +434,7 @@ ExtractSelectRangeTableEntry(Query *query)
* for the given modification query. * for the given modification query.
* *
* The function errors out if the input query is not a * The function errors out if the input query is not a
* modify query (e.g., INSERT, UPDATE or DELETE). So, this * modify query (e.g., INSERT, UPDATE, DELETE or MERGE). So, this
* function is not expected to be called on SELECT queries. * function is not expected to be called on SELECT queries.
*/ */
Oid Oid
@ -2271,13 +2271,13 @@ SelectsFromDistributedTable(List *rangeTableList, Query *query)
/* /*
* RouterQuery runs router pruning logic for SELECT, UPDATE, DELETE, and MERGE queries. * PlanRouterQuery runs router pruning logic for SELECT, UPDATE, DELETE, and
* If there are shards present and query is routable, all RTEs have been updated * MERGE queries. If there are shards present and query is routable, all RTEs
* to point to the relevant shards in the originalQuery. Also, placementList is * have been updated to point to the relevant shards in the originalQuery. Also,
* filled with the list of worker nodes that has all the required shard placements * placementList is filled with the list of worker nodes that has all the
* for the query execution. anchorShardId is set to the first pruned shardId of * required shard placements for the query execution. anchorShardId is set to
* the given query. Finally, relationShardList is filled with the list of * the first pruned shardId of the given query. Finally, relationShardList is
* relation-to-shard mappings for the query. * filled with the list of relation-to-shard mappings for the query.
* *
* If the given query is not routable, it fills planningError with the related * If the given query is not routable, it fills planningError with the related
* DeferredErrorMessage. The caller can check this error message to see if query * DeferredErrorMessage. The caller can check this error message to see if query
@ -2510,7 +2510,7 @@ AllShardsColocated(List *relationShardList)
if (currentTableType == RANGE_DISTRIBUTED || if (currentTableType == RANGE_DISTRIBUTED ||
currentTableType == APPEND_DISTRIBUTED) currentTableType == APPEND_DISTRIBUTED)
{ {
/* we do not have further strict colocation chceks */ /* we do not have further strict colocation checks */
continue; continue;
} }
} }
@ -2932,7 +2932,7 @@ TargetShardIntervalsForRestrictInfo(RelationRestrictionContext *restrictionConte
} }
/* /*
* Different resrictions might have different partition columns. * Different restrictions might have different partition columns.
* We report partition column value if there is only one. * We report partition column value if there is only one.
*/ */
if (multiplePartitionValuesExist) if (multiplePartitionValuesExist)

View File

@ -104,7 +104,7 @@ typedef struct FastPathRestrictionContext
* Set to true when distKey = Param; in the queryTree * Set to true when distKey = Param; in the queryTree
*/ */
bool distributionKeyHasParam; bool distributionKeyHasParam;
}FastPathRestrictionContext; } FastPathRestrictionContext;
typedef struct PlannerRestrictionContext typedef struct PlannerRestrictionContext
{ {

View File

@ -23,7 +23,8 @@ extern bool CitusExtensionObject(const ObjectAddress *objectAddress);
extern bool IsAnyObjectDistributed(const List *addresses); extern bool IsAnyObjectDistributed(const List *addresses);
extern bool ClusterHasDistributedFunctionWithDistArgument(void); extern bool ClusterHasDistributedFunctionWithDistArgument(void);
extern void MarkObjectDistributed(const ObjectAddress *distAddress); extern void MarkObjectDistributed(const ObjectAddress *distAddress);
extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name); extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name,
bool useConnectionForLocalQuery);
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress); extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress); extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
extern void UnmarkObjectDistributed(const ObjectAddress *address); extern void UnmarkObjectDistributed(const ObjectAddress *address);

View File

@ -238,8 +238,8 @@ typedef struct Task
TaskQuery taskQuery; TaskQuery taskQuery;
/* /*
* A task can have multiple queries, in which case queryCount will be > 1. If * A task can have multiple queries, in which case queryCount will be > 1, and
* a task has more one query, then taskQuery->queryType == TASK_QUERY_TEXT_LIST. * taskQuery->queryType == TASK_QUERY_TEXT_LIST.
*/ */
int queryCount; int queryCount;
@ -290,7 +290,7 @@ typedef struct Task
/* /*
* When we evaluate functions and parameters in the query string then * When we evaluate functions and parameters in the query string then
* we should no longer send the list of parameters long with the * we should no longer send the list of parameters along with the
* query. * query.
*/ */
bool parametersInQueryStringResolved; bool parametersInQueryStringResolved;

View File

@ -1,5 +1,5 @@
# this schedule is to be run on only coordinators # this schedule is to be run on only coordinators
test: upgrade_basic_before test: upgrade_basic_before upgrade_basic_before_non_mixed
test: upgrade_pg_dist_cleanup_before test: upgrade_pg_dist_cleanup_before
test: upgrade_post_11_before test: upgrade_post_11_before

View File

@ -1,5 +1,5 @@
# The basic tests runs analyze which depends on shard numbers # The basic tests runs analyze which depends on shard numbers
test: multi_test_helpers multi_test_helpers_superuser test: multi_test_helpers multi_test_helpers_superuser upgrade_basic_before_non_mixed
test: multi_test_catalog_views test: multi_test_catalog_views
test: upgrade_basic_before test: upgrade_basic_before
test: upgrade_ref2ref_before test: upgrade_ref2ref_before

View File

@ -215,7 +215,7 @@ s/^(ERROR: The index name \(test_index_creation1_p2020_09_26)_([0-9])+_(tenant_
s/^(DEBUG: the index name on the shards of the partition is too long, switching to sequential and local execution mode to prevent self deadlocks: test_index_creation1_p2020_09_26)_([0-9])+_(tenant_id_timeperiod_idx)/\1_xxxxxx_\3/g s/^(DEBUG: the index name on the shards of the partition is too long, switching to sequential and local execution mode to prevent self deadlocks: test_index_creation1_p2020_09_26)_([0-9])+_(tenant_id_timeperiod_idx)/\1_xxxxxx_\3/g
# normalize errors for not being able to connect to a non-existing host # normalize errors for not being able to connect to a non-existing host
s/could not translate host name "foobar" to address: .*$/could not translate host name "foobar" to address: <system specific error>/g s/could not translate host name "([A-Za-z0-9\.\-]+)" to address: .*$/could not translate host name "\1" to address: <system specific error>/g
# ignore PL/pgSQL line numbers that differ on Mac builds # ignore PL/pgSQL line numbers that differ on Mac builds
s/(CONTEXT: PL\/pgSQL function .* line )([0-9]+)/\1XX/g s/(CONTEXT: PL\/pgSQL function .* line )([0-9]+)/\1XX/g

View File

@ -92,7 +92,7 @@ PG_MAJOR_VERSION = get_pg_major_version()
OLDEST_SUPPORTED_CITUS_VERSION_MATRIX = { OLDEST_SUPPORTED_CITUS_VERSION_MATRIX = {
14: "10.2.0", 14: "10.2.0",
15: "11.1.5", 15: "11.1.5",
16: "12.1devel", 16: "12.1.1",
} }
OLDEST_SUPPORTED_CITUS_VERSION = OLDEST_SUPPORTED_CITUS_VERSION_MATRIX[PG_MAJOR_VERSION] OLDEST_SUPPORTED_CITUS_VERSION = OLDEST_SUPPORTED_CITUS_VERSION_MATRIX[PG_MAJOR_VERSION]
@ -431,6 +431,12 @@ next_port = PORT_LOWER_BOUND
def notice_handler(diag: psycopg.errors.Diagnostic): def notice_handler(diag: psycopg.errors.Diagnostic):
print(f"{diag.severity}: {diag.message_primary}") print(f"{diag.severity}: {diag.message_primary}")
if diag.message_detail:
print(f"DETAIL: {diag.message_detail}")
if diag.message_hint:
print(f"HINT: {diag.message_hint}")
if diag.context:
print(f"CONTEXT: {diag.context}")
def cleanup_test_leftovers(nodes): def cleanup_test_leftovers(nodes):

View File

@ -153,6 +153,9 @@ DEPS = {
"isolation_extension_commands": TestDeps( "isolation_extension_commands": TestDeps(
None, ["isolation_setup", "isolation_add_remove_node"] None, ["isolation_setup", "isolation_add_remove_node"]
), ),
"isolation_update_node": TestDeps(
None, ["isolation_setup", "isolation_add_remove_node"]
),
"schema_based_sharding": TestDeps("minimal_schedule"), "schema_based_sharding": TestDeps("minimal_schedule"),
"multi_sequence_default": TestDeps( "multi_sequence_default": TestDeps(
None, ["multi_test_helpers", "multi_cluster_management", "multi_table_ddl"] None, ["multi_test_helpers", "multi_cluster_management", "multi_table_ddl"]

View File

@ -36,14 +36,25 @@ def test_main_commited_outer_not_yet(cluster):
assert ( assert (
int(role_before_commit) == 0 int(role_before_commit) == 0
), "role is on pg_dist_object despite not committing" ), "role is in pg_dist_object despite not committing"
# user should not be in pg_dist_object on the coordinator because outer transaction is not committed yet
pdo_coordinator_before_commit = c.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid = 123123"
)
assert (
int(pdo_coordinator_before_commit) == 0
), "role is in pg_dist_object on coordinator despite not committing"
# user should not be in pg_dist_object on the worker because outer transaction is not committed yet # user should not be in pg_dist_object on the worker because outer transaction is not committed yet
pdo_before_commit = w0.sql_value( pdo_worker_before_commit = w0.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'" "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
) )
assert int(pdo_before_commit) == 0, "role is created despite not committing" assert (
int(pdo_worker_before_commit) == 0
), "role is in pg_dist_object on worker despite not committing"
# commit in cur1 so the transaction recovery thinks this is a successful transaction # commit in cur1 so the transaction recovery thinks this is a successful transaction
cur1.execute("COMMIT") cur1.execute("COMMIT")
@ -60,14 +71,23 @@ def test_main_commited_outer_not_yet(cluster):
int(role_after_commit) == 1 int(role_after_commit) == 1
), "role is not created during recovery despite committing" ), "role is not created during recovery despite committing"
# check that the user is on pg_dist_object on the worker after transaction recovery # check that the user is in pg_dist_object on the coordinator after transaction recovery
pdo_after_commit = w0.sql_value( pdo_coordinator_after_commit = c.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid = 123123"
)
assert (
int(pdo_coordinator_after_commit) == 1
), "role is not in pg_dist_object on coordinator after recovery despite committing"
# check that the user is in pg_dist_object on the worker after transaction recovery
pdo_worker_after_commit = w0.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'" "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
) )
assert ( assert (
int(pdo_after_commit) == 1 int(pdo_worker_after_commit) == 1
), "role is not on pg_dist_object after recovery despite committing" ), "role is not in pg_dist_object on worker after recovery despite committing"
c.sql("DROP DATABASE db1") c.sql("DROP DATABASE db1")
c.sql( c.sql(
@ -81,6 +101,12 @@ def test_main_commited_outer_not_yet(cluster):
$$) $$)
""" """
) )
c.sql(
"""
DELETE FROM pg_dist_object
WHERE objid = 123123
"""
)
def test_main_commited_outer_aborted(cluster): def test_main_commited_outer_aborted(cluster):
@ -121,14 +147,23 @@ def test_main_commited_outer_aborted(cluster):
assert int(role_before_recovery) == 0, "role is already created before recovery" assert int(role_before_recovery) == 0, "role is already created before recovery"
# check that the user is not on pg_dist_object on the worker # check that the user is not in pg_dist_object on the coordinator
pdo_before_recovery = w0.sql_value( pdo_coordinator_before_recovery = c.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid = 321321"
)
assert (
int(pdo_coordinator_before_recovery) == 0
), "role is already in pg_dist_object on coordinator before recovery"
# check that the user is not in pg_dist_object on the worker
pdo_worker_before_recovery = w0.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'" "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'"
) )
assert ( assert (
int(pdo_before_recovery) == 0 int(pdo_worker_before_recovery) == 0
), "role is already on pg_dist_object before recovery" ), "role is already in pg_dist_object on worker before recovery"
# run the transaction recovery # run the transaction recovery
c.sql("SELECT recover_prepared_transactions()") c.sql("SELECT recover_prepared_transactions()")
@ -142,13 +177,22 @@ def test_main_commited_outer_aborted(cluster):
int(role_after_recovery) == 0 int(role_after_recovery) == 0
), "role is created during recovery despite aborting" ), "role is created during recovery despite aborting"
# check that the user is not on pg_dist_object on the worker after transaction recovery # check that the user is not in pg_dist_object on the coordinator after transaction recovery
pdo_after_recovery = w0.sql_value( pdo_coordinator_after_recovery = c.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid = 321321"
)
assert (
int(pdo_coordinator_after_recovery) == 0
), "role is in pg_dist_object on coordinator after recovery despite aborting"
# check that the user is not in pg_dist_object on the worker after transaction recovery
pdo_worker_after_recovery = w0.sql_value(
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'" "SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'"
) )
assert ( assert (
int(pdo_after_recovery) == 0 int(pdo_worker_after_recovery) == 0
), "role is on pg_dist_object after recovery despite aborting" ), "role is in pg_dist_object on worker after recovery despite aborting"
c.sql("DROP DATABASE db2") c.sql("DROP DATABASE db2")

View File

@ -93,8 +93,8 @@ nodeid|nodename|nodeport
starting permutation: s1-begin s1-update-node-1 s2-begin s2-update-node-1 s1-commit s2-abort s1-show-nodes s3-update-node-1-back s3-manually-fix-metadata starting permutation: s1-begin s1-update-node-1 s2-begin s2-update-node-1 s1-commit s2-abort s1-show-nodes s3-update-node-1-back s3-manually-fix-metadata
nodeid|nodename |nodeport nodeid|nodename |nodeport
--------------------------------------------------------------------- ---------------------------------------------------------------------
25|localhost| 57638 23|localhost| 57638
24|localhost| 57637 22|localhost| 57637
(2 rows) (2 rows)
step s1-begin: step s1-begin:
@ -139,8 +139,8 @@ step s1-show-nodes:
nodeid|nodename |nodeport|isactive nodeid|nodename |nodeport|isactive
--------------------------------------------------------------------- ---------------------------------------------------------------------
25|localhost| 57638|t 23|localhost| 57638|t
24|localhost| 58637|t 22|localhost| 58637|t
(2 rows) (2 rows)
step s3-update-node-1-back: step s3-update-node-1-back:
@ -178,8 +178,8 @@ nodeid|nodename|nodeport
starting permutation: s2-create-table s1-begin s1-update-node-nonexistent s1-prepare-transaction s2-cache-prepared-statement s1-commit-prepared s2-execute-prepared s1-update-node-existent s3-manually-fix-metadata starting permutation: s2-create-table s1-begin s1-update-node-nonexistent s1-prepare-transaction s2-cache-prepared-statement s1-commit-prepared s2-execute-prepared s1-update-node-existent s3-manually-fix-metadata
nodeid|nodename |nodeport nodeid|nodename |nodeport
--------------------------------------------------------------------- ---------------------------------------------------------------------
27|localhost| 57638 23|localhost| 57638
26|localhost| 57637 22|localhost| 57637
(2 rows) (2 rows)
step s2-create-table: step s2-create-table:
@ -250,7 +250,7 @@ count
step s1-commit-prepared: step s1-commit-prepared:
COMMIT prepared 'label'; COMMIT prepared 'label';
s2: WARNING: connection to the remote node non-existent:57637 failed with the following error: could not translate host name "non-existent" to address: Name or service not known s2: WARNING: connection to the remote node non-existent:57637 failed with the following error: could not translate host name "non-existent" to address: <system specific error>
step s2-execute-prepared: step s2-execute-prepared:
EXECUTE foo; EXECUTE foo;

View File

@ -77,10 +77,10 @@ DEBUG: Router planner cannot handle multi-shard select queries
LOG: join order: [ "lineitem" ][ local partition join "lineitem" ] LOG: join order: [ "lineitem" ][ local partition join "lineitem" ]
DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647] DEBUG: join prunable for intervals [-2147483648,-1] and [0,2147483647]
DEBUG: join prunable for intervals [0,2147483647] and [-2147483648,-1] DEBUG: join prunable for intervals [0,2147483647] and [-2147483648,-1]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Custom Scan (Citus Adaptive) Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(2 rows) (2 rows)
SET client_min_messages TO LOG; SET client_min_messages TO LOG;
@ -92,11 +92,11 @@ SELECT count(*) FROM lineitem, orders
WHERE (l_orderkey = o_orderkey AND l_quantity > 5) WHERE (l_orderkey = o_orderkey AND l_quantity > 5)
OR (l_orderkey = o_orderkey AND l_quantity < 10); OR (l_orderkey = o_orderkey AND l_quantity < 10);
LOG: join order: [ "lineitem" ][ local partition join "orders" ] LOG: join order: [ "lineitem" ][ local partition join "orders" ]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
EXPLAIN (COSTS OFF) EXPLAIN (COSTS OFF)
@ -107,11 +107,11 @@ EXPLAIN (COSTS OFF)
SELECT count(*) FROM orders, lineitem_hash SELECT count(*) FROM orders, lineitem_hash
WHERE o_orderkey = l_orderkey; WHERE o_orderkey = l_orderkey;
LOG: join order: [ "orders" ][ dual partition join "lineitem_hash" ] LOG: join order: [ "orders" ][ dual partition join "lineitem_hash" ]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
-- Verify we handle local joins between two hash-partitioned tables. -- Verify we handle local joins between two hash-partitioned tables.
@ -119,11 +119,11 @@ EXPLAIN (COSTS OFF)
SELECT count(*) FROM orders_hash, lineitem_hash SELECT count(*) FROM orders_hash, lineitem_hash
WHERE o_orderkey = l_orderkey; WHERE o_orderkey = l_orderkey;
LOG: join order: [ "orders_hash" ][ local partition join "lineitem_hash" ] LOG: join order: [ "orders_hash" ][ local partition join "lineitem_hash" ]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
-- Validate that we can handle broadcast joins with hash-partitioned tables. -- Validate that we can handle broadcast joins with hash-partitioned tables.
@ -131,11 +131,11 @@ EXPLAIN (COSTS OFF)
SELECT count(*) FROM customer_hash, nation SELECT count(*) FROM customer_hash, nation
WHERE c_nationkey = n_nationkey; WHERE c_nationkey = n_nationkey;
LOG: join order: [ "customer_hash" ][ reference join "nation" ] LOG: join order: [ "customer_hash" ][ reference join "nation" ]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
-- Validate that we don't use a single-partition join method for a hash -- Validate that we don't use a single-partition join method for a hash
@ -144,11 +144,11 @@ EXPLAIN (COSTS OFF)
SELECT count(*) FROM orders, lineitem, customer_append SELECT count(*) FROM orders, lineitem, customer_append
WHERE o_custkey = l_partkey AND o_custkey = c_nationkey; WHERE o_custkey = l_partkey AND o_custkey = c_nationkey;
LOG: join order: [ "orders" ][ dual partition join "lineitem" ][ dual partition join "customer_append" ] LOG: join order: [ "orders" ][ dual partition join "lineitem" ][ dual partition join "customer_append" ]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
-- Validate that we don't chose a single-partition join method with a -- Validate that we don't chose a single-partition join method with a
@ -157,11 +157,11 @@ EXPLAIN (COSTS OFF)
SELECT count(*) FROM orders, customer_hash SELECT count(*) FROM orders, customer_hash
WHERE c_custkey = o_custkey; WHERE c_custkey = o_custkey;
LOG: join order: [ "orders" ][ dual partition join "customer_hash" ] LOG: join order: [ "orders" ][ dual partition join "customer_hash" ]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
-- Validate that we can re-partition a hash partitioned table to join with a -- Validate that we can re-partition a hash partitioned table to join with a
@ -170,11 +170,11 @@ EXPLAIN (COSTS OFF)
SELECT count(*) FROM orders_hash, customer_append SELECT count(*) FROM orders_hash, customer_append
WHERE c_custkey = o_custkey; WHERE c_custkey = o_custkey;
LOG: join order: [ "orders_hash" ][ dual partition join "customer_append" ] LOG: join order: [ "orders_hash" ][ dual partition join "customer_append" ]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
-- Validate a 4 way join that could be done locally is planned as such by the logical -- Validate a 4 way join that could be done locally is planned as such by the logical
@ -195,11 +195,11 @@ JOIN (
WHERE event_type = 5 WHERE event_type = 5
) AS some_users ON (some_users.user_id = bar.user_id); ) AS some_users ON (some_users.user_id = bar.user_id);
LOG: join order: [ "users_table" ][ local partition join "events_table" ][ local partition join "users_table" ][ local partition join "events_table" ] LOG: join order: [ "users_table" ][ local partition join "events_table" ][ local partition join "users_table" ][ local partition join "events_table" ]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
-- Reset client logging level to its previous value -- Reset client logging level to its previous value

View File

@ -22,11 +22,11 @@ WHERE
and l_discount between 0.06 - 0.01 and 0.06 + 0.01 and l_discount between 0.06 - 0.01 and 0.06 + 0.01
and l_quantity < 24; and l_quantity < 24;
LOG: join order: [ "lineitem" ] LOG: join order: [ "lineitem" ]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
-- Query #3 from the TPC-H decision support benchmark -- Query #3 from the TPC-H decision support benchmark
@ -61,7 +61,7 @@ LOG: join order: [ "orders" ][ local partition join "lineitem" ][ dual partitio
-> HashAggregate -> HashAggregate
Group Key: remote_scan.l_orderkey, remote_scan.o_orderdate, remote_scan.o_shippriority Group Key: remote_scan.l_orderkey, remote_scan.o_orderdate, remote_scan.o_shippriority
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(6 rows) (6 rows)
-- Query #10 from the TPC-H decision support benchmark -- Query #10 from the TPC-H decision support benchmark
@ -98,12 +98,12 @@ GROUP BY
ORDER BY ORDER BY
revenue DESC; revenue DESC;
LOG: join order: [ "orders" ][ local partition join "lineitem" ][ dual partition join "customer_append" ][ reference join "nation" ] LOG: join order: [ "orders" ][ local partition join "lineitem" ][ dual partition join "customer_append" ][ reference join "nation" ]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Sort Sort
Sort Key: remote_scan.revenue DESC Sort Key: remote_scan.revenue DESC
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(4 rows) (4 rows)
-- Query #19 from the TPC-H decision support benchmark (modified) -- Query #19 from the TPC-H decision support benchmark (modified)
@ -138,11 +138,11 @@ WHERE
AND l_shipinstruct = 'DELIVER IN PERSON' AND l_shipinstruct = 'DELIVER IN PERSON'
); );
LOG: join order: [ "lineitem" ][ dual partition join "part_append" ] LOG: join order: [ "lineitem" ][ dual partition join "part_append" ]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
-- Query to test multiple re-partition jobs in a single query -- Query to test multiple re-partition jobs in a single query
@ -158,12 +158,12 @@ WHERE
GROUP BY GROUP BY
l_partkey; l_partkey;
LOG: join order: [ "lineitem" ][ local partition join "orders" ][ dual partition join "part_append" ][ dual partition join "customer_append" ] LOG: join order: [ "lineitem" ][ local partition join "orders" ][ dual partition join "part_append" ][ dual partition join "customer_append" ]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
HashAggregate HashAggregate
Group Key: remote_scan.l_partkey Group Key: remote_scan.l_partkey
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(4 rows) (4 rows)
-- Reset client logging level to its previous value -- Reset client logging level to its previous value

View File

@ -17,11 +17,11 @@ WHERE
and l_discount between 0.06 - 0.01 and 0.06 + 0.01 and l_discount between 0.06 - 0.01 and 0.06 + 0.01
and l_quantity < 24; and l_quantity < 24;
LOG: join order: [ "lineitem" ] LOG: join order: [ "lineitem" ]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
-- Query #3 from the TPC-H decision support benchmark -- Query #3 from the TPC-H decision support benchmark
@ -49,12 +49,12 @@ ORDER BY
revenue DESC, revenue DESC,
o_orderdate; o_orderdate;
LOG: join order: [ "orders" ][ reference join "customer" ][ local partition join "lineitem" ] LOG: join order: [ "orders" ][ reference join "customer" ][ local partition join "lineitem" ]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Sort Sort
Sort Key: remote_scan.revenue DESC, remote_scan.o_orderdate Sort Key: remote_scan.revenue DESC, remote_scan.o_orderdate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(4 rows) (4 rows)
-- Query #10 from the TPC-H decision support benchmark -- Query #10 from the TPC-H decision support benchmark
@ -98,7 +98,7 @@ LOG: join order: [ "orders" ][ reference join "customer" ][ reference join "nat
-> HashAggregate -> HashAggregate
Group Key: remote_scan.c_custkey, remote_scan.c_name, remote_scan.c_acctbal, remote_scan.c_phone, remote_scan.n_name, remote_scan.c_address, remote_scan.c_comment Group Key: remote_scan.c_custkey, remote_scan.c_name, remote_scan.c_acctbal, remote_scan.c_phone, remote_scan.n_name, remote_scan.c_address, remote_scan.c_comment
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(6 rows) (6 rows)
-- Query #19 from the TPC-H decision support benchmark (modified) -- Query #19 from the TPC-H decision support benchmark (modified)
@ -133,11 +133,11 @@ WHERE
AND l_shipinstruct = 'DELIVER IN PERSON' AND l_shipinstruct = 'DELIVER IN PERSON'
); );
LOG: join order: [ "lineitem" ][ reference join "part" ] LOG: join order: [ "lineitem" ][ reference join "part" ]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
-- Reset client logging level to its previous value -- Reset client logging level to its previous value

View File

@ -104,11 +104,11 @@ SELECT count(*)
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: join prunable for intervals [{},{AZZXSP27F21T6,AZZXSP27F21T6}] and [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] DEBUG: join prunable for intervals [{},{AZZXSP27F21T6,AZZXSP27F21T6}] and [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}]
DEBUG: join prunable for intervals [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] and [{},{AZZXSP27F21T6,AZZXSP27F21T6}] DEBUG: join prunable for intervals [{BA1000U2AMO4ZGX,BZZXSP27F21T6},{CA1000U2AMO4ZGX,CZZXSP27F21T6}] and [{},{AZZXSP27F21T6,AZZXSP27F21T6}]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
EXPLAIN (COSTS OFF) EXPLAIN (COSTS OFF)
@ -118,11 +118,11 @@ SELECT count(*)
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: join prunable for intervals [(a,3,b),(b,4,c)] and [(c,5,d),(d,6,e)] DEBUG: join prunable for intervals [(a,3,b),(b,4,c)] and [(c,5,d),(d,6,e)]
DEBUG: join prunable for intervals [(c,5,d),(d,6,e)] and [(a,3,b),(b,4,c)] DEBUG: join prunable for intervals [(c,5,d),(d,6,e)] and [(a,3,b),(b,4,c)]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
-- Test that large table joins on partition varchar columns work -- Test that large table joins on partition varchar columns work
@ -133,11 +133,11 @@ SELECT count(*)
DEBUG: Router planner cannot handle multi-shard select queries DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: join prunable for intervals [AA1000U2AMO4ZGX,AZZXSP27F21T6] and [BA1000U2AMO4ZGX,BZZXSP27F21T6] DEBUG: join prunable for intervals [AA1000U2AMO4ZGX,AZZXSP27F21T6] and [BA1000U2AMO4ZGX,BZZXSP27F21T6]
DEBUG: join prunable for intervals [BA1000U2AMO4ZGX,BZZXSP27F21T6] and [AA1000U2AMO4ZGX,AZZXSP27F21T6] DEBUG: join prunable for intervals [BA1000U2AMO4ZGX,BZZXSP27F21T6] and [AA1000U2AMO4ZGX,AZZXSP27F21T6]
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;

View File

@ -75,11 +75,11 @@ DEBUG: shard count after pruning for task_assignment_test_table: 3
DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table; EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table;
@ -89,11 +89,11 @@ DEBUG: shard count after pruning for task_assignment_test_table: 3
DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
-- Next test the first-replica task assignment policy -- Next test the first-replica task assignment policy
@ -105,11 +105,11 @@ DEBUG: shard count after pruning for task_assignment_test_table: 3
DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table; EXPLAIN (COSTS OFF) SELECT count(*) FROM task_assignment_test_table;
@ -119,11 +119,11 @@ DEBUG: shard count after pruning for task_assignment_test_table: 3
DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx
DEBUG: assigned task to node localhost:xxxxx DEBUG: assigned task to node localhost:xxxxx
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Aggregate Aggregate
-> Custom Scan (Citus Adaptive) -> Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(3 rows) (3 rows)
COMMIT; COMMIT;
@ -142,38 +142,38 @@ SET LOCAL citus.task_assignment_policy TO 'greedy';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: Distributed planning for a fast-path router query DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Custom Scan (Citus Adaptive) Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(2 rows) (2 rows)
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: Distributed planning for a fast-path router query DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Custom Scan (Citus Adaptive) Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(2 rows) (2 rows)
SET LOCAL citus.task_assignment_policy TO 'first-replica'; SET LOCAL citus.task_assignment_policy TO 'first-replica';
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: Distributed planning for a fast-path router query DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Custom Scan (Citus Adaptive) Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(2 rows) (2 rows)
EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table; EXPLAIN (COSTS FALSE) SELECT * FROM task_assignment_reference_table;
DEBUG: Distributed planning for a fast-path router query DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan DEBUG: Creating router plan
QUERY PLAN QUERY PLAN
--------------------------------------------------------------------- ---------------------------------------------------------------------
Custom Scan (Citus Adaptive) Custom Scan (Citus Adaptive)
explain statements for distributed queries are not enabled citus.explain_distributed_queries: false
(2 rows) (2 rows)
ROLLBACK; ROLLBACK;

View File

@ -3,6 +3,8 @@ setup
-- revert back to pg_isolation_test_session_is_blocked until the tests are fixed -- revert back to pg_isolation_test_session_is_blocked until the tests are fixed
SELECT citus_internal.restore_isolation_tester_func(); SELECT citus_internal.restore_isolation_tester_func();
ALTER SEQUENCE pg_dist_node_nodeid_seq RESTART 22;
SELECT 1 FROM master_add_node('localhost', 57637); SELECT 1 FROM master_add_node('localhost', 57637);
SELECT 1 FROM master_add_node('localhost', 57638); SELECT 1 FROM master_add_node('localhost', 57638);