diff --git a/src/backend/columnar/columnar_debug.c b/src/backend/columnar/columnar_debug.c index 220d259fe..e6b19f768 100644 --- a/src/backend/columnar/columnar_debug.c +++ b/src/backend/columnar/columnar_debug.c @@ -115,8 +115,6 @@ columnar_storage_info(PG_FUNCTION_ARGS) RelationGetRelationName(rel)))); } - RelationOpenSmgr(rel); - Datum values[STORAGE_INFO_NATTS] = { 0 }; bool nulls[STORAGE_INFO_NATTS] = { 0 }; diff --git a/src/backend/columnar/columnar_metadata.c b/src/backend/columnar/columnar_metadata.c index 62d64861f..8b9b9efc6 100644 --- a/src/backend/columnar/columnar_metadata.c +++ b/src/backend/columnar/columnar_metadata.c @@ -1433,7 +1433,7 @@ DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple) simple_heap_delete(state->rel, tid); /* execute AFTER ROW DELETE Triggers to enforce constraints */ - ExecARDeleteTriggers(estate, resultRelInfo, tid, NULL, NULL); + ExecARDeleteTriggers_compat(estate, resultRelInfo, tid, NULL, NULL, false); } @@ -1738,11 +1738,10 @@ ColumnarStorageUpdateIfNeeded(Relation rel, bool isUpgrade) return; } - RelationOpenSmgr(rel); - BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + BlockNumber nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); if (nblocks < 2) { - ColumnarStorageInit(rel->rd_smgr, ColumnarMetadataNewStorageId()); + ColumnarStorageInit(RelationGetSmgr(rel), ColumnarMetadataNewStorageId()); return; } diff --git a/src/backend/columnar/columnar_storage.c b/src/backend/columnar/columnar_storage.c index 71fc75ccb..9712e7160 100644 --- a/src/backend/columnar/columnar_storage.c +++ b/src/backend/columnar/columnar_storage.c @@ -44,6 +44,8 @@ #include "storage/bufmgr.h" #include "storage/lmgr.h" +#include "pg_version_compat.h" + #include "columnar/columnar.h" #include "columnar/columnar_storage.h" @@ -354,8 +356,7 @@ ColumnarStorageGetReservedOffset(Relation rel, bool force) bool ColumnarStorageIsCurrent(Relation rel) { - RelationOpenSmgr(rel); - BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + BlockNumber nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); if (nblocks < 2) { @@ -439,8 +440,7 @@ ColumnarStorageReserveData(Relation rel, uint64 amount) PhysicalAddr final = LogicalToPhysical(nextReservation - 1); /* extend with new pages */ - RelationOpenSmgr(rel); - BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + BlockNumber nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); while (nblocks <= final.blockno) { @@ -547,8 +547,7 @@ ColumnarStorageTruncate(Relation rel, uint64 newDataReservation) rel->rd_id, newDataReservation); } - RelationOpenSmgr(rel); - BlockNumber old_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + BlockNumber old_rel_pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); if (old_rel_pages == 0) { /* nothing to do */ @@ -627,8 +626,7 @@ ColumnarOverwriteMetapage(Relation relation, ColumnarMetapage columnarMetapage) static ColumnarMetapage ColumnarMetapageRead(Relation rel, bool force) { - RelationOpenSmgr(rel); - BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + BlockNumber nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); if (nblocks == 0) { /* diff --git a/src/backend/columnar/columnar_tableam.c b/src/backend/columnar/columnar_tableam.c index b6179ac8c..a16210fb7 100644 --- a/src/backend/columnar/columnar_tableam.c +++ b/src/backend/columnar/columnar_tableam.c @@ -881,7 +881,7 @@ columnar_relation_set_new_filenode(Relation rel, *freezeXid = RecentXmin; *minmulti = GetOldestMultiXactId(); - SMgrRelation srel = RelationCreateStorage(*newrnode, persistence); + SMgrRelation srel = RelationCreateStorage_compat(*newrnode, persistence, true); ColumnarStorageInit(srel, ColumnarMetadataNewStorageId()); InitColumnarOptions(rel->rd_id); @@ -913,8 +913,7 @@ columnar_relation_nontransactional_truncate(Relation rel) RelationTruncate(rel, 0); uint64 storageId = ColumnarMetadataNewStorageId(); - RelationOpenSmgr(rel); - ColumnarStorageInit(rel->rd_smgr, storageId); + ColumnarStorageInit(RelationGetSmgr(rel), storageId); } @@ -1136,8 +1135,7 @@ LogRelationStats(Relation rel, int elevel) totalStripeLength += stripe->dataLength; } - RelationOpenSmgr(rel); - uint64 relPages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + uint64 relPages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); RelationCloseSmgr(rel); Datum storageId = DirectFunctionCall1(columnar_relation_storageid, @@ -1239,8 +1237,7 @@ TruncateColumnar(Relation rel, int elevel) uint64 newDataReservation = Max(GetHighestUsedAddress(rel->rd_node) + 1, ColumnarFirstLogicalOffset); - RelationOpenSmgr(rel); - BlockNumber old_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + BlockNumber old_rel_pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); if (!ColumnarStorageTruncate(rel, newDataReservation)) { @@ -1248,8 +1245,7 @@ TruncateColumnar(Relation rel, int elevel) return; } - RelationOpenSmgr(rel); - BlockNumber new_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + BlockNumber new_rel_pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); /* * We can release the exclusive lock as soon as we have truncated. @@ -1784,20 +1780,17 @@ columnar_relation_size(Relation rel, ForkNumber forkNumber) uint64 nblocks = 0; - /* Open it at the smgr level if not already done */ - RelationOpenSmgr(rel); - /* InvalidForkNumber indicates returning the size for all forks */ if (forkNumber == InvalidForkNumber) { for (int i = 0; i < MAX_FORKNUM; i++) { - nblocks += smgrnblocks(rel->rd_smgr, i); + nblocks += smgrnblocks(RelationGetSmgr(rel), i); } } else { - nblocks = smgrnblocks(rel->rd_smgr, forkNumber); + nblocks = smgrnblocks(RelationGetSmgr(rel), forkNumber); } return nblocks * BLCKSZ; @@ -1819,8 +1812,7 @@ columnar_estimate_rel_size(Relation rel, int32 *attr_widths, double *allvisfrac) { CheckCitusColumnarVersion(ERROR); - RelationOpenSmgr(rel); - *pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM); + *pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM); *tuples = ColumnarTableRowCount(rel); /* @@ -2300,18 +2292,30 @@ detoast_values(TupleDesc tupleDesc, Datum *orig_values, bool *isnull) static void ColumnarCheckLogicalReplication(Relation rel) { + bool pubActionInsert = false; + if (!is_publishable_relation(rel)) { return; } +#if PG_VERSION_NUM >= PG_VERSION_15 + { + PublicationDesc pubdesc; + + RelationBuildPublicationDesc(rel, &pubdesc); + pubActionInsert = pubdesc.pubactions.pubinsert; + } +#else if (rel->rd_pubactions == NULL) { GetRelationPublicationActions(rel); Assert(rel->rd_pubactions != NULL); } + pubActionInsert = rel->rd_pubactions->pubinsert; +#endif - if (rel->rd_pubactions->pubinsert) + if (pubActionInsert) { ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), errmsg( diff --git a/src/backend/distributed/commands/collation.c b/src/backend/distributed/commands/collation.c index c284404ce..bc642bed1 100644 --- a/src/backend/distributed/commands/collation.c +++ b/src/backend/distributed/commands/collation.c @@ -10,6 +10,8 @@ */ #include "postgres.h" +#include "pg_version_compat.h" + #include "access/htup_details.h" #include "access/xact.h" #include "catalog/pg_collation.h" @@ -60,12 +62,30 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati Form_pg_collation collationForm = (Form_pg_collation) GETSTRUCT(heapTuple); char collprovider = collationForm->collprovider; - const char *collcollate = NameStr(collationForm->collcollate); - const char *collctype = NameStr(collationForm->collctype); Oid collnamespace = collationForm->collnamespace; const char *collname = NameStr(collationForm->collname); bool collisdeterministic = collationForm->collisdeterministic; +#if PG_VERSION_NUM >= PG_VERSION_15 + bool isnull; + Datum datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_collcollate, + &isnull); + Assert(!isnull); + char *collcollate = TextDatumGetCString(datum); + datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_collctype, &isnull); + Assert(!isnull); + char *collctype = TextDatumGetCString(datum); +#else + + /* + * In versions before 15, collcollate and collctype were type "name". Use + * pstrdup() to match the interface of 15 so that we consistently free the + * result later. + */ + char *collcollate = pstrdup(NameStr(collationForm->collcollate)); + char *collctype = pstrdup(NameStr(collationForm->collctype)); +#endif + if (collowner != NULL) { *collowner = collationForm->collowner; @@ -103,6 +123,9 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati quote_literal_cstr(collctype)); } + pfree(collcollate); + pfree(collctype); + if (!collisdeterministic) { appendStringInfoString(&collationNameDef, ", deterministic = false"); @@ -500,7 +523,7 @@ GenerateBackupNameForCollationCollision(const ObjectAddress *address) return NULL; } Form_pg_collation collationForm = (Form_pg_collation) GETSTRUCT(collationTuple); - Value *namespace = makeString(get_namespace_name(collationForm->collnamespace)); + String *namespace = makeString(get_namespace_name(collationForm->collnamespace)); ReleaseSysCache(collationTuple); while (true) diff --git a/src/backend/distributed/commands/database.c b/src/backend/distributed/commands/database.c index 59902b038..76d74119e 100644 --- a/src/backend/distributed/commands/database.c +++ b/src/backend/distributed/commands/database.c @@ -115,7 +115,7 @@ AlterDatabaseOwnerObjectAddress(Node *node, bool missing_ok) AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); Assert(stmt->objectType == OBJECT_DATABASE); - Oid databaseOid = get_database_oid(strVal((Value *) stmt->object), missing_ok); + Oid databaseOid = get_database_oid(strVal((String *) stmt->object), missing_ok); ObjectAddress address = { 0 }; ObjectAddressSet(address, DatabaseRelationId, databaseOid); diff --git a/src/backend/distributed/commands/extension.c b/src/backend/distributed/commands/extension.c index c1cf06039..e4b720b4c 100644 --- a/src/backend/distributed/commands/extension.c +++ b/src/backend/distributed/commands/extension.c @@ -295,7 +295,7 @@ FilterDistributedExtensions(List *extensionObjectList) { List *extensionNameList = NIL; - Value *objectName = NULL; + String *objectName = NULL; foreach_ptr(objectName, extensionObjectList) { const char *extensionName = strVal(objectName); @@ -334,7 +334,7 @@ ExtensionNameListToObjectAddressList(List *extensionObjectList) { List *extensionObjectAddressList = NIL; - Value *objectName; + String *objectName; foreach_ptr(objectName, extensionObjectList) { /* @@ -513,7 +513,8 @@ MarkExistingObjectDependenciesDistributedIfSupported() ObjectAddress tableAddress = { 0 }; ObjectAddressSet(tableAddress, RelationRelationId, citusTableId); - if (ShouldSyncTableMetadata(citusTableId)) + /* refrain reading the metadata cache for all tables */ + if (ShouldSyncTableMetadataViaCatalog(citusTableId)) { /* we need to pass pointer allocated in the heap */ ObjectAddress *addressPointer = palloc0(sizeof(ObjectAddress)); @@ -671,7 +672,7 @@ IsDropCitusExtensionStmt(Node *parseTree) } /* now that we have a DropStmt, check if citus extension is among the objects to dropped */ - Value *objectName; + String *objectName; foreach_ptr(objectName, dropStmt->objects) { const char *extensionName = strVal(objectName); diff --git a/src/backend/distributed/commands/foreign_server.c b/src/backend/distributed/commands/foreign_server.c index 0777814df..76acf7f66 100644 --- a/src/backend/distributed/commands/foreign_server.c +++ b/src/backend/distributed/commands/foreign_server.c @@ -190,7 +190,7 @@ PreprocessDropForeignServerStmt(Node *node, const char *queryString, Assert(list_length(stmt->objects) == 1); - Value *serverValue = linitial(stmt->objects); + String *serverValue = linitial(stmt->objects); ObjectAddress address = GetObjectAddressByServerName(strVal(serverValue), false); /* unmark distributed server */ @@ -362,7 +362,7 @@ RecreateForeignServerStmt(Oid serverId) static bool NameListHasDistributedServer(List *serverNames) { - Value *serverValue = NULL; + String *serverValue = NULL; foreach_ptr(serverValue, serverNames) { ObjectAddress address = GetObjectAddressByServerName(strVal(serverValue), false); diff --git a/src/backend/distributed/commands/function.c b/src/backend/distributed/commands/function.c index 879aa4770..cda28d375 100644 --- a/src/backend/distributed/commands/function.c +++ b/src/backend/distributed/commands/function.c @@ -490,7 +490,7 @@ GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName, distributionArgumentName++; /* throws error if the input is not an integer */ - distributionArgumentIndex = pg_atoi(distributionArgumentName, 4, 0); + distributionArgumentIndex = pg_strtoint32(distributionArgumentName); if (distributionArgumentIndex < 1 || distributionArgumentIndex > numberOfArgs) { @@ -1893,7 +1893,7 @@ AlterFunctionSchemaStmtObjectAddress(Node *node, bool missing_ok) */ /* the name of the function is the last in the list of names */ - Value *funcNameStr = lfirst(list_tail(names)); + String *funcNameStr = lfirst(list_tail(names)); List *newNames = list_make2(makeString(stmt->newschema), funcNameStr); /* @@ -1938,8 +1938,8 @@ GenerateBackupNameForProcCollision(const ObjectAddress *address) char *newName = palloc0(NAMEDATALEN); char suffix[NAMEDATALEN] = { 0 }; int count = 0; - Value *namespace = makeString(get_namespace_name(get_func_namespace( - address->objectId))); + String *namespace = makeString(get_namespace_name(get_func_namespace( + address->objectId))); char *baseName = get_func_name(address->objectId); int baseLength = strlen(baseName); Oid *argtypes = NULL; diff --git a/src/backend/distributed/commands/index.c b/src/backend/distributed/commands/index.c index 5ff984f66..3e25483b0 100644 --- a/src/backend/distributed/commands/index.c +++ b/src/backend/distributed/commands/index.c @@ -464,7 +464,8 @@ GenerateCreateIndexDDLJob(IndexStmt *createIndexStatement, const char *createInd { DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = CreateIndexStmtGetRelationId(createIndexStatement); + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, + CreateIndexStmtGetRelationId(createIndexStatement)); ddlJob->startNewTransaction = createIndexStatement->concurrent; ddlJob->metadataSyncCommand = createIndexCommand; ddlJob->taskList = CreateIndexTaskList(createIndexStatement); @@ -598,7 +599,7 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand, } DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = IsReindexWithParam_compat(reindexStatement, "concurrently"); ddlJob->metadataSyncCommand = reindexCommand; @@ -695,7 +696,8 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand, MarkInvalidateForeignKeyGraph(); } - ddlJob->targetRelationId = distributedRelationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, + distributedRelationId); /* * We do not want DROP INDEX CONCURRENTLY to commit locally before diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index d2d7d9b23..6036ec00a 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -2009,7 +2009,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation, foreach(columnNameCell, columnNameList) { char *columnName = (char *) lfirst(columnNameCell); - Value *columnNameValue = makeString(columnName); + String *columnNameValue = makeString(columnName); attributeList = lappend(attributeList, columnNameValue); } diff --git a/src/backend/distributed/commands/rename.c b/src/backend/distributed/commands/rename.c index d777c420b..3ece05a0a 100644 --- a/src/backend/distributed/commands/rename.c +++ b/src/backend/distributed/commands/rename.c @@ -127,7 +127,7 @@ PreprocessRenameStmt(Node *node, const char *renameCommand, } DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = tableRelationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, tableRelationId); ddlJob->metadataSyncCommand = renameCommand; ddlJob->taskList = DDLTaskList(tableRelationId, renameCommand); diff --git a/src/backend/distributed/commands/role.c b/src/backend/distributed/commands/role.c index 608dc0060..af0a3a856 100644 --- a/src/backend/distributed/commands/role.c +++ b/src/backend/distributed/commands/role.c @@ -150,7 +150,7 @@ PostprocessAlterRoleStmt(Node *node, const char *queryString) if (encryptedPassword != NULL) { - Value *encryptedPasswordValue = makeString((char *) encryptedPassword); + String *encryptedPasswordValue = makeString((char *) encryptedPassword); option->arg = (Node *) encryptedPasswordValue; } else @@ -741,8 +741,13 @@ makeStringConst(char *str, int location) { A_Const *n = makeNode(A_Const); +#if PG_VERSION_NUM >= PG_VERSION_15 + n->val.sval.type = T_String; + n->val.sval.sval = str; +#else n->val.type = T_String; n->val.val.str = str; +#endif n->location = location; return (Node *) n; @@ -759,8 +764,13 @@ makeIntConst(int val, int location) { A_Const *n = makeNode(A_Const); +#if PG_VERSION_NUM >= PG_VERSION_15 + n->val.ival.type = T_Integer; + n->val.ival.ival = val; +#else n->val.type = T_Integer; n->val.val.ival = val; +#endif n->location = location; return (Node *) n; @@ -777,8 +787,13 @@ makeFloatConst(char *str, int location) { A_Const *n = makeNode(A_Const); +#if PG_VERSION_NUM >= PG_VERSION_15 + n->val.fval.type = T_Float; + n->val.fval.fval = str; +#else n->val.type = T_Float; n->val.val.str = str; +#endif n->location = location; return (Node *) n; diff --git a/src/backend/distributed/commands/schema.c b/src/backend/distributed/commands/schema.c index cdee81349..32aac0106 100644 --- a/src/backend/distributed/commands/schema.c +++ b/src/backend/distributed/commands/schema.c @@ -107,7 +107,7 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString, EnsureSequentialMode(OBJECT_SCHEMA); - Value *schemaVal = NULL; + String *schemaVal = NULL; foreach_ptr(schemaVal, distributedSchemas) { if (SchemaHasDistributedTableWithFKey(strVal(schemaVal))) @@ -288,7 +288,7 @@ FilterDistributedSchemas(List *schemas) { List *distributedSchemas = NIL; - Value *schemaValue = NULL; + String *schemaValue = NULL; foreach_ptr(schemaValue, schemas) { const char *schemaName = strVal(schemaValue); diff --git a/src/backend/distributed/commands/statistics.c b/src/backend/distributed/commands/statistics.c index 79a758c10..6f8e6df54 100644 --- a/src/backend/distributed/commands/statistics.c +++ b/src/backend/distributed/commands/statistics.c @@ -92,7 +92,7 @@ PreprocessCreateStatisticsStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; ddlJob->taskList = DDLTaskList(relationId, ddlCommand); @@ -197,7 +197,7 @@ PreprocessDropStatisticsStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; ddlJob->taskList = DDLTaskList(relationId, ddlCommand); @@ -236,7 +236,7 @@ PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; ddlJob->taskList = DDLTaskList(relationId, ddlCommand); @@ -274,7 +274,7 @@ PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; ddlJob->taskList = DDLTaskList(relationId, ddlCommand); @@ -295,7 +295,7 @@ PostprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString) AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); Assert(stmt->objectType == OBJECT_STATISTIC_EXT); - Value *statName = llast((List *) stmt->object); + String *statName = llast((List *) stmt->object); Oid statsOid = get_statistics_object_oid(list_make2(makeString(stmt->newschema), statName), false); Oid relationId = GetRelIdByStatsOid(statsOid); @@ -328,7 +328,7 @@ AlterStatisticsSchemaStmtObjectAddress(Node *node, bool missingOk) AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); ObjectAddress address = { 0 }; - Value *statName = llast((List *) stmt->object); + String *statName = llast((List *) stmt->object); Oid statsOid = get_statistics_object_oid(list_make2(makeString(stmt->newschema), statName), missingOk); ObjectAddressSet(address, StatisticExtRelationId, statsOid); @@ -376,7 +376,7 @@ PreprocessAlterStatisticsStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; ddlJob->taskList = DDLTaskList(relationId, ddlCommand); @@ -416,7 +416,7 @@ PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->startNewTransaction = false; ddlJob->metadataSyncCommand = ddlCommand; ddlJob->taskList = DDLTaskList(relationId, ddlCommand); diff --git a/src/backend/distributed/commands/table.c b/src/backend/distributed/commands/table.c index 220a4d049..832df667c 100644 --- a/src/backend/distributed/commands/table.c +++ b/src/backend/distributed/commands/table.c @@ -1102,7 +1102,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand, /* fill them here as it is possible to use them in some conditional blocks below */ DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = leftRelationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, leftRelationId); const char *sqlForTaskList = alterTableCommand; if (deparseAT) @@ -1779,7 +1779,7 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString, DDLJob *ddlJob = palloc0(sizeof(DDLJob)); QualifyTreeNode((Node *) stmt); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->metadataSyncCommand = DeparseTreeNode((Node *) stmt); ddlJob->taskList = DDLTaskList(relationId, ddlJob->metadataSyncCommand); return list_make1(ddlJob); diff --git a/src/backend/distributed/commands/trigger.c b/src/backend/distributed/commands/trigger.c index a277cb372..94f4f4cef 100644 --- a/src/backend/distributed/commands/trigger.c +++ b/src/backend/distributed/commands/trigger.c @@ -44,8 +44,8 @@ /* local function forward declarations */ static bool IsCreateCitusTruncateTriggerStmt(CreateTrigStmt *createTriggerStmt); -static Value * GetAlterTriggerDependsTriggerNameValue(AlterObjectDependsStmt * - alterTriggerDependsStmt); +static String * GetAlterTriggerDependsTriggerNameValue(AlterObjectDependsStmt * + alterTriggerDependsStmt); static void ErrorIfUnsupportedDropTriggerCommand(DropStmt *dropTriggerStmt); static RangeVar * GetDropTriggerStmtRelation(DropStmt *dropTriggerStmt); static void ExtractDropStmtTriggerAndRelationName(DropStmt *dropTriggerStmt, @@ -416,7 +416,7 @@ PreprocessAlterTriggerDependsStmt(Node *node, const char *queryString, * workers */ - Value *triggerNameValue = + String *triggerNameValue = GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt); ereport(ERROR, (errmsg( "Triggers \"%s\" on distributed tables and local tables added to metadata " @@ -454,7 +454,7 @@ PostprocessAlterTriggerDependsStmt(Node *node, const char *queryString) EnsureCoordinator(); ErrorOutForTriggerIfNotSupported(relationId); - Value *triggerNameValue = + String *triggerNameValue = GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt); return CitusCreateTriggerCommandDDLJob(relationId, strVal(triggerNameValue), queryString); @@ -476,7 +476,7 @@ AlterTriggerDependsEventExtendNames(AlterObjectDependsStmt *alterTriggerDependsS char **relationName = &(relation->relname); AppendShardIdToName(relationName, shardId); - Value *triggerNameValue = + String *triggerNameValue = GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt); AppendShardIdToName(&strVal(triggerNameValue), shardId); @@ -489,7 +489,7 @@ AlterTriggerDependsEventExtendNames(AlterObjectDependsStmt *alterTriggerDependsS * GetAlterTriggerDependsTriggerName returns Value object for the trigger * name that given AlterObjectDependsStmt is executed for. */ -static Value * +static String * GetAlterTriggerDependsTriggerNameValue(AlterObjectDependsStmt *alterTriggerDependsStmt) { List *triggerObjectNameList = (List *) alterTriggerDependsStmt->object; @@ -503,7 +503,7 @@ GetAlterTriggerDependsTriggerNameValue(AlterObjectDependsStmt *alterTriggerDepen * be the name of the trigger in either before or after standard process * utility. */ - Value *triggerNameValue = llast(triggerObjectNameList); + String *triggerNameValue = llast(triggerObjectNameList); return triggerNameValue; } @@ -642,12 +642,12 @@ DropTriggerEventExtendNames(DropStmt *dropTriggerStmt, char *schemaName, uint64 ExtractDropStmtTriggerAndRelationName(dropTriggerStmt, &triggerName, &relationName); AppendShardIdToName(&triggerName, shardId); - Value *triggerNameValue = makeString(triggerName); + String *triggerNameValue = makeString(triggerName); AppendShardIdToName(&relationName, shardId); - Value *relationNameValue = makeString(relationName); + String *relationNameValue = makeString(relationName); - Value *schemaNameValue = makeString(pstrdup(schemaName)); + String *schemaNameValue = makeString(pstrdup(schemaName)); List *shardTriggerNameList = list_make3(schemaNameValue, relationNameValue, triggerNameValue); @@ -712,7 +712,7 @@ CitusCreateTriggerCommandDDLJob(Oid relationId, char *triggerName, const char *queryString) { DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->metadataSyncCommand = queryString; if (!triggerName) diff --git a/src/backend/distributed/commands/type.c b/src/backend/distributed/commands/type.c index 4973aafd0..e80f5b14e 100644 --- a/src/backend/distributed/commands/type.c +++ b/src/backend/distributed/commands/type.c @@ -878,7 +878,7 @@ AlterTypeSchemaStmtObjectAddress(Node *node, bool missing_ok) */ /* typename is the last in the list of names */ - Value *typeNameStr = lfirst(list_tail(names)); + String *typeNameStr = lfirst(list_tail(names)); /* * we don't error here either, as the error would be not a good user facing diff --git a/src/backend/distributed/commands/utility_hook.c b/src/backend/distributed/commands/utility_hook.c index 9c93f0737..fba205cfc 100644 --- a/src/backend/distributed/commands/utility_hook.c +++ b/src/backend/distributed/commands/utility_hook.c @@ -1044,16 +1044,20 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob) EnsureCoordinator(); - Oid targetRelationId = ddlJob->targetRelationId; + ObjectAddress targetObjectAddress = ddlJob->targetObjectAddress; - if (OidIsValid(targetRelationId)) + if (OidIsValid(targetObjectAddress.classId)) { /* - * Only for ddlJobs that are targetting a relation (table) we want to sync - * its metadata and verify some properties around the table. + * Only for ddlJobs that are targetting an object we want to sync + * its metadata. */ - shouldSyncMetadata = ShouldSyncTableMetadata(targetRelationId); - EnsurePartitionTableNotReplicated(targetRelationId); + shouldSyncMetadata = ShouldSyncUserCommandForObject(targetObjectAddress); + + if (targetObjectAddress.classId == RelationRelationId) + { + EnsurePartitionTableNotReplicated(targetObjectAddress.objectId); + } } bool localExecutionSupported = true; @@ -1304,7 +1308,7 @@ CreateCustomDDLTaskList(Oid relationId, TableDDLCommand *command) } DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = relationId; + ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId); ddlJob->metadataSyncCommand = GetTableDDLCommand(command); ddlJob->taskList = taskList; @@ -1555,7 +1559,7 @@ NodeDDLTaskList(TargetWorkerSet targets, List *commands) } DDLJob *ddlJob = palloc0(sizeof(DDLJob)); - ddlJob->targetRelationId = InvalidOid; + ddlJob->targetObjectAddress = InvalidObjectAddress; ddlJob->metadataSyncCommand = NULL; ddlJob->taskList = list_make1(task); diff --git a/src/backend/distributed/commands/vacuum.c b/src/backend/distributed/commands/vacuum.c index 7f1e04f76..9b1e0bfb3 100644 --- a/src/backend/distributed/commands/vacuum.c +++ b/src/backend/distributed/commands/vacuum.c @@ -432,7 +432,7 @@ DeparseVacuumColumnNames(List *columnNameList) appendStringInfoString(columnNames, " ("); - Value *columnName = NULL; + String *columnName = NULL; foreach_ptr(columnName, columnNameList) { appendStringInfo(columnNames, "%s,", strVal(columnName)); diff --git a/src/backend/distributed/deparser/deparse_database_stmts.c b/src/backend/distributed/deparser/deparse_database_stmts.c index 0ebc69238..b72787993 100644 --- a/src/backend/distributed/deparser/deparse_database_stmts.c +++ b/src/backend/distributed/deparser/deparse_database_stmts.c @@ -11,6 +11,8 @@ #include "postgres.h" +#include "pg_version_compat.h" + #include "catalog/namespace.h" #include "lib/stringinfo.h" #include "nodes/parsenodes.h" @@ -44,6 +46,6 @@ AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt) appendStringInfo(buf, "ALTER DATABASE %s OWNER TO %s;", - quote_identifier(strVal((Value *) stmt->object)), + quote_identifier(strVal((String *) stmt->object)), RoleSpecString(stmt->newowner, true)); } diff --git a/src/backend/distributed/deparser/deparse_foreign_server_stmts.c b/src/backend/distributed/deparser/deparse_foreign_server_stmts.c index 62c5f98c8..805f24f90 100644 --- a/src/backend/distributed/deparser/deparse_foreign_server_stmts.c +++ b/src/backend/distributed/deparser/deparse_foreign_server_stmts.c @@ -223,7 +223,7 @@ AppendDropForeignServerStmt(StringInfo buf, DropStmt *stmt) static void AppendServerNames(StringInfo buf, DropStmt *stmt) { - Value *serverValue = NULL; + String *serverValue = NULL; foreach_ptr(serverValue, stmt->objects) { const char *serverString = quote_identifier(strVal(serverValue)); diff --git a/src/backend/distributed/deparser/deparse_function_stmts.c b/src/backend/distributed/deparser/deparse_function_stmts.c index d58faabfb..93bb65b4d 100644 --- a/src/backend/distributed/deparser/deparse_function_stmts.c +++ b/src/backend/distributed/deparser/deparse_function_stmts.c @@ -396,18 +396,18 @@ AppendVarSetValue(StringInfo buf, VariableSetStmt *setStmt) appendStringInfo(buf, " SET %s =", quote_identifier(setStmt->name)); } - Value value = varArgConst->val; - switch (value.type) + Node *value = (Node *) &varArgConst->val; + switch (value->type) { case T_Integer: { - appendStringInfo(buf, " %d", intVal(&value)); + appendStringInfo(buf, " %d", intVal(value)); break; } case T_Float: { - appendStringInfo(buf, " %s", strVal(&value)); + appendStringInfo(buf, " %s", strVal(value)); break; } @@ -428,7 +428,7 @@ AppendVarSetValue(StringInfo buf, VariableSetStmt *setStmt) Datum interval = DirectFunctionCall3(interval_in, - CStringGetDatum(strVal(&value)), + CStringGetDatum(strVal(value)), ObjectIdGetDatum(InvalidOid), Int32GetDatum(typmod)); @@ -440,7 +440,7 @@ AppendVarSetValue(StringInfo buf, VariableSetStmt *setStmt) else { appendStringInfo(buf, " %s", quote_literal_cstr(strVal( - &value))); + value))); } break; } diff --git a/src/backend/distributed/deparser/deparse_schema_stmts.c b/src/backend/distributed/deparser/deparse_schema_stmts.c index ebc76d5e8..21ea16fbe 100644 --- a/src/backend/distributed/deparser/deparse_schema_stmts.c +++ b/src/backend/distributed/deparser/deparse_schema_stmts.c @@ -126,7 +126,7 @@ AppendDropSchemaStmt(StringInfo buf, DropStmt *stmt) appendStringInfoString(buf, "IF EXISTS "); } - Value *schemaValue = NULL; + String *schemaValue = NULL; foreach_ptr(schemaValue, stmt->objects) { const char *schemaString = quote_identifier(strVal(schemaValue)); diff --git a/src/backend/distributed/deparser/deparse_statistics_stmts.c b/src/backend/distributed/deparser/deparse_statistics_stmts.c index fb1e67977..90828cc67 100644 --- a/src/backend/distributed/deparser/deparse_statistics_stmts.c +++ b/src/backend/distributed/deparser/deparse_statistics_stmts.c @@ -200,10 +200,10 @@ AppendAlterStatisticsOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt) static void AppendStatisticsName(StringInfo buf, CreateStatsStmt *stmt) { - Value *schemaNameVal = (Value *) linitial(stmt->defnames); + String *schemaNameVal = (String *) linitial(stmt->defnames); const char *schemaName = quote_identifier(strVal(schemaNameVal)); - Value *statNameVal = (Value *) lsecond(stmt->defnames); + String *statNameVal = (String *) lsecond(stmt->defnames); const char *statName = quote_identifier(strVal(statNameVal)); appendStringInfo(buf, "%s.%s", schemaName, statName); @@ -220,7 +220,7 @@ AppendStatTypes(StringInfo buf, CreateStatsStmt *stmt) appendStringInfoString(buf, " ("); - Value *statType = NULL; + String *statType = NULL; foreach_ptr(statType, stmt->stat_types) { appendStringInfoString(buf, strVal(statType)); diff --git a/src/backend/distributed/deparser/deparse_text_search.c b/src/backend/distributed/deparser/deparse_text_search.c index 43d162678..e0c750d0d 100644 --- a/src/backend/distributed/deparser/deparse_text_search.c +++ b/src/backend/distributed/deparser/deparse_text_search.c @@ -464,7 +464,7 @@ DeparseTextSearchDictionaryCommentStmt(Node *node) static void AppendStringInfoTokentypeList(StringInfo buf, List *tokentypes) { - Value *tokentype = NULL; + String *tokentype = NULL; bool first = true; foreach_ptr(tokentype, tokentypes) { diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index 2b32916ee..347bd4d35 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -171,7 +171,6 @@ #include "storage/fd.h" #include "storage/latch.h" #include "utils/builtins.h" -#include "utils/int8.h" #include "utils/lsyscache.h" #include "utils/memutils.h" #include "utils/syscache.h" @@ -4513,7 +4512,7 @@ ReceiveResults(WorkerSession *session, bool storeRows) /* if there are multiple replicas, make sure to consider only one */ if (storeRows && *currentAffectedTupleString != '\0') { - scanint8(currentAffectedTupleString, false, ¤tAffectedTupleCount); + currentAffectedTupleCount = pg_strtoint64(currentAffectedTupleString); Assert(currentAffectedTupleCount >= 0); execution->rowsProcessed += currentAffectedTupleCount; } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index 7c0426c0a..306b7f3e1 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -207,19 +207,6 @@ ExecuteLocalTaskListExtended(List *taskList, uint64 totalRowsProcessed = 0; ParamListInfo paramListInfo = copyParamList(orig_paramListInfo); - /* - * Even if we are executing local tasks, we still enable - * coordinated transaction. This is because - * (a) we might be in a transaction, and the next commands may - * require coordinated transaction - * (b) we might be executing some tasks locally and the others - * via remote execution - * - * Also, there is no harm enabling coordinated transaction even if - * we only deal with local tasks in the transaction. - */ - UseCoordinatedTransaction(); - LocalExecutorLevel++; PG_TRY(); { diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 435ab1ea5..43cdb4edf 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -617,7 +617,8 @@ RewriteRawQueryStmt(RawStmt *rawStmt, const char *queryString, Oid *paramOids, i numParams) { List *queryTreeList = - pg_analyze_and_rewrite(rawStmt, queryString, paramOids, numParams, NULL); + pg_analyze_and_rewrite_fixedparams(rawStmt, queryString, paramOids, numParams, + NULL); if (list_length(queryTreeList) != 1) { diff --git a/src/backend/distributed/metadata/metadata_cache.c b/src/backend/distributed/metadata/metadata_cache.c index 63c2f8695..af80df0c4 100644 --- a/src/backend/distributed/metadata/metadata_cache.c +++ b/src/backend/distributed/metadata/metadata_cache.c @@ -7,7 +7,9 @@ *------------------------------------------------------------------------- */ +#include "postgres.h" #include "distributed/pg_version_constants.h" +#include "pg_version_compat.h" #include "stdint.h" #include "postgres.h" @@ -2864,8 +2866,8 @@ CurrentUserName(void) Oid LookupTypeOid(char *schemaNameSting, char *typeNameString) { - Value *schemaName = makeString(schemaNameSting); - Value *typeName = makeString(typeNameString); + String *schemaName = makeString(schemaNameSting); + String *typeName = makeString(typeNameString); List *qualifiedName = list_make2(schemaName, typeName); TypeName *enumTypeName = makeTypeNameFromNameList(qualifiedName); diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index 4b62afc3b..ee9634617 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -425,6 +425,22 @@ ClusterHasKnownMetadataWorkers() } +/* + * ShouldSyncUserCommandForObject checks if the user command should be synced to the + * worker nodes for the given object. + */ +bool +ShouldSyncUserCommandForObject(ObjectAddress objectAddress) +{ + if (objectAddress.classId == RelationRelationId) + { + return ShouldSyncTableMetadata(objectAddress.objectId); + } + + return false; +} + + /* * ShouldSyncTableMetadata checks if the metadata of a distributed table should be * propagated to metadata workers, i.e. the table is a hash distributed table or diff --git a/src/backend/distributed/metadata/node_metadata.c b/src/backend/distributed/metadata/node_metadata.c index c66b0d3e2..f74305012 100644 --- a/src/backend/distributed/metadata/node_metadata.c +++ b/src/backend/distributed/metadata/node_metadata.c @@ -116,7 +116,7 @@ static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort); static bool NodeIsLocal(WorkerNode *worker); static void SetLockTimeoutLocally(int32 lock_cooldown); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort); -static bool UnsetMetadataSyncedForAll(void); +static bool UnsetMetadataSyncedForAllWorkers(void); static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode, int columnIndex, Datum value); @@ -535,7 +535,7 @@ citus_disable_node(PG_FUNCTION_ARGS) * metadata at this point. Instead, we defer that to citus_activate_node() * where we expect all nodes up and running. */ - if (UnsetMetadataSyncedForAll()) + if (UnsetMetadataSyncedForAllWorkers()) { TriggerNodeMetadataSyncOnCommit(); } @@ -1319,7 +1319,7 @@ citus_update_node(PG_FUNCTION_ARGS) * early, but that's fine, since this will start a retry loop with * 5 second intervals until sync is complete. */ - if (UnsetMetadataSyncedForAll()) + if (UnsetMetadataSyncedForAllWorkers()) { TriggerNodeMetadataSyncOnCommit(); } @@ -2646,15 +2646,15 @@ DatumToString(Datum datum, Oid dataType) /* - * UnsetMetadataSyncedForAll sets the metadatasynced column of all metadata - * nodes to false. It returns true if it updated at least a node. + * UnsetMetadataSyncedForAllWorkers sets the metadatasynced column of all metadata + * worker nodes to false. It returns true if it updated at least a node. */ static bool -UnsetMetadataSyncedForAll(void) +UnsetMetadataSyncedForAllWorkers(void) { bool updatedAtLeastOne = false; - ScanKeyData scanKey[2]; - int scanKeyCount = 2; + ScanKeyData scanKey[3]; + int scanKeyCount = 3; bool indexOK = false; /* @@ -2669,6 +2669,11 @@ UnsetMetadataSyncedForAll(void) ScanKeyInit(&scanKey[1], Anum_pg_dist_node_metadatasynced, BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(true)); + /* coordinator always has the up to date metadata */ + ScanKeyInit(&scanKey[2], Anum_pg_dist_node_groupid, + BTGreaterStrategyNumber, F_INT4GT, + Int32GetDatum(COORDINATOR_GROUP_ID)); + CatalogIndexState indstate = CatalogOpenIndexes(relation); SysScanDesc scanDescriptor = systable_beginscan(relation, diff --git a/src/backend/distributed/metadata/pg_get_object_address_12_13_14.c b/src/backend/distributed/metadata/pg_get_object_address_13_14_15.c similarity index 98% rename from src/backend/distributed/metadata/pg_get_object_address_12_13_14.c rename to src/backend/distributed/metadata/pg_get_object_address_13_14_15.c index 47e401499..3a7c1fc49 100644 --- a/src/backend/distributed/metadata/pg_get_object_address_12_13_14.c +++ b/src/backend/distributed/metadata/pg_get_object_address_13_14_15.c @@ -1,6 +1,6 @@ /*------------------------------------------------------------------------- * - * pg_get_object_address_12_13_14.c + * pg_get_object_address_13_14_15.c * * Copied functions from Postgres pg_get_object_address with acl/owner check. * Since we need to use intermediate data types Relation and Node from @@ -40,11 +40,6 @@ static void ErrorIfCurrentUserCanNotDistributeObject(ObjectType type, Relation *relation); static List * textarray_to_strvaluelist(ArrayType *arr); -/* It is defined on PG >= 13 versions by default */ -#if PG_VERSION_NUM < PG_VERSION_13 - #define TYPALIGN_INT 'i' -#endif - /* * PgGetObjectAddress gets the object address. This function is mostly copied from * pg_get_object_address of the PG code. We need to copy that function to use @@ -283,6 +278,9 @@ PgGetObjectAddress(char *ttype, ArrayType *namearr, ArrayType *argsarr) case OBJECT_FDW: case OBJECT_FOREIGN_SERVER: case OBJECT_LANGUAGE: +#if PG_VERSION_NUM >= PG_VERSION_15 + case OBJECT_PARAMETER_ACL: +#endif case OBJECT_PUBLICATION: case OBJECT_ROLE: case OBJECT_SCHEMA: @@ -320,6 +318,9 @@ PgGetObjectAddress(char *ttype, ArrayType *namearr, ArrayType *argsarr) break; } +#if PG_VERSION_NUM >= PG_VERSION_15 + case OBJECT_PUBLICATION_NAMESPACE: +#endif case OBJECT_USER_MAPPING: { objnode = (Node *) list_make2(linitial(name), linitial(args)); diff --git a/src/backend/distributed/operations/shard_rebalancer.c b/src/backend/distributed/operations/shard_rebalancer.c index 16ee50c52..ecb0d6673 100644 --- a/src/backend/distributed/operations/shard_rebalancer.c +++ b/src/backend/distributed/operations/shard_rebalancer.c @@ -54,7 +54,6 @@ #include "storage/lmgr.h" #include "utils/builtins.h" #include "utils/fmgroids.h" -#include "utils/int8.h" #include "utils/json.h" #include "utils/lsyscache.h" #include "utils/memutils.h" @@ -1396,9 +1395,9 @@ GetShardStatistics(MultiConnection *connection, HTAB *shardIds) for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) { char *shardIdString = PQgetvalue(result, rowIndex, 0); - uint64 shardId = pg_strtouint64(shardIdString, NULL, 10); + uint64 shardId = strtou64(shardIdString, NULL, 10); char *sizeString = PQgetvalue(result, rowIndex, 1); - uint64 totalSize = pg_strtouint64(sizeString, NULL, 10); + uint64 totalSize = strtou64(sizeString, NULL, 10); ShardStatistics *statistics = hash_search(shardStatistics, &shardId, HASH_ENTER, NULL); diff --git a/src/backend/distributed/operations/stage_protocol.c b/src/backend/distributed/operations/stage_protocol.c index d6e9c0f2a..8f77205cb 100644 --- a/src/backend/distributed/operations/stage_protocol.c +++ b/src/backend/distributed/operations/stage_protocol.c @@ -923,7 +923,7 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam } errno = 0; - uint64 tableSize = pg_strtouint64(tableSizeString, &tableSizeStringEnd, 0); + uint64 tableSize = strtou64(tableSizeString, &tableSizeStringEnd, 0); if (errno != 0 || (*tableSizeStringEnd) != '\0') { PQclear(queryResult); diff --git a/src/backend/distributed/planner/cte_inline.c b/src/backend/distributed/planner/cte_inline.c index 4a3ba156f..2356ebf48 100644 --- a/src/backend/distributed/planner/cte_inline.c +++ b/src/backend/distributed/planner/cte_inline.c @@ -12,6 +12,7 @@ *------------------------------------------------------------------------- */ #include "postgres.h" +#include "pg_version_compat.h" #include "distributed/pg_version_constants.h" #include "distributed/cte_inline.h" @@ -309,7 +310,7 @@ inline_cte_walker(Node *node, inline_cte_walker_context *context) */ if (columnAliasCount >= columnIndex) { - Value *columnAlias = (Value *) list_nth(columnAliasList, columnIndex - 1); + String *columnAlias = (String *) list_nth(columnAliasList, columnIndex - 1); Assert(IsA(columnAlias, String)); TargetEntry *targetEntry = list_nth(rte->subquery->targetList, columnIndex - 1); diff --git a/src/backend/distributed/planner/distributed_planner.c b/src/backend/distributed/planner/distributed_planner.c index 6e053cecd..39f6c0b63 100644 --- a/src/backend/distributed/planner/distributed_planner.c +++ b/src/backend/distributed/planner/distributed_planner.c @@ -1065,7 +1065,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi /* - * EnsurePartitionTableNotReplicated errors out if the infput relation is + * EnsurePartitionTableNotReplicated errors out if the input relation is * a partition table and the table has a replication factor greater than * one. * @@ -1353,7 +1353,7 @@ FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan) TargetEntry *targetEntry = NULL; foreach_ptr(targetEntry, customScan->scan.plan.targetlist) { - Value *columnName = makeString(targetEntry->resname); + String *columnName = makeString(targetEntry->resname); columnNameList = lappend(columnNameList, columnName); } diff --git a/src/backend/distributed/planner/fast_path_router_planner.c b/src/backend/distributed/planner/fast_path_router_planner.c index 8a2d87fe7..5d02be07c 100644 --- a/src/backend/distributed/planner/fast_path_router_planner.c +++ b/src/backend/distributed/planner/fast_path_router_planner.c @@ -102,15 +102,15 @@ PlannedStmt * GeneratePlaceHolderPlannedStmt(Query *parse) { PlannedStmt *result = makeNode(PlannedStmt); - SeqScan *seqScanNode = makeNode(SeqScan); - Plan *plan = &seqScanNode->plan; + Scan *scanNode = makeNode(Scan); + Plan *plan = &scanNode->plan; Node *distKey PG_USED_FOR_ASSERTS_ONLY = NULL; AssertArg(FastPathRouterQuery(parse, &distKey)); /* there is only a single relation rte */ - seqScanNode->scanrelid = 1; + scanNode->scanrelid = 1; plan->targetlist = copyObject(FetchStatementTargetList((Node *) parse)); diff --git a/src/backend/distributed/planner/multi_explain.c b/src/backend/distributed/planner/multi_explain.c index 4c6370e5a..a807085af 100644 --- a/src/backend/distributed/planner/multi_explain.c +++ b/src/backend/distributed/planner/multi_explain.c @@ -1062,8 +1062,8 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS) } /* resolve OIDs of unknown (user-defined) types */ - Query *analyzedQuery = parse_analyze_varparams(parseTree, queryString, - ¶mTypes, &numParams); + Query *analyzedQuery = parse_analyze_varparams_compat(parseTree, queryString, + ¶mTypes, &numParams, NULL); #if PG_VERSION_NUM >= PG_VERSION_14 diff --git a/src/backend/distributed/planner/multi_physical_planner.c b/src/backend/distributed/planner/multi_physical_planner.c index a26bf158d..b8d87c4b7 100644 --- a/src/backend/distributed/planner/multi_physical_planner.c +++ b/src/backend/distributed/planner/multi_physical_planner.c @@ -798,7 +798,7 @@ DerivedColumnNameList(uint32 columnCount, uint64 generatingJobId) appendStringInfo(columnName, UINT64_FORMAT "_", generatingJobId); appendStringInfo(columnName, "%u", columnIndex); - Value *columnValue = makeString(columnName->data); + String *columnValue = makeString(columnName->data); columnNameList = lappend(columnNameList, columnValue); } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 017b46149..7c57a77f2 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -151,7 +151,7 @@ static Job * RouterJob(Query *originalQuery, static bool RelationPrunesToMultipleShards(List *relationShardList); static void NormalizeMultiRowInsertTargetList(Query *query); static void AppendNextDummyColReference(Alias *expendedReferenceNames); -static Value * MakeDummyColumnString(int dummyColumnId); +static String * MakeDummyColumnString(int dummyColumnId); static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError); static List * GroupInsertValuesByShardId(List *insertValuesList); static List * ExtractInsertValuesList(Query *query, Var *partitionColumn); @@ -3249,7 +3249,7 @@ AppendNextDummyColReference(Alias *expendedReferenceNames) { int existingColReferences = list_length(expendedReferenceNames->colnames); int nextColReferenceId = existingColReferences + 1; - Value *missingColumnString = MakeDummyColumnString(nextColReferenceId); + String *missingColumnString = MakeDummyColumnString(nextColReferenceId); expendedReferenceNames->colnames = lappend(expendedReferenceNames->colnames, missingColumnString); } @@ -3259,12 +3259,12 @@ AppendNextDummyColReference(Alias *expendedReferenceNames) * MakeDummyColumnString returns a String (Value) object by appending given * integer to end of the "column" string. */ -static Value * +static String * MakeDummyColumnString(int dummyColumnId) { StringInfo dummyColumnStringInfo = makeStringInfo(); appendStringInfo(dummyColumnStringInfo, "column%d", dummyColumnId); - Value *dummyColumnString = makeString(dummyColumnStringInfo->data); + String *dummyColumnString = makeString(dummyColumnStringInfo->data); return dummyColumnString; } diff --git a/src/backend/distributed/planner/recursive_planning.c b/src/backend/distributed/planner/recursive_planning.c index 9138b1b80..e84c821fa 100644 --- a/src/backend/distributed/planner/recursive_planning.c +++ b/src/backend/distributed/planner/recursive_planning.c @@ -1952,7 +1952,7 @@ BuildReadIntermediateResultsQuery(List *targetEntryList, List *columnAliasList, */ if (columnAliasCount >= columnNumber) { - Value *columnAlias = (Value *) list_nth(columnAliasList, columnNumber - 1); + String *columnAlias = (String *) list_nth(columnAliasList, columnNumber - 1); Assert(IsA(columnAlias, String)); newTargetEntry->resname = strVal(columnAlias); } diff --git a/src/backend/distributed/relay/relay_event_utility.c b/src/backend/distributed/relay/relay_event_utility.c index 7388ff383..8f4821bc1 100644 --- a/src/backend/distributed/relay/relay_event_utility.c +++ b/src/backend/distributed/relay/relay_event_utility.c @@ -326,8 +326,8 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) if (objectType == OBJECT_TABLE || objectType == OBJECT_INDEX || objectType == OBJECT_FOREIGN_TABLE || objectType == OBJECT_FOREIGN_SERVER) { - Value *relationSchemaNameValue = NULL; - Value *relationNameValue = NULL; + String *relationSchemaNameValue = NULL; + String *relationNameValue = NULL; uint32 dropCount = list_length(dropStmt->objects); if (dropCount > 1) @@ -381,11 +381,11 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId) /* prefix with schema name if it is not added already */ if (relationSchemaNameValue == NULL) { - Value *schemaNameValue = makeString(pstrdup(schemaName)); + String *schemaNameValue = makeString(pstrdup(schemaName)); relationNameList = lcons(schemaNameValue, relationNameList); } - char **relationName = &(relationNameValue->val.str); + char **relationName = &(strVal(relationNameValue)); AppendShardIdToName(relationName, shardId); } else if (objectType == OBJECT_POLICY) @@ -750,10 +750,10 @@ UpdateWholeRowColumnReferencesWalker(Node *node, uint64 *shardId) * extend the penultimate element with the shardId. */ int colrefFieldCount = list_length(columnRef->fields); - Value *relnameValue = list_nth(columnRef->fields, colrefFieldCount - 2); + String *relnameValue = list_nth(columnRef->fields, colrefFieldCount - 2); Assert(IsA(relnameValue, String)); - AppendShardIdToName(&relnameValue->val.str, *shardId); + AppendShardIdToName(&strVal(relnameValue), *shardId); } /* might be more than one ColumnRef to visit */ diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index 00b541968..fddd1b49d 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -159,9 +159,9 @@ static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra, static bool WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source); static bool WarnIfReplicationModelIsSet(int *newval, void **extra, GucSource source); static bool NoticeIfSubqueryPushdownEnabled(bool *newval, void **extra, GucSource source); -static bool HideShardsFromAppNamePrefixesCheckHook(char **newval, void **extra, - GucSource source); -static void HideShardsFromAppNamePrefixesAssignHook(const char *newval, void *extra); +static bool ShowShardsForAppNamePrefixesCheckHook(char **newval, void **extra, + GucSource source); +static void ShowShardsForAppNamePrefixesAssignHook(const char *newval, void *extra); static void ApplicationNameAssignHook(const char *newval, void *extra); static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source); static void NodeConninfoGucAssignHook(const char *newval, void *extra); @@ -1174,24 +1174,6 @@ RegisterCitusConfigVariables(void) GUC_NO_SHOW_ALL, NULL, NULL, NULL); - DefineCustomStringVariable( - "citus.hide_shards_from_app_name_prefixes", - gettext_noop("If application_name starts with one of these values, hide shards"), - gettext_noop("Citus places distributed tables and shards in the same schema. " - "That can cause confusion when inspecting the list of tables on " - "a node with shards. This GUC can be used to hide the shards from " - "pg_class for certain applications based on the application_name " - "of the connection. The default is *, which hides shards from all " - "applications. This behaviour can be overridden using the " - "citus.override_table_visibility setting"), - &HideShardsFromAppNamePrefixes, - "*", - PGC_USERSET, - GUC_STANDARD, - HideShardsFromAppNamePrefixesCheckHook, - HideShardsFromAppNamePrefixesAssignHook, - NULL); - DefineCustomIntVariable( "citus.isolation_test_session_process_id", NULL, @@ -1716,6 +1698,25 @@ RegisterCitusConfigVariables(void) GUC_STANDARD, NULL, NULL, NULL); + DefineCustomStringVariable( + "citus.show_shards_for_app_name_prefixes", + gettext_noop("If application_name starts with one of these values, show shards"), + gettext_noop("Citus places distributed tables and shards in the same schema. " + "That can cause confusion when inspecting the list of tables on " + "a node with shards. By default the shards are hidden from " + "pg_class. This GUC can be used to show the shards to certain " + "applications based on the application_name of the connection. " + "The default is empty string, which hides shards from all " + "applications. This behaviour can be overridden using the " + "citus.override_table_visibility setting"), + &ShowShardsForAppNamePrefixes, + "", + PGC_USERSET, + GUC_STANDARD, + ShowShardsForAppNamePrefixesCheckHook, + ShowShardsForAppNamePrefixesAssignHook, + NULL); + DefineCustomBoolVariable( "citus.sort_returning", gettext_noop("Sorts the RETURNING clause to get consistent test output"), @@ -1985,12 +1986,12 @@ WarnIfReplicationModelIsSet(int *newval, void **extra, GucSource source) /* - * HideShardsFromAppNamePrefixesCheckHook ensures that the - * citus.hide_shards_from_app_name_prefixes holds a valid list of application_name + * ShowShardsForAppNamePrefixesCheckHook ensures that the + * citus.show_shards_for_app_name_prefixes holds a valid list of application_name * values. */ static bool -HideShardsFromAppNamePrefixesCheckHook(char **newval, void **extra, GucSource source) +ShowShardsForAppNamePrefixesCheckHook(char **newval, void **extra, GucSource source) { List *prefixList = NIL; @@ -2020,7 +2021,7 @@ HideShardsFromAppNamePrefixesCheckHook(char **newval, void **extra, GucSource so if (strcmp(prefixAscii, appNamePrefix) != 0) { - GUC_check_errdetail("prefix %s in citus.hide_shards_from_app_name_prefixes " + GUC_check_errdetail("prefix %s in citus.show_shards_for_app_name_prefixes " "contains non-ascii characters", appNamePrefix); return false; } @@ -2031,12 +2032,12 @@ HideShardsFromAppNamePrefixesCheckHook(char **newval, void **extra, GucSource so /* - * HideShardsFromAppNamePrefixesAssignHook ensures changes to - * citus.hide_shards_from_app_name_prefixes are reflected in the decision + * ShowShardsForAppNamePrefixesAssignHook ensures changes to + * citus.show_shards_for_app_name_prefixes are reflected in the decision * whether or not to show shards. */ static void -HideShardsFromAppNamePrefixesAssignHook(const char *newval, void *extra) +ShowShardsForAppNamePrefixesAssignHook(const char *newval, void *extra) { ResetHideShardsDecision(); } diff --git a/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql b/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql index 7f39b5980..54fdd2f44 100644 --- a/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql +++ b/src/backend/distributed/sql/citus--11.0-1--11.0-2.sql @@ -1 +1,2 @@ --- bump version to 11.0-2 +#include "udfs/citus_shards_on_worker/11.0-2.sql" +#include "udfs/citus_shard_indexes_on_worker/11.0-2.sql" diff --git a/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql b/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql index 163dca315..fca765ba7 100644 --- a/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql +++ b/src/backend/distributed/sql/downgrades/citus--11.0-2--11.0-1.sql @@ -1 +1,2 @@ --- bump down version to 11.0-1 +#include "../udfs/citus_shards_on_worker/11.0-1.sql" +#include "../udfs/citus_shard_indexes_on_worker/11.0-1.sql" diff --git a/src/backend/distributed/sql/udfs/citus_shard_indexes_on_worker/11.0-2.sql b/src/backend/distributed/sql/udfs/citus_shard_indexes_on_worker/11.0-2.sql new file mode 100644 index 000000000..fd4684b18 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_shard_indexes_on_worker/11.0-2.sql @@ -0,0 +1,39 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_indexes_on_worker( + OUT schema_name name, + OUT index_name name, + OUT table_type text, + OUT owner_name name, + OUT shard_name name) + RETURNS SETOF record + LANGUAGE plpgsql + SET citus.show_shards_for_app_name_prefixes = '*' + AS $$ +BEGIN + -- this is the query that \di produces, except pg_table_is_visible + -- is replaced with pg_catalog.relation_is_a_known_shard(c.oid) + RETURN QUERY + SELECT n.nspname as "Schema", + c.relname as "Name", + CASE c.relkind WHEN 'r' THEN 'table' WHEN 'v' THEN 'view' WHEN 'm' THEN 'materialized view' WHEN 'i' THEN 'index' WHEN 'S' THEN 'sequence' WHEN 's' THEN 'special' WHEN 'f' THEN 'foreign table' WHEN 'p' THEN 'table' END as "Type", + pg_catalog.pg_get_userbyid(c.relowner) as "Owner", + c2.relname as "Table" + FROM pg_catalog.pg_class c + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + LEFT JOIN pg_catalog.pg_index i ON i.indexrelid = c.oid + LEFT JOIN pg_catalog.pg_class c2 ON i.indrelid = c2.oid + WHERE c.relkind IN ('i','') + AND n.nspname <> 'pg_catalog' + AND n.nspname <> 'information_schema' + AND n.nspname !~ '^pg_toast' + AND pg_catalog.relation_is_a_known_shard(c.oid) + ORDER BY 1,2; +END; +$$; + +CREATE OR REPLACE VIEW pg_catalog.citus_shard_indexes_on_worker AS + SELECT schema_name as "Schema", + index_name as "Name", + table_type as "Type", + owner_name as "Owner", + shard_name as "Table" + FROM pg_catalog.citus_shard_indexes_on_worker() s; diff --git a/src/backend/distributed/sql/udfs/citus_shard_indexes_on_worker/latest.sql b/src/backend/distributed/sql/udfs/citus_shard_indexes_on_worker/latest.sql index d98cdafe5..fd4684b18 100644 --- a/src/backend/distributed/sql/udfs/citus_shard_indexes_on_worker/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_shard_indexes_on_worker/latest.sql @@ -6,7 +6,7 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_indexes_on_worker( OUT shard_name name) RETURNS SETOF record LANGUAGE plpgsql - SET citus.hide_shards_from_app_name_prefixes = '' + SET citus.show_shards_for_app_name_prefixes = '*' AS $$ BEGIN -- this is the query that \di produces, except pg_table_is_visible diff --git a/src/backend/distributed/sql/udfs/citus_shards_on_worker/11.0-2.sql b/src/backend/distributed/sql/udfs/citus_shards_on_worker/11.0-2.sql new file mode 100644 index 000000000..dbb7498e8 --- /dev/null +++ b/src/backend/distributed/sql/udfs/citus_shards_on_worker/11.0-2.sql @@ -0,0 +1,34 @@ +CREATE OR REPLACE FUNCTION pg_catalog.citus_shards_on_worker( + OUT schema_name name, + OUT shard_name name, + OUT table_type text, + OUT owner_name name) + RETURNS SETOF record + LANGUAGE plpgsql + SET citus.show_shards_for_app_name_prefixes = '*' + AS $$ +BEGIN + -- this is the query that \d produces, except pg_table_is_visible + -- is replaced with pg_catalog.relation_is_a_known_shard(c.oid) + RETURN QUERY + SELECT n.nspname as "Schema", + c.relname as "Name", + CASE c.relkind WHEN 'r' THEN 'table' WHEN 'v' THEN 'view' WHEN 'm' THEN 'materialized view' WHEN 'i' THEN 'index' WHEN 'S' THEN 'sequence' WHEN 's' THEN 'special' WHEN 'f' THEN 'foreign table' WHEN 'p' THEN 'table' END as "Type", + pg_catalog.pg_get_userbyid(c.relowner) as "Owner" + FROM pg_catalog.pg_class c + LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace + WHERE c.relkind IN ('r','p','v','m','S','f','') + AND n.nspname <> 'pg_catalog' + AND n.nspname <> 'information_schema' + AND n.nspname !~ '^pg_toast' + AND pg_catalog.relation_is_a_known_shard(c.oid) + ORDER BY 1,2; +END; +$$; + +CREATE OR REPLACE VIEW pg_catalog.citus_shards_on_worker AS + SELECT schema_name as "Schema", + shard_name as "Name", + table_type as "Type", + owner_name as "Owner" + FROM pg_catalog.citus_shards_on_worker() s; diff --git a/src/backend/distributed/sql/udfs/citus_shards_on_worker/latest.sql b/src/backend/distributed/sql/udfs/citus_shards_on_worker/latest.sql index 895c92ae8..dbb7498e8 100644 --- a/src/backend/distributed/sql/udfs/citus_shards_on_worker/latest.sql +++ b/src/backend/distributed/sql/udfs/citus_shards_on_worker/latest.sql @@ -5,7 +5,7 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_shards_on_worker( OUT owner_name name) RETURNS SETOF record LANGUAGE plpgsql - SET citus.hide_shards_from_app_name_prefixes = '' + SET citus.show_shards_for_app_name_prefixes = '*' AS $$ BEGIN -- this is the query that \d produces, except pg_table_is_visible diff --git a/src/backend/distributed/test/deparse_shard_query.c b/src/backend/distributed/test/deparse_shard_query.c index 1961ad52d..a6196146f 100644 --- a/src/backend/distributed/test/deparse_shard_query.c +++ b/src/backend/distributed/test/deparse_shard_query.c @@ -49,9 +49,9 @@ deparse_shard_query_test(PG_FUNCTION_ARGS) Node *parsetree = NULL; foreach_ptr(parsetree, parseTreeList) { - List *queryTreeList = pg_analyze_and_rewrite((RawStmt *) parsetree, - queryStringChar, - NULL, 0, NULL); + List *queryTreeList = pg_analyze_and_rewrite_fixedparams((RawStmt *) parsetree, + queryStringChar, + NULL, 0, NULL); Query *query = NULL; foreach_ptr(query, queryTreeList) diff --git a/src/backend/distributed/test/distribution_metadata.c b/src/backend/distributed/test/distribution_metadata.c index f9afd3b68..6d769ef27 100644 --- a/src/backend/distributed/test/distribution_metadata.c +++ b/src/backend/distributed/test/distribution_metadata.c @@ -259,9 +259,9 @@ relation_count_in_query(PG_FUNCTION_ARGS) Node *parsetree = NULL; foreach_ptr(parsetree, parseTreeList) { - List *queryTreeList = pg_analyze_and_rewrite((RawStmt *) parsetree, - queryStringChar, - NULL, 0, NULL); + List *queryTreeList = pg_analyze_and_rewrite_fixedparams((RawStmt *) parsetree, + queryStringChar, + NULL, 0, NULL); Query *query = NULL; foreach_ptr(query, queryTreeList) diff --git a/src/backend/distributed/test/fake_am.c b/src/backend/distributed/test/fake_am.c index ce7784510..5a8ede316 100644 --- a/src/backend/distributed/test/fake_am.c +++ b/src/backend/distributed/test/fake_am.c @@ -20,6 +20,7 @@ #include "postgres.h" #include "distributed/pg_version_constants.h" +#include "pg_version_compat.h" #include "access/amapi.h" @@ -325,7 +326,7 @@ fake_relation_set_new_filenode(Relation rel, */ *minmulti = GetOldestMultiXactId(); - SMgrRelation srel = RelationCreateStorage(*newrnode, persistence); + SMgrRelation srel = RelationCreateStorage_compat(*newrnode, persistence, true); /* * If required, set up an init fork for an unlogged table so that it can @@ -446,20 +447,17 @@ fake_relation_size(Relation rel, ForkNumber forkNumber) uint64 nblocks = 0; - /* Open it at the smgr level if not already done */ - RelationOpenSmgr(rel); - /* InvalidForkNumber indicates returning the size for all forks */ if (forkNumber == InvalidForkNumber) { for (int i = 0; i < MAX_FORKNUM; i++) { - nblocks += smgrnblocks(rel->rd_smgr, i); + nblocks += smgrnblocks(RelationGetSmgr(rel), i); } } else { - nblocks = smgrnblocks(rel->rd_smgr, forkNumber); + nblocks = smgrnblocks(RelationGetSmgr(rel), forkNumber); } return nblocks * BLCKSZ; diff --git a/src/backend/distributed/test/shard_rebalancer.c b/src/backend/distributed/test/shard_rebalancer.c index ea770cb6e..f3640f415 100644 --- a/src/backend/distributed/test/shard_rebalancer.c +++ b/src/backend/distributed/test/shard_rebalancer.c @@ -28,7 +28,6 @@ #include "funcapi.h" #include "miscadmin.h" #include "utils/builtins.h" -#include "utils/int8.h" #include "utils/json.h" #include "utils/lsyscache.h" #include "utils/memutils.h" diff --git a/src/backend/distributed/test/shared_connection_counters.c b/src/backend/distributed/test/shared_connection_counters.c index e95a2ccbb..641cfd314 100644 --- a/src/backend/distributed/test/shared_connection_counters.c +++ b/src/backend/distributed/test/shared_connection_counters.c @@ -37,6 +37,29 @@ wake_up_connection_pool_waiters(PG_FUNCTION_ARGS) } +/* + * makeIntConst creates a Const Node that stores a given integer + * + * copied from backend/parser/gram.c + */ +static Node * +makeIntConst(int val, int location) +{ + A_Const *n = makeNode(A_Const); + +#if PG_VERSION_NUM >= PG_VERSION_15 + n->val.ival.type = T_Integer; + n->val.ival.ival = val; +#else + n->val.type = T_Integer; + n->val.val.ival = val; +#endif + n->location = location; + + return (Node *) n; +} + + /* * set_max_shared_pool_size is a SQL * interface for setting MaxSharedPoolSize. We use this function in isolation @@ -49,9 +72,8 @@ set_max_shared_pool_size(PG_FUNCTION_ARGS) AlterSystemStmt *alterSystemStmt = palloc0(sizeof(AlterSystemStmt)); - A_Const *aConstValue = makeNode(A_Const); + A_Const *aConstValue = castNode(A_Const, makeIntConst(value, 0)); - aConstValue->val = *makeInteger(value); alterSystemStmt->setstmt = makeNode(VariableSetStmt); alterSystemStmt->setstmt->name = "citus.max_shared_pool_size"; alterSystemStmt->setstmt->is_local = false; diff --git a/src/backend/distributed/transaction/lock_graph.c b/src/backend/distributed/transaction/lock_graph.c index 62b5e4e04..e672dafd8 100644 --- a/src/backend/distributed/transaction/lock_graph.c +++ b/src/backend/distributed/transaction/lock_graph.c @@ -309,7 +309,7 @@ ParseIntField(PGresult *result, int rowIndex, int colIndex) char *resultString = PQgetvalue(result, rowIndex, colIndex); - return pg_strtouint64(resultString, NULL, 10); + return strtou64(resultString, NULL, 10); } diff --git a/src/backend/distributed/transaction/remote_transaction.c b/src/backend/distributed/transaction/remote_transaction.c index 2859ec4c9..55a560575 100644 --- a/src/backend/distributed/transaction/remote_transaction.c +++ b/src/backend/distributed/transaction/remote_transaction.c @@ -1408,7 +1408,7 @@ ParsePreparedTransactionName(char *preparedTransactionName, /* step ahead of the current '_' character */ ++currentCharPointer; - *transactionNumber = pg_strtouint64(currentCharPointer, NULL, 10); + *transactionNumber = strtou64(currentCharPointer, NULL, 10); if ((*transactionNumber == 0 && errno != 0) || (*transactionNumber == ULLONG_MAX && errno == ERANGE)) { diff --git a/src/backend/distributed/transaction/transaction_management.c b/src/backend/distributed/transaction/transaction_management.c index d7c0d7279..f4472ba95 100644 --- a/src/backend/distributed/transaction/transaction_management.c +++ b/src/backend/distributed/transaction/transaction_management.c @@ -205,8 +205,17 @@ InCoordinatedTransaction(void) void Use2PCForCoordinatedTransaction(void) { - Assert(InCoordinatedTransaction()); - + /* + * If this transaction is also a coordinated + * transaction, use 2PC. Otherwise, this + * state change does nothing. + * + * In other words, when this flag is set, + * we "should" use 2PC when needed (e.g., + * we are in a coordinated transaction and + * the coordinated transaction does a remote + * modification). + */ ShouldCoordinatedTransactionUse2PC = true; } diff --git a/src/backend/distributed/utils/multi_partitioning_utils.c b/src/backend/distributed/utils/multi_partitioning_utils.c index a7477e5e5..e11bc5419 100644 --- a/src/backend/distributed/utils/multi_partitioning_utils.c +++ b/src/backend/distributed/utils/multi_partitioning_utils.c @@ -917,7 +917,7 @@ try_relation_open_nolock(Oid relationId) return NULL; } - pgstat_initstats(relation); + pgstat_init_relation(relation); return relation; } diff --git a/src/backend/distributed/worker/worker_data_fetch_protocol.c b/src/backend/distributed/worker/worker_data_fetch_protocol.c index 5e78c19ce..cbc7af89a 100644 --- a/src/backend/distributed/worker/worker_data_fetch_protocol.c +++ b/src/backend/distributed/worker/worker_data_fetch_protocol.c @@ -387,7 +387,7 @@ ExtractShardIdFromTableName(const char *tableName, bool missingOk) shardIdString++; errno = 0; - uint64 shardId = pg_strtouint64(shardIdString, &shardIdStringEnd, 0); + uint64 shardId = strtou64(shardIdString, &shardIdStringEnd, 0); if (errno != 0 || (*shardIdStringEnd != '\0')) { diff --git a/src/backend/distributed/worker/worker_shard_visibility.c b/src/backend/distributed/worker/worker_shard_visibility.c index da9c87a22..e482a955c 100644 --- a/src/backend/distributed/worker/worker_shard_visibility.c +++ b/src/backend/distributed/worker/worker_shard_visibility.c @@ -40,8 +40,8 @@ typedef enum HideShardsMode bool OverrideTableVisibility = true; bool EnableManualChangesToShards = false; -/* hide shards when the application_name starts with one of: */ -char *HideShardsFromAppNamePrefixes = "*"; +/* show shards when the application_name starts with one of: */ +char *ShowShardsForAppNamePrefixes = ""; /* cache of whether or not to hide shards */ static HideShardsMode HideShards = CHECK_APPLICATION_NAME; @@ -271,8 +271,8 @@ RelationIsAKnownShard(Oid shardRelationId) /* * HideShardsFromSomeApplications transforms queries to pg_class to - * filter out known shards if the application_name matches any of - * the prefixes in citus.hide_shards_from_app_name_prefixes. + * filter out known shards if the application_name does not match any of + * the prefixes in citus.show_shards_for_app_name_prefix. */ void HideShardsFromSomeApplications(Query *query) @@ -294,7 +294,7 @@ HideShardsFromSomeApplications(Query *query) * ShouldHideShards returns whether we should hide shards in the current * session. It only checks the application_name once and then uses a * cached response unless either the application_name or - * citus.hide_shards_from_app_name_prefixes changes. + * citus.show_shards_for_app_name_prefix changes. */ static bool ShouldHideShards(void) @@ -367,32 +367,33 @@ ShouldHideShardsInternal(void) List *prefixList = NIL; /* SplitGUCList scribbles on the input */ - char *splitCopy = pstrdup(HideShardsFromAppNamePrefixes); + char *splitCopy = pstrdup(ShowShardsForAppNamePrefixes); if (!SplitGUCList(splitCopy, ',', &prefixList)) { /* invalid GUC value, ignore */ - return false; + return true; } char *appNamePrefix = NULL; foreach_ptr(appNamePrefix, prefixList) { - /* always hide shards when one of the prefixes is * */ + /* never hide shards when one of the prefixes is * */ if (strcmp(appNamePrefix, "*") == 0) { - return true; + return false; } /* compare only the first first characters */ int prefixLength = strlen(appNamePrefix); if (strncmp(application_name, appNamePrefix, prefixLength) == 0) { - return true; + return false; } } - return false; + /* default behaviour: hide shards */ + return true; } diff --git a/src/include/columnar/columnar_version_compat.h b/src/include/columnar/columnar_version_compat.h index 45b8a0e55..611b40d15 100644 --- a/src/include/columnar/columnar_version_compat.h +++ b/src/include/columnar/columnar_version_compat.h @@ -14,6 +14,14 @@ #include "distributed/pg_version_constants.h" +#if PG_VERSION_NUM >= PG_VERSION_15 +#define ExecARDeleteTriggers_compat(a, b, c, d, e, f) \ + ExecARDeleteTriggers(a, b, c, d, e, f) +#else +#define ExecARDeleteTriggers_compat(a, b, c, d, e, f) \ + ExecARDeleteTriggers(a, b, c, d, e) +#endif + #if PG_VERSION_NUM >= PG_VERSION_14 #define ColumnarProcessUtility_compat(a, b, c, d, e, f, g, h) \ ColumnarProcessUtility(a, b, c, d, e, f, g, h) diff --git a/src/include/distributed/commands/utility_hook.h b/src/include/distributed/commands/utility_hook.h index 615a7c6d2..246d413d9 100644 --- a/src/include/distributed/commands/utility_hook.h +++ b/src/include/distributed/commands/utility_hook.h @@ -50,13 +50,13 @@ extern bool InDelegatedProcedureCall; /* * A DDLJob encapsulates the remote tasks and commands needed to process all or - * part of a distributed DDL command. It hold the distributed relation's oid, + * part of a distributed DDL command. It hold the target object's address, * the original DDL command string (for MX DDL propagation), and a task list of * DDL_TASK-type Tasks to be executed. */ typedef struct DDLJob { - Oid targetRelationId; /* oid of the target distributed relation */ + ObjectAddress targetObjectAddress; /* target distributed object address */ /* * Whether to commit and start a new transaction before sending commands diff --git a/src/include/distributed/metadata_sync.h b/src/include/distributed/metadata_sync.h index e67726bfc..babecd210 100644 --- a/src/include/distributed/metadata_sync.h +++ b/src/include/distributed/metadata_sync.h @@ -13,6 +13,7 @@ #define METADATA_SYNC_H +#include "distributed/commands/utility_hook.h" #include "distributed/coordinator_protocol.h" #include "distributed/metadata_cache.h" #include "nodes/pg_list.h" @@ -34,6 +35,7 @@ extern void SyncCitusTableMetadata(Oid relationId); extern void EnsureSequentialModeMetadataOperations(void); extern bool ClusterHasKnownMetadataWorkers(void); extern char * LocalGroupIdUpdateCommand(int32 groupId); +extern bool ShouldSyncUserCommandForObject(ObjectAddress objectAddress); extern bool ShouldSyncTableMetadata(Oid relationId); extern bool ShouldSyncTableMetadataViaCatalog(Oid relationId); extern List * NodeMetadataCreateCommands(void); diff --git a/src/include/distributed/worker_shard_visibility.h b/src/include/distributed/worker_shard_visibility.h index 957992fed..7eea5fbf7 100644 --- a/src/include/distributed/worker_shard_visibility.h +++ b/src/include/distributed/worker_shard_visibility.h @@ -15,7 +15,7 @@ extern bool OverrideTableVisibility; extern bool EnableManualChangesToShards; -extern char *HideShardsFromAppNamePrefixes; +extern char *ShowShardsForAppNamePrefixes; extern void HideShardsFromSomeApplications(Query *query); diff --git a/src/include/pg_version_compat.h b/src/include/pg_version_compat.h index ad7a8bbb0..2f076cf07 100644 --- a/src/include/pg_version_compat.h +++ b/src/include/pg_version_compat.h @@ -13,6 +13,55 @@ #include "distributed/pg_version_constants.h" +#if PG_VERSION_NUM >= PG_VERSION_15 +#define ProcessCompletedNotifies() +#define RelationCreateStorage_compat(a, b, c) RelationCreateStorage(a, b, c) +#define parse_analyze_varparams_compat(a, b, c, d, e) parse_analyze_varparams(a, b, c, d, \ + e) +#else + +#include "nodes/value.h" +#include "storage/smgr.h" +#include "utils/int8.h" +#include "utils/rel.h" + +typedef Value String; + +#ifdef HAVE_LONG_INT_64 +#define strtoi64(str, endptr, base) ((int64) strtol(str, endptr, base)) +#define strtou64(str, endptr, base) ((uint64) strtoul(str, endptr, base)) +#else +#define strtoi64(str, endptr, base) ((int64) strtoll(str, endptr, base)) +#define strtou64(str, endptr, base) ((uint64) strtoull(str, endptr, base)) +#endif +#define RelationCreateStorage_compat(a, b, c) RelationCreateStorage(a, b) +#define parse_analyze_varparams_compat(a, b, c, d, e) parse_analyze_varparams(a, b, c, d) +#define pgstat_init_relation(r) pgstat_initstats(r) +#define pg_analyze_and_rewrite_fixedparams(a, b, c, d, e) pg_analyze_and_rewrite(a, b, c, \ + d, e) + +static inline int64 +pg_strtoint64(char *s) +{ + int64 result; + (void) scanint8(s, false, &result); + return result; +} + + +static inline SMgrRelation +RelationGetSmgr(Relation rel) +{ + if (unlikely(rel->rd_smgr == NULL)) + { + smgrsetowner(&(rel->rd_smgr), smgropen(rel->rd_node, rel->rd_backend)); + } + return rel->rd_smgr; +} + + +#endif + #if PG_VERSION_NUM >= PG_VERSION_14 #define AlterTableStmtObjType_compat(a) ((a)->objtype) #define getObjectTypeDescription_compat(a, b) getObjectTypeDescription(a, b) diff --git a/src/test/regress/expected/isolation_master_update_node_2.out b/src/test/regress/expected/isolation_master_update_node_2.out new file mode 100644 index 000000000..46e0d23d5 --- /dev/null +++ b/src/test/regress/expected/isolation_master_update_node_2.out @@ -0,0 +1,68 @@ +Parsed test spec with 2 sessions + +starting permutation: s1-begin s1-insert s2-begin s2-update-node-1 s1-abort s2-abort +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: BEGIN; +step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100); +step s2-begin: BEGIN; +step s2-update-node-1: + -- update a specific node by address + SELECT master_update_node(nodeid, 'localhost', nodeport + 10) + FROM pg_dist_node + WHERE nodename = 'localhost' + AND nodeport = 57637; + +step s1-abort: ABORT; +step s2-update-node-1: <... completed> +master_update_node +--------------------------------------------------------------------- + +(1 row) + +step s2-abort: ABORT; +master_remove_node +--------------------------------------------------------------------- + + +(2 rows) + + +starting permutation: s1-begin s1-insert s2-begin s2-update-node-1-force s2-abort s1-abort +create_distributed_table +--------------------------------------------------------------------- + +(1 row) + +step s1-begin: BEGIN; +step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100); +step s2-begin: BEGIN; +step s2-update-node-1-force: + -- update a specific node by address (force) + SELECT master_update_node(nodeid, 'localhost', nodeport + 10, force => true, lock_cooldown => 100) + FROM pg_dist_node + WHERE nodename = 'localhost' + AND nodeport = 57637; + +step s2-update-node-1-force: <... completed> +master_update_node +--------------------------------------------------------------------- + +(1 row) + +step s2-abort: ABORT; +step s1-abort: ABORT; +FATAL: terminating connection due to administrator command +server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. + +master_remove_node +--------------------------------------------------------------------- + + +(2 rows) + diff --git a/src/test/regress/expected/local_shard_execution.out b/src/test/regress/expected/local_shard_execution.out index 0bb82a08e..6c3407bf4 100644 --- a/src/test/regress/expected/local_shard_execution.out +++ b/src/test/regress/expected/local_shard_execution.out @@ -1989,6 +1989,17 @@ SELECT create_distributed_table('event_responses', 'event_id'); (1 row) INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no'); +CREATE TABLE event_responses_no_pkey ( + event_id int, + user_id int, + response invite_resp +); +SELECT create_distributed_table('event_responses_no_pkey', 'event_id'); + create_distributed_table +--------------------------------------------------------------------- + +(1 row) + CREATE OR REPLACE FUNCTION regular_func(p invite_resp) RETURNS int AS $$ DECLARE @@ -2438,6 +2449,755 @@ DEBUG: Creating router plan 17 | 777 | no (2 rows) +-- set back to sane settings +RESET citus.enable_local_execution; +RESET citus.enable_fast_path_router_planner; +-- we'll test some 2PC states +SET citus.enable_metadata_sync TO OFF; +-- coordinated_transaction_should_use_2PC prints the internal +-- state for 2PC decision on Citus. However, even if 2PC is decided, +-- we may not necessarily use 2PC over a connection unless it does +-- a modification +CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_should_use_2PC$$; +-- make tests consistent +SET citus.max_adaptive_executor_pool_size TO 1; +RESET citus.enable_metadata_sync; +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +SET citus.log_remote_commands TO ON; +-- we use event_id = 2 for local execution and event_id = 1 for reemote execution +--show it here, if anything changes here, all the tests below might be broken +-- we prefer this to avoid excessive logging below +SELECT * FROM event_responses_no_pkey WHERE event_id = 2; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 2 +NOTICE: executing the command locally: SELECT event_id, user_id, response FROM public.event_responses_no_pkey_1480007 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 2) + event_id | user_id | response +--------------------------------------------------------------------- +(0 rows) + +SELECT * FROM event_responses_no_pkey WHERE event_id = 1; +DEBUG: Distributed planning for a fast-path router query +DEBUG: Creating router plan +DEBUG: query has a single distribution column value: 1 +NOTICE: issuing SELECT event_id, user_id, response FROM public.event_responses_no_pkey_1480004 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 1) + event_id | user_id | response +--------------------------------------------------------------------- +(0 rows) + +RESET citus.log_remote_commands; +RESET citus.log_local_commands; +RESET client_min_messages; +-- single shard local command without transaction block does set the +-- internal state for 2PC, but does not require any actual entries +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- two local commands without transaction block set the internal 2PC state +-- but does not use remotely +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local modification followed by another single shard +-- local modification sets the 2PC state, but does not use remotely +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local modification followed by a single shard +-- remote modification uses 2PC because multiple nodes involved +-- in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local modification followed by a single shard +-- remote modification uses 2PC even if it is not in an explicit +-- tx block as multiple nodes involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a single shard +-- local modification uses 2PC as multiple nodes involved +-- in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + t +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a single shard +-- local modification uses 2PC even if it is not in an explicit +-- tx block +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local SELECT command without transaction block does not set the +-- internal state for 2PC +WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; +ERROR: The transaction is not a coordinated transaction +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- two local SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2) +SELECT count(*) FROM cte_1, cte_2; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- two local SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 9 +(1 row) + + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 9 +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- a local SELECT followed by a remote SELECT does not require to +-- use actual 2PC +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 9 +(1 row) + + SELECT count(*) FROM event_responses_no_pkey; + count +--------------------------------------------------------------------- + 13 +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + SELECT * FROM event_responses_no_pkey WHERE event_id = 2; + event_id | user_id | response +--------------------------------------------------------------------- + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes + 2 | 2 | yes +(9 rows) + + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a single shard +-- local SELECT does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 9 +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a single shard +-- local SELECT does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- multi shard local SELECT command without transaction block does not set the +-- internal state for 2PC +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +BEGIN; + SELECT count(*) FROM event_responses_no_pkey; + count +--------------------------------------------------------------------- + 17 +(1 row) + + SELECT count(*) FROM event_responses_no_pkey; + count +--------------------------------------------------------------------- + 17 +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- multi-shard shard SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + SELECT count(*) FROM event_responses_no_pkey; + count +--------------------------------------------------------------------- + 17 +(1 row) + + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- multi shard SELECT followed by a single shard +-- remote single shard modification does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a multi shard +-- SELECT does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + event_id | user_id | response +--------------------------------------------------------------------- + 1 | 2 | yes +(1 row) + + SELECT count(*) FROM event_responses_no_pkey; + count +--------------------------------------------------------------------- + 20 +(1 row) + + SELECT coordinated_transaction_should_use_2PC(); + coordinated_transaction_should_use_2pc +--------------------------------------------------------------------- + f +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard remote modification followed by a multi shard +-- SELECT does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + f +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- single shard local modification followed by remote multi-shard +-- modification uses 2PC as multiple nodes are involved in modifications +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (UPDATE event_responses_no_pkey SET user_id = 1000 RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; + bool_or +--------------------------------------------------------------------- + t +(1 row) + +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- a local SELECT followed by a remote multi-shard UPDATE requires to +-- use actual 2PC as multiple nodes are involved in modifications +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 10 +(1 row) + + UPDATE event_responses_no_pkey SET user_id = 1; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 1 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- a local SELECT followed by a remote single-shard UPDATE does not require to +-- use actual 2PC. This is because a single node is involved in modification +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 10 +(1 row) + + UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + +-- a remote single-shard UPDATE followed by a local single shard SELECT +-- does not require to use actual 2PC. This is because a single node +-- is involved in modification +BEGIN; + UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + count +--------------------------------------------------------------------- + 10 +(1 row) + +COMMIT; +SELECT count(*) FROM pg_dist_transaction; + count +--------------------------------------------------------------------- + 0 +(1 row) + +SELECT recover_prepared_transactions(); + recover_prepared_transactions +--------------------------------------------------------------------- + 0 +(1 row) + \c - - - :master_port -- verify the local_hostname guc is used for local executions that should connect to the -- local host diff --git a/src/test/regress/expected/multi_mx_hide_shard_names.out b/src/test/regress/expected/multi_mx_hide_shard_names.out index c0265b282..60003caa2 100644 --- a/src/test/regress/expected/multi_mx_hide_shard_names.out +++ b/src/test/regress/expected/multi_mx_hide_shard_names.out @@ -114,7 +114,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name (2 rows) -- changing application_name reveals the shards -SET application_name TO ''; +SET application_name TO 'pg_regress'; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; relname --------------------------------------------------------------------- @@ -137,7 +137,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name -- changing application_name in transaction reveals the shards BEGIN; -SET LOCAL application_name TO ''; +SET LOCAL application_name TO 'pg_regress'; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; relname --------------------------------------------------------------------- @@ -160,7 +160,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name -- now with session-level GUC, but ROLLBACK; BEGIN; -SET application_name TO ''; +SET application_name TO 'pg_regress'; ROLLBACK; -- shards are hidden again after GUCs are reset SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; @@ -173,7 +173,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name -- we should hide correctly based on application_name with savepoints BEGIN; SAVEPOINT s1; -SET application_name TO ''; +SET application_name TO 'pg_regress'; -- changing application_name reveals the shards SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; relname @@ -196,9 +196,9 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name (2 rows) ROLLBACK; --- changing citus.hide_shards_from_app_name_prefixes reveals the shards +-- changing citus.show_shards_for_app_name_prefix reveals the shards BEGIN; -SET LOCAL citus.hide_shards_from_app_name_prefixes TO 'notpsql'; +SET LOCAL citus.show_shards_for_app_name_prefixes TO 'psql'; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; relname --------------------------------------------------------------------- diff --git a/src/test/regress/expected/multi_remove_node_reference_table.out b/src/test/regress/expected/multi_remove_node_reference_table.out index b2d38a196..c39b20735 100644 --- a/src/test/regress/expected/multi_remove_node_reference_table.out +++ b/src/test/regress/expected/multi_remove_node_reference_table.out @@ -978,6 +978,12 @@ ORDER BY shardid ASC; (0 rows) \c - - - :master_port +SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + SELECT citus_disable_node('localhost', :worker_2_port); citus_disable_node --------------------------------------------------------------------- @@ -997,6 +1003,19 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; 1 (1 row) +-- never mark coordinator metadatasynced = false +SELECT hasmetadata, metadatasynced FROM pg_dist_node WHERE nodeport = :master_port; + hasmetadata | metadatasynced +--------------------------------------------------------------------- + t | t +(1 row) + +SELECT 1 FROM citus_remove_node('localhost', :master_port); + ?column? +--------------------------------------------------------------------- + 1 +(1 row) + SELECT shardid, shardstate, shardlength, nodename, nodeport FROM diff --git a/src/test/regress/expected/single_node.out b/src/test/regress/expected/single_node.out index c854ec48a..5051bbee9 100644 --- a/src/test/regress/expected/single_node.out +++ b/src/test/regress/expected/single_node.out @@ -2231,15 +2231,6 @@ NOTICE: executing the command locally: UPDATE single_node.another_schema_table_ (1 row) ROLLBACK; --- same without transaction block -WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *) -SELECT coordinated_transaction_should_use_2PC() FROM cte_1; -NOTICE: executing the command locally: WITH cte_1 AS (UPDATE single_node.another_schema_table_90630515 another_schema_table SET b = (another_schema_table.b OPERATOR(pg_catalog.+) 1) WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) RETURNING another_schema_table.a, another_schema_table.b) SELECT single_node.coordinated_transaction_should_use_2pc() AS coordinated_transaction_should_use_2pc FROM cte_1 - coordinated_transaction_should_use_2pc ---------------------------------------------------------------------- - t -(1 row) - -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false; diff --git a/src/test/regress/multi_mx_schedule b/src/test/regress/multi_mx_schedule index ff5cccf11..a88b4ac0d 100644 --- a/src/test/regress/multi_mx_schedule +++ b/src/test/regress/multi_mx_schedule @@ -43,7 +43,9 @@ test: coordinator_evaluation_modify test: coordinator_evaluation_select test: multi_mx_call test: multi_mx_function_call_delegation -test: multi_mx_modifications local_shard_execution local_shard_execution_replicated +test: multi_mx_modifications local_shard_execution_replicated +# the following test has to be run sequentially +test: local_shard_execution test: multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2 test: local_shard_copy test: undistribute_table_cascade_mx diff --git a/src/test/regress/pg_regress_multi.pl b/src/test/regress/pg_regress_multi.pl index 9b03b88d8..b3147b06b 100755 --- a/src/test/regress/pg_regress_multi.pl +++ b/src/test/regress/pg_regress_multi.pl @@ -466,7 +466,7 @@ push(@pgOptions, "citus.explain_analyze_sort_method='taskId'"); push(@pgOptions, "citus.enable_manual_changes_to_shards=on"); # Some tests look at shards in pg_class, make sure we can usually see them: -push(@pgOptions, "citus.hide_shards_from_app_name_prefixes='psql,pg_dump'"); +push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'"); # we disable slow start by default to encourage parallelism within tests push(@pgOptions, "citus.executor_slow_start_interval=0ms"); diff --git a/src/test/regress/sql/local_shard_execution.sql b/src/test/regress/sql/local_shard_execution.sql index b74966227..76ed8b555 100644 --- a/src/test/regress/sql/local_shard_execution.sql +++ b/src/test/regress/sql/local_shard_execution.sql @@ -928,6 +928,17 @@ SELECT create_distributed_table('event_responses', 'event_id'); INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no'); + +CREATE TABLE event_responses_no_pkey ( + event_id int, + user_id int, + response invite_resp +); + +SELECT create_distributed_table('event_responses_no_pkey', 'event_id'); + + + CREATE OR REPLACE FUNCTION regular_func(p invite_resp) RETURNS int AS $$ DECLARE @@ -1120,6 +1131,281 @@ INSERT INTO event_responses VALUES (16, 666, 'maybe'), (17, 777, 'no') ON CONFLICT (event_id, user_id) DO UPDATE SET response = EXCLUDED.response RETURNING *; +-- set back to sane settings +RESET citus.enable_local_execution; +RESET citus.enable_fast_path_router_planner; + + +-- we'll test some 2PC states +SET citus.enable_metadata_sync TO OFF; + +-- coordinated_transaction_should_use_2PC prints the internal +-- state for 2PC decision on Citus. However, even if 2PC is decided, +-- we may not necessarily use 2PC over a connection unless it does +-- a modification +CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC() +RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus', +$$coordinated_transaction_should_use_2PC$$; + +-- make tests consistent +SET citus.max_adaptive_executor_pool_size TO 1; + +RESET citus.enable_metadata_sync; +SELECT recover_prepared_transactions(); + + +SET citus.log_remote_commands TO ON; + +-- we use event_id = 2 for local execution and event_id = 1 for reemote execution +--show it here, if anything changes here, all the tests below might be broken +-- we prefer this to avoid excessive logging below +SELECT * FROM event_responses_no_pkey WHERE event_id = 2; +SELECT * FROM event_responses_no_pkey WHERE event_id = 1; +RESET citus.log_remote_commands; +RESET citus.log_local_commands; +RESET client_min_messages; + +-- single shard local command without transaction block does set the +-- internal state for 2PC, but does not require any actual entries +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- two local commands without transaction block set the internal 2PC state +-- but does not use remotely +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local modification followed by another single shard +-- local modification sets the 2PC state, but does not use remotely +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local modification followed by a single shard +-- remote modification uses 2PC because multiple nodes involved +-- in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local modification followed by a single shard +-- remote modification uses 2PC even if it is not in an explicit +-- tx block as multiple nodes involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + + +-- single shard remote modification followed by a single shard +-- local modification uses 2PC as multiple nodes involved +-- in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard remote modification followed by a single shard +-- local modification uses 2PC even if it is not in an explicit +-- tx block +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local SELECT command without transaction block does not set the +-- internal state for 2PC +WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- two local SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2) +SELECT count(*) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- two local SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- a local SELECT followed by a remote SELECT does not require to +-- use actual 2PC +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + SELECT count(*) FROM event_responses_no_pkey; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + SELECT * FROM event_responses_no_pkey WHERE event_id = 2; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard remote modification followed by a single shard +-- local SELECT does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard remote modification followed by a single shard +-- local SELECT does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- multi shard local SELECT command without transaction block does not set the +-- internal state for 2PC +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT coordinated_transaction_should_use_2PC() FROM cte_1; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state +-- and does not use remotely +BEGIN; + SELECT count(*) FROM event_responses_no_pkey; + SELECT count(*) FROM event_responses_no_pkey; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- multi-shard shard SELECT followed by a single shard +-- remote modification does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + SELECT count(*) FROM event_responses_no_pkey; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- multi shard SELECT followed by a single shard +-- remote single shard modification does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey), + cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard remote modification followed by a multi shard +-- SELECT does not use 2PC, because only a single +-- machine involved in the modification +BEGIN; + INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *; + SELECT count(*) FROM event_responses_no_pkey; + SELECT coordinated_transaction_should_use_2PC(); +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard remote modification followed by a multi shard +-- SELECT does not use 2PC, because only a single +-- machine involved in the modification +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *), + cte_2 AS (SELECT count(*) FROM event_responses_no_pkey) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- single shard local modification followed by remote multi-shard +-- modification uses 2PC as multiple nodes are involved in modifications +WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *), + cte_2 AS (UPDATE event_responses_no_pkey SET user_id = 1000 RETURNING *) +SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- a local SELECT followed by a remote multi-shard UPDATE requires to +-- use actual 2PC as multiple nodes are involved in modifications +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + UPDATE event_responses_no_pkey SET user_id = 1; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- a local SELECT followed by a remote single-shard UPDATE does not require to +-- use actual 2PC. This is because a single node is involved in modification +BEGIN; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; + UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + +-- a remote single-shard UPDATE followed by a local single shard SELECT +-- does not require to use actual 2PC. This is because a single node +-- is involved in modification +BEGIN; + UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1; + SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2; +COMMIT; +SELECT count(*) FROM pg_dist_transaction; +SELECT recover_prepared_transactions(); + \c - - - :master_port -- verify the local_hostname guc is used for local executions that should connect to the diff --git a/src/test/regress/sql/multi_mx_hide_shard_names.sql b/src/test/regress/sql/multi_mx_hide_shard_names.sql index b56329150..558a699a6 100644 --- a/src/test/regress/sql/multi_mx_hide_shard_names.sql +++ b/src/test/regress/sql/multi_mx_hide_shard_names.sql @@ -67,7 +67,7 @@ SELECT * FROM citus_shard_indexes_on_worker WHERE "Schema" = 'mx_hide_shard_name SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; -- changing application_name reveals the shards -SET application_name TO ''; +SET application_name TO 'pg_regress'; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; RESET application_name; @@ -76,7 +76,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name -- changing application_name in transaction reveals the shards BEGIN; -SET LOCAL application_name TO ''; +SET LOCAL application_name TO 'pg_regress'; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; ROLLBACK; @@ -85,7 +85,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name -- now with session-level GUC, but ROLLBACK; BEGIN; -SET application_name TO ''; +SET application_name TO 'pg_regress'; ROLLBACK; -- shards are hidden again after GUCs are reset @@ -94,7 +94,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name -- we should hide correctly based on application_name with savepoints BEGIN; SAVEPOINT s1; -SET application_name TO ''; +SET application_name TO 'pg_regress'; -- changing application_name reveals the shards SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; ROLLBACK TO SAVEPOINT s1; @@ -102,9 +102,9 @@ ROLLBACK TO SAVEPOINT s1; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; ROLLBACK; --- changing citus.hide_shards_from_app_name_prefixes reveals the shards +-- changing citus.show_shards_for_app_name_prefix reveals the shards BEGIN; -SET LOCAL citus.hide_shards_from_app_name_prefixes TO 'notpsql'; +SET LOCAL citus.show_shards_for_app_name_prefixes TO 'psql'; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; ROLLBACK; diff --git a/src/test/regress/sql/multi_remove_node_reference_table.sql b/src/test/regress/sql/multi_remove_node_reference_table.sql index 310002b74..37c5a0cb2 100644 --- a/src/test/regress/sql/multi_remove_node_reference_table.sql +++ b/src/test/regress/sql/multi_remove_node_reference_table.sql @@ -580,13 +580,19 @@ WHERE ORDER BY shardid ASC; \c - - - :master_port - +SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port); SELECT citus_disable_node('localhost', :worker_2_port); SELECT public.wait_until_metadata_sync(); -- status after citus_disable_node_and_wait SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; +-- never mark coordinator metadatasynced = false +SELECT hasmetadata, metadatasynced FROM pg_dist_node WHERE nodeport = :master_port; + +SELECT 1 FROM citus_remove_node('localhost', :master_port); + + SELECT shardid, shardstate, shardlength, nodename, nodeport FROM diff --git a/src/test/regress/sql/single_node.sql b/src/test/regress/sql/single_node.sql index 74c857d4e..0e1714003 100644 --- a/src/test/regress/sql/single_node.sql +++ b/src/test/regress/sql/single_node.sql @@ -1102,10 +1102,6 @@ BEGIN; SELECT coordinated_transaction_should_use_2PC(); ROLLBACK; --- same without transaction block -WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *) -SELECT coordinated_transaction_should_use_2PC() FROM cte_1; - -- if the local execution is disabled, we cannot failover to -- local execution and the queries would fail SET citus.enable_local_execution TO false;