mirror of https://github.com/citusdata/citus.git
Address reviews
parent
3c9aa360f5
commit
524ee83a9a
|
@ -785,24 +785,30 @@ SyncObjectDependenciesCommandList(WorkerNode *workerNode)
|
||||||
static void
|
static void
|
||||||
SyncObjectDependenciesToNode(WorkerNode *workerNode)
|
SyncObjectDependenciesToNode(WorkerNode *workerNode)
|
||||||
{
|
{
|
||||||
if (NodeIsPrimary(workerNode))
|
if (NodeIsCoordinator(workerNode))
|
||||||
{
|
{
|
||||||
EnsureNoModificationsHaveBeenDone();
|
/* coordinator has all the objects */
|
||||||
|
return;
|
||||||
Assert(ShouldPropagate());
|
|
||||||
if (!NodeIsCoordinator(workerNode))
|
|
||||||
{
|
|
||||||
List *commandList = SyncObjectDependenciesCommandList(workerNode);
|
|
||||||
|
|
||||||
/* send commands to new workers, the current user should be a superuser */
|
|
||||||
Assert(superuser());
|
|
||||||
SendMetadataCommandListToWorkerInCoordinatedTransaction(
|
|
||||||
workerNode->workerName,
|
|
||||||
workerNode->workerPort,
|
|
||||||
CurrentUserName(),
|
|
||||||
commandList);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!NodeIsPrimary(workerNode))
|
||||||
|
{
|
||||||
|
/* secondary nodes gets the objects from their primaries via replication */
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
EnsureNoModificationsHaveBeenDone();
|
||||||
|
Assert(ShouldPropagate());
|
||||||
|
|
||||||
|
List *commandList = SyncObjectDependenciesCommandList(workerNode);
|
||||||
|
|
||||||
|
/* send commands to new workers, the current user should be a superuser */
|
||||||
|
Assert(superuser());
|
||||||
|
SendMetadataCommandListToWorkerInCoordinatedTransaction(
|
||||||
|
workerNode->workerName,
|
||||||
|
workerNode->workerPort,
|
||||||
|
CurrentUserName(),
|
||||||
|
commandList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -466,18 +466,18 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS)
|
||||||
/*
|
/*
|
||||||
* If sequence with the same name exist for different type, it must have been
|
* If sequence with the same name exist for different type, it must have been
|
||||||
* stayed on that node after a rollbacked create_distributed_table operation.
|
* stayed on that node after a rollbacked create_distributed_table operation.
|
||||||
* We must drop it first to create the sequence with the correct type.
|
* We must change it's name first to create the sequence with the correct type.
|
||||||
*/
|
*/
|
||||||
|
Oid sequenceOid;
|
||||||
CreateSeqStmt *createSequenceStatement = (CreateSeqStmt *) commandNode;
|
CreateSeqStmt *createSequenceStatement = (CreateSeqStmt *) commandNode;
|
||||||
char *sequenceName = createSequenceStatement->sequence->relname;
|
char *sequenceName = createSequenceStatement->sequence->relname;
|
||||||
char *sequenceSchema = createSequenceStatement->sequence->schemaname;
|
char *sequenceSchema = createSequenceStatement->sequence->schemaname;
|
||||||
RangeVar *sequenceRange = makeRangeVar(sequenceSchema, sequenceName, -1);
|
|
||||||
|
|
||||||
Oid sequenceRelationId = RangeVarGetRelid(sequenceRange, AccessShareLock, true);
|
RangeVarGetAndCheckCreationNamespace(createSequenceStatement->sequence, NoLock,
|
||||||
|
&sequenceOid);
|
||||||
if (sequenceRelationId != InvalidOid)
|
if (OidIsValid(sequenceOid))
|
||||||
{
|
{
|
||||||
Form_pg_sequence pgSequenceForm = pg_get_sequencedef(sequenceRelationId);
|
Form_pg_sequence pgSequenceForm = pg_get_sequencedef(sequenceOid);
|
||||||
if (pgSequenceForm->seqtypid != sequenceTypeId)
|
if (pgSequenceForm->seqtypid != sequenceTypeId)
|
||||||
{
|
{
|
||||||
StringInfo dropSequenceString = makeStringInfo();
|
StringInfo dropSequenceString = makeStringInfo();
|
||||||
|
|
|
@ -22,8 +22,17 @@ WITH dist_node_summary AS (
|
||||||
master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
|
master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
|
||||||
ARRAY[dist_node_summary.query, dist_node_summary.query],
|
ARRAY[dist_node_summary.query, dist_node_summary.query],
|
||||||
false)
|
false)
|
||||||
|
), dist_placement_summary AS (
|
||||||
|
SELECT 'SELECT jsonb_agg(pg_dist_placement ORDER BY placementid) FROM pg_dist_placement' AS query
|
||||||
|
), dist_placement_check AS (
|
||||||
|
SELECT count(distinct result) = 1 AS matches
|
||||||
|
FROM dist_placement_summary CROSS JOIN LATERAL
|
||||||
|
master_run_on_worker(ARRAY[hostname, 'localhost'], ARRAY[port, master_port],
|
||||||
|
ARRAY[dist_placement_summary.query, dist_placement_summary.query],
|
||||||
|
false)
|
||||||
)
|
)
|
||||||
SELECT dist_node_check.matches FROM dist_node_check
|
SELECT dist_node_check.matches AND dist_placement_check.matches
|
||||||
|
FROM dist_node_check CROSS JOIN dist_placement_check
|
||||||
$$;
|
$$;
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue