From c16dec88c3a468dfd559a59227f377e6f13ab2ca Mon Sep 17 00:00:00 2001
From: Murat Tuncer
Date: Thu, 8 Sep 2016 16:50:57 +0300
Subject: [PATCH] Add UDF master_expire_table_cache

---
 src/backend/distributed/Makefile                   |   4 +-
 .../distributed/citus--5.2-1--5.2-2.sql            |   2 +
 .../distributed/citus--5.2-2--5.2-3.sql            |   5 +
 src/backend/distributed/citus.control              |   2 +-
 .../master/master_delete_protocol.c                |  61 ------
 .../master/master_expire_table_cache.c             | 191 ++++++++++++++++++
 .../worker/worker_data_fetch_protocol.c            |  59 ++++++
 src/include/distributed/worker_protocol.h          |   2 +
 .../expected/multi_expire_table_cache.out          | 109 ++++++++++
 .../regress/expected/multi_router_planner.out      |   2 +-
 src/test/regress/multi_schedule                    |   4 +
 .../regress/sql/multi_expire_table_cache.sql       |  54 +++++
 12 files changed, 431 insertions(+), 64 deletions(-)
 create mode 100644 src/backend/distributed/citus--5.2-2--5.2-3.sql
 create mode 100644 src/backend/distributed/master/master_expire_table_cache.c
 create mode 100644 src/test/regress/expected/multi_expire_table_cache.out
 create mode 100644 src/test/regress/sql/multi_expire_table_cache.sql

diff --git a/src/backend/distributed/Makefile b/src/backend/distributed/Makefile
index f7758f978..9fa8fcf5f 100644
--- a/src/backend/distributed/Makefile
+++ b/src/backend/distributed/Makefile
@@ -7,7 +7,7 @@ MODULE_big = citus
 EXTENSION = citus
 EXTVERSIONS = 5.0 5.0-1 5.0-2 \
 	5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
-	5.2-1 5.2-2
+	5.2-1 5.2-2 5.2-3
 
 # All citus--*.sql files in the source directory
 DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@@ -54,6 +54,8 @@ $(EXTENSION)--5.2-1.sql: $(EXTENSION)--5.1-8.sql $(EXTENSION)--5.1-8--5.2-1.sql
 	cat $^ > $@
 $(EXTENSION)--5.2-2.sql: $(EXTENSION)--5.2-1.sql $(EXTENSION)--5.2-1--5.2-2.sql
 	cat $^ > $@
+$(EXTENSION)--5.2-3.sql: $(EXTENSION)--5.2-2.sql $(EXTENSION)--5.2-2--5.2-3.sql
+	cat $^ > $@
 
 NO_PGXS = 1
 
diff --git a/src/backend/distributed/citus--5.2-1--5.2-2.sql b/src/backend/distributed/citus--5.2-1--5.2-2.sql
index c0f98d52f..6232b1c22 100644
--- a/src/backend/distributed/citus--5.2-1--5.2-2.sql
+++ b/src/backend/distributed/citus--5.2-1--5.2-2.sql
@@ -1,3 +1,5 @@
+/* citus--5.2-1--5.2-2.sql */
+
 CREATE OR REPLACE FUNCTION pg_catalog.citus_truncate_trigger()
     RETURNS trigger
     LANGUAGE plpgsql
diff --git a/src/backend/distributed/citus--5.2-2--5.2-3.sql b/src/backend/distributed/citus--5.2-2--5.2-3.sql
new file mode 100644
index 000000000..61a0d859e
--- /dev/null
+++ b/src/backend/distributed/citus--5.2-2--5.2-3.sql
@@ -0,0 +1,5 @@
+/* citus--5.2-2--5.2-3.sql */
+CREATE OR REPLACE FUNCTION master_expire_table_cache(table_name regclass)
+    RETURNS VOID
+    LANGUAGE C STRICT
+    AS 'MODULE_PATHNAME', $$master_expire_table_cache$$;
diff --git a/src/backend/distributed/citus.control b/src/backend/distributed/citus.control
index f8dcc6c7e..ddb8ccdfe 100644
--- a/src/backend/distributed/citus.control
+++ b/src/backend/distributed/citus.control
@@ -1,6 +1,6 @@
 # Citus extension
 comment = 'Citus distributed database'
-default_version = '5.2-2'
+default_version = '5.2-3'
 module_pathname = '$libdir/citus'
 relocatable = false
 schema = pg_catalog
diff --git a/src/backend/distributed/master/master_delete_protocol.c b/src/backend/distributed/master/master_delete_protocol.c
index bb9a9eda5..f25d0a780 100644
--- a/src/backend/distributed/master/master_delete_protocol.c
+++ b/src/backend/distributed/master/master_delete_protocol.c
@@ -60,8 +60,6 @@ static List * ShardsMatchingDeleteCriteria(Oid relationId, List *shardList,
 										   Node *deleteCriteria);
 static int DropShards(Oid relationId, char *schemaName, char *relationName,
 					  List *deletableShardIntervalList);
-static bool ExecuteRemoteCommand(const char *nodeName, uint32 nodePort,
-								 StringInfo queryString);
 
 
 /* exports for SQL callable functions */
@@ -556,62 +554,3 @@ ShardsMatchingDeleteCriteria(Oid relationId, List *shardIntervalList,
 
 	return dropShardIntervalList;
 }
-
-
-/*
- * ExecuteRemoteCommand executes the given SQL command. This command could be an
- * Insert, Update, or Delete statement, or a utility command that returns
- * nothing. If query is successfuly executed, the function returns true.
- * Otherwise, it returns false.
- */
-static bool
-ExecuteRemoteCommand(const char *nodeName, uint32 nodePort, StringInfo queryString)
-{
-	char *nodeDatabase = get_database_name(MyDatabaseId);
-	int32 connectionId = -1;
-	QueryStatus queryStatus = CLIENT_INVALID_QUERY;
-	bool querySent = false;
-	bool queryReady = false;
-	bool queryDone = false;
-
-	connectionId = MultiClientConnect(nodeName, nodePort, nodeDatabase, NULL);
-	if (connectionId == INVALID_CONNECTION_ID)
-	{
-		return false;
-	}
-
-	querySent = MultiClientSendQuery(connectionId, queryString->data);
-	if (!querySent)
-	{
-		MultiClientDisconnect(connectionId);
-		return false;
-	}
-
-	while (!queryReady)
-	{
-		ResultStatus resultStatus = MultiClientResultStatus(connectionId);
-		if (resultStatus == CLIENT_RESULT_READY)
-		{
-			queryReady = true;
-		}
-		else if (resultStatus == CLIENT_RESULT_BUSY)
-		{
-			long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L;
-			pg_usleep(sleepIntervalPerCycle);
-		}
-		else
-		{
-			MultiClientDisconnect(connectionId);
-			return false;
-		}
-	}
-
-	queryStatus = MultiClientQueryStatus(connectionId);
-	if (queryStatus == CLIENT_QUERY_DONE)
-	{
-		queryDone = true;
-	}
-
-	MultiClientDisconnect(connectionId);
-	return queryDone;
-}
diff --git a/src/backend/distributed/master/master_expire_table_cache.c b/src/backend/distributed/master/master_expire_table_cache.c
new file mode 100644
index 000000000..f40a3eec3
--- /dev/null
+++ b/src/backend/distributed/master/master_expire_table_cache.c
@@ -0,0 +1,191 @@
+/*-------------------------------------------------------------------------
+ *
+ * master_expire_table_cache.c
+ *	  UDF to expire cached shards on worker nodes
+ *
+ * This file contains the master_expire_table_cache function. The function
+ * accepts a table name and drops the table's cached shards from all
+ * workers. It does not change existing shard placements; it only drops
+ * cached copies of shards.
+ *
+ * Copyright (c) 2012-2016, Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "funcapi.h"
+
+#include "catalog/pg_class.h"
+#include "distributed/master_protocol.h"
+#include "distributed/metadata_cache.h"
+#include "distributed/multi_join_order.h"
+#include "distributed/pg_dist_shard.h"
+#include "distributed/worker_manager.h"
+#include "distributed/worker_protocol.h"
+#include "utils/builtins.h"
+#include "utils/lsyscache.h"
+
+
+static List * FindAbsentShardPlacementsOnWorker(WorkerNode *workerNode,
+												ShardInterval **shardIntervalArray,
+												List **placementListArray,
+												int shardCount);
+static void DropShardsFromWorker(WorkerNode *workerNode, Oid relationId,
+								 List *shardIntervalList);
+
+PG_FUNCTION_INFO_V1(master_expire_table_cache);
+
+
+/*
+ * master_expire_table_cache drops the table's cached shards on all workers.
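+ *
+ * A typical invocation from the master node (the table name here is only
+ * an illustration) is:
+ *
+ *   SELECT master_expire_table_cache('broadcast_table');
+ *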
+ * The function expects the given table to be a small distributed table,
+ * i.e., one with fewer shards than large_table_shard_count.
+ */
+Datum
+master_expire_table_cache(PG_FUNCTION_ARGS)
+{
+	Oid relationId = PG_GETARG_OID(0);
+	DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
+	List *workerNodeList = WorkerNodeList();
+	ListCell *workerNodeCell = NULL;
+	int shardCount = cacheEntry->shardIntervalArrayLength;
+	ShardInterval **shardIntervalArray = cacheEntry->sortedShardIntervalArray;
+	List **placementListArray = NULL;
+	int shardIndex = 0;
+
+	if (shardCount == 0)
+	{
+		ereport(WARNING, (errmsg("table has no shards, no action is taken")));
+		PG_RETURN_VOID();
+	}
+
+	if (shardCount >= LargeTableShardCount)
+	{
+		ereport(ERROR, (errmsg("must be called on tables with fewer than %d shards",
+							   LargeTableShardCount)));
+	}
+
+	placementListArray = palloc(shardCount * sizeof(List *));
+
+	for (shardIndex = 0; shardIndex < shardCount; shardIndex++)
+	{
+		ShardInterval *shardInterval = shardIntervalArray[shardIndex];
+		placementListArray[shardIndex] =
+			FinalizedShardPlacementList(shardInterval->shardId);
+	}
+
+	foreach(workerNodeCell, workerNodeList)
+	{
+		WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
+		List *shardDropList = FindAbsentShardPlacementsOnWorker(workerNode,
+																shardIntervalArray,
+																placementListArray,
+																shardCount);
+		DropShardsFromWorker(workerNode, relationId, shardDropList);
+	}
+
+	pfree(placementListArray);
+
+	PG_RETURN_VOID();
+}
+
+
+/*
+ * FindAbsentShardPlacementsOnWorker builds a list of the shard intervals
+ * that have no registered placement on the given worker node.
+ */
+static List *
+FindAbsentShardPlacementsOnWorker(WorkerNode *workerNode,
+								  ShardInterval **shardIntervalArray,
+								  List **placementListArray, int shardCount)
+{
+	List *absentShardIntervalList = NIL;
+
+	int shardIndex = 0;
+	for (shardIndex = 0; shardIndex < shardCount; shardIndex++)
+	{
+		ShardInterval *shardInterval = shardIntervalArray[shardIndex];
+		List *placementList = placementListArray[shardIndex];
+
+		ListCell *placementCell = NULL;
+		foreach(placementCell, placementList)
+		{
+			ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);
+
+			/*
+			 * Stop scanning once a placement on this worker is found; if we
+			 * reach the last placement without a match, none of the shard's
+			 * placements is on this worker, so append the shard interval to
+			 * the absent list.
+			 */
+			if (placement->nodePort == workerNode->workerPort &&
+				strncmp(placement->nodeName, workerNode->workerName, WORKER_LENGTH) == 0)
+			{
+				break;
+			}
+			else if (lnext(placementCell) == NULL)
+			{
+				absentShardIntervalList = lappend(absentShardIntervalList, shardInterval);
+			}
+		}
+	}
+
+	return absentShardIntervalList;
+}
+
+
+/*
+ * DropShardsFromWorker drops the given shards of a relation from the given
+ * worker. It does not change any metadata on the master.
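+ *
+ * For illustration only (the shard IDs are hypothetical, and this assumes
+ * DROP_REGULAR_TABLE_COMMAND expands to "DROP TABLE IF EXISTS %s"), the
+ * command built for two cached shards of a regular table looks like:
+ *
+ *   DROP TABLE IF EXISTS public.broadcast_table_1220008,
+ *   public.broadcast_table_1220009 CASCADE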
+ */
+static void
+DropShardsFromWorker(WorkerNode *workerNode, Oid relationId, List *shardIntervalList)
+{
+	Oid schemaId = get_rel_namespace(relationId);
+	char *schemaName = get_namespace_name(schemaId);
+	char *relationName = get_rel_name(relationId);
+	char relationKind = get_rel_relkind(relationId);
+	StringInfo shardName = makeStringInfo();
+	StringInfo workerCommand = makeStringInfo();
+	ListCell *shardIntervalCell = NULL;
+
+	if (shardIntervalList == NIL)
+	{
+		return;
+	}
+
+	if (relationKind == RELKIND_RELATION)
+	{
+		appendStringInfo(workerCommand, DROP_REGULAR_TABLE_COMMAND, "");
+	}
+	else if (relationKind == RELKIND_FOREIGN_TABLE)
+	{
+		appendStringInfo(workerCommand, DROP_FOREIGN_TABLE_COMMAND, "");
+	}
+	else
+	{
+		ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+						errmsg("expire target is not a regular or foreign table")));
+	}
+
+	foreach(shardIntervalCell, shardIntervalList)
+	{
+		ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
+		char *quotedShardName = NULL;
+
+		resetStringInfo(shardName);
+		appendStringInfo(shardName, "%s", relationName);
+		AppendShardIdToStringInfo(shardName, shardInterval->shardId);
+		quotedShardName = quote_qualified_identifier(schemaName, shardName->data);
+		appendStringInfo(workerCommand, "%s", quotedShardName);
+
+		/* append a comma after the shard name if there are more shards */
+		if (lnext(shardIntervalCell) != NULL)
+		{
+			appendStringInfo(workerCommand, ", ");
+		}
+	}
+
+	appendStringInfo(workerCommand, " CASCADE");
+
+	ExecuteRemoteCommand(workerNode->workerName, workerNode->workerPort, workerCommand);
+}
diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c
index f39eeb4da..fe2a48787 100644
--- a/src/backend/distributed/worker/worker_data_fetch_protocol.c
+++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c
@@ -1002,6 +1002,65 @@ ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser,
 }
 
 
+/*
+ * ExecuteRemoteCommand executes the given SQL command. This command could be an
+ * Insert, Update, or Delete statement, or a utility command that returns
+ * nothing. If the query is successfully executed, the function returns true.
+ * Otherwise, it returns false.
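+ *
+ * Note that execution is synchronous: while the remote node reports
+ * CLIENT_RESULT_BUSY, the function sleeps for RemoteTaskCheckInterval
+ * milliseconds per polling cycle before checking the connection again.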
+ */
+bool
+ExecuteRemoteCommand(const char *nodeName, uint32 nodePort, StringInfo queryString)
+{
+	char *nodeDatabase = get_database_name(MyDatabaseId);
+	int32 connectionId = -1;
+	QueryStatus queryStatus = CLIENT_INVALID_QUERY;
+	bool querySent = false;
+	bool queryReady = false;
+	bool queryDone = false;
+
+	connectionId = MultiClientConnect(nodeName, nodePort, nodeDatabase, NULL);
+	if (connectionId == INVALID_CONNECTION_ID)
+	{
+		return false;
+	}
+
+	querySent = MultiClientSendQuery(connectionId, queryString->data);
+	if (!querySent)
+	{
+		MultiClientDisconnect(connectionId);
+		return false;
+	}
+
+	while (!queryReady)
+	{
+		ResultStatus resultStatus = MultiClientResultStatus(connectionId);
+		if (resultStatus == CLIENT_RESULT_READY)
+		{
+			queryReady = true;
+		}
+		else if (resultStatus == CLIENT_RESULT_BUSY)
+		{
+			long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L;
+			pg_usleep(sleepIntervalPerCycle);
+		}
+		else
+		{
+			MultiClientDisconnect(connectionId);
+			return false;
+		}
+	}
+
+	queryStatus = MultiClientQueryStatus(connectionId);
+	if (queryStatus == CLIENT_QUERY_DONE)
+	{
+		queryDone = true;
+	}
+
+	MultiClientDisconnect(connectionId);
+	return queryDone;
+}
+
+
 /*
  * Parses the given DDL command, and returns the tree node for parsed command.
  */
diff --git a/src/include/distributed/worker_protocol.h b/src/include/distributed/worker_protocol.h
index 49cd1ec48..df0dcd72d 100644
--- a/src/include/distributed/worker_protocol.h
+++ b/src/include/distributed/worker_protocol.h
@@ -126,6 +126,8 @@ extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
 extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId);
 extern List * ExecuteRemoteQuery(const char *nodeName, uint32 nodePort,
 								 char *runAsUser, StringInfo queryString);
+extern bool ExecuteRemoteCommand(const char *nodeName, uint32 nodePort,
+								 StringInfo queryString);
 extern List * ColumnDefinitionList(List *columnNameList, List *columnTypeList);
 extern CreateStmt * CreateStatement(RangeVar *relation, List *columnDefinitionList);
 extern CopyStmt * CopyStatement(RangeVar *relation, char *sourceFilename);
diff --git a/src/test/regress/expected/multi_expire_table_cache.out b/src/test/regress/expected/multi_expire_table_cache.out
new file mode 100644
index 000000000..4de4475dc
--- /dev/null
+++ b/src/test/regress/expected/multi_expire_table_cache.out
@@ -0,0 +1,109 @@
+---
+--- MULTI_EXPIRE_TABLE_CACHE
+---
+ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000;
+ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000;
+-- create test table
+CREATE TABLE large_table(a int, b int);
+SELECT master_create_distributed_table('large_table', 'a', 'hash');
+ master_create_distributed_table 
+---------------------------------
+ 
+(1 row)
+
+SELECT master_create_worker_shards('large_table', 8, 1);
+ master_create_worker_shards 
+-----------------------------
+ 
+(1 row)
+
+CREATE TABLE broadcast_table(a int, b int);
+SELECT master_create_distributed_table('broadcast_table', 'a', 'hash');
+ master_create_distributed_table 
+---------------------------------
+ 
+(1 row)
+
+SELECT master_create_worker_shards('broadcast_table', 2, 1);
+ master_create_worker_shards 
+-----------------------------
+ 
+(1 row)
+
+-- verify only small tables are supported
+SELECT master_expire_table_cache('large_table');
+ERROR:  must be called on tables with fewer than 4 shards
+SELECT master_expire_table_cache('broadcast_table');
+ master_expire_table_cache 
+---------------------------
+ 
+(1 row)
+
+-- run a join so that broadcast tables are cached on other workers
+SELECT * from large_table l, broadcast_table b where l.a = b.b;
+ a | b | a | b 
+---+---+---+---
+(0 rows)
+
+-- insert some data
+INSERT INTO large_table VALUES(1, 1);
+INSERT INTO large_table VALUES(1, 2);
+INSERT INTO large_table VALUES(2, 1);
+INSERT INTO large_table VALUES(2, 2);
+INSERT INTO large_table VALUES(3, 1);
+INSERT INTO large_table VALUES(3, 2);
+INSERT INTO broadcast_table VALUES(1, 1);
+-- verify returned results are wrong
+SELECT * from large_table l, broadcast_table b WHERE l.b = b.b ORDER BY l.a, l.b;
+ a | b | a | b 
+---+---+---+---
+ 1 | 1 | 1 | 1
+ 2 | 1 | 1 | 1
+(2 rows)
+
+-- expire cache and re-run, results should be correct this time
+SELECT master_expire_table_cache('broadcast_table');
+ master_expire_table_cache 
+---------------------------
+ 
+(1 row)
+
+SELECT * from large_table l, broadcast_table b WHERE l.b = b.b ORDER BY l.a, l.b;
+ a | b | a | b 
+---+---+---+---
+ 1 | 1 | 1 | 1
+ 2 | 1 | 1 | 1
+ 3 | 1 | 1 | 1
+(3 rows)
+
+-- insert some more data into broadcast table
+INSERT INTO broadcast_table VALUES(2, 2);
+-- run the same query, get wrong results
+SELECT * from large_table l, broadcast_table b WHERE l.b = b.b ORDER BY l.a, l.b;
+ a | b | a | b 
+---+---+---+---
+ 1 | 1 | 1 | 1
+ 2 | 1 | 1 | 1
+ 3 | 1 | 1 | 1
+ 3 | 2 | 2 | 2
+(4 rows)
+
+-- expire cache and re-run, results should be correct this time
+SELECT master_expire_table_cache('broadcast_table');
+ master_expire_table_cache 
+---------------------------
+ 
+(1 row)
+
+SELECT * from large_table l, broadcast_table b WHERE l.b = b.b ORDER BY l.a, l.b;
+ a | b | a | b 
+---+---+---+---
+ 1 | 1 | 1 | 1
+ 1 | 2 | 2 | 2
+ 2 | 1 | 1 | 1
+ 2 | 2 | 2 | 2
+ 3 | 1 | 1 | 1
+ 3 | 2 | 2 | 2
+(6 rows)
+
+DROP TABLE large_table, broadcast_table;
diff --git a/src/test/regress/expected/multi_router_planner.out b/src/test/regress/expected/multi_router_planner.out
index 66437fe48..68a44ac92 100644
--- a/src/test/regress/expected/multi_router_planner.out
+++ b/src/test/regress/expected/multi_router_planner.out
@@ -1549,7 +1549,7 @@ DROP MATERIALIZED VIEW mv_articles_hash;
 DEBUG:  drop auto-cascades to type mv_articles_hash
 DEBUG:  drop auto-cascades to type mv_articles_hash[]
 DEBUG:  drop auto-cascades to rule _RETURN on materialized view mv_articles_hash
-DEBUG:  EventTriggerInvoke 16729
+DEBUG:  EventTriggerInvoke 16733
 CREATE MATERIALIZED VIEW mv_articles_hash_error AS
 SELECT * FROM articles_hash WHERE author_id in (1,2);
 NOTICE:  cannot use shard pruning with ANY/ALL (array expression)
diff --git a/src/test/regress/multi_schedule b/src/test/regress/multi_schedule
index 694ea26a8..c21b5e5ac 100644
--- a/src/test/regress/multi_schedule
+++ b/src/test/regress/multi_schedule
@@ -165,3 +165,7 @@ test: multi_function_evaluation
 # multi_truncate tests truncate functionality for distributed tables
 # ----------
 test: multi_truncate
+# ----------
+# multi_expire_table_cache tests the master_expire_table_cache UDF on broadcast tables
+# ----------
+test: multi_expire_table_cache
diff --git a/src/test/regress/sql/multi_expire_table_cache.sql b/src/test/regress/sql/multi_expire_table_cache.sql
new file mode 100644
index 000000000..38d1ac07c
--- /dev/null
+++ b/src/test/regress/sql/multi_expire_table_cache.sql
@@ -0,0 +1,54 @@
+---
+--- MULTI_EXPIRE_TABLE_CACHE
+---
+
+
+ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000;
+ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000;
+
+-- create test table
+CREATE TABLE large_table(a int, b int);
+SELECT master_create_distributed_table('large_table', 'a', 'hash');
+SELECT master_create_worker_shards('large_table', 8, 1);
+
+CREATE TABLE broadcast_table(a int, b int);
+SELECT master_create_distributed_table('broadcast_table', 'a', 'hash');
+SELECT master_create_worker_shards('broadcast_table', 2, 1);
+
+-- verify only small tables are supported
+SELECT master_expire_table_cache('large_table');
+SELECT master_expire_table_cache('broadcast_table');
+
+-- run a join so that broadcast tables are cached on other workers
+SELECT * from large_table l, broadcast_table b where l.a = b.b;
+
+-- insert some data
+INSERT INTO large_table VALUES(1, 1);
+INSERT INTO large_table VALUES(1, 2);
+INSERT INTO large_table VALUES(2, 1);
+INSERT INTO large_table VALUES(2, 2);
+INSERT INTO large_table VALUES(3, 1);
+INSERT INTO large_table VALUES(3, 2);
+
+INSERT INTO broadcast_table VALUES(1, 1);
+
+-- verify returned results are wrong
+SELECT * from large_table l, broadcast_table b WHERE l.b = b.b ORDER BY l.a, l.b;
+
+-- expire cache and re-run, results should be correct this time
+SELECT master_expire_table_cache('broadcast_table');
+
+SELECT * from large_table l, broadcast_table b WHERE l.b = b.b ORDER BY l.a, l.b;
+
+-- insert some more data into broadcast table
+INSERT INTO broadcast_table VALUES(2, 2);
+
+-- run the same query, get wrong results
+SELECT * from large_table l, broadcast_table b WHERE l.b = b.b ORDER BY l.a, l.b;
+
+-- expire cache and re-run, results should be correct this time
+SELECT master_expire_table_cache('broadcast_table');
+SELECT * from large_table l, broadcast_table b WHERE l.b = b.b ORDER BY l.a, l.b;
+
+
+DROP TABLE large_table, broadcast_table;
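
A minimal usage sketch (the table name is illustrative, and this assumes the
extension on the master has been updated to version 5.2-3):

    ALTER EXTENSION citus UPDATE TO '5.2-3';
    SELECT master_expire_table_cache('broadcast_table');

The call drops cached copies of broadcast_table's shards from every worker
that holds no registered placement for them, so the next broadcast join
fetches fresh copies instead of reusing stale ones.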