Add worker_execute_sql_task UDF

pull/2487/head
Marco Slot 2018-11-22 01:08:47 +01:00
parent e3521ce320
commit 30bad7e66f
7 changed files with 51 additions and 3 deletions

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '8.0-8'
default_version = '8.0-10'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -17,7 +17,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
7.3-1 7.3-2 7.3-3 \
7.4-1 7.4-2 7.4-3 \
7.5-1 7.5-2 7.5-3 7.5-4 7.5-5 7.5-6 7.5-7 \
8.0-1 8.0-2 8.0-3 8.0-4 8.0-5 8.0-6 8.0-7 8.0-8 8.0-9
8.0-1 8.0-2 8.0-3 8.0-4 8.0-5 8.0-6 8.0-7 8.0-8 8.0-9 8.0-10
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -233,6 +233,8 @@ $(EXTENSION)--8.0-8.sql: $(EXTENSION)--8.0-7.sql $(EXTENSION)--8.0-7--8.0-8.sql
cat $^ > $@
$(EXTENSION)--8.0-9.sql: $(EXTENSION)--8.0-8.sql $(EXTENSION)--8.0-8--8.0-9.sql
cat $^ > $@
$(EXTENSION)--8.0-10.sql: $(EXTENSION)--8.0-9.sql $(EXTENSION)--8.0-9--8.0-10.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,11 @@
/* citus--8.0-9--8.0-10 */
SET search_path = 'pg_catalog';
CREATE FUNCTION worker_execute_sql_task(jobid bigint, taskid integer, query text, binary bool)
RETURNS bigint
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_execute_sql_task$$;
COMMENT ON FUNCTION worker_execute_sql_task(bigint, integer, text, bool)
IS 'execute a query and write the results to a task file';
RESET search_path;

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '8.0-9'
default_version = '8.0-10'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -60,6 +60,37 @@ static void TaskFileDestReceiverShutdown(DestReceiver *destReceiver);
static void TaskFileDestReceiverDestroy(DestReceiver *destReceiver);
/* exports for SQL callable functions */
PG_FUNCTION_INFO_V1(worker_execute_sql_task);
/*
* worker_execute_sql_task executes a query and writes the results to
* a file according to the usual task naming scheme.
*/
Datum
worker_execute_sql_task(PG_FUNCTION_ARGS)
{
uint64 jobId = PG_GETARG_INT64(0);
uint32 taskId = PG_GETARG_UINT32(1);
text *queryText = PG_GETARG_TEXT_P(2);
char *queryString = text_to_cstring(queryText);
bool binaryCopyFormat = PG_GETARG_BOOL(3);
int64 tuplesSent = 0;
Query *query = NULL;
/* job directory is created prior to scheduling the task */
StringInfo jobDirectoryName = JobDirectoryName(jobId);
StringInfo taskFilename = TaskFilename(jobDirectoryName, taskId);
query = ParseQueryString(queryString);
tuplesSent = WorkerExecuteSqlTask(query, taskFilename->data, binaryCopyFormat);
PG_RETURN_INT64(tuplesSent);
}
/*
* WorkerExecuteSqlTask executes an already-parsed query and writes the result
* to the given task file.

View File

@ -151,6 +151,8 @@ ALTER EXTENSION citus UPDATE TO '8.0-5';
ALTER EXTENSION citus UPDATE TO '8.0-6';
ALTER EXTENSION citus UPDATE TO '8.0-7';
ALTER EXTENSION citus UPDATE TO '8.0-8';
ALTER EXTENSION citus UPDATE TO '8.0-9';
ALTER EXTENSION citus UPDATE TO '8.0-10';
-- show running version
SHOW citus.version;
citus.version

View File

@ -151,6 +151,8 @@ ALTER EXTENSION citus UPDATE TO '8.0-5';
ALTER EXTENSION citus UPDATE TO '8.0-6';
ALTER EXTENSION citus UPDATE TO '8.0-7';
ALTER EXTENSION citus UPDATE TO '8.0-8';
ALTER EXTENSION citus UPDATE TO '8.0-9';
ALTER EXTENSION citus UPDATE TO '8.0-10';
-- show running version
SHOW citus.version;