From 5efb742f8aed2666538446e1cf6139bcd8dc858d Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Wed, 3 Feb 2021 17:55:16 +0300 Subject: [PATCH 01/10] Skip copying GENERATED ALWAYS AS STORED cols in ReplaceTable (#4616) Postgres doesn't allow inserting into columns having GENERATED ALWAYS AS (...) STORED expressions. For this reason, when executing undistribute_table or an alter_* udf, we should skip copying such columns. This is not bad since Postgres would already generate such columns. --- .../distributed/commands/alter_table.c | 81 +++++++- src/test/regress/expected/pg12.out | 195 +++++++++++++++++- src/test/regress/sql/pg12.sql | 84 ++++++++ 3 files changed, 356 insertions(+), 4 deletions(-) diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index e9739cae9..655272fcf 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -185,6 +185,8 @@ static void CreateCitusTableLike(TableConversionState *con); static List * GetViewCreationCommandsOfTable(Oid relationId); static void ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, bool suppressNoticeMessages); +static bool HasAnyGeneratedStoredColumns(Oid relationId); +static List * GetNonGeneratedStoredColumnNameList(Oid relationId); static void CheckAlterDistributedTableConversionParameters(TableConversionState *con); static char * CreateWorkerChangeSequenceDependencyCommand(char *sequenceSchemaName, char *sequenceName, @@ -1122,9 +1124,33 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, quote_qualified_identifier(schemaName, sourceName)))); } - appendStringInfo(query, "INSERT INTO %s SELECT * FROM %s", - quote_qualified_identifier(schemaName, targetName), - quote_qualified_identifier(schemaName, sourceName)); + if (!HasAnyGeneratedStoredColumns(sourceId)) + { + /* + * Relation has no GENERATED STORED columns, copy the table via plain + * "INSERT INTO .. SELECT *"". + */ + appendStringInfo(query, "INSERT INTO %s SELECT * FROM %s", + quote_qualified_identifier(schemaName, targetName), + quote_qualified_identifier(schemaName, sourceName)); + } + else + { + /* + * Skip columns having GENERATED ALWAYS AS (...) STORED expressions + * since Postgres doesn't allow inserting into such columns. + * This is not bad since Postgres would already generate such columns. + * Note that here we intentionally don't skip columns having DEFAULT + * expressions since user might have inserted non-default values. + */ + List *nonStoredColumnNameList = GetNonGeneratedStoredColumnNameList(sourceId); + char *insertColumnString = StringJoin(nonStoredColumnNameList, ','); + appendStringInfo(query, "INSERT INTO %s (%s) SELECT %s FROM %s", + quote_qualified_identifier(schemaName, targetName), + insertColumnString, insertColumnString, + quote_qualified_identifier(schemaName, sourceName)); + } + ExecuteQueryViaSPI(query->data, SPI_OK_INSERT); } @@ -1183,6 +1209,55 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, } +/* + * HasAnyGeneratedStoredColumns decides if relation has any columns that we + * might need to copy the data of when replacing table. + */ +static bool +HasAnyGeneratedStoredColumns(Oid relationId) +{ + return list_length(GetNonGeneratedStoredColumnNameList(relationId)) > 0; +} + + +/* + * GetNonGeneratedStoredColumnNameList returns a list of column names for + * columns not having GENERATED ALWAYS AS (...) STORED expressions. + */ +static List * +GetNonGeneratedStoredColumnNameList(Oid relationId) +{ + List *nonStoredColumnNameList = NIL; + + Relation relation = relation_open(relationId, AccessShareLock); + TupleDesc tupleDescriptor = RelationGetDescr(relation); + for (int columnIndex = 0; columnIndex < tupleDescriptor->natts; columnIndex++) + { + Form_pg_attribute currentColumn = TupleDescAttr(tupleDescriptor, columnIndex); + if (currentColumn->attisdropped) + { + /* skip dropped columns */ + continue; + } + +#if PG_VERSION_NUM >= 120000 + if (currentColumn->attgenerated == ATTRIBUTE_GENERATED_STORED) + { + continue; + } +#endif + + const char *quotedColumnName = quote_identifier(NameStr(currentColumn->attname)); + nonStoredColumnNameList = lappend(nonStoredColumnNameList, + pstrdup(quotedColumnName)); + } + + relation_close(relation, NoLock); + + return nonStoredColumnNameList; +} + + /* * CheckAlterDistributedTableConversionParameters errors for the cases where * alter_distributed_table UDF wouldn't work. diff --git a/src/test/regress/expected/pg12.out b/src/test/regress/expected/pg12.out index 6c7033b7b..cd7674552 100644 --- a/src/test/regress/expected/pg12.out +++ b/src/test/regress/expected/pg12.out @@ -449,6 +449,199 @@ BEGIN; generated_stored_col_test_60040 | y | s (2 rows) +ROLLBACK; +CREATE TABLE generated_stored_dist ( + col_1 int, + "col\'_2" text, + col_3 text generated always as (UPPER("col\'_2")) stored +); +SELECT create_distributed_table ('generated_stored_dist', 'col_1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO generated_stored_dist VALUES (1, 'text_1'), (2, 'text_2'); +SELECT * FROM generated_stored_dist ORDER BY 1,2,3; + col_1 | col\'_2 | col_3 +--------------------------------------------------------------------- + 1 | text_1 | TEXT_1 + 2 | text_2 | TEXT_2 +(2 rows) + +INSERT INTO generated_stored_dist VALUES (1, 'text_1'), (2, 'text_2'); +SELECT alter_distributed_table('generated_stored_dist', shard_count := 5, cascade_to_colocated := false); +NOTICE: creating a new table for test_pg12.generated_stored_dist +NOTICE: Moving the data of test_pg12.generated_stored_dist +NOTICE: Dropping the old test_pg12.generated_stored_dist +NOTICE: Renaming the new table to test_pg12.generated_stored_dist + alter_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM generated_stored_dist ORDER BY 1,2,3; + col_1 | col\'_2 | col_3 +--------------------------------------------------------------------- + 1 | text_1 | TEXT_1 + 1 | text_1 | TEXT_1 + 2 | text_2 | TEXT_2 + 2 | text_2 | TEXT_2 +(4 rows) + +CREATE TABLE generated_stored_local ( + col_1 int, + "col\'_2" text, + col_3 text generated always as (UPPER("col\'_2")) stored +); +SELECT citus_add_local_table_to_metadata('generated_stored_local'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO generated_stored_local VALUES (1, 'text_1'), (2, 'text_2'); +SELECT * FROM generated_stored_local ORDER BY 1,2,3; + col_1 | col\'_2 | col_3 +--------------------------------------------------------------------- + 1 | text_1 | TEXT_1 + 2 | text_2 | TEXT_2 +(2 rows) + +SELECT create_distributed_table ('generated_stored_local', 'col_1'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. +HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_pg12.generated_stored_local$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO generated_stored_local VALUES (1, 'text_1'), (2, 'text_2'); +SELECT * FROM generated_stored_local ORDER BY 1,2,3; + col_1 | col\'_2 | col_3 +--------------------------------------------------------------------- + 1 | text_1 | TEXT_1 + 1 | text_1 | TEXT_1 + 2 | text_2 | TEXT_2 + 2 | text_2 | TEXT_2 +(4 rows) + +create table generated_stored_columnar(i int) partition by range(i); +create table generated_stored_columnar_p0 partition of generated_stored_columnar for values from (0) to (10); +create table generated_stored_columnar_p1 partition of generated_stored_columnar for values from (10) to (20); +SELECT alter_table_set_access_method('generated_stored_columnar_p0', 'columnar'); +NOTICE: creating a new table for test_pg12.generated_stored_columnar_p0 +NOTICE: Moving the data of test_pg12.generated_stored_columnar_p0 +NOTICE: Dropping the old test_pg12.generated_stored_columnar_p0 +NOTICE: Renaming the new table to test_pg12.generated_stored_columnar_p0 + alter_table_set_access_method +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE generated_stored_ref ( + col_1 int, + col_2 int, + col_3 int generated always as (col_1+col_2) stored, + col_4 int, + col_5 int generated always as (col_4*2-col_1) stored +); +SELECT create_reference_table ('generated_stored_ref'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO generated_stored_ref (col_1, col_4) VALUES (1,2), (11,12); +INSERT INTO generated_stored_ref (col_1, col_2, col_4) VALUES (100,101,102), (200,201,202); +SELECT * FROM generated_stored_ref ORDER BY 1,2,3,4,5; + col_1 | col_2 | col_3 | col_4 | col_5 +--------------------------------------------------------------------- + 1 | | | 2 | 3 + 11 | | | 12 | 13 + 100 | 101 | 201 | 102 | 104 + 200 | 201 | 401 | 202 | 204 +(4 rows) + +BEGIN; + SELECT undistribute_table('generated_stored_ref'); +NOTICE: creating a new table for test_pg12.generated_stored_ref +NOTICE: Moving the data of test_pg12.generated_stored_ref +NOTICE: Dropping the old test_pg12.generated_stored_ref +NOTICE: Renaming the new table to test_pg12.generated_stored_ref + undistribute_table +--------------------------------------------------------------------- + +(1 row) + + INSERT INTO generated_stored_ref (col_1, col_4) VALUES (11,12), (21,22); + INSERT INTO generated_stored_ref (col_1, col_2, col_4) VALUES (200,201,202), (300,301,302); + SELECT * FROM generated_stored_ref ORDER BY 1,2,3,4,5; + col_1 | col_2 | col_3 | col_4 | col_5 +--------------------------------------------------------------------- + 1 | | | 2 | 3 + 11 | | | 12 | 13 + 11 | | | 12 | 13 + 21 | | | 22 | 23 + 100 | 101 | 201 | 102 | 104 + 200 | 201 | 401 | 202 | 204 + 200 | 201 | 401 | 202 | 204 + 300 | 301 | 601 | 302 | 304 +(8 rows) + +ROLLBACK; +BEGIN; + -- drop some of the columns not having "generated always as stored" expressions + -- this would drop generated columns too + ALTER TABLE generated_stored_ref DROP COLUMN col_1; + ALTER TABLE generated_stored_ref DROP COLUMN col_4; + -- show that undistribute_table works fine + SELECT undistribute_table('generated_stored_ref'); +NOTICE: creating a new table for test_pg12.generated_stored_ref +NOTICE: Moving the data of test_pg12.generated_stored_ref +NOTICE: Dropping the old test_pg12.generated_stored_ref +NOTICE: Renaming the new table to test_pg12.generated_stored_ref + undistribute_table +--------------------------------------------------------------------- + +(1 row) + + INSERT INTO generated_stored_ref VALUES (5); + SELECT * FROM generated_stored_REF ORDER BY 1; + col_2 +--------------------------------------------------------------------- + 5 + 101 + 201 + + +(5 rows) + +ROLLBACK; +BEGIN; + -- now drop all columns + ALTER TABLE generated_stored_ref DROP COLUMN col_3; + ALTER TABLE generated_stored_ref DROP COLUMN col_5; + ALTER TABLE generated_stored_ref DROP COLUMN col_1; + ALTER TABLE generated_stored_ref DROP COLUMN col_2; + ALTER TABLE generated_stored_ref DROP COLUMN col_4; + -- show that undistribute_table works fine + SELECT undistribute_table('generated_stored_ref'); +NOTICE: creating a new table for test_pg12.generated_stored_ref +NOTICE: Moving the data of test_pg12.generated_stored_ref +NOTICE: Dropping the old test_pg12.generated_stored_ref +NOTICE: Renaming the new table to test_pg12.generated_stored_ref + undistribute_table +--------------------------------------------------------------------- + +(1 row) + + SELECT * FROM generated_stored_ref; +-- +(4 rows) + ROLLBACK; RESET citus.replicate_reference_tables_on_activate; SELECT citus_remove_node('localhost', :master_port); @@ -459,6 +652,6 @@ SELECT citus_remove_node('localhost', :master_port); \set VERBOSITY terse drop schema test_pg12 cascade; -NOTICE: drop cascades to 10 other objects +NOTICE: drop cascades to 15 other objects \set VERBOSITY default SET citus.shard_replication_factor to 2; diff --git a/src/test/regress/sql/pg12.sql b/src/test/regress/sql/pg12.sql index 53a4f11f4..83e76867a 100644 --- a/src/test/regress/sql/pg12.sql +++ b/src/test/regress/sql/pg12.sql @@ -296,6 +296,90 @@ BEGIN; ORDER BY 1,2; ROLLBACK; +CREATE TABLE generated_stored_dist ( + col_1 int, + "col\'_2" text, + col_3 text generated always as (UPPER("col\'_2")) stored +); + +SELECT create_distributed_table ('generated_stored_dist', 'col_1'); + +INSERT INTO generated_stored_dist VALUES (1, 'text_1'), (2, 'text_2'); +SELECT * FROM generated_stored_dist ORDER BY 1,2,3; + +INSERT INTO generated_stored_dist VALUES (1, 'text_1'), (2, 'text_2'); +SELECT alter_distributed_table('generated_stored_dist', shard_count := 5, cascade_to_colocated := false); +SELECT * FROM generated_stored_dist ORDER BY 1,2,3; + +CREATE TABLE generated_stored_local ( + col_1 int, + "col\'_2" text, + col_3 text generated always as (UPPER("col\'_2")) stored +); + +SELECT citus_add_local_table_to_metadata('generated_stored_local'); + +INSERT INTO generated_stored_local VALUES (1, 'text_1'), (2, 'text_2'); +SELECT * FROM generated_stored_local ORDER BY 1,2,3; + +SELECT create_distributed_table ('generated_stored_local', 'col_1'); + +INSERT INTO generated_stored_local VALUES (1, 'text_1'), (2, 'text_2'); +SELECT * FROM generated_stored_local ORDER BY 1,2,3; + +create table generated_stored_columnar(i int) partition by range(i); +create table generated_stored_columnar_p0 partition of generated_stored_columnar for values from (0) to (10); +create table generated_stored_columnar_p1 partition of generated_stored_columnar for values from (10) to (20); +SELECT alter_table_set_access_method('generated_stored_columnar_p0', 'columnar'); + +CREATE TABLE generated_stored_ref ( + col_1 int, + col_2 int, + col_3 int generated always as (col_1+col_2) stored, + col_4 int, + col_5 int generated always as (col_4*2-col_1) stored +); + +SELECT create_reference_table ('generated_stored_ref'); + +INSERT INTO generated_stored_ref (col_1, col_4) VALUES (1,2), (11,12); +INSERT INTO generated_stored_ref (col_1, col_2, col_4) VALUES (100,101,102), (200,201,202); + +SELECT * FROM generated_stored_ref ORDER BY 1,2,3,4,5; + +BEGIN; + SELECT undistribute_table('generated_stored_ref'); + INSERT INTO generated_stored_ref (col_1, col_4) VALUES (11,12), (21,22); + INSERT INTO generated_stored_ref (col_1, col_2, col_4) VALUES (200,201,202), (300,301,302); + SELECT * FROM generated_stored_ref ORDER BY 1,2,3,4,5; +ROLLBACK; + +BEGIN; + -- drop some of the columns not having "generated always as stored" expressions + -- this would drop generated columns too + ALTER TABLE generated_stored_ref DROP COLUMN col_1; + ALTER TABLE generated_stored_ref DROP COLUMN col_4; + + -- show that undistribute_table works fine + SELECT undistribute_table('generated_stored_ref'); + INSERT INTO generated_stored_ref VALUES (5); + SELECT * FROM generated_stored_REF ORDER BY 1; +ROLLBACK; + +BEGIN; + -- now drop all columns + ALTER TABLE generated_stored_ref DROP COLUMN col_3; + ALTER TABLE generated_stored_ref DROP COLUMN col_5; + ALTER TABLE generated_stored_ref DROP COLUMN col_1; + ALTER TABLE generated_stored_ref DROP COLUMN col_2; + ALTER TABLE generated_stored_ref DROP COLUMN col_4; + + -- show that undistribute_table works fine + SELECT undistribute_table('generated_stored_ref'); + + SELECT * FROM generated_stored_ref; +ROLLBACK; + RESET citus.replicate_reference_tables_on_activate; SELECT citus_remove_node('localhost', :master_port); From 3a403090fd2d4381321c0748dc702a28740e6e92 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Wed, 3 Feb 2021 19:05:17 +0300 Subject: [PATCH 02/10] Disallow adding local table with identity column to metadata (#4633) pg_get_tableschemadef_string doesn't know how to deparse identity columns so we cannot reflect those columns when creating shell relation. For this reason, we don't allow adding local tables -having identity cols- to metadata. --- .../citus_add_local_table_to_metadata.c | 26 +++++++++++++++++++ .../commands/create_distributed_table.c | 3 +-- src/include/distributed/metadata_utility.h | 1 + .../regress/expected/citus_local_tables.out | 12 +++++++++ src/test/regress/sql/citus_local_tables.sql | 12 +++++++++ 5 files changed, 52 insertions(+), 2 deletions(-) diff --git a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c index 39b7d483f..d89c44630 100644 --- a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c +++ b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c @@ -49,6 +49,7 @@ static void citus_add_local_table_to_metadata_internal(Oid relationId, bool cascadeViaForeignKeys); static void ErrorIfUnsupportedCreateCitusLocalTable(Relation relation); static void ErrorIfUnsupportedCitusLocalTableKind(Oid relationId); +static void ErrorIfUnsupportedCitusLocalColumnDefinition(Relation relation); static List * GetShellTableDDLEventsForCitusLocalTable(Oid relationId); static uint64 ConvertLocalTableToShard(Oid relationId); static void RenameRelationToShardRelation(Oid shellRelationId, uint64 shardId); @@ -338,6 +339,7 @@ ErrorIfUnsupportedCreateCitusLocalTable(Relation relation) ErrorIfCoordinatorNotAddedAsWorkerNode(); ErrorIfUnsupportedCitusLocalTableKind(relationId); EnsureTableNotDistributed(relationId); + ErrorIfUnsupportedCitusLocalColumnDefinition(relation); /* * When creating other citus table types, we don't need to check that case as @@ -405,6 +407,30 @@ ErrorIfUnsupportedCitusLocalTableKind(Oid relationId) } +/* + * ErrorIfUnsupportedCitusLocalColumnDefinition errors out if given relation + * has unsupported column definition for citus local table creation. + */ +static void +ErrorIfUnsupportedCitusLocalColumnDefinition(Relation relation) +{ + TupleDesc relationDesc = RelationGetDescr(relation); + if (RelationUsesIdentityColumns(relationDesc)) + { + /* + * pg_get_tableschemadef_string doesn't know how to deparse identity + * columns so we cannot reflect those columns when creating shell + * relation. For this reason, error out here. + */ + Oid relationId = relation->rd_id; + ereport(ERROR, (errmsg("cannot add %s to citus metadata since table " + "has identity column", + generate_qualified_relation_name(relationId)), + errhint("Drop the identity columns and re-try the command"))); + } +} + + /* * GetShellTableDDLEventsForCitusLocalTable returns a list of DDL commands * to create the shell table from scratch. diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index e852aa8e3..744f08196 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -122,7 +122,6 @@ static void DropFKeysRelationInvolvedWithTableType(Oid relationId, int tableType static bool LocalTableEmpty(Oid tableId); static void CopyLocalDataIntoShards(Oid relationId); static List * TupleDescColumnNameList(TupleDesc tupleDescriptor); -static bool RelationUsesIdentityColumns(TupleDesc relationDesc); static bool DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc, Var *distributionColumn); static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty); @@ -1636,7 +1635,7 @@ TupleDescColumnNameList(TupleDesc tupleDescriptor) * RelationUsesIdentityColumns returns whether a given relation uses * GENERATED ... AS IDENTITY */ -static bool +bool RelationUsesIdentityColumns(TupleDesc relationDesc) { for (int attributeIndex = 0; attributeIndex < relationDesc->natts; attributeIndex++) diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 96db653d0..dfb5aeb0f 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -249,6 +249,7 @@ extern void EnsureTableNotDistributed(Oid relationId); extern void EnsureReplicationSettings(Oid relationId, char replicationModel); extern void EnsureRelationExists(Oid relationId); extern bool RegularTable(Oid relationId); +extern bool RelationUsesIdentityColumns(TupleDesc relationDesc); extern char * ConstructQualifiedShardName(ShardInterval *shardInterval); extern uint64 GetFirstShardId(Oid relationId); extern Datum StringToDatum(char *inputString, Oid dataType); diff --git a/src/test/regress/expected/citus_local_tables.out b/src/test/regress/expected/citus_local_tables.out index b1d2a1f22..5baca6098 100644 --- a/src/test/regress/expected/citus_local_tables.out +++ b/src/test/regress/expected/citus_local_tables.out @@ -56,6 +56,18 @@ BEGIN; SELECT citus_add_local_table_to_metadata('temp_table'); ERROR: constraints on temporary tables may reference only temporary tables ROLLBACK; +-- below two errors out since we don't support adding local tables +-- having any identity columns to metadata +BEGIN; + CREATE TABLE identity_cols_test (a int generated by default as identity (start with 42)); + SELECT citus_add_local_table_to_metadata('identity_cols_test'); +ERROR: cannot add citus_local_tables_test_schema.identity_cols_test to citus metadata since table has identity column +ROLLBACK; +BEGIN; + CREATE TABLE identity_cols_test (a int generated always as identity (increment by 42)); + SELECT citus_add_local_table_to_metadata('identity_cols_test'); +ERROR: cannot add citus_local_tables_test_schema.identity_cols_test to citus metadata since table has identity column +ROLLBACK; -- creating citus local table having no data initially would work SELECT citus_add_local_table_to_metadata('citus_local_table_1'); citus_add_local_table_to_metadata diff --git a/src/test/regress/sql/citus_local_tables.sql b/src/test/regress/sql/citus_local_tables.sql index 1cfa75d27..148244e51 100644 --- a/src/test/regress/sql/citus_local_tables.sql +++ b/src/test/regress/sql/citus_local_tables.sql @@ -46,6 +46,18 @@ BEGIN; SELECT citus_add_local_table_to_metadata('temp_table'); ROLLBACK; +-- below two errors out since we don't support adding local tables +-- having any identity columns to metadata +BEGIN; + CREATE TABLE identity_cols_test (a int generated by default as identity (start with 42)); + SELECT citus_add_local_table_to_metadata('identity_cols_test'); +ROLLBACK; + +BEGIN; + CREATE TABLE identity_cols_test (a int generated always as identity (increment by 42)); + SELECT citus_add_local_table_to_metadata('identity_cols_test'); +ROLLBACK; + -- creating citus local table having no data initially would work SELECT citus_add_local_table_to_metadata('citus_local_table_1'); From c0f2817b70d9130f0338028f78777d1334b19717 Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Wed, 3 Feb 2021 19:33:54 +0300 Subject: [PATCH 03/10] Disallow using alter_table udfs with tables having any identity cols (#4635) pg_get_tableschemadef_string doesn't know how to deparse identity columns so we cannot reflect those columns when creating table from scratch. For this reason, we don't allow using alter_table udfs with tables having any identity cols. --- src/backend/distributed/commands/alter_table.c | 14 ++++++++++++++ .../expected/alter_table_set_access_method.out | 5 +++++ .../regress/sql/alter_table_set_access_method.sql | 4 ++++ 3 files changed, 23 insertions(+) diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index 655272fcf..1849fb5cd 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -909,6 +909,20 @@ CreateTableConversion(TableConversionParameters *params) ereport(ERROR, (errmsg("cannot complete operation " "because no such table exists"))); } + + TupleDesc relationDesc = RelationGetDescr(relation); + if (RelationUsesIdentityColumns(relationDesc)) + { + /* + * pg_get_tableschemadef_string doesn't know how to deparse identity + * columns so we cannot reflect those columns when creating table + * from scratch. For this reason, error out here. + */ + ereport(ERROR, (errmsg("cannot complete command because relation " + "%s has identity column", + generate_qualified_relation_name(con->relationId)), + errhint("Drop the identity columns and re-try the command"))); + } relation_close(relation, NoLock); con->distributionKey = BuildDistributionKeyFromColumnName(relation, con->distributionColumn); diff --git a/src/test/regress/expected/alter_table_set_access_method.out b/src/test/regress/expected/alter_table_set_access_method.out index 54484dff7..0c2366e1c 100644 --- a/src/test/regress/expected/alter_table_set_access_method.out +++ b/src/test/regress/expected/alter_table_set_access_method.out @@ -676,6 +676,11 @@ SELECT relname, relkind v_ref | v (6 rows) +CREATE TABLE identity_cols_test (a int, b int generated by default as identity (increment by 42)); +-- errors out since we don't support alter_table.* udfs with tables having any identity columns +SELECT alter_table_set_access_method('identity_cols_test', 'columnar'); +ERROR: cannot complete command because relation alter_table_set_access_method.identity_cols_test has identity column +HINT: Drop the identity columns and re-try the command SET client_min_messages TO WARNING; DROP SCHEMA alter_table_set_access_method CASCADE; SELECT 1 FROM master_remove_node('localhost', :master_port); diff --git a/src/test/regress/sql/alter_table_set_access_method.sql b/src/test/regress/sql/alter_table_set_access_method.sql index 17c7438f8..69a9dd8d4 100644 --- a/src/test/regress/sql/alter_table_set_access_method.sql +++ b/src/test/regress/sql/alter_table_set_access_method.sql @@ -209,6 +209,10 @@ SELECT relname, relkind ) ORDER BY relname ASC; +CREATE TABLE identity_cols_test (a int, b int generated by default as identity (increment by 42)); +-- errors out since we don't support alter_table.* udfs with tables having any identity columns +SELECT alter_table_set_access_method('identity_cols_test', 'columnar'); + SET client_min_messages TO WARNING; DROP SCHEMA alter_table_set_access_method CASCADE; SELECT 1 FROM master_remove_node('localhost', :master_port); From 24e60b44a1d0ea95859ad7379b7a6557862f173e Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Thu, 14 Jan 2021 12:34:45 +0300 Subject: [PATCH 04/10] Consider coordinator in intermediate result optimization It seems that we were not considering the case where coordinator was added to the cluster as a worker in the optimization of intermediate results. This could lead to errors when coordinator was added as a worker. --- .../distributed/planner/distributed_planner.c | 1 - .../planner/intermediate_result_pruning.c | 4 +- .../expected/coordinator_shouldhaveshards.out | 103 +++++++++- .../regress/expected/local_shard_copy.out | 8 +- .../mx_coordinator_shouldhaveshards.out | 185 ++++++++++++++++++ src/test/regress/multi_mx_schedule | 1 + .../sql/coordinator_shouldhaveshards.sql | 44 +++++ src/test/regress/sql/local_shard_copy.sql | 2 +- .../sql/mx_coordinator_shouldhaveshards.sql | 94 +++++++++ 9 files changed, 433 insertions(+), 9 deletions(-) create mode 100644 src/test/regress/expected/mx_coordinator_shouldhaveshards.out create mode 100644 src/test/regress/sql/mx_coordinator_shouldhaveshards.sql diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index a0c27fc66..7d94939be 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -158,7 +158,6 @@ distributed_planner(Query *parse, } int rteIdCounter = 1; - DistributedPlanningContext planContext = { .query = parse, .cursorOptions = cursorOptions, diff --git a/src/backend/distributed/planner/intermediate_result_pruning.c b/src/backend/distributed/planner/intermediate_result_pruning.c index 1aa7fe7fd..6ec08aad0 100644 --- a/src/backend/distributed/planner/intermediate_result_pruning.c +++ b/src/backend/distributed/planner/intermediate_result_pruning.c @@ -154,7 +154,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList; List *subPlanList = distributedPlan->subPlanList; ListCell *subPlanCell = NULL; - int workerNodeCount = ActiveReadableNonCoordinatorNodeCount(); + int workerNodeCount = list_length(ActiveReadableNodeList()); foreach(subPlanCell, usedSubPlanNodeList) { @@ -272,7 +272,7 @@ AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry, static void AppendAllWorkerNodes(IntermediateResultsHashEntry *entry) { - List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); + List *workerNodeList = ActiveReadableNodeList(); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 319bc0e93..7f8e03834 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -778,6 +778,107 @@ SELECT table_name, citus_table_type, distribution_column, shard_count FROM publi (1 row) SET client_min_messages TO DEFAULT; +-- issue 4508 table_1 and table_2 are used to test +-- some edge cases around intermediate result pruning +CREATE TABLE table_1 (key int, value text); +SELECT create_distributed_table('table_1', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table_2 (key int, value text); +SELECT create_distributed_table('table_2', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_1_1503102 AS citus_table_alias (key, value) VALUES (1,'1'::text) +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_1_1503105 AS citus_table_alias (key, value) VALUES (2,'2'::text) +INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_2_1503106 AS citus_table_alias (key, value) VALUES (1,'1'::text), (5,'5'::text) +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_2_1503109 AS citus_table_alias (key, value) VALUES (2,'2'::text) +SET citus.log_intermediate_results TO ON; +SET client_min_messages to debug1; +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 +NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 + count | key +--------------------------------------------------------------------- + 1 | 1 +(1 row) + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint +DEBUG: Subplan XXX_2 will be written to local file +NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 +NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 +NOTICE: executing the command locally: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +NOTICE: executing the copy locally for shard xxxxx +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; +DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1 +DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO coordinator_shouldhaveshards.table_2 (key, value) SELECT key, count(*) AS count FROM coordinator_shouldhaveshards.table_1 WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value +DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +NOTICE: executing the command locally: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true +NOTICE: executing the command locally: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Collecting INSERT ... SELECT results on coordinator +NOTICE: executing the command locally: SELECT worker_column_1 AS key, (count(*))::text AS value FROM (SELECT table_1.key AS worker_column_1 FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE (table_1.key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats))) worker_subquery GROUP BY worker_column_1 HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT worker_column_1 AS key, (count(*))::text AS value FROM (SELECT table_1.key AS worker_column_1 FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE (table_1.key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats))) worker_subquery GROUP BY worker_column_1 HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts + count +--------------------------------------------------------------------- + 0 +(1 row) + +RESET client_min_messages; \set VERBOSITY terse DROP TABLE ref_table; NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_table_xxxxx CASCADE @@ -788,7 +889,7 @@ DROP TABLE ref; NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE DROP TABLE test_append_table; DROP SCHEMA coordinator_shouldhaveshards CASCADE; -NOTICE: drop cascades to 13 other objects +NOTICE: drop cascades to 19 other objects SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); ?column? --------------------------------------------------------------------- diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out index 6a29ce673..9d856653c 100644 --- a/src/test/regress/expected/local_shard_copy.out +++ b/src/test/regress/expected/local_shard_copy.out @@ -3,10 +3,10 @@ SET search_path TO local_shard_copy; SET client_min_messages TO DEBUG; SET citus.next_shard_id TO 1570000; SET citus.replicate_reference_tables_on_activate TO off; -SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); - master_add_node +SELECT 1 FROM master_add_node('localhost', :master_port, groupid := 0); + ?column? --------------------------------------------------------------------- - 32 + 1 (1 row) SET citus.shard_count TO 4; @@ -485,7 +485,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- shard creation should be done locally SELECT create_reference_table('ref_table'); -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'CREATE TABLE local_shard_copy.ref_table (a integer)');SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'ALTER TABLE local_shard_copy.ref_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'CREATE TABLE local_shard_copy.ref_table (a integer) ');SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'ALTER TABLE local_shard_copy.ref_table OWNER TO postgres') NOTICE: executing the copy locally for shard xxxxx NOTICE: Copying data from local table... NOTICE: copying the data has completed diff --git a/src/test/regress/expected/mx_coordinator_shouldhaveshards.out b/src/test/regress/expected/mx_coordinator_shouldhaveshards.out new file mode 100644 index 000000000..5b5a87f05 --- /dev/null +++ b/src/test/regress/expected/mx_coordinator_shouldhaveshards.out @@ -0,0 +1,185 @@ +CREATE SCHEMA mx_coordinator_shouldhaveshards; +SET search_path TO mx_coordinator_shouldhaveshards; +SET citus.shard_replication_factor to 1; +SET citus.replication_model TO streaming; +SET client_min_messages TO WARNING; +SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +RESET client_min_messages; +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +-- issue 4508 table_1 and table_2 are used to test some edge cases +-- around intermediate result pruning +CREATE TABLE table_1 (key int, value text); +SELECT create_distributed_table('table_1', 'key', colocate_with := 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table_2 (key int, value text); +SELECT create_distributed_table('table_2', 'key', colocate_with := 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); +INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); +set citus.log_intermediate_results TO ON; +set client_min_messages to debug1; +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx + count | key +--------------------------------------------------------------------- + 1 | 1 +(1 row) + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; +DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM mx_coordinator_shouldhaveshards.table_1 +DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO mx_coordinator_shouldhaveshards.table_2 (key, value) SELECT key, count(*) AS count FROM mx_coordinator_shouldhaveshards.table_1 WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value +DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Collecting INSERT ... SELECT results on coordinator + count +--------------------------------------------------------------------- + 0 +(1 row) + +\c - - - :worker_1_port +SET search_path TO mx_coordinator_shouldhaveshards; +set citus.log_intermediate_results TO ON; +set client_min_messages to debug1; +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx + count | key +--------------------------------------------------------------------- + 1 | 1 +(1 row) + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; +DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM mx_coordinator_shouldhaveshards.table_1 +DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO mx_coordinator_shouldhaveshards.table_2 (key, value) SELECT key, count(*) AS count FROM mx_coordinator_shouldhaveshards.table_1 WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value +DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Collecting INSERT ... SELECT results on coordinator + count +--------------------------------------------------------------------- + 0 +(1 row) + +\c - - - :master_port +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +DROP SCHEMA mx_coordinator_shouldhaveshards CASCADE; +NOTICE: drop cascades to 6 other objects +DETAIL: drop cascades to table mx_coordinator_shouldhaveshards.table_1 +drop cascades to table mx_coordinator_shouldhaveshards.table_1_1130052 +drop cascades to table mx_coordinator_shouldhaveshards.table_1_1130055 +drop cascades to table mx_coordinator_shouldhaveshards.table_2 +drop cascades to table mx_coordinator_shouldhaveshards.table_2_1130056 +drop cascades to table mx_coordinator_shouldhaveshards.table_2_1130059 +SELECT master_remove_node('localhost', :master_port); + master_remove_node +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 5a1f1f7e2..ac5206d4b 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -26,6 +26,7 @@ test: multi_mx_hide_shard_names test: multi_mx_add_coordinator test: multi_mx_modifications_to_reference_tables test: multi_mx_partitioning +test: mx_coordinator_shouldhaveshards test: multi_mx_copy_data multi_mx_router_planner test: multi_mx_schema_support multi_mx_tpch_query1 multi_mx_tpch_query10 test: multi_mx_tpch_query12 multi_mx_tpch_query14 multi_mx_tpch_query19 diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 5362db9c1..7501be265 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -323,6 +323,50 @@ SELECT table_name, citus_table_type, distribution_column, shard_count FROM publi SET client_min_messages TO DEFAULT; +-- issue 4508 table_1 and table_2 are used to test +-- some edge cases around intermediate result pruning +CREATE TABLE table_1 (key int, value text); +SELECT create_distributed_table('table_1', 'key'); + +CREATE TABLE table_2 (key int, value text); +SELECT create_distributed_table('table_2', 'key'); + +INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); +INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); + +SET citus.log_intermediate_results TO ON; +SET client_min_messages to debug1; +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; + +RESET client_min_messages; + + \set VERBOSITY terse DROP TABLE ref_table; diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql index ca29bff7f..0f2535c73 100644 --- a/src/test/regress/sql/local_shard_copy.sql +++ b/src/test/regress/sql/local_shard_copy.sql @@ -5,7 +5,7 @@ SET client_min_messages TO DEBUG; SET citus.next_shard_id TO 1570000; SET citus.replicate_reference_tables_on_activate TO off; -SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); +SELECT 1 FROM master_add_node('localhost', :master_port, groupid := 0); SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; diff --git a/src/test/regress/sql/mx_coordinator_shouldhaveshards.sql b/src/test/regress/sql/mx_coordinator_shouldhaveshards.sql new file mode 100644 index 000000000..377b6acbe --- /dev/null +++ b/src/test/regress/sql/mx_coordinator_shouldhaveshards.sql @@ -0,0 +1,94 @@ +CREATE SCHEMA mx_coordinator_shouldhaveshards; +SET search_path TO mx_coordinator_shouldhaveshards; + +SET citus.shard_replication_factor to 1; +SET citus.replication_model TO streaming; +SET client_min_messages TO WARNING; +SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); +RESET client_min_messages; + +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); + +-- issue 4508 table_1 and table_2 are used to test some edge cases +-- around intermediate result pruning +CREATE TABLE table_1 (key int, value text); +SELECT create_distributed_table('table_1', 'key', colocate_with := 'none'); + +CREATE TABLE table_2 (key int, value text); +SELECT create_distributed_table('table_2', 'key', colocate_with := 'none'); + +INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); +INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); + +set citus.log_intermediate_results TO ON; +set client_min_messages to debug1; + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; + +\c - - - :worker_1_port +SET search_path TO mx_coordinator_shouldhaveshards; + +set citus.log_intermediate_results TO ON; +set client_min_messages to debug1; + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; + +\c - - - :master_port + +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); + +DROP SCHEMA mx_coordinator_shouldhaveshards CASCADE; + +SELECT master_remove_node('localhost', :master_port); From 9ba3f70420f05a1e4dc256381a6a3dcd16eca630 Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Wed, 3 Feb 2021 12:16:58 +0300 Subject: [PATCH 05/10] Remove unused method --- .../distributed/operations/worker_node_manager.c | 14 -------------- src/include/distributed/worker_manager.h | 1 - 2 files changed, 15 deletions(-) diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index 12d3bdf06..57b4871c8 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -318,20 +318,6 @@ ActivePrimaryNodeCount(void) } -/* - * ActiveReadableNonCoordinatorNodeCount returns the number of groups with a node we can read from. - * This method excludes coordinator even if it is added as a worker. - */ -uint32 -ActiveReadableNonCoordinatorNodeCount(void) -{ - List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); - uint32 liveWorkerCount = list_length(workerNodeList); - - return liveWorkerCount; -} - - /* * NodeIsCoordinator returns true if the given node represents the coordinator. */ diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 0fe229f61..047c01225 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -80,7 +80,6 @@ extern WorkerNode * CoordinatorNodeIfAddedAsWorkerOrError(void); extern void ErrorIfCoordinatorNotAddedAsWorkerNode(void); extern List * DistributedTablePlacementNodeList(LOCKMODE lockMode); extern bool NodeCanHaveDistTablePlacements(WorkerNode *node); -extern uint32 ActiveReadableNonCoordinatorNodeCount(void); extern List * ActiveReadableNonCoordinatorNodeList(void); extern List * ActiveReadableNodeList(void); extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort); From eb5be579e308579e2fcf59e8d401bf653528be08 Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Wed, 3 Feb 2021 15:16:46 +0300 Subject: [PATCH 06/10] Set previous cell inside a for loop --- .../distributed/planner/intermediate_result_pruning.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/backend/distributed/planner/intermediate_result_pruning.c b/src/backend/distributed/planner/intermediate_result_pruning.c index 6ec08aad0..071c8669d 100644 --- a/src/backend/distributed/planner/intermediate_result_pruning.c +++ b/src/backend/distributed/planner/intermediate_result_pruning.c @@ -383,10 +383,11 @@ RemoveLocalNodeFromWorkerList(List *workerNodeList) { return list_delete_cell_compat(workerNodeList, workerNodeCell, prev); } + #if PG_VERSION_NUM < PG_VERSION_13 + prev = workerNodeCell; + #endif } - #if PG_VERSION_NUM < PG_VERSION_13 - prev = workerNodeCell; - #endif + return workerNodeList; } From ff82e85ea2e166dcfa72843aa658624cf482988e Mon Sep 17 00:00:00 2001 From: Sait Talha Nisanci Date: Wed, 3 Feb 2021 19:45:21 +0300 Subject: [PATCH 07/10] Replace workerNodeCount -> nodeCount --- .../distributed/planner/distributed_planner.c | 1 + .../planner/intermediate_result_pruning.c | 12 ++++++------ 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 7d94939be..a0c27fc66 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -158,6 +158,7 @@ distributed_planner(Query *parse, } int rteIdCounter = 1; + DistributedPlanningContext planContext = { .query = parse, .cursorOptions = cursorOptions, diff --git a/src/backend/distributed/planner/intermediate_result_pruning.c b/src/backend/distributed/planner/intermediate_result_pruning.c index 071c8669d..2a3b4e423 100644 --- a/src/backend/distributed/planner/intermediate_result_pruning.c +++ b/src/backend/distributed/planner/intermediate_result_pruning.c @@ -32,7 +32,7 @@ bool LogIntermediateResults = false; static List * FindSubPlansUsedInNode(Node *node, SubPlanAccessType accessType); static void AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry, DistributedPlan *distributedPlan, - int workerNodeCount); + int nodeCount); static void AppendAllWorkerNodes(IntermediateResultsHashEntry *entry); static List * FindAllRemoteWorkerNodesUsingSubplan(IntermediateResultsHashEntry *entry); static List * RemoveLocalNodeFromWorkerList(List *workerNodeList); @@ -154,7 +154,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList; List *subPlanList = distributedPlan->subPlanList; ListCell *subPlanCell = NULL; - int workerNodeCount = list_length(ActiveReadableNodeList()); + int nodeCount = list_length(ActiveReadableNodeList()); foreach(subPlanCell, usedSubPlanNodeList) { @@ -170,7 +170,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, * will be written to a local file and sent to all nodes. Note that the * remaining subplans in the distributed plan should still be traversed. */ - if (list_length(entry->nodeIdList) == workerNodeCount && entry->writeLocalFile) + if (list_length(entry->nodeIdList) == nodeCount && entry->writeLocalFile) { elog(DEBUG4, "Subplan %s is used in all workers", resultId); continue; @@ -190,7 +190,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, * workers will be in the node list. We can improve intermediate result * pruning by deciding which reference table shard will be accessed earlier. */ - AppendAllAccessedWorkerNodes(entry, distributedPlan, workerNodeCount); + AppendAllAccessedWorkerNodes(entry, distributedPlan, nodeCount); elog(DEBUG4, "Subplan %s is used in %lu", resultId, distributedPlan->planId); } @@ -231,7 +231,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, static void AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry, DistributedPlan *distributedPlan, - int workerNodeCount) + int nodeCount) { List *taskList = distributedPlan->workerJob->taskList; ListCell *taskCell = NULL; @@ -254,7 +254,7 @@ AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry, list_append_unique_int(entry->nodeIdList, placement->nodeId); /* early return if all the workers are accessed */ - if (list_length(entry->nodeIdList) == workerNodeCount && + if (list_length(entry->nodeIdList) == nodeCount && entry->writeLocalFile) { return; From 4043731c4133658ccd8a22ba471350738f083c5c Mon Sep 17 00:00:00 2001 From: Jeff Davis Date: Tue, 2 Feb 2021 11:02:08 -0800 Subject: [PATCH 08/10] Columnar: fix inheritance planning. --- src/backend/columnar/cstore_customscan.c | 2 +- .../expected/columnar_partitioning.out | 237 ++++++++++++++++++ .../expected/columnar_partitioning_1.out | 237 ++++++++++++++++++ .../regress/sql/columnar_partitioning.sql | 72 ++++++ 4 files changed, 547 insertions(+), 1 deletion(-) diff --git a/src/backend/columnar/cstore_customscan.c b/src/backend/columnar/cstore_customscan.c index 445ea68f2..7e913098c 100644 --- a/src/backend/columnar/cstore_customscan.c +++ b/src/backend/columnar/cstore_customscan.c @@ -160,7 +160,7 @@ ColumnarSetRelPathlistHook(PlannerInfo *root, RelOptInfo *rel, Index rti, PreviousSetRelPathlistHook(root, rel, rti, rte); } - if (!OidIsValid(rte->relid) || rte->rtekind != RTE_RELATION) + if (!OidIsValid(rte->relid) || rte->rtekind != RTE_RELATION || rte->inh) { /* some calls to the pathlist hook don't have a valid relation set. Do nothing */ return; diff --git a/src/test/regress/expected/columnar_partitioning.out b/src/test/regress/expected/columnar_partitioning.out index 4f54824f8..00b941c4a 100644 --- a/src/test/regress/expected/columnar_partitioning.out +++ b/src/test/regress/expected/columnar_partitioning.out @@ -124,3 +124,240 @@ SET parallel_tuple_cost TO DEFAULT; SET max_parallel_workers TO DEFAULT; SET max_parallel_workers_per_gather TO DEFAULT; DROP TABLE parent; +-- +-- Test inheritance +-- +CREATE TABLE i_row(i int); +INSERT INTO i_row VALUES(100); +CREATE TABLE i_col(i int) USING columnar; +INSERT INTO i_col VALUES(200); +CREATE TABLE ij_row_row(j int) INHERITS(i_row); +INSERT INTO ij_row_row VALUES(300, 1000); +CREATE TABLE ij_row_col(j int) INHERITS(i_row) USING columnar; +INSERT INTO ij_row_col VALUES(400, 2000); +CREATE TABLE ij_col_row(j int) INHERITS(i_col); +INSERT INTO ij_col_row VALUES(500, 3000); +CREATE TABLE ij_col_col(j int) INHERITS(i_col) USING columnar; +INSERT INTO ij_col_col VALUES(600, 4000); +EXPLAIN (costs off) SELECT * FROM i_row; + QUERY PLAN +--------------------------------------------------------------------- + Append + -> Seq Scan on i_row i_row_1 + -> Seq Scan on ij_row_row i_row_2 + -> Custom Scan (ColumnarScan) on ij_row_col i_row_3 +(4 rows) + +SELECT * FROM i_row; + i +--------------------------------------------------------------------- + 100 + 300 + 400 +(3 rows) + +EXPLAIN (costs off) SELECT * FROM ONLY i_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on i_row +(1 row) + +SELECT * FROM ONLY i_row; + i +--------------------------------------------------------------------- + 100 +(1 row) + +EXPLAIN (costs off) SELECT * FROM i_col; + QUERY PLAN +--------------------------------------------------------------------- + Append + -> Custom Scan (ColumnarScan) on i_col i_col_1 + -> Seq Scan on ij_col_row i_col_2 + -> Custom Scan (ColumnarScan) on ij_col_col i_col_3 +(4 rows) + +SELECT * FROM i_col; + i +--------------------------------------------------------------------- + 200 + 500 + 600 +(3 rows) + +EXPLAIN (costs off) SELECT * FROM ONLY i_col; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (ColumnarScan) on i_col +(1 row) + +SELECT * FROM ONLY i_col; + i +--------------------------------------------------------------------- + 200 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_row_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_row_row +(1 row) + +SELECT * FROM ij_row_row; + i | j +--------------------------------------------------------------------- + 300 | 1000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_row_col; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (ColumnarScan) on ij_row_col +(1 row) + +SELECT * FROM ij_row_col; + i | j +--------------------------------------------------------------------- + 400 | 2000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_col_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_col_row +(1 row) + +SELECT * FROM ij_col_row; + i | j +--------------------------------------------------------------------- + 500 | 3000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_col_col; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (ColumnarScan) on ij_col_col +(1 row) + +SELECT * FROM ij_col_col; + i | j +--------------------------------------------------------------------- + 600 | 4000 +(1 row) + +SET columnar.enable_custom_scan = FALSE; +EXPLAIN (costs off) SELECT * FROM i_row; + QUERY PLAN +--------------------------------------------------------------------- + Append + -> Seq Scan on i_row i_row_1 + -> Seq Scan on ij_row_row i_row_2 + -> Seq Scan on ij_row_col i_row_3 +(4 rows) + +SELECT * FROM i_row; + i +--------------------------------------------------------------------- + 100 + 300 + 400 +(3 rows) + +EXPLAIN (costs off) SELECT * FROM ONLY i_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on i_row +(1 row) + +SELECT * FROM ONLY i_row; + i +--------------------------------------------------------------------- + 100 +(1 row) + +EXPLAIN (costs off) SELECT * FROM i_col; + QUERY PLAN +--------------------------------------------------------------------- + Append + -> Seq Scan on i_col i_col_1 + -> Seq Scan on ij_col_row i_col_2 + -> Seq Scan on ij_col_col i_col_3 +(4 rows) + +SELECT * FROM i_col; + i +--------------------------------------------------------------------- + 200 + 500 + 600 +(3 rows) + +EXPLAIN (costs off) SELECT * FROM ONLY i_col; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on i_col +(1 row) + +SELECT * FROM ONLY i_col; + i +--------------------------------------------------------------------- + 200 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_row_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_row_row +(1 row) + +SELECT * FROM ij_row_row; + i | j +--------------------------------------------------------------------- + 300 | 1000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_row_col; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_row_col +(1 row) + +SELECT * FROM ij_row_col; + i | j +--------------------------------------------------------------------- + 400 | 2000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_col_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_col_row +(1 row) + +SELECT * FROM ij_col_row; + i | j +--------------------------------------------------------------------- + 500 | 3000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_col_col; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_col_col +(1 row) + +SELECT * FROM ij_col_col; + i | j +--------------------------------------------------------------------- + 600 | 4000 +(1 row) + +SET columnar.enable_custom_scan TO DEFAULT; +DROP TABLE i_row CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table ij_row_row +drop cascades to table ij_row_col +DROP TABLE i_col CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table ij_col_row +drop cascades to table ij_col_col diff --git a/src/test/regress/expected/columnar_partitioning_1.out b/src/test/regress/expected/columnar_partitioning_1.out index f68cf23ef..3fa70e600 100644 --- a/src/test/regress/expected/columnar_partitioning_1.out +++ b/src/test/regress/expected/columnar_partitioning_1.out @@ -124,3 +124,240 @@ SET parallel_tuple_cost TO DEFAULT; SET max_parallel_workers TO DEFAULT; SET max_parallel_workers_per_gather TO DEFAULT; DROP TABLE parent; +-- +-- Test inheritance +-- +CREATE TABLE i_row(i int); +INSERT INTO i_row VALUES(100); +CREATE TABLE i_col(i int) USING columnar; +INSERT INTO i_col VALUES(200); +CREATE TABLE ij_row_row(j int) INHERITS(i_row); +INSERT INTO ij_row_row VALUES(300, 1000); +CREATE TABLE ij_row_col(j int) INHERITS(i_row) USING columnar; +INSERT INTO ij_row_col VALUES(400, 2000); +CREATE TABLE ij_col_row(j int) INHERITS(i_col); +INSERT INTO ij_col_row VALUES(500, 3000); +CREATE TABLE ij_col_col(j int) INHERITS(i_col) USING columnar; +INSERT INTO ij_col_col VALUES(600, 4000); +EXPLAIN (costs off) SELECT * FROM i_row; + QUERY PLAN +--------------------------------------------------------------------- + Append + -> Seq Scan on i_row + -> Seq Scan on ij_row_row + -> Custom Scan (ColumnarScan) on ij_row_col +(4 rows) + +SELECT * FROM i_row; + i +--------------------------------------------------------------------- + 100 + 300 + 400 +(3 rows) + +EXPLAIN (costs off) SELECT * FROM ONLY i_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on i_row +(1 row) + +SELECT * FROM ONLY i_row; + i +--------------------------------------------------------------------- + 100 +(1 row) + +EXPLAIN (costs off) SELECT * FROM i_col; + QUERY PLAN +--------------------------------------------------------------------- + Append + -> Custom Scan (ColumnarScan) on i_col + -> Seq Scan on ij_col_row + -> Custom Scan (ColumnarScan) on ij_col_col +(4 rows) + +SELECT * FROM i_col; + i +--------------------------------------------------------------------- + 200 + 500 + 600 +(3 rows) + +EXPLAIN (costs off) SELECT * FROM ONLY i_col; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (ColumnarScan) on i_col +(1 row) + +SELECT * FROM ONLY i_col; + i +--------------------------------------------------------------------- + 200 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_row_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_row_row +(1 row) + +SELECT * FROM ij_row_row; + i | j +--------------------------------------------------------------------- + 300 | 1000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_row_col; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (ColumnarScan) on ij_row_col +(1 row) + +SELECT * FROM ij_row_col; + i | j +--------------------------------------------------------------------- + 400 | 2000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_col_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_col_row +(1 row) + +SELECT * FROM ij_col_row; + i | j +--------------------------------------------------------------------- + 500 | 3000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_col_col; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (ColumnarScan) on ij_col_col +(1 row) + +SELECT * FROM ij_col_col; + i | j +--------------------------------------------------------------------- + 600 | 4000 +(1 row) + +SET columnar.enable_custom_scan = FALSE; +EXPLAIN (costs off) SELECT * FROM i_row; + QUERY PLAN +--------------------------------------------------------------------- + Append + -> Seq Scan on i_row + -> Seq Scan on ij_row_row + -> Seq Scan on ij_row_col +(4 rows) + +SELECT * FROM i_row; + i +--------------------------------------------------------------------- + 100 + 300 + 400 +(3 rows) + +EXPLAIN (costs off) SELECT * FROM ONLY i_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on i_row +(1 row) + +SELECT * FROM ONLY i_row; + i +--------------------------------------------------------------------- + 100 +(1 row) + +EXPLAIN (costs off) SELECT * FROM i_col; + QUERY PLAN +--------------------------------------------------------------------- + Append + -> Seq Scan on i_col + -> Seq Scan on ij_col_row + -> Seq Scan on ij_col_col +(4 rows) + +SELECT * FROM i_col; + i +--------------------------------------------------------------------- + 200 + 500 + 600 +(3 rows) + +EXPLAIN (costs off) SELECT * FROM ONLY i_col; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on i_col +(1 row) + +SELECT * FROM ONLY i_col; + i +--------------------------------------------------------------------- + 200 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_row_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_row_row +(1 row) + +SELECT * FROM ij_row_row; + i | j +--------------------------------------------------------------------- + 300 | 1000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_row_col; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_row_col +(1 row) + +SELECT * FROM ij_row_col; + i | j +--------------------------------------------------------------------- + 400 | 2000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_col_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_col_row +(1 row) + +SELECT * FROM ij_col_row; + i | j +--------------------------------------------------------------------- + 500 | 3000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_col_col; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_col_col +(1 row) + +SELECT * FROM ij_col_col; + i | j +--------------------------------------------------------------------- + 600 | 4000 +(1 row) + +SET columnar.enable_custom_scan TO DEFAULT; +DROP TABLE i_row CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table ij_row_row +drop cascades to table ij_row_col +DROP TABLE i_col CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table ij_col_row +drop cascades to table ij_col_col diff --git a/src/test/regress/sql/columnar_partitioning.sql b/src/test/regress/sql/columnar_partitioning.sql index 98692c78a..ce1b8271d 100644 --- a/src/test/regress/sql/columnar_partitioning.sql +++ b/src/test/regress/sql/columnar_partitioning.sql @@ -53,3 +53,75 @@ SET max_parallel_workers TO DEFAULT; SET max_parallel_workers_per_gather TO DEFAULT; DROP TABLE parent; + +-- +-- Test inheritance +-- + +CREATE TABLE i_row(i int); +INSERT INTO i_row VALUES(100); +CREATE TABLE i_col(i int) USING columnar; +INSERT INTO i_col VALUES(200); +CREATE TABLE ij_row_row(j int) INHERITS(i_row); +INSERT INTO ij_row_row VALUES(300, 1000); +CREATE TABLE ij_row_col(j int) INHERITS(i_row) USING columnar; +INSERT INTO ij_row_col VALUES(400, 2000); +CREATE TABLE ij_col_row(j int) INHERITS(i_col); +INSERT INTO ij_col_row VALUES(500, 3000); +CREATE TABLE ij_col_col(j int) INHERITS(i_col) USING columnar; +INSERT INTO ij_col_col VALUES(600, 4000); + +EXPLAIN (costs off) SELECT * FROM i_row; +SELECT * FROM i_row; + +EXPLAIN (costs off) SELECT * FROM ONLY i_row; +SELECT * FROM ONLY i_row; + +EXPLAIN (costs off) SELECT * FROM i_col; +SELECT * FROM i_col; + +EXPLAIN (costs off) SELECT * FROM ONLY i_col; +SELECT * FROM ONLY i_col; + +EXPLAIN (costs off) SELECT * FROM ij_row_row; +SELECT * FROM ij_row_row; + +EXPLAIN (costs off) SELECT * FROM ij_row_col; +SELECT * FROM ij_row_col; + +EXPLAIN (costs off) SELECT * FROM ij_col_row; +SELECT * FROM ij_col_row; + +EXPLAIN (costs off) SELECT * FROM ij_col_col; +SELECT * FROM ij_col_col; + +SET columnar.enable_custom_scan = FALSE; + +EXPLAIN (costs off) SELECT * FROM i_row; +SELECT * FROM i_row; + +EXPLAIN (costs off) SELECT * FROM ONLY i_row; +SELECT * FROM ONLY i_row; + +EXPLAIN (costs off) SELECT * FROM i_col; +SELECT * FROM i_col; + +EXPLAIN (costs off) SELECT * FROM ONLY i_col; +SELECT * FROM ONLY i_col; + +EXPLAIN (costs off) SELECT * FROM ij_row_row; +SELECT * FROM ij_row_row; + +EXPLAIN (costs off) SELECT * FROM ij_row_col; +SELECT * FROM ij_row_col; + +EXPLAIN (costs off) SELECT * FROM ij_col_row; +SELECT * FROM ij_col_row; + +EXPLAIN (costs off) SELECT * FROM ij_col_col; +SELECT * FROM ij_col_col; + +SET columnar.enable_custom_scan TO DEFAULT; + +DROP TABLE i_row CASCADE; +DROP TABLE i_col CASCADE; From 5fde61722968f45e79d070e0e80a60921de379ea Mon Sep 17 00:00:00 2001 From: Hadi Moshayedi Date: Wed, 3 Feb 2021 12:10:00 -0800 Subject: [PATCH 09/10] Columnar: disallow CREATE INDEX CONCURRENTLY --- src/backend/columnar/cstore_tableam.c | 56 +++++++++++++++++++++ src/test/regress/columnar_am_schedule | 1 + src/test/regress/expected/am_indexes.out | 62 ++++++++++++++++++++++++ src/test/regress/sql/am_indexes.sql | 29 +++++++++++ 4 files changed, 148 insertions(+) create mode 100644 src/test/regress/expected/am_indexes.out create mode 100644 src/test/regress/sql/am_indexes.sql diff --git a/src/backend/columnar/cstore_tableam.c b/src/backend/columnar/cstore_tableam.c index 1178b94b4..c845f519f 100644 --- a/src/backend/columnar/cstore_tableam.c +++ b/src/backend/columnar/cstore_tableam.c @@ -93,6 +93,7 @@ typedef struct ColumnarScanDescData typedef struct ColumnarScanDescData *ColumnarScanDesc; static object_access_hook_type PrevObjectAccessHook = NULL; +static ProcessUtility_hook_type PrevProcessUtilityHook = NULL; /* forward declaration for static functions */ static void ColumnarTableDropHook(Oid tgid); @@ -100,6 +101,13 @@ static void ColumnarTriggerCreateHook(Oid tgid); static void ColumnarTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId, int subId, void *arg); +static void ColumnarProcessUtility(PlannedStmt *pstmt, + const char *queryString, + ProcessUtilityContext context, + ParamListInfo params, + struct QueryEnvironment *queryEnv, + DestReceiver *dest, + QueryCompletionCompat *completionTag); static bool ConditionalLockRelationWithTimeout(Relation rel, LOCKMODE lockMode, int timeout, int retryInterval); static void LogRelationStats(Relation rel, int elevel); @@ -1127,6 +1135,11 @@ columnar_tableam_init() PrevObjectAccessHook = object_access_hook; object_access_hook = ColumnarTableAMObjectAccessHook; + PrevProcessUtilityHook = ProcessUtility_hook ? + ProcessUtility_hook : + standard_ProcessUtility; + ProcessUtility_hook = ColumnarProcessUtility; + columnar_customscan_init(); TTSOpsColumnar = TTSOpsVirtual; @@ -1292,6 +1305,49 @@ ColumnarTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid object } +/* + * Utility hook for columnar tables. + */ +static void +ColumnarProcessUtility(PlannedStmt *pstmt, + const char *queryString, + ProcessUtilityContext context, + ParamListInfo params, + struct QueryEnvironment *queryEnv, + DestReceiver *dest, + QueryCompletionCompat *completionTag) +{ + Node *parsetree = pstmt->utilityStmt; + + if (IsA(parsetree, IndexStmt)) + { + IndexStmt *indexStmt = (IndexStmt *) parsetree; + + /* + * We should reject CREATE INDEX CONCURRENTLY before DefineIndex() is + * called. Erroring in callbacks called from DefineIndex() will create + * the index and mark it as INVALID, which will cause segfault during + * inserts. + */ + if (indexStmt->concurrent) + { + Relation rel = relation_openrv(indexStmt->relation, + ShareUpdateExclusiveLock); + if (rel->rd_tableam == GetColumnarTableAmRoutine()) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("indexes not supported for columnar tables"))); + } + + RelationClose(rel); + } + } + + PrevProcessUtilityHook(pstmt, queryString, context, + params, queryEnv, dest, completionTag); +} + + /* * IsColumnarTableAmTable returns true if relation has columnar_tableam * access method. This can be called before extension creation. diff --git a/src/test/regress/columnar_am_schedule b/src/test/regress/columnar_am_schedule index a4afb3229..f1a2498e1 100644 --- a/src/test/regress/columnar_am_schedule +++ b/src/test/regress/columnar_am_schedule @@ -8,6 +8,7 @@ test: am_query test: am_analyze test: am_data_types test: am_drop +test: am_indexes test: columnar_fallback_scan test: columnar_partitioning test: am_empty diff --git a/src/test/regress/expected/am_indexes.out b/src/test/regress/expected/am_indexes.out new file mode 100644 index 000000000..bd1c41802 --- /dev/null +++ b/src/test/regress/expected/am_indexes.out @@ -0,0 +1,62 @@ +-- +-- Testing indexes on on columnar tables. +-- +CREATE SCHEMA columnar_indexes; +SET search_path tO columnar_indexes, public; +-- +-- create index with the concurrent option. We should +-- error out during index creation. +-- https://github.com/citusdata/citus/issues/4599 +-- +create table t(a int, b int) using columnar; +create index CONCURRENTLY t_idx on t(a, b); +ERROR: indexes not supported for columnar tables +\d t + Table "columnar_indexes.t" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | integer | | | + b | integer | | | + +explain insert into t values (1, 2); + QUERY PLAN +--------------------------------------------------------------------- + Insert on t (cost=0.00..0.01 rows=1 width=8) + -> Result (cost=0.00..0.01 rows=1 width=8) +(2 rows) + +insert into t values (1, 2); +SELECT * FROM t; + a | b +--------------------------------------------------------------------- + 1 | 2 +(1 row) + +-- create index without the concurrent option. We should +-- error out during index creation. +create index t_idx on t(a, b); +ERROR: indexes not supported for columnar tables +\d t + Table "columnar_indexes.t" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | integer | | | + b | integer | | | + +explain insert into t values (1, 2); + QUERY PLAN +--------------------------------------------------------------------- + Insert on t (cost=0.00..0.01 rows=1 width=8) + -> Result (cost=0.00..0.01 rows=1 width=8) +(2 rows) + +insert into t values (3, 4); +SELECT * FROM t; + a | b +--------------------------------------------------------------------- + 1 | 2 + 3 | 4 +(2 rows) + +SET client_min_messages TO WARNING; +DROP SCHEMA columnar_indexes CASCADE; diff --git a/src/test/regress/sql/am_indexes.sql b/src/test/regress/sql/am_indexes.sql new file mode 100644 index 000000000..831699dc4 --- /dev/null +++ b/src/test/regress/sql/am_indexes.sql @@ -0,0 +1,29 @@ +-- +-- Testing indexes on on columnar tables. +-- + +CREATE SCHEMA columnar_indexes; +SET search_path tO columnar_indexes, public; + +-- +-- create index with the concurrent option. We should +-- error out during index creation. +-- https://github.com/citusdata/citus/issues/4599 +-- +create table t(a int, b int) using columnar; +create index CONCURRENTLY t_idx on t(a, b); +\d t +explain insert into t values (1, 2); +insert into t values (1, 2); +SELECT * FROM t; + +-- create index without the concurrent option. We should +-- error out during index creation. +create index t_idx on t(a, b); +\d t +explain insert into t values (1, 2); +insert into t values (3, 4); +SELECT * FROM t; + +SET client_min_messages TO WARNING; +DROP SCHEMA columnar_indexes CASCADE; From fc9a23792c3042ce79af598acec7651754b061d2 Mon Sep 17 00:00:00 2001 From: Onder Kalaci Date: Fri, 29 Jan 2021 08:36:42 +0100 Subject: [PATCH 10/10] COPY uses adaptive connection management on local node With #4338, the executor is smart enough to failover to local node if there is not enough space in max_connections for remote connections. For COPY, the logic is different. With #4034, we made COPY work with the adaptive connection management slightly differently. The cause of the difference is that COPY doesn't know which placements are going to be accessed hence requires to get connections up-front. Similarly, COPY decides to use local execution up-front. With this commit, we change the logic for COPY on local nodes: Try to reserve a connection to local host. This logic follows the same logic (e.g., citus.local_shared_pool_size) as the executor because COPY also relies on TryToIncrementSharedConnectionCounter(). If reservation to local node fails, switch to local execution Apart from this, if local execution is disabled, we follow the exact same logic for multi-node Citus. It means that if we are out of the connection, we'd give an error. --- src/backend/distributed/commands/multi_copy.c | 137 ++++++++++--- .../locally_reserved_shared_connections.c | 193 ++++++++++++------ .../distributed/metadata/node_metadata.c | 22 ++ .../operations/worker_node_manager.c | 12 ++ .../locally_reserved_shared_connections.h | 3 +- src/include/distributed/worker_manager.h | 2 + src/test/regress/expected/single_node.out | 15 ++ .../regress/input/multi_mx_copy_data.source | 19 ++ .../regress/output/multi_mx_copy_data.source | 78 +++++++ src/test/regress/sql/single_node.sql | 6 + 10 files changed, 397 insertions(+), 90 deletions(-) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 50ca597e3..4ee5ce7e8 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -79,6 +79,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_executor.h" +#include "distributed/listutils.h" #include "distributed/locally_reserved_shared_connections.h" #include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" @@ -214,6 +215,18 @@ typedef struct ShardConnections } ShardConnections; +/* + * Represents the state for allowing copy via local + * execution. + */ +typedef enum LocalCopyStatus +{ + LOCAL_COPY_REQUIRED, + LOCAL_COPY_OPTIONAL, + LOCAL_COPY_DISABLED +} LocalCopyStatus; + + /* Local functions forward declarations */ static void CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag); @@ -323,7 +336,9 @@ static void CompleteCopyQueryTagCompat(QueryCompletionCompat *completionTag, uin processedRowCount); static void FinishLocalCopy(CitusCopyDestReceiver *copyDest); static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to); -static bool ShouldExecuteCopyLocally(bool isIntermediateResult); +static LocalCopyStatus GetLocalCopyStatus(List *shardIntervalList, bool + isIntermediateResult); +static bool ShardIntervalListHasLocalPlacements(List *shardIntervalList); static void LogLocalCopyExecution(uint64 shardId); @@ -2076,28 +2091,29 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu /* - * ShouldExecuteCopyLocally returns true if the current copy - * operation should be done locally for local placements. + * GetLocalCopyStatus returns the status for executing copy locally. + * If LOCAL_COPY_DISABLED or LOCAL_COPY_REQUIRED, the caller has to + * follow that. Else, the caller may decide to use local or remote + * execution depending on other information. */ -static bool -ShouldExecuteCopyLocally(bool isIntermediateResult) +static LocalCopyStatus +GetLocalCopyStatus(List *shardIntervalList, bool isIntermediateResult) { - if (!EnableLocalExecution) + if (!EnableLocalExecution || + GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED) { - return false; + return LOCAL_COPY_DISABLED; } - - /* - * Intermediate files are written to a file, and files are visible to all - * transactions, and we use a custom copy format for copy therefore we will - * use the existing logic for that. - */ - if (isIntermediateResult) + else if (isIntermediateResult) { - return false; + /* + * Intermediate files are written to a file, and files are visible to all + * transactions, and we use a custom copy format for copy therefore we will + * use the existing logic for that. + */ + return LOCAL_COPY_DISABLED; } - - if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) + else if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) { /* * For various reasons, including the transaction visibility @@ -2116,12 +2132,35 @@ ShouldExecuteCopyLocally(bool isIntermediateResult) * those placements. That'd help to benefit more from parallelism. */ - return true; + return LOCAL_COPY_REQUIRED; + } + else if (IsMultiStatementTransaction()) + { + return LOCAL_COPY_REQUIRED; } - /* if we connected to the localhost via a connection, we might not be able to see some previous changes that are done via the connection */ - return GetCurrentLocalExecutionStatus() != LOCAL_EXECUTION_DISABLED && - IsMultiStatementTransaction(); + return LOCAL_COPY_OPTIONAL; +} + + +/* + * ShardIntervalListHasLocalPlacements returns true if any of the input + * shard placement has a local placement; + */ +static bool +ShardIntervalListHasLocalPlacements(List *shardIntervalList) +{ + int32 localGroupId = GetLocalGroupId(); + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, shardIntervalList) + { + if (FindShardPlacementOnGroup(localGroupId, shardInterval->shardId) != NULL) + { + return true; + } + } + + return false; } @@ -2136,8 +2175,6 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest; - bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL; - copyDest->shouldUseLocalCopy = ShouldExecuteCopyLocally(isIntermediateResult); Oid tableId = copyDest->distributedRelationId; char *relationName = get_rel_name(tableId); @@ -2291,13 +2328,53 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML); /* - * For all the primary (e.g., writable) nodes, reserve a shared connection. - * We do this upfront because we cannot know which nodes are going to be - * accessed. Since the order of the reservation is important, we need to - * do it right here. For the details on why the order important, see - * the function. + * For all the primary (e.g., writable) remote nodes, reserve a shared + * connection. We do this upfront because we cannot know which nodes + * are going to be accessed. Since the order of the reservation is + * important, we need to do it right here. For the details on why the + * order important, see EnsureConnectionPossibilityForNodeList(). + * + * We don't need to care about local node because we either get a + * connection or use local connection, so it cannot be part of + * the starvation. As an edge case, if it cannot get a connection + * and cannot switch to local execution (e.g., disabled by user), + * COPY would fail hinting the user to change the relevant settiing. */ - EnsureConnectionPossibilityForPrimaryNodes(); + EnsureConnectionPossibilityForRemotePrimaryNodes(); + + bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL; + LocalCopyStatus localCopyStatus = + GetLocalCopyStatus(shardIntervalList, isIntermediateResult); + if (localCopyStatus == LOCAL_COPY_DISABLED) + { + copyDest->shouldUseLocalCopy = false; + } + else if (localCopyStatus == LOCAL_COPY_REQUIRED) + { + copyDest->shouldUseLocalCopy = true; + } + else if (localCopyStatus == LOCAL_COPY_OPTIONAL) + { + /* + * At this point, there is no requirements for doing the copy locally. + * However, if there are local placements, we can try to reserve + * a connection to local node. If we cannot reserve, we can still use + * local execution. + * + * NB: It is not advantageous to use remote execution just with a + * single remote connection. In other words, a single remote connection + * would not perform better than local execution. However, we prefer to + * do this because it is likely that the COPY would get more connections + * to parallelize the operation. In the future, we might relax this + * requirement and failover to local execution as on connection attempt + * failures as the executor does. + */ + if (ShardIntervalListHasLocalPlacements(shardIntervalList)) + { + bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode(); + copyDest->shouldUseLocalCopy = !reservedConnection; + } + } } @@ -3424,6 +3501,7 @@ InitializeCopyShardState(CopyShardState *shardState, continue; } + if (placement->groupId == GetLocalGroupId()) { /* @@ -3445,7 +3523,6 @@ InitializeCopyShardState(CopyShardState *shardState, continue; } - CopyConnectionState *connectionState = GetConnectionState(connectionStateHash, connection); diff --git a/src/backend/distributed/connection/locally_reserved_shared_connections.c b/src/backend/distributed/connection/locally_reserved_shared_connections.c index a4bd95f4c..19bc93ae6 100644 --- a/src/backend/distributed/connection/locally_reserved_shared_connections.c +++ b/src/backend/distributed/connection/locally_reserved_shared_connections.c @@ -89,12 +89,15 @@ typedef struct ReservedConnectionHashEntry static void StoreAllReservedConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor); -static ReservedConnectionHashEntry * AllocateOrGetReservedConectionEntry(char *hostName, - int nodePort, Oid - userId, Oid - databaseOid, - bool *found); +static ReservedConnectionHashEntry * AllocateOrGetReservedConnectionEntry(char *hostName, + int nodePort, + Oid + userId, Oid + databaseOid, + bool *found); static void EnsureConnectionPossibilityForNodeList(List *nodeList); +static bool EnsureConnectionPossibilityForNode(WorkerNode *workerNode, + bool waitForConnection); static uint32 LocalConnectionReserveHashHash(const void *key, Size keysize); static int LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize); @@ -294,11 +297,11 @@ MarkReservedConnectionUsed(const char *hostName, int nodePort, Oid userId, /* - * EnsureConnectionPossibilityForPrimaryNodes is a wrapper around + * EnsureConnectionPossibilityForRemotePrimaryNodes is a wrapper around * EnsureConnectionPossibilityForNodeList. */ void -EnsureConnectionPossibilityForPrimaryNodes(void) +EnsureConnectionPossibilityForRemotePrimaryNodes(void) { /* * By using NoLock there is a tiny risk of that we miss to reserve a @@ -306,17 +309,42 @@ EnsureConnectionPossibilityForPrimaryNodes(void) * seem to cause any problems as none of the placements that we are * going to access would be on the new node. */ - List *primaryNodeList = ActivePrimaryNodeList(NoLock); - + List *primaryNodeList = ActivePrimaryRemoteNodeList(NoLock); EnsureConnectionPossibilityForNodeList(primaryNodeList); } +/* + * TryConnectionPossibilityForLocalPrimaryNode returns true if the primary + * local node is in the metadata an we can reserve a connection for the node. + * If not, the function returns false. + */ +bool +TryConnectionPossibilityForLocalPrimaryNode(void) +{ + bool nodeIsInMetadata = false; + WorkerNode *localNode = + PrimaryNodeForGroup(GetLocalGroupId(), &nodeIsInMetadata); + + if (localNode == NULL) + { + /* + * If the local node is not a primary node, we should not try to + * reserve a connection as there cannot be any shards. + */ + return false; + } + + bool waitForConnection = false; + return EnsureConnectionPossibilityForNode(localNode, waitForConnection); +} + + /* * EnsureConnectionPossibilityForNodeList reserves a shared connection * counter per node in the nodeList unless: - * - Reservation is needed (see IsReservationPossible()) - * - there is at least one connection to the node so that we are guranteed + * - Reservation is possible/allowed (see IsReservationPossible()) + * - there is at least one connection to the node so that we are guaranteed * to get a connection * - An earlier call already reserved a connection (e.g., we allow only a * single reservation per backend) @@ -324,11 +352,6 @@ EnsureConnectionPossibilityForPrimaryNodes(void) static void EnsureConnectionPossibilityForNodeList(List *nodeList) { - if (!IsReservationPossible()) - { - return; - } - /* * We sort the workerList because adaptive connection management * (e.g., OPTIONAL_CONNECTION) requires any concurrent executions @@ -342,62 +365,114 @@ EnsureConnectionPossibilityForNodeList(List *nodeList) */ nodeList = SortList(nodeList, CompareWorkerNodes); + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, nodeList) + { + bool waitForConnection = true; + EnsureConnectionPossibilityForNode(workerNode, waitForConnection); + } +} + + +/* + * EnsureConnectionPossibilityForNode reserves a shared connection + * counter per node in the nodeList unless: + * - Reservation is possible/allowed (see IsReservationPossible()) + * - there is at least one connection to the node so that we are guranteed + * to get a connection + * - An earlier call already reserved a connection (e.g., we allow only a + * single reservation per backend) + * - waitForConnection is false. When this is false, the function still tries + * to ensure connection possibility. If it fails (e.g., we + * reached max_shared_pool_size), it doesn't wait to get the connection. Instead, + * return false. + */ +static bool +EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection) +{ + if (!IsReservationPossible()) + { + return false; + } + char *databaseName = get_database_name(MyDatabaseId); Oid userId = GetUserId(); char *userName = GetUserNameFromId(userId, false); - WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, nodeList) + if (ConnectionAvailableToNode(workerNode->workerName, workerNode->workerPort, + userName, databaseName) != NULL) { - if (ConnectionAvailableToNode(workerNode->workerName, workerNode->workerPort, - userName, databaseName) != NULL) - { - /* - * The same user has already an active connection for the node. It - * means that the execution can use the same connection, so reservation - * is not necessary. - */ - continue; - } - /* - * We are trying to be defensive here by ensuring that the required hash - * table entry can be allocated. The main goal is that we don't want to be - * in a situation where shared connection counter is incremented but not - * the local reserved counter due to out-of-memory. - * - * Note that shared connection stats operate on the shared memory, and we - * pre-allocate all the necessary memory. In other words, it would never - * throw out of memory error. + * The same user has already an active connection for the node. It + * means that the execution can use the same connection, so reservation + * is not necessary. */ - bool found = false; - ReservedConnectionHashEntry *hashEntry = - AllocateOrGetReservedConectionEntry(workerNode->workerName, - workerNode->workerPort, - userId, MyDatabaseId, &found); + return true; + } - if (found) - { - /* - * We have already reserved a connection for this user and database - * on the worker. We only allow a single reservation per - * transaction block. The reason is that the earlier command (either in - * a transaction block or a function call triggered by a single command) - * was able to reserve or establish a connection. That connection is - * guranteed to be avaliable for us. - */ - continue; - } + /* + * We are trying to be defensive here by ensuring that the required hash + * table entry can be allocated. The main goal is that we don't want to be + * in a situation where shared connection counter is incremented but not + * the local reserved counter due to out-of-memory. + * + * Note that shared connection stats operate on the shared memory, and we + * pre-allocate all the necessary memory. In other words, it would never + * throw out of memory error. + */ + bool found = false; + ReservedConnectionHashEntry *hashEntry = + AllocateOrGetReservedConnectionEntry(workerNode->workerName, + workerNode->workerPort, + userId, MyDatabaseId, &found); + if (found) + { + /* + * We have already reserved a connection for this user and database + * on the worker. We only allow a single reservation per + * transaction block. The reason is that the earlier command (either in + * a transaction block or a function call triggered by a single command) + * was able to reserve or establish a connection. That connection is + * guranteed to be available for us. + */ + return true; + } + + if (waitForConnection) + { /* * Increment the shared counter, we may need to wait if there are * no space left. */ WaitLoopForSharedConnection(workerNode->workerName, workerNode->workerPort); - - /* locally mark that we have one connection reserved */ - hashEntry->usedReservation = false; } + else + { + bool incremented = + TryToIncrementSharedConnectionCounter(workerNode->workerName, + workerNode->workerPort); + if (!incremented) + { + /* + * We could not reserve a connection. First, remove the entry from the + * hash. The reason is that we allow single reservation per transaction + * block and leaving the entry in the hash would be qualified as there is a + * reserved connection to the node. + */ + bool foundForRemove = false; + hash_search(SessionLocalReservedConnections, hashEntry, HASH_REMOVE, + &foundForRemove); + Assert(foundForRemove); + + return false; + } + } + + /* locally mark that we have one connection reserved */ + hashEntry->usedReservation = false; + + return true; } @@ -442,8 +517,8 @@ IsReservationPossible(void) * the entry. */ static ReservedConnectionHashEntry * -AllocateOrGetReservedConectionEntry(char *hostName, int nodePort, Oid userId, - Oid databaseOid, bool *found) +AllocateOrGetReservedConnectionEntry(char *hostName, int nodePort, Oid userId, + Oid databaseOid, bool *found) { ReservedConnectionHashKey key; diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 400fff505..c1f522ed1 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -105,6 +105,7 @@ static void SetUpDistributedTableDependencies(WorkerNode *workerNode); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); static void PropagateNodeWideObjects(WorkerNode *newWorkerNode); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); +static bool NodeIsLocal(WorkerNode *worker); static void SetLockTimeoutLocally(int32 lock_cooldown); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort); static bool UnsetMetadataSyncedForAll(void); @@ -695,6 +696,17 @@ GroupForNode(char *nodeName, int nodePort) } +/* + * NodeIsPrimaryAndLocal returns whether the argument represents the local + * primary node. + */ +bool +NodeIsPrimaryAndRemote(WorkerNode *worker) +{ + return NodeIsPrimary(worker) && !NodeIsLocal(worker); +} + + /* * NodeIsPrimary returns whether the argument represents a primary node. */ @@ -713,6 +725,16 @@ NodeIsPrimary(WorkerNode *worker) } +/* + * NodeIsLocal returns whether the argument represents the local node. + */ +static bool +NodeIsLocal(WorkerNode *worker) +{ + return worker->groupId == GetLocalGroupId(); +} + + /* * NodeIsSecondary returns whether the argument represents a secondary node. */ diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index 57b4871c8..7fbc53e32 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -392,6 +392,18 @@ ActivePrimaryNodeList(LOCKMODE lockMode) } +/* + * ActivePrimaryRemoteNodeList returns a list of all active primary nodes in + * workerNodeHash. + */ +List * +ActivePrimaryRemoteNodeList(LOCKMODE lockMode) +{ + EnsureModificationsCanRun(); + return FilterActiveNodeListFunc(lockMode, NodeIsPrimaryAndRemote); +} + + /* * NodeIsPrimaryWorker returns true if the node is a primary worker node. */ diff --git a/src/include/distributed/locally_reserved_shared_connections.h b/src/include/distributed/locally_reserved_shared_connections.h index a282beac0..adec8c9c4 100644 --- a/src/include/distributed/locally_reserved_shared_connections.h +++ b/src/include/distributed/locally_reserved_shared_connections.h @@ -20,7 +20,8 @@ extern bool CanUseReservedConnection(const char *hostName, int nodePort, extern void MarkReservedConnectionUsed(const char *hostName, int nodePort, Oid userId, Oid databaseOid); extern void DeallocateReservedConnections(void); -extern void EnsureConnectionPossibilityForPrimaryNodes(void); +extern void EnsureConnectionPossibilityForRemotePrimaryNodes(void); +extern bool TryConnectionPossibilityForLocalPrimaryNode(void); extern bool IsReservationPossible(void); #endif /* LOCALLY_RESERVED_SHARED_CONNECTIONS_H_ */ diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 047c01225..158c5a7ce 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -74,6 +74,7 @@ extern uint32 ActivePrimaryNonCoordinatorNodeCount(void); extern uint32 ActivePrimaryNodeCount(void); extern List * ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode); extern List * ActivePrimaryNodeList(LOCKMODE lockMode); +extern List * ActivePrimaryRemoteNodeList(LOCKMODE lockMode); extern bool CoordinatorAddedAsWorkerNode(void); extern List * ReferenceTablePlacementNodeList(LOCKMODE lockMode); extern WorkerNode * CoordinatorNodeIfAddedAsWorkerOrError(void); @@ -90,6 +91,7 @@ extern void EnsureCoordinator(void); extern void InsertCoordinatorIfClusterEmpty(void); extern uint32 GroupForNode(char *nodeName, int32 nodePort); extern WorkerNode * PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes); +extern bool NodeIsPrimaryAndRemote(WorkerNode *worker); extern bool NodeIsPrimary(WorkerNode *worker); extern bool NodeIsSecondary(WorkerNode *worker); extern bool NodeIsReadable(WorkerNode *worker); diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 16f3c37fd..bdaafa689 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -1591,6 +1591,16 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT in 7 (1 row) +-- copy can use local execution even if there is no connection available +COPY another_schema_table(a) FROM PROGRAM 'seq 32'; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY another_schema_table, line 1: "1" +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY another_schema_table, line 2: "2" +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY another_schema_table, line 3: "3" +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY another_schema_table, line 6: "6" -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false; @@ -1613,6 +1623,11 @@ HINT: Enable local execution via SET citus.enable_local_execution TO true; INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7); ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled. HINT: Enable local execution via SET citus.enable_local_execution TO true; +-- copy fails if local execution is disabled and there is no connection slot +COPY another_schema_table(a) FROM PROGRAM 'seq 32'; +ERROR: could not find an available connection +HINT: Set citus.max_shared_pool_size TO -1 to let COPY command finish +CONTEXT: COPY another_schema_table, line 1: "1" -- set the values to originals back ALTER SYSTEM RESET citus.max_cached_conns_per_worker; ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; diff --git a/src/test/regress/input/multi_mx_copy_data.source b/src/test/regress/input/multi_mx_copy_data.source index f65bc42af..22ce69e6c 100644 --- a/src/test/regress/input/multi_mx_copy_data.source +++ b/src/test/regress/input/multi_mx_copy_data.source @@ -22,6 +22,25 @@ SET search_path TO public; \COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' \COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' +-- get ready for the next test +TRUNCATE orders_mx; + +\c - - - :worker_2_port +SET citus.log_local_commands TO ON; +-- simulate the case where there is no connection slots available +ALTER SYSTEM SET citus.local_shared_pool_size TO -1; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); +show citus.local_shared_pool_size; +\COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' +\COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' + +-- set it back +ALTER SYSTEM RESET citus.local_shared_pool_size; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); +show citus.local_shared_pool_size; + -- These copies were intended to test copying data to single sharded table from -- worker nodes, yet in order to remove broadcast logic related codes we change -- the table to reference table and copy data from master. Should be updated diff --git a/src/test/regress/output/multi_mx_copy_data.source b/src/test/regress/output/multi_mx_copy_data.source index 521ebca99..53a36f7dc 100644 --- a/src/test/regress/output/multi_mx_copy_data.source +++ b/src/test/regress/output/multi_mx_copy_data.source @@ -16,6 +16,84 @@ SET search_path TO public; -- and use second worker as well \COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' \COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' +-- get ready for the next test +TRUNCATE orders_mx; +\c - - - :worker_2_port +SET citus.log_local_commands TO ON; +-- simulate the case where there is no connection slots available +ALTER SYSTEM SET citus.local_shared_pool_size TO -1; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +---------- + +(1 row) + +show citus.local_shared_pool_size; + citus.local_shared_pool_size +------------------------------ + -1 +(1 row) + +\COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' +NOTICE: executing the copy locally for shard 1220075 +CONTEXT: COPY orders_mx, line 3: "3|1234|F|205654.30|1993-10-14|5-LOW|Clerk#000000955|0|sly final accounts boost. carefully regular id..." +NOTICE: executing the copy locally for shard 1220071 +CONTEXT: COPY orders_mx, line 5: "5|445|F|105367.67|1994-07-30|5-LOW|Clerk#000000925|0|quickly. bold deposits sleep slyly. packages us..." +NOTICE: executing the copy locally for shard 1220069 +CONTEXT: COPY orders_mx, line 9: "33|670|F|146567.24|1993-10-27|3-MEDIUM|Clerk#000000409|0|uriously. furiously final request" +NOTICE: executing the copy locally for shard 1220079 +CONTEXT: COPY orders_mx, line 15: "39|818|O|326565.37|1996-09-20|3-MEDIUM|Clerk#000000659|0|ole express, ironic requests: ir" +NOTICE: executing the copy locally for shard 1220083 +CONTEXT: COPY orders_mx, line 19: "67|568|O|182481.16|1996-12-19|4-NOT SPECIFIED|Clerk#000000547|0|symptotes haggle slyly around the fu..." +NOTICE: executing the copy locally for shard 1220073 +CONTEXT: COPY orders_mx, line 24: "96|1078|F|64364.30|1994-04-17|2-HIGH|Clerk#000000395|0|oost furiously. pinto" +NOTICE: executing the copy locally for shard 1220077 +CONTEXT: COPY orders_mx, line 25: "97|211|F|100572.55|1993-01-29|3-MEDIUM|Clerk#000000547|0|hang blithely along the regular accounts. f..." +NOTICE: executing the copy locally for shard 1220081 +CONTEXT: COPY orders_mx, line 38: "134|62|F|208201.46|1992-05-01|4-NOT SPECIFIED|Clerk#000000711|0|lar theodolites boos" +\COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' +NOTICE: executing the copy locally for shard 1220079 +CONTEXT: COPY orders_mx, line 2: "8998|80|F|147264.16|1993-01-04|5-LOW|Clerk#000000733|0| fluffily pending sauternes cajo" +NOTICE: executing the copy locally for shard 1220077 +CONTEXT: COPY orders_mx, line 4: "9024|1469|F|298241.36|1992-06-03|3-MEDIUM|Clerk#000000901|0|ar the theodolites. fluffily stealthy re..." +NOTICE: executing the copy locally for shard 1220073 +CONTEXT: COPY orders_mx, line 6: "9026|677|O|63256.87|1996-07-24|5-LOW|Clerk#000000320|0|ironic escapades would wake carefully " +NOTICE: executing the copy locally for shard 1220071 +CONTEXT: COPY orders_mx, line 9: "9029|1213|F|78703.86|1992-11-20|3-MEDIUM|Clerk#000000965|0| excuses nag quickly carefully unusual ex..." +NOTICE: executing the copy locally for shard 1220083 +CONTEXT: COPY orders_mx, line 14: "9058|403|F|63464.13|1993-06-29|2-HIGH|Clerk#000000376|0|ealthily special deposits. quickly regular r..." +NOTICE: executing the copy locally for shard 1220081 +CONTEXT: COPY orders_mx, line 16: "9060|463|O|45295.71|1996-06-09|1-URGENT|Clerk#000000438|0|iously. slyly regular dol" +NOTICE: executing the copy locally for shard 1220075 +CONTEXT: COPY orders_mx, line 43: "9159|1135|O|99594.61|1995-07-26|1-URGENT|Clerk#000000892|0|xcuses. quickly ironic deposits wake alon..." +NOTICE: executing the copy locally for shard 1220069 +CONTEXT: COPY orders_mx, line 69: "9281|904|F|173278.28|1992-02-24|1-URGENT|Clerk#000000530|0|eep furiously according to the requests; ..." +-- set it back +ALTER SYSTEM RESET citus.local_shared_pool_size; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +---------- + +(1 row) + +show citus.local_shared_pool_size; + citus.local_shared_pool_size +------------------------------ + 50 +(1 row) + -- These copies were intended to test copying data to single sharded table from -- worker nodes, yet in order to remove broadcast logic related codes we change -- the table to reference table and copy data from master. Should be updated diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index a837258fe..4d7343668 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -817,6 +817,9 @@ ROLLBACK; WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000) SELECT count(*) FROM cte_1; +-- copy can use local execution even if there is no connection available +COPY another_schema_table(a) FROM PROGRAM 'seq 32'; + -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false; @@ -829,6 +832,9 @@ WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000) INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7); +-- copy fails if local execution is disabled and there is no connection slot +COPY another_schema_table(a) FROM PROGRAM 'seq 32'; + -- set the values to originals back ALTER SYSTEM RESET citus.max_cached_conns_per_worker; ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;