Merge branch 'master' into velioglu/table_wo_seq_prototype

velioglu/wo_seq_test_1
Burak Velioglu 2022-01-24 23:28:54 +03:00
commit 8a3e92c569
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
8 changed files with 151 additions and 41 deletions

View File

@ -1028,12 +1028,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams)
* then we should error out as it would cause inconsistencies across the * then we should error out as it would cause inconsistencies across the
* remote connection and local execution. * remote connection and local execution.
*/ */
List *remoteTaskList = execution->remoteTaskList; EnsureCompatibleLocalExecutionState(execution->remoteTaskList);
if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED &&
AnyTaskAccessesLocalNode(remoteTaskList))
{
ErrorIfTransactionAccessedPlacementsLocally();
}
/* run the remote execution */ /* run the remote execution */
StartDistributedExecution(execution); StartDistributedExecution(execution);

View File

@ -50,7 +50,6 @@ static void TraverseJobTree(Job *curJob, List **jobs);
static char * GenerateCreateSchemasCommand(List *jobIds, char *schemaOwner); static char * GenerateCreateSchemasCommand(List *jobIds, char *schemaOwner);
static char * GenerateJobCommands(List *jobIds, char *templateCommand); static char * GenerateJobCommands(List *jobIds, char *templateCommand);
static char * GenerateDeleteJobsCommand(List *jobIds); static char * GenerateDeleteJobsCommand(List *jobIds);
static void EnsureCompatibleLocalExecutionState(List *taskList);
/* /*
@ -79,7 +78,7 @@ ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob)
* EnsureCompatibleLocalExecutionState makes sure that the tasks won't have * EnsureCompatibleLocalExecutionState makes sure that the tasks won't have
* any visibility problems because of local execution. * any visibility problems because of local execution.
*/ */
static void void
EnsureCompatibleLocalExecutionState(List *taskList) EnsureCompatibleLocalExecutionState(List *taskList)
{ {
/* /*

View File

@ -95,12 +95,16 @@ static bool SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnE
static void DropMetadataSnapshotOnNode(WorkerNode *workerNode); static void DropMetadataSnapshotOnNode(WorkerNode *workerNode);
static char * CreateSequenceDependencyCommand(Oid relationId, Oid sequenceId, static char * CreateSequenceDependencyCommand(Oid relationId, Oid sequenceId,
char *columnName); char *columnName);
static GrantStmt * GenerateGrantStmtForRights(ObjectType objectType,
Oid roleOid,
Oid objectId,
char *permission,
bool withGrantOption);
static List * GetObjectsForGrantStmt(ObjectType objectType, Oid objectId);
static AccessPriv * GetAccessPrivObjectForGrantStmt(char *permission);
static RoleSpec * GetRoleSpecObjectForGrantStmt(Oid roleOid);
static List * GenerateGrantOnSchemaQueriesFromAclItem(Oid schemaOid, static List * GenerateGrantOnSchemaQueriesFromAclItem(Oid schemaOid,
AclItem *aclItem); AclItem *aclItem);
static GrantStmt * GenerateGrantOnSchemaStmtForRights(Oid roleOid,
Oid schemaOid,
char *permission,
bool withGrantOption);
static void SetLocalEnableDependencyCreation(bool state); static void SetLocalEnableDependencyCreation(bool state);
static char * GenerateSetRoleQuery(Oid roleOid); static char * GenerateSetRoleQuery(Oid roleOid);
static void MetadataSyncSigTermHandler(SIGNAL_ARGS); static void MetadataSyncSigTermHandler(SIGNAL_ARGS);
@ -1734,16 +1738,16 @@ GenerateGrantOnSchemaQueriesFromAclItem(Oid schemaOid, AclItem *aclItem)
if (permissions & ACL_USAGE) if (permissions & ACL_USAGE)
{ {
char *query = DeparseTreeNode((Node *) GenerateGrantOnSchemaStmtForRights( char *query = DeparseTreeNode((Node *) GenerateGrantStmtForRights(
granteeOid, schemaOid, "USAGE", grants & OBJECT_SCHEMA, granteeOid, schemaOid, "USAGE",
ACL_USAGE)); grants & ACL_USAGE));
queries = lappend(queries, query); queries = lappend(queries, query);
} }
if (permissions & ACL_CREATE) if (permissions & ACL_CREATE)
{ {
char *query = DeparseTreeNode((Node *) GenerateGrantOnSchemaStmtForRights( char *query = DeparseTreeNode((Node *) GenerateGrantStmtForRights(
granteeOid, schemaOid, "CREATE", grants & OBJECT_SCHEMA, granteeOid, schemaOid, "CREATE",
ACL_CREATE)); grants & ACL_CREATE));
queries = lappend(queries, query); queries = lappend(queries, query);
} }
@ -1753,30 +1757,88 @@ GenerateGrantOnSchemaQueriesFromAclItem(Oid schemaOid, AclItem *aclItem)
} }
GrantStmt * /*
GenerateGrantOnSchemaStmtForRights(Oid roleOid, * GenerateGrantStmtForRights is the function for creating GrantStmt's for all
Oid schemaOid, * types of objects that are supported. It takes parameters to fill a GrantStmt's
char *permission, * fields and returns the GrantStmt.
bool withGrantOption) * The field `objects` of GrantStmt doesn't have a common structure for all types.
* Make sure you have added your object type to GetObjectsForGrantStmt.
*/
static GrantStmt *
GenerateGrantStmtForRights(ObjectType objectType,
Oid roleOid,
Oid objectId,
char *permission,
bool withGrantOption)
{
GrantStmt *stmt = makeNode(GrantStmt);
stmt->is_grant = true;
stmt->targtype = ACL_TARGET_OBJECT;
stmt->objtype = objectType;
stmt->objects = GetObjectsForGrantStmt(objectType, objectId);
stmt->privileges = list_make1(GetAccessPrivObjectForGrantStmt(permission));
stmt->grantees = list_make1(GetRoleSpecObjectForGrantStmt(roleOid));
stmt->grant_option = withGrantOption;
return stmt;
}
/*
* GetObjectsForGrantStmt takes an object type and object id and returns the 'objects'
* field to be used when creating GrantStmt. We have only one object here (the one with
* the oid = objectId) but we pass it into the GrantStmt as a list with one element,
* as GrantStmt->objects field is actually a list.
*/
static List *
GetObjectsForGrantStmt(ObjectType objectType, Oid objectId)
{
switch (objectType)
{
/* supported object types */
case OBJECT_SCHEMA:
{
return list_make1(makeString(get_namespace_name(objectId)));
}
default:
{
elog(ERROR, "unsupported object type for GRANT");
}
}
return NIL;
}
/*
* GetAccessPrivObjectForGrantStmt creates an AccessPriv object for the given permission.
* It will be used when creating GrantStmt objects.
*/
static AccessPriv *
GetAccessPrivObjectForGrantStmt(char *permission)
{ {
AccessPriv *accessPriv = makeNode(AccessPriv); AccessPriv *accessPriv = makeNode(AccessPriv);
accessPriv->priv_name = permission; accessPriv->priv_name = pstrdup(permission);
accessPriv->cols = NULL; accessPriv->cols = NULL;
return accessPriv;
}
/*
* GetRoleSpecObjectForGrantStmt creates a RoleSpec object for the given roleOid.
* It will be used when creating GrantStmt objects.
*/
static RoleSpec *
GetRoleSpecObjectForGrantStmt(Oid roleOid)
{
RoleSpec *roleSpec = makeNode(RoleSpec); RoleSpec *roleSpec = makeNode(RoleSpec);
roleSpec->roletype = OidIsValid(roleOid) ? ROLESPEC_CSTRING : ROLESPEC_PUBLIC; roleSpec->roletype = OidIsValid(roleOid) ? ROLESPEC_CSTRING : ROLESPEC_PUBLIC;
roleSpec->rolename = OidIsValid(roleOid) ? GetUserNameFromId(roleOid, false) : NULL; roleSpec->rolename = OidIsValid(roleOid) ? GetUserNameFromId(roleOid, false) : NULL;
roleSpec->location = -1; roleSpec->location = -1;
GrantStmt *stmt = makeNode(GrantStmt); return roleSpec;
stmt->is_grant = true;
stmt->targtype = ACL_TARGET_OBJECT;
stmt->objtype = OBJECT_SCHEMA;
stmt->objects = list_make1(makeString(get_namespace_name(schemaOid)));
stmt->privileges = list_make1(accessPriv);
stmt->grantees = list_make1(roleSpec);
stmt->grant_option = withGrantOption;
return stmt;
} }
@ -2953,7 +3015,7 @@ citus_internal_update_relation_colocation(PG_FUNCTION_ARGS)
CheckCitusVersion(ERROR); CheckCitusVersion(ERROR);
Oid relationId = PG_GETARG_OID(0); Oid relationId = PG_GETARG_OID(0);
uint32 tagetColocationId = PG_GETARG_UINT32(1); uint32 targetColocationId = PG_GETARG_UINT32(1);
EnsureTableOwner(relationId); EnsureTableOwner(relationId);
@ -2982,7 +3044,7 @@ citus_internal_update_relation_colocation(PG_FUNCTION_ARGS)
int count = 1; int count = 1;
List *targetColocatedTableList = List *targetColocatedTableList =
ColocationGroupTableList(tagetColocationId, count); ColocationGroupTableList(targetColocationId, count);
if (list_length(targetColocatedTableList) == 0) if (list_length(targetColocatedTableList) == 0)
{ {
@ -2999,7 +3061,7 @@ citus_internal_update_relation_colocation(PG_FUNCTION_ARGS)
} }
bool localOnly = true; bool localOnly = true;
UpdateRelationColocationGroup(relationId, tagetColocationId, localOnly); UpdateRelationColocationGroup(relationId, targetColocationId, localOnly);
PG_RETURN_VOID(); PG_RETURN_VOID();
} }

View File

@ -446,8 +446,16 @@ multi_log_hook(ErrorData *edata)
MyBackendGotCancelledDueToDeadlock(clearState)) MyBackendGotCancelledDueToDeadlock(clearState))
{ {
edata->sqlerrcode = ERRCODE_T_R_DEADLOCK_DETECTED; edata->sqlerrcode = ERRCODE_T_R_DEADLOCK_DETECTED;
edata->message = "canceling the transaction since it was "
"involved in a distributed deadlock"; /*
* This hook is called by EmitErrorReport() when emitting the ereport
* either to frontend or to the server logs. And some callers of
* EmitErrorReport() (e.g.: errfinish()) seems to assume that string
* fields of given ErrorData object needs to be freed. For this reason,
* we copy the message into heap here.
*/
edata->message = pstrdup("canceling the transaction since it was "
"involved in a distributed deadlock");
} }
} }

View File

@ -13,6 +13,7 @@
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
extern List * ExecuteDependentTasks(List *taskList, Job *topLevelJob); extern List * ExecuteDependentTasks(List *taskList, Job *topLevelJob);
extern void EnsureCompatibleLocalExecutionState(List *taskList);
extern void DoRepartitionCleanup(List *jobIds); extern void DoRepartitionCleanup(List *jobIds);

View File

@ -54,6 +54,16 @@ CREATE FUNCTION find_shard_interval_index(bigint)
RETURNS int RETURNS int
AS 'citus' AS 'citus'
LANGUAGE C STRICT; LANGUAGE C STRICT;
-- remove tables from pg_dist_partition, if they don't exist i.e not found in pg_class
delete from pg_dist_partition where not exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid);
select 1 from run_command_on_workers($$
delete from pg_dist_partition where not exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid);$$);
?column?
---------------------------------------------------------------------
1
1
(2 rows)
-- =================================================================== -- ===================================================================
-- test co-location util functions -- test co-location util functions
-- =================================================================== -- ===================================================================
@ -960,6 +970,33 @@ SELECT update_distributed_table_colocation('table1_group_none', colocate_with =>
(1 row) (1 row)
-- sync metadata to get rid of inconsistencies in pg_dist tables
select stop_metadata_sync_to_node('localhost', :worker_1_port);
NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
select stop_metadata_sync_to_node('localhost', :worker_2_port);
NOTICE: dropping metadata on the node (localhost,57638)
stop_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
select start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
select start_metadata_sync_to_node('localhost', :worker_2_port);
start_metadata_sync_to_node
---------------------------------------------------------------------
(1 row)
-- move a table with a colocation id which is already not in pg_dist_colocation -- move a table with a colocation id which is already not in pg_dist_colocation
SELECT update_distributed_table_colocation('table1_group_none', colocate_with => 'table2_group_none'); SELECT update_distributed_table_colocation('table1_group_none', colocate_with => 'table2_group_none');
update_distributed_table_colocation update_distributed_table_colocation

View File

@ -249,10 +249,7 @@ test: multi_truncate
# multi_colocation_utils tests utility functions written for co-location feature & internal API # multi_colocation_utils tests utility functions written for co-location feature & internal API
# multi_colocated_shard_transfer tests master_copy_shard_placement with colocated tables. # multi_colocated_shard_transfer tests master_copy_shard_placement with colocated tables.
# ---------- # ----------
test: check_mx
test: turn_mx_off
test: multi_colocation_utils test: multi_colocation_utils
test: turn_mx_on
test: multi_colocated_shard_transfer test: multi_colocated_shard_transfer
# ---------- # ----------

View File

@ -65,6 +65,11 @@ CREATE FUNCTION find_shard_interval_index(bigint)
AS 'citus' AS 'citus'
LANGUAGE C STRICT; LANGUAGE C STRICT;
-- remove tables from pg_dist_partition, if they don't exist i.e not found in pg_class
delete from pg_dist_partition where not exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid);
select 1 from run_command_on_workers($$
delete from pg_dist_partition where not exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid);$$);
-- =================================================================== -- ===================================================================
-- test co-location util functions -- test co-location util functions
-- =================================================================== -- ===================================================================
@ -405,6 +410,12 @@ SELECT update_distributed_table_colocation('table1_group_none', colocate_with =>
SELECT update_distributed_table_colocation('table1_group_none', colocate_with => 'table2_groupE'); SELECT update_distributed_table_colocation('table1_group_none', colocate_with => 'table2_groupE');
SELECT update_distributed_table_colocation('table1_group_none', colocate_with => 'table3_groupE'); SELECT update_distributed_table_colocation('table1_group_none', colocate_with => 'table3_groupE');
-- sync metadata to get rid of inconsistencies in pg_dist tables
select stop_metadata_sync_to_node('localhost', :worker_1_port);
select stop_metadata_sync_to_node('localhost', :worker_2_port);
select start_metadata_sync_to_node('localhost', :worker_1_port);
select start_metadata_sync_to_node('localhost', :worker_2_port);
-- move a table with a colocation id which is already not in pg_dist_colocation -- move a table with a colocation id which is already not in pg_dist_colocation
SELECT update_distributed_table_colocation('table1_group_none', colocate_with => 'table2_group_none'); SELECT update_distributed_table_colocation('table1_group_none', colocate_with => 'table2_group_none');