diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile index e97169c6c..1ea5260b6 100644 --- a/src/backend/distributed/Makefile +++ b/src/backend/distributed/Makefile @@ -11,7 +11,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \ 6.0-1 6.0-2 6.0-3 6.0-4 6.0-5 6.0-6 6.0-7 6.0-8 6.0-9 6.0-10 6.0-11 6.0-12 6.0-13 6.0-14 6.0-15 6.0-16 6.0-17 6.0-18 \ 6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \ 6.2-1 6.2-2 6.2-3 6.2-4 \ - 7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7 7.0-8 7.0-9 7.0-10 7.0-11 + 7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7 7.0-8 7.0-9 7.0-10 7.0-11 7.0-12 # All citus--*.sql files in the source directory DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) @@ -161,6 +161,8 @@ $(EXTENSION)--7.0-10.sql: $(EXTENSION)--7.0-9.sql $(EXTENSION)--7.0-9--7.0-10.sq cat $^ > $@ $(EXTENSION)--7.0-11.sql: $(EXTENSION)--7.0-10.sql $(EXTENSION)--7.0-10--7.0-11.sql cat $^ > $@ +$(EXTENSION)--7.0-12.sql: $(EXTENSION)--7.0-11.sql $(EXTENSION)--7.0-11--7.0-12.sql + cat $^ > $@ NO_PGXS = 1 diff --git a/src/backend/distributed/citus--7.0-11--7.0-12.sql b/src/backend/distributed/citus--7.0-11--7.0-12.sql new file mode 100644 index 000000000..8b987afec --- /dev/null +++ b/src/backend/distributed/citus--7.0-11--7.0-12.sql @@ -0,0 +1,8 @@ +/* citus--7.0-11--7.0-12.sql */ + +CREATE OR REPLACE FUNCTION pg_catalog.citus_create_restore_point(text) +RETURNS pg_lsn +LANGUAGE C STRICT +AS 'MODULE_PATHNAME', $$citus_create_restore_point$$; +COMMENT ON FUNCTION pg_catalog.citus_create_restore_point(text) +IS 'temporarily block writes and create a named restore point on all nodes'; diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control index 756c98ce6..9311aa71f 100644 --- a/src/backend/distributed/citus.control +++ b/src/backend/distributed/citus.control @@ -1,6 +1,6 @@ # Citus extension comment = 'Citus distributed database' -default_version = '7.0-11' +default_version = '7.0-12' module_pathname = '$libdir/citus' relocatable = false schema = pg_catalog diff --git a/src/backend/distributed/master/citus_create_restore_point.c b/src/backend/distributed/master/citus_create_restore_point.c new file mode 100644 index 000000000..7c9c0015a --- /dev/null +++ b/src/backend/distributed/master/citus_create_restore_point.c @@ -0,0 +1,205 @@ +/*------------------------------------------------------------------------- + * + * citus_create_restore_point.c + * + * UDF for creating a consistent restore point across all nodes. + * + * Copyright (c) 2017, Citus Data, Inc. + * + *------------------------------------------------------------------------- + */ + +#include "postgres.h" +#include "libpq-fe.h" + +#include "access/xlog.h" +#include "access/xlog_internal.h" +#include "catalog/pg_type.h" +#include "distributed/connection_management.h" +#include "distributed/master_metadata_utility.h" +#include "distributed/metadata_cache.h" +#include "distributed/remote_commands.h" +#include "nodes/pg_list.h" +#include "storage/lmgr.h" +#include "storage/lock.h" +#include "utils/builtins.h" +#include "utils/pg_lsn.h" + + +#define CREATE_RESTORE_POINT_COMMAND "SELECT pg_catalog.pg_create_restore_point($1::text)" + + +/* local functions forward declarations */ +static List * OpenConnectionsToAllNodes(void); +static void BlockAllDistributedWrites(void); +static void CreateRemoteRestorePoints(char *restoreName, List *connectionList); + + +/* exports for SQL callable functions */ +PG_FUNCTION_INFO_V1(citus_create_restore_point); + + +/* + * citus_create_restore_point blocks writes to distributed tables and then + * runs pg_create_restore_point on all nodes. This creates a consistent + * restore point under the assumption that there are no other writers + * than the coordinator. + */ +Datum +citus_create_restore_point(PG_FUNCTION_ARGS) +{ + text *restoreNameText = PG_GETARG_TEXT_P(0); + char *restoreNameString = NULL; + XLogRecPtr localRestorePoint = InvalidXLogRecPtr; + List *connectionList = NIL; + + CheckCitusVersion(ERROR); + EnsureSuperUser(); + EnsureCoordinator(); + + if (RecoveryInProgress()) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + (errmsg("recovery is in progress"), + errhint("WAL control functions cannot be executed during recovery.")))); + } + + if (!XLogIsNeeded()) + { + ereport(ERROR, + (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), + errmsg("WAL level not sufficient for creating a restore point"), + errhint("wal_level must be set to \"replica\" or \"logical\" at server " + "start."))); + } + + restoreNameString = text_to_cstring(restoreNameText); + if (strlen(restoreNameString) >= MAXFNAMELEN) + { + ereport(ERROR, + (errcode(ERRCODE_INVALID_PARAMETER_VALUE), + errmsg("value too long for restore point (maximum %d characters)", + MAXFNAMELEN - 1))); + } + + /* establish connections to all nodes before taking any locks */ + connectionList = OpenConnectionsToAllNodes(); + + /* + * Send a BEGIN to bust through pgbouncer. We won't actually commit since + * that takes time. Instead we just close the connections and roll back, + * which doesn't undo pg_create_restore_point. + */ + RemoteTransactionListBegin(connectionList); + + /* DANGER: finish as quickly as possible after this */ + BlockAllDistributedWrites(); + + /* do local restore point first to bail out early if something goes wrong */ + localRestorePoint = XLogRestorePoint(restoreNameString); + + /* run pg_create_restore_point on all nodes */ + CreateRemoteRestorePoints(restoreNameString, connectionList); + + PG_RETURN_LSN(localRestorePoint); +} + + +/* + * OpenConnectionsToAllNodes opens connections to all nodes and returns the list + * of connections. + */ +static List * +OpenConnectionsToAllNodes(void) +{ + List *connectionList = NIL; + List *workerNodeList = NIL; + ListCell *workerNodeCell = NULL; + int connectionFlags = FORCE_NEW_CONNECTION; + + workerNodeList = ActivePrimaryNodeList(); + + foreach(workerNodeCell, workerNodeList) + { + WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell); + MultiConnection *connection = NULL; + + connection = StartNodeConnection(connectionFlags, workerNode->workerName, + workerNode->workerPort); + MarkRemoteTransactionCritical(connection); + + connectionList = lappend(connectionList, connection); + } + + FinishConnectionListEstablishment(connectionList); + + return connectionList; +} + + +/* + * BlockAllDistributedWrites blocks all modifications to distributed tables + * by taking ShareRowExclusive locks on all distributed tables. + */ +static void +BlockAllDistributedWrites(void) +{ + ListCell *distributedTableCell = NULL; + List *distributedTableList = DistributedTableList(); + + LockRelationOid(DistNodeRelationId(), ExclusiveLock); + LockRelationOid(DistPartitionRelationId(), ExclusiveLock); + + foreach(distributedTableCell, distributedTableList) + { + DistTableCacheEntry *cacheEntry = + (DistTableCacheEntry *) lfirst(distributedTableCell); + + /* block all modifications */ + LockRelationOid(cacheEntry->relationId, ShareRowExclusiveLock); + } +} + + +/* + * CreateRemoteRestorePoints creates a restore point via each of the + * connections in the list in parallel. + */ +static void +CreateRemoteRestorePoints(char *restoreName, List *connectionList) +{ + ListCell *connectionCell = NULL; + int parameterCount = 1; + Oid parameterTypes[1] = { TEXTOID }; + const char *parameterValues[1] = { restoreName }; + + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + + int querySent = SendRemoteCommandParams(connection, CREATE_RESTORE_POINT_COMMAND, + parameterCount, parameterTypes, + parameterValues); + if (querySent == 0) + { + ReportConnectionError(connection, ERROR); + } + } + + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + + PGresult *result = GetRemoteCommandResult(connection, true); + if (!IsResponseOK(result)) + { + ReportResultError(connection, result, ERROR); + } + + PQclear(result); + + ForgetResults(connection); + CloseConnection(connection); + } +} diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 330b16581..30e1be29a 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -126,6 +126,33 @@ RemoteTransactionBegin(struct MultiConnection *connection) } +/* + * RemoteTransactionListBegin sends BEGIN over all connections in the + * given connection list and waits for all of them to finish. + */ +void +RemoteTransactionListBegin(List *connectionList) +{ + ListCell *connectionCell = NULL; + + /* send BEGIN to all nodes */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + + StartRemoteTransactionBegin(connection); + } + + /* wait for BEGIN to finish on all nodes */ + foreach(connectionCell, connectionList) + { + MultiConnection *connection = (MultiConnection *) lfirst(connectionCell); + + FinishRemoteTransactionBegin(connection); + } +} + + /* * StartRemoteTransactionCommit initiates transaction commit in a non-blocking * manner. If the transaction is in a failed state, it'll instead get rolled diff --git a/src/include/distributed/remote_transaction.h b/src/include/distributed/remote_transaction.h index e0ffb5dcd..45743c9b8 100644 --- a/src/include/distributed/remote_transaction.h +++ b/src/include/distributed/remote_transaction.h @@ -70,6 +70,7 @@ typedef struct RemoteTransaction extern void StartRemoteTransactionBegin(struct MultiConnection *connection); extern void FinishRemoteTransactionBegin(struct MultiConnection *connection); extern void RemoteTransactionBegin(struct MultiConnection *connection); +extern void RemoteTransactionListBegin(List *connectionList); extern void StartRemoteTransactionPrepare(struct MultiConnection *connection); extern void FinishRemoteTransactionPrepare(struct MultiConnection *connection); diff --git a/src/test/regress/expected/isolation_create_restore_point.out b/src/test/regress/expected/isolation_create_restore_point.out new file mode 100644 index 000000000..f4a29402f --- /dev/null +++ b/src/test/regress/expected/isolation_create_restore_point.out @@ -0,0 +1,182 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s1-create-distributed s2-create-restore s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + +step s1-create-distributed: + CREATE TABLE test_create_distributed_table (test_id integer NOT NULL, data text); + SELECT create_distributed_table('test_create_distributed_table', 'test_id'); + +create_distributed_table + + +step s2-create-restore: + SELECT 1 FROM citus_create_restore_point('citus-test'); + +step s1-commit: + COMMIT; + +step s2-create-restore: <... completed> +?column? + +1 + +starting permutation: s1-begin s1-insert s2-create-restore s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + +step s1-insert: + INSERT INTO restore_table VALUES (1,'hello'); + +step s2-create-restore: + SELECT 1 FROM citus_create_restore_point('citus-test'); + +step s1-commit: + COMMIT; + +step s2-create-restore: <... completed> +?column? + +1 + +starting permutation: s1-begin s1-modify-multiple s2-create-restore s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + +step s1-modify-multiple: + SELECT master_modify_multiple_shards($$UPDATE restore_table SET data = 'world'$$); + +master_modify_multiple_shards + +0 +step s2-create-restore: + SELECT 1 FROM citus_create_restore_point('citus-test'); + +step s1-commit: + COMMIT; + +step s2-create-restore: <... completed> +?column? + +1 + +starting permutation: s1-begin s1-ddl s2-create-restore s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + +step s1-ddl: + ALTER TABLE restore_table ADD COLUMN x int; + +step s2-create-restore: + SELECT 1 FROM citus_create_restore_point('citus-test'); + +step s1-commit: + COMMIT; + +step s2-create-restore: <... completed> +?column? + +1 + +starting permutation: s1-begin s1-copy s2-create-restore s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + +step s1-copy: + COPY restore_table FROM PROGRAM 'echo 1,hello' WITH CSV; + +step s2-create-restore: + SELECT 1 FROM citus_create_restore_point('citus-test'); + +step s1-commit: + COMMIT; + +step s2-create-restore: <... completed> +?column? + +1 + +starting permutation: s1-begin s1-drop s2-create-restore s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + +step s1-drop: + DROP TABLE restore_table; + +step s2-create-restore: + SELECT 1 FROM citus_create_restore_point('citus-test'); + +step s1-commit: + COMMIT; + +step s2-create-restore: <... completed> +?column? + +1 + +starting permutation: s1-begin s1-add-node s2-create-restore s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + +step s1-add-node: + SELECT 1 FROM master_add_inactive_node('localhost', 9999); + +?column? + +1 +step s2-create-restore: + SELECT 1 FROM citus_create_restore_point('citus-test'); + +step s1-commit: + COMMIT; + +step s2-create-restore: <... completed> +?column? + +1 + +starting permutation: s1-begin s1-remove-node s2-create-restore s1-commit +create_distributed_table + + +step s1-begin: + BEGIN; + +step s1-remove-node: + SELECT master_remove_node('localhost', 9999); + +master_remove_node + + +step s2-create-restore: + SELECT 1 FROM citus_create_restore_point('citus-test'); + +step s1-commit: + COMMIT; + +step s2-create-restore: <... completed> +?column? + +1 diff --git a/src/test/regress/expected/multi_extension.out b/src/test/regress/expected/multi_extension.out index 58cfbe626..f861eea78 100644 --- a/src/test/regress/expected/multi_extension.out +++ b/src/test/regress/expected/multi_extension.out @@ -121,6 +121,7 @@ ALTER EXTENSION citus UPDATE TO '7.0-8'; ALTER EXTENSION citus UPDATE TO '7.0-9'; ALTER EXTENSION citus UPDATE TO '7.0-10'; ALTER EXTENSION citus UPDATE TO '7.0-11'; +ALTER EXTENSION citus UPDATE TO '7.0-12'; -- show running version SHOW citus.version; citus.version diff --git a/src/test/regress/expected/multi_utilities.out b/src/test/regress/expected/multi_utilities.out index 381ca9fba..898c9b7f5 100644 --- a/src/test/regress/expected/multi_utilities.out +++ b/src/test/regress/expected/multi_utilities.out @@ -348,6 +348,13 @@ SELECT worker_hash('(1, 2)'::test_composite_type); SELECT citus_truncate_trigger(); ERROR: must be called as trigger +-- confirm that citus_create_restore_point works +SELECT 1 FROM citus_create_restore_point('regression-test'); + ?column? +---------- + 1 +(1 row) + -- TODO: support VERBOSE -- VACUUM VERBOSE dustbunnies; -- VACUUM (FULL, VERBOSE) dustbunnies; diff --git a/src/test/regress/isolation_schedule b/src/test/regress/isolation_schedule index 135fbad55..5e31d931c 100644 --- a/src/test/regress/isolation_schedule +++ b/src/test/regress/isolation_schedule @@ -16,3 +16,7 @@ test: isolation_distributed_transaction_id isolation_progress_monitoring test: isolation_dump_local_wait_edges isolation_dump_global_wait_edges test: isolation_replace_wait_function + +# creating a restore point briefly blocks all +# writes, run this test serially. +test: isolation_create_restore_point diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 3fda96e55..cd3f03286 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -226,6 +226,7 @@ push(@pgOptions, '-c', "listen_addresses='${host}'"); push(@pgOptions, '-c', "unix_socket_directories="); push(@pgOptions, '-c', "fsync=off"); push(@pgOptions, '-c', "shared_preload_libraries=citus"); +push(@pgOptions, '-c', "wal_level=logical"); # Citus options set for the tests push(@pgOptions, '-c', "citus.shard_max_size=300kB"); diff --git a/src/test/regress/specs/isolation_create_restore_point.spec b/src/test/regress/specs/isolation_create_restore_point.spec new file mode 100644 index 000000000..ed9bc7078 --- /dev/null +++ b/src/test/regress/specs/isolation_create_restore_point.spec @@ -0,0 +1,94 @@ +setup +{ + CREATE TABLE restore_table (test_id integer NOT NULL, data text); + SELECT create_distributed_table('restore_table', 'test_id'); +} + +teardown +{ + DROP TABLE IF EXISTS restore_table, test_create_distributed_table; +} + +session "s1" + +step "s1-begin" +{ + BEGIN; +} + +step "s1-create-distributed" +{ + CREATE TABLE test_create_distributed_table (test_id integer NOT NULL, data text); + SELECT create_distributed_table('test_create_distributed_table', 'test_id'); +} + +step "s1-insert" +{ + INSERT INTO restore_table VALUES (1,'hello'); +} + +step "s1-modify-multiple" +{ + SELECT master_modify_multiple_shards($$UPDATE restore_table SET data = 'world'$$); +} + +step "s1-ddl" +{ + ALTER TABLE restore_table ADD COLUMN x int; +} + +step "s1-copy" +{ + COPY restore_table FROM PROGRAM 'echo 1,hello' WITH CSV; +} + +step "s1-drop" +{ + DROP TABLE restore_table; +} + +step "s1-add-node" +{ + SELECT 1 FROM master_add_inactive_node('localhost', 9999); +} + +step "s1-remove-node" +{ + SELECT master_remove_node('localhost', 9999); +} + +step "s1-commit" +{ + COMMIT; +} + +session "s2" + +step "s2-create-restore" +{ + SELECT 1 FROM citus_create_restore_point('citus-test'); +} + +# verify that citus_create_restore_point is blocked by concurrent create_distributed_table +permutation "s1-begin" "s1-create-distributed" "s2-create-restore" "s1-commit" + +# verify that citus_create_restore_point is blocked by concurrent INSERT +permutation "s1-begin" "s1-insert" "s2-create-restore" "s1-commit" + +# verify that citus_create_restore_point is blocked by concurrent master_modify_multiple_shards +permutation "s1-begin" "s1-modify-multiple" "s2-create-restore" "s1-commit" + +# verify that citus_create_restore_point is blocked by concurrent DDL +permutation "s1-begin" "s1-ddl" "s2-create-restore" "s1-commit" + +# verify that citus_create_restore_point is blocked by concurrent COPY +permutation "s1-begin" "s1-copy" "s2-create-restore" "s1-commit" + +# verify that citus_create_restore_point is blocked by concurrent DROP TABLE +permutation "s1-begin" "s1-drop" "s2-create-restore" "s1-commit" + +# verify that citus_create_restore_point is blocked by concurrent master_add_node +permutation "s1-begin" "s1-add-node" "s2-create-restore" "s1-commit" + +# verify that citus_create_restore_point is blocked by concurrent master_remove_node +permutation "s1-begin" "s1-remove-node" "s2-create-restore" "s1-commit" diff --git a/src/test/regress/sql/multi_extension.sql b/src/test/regress/sql/multi_extension.sql index dd5f7fc2a..027152af6 100644 --- a/src/test/regress/sql/multi_extension.sql +++ b/src/test/regress/sql/multi_extension.sql @@ -121,6 +121,7 @@ ALTER EXTENSION citus UPDATE TO '7.0-8'; ALTER EXTENSION citus UPDATE TO '7.0-9'; ALTER EXTENSION citus UPDATE TO '7.0-10'; ALTER EXTENSION citus UPDATE TO '7.0-11'; +ALTER EXTENSION citus UPDATE TO '7.0-12'; -- show running version SHOW citus.version; diff --git a/src/test/regress/sql/multi_utilities.sql b/src/test/regress/sql/multi_utilities.sql index a1d6cd433..dae0ca2f7 100644 --- a/src/test/regress/sql/multi_utilities.sql +++ b/src/test/regress/sql/multi_utilities.sql @@ -224,6 +224,9 @@ SELECT worker_hash('(1, 2)'::test_composite_type); SELECT citus_truncate_trigger(); +-- confirm that citus_create_restore_point works +SELECT 1 FROM citus_create_restore_point('regression-test'); + -- TODO: support VERBOSE -- VACUUM VERBOSE dustbunnies; -- VACUUM (FULL, VERBOSE) dustbunnies;