From 3760fa66701f589223f62fdce79098b37c8879ab Mon Sep 17 00:00:00 2001 From: Onur Tirtir Date: Sun, 5 Jun 2022 03:13:03 +0300 Subject: [PATCH] Move tuples --- .../distributed/commands/move_tuples.c | 285 ++++++++++++++++++ 1 file changed, 285 insertions(+) create mode 100644 src/backend/distributed/commands/move_tuples.c diff --git a/src/backend/distributed/commands/move_tuples.c b/src/backend/distributed/commands/move_tuples.c new file mode 100644 index 000000000..2fc9da34a --- /dev/null +++ b/src/backend/distributed/commands/move_tuples.c @@ -0,0 +1,285 @@ + +#include "postgres.h" + +#include "access/htup_details.h" +#include "access/multixact.h" +#include "access/table.h" +#include "access/tableam.h" +#include "catalog/catalog.h" +#include "catalog/heap.h" +#include "catalog/indexing.h" +#include "catalog/pg_class.h" +#include "catalog/storage.h" +#include "storage/relfilenode.h" +#include "utils/rel.h" +#include "utils/syscache.h" +#include "utils/relcache.h" + +#include "distributed/listutils.h" + + +PG_FUNCTION_INFO_V1(move_tuples); + + +static void +CopyAttributes(Relation sourceRelation, Relation targetRelation) +{ + /* + * 1- Modify non-dropped attr's attrnums to match the source rel. + */ + int targetNewDroppedAttrArrayLen = 0; + Form_pg_attribute *targetNewDroppedAttrArray = + palloc0(sizeof(Form_pg_attribute) * + RelationGetNumberOfAttributes(sourceRelation)); + + List *targetNewNonDroppedAttrTupleList = NIL; + + for (int i = 0; i < RelationGetNumberOfAttributes(sourceRelation); i++) + { + Form_pg_attribute sourceAttr = + TupleDescAttr(RelationGetDescr(sourceRelation), i); + + if (sourceAttr->attisdropped) + { + Form_pg_attribute targetNewDroppedAttr = palloc0(sizeof(FormData_pg_attribute)); + *targetNewDroppedAttr = *((FormData_pg_attribute *) sourceAttr); + + targetNewDroppedAttr->attrelid = RelationGetRelid(targetRelation); + targetNewDroppedAttrArray[targetNewDroppedAttrArrayLen++] = targetNewDroppedAttr; + } + else + { + HeapTuple targetAttrTuple = SearchSysCache2( + ATTNAME, + ObjectIdGetDatum(RelationGetRelid(targetRelation)), + PointerGetDatum(sourceAttr->attname.data)); + + if (!HeapTupleIsValid(targetAttrTuple)) + { + ereport(ERROR, (errmsg("no matching column"))); + } + + Form_pg_attribute targetAttr= (Form_pg_attribute) GETSTRUCT(targetAttrTuple); + targetAttr->attnum = sourceAttr->attnum; + + targetNewNonDroppedAttrTupleList = lappend(targetNewNonDroppedAttrTupleList, + heap_copytuple(targetAttrTuple)); + + ReleaseSysCache(targetAttrTuple); + } + } + + CommandCounterIncrement(); + + Relation pgAttr = table_open(AttributeRelationId, RowExclusiveLock); + + HeapTuple updateTuple = NULL; + foreach_ptr(updateTuple, targetNewNonDroppedAttrTupleList) + { + CatalogTupleUpdate(pgAttr, &updateTuple->t_self, updateTuple); + } + + table_close(pgAttr, RowExclusiveLock); + + /* + * 2- Update natts of target rel. + */ + Relation pgClass = table_open(RelationRelationId, RowExclusiveLock); + + HeapTuple targetRelTup = SearchSysCache1( + RELOID, + ObjectIdGetDatum(RelationGetRelid(targetRelation))); + if (!HeapTupleIsValid(targetRelTup)) + { + ereport(ERROR, + (errmsg("cache lookup failed for relation with OID %u", + RelationGetRelid(targetRelation)))); + } + + Form_pg_class classForm = (Form_pg_class) GETSTRUCT(targetRelTup); + classForm->relnatts = RelationGetNumberOfAttributes(sourceRelation); + + CatalogTupleUpdate(pgClass, &targetRelTup->t_self, targetRelTup); + + ReleaseSysCache(targetRelTup); + + table_close(pgClass, RowExclusiveLock); + + /* + * 3- Drop target rel's dropped attrs. + */ + pgAttr = table_open(AttributeRelationId, RowExclusiveLock); + + for (int i = 0; i < RelationGetNumberOfAttributes(targetRelation); i++) + { + Form_pg_attribute targetAttr = + TupleDescAttr(RelationGetDescr(targetRelation), i); + + if (!targetAttr->attisdropped) + { + continue; + } + + Form_pg_attribute targetDroppedAttr = palloc0(sizeof(FormData_pg_attribute)); + *targetDroppedAttr = *((FormData_pg_attribute *) targetAttr); + + HeapTuple targetDroppedAttrTuple = SearchSysCache2( + ATTNUM, + ObjectIdGetDatum(RelationGetRelid(targetRelation)), + PointerGetDatum(targetDroppedAttr->attnum)); + + if (!HeapTupleIsValid(targetDroppedAttrTuple)) + { + ereport(ERROR, (errmsg("cache lookup failed for attnum %d of " + "relation with OID %u", + targetDroppedAttr->attnum, + RelationGetRelid(targetRelation)))); + } + + CatalogTupleDelete(pgAttr, &targetDroppedAttrTuple->t_self); + + ReleaseSysCache(targetDroppedAttrTuple); + } + + table_close(pgAttr, RowExclusiveLock); + + /* + * 4- Transfer dropped attrs. + */ + pgAttr = table_open(AttributeRelationId, RowExclusiveLock); + + CatalogIndexState pgAttrIndexState = CatalogOpenIndexes(pgAttr); + + InsertPgAttributeTuples( + pgAttr, + CreateTupleDesc(targetNewDroppedAttrArrayLen, targetNewDroppedAttrArray), + RelationGetRelid(targetRelation), + NULL, + pgAttrIndexState); + + CatalogCloseIndexes(pgAttrIndexState); + + table_close(pgAttr, RowExclusiveLock); +} + + +static void +RelationSetRelNode(Oid relationId, Oid relfileNodeSpcNode, Oid relfileNodeRelNode) +{ + Relation pgClass = table_open(RelationRelationId, RowExclusiveLock); + HeapTuple pgClassTuple = SearchSysCacheCopy1(RELOID, + ObjectIdGetDatum(relationId)); + if (!HeapTupleIsValid(pgClassTuple)) + { + ereport(ERROR, (errmsg("cache lookup failed for relation %u", + relationId))); + } + + Form_pg_class pgClassForm = (Form_pg_class) GETSTRUCT(pgClassTuple); + + pgClassForm->reltablespace = relfileNodeSpcNode; + pgClassForm->relfilenode = relfileNodeRelNode; + + CatalogTupleUpdate(pgClass, &pgClassTuple->t_self, pgClassTuple); + + heap_freetuple(pgClassTuple); + table_close(pgClass, RowExclusiveLock); +} + + +/* + * Move tuples of one rel to another. + * + * TODO: + * - How to WAL-log this operation ? + * - Doesn't handle indexes, maybe simply execute reindex for both rels ? + * - Need to check all non-dropped attributes are same before CopyAttribute(). + * - Constraints / indexes etc would still refer to old attrNumbers, need to handle this. + * - Need to handle partitioned tables maybe ? + * +CREATE FUNCTION move_tuples(source_table regclass, + target_table regclass) + RETURNS void + LANGUAGE C STRICT + AS 'citus', $$move_tuples$$; +*/ +Datum +move_tuples(PG_FUNCTION_ARGS) +{ + /* + * 1- Lock rels and verify they exist. + */ + Oid sourceRelationId = PG_GETARG_OID(0); + Relation sourceRelation = table_open(sourceRelationId, AccessExclusiveLock); + + Oid targetRelationId = PG_GETARG_OID(1); + Relation targetRelation = table_open(targetRelationId, AccessExclusiveLock); + + /* + * 2- Verify that they're suitable for the "move" operation. + */ + if (sourceRelation->rd_rel->relkind != RELKIND_RELATION || + targetRelation->rd_rel->relkind != RELKIND_RELATION) + { + ereport(ERROR, (errmsg("both must be relation"))); + } + + if (sourceRelation->rd_rel->relpersistence != + targetRelation->rd_rel->relpersistence) + { + ereport(ERROR, (errmsg("relations must have same persistency"))); + } + + if (sourceRelation->rd_rel->relam != targetRelation->rd_rel->relam) + { + ereport(ERROR, (errmsg("relations must be of same tableAM"))); + } + + /* + * TODO: Reconsider if we need to compare some other fields of two rels' + * Relation objects. + */ + + /* + * 3- Move the ownership of relfilenode of source rel to target and + * drop the old storage of target rel. + */ + RelationDropStorage(targetRelation); + + RelFileNode sourceOldRelfilenode = sourceRelation->rd_node; + RelationSetRelNode(targetRelationId, sourceOldRelfilenode.spcNode, + sourceOldRelfilenode.relNode); + + CommandCounterIncrement(); + RelationAssumeNewRelfilenode(targetRelation); + + CopyAttributes(sourceRelation, targetRelation); + + /* + * 4- Create a new relfilenode and a new storage for the source rel. + */ + RelFileNode sourceNewRelfilenode = sourceOldRelfilenode; + sourceNewRelfilenode.relNode = GetNewRelFileNode( + sourceRelation->rd_rel->reltablespace, NULL, + sourceRelation->rd_rel->relpersistence); + + TransactionId dummyXactId = InvalidTransactionId; + MultiXactId dummyMXactId = InvalidMultiXactId; + table_relation_set_new_filenode(sourceRelation, &sourceNewRelfilenode, + sourceRelation->rd_rel->relpersistence, + &dummyXactId, &dummyMXactId); + + RelationSetRelNode(sourceRelationId, sourceOldRelfilenode.spcNode, + sourceNewRelfilenode.relNode); + + CommandCounterIncrement(); + RelationAssumeNewRelfilenode(sourceRelation); + + /* + * 5- Unlock the rels, we're done. + */ + table_close(targetRelation, AccessExclusiveLock); + table_close(sourceRelation, AccessExclusiveLock); + + PG_RETURN_VOID(); +}