Introduce objects to dist. infrastructure when updating Citus (#3477)

Mark existing objects that are not included in distributed object infrastructure
in older versions of Citus (but now should be) as distributed, after updating
Citus successfully.
pull/3463/head
Onur Tirtir 2020-02-07 18:07:59 +03:00 committed by GitHub
parent d5433400f9
commit 39df51e903
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 401 additions and 4 deletions

View File

@ -135,6 +135,45 @@ EnsureDependenciesExistOnAllNodes(const ObjectAddress *target)
}
/*
* GetDistributableDependenciesForObject finds all the dependencies that Citus
* can distribute and returns those dependencies regardless of their existency
* on nodes.
*/
List *
GetDistributableDependenciesForObject(const ObjectAddress *target)
{
/* local variables to work with dependencies */
List *distributableDependencies = NIL;
ListCell *dependencyCell = NULL;
/* collect all dependencies in creation order */
List *dependencies = GetDependenciesForObject(target);
/* filter the ones that can be distributed */
foreach(dependencyCell, dependencies)
{
ObjectAddress *dependency = (ObjectAddress *) lfirst(dependencyCell);
/*
* TODO: maybe we can optimize the logic applied in below line. Actually we
* do not need to create ddl commands as we are not ensuring their existence
* in nodes, but we utilize logic it follows to choose the objects that could
* be distributed
*/
List *dependencyCommands = GetDependencyCreateDDLCommands(dependency);
/* create a new list with dependencies that actually created commands */
if (list_length(dependencyCommands) > 0)
{
distributableDependencies = lappend(distributableDependencies, dependency);
}
}
return distributableDependencies;
}
/*
* GetDependencyCreateDDLCommands returns a list (potentially empty or NIL) of ddl
* commands to execute on a worker to create the object.

View File

@ -17,7 +17,9 @@
#include "distributed/commands.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/deparser.h"
#include "distributed/master_protocol.h"
#include "distributed/metadata_sync.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "distributed/multi_executor.h"
#include "distributed/relation_access_tracking.h"
@ -32,6 +34,7 @@ static char * ExtractNewExtensionVersion(Node *parseTree);
static void AddSchemaFieldIfMissing(CreateExtensionStmt *stmt);
static List * FilterDistributedExtensions(List *extensionObjectList);
static List * ExtensionNameListToObjectAddressList(List *extensionObjectList);
static void MarkExistingObjectDependenciesDistributedIfSupported(void);
static void EnsureSequentialModeForExtensionDDL(void);
static bool ShouldPropagateExtensionCommand(Node *parseTree);
static bool IsDropCitusStmt(Node *parseTree);
@ -453,6 +456,7 @@ List *
PreprocessAlterExtensionUpdateStmt(Node *node, const char *queryString)
{
AlterExtensionStmt *alterExtensionStmt = castNode(AlterExtensionStmt, node);
if (!ShouldPropagateExtensionCommand((Node *) alterExtensionStmt))
{
return NIL;
@ -491,6 +495,112 @@ PreprocessAlterExtensionUpdateStmt(Node *node, const char *queryString)
}
/*
* PostprocessAlterExtensionCitusUpdateStmt is called after ALTER EXTENSION
* citus UPDATE command is executed by standard utility hook.
*
* Actually, we do not need such a post process function for ALTER EXTENSION
* UPDATE commands unless the extension is Citus itself. This is because we
* need to mark existing objects that are not included in distributed object
* infrastructure in older versions of Citus (but now should be) as distributed
* if we really updated Citus to the available version successfully via standard
* utility hook.
*/
void
PostprocessAlterExtensionCitusUpdateStmt(Node *node)
{
/*
* We should not postprocess this command in workers as they do not keep track
* of citus.pg_dist_object.
*/
if (!IsCoordinator())
{
return;
}
bool citusIsUpdatedToLatestVersion = InstalledAndAvailableVersionsSame();
/*
* Knowing that Citus version was different than the available version before
* standard process utility runs ALTER EXTENSION command, we perform post
* process operations if Citus is updated to that available version
*/
if (!citusIsUpdatedToLatestVersion)
{
return;
}
/*
* Finally, mark existing objects that are not included in distributed object
* infrastructure in older versions of Citus (but now should be) as distributed
*/
MarkExistingObjectDependenciesDistributedIfSupported();
}
/*
* MarkAllExistingObjectsDistributed marks all objects that could be distributed by
* resolving dependencies of "existing distributed tables" and "already distributed
* objects" to introduce the objects created in older versions of Citus to distributed
* object infrastructure as well.
*
* Note that this function is not responsible for ensuring if dependencies exist on
* nodes and satisfying these dependendencies if not exists, which is already done by
* EnsureDependenciesExistOnAllNodes on demand. Hence, this function is just designed
* to be used when "ALTER EXTENSION citus UPDATE" is executed.
* This is because we want to add existing objects that would have already been in
* pg_dist_object if we had created them in new version of Citus to pg_dist_object.
*/
static void
MarkExistingObjectDependenciesDistributedIfSupported()
{
ListCell *listCell = NULL;
/* resulting object addresses to be marked as distributed */
List *resultingObjectAddresses = NIL;
/* resolve dependencies of distributed tables */
List *distributedTableOidList = DistTableOidList();
foreach(listCell, distributedTableOidList)
{
Oid distributedTableOid = lfirst_oid(listCell);
ObjectAddress tableAddress = { 0 };
ObjectAddressSet(tableAddress, RelationRelationId, distributedTableOid);
List *distributableDependencyObjectAddresses =
GetDistributableDependenciesForObject(&tableAddress);
resultingObjectAddresses = list_concat(resultingObjectAddresses,
distributableDependencyObjectAddresses);
}
/* resolve dependencies of the objects in pg_dist_object*/
List *distributedObjectAddressList = GetDistributedObjectAddressList();
foreach(listCell, distributedObjectAddressList)
{
ObjectAddress *distributedObjectAddress = (ObjectAddress *) lfirst(listCell);
List *distributableDependencyObjectAddresses =
GetDistributableDependenciesForObject(distributedObjectAddress);
resultingObjectAddresses = list_concat(resultingObjectAddresses,
distributableDependencyObjectAddresses);
}
/* remove duplicates from object addresses list for efficiency */
List *uniqueObjectAddresses = GetUniqueDependenciesList(resultingObjectAddresses);
foreach(listCell, uniqueObjectAddresses)
{
ObjectAddress *objectAddress = (ObjectAddress *) lfirst(listCell);
MarkObjectDistributed(objectAddress);
}
}
/*
* PreprocessAlterExtensionContentsStmt issues a notice. It does not propagate.
*/

View File

@ -140,9 +140,9 @@ multi_ProcessUtility(PlannedStmt *pstmt,
return;
}
bool checkCreateAlterExtensionVersion = IsCreateAlterExtensionUpdateCitusStmt(
bool isCreateAlterExtensionUpdateCitusStmt = IsCreateAlterExtensionUpdateCitusStmt(
parsetree);
if (EnableVersionChecks && checkCreateAlterExtensionVersion)
if (EnableVersionChecks && isCreateAlterExtensionUpdateCitusStmt)
{
ErrorIfUnstableCreateOrAlterExtensionStmt(parsetree);
}
@ -466,9 +466,34 @@ multi_ProcessUtility(PlannedStmt *pstmt,
activeDropSchemaOrDBs++;
}
/*
* Check if we are running ALTER EXTENSION citus UPDATE (TO "<version>") command and
* the available version is different than the current version of Citus. In this case,
* ALTER EXTENSION citus UPDATE command can actually update Citus to a new version.
*/
bool isAlterExtensionUpdateCitusStmt = isCreateAlterExtensionUpdateCitusStmt &&
IsA(parsetree, AlterExtensionStmt);
bool citusCanBeUpdatedToAvailableVersion = false;
if (isAlterExtensionUpdateCitusStmt)
{
citusCanBeUpdatedToAvailableVersion = !InstalledAndAvailableVersionsSame();
}
standard_ProcessUtility(pstmt, queryString, context,
params, queryEnv, dest, completionTag);
/*
* if we are running ALTER EXTENSION citus UPDATE (to "<version>") command, we may need
* to mark existing objects as distributed depending on the "version" parameter if
* specified in "ALTER EXTENSION citus UPDATE" command
*/
if (isAlterExtensionUpdateCitusStmt && citusCanBeUpdatedToAvailableVersion)
{
PostprocessAlterExtensionCitusUpdateStmt(parsetree);
}
/*
* Postgres added the following CommandCounterIncrement as a patch in:
* - 10.7 -> 10.8

View File

@ -20,6 +20,7 @@
#include "catalog/pg_depend.h"
#include "catalog/pg_type.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/listutils.h"
#include "distributed/metadata/dependency.h"
#include "distributed/metadata/distobject.h"
#include "utils/fmgroids.h"
@ -74,6 +75,33 @@ static List * ExpandCitusSupportedTypes(ObjectAddressCollector *collector,
const ObjectAddress *target);
/*
* GetUniqueDependenciesList takes a list of object addresses and returns a new list
* of ObjectAddesses whose elements are unique.
*/
List *
GetUniqueDependenciesList(List *objectAddressesList)
{
ObjectAddressCollector objectAddressCollector = { 0 };
InitObjectAddressCollector(&objectAddressCollector);
ObjectAddress *objectAddress = NULL;
foreach_ptr(objectAddress, objectAddressesList)
{
if (IsObjectAddressCollected(objectAddress, &objectAddressCollector))
{
/* skip objects that are already collected */
continue;
}
CollectObjectAddress(&objectAddressCollector, objectAddress);
}
return objectAddressCollector.dependencyList;
}
/*
* GetDependenciesForObject returns a list of ObjectAddesses to be created in order
* before the target object could safely be created on a worker. Some of the object might

View File

@ -1627,7 +1627,7 @@ CheckAvailableVersion(int elevel)
/*
* CheckInstalledVersion compares CITUS_EXTENSIONVERSION and the
* extension's current version from the pg_extemsion catalog table. If they
* extension's current version from the pg_extension catalog table. If they
* are not compatible, this function logs an error with the specified elevel,
* otherwise it returns true.
*/
@ -1654,6 +1654,26 @@ CheckInstalledVersion(int elevel)
}
/*
* InstalledAndAvailableVersionsSame compares extension's available version and
* its current version from the pg_extension catalog table. If they are not same
* returns false, otherwise returns true.
*/
bool
InstalledAndAvailableVersionsSame()
{
char *installedVersion = InstalledExtensionVersion();
char *availableVersion = AvailableExtensionVersion();
if (strncmp(installedVersion, availableVersion, NAMEDATALEN) == 0)
{
return true;
}
return false;
}
/*
* MajorVersionsCompatible checks whether both versions are compatible. They
* are if major and minor version numbers match, the schema version is
@ -1904,7 +1924,7 @@ CitusCatalogNamespaceId(void)
}
/* return oid of pg_dist_shard relation */
/* return oid of pg_dist_object relation */
Oid
DistObjectRelationId(void)
{

View File

@ -81,6 +81,7 @@ extern List * PostprocessAlterExtensionSchemaStmt(Node *stmt,
const char *queryString);
extern List * PreprocessAlterExtensionUpdateStmt(Node *stmt,
const char *queryString);
extern void PostprocessAlterExtensionCitusUpdateStmt(Node *node);
extern List * PreprocessAlterExtensionContentsStmt(Node *node,
const char *queryString);
extern List * CreateExtensionDDLCommand(const ObjectAddress *extensionAddress);

View File

@ -138,6 +138,7 @@ extern void CreateDistributedTable(Oid relationId, Var *distributionColumn,
extern void CreateTruncateTrigger(Oid relationId);
extern void EnsureDependenciesExistOnAllNodes(const ObjectAddress *target);
extern List * GetDistributableDependenciesForObject(const ObjectAddress *target);
extern bool ShouldPropagate(void);
extern bool ShouldPropagateObject(const ObjectAddress *address);
extern void ReplicateAllDependenciesToNode(const char *nodeName, int nodePort);

View File

@ -17,6 +17,7 @@
#include "catalog/objectaddress.h"
#include "nodes/pg_list.h"
extern List * GetUniqueDependenciesList(List *objectAddressesList);
extern List * GetDependenciesForObject(const ObjectAddress *target);
extern List * OrderObjectAddressListInDependencyOrder(List *objectAddressList);
extern bool SupportedDependencyByCitus(const ObjectAddress *address);

View File

@ -150,6 +150,7 @@ extern bool HasOverlappingShardInterval(ShardInterval **shardIntervalArray,
extern bool CitusHasBeenLoaded(void);
extern bool CheckCitusVersion(int elevel);
extern bool CheckAvailableVersion(int elevel);
extern bool InstalledAndAvailableVersionsSame(void);
extern bool MajorVersionsCompatible(char *leftVersion, char *rightVersion);
extern void ErrorIfInconsistentShardIntervals(DistTableCacheEntry *cacheEntry);
extern void EnsureModificationsCanRun(void);

View File

@ -1,3 +1,4 @@
# this schedule is to be run only on coordinators
test: upgrade_basic_after
test: upgrade_pg_dist_object_test_after

View File

@ -1,3 +1,4 @@
# this schedule is to be run on only coordinators
test: upgrade_basic_before
test: upgrade_pg_dist_object_test_before

View File

@ -0,0 +1,33 @@
-- drop objects from previous test (uprade_basic_after.sql) for a clean test
-- drop upgrade_basic schema and switch back to public schema
SET search_path to public;
DROP SCHEMA upgrade_basic CASCADE;
NOTICE: drop cascades to 7 other objects
DETAIL: drop cascades to table upgrade_basic.t
drop cascades to table upgrade_basic.tp
drop cascades to table upgrade_basic.t_ab
drop cascades to table upgrade_basic.t2
drop cascades to table upgrade_basic.r
drop cascades to table upgrade_basic.tr
drop cascades to table upgrade_basic.t_append
-- as we updated citus to available version,
-- "isn" extension
-- "new_schema" schema
-- "public" schema
-- "fooschema" schema
-- "footype" type (under schema 'fooschema')
-- will now be marked as distributed
-- but,
-- "seg" extension
-- will not be marked as distributed
-- see underlying objects
SELECT i.* FROM citus.pg_dist_object, pg_identify_object_as_address(classid, objid, objsubid) i ORDER BY 1, 2, 3;
type | object_names | object_args
---------------------------------------------------------------------
extension | {isn} | {}
schema | {fooschema} | {}
schema | {new_schema} | {}
schema | {public} | {}
type | {fooschema.footype} | {}
(5 rows)

View File

@ -0,0 +1,74 @@
-- create some objects that we just included into distributed object
-- infrastructure in 9.1 versions but not included in 9.0.2
-- extension propagation --
-- create an extension on all nodes and a table that depends on it
CREATE EXTENSION isn;
SELECT run_command_on_workers($$CREATE EXTENSION isn;$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57636,t,"CREATE EXTENSION")
(localhost,57637,t,"CREATE EXTENSION")
(2 rows)
CREATE TABLE isn_dist_table (key int, value issn);
SELECT create_reference_table('isn_dist_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- create an extension on all nodes, but do not create a table depending on it
CREATE EXTENSION seg;
SELECT run_command_on_workers($$CREATE EXTENSION seg;$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57636,t,"CREATE EXTENSION")
(localhost,57637,t,"CREATE EXTENSION")
(2 rows)
-- schema propagation --
-- public schema
CREATE TABLE dist_table (a int);
SELECT create_reference_table('dist_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- custom schema
CREATE SCHEMA new_schema;
SET search_path to new_schema;
CREATE TABLE another_dist_table (a int);
SELECT create_reference_table('another_dist_table');
create_reference_table
---------------------------------------------------------------------
(1 row)
-- another custom schema and a type
-- create table that depends both on a type & schema here (actually type depends on the schema)
-- here we test if schema is marked as distributed successfully.
-- This is because tracking the dependencies will hit to the schema for two times
CREATE SCHEMA fooschema;
CREATE TYPE fooschema.footype AS (x int, y int);
SELECT run_command_on_workers($$CREATE SCHEMA fooschema;$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57636,t,"CREATE SCHEMA")
(localhost,57637,t,"CREATE SCHEMA")
(2 rows)
SELECT run_command_on_workers($$CREATE TYPE fooschema.footype AS (x int, y int);$$);
run_command_on_workers
---------------------------------------------------------------------
(localhost,57636,t,"CREATE TYPE")
(localhost,57637,t,"CREATE TYPE")
(2 rows)
CREATE TABLE fooschema.footable (f fooschema.footype);
SELECT create_reference_table('fooschema.footable');
create_reference_table
---------------------------------------------------------------------
(1 row)

View File

@ -0,0 +1,18 @@
-- drop objects from previous test (uprade_basic_after.sql) for a clean test
-- drop upgrade_basic schema and switch back to public schema
SET search_path to public;
DROP SCHEMA upgrade_basic CASCADE;
-- as we updated citus to available version,
-- "isn" extension
-- "new_schema" schema
-- "public" schema
-- "fooschema" schema
-- "footype" type (under schema 'fooschema')
-- will now be marked as distributed
-- but,
-- "seg" extension
-- will not be marked as distributed
-- see underlying objects
SELECT i.* FROM citus.pg_dist_object, pg_identify_object_as_address(classid, objid, objsubid) i ORDER BY 1, 2, 3;

View File

@ -0,0 +1,44 @@
-- create some objects that we just included into distributed object
-- infrastructure in 9.1 versions but not included in 9.0.2
-- extension propagation --
-- create an extension on all nodes and a table that depends on it
CREATE EXTENSION isn;
SELECT run_command_on_workers($$CREATE EXTENSION isn;$$);
CREATE TABLE isn_dist_table (key int, value issn);
SELECT create_reference_table('isn_dist_table');
-- create an extension on all nodes, but do not create a table depending on it
CREATE EXTENSION seg;
SELECT run_command_on_workers($$CREATE EXTENSION seg;$$);
-- schema propagation --
-- public schema
CREATE TABLE dist_table (a int);
SELECT create_reference_table('dist_table');
-- custom schema
CREATE SCHEMA new_schema;
SET search_path to new_schema;
CREATE TABLE another_dist_table (a int);
SELECT create_reference_table('another_dist_table');
-- another custom schema and a type
-- create table that depends both on a type & schema here (actually type depends on the schema)
-- here we test if schema is marked as distributed successfully.
-- This is because tracking the dependencies will hit to the schema for two times
CREATE SCHEMA fooschema;
CREATE TYPE fooschema.footype AS (x int, y int);
SELECT run_command_on_workers($$CREATE SCHEMA fooschema;$$);
SELECT run_command_on_workers($$CREATE TYPE fooschema.footype AS (x int, y int);$$);
CREATE TABLE fooschema.footable (f fooschema.footype);
SELECT create_reference_table('fooschema.footable');