mirror of https://github.com/citusdata/citus.git

Sync pg_dist_object to all nodes with metadata

parent 00b6efa607
commit 5ae0974f38
@ -33,6 +33,7 @@
#include "distributed/listutils.h"
#include "distributed/local_executor.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/namespace_utils.h"
#include "distributed/reference_table_utils.h"

@ -1059,9 +1060,19 @@ FinalizeCitusLocalTableCreation(Oid relationId, List *dependentSequenceList)
		 * Ensure sequence dependencies and mark them as distributed
		 * before creating table metadata on workers
		 */
-		MarkSequenceListDistributedAndPropagateDependencies(dependentSequenceList);
+		PropagateSequenceListDependencies(dependentSequenceList);
	}
	CreateTableMetadataOnWorkers(relationId);
+
+	Oid sequenceOid;
+	foreach_oid(sequenceOid, dependentSequenceList)
+	{
+		ObjectAddress address;
+
+		ObjectAddressSet(address, RelationRelationId, sequenceOid);
+		bool shouldSyncMetadata = true;
+		MarkObjectDistributed(&address, shouldSyncMetadata);
+	}
}

/*
@ -590,8 +590,6 @@ PostprocessDefineCollationStmt(Node *node, const char *queryString)

	EnsureDependenciesExistOnAllNodes(&collationAddress);

-	MarkObjectDistributed(&collationAddress);
-
	return NodeDDLTaskList(NON_COORDINATOR_NODES, CreateCollationDDLsIdempotent(
							   collationAddress.objectId));
}
@ -513,10 +513,9 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
	if (ClusterHasKnownMetadataWorkers())
	{
		/*
-		 * Ensure sequence dependencies and mark them as distributed
-		 * before creating table metadata on workers
+		 * Ensure sequence dependencies exist before creating table metadata on workers
		 */
-		MarkSequenceListDistributedAndPropagateDependencies(dependentSequenceList);
+		PropagateSequenceListDependencies(dependentSequenceList);
	}

	CreateTableMetadataOnWorkers(relationId);

@ -554,6 +553,22 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
		}
	}

+	if (ShouldSyncTableMetadata(relationId))
+	{
+		if (ClusterHasKnownMetadataWorkers())
+		{
+			Oid sequenceOid;
+			foreach_oid(sequenceOid, dependentSequenceList)
+			{
+				ObjectAddress address;
+
+				ObjectAddressSet(address, RelationRelationId, sequenceOid);
+				bool shouldSyncMetadata = true;
+				MarkObjectDistributed(&address, shouldSyncMetadata);
+			}
+		}
+	}
+
	/*
	 * Now recreate foreign keys that we dropped beforehand. As modifications are not
	 * allowed on the relations that are involved in the foreign key relationship,
@ -642,34 +657,36 @@ AlterSequenceType(Oid seqOid, Oid typeOid)


/*
- * MarkSequenceListDistributedAndPropagateDependencies ensures dependencies
- * for the given sequence list exist on all nodes and marks the sequences
- * as distributed.
+ * PropagateSequenceListDependencies ensures dependencies for the given
+ * sequence list exist on all nodes and marks these dependencies as
+ * distributed.
+ * NOTE: The sequences in the sequence list itself are not created yet on
+ * all nodes and they are also not marked distributed.
 */
void
-MarkSequenceListDistributedAndPropagateDependencies(List *sequenceList)
+PropagateSequenceListDependencies(List *sequenceList)
{
	Oid sequenceOid = InvalidOid;
	foreach_oid(sequenceOid, sequenceList)
	{
-		MarkSequenceDistributedAndPropagateDependencies(sequenceOid);
+		PropagateSequenceDependencies(sequenceOid);
	}
}


/*
- * MarkSequenceDistributedAndPropagateDependencies ensures dependencies
- * for the given sequence exist on all nodes and marks the sequence
- * as distributed.
+ * PropagateSequenceDependencies ensures dependencies for the given sequence
+ * exist on all nodes and marks them as distributed.
+ * NOTE: The sequence itself is not created yet on workers and it's also not
+ * marked distributed.
 */
void
-MarkSequenceDistributedAndPropagateDependencies(Oid sequenceOid)
+PropagateSequenceDependencies(Oid sequenceOid)
{
	/* get sequence address */
	ObjectAddress sequenceAddress = { 0 };
	ObjectAddressSet(sequenceAddress, RelationRelationId, sequenceOid);
	EnsureDependenciesExistOnAllNodes(&sequenceAddress);
-	MarkObjectDistributed(&sequenceAddress);
}
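Taken together with the FinalizeCitusLocalTableCreation hunk above, the intended call order becomes: propagate the sequences' dependencies, create the sequences on the workers via the table metadata commands, and only then mark the sequences distributed so that the synced pg_dist_object insert can resolve them on the workers. A minimal sketch of that pattern, assuming the Citus-internal headers and list helpers used throughout this diff:

/* sketch only: call order implied by the hunks above */
PropagateSequenceListDependencies(dependentSequenceList);

/* creates the sequences (and the rest of the metadata) on the workers */
CreateTableMetadataOnWorkers(relationId);

/* the sequences now exist everywhere; mark them and sync pg_dist_object */
Oid sequenceOid = InvalidOid;
foreach_oid(sequenceOid, dependentSequenceList)
{
	ObjectAddress address = { 0 };
	ObjectAddressSet(address, RelationRelationId, sequenceOid);

	bool shouldSyncMetadata = true;
	MarkObjectDistributed(&address, shouldSyncMetadata);
}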
@ -49,8 +49,8 @@ bool EnableDependencyCreation = true;
 * This is solved by creating the dependencies in an idempotent manner, either via
 * postgres native CREATE IF NOT EXISTS, or citus helper functions.
 */
-void
-EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
+List *
+EnsureDependenciesExistOnAllNodesWithoutMarkingDistributed(const ObjectAddress *target)
{
	List *dependenciesWithCommands = NIL;
	List *ddlCommands = NULL;

@ -72,7 +72,7 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
	if (list_length(ddlCommands) <= 0)
	{
		/* no ddl commands to be executed */
-		return;
+		return NIL;
	}

	/* since we are executing ddl commands lets disable propagation, primarily for mx */

@ -88,31 +88,9 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
	 */
	List *workerNodeList = ActivePrimaryNonCoordinatorNodeList(RowShareLock);

-	/*
-	 * right after we acquired the lock we mark our objects as distributed, these changes
-	 * will not become visible before we have successfully created all the objects on our
-	 * workers.
-	 *
-	 * It is possible to create distributed tables which depend on other dependencies
-	 * before any node is in the cluster. If we would wait till we actually had connected
-	 * to the nodes before marking the objects as distributed these objects would never be
-	 * created on the workers when they get added, causing shards to fail to create.
-	 */
-	foreach_ptr(dependency, dependenciesWithCommands)
-	{
-		MarkObjectDistributed(dependency);
-	}
-
	/*
	 * collect and connect to all applicable nodes
	 */
	if (list_length(workerNodeList) <= 0)
	{
		/* no nodes to execute on */
		return;
	}

	WorkerNode *workerNode = NULL;
	foreach_ptr(workerNode, workerNodeList)
	{

@ -123,6 +101,29 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
							  CitusExtensionOwnerName(),
							  ddlCommands);
	}
+
+	return dependenciesWithCommands;
}


+void
+EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
+{
+	List *dependenciesWithCommands =
+		EnsureDependenciesExistOnAllNodesWithoutMarkingDistributed(target);
+
+	/*
+	 * It is possible to create distributed tables which depend on other dependencies
+	 * before any node is in the cluster. If we would wait till we actually had connected
+	 * to the nodes before marking the objects as distributed these objects would never be
+	 * created on the workers when they get added, causing shards to fail to create.
+	 */
+	ObjectAddress *dependency = NULL;
+	foreach_ptr(dependency, dependenciesWithCommands)
+	{
+		bool shouldSyncMetadata = true;
+		MarkObjectDistributed(dependency, shouldSyncMetadata);
+	}
+}
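The refactor above splits dependency handling into two phases: EnsureDependenciesExistOnAllNodesWithoutMarkingDistributed creates the dependencies and returns their addresses, and the thin EnsureDependenciesExistOnAllNodes wrapper marks them afterwards. A sketch, under the same Citus-internal assumptions, of how a caller that builds its own command list (such as MetadataCreateCommands below) can use the first phase alone and defer the pg_dist_object inserts:

/* sketch only: the deferred-marking pattern enabled by this split */
List *createdDependencies =
	EnsureDependenciesExistOnAllNodesWithoutMarkingDistributed(target);

/* ... run or queue whatever must reach the workers first ... */

ObjectAddress *dependency = NULL;
foreach_ptr(dependency, createdDependencies)
{
	bool shouldSyncMetadata = true;
	MarkObjectDistributed(dependency, shouldSyncMetadata);
}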
@ -127,6 +127,7 @@ static DistributeObjectOps Any_CompositeType = {
	.preprocess = PreprocessCompositeTypeStmt,
	.postprocess = PostprocessCompositeTypeStmt,
	.address = CompositeTypeStmtObjectAddress,
+	.markDistributed = true,
};
static DistributeObjectOps Any_CreateEnum = {
	.deparse = DeparseCreateEnumStmt,

@ -134,6 +135,7 @@ static DistributeObjectOps Any_CreateEnum = {
	.preprocess = PreprocessCreateEnumStmt,
	.postprocess = PostprocessCreateEnumStmt,
	.address = CreateEnumStmtObjectAddress,
+	.markDistributed = true,
};
static DistributeObjectOps Any_CreateExtension = {
	.deparse = DeparseCreateExtensionStmt,

@ -141,6 +143,7 @@ static DistributeObjectOps Any_CreateExtension = {
	.preprocess = NULL,
	.postprocess = PostprocessCreateExtensionStmt,
	.address = CreateExtensionStmtObjectAddress,
+	.markDistributed = true,
};
static DistributeObjectOps Any_CreateFunction = {
	.deparse = NULL,

@ -225,6 +228,7 @@ static DistributeObjectOps Collation_Define = {
	.preprocess = NULL,
	.postprocess = PostprocessDefineCollationStmt,
	.address = DefineCollationStmtObjectAddress,
+	.markDistributed = true,
};
static DistributeObjectOps Collation_Drop = {
	.deparse = DeparseDropCollationStmt,
@ -188,8 +188,6 @@ PostprocessCreateExtensionStmt(Node *node, const char *queryString)

	EnsureDependenciesExistOnAllNodes(&extensionAddress);

-	MarkObjectDistributed(&extensionAddress);
-
	return NodeDDLTaskList(NON_COORDINATOR_NODES, commands);
}

@ -593,7 +591,8 @@ MarkExistingObjectDependenciesDistributedIfSupported()
	ObjectAddress *objectAddress = NULL;
	foreach_ptr(objectAddress, uniqueObjectAddresses)
	{
-		MarkObjectDistributed(objectAddress);
+		bool shouldSyncMetadata = true;
+		MarkObjectDistributed(objectAddress, shouldSyncMetadata);
	}
}
@ -75,9 +75,6 @@ static int GetFunctionColocationId(Oid functionOid, char *colocateWithName, Oid
static void EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid
												  distributionColumnType, Oid
												  sourceRelationId);
-static void UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
-										   int *distribution_argument_index,
-										   int *colocationId);
static void EnsureSequentialModeForFunctionDDL(void);
static void TriggerSyncMetadataToPrimaryNodes(void);
static bool ShouldPropagateCreateFunction(CreateFunctionStmt *stmt);

@ -187,10 +184,15 @@ create_distributed_function(PG_FUNCTION_ARGS)
	const char *createFunctionSQL = GetFunctionDDLCommand(funcOid, true);
	const char *alterFunctionOwnerSQL = GetFunctionAlterOwnerCommand(funcOid);
	initStringInfo(&ddlCommand);
-	appendStringInfo(&ddlCommand, "%s;%s", createFunctionSQL, alterFunctionOwnerSQL);
+	appendStringInfo(&ddlCommand, "%s;%s;%s;%s",
+					 DISABLE_DDL_PROPAGATION,
+					 createFunctionSQL,
+					 alterFunctionOwnerSQL,
+					 ENABLE_DDL_PROPAGATION);
	SendCommandToWorkersAsUser(NON_COORDINATOR_NODES, CurrentUserName(), ddlCommand.data);

-	MarkObjectDistributed(&functionAddress);
+	bool shouldSyncMetadata = true;
+	MarkObjectDistributed(&functionAddress, shouldSyncMetadata);

	if (distributionArgumentName != NULL)
	{

@ -236,8 +238,9 @@ DistributeFunctionWithDistributionArgument(RegProcedure funcOid,
											   distributionArgumentOid);

	/* record the distribution argument and colocationId */
+	bool shouldSyncMetadata = true;
	UpdateFunctionDistributionInfo(functionAddress, &distributionArgumentIndex,
-								   &colocationId);
+								   &colocationId, shouldSyncMetadata);

	/*
	 * Once we have at least one distributed function/procedure with distribution

@ -274,8 +277,10 @@ DistributeFunctionColocatedWithDistributedTable(RegProcedure funcOid,
								  " parameter should also be provided")));
	}

+	bool shouldSyncMetadata = true;
+
	/* set distribution argument and colocationId to NULL */
-	UpdateFunctionDistributionInfo(functionAddress, NULL, NULL);
+	UpdateFunctionDistributionInfo(functionAddress, NULL, NULL, shouldSyncMetadata);
}


@ -291,8 +296,9 @@ DistributeFunctionColocatedWithReferenceTable(const ObjectAddress *functionAddre

	/* set distribution argument to NULL and colocationId to the reference table colocation id */
	int *distributionArgumentIndex = NULL;
+	bool shouldSyncMetadata = true;
	UpdateFunctionDistributionInfo(functionAddress, distributionArgumentIndex,
-								   &colocationId);
+								   &colocationId, shouldSyncMetadata);

	/*
	 * Once we have at least one distributed function/procedure that reads

@ -564,10 +570,10 @@ EnsureFunctionCanBeColocatedWithTable(Oid functionOid, Oid distributionColumnTyp
 * UpdateFunctionDistributionInfo gets object address of a function and
 * updates its distribution_argument_index and colocationId in pg_dist_object.
 */
-static void
+void
UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
							   int *distribution_argument_index,
-							   int *colocationId)
+							   int *colocationId, bool shouldSyncMetadata)
{
	const bool indexOK = true;

@ -637,6 +643,13 @@ UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
	systable_endscan(scanDescriptor);

	table_close(pgDistObjectRel, NoLock);
+
+	if (shouldSyncMetadata)
+	{
+		char *workerMetadataUpdateCommand = DistributedObjectCreateCommand(
+			distAddress, distribution_argument_index, colocationId);
+		SendCommandToWorkersWithMetadata(workerMetadataUpdateCommand);
+	}
}


@ -1089,8 +1102,8 @@ EnsureSequentialModeForFunctionDDL(void)

/*
 * TriggerSyncMetadataToPrimaryNodes iterates over the active primary nodes,
- * and triggers the metadata syncs if the node has not the metadata. Later,
- * maintenance daemon will sync the metadata to nodes.
+ * and triggers the metadata syncs if the node does not have the metadata.
+ * Later the maintenance daemon will sync the metadata to nodes.
 */
static void
TriggerSyncMetadataToPrimaryNodes(void)
@ -1600,6 +1600,8 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
		EnsureDependenciesExistOnAllNodes(&tableAddress);
	}

+	List *newDistributedObjects = NIL;
+
	List *commandList = alterTableStatement->cmds;
	AlterTableCmd *command = NULL;
	foreach_ptr(command, commandList)

@ -1688,8 +1690,14 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
					if (ShouldSyncTableMetadata(relationId) &&
						ClusterHasKnownMetadataWorkers())
					{
-						MarkSequenceDistributedAndPropagateDependencies(
-							seqOid);
+						PropagateSequenceDependencies(
+							seqOid);
+						ObjectAddress *sequenceAddress = palloc(
+							sizeof(ObjectAddress));
+						ObjectAddressSet(*sequenceAddress, RelationRelationId,
+										 seqOid);
+						newDistributedObjects = lappend(newDistributedObjects,
+														sequenceAddress);
					}
				}
			}

@ -1721,7 +1729,11 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
				if (ShouldSyncTableMetadata(relationId) &&
					ClusterHasKnownMetadataWorkers())
				{
-					MarkSequenceDistributedAndPropagateDependencies(seqOid);
+					PropagateSequenceDependencies(seqOid);
+					ObjectAddress *sequenceAddress = palloc(sizeof(ObjectAddress));
+					ObjectAddressSet(*sequenceAddress, RelationRelationId, seqOid);
+					newDistributedObjects = lappend(newDistributedObjects,
+													sequenceAddress);
				}
			}
		}

@ -1748,6 +1760,13 @@ PostprocessAlterTableStmt(AlterTableStmt *alterTableStatement)
		}

		SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION);
+
+		ObjectAddress *address;
+		foreach_ptr(address, newDistributedObjects)
+		{
+			bool shouldSyncMetadata = true;
+			MarkObjectDistributed(address, shouldSyncMetadata);
+		}
	}
}
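Note the ordering in the last hunk: the new sequence addresses are collected into newDistributedObjects while DDL propagation is still disabled, and MarkObjectDistributed runs only after ENABLE_DDL_PROPAGATION has been sent, presumably so the sequences already exist on the workers when the synced citus_internal_add_object_metadata command tries to resolve their addresses there. In sketch form:

/* sketch only: collect-then-mark, as in PostprocessAlterTableStmt above */
newDistributedObjects = lappend(newDistributedObjects, sequenceAddress);

/* ... sequence DDL reaches the workers here ... */
SendCommandToWorkersWithMetadata(ENABLE_DDL_PROPAGATION);

ObjectAddress *address = NULL;
foreach_ptr(address, newDistributedObjects)
{
	bool shouldSyncMetadata = true;
	MarkObjectDistributed(address, shouldSyncMetadata);
}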
@ -189,8 +189,6 @@ PostprocessCompositeTypeStmt(Node *node, const char *queryString)
	ObjectAddress typeAddress = GetObjectAddressFromParseTree(node, false);
	EnsureDependenciesExistOnAllNodes(&typeAddress);

-	MarkObjectDistributed(&typeAddress);
-
	return NIL;
}

@ -299,13 +297,6 @@ PostprocessCreateEnumStmt(Node *node, const char *queryString)
	ObjectAddress typeAddress = GetObjectAddressFromParseTree(node, false);
	EnsureDependenciesExistOnAllNodes(&typeAddress);

-	/*
-	 * now that the object has been created and distributed to the workers we mark them as
-	 * distributed so we know to keep them up to date and recreate on a new node in the
-	 * future
-	 */
-	MarkObjectDistributed(&typeAddress);
-
	return NIL;
}
@ -53,6 +53,7 @@
#include "distributed/coordinator_protocol.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
#include "distributed/multi_executor.h"
#include "distributed/multi_explain.h"
#include "distributed/multi_physical_planner.h"

@ -650,6 +651,12 @@ ProcessUtilityInternal(PlannedStmt *pstmt,
				MarkIndexValid(indexStmt);
			}
		}
+		if (ops && ops->markDistributed)
+		{
+			ObjectAddress address = GetObjectAddressFromParseTree(parsetree, false);
+			bool shouldSyncMetadata = true;
+			MarkObjectDistributed(&address, shouldSyncMetadata);
+		}
	}

	/* TODO: fold VACUUM's processing into the above block */
@ -31,7 +31,11 @@
#include "distributed/metadata/distobject.h"
#include "distributed/metadata/pg_dist_object.h"
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/coordinator_protocol.h"
#include "distributed/version_compat.h"
#include "distributed/worker_transaction.h"
#include "executor/spi.h"
#include "nodes/makefuncs.h"
#include "nodes/pg_list.h"

@ -137,7 +141,7 @@ ObjectExists(const ObjectAddress *address)
 * by adding appropriate entries to citus.pg_dist_object.
 */
void
-MarkObjectDistributed(const ObjectAddress *distAddress)
+MarkObjectDistributed(const ObjectAddress *distAddress, bool shouldSyncMetadata)
{
	int paramCount = 3;
	Oid paramTypes[3] = {

@ -160,6 +164,13 @@ MarkObjectDistributed(const ObjectAddress *distAddress)
	{
		ereport(ERROR, (errmsg("failed to insert object into citus.pg_dist_object")));
	}
+
+	if (shouldSyncMetadata)
+	{
+		char *workerMetadataUpdateCommand = DistributedObjectCreateCommand(
+			distAddress, NULL, NULL);
+		SendCommandToWorkersWithMetadata(workerMetadataUpdateCommand);
+	}
}
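The flag added here decides whether the local pg_dist_object insert is replayed on metadata workers: coordinator-side callers in this commit pass true, while citus_internal_add_object_metadata (further below) passes false so that applying a synced command on a worker does not re-propagate it. A sketch only, with schemaOid as a hypothetical placeholder; the worker command it emits is the citus_internal_add_object_metadata call visible verbatim in the regression output near the end of this diff:

/* sketch only: marking a schema distributed and syncing it to workers */
ObjectAddress schemaAddress = { 0 };
/* schemaOid is hypothetical here; any resolvable object address works */
ObjectAddressSet(schemaAddress, NamespaceRelationId, schemaOid);

bool shouldSyncMetadata = true;
MarkObjectDistributed(&schemaAddress, shouldSyncMetadata);

/*
 * Workers with metadata then receive roughly:
 *   SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL)
 *   FROM pg_get_object_address('schema', ARRAY['public']::text[], ARRAY[]::text[])
 */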
@ -47,6 +47,7 @@
#include "distributed/metadata_cache.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/distobject.h"
#include "distributed/metadata/pg_dist_object.h"
#include "distributed/multi_join_order.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/multi_physical_planner.h"
@ -126,6 +127,7 @@ PG_FUNCTION_INFO_V1(worker_record_sequence_dependency);
PG_FUNCTION_INFO_V1(citus_internal_add_partition_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_shard_metadata);
PG_FUNCTION_INFO_V1(citus_internal_add_placement_metadata);
+PG_FUNCTION_INFO_V1(citus_internal_add_object_metadata);
PG_FUNCTION_INFO_V1(citus_internal_update_placement_metadata);
PG_FUNCTION_INFO_V1(citus_internal_delete_shard_metadata);
@ -216,7 +218,6 @@ StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort)
	}

	SyncMetadataSnapshotToNode(workerNode, raiseInterrupts);
-	MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, true);
}
@ -357,8 +358,10 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
	/* generate the queries which drop the metadata */
	List *dropMetadataCommandList = MetadataDropCommands();

+	List *newDistributedObjects = NIL;
+
	/* generate the queries which create the metadata from scratch */
-	List *createMetadataCommandList = MetadataCreateCommands();
+	List *createMetadataCommandList = MetadataCreateCommands(&newDistributedObjects);

	List *recreateMetadataSnapshotCommandList = list_make1(localGroupIdUpdateCommand);
	recreateMetadataSnapshotCommandList = list_concat(recreateMetadataSnapshotCommandList,

@ -378,7 +381,6 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
												  workerNode->workerPort,
												  currentUser,
												  recreateMetadataSnapshotCommandList);
-		return true;
	}
	else
	{

@ -387,8 +389,20 @@ SyncMetadataSnapshotToNode(WorkerNode *workerNode, bool raiseOnError)
														 workerNode->workerPort,
														 currentUser,
														 recreateMetadataSnapshotCommandList);
-		return success;
+		if (!success)
+		{
+			return false;
+		}
	}
+	MarkNodeMetadataSynced(workerNode->workerName, workerNode->workerPort, true);
+
+	ObjectAddress *address;
+	foreach_ptr(address, newDistributedObjects)
+	{
+		bool shouldSyncMetadata = true;
+		MarkObjectDistributed(address, shouldSyncMetadata);
+	}
+	return true;
}
@ -428,7 +442,7 @@ DropMetadataSnapshotOnNode(WorkerNode *workerNode)
 * (v) Queries that populate pg_dist_placement table referenced by (iv)
 */
List *
-MetadataCreateCommands(void)
+MetadataCreateCommands(List **newDistributedObjects)
{
	List *metadataSnapshotCommandList = NIL;
	List *distributedTableList = CitusTableList();

@ -486,7 +500,19 @@ MetadataCreateCommands(void)
		List *dependentSequenceList = NIL;
		GetDependentSequencesWithRelation(relationId, &attnumList,
										  &dependentSequenceList, 0);
-		MarkSequenceListDistributedAndPropagateDependencies(dependentSequenceList);
+
+		Oid sequenceOid;
+		foreach_oid(sequenceOid, dependentSequenceList)
+		{
+			ObjectAddress *sequenceAddress = palloc(sizeof(ObjectAddress));
+			ObjectAddressSet(*sequenceAddress, RelationRelationId, sequenceOid);
+			List *addedDependencies =
+				EnsureDependenciesExistOnAllNodesWithoutMarkingDistributed(
+					sequenceAddress);
+			*newDistributedObjects = list_concat(*newDistributedObjects,
+												 addedDependencies);
+			*newDistributedObjects = lappend(*newDistributedObjects, sequenceAddress);
+		}

		List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
		metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
@ -573,6 +599,43 @@ MetadataCreateCommands(void)
												  shardCreateCommandList);
	}

+	HeapTuple pgDistObjectTup = NULL;
+
+	Relation pgDistObjectRel = table_open(DistObjectRelationId(), AccessShareLock);
+
+	TupleDesc pgDistObjectDesc = RelationGetDescr(pgDistObjectRel);
+
+	SysScanDesc pgDistObjectScan =
+		systable_beginscan(pgDistObjectRel, InvalidOid, false, NULL, 0, NULL);
+	while (HeapTupleIsValid(pgDistObjectTup = systable_getnext(pgDistObjectScan)))
+	{
+		Form_pg_dist_object pg_dist_object =
+			(Form_pg_dist_object) GETSTRUCT(pgDistObjectTup);
+
+		ObjectAddress address;
+		ObjectAddressSubSet(address, pg_dist_object->classid, pg_dist_object->objid,
+							pg_dist_object->objsubid);
+		bool distributionArgumentIndexIsNull = true;
+		int32 distributionArgumentIndex = DatumGetInt32(
+			heap_getattr(pgDistObjectTup, Anum_pg_dist_object_distribution_argument_index,
+						 pgDistObjectDesc, &distributionArgumentIndexIsNull));
+		bool colocationIdIsNull = true;
+		int32 colocationId = DatumGetInt32(
+			heap_getattr(pgDistObjectTup, Anum_pg_dist_object_colocation_id,
+						 pgDistObjectDesc, &colocationIdIsNull));
+
+		char *workerMetadataUpdateCommand = DistributedObjectCreateCommand(
+			&address,
+			distributionArgumentIndexIsNull ? NULL : &distributionArgumentIndex,
+			colocationIdIsNull ? NULL : &colocationId);
+		metadataSnapshotCommandList = lappend(metadataSnapshotCommandList,
+											  workerMetadataUpdateCommand);
+	}
+
+	systable_endscan(pgDistObjectScan);
+	relation_close(pgDistObjectRel, AccessShareLock);
+

	return metadataSnapshotCommandList;
}
@ -666,6 +729,7 @@ GetDistributedTableDDLEvents(Oid relationId)
 * (v) Queries that delete all the rows from pg_dist_shard table referenced by (iv)
 * (vi) Queries that delete all the rows from pg_dist_placement table
 *      referenced by (v)
+ * (vii) Truncate pg_dist_object
 */
List *
MetadataDropCommands(void)

@ -680,6 +744,8 @@ MetadataDropCommands(void)
											  REMOVE_ALL_CLUSTERED_TABLES_COMMAND);

	dropSnapshotCommandList = lappend(dropSnapshotCommandList, DELETE_ALL_NODES);
+	dropSnapshotCommandList = lappend(dropSnapshotCommandList,
+									  DELETE_ALL_DISTRIBUTED_OBJECTS);

	return dropSnapshotCommandList;
}
@ -752,6 +818,86 @@ NodeListInsertCommand(List *workerNodeList)
}


+char *
+DistributedObjectCreateCommand(const ObjectAddress *address,
+							   int32 *distributionArgumentIndex,
+							   int32 *colocationId)
+{
+	StringInfo insertDistributedObjectCommand = makeStringInfo();
+
+	/*
+	 * Here we get the three things that pg_identify_object_as_address returns,
+	 * without going through the hassle of going from and to Datums using
+	 * DirectFunctionCall3.
+	 */
+	List *names;
+	List *args;
+	char *objectType = getObjectTypeDescription(address);
+	getObjectIdentityParts(address, &names, &args);
+
+	appendStringInfo(insertDistributedObjectCommand,
+					 "SELECT citus_internal_add_object_metadata "
+					 "(classid, objid, objsubid, ");
+	if (distributionArgumentIndex == NULL)
+	{
+		appendStringInfo(insertDistributedObjectCommand, "NULL, ");
+	}
+	else
+	{
+		appendStringInfo(insertDistributedObjectCommand, "%d, ",
+						 *distributionArgumentIndex);
+	}
+	if (colocationId == NULL)
+	{
+		appendStringInfo(insertDistributedObjectCommand, "NULL");
+	}
+	else
+	{
+		appendStringInfo(insertDistributedObjectCommand, "%d", *colocationId);
+	}
+
+	appendStringInfo(insertDistributedObjectCommand,
+					 ") FROM pg_get_object_address(%s, ARRAY[",
+					 quote_literal_cstr(objectType));
+
+	char *name;
+	bool firstLoop = true;
+	foreach_ptr(name, names)
+	{
+		if (!firstLoop)
+		{
+			appendStringInfo(insertDistributedObjectCommand, ", ");
+		}
+		firstLoop = false;
+		appendStringInfoString(insertDistributedObjectCommand, quote_literal_cstr(name));
+	}
+
+	appendStringInfo(insertDistributedObjectCommand, "]::text[], ARRAY[");
+
+	char *arg;
+	firstLoop = true;
+	foreach_ptr(arg, args)
+	{
+		if (!firstLoop)
+		{
+			appendStringInfo(insertDistributedObjectCommand, ", ");
+		}
+		firstLoop = false;
+		appendStringInfoString(insertDistributedObjectCommand, quote_literal_cstr(arg));
+	}
+
+	appendStringInfo(insertDistributedObjectCommand, "]::text[])");
+	return insertDistributedObjectCommand->data;
+}
+
+
+char *
+DistributedObjectDeleteCommand(const ObjectAddress *address)
+{
+	return NULL;
+}
+
+
/*
 * DistributionCreateCommands generates a commands that can be
 * executed to replicate the metadata for a distributed table.
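DistributedObjectCreateCommand quotes the object identity obtained from getObjectIdentityParts() into a pg_get_object_address() call rather than shipping raw OIDs, so each worker resolves its own local OIDs; DistributedObjectDeleteCommand is left as a stub at this point. For example, for the public schema with NULL distribution argument and colocation id, it produces exactly the command seen in the master_metadata_snapshot regression output further below:

SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('schema', ARRAY['public']::text[], ARRAY[]::text[])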
@ -1803,11 +1949,6 @@ SyncMetadataToNodes(void)
										   workerNode->workerPort)));
				result = METADATA_SYNC_FAILED_SYNC;
			}
-			else
-			{
-				MarkNodeMetadataSynced(workerNode->workerName,
-									   workerNode->workerPort, true);
-			}
		}
	}
@ -2479,6 +2620,56 @@ EnsureShardPlacementMetadataIsSane(Oid relationId, int64 shardId, int64 placemen
}


+/*
+ * citus_internal_add_object_metadata is an internal UDF to
+ * add a row to pg_dist_object.
+ */
+Datum
+citus_internal_add_object_metadata(PG_FUNCTION_ARGS)
+{
+	PG_ENSURE_ARGNOTNULL(0, "classid");
+	Oid classid = PG_GETARG_OID(0);
+	PG_ENSURE_ARGNOTNULL(1, "objid");
+	Oid objid = PG_GETARG_OID(1);
+	PG_ENSURE_ARGNOTNULL(2, "objsubid");
+	Oid objsubid = PG_GETARG_OID(2);
+	int distributionArgumentIndexValue;
+	int colocationIdValue;
+	int *distributionArgumentIndex = NULL;
+	int *colocationId = NULL;
+	if (!PG_ARGISNULL(3))
+	{
+		distributionArgumentIndexValue = PG_GETARG_INT32(3);
+		distributionArgumentIndex = &distributionArgumentIndexValue;
+	}
+	if (!PG_ARGISNULL(4))
+	{
+		colocationIdValue = PG_GETARG_INT32(4);
+		colocationId = &colocationIdValue;
+	}
+
+	/* TODO: only owner of the object is allowed to modify the metadata */
+
+	/* TODO: maybe we want to serialize all the metadata changes to this table */
+
+	if (!ShouldSkipMetadataChecks())
+	{
+		/* this UDF is not allowed to execute as a separate command */
+		EnsureCoordinatorInitiatedOperation();
+	}
+
+	ObjectAddress address = { 0 };
+	ObjectAddressSubSet(address, classid, objid, objsubid);
+	bool shouldSyncMetadata = false;
+	MarkObjectDistributed(&address, shouldSyncMetadata);
+	UpdateFunctionDistributionInfo(&address, distributionArgumentIndex, colocationId,
+								   shouldSyncMetadata);
+
+	PG_RETURN_VOID();
+}
+
+
/*
 * ShouldSkipMetadataChecks returns true if the current user is allowed to
 * make any
@ -13,5 +13,6 @@ ALTER TABLE pg_catalog.pg_dist_placement ADD CONSTRAINT placement_shardid_groupi
#include "udfs/citus_internal_add_partition_metadata/10.2-1.sql";
#include "udfs/citus_internal_add_shard_metadata/10.2-1.sql";
#include "udfs/citus_internal_add_placement_metadata/10.2-1.sql";
+#include "udfs/citus_internal_add_object_metadata/10.2-1.sql";
#include "udfs/citus_internal_update_placement_metadata/10.2-1.sql";
#include "udfs/citus_internal_delete_shard_metadata/10.2-1.sql";
@ -14,6 +14,7 @@ COMMENT ON FUNCTION pg_catalog.stop_metadata_sync_to_node(nodename text, nodepor
DROP FUNCTION pg_catalog.citus_internal_add_partition_metadata(regclass, "char", text, integer, "char");
DROP FUNCTION pg_catalog.citus_internal_add_shard_metadata(regclass, bigint, "char", text, text);
DROP FUNCTION pg_catalog.citus_internal_add_placement_metadata(bigint, integer, bigint, integer, bigint);
+DROP FUNCTION pg_catalog.citus_internal_add_object_metadata(oid, oid, oid, integer, integer);
DROP FUNCTION pg_catalog.citus_internal_update_placement_metadata(bigint, integer, integer);
DROP FUNCTION pg_catalog.citus_internal_delete_shard_metadata(bigint);
src/backend/distributed/sql/udfs/citus_internal_add_object_metadata/10.2-1.sql (generated, new file, 9 lines)

@ -0,0 +1,9 @@
+CREATE OR REPLACE FUNCTION pg_catalog.citus_internal_add_object_metadata(
+							classid oid, objid oid, objsubid oid,
+							distribution_argument_index int, colocationid int)
+	RETURNS void
+	LANGUAGE C
+	AS 'MODULE_PATHNAME';
+
+COMMENT ON FUNCTION pg_catalog.citus_internal_add_object_metadata(oid,oid,oid,int,int) IS
+	'Inserts into pg_dist_object with user checks';
@ -42,7 +42,8 @@ Datum
master_metadata_snapshot(PG_FUNCTION_ARGS)
{
	List *dropSnapshotCommands = MetadataDropCommands();
-	List *createSnapshotCommands = MetadataCreateCommands();
+	List *newDistributedObjects = NIL;
+	List *createSnapshotCommands = MetadataCreateCommands(&newDistributedObjects);
	List *snapshotCommandList = NIL;
	int snapshotCommandIndex = 0;
	Oid ddlCommandTypeId = TEXTOID;
@ -56,6 +56,7 @@ typedef struct DistributeObjectOps
	List * (*preprocess)(Node *, const char *, ProcessUtilityContext);
	List * (*postprocess)(Node *, const char *);
	ObjectAddress (*address)(Node *, bool);
+	bool markDistributed;
} DistributeObjectOps;

#define CITUS_TRUNCATE_TRIGGER_NAME "citus_truncate_trigger"

@ -470,6 +471,10 @@ extern List * CreateFunctionDDLCommandsIdempotent(const ObjectAddress *functionA
extern char * GetFunctionDDLCommand(const RegProcedure funcOid, bool useCreateOrReplace);
extern char * GenerateBackupNameForProcCollision(const ObjectAddress *address);
extern ObjectWithArgs * ObjectWithArgsFromOid(Oid funcOid);
+extern void UpdateFunctionDistributionInfo(const ObjectAddress *distAddress,
+										   int *distribution_argument_index,
+										   int *colocationId,
+										   bool shouldSyncMetadata);

/* vacuum.c - forward declarations */
extern void PostprocessVacuumStmt(VacuumStmt *vacuumStmt, const char *vacuumCommand);
@ -20,7 +20,8 @@ extern bool ObjectExists(const ObjectAddress *address);
extern bool CitusExtensionObject(const ObjectAddress *objectAddress);
extern bool IsObjectDistributed(const ObjectAddress *address);
extern bool ClusterHasDistributedFunctionWithDistArgument(void);
-extern void MarkObjectDistributed(const ObjectAddress *distAddress);
+extern void MarkObjectDistributed(const ObjectAddress *distAddress,
+								  bool shouldSyncMetadata);
extern void UnmarkObjectDistributed(const ObjectAddress *address);
extern bool IsTableOwnedByExtension(Oid relationId);
extern bool IsObjectAddressOwnedByExtension(const ObjectAddress *target,
@ -31,8 +31,12 @@ typedef enum
extern void StartMetadataSyncToNode(const char *nodeNameString, int32 nodePort);
extern bool ClusterHasKnownMetadataWorkers(void);
extern bool ShouldSyncTableMetadata(Oid relationId);
-extern List * MetadataCreateCommands(void);
+extern List * MetadataCreateCommands(List **newDistributedObjects);
extern List * MetadataDropCommands(void);
+extern char * DistributedObjectCreateCommand(const ObjectAddress *address,
+											 int32 *distributionArgumentIndex,
+											 int32 *colocationId);
+extern char * DistributedObjectDeleteCommand(const ObjectAddress *address);
extern char * DistributionCreateCommand(CitusTableCacheEntry *cacheEntry);
extern char * DistributionDeleteCommand(const char *schemaName,
										const char *tableName);

@ -63,6 +67,7 @@ extern void GetDependentSequencesWithRelation(Oid relationId, List **attnumList,
extern Oid GetAttributeTypeOid(Oid relationId, AttrNumber attnum);

#define DELETE_ALL_NODES "TRUNCATE pg_dist_node CASCADE"
+#define DELETE_ALL_DISTRIBUTED_OBJECTS "TRUNCATE citus.pg_dist_object"
#define REMOVE_ALL_CLUSTERED_TABLES_COMMAND \
	"SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition"
#define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'"
@ -293,9 +293,12 @@ extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection,
extern void ExecuteQueryViaSPI(char *query, int SPIOK);
extern void EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId);
extern void AlterSequenceType(Oid seqOid, Oid typeOid);
-extern void MarkSequenceListDistributedAndPropagateDependencies(List *sequenceList);
-extern void MarkSequenceDistributedAndPropagateDependencies(Oid sequenceOid);
+extern void PropagateSequenceListDependencies(List *sequenceList);
+extern void PropagateSequenceDependencies(Oid sequenceOid);
extern void EnsureDistributedSequencesHaveOneType(Oid relationId,
												  List *dependentSequenceList,
												  List *attnumList);
+extern List * EnsureDependenciesExistOnAllNodesWithoutMarkingDistributed(
+	const ObjectAddress *target);
#endif /* METADATA_UTILITY_H */
@ -0,0 +1,14 @@
+SELECT success FROM run_command_on_workers('ALTER SYSTEM SET citus.enable_ddl_propagation TO OFF');
+ success
+---------------------------------------------------------------------
+ t
+ t
+(2 rows)
+
+SELECT success FROM run_command_on_workers('select pg_reload_conf()');
+ success
+---------------------------------------------------------------------
+ t
+ t
+(2 rows)
+
@ -252,6 +252,21 @@ SELECT * FROM run_command_on_workers($$SELECT function_tests.dup('0123456789ab')
 localhost | 57638 | t | (01:23:45:67:89:ab,"01:23:45:67:89:ab is text")
(2 rows)

+-- Wait two times for metadata sync, once for each node in the cluster.
+-- Otherwise the next distributed function creation will fail, telling us to
+-- wait for syncing.
+SELECT public.wait_until_metadata_sync(30000);
+ wait_until_metadata_sync
+---------------------------------------------------------------------
+
+(1 row)
+
+SELECT public.wait_until_metadata_sync(30000);
+ wait_until_metadata_sync
+---------------------------------------------------------------------
+
+(1 row)
+
SELECT create_distributed_function('eq(macaddr,macaddr)', '$1', colocate_with := 'streaming_table');
 create_distributed_function
---------------------------------------------------------------------
@ -0,0 +1,14 @@
+SELECT success FROM run_command_on_workers('ALTER SYSTEM SET citus.enable_ddl_propagation TO ON');
+ success
+---------------------------------------------------------------------
+ t
+ t
+(2 rows)
+
+SELECT success FROM run_command_on_workers('select pg_reload_conf()');
+ success
+---------------------------------------------------------------------
+ t
+ t
+(2 rows)
+
@ -646,13 +646,14 @@ SELECT * FROM multi_extension.print_extension_changes();
 function stop_metadata_sync_to_node(text,integer) void |
 | function citus_internal.downgrade_columnar_storage(regclass) void
 | function citus_internal.upgrade_columnar_storage(regclass) void
+ | function citus_internal_add_object_metadata(oid,oid,oid,integer,integer) void
 | function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char") void
 | function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint) void
 | function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text) void
 | function citus_internal_delete_shard_metadata(bigint) void
 | function citus_internal_update_placement_metadata(bigint,integer,integer) void
 | function stop_metadata_sync_to_node(text,integer,boolean) void
-(9 rows)
+(10 rows)

DROP TABLE multi_extension.prev_objects, multi_extension.extension_diff;
-- show running version
@ -29,9 +29,13 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
 unnest
---------------------------------------------------------------------
 INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
+ SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('database', ARRAY['regression']::text[], ARRAY[]::text[])
+ SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('role', ARRAY['postgres']::text[], ARRAY[]::text[])
+ SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('schema', ARRAY['public']::text[], ARRAY[]::text[])
 SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
+ TRUNCATE citus.pg_dist_object
 TRUNCATE pg_dist_node CASCADE
-(3 rows)
+(7 rows)

-- this function is dropped in Citus10, added here for tests
CREATE OR REPLACE FUNCTION pg_catalog.master_create_distributed_table(table_name regclass,
@ -79,16 +83,20 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
 ALTER TABLE public.mx_test_table OWNER TO postgres
 CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
 INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
+ SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('database', ARRAY['regression']::text[], ARRAY[]::text[])
+ SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('role', ARRAY['postgres']::text[], ARRAY[]::text[])
+ SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('schema', ARRAY['public']::text[], ARRAY[]::text[])
 SELECT citus_internal_add_partition_metadata ('public.mx_test_table'::regclass, 'h', 'col_1', 0, 's')
 SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3')
 SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
 SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
 SELECT worker_create_truncate_trigger('public.mx_test_table')
 SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
+ TRUNCATE citus.pg_dist_object
 TRUNCATE pg_dist_node CASCADE
 WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
 WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
-(16 rows)
+(20 rows)

-- Show that CREATE INDEX commands are included in the metadata snapshot
CREATE INDEX mx_index ON mx_test_table(col_2);
@ -103,16 +111,20 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
 CREATE INDEX mx_index ON public.mx_test_table USING btree (col_2)
 CREATE TABLE public.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('public.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
 INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
+ SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('database', ARRAY['regression']::text[], ARRAY[]::text[])
+ SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('role', ARRAY['postgres']::text[], ARRAY[]::text[])
+ SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('schema', ARRAY['public']::text[], ARRAY[]::text[])
 SELECT citus_internal_add_partition_metadata ('public.mx_test_table'::regclass, 'h', 'col_1', 0, 's')
 SELECT pg_catalog.worker_record_sequence_dependency('public.mx_test_table_col_3_seq'::regclass,'public.mx_test_table'::regclass,'col_3')
 SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
 SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
 SELECT worker_create_truncate_trigger('public.mx_test_table')
 SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
+ TRUNCATE citus.pg_dist_object
 TRUNCATE pg_dist_node CASCADE
 WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
 WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('public.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('public.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('public.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('public.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('public.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('public.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('public.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
-(17 rows)
+(21 rows)

-- Show that schema changes are included in the metadata snapshot
CREATE SCHEMA mx_testing_schema;
@ -128,16 +140,21 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
 CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
 CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
 INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
+ SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('database', ARRAY['regression']::text[], ARRAY[]::text[])
+ SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('role', ARRAY['postgres']::text[], ARRAY[]::text[])
+ SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[])
+ SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('schema', ARRAY['public']::text[], ARRAY[]::text[])
 SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 0, 's')
 SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
 SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
 SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
 SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
 SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
+ TRUNCATE citus.pg_dist_object
 TRUNCATE pg_dist_node CASCADE
 WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
 WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
-(17 rows)
+(22 rows)

-- Show that append distributed tables are not included in the metadata snapshot
CREATE TABLE non_mx_test_table (col_1 int, col_2 text);
@ -159,16 +176,21 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
|
|||
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
|
||||
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('database', ARRAY['regression']::text[], ARRAY[]::text[])
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('role', ARRAY['postgres']::text[], ARRAY[]::text[])
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[])
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('schema', ARRAY['public']::text[], ARRAY[]::text[])
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 0, 's')
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE citus.pg_dist_object
TRUNCATE pg_dist_node CASCADE
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(17 rows)
(22 rows)

-- Show that range distributed tables are not included in the metadata snapshot
UPDATE pg_dist_partition SET partmethod='r' WHERE logicalrelid='non_mx_test_table'::regclass;
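The snapshot above now records dependent objects (database, role, schemas) through the new citus_internal_add_object_metadata calls. As a hedged aside that is not part of this commit: one way to see what those calls leave behind is to resolve the stored addresses with pg_identify_object; the classid/objid/objsubid columns match the ones the snapshot commands use, though the exact pg_dist_object layout can vary across Citus versions.

-- Hedged sketch: list the objects currently marked as distributed.
SELECT (pg_identify_object(classid, objid, objsubid)).identity AS object
FROM citus.pg_dist_object
ORDER BY 1;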
@ -183,16 +205,21 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
CREATE INDEX mx_index ON mx_testing_schema.mx_test_table USING btree (col_2)
CREATE TABLE mx_testing_schema.mx_test_table (col_1 integer, col_2 text NOT NULL, col_3 bigint DEFAULT nextval('mx_testing_schema.mx_test_table_col_3_seq'::regclass) NOT NULL, col_4 bigint DEFAULT nextval('public.user_defined_seq'::regclass))
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (1, 1, 'localhost', 57637, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default'),(2, 2, 'localhost', 57638, 'default', FALSE, FALSE, TRUE, 'primary'::noderole, 'default')
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('database', ARRAY['regression']::text[], ARRAY[]::text[])
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('role', ARRAY['postgres']::text[], ARRAY[]::text[])
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[])
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('schema', ARRAY['public']::text[], ARRAY[]::text[])
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 0, 's')
SELECT pg_catalog.worker_record_sequence_dependency('mx_testing_schema.mx_test_table_col_3_seq'::regclass,'mx_testing_schema.mx_test_table'::regclass,'col_3')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.user_defined_seq AS bigint INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 CACHE 1 NO CYCLE','bigint')
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE citus.pg_dist_object
TRUNCATE pg_dist_node CASCADE
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 2, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 2, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 2, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 2, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't'::"char", '-2147483648', '-1610612737'), ('mx_testing_schema.mx_test_table'::regclass, 1310001, 't'::"char", '-1610612736', '-1073741825'), ('mx_testing_schema.mx_test_table'::regclass, 1310002, 't'::"char", '-1073741824', '-536870913'), ('mx_testing_schema.mx_test_table'::regclass, 1310003, 't'::"char", '-536870912', '-1'), ('mx_testing_schema.mx_test_table'::regclass, 1310004, 't'::"char", '0', '536870911'), ('mx_testing_schema.mx_test_table'::regclass, 1310005, 't'::"char", '536870912', '1073741823'), ('mx_testing_schema.mx_test_table'::regclass, 1310006, 't'::"char", '1073741824', '1610612735'), ('mx_testing_schema.mx_test_table'::regclass, 1310007, 't'::"char", '1610612736', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(17 rows)
(22 rows)

-- Test start_metadata_sync_to_node UDF
-- Ensure that hasmetadata=false for all nodes
@ -1187,8 +1214,10 @@ DROP TABLE mx_table_with_small_sequence, mx_table_with_sequence;
-- owner
CREATE TABLE pg_dist_placement_temp AS SELECT * FROM pg_dist_placement;
CREATE TABLE pg_dist_partition_temp AS SELECT * FROM pg_dist_partition;
CREATE TABLE pg_dist_object_temp AS SELECT * FROM citus.pg_dist_object;
DELETE FROM pg_dist_placement;
DELETE FROM pg_dist_partition;
DELETE FROM citus.pg_dist_object;
SELECT groupid AS old_worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_port \gset
SELECT master_remove_node('localhost', :worker_2_port);
 master_remove_node
@ -1271,8 +1300,10 @@ DROP TABLE mx_table;
\c - postgres - :master_port
INSERT INTO pg_dist_placement SELECT * FROM pg_dist_placement_temp;
INSERT INTO pg_dist_partition SELECT * FROM pg_dist_partition_temp;
INSERT INTO citus.pg_dist_object SELECT * FROM pg_dist_object_temp ON CONFLICT DO NOTHING;
DROP TABLE pg_dist_placement_temp;
DROP TABLE pg_dist_partition_temp;
DROP TABLE pg_dist_object_temp;
UPDATE pg_dist_placement
SET groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port)
WHERE groupid = :old_worker_2_group;
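The ON CONFLICT DO NOTHING on the pg_dist_object restore is what keeps the restore idempotent when some rows were re-created between the backup and the restore. A minimal, standalone illustration of that SQL behavior (hypothetical table name, unrelated to the tests):

CREATE TABLE conflict_demo (k int PRIMARY KEY);
INSERT INTO conflict_demo VALUES (1);
INSERT INTO conflict_demo VALUES (1) ON CONFLICT DO NOTHING;  -- duplicate key silently skipped, no error
DROP TABLE conflict_demo;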
@ -1677,6 +1708,17 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
CREATE TABLE public.mx_ref (col_1 integer, col_2 text)
CREATE TABLE public.test_table (id integer DEFAULT nextval('public.mx_test_sequence_0'::regclass), id2 integer DEFAULT nextval('public.mx_test_sequence_1'::regclass))
INSERT INTO pg_dist_node (nodeid, groupid, nodename, nodeport, noderack, hasmetadata, metadatasynced, isactive, noderole, nodecluster) VALUES (4, 1, 'localhost', 8888, 'default', FALSE, FALSE, TRUE, 'secondary'::noderole, 'default'),(5, 1, 'localhost', 8889, 'default', FALSE, FALSE, TRUE, 'secondary'::noderole, 'second-cluster'),(1, 1, 'localhost', 57637, 'default', TRUE, TRUE, TRUE, 'primary'::noderole, 'default'),(7, 5, 'localhost', 57638, 'default', TRUE, TRUE, TRUE, 'primary'::noderole, 'default')
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('database', ARRAY['regression']::text[], ARRAY[]::text[])
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('role', ARRAY['postgres']::text[], ARRAY[]::text[])
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('schema', ARRAY['mx_test_schema_1']::text[], ARRAY[]::text[])
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('schema', ARRAY['mx_test_schema_2']::text[], ARRAY[]::text[])
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('schema', ARRAY['mx_testing_schema']::text[], ARRAY[]::text[])
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('schema', ARRAY['mx_testing_schema_2']::text[], ARRAY[]::text[])
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('schema', ARRAY['public']::text[], ARRAY[]::text[])
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('sequence', ARRAY['mx_testing_schema', 'mx_test_table_col_3_seq']::text[], ARRAY[]::text[])
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('sequence', ARRAY['public', 'mx_test_sequence_0']::text[], ARRAY[]::text[])
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('sequence', ARRAY['public', 'mx_test_sequence_1']::text[], ARRAY[]::text[])
SELECT citus_internal_add_object_metadata (classid, objid, objsubid, NULL, NULL) FROM pg_get_object_address('sequence', ARRAY['public', 'user_defined_seq']::text[], ARRAY[]::text[])
SELECT citus_internal_add_partition_metadata ('mx_test_schema_1.mx_table_1'::regclass, 'h', 'col1', 3, 's')
SELECT citus_internal_add_partition_metadata ('mx_test_schema_2.mx_table_2'::regclass, 'h', 'col1', 3, 's')
SELECT citus_internal_add_partition_metadata ('mx_testing_schema.mx_test_table'::regclass, 'h', 'col_1', 0, 's')
@ -1695,6 +1737,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
SELECT worker_create_truncate_trigger('public.mx_ref')
SELECT worker_create_truncate_trigger('public.test_table')
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
TRUNCATE citus.pg_dist_object
TRUNCATE pg_dist_node CASCADE
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310000, 1, 0, 1, 100000), (1310001, 1, 0, 5, 100001), (1310002, 1, 0, 1, 100002), (1310003, 1, 0, 5, 100003), (1310004, 1, 0, 1, 100004), (1310005, 1, 0, 5, 100005), (1310006, 1, 0, 1, 100006), (1310007, 1, 0, 5, 100007)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
WITH placement_data(shardid, shardstate, shardlength, groupid, placementid) AS (VALUES (1310020, 1, 0, 1, 100020), (1310021, 1, 0, 5, 100021), (1310022, 1, 0, 1, 100022), (1310023, 1, 0, 5, 100023), (1310024, 1, 0, 1, 100024)) SELECT citus_internal_add_placement_metadata(shardid, shardstate, shardlength, groupid, placementid) FROM placement_data;
@ -1708,7 +1751,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.dist_table_1'::regclass, 1310074, 't'::"char", '-2147483648', '-1073741825'), ('public.dist_table_1'::regclass, 1310075, 't'::"char", '-1073741824', '-1'), ('public.dist_table_1'::regclass, 1310076, 't'::"char", '0', '1073741823'), ('public.dist_table_1'::regclass, 1310077, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.mx_ref'::regclass, 1310073, 't'::"char", NULL, NULL)) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
WITH shard_data(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) AS (VALUES ('public.test_table'::regclass, 1310083, 't'::"char", '-2147483648', '-1073741825'), ('public.test_table'::regclass, 1310084, 't'::"char", '-1073741824', '-1'), ('public.test_table'::regclass, 1310085, 't'::"char", '0', '1073741823'), ('public.test_table'::regclass, 1310086, 't'::"char", '1073741824', '2147483647')) SELECT citus_internal_add_shard_metadata(relationname, shardid, storagetype, shardminvalue, shardmaxvalue) FROM shard_data;
(62 rows)
(74 rows)

-- shouldn't work since test_table is MX
ALTER TABLE test_table ADD COLUMN id3 bigserial;

@ -353,29 +353,10 @@ CREATE TABLE some_table_with_sequence(a int, b BIGSERIAL, c BIGSERIAL);
DROP TABLE some_table_with_sequence;
CREATE SEQUENCE some_sequence;
DROP SEQUENCE some_sequence;
-- Show that dropping the sequence of an MX table with cascade harms the table and shards
BEGIN;
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass;
 Column | Type    | Modifiers
---------------------------------------------------------------------
 col_1  | integer |
 col_2  | text    |
 col_3  | bigint  | not null default nextval('mx_table_col_3_seq'::regclass)
(3 rows)

-- suppress notice message caused by DROP ... CASCADE to prevent pg version difference
SET client_min_messages TO 'WARNING';
DROP SEQUENCE mx_table_col_3_seq CASCADE;
RESET client_min_messages;
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass;
 Column | Type    | Modifiers
---------------------------------------------------------------------
 col_1  | integer |
 col_2  | text    |
 col_3  | bigint  | not null
(3 rows)

ROLLBACK;
-- Show that dropping the sequence of an MX table is not allowed on a worker
DROP SEQUENCE mx_table_col_3_seq;
ERROR: operation is not allowed on this node
HINT: Connect to the coordinator and run it again.
-- Cleanup
\c - - - :master_port
DROP TABLE mx_table;

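The before/after table_desc output above shows that DROP SEQUENCE ... CASCADE removes the column DEFAULT while leaving the NOT NULL constraint in place. A standalone illustration of the same stock PostgreSQL behavior (hypothetical names, unrelated to the test schema):

CREATE SEQUENCE demo_seq;
CREATE TABLE demo (a bigint NOT NULL DEFAULT nextval('demo_seq'));
DROP SEQUENCE demo_seq CASCADE;  -- drops the DEFAULT, keeps NOT NULL
\d demo                          -- a | bigint | not null (no default)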
@ -66,6 +66,7 @@ ORDER BY 1;
function citus_internal.replace_isolation_tester_func()
function citus_internal.restore_isolation_tester_func()
function citus_internal.upgrade_columnar_storage(regclass)
function citus_internal_add_object_metadata(oid,oid,oid,integer,integer)
function citus_internal_add_partition_metadata(regclass,"char",text,integer,"char")
function citus_internal_add_placement_metadata(bigint,integer,bigint,integer,bigint)
function citus_internal_add_shard_metadata(regclass,bigint,"char",text,text)
@ -253,5 +254,5 @@ ORDER BY 1;
view citus_worker_stat_activity
view pg_dist_shard_placement
view time_partitions
(237 rows)
(238 rows)

@ -327,8 +327,14 @@ test: distributed_procedure

# ---------
# deparsing logic tests
#
# we temporarily disable ddl propagation on the workers, because these tests
# run queries there directly. We do this disabling/enabling in a separate test,
# so that the deparse tests can run in parallel.
# ---------
test: disable_worker_ddl_propagation
test: multi_deparse_function multi_deparse_procedure
test: enable_worker_ddl_propagation

# --------
# cannot be run in parallel with any other tests as it checks

@ -0,0 +1,3 @@
SELECT success FROM run_command_on_workers('ALTER SYSTEM SET citus.enable_ddl_propagation TO OFF');
SELECT success FROM run_command_on_workers('select pg_reload_conf()');
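ALTER SYSTEM only writes the new value to postgresql.auto.conf, so the pg_reload_conf() call is what actually makes it take effect on the running workers. A hedged follow-up check, not part of the new test file, using the same run_command_on_workers helper:

SELECT * FROM run_command_on_workers('SHOW citus.enable_ddl_propagation');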
@ -204,6 +204,12 @@ END;
SELECT create_distributed_function('dup(macaddr)', '$1', colocate_with := 'streaming_table');
SELECT * FROM run_command_on_workers($$SELECT function_tests.dup('0123456789ab');$$) ORDER BY 1,2;

-- Wait two times for metadata sync, once for each node in the cluster.
-- Otherwise the next distributed function creation will fail, telling us to
-- wait for syncing.
SELECT public.wait_until_metadata_sync(30000);
SELECT public.wait_until_metadata_sync(30000);

SELECT create_distributed_function('eq(macaddr,macaddr)', '$1', colocate_with := 'streaming_table');
SELECT * FROM run_command_on_workers($$SELECT function_tests.eq('012345689ab','0123456789ab');$$) ORDER BY 1,2;
SELECT public.verify_function_is_same_on_workers('function_tests.eq(macaddr,macaddr)');

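The two wait_until_metadata_sync calls above block until each worker reports synced metadata. As a hedged sketch that is not part of the test, the state being waited on is visible directly in pg_dist_node, using columns that appear in the pg_dist_node inserts earlier in this diff:

SELECT nodename, nodeport, hasmetadata, metadatasynced
FROM pg_dist_node
WHERE noderole = 'primary'
ORDER BY nodeport;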
@ -0,0 +1,3 @@
SELECT success FROM run_command_on_workers('ALTER SYSTEM SET citus.enable_ddl_propagation TO ON');
SELECT success FROM run_command_on_workers('select pg_reload_conf()');

@ -533,8 +533,10 @@ DROP TABLE mx_table_with_small_sequence, mx_table_with_sequence;
-- owner
CREATE TABLE pg_dist_placement_temp AS SELECT * FROM pg_dist_placement;
CREATE TABLE pg_dist_partition_temp AS SELECT * FROM pg_dist_partition;
CREATE TABLE pg_dist_object_temp AS SELECT * FROM citus.pg_dist_object;
DELETE FROM pg_dist_placement;
DELETE FROM pg_dist_partition;
DELETE FROM citus.pg_dist_object;
SELECT groupid AS old_worker_2_group FROM pg_dist_node WHERE nodeport = :worker_2_port \gset
SELECT master_remove_node('localhost', :worker_2_port);

@ -574,8 +576,10 @@ DROP TABLE mx_table;
\c - postgres - :master_port
INSERT INTO pg_dist_placement SELECT * FROM pg_dist_placement_temp;
INSERT INTO pg_dist_partition SELECT * FROM pg_dist_partition_temp;
INSERT INTO citus.pg_dist_object SELECT * FROM pg_dist_object_temp ON CONFLICT DO NOTHING;
DROP TABLE pg_dist_placement_temp;
DROP TABLE pg_dist_partition_temp;
DROP TABLE pg_dist_object_temp;
UPDATE pg_dist_placement
SET groupid = (SELECT groupid FROM pg_dist_node WHERE nodeport = :worker_2_port)
WHERE groupid = :old_worker_2_group;

@ -216,15 +216,8 @@ DROP TABLE some_table_with_sequence;
CREATE SEQUENCE some_sequence;
DROP SEQUENCE some_sequence;

-- Show that dropping the sequence of an MX table with cascade harms the table and shards
BEGIN;
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass;
-- suppress notice message caused by DROP ... CASCADE to prevent pg version difference
SET client_min_messages TO 'WARNING';
DROP SEQUENCE mx_table_col_3_seq CASCADE;
RESET client_min_messages;
SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='public.mx_table'::regclass;
ROLLBACK;
-- Show that dropping the sequence of an MX table is not allowed on a worker
DROP SEQUENCE mx_table_col_3_seq;

-- Cleanup
\c - - - :master_port
