mirror of https://github.com/citusdata/citus.git
Merge pull request #5742 from citusdata/marcocitus/colocation
commit ab614194fd
@@ -28,6 +28,7 @@
#include "catalog/indexing.h"
#include "catalog/pg_am.h"
#include "catalog/pg_attrdef.h"
#include "catalog/pg_collation.h"
#include "catalog/pg_constraint.h"
#include "catalog/pg_depend.h"
#include "catalog/pg_foreign_server.h"
@@ -48,12 +49,14 @@
#include "distributed/maintenanced.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata_utility.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata/pg_dist_object.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
#include "distributed/pg_dist_colocation.h"
#include "distributed/pg_dist_node.h"
#include "distributed/pg_dist_shard.h"
#include "distributed/relation_access_tracking.h"
@@ -124,6 +127,14 @@ static void EnsureShardMetadataIsSane(Oid relationId, int64 shardId, char storag
static void EnsureShardPlacementMetadataIsSane(Oid relationId, int64 shardId,
											   int64 placementId, int32 shardState,
											   int64 shardLength, int32 groupId);
static char * ColocationGroupCreateCommand(uint32 colocationId, int shardCount,
										   int replicationFactor,
										   Oid distributionColumnType,
										   Oid distributionColumnCollation);
static char * ColocationGroupDeleteCommand(uint32 colocationId);
static char * RemoteTypeIdExpression(Oid typeId);
static char * RemoteCollationIdExpression(Oid collationId);


PG_FUNCTION_INFO_V1(start_metadata_sync_to_node);
PG_FUNCTION_INFO_V1(stop_metadata_sync_to_node);
@@ -142,6 +153,8 @@ PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata);
PG_FUNCTION_INFO_V1(citus_internal_update_relation_colocation);
PG_FUNCTION_INFO_V1(citus_internal_add_object_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_colocation_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_colocation_metadata);


static bool got_SIGTERM = false;

@@ -558,6 +571,7 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode)
	dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_PLACEMENTS);
	dropMetadataCommandList = lappend(dropMetadataCommandList,
									  DELETE_ALL_DISTRIBUTED_OBJECTS);
	dropMetadataCommandList = lappend(dropMetadataCommandList, DELETE_ALL_COLOCATION);

	Assert(superuser());
	SendOptionalMetadataCommandListToWorkerInCoordinatedTransaction(
@@ -3161,3 +3175,311 @@ citus_internal_update_relation_colocation(PG_FUNCTION_ARGS)

	PG_RETURN_VOID();
}


/*
 * citus_internal_add_colocation_metadata is an internal UDF to
 * add a row to pg_dist_colocation.
 */
Datum
citus_internal_add_colocation_metadata(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	EnsureSuperUser();

	int colocationId = PG_GETARG_INT32(0);
	int shardCount = PG_GETARG_INT32(1);
	int replicationFactor = PG_GETARG_INT32(2);
	Oid distributionColumnType = PG_GETARG_INT32(3);
	Oid distributionColumnCollation = PG_GETARG_INT32(4);

	if (!ShouldSkipMetadataChecks())
	{
		/* this UDF is not allowed to be executed as a separate command */
		EnsureCoordinatorInitiatedOperation();
	}

	InsertColocationGroupLocally(colocationId, shardCount, replicationFactor,
								 distributionColumnType, distributionColumnCollation);

	PG_RETURN_VOID();
}
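
For illustration, this is roughly the command the coordinator relays to each metadata worker when it records a new colocation group (the argument values here are hypothetical):

SELECT pg_catalog.citus_internal_add_colocation_metadata(4, 2, 2, 'integer'::regtype, 0);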

/*
 * citus_internal_delete_colocation_metadata is an internal UDF to
 * delete a row from pg_dist_colocation.
 */
Datum
citus_internal_delete_colocation_metadata(PG_FUNCTION_ARGS)
{
	CheckCitusVersion(ERROR);
	EnsureSuperUser();

	int colocationId = PG_GETARG_INT32(0);

	if (!ShouldSkipMetadataChecks())
	{
		/* this UDF is not allowed to be executed as a separate command */
		EnsureCoordinatorInitiatedOperation();
	}

	DeleteColocationGroupLocally(colocationId);

	PG_RETURN_VOID();
}


/*
 * SyncNewColocationGroupToNodes synchronizes a new pg_dist_colocation entry to
 * the workers with metadata.
 */
void
SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount, int replicationFactor,
							  Oid distributionColumnType, Oid distributionColumnCollation)
{
	char *command = ColocationGroupCreateCommand(colocationId, shardCount,
												 replicationFactor,
												 distributionColumnType,
												 distributionColumnCollation);

	/*
	 * We require superuser for all pg_dist_colocation operations because we have
	 * no reasonable way of restricting access.
	 */
	SendCommandToWorkersWithMetadataViaSuperUser(command);
}


/*
 * ColocationGroupCreateCommand returns a command for creating a colocation group.
 */
static char *
ColocationGroupCreateCommand(uint32 colocationId, int shardCount, int replicationFactor,
							 Oid distributionColumnType, Oid distributionColumnCollation)
{
	StringInfo insertColocationCommand = makeStringInfo();

	appendStringInfo(insertColocationCommand,
					 "SELECT pg_catalog.citus_internal_add_colocation_metadata("
					 "%d, %d, %d, %s, %s)",
					 colocationId,
					 shardCount,
					 replicationFactor,
					 RemoteTypeIdExpression(distributionColumnType),
					 RemoteCollationIdExpression(distributionColumnCollation));

	return insertColocationCommand->data;
}


/*
 * RemoteTypeIdExpression returns an expression in text form that can
 * be used to obtain the OID of a type on a different node when included
 * in a query string.
 */
static char *
RemoteTypeIdExpression(Oid typeId)
{
	/* by default, use 0 (InvalidOid) */
	char *expression = "0";

	/* we also have pg_dist_colocation entries for reference tables */
	if (typeId != InvalidOid)
	{
		char *typeName = format_type_extended(typeId, -1,
											  FORMAT_TYPE_FORCE_QUALIFY |
											  FORMAT_TYPE_ALLOW_INVALID);

		/* format_type_extended returns ??? in case of an unknown type */
		if (strcmp(typeName, "???") != 0)
		{
			StringInfo regtypeExpression = makeStringInfo();

			appendStringInfo(regtypeExpression,
							 "%s::regtype",
							 quote_literal_cstr(typeName));

			expression = regtypeExpression->data;
		}
	}

	return expression;
}
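
As a sketch of how the generated expression behaves on the receiving node, casting the quoted type name to regtype resolves the local type OID:

SELECT 'integer'::regtype::oid;  -- resolves to 23, the OID of int4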

/*
 * RemoteCollationIdExpression returns an expression in text form that can
 * be used to obtain the OID of a collation on a different node when included
 * in a query string. Currently this is a sublink because the regcollation type
 * is not available in PG12.
 */
static char *
RemoteCollationIdExpression(Oid collationId)
{
	/* by default, use 0 (InvalidOid) */
	char *expression = "0";

	if (collationId != InvalidOid)
	{
		Datum collationIdDatum = ObjectIdGetDatum(collationId);
		HeapTuple collationTuple = SearchSysCache1(COLLOID, collationIdDatum);

		if (HeapTupleIsValid(collationTuple))
		{
			Form_pg_collation collationform =
				(Form_pg_collation) GETSTRUCT(collationTuple);
			char *collationName = NameStr(collationform->collname);
			char *collationSchemaName = get_namespace_name(collationform->collnamespace);

			StringInfo collationIdQuery = makeStringInfo();
			appendStringInfo(collationIdQuery,
							 "(select oid from pg_collation"
							 " where collname = %s"
							 " and collnamespace = %s::regnamespace)",
							 quote_literal_cstr(collationName),
							 quote_literal_cstr(collationSchemaName));

			expression = collationIdQuery->data;

			ReleaseSysCache(collationTuple);
		}
	}

	return expression;
}
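
A minimal sketch of the sublink this produces, assuming a hypothetical distribution column collation named "de_DE" in schema pg_catalog; the subselect resolves the collation OID locally on the worker:

(select oid from pg_collation
 where collname = 'de_DE'
 and collnamespace = 'pg_catalog'::regnamespace)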

/*
 * SyncDeleteColocationGroupToNodes deletes a pg_dist_colocation record from workers.
 */
void
SyncDeleteColocationGroupToNodes(uint32 colocationId)
{
	char *command = ColocationGroupDeleteCommand(colocationId);

	/*
	 * We require superuser for all pg_dist_colocation operations because we have
	 * no reasonable way of restricting access.
	 */
	SendCommandToWorkersWithMetadataViaSuperUser(command);
}


/*
 * ColocationGroupDeleteCommand returns a command for deleting a colocation group.
 */
static char *
ColocationGroupDeleteCommand(uint32 colocationId)
{
	StringInfo deleteColocationCommand = makeStringInfo();

	appendStringInfo(deleteColocationCommand,
					 "SELECT pg_catalog.citus_internal_delete_colocation_metadata(%d)",
					 colocationId);

	return deleteColocationCommand->data;
}
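
The resulting command is a single UDF call; for a colocation group with a hypothetical ID of 4 it would read:

SELECT pg_catalog.citus_internal_delete_colocation_metadata(4);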

/*
 * ColocationGroupCreateCommandList returns the full list of commands for syncing
 * pg_dist_colocation.
 */
List *
ColocationGroupCreateCommandList(void)
{
	bool hasColocations = false;

	StringInfo colocationGroupCreateCommand = makeStringInfo();
	appendStringInfo(colocationGroupCreateCommand,
					 "WITH colocation_group_data (colocationid, shardcount, "
					 "replicationfactor, distributioncolumntype, "
					 "distributioncolumncollationname, "
					 "distributioncolumncollationschema) AS (VALUES ");

	Relation pgDistColocation = table_open(DistColocationRelationId(), AccessShareLock);

	bool indexOK = false;
	SysScanDesc scanDescriptor = systable_beginscan(pgDistColocation, InvalidOid, indexOK,
													NULL, 0, NULL);

	HeapTuple colocationTuple = systable_getnext(scanDescriptor);

	while (HeapTupleIsValid(colocationTuple))
	{
		if (hasColocations)
		{
			appendStringInfo(colocationGroupCreateCommand, ", ");
		}

		hasColocations = true;

		Form_pg_dist_colocation colocationForm =
			(Form_pg_dist_colocation) GETSTRUCT(colocationTuple);

		appendStringInfo(colocationGroupCreateCommand,
						 "(%d, %d, %d, %s, ",
						 colocationForm->colocationid,
						 colocationForm->shardcount,
						 colocationForm->replicationfactor,
						 RemoteTypeIdExpression(colocationForm->distributioncolumntype));

		/*
		 * For collations, include the names in the VALUES section and then
		 * join with pg_collation.
		 */
		Oid distributionColumnCollation = colocationForm->distributioncolumncollation;
		if (distributionColumnCollation != InvalidOid)
		{
			Datum collationIdDatum = ObjectIdGetDatum(distributionColumnCollation);
			HeapTuple collationTuple = SearchSysCache1(COLLOID, collationIdDatum);

			if (HeapTupleIsValid(collationTuple))
			{
				Form_pg_collation collationform =
					(Form_pg_collation) GETSTRUCT(collationTuple);
				char *collationName = NameStr(collationform->collname);
				char *collationSchemaName = get_namespace_name(
					collationform->collnamespace);

				appendStringInfo(colocationGroupCreateCommand,
								 "%s, %s)",
								 quote_literal_cstr(collationName),
								 quote_literal_cstr(collationSchemaName));

				ReleaseSysCache(collationTuple);
			}
			else
			{
				appendStringInfo(colocationGroupCreateCommand,
								 "NULL, NULL)");
			}
		}
		else
		{
			appendStringInfo(colocationGroupCreateCommand,
							 "NULL, NULL)");
		}

		colocationTuple = systable_getnext(scanDescriptor);
	}

	systable_endscan(scanDescriptor);
	table_close(pgDistColocation, AccessShareLock);

	if (!hasColocations)
	{
		return NIL;
	}

	appendStringInfo(colocationGroupCreateCommand,
					 ") SELECT pg_catalog.citus_internal_add_colocation_metadata("
					 "colocationid, shardcount, replicationfactor, "
					 "distributioncolumntype, coalesce(c.oid, 0)) "
					 "FROM colocation_group_data d LEFT JOIN pg_collation c "
					 "ON (d.distributioncolumncollationname = c.collname "
					 "AND d.distributioncolumncollationschema::regnamespace"
					 " = c.collnamespace)");

	return list_make1(colocationGroupCreateCommand->data);
}
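
Pieced together, the command this function returns has roughly the following shape (shortened to a single colocation group; the concrete row values depend on the local pg_dist_colocation contents):

WITH colocation_group_data (colocationid, shardcount, replicationfactor,
    distributioncolumntype, distributioncolumncollationname,
    distributioncolumncollationschema) AS
  (VALUES (2, 8, 1, 'integer'::regtype, NULL, NULL))
SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount,
    replicationfactor, distributioncolumntype, coalesce(c.oid, 0))
FROM colocation_group_data d
LEFT JOIN pg_collation c
  ON (d.distributioncolumncollationname = c.collname
  AND d.distributioncolumncollationschema::regnamespace = c.collnamespace);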
@@ -653,6 +653,8 @@ PgDistTableMetadataSyncCommandList(void)
										  DELETE_ALL_PLACEMENTS);
	metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
										  DELETE_ALL_DISTRIBUTED_OBJECTS);
	metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
										  DELETE_ALL_COLOCATION);

	/* create pg_dist_partition, pg_dist_shard and pg_dist_placement entries */
	foreach_ptr(cacheEntry, propagatedTableList)
@@ -664,6 +666,11 @@ PgDistTableMetadataSyncCommandList(void)
											  tableMetadataCreateCommandList);
	}

	/* commands to insert pg_dist_colocation entries */
	List *colocationGroupSyncCommandList = ColocationGroupCreateCommandList();
	metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
											  colocationGroupSyncCommandList);

	/* As the last step, propagate the pg_dist_object entities */
	Assert(ShouldPropagate());
	List *distributedObjectSyncCommandList = DistributedObjectMetadataSyncCommandList();

@@ -10,6 +10,8 @@
#include "udfs/citus_shard_indexes_on_worker/11.0-1.sql"

#include "udfs/citus_internal_add_object_metadata/11.0-1.sql"
#include "udfs/citus_internal_add_colocation_metadata/11.0-1.sql"
#include "udfs/citus_internal_delete_colocation_metadata/11.0-1.sql"
#include "udfs/citus_run_local_command/11.0-1.sql"
#include "udfs/worker_drop_sequence_dependency/11.0-1.sql"
#include "udfs/worker_drop_shell_table/11.0-1.sql"

@@ -50,6 +50,8 @@ DROP FUNCTION pg_catalog.citus_check_connection_to_node (text, integer);
DROP FUNCTION pg_catalog.citus_check_cluster_node_health ();

DROP FUNCTION pg_catalog.citus_internal_add_object_metadata(text, text[], text[], integer, integer, boolean);
DROP FUNCTION pg_catalog.citus_internal_add_colocation_metadata(int, int, int, regtype, oid);
DROP FUNCTION pg_catalog.citus_internal_delete_colocation_metadata(int);
DROP FUNCTION pg_catalog.citus_run_local_command(text);
DROP FUNCTION pg_catalog.worker_drop_sequence_dependency(text);
DROP FUNCTION pg_catalog.worker_drop_shell_table(table_name text);
src/backend/distributed/sql/udfs/citus_internal_add_colocation_metadata/11.0-1.sql (new file, 13 lines, generated)
@@ -0,0 +1,13 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_colocation_metadata(
                            colocation_id int,
                            shard_count int,
                            replication_factor int,
                            distribution_column_type regtype,
                            distribution_column_collation oid)
    RETURNS void
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME';

COMMENT ON FUNCTION pg_catalog.citus_internal_add_colocation_metadata(int,int,int,regtype,oid) IS
    'Inserts a co-location group into pg_dist_colocation';
src/backend/distributed/sql/udfs/citus_internal_delete_colocation_metadata/11.0-1.sql (new file, 9 lines, generated)
@@ -0,0 +1,9 @@
CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_delete_colocation_metadata(
                            colocation_id int)
    RETURNS void
    LANGUAGE C
    STRICT
    AS 'MODULE_PATHNAME';

COMMENT ON FUNCTION pg_catalog.citus_internal_delete_colocation_metadata(int) IS
    'deletes a co-location group from pg_dist_colocation';
@@ -583,6 +583,25 @@ CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionCol
							Oid distributionColumnCollation)
{
	uint32 colocationId = GetNextColocationId();

	InsertColocationGroupLocally(colocationId, shardCount, replicationFactor,
								 distributionColumnType, distributionColumnCollation);

	SyncNewColocationGroupToNodes(colocationId, shardCount, replicationFactor,
								  distributionColumnType, distributionColumnCollation);

	return colocationId;
}


/*
 * InsertColocationGroupLocally inserts a record into pg_dist_colocation.
 */
void
InsertColocationGroupLocally(uint32 colocationId, int shardCount, int replicationFactor,
							 Oid distributionColumnType,
							 Oid distributionColumnCollation)
{
	Datum values[Natts_pg_dist_colocation];
	bool isNulls[Natts_pg_dist_colocation];

@@ -610,8 +629,6 @@ CreateColocationGroup(int shardCount, int replicationFactor, Oid distributionCol
	/* increment the counter so that next command can see the row */
	CommandCounterIncrement();
	table_close(pgDistColocation, RowExclusiveLock);

	return colocationId;
}


@@ -1215,10 +1232,22 @@ DeleteColocationGroupIfNoTablesBelong(uint32 colocationId)


/*
 * DeleteColocationGroup deletes the colocation group from pg_dist_colocation
 * throughout the cluster.
 */
static void
DeleteColocationGroup(uint32 colocationId)
{
	DeleteColocationGroupLocally(colocationId);
	SyncDeleteColocationGroupToNodes(colocationId);
}


/*
 * DeleteColocationGroupLocally deletes the colocation group from pg_dist_colocation.
 */
void
DeleteColocationGroupLocally(uint32 colocationId)
{
	int scanKeyCount = 1;
	ScanKeyData scanKey[1];
@@ -31,6 +31,10 @@ uint32 ColocationId(int shardCount, int replicationFactor, Oid distributionColum
extern uint32 CreateColocationGroup(int shardCount, int replicationFactor,
									Oid distributionColumnType,
									Oid distributionColumnCollation);
extern void InsertColocationGroupLocally(uint32 colocationId, int shardCount,
										 int replicationFactor,
										 Oid distributionColumnType,
										 Oid distributionColumnCollation);
extern bool IsColocateWithNone(char *colocateWithTableName);
extern uint32 GetNextColocationId(void);
extern void ErrorIfShardPlacementsNotColocated(Oid leftRelationId, Oid rightRelationId);
@@ -43,5 +47,6 @@ extern void UpdateRelationColocationGroup(Oid distributedRelationId, uint32 colo
										  bool localOnly);
extern void DeleteColocationGroupIfNoTablesBelong(uint32 colocationId);
extern List * ColocationGroupTableList(uint32 colocationId, uint32 count);
extern void DeleteColocationGroupLocally(uint32 colocationId);

#endif /* COLOCATION_UTILS_H_ */


@@ -38,6 +38,7 @@ extern bool ShouldSyncTableMetadata(Oid relationId);
extern bool ShouldSyncTableMetadataViaCatalog(Oid relationId);
extern List * NodeMetadataCreateCommands(void);
extern List * DistributedObjectMetadataSyncCommandList(void);
extern List * ColocationGroupCreateCommandList(void);
extern List * CitusTableMetadataCreateCommandList(Oid relationId);
extern List * NodeMetadataDropCommands(void);
extern char * MarkObjectsDistributedCreateCommand(List *addresses,
@@ -76,12 +77,18 @@ extern void GetDependentSequencesWithRelation(Oid relationId, List **attnumList,
extern List * GetDependentFunctionsWithRelation(Oid relationId);
extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum);
extern void SetLocalEnableMetadataSync(bool state);
extern void SyncNewColocationGroupToNodes(uint32 colocationId, int shardCount,
										  int replicationFactor,
										  Oid distributionColumnType,
										  Oid distributionColumnCollation);
extern void SyncDeleteColocationGroupToNodes(uint32 colocationId);

#define DELETE_ALL_NODES "DELETE FROM pg_dist_node"
#define DELETE_ALL_PLACEMENTS "DELETE FROM pg_dist_placement"
#define DELETE_ALL_SHARDS "DELETE FROM pg_dist_shard"
#define DELETE_ALL_DISTRIBUTED_OBJECTS "DELETE FROM citus.pg_dist_object"
#define DELETE_ALL_PARTITIONS "DELETE FROM pg_dist_partition"
#define DELETE_ALL_COLOCATION "DELETE FROM pg_catalog.pg_dist_colocation"
#define REMOVE_ALL_SHELL_TABLES_COMMAND \
	"SELECT worker_drop_shell_table(logicalrelid::regclass::text) FROM pg_dist_partition"
#define REMOVE_ALL_CITUS_TABLES_COMMAND \
@ -101,6 +101,12 @@ SELECT * FROM rebalance_table_shards();
|
|||
-- TODO: Figure out why this is necessary, rebalance_table_shards shouldn't
|
||||
-- insert stuff into pg_dist_colocation
|
||||
TRUNCATE pg_dist_colocation;
|
||||
SELECT run_command_on_workers('TRUNCATE pg_dist_colocation');
|
||||
run_command_on_workers
|
||||
---------------------------------------------------------------------
|
||||
(localhost,57637,t,"TRUNCATE TABLE")
|
||||
(1 row)
|
||||
|
||||
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000;
|
||||
SELECT 1 FROM citus_activate_node('localhost', :worker_2_port);
|
||||
?column?
|
||||
|
|
|
@ -2,6 +2,13 @@ SET citus.next_shard_id TO 1300000;
|
|||
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 4;
|
||||
-- Delete orphaned entries from pg_dist_colocation
|
||||
DELETE FROM pg_dist_colocation where colocationid = 5 or colocationid = 6;
|
||||
SELECT 1 FROM run_command_on_workers('DELETE FROM pg_dist_colocation where colocationid = 5 or colocationid = 6');
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
1
|
||||
(2 rows)
|
||||
|
||||
-- ===================================================================
|
||||
-- create test utility function
|
||||
-- ===================================================================
|
||||
|
@ -356,6 +363,13 @@ SELECT count(*) FROM pg_dist_partition WHERE colocationid IN (4, 5);
|
|||
(1 row)
|
||||
|
||||
DELETE FROM pg_dist_colocation WHERE colocationid IN (4, 5);
|
||||
SELECT 1 FROM run_command_on_workers('DELETE FROM pg_dist_colocation WHERE colocationid IN (4, 5)');
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
1
|
||||
(2 rows)
|
||||
|
||||
SET citus.shard_count = 2;
|
||||
CREATE TABLE table1_groupA ( id int );
|
||||
SELECT create_distributed_table('table1_groupA', 'id');
|
||||
|
@ -449,6 +463,23 @@ SELECT * FROM pg_dist_colocation
|
|||
7 | 8 | 2 | 23 | 0
|
||||
(4 rows)
|
||||
|
||||
-- check to see whether metadata is synced
|
||||
SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$
|
||||
SELECT array_agg(row_to_json(c) ORDER BY colocationid) FROM pg_dist_colocation c
|
||||
WHERE colocationid >= 1 AND colocationid < 1000
|
||||
$$);
|
||||
nodeport | unnest
|
||||
---------------------------------------------------------------------
|
||||
57637 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57637 | {"shardcount": 2, "colocationid": 5, "replicationfactor": 1, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57637 | {"shardcount": 2, "colocationid": 6, "replicationfactor": 2, "distributioncolumntype": "25", "distributioncolumncollation": "100"}
|
||||
57637 | {"shardcount": 8, "colocationid": 7, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57638 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57638 | {"shardcount": 2, "colocationid": 5, "replicationfactor": 1, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57638 | {"shardcount": 2, "colocationid": 6, "replicationfactor": 2, "distributioncolumntype": "25", "distributioncolumncollation": "100"}
|
||||
57638 | {"shardcount": 8, "colocationid": 7, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
(8 rows)
|
||||
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||
WHERE colocationid >= 1 AND colocationid < 1000
|
||||
ORDER BY logicalrelid;
|
||||
|
@ -472,6 +503,16 @@ SELECT * FROM pg_dist_colocation WHERE colocationid = 4;
|
|||
4 | 2 | 2 | 23 | 0
|
||||
(1 row)
|
||||
|
||||
-- check to see whether metadata is synced
|
||||
SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$
|
||||
SELECT array_agg(row_to_json(c)) FROM pg_dist_colocation c WHERE colocationid = 4
|
||||
$$);
|
||||
nodeport | unnest
|
||||
---------------------------------------------------------------------
|
||||
57637 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57638 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
(2 rows)
|
||||
|
||||
-- dropping all tables in a colocation group also deletes the colocation group
|
||||
DROP TABLE table2_groupA;
|
||||
SELECT * FROM pg_dist_colocation WHERE colocationid = 4;
|
||||
|
@ -480,6 +521,16 @@ SELECT * FROM pg_dist_colocation WHERE colocationid = 4;
|
|||
4 | 2 | 2 | 23 | 0
|
||||
(1 row)
|
||||
|
||||
-- check to see whether metadata is synced
|
||||
SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$
|
||||
SELECT array_agg(row_to_json(c)) FROM pg_dist_colocation c WHERE colocationid = 4
|
||||
$$);
|
||||
nodeport | unnest
|
||||
---------------------------------------------------------------------
|
||||
57637 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57638 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
(2 rows)
|
||||
|
||||
-- create dropped colocation group again
|
||||
SET citus.shard_count = 2;
|
||||
CREATE TABLE table1_groupE ( id int );
|
||||
|
@ -820,6 +871,13 @@ ORDER BY
|
|||
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1;
|
||||
DELETE FROM pg_dist_colocation
|
||||
WHERE colocationid >= 1 AND colocationid < 1000;
|
||||
SELECT 1 FROM run_command_on_workers('DELETE FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000');
|
||||
?column?
|
||||
---------------------------------------------------------------------
|
||||
1
|
||||
1
|
||||
(2 rows)
|
||||
|
||||
UPDATE pg_dist_partition SET colocationid = 0
|
||||
WHERE colocationid >= 1 AND colocationid < 1000;
|
||||
-- check metadata
|
||||
|
@ -933,6 +991,25 @@ SELECT * FROM pg_dist_colocation
|
|||
5 | 2 | 2 | 23 | 0
|
||||
(5 rows)
|
||||
|
||||
-- check to see whether metadata is synced
|
||||
SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$
|
||||
SELECT array_agg(row_to_json(c) ORDER BY colocationid) FROM pg_dist_colocation c
|
||||
WHERE colocationid >= 1 AND colocationid < 1000
|
||||
$$);
|
||||
nodeport | unnest
|
||||
---------------------------------------------------------------------
|
||||
57637 | {"shardcount": 2, "colocationid": 1, "replicationfactor": 1, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57637 | {"shardcount": 2, "colocationid": 2, "replicationfactor": 2, "distributioncolumntype": "25", "distributioncolumncollation": "100"}
|
||||
57637 | {"shardcount": 8, "colocationid": 3, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57637 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57637 | {"shardcount": 2, "colocationid": 5, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57638 | {"shardcount": 2, "colocationid": 1, "replicationfactor": 1, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57638 | {"shardcount": 2, "colocationid": 2, "replicationfactor": 2, "distributioncolumntype": "25", "distributioncolumncollation": "100"}
|
||||
57638 | {"shardcount": 8, "colocationid": 3, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57638 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57638 | {"shardcount": 2, "colocationid": 5, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
(10 rows)
|
||||
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||
WHERE colocationid >= 1 AND colocationid < 1000
|
||||
ORDER BY colocationid, logicalrelid;
|
||||
|
@ -1008,6 +1085,25 @@ SELECT * FROM pg_dist_colocation
|
|||
5 | 2 | 2 | 23 | 0
|
||||
(5 rows)
|
||||
|
||||
-- check to see whether metadata is synced
|
||||
SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$
|
||||
SELECT array_agg(row_to_json(c) ORDER BY colocationid) FROM pg_dist_colocation c
|
||||
WHERE colocationid >= 1 AND colocationid < 1000
|
||||
$$);
|
||||
nodeport | unnest
|
||||
---------------------------------------------------------------------
|
||||
57637 | {"shardcount": 2, "colocationid": 1, "replicationfactor": 1, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57637 | {"shardcount": 2, "colocationid": 2, "replicationfactor": 2, "distributioncolumntype": "25", "distributioncolumncollation": "100"}
|
||||
57637 | {"shardcount": 8, "colocationid": 3, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57637 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57637 | {"shardcount": 2, "colocationid": 5, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57638 | {"shardcount": 2, "colocationid": 1, "replicationfactor": 1, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57638 | {"shardcount": 2, "colocationid": 2, "replicationfactor": 2, "distributioncolumntype": "25", "distributioncolumncollation": "100"}
|
||||
57638 | {"shardcount": 8, "colocationid": 3, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57638 | {"shardcount": 2, "colocationid": 4, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
57638 | {"shardcount": 2, "colocationid": 5, "replicationfactor": 2, "distributioncolumntype": "23", "distributioncolumncollation": "0"}
|
||||
(10 rows)
|
||||
|
||||
SELECT logicalrelid, colocationid FROM pg_dist_partition
|
||||
WHERE colocationid >= 1 AND colocationid < 1000
|
||||
ORDER BY colocationid, logicalrelid;
|
||||
|
|
|
@ -428,20 +428,20 @@ SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDE
|
|||
ALTER EXTENSION citus UPDATE TO '9.4-2';
|
||||
-- should see the old source code
|
||||
SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1;
|
||||
prosrc
|
||||
prosrc
|
||||
---------------------------------------------------------------------
|
||||
+
|
||||
DECLARE +
|
||||
colocated_tables regclass[]; +
|
||||
BEGIN +
|
||||
SELECT get_colocated_table_array(relation) INTO colocated_tables;+
|
||||
PERFORM +
|
||||
master_update_shard_statistics(shardid) +
|
||||
FROM +
|
||||
pg_dist_shard +
|
||||
WHERE +
|
||||
logicalrelid = ANY (colocated_tables); +
|
||||
END; +
|
||||
+
|
||||
DECLARE +
|
||||
colocated_tables regclass[]; +
|
||||
BEGIN +
|
||||
SELECT get_colocated_table_array(relation) INTO colocated_tables;+
|
||||
PERFORM +
|
||||
master_update_shard_statistics(shardid) +
|
||||
FROM +
|
||||
pg_dist_shard +
|
||||
WHERE +
|
||||
logicalrelid = ANY (colocated_tables); +
|
||||
END; +
|
||||
|
||||
(1 row)
|
||||
|
||||
|
@ -469,20 +469,20 @@ SELECT * FROM multi_extension.print_extension_changes();
|
|||
ALTER EXTENSION citus UPDATE TO '9.4-1';
|
||||
-- should see the old source code
|
||||
SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1;
|
||||
prosrc
|
||||
prosrc
|
||||
---------------------------------------------------------------------
|
||||
+
|
||||
DECLARE +
|
||||
colocated_tables regclass[]; +
|
||||
BEGIN +
|
||||
SELECT get_colocated_table_array(relation) INTO colocated_tables;+
|
||||
PERFORM +
|
||||
master_update_shard_statistics(shardid) +
|
||||
FROM +
|
||||
pg_dist_shard +
|
||||
WHERE +
|
||||
logicalrelid = ANY (colocated_tables); +
|
||||
END; +
|
||||
+
|
||||
DECLARE +
|
||||
colocated_tables regclass[]; +
|
||||
BEGIN +
|
||||
SELECT get_colocated_table_array(relation) INTO colocated_tables;+
|
||||
PERFORM +
|
||||
master_update_shard_statistics(shardid) +
|
||||
FROM +
|
||||
pg_dist_shard +
|
||||
WHERE +
|
||||
logicalrelid = ANY (colocated_tables); +
|
||||
END; +
|
||||
|
||||
(1 row)
|
||||
|
||||
|
@ -578,20 +578,20 @@ SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDE
|
|||
ALTER EXTENSION citus UPDATE TO '9.5-2';
|
||||
-- should see the old source code
|
||||
SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1;
|
||||
prosrc
|
||||
prosrc
|
||||
---------------------------------------------------------------------
|
||||
+
|
||||
DECLARE +
|
||||
colocated_tables regclass[]; +
|
||||
BEGIN +
|
||||
SELECT get_colocated_table_array(relation) INTO colocated_tables;+
|
||||
PERFORM +
|
||||
master_update_shard_statistics(shardid) +
|
||||
FROM +
|
||||
pg_dist_shard +
|
||||
WHERE +
|
||||
logicalrelid = ANY (colocated_tables); +
|
||||
END; +
|
||||
+
|
||||
DECLARE +
|
||||
colocated_tables regclass[]; +
|
||||
BEGIN +
|
||||
SELECT get_colocated_table_array(relation) INTO colocated_tables;+
|
||||
PERFORM +
|
||||
master_update_shard_statistics(shardid) +
|
||||
FROM +
|
||||
pg_dist_shard +
|
||||
WHERE +
|
||||
logicalrelid = ANY (colocated_tables); +
|
||||
END; +
|
||||
|
||||
(1 row)
|
||||
|
||||
|
@ -619,20 +619,20 @@ SELECT * FROM multi_extension.print_extension_changes();
|
|||
ALTER EXTENSION citus UPDATE TO '9.5-1';
|
||||
-- should see the old source code
|
||||
SELECT prosrc FROM pg_proc WHERE proname = 'master_update_table_statistics' ORDER BY 1;
|
||||
prosrc
|
||||
prosrc
|
||||
---------------------------------------------------------------------
|
||||
+
|
||||
DECLARE +
|
||||
colocated_tables regclass[]; +
|
||||
BEGIN +
|
||||
SELECT get_colocated_table_array(relation) INTO colocated_tables;+
|
||||
PERFORM +
|
||||
master_update_shard_statistics(shardid) +
|
||||
FROM +
|
||||
pg_dist_shard +
|
||||
WHERE +
|
||||
logicalrelid = ANY (colocated_tables); +
|
||||
END; +
|
||||
+
|
||||
DECLARE +
|
||||
colocated_tables regclass[]; +
|
||||
BEGIN +
|
||||
SELECT get_colocated_table_array(relation) INTO colocated_tables;+
|
||||
PERFORM +
|
||||
master_update_shard_statistics(shardid) +
|
||||
FROM +
|
||||
pg_dist_shard +
|
||||
WHERE +
|
||||
logicalrelid = ANY (colocated_tables); +
|
||||
END; +
|
||||
|
||||
(1 row)
|
||||
|
||||
|
@ -1013,7 +1013,9 @@ SELECT * FROM multi_extension.print_extension_changes();
|
|||
| function citus_check_connection_to_node(text,integer) boolean
|
||||
| function citus_disable_node(text,integer,boolean) void
|
||||
| function citus_finalize_upgrade_to_citus11(boolean) boolean
|
||||
| function citus_internal_add_colocation_metadata(integer,integer,integer,regtype,oid) void
|
||||
| function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean) void
|
||||
| function citus_internal_delete_colocation_metadata(integer) void
|
||||
| function citus_internal_global_blocked_processes() SETOF record
|
||||
| function citus_internal_local_blocked_processes() SETOF record
|
||||
| function citus_run_local_command(text) void
|
||||
|
@ -1026,7 +1028,7 @@ SELECT * FROM multi_extension.print_extension_changes();
|
|||
| function worker_drop_sequence_dependency(text) void
|
||||
| function worker_drop_shell_table(text) void
|
||||
| function worker_partition_query_result(text,text,integer,citus.distribution_type,text[],text[],boolean,boolean,boolean) SETOF record
|
||||
(23 rows)
|
||||
(25 rows)
|
||||
|
||||
DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
|
||||
-- show running version
|
||||
|
|
|
@ -59,10 +59,11 @@ ALTER ROLE CURRENT_USER WITH PASSWORD 'dummypassword';
|
|||
-- Show that, with no MX tables, activate node snapshot contains only the delete commands,
|
||||
-- pg_dist_node entries, pg_dist_object entries and roles.
|
||||
SELECT unnest(activate_node_snapshot()) order by 1;
|
||||
unnest
|
||||
unnest
|
||||
---------------------------------------------------------------------
|
||||
CREATE SCHEMA IF NOT EXISTS public AUTHORIZATION postgres
|
||||
DELETE FROM citus.pg_dist_object
|
||||
DELETE FROM pg_catalog.pg_dist_colocation
|
||||
DELETE FROM pg_dist_node
|
||||
DELETE FROM pg_dist_partition
|
||||
DELETE FROM pg_dist_placement
|
||||
|
@ -87,8 +88,9 @@ SELECT unnest(activate_node_snapshot()) order by 1;
|
|||
SET citus.enable_ddl_propagation TO 'on'
|
||||
SET citus.enable_ddl_propagation TO 'on'
|
||||
UPDATE pg_dist_local_group SET groupid = 1
|
||||
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
|
||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||
(27 rows)
|
||||
(29 rows)
|
||||
|
||||
-- this function is dropped in Citus10, added here for tests
|
||||
SET citus.enable_metadata_sync TO OFF;
|
||||
|
@ -121,7 +123,7 @@ reset citus.shard_replication_factor;
|
|||
UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='mx_test_table'::regclass;
|
||||
-- Show that the created MX table is and its sequences are included in the activate node snapshot
|
||||
SELECT unnest(activate_node_snapshot()) order by 1;
|
||||
unnest
|
||||
unnest
|
||||
---------------------------------------------------------------------
|
||||
ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres
|
||||
ALTER SEQUENCE public.user_defined_seq OWNER TO postgres
|
||||
|
@ -130,6 +132,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
|
|||
CREATE SCHEMA IF NOT EXISTS public AUTHORIZATION postgres
|
||||
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
|
||||
DELETE FROM citus.pg_dist_object
|
||||
DELETE FROM pg_catalog.pg_dist_colocation
|
||||
DELETE FROM pg_dist_node
|
||||
DELETE FROM pg_dist_partition
|
||||
DELETE FROM pg_dist_placement
|
||||
|
@ -159,15 +162,16 @@ SELECT unnest(activate_node_snapshot()) order by 1;
|
|||
SET citus.enable_ddl_propagation TO 'on'
|
||||
SET citus.enable_ddl_propagation TO 'on'
|
||||
UPDATE pg_dist_local_group SET groupid = 1
|
||||
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL), (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
|
||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['public', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
|
||||
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
|
||||
(39 rows)
|
||||
(41 rows)
|
||||
|
||||
-- Show that CREATE INDEX commands are included in the activate node snapshot
|
||||
CREATE INDEX mx_index ON mx_test_table(col_2);
|
||||
SELECT unnest(activate_node_snapshot()) order by 1;
|
||||
unnest
|
||||
unnest
|
||||
---------------------------------------------------------------------
|
||||
ALTER SEQUENCE public.mx_test_table_col_3_seq OWNER TO postgres
|
||||
ALTER SEQUENCE public.user_defined_seq OWNER TO postgres
|
||||
|
@ -177,6 +181,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
|
|||
CREATE SCHEMA IF NOT EXISTS public AUTHORIZATION postgres
|
||||
CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
|
||||
DELETE FROM citus.pg_dist_object
|
||||
DELETE FROM pg_catalog.pg_dist_colocation
|
||||
DELETE FROM pg_dist_node
|
||||
DELETE FROM pg_dist_partition
|
||||
DELETE FROM pg_dist_placement
|
||||
|
@ -206,16 +211,17 @@ SELECT unnest(activate_node_snapshot()) order by 1;
|
|||
SET citus.enable_ddl_propagation TO 'on'
|
||||
SET citus.enable_ddl_propagation TO 'on'
|
||||
UPDATE pg_dist_local_group SET groupid = 1
|
||||
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL), (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
|
||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['public', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
|
||||
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
|
||||
(40 rows)
|
||||
(42 rows)
|
||||
|
||||
-- Show that schema changes are included in the activate node snapshot
|
||||
CREATE SCHEMA mx_testing_schema;
|
||||
ALTER TABLE mx_test_table SET SCHEMA mx_testing_schema;
|
||||
SELECT unnest(activate_node_snapshot()) order by 1;
|
||||
unnest
|
||||
unnest
|
||||
---------------------------------------------------------------------
|
||||
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
|
||||
ALTER SEQUENCE public.user_defined_seq OWNER TO postgres
|
||||
|
@ -226,6 +232,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
|
|||
CREATE SCHEMA IF NOT EXISTS public AUTHORIZATION postgres
|
||||
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
|
||||
DELETE FROM citus.pg_dist_object
|
||||
DELETE FROM pg_catalog.pg_dist_colocation
|
||||
DELETE FROM pg_dist_node
|
||||
DELETE FROM pg_dist_partition
|
||||
DELETE FROM pg_dist_placement
|
||||
|
@ -255,10 +262,11 @@ SELECT unnest(activate_node_snapshot()) order by 1;
|
|||
SET citus.enable_ddl_propagation TO 'on'
|
||||
SET citus.enable_ddl_propagation TO 'on'
|
||||
UPDATE pg_dist_local_group SET groupid = 1
|
||||
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL), (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
|
||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
|
||||
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
|
||||
(41 rows)
|
||||
(43 rows)
|
||||
|
||||
-- Show that append distributed tables are not included in the activate node snapshot
|
||||
CREATE TABLE non_mx_test_table (col_1 int, col_2 text);
|
||||
|
@ -270,7 +278,7 @@ SELECT master_create_distributed_table('non_mx_test_table', 'col_1', 'append');
|
|||
|
||||
UPDATE pg_dist_partition SET repmodel='s' WHERE logicalrelid='non_mx_test_table'::regclass;
|
||||
SELECT unnest(activate_node_snapshot()) order by 1;
|
||||
unnest
|
||||
unnest
|
||||
---------------------------------------------------------------------
|
||||
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
|
||||
ALTER SEQUENCE public.user_defined_seq OWNER TO postgres
|
||||
|
@ -281,6 +289,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
|
|||
CREATE SCHEMA IF NOT EXISTS public AUTHORIZATION postgres
|
||||
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
|
||||
DELETE FROM citus.pg_dist_object
|
||||
DELETE FROM pg_catalog.pg_dist_colocation
|
||||
DELETE FROM pg_dist_node
|
||||
DELETE FROM pg_dist_partition
|
||||
DELETE FROM pg_dist_placement
|
||||
|
@ -310,15 +319,16 @@ SELECT unnest(activate_node_snapshot()) order by 1;
|
|||
SET citus.enable_ddl_propagation TO 'on'
|
||||
SET citus.enable_ddl_propagation TO 'on'
|
||||
UPDATE pg_dist_local_group SET groupid = 1
|
||||
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL), (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
|
||||
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
|
||||
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
|
||||
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(41 rows)
(43 rows)
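The colocation line added to the snapshot is easier to read reflowed; this is the same statement as above, not a new one: it inserts each colocation group through pg_catalog.citus_internal_add_colocation_metadata and resolves the optional collation to an OID with a LEFT JOIN against pg_collation.

    WITH colocation_group_data (colocationid, shardcount, replicationfactor,
                                distributioncolumntype, distributioncolumncollationname,
                                distributioncolumncollationschema) AS
      (VALUES (1, 1, -1, 0, NULL, NULL),
              (2, 8, 1, 'integer'::regtype, NULL, NULL))
    SELECT pg_catalog.citus_internal_add_colocation_metadata(
             colocationid, shardcount, replicationfactor,
             distributioncolumntype, coalesce(c.oid, 0))
    FROM colocation_group_data d
      LEFT JOIN pg_collation c
        ON (d.distributioncolumncollationname = c.collname
            AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)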

-- Show that range distributed tables are not included in the activate node snapshot
UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass;
SELECT unnest(activate_node_snapshot()) order by 1;
unnest
unnest
---------------------------------------------------------------------
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
ALTER SEQUENCE public.user_defined_seq OWNER TO postgres

@@ -329,6 +339,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
CREATE SCHEMA IF NOT EXISTS public AUTHORIZATION postgres
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
DELETE FROM citus.pg_dist_object
DELETE FROM pg_catalog.pg_dist_colocation
DELETE FROM pg_dist_node
DELETE FROM pg_dist_partition
DELETE FROM pg_dist_placement

@@ -358,10 +369,11 @@ SELECT unnest(activate_node_snapshot()) order by 1;
SET citus.enable_ddl_propagation TO 'on'
SET citus.enable_ddl_propagation TO 'on'
UPDATE pg_dist_local_group SET groupid = 1
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (1, 1, -1, 0, NULL, NULL), (2, 8, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(41 rows)
(43 rows)

-- Test start_metadata_sync_to_node and citus_activate_node UDFs
-- Ensure that hasmetadata=false for all nodes

@@ -497,11 +509,13 @@ SELECT "Column", "Type", "Definition" FROM index_attrs WHERE
col_2 | text | col_2
(1 row)

-- Check that pg_dist_colocation is not synced
-- Check that pg_dist_colocation is synced
SELECT * FROM pg_dist_colocation ORDER BY colocationid;
colocationid | shardcount | replicationfactor | distributioncolumntype | distributioncolumncollation
---------------------------------------------------------------------
(0 rows)
1 | 1 | -1 | 0 | 0
2 | 8 | 1 | 23 | 0
(2 rows)
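The two rows match the colocation groups created earlier in the test: group 1 (shard count 1, replication factor -1, distribution column type 0) is presumably the reference-table group, while group 2's distribution column type OID 23 is integer. An illustrative query only, rendering the type OID readably:

    SELECT colocationid, shardcount, replicationfactor,
           distributioncolumntype::regtype AS distribution_column_type
    FROM pg_dist_colocation
    ORDER BY colocationid;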

-- Make sure that truncate trigger has been set for the MX table on worker
SELECT count(*) FROM pg_trigger WHERE tgrelid='mx_testing_schema.mx_test_table'::regclass;

@@ -1522,6 +1536,13 @@ ORDER BY
(2 rows)

SELECT shardid AS ref_table_shardid FROM pg_dist_shard WHERE logicalrelid='mx_ref'::regclass \gset
-- make sure we have the pg_dist_colocation record on the worker
SELECT count(*) FROM pg_dist_colocation WHERE distributioncolumntype = 0;
count
---------------------------------------------------------------------
1
(1 row)
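Once sync is on, the same record can also be inspected directly on a worker; a minimal sketch (connection target and column list chosen only for illustration):

    \c - - - :worker_1_port
    SELECT colocationid, shardcount, replicationfactor
    FROM pg_dist_colocation
    WHERE distributioncolumntype = 0;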

-- Check that DDL commands are propagated to reference tables on workers
\c - - - :master_port
ALTER TABLE mx_ref ADD COLUMN col_3 NUMERIC DEFAULT 0;

@@ -1823,7 +1844,7 @@ ALTER TABLE test_table ADD COLUMN id2 int DEFAULT nextval('mx_test_sequence_1');
ALTER TABLE test_table ALTER COLUMN id2 DROP DEFAULT;
ALTER TABLE test_table ALTER COLUMN id2 SET DEFAULT nextval('mx_test_sequence_1');
SELECT unnest(activate_node_snapshot()) order by 1;
unnest
unnest
---------------------------------------------------------------------
ALTER SEQUENCE mx_testing_schema.mx_test_table_col_3_seq OWNER TO postgres
ALTER SEQUENCE public.mx_test_sequence_0 OWNER TO postgres

@@ -1855,6 +1876,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
CREATE TABLE public.mx_ref (col_1 integer, col_2 text)
CREATE TABLE public.test_table (id integer DEFAULT worker_nextval('public.mx_test_sequence_0'::regclass), id2 integer DEFAULT worker_nextval('public.mx_test_sequence_1'::regclass))
DELETE FROM citus.pg_dist_object
DELETE FROM pg_catalog.pg_dist_colocation
DELETE FROM pg_dist_node
DELETE FROM pg_dist_partition
DELETE FROM pg_dist_placement

@@ -1896,6 +1918,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
SET citus.enable_ddl_propagation TO 'on'
SET citus.enable_ddl_propagation TO 'on'
UPDATE pg_dist_local_group SET groupid = 1
WITH colocation_group_data (colocationid, shardcount, replicationfactor, distributioncolumntype, distributioncolumncollationname, distributioncolumncollationschema) AS (VALUES (10002, 7, 1, 'integer'::regtype, NULL, NULL), (10003, 1, -1, 0, NULL, NULL), (10004, 3, 1, 'integer'::regtype, NULL, NULL), (10005, 4, 1, 'integer'::regtype, NULL, NULL)) SELECT pg_catalog.citus_internal_add_colocation_metadata(colocationid, shardcount, replicationfactor, distributioncolumntype, coalesce(c.oid, 0)) FROM colocation_group_data d LEFT JOIN pg_collation c ON (d.distributioncolumncollationname = c.collname AND d.distributioncolumncollationschema::regnamespace = c.collnamespace)
WITH distributed_object_data(typetext, objnames, objargs, distargumentindex, colocationid, force_delegation) AS (VALUES ('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_testing_schema', 'mx_test_table']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_test_schema_1', 'mx_table_1']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['mx_test_schema_2', 'mx_table_2']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['public', 'mx_ref']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['public', 'dist_table_1']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_sequence_0']::text[], ARRAY[]::text[], -1, 0, false), ('sequence', ARRAY['public', 'mx_test_sequence_1']::text[], ARRAY[]::text[], -1, 0, false), ('table', ARRAY['public', 'test_table']::text[], ARRAY[]::text[], -1, 0, false), ('role', ARRAY['postgres']::text[], ARRAY[]::text[], -1, 0, false), ('database', ARRAY['regression']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['public']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_testing_schema_2']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[], -1, 0, false), ('schema', ARRAY['mx_test_schema_2']::text[], ARRAY[]::text[], -1, 0, false)) SELECT citus_internal_add_object_metadata(typetext, objnames, objargs, distargumentindex::int, colocationid::int, force_delegation::bool) FROM distributed_object_data;
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 5, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 5, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 5, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 5, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310020, 1, 0, 1, 100020), (1310021, 1, 0, 5, 100021), (1310022, 1, 0, 1, 100022), (1310023, 1, 0, 5, 100023), (1310024, 1, 0, 1, 100024)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;

@@ -1909,7 +1932,7 @@ SELECT unnest(activate_node_snapshot()) order by 1;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.dist_table_1'::regclass, 1310074, 't'::"char", '-2147483648', '-1073741825'), ('public.dist_table_1'::regclass, 1310075, 't'::"char", '-1073741824', '-1'), ('public.dist_table_1'::regclass, 1310076, 't'::"char", '0', '1073741823'), ('public.dist_table_1'::regclass, 1310077, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_ref'::regclass, 1310073, 't'::"char", NULL, NULL)) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.test_table'::regclass, 1310083, 't'::"char", '-2147483648', '-1073741825'), ('public.test_table'::regclass, 1310084, 't'::"char", '-1073741824', '-1'), ('public.test_table'::regclass, 1310085, 't'::"char", '0', '1073741823'), ('public.test_table'::regclass, 1310086, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(84 rows)
(86 rows)

-- shouldn't work since test_table is MX
ALTER TABLE test_table ADD COLUMN id3 bigserial;

@@ -1928,8 +1951,8 @@ ALTER TABLE test_table ADD COLUMN id4 bigserial CHECK (id4 > 0);
ERROR: cannot execute ADD COLUMN commands involving serial pseudotypes when metadata is synchronized to workers
\c - - - :worker_1_port
\ds
List of relations
Schema | Name | Type | Owner
List of relations
Schema | Name | Type | Owner
---------------------------------------------------------------------
public | mx_test_sequence_0 | sequence | postgres
public | mx_test_sequence_1 | sequence | postgres

@@ -1949,8 +1972,8 @@ DETAIL: drop cascades to default value for column id2 of table test_table
drop cascades to default value for column id of table test_table
\c - - - :worker_1_port
\ds
List of relations
Schema | Name | Type | Owner
List of relations
Schema | Name | Type | Owner
---------------------------------------------------------------------
public | mx_test_table_col_3_seq | sequence | postgres
public | sequence_rollback | sequence | postgres

@@ -2080,13 +2103,13 @@ NOTICE: dropping metadata on the node (localhost,57637)
stop_metadata_sync_to_node
---------------------------------------------------------------------

(1 row)
(1 row)

SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
start_metadata_sync_to_node
---------------------------------------------------------------------

(1 row)
(1 row)

RESET citus.shard_count;
RESET citus.shard_replication_factor;

@@ -860,6 +860,13 @@ NOTICE: drop cascades to default value for column a of table reference_table
DROP TABLE ref_table;
DROP TABLE reference_table;
TRUNCATE pg_dist_colocation;
SELECT run_command_on_workers('TRUNCATE pg_dist_colocation');
run_command_on_workers
---------------------------------------------------------------------
(localhost,57637,t,"TRUNCATE TABLE")
(localhost,57638,t,"TRUNCATE TABLE")
(2 rows)

SELECT count(*) FROM (SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node) t;
count
---------------------------------------------------------------------

@@ -70,10 +70,12 @@ ORDER BY 1;
function citus_internal.replace_isolation_tester_func()
function citus_internal.restore_isolation_tester_func()
function citus_internal.upgrade_columnar_storage(regclass)
function citus_internal_add_colocation_metadata(integer,integer,integer,regtype,oid)
function citus_internal_add_object_metadata(text,text[],text[],integer,integer,boolean)
function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char")
function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint)
function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text)
function citus_internal_delete_colocation_metadata(integer)
function citus_internal_delete_shard_metadata(bigint)
function citus_internal_global_blocked_processes()
function citus_internal_local_blocked_processes()
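The two new entries in this list are the UDFs introduced by the change: one inserts a pg_dist_colocation row on a node, the other removes it. They are internal functions driven by metadata sync rather than meant to be called by hand; the sketch below only illustrates the argument shapes, with made-up values (colocation id 12345, 4 shards, replication factor 1, integer distribution column, no collation):

    SELECT citus_internal_add_colocation_metadata(12345, 4, 1, 'integer'::regtype, 0);
    SELECT citus_internal_delete_colocation_metadata(12345);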

@@ -176,7 +178,7 @@ ORDER BY 1;
function master_update_table_statistics(regclass)
function notify_constraint_dropped()
function pg_cancel_backend(bigint)
function pg_terminate_backend(bigint, bigint)
function pg_terminate_backend(bigint,bigint)
function poolinfo_valid(text)
function read_intermediate_result(text,citus_copy_format)
function read_intermediate_results(text[],citus_copy_format)

@@ -273,5 +275,5 @@ ORDER BY 1;
view citus_worker_stat_activity
view pg_dist_shard_placement
view time_partitions
(257 rows)
(259 rows)

@@ -46,6 +46,7 @@ SELECT * FROM rebalance_table_shards();
-- TODO: Figure out why this is necessary, rebalance_table_shards shouldn't
-- insert stuff into pg_dist_colocation
TRUNCATE pg_dist_colocation;
SELECT run_command_on_workers('TRUNCATE pg_dist_colocation');
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1390000;

SELECT 1 FROM citus_activate_node('localhost', :worker_2_port);

@@ -4,6 +4,7 @@ ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 4;

-- Delete orphaned entries from pg_dist_colocation
DELETE FROM pg_dist_colocation where colocationid = 5 or colocationid = 6;
SELECT 1 FROM run_command_on_workers('DELETE FROM pg_dist_colocation where colocationid = 5 or colocationid = 6');

-- ===================================================================
-- create test utility function

@@ -161,6 +162,7 @@ SELECT find_shard_interval_index(1300016);

SELECT count(*) FROM pg_dist_partition WHERE colocationid IN (4, 5);
DELETE FROM pg_dist_colocation WHERE colocationid IN (4, 5);
SELECT 1 FROM run_command_on_workers('DELETE FROM pg_dist_colocation WHERE colocationid IN (4, 5)');

SET citus.shard_count = 2;

@@ -213,6 +215,12 @@ SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;

-- check to see whether metadata is synced
SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$
SELECT array_agg(row_to_json(c) ORDER BY colocationid) FROM pg_dist_colocation c
WHERE colocationid >= 1 AND colocationid < 1000
$$);

SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY logicalrelid;

@@ -221,10 +229,21 @@ SELECT logicalrelid, colocationid FROM pg_dist_partition
DROP TABLE table1_groupA;
SELECT * FROM pg_dist_colocation WHERE colocationid = 4;

-- check to see whether metadata is synced
SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$
SELECT array_agg(row_to_json(c)) FROM pg_dist_colocation c WHERE colocationid = 4
$$);

-- dropping all tables in a colocation group also deletes the colocation group
DROP TABLE table2_groupA;
SELECT * FROM pg_dist_colocation WHERE colocationid = 4;

-- check to see whether metadata is synced
SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$
SELECT array_agg(row_to_json(c)) FROM pg_dist_colocation c WHERE colocationid = 4
$$);
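
Since workers now keep pg_dist_colocation in sync, dropping the last table of a colocation group has to remove the group's row on the workers as well; presumably this is the path that citus_internal_delete_colocation_metadata is propagated for. A minimal sketch of the statement shape, reusing colocation id 4 from this test:

    SELECT citus_internal_delete_colocation_metadata(4);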

-- create dropped colocation group again
SET citus.shard_count = 2;

@@ -350,6 +369,7 @@ ORDER BY
ALTER SEQUENCE pg_catalog.pg_dist_colocationid_seq RESTART 1;
DELETE FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000;
SELECT 1 FROM run_command_on_workers('DELETE FROM pg_dist_colocation WHERE colocationid >= 1 AND colocationid < 1000');
UPDATE pg_dist_partition SET colocationid = 0
WHERE colocationid >= 1 AND colocationid < 1000;

@@ -401,6 +421,12 @@ SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;

-- check to see whether metadata is synced
SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$
SELECT array_agg(row_to_json(c) ORDER BY colocationid) FROM pg_dist_colocation c
WHERE colocationid >= 1 AND colocationid < 1000
$$);

SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid, logicalrelid;

@@ -427,6 +453,12 @@ SELECT * FROM pg_dist_colocation
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid;

-- check to see whether metadata is synced
SELECT nodeport, unnest(result::jsonb[]) FROM run_command_on_workers($$
SELECT array_agg(row_to_json(c) ORDER BY colocationid) FROM pg_dist_colocation c
WHERE colocationid >= 1 AND colocationid < 1000
$$);

SELECT logicalrelid, colocationid FROM pg_dist_partition
WHERE colocationid >= 1 AND colocationid < 1000
ORDER BY colocationid, logicalrelid;

@@ -129,7 +129,7 @@ SELECT "Column", "Type", "Definition" FROM index_attrs WHERE
SELECT "Column", "Type", "Definition" FROM index_attrs WHERE
relid = 'mx_testing_schema.mx_index'::regclass;

-- Check that pg_dist_colocation is not synced
-- Check that pg_dist_colocation is synced
SELECT * FROM pg_dist_colocation ORDER BY colocationid;

-- Make sure that truncate trigger has been set for the MX table on worker

@@ -637,6 +637,9 @@ ORDER BY

SELECT shardid AS ref_table_shardid FROM pg_dist_shard WHERE logicalrelid='mx_ref'::regclass \gset

-- make sure we have the pg_dist_colocation record on the worker
SELECT count(*) FROM pg_dist_colocation WHERE distributioncolumntype = 0;

-- Check that DDL commands are propagated to reference tables on workers
\c - - - :master_port
ALTER TABLE mx_ref ADD COLUMN col_3 NUMERIC DEFAULT 0;

@@ -388,6 +388,7 @@ DROP SEQUENCE sequence CASCADE;
DROP TABLE ref_table;
DROP TABLE reference_table;
TRUNCATE pg_dist_colocation;
SELECT run_command_on_workers('TRUNCATE pg_dist_colocation');
SELECT count(*) FROM (SELECT master_remove_node(nodename, nodeport) FROM pg_dist_node) t;
ALTER SEQUENCE pg_catalog.pg_dist_groupid_seq RESTART :last_group_id;
ALTER SEQUENCE pg_catalog.pg_dist_node_nodeid_seq RESTART :last_node_id;