mirror of https://github.com/citusdata/citus.git
Merge branch 'main' into move_remaining_functions2
commit
63ccdac6bf
|
@ -2663,7 +2663,6 @@ CreateLocalColocatedIntermediateFile(CitusCopyDestReceiver *copyDest,
|
||||||
CreateIntermediateResultsDirectory();
|
CreateIntermediateResultsDirectory();
|
||||||
|
|
||||||
const int fileFlags = (O_CREAT | O_RDWR | O_TRUNC);
|
const int fileFlags = (O_CREAT | O_RDWR | O_TRUNC);
|
||||||
const int fileMode = (S_IRUSR | S_IWUSR);
|
|
||||||
|
|
||||||
StringInfo filePath = makeStringInfo();
|
StringInfo filePath = makeStringInfo();
|
||||||
appendStringInfo(filePath, "%s_%ld", copyDest->colocatedIntermediateResultIdPrefix,
|
appendStringInfo(filePath, "%s_%ld", copyDest->colocatedIntermediateResultIdPrefix,
|
||||||
|
@ -2671,7 +2670,7 @@ CreateLocalColocatedIntermediateFile(CitusCopyDestReceiver *copyDest,
|
||||||
|
|
||||||
const char *fileName = QueryResultFileName(filePath->data);
|
const char *fileName = QueryResultFileName(filePath->data);
|
||||||
shardState->fileDest =
|
shardState->fileDest =
|
||||||
FileCompatFromFileStart(FileOpenForTransmit(fileName, fileFlags, fileMode));
|
FileCompatFromFileStart(FileOpenForTransmit(fileName, fileFlags));
|
||||||
|
|
||||||
CopyOutState localFileCopyOutState = shardState->copyOutState;
|
CopyOutState localFileCopyOutState = shardState->copyOutState;
|
||||||
bool isBinaryCopy = localFileCopyOutState->binary;
|
bool isBinaryCopy = localFileCopyOutState->binary;
|
||||||
|
|
|
@ -295,7 +295,6 @@ PrepareIntermediateResultBroadcast(RemoteFileDestReceiver *resultDest)
|
||||||
if (resultDest->writeLocalFile)
|
if (resultDest->writeLocalFile)
|
||||||
{
|
{
|
||||||
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
|
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
|
||||||
const int fileMode = (S_IRUSR | S_IWUSR);
|
|
||||||
|
|
||||||
/* make sure the directory exists */
|
/* make sure the directory exists */
|
||||||
CreateIntermediateResultsDirectory();
|
CreateIntermediateResultsDirectory();
|
||||||
|
@ -303,8 +302,7 @@ PrepareIntermediateResultBroadcast(RemoteFileDestReceiver *resultDest)
|
||||||
const char *fileName = QueryResultFileName(resultId);
|
const char *fileName = QueryResultFileName(resultId);
|
||||||
|
|
||||||
resultDest->fileCompat = FileCompatFromFileStart(FileOpenForTransmit(fileName,
|
resultDest->fileCompat = FileCompatFromFileStart(FileOpenForTransmit(fileName,
|
||||||
fileFlags,
|
fileFlags));
|
||||||
fileMode));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
WorkerNode *workerNode = NULL;
|
WorkerNode *workerNode = NULL;
|
||||||
|
@ -606,7 +604,7 @@ CreateIntermediateResultsDirectory(void)
|
||||||
{
|
{
|
||||||
char *resultDirectory = IntermediateResultsDirectory();
|
char *resultDirectory = IntermediateResultsDirectory();
|
||||||
|
|
||||||
int makeOK = mkdir(resultDirectory, S_IRWXU);
|
int makeOK = MakePGDirectory(resultDirectory);
|
||||||
if (makeOK != 0)
|
if (makeOK != 0)
|
||||||
{
|
{
|
||||||
if (errno == EEXIST)
|
if (errno == EEXIST)
|
||||||
|
@ -976,7 +974,6 @@ FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId)
|
||||||
|
|
||||||
StringInfo copyCommand = makeStringInfo();
|
StringInfo copyCommand = makeStringInfo();
|
||||||
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
|
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
|
||||||
const int fileMode = (S_IRUSR | S_IWUSR);
|
|
||||||
|
|
||||||
PGconn *pgConn = connection->pgConn;
|
PGconn *pgConn = connection->pgConn;
|
||||||
int socket = PQsocket(pgConn);
|
int socket = PQsocket(pgConn);
|
||||||
|
@ -998,7 +995,7 @@ FetchRemoteIntermediateResult(MultiConnection *connection, char *resultId)
|
||||||
|
|
||||||
PQclear(result);
|
PQclear(result);
|
||||||
|
|
||||||
File fileDesc = FileOpenForTransmit(localPath, fileFlags, fileMode);
|
File fileDesc = FileOpenForTransmit(localPath, fileFlags);
|
||||||
FileCompat fileCompat = FileCompatFromFileStart(fileDesc);
|
FileCompat fileCompat = FileCompatFromFileStart(fileDesc);
|
||||||
|
|
||||||
while (true)
|
while (true)
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
#include "pgstat.h"
|
#include "pgstat.h"
|
||||||
|
|
||||||
#include "commands/defrem.h"
|
#include "commands/defrem.h"
|
||||||
|
#include "common/file_perm.h"
|
||||||
#include "libpq/libpq.h"
|
#include "libpq/libpq.h"
|
||||||
#include "libpq/pqformat.h"
|
#include "libpq/pqformat.h"
|
||||||
#include "storage/fd.h"
|
#include "storage/fd.h"
|
||||||
|
@ -48,8 +49,7 @@ RedirectCopyDataToRegularFile(const char *filename)
|
||||||
{
|
{
|
||||||
StringInfo copyData = makeStringInfo();
|
StringInfo copyData = makeStringInfo();
|
||||||
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
|
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
|
||||||
const int fileMode = (S_IRUSR | S_IWUSR);
|
File fileDesc = FileOpenForTransmit(filename, fileFlags);
|
||||||
File fileDesc = FileOpenForTransmit(filename, fileFlags, fileMode);
|
|
||||||
FileCompat fileCompat = FileCompatFromFileStart(fileDesc);
|
FileCompat fileCompat = FileCompatFromFileStart(fileDesc);
|
||||||
|
|
||||||
SendCopyInStart();
|
SendCopyInStart();
|
||||||
|
@ -92,7 +92,7 @@ SendRegularFile(const char *filename)
|
||||||
const int fileMode = 0;
|
const int fileMode = 0;
|
||||||
|
|
||||||
/* we currently do not check if the caller has permissions for this file */
|
/* we currently do not check if the caller has permissions for this file */
|
||||||
File fileDesc = FileOpenForTransmit(filename, fileFlags, fileMode);
|
File fileDesc = FileOpenForTransmitPerm(filename, fileFlags, fileMode);
|
||||||
FileCompat fileCompat = FileCompatFromFileStart(fileDesc);
|
FileCompat fileCompat = FileCompatFromFileStart(fileDesc);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -136,12 +136,23 @@ FreeStringInfo(StringInfo stringInfo)
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* FileOpenForTransmit opens file with the given filename and flags. On success,
|
* Open a file with FileOpenForTransmitPerm() and pass default file mode for
|
||||||
* the function returns the internal file handle for the opened file. On failure
|
* the fileMode parameter.
|
||||||
* the function errors out.
|
|
||||||
*/
|
*/
|
||||||
File
|
File
|
||||||
FileOpenForTransmit(const char *filename, int fileFlags, int fileMode)
|
FileOpenForTransmit(const char *filename, int fileFlags)
|
||||||
|
{
|
||||||
|
return FileOpenForTransmitPerm(filename, fileFlags, pg_file_create_mode);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* FileOpenForTransmitPerm opens file with the given filename and flags. On
|
||||||
|
* success, the function returns the internal file handle for the opened file.
|
||||||
|
* On failure the function errors out.
|
||||||
|
*/
|
||||||
|
File
|
||||||
|
FileOpenForTransmitPerm(const char *filename, int fileFlags, int fileMode)
|
||||||
{
|
{
|
||||||
struct stat fileStat;
|
struct stat fileStat;
|
||||||
|
|
||||||
|
|
|
@ -294,6 +294,17 @@ citus_move_shard_placement(PG_FUNCTION_ARGS)
|
||||||
CheckCitusVersion(ERROR);
|
CheckCitusVersion(ERROR);
|
||||||
EnsureCoordinator();
|
EnsureCoordinator();
|
||||||
|
|
||||||
|
List *referenceTableIdList = NIL;
|
||||||
|
|
||||||
|
if (HasNodesWithMissingReferenceTables(&referenceTableIdList))
|
||||||
|
{
|
||||||
|
ereport(ERROR, (errmsg("there are missing reference tables on some nodes"),
|
||||||
|
errhint("Copy reference tables first with "
|
||||||
|
"replicate_reference_tables() or use "
|
||||||
|
"citus_rebalance_start() that will do it automatically."
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
int64 shardId = PG_GETARG_INT64(0);
|
int64 shardId = PG_GETARG_INT64(0);
|
||||||
char *sourceNodeName = text_to_cstring(PG_GETARG_TEXT_P(1));
|
char *sourceNodeName = text_to_cstring(PG_GETARG_TEXT_P(1));
|
||||||
int32 sourceNodePort = PG_GETARG_INT32(2);
|
int32 sourceNodePort = PG_GETARG_INT32(2);
|
||||||
|
|
|
@ -895,22 +895,13 @@ DecrementExternalClientBackendCounterAtExit(int code, Datum arg)
|
||||||
static void
|
static void
|
||||||
CreateRequiredDirectories(void)
|
CreateRequiredDirectories(void)
|
||||||
{
|
{
|
||||||
const char *subdirs[] = {
|
const char *subdir = ("base/" PG_JOB_CACHE_DIR);
|
||||||
"pg_foreign_file",
|
|
||||||
"pg_foreign_file/cached",
|
|
||||||
("base/" PG_JOB_CACHE_DIR)
|
|
||||||
};
|
|
||||||
|
|
||||||
for (int dirNo = 0; dirNo < lengthof(subdirs); dirNo++)
|
if (MakePGDirectory(subdir) != 0 && errno != EEXIST)
|
||||||
{
|
{
|
||||||
int ret = mkdir(subdirs[dirNo], S_IRWXU);
|
ereport(ERROR, (errcode_for_file_access(),
|
||||||
|
errmsg("could not create directory \"%s\": %m",
|
||||||
if (ret != 0 && errno != EEXIST)
|
subdir)));
|
||||||
{
|
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
|
||||||
errmsg("could not create directory \"%s\": %m",
|
|
||||||
subdirs[dirNo])));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,7 @@ static bool FileIsLink(const char *filename, struct stat filestat);
|
||||||
void
|
void
|
||||||
CitusCreateDirectory(StringInfo directoryName)
|
CitusCreateDirectory(StringInfo directoryName)
|
||||||
{
|
{
|
||||||
int makeOK = mkdir(directoryName->data, S_IRWXU);
|
int makeOK = MakePGDirectory(directoryName->data);
|
||||||
if (makeOK != 0)
|
if (makeOK != 0)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errcode_for_file_access(),
|
ereport(ERROR, (errcode_for_file_access(),
|
||||||
|
|
|
@ -126,7 +126,6 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
const char *nullPrintCharacter = "\\N";
|
const char *nullPrintCharacter = "\\N";
|
||||||
|
|
||||||
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
|
const int fileFlags = (O_APPEND | O_CREAT | O_RDWR | O_TRUNC | PG_BINARY);
|
||||||
const int fileMode = (S_IRUSR | S_IWUSR);
|
|
||||||
|
|
||||||
/* use the memory context that was in place when the DestReceiver was created */
|
/* use the memory context that was in place when the DestReceiver was created */
|
||||||
MemoryContext oldContext = MemoryContextSwitchTo(taskFileDest->memoryContext);
|
MemoryContext oldContext = MemoryContextSwitchTo(taskFileDest->memoryContext);
|
||||||
|
@ -148,8 +147,7 @@ TaskFileDestReceiverStartup(DestReceiver *dest, int operation,
|
||||||
|
|
||||||
taskFileDest->fileCompat = FileCompatFromFileStart(FileOpenForTransmit(
|
taskFileDest->fileCompat = FileCompatFromFileStart(FileOpenForTransmit(
|
||||||
taskFileDest->filePath,
|
taskFileDest->filePath,
|
||||||
fileFlags,
|
fileFlags));
|
||||||
fileMode));
|
|
||||||
|
|
||||||
if (copyOutState->binary)
|
if (copyOutState->binary)
|
||||||
{
|
{
|
||||||
|
|
|
@ -21,7 +21,8 @@
|
||||||
/* Function declarations for transmitting files between two nodes */
|
/* Function declarations for transmitting files between two nodes */
|
||||||
extern void RedirectCopyDataToRegularFile(const char *filename);
|
extern void RedirectCopyDataToRegularFile(const char *filename);
|
||||||
extern void SendRegularFile(const char *filename);
|
extern void SendRegularFile(const char *filename);
|
||||||
extern File FileOpenForTransmit(const char *filename, int fileFlags, int fileMode);
|
extern File FileOpenForTransmit(const char *filename, int fileFlags);
|
||||||
|
extern File FileOpenForTransmitPerm(const char *filename, int fileFlags, int fileMode);
|
||||||
|
|
||||||
|
|
||||||
#endif /* TRANSMIT_H */
|
#endif /* TRANSMIT_H */
|
||||||
|
|
|
@ -2395,6 +2395,74 @@ SELECT count(*) FROM pg_dist_partition;
|
||||||
0
|
0
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
-- verify a system with a new node won't copy distributed table shards without reference tables
|
||||||
|
SELECT 1 from master_remove_node('localhost', :worker_2_port);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
SELECT public.wait_until_metadata_sync(30000);
|
||||||
|
wait_until_metadata_sync
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE r1 (a int PRIMARY KEY, b int);
|
||||||
|
SELECT create_reference_table('r1');
|
||||||
|
create_reference_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
CREATE TABLE d1 (a int PRIMARY KEY, b int);
|
||||||
|
SELECT create_distributed_table('d1', 'a');
|
||||||
|
create_distributed_table
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 15;
|
||||||
|
SELECT 1 from master_add_node('localhost', :worker_2_port);
|
||||||
|
?column?
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- count the number of placements for the reference table to verify it is not available on
|
||||||
|
-- all nodes
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_dist_shard
|
||||||
|
JOIN pg_dist_shard_placement USING (shardid)
|
||||||
|
WHERE logicalrelid = 'r1'::regclass;
|
||||||
|
count
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
1
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- #7426 We can't move shards to the fresh node before we copy reference tables there.
|
||||||
|
-- rebalance_table_shards() will do the copy, but the low-level
|
||||||
|
-- citus_move_shard_placement() should raise an error
|
||||||
|
SELECT citus_move_shard_placement(pg_dist_shard.shardid, nodename, nodeport, 'localhost', :worker_2_port)
|
||||||
|
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||||
|
WHERE logicalrelid = 'd1'::regclass AND nodename = 'localhost' AND nodeport = :worker_1_port LIMIT 1;
|
||||||
|
ERROR: there are missing reference tables on some nodes
|
||||||
|
SELECT replicate_reference_tables();
|
||||||
|
replicate_reference_tables
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
-- After replication, the move should succeed.
|
||||||
|
SELECT citus_move_shard_placement(pg_dist_shard.shardid, nodename, nodeport, 'localhost', :worker_2_port)
|
||||||
|
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||||
|
WHERE logicalrelid = 'd1'::regclass AND nodename = 'localhost' AND nodeport = :worker_1_port LIMIT 1;
|
||||||
|
citus_move_shard_placement
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
|
||||||
|
(1 row)
|
||||||
|
|
||||||
|
DROP TABLE d1, r1;
|
||||||
-- verify a system having only reference tables will copy the reference tables when
|
-- verify a system having only reference tables will copy the reference tables when
|
||||||
-- executing the rebalancer
|
-- executing the rebalancer
|
||||||
SELECT 1 from master_remove_node('localhost', :worker_2_port);
|
SELECT 1 from master_remove_node('localhost', :worker_2_port);
|
||||||
|
|
|
@ -1340,6 +1340,43 @@ DROP TABLE t1, r1, r2;
|
||||||
-- test suites should clean up their distributed tables.
|
-- test suites should clean up their distributed tables.
|
||||||
SELECT count(*) FROM pg_dist_partition;
|
SELECT count(*) FROM pg_dist_partition;
|
||||||
|
|
||||||
|
-- verify a system with a new node won't copy distributed table shards without reference tables
|
||||||
|
|
||||||
|
SELECT 1 from master_remove_node('localhost', :worker_2_port);
|
||||||
|
SELECT public.wait_until_metadata_sync(30000);
|
||||||
|
|
||||||
|
CREATE TABLE r1 (a int PRIMARY KEY, b int);
|
||||||
|
SELECT create_reference_table('r1');
|
||||||
|
|
||||||
|
CREATE TABLE d1 (a int PRIMARY KEY, b int);
|
||||||
|
SELECT create_distributed_table('d1', 'a');
|
||||||
|
|
||||||
|
ALTER SEQUENCE pg_dist_groupid_seq RESTART WITH 15;
|
||||||
|
SELECT 1 from master_add_node('localhost', :worker_2_port);
|
||||||
|
|
||||||
|
-- count the number of placements for the reference table to verify it is not available on
|
||||||
|
-- all nodes
|
||||||
|
SELECT count(*)
|
||||||
|
FROM pg_dist_shard
|
||||||
|
JOIN pg_dist_shard_placement USING (shardid)
|
||||||
|
WHERE logicalrelid = 'r1'::regclass;
|
||||||
|
|
||||||
|
-- #7426 We can't move shards to the fresh node before we copy reference tables there.
|
||||||
|
-- rebalance_table_shards() will do the copy, but the low-level
|
||||||
|
-- citus_move_shard_placement() should raise an error
|
||||||
|
SELECT citus_move_shard_placement(pg_dist_shard.shardid, nodename, nodeport, 'localhost', :worker_2_port)
|
||||||
|
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||||
|
WHERE logicalrelid = 'd1'::regclass AND nodename = 'localhost' AND nodeport = :worker_1_port LIMIT 1;
|
||||||
|
|
||||||
|
SELECT replicate_reference_tables();
|
||||||
|
|
||||||
|
-- After replication, the move should succeed.
|
||||||
|
SELECT citus_move_shard_placement(pg_dist_shard.shardid, nodename, nodeport, 'localhost', :worker_2_port)
|
||||||
|
FROM pg_dist_shard JOIN pg_dist_shard_placement USING (shardid)
|
||||||
|
WHERE logicalrelid = 'd1'::regclass AND nodename = 'localhost' AND nodeport = :worker_1_port LIMIT 1;
|
||||||
|
|
||||||
|
DROP TABLE d1, r1;
|
||||||
|
|
||||||
-- verify a system having only reference tables will copy the reference tables when
|
-- verify a system having only reference tables will copy the reference tables when
|
||||||
-- executing the rebalancer
|
-- executing the rebalancer
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue