From b211211fdf212833ef2b5fb271e3def8a64e48ff Mon Sep 17 00:00:00 2001 From: Xavier Stevens Date: Tue, 20 Jan 2015 15:09:29 -0800 Subject: [PATCH] Adding support for Postgres Point as well as PostGIS Geometry/Geography Points --- Makefile | 4 +- proto/pg_logicaldec.proto | 6 ++ src/decoderbufs.c | 106 +++++++++++++++++++++++++++--- src/proto/pg_logicaldec.pb-c.c | 115 +++++++++++++++++++++++++++++++-- src/proto/pg_logicaldec.pb-c.h | 48 ++++++++++++-- 5 files changed, 257 insertions(+), 22 deletions(-) diff --git a/Makefile b/Makefile index 465a200..daefde4 100644 --- a/Makefile +++ b/Makefile @@ -3,8 +3,8 @@ MODULES = decoderbufs PROTOBUF_C_CFLAGS = $(shell pkg-config --cflags 'libprotobuf-c >= 1.0.0') PROTOBUF_C_LDFLAGS = $(shell pkg-config --libs 'libprotobuf-c >= 1.0.0') -PG_CPPFLAGS += -std=c11 $(PROTOBUF_C_CFLAGS) -SHLIB_LINK += $(PROTOBUF_C_LDFLAGS) +PG_CPPFLAGS += -std=c11 $(PROTOBUF_C_CFLAGS) -I/usr/local/include +SHLIB_LINK += $(PROTOBUF_C_LDFLAGS) -L/usr/local/lib -llwgeom MODULE_big = $(patsubst src/%.c,%,$(wildcard src/*.c)) OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o diff --git a/proto/pg_logicaldec.proto b/proto/pg_logicaldec.proto index 71c317b..39cd277 100644 --- a/proto/pg_logicaldec.proto +++ b/proto/pg_logicaldec.proto @@ -10,6 +10,11 @@ enum Op { DELETE = 2; } +message Point { + required double x = 1; + required double y = 2; +} + message DatumMessage { optional string column_name = 1; optional int64 column_type = 2; @@ -20,6 +25,7 @@ message DatumMessage { optional bool datum_bool = 7; optional string datum_string = 8; optional bytes datum_bytes = 9; + optional Point datum_point = 10; } message RowMessage { diff --git a/src/decoderbufs.c b/src/decoderbufs.c index dac6321..0461001 100644 --- a/src/decoderbufs.c +++ b/src/decoderbufs.c @@ -36,10 +36,12 @@ #include "funcapi.h" #include "catalog/pg_class.h" #include "catalog/pg_type.h" +#include "executor/spi.h" #include "replication/output_plugin.h" #include "replication/logical.h" #include "utils/builtins.h" #include "utils/lsyscache.h" +#include "utils/geo_decls.h" #include "utils/json.h" #include "utils/memutils.h" #include "utils/numeric.h" @@ -50,6 +52,10 @@ #include "utils/uuid.h" #include "proto/pg_logicaldec.pb-c.h" +/* POSTGIS version define so it doesn't redef macros */ +#define POSTGIS_PGSQL_VERSION 94 +#include "libpgcommon/lwgeom_pg.h" + PG_MODULE_MAGIC; /* define a time macro to convert TimestampTz into something more sane, @@ -67,6 +73,9 @@ typedef struct { MemoryContext context; bool debug_mode; } DecoderData; +/* GLOBALs for PostGIS dynamic OIDs */ +int geometry_oid = -1; +int geography_oid = -1; /* these must be available to pg_dlsym() */ extern void _PG_init(void); @@ -124,16 +133,40 @@ static void pg_decode_startup(LogicalDecodingContext *ctx, } if (data->debug_mode) { - fprintf(stderr, "Decoderbufs DEBUG MODE is ON."); + fprintf(stderr, "Decoderbufs DEBUG MODE is ON.\n"); opt->output_type = OUTPUT_PLUGIN_TEXTUAL_OUTPUT; } else { - fprintf(stderr, "Decoderbufs DEBUG MODE is OFF."); + fprintf(stderr, "Decoderbufs DEBUG MODE is OFF.\n"); } } else { ereport(ERROR, (errcode(ERRCODE_INVALID_PARAMETER_VALUE), errmsg("option \"%s\" = \"%s\" is unknown", elem->defname, elem->arg ? strVal(elem->arg) : "(null)"))); } + + // set PostGIS geometry type id (these are dynamic unfortunately) + char *geom_oid_str = NULL; + char *geog_oid_str = NULL; + if (SPI_connect() == SPI_ERROR_CONNECT) { + elog(NOTICE, "Could not connect to SPI manager to scan for PostGIS types"); + SPI_finish(); + return; + } + if (SPI_OK_SELECT == SPI_execute("SELECT oid FROM pg_type WHERE typname = 'geometry'", true, 1) && SPI_processed > 0) { + geom_oid_str = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1); + if (geom_oid_str != NULL) { + fprintf(stderr, "Decoderbufs detected PostGIS geometry type with oid: %s\n", geom_oid_str); + geometry_oid = atoi(geom_oid_str); + } + } + if (SPI_OK_SELECT == SPI_execute("SELECT oid FROM pg_type WHERE typname = 'geography'", true, 1) && SPI_processed > 0) { + geog_oid_str = SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1); + if (geog_oid_str != NULL) { + fprintf(stderr, "Decoderbufs detected PostGIS geography type with oid: %s\n", geog_oid_str); + geography_oid = atoi(geog_oid_str); + } + } + SPI_finish(); } } @@ -156,7 +189,7 @@ static void pg_decode_commit_txn(LogicalDecodingContext *ctx, } /* convenience method to free up sub-messages */ -static void free_row_msg_subs(Decoderbufs__RowMessage *msg) { +static void row_message_destroy(Decoderbufs__RowMessage *msg) { if (!msg) { return; } @@ -171,6 +204,8 @@ static void free_row_msg_subs(Decoderbufs__RowMessage *msg) { pfree(msg->new_tuple[i]->datum_bytes.data); msg->new_tuple[i]->datum_bytes.data = NULL; msg->new_tuple[i]->datum_bytes.len = 0; + } else if (msg->new_tuple[i]->datum_point) { + pfree(msg->new_tuple[i]->datum_point); } pfree(msg->new_tuple[i]); } @@ -186,6 +221,8 @@ static void free_row_msg_subs(Decoderbufs__RowMessage *msg) { pfree(msg->old_tuple[i]->datum_bytes.data); msg->old_tuple[i]->datum_bytes.data = NULL; msg->old_tuple[i]->datum_bytes.len = 0; + } else if (msg->old_tuple[i]->datum_point) { + pfree(msg->old_tuple[i]->datum_point); } pfree(msg->old_tuple[i]); } @@ -230,7 +267,15 @@ static void print_tuple_msg(StringInfo out, Decoderbufs__DatumMessage **tup, case TIMESTAMPTZOID: appendStringInfo(out, ", datum[%s]", dmsg->datum_string); break; + case POINTOID: + appendStringInfo(out, ", datum[POINT(%f, %f)]", dmsg->datum_point->x, dmsg->datum_point->y); + break; default: + if (dmsg->column_type == geometry_oid && dmsg->datum_point != NULL) { + appendStringInfo(out, ", datum[GEOMETRY(POINT(%f,%f))]", dmsg->datum_point->x, dmsg->datum_point->y); + } else if (dmsg->column_type == geography_oid && dmsg->datum_point != NULL) { + appendStringInfo(out, ", datum[GEOGRAPHY(POINT(%f,%f))]", dmsg->datum_point->x, dmsg->datum_point->y); + } break; } appendStringInfo(out, "\n"); @@ -262,12 +307,40 @@ static double numeric_to_double_no_overflow(Numeric num) { return val; } +static bool geography_point_as_decoderbufs_point(Datum datum, Decoderbufs__Point *p) { + GSERIALIZED *geom; + LWGEOM *lwgeom; + LWPOINT *point = NULL; + POINT2D p2d; + + geom = (GSERIALIZED*)PG_DETOAST_DATUM(datum); + if (gserialized_get_type(geom) != POINTTYPE) { + return false; + } + + lwgeom = lwgeom_from_gserialized(geom); + point = lwgeom_as_lwpoint(lwgeom); + if (!lwgeom_is_empty(lwgeom)) { + return false; + } + + getPoint2d_p(point->point, 0, &p2d); + + if (p != NULL) { + p->x = p2d.x; + p->y = p2d.y; + } + + return true; +} + /* set a datum value based on its OID specified by typid */ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, Oid typoutput, Datum datum) { Numeric num; bytea *valptr; const char *output; + Point *p; int size = 0; switch (typid) { case BOOLOID: @@ -328,13 +401,26 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, datum_msg->datum_bytes.len = size; datum_msg->has_datum_bytes = true; break; + case POINTOID: + p = DatumGetPointP(datum); + datum_msg->datum_point = palloc(sizeof(Decoderbufs__Point)); + datum_msg->datum_point->x = p->x; + datum_msg->datum_point->y = p->y; + break; default: - output = OidOutputFunctionCall(typoutput, datum); - size = sizeof(output); - datum_msg->datum_bytes.data = palloc(size); - memcpy(datum_msg->datum_bytes.data, (uint8_t *)output, size); - datum_msg->datum_bytes.len = size; - datum_msg->has_datum_bytes = true; + // PostGIS uses dynamic OIDs so we need to check the type again here + if (typid == geometry_oid || typid == geography_oid) { + datum_msg->datum_point = palloc(sizeof(Decoderbufs__Point)); + geography_point_as_decoderbufs_point(datum, datum_msg->datum_point); + } else { + output = OidOutputFunctionCall(typoutput, datum); + int len = strlen(output); + size = sizeof(char) * len; + datum_msg->datum_bytes.data = palloc(size); + memcpy(datum_msg->datum_bytes.data, (uint8_t *)output, size); + datum_msg->datum_bytes.len = len; + datum_msg->has_datum_bytes = true; + } break; } } @@ -509,7 +595,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, } /* cleanup msg */ - free_row_msg_subs(&rmsg); + row_message_destroy(&rmsg); MemoryContextSwitchTo(old); MemoryContextReset(data->context); diff --git a/src/proto/pg_logicaldec.pb-c.c b/src/proto/pg_logicaldec.pb-c.c index 4dd9d5c..46a7af1 100644 --- a/src/proto/pg_logicaldec.pb-c.c +++ b/src/proto/pg_logicaldec.pb-c.c @@ -1,5 +1,5 @@ /* Generated by the protocol buffer compiler. DO NOT EDIT! */ -/* Generated from: proto/pg_logicaldec.proto */ +/* Generated from: pg_logicaldec.proto */ /* Do not generate deprecated warnings for self */ #ifndef PROTOBUF_C__NO_DEPRECATED @@ -7,6 +7,49 @@ #endif #include "pg_logicaldec.pb-c.h" +void decoderbufs__point__init + (Decoderbufs__Point *message) +{ + static Decoderbufs__Point init_value = DECODERBUFS__POINT__INIT; + *message = init_value; +} +size_t decoderbufs__point__get_packed_size + (const Decoderbufs__Point *message) +{ + assert(message->base.descriptor == &decoderbufs__point__descriptor); + return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message)); +} +size_t decoderbufs__point__pack + (const Decoderbufs__Point *message, + uint8_t *out) +{ + assert(message->base.descriptor == &decoderbufs__point__descriptor); + return protobuf_c_message_pack ((const ProtobufCMessage*)message, out); +} +size_t decoderbufs__point__pack_to_buffer + (const Decoderbufs__Point *message, + ProtobufCBuffer *buffer) +{ + assert(message->base.descriptor == &decoderbufs__point__descriptor); + return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer); +} +Decoderbufs__Point * + decoderbufs__point__unpack + (ProtobufCAllocator *allocator, + size_t len, + const uint8_t *data) +{ + return (Decoderbufs__Point *) + protobuf_c_message_unpack (&decoderbufs__point__descriptor, + allocator, len, data); +} +void decoderbufs__point__free_unpacked + (Decoderbufs__Point *message, + ProtobufCAllocator *allocator) +{ + assert(message->base.descriptor == &decoderbufs__point__descriptor); + protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator); +} void decoderbufs__datum_message__init (Decoderbufs__DatumMessage *message) { @@ -93,7 +136,58 @@ void decoderbufs__row_message__free_unpacked assert(message->base.descriptor == &decoderbufs__row_message__descriptor); protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator); } -static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[9] = +static const ProtobufCFieldDescriptor decoderbufs__point__field_descriptors[2] = +{ + { + "x", + 1, + PROTOBUF_C_LABEL_REQUIRED, + PROTOBUF_C_TYPE_DOUBLE, + 0, /* quantifier_offset */ + offsetof(Decoderbufs__Point, x), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, + { + "y", + 2, + PROTOBUF_C_LABEL_REQUIRED, + PROTOBUF_C_TYPE_DOUBLE, + 0, /* quantifier_offset */ + offsetof(Decoderbufs__Point, y), + NULL, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, +}; +static const unsigned decoderbufs__point__field_indices_by_name[] = { + 0, /* field[0] = x */ + 1, /* field[1] = y */ +}; +static const ProtobufCIntRange decoderbufs__point__number_ranges[1 + 1] = +{ + { 1, 0 }, + { 0, 2 } +}; +const ProtobufCMessageDescriptor decoderbufs__point__descriptor = +{ + PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC, + "decoderbufs.Point", + "Point", + "Decoderbufs__Point", + "decoderbufs", + sizeof(Decoderbufs__Point), + 2, + decoderbufs__point__field_descriptors, + decoderbufs__point__field_indices_by_name, + 1, decoderbufs__point__number_ranges, + (ProtobufCMessageInit) decoderbufs__point__init, + NULL,NULL,NULL /* reserved[123] */ +}; +static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[10] = { { "column_name", @@ -203,6 +297,18 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript 0, /* flags */ 0,NULL,NULL /* reserved1,reserved2, etc */ }, + { + "datum_point", + 10, + PROTOBUF_C_LABEL_OPTIONAL, + PROTOBUF_C_TYPE_MESSAGE, + 0, /* quantifier_offset */ + offsetof(Decoderbufs__DatumMessage, datum_point), + &decoderbufs__point__descriptor, + NULL, + 0, /* flags */ + 0,NULL,NULL /* reserved1,reserved2, etc */ + }, }; static const unsigned decoderbufs__datum_message__field_indices_by_name[] = { 0, /* field[0] = column_name */ @@ -213,12 +319,13 @@ static const unsigned decoderbufs__datum_message__field_indices_by_name[] = { 4, /* field[4] = datum_float */ 2, /* field[2] = datum_int32 */ 3, /* field[3] = datum_int64 */ + 9, /* field[9] = datum_point */ 7, /* field[7] = datum_string */ }; static const ProtobufCIntRange decoderbufs__datum_message__number_ranges[1 + 1] = { { 1, 0 }, - { 0, 9 } + { 0, 10 } }; const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor = { @@ -228,7 +335,7 @@ const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor = "Decoderbufs__DatumMessage", "decoderbufs", sizeof(Decoderbufs__DatumMessage), - 9, + 10, decoderbufs__datum_message__field_descriptors, decoderbufs__datum_message__field_indices_by_name, 1, decoderbufs__datum_message__number_ranges, diff --git a/src/proto/pg_logicaldec.pb-c.h b/src/proto/pg_logicaldec.pb-c.h index dfbbaf4..80b514d 100644 --- a/src/proto/pg_logicaldec.pb-c.h +++ b/src/proto/pg_logicaldec.pb-c.h @@ -1,8 +1,8 @@ /* Generated by the protocol buffer compiler. DO NOT EDIT! */ -/* Generated from: proto/pg_logicaldec.proto */ +/* Generated from: pg_logicaldec.proto */ -#ifndef PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED -#define PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED +#ifndef PROTOBUF_C_pg_5flogicaldec_2eproto__INCLUDED +#define PROTOBUF_C_pg_5flogicaldec_2eproto__INCLUDED #include @@ -10,11 +10,12 @@ PROTOBUF_C__BEGIN_DECLS #if PROTOBUF_C_VERSION_NUMBER < 1000000 # error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers. -#elif 1000001 < PROTOBUF_C_MIN_COMPILER_VERSION +#elif 1000002 < PROTOBUF_C_MIN_COMPILER_VERSION # error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c. #endif +typedef struct _Decoderbufs__Point Decoderbufs__Point; typedef struct _Decoderbufs__DatumMessage Decoderbufs__DatumMessage; typedef struct _Decoderbufs__RowMessage Decoderbufs__RowMessage; @@ -30,6 +31,17 @@ typedef enum _Decoderbufs__Op { /* --- messages --- */ +struct _Decoderbufs__Point +{ + ProtobufCMessage base; + double x; + double y; +}; +#define DECODERBUFS__POINT__INIT \ + { PROTOBUF_C_MESSAGE_INIT (&decoderbufs__point__descriptor) \ + , 0, 0 } + + struct _Decoderbufs__DatumMessage { ProtobufCMessage base; @@ -49,10 +61,11 @@ struct _Decoderbufs__DatumMessage char *datum_string; protobuf_c_boolean has_datum_bytes; ProtobufCBinaryData datum_bytes; + Decoderbufs__Point *datum_point; }; #define DECODERBUFS__DATUM_MESSAGE__INIT \ { PROTOBUF_C_MESSAGE_INIT (&decoderbufs__datum_message__descriptor) \ - , NULL, 0,0, 0,0, 0,0, 0,0, 0,0, 0,0, NULL, 0,{0,NULL} } + , NULL, 0,0, 0,0, 0,0, 0,0, 0,0, 0,0, NULL, 0,{0,NULL}, NULL } struct _Decoderbufs__RowMessage @@ -73,6 +86,25 @@ struct _Decoderbufs__RowMessage , 0,0, NULL, 0,0, 0,NULL, 0,NULL } +/* Decoderbufs__Point methods */ +void decoderbufs__point__init + (Decoderbufs__Point *message); +size_t decoderbufs__point__get_packed_size + (const Decoderbufs__Point *message); +size_t decoderbufs__point__pack + (const Decoderbufs__Point *message, + uint8_t *out); +size_t decoderbufs__point__pack_to_buffer + (const Decoderbufs__Point *message, + ProtobufCBuffer *buffer); +Decoderbufs__Point * + decoderbufs__point__unpack + (ProtobufCAllocator *allocator, + size_t len, + const uint8_t *data); +void decoderbufs__point__free_unpacked + (Decoderbufs__Point *message, + ProtobufCAllocator *allocator); /* Decoderbufs__DatumMessage methods */ void decoderbufs__datum_message__init (Decoderbufs__DatumMessage *message); @@ -113,6 +145,9 @@ void decoderbufs__row_message__free_unpacked ProtobufCAllocator *allocator); /* --- per-message closures --- */ +typedef void (*Decoderbufs__Point_Closure) + (const Decoderbufs__Point *message, + void *closure_data); typedef void (*Decoderbufs__DatumMessage_Closure) (const Decoderbufs__DatumMessage *message, void *closure_data); @@ -126,10 +161,11 @@ typedef void (*Decoderbufs__RowMessage_Closure) /* --- descriptors --- */ extern const ProtobufCEnumDescriptor decoderbufs__op__descriptor; +extern const ProtobufCMessageDescriptor decoderbufs__point__descriptor; extern const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor; extern const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor; PROTOBUF_C__END_DECLS -#endif /* PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED */ +#endif /* PROTOBUF_C_pg_5flogicaldec_2eproto__INCLUDED */