Handle sequences

velioglu/wo_seq_test_1
Burak Velioglu 2021-12-10 17:57:19 +03:00
parent 5762bfb454
commit 14f8cd5a75
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
5 changed files with 63 additions and 78 deletions

View File

@ -430,6 +430,16 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_LOCAL_TABLES); DropFKeysRelationInvolvedWithTableType(relationId, INCLUDE_LOCAL_TABLES);
} }
/*
* Ensure that the sequences used in column defaults of the table
* have proper types
*/
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(relationId, &attnumList, &dependentSequenceList, 0);
EnsureDistributedSequencesHaveOneType(relationId, dependentSequenceList,
attnumList);
/* /*
* distributed tables might have dependencies on different objects, since we create * distributed tables might have dependencies on different objects, since we create
* shards for a distributed table via multiple sessions these objects will be created * shards for a distributed table via multiple sessions these objects will be created
@ -439,13 +449,9 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
ObjectAddress tableAddress = { 0 }; ObjectAddress tableAddress = { 0 };
ObjectAddressSet(tableAddress, RelationRelationId, relationId); ObjectAddressSet(tableAddress, RelationRelationId, relationId);
EnsureDependenciesExistOnAllNodes(&tableAddress); EnsureDependenciesExistOnAllNodes(&tableAddress);
/* TODO: Update owner of the sequence(?) */
/* /* TODO: Consider partitioned tables */
* For now assume that we can create table after ensuring that dependencies exist.
* Obviously it doesn't support sequences we don't care for it now.
*
* TODO: Consider partitioned tables
*/
CreateShellTableOnWorkers(relationId); CreateShellTableOnWorkers(relationId);
MarkObjectDistributed(&tableAddress); MarkObjectDistributed(&tableAddress);
@ -500,16 +506,6 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn, InsertIntoPgDistPartition(relationId, distributionMethod, distributionColumn,
colocationId, replicationModel, autoConverted); colocationId, replicationModel, autoConverted);
/*
* Ensure that the sequences used in column defaults of the table
* have proper types
*/
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(relationId, &attnumList, &dependentSequenceList, 0);
EnsureDistributedSequencesHaveOneType(relationId, dependentSequenceList,
attnumList);
/* foreign tables do not support TRUNCATE trigger */ /* foreign tables do not support TRUNCATE trigger */
if (RegularTable(relationId)) if (RegularTable(relationId))
{ {
@ -543,16 +539,6 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
if (ShouldSyncTableMetadata(relationId)) if (ShouldSyncTableMetadata(relationId))
{ {
if (ClusterHasKnownMetadataWorkers())
{
/*
* Ensure both sequence and its' dependencies and mark them as distributed
* before creating table metadata on workers
*/
MarkSequenceListDistributedAndPropagateWithDependencies(relationId,
dependentSequenceList);
}
CreateTableMetadataOnWorkers(relationId); CreateTableMetadataOnWorkers(relationId);
} }

View File

@ -25,6 +25,7 @@
#include "distributed/worker_transaction.h" #include "distributed/worker_transaction.h"
#include "storage/lmgr.h" #include "storage/lmgr.h"
#include "utils/lsyscache.h" #include "utils/lsyscache.h"
#include "miscadmin.h"
typedef bool (*AddressPredicate)(const ObjectAddress *); typedef bool (*AddressPredicate)(const ObjectAddress *);
@ -223,27 +224,40 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
{ {
case OCLASS_CLASS: case OCLASS_CLASS:
{ {
char relKind = get_rel_relkind(dependency->objectId);
/* /*
* types have an intermediate dependency on a relation (aka class), so we do * types have an intermediate dependency on a relation (aka class), so we do
* support classes when the relkind is composite * support classes when the relkind is composite
*/ */
if (get_rel_relkind(dependency->objectId) == RELKIND_COMPOSITE_TYPE) if (relKind == RELKIND_COMPOSITE_TYPE)
{ {
return NIL; return NIL;
} }
List *commandList = NIL; if (relKind == RELKIND_RELATION)
List *tableDDLCommands = GetFullTableCreationCommands(dependency->objectId,
WORKER_NEXTVAL_SEQUENCE_DEFAULTS);
TableDDLCommand *tableDDLCommand = NULL;
foreach_ptr(tableDDLCommand, tableDDLCommands)
{ {
Assert(CitusIsA(tableDDLCommand, TableDDLCommand)); List *commandList = NIL;
commandList = lappend(commandList, GetTableDDLCommand(tableDDLCommand)); List *tableDDLCommands = GetFullTableCreationCommands(
dependency->objectId,
WORKER_NEXTVAL_SEQUENCE_DEFAULTS);
TableDDLCommand *tableDDLCommand = NULL;
foreach_ptr(tableDDLCommand, tableDDLCommands)
{
Assert(CitusIsA(tableDDLCommand, TableDDLCommand));
commandList = lappend(commandList, GetTableDDLCommand(
tableDDLCommand));
}
return commandList;
} }
return commandList; if (relKind == RELKIND_SEQUENCE)
{
char *userName = GetUserNameFromId(GetUserId(), false);
return DDLCommandsForSequence(dependency->objectId, userName);
}
} }
case OCLASS_COLLATION: case OCLASS_COLLATION:

View File

@ -37,6 +37,7 @@
#include "distributed/metadata/dependency.h" #include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h" #include "distributed/metadata/distobject.h"
#include "distributed/metadata_cache.h" #include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/version_compat.h" #include "distributed/version_compat.h"
#include "miscadmin.h" #include "miscadmin.h"
#include "utils/fmgroids.h" #include "utils/fmgroids.h"
@ -675,7 +676,8 @@ SupportedDependencyByCitus(const ObjectAddress *address)
* for tables. * for tables.
*/ */
if (relKind == RELKIND_COMPOSITE_TYPE || if (relKind == RELKIND_COMPOSITE_TYPE ||
relKind == RELKIND_RELATION) relKind == RELKIND_RELATION ||
relKind == RELKIND_SEQUENCE)
{ {
return true; return true;
} }
@ -983,6 +985,27 @@ ExpandCitusSupportedTypes(ObjectAddressCollector *collector, ObjectAddress targe
List *statisticsSchemaDependencyList = List *statisticsSchemaDependencyList =
GetRelationStatsSchemaDependencyList(relationId); GetRelationStatsSchemaDependencyList(relationId);
result = list_concat(result, statisticsSchemaDependencyList); result = list_concat(result, statisticsSchemaDependencyList);
/*
* Add the dependent sequences for the relations
*/
List *attnumList = NIL;
List *dependentSequenceList = NIL;
List *sequenceDependencyList = NIL;
GetDependentSequencesWithRelation(relationId, &attnumList,
&dependentSequenceList, 0);
ListCell *dependentSequenceCell = NULL;
foreach(dependentSequenceCell, dependentSequenceList)
{
Oid sequenceOid = lfirst_oid(dependentSequenceCell);
DependencyDefinition *dependency = CreateObjectAddressDependencyDef(
RelationRelationId, sequenceOid);
sequenceDependencyList = lappend(sequenceDependencyList, dependency);
}
result = list_concat(result, sequenceDependencyList);
} }
default: default:

View File

@ -763,9 +763,6 @@ MetadataDropCommands(void)
dropSnapshotCommandList = list_concat(dropSnapshotCommandList, dropSnapshotCommandList = list_concat(dropSnapshotCommandList,
detachPartitionCommandList); detachPartitionCommandList);
dropSnapshotCommandList = lappend(dropSnapshotCommandList,
REMOVE_ALL_CLUSTERED_TABLES_COMMAND);
dropSnapshotCommandList = lappend(dropSnapshotCommandList, DELETE_ALL_NODES); dropSnapshotCommandList = lappend(dropSnapshotCommandList, DELETE_ALL_NODES);
dropSnapshotCommandList = lappend(dropSnapshotCommandList, dropSnapshotCommandList = lappend(dropSnapshotCommandList,
DELETE_ALL_DISTRIBUTED_OBJECTS); DELETE_ALL_DISTRIBUTED_OBJECTS);

View File

@ -106,7 +106,7 @@ static void InsertPlaceholderCoordinatorRecord(void);
static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata static void InsertNodeRow(int nodeid, char *nodename, int32 nodeport, NodeMetadata
*nodeMetadata); *nodeMetadata);
static void DeleteNodeRow(char *nodename, int32 nodeport); static void DeleteNodeRow(char *nodename, int32 nodeport);
static void SetUpSequences(WorkerNode *workerNode); static void SetUpSequenceDependencies(WorkerNode *workerNode);
static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode); static void SetUpDistributedTableWithDependencies(WorkerNode *workerNode);
static void SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode); static void SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode);
static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple); static WorkerNode * TupleToWorkerNode(TupleDesc tupleDescriptor, HeapTuple heapTuple);
@ -649,7 +649,7 @@ SetUpMultipleDistributedTableIntegrations(WorkerNode *workerNode)
static void static void
SetUpSequences(WorkerNode *workerNode) SetUpSequenceDependencies(WorkerNode *workerNode)
{ {
List *distributedTableList = CitusTableList(); List *distributedTableList = CitusTableList();
List *propagatedTableList = NIL; List *propagatedTableList = NIL;
@ -675,41 +675,6 @@ SetUpSequences(WorkerNode *workerNode)
continue; continue;
} }
/*
* Set object propagation to off as objects will be distributed while syncing
* the metadata.
*/
bool prevDependencyCreationValue = EnableDependencyCreation;
SetLocalEnableDependencyCreation(false);
/*
* Ensure sequence dependencies and mark them as distributed
*/
List *attnumList = NIL;
List *dependentSequenceList = NIL;
GetDependentSequencesWithRelation(relationId, &attnumList,
&dependentSequenceList, 0);
Oid sequenceOid = InvalidOid;
foreach_oid(sequenceOid, dependentSequenceList)
{
ObjectAddress sequenceAddress = { 0 };
ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid);
EnsureDependenciesExistOnAllNodes(&sequenceAddress);
/*
* Sequences are not marked as distributed while creating table
* if no metadata worker node exists. We are marking all sequences
* distributed while syncing metadata in such case.
*/
MarkObjectDistributed(&sequenceAddress);
}
SetLocalEnableDependencyCreation(prevDependencyCreationValue);
List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
sequenceCommandList = list_concat(sequenceCommandList, workerSequenceDDLCommands);
List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId); List *sequenceDependencyCommandList = SequenceDependencyCommandList(relationId);
sequenceCommandList = list_concat(sequenceCommandList, sequenceCommandList = list_concat(sequenceCommandList,
sequenceDependencyCommandList); sequenceDependencyCommandList);
@ -1048,8 +1013,8 @@ ActivateNode(char *nodeName, int nodePort)
BoolGetDatum(isActive)); BoolGetDatum(isActive));
} }
SetUpSequences(workerNode);
SetUpDistributedTableWithDependencies(workerNode); SetUpDistributedTableWithDependencies(workerNode);
SetUpSequenceDependencies(workerNode);
SetUpMultipleDistributedTableIntegrations(workerNode); SetUpMultipleDistributedTableIntegrations(workerNode);
if (syncMetadata) if (syncMetadata)