Start handling local tables

velioglu/wo_seq_test_1
Burak Velioglu 2021-12-16 14:57:46 +03:00
parent a6cdd43d42
commit 3636b7c9c5
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
4 changed files with 52 additions and 37 deletions

View File

@ -30,6 +30,7 @@
#include "distributed/commands.h" #include "distributed/commands.h"
#include "distributed/commands/sequence.h" #include "distributed/commands/sequence.h"
#include "distributed/commands/utility_hook.h" #include "distributed/commands/utility_hook.h"
#include "distributed/metadata/distobject.h"
#include "distributed/foreign_key_relationship.h" #include "distributed/foreign_key_relationship.h"
#include "distributed/listutils.h" #include "distributed/listutils.h"
#include "distributed/local_executor.h" #include "distributed/local_executor.h"
@ -306,6 +307,17 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
ObjectAddress tableAddress = { 0 }; ObjectAddress tableAddress = { 0 };
ObjectAddressSet(tableAddress, RelationRelationId, relationId); ObjectAddressSet(tableAddress, RelationRelationId, relationId);
/*
* Ensure that the sequences used in column defaults of the table
* have proper types
*/
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(relationId, &attnumList,
&dependentSequenceList, 0);
EnsureDistributedSequencesHaveOneType(relationId, dependentSequenceList,
attnumList);
/* /*
* Ensure dependencies first as we will create shell table on the other nodes * Ensure dependencies first as we will create shell table on the other nodes
* in the MX case. * in the MX case.
@ -333,6 +345,7 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
* via process utility. * via process utility.
*/ */
ExecuteAndLogUtilityCommandList(shellTableDDLEvents); ExecuteAndLogUtilityCommandList(shellTableDDLEvents);
MarkObjectDistributed(&tableAddress);
/* /*
* Set shellRelationId as the relation with relationId now points * Set shellRelationId as the relation with relationId now points
@ -354,17 +367,6 @@ CreateCitusLocalTable(Oid relationId, bool cascadeViaForeignKeys, bool autoConve
InsertMetadataForCitusLocalTable(shellRelationId, shardId, autoConverted); InsertMetadataForCitusLocalTable(shellRelationId, shardId, autoConverted);
/*
* Ensure that the sequences used in column defaults of the table
* have proper types
*/
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(shellRelationId, &attnumList,
&dependentSequenceList, 0);
EnsureDistributedSequencesHaveOneType(shellRelationId, dependentSequenceList,
attnumList);
FinalizeCitusLocalTableCreation(shellRelationId, dependentSequenceList); FinalizeCitusLocalTableCreation(shellRelationId, dependentSequenceList);
} }
@ -1240,15 +1242,6 @@ FinalizeCitusLocalTableCreation(Oid relationId, List *dependentSequenceList)
if (ShouldSyncTableMetadata(relationId)) if (ShouldSyncTableMetadata(relationId))
{ {
if (ClusterHasKnownMetadataWorkers())
{
/*
* Ensure sequence dependencies and mark them as distributed
* before creating table metadata on workers
*/
MarkSequenceListDistributedAndPropagateWithDependencies(relationId,
dependentSequenceList);
}
CreateTableMetadataOnWorkers(relationId); CreateTableMetadataOnWorkers(relationId);
} }

View File

@ -449,11 +449,13 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
ObjectAddress tableAddress = { 0 }; ObjectAddress tableAddress = { 0 };
ObjectAddressSet(tableAddress, RelationRelationId, relationId); ObjectAddressSet(tableAddress, RelationRelationId, relationId);
EnsureDependenciesExistOnAllNodes(&tableAddress); EnsureDependenciesExistOnAllNodes(&tableAddress);
/* TODO: Update owner of the sequence(?) */
/* TODO: Consider partitioned tables */ /* TODO: Consider partitioned tables */
CreateShellTableOnWorkers(relationId); if (ShouldSyncTableMetadata(relationId))
MarkObjectDistributed(&tableAddress); {
CreateShellTableOnWorkers(relationId);
MarkObjectDistributed(&tableAddress);
}
char replicationModel = DecideReplicationModel(distributionMethod, char replicationModel = DecideReplicationModel(distributionMethod,
colocateWithTableName, colocateWithTableName,

View File

@ -83,7 +83,7 @@
char *EnableManualMetadataChangesForUser = ""; char *EnableManualMetadataChangesForUser = "";
static List * GetDistributedTableDDLEvents(Oid relationId); static List * GetDistributedTableMetadataEvents(Oid relationId);
static void EnsureObjectMetadataIsSane(int distributionArgumentIndex, static void EnsureObjectMetadataIsSane(int distributionArgumentIndex,
int colocationId); int colocationId);
static char * LocalGroupIdUpdateCommand(int32 groupId); static char * LocalGroupIdUpdateCommand(int32 groupId);
@ -539,13 +539,13 @@ MetadataCreateCommands(void)
/* /*
* GetDistributedTableDDLEvents returns the full set of DDL commands necessary to * GetDistributedTableMetadataEvents returns the full set of DDL commands necessary to
* create the given distributed table on a worker. The list includes setting up any * create the given distributed table on a worker. The list includes setting up any
* sequences, setting the owner of the table, inserting table and shard metadata, * sequences, setting the owner of the table, inserting table and shard metadata,
* setting the truncate trigger and foreign key constraints. * setting the truncate trigger and foreign key constraints.
*/ */
static List * static List *
GetDistributedTableDDLEvents(Oid relationId) GetDistributedTableMetadataEvents(Oid relationId)
{ {
CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId); CitusTableCacheEntry *cacheEntry = GetCitusTableCacheEntry(relationId);
@ -553,13 +553,6 @@ GetDistributedTableDDLEvents(Oid relationId)
/* if the table is owned by an extension we only propagate pg_dist_* records */ /* if the table is owned by an extension we only propagate pg_dist_* records */
bool tableOwnedByExtension = IsTableOwnedByExtension(relationId); bool tableOwnedByExtension = IsTableOwnedByExtension(relationId);
if (!tableOwnedByExtension)
{
/* command to associate sequences with table */
List *sequenceDependencyCommandList = SequenceDependencyCommandList(
relationId);
commandList = list_concat(commandList, sequenceDependencyCommandList);
}
/* command to insert pg_dist_partition entry */ /* command to insert pg_dist_partition entry */
char *metadataCommand = DistributionCreateCommand(cacheEntry); char *metadataCommand = DistributionCreateCommand(cacheEntry);
@ -1788,6 +1781,10 @@ CreateShellTableOnWorkers(Oid relationId)
commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand)); commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand));
} }
/* command to associate sequences with table */
List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId);
commandList = list_concat(commandList, sequenceDependencyCommandList);
/* prevent recursive propagation */ /* prevent recursive propagation */
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
@ -1811,7 +1808,7 @@ CreateShellTableOnWorkers(Oid relationId)
void void
CreateTableMetadataOnWorkers(Oid relationId) CreateTableMetadataOnWorkers(Oid relationId)
{ {
List *commandList = GetDistributedTableDDLEvents(relationId); List *commandList = GetDistributedTableMetadataEvents(relationId);
/* prevent recursive propagation */ /* prevent recursive propagation */
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION); SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);

View File

@ -824,14 +824,37 @@ ClearDistributedObjectsWithMetadataFromNode(WorkerNode *workerNode)
clearDistTableInfoCommandList = list_concat(clearDistTableInfoCommandList, clearDistTableInfoCommandList = list_concat(clearDistTableInfoCommandList,
detachPartitionCommandList); detachPartitionCommandList);
/* iterate over the commands and execute them in the same connection */
const char *commandString = NULL;
foreach_ptr(commandString, clearDistTableInfoCommandList)
{
elog(WARNING, "Current command 1 is %s", commandString);
}
clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList,
REMOVE_ALL_CLUSTERED_TABLES_COMMAND); REMOVE_ALL_CLUSTERED_TABLES_COMMAND);
commandString = NULL;
foreach_ptr(commandString, clearDistTableInfoCommandList)
{
elog(WARNING, "Current command 2 is %s", commandString);
}
clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, DELETE_ALL_DISTRIBUTED_OBJECTS); clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, DELETE_ALL_DISTRIBUTED_OBJECTS);
commandString = NULL;
foreach_ptr(commandString, clearDistTableInfoCommandList)
{
elog(WARNING, "Current command 3 is %s", commandString);
}
List *clearDistTableCommands = list_make3(DISABLE_DDL_PROPAGATION, clearDistTableInfoCommandList = list_concat(list_make1(DISABLE_DDL_PROPAGATION),clearDistTableInfoCommandList);
clearDistTableInfoCommandList, clearDistTableInfoCommandList = list_concat(clearDistTableInfoCommandList, list_make1(ENABLE_DDL_PROPAGATION));
ENABLE_DDL_PROPAGATION);
commandString = NULL;
foreach_ptr(commandString, clearDistTableInfoCommandList)
{
elog(WARNING, "Current command 3 is %s", commandString);
}
char *currentUser = CurrentUserName(); char *currentUser = CurrentUserName();
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,