From 5b7a88438e49a1179e67dacef492d9f86e33d44b Mon Sep 17 00:00:00 2001 From: Xavier Stevens Date: Wed, 11 Feb 2015 11:48:42 -0800 Subject: [PATCH] Fixing type lookups for PostGIS types, some memory allocation, etc. --- .gitignore | 3 ++ README.md | 35 ++++++++++++++-- src/decoderbufs.c | 76 ++++++++++++---------------------- src/proto/pg_logicaldec.pb-c.h | 8 ++-- 4 files changed, 65 insertions(+), 57 deletions(-) diff --git a/.gitignore b/.gitignore index 496f7d7..f20cb14 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,6 @@ .cproject .project .settings + +# Xcode +*.xcodeproj \ No newline at end of file diff --git a/README.md b/README.md index 4300192..565ffb6 100644 --- a/README.md +++ b/README.md @@ -22,12 +22,13 @@ project and blog posts as a guide to teach myself how to write a PostgreSQL logi This code is built with the following assumptions. You may get mixed results if you deviate from these versions. * [PostgreSQL](http://www.postgresql.org) 9.4+ -* [Protocol Buffers](https://developers.google.com/protocol-buffers) 2.5.0 -* [protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.0.2 +* [Protocol Buffers](https://developers.google.com/protocol-buffers) 2.6.1 +* [protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.1.0 * [PostGIS](http://postgis.net) 2.1.x ### Requirements * PostgreSQL +* PostGIS * Protocol Buffers * protobuf-c @@ -86,7 +87,35 @@ The binary format uses simple frame encoding, which uses an 8-byte length (uint6 pg_recvlogical -h localhost -d -U -w -S decoderbufs_demo -P decoderbufs -f decoderbuf.frames -s 1 -F 1 --start For something a bit more useful, I am looking to implement a custom PostgreSQL logical replication client that publishes to something like Apache Kafka. - + +### Type Mappings + +The following table shows how current PostgreSQL type OIDs are mapped to which decoderbuf fields: + +| PostgreSQL Type OID | Decoderbuf Field | +|---------------------|---------------| +| BOOLOID | datum_boolean | +| INT2OID | datum_int32 | +| INT4OID | datum_int32 | +| INT8OID | datum_int64 | +| OIDOID | datum_int64 | +| FLOAT4OID | datum_float | +| FLOAT8OID | datum_double | +| NUMERICOID | datum_double | +| CHAROID | datum_string | +| VARCHAROID | datum_string | +| BPCHAROID | datum_string | +| TEXTOID | datum_string | +| JSONOID | datum_string | +| XMLOID | datum_string | +| UUIDOID | datum_string | +| TIMESTAMPOID | datum_string | +| TIMESTAMPTZOID | datum_string | +| BYTEAOID | datum_bytes | +| POINTOID | datum_point | +| PostGIS geometry | datum_point | +| PostGIS geography | datum_point | + ### Support File bug reports, feature requests and questions using diff --git a/src/decoderbufs.c b/src/decoderbufs.c index 198f5c4..39298af 100644 --- a/src/decoderbufs.c +++ b/src/decoderbufs.c @@ -36,6 +36,7 @@ #include "funcapi.h" #include "catalog/pg_class.h" #include "catalog/pg_type.h" +#include "catalog/namespace.h" #include "executor/spi.h" #include "replication/output_plugin.h" #include "replication/logical.h" @@ -73,9 +74,10 @@ typedef struct { MemoryContext context; bool debug_mode; } DecoderData; + /* GLOBALs for PostGIS dynamic OIDs */ -int geometry_oid = -1; -int geography_oid = -1; +Oid geometry_oid = InvalidOid; +Oid geography_oid = InvalidOid; /* these must be available to pg_dlsym() */ extern void _PG_init(void); @@ -145,45 +147,6 @@ static void pg_decode_startup(LogicalDecodingContext *ctx, } } - // set PostGIS geometry type id (these are dynamic unfortunately) - char *geom_oid_str = NULL; - char *geog_oid_str = NULL; - elog(INFO, "SPI_connect() ... "); - if (SPI_connect() == SPI_ERROR_CONNECT) { - elog(WARNING, "Could not connect to SPI manager to scan for PostGIS types"); - SPI_finish(); - return; - } - elog(INFO, "SPI_execute(\"SELECT oid FROM pg_type WHERE typname = 'geometry'\") ... "); - if (SPI_OK_SELECT == - SPI_execute("SELECT oid FROM pg_type WHERE typname = 'geometry'", - true, 1) && - SPI_processed > 0) { - geom_oid_str = - SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1); - if (geom_oid_str != NULL) { - elog(NOTICE, "Decoderbufs detected PostGIS geometry type with oid: %s", geom_oid_str); - geometry_oid = atoi(geom_oid_str); - } - } else { - elog(WARNING, "No type oid detected for PostGIS geometry"); - } - elog(INFO, "SPI_execute(\"SELECT oid FROM pg_type WHERE typname = 'geography'\") ... "); - if (SPI_OK_SELECT == - SPI_execute("SELECT oid FROM pg_type WHERE typname = 'geography'", - true, 1) && - SPI_processed > 0) { - geog_oid_str = - SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1); - if (geog_oid_str != NULL) { - elog(NOTICE, "Decoderbufs detected PostGIS geography type with oid: %s", geog_oid_str); - geography_oid = atoi(geog_oid_str); - } - } else { - elog(WARNING, "No type oid detected for PostGIS geography"); - } - SPI_finish(); - ctx->output_plugin_private = data; elog(INFO, "Exiting startup callback"); @@ -199,7 +162,22 @@ static void pg_decode_shutdown(LogicalDecodingContext *ctx) { /* BEGIN callback */ static void pg_decode_begin_txn(LogicalDecodingContext *ctx, - ReorderBufferTXN *txn) {} + ReorderBufferTXN *txn) { + // set PostGIS geometry type id (these are dynamic) + // TODO: Figure out how to make sure we get the typid's from postgis extension namespace + if (geometry_oid == InvalidOid) { + geometry_oid = TypenameGetTypid("geometry"); + if (geometry_oid != InvalidOid) { + elog(DEBUG1, "PostGIS geometry type detected: %u", geometry_oid); + } + } + if (geography_oid == InvalidOid) { + geography_oid = TypenameGetTypid("geography"); + if (geography_oid != InvalidOid) { + elog(DEBUG1, "PostGIS geometry type detected: %u", geography_oid); + } + } +} /* COMMIT callback */ static void pg_decode_commit_txn(LogicalDecodingContext *ctx, @@ -439,8 +417,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid, datum_msg->datum_point = palloc(sizeof(Decoderbufs__Point)); geography_point_as_decoderbufs_point(datum, datum_msg->datum_point); } else { - elog(DEBUG1, "Encountered unknown typid: %d", typid); - elog(DEBUG1, "PostGIS Geometry OID[%d], Geography OID[%d]", geometry_oid, geography_oid); + elog(WARNING, "Encountered unknown typid: %d, using bytes", typid); output = OidOutputFunctionCall(typoutput, datum); int len = strlen(output); size = sizeof(char) * len; @@ -475,8 +452,7 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg, Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT; /* set the column name */ - const char *col_name = quote_identifier(NameStr(attr->attname)); - datum_msg.column_name = col_name; + datum_msg.column_name = quote_identifier(NameStr(attr->attname)); /* set datum from tuple */ origval = fastgetattr(tuple, natt + 1, tupdesc, &isnull); @@ -542,7 +518,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleDesc tupdesc = RelationGetDescr(relation); rmsg.n_new_tuple = tupdesc->natts; rmsg.new_tuple = - palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); + palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts); tuple_to_tuple_msg(rmsg.new_tuple, relation, &change->data.tp.newtuple->tuple, tupdesc); } @@ -555,7 +531,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleDesc tupdesc = RelationGetDescr(relation); rmsg.n_old_tuple = tupdesc->natts; rmsg.old_tuple = - palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); + palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts); tuple_to_tuple_msg(rmsg.old_tuple, relation, &change->data.tp.oldtuple->tuple, tupdesc); } @@ -563,7 +539,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleDesc tupdesc = RelationGetDescr(relation); rmsg.n_new_tuple = tupdesc->natts; rmsg.new_tuple = - palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); + palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts); tuple_to_tuple_msg(rmsg.new_tuple, relation, &change->data.tp.newtuple->tuple, tupdesc); } @@ -577,7 +553,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn, TupleDesc tupdesc = RelationGetDescr(relation); rmsg.n_old_tuple = tupdesc->natts; rmsg.old_tuple = - palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); + palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts); tuple_to_tuple_msg(rmsg.old_tuple, relation, &change->data.tp.oldtuple->tuple, tupdesc); } diff --git a/src/proto/pg_logicaldec.pb-c.h b/src/proto/pg_logicaldec.pb-c.h index 80b514d..b7fec87 100644 --- a/src/proto/pg_logicaldec.pb-c.h +++ b/src/proto/pg_logicaldec.pb-c.h @@ -1,8 +1,8 @@ /* Generated by the protocol buffer compiler. DO NOT EDIT! */ /* Generated from: pg_logicaldec.proto */ -#ifndef PROTOBUF_C_pg_5flogicaldec_2eproto__INCLUDED -#define PROTOBUF_C_pg_5flogicaldec_2eproto__INCLUDED +#ifndef PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED +#define PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED #include @@ -10,7 +10,7 @@ PROTOBUF_C__BEGIN_DECLS #if PROTOBUF_C_VERSION_NUMBER < 1000000 # error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers. -#elif 1000002 < PROTOBUF_C_MIN_COMPILER_VERSION +#elif 1001000 < PROTOBUF_C_MIN_COMPILER_VERSION # error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c. #endif @@ -168,4 +168,4 @@ extern const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor; PROTOBUF_C__END_DECLS -#endif /* PROTOBUF_C_pg_5flogicaldec_2eproto__INCLUDED */ +#endif /* PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED */