Merge pull request #1525 from citusdata/refactor_create_distributed_table

Refactor distributed table creation logic
pull/1477/head
Burak Yücesoy 2017-07-31 12:08:18 +03:00 committed by GitHub
commit f0275fe4ae
19 changed files with 767 additions and 615 deletions

File diff suppressed because it is too large Load Diff

View File

@ -183,7 +183,7 @@ DropShardsFromWorker(WorkerNode *workerNode, Oid relationId, List *shardInterval
}
}
if (relationKind == RELKIND_RELATION)
if (RegularTable(relationId))
{
appendStringInfo(workerCommand, DROP_REGULAR_TABLE_COMMAND, shardNames->data);
}

View File

@ -721,7 +721,7 @@ ShardStorageType(Oid relationId)
char shardStorageType = 0;
char relationType = get_rel_relkind(relationId);
if (relationType == RELKIND_RELATION)
if (RegularTable(relationId))
{
shardStorageType = SHARD_STORAGE_TABLE;
}

View File

@ -389,7 +389,7 @@ RecreateTableDDLCommandList(Oid relationId)
bool includeSequenceDefaults = false;
/* build appropriate DROP command based on relation kind */
if (relationKind == RELKIND_RELATION)
if (RegularTable(relationId))
{
appendStringInfo(dropCommand, DROP_REGULAR_TABLE_COMMAND,
qualifiedRelationName);

View File

@ -36,6 +36,7 @@
#include "distributed/citus_ruleutils.h"
#include "distributed/multi_partitioning_utils.h"
#include "distributed/relay_utility.h"
#include "distributed/master_metadata_utility.h"
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "nodes/nodes.h"
@ -59,7 +60,6 @@
static void AppendOptionListToString(StringInfo stringData, List *options);
static bool SupportedRelationKind(Relation relation);
static const char * convert_aclright_to_string(int aclright);
@ -291,7 +291,6 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults)
AttrNumber constraintIndex = 0;
AttrNumber constraintCount = 0;
StringInfoData buffer = { NULL, 0, 0, 0 };
bool supportedRelationKind = false;
/*
* Instead of retrieving values from system catalogs as other functions in
@ -304,13 +303,7 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults)
relation = relation_open(tableRelationId, AccessShareLock);
relationName = generate_relation_name(tableRelationId, NIL);
supportedRelationKind = SupportedRelationKind(relation);
if (!supportedRelationKind)
{
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("%s is not a regular, foreign, or partitioned table",
relationName)));
}
EnsureRelationKindSupported(tableRelationId);
initStringInfo(&buffer);
@ -491,25 +484,33 @@ pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults)
/*
* SupportedRelationKind returns true if the given relation is supported as a
* distributed relation.
* EnsureRelationKindSupported errors out if the given relation is not supported
* as a distributed relation.
*/
static bool
SupportedRelationKind(Relation relation)
void
EnsureRelationKindSupported(Oid relationId)
{
char relationKind = relation->rd_rel->relkind;
bool supportedRelationKind = (relationKind == RELKIND_RELATION || relationKind ==
RELKIND_FOREIGN_TABLE);
#if (PG_VERSION_NUM >= 100000)
supportedRelationKind = supportedRelationKind || relationKind ==
RELKIND_PARTITIONED_TABLE;
#endif
char relationKind = get_rel_relkind(relationId);
bool supportedRelationKind = false;
/* Citus doesn't support bare inhereted tables (i.e., not a partition or partitioned table) */
supportedRelationKind = supportedRelationKind && !(IsChildTable(relation->rd_id) ||
IsParentTable(relation->rd_id));
supportedRelationKind = RegularTable(relationId) ||
relationKind == RELKIND_FOREIGN_TABLE;
return supportedRelationKind;
/*
* Citus doesn't support bare inherited tables (i.e., not a partition or
* partitioned table)
*/
supportedRelationKind = supportedRelationKind && !(IsChildTable(relationId) ||
IsParentTable(relationId));
if (!supportedRelationKind)
{
char *relationName = get_rel_name(relationId);
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("%s is not a regular, foreign or partitioned table",
relationName)));
}
}
@ -523,15 +524,12 @@ char *
pg_get_tablecolumnoptionsdef_string(Oid tableRelationId)
{
Relation relation = NULL;
char *relationName = NULL;
char relationKind = 0;
TupleDesc tupleDescriptor = NULL;
AttrNumber attributeIndex = 0;
List *columnOptionList = NIL;
ListCell *columnOptionCell = NULL;
bool firstOptionPrinted = false;
StringInfoData buffer = { NULL, 0, 0, 0 };
bool supportedRelationKind = false;
/*
* Instead of retrieving values from system catalogs, we open the relation,
@ -539,22 +537,8 @@ pg_get_tablecolumnoptionsdef_string(Oid tableRelationId)
* This is primarily to maintain symmetry with pg_get_tableschemadef.
*/
relation = relation_open(tableRelationId, AccessShareLock);
relationName = generate_relation_name(tableRelationId, NIL);
relationKind = relation->rd_rel->relkind;
supportedRelationKind = (relationKind == RELKIND_RELATION || relationKind ==
RELKIND_FOREIGN_TABLE);
#if (PG_VERSION_NUM >= 100000)
supportedRelationKind = supportedRelationKind || relationKind ==
RELKIND_PARTITIONED_TABLE;
#endif
if (!supportedRelationKind)
{
ereport(ERROR, (errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("%s is not a regular or foreign table or partitioned",
relationName)));
}
EnsureRelationKindSupported(tableRelationId);
/*
* Iterate over the table's columns. If a particular column is not dropped

View File

@ -904,6 +904,15 @@ ColocatedTableId(Oid colocationId)
ScanKeyData scanKey[1];
int scanKeyCount = 1;
/*
* We may have a distributed table whose colocation id is INVALID_COLOCATION_ID.
* In this case, we do not want to send that table's id as colocated table id.
*/
if (colocationId == INVALID_COLOCATION_ID)
{
return colocatedTableId;
}
ScanKeyInit(&scanKey[0], Anum_pg_dist_partition_colocationid,
BTEqualStrategyNumber, F_INT4EQ, ObjectIdGetDatum(colocationId));

View File

@ -34,6 +34,7 @@ extern char * pg_get_serverdef_string(Oid tableRelationId);
extern char * pg_get_sequencedef_string(Oid sequenceRelid);
extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId);
extern char * pg_get_tableschemadef_string(Oid tableRelationId, bool forShardCreation);
extern void EnsureRelationKindSupported(Oid relationId);
extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId);
extern void deparse_shard_index_statement(IndexStmt *origStmt, Oid distrelid,
int64 shardid, StringInfo buffer);

View File

@ -139,6 +139,9 @@ extern void UpdateShardPlacementState(uint64 placementId, char shardState);
extern void DeleteShardPlacementRow(uint64 placementId);
extern void UpdateColocationGroupReplicationFactor(uint32 colocationId,
int replicationFactor);
extern void CreateDistributedTable(Oid relationId, Var *distributionColumn,
char distributionMethod, char *colocateWithTableName,
bool viaDeprecatedAPI);
extern void CreateTruncateTrigger(Oid relationId);
/* Remaining metadata utility functions */
@ -147,6 +150,7 @@ extern void EnsureTablePermissions(Oid relationId, AclMode mode);
extern void EnsureTableOwner(Oid relationId);
extern void EnsureSuperUser(void);
extern void EnsureReplicationSettings(Oid relationId, char replicationModel);
extern bool RegularTable(Oid relationId);
extern bool TableReferenced(Oid relationId);
extern char * ConstructQualifiedShardName(ShardInterval *shardInterval);
extern Datum StringToDatum(char *inputString, Oid dataType);

View File

@ -8,10 +8,10 @@ step s1-begin:
BEGIN;
step s1-insert:
INSERT INTO test_table VALUES(1);
INSERT INTO test_concurrent_dml VALUES(1);
step s2-update:
UPDATE test_table SET data = 'blarg' WHERE test_id = 1;
UPDATE test_concurrent_dml SET data = 'blarg' WHERE test_id = 1;
<waiting ...>
step s1-commit:
COMMIT;
@ -23,8 +23,8 @@ master_create_worker_shards
step s1-insert:
INSERT INTO test_table VALUES(1);
INSERT INTO test_concurrent_dml VALUES(1);
step s2-update:
UPDATE test_table SET data = 'blarg' WHERE test_id = 1;
UPDATE test_concurrent_dml SET data = 'blarg' WHERE test_id = 1;

View File

@ -2,16 +2,16 @@ Parsed test spec with 2 sessions
starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-update s2-commit s1-commit s2-print-content
step s1-load-cache:
TRUNCATE test_table;
TRUNCATE test_copy_placement_vs_modification;
step s1-insert:
INSERT INTO test_table VALUES (5, 10);
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
count
@ -29,7 +29,7 @@ master_copy_shard_placement
step s1-update:
UPDATE test_table SET y = 5 WHERE x = 5;
UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5;
<waiting ...>
step s2-commit:
COMMIT;
@ -42,7 +42,7 @@ step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
@ -55,16 +55,16 @@ nodeport success result
starting permutation: s1-load-cache s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-delete s2-commit s1-commit s2-print-content
step s1-load-cache:
TRUNCATE test_table;
TRUNCATE test_copy_placement_vs_modification;
step s1-insert:
INSERT INTO test_table VALUES (5, 10);
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
count
@ -82,7 +82,7 @@ master_copy_shard_placement
step s1-delete:
DELETE FROM test_table WHERE x = 5;
DELETE FROM test_copy_placement_vs_modification WHERE x = 5;
<waiting ...>
step s2-commit:
COMMIT;
@ -95,7 +95,7 @@ step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
@ -108,13 +108,13 @@ nodeport success result
starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-insert s2-commit s1-commit s2-print-content
step s1-load-cache:
TRUNCATE test_table;
TRUNCATE test_copy_placement_vs_modification;
step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
count
@ -132,7 +132,7 @@ master_copy_shard_placement
step s1-insert:
INSERT INTO test_table VALUES (5, 10);
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
<waiting ...>
step s2-commit:
COMMIT;
@ -145,7 +145,7 @@ step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
@ -158,13 +158,13 @@ nodeport success result
starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-copy s2-commit s1-commit s2-print-content
step s1-load-cache:
TRUNCATE test_table;
TRUNCATE test_copy_placement_vs_modification;
step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
count
@ -182,7 +182,7 @@ master_copy_shard_placement
step s1-copy:
COPY test_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
COPY test_copy_placement_vs_modification FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
<waiting ...>
step s2-commit:
COMMIT;
@ -195,7 +195,7 @@ step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
@ -208,13 +208,13 @@ nodeport success result
starting permutation: s1-load-cache s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-ddl s2-commit s1-commit s2-print-index-count
step s1-load-cache:
TRUNCATE test_table;
TRUNCATE test_copy_placement_vs_modification;
step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
count
@ -232,7 +232,7 @@ master_copy_shard_placement
step s1-ddl:
CREATE INDEX test_table_index ON test_table(x);
CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x);
<waiting ...>
step s2-commit:
COMMIT;
@ -245,7 +245,7 @@ step s2-print-index-count:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''')
run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''')
ORDER BY
nodeport;
@ -258,13 +258,13 @@ nodeport success result
starting permutation: s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-update s2-commit s1-commit s2-print-content
step s1-insert:
INSERT INTO test_table VALUES (5, 10);
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
count
@ -282,7 +282,7 @@ master_copy_shard_placement
step s1-update:
UPDATE test_table SET y = 5 WHERE x = 5;
UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5;
<waiting ...>
step s2-commit:
COMMIT;
@ -295,7 +295,7 @@ step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
@ -308,13 +308,13 @@ nodeport success result
starting permutation: s1-insert s1-begin s1-select s2-set-placement-inactive s2-begin s2-repair-placement s1-delete s2-commit s1-commit s2-print-content
step s1-insert:
INSERT INTO test_table VALUES (5, 10);
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
count
@ -332,7 +332,7 @@ master_copy_shard_placement
step s1-delete:
DELETE FROM test_table WHERE x = 5;
DELETE FROM test_copy_placement_vs_modification WHERE x = 5;
<waiting ...>
step s2-commit:
COMMIT;
@ -345,7 +345,7 @@ step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
@ -361,7 +361,7 @@ step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
count
@ -379,7 +379,7 @@ master_copy_shard_placement
step s1-insert:
INSERT INTO test_table VALUES (5, 10);
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
<waiting ...>
step s2-commit:
COMMIT;
@ -392,7 +392,7 @@ step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
@ -408,7 +408,7 @@ step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
count
@ -426,7 +426,7 @@ master_copy_shard_placement
step s1-copy:
COPY test_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
COPY test_copy_placement_vs_modification FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
<waiting ...>
step s2-commit:
COMMIT;
@ -439,7 +439,7 @@ step s2-print-content:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
@ -455,7 +455,7 @@ step s1-begin:
BEGIN;
step s1-select:
SELECT count(*) FROM test_table WHERE x = 5;
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
count
@ -473,7 +473,7 @@ master_copy_shard_placement
step s1-ddl:
CREATE INDEX test_table_index ON test_table(x);
CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x);
<waiting ...>
step s2-commit:
COMMIT;
@ -486,7 +486,7 @@ step s2-print-index-count:
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''')
run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''')
ORDER BY
nodeport;

View File

@ -5,16 +5,16 @@ master_create_worker_shards
step s2-invalidate-57637:
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637;
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637;
step s1-begin:
BEGIN;
step s1-insertone:
INSERT INTO test_table VALUES(1, 1);
INSERT INTO test_dml_vs_repair VALUES(1, 1);
step s2-repair:
SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637);
SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637);
<waiting ...>
step s1-commit:
COMMIT;
@ -29,19 +29,19 @@ master_create_worker_shards
step s1-insertone:
INSERT INTO test_table VALUES(1, 1);
INSERT INTO test_dml_vs_repair VALUES(1, 1);
step s2-invalidate-57637:
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637;
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637;
step s1-begin:
BEGIN;
step s1-insertall:
INSERT INTO test_table SELECT test_id, data+1 FROM test_table;
INSERT INTO test_dml_vs_repair SELECT test_id, data+1 FROM test_dml_vs_repair;
step s2-repair:
SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637);
SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637);
<waiting ...>
step s1-commit:
COMMIT;
@ -56,41 +56,41 @@ master_create_worker_shards
step s2-invalidate-57637:
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637;
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637;
step s2-begin:
BEGIN;
step s2-repair:
SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637);
SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637);
master_copy_shard_placement
step s1-insertone:
INSERT INTO test_table VALUES(1, 1);
INSERT INTO test_dml_vs_repair VALUES(1, 1);
<waiting ...>
step s2-commit:
COMMIT;
step s1-insertone: <... completed>
step s2-invalidate-57638:
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638;
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638;
step s1-display:
SELECT * FROM test_table WHERE test_id = 1;
SELECT * FROM test_dml_vs_repair WHERE test_id = 1;
test_id data
1 1
step s2-invalidate-57637:
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637;
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637;
step s2-revalidate-57638:
UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638;
UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638;
step s1-display:
SELECT * FROM test_table WHERE test_id = 1;
SELECT * FROM test_dml_vs_repair WHERE test_id = 1;
test_id data
@ -101,7 +101,7 @@ master_create_worker_shards
step s2-invalidate-57637:
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637;
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637;
step s1-prepared-insertone:
EXECUTE insertone;
@ -110,7 +110,7 @@ step s2-begin:
BEGIN;
step s2-repair:
SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637);
SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637);
master_copy_shard_placement
@ -124,22 +124,22 @@ step s2-commit:
step s1-prepared-insertone: <... completed>
error in steps s2-commit s1-prepared-insertone: ERROR: prepared modifications cannot be executed on a shard while it is being copied
step s2-invalidate-57638:
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638;
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638;
step s1-display:
SELECT * FROM test_table WHERE test_id = 1;
SELECT * FROM test_dml_vs_repair WHERE test_id = 1;
test_id data
1 1
step s2-invalidate-57637:
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637;
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637;
step s2-revalidate-57638:
UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638;
UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638;
step s1-display:
SELECT * FROM test_table WHERE test_id = 1;
SELECT * FROM test_dml_vs_repair WHERE test_id = 1;
test_id data
@ -150,10 +150,10 @@ master_create_worker_shards
step s2-invalidate-57637:
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637;
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637;
step s1-insertone:
INSERT INTO test_table VALUES(1, 1);
INSERT INTO test_dml_vs_repair VALUES(1, 1);
step s1-prepared-insertall:
EXECUTE insertall;
@ -162,7 +162,7 @@ step s2-begin:
BEGIN;
step s2-repair:
SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637);
SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637);
master_copy_shard_placement
@ -176,23 +176,23 @@ step s2-commit:
step s1-prepared-insertall: <... completed>
error in steps s2-commit s1-prepared-insertall: ERROR: prepared modifications cannot be executed on a shard while it is being copied
step s2-invalidate-57638:
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638;
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638;
step s1-display:
SELECT * FROM test_table WHERE test_id = 1;
SELECT * FROM test_dml_vs_repair WHERE test_id = 1;
test_id data
1 1
1 2
step s2-invalidate-57637:
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637;
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637;
step s2-revalidate-57638:
UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638;
UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638;
step s1-display:
SELECT * FROM test_table WHERE test_id = 1;
SELECT * FROM test_dml_vs_repair WHERE test_id = 1;
test_id data

View File

@ -48,7 +48,7 @@ DETAIL: Distributed relations must not specify the WITH (OIDS) option in their
ALTER TABLE table_to_distribute SET WITHOUT OIDS;
-- use an index instead of table name
SELECT master_create_distributed_table('table_to_distribute_pkey', 'id', 'hash');
ERROR: table_to_distribute_pkey is not a regular or foreign table
ERROR: table_to_distribute_pkey is not a regular, foreign or partitioned table
-- use a bad column name
SELECT master_create_distributed_table('table_to_distribute', 'bad_column', 'hash');
ERROR: column "bad_column" of relation "table_to_distribute" does not exist

View File

@ -146,7 +146,7 @@ NOTICE: foreign-data wrapper "fake_fdw" does not have an extension defined
-- propagating views is not supported
CREATE VIEW local_view AS SELECT * FROM simple_table;
SELECT table_ddl_command_array('local_view');
ERROR: public.local_view is not a regular, foreign, or partitioned table
ERROR: local_view is not a regular, foreign or partitioned table
-- clean up
DROP VIEW IF EXISTS local_view;
DROP FOREIGN TABLE IF EXISTS foreign_table;

View File

@ -415,8 +415,8 @@ SELECT table_inherited('date_partitioned_table');
-- also these are not supported
SELECT master_get_table_ddl_events('capitals');
ERROR: public.capitals is not a regular, foreign, or partitioned table
ERROR: capitals is not a regular, foreign or partitioned table
SELECT master_get_table_ddl_events('cities');
ERROR: public.cities is not a regular, foreign, or partitioned table
ERROR: cities is not a regular, foreign or partitioned table
-- dropping parents frop the partitions
DROP TABLE date_partitioned_table, multi_column_partitioned, list_partitioned, partition_parent_schema.parent_table, cities, capitals;

View File

@ -355,9 +355,9 @@ LINE 1: SELECT table_inherited('date_partitioned_table');
^
-- also these are not supported
SELECT master_get_table_ddl_events('capitals');
ERROR: public.capitals is not a regular, foreign, or partitioned table
ERROR: capitals is not a regular, foreign or partitioned table
SELECT master_get_table_ddl_events('cities');
ERROR: public.cities is not a regular, foreign, or partitioned table
ERROR: cities is not a regular, foreign or partitioned table
-- dropping parents frop the partitions
DROP TABLE date_partitioned_table, multi_column_partitioned, list_partitioned, partition_parent_schema.parent_table, cities, capitals;
ERROR: table "date_partitioned_table" does not exist

View File

@ -5,7 +5,8 @@ test: isolation_add_node_vs_reference_table_operations
# that come later can be parallelized
test: isolation_cluster_management
test: isolation_dml_vs_repair isolation_copy_placement_vs_copy_placement isolation_cancellation
test: isolation_dml_vs_repair
test: isolation_copy_placement_vs_copy_placement isolation_cancellation
test: isolation_concurrent_dml isolation_data_migration
test: isolation_drop_shards isolation_copy_placement_vs_modification
test: isolation_insert_vs_vacuum isolation_transaction_recovery

View File

@ -1,13 +1,13 @@
setup
{
CREATE TABLE test_table (test_id integer NOT NULL, data text);
SELECT master_create_distributed_table('test_table', 'test_id', 'hash');
SELECT master_create_worker_shards('test_table', 4, 2);
CREATE TABLE test_concurrent_dml (test_id integer NOT NULL, data text);
SELECT master_create_distributed_table('test_concurrent_dml', 'test_id', 'hash');
SELECT master_create_worker_shards('test_concurrent_dml', 4, 2);
}
teardown
{
DROP TABLE IF EXISTS test_table CASCADE;
DROP TABLE IF EXISTS test_concurrent_dml CASCADE;
}
session "s1"
@ -19,7 +19,7 @@ step "s1-begin"
step "s1-insert"
{
INSERT INTO test_table VALUES(1);
INSERT INTO test_concurrent_dml VALUES(1);
}
step "s1-commit"
@ -31,7 +31,7 @@ session "s2"
step "s2-update"
{
UPDATE test_table SET data = 'blarg' WHERE test_id = 1;
UPDATE test_concurrent_dml SET data = 'blarg' WHERE test_id = 1;
}
# verify that an in-progress insert blocks concurrent updates

View File

@ -4,15 +4,15 @@ setup
{
SET citus.shard_count TO 2;
SET citus.shard_replication_factor TO 2;
CREATE TABLE test_table (x int, y int);
SELECT create_distributed_table('test_table', 'x');
CREATE TABLE test_copy_placement_vs_modification (x int, y int);
SELECT create_distributed_table('test_copy_placement_vs_modification', 'x');
SELECT get_shard_id_for_distribution_column('test_table', 5) INTO selected_shard;
SELECT get_shard_id_for_distribution_column('test_copy_placement_vs_modification', 5) INTO selected_shard;
}
teardown
{
DROP TABLE test_table;
DROP TABLE test_copy_placement_vs_modification;
DROP TABLE selected_shard;
}
@ -23,41 +23,41 @@ step "s1-begin"
BEGIN;
}
# since test_table has rep > 1 simple select query doesn't hit all placements
# since test_copy_placement_vs_modification has rep > 1 simple select query doesn't hit all placements
# hence not all placements are cached
step "s1-load-cache"
{
TRUNCATE test_table;
TRUNCATE test_copy_placement_vs_modification;
}
step "s1-insert"
{
INSERT INTO test_table VALUES (5, 10);
INSERT INTO test_copy_placement_vs_modification VALUES (5, 10);
}
step "s1-update"
{
UPDATE test_table SET y = 5 WHERE x = 5;
UPDATE test_copy_placement_vs_modification SET y = 5 WHERE x = 5;
}
step "s1-delete"
{
DELETE FROM test_table WHERE x = 5;
DELETE FROM test_copy_placement_vs_modification WHERE x = 5;
}
step "s1-select"
{
SELECT count(*) FROM test_table WHERE x = 5;
SELECT count(*) FROM test_copy_placement_vs_modification WHERE x = 5;
}
step "s1-ddl"
{
CREATE INDEX test_table_index ON test_table(x);
CREATE INDEX test_copy_placement_vs_modification_index ON test_copy_placement_vs_modification(x);
}
step "s1-copy"
{
COPY test_table FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
COPY test_copy_placement_vs_modification FROM PROGRAM 'echo "1,1\n2,2\n3,3\n4,4\n5,5"' WITH CSV;
}
step "s1-commit"
@ -92,7 +92,7 @@ step "s2-print-content"
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select y from %s WHERE x = 5')
run_command_on_placements('test_copy_placement_vs_modification', 'select y from %s WHERE x = 5')
WHERE
shardid IN (SELECT * FROM selected_shard)
ORDER BY
@ -104,7 +104,7 @@ step "s2-print-index-count"
SELECT
nodeport, success, result
FROM
run_command_on_placements('test_table', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''')
run_command_on_placements('test_copy_placement_vs_modification', 'select count(*) from pg_indexes WHERE schemaname || ''.'' || tablename = ''%s''')
ORDER BY
nodeport;
}

View File

@ -1,13 +1,13 @@
setup
{
CREATE TABLE test_table (test_id integer NOT NULL, data int);
SELECT master_create_distributed_table('test_table', 'test_id', 'hash');
SELECT master_create_worker_shards('test_table', 1, 2);
CREATE TABLE test_dml_vs_repair (test_id integer NOT NULL, data int);
SELECT master_create_distributed_table('test_dml_vs_repair', 'test_id', 'hash');
SELECT master_create_worker_shards('test_dml_vs_repair', 1, 2);
}
teardown
{
DROP TABLE IF EXISTS test_table CASCADE;
DROP TABLE IF EXISTS test_dml_vs_repair CASCADE;
}
session "s1"
@ -15,9 +15,9 @@ session "s1"
setup
{
DEALLOCATE all;
TRUNCATE test_table;
PREPARE insertone AS INSERT INTO test_table VALUES(1, 1);
PREPARE insertall AS INSERT INTO test_table SELECT test_id, data+1 FROM test_table;
TRUNCATE test_dml_vs_repair;
PREPARE insertone AS INSERT INTO test_dml_vs_repair VALUES(1, 1);
PREPARE insertall AS INSERT INTO test_dml_vs_repair SELECT test_id, data+1 FROM test_dml_vs_repair;
}
step "s1-begin"
@ -27,7 +27,7 @@ step "s1-begin"
step "s1-insertone"
{
INSERT INTO test_table VALUES(1, 1);
INSERT INTO test_dml_vs_repair VALUES(1, 1);
}
step "s1-prepared-insertone"
@ -37,7 +37,7 @@ step "s1-prepared-insertone"
step "s1-insertall"
{
INSERT INTO test_table SELECT test_id, data+1 FROM test_table;
INSERT INTO test_dml_vs_repair SELECT test_id, data+1 FROM test_dml_vs_repair;
}
step "s1-prepared-insertall"
@ -47,7 +47,7 @@ step "s1-prepared-insertall"
step "s1-display"
{
SELECT * FROM test_table WHERE test_id = 1;
SELECT * FROM test_dml_vs_repair WHERE test_id = 1;
}
step "s1-commit"
@ -65,27 +65,27 @@ step "s2-begin"
step "s2-invalidate-57637"
{
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637;
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637;
}
step "s2-revalidate-57637"
{
UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57637;
UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57637;
}
step "s2-invalidate-57638"
{
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638;
UPDATE pg_dist_shard_placement SET shardstate = '3' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638;
}
step "s2-revalidate-57638"
{
UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass) AND nodeport = 57638;
UPDATE pg_dist_shard_placement SET shardstate = '1' WHERE shardid = (SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass) AND nodeport = 57638;
}
step "s2-repair"
{
SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_table'::regclass), 'localhost', 57638, 'localhost', 57637);
SELECT master_copy_shard_placement((SELECT shardid FROM pg_dist_shard WHERE logicalrelid = 'test_dml_vs_repair'::regclass), 'localhost', 57638, 'localhost', 57637);
}
step "s2-commit"