Add UDF master_expire_table_cache

pull/784/head
Murat Tuncer 2016-09-08 16:50:57 +03:00
parent 0caf0d95f1
commit c16dec88c3
12 changed files with 431 additions and 64 deletions

View File

@ -7,7 +7,7 @@ MODULE_big = citus
EXTENSION = citus EXTENSION = citus
EXTVERSIONS = 5.0 5.0-1 5.0-2 \ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \ 5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7 5.1-8 \
5.2-1 5.2-2 5.2-1 5.2-2 5.2-3
# All citus--*.sql files in the source directory # All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql)) DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -54,6 +54,8 @@ $(EXTENSION)--5.2-1.sql: $(EXTENSION)--5.1-8.sql $(EXTENSION)--5.1-8--5.2-1.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--5.2-2.sql: $(EXTENSION)--5.2-1.sql $(EXTENSION)--5.2-1--5.2-2.sql $(EXTENSION)--5.2-2.sql: $(EXTENSION)--5.2-1.sql $(EXTENSION)--5.2-1--5.2-2.sql
cat $^ > $@ cat $^ > $@
$(EXTENSION)--5.2-3.sql: $(EXTENSION)--5.2-2.sql $(EXTENSION)--5.2-2--5.2-3.sql
cat $^ > $@
NO_PGXS = 1 NO_PGXS = 1

View File

@ -1,3 +1,5 @@
/* citus--5.2-1--5.2-2.sql */
CREATE OR REPLACE FUNCTION pg_catalog.citus_truncate_trigger() CREATE OR REPLACE FUNCTION pg_catalog.citus_truncate_trigger()
RETURNS trigger RETURNS trigger
LANGUAGE plpgsql LANGUAGE plpgsql

View File

@ -0,0 +1,5 @@
/* citus--5.2-2--5.2-3.sql */

/*
 * master_expire_table_cache drops the cached shard copies of the given small
 * distributed table from all workers. It only removes cached copies; shard
 * placement metadata on the master is left untouched. Implemented in C
 * (master_expire_table_cache) in the citus shared library.
 */
CREATE OR REPLACE FUNCTION master_expire_table_cache(table_name regclass)
RETURNS VOID
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_expire_table_cache$$;

View File

@ -1,6 +1,6 @@
# Citus extension # Citus extension
comment = 'Citus distributed database' comment = 'Citus distributed database'
default_version = '5.2-2' default_version = '5.2-3'
module_pathname = '$libdir/citus' module_pathname = '$libdir/citus'
relocatable = false relocatable = false
schema = pg_catalog schema = pg_catalog

View File

@ -60,8 +60,6 @@ static List * ShardsMatchingDeleteCriteria(Oid relationId, List *shardList,
Node *deleteCriteria); Node *deleteCriteria);
static int DropShards(Oid relationId, char *schemaName, char *relationName, static int DropShards(Oid relationId, char *schemaName, char *relationName,
List *deletableShardIntervalList); List *deletableShardIntervalList);
static bool ExecuteRemoteCommand(const char *nodeName, uint32 nodePort,
StringInfo queryString);
/* exports for SQL callable functions */ /* exports for SQL callable functions */
@ -556,62 +554,3 @@ ShardsMatchingDeleteCriteria(Oid relationId, List *shardIntervalList,
return dropShardIntervalList; return dropShardIntervalList;
} }
/*
* ExecuteRemoteCommand executes the given SQL command. This command could be an
* Insert, Update, or Delete statement, or a utility command that returns
* nothing. If query is successfuly executed, the function returns true.
* Otherwise, it returns false.
*/
static bool
ExecuteRemoteCommand(const char *nodeName, uint32 nodePort, StringInfo queryString)
{
char *nodeDatabase = get_database_name(MyDatabaseId);
int32 connectionId = -1;
QueryStatus queryStatus = CLIENT_INVALID_QUERY;
bool querySent = false;
bool queryReady = false;
bool queryDone = false;
connectionId = MultiClientConnect(nodeName, nodePort, nodeDatabase, NULL);
if (connectionId == INVALID_CONNECTION_ID)
{
return false;
}
querySent = MultiClientSendQuery(connectionId, queryString->data);
if (!querySent)
{
MultiClientDisconnect(connectionId);
return false;
}
while (!queryReady)
{
ResultStatus resultStatus = MultiClientResultStatus(connectionId);
if (resultStatus == CLIENT_RESULT_READY)
{
queryReady = true;
}
else if (resultStatus == CLIENT_RESULT_BUSY)
{
long sleepIntervalPerCycle = RemoteTaskCheckInterval * 1000L;
pg_usleep(sleepIntervalPerCycle);
}
else
{
MultiClientDisconnect(connectionId);
return false;
}
}
queryStatus = MultiClientQueryStatus(connectionId);
if (queryStatus == CLIENT_QUERY_DONE)
{
queryDone = true;
}
MultiClientDisconnect(connectionId);
return queryDone;
}

View File

@ -0,0 +1,191 @@
/*-------------------------------------------------------------------------
*
* master_expire_table_cache.c
* UDF to refresh shard cache at workers
*
 * This file contains the master_expire_table_cache function. The function
 * accepts a table name and drops the table's cached shards from all workers.
 * It does not change existing shard placements; it only drops cached copies
 * of shards.
*
* Copyright (c) 2012-2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include "catalog/pg_class.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/multi_join_order.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/worker_manager.h"
#include "distributed/worker_protocol.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
static List * FindAbsentShardPlacementsOnWorker(WorkerNode *workerNode,
ShardInterval **shardIntervalArray,
List **placementListArray,
int shardCount);
static void DropShardsFromWorker(WorkerNode *workerNode, Oid relationId,
List *shardIntervalList);
PG_FUNCTION_INFO_V1(master_expire_table_cache);
/*
 * master_expire_table_cache drops the table's cached shards on all workers.
 * The passed table must be a small distributed table, i.e. it must have fewer
 * shards than large_table_shard_count.
 */
Datum
master_expire_table_cache(PG_FUNCTION_ARGS)
{
	Oid relationId = PG_GETARG_OID(0);
	DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(relationId);
	List *workerNodeList = WorkerNodeList();
	ListCell *workerNodeCell = NULL;
	int shardCount = cacheEntry->shardIntervalArrayLength;
	ShardInterval **sortedShardArray = cacheEntry->sortedShardIntervalArray;
	List **finalizedPlacementArray = NULL;
	int shardIndex = 0;

	if (shardCount == 0)
	{
		ereport(WARNING, (errmsg("Table has no shards, no action is taken")));
		PG_RETURN_VOID();
	}

	if (shardCount >= LargeTableShardCount)
	{
		ereport(ERROR, (errmsg("Must be called on tables smaller than %d shards",
							   LargeTableShardCount)));
	}

	/* look up the finalized placements of every shard once, up front */
	finalizedPlacementArray = palloc(shardCount * sizeof(List *));
	for (shardIndex = 0; shardIndex < shardCount; shardIndex++)
	{
		ShardInterval *shardInterval = sortedShardArray[shardIndex];

		finalizedPlacementArray[shardIndex] =
			FinalizedShardPlacementList(shardInterval->shardId);
	}

	/* on each worker, drop exactly those shards without a placement there */
	foreach(workerNodeCell, workerNodeList)
	{
		WorkerNode *workerNode = (WorkerNode *) lfirst(workerNodeCell);
		List *absentShardList =
			FindAbsentShardPlacementsOnWorker(workerNode, sortedShardArray,
											  finalizedPlacementArray,
											  shardCount);

		DropShardsFromWorker(workerNode, relationId, absentShardList);
	}

	pfree(finalizedPlacementArray);

	PG_RETURN_VOID();
}
/*
 * FindAbsentShardPlacementsOnWorker compiles the list of shard intervals that
 * do not have a registered placement on the given worker node. Shards whose
 * placement list is empty are skipped entirely (nothing to drop for them).
 */
static List *
FindAbsentShardPlacementsOnWorker(WorkerNode *workerNode,
								  ShardInterval **shardIntervalArray,
								  List **placementListArray, int shardCount)
{
	List *absentShardIntervalList = NIL;
	int shardIndex = 0;

	for (shardIndex = 0; shardIndex < shardCount; shardIndex++)
	{
		ShardInterval *shardInterval = shardIntervalArray[shardIndex];
		List *placementList = placementListArray[shardIndex];
		ListCell *placementCell = NULL;
		bool placementOnWorker = false;

		/* check whether any placement of this shard lives on the worker */
		foreach(placementCell, placementList)
		{
			ShardPlacement *placement = (ShardPlacement *) lfirst(placementCell);

			if (placement->nodePort == workerNode->workerPort &&
				strncmp(placement->nodeName, workerNode->workerName,
						WORKER_LENGTH) == 0)
			{
				placementOnWorker = true;
				break;
			}
		}

		/*
		 * Record the shard as absent only if it has placements and none of
		 * them is on this worker; this matches the original behavior of
		 * appending at the last placement cell when no placement matched.
		 */
		if (placementList != NIL && !placementOnWorker)
		{
			absentShardIntervalList = lappend(absentShardIntervalList,
											  shardInterval);
		}
	}

	return absentShardIntervalList;
}
/*
 * DropShardsFromWorker drops provided shards belonging to a relation from
 * given worker. It does not change any metadata at the master.
 */
static void
DropShardsFromWorker(WorkerNode *workerNode, Oid relationId, List *shardIntervalList)
{
	Oid schemaId = get_rel_namespace(relationId);
	char *schemaName = get_namespace_name(schemaId);
	char *relationName = get_rel_name(relationId);
	char relationKind = get_rel_relkind(relationId);
	StringInfo shardName = makeStringInfo();
	StringInfo workerCommand = makeStringInfo();
	ListCell *shardIntervalCell = NULL;

	/* nothing to drop on this worker */
	if (shardIntervalList == NIL)
	{
		return;
	}

	/*
	 * Start the DROP command. The command macros take a format argument for
	 * the table list, which is left empty here; the shard names are appended
	 * in the loop below.
	 */
	if (relationKind == RELKIND_RELATION)
	{
		appendStringInfo(workerCommand, DROP_REGULAR_TABLE_COMMAND, "");
	}
	else if (relationKind == RELKIND_FOREIGN_TABLE)
	{
		appendStringInfo(workerCommand, DROP_FOREIGN_TABLE_COMMAND, "");
	}
	else
	{
		ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
						errmsg("expire target is not a regular or foreign table")));
	}

	/* append a comma-separated list of schema-qualified shard names */
	foreach(shardIntervalCell, shardIntervalList)
	{
		ShardInterval *shardInterval = (ShardInterval *) lfirst(shardIntervalCell);
		char *quotedShardName = NULL;

		/* shard name is the relation name with the shard id suffix */
		resetStringInfo(shardName);
		appendStringInfo(shardName, "%s", relationName);
		AppendShardIdToStringInfo(shardName, shardInterval->shardId);
		quotedShardName = quote_qualified_identifier(schemaName, shardName->data);
		appendStringInfo(workerCommand, "%s", quotedShardName);

		/* append a comma after the shard name if there are more shards */
		if (lnext(shardIntervalCell) != NULL)
		{
			appendStringInfo(workerCommand, ", ");
		}
	}

	appendStringInfo(workerCommand, " CASCADE");

	/*
	 * NOTE(review): the boolean result of ExecuteRemoteCommand is ignored, so
	 * a failed drop on a worker is silently best-effort — confirm this is
	 * intentional.
	 */
	ExecuteRemoteCommand(workerNode->workerName, workerNode->workerPort, workerCommand);
}

View File

@ -1002,6 +1002,65 @@ ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser,
} }
/*
 * ExecuteRemoteCommand executes the given SQL command on the given node. The
 * command may be an Insert, Update, or Delete statement, or a utility command
 * that returns nothing. Returns true if the command executed successfully,
 * false otherwise.
 */
bool
ExecuteRemoteCommand(const char *nodeName, uint32 nodePort, StringInfo queryString)
{
	char *databaseName = get_database_name(MyDatabaseId);
	bool commandSucceeded = false;
	int32 connectionId = MultiClientConnect(nodeName, nodePort, databaseName, NULL);

	if (connectionId == INVALID_CONNECTION_ID)
	{
		return false;
	}

	if (!MultiClientSendQuery(connectionId, queryString->data))
	{
		MultiClientDisconnect(connectionId);
		return false;
	}

	/* poll the connection until a result is ready */
	for (;;)
	{
		ResultStatus resultStatus = MultiClientResultStatus(connectionId);

		if (resultStatus == CLIENT_RESULT_READY)
		{
			break;
		}

		if (resultStatus != CLIENT_RESULT_BUSY)
		{
			/* connection failed or query errored out */
			MultiClientDisconnect(connectionId);
			return false;
		}

		/* still busy: sleep for the configured check interval and retry */
		pg_usleep(RemoteTaskCheckInterval * 1000L);
	}

	commandSucceeded = (MultiClientQueryStatus(connectionId) == CLIENT_QUERY_DONE);
	MultiClientDisconnect(connectionId);

	return commandSucceeded;
}
/* /*
* Parses the given DDL command, and returns the tree node for parsed command. * Parses the given DDL command, and returns the tree node for parsed command.
*/ */

View File

@ -126,6 +126,8 @@ extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId); extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId);
extern List * ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser, extern List * ExecuteRemoteQuery(const char *nodeName, uint32 nodePort, char *runAsUser,
StringInfo queryString); StringInfo queryString);
extern bool ExecuteRemoteCommand(const char *nodeName, uint32 nodePort,
StringInfo queryString);
extern List * ColumnDefinitionList(List *columnNameList, List *columnTypeList); extern List * ColumnDefinitionList(List *columnNameList, List *columnTypeList);
extern CreateStmt * CreateStatement(RangeVar *relation, List *columnDefinitionList); extern CreateStmt * CreateStatement(RangeVar *relation, List *columnDefinitionList);
extern CopyStmt * CopyStatement(RangeVar *relation, char *sourceFilename); extern CopyStmt * CopyStatement(RangeVar *relation, char *sourceFilename);

View File

@ -0,0 +1,109 @@
---
--- MULTI_EXPIRE_TABLE_CACHE
---
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000;
-- create test table
CREATE TABLE large_table(a int, b int);
SELECT master_create_distributed_table('large_table', 'a', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('large_table', 8, 1);
master_create_worker_shards
-----------------------------
(1 row)
CREATE TABLE broadcast_table(a int, b int);
SELECT master_create_distributed_table('broadcast_table', 'a', 'hash');
master_create_distributed_table
---------------------------------
(1 row)
SELECT master_create_worker_shards('broadcast_table', 2, 1);
master_create_worker_shards
-----------------------------
(1 row)
-- verify only small tables are supported
SELECT master_expire_table_cache('large_table');
ERROR: Must be called on tables smaller than 4 shards
SELECT master_expire_table_cache('broadcast_table');
master_expire_table_cache
---------------------------
(1 row)
-- run a join so that broadcast tables are cached on other workers
SELECT * from large_table l, broadcast_table b where l.a = b.b;
a | b | a | b
---+---+---+---
(0 rows)
-- insert some data
INSERT INTO large_table VALUES(1, 1);
INSERT INTO large_table VALUES(1, 2);
INSERT INTO large_table VALUES(2, 1);
INSERT INTO large_table VALUES(2, 2);
INSERT INTO large_table VALUES(3, 1);
INSERT INTO large_table VALUES(3, 2);
INSERT INTO broadcast_table VALUES(1, 1);
-- verify returned results are wrong
SELECT * from large_table l, broadcast_table b WHERE l.b = b.b ORDER BY l.a, l.b;
a | b | a | b
---+---+---+---
1 | 1 | 1 | 1
2 | 1 | 1 | 1
(2 rows)
-- expire cache and re-run, results should be correct this time
SELECT master_expire_table_cache('broadcast_table');
master_expire_table_cache
---------------------------
(1 row)
SELECT * from large_table l, broadcast_table b WHERE l.b = b.b ORDER BY l.a, l.b;
a | b | a | b
---+---+---+---
1 | 1 | 1 | 1
2 | 1 | 1 | 1
3 | 1 | 1 | 1
(3 rows)
-- insert some more data into broadcast table
INSERT INTO broadcast_table VALUES(2, 2);
-- run the same query, get wrong results
SELECT * from large_table l, broadcast_table b WHERE l.b = b.b ORDER BY l.a, l.b;
a | b | a | b
---+---+---+---
1 | 1 | 1 | 1
2 | 1 | 1 | 1
3 | 1 | 1 | 1
3 | 2 | 2 | 2
(4 rows)
-- expire cache and re-run, results should be correct this time
SELECT master_expire_table_cache('broadcast_table');
master_expire_table_cache
---------------------------
(1 row)
SELECT * from large_table l, broadcast_table b WHERE l.b = b.b ORDER BY l.a, l.b;
a | b | a | b
---+---+---+---
1 | 1 | 1 | 1
1 | 2 | 2 | 2
2 | 1 | 1 | 1
2 | 2 | 2 | 2
3 | 1 | 1 | 1
3 | 2 | 2 | 2
(6 rows)
DROP TABLE large_table, broadcast_table;

View File

@ -1549,7 +1549,7 @@ DROP MATERIALIZED VIEW mv_articles_hash;
DEBUG: drop auto-cascades to type mv_articles_hash DEBUG: drop auto-cascades to type mv_articles_hash
DEBUG: drop auto-cascades to type mv_articles_hash[] DEBUG: drop auto-cascades to type mv_articles_hash[]
DEBUG: drop auto-cascades to rule _RETURN on materialized view mv_articles_hash DEBUG: drop auto-cascades to rule _RETURN on materialized view mv_articles_hash
DEBUG: EventTriggerInvoke 16729 DEBUG: EventTriggerInvoke 16733
CREATE MATERIALIZED VIEW mv_articles_hash_error AS CREATE MATERIALIZED VIEW mv_articles_hash_error AS
SELECT * FROM articles_hash WHERE author_id in (1,2); SELECT * FROM articles_hash WHERE author_id in (1,2);
NOTICE: cannot use shard pruning with ANY/ALL (array expression) NOTICE: cannot use shard pruning with ANY/ALL (array expression)

View File

@ -165,3 +165,7 @@ test: multi_function_evaluation
# multi_truncate tests truncate functionality for distributed tables # multi_truncate tests truncate functionality for distributed tables
# ---------- # ----------
test: multi_truncate test: multi_truncate
# ----------
# multi_expire_table_cache tests for broadcast tables
# ----------
test: multi_expire_table_cache

View File

@ -0,0 +1,54 @@
---
--- MULTI_EXPIRE_TABLE_CACHE
---
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1220000;
ALTER SEQUENCE pg_catalog.pg_dist_jobid_seq RESTART 1220000;
-- create test table
CREATE TABLE large_table(a int, b int);
SELECT master_create_distributed_table('large_table', 'a', 'hash');
SELECT master_create_worker_shards('large_table', 8, 1);
CREATE TABLE broadcast_table(a int, b int);
SELECT master_create_distributed_table('broadcast_table', 'a', 'hash');
SELECT master_create_worker_shards('broadcast_table', 2, 1);
-- verify only small tables are supported
SELECT master_expire_table_cache('large_table');
SELECT master_expire_table_cache('broadcast_table');
-- run a join so that broadcast tables are cached on other workers
SELECT * from large_table l, broadcast_table b where l.a = b.b;
-- insert some data
INSERT INTO large_table VALUES(1, 1);
INSERT INTO large_table VALUES(1, 2);
INSERT INTO large_table VALUES(2, 1);
INSERT INTO large_table VALUES(2, 2);
INSERT INTO large_table VALUES(3, 1);
INSERT INTO large_table VALUES(3, 2);
INSERT INTO broadcast_table VALUES(1, 1);
-- verify returned results are wrong
SELECT * from large_table l, broadcast_table b WHERE l.b = b.b ORDER BY l.a, l.b;
-- expire cache and re-run, results should be correct this time
SELECT master_expire_table_cache('broadcast_table');
SELECT * from large_table l, broadcast_table b WHERE l.b = b.b ORDER BY l.a, l.b;
-- insert some more data into broadcast table
INSERT INTO broadcast_table VALUES(2, 2);
-- run the same query, get wrong results
SELECT * from large_table l, broadcast_table b WHERE l.b = b.b ORDER BY l.a, l.b;
-- expire cache and re-run, results should be correct this time
SELECT master_expire_table_cache('broadcast_table');
SELECT * from large_table l, broadcast_table b WHERE l.b = b.b ORDER BY l.a, l.b;
DROP TABLE large_table, broadcast_table;