Shell table work prototype

velioglu/wo_seq_test_1
Burak Velioglu 2021-12-06 15:38:50 +03:00
parent a67f518ef0
commit 5762bfb454
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
6 changed files with 231 additions and 150 deletions

View File

@ -440,11 +440,19 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
EnsureDependenciesExistOnAllNodes(&tableAddress);
/*
* For now assume that we can create table after ensuring that dependencies exist.
* Obviously it doesn't support sequences we don't care for it now.
*
* TODO: Consider partitioned tables
*/
CreateShellTableOnWorkers(relationId);
MarkObjectDistributed(&tableAddress);
char replicationModel = DecideReplicationModel(distributionMethod,
colocateWithTableName,
viaDeprecatedAPI);
/*
* Due to dropping columns, the parent's distribution key may not match the
* partition's distribution key. The input distributionColumn belongs to

View File

@ -232,8 +232,18 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
return NIL;
}
/* if this relation is not supported, break to the error at the end */
break;
List *commandList = NIL;
List *tableDDLCommands = GetFullTableCreationCommands(dependency->objectId,
WORKER_NEXTVAL_SEQUENCE_DEFAULTS);
TableDDLCommand *tableDDLCommand = NULL;
foreach_ptr(tableDDLCommand, tableDDLCommands)
{
Assert(CitusIsA(tableDDLCommand, TableDDLCommand));
commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand));
}
return commandList;
}
case OCLASS_COLLATION:

View File

@ -665,16 +665,20 @@ SupportedDependencyByCitus(const ObjectAddress *address)
case OCLASS_CLASS:
{
char relKind = get_rel_relkind(address->objectId);
/*
* composite types have a reference to a relation of composite type, we need
* to follow those to get the dependencies of type fields.
*
* As we also handle tables as objects as well, follow dependencies
* for tables.
*/
if (get_rel_relkind(address->objectId) == RELKIND_COMPOSITE_TYPE)
if (relKind == RELKIND_COMPOSITE_TYPE ||
relKind == RELKIND_RELATION)
{
return true;
}
return false;
}
default:

View File

@ -89,7 +89,6 @@ static List * GetDistributedTableDDLEvents(Oid relationId);
static void EnsureObjectMetadataIsSane(int distributionArgumentIndex,
int colocationId);
static char * LocalGroupIdUpdateCommand(int32 groupId);
static List * SequenceDependencyCommandList(Oid relationId);
static char * TruncateTriggerCreateCommand(Oid relationId);
static char * SchemaOwnerName(Oid objectId);
static bool HasMetadataWorkers(void);
@ -106,7 +105,7 @@ static GrantStmt * GenerateGrantOnSchemaStmtForRights(Oid roleOid,
Oid schemaOid,
char *permission,
bool withGrantOption);
static void SetLocalEnableDependencyCreation(bool state);
static char * GenerateSetRoleQuery(Oid roleOid);
static void MetadataSyncSigTermHandler(SIGNAL_ARGS);
static void MetadataSyncSigAlrmHandler(SIGNAL_ARGS);
@ -523,8 +522,7 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode)
* following queries:
*
* (i) Query that populates pg_dist_node table
* (ii) Queries that create the clustered tables (including foreign keys,
* partitioning hierarchy etc.)
* (ii) Queries that create the foreign keys and partitioning hierarchy
* (iii) Queries that populate pg_dist_partition table referenced by (ii)
* (iv) Queries that populate pg_dist_shard table referenced by (iii)
* (v) Queries that populate pg_dist_placement table referenced by (iv)
@ -538,7 +536,6 @@ MetadataCreateCommands(void)
List *propagatedTableList = NIL;
bool includeNodesFromOtherClusters = true;
List *workerNodeList = ReadDistNode(includeNodesFromOtherClusters);
IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS;
/* make sure we have deterministic output for our tests */
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
@ -558,125 +555,7 @@ MetadataCreateCommands(void)
}
}
/* create the tables, but not the metadata */
foreach_ptr(cacheEntry, propagatedTableList)
{
Oid relationId = cacheEntry->relationId;
ObjectAddress tableAddress = { 0 };
if (IsTableOwnedByExtension(relationId))
{
/* skip table creation when the Citus table is owned by an extension */
continue;
}
List *ddlCommandList = GetFullTableCreationCommands(relationId,
includeSequenceDefaults);
char *tableOwnerResetCommand = TableOwnerResetCommand(relationId);
/*
* Tables might have dependencies on different objects, since we create shards for
* table via multiple sessions these objects will be created via their own connection
* and committed immediately so they become visible to all sessions creating shards.
*/
ObjectAddressSet(tableAddress, RelationRelationId, relationId);
/*
* Set object propagation to off as we will mark objects distributed
* at the end of this function.
*/
bool prevDependencyCreationValue = EnableDependencyCreation;
SetLocalEnableDependencyCreation(false);
EnsureDependenciesExistOnAllNodes(&tableAddress);
/*
* Ensure sequence dependencies and mark them as distributed
*/
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(relationId, &attnumList,
&dependentSequenceList, 0);
Oid sequenceOid = InvalidOid;
foreach_oid(sequenceOid, dependentSequenceList)
{
ObjectAddress sequenceAddress = { 0 };
ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid);
EnsureDependenciesExistOnAllNodes(&sequenceAddress);
/*
* Sequences are not marked as distributed while creating table
* if no metadata worker node exists. We are marking all sequences
* distributed while syncing metadata in such case.
*/
MarkObjectDistributed(&sequenceAddress);
}
SetLocalEnableDependencyCreation(prevDependencyCreationValue);
List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
workerSequenceDDLCommands);
/* ddlCommandList contains TableDDLCommand information, need to materialize */
TableDDLCommand *tableDDLCommand = NULL;
foreach_ptr(tableDDLCommand, ddlCommandList)
{
Assert(CitusIsA(tableDDLCommand, TableDDLCommand));
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
GetTableDDLCommand(tableDDLCommand));
}
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
tableOwnerResetCommand);
List *sequenceDependencyCommandList = SequenceDependencyCommandList(
relationId);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
sequenceDependencyCommandList);
}
/* construct the foreign key constraints after all tables are created */
foreach_ptr(cacheEntry, propagatedTableList)
{
Oid relationId = cacheEntry->relationId;
if (IsTableOwnedByExtension(relationId))
{
/* skip foreign key creation when the Citus table is owned by an extension */
continue;
}
List *foreignConstraintCommands =
GetReferencingForeignConstaintCommands(relationId);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
foreignConstraintCommands);
}
/* construct partitioning hierarchy after all tables are created */
foreach_ptr(cacheEntry, propagatedTableList)
{
Oid relationId = cacheEntry->relationId;
if (IsTableOwnedByExtension(relationId))
{
/* skip partition creation when the Citus table is owned by an extension */
continue;
}
if (PartitionTable(relationId))
{
char *alterTableAttachPartitionCommands =
GenerateAlterTableAttachPartitionCommand(relationId);
metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
alterTableAttachPartitionCommands);
}
}
/* after all tables are created, create the metadata */
/* create the metadata */
foreach_ptr(cacheEntry, propagatedTableList)
{
Oid clusteredTableId = cacheEntry->relationId;
@ -814,25 +693,11 @@ GetDistributedTableDDLEvents(Oid relationId)
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
List *commandList = NIL;
IncludeSequenceDefaults includeSequenceDefaults = WORKER_NEXTVAL_SEQUENCE_DEFAULTS;
/* if the table is owned by an extension we only propagate pg_dist_* records */
bool tableOwnedByExtension = IsTableOwnedByExtension(relationId);
if (!tableOwnedByExtension)
{
/*
* Commands to create the table, these commands are TableDDLCommands so lets
* materialize to the non-sharded version
*/
List *tableDDLCommands = GetFullTableCreationCommands(relationId,
includeSequenceDefaults);
TableDDLCommand *tableDDLCommand = NULL;
foreach_ptr(tableDDLCommand, tableDDLCommands)
{
Assert(CitusIsA(tableDDLCommand, TableDDLCommand));
commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand));
}
/* command to associate sequences with table */
List *sequenceDependencyCommandList = SequenceDependencyCommandList(
relationId);
@ -1737,7 +1602,7 @@ GetSequencesFromAttrDef(Oid attrdefOid)
* necessary to ensure that the sequence is dropped when the table is
* dropped.
*/
static List *
List *
SequenceDependencyCommandList(Oid relationId)
{
List *sequenceCommandList = NIL;
@ -1975,7 +1840,7 @@ GenerateGrantOnSchemaStmtForRights(Oid roleOid,
/*
* SetLocalEnableDependencyCreation sets the enable_object_propagation locally
*/
static void
void
SetLocalEnableDependencyCreation(bool state)
{
set_config_option("citus.enable_object_propagation", state == true ? "on" : "off",
@ -2061,6 +1926,43 @@ HasMetadataWorkers(void)
}
/*
* CreateShellTableOnWorkers creates shell table on workers.
*/
void
CreateShellTableOnWorkers(Oid relationId)
{
/* if the table is owned by an extension we don't create */
bool tableOwnedByExtension = IsTableOwnedByExtension(relationId);
if (!tableOwnedByExtension)
{
List *commandList = NIL;
IncludeSequenceDefaults includeSequenceDefaults =
WORKER_NEXTVAL_SEQUENCE_DEFAULTS;
List *tableDDLCommands = GetFullTableCreationCommands(relationId,
includeSequenceDefaults);
TableDDLCommand *tableDDLCommand = NULL;
foreach_ptr(tableDDLCommand, tableDDLCommands)
{
Assert(CitusIsA(tableDDLCommand, TableDDLCommand));
commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand));
}
/* prevent recursive propagation */
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
/* send the commands one by one */
const char *command = NULL;
foreach_ptr(command, commandList)
{
SendCommandToWorkersWithMetadata(command);
}
}
}
/*
* CreateTableMetadataOnWorkers creates the list of commands needed to create the
* given distributed table and sends these commands to all metadata workers i.e. workers

View File

@ -40,6 +40,7 @@
#include "distributed/remote_commands.h"
#include "distributed/resource_lock.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/shared_connection_stats.h"
#include "distributed/string_utils.h"
#include "distributed/transaction_recovery.h"
@ -105,7 +106,9 @@ static void InsertPlaceholderCoordinatorRecord(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
*nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport);
static void SetUpDistributedTableDependencies(WorkerNode *workerNode);
static void SetUpSequences(WorkerNode *workerNode);
static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode);
static void SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
static void PropagateNodeWideObjects(WorkerNode *newWorkerNode);
static WorkerNode * ModifiableWorkerNode(const char *nodeName, int32 nodePort);
@ -576,10 +579,159 @@ master_set_node_property(PG_FUNCTION_ARGS)
}
static void
SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode)
{
List *distributedTableList = CitusTableList();
List *propagatedTableList = NIL;
List *multipleTableIntegrationCommandList = NIL;
CitusTableCacheEntry *cacheEntry = NULL;
foreach_ptr(cacheEntry, distributedTableList)
{
if (ShouldSyncTableMetadata(cacheEntry->relationId))
{
propagatedTableList = lappend(propagatedTableList, cacheEntry);
}
}
/* construct the foreign key constraints after all tables are created */
foreach_ptr(cacheEntry, propagatedTableList)
{
Oid relationId = cacheEntry->relationId;
if (IsTableOwnedByExtension(relationId))
{
/* skip foreign key creation when the Citus table is owned by an extension */
continue;
}
List *foreignConstraintCommands =
GetReferencingForeignConstaintCommands(relationId);
multipleTableIntegrationCommandList = list_concat(
multipleTableIntegrationCommandList,
foreignConstraintCommands);
}
/* construct partitioning hierarchy after all tables are created */
foreach_ptr(cacheEntry, propagatedTableList)
{
Oid relationId = cacheEntry->relationId;
if (IsTableOwnedByExtension(relationId))
{
/* skip partition creation when the Citus table is owned by an extension */
continue;
}
if (PartitionTable(relationId))
{
char *alterTableAttachPartitionCommands =
GenerateAlterTableAttachPartitionCommand(relationId);
multipleTableIntegrationCommandList = lappend(
multipleTableIntegrationCommandList,
alterTableAttachPartitionCommands);
}
}
/* prevent recursive propagation */
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
/* send the commands one by one */
const char *command = NULL;
foreach_ptr(command, multipleTableIntegrationCommandList)
{
SendCommandToWorkersWithMetadata(command);
}
}
static void
SetUpSequences(WorkerNode *workerNode)
{
List *distributedTableList = CitusTableList();
List *propagatedTableList = NIL;
List *sequenceCommandList = NIL;
CitusTableCacheEntry *cacheEntry = NULL;
foreach_ptr(cacheEntry, distributedTableList)
{
if (ShouldSyncTableMetadata(cacheEntry->relationId))
{
propagatedTableList = lappend(propagatedTableList, cacheEntry);
}
}
/* create the metadata */
foreach_ptr(cacheEntry, propagatedTableList)
{
Oid relationId = cacheEntry->relationId;
if (IsTableOwnedByExtension(relationId))
{
/* skip table metadata creation when the Citus table is owned by an extension */
continue;
}
/*
* Set object propagation to off as objects will be distributed while syncing
* the metadata.
*/
bool prevDependencyCreationValue = EnableDependencyCreation;
SetLocalEnableDependencyCreation(false);
/*
* Ensure sequence dependencies and mark them as distributed
*/
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(relationId, &attnumList,
&dependentSequenceList, 0);
Oid sequenceOid = InvalidOid;
foreach_oid(sequenceOid, dependentSequenceList)
{
ObjectAddress sequenceAddress = { 0 };
ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid);
EnsureDependenciesExistOnAllNodes(&sequenceAddress);
/*
* Sequences are not marked as distributed while creating table
* if no metadata worker node exists. We are marking all sequences
* distributed while syncing metadata in such case.
*/
MarkObjectDistributed(&sequenceAddress);
}
SetLocalEnableDependencyCreation(prevDependencyCreationValue);
List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
sequenceCommandList = list_concat(sequenceCommandList, workerSequenceDDLCommands);
List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId);
sequenceCommandList = list_concat(sequenceCommandList,
sequenceDependencyCommandList);
/* prevent recursive propagation */
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
/* send the commands one by one */
const char *command = NULL;
foreach_ptr(command, sequenceCommandList)
{
SendCommandToWorkersWithMetadata(command);
}
}
}
/*
* SetUpDistributedTableDependencies sets up up the following on a node if it's
* SetUpDistributedTableWithDependencies sets up up the following on a node if it's
* a primary node that currently stores data:
* - All dependencies (e.g., types, schemas)
* - All shell distributed table
* - Reference tables, because they are needed to handle queries efficiently.
* - Distributed functions
*
@ -587,7 +739,7 @@ master_set_node_property(PG_FUNCTION_ARGS)
* since all the dependencies should be present in the coordinator already.
*/
static void
SetUpDistributedTableDependencies(WorkerNode *newWorkerNode)
SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode)
{
if (NodeIsPrimary(newWorkerNode))
{
@ -896,7 +1048,9 @@ ActivateNode(char *nodeName, int nodePort)
BoolGetDatum(isActive));
}
SetUpDistributedTableDependencies(workerNode);
SetUpSequences(workerNode);
SetUpDistributedTableWithDependencies(workerNode);
SetUpMultipleDistributedTableIntegrations(workerNode);
if (syncMetadata)
{

View File

@ -51,11 +51,14 @@ extern char * CreateSchemaDDLCommand(Oid schemaId);
extern List * GrantOnSchemaDDLCommands(Oid schemaId);
extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
uint64 shardLength, int32 groupId);
extern void CreateShellTableOnWorkers(Oid relationId);
extern void CreateTableMetadataOnWorkers(Oid relationId);
extern BackgroundWorkerHandle * SpawnSyncMetadataToNodes(Oid database, Oid owner);
extern void SyncMetadataToNodesMain(Datum main_arg);
extern void SignalMetadataSyncDaemon(Oid database, int sig);
extern bool ShouldInitiateMetadataSync(bool *lockFailure);
extern void SetLocalEnableDependencyCreation(bool state);
extern List * SequenceDependencyCommandList(Oid relationId);
extern List * DDLCommandsForSequence(Oid sequenceOid, char *ownerName);
extern List * SequenceDDLCommandsForTable(Oid relationId);