mirror of https://github.com/citusdata/citus.git
Merge branch 'master' into issue4237
commit
0bb2c991f9
|
@ -102,7 +102,7 @@ jobs:
|
|||
image_tag:
|
||||
description: 'docker image tag to use'
|
||||
type: string
|
||||
default: latest
|
||||
default: 11-12-13
|
||||
docker:
|
||||
- image: '<< parameters.image >>:<< parameters.image_tag >>'
|
||||
working_directory: /home/circleci/project
|
||||
|
@ -537,14 +537,14 @@ workflows:
|
|||
name: 'test-11-12_check-pg-upgrade'
|
||||
old_pg_major: 11
|
||||
new_pg_major: 12
|
||||
image_tag: latest
|
||||
image_tag: 11-12-13
|
||||
requires: [build-11,build-12]
|
||||
|
||||
- test-pg-upgrade:
|
||||
name: 'test-12-13_check-pg-upgrade'
|
||||
old_pg_major: 12
|
||||
new_pg_major: 13
|
||||
image_tag: latest
|
||||
image_tag: 11-12-13
|
||||
requires: [build-12,build-13]
|
||||
|
||||
- test-citus-upgrade:
|
||||
|
|
|
@ -64,12 +64,11 @@ static void ForeignConstraintFindDistKeys(HeapTuple pgConstraintTuple,
|
|||
Var *referencedDistColumn,
|
||||
int *referencingAttrIndex,
|
||||
int *referencedAttrIndex);
|
||||
static List * GetForeignKeyIdsForColumn(char *columnName, Oid relationId,
|
||||
int searchForeignKeyColumnFlags);
|
||||
static List * GetForeignConstraintCommandsInternal(Oid relationId, int flags);
|
||||
static Oid get_relation_constraint_oid_compat(HeapTuple heapTuple);
|
||||
static List * GetForeignKeyOidsToCitusLocalTables(Oid relationId);
|
||||
static List * GetForeignKeyOidsToReferenceTables(Oid relationId);
|
||||
static List * FilterFKeyOidListByReferencedTableType(List *foreignKeyOidList,
|
||||
CitusTableType citusTableType);
|
||||
static bool IsTableTypeIncluded(Oid relationId, int flags);
|
||||
|
||||
/*
|
||||
* ConstraintIsAForeignKeyToReferenceTable checks if the given constraint is a
|
||||
|
@ -78,7 +77,8 @@ static List * FilterFKeyOidListByReferencedTableType(List *foreignKeyOidList,
|
|||
bool
|
||||
ConstraintIsAForeignKeyToReferenceTable(char *inputConstaintName, Oid relationId)
|
||||
{
|
||||
List *foreignKeyOids = GetForeignKeyOidsToReferenceTables(relationId);
|
||||
int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_REFERENCE_TABLES;
|
||||
List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
|
||||
|
||||
Oid foreignKeyOid = FindForeignKeyOidWithName(foreignKeyOids, inputConstaintName);
|
||||
|
||||
|
@ -120,7 +120,7 @@ ErrorIfUnsupportedForeignConstraintExists(Relation relation, char referencingDis
|
|||
{
|
||||
Oid referencingTableId = relation->rd_id;
|
||||
|
||||
int flags = INCLUDE_REFERENCING_CONSTRAINTS;
|
||||
int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_ALL_TABLE_TYPES;
|
||||
List *foreignKeyOids = GetForeignKeyOids(referencingTableId, flags);
|
||||
|
||||
Oid foreignKeyOid = InvalidOid;
|
||||
|
@ -490,9 +490,45 @@ ForeignConstraintFindDistKeys(HeapTuple pgConstraintTuple,
|
|||
bool
|
||||
ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
|
||||
{
|
||||
int searchForeignKeyColumnFlags = SEARCH_REFERENCING_RELATION |
|
||||
SEARCH_REFERENCED_RELATION;
|
||||
List *foreignKeyIdsColumnAppeared =
|
||||
GetForeignKeyIdsForColumn(columnName, relationId, searchForeignKeyColumnFlags);
|
||||
|
||||
Oid foreignKeyId = InvalidOid;
|
||||
foreach_oid(foreignKeyId, foreignKeyIdsColumnAppeared)
|
||||
{
|
||||
Oid referencedTableId = GetReferencedTableId(foreignKeyId);
|
||||
if (IsCitusTableType(referencedTableId, REFERENCE_TABLE))
|
||||
{
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetForeignKeyIdsForColumn takes columnName and relationId for the owning
|
||||
* relation, and returns a list of OIDs for foreign constraints that the column
|
||||
* with columnName is involved according to "searchForeignKeyColumnFlags" argument.
|
||||
* See SearchForeignKeyColumnFlags enum definition for usage.
|
||||
*/
|
||||
static List *
|
||||
GetForeignKeyIdsForColumn(char *columnName, Oid relationId,
|
||||
int searchForeignKeyColumnFlags)
|
||||
{
|
||||
bool searchReferencing = searchForeignKeyColumnFlags & SEARCH_REFERENCING_RELATION;
|
||||
bool searchReferenced = searchForeignKeyColumnFlags & SEARCH_REFERENCED_RELATION;
|
||||
|
||||
/* at least one of them should be true */
|
||||
Assert(searchReferencing || searchReferenced);
|
||||
|
||||
List *foreignKeyIdsColumnAppeared = NIL;
|
||||
|
||||
ScanKeyData scanKey[1];
|
||||
int scanKeyCount = 1;
|
||||
bool foreignKeyToReferenceTableIncludesGivenColumn = false;
|
||||
|
||||
Relation pgConstraint = table_open(ConstraintRelationId, AccessShareLock);
|
||||
|
||||
|
@ -511,11 +547,11 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
|
|||
Oid referencedTableId = constraintForm->confrelid;
|
||||
Oid referencingTableId = constraintForm->conrelid;
|
||||
|
||||
if (referencedTableId == relationId)
|
||||
if (referencedTableId == relationId && searchReferenced)
|
||||
{
|
||||
pgConstraintKey = Anum_pg_constraint_confkey;
|
||||
}
|
||||
else if (referencingTableId == relationId)
|
||||
else if (referencingTableId == relationId && searchReferencing)
|
||||
{
|
||||
pgConstraintKey = Anum_pg_constraint_conkey;
|
||||
}
|
||||
|
@ -529,22 +565,12 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
|
|||
continue;
|
||||
}
|
||||
|
||||
/*
|
||||
* We check if the referenced table is a reference table. There cannot be
|
||||
* any foreign constraint from a distributed table to a local table.
|
||||
*/
|
||||
Assert(IsCitusTable(referencedTableId));
|
||||
if (!IsCitusTableType(referencedTableId, REFERENCE_TABLE))
|
||||
{
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (HeapTupleOfForeignConstraintIncludesColumn(heapTuple, relationId,
|
||||
pgConstraintKey, columnName))
|
||||
{
|
||||
foreignKeyToReferenceTableIncludesGivenColumn = true;
|
||||
break;
|
||||
Oid foreignKeyOid = get_relation_constraint_oid_compat(heapTuple);
|
||||
foreignKeyIdsColumnAppeared = lappend_oid(foreignKeyIdsColumnAppeared,
|
||||
foreignKeyOid);
|
||||
}
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
|
@ -554,7 +580,7 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
|
|||
systable_endscan(scanDescriptor);
|
||||
table_close(pgConstraint, NoLock);
|
||||
|
||||
return foreignKeyToReferenceTableIncludesGivenColumn;
|
||||
return foreignKeyIdsColumnAppeared;
|
||||
}
|
||||
|
||||
|
||||
|
@ -567,7 +593,7 @@ ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid relationId)
|
|||
List *
|
||||
GetReferencingForeignConstaintCommands(Oid relationId)
|
||||
{
|
||||
int flags = INCLUDE_REFERENCING_CONSTRAINTS;
|
||||
int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_ALL_TABLE_TYPES;
|
||||
return GetForeignConstraintCommandsInternal(relationId, flags);
|
||||
}
|
||||
|
||||
|
@ -633,23 +659,9 @@ get_relation_constraint_oid_compat(HeapTuple heapTuple)
|
|||
bool
|
||||
HasForeignKeyToCitusLocalTable(Oid relationId)
|
||||
{
|
||||
List *foreignKeyOidList = GetForeignKeyOidsToCitusLocalTables(relationId);
|
||||
return list_length(foreignKeyOidList) > 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetForeignKeyOidsToCitusLocalTables returns list of OIDs for the foreign key
|
||||
* constraints on the given relationId that are referencing to citus local tables.
|
||||
*/
|
||||
static List *
|
||||
GetForeignKeyOidsToCitusLocalTables(Oid relationId)
|
||||
{
|
||||
int flags = INCLUDE_REFERENCING_CONSTRAINTS;
|
||||
int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_CITUS_LOCAL_TABLES;
|
||||
List *foreignKeyOidList = GetForeignKeyOids(relationId, flags);
|
||||
List *fKeyOidsToCitusLocalTables =
|
||||
FilterFKeyOidListByReferencedTableType(foreignKeyOidList, CITUS_LOCAL_TABLE);
|
||||
return fKeyOidsToCitusLocalTables;
|
||||
return list_length(foreignKeyOidList) > 0;
|
||||
}
|
||||
|
||||
|
||||
|
@ -661,57 +673,13 @@ GetForeignKeyOidsToCitusLocalTables(Oid relationId)
|
|||
bool
|
||||
HasForeignKeyToReferenceTable(Oid relationId)
|
||||
{
|
||||
List *foreignKeyOids = GetForeignKeyOidsToReferenceTables(relationId);
|
||||
int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_REFERENCE_TABLES;
|
||||
List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
|
||||
|
||||
return list_length(foreignKeyOids) > 0;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetForeignKeyOidsToReferenceTables returns list of OIDs for the foreign key
|
||||
* constraints on the given relationId that are referencing to reference tables.
|
||||
*/
|
||||
static List *
|
||||
GetForeignKeyOidsToReferenceTables(Oid relationId)
|
||||
{
|
||||
int flags = INCLUDE_REFERENCING_CONSTRAINTS;
|
||||
List *foreignKeyOidList = GetForeignKeyOids(relationId, flags);
|
||||
List *fKeyOidsToReferenceTables =
|
||||
FilterFKeyOidListByReferencedTableType(foreignKeyOidList, REFERENCE_TABLE);
|
||||
return fKeyOidsToReferenceTables;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* FilterFKeyOidListByReferencedTableType takes a list of foreign key OIDs and
|
||||
* CitusTableType to filter the foreign key OIDs that CitusTableType matches
|
||||
* referenced relation's type.
|
||||
*/
|
||||
static List *
|
||||
FilterFKeyOidListByReferencedTableType(List *foreignKeyOidList,
|
||||
CitusTableType citusTableType)
|
||||
{
|
||||
List *filteredFKeyOidList = NIL;
|
||||
|
||||
Oid foreignKeyOid = InvalidOid;
|
||||
foreach_oid(foreignKeyOid, foreignKeyOidList)
|
||||
{
|
||||
HeapTuple heapTuple = SearchSysCache1(CONSTROID, ObjectIdGetDatum(foreignKeyOid));
|
||||
Form_pg_constraint constraintForm = (Form_pg_constraint) GETSTRUCT(heapTuple);
|
||||
|
||||
Oid referencedTableOid = constraintForm->confrelid;
|
||||
if (IsCitusTableType(referencedTableOid, citusTableType))
|
||||
{
|
||||
filteredFKeyOidList = lappend_oid(filteredFKeyOidList, foreignKeyOid);
|
||||
}
|
||||
|
||||
ReleaseSysCache(heapTuple);
|
||||
}
|
||||
|
||||
return filteredFKeyOidList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TableReferenced function checks whether given table is referenced by another table
|
||||
* via foreign constraints. If it is referenced, this function returns true.
|
||||
|
@ -719,7 +687,7 @@ FilterFKeyOidListByReferencedTableType(List *foreignKeyOidList,
|
|||
bool
|
||||
TableReferenced(Oid relationId)
|
||||
{
|
||||
int flags = INCLUDE_REFERENCED_CONSTRAINTS;
|
||||
int flags = INCLUDE_REFERENCED_CONSTRAINTS | INCLUDE_ALL_TABLE_TYPES;
|
||||
List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
|
||||
|
||||
return list_length(foreignKeyOids) > 0;
|
||||
|
@ -765,7 +733,7 @@ HeapTupleOfForeignConstraintIncludesColumn(HeapTuple heapTuple, Oid relationId,
|
|||
bool
|
||||
TableReferencing(Oid relationId)
|
||||
{
|
||||
int flags = INCLUDE_REFERENCING_CONSTRAINTS;
|
||||
int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_ALL_TABLE_TYPES;
|
||||
List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
|
||||
|
||||
return list_length(foreignKeyOids) > 0;
|
||||
|
@ -793,7 +761,7 @@ ConstraintIsAForeignKey(char *inputConstaintName, Oid relationId)
|
|||
Oid
|
||||
GetForeignKeyOidByName(char *inputConstaintName, Oid relationId)
|
||||
{
|
||||
int flags = INCLUDE_REFERENCING_CONSTRAINTS;
|
||||
int flags = INCLUDE_REFERENCING_CONSTRAINTS | INCLUDE_ALL_TABLE_TYPES;
|
||||
List *foreignKeyOids = GetForeignKeyOids(relationId, flags);
|
||||
|
||||
Oid foreignKeyId = FindForeignKeyOidWithName(foreignKeyOids, inputConstaintName);
|
||||
|
@ -834,10 +802,12 @@ FindForeignKeyOidWithName(List *foreignKeyOids, const char *inputConstraintName)
|
|||
void
|
||||
ErrorIfTableHasExternalForeignKeys(Oid relationId)
|
||||
{
|
||||
int flags = (INCLUDE_REFERENCING_CONSTRAINTS | EXCLUDE_SELF_REFERENCES);
|
||||
int flags = (INCLUDE_REFERENCING_CONSTRAINTS | EXCLUDE_SELF_REFERENCES |
|
||||
INCLUDE_ALL_TABLE_TYPES);
|
||||
List *foreignKeyIdsTableReferencing = GetForeignKeyOids(relationId, flags);
|
||||
|
||||
flags = (INCLUDE_REFERENCED_CONSTRAINTS | EXCLUDE_SELF_REFERENCES);
|
||||
flags = (INCLUDE_REFERENCED_CONSTRAINTS | EXCLUDE_SELF_REFERENCES |
|
||||
INCLUDE_ALL_TABLE_TYPES);
|
||||
List *foreignKeyIdsTableReferenced = GetForeignKeyOids(relationId, flags);
|
||||
|
||||
List *foreignKeysWithOtherTables = list_concat(foreignKeyIdsTableReferencing,
|
||||
|
@ -869,10 +839,8 @@ GetForeignKeyOids(Oid relationId, int flags)
|
|||
{
|
||||
AttrNumber pgConstraintTargetAttrNumber = InvalidAttrNumber;
|
||||
|
||||
bool extractReferencing PG_USED_FOR_ASSERTS_ONLY =
|
||||
(flags & INCLUDE_REFERENCING_CONSTRAINTS);
|
||||
bool extractReferenced PG_USED_FOR_ASSERTS_ONLY =
|
||||
(flags & INCLUDE_REFERENCED_CONSTRAINTS);
|
||||
bool extractReferencing = (flags & INCLUDE_REFERENCING_CONSTRAINTS);
|
||||
bool extractReferenced = (flags & INCLUDE_REFERENCED_CONSTRAINTS);
|
||||
|
||||
/*
|
||||
* Only one of them should be passed at a time since the way we scan
|
||||
|
@ -885,14 +853,14 @@ GetForeignKeyOids(Oid relationId, int flags)
|
|||
bool useIndex = false;
|
||||
Oid indexOid = InvalidOid;
|
||||
|
||||
if (flags & INCLUDE_REFERENCING_CONSTRAINTS)
|
||||
if (extractReferencing)
|
||||
{
|
||||
pgConstraintTargetAttrNumber = Anum_pg_constraint_conrelid;
|
||||
|
||||
useIndex = true;
|
||||
indexOid = ConstraintRelidTypidNameIndexId;
|
||||
}
|
||||
else if (flags & INCLUDE_REFERENCED_CONSTRAINTS)
|
||||
else if (extractReferenced)
|
||||
{
|
||||
pgConstraintTargetAttrNumber = Anum_pg_constraint_confrelid;
|
||||
}
|
||||
|
@ -942,6 +910,22 @@ GetForeignKeyOids(Oid relationId, int flags)
|
|||
continue;
|
||||
}
|
||||
|
||||
Oid otherTableId = InvalidOid;
|
||||
if (extractReferencing)
|
||||
{
|
||||
otherTableId = constraintForm->confrelid;
|
||||
}
|
||||
else if (extractReferenced)
|
||||
{
|
||||
otherTableId = constraintForm->conrelid;
|
||||
}
|
||||
|
||||
if (!IsTableTypeIncluded(otherTableId, flags))
|
||||
{
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
continue;
|
||||
}
|
||||
|
||||
foreignKeyOids = lappend_oid(foreignKeyOids, constraintId);
|
||||
|
||||
heapTuple = systable_getnext(scanDescriptor);
|
||||
|
@ -982,3 +966,30 @@ GetReferencedTableId(Oid foreignKeyId)
|
|||
|
||||
return referencedTableId;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IsTableTypeIncluded returns true if type of the table with relationId (distributed,
|
||||
* reference, Citus local or Postgres local) is included in the flags, false if not
|
||||
*/
|
||||
static bool
|
||||
IsTableTypeIncluded(Oid relationId, int flags)
|
||||
{
|
||||
if (!IsCitusTable(relationId))
|
||||
{
|
||||
return (flags & INCLUDE_LOCAL_TABLES) != 0;
|
||||
}
|
||||
else if (IsCitusTableType(relationId, DISTRIBUTED_TABLE))
|
||||
{
|
||||
return (flags & INCLUDE_DISTRIBUTED_TABLES) != 0;
|
||||
}
|
||||
else if (IsCitusTableType(relationId, REFERENCE_TABLE))
|
||||
{
|
||||
return (flags & INCLUDE_REFERENCE_TABLES) != 0;
|
||||
}
|
||||
else if (IsCitusTableType(relationId, CITUS_LOCAL_TABLE))
|
||||
{
|
||||
return (flags & INCLUDE_CITUS_LOCAL_TABLES) != 0;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -49,6 +49,8 @@
|
|||
|
||||
|
||||
/* Local functions forward declarations for helper functions */
|
||||
static void ErrorIfCreateIndexHasTooManyColumns(IndexStmt *createIndexStatement);
|
||||
static int GetNumberOfIndexParameters(IndexStmt *createIndexStatement);
|
||||
static bool IndexAlreadyExists(IndexStmt *createIndexStatement);
|
||||
static Oid CreateIndexStmtGetIndexId(IndexStmt *createIndexStatement);
|
||||
static Oid CreateIndexStmtGetSchemaId(IndexStmt *createIndexStatement);
|
||||
|
@ -174,7 +176,20 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand)
|
|||
return NIL;
|
||||
}
|
||||
|
||||
ErrorIfUnsupportedIndexStmt(createIndexStatement);
|
||||
if (createIndexStatement->idxname == NULL)
|
||||
{
|
||||
/*
|
||||
* Postgres does not support indexes with over INDEX_MAX_KEYS columns
|
||||
* and we should not attempt to generate an index name for such cases.
|
||||
*/
|
||||
ErrorIfCreateIndexHasTooManyColumns(createIndexStatement);
|
||||
|
||||
/* ensure we copy string into proper context */
|
||||
MemoryContext relationContext = GetMemoryChunkContext(relationRangeVar);
|
||||
char *defaultIndexName = GenerateDefaultIndexName(createIndexStatement);
|
||||
createIndexStatement->idxname = MemoryContextStrdup(relationContext,
|
||||
defaultIndexName);
|
||||
}
|
||||
|
||||
if (IndexAlreadyExists(createIndexStatement))
|
||||
{
|
||||
|
@ -185,6 +200,8 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand)
|
|||
return NIL;
|
||||
}
|
||||
|
||||
ErrorIfUnsupportedIndexStmt(createIndexStatement);
|
||||
|
||||
/*
|
||||
* Citus has the logic to truncate the long shard names to prevent
|
||||
* various issues, including self-deadlocks. However, for partitioned
|
||||
|
@ -208,6 +225,38 @@ PreprocessIndexStmt(Node *node, const char *createIndexCommand)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ErrorIfCreateIndexHasTooManyColumns errors out if given CREATE INDEX command
|
||||
* would use more than INDEX_MAX_KEYS columns.
|
||||
*/
|
||||
static void
|
||||
ErrorIfCreateIndexHasTooManyColumns(IndexStmt *createIndexStatement)
|
||||
{
|
||||
int numberOfIndexParameters = GetNumberOfIndexParameters(createIndexStatement);
|
||||
if (numberOfIndexParameters <= INDEX_MAX_KEYS)
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
ereport(ERROR, (errcode(ERRCODE_TOO_MANY_COLUMNS),
|
||||
errmsg("cannot use more than %d columns in an index",
|
||||
INDEX_MAX_KEYS)));
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetNumberOfIndexParameters returns number of parameters to be used when
|
||||
* creating the index to be defined by given CREATE INDEX command.
|
||||
*/
|
||||
static int
|
||||
GetNumberOfIndexParameters(IndexStmt *createIndexStatement)
|
||||
{
|
||||
List *indexParams = createIndexStatement->indexParams;
|
||||
List *indexIncludingParams = createIndexStatement->indexIncludingParams;
|
||||
return list_length(indexParams) + list_length(indexIncludingParams);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* IndexAlreadyExists returns true if index to be created by given CREATE INDEX
|
||||
* command already exists.
|
||||
|
@ -1043,14 +1092,6 @@ RangeVarCallbackForReindexIndex(const RangeVar *relation, Oid relId, Oid oldRelI
|
|||
static void
|
||||
ErrorIfUnsupportedIndexStmt(IndexStmt *createIndexStatement)
|
||||
{
|
||||
char *indexRelationName = createIndexStatement->idxname;
|
||||
if (indexRelationName == NULL)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("creating index without a name on a distributed table is "
|
||||
"currently unsupported")));
|
||||
}
|
||||
|
||||
if (createIndexStatement->tableSpace != NULL)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
-- citus--9.5-1--10.0-1
|
||||
|
||||
DROP FUNCTION pg_catalog.upgrade_to_reference_table(regclass);
|
||||
DROP FUNCTION IF EXISTS pg_catalog.citus_total_relation_size(regclass);
|
||||
|
||||
#include "udfs/citus_total_relation_size/10.0-1.sql"
|
||||
|
|
|
@ -9,3 +9,4 @@ DROP VIEW public.citus_tables;
|
|||
DROP FUNCTION pg_catalog.citus_total_relation_size(regclass,boolean);
|
||||
|
||||
#include "../udfs/citus_total_relation_size/7.0-1.sql"
|
||||
#include "../udfs/upgrade_to_reference_table/8.0-1.sql"
|
||||
|
|
|
@ -0,0 +1,6 @@
|
|||
CREATE FUNCTION pg_catalog.upgrade_to_reference_table(table_name regclass)
|
||||
RETURNS void
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$upgrade_to_reference_table$$;
|
||||
COMMENT ON FUNCTION pg_catalog.upgrade_to_reference_table(table_name regclass)
|
||||
IS 'upgrades an existing broadcast table to a reference table';
|
|
@ -0,0 +1,6 @@
|
|||
CREATE FUNCTION pg_catalog.upgrade_to_reference_table(table_name regclass)
|
||||
RETURNS void
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$upgrade_to_reference_table$$;
|
||||
COMMENT ON FUNCTION pg_catalog.upgrade_to_reference_table(table_name regclass)
|
||||
IS 'upgrades an existing broadcast table to a reference table';
|
|
@ -14,13 +14,20 @@
|
|||
#include "fmgr.h"
|
||||
#include "funcapi.h"
|
||||
|
||||
#include "distributed/foreign_key_relationship.h"
|
||||
#include "distributed/listutils.h"
|
||||
#include "distributed/metadata_cache.h"
|
||||
#include "distributed/tuplestore.h"
|
||||
#include "distributed/version_compat.h"
|
||||
|
||||
|
||||
#define GET_FKEY_CONNECTED_RELATIONS_COLUMNS 1
|
||||
|
||||
|
||||
/* these functions are only exported in the regression tests */
|
||||
PG_FUNCTION_INFO_V1(get_referencing_relation_id_list);
|
||||
PG_FUNCTION_INFO_V1(get_referenced_relation_id_list);
|
||||
PG_FUNCTION_INFO_V1(get_foreign_key_connected_relations);
|
||||
|
||||
/*
|
||||
* get_referencing_relation_id_list returns the list of table oids that is referencing
|
||||
|
@ -138,3 +145,38 @@ get_referenced_relation_id_list(PG_FUNCTION_ARGS)
|
|||
SRF_RETURN_DONE(functionContext);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* get_foreign_key_connected_relations takes a relation, and returns relations
|
||||
* that are connected to input relation via a foreign key graph.
|
||||
*/
|
||||
Datum
|
||||
get_foreign_key_connected_relations(PG_FUNCTION_ARGS)
|
||||
{
|
||||
CheckCitusVersion(ERROR);
|
||||
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
|
||||
TupleDesc tupleDescriptor = NULL;
|
||||
Tuplestorestate *tupleStore = SetupTuplestore(fcinfo, &tupleDescriptor);
|
||||
|
||||
Oid connectedRelationId;
|
||||
List *fkeyConnectedRelationIdList = GetForeignKeyConnectedRelationIdList(relationId);
|
||||
foreach_oid(connectedRelationId, fkeyConnectedRelationIdList)
|
||||
{
|
||||
Datum values[GET_FKEY_CONNECTED_RELATIONS_COLUMNS];
|
||||
bool nulls[GET_FKEY_CONNECTED_RELATIONS_COLUMNS];
|
||||
|
||||
memset(values, 0, sizeof(values));
|
||||
memset(nulls, false, sizeof(nulls));
|
||||
|
||||
values[0] = ObjectIdGetDatum(connectedRelationId);
|
||||
|
||||
tuplestore_putvalues(tupleStore, tupleDescriptor, values, nulls);
|
||||
}
|
||||
|
||||
tuplestore_donestoring(tupleStore);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
|
|
@ -35,6 +35,9 @@
|
|||
#include "common/hashfn.h"
|
||||
#endif
|
||||
#include "utils/memutils.h"
|
||||
#if PG_VERSION_NUM < PG_VERSION_12
|
||||
#include "utils/rel.h"
|
||||
#endif
|
||||
|
||||
|
||||
/*
|
||||
|
@ -58,7 +61,6 @@ typedef struct ForeignConstraintRelationshipGraph
|
|||
typedef struct ForeignConstraintRelationshipNode
|
||||
{
|
||||
Oid relationId;
|
||||
bool visited;
|
||||
List *adjacencyList;
|
||||
List *backAdjacencyList;
|
||||
}ForeignConstraintRelationshipNode;
|
||||
|
@ -78,13 +80,15 @@ typedef struct ForeignConstraintRelationshipEdge
|
|||
|
||||
static ForeignConstraintRelationshipGraph *fConstraintRelationshipGraph = NULL;
|
||||
|
||||
static List * GetRelationshipNodesForFKeyConnectedRelations(
|
||||
ForeignConstraintRelationshipNode *relationshipNode);
|
||||
static List * GetAllNeighboursList(ForeignConstraintRelationshipNode *relationshipNode);
|
||||
static ForeignConstraintRelationshipNode * GetRelationshipNodeForRelationId(Oid
|
||||
relationId,
|
||||
bool *isFound);
|
||||
static void CreateForeignConstraintRelationshipGraph(void);
|
||||
static List * GetNeighbourList(ForeignConstraintRelationshipNode *relationshipNode,
|
||||
bool isReferencing);
|
||||
static void SetRelationshipNodeListNotVisited(List *relationshipNodeList);
|
||||
static List * GetRelationIdsFromRelationshipNodeList(List *fKeyRelationshipNodeList);
|
||||
static void PopulateAdjacencyLists(void);
|
||||
static int CompareForeignConstraintRelationshipEdges(const void *leftElement,
|
||||
|
@ -92,12 +96,116 @@ static int CompareForeignConstraintRelationshipEdges(const void *leftElement,
|
|||
static void AddForeignConstraintRelationshipEdge(Oid referencingOid, Oid referencedOid);
|
||||
static ForeignConstraintRelationshipNode * CreateOrFindNode(HTAB *adjacencyLists, Oid
|
||||
relid);
|
||||
static void GetConnectedListHelper(ForeignConstraintRelationshipNode *node,
|
||||
List **adjacentNodeList, bool
|
||||
isReferencing);
|
||||
static List * GetConnectedListHelper(ForeignConstraintRelationshipNode *node,
|
||||
bool isReferencing);
|
||||
static HTAB * CreateOidVisitedHashSet(void);
|
||||
static bool OidVisited(HTAB *oidVisitedMap, Oid oid);
|
||||
static void VisitOid(HTAB *oidVisitedMap, Oid oid);
|
||||
static List * GetForeignConstraintRelationshipHelper(Oid relationId, bool isReferencing);
|
||||
|
||||
|
||||
/*
|
||||
* GetForeignKeyConnectedRelationIdList returns a list of relation id's for
|
||||
* relations that are connected to relation with relationId via a foreign
|
||||
* key graph.
|
||||
*/
|
||||
List *
|
||||
GetForeignKeyConnectedRelationIdList(Oid relationId)
|
||||
{
|
||||
/* use ShareRowExclusiveLock to prevent concurent foreign key creation */
|
||||
LOCKMODE lockMode = ShareRowExclusiveLock;
|
||||
Relation relation = try_relation_open(relationId, lockMode);
|
||||
if (!RelationIsValid(relation))
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("relation with OID %d does not exist",
|
||||
relationId)));
|
||||
}
|
||||
|
||||
relation_close(relation, NoLock);
|
||||
|
||||
bool foundInFKeyGraph = false;
|
||||
ForeignConstraintRelationshipNode *relationshipNode =
|
||||
GetRelationshipNodeForRelationId(relationId, &foundInFKeyGraph);
|
||||
if (!foundInFKeyGraph)
|
||||
{
|
||||
/*
|
||||
* Relation could not be found in foreign key graph, then it has no
|
||||
* foreign key relationships.
|
||||
*/
|
||||
return NIL;
|
||||
}
|
||||
|
||||
List *fKeyConnectedRelationshipNodeList =
|
||||
GetRelationshipNodesForFKeyConnectedRelations(relationshipNode);
|
||||
List *fKeyConnectedRelationIdList =
|
||||
GetRelationIdsFromRelationshipNodeList(fKeyConnectedRelationshipNodeList);
|
||||
return fKeyConnectedRelationIdList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetRelationshipNodesForFKeyConnectedRelations performs breadth-first search
|
||||
* starting from input ForeignConstraintRelationshipNode and returns a list
|
||||
* of ForeignConstraintRelationshipNode objects for relations that are connected
|
||||
* to given relation node via a foreign key relationhip graph.
|
||||
*/
|
||||
static List *
|
||||
GetRelationshipNodesForFKeyConnectedRelations(
|
||||
ForeignConstraintRelationshipNode *relationshipNode)
|
||||
{
|
||||
HTAB *oidVisitedMap = CreateOidVisitedHashSet();
|
||||
|
||||
VisitOid(oidVisitedMap, relationshipNode->relationId);
|
||||
List *relationshipNodeList = list_make1(relationshipNode);
|
||||
|
||||
ForeignConstraintRelationshipNode *currentNode = NULL;
|
||||
foreach_ptr_append(currentNode, relationshipNodeList)
|
||||
{
|
||||
List *allNeighboursList = GetAllNeighboursList(currentNode);
|
||||
ForeignConstraintRelationshipNode *neighbourNode = NULL;
|
||||
foreach_ptr(neighbourNode, allNeighboursList)
|
||||
{
|
||||
Oid neighbourRelationId = neighbourNode->relationId;
|
||||
if (OidVisited(oidVisitedMap, neighbourRelationId))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
VisitOid(oidVisitedMap, neighbourRelationId);
|
||||
relationshipNodeList = lappend(relationshipNodeList, neighbourNode);
|
||||
}
|
||||
}
|
||||
|
||||
return relationshipNodeList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetAllNeighboursList returns a list of ForeignConstraintRelationshipNode
|
||||
* objects by concatenating both (referencing & referenced) adjacency lists
|
||||
* of given relationship node.
|
||||
*/
|
||||
static List *
|
||||
GetAllNeighboursList(ForeignConstraintRelationshipNode *relationshipNode)
|
||||
{
|
||||
bool isReferencing = false;
|
||||
List *referencedNeighboursList = GetNeighbourList(relationshipNode, isReferencing);
|
||||
|
||||
isReferencing = true;
|
||||
List *referencingNeighboursList = GetNeighbourList(relationshipNode, isReferencing);
|
||||
|
||||
/*
|
||||
* GetNeighbourList returns list from graph as is, so first copy it as
|
||||
* list_concat might invalidate it.
|
||||
*/
|
||||
List *allNeighboursList = list_copy(referencedNeighboursList);
|
||||
allNeighboursList = list_concat_unique_ptr(allNeighboursList,
|
||||
referencingNeighboursList);
|
||||
return allNeighboursList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReferencedRelationIdList is a wrapper function around GetForeignConstraintRelationshipHelper
|
||||
* to get list of relation IDs which are referenced by the given relation id.
|
||||
|
@ -149,16 +257,8 @@ GetForeignConstraintRelationshipHelper(Oid relationId, bool isReferencing)
|
|||
return NIL;
|
||||
}
|
||||
|
||||
List *foreignNodeList = NIL;
|
||||
GetConnectedListHelper(relationshipNode, &foreignNodeList, isReferencing);
|
||||
|
||||
/* reset visited flags in foreign key graph */
|
||||
SetRelationshipNodeListNotVisited(foreignNodeList);
|
||||
|
||||
/* set to false separately, since we don't add itself to foreign node list */
|
||||
relationshipNode->visited = false;
|
||||
|
||||
List *relationIdList = GetRelationIdsFromRelationshipNodeList(foreignNodeList);
|
||||
List *connectedNodeList = GetConnectedListHelper(relationshipNode, isReferencing);
|
||||
List *relationIdList = GetRelationIdsFromRelationshipNodeList(connectedNodeList);
|
||||
return relationIdList;
|
||||
}
|
||||
|
||||
|
@ -264,28 +364,103 @@ SetForeignConstraintRelationshipGraphInvalid()
|
|||
|
||||
|
||||
/*
|
||||
* GetConnectedListHelper is the function for getting nodes connected (or connecting) to
|
||||
* the given relation. adjacentNodeList holds the result for recursive calls and
|
||||
* by changing isReferencing caller function can select connected or connecting
|
||||
* adjacency list.
|
||||
* GetConnectedListHelper returns list of ForeignConstraintRelationshipNode
|
||||
* objects for relations referenced by or referencing to given relation
|
||||
* according to isReferencing flag.
|
||||
*
|
||||
*/
|
||||
static void
|
||||
GetConnectedListHelper(ForeignConstraintRelationshipNode *node, List **adjacentNodeList,
|
||||
bool isReferencing)
|
||||
static List *
|
||||
GetConnectedListHelper(ForeignConstraintRelationshipNode *node, bool isReferencing)
|
||||
{
|
||||
node->visited = true;
|
||||
HTAB *oidVisitedMap = CreateOidVisitedHashSet();
|
||||
|
||||
ForeignConstraintRelationshipNode *neighbourNode = NULL;
|
||||
List *neighbourList = GetNeighbourList(node, isReferencing);
|
||||
foreach_ptr(neighbourNode, neighbourList)
|
||||
List *connectedNodeList = NIL;
|
||||
|
||||
List *relationshipNodeStack = list_make1(node);
|
||||
while (list_length(relationshipNodeStack) != 0)
|
||||
{
|
||||
if (neighbourNode->visited == false)
|
||||
/*
|
||||
* Note that this loop considers leftmost element of
|
||||
* relationshipNodeStack as top of the stack.
|
||||
*/
|
||||
|
||||
/* pop top element from stack */
|
||||
ForeignConstraintRelationshipNode *currentNode = linitial(relationshipNodeStack);
|
||||
relationshipNodeStack = list_delete_first(relationshipNodeStack);
|
||||
|
||||
Oid currentRelationId = currentNode->relationId;
|
||||
if (!OidVisited(oidVisitedMap, currentRelationId))
|
||||
{
|
||||
*adjacentNodeList = lappend(*adjacentNodeList, neighbourNode);
|
||||
GetConnectedListHelper(neighbourNode, adjacentNodeList, isReferencing);
|
||||
connectedNodeList = lappend(connectedNodeList, currentNode);
|
||||
VisitOid(oidVisitedMap, currentRelationId);
|
||||
}
|
||||
|
||||
List *neighbourList = GetNeighbourList(currentNode, isReferencing);
|
||||
ForeignConstraintRelationshipNode *neighbourNode = NULL;
|
||||
foreach_ptr(neighbourNode, neighbourList)
|
||||
{
|
||||
Oid neighbourRelationId = neighbourNode->relationId;
|
||||
if (!OidVisited(oidVisitedMap, neighbourRelationId))
|
||||
{
|
||||
/* push to stack */
|
||||
relationshipNodeStack = lcons(neighbourNode, relationshipNodeStack);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
hash_destroy(oidVisitedMap);
|
||||
|
||||
/* finally remove yourself from list */
|
||||
connectedNodeList = list_delete_first(connectedNodeList);
|
||||
return connectedNodeList;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateOidVisitedHashSet creates and returns an hash-set object in
|
||||
* CurrentMemoryContext to store visited oid's.
|
||||
* As hash_create allocates memory in heap, callers are responsible to call
|
||||
* hash_destroy when appropriate.
|
||||
*/
|
||||
static HTAB *
|
||||
CreateOidVisitedHashSet(void)
|
||||
{
|
||||
HASHCTL info = { 0 };
|
||||
|
||||
info.keysize = sizeof(Oid);
|
||||
info.hash = oid_hash;
|
||||
info.hcxt = CurrentMemoryContext;
|
||||
|
||||
/* we don't have value field as it's a set */
|
||||
info.entrysize = info.keysize;
|
||||
|
||||
uint32 hashFlags = (HASH_ELEM | HASH_FUNCTION | HASH_CONTEXT);
|
||||
|
||||
HTAB *oidVisitedMap = hash_create("oid visited hash map", 32, &info, hashFlags);
|
||||
return oidVisitedMap;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* OidVisited returns true if given oid is visited according to given oid hash-set.
|
||||
*/
|
||||
static bool
|
||||
OidVisited(HTAB *oidVisitedMap, Oid oid)
|
||||
{
|
||||
bool found = false;
|
||||
hash_search(oidVisitedMap, &oid, HASH_FIND, &found);
|
||||
return found;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* VisitOid sets given oid as visited in given hash-set.
|
||||
*/
|
||||
static void
|
||||
VisitOid(HTAB *oidVisitedMap, Oid oid)
|
||||
{
|
||||
bool found = false;
|
||||
hash_search(oidVisitedMap, &oid, HASH_ENTER, &found);
|
||||
}
|
||||
|
||||
|
||||
|
@ -308,21 +483,6 @@ GetNeighbourList(ForeignConstraintRelationshipNode *relationshipNode, bool isRef
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* SetRelationshipNodeListNotVisited takes a list of ForeignConstraintRelationshipNode
|
||||
* objects and sets their visited flags to false.
|
||||
*/
|
||||
static void
|
||||
SetRelationshipNodeListNotVisited(List *relationshipNodeList)
|
||||
{
|
||||
ForeignConstraintRelationshipNode *relationshipNode = NULL;
|
||||
foreach_ptr(relationshipNode, relationshipNodeList)
|
||||
{
|
||||
relationshipNode->visited = false;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* GetRelationIdsFromRelationshipNodeList returns list of relationId's for
|
||||
* given ForeignConstraintRelationshipNode object list.
|
||||
|
@ -474,7 +634,6 @@ CreateOrFindNode(HTAB *adjacencyLists, Oid relid)
|
|||
{
|
||||
node->adjacencyList = NIL;
|
||||
node->backAdjacencyList = NIL;
|
||||
node->visited = false;
|
||||
}
|
||||
|
||||
return node;
|
||||
|
|
|
@ -45,11 +45,8 @@ static StringInfo CopyShardPlacementToWorkerNodeQuery(
|
|||
ShardPlacement *sourceShardPlacement,
|
||||
WorkerNode *workerNode,
|
||||
char transferMode);
|
||||
static void ReplicateSingleShardTableToAllNodes(Oid relationId);
|
||||
static void ReplicateShardToAllNodes(ShardInterval *shardInterval);
|
||||
static void ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName,
|
||||
int nodePort);
|
||||
static void ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId);
|
||||
static bool AnyRelationsModifiedInTransaction(List *relationIdList);
|
||||
|
||||
/* exports for SQL callable functions */
|
||||
|
@ -134,9 +131,8 @@ EnsureReferenceTablesExistOnAllNodesExtended(char transferMode)
|
|||
|
||||
/*
|
||||
* We only take an access share lock, otherwise we'll hold up master_add_node.
|
||||
* In case of create_reference_table() and upgrade_to_reference_table(), where
|
||||
* we don't want concurrent writes to pg_dist_node, we have already acquired
|
||||
* ShareLock on pg_dist_node.
|
||||
* In case of create_reference_table() where we don't want concurrent writes
|
||||
* to pg_dist_node, we have already acquired ShareLock on pg_dist_node.
|
||||
*/
|
||||
List *newWorkersList = WorkersWithoutReferenceTablePlacement(shardId,
|
||||
AccessShareLock);
|
||||
|
@ -318,157 +314,14 @@ CopyShardPlacementToWorkerNodeQuery(ShardPlacement *sourceShardPlacement,
|
|||
|
||||
|
||||
/*
|
||||
* upgrade_to_reference_table accepts a broadcast table which has only one shard and
|
||||
* replicates it across all nodes to create a reference table. It also modifies related
|
||||
* metadata to mark the table as reference.
|
||||
* upgrade_to_reference_table was removed, but we maintain a dummy implementation
|
||||
* to support downgrades.
|
||||
*/
|
||||
Datum
|
||||
upgrade_to_reference_table(PG_FUNCTION_ARGS)
|
||||
{
|
||||
Oid relationId = PG_GETARG_OID(0);
|
||||
|
||||
CheckCitusVersion(ERROR);
|
||||
EnsureCoordinator();
|
||||
EnsureTableOwner(relationId);
|
||||
|
||||
if (!IsCitusTable(relationId))
|
||||
{
|
||||
char *relationName = get_rel_name(relationId);
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("cannot upgrade to reference table"),
|
||||
errdetail("Relation \"%s\" is not distributed.", relationName),
|
||||
errhint("Instead, you can use; "
|
||||
"create_reference_table('%s');", relationName)));
|
||||
}
|
||||
|
||||
CitusTableCacheEntry *tableEntry = GetCitusTableCacheEntry(relationId);
|
||||
|
||||
if (IsCitusTableTypeCacheEntry(tableEntry, REFERENCE_TABLE))
|
||||
{
|
||||
char *relationName = get_rel_name(relationId);
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("cannot upgrade to reference table"),
|
||||
errdetail("Relation \"%s\" is already a reference table",
|
||||
relationName)));
|
||||
}
|
||||
else if (IsCitusTableTypeCacheEntry(tableEntry, CITUS_LOCAL_TABLE))
|
||||
{
|
||||
char *relationName = get_rel_name(relationId);
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("cannot upgrade to reference table"),
|
||||
errdetail("Relation \"%s\" is a citus local table and "
|
||||
"currently it is not supported to upgrade "
|
||||
"a citus local table to a reference table ",
|
||||
relationName)));
|
||||
}
|
||||
|
||||
if (tableEntry->replicationModel == REPLICATION_MODEL_STREAMING)
|
||||
{
|
||||
char *relationName = get_rel_name(relationId);
|
||||
ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
|
||||
errmsg("cannot upgrade to reference table"),
|
||||
errdetail("Upgrade is only supported for statement-based "
|
||||
"replicated tables but \"%s\" is streaming replicated",
|
||||
relationName)));
|
||||
}
|
||||
|
||||
LockRelationOid(relationId, AccessExclusiveLock);
|
||||
|
||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||
if (list_length(shardIntervalList) != 1)
|
||||
{
|
||||
char *relationName = get_rel_name(relationId);
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot upgrade to reference table"),
|
||||
errdetail("Relation \"%s\" shard count is not one. Only "
|
||||
"relations with one shard can be upgraded to "
|
||||
"reference tables.", relationName)));
|
||||
}
|
||||
|
||||
EnsureReferenceTablesExistOnAllNodes();
|
||||
ReplicateSingleShardTableToAllNodes(relationId);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReplicateSingleShardTableToAllNodes accepts a broadcast table and replicates
|
||||
* it to all worker nodes, and the coordinator if it has been added by the user
|
||||
* to pg_dist_node. It assumes that caller of this function ensures that given
|
||||
* broadcast table has only one shard.
|
||||
*/
|
||||
static void
|
||||
ReplicateSingleShardTableToAllNodes(Oid relationId)
|
||||
{
|
||||
List *shardIntervalList = LoadShardIntervalList(relationId);
|
||||
ShardInterval *shardInterval = (ShardInterval *) linitial(shardIntervalList);
|
||||
uint64 shardId = shardInterval->shardId;
|
||||
|
||||
List *foreignConstraintCommandList = CopyShardForeignConstraintCommandList(
|
||||
shardInterval);
|
||||
|
||||
if (foreignConstraintCommandList != NIL || TableReferenced(relationId))
|
||||
{
|
||||
char *relationName = get_rel_name(relationId);
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot upgrade to reference table"),
|
||||
errdetail("Relation \"%s\" is part of a foreign constraint. "
|
||||
"Foreign key constraints are not allowed "
|
||||
"from or to reference tables.", relationName)));
|
||||
}
|
||||
|
||||
/*
|
||||
* ReplicateShardToAllNodes function opens separate transactions (i.e., not part
|
||||
* of any coordinated transactions) to each worker and replicates given shard to all
|
||||
* workers. If a worker already has a healthy replica of given shard, it skips that
|
||||
* worker to prevent copying unnecessary data.
|
||||
*/
|
||||
ReplicateShardToAllNodes(shardInterval);
|
||||
|
||||
/*
|
||||
* We need to update metadata tables to mark this table as reference table. We modify
|
||||
* pg_dist_partition, pg_dist_colocation and pg_dist_shard tables in
|
||||
* ConvertToReferenceTableMetadata function.
|
||||
*/
|
||||
ConvertToReferenceTableMetadata(relationId, shardId);
|
||||
|
||||
/*
|
||||
* After the table has been officially marked as a reference table, we need to create
|
||||
* the reference table itself and insert its pg_dist_partition, pg_dist_shard and
|
||||
* existing pg_dist_placement rows.
|
||||
*/
|
||||
CreateTableMetadataOnWorkers(relationId);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* ReplicateShardToAllNodes function replicates given shard to all nodes
|
||||
* in separate transactions. While replicating, it only replicates the shard to the
|
||||
* nodes which does not have a healthy replica of the shard. However, this function
|
||||
* does not obtain any lock on shard resource and shard metadata. It is caller's
|
||||
* responsibility to take those locks.
|
||||
*/
|
||||
static void
|
||||
ReplicateShardToAllNodes(ShardInterval *shardInterval)
|
||||
{
|
||||
/* prevent concurrent pg_dist_node changes */
|
||||
List *workerNodeList = ReferenceTablePlacementNodeList(ShareLock);
|
||||
|
||||
/*
|
||||
* We will iterate over all worker nodes and if a healthy placement does not exist
|
||||
* at given node we will copy the shard to that node. Then we will also modify
|
||||
* the metadata to reflect newly copied shard.
|
||||
*/
|
||||
workerNodeList = SortList(workerNodeList, CompareWorkerNodes);
|
||||
WorkerNode *workerNode = NULL;
|
||||
foreach_ptr(workerNode, workerNodeList)
|
||||
{
|
||||
char *nodeName = workerNode->workerName;
|
||||
uint32 nodePort = workerNode->workerPort;
|
||||
|
||||
ReplicateShardToNode(shardInterval, nodeName, nodePort);
|
||||
}
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("this function is deprecated and no longer used")));
|
||||
}
|
||||
|
||||
|
||||
|
@ -548,34 +401,6 @@ ReplicateShardToNode(ShardInterval *shardInterval, char *nodeName, int nodePort)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* ConvertToReferenceTableMetadata accepts a broadcast table and modifies its metadata to
|
||||
* reference table metadata. To do this, this function updates pg_dist_partition,
|
||||
* pg_dist_colocation and pg_dist_shard. This function assumes that caller ensures that
|
||||
* given broadcast table has only one shard.
|
||||
*/
|
||||
static void
|
||||
ConvertToReferenceTableMetadata(Oid relationId, uint64 shardId)
|
||||
{
|
||||
uint32 currentColocationId = TableColocationId(relationId);
|
||||
uint32 newColocationId = CreateReferenceTableColocationId();
|
||||
Var *distributionColumn = NULL;
|
||||
char shardStorageType = ShardStorageType(relationId);
|
||||
text *shardMinValue = NULL;
|
||||
text *shardMaxValue = NULL;
|
||||
|
||||
/* delete old metadata rows */
|
||||
DeletePartitionRow(relationId);
|
||||
DeleteColocationGroupIfNoTablesBelong(currentColocationId);
|
||||
DeleteShardRow(shardId);
|
||||
|
||||
/* insert new metadata rows */
|
||||
InsertIntoPgDistPartition(relationId, DISTRIBUTE_BY_NONE, distributionColumn,
|
||||
newColocationId, REPLICATION_MODEL_2PC);
|
||||
InsertShardRow(relationId, shardId, shardStorageType, shardMinValue, shardMaxValue);
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* CreateReferenceTableColocationId creates a new co-location id for reference tables and
|
||||
* writes it into pg_dist_colocation, then returns the created co-location id. Since there
|
||||
|
|
|
@ -61,9 +61,46 @@ typedef enum ExtractForeignKeyConstraintsMode
|
|||
INCLUDE_REFERENCED_CONSTRAINTS = 1 << 1,
|
||||
|
||||
/* exclude the self-referencing foreign keys */
|
||||
EXCLUDE_SELF_REFERENCES = 1 << 2
|
||||
EXCLUDE_SELF_REFERENCES = 1 << 2,
|
||||
|
||||
/* any combination of the 4 flags below is supported */
|
||||
/* include foreign keys when the other table is a distributed table*/
|
||||
INCLUDE_DISTRIBUTED_TABLES = 1 << 3,
|
||||
|
||||
/* include foreign keys when the other table is a reference table*/
|
||||
INCLUDE_REFERENCE_TABLES = 1 << 4,
|
||||
|
||||
/* include foreign keys when the other table is a citus local table*/
|
||||
INCLUDE_CITUS_LOCAL_TABLES = 1 << 5,
|
||||
|
||||
/* include foreign keys when the other table is a Postgres local table*/
|
||||
INCLUDE_LOCAL_TABLES = 1 << 6,
|
||||
|
||||
/* include foreign keys regardless of the other table's type */
|
||||
INCLUDE_ALL_TABLE_TYPES = INCLUDE_DISTRIBUTED_TABLES | INCLUDE_REFERENCE_TABLES |
|
||||
INCLUDE_CITUS_LOCAL_TABLES | INCLUDE_LOCAL_TABLES
|
||||
} ExtractForeignKeyConstraintMode;
|
||||
|
||||
|
||||
/*
|
||||
* Flags that can be passed to GetForeignKeyIdsForColumn to
|
||||
* indicate whether relationId argument should match:
|
||||
* - referencing relation or,
|
||||
* - referenced relation,
|
||||
* or we are searching for both sides.
|
||||
*/
|
||||
typedef enum SearchForeignKeyColumnFlags
|
||||
{
|
||||
/* relationId argument should match referencing relation */
|
||||
SEARCH_REFERENCING_RELATION = 1 << 0,
|
||||
|
||||
/* relationId argument should match referenced relation */
|
||||
SEARCH_REFERENCED_RELATION = 1 << 1,
|
||||
|
||||
/* callers can also pass union of above flags */
|
||||
} SearchForeignKeyColumnFlags;
|
||||
|
||||
|
||||
/* cluster.c - forward declarations */
|
||||
extern List * PreprocessClusterStmt(Node *node, const char *clusterCommand);
|
||||
|
||||
|
@ -122,6 +159,7 @@ extern void ErrorIfUnsupportedForeignConstraintExists(Relation relation,
|
|||
Var *distributionColumn,
|
||||
uint32 colocationId);
|
||||
extern void ErrorOutForFKeyBetweenPostgresAndCitusLocalTable(Oid localTableId);
|
||||
extern bool ColumnReferencedByAnyForeignKey(char *columnName, Oid relationId);
|
||||
extern bool ColumnAppearsInForeignKeyToReferenceTable(char *columnName, Oid
|
||||
relationId);
|
||||
extern List * GetReferencingForeignConstaintCommands(Oid relationOid);
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
#include "utils/hsearch.h"
|
||||
#include "nodes/primnodes.h"
|
||||
|
||||
extern List * GetForeignKeyConnectedRelationIdList(Oid relationId);
|
||||
extern List * ReferencedRelationIdList(Oid relationId);
|
||||
extern List * ReferencingRelationIdList(Oid relationId);
|
||||
extern void SetForeignConstraintRelationshipGraphInvalid(void);
|
||||
|
|
|
@ -307,9 +307,6 @@ SELECT mark_tables_colocated('citus_local_table_1', ARRAY['distributed_table']);
|
|||
ERROR: citus local tables cannot be colocated with other tables
|
||||
SELECT mark_tables_colocated('distributed_table', ARRAY['citus_local_table_1']);
|
||||
ERROR: citus local tables cannot be colocated with other tables
|
||||
-- upgrade_to_reference_table is not supported
|
||||
SELECT upgrade_to_reference_table('citus_local_table_1');
|
||||
ERROR: cannot upgrade to reference table
|
||||
-- master_create_empty_shard is not supported
|
||||
SELECT master_create_empty_shard('citus_local_table_1');
|
||||
ERROR: relation "citus_local_table_1" is a citus local table
|
||||
|
@ -559,7 +556,7 @@ FROM pg_dist_partition WHERE logicalrelid = 'citus_local_table_4'::regclass;
|
|||
(1 row)
|
||||
|
||||
SELECT column_name_to_column('citus_local_table_4', 'a');
|
||||
column_name_to_column
|
||||
column_name_to_column
|
||||
---------------------------------------------------------------------
|
||||
{VAR :varno 1 :varattno 1 :vartype 23 :vartypmod -1 :varcollid 0 :varlevelsup 0 :varnoold 1 :varoattno 1 :location -1}
|
||||
(1 row)
|
||||
|
@ -618,8 +615,6 @@ BEGIN;
|
|||
|
||||
ROLLBACK;
|
||||
-- should fail --
|
||||
SELECT upgrade_to_reference_table('citus_local_table_4');
|
||||
ERROR: cannot upgrade to reference table
|
||||
SELECT update_distributed_table_colocation('citus_local_table_4', colocate_with => 'none');
|
||||
ERROR: relation citus_local_table_4 should be a hash distributed table
|
||||
SELECT master_create_worker_shards('citus_local_table_4', 10, 1);
|
||||
|
|
|
@ -443,22 +443,23 @@ SELECT * FROM print_extension_changes();
|
|||
-- Snapshot of state at 10.0-1
|
||||
ALTER EXTENSION citus UPDATE TO '10.0-1';
|
||||
SELECT * FROM print_extension_changes();
|
||||
previous_object | current_object
|
||||
previous_object | current_object
|
||||
---------------------------------------------------------------------
|
||||
function citus_total_relation_size(regclass) |
|
||||
| access method columnar
|
||||
| function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean)
|
||||
| function alter_columnar_table_set(regclass,integer,integer,name,integer)
|
||||
| function citus_internal.columnar_ensure_objects_exist()
|
||||
| function citus_total_relation_size(regclass,boolean)
|
||||
| function columnar.columnar_handler(internal)
|
||||
| schema columnar
|
||||
| sequence columnar.storageid_seq
|
||||
| table columnar.columnar_skipnodes
|
||||
| table columnar.columnar_stripes
|
||||
| table columnar.options
|
||||
| view citus_tables
|
||||
(13 rows)
|
||||
function citus_total_relation_size(regclass) |
|
||||
function upgrade_to_reference_table(regclass) |
|
||||
| access method columnar
|
||||
| function alter_columnar_table_reset(regclass,boolean,boolean,boolean,boolean)
|
||||
| function alter_columnar_table_set(regclass,integer,integer,name,integer)
|
||||
| function citus_internal.columnar_ensure_objects_exist()
|
||||
| function citus_total_relation_size(regclass,boolean)
|
||||
| function columnar.columnar_handler(internal)
|
||||
| schema columnar
|
||||
| sequence columnar.storageid_seq
|
||||
| table columnar.columnar_skipnodes
|
||||
| table columnar.columnar_stripes
|
||||
| table columnar.options
|
||||
| view citus_tables
|
||||
(14 rows)
|
||||
|
||||
DROP TABLE prev_objects, extension_diff;
|
||||
-- show running version
|
||||
|
|
|
@ -443,18 +443,19 @@ SELECT * FROM print_extension_changes();
|
|||
-- Snapshot of state at 10.0-1
|
||||
ALTER EXTENSION citus UPDATE TO '10.0-1';
|
||||
SELECT * FROM print_extension_changes();
|
||||
previous_object | current_object
|
||||
previous_object | current_object
|
||||
---------------------------------------------------------------------
|
||||
function citus_total_relation_size(regclass) |
|
||||
| function citus_internal.columnar_ensure_objects_exist()
|
||||
| function citus_total_relation_size(regclass,boolean)
|
||||
| schema columnar
|
||||
| sequence columnar.storageid_seq
|
||||
| table columnar.columnar_skipnodes
|
||||
| table columnar.columnar_stripes
|
||||
| table columnar.options
|
||||
| view citus_tables
|
||||
(9 rows)
|
||||
function citus_total_relation_size(regclass) |
|
||||
function upgrade_to_reference_table(regclass) |
|
||||
| function citus_internal.columnar_ensure_objects_exist()
|
||||
| function citus_total_relation_size(regclass,boolean)
|
||||
| schema columnar
|
||||
| sequence columnar.storageid_seq
|
||||
| table columnar.columnar_skipnodes
|
||||
| table columnar.columnar_stripes
|
||||
| table columnar.options
|
||||
| view citus_tables
|
||||
(10 rows)
|
||||
|
||||
DROP TABLE prev_objects, extension_diff;
|
||||
-- show running version
|
||||
|
|
|
@ -602,18 +602,133 @@ drop cascades to table test_8
|
|||
(0 rows)
|
||||
|
||||
ROLLBACK;
|
||||
CREATE OR REPLACE FUNCTION get_foreign_key_connected_relations(IN table_name regclass)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE C STRICT
|
||||
AS 'citus', $$get_foreign_key_connected_relations$$;
|
||||
COMMENT ON FUNCTION get_foreign_key_connected_relations(IN table_name regclass)
|
||||
IS 'returns relations connected to input relation via a foreign key graph';
|
||||
CREATE TABLE distributed_table_1(col int unique);
|
||||
CREATE TABLE distributed_table_2(col int unique);
|
||||
CREATE TABLE distributed_table_3(col int unique);
|
||||
CREATE TABLE distributed_table_4(col int unique);
|
||||
SELECT create_distributed_table('distributed_table_1', 'col');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('distributed_table_2', 'col');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('distributed_table_3', 'col');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE reference_table_1(col int unique);
|
||||
CREATE TABLE reference_table_2(col int unique);
|
||||
SELECT create_reference_table('reference_table_1');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_reference_table('reference_table_2');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Now build below foreign key graph:
|
||||
--
|
||||
-- --------------------------------------------
|
||||
-- ^ |
|
||||
-- | v
|
||||
-- distributed_table_1 <- distributed_table_2 -> reference_table_1 <- reference_table_2
|
||||
-- | ^
|
||||
-- | |
|
||||
-- ----------> distributed_table_3
|
||||
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_1 FOREIGN KEY (col) REFERENCES distributed_table_1(col);
|
||||
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_2 FOREIGN KEY (col) REFERENCES reference_table_1(col);
|
||||
ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_3 FOREIGN KEY (col) REFERENCES reference_table_1(col);
|
||||
ALTER TABLE distributed_table_3 ADD CONSTRAINT fkey_4 FOREIGN KEY (col) REFERENCES distributed_table_2(col);
|
||||
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_5 FOREIGN KEY (col) REFERENCES reference_table_2(col);
|
||||
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_6 FOREIGN KEY (col) REFERENCES distributed_table_3(col);
|
||||
-- below queries should print all 5 tables mentioned in above graph
|
||||
SELECT oid::regclass::text AS tablename
|
||||
FROM get_foreign_key_connected_relations('reference_table_1') AS f(oid oid)
|
||||
ORDER BY tablename;
|
||||
tablename
|
||||
---------------------------------------------------------------------
|
||||
distributed_table_1
|
||||
distributed_table_2
|
||||
distributed_table_3
|
||||
reference_table_1
|
||||
reference_table_2
|
||||
(5 rows)
|
||||
|
||||
SELECT oid::regclass::text AS tablename
|
||||
FROM get_foreign_key_connected_relations('distributed_table_1') AS f(oid oid)
|
||||
ORDER BY tablename;
|
||||
tablename
|
||||
---------------------------------------------------------------------
|
||||
distributed_table_1
|
||||
distributed_table_2
|
||||
distributed_table_3
|
||||
reference_table_1
|
||||
reference_table_2
|
||||
(5 rows)
|
||||
|
||||
-- show that this does not print anything as distributed_table_4
|
||||
-- is not involved in any foreign key relationship
|
||||
SELECT oid::regclass::text AS tablename
|
||||
FROM get_foreign_key_connected_relations('distributed_table_4') AS f(oid oid)
|
||||
ORDER BY tablename;
|
||||
tablename
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
ALTER TABLE distributed_table_4 ADD CONSTRAINT fkey_1 FOREIGN KEY (col) REFERENCES distributed_table_4(col);
|
||||
-- even if distributed_table_4 has a self referencing foreign key,
|
||||
-- we don't print anything as we only consider foreign key relationships
|
||||
-- with other tables
|
||||
SELECT oid::regclass::text AS tablename
|
||||
FROM get_foreign_key_connected_relations('distributed_table_4') AS f(oid oid)
|
||||
ORDER BY tablename;
|
||||
tablename
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
CREATE TABLE local_table_1 (col int unique);
|
||||
CREATE TABLE local_table_2 (col int unique);
|
||||
-- show that we do not trigger updating foreign key graph when
|
||||
-- defining/dropping foreign keys between postgres tables
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col) REFERENCES local_table_2(col);
|
||||
SELECT oid::regclass::text AS tablename
|
||||
FROM get_foreign_key_connected_relations('local_table_2') AS f(oid oid)
|
||||
ORDER BY tablename;
|
||||
tablename
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
ALTER TABLE local_table_1 DROP CONSTRAINT fkey_1;
|
||||
SELECT oid::regclass::text AS tablename
|
||||
FROM get_foreign_key_connected_relations('local_table_1') AS f(oid oid)
|
||||
ORDER BY tablename;
|
||||
tablename
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
|
||||
-- show that we error out for non-existent tables
|
||||
SELECT oid::regclass::text AS tablename
|
||||
FROM get_foreign_key_connected_relations('non_existent_table') AS f(oid oid)
|
||||
ORDER BY tablename;
|
||||
ERROR: relation "non_existent_table" does not exist
|
||||
set client_min_messages to error;
|
||||
SET search_path TO public;
|
||||
DROP SCHEMA fkey_graph CASCADE;
|
||||
NOTICE: drop cascades to 12 other objects
|
||||
DETAIL: drop cascades to function fkey_graph.get_referencing_relation_id_list(oid)
|
||||
drop cascades to function fkey_graph.get_referenced_relation_id_list(oid)
|
||||
drop cascades to table fkey_graph.dtt1
|
||||
drop cascades to table fkey_graph.dtt2
|
||||
drop cascades to table fkey_graph.dtt3
|
||||
drop cascades to table fkey_graph.dtt4
|
||||
drop cascades to view fkey_graph.referential_integrity_summary
|
||||
drop cascades to table fkey_graph.test_1
|
||||
drop cascades to table fkey_graph.test_2
|
||||
drop cascades to table fkey_graph.test_3
|
||||
drop cascades to table fkey_graph.test_4
|
||||
drop cascades to table fkey_graph.test_5
|
||||
|
|
|
@ -177,12 +177,15 @@ ERROR: data type bigint has no default operator class for access method "gist"
|
|||
HINT: You must specify an operator class for the index or define a default operator class for the data type.
|
||||
CREATE INDEX try_index ON lineitem (non_existent_column);
|
||||
ERROR: column "non_existent_column" does not exist
|
||||
-- show that we support indexes without names
|
||||
CREATE INDEX ON lineitem (l_orderkey);
|
||||
ERROR: creating index without a name on a distributed table is currently unsupported
|
||||
CREATE UNIQUE INDEX ON index_test_hash(a);
|
||||
CREATE INDEX CONCURRENTLY ON lineitem USING hash (l_shipdate);
|
||||
-- Verify that none of failed indexes got created on the master node
|
||||
SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_test_%' ORDER BY indexname;
|
||||
schemaname | tablename | indexname | tablespace | indexdef
|
||||
---------------------------------------------------------------------
|
||||
multi_index_statements | index_test_hash | index_test_hash_a_idx | | CREATE UNIQUE INDEX index_test_hash_a_idx ON multi_index_statements.index_test_hash USING btree (a)
|
||||
multi_index_statements | index_test_hash | index_test_hash_index_a | | CREATE UNIQUE INDEX index_test_hash_index_a ON multi_index_statements.index_test_hash USING btree (a)
|
||||
multi_index_statements | index_test_hash | index_test_hash_index_a_b | | CREATE UNIQUE INDEX index_test_hash_index_a_b ON multi_index_statements.index_test_hash USING btree (a, b)
|
||||
multi_index_statements | index_test_hash | index_test_hash_index_a_b_c | | CREATE UNIQUE INDEX index_test_hash_index_a_b_c ON multi_index_statements.index_test_hash USING btree (a) INCLUDE (b, c)
|
||||
|
@ -192,6 +195,8 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t
|
|||
multi_index_statements | index_test_range | index_test_range_index_a_b_partial | | CREATE UNIQUE INDEX index_test_range_index_a_b_partial ON multi_index_statements.index_test_range USING btree (a, b) WHERE (c IS NOT NULL)
|
||||
public | lineitem | lineitem_colref_index | | CREATE INDEX lineitem_colref_index ON public.lineitem USING btree (record_ne(lineitem.*, NULL::record))
|
||||
public | lineitem | lineitem_concurrently_index | | CREATE INDEX lineitem_concurrently_index ON public.lineitem USING btree (l_orderkey)
|
||||
public | lineitem | lineitem_l_orderkey_idx | | CREATE INDEX lineitem_l_orderkey_idx ON public.lineitem USING btree (l_orderkey)
|
||||
public | lineitem | lineitem_l_shipdate_idx | | CREATE INDEX lineitem_l_shipdate_idx ON public.lineitem USING hash (l_shipdate)
|
||||
public | lineitem | lineitem_orderkey_hash_index | | CREATE INDEX lineitem_orderkey_hash_index ON public.lineitem USING hash (l_partkey)
|
||||
public | lineitem | lineitem_orderkey_index | | CREATE INDEX lineitem_orderkey_index ON public.lineitem USING btree (l_orderkey)
|
||||
public | lineitem | lineitem_orderkey_index_new | | CREATE INDEX lineitem_orderkey_index_new ON public.lineitem USING btree (l_orderkey)
|
||||
|
@ -199,7 +204,7 @@ SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_t
|
|||
public | lineitem | lineitem_partkey_desc_index | | CREATE INDEX lineitem_partkey_desc_index ON public.lineitem USING btree (l_partkey DESC)
|
||||
public | lineitem | lineitem_pkey | | CREATE UNIQUE INDEX lineitem_pkey ON public.lineitem USING btree (l_orderkey, l_linenumber)
|
||||
public | lineitem | lineitem_time_index | | CREATE INDEX lineitem_time_index ON public.lineitem USING btree (l_shipdate)
|
||||
(16 rows)
|
||||
(19 rows)
|
||||
|
||||
--
|
||||
-- REINDEX
|
||||
|
@ -240,26 +245,39 @@ DROP INDEX index_test_hash_index_a_b_partial;
|
|||
DROP INDEX CONCURRENTLY lineitem_concurrently_index;
|
||||
-- Verify that all the indexes are dropped from the master and one worker node.
|
||||
-- As there's a primary key, so exclude those from this check.
|
||||
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%';
|
||||
indrelid | indexrelid
|
||||
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%' ORDER BY 1,2;
|
||||
indrelid | indexrelid
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
lineitem | lineitem_l_orderkey_idx
|
||||
lineitem | lineitem_l_shipdate_idx
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname;
|
||||
schemaname | tablename | indexname | tablespace | indexdef
|
||||
---------------------------------------------------------------------
|
||||
multi_index_statements | index_test_hash | index_test_hash_a_idx | | CREATE UNIQUE INDEX index_test_hash_a_idx ON multi_index_statements.index_test_hash USING btree (a)
|
||||
multi_index_statements | index_test_hash | index_test_hash_index_a_b_c | | CREATE UNIQUE INDEX index_test_hash_index_a_b_c ON multi_index_statements.index_test_hash USING btree (a) INCLUDE (b, c)
|
||||
(1 row)
|
||||
(2 rows)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%';
|
||||
indrelid | indexrelid
|
||||
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%' ORDER BY 1,2;
|
||||
indrelid | indexrelid
|
||||
---------------------------------------------------------------------
|
||||
(0 rows)
|
||||
lineitem_290000 | lineitem_l_orderkey_idx_290000
|
||||
lineitem_290000 | lineitem_l_shipdate_idx_290000
|
||||
(2 rows)
|
||||
|
||||
SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname;
|
||||
schemaname | tablename | indexname | tablespace | indexdef
|
||||
---------------------------------------------------------------------
|
||||
multi_index_statements | index_test_hash_102082 | index_test_hash_a_idx_102082 | | CREATE UNIQUE INDEX index_test_hash_a_idx_102082 ON multi_index_statements.index_test_hash_102082 USING btree (a)
|
||||
multi_index_statements | index_test_hash_102083 | index_test_hash_a_idx_102083 | | CREATE UNIQUE INDEX index_test_hash_a_idx_102083 ON multi_index_statements.index_test_hash_102083 USING btree (a)
|
||||
multi_index_statements | index_test_hash_102084 | index_test_hash_a_idx_102084 | | CREATE UNIQUE INDEX index_test_hash_a_idx_102084 ON multi_index_statements.index_test_hash_102084 USING btree (a)
|
||||
multi_index_statements | index_test_hash_102085 | index_test_hash_a_idx_102085 | | CREATE UNIQUE INDEX index_test_hash_a_idx_102085 ON multi_index_statements.index_test_hash_102085 USING btree (a)
|
||||
multi_index_statements | index_test_hash_102086 | index_test_hash_a_idx_102086 | | CREATE UNIQUE INDEX index_test_hash_a_idx_102086 ON multi_index_statements.index_test_hash_102086 USING btree (a)
|
||||
multi_index_statements | index_test_hash_102087 | index_test_hash_a_idx_102087 | | CREATE UNIQUE INDEX index_test_hash_a_idx_102087 ON multi_index_statements.index_test_hash_102087 USING btree (a)
|
||||
multi_index_statements | index_test_hash_102088 | index_test_hash_a_idx_102088 | | CREATE UNIQUE INDEX index_test_hash_a_idx_102088 ON multi_index_statements.index_test_hash_102088 USING btree (a)
|
||||
multi_index_statements | index_test_hash_102089 | index_test_hash_a_idx_102089 | | CREATE UNIQUE INDEX index_test_hash_a_idx_102089 ON multi_index_statements.index_test_hash_102089 USING btree (a)
|
||||
multi_index_statements | index_test_hash_102082 | index_test_hash_index_a_b_c_102082 | | CREATE UNIQUE INDEX index_test_hash_index_a_b_c_102082 ON multi_index_statements.index_test_hash_102082 USING btree (a) INCLUDE (b, c)
|
||||
multi_index_statements | index_test_hash_102083 | index_test_hash_index_a_b_c_102083 | | CREATE UNIQUE INDEX index_test_hash_index_a_b_c_102083 ON multi_index_statements.index_test_hash_102083 USING btree (a) INCLUDE (b, c)
|
||||
multi_index_statements | index_test_hash_102084 | index_test_hash_index_a_b_c_102084 | | CREATE UNIQUE INDEX index_test_hash_index_a_b_c_102084 ON multi_index_statements.index_test_hash_102084 USING btree (a) INCLUDE (b, c)
|
||||
|
@ -268,7 +286,7 @@ SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname;
|
|||
multi_index_statements | index_test_hash_102087 | index_test_hash_index_a_b_c_102087 | | CREATE UNIQUE INDEX index_test_hash_index_a_b_c_102087 ON multi_index_statements.index_test_hash_102087 USING btree (a) INCLUDE (b, c)
|
||||
multi_index_statements | index_test_hash_102088 | index_test_hash_index_a_b_c_102088 | | CREATE UNIQUE INDEX index_test_hash_index_a_b_c_102088 ON multi_index_statements.index_test_hash_102088 USING btree (a) INCLUDE (b, c)
|
||||
multi_index_statements | index_test_hash_102089 | index_test_hash_index_a_b_c_102089 | | CREATE UNIQUE INDEX index_test_hash_index_a_b_c_102089 ON multi_index_statements.index_test_hash_102089 USING btree (a) INCLUDE (b, c)
|
||||
(8 rows)
|
||||
(16 rows)
|
||||
|
||||
-- create index that will conflict with master operations
|
||||
CREATE INDEX CONCURRENTLY ith_b_idx_102089 ON multi_index_statements.index_test_hash_102089(b);
|
||||
|
@ -389,9 +407,92 @@ DEBUG: the index name on the shards of the partition is too long, switching to
|
|||
CREATE INDEX f1
|
||||
ON test_index_creation1 USING btree
|
||||
(field1);
|
||||
SELECT
|
||||
'CREATE TABLE distributed_table(' ||
|
||||
string_Agg('col' || x::text || ' int,', ' ') ||
|
||||
' last_column int)'
|
||||
FROM
|
||||
generate_Series(1, 32) x;
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
CREATE TABLE distributed_table(col1 int, col2 int, col3 int, col4 int, col5 int, col6 int, col7 int, col8 int, col9 int, col10 int, col11 int, col12 int, col13 int, col14 int, col15 int, col16 int, col17 int, col18 int, col19 int, col20 int, col21 int, col22 int, col23 int, col24 int, col25 int, col26 int, col27 int, col28 int, col29 int, col30 int, col31 int, col32 int, last_column int)
|
||||
(1 row)
|
||||
|
||||
\gexec
|
||||
CREATE TABLE distributed_table(col1 int, col2 int, col3 int, col4 int, col5 int, col6 int, col7 int, col8 int, col9 int, col10 int, col11 int, col12 int, col13 int, col14 int, col15 int, col16 int, col17 int, col18 int, col19 int, col20 int, col21 int, col22 int, col23 int, col24 int, col25 int, col26 int, col27 int, col28 int, col29 int, col30 int, col31 int, col32 int, last_column int)
|
||||
SELECT create_distributed_table('distributed_table', 'last_column');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- try to use all 33 columns to create the index
|
||||
-- show that we error out as postgres would do
|
||||
SELECT
|
||||
'CREATE INDEX ON distributed_table(' ||
|
||||
string_Agg('col' || x::text || ',', ' ') ||
|
||||
' last_column)'
|
||||
FROM
|
||||
generate_Series(1, 32) x;
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
CREATE INDEX ON distributed_table(col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13, col14, col15, col16, col17, col18, col19, col20, col21, col22, col23, col24, col25, col26, col27, col28, col29, col30, col31, col32, last_column)
|
||||
(1 row)
|
||||
|
||||
\gexec
|
||||
CREATE INDEX ON distributed_table(col1, col2, col3, col4, col5, col6, col7, col8, col9, col10, col11, col12, col13, col14, col15, col16, col17, col18, col19, col20, col21, col22, col23, col24, col25, col26, col27, col28, col29, col30, col31, col32, last_column)
|
||||
ERROR: cannot use more than 32 columns in an index
|
||||
-- show that we generate different default index names
|
||||
-- for the indexes with same parameters on the same relation
|
||||
CREATE INDEX ON distributed_table(last_column);
|
||||
CREATE INDEX ON distributed_table(last_column);
|
||||
SELECT indexrelid::regclass FROM pg_index WHERE indrelid='distributed_table'::regclass ORDER BY indexrelid;
|
||||
indexrelid
|
||||
---------------------------------------------------------------------
|
||||
distributed_table_last_column_idx
|
||||
distributed_table_last_column_idx1
|
||||
(2 rows)
|
||||
|
||||
-- test CREATE INDEX in plpgsql to verify that we don't break parse tree
|
||||
CREATE OR REPLACE FUNCTION create_index_in_plpgsql()
|
||||
RETURNS VOID AS
|
||||
$BODY$
|
||||
BEGIN
|
||||
CREATE INDEX ON distributed_table(last_column);
|
||||
END;
|
||||
$BODY$ LANGUAGE plpgsql;
|
||||
-- hide plpgsql messages as they differ across pg versions
|
||||
\set VERBOSITY terse
|
||||
SELECT create_index_in_plpgsql();
|
||||
create_index_in_plpgsql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_index_in_plpgsql();
|
||||
create_index_in_plpgsql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_index_in_plpgsql();
|
||||
create_index_in_plpgsql
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT indexrelid::regclass FROM pg_index WHERE indrelid='distributed_table'::regclass ORDER BY indexrelid;
|
||||
indexrelid
|
||||
---------------------------------------------------------------------
|
||||
distributed_table_last_column_idx
|
||||
distributed_table_last_column_idx1
|
||||
distributed_table_last_column_idx2
|
||||
distributed_table_last_column_idx3
|
||||
distributed_table_last_column_idx4
|
||||
(5 rows)
|
||||
|
||||
SET citus.force_max_query_parallelization TO OFF;
|
||||
SET client_min_messages TO ERROR;
|
||||
\set VERBOSITY terse
|
||||
DROP INDEX f1;
|
||||
DROP INDEX ix_test_index_creation2;
|
||||
DROP INDEX ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1;
|
||||
|
|
|
@ -312,9 +312,6 @@ ERROR: permission denied for table test
|
|||
ABORT;
|
||||
SELECT * FROM citus_stat_statements_reset();
|
||||
ERROR: permission denied for function citus_stat_statements_reset
|
||||
-- should not be allowed to upgrade to reference table
|
||||
SELECT upgrade_to_reference_table('singleshard');
|
||||
ERROR: must be owner of table singleshard
|
||||
-- should not be allowed to co-located tables
|
||||
SELECT mark_tables_colocated('test', ARRAY['test_coloc'::regclass]);
|
||||
ERROR: must be owner of table test
|
||||
|
@ -572,66 +569,6 @@ SELECT create_distributed_table('full_access_user_schema.t2', 'id');
|
|||
(1 row)
|
||||
|
||||
RESET ROLE;
|
||||
-- a user with all privileges on a schema should be able to upgrade a distributed table to
|
||||
-- a reference table
|
||||
SET ROLE full_access;
|
||||
BEGIN;
|
||||
CREATE TABLE full_access_user_schema.r1(id int);
|
||||
SET LOCAL citus.shard_count TO 1;
|
||||
SELECT create_distributed_table('full_access_user_schema.r1', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT upgrade_to_reference_table('full_access_user_schema.r1');
|
||||
upgrade_to_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
RESET ROLE;
|
||||
-- the super user should be able to upgrade a distributed table to a reference table, even
|
||||
-- if it is owned by another user
|
||||
SET ROLE full_access;
|
||||
BEGIN;
|
||||
CREATE TABLE full_access_user_schema.r2(id int);
|
||||
SET LOCAL citus.shard_count TO 1;
|
||||
SELECT create_distributed_table('full_access_user_schema.r2', 'id');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
COMMIT;
|
||||
RESET ROLE;
|
||||
-- the usage_access should not be able to upgrade the table
|
||||
SET ROLE usage_access;
|
||||
SELECT upgrade_to_reference_table('full_access_user_schema.r2');
|
||||
ERROR: must be owner of table r2
|
||||
RESET ROLE;
|
||||
-- the super user should be able
|
||||
SELECT upgrade_to_reference_table('full_access_user_schema.r2');
|
||||
upgrade_to_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- verify the owner of the shards for the reference table
|
||||
SELECT result FROM run_command_on_workers($cmd$
|
||||
SELECT tableowner FROM pg_tables WHERE
|
||||
true
|
||||
AND schemaname = 'full_access_user_schema'
|
||||
AND tablename LIKE 'r2_%'
|
||||
LIMIT 1;
|
||||
$cmd$);
|
||||
result
|
||||
---------------------------------------------------------------------
|
||||
full_access
|
||||
full_access
|
||||
(2 rows)
|
||||
|
||||
-- super user should be the only one being able to call worker_cleanup_job_schema_cache
|
||||
SELECT worker_cleanup_job_schema_cache();
|
||||
worker_cleanup_job_schema_cache
|
||||
|
@ -756,11 +693,9 @@ SELECT citus_rm_job_directory(42::bigint);
|
|||
|
||||
\c - - - :master_port
|
||||
DROP SCHEMA full_access_user_schema CASCADE;
|
||||
NOTICE: drop cascades to 4 other objects
|
||||
NOTICE: drop cascades to 2 other objects
|
||||
DETAIL: drop cascades to table full_access_user_schema.t1
|
||||
drop cascades to table full_access_user_schema.t2
|
||||
drop cascades to table full_access_user_schema.r1
|
||||
drop cascades to table full_access_user_schema.r2
|
||||
DROP TABLE
|
||||
my_table,
|
||||
my_table_with_data,
|
||||
|
|
|
@ -359,15 +359,6 @@ SELECT create_reference_table('replicate_reference_table_reference_one');
|
|||
SET citus.shard_count TO 1;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
CREATE TABLE replicate_reference_table_hash(column1 int);
|
||||
SELECT create_distributed_table('replicate_reference_table_hash', 'column1');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables
|
||||
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='replicate_reference_table_hash'::regclass;
|
||||
CREATE TABLE replicate_reference_table_reference_two(column1 int);
|
||||
-- status before master_add_node
|
||||
SELECT
|
||||
|
@ -396,13 +387,12 @@ SELECT
|
|||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
|
||||
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_reference_two')
|
||||
ORDER BY logicalrelid;
|
||||
logicalrelid | partmethod | ?column? | repmodel
|
||||
---------------------------------------------------------------------
|
||||
replicate_reference_table_reference_one | n | t | t
|
||||
replicate_reference_table_hash | h | f | c
|
||||
(2 rows)
|
||||
(1 row)
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
|
||||
|
@ -411,12 +401,6 @@ SELECT 1 FROM master_add_node('localhost', :worker_2_port);
|
|||
1
|
||||
(1 row)
|
||||
|
||||
SELECT upgrade_to_reference_table('replicate_reference_table_hash');
|
||||
upgrade_to_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT create_reference_table('replicate_reference_table_reference_two');
|
||||
create_reference_table
|
||||
---------------------------------------------------------------------
|
||||
|
@ -436,8 +420,7 @@ ORDER BY shardid, nodeport;
|
|||
---------------------------------------------------------------------
|
||||
1370004 | 1 | 0 | localhost | 57638
|
||||
1370005 | 1 | 0 | localhost | 57638
|
||||
1370006 | 1 | 0 | localhost | 57638
|
||||
(3 rows)
|
||||
(2 rows)
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype, distributioncolumncollation FROM pg_dist_colocation
|
||||
WHERE colocationid IN
|
||||
|
@ -454,18 +437,16 @@ SELECT
|
|||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
|
||||
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_reference_two')
|
||||
ORDER BY
|
||||
logicalrelid;
|
||||
logicalrelid | partmethod | ?column? | repmodel
|
||||
---------------------------------------------------------------------
|
||||
replicate_reference_table_reference_one | n | t | t
|
||||
replicate_reference_table_hash | n | t | t
|
||||
replicate_reference_table_reference_two | n | t | t
|
||||
(3 rows)
|
||||
(2 rows)
|
||||
|
||||
DROP TABLE replicate_reference_table_reference_one;
|
||||
DROP TABLE replicate_reference_table_hash;
|
||||
DROP TABLE replicate_reference_table_reference_two;
|
||||
-- test inserting a value then adding a new node in a transaction
|
||||
SELECT master_remove_node('localhost', :worker_2_port);
|
||||
|
@ -733,7 +714,7 @@ WHERE
|
|||
ORDER BY 1,4,5;
|
||||
shardid | shardstate | shardlength | nodename | nodeport
|
||||
---------------------------------------------------------------------
|
||||
1370015 | 1 | 0 | localhost | 57637
|
||||
1370014 | 1 | 0 | localhost | 57637
|
||||
(1 row)
|
||||
|
||||
-- we should see the two shard placements after activation
|
||||
|
@ -758,7 +739,7 @@ WHERE
|
|||
ORDER BY 1,4,5;
|
||||
shardid | shardstate | shardlength | nodename | nodeport
|
||||
---------------------------------------------------------------------
|
||||
1370015 | 1 | 0 | localhost | 57637
|
||||
1370014 | 1 | 0 | localhost | 57637
|
||||
(1 row)
|
||||
|
||||
SELECT 1 FROM master_remove_node('localhost', :worker_2_port);
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -1,3 +1,6 @@
|
|||
SHOW server_version \gset
|
||||
SELECT substring(:'server_version', '\d+')::int >= 12 AS have_table_am
|
||||
\gset
|
||||
CREATE TEMPORARY TABLE output (line text);
|
||||
CREATE SCHEMA dumper;
|
||||
SET search_path TO 'dumper';
|
||||
|
@ -26,6 +29,24 @@ COPY data TO STDOUT;
|
|||
4 {}
|
||||
2 {$":9}
|
||||
2 {$":9}
|
||||
\if :have_table_am
|
||||
CREATE TABLE simple_columnar(i INT, t TEXT) USING columnar;
|
||||
\else
|
||||
CREATE TABLE simple_columnar(i INT, t TEXT);
|
||||
\endif
|
||||
INSERT INTO simple_columnar VALUES (1, 'one'), (2, 'two');
|
||||
\if :have_table_am
|
||||
CREATE TABLE dist_columnar(i INT, t TEXT) USING columnar;
|
||||
\else
|
||||
CREATE TABLE dist_columnar(i INT, t TEXT);
|
||||
\endif
|
||||
SELECT create_distributed_table('dist_columnar', 'i');
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO dist_columnar VALUES (1000, 'one thousand'), (2000, 'two thousand');
|
||||
-- go crazy with names
|
||||
CREATE TABLE "weird.table" (
|
||||
"key," int primary key,
|
||||
|
@ -49,8 +70,10 @@ _{"?\"": []}_?__
|
|||
\COPY output FROM PROGRAM 'pg_dump -f results/pg_dump.tmp -h localhost -p 57636 -U postgres -d regression -n dumper --quote-all-identifiers' WITH (format csv, delimiter '|', escape '^', quote '^')
|
||||
-- drop the schema
|
||||
DROP SCHEMA dumper CASCADE;
|
||||
NOTICE: drop cascades to 2 other objects
|
||||
NOTICE: drop cascades to 4 other objects
|
||||
DETAIL: drop cascades to table data
|
||||
drop cascades to table simple_columnar
|
||||
drop cascades to table dist_columnar
|
||||
drop cascades to table "weird.table"
|
||||
-- recreate the schema
|
||||
\COPY (SELECT line FROM output WHERE line IS NOT NULL) TO PROGRAM 'psql -qtAX -h localhost -p 57636 -U postgres -d regression -f results/pg_dump.tmp' WITH (format csv, delimiter '|', escape '^', quote '^')
|
||||
|
@ -76,6 +99,16 @@ HINT: To remove the local data, run: SELECT truncate_local_data_after_distribut
|
|||
|
||||
(1 row)
|
||||
|
||||
SELECT create_distributed_table('dist_columnar', 'i');
|
||||
NOTICE: Copying data from local table...
|
||||
NOTICE: copying the data has completed
|
||||
DETAIL: The local data in the table is no longer visible, but is still on disk.
|
||||
HINT: To remove the local data, run: SELECT truncate_local_data_after_distributing_table($$dumper.dist_columnar$$)
|
||||
create_distributed_table
|
||||
---------------------------------------------------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- check the table contents
|
||||
COPY data (value) TO STDOUT WITH (format csv, force_quote *);
|
||||
"{this:is,json:1}"
|
||||
|
@ -90,6 +123,31 @@ COPY dumper."weird.table" ("data.jsonb", "?empty(") TO STDOUT WITH (format csv,
|
|||
data.jsonb,?empty(
|
||||
"{""weird"": {""table"": ""{:""}}",""
|
||||
"{""?\"""": []}",""
|
||||
-- If server supports table access methods, check to be sure that the
|
||||
-- recreated table is still columnar. Otherwise, just return true.
|
||||
\if :have_table_am
|
||||
\set is_columnar '(SELECT amname=''columnar'' from pg_am where relam=oid)'
|
||||
\else
|
||||
\set is_columnar TRUE
|
||||
\endif
|
||||
SELECT :is_columnar AS check_columnar FROM pg_class WHERE oid='simple_columnar'::regclass;
|
||||
check_columnar
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
COPY simple_columnar TO STDOUT;
|
||||
1 one
|
||||
2 two
|
||||
SELECT :is_columnar AS check_columnar FROM pg_class WHERE oid='dist_columnar'::regclass;
|
||||
check_columnar
|
||||
---------------------------------------------------------------------
|
||||
t
|
||||
(1 row)
|
||||
|
||||
COPY dist_columnar TO STDOUT;
|
||||
1000 one thousand
|
||||
2000 two thousand
|
||||
SELECT indexname FROM pg_indexes WHERE tablename = 'weird.table' ORDER BY indexname;
|
||||
indexname
|
||||
---------------------------------------------------------------------
|
||||
|
@ -98,6 +156,8 @@ SELECT indexname FROM pg_indexes WHERE tablename = 'weird.table' ORDER BY indexn
|
|||
(2 rows)
|
||||
|
||||
DROP SCHEMA dumper CASCADE;
|
||||
NOTICE: drop cascades to 2 other objects
|
||||
NOTICE: drop cascades to 4 other objects
|
||||
DETAIL: drop cascades to table data
|
||||
drop cascades to table dist_columnar
|
||||
drop cascades to table simple_columnar
|
||||
drop cascades to table "weird.table"
|
||||
|
|
|
@ -155,7 +155,6 @@ ORDER BY 1;
|
|||
function truncate_local_data_after_distributing_table(regclass)
|
||||
function undistribute_table(regclass)
|
||||
function update_distributed_table_colocation(regclass,text)
|
||||
function upgrade_to_reference_table(regclass)
|
||||
function worker_append_table_to_shard(text,text,text,integer)
|
||||
function worker_apply_inter_shard_ddl_command(bigint,text,bigint,text,text)
|
||||
function worker_apply_sequence_command(text)
|
||||
|
@ -218,5 +217,5 @@ ORDER BY 1;
|
|||
view citus_tables
|
||||
view citus_worker_stat_activity
|
||||
view pg_dist_shard_placement
|
||||
(202 rows)
|
||||
(201 rows)
|
||||
|
||||
|
|
|
@ -151,7 +151,6 @@ ORDER BY 1;
|
|||
function truncate_local_data_after_distributing_table(regclass)
|
||||
function undistribute_table(regclass)
|
||||
function update_distributed_table_colocation(regclass,text)
|
||||
function upgrade_to_reference_table(regclass)
|
||||
function worker_append_table_to_shard(text,text,text,integer)
|
||||
function worker_apply_inter_shard_ddl_command(bigint,text,bigint,text,text)
|
||||
function worker_apply_sequence_command(text)
|
||||
|
@ -214,5 +213,5 @@ ORDER BY 1;
|
|||
view citus_tables
|
||||
view citus_worker_stat_activity
|
||||
view pg_dist_shard_placement
|
||||
(198 rows)
|
||||
(197 rows)
|
||||
|
||||
|
|
|
@ -304,11 +304,9 @@ test: node_conninfo_reload
|
|||
test: multi_foreign_key multi_foreign_key_relation_graph
|
||||
|
||||
# ----------
|
||||
# multi_upgrade_reference_table tests for upgrade_reference_table UDF
|
||||
# multi_replicate_reference_table tests replicating reference tables to new nodes after we add new nodes
|
||||
# multi_remove_node_reference_table tests metadata changes after master_remove_node
|
||||
# ----------
|
||||
test: multi_upgrade_reference_table
|
||||
test: multi_replicate_reference_table
|
||||
test: multi_remove_node_reference_table
|
||||
|
||||
|
@ -317,8 +315,6 @@ test: multi_remove_node_reference_table
|
|||
# and rerun some of the tests.
|
||||
# --------
|
||||
test: add_coordinator
|
||||
test: multi_upgrade_reference_table
|
||||
test: multi_replicate_reference_table
|
||||
test: multi_reference_table citus_local_tables_queries
|
||||
test: foreign_key_to_reference_table citus_local_table_triggers
|
||||
test: replicate_reference_tables_to_coordinator
|
||||
|
|
|
@ -230,8 +230,6 @@ SELECT mark_tables_colocated('reference_table', ARRAY['citus_local_table_1']);
|
|||
SELECT mark_tables_colocated('citus_local_table_1', ARRAY['distributed_table']);
|
||||
SELECT mark_tables_colocated('distributed_table', ARRAY['citus_local_table_1']);
|
||||
|
||||
-- upgrade_to_reference_table is not supported
|
||||
SELECT upgrade_to_reference_table('citus_local_table_1');
|
||||
-- master_create_empty_shard is not supported
|
||||
SELECT master_create_empty_shard('citus_local_table_1');
|
||||
-- get_shard_id_for_distribution_column is supported
|
||||
|
@ -421,7 +419,6 @@ ROLLBACK;
|
|||
|
||||
-- should fail --
|
||||
|
||||
SELECT upgrade_to_reference_table('citus_local_table_4');
|
||||
SELECT update_distributed_table_colocation('citus_local_table_4', colocate_with => 'none');
|
||||
|
||||
SELECT master_create_worker_shards('citus_local_table_4', 10, 1);
|
||||
|
|
|
@ -220,5 +220,95 @@ BEGIN;
|
|||
|
||||
ROLLBACK;
|
||||
|
||||
CREATE OR REPLACE FUNCTION get_foreign_key_connected_relations(IN table_name regclass)
|
||||
RETURNS SETOF RECORD
|
||||
LANGUAGE C STRICT
|
||||
AS 'citus', $$get_foreign_key_connected_relations$$;
|
||||
COMMENT ON FUNCTION get_foreign_key_connected_relations(IN table_name regclass)
|
||||
IS 'returns relations connected to input relation via a foreign key graph';
|
||||
|
||||
CREATE TABLE distributed_table_1(col int unique);
|
||||
CREATE TABLE distributed_table_2(col int unique);
|
||||
CREATE TABLE distributed_table_3(col int unique);
|
||||
CREATE TABLE distributed_table_4(col int unique);
|
||||
|
||||
SELECT create_distributed_table('distributed_table_1', 'col');
|
||||
SELECT create_distributed_table('distributed_table_2', 'col');
|
||||
SELECT create_distributed_table('distributed_table_3', 'col');
|
||||
|
||||
CREATE TABLE reference_table_1(col int unique);
|
||||
CREATE TABLE reference_table_2(col int unique);
|
||||
|
||||
SELECT create_reference_table('reference_table_1');
|
||||
SELECT create_reference_table('reference_table_2');
|
||||
|
||||
|
||||
-- Now build below foreign key graph:
|
||||
--
|
||||
-- --------------------------------------------
|
||||
-- ^ |
|
||||
-- | v
|
||||
-- distributed_table_1 <- distributed_table_2 -> reference_table_1 <- reference_table_2
|
||||
-- | ^
|
||||
-- | |
|
||||
-- ----------> distributed_table_3
|
||||
|
||||
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_1 FOREIGN KEY (col) REFERENCES distributed_table_1(col);
|
||||
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_2 FOREIGN KEY (col) REFERENCES reference_table_1(col);
|
||||
ALTER TABLE reference_table_2 ADD CONSTRAINT fkey_3 FOREIGN KEY (col) REFERENCES reference_table_1(col);
|
||||
ALTER TABLE distributed_table_3 ADD CONSTRAINT fkey_4 FOREIGN KEY (col) REFERENCES distributed_table_2(col);
|
||||
ALTER TABLE distributed_table_2 ADD CONSTRAINT fkey_5 FOREIGN KEY (col) REFERENCES reference_table_2(col);
|
||||
ALTER TABLE distributed_table_1 ADD CONSTRAINT fkey_6 FOREIGN KEY (col) REFERENCES distributed_table_3(col);
|
||||
|
||||
-- below queries should print all 5 tables mentioned in above graph
|
||||
|
||||
SELECT oid::regclass::text AS tablename
|
||||
FROM get_foreign_key_connected_relations('reference_table_1') AS f(oid oid)
|
||||
ORDER BY tablename;
|
||||
|
||||
SELECT oid::regclass::text AS tablename
|
||||
FROM get_foreign_key_connected_relations('distributed_table_1') AS f(oid oid)
|
||||
ORDER BY tablename;
|
||||
|
||||
-- show that this does not print anything as distributed_table_4
|
||||
-- is not involved in any foreign key relationship
|
||||
SELECT oid::regclass::text AS tablename
|
||||
FROM get_foreign_key_connected_relations('distributed_table_4') AS f(oid oid)
|
||||
ORDER BY tablename;
|
||||
|
||||
ALTER TABLE distributed_table_4 ADD CONSTRAINT fkey_1 FOREIGN KEY (col) REFERENCES distributed_table_4(col);
|
||||
|
||||
-- even if distributed_table_4 has a self referencing foreign key,
|
||||
-- we don't print anything as we only consider foreign key relationships
|
||||
-- with other tables
|
||||
SELECT oid::regclass::text AS tablename
|
||||
FROM get_foreign_key_connected_relations('distributed_table_4') AS f(oid oid)
|
||||
ORDER BY tablename;
|
||||
|
||||
CREATE TABLE local_table_1 (col int unique);
|
||||
CREATE TABLE local_table_2 (col int unique);
|
||||
|
||||
-- show that we do not trigger updating foreign key graph when
|
||||
-- defining/dropping foreign keys between postgres tables
|
||||
|
||||
ALTER TABLE local_table_1 ADD CONSTRAINT fkey_1 FOREIGN KEY (col) REFERENCES local_table_2(col);
|
||||
|
||||
SELECT oid::regclass::text AS tablename
|
||||
FROM get_foreign_key_connected_relations('local_table_2') AS f(oid oid)
|
||||
ORDER BY tablename;
|
||||
|
||||
ALTER TABLE local_table_1 DROP CONSTRAINT fkey_1;
|
||||
|
||||
SELECT oid::regclass::text AS tablename
|
||||
FROM get_foreign_key_connected_relations('local_table_1') AS f(oid oid)
|
||||
ORDER BY tablename;
|
||||
|
||||
-- show that we error out for non-existent tables
|
||||
SELECT oid::regclass::text AS tablename
|
||||
FROM get_foreign_key_connected_relations('non_existent_table') AS f(oid oid)
|
||||
ORDER BY tablename;
|
||||
|
||||
set client_min_messages to error;
|
||||
|
||||
SET search_path TO public;
|
||||
DROP SCHEMA fkey_graph CASCADE;
|
||||
|
|
|
@ -110,7 +110,11 @@ CREATE UNIQUE INDEX try_unique_append_index_a_b ON index_test_append(a,b);
|
|||
CREATE INDEX lineitem_orderkey_index ON lineitem (l_orderkey);
|
||||
CREATE INDEX try_index ON lineitem USING gist (l_orderkey);
|
||||
CREATE INDEX try_index ON lineitem (non_existent_column);
|
||||
|
||||
-- show that we support indexes without names
|
||||
CREATE INDEX ON lineitem (l_orderkey);
|
||||
CREATE UNIQUE INDEX ON index_test_hash(a);
|
||||
CREATE INDEX CONCURRENTLY ON lineitem USING hash (l_shipdate);
|
||||
|
||||
-- Verify that none of failed indexes got created on the master node
|
||||
SELECT * FROM pg_indexes WHERE tablename = 'lineitem' or tablename like 'index_test_%' ORDER BY indexname;
|
||||
|
@ -159,10 +163,10 @@ DROP INDEX CONCURRENTLY lineitem_concurrently_index;
|
|||
|
||||
-- Verify that all the indexes are dropped from the master and one worker node.
|
||||
-- As there's a primary key, so exclude those from this check.
|
||||
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%';
|
||||
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%' ORDER BY 1,2;
|
||||
SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname;
|
||||
\c - - - :worker_1_port
|
||||
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%';
|
||||
SELECT indrelid::regclass, indexrelid::regclass FROM pg_index WHERE indrelid = (SELECT relname FROM pg_class WHERE relname LIKE 'lineitem%' ORDER BY relname LIMIT 1)::regclass AND NOT indisprimary AND indexrelid::regclass::text NOT LIKE 'lineitem_time_index%' ORDER BY 1,2;
|
||||
SELECT * FROM pg_indexes WHERE tablename LIKE 'index_test_%' ORDER BY indexname;
|
||||
|
||||
-- create index that will conflict with master operations
|
||||
|
@ -270,10 +274,52 @@ CREATE INDEX f1
|
|||
ON test_index_creation1 USING btree
|
||||
(field1);
|
||||
|
||||
SELECT
|
||||
'CREATE TABLE distributed_table(' ||
|
||||
string_Agg('col' || x::text || ' int,', ' ') ||
|
||||
' last_column int)'
|
||||
FROM
|
||||
generate_Series(1, 32) x;
|
||||
\gexec
|
||||
|
||||
SELECT create_distributed_table('distributed_table', 'last_column');
|
||||
|
||||
-- try to use all 33 columns to create the index
|
||||
-- show that we error out as postgres would do
|
||||
SELECT
|
||||
'CREATE INDEX ON distributed_table(' ||
|
||||
string_Agg('col' || x::text || ',', ' ') ||
|
||||
' last_column)'
|
||||
FROM
|
||||
generate_Series(1, 32) x;
|
||||
\gexec
|
||||
|
||||
-- show that we generate different default index names
|
||||
-- for the indexes with same parameters on the same relation
|
||||
CREATE INDEX ON distributed_table(last_column);
|
||||
CREATE INDEX ON distributed_table(last_column);
|
||||
SELECT indexrelid::regclass FROM pg_index WHERE indrelid='distributed_table'::regclass ORDER BY indexrelid;
|
||||
|
||||
-- test CREATE INDEX in plpgsql to verify that we don't break parse tree
|
||||
CREATE OR REPLACE FUNCTION create_index_in_plpgsql()
|
||||
RETURNS VOID AS
|
||||
$BODY$
|
||||
BEGIN
|
||||
CREATE INDEX ON distributed_table(last_column);
|
||||
END;
|
||||
$BODY$ LANGUAGE plpgsql;
|
||||
|
||||
-- hide plpgsql messages as they differ across pg versions
|
||||
\set VERBOSITY terse
|
||||
|
||||
SELECT create_index_in_plpgsql();
|
||||
SELECT create_index_in_plpgsql();
|
||||
SELECT create_index_in_plpgsql();
|
||||
SELECT indexrelid::regclass FROM pg_index WHERE indrelid='distributed_table'::regclass ORDER BY indexrelid;
|
||||
|
||||
SET citus.force_max_query_parallelization TO OFF;
|
||||
|
||||
SET client_min_messages TO ERROR;
|
||||
\set VERBOSITY terse
|
||||
DROP INDEX f1;
|
||||
DROP INDEX ix_test_index_creation2;
|
||||
DROP INDEX ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1_ix_test_index_creation1;
|
||||
|
|
|
@ -192,9 +192,6 @@ ABORT;
|
|||
|
||||
SELECT * FROM citus_stat_statements_reset();
|
||||
|
||||
-- should not be allowed to upgrade to reference table
|
||||
SELECT upgrade_to_reference_table('singleshard');
|
||||
|
||||
-- should not be allowed to co-located tables
|
||||
SELECT mark_tables_colocated('test', ARRAY['test_coloc'::regclass]);
|
||||
|
||||
|
@ -339,44 +336,6 @@ CREATE TABLE full_access_user_schema.t2(id int);
|
|||
SELECT create_distributed_table('full_access_user_schema.t2', 'id');
|
||||
RESET ROLE;
|
||||
|
||||
-- a user with all privileges on a schema should be able to upgrade a distributed table to
|
||||
-- a reference table
|
||||
SET ROLE full_access;
|
||||
BEGIN;
|
||||
CREATE TABLE full_access_user_schema.r1(id int);
|
||||
SET LOCAL citus.shard_count TO 1;
|
||||
SELECT create_distributed_table('full_access_user_schema.r1', 'id');
|
||||
SELECT upgrade_to_reference_table('full_access_user_schema.r1');
|
||||
COMMIT;
|
||||
RESET ROLE;
|
||||
|
||||
-- the super user should be able to upgrade a distributed table to a reference table, even
|
||||
-- if it is owned by another user
|
||||
SET ROLE full_access;
|
||||
BEGIN;
|
||||
CREATE TABLE full_access_user_schema.r2(id int);
|
||||
SET LOCAL citus.shard_count TO 1;
|
||||
SELECT create_distributed_table('full_access_user_schema.r2', 'id');
|
||||
COMMIT;
|
||||
RESET ROLE;
|
||||
|
||||
-- the usage_access should not be able to upgrade the table
|
||||
SET ROLE usage_access;
|
||||
SELECT upgrade_to_reference_table('full_access_user_schema.r2');
|
||||
RESET ROLE;
|
||||
|
||||
-- the super user should be able
|
||||
SELECT upgrade_to_reference_table('full_access_user_schema.r2');
|
||||
|
||||
-- verify the owner of the shards for the reference table
|
||||
SELECT result FROM run_command_on_workers($cmd$
|
||||
SELECT tableowner FROM pg_tables WHERE
|
||||
true
|
||||
AND schemaname = 'full_access_user_schema'
|
||||
AND tablename LIKE 'r2_%'
|
||||
LIMIT 1;
|
||||
$cmd$);
|
||||
|
||||
-- super user should be the only one being able to call worker_cleanup_job_schema_cache
|
||||
SELECT worker_cleanup_job_schema_cache();
|
||||
SET ROLE full_access;
|
||||
|
|
|
@ -235,11 +235,6 @@ SELECT create_reference_table('replicate_reference_table_reference_one');
|
|||
SET citus.shard_count TO 1;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
CREATE TABLE replicate_reference_table_hash(column1 int);
|
||||
SELECT create_distributed_table('replicate_reference_table_hash', 'column1');
|
||||
|
||||
-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables
|
||||
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='replicate_reference_table_hash'::regclass;
|
||||
|
||||
CREATE TABLE replicate_reference_table_reference_two(column1 int);
|
||||
|
||||
|
@ -264,12 +259,11 @@ SELECT
|
|||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
|
||||
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_reference_two')
|
||||
ORDER BY logicalrelid;
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
SELECT 1 FROM master_add_node('localhost', :worker_2_port);
|
||||
SELECT upgrade_to_reference_table('replicate_reference_table_hash');
|
||||
SELECT create_reference_table('replicate_reference_table_reference_two');
|
||||
RESET client_min_messages;
|
||||
|
||||
|
@ -293,12 +287,11 @@ SELECT
|
|||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_hash', 'replicate_reference_table_reference_two')
|
||||
logicalrelid IN ('replicate_reference_table_reference_one', 'replicate_reference_table_reference_two')
|
||||
ORDER BY
|
||||
logicalrelid;
|
||||
|
||||
DROP TABLE replicate_reference_table_reference_one;
|
||||
DROP TABLE replicate_reference_table_hash;
|
||||
DROP TABLE replicate_reference_table_reference_two;
|
||||
|
||||
|
||||
|
|
|
@ -1,730 +0,0 @@
|
|||
--
|
||||
-- MULTI_UPGRADE_REFERENCE_TABLE
|
||||
--
|
||||
-- Tests around upgrade_reference_table UDF
|
||||
--
|
||||
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_shardid_seq RESTART 1360000;
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1360000;
|
||||
|
||||
-- We run this twice, once with coordinator node in pg_dist_node and once without.
|
||||
-- Set client_min_messages to WARNING to discard NOTICE messages by
|
||||
-- upgrade_to_reference_table() to make the output consistent in both cases.
|
||||
-- We check that reference table placements were actually replicated by checking
|
||||
-- pg_dist_placement.
|
||||
SET client_min_messages TO WARNING;
|
||||
|
||||
-- test with not distributed table
|
||||
CREATE TABLE upgrade_reference_table_local(column1 int);
|
||||
SELECT upgrade_to_reference_table('upgrade_reference_table_local');
|
||||
DROP TABLE upgrade_reference_table_local;
|
||||
|
||||
-- test with table which has more than one shard
|
||||
SET citus.shard_count TO 4;
|
||||
CREATE TABLE upgrade_reference_table_multiple_shard(column1 int);
|
||||
SELECT create_distributed_table('upgrade_reference_table_multiple_shard', 'column1');
|
||||
SELECT upgrade_to_reference_table('upgrade_reference_table_multiple_shard');
|
||||
DROP TABLE upgrade_reference_table_multiple_shard;
|
||||
|
||||
-- test with table which has no shard
|
||||
CREATE TABLE upgrade_reference_table_no_shard(column1 int);
|
||||
SELECT create_distributed_table('upgrade_reference_table_no_shard', 'column1', 'append');
|
||||
SELECT upgrade_to_reference_table('upgrade_reference_table_no_shard');
|
||||
DROP TABLE upgrade_reference_table_no_shard;
|
||||
|
||||
-- test with table with foreign keys
|
||||
SET citus.shard_count TO 1;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE upgrade_reference_table_referenced(column1 int PRIMARY KEY);
|
||||
SELECT create_distributed_table('upgrade_reference_table_referenced', 'column1');
|
||||
|
||||
CREATE TABLE upgrade_reference_table_referencing(column1 int REFERENCES upgrade_reference_table_referenced(column1));
|
||||
SELECT create_distributed_table('upgrade_reference_table_referencing', 'column1');
|
||||
|
||||
-- update replication model to statement-based replication since streaming replicated tables cannot be upgraded to reference tables
|
||||
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_referenced'::regclass;
|
||||
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_referencing'::regclass;
|
||||
|
||||
SELECT upgrade_to_reference_table('upgrade_reference_table_referenced');
|
||||
SELECT upgrade_to_reference_table('upgrade_reference_table_referencing');
|
||||
|
||||
DROP TABLE upgrade_reference_table_referencing;
|
||||
DROP TABLE upgrade_reference_table_referenced;
|
||||
|
||||
-- test with no healthy placements
|
||||
CREATE TABLE upgrade_reference_table_unhealthy(column1 int);
|
||||
SELECT create_distributed_table('upgrade_reference_table_unhealthy', 'column1');
|
||||
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_unhealthy'::regclass;
|
||||
UPDATE pg_dist_shard_placement SET shardstate = 3
|
||||
WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_unhealthy'::regclass::oid);
|
||||
SELECT upgrade_to_reference_table('upgrade_reference_table_unhealthy');
|
||||
DROP TABLE upgrade_reference_table_unhealthy;
|
||||
|
||||
-- test with table containing composite type
|
||||
CREATE TYPE upgrade_test_composite_type AS (key1 text, key2 text);
|
||||
|
||||
SET citus.shard_count TO 1;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE upgrade_reference_table_composite(column1 int, column2 upgrade_test_composite_type);
|
||||
SELECT create_distributed_table('upgrade_reference_table_composite', 'column1');
|
||||
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_composite'::regclass;
|
||||
SELECT upgrade_to_reference_table('upgrade_reference_table_composite');
|
||||
DROP TABLE upgrade_reference_table_composite;
|
||||
DROP TYPE upgrade_test_composite_type;
|
||||
|
||||
-- test with reference table
|
||||
CREATE TABLE upgrade_reference_table_reference(column1 int);
|
||||
SELECT create_reference_table('upgrade_reference_table_reference');
|
||||
SELECT upgrade_to_reference_table('upgrade_reference_table_reference');
|
||||
DROP TABLE upgrade_reference_table_reference;
|
||||
|
||||
-- test valid cases, append distributed table
|
||||
CREATE TABLE upgrade_reference_table_append(column1 int);
|
||||
SELECT create_distributed_table('upgrade_reference_table_append', 'column1', 'append');
|
||||
COPY upgrade_reference_table_append FROM STDIN;
|
||||
1
|
||||
2
|
||||
3
|
||||
4
|
||||
5
|
||||
\.
|
||||
|
||||
SELECT colocationid AS reference_table_colocationid FROM pg_dist_colocation WHERE distributioncolumntype=0 \gset
|
||||
|
||||
-- situation before upgrade_reference_table
|
||||
SELECT
|
||||
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_append'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_append'::regclass;
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||
FROM pg_dist_colocation
|
||||
WHERE colocationid IN
|
||||
(SELECT colocationid
|
||||
FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass);
|
||||
|
||||
SELECT count(*) active_primaries FROM pg_dist_node WHERE isactive AND noderole='primary' \gset
|
||||
|
||||
SELECT
|
||||
shardid, count(distinct nodeport) = :active_primaries
|
||||
FROM pg_dist_shard_placement
|
||||
WHERE shardid IN
|
||||
(SELECT shardid
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass)
|
||||
GROUP BY shardid
|
||||
ORDER BY shardid;
|
||||
|
||||
SELECT upgrade_to_reference_table('upgrade_reference_table_append');
|
||||
|
||||
-- situation after upgrade_reference_table
|
||||
SELECT
|
||||
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_append'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_append'::regclass;
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||
FROM pg_dist_colocation
|
||||
WHERE colocationid IN
|
||||
(SELECT colocationid
|
||||
FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass);
|
||||
|
||||
SELECT
|
||||
shardid, count(distinct nodeport) = :active_primaries
|
||||
FROM pg_dist_shard_placement
|
||||
WHERE shardid IN
|
||||
(SELECT shardid
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid = 'upgrade_reference_table_append'::regclass)
|
||||
GROUP BY shardid
|
||||
ORDER BY shardid;
|
||||
|
||||
DROP TABLE upgrade_reference_table_append;
|
||||
|
||||
-- test valid cases, shard exists at one worker
|
||||
CREATE TABLE upgrade_reference_table_one_worker(column1 int);
|
||||
SELECT create_distributed_table('upgrade_reference_table_one_worker', 'column1');
|
||||
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_one_worker'::regclass;
|
||||
|
||||
-- situation before upgrade_reference_table
|
||||
SELECT
|
||||
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_one_worker'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_one_worker'::regclass;
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||
FROM pg_dist_colocation
|
||||
WHERE colocationid IN
|
||||
(SELECT colocationid
|
||||
FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
|
||||
|
||||
SELECT
|
||||
shardid, count(distinct nodeport) = :active_primaries
|
||||
FROM pg_dist_shard_placement
|
||||
WHERE shardid IN
|
||||
(SELECT shardid
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass)
|
||||
GROUP BY shardid
|
||||
ORDER BY shardid;
|
||||
|
||||
SELECT upgrade_to_reference_table('upgrade_reference_table_one_worker');
|
||||
|
||||
-- situation after upgrade_reference_table
|
||||
SELECT
|
||||
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_one_worker'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_one_worker'::regclass;
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||
FROM pg_dist_colocation
|
||||
WHERE colocationid IN
|
||||
(SELECT colocationid
|
||||
FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass);
|
||||
|
||||
SELECT
|
||||
shardid, count(distinct nodeport) = :active_primaries
|
||||
FROM pg_dist_shard_placement
|
||||
WHERE shardid IN
|
||||
(SELECT shardid
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid = 'upgrade_reference_table_one_worker'::regclass)
|
||||
GROUP BY shardid
|
||||
ORDER BY shardid;
|
||||
|
||||
DROP TABLE upgrade_reference_table_one_worker;
|
||||
|
||||
-- test valid cases, shard exists at both workers but one is unhealthy
|
||||
SET citus.shard_replication_factor TO 2;
|
||||
CREATE TABLE upgrade_reference_table_one_unhealthy(column1 int);
|
||||
SELECT create_distributed_table('upgrade_reference_table_one_unhealthy', 'column1');
|
||||
UPDATE pg_dist_shard_placement SET shardstate = 3
|
||||
WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass::oid) AND nodeport = :worker_1_port;
|
||||
|
||||
-- situation before upgrade_reference_table
|
||||
SELECT
|
||||
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass;
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||
FROM pg_dist_colocation
|
||||
WHERE colocationid IN
|
||||
(SELECT colocationid
|
||||
FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass);
|
||||
|
||||
SELECT
|
||||
shardid, count(distinct nodeport) = :active_primaries
|
||||
FROM pg_dist_shard_placement
|
||||
WHERE shardid IN
|
||||
(SELECT shardid
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass)
|
||||
AND shardstate = 1
|
||||
GROUP BY shardid
|
||||
ORDER BY shardid;
|
||||
|
||||
SELECT upgrade_to_reference_table('upgrade_reference_table_one_unhealthy');
|
||||
|
||||
-- situation after upgrade_reference_table
|
||||
SELECT
|
||||
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass;
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||
FROM pg_dist_colocation
|
||||
WHERE colocationid IN
|
||||
(SELECT colocationid
|
||||
FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass);
|
||||
|
||||
SELECT
|
||||
shardid, count(distinct nodeport) = :active_primaries
|
||||
FROM pg_dist_shard_placement
|
||||
WHERE shardid IN
|
||||
(SELECT shardid
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid = 'upgrade_reference_table_one_unhealthy'::regclass)
|
||||
AND shardstate = 1
|
||||
GROUP BY shardid
|
||||
ORDER BY shardid;
|
||||
|
||||
DROP TABLE upgrade_reference_table_one_unhealthy;
|
||||
|
||||
-- test valid cases, shard exists at both workers and both are healthy
|
||||
CREATE TABLE upgrade_reference_table_both_healthy(column1 int);
|
||||
SELECT create_distributed_table('upgrade_reference_table_both_healthy', 'column1');
|
||||
|
||||
-- situation before upgrade_reference_table
|
||||
SELECT
|
||||
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_both_healthy'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_both_healthy'::regclass;
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||
FROM pg_dist_colocation
|
||||
WHERE colocationid IN
|
||||
(SELECT colocationid
|
||||
FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass);
|
||||
|
||||
SELECT
|
||||
shardid
|
||||
FROM pg_dist_shard_placement
|
||||
WHERE shardid IN
|
||||
(SELECT shardid
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass)
|
||||
GROUP BY shardid
|
||||
ORDER BY shardid;
|
||||
|
||||
SELECT upgrade_to_reference_table('upgrade_reference_table_both_healthy');
|
||||
|
||||
-- situation after upgrade_reference_table
|
||||
SELECT
|
||||
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_both_healthy'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_both_healthy'::regclass;
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||
FROM pg_dist_colocation
|
||||
WHERE colocationid IN
|
||||
(SELECT colocationid
|
||||
FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass);
|
||||
|
||||
SELECT
|
||||
shardid, count(distinct nodeport) = :active_primaries
|
||||
FROM pg_dist_shard_placement
|
||||
WHERE shardid IN
|
||||
(SELECT shardid
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid = 'upgrade_reference_table_both_healthy'::regclass)
|
||||
GROUP BY shardid
|
||||
ORDER BY shardid;
|
||||
|
||||
DROP TABLE upgrade_reference_table_both_healthy;
|
||||
|
||||
-- test valid cases, do it in transaction and ROLLBACK
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE upgrade_reference_table_transaction_rollback(column1 int);
|
||||
SELECT create_distributed_table('upgrade_reference_table_transaction_rollback', 'column1');
|
||||
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_transaction_rollback'::regclass;
|
||||
|
||||
-- situation before upgrade_reference_table
|
||||
SELECT
|
||||
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass;
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||
FROM pg_dist_colocation
|
||||
WHERE colocationid IN
|
||||
(SELECT colocationid
|
||||
FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
|
||||
|
||||
SELECT
|
||||
shardid, count(distinct nodeport) = :active_primaries
|
||||
FROM pg_dist_shard_placement
|
||||
WHERE shardid IN
|
||||
(SELECT shardid
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass)
|
||||
GROUP BY shardid
|
||||
ORDER BY shardid;
|
||||
|
||||
BEGIN;
|
||||
SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_rollback');
|
||||
ROLLBACK;
|
||||
|
||||
-- situation after upgrade_reference_table
|
||||
SELECT
|
||||
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass;
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||
FROM pg_dist_colocation
|
||||
WHERE colocationid IN
|
||||
(SELECT colocationid
|
||||
FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass);
|
||||
|
||||
|
||||
SELECT
|
||||
shardid, count(distinct nodeport) = :active_primaries
|
||||
FROM pg_dist_shard_placement
|
||||
WHERE shardid IN
|
||||
(SELECT shardid
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid = 'upgrade_reference_table_transaction_rollback'::regclass)
|
||||
GROUP BY shardid
|
||||
ORDER BY shardid;
|
||||
|
||||
DROP TABLE upgrade_reference_table_transaction_rollback;
|
||||
|
||||
-- test valid cases, do it in transaction and COMMIT
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
CREATE TABLE upgrade_reference_table_transaction_commit(column1 int);
|
||||
SELECT create_distributed_table('upgrade_reference_table_transaction_commit', 'column1');
|
||||
UPDATE pg_dist_partition SET repmodel='c' WHERE logicalrelid='upgrade_reference_table_transaction_commit'::regclass;
|
||||
|
||||
-- situation before upgrade_reference_table
|
||||
SELECT
|
||||
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass;
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||
FROM pg_dist_colocation
|
||||
WHERE colocationid IN
|
||||
(SELECT colocationid
|
||||
FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
|
||||
|
||||
SELECT
|
||||
shardid, count(distinct nodeport) = :active_primaries
|
||||
FROM pg_dist_shard_placement
|
||||
WHERE shardid IN
|
||||
(SELECT shardid
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass)
|
||||
GROUP BY shardid
|
||||
ORDER BY shardid;
|
||||
|
||||
BEGIN;
|
||||
SELECT upgrade_to_reference_table('upgrade_reference_table_transaction_commit');
|
||||
COMMIT;
|
||||
|
||||
-- situation after upgrade_reference_table
|
||||
SELECT
|
||||
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass;
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||
FROM pg_dist_colocation
|
||||
WHERE colocationid IN
|
||||
(SELECT colocationid
|
||||
FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass);
|
||||
|
||||
SELECT
|
||||
shardid, count(distinct nodeport) = :active_primaries
|
||||
FROM pg_dist_shard_placement
|
||||
WHERE shardid IN
|
||||
(SELECT shardid
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid = 'upgrade_reference_table_transaction_commit'::regclass)
|
||||
GROUP BY shardid
|
||||
ORDER BY shardid;
|
||||
|
||||
-- verify that shard is replicated to other worker
|
||||
\c - - - :worker_2_port
|
||||
\dt upgrade_reference_table_transaction_commit_*
|
||||
\c - - - :master_port
|
||||
|
||||
DROP TABLE upgrade_reference_table_transaction_commit;
|
||||
|
||||
-- create an mx table
|
||||
SET citus.shard_count TO 1;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
CREATE TABLE upgrade_reference_table_mx(column1 int);
|
||||
SELECT create_distributed_table('upgrade_reference_table_mx', 'column1');
|
||||
|
||||
-- verify that streaming replicated tables cannot be upgraded to reference tables
|
||||
SELECT
|
||||
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_mx'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_mx'::regclass;
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||
FROM pg_dist_colocation
|
||||
WHERE colocationid IN
|
||||
(SELECT colocationid
|
||||
FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
|
||||
|
||||
SELECT
|
||||
shardid
|
||||
FROM pg_dist_shard_placement
|
||||
WHERE shardid IN
|
||||
(SELECT shardid
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
|
||||
GROUP BY shardid
|
||||
ORDER BY shardid;
|
||||
|
||||
|
||||
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
|
||||
|
||||
|
||||
-- situation after upgrade_reference_table
|
||||
SELECT
|
||||
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_mx'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_mx'::regclass;
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||
FROM pg_dist_colocation
|
||||
WHERE colocationid IN
|
||||
(SELECT colocationid
|
||||
FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
|
||||
|
||||
SELECT
|
||||
shardid, count(distinct nodeport) = :active_primaries
|
||||
FROM pg_dist_shard_placement
|
||||
WHERE shardid IN
|
||||
(SELECT shardid
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
|
||||
GROUP BY shardid
|
||||
ORDER BY shardid;
|
||||
|
||||
DROP TABLE upgrade_reference_table_mx;
|
||||
|
||||
-- test valid cases, do it with MX
|
||||
SET citus.shard_count TO 1;
|
||||
SET citus.shard_replication_factor TO 2;
|
||||
RESET citus.replication_model;
|
||||
CREATE TABLE upgrade_reference_table_mx(column1 int);
|
||||
SELECT create_distributed_table('upgrade_reference_table_mx', 'column1');
|
||||
UPDATE pg_dist_shard_placement SET shardstate = 3
|
||||
WHERE nodeport = :worker_2_port AND
|
||||
shardid IN (SELECT shardid FROM pg_dist_shard WHERE logicalrelid='upgrade_reference_table_mx'::regclass);
|
||||
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
|
||||
-- situation before upgrade_reference_table
|
||||
SELECT
|
||||
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_mx'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_mx'::regclass;
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||
FROM pg_dist_colocation
|
||||
WHERE colocationid IN
|
||||
(SELECT colocationid
|
||||
FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
|
||||
|
||||
SELECT
|
||||
shardid
|
||||
FROM pg_dist_shard_placement
|
||||
WHERE shardid IN
|
||||
(SELECT shardid
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
|
||||
GROUP BY shardid
|
||||
ORDER BY shardid;
|
||||
|
||||
SET client_min_messages TO WARNING;
|
||||
SELECT upgrade_to_reference_table('upgrade_reference_table_mx');
|
||||
|
||||
|
||||
-- situation after upgrade_reference_table
|
||||
SELECT
|
||||
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_mx'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_mx'::regclass;
|
||||
|
||||
SELECT shardcount, replicationfactor, distributioncolumntype
|
||||
FROM pg_dist_colocation
|
||||
WHERE colocationid IN
|
||||
(SELECT colocationid
|
||||
FROM pg_dist_partition
|
||||
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass);
|
||||
|
||||
SELECT
|
||||
shardid, count(distinct nodeport) = :active_primaries
|
||||
FROM pg_dist_shard_placement
|
||||
WHERE shardid IN
|
||||
(SELECT shardid
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
|
||||
GROUP BY shardid
|
||||
ORDER BY shardid;
|
||||
|
||||
-- situation on metadata worker
|
||||
\c - - - :worker_1_port
|
||||
SELECT
|
||||
partmethod, (partkey IS NULL) as partkeyisnull, colocationid = :reference_table_colocationid AS has_reference_table_colocation_id, repmodel
|
||||
FROM
|
||||
pg_dist_partition
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_mx'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, (shardminvalue IS NULL) as shardminvalueisnull, (shardmaxvalue IS NULL) as shardmaxvalueisnull
|
||||
FROM
|
||||
pg_dist_shard
|
||||
WHERE
|
||||
logicalrelid = 'upgrade_reference_table_mx'::regclass;
|
||||
|
||||
SELECT
|
||||
shardid, count(distinct nodeport) = :active_primaries
|
||||
FROM pg_dist_shard_placement
|
||||
WHERE shardid IN
|
||||
(SELECT shardid
|
||||
FROM pg_dist_shard
|
||||
WHERE logicalrelid = 'upgrade_reference_table_mx'::regclass)
|
||||
GROUP BY shardid
|
||||
ORDER BY shardid;
|
||||
|
||||
\c - - - :master_port
|
||||
DROP TABLE upgrade_reference_table_mx;
|
||||
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
RESET client_min_messages;
|
|
@ -1,3 +1,7 @@
|
|||
SHOW server_version \gset
|
||||
SELECT substring(:'server_version', '\d+')::int >= 12 AS have_table_am
|
||||
\gset
|
||||
|
||||
CREATE TEMPORARY TABLE output (line text);
|
||||
|
||||
CREATE SCHEMA dumper;
|
||||
|
@ -25,6 +29,24 @@ COPY data FROM STDIN WITH (format csv, delimiter '|', escape '\');
|
|||
-- data should now appear twice
|
||||
COPY data TO STDOUT;
|
||||
|
||||
\if :have_table_am
|
||||
CREATE TABLE simple_columnar(i INT, t TEXT) USING columnar;
|
||||
\else
|
||||
CREATE TABLE simple_columnar(i INT, t TEXT);
|
||||
\endif
|
||||
|
||||
INSERT INTO simple_columnar VALUES (1, 'one'), (2, 'two');
|
||||
|
||||
\if :have_table_am
|
||||
CREATE TABLE dist_columnar(i INT, t TEXT) USING columnar;
|
||||
\else
|
||||
CREATE TABLE dist_columnar(i INT, t TEXT);
|
||||
\endif
|
||||
|
||||
SELECT create_distributed_table('dist_columnar', 'i');
|
||||
|
||||
INSERT INTO dist_columnar VALUES (1000, 'one thousand'), (2000, 'two thousand');
|
||||
|
||||
-- go crazy with names
|
||||
CREATE TABLE "weird.table" (
|
||||
"key," int primary key,
|
||||
|
@ -54,11 +76,28 @@ DROP SCHEMA dumper CASCADE;
|
|||
-- redistribute the schema
|
||||
SELECT create_distributed_table('data', 'key');
|
||||
SELECT create_distributed_table('"weird.table"', 'key,');
|
||||
SELECT create_distributed_table('dist_columnar', 'i');
|
||||
|
||||
-- check the table contents
|
||||
COPY data (value) TO STDOUT WITH (format csv, force_quote *);
|
||||
COPY dumper."weird.table" ("data.jsonb", "?empty(") TO STDOUT WITH (format csv, force_quote ("?empty("), null 'null', header true);
|
||||
|
||||
-- If server supports table access methods, check to be sure that the
|
||||
-- recreated table is still columnar. Otherwise, just return true.
|
||||
\if :have_table_am
|
||||
\set is_columnar '(SELECT amname=''columnar'' from pg_am where relam=oid)'
|
||||
\else
|
||||
\set is_columnar TRUE
|
||||
\endif
|
||||
|
||||
SELECT :is_columnar AS check_columnar FROM pg_class WHERE oid='simple_columnar'::regclass;
|
||||
|
||||
COPY simple_columnar TO STDOUT;
|
||||
|
||||
SELECT :is_columnar AS check_columnar FROM pg_class WHERE oid='dist_columnar'::regclass;
|
||||
|
||||
COPY dist_columnar TO STDOUT;
|
||||
|
||||
SELECT indexname FROM pg_indexes WHERE tablename = 'weird.table' ORDER BY indexname;
|
||||
|
||||
DROP SCHEMA dumper CASCADE;
|
||||
|
|
Loading…
Reference in New Issue