Add shard transfer mode parameter to shard copy functions

pull/1743/head
Marco Slot 2017-10-30 19:29:46 +01:00
parent bc3bdeaac8
commit 7e34348334
6 changed files with 111 additions and 2 deletions

View File

@ -12,7 +12,7 @@ EXTVERSIONS = 5.0 5.0-1 5.0-2 \
6.1-1 6.1-2 6.1-3 6.1-4 6.1-5 6.1-6 6.1-7 6.1-8 6.1-9 6.1-10 6.1-11 6.1-12 6.1-13 6.1-14 6.1-15 6.1-16 6.1-17 \
6.2-1 6.2-2 6.2-3 6.2-4 \
7.0-1 7.0-2 7.0-3 7.0-4 7.0-5 7.0-6 7.0-7 7.0-8 7.0-9 7.0-10 7.0-11 7.0-12 7.0-13 7.0-14 7.0-15 \
7.1-1 7.1-2 7.1-3
7.1-1 7.1-2 7.1-3 7.1-4
# All citus--*.sql files in the source directory
DATA = $(patsubst $(citus_abs_srcdir)/%.sql,%.sql,$(wildcard $(citus_abs_srcdir)/$(EXTENSION)--*--*.sql))
@ -176,6 +176,8 @@ $(EXTENSION)--7.1-2.sql: $(EXTENSION)--7.1-1.sql $(EXTENSION)--7.1-1--7.1-2.sql
cat $^ > $@
$(EXTENSION)--7.1-3.sql: $(EXTENSION)--7.1-2.sql $(EXTENSION)--7.1-2--7.1-3.sql
cat $^ > $@
$(EXTENSION)--7.1-4.sql: $(EXTENSION)--7.1-3.sql $(EXTENSION)--7.1-3--7.1-4.sql
cat $^ > $@
NO_PGXS = 1

View File

@ -0,0 +1,54 @@
/* citus--7.1-3--7.1-4 */
CREATE TYPE citus.shard_transfer_mode AS ENUM (
'auto',
'force_logical',
'block_writes'
);
SET search_path = 'pg_catalog';
DROP FUNCTION master_move_shard_placement(bigint, text, integer, text, integer);
DROP FUNCTION master_copy_shard_placement(bigint, text, integer, text, integer, bool);
CREATE OR REPLACE FUNCTION master_move_shard_placement(
shard_id bigint,
source_node_name text,
source_node_port integer,
target_node_name text,
target_node_port integer,
shard_transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_move_shard_placement$$;
COMMENT ON FUNCTION master_move_shard_placement(
shard_id bigint,
source_node_name text,
source_node_port integer,
target_node_name text,
target_node_port integer,
shard_transfer_mode citus.shard_transfer_mode)
IS 'move a shard from a the source node to the destination node';
CREATE FUNCTION master_copy_shard_placement(
shard_id bigint,
source_node_name text,
source_node_port integer,
target_node_name text,
target_node_port integer,
do_repair bool DEFAULT true,
transfer_mode citus.shard_transfer_mode default 'auto')
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$master_copy_shard_placement$$;
COMMENT ON FUNCTION master_copy_shard_placement(shard_id bigint,
source_node_name text,
source_node_port integer,
target_node_name text,
target_node_port integer,
do_repair bool,
shard_transfer_mode citus.shard_transfer_mode)
IS 'copy a shard from the source node to the destination node';
RESET search_path;

View File

@ -1,6 +1,6 @@
# Citus extension
comment = 'Citus distributed database'
default_version = '7.1-3'
default_version = '7.1-4'
module_pathname = '$libdir/citus'
relocatable = false
schema = pg_catalog

View File

@ -38,7 +38,13 @@
#include "utils/palloc.h"
#define TRANSFER_MODE_AUTOMATIC 'a'
#define TRANSFER_MODE_FORCE_LOGICAL 'l'
#define TRANSFER_MODE_BLOCK_WRITES 'b'
/* local function forward declarations */
static char LookupShardTransferMode(Oid shardReplicationModeOid);
static void RepairShardPlacement(int64 shardId, char *sourceNodeName,
int32 sourceNodePort, char *targetNodeName,
int32 targetNodePort);
@ -72,6 +78,8 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
text *targetNodeNameText = PG_GETARG_TEXT_P(3);
int32 targetNodePort = PG_GETARG_INT32(4);
bool doRepair = PG_GETARG_BOOL(5);
Oid shardReplicationModeOid = PG_GETARG_OID(6);
char shardReplicationMode = LookupShardTransferMode(shardReplicationModeOid);
char *sourceNodeName = text_to_cstring(sourceNodeNameText);
char *targetNodeName = text_to_cstring(targetNodeNameText);
@ -83,6 +91,12 @@ master_copy_shard_placement(PG_FUNCTION_ARGS)
"with do not repair functionality "
"is only supported on Citus Enterprise")));
}
else if (shardReplicationMode == TRANSFER_MODE_FORCE_LOGICAL)
{
ereport(ERROR, (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("using logical replication with repair functionality "
"is currently not supported")));
}
EnsureCoordinator();
CheckCitusVersion(ERROR);
@ -108,6 +122,39 @@ master_move_shard_placement(PG_FUNCTION_ARGS)
}
/*
* LookupShardTransferMode maps the oids of citus.shard_transfer_mode enum
* values to a char.
*/
static char
LookupShardTransferMode(Oid shardReplicationModeOid)
{
char shardReplicationMode = 0;
Datum enumLabelDatum = DirectFunctionCall1(enum_out, shardReplicationModeOid);
char *enumLabel = DatumGetCString(enumLabelDatum);
if (strncmp(enumLabel, "auto", NAMEDATALEN) == 0)
{
shardReplicationMode = TRANSFER_MODE_AUTOMATIC;
}
else if (strncmp(enumLabel, "force_logical", NAMEDATALEN) == 0)
{
shardReplicationMode = TRANSFER_MODE_FORCE_LOGICAL;
}
else if (strncmp(enumLabel, "block_writes", NAMEDATALEN) == 0)
{
shardReplicationMode = TRANSFER_MODE_BLOCK_WRITES;
}
else
{
ereport(ERROR, (errmsg("invalid label for enum: %s", enumLabel)));
}
return shardReplicationMode;
}
/*
* RepairShardPlacement repairs given shard from a source node to target node.
* This function is not co-location aware. It only repairs given shard.

View File

@ -126,6 +126,9 @@ ALTER EXTENSION citus UPDATE TO '7.0-13';
ALTER EXTENSION citus UPDATE TO '7.0-14';
ALTER EXTENSION citus UPDATE TO '7.0-15';
ALTER EXTENSION citus UPDATE TO '7.1-1';
ALTER EXTENSION citus UPDATE TO '7.1-2';
ALTER EXTENSION citus UPDATE TO '7.1-3';
ALTER EXTENSION citus UPDATE TO '7.1-4';
-- show running version
SHOW citus.version;
citus.version

View File

@ -126,6 +126,9 @@ ALTER EXTENSION citus UPDATE TO '7.0-13';
ALTER EXTENSION citus UPDATE TO '7.0-14';
ALTER EXTENSION citus UPDATE TO '7.0-15';
ALTER EXTENSION citus UPDATE TO '7.1-1';
ALTER EXTENSION citus UPDATE TO '7.1-2';
ALTER EXTENSION citus UPDATE TO '7.1-3';
ALTER EXTENSION citus UPDATE TO '7.1-4';
-- show running version
SHOW citus.version;