Merge branch 'main' into create_alter_database

pull/7240/head
Gürkan İndibay 2023-11-09 11:37:58 +03:00 committed by GitHub
commit 32c67963bd
47 changed files with 532 additions and 206 deletions

View File

@ -10,8 +10,13 @@ on:
required: false
default: false
type: boolean
push:
branches:
- "main"
- "release-*"
pull_request:
types: [opened, reopened,synchronize]
merge_group:
jobs:
# Since GHA does not interpolate env variables in matrix context, we need to
# define them in a separate job and use them in other jobs.
@ -492,7 +497,6 @@ jobs:
matrix: ${{ fromJson(needs.prepare_parallelization_matrix_32.outputs.json) }}
steps:
- uses: actions/checkout@v3.5.0
- uses: actions/download-artifact@v3.0.1
- uses: "./.github/actions/setup_extension"
- name: Run minimal tests
run: |-
@ -501,7 +505,7 @@ jobs:
for test in "${tests_array[@]}"
do
test_name=$(echo "$test" | sed -r "s/.+\/(.+)\..+/\1/")
gosu circleci src/test/regress/citus_tests/run_test.py $test_name --repeat ${{ env.runs }} --use-base-schedule --use-whole-schedule-line
gosu circleci src/test/regress/citus_tests/run_test.py $test_name --repeat ${{ env.runs }} --use-whole-schedule-line
done
shell: bash
- uses: "./.github/actions/save_logs_and_results"

View File

@ -71,7 +71,7 @@ jobs:
- uses: "./.github/actions/setup_extension"
- name: Run minimal tests
run: |-
gosu circleci src/test/regress/citus_tests/run_test.py ${{ env.test }} --repeat ${{ env.runs }} --use-base-schedule --use-whole-schedule-line
gosu circleci src/test/regress/citus_tests/run_test.py ${{ env.test }} --repeat ${{ env.runs }} --use-whole-schedule-line
shell: bash
- uses: "./.github/actions/save_logs_and_results"
if: always()

View File

@ -3,6 +3,7 @@ name: Build tests in packaging images
on:
pull_request:
types: [opened, reopened,synchronize]
merge_group:
workflow_dispatch:

View File

@ -1723,11 +1723,11 @@ Merge command the same principles as INSERT .. SELECT processing. However, due t
# DDL
DDL commands are primarily handled via the ProcessUtility hook, which gets the parse tree of the DDL command. For supported DDL commands, we always follow the same sequence of steps:
DDL commands are primarily handled via the citus_ProcessUtility hook, which gets the parse tree of the DDL command. For supported DDL commands, we always follow the same sequence of steps:
1. Qualify the table names in the parse tree (simplifies deparsing, avoids sensitivity to search_path changes)
2. Pre-process logic
3. Call original ProcessUtility to execute the command on the local shell table
3. Call the previous (original) ProcessUtility to execute the command on the local shell table
4. Post-process logic
5. Execute command on all other nodes
6. Execute command on shards (in case of table DDL); a simplified sketch of this flow follows below
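To make the sequence above concrete, here is a simplified, hedged sketch of how such a utility hook can be wired together. This is not the actual `citus_ProcessUtility` implementation (which handles many more cases); the `DDLJob` handling, `ExecuteDistributedDDLJob`, `foreach_ptr`, and the exact `DistributeObjectOps` callback signatures are assumptions modeled on the Citus code referenced elsewhere in this section.
```c
#include "postgres.h"
#include "nodes/pg_list.h"
#include "tcop/utility.h"

/* previously installed hook, saved when the extension is loaded */
static ProcessUtility_hook_type PrevProcessUtility = NULL;

/* hypothetical sketch of the DDL flow; PG 14+ hook signature */
static void
citus_ProcessUtility_sketch(PlannedStmt *pstmt, const char *queryString,
							bool readOnlyTree, ProcessUtilityContext context,
							ParamListInfo params, QueryEnvironment *queryEnv,
							DestReceiver *dest, QueryCompletion *qc)
{
	Node *parsetree = pstmt->utilityStmt;
	const DistributeObjectOps *ops = GetDistributeObjectOps(parsetree);
	List *ddlJobs = NIL;

	/* 1. qualify object names so deparsing does not depend on search_path */
	if (ops->qualify != NULL)
	{
		ops->qualify(parsetree);
	}

	/* 2. pre-process: may deparse the command and build a list of DDLJobs */
	if (ops->preprocess != NULL)
	{
		ddlJobs = ops->preprocess(parsetree, queryString, context);
	}

	/* 3. run the command locally via the previous ProcessUtility */
	if (PrevProcessUtility != NULL)
	{
		PrevProcessUtility(pstmt, queryString, readOnlyTree, context,
						   params, queryEnv, dest, qc);
	}
	else
	{
		standard_ProcessUtility(pstmt, queryString, readOnlyTree, context,
								params, queryEnv, dest, qc);
	}

	/* 4. post-process: e.g. propagate dependencies; may also return DDLJobs */
	if (ops->postprocess != NULL)
	{
		ddlJobs = list_concat(ddlJobs, ops->postprocess(parsetree, queryString));
	}

	/* 5./6. execute the collected jobs on other nodes and, for table DDL, on shards */
	DDLJob *ddlJob = NULL;
	foreach_ptr(ddlJob, ddlJobs)
	{
		ExecuteDistributedDDLJob(ddlJob);   /* assumed executor helper */
	}
}
```
The key point the sketch captures is the ordering: qualification and pre-processing run before the local execution, dependency propagation and remote task execution run after it.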
@ -1749,6 +1749,66 @@ The reason for handling dependencies and deparsing in post-process step is that
Not all table DDL is currently deparsed. In that case, the original command sent by the client is used. That is a shortcoming in our DDL logic that causes user-facing issues and should be addressed. We do not directly construct a separate DDL command for each shard. Instead, we call the `worker_apply_shard_ddl_command(shardid bigint, ddl_command text)` function which parses the DDL command, replaces the table names with shard names in the parse tree according to the shard ID, and then executes the command. That also has some shortcomings, because we cannot support more complex DDL commands in this manner (e.g. adding multiple foreign keys). Ideally, all DDL would be deparsed, and for table DDL the deparsed query string would have shard names, similar to regular queries.
`markDistributed` is used to indicate whether we add a record to `pg_dist_object` to mark the object as "distributed".
## Defining a new DDL command
All commands that Citus propagates should be defined in a `DistributeObjectOps` struct. Below is a sample `DistributeObjectOps` for the `ALTER DATABASE` command, defined in the [distribute_object_ops.c](commands/distribute_object_ops.c) file.
```c
static DistributeObjectOps Database_Alter = {
	.deparse = DeparseAlterDatabaseStmt,
	.qualify = NULL,
	.preprocess = PreprocessAlterDatabaseStmt,
	.postprocess = NULL,
	.objectType = OBJECT_DATABASE,
	.operationType = DIST_OPS_ALTER,
	.address = NULL,
	.markDistributed = false,
};
```
Each field in the struct is documented in the comments within the `DistributeObjectOps` definition. When defining a new DDL command, follow these guidelines:
- **Returning tasks for `preprocess` and `postprocess`**: Ensure that either `preprocess` or `postprocess` (but not both) returns a list of `DDLJob`s; if both return non-empty lists, an assertion failure is triggered. See the sketch after this list for the typical shape of such a function.
- **Generic `preprocess` and `postprocess` methods**: The generic methods, `PreprocessAlterDistributedObjectStmt` and `PostprocessAlterDistributedObjectStmt`, serve as shared pre- and post-processing implementations used by various statements on distributed objects.
  - The `PreprocessAlterDistributedObjectStmt` method carries out the following operations:
    - Performs a qualification operation.
    - Deparses the statement and generates a task list.
  - As for the `PostprocessAlterDistributedObjectStmt` method, it:
    - Invokes the `EnsureAllObjectDependenciesExistOnAllNodes` function to propagate missing dependencies, both on the coordinator and the workers.
  - Before defining new `preprocess` or `postprocess` methods, check whether these generic methods can be used in your specific case.
- **`deparse`**: When propagating the command to worker nodes, make sure to define `deparse`. This is necessary because it generates a query string for each worker node.
- **`markDistributed`**: Set this flag to true if you want to add a record to the `pg_dist_object` table. This is particularly important for `CREATE` statements when introducing a new object to the system.
- **`address`**: If `markDistributed` is set to true, you must define the `address`. Failure to do so will result in a runtime error. The `address` is required to determine the object address that will be stored in the `pg_dist_object` table.
- **`markDistributed` usage in `DROP` Statements**: Please note that `markDistributed` does not apply to `DROP` statements. For `DROP` statements, you instead need to call `UnmarkObjectDistributed()` for the object, either in `preprocess` or `postprocess`. Otherwise, stale records in the `pg_dist_object` table will cause errors in UDF calls such as `citus_add_node()`, which will try to copy the non-existent database object.
- **`qualify`**: The `qualify` function is used to qualify the objects based on their schemas in the parse tree. It is employed to prevent sensitivity to changes in the `search_path` on worker nodes. Note that it is not mandatory to define this function for all DDL commands; it is only required for commands that involve schema-bound objects, such as tables, types, and functions.
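As a concrete illustration of the first bullet above, here is a hedged sketch of a `preprocess` callback that deparses the statement and returns DDL tasks for the other nodes. The helpers used here (`ShouldPropagate`, `EnsureCoordinator`, `DeparseTreeNode`, `NodeDDLTaskList`, `NON_COORDINATOR_NODES`, `DISABLE_DDL_PROPAGATION`/`ENABLE_DDL_PROPAGATION`) are assumptions modeled on existing Citus command implementations, not verified signatures.
```c
/*
 * Hedged sketch of a preprocess callback: qualification has already run,
 * so deparse the statement and wrap it in a task list for the other nodes.
 */
static List *
PreprocessAlterFooStmtSketch(Node *node, const char *queryString,
							 ProcessUtilityContext processUtilityContext)
{
	if (!ShouldPropagate())
	{
		/* e.g. DDL propagation is disabled for this session */
		return NIL;
	}

	/* this kind of DDL is only propagated from the coordinator */
	EnsureCoordinator();

	/* generate the query string that will be sent to the other nodes */
	char *sql = DeparseTreeNode(node);

	/* wrap the command so the workers do not try to re-propagate it */
	List *commands = list_make3(DISABLE_DDL_PROPAGATION,
								(void *) sql,
								ENABLE_DDL_PROPAGATION);

	/* the utility hook executes the resulting DDLJob list afterwards */
	return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}
```
If the command also needs a `pg_dist_object` record (a `CREATE`-style command), the corresponding `DistributeObjectOps` would additionally set `markDistributed = true` and provide an `address` callback, as described above.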
After defining the `DistributeObjectOps` structure, register it in the `GetDistributeObjectOps()` function so that it is returned for the corresponding statement type, as shown below:
```c
// Example implementation in C code
const DistributeObjectOps *
GetDistributeObjectOps(Node *node)
{
	switch (nodeTag(node))
	{
		case T_AlterDatabaseStmt:
		{
			return &Database_Alter;
		}
		...
```
## Object & dependency propagation

View File

@ -978,7 +978,6 @@ GetAggregateDDLCommand(const RegProcedure funcOid, bool useCreateOrReplace)
char *argmodes = NULL;
int insertorderbyat = -1;
int argsprinted = 0;
int inputargno = 0;
HeapTuple proctup = SearchSysCache1(PROCOID, funcOid);
if (!HeapTupleIsValid(proctup))
@ -1058,7 +1057,6 @@ GetAggregateDDLCommand(const RegProcedure funcOid, bool useCreateOrReplace)
}
}
inputargno++; /* this is a 1-based counter */
if (argsprinted == insertorderbyat)
{
appendStringInfoString(&buf, " ORDER BY ");

View File

@ -175,7 +175,6 @@ BuildCreatePublicationStmt(Oid publicationId)
PUBLICATION_PART_ROOT :
PUBLICATION_PART_LEAF);
Oid relationId = InvalidOid;
int citusTableCount PG_USED_FOR_ASSERTS_ONLY = 0;
/* mainly for consistent ordering in test output */
relationIds = SortList(relationIds, CompareOids);
@ -199,11 +198,6 @@ BuildCreatePublicationStmt(Oid publicationId)
createPubStmt->tables = lappend(createPubStmt->tables, rangeVar);
#endif
if (IsCitusTable(relationId))
{
citusTableCount++;
}
}
/* WITH (publish_via_partition_root = true) option */

View File

@ -184,7 +184,6 @@ ExecuteVacuumOnDistributedTables(VacuumStmt *vacuumStmt, List *relationIdList,
CitusVacuumParams vacuumParams)
{
int relationIndex = 0;
int executedVacuumCount = 0;
Oid relationId = InvalidOid;
foreach_oid(relationId, relationIdList)
@ -197,7 +196,6 @@ ExecuteVacuumOnDistributedTables(VacuumStmt *vacuumStmt, List *relationIdList,
/* local execution is not implemented for VACUUM commands */
bool localExecutionSupported = false;
ExecuteUtilityTaskList(taskList, localExecutionSupported);
executedVacuumCount++;
}
relationIndex++;
}

View File

@ -567,7 +567,7 @@ LogLocalCommand(Task *task)
*
* One slightly different case is modifications to replicated tables
* (e.g., reference tables) where a single task ends in two separate tasks
* and the local task is added to localTaskList and the remaning ones to
* and the local task is added to localTaskList and the remaining ones to
* the remoteTaskList.
*/
void

View File

@ -158,13 +158,6 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
"replication factor.")));
}
/* if we have enough nodes, add an extra placement attempt for backup */
uint32 placementAttemptCount = (uint32) replicationFactor;
if (workerNodeCount > replicationFactor)
{
placementAttemptCount++;
}
/* set shard storage type according to relation type */
char shardStorageType = ShardStorageType(distributedTableId);

View File

@ -702,6 +702,7 @@ DissuadePlannerFromUsingPlan(PlannedStmt *plan)
* Arbitrarily high cost, but low enough that it can be added up
* without overflowing by choose_custom_plan().
*/
Assert(plan != NULL);
plan->planTree->total_cost = FLT_MAX / 100000000;
}

View File

@ -525,8 +525,16 @@ ShardPlacementForFunctionColocatedWithDistTable(DistObjectCacheEntry *procedure,
if (partitionParam->paramkind == PARAM_EXTERN)
{
/* Don't log a message, we should end up here again without a parameter */
/*
* Don't log a message, we should end up here again without a
* parameter.
* Note that "plan" can be null, for example when a CALL statement
* is prepared.
*/
if (plan)
{
DissuadePlannerFromUsingPlan(plan);
}
return NULL;
}
}

View File

@ -90,6 +90,28 @@ activate_node_snapshot(PG_FUNCTION_ARGS)
}
/*
* IsMetadataSynced checks the workers to see if all workers with metadata are
* synced.
*/
static bool
IsMetadataSynced(void)
{
List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerList)
{
if (workerNode->hasMetadata && !workerNode->metadataSynced)
{
return false;
}
}
return true;
}
/*
* wait_until_metadata_sync waits until the maintenance daemon does a metadata
* sync, or times out.
@ -99,19 +121,10 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)
{
uint32 timeout = PG_GETARG_UINT32(0);
List *workerList = ActivePrimaryNonCoordinatorNodeList(NoLock);
bool waitNotifications = false;
WorkerNode *workerNode = NULL;
foreach_ptr(workerNode, workerList)
{
/* if already has metadata, no need to do it again */
if (workerNode->hasMetadata && !workerNode->metadataSynced)
{
waitNotifications = true;
break;
}
}
/* First we start listening. */
MultiConnection *connection = GetNodeConnection(FORCE_NEW_CONNECTION,
LOCAL_HOST_NAME, PostPortNumber);
ExecuteCriticalRemoteCommand(connection, "LISTEN " METADATA_SYNC_CHANNEL);
/*
* If all the metadata nodes have already been synced, we should not wait.
@ -119,15 +132,12 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)
* the notification and we'd wait unnecessarily here. Worse, the test outputs
* might be inconsistent across executions due to the warning.
*/
if (!waitNotifications)
if (IsMetadataSynced())
{
CloseConnection(connection);
PG_RETURN_VOID();
}
MultiConnection *connection = GetNodeConnection(FORCE_NEW_CONNECTION,
LOCAL_HOST_NAME, PostPortNumber);
ExecuteCriticalRemoteCommand(connection, "LISTEN " METADATA_SYNC_CHANNEL);
int waitFlags = WL_SOCKET_READABLE | WL_TIMEOUT | WL_POSTMASTER_DEATH;
int waitResult = WaitLatchOrSocket(NULL, waitFlags, PQsocket(connection->pgConn),
timeout, 0);
@ -139,7 +149,7 @@ wait_until_metadata_sync(PG_FUNCTION_ARGS)
{
ClearResults(connection, true);
}
else if (waitResult & WL_TIMEOUT)
else if (waitResult & WL_TIMEOUT && !IsMetadataSynced())
{
elog(WARNING, "waiting for metadata sync timed out");
}

View File

@ -581,6 +581,14 @@ class QueryRunner(ABC):
        with self.cur(**kwargs) as cur:
            cur.execute(query, params=params)

    def sql_prepared(self, query, params=None, **kwargs):
        """Run an SQL query, with prepare=True
        This opens a new connection and closes it once the query is done
        """
        with self.cur(**kwargs) as cur:
            cur.execute(query, params=params, prepare=True)

    def sql_row(self, query, params=None, allow_empty_result=False, **kwargs):
        """Run an SQL query that returns a single row and returns this row

View File

@ -125,7 +125,6 @@ DEPS = {
"multi_mx_create_table": TestDeps(
None,
[
"multi_test_helpers_superuser",
"multi_mx_node_metadata",
"multi_cluster_management",
"multi_mx_function_table_reference",
@ -176,6 +175,38 @@ DEPS = {
"grant_on_schema_propagation": TestDeps("minimal_schedule"),
"propagate_extension_commands": TestDeps("minimal_schedule"),
"multi_size_queries": TestDeps("base_schedule", ["multi_copy"]),
"multi_mx_node_metadata": TestDeps(
None,
[
"multi_extension",
"multi_test_helpers",
"multi_test_helpers_superuser",
],
),
"multi_mx_function_table_reference": TestDeps(
None,
[
"multi_cluster_management",
"remove_coordinator_from_metadata",
],
# because it queries node group id and it changes as we add / remove nodes
repeatable=False,
),
"multi_mx_add_coordinator": TestDeps(
None,
[
"multi_cluster_management",
"remove_coordinator_from_metadata",
"multi_mx_function_table_reference",
],
),
"metadata_sync_helpers": TestDeps(
None,
[
"multi_mx_node_metadata",
"multi_cluster_management",
],
),
}

View File

@ -0,0 +1,30 @@
def test_call_param(cluster):
    # create a distributed table and an associated distributed procedure
    # to ensure a parameterized CALL succeeds, even when the param is the
    # distribution key.

    coord = cluster.coordinator
    coord.sql("CREATE TABLE test(i int)")
    coord.sql(
        """
        CREATE PROCEDURE p(_i INT) LANGUAGE plpgsql AS $$
        BEGIN
            INSERT INTO test(i) VALUES (_i);
        END; $$
        """
    )

    sql = "CALL p(%s)"

    # prepare/exec before distributing
    coord.sql_prepared(sql, (1,))

    coord.sql("SELECT create_distributed_table('test', 'i')")
    coord.sql(
        "SELECT create_distributed_function('p(int)', distribution_arg_name := '_i', colocate_with := 'test')"
    )

    # prepare/exec after distribution
    coord.sql_prepared(sql, (2,))

    sum_i = coord.sql_value("select sum(i) from test;")
    assert sum_i == 3

View File

@ -107,6 +107,12 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
(1 row)
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
\c - - - :worker_2_port
SET search_path TO "citus_split_test_schema";
-- Replication slots should be cleaned up

View File

@ -696,3 +696,4 @@ SELECT rolname FROM pg_authid WHERE rolname LIKE '%existing%' ORDER BY 1;
(0 rows)
\c - - - :master_port
DROP ROLE nondist_cascade_1, nondist_cascade_2, nondist_cascade_3, dist_cascade;

View File

@ -14,6 +14,8 @@ SELECT citus.mitmproxy('conn.allow()');
(1 row)
SET citus.next_shard_id TO 100800;
-- Needed because of issue #7306
SET citus.force_max_query_parallelization TO true;
-- always try the 1st replica before the 2nd replica.
SET citus.task_assignment_policy TO 'first-replica';
--

View File

@ -277,12 +277,12 @@ CONTEXT: while executing command on localhost:xxxxx
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 2 | 0
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981003 | 2 | 1
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
777 | 4 | citus_shard_split_publication_xxxxxxx_xxxxxxx_xxxxxxx | 2 | 0
@ -336,7 +336,7 @@ CONTEXT: while executing command on localhost:xxxxx
(1 row)
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
@ -388,7 +388,7 @@ CONTEXT: while executing command on localhost:xxxxx
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
@ -455,7 +455,7 @@ CONTEXT: while executing command on localhost:xxxxx
(1 row)
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
@ -507,7 +507,7 @@ CONTEXT: while executing command on localhost:xxxxx
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981000 | 1 | 0
@ -574,7 +574,7 @@ CONTEXT: while executing command on localhost:xxxxx
(1 row)
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)
@ -634,7 +634,7 @@ WARNING: connection to the remote node localhost:xxxxx failed with the followin
ERROR: connection not open
CONTEXT: while executing command on localhost:xxxxx
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
777 | 1 | citus_failure_split_cleanup_schema.table_to_split_8981002 | 1 | 1
@ -701,7 +701,7 @@ CONTEXT: while executing command on localhost:xxxxx
(1 row)
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
operation_id | object_type | object_name | node_group_id | policy_type
---------------------------------------------------------------------
(0 rows)

View File

@ -9,9 +9,14 @@ SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0);
RESET client_min_messages;
-- Kill maintenance daemon so it gets restarted and gets a gpid containing our
-- nodeid
SELECT pg_terminate_backend(pid)
SELECT COUNT(pg_terminate_backend(pid)) >= 0
FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' \gset
WHERE application_name = 'Citus Maintenance Daemon';
?column?
---------------------------------------------------------------------
t
(1 row)
-- reconnect to make sure we get a session with the gpid containing our nodeid
\c - - - -
CREATE SCHEMA global_cancel;
@ -77,6 +82,7 @@ ERROR: must be a superuser to terminate superuser process
SELECT pg_cancel_backend(citus_backend_gpid());
ERROR: canceling statement due to user request
\c - postgres - :master_port
DROP USER global_cancel_user;
SET client_min_messages TO DEBUG;
-- 10000000000 is the node id multiplier for global pid
SELECT pg_cancel_backend(10000000000 * citus_coordinator_nodeid() + 0);

View File

@ -47,16 +47,16 @@ INSERT INTO target_table SELECT * FROM source_table;
INSERT INTO target_table SELECT * FROM source_table;
INSERT INTO target_table SELECT * FROM source_table;
INSERT INTO target_table SELECT * FROM source_table;
SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :worker_2_connections) AS leaked_worker_2_connections;
leaked_worker_1_connections | leaked_worker_2_connections
---------------------------------------------------------------------
0 | 0
(1 row)
END;
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections;
leaked_worker_1_connections | leaked_worker_2_connections
---------------------------------------------------------------------
0 | 0
@ -67,8 +67,8 @@ BEGIN;
INSERT INTO target_table SELECT * FROM source_table;
INSERT INTO target_table SELECT * FROM source_table;
ROLLBACK;
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections;
leaked_worker_1_connections | leaked_worker_2_connections
---------------------------------------------------------------------
0 | 0
@ -84,16 +84,16 @@ SAVEPOINT s1;
INSERT INTO target_table SELECT a, CASE WHEN a < 50 THEN b ELSE null END FROM source_table;
ERROR: null value in column "b" violates not-null constraint
ROLLBACK TO SAVEPOINT s1;
SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :worker_2_connections) AS leaked_worker_2_connections;
leaked_worker_1_connections | leaked_worker_2_connections
---------------------------------------------------------------------
0 | 0
(1 row)
END;
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections;
leaked_worker_1_connections | leaked_worker_2_connections
---------------------------------------------------------------------
0 | 0

View File

@ -94,7 +94,7 @@ step s2-commit:
COMMIT;
starting permutation: s4-record-pid s3-show-activity s5-kill s3-show-activity
starting permutation: s4-record-pid s3-show-activity s5-kill s3-wait-backend-termination
step s4-record-pid:
SELECT pg_backend_pid() INTO selected_pid;
@ -115,12 +115,22 @@ pg_terminate_backend
t
(1 row)
step s3-show-activity:
step s3-wait-backend-termination:
SET ROLE postgres;
select count(*) from get_all_active_transactions() where process_id IN (SELECT * FROM selected_pid);
count
---------------------------------------------------------------------
0
(1 row)
DO $$
DECLARE
i int;
BEGIN
i := 0;
-- try for 5 sec then timeout
WHILE (select count(*) > 0 from get_all_active_transactions() where process_id IN (SELECT * FROM selected_pid))
LOOP
PERFORM pg_sleep(0.1);
i := i + 1;
IF i > 50 THEN
RAISE EXCEPTION 'Timeout while waiting for backend to terminate';
END IF;
END LOOP;
END;
$$;

View File

@ -0,0 +1,68 @@
Parsed test spec with 2 sessions
starting permutation: s1-begin s1-insert s2-begin s2-update-node-1 s1-abort s2-abort
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s1-begin: BEGIN;
step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100);
step s2-begin: BEGIN;
step s2-update-node-1:
-- update a specific node by address
SELECT master_update_node(nodeid, 'localhost', nodeport + 10)
FROM pg_dist_node
WHERE nodename = 'localhost'
AND nodeport = 57637;
<waiting ...>
step s1-abort: ABORT;
step s2-update-node-1: <... completed>
master_update_node
---------------------------------------------------------------------
(1 row)
step s2-abort: ABORT;
master_remove_node
---------------------------------------------------------------------
(2 rows)
starting permutation: s1-begin s1-insert s2-begin s2-update-node-1-force s2-abort s1-abort
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s1-begin: BEGIN;
step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100);
step s2-begin: BEGIN;
step s2-update-node-1-force:
-- update a specific node by address (force)
SELECT master_update_node(nodeid, 'localhost', nodeport + 10, force => true, lock_cooldown => 100)
FROM pg_dist_node
WHERE nodename = 'localhost'
AND nodeport = 57637;
<waiting ...>
step s2-update-node-1-force: <... completed>
master_update_node
---------------------------------------------------------------------
(1 row)
step s2-abort: ABORT;
step s1-abort: ABORT;
FATAL: terminating connection due to administrator command
FATAL: terminating connection due to administrator command
SSL connection has been closed unexpectedly
server closed the connection unexpectedly
master_remove_node
---------------------------------------------------------------------
(2 rows)

View File

@ -32,23 +32,21 @@ CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid
PUBLICATION citus_shard_move_publication_:postgres_oid
WITH (enabled=false, slot_name=citus_shard_move_slot_:postgres_oid);
NOTICE: created replication slot "citus_shard_move_slot_10" on publisher
SELECT count(*) from pg_subscription;
count
SELECT subname from pg_subscription;
subname
---------------------------------------------------------------------
1
citus_shard_move_subscription_10
(1 row)
SELECT count(*) from pg_publication;
count
SELECT pubname from pg_publication;
pubname
---------------------------------------------------------------------
0
(1 row)
(0 rows)
SELECT count(*) from pg_replication_slots;
count
SELECT slot_name from pg_replication_slots;
slot_name
---------------------------------------------------------------------
0
(1 row)
(0 rows)
SELECT count(*) FROM dist;
count
@ -58,22 +56,21 @@ SELECT count(*) FROM dist;
\c - - - :worker_1_port
SET search_path TO logical_replication;
SELECT count(*) from pg_subscription;
count
SELECT subname from pg_subscription;
subname
---------------------------------------------------------------------
0
(0 rows)
SELECT pubname from pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_move_publication_10
(1 row)
SELECT count(*) from pg_publication;
count
SELECT slot_name from pg_replication_slots;
slot_name
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) from pg_replication_slots;
count
---------------------------------------------------------------------
1
citus_shard_move_slot_10
(1 row)
SELECT count(*) FROM dist;
@ -90,25 +87,29 @@ select citus_move_shard_placement(6830002, 'localhost', :worker_1_port, 'localho
(1 row)
SELECT public.wait_for_resource_cleanup();
wait_for_resource_cleanup
---------------------------------------------------------------------
(1 row)
-- the subscription is still there, as there is no cleanup record for it
-- we have created it manually
SELECT count(*) from pg_subscription;
count
SELECT subname from pg_subscription;
subname
---------------------------------------------------------------------
1
citus_shard_move_subscription_10
(1 row)
SELECT count(*) from pg_publication;
count
SELECT pubname from pg_publication;
pubname
---------------------------------------------------------------------
0
(1 row)
(0 rows)
SELECT count(*) from pg_replication_slots;
count
SELECT slot_name from pg_replication_slots;
slot_name
---------------------------------------------------------------------
0
(1 row)
(0 rows)
SELECT count(*) from dist;
count
@ -120,22 +121,21 @@ SELECT count(*) from dist;
SET search_path TO logical_replication;
-- the publication and repslot are still there, as there are no cleanup records for them
-- we have created them manually
SELECT count(*) from pg_subscription;
count
SELECT subname from pg_subscription;
subname
---------------------------------------------------------------------
0
(0 rows)
SELECT pubname from pg_publication;
pubname
---------------------------------------------------------------------
citus_shard_move_publication_10
(1 row)
SELECT count(*) from pg_publication;
count
SELECT slot_name from pg_replication_slots;
slot_name
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) from pg_replication_slots;
count
---------------------------------------------------------------------
1
citus_shard_move_slot_10
(1 row)
SELECT count(*) from dist;
@ -153,23 +153,20 @@ SELECT pg_drop_replication_slot('citus_shard_move_slot_' || :postgres_oid);
\c - - - :worker_2_port
SET search_path TO logical_replication;
SELECT count(*) from pg_subscription;
count
SELECT subname from pg_subscription;
subname
---------------------------------------------------------------------
0
(1 row)
(0 rows)
SELECT count(*) from pg_publication;
count
SELECT pubname from pg_publication;
pubname
---------------------------------------------------------------------
0
(1 row)
(0 rows)
SELECT count(*) from pg_replication_slots;
count
SELECT slot_name from pg_replication_slots;
slot_name
---------------------------------------------------------------------
0
(1 row)
(0 rows)
SELECT count(*) from dist;
count

View File

@ -90,7 +90,7 @@ SELECT citus_disable_node('localhost', :worker_2_port);
(1 row)
SELECT public.wait_until_metadata_sync(60000);
SELECT public.wait_until_metadata_sync(20000);
wait_until_metadata_sync
---------------------------------------------------------------------
@ -812,7 +812,7 @@ SELECT citus_disable_node('localhost', 9999);
(1 row)
SELECT public.wait_until_metadata_sync(60000);
SELECT public.wait_until_metadata_sync(20000);
wait_until_metadata_sync
---------------------------------------------------------------------

View File

@ -124,7 +124,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM mx_add_coo
0
(1 row)
-- test that distributed functions also use local execution
-- test that distributed functions also use sequential execution
CREATE OR REPLACE FUNCTION my_group_id()
RETURNS void
LANGUAGE plpgsql
@ -365,5 +365,6 @@ SELECT verify_metadata('localhost', :worker_1_port),
SET client_min_messages TO error;
DROP SCHEMA mx_add_coordinator CASCADE;
DROP USER reprefuser;
SET search_path TO DEFAULT;
RESET client_min_messages;

View File

@ -3,6 +3,7 @@
--
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1220000;
SET client_min_messages TO WARNING;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
@ -15,6 +16,9 @@ SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
(1 row)
-- cannot drop them at the end of the test file as other tests depend on them
DROP SCHEMA IF EXISTS citus_mx_test_schema, citus_mx_test_schema_join_1, citus_mx_test_schema_join_2 CASCADE;
DROP TABLE IF EXISTS nation_hash, lineitem_mx, orders_mx, customer_mx, nation_mx, part_mx, supplier_mx, mx_ddl_table, limit_orders_mx, multiple_hash_mx, app_analytics_events_mx, researchers_mx, labs_mx, objects_mx, articles_hash_mx, articles_single_shard_hash_mx, company_employees_mx;
-- create schema to test schema support
CREATE SCHEMA citus_mx_test_schema;
CREATE SCHEMA citus_mx_test_schema_join_1;
@ -42,7 +46,7 @@ BEGIN
END;
$$
LANGUAGE 'plpgsql' IMMUTABLE;
CREATE FUNCTION public.immutable_append_mx(old_values int[], new_value int)
CREATE OR REPLACE FUNCTION public.immutable_append_mx(old_values int[], new_value int)
RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE;
CREATE OPERATOR citus_mx_test_schema.=== (
LEFTARG = int,
@ -65,14 +69,16 @@ SELECT quote_ident(current_setting('lc_collate')) as current_locale \gset
\endif
CREATE COLLATION citus_mx_test_schema.english (LOCALE=:current_locale);
CREATE TYPE citus_mx_test_schema.new_composite_type as (key1 text, key2 text);
CREATE TYPE order_side_mx AS ENUM ('buy', 'sell');
CREATE TYPE citus_mx_test_schema.order_side_mx AS ENUM ('buy', 'sell');
-- now create required stuff in the worker 1
\c - - - :worker_1_port
SET client_min_messages TO WARNING;
-- show that we do not support creating citus local tables from mx workers for now
CREATE TABLE citus_local_table(a int);
SELECT citus_add_local_table_to_metadata('citus_local_table');
ERROR: operation is not allowed on this node
HINT: Connect to the coordinator and run it again.
DROP TABLE citus_local_table;
SET search_path TO citus_mx_test_schema;
-- create operator
CREATE OPERATOR citus_mx_test_schema.=== (
@ -85,6 +91,7 @@ CREATE OPERATOR citus_mx_test_schema.=== (
);
-- now create required stuff in the worker 2
\c - - - :worker_2_port
SET client_min_messages TO WARNING;
SET search_path TO citus_mx_test_schema;
-- create operator
CREATE OPERATOR citus_mx_test_schema.=== (
@ -97,6 +104,7 @@ CREATE OPERATOR citus_mx_test_schema.=== (
);
-- connect back to the master, and do some more tests
\c - - - :master_port
SET client_min_messages TO WARNING;
SET citus.shard_replication_factor TO 1;
SET search_path TO public;
CREATE TABLE nation_hash(
@ -315,7 +323,7 @@ CREATE TABLE limit_orders_mx (
symbol text NOT NULL,
bidder_id bigint NOT NULL,
placed_at timestamp NOT NULL,
kind order_side_mx NOT NULL,
kind citus_mx_test_schema.order_side_mx NOT NULL,
limit_price decimal NOT NULL DEFAULT 0.00 CHECK (limit_price >= 0.00)
);
SET citus.shard_count TO 2;
@ -473,6 +481,7 @@ ORDER BY table_name::text;
(23 rows)
\c - - - :worker_1_port
SET client_min_messages TO WARNING;
SELECT table_name, citus_table_type, distribution_column, shard_count, table_owner
FROM citus_tables
ORDER BY table_name::text;
@ -978,6 +987,6 @@ SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards OR
(469 rows)
-- Show that altering type name is not supported from worker node
ALTER TYPE order_side_mx RENAME TO temp_order_side_mx;
ALTER TYPE citus_mx_test_schema.order_side_mx RENAME TO temp_order_side_mx;
ERROR: operation is not allowed on this node
HINT: Connect to the coordinator and run it again.

View File

@ -103,10 +103,11 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM multi_mx_i
4
(1 row)
-- we omit the "SELECT bytes FROM fetch_intermediate_results..." line since it is flaky
SET LOCAL citus.grep_remote_commands TO '%multi_mx_insert_select_repartition%';
insert into target_table SELECT a*2 FROM source_table RETURNING a;
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_4213581_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_4213581_to','SELECT (a OPERATOR(pg_catalog.*) 2) AS a FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE true',0,'hash','{-2147483648,-715827883,715827882}'::text[],'{-715827884,715827881,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_4213583_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_4213583_to','SELECT (a OPERATOR(pg_catalog.*) 2) AS a FROM multi_mx_insert_select_repartition.source_table_4213583 source_table WHERE true',0,'hash','{-2147483648,-715827883,715827882}'::text[],'{-715827884,715827881,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartitioned_results_xxxxx_from_4213582_to_0','repartitioned_results_xxxxx_from_4213584_to_0']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: INSERT INTO multi_mx_insert_select_repartition.target_table_4213585 AS citus_table_alias (a) SELECT intermediate_result.a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213581_to_0,repartitioned_results_xxxxx_from_4213582_to_0,repartitioned_results_xxxxx_from_4213584_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) RETURNING citus_table_alias.a
NOTICE: executing the command locally: INSERT INTO multi_mx_insert_select_repartition.target_table_4213587 AS citus_table_alias (a) SELECT intermediate_result.a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213581_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) RETURNING citus_table_alias.a
a

View File

@ -103,10 +103,11 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM multi_mx_i
4
(1 row)
-- we omit the "SELECT bytes FROM fetch_intermediate_results..." line since it is flaky
SET LOCAL citus.grep_remote_commands TO '%multi_mx_insert_select_repartition%';
insert into target_table SELECT a*2 FROM source_table RETURNING a;
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_4213581_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_4213581_to','SELECT (a OPERATOR(pg_catalog.*) 2) AS a FROM multi_mx_insert_select_repartition.source_table_4213581 source_table WHERE true',0,'hash','{-2147483648,-715827883,715827882}'::text[],'{-715827884,715827881,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT partition_index, 'repartitioned_results_xxxxx_from_4213583_to' || '_' || partition_index::text , rows_written FROM worker_partition_query_result('repartitioned_results_xxxxx_from_4213583_to','SELECT (a OPERATOR(pg_catalog.*) 2) AS a FROM multi_mx_insert_select_repartition.source_table_4213583 source_table WHERE true',0,'hash','{-2147483648,-715827883,715827882}'::text[],'{-715827884,715827881,2147483647}'::text[],true) WHERE rows_written > 0
NOTICE: executing the command locally: SELECT bytes FROM fetch_intermediate_results(ARRAY['repartitioned_results_xxxxx_from_4213582_to_0','repartitioned_results_xxxxx_from_4213584_to_0']::text[],'localhost',57638) bytes
NOTICE: executing the command locally: INSERT INTO multi_mx_insert_select_repartition.target_table_4213585 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213581_to_0,repartitioned_results_xxxxx_from_4213582_to_0,repartitioned_results_xxxxx_from_4213584_to_0}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) RETURNING citus_table_alias.a
NOTICE: executing the command locally: INSERT INTO multi_mx_insert_select_repartition.target_table_4213587 AS citus_table_alias (a) SELECT a FROM read_intermediate_results('{repartitioned_results_xxxxx_from_4213581_to_2}'::text[], 'binary'::citus_copy_format) intermediate_result(a integer) RETURNING citus_table_alias.a
a

View File

@ -9,7 +9,7 @@ SET citus.shard_count TO 8;
SET citus.shard_replication_factor TO 1;
\set VERBOSITY terse
-- Simulates a readonly node by setting default_transaction_read_only.
CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLEAN)
CREATE OR REPLACE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLEAN)
RETURNS TEXT
LANGUAGE sql
AS $$
@ -27,7 +27,7 @@ CREATE OR REPLACE FUNCTION raise_error_in_metadata_sync()
RETURNS void
LANGUAGE C STRICT
AS 'citus';
CREATE PROCEDURE wait_until_process_count(appname text, target_count int) AS $$
CREATE OR REPLACE PROCEDURE wait_until_process_count(appname text, target_count int) AS $$
declare
counter integer := -1;
begin
@ -846,7 +846,22 @@ SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
db_to_drop
(1 row)
DROP DATABASE db_to_drop;
DO $$
DECLARE
i int := 0;
BEGIN
WHILE NOT (SELECT bool_and(success) from run_command_on_all_nodes('DROP DATABASE IF EXISTS db_to_drop'))
LOOP
BEGIN
i := i + 1;
IF i > 5 THEN
RAISE EXCEPTION 'DROP DATABASE timed out';
END IF;
PERFORM pg_sleep(1);
END;
END LOOP;
END;
$$;
SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
datname
---------------------------------------------------------------------

View File

@ -374,12 +374,21 @@ SELECT * FROM run_command_on_workers($$ SELECT 'text_search.config3'::regconfig;
(2 rows)
-- verify they are all removed locally
SELECT 'text_search.config1'::regconfig;
ERROR: text search configuration "text_search.config1" does not exist
SELECT 'text_search.config2'::regconfig;
ERROR: text search configuration "text_search.config2" does not exist
SELECT 'text_search.config3'::regconfig;
ERROR: text search configuration "text_search.config3" does not exist
SELECT 1 FROM pg_ts_config WHERE cfgname = 'config1' AND cfgnamespace = 'text_search'::regnamespace;
?column?
---------------------------------------------------------------------
(0 rows)
SELECT 1 FROM pg_ts_config WHERE cfgname = 'config2' AND cfgnamespace = 'text_search'::regnamespace;
?column?
---------------------------------------------------------------------
(0 rows)
SELECT 1 FROM pg_ts_config WHERE cfgname = 'config3' AND cfgnamespace = 'text_search'::regnamespace;
?column?
---------------------------------------------------------------------
(0 rows)
-- verify that indexes created concurrently that would propagate a TEXT SEARCH CONFIGURATION object
SET citus.enable_ddl_propagation TO off;
CREATE TEXT SEARCH CONFIGURATION concurrent_index_config ( PARSER = default );
@ -434,12 +443,12 @@ $$) ORDER BY 1,2;
CREATE TEXT SEARCH CONFIGURATION text_search.manually_created_wrongly ( copy = french );
-- now we expect manually_created_wrongly(citus_backup_XXX) to show up when querying the configurations
SELECT * FROM run_command_on_workers($$
SELECT array_agg(cfgname) FROM pg_ts_config WHERE cfgname LIKE 'manually_created_wrongly%';
SELECT array_agg(cfgname ORDER BY cfgname) FROM pg_ts_config WHERE cfgname LIKE 'manually_created_wrongly%';
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------
localhost | 57637 | t | {manually_created_wrongly(citus_backup_0),manually_created_wrongly}
localhost | 57638 | t | {manually_created_wrongly(citus_backup_0),manually_created_wrongly}
localhost | 57637 | t | {manually_created_wrongly,manually_created_wrongly(citus_backup_0)}
localhost | 57638 | t | {manually_created_wrongly,manually_created_wrongly(citus_backup_0)}
(2 rows)
-- verify the objects get reused appropriately when the specification is the same
@ -458,7 +467,7 @@ CREATE TEXT SEARCH CONFIGURATION text_search.manually_created_correct ( copy = f
-- now we don't expect manually_created_correct(citus_backup_XXX) to show up when querying the configurations as the
-- original one is reused
SELECT * FROM run_command_on_workers($$
SELECT array_agg(cfgname) FROM pg_ts_config WHERE cfgname LIKE 'manually_created_correct%';
SELECT array_agg(cfgname ORDER BY cfgname) FROM pg_ts_config WHERE cfgname LIKE 'manually_created_correct%';
$$) ORDER BY 1,2;
nodename | nodeport | success | result
---------------------------------------------------------------------

View File

@ -165,7 +165,8 @@ test: with_executors with_join with_partitioning with_transactions with_dml
# Tests around DDL statements run on distributed tables
# ----------
test: multi_index_statements
test: multi_alter_table_statements alter_table_add_column
test: multi_alter_table_statements
test: alter_table_add_column
test: multi_alter_table_add_constraints
test: multi_alter_table_add_constraints_without_name
test: multi_alter_table_add_foreign_key_without_name

View File

@ -107,6 +107,29 @@ step "s3-show-activity"
select count(*) from get_all_active_transactions() where process_id IN (SELECT * FROM selected_pid);
}
step "s3-wait-backend-termination"
{
SET ROLE postgres;
DO $$
DECLARE
i int;
BEGIN
i := 0;
-- try for 5 sec then timeout
WHILE (select count(*) > 0 from get_all_active_transactions() where process_id IN (SELECT * FROM selected_pid))
LOOP
PERFORM pg_sleep(0.1);
i := i + 1;
IF i > 50 THEN
RAISE EXCEPTION 'Timeout while waiting for backend to terminate';
END IF;
END LOOP;
END;
$$;
}
session "s4"
step "s4-record-pid"
@ -123,4 +146,4 @@ step "s5-kill"
permutation "s1-grant" "s1-begin-insert" "s2-begin-insert" "s3-as-admin" "s3-as-user-1" "s3-as-readonly" "s3-as-monitor" "s1-commit" "s2-commit"
permutation "s4-record-pid" "s3-show-activity" "s5-kill" "s3-show-activity"
permutation "s4-record-pid" "s3-show-activity" "s5-kill" "s3-wait-backend-termination"

View File

@ -22,6 +22,7 @@ setup
teardown
{
SELECT wait_until_metadata_sync();
DROP FUNCTION trigger_metadata_sync();
DROP TABLE deadlock_detection_test;
DROP TABLE t2;

View File

@ -79,6 +79,8 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
ARRAY[:worker_2_node, :worker_2_node, :worker_2_node],
'force_logical');
SELECT public.wait_for_resource_cleanup();
\c - - - :worker_2_port
SET search_path TO "citus_split_test_schema";
-- Replication slots should be cleaned up

View File

@ -277,3 +277,5 @@ SELECT rolname FROM pg_authid WHERE rolname LIKE '%existing%' ORDER BY 1;
\c - - - :worker_1_port
SELECT rolname FROM pg_authid WHERE rolname LIKE '%existing%' ORDER BY 1;
\c - - - :master_port
DROP ROLE nondist_cascade_1, nondist_cascade_2, nondist_cascade_3, dist_cascade;

View File

@ -15,6 +15,8 @@ SET client_min_messages TO WARNING;
SELECT citus.mitmproxy('conn.allow()');
SET citus.next_shard_id TO 100800;
-- Needed because of issue #7306
SET citus.force_max_query_parallelization TO true;
-- always try the 1st replica before the 2nd replica.
SET citus.task_assignment_policy TO 'first-replica';

View File

@ -136,7 +136,7 @@ SELECT create_distributed_table('table_to_split', 'id');
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@ -155,7 +155,7 @@ SELECT create_distributed_table('table_to_split', 'id');
\c - postgres - :master_port
SELECT public.wait_for_resource_cleanup();
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
@ -182,7 +182,7 @@ SELECT create_distributed_table('table_to_split', 'id');
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@ -201,7 +201,7 @@ SELECT create_distributed_table('table_to_split', 'id');
\c - postgres - :master_port
SELECT public.wait_for_resource_cleanup();
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
@ -228,7 +228,7 @@ SELECT create_distributed_table('table_to_split', 'id');
ARRAY[:worker_1_node, :worker_2_node],
'force_logical');
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@ -247,7 +247,7 @@ SELECT create_distributed_table('table_to_split', 'id');
\c - postgres - :master_port
SELECT public.wait_for_resource_cleanup();
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;
@ -275,7 +275,7 @@ SELECT create_distributed_table('table_to_split', 'id');
'force_logical');
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
SELECT relname FROM pg_class where relname LIKE '%table_to_split_%' AND relkind = 'r' order by relname;
-- we need to allow connection so that we can connect to proxy
SELECT citus.mitmproxy('conn.allow()');
@ -295,7 +295,7 @@ SELECT create_distributed_table('table_to_split', 'id');
\c - postgres - :master_port
SELECT public.wait_for_resource_cleanup();
SELECT operation_id, object_type, object_name, node_group_id, policy_type
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name;
FROM pg_dist_cleanup where operation_id = 777 ORDER BY object_name, node_group_id;
\c - - - :worker_2_proxy_port
SET search_path TO "citus_failure_split_cleanup_schema", public, pg_catalog;

View File

@ -5,9 +5,9 @@ RESET client_min_messages;
-- Kill maintenance daemon so it gets restarted and gets a gpid containing our
-- nodeid
SELECT pg_terminate_backend(pid)
SELECT COUNT(pg_terminate_backend(pid)) >= 0
FROM pg_stat_activity
WHERE application_name = 'Citus Maintenance Daemon' \gset
WHERE application_name = 'Citus Maintenance Daemon';
-- reconnect to make sure we get a session with the gpid containing our nodeid
\c - - - -
@ -58,6 +58,8 @@ SELECT pg_cancel_backend(citus_backend_gpid());
\c - postgres - :master_port
DROP USER global_cancel_user;
SET client_min_messages TO DEBUG;
-- 10000000000 is the node id multiplier for global pid

View File

@ -33,12 +33,12 @@ INSERT INTO target_table SELECT * FROM source_table;
INSERT INTO target_table SELECT * FROM source_table;
INSERT INTO target_table SELECT * FROM source_table;
INSERT INTO target_table SELECT * FROM source_table;
SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :worker_2_connections) AS leaked_worker_2_connections;
END;
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections;
-- ROLLBACK
BEGIN;
@ -46,8 +46,8 @@ INSERT INTO target_table SELECT * FROM source_table;
INSERT INTO target_table SELECT * FROM source_table;
ROLLBACK;
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections;
\set VERBOSITY TERSE
@ -59,12 +59,12 @@ SELECT worker_connection_count(:worker_1_port) AS worker_1_connections,
SAVEPOINT s1;
INSERT INTO target_table SELECT a, CASE WHEN a < 50 THEN b ELSE null END FROM source_table;
ROLLBACK TO SAVEPOINT s1;
SELECT worker_connection_count(:worker_1_port) - :worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :worker_2_connections) AS leaked_worker_2_connections;
END;
SELECT worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections AS leaked_worker_1_connections,
worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections AS leaked_worker_2_connections;
SELECT GREATEST(0, worker_connection_count(:worker_1_port) - :pre_xact_worker_1_connections) AS leaked_worker_1_connections,
GREATEST(0, worker_connection_count(:worker_2_port) - :pre_xact_worker_2_connections) AS leaked_worker_2_connections;
SET client_min_messages TO WARNING;
DROP SCHEMA insert_select_connection_leak CASCADE;

View File

@ -35,17 +35,17 @@ CREATE SUBSCRIPTION citus_shard_move_subscription_:postgres_oid
WITH (enabled=false, slot_name=citus_shard_move_slot_:postgres_oid);
SELECT count(*) from pg_subscription;
SELECT count(*) from pg_publication;
SELECT count(*) from pg_replication_slots;
SELECT subname from pg_subscription;
SELECT pubname from pg_publication;
SELECT slot_name from pg_replication_slots;
SELECT count(*) FROM dist;
\c - - - :worker_1_port
SET search_path TO logical_replication;
SELECT count(*) from pg_subscription;
SELECT count(*) from pg_publication;
SELECT count(*) from pg_replication_slots;
SELECT subname from pg_subscription;
SELECT pubname from pg_publication;
SELECT slot_name from pg_replication_slots;
SELECT count(*) FROM dist;
\c - - - :master_port
@ -53,11 +53,13 @@ SET search_path TO logical_replication;
select citus_move_shard_placement(6830002, 'localhost', :worker_1_port, 'localhost', :worker_2_port, 'force_logical');
SELECT public.wait_for_resource_cleanup();
-- the subscription is still there, as there is no cleanup record for it
-- we have created it manually
SELECT count(*) from pg_subscription;
SELECT count(*) from pg_publication;
SELECT count(*) from pg_replication_slots;
SELECT subname from pg_subscription;
SELECT pubname from pg_publication;
SELECT slot_name from pg_replication_slots;
SELECT count(*) from dist;
\c - - - :worker_1_port
@ -65,9 +67,9 @@ SET search_path TO logical_replication;
-- the publication and repslot are still there, as there are no cleanup records for them
-- we have created them manually
SELECT count(*) from pg_subscription;
SELECT count(*) from pg_publication;
SELECT count(*) from pg_replication_slots;
SELECT subname from pg_subscription;
SELECT pubname from pg_publication;
SELECT slot_name from pg_replication_slots;
SELECT count(*) from dist;
DROP PUBLICATION citus_shard_move_publication_:postgres_oid;
@ -76,9 +78,9 @@ SELECT pg_drop_replication_slot('citus_shard_move_slot_' || :postgres_oid);
\c - - - :worker_2_port
SET search_path TO logical_replication;
SELECT count(*) from pg_subscription;
SELECT count(*) from pg_publication;
SELECT count(*) from pg_replication_slots;
SELECT subname from pg_subscription;
SELECT pubname from pg_publication;
SELECT slot_name from pg_replication_slots;
SELECT count(*) from dist;
\c - - - :master_port

View File

@ -39,7 +39,7 @@ SELECT master_get_active_worker_nodes();
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
SELECT citus_disable_node('localhost', :worker_2_port);
SELECT public.wait_until_metadata_sync(60000);
SELECT public.wait_until_metadata_sync(20000);
SELECT master_get_active_worker_nodes();
-- add some shard placements to the cluster
@ -328,7 +328,7 @@ SELECT 1 FROM master_add_inactive_node('localhost', 9996, groupid => :worker_2_g
SELECT master_add_inactive_node('localhost', 9999, groupid => :worker_2_group, nodecluster => 'olap', noderole => 'secondary');
SELECT master_activate_node('localhost', 9999);
SELECT citus_disable_node('localhost', 9999);
SELECT public.wait_until_metadata_sync(60000);
SELECT public.wait_until_metadata_sync(20000);
SELECT master_remove_node('localhost', 9999);
-- check that you can't manually add two primaries to a group

View File

@ -67,7 +67,7 @@ SET client_min_messages TO DEBUG;
SELECT count(*) FROM ref;
SELECT count(*) FROM ref;
-- test that distributed functions also use local execution
-- test that distributed functions also use sequential execution
CREATE OR REPLACE FUNCTION my_group_id()
RETURNS void
LANGUAGE plpgsql
@ -190,5 +190,6 @@ SELECT verify_metadata('localhost', :worker_1_port),
SET client_min_messages TO error;
DROP SCHEMA mx_add_coordinator CASCADE;
DROP USER reprefuser;
SET search_path TO DEFAULT;
RESET client_min_messages;

View File

@ -5,9 +5,15 @@
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000;
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1220000;
SET client_min_messages TO WARNING;
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
SELECT start_metadata_sync_to_node('localhost', :worker_2_port);
-- cannot drop them at the end of the test file as other tests depend on them
DROP SCHEMA IF EXISTS citus_mx_test_schema, citus_mx_test_schema_join_1, citus_mx_test_schema_join_2 CASCADE;
DROP TABLE IF EXISTS nation_hash, lineitem_mx, orders_mx, customer_mx, nation_mx, part_mx, supplier_mx, mx_ddl_table, limit_orders_mx, multiple_hash_mx, app_analytics_events_mx, researchers_mx, labs_mx, objects_mx, articles_hash_mx, articles_single_shard_hash_mx, company_employees_mx;
-- create schema to test schema support
CREATE SCHEMA citus_mx_test_schema;
CREATE SCHEMA citus_mx_test_schema_join_1;
@ -38,7 +44,7 @@ END;
$$
LANGUAGE 'plpgsql' IMMUTABLE;
CREATE FUNCTION public.immutable_append_mx(old_values int[], new_value int)
CREATE OR REPLACE FUNCTION public.immutable_append_mx(old_values int[], new_value int)
RETURNS int[] AS $$ SELECT old_values || new_value $$ LANGUAGE SQL IMMUTABLE;
CREATE OPERATOR citus_mx_test_schema.=== (
@ -67,14 +73,16 @@ SELECT quote_ident(current_setting('lc_collate')) as current_locale \gset
CREATE COLLATION citus_mx_test_schema.english (LOCALE=:current_locale);
CREATE TYPE citus_mx_test_schema.new_composite_type as (key1 text, key2 text);
CREATE TYPE order_side_mx AS ENUM ('buy', 'sell');
CREATE TYPE citus_mx_test_schema.order_side_mx AS ENUM ('buy', 'sell');
-- now create required stuff in the worker 1
\c - - - :worker_1_port
SET client_min_messages TO WARNING;
-- show that we do not support creating citus local tables from mx workers for now
CREATE TABLE citus_local_table(a int);
SELECT citus_add_local_table_to_metadata('citus_local_table');
DROP TABLE citus_local_table;
SET search_path TO citus_mx_test_schema;
-- create operator
@ -89,6 +97,7 @@ CREATE OPERATOR citus_mx_test_schema.=== (
-- now create required stuff in the worker 2
\c - - - :worker_2_port
SET client_min_messages TO WARNING;
SET search_path TO citus_mx_test_schema;
@ -104,6 +113,7 @@ CREATE OPERATOR citus_mx_test_schema.=== (
-- connect back to the master, and do some more tests
\c - - - :master_port
SET client_min_messages TO WARNING;
SET citus.shard_replication_factor TO 1;
SET search_path TO public;
@ -308,7 +318,7 @@ CREATE TABLE limit_orders_mx (
symbol text NOT NULL,
bidder_id bigint NOT NULL,
placed_at timestamp NOT NULL,
kind order_side_mx NOT NULL,
kind citus_mx_test_schema.order_side_mx NOT NULL,
limit_price decimal NOT NULL DEFAULT 0.00 CHECK (limit_price >= 0.00)
);
@ -386,6 +396,7 @@ FROM citus_tables
ORDER BY table_name::text;
\c - - - :worker_1_port
SET client_min_messages TO WARNING;
SELECT table_name, citus_table_type, distribution_column, shard_count, table_owner
FROM citus_tables
@ -394,4 +405,4 @@ ORDER BY table_name::text;
SELECT shard_name, table_name, citus_table_type, shard_size FROM citus_shards ORDER BY shard_name::text;
-- Show that altering type name is not supported from worker node
ALTER TYPE order_side_mx RENAME TO temp_order_side_mx;
ALTER TYPE citus_mx_test_schema.order_side_mx RENAME TO temp_order_side_mx;

View File

@ -55,6 +55,8 @@ SET citus.log_local_commands to on;
-- INSERT .. SELECT via repartitioning with local execution
BEGIN;
select count(*) from source_table WHERE a = 1;
-- we omit the "SELECT bytes FROM fetch_intermediate_results..." line since it is flaky
SET LOCAL citus.grep_remote_commands TO '%multi_mx_insert_select_repartition%';
insert into target_table SELECT a*2 FROM source_table RETURNING a;
ROLLBACK;

View File

@ -14,7 +14,7 @@ SET citus.shard_replication_factor TO 1;
\set VERBOSITY terse
-- Simulates a readonly node by setting default_transaction_read_only.
CREATE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLEAN)
CREATE OR REPLACE FUNCTION mark_node_readonly(hostname TEXT, port INTEGER, isreadonly BOOLEAN)
RETURNS TEXT
LANGUAGE sql
AS $$
@ -35,7 +35,7 @@ CREATE OR REPLACE FUNCTION raise_error_in_metadata_sync()
LANGUAGE C STRICT
AS 'citus';
CREATE PROCEDURE wait_until_process_count(appname text, target_count int) AS $$
CREATE OR REPLACE PROCEDURE wait_until_process_count(appname text, target_count int) AS $$
declare
counter integer := -1;
begin
@ -378,7 +378,22 @@ SELECT trigger_metadata_sync();
SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';
DROP DATABASE db_to_drop;
DO $$
DECLARE
i int := 0;
BEGIN
WHILE NOT (SELECT bool_and(success) from run_command_on_all_nodes('DROP DATABASE IF EXISTS db_to_drop'))
LOOP
BEGIN
i := i + 1;
IF i > 5 THEN
RAISE EXCEPTION 'DROP DATABASE timed out';
END IF;
PERFORM pg_sleep(1);
END;
END LOOP;
END;
$$;
SELECT datname FROM pg_stat_activity WHERE application_name LIKE 'Citus Met%';

View File

@ -199,9 +199,9 @@ SELECT * FROM run_command_on_workers($$ SELECT 'text_search.config1'::regconfig;
SELECT * FROM run_command_on_workers($$ SELECT 'text_search.config2'::regconfig; $$) ORDER BY 1,2;
SELECT * FROM run_command_on_workers($$ SELECT 'text_search.config3'::regconfig; $$) ORDER BY 1,2;
-- verify they are all removed locally
SELECT 'text_search.config1'::regconfig;
SELECT 'text_search.config2'::regconfig;
SELECT 'text_search.config3'::regconfig;
SELECT 1 FROM pg_ts_config WHERE cfgname = 'config1' AND cfgnamespace = 'text_search'::regnamespace;
SELECT 1 FROM pg_ts_config WHERE cfgname = 'config2' AND cfgnamespace = 'text_search'::regnamespace;
SELECT 1 FROM pg_ts_config WHERE cfgname = 'config3' AND cfgnamespace = 'text_search'::regnamespace;
-- verify that indexes created concurrently that would propagate a TEXT SEARCH CONFIGURATION object
SET citus.enable_ddl_propagation TO off;
@ -235,7 +235,7 @@ CREATE TEXT SEARCH CONFIGURATION text_search.manually_created_wrongly ( copy = f
-- now we expect manually_created_wrongly(citus_backup_XXX) to show up when querying the configurations
SELECT * FROM run_command_on_workers($$
SELECT array_agg(cfgname) FROM pg_ts_config WHERE cfgname LIKE 'manually_created_wrongly%';
SELECT array_agg(cfgname ORDER BY cfgname) FROM pg_ts_config WHERE cfgname LIKE 'manually_created_wrongly%';
$$) ORDER BY 1,2;
-- verify the objects get reused appropriately when the specification is the same
@ -249,7 +249,7 @@ CREATE TEXT SEARCH CONFIGURATION text_search.manually_created_correct ( copy = f
-- now we don't expect manually_created_correct(citus_backup_XXX) to show up when querying the configurations as the
-- original one is reused
SELECT * FROM run_command_on_workers($$
SELECT array_agg(cfgname) FROM pg_ts_config WHERE cfgname LIKE 'manually_created_correct%';
SELECT array_agg(cfgname ORDER BY cfgname) FROM pg_ts_config WHERE cfgname LIKE 'manually_created_correct%';
$$) ORDER BY 1,2;
CREATE SCHEMA "Text Search Requiring Quote's";