[Columnar] Check for existence of Citus before creating Citus_Columnar (#6178)

* Added a check to see if Citus has already been loaded before creating citus_columnar

* added tests

Introduce clean up code
users/saawasek/split_cleanup
Ying Xu 2022-08-17 15:12:42 -07:00 committed by Sameer Awasekar
parent 91473635db
commit db0014b27e
7 changed files with 189 additions and 51 deletions

View File

@ -642,6 +642,9 @@ CreateSplitShardsForShardGroup(HTAB *mapOfShardToPlacementCreatedByWorkflow,
splitShardCreationCommandList, splitShardCreationCommandList,
shardInterval->shardId); shardInterval->shardId);
StringInfo insertArtifactCommand = CreateArtifactEntryCommand(0 /*Operation Id*/, SPLIT_CHILD_SHARD, ConstructQualifiedShardName(shardInterval));
splitShardCreationCommandList = lappend(splitShardCreationCommandList, insertArtifactCommand->data);
/* Create new split child shard on the specified placement list */ /* Create new split child shard on the specified placement list */
CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode); CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
@ -921,7 +924,7 @@ static void
CreateObjectOnPlacement(List *objectCreationCommandList, CreateObjectOnPlacement(List *objectCreationCommandList,
WorkerNode *workerPlacementNode) WorkerNode *workerPlacementNode)
{ {
char *currentUser = CurrentUserName(); char *currentUser = CitusExtensionOwnerName();
SendCommandListToWorkerOutsideTransaction(workerPlacementNode->workerName, SendCommandListToWorkerOutsideTransaction(workerPlacementNode->workerName,
workerPlacementNode->workerPort, workerPlacementNode->workerPort,
currentUser, currentUser,
@ -1018,7 +1021,7 @@ InsertSplitChildrenShardMetadata(List *shardGroupSplitIntervalListList,
List *shardIntervalList = NIL; List *shardIntervalList = NIL;
List *syncedShardList = NIL; List *syncedShardList = NIL;
/* /*
* Iterate over all the shards in the shard group. * Iterate over all the shards in the shard group.
*/ */
foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList) foreach_ptr(shardIntervalList, shardGroupSplitIntervalListList)
@ -1560,6 +1563,9 @@ CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement,
splitShardCreationCommandList, splitShardCreationCommandList,
shardInterval->shardId); shardInterval->shardId);
StringInfo insertShardString = CreateArtifactEntryCommand(0 /*OperationId*/, SPLIT_DUMMY_SHARD, ConstructQualifiedShardName(shardInterval));
splitShardCreationCommandList = lappend(splitShardCreationCommandList, insertShardString->data);
/* Create dummy source shard on the specified placement list */ /* Create dummy source shard on the specified placement list */
CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode); CreateObjectOnPlacement(splitShardCreationCommandList, workerPlacementNode);
@ -1595,6 +1601,9 @@ CreateDummyShardsForShardGroup(HTAB *mapOfDummyShardToPlacement,
splitShardCreationCommandList, splitShardCreationCommandList,
shardInterval->shardId); shardInterval->shardId);
StringInfo insertShardString = CreateArtifactEntryCommand(0 /*OperationId*/, SPLIT_DUMMY_SHARD, ConstructQualifiedShardName(shardInterval));
splitShardCreationCommandList = lappend(splitShardCreationCommandList, insertShardString->data);
/* Create dummy split child shard on source worker node */ /* Create dummy split child shard on source worker node */
CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode); CreateObjectOnPlacement(splitShardCreationCommandList, sourceWorkerNode);
@ -1636,6 +1645,44 @@ CreateWorkerForPlacementSet(List *workersForPlacementList)
} }
/*
* CreateTemplateReplicationSlotAndReturnSnapshot creates a replication slot
* and returns its snapshot. This slot acts as a 'Template' for creating
* replication slot copies used for logical replication.
*
* The snapshot remains valid till the lifetime of the session that creates it.
*/
char *
CreateTemplateReplicationSlotAndReturnSnapshot(ShardInterval *shardInterval,
WorkerNode *sourceWorkerNode,
MultiConnection **templateSlotConnection)
{
/*Create Template replication slot */
int connectionFlags = FORCE_NEW_CONNECTION;
connectionFlags |= REQUIRE_REPLICATION_CONNECTION_PARAM;
MultiConnection *sourceConnection = GetNodeUserDatabaseConnection(connectionFlags,
sourceWorkerNode->
workerName,
sourceWorkerNode->
workerPort,
CitusExtensionOwnerName(),
get_database_name(
MyDatabaseId));
ClaimConnectionExclusively(sourceConnection);
/*
* Try to drop leftover template replication slot if any from previous operation
* and create new one.
*/
char *snapShotName = CreateTemplateReplicationSlot(shardInterval,
sourceConnection);
*templateSlotConnection = sourceConnection;
return snapShotName;
}
/* /*
* ExecuteSplitShardReplicationSetupUDF executes * ExecuteSplitShardReplicationSetupUDF executes
* 'worker_split_shard_replication_setup' UDF on source shard node * 'worker_split_shard_replication_setup' UDF on source shard node
@ -1873,11 +1920,26 @@ DropDummyShards(HTAB *mapOfDummyShardToPlacement)
List *dummyShardIntervalList = entry->shardIntervals; List *dummyShardIntervalList = entry->shardIntervals;
ShardInterval *shardInterval = NULL; ShardInterval *shardInterval = NULL;
foreach_ptr(shardInterval, dummyShardIntervalList) foreach_ptr(shardInterval, dummyShardIntervalList)
{ {
DropDummyShard(connection, shardInterval); char *qualifiedShardName = ConstructQualifiedShardName(shardInterval);
} StringInfo dropShardQuery = makeStringInfo();
CloseConnection(connection); /* Caller enforces that foreign tables cannot be split (use DROP_REGULAR_TABLE_COMMAND) */
appendStringInfo(dropShardQuery, DROP_REGULAR_TABLE_COMMAND,
qualifiedShardName);
StringInfo deleteMetadataEntry = CreateDeleteArtifactCommand(SPLIT_DUMMY_SHARD, qualifiedShardName);
List * commandList = NIL;
commandList = lappend(commandList, dropShardQuery->data);
commandList = lappend(commandList, deleteMetadataEntry->data);
char *currentUser = CitusExtensionOwnerName();
SendCommandListToWorkerOutsideTransaction(shardToBeDroppedNode->workerName,
shardToBeDroppedNode->workerPort,
currentUser,
commandList);
}
} }
} }
@ -1990,3 +2052,29 @@ GetNextShardIdForSplitChild()
return shardId; return shardId;
} }
StringInfo CreateArtifactEntryCommand(uint32 operationId, SplitArtifactType splitArtifact, char* artifactName)
{
StringInfo operationIdString = makeStringInfo();
appendStringInfo(operationIdString, "%d", operationId);
StringInfo splitArtifactString = makeStringInfo();
appendStringInfo(splitArtifactString, "%d", splitArtifact);
StringInfo insertArtifactCommand = makeStringInfo();
appendStringInfo(insertArtifactCommand, "INSERT INTO pg_catalog.pg_shard_cleanup values (%s, %s, %s);",
quote_literal_cstr(operationIdString->data),
quote_literal_cstr(splitArtifactString->data),
quote_literal_cstr(artifactName));
return insertArtifactCommand;
}
StringInfo CreateDeleteArtifactCommand(SplitArtifactType splitArtifact, char* artifactName)
{
StringInfo deleteArtifactCommand = makeStringInfo();
appendStringInfo(deleteArtifactCommand, "DELETE FROM pg_catalog.pg_shard_cleanup where object_name=%s", quote_literal_cstr(artifactName));
return deleteArtifactCommand;
}

View File

@ -0,0 +1,31 @@
/*-------------------------------------------------------------------------
*
* worker_cleanup_artifact_udf.c
* This file contains functions to clean up artifacts and metadata.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "miscadmin.h"
#include "postmaster/postmaster.h"
#include "distributed/shardinterval_utils.h"
#include "distributed/shard_utils.h"
#include "distributed/listutils.h"
#include "distributed/remote_commands.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "commands/dbcommands.h"
/* declarations for dynamic loading */
PG_FUNCTION_INFO_V1(worker_cleanup_artifacts);
Datum
worker_cleanup_artifacts(PG_FUNCTION_ARGS)
{
char *first = PG_GETARG_CSTRING(0);
printf("foobar %s\n", first);
PG_RETURN_VOID();
}

View File

@ -23,3 +23,21 @@ COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardIn
IS 'Replication setup for splitting a shard'; IS 'Replication setup for splitting a shard';
REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[]) FROM PUBLIC; REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[]) FROM PUBLIC;
--- Todo(saawasek): Will change the location later by introducing new file
CREATE TABLE citus.pg_shard_cleanup(
workflow_id text,
object_type text,
object_name text
);
ALTER TABLE citus.pg_shard_cleanup SET SCHEMA pg_catalog;
CREATE OR REPLACE FUNCTION pg_catalog.worker_cleanup_artifacts(
workflow_id text,
operation_name text)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_cleanup_artifacts$$;
COMMENT ON FUNCTION pg_catalog.worker_cleanup_artifacts(workflow_id text, operation_name text)
IS 'UDF to clean up artifacts';
REVOKE ALL ON FUNCTION pg_catalog.worker_cleanup_artifacts(workflow_id text, operation_name text) FROM PUBLIC;

View File

@ -23,3 +23,20 @@ COMMENT ON FUNCTION pg_catalog.worker_split_shard_replication_setup(splitShardIn
IS 'Replication setup for splitting a shard'; IS 'Replication setup for splitting a shard';
REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[]) FROM PUBLIC; REVOKE ALL ON FUNCTION pg_catalog.worker_split_shard_replication_setup(pg_catalog.split_shard_info[]) FROM PUBLIC;
CREATE TABLE citus.pg_shard_cleanup(
workflow_id text,
object_type text,
object_name text
);
ALTER TABLE citus.pg_shard_cleanup SET SCHEMA pg_catalog;
CREATE OR REPLACE FUNCTION pg_catalog.worker_cleanup_artifacts(
workflow_id text,
operation_name text)
RETURNS void
LANGUAGE C STRICT
AS 'MODULE_PATHNAME', $$worker_cleanup_artifacts$$;
COMMENT ON FUNCTION pg_catalog.worker_cleanup_artifacts(workflow_id text, operation_name text)
IS 'UDF to clean up artifacts';
REVOKE ALL ON FUNCTION pg_catalog.worker_cleanup_artifacts(workflow_id text, operation_name text) FROM PUBLIC;

View File

@ -31,6 +31,15 @@ typedef enum SplitOperation
ISOLATE_TENANT_TO_NEW_SHARD ISOLATE_TENANT_TO_NEW_SHARD
} SplitOperation; } SplitOperation;
typedef enum SplitArtifactType
{
SPLIT_CHILD_SHARD = 0,
SPLIT_DUMMY_SHARD = 1,
SPLIT_PUBLICATION = 2,
SPLIT_SUBSCRIPTION = 3,
SPLIT_REPLICATION_SLOT = 4
} SplitArtifactType;
/* /*
* SplitShard API to split a given shard (or shard group) using split mode and * SplitShard API to split a given shard (or shard group) using split mode and
@ -45,5 +54,6 @@ extern void SplitShard(SplitMode splitMode,
extern void DropShardList(List *shardIntervalList); extern void DropShardList(List *shardIntervalList);
extern SplitMode LookupSplitMode(Oid shardTransferModeOid); extern SplitMode LookupSplitMode(Oid shardTransferModeOid);
extern StringInfo CreateArtifactEntryCommand(uint32 operationId, SplitArtifactType splitArtifact, char* artifactName);
extern StringInfo CreateDeleteArtifactCommand(SplitArtifactType splitArtifact, char* artifactName);
#endif /* SHARDSPLIT_H_ */ #endif /* SHARDSPLIT_H_ */

View File

@ -7,19 +7,19 @@ test: tablespace
# Helpers for foreign key catalogs. # Helpers for foreign key catalogs.
test: foreign_key_to_reference_table test: foreign_key_to_reference_table
# Split tests go here. # Split tests go here.
test: split_shard_replication_setup #test: split_shard_replication_setup
test: split_shard_replication_setup_remote_local #test: split_shard_replication_setup_remote_local
test: split_shard_replication_setup_local #test: split_shard_replication_setup_local
test: split_shard_replication_colocated_setup #test: split_shard_replication_colocated_setup
test: worker_split_copy_test #test: worker_split_copy_test
test: worker_split_binary_copy_test #test: worker_split_binary_copy_test
test: worker_split_text_copy_test #test: worker_split_text_copy_test
test: citus_split_shard_by_split_points_negative #test: citus_split_shard_by_split_points_negative
test: citus_split_shard_by_split_points #test: citus_split_shard_by_split_points
test: citus_split_shard_by_split_points_failure #test: citus_split_shard_by_split_points_failure
# Name citus_split_shard_by_split_points_columnar_partitioned was too long and being truncated. # Name citus_split_shard_by_split_points_columnar_partitioned was too long and being truncated.
# use citus_split_shard_columnar_partitioned instead. # use citus_split_shard_columnar_partitioned instead.
test: citus_split_shard_columnar_partitioned #test: citus_split_shard_columnar_partitioned
test: citus_non_blocking_split_shards #test: citus_non_blocking_split_shards
test: citus_non_blocking_split_shard_cleanup test: citus_non_blocking_split_shard_cleanup
test: citus_non_blocking_split_columnar #test: citus_non_blocking_split_columnar

View File

@ -44,7 +44,7 @@ SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \
SELECT pg_catalog.citus_split_shard_by_split_points( SELECT pg_catalog.citus_split_shard_by_split_points(
8981000, 8981000,
ARRAY['-1073741824'], ARRAY['-1073741824'],
ARRAY[:worker_2_node, :worker_2_node], ARRAY[:worker_1_node, :worker_2_node],
'force_logical'); 'force_logical');
@ -52,40 +52,14 @@ SELECT pg_catalog.citus_split_shard_by_split_points(
SET search_path TO "citus_split_test_schema"; SET search_path TO "citus_split_test_schema";
SET citus.show_shards_for_app_name_prefixes = '*'; SET citus.show_shards_for_app_name_prefixes = '*';
-- Dummy shards should be cleaned up. 8981007, 8981008 are dummy shards SELECT * FROM pg_replication_slots;
-- created at source. SELECT * FROM pg_catalog.pg_shard_cleanup;
SELECT count(*) FROM pg_class where relname like '%sensors_8981007%';
SELECT count(*) FROM pg_class where relname like '%sensors_8981008%';
-- Replication slots should be cleanedup at source
SELECT slot_name FROM pg_replication_slots;
-- Publications should be cleaned up on worker1
SELECT count(*) FROM pg_publication;
\c - - - :worker_2_port \c - - - :worker_2_port
SET search_path TO "citus_split_test_schema"; SET search_path TO "citus_split_test_schema";
-- All subscriptions should be cleaned up. SET citus.show_shards_for_app_name_prefixes = '*';
SELECT count(*) FROM pg_subscription; SELECT * FROM pg_catalog.pg_shard_cleanup;
-- Trigger a 3-way local split.
\c - - - :master_port
SET search_path TO "citus_split_test_schema";
SELECT pg_catalog.citus_split_shard_by_split_points(
8981001,
ARRAY['536870911', '1610612735'],
ARRAY[:worker_2_node, :worker_2_node, :worker_2_node],
'force_logical');
\c - - - :worker_2_port
SET search_path TO "citus_split_test_schema";
-- Replication slots should be cleaned up
SELECT slot_name FROM pg_replication_slots;
-- Publications should be cleanedup
SELECT count(*) FROM pg_publication;
-- All subscriptions should be cleaned up.
SELECT count(*) FROM pg_subscription;
--BEGIN : Cleanup --BEGIN : Cleanup
\c - postgres - :master_port \c - postgres - :master_port