diff --git a/src/backend/distributed/test/file_utils.c b/src/backend/distributed/test/file_utils.c new file mode 100644 index 000000000..02ecaa0bd --- /dev/null +++ b/src/backend/distributed/test/file_utils.c @@ -0,0 +1,30 @@ +#include "postgres.h" + +#include "distributed/worker_protocol.h" +#include "distributed/transmit.h" +#include "distributed/master_metadata_utility.h" +#include "fmgr.h" +#include "lib/stringinfo.h" + +PG_FUNCTION_INFO_V1(citus_rm_job_directory); + +/* + * citus_rm_job_directory removes the job cache directory for the given job id. + * Runs EnsureSuperUser() first; used before beginning multi_query_directory_cleanup. + */ +Datum +citus_rm_job_directory(PG_FUNCTION_ARGS) +{ + uint64 jobId = PG_GETARG_INT64(0); + StringInfo jobCacheDirectory = makeStringInfo(); + + EnsureSuperUser(); + + appendStringInfo(jobCacheDirectory, "base/%s/%s%0*" INT64_MODIFIER "u", + PG_JOB_CACHE_DIR, JOB_DIRECTORY_PREFIX, + MIN_JOB_DIRNAME_WIDTH, jobId); + CitusRemoveDirectory(jobCacheDirectory); + FreeStringInfo(jobCacheDirectory); + + PG_RETURN_VOID(); +} diff --git a/src/backend/distributed/worker/worker_partition_protocol.c b/src/backend/distributed/worker/worker_partition_protocol.c index 962e4ea7c..d22ffe1d7 100644 --- a/src/backend/distributed/worker/worker_partition_protocol.c +++ b/src/backend/distributed/worker/worker_partition_protocol.c @@ -917,8 +917,13 @@ FilterAndPartitionTable(const char *filterQuery, if (SPI_processed > 0) { TupleDesc rowDescriptor = SPI_tuptable->tupdesc; - partitionColumnIndex = ColumnIndex(rowDescriptor, partitionColumnName); + if (fileCount == 0) + { + ereport(ERROR, (errmsg("no partition to read into"))); + } + + partitionColumnIndex = ColumnIndex(rowDescriptor, partitionColumnName); partitionColumnTypeId = SPI_gettypeid(rowDescriptor, partitionColumnIndex); if (partitionColumnType != partitionColumnTypeId) { @@ -942,6 +947,7 @@ FilterAndPartitionTable(const char *filterQuery, while (SPI_processed > 0) { int rowIndex = 0; + for (rowIndex = 0; rowIndex < 
SPI_processed; rowIndex++) { HeapTuple row = SPI_tuptable->vals[rowIndex]; @@ -964,6 +970,10 @@ FilterAndPartitionTable(const char *filterQuery, if (!partitionKeyNull) { partitionId = (*PartitionIdFunction)(partitionKey, partitionIdContext); + if (partitionId == INVALID_SHARD_INDEX) + { + ereport(ERROR, (errmsg("invalid distribution column value"))); + } } else { diff --git a/src/test/regress/expected/intermediate_results.out b/src/test/regress/expected/intermediate_results.out index d0dc1a6c3..bb4a7671a 100644 --- a/src/test/regress/expected/intermediate_results.out +++ b/src/test/regress/expected/intermediate_results.out @@ -256,6 +256,9 @@ SELECT * FROM squares ORDER BY x; 5 | 25 (5 rows) +-- empty shard interval array should raise error +SELECT worker_hash_partition_table(42,1,'SELECT a FROM generate_series(1,100) AS a', 'a', 23, ARRAY[0]); +ERROR: invalid distribution column value DROP SCHEMA intermediate_results CASCADE; NOTICE: drop cascades to 5 other objects DETAIL: drop cascades to table interesting_squares diff --git a/src/test/regress/expected/multi_query_directory_cleanup.out b/src/test/regress/expected/multi_query_directory_cleanup.out index 12d9c6cd3..5d3e38f5e 100644 --- a/src/test/regress/expected/multi_query_directory_cleanup.out +++ b/src/test/regress/expected/multi_query_directory_cleanup.out @@ -7,6 +7,21 @@ -- result files. SET citus.next_shard_id TO 810000; SET citus.enable_unique_job_ids TO off; +CREATE FUNCTION citus_rm_job_directory(bigint) + RETURNS void + AS 'citus' + LANGUAGE C STRICT; +with silence as ( + SELECT citus_rm_job_directory(split_part(f, '_', 2)::bigint) + from pg_ls_dir('base/pgsql_job_cache') f +) +select count(*) * 0 zero +from silence; + zero +------ + 0 +(1 row) + BEGIN; -- pg_ls_dir() displays jobids. 
We explicitly set the jobId sequence -- here so that the regression output becomes independent of the diff --git a/src/test/regress/expected/multi_query_directory_cleanup_0.out b/src/test/regress/expected/multi_query_directory_cleanup_0.out index 8547afd99..ed98c36b4 100644 --- a/src/test/regress/expected/multi_query_directory_cleanup_0.out +++ b/src/test/regress/expected/multi_query_directory_cleanup_0.out @@ -7,6 +7,21 @@ -- result files. SET citus.next_shard_id TO 810000; SET citus.enable_unique_job_ids TO off; +CREATE FUNCTION citus_rm_job_directory(bigint) + RETURNS void + AS 'citus' + LANGUAGE C STRICT; +with silence as ( + SELECT citus_rm_job_directory(split_part(f, '_', 2)::bigint) + from pg_ls_dir('base/pgsql_job_cache') f +) +select count(*) * 0 zero +from silence; + zero +------ + 0 +(1 row) + BEGIN; -- pg_ls_dir() displays jobids. We explicitly set the jobId sequence -- here so that the regression output becomes independent of the diff --git a/src/test/regress/sql/intermediate_results.sql b/src/test/regress/sql/intermediate_results.sql index 512fcd4cb..3c2f27eef 100644 --- a/src/test/regress/sql/intermediate_results.sql +++ b/src/test/regress/sql/intermediate_results.sql @@ -141,4 +141,7 @@ WITH (FORMAT text); SELECT * FROM squares ORDER BY x; +-- empty shard interval array should raise error +SELECT worker_hash_partition_table(42,1,'SELECT a FROM generate_series(1,100) AS a', 'a', 23, ARRAY[0]); + DROP SCHEMA intermediate_results CASCADE; diff --git a/src/test/regress/sql/multi_query_directory_cleanup.sql b/src/test/regress/sql/multi_query_directory_cleanup.sql index cde747bf8..c024670f3 100644 --- a/src/test/regress/sql/multi_query_directory_cleanup.sql +++ b/src/test/regress/sql/multi_query_directory_cleanup.sql @@ -11,6 +11,17 @@ SET citus.next_shard_id TO 810000; SET citus.enable_unique_job_ids TO off; +CREATE FUNCTION citus_rm_job_directory(bigint) + RETURNS void + AS 'citus' + LANGUAGE C STRICT; + +with silence as ( + SELECT 
citus_rm_job_directory(split_part(f, '_', 2)::bigint) + from pg_ls_dir('base/pgsql_job_cache') f +) +select count(*) * 0 zero +from silence; BEGIN;