mirror of https://github.com/citusdata/citus.git
Hide shards from CDC subscriptions
parent
b09d239809
commit
8ad444f8ef
|
@ -88,6 +88,8 @@ static const char *replicationSlotPrefix[] = {
|
||||||
* IMPORTANT: All the subscription names should start with "citus_". Otherwise
|
* IMPORTANT: All the subscription names should start with "citus_". Otherwise
|
||||||
* our utility hook does not defend against non-superusers altering or dropping
|
* our utility hook does not defend against non-superusers altering or dropping
|
||||||
* them, which is important for security purposes.
|
* them, which is important for security purposes.
|
||||||
|
*
|
||||||
|
* We should also keep these in sync with IsCitusShardTransferBackend().
|
||||||
*/
|
*/
|
||||||
static const char *subscriptionPrefix[] = {
|
static const char *subscriptionPrefix[] = {
|
||||||
[SHARD_MOVE] = "citus_shard_move_subscription_",
|
[SHARD_MOVE] = "citus_shard_move_subscription_",
|
||||||
|
|
|
@ -1452,6 +1452,21 @@ IsExternalClientBackend(void)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* IsRebalancerInitiatedBackend returns true if we are in a backend that citus
|
||||||
|
* rebalancer initiated.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
IsCitusShardTransferBackend(void)
|
||||||
|
{
|
||||||
|
int prefixLength = strlen(CITUS_SHARD_TRANSFER_APPLICATION_NAME_PREFIX);
|
||||||
|
|
||||||
|
return strncmp(application_name,
|
||||||
|
CITUS_SHARD_TRANSFER_APPLICATION_NAME_PREFIX,
|
||||||
|
prefixLength) == 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* DetermineCitusBackendType determines the type of backend based on the application_name.
|
* DetermineCitusBackendType determines the type of backend based on the application_name.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -351,18 +351,17 @@ ShouldHideShardsInternal(void)
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else if (MyBackendType != B_BACKEND)
|
else if (MyBackendType != B_BACKEND && MyBackendType != B_WAL_SENDER)
|
||||||
{
|
{
|
||||||
/*
|
/*
|
||||||
* We are aiming only to hide shards from client
|
* We are aiming only to hide shards from client
|
||||||
* backends or certain background workers(see above),
|
* backends or certain background workers(see above),
|
||||||
* not backends like walsender or checkpointer.
|
|
||||||
*/
|
*/
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (IsCitusInternalBackend() || IsRebalancerInternalBackend() ||
|
if (IsCitusInternalBackend() || IsRebalancerInternalBackend() ||
|
||||||
IsCitusRunCommandBackend())
|
IsCitusRunCommandBackend() || IsCitusShardTransferBackend())
|
||||||
{
|
{
|
||||||
/* we never hide shards from Citus */
|
/* we never hide shards from Citus */
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -77,6 +77,7 @@ extern bool IsCitusInternalBackend(void);
|
||||||
extern bool IsRebalancerInternalBackend(void);
|
extern bool IsRebalancerInternalBackend(void);
|
||||||
extern bool IsCitusRunCommandBackend(void);
|
extern bool IsCitusRunCommandBackend(void);
|
||||||
extern bool IsExternalClientBackend(void);
|
extern bool IsExternalClientBackend(void);
|
||||||
|
extern bool IsCitusShardTransferBackend(void);
|
||||||
|
|
||||||
#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
|
#define INVALID_CITUS_INTERNAL_BACKEND_GPID 0
|
||||||
#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999
|
#define GLOBAL_PID_NODE_ID_FOR_NODES_NOT_IN_METADATA 99999999
|
||||||
|
|
|
@ -42,6 +42,14 @@
|
||||||
/* application name used for connections made by run_command_on_* */
|
/* application name used for connections made by run_command_on_* */
|
||||||
#define CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX "citus_run_command gpid="
|
#define CITUS_RUN_COMMAND_APPLICATION_NAME_PREFIX "citus_run_command gpid="
|
||||||
|
|
||||||
|
/*
|
||||||
|
* application name prefix for move/split replication connections.
|
||||||
|
*
|
||||||
|
* This application_name is set to the subscription name by logical replication
|
||||||
|
* workers, so there is no GPID.
|
||||||
|
*/
|
||||||
|
#define CITUS_SHARD_TRANSFER_APPLICATION_NAME_PREFIX "citus_shard_"
|
||||||
|
|
||||||
/* deal with waiteventset errors */
|
/* deal with waiteventset errors */
|
||||||
#define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1
|
#define WAIT_EVENT_SET_INDEX_NOT_INITIALIZED -1
|
||||||
#define WAIT_EVENT_SET_INDEX_FAILED -2
|
#define WAIT_EVENT_SET_INDEX_FAILED -2
|
||||||
|
|
|
@ -444,7 +444,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name
|
||||||
(4 rows)
|
(4 rows)
|
||||||
|
|
||||||
-- or, we set it to walsender
|
-- or, we set it to walsender
|
||||||
-- the shards and indexes do show up
|
-- the shards and indexes do not show up
|
||||||
SELECT set_backend_type(9);
|
SELECT set_backend_type(9);
|
||||||
NOTICE: backend type switched to: walsender
|
NOTICE: backend type switched to: walsender
|
||||||
set_backend_type
|
set_backend_type
|
||||||
|
@ -452,6 +452,17 @@ NOTICE: backend type switched to: walsender
|
||||||
|
|
||||||
(1 row)
|
(1 row)
|
||||||
|
|
||||||
|
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
|
||||||
|
relname
|
||||||
|
---------------------------------------------------------------------
|
||||||
|
test_index
|
||||||
|
test_table
|
||||||
|
test_table_102008
|
||||||
|
test_table_2_1130000
|
||||||
|
(4 rows)
|
||||||
|
|
||||||
|
-- unless the application name starts with citus_shard
|
||||||
|
SET application_name = 'citus_shard_move';
|
||||||
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
|
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
|
||||||
relname
|
relname
|
||||||
---------------------------------------------------------------------
|
---------------------------------------------------------------------
|
||||||
|
@ -467,6 +478,7 @@ SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_name
|
||||||
test_table_2_1130000
|
test_table_2_1130000
|
||||||
(10 rows)
|
(10 rows)
|
||||||
|
|
||||||
|
RESET application_name;
|
||||||
-- but, client backends to see the shards
|
-- but, client backends to see the shards
|
||||||
SELECT set_backend_type(3);
|
SELECT set_backend_type(3);
|
||||||
NOTICE: backend type switched to: client backend
|
NOTICE: backend type switched to: client backend
|
||||||
|
|
|
@ -232,10 +232,15 @@ SELECT set_backend_type(4);
|
||||||
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
|
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
|
||||||
|
|
||||||
-- or, we set it to walsender
|
-- or, we set it to walsender
|
||||||
-- the shards and indexes do show up
|
-- the shards and indexes do not show up
|
||||||
SELECT set_backend_type(9);
|
SELECT set_backend_type(9);
|
||||||
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
|
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
|
||||||
|
|
||||||
|
-- unless the application name starts with citus_shard
|
||||||
|
SET application_name = 'citus_shard_move';
|
||||||
|
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
|
||||||
|
RESET application_name;
|
||||||
|
|
||||||
-- but, client backends to see the shards
|
-- but, client backends to see the shards
|
||||||
SELECT set_backend_type(3);
|
SELECT set_backend_type(3);
|
||||||
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
|
SELECT relname FROM pg_catalog.pg_class WHERE relnamespace = 'mx_hide_shard_names'::regnamespace ORDER BY relname;
|
||||||
|
|
Loading…
Reference in New Issue