mirror of https://github.com/citusdata/citus.git
Merge pull request #2890 from citusdata/check_shard_interval_search_fail
Avoid invalid array accesses to partitionFileArraypull/2897/head
commit
bc7a76d139
|
@ -0,0 +1,30 @@
|
||||||
|
#include "postgres.h"
|
||||||
|
|
||||||
|
#include "distributed/worker_protocol.h"
|
||||||
|
#include "distributed/transmit.h"
|
||||||
|
#include "distributed/master_metadata_utility.h"
|
||||||
|
#include "fmgr.h"
|
||||||
|
#include "lib/stringinfo.h"
|
||||||
|
|
||||||
|
PG_FUNCTION_INFO_V1(citus_rm_job_directory);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* citus_rm_job_directory removes the job directory for the given job id.
|
||||||
|
* Used before beginning multi_query_directory_cleanup.
|
||||||
|
*/
|
||||||
|
Datum
|
||||||
|
citus_rm_job_directory(PG_FUNCTION_ARGS)
|
||||||
|
{
|
||||||
|
uint64 jobId = PG_GETARG_INT64(0);
|
||||||
|
StringInfo jobCacheDirectory = makeStringInfo();
|
||||||
|
|
||||||
|
EnsureSuperUser();
|
||||||
|
|
||||||
|
appendStringInfo(jobCacheDirectory, "base/%s/%s%0*" INT64_MODIFIER "u",
|
||||||
|
PG_JOB_CACHE_DIR, JOB_DIRECTORY_PREFIX,
|
||||||
|
MIN_JOB_DIRNAME_WIDTH, jobId);
|
||||||
|
CitusRemoveDirectory(jobCacheDirectory);
|
||||||
|
FreeStringInfo(jobCacheDirectory);
|
||||||
|
|
||||||
|
PG_RETURN_VOID();
|
||||||
|
}
|
|
@ -917,8 +917,13 @@ FilterAndPartitionTable(const char *filterQuery,
|
||||||
if (SPI_processed > 0)
|
if (SPI_processed > 0)
|
||||||
{
|
{
|
||||||
TupleDesc rowDescriptor = SPI_tuptable->tupdesc;
|
TupleDesc rowDescriptor = SPI_tuptable->tupdesc;
|
||||||
partitionColumnIndex = ColumnIndex(rowDescriptor, partitionColumnName);
|
|
||||||
|
|
||||||
|
if (fileCount == 0)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("no partition to read into")));
|
||||||
|
}
|
||||||
|
|
||||||
|
partitionColumnIndex = ColumnIndex(rowDescriptor, partitionColumnName);
|
||||||
partitionColumnTypeId = SPI_gettypeid(rowDescriptor, partitionColumnIndex);
|
partitionColumnTypeId = SPI_gettypeid(rowDescriptor, partitionColumnIndex);
|
||||||
if (partitionColumnType != partitionColumnTypeId)
|
if (partitionColumnType != partitionColumnTypeId)
|
||||||
{
|
{
|
||||||
|
@ -942,6 +947,7 @@ FilterAndPartitionTable(const char *filterQuery,
|
||||||
while (SPI_processed > 0)
|
while (SPI_processed > 0)
|
||||||
{
|
{
|
||||||
int rowIndex = 0;
|
int rowIndex = 0;
|
||||||
|
|
||||||
for (rowIndex = 0; rowIndex < SPI_processed; rowIndex++)
|
for (rowIndex = 0; rowIndex < SPI_processed; rowIndex++)
|
||||||
{
|
{
|
||||||
HeapTuple row = SPI_tuptable->vals[rowIndex];
|
HeapTuple row = SPI_tuptable->vals[rowIndex];
|
||||||
|
@ -964,6 +970,10 @@ FilterAndPartitionTable(const char *filterQuery,
|
||||||
if (!partitionKeyNull)
|
if (!partitionKeyNull)
|
||||||
{
|
{
|
||||||
partitionId = (*PartitionIdFunction)(partitionKey, partitionIdContext);
|
partitionId = (*PartitionIdFunction)(partitionKey, partitionIdContext);
|
||||||
|
if (partitionId == INVALID_SHARD_INDEX)
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("invalid distribution column value")));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
|
|
@ -256,6 +256,9 @@ SELECT * FROM squares ORDER BY x;
|
||||||
5 | 25
|
5 | 25
|
||||||
(5 rows)
|
(5 rows)
|
||||||
|
|
||||||
|
-- empty shard interval array should raise error
|
||||||
|
SELECT worker_hash_partition_table(42,1,'SELECT a FROM generate_series(1,100) AS a', 'a', 23, ARRAY[0]);
|
||||||
|
ERROR: invalid distribution column value
|
||||||
DROP SCHEMA intermediate_results CASCADE;
|
DROP SCHEMA intermediate_results CASCADE;
|
||||||
NOTICE: drop cascades to 5 other objects
|
NOTICE: drop cascades to 5 other objects
|
||||||
DETAIL: drop cascades to table interesting_squares
|
DETAIL: drop cascades to table interesting_squares
|
||||||
|
|
|
@ -7,6 +7,21 @@
|
||||||
-- result files.
|
-- result files.
|
||||||
SET citus.next_shard_id TO 810000;
|
SET citus.next_shard_id TO 810000;
|
||||||
SET citus.enable_unique_job_ids TO off;
|
SET citus.enable_unique_job_ids TO off;
|
||||||
|
CREATE FUNCTION citus_rm_job_directory(bigint)
|
||||||
|
RETURNS void
|
||||||
|
AS 'citus'
|
||||||
|
LANGUAGE C STRICT;
|
||||||
|
with silence as (
|
||||||
|
SELECT citus_rm_job_directory(split_part(f, '_', 2)::bigint)
|
||||||
|
from pg_ls_dir('base/pgsql_job_cache') f
|
||||||
|
)
|
||||||
|
select count(*) * 0 zero
|
||||||
|
from silence;
|
||||||
|
zero
|
||||||
|
------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
-- pg_ls_dir() displays jobids. We explicitly set the jobId sequence
|
-- pg_ls_dir() displays jobids. We explicitly set the jobId sequence
|
||||||
-- here so that the regression output becomes independent of the
|
-- here so that the regression output becomes independent of the
|
||||||
|
|
|
@ -7,6 +7,21 @@
|
||||||
-- result files.
|
-- result files.
|
||||||
SET citus.next_shard_id TO 810000;
|
SET citus.next_shard_id TO 810000;
|
||||||
SET citus.enable_unique_job_ids TO off;
|
SET citus.enable_unique_job_ids TO off;
|
||||||
|
CREATE FUNCTION citus_rm_job_directory(bigint)
|
||||||
|
RETURNS void
|
||||||
|
AS 'citus'
|
||||||
|
LANGUAGE C STRICT;
|
||||||
|
with silence as (
|
||||||
|
SELECT citus_rm_job_directory(split_part(f, '_', 2)::bigint)
|
||||||
|
from pg_ls_dir('base/pgsql_job_cache') f
|
||||||
|
)
|
||||||
|
select count(*) * 0 zero
|
||||||
|
from silence;
|
||||||
|
zero
|
||||||
|
------
|
||||||
|
0
|
||||||
|
(1 row)
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
-- pg_ls_dir() displays jobids. We explicitly set the jobId sequence
|
-- pg_ls_dir() displays jobids. We explicitly set the jobId sequence
|
||||||
-- here so that the regression output becomes independent of the
|
-- here so that the regression output becomes independent of the
|
||||||
|
|
|
@ -141,4 +141,7 @@ WITH (FORMAT text);
|
||||||
|
|
||||||
SELECT * FROM squares ORDER BY x;
|
SELECT * FROM squares ORDER BY x;
|
||||||
|
|
||||||
|
-- empty shard interval array should raise error
|
||||||
|
SELECT worker_hash_partition_table(42,1,'SELECT a FROM generate_series(1,100) AS a', 'a', 23, ARRAY[0]);
|
||||||
|
|
||||||
DROP SCHEMA intermediate_results CASCADE;
|
DROP SCHEMA intermediate_results CASCADE;
|
||||||
|
|
|
@ -11,6 +11,17 @@
|
||||||
SET citus.next_shard_id TO 810000;
|
SET citus.next_shard_id TO 810000;
|
||||||
SET citus.enable_unique_job_ids TO off;
|
SET citus.enable_unique_job_ids TO off;
|
||||||
|
|
||||||
|
CREATE FUNCTION citus_rm_job_directory(bigint)
|
||||||
|
RETURNS void
|
||||||
|
AS 'citus'
|
||||||
|
LANGUAGE C STRICT;
|
||||||
|
|
||||||
|
with silence as (
|
||||||
|
SELECT citus_rm_job_directory(split_part(f, '_', 2)::bigint)
|
||||||
|
from pg_ls_dir('base/pgsql_job_cache') f
|
||||||
|
)
|
||||||
|
select count(*) * 0 zero
|
||||||
|
from silence;
|
||||||
|
|
||||||
BEGIN;
|
BEGIN;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue