Merge branch 'master' into velioglu/view_propagation

velioglu/prop_view_temp
Burak Velioglu 2022-05-07 18:48:53 +03:00
commit 1123468ebd
77 changed files with 1597 additions and 248 deletions

View File

@ -115,8 +115,6 @@ columnar_storage_info(PG_FUNCTION_ARGS)
RelationGetRelationName(rel)))); RelationGetRelationName(rel))));
} }
RelationOpenSmgr(rel);
Datum values[STORAGE_INFO_NATTS] = { 0 }; Datum values[STORAGE_INFO_NATTS] = { 0 };
bool nulls[STORAGE_INFO_NATTS] = { 0 }; bool nulls[STORAGE_INFO_NATTS] = { 0 };

View File

@ -1433,7 +1433,7 @@ DeleteTupleAndEnforceConstraints(ModifyState *state, HeapTuple heapTuple)
simple_heap_delete(state->rel, tid); simple_heap_delete(state->rel, tid);
/* execute AFTER ROW DELETE Triggers to enforce constraints */ /* execute AFTER ROW DELETE Triggers to enforce constraints */
ExecARDeleteTriggers(estate, resultRelInfo, tid, NULL, NULL); ExecARDeleteTriggers_compat(estate, resultRelInfo, tid, NULL, NULL, false);
} }
@ -1738,11 +1738,10 @@ ColumnarStorageUpdateIfNeeded(Relation rel, bool isUpgrade)
return; return;
} }
RelationOpenSmgr(rel); BlockNumber nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
if (nblocks < 2) if (nblocks < 2)
{ {
ColumnarStorageInit(rel->rd_smgr, ColumnarMetadataNewStorageId()); ColumnarStorageInit(RelationGetSmgr(rel), ColumnarMetadataNewStorageId());
return; return;
} }

View File

@ -44,6 +44,8 @@
#include "storage/bufmgr.h" #include "storage/bufmgr.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "pg_version_compat.h"
#include "columnar/columnar.h" #include "columnar/columnar.h"
#include "columnar/columnar_storage.h" #include "columnar/columnar_storage.h"
@ -354,8 +356,7 @@ ColumnarStorageGetReservedOffset(Relation rel, bool force)
bool bool
ColumnarStorageIsCurrent(Relation rel) ColumnarStorageIsCurrent(Relation rel)
{ {
RelationOpenSmgr(rel); BlockNumber nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
if (nblocks < 2) if (nblocks < 2)
{ {
@ -439,8 +440,7 @@ ColumnarStorageReserveData(Relation rel, uint64 amount)
PhysicalAddr final = LogicalToPhysical(nextReservation - 1); PhysicalAddr final = LogicalToPhysical(nextReservation - 1);
/* extend with new pages */ /* extend with new pages */
RelationOpenSmgr(rel); BlockNumber nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
while (nblocks <= final.blockno) while (nblocks <= final.blockno)
{ {
@ -547,8 +547,7 @@ ColumnarStorageTruncate(Relation rel, uint64 newDataReservation)
rel->rd_id, newDataReservation); rel->rd_id, newDataReservation);
} }
RelationOpenSmgr(rel); BlockNumber old_rel_pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
BlockNumber old_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
if (old_rel_pages == 0) if (old_rel_pages == 0)
{ {
/* nothing to do */ /* nothing to do */
@ -627,8 +626,7 @@ ColumnarOverwriteMetapage(Relation relation, ColumnarMetapage columnarMetapage)
static ColumnarMetapage static ColumnarMetapage
ColumnarMetapageRead(Relation rel, bool force) ColumnarMetapageRead(Relation rel, bool force)
{ {
RelationOpenSmgr(rel); BlockNumber nblocks = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
BlockNumber nblocks = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
if (nblocks == 0) if (nblocks == 0)
{ {
/* /*

View File

@ -881,7 +881,7 @@ columnar_relation_set_new_filenode(Relation rel,
*freezeXid = RecentXmin; *freezeXid = RecentXmin;
*minmulti = GetOldestMultiXactId(); *minmulti = GetOldestMultiXactId();
SMgrRelation srel = RelationCreateStorage(*newrnode, persistence); SMgrRelation srel = RelationCreateStorage_compat(*newrnode, persistence, true);
ColumnarStorageInit(srel, ColumnarMetadataNewStorageId()); ColumnarStorageInit(srel, ColumnarMetadataNewStorageId());
InitColumnarOptions(rel->rd_id); InitColumnarOptions(rel->rd_id);
@ -913,8 +913,7 @@ columnar_relation_nontransactional_truncate(Relation rel)
RelationTruncate(rel, 0); RelationTruncate(rel, 0);
uint64 storageId = ColumnarMetadataNewStorageId(); uint64 storageId = ColumnarMetadataNewStorageId();
RelationOpenSmgr(rel); ColumnarStorageInit(RelationGetSmgr(rel), storageId);
ColumnarStorageInit(rel->rd_smgr, storageId);
} }
@ -1136,8 +1135,7 @@ LogRelationStats(Relation rel, int elevel)
totalStripeLength += stripe->dataLength; totalStripeLength += stripe->dataLength;
} }
RelationOpenSmgr(rel); uint64 relPages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
uint64 relPages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
RelationCloseSmgr(rel); RelationCloseSmgr(rel);
Datum storageId = DirectFunctionCall1(columnar_relation_storageid, Datum storageId = DirectFunctionCall1(columnar_relation_storageid,
@ -1239,8 +1237,7 @@ TruncateColumnar(Relation rel, int elevel)
uint64 newDataReservation = Max(GetHighestUsedAddress(rel->rd_node) + 1, uint64 newDataReservation = Max(GetHighestUsedAddress(rel->rd_node) + 1,
ColumnarFirstLogicalOffset); ColumnarFirstLogicalOffset);
RelationOpenSmgr(rel); BlockNumber old_rel_pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
BlockNumber old_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
if (!ColumnarStorageTruncate(rel, newDataReservation)) if (!ColumnarStorageTruncate(rel, newDataReservation))
{ {
@ -1248,8 +1245,7 @@ TruncateColumnar(Relation rel, int elevel)
return; return;
} }
RelationOpenSmgr(rel); BlockNumber new_rel_pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
BlockNumber new_rel_pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
/* /*
* We can release the exclusive lock as soon as we have truncated. * We can release the exclusive lock as soon as we have truncated.
@ -1784,20 +1780,17 @@ columnar_relation_size(Relation rel, ForkNumber forkNumber)
uint64 nblocks = 0; uint64 nblocks = 0;
/* Open it at the smgr level if not already done */
RelationOpenSmgr(rel);
/* InvalidForkNumber indicates returning the size for all forks */ /* InvalidForkNumber indicates returning the size for all forks */
if (forkNumber == InvalidForkNumber) if (forkNumber == InvalidForkNumber)
{ {
for (int i = 0; i < MAX_FORKNUM; i++) for (int i = 0; i < MAX_FORKNUM; i++)
{ {
nblocks += smgrnblocks(rel->rd_smgr, i); nblocks += smgrnblocks(RelationGetSmgr(rel), i);
} }
} }
else else
{ {
nblocks = smgrnblocks(rel->rd_smgr, forkNumber); nblocks = smgrnblocks(RelationGetSmgr(rel), forkNumber);
} }
return nblocks * BLCKSZ; return nblocks * BLCKSZ;
@ -1819,8 +1812,7 @@ columnar_estimate_rel_size(Relation rel, int32 *attr_widths,
double *allvisfrac) double *allvisfrac)
{ {
CheckCitusColumnarVersion(ERROR); CheckCitusColumnarVersion(ERROR);
RelationOpenSmgr(rel); *pages = smgrnblocks(RelationGetSmgr(rel), MAIN_FORKNUM);
*pages = smgrnblocks(rel->rd_smgr, MAIN_FORKNUM);
*tuples = ColumnarTableRowCount(rel); *tuples = ColumnarTableRowCount(rel);
/* /*
@ -2300,18 +2292,30 @@ detoast_values(TupleDesc tupleDesc, Datum *orig_values, bool *isnull)
static void static void
ColumnarCheckLogicalReplication(Relation rel) ColumnarCheckLogicalReplication(Relation rel)
{ {
bool pubActionInsert = false;
if (!is_publishable_relation(rel)) if (!is_publishable_relation(rel))
{ {
return; return;
} }
#if PG_VERSION_NUM >= PG_VERSION_15
{
PublicationDesc pubdesc;
RelationBuildPublicationDesc(rel, &pubdesc);
pubActionInsert = pubdesc.pubactions.pubinsert;
}
#else
if (rel->rd_pubactions == NULL) if (rel->rd_pubactions == NULL)
{ {
GetRelationPublicationActions(rel); GetRelationPublicationActions(rel);
Assert(rel->rd_pubactions != NULL); Assert(rel->rd_pubactions != NULL);
} }
pubActionInsert = rel->rd_pubactions->pubinsert;
#endif
if (rel->rd_pubactions->pubinsert) if (pubActionInsert)
{ {
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg( errmsg(

View File

@ -10,6 +10,8 @@
*/ */
#include "postgres.h" #include "postgres.h"
#include "pg_version_compat.h"
#include "access/htup_details.h" #include "access/htup_details.h"
#include "access/xact.h" #include "access/xact.h"
#include "catalog/pg_collation.h" #include "catalog/pg_collation.h"
@ -60,12 +62,30 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati
Form_pg_collation collationForm = (Form_pg_collation) GETSTRUCT(heapTuple); Form_pg_collation collationForm = (Form_pg_collation) GETSTRUCT(heapTuple);
char collprovider = collationForm->collprovider; char collprovider = collationForm->collprovider;
const char *collcollate = NameStr(collationForm->collcollate);
const char *collctype = NameStr(collationForm->collctype);
Oid collnamespace = collationForm->collnamespace; Oid collnamespace = collationForm->collnamespace;
const char *collname = NameStr(collationForm->collname); const char *collname = NameStr(collationForm->collname);
bool collisdeterministic = collationForm->collisdeterministic; bool collisdeterministic = collationForm->collisdeterministic;
#if PG_VERSION_NUM >= PG_VERSION_15
bool isnull;
Datum datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_collcollate,
&isnull);
Assert(!isnull);
char *collcollate = TextDatumGetCString(datum);
datum = SysCacheGetAttr(COLLOID, heapTuple, Anum_pg_collation_collctype, &isnull);
Assert(!isnull);
char *collctype = TextDatumGetCString(datum);
#else
/*
* In versions before 15, collcollate and collctype were type "name". Use
* pstrdup() to match the interface of 15 so that we consistently free the
* result later.
*/
char *collcollate = pstrdup(NameStr(collationForm->collcollate));
char *collctype = pstrdup(NameStr(collationForm->collctype));
#endif
if (collowner != NULL) if (collowner != NULL)
{ {
*collowner = collationForm->collowner; *collowner = collationForm->collowner;
@ -103,6 +123,9 @@ CreateCollationDDLInternal(Oid collationId, Oid *collowner, char **quotedCollati
quote_literal_cstr(collctype)); quote_literal_cstr(collctype));
} }
pfree(collcollate);
pfree(collctype);
if (!collisdeterministic) if (!collisdeterministic)
{ {
appendStringInfoString(&collationNameDef, ", deterministic = false"); appendStringInfoString(&collationNameDef, ", deterministic = false");
@ -500,7 +523,7 @@ GenerateBackupNameForCollationCollision(const ObjectAddress *address)
return NULL; return NULL;
} }
Form_pg_collation collationForm = (Form_pg_collation) GETSTRUCT(collationTuple); Form_pg_collation collationForm = (Form_pg_collation) GETSTRUCT(collationTuple);
Value *namespace = makeString(get_namespace_name(collationForm->collnamespace)); String *namespace = makeString(get_namespace_name(collationForm->collnamespace));
ReleaseSysCache(collationTuple); ReleaseSysCache(collationTuple);
while (true) while (true)

View File

@ -115,7 +115,7 @@ AlterDatabaseOwnerObjectAddress(Node *node, bool missing_ok)
AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node); AlterOwnerStmt *stmt = castNode(AlterOwnerStmt, node);
Assert(stmt->objectType == OBJECT_DATABASE); Assert(stmt->objectType == OBJECT_DATABASE);
Oid databaseOid = get_database_oid(strVal((Value *) stmt->object), missing_ok); Oid databaseOid = get_database_oid(strVal((String *) stmt->object), missing_ok);
ObjectAddress address = { 0 }; ObjectAddress address = { 0 };
ObjectAddressSet(address, DatabaseRelationId, databaseOid); ObjectAddressSet(address, DatabaseRelationId, databaseOid);

View File

@ -295,7 +295,7 @@ FilterDistributedExtensions(List *extensionObjectList)
{ {
List *extensionNameList = NIL; List *extensionNameList = NIL;
Value *objectName = NULL; String *objectName = NULL;
foreach_ptr(objectName, extensionObjectList) foreach_ptr(objectName, extensionObjectList)
{ {
const char *extensionName = strVal(objectName); const char *extensionName = strVal(objectName);
@ -334,7 +334,7 @@ ExtensionNameListToObjectAddressList(List *extensionObjectList)
{ {
List *extensionObjectAddressList = NIL; List *extensionObjectAddressList = NIL;
Value *objectName; String *objectName;
foreach_ptr(objectName, extensionObjectList) foreach_ptr(objectName, extensionObjectList)
{ {
/* /*
@ -513,7 +513,8 @@ MarkExistingObjectDependenciesDistributedIfSupported()
ObjectAddress tableAddress = { 0 }; ObjectAddress tableAddress = { 0 };
ObjectAddressSet(tableAddress, RelationRelationId, citusTableId); ObjectAddressSet(tableAddress, RelationRelationId, citusTableId);
if (ShouldSyncTableMetadata(citusTableId)) /* refrain reading the metadata cache for all tables */
if (ShouldSyncTableMetadataViaCatalog(citusTableId))
{ {
/* we need to pass pointer allocated in the heap */ /* we need to pass pointer allocated in the heap */
ObjectAddress *addressPointer = palloc0(sizeof(ObjectAddress)); ObjectAddress *addressPointer = palloc0(sizeof(ObjectAddress));
@ -671,7 +672,7 @@ IsDropCitusExtensionStmt(Node *parseTree)
} }
/* now that we have a DropStmt, check if citus extension is among the objects to dropped */ /* now that we have a DropStmt, check if citus extension is among the objects to dropped */
Value *objectName; String *objectName;
foreach_ptr(objectName, dropStmt->objects) foreach_ptr(objectName, dropStmt->objects)
{ {
const char *extensionName = strVal(objectName); const char *extensionName = strVal(objectName);

View File

@ -190,7 +190,7 @@ PreprocessDropForeignServerStmt(Node *node, const char *queryString,
Assert(list_length(stmt->objects) == 1); Assert(list_length(stmt->objects) == 1);
Value *serverValue = linitial(stmt->objects); String *serverValue = linitial(stmt->objects);
ObjectAddress address = GetObjectAddressByServerName(strVal(serverValue), false); ObjectAddress address = GetObjectAddressByServerName(strVal(serverValue), false);
/* unmark distributed server */ /* unmark distributed server */
@ -362,7 +362,7 @@ RecreateForeignServerStmt(Oid serverId)
static bool static bool
NameListHasDistributedServer(List *serverNames) NameListHasDistributedServer(List *serverNames)
{ {
Value *serverValue = NULL; String *serverValue = NULL;
foreach_ptr(serverValue, serverNames) foreach_ptr(serverValue, serverNames)
{ {
ObjectAddress address = GetObjectAddressByServerName(strVal(serverValue), false); ObjectAddress address = GetObjectAddressByServerName(strVal(serverValue), false);

View File

@ -490,7 +490,7 @@ GetDistributionArgIndex(Oid functionOid, char *distributionArgumentName,
distributionArgumentName++; distributionArgumentName++;
/* throws error if the input is not an integer */ /* throws error if the input is not an integer */
distributionArgumentIndex = pg_atoi(distributionArgumentName, 4, 0); distributionArgumentIndex = pg_strtoint32(distributionArgumentName);
if (distributionArgumentIndex < 1 || distributionArgumentIndex > numberOfArgs) if (distributionArgumentIndex < 1 || distributionArgumentIndex > numberOfArgs)
{ {
@ -1893,7 +1893,7 @@ AlterFunctionSchemaStmtObjectAddress(Node *node, bool missing_ok)
*/ */
/* the name of the function is the last in the list of names */ /* the name of the function is the last in the list of names */
Value *funcNameStr = lfirst(list_tail(names)); String *funcNameStr = lfirst(list_tail(names));
List *newNames = list_make2(makeString(stmt->newschema), funcNameStr); List *newNames = list_make2(makeString(stmt->newschema), funcNameStr);
/* /*
@ -1938,8 +1938,8 @@ GenerateBackupNameForProcCollision(const ObjectAddress *address)
char *newName = palloc0(NAMEDATALEN); char *newName = palloc0(NAMEDATALEN);
char suffix[NAMEDATALEN] = { 0 }; char suffix[NAMEDATALEN] = { 0 };
int count = 0; int count = 0;
Value *namespace = makeString(get_namespace_name(get_func_namespace( String *namespace = makeString(get_namespace_name(get_func_namespace(
address->objectId))); address->objectId)));
char *baseName = get_func_name(address->objectId); char *baseName = get_func_name(address->objectId);
int baseLength = strlen(baseName); int baseLength = strlen(baseName);
Oid *argtypes = NULL; Oid *argtypes = NULL;

View File

@ -464,7 +464,8 @@ GenerateCreateIndexDDLJob(IndexStmt *createIndexStatement, const char *createInd
{ {
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = CreateIndexStmtGetRelationId(createIndexStatement); ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId,
CreateIndexStmtGetRelationId(createIndexStatement));
ddlJob->startNewTransaction = createIndexStatement->concurrent; ddlJob->startNewTransaction = createIndexStatement->concurrent;
ddlJob->metadataSyncCommand = createIndexCommand; ddlJob->metadataSyncCommand = createIndexCommand;
ddlJob->taskList = CreateIndexTaskList(createIndexStatement); ddlJob->taskList = CreateIndexTaskList(createIndexStatement);
@ -598,7 +599,7 @@ PreprocessReindexStmt(Node *node, const char *reindexCommand,
} }
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = IsReindexWithParam_compat(reindexStatement, ddlJob->startNewTransaction = IsReindexWithParam_compat(reindexStatement,
"concurrently"); "concurrently");
ddlJob->metadataSyncCommand = reindexCommand; ddlJob->metadataSyncCommand = reindexCommand;
@ -695,7 +696,8 @@ PreprocessDropIndexStmt(Node *node, const char *dropIndexCommand,
MarkInvalidateForeignKeyGraph(); MarkInvalidateForeignKeyGraph();
} }
ddlJob->targetRelationId = distributedRelationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId,
distributedRelationId);
/* /*
* We do not want DROP INDEX CONCURRENTLY to commit locally before * We do not want DROP INDEX CONCURRENTLY to commit locally before

View File

@ -2009,7 +2009,7 @@ CitusCopyDestReceiverStartup(DestReceiver *dest, int operation,
foreach(columnNameCell, columnNameList) foreach(columnNameCell, columnNameList)
{ {
char *columnName = (char *) lfirst(columnNameCell); char *columnName = (char *) lfirst(columnNameCell);
Value *columnNameValue = makeString(columnName); String *columnNameValue = makeString(columnName);
attributeList = lappend(attributeList, columnNameValue); attributeList = lappend(attributeList, columnNameValue);
} }

View File

@ -127,7 +127,7 @@ PreprocessRenameStmt(Node *node, const char *renameCommand,
} }
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = tableRelationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, tableRelationId);
ddlJob->metadataSyncCommand = renameCommand; ddlJob->metadataSyncCommand = renameCommand;
ddlJob->taskList = DDLTaskList(tableRelationId, renameCommand); ddlJob->taskList = DDLTaskList(tableRelationId, renameCommand);

View File

@ -150,7 +150,7 @@ PostprocessAlterRoleStmt(Node *node, const char *queryString)
if (encryptedPassword != NULL) if (encryptedPassword != NULL)
{ {
Value *encryptedPasswordValue = makeString((char *) encryptedPassword); String *encryptedPasswordValue = makeString((char *) encryptedPassword);
option->arg = (Node *) encryptedPasswordValue; option->arg = (Node *) encryptedPasswordValue;
} }
else else
@ -741,8 +741,13 @@ makeStringConst(char *str, int location)
{ {
A_Const *n = makeNode(A_Const); A_Const *n = makeNode(A_Const);
#if PG_VERSION_NUM >= PG_VERSION_15
n->val.sval.type = T_String;
n->val.sval.sval = str;
#else
n->val.type = T_String; n->val.type = T_String;
n->val.val.str = str; n->val.val.str = str;
#endif
n->location = location; n->location = location;
return (Node *) n; return (Node *) n;
@ -759,8 +764,13 @@ makeIntConst(int val, int location)
{ {
A_Const *n = makeNode(A_Const); A_Const *n = makeNode(A_Const);
#if PG_VERSION_NUM >= PG_VERSION_15
n->val.ival.type = T_Integer;
n->val.ival.ival = val;
#else
n->val.type = T_Integer; n->val.type = T_Integer;
n->val.val.ival = val; n->val.val.ival = val;
#endif
n->location = location; n->location = location;
return (Node *) n; return (Node *) n;
@ -777,8 +787,13 @@ makeFloatConst(char *str, int location)
{ {
A_Const *n = makeNode(A_Const); A_Const *n = makeNode(A_Const);
#if PG_VERSION_NUM >= PG_VERSION_15
n->val.fval.type = T_Float;
n->val.fval.fval = str;
#else
n->val.type = T_Float; n->val.type = T_Float;
n->val.val.str = str; n->val.val.str = str;
#endif
n->location = location; n->location = location;
return (Node *) n; return (Node *) n;

View File

@ -107,7 +107,7 @@ PreprocessDropSchemaStmt(Node *node, const char *queryString,
EnsureSequentialMode(OBJECT_SCHEMA); EnsureSequentialMode(OBJECT_SCHEMA);
Value *schemaVal = NULL; String *schemaVal = NULL;
foreach_ptr(schemaVal, distributedSchemas) foreach_ptr(schemaVal, distributedSchemas)
{ {
if (SchemaHasDistributedTableWithFKey(strVal(schemaVal))) if (SchemaHasDistributedTableWithFKey(strVal(schemaVal)))
@ -288,7 +288,7 @@ FilterDistributedSchemas(List *schemas)
{ {
List *distributedSchemas = NIL; List *distributedSchemas = NIL;
Value *schemaValue = NULL; String *schemaValue = NULL;
foreach_ptr(schemaValue, schemas) foreach_ptr(schemaValue, schemas)
{ {
const char *schemaName = strVal(schemaValue); const char *schemaName = strVal(schemaValue);

View File

@ -92,7 +92,7 @@ PreprocessCreateStatisticsStmt(Node *node, const char *queryString,
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false; ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand; ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand); ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
@ -197,7 +197,7 @@ PreprocessDropStatisticsStmt(Node *node, const char *queryString,
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false; ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand; ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand); ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
@ -236,7 +236,7 @@ PreprocessAlterStatisticsRenameStmt(Node *node, const char *queryString,
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false; ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand; ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand); ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
@ -274,7 +274,7 @@ PreprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString,
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false; ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand; ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand); ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
@ -295,7 +295,7 @@ PostprocessAlterStatisticsSchemaStmt(Node *node, const char *queryString)
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
Assert(stmt->objectType == OBJECT_STATISTIC_EXT); Assert(stmt->objectType == OBJECT_STATISTIC_EXT);
Value *statName = llast((List *) stmt->object); String *statName = llast((List *) stmt->object);
Oid statsOid = get_statistics_object_oid(list_make2(makeString(stmt->newschema), Oid statsOid = get_statistics_object_oid(list_make2(makeString(stmt->newschema),
statName), false); statName), false);
Oid relationId = GetRelIdByStatsOid(statsOid); Oid relationId = GetRelIdByStatsOid(statsOid);
@ -328,7 +328,7 @@ AlterStatisticsSchemaStmtObjectAddress(Node *node, bool missingOk)
AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node); AlterObjectSchemaStmt *stmt = castNode(AlterObjectSchemaStmt, node);
ObjectAddress address = { 0 }; ObjectAddress address = { 0 };
Value *statName = llast((List *) stmt->object); String *statName = llast((List *) stmt->object);
Oid statsOid = get_statistics_object_oid(list_make2(makeString(stmt->newschema), Oid statsOid = get_statistics_object_oid(list_make2(makeString(stmt->newschema),
statName), missingOk); statName), missingOk);
ObjectAddressSet(address, StatisticExtRelationId, statsOid); ObjectAddressSet(address, StatisticExtRelationId, statsOid);
@ -376,7 +376,7 @@ PreprocessAlterStatisticsStmt(Node *node, const char *queryString,
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false; ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand; ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand); ddlJob->taskList = DDLTaskList(relationId, ddlCommand);
@ -416,7 +416,7 @@ PreprocessAlterStatisticsOwnerStmt(Node *node, const char *queryString,
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->startNewTransaction = false; ddlJob->startNewTransaction = false;
ddlJob->metadataSyncCommand = ddlCommand; ddlJob->metadataSyncCommand = ddlCommand;
ddlJob->taskList = DDLTaskList(relationId, ddlCommand); ddlJob->taskList = DDLTaskList(relationId, ddlCommand);

View File

@ -1102,7 +1102,7 @@ PreprocessAlterTableStmt(Node *node, const char *alterTableCommand,
/* fill them here as it is possible to use them in some conditional blocks below */ /* fill them here as it is possible to use them in some conditional blocks below */
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = leftRelationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, leftRelationId);
const char *sqlForTaskList = alterTableCommand; const char *sqlForTaskList = alterTableCommand;
if (deparseAT) if (deparseAT)
@ -1779,7 +1779,7 @@ PreprocessAlterTableSchemaStmt(Node *node, const char *queryString,
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
QualifyTreeNode((Node *) stmt); QualifyTreeNode((Node *) stmt);
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->metadataSyncCommand = DeparseTreeNode((Node *) stmt); ddlJob->metadataSyncCommand = DeparseTreeNode((Node *) stmt);
ddlJob->taskList = DDLTaskList(relationId, ddlJob->metadataSyncCommand); ddlJob->taskList = DDLTaskList(relationId, ddlJob->metadataSyncCommand);
return list_make1(ddlJob); return list_make1(ddlJob);

View File

@ -44,8 +44,8 @@
/* local function forward declarations */ /* local function forward declarations */
static bool IsCreateCitusTruncateTriggerStmt(CreateTrigStmt *createTriggerStmt); static bool IsCreateCitusTruncateTriggerStmt(CreateTrigStmt *createTriggerStmt);
static Value * GetAlterTriggerDependsTriggerNameValue(AlterObjectDependsStmt * static String * GetAlterTriggerDependsTriggerNameValue(AlterObjectDependsStmt *
alterTriggerDependsStmt); alterTriggerDependsStmt);
static void ErrorIfUnsupportedDropTriggerCommand(DropStmt *dropTriggerStmt); static void ErrorIfUnsupportedDropTriggerCommand(DropStmt *dropTriggerStmt);
static RangeVar * GetDropTriggerStmtRelation(DropStmt *dropTriggerStmt); static RangeVar * GetDropTriggerStmtRelation(DropStmt *dropTriggerStmt);
static void ExtractDropStmtTriggerAndRelationName(DropStmt *dropTriggerStmt, static void ExtractDropStmtTriggerAndRelationName(DropStmt *dropTriggerStmt,
@ -416,7 +416,7 @@ PreprocessAlterTriggerDependsStmt(Node *node, const char *queryString,
* workers * workers
*/ */
Value *triggerNameValue = String *triggerNameValue =
GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt); GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt);
ereport(ERROR, (errmsg( ereport(ERROR, (errmsg(
"Triggers \"%s\" on distributed tables and local tables added to metadata " "Triggers \"%s\" on distributed tables and local tables added to metadata "
@ -454,7 +454,7 @@ PostprocessAlterTriggerDependsStmt(Node *node, const char *queryString)
EnsureCoordinator(); EnsureCoordinator();
ErrorOutForTriggerIfNotSupported(relationId); ErrorOutForTriggerIfNotSupported(relationId);
Value *triggerNameValue = String *triggerNameValue =
GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt); GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt);
return CitusCreateTriggerCommandDDLJob(relationId, strVal(triggerNameValue), return CitusCreateTriggerCommandDDLJob(relationId, strVal(triggerNameValue),
queryString); queryString);
@ -476,7 +476,7 @@ AlterTriggerDependsEventExtendNames(AlterObjectDependsStmt *alterTriggerDependsS
char **relationName = &(relation->relname); char **relationName = &(relation->relname);
AppendShardIdToName(relationName, shardId); AppendShardIdToName(relationName, shardId);
Value *triggerNameValue = String *triggerNameValue =
GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt); GetAlterTriggerDependsTriggerNameValue(alterTriggerDependsStmt);
AppendShardIdToName(&strVal(triggerNameValue), shardId); AppendShardIdToName(&strVal(triggerNameValue), shardId);
@ -489,7 +489,7 @@ AlterTriggerDependsEventExtendNames(AlterObjectDependsStmt *alterTriggerDependsS
* GetAlterTriggerDependsTriggerName returns Value object for the trigger * GetAlterTriggerDependsTriggerName returns Value object for the trigger
* name that given AlterObjectDependsStmt is executed for. * name that given AlterObjectDependsStmt is executed for.
*/ */
static Value * static String *
GetAlterTriggerDependsTriggerNameValue(AlterObjectDependsStmt *alterTriggerDependsStmt) GetAlterTriggerDependsTriggerNameValue(AlterObjectDependsStmt *alterTriggerDependsStmt)
{ {
List *triggerObjectNameList = (List *) alterTriggerDependsStmt->object; List *triggerObjectNameList = (List *) alterTriggerDependsStmt->object;
@ -503,7 +503,7 @@ GetAlterTriggerDependsTriggerNameValue(AlterObjectDependsStmt *alterTriggerDepen
* be the name of the trigger in either before or after standard process * be the name of the trigger in either before or after standard process
* utility. * utility.
*/ */
Value *triggerNameValue = llast(triggerObjectNameList); String *triggerNameValue = llast(triggerObjectNameList);
return triggerNameValue; return triggerNameValue;
} }
@ -642,12 +642,12 @@ DropTriggerEventExtendNames(DropStmt *dropTriggerStmt, char *schemaName, uint64
ExtractDropStmtTriggerAndRelationName(dropTriggerStmt, &triggerName, &relationName); ExtractDropStmtTriggerAndRelationName(dropTriggerStmt, &triggerName, &relationName);
AppendShardIdToName(&triggerName, shardId); AppendShardIdToName(&triggerName, shardId);
Value *triggerNameValue = makeString(triggerName); String *triggerNameValue = makeString(triggerName);
AppendShardIdToName(&relationName, shardId); AppendShardIdToName(&relationName, shardId);
Value *relationNameValue = makeString(relationName); String *relationNameValue = makeString(relationName);
Value *schemaNameValue = makeString(pstrdup(schemaName)); String *schemaNameValue = makeString(pstrdup(schemaName));
List *shardTriggerNameList = List *shardTriggerNameList =
list_make3(schemaNameValue, relationNameValue, triggerNameValue); list_make3(schemaNameValue, relationNameValue, triggerNameValue);
@ -712,7 +712,7 @@ CitusCreateTriggerCommandDDLJob(Oid relationId, char *triggerName,
const char *queryString) const char *queryString)
{ {
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->metadataSyncCommand = queryString; ddlJob->metadataSyncCommand = queryString;
if (!triggerName) if (!triggerName)

View File

@ -878,7 +878,7 @@ AlterTypeSchemaStmtObjectAddress(Node *node, bool missing_ok)
*/ */
/* typename is the last in the list of names */ /* typename is the last in the list of names */
Value *typeNameStr = lfirst(list_tail(names)); String *typeNameStr = lfirst(list_tail(names));
/* /*
* we don't error here either, as the error would be not a good user facing * we don't error here either, as the error would be not a good user facing

View File

@ -1044,16 +1044,20 @@ ExecuteDistributedDDLJob(DDLJob *ddlJob)
EnsureCoordinator(); EnsureCoordinator();
Oid targetRelationId = ddlJob->targetRelationId; ObjectAddress targetObjectAddress = ddlJob->targetObjectAddress;
if (OidIsValid(targetRelationId)) if (OidIsValid(targetObjectAddress.classId))
{ {
/* /*
* Only for ddlJobs that are targetting a relation (table) we want to sync * Only for ddlJobs that are targetting an object we want to sync
* its metadata and verify some properties around the table. * its metadata.
*/ */
shouldSyncMetadata = ShouldSyncTableMetadata(targetRelationId); shouldSyncMetadata = ShouldSyncUserCommandForObject(targetObjectAddress);
EnsurePartitionTableNotReplicated(targetRelationId);
if (targetObjectAddress.classId == RelationRelationId)
{
EnsurePartitionTableNotReplicated(targetObjectAddress.objectId);
}
} }
bool localExecutionSupported = true; bool localExecutionSupported = true;
@ -1304,7 +1308,7 @@ CreateCustomDDLTaskList(Oid relationId, TableDDLCommand *command)
} }
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = relationId; ObjectAddressSet(ddlJob->targetObjectAddress, RelationRelationId, relationId);
ddlJob->metadataSyncCommand = GetTableDDLCommand(command); ddlJob->metadataSyncCommand = GetTableDDLCommand(command);
ddlJob->taskList = taskList; ddlJob->taskList = taskList;
@ -1555,7 +1559,7 @@ NodeDDLTaskList(TargetWorkerSet targets, List *commands)
} }
DDLJob *ddlJob = palloc0(sizeof(DDLJob)); DDLJob *ddlJob = palloc0(sizeof(DDLJob));
ddlJob->targetRelationId = InvalidOid; ddlJob->targetObjectAddress = InvalidObjectAddress;
ddlJob->metadataSyncCommand = NULL; ddlJob->metadataSyncCommand = NULL;
ddlJob->taskList = list_make1(task); ddlJob->taskList = list_make1(task);

View File

@ -432,7 +432,7 @@ DeparseVacuumColumnNames(List *columnNameList)
appendStringInfoString(columnNames, " ("); appendStringInfoString(columnNames, " (");
Value *columnName = NULL; String *columnName = NULL;
foreach_ptr(columnName, columnNameList) foreach_ptr(columnName, columnNameList)
{ {
appendStringInfo(columnNames, "%s,", strVal(columnName)); appendStringInfo(columnNames, "%s,", strVal(columnName));

View File

@ -11,6 +11,8 @@
#include "postgres.h" #include "postgres.h"
#include "pg_version_compat.h"
#include "catalog/namespace.h" #include "catalog/namespace.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
@ -44,6 +46,6 @@ AppendAlterDatabaseOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt)
appendStringInfo(buf, appendStringInfo(buf,
"ALTER DATABASE %s OWNER TO %s;", "ALTER DATABASE %s OWNER TO %s;",
quote_identifier(strVal((Value *) stmt->object)), quote_identifier(strVal((String *) stmt->object)),
RoleSpecString(stmt->newowner, true)); RoleSpecString(stmt->newowner, true));
} }

View File

@ -223,7 +223,7 @@ AppendDropForeignServerStmt(StringInfo buf, DropStmt *stmt)
static void static void
AppendServerNames(StringInfo buf, DropStmt *stmt) AppendServerNames(StringInfo buf, DropStmt *stmt)
{ {
Value *serverValue = NULL; String *serverValue = NULL;
foreach_ptr(serverValue, stmt->objects) foreach_ptr(serverValue, stmt->objects)
{ {
const char *serverString = quote_identifier(strVal(serverValue)); const char *serverString = quote_identifier(strVal(serverValue));

View File

@ -396,18 +396,18 @@ AppendVarSetValue(StringInfo buf, VariableSetStmt *setStmt)
appendStringInfo(buf, " SET %s =", quote_identifier(setStmt->name)); appendStringInfo(buf, " SET %s =", quote_identifier(setStmt->name));
} }
Value value = varArgConst->val; Node *value = (Node *) &varArgConst->val;
switch (value.type) switch (value->type)
{ {
case T_Integer: case T_Integer:
{ {
appendStringInfo(buf, " %d", intVal(&value)); appendStringInfo(buf, " %d", intVal(value));
break; break;
} }
case T_Float: case T_Float:
{ {
appendStringInfo(buf, " %s", strVal(&value)); appendStringInfo(buf, " %s", strVal(value));
break; break;
} }
@ -428,7 +428,7 @@ AppendVarSetValue(StringInfo buf, VariableSetStmt *setStmt)
Datum interval = Datum interval =
DirectFunctionCall3(interval_in, DirectFunctionCall3(interval_in,
CStringGetDatum(strVal(&value)), CStringGetDatum(strVal(value)),
ObjectIdGetDatum(InvalidOid), ObjectIdGetDatum(InvalidOid),
Int32GetDatum(typmod)); Int32GetDatum(typmod));
@ -440,7 +440,7 @@ AppendVarSetValue(StringInfo buf, VariableSetStmt *setStmt)
else else
{ {
appendStringInfo(buf, " %s", quote_literal_cstr(strVal( appendStringInfo(buf, " %s", quote_literal_cstr(strVal(
&value))); value)));
} }
break; break;
} }

View File

@ -126,7 +126,7 @@ AppendDropSchemaStmt(StringInfo buf, DropStmt *stmt)
appendStringInfoString(buf, "IF EXISTS "); appendStringInfoString(buf, "IF EXISTS ");
} }
Value *schemaValue = NULL; String *schemaValue = NULL;
foreach_ptr(schemaValue, stmt->objects) foreach_ptr(schemaValue, stmt->objects)
{ {
const char *schemaString = quote_identifier(strVal(schemaValue)); const char *schemaString = quote_identifier(strVal(schemaValue));

View File

@ -200,10 +200,10 @@ AppendAlterStatisticsOwnerStmt(StringInfo buf, AlterOwnerStmt *stmt)
static void static void
AppendStatisticsName(StringInfo buf, CreateStatsStmt *stmt) AppendStatisticsName(StringInfo buf, CreateStatsStmt *stmt)
{ {
Value *schemaNameVal = (Value *) linitial(stmt->defnames); String *schemaNameVal = (String *) linitial(stmt->defnames);
const char *schemaName = quote_identifier(strVal(schemaNameVal)); const char *schemaName = quote_identifier(strVal(schemaNameVal));
Value *statNameVal = (Value *) lsecond(stmt->defnames); String *statNameVal = (String *) lsecond(stmt->defnames);
const char *statName = quote_identifier(strVal(statNameVal)); const char *statName = quote_identifier(strVal(statNameVal));
appendStringInfo(buf, "%s.%s", schemaName, statName); appendStringInfo(buf, "%s.%s", schemaName, statName);
@ -220,7 +220,7 @@ AppendStatTypes(StringInfo buf, CreateStatsStmt *stmt)
appendStringInfoString(buf, " ("); appendStringInfoString(buf, " (");
Value *statType = NULL; String *statType = NULL;
foreach_ptr(statType, stmt->stat_types) foreach_ptr(statType, stmt->stat_types)
{ {
appendStringInfoString(buf, strVal(statType)); appendStringInfoString(buf, strVal(statType));

View File

@ -464,7 +464,7 @@ DeparseTextSearchDictionaryCommentStmt(Node *node)
static void static void
AppendStringInfoTokentypeList(StringInfo buf, List *tokentypes) AppendStringInfoTokentypeList(StringInfo buf, List *tokentypes)
{ {
Value *tokentype = NULL; String *tokentype = NULL;
bool first = true; bool first = true;
foreach_ptr(tokentype, tokentypes) foreach_ptr(tokentype, tokentypes)
{ {

View File

@ -171,7 +171,6 @@
#include "storage/fd.h" #include "storage/fd.h"
#include "storage/latch.h" #include "storage/latch.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/int8.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/memutils.h" #include "utils/memutils.h"
#include "utils/syscache.h" #include "utils/syscache.h"
@ -4513,7 +4512,7 @@ ReceiveResults(WorkerSession *session, bool storeRows)
/* if there are multiple replicas, make sure to consider only one */ /* if there are multiple replicas, make sure to consider only one */
if (storeRows && *currentAffectedTupleString != '\0') if (storeRows && *currentAffectedTupleString != '\0')
{ {
scanint8(currentAffectedTupleString, false, &currentAffectedTupleCount); currentAffectedTupleCount = pg_strtoint64(currentAffectedTupleString);
Assert(currentAffectedTupleCount >= 0); Assert(currentAffectedTupleCount >= 0);
execution->rowsProcessed += currentAffectedTupleCount; execution->rowsProcessed += currentAffectedTupleCount;
} }

View File

@ -207,19 +207,6 @@ ExecuteLocalTaskListExtended(List *taskList,
uint64 totalRowsProcessed = 0; uint64 totalRowsProcessed = 0;
ParamListInfo paramListInfo = copyParamList(orig_paramListInfo); ParamListInfo paramListInfo = copyParamList(orig_paramListInfo);
/*
* Even if we are executing local tasks, we still enable
* coordinated transaction. This is because
* (a) we might be in a transaction, and the next commands may
* require coordinated transaction
* (b) we might be executing some tasks locally and the others
* via remote execution
*
* Also, there is no harm enabling coordinated transaction even if
* we only deal with local tasks in the transaction.
*/
UseCoordinatedTransaction();
LocalExecutorLevel++; LocalExecutorLevel++;
PG_TRY(); PG_TRY();
{ {

View File

@ -617,7 +617,8 @@ RewriteRawQueryStmt(RawStmt *rawStmt, const char *queryString, Oid *paramOids, i
numParams) numParams)
{ {
List *queryTreeList = List *queryTreeList =
pg_analyze_and_rewrite(rawStmt, queryString, paramOids, numParams, NULL); pg_analyze_and_rewrite_fixedparams(rawStmt, queryString, paramOids, numParams,
NULL);
if (list_length(queryTreeList) != 1) if (list_length(queryTreeList) != 1)
{ {

View File

@ -7,7 +7,9 @@
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#include "postgres.h"
#include "distributed/pg_version_constants.h" #include "distributed/pg_version_constants.h"
#include "pg_version_compat.h"
#include "stdint.h" #include "stdint.h"
#include "postgres.h" #include "postgres.h"
@ -2864,8 +2866,8 @@ CurrentUserName(void)
Oid Oid
LookupTypeOid(char *schemaNameSting, char *typeNameString) LookupTypeOid(char *schemaNameSting, char *typeNameString)
{ {
Value *schemaName = makeString(schemaNameSting); String *schemaName = makeString(schemaNameSting);
Value *typeName = makeString(typeNameString); String *typeName = makeString(typeNameString);
List *qualifiedName = list_make2(schemaName, typeName); List *qualifiedName = list_make2(schemaName, typeName);
TypeName *enumTypeName = makeTypeNameFromNameList(qualifiedName); TypeName *enumTypeName = makeTypeNameFromNameList(qualifiedName);

View File

@ -425,6 +425,22 @@ ClusterHasKnownMetadataWorkers()
} }
/*
* ShouldSyncUserCommandForObject checks if the user command should be synced to the
* worker nodes for the given object.
*/
bool
ShouldSyncUserCommandForObject(ObjectAddress objectAddress)
{
if (objectAddress.classId == RelationRelationId)
{
return ShouldSyncTableMetadata(objectAddress.objectId);
}
return false;
}
/* /*
* ShouldSyncTableMetadata checks if the metadata of a distributed table should be * ShouldSyncTableMetadata checks if the metadata of a distributed table should be
* propagated to metadata workers, i.e. the table is a hash distributed table or * propagated to metadata workers, i.e. the table is a hash distributed table or

View File

@ -116,7 +116,7 @@ static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
static bool NodeIsLocal(WorkerNode *worker); static bool NodeIsLocal(WorkerNode *worker);
static void SetLockTimeoutLocally(int32 lock_cooldown); static void SetLockTimeoutLocally(int32 lock_cooldown);
static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort); static void UpdateNodeLocation(int32 nodeId, char *newNodeName, int32 newNodePort);
static bool UnsetMetadataSyncedForAll(void); static bool UnsetMetadataSyncedForAllWorkers(void);
static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode, static char * GetMetadataSyncCommandToSetNodeColumn(WorkerNode *workerNode,
int columnIndex, int columnIndex,
Datum value); Datum value);
@ -535,7 +535,7 @@ citus_disable_node(PG_FUNCTION_ARGS)
* metadata at this point. Instead, we defer that to citus_activate_node() * metadata at this point. Instead, we defer that to citus_activate_node()
* where we expect all nodes up and running. * where we expect all nodes up and running.
*/ */
if (UnsetMetadataSyncedForAll()) if (UnsetMetadataSyncedForAllWorkers())
{ {
TriggerNodeMetadataSyncOnCommit(); TriggerNodeMetadataSyncOnCommit();
} }
@ -1319,7 +1319,7 @@ citus_update_node(PG_FUNCTION_ARGS)
* early, but that's fine, since this will start a retry loop with * early, but that's fine, since this will start a retry loop with
* 5 second intervals until sync is complete. * 5 second intervals until sync is complete.
*/ */
if (UnsetMetadataSyncedForAll()) if (UnsetMetadataSyncedForAllWorkers())
{ {
TriggerNodeMetadataSyncOnCommit(); TriggerNodeMetadataSyncOnCommit();
} }
@ -2646,15 +2646,15 @@ DatumToString(Datum datum, Oid dataType)
/* /*
* UnsetMetadataSyncedForAll sets the metadatasynced column of all metadata * UnsetMetadataSyncedForAllWorkers sets the metadatasynced column of all metadata
* nodes to false. It returns true if it updated at least a node. * worker nodes to false. It returns true if it updated at least a node.
*/ */
static bool static bool
UnsetMetadataSyncedForAll(void) UnsetMetadataSyncedForAllWorkers(void)
{ {
bool updatedAtLeastOne = false; bool updatedAtLeastOne = false;
ScanKeyData scanKey[2]; ScanKeyData scanKey[3];
int scanKeyCount = 2; int scanKeyCount = 3;
bool indexOK = false; bool indexOK = false;
/* /*
@ -2669,6 +2669,11 @@ UnsetMetadataSyncedForAll(void)
ScanKeyInit(&scanKey[1], Anum_pg_dist_node_metadatasynced, ScanKeyInit(&scanKey[1], Anum_pg_dist_node_metadatasynced,
BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(true)); BTEqualStrategyNumber, F_BOOLEQ, BoolGetDatum(true));
/* coordinator always has the up to date metadata */
ScanKeyInit(&scanKey[2], Anum_pg_dist_node_groupid,
BTGreaterStrategyNumber, F_INT4GT,
Int32GetDatum(COORDINATOR_GROUP_ID));
CatalogIndexState indstate = CatalogOpenIndexes(relation); CatalogIndexState indstate = CatalogOpenIndexes(relation);
SysScanDesc scanDescriptor = systable_beginscan(relation, SysScanDesc scanDescriptor = systable_beginscan(relation,

View File

@ -1,6 +1,6 @@
/*------------------------------------------------------------------------- /*-------------------------------------------------------------------------
* *
* pg_get_object_address_12_13_14.c * pg_get_object_address_13_14_15.c
* *
* Copied functions from Postgres pg_get_object_address with acl/owner check. * Copied functions from Postgres pg_get_object_address with acl/owner check.
* Since we need to use intermediate data types Relation and Node from * Since we need to use intermediate data types Relation and Node from
@ -40,11 +40,6 @@ static void ErrorIfCurrentUserCanNotDistributeObject(ObjectType type,
Relation *relation); Relation *relation);
static List * textarray_to_strvaluelist(ArrayType *arr); static List * textarray_to_strvaluelist(ArrayType *arr);
/* It is defined on PG >= 13 versions by default */
#if PG_VERSION_NUM < PG_VERSION_13
#define TYPALIGN_INT 'i'
#endif
/* /*
* PgGetObjectAddress gets the object address. This function is mostly copied from * PgGetObjectAddress gets the object address. This function is mostly copied from
* pg_get_object_address of the PG code. We need to copy that function to use * pg_get_object_address of the PG code. We need to copy that function to use
@ -283,6 +278,9 @@ PgGetObjectAddress(char *ttype, ArrayType *namearr, ArrayType *argsarr)
case OBJECT_FDW: case OBJECT_FDW:
case OBJECT_FOREIGN_SERVER: case OBJECT_FOREIGN_SERVER:
case OBJECT_LANGUAGE: case OBJECT_LANGUAGE:
#if PG_VERSION_NUM >= PG_VERSION_15
case OBJECT_PARAMETER_ACL:
#endif
case OBJECT_PUBLICATION: case OBJECT_PUBLICATION:
case OBJECT_ROLE: case OBJECT_ROLE:
case OBJECT_SCHEMA: case OBJECT_SCHEMA:
@ -320,6 +318,9 @@ PgGetObjectAddress(char *ttype, ArrayType *namearr, ArrayType *argsarr)
break; break;
} }
#if PG_VERSION_NUM >= PG_VERSION_15
case OBJECT_PUBLICATION_NAMESPACE:
#endif
case OBJECT_USER_MAPPING: case OBJECT_USER_MAPPING:
{ {
objnode = (Node *) list_make2(linitial(name), linitial(args)); objnode = (Node *) list_make2(linitial(name), linitial(args));

View File

@ -54,7 +54,6 @@
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
#include "utils/int8.h"
#include "utils/json.h" #include "utils/json.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/memutils.h" #include "utils/memutils.h"
@ -1396,9 +1395,9 @@ GetShardStatistics(MultiConnection *connection, HTAB *shardIds)
for (int rowIndex = 0; rowIndex < rowCount; rowIndex++) for (int rowIndex = 0; rowIndex < rowCount; rowIndex++)
{ {
char *shardIdString = PQgetvalue(result, rowIndex, 0); char *shardIdString = PQgetvalue(result, rowIndex, 0);
uint64 shardId = pg_strtouint64(shardIdString, NULL, 10); uint64 shardId = strtou64(shardIdString, NULL, 10);
char *sizeString = PQgetvalue(result, rowIndex, 1); char *sizeString = PQgetvalue(result, rowIndex, 1);
uint64 totalSize = pg_strtouint64(sizeString, NULL, 10); uint64 totalSize = strtou64(sizeString, NULL, 10);
ShardStatistics *statistics = ShardStatistics *statistics =
hash_search(shardStatistics, &shardId, HASH_ENTER, NULL); hash_search(shardStatistics, &shardId, HASH_ENTER, NULL);

View File

@ -923,7 +923,7 @@ WorkerShardStats(ShardPlacement *placement, Oid relationId, const char *shardNam
} }
errno = 0; errno = 0;
uint64 tableSize = pg_strtouint64(tableSizeString, &tableSizeStringEnd, 0); uint64 tableSize = strtou64(tableSizeString, &tableSizeStringEnd, 0);
if (errno != 0 || (*tableSizeStringEnd) != '\0') if (errno != 0 || (*tableSizeStringEnd) != '\0')
{ {
PQclear(queryResult); PQclear(queryResult);

View File

@ -12,6 +12,7 @@
*------------------------------------------------------------------------- *-------------------------------------------------------------------------
*/ */
#include "postgres.h" #include "postgres.h"
#include "pg_version_compat.h"
#include "distributed/pg_version_constants.h" #include "distributed/pg_version_constants.h"
#include "distributed/cte_inline.h" #include "distributed/cte_inline.h"
@ -309,7 +310,7 @@ inline_cte_walker(Node *node, inline_cte_walker_context *context)
*/ */
if (columnAliasCount >= columnIndex) if (columnAliasCount >= columnIndex)
{ {
Value *columnAlias = (Value *) list_nth(columnAliasList, columnIndex - 1); String *columnAlias = (String *) list_nth(columnAliasList, columnIndex - 1);
Assert(IsA(columnAlias, String)); Assert(IsA(columnAlias, String));
TargetEntry *targetEntry = TargetEntry *targetEntry =
list_nth(rte->subquery->targetList, columnIndex - 1); list_nth(rte->subquery->targetList, columnIndex - 1);

View File

@ -1065,7 +1065,7 @@ CreateDistributedPlan(uint64 planId, Query *originalQuery, Query *query, ParamLi
/* /*
* EnsurePartitionTableNotReplicated errors out if the infput relation is * EnsurePartitionTableNotReplicated errors out if the input relation is
* a partition table and the table has a replication factor greater than * a partition table and the table has a replication factor greater than
* one. * one.
* *
@ -1353,7 +1353,7 @@ FinalizeRouterPlan(PlannedStmt *localPlan, CustomScan *customScan)
TargetEntry *targetEntry = NULL; TargetEntry *targetEntry = NULL;
foreach_ptr(targetEntry, customScan->scan.plan.targetlist) foreach_ptr(targetEntry, customScan->scan.plan.targetlist)
{ {
Value *columnName = makeString(targetEntry->resname); String *columnName = makeString(targetEntry->resname);
columnNameList = lappend(columnNameList, columnName); columnNameList = lappend(columnNameList, columnName);
} }

View File

@ -102,15 +102,15 @@ PlannedStmt *
GeneratePlaceHolderPlannedStmt(Query *parse) GeneratePlaceHolderPlannedStmt(Query *parse)
{ {
PlannedStmt *result = makeNode(PlannedStmt); PlannedStmt *result = makeNode(PlannedStmt);
SeqScan *seqScanNode = makeNode(SeqScan); Scan *scanNode = makeNode(Scan);
Plan *plan = &seqScanNode->plan; Plan *plan = &scanNode->plan;
Node *distKey PG_USED_FOR_ASSERTS_ONLY = NULL; Node *distKey PG_USED_FOR_ASSERTS_ONLY = NULL;
AssertArg(FastPathRouterQuery(parse, &distKey)); AssertArg(FastPathRouterQuery(parse, &distKey));
/* there is only a single relation rte */ /* there is only a single relation rte */
seqScanNode->scanrelid = 1; scanNode->scanrelid = 1;
plan->targetlist = plan->targetlist =
copyObject(FetchStatementTargetList((Node *) parse)); copyObject(FetchStatementTargetList((Node *) parse));

View File

@ -1062,8 +1062,8 @@ worker_save_query_explain_analyze(PG_FUNCTION_ARGS)
} }
/* resolve OIDs of unknown (user-defined) types */ /* resolve OIDs of unknown (user-defined) types */
Query *analyzedQuery = parse_analyze_varparams(parseTree, queryString, Query *analyzedQuery = parse_analyze_varparams_compat(parseTree, queryString,
&paramTypes, &numParams); &paramTypes, &numParams, NULL);
#if PG_VERSION_NUM >= PG_VERSION_14 #if PG_VERSION_NUM >= PG_VERSION_14

View File

@ -798,7 +798,7 @@ DerivedColumnNameList(uint32 columnCount, uint64 generatingJobId)
appendStringInfo(columnName, UINT64_FORMAT "_", generatingJobId); appendStringInfo(columnName, UINT64_FORMAT "_", generatingJobId);
appendStringInfo(columnName, "%u", columnIndex); appendStringInfo(columnName, "%u", columnIndex);
Value *columnValue = makeString(columnName->data); String *columnValue = makeString(columnName->data);
columnNameList = lappend(columnNameList, columnValue); columnNameList = lappend(columnNameList, columnValue);
} }

View File

@ -151,7 +151,7 @@ static Job * RouterJob(Query *originalQuery,
static bool RelationPrunesToMultipleShards(List *relationShardList); static bool RelationPrunesToMultipleShards(List *relationShardList);
static void NormalizeMultiRowInsertTargetList(Query *query); static void NormalizeMultiRowInsertTargetList(Query *query);
static void AppendNextDummyColReference(Alias *expendedReferenceNames); static void AppendNextDummyColReference(Alias *expendedReferenceNames);
static Value * MakeDummyColumnString(int dummyColumnId); static String * MakeDummyColumnString(int dummyColumnId);
static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError); static List * BuildRoutesForInsert(Query *query, DeferredErrorMessage **planningError);
static List * GroupInsertValuesByShardId(List *insertValuesList); static List * GroupInsertValuesByShardId(List *insertValuesList);
static List * ExtractInsertValuesList(Query *query, Var *partitionColumn); static List * ExtractInsertValuesList(Query *query, Var *partitionColumn);
@ -3249,7 +3249,7 @@ AppendNextDummyColReference(Alias *expendedReferenceNames)
{ {
int existingColReferences = list_length(expendedReferenceNames->colnames); int existingColReferences = list_length(expendedReferenceNames->colnames);
int nextColReferenceId = existingColReferences + 1; int nextColReferenceId = existingColReferences + 1;
Value *missingColumnString = MakeDummyColumnString(nextColReferenceId); String *missingColumnString = MakeDummyColumnString(nextColReferenceId);
expendedReferenceNames->colnames = lappend(expendedReferenceNames->colnames, expendedReferenceNames->colnames = lappend(expendedReferenceNames->colnames,
missingColumnString); missingColumnString);
} }
@ -3259,12 +3259,12 @@ AppendNextDummyColReference(Alias *expendedReferenceNames)
* MakeDummyColumnString returns a String (Value) object by appending given * MakeDummyColumnString returns a String (Value) object by appending given
* integer to end of the "column" string. * integer to end of the "column" string.
*/ */
static Value * static String *
MakeDummyColumnString(int dummyColumnId) MakeDummyColumnString(int dummyColumnId)
{ {
StringInfo dummyColumnStringInfo = makeStringInfo(); StringInfo dummyColumnStringInfo = makeStringInfo();
appendStringInfo(dummyColumnStringInfo, "column%d", dummyColumnId); appendStringInfo(dummyColumnStringInfo, "column%d", dummyColumnId);
Value *dummyColumnString = makeString(dummyColumnStringInfo->data); String *dummyColumnString = makeString(dummyColumnStringInfo->data);
return dummyColumnString; return dummyColumnString;
} }

View File

@ -1952,7 +1952,7 @@ BuildReadIntermediateResultsQuery(List *targetEntryList, List *columnAliasList,
*/ */
if (columnAliasCount >= columnNumber) if (columnAliasCount >= columnNumber)
{ {
Value *columnAlias = (Value *) list_nth(columnAliasList, columnNumber - 1); String *columnAlias = (String *) list_nth(columnAliasList, columnNumber - 1);
Assert(IsA(columnAlias, String)); Assert(IsA(columnAlias, String));
newTargetEntry->resname = strVal(columnAlias); newTargetEntry->resname = strVal(columnAlias);
} }

View File

@ -326,8 +326,8 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
if (objectType == OBJECT_TABLE || objectType == OBJECT_INDEX || if (objectType == OBJECT_TABLE || objectType == OBJECT_INDEX ||
objectType == OBJECT_FOREIGN_TABLE || objectType == OBJECT_FOREIGN_SERVER) objectType == OBJECT_FOREIGN_TABLE || objectType == OBJECT_FOREIGN_SERVER)
{ {
Value *relationSchemaNameValue = NULL; String *relationSchemaNameValue = NULL;
Value *relationNameValue = NULL; String *relationNameValue = NULL;
uint32 dropCount = list_length(dropStmt->objects); uint32 dropCount = list_length(dropStmt->objects);
if (dropCount > 1) if (dropCount > 1)
@ -381,11 +381,11 @@ RelayEventExtendNames(Node *parseTree, char *schemaName, uint64 shardId)
/* prefix with schema name if it is not added already */ /* prefix with schema name if it is not added already */
if (relationSchemaNameValue == NULL) if (relationSchemaNameValue == NULL)
{ {
Value *schemaNameValue = makeString(pstrdup(schemaName)); String *schemaNameValue = makeString(pstrdup(schemaName));
relationNameList = lcons(schemaNameValue, relationNameList); relationNameList = lcons(schemaNameValue, relationNameList);
} }
char **relationName = &(relationNameValue->val.str); char **relationName = &(strVal(relationNameValue));
AppendShardIdToName(relationName, shardId); AppendShardIdToName(relationName, shardId);
} }
else if (objectType == OBJECT_POLICY) else if (objectType == OBJECT_POLICY)
@ -750,10 +750,10 @@ UpdateWholeRowColumnReferencesWalker(Node *node, uint64 *shardId)
* extend the penultimate element with the shardId. * extend the penultimate element with the shardId.
*/ */
int colrefFieldCount = list_length(columnRef->fields); int colrefFieldCount = list_length(columnRef->fields);
Value *relnameValue = list_nth(columnRef->fields, colrefFieldCount - 2); String *relnameValue = list_nth(columnRef->fields, colrefFieldCount - 2);
Assert(IsA(relnameValue, String)); Assert(IsA(relnameValue, String));
AppendShardIdToName(&relnameValue->val.str, *shardId); AppendShardIdToName(&strVal(relnameValue), *shardId);
} }
/* might be more than one ColumnRef to visit */ /* might be more than one ColumnRef to visit */

View File

@ -159,9 +159,9 @@ static bool ErrorIfNotASuitableDeadlockFactor(double *newval, void **extra,
static bool WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source); static bool WarnIfDeprecatedExecutorUsed(int *newval, void **extra, GucSource source);
static bool WarnIfReplicationModelIsSet(int *newval, void **extra, GucSource source); static bool WarnIfReplicationModelIsSet(int *newval, void **extra, GucSource source);
static bool NoticeIfSubqueryPushdownEnabled(bool *newval, void **extra, GucSource source); static bool NoticeIfSubqueryPushdownEnabled(bool *newval, void **extra, GucSource source);
static bool HideShardsFromAppNamePrefixesCheckHook(char **newval, void **extra, static bool ShowShardsForAppNamePrefixesCheckHook(char **newval, void **extra,
GucSource source); GucSource source);
static void HideShardsFromAppNamePrefixesAssignHook(const char *newval, void *extra); static void ShowShardsForAppNamePrefixesAssignHook(const char *newval, void *extra);
static void ApplicationNameAssignHook(const char *newval, void *extra); static void ApplicationNameAssignHook(const char *newval, void *extra);
static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source); static bool NodeConninfoGucCheckHook(char **newval, void **extra, GucSource source);
static void NodeConninfoGucAssignHook(const char *newval, void *extra); static void NodeConninfoGucAssignHook(const char *newval, void *extra);
@ -1174,24 +1174,6 @@ RegisterCitusConfigVariables(void)
GUC_NO_SHOW_ALL, GUC_NO_SHOW_ALL,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomStringVariable(
"citus.hide_shards_from_app_name_prefixes",
gettext_noop("If application_name starts with one of these values, hide shards"),
gettext_noop("Citus places distributed tables and shards in the same schema. "
"That can cause confusion when inspecting the list of tables on "
"a node with shards. This GUC can be used to hide the shards from "
"pg_class for certain applications based on the application_name "
"of the connection. The default is *, which hides shards from all "
"applications. This behaviour can be overridden using the "
"citus.override_table_visibility setting"),
&HideShardsFromAppNamePrefixes,
"*",
PGC_USERSET,
GUC_STANDARD,
HideShardsFromAppNamePrefixesCheckHook,
HideShardsFromAppNamePrefixesAssignHook,
NULL);
DefineCustomIntVariable( DefineCustomIntVariable(
"citus.isolation_test_session_process_id", "citus.isolation_test_session_process_id",
NULL, NULL,
@ -1716,6 +1698,25 @@ RegisterCitusConfigVariables(void)
GUC_STANDARD, GUC_STANDARD,
NULL, NULL, NULL); NULL, NULL, NULL);
DefineCustomStringVariable(
"citus.show_shards_for_app_name_prefixes",
gettext_noop("If application_name starts with one of these values, show shards"),
gettext_noop("Citus places distributed tables and shards in the same schema. "
"That can cause confusion when inspecting the list of tables on "
"a node with shards. By default the shards are hidden from "
"pg_class. This GUC can be used to show the shards to certain "
"applications based on the application_name of the connection. "
"The default is empty string, which hides shards from all "
"applications. This behaviour can be overridden using the "
"citus.override_table_visibility setting"),
&ShowShardsForAppNamePrefixes,
"",
PGC_USERSET,
GUC_STANDARD,
ShowShardsForAppNamePrefixesCheckHook,
ShowShardsForAppNamePrefixesAssignHook,
NULL);
DefineCustomBoolVariable( DefineCustomBoolVariable(
"citus.sort_returning", "citus.sort_returning",
gettext_noop("Sorts the RETURNING clause to get consistent test output"), gettext_noop("Sorts the RETURNING clause to get consistent test output"),
@ -1985,12 +1986,12 @@ WarnIfReplicationModelIsSet(int *newval, void **extra, GucSource source)
/* /*
* HideShardsFromAppNamePrefixesCheckHook ensures that the * ShowShardsForAppNamePrefixesCheckHook ensures that the
* citus.hide_shards_from_app_name_prefixes holds a valid list of application_name * citus.show_shards_for_app_name_prefixes holds a valid list of application_name
* values. * values.
*/ */
static bool static bool
HideShardsFromAppNamePrefixesCheckHook(char **newval, void **extra, GucSource source) ShowShardsForAppNamePrefixesCheckHook(char **newval, void **extra, GucSource source)
{ {
List *prefixList = NIL; List *prefixList = NIL;
@ -2020,7 +2021,7 @@ HideShardsFromAppNamePrefixesCheckHook(char **newval, void **extra, GucSource so
if (strcmp(prefixAscii, appNamePrefix) != 0) if (strcmp(prefixAscii, appNamePrefix) != 0)
{ {
GUC_check_errdetail("prefix %s in citus.hide_shards_from_app_name_prefixes " GUC_check_errdetail("prefix %s in citus.show_shards_for_app_name_prefixes "
"contains non-ascii characters", appNamePrefix); "contains non-ascii characters", appNamePrefix);
return false; return false;
} }
@ -2031,12 +2032,12 @@ HideShardsFromAppNamePrefixesCheckHook(char **newval, void **extra, GucSource so
/* /*
* HideShardsFromAppNamePrefixesAssignHook ensures changes to * ShowShardsForAppNamePrefixesAssignHook ensures changes to
* citus.hide_shards_from_app_name_prefixes are reflected in the decision * citus.show_shards_for_app_name_prefixes are reflected in the decision
* whether or not to show shards. * whether or not to show shards.
*/ */
static void static void
HideShardsFromAppNamePrefixesAssignHook(const char *newval, void *extra) ShowShardsForAppNamePrefixesAssignHook(const char *newval, void *extra)
{ {
ResetHideShardsDecision(); ResetHideShardsDecision();
} }

View File

@ -1 +1,2 @@
-- bump version to 11.0-2 #include "udfs/citus_shards_on_worker/11.0-2.sql"
#include "udfs/citus_shard_indexes_on_worker/11.0-2.sql"

View File

@ -1 +1,2 @@
-- bump down version to 11.0-1 #include "../udfs/citus_shards_on_worker/11.0-1.sql"
#include "../udfs/citus_shard_indexes_on_worker/11.0-1.sql"

View File

@ -0,0 +1,39 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_indexes_on_worker(
OUT schema_name name,
OUT index_name name,
OUT table_type text,
OUT owner_name name,
OUT shard_name name)
RETURNS SETOF record
LANGUAGE plpgsql
SET citus.show_shards_for_app_name_prefixes = '*'
AS $$
BEGIN
-- this is the query that \di produces, except pg_table_is_visible
-- is replaced with pg_catalog.relation_is_a_known_shard(c.oid)
RETURN QUERY
SELECT n.nspname as "Schema",
c.relname as "Name",
CASE c.relkind WHEN 'r' THEN 'table' WHEN 'v' THEN 'view' WHEN 'm' THEN 'materialized view' WHEN 'i' THEN 'index' WHEN 'S' THEN 'sequence' WHEN 's' THEN 'special' WHEN 'f' THEN 'foreign table' WHEN 'p' THEN 'table' END as "Type",
pg_catalog.pg_get_userbyid(c.relowner) as "Owner",
c2.relname as "Table"
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
LEFT JOIN pg_catalog.pg_index i ON i.indexrelid = c.oid
LEFT JOIN pg_catalog.pg_class c2 ON i.indrelid = c2.oid
WHERE c.relkind IN ('i','')
AND n.nspname <> 'pg_catalog'
AND n.nspname <> 'information_schema'
AND n.nspname !~ '^pg_toast'
AND pg_catalog.relation_is_a_known_shard(c.oid)
ORDER BY 1,2;
END;
$$;
CREATE OR REPLACE VIEW pg_catalog.citus_shard_indexes_on_worker AS
SELECT schema_name as "Schema",
index_name as "Name",
table_type as "Type",
owner_name as "Owner",
shard_name as "Table"
FROM pg_catalog.citus_shard_indexes_on_worker() s;

View File

@ -6,7 +6,7 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_shard_indexes_on_worker(
OUT shard_name name) OUT shard_name name)
RETURNS SETOF record RETURNS SETOF record
LANGUAGE plpgsql LANGUAGE plpgsql
SET citus.hide_shards_from_app_name_prefixes = '' SET citus.show_shards_for_app_name_prefixes = '*'
AS $$ AS $$
BEGIN BEGIN
-- this is the query that \di produces, except pg_table_is_visible -- this is the query that \di produces, except pg_table_is_visible

View File

@ -0,0 +1,34 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_shards_on_worker(
OUT schema_name name,
OUT shard_name name,
OUT table_type text,
OUT owner_name name)
RETURNS SETOF record
LANGUAGE plpgsql
SET citus.show_shards_for_app_name_prefixes = '*'
AS $$
BEGIN
-- this is the query that \d produces, except pg_table_is_visible
-- is replaced with pg_catalog.relation_is_a_known_shard(c.oid)
RETURN QUERY
SELECT n.nspname as "Schema",
c.relname as "Name",
CASE c.relkind WHEN 'r' THEN 'table' WHEN 'v' THEN 'view' WHEN 'm' THEN 'materialized view' WHEN 'i' THEN 'index' WHEN 'S' THEN 'sequence' WHEN 's' THEN 'special' WHEN 'f' THEN 'foreign table' WHEN 'p' THEN 'table' END as "Type",
pg_catalog.pg_get_userbyid(c.relowner) as "Owner"
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind IN ('r','p','v','m','S','f','')
AND n.nspname <> 'pg_catalog'
AND n.nspname <> 'information_schema'
AND n.nspname !~ '^pg_toast'
AND pg_catalog.relation_is_a_known_shard(c.oid)
ORDER BY 1,2;
END;
$$;
CREATE OR REPLACE VIEW pg_catalog.citus_shards_on_worker AS
SELECT schema_name as "Schema",
shard_name as "Name",
table_type as "Type",
owner_name as "Owner"
FROM pg_catalog.citus_shards_on_worker() s;

View File

@ -5,7 +5,7 @@ CREATE OR REPLACE FUNCTION pg_catalog.citus_shards_on_worker(
OUT owner_name name) OUT owner_name name)
RETURNS SETOF record RETURNS SETOF record
LANGUAGE plpgsql LANGUAGE plpgsql
SET citus.hide_shards_from_app_name_prefixes = '' SET citus.show_shards_for_app_name_prefixes = '*'
AS $$ AS $$
BEGIN BEGIN
-- this is the query that \d produces, except pg_table_is_visible -- this is the query that \d produces, except pg_table_is_visible

View File

@ -49,9 +49,9 @@ deparse_shard_query_test(PG_FUNCTION_ARGS)
Node *parsetree = NULL; Node *parsetree = NULL;
foreach_ptr(parsetree, parseTreeList) foreach_ptr(parsetree, parseTreeList)
{ {
List *queryTreeList = pg_analyze_and_rewrite((RawStmt *) parsetree, List *queryTreeList = pg_analyze_and_rewrite_fixedparams((RawStmt *) parsetree,
queryStringChar, queryStringChar,
NULL, 0, NULL); NULL, 0, NULL);
Query *query = NULL; Query *query = NULL;
foreach_ptr(query, queryTreeList) foreach_ptr(query, queryTreeList)

View File

@ -259,9 +259,9 @@ relation_count_in_query(PG_FUNCTION_ARGS)
Node *parsetree = NULL; Node *parsetree = NULL;
foreach_ptr(parsetree, parseTreeList) foreach_ptr(parsetree, parseTreeList)
{ {
List *queryTreeList = pg_analyze_and_rewrite((RawStmt *) parsetree, List *queryTreeList = pg_analyze_and_rewrite_fixedparams((RawStmt *) parsetree,
queryStringChar, queryStringChar,
NULL, 0, NULL); NULL, 0, NULL);
Query *query = NULL; Query *query = NULL;
foreach_ptr(query, queryTreeList) foreach_ptr(query, queryTreeList)

View File

@ -20,6 +20,7 @@
#include "postgres.h" #include "postgres.h"
#include "distributed/pg_version_constants.h" #include "distributed/pg_version_constants.h"
#include "pg_version_compat.h"
#include "access/amapi.h" #include "access/amapi.h"
@ -325,7 +326,7 @@ fake_relation_set_new_filenode(Relation rel,
*/ */
*minmulti = GetOldestMultiXactId(); *minmulti = GetOldestMultiXactId();
SMgrRelation srel = RelationCreateStorage(*newrnode, persistence); SMgrRelation srel = RelationCreateStorage_compat(*newrnode, persistence, true);
/* /*
* If required, set up an init fork for an unlogged table so that it can * If required, set up an init fork for an unlogged table so that it can
@ -446,20 +447,17 @@ fake_relation_size(Relation rel, ForkNumber forkNumber)
uint64 nblocks = 0; uint64 nblocks = 0;
/* Open it at the smgr level if not already done */
RelationOpenSmgr(rel);
/* InvalidForkNumber indicates returning the size for all forks */ /* InvalidForkNumber indicates returning the size for all forks */
if (forkNumber == InvalidForkNumber) if (forkNumber == InvalidForkNumber)
{ {
for (int i = 0; i < MAX_FORKNUM; i++) for (int i = 0; i < MAX_FORKNUM; i++)
{ {
nblocks += smgrnblocks(rel->rd_smgr, i); nblocks += smgrnblocks(RelationGetSmgr(rel), i);
} }
} }
else else
{ {
nblocks = smgrnblocks(rel->rd_smgr, forkNumber); nblocks = smgrnblocks(RelationGetSmgr(rel), forkNumber);
} }
return nblocks * BLCKSZ; return nblocks * BLCKSZ;

View File

@ -28,7 +28,6 @@
#include "funcapi.h" #include "funcapi.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "utils/builtins.h" #include "utils/builtins.h"
#include "utils/int8.h"
#include "utils/json.h" #include "utils/json.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "utils/memutils.h" #include "utils/memutils.h"

View File

@ -37,6 +37,29 @@ wake_up_connection_pool_waiters(PG_FUNCTION_ARGS)
} }
/*
* makeIntConst creates a Const Node that stores a given integer
*
* copied from backend/parser/gram.c
*/
static Node *
makeIntConst(int val, int location)
{
A_Const *n = makeNode(A_Const);
#if PG_VERSION_NUM >= PG_VERSION_15
n->val.ival.type = T_Integer;
n->val.ival.ival = val;
#else
n->val.type = T_Integer;
n->val.val.ival = val;
#endif
n->location = location;
return (Node *) n;
}
/* /*
* set_max_shared_pool_size is a SQL * set_max_shared_pool_size is a SQL
* interface for setting MaxSharedPoolSize. We use this function in isolation * interface for setting MaxSharedPoolSize. We use this function in isolation
@ -49,9 +72,8 @@ set_max_shared_pool_size(PG_FUNCTION_ARGS)
AlterSystemStmt *alterSystemStmt = palloc0(sizeof(AlterSystemStmt)); AlterSystemStmt *alterSystemStmt = palloc0(sizeof(AlterSystemStmt));
A_Const *aConstValue = makeNode(A_Const); A_Const *aConstValue = castNode(A_Const, makeIntConst(value, 0));
aConstValue->val = *makeInteger(value);
alterSystemStmt->setstmt = makeNode(VariableSetStmt); alterSystemStmt->setstmt = makeNode(VariableSetStmt);
alterSystemStmt->setstmt->name = "citus.max_shared_pool_size"; alterSystemStmt->setstmt->name = "citus.max_shared_pool_size";
alterSystemStmt->setstmt->is_local = false; alterSystemStmt->setstmt->is_local = false;

View File

@ -309,7 +309,7 @@ ParseIntField(PGresult *result, int rowIndex, int colIndex)
char *resultString = PQgetvalue(result, rowIndex, colIndex); char *resultString = PQgetvalue(result, rowIndex, colIndex);
return pg_strtouint64(resultString, NULL, 10); return strtou64(resultString, NULL, 10);
} }

View File

@ -1408,7 +1408,7 @@ ParsePreparedTransactionName(char *preparedTransactionName,
/* step ahead of the current '_' character */ /* step ahead of the current '_' character */
++currentCharPointer; ++currentCharPointer;
*transactionNumber = pg_strtouint64(currentCharPointer, NULL, 10); *transactionNumber = strtou64(currentCharPointer, NULL, 10);
if ((*transactionNumber == 0 && errno != 0) || if ((*transactionNumber == 0 && errno != 0) ||
(*transactionNumber == ULLONG_MAX && errno == ERANGE)) (*transactionNumber == ULLONG_MAX && errno == ERANGE))
{ {

View File

@ -205,8 +205,17 @@ InCoordinatedTransaction(void)
void void
Use2PCForCoordinatedTransaction(void) Use2PCForCoordinatedTransaction(void)
{ {
Assert(InCoordinatedTransaction()); /*
* If this transaction is also a coordinated
* transaction, use 2PC. Otherwise, this
* state change does nothing.
*
* In other words, when this flag is set,
* we "should" use 2PC when needed (e.g.,
* we are in a coordinated transaction and
* the coordinated transaction does a remote
* modification).
*/
ShouldCoordinatedTransactionUse2PC = true; ShouldCoordinatedTransactionUse2PC = true;
} }

View File

@ -917,7 +917,7 @@ try_relation_open_nolock(Oid relationId)
return NULL; return NULL;
} }
pgstat_initstats(relation); pgstat_init_relation(relation);
return relation; return relation;
} }

View File

@ -387,7 +387,7 @@ ExtractShardIdFromTableName(const char *tableName, bool missingOk)
shardIdString++; shardIdString++;
errno = 0; errno = 0;
uint64 shardId = pg_strtouint64(shardIdString, &shardIdStringEnd, 0); uint64 shardId = strtou64(shardIdString, &shardIdStringEnd, 0);
if (errno != 0 || (*shardIdStringEnd != '\0')) if (errno != 0 || (*shardIdStringEnd != '\0'))
{ {

View File

@ -40,8 +40,8 @@ typedef enum HideShardsMode
bool OverrideTableVisibility = true; bool OverrideTableVisibility = true;
bool EnableManualChangesToShards = false; bool EnableManualChangesToShards = false;
/* hide shards when the application_name starts with one of: */ /* show shards when the application_name starts with one of: */
char *HideShardsFromAppNamePrefixes = "*"; char *ShowShardsForAppNamePrefixes = "";
/* cache of whether or not to hide shards */ /* cache of whether or not to hide shards */
static HideShardsMode HideShards = CHECK_APPLICATION_NAME; static HideShardsMode HideShards = CHECK_APPLICATION_NAME;
@ -271,8 +271,8 @@ RelationIsAKnownShard(Oid shardRelationId)
/* /*
* HideShardsFromSomeApplications transforms queries to pg_class to * HideShardsFromSomeApplications transforms queries to pg_class to
* filter out known shards if the application_name matches any of * filter out known shards if the application_name does not match any of
* the prefixes in citus.hide_shards_from_app_name_prefixes. * the prefixes in citus.show_shards_for_app_name_prefix.
*/ */
void void
HideShardsFromSomeApplications(Query *query) HideShardsFromSomeApplications(Query *query)
@ -294,7 +294,7 @@ HideShardsFromSomeApplications(Query *query)
* ShouldHideShards returns whether we should hide shards in the current * ShouldHideShards returns whether we should hide shards in the current
* session. It only checks the application_name once and then uses a * session. It only checks the application_name once and then uses a
* cached response unless either the application_name or * cached response unless either the application_name or
* citus.hide_shards_from_app_name_prefixes changes. * citus.show_shards_for_app_name_prefix changes.
*/ */
static bool static bool
ShouldHideShards(void) ShouldHideShards(void)
@ -367,32 +367,33 @@ ShouldHideShardsInternal(void)
List *prefixList = NIL; List *prefixList = NIL;
/* SplitGUCList scribbles on the input */ /* SplitGUCList scribbles on the input */
char *splitCopy = pstrdup(HideShardsFromAppNamePrefixes); char *splitCopy = pstrdup(ShowShardsForAppNamePrefixes);
if (!SplitGUCList(splitCopy, ',', &prefixList)) if (!SplitGUCList(splitCopy, ',', &prefixList))
{ {
/* invalid GUC value, ignore */ /* invalid GUC value, ignore */
return false; return true;
} }
char *appNamePrefix = NULL; char *appNamePrefix = NULL;
foreach_ptr(appNamePrefix, prefixList) foreach_ptr(appNamePrefix, prefixList)
{ {
/* always hide shards when one of the prefixes is * */ /* never hide shards when one of the prefixes is * */
if (strcmp(appNamePrefix, "*") == 0) if (strcmp(appNamePrefix, "*") == 0)
{ {
return true; return false;
} }
/* compare only the first first <prefixLength> characters */ /* compare only the first first <prefixLength> characters */
int prefixLength = strlen(appNamePrefix); int prefixLength = strlen(appNamePrefix);
if (strncmp(application_name, appNamePrefix, prefixLength) == 0) if (strncmp(application_name, appNamePrefix, prefixLength) == 0)
{ {
return true; return false;
} }
} }
return false; /* default behaviour: hide shards */
return true;
} }

View File

@ -14,6 +14,14 @@
#include "distributed/pg_version_constants.h" #include "distributed/pg_version_constants.h"
#if PG_VERSION_NUM >= PG_VERSION_15
#define ExecARDeleteTriggers_compat(a, b, c, d, e, f) \
ExecARDeleteTriggers(a, b, c, d, e, f)
#else
#define ExecARDeleteTriggers_compat(a, b, c, d, e, f) \
ExecARDeleteTriggers(a, b, c, d, e)
#endif
#if PG_VERSION_NUM >= PG_VERSION_14 #if PG_VERSION_NUM >= PG_VERSION_14
#define ColumnarProcessUtility_compat(a, b, c, d, e, f, g, h) \ #define ColumnarProcessUtility_compat(a, b, c, d, e, f, g, h) \
ColumnarProcessUtility(a, b, c, d, e, f, g, h) ColumnarProcessUtility(a, b, c, d, e, f, g, h)

View File

@ -50,13 +50,13 @@ extern bool InDelegatedProcedureCall;
/* /*
* A DDLJob encapsulates the remote tasks and commands needed to process all or * A DDLJob encapsulates the remote tasks and commands needed to process all or
* part of a distributed DDL command. It hold the distributed relation's oid, * part of a distributed DDL command. It hold the target object's address,
* the original DDL command string (for MX DDL propagation), and a task list of * the original DDL command string (for MX DDL propagation), and a task list of
* DDL_TASK-type Tasks to be executed. * DDL_TASK-type Tasks to be executed.
*/ */
typedef struct DDLJob typedef struct DDLJob
{ {
Oid targetRelationId; /* oid of the target distributed relation */ ObjectAddress targetObjectAddress; /* target distributed object address */
/* /*
* Whether to commit and start a new transaction before sending commands * Whether to commit and start a new transaction before sending commands

View File

@ -13,6 +13,7 @@
#define METADATA_SYNC_H #define METADATA_SYNC_H
#include "distributed/commands/utility_hook.h"
#include "distributed/coordinator_protocol.h" #include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
@ -34,6 +35,7 @@ extern void SyncCitusTableMetadata(Oid relationId);
extern void EnsureSequentialModeMetadataOperations(void); extern void EnsureSequentialModeMetadataOperations(void);
extern bool ClusterHasKnownMetadataWorkers(void); extern bool ClusterHasKnownMetadataWorkers(void);
extern char * LocalGroupIdUpdateCommand(int32 groupId); extern char * LocalGroupIdUpdateCommand(int32 groupId);
extern bool ShouldSyncUserCommandForObject(ObjectAddress objectAddress);
extern bool ShouldSyncTableMetadata(Oid relationId); extern bool ShouldSyncTableMetadata(Oid relationId);
extern bool ShouldSyncTableMetadataViaCatalog(Oid relationId); extern bool ShouldSyncTableMetadataViaCatalog(Oid relationId);
extern List * NodeMetadataCreateCommands(void); extern List * NodeMetadataCreateCommands(void);

View File

@ -15,7 +15,7 @@
extern bool OverrideTableVisibility; extern bool OverrideTableVisibility;
extern bool EnableManualChangesToShards; extern bool EnableManualChangesToShards;
extern char *HideShardsFromAppNamePrefixes; extern char *ShowShardsForAppNamePrefixes;
extern void HideShardsFromSomeApplications(Query *query); extern void HideShardsFromSomeApplications(Query *query);

View File

@ -13,6 +13,55 @@
#include "distributed/pg_version_constants.h" #include "distributed/pg_version_constants.h"
#if PG_VERSION_NUM >= PG_VERSION_15
#define ProcessCompletedNotifies()
#define RelationCreateStorage_compat(a, b, c) RelationCreateStorage(a, b, c)
#define parse_analyze_varparams_compat(a, b, c, d, e) parse_analyze_varparams(a, b, c, d, \
e)
#else
#include "nodes/value.h"
#include "storage/smgr.h"
#include "utils/int8.h"
#include "utils/rel.h"
typedef Value String;
#ifdef HAVE_LONG_INT_64
#define strtoi64(str, endptr, base) ((int64) strtol(str, endptr, base))
#define strtou64(str, endptr, base) ((uint64) strtoul(str, endptr, base))
#else
#define strtoi64(str, endptr, base) ((int64) strtoll(str, endptr, base))
#define strtou64(str, endptr, base) ((uint64) strtoull(str, endptr, base))
#endif
#define RelationCreateStorage_compat(a, b, c) RelationCreateStorage(a, b)
#define parse_analyze_varparams_compat(a, b, c, d, e) parse_analyze_varparams(a, b, c, d)
#define pgstat_init_relation(r) pgstat_initstats(r)
#define pg_analyze_and_rewrite_fixedparams(a, b, c, d, e) pg_analyze_and_rewrite(a, b, c, \
d, e)
static inline int64
pg_strtoint64(char *s)
{
int64 result;
(void) scanint8(s, false, &result);
return result;
}
static inline SMgrRelation
RelationGetSmgr(Relation rel)
{
if (unlikely(rel->rd_smgr == NULL))
{
smgrsetowner(&(rel->rd_smgr), smgropen(rel->rd_node, rel->rd_backend));
}
return rel->rd_smgr;
}
#endif
#if PG_VERSION_NUM >= PG_VERSION_14 #if PG_VERSION_NUM >= PG_VERSION_14
#define AlterTableStmtObjType_compat(a) ((a)->objtype) #define AlterTableStmtObjType_compat(a) ((a)->objtype)
#define getObjectTypeDescription_compat(a, b) getObjectTypeDescription(a, b) #define getObjectTypeDescription_compat(a, b) getObjectTypeDescription(a, b)

View File

@ -0,0 +1,68 @@
Parsed test spec with 2 sessions
starting permutation: s1-begin s1-insert s2-begin s2-update-node-1 s1-abort s2-abort
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s1-begin: BEGIN;
step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100);
step s2-begin: BEGIN;
step s2-update-node-1:
-- update a specific node by address
SELECT master_update_node(nodeid, 'localhost', nodeport + 10)
FROM pg_dist_node
WHERE nodename = 'localhost'
AND nodeport = 57637;
<waiting ...>
step s1-abort: ABORT;
step s2-update-node-1: <... completed>
master_update_node
---------------------------------------------------------------------
(1 row)
step s2-abort: ABORT;
master_remove_node
---------------------------------------------------------------------
(2 rows)
starting permutation: s1-begin s1-insert s2-begin s2-update-node-1-force s2-abort s1-abort
create_distributed_table
---------------------------------------------------------------------
(1 row)
step s1-begin: BEGIN;
step s1-insert: INSERT INTO t1 SELECT generate_series(1, 100);
step s2-begin: BEGIN;
step s2-update-node-1-force:
-- update a specific node by address (force)
SELECT master_update_node(nodeid, 'localhost', nodeport + 10, force => true, lock_cooldown => 100)
FROM pg_dist_node
WHERE nodename = 'localhost'
AND nodeport = 57637;
<waiting ...>
step s2-update-node-1-force: <... completed>
master_update_node
---------------------------------------------------------------------
(1 row)
step s2-abort: ABORT;
step s1-abort: ABORT;
FATAL: terminating connection due to administrator command
server closed the connection unexpectedly
This probably means the server terminated abnormally
before or while processing the request.
master_remove_node
---------------------------------------------------------------------
(2 rows)

View File

@ -1989,6 +1989,17 @@ SELECT create_distributed_table('event_responses', 'event_id');
(1 row) (1 row)
INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no'); INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no');
CREATE TABLE event_responses_no_pkey (
event_id int,
user_id int,
response invite_resp
);
SELECT create_distributed_table('event_responses_no_pkey', 'event_id');
create_distributed_table
---------------------------------------------------------------------
(1 row)
CREATE OR REPLACE FUNCTION regular_func(p invite_resp) CREATE OR REPLACE FUNCTION regular_func(p invite_resp)
RETURNS int AS $$ RETURNS int AS $$
DECLARE DECLARE
@ -2438,6 +2449,755 @@ DEBUG: Creating router plan
17 | 777 | no 17 | 777 | no
(2 rows) (2 rows)
-- set back to sane settings
RESET citus.enable_local_execution;
RESET citus.enable_fast_path_router_planner;
-- we'll test some 2PC states
SET citus.enable_metadata_sync TO OFF;
-- coordinated_transaction_should_use_2PC prints the internal
-- state for 2PC decision on Citus. However, even if 2PC is decided,
-- we may not necessarily use 2PC over a connection unless it does
-- a modification
CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC()
RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus',
$$coordinated_transaction_should_use_2PC$$;
-- make tests consistent
SET citus.max_adaptive_executor_pool_size TO 1;
RESET citus.enable_metadata_sync;
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
SET citus.log_remote_commands TO ON;
-- we use event_id = 2 for local execution and event_id = 1 for reemote execution
--show it here, if anything changes here, all the tests below might be broken
-- we prefer this to avoid excessive logging below
SELECT * FROM event_responses_no_pkey WHERE event_id = 2;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 2
NOTICE: executing the command locally: SELECT event_id, user_id, response FROM public.event_responses_no_pkey_1480007 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 2)
event_id | user_id | response
---------------------------------------------------------------------
(0 rows)
SELECT * FROM event_responses_no_pkey WHERE event_id = 1;
DEBUG: Distributed planning for a fast-path router query
DEBUG: Creating router plan
DEBUG: query has a single distribution column value: 1
NOTICE: issuing SELECT event_id, user_id, response FROM public.event_responses_no_pkey_1480004 event_responses_no_pkey WHERE (event_id OPERATOR(pg_catalog.=) 1)
event_id | user_id | response
---------------------------------------------------------------------
(0 rows)
RESET citus.log_remote_commands;
RESET citus.log_local_commands;
RESET client_min_messages;
-- single shard local command without transaction block does set the
-- internal state for 2PC, but does not require any actual entries
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- two local commands without transaction block set the internal 2PC state
-- but does not use remotely
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local modification followed by another single shard
-- local modification sets the 2PC state, but does not use remotely
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local modification followed by a single shard
-- remote modification uses 2PC because multiple nodes involved
-- in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local modification followed by a single shard
-- remote modification uses 2PC even if it is not in an explicit
-- tx block as multiple nodes involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a single shard
-- local modification uses 2PC as multiple nodes involved
-- in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a single shard
-- local modification uses 2PC even if it is not in an explicit
-- tx block
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local SELECT command without transaction block does not set the
-- internal state for 2PC
WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
ERROR: The transaction is not a coordinated transaction
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- two local SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2)
SELECT count(*) FROM cte_1, cte_2;
count
---------------------------------------------------------------------
1
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- two local SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
9
(1 row)
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
9
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- a local SELECT followed by a remote SELECT does not require to
-- use actual 2PC
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
9
(1 row)
SELECT count(*) FROM event_responses_no_pkey;
count
---------------------------------------------------------------------
13
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
SELECT * FROM event_responses_no_pkey WHERE event_id = 2;
event_id | user_id | response
---------------------------------------------------------------------
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
2 | 2 | yes
(9 rows)
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a single shard
-- local SELECT does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
9
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a single shard
-- local SELECT does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- multi shard local SELECT command without transaction block does not set the
-- internal state for 2PC
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
BEGIN;
SELECT count(*) FROM event_responses_no_pkey;
count
---------------------------------------------------------------------
17
(1 row)
SELECT count(*) FROM event_responses_no_pkey;
count
---------------------------------------------------------------------
17
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- multi-shard shard SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
SELECT count(*) FROM event_responses_no_pkey;
count
---------------------------------------------------------------------
17
(1 row)
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- multi shard SELECT followed by a single shard
-- remote single shard modification does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a multi shard
-- SELECT does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
event_id | user_id | response
---------------------------------------------------------------------
1 | 2 | yes
(1 row)
SELECT count(*) FROM event_responses_no_pkey;
count
---------------------------------------------------------------------
20
(1 row)
SELECT coordinated_transaction_should_use_2PC();
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
f
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard remote modification followed by a multi shard
-- SELECT does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
f
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- single shard local modification followed by remote multi-shard
-- modification uses 2PC as multiple nodes are involved in modifications
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (UPDATE event_responses_no_pkey SET user_id = 1000 RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
bool_or
---------------------------------------------------------------------
t
(1 row)
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- a local SELECT followed by a remote multi-shard UPDATE requires to
-- use actual 2PC as multiple nodes are involved in modifications
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
10
(1 row)
UPDATE event_responses_no_pkey SET user_id = 1;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
1
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- a local SELECT followed by a remote single-shard UPDATE does not require to
-- use actual 2PC. This is because a single node is involved in modification
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
10
(1 row)
UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
-- a remote single-shard UPDATE followed by a local single shard SELECT
-- does not require to use actual 2PC. This is because a single node
-- is involved in modification
BEGIN;
UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
count
---------------------------------------------------------------------
10
(1 row)
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
count
---------------------------------------------------------------------
0
(1 row)
SELECT recover_prepared_transactions();
recover_prepared_transactions
---------------------------------------------------------------------
0
(1 row)
\c - - - :master_port \c - - - :master_port
-- verify the local_hostname guc is used for local executions that should connect to the -- verify the local_hostname guc is used for local executions that should connect to the
-- local host -- local host

View File

@ -114,7 +114,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name
(2 rows) (2 rows)
-- changing application_name reveals the shards -- changing application_name reveals the shards
SET application_name TO ''; SET application_name TO 'pg_regress';
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
relname relname
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -137,7 +137,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name
-- changing application_name in transaction reveals the shards -- changing application_name in transaction reveals the shards
BEGIN; BEGIN;
SET LOCAL application_name TO ''; SET LOCAL application_name TO 'pg_regress';
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
relname relname
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -160,7 +160,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name
-- now with session-level GUC, but ROLLBACK; -- now with session-level GUC, but ROLLBACK;
BEGIN; BEGIN;
SET application_name TO ''; SET application_name TO 'pg_regress';
ROLLBACK; ROLLBACK;
-- shards are hidden again after GUCs are reset -- shards are hidden again after GUCs are reset
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
@ -173,7 +173,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name
-- we should hide correctly based on application_name with savepoints -- we should hide correctly based on application_name with savepoints
BEGIN; BEGIN;
SAVEPOINT s1; SAVEPOINT s1;
SET application_name TO ''; SET application_name TO 'pg_regress';
-- changing application_name reveals the shards -- changing application_name reveals the shards
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
relname relname
@ -196,9 +196,9 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name
(2 rows) (2 rows)
ROLLBACK; ROLLBACK;
-- changing citus.hide_shards_from_app_name_prefixes reveals the shards -- changing citus.show_shards_for_app_name_prefix reveals the shards
BEGIN; BEGIN;
SET LOCAL citus.hide_shards_from_app_name_prefixes TO 'notpsql'; SET LOCAL citus.show_shards_for_app_name_prefixes TO 'psql';
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
relname relname
--------------------------------------------------------------------- ---------------------------------------------------------------------

View File

@ -978,6 +978,12 @@ ORDER BY shardid ASC;
(0 rows) (0 rows)
\c - - - :master_port \c - - - :master_port
SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT citus_disable_node('localhost', :worker_2_port); SELECT citus_disable_node('localhost', :worker_2_port);
citus_disable_node citus_disable_node
--------------------------------------------------------------------- ---------------------------------------------------------------------
@ -997,6 +1003,19 @@ SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
1 1
(1 row) (1 row)
-- never mark coordinator metadatasynced = false
SELECT hasmetadata, metadatasynced FROM pg_dist_node WHERE nodeport = :master_port;
hasmetadata | metadatasynced
---------------------------------------------------------------------
t | t
(1 row)
SELECT 1 FROM citus_remove_node('localhost', :master_port);
?column?
---------------------------------------------------------------------
1
(1 row)
SELECT SELECT
shardid, shardstate, shardlength, nodename, nodeport shardid, shardstate, shardlength, nodename, nodeport
FROM FROM

View File

@ -2231,15 +2231,6 @@ NOTICE: executing the command locally: UPDATE single_node.another_schema_table_
(1 row) (1 row)
ROLLBACK; ROLLBACK;
-- same without transaction block
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
NOTICE: executing the command locally: WITH cte_1 AS (UPDATE single_node.another_schema_table_90630515 another_schema_table SET b = (another_schema_table.b OPERATOR(pg_catalog.+) 1) WHERE (another_schema_table.a OPERATOR(pg_catalog.=) 1) RETURNING another_schema_table.a, another_schema_table.b) SELECT single_node.coordinated_transaction_should_use_2pc() AS coordinated_transaction_should_use_2pc FROM cte_1
coordinated_transaction_should_use_2pc
---------------------------------------------------------------------
t
(1 row)
-- if the local execution is disabled, we cannot failover to -- if the local execution is disabled, we cannot failover to
-- local execution and the queries would fail -- local execution and the queries would fail
SET citus.enable_local_execution TO false; SET citus.enable_local_execution TO false;

View File

@ -43,7 +43,9 @@ test: coordinator_evaluation_modify
test: coordinator_evaluation_select test: coordinator_evaluation_select
test: multi_mx_call test: multi_mx_call
test: multi_mx_function_call_delegation test: multi_mx_function_call_delegation
test: multi_mx_modifications local_shard_execution local_shard_execution_replicated test: multi_mx_modifications local_shard_execution_replicated
# the following test has to be run sequentially
test: local_shard_execution
test: multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2 test: multi_mx_repartition_udt_w1 multi_mx_repartition_udt_w2
test: local_shard_copy test: local_shard_copy
test: undistribute_table_cascade_mx test: undistribute_table_cascade_mx

View File

@ -466,7 +466,7 @@ push(@pgOptions, "citus.explain_analyze_sort_method='taskId'");
push(@pgOptions, "citus.enable_manual_changes_to_shards=on"); push(@pgOptions, "citus.enable_manual_changes_to_shards=on");
# Some tests look at shards in pg_class, make sure we can usually see them: # Some tests look at shards in pg_class, make sure we can usually see them:
push(@pgOptions, "citus.hide_shards_from_app_name_prefixes='psql,pg_dump'"); push(@pgOptions, "citus.show_shards_for_app_name_prefixes='pg_regress'");
# we disable slow start by default to encourage parallelism within tests # we disable slow start by default to encourage parallelism within tests
push(@pgOptions, "citus.executor_slow_start_interval=0ms"); push(@pgOptions, "citus.executor_slow_start_interval=0ms");

View File

@ -928,6 +928,17 @@ SELECT create_distributed_table('event_responses', 'event_id');
INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no'); INSERT INTO event_responses VALUES (1, 1, 'yes'), (2, 2, 'yes'), (3, 3, 'no'), (4, 4, 'no');
CREATE TABLE event_responses_no_pkey (
event_id int,
user_id int,
response invite_resp
);
SELECT create_distributed_table('event_responses_no_pkey', 'event_id');
CREATE OR REPLACE FUNCTION regular_func(p invite_resp) CREATE OR REPLACE FUNCTION regular_func(p invite_resp)
RETURNS int AS $$ RETURNS int AS $$
DECLARE DECLARE
@ -1120,6 +1131,281 @@ INSERT INTO event_responses VALUES (16, 666, 'maybe'), (17, 777, 'no')
ON CONFLICT (event_id, user_id) ON CONFLICT (event_id, user_id)
DO UPDATE SET response = EXCLUDED.response RETURNING *; DO UPDATE SET response = EXCLUDED.response RETURNING *;
-- set back to sane settings
RESET citus.enable_local_execution;
RESET citus.enable_fast_path_router_planner;
-- we'll test some 2PC states
SET citus.enable_metadata_sync TO OFF;
-- coordinated_transaction_should_use_2PC prints the internal
-- state for 2PC decision on Citus. However, even if 2PC is decided,
-- we may not necessarily use 2PC over a connection unless it does
-- a modification
CREATE OR REPLACE FUNCTION coordinated_transaction_should_use_2PC()
RETURNS BOOL LANGUAGE C STRICT VOLATILE AS 'citus',
$$coordinated_transaction_should_use_2PC$$;
-- make tests consistent
SET citus.max_adaptive_executor_pool_size TO 1;
RESET citus.enable_metadata_sync;
SELECT recover_prepared_transactions();
SET citus.log_remote_commands TO ON;
-- we use event_id = 2 for local execution and event_id = 1 for reemote execution
--show it here, if anything changes here, all the tests below might be broken
-- we prefer this to avoid excessive logging below
SELECT * FROM event_responses_no_pkey WHERE event_id = 2;
SELECT * FROM event_responses_no_pkey WHERE event_id = 1;
RESET citus.log_remote_commands;
RESET citus.log_local_commands;
RESET client_min_messages;
-- single shard local command without transaction block does set the
-- internal state for 2PC, but does not require any actual entries
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- two local commands without transaction block set the internal 2PC state
-- but does not use remotely
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local modification followed by another single shard
-- local modification sets the 2PC state, but does not use remotely
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local modification followed by a single shard
-- remote modification uses 2PC because multiple nodes involved
-- in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local modification followed by a single shard
-- remote modification uses 2PC even if it is not in an explicit
-- tx block as multiple nodes involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a single shard
-- local modification uses 2PC as multiple nodes involved
-- in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a single shard
-- local modification uses 2PC even if it is not in an explicit
-- tx block
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local SELECT command without transaction block does not set the
-- internal state for 2PC
WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- two local SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2)
SELECT count(*) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- two local SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- a local SELECT followed by a remote SELECT does not require to
-- use actual 2PC
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
SELECT count(*) FROM event_responses_no_pkey;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
SELECT * FROM event_responses_no_pkey WHERE event_id = 2;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a single shard
-- local SELECT does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a single shard
-- local SELECT does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (SELECT * FROM event_responses_no_pkey WHERE event_id = 2)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- multi shard local SELECT command without transaction block does not set the
-- internal state for 2PC
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- two multi-shard SELECT commands without transaction block does not set the internal 2PC state
-- and does not use remotely
BEGIN;
SELECT count(*) FROM event_responses_no_pkey;
SELECT count(*) FROM event_responses_no_pkey;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- multi-shard shard SELECT followed by a single shard
-- remote modification does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
SELECT count(*) FROM event_responses_no_pkey;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- multi shard SELECT followed by a single shard
-- remote single shard modification does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (SELECT count(*) FROM event_responses_no_pkey),
cte_2 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a multi shard
-- SELECT does not use 2PC, because only a single
-- machine involved in the modification
BEGIN;
INSERT INTO event_responses_no_pkey VALUES (1, 2, 'yes') RETURNING *;
SELECT count(*) FROM event_responses_no_pkey;
SELECT coordinated_transaction_should_use_2PC();
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard remote modification followed by a multi shard
-- SELECT does not use 2PC, because only a single
-- machine involved in the modification
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (1, 1, 'yes') RETURNING *),
cte_2 AS (SELECT count(*) FROM event_responses_no_pkey)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- single shard local modification followed by remote multi-shard
-- modification uses 2PC as multiple nodes are involved in modifications
WITH cte_1 AS (INSERT INTO event_responses_no_pkey VALUES (2, 2, 'yes') RETURNING *),
cte_2 AS (UPDATE event_responses_no_pkey SET user_id = 1000 RETURNING *)
SELECT bool_or(coordinated_transaction_should_use_2PC()) FROM cte_1, cte_2;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- a local SELECT followed by a remote multi-shard UPDATE requires to
-- use actual 2PC as multiple nodes are involved in modifications
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
UPDATE event_responses_no_pkey SET user_id = 1;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- a local SELECT followed by a remote single-shard UPDATE does not require to
-- use actual 2PC. This is because a single node is involved in modification
BEGIN;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
-- a remote single-shard UPDATE followed by a local single shard SELECT
-- does not require to use actual 2PC. This is because a single node
-- is involved in modification
BEGIN;
UPDATE event_responses_no_pkey SET user_id = 1 WHERE event_id = 1;
SELECT count(*) FROM event_responses_no_pkey WHERE event_id = 2;
COMMIT;
SELECT count(*) FROM pg_dist_transaction;
SELECT recover_prepared_transactions();
\c - - - :master_port \c - - - :master_port
-- verify the local_hostname guc is used for local executions that should connect to the -- verify the local_hostname guc is used for local executions that should connect to the

View File

@ -67,7 +67,7 @@ SELECT * FROM citus_shard_indexes_on_worker WHERE "Schema" = 'mx_hide_shard_name
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
-- changing application_name reveals the shards -- changing application_name reveals the shards
SET application_name TO ''; SET application_name TO 'pg_regress';
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
RESET application_name; RESET application_name;
@ -76,7 +76,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name
-- changing application_name in transaction reveals the shards -- changing application_name in transaction reveals the shards
BEGIN; BEGIN;
SET LOCAL application_name TO ''; SET LOCAL application_name TO 'pg_regress';
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
ROLLBACK; ROLLBACK;
@ -85,7 +85,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name
-- now with session-level GUC, but ROLLBACK; -- now with session-level GUC, but ROLLBACK;
BEGIN; BEGIN;
SET application_name TO ''; SET application_name TO 'pg_regress';
ROLLBACK; ROLLBACK;
-- shards are hidden again after GUCs are reset -- shards are hidden again after GUCs are reset
@ -94,7 +94,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name
-- we should hide correctly based on application_name with savepoints -- we should hide correctly based on application_name with savepoints
BEGIN; BEGIN;
SAVEPOINT s1; SAVEPOINT s1;
SET application_name TO ''; SET application_name TO 'pg_regress';
-- changing application_name reveals the shards -- changing application_name reveals the shards
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
ROLLBACK TO SAVEPOINT s1; ROLLBACK TO SAVEPOINT s1;
@ -102,9 +102,9 @@ ROLLBACK TO SAVEPOINT s1;
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
ROLLBACK; ROLLBACK;
-- changing citus.hide_shards_from_app_name_prefixes reveals the shards -- changing citus.show_shards_for_app_name_prefix reveals the shards
BEGIN; BEGIN;
SET LOCAL citus.hide_shards_from_app_name_prefixes TO 'notpsql'; SET LOCAL citus.show_shards_for_app_name_prefixes TO 'psql';
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname; SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
ROLLBACK; ROLLBACK;

View File

@ -580,13 +580,19 @@ WHERE
ORDER BY shardid ASC; ORDER BY shardid ASC;
\c - - - :master_port \c - - - :master_port
SELECT 1 FROM citus_set_coordinator_host('localhost', :master_port);
SELECT citus_disable_node('localhost', :worker_2_port); SELECT citus_disable_node('localhost', :worker_2_port);
SELECT public.wait_until_metadata_sync(); SELECT public.wait_until_metadata_sync();
-- status after citus_disable_node_and_wait -- status after citus_disable_node_and_wait
SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port; SELECT COUNT(*) FROM pg_dist_node WHERE nodeport = :worker_2_port;
-- never mark coordinator metadatasynced = false
SELECT hasmetadata, metadatasynced FROM pg_dist_node WHERE nodeport = :master_port;
SELECT 1 FROM citus_remove_node('localhost', :master_port);
SELECT SELECT
shardid, shardstate, shardlength, nodename, nodeport shardid, shardstate, shardlength, nodename, nodeport
FROM FROM

View File

@ -1102,10 +1102,6 @@ BEGIN;
SELECT coordinated_transaction_should_use_2PC(); SELECT coordinated_transaction_should_use_2PC();
ROLLBACK; ROLLBACK;
-- same without transaction block
WITH cte_1 AS (UPDATE another_schema_table SET b = b + 1 WHERE a = 1 RETURNING *)
SELECT coordinated_transaction_should_use_2PC() FROM cte_1;
-- if the local execution is disabled, we cannot failover to -- if the local execution is disabled, we cannot failover to
-- local execution and the queries would fail -- local execution and the queries would fail
SET citus.enable_local_execution TO false; SET citus.enable_local_execution TO false;