Fixing type lookups for PostGIS types, some memory allocation, etc.

pull/1/head
Xavier Stevens 2015-02-11 11:48:42 -08:00
parent 744b29fa7d
commit 5b7a88438e
4 changed files with 65 additions and 57 deletions

3
.gitignore vendored
View File

@ -26,3 +26,6 @@
.cproject .cproject
.project .project
.settings .settings
# Xcode
*.xcodeproj

View File

@ -22,12 +22,13 @@ project and blog posts as a guide to teach myself how to write a PostgreSQL logi
This code is built with the following assumptions. You may get mixed results if you deviate from these versions. This code is built with the following assumptions. You may get mixed results if you deviate from these versions.
* [PostgreSQL](http://www.postgresql.org) 9.4+ * [PostgreSQL](http://www.postgresql.org) 9.4+
* [Protocol Buffers](https://developers.google.com/protocol-buffers) 2.5.0 * [Protocol Buffers](https://developers.google.com/protocol-buffers) 2.6.1
* [protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.0.2 * [protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.1.0
* [PostGIS](http://postgis.net) 2.1.x * [PostGIS](http://postgis.net) 2.1.x
### Requirements ### Requirements
* PostgreSQL * PostgreSQL
* PostGIS
* Protocol Buffers * Protocol Buffers
* protobuf-c * protobuf-c
@ -86,7 +87,35 @@ The binary format uses simple frame encoding, which uses an 8-byte length (uint6
pg_recvlogical -h localhost -d <yourdb> -U <youruser> -w -S decoderbufs_demo -P decoderbufs -f decoderbuf.frames -s 1 -F 1 --start pg_recvlogical -h localhost -d <yourdb> -U <youruser> -w -S decoderbufs_demo -P decoderbufs -f decoderbuf.frames -s 1 -F 1 --start
For something a bit more useful, I am looking to implement a custom PostgreSQL logical replication client that publishes to something like Apache Kafka. For something a bit more useful, I am looking to implement a custom PostgreSQL logical replication client that publishes to something like Apache Kafka.
### Type Mappings
The following table shows how current PostgreSQL type OIDs are mapped to which decoderbuf fields:
| PostgreSQL Type OID | Decoderbuf Field |
|---------------------|---------------|
| BOOLOID | datum_boolean |
| INT2OID | datum_int32 |
| INT4OID | datum_int32 |
| INT8OID | datum_int64 |
| OIDOID | datum_int64 |
| FLOAT4OID | datum_float |
| FLOAT8OID | datum_double |
| NUMERICOID | datum_double |
| CHAROID | datum_string |
| VARCHAROID | datum_string |
| BPCHAROID | datum_string |
| TEXTOID | datum_string |
| JSONOID | datum_string |
| XMLOID | datum_string |
| UUIDOID | datum_string |
| TIMESTAMPOID | datum_string |
| TIMESTAMPTZOID | datum_string |
| BYTEAOID | datum_bytes |
| POINTOID | datum_point |
| PostGIS geometry | datum_point |
| PostGIS geography | datum_point |
### Support ### Support
File bug reports, feature requests and questions using File bug reports, feature requests and questions using

View File

@ -36,6 +36,7 @@
#include "funcapi.h" #include "funcapi.h"
#include "catalog/pg_class.h" #include "catalog/pg_class.h"
#include "catalog/pg_type.h" #include "catalog/pg_type.h"
#include "catalog/namespace.h"
#include "executor/spi.h" #include "executor/spi.h"
#include "replication/output_plugin.h" #include "replication/output_plugin.h"
#include "replication/logical.h" #include "replication/logical.h"
@ -73,9 +74,10 @@ typedef struct {
MemoryContext context; MemoryContext context;
bool debug_mode; bool debug_mode;
} DecoderData; } DecoderData;
/* GLOBALs for PostGIS dynamic OIDs */ /* GLOBALs for PostGIS dynamic OIDs */
int geometry_oid = -1; Oid geometry_oid = InvalidOid;
int geography_oid = -1; Oid geography_oid = InvalidOid;
/* these must be available to pg_dlsym() */ /* these must be available to pg_dlsym() */
extern void _PG_init(void); extern void _PG_init(void);
@ -145,45 +147,6 @@ static void pg_decode_startup(LogicalDecodingContext *ctx,
} }
} }
// set PostGIS geometry type id (these are dynamic unfortunately)
char *geom_oid_str = NULL;
char *geog_oid_str = NULL;
elog(INFO, "SPI_connect() ... ");
if (SPI_connect() == SPI_ERROR_CONNECT) {
elog(WARNING, "Could not connect to SPI manager to scan for PostGIS types");
SPI_finish();
return;
}
elog(INFO, "SPI_execute(\"SELECT oid FROM pg_type WHERE typname = 'geometry'\") ... ");
if (SPI_OK_SELECT ==
SPI_execute("SELECT oid FROM pg_type WHERE typname = 'geometry'",
true, 1) &&
SPI_processed > 0) {
geom_oid_str =
SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1);
if (geom_oid_str != NULL) {
elog(NOTICE, "Decoderbufs detected PostGIS geometry type with oid: %s", geom_oid_str);
geometry_oid = atoi(geom_oid_str);
}
} else {
elog(WARNING, "No type oid detected for PostGIS geometry");
}
elog(INFO, "SPI_execute(\"SELECT oid FROM pg_type WHERE typname = 'geography'\") ... ");
if (SPI_OK_SELECT ==
SPI_execute("SELECT oid FROM pg_type WHERE typname = 'geography'",
true, 1) &&
SPI_processed > 0) {
geog_oid_str =
SPI_getvalue(SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1);
if (geog_oid_str != NULL) {
elog(NOTICE, "Decoderbufs detected PostGIS geography type with oid: %s", geog_oid_str);
geography_oid = atoi(geog_oid_str);
}
} else {
elog(WARNING, "No type oid detected for PostGIS geography");
}
SPI_finish();
ctx->output_plugin_private = data; ctx->output_plugin_private = data;
elog(INFO, "Exiting startup callback"); elog(INFO, "Exiting startup callback");
@ -199,7 +162,22 @@ static void pg_decode_shutdown(LogicalDecodingContext *ctx) {
/* BEGIN callback */ /* BEGIN callback */
static void pg_decode_begin_txn(LogicalDecodingContext *ctx, static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn) {} ReorderBufferTXN *txn) {
// set PostGIS geometry type id (these are dynamic)
// TODO: Figure out how to make sure we get the typid's from postgis extension namespace
if (geometry_oid == InvalidOid) {
geometry_oid = TypenameGetTypid("geometry");
if (geometry_oid != InvalidOid) {
elog(DEBUG1, "PostGIS geometry type detected: %u", geometry_oid);
}
}
if (geography_oid == InvalidOid) {
geography_oid = TypenameGetTypid("geography");
if (geography_oid != InvalidOid) {
elog(DEBUG1, "PostGIS geometry type detected: %u", geography_oid);
}
}
}
/* COMMIT callback */ /* COMMIT callback */
static void pg_decode_commit_txn(LogicalDecodingContext *ctx, static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
@ -439,8 +417,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
datum_msg->datum_point = palloc(sizeof(Decoderbufs__Point)); datum_msg->datum_point = palloc(sizeof(Decoderbufs__Point));
geography_point_as_decoderbufs_point(datum, datum_msg->datum_point); geography_point_as_decoderbufs_point(datum, datum_msg->datum_point);
} else { } else {
elog(DEBUG1, "Encountered unknown typid: %d", typid); elog(WARNING, "Encountered unknown typid: %d, using bytes", typid);
elog(DEBUG1, "PostGIS Geometry OID[%d], Geography OID[%d]", geometry_oid, geography_oid);
output = OidOutputFunctionCall(typoutput, datum); output = OidOutputFunctionCall(typoutput, datum);
int len = strlen(output); int len = strlen(output);
size = sizeof(char) * len; size = sizeof(char) * len;
@ -475,8 +452,7 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT; Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT;
/* set the column name */ /* set the column name */
const char *col_name = quote_identifier(NameStr(attr->attname)); datum_msg.column_name = quote_identifier(NameStr(attr->attname));
datum_msg.column_name = col_name;
/* set datum from tuple */ /* set datum from tuple */
origval = fastgetattr(tuple, natt + 1, tupdesc, &isnull); origval = fastgetattr(tuple, natt + 1, tupdesc, &isnull);
@ -542,7 +518,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TupleDesc tupdesc = RelationGetDescr(relation); TupleDesc tupdesc = RelationGetDescr(relation);
rmsg.n_new_tuple = tupdesc->natts; rmsg.n_new_tuple = tupdesc->natts;
rmsg.new_tuple = rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts);
tuple_to_tuple_msg(rmsg.new_tuple, relation, tuple_to_tuple_msg(rmsg.new_tuple, relation,
&change->data.tp.newtuple->tuple, tupdesc); &change->data.tp.newtuple->tuple, tupdesc);
} }
@ -555,7 +531,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TupleDesc tupdesc = RelationGetDescr(relation); TupleDesc tupdesc = RelationGetDescr(relation);
rmsg.n_old_tuple = tupdesc->natts; rmsg.n_old_tuple = tupdesc->natts;
rmsg.old_tuple = rmsg.old_tuple =
palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts);
tuple_to_tuple_msg(rmsg.old_tuple, relation, tuple_to_tuple_msg(rmsg.old_tuple, relation,
&change->data.tp.oldtuple->tuple, tupdesc); &change->data.tp.oldtuple->tuple, tupdesc);
} }
@ -563,7 +539,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TupleDesc tupdesc = RelationGetDescr(relation); TupleDesc tupdesc = RelationGetDescr(relation);
rmsg.n_new_tuple = tupdesc->natts; rmsg.n_new_tuple = tupdesc->natts;
rmsg.new_tuple = rmsg.new_tuple =
palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts);
tuple_to_tuple_msg(rmsg.new_tuple, relation, tuple_to_tuple_msg(rmsg.new_tuple, relation,
&change->data.tp.newtuple->tuple, tupdesc); &change->data.tp.newtuple->tuple, tupdesc);
} }
@ -577,7 +553,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
TupleDesc tupdesc = RelationGetDescr(relation); TupleDesc tupdesc = RelationGetDescr(relation);
rmsg.n_old_tuple = tupdesc->natts; rmsg.n_old_tuple = tupdesc->natts;
rmsg.old_tuple = rmsg.old_tuple =
palloc(sizeof(Decoderbufs__DatumMessage) * tupdesc->natts); palloc(sizeof(Decoderbufs__DatumMessage*) * tupdesc->natts);
tuple_to_tuple_msg(rmsg.old_tuple, relation, tuple_to_tuple_msg(rmsg.old_tuple, relation,
&change->data.tp.oldtuple->tuple, tupdesc); &change->data.tp.oldtuple->tuple, tupdesc);
} }

View File

@ -1,8 +1,8 @@
/* Generated by the protocol buffer compiler. DO NOT EDIT! */ /* Generated by the protocol buffer compiler. DO NOT EDIT! */
/* Generated from: pg_logicaldec.proto */ /* Generated from: pg_logicaldec.proto */
#ifndef PROTOBUF_C_pg_5flogicaldec_2eproto__INCLUDED #ifndef PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED
#define PROTOBUF_C_pg_5flogicaldec_2eproto__INCLUDED #define PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED
#include <protobuf-c/protobuf-c.h> #include <protobuf-c/protobuf-c.h>
@ -10,7 +10,7 @@ PROTOBUF_C__BEGIN_DECLS
#if PROTOBUF_C_VERSION_NUMBER < 1000000 #if PROTOBUF_C_VERSION_NUMBER < 1000000
# error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers. # error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers.
#elif 1000002 < PROTOBUF_C_MIN_COMPILER_VERSION #elif 1001000 < PROTOBUF_C_MIN_COMPILER_VERSION
# error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c. # error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c.
#endif #endif
@ -168,4 +168,4 @@ extern const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor;
PROTOBUF_C__END_DECLS PROTOBUF_C__END_DECLS
#endif /* PROTOBUF_C_pg_5flogicaldec_2eproto__INCLUDED */ #endif /* PROTOBUF_C_proto_2fpg_5flogicaldec_2eproto__INCLUDED */