mirror of https://github.com/citusdata/citus.git
Propagate CREATE SCHEMA commands with the correct AUTHORIZATION clause in start_metadata_sync_to_node
parent
fb08093b00
commit
b94647c3bc
|
@ -15,9 +15,11 @@
|
||||||
#include "miscadmin.h"
|
#include "miscadmin.h"
|
||||||
|
|
||||||
#include "access/htup_details.h"
|
#include "access/htup_details.h"
|
||||||
|
#include "access/sysattr.h"
|
||||||
#include "access/xact.h"
|
#include "access/xact.h"
|
||||||
#include "catalog/indexing.h"
|
#include "catalog/indexing.h"
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
|
#include "catalog/pg_namespace.h"
|
||||||
#include "distributed/citus_nodes.h"
|
#include "distributed/citus_nodes.h"
|
||||||
#include "distributed/master_metadata_utility.h"
|
#include "distributed/master_metadata_utility.h"
|
||||||
#include "distributed/master_protocol.h"
|
#include "distributed/master_protocol.h"
|
||||||
|
|
|
@ -26,6 +26,7 @@
|
||||||
#include "access/htup_details.h"
|
#include "access/htup_details.h"
|
||||||
#include "access/skey.h"
|
#include "access/skey.h"
|
||||||
#include "access/stratnum.h"
|
#include "access/stratnum.h"
|
||||||
|
#include "access/sysattr.h"
|
||||||
#include "access/tupdesc.h"
|
#include "access/tupdesc.h"
|
||||||
#include "catalog/dependency.h"
|
#include "catalog/dependency.h"
|
||||||
#include "catalog/indexing.h"
|
#include "catalog/indexing.h"
|
||||||
|
@ -37,6 +38,7 @@
|
||||||
#endif
|
#endif
|
||||||
#include "catalog/pg_index.h"
|
#include "catalog/pg_index.h"
|
||||||
#include "catalog/pg_type.h"
|
#include "catalog/pg_type.h"
|
||||||
|
#include "catalog/pg_namespace.h"
|
||||||
#include "commands/sequence.h"
|
#include "commands/sequence.h"
|
||||||
#include "distributed/citus_ruleutils.h"
|
#include "distributed/citus_ruleutils.h"
|
||||||
#include "distributed/listutils.h"
|
#include "distributed/listutils.h"
|
||||||
|
@ -56,6 +58,7 @@
|
||||||
#include "utils/palloc.h"
|
#include "utils/palloc.h"
|
||||||
#include "utils/relcache.h"
|
#include "utils/relcache.h"
|
||||||
#include "utils/ruleutils.h"
|
#include "utils/ruleutils.h"
|
||||||
|
#include "utils/tqual.h"
|
||||||
|
|
||||||
|
|
||||||
/* Shard related configuration */
|
/* Shard related configuration */
|
||||||
|
@ -66,6 +69,7 @@ int ShardPlacementPolicy = SHARD_PLACEMENT_ROUND_ROBIN;
|
||||||
|
|
||||||
|
|
||||||
static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor);
|
static Datum WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor);
|
||||||
|
static char * SchemaOwner(Oid schemaId);
|
||||||
|
|
||||||
|
|
||||||
/* exports for SQL callable functions */
|
/* exports for SQL callable functions */
|
||||||
|
@ -633,7 +637,8 @@ GetTableDDLEvents(Oid relationId)
|
||||||
if (strncmp(schemaName, "public", NAMEDATALEN) != 0)
|
if (strncmp(schemaName, "public", NAMEDATALEN) != 0)
|
||||||
{
|
{
|
||||||
StringInfo schemaNameDef = makeStringInfo();
|
StringInfo schemaNameDef = makeStringInfo();
|
||||||
appendStringInfo(schemaNameDef, CREATE_SCHEMA_COMMAND, schemaName);
|
char *ownerName = SchemaOwner(schemaId);
|
||||||
|
appendStringInfo(schemaNameDef, CREATE_SCHEMA_COMMAND, schemaName, ownerName);
|
||||||
|
|
||||||
tableDDLEventList = lappend(tableDDLEventList, schemaNameDef->data);
|
tableDDLEventList = lappend(tableDDLEventList, schemaNameDef->data);
|
||||||
}
|
}
|
||||||
|
@ -855,3 +860,47 @@ WorkerNodeGetDatum(WorkerNode *workerNode, TupleDesc tupleDescriptor)
|
||||||
|
|
||||||
return workerNodeDatum;
|
return workerNodeDatum;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* SchemaOwner returns the name of the owner of the specified schema.
|
||||||
|
*/
|
||||||
|
char *
|
||||||
|
SchemaOwner(Oid schemaId)
|
||||||
|
{
|
||||||
|
const int scanKeyCount = 1;
|
||||||
|
|
||||||
|
Relation namespaceRelation = heap_open(NamespaceRelationId, AccessShareLock);
|
||||||
|
ScanKeyData scanKeyData[scanKeyCount];
|
||||||
|
SysScanDesc scanDescriptor = NULL;
|
||||||
|
HeapTuple tuple = NULL;
|
||||||
|
char *ownerName = NULL;
|
||||||
|
|
||||||
|
/* start scan */
|
||||||
|
ScanKeyInit(&scanKeyData[0],
|
||||||
|
ObjectIdAttributeNumber,
|
||||||
|
BTEqualStrategyNumber, F_OIDEQ,
|
||||||
|
ObjectIdGetDatum(schemaId));
|
||||||
|
|
||||||
|
scanDescriptor = systable_beginscan(namespaceRelation, NamespaceOidIndexId, true,
|
||||||
|
SnapshotSelf, 1, &scanKeyData[0]);
|
||||||
|
tuple = systable_getnext(scanDescriptor);
|
||||||
|
|
||||||
|
if (HeapTupleIsValid(tuple))
|
||||||
|
{
|
||||||
|
Form_pg_namespace nsptup = (Form_pg_namespace) GETSTRUCT(tuple);
|
||||||
|
Oid ownerId = nsptup->nspowner;
|
||||||
|
|
||||||
|
ownerName = GetUserNameFromId(ownerId, false);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* if the schema is not found, then return the name of current user */
|
||||||
|
ownerName = GetUserNameFromId(GetUserId(), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
systable_endscan(scanDescriptor);
|
||||||
|
heap_close(namespaceRelation, NoLock);
|
||||||
|
|
||||||
|
return ownerName;
|
||||||
|
}
|
||||||
|
|
|
@ -66,7 +66,7 @@
|
||||||
#define SHARD_CSTORE_TABLE_SIZE_QUERY "SELECT cstore_table_size(%s)"
|
#define SHARD_CSTORE_TABLE_SIZE_QUERY "SELECT cstore_table_size(%s)"
|
||||||
#define DROP_REGULAR_TABLE_COMMAND "DROP TABLE IF EXISTS %s CASCADE"
|
#define DROP_REGULAR_TABLE_COMMAND "DROP TABLE IF EXISTS %s CASCADE"
|
||||||
#define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s CASCADE"
|
#define DROP_FOREIGN_TABLE_COMMAND "DROP FOREIGN TABLE IF EXISTS %s CASCADE"
|
||||||
#define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s"
|
#define CREATE_SCHEMA_COMMAND "CREATE SCHEMA IF NOT EXISTS %s AUTHORIZATION %s"
|
||||||
#define CREATE_EMPTY_SHARD_QUERY "SELECT master_create_empty_shard('%s')"
|
#define CREATE_EMPTY_SHARD_QUERY "SELECT master_create_empty_shard('%s')"
|
||||||
#define FINALIZED_SHARD_PLACEMENTS_QUERY \
|
#define FINALIZED_SHARD_PLACEMENTS_QUERY \
|
||||||
"SELECT nodename, nodeport FROM pg_dist_shard_placement WHERE shardstate = 1 AND shardid = %ld"
|
"SELECT nodename, nodeport FROM pg_dist_shard_placement WHERE shardstate = 1 AND shardid = %ld"
|
||||||
|
|
|
@ -37,8 +37,8 @@ SELECT table_ddl_command_array('not_null_table');
|
||||||
CREATE SCHEMA not_in_path CREATE TABLE simple_table (id bigint);
|
CREATE SCHEMA not_in_path CREATE TABLE simple_table (id bigint);
|
||||||
SELECT table_ddl_command_array('not_in_path.simple_table');
|
SELECT table_ddl_command_array('not_in_path.simple_table');
|
||||||
table_ddl_command_array
|
table_ddl_command_array
|
||||||
-------------------------------------------------------------------------------------------------
|
------------------------------------------------------------------------------------------------------------------------
|
||||||
{"CREATE SCHEMA IF NOT EXISTS not_in_path","CREATE TABLE not_in_path.simple_table (id bigint)"}
|
{"CREATE SCHEMA IF NOT EXISTS not_in_path AUTHORIZATION postgres","CREATE TABLE not_in_path.simple_table (id bigint)"}
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
-- even more complex constraints should be preserved...
|
-- even more complex constraints should be preserved...
|
||||||
|
|
|
@ -98,7 +98,7 @@ SELECT unnest(master_metadata_snapshot());
|
||||||
TRUNCATE pg_dist_node
|
TRUNCATE pg_dist_node
|
||||||
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
||||||
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
|
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
|
||||||
CREATE SCHEMA IF NOT EXISTS mx_testing_schema
|
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
|
||||||
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
|
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
|
||||||
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
|
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
|
||||||
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
|
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
|
||||||
|
@ -125,7 +125,7 @@ SELECT unnest(master_metadata_snapshot());
|
||||||
TRUNCATE pg_dist_node
|
TRUNCATE pg_dist_node
|
||||||
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
||||||
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
|
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
|
||||||
CREATE SCHEMA IF NOT EXISTS mx_testing_schema
|
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
|
||||||
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
|
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
|
||||||
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
|
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
|
||||||
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
|
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
|
||||||
|
@ -145,7 +145,7 @@ SELECT unnest(master_metadata_snapshot());
|
||||||
TRUNCATE pg_dist_node
|
TRUNCATE pg_dist_node
|
||||||
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
SELECT worker_drop_distributed_table(logicalrelid) FROM pg_dist_partition
|
||||||
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
|
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata) VALUES (2, 2, 'localhost', 57638, 'default', FALSE),(1, 1, 'localhost', 57637, 'default', FALSE)
|
||||||
CREATE SCHEMA IF NOT EXISTS mx_testing_schema
|
CREATE SCHEMA IF NOT EXISTS mx_testing_schema AUTHORIZATION postgres
|
||||||
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
|
CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE
|
||||||
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
|
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 integer DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL)
|
||||||
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
|
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
|
||||||
|
|
Loading…
Reference in New Issue