Make undistribute_table() and citus_create_local_table() work with columnar (#4563)

* Make undistribute_table() and citus_create_local_table() work with columnar

* Rename and use LocallyExecuteUtilityTask for UDF check

* Remove 'local' references in ExecuteUtilityCommand
pull/4579/head
Naisila Puka 2021-01-27 01:17:20 +03:00 committed by GitHub
parent 9b6ccb313d
commit 94bc2703bc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 468 additions and 51 deletions

View File

@ -23,6 +23,7 @@
#include "distributed/commands.h" #include "distributed/commands.h"
#include "distributed/foreign_key_relationship.h" #include "distributed/foreign_key_relationship.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/multi_executor.h" #include "distributed/multi_executor.h"
#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_partitioning_utils.h"
#include "distributed/reference_table_utils.h" #include "distributed/reference_table_utils.h"
@ -292,7 +293,7 @@ DropRelationForeignKeys(Oid relationId, int fKeyFlags)
SetLocalEnableLocalReferenceForeignKeys(false); SetLocalEnableLocalReferenceForeignKeys(false);
List *dropFkeyCascadeCommandList = GetRelationDropFkeyCommands(relationId, fKeyFlags); List *dropFkeyCascadeCommandList = GetRelationDropFkeyCommands(relationId, fKeyFlags);
ExecuteAndLogDDLCommandList(dropFkeyCascadeCommandList); ExecuteAndLogUtilityCommandList(dropFkeyCascadeCommandList);
SetLocalEnableLocalReferenceForeignKeys(oldEnableLocalReferenceForeignKeys); SetLocalEnableLocalReferenceForeignKeys(oldEnableLocalReferenceForeignKeys);
} }
@ -428,32 +429,30 @@ ExecuteCascadeOperationForRelationIdList(List *relationIdList,
/* /*
* ExecuteAndLogDDLCommandList takes a list of ddl commands and calls * ExecuteAndLogUtilityCommandList takes a list of utility commands and calls
* ExecuteAndLogDDLCommand function for each of them. * ExecuteAndLogUtilityCommand function for each of them.
*/ */
void void
ExecuteAndLogDDLCommandList(List *ddlCommandList) ExecuteAndLogUtilityCommandList(List *utilityCommandList)
{ {
char *ddlCommand = NULL; char *utilityCommand = NULL;
foreach_ptr(ddlCommand, ddlCommandList) foreach_ptr(utilityCommand, utilityCommandList)
{ {
ExecuteAndLogDDLCommand(ddlCommand); ExecuteAndLogUtilityCommand(utilityCommand);
} }
} }
/* /*
* ExecuteAndLogDDLCommand takes a ddl command and logs it in DEBUG4 log level. * ExecuteAndLogUtilityCommand takes a utility command and logs it in DEBUG4 log level.
* Then, parses and executes it via CitusProcessUtility. * Then, parses and executes it via CitusProcessUtility.
*/ */
void void
ExecuteAndLogDDLCommand(const char *commandString) ExecuteAndLogUtilityCommand(const char *commandString)
{ {
ereport(DEBUG4, (errmsg("executing \"%s\"", commandString))); ereport(DEBUG4, (errmsg("executing \"%s\"", commandString)));
Node *parseTree = ParseTreeNode(commandString); ExecuteUtilityCommand(commandString);
ProcessUtilityParseTree(parseTree, commandString, PROCESS_UTILITY_TOPLEVEL,
NULL, None_Receiver, NULL);
} }

View File

@ -154,7 +154,7 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys)
/* /*
* Creating Citus local tables relies on functions that accesses * Creating Citus local tables relies on functions that accesses
* shards locally (e.g., ExecuteAndLogDDLCommand()). As long as * shards locally (e.g., ExecuteAndLogUtilityCommand()). As long as
* we don't teach those functions to access shards remotely, we * we don't teach those functions to access shards remotely, we
* cannot relax this check. * cannot relax this check.
*/ */
@ -246,7 +246,7 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys)
* from scratch, below we simply recreate the shell table executing them * from scratch, below we simply recreate the shell table executing them
* via process utility. * via process utility.
*/ */
ExecuteAndLogDDLCommandList(shellTableDDLEvents); ExecuteAndLogUtilityCommandList(shellTableDDLEvents);
/* /*
* Set shellRelationId as the relation with relationId now points * Set shellRelationId as the relation with relationId now points
@ -436,7 +436,7 @@ RenameRelationToShardRelation(Oid shellRelationId, uint64 shardId)
appendStringInfo(renameCommand, "ALTER TABLE %s RENAME TO %s;", appendStringInfo(renameCommand, "ALTER TABLE %s RENAME TO %s;",
qualifiedShellRelationName, quotedShardRelationName); qualifiedShellRelationName, quotedShardRelationName);
ExecuteAndLogDDLCommand(renameCommand->data); ExecuteAndLogUtilityCommand(renameCommand->data);
} }
@ -456,7 +456,7 @@ RenameShardRelationConstraints(Oid shardRelationId, uint64 shardId)
{ {
const char *commandString = const char *commandString =
GetRenameShardConstraintCommand(shardRelationId, constraintName, shardId); GetRenameShardConstraintCommand(shardRelationId, constraintName, shardId);
ExecuteAndLogDDLCommand(commandString); ExecuteAndLogUtilityCommand(commandString);
} }
} }
@ -550,7 +550,7 @@ RenameShardRelationIndexes(Oid shardRelationId, uint64 shardId)
foreach_oid(indexOid, indexOidList) foreach_oid(indexOid, indexOidList)
{ {
const char *commandString = GetRenameShardIndexCommand(indexOid, shardId); const char *commandString = GetRenameShardIndexCommand(indexOid, shardId);
ExecuteAndLogDDLCommand(commandString); ExecuteAndLogUtilityCommand(commandString);
} }
} }
@ -591,7 +591,7 @@ RenameShardRelationStatistics(Oid shardRelationId, uint64 shardId)
char *command = NULL; char *command = NULL;
foreach_ptr(command, statsCommandList) foreach_ptr(command, statsCommandList)
{ {
ExecuteAndLogDDLCommand(command); ExecuteAndLogUtilityCommand(command);
} }
} }
@ -636,7 +636,7 @@ RenameShardRelationNonTruncateTriggers(Oid shardRelationId, uint64 shardId)
char *triggerName = NameStr(triggerForm->tgname); char *triggerName = NameStr(triggerForm->tgname);
char *commandString = char *commandString =
GetRenameShardTriggerCommand(shardRelationId, triggerName, shardId); GetRenameShardTriggerCommand(shardRelationId, triggerName, shardId);
ExecuteAndLogDDLCommand(commandString); ExecuteAndLogUtilityCommand(commandString);
} }
heap_freetuple(triggerTuple); heap_freetuple(triggerTuple);
@ -688,7 +688,7 @@ DropRelationTruncateTriggers(Oid relationId)
{ {
char *triggerName = NameStr(triggerForm->tgname); char *triggerName = NameStr(triggerForm->tgname);
char *commandString = GetDropTriggerCommand(relationId, triggerName); char *commandString = GetDropTriggerCommand(relationId, triggerName);
ExecuteAndLogDDLCommand(commandString); ExecuteAndLogUtilityCommand(commandString);
} }
heap_freetuple(triggerTuple); heap_freetuple(triggerTuple);
@ -868,7 +868,7 @@ DropDefaultColumnDefinition(Oid relationId, char *columnName)
"ALTER TABLE %s ALTER COLUMN %s DROP DEFAULT", "ALTER TABLE %s ALTER COLUMN %s DROP DEFAULT",
qualifiedRelationName, quotedColumnName); qualifiedRelationName, quotedColumnName);
ExecuteAndLogDDLCommand(sequenceDropCommand->data); ExecuteAndLogUtilityCommand(sequenceDropCommand->data);
} }
@ -891,7 +891,7 @@ TransferSequenceOwnership(Oid sequenceId, Oid targetRelationId, char *targetColu
qualifiedSequenceName, qualifiedTargetRelationName, qualifiedSequenceName, qualifiedTargetRelationName,
quotedTargetColumnName); quotedTargetColumnName);
ExecuteAndLogDDLCommand(sequenceOwnershipCommand->data); ExecuteAndLogUtilityCommand(sequenceOwnershipCommand->data);
} }

View File

@ -128,8 +128,7 @@ static uint64 LocallyPlanAndExecuteMultipleQueries(List *queryStrings,
static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo, static void ExtractParametersForLocalExecution(ParamListInfo paramListInfo,
Oid **parameterTypes, Oid **parameterTypes,
const char ***parameterValues); const char ***parameterValues);
static void LocallyExecuteUtilityTask(const char *utilityCommand); static void ExecuteUdfTaskQuery(Query *localUdfCommandQuery);
static void LocallyExecuteUdfTaskQuery(Query *localUdfCommandQuery);
static void EnsureTransitionPossible(LocalExecutionStatus from, static void EnsureTransitionPossible(LocalExecutionStatus from,
LocalExecutionStatus to); LocalExecutionStatus to);
@ -241,7 +240,7 @@ ExecuteLocalTaskListExtended(List *taskList,
if (isUtilityCommand) if (isUtilityCommand)
{ {
LocallyExecuteUtilityTask(TaskQueryString(task)); ExecuteUtilityCommand(TaskQueryString(task));
continue; continue;
} }
@ -373,43 +372,40 @@ ExtractParametersForLocalExecution(ParamListInfo paramListInfo, Oid **parameterT
/* /*
* LocallyExecuteUtilityTask executes the given local task query in the current * ExecuteUtilityCommand executes the given task query in the current
* session. * session.
*/ */
static void void
LocallyExecuteUtilityTask(const char *localTaskQueryCommand) ExecuteUtilityCommand(const char *taskQueryCommand)
{ {
List *parseTreeList = pg_parse_query(localTaskQueryCommand); List *parseTreeList = pg_parse_query(taskQueryCommand);
RawStmt *localTaskRawStmt = NULL; RawStmt *taskRawStmt = NULL;
foreach_ptr(localTaskRawStmt, parseTreeList) foreach_ptr(taskRawStmt, parseTreeList)
{ {
Node *localTaskRawParseTree = localTaskRawStmt->stmt; Node *taskRawParseTree = taskRawStmt->stmt;
/* /*
* Actually, the query passed to this function would mostly be a * The query passed to this function would mostly be a utility
* utility command to be executed locally. However, some utility * command. However, some utility commands trigger udf calls
* commands do trigger udf calls (e.g worker_apply_shard_ddl_command) * (e.g alter_columnar_table_set()). In that case, we execute
* to execute commands in a generic way. But as we support local * the query with the udf call in below conditional block.
* execution of utility commands, we should also process those udf
* calls locally as well. In that case, we simply execute the query
* implying the udf call in below conditional block.
*/ */
if (IsA(localTaskRawParseTree, SelectStmt)) if (IsA(taskRawParseTree, SelectStmt))
{ {
/* we have no external parameters to rewrite the UDF call RawStmt */ /* we have no external parameters to rewrite the UDF call RawStmt */
Query *localUdfTaskQuery = Query *udfTaskQuery =
RewriteRawQueryStmt(localTaskRawStmt, localTaskQueryCommand, NULL, 0); RewriteRawQueryStmt(taskRawStmt, taskQueryCommand, NULL, 0);
LocallyExecuteUdfTaskQuery(localUdfTaskQuery); ExecuteUdfTaskQuery(udfTaskQuery);
} }
else else
{ {
/* /*
* It is a regular utility command we should execute it locally via * It is a regular utility command we should execute it via
* process utility. * process utility.
*/ */
ProcessUtilityParseTree(localTaskRawParseTree, localTaskQueryCommand, ProcessUtilityParseTree(taskRawParseTree, taskQueryCommand,
PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver, PROCESS_UTILITY_TOPLEVEL, NULL, None_Receiver,
NULL); NULL);
} }
@ -418,15 +414,15 @@ LocallyExecuteUtilityTask(const char *localTaskQueryCommand)
/* /*
* LocallyExecuteUdfTaskQuery executes the given udf command locally. Local udf * ExecuteUdfTaskQuery executes the given udf command. A udf command
* command is simply a "SELECT udf_call()" query and so it cannot be executed * is simply a "SELECT udf_call()" query and so it cannot be executed
* via process utility. * via process utility.
*/ */
static void static void
LocallyExecuteUdfTaskQuery(Query *localUdfTaskQuery) ExecuteUdfTaskQuery(Query *udfTaskQuery)
{ {
/* we do not expect any results */ /* we do not expect any results */
ExecuteQueryIntoDestReceiver(localUdfTaskQuery, NULL, None_Receiver); ExecuteQueryIntoDestReceiver(udfTaskQuery, NULL, None_Receiver);
} }

View File

@ -486,8 +486,8 @@ extern void ErrorIfAnyPartitionRelationInvolvedInNonInheritedFKey(List *relation
extern bool RelationIdListHasReferenceTable(List *relationIdList); extern bool RelationIdListHasReferenceTable(List *relationIdList);
extern void DropRelationForeignKeys(Oid relationId, int flags); extern void DropRelationForeignKeys(Oid relationId, int flags);
extern void SetLocalEnableLocalReferenceForeignKeys(bool state); extern void SetLocalEnableLocalReferenceForeignKeys(bool state);
extern void ExecuteAndLogDDLCommandList(List *ddlCommandList); extern void ExecuteAndLogUtilityCommandList(List *ddlCommandList);
extern void ExecuteAndLogDDLCommand(const char *commandString); extern void ExecuteAndLogUtilityCommand(const char *commandString);
extern void ExecuteForeignKeyCreateCommandList(List *ddlCommandList, extern void ExecuteForeignKeyCreateCommandList(List *ddlCommandList,
bool skip_validation); bool skip_validation);

View File

@ -37,6 +37,7 @@ extern uint64 ExecuteLocalTaskListExtended(List *taskList, ParamListInfo
bool isUtilityCommand); bool isUtilityCommand);
extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList, extern void ExtractLocalAndRemoteTasks(bool readOnlyPlan, List *taskList,
List **localTaskList, List **remoteTaskList); List **localTaskList, List **remoteTaskList);
extern void ExecuteUtilityCommand(const char *utilityCommand);
extern bool ShouldExecuteTasksLocally(List *taskList); extern bool ShouldExecuteTasksLocally(List *taskList);
extern bool AnyTaskAccessesLocalNode(List *taskList); extern bool AnyTaskAccessesLocalNode(List *taskList);
extern bool TaskAccessesLocalNode(Task *task); extern bool TaskAccessesLocalNode(Task *task);

View File

@ -247,6 +247,28 @@ $cmd$);
(localhost,57638,20090007,t,"(100,1000,pglz,15)") (localhost,57638,20090007,t,"(100,1000,pglz,15)")
(4 rows) (4 rows)
-- verify undistribute works
SELECT undistribute_table('table_option');
NOTICE: creating a new table for columnar_citus_integration.table_option
NOTICE: Moving the data of columnar_citus_integration.table_option
NOTICE: Dropping the old columnar_citus_integration.table_option
NOTICE: Renaming the new table to columnar_citus_integration.table_option
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'table_option'::regclass;
logicalrelid | partmethod | partkey | colocationid | repmodel
---------------------------------------------------------------------
(0 rows)
SELECT compression FROM columnar.options WHERE regclass = 'table_option'::regclass;
compression
---------------------------------------------------------------------
none
(1 row)
DROP TABLE table_option, table_option_2; DROP TABLE table_option, table_option_2;
-- verify settings get to all placements when there are multiple replica's -- verify settings get to all placements when there are multiple replica's
SET citus.shard_replication_factor TO 2; SET citus.shard_replication_factor TO 2;
@ -544,6 +566,28 @@ $cmd$);
(localhost,57638,20090015,t,"(100,1000,pglz,19)") (localhost,57638,20090015,t,"(100,1000,pglz,19)")
(8 rows) (8 rows)
-- verify undistribute works
SELECT undistribute_table('table_option');
NOTICE: creating a new table for columnar_citus_integration.table_option
NOTICE: Moving the data of columnar_citus_integration.table_option
NOTICE: Dropping the old columnar_citus_integration.table_option
NOTICE: Renaming the new table to columnar_citus_integration.table_option
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'table_option'::regclass;
logicalrelid | partmethod | partkey | colocationid | repmodel
---------------------------------------------------------------------
(0 rows)
SELECT compression FROM columnar.options WHERE regclass = 'table_option'::regclass;
compression
---------------------------------------------------------------------
none
(1 row)
DROP TABLE table_option, table_option_2; DROP TABLE table_option, table_option_2;
-- test options on a reference table -- test options on a reference table
CREATE TABLE table_option_reference (a int, b text) USING columnar; CREATE TABLE table_option_reference (a int, b text) USING columnar;
@ -761,6 +805,267 @@ $cmd$);
(localhost,57638,20090017,t,"(100,1000,pglz,9)") (localhost,57638,20090017,t,"(100,1000,pglz,9)")
(2 rows) (2 rows)
-- verify undistribute works
SELECT undistribute_table('table_option_reference');
NOTICE: creating a new table for columnar_citus_integration.table_option_reference
NOTICE: Moving the data of columnar_citus_integration.table_option_reference
NOTICE: Dropping the old columnar_citus_integration.table_option_reference
NOTICE: Renaming the new table to columnar_citus_integration.table_option_reference
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'table_option_reference'::regclass;
logicalrelid | partmethod | partkey | colocationid | repmodel
---------------------------------------------------------------------
(0 rows)
SELECT compression FROM columnar.options WHERE regclass = 'table_option_reference'::regclass;
compression
---------------------------------------------------------------------
none
(1 row)
DROP TABLE table_option_reference, table_option_reference_2; DROP TABLE table_option_reference, table_option_reference_2;
SET citus.shard_replication_factor TO 1;
-- test options on a citus local table
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
?column?
---------------------------------------------------------------------
1
(1 row)
CREATE TABLE table_option_citus_local (a int, b text) USING columnar;
SELECT create_citus_local_table('table_option_citus_local');
create_citus_local_table
---------------------------------------------------------------------
(1 row)
-- setting: compression
-- get baseline for setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT compression FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
run_command_on_placements
---------------------------------------------------------------------
(localhost,57636,20090018,t,none)
(1 row)
-- change setting
SELECT alter_columnar_table_set('table_option_citus_local', compression => 'pglz');
alter_columnar_table_set
---------------------------------------------------------------------
(1 row)
-- verify setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT compression FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
run_command_on_placements
---------------------------------------------------------------------
(localhost,57636,20090018,t,pglz)
(1 row)
-- reset setting
SELECT alter_columnar_table_reset('table_option_citus_local', compression => true);
alter_columnar_table_reset
---------------------------------------------------------------------
(1 row)
-- verify setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT compression FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
run_command_on_placements
---------------------------------------------------------------------
(localhost,57636,20090018,t,none)
(1 row)
-- setting: compression_level
-- get baseline for setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT compression_level FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
run_command_on_placements
---------------------------------------------------------------------
(localhost,57636,20090018,t,3)
(1 row)
-- change setting
SELECT alter_columnar_table_set('table_option_citus_local', compression_level => 11);
alter_columnar_table_set
---------------------------------------------------------------------
(1 row)
-- verify setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT compression_level FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
run_command_on_placements
---------------------------------------------------------------------
(localhost,57636,20090018,t,11)
(1 row)
-- reset setting
SELECT alter_columnar_table_reset('table_option_citus_local', compression_level => true);
alter_columnar_table_reset
---------------------------------------------------------------------
(1 row)
-- verify setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT compression_level FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
run_command_on_placements
---------------------------------------------------------------------
(localhost,57636,20090018,t,3)
(1 row)
-- setting: chunk_row_count
-- get baseline for setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT chunk_row_count FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
run_command_on_placements
---------------------------------------------------------------------
(localhost,57636,20090018,t,10000)
(1 row)
-- change setting
SELECT alter_columnar_table_set('table_option_citus_local', chunk_row_count => 100);
alter_columnar_table_set
---------------------------------------------------------------------
(1 row)
-- verify setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT chunk_row_count FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
run_command_on_placements
---------------------------------------------------------------------
(localhost,57636,20090018,t,100)
(1 row)
-- reset setting
SELECT alter_columnar_table_reset('table_option_citus_local', chunk_row_count => true);
alter_columnar_table_reset
---------------------------------------------------------------------
(1 row)
-- verify setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT chunk_row_count FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
run_command_on_placements
---------------------------------------------------------------------
(localhost,57636,20090018,t,10000)
(1 row)
-- setting: stripe_row_count
-- get baseline for setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT stripe_row_count FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
run_command_on_placements
---------------------------------------------------------------------
(localhost,57636,20090018,t,150000)
(1 row)
-- change setting
SELECT alter_columnar_table_set('table_option_citus_local', stripe_row_count => 100);
alter_columnar_table_set
---------------------------------------------------------------------
(1 row)
-- verify setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT stripe_row_count FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
run_command_on_placements
---------------------------------------------------------------------
(localhost,57636,20090018,t,100)
(1 row)
-- reset setting
SELECT alter_columnar_table_reset('table_option_citus_local', stripe_row_count => true);
alter_columnar_table_reset
---------------------------------------------------------------------
(1 row)
-- verify setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT stripe_row_count FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
run_command_on_placements
---------------------------------------------------------------------
(localhost,57636,20090018,t,150000)
(1 row)
-- verify settings are propagated when creating a table
CREATE TABLE table_option_citus_local_2 (a int, b text) USING columnar;
SELECT alter_columnar_table_set('table_option_citus_local_2',
chunk_row_count => 100,
stripe_row_count => 1000,
compression => 'pglz',
compression_level => 9);
alter_columnar_table_set
---------------------------------------------------------------------
(1 row)
SELECT create_citus_local_table('table_option_citus_local_2');
create_citus_local_table
---------------------------------------------------------------------
(1 row)
-- verify settings on placements
SELECT run_command_on_placements('table_option_citus_local_2',$cmd$
SELECT ROW(chunk_row_count, stripe_row_count, compression, compression_level) FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
run_command_on_placements
---------------------------------------------------------------------
(localhost,57636,20090019,t,"(100,1000,pglz,9)")
(1 row)
-- verify undistribute works
SELECT undistribute_table('table_option_citus_local');
NOTICE: creating a new table for columnar_citus_integration.table_option_citus_local
NOTICE: Moving the data of columnar_citus_integration.table_option_citus_local
NOTICE: Dropping the old columnar_citus_integration.table_option_citus_local
NOTICE: Renaming the new table to columnar_citus_integration.table_option_citus_local
undistribute_table
---------------------------------------------------------------------
(1 row)
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'table_option_citus_local'::regclass;
logicalrelid | partmethod | partkey | colocationid | repmodel
---------------------------------------------------------------------
(0 rows)
SELECT compression FROM columnar.options WHERE regclass = 'table_option_citus_local'::regclass;
compression
---------------------------------------------------------------------
none
(1 row)
DROP TABLE table_option_citus_local, table_option_citus_local_2;
SELECT 1 FROM master_remove_node('localhost', :master_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
DROP SCHEMA columnar_citus_integration CASCADE; DROP SCHEMA columnar_citus_integration CASCADE;

View File

@ -98,6 +98,11 @@ SELECT run_command_on_placements('table_option_2',$cmd$
SELECT ROW(chunk_row_count, stripe_row_count, compression, compression_level) FROM columnar.options WHERE regclass = '%s'::regclass; SELECT ROW(chunk_row_count, stripe_row_count, compression, compression_level) FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$); $cmd$);
-- verify undistribute works
SELECT undistribute_table('table_option');
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'table_option'::regclass;
SELECT compression FROM columnar.options WHERE regclass = 'table_option'::regclass;
DROP TABLE table_option, table_option_2; DROP TABLE table_option, table_option_2;
-- verify settings get to all placements when there are multiple replica's -- verify settings get to all placements when there are multiple replica's
@ -193,6 +198,11 @@ SELECT run_command_on_placements('table_option_2',$cmd$
SELECT ROW(chunk_row_count, stripe_row_count, compression, compression_level) FROM columnar.options WHERE regclass = '%s'::regclass; SELECT ROW(chunk_row_count, stripe_row_count, compression, compression_level) FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$); $cmd$);
-- verify undistribute works
SELECT undistribute_table('table_option');
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'table_option'::regclass;
SELECT compression FROM columnar.options WHERE regclass = 'table_option'::regclass;
DROP TABLE table_option, table_option_2; DROP TABLE table_option, table_option_2;
-- test options on a reference table -- test options on a reference table
@ -285,7 +295,113 @@ SELECT run_command_on_placements('table_option_reference_2',$cmd$
SELECT ROW(chunk_row_count, stripe_row_count, compression, compression_level) FROM columnar.options WHERE regclass = '%s'::regclass; SELECT ROW(chunk_row_count, stripe_row_count, compression, compression_level) FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$); $cmd$);
-- verify undistribute works
SELECT undistribute_table('table_option_reference');
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'table_option_reference'::regclass;
SELECT compression FROM columnar.options WHERE regclass = 'table_option_reference'::regclass;
DROP TABLE table_option_reference, table_option_reference_2; DROP TABLE table_option_reference, table_option_reference_2;
SET citus.shard_replication_factor TO 1;
-- test options on a citus local table
SELECT 1 FROM master_add_node('localhost', :master_port, groupId => 0);
CREATE TABLE table_option_citus_local (a int, b text) USING columnar;
SELECT create_citus_local_table('table_option_citus_local');
-- setting: compression
-- get baseline for setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT compression FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
-- change setting
SELECT alter_columnar_table_set('table_option_citus_local', compression => 'pglz');
-- verify setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT compression FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
-- reset setting
SELECT alter_columnar_table_reset('table_option_citus_local', compression => true);
-- verify setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT compression FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
-- setting: compression_level
-- get baseline for setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT compression_level FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
-- change setting
SELECT alter_columnar_table_set('table_option_citus_local', compression_level => 11);
-- verify setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT compression_level FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
-- reset setting
SELECT alter_columnar_table_reset('table_option_citus_local', compression_level => true);
-- verify setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT compression_level FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
-- setting: chunk_row_count
-- get baseline for setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT chunk_row_count FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
-- change setting
SELECT alter_columnar_table_set('table_option_citus_local', chunk_row_count => 100);
-- verify setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT chunk_row_count FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
-- reset setting
SELECT alter_columnar_table_reset('table_option_citus_local', chunk_row_count => true);
-- verify setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT chunk_row_count FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
-- setting: stripe_row_count
-- get baseline for setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT stripe_row_count FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
-- change setting
SELECT alter_columnar_table_set('table_option_citus_local', stripe_row_count => 100);
-- verify setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT stripe_row_count FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
-- reset setting
SELECT alter_columnar_table_reset('table_option_citus_local', stripe_row_count => true);
-- verify setting
SELECT run_command_on_placements('table_option_citus_local',$cmd$
SELECT stripe_row_count FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
-- verify settings are propagated when creating a table
CREATE TABLE table_option_citus_local_2 (a int, b text) USING columnar;
SELECT alter_columnar_table_set('table_option_citus_local_2',
chunk_row_count => 100,
stripe_row_count => 1000,
compression => 'pglz',
compression_level => 9);
SELECT create_citus_local_table('table_option_citus_local_2');
-- verify settings on placements
SELECT run_command_on_placements('table_option_citus_local_2',$cmd$
SELECT ROW(chunk_row_count, stripe_row_count, compression, compression_level) FROM columnar.options WHERE regclass = '%s'::regclass;
$cmd$);
-- verify undistribute works
SELECT undistribute_table('table_option_citus_local');
SELECT * FROM pg_dist_partition WHERE logicalrelid = 'table_option_citus_local'::regclass;
SELECT compression FROM columnar.options WHERE regclass = 'table_option_citus_local'::regclass;
DROP TABLE table_option_citus_local, table_option_citus_local_2;
SELECT 1 FROM master_remove_node('localhost', :master_port);
SET client_min_messages TO WARNING; SET client_min_messages TO WARNING;
DROP SCHEMA columnar_citus_integration CASCADE; DROP SCHEMA columnar_citus_integration CASCADE;