mirror of https://github.com/citusdata/citus.git
Merge pull request #678 from citusdata/fix/fix_676_add_old_versions_of_api_functions_without_schema_parameter
Add old version(without schema name parameter) of api functions backpull/673/head
commit
30d8b74245
|
@ -6,7 +6,7 @@ citus_top_builddir = ../../..
|
||||||
MODULE_big = citus
|
MODULE_big = citus
|
||||||
EXTENSION = citus
|
EXTENSION = citus
|
||||||
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
EXTVERSIONS = 5.0 5.0-1 5.0-2 \
|
||||||
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5
|
5.1-1 5.1-2 5.1-3 5.1-4 5.1-5 5.1-6 5.1-7
|
||||||
|
|
||||||
# All citus--*.sql files in the source directory
|
# All citus--*.sql files in the source directory
|
||||||
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
|
||||||
|
@ -43,6 +43,10 @@ $(EXTENSION)--5.1-4.sql: $(EXTENSION)--5.1-3.sql $(EXTENSION)--5.1-3--5.1-4.sql
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
$(EXTENSION)--5.1-5.sql: $(EXTENSION)--5.1-4.sql $(EXTENSION)--5.1-4--5.1-5.sql
|
$(EXTENSION)--5.1-5.sql: $(EXTENSION)--5.1-4.sql $(EXTENSION)--5.1-4--5.1-5.sql
|
||||||
cat $^ > $@
|
cat $^ > $@
|
||||||
|
$(EXTENSION)--5.1-6.sql: $(EXTENSION)--5.1-5.sql $(EXTENSION)--5.1-5--5.1-6.sql
|
||||||
|
cat $^ > $@
|
||||||
|
$(EXTENSION)--5.1-7.sql: $(EXTENSION)--5.1-6.sql $(EXTENSION)--5.1-6--5.1-7.sql
|
||||||
|
cat $^ > $@
|
||||||
|
|
||||||
NO_PGXS = 1
|
NO_PGXS = 1
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,26 @@
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.worker_apply_shard_ddl_command(bigint, text)
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE sql
|
||||||
|
AS $worker_apply_shard_ddl_command$
|
||||||
|
SELECT pg_catalog.worker_apply_shard_ddl_command($1, 'public', $2);
|
||||||
|
$worker_apply_shard_ddl_command$;
|
||||||
|
COMMENT ON FUNCTION worker_apply_shard_ddl_command(bigint, text)
|
||||||
|
IS 'extend ddl command with shardId and apply on database';
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.worker_fetch_foreign_file(text, bigint, text[], integer[])
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE sql
|
||||||
|
AS $worker_fetch_foreign_file$
|
||||||
|
SELECT pg_catalog.worker_fetch_foreign_file('public', $1, $2, $3, $4);
|
||||||
|
$worker_fetch_foreign_file$;
|
||||||
|
COMMENT ON FUNCTION pg_catalog.worker_fetch_foreign_file(text, bigint, text[], integer[])
|
||||||
|
IS 'fetch foreign file from remote node and apply file';
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.worker_fetch_regular_table(text, bigint, text[], integer[])
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE sql
|
||||||
|
AS $worker_fetch_regular_table$
|
||||||
|
SELECT pg_catalog.worker_fetch_regular_table('public', $1, $2, $3, $4);
|
||||||
|
$worker_fetch_regular_table$;
|
||||||
|
COMMENT ON FUNCTION pg_catalog.worker_fetch_regular_table(text, bigint, text[], integer[])
|
||||||
|
IS 'fetch PostgreSQL table from remote node';
|
|
@ -0,0 +1,17 @@
|
||||||
|
DROP FUNCTION IF EXISTS pg_catalog.worker_fetch_foreign_file(text, text, bigint, text[], integer[]);
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.worker_fetch_foreign_file(text, bigint, text[], integer[])
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME', $$worker_fetch_foreign_file$$;
|
||||||
|
COMMENT ON FUNCTION pg_catalog.worker_fetch_foreign_file(text, bigint, text[], integer[])
|
||||||
|
IS 'fetch foreign file from remote node and apply file';
|
||||||
|
|
||||||
|
DROP FUNCTION IF EXISTS pg_catalog.worker_fetch_regular_table(text, text, bigint, text[], integer[]);
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION pg_catalog.worker_fetch_regular_table(text, bigint, text[], integer[])
|
||||||
|
RETURNS void
|
||||||
|
LANGUAGE C STRICT
|
||||||
|
AS 'MODULE_PATHNAME', $$worker_fetch_regular_table$$;
|
||||||
|
COMMENT ON FUNCTION pg_catalog.worker_fetch_regular_table(text, bigint, text[], integer[])
|
||||||
|
IS 'fetch PostgreSQL table from remote node';
|
|
@ -1,6 +1,6 @@
|
||||||
# Citus extension
|
# Citus extension
|
||||||
comment = 'Citus distributed database'
|
comment = 'Citus distributed database'
|
||||||
default_version = '5.1-5'
|
default_version = '5.1-7'
|
||||||
module_pathname = '$libdir/citus'
|
module_pathname = '$libdir/citus'
|
||||||
relocatable = false
|
relocatable = false
|
||||||
schema = pg_catalog
|
schema = pg_catalog
|
||||||
|
|
|
@ -954,13 +954,14 @@ CreateLocalTable(RangeVar *relation, char *nodeName, int32 nodePort)
|
||||||
|
|
||||||
char *relationName = relation->relname;
|
char *relationName = relation->relname;
|
||||||
char *schemaName = relation->schemaname;
|
char *schemaName = relation->schemaname;
|
||||||
|
char *qualifiedRelationName = quote_qualified_identifier(schemaName, relationName);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The warning message created in TableDDLCommandList() is descriptive
|
* The warning message created in TableDDLCommandList() is descriptive
|
||||||
* enough; therefore, we just throw an error which says that we could not
|
* enough; therefore, we just throw an error which says that we could not
|
||||||
* run the copy operation.
|
* run the copy operation.
|
||||||
*/
|
*/
|
||||||
ddlCommandList = TableDDLCommandList(nodeName, nodePort, schemaName, relationName);
|
ddlCommandList = TableDDLCommandList(nodeName, nodePort, qualifiedRelationName);
|
||||||
if (ddlCommandList == NIL)
|
if (ddlCommandList == NIL)
|
||||||
{
|
{
|
||||||
ereport(ERROR, (errmsg("could not run copy from the worker node")));
|
ereport(ERROR, (errmsg("could not run copy from the worker node")));
|
||||||
|
|
|
@ -441,7 +441,6 @@ WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
|
||||||
{
|
{
|
||||||
Oid schemaId = get_rel_namespace(relationId);
|
Oid schemaId = get_rel_namespace(relationId);
|
||||||
char *schemaName = get_namespace_name(schemaId);
|
char *schemaName = get_namespace_name(schemaId);
|
||||||
char *escapedSchemaName = quote_literal_cstr(schemaName);
|
|
||||||
bool shardCreated = true;
|
bool shardCreated = true;
|
||||||
ListCell *ddlCommandCell = NULL;
|
ListCell *ddlCommandCell = NULL;
|
||||||
|
|
||||||
|
@ -452,8 +451,19 @@ WorkerCreateShard(Oid relationId, char *nodeName, uint32 nodePort,
|
||||||
List *queryResultList = NIL;
|
List *queryResultList = NIL;
|
||||||
StringInfo applyDDLCommand = makeStringInfo();
|
StringInfo applyDDLCommand = makeStringInfo();
|
||||||
|
|
||||||
|
if (strcmp(schemaName, "public") != 0)
|
||||||
|
{
|
||||||
|
char *escapedSchemaName = quote_literal_cstr(schemaName);
|
||||||
|
|
||||||
appendStringInfo(applyDDLCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId,
|
appendStringInfo(applyDDLCommand, WORKER_APPLY_SHARD_DDL_COMMAND, shardId,
|
||||||
escapedSchemaName, escapedDDLCommand);
|
escapedSchemaName, escapedDDLCommand);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendStringInfo(applyDDLCommand,
|
||||||
|
WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA, shardId,
|
||||||
|
escapedDDLCommand);
|
||||||
|
}
|
||||||
|
|
||||||
queryResultList = ExecuteRemoteQuery(nodeName, nodePort, newShardOwner,
|
queryResultList = ExecuteRemoteQuery(nodeName, nodePort, newShardOwner,
|
||||||
applyDDLCommand);
|
applyDDLCommand);
|
||||||
|
|
|
@ -3779,15 +3779,39 @@ ShardFetchQueryString(uint64 shardId)
|
||||||
if (storageType == SHARD_STORAGE_TABLE || storageType == SHARD_STORAGE_RELAY ||
|
if (storageType == SHARD_STORAGE_TABLE || storageType == SHARD_STORAGE_RELAY ||
|
||||||
storageType == SHARD_STORAGE_COLUMNAR)
|
storageType == SHARD_STORAGE_COLUMNAR)
|
||||||
{
|
{
|
||||||
appendStringInfo(shardFetchQuery, TABLE_FETCH_COMMAND,
|
if (strcmp(shardSchemaName, "public") != 0)
|
||||||
shardSchemaName, shardTableName, shardLength,
|
{
|
||||||
nodeNameArrayString->data, nodePortArrayString->data);
|
char *qualifiedTableName = quote_qualified_identifier(shardSchemaName,
|
||||||
|
shardTableName);
|
||||||
|
|
||||||
|
appendStringInfo(shardFetchQuery, TABLE_FETCH_COMMAND, qualifiedTableName,
|
||||||
|
shardLength, nodeNameArrayString->data,
|
||||||
|
nodePortArrayString->data);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendStringInfo(shardFetchQuery, TABLE_FETCH_COMMAND, shardTableName,
|
||||||
|
shardLength, nodeNameArrayString->data,
|
||||||
|
nodePortArrayString->data);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else if (storageType == SHARD_STORAGE_FOREIGN)
|
else if (storageType == SHARD_STORAGE_FOREIGN)
|
||||||
{
|
{
|
||||||
appendStringInfo(shardFetchQuery, FOREIGN_FETCH_COMMAND,
|
if (strcmp(shardSchemaName, "public") != 0)
|
||||||
shardSchemaName, shardTableName, shardLength,
|
{
|
||||||
nodeNameArrayString->data, nodePortArrayString->data);
|
char *qualifiedTableName = quote_qualified_identifier(shardSchemaName,
|
||||||
|
shardTableName);
|
||||||
|
|
||||||
|
appendStringInfo(shardFetchQuery, FOREIGN_FETCH_COMMAND, qualifiedTableName,
|
||||||
|
shardLength, nodeNameArrayString->data,
|
||||||
|
nodePortArrayString->data);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
appendStringInfo(shardFetchQuery, FOREIGN_FETCH_COMMAND, shardTableName,
|
||||||
|
shardLength, nodeNameArrayString->data,
|
||||||
|
nodePortArrayString->data);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return shardFetchQuery;
|
return shardFetchQuery;
|
||||||
|
|
|
@ -53,21 +53,20 @@ static bool ReceiveRegularFile(const char *nodeName, uint32 nodePort,
|
||||||
static void ReceiveResourceCleanup(int32 connectionId, const char *filename,
|
static void ReceiveResourceCleanup(int32 connectionId, const char *filename,
|
||||||
int32 fileDescriptor);
|
int32 fileDescriptor);
|
||||||
static void DeleteFile(const char *filename);
|
static void DeleteFile(const char *filename);
|
||||||
static void FetchTableCommon(text *tableSchemaNameText, text *tableName,
|
static void FetchTableCommon(text *tableName, uint64 remoteTableSize,
|
||||||
uint64 remoteTableSize, ArrayType *nodeNameObject,
|
ArrayType *nodeNameObject, ArrayType *nodePortObject,
|
||||||
ArrayType *nodePortObject,
|
|
||||||
bool (*FetchTableFunction)(const char *, uint32,
|
bool (*FetchTableFunction)(const char *, uint32,
|
||||||
const char *, const char *));
|
const char *));
|
||||||
static uint64 LocalTableSize(Oid relationId);
|
static uint64 LocalTableSize(Oid relationId);
|
||||||
static uint64 ExtractShardId(const char *tableName);
|
static uint64 ExtractShardId(const char *tableName);
|
||||||
static bool FetchRegularTable(const char *nodeName, uint32 nodePort,
|
static bool FetchRegularTable(const char *nodeName, uint32 nodePort,
|
||||||
const char *schemaName, const char *tableName);
|
const char *tableName);
|
||||||
static bool FetchForeignTable(const char *nodeName, uint32 nodePort,
|
static bool FetchForeignTable(const char *nodeName, uint32 nodePort,
|
||||||
const char *schemaName, const char *tableName);
|
const char *tableName);
|
||||||
static const char * RemoteTableOwner(const char *nodeName, uint32 nodePort,
|
static const char * RemoteTableOwner(const char *nodeName, uint32 nodePort,
|
||||||
const char *schemaName, const char *tableName);
|
const char *tableName);
|
||||||
static StringInfo ForeignFilePath(const char *nodeName, uint32 nodePort,
|
static StringInfo ForeignFilePath(const char *nodeName, uint32 nodePort,
|
||||||
const char *schemaName, const char *tableName);
|
const char *tableName);
|
||||||
static bool check_log_statement(List *stmt_list);
|
static bool check_log_statement(List *stmt_list);
|
||||||
|
|
||||||
|
|
||||||
|
@ -427,18 +426,17 @@ worker_apply_shard_ddl_command(PG_FUNCTION_ARGS)
|
||||||
Datum
|
Datum
|
||||||
worker_fetch_regular_table(PG_FUNCTION_ARGS)
|
worker_fetch_regular_table(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
text *regularSchemaName = PG_GETARG_TEXT_P(0);
|
text *regularTableName = PG_GETARG_TEXT_P(0);
|
||||||
text *regularTableName = PG_GETARG_TEXT_P(1);
|
uint64 generationStamp = PG_GETARG_INT64(1);
|
||||||
uint64 generationStamp = PG_GETARG_INT64(2);
|
ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2);
|
||||||
ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(3);
|
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3);
|
||||||
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(4);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Run common logic to fetch the remote table, and use the provided function
|
* Run common logic to fetch the remote table, and use the provided function
|
||||||
* pointer to perform the actual table fetching.
|
* pointer to perform the actual table fetching.
|
||||||
*/
|
*/
|
||||||
FetchTableCommon(regularSchemaName, regularTableName, generationStamp,
|
FetchTableCommon(regularTableName, generationStamp, nodeNameObject, nodePortObject,
|
||||||
nodeNameObject, nodePortObject, &FetchRegularTable);
|
&FetchRegularTable);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
@ -452,18 +450,17 @@ worker_fetch_regular_table(PG_FUNCTION_ARGS)
|
||||||
Datum
|
Datum
|
||||||
worker_fetch_foreign_file(PG_FUNCTION_ARGS)
|
worker_fetch_foreign_file(PG_FUNCTION_ARGS)
|
||||||
{
|
{
|
||||||
text *foreignSchemaName = PG_GETARG_TEXT_P(0);
|
text *foreignTableName = PG_GETARG_TEXT_P(0);
|
||||||
text *foreignTableName = PG_GETARG_TEXT_P(1);
|
uint64 foreignFileSize = PG_GETARG_INT64(1);
|
||||||
uint64 foreignFileSize = PG_GETARG_INT64(2);
|
ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(2);
|
||||||
ArrayType *nodeNameObject = PG_GETARG_ARRAYTYPE_P(3);
|
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(3);
|
||||||
ArrayType *nodePortObject = PG_GETARG_ARRAYTYPE_P(4);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Run common logic to fetch the remote table, and use the provided function
|
* Run common logic to fetch the remote table, and use the provided function
|
||||||
* pointer to perform the actual table fetching.
|
* pointer to perform the actual table fetching.
|
||||||
*/
|
*/
|
||||||
FetchTableCommon(foreignSchemaName, foreignTableName, foreignFileSize,
|
FetchTableCommon(foreignTableName, foreignFileSize, nodeNameObject, nodePortObject,
|
||||||
nodeNameObject, nodePortObject, &FetchForeignTable);
|
&FetchForeignTable);
|
||||||
|
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
@ -476,18 +473,17 @@ worker_fetch_foreign_file(PG_FUNCTION_ARGS)
|
||||||
* are retried in case of node failures.
|
* are retried in case of node failures.
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
FetchTableCommon(text *tableSchemaNameText, text *tableNameText, uint64 remoteTableSize,
|
FetchTableCommon(text *tableNameText, uint64 remoteTableSize,
|
||||||
ArrayType *nodeNameObject, ArrayType *nodePortObject,
|
ArrayType *nodeNameObject, ArrayType *nodePortObject,
|
||||||
bool (*FetchTableFunction)(const char *, uint32, const char *,
|
bool (*FetchTableFunction)(const char *, uint32, const char *))
|
||||||
const char *))
|
|
||||||
{
|
{
|
||||||
char *schemaName = NULL;
|
|
||||||
char *tableName = NULL;
|
|
||||||
Oid schemaId = InvalidOid;
|
|
||||||
uint64 shardId = INVALID_SHARD_ID;
|
uint64 shardId = INVALID_SHARD_ID;
|
||||||
Oid relationId = InvalidOid;
|
Oid relationId = InvalidOid;
|
||||||
|
List *relationNameList = NIL;
|
||||||
|
RangeVar *relation = NULL;
|
||||||
uint32 nodeIndex = 0;
|
uint32 nodeIndex = 0;
|
||||||
bool tableFetched = false;
|
bool tableFetched = false;
|
||||||
|
char *tableName = text_to_cstring(tableNameText);
|
||||||
|
|
||||||
Datum *nodeNameArray = DeconstructArrayObject(nodeNameObject);
|
Datum *nodeNameArray = DeconstructArrayObject(nodeNameObject);
|
||||||
Datum *nodePortArray = DeconstructArrayObject(nodePortObject);
|
Datum *nodePortArray = DeconstructArrayObject(nodePortObject);
|
||||||
|
@ -501,9 +497,6 @@ FetchTableCommon(text *tableSchemaNameText, text *tableNameText, uint64 remoteTa
|
||||||
" do not match", nodeNameCount, nodePortCount)));
|
" do not match", nodeNameCount, nodePortCount)));
|
||||||
}
|
}
|
||||||
|
|
||||||
schemaName = text_to_cstring(tableSchemaNameText);
|
|
||||||
tableName = text_to_cstring(tableNameText);
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* We lock on the shardId, but do not unlock. When the function returns, and
|
* We lock on the shardId, but do not unlock. When the function returns, and
|
||||||
* the transaction for this function commits, this lock will automatically
|
* the transaction for this function commits, this lock will automatically
|
||||||
|
@ -513,9 +506,11 @@ FetchTableCommon(text *tableSchemaNameText, text *tableNameText, uint64 remoteTa
|
||||||
shardId = ExtractShardId(tableName);
|
shardId = ExtractShardId(tableName);
|
||||||
LockShardResource(shardId, AccessExclusiveLock);
|
LockShardResource(shardId, AccessExclusiveLock);
|
||||||
|
|
||||||
|
relationNameList = textToQualifiedNameList(tableNameText);
|
||||||
|
relation = makeRangeVarFromNameList(relationNameList);
|
||||||
|
relationId = RangeVarGetRelid(relation, NoLock, true);
|
||||||
|
|
||||||
/* check if we already fetched the table */
|
/* check if we already fetched the table */
|
||||||
schemaId = get_namespace_oid(schemaName, true);
|
|
||||||
relationId = get_relname_relid(tableName, schemaId);
|
|
||||||
if (relationId != InvalidOid)
|
if (relationId != InvalidOid)
|
||||||
{
|
{
|
||||||
uint64 localTableSize = 0;
|
uint64 localTableSize = 0;
|
||||||
|
@ -566,7 +561,7 @@ FetchTableCommon(text *tableSchemaNameText, text *tableNameText, uint64 remoteTa
|
||||||
char *nodeName = TextDatumGetCString(nodeNameDatum);
|
char *nodeName = TextDatumGetCString(nodeNameDatum);
|
||||||
uint32 nodePort = DatumGetUInt32(nodePortDatum);
|
uint32 nodePort = DatumGetUInt32(nodePortDatum);
|
||||||
|
|
||||||
tableFetched = (*FetchTableFunction)(nodeName, nodePort, schemaName, tableName);
|
tableFetched = (*FetchTableFunction)(nodeName, nodePort, tableName);
|
||||||
|
|
||||||
nodeIndex++;
|
nodeIndex++;
|
||||||
}
|
}
|
||||||
|
@ -693,8 +688,7 @@ ExtractShardId(const char *tableName)
|
||||||
* false. On other types of failures, the function errors out.
|
* false. On other types of failures, the function errors out.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName,
|
FetchRegularTable(const char *nodeName, uint32 nodePort, const char *tableName)
|
||||||
const char *tableName)
|
|
||||||
{
|
{
|
||||||
StringInfo localFilePath = NULL;
|
StringInfo localFilePath = NULL;
|
||||||
StringInfo remoteCopyCommand = NULL;
|
StringInfo remoteCopyCommand = NULL;
|
||||||
|
@ -704,12 +698,12 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName,
|
||||||
RangeVar *localTable = NULL;
|
RangeVar *localTable = NULL;
|
||||||
uint64 shardId = 0;
|
uint64 shardId = 0;
|
||||||
bool received = false;
|
bool received = false;
|
||||||
char *quotedTableName = NULL;
|
|
||||||
StringInfo queryString = NULL;
|
StringInfo queryString = NULL;
|
||||||
const char *tableOwner = NULL;
|
const char *tableOwner = NULL;
|
||||||
Oid tableOwnerId = InvalidOid;
|
Oid tableOwnerId = InvalidOid;
|
||||||
Oid savedUserId = InvalidOid;
|
Oid savedUserId = InvalidOid;
|
||||||
int savedSecurityContext = 0;
|
int savedSecurityContext = 0;
|
||||||
|
List *tableNameList = NIL;
|
||||||
|
|
||||||
/* copy remote table's data to this node in an idempotent manner */
|
/* copy remote table's data to this node in an idempotent manner */
|
||||||
shardId = ExtractShardId(tableName);
|
shardId = ExtractShardId(tableName);
|
||||||
|
@ -717,9 +711,8 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName,
|
||||||
appendStringInfo(localFilePath, "base/%s/%s" UINT64_FORMAT,
|
appendStringInfo(localFilePath, "base/%s/%s" UINT64_FORMAT,
|
||||||
PG_JOB_CACHE_DIR, TABLE_FILE_PREFIX, shardId);
|
PG_JOB_CACHE_DIR, TABLE_FILE_PREFIX, shardId);
|
||||||
|
|
||||||
quotedTableName = quote_qualified_identifier(schemaName, tableName);
|
|
||||||
remoteCopyCommand = makeStringInfo();
|
remoteCopyCommand = makeStringInfo();
|
||||||
appendStringInfo(remoteCopyCommand, COPY_OUT_COMMAND, quotedTableName);
|
appendStringInfo(remoteCopyCommand, COPY_OUT_COMMAND, tableName);
|
||||||
|
|
||||||
received = ReceiveRegularFile(nodeName, nodePort, remoteCopyCommand, localFilePath);
|
received = ReceiveRegularFile(nodeName, nodePort, remoteCopyCommand, localFilePath);
|
||||||
if (!received)
|
if (!received)
|
||||||
|
@ -728,7 +721,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName,
|
||||||
}
|
}
|
||||||
|
|
||||||
/* fetch the ddl commands needed to create the table */
|
/* fetch the ddl commands needed to create the table */
|
||||||
tableOwner = RemoteTableOwner(nodeName, nodePort, schemaName, tableName);
|
tableOwner = RemoteTableOwner(nodeName, nodePort, tableName);
|
||||||
if (tableOwner == NULL)
|
if (tableOwner == NULL)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
|
@ -736,7 +729,7 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName,
|
||||||
tableOwnerId = get_role_oid(tableOwner, false);
|
tableOwnerId = get_role_oid(tableOwner, false);
|
||||||
|
|
||||||
/* fetch the ddl commands needed to create the table */
|
/* fetch the ddl commands needed to create the table */
|
||||||
ddlCommandList = TableDDLCommandList(nodeName, nodePort, schemaName, tableName);
|
ddlCommandList = TableDDLCommandList(nodeName, nodePort, tableName);
|
||||||
if (ddlCommandList == NIL)
|
if (ddlCommandList == NIL)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
|
@ -768,11 +761,12 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName,
|
||||||
* directly calling DoCopy() because some extensions (e.g. cstore_fdw) hook
|
* directly calling DoCopy() because some extensions (e.g. cstore_fdw) hook
|
||||||
* into process utility to provide their custom COPY behavior.
|
* into process utility to provide their custom COPY behavior.
|
||||||
*/
|
*/
|
||||||
localTable = makeRangeVar((char *) schemaName, (char *) tableName, -1);
|
tableNameList = stringToQualifiedNameList(tableName);
|
||||||
|
localTable = makeRangeVarFromNameList(tableNameList);
|
||||||
localCopyCommand = CopyStatement(localTable, localFilePath->data);
|
localCopyCommand = CopyStatement(localTable, localFilePath->data);
|
||||||
|
|
||||||
queryString = makeStringInfo();
|
queryString = makeStringInfo();
|
||||||
appendStringInfo(queryString, COPY_IN_COMMAND, quotedTableName, localFilePath->data);
|
appendStringInfo(queryString, COPY_IN_COMMAND, tableName, localFilePath->data);
|
||||||
|
|
||||||
ProcessUtility((Node *) localCopyCommand, queryString->data,
|
ProcessUtility((Node *) localCopyCommand, queryString->data,
|
||||||
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
|
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, NULL);
|
||||||
|
@ -793,10 +787,8 @@ FetchRegularTable(const char *nodeName, uint32 nodePort, const char *schemaName,
|
||||||
* commands against the local database, the function errors out.
|
* commands against the local database, the function errors out.
|
||||||
*/
|
*/
|
||||||
static bool
|
static bool
|
||||||
FetchForeignTable(const char *nodeName, uint32 nodePort, const char *schemaName,
|
FetchForeignTable(const char *nodeName, uint32 nodePort, const char *tableName)
|
||||||
const char *tableName)
|
|
||||||
{
|
{
|
||||||
char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName);
|
|
||||||
StringInfo localFilePath = NULL;
|
StringInfo localFilePath = NULL;
|
||||||
StringInfo remoteFilePath = NULL;
|
StringInfo remoteFilePath = NULL;
|
||||||
StringInfo transmitCommand = NULL;
|
StringInfo transmitCommand = NULL;
|
||||||
|
@ -814,7 +806,7 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, const char *schemaName,
|
||||||
localFilePath = makeStringInfo();
|
localFilePath = makeStringInfo();
|
||||||
appendStringInfo(localFilePath, FOREIGN_CACHED_FILE_PATH, tableName);
|
appendStringInfo(localFilePath, FOREIGN_CACHED_FILE_PATH, tableName);
|
||||||
|
|
||||||
remoteFilePath = ForeignFilePath(nodeName, nodePort, schemaName, tableName);
|
remoteFilePath = ForeignFilePath(nodeName, nodePort, tableName);
|
||||||
if (remoteFilePath == NULL)
|
if (remoteFilePath == NULL)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
|
@ -830,15 +822,15 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, const char *schemaName,
|
||||||
}
|
}
|
||||||
|
|
||||||
/* fetch the ddl commands needed to create the table */
|
/* fetch the ddl commands needed to create the table */
|
||||||
ddlCommandList = TableDDLCommandList(nodeName, nodePort, schemaName, tableName);
|
ddlCommandList = TableDDLCommandList(nodeName, nodePort, tableName);
|
||||||
if (ddlCommandList == NIL)
|
if (ddlCommandList == NIL)
|
||||||
{
|
{
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
alterTableCommand = makeStringInfo();
|
alterTableCommand = makeStringInfo();
|
||||||
appendStringInfo(alterTableCommand, SET_FOREIGN_TABLE_FILENAME,
|
appendStringInfo(alterTableCommand, SET_FOREIGN_TABLE_FILENAME, tableName,
|
||||||
qualifiedTableName, localFilePath->data);
|
localFilePath->data);
|
||||||
|
|
||||||
ddlCommandList = lappend(ddlCommandList, alterTableCommand);
|
ddlCommandList = lappend(ddlCommandList, alterTableCommand);
|
||||||
|
|
||||||
|
@ -865,16 +857,14 @@ FetchForeignTable(const char *nodeName, uint32 nodePort, const char *schemaName,
|
||||||
* the table. If an error occurs during fetching, return NULL.
|
* the table. If an error occurs during fetching, return NULL.
|
||||||
*/
|
*/
|
||||||
static const char *
|
static const char *
|
||||||
RemoteTableOwner(const char *nodeName, uint32 nodePort, const char *schemaName,
|
RemoteTableOwner(const char *nodeName, uint32 nodePort, const char *tableName)
|
||||||
const char *tableName)
|
|
||||||
{
|
{
|
||||||
List *ownerList = NIL;
|
List *ownerList = NIL;
|
||||||
StringInfo queryString = NULL;
|
StringInfo queryString = NULL;
|
||||||
const char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName);
|
|
||||||
StringInfo relationOwner;
|
StringInfo relationOwner;
|
||||||
|
|
||||||
queryString = makeStringInfo();
|
queryString = makeStringInfo();
|
||||||
appendStringInfo(queryString, GET_TABLE_OWNER, qualifiedTableName);
|
appendStringInfo(queryString, GET_TABLE_OWNER, tableName);
|
||||||
|
|
||||||
ownerList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString);
|
ownerList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString);
|
||||||
if (list_length(ownerList) != 1)
|
if (list_length(ownerList) != 1)
|
||||||
|
@ -894,15 +884,13 @@ RemoteTableOwner(const char *nodeName, uint32 nodePort, const char *schemaName,
|
||||||
* the function returns an empty list.
|
* the function returns an empty list.
|
||||||
*/
|
*/
|
||||||
List *
|
List *
|
||||||
TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *schemaName,
|
TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *tableName)
|
||||||
const char *tableName)
|
|
||||||
{
|
{
|
||||||
const char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName);
|
|
||||||
List *ddlCommandList = NIL;
|
List *ddlCommandList = NIL;
|
||||||
StringInfo queryString = NULL;
|
StringInfo queryString = NULL;
|
||||||
|
|
||||||
queryString = makeStringInfo();
|
queryString = makeStringInfo();
|
||||||
appendStringInfo(queryString, GET_TABLE_DDL_EVENTS, qualifiedTableName);
|
appendStringInfo(queryString, GET_TABLE_DDL_EVENTS, tableName);
|
||||||
|
|
||||||
ddlCommandList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString);
|
ddlCommandList = ExecuteRemoteQuery(nodeName, nodePort, NULL, queryString);
|
||||||
return ddlCommandList;
|
return ddlCommandList;
|
||||||
|
@ -915,16 +903,14 @@ TableDDLCommandList(const char *nodeName, uint32 nodePort, const char *schemaNam
|
||||||
* null.
|
* null.
|
||||||
*/
|
*/
|
||||||
static StringInfo
|
static StringInfo
|
||||||
ForeignFilePath(const char *nodeName, uint32 nodePort, const char *schemaName,
|
ForeignFilePath(const char *nodeName, uint32 nodePort, const char *tableName)
|
||||||
const char *tableName)
|
|
||||||
{
|
{
|
||||||
const char *qualifiedTableName = quote_qualified_identifier(schemaName, tableName);
|
|
||||||
List *foreignPathList = NIL;
|
List *foreignPathList = NIL;
|
||||||
StringInfo foreignPathCommand = NULL;
|
StringInfo foreignPathCommand = NULL;
|
||||||
StringInfo foreignPath = NULL;
|
StringInfo foreignPath = NULL;
|
||||||
|
|
||||||
foreignPathCommand = makeStringInfo();
|
foreignPathCommand = makeStringInfo();
|
||||||
appendStringInfo(foreignPathCommand, FOREIGN_FILE_PATH_COMMAND, qualifiedTableName);
|
appendStringInfo(foreignPathCommand, FOREIGN_FILE_PATH_COMMAND, tableName);
|
||||||
|
|
||||||
foreignPathList = ExecuteRemoteQuery(nodeName, nodePort, NULL, foreignPathCommand);
|
foreignPathList = ExecuteRemoteQuery(nodeName, nodePort, NULL, foreignPathCommand);
|
||||||
if (foreignPathList != NIL)
|
if (foreignPathList != NIL)
|
||||||
|
|
|
@ -1387,23 +1387,15 @@ static bool
|
||||||
ApplyShardDDLCommand(PGconn *workerNode, uint64 shardId, const char *ddlCommand)
|
ApplyShardDDLCommand(PGconn *workerNode, uint64 shardId, const char *ddlCommand)
|
||||||
{
|
{
|
||||||
const char *remoteCommand = APPLY_SHARD_DDL_COMMAND;
|
const char *remoteCommand = APPLY_SHARD_DDL_COMMAND;
|
||||||
const char *parameterValue[3];
|
const char *parameterValue[2];
|
||||||
const int parameterCount = 3;
|
const int parameterCount = 2;
|
||||||
PGresult *ddlResult = NULL;
|
PGresult *ddlResult = NULL;
|
||||||
|
|
||||||
char shardIdString[NAMEDATALEN];
|
char shardIdString[NAMEDATALEN];
|
||||||
snprintf(shardIdString, NAMEDATALEN, UINT64_FORMAT, shardId);
|
snprintf(shardIdString, NAMEDATALEN, UINT64_FORMAT, shardId);
|
||||||
|
|
||||||
/*
|
|
||||||
* We changed worker_apply_shard_ddl_command and now it requires schema name. Since
|
|
||||||
* \STAGE will be deprecated anyway, we use public schema for everything to make it
|
|
||||||
* work with worker_apply_shard_ddl_command. Please note that if user specifies
|
|
||||||
* schema name, this will not override it, because we prioritize schema names given
|
|
||||||
* in the query in worker_apply_shard_ddl_command.
|
|
||||||
*/
|
|
||||||
parameterValue[0] = shardIdString;
|
parameterValue[0] = shardIdString;
|
||||||
parameterValue[1] = "public";
|
parameterValue[1] = ddlCommand;
|
||||||
parameterValue[2] = ddlCommand;
|
|
||||||
|
|
||||||
ddlResult = ExecuteRemoteCommand(workerNode, remoteCommand,
|
ddlResult = ExecuteRemoteCommand(workerNode, remoteCommand,
|
||||||
parameterValue, parameterCount);
|
parameterValue, parameterCount);
|
||||||
|
|
|
@ -65,7 +65,7 @@
|
||||||
"SELECT * FROM (SELECT (pg_options_to_table(ftoptions)).* FROM pg_foreign_table " \
|
"SELECT * FROM (SELECT (pg_options_to_table(ftoptions)).* FROM pg_foreign_table " \
|
||||||
"WHERE ftrelid = %u) AS Q WHERE option_name = 'filename';"
|
"WHERE ftrelid = %u) AS Q WHERE option_name = 'filename';"
|
||||||
#define APPLY_SHARD_DDL_COMMAND \
|
#define APPLY_SHARD_DDL_COMMAND \
|
||||||
"SELECT * FROM worker_apply_shard_ddl_command ($1::int8, $2::text, $3::text)"
|
"SELECT * FROM worker_apply_shard_ddl_command ($1::int8, $2::text)"
|
||||||
#define REMOTE_FILE_SIZE_COMMAND "SELECT size FROM pg_stat_file('%s')"
|
#define REMOTE_FILE_SIZE_COMMAND "SELECT size FROM pg_stat_file('%s')"
|
||||||
#define SHARD_COLUMNAR_TABLE_SIZE_COMMAND "SELECT cstore_table_size('%s')"
|
#define SHARD_COLUMNAR_TABLE_SIZE_COMMAND "SELECT cstore_table_size('%s')"
|
||||||
|
|
||||||
|
|
|
@ -51,6 +51,8 @@
|
||||||
/* Remote call definitions to help with data staging and deletion */
|
/* Remote call definitions to help with data staging and deletion */
|
||||||
#define WORKER_APPLY_SHARD_DDL_COMMAND \
|
#define WORKER_APPLY_SHARD_DDL_COMMAND \
|
||||||
"SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s, %s)"
|
"SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s, %s)"
|
||||||
|
#define WORKER_APPLY_SHARD_DDL_COMMAND_WITHOUT_SCHEMA \
|
||||||
|
"SELECT worker_apply_shard_ddl_command (" UINT64_FORMAT ", %s)"
|
||||||
#define WORKER_APPEND_TABLE_TO_SHARD \
|
#define WORKER_APPEND_TABLE_TO_SHARD \
|
||||||
"SELECT worker_append_table_to_shard (%s, %s, %s, %u)"
|
"SELECT worker_append_table_to_shard (%s, %s, %s, %u)"
|
||||||
#define SHARD_MIN_VALUE_QUERY "SELECT min(%s) FROM %s"
|
#define SHARD_MIN_VALUE_QUERY "SELECT min(%s) FROM %s"
|
||||||
|
|
|
@ -33,9 +33,9 @@
|
||||||
#define RESERVED_HASHED_COLUMN_ID MaxAttrNumber
|
#define RESERVED_HASHED_COLUMN_ID MaxAttrNumber
|
||||||
#define MERGE_COLUMN_FORMAT "merge_column_%u"
|
#define MERGE_COLUMN_FORMAT "merge_column_%u"
|
||||||
#define TABLE_FETCH_COMMAND "SELECT worker_fetch_regular_table \
|
#define TABLE_FETCH_COMMAND "SELECT worker_fetch_regular_table \
|
||||||
('%s', '%s', " UINT64_FORMAT ", '%s', '%s')"
|
('%s', " UINT64_FORMAT ", '%s', '%s')"
|
||||||
#define FOREIGN_FETCH_COMMAND "SELECT worker_fetch_foreign_file \
|
#define FOREIGN_FETCH_COMMAND "SELECT worker_fetch_foreign_file \
|
||||||
('%s', '%s', " UINT64_FORMAT ", '%s', '%s')"
|
('%s', " UINT64_FORMAT ", '%s', '%s')"
|
||||||
#define MAP_OUTPUT_FETCH_COMMAND "SELECT worker_fetch_partition_file \
|
#define MAP_OUTPUT_FETCH_COMMAND "SELECT worker_fetch_partition_file \
|
||||||
(" UINT64_FORMAT ", %u, %u, %u, '%s', %u)"
|
(" UINT64_FORMAT ", %u, %u, %u, '%s', %u)"
|
||||||
#define RANGE_PARTITION_COMMAND "SELECT worker_range_partition_table \
|
#define RANGE_PARTITION_COMMAND "SELECT worker_range_partition_table \
|
||||||
|
|
|
@ -120,7 +120,7 @@ extern Datum * DeconstructArrayObject(ArrayType *arrayObject);
|
||||||
extern int32 ArrayObjectCount(ArrayType *arrayObject);
|
extern int32 ArrayObjectCount(ArrayType *arrayObject);
|
||||||
extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedureId);
|
extern FmgrInfo * GetFunctionInfo(Oid typeId, Oid accessMethodId, int16 procedureId);
|
||||||
extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
|
extern List * TableDDLCommandList(const char *nodeName, uint32 nodePort,
|
||||||
const char *schemaName, const char *tableName);
|
const char *tableName);
|
||||||
|
|
||||||
/* Function declarations shared with the master planner */
|
/* Function declarations shared with the master planner */
|
||||||
extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId);
|
extern StringInfo TaskFilename(StringInfo directoryName, uint32 taskId);
|
||||||
|
|
|
@ -390,7 +390,7 @@ ORDER BY
|
||||||
customer_keys.o_custkey DESC
|
customer_keys.o_custkey DESC
|
||||||
LIMIT 10 OFFSET 20;
|
LIMIT 10 OFFSET 20;
|
||||||
DEBUG: push down of limit count: 30
|
DEBUG: push down of limit count: 30
|
||||||
DEBUG: building index "pg_toast_16966_index" on table "pg_toast_16966"
|
DEBUG: building index "pg_toast_16977_index" on table "pg_toast_16977"
|
||||||
o_custkey | total_order_count
|
o_custkey | total_order_count
|
||||||
-----------+-------------------
|
-----------+-------------------
|
||||||
1466 | 1
|
1466 | 1
|
||||||
|
|
|
@ -19,6 +19,8 @@ ALTER EXTENSION citus UPDATE TO '5.1-2';
|
||||||
ALTER EXTENSION citus UPDATE TO '5.1-3';
|
ALTER EXTENSION citus UPDATE TO '5.1-3';
|
||||||
ALTER EXTENSION citus UPDATE TO '5.1-4';
|
ALTER EXTENSION citus UPDATE TO '5.1-4';
|
||||||
ALTER EXTENSION citus UPDATE TO '5.1-5';
|
ALTER EXTENSION citus UPDATE TO '5.1-5';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '5.1-6';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '5.1-7';
|
||||||
-- drop extension an re-create in newest version
|
-- drop extension an re-create in newest version
|
||||||
DROP EXTENSION citus;
|
DROP EXTENSION citus;
|
||||||
\c
|
\c
|
||||||
|
|
|
@ -24,6 +24,8 @@ ALTER EXTENSION citus UPDATE TO '5.1-2';
|
||||||
ALTER EXTENSION citus UPDATE TO '5.1-3';
|
ALTER EXTENSION citus UPDATE TO '5.1-3';
|
||||||
ALTER EXTENSION citus UPDATE TO '5.1-4';
|
ALTER EXTENSION citus UPDATE TO '5.1-4';
|
||||||
ALTER EXTENSION citus UPDATE TO '5.1-5';
|
ALTER EXTENSION citus UPDATE TO '5.1-5';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '5.1-6';
|
||||||
|
ALTER EXTENSION citus UPDATE TO '5.1-7';
|
||||||
|
|
||||||
-- drop extension an re-create in newest version
|
-- drop extension an re-create in newest version
|
||||||
DROP EXTENSION citus;
|
DROP EXTENSION citus;
|
||||||
|
|
Loading…
Reference in New Issue