Changes depending on the discussion

velioglu/wo_seq_test_1
Burak Velioglu 2021-12-30 14:38:31 +03:00
parent a348b6b097
commit 848d13f6eb
No known key found for this signature in database
GPG Key ID: F6827E620F6549C6
8 changed files with 23 additions and 26 deletions

View File

@ -1239,23 +1239,15 @@ FinalizeCitusLocalTableCreation(Oid relationId, List *dependentSequenceList)
CreateTruncateTrigger(relationId); CreateTruncateTrigger(relationId);
} }
CreateShellTableOnWorkers(relationId);
ObjectAddress relationAddress = { 0 };
ObjectAddressSet(relationAddress, RelationRelationId, relationId);
if (ShouldSyncTableMetadata(relationId)) if (ShouldSyncTableMetadata(relationId))
{ {
CreateShellTableOnWorkers(relationId);
CreateTableMetadataOnWorkers(relationId); CreateTableMetadataOnWorkers(relationId);
ObjectAddress relationAddress = { 0 };
ObjectAddressSet(relationAddress, RelationRelationId, relationId);
MarkObjectDistributed(&relationAddress); MarkObjectDistributed(&relationAddress);
} }
else
{
/* Mark the table as distributed only locally */
bool prevDependencyCreationValue = EnableDependencyCreation;
SetLocalEnableDependencyCreation(false);
MarkObjectDistributed(&relationAddress);
SetLocalEnableDependencyCreation(prevDependencyCreationValue);
}
/* /*
* We've a custom way of foreign key graph invalidation, * We've a custom way of foreign key graph invalidation,

View File

@ -532,19 +532,16 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
CreateReferenceTableShard(relationId); CreateReferenceTableShard(relationId);
} }
CreateShellTableOnWorkers(relationId);
if (ShouldSyncTableMetadata(relationId)) if (ShouldSyncTableMetadata(relationId))
{ {
CreateShellTableOnWorkers(relationId);
MarkObjectDistributed(&tableAddress); MarkObjectDistributed(&tableAddress);
CreateTableMetadataOnWorkers(relationId); CreateTableMetadataOnWorkers(relationId);
} }
else else
{ {
/* Mark the table as distributed only locally */ // sync etmedigimizi ignore ediyoruz commenti, ozellikle append dist tablolar icin
bool prevDependencyCreationValue = EnableDependencyCreation; // backward compatible olarak calisacak, sync de yapsak gitmicek
SetLocalEnableDependencyCreation(false);
MarkObjectDistributed(&tableAddress);
SetLocalEnableDependencyCreation(prevDependencyCreationValue);
} }
/* /*
@ -602,11 +599,15 @@ CreateDistributedTable(Oid relationId, Var *distributionColumn, char distributio
* If any other distributed table uses the input sequence, it checks whether * If any other distributed table uses the input sequence, it checks whether
* the types of the columns using the sequence match. If they don't, it errors out. * the types of the columns using the sequence match. If they don't, it errors out.
* Otherwise, the condition is ensured. * Otherwise, the condition is ensured.
* Since the owner relation id may not have been distributed yet, we need to check it
* in addition all citus tables as well.
*/ */
void void
EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId) EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId, Oid ownerRelationId)
{ {
List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE); List *citusTableIdList = CitusTableTypeIdList(ANY_CITUS_TABLE_TYPE);
citusTableIdList = lappend_oid(citusTableIdList, ownerRelationId);
Oid citusTableId = InvalidOid; Oid citusTableId = InvalidOid;
foreach_oid(citusTableId, citusTableIdList) foreach_oid(citusTableId, citusTableIdList)
{ {
@ -750,7 +751,7 @@ EnsureDistributedSequencesHaveOneType(Oid relationId, List *dependentSequenceLis
* that sequence is supported * that sequence is supported
*/ */
Oid seqTypId = GetAttributeTypeOid(relationId, attnum); Oid seqTypId = GetAttributeTypeOid(relationId, attnum);
EnsureSequenceTypeSupported(sequenceOid, seqTypId); EnsureSequenceTypeSupported(sequenceOid, seqTypId, relationId);
/* /*
* Alter the sequence's data type in the coordinator if needed. * Alter the sequence's data type in the coordinator if needed.

View File

@ -236,7 +236,7 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
return NIL; return NIL;
} }
if (relKind == RELKIND_RELATION) if (relKind == RELKIND_RELATION || relKind == RELKIND_PARTITIONED_TABLE)
{ {
Oid relationId = dependency->objectId; Oid relationId = dependency->objectId;
if (IsCitusTable(relationId) && !IsTableOwnedByExtension(relationId)) if (IsCitusTable(relationId) && !IsTableOwnedByExtension(relationId))
@ -255,7 +255,6 @@ GetDependencyCreateDDLCommands(const ObjectAddress *dependency)
tableDDLCommand)); tableDDLCommand));
} }
/* TODO: May need to move sequence dependencies to ActivateNode directly */
List *sequenceDependencyCommandList = SequenceDependencyCommandList( List *sequenceDependencyCommandList = SequenceDependencyCommandList(
dependency->objectId); dependency->objectId);
commandList = list_concat(commandList, sequenceDependencyCommandList); commandList = list_concat(commandList, sequenceDependencyCommandList);

View File

@ -908,6 +908,7 @@ SetUpDistributedTableWithDependencies(WorkerNode *newWorkerNode)
* transaction blocks. * transaction blocks.
*/ */
/* TODO: Doesn't make sense to have that here as we won't handle placement metadata */ /* TODO: Doesn't make sense to have that here as we won't handle placement metadata */
// TODO: Metadatasynced olmayan worker varsa patla (Onder'in PRinda gidebilir)
/* with maintenance daemon anymore */ /* with maintenance daemon anymore */
/* if (ClusterHasDistributedFunctionWithDistArgument()) /* if (ClusterHasDistributedFunctionWithDistArgument())

View File

@ -285,7 +285,7 @@ extern bool GetNodeDiskSpaceStatsForConnection(MultiConnection *connection,
uint64 *availableBytes, uint64 *availableBytes,
uint64 *totalBytes); uint64 *totalBytes);
extern void ExecuteQueryViaSPI(char *query, int SPIOK); extern void ExecuteQueryViaSPI(char *query, int SPIOK);
extern void EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId); extern void EnsureSequenceTypeSupported(Oid seqOid, Oid seqTypId, Oid ownerRelationId);
extern void AlterSequenceType(Oid seqOid, Oid typeOid); extern void AlterSequenceType(Oid seqOid, Oid typeOid);
extern void MarkSequenceListDistributedAndPropagateWithDependencies(Oid relationId, extern void MarkSequenceListDistributedAndPropagateWithDependencies(Oid relationId,
List *sequenceList); List *sequenceList);

View File

@ -35,8 +35,8 @@ test: alter_database_owner
test: multi_test_catalog_views test: multi_test_catalog_views
test: multi_table_ddl test: multi_table_ddl
test: check_mx test: check_mx
test: turn_mx_off
test: multi_sequence_default test: multi_sequence_default
test: turn_mx_off
test: multi_name_lengths test: multi_name_lengths
test: turn_mx_on test: turn_mx_on
test: multi_name_resolution test: multi_name_resolution

View File

@ -12,6 +12,7 @@ SET search_path = sequence_default, public;
-- test both distributed and citus local tables -- test both distributed and citus local tables
table pg_dist_node;
SELECT 1 FROM citus_add_node('localhost', :master_port, groupId => 0); SELECT 1 FROM citus_add_node('localhost', :master_port, groupId => 0);
-- Cannot add a column involving DEFAULT nextval('..') because the table is not empty -- Cannot add a column involving DEFAULT nextval('..') because the table is not empty
CREATE SEQUENCE seq_0; CREATE SEQUENCE seq_0;
@ -91,7 +92,6 @@ CREATE TABLE seq_test_4 (x int, y int);
SELECT create_distributed_table('seq_test_4','x'); SELECT create_distributed_table('seq_test_4','x');
CREATE SEQUENCE seq_4; CREATE SEQUENCE seq_4;
ALTER TABLE seq_test_4 ADD COLUMN a bigint DEFAULT nextval('seq_4'); ALTER TABLE seq_test_4 ADD COLUMN a bigint DEFAULT nextval('seq_4');
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
DROP SEQUENCE seq_4 CASCADE; DROP SEQUENCE seq_4 CASCADE;
TRUNCATE seq_test_4; TRUNCATE seq_test_4;
CREATE SEQUENCE seq_4; CREATE SEQUENCE seq_4;
@ -366,7 +366,7 @@ SELECT create_reference_table('seq_test_10');
INSERT INTO seq_test_10 VALUES (0); INSERT INTO seq_test_10 VALUES (0);
CREATE TABLE seq_test_11 (col0 int, col1 bigint DEFAULT nextval('seq_11'::text)); CREATE TABLE seq_test_11 (col0 int, col1 bigint DEFAULT nextval('seq_11'::text));
-- works but doesn't create seq_11 in the workers -- works but doesn't create seq_11 in the workers
SELECT start_metadata_sync_to_node('localhost', :worker_1_port); SELECT citus_activate_node('localhost', :worker_1_port);
-- works because there is no dependency created between seq_11 and seq_test_10 -- works because there is no dependency created between seq_11 and seq_test_10
SELECT create_distributed_table('seq_test_11', 'col1'); SELECT create_distributed_table('seq_test_11', 'col1');
-- insertion from workers fails -- insertion from workers fails

View File

@ -4,6 +4,10 @@ SET citus.shard_count TO 4;
SET citus.shard_replication_factor TO 1; SET citus.shard_replication_factor TO 1;
SET citus.next_shard_id TO 90630500; SET citus.next_shard_id TO 90630500;
table pg_dist_node;
table pg_dist_partition;
\d
-- Ensure tuple data in explain analyze output is the same on all PG versions -- Ensure tuple data in explain analyze output is the same on all PG versions
SET citus.enable_binary_protocol = TRUE; SET citus.enable_binary_protocol = TRUE;