Fix metadata changes and use same connection for all

velioglu/wo_seq_test_1
Burak Velioglu 2021-12-15 23:58:03 +03:00
parent fea68a43ad
commit a6cdd43d42
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
5 changed files with 156 additions and 175 deletions

View File

@ -239,6 +239,9 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
if (relKind == RELKIND_RELATION) if (relKind == RELKIND_RELATION)
{ {
Oid relationId = dependency->objectId; Oid relationId = dependency->objectId;
if (IsCitusTable(relationId) && !IsTableOwnedByExtension(relationId))
{
/* table metadata is created only when the Citus table is not owned by an extension */
List *commandList = NIL; List *commandList = NIL;
List *tableDDLCommands = GetFullTableCreationCommands(relationId, WORKER_NEXTVAL_SEQUENCE_DEFAULTS); List *tableDDLCommands = GetFullTableCreationCommands(relationId, WORKER_NEXTVAL_SEQUENCE_DEFAULTS);
@ -249,11 +252,13 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand)); commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand));
} }
// TODO: May need to move sequence dependencies to ActiveNode directly
List *sequenceDependencyCommandList = SequenceDependencyCommandList(dependency->objectId); List *sequenceDependencyCommandList = SequenceDependencyCommandList(dependency->objectId);
commandList = list_concat(commandList, sequenceDependencyCommandList); commandList = list_concat(commandList, sequenceDependencyCommandList);
return commandList; return commandList;
} }
}
if (relKind == RELKIND_SEQUENCE) if (relKind == RELKIND_SEQUENCE)
{ {
@ -387,10 +392,9 @@ ReplicateAllDependenciesToNode(const char *nodeName, int nodePort)
} }
/* since we are executing ddl commands lets disable propagation, primarily for mx */ /* since we are executing ddl commands lets disable propagation, primarily for mx */
ddlCommands = list_concat(list_make1(DISABLE_DDL_PROPAGATION), ddlCommands); ddlCommands = list_make3(DISABLE_DDL_PROPAGATION, ddlCommands, ENABLE_DDL_PROPAGATION);
SendCommandListToWorkerOutsideTransaction(nodeName, nodePort, SendMetadataCommandListToWorkerInCoordinatedTransaction(nodeName, nodePort, CitusExtensionOwnerName(), ddlCommands);
CitusExtensionOwnerName(), ddlCommands);
} }

View File

@ -676,7 +676,7 @@ SupportedDependencyByCitus(const ObjectAddress *address)
* for tables. * for tables.
*/ */
if (relKind == RELKIND_COMPOSITE_TYPE || if (relKind == RELKIND_COMPOSITE_TYPE ||
relKind == RELKIND_RELATION || (relKind == RELKIND_RELATION && IsCitusTable(address->objectId)) ||
relKind == RELKIND_SEQUENCE) relKind == RELKIND_SEQUENCE)
{ {
return true; return true;

View File

@ -83,13 +83,10 @@
char *EnableManualMetadataChangesForUser = ""; char *EnableManualMetadataChangesForUser = "";
static void EnsureSequentialModeMetadataOperations(void);
static List * DistributedObjectMetadataSyncCommandList(void);
static List * GetDistributedTableDDLEvents(Oid relationId); static List * GetDistributedTableDDLEvents(Oid relationId);
static void EnsureObjectMetadataIsSane(int distributionArgumentIndex, static void EnsureObjectMetadataIsSane(int distributionArgumentIndex,
int colocationId); int colocationId);
static char * LocalGroupIdUpdateCommand(int32 groupId); static char * LocalGroupIdUpdateCommand(int32 groupId);
static char * TruncateTriggerCreateCommand(Oid relationId);
static char * SchemaOwnerName(Oid objectId); static char * SchemaOwnerName(Oid objectId);
static bool HasMetadataWorkers(void); static bool HasMetadataWorkers(void);
static bool ShouldSyncTableMetadataInternal(bool hashDistributed, static bool ShouldSyncTableMetadataInternal(bool hashDistributed,
@ -254,7 +251,7 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort)
* visible on all connections used by the transaction, meaning we can only use 1 * visible on all connections used by the transaction, meaning we can only use 1
* connection per node. * connection per node.
*/ */
static void void
EnsureSequentialModeMetadataOperations(void) EnsureSequentialModeMetadataOperations(void)
{ {
if (!IsTransactionBlock()) if (!IsTransactionBlock())
@ -541,95 +538,6 @@ MetadataCreateCommands(void)
} }
/*
* DistributedObjectMetadataSyncCommandList returns the necessary commands to create
* pg_dist_object entries on the new node.
*/
static List *
DistributedObjectMetadataSyncCommandList(void)
{
HeapTuple pgDistObjectTup = NULL;
Relation pgDistObjectRel = table_open(DistObjectRelationId(), AccessShareLock);
Relation pgDistObjectIndexRel = index_open(DistObjectPrimaryKeyIndexId(),
AccessShareLock);
TupleDesc pgDistObjectDesc = RelationGetDescr(pgDistObjectRel);
List *objectAddressList = NIL;
List *distArgumentIndexList = NIL;
List *colocationIdList = NIL;
/* It is not strictly necessary to read the tuples in order.
* However, it is useful to get consistent behavior, both for regression
* tests and also in production systems.
*/
SysScanDesc pgDistObjectScan = systable_beginscan_ordered(pgDistObjectRel,
pgDistObjectIndexRel, NULL,
0, NULL);
while (HeapTupleIsValid(pgDistObjectTup = systable_getnext_ordered(pgDistObjectScan,
ForwardScanDirection)))
{
Form_pg_dist_object pg_dist_object = (Form_pg_dist_object) GETSTRUCT(
pgDistObjectTup);
ObjectAddress *address = palloc(sizeof(ObjectAddress));
ObjectAddressSubSet(*address, pg_dist_object->classid, pg_dist_object->objid,
pg_dist_object->objsubid);
bool distributionArgumentIndexIsNull = false;
Datum distributionArgumentIndexDatum =
heap_getattr(pgDistObjectTup,
Anum_pg_dist_object_distribution_argument_index,
pgDistObjectDesc,
&distributionArgumentIndexIsNull);
int32 distributionArgumentIndex = DatumGetInt32(distributionArgumentIndexDatum);
bool colocationIdIsNull = false;
Datum colocationIdDatum =
heap_getattr(pgDistObjectTup,
Anum_pg_dist_object_colocationid,
pgDistObjectDesc,
&colocationIdIsNull);
int32 colocationId = DatumGetInt32(colocationIdDatum);
objectAddressList = lappend(objectAddressList, address);
if (distributionArgumentIndexIsNull)
{
distArgumentIndexList = lappend_int(distArgumentIndexList,
INVALID_DISTRIBUTION_ARGUMENT_INDEX);
}
else
{
distArgumentIndexList = lappend_int(distArgumentIndexList,
distributionArgumentIndex);
}
if (colocationIdIsNull)
{
colocationIdList = lappend_int(colocationIdList,
INVALID_COLOCATION_ID);
}
else
{
colocationIdList = lappend_int(colocationIdList, colocationId);
}
}
systable_endscan_ordered(pgDistObjectScan);
index_close(pgDistObjectIndexRel, AccessShareLock);
relation_close(pgDistObjectRel, NoLock);
char *workerMetadataUpdateCommand =
MarkObjectsDistributedCreateCommand(objectAddressList,
distArgumentIndexList,
colocationIdList);
List *commandList = list_make1(workerMetadataUpdateCommand);
return commandList;
}
/* /*
* GetDistributedTableDDLEvents returns the full set of DDL commands necessary to * GetDistributedTableDDLEvents returns the full set of DDL commands necessary to
* create the given distributed table on a worker. The list includes setting up any * create the given distributed table on a worker. The list includes setting up any
@ -1793,7 +1701,7 @@ GenerateSetRoleQuery(Oid roleOid)
* TruncateTriggerCreateCommand creates a SQL query calling worker_create_truncate_trigger * TruncateTriggerCreateCommand creates a SQL query calling worker_create_truncate_trigger
* function, which creates the truncate trigger on the worker. * function, which creates the truncate trigger on the worker.
*/ */
static char * char *
TruncateTriggerCreateCommand(Oid relationId) TruncateTriggerCreateCommand(Oid relationId)
{ {
StringInfo triggerCreateCommand = makeStringInfo(); StringInfo triggerCreateCommand = makeStringInfo();

View File

@ -43,6 +43,7 @@
#include "distributed/multi_partitioning_utils.h" #include "distributed/multi_partitioning_utils.h"
#include "distributed/shared_connection_stats.h" #include "distributed/shared_connection_stats.h"
#include "distributed/string_utils.h" #include "distributed/string_utils.h"
#include "distributed/metadata/pg_dist_object.h"
#include "distributed/transaction_recovery.h" #include "distributed/transaction_recovery.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "distributed/worker_manager.h" #include "distributed/worker_manager.h"
@ -91,6 +92,7 @@ typedef struct NodeMetadata
} NodeMetadata; } NodeMetadata;
/* local function forward declarations */ /* local function forward declarations */
static List * DistributedObjectMetadataSyncCommandList(void);
static List * DetachPartitionCommandList(void); static List * DetachPartitionCommandList(void);
static int ActivateNode(char *nodeName, int nodePort); static int ActivateNode(char *nodeName, int nodePort);
static void RemoveNodeFromCluster(char *nodeName, int32 nodePort); static void RemoveNodeFromCluster(char *nodeName, int32 nodePort);
@ -107,9 +109,8 @@ static void InsertPlaceholderCoordinatorRecord(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
*nodeMetadata); *nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport); static void DeleteNodeRow(char *nodename, int32 nodeport);
static void SetUpSequenceDependencies(WorkerNode *workerNode);
static void SetUpObjectMetadata(WorkerNode *workerNode); static void SetUpObjectMetadata(WorkerNode *workerNode);
static void ClearDistributedTablesOnNode(WorkerNode *workerNode); static void ClearDistributedObjectsWithMetadataFromNode(WorkerNode *workerNode);
static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode); static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode);
static void SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode); static void SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
@ -582,6 +583,15 @@ master_set_node_property(PG_FUNCTION_ARGS)
} }
/*
* SetUpMultipleDistributedTableIntegrations sets up the integrations that
* span multiple distributed tables, namely
*
* (i) Foreign keys
* (ii) Partitioning hierarchy
*
* on the given worker node.
*/
static void static void
SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode) SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode)
{ {
@ -639,61 +649,19 @@ SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode)
} }
} }
/* prevent recursive propagation */
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
/* send the commands one by one */
const char *command = NULL;
foreach_ptr(command, multipleTableIntegrationCommandList)
{
SendCommandToWorkersWithMetadata(command);
}
}
static void
SetUpSequenceDependencies(WorkerNode *workerNode)
{
List *distributedTableList = CitusTableList();
List *propagatedTableList = NIL;
List *sequenceCommandList = NIL;
CitusTableCacheEntry *cacheEntry = NULL;
foreach_ptr(cacheEntry, distributedTableList)
{
if (ShouldSyncTableMetadata(cacheEntry->relationId))
{
propagatedTableList = lappend(propagatedTableList, cacheEntry);
}
}
/* create the metadata */
foreach_ptr(cacheEntry, propagatedTableList)
{
Oid relationId = cacheEntry->relationId;
if (IsTableOwnedByExtension(relationId))
{
/* skip table metadata creation when the Citus table is owned by an extension */
continue;
}
List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId);
sequenceCommandList = list_concat(sequenceCommandList,
sequenceDependencyCommandList);
}
/* prevent recursive propagation */
SendCommandToWorkersWithMetadata(DISABLE_DDL_PROPAGATION);
char *currentUser = CurrentUserName(); char *currentUser = CurrentUserName();
List *commandList = list_make3(DISABLE_DDL_PROPAGATION, multipleTableIntegrationCommandList, ENABLE_DDL_PROPAGATION);
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
workerNode->workerPort, workerNode->workerPort,
currentUser, currentUser,
sequenceCommandList); commandList);
} }
/*
* SetUpObjectMetadata sets up the metadata depending on the distributed object
* on the given node.
*/
static void static void
SetUpObjectMetadata(WorkerNode *workerNode) SetUpObjectMetadata(WorkerNode *workerNode)
{ {
@ -744,7 +712,7 @@ SetUpObjectMetadata(WorkerNode *workerNode)
distributedObjectSyncCommandList); distributedObjectSyncCommandList);
} }
List *metadataSnapshotCommands = list_make2(DISABLE_DDL_PROPAGATION, metadataSnapshotCommandList); List *metadataSnapshotCommands = list_make3(DISABLE_DDL_PROPAGATION, metadataSnapshotCommandList, ENABLE_DDL_PROPAGATION);
char *currentUser = CurrentUserName(); char *currentUser = CurrentUserName();
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
@ -753,8 +721,102 @@ SetUpObjectMetadata(WorkerNode *workerNode)
metadataSnapshotCommands); metadataSnapshotCommands);
} }
/*
* DistributedObjectMetadataSyncCommandList returns the necessary commands to create
* pg_dist_object entries on the new node.
*/
static List *
DistributedObjectMetadataSyncCommandList(void)
{
HeapTuple pgDistObjectTup = NULL;
Relation pgDistObjectRel = table_open(DistObjectRelationId(), AccessShareLock);
Relation pgDistObjectIndexRel = index_open(DistObjectPrimaryKeyIndexId(),
AccessShareLock);
TupleDesc pgDistObjectDesc = RelationGetDescr(pgDistObjectRel);
List *objectAddressList = NIL;
List *distArgumentIndexList = NIL;
List *colocationIdList = NIL;
/* It is not strictly necessary to read the tuples in order.
* However, it is useful to get consistent behavior, both for regression
* tests and also in production systems.
*/
SysScanDesc pgDistObjectScan = systable_beginscan_ordered(pgDistObjectRel,
pgDistObjectIndexRel, NULL,
0, NULL);
while (HeapTupleIsValid(pgDistObjectTup = systable_getnext_ordered(pgDistObjectScan,
ForwardScanDirection)))
{
Form_pg_dist_object pg_dist_object = (Form_pg_dist_object) GETSTRUCT(
pgDistObjectTup);
ObjectAddress *address = palloc(sizeof(ObjectAddress));
ObjectAddressSubSet(*address, pg_dist_object->classid, pg_dist_object->objid,
pg_dist_object->objsubid);
bool distributionArgumentIndexIsNull = false;
Datum distributionArgumentIndexDatum =
heap_getattr(pgDistObjectTup,
Anum_pg_dist_object_distribution_argument_index,
pgDistObjectDesc,
&distributionArgumentIndexIsNull);
int32 distributionArgumentIndex = DatumGetInt32(distributionArgumentIndexDatum);
bool colocationIdIsNull = false;
Datum colocationIdDatum =
heap_getattr(pgDistObjectTup,
Anum_pg_dist_object_colocationid,
pgDistObjectDesc,
&colocationIdIsNull);
int32 colocationId = DatumGetInt32(colocationIdDatum);
objectAddressList = lappend(objectAddressList, address);
if (distributionArgumentIndexIsNull)
{
distArgumentIndexList = lappend_int(distArgumentIndexList,
INVALID_DISTRIBUTION_ARGUMENT_INDEX);
}
else
{
distArgumentIndexList = lappend_int(distArgumentIndexList,
distributionArgumentIndex);
}
if (colocationIdIsNull)
{
colocationIdList = lappend_int(colocationIdList,
INVALID_COLOCATION_ID);
}
else
{
colocationIdList = lappend_int(colocationIdList, colocationId);
}
}
systable_endscan_ordered(pgDistObjectScan);
index_close(pgDistObjectIndexRel, AccessShareLock);
relation_close(pgDistObjectRel, NoLock);
char *workerMetadataUpdateCommand =
MarkObjectsDistributedCreateCommand(objectAddressList,
distArgumentIndexList,
colocationIdList);
List *commandList = list_make1(workerMetadataUpdateCommand);
return commandList;
}
/*
* ClearDistributedObjectsWithMetadataFromNode clears all the distributed objects and related
* metadata from the given worker node.
*/
static void static void
ClearDistributedTablesOnNode(WorkerNode *workerNode) ClearDistributedObjectsWithMetadataFromNode(WorkerNode *workerNode)
{ {
List *clearDistTableInfoCommandList = NIL; List *clearDistTableInfoCommandList = NIL;
List *detachPartitionCommandList = DetachPartitionCommandList(); List *detachPartitionCommandList = DetachPartitionCommandList();
@ -767,8 +829,9 @@ ClearDistributedTablesOnNode(WorkerNode *workerNode)
clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, DELETE_ALL_DISTRIBUTED_OBJECTS); clearDistTableInfoCommandList = lappend(clearDistTableInfoCommandList, DELETE_ALL_DISTRIBUTED_OBJECTS);
List *clearDistTableCommands = list_make2(DISABLE_DDL_PROPAGATION, List *clearDistTableCommands = list_make3(DISABLE_DDL_PROPAGATION,
clearDistTableInfoCommandList); clearDistTableInfoCommandList,
ENABLE_DDL_PROPAGATION);
char *currentUser = CurrentUserName(); char *currentUser = CurrentUserName();
SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName, SendMetadataCommandListToWorkerInCoordinatedTransaction(workerNode->workerName,
@ -781,7 +844,7 @@ ClearDistributedTablesOnNode(WorkerNode *workerNode)
/* /*
* SetUpDistributedTableWithDependencies sets up the following on a node if it's * SetUpDistributedTableWithDependencies sets up the following on a node if it's
* a primary node that currently stores data: * a primary node that currently stores data:
* - All dependencies (e.g., types, schemas) * - All dependencies (e.g., types, schemas, sequences)
* - All shell distributed table * - All shell distributed table
* - Reference tables, because they are needed to handle queries efficiently. * - Reference tables, because they are needed to handle queries efficiently.
* - Distributed functions * - Distributed functions
@ -857,11 +920,10 @@ PropagateNodeWideObjects(WorkerNode *newWorkerNode)
if (list_length(ddlCommands) > 0) if (list_length(ddlCommands) > 0)
{ {
/* if there are commands, wrap them in enable_ddl_propagation off */ /* if there are commands, wrap them in enable_ddl_propagation off */
ddlCommands = lcons(DISABLE_DDL_PROPAGATION, ddlCommands); ddlCommands = list_make3(DISABLE_DDL_PROPAGATION, ddlCommands, ENABLE_DDL_PROPAGATION);
ddlCommands = lappend(ddlCommands, ENABLE_DDL_PROPAGATION);
/* send commands to new workers*/ /* send commands to new workers*/
SendCommandListToWorkerOutsideTransaction(newWorkerNode->workerName, SendMetadataCommandListToWorkerInCoordinatedTransaction(newWorkerNode->workerName,
newWorkerNode->workerPort, newWorkerNode->workerPort,
CitusExtensionOwnerName(), CitusExtensionOwnerName(),
ddlCommands); ddlCommands);
@ -1070,6 +1132,12 @@ ActivateNode(char *nodeName, int nodePort)
{ {
bool isActive = true; bool isActive = true;
CheckCitusVersion(ERROR);
EnsureCoordinator();
EnsureModificationsCanRun();
EnsureSequentialModeMetadataOperations();
/* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */ /* take an exclusive lock on pg_dist_node to serialize pg_dist_node changes */
LockRelationOid(DistNodeRelationId(), ExclusiveLock); LockRelationOid(DistNodeRelationId(), ExclusiveLock);
@ -1099,9 +1167,10 @@ ActivateNode(char *nodeName, int nodePort)
BoolGetDatum(isActive)); BoolGetDatum(isActive));
} }
ClearDistributedTablesOnNode(workerNode); UseCoordinatedTransaction();
ClearDistributedObjectsWithMetadataFromNode(workerNode);
SetUpDistributedTableWithDependencies(workerNode); SetUpDistributedTableWithDependencies(workerNode);
SetUpSequenceDependencies(workerNode);
SetUpMultipleDistributedTableIntegrations(workerNode); SetUpMultipleDistributedTableIntegrations(workerNode);
SetUpObjectMetadata(workerNode); SetUpObjectMetadata(workerNode);
@ -1118,8 +1187,6 @@ ActivateNode(char *nodeName, int nodePort)
} }
/* /*
* DetachPartitionCommandList returns list of DETACH commands to detach partitions * DetachPartitionCommandList returns list of DETACH commands to detach partitions
* of all distributed tables. This function is used for detaching partitions in MX * of all distributed tables. This function is used for detaching partitions in MX

View File

@ -29,6 +29,7 @@ typedef enum
/* Functions declarations for metadata syncing */ /* Functions declarations for metadata syncing */
extern void StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort); extern void StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort);
extern void EnsureSequentialModeMetadataOperations(void);
extern bool ClusterHasKnownMetadataWorkers(void); extern bool ClusterHasKnownMetadataWorkers(void);
extern bool ShouldSyncTableMetadata(Oid relationId); extern bool ShouldSyncTableMetadata(Oid relationId);
extern bool ShouldSyncTableMetadataViaCatalog(Oid relationId); extern bool ShouldSyncTableMetadataViaCatalog(Oid relationId);
@ -51,6 +52,7 @@ extern char * CreateSchemaDDLCommand(Oid schemaId);
extern List * GrantOnSchemaDDLCommands(Oid schemaId); extern List * GrantOnSchemaDDLCommands(Oid schemaId);
extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState, extern char * PlacementUpsertCommand(uint64 shardId, uint64 placementId, int shardState,
uint64 shardLength, int32 groupId); uint64 shardLength, int32 groupId);
extern char * TruncateTriggerCreateCommand(Oid relationId);
extern void CreateShellTableOnWorkers(Oid relationId); extern void CreateShellTableOnWorkers(Oid relationId);
extern void CreateTableMetadataOnWorkers(Oid relationId); extern void CreateTableMetadataOnWorkers(Oid relationId);
extern BackgroundWorkerHandle * SpawnSyncMetadataToNodes(Oid database, Oid owner); extern BackgroundWorkerHandle * SpawnSyncMetadataToNodes(Oid database, Oid owner);