mirror of https://github.com/citusdata/citus.git
wip: table creations
parent
48a9c9bb68
commit
a35ac7c7d9
|
@ -104,6 +104,7 @@ static void AppendStringInfoFunction(StringInfo buf, Oid fnoid);
|
||||||
static Oid AggregateFunctionOid(const char *functionName, Oid inputType);
|
static Oid AggregateFunctionOid(const char *functionName, Oid inputType);
|
||||||
static char* CIMVTriggerFuncName(int prefixId, const char* relname);
|
static char* CIMVTriggerFuncName(int prefixId, const char* relname);
|
||||||
static char* CIMVInternalPrefix(const RangeVar* baseTable, int prefixId);
|
static char* CIMVInternalPrefix(const RangeVar* baseTable, int prefixId);
|
||||||
|
static void AlterTableOwner(RangeVar* tableName, char* ownerName);
|
||||||
|
|
||||||
bool
|
bool
|
||||||
ProcessCreateMaterializedViewStmt(const CreateTableAsStmt *stmt, const char *query_string,
|
ProcessCreateMaterializedViewStmt(const CreateTableAsStmt *stmt, const char *query_string,
|
||||||
|
@ -149,7 +150,13 @@ CreateCimv(CimvCreate *cimvCreate)
|
||||||
elog(ERROR, "SPI_connect failed");
|
elog(ERROR, "SPI_connect failed");
|
||||||
}
|
}
|
||||||
|
|
||||||
PushCitusSecurityContext();
|
Oid savedUserId = InvalidOid;
|
||||||
|
int savedSecurityContext = 0;
|
||||||
|
|
||||||
|
char* currentUserName = CurrentUserName();
|
||||||
|
|
||||||
|
GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
||||||
|
SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
||||||
|
|
||||||
CreateMatTable(cimvCreate, false);
|
CreateMatTable(cimvCreate, false);
|
||||||
|
|
||||||
|
@ -164,7 +171,14 @@ CreateCimv(CimvCreate *cimvCreate)
|
||||||
CreateDataChangeTriggerFunction(cimvCreate);
|
CreateDataChangeTriggerFunction(cimvCreate);
|
||||||
CreateDataChangeTriggers(cimvCreate);
|
CreateDataChangeTriggers(cimvCreate);
|
||||||
InsertIntoPgCimv(cimvCreate->formCimv);
|
InsertIntoPgCimv(cimvCreate->formCimv);
|
||||||
PopCitusSecurityContext();
|
|
||||||
|
AlterTableOwner(cimvCreate->matTableName, currentUserName);
|
||||||
|
AlterTableOwner(cimvCreate->refreshViewName, currentUserName);
|
||||||
|
AlterTableOwner(cimvCreate->userViewName, currentUserName);
|
||||||
|
|
||||||
|
SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
if (SPI_finish() != SPI_OK_FINISH)
|
if (SPI_finish() != SPI_OK_FINISH)
|
||||||
{
|
{
|
||||||
|
@ -349,6 +363,24 @@ DistributeTable(CimvCreate *cimvCreate, RangeVar *tableName)
|
||||||
pfree(querybuf.data);
|
pfree(querybuf.data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void AlterTableOwner(RangeVar* tableName, char* ownerName) {
|
||||||
|
StringInfoData querybuf;
|
||||||
|
initStringInfo(&querybuf);
|
||||||
|
|
||||||
|
appendStringInfo(&querybuf,
|
||||||
|
"ALTER TABLE %s.%s OWNER TO %s;",
|
||||||
|
tableName->schemaname ? tableName->schemaname : "public",
|
||||||
|
tableName->relname,
|
||||||
|
ownerName);
|
||||||
|
|
||||||
|
if (SPI_execute(querybuf.data, false, 0) != SPI_OK_UTILITY)
|
||||||
|
{
|
||||||
|
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
pfree(querybuf.data);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static void
|
static void
|
||||||
CreateUserView(CimvCreate *cimvCreate)
|
CreateUserView(CimvCreate *cimvCreate)
|
||||||
|
@ -433,7 +465,6 @@ CreateUserView(CimvCreate *cimvCreate)
|
||||||
cimvCreate->formCimv->userview = DefineVirtualRelation(cimvCreate->userViewName,
|
cimvCreate->formCimv->userview = DefineVirtualRelation(cimvCreate->userViewName,
|
||||||
query->targetList,
|
query->targetList,
|
||||||
query).objectId;
|
query).objectId;
|
||||||
|
|
||||||
pfree(queryText.data);
|
pfree(queryText.data);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -124,7 +124,11 @@ static void
|
||||||
DropCimv(Form_pg_cimv formCimv, DropBehavior behavior)
|
DropCimv(Form_pg_cimv formCimv, DropBehavior behavior)
|
||||||
{
|
{
|
||||||
|
|
||||||
PushCitusSecurityContext();
|
Oid savedUserId = InvalidOid;
|
||||||
|
int savedSecurityContext = 0;
|
||||||
|
|
||||||
|
// GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
||||||
|
// SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
||||||
|
|
||||||
ObjectAddress matTableAddress;
|
ObjectAddress matTableAddress;
|
||||||
matTableAddress.classId = RelationRelationId;
|
matTableAddress.classId = RelationRelationId;
|
||||||
|
@ -195,7 +199,8 @@ DropCimv(Form_pg_cimv formCimv, DropBehavior behavior)
|
||||||
|
|
||||||
DeletePgCimvRow(userViewAddress.objectId);
|
DeletePgCimvRow(userViewAddress.objectId);
|
||||||
|
|
||||||
PopCitusSecurityContext();
|
// SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
||||||
|
|
||||||
/* Close SPI context. */
|
/* Close SPI context. */
|
||||||
if (SPI_finish() != SPI_OK_FINISH)
|
if (SPI_finish() != SPI_OK_FINISH)
|
||||||
{
|
{
|
||||||
|
|
|
@ -57,8 +57,12 @@ RefreshCimv(Form_pg_cimv formCimv, bool skipData, bool isCreate)
|
||||||
matTableSchemaName = quote_identifier(matTableSchemaName);
|
matTableSchemaName = quote_identifier(matTableSchemaName);
|
||||||
matTableName = quote_identifier(matTableName);
|
matTableName = quote_identifier(matTableName);
|
||||||
|
|
||||||
|
Oid savedUserId = InvalidOid;
|
||||||
|
int savedSecurityContext = 0;
|
||||||
|
|
||||||
const char *landingTableSchemaName = NULL;
|
const char *landingTableSchemaName = NULL;
|
||||||
const char *landingTableName = NULL;
|
const char *landingTableName = NULL;
|
||||||
|
|
||||||
if (formCimv->landingtable)
|
if (formCimv->landingtable)
|
||||||
{
|
{
|
||||||
landingTableSchemaName = get_namespace_name(get_rel_namespace(
|
landingTableSchemaName = get_namespace_name(get_rel_namespace(
|
||||||
|
@ -70,6 +74,8 @@ RefreshCimv(Form_pg_cimv formCimv, bool skipData, bool isCreate)
|
||||||
|
|
||||||
if (skipData)
|
if (skipData)
|
||||||
{
|
{
|
||||||
|
// GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
||||||
|
// SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
||||||
if (formCimv->landingtable)
|
if (formCimv->landingtable)
|
||||||
{
|
{
|
||||||
appendStringInfo(&querybuf,
|
appendStringInfo(&querybuf,
|
||||||
|
@ -101,6 +107,10 @@ RefreshCimv(Form_pg_cimv formCimv, bool skipData, bool isCreate)
|
||||||
/* better: SPI_commit_and_chain(); */
|
/* better: SPI_commit_and_chain(); */
|
||||||
SPI_commit();
|
SPI_commit();
|
||||||
SPI_start_transaction();
|
SPI_start_transaction();
|
||||||
|
|
||||||
|
// GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
||||||
|
// SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
||||||
|
|
||||||
/* TODO: cleanup if this fails */
|
/* TODO: cleanup if this fails */
|
||||||
appendStringInfo(&querybuf,
|
appendStringInfo(&querybuf,
|
||||||
"INSERT INTO %s.%s "
|
"INSERT INTO %s.%s "
|
||||||
|
@ -109,17 +119,18 @@ RefreshCimv(Form_pg_cimv formCimv, bool skipData, bool isCreate)
|
||||||
matTableName,
|
matTableName,
|
||||||
refreshViewSchemaName,
|
refreshViewSchemaName,
|
||||||
refreshViewName);
|
refreshViewName);
|
||||||
PushCitusSecurityContext();
|
|
||||||
if (SPI_execute(querybuf.data, false, 0) != SPI_OK_INSERT)
|
if (SPI_execute(querybuf.data, false, 0) != SPI_OK_INSERT)
|
||||||
{
|
{
|
||||||
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
|
elog(ERROR, "SPI_exec failed: %s", querybuf.data);
|
||||||
}
|
}
|
||||||
PopCitusSecurityContext();
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
Snapshot snapshot = GetLatestSnapshot();
|
Snapshot snapshot = GetLatestSnapshot();
|
||||||
|
|
||||||
|
// GetUserIdAndSecContext(&savedUserId, &savedSecurityContext);
|
||||||
|
// SetUserIdAndSecContext(CitusExtensionOwner(), SECURITY_LOCAL_USERID_CHANGE);
|
||||||
/* TODO: DELETE only if !isCreate */
|
/* TODO: DELETE only if !isCreate */
|
||||||
appendStringInfo(&querybuf,
|
appendStringInfo(&querybuf,
|
||||||
"DELETE FROM %s.%s",
|
"DELETE FROM %s.%s",
|
||||||
|
@ -135,7 +146,6 @@ RefreshCimv(Form_pg_cimv formCimv, bool skipData, bool isCreate)
|
||||||
matTableName,
|
matTableName,
|
||||||
refreshViewSchemaName,
|
refreshViewSchemaName,
|
||||||
refreshViewName);
|
refreshViewName);
|
||||||
PushCitusSecurityContext();
|
|
||||||
SpiExecuteSnapshot(&querybuf, snapshot, SPI_OK_INSERT);
|
SpiExecuteSnapshot(&querybuf, snapshot, SPI_OK_INSERT);
|
||||||
resetStringInfo(&querybuf);
|
resetStringInfo(&querybuf);
|
||||||
|
|
||||||
|
@ -149,10 +159,11 @@ RefreshCimv(Form_pg_cimv formCimv, bool skipData, bool isCreate)
|
||||||
SpiExecuteSnapshot(&querybuf, snapshot, SPI_OK_DELETE);
|
SpiExecuteSnapshot(&querybuf, snapshot, SPI_OK_DELETE);
|
||||||
resetStringInfo(&querybuf);
|
resetStringInfo(&querybuf);
|
||||||
}
|
}
|
||||||
PopCitusSecurityContext();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetUserIdAndSecContext(savedUserId, savedSecurityContext);
|
||||||
|
|
||||||
/* Close SPI context. */
|
/* Close SPI context. */
|
||||||
if (SPI_finish() != SPI_OK_FINISH)
|
if (SPI_finish() != SPI_OK_FINISH)
|
||||||
{
|
{
|
||||||
|
|
|
@ -469,12 +469,12 @@ multi_ProcessUtility(PlannedStmt *pstmt,
|
||||||
|
|
||||||
if (IsA(parsetree, RefreshMatViewStmt))
|
if (IsA(parsetree, RefreshMatViewStmt))
|
||||||
{
|
{
|
||||||
// PushCitusSecurityContext();
|
|
||||||
continueProcessing = !ProcessRefreshMaterializedViewStmt(
|
continueProcessing = !ProcessRefreshMaterializedViewStmt(
|
||||||
(RefreshMatViewStmt *) parsetree);
|
(RefreshMatViewStmt *) parsetree);
|
||||||
// PopCitusSecurityContext();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool switchToExtensionOwner = false;
|
||||||
|
|
||||||
if (IsA(parsetree, DropStmt))
|
if (IsA(parsetree, DropStmt))
|
||||||
{
|
{
|
||||||
DropStmt *dropStatement = (DropStmt *) parsetree;
|
DropStmt *dropStatement = (DropStmt *) parsetree;
|
||||||
|
|
|
@ -9,3 +9,5 @@ DROP FUNCTION IF EXISTS pg_catalog.citus_total_relation_size(regclass);
|
||||||
#include "udfs/citus_unique_id/10.0-1.sql"
|
#include "udfs/citus_unique_id/10.0-1.sql"
|
||||||
|
|
||||||
#include "../../columnar/sql/columnar--9.5-1--10.0-1.sql"
|
#include "../../columnar/sql/columnar--9.5-1--10.0-1.sql"
|
||||||
|
|
||||||
|
GRANT USAGE ON SCHEMA citus_internal TO public;
|
Loading…
Reference in New Issue