diff --git a/src/backend/columnar/cstore_customscan.c b/src/backend/columnar/cstore_customscan.c index 445ea68f2..7e913098c 100644 --- a/src/backend/columnar/cstore_customscan.c +++ b/src/backend/columnar/cstore_customscan.c @@ -160,7 +160,7 @@ ColumnarSetRelPathlistHook(PlannerInfo *root, RelOptInfo *rel, Index rti, PreviousSetRelPathlistHook(root, rel, rti, rte); } - if (!OidIsValid(rte->relid) || rte->rtekind != RTE_RELATION) + if (!OidIsValid(rte->relid) || rte->rtekind != RTE_RELATION || rte->inh) { /* some calls to the pathlist hook don't have a valid relation set. Do nothing */ return; diff --git a/src/backend/columnar/cstore_tableam.c b/src/backend/columnar/cstore_tableam.c index 1178b94b4..c845f519f 100644 --- a/src/backend/columnar/cstore_tableam.c +++ b/src/backend/columnar/cstore_tableam.c @@ -93,6 +93,7 @@ typedef struct ColumnarScanDescData typedef struct ColumnarScanDescData *ColumnarScanDesc; static object_access_hook_type PrevObjectAccessHook = NULL; +static ProcessUtility_hook_type PrevProcessUtilityHook = NULL; /* forward declaration for static functions */ static void ColumnarTableDropHook(Oid tgid); @@ -100,6 +101,13 @@ static void ColumnarTriggerCreateHook(Oid tgid); static void ColumnarTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid objectId, int subId, void *arg); +static void ColumnarProcessUtility(PlannedStmt *pstmt, + const char *queryString, + ProcessUtilityContext context, + ParamListInfo params, + struct QueryEnvironment *queryEnv, + DestReceiver *dest, + QueryCompletionCompat *completionTag); static bool ConditionalLockRelationWithTimeout(Relation rel, LOCKMODE lockMode, int timeout, int retryInterval); static void LogRelationStats(Relation rel, int elevel); @@ -1127,6 +1135,11 @@ columnar_tableam_init() PrevObjectAccessHook = object_access_hook; object_access_hook = ColumnarTableAMObjectAccessHook; + PrevProcessUtilityHook = ProcessUtility_hook ? + ProcessUtility_hook : + standard_ProcessUtility; + ProcessUtility_hook = ColumnarProcessUtility; + columnar_customscan_init(); TTSOpsColumnar = TTSOpsVirtual; @@ -1292,6 +1305,49 @@ ColumnarTableAMObjectAccessHook(ObjectAccessType access, Oid classId, Oid object } +/* + * Utility hook for columnar tables. + */ +static void +ColumnarProcessUtility(PlannedStmt *pstmt, + const char *queryString, + ProcessUtilityContext context, + ParamListInfo params, + struct QueryEnvironment *queryEnv, + DestReceiver *dest, + QueryCompletionCompat *completionTag) +{ + Node *parsetree = pstmt->utilityStmt; + + if (IsA(parsetree, IndexStmt)) + { + IndexStmt *indexStmt = (IndexStmt *) parsetree; + + /* + * We should reject CREATE INDEX CONCURRENTLY before DefineIndex() is + * called. Erroring in callbacks called from DefineIndex() will create + * the index and mark it as INVALID, which will cause segfault during + * inserts. + */ + if (indexStmt->concurrent) + { + Relation rel = relation_openrv(indexStmt->relation, + ShareUpdateExclusiveLock); + if (rel->rd_tableam == GetColumnarTableAmRoutine()) + { + ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("indexes not supported for columnar tables"))); + } + + RelationClose(rel); + } + } + + PrevProcessUtilityHook(pstmt, queryString, context, + params, queryEnv, dest, completionTag); +} + + /* * IsColumnarTableAmTable returns true if relation has columnar_tableam * access method. This can be called before extension creation. 
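Note on the hook installation above: Postgres invokes ProcessUtility_hook for every utility statement, which lets the columnar access method intercept CREATE INDEX CONCURRENTLY before DefineIndex() builds (and then invalidates) the index. The following is a minimal, stand-alone sketch of that chaining pattern, not part of the patch: it assumes a PostgreSQL 13 extension build (on PG 12 the last parameter is a plain "char *completionTag", which the patch hides behind QueryCompletionCompat), uses illustrative names (SketchProcessUtility, _PG_init body), and rejects every concurrent index build, whereas the real hook additionally opens the target relation and only errors out when rel->rd_tableam matches GetColumnarTableAmRoutine().

/* illustrative sketch only -- see caveats above */
#include "postgres.h"

#include "fmgr.h"
#include "nodes/parsenodes.h"
#include "nodes/plannodes.h"
#include "tcop/dest.h"
#include "tcop/utility.h"

PG_MODULE_MAGIC;

void _PG_init(void);

static ProcessUtility_hook_type PrevProcessUtility = NULL;

static void
SketchProcessUtility(PlannedStmt *pstmt, const char *queryString,
					 ProcessUtilityContext context, ParamListInfo params,
					 struct QueryEnvironment *queryEnv, DestReceiver *dest,
					 QueryCompletion *qc)
{
	Node *parsetree = pstmt->utilityStmt;

	if (IsA(parsetree, IndexStmt) && ((IndexStmt *) parsetree)->concurrent)
	{
		/*
		 * Erroring out later, from a callback inside DefineIndex(), would
		 * leave behind an INVALID index; reject the statement up front.
		 * (The patch only does this for columnar tables.)
		 */
		ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
						errmsg("CREATE INDEX CONCURRENTLY is not supported here")));
	}

	/* always delegate, so core and other extensions keep working */
	PrevProcessUtility(pstmt, queryString, context, params, queryEnv, dest, qc);
}

void
_PG_init(void)
{
	/* fall back to standard_ProcessUtility so the chain is always callable */
	PrevProcessUtility = ProcessUtility_hook ? ProcessUtility_hook :
						 standard_ProcessUtility;
	ProcessUtility_hook = SketchProcessUtility;
}

Storing standard_ProcessUtility as the "previous" hook when none is installed is the same trick the patch uses: the call site can then invoke PrevProcessUtility unconditionally.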
diff --git a/src/backend/distributed/commands/alter_table.c b/src/backend/distributed/commands/alter_table.c index e9739cae9..1849fb5cd 100644 --- a/src/backend/distributed/commands/alter_table.c +++ b/src/backend/distributed/commands/alter_table.c @@ -185,6 +185,8 @@ static void CreateCitusTableLike(TableConversionState *con); static List * GetViewCreationCommandsOfTable(Oid relationId); static void ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, bool suppressNoticeMessages); +static bool HasAnyGeneratedStoredColumns(Oid relationId); +static List * GetNonGeneratedStoredColumnNameList(Oid relationId); static void CheckAlterDistributedTableConversionParameters(TableConversionState *con); static char * CreateWorkerChangeSequenceDependencyCommand(char *sequenceSchemaName, char *sequenceName, @@ -907,6 +909,20 @@ CreateTableConversion(TableConversionParameters *params) ereport(ERROR, (errmsg("cannot complete operation " "because no such table exists"))); } + + TupleDesc relationDesc = RelationGetDescr(relation); + if (RelationUsesIdentityColumns(relationDesc)) + { + /* + * pg_get_tableschemadef_string doesn't know how to deparse identity + * columns so we cannot reflect those columns when creating table + * from scratch. For this reason, error out here. + */ + ereport(ERROR, (errmsg("cannot complete command because relation " + "%s has identity column", + generate_qualified_relation_name(con->relationId)), + errhint("Drop the identity columns and re-try the command"))); + } relation_close(relation, NoLock); con->distributionKey = BuildDistributionKeyFromColumnName(relation, con->distributionColumn); @@ -1122,9 +1138,33 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, quote_qualified_identifier(schemaName, sourceName)))); } - appendStringInfo(query, "INSERT INTO %s SELECT * FROM %s", - quote_qualified_identifier(schemaName, targetName), - quote_qualified_identifier(schemaName, sourceName)); + if (!HasAnyGeneratedStoredColumns(sourceId)) + { + /* + * Relation has no GENERATED STORED columns, copy the table via plain + * "INSERT INTO .. SELECT *"". + */ + appendStringInfo(query, "INSERT INTO %s SELECT * FROM %s", + quote_qualified_identifier(schemaName, targetName), + quote_qualified_identifier(schemaName, sourceName)); + } + else + { + /* + * Skip columns having GENERATED ALWAYS AS (...) STORED expressions + * since Postgres doesn't allow inserting into such columns. + * This is not bad since Postgres would already generate such columns. + * Note that here we intentionally don't skip columns having DEFAULT + * expressions since user might have inserted non-default values. + */ + List *nonStoredColumnNameList = GetNonGeneratedStoredColumnNameList(sourceId); + char *insertColumnString = StringJoin(nonStoredColumnNameList, ','); + appendStringInfo(query, "INSERT INTO %s (%s) SELECT %s FROM %s", + quote_qualified_identifier(schemaName, targetName), + insertColumnString, insertColumnString, + quote_qualified_identifier(schemaName, sourceName)); + } + ExecuteQueryViaSPI(query->data, SPI_OK_INSERT); } @@ -1183,6 +1223,55 @@ ReplaceTable(Oid sourceId, Oid targetId, List *justBeforeDropCommands, } +/* + * HasAnyGeneratedStoredColumns decides if relation has any columns that we + * might need to copy the data of when replacing table. 
+ */ +static bool +HasAnyGeneratedStoredColumns(Oid relationId) +{ + return list_length(GetNonGeneratedStoredColumnNameList(relationId)) > 0; +} + + +/* + * GetNonGeneratedStoredColumnNameList returns a list of column names for + * columns not having GENERATED ALWAYS AS (...) STORED expressions. + */ +static List * +GetNonGeneratedStoredColumnNameList(Oid relationId) +{ + List *nonStoredColumnNameList = NIL; + + Relation relation = relation_open(relationId, AccessShareLock); + TupleDesc tupleDescriptor = RelationGetDescr(relation); + for (int columnIndex = 0; columnIndex < tupleDescriptor->natts; columnIndex++) + { + Form_pg_attribute currentColumn = TupleDescAttr(tupleDescriptor, columnIndex); + if (currentColumn->attisdropped) + { + /* skip dropped columns */ + continue; + } + +#if PG_VERSION_NUM >= 120000 + if (currentColumn->attgenerated == ATTRIBUTE_GENERATED_STORED) + { + continue; + } +#endif + + const char *quotedColumnName = quote_identifier(NameStr(currentColumn->attname)); + nonStoredColumnNameList = lappend(nonStoredColumnNameList, + pstrdup(quotedColumnName)); + } + + relation_close(relation, NoLock); + + return nonStoredColumnNameList; +} + + /* * CheckAlterDistributedTableConversionParameters errors for the cases where * alter_distributed_table UDF wouldn't work. diff --git a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c index 39b7d483f..d89c44630 100644 --- a/src/backend/distributed/commands/citus_add_local_table_to_metadata.c +++ b/src/backend/distributed/commands/citus_add_local_table_to_metadata.c @@ -49,6 +49,7 @@ static void citus_add_local_table_to_metadata_internal(Oid relationId, bool cascadeViaForeignKeys); static void ErrorIfUnsupportedCreateCitusLocalTable(Relation relation); static void ErrorIfUnsupportedCitusLocalTableKind(Oid relationId); +static void ErrorIfUnsupportedCitusLocalColumnDefinition(Relation relation); static List * GetShellTableDDLEventsForCitusLocalTable(Oid relationId); static uint64 ConvertLocalTableToShard(Oid relationId); static void RenameRelationToShardRelation(Oid shellRelationId, uint64 shardId); @@ -338,6 +339,7 @@ ErrorIfUnsupportedCreateCitusLocalTable(Relation relation) ErrorIfCoordinatorNotAddedAsWorkerNode(); ErrorIfUnsupportedCitusLocalTableKind(relationId); EnsureTableNotDistributed(relationId); + ErrorIfUnsupportedCitusLocalColumnDefinition(relation); /* * When creating other citus table types, we don't need to check that case as @@ -405,6 +407,30 @@ ErrorIfUnsupportedCitusLocalTableKind(Oid relationId) } +/* + * ErrorIfUnsupportedCitusLocalColumnDefinition errors out if given relation + * has unsupported column definition for citus local table creation. + */ +static void +ErrorIfUnsupportedCitusLocalColumnDefinition(Relation relation) +{ + TupleDesc relationDesc = RelationGetDescr(relation); + if (RelationUsesIdentityColumns(relationDesc)) + { + /* + * pg_get_tableschemadef_string doesn't know how to deparse identity + * columns so we cannot reflect those columns when creating shell + * relation. For this reason, error out here. + */ + Oid relationId = relation->rd_id; + ereport(ERROR, (errmsg("cannot add %s to citus metadata since table " + "has identity column", + generate_qualified_relation_name(relationId)), + errhint("Drop the identity columns and re-try the command"))); + } +} + + /* * GetShellTableDDLEventsForCitusLocalTable returns a list of DDL commands * to create the shell table from scratch. 
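For reference, both ErrorIfUnsupportedCitusLocalColumnDefinition above and the RelationUsesIdentityColumns helper it relies on (as well as GetNonGeneratedStoredColumnNameList in alter_table.c) inspect pg_attribute metadata through the relation's tuple descriptor. The sketch below is illustrative only and not part of the patch; it assumes a PostgreSQL 12+ build (attgenerated exists only from PG 12 on) and uses a hypothetical helper name. pg_attribute.attidentity is non-zero for GENERATED ... AS IDENTITY columns, and pg_attribute.attgenerated is ATTRIBUTE_GENERATED_STORED for GENERATED ALWAYS AS (...) STORED columns.

/* illustrative sketch only -- hypothetical helper, PostgreSQL 12+ assumed */
#include "postgres.h"

#include "access/tupdesc.h"
#include "catalog/pg_attribute.h"

static bool
TupleDescHasIdentityOrStoredColumn(TupleDesc tupleDescriptor)
{
	for (int columnIndex = 0; columnIndex < tupleDescriptor->natts; columnIndex++)
	{
		Form_pg_attribute column = TupleDescAttr(tupleDescriptor, columnIndex);

		if (column->attisdropped)
		{
			/* dropped columns keep a pg_attribute entry; skip them */
			continue;
		}

		if (column->attidentity != '\0' ||
			column->attgenerated == ATTRIBUTE_GENERATED_STORED)
		{
			return true;
		}
	}

	return false;
}

A caller would typically pass RelationGetDescr(relation) for an already-opened relation, which is how the patch's checks obtain the tuple descriptor before erroring out.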
diff --git a/src/backend/distributed/commands/create_distributed_table.c b/src/backend/distributed/commands/create_distributed_table.c index e852aa8e3..744f08196 100644 --- a/src/backend/distributed/commands/create_distributed_table.c +++ b/src/backend/distributed/commands/create_distributed_table.c @@ -122,7 +122,6 @@ static void DropFKeysRelationInvolvedWithTableType(Oid relationId, int tableType static bool LocalTableEmpty(Oid tableId); static void CopyLocalDataIntoShards(Oid relationId); static List * TupleDescColumnNameList(TupleDesc tupleDescriptor); -static bool RelationUsesIdentityColumns(TupleDesc relationDesc); static bool DistributionColumnUsesGeneratedStoredColumn(TupleDesc relationDesc, Var *distributionColumn); static bool CanUseExclusiveConnections(Oid relationId, bool localTableEmpty); @@ -1636,7 +1635,7 @@ TupleDescColumnNameList(TupleDesc tupleDescriptor) * RelationUsesIdentityColumns returns whether a given relation uses * GENERATED ... AS IDENTITY */ -static bool +bool RelationUsesIdentityColumns(TupleDesc relationDesc) { for (int attributeIndex = 0; attributeIndex < relationDesc->natts; attributeIndex++) diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index 50ca597e3..4ee5ce7e8 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -79,6 +79,7 @@ #include "distributed/multi_physical_planner.h" #include "distributed/multi_router_planner.h" #include "distributed/multi_executor.h" +#include "distributed/listutils.h" #include "distributed/locally_reserved_shared_connections.h" #include "distributed/placement_connection.h" #include "distributed/relation_access_tracking.h" @@ -214,6 +215,18 @@ typedef struct ShardConnections } ShardConnections; +/* + * Represents the state for allowing copy via local + * execution. + */ +typedef enum LocalCopyStatus +{ + LOCAL_COPY_REQUIRED, + LOCAL_COPY_OPTIONAL, + LOCAL_COPY_DISABLED +} LocalCopyStatus; + + /* Local functions forward declarations */ static void CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionTag); @@ -323,7 +336,9 @@ static void CompleteCopyQueryTagCompat(QueryCompletionCompat *completionTag, uin processedRowCount); static void FinishLocalCopy(CitusCopyDestReceiver *copyDest); static void CloneCopyOutStateForLocalCopy(CopyOutState from, CopyOutState to); -static bool ShouldExecuteCopyLocally(bool isIntermediateResult); +static LocalCopyStatus GetLocalCopyStatus(List *shardIntervalList, bool + isIntermediateResult); +static bool ShardIntervalListHasLocalPlacements(List *shardIntervalList); static void LogLocalCopyExecution(uint64 shardId); @@ -2076,28 +2091,29 @@ CreateCitusCopyDestReceiver(Oid tableId, List *columnNameList, int partitionColu /* - * ShouldExecuteCopyLocally returns true if the current copy - * operation should be done locally for local placements. + * GetLocalCopyStatus returns the status for executing copy locally. + * If LOCAL_COPY_DISABLED or LOCAL_COPY_REQUIRED, the caller has to + * follow that. Else, the caller may decide to use local or remote + * execution depending on other information. 
*/ -static bool -ShouldExecuteCopyLocally(bool isIntermediateResult) +static LocalCopyStatus +GetLocalCopyStatus(List *shardIntervalList, bool isIntermediateResult) { - if (!EnableLocalExecution) + if (!EnableLocalExecution || + GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_DISABLED) { - return false; + return LOCAL_COPY_DISABLED; } - - /* - * Intermediate files are written to a file, and files are visible to all - * transactions, and we use a custom copy format for copy therefore we will - * use the existing logic for that. - */ - if (isIntermediateResult) + else if (isIntermediateResult) { - return false; + /* + * Intermediate files are written to a file, and files are visible to all + * transactions, and we use a custom copy format for copy therefore we will + * use the existing logic for that. + */ + return LOCAL_COPY_DISABLED; } - - if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) + else if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED) { /* * For various reasons, including the transaction visibility @@ -2116,12 +2132,35 @@ ShouldExecuteCopyLocally(bool isIntermediateResult) * those placements. That'd help to benefit more from parallelism. */ - return true; + return LOCAL_COPY_REQUIRED; + } + else if (IsMultiStatementTransaction()) + { + return LOCAL_COPY_REQUIRED; } - /* if we connected to the localhost via a connection, we might not be able to see some previous changes that are done via the connection */ - return GetCurrentLocalExecutionStatus() != LOCAL_EXECUTION_DISABLED && - IsMultiStatementTransaction(); + return LOCAL_COPY_OPTIONAL; +} + + +/* + * ShardIntervalListHasLocalPlacements returns true if any of the input + * shard placement has a local placement; + */ +static bool +ShardIntervalListHasLocalPlacements(List *shardIntervalList) +{ + int32 localGroupId = GetLocalGroupId(); + ShardInterval *shardInterval = NULL; + foreach_ptr(shardInterval, shardIntervalList) + { + if (FindShardPlacementOnGroup(localGroupId, shardInterval->shardId) != NULL) + { + return true; + } + } + + return false; } @@ -2136,8 +2175,6 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, { CitusCopyDestReceiver *copyDest = (CitusCopyDestReceiver *) dest; - bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL; - copyDest->shouldUseLocalCopy = ShouldExecuteCopyLocally(isIntermediateResult); Oid tableId = copyDest->distributedRelationId; char *relationName = get_rel_name(tableId); @@ -2291,13 +2328,53 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, RecordRelationAccessIfNonDistTable(tableId, PLACEMENT_ACCESS_DML); /* - * For all the primary (e.g., writable) nodes, reserve a shared connection. - * We do this upfront because we cannot know which nodes are going to be - * accessed. Since the order of the reservation is important, we need to - * do it right here. For the details on why the order important, see - * the function. + * For all the primary (e.g., writable) remote nodes, reserve a shared + * connection. We do this upfront because we cannot know which nodes + * are going to be accessed. Since the order of the reservation is + * important, we need to do it right here. For the details on why the + * order important, see EnsureConnectionPossibilityForNodeList(). + * + * We don't need to care about local node because we either get a + * connection or use local connection, so it cannot be part of + * the starvation. 
As an edge case, if it cannot get a connection + * and cannot switch to local execution (e.g., disabled by user), + * COPY would fail hinting the user to change the relevant settiing. */ - EnsureConnectionPossibilityForPrimaryNodes(); + EnsureConnectionPossibilityForRemotePrimaryNodes(); + + bool isIntermediateResult = copyDest->intermediateResultIdPrefix != NULL; + LocalCopyStatus localCopyStatus = + GetLocalCopyStatus(shardIntervalList, isIntermediateResult); + if (localCopyStatus == LOCAL_COPY_DISABLED) + { + copyDest->shouldUseLocalCopy = false; + } + else if (localCopyStatus == LOCAL_COPY_REQUIRED) + { + copyDest->shouldUseLocalCopy = true; + } + else if (localCopyStatus == LOCAL_COPY_OPTIONAL) + { + /* + * At this point, there is no requirements for doing the copy locally. + * However, if there are local placements, we can try to reserve + * a connection to local node. If we cannot reserve, we can still use + * local execution. + * + * NB: It is not advantageous to use remote execution just with a + * single remote connection. In other words, a single remote connection + * would not perform better than local execution. However, we prefer to + * do this because it is likely that the COPY would get more connections + * to parallelize the operation. In the future, we might relax this + * requirement and failover to local execution as on connection attempt + * failures as the executor does. + */ + if (ShardIntervalListHasLocalPlacements(shardIntervalList)) + { + bool reservedConnection = TryConnectionPossibilityForLocalPrimaryNode(); + copyDest->shouldUseLocalCopy = !reservedConnection; + } + } } @@ -3424,6 +3501,7 @@ InitializeCopyShardState(CopyShardState *shardState, continue; } + if (placement->groupId == GetLocalGroupId()) { /* @@ -3445,7 +3523,6 @@ InitializeCopyShardState(CopyShardState *shardState, continue; } - CopyConnectionState *connectionState = GetConnectionState(connectionStateHash, connection); diff --git a/src/backend/distributed/connection/locally_reserved_shared_connections.c b/src/backend/distributed/connection/locally_reserved_shared_connections.c index a4bd95f4c..19bc93ae6 100644 --- a/src/backend/distributed/connection/locally_reserved_shared_connections.c +++ b/src/backend/distributed/connection/locally_reserved_shared_connections.c @@ -89,12 +89,15 @@ typedef struct ReservedConnectionHashEntry static void StoreAllReservedConnections(Tuplestorestate *tupleStore, TupleDesc tupleDescriptor); -static ReservedConnectionHashEntry * AllocateOrGetReservedConectionEntry(char *hostName, - int nodePort, Oid - userId, Oid - databaseOid, - bool *found); +static ReservedConnectionHashEntry * AllocateOrGetReservedConnectionEntry(char *hostName, + int nodePort, + Oid + userId, Oid + databaseOid, + bool *found); static void EnsureConnectionPossibilityForNodeList(List *nodeList); +static bool EnsureConnectionPossibilityForNode(WorkerNode *workerNode, + bool waitForConnection); static uint32 LocalConnectionReserveHashHash(const void *key, Size keysize); static int LocalConnectionReserveHashCompare(const void *a, const void *b, Size keysize); @@ -294,11 +297,11 @@ MarkReservedConnectionUsed(const char *hostName, int nodePort, Oid userId, /* - * EnsureConnectionPossibilityForPrimaryNodes is a wrapper around + * EnsureConnectionPossibilityForRemotePrimaryNodes is a wrapper around * EnsureConnectionPossibilityForNodeList. 
*/ void -EnsureConnectionPossibilityForPrimaryNodes(void) +EnsureConnectionPossibilityForRemotePrimaryNodes(void) { /* * By using NoLock there is a tiny risk of that we miss to reserve a @@ -306,17 +309,42 @@ EnsureConnectionPossibilityForPrimaryNodes(void) * seem to cause any problems as none of the placements that we are * going to access would be on the new node. */ - List *primaryNodeList = ActivePrimaryNodeList(NoLock); - + List *primaryNodeList = ActivePrimaryRemoteNodeList(NoLock); EnsureConnectionPossibilityForNodeList(primaryNodeList); } +/* + * TryConnectionPossibilityForLocalPrimaryNode returns true if the primary + * local node is in the metadata an we can reserve a connection for the node. + * If not, the function returns false. + */ +bool +TryConnectionPossibilityForLocalPrimaryNode(void) +{ + bool nodeIsInMetadata = false; + WorkerNode *localNode = + PrimaryNodeForGroup(GetLocalGroupId(), &nodeIsInMetadata); + + if (localNode == NULL) + { + /* + * If the local node is not a primary node, we should not try to + * reserve a connection as there cannot be any shards. + */ + return false; + } + + bool waitForConnection = false; + return EnsureConnectionPossibilityForNode(localNode, waitForConnection); +} + + /* * EnsureConnectionPossibilityForNodeList reserves a shared connection * counter per node in the nodeList unless: - * - Reservation is needed (see IsReservationPossible()) - * - there is at least one connection to the node so that we are guranteed + * - Reservation is possible/allowed (see IsReservationPossible()) + * - there is at least one connection to the node so that we are guaranteed * to get a connection * - An earlier call already reserved a connection (e.g., we allow only a * single reservation per backend) @@ -324,11 +352,6 @@ EnsureConnectionPossibilityForPrimaryNodes(void) static void EnsureConnectionPossibilityForNodeList(List *nodeList) { - if (!IsReservationPossible()) - { - return; - } - /* * We sort the workerList because adaptive connection management * (e.g., OPTIONAL_CONNECTION) requires any concurrent executions @@ -342,62 +365,114 @@ EnsureConnectionPossibilityForNodeList(List *nodeList) */ nodeList = SortList(nodeList, CompareWorkerNodes); + WorkerNode *workerNode = NULL; + foreach_ptr(workerNode, nodeList) + { + bool waitForConnection = true; + EnsureConnectionPossibilityForNode(workerNode, waitForConnection); + } +} + + +/* + * EnsureConnectionPossibilityForNode reserves a shared connection + * counter per node in the nodeList unless: + * - Reservation is possible/allowed (see IsReservationPossible()) + * - there is at least one connection to the node so that we are guranteed + * to get a connection + * - An earlier call already reserved a connection (e.g., we allow only a + * single reservation per backend) + * - waitForConnection is false. When this is false, the function still tries + * to ensure connection possibility. If it fails (e.g., we + * reached max_shared_pool_size), it doesn't wait to get the connection. Instead, + * return false. 
+ */ +static bool +EnsureConnectionPossibilityForNode(WorkerNode *workerNode, bool waitForConnection) +{ + if (!IsReservationPossible()) + { + return false; + } + char *databaseName = get_database_name(MyDatabaseId); Oid userId = GetUserId(); char *userName = GetUserNameFromId(userId, false); - WorkerNode *workerNode = NULL; - foreach_ptr(workerNode, nodeList) + if (ConnectionAvailableToNode(workerNode->workerName, workerNode->workerPort, + userName, databaseName) != NULL) { - if (ConnectionAvailableToNode(workerNode->workerName, workerNode->workerPort, - userName, databaseName) != NULL) - { - /* - * The same user has already an active connection for the node. It - * means that the execution can use the same connection, so reservation - * is not necessary. - */ - continue; - } - /* - * We are trying to be defensive here by ensuring that the required hash - * table entry can be allocated. The main goal is that we don't want to be - * in a situation where shared connection counter is incremented but not - * the local reserved counter due to out-of-memory. - * - * Note that shared connection stats operate on the shared memory, and we - * pre-allocate all the necessary memory. In other words, it would never - * throw out of memory error. + * The same user has already an active connection for the node. It + * means that the execution can use the same connection, so reservation + * is not necessary. */ - bool found = false; - ReservedConnectionHashEntry *hashEntry = - AllocateOrGetReservedConectionEntry(workerNode->workerName, - workerNode->workerPort, - userId, MyDatabaseId, &found); + return true; + } - if (found) - { - /* - * We have already reserved a connection for this user and database - * on the worker. We only allow a single reservation per - * transaction block. The reason is that the earlier command (either in - * a transaction block or a function call triggered by a single command) - * was able to reserve or establish a connection. That connection is - * guranteed to be avaliable for us. - */ - continue; - } + /* + * We are trying to be defensive here by ensuring that the required hash + * table entry can be allocated. The main goal is that we don't want to be + * in a situation where shared connection counter is incremented but not + * the local reserved counter due to out-of-memory. + * + * Note that shared connection stats operate on the shared memory, and we + * pre-allocate all the necessary memory. In other words, it would never + * throw out of memory error. + */ + bool found = false; + ReservedConnectionHashEntry *hashEntry = + AllocateOrGetReservedConnectionEntry(workerNode->workerName, + workerNode->workerPort, + userId, MyDatabaseId, &found); + if (found) + { + /* + * We have already reserved a connection for this user and database + * on the worker. We only allow a single reservation per + * transaction block. The reason is that the earlier command (either in + * a transaction block or a function call triggered by a single command) + * was able to reserve or establish a connection. That connection is + * guranteed to be available for us. + */ + return true; + } + + if (waitForConnection) + { /* * Increment the shared counter, we may need to wait if there are * no space left. 
*/ WaitLoopForSharedConnection(workerNode->workerName, workerNode->workerPort); - - /* locally mark that we have one connection reserved */ - hashEntry->usedReservation = false; } + else + { + bool incremented = + TryToIncrementSharedConnectionCounter(workerNode->workerName, + workerNode->workerPort); + if (!incremented) + { + /* + * We could not reserve a connection. First, remove the entry from the + * hash. The reason is that we allow single reservation per transaction + * block and leaving the entry in the hash would be qualified as there is a + * reserved connection to the node. + */ + bool foundForRemove = false; + hash_search(SessionLocalReservedConnections, hashEntry, HASH_REMOVE, + &foundForRemove); + Assert(foundForRemove); + + return false; + } + } + + /* locally mark that we have one connection reserved */ + hashEntry->usedReservation = false; + + return true; } @@ -442,8 +517,8 @@ IsReservationPossible(void) * the entry. */ static ReservedConnectionHashEntry * -AllocateOrGetReservedConectionEntry(char *hostName, int nodePort, Oid userId, - Oid databaseOid, bool *found) +AllocateOrGetReservedConnectionEntry(char *hostName, int nodePort, Oid userId, + Oid databaseOid, bool *found) { ReservedConnectionHashKey key; diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index 400fff505..c1f522ed1 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -105,6 +105,7 @@ static void SetUpDistributedTableDependencies(WorkerNode *workerNode); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); static void PropagateNodeWideObjects(WorkerNode *newWorkerNode); static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); +static bool NodeIsLocal(WorkerNode *worker); static void SetLockTimeoutLocally(int32 lock_cooldown); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort); static bool UnsetMetadataSyncedForAll(void); @@ -695,6 +696,17 @@ GroupForNode(char *nodeName, int nodePort) } +/* + * NodeIsPrimaryAndLocal returns whether the argument represents the local + * primary node. + */ +bool +NodeIsPrimaryAndRemote(WorkerNode *worker) +{ + return NodeIsPrimary(worker) && !NodeIsLocal(worker); +} + + /* * NodeIsPrimary returns whether the argument represents a primary node. */ @@ -713,6 +725,16 @@ NodeIsPrimary(WorkerNode *worker) } +/* + * NodeIsLocal returns whether the argument represents the local node. + */ +static bool +NodeIsLocal(WorkerNode *worker) +{ + return worker->groupId == GetLocalGroupId(); +} + + /* * NodeIsSecondary returns whether the argument represents a secondary node. */ diff --git a/src/backend/distributed/operations/worker_node_manager.c b/src/backend/distributed/operations/worker_node_manager.c index 12d3bdf06..7fbc53e32 100644 --- a/src/backend/distributed/operations/worker_node_manager.c +++ b/src/backend/distributed/operations/worker_node_manager.c @@ -318,20 +318,6 @@ ActivePrimaryNodeCount(void) } -/* - * ActiveReadableNonCoordinatorNodeCount returns the number of groups with a node we can read from. - * This method excludes coordinator even if it is added as a worker. 
- */ -uint32 -ActiveReadableNonCoordinatorNodeCount(void) -{ - List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); - uint32 liveWorkerCount = list_length(workerNodeList); - - return liveWorkerCount; -} - - /* * NodeIsCoordinator returns true if the given node represents the coordinator. */ @@ -406,6 +392,18 @@ ActivePrimaryNodeList(LOCKMODE lockMode) } +/* + * ActivePrimaryRemoteNodeList returns a list of all active primary nodes in + * workerNodeHash. + */ +List * +ActivePrimaryRemoteNodeList(LOCKMODE lockMode) +{ + EnsureModificationsCanRun(); + return FilterActiveNodeListFunc(lockMode, NodeIsPrimaryAndRemote); +} + + /* * NodeIsPrimaryWorker returns true if the node is a primary worker node. */ diff --git a/src/backend/distributed/planner/intermediate_result_pruning.c b/src/backend/distributed/planner/intermediate_result_pruning.c index 1aa7fe7fd..2a3b4e423 100644 --- a/src/backend/distributed/planner/intermediate_result_pruning.c +++ b/src/backend/distributed/planner/intermediate_result_pruning.c @@ -32,7 +32,7 @@ bool LogIntermediateResults = false; static List * FindSubPlansUsedInNode(Node *node, SubPlanAccessType accessType); static void AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry, DistributedPlan *distributedPlan, - int workerNodeCount); + int nodeCount); static void AppendAllWorkerNodes(IntermediateResultsHashEntry *entry); static List * FindAllRemoteWorkerNodesUsingSubplan(IntermediateResultsHashEntry *entry); static List * RemoveLocalNodeFromWorkerList(List *workerNodeList); @@ -154,7 +154,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, List *usedSubPlanNodeList = distributedPlan->usedSubPlanNodeList; List *subPlanList = distributedPlan->subPlanList; ListCell *subPlanCell = NULL; - int workerNodeCount = ActiveReadableNonCoordinatorNodeCount(); + int nodeCount = list_length(ActiveReadableNodeList()); foreach(subPlanCell, usedSubPlanNodeList) { @@ -170,7 +170,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, * will be written to a local file and sent to all nodes. Note that the * remaining subplans in the distributed plan should still be traversed. */ - if (list_length(entry->nodeIdList) == workerNodeCount && entry->writeLocalFile) + if (list_length(entry->nodeIdList) == nodeCount && entry->writeLocalFile) { elog(DEBUG4, "Subplan %s is used in all workers", resultId); continue; @@ -190,7 +190,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, * workers will be in the node list. We can improve intermediate result * pruning by deciding which reference table shard will be accessed earlier. 
*/ - AppendAllAccessedWorkerNodes(entry, distributedPlan, workerNodeCount); + AppendAllAccessedWorkerNodes(entry, distributedPlan, nodeCount); elog(DEBUG4, "Subplan %s is used in %lu", resultId, distributedPlan->planId); } @@ -231,7 +231,7 @@ RecordSubplanExecutionsOnNodes(HTAB *intermediateResultsHash, static void AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry, DistributedPlan *distributedPlan, - int workerNodeCount) + int nodeCount) { List *taskList = distributedPlan->workerJob->taskList; ListCell *taskCell = NULL; @@ -254,7 +254,7 @@ AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry, list_append_unique_int(entry->nodeIdList, placement->nodeId); /* early return if all the workers are accessed */ - if (list_length(entry->nodeIdList) == workerNodeCount && + if (list_length(entry->nodeIdList) == nodeCount && entry->writeLocalFile) { return; @@ -272,7 +272,7 @@ AppendAllAccessedWorkerNodes(IntermediateResultsHashEntry *entry, static void AppendAllWorkerNodes(IntermediateResultsHashEntry *entry) { - List *workerNodeList = ActiveReadableNonCoordinatorNodeList(); + List *workerNodeList = ActiveReadableNodeList(); WorkerNode *workerNode = NULL; foreach_ptr(workerNode, workerNodeList) @@ -383,10 +383,11 @@ RemoveLocalNodeFromWorkerList(List *workerNodeList) { return list_delete_cell_compat(workerNodeList, workerNodeCell, prev); } + #if PG_VERSION_NUM < PG_VERSION_13 + prev = workerNodeCell; + #endif } - #if PG_VERSION_NUM < PG_VERSION_13 - prev = workerNodeCell; - #endif + return workerNodeList; } diff --git a/src/include/distributed/locally_reserved_shared_connections.h b/src/include/distributed/locally_reserved_shared_connections.h index a282beac0..adec8c9c4 100644 --- a/src/include/distributed/locally_reserved_shared_connections.h +++ b/src/include/distributed/locally_reserved_shared_connections.h @@ -20,7 +20,8 @@ extern bool CanUseReservedConnection(const char *hostName, int nodePort, extern void MarkReservedConnectionUsed(const char *hostName, int nodePort, Oid userId, Oid databaseOid); extern void DeallocateReservedConnections(void); -extern void EnsureConnectionPossibilityForPrimaryNodes(void); +extern void EnsureConnectionPossibilityForRemotePrimaryNodes(void); +extern bool TryConnectionPossibilityForLocalPrimaryNode(void); extern bool IsReservationPossible(void); #endif /* LOCALLY_RESERVED_SHARED_CONNECTIONS_H_ */ diff --git a/src/include/distributed/metadata_utility.h b/src/include/distributed/metadata_utility.h index 96db653d0..dfb5aeb0f 100644 --- a/src/include/distributed/metadata_utility.h +++ b/src/include/distributed/metadata_utility.h @@ -249,6 +249,7 @@ extern void EnsureTableNotDistributed(Oid relationId); extern void EnsureReplicationSettings(Oid relationId, char replicationModel); extern void EnsureRelationExists(Oid relationId); extern bool RegularTable(Oid relationId); +extern bool RelationUsesIdentityColumns(TupleDesc relationDesc); extern char * ConstructQualifiedShardName(ShardInterval *shardInterval); extern uint64 GetFirstShardId(Oid relationId); extern Datum StringToDatum(char *inputString, Oid dataType); diff --git a/src/include/distributed/worker_manager.h b/src/include/distributed/worker_manager.h index 0fe229f61..158c5a7ce 100644 --- a/src/include/distributed/worker_manager.h +++ b/src/include/distributed/worker_manager.h @@ -74,13 +74,13 @@ extern uint32 ActivePrimaryNonCoordinatorNodeCount(void); extern uint32 ActivePrimaryNodeCount(void); extern List * ActivePrimaryNonCoordinatorNodeList(LOCKMODE lockMode); extern List * 
ActivePrimaryNodeList(LOCKMODE lockMode); +extern List * ActivePrimaryRemoteNodeList(LOCKMODE lockMode); extern bool CoordinatorAddedAsWorkerNode(void); extern List * ReferenceTablePlacementNodeList(LOCKMODE lockMode); extern WorkerNode * CoordinatorNodeIfAddedAsWorkerOrError(void); extern void ErrorIfCoordinatorNotAddedAsWorkerNode(void); extern List * DistributedTablePlacementNodeList(LOCKMODE lockMode); extern bool NodeCanHaveDistTablePlacements(WorkerNode *node); -extern uint32 ActiveReadableNonCoordinatorNodeCount(void); extern List * ActiveReadableNonCoordinatorNodeList(void); extern List * ActiveReadableNodeList(void); extern WorkerNode * FindWorkerNode(const char *nodeName, int32 nodePort); @@ -91,6 +91,7 @@ extern void EnsureCoordinator(void); extern void InsertCoordinatorIfClusterEmpty(void); extern uint32 GroupForNode(char *nodeName, int32 nodePort); extern WorkerNode * PrimaryNodeForGroup(int32 groupId, bool *groupContainsNodes); +extern bool NodeIsPrimaryAndRemote(WorkerNode *worker); extern bool NodeIsPrimary(WorkerNode *worker); extern bool NodeIsSecondary(WorkerNode *worker); extern bool NodeIsReadable(WorkerNode *worker); diff --git a/src/test/regress/columnar_am_schedule b/src/test/regress/columnar_am_schedule index a4afb3229..f1a2498e1 100644 --- a/src/test/regress/columnar_am_schedule +++ b/src/test/regress/columnar_am_schedule @@ -8,6 +8,7 @@ test: am_query test: am_analyze test: am_data_types test: am_drop +test: am_indexes test: columnar_fallback_scan test: columnar_partitioning test: am_empty diff --git a/src/test/regress/expected/alter_table_set_access_method.out b/src/test/regress/expected/alter_table_set_access_method.out index 54484dff7..0c2366e1c 100644 --- a/src/test/regress/expected/alter_table_set_access_method.out +++ b/src/test/regress/expected/alter_table_set_access_method.out @@ -676,6 +676,11 @@ SELECT relname, relkind v_ref | v (6 rows) +CREATE TABLE identity_cols_test (a int, b int generated by default as identity (increment by 42)); +-- errors out since we don't support alter_table.* udfs with tables having any identity columns +SELECT alter_table_set_access_method('identity_cols_test', 'columnar'); +ERROR: cannot complete command because relation alter_table_set_access_method.identity_cols_test has identity column +HINT: Drop the identity columns and re-try the command SET client_min_messages TO WARNING; DROP SCHEMA alter_table_set_access_method CASCADE; SELECT 1 FROM master_remove_node('localhost', :master_port); diff --git a/src/test/regress/expected/am_indexes.out b/src/test/regress/expected/am_indexes.out new file mode 100644 index 000000000..bd1c41802 --- /dev/null +++ b/src/test/regress/expected/am_indexes.out @@ -0,0 +1,62 @@ +-- +-- Testing indexes on on columnar tables. +-- +CREATE SCHEMA columnar_indexes; +SET search_path tO columnar_indexes, public; +-- +-- create index with the concurrent option. We should +-- error out during index creation. 
+-- https://github.com/citusdata/citus/issues/4599 +-- +create table t(a int, b int) using columnar; +create index CONCURRENTLY t_idx on t(a, b); +ERROR: indexes not supported for columnar tables +\d t + Table "columnar_indexes.t" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | integer | | | + b | integer | | | + +explain insert into t values (1, 2); + QUERY PLAN +--------------------------------------------------------------------- + Insert on t (cost=0.00..0.01 rows=1 width=8) + -> Result (cost=0.00..0.01 rows=1 width=8) +(2 rows) + +insert into t values (1, 2); +SELECT * FROM t; + a | b +--------------------------------------------------------------------- + 1 | 2 +(1 row) + +-- create index without the concurrent option. We should +-- error out during index creation. +create index t_idx on t(a, b); +ERROR: indexes not supported for columnar tables +\d t + Table "columnar_indexes.t" + Column | Type | Collation | Nullable | Default +--------------------------------------------------------------------- + a | integer | | | + b | integer | | | + +explain insert into t values (1, 2); + QUERY PLAN +--------------------------------------------------------------------- + Insert on t (cost=0.00..0.01 rows=1 width=8) + -> Result (cost=0.00..0.01 rows=1 width=8) +(2 rows) + +insert into t values (3, 4); +SELECT * FROM t; + a | b +--------------------------------------------------------------------- + 1 | 2 + 3 | 4 +(2 rows) + +SET client_min_messages TO WARNING; +DROP SCHEMA columnar_indexes CASCADE; diff --git a/src/test/regress/expected/citus_local_tables.out b/src/test/regress/expected/citus_local_tables.out index b1d2a1f22..5baca6098 100644 --- a/src/test/regress/expected/citus_local_tables.out +++ b/src/test/regress/expected/citus_local_tables.out @@ -56,6 +56,18 @@ BEGIN; SELECT citus_add_local_table_to_metadata('temp_table'); ERROR: constraints on temporary tables may reference only temporary tables ROLLBACK; +-- below two errors out since we don't support adding local tables +-- having any identity columns to metadata +BEGIN; + CREATE TABLE identity_cols_test (a int generated by default as identity (start with 42)); + SELECT citus_add_local_table_to_metadata('identity_cols_test'); +ERROR: cannot add citus_local_tables_test_schema.identity_cols_test to citus metadata since table has identity column +ROLLBACK; +BEGIN; + CREATE TABLE identity_cols_test (a int generated always as identity (increment by 42)); + SELECT citus_add_local_table_to_metadata('identity_cols_test'); +ERROR: cannot add citus_local_tables_test_schema.identity_cols_test to citus metadata since table has identity column +ROLLBACK; -- creating citus local table having no data initially would work SELECT citus_add_local_table_to_metadata('citus_local_table_1'); citus_add_local_table_to_metadata diff --git a/src/test/regress/expected/columnar_partitioning.out b/src/test/regress/expected/columnar_partitioning.out index 4f54824f8..00b941c4a 100644 --- a/src/test/regress/expected/columnar_partitioning.out +++ b/src/test/regress/expected/columnar_partitioning.out @@ -124,3 +124,240 @@ SET parallel_tuple_cost TO DEFAULT; SET max_parallel_workers TO DEFAULT; SET max_parallel_workers_per_gather TO DEFAULT; DROP TABLE parent; +-- +-- Test inheritance +-- +CREATE TABLE i_row(i int); +INSERT INTO i_row VALUES(100); +CREATE TABLE i_col(i int) USING columnar; +INSERT INTO i_col VALUES(200); +CREATE TABLE ij_row_row(j int) INHERITS(i_row); +INSERT INTO ij_row_row 
VALUES(300, 1000); +CREATE TABLE ij_row_col(j int) INHERITS(i_row) USING columnar; +INSERT INTO ij_row_col VALUES(400, 2000); +CREATE TABLE ij_col_row(j int) INHERITS(i_col); +INSERT INTO ij_col_row VALUES(500, 3000); +CREATE TABLE ij_col_col(j int) INHERITS(i_col) USING columnar; +INSERT INTO ij_col_col VALUES(600, 4000); +EXPLAIN (costs off) SELECT * FROM i_row; + QUERY PLAN +--------------------------------------------------------------------- + Append + -> Seq Scan on i_row i_row_1 + -> Seq Scan on ij_row_row i_row_2 + -> Custom Scan (ColumnarScan) on ij_row_col i_row_3 +(4 rows) + +SELECT * FROM i_row; + i +--------------------------------------------------------------------- + 100 + 300 + 400 +(3 rows) + +EXPLAIN (costs off) SELECT * FROM ONLY i_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on i_row +(1 row) + +SELECT * FROM ONLY i_row; + i +--------------------------------------------------------------------- + 100 +(1 row) + +EXPLAIN (costs off) SELECT * FROM i_col; + QUERY PLAN +--------------------------------------------------------------------- + Append + -> Custom Scan (ColumnarScan) on i_col i_col_1 + -> Seq Scan on ij_col_row i_col_2 + -> Custom Scan (ColumnarScan) on ij_col_col i_col_3 +(4 rows) + +SELECT * FROM i_col; + i +--------------------------------------------------------------------- + 200 + 500 + 600 +(3 rows) + +EXPLAIN (costs off) SELECT * FROM ONLY i_col; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (ColumnarScan) on i_col +(1 row) + +SELECT * FROM ONLY i_col; + i +--------------------------------------------------------------------- + 200 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_row_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_row_row +(1 row) + +SELECT * FROM ij_row_row; + i | j +--------------------------------------------------------------------- + 300 | 1000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_row_col; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (ColumnarScan) on ij_row_col +(1 row) + +SELECT * FROM ij_row_col; + i | j +--------------------------------------------------------------------- + 400 | 2000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_col_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_col_row +(1 row) + +SELECT * FROM ij_col_row; + i | j +--------------------------------------------------------------------- + 500 | 3000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_col_col; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (ColumnarScan) on ij_col_col +(1 row) + +SELECT * FROM ij_col_col; + i | j +--------------------------------------------------------------------- + 600 | 4000 +(1 row) + +SET columnar.enable_custom_scan = FALSE; +EXPLAIN (costs off) SELECT * FROM i_row; + QUERY PLAN +--------------------------------------------------------------------- + Append + -> Seq Scan on i_row i_row_1 + -> Seq Scan on ij_row_row i_row_2 + -> Seq Scan on ij_row_col i_row_3 +(4 rows) + +SELECT * FROM i_row; + i +--------------------------------------------------------------------- + 100 + 300 + 400 +(3 rows) + +EXPLAIN (costs off) SELECT * FROM ONLY i_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on i_row +(1 row) + +SELECT * FROM ONLY i_row; + i 
+--------------------------------------------------------------------- + 100 +(1 row) + +EXPLAIN (costs off) SELECT * FROM i_col; + QUERY PLAN +--------------------------------------------------------------------- + Append + -> Seq Scan on i_col i_col_1 + -> Seq Scan on ij_col_row i_col_2 + -> Seq Scan on ij_col_col i_col_3 +(4 rows) + +SELECT * FROM i_col; + i +--------------------------------------------------------------------- + 200 + 500 + 600 +(3 rows) + +EXPLAIN (costs off) SELECT * FROM ONLY i_col; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on i_col +(1 row) + +SELECT * FROM ONLY i_col; + i +--------------------------------------------------------------------- + 200 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_row_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_row_row +(1 row) + +SELECT * FROM ij_row_row; + i | j +--------------------------------------------------------------------- + 300 | 1000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_row_col; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_row_col +(1 row) + +SELECT * FROM ij_row_col; + i | j +--------------------------------------------------------------------- + 400 | 2000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_col_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_col_row +(1 row) + +SELECT * FROM ij_col_row; + i | j +--------------------------------------------------------------------- + 500 | 3000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_col_col; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_col_col +(1 row) + +SELECT * FROM ij_col_col; + i | j +--------------------------------------------------------------------- + 600 | 4000 +(1 row) + +SET columnar.enable_custom_scan TO DEFAULT; +DROP TABLE i_row CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table ij_row_row +drop cascades to table ij_row_col +DROP TABLE i_col CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table ij_col_row +drop cascades to table ij_col_col diff --git a/src/test/regress/expected/columnar_partitioning_1.out b/src/test/regress/expected/columnar_partitioning_1.out index f68cf23ef..3fa70e600 100644 --- a/src/test/regress/expected/columnar_partitioning_1.out +++ b/src/test/regress/expected/columnar_partitioning_1.out @@ -124,3 +124,240 @@ SET parallel_tuple_cost TO DEFAULT; SET max_parallel_workers TO DEFAULT; SET max_parallel_workers_per_gather TO DEFAULT; DROP TABLE parent; +-- +-- Test inheritance +-- +CREATE TABLE i_row(i int); +INSERT INTO i_row VALUES(100); +CREATE TABLE i_col(i int) USING columnar; +INSERT INTO i_col VALUES(200); +CREATE TABLE ij_row_row(j int) INHERITS(i_row); +INSERT INTO ij_row_row VALUES(300, 1000); +CREATE TABLE ij_row_col(j int) INHERITS(i_row) USING columnar; +INSERT INTO ij_row_col VALUES(400, 2000); +CREATE TABLE ij_col_row(j int) INHERITS(i_col); +INSERT INTO ij_col_row VALUES(500, 3000); +CREATE TABLE ij_col_col(j int) INHERITS(i_col) USING columnar; +INSERT INTO ij_col_col VALUES(600, 4000); +EXPLAIN (costs off) SELECT * FROM i_row; + QUERY PLAN +--------------------------------------------------------------------- + Append + -> Seq Scan on i_row + -> Seq Scan on ij_row_row + -> Custom Scan (ColumnarScan) on ij_row_col +(4 rows) + +SELECT * FROM i_row; + i 
+--------------------------------------------------------------------- + 100 + 300 + 400 +(3 rows) + +EXPLAIN (costs off) SELECT * FROM ONLY i_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on i_row +(1 row) + +SELECT * FROM ONLY i_row; + i +--------------------------------------------------------------------- + 100 +(1 row) + +EXPLAIN (costs off) SELECT * FROM i_col; + QUERY PLAN +--------------------------------------------------------------------- + Append + -> Custom Scan (ColumnarScan) on i_col + -> Seq Scan on ij_col_row + -> Custom Scan (ColumnarScan) on ij_col_col +(4 rows) + +SELECT * FROM i_col; + i +--------------------------------------------------------------------- + 200 + 500 + 600 +(3 rows) + +EXPLAIN (costs off) SELECT * FROM ONLY i_col; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (ColumnarScan) on i_col +(1 row) + +SELECT * FROM ONLY i_col; + i +--------------------------------------------------------------------- + 200 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_row_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_row_row +(1 row) + +SELECT * FROM ij_row_row; + i | j +--------------------------------------------------------------------- + 300 | 1000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_row_col; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (ColumnarScan) on ij_row_col +(1 row) + +SELECT * FROM ij_row_col; + i | j +--------------------------------------------------------------------- + 400 | 2000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_col_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_col_row +(1 row) + +SELECT * FROM ij_col_row; + i | j +--------------------------------------------------------------------- + 500 | 3000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_col_col; + QUERY PLAN +--------------------------------------------------------------------- + Custom Scan (ColumnarScan) on ij_col_col +(1 row) + +SELECT * FROM ij_col_col; + i | j +--------------------------------------------------------------------- + 600 | 4000 +(1 row) + +SET columnar.enable_custom_scan = FALSE; +EXPLAIN (costs off) SELECT * FROM i_row; + QUERY PLAN +--------------------------------------------------------------------- + Append + -> Seq Scan on i_row + -> Seq Scan on ij_row_row + -> Seq Scan on ij_row_col +(4 rows) + +SELECT * FROM i_row; + i +--------------------------------------------------------------------- + 100 + 300 + 400 +(3 rows) + +EXPLAIN (costs off) SELECT * FROM ONLY i_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on i_row +(1 row) + +SELECT * FROM ONLY i_row; + i +--------------------------------------------------------------------- + 100 +(1 row) + +EXPLAIN (costs off) SELECT * FROM i_col; + QUERY PLAN +--------------------------------------------------------------------- + Append + -> Seq Scan on i_col + -> Seq Scan on ij_col_row + -> Seq Scan on ij_col_col +(4 rows) + +SELECT * FROM i_col; + i +--------------------------------------------------------------------- + 200 + 500 + 600 +(3 rows) + +EXPLAIN (costs off) SELECT * FROM ONLY i_col; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on i_col +(1 row) + +SELECT * FROM ONLY i_col; + i 
+--------------------------------------------------------------------- + 200 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_row_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_row_row +(1 row) + +SELECT * FROM ij_row_row; + i | j +--------------------------------------------------------------------- + 300 | 1000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_row_col; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_row_col +(1 row) + +SELECT * FROM ij_row_col; + i | j +--------------------------------------------------------------------- + 400 | 2000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_col_row; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_col_row +(1 row) + +SELECT * FROM ij_col_row; + i | j +--------------------------------------------------------------------- + 500 | 3000 +(1 row) + +EXPLAIN (costs off) SELECT * FROM ij_col_col; + QUERY PLAN +--------------------------------------------------------------------- + Seq Scan on ij_col_col +(1 row) + +SELECT * FROM ij_col_col; + i | j +--------------------------------------------------------------------- + 600 | 4000 +(1 row) + +SET columnar.enable_custom_scan TO DEFAULT; +DROP TABLE i_row CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table ij_row_row +drop cascades to table ij_row_col +DROP TABLE i_col CASCADE; +NOTICE: drop cascades to 2 other objects +DETAIL: drop cascades to table ij_col_row +drop cascades to table ij_col_col diff --git a/src/test/regress/expected/coordinator_shouldhaveshards.out b/src/test/regress/expected/coordinator_shouldhaveshards.out index 319bc0e93..7f8e03834 100644 --- a/src/test/regress/expected/coordinator_shouldhaveshards.out +++ b/src/test/regress/expected/coordinator_shouldhaveshards.out @@ -778,6 +778,107 @@ SELECT table_name, citus_table_type, distribution_column, shard_count FROM publi (1 row) SET client_min_messages TO DEFAULT; +-- issue 4508 table_1 and table_2 are used to test +-- some edge cases around intermediate result pruning +CREATE TABLE table_1 (key int, value text); +SELECT create_distributed_table('table_1', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table_2 (key int, value text); +SELECT create_distributed_table('table_2', 'key'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_1_1503102 AS citus_table_alias (key, value) VALUES (1,'1'::text) +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_1_1503105 AS citus_table_alias (key, value) VALUES (2,'2'::text) +INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_2_1503106 AS citus_table_alias (key, value) VALUES (1,'1'::text), (5,'5'::text) +NOTICE: executing the command locally: INSERT INTO coordinator_shouldhaveshards.table_2_1503109 AS citus_table_alias (key, value) VALUES (2,'2'::text) +SET citus.log_intermediate_results TO ON; +SET client_min_messages to debug1; +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING 
(key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 +NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 + count | key +--------------------------------------------------------------------- + 1 | 1 +(1 row) + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... 
SELECT queries +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT key, value FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true ORDER BY key, value DESC LIMIT '1'::bigint +DEBUG: Subplan XXX_2 will be written to local file +NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503106 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 +NOTICE: executing the command locally: SELECT count(*) AS count, worker_column_1 AS key, max(worker_column_2) AS worker_column_3 FROM (SELECT a.key AS worker_column_1, table_2.value AS worker_column_2 FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN coordinator_shouldhaveshards.table_2_1503109 table_2(key, value) USING (key))) worker_subquery GROUP BY worker_column_1 +NOTICE: executing the command locally: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +NOTICE: executing the copy locally for shard xxxxx +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; +DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1 +DEBUG: generating subplan XXX_2 for CTE inserts: INSERT 
INTO coordinator_shouldhaveshards.table_2 (key, value) SELECT key, count(*) AS count FROM coordinator_shouldhaveshards.table_1 WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value +DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +NOTICE: executing the command locally: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE true +NOTICE: executing the command locally: SELECT count(key) AS m FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE true +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Collecting INSERT ... SELECT results on coordinator +NOTICE: executing the command locally: SELECT worker_column_1 AS key, (count(*))::text AS value FROM (SELECT table_1.key AS worker_column_1 FROM coordinator_shouldhaveshards.table_1_1503102 table_1 WHERE (table_1.key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats))) worker_subquery GROUP BY worker_column_1 HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT worker_column_1 AS key, (count(*))::text AS value FROM (SELECT table_1.key AS worker_column_1 FROM coordinator_shouldhaveshards.table_1_1503105 table_1 WHERE (table_1.key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats))) worker_subquery GROUP BY worker_column_1 HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT '1'::bigint +NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts + count +--------------------------------------------------------------------- + 0 +(1 row) + +RESET client_min_messages; \set VERBOSITY terse DROP TABLE ref_table; NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_table_xxxxx CASCADE @@ -788,7 +889,7 @@ DROP TABLE ref; NOTICE: executing the command locally: DROP TABLE IF EXISTS coordinator_shouldhaveshards.ref_xxxxx CASCADE DROP TABLE 
test_append_table; DROP SCHEMA coordinator_shouldhaveshards CASCADE; -NOTICE: drop cascades to 13 other objects +NOTICE: drop cascades to 19 other objects SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); ?column? --------------------------------------------------------------------- diff --git a/src/test/regress/expected/local_shard_copy.out b/src/test/regress/expected/local_shard_copy.out index 6a29ce673..9d856653c 100644 --- a/src/test/regress/expected/local_shard_copy.out +++ b/src/test/regress/expected/local_shard_copy.out @@ -3,10 +3,10 @@ SET search_path TO local_shard_copy; SET client_min_messages TO DEBUG; SET citus.next_shard_id TO 1570000; SET citus.replicate_reference_tables_on_activate TO off; -SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); - master_add_node +SELECT 1 FROM master_add_node('localhost', :master_port, groupid := 0); + ?column? --------------------------------------------------------------------- - 32 + 1 (1 row) SET citus.shard_count TO 4; @@ -485,7 +485,7 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM local_shar -- shard creation should be done locally SELECT create_reference_table('ref_table'); -NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'CREATE TABLE local_shard_copy.ref_table (a integer)');SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'ALTER TABLE local_shard_copy.ref_table OWNER TO postgres') +NOTICE: executing the command locally: SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'CREATE TABLE local_shard_copy.ref_table (a integer) ');SELECT worker_apply_shard_ddl_command (1330000, 'local_shard_copy', 'ALTER TABLE local_shard_copy.ref_table OWNER TO postgres') NOTICE: executing the copy locally for shard xxxxx NOTICE: Copying data from local table... NOTICE: copying the data has completed diff --git a/src/test/regress/expected/mx_coordinator_shouldhaveshards.out b/src/test/regress/expected/mx_coordinator_shouldhaveshards.out new file mode 100644 index 000000000..5b5a87f05 --- /dev/null +++ b/src/test/regress/expected/mx_coordinator_shouldhaveshards.out @@ -0,0 +1,185 @@ +CREATE SCHEMA mx_coordinator_shouldhaveshards; +SET search_path TO mx_coordinator_shouldhaveshards; +SET citus.shard_replication_factor to 1; +SET citus.replication_model TO streaming; +SET client_min_messages TO WARNING; +SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + +RESET client_min_messages; +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +-- issue 4508 table_1 and table_2 are used to test some edge cases +-- around intermediate result pruning +CREATE TABLE table_1 (key int, value text); +SELECT create_distributed_table('table_1', 'key', colocate_with := 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE table_2 (key int, value text); +SELECT create_distributed_table('table_2', 'key', colocate_with := 'none'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); +INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); +set citus.log_intermediate_results TO ON; +set client_min_messages to debug1; +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx + count | key +--------------------------------------------------------------------- + 1 | 1 +(1 row) + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... 
SELECT queries +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +DEBUG: Collecting INSERT ... SELECT results on coordinator +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; +DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM mx_coordinator_shouldhaveshards.table_1 +DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO mx_coordinator_shouldhaveshards.table_2 (key, value) SELECT key, count(*) AS count FROM mx_coordinator_shouldhaveshards.table_1 WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value +DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Collecting INSERT ... 
SELECT results on coordinator + count +--------------------------------------------------------------------- + 0 +(1 row) + +\c - - - :worker_1_port +SET search_path TO mx_coordinator_shouldhaveshards; +set citus.log_intermediate_results TO ON; +set client_min_messages to debug1; +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx + count | key +--------------------------------------------------------------------- + 1 | 1 +(1 row) + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); +DEBUG: Group by list without distribution column is not allowed in distributed INSERT ... SELECT queries +DEBUG: generating subplan XXX_1 for CTE a: SELECT key, value FROM mx_coordinator_shouldhaveshards.table_1 ORDER BY key, value DESC LIMIT 1 +DEBUG: push down of limit count: 1 +DEBUG: generating subplan XXX_2 for subquery SELECT count(*) AS count, a.key FROM ((SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a JOIN mx_coordinator_shouldhaveshards.table_2 USING (key)) GROUP BY a.key HAVING (max(table_2.value) OPERATOR(pg_catalog.>=) (SELECT a_1.value FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) a_1)) +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT int4(count) AS key, (key)::text AS value FROM (SELECT intermediate_result.count, intermediate_result.key FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(count bigint, key integer)) citus_insert_select_subquery +DEBUG: Collecting INSERT ... 
SELECT results on coordinator +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; +DEBUG: generating subplan XXX_1 for CTE stats: SELECT count(key) AS m FROM mx_coordinator_shouldhaveshards.table_1 +DEBUG: generating subplan XXX_2 for CTE inserts: INSERT INTO mx_coordinator_shouldhaveshards.table_2 (key, value) SELECT key, count(*) AS count FROM mx_coordinator_shouldhaveshards.table_1 WHERE (key OPERATOR(pg_catalog.>=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) GROUP BY key HAVING (count(*) OPERATOR(pg_catalog.<=) (SELECT stats.m FROM (SELECT intermediate_result.m FROM read_intermediate_result('XXX_1'::text, 'binary'::citus_copy_format) intermediate_result(m bigint)) stats)) LIMIT 1 RETURNING table_2.key, table_2.value +DEBUG: LIMIT clauses are not allowed in distributed INSERT ... SELECT queries +DEBUG: push down of limit count: 1 +DEBUG: Plan XXX query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (SELECT intermediate_result.key, intermediate_result.value FROM read_intermediate_result('XXX_2'::text, 'binary'::citus_copy_format) intermediate_result(key integer, value text)) inserts +DEBUG: Subplan XXX_1 will be written to local file +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_1 will be sent to localhost:xxxxx +DEBUG: Subplan XXX_2 will be written to local file +DEBUG: Collecting INSERT ... SELECT results on coordinator + count +--------------------------------------------------------------------- + 0 +(1 row) + +\c - - - :master_port +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); + ?column? 
+--------------------------------------------------------------------- + 1 +(1 row) + +DROP SCHEMA mx_coordinator_shouldhaveshards CASCADE; +NOTICE: drop cascades to 6 other objects +DETAIL: drop cascades to table mx_coordinator_shouldhaveshards.table_1 +drop cascades to table mx_coordinator_shouldhaveshards.table_1_1130052 +drop cascades to table mx_coordinator_shouldhaveshards.table_1_1130055 +drop cascades to table mx_coordinator_shouldhaveshards.table_2 +drop cascades to table mx_coordinator_shouldhaveshards.table_2_1130056 +drop cascades to table mx_coordinator_shouldhaveshards.table_2_1130059 +SELECT master_remove_node('localhost', :master_port); + master_remove_node +--------------------------------------------------------------------- + +(1 row) + diff --git a/src/test/regress/expected/pg12.out b/src/test/regress/expected/pg12.out index 6c7033b7b..cd7674552 100644 --- a/src/test/regress/expected/pg12.out +++ b/src/test/regress/expected/pg12.out @@ -449,6 +449,199 @@ BEGIN; generated_stored_col_test_60040 | y | s (2 rows) +ROLLBACK; +CREATE TABLE generated_stored_dist ( + col_1 int, + "col\'_2" text, + col_3 text generated always as (UPPER("col\'_2")) stored +); +SELECT create_distributed_table ('generated_stored_dist', 'col_1'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO generated_stored_dist VALUES (1, 'text_1'), (2, 'text_2'); +SELECT * FROM generated_stored_dist ORDER BY 1,2,3; + col_1 | col\'_2 | col_3 +--------------------------------------------------------------------- + 1 | text_1 | TEXT_1 + 2 | text_2 | TEXT_2 +(2 rows) + +INSERT INTO generated_stored_dist VALUES (1, 'text_1'), (2, 'text_2'); +SELECT alter_distributed_table('generated_stored_dist', shard_count := 5, cascade_to_colocated := false); +NOTICE: creating a new table for test_pg12.generated_stored_dist +NOTICE: Moving the data of test_pg12.generated_stored_dist +NOTICE: Dropping the old test_pg12.generated_stored_dist +NOTICE: Renaming the new table to test_pg12.generated_stored_dist + alter_distributed_table +--------------------------------------------------------------------- + +(1 row) + +SELECT * FROM generated_stored_dist ORDER BY 1,2,3; + col_1 | col\'_2 | col_3 +--------------------------------------------------------------------- + 1 | text_1 | TEXT_1 + 1 | text_1 | TEXT_1 + 2 | text_2 | TEXT_2 + 2 | text_2 | TEXT_2 +(4 rows) + +CREATE TABLE generated_stored_local ( + col_1 int, + "col\'_2" text, + col_3 text generated always as (UPPER("col\'_2")) stored +); +SELECT citus_add_local_table_to_metadata('generated_stored_local'); + citus_add_local_table_to_metadata +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO generated_stored_local VALUES (1, 'text_1'), (2, 'text_2'); +SELECT * FROM generated_stored_local ORDER BY 1,2,3; + col_1 | col\'_2 | col_3 +--------------------------------------------------------------------- + 1 | text_1 | TEXT_1 + 2 | text_2 | TEXT_2 +(2 rows) + +SELECT create_distributed_table ('generated_stored_local', 'col_1'); +NOTICE: Copying data from local table... +NOTICE: copying the data has completed +DETAIL: The local data in the table is no longer visible, but is still on disk. 
+HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$test_pg12.generated_stored_local$$) + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO generated_stored_local VALUES (1, 'text_1'), (2, 'text_2'); +SELECT * FROM generated_stored_local ORDER BY 1,2,3; + col_1 | col\'_2 | col_3 +--------------------------------------------------------------------- + 1 | text_1 | TEXT_1 + 1 | text_1 | TEXT_1 + 2 | text_2 | TEXT_2 + 2 | text_2 | TEXT_2 +(4 rows) + +create table generated_stored_columnar(i int) partition by range(i); +create table generated_stored_columnar_p0 partition of generated_stored_columnar for values from (0) to (10); +create table generated_stored_columnar_p1 partition of generated_stored_columnar for values from (10) to (20); +SELECT alter_table_set_access_method('generated_stored_columnar_p0', 'columnar'); +NOTICE: creating a new table for test_pg12.generated_stored_columnar_p0 +NOTICE: Moving the data of test_pg12.generated_stored_columnar_p0 +NOTICE: Dropping the old test_pg12.generated_stored_columnar_p0 +NOTICE: Renaming the new table to test_pg12.generated_stored_columnar_p0 + alter_table_set_access_method +--------------------------------------------------------------------- + +(1 row) + +CREATE TABLE generated_stored_ref ( + col_1 int, + col_2 int, + col_3 int generated always as (col_1+col_2) stored, + col_4 int, + col_5 int generated always as (col_4*2-col_1) stored +); +SELECT create_reference_table ('generated_stored_ref'); + create_reference_table +--------------------------------------------------------------------- + +(1 row) + +INSERT INTO generated_stored_ref (col_1, col_4) VALUES (1,2), (11,12); +INSERT INTO generated_stored_ref (col_1, col_2, col_4) VALUES (100,101,102), (200,201,202); +SELECT * FROM generated_stored_ref ORDER BY 1,2,3,4,5; + col_1 | col_2 | col_3 | col_4 | col_5 +--------------------------------------------------------------------- + 1 | | | 2 | 3 + 11 | | | 12 | 13 + 100 | 101 | 201 | 102 | 104 + 200 | 201 | 401 | 202 | 204 +(4 rows) + +BEGIN; + SELECT undistribute_table('generated_stored_ref'); +NOTICE: creating a new table for test_pg12.generated_stored_ref +NOTICE: Moving the data of test_pg12.generated_stored_ref +NOTICE: Dropping the old test_pg12.generated_stored_ref +NOTICE: Renaming the new table to test_pg12.generated_stored_ref + undistribute_table +--------------------------------------------------------------------- + +(1 row) + + INSERT INTO generated_stored_ref (col_1, col_4) VALUES (11,12), (21,22); + INSERT INTO generated_stored_ref (col_1, col_2, col_4) VALUES (200,201,202), (300,301,302); + SELECT * FROM generated_stored_ref ORDER BY 1,2,3,4,5; + col_1 | col_2 | col_3 | col_4 | col_5 +--------------------------------------------------------------------- + 1 | | | 2 | 3 + 11 | | | 12 | 13 + 11 | | | 12 | 13 + 21 | | | 22 | 23 + 100 | 101 | 201 | 102 | 104 + 200 | 201 | 401 | 202 | 204 + 200 | 201 | 401 | 202 | 204 + 300 | 301 | 601 | 302 | 304 +(8 rows) + +ROLLBACK; +BEGIN; + -- drop some of the columns not having "generated always as stored" expressions + -- this would drop generated columns too + ALTER TABLE generated_stored_ref DROP COLUMN col_1; + ALTER TABLE generated_stored_ref DROP COLUMN col_4; + -- show that undistribute_table works fine + SELECT undistribute_table('generated_stored_ref'); +NOTICE: creating a new table for test_pg12.generated_stored_ref +NOTICE: Moving the data of 
test_pg12.generated_stored_ref +NOTICE: Dropping the old test_pg12.generated_stored_ref +NOTICE: Renaming the new table to test_pg12.generated_stored_ref + undistribute_table +--------------------------------------------------------------------- + +(1 row) + + INSERT INTO generated_stored_ref VALUES (5); + SELECT * FROM generated_stored_REF ORDER BY 1; + col_2 +--------------------------------------------------------------------- + 5 + 101 + 201 + + +(5 rows) + +ROLLBACK; +BEGIN; + -- now drop all columns + ALTER TABLE generated_stored_ref DROP COLUMN col_3; + ALTER TABLE generated_stored_ref DROP COLUMN col_5; + ALTER TABLE generated_stored_ref DROP COLUMN col_1; + ALTER TABLE generated_stored_ref DROP COLUMN col_2; + ALTER TABLE generated_stored_ref DROP COLUMN col_4; + -- show that undistribute_table works fine + SELECT undistribute_table('generated_stored_ref'); +NOTICE: creating a new table for test_pg12.generated_stored_ref +NOTICE: Moving the data of test_pg12.generated_stored_ref +NOTICE: Dropping the old test_pg12.generated_stored_ref +NOTICE: Renaming the new table to test_pg12.generated_stored_ref + undistribute_table +--------------------------------------------------------------------- + +(1 row) + + SELECT * FROM generated_stored_ref; +-- +(4 rows) + ROLLBACK; RESET citus.replicate_reference_tables_on_activate; SELECT citus_remove_node('localhost', :master_port); @@ -459,6 +652,6 @@ SELECT citus_remove_node('localhost', :master_port); \set VERBOSITY terse drop schema test_pg12 cascade; -NOTICE: drop cascades to 10 other objects +NOTICE: drop cascades to 15 other objects \set VERBOSITY default SET citus.shard_replication_factor to 2; diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index 16f3c37fd..bdaafa689 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -1591,6 +1591,16 @@ NOTICE: executing the command locally: SELECT count(*) AS count FROM (SELECT in 7 (1 row) +-- copy can use local execution even if there is no connection available +COPY another_schema_table(a) FROM PROGRAM 'seq 32'; +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY another_schema_table, line 1: "1" +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY another_schema_table, line 2: "2" +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY another_schema_table, line 3: "3" +NOTICE: executing the copy locally for shard xxxxx +CONTEXT: COPY another_schema_table, line 6: "6" -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false; @@ -1613,6 +1623,11 @@ HINT: Enable local execution via SET citus.enable_local_execution TO true; INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7); ERROR: could not establish any connections to the node localhost:xxxxx when local execution is also disabled. 
HINT: Enable local execution via SET citus.enable_local_execution TO true; +-- copy fails if local execution is disabled and there is no connection slot +COPY another_schema_table(a) FROM PROGRAM 'seq 32'; +ERROR: could not find an available connection +HINT: Set citus.max_shared_pool_size TO -1 to let COPY command finish +CONTEXT: COPY another_schema_table, line 1: "1" -- set the values to originals back ALTER SYSTEM RESET citus.max_cached_conns_per_worker; ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor; diff --git a/src/test/regress/input/multi_mx_copy_data.source b/src/test/regress/input/multi_mx_copy_data.source index f65bc42af..22ce69e6c 100644 --- a/src/test/regress/input/multi_mx_copy_data.source +++ b/src/test/regress/input/multi_mx_copy_data.source @@ -22,6 +22,25 @@ SET search_path TO public; \COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' \COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' +-- get ready for the next test +TRUNCATE orders_mx; + +\c - - - :worker_2_port +SET citus.log_local_commands TO ON; +-- simulate the case where there is no connection slots available +ALTER SYSTEM SET citus.local_shared_pool_size TO -1; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); +show citus.local_shared_pool_size; +\COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' +\COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' + +-- set it back +ALTER SYSTEM RESET citus.local_shared_pool_size; +SELECT pg_reload_conf(); +SELECT pg_sleep(0.1); +show citus.local_shared_pool_size; + -- These copies were intended to test copying data to single sharded table from -- worker nodes, yet in order to remove broadcast logic related codes we change -- the table to reference table and copy data from master. 
Should be updated diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index 5a1f1f7e2..ac5206d4b 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -26,6 +26,7 @@ test: multi_mx_hide_shard_names test: multi_mx_add_coordinator test: multi_mx_modifications_to_reference_tables test: multi_mx_partitioning +test: mx_coordinator_shouldhaveshards test: multi_mx_copy_data multi_mx_router_planner test: multi_mx_schema_support multi_mx_tpch_query1 multi_mx_tpch_query10 test: multi_mx_tpch_query12 multi_mx_tpch_query14 multi_mx_tpch_query19 diff --git a/src/test/regress/output/multi_mx_copy_data.source b/src/test/regress/output/multi_mx_copy_data.source index 521ebca99..53a36f7dc 100644 --- a/src/test/regress/output/multi_mx_copy_data.source +++ b/src/test/regress/output/multi_mx_copy_data.source @@ -16,6 +16,84 @@ SET search_path TO public; -- and use second worker as well \COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' \COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' +-- get ready for the next test +TRUNCATE orders_mx; +\c - - - :worker_2_port +SET citus.log_local_commands TO ON; +-- simulate the case where there is no connection slots available +ALTER SYSTEM SET citus.local_shared_pool_size TO -1; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +---------- + +(1 row) + +show citus.local_shared_pool_size; + citus.local_shared_pool_size +------------------------------ + -1 +(1 row) + +\COPY orders_mx FROM '@abs_srcdir@/data/orders.1.data' with delimiter '|' +NOTICE: executing the copy locally for shard 1220075 +CONTEXT: COPY orders_mx, line 3: "3|1234|F|205654.30|1993-10-14|5-LOW|Clerk#000000955|0|sly final accounts boost. carefully regular id..." +NOTICE: executing the copy locally for shard 1220071 +CONTEXT: COPY orders_mx, line 5: "5|445|F|105367.67|1994-07-30|5-LOW|Clerk#000000925|0|quickly. bold deposits sleep slyly. packages us..." +NOTICE: executing the copy locally for shard 1220069 +CONTEXT: COPY orders_mx, line 9: "33|670|F|146567.24|1993-10-27|3-MEDIUM|Clerk#000000409|0|uriously. furiously final request" +NOTICE: executing the copy locally for shard 1220079 +CONTEXT: COPY orders_mx, line 15: "39|818|O|326565.37|1996-09-20|3-MEDIUM|Clerk#000000659|0|ole express, ironic requests: ir" +NOTICE: executing the copy locally for shard 1220083 +CONTEXT: COPY orders_mx, line 19: "67|568|O|182481.16|1996-12-19|4-NOT SPECIFIED|Clerk#000000547|0|symptotes haggle slyly around the fu..." +NOTICE: executing the copy locally for shard 1220073 +CONTEXT: COPY orders_mx, line 24: "96|1078|F|64364.30|1994-04-17|2-HIGH|Clerk#000000395|0|oost furiously. pinto" +NOTICE: executing the copy locally for shard 1220077 +CONTEXT: COPY orders_mx, line 25: "97|211|F|100572.55|1993-01-29|3-MEDIUM|Clerk#000000547|0|hang blithely along the regular accounts. f..." 
+NOTICE: executing the copy locally for shard 1220081 +CONTEXT: COPY orders_mx, line 38: "134|62|F|208201.46|1992-05-01|4-NOT SPECIFIED|Clerk#000000711|0|lar theodolites boos" +\COPY orders_mx FROM '@abs_srcdir@/data/orders.2.data' with delimiter '|' +NOTICE: executing the copy locally for shard 1220079 +CONTEXT: COPY orders_mx, line 2: "8998|80|F|147264.16|1993-01-04|5-LOW|Clerk#000000733|0| fluffily pending sauternes cajo" +NOTICE: executing the copy locally for shard 1220077 +CONTEXT: COPY orders_mx, line 4: "9024|1469|F|298241.36|1992-06-03|3-MEDIUM|Clerk#000000901|0|ar the theodolites. fluffily stealthy re..." +NOTICE: executing the copy locally for shard 1220073 +CONTEXT: COPY orders_mx, line 6: "9026|677|O|63256.87|1996-07-24|5-LOW|Clerk#000000320|0|ironic escapades would wake carefully " +NOTICE: executing the copy locally for shard 1220071 +CONTEXT: COPY orders_mx, line 9: "9029|1213|F|78703.86|1992-11-20|3-MEDIUM|Clerk#000000965|0| excuses nag quickly carefully unusual ex..." +NOTICE: executing the copy locally for shard 1220083 +CONTEXT: COPY orders_mx, line 14: "9058|403|F|63464.13|1993-06-29|2-HIGH|Clerk#000000376|0|ealthily special deposits. quickly regular r..." +NOTICE: executing the copy locally for shard 1220081 +CONTEXT: COPY orders_mx, line 16: "9060|463|O|45295.71|1996-06-09|1-URGENT|Clerk#000000438|0|iously. slyly regular dol" +NOTICE: executing the copy locally for shard 1220075 +CONTEXT: COPY orders_mx, line 43: "9159|1135|O|99594.61|1995-07-26|1-URGENT|Clerk#000000892|0|xcuses. quickly ironic deposits wake alon..." +NOTICE: executing the copy locally for shard 1220069 +CONTEXT: COPY orders_mx, line 69: "9281|904|F|173278.28|1992-02-24|1-URGENT|Clerk#000000530|0|eep furiously according to the requests; ..." +-- set it back +ALTER SYSTEM RESET citus.local_shared_pool_size; +SELECT pg_reload_conf(); + pg_reload_conf +---------------- + t +(1 row) + +SELECT pg_sleep(0.1); + pg_sleep +---------- + +(1 row) + +show citus.local_shared_pool_size; + citus.local_shared_pool_size +------------------------------ + 50 +(1 row) + -- These copies were intended to test copying data to single sharded table from -- worker nodes, yet in order to remove broadcast logic related codes we change -- the table to reference table and copy data from master. Should be updated diff --git a/src/test/regress/sql/alter_table_set_access_method.sql b/src/test/regress/sql/alter_table_set_access_method.sql index 17c7438f8..69a9dd8d4 100644 --- a/src/test/regress/sql/alter_table_set_access_method.sql +++ b/src/test/regress/sql/alter_table_set_access_method.sql @@ -209,6 +209,10 @@ SELECT relname, relkind ) ORDER BY relname ASC; +CREATE TABLE identity_cols_test (a int, b int generated by default as identity (increment by 42)); +-- errors out since we don't support alter_table.* udfs with tables having any identity columns +SELECT alter_table_set_access_method('identity_cols_test', 'columnar'); + SET client_min_messages TO WARNING; DROP SCHEMA alter_table_set_access_method CASCADE; SELECT 1 FROM master_remove_node('localhost', :master_port); diff --git a/src/test/regress/sql/am_indexes.sql b/src/test/regress/sql/am_indexes.sql new file mode 100644 index 000000000..831699dc4 --- /dev/null +++ b/src/test/regress/sql/am_indexes.sql @@ -0,0 +1,29 @@ +-- +-- Testing indexes on columnar tables. +-- + +CREATE SCHEMA columnar_indexes; +SET search_path TO columnar_indexes, public; + +-- +-- create index with the concurrent option. We should +-- error out during index creation. 
+-- https://github.com/citusdata/citus/issues/4599 +-- +create table t(a int, b int) using columnar; +create index CONCURRENTLY t_idx on t(a, b); +\d t +explain insert into t values (1, 2); +insert into t values (1, 2); +SELECT * FROM t; + +-- create index without the concurrent option. We should +-- error out during index creation. +create index t_idx on t(a, b); +\d t +explain insert into t values (1, 2); +insert into t values (3, 4); +SELECT * FROM t; + +SET client_min_messages TO WARNING; +DROP SCHEMA columnar_indexes CASCADE; diff --git a/src/test/regress/sql/citus_local_tables.sql b/src/test/regress/sql/citus_local_tables.sql index 1cfa75d27..148244e51 100644 --- a/src/test/regress/sql/citus_local_tables.sql +++ b/src/test/regress/sql/citus_local_tables.sql @@ -46,6 +46,18 @@ BEGIN; SELECT citus_add_local_table_to_metadata('temp_table'); ROLLBACK; +-- below two errors out since we don't support adding local tables +-- having any identity columns to metadata +BEGIN; + CREATE TABLE identity_cols_test (a int generated by default as identity (start with 42)); + SELECT citus_add_local_table_to_metadata('identity_cols_test'); +ROLLBACK; + +BEGIN; + CREATE TABLE identity_cols_test (a int generated always as identity (increment by 42)); + SELECT citus_add_local_table_to_metadata('identity_cols_test'); +ROLLBACK; + -- creating citus local table having no data initially would work SELECT citus_add_local_table_to_metadata('citus_local_table_1'); diff --git a/src/test/regress/sql/columnar_partitioning.sql b/src/test/regress/sql/columnar_partitioning.sql index 98692c78a..ce1b8271d 100644 --- a/src/test/regress/sql/columnar_partitioning.sql +++ b/src/test/regress/sql/columnar_partitioning.sql @@ -53,3 +53,75 @@ SET max_parallel_workers TO DEFAULT; SET max_parallel_workers_per_gather TO DEFAULT; DROP TABLE parent; + +-- +-- Test inheritance +-- + +CREATE TABLE i_row(i int); +INSERT INTO i_row VALUES(100); +CREATE TABLE i_col(i int) USING columnar; +INSERT INTO i_col VALUES(200); +CREATE TABLE ij_row_row(j int) INHERITS(i_row); +INSERT INTO ij_row_row VALUES(300, 1000); +CREATE TABLE ij_row_col(j int) INHERITS(i_row) USING columnar; +INSERT INTO ij_row_col VALUES(400, 2000); +CREATE TABLE ij_col_row(j int) INHERITS(i_col); +INSERT INTO ij_col_row VALUES(500, 3000); +CREATE TABLE ij_col_col(j int) INHERITS(i_col) USING columnar; +INSERT INTO ij_col_col VALUES(600, 4000); + +EXPLAIN (costs off) SELECT * FROM i_row; +SELECT * FROM i_row; + +EXPLAIN (costs off) SELECT * FROM ONLY i_row; +SELECT * FROM ONLY i_row; + +EXPLAIN (costs off) SELECT * FROM i_col; +SELECT * FROM i_col; + +EXPLAIN (costs off) SELECT * FROM ONLY i_col; +SELECT * FROM ONLY i_col; + +EXPLAIN (costs off) SELECT * FROM ij_row_row; +SELECT * FROM ij_row_row; + +EXPLAIN (costs off) SELECT * FROM ij_row_col; +SELECT * FROM ij_row_col; + +EXPLAIN (costs off) SELECT * FROM ij_col_row; +SELECT * FROM ij_col_row; + +EXPLAIN (costs off) SELECT * FROM ij_col_col; +SELECT * FROM ij_col_col; + +SET columnar.enable_custom_scan = FALSE; + +EXPLAIN (costs off) SELECT * FROM i_row; +SELECT * FROM i_row; + +EXPLAIN (costs off) SELECT * FROM ONLY i_row; +SELECT * FROM ONLY i_row; + +EXPLAIN (costs off) SELECT * FROM i_col; +SELECT * FROM i_col; + +EXPLAIN (costs off) SELECT * FROM ONLY i_col; +SELECT * FROM ONLY i_col; + +EXPLAIN (costs off) SELECT * FROM ij_row_row; +SELECT * FROM ij_row_row; + +EXPLAIN (costs off) SELECT * FROM ij_row_col; +SELECT * FROM ij_row_col; + +EXPLAIN (costs off) SELECT * FROM ij_col_row; +SELECT * FROM ij_col_row; + 
+EXPLAIN (costs off) SELECT * FROM ij_col_col; +SELECT * FROM ij_col_col; + +SET columnar.enable_custom_scan TO DEFAULT; + +DROP TABLE i_row CASCADE; +DROP TABLE i_col CASCADE; diff --git a/src/test/regress/sql/coordinator_shouldhaveshards.sql b/src/test/regress/sql/coordinator_shouldhaveshards.sql index 5362db9c1..7501be265 100644 --- a/src/test/regress/sql/coordinator_shouldhaveshards.sql +++ b/src/test/regress/sql/coordinator_shouldhaveshards.sql @@ -323,6 +323,50 @@ SELECT table_name, citus_table_type, distribution_column, shard_count FROM publi SET client_min_messages TO DEFAULT; +-- issue 4508 table_1 and table_2 are used to test +-- some edge cases around intermediate result pruning +CREATE TABLE table_1 (key int, value text); +SELECT create_distributed_table('table_1', 'key'); + +CREATE TABLE table_2 (key int, value text); +SELECT create_distributed_table('table_2', 'key'); + +INSERT INTO table_1 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'); +INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); + +SET citus.log_intermediate_results TO ON; +SET client_min_messages to debug1; +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; + +RESET client_min_messages; + + \set VERBOSITY terse DROP TABLE ref_table; diff --git a/src/test/regress/sql/local_shard_copy.sql b/src/test/regress/sql/local_shard_copy.sql index ca29bff7f..0f2535c73 100644 --- a/src/test/regress/sql/local_shard_copy.sql +++ b/src/test/regress/sql/local_shard_copy.sql @@ -5,7 +5,7 @@ SET client_min_messages TO DEBUG; SET citus.next_shard_id TO 1570000; SET citus.replicate_reference_tables_on_activate TO off; -SELECT * FROM master_add_node('localhost', :master_port, groupid := 0); +SELECT 1 FROM master_add_node('localhost', :master_port, groupid := 0); SET citus.shard_count TO 4; SET citus.shard_replication_factor TO 1; diff --git a/src/test/regress/sql/mx_coordinator_shouldhaveshards.sql b/src/test/regress/sql/mx_coordinator_shouldhaveshards.sql new file mode 100644 index 000000000..377b6acbe --- /dev/null +++ b/src/test/regress/sql/mx_coordinator_shouldhaveshards.sql @@ -0,0 +1,94 @@ +CREATE SCHEMA mx_coordinator_shouldhaveshards; +SET search_path TO mx_coordinator_shouldhaveshards; + +SET citus.shard_replication_factor to 1; +SET citus.replication_model TO streaming; +SET client_min_messages TO WARNING; +SELECT 1 FROM master_add_node('localhost', :master_port, groupid => 0); +RESET client_min_messages; + +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', true); + +-- issue 4508 table_1 and table_2 are used to test some edge cases +-- around intermediate result pruning +CREATE TABLE table_1 (key int, value text); +SELECT create_distributed_table('table_1', 'key', colocate_with := 'none'); + +CREATE TABLE table_2 (key int, value text); +SELECT create_distributed_table('table_2', 'key', colocate_with := 'none'); + +INSERT INTO table_1 VALUES 
(1, '1'), (2, '2'), (3, '3'), (4, '4'); +INSERT INTO table_2 VALUES (1, '1'), (2, '2'), (3, '3'), (4, '4'), (5, '5'), (6, '6'); + +set citus.log_intermediate_results TO ON; +set client_min_messages to debug1; + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; + +\c - - - :worker_1_port +SET search_path TO mx_coordinator_shouldhaveshards; + +set citus.log_intermediate_results TO ON; +set client_min_messages to debug1; + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH a AS (SELECT * FROM table_1 ORDER BY 1,2 DESC LIMIT 1) +INSERT INTO table_1 SELECT count(*), +key +FROM a JOIN table_2 USING (key) +GROUP BY key +HAVING (max(table_2.value) >= (SELECT value FROM a)); + +WITH stats AS ( + SELECT count(key) m FROM table_1 +), +inserts AS ( + INSERT INTO table_2 + SELECT key, count(*) + FROM table_1 + WHERE key >= (SELECT m FROM stats) + GROUP BY key + HAVING count(*) <= (SELECT m FROM stats) + LIMIT 1 + RETURNING * +) SELECT count(*) FROM inserts; + +\c - - - :master_port + +SELECT 1 FROM master_set_node_property('localhost', :master_port, 'shouldhaveshards', false); + +DROP SCHEMA mx_coordinator_shouldhaveshards CASCADE; + +SELECT master_remove_node('localhost', :master_port); diff --git a/src/test/regress/sql/pg12.sql b/src/test/regress/sql/pg12.sql index 53a4f11f4..83e76867a 100644 --- a/src/test/regress/sql/pg12.sql +++ b/src/test/regress/sql/pg12.sql @@ -296,6 +296,90 @@ BEGIN; ORDER BY 1,2; ROLLBACK; +CREATE TABLE generated_stored_dist ( + col_1 int, + "col\'_2" text, + col_3 text generated always as (UPPER("col\'_2")) stored +); + +SELECT create_distributed_table ('generated_stored_dist', 'col_1'); + +INSERT INTO generated_stored_dist VALUES (1, 'text_1'), (2, 'text_2'); +SELECT * FROM generated_stored_dist ORDER BY 1,2,3; + +INSERT INTO generated_stored_dist VALUES (1, 'text_1'), (2, 'text_2'); +SELECT alter_distributed_table('generated_stored_dist', shard_count := 5, cascade_to_colocated := false); +SELECT * FROM generated_stored_dist ORDER BY 1,2,3; + +CREATE TABLE generated_stored_local ( + col_1 int, + "col\'_2" text, + col_3 text generated always as (UPPER("col\'_2")) stored +); + +SELECT citus_add_local_table_to_metadata('generated_stored_local'); + +INSERT INTO generated_stored_local VALUES (1, 'text_1'), (2, 'text_2'); +SELECT * FROM generated_stored_local ORDER BY 1,2,3; + +SELECT create_distributed_table ('generated_stored_local', 'col_1'); + +INSERT INTO generated_stored_local VALUES (1, 'text_1'), (2, 'text_2'); +SELECT * FROM generated_stored_local ORDER BY 1,2,3; + +create table generated_stored_columnar(i int) partition by range(i); +create table generated_stored_columnar_p0 partition of generated_stored_columnar for values from (0) to (10); +create table generated_stored_columnar_p1 partition of 
generated_stored_columnar for values from (10) to (20); +SELECT alter_table_set_access_method('generated_stored_columnar_p0', 'columnar'); + +CREATE TABLE generated_stored_ref ( + col_1 int, + col_2 int, + col_3 int generated always as (col_1+col_2) stored, + col_4 int, + col_5 int generated always as (col_4*2-col_1) stored +); + +SELECT create_reference_table ('generated_stored_ref'); + +INSERT INTO generated_stored_ref (col_1, col_4) VALUES (1,2), (11,12); +INSERT INTO generated_stored_ref (col_1, col_2, col_4) VALUES (100,101,102), (200,201,202); + +SELECT * FROM generated_stored_ref ORDER BY 1,2,3,4,5; + +BEGIN; + SELECT undistribute_table('generated_stored_ref'); + INSERT INTO generated_stored_ref (col_1, col_4) VALUES (11,12), (21,22); + INSERT INTO generated_stored_ref (col_1, col_2, col_4) VALUES (200,201,202), (300,301,302); + SELECT * FROM generated_stored_ref ORDER BY 1,2,3,4,5; +ROLLBACK; + +BEGIN; + -- drop some of the columns not having "generated always as stored" expressions + -- this would drop generated columns too + ALTER TABLE generated_stored_ref DROP COLUMN col_1; + ALTER TABLE generated_stored_ref DROP COLUMN col_4; + + -- show that undistribute_table works fine + SELECT undistribute_table('generated_stored_ref'); + INSERT INTO generated_stored_ref VALUES (5); + SELECT * FROM generated_stored_REF ORDER BY 1; +ROLLBACK; + +BEGIN; + -- now drop all columns + ALTER TABLE generated_stored_ref DROP COLUMN col_3; + ALTER TABLE generated_stored_ref DROP COLUMN col_5; + ALTER TABLE generated_stored_ref DROP COLUMN col_1; + ALTER TABLE generated_stored_ref DROP COLUMN col_2; + ALTER TABLE generated_stored_ref DROP COLUMN col_4; + + -- show that undistribute_table works fine + SELECT undistribute_table('generated_stored_ref'); + + SELECT * FROM generated_stored_ref; +ROLLBACK; + RESET citus.replicate_reference_tables_on_activate; SELECT citus_remove_node('localhost', :master_port); diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index a837258fe..4d7343668 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -817,6 +817,9 @@ ROLLBACK; WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000) SELECT count(*) FROM cte_1; +-- copy can use local execution even if there is no connection available +COPY another_schema_table(a) FROM PROGRAM 'seq 32'; + -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false; @@ -829,6 +832,9 @@ WITH cte_1 AS (SELECT * FROM another_schema_table LIMIT 1000) INSERT INTO another_schema_table VALUES (1,1), (2,2), (3,3), (4,4), (5,5),(6,6),(7,7); +-- copy fails if local execution is disabled and there is no connection slot +COPY another_schema_table(a) FROM PROGRAM 'seq 32'; + -- set the values to originals back ALTER SYSTEM RESET citus.max_cached_conns_per_worker; ALTER SYSTEM RESET citus.distributed_deadlock_detection_factor;