mirror of https://github.com/citusdata/citus.git
PR #6728 / commit - 4
Add new metadata sync methods which uses MemorySyncContext api so that during the sync we can - free memory to prevent OOM, - use either transactional or nontransactional modes according to the GUC .pull/6728/head
parent
8feb8c634a
commit
29ef9117e6
|
@ -29,13 +29,10 @@
|
|||
#include "storage/lmgr.h"
|
||||
#include "utils/lsyscache.h"
|
||||
|
||||
typedef bool (*AddressPredicate)(const ObjectAddress *);
|
||||
|
||||
static void EnsureDependenciesCanBeDistributed(const ObjectAddress *relationAddress);
|
||||
static void ErrorIfCircularDependencyExists(const ObjectAddress *objectAddress);
|
||||
static int ObjectAddressComparator(const void *a, const void *b);
|
||||
static List * FilterObjectAddressListByPredicate(List *objectAddressList,
|
||||
AddressPredicate predicate);
|
||||
static void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
|
||||
static List * GetDependencyCreateDDLCommands(const ObjectAddress *dependency);
|
||||
static bool ShouldPropagateObject(const ObjectAddress *address);
|
||||
|
@ -749,7 +746,7 @@ ShouldPropagateAnyObject(List *addresses)
|
|||
* FilterObjectAddressListByPredicate takes a list of ObjectAddress *'s and returns a list
|
||||
* only containing the ObjectAddress *'s for which the predicate returned true.
|
||||
*/
|
||||
static List *
|
||||
List *
|
||||
FilterObjectAddressListByPredicate(List *objectAddressList, AddressPredicate predicate)
|
||||
{
|
||||
List *result = NIL;
|
||||
|
|
|
@ -613,6 +613,25 @@ ShouldSyncTableMetadataViaCatalog(Oid relationId)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* FetchRelationIdFromPgPartitionHeapTuple returns relation id from given heap tuple.
|
||||
*/
|
||||
Oid
|
||||
FetchRelationIdFromPgPartitionHeapTuple(HeapTuple heapTuple, TupleDesc tupleDesc)
|
||||
{
|
||||
Assert(heapTuple->t_tableOid == DistPartitionRelationId());
|
||||
|
||||
bool isNullArray[Natts_pg_dist_partition];
|
||||
Datum datumArray[Natts_pg_dist_partition];
|
||||
heap_deform_tuple(heapTuple, tupleDesc, datumArray, isNullArray);
|
||||
|
||||
Datum relationIdDatum = datumArray[Anum_pg_dist_partition_logicalrelid - 1];
|
||||
Oid relationId = DatumGetObjectId(relationIdDatum);
|
||||
|
||||
return relationId;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ShouldSyncTableMetadataInternal decides whether we should sync the metadata for a table
|
||||
* based on whether it is a hash distributed table, or a citus table with no distribution
|
||||
|
@ -4218,36 +4237,7 @@ EstablishAndSetMetadataSyncBareConnections(MetadataSyncContext *context)
|
|||
bareConnectionList = lappend(bareConnectionList, connection);
|
||||
}
|
||||
|
||||
context->activatedWorkerConnections = bareConnectionList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* EstablishAndSetMetadataSyncCoordinatedConnections establishes and sets
|
||||
* connections used throughout transactional metadata sync.
|
||||
*/
|
||||
void
|
||||
EstablishAndSetMetadataSyncCoordinatedConnections(MetadataSyncContext *context)
|
||||
{
|
||||
Assert(MetadataSyncTransMode == METADATA_SYNC_TRANSACTIONAL);
|
||||
|
||||
int connectionFlags = REQUIRE_METADATA_CONNECTION;
|
||||
|
||||
/* establish coordinated connections to activated worker nodes */
|
||||
List *coordinatedConnectionList = NIL;
|
||||
WorkerNode *node = NULL;
|
||||
foreach_ptr(node, context->activatedWorkerNodeList)
|
||||
{
|
||||
MultiConnection *connection =
|
||||
StartNodeConnection(connectionFlags, node->workerName, node->workerPort);
|
||||
|
||||
MarkRemoteTransactionCritical(connection);
|
||||
|
||||
Assert(connection != NULL);
|
||||
coordinatedConnectionList = lappend(coordinatedConnectionList, connection);
|
||||
}
|
||||
|
||||
context->activatedWorkerConnections = coordinatedConnectionList;
|
||||
context->activatedWorkerBareConnections = bareConnectionList;
|
||||
}
|
||||
|
||||
|
||||
|
@ -4277,12 +4267,11 @@ CreateMetadataSyncContext(List *nodeList, bool collectCommands)
|
|||
/* filter the nodes that needs to be activated from given node list */
|
||||
SetMetadataSyncNodesFromNodeList(metadataSyncContext, nodeList);
|
||||
|
||||
/* establish connections */
|
||||
if (!collectCommands && MetadataSyncTransMode == METADATA_SYNC_TRANSACTIONAL)
|
||||
{
|
||||
EstablishAndSetMetadataSyncCoordinatedConnections(metadataSyncContext);
|
||||
}
|
||||
else if (!collectCommands && MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL)
|
||||
/*
|
||||
* establish connections only for nontransactional mode to prevent connection
|
||||
* open-close for each command
|
||||
*/
|
||||
if (!collectCommands && MetadataSyncTransMode == METADATA_SYNC_NON_TRANSACTIONAL)
|
||||
{
|
||||
EstablishAndSetMetadataSyncBareConnections(metadataSyncContext);
|
||||
}
|
||||
|
@ -4363,8 +4352,7 @@ SendOrCollectCommandListToActivatedNodes(MetadataSyncContext *context, List *com
|
|||
*/
|
||||
if (MetadataSyncCollectsCommands(context))
|
||||
{
|
||||
context->collectedCommands = list_concat(context->collectedCommands,
|
||||
commands);
|
||||
context->collectedCommands = list_concat(context->collectedCommands, commands);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -4406,8 +4394,7 @@ SendOrCollectCommandListToMetadataNodes(MetadataSyncContext *context, List *comm
|
|||
*/
|
||||
if (MetadataSyncCollectsCommands(context))
|
||||
{
|
||||
context->collectedCommands = list_concat(context->collectedCommands,
|
||||
commands);
|
||||
context->collectedCommands = list_concat(context->collectedCommands, commands);
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -4450,18 +4437,13 @@ SendOrCollectCommandListToSingleNode(MetadataSyncContext *context, List *command
|
|||
*/
|
||||
if (MetadataSyncCollectsCommands(context))
|
||||
{
|
||||
context->collectedCommands = list_concat(context->collectedCommands,
|
||||
commands);
|
||||
context->collectedCommands = list_concat(context->collectedCommands, commands);
|
||||
return;
|
||||
}
|
||||
|
||||
/* send commands to new workers, the current user should be a superuser */
|
||||
Assert(superuser());
|
||||
|
||||
List *workerConnections = context->activatedWorkerConnections;
|
||||
Assert(nodeIdx < list_length(workerConnections));
|
||||
MultiConnection *workerConnection = list_nth(workerConnections, nodeIdx);
|
||||
|
||||
if (context->transactionMode == METADATA_SYNC_TRANSACTIONAL)
|
||||
{
|
||||
List *workerNodes = context->activatedWorkerNodeList;
|
||||
|
@ -4502,3 +4484,523 @@ WorkerDropAllShellTablesCommand(bool singleTransaction)
|
|||
singleTransactionString);
|
||||
return removeAllShellTablesCommand->data;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* PropagateNodeWideObjectsCommandList is called during node activation to
|
||||
* propagate any object that should be propagated for every node. These are
|
||||
* generally not linked to any distributed object but change system wide behaviour.
|
||||
*/
|
||||
static List *
|
||||
PropagateNodeWideObjectsCommandList(void)
|
||||
{
|
||||
/* collect all commands */
|
||||
List *ddlCommands = NIL;
|
||||
|
||||
if (EnableAlterRoleSetPropagation)
|
||||
{
|
||||
/*
|
||||
* Get commands for database and postgres wide settings. Since these settings are not
|
||||
* linked to any role that can be distributed we need to distribute them seperately
|
||||
*/
|
||||
List *alterRoleSetCommands = GenerateAlterRoleSetCommandForRole(InvalidOid);
|
||||
ddlCommands = list_concat(ddlCommands, alterRoleSetCommands);
|
||||
}
|
||||
|
||||
return ddlCommands;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SyncDistributedObjects sync the distributed objects to the nodes in metadataSyncContext
|
||||
* with transactional or nontransactional mode according to transactionMode inside
|
||||
* metadataSyncContext.
|
||||
*
|
||||
* Transactions should be ordered like below:
|
||||
* - Nodewide objects (only roles for now),
|
||||
* - Deletion of sequence and shell tables and metadata entries
|
||||
* - All dependencies (e.g., types, schemas, sequences) and all shell distributed
|
||||
* table and their pg_dist_xx metadata entries
|
||||
* - Inter relation between those shell tables
|
||||
*
|
||||
* Note that we do not create the distributed dependencies on the coordinator
|
||||
* since all the dependencies should be present in the coordinator already.
|
||||
*/
|
||||
void
|
||||
SyncDistributedObjects(MetadataSyncContext *context)
|
||||
{
|
||||
if (context->activatedWorkerNodeList == NIL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
EnsureSequentialModeMetadataOperations();
|
||||
|
||||
Assert(ShouldPropagate());
|
||||
|
||||
/* Send systemwide objects, only roles for now */
|
||||
SendNodeWideObjectsSyncCommands(context);
|
||||
|
||||
/*
|
||||
* Break dependencies between sequences-shell tables, then remove shell tables,
|
||||
* and metadata tables respectively.
|
||||
* We should delete shell tables before metadata entries as we look inside
|
||||
* pg_dist_partition to figure out shell tables.
|
||||
*/
|
||||
SendShellTableDeletionCommands(context);
|
||||
SendMetadataDeletionCommands(context);
|
||||
|
||||
/*
|
||||
* Commands to insert pg_dist_colocation entries.
|
||||
* Replicating dist objects and their metadata depends on this step.
|
||||
*/
|
||||
SendColocationMetadataCommands(context);
|
||||
|
||||
/*
|
||||
* Replicate all objects of the pg_dist_object to the remote node and
|
||||
* create metadata entries for Citus tables (pg_dist_shard, pg_dist_shard_placement,
|
||||
* pg_dist_partition, pg_dist_object).
|
||||
*/
|
||||
SendDependencyCreationCommands(context);
|
||||
SendDistTableMetadataCommands(context);
|
||||
SendDistObjectCommands(context);
|
||||
|
||||
/*
|
||||
* After creating each table, handle the inter table relationship between
|
||||
* those tables.
|
||||
*/
|
||||
SendInterTableRelationshipCommands(context);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SendNodeWideObjectsSyncCommands sends systemwide objects to workers with
|
||||
* transactional or nontransactional mode according to transactionMode inside
|
||||
* metadataSyncContext.
|
||||
*/
|
||||
void
|
||||
SendNodeWideObjectsSyncCommands(MetadataSyncContext *context)
|
||||
{
|
||||
/* propagate node wide objects. It includes only roles for now. */
|
||||
List *commandList = PropagateNodeWideObjectsCommandList();
|
||||
|
||||
if (commandList == NIL)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
commandList = lcons(DISABLE_DDL_PROPAGATION, commandList);
|
||||
commandList = lappend(commandList, ENABLE_DDL_PROPAGATION);
|
||||
SendOrCollectCommandListToActivatedNodes(context, commandList);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SendShellTableDeletionCommands sends sequence, and shell table deletion
|
||||
* commands to workers with transactional or nontransactional mode according to
|
||||
* transactionMode inside metadataSyncContext.
|
||||
*/
|
||||
void
|
||||
SendShellTableDeletionCommands(MetadataSyncContext *context)
|
||||
{
|
||||
/* break all sequence deps for citus tables and remove all shell tables */
|
||||
char *breakSeqDepsCommand = BREAK_CITUS_TABLE_SEQUENCE_DEPENDENCY_COMMAND;
|
||||
SendOrCollectCommandListToActivatedNodes(context, list_make1(breakSeqDepsCommand));
|
||||
|
||||
/* remove shell tables */
|
||||
bool singleTransaction = (context->transactionMode == METADATA_SYNC_TRANSACTIONAL);
|
||||
char *dropShellTablesCommand = WorkerDropAllShellTablesCommand(singleTransaction);
|
||||
SendOrCollectCommandListToActivatedNodes(context, list_make1(dropShellTablesCommand));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SendMetadataDeletionCommands sends metadata entry deletion commands to workers
|
||||
* with transactional or nontransactional mode according to transactionMode inside
|
||||
* metadataSyncContext.
|
||||
*/
|
||||
void
|
||||
SendMetadataDeletionCommands(MetadataSyncContext *context)
|
||||
{
|
||||
/* remove pg_dist_partition entries */
|
||||
SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_PARTITIONS));
|
||||
|
||||
/* remove pg_dist_shard entries */
|
||||
SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_SHARDS));
|
||||
|
||||
/* remove pg_dist_placement entries */
|
||||
SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_PLACEMENTS));
|
||||
|
||||
/* remove pg_dist_object entries */
|
||||
SendOrCollectCommandListToActivatedNodes(context,
|
||||
list_make1(DELETE_ALL_DISTRIBUTED_OBJECTS));
|
||||
|
||||
/* remove pg_dist_colocation entries */
|
||||
SendOrCollectCommandListToActivatedNodes(context, list_make1(DELETE_ALL_COLOCATION));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SendColocationMetadataCommands sends colocation metadata with transactional or
|
||||
* nontransactional mode according to transactionMode inside metadataSyncContext.
|
||||
*/
|
||||
void
|
||||
SendColocationMetadataCommands(MetadataSyncContext *context)
|
||||
{
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 0;
|
||||
|
||||
Relation relation = table_open(DistColocationRelationId(), AccessShareLock);
|
||||
SysScanDesc scanDesc = systable_beginscan(relation, InvalidOid, false, NULL,
|
||||
scanKeyCount, scanKey);
|
||||
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(context->context);
|
||||
HeapTuple nextTuple = NULL;
|
||||
while (true)
|
||||
{
|
||||
ResetMetadataSyncMemoryContext(context);
|
||||
|
||||
nextTuple = systable_getnext(scanDesc);
|
||||
if (!HeapTupleIsValid(nextTuple))
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
StringInfo colocationGroupCreateCommand = makeStringInfo();
|
||||
appendStringInfo(colocationGroupCreateCommand,
|
||||
"WITH colocation_group_data (colocationid, shardcount, "
|
||||
"replicationfactor, distributioncolumntype, "
|
||||
"distributioncolumncollationname, "
|
||||
"distributioncolumncollationschema) AS (VALUES ");
|
||||
|
||||
Form_pg_dist_colocation colocationForm =
|
||||
(Form_pg_dist_colocation) GETSTRUCT(nextTuple);
|
||||
|
||||
appendStringInfo(colocationGroupCreateCommand,
|
||||
"(%d, %d, %d, %s, ",
|
||||
colocationForm->colocationid,
|
||||
colocationForm->shardcount,
|
||||
colocationForm->replicationfactor,
|
||||
RemoteTypeIdExpression(colocationForm->distributioncolumntype));
|
||||
|
||||
/*
|
||||
* For collations, include the names in the VALUES section and then
|
||||
* join with pg_collation.
|
||||
*/
|
||||
Oid distributionColumCollation = colocationForm->distributioncolumncollation;
|
||||
if (distributionColumCollation != InvalidOid)
|
||||
{
|
||||
Datum collationIdDatum = ObjectIdGetDatum(distributionColumCollation);
|
||||
HeapTuple collationTuple = SearchSysCache1(COLLOID, collationIdDatum);
|
||||
if (HeapTupleIsValid(collationTuple))
|
||||
{
|
||||
Form_pg_collation collationform =
|
||||
(Form_pg_collation) GETSTRUCT(collationTuple);
|
||||
char *collationName = NameStr(collationform->collname);
|
||||
char *collationSchemaName =
|
||||
get_namespace_name(collationform->collnamespace);
|
||||
appendStringInfo(colocationGroupCreateCommand,
|
||||
"%s, %s)",
|
||||
quote_literal_cstr(collationName),
|
||||
quote_literal_cstr(collationSchemaName));
|
||||
ReleaseSysCache(collationTuple);
|
||||
}
|
||||
else
|
||||
{
|
||||
appendStringInfo(colocationGroupCreateCommand,
|
||||
"NULL, NULL)");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
appendStringInfo(colocationGroupCreateCommand,
|
||||
"NULL, NULL)");
|
||||
}
|
||||
|
||||
appendStringInfo(colocationGroupCreateCommand,
|
||||
") SELECT pg_catalog.citus_internal_add_colocation_metadata("
|
||||
"colocationid, shardcount, replicationfactor, "
|
||||
"distributioncolumntype, coalesce(c.oid, 0)) "
|
||||
"FROM colocation_group_data d LEFT JOIN pg_collation c "
|
||||
"ON (d.distributioncolumncollationname = c.collname "
|
||||
"AND d.distributioncolumncollationschema::regnamespace"
|
||||
" = c.collnamespace)");
|
||||
|
||||
List *commandList = list_make1(colocationGroupCreateCommand->data);
|
||||
SendOrCollectCommandListToActivatedNodes(context, commandList);
|
||||
}
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
systable_endscan(scanDesc);
|
||||
table_close(relation, AccessShareLock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SendDependencyCreationCommands sends dependency creation commands to workers
|
||||
* with transactional or nontransactional mode according to transactionMode
|
||||
* inside metadataSyncContext.
|
||||
*/
|
||||
void
|
||||
SendDependencyCreationCommands(MetadataSyncContext *context)
|
||||
{
|
||||
/* disable ddl propagation */
|
||||
SendOrCollectCommandListToActivatedNodes(context,
|
||||
list_make1(DISABLE_DDL_PROPAGATION));
|
||||
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(context->context);
|
||||
|
||||
/* collect all dependencies in creation order and get their ddl commands */
|
||||
List *dependencies = GetDistributedObjectAddressList();
|
||||
|
||||
/*
|
||||
* Depending on changes in the environment, such as the enable_metadata_sync guc
|
||||
* there might be objects in the distributed object address list that should currently
|
||||
* not be propagated by citus as they are 'not supported'.
|
||||
*/
|
||||
dependencies = FilterObjectAddressListByPredicate(dependencies,
|
||||
&SupportedDependencyByCitus);
|
||||
|
||||
dependencies = OrderObjectAddressListInDependencyOrder(dependencies);
|
||||
|
||||
/*
|
||||
* We need to create a subcontext as we reset the context after each dependency
|
||||
* creation but we want to preserve all dependency objects at metadataSyncContext.
|
||||
*/
|
||||
MemoryContext commandsContext = AllocSetContextCreate(context->context,
|
||||
"dependency commands context",
|
||||
ALLOCSET_DEFAULT_SIZES);
|
||||
MemoryContextSwitchTo(commandsContext);
|
||||
ObjectAddress *dependency = NULL;
|
||||
foreach_ptr(dependency, dependencies)
|
||||
{
|
||||
if (!MetadataSyncCollectsCommands(context))
|
||||
{
|
||||
MemoryContextReset(commandsContext);
|
||||
}
|
||||
|
||||
if (IsAnyObjectAddressOwnedByExtension(list_make1(dependency), NULL))
|
||||
{
|
||||
/*
|
||||
* We expect extension-owned objects to be created as a result
|
||||
* of the extension being created.
|
||||
*/
|
||||
continue;
|
||||
}
|
||||
|
||||
/* dependency creation commands */
|
||||
List *ddlCommands = GetAllDependencyCreateDDLCommands(list_make1(dependency));
|
||||
SendOrCollectCommandListToActivatedNodes(context, ddlCommands);
|
||||
}
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
if (!MetadataSyncCollectsCommands(context))
|
||||
{
|
||||
MemoryContextDelete(commandsContext);
|
||||
}
|
||||
ResetMetadataSyncMemoryContext(context);
|
||||
|
||||
/* enable ddl propagation */
|
||||
SendOrCollectCommandListToActivatedNodes(context, list_make1(ENABLE_DDL_PROPAGATION));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SendDistTableMetadataCommands sends commands related to pg_dist_shard and,
|
||||
* pg_dist_shard_placement entries to workers with transactional or nontransactional
|
||||
* mode according to transactionMode inside metadataSyncContext.
|
||||
*/
|
||||
void
|
||||
SendDistTableMetadataCommands(MetadataSyncContext *context)
|
||||
{
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 0;
|
||||
|
||||
Relation relation = table_open(DistPartitionRelationId(), AccessShareLock);
|
||||
TupleDesc tupleDesc = RelationGetDescr(relation);
|
||||
|
||||
SysScanDesc scanDesc = systable_beginscan(relation, InvalidOid, false, NULL,
|
||||
scanKeyCount, scanKey);
|
||||
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(context->context);
|
||||
HeapTuple nextTuple = NULL;
|
||||
while (true)
|
||||
{
|
||||
ResetMetadataSyncMemoryContext(context);
|
||||
|
||||
nextTuple = systable_getnext(scanDesc);
|
||||
if (!HeapTupleIsValid(nextTuple))
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
/*
|
||||
* Create Citus table metadata commands (pg_dist_shard, pg_dist_shard_placement,
|
||||
* pg_dist_partition). Only Citus tables have shard metadata.
|
||||
*/
|
||||
Oid relationId = FetchRelationIdFromPgPartitionHeapTuple(nextTuple, tupleDesc);
|
||||
if (!ShouldSyncTableMetadata(relationId))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
List *commandList = CitusTableMetadataCreateCommandList(relationId);
|
||||
SendOrCollectCommandListToActivatedNodes(context, commandList);
|
||||
}
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
systable_endscan(scanDesc);
|
||||
table_close(relation, AccessShareLock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SendDistObjectCommands sends commands related to pg_dist_object entries to
|
||||
* workers with transactional or nontransactional mode according to transactionMode
|
||||
* inside metadataSyncContext.
|
||||
*/
|
||||
void
|
||||
SendDistObjectCommands(MetadataSyncContext *context)
|
||||
{
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 0;
|
||||
|
||||
Relation relation = table_open(DistObjectRelationId(), AccessShareLock);
|
||||
TupleDesc tupleDesc = RelationGetDescr(relation);
|
||||
|
||||
SysScanDesc scanDesc = systable_beginscan(relation, InvalidOid, false, NULL,
|
||||
scanKeyCount, scanKey);
|
||||
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(context->context);
|
||||
HeapTuple nextTuple = NULL;
|
||||
while (true)
|
||||
{
|
||||
ResetMetadataSyncMemoryContext(context);
|
||||
|
||||
nextTuple = systable_getnext(scanDesc);
|
||||
if (!HeapTupleIsValid(nextTuple))
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
Form_pg_dist_object pg_dist_object = (Form_pg_dist_object) GETSTRUCT(nextTuple);
|
||||
|
||||
ObjectAddress *address = palloc(sizeof(ObjectAddress));
|
||||
|
||||
ObjectAddressSubSet(*address, pg_dist_object->classid, pg_dist_object->objid,
|
||||
pg_dist_object->objsubid);
|
||||
|
||||
bool distributionArgumentIndexIsNull = false;
|
||||
Datum distributionArgumentIndexDatum =
|
||||
heap_getattr(nextTuple,
|
||||
Anum_pg_dist_object_distribution_argument_index,
|
||||
tupleDesc,
|
||||
&distributionArgumentIndexIsNull);
|
||||
int32 distributionArgumentIndex = DatumGetInt32(distributionArgumentIndexDatum);
|
||||
|
||||
bool colocationIdIsNull = false;
|
||||
Datum colocationIdDatum =
|
||||
heap_getattr(nextTuple,
|
||||
Anum_pg_dist_object_colocationid,
|
||||
tupleDesc,
|
||||
&colocationIdIsNull);
|
||||
int32 colocationId = DatumGetInt32(colocationIdDatum);
|
||||
|
||||
bool forceDelegationIsNull = false;
|
||||
Datum forceDelegationDatum =
|
||||
heap_getattr(nextTuple,
|
||||
Anum_pg_dist_object_force_delegation,
|
||||
tupleDesc,
|
||||
&forceDelegationIsNull);
|
||||
bool forceDelegation = DatumGetBool(forceDelegationDatum);
|
||||
|
||||
if (distributionArgumentIndexIsNull)
|
||||
{
|
||||
distributionArgumentIndex = INVALID_DISTRIBUTION_ARGUMENT_INDEX;
|
||||
}
|
||||
|
||||
if (colocationIdIsNull)
|
||||
{
|
||||
colocationId = INVALID_COLOCATION_ID;
|
||||
}
|
||||
|
||||
if (forceDelegationIsNull)
|
||||
{
|
||||
forceDelegation = NO_FORCE_PUSHDOWN;
|
||||
}
|
||||
|
||||
char *workerMetadataUpdateCommand =
|
||||
MarkObjectsDistributedCreateCommand(list_make1(address),
|
||||
list_make1_int(distributionArgumentIndex),
|
||||
list_make1_int(colocationId),
|
||||
list_make1_int(forceDelegation));
|
||||
SendOrCollectCommandListToActivatedNodes(context,
|
||||
list_make1(workerMetadataUpdateCommand));
|
||||
}
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
systable_endscan(scanDesc);
|
||||
relation_close(relation, NoLock);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* SendInterTableRelationshipCommands sends inter-table relationship commands
|
||||
* (e.g. constraints, attach partitions) to workers with transactional or
|
||||
* nontransactional mode per inter table relationship according to transactionMode
|
||||
* inside metadataSyncContext.
|
||||
*/
|
||||
void
|
||||
SendInterTableRelationshipCommands(MetadataSyncContext *context)
|
||||
{
|
||||
/* disable ddl propagation */
|
||||
SendOrCollectCommandListToActivatedNodes(context,
|
||||
list_make1(DISABLE_DDL_PROPAGATION));
|
||||
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 0;
|
||||
|
||||
Relation relation = table_open(DistPartitionRelationId(), AccessShareLock);
|
||||
TupleDesc tupleDesc = RelationGetDescr(relation);
|
||||
|
||||
SysScanDesc scanDesc = systable_beginscan(relation, InvalidOid, false, NULL,
|
||||
scanKeyCount, scanKey);
|
||||
|
||||
MemoryContext oldContext = MemoryContextSwitchTo(context->context);
|
||||
HeapTuple nextTuple = NULL;
|
||||
while (true)
|
||||
{
|
||||
ResetMetadataSyncMemoryContext(context);
|
||||
|
||||
nextTuple = systable_getnext(scanDesc);
|
||||
if (!HeapTupleIsValid(nextTuple))
|
||||
{
|
||||
break;
|
||||
}
|
||||
|
||||
Oid relationId = FetchRelationIdFromPgPartitionHeapTuple(nextTuple, tupleDesc);
|
||||
if (!ShouldSyncTableMetadata(relationId))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* Skip foreign key and partition creation when the Citus table is
|
||||
* owned by an extension.
|
||||
*/
|
||||
if (IsTableOwnedByExtension(relationId))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
List *commandList = InterTableRelationshipOfRelationCommandList(relationId);
|
||||
SendOrCollectCommandListToActivatedNodes(context, commandList);
|
||||
}
|
||||
MemoryContextSwitchTo(oldContext);
|
||||
|
||||
systable_endscan(scanDesc);
|
||||
table_close(relation, AccessShareLock);
|
||||
|
||||
/* enable ddl propagation */
|
||||
SendOrCollectCommandListToActivatedNodes(context, list_make1(ENABLE_DDL_PROPAGATION));
|
||||
}
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
#include "distributed/errormessage.h"
|
||||
#include "nodes/pg_list.h"
|
||||
|
||||
typedef bool (*AddressPredicate)(const ObjectAddress *);
|
||||
|
||||
extern List * GetUniqueDependenciesList(List *objectAddressesList);
|
||||
extern List * GetDependenciesForObject(const ObjectAddress *target);
|
||||
extern List * GetAllSupportedDependenciesForObject(const ObjectAddress *target);
|
||||
|
@ -33,5 +35,7 @@ extern List * GetPgDependTuplesForDependingObjects(Oid targetObjectClassId,
|
|||
Oid targetObjectId);
|
||||
extern List * GetDependingViews(Oid relationId);
|
||||
extern Oid GetDependingView(Form_pg_depend pg_depend);
|
||||
extern List * FilterObjectAddressListByPredicate(List *objectAddressList,
|
||||
AddressPredicate predicate);
|
||||
|
||||
#endif /* CITUS_DEPENDENCY_H */
|
||||
|
|
|
@ -36,7 +36,7 @@ extern int MetadataSyncTransMode;
|
|||
typedef struct MetadataSyncContext
|
||||
{
|
||||
List *activatedWorkerNodeList; /* activated worker nodes */
|
||||
List *activatedWorkerConnections; /* connections to activated worker nodes */
|
||||
List *activatedWorkerBareConnections; /* bare connections to activated worker nodes */
|
||||
MemoryContext context; /* memory context for all allocations */
|
||||
MetadataSyncTransactionMode transactionMode; /* transaction mode for the sync */
|
||||
bool collectCommands; /* flag to collect commands instead of sending and resetting */
|
||||
|
@ -81,6 +81,8 @@ extern char * LocalGroupIdUpdateCommand(int32 groupId);
|
|||
extern bool ShouldSyncUserCommandForObject(ObjectAddress objectAddress);
|
||||
extern bool ShouldSyncTableMetadata(Oid relationId);
|
||||
extern bool ShouldSyncTableMetadataViaCatalog(Oid relationId);
|
||||
extern Oid FetchRelationIdFromPgPartitionHeapTuple(HeapTuple heapTuple,
|
||||
TupleDesc tupleDesc);
|
||||
extern bool ShouldSyncSequenceMetadata(Oid relationId);
|
||||
extern List * NodeMetadataCreateCommands(void);
|
||||
extern List * DistributedObjectMetadataSyncCommandList(void);
|
||||
|
@ -140,8 +142,6 @@ extern void SyncDeleteColocationGroupToNodes(uint32 colocationId);
|
|||
extern MetadataSyncContext * CreateMetadataSyncContext(List *nodeList, bool testMode);
|
||||
extern void DestroyMetadataSyncContext(MetadataSyncContext *context);
|
||||
extern void EstablishAndSetMetadataSyncBareConnections(MetadataSyncContext *context);
|
||||
extern void EstablishAndSetMetadataSyncCoordinatedConnections(
|
||||
MetadataSyncContext *context);
|
||||
extern void SetMetadataSyncNodesFromNodeList(MetadataSyncContext *context,
|
||||
List *nodeList);
|
||||
extern void ResetMetadataSyncMemoryContext(MetadataSyncContext *context);
|
||||
|
@ -155,6 +155,16 @@ extern void SendOrCollectCommandListToSingleNode(MetadataSyncContext *context,
|
|||
|
||||
extern char * WorkerDropAllShellTablesCommand(bool singleTransaction);
|
||||
|
||||
extern void SyncDistributedObjects(MetadataSyncContext *context);
|
||||
extern void SendNodeWideObjectsSyncCommands(MetadataSyncContext *context);
|
||||
extern void SendShellTableDeletionCommands(MetadataSyncContext *context);
|
||||
extern void SendMetadataDeletionCommands(MetadataSyncContext *context);
|
||||
extern void SendColocationMetadataCommands(MetadataSyncContext *context);
|
||||
extern void SendDependencyCreationCommands(MetadataSyncContext *context);
|
||||
extern void SendDistTableMetadataCommands(MetadataSyncContext *context);
|
||||
extern void SendDistObjectCommands(MetadataSyncContext *context);
|
||||
extern void SendInterTableRelationshipCommands(MetadataSyncContext *context);
|
||||
|
||||
#define DELETE_ALL_NODES "DELETE FROM pg_dist_node"
|
||||
#define DELETE_ALL_PLACEMENTS "DELETE FROM pg_dist_placement"
|
||||
#define DELETE_ALL_SHARDS "DELETE FROM pg_dist_shard"
|
||||
|
|
|
@ -25,11 +25,11 @@ SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.p
|
|||
(19 rows)
|
||||
|
||||
-- on all nodes
|
||||
SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass) ORDER BY 1;$$) ORDER BY 1;
|
||||
SELECT run_command_on_workers($$SELECT array_agg(worker_object) FROM (SELECT pg_identify_object_as_address(classid, objid, objsubid) worker_object FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass) ORDER BY 1) worker_objects;$$) ORDER BY 1;
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57636,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test_my_type}\\"",{})"",""(view,\\""{post_11_upgrade,non_dist_upgrade_ref_view_2}\\"",{})"",""(view,\\""{post_11_upgrade,reporting_line}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}")
|
||||
(localhost,57637,t,"{""(type,{post_11_upgrade.my_type},{})"",""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test_my_type}\\"",{})"",""(view,\\""{post_11_upgrade,non_dist_upgrade_ref_view_2}\\"",{})"",""(view,\\""{post_11_upgrade,reporting_line}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})""}")
|
||||
(localhost,57636,t,"{""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})"",""(type,{post_11_upgrade.my_type},{})"",""(view,\\""{post_11_upgrade,non_dist_upgrade_ref_view_2}\\"",{})"",""(view,\\""{post_11_upgrade,reporting_line}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test_my_type}\\"",{})""}")
|
||||
(localhost,57637,t,"{""(function,\\""{post_11_upgrade,func_in_transaction_def}\\"",{})"",""(schema,{post_11_upgrade},{})"",""(table,\\""{post_11_upgrade,part_table}\\"",{})"",""(table,\\""{post_11_upgrade,sensors}\\"",{})"",""(\\""text search configuration\\"",\\""{post_11_upgrade,partial_index_test_config}\\"",{})"",""(type,{post_11_upgrade.my_type},{})"",""(view,\\""{post_11_upgrade,non_dist_upgrade_ref_view_2}\\"",{})"",""(view,\\""{post_11_upgrade,reporting_line}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test}\\"",{})"",""(view,\\""{post_11_upgrade,view_for_upgrade_test_my_type}\\"",{})""}")
|
||||
(2 rows)
|
||||
|
||||
-- Create the necessary test utility function
|
||||
|
|
|
@ -4,7 +4,7 @@ SET search_path = post_11_upgrade;
|
|||
SELECT pg_identify_object_as_address(classid, objid, objsubid) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.employees'::regclass, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.my_type_for_view'::regtype, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_table_for_view'::regclass, 'post_11_upgrade.non_dist_upgrade_test_view'::regclass, 'post_11_upgrade.non_dist_upgrade_test_view_local_join'::regclass, 'post_11_upgrade.non_dist_upgrade_multiple_dist_view'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass, 'post_11_upgrade.v_test_1'::regclass, 'post_11_upgrade.v_test_2'::regclass, 'post_11_upgrade.owned_by_extension_table'::regclass, 'post_11_upgrade.materialized_view'::regclass, 'post_11_upgrade.owned_by_extension_view'::regclass, 'post_11_upgrade.local_type'::regtype, 'post_11_upgrade.non_dist_dist_table_for_view'::regclass, 'post_11_upgrade.depends_on_nothing_1'::regclass, 'post_11_upgrade.depends_on_nothing_2'::regclass, 'post_11_upgrade.depends_on_pg'::regclass, 'post_11_upgrade.depends_on_citus'::regclass, 'post_11_upgrade.depends_on_seq'::regclass, 'post_11_upgrade.depends_on_seq_and_no_support'::regclass) ORDER BY 1;
|
||||
|
||||
-- on all nodes
|
||||
SELECT run_command_on_workers($$SELECT array_agg(pg_identify_object_as_address(classid, objid, objsubid)) FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass) ORDER BY 1;$$) ORDER BY 1;
|
||||
SELECT run_command_on_workers($$SELECT array_agg(worker_object) FROM (SELECT pg_identify_object_as_address(classid, objid, objsubid) worker_object FROM pg_catalog.pg_dist_object WHERE objid IN ('post_11_upgrade'::regnamespace, 'post_11_upgrade.part_table'::regclass, 'post_11_upgrade.sensors'::regclass, 'post_11_upgrade.func_in_transaction_def'::regproc, 'post_11_upgrade.partial_index_test_config'::regconfig, 'post_11_upgrade.my_type'::regtype, 'post_11_upgrade.view_for_upgrade_test'::regclass, 'post_11_upgrade.view_for_upgrade_test_my_type'::regclass, 'post_11_upgrade.non_dist_upgrade_ref_view_2'::regclass, 'post_11_upgrade.reporting_line'::regclass) ORDER BY 1) worker_objects;$$) ORDER BY 1;
|
||||
|
||||
-- Create the necessary test utility function
|
||||
CREATE OR REPLACE FUNCTION activate_node_snapshot()
|
||||
|
|
Loading…
Reference in New Issue