mirror of https://github.com/citusdata/citus.git
Fix inserting to pg_dist_object for queries from other nodes
parent
00068e07c5
commit
b67560abf2
|
@ -31,6 +31,7 @@
|
||||||
#include "nodes/makefuncs.h"
|
#include "nodes/makefuncs.h"
|
||||||
#include "nodes/pg_list.h"
|
#include "nodes/pg_list.h"
|
||||||
#include "parser/parse_type.h"
|
#include "parser/parse_type.h"
|
||||||
|
#include "postmaster/postmaster.h"
|
||||||
#include "utils/builtins.h"
|
#include "utils/builtins.h"
|
||||||
#include "utils/fmgroids.h"
|
#include "utils/fmgroids.h"
|
||||||
#include "utils/lsyscache.h"
|
#include "utils/lsyscache.h"
|
||||||
|
@ -80,7 +81,14 @@ mark_object_distributed(PG_FUNCTION_ARGS)
|
||||||
Oid objectId = PG_GETARG_OID(2);
|
Oid objectId = PG_GETARG_OID(2);
|
||||||
ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress));
|
ObjectAddress *objectAddress = palloc0(sizeof(ObjectAddress));
|
||||||
ObjectAddressSet(*objectAddress, classId, objectId);
|
ObjectAddressSet(*objectAddress, classId, objectId);
|
||||||
MarkObjectDistributedWithName(objectAddress, objectName);
|
|
||||||
|
/*
|
||||||
|
* This function is called when a query is run from a Citus non-main database.
|
||||||
|
* We need to insert into local pg_dist_object over a connection to make sure
|
||||||
|
* 2PC still works.
|
||||||
|
*/
|
||||||
|
bool useConnectionForLocalQuery = true;
|
||||||
|
MarkObjectDistributedWithName(objectAddress, objectName, useConnectionForLocalQuery);
|
||||||
PG_RETURN_VOID();
|
PG_RETURN_VOID();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -184,7 +192,8 @@ ObjectExists(const ObjectAddress *address)
|
||||||
void
|
void
|
||||||
MarkObjectDistributed(const ObjectAddress *distAddress)
|
MarkObjectDistributed(const ObjectAddress *distAddress)
|
||||||
{
|
{
|
||||||
MarkObjectDistributedWithName(distAddress, "");
|
bool useConnectionForLocalQuery = false;
|
||||||
|
MarkObjectDistributedWithName(distAddress, "", useConnectionForLocalQuery);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -194,13 +203,32 @@ MarkObjectDistributed(const ObjectAddress *distAddress)
|
||||||
* that is used in case the object does not exists for the current transaction.
|
* that is used in case the object does not exists for the current transaction.
|
||||||
*/
|
*/
|
||||||
void
|
void
|
||||||
MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName)
|
MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *objectName,
|
||||||
|
bool useConnectionForLocalQuery)
|
||||||
{
|
{
|
||||||
if (!CitusHasBeenLoaded())
|
if (!CitusHasBeenLoaded())
|
||||||
{
|
{
|
||||||
elog(ERROR, "Cannot mark object distributed because Citus has not been loaded.");
|
elog(ERROR, "Cannot mark object distributed because Citus has not been loaded.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* When a query is run from a Citus non-main database we need to insert into pg_dist_object
|
||||||
|
* over a connection to make sure 2PC still works.
|
||||||
|
*/
|
||||||
|
if (useConnectionForLocalQuery)
|
||||||
|
{
|
||||||
|
StringInfo insertQuery = makeStringInfo();
|
||||||
|
appendStringInfo(insertQuery,
|
||||||
|
"INSERT INTO pg_catalog.pg_dist_object (classid, objid, objsubid)"
|
||||||
|
"VALUES (%d, %d, %d) ON CONFLICT DO NOTHING",
|
||||||
|
distAddress->classId, distAddress->objectId,
|
||||||
|
distAddress->objectSubId);
|
||||||
|
SendCommandToWorker(LocalHostName, PostPortNumber, insertQuery->data);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
MarkObjectDistributedLocally(distAddress);
|
MarkObjectDistributedLocally(distAddress);
|
||||||
|
}
|
||||||
|
|
||||||
if (EnableMetadataSync)
|
if (EnableMetadataSync)
|
||||||
{
|
{
|
||||||
|
|
|
@ -23,7 +23,8 @@ extern bool CitusExtensionObject(const ObjectAddress *objectAddress);
|
||||||
extern bool IsAnyObjectDistributed(const List *addresses);
|
extern bool IsAnyObjectDistributed(const List *addresses);
|
||||||
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
|
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
|
||||||
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
|
extern void MarkObjectDistributed(const ObjectAddress *distAddress);
|
||||||
extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name);
|
extern void MarkObjectDistributedWithName(const ObjectAddress *distAddress, char *name,
|
||||||
|
bool useConnectionForLocalQuery);
|
||||||
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
|
extern void MarkObjectDistributedViaSuperUser(const ObjectAddress *distAddress);
|
||||||
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
|
extern void MarkObjectDistributedLocally(const ObjectAddress *distAddress);
|
||||||
extern void UnmarkObjectDistributed(const ObjectAddress *address);
|
extern void UnmarkObjectDistributed(const ObjectAddress *address);
|
||||||
|
|
|
@ -5,6 +5,9 @@ def test_main_commited_outer_not_yet(cluster):
|
||||||
# create a non-main database
|
# create a non-main database
|
||||||
c.sql("CREATE DATABASE db1")
|
c.sql("CREATE DATABASE db1")
|
||||||
|
|
||||||
|
c.sql("ALTER SYSTEM SET citus.local_hostname TO '127.0.0.1'")
|
||||||
|
c.sql("SELECT pg_reload_conf()")
|
||||||
|
|
||||||
# we will use cur1 to simulate non-main database user and
|
# we will use cur1 to simulate non-main database user and
|
||||||
# cur2 to manually do the steps we would do in the main database
|
# cur2 to manually do the steps we would do in the main database
|
||||||
with c.cur(dbname="db1") as cur1, c.cur() as cur2:
|
with c.cur(dbname="db1") as cur1, c.cur() as cur2:
|
||||||
|
@ -36,14 +39,25 @@ def test_main_commited_outer_not_yet(cluster):
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
int(role_before_commit) == 0
|
int(role_before_commit) == 0
|
||||||
), "role is on pg_dist_object despite not committing"
|
), "role is in pg_dist_object despite not committing"
|
||||||
|
|
||||||
|
# user should not be in pg_dist_object on the coordinator because outer transaction is not committed yet
|
||||||
|
pdo_coordinator_before_commit = c.sql_value(
|
||||||
|
"SELECT count(*) FROM pg_dist_object WHERE objid = 123123"
|
||||||
|
)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
int(pdo_coordinator_before_commit) == 0
|
||||||
|
), "role is in pg_dist_object on coordinator despite not committing"
|
||||||
|
|
||||||
# user should not be in pg_dist_object on the worker because outer transaction is not committed yet
|
# user should not be in pg_dist_object on the worker because outer transaction is not committed yet
|
||||||
pdo_before_commit = w0.sql_value(
|
pdo_worker_before_commit = w0.sql_value(
|
||||||
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
|
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
|
||||||
)
|
)
|
||||||
|
|
||||||
assert int(pdo_before_commit) == 0, "role is created despite not committing"
|
assert (
|
||||||
|
int(pdo_worker_before_commit) == 0
|
||||||
|
), "role is in pg_dist_object on worker despite not committing"
|
||||||
|
|
||||||
# commit in cur1 so the transaction recovery thinks this is a successful transaction
|
# commit in cur1 so the transaction recovery thinks this is a successful transaction
|
||||||
cur1.execute("COMMIT")
|
cur1.execute("COMMIT")
|
||||||
|
@ -60,14 +74,23 @@ def test_main_commited_outer_not_yet(cluster):
|
||||||
int(role_after_commit) == 1
|
int(role_after_commit) == 1
|
||||||
), "role is not created during recovery despite committing"
|
), "role is not created during recovery despite committing"
|
||||||
|
|
||||||
# check that the user is on pg_dist_object on the worker after transaction recovery
|
# check that the user is in pg_dist_object on the coordinator after transaction recovery
|
||||||
pdo_after_commit = w0.sql_value(
|
pdo_coordinator_after_commit = c.sql_value(
|
||||||
|
"SELECT count(*) FROM pg_dist_object WHERE objid = 123123"
|
||||||
|
)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
int(pdo_coordinator_after_commit) == 1
|
||||||
|
), "role is not in pg_dist_object on coordinator after recovery despite committing"
|
||||||
|
|
||||||
|
# check that the user is in pg_dist_object on the worker after transaction recovery
|
||||||
|
pdo_worker_after_commit = w0.sql_value(
|
||||||
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
|
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u1'"
|
||||||
)
|
)
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
int(pdo_after_commit) == 1
|
int(pdo_worker_after_commit) == 1
|
||||||
), "role is not on pg_dist_object after recovery despite committing"
|
), "role is not in pg_dist_object on worker after recovery despite committing"
|
||||||
|
|
||||||
c.sql("DROP DATABASE db1")
|
c.sql("DROP DATABASE db1")
|
||||||
c.sql(
|
c.sql(
|
||||||
|
@ -81,6 +104,12 @@ def test_main_commited_outer_not_yet(cluster):
|
||||||
$$)
|
$$)
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
c.sql(
|
||||||
|
"""
|
||||||
|
DELETE FROM pg_dist_object
|
||||||
|
WHERE objid = 123123
|
||||||
|
"""
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def test_main_commited_outer_aborted(cluster):
|
def test_main_commited_outer_aborted(cluster):
|
||||||
|
@ -90,6 +119,8 @@ def test_main_commited_outer_aborted(cluster):
|
||||||
# create a non-main database
|
# create a non-main database
|
||||||
c.sql("CREATE DATABASE db2")
|
c.sql("CREATE DATABASE db2")
|
||||||
|
|
||||||
|
c.sql("SELECT citus_set_coordinator_host('localhost')")
|
||||||
|
|
||||||
# we will use cur1 to simulate non-main database user and
|
# we will use cur1 to simulate non-main database user and
|
||||||
# cur2 to manually do the steps we would do in the main database
|
# cur2 to manually do the steps we would do in the main database
|
||||||
with c.cur(dbname="db2") as cur1, c.cur() as cur2:
|
with c.cur(dbname="db2") as cur1, c.cur() as cur2:
|
||||||
|
@ -121,14 +152,23 @@ def test_main_commited_outer_aborted(cluster):
|
||||||
|
|
||||||
assert int(role_before_recovery) == 0, "role is already created before recovery"
|
assert int(role_before_recovery) == 0, "role is already created before recovery"
|
||||||
|
|
||||||
# check that the user is not on pg_dist_object on the worker
|
# check that the user is not in pg_dist_object on the coordinator
|
||||||
pdo_before_recovery = w0.sql_value(
|
pdo_coordinator_before_recovery = c.sql_value(
|
||||||
|
"SELECT count(*) FROM pg_dist_object WHERE objid = 321321"
|
||||||
|
)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
int(pdo_coordinator_before_recovery) == 0
|
||||||
|
), "role is already in pg_dist_object on coordinator before recovery"
|
||||||
|
|
||||||
|
# check that the user is not in pg_dist_object on the worker
|
||||||
|
pdo_worker_before_recovery = w0.sql_value(
|
||||||
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'"
|
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'"
|
||||||
)
|
)
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
int(pdo_before_recovery) == 0
|
int(pdo_worker_before_recovery) == 0
|
||||||
), "role is already on pg_dist_object before recovery"
|
), "role is already in pg_dist_object on worker before recovery"
|
||||||
|
|
||||||
# run the transaction recovery
|
# run the transaction recovery
|
||||||
c.sql("SELECT recover_prepared_transactions()")
|
c.sql("SELECT recover_prepared_transactions()")
|
||||||
|
@ -142,13 +182,22 @@ def test_main_commited_outer_aborted(cluster):
|
||||||
int(role_after_recovery) == 0
|
int(role_after_recovery) == 0
|
||||||
), "role is created during recovery despite aborting"
|
), "role is created during recovery despite aborting"
|
||||||
|
|
||||||
# check that the user is not on pg_dist_object on the worker after transaction recovery
|
# check that the user is not in pg_dist_object on the coordinator after transaction recovery
|
||||||
pdo_after_recovery = w0.sql_value(
|
pdo_coordinator_after_recovery = c.sql_value(
|
||||||
|
"SELECT count(*) FROM pg_dist_object WHERE objid = 321321"
|
||||||
|
)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
int(pdo_coordinator_after_recovery) == 0
|
||||||
|
), "role is in pg_dist_object on coordinator after recovery despite aborting"
|
||||||
|
|
||||||
|
# check that the user is not in pg_dist_object on the worker after transaction recovery
|
||||||
|
pdo_worker_after_recovery = w0.sql_value(
|
||||||
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'"
|
"SELECT count(*) FROM pg_dist_object WHERE objid::regrole::text = 'u2'"
|
||||||
)
|
)
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
int(pdo_after_recovery) == 0
|
int(pdo_worker_after_recovery) == 0
|
||||||
), "role is on pg_dist_object after recovery despite aborting"
|
), "role is in pg_dist_object on worker after recovery despite aborting"
|
||||||
|
|
||||||
c.sql("DROP DATABASE db2")
|
c.sql("DROP DATABASE db2")
|
||||||
|
|
Loading…
Reference in New Issue