Merge pull request #1264 from citusdata/bugfix/1251_remove_default_value

Remove default clause from shard DDL when sequences are used
pull/1271/head
Murat Tuncer 2017-03-01 21:03:40 +02:00 committed by GitHub
commit 0a58a7b2c1
12 changed files with 103 additions and 28 deletions

View File

@ -97,6 +97,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
uint64 hashTokenIncrement = 0; uint64 hashTokenIncrement = 0;
List *existingShardList = NIL; List *existingShardList = NIL;
int64 shardIndex = 0; int64 shardIndex = 0;
bool includeSequenceDefaults = false;
DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId); DistTableCacheEntry *cacheEntry = DistributedTableCacheEntry(distributedTableId);
/* make sure table is hash partitioned */ /* make sure table is hash partitioned */
@ -165,7 +166,7 @@ CreateShardsWithRoundRobinPolicy(Oid distributedTableId, int32 shardCount,
HOLD_INTERRUPTS(); HOLD_INTERRUPTS();
/* retrieve the DDL commands for the table */ /* retrieve the DDL commands for the table */
ddlCommandList = GetTableDDLEvents(distributedTableId); ddlCommandList = GetTableDDLEvents(distributedTableId, includeSequenceDefaults);
workerNodeCount = list_length(workerNodeList); workerNodeCount = list_length(workerNodeList);
if (replicationFactor > workerNodeCount) if (replicationFactor > workerNodeCount)
@ -247,6 +248,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
List *targetTableDDLEvents = NIL; List *targetTableDDLEvents = NIL;
List *targetTableForeignConstraintCommands = NIL; List *targetTableForeignConstraintCommands = NIL;
ListCell *sourceShardCell = NULL; ListCell *sourceShardCell = NULL;
bool includeSequenceDefaults = false;
/* make sure that tables are hash partitioned */ /* make sure that tables are hash partitioned */
CheckHashPartitionedTable(targetRelationId); CheckHashPartitionedTable(targetRelationId);
@ -281,7 +283,7 @@ CreateColocatedShards(Oid targetRelationId, Oid sourceRelationId)
} }
targetTableRelationOwner = TableOwner(targetRelationId); targetTableRelationOwner = TableOwner(targetRelationId);
targetTableDDLEvents = GetTableDDLEvents(targetRelationId); targetTableDDLEvents = GetTableDDLEvents(targetRelationId, includeSequenceDefaults);
targetTableForeignConstraintCommands = GetTableForeignConstraintCommands( targetTableForeignConstraintCommands = GetTableForeignConstraintCommands(
targetRelationId); targetRelationId);
targetShardStorageType = ShardStorageType(targetRelationId); targetShardStorageType = ShardStorageType(targetRelationId);
@ -355,6 +357,7 @@ CreateReferenceTableShard(Oid distributedTableId)
int replicationFactor = 0; int replicationFactor = 0;
text *shardMinValue = NULL; text *shardMinValue = NULL;
text *shardMaxValue = NULL; text *shardMaxValue = NULL;
bool includeSequenceDefaults = false;
/* /*
* In contrast to append/range partitioned tables it makes more sense to * In contrast to append/range partitioned tables it makes more sense to
@ -390,7 +393,7 @@ CreateReferenceTableShard(Oid distributedTableId)
shardId = GetNextShardId(); shardId = GetNextShardId();
/* retrieve the DDL commands for the table */ /* retrieve the DDL commands for the table */
ddlCommandList = GetTableDDLEvents(distributedTableId); ddlCommandList = GetTableDDLEvents(distributedTableId, includeSequenceDefaults);
/* set the replication factor equal to the number of worker nodes */ /* set the replication factor equal to the number of worker nodes */
workerNodeCount = list_length(workerNodeList); workerNodeCount = list_length(workerNodeList);

View File

@ -206,6 +206,7 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
{ {
text *relationName = PG_GETARG_TEXT_P(0); text *relationName = PG_GETARG_TEXT_P(0);
Oid relationId = ResolveRelationId(relationName); Oid relationId = ResolveRelationId(relationName);
bool includeSequenceDefaults = true;
MemoryContext oldContext = NULL; MemoryContext oldContext = NULL;
List *tableDDLEventList = NIL; List *tableDDLEventList = NIL;
@ -217,7 +218,7 @@ master_get_table_ddl_events(PG_FUNCTION_ARGS)
oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx); oldContext = MemoryContextSwitchTo(functionContext->multi_call_memory_ctx);
/* allocate DDL statements, and then save position in DDL statements */ /* allocate DDL statements, and then save position in DDL statements */
tableDDLEventList = GetTableDDLEvents(relationId); tableDDLEventList = GetTableDDLEvents(relationId, includeSequenceDefaults);
tableDDLEventCell = list_head(tableDDLEventList); tableDDLEventCell = list_head(tableDDLEventList);
functionContext->user_fctx = tableDDLEventCell; functionContext->user_fctx = tableDDLEventCell;
@ -628,13 +629,16 @@ ResolveRelationId(text *relationName)
/* /*
* GetTableDDLEvents takes in a relationId, and returns the list of DDL commands * GetTableDDLEvents takes in a relationId, includeSequenceDefaults flag,
* needed to reconstruct the relation. These DDL commands are all palloced; and * and returns the list of DDL commands needed to reconstruct the relation.
* include the table's schema definition, optional column storage and statistics * When the flag includeSequenceDefaults is set, the function also creates
* definitions, and index and constraint definitions. * DEFAULT clauses for columns getting their default values from a sequence.
* These DDL commands are all palloced; and include the table's schema
* definition, optional column storage and statistics definitions, and index
* and constraint definitions.
*/ */
List * List *
GetTableDDLEvents(Oid relationId) GetTableDDLEvents(Oid relationId, bool includeSequenceDefaults)
{ {
List *tableDDLEventList = NIL; List *tableDDLEventList = NIL;
char tableType = 0; char tableType = 0;
@ -693,7 +697,7 @@ GetTableDDLEvents(Oid relationId)
} }
/* fetch table schema and column option definitions */ /* fetch table schema and column option definitions */
tableSchemaDef = pg_get_tableschemadef_string(relationId); tableSchemaDef = pg_get_tableschemadef_string(relationId, includeSequenceDefaults);
tableColumnOptionsDef = pg_get_tablecolumnoptionsdef_string(relationId); tableColumnOptionsDef = pg_get_tablecolumnoptionsdef_string(relationId);
tableDDLEventList = lappend(tableDDLEventList, tableSchemaDef); tableDDLEventList = lappend(tableDDLEventList, tableSchemaDef);

View File

@ -385,6 +385,7 @@ RecreateTableDDLCommandList(Oid relationId)
List *dropCommandList = NIL; List *dropCommandList = NIL;
List *recreateCommandList = NIL; List *recreateCommandList = NIL;
char relationKind = get_rel_relkind(relationId); char relationKind = get_rel_relkind(relationId);
bool includeSequenceDefaults = false;
/* build appropriate DROP command based on relation kind */ /* build appropriate DROP command based on relation kind */
if (relationKind == RELKIND_RELATION) if (relationKind == RELKIND_RELATION)
@ -405,7 +406,7 @@ RecreateTableDDLCommandList(Oid relationId)
dropCommandList = list_make1(dropCommand->data); dropCommandList = list_make1(dropCommand->data);
createCommandList = GetTableDDLEvents(relationId); createCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults);
recreateCommandList = list_concat(dropCommandList, createCommandList); recreateCommandList = list_concat(dropCommandList, createCommandList);

View File

@ -88,6 +88,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
char relationKind = get_rel_relkind(relationId); char relationKind = get_rel_relkind(relationId);
char *relationOwner = TableOwner(relationId); char *relationOwner = TableOwner(relationId);
char replicationModel = REPLICATION_MODEL_INVALID; char replicationModel = REPLICATION_MODEL_INVALID;
bool includeSequenceDefaults = false;
EnsureTablePermissions(relationId, ACL_INSERT); EnsureTablePermissions(relationId, ACL_INSERT);
CheckDistributedTable(relationId); CheckDistributedTable(relationId);
@ -136,7 +137,7 @@ master_create_empty_shard(PG_FUNCTION_ARGS)
shardId = GetNextShardId(); shardId = GetNextShardId();
/* get table DDL commands to replay on the worker node */ /* get table DDL commands to replay on the worker node */
ddlEventList = GetTableDDLEvents(relationId); ddlEventList = GetTableDDLEvents(relationId, includeSequenceDefaults);
/* if enough live nodes, add an extra candidate node as backup */ /* if enough live nodes, add an extra candidate node as backup */
attemptableNodeCount = ShardReplicationFactor; attemptableNodeCount = ShardReplicationFactor;

View File

@ -209,6 +209,7 @@ MetadataCreateCommands(void)
List *workerNodeList = WorkerNodeList(); List *workerNodeList = WorkerNodeList();
ListCell *distributedTableCell = NULL; ListCell *distributedTableCell = NULL;
char *nodeListInsertCommand = NULL; char *nodeListInsertCommand = NULL;
bool includeSequenceDefaults = true;
/* generate insert command for pg_dist_node table */ /* generate insert command for pg_dist_node table */
nodeListInsertCommand = NodeListInsertCommand(workerNodeList); nodeListInsertCommand = NodeListInsertCommand(workerNodeList);
@ -234,7 +235,7 @@ MetadataCreateCommands(void)
Oid relationId = cacheEntry->relationId; Oid relationId = cacheEntry->relationId;
List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId); List *workerSequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
List *ddlCommandList = GetTableDDLEvents(relationId); List *ddlCommandList = GetTableDDLEvents(relationId, includeSequenceDefaults);
char *tableOwnerResetCommand = TableOwnerResetCommand(relationId); char *tableOwnerResetCommand = TableOwnerResetCommand(relationId);
metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList, metadataSnapshotCommandList = list_concat(metadataSnapshotCommandList,
@ -312,13 +313,14 @@ GetDistributedTableDDLEvents(Oid relationId)
char *tableOwnerResetCommand = NULL; char *tableOwnerResetCommand = NULL;
char *metadataCommand = NULL; char *metadataCommand = NULL;
char *truncateTriggerCreateCommand = NULL; char *truncateTriggerCreateCommand = NULL;
bool includeSequenceDefaults = true;
/* commands to create sequences */ /* commands to create sequences */
sequenceDDLCommands = SequenceDDLCommandsForTable(relationId); sequenceDDLCommands = SequenceDDLCommandsForTable(relationId);
commandList = list_concat(commandList, sequenceDDLCommands); commandList = list_concat(commandList, sequenceDDLCommands);
/* commands to create the table */ /* commands to create the table */
tableDDLCommands = GetTableDDLEvents(relationId); tableDDLCommands = GetTableDDLEvents(relationId, includeSequenceDefaults);
commandList = list_concat(commandList, tableDDLCommands); commandList = list_concat(commandList, tableDDLCommands);
/* command to reset the table owner */ /* command to reset the table owner */

View File

@ -43,7 +43,8 @@ table_ddl_command_array(PG_FUNCTION_ARGS)
{ {
Oid distributedTableId = PG_GETARG_OID(0); Oid distributedTableId = PG_GETARG_OID(0);
ArrayType *ddlCommandArrayType = NULL; ArrayType *ddlCommandArrayType = NULL;
List *ddlCommandList = GetTableDDLEvents(distributedTableId); bool includeSequenceDefaults = true;
List *ddlCommandList = GetTableDDLEvents(distributedTableId, includeSequenceDefaults);
int ddlCommandCount = list_length(ddlCommandList); int ddlCommandCount = list_length(ddlCommandList);
Datum *ddlCommandDatumArray = palloc0(ddlCommandCount * sizeof(Datum)); Datum *ddlCommandDatumArray = palloc0(ddlCommandCount * sizeof(Datum));

View File

@ -36,6 +36,7 @@
#include "foreign/foreign.h" #include "foreign/foreign.h"
#include "lib/stringinfo.h" #include "lib/stringinfo.h"
#include "nodes/nodes.h" #include "nodes/nodes.h"
#include "nodes/nodeFuncs.h"
#include "nodes/parsenodes.h" #include "nodes/parsenodes.h"
#include "nodes/pg_list.h" #include "nodes/pg_list.h"
#include "storage/lock.h" #include "storage/lock.h"
@ -55,6 +56,7 @@
static void AppendOptionListToString(StringInfo stringData, List *options); static void AppendOptionListToString(StringInfo stringData, List *options);
static const char * convert_aclright_to_string(int aclright); static const char * convert_aclright_to_string(int aclright);
static bool contain_nextval_expression_walker(Node *node, void *context);
/* /*
* pg_get_extensiondef_string finds the foreign data wrapper that corresponds to * pg_get_extensiondef_string finds the foreign data wrapper that corresponds to
@ -246,9 +248,11 @@ pg_get_sequencedef(Oid sequenceRelationId)
* definition includes table's schema, default column values, not null and check * definition includes table's schema, default column values, not null and check
* constraints. The definition does not include constraints that trigger index * constraints. The definition does not include constraints that trigger index
* creations; specifically, unique and primary key constraints are excluded. * creations; specifically, unique and primary key constraints are excluded.
* When the flag includeSequenceDefaults is set, the function also creates
* DEFAULT clauses for columns getting their default values from a sequence.
*/ */
char * char *
pg_get_tableschemadef_string(Oid tableRelationId) pg_get_tableschemadef_string(Oid tableRelationId, bool includeSequenceDefaults)
{ {
Relation relation = NULL; Relation relation = NULL;
char *relationName = NULL; char *relationName = NULL;
@ -343,13 +347,23 @@ pg_get_tableschemadef_string(Oid tableRelationId)
/* convert expression to node tree, and prepare deparse context */ /* convert expression to node tree, and prepare deparse context */
defaultNode = (Node *) stringToNode(defaultValue->adbin); defaultNode = (Node *) stringToNode(defaultValue->adbin);
defaultContext = deparse_context_for(relationName, tableRelationId);
/* deparse default value string */ /*
defaultString = deparse_expression(defaultNode, defaultContext, * if column default value is explicitly requested, or it is
false, false); * not set from a sequence then we include DEFAULT clause for
* this column.
*/
if (includeSequenceDefaults ||
!contain_nextval_expression_walker(defaultNode, NULL))
{
defaultContext = deparse_context_for(relationName, tableRelationId);
appendStringInfo(&buffer, " DEFAULT %s", defaultString); /* deparse default value string */
defaultString = deparse_expression(defaultNode, defaultContext,
false, false);
appendStringInfo(&buffer, " DEFAULT %s", defaultString);
}
} }
/* if this column has a not null constraint, append the constraint */ /* if this column has a not null constraint, append the constraint */
@ -861,3 +875,28 @@ convert_aclright_to_string(int aclright)
} }
/* *INDENT-ON* */ /* *INDENT-ON* */
} }
/*
* contain_nextval_expression_walker walks over expression tree and returns
* true if it contains call to 'nextval' function.
*/
static bool
contain_nextval_expression_walker(Node *node, void *context)
{
if (node == NULL)
{
return false;
}
if (IsA(node, FuncExpr))
{
FuncExpr *funcExpr = (FuncExpr *) node;
if (funcExpr->funcid == F_NEXTVAL_OID)
{
return true;
}
}
return expression_tree_walker(node, contain_nextval_expression_walker, context);
}

View File

@ -30,7 +30,7 @@ extern Oid get_extension_schema(Oid ext_oid);
extern char * pg_get_serverdef_string(Oid tableRelationId); extern char * pg_get_serverdef_string(Oid tableRelationId);
extern char * pg_get_sequencedef_string(Oid sequenceRelid); extern char * pg_get_sequencedef_string(Oid sequenceRelid);
extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId); extern Form_pg_sequence pg_get_sequencedef(Oid sequenceRelationId);
extern char * pg_get_tableschemadef_string(Oid tableRelationId); extern char * pg_get_tableschemadef_string(Oid tableRelationId, bool forShardCreation);
extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId); extern char * pg_get_tablecolumnoptionsdef_string(Oid tableRelationId);
extern char * pg_get_indexclusterdef_string(Oid indexRelationId); extern char * pg_get_indexclusterdef_string(Oid indexRelationId);
extern List * pg_get_table_grants(Oid relationId); extern List * pg_get_table_grants(Oid relationId);

View File

@ -100,7 +100,7 @@ extern bool CStoreTable(Oid relationId);
extern uint64 GetNextShardId(void); extern uint64 GetNextShardId(void);
extern uint64 GetNextPlacementId(void); extern uint64 GetNextPlacementId(void);
extern Oid ResolveRelationId(text *relationName); extern Oid ResolveRelationId(text *relationName);
extern List * GetTableDDLEvents(Oid relationId); extern List * GetTableDDLEvents(Oid relationId, bool forShardCreation);
extern List * GetTableForeignConstraintCommands(Oid relationId); extern List * GetTableForeignConstraintCommands(Oid relationId);
extern char ShardStorageType(Oid relationId); extern char ShardStorageType(Oid relationId);
extern void CheckDistributedTable(Oid relationId); extern void CheckDistributedTable(Oid relationId);

View File

@ -401,11 +401,7 @@ BEGIN;
col_3 | bigint | not null default nextval('mx_table_col_3_seq'::regclass) col_3 | bigint | not null default nextval('mx_table_col_3_seq'::regclass)
DROP SEQUENCE mx_table_col_3_seq CASCADE; DROP SEQUENCE mx_table_col_3_seq CASCADE;
NOTICE: drop cascades to 4 other objects NOTICE: drop cascades to default for table mx_table column col_3
DETAIL: drop cascades to default for table mx_table_1270000 column col_3
drop cascades to default for table mx_table_1270002 column col_3
drop cascades to default for table mx_table_1270004 column col_3
drop cascades to default for table mx_table column col_3
\d mx_table \d mx_table
Table "public.mx_table" Table "public.mx_table"
Column | Type | Modifiers Column | Type | Modifiers

View File

@ -369,6 +369,18 @@ SELECT indexname, tablename FROM pg_indexes WHERE tablename = 'lineitem_alter';
SELECT indexname, tablename FROM pg_indexes WHERE tablename like 'lineitem_alter_%'; SELECT indexname, tablename FROM pg_indexes WHERE tablename like 'lineitem_alter_%';
\c - - - :master_port \c - - - :master_port
-- verify alter table and drop sequence in the same transaction does not cause deadlock
CREATE TABLE sequence_deadlock_test (a serial, b serial);
SELECT create_distributed_table('sequence_deadlock_test', 'a');
BEGIN;
ALTER TABLE sequence_deadlock_test ADD COLUMN c int;
DROP SEQUENCE sequence_deadlock_test_b_seq CASCADE;
END;
DROP TABLE sequence_deadlock_test;
-- test ALTER TABLE ALL IN TABLESPACE -- test ALTER TABLE ALL IN TABLESPACE
-- we expect that it will warn out -- we expect that it will warn out
CREATE TABLESPACE super_fast_ssd LOCATION '@abs_srcdir@/data'; CREATE TABLESPACE super_fast_ssd LOCATION '@abs_srcdir@/data';

View File

@ -764,6 +764,22 @@ SELECT indexname, tablename FROM pg_indexes WHERE tablename like 'lineitem_alte
(0 rows) (0 rows)
\c - - - :master_port \c - - - :master_port
-- verify alter table and drop sequence in the same transaction does not cause deadlock
CREATE TABLE sequence_deadlock_test (a serial, b serial);
SELECT create_distributed_table('sequence_deadlock_test', 'a');
create_distributed_table
--------------------------
(1 row)
BEGIN;
ALTER TABLE sequence_deadlock_test ADD COLUMN c int;
NOTICE: using one-phase commit for distributed DDL commands
HINT: You can enable two-phase commit for extra safety with: SET citus.multi_shard_commit_protocol TO '2pc'
DROP SEQUENCE sequence_deadlock_test_b_seq CASCADE;
NOTICE: drop cascades to default for table sequence_deadlock_test column b
END;
DROP TABLE sequence_deadlock_test;
-- test ALTER TABLE ALL IN TABLESPACE -- test ALTER TABLE ALL IN TABLESPACE
-- we expect that it will warn out -- we expect that it will warn out
CREATE TABLESPACE super_fast_ssd LOCATION '@abs_srcdir@/data'; CREATE TABLESPACE super_fast_ssd LOCATION '@abs_srcdir@/data';