mirror of https://github.com/citusdata/citus.git
Merge pull request #3009 from citusdata/small_serial
Support serial and smallserial when syncing metadatapull/3005/head
commit
59fe461d4a
|
@ -57,8 +57,6 @@ static void MarkNodeHasMetadata(char *nodeName, int32 nodePort, bool hasMetadata
|
|||
static void UpdateDistNodeBoolAttr(char *nodeName, int32 nodePort, int attrNum,
|
||||
bool value);
|
||||
static List * SequenceDDLCommandsForTable(Oid relationId);
|
||||
static void EnsureSupportedSequenceColumnType(Oid sequenceOid);
|
||||
static Oid TypeOfColumn(Oid tableId, int16 columnId);
|
||||
static char * TruncateTriggerCreateCommand(Oid relationId);
|
||||
static char * SchemaOwnerName(Oid objectId);
|
||||
static bool HasMetadataWorkers(void);
|
||||
|
@ -1048,13 +1046,15 @@ SequenceDDLCommandsForTable(Oid relationId)
|
|||
StringInfo wrappedSequenceDef = makeStringInfo();
|
||||
StringInfo sequenceGrantStmt = makeStringInfo();
|
||||
char *sequenceName = generate_qualified_relation_name(sequenceOid);
|
||||
|
||||
EnsureSupportedSequenceColumnType(sequenceOid);
|
||||
Form_pg_sequence sequenceData = pg_get_sequencedef(sequenceOid);
|
||||
Oid sequenceTypeOid = sequenceData->seqtypid;
|
||||
char *typeName = format_type_be(sequenceTypeOid);
|
||||
|
||||
/* create schema if needed */
|
||||
appendStringInfo(wrappedSequenceDef,
|
||||
WORKER_APPLY_SEQUENCE_COMMAND,
|
||||
escapedSequenceDef);
|
||||
escapedSequenceDef,
|
||||
quote_literal_cstr(typeName));
|
||||
|
||||
appendStringInfo(sequenceGrantStmt,
|
||||
"ALTER SEQUENCE %s OWNER TO %s", sequenceName,
|
||||
|
@ -1094,63 +1094,6 @@ CreateSchemaDDLCommand(Oid schemaId)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* EnsureSupportedSequenceColumnType looks at the column which depends on this sequence
|
||||
* (which it Assert's exists) and makes sure its type is suitable for use in a disributed
|
||||
* manner.
|
||||
*
|
||||
* Any column which depends on a sequence (and will therefore be replicated) but which is
|
||||
* not a bigserial cannot be used for an mx table, because there aren't enough values to
|
||||
* ensure that generated numbers are globally unique.
|
||||
*/
|
||||
static void
|
||||
EnsureSupportedSequenceColumnType(Oid sequenceOid)
|
||||
{
|
||||
Oid tableId = InvalidOid;
|
||||
Oid columnType = InvalidOid;
|
||||
int32 columnId = 0;
|
||||
bool shouldSyncMetadata = false;
|
||||
bool hasMetadataWorkers = HasMetadataWorkers();
|
||||
|
||||
/* call sequenceIsOwned in order to get the tableId and columnId */
|
||||
bool sequenceOwned = sequenceIsOwned(sequenceOid, DEPENDENCY_AUTO, &tableId,
|
||||
&columnId);
|
||||
if (!sequenceOwned)
|
||||
{
|
||||
sequenceOwned = sequenceIsOwned(sequenceOid, DEPENDENCY_INTERNAL, &tableId,
|
||||
&columnId);
|
||||
}
|
||||
|
||||
Assert(sequenceOwned);
|
||||
|
||||
shouldSyncMetadata = ShouldSyncTableMetadata(tableId);
|
||||
|
||||
columnType = TypeOfColumn(tableId, (int16) columnId);
|
||||
|
||||
if (columnType != INT8OID && shouldSyncMetadata && hasMetadataWorkers)
|
||||
{
|
||||
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
|
||||
errmsg("cannot create an mx table with a serial or smallserial "
|
||||
"column "),
|
||||
errdetail("Only bigserial is supported in mx tables.")));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TypeOfColumn returns the Oid of the type of the provided column of the provided table.
|
||||
*/
|
||||
static Oid
|
||||
TypeOfColumn(Oid tableId, int16 columnId)
|
||||
{
|
||||
Relation tableRelation = relation_open(tableId, NoLock);
|
||||
TupleDesc tupleDescriptor = RelationGetDescr(tableRelation);
|
||||
Form_pg_attribute attrForm = TupleDescAttr(tupleDescriptor, columnId - 1);
|
||||
relation_close(tableRelation, NoLock);
|
||||
return attrForm->atttypid;
|
||||
}
|
||||
|
||||
|
||||
/*
|
||||
* TruncateTriggerCreateCommand creates a SQL query calling worker_create_truncate_trigger
|
||||
* function, which creates the truncate trigger on the worker.
|
||||
|
|
|
@ -147,4 +147,12 @@ ALTER TABLE pg_dist_node ADD COLUMN metadatasynced BOOLEAN DEFAULT FALSE;
|
|||
COMMENT ON COLUMN pg_dist_node.metadatasynced IS
|
||||
'indicates whether the node has the most recent metadata';
|
||||
|
||||
CREATE FUNCTION worker_apply_sequence_command(create_sequence_command text,
|
||||
sequence_type_id regtype DEFAULT 'bigint'::regtype)
|
||||
RETURNS VOID
|
||||
LANGUAGE C STRICT
|
||||
AS 'MODULE_PATHNAME', $$worker_apply_sequence_command$$;
|
||||
COMMENT ON FUNCTION worker_apply_sequence_command(text,regtype)
|
||||
IS 'create a sequence which produces globally unique values';
|
||||
|
||||
RESET search_path;
|
||||
|
|
|
@ -22,6 +22,7 @@
|
|||
#include "access/xact.h"
|
||||
#include "catalog/dependency.h"
|
||||
#include "catalog/namespace.h"
|
||||
#include "catalog/pg_type.h"
|
||||
#include "commands/copy.h"
|
||||
#include "commands/dbcommands.h"
|
||||
#include "commands/extension.h"
|
||||
|
@ -63,7 +64,8 @@ static void ReceiveResourceCleanup(int32 connectionId, const char *filename,
|
|||
int32 fileDescriptor);
|
||||
static void CitusDeleteFile(const char *filename);
|
||||
static bool check_log_statement(List *stmt_list);
|
||||
static void AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName);
|
||||
static void AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName,
|
||||
Oid sequenceTypeId);
|
||||
static void SetDefElemArg(AlterSeqStmt *statement, const char *name, Node *arg);
|
||||
|
||||
|
||||
|
@ -457,6 +459,7 @@ Datum
|
|||
worker_apply_sequence_command(PG_FUNCTION_ARGS)
|
||||
{
|
||||
text *commandText = PG_GETARG_TEXT_P(0);
|
||||
Oid sequenceTypeId = PG_GETARG_OID(1);
|
||||
const char *commandString = text_to_cstring(commandText);
|
||||
Node *commandNode = ParseTreeNode(commandString);
|
||||
CreateSeqStmt *createSequenceStatement = NULL;
|
||||
|
@ -490,7 +493,7 @@ worker_apply_sequence_command(PG_FUNCTION_ARGS)
|
|||
AccessShareLock, false);
|
||||
Assert(sequenceRelationId != InvalidOid);
|
||||
|
||||
AlterSequenceMinMax(sequenceRelationId, sequenceSchema, sequenceName);
|
||||
AlterSequenceMinMax(sequenceRelationId, sequenceSchema, sequenceName, sequenceTypeId);
|
||||
|
||||
PG_RETURN_VOID();
|
||||
}
|
||||
|
@ -862,27 +865,42 @@ check_log_statement(List *statementList)
|
|||
* GetLocalGroupId() << 48 + 1 and sets a maxvalue which stops it from passing out any
|
||||
* values greater than: (GetLocalGroupID() + 1) << 48.
|
||||
*
|
||||
* For serial we only have 32 bits and therefore shift by 28, and for smallserial
|
||||
* we only have 16 bits and therefore shift by 12.
|
||||
*
|
||||
* This is to ensure every group of workers passes out values from a unique range,
|
||||
* and therefore that all values generated for the sequence are globally unique.
|
||||
*/
|
||||
static void
|
||||
AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName)
|
||||
AlterSequenceMinMax(Oid sequenceId, char *schemaName, char *sequenceName,
|
||||
Oid sequenceTypeId)
|
||||
{
|
||||
Form_pg_sequence sequenceData = pg_get_sequencedef(sequenceId);
|
||||
int64 startValue = 0;
|
||||
int64 maxValue = 0;
|
||||
int64 sequenceMaxValue = sequenceData->seqmax;
|
||||
int64 sequenceMinValue = sequenceData->seqmin;
|
||||
int valueBitLength = 48;
|
||||
|
||||
/* for smaller types, put the group ID into the first 4 bits */
|
||||
if (sequenceTypeId == INT4OID)
|
||||
{
|
||||
valueBitLength = 28;
|
||||
sequenceMaxValue = INT_MAX;
|
||||
}
|
||||
else if (sequenceTypeId == INT2OID)
|
||||
{
|
||||
valueBitLength = 12;
|
||||
sequenceMaxValue = SHRT_MAX;
|
||||
}
|
||||
|
||||
/* calculate min/max values that the sequence can generate in this worker */
|
||||
startValue = (((int64) GetLocalGroupId()) << 48) + 1;
|
||||
maxValue = startValue + ((int64) 1 << 48);
|
||||
startValue = (((int64) GetLocalGroupId()) << valueBitLength) + 1;
|
||||
maxValue = startValue + ((int64) 1 << valueBitLength);
|
||||
|
||||
/*
|
||||
* We alter the sequence if the previously set min and max values are not equal to
|
||||
* their correct values. This happens when the sequence has been created
|
||||
* during shard, before the current worker having the metadata.
|
||||
* their correct values.
|
||||
*/
|
||||
if (sequenceMinValue != startValue || sequenceMaxValue != maxValue)
|
||||
{
|
||||
|
|
|
@ -51,7 +51,7 @@ extern bool SendOptionalCommandListToWorkerInTransaction(char *nodeName, int32 n
|
|||
"SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition"
|
||||
#define DISABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'off'"
|
||||
#define ENABLE_DDL_PROPAGATION "SET citus.enable_ddl_propagation TO 'on'"
|
||||
#define WORKER_APPLY_SEQUENCE_COMMAND "SELECT worker_apply_sequence_command (%s)"
|
||||
#define WORKER_APPLY_SEQUENCE_COMMAND "SELECT worker_apply_sequence_command (%s,%s)"
|
||||
#define UPSERT_PLACEMENT \
|
||||
"INSERT INTO pg_dist_placement " \
|
||||
"(shardid, shardstate, shardlength, " \
|
||||
|
|
|
@ -64,7 +64,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
|
|||
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's')
|
||||
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
|
||||
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
|
||||
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
|
||||
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE','bigint')
|
||||
SELECT worker_create_truncate_trigger('public.mx_test_table')
|
||||
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
|
||||
TRUNCATE pg_dist_node CASCADE
|
||||
|
@ -85,7 +85,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
|
|||
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('public.mx_test_table'::regclass, 'h', column_name_to_column('public.mx_test_table','col_1'), 0, 's')
|
||||
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
|
||||
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('public.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('public.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('public.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('public.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('public.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('public.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('public.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('public.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
|
||||
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
|
||||
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS public.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE','bigint')
|
||||
SELECT worker_create_truncate_trigger('public.mx_test_table')
|
||||
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
|
||||
TRUNCATE pg_dist_node CASCADE
|
||||
|
@ -109,7 +109,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
|
|||
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
|
||||
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
|
||||
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
|
||||
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
|
||||
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE','bigint')
|
||||
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
|
||||
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
|
||||
TRUNCATE pg_dist_node CASCADE
|
||||
|
@ -137,7 +137,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
|
|||
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
|
||||
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
|
||||
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
|
||||
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
|
||||
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE','bigint')
|
||||
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
|
||||
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
|
||||
TRUNCATE pg_dist_node CASCADE
|
||||
|
@ -158,7 +158,7 @@ SELECT unnest(master_metadata_snapshot()) order by 1;
|
|||
INSERT INTO pg_dist_partition (logicalrelid, partmethod, partkey, colocationid, repmodel) VALUES ('mx_testing_schema.mx_test_table'::regclass, 'h', column_name_to_column('mx_testing_schema.mx_test_table','col_1'), 0, 's')
|
||||
INSERT INTO pg_dist_placement (shardid, shardstate, shardlength, groupid, placementid) VALUES (1310000, 1, 0, 1, 100000),(1310001, 1, 0, 2, 100001),(1310002, 1, 0, 1, 100002),(1310003, 1, 0, 2, 100003),(1310004, 1, 0, 1, 100004),(1310005, 1, 0, 2, 100005),(1310006, 1, 0, 1, 100006),(1310007, 1, 0, 2, 100007)
|
||||
INSERT INTO pg_dist_shard (logicalrelid, shardid, shardstorage, shardminvalue, shardmaxvalue) VALUES ('mx_testing_schema.mx_test_table'::regclass, 1310000, 't', '-2147483648', '-1610612737'),('mx_testing_schema.mx_test_table'::regclass, 1310001, 't', '-1610612736', '-1073741825'),('mx_testing_schema.mx_test_table'::regclass, 1310002, 't', '-1073741824', '-536870913'),('mx_testing_schema.mx_test_table'::regclass, 1310003, 't', '-536870912', '-1'),('mx_testing_schema.mx_test_table'::regclass, 1310004, 't', '0', '536870911'),('mx_testing_schema.mx_test_table'::regclass, 1310005, 't', '536870912', '1073741823'),('mx_testing_schema.mx_test_table'::regclass, 1310006, 't', '1073741824', '1610612735'),('mx_testing_schema.mx_test_table'::regclass, 1310007, 't', '1610612736', '2147483647')
|
||||
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE')
|
||||
SELECT worker_apply_sequence_command ('CREATE SEQUENCE IF NOT EXISTS mx_testing_schema.mx_test_table_col_3_seq INCREMENT BY 1 MINVALUE 1 MAXVALUE 9223372036854775807 START WITH 1 NO CYCLE','bigint')
|
||||
SELECT worker_create_truncate_trigger('mx_testing_schema.mx_test_table')
|
||||
SELECT worker_drop_distributed_table(logicalrelid::regclass::text) FROM pg_dist_partition
|
||||
TRUNCATE pg_dist_node CASCADE
|
||||
|
@ -923,7 +923,7 @@ SELECT logicalrelid, repmodel FROM pg_dist_partition WHERE logicalrelid = 'mx_te
|
|||
(1 row)
|
||||
|
||||
DROP TABLE mx_temp_drop_test;
|
||||
-- Check that MX tables can be created with SERIAL columns, but error out on metadata sync
|
||||
-- Check that MX tables can be created with SERIAL columns
|
||||
\c - - - :master_port
|
||||
SET citus.shard_count TO 3;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
@ -940,30 +940,35 @@ SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
|
|||
|
||||
(1 row)
|
||||
|
||||
CREATE TABLE mx_table_with_small_sequence(a int, b SERIAL);
|
||||
-- sync table with serial column after create_distributed_table
|
||||
CREATE TABLE mx_table_with_small_sequence(a int, b SERIAL, c SMALLSERIAL);
|
||||
SELECT create_distributed_table('mx_table_with_small_sequence', 'a');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
ERROR: cannot create an mx table with a serial or smallserial column
|
||||
DETAIL: Only bigserial is supported in mx tables.
|
||||
DROP TABLE mx_table_with_small_sequence;
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
start_metadata_sync_to_node
|
||||
-----------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
-- Show that create_distributed_table errors out if the table has a SERIAL column and
|
||||
-- there are metadata workers
|
||||
CREATE TABLE mx_table_with_small_sequence(a int, b SERIAL);
|
||||
SELECT create_distributed_table('mx_table_with_small_sequence', 'a');
|
||||
ERROR: cannot create an mx table with a serial or smallserial column
|
||||
DETAIL: Only bigserial is supported in mx tables.
|
||||
DROP TABLE mx_table_with_small_sequence;
|
||||
-- Show that create_distributed_table works with a serial column
|
||||
CREATE TABLE mx_table_with_small_sequence(a int, b SERIAL, c SMALLSERIAL);
|
||||
SELECT create_distributed_table('mx_table_with_small_sequence', 'a');
|
||||
create_distributed_table
|
||||
--------------------------
|
||||
|
||||
(1 row)
|
||||
|
||||
INSERT INTO mx_table_with_small_sequence VALUES (0);
|
||||
\c - - - :worker_1_port
|
||||
INSERT INTO mx_table_with_small_sequence VALUES (1), (3);
|
||||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
-- Create an MX table with (BIGSERIAL) sequences
|
||||
CREATE TABLE mx_table_with_sequence(a int, b BIGSERIAL, c BIGSERIAL);
|
||||
SELECT create_distributed_table('mx_table_with_sequence', 'a');
|
||||
|
@ -1080,9 +1085,22 @@ SELECT nextval('mx_table_with_sequence_c_seq');
|
|||
562949953421313
|
||||
(1 row)
|
||||
|
||||
INSERT INTO mx_table_with_small_sequence VALUES (2), (4);
|
||||
-- Check that dropping the mx table with sequences works as expected
|
||||
\c - - - :master_port
|
||||
DROP TABLE mx_table_with_sequence;
|
||||
-- check our small sequence values
|
||||
SELECT a, b, c FROM mx_table_with_small_sequence ORDER BY a,b,c;
|
||||
a | b | c
|
||||
---+-----------+------
|
||||
0 | 1 | 1
|
||||
1 | 268435457 | 4097
|
||||
2 | 536870913 | 8193
|
||||
3 | 268435458 | 4098
|
||||
4 | 536870914 | 8194
|
||||
(5 rows)
|
||||
|
||||
-- Check that dropping the mx table with sequences works as expected
|
||||
DROP TABLE mx_table_with_small_sequence, mx_table_with_sequence;
|
||||
\d mx_table_with_sequence
|
||||
\ds mx_table_with_sequence_b_seq
|
||||
List of relations
|
||||
|
@ -1286,8 +1304,8 @@ ORDER BY
|
|||
nodeport;
|
||||
logicalrelid | partmethod | repmodel | shardid | placementid | nodename | nodeport
|
||||
--------------+------------+----------+---------+-------------+-----------+----------
|
||||
mx_ref | n | t | 1310071 | 100071 | localhost | 57637
|
||||
mx_ref | n | t | 1310071 | 100072 | localhost | 57638
|
||||
mx_ref | n | t | 1310072 | 100072 | localhost | 57637
|
||||
mx_ref | n | t | 1310072 | 100073 | localhost | 57638
|
||||
(2 rows)
|
||||
|
||||
|
||||
|
@ -1378,7 +1396,7 @@ FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
|
|||
WHERE logicalrelid='mx_ref'::regclass;
|
||||
shardid | nodename | nodeport
|
||||
---------+-----------+----------
|
||||
1310072 | localhost | 57637
|
||||
1310073 | localhost | 57637
|
||||
(1 row)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
|
@ -1387,7 +1405,7 @@ FROM pg_dist_shard NATURAL JOIN pg_dist_shard_placement
|
|||
WHERE logicalrelid='mx_ref'::regclass;
|
||||
shardid | nodename | nodeport
|
||||
---------+-----------+----------
|
||||
1310072 | localhost | 57637
|
||||
1310073 | localhost | 57637
|
||||
(1 row)
|
||||
|
||||
\c - - - :master_port
|
||||
|
@ -1404,8 +1422,8 @@ WHERE logicalrelid='mx_ref'::regclass
|
|||
ORDER BY shardid, nodeport;
|
||||
shardid | nodename | nodeport
|
||||
---------+-----------+----------
|
||||
1310072 | localhost | 57637
|
||||
1310072 | localhost | 57638
|
||||
1310073 | localhost | 57637
|
||||
1310073 | localhost | 57638
|
||||
(2 rows)
|
||||
|
||||
\c - - - :worker_1_port
|
||||
|
@ -1415,8 +1433,8 @@ WHERE logicalrelid='mx_ref'::regclass
|
|||
ORDER BY shardid, nodeport;
|
||||
shardid | nodename | nodeport
|
||||
---------+-----------+----------
|
||||
1310072 | localhost | 57637
|
||||
1310072 | localhost | 57638
|
||||
1310073 | localhost | 57637
|
||||
1310073 | localhost | 57638
|
||||
(2 rows)
|
||||
|
||||
-- Get the metadata back into a consistent state
|
||||
|
|
|
@ -423,7 +423,7 @@ SELECT logicalrelid, repmodel FROM pg_dist_partition WHERE logicalrelid = 'mx_te
|
|||
|
||||
DROP TABLE mx_temp_drop_test;
|
||||
|
||||
-- Check that MX tables can be created with SERIAL columns, but error out on metadata sync
|
||||
-- Check that MX tables can be created with SERIAL columns
|
||||
\c - - - :master_port
|
||||
SET citus.shard_count TO 3;
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
|
@ -432,18 +432,23 @@ SET citus.replication_model TO 'streaming';
|
|||
SELECT stop_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
SELECT stop_metadata_sync_to_node('localhost', :worker_2_port);
|
||||
|
||||
CREATE TABLE mx_table_with_small_sequence(a int, b SERIAL);
|
||||
-- sync table with serial column after create_distributed_table
|
||||
CREATE TABLE mx_table_with_small_sequence(a int, b SERIAL, c SMALLSERIAL);
|
||||
SELECT create_distributed_table('mx_table_with_small_sequence', 'a');
|
||||
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
DROP TABLE mx_table_with_small_sequence;
|
||||
SELECT start_metadata_sync_to_node('localhost', :worker_1_port);
|
||||
|
||||
-- Show that create_distributed_table errors out if the table has a SERIAL column and
|
||||
-- there are metadata workers
|
||||
CREATE TABLE mx_table_with_small_sequence(a int, b SERIAL);
|
||||
-- Show that create_distributed_table works with a serial column
|
||||
CREATE TABLE mx_table_with_small_sequence(a int, b SERIAL, c SMALLSERIAL);
|
||||
SELECT create_distributed_table('mx_table_with_small_sequence', 'a');
|
||||
DROP TABLE mx_table_with_small_sequence;
|
||||
INSERT INTO mx_table_with_small_sequence VALUES (0);
|
||||
|
||||
\c - - - :worker_1_port
|
||||
INSERT INTO mx_table_with_small_sequence VALUES (1), (3);
|
||||
|
||||
\c - - - :master_port
|
||||
SET citus.shard_replication_factor TO 1;
|
||||
SET citus.replication_model TO 'streaming';
|
||||
|
||||
-- Create an MX table with (BIGSERIAL) sequences
|
||||
CREATE TABLE mx_table_with_sequence(a int, b BIGSERIAL, c BIGSERIAL);
|
||||
|
@ -474,9 +479,16 @@ SELECT "Column", "Type", "Modifiers" FROM table_desc WHERE relid='mx_table_with_
|
|||
SELECT nextval('mx_table_with_sequence_b_seq');
|
||||
SELECT nextval('mx_table_with_sequence_c_seq');
|
||||
|
||||
INSERT INTO mx_table_with_small_sequence VALUES (2), (4);
|
||||
|
||||
-- Check that dropping the mx table with sequences works as expected
|
||||
\c - - - :master_port
|
||||
DROP TABLE mx_table_with_sequence;
|
||||
|
||||
-- check our small sequence values
|
||||
SELECT a, b, c FROM mx_table_with_small_sequence ORDER BY a,b,c;
|
||||
|
||||
-- Check that dropping the mx table with sequences works as expected
|
||||
DROP TABLE mx_table_with_small_sequence, mx_table_with_sequence;
|
||||
\d mx_table_with_sequence
|
||||
\ds mx_table_with_sequence_b_seq
|
||||
\ds mx_table_with_sequence_c_seq
|
||||
|
|
Loading…
Reference in New Issue