diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index ae4db9b49..6161b7d70 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1028,12 +1028,7 @@ ExecuteTaskListExtended(ExecutionParams *executionParams) * then we should error out as it would cause inconsistencies across the * remote connection and local execution. */ - List *remoteTaskList = execution->remoteTaskList; - if (GetCurrentLocalExecutionStatus() == LOCAL_EXECUTION_REQUIRED && - AnyTaskAccessesLocalNode(remoteTaskList)) - { - ErrorIfTransactionAccessedPlacementsLocally(); - } + EnsureCompatibleLocalExecutionState(execution->remoteTaskList); /* run the remote execution */ StartDistributedExecution(execution); diff --git a/src/backend/distributed/executor/repartition_join_execution.c b/src/backend/distributed/executor/repartition_join_execution.c index ae55b14b3..dfe8efd9e 100644 --- a/src/backend/distributed/executor/repartition_join_execution.c +++ b/src/backend/distributed/executor/repartition_join_execution.c @@ -50,7 +50,6 @@ static void TraverseJobTree(Job *curJob, List **jobs); static char * GenerateCreateSchemasCommand(List *jobIds, char *schemaOwner); static char * GenerateJobCommands(List *jobIds, char *templateCommand); static char * GenerateDeleteJobsCommand(List *jobIds); -static void EnsureCompatibleLocalExecutionState(List *taskList); /* @@ -79,7 +78,7 @@ ExecuteDependentTasks(List *topLevelTasks, Job *topLevelJob) * EnsureCompatibleLocalExecutionState makes sure that the tasks won't have * any visibility problems because of local execution. */ -static void +void EnsureCompatibleLocalExecutionState(List *taskList) { /* diff --git a/src/backend/distributed/metadata/metadata_sync.c b/src/backend/distributed/metadata/metadata_sync.c index d47a0c717..29d7aa962 100644 --- a/src/backend/distributed/metadata/metadata_sync.c +++ b/src/backend/distributed/metadata/metadata_sync.c @@ -95,12 +95,16 @@ static bool SyncNodeMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnE static void DropMetadataSnapshotOnNode(WorkerNode *workerNode); static char * CreateSequenceDependencyCommand(Oid relationId, Oid sequenceId, char *columnName); +static GrantStmt * GenerateGrantStmtForRights(ObjectType objectType, + Oid roleOid, + Oid objectId, + char *permission, + bool withGrantOption); +static List * GetObjectsForGrantStmt(ObjectType objectType, Oid objectId); +static AccessPriv * GetAccessPrivObjectForGrantStmt(char *permission); +static RoleSpec * GetRoleSpecObjectForGrantStmt(Oid roleOid); static List * GenerateGrantOnSchemaQueriesFromAclItem(Oid schemaOid, AclItem *aclItem); -static GrantStmt * GenerateGrantOnSchemaStmtForRights(Oid roleOid, - Oid schemaOid, - char *permission, - bool withGrantOption); static void SetLocalEnableDependencyCreation(bool state); static char * GenerateSetRoleQuery(Oid roleOid); static void MetadataSyncSigTermHandler(SIGNAL_ARGS); @@ -1734,16 +1738,16 @@ GenerateGrantOnSchemaQueriesFromAclItem(Oid schemaOid, AclItem *aclItem) if (permissions & ACL_USAGE) { - char *query = DeparseTreeNode((Node *) GenerateGrantOnSchemaStmtForRights( - granteeOid, schemaOid, "USAGE", grants & - ACL_USAGE)); + char *query = DeparseTreeNode((Node *) GenerateGrantStmtForRights( + OBJECT_SCHEMA, granteeOid, schemaOid, "USAGE", + grants & ACL_USAGE)); queries = lappend(queries, query); } if (permissions & ACL_CREATE) { - char *query = DeparseTreeNode((Node *) GenerateGrantOnSchemaStmtForRights( - granteeOid, schemaOid, "CREATE", grants & - ACL_CREATE)); + char *query = DeparseTreeNode((Node *) GenerateGrantStmtForRights( + OBJECT_SCHEMA, granteeOid, schemaOid, "CREATE", + grants & ACL_CREATE)); queries = lappend(queries, query); } @@ -1753,30 +1757,88 @@ GenerateGrantOnSchemaQueriesFromAclItem(Oid schemaOid, AclItem *aclItem) } -GrantStmt * -GenerateGrantOnSchemaStmtForRights(Oid roleOid, - Oid schemaOid, - char *permission, - bool withGrantOption) +/* + * GenerateGrantStmtForRights is the function for creating GrantStmt's for all + * types of objects that are supported. It takes parameters to fill a GrantStmt's + * fields and returns the GrantStmt. + * The field `objects` of GrantStmt doesn't have a common structure for all types. + * Make sure you have added your object type to GetObjectsForGrantStmt. + */ +static GrantStmt * +GenerateGrantStmtForRights(ObjectType objectType, + Oid roleOid, + Oid objectId, + char *permission, + bool withGrantOption) +{ + GrantStmt *stmt = makeNode(GrantStmt); + stmt->is_grant = true; + stmt->targtype = ACL_TARGET_OBJECT; + stmt->objtype = objectType; + stmt->objects = GetObjectsForGrantStmt(objectType, objectId); + stmt->privileges = list_make1(GetAccessPrivObjectForGrantStmt(permission)); + stmt->grantees = list_make1(GetRoleSpecObjectForGrantStmt(roleOid)); + stmt->grant_option = withGrantOption; + + return stmt; +} + + +/* + * GetObjectsForGrantStmt takes an object type and object id and returns the 'objects' + * field to be used when creating GrantStmt. We have only one object here (the one with + * the oid = objectId) but we pass it into the GrantStmt as a list with one element, + * as GrantStmt->objects field is actually a list. + */ +static List * +GetObjectsForGrantStmt(ObjectType objectType, Oid objectId) +{ + switch (objectType) + { + /* supported object types */ + case OBJECT_SCHEMA: + { + return list_make1(makeString(get_namespace_name(objectId))); + } + + default: + { + elog(ERROR, "unsupported object type for GRANT"); + } + } + + return NIL; +} + + +/* + * GetAccessPrivObjectForGrantStmt creates an AccessPriv object for the given permission. + * It will be used when creating GrantStmt objects. + */ +static AccessPriv * +GetAccessPrivObjectForGrantStmt(char *permission) { AccessPriv *accessPriv = makeNode(AccessPriv); - accessPriv->priv_name = permission; + accessPriv->priv_name = pstrdup(permission); accessPriv->cols = NULL; + return accessPriv; +} + + +/* + * GetRoleSpecObjectForGrantStmt creates a RoleSpec object for the given roleOid. + * It will be used when creating GrantStmt objects. + */ +static RoleSpec * +GetRoleSpecObjectForGrantStmt(Oid roleOid) +{ RoleSpec *roleSpec = makeNode(RoleSpec); roleSpec->roletype = OidIsValid(roleOid) ? ROLESPEC_CSTRING : ROLESPEC_PUBLIC; roleSpec->rolename = OidIsValid(roleOid) ? GetUserNameFromId(roleOid, false) : NULL; roleSpec->location = -1; - GrantStmt *stmt = makeNode(GrantStmt); - stmt->is_grant = true; - stmt->targtype = ACL_TARGET_OBJECT; - stmt->objtype = OBJECT_SCHEMA; - stmt->objects = list_make1(makeString(get_namespace_name(schemaOid))); - stmt->privileges = list_make1(accessPriv); - stmt->grantees = list_make1(roleSpec); - stmt->grant_option = withGrantOption; - return stmt; + return roleSpec; } @@ -2953,7 +3015,7 @@ citus_internal_update_relation_colocation(PG_FUNCTION_ARGS) CheckCitusVersion(ERROR); Oid relationId = PG_GETARG_OID(0); - uint32 tagetColocationId = PG_GETARG_UINT32(1); + uint32 targetColocationId = PG_GETARG_UINT32(1); EnsureTableOwner(relationId); @@ -2982,7 +3044,7 @@ citus_internal_update_relation_colocation(PG_FUNCTION_ARGS) int count = 1; List *targetColocatedTableList = - ColocationGroupTableList(tagetColocationId, count); + ColocationGroupTableList(targetColocationId, count); if (list_length(targetColocatedTableList) == 0) { @@ -2999,7 +3061,7 @@ citus_internal_update_relation_colocation(PG_FUNCTION_ARGS) } bool localOnly = true; - UpdateRelationColocationGroup(relationId, tagetColocationId, localOnly); + UpdateRelationColocationGroup(relationId, targetColocationId, localOnly); PG_RETURN_VOID(); } diff --git a/src/backend/distributed/shared_library_init.c b/src/backend/distributed/shared_library_init.c index f74a3f43b..5e3ab85d2 100644 --- a/src/backend/distributed/shared_library_init.c +++ b/src/backend/distributed/shared_library_init.c @@ -446,8 +446,16 @@ multi_log_hook(ErrorData *edata) MyBackendGotCancelledDueToDeadlock(clearState)) { edata->sqlerrcode = ERRCODE_T_R_DEADLOCK_DETECTED; - edata->message = "canceling the transaction since it was " - "involved in a distributed deadlock"; + + /* + * This hook is called by EmitErrorReport() when emitting the ereport + * either to frontend or to the server logs. And some callers of + * EmitErrorReport() (e.g.: errfinish()) seems to assume that string + * fields of given ErrorData object needs to be freed. For this reason, + * we copy the message into heap here. + */ + edata->message = pstrdup("canceling the transaction since it was " + "involved in a distributed deadlock"); } } diff --git a/src/include/distributed/repartition_join_execution.h b/src/include/distributed/repartition_join_execution.h index 784d0d9cf..596dffc0b 100644 --- a/src/include/distributed/repartition_join_execution.h +++ b/src/include/distributed/repartition_join_execution.h @@ -13,6 +13,7 @@ #include "nodes/pg_list.h" extern List * ExecuteDependentTasks(List *taskList, Job *topLevelJob); +extern void EnsureCompatibleLocalExecutionState(List *taskList); extern void DoRepartitionCleanup(List *jobIds); diff --git a/src/test/regress/expected/multi_colocation_utils.out b/src/test/regress/expected/multi_colocation_utils.out index 25ba1b26e..c9f441af6 100644 --- a/src/test/regress/expected/multi_colocation_utils.out +++ b/src/test/regress/expected/multi_colocation_utils.out @@ -54,6 +54,16 @@ CREATE FUNCTION find_shard_interval_index(bigint) RETURNS int AS 'citus' LANGUAGE C STRICT; +-- remove tables from pg_dist_partition, if they don't exist i.e not found in pg_class +delete from pg_dist_partition where not exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid); +select 1 from run_command_on_workers($$ + delete from pg_dist_partition where not exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid);$$); + ?column? +--------------------------------------------------------------------- + 1 + 1 +(2 rows) + -- =================================================================== -- test co-location util functions -- =================================================================== @@ -960,6 +970,33 @@ SELECT update_distributed_table_colocation('table1_group_none', colocate_with => (1 row) +-- sync metadata to get rid of inconsistencies in pg_dist tables +select stop_metadata_sync_to_node('localhost', :worker_1_port); +NOTICE: dropping metadata on the node (localhost,57637) + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +select stop_metadata_sync_to_node('localhost', :worker_2_port); +NOTICE: dropping metadata on the node (localhost,57638) + stop_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +select start_metadata_sync_to_node('localhost', :worker_1_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + +select start_metadata_sync_to_node('localhost', :worker_2_port); + start_metadata_sync_to_node +--------------------------------------------------------------------- + +(1 row) + -- move a table with a colocation id which is already not in pg_dist_colocation SELECT update_distributed_table_colocation('table1_group_none', colocate_with => 'table2_group_none'); update_distributed_table_colocation diff --git a/src/test/regress/multi_1_schedule b/src/test/regress/multi_1_schedule index db9760a77..e909f8c7a 100644 --- a/src/test/regress/multi_1_schedule +++ b/src/test/regress/multi_1_schedule @@ -249,10 +249,7 @@ test: multi_truncate # multi_colocation_utils tests utility functions written for co-location feature & internal API # multi_colocated_shard_transfer tests master_copy_shard_placement with colocated tables. # ---------- -test: check_mx -test: turn_mx_off test: multi_colocation_utils -test: turn_mx_on test: multi_colocated_shard_transfer # ---------- diff --git a/src/test/regress/sql/multi_colocation_utils.sql b/src/test/regress/sql/multi_colocation_utils.sql index 07c1266b9..3e4f8e441 100644 --- a/src/test/regress/sql/multi_colocation_utils.sql +++ b/src/test/regress/sql/multi_colocation_utils.sql @@ -65,6 +65,11 @@ CREATE FUNCTION find_shard_interval_index(bigint) AS 'citus' LANGUAGE C STRICT; +-- remove tables from pg_dist_partition, if they don't exist i.e not found in pg_class +delete from pg_dist_partition where not exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid); +select 1 from run_command_on_workers($$ + delete from pg_dist_partition where not exists(select * from pg_class where pg_class.oid=pg_dist_partition.logicalrelid);$$); + -- =================================================================== -- test co-location util functions -- =================================================================== @@ -405,6 +410,12 @@ SELECT update_distributed_table_colocation('table1_group_none', colocate_with => SELECT update_distributed_table_colocation('table1_group_none', colocate_with => 'table2_groupE'); SELECT update_distributed_table_colocation('table1_group_none', colocate_with => 'table3_groupE'); +-- sync metadata to get rid of inconsistencies in pg_dist tables +select stop_metadata_sync_to_node('localhost', :worker_1_port); +select stop_metadata_sync_to_node('localhost', :worker_2_port); +select start_metadata_sync_to_node('localhost', :worker_1_port); +select start_metadata_sync_to_node('localhost', :worker_2_port); + -- move a table with a colocation id which is already not in pg_dist_colocation SELECT update_distributed_table_colocation('table1_group_none', colocate_with => 'table2_group_none');