Compare commits
25 Commits
Author | SHA1 | Date |
---|---|---|
|
84f30f3f44 | |
|
e1f87147b7 | |
|
e709f326cb | |
|
cd4171a030 | |
|
c9b00aa8c0 | |
|
7a8c8b8f46 | |
|
e29a2580a5 | |
|
870ecfa976 | |
|
01126bfa89 | |
|
7f1c6fefc3 | |
|
54e2c45f11 | |
|
3c910fff12 | |
|
44cf35d4b5 | |
|
0b536f372e | |
|
c1a5f51179 | |
|
c719d3fa6a | |
|
2a60a18ab1 | |
|
c5068d5b42 | |
|
105344ac58 | |
|
e2da727139 | |
|
0d4db2816b | |
|
d9477a5b99 | |
|
8baab62456 | |
|
e375003c3b | |
|
2f3179ef4e |
9
Makefile
9
Makefile
|
@ -4,16 +4,11 @@ EXTENSION = decoderbufs
|
||||||
PROTOBUF_C_CFLAGS = $(shell pkg-config --cflags 'libprotobuf-c >= 1.0.0')
|
PROTOBUF_C_CFLAGS = $(shell pkg-config --cflags 'libprotobuf-c >= 1.0.0')
|
||||||
PROTOBUF_C_LDFLAGS = $(shell pkg-config --libs 'libprotobuf-c >= 1.0.0')
|
PROTOBUF_C_LDFLAGS = $(shell pkg-config --libs 'libprotobuf-c >= 1.0.0')
|
||||||
|
|
||||||
ifneq ($(USE_POSTGIS),false)
|
|
||||||
C_PARAMS = -DUSE_POSTGIS
|
|
||||||
POSTGIS_C_LDFLAGS = -L/usr/local/lib -llwgeom
|
|
||||||
endif
|
|
||||||
|
|
||||||
PG_CPPFLAGS += -std=c11 $(PROTOBUF_C_CFLAGS) -I/usr/local/include $(C_PARAMS)
|
PG_CPPFLAGS += -std=c11 $(PROTOBUF_C_CFLAGS) -I/usr/local/include $(C_PARAMS)
|
||||||
SHLIB_LINK += $(PROTOBUF_C_LDFLAGS) $(POSTGIS_C_LDFLAGS)
|
SHLIB_LINK += $(PROTOBUF_C_LDFLAGS)
|
||||||
|
|
||||||
OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o
|
OBJS = src/decoderbufs.o src/proto/pg_logicaldec.pb-c.o
|
||||||
|
|
||||||
PG_CONFIG = pg_config
|
PG_CONFIG ?= pg_config
|
||||||
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
PGXS := $(shell $(PG_CONFIG) --pgxs)
|
||||||
include $(PGXS)
|
include $(PGXS)
|
122
README.md
122
README.md
|
@ -10,77 +10,125 @@
|
||||||
A PostgreSQL logical decoder output plugin to deliver data as [Protocol Buffers](https://developers.google.com/protocol-buffers), adapted for Debezium
|
A PostgreSQL logical decoder output plugin to deliver data as [Protocol Buffers](https://developers.google.com/protocol-buffers), adapted for Debezium
|
||||||
|
|
||||||
## Thanks to
|
## Thanks to
|
||||||
|
|
||||||
- The original [Decoderbufs Project](https://github.com/xstevens/decoderbufs) on which this is based
|
- The original [Decoderbufs Project](https://github.com/xstevens/decoderbufs) on which this is based
|
||||||
- [The PostgreSQL Team](https://postgresql.org) for adding [logical decoding](http://www.postgresql.org/docs/9.4/static/logicaldecoding.html) support
|
- [The PostgreSQL Team](https://postgresql.org) for adding [logical decoding](http://www.postgresql.org/docs/9.4/static/logicaldecoding.html) support
|
||||||
|
|
||||||
## Dependencies
|
## Dependencies
|
||||||
|
|
||||||
This code depends on the following libraries and requires them for compilation:
|
This code depends on the following libraries and requires them for compilation:
|
||||||
|
|
||||||
* [PostgreSQL](http://www.postgresql.org) 9.6+
|
- [PostgreSQL](http://www.postgresql.org) 9.6+
|
||||||
* [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.1+ - used for data serialization
|
- [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.2+ - used for data serialization
|
||||||
* [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support
|
- [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support
|
||||||
|
|
||||||
## Building
|
## Building
|
||||||
|
|
||||||
To build you will need to install PostgreSQL (for pg_config), PostgreSQL server development packages, protobuf-c for the
|
`postgres-decoderbufs` has to be built from source after installing required dependencies. The required dependencies are first PostgreSQL
|
||||||
Protocol Buffer support and some PostGIS development packages
|
(for pg_config), PostgreSQL server development packages, protobuf-c for the Protocol Buffer support and some PostGIS development packages.
|
||||||
|
|
||||||
### Debian
|
### Installing Dependencies
|
||||||
|
|
||||||
# Core build utilities
|
#### Debian
|
||||||
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6
|
|
||||||
|
|
||||||
# PostGIS dependency
|
```bash
|
||||||
apt-get install -f -y libproj-dev liblwgeom-dev
|
# Core build utilities
|
||||||
|
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6
|
||||||
|
|
||||||
# Protobuf-c dependency (requires a non-stable Debian repo)
|
# PostGIS dependency
|
||||||
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update
|
apt-get install -f -y libproj-dev liblwgeom-dev
|
||||||
apt-get install -y libprotobuf-c-dev=1.2.1-1+b1
|
|
||||||
|
|
||||||
The above are taken from the Debezium [docker images](https://github.com/debezium/docker-images).
|
# Protobuf-c dependency (requires a non-stable Debian repo)
|
||||||
|
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update
|
||||||
|
apt-get install -y libprotobuf-c-dev=1.2.1-1+b1
|
||||||
|
```
|
||||||
|
|
||||||
### Other Linux distributions
|
When updating the ProtoBuf definition, also install the ProtoBuf C compiler:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
apt-get install -y protobuf-c-compiler=1.2.*
|
||||||
|
```
|
||||||
|
|
||||||
|
The above are taken from the Debezium [container images](https://github.com/debezium/docker-images).
|
||||||
|
|
||||||
|
#### Other Linux distributions
|
||||||
|
|
||||||
You just need to make sure the above software packages (_or some flavour thereof_) are installed for your distro.
|
You just need to make sure the above software packages (_or some flavour thereof_) are installed for your distro.
|
||||||
Note that the last step from the above sequence is only required for Debian to be able to install `libprotobuf-c-dev:1.2.1`
|
Note that the last step from the above sequence is only required for Debian to be able to install `libprotobuf-c-dev:1.2.1`
|
||||||
|
|
||||||
If you have all of the prerequisites installed you should be able to just:
|
### Getting the source code
|
||||||
|
|
||||||
make && make install
|
If you have all of the above prerequisites installed, clone this git repo to build from source:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
git clone https://github.com/debezium/postgres-decoderbufs.git
|
||||||
|
cd postgres-decoderbufs
|
||||||
|
```
|
||||||
|
|
||||||
|
### Optional: Re-generating ProtoBuf code
|
||||||
|
|
||||||
|
This is only needed after changes to the ProtoBuf definition (_proto/pg_logicaldec.proto):
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd proto
|
||||||
|
protoc-c --c_out=../src/proto pg_logicaldec.proto
|
||||||
|
cd ..
|
||||||
|
```
|
||||||
|
|
||||||
|
Commit the generated files to git then.
|
||||||
|
|
||||||
|
### Building and installing decoderbufs
|
||||||
|
|
||||||
|
If you have multiple Postgres versions installed, you can select which version to install decoderbufs into by altering your `$PATH` to point to the right version.
|
||||||
|
Then `make` and `make install` for each version. Here is an example:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Install for Postgres 9.6 if I have multiple local versions
|
||||||
|
export PATH=/usr/lib/postgresql/9.6/bin:$PATH
|
||||||
|
make
|
||||||
|
make install
|
||||||
|
```
|
||||||
|
|
||||||
Once the extension has been installed you just need to enable it and logical replication in postgresql.conf:
|
Once the extension has been installed you just need to enable it and logical replication in postgresql.conf:
|
||||||
|
|
||||||
# MODULES
|
```bash
|
||||||
shared_preload_libraries = 'decoderbufs'
|
# MODULES
|
||||||
|
shared_preload_libraries = 'decoderbufs'
|
||||||
|
|
||||||
# REPLICATION
|
# REPLICATION
|
||||||
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
|
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
|
||||||
max_wal_senders = 8 # max number of walsender processes (change requires restart)
|
max_wal_senders = 8 # max number of walsender processes (change requires restart)
|
||||||
wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
|
wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
|
||||||
#wal_sender_timeout = 60s # in milliseconds; 0 disables
|
#wal_sender_timeout = 60s # in milliseconds; 0 disables
|
||||||
max_replication_slots = 4 # max number of replication slots (change requires restart)
|
max_replication_slots = 4 # max number of replication slots (change requires restart)
|
||||||
|
```
|
||||||
|
|
||||||
In addition, permissions will have to be added for the user that connects to the DB to be able to replicate. This can be modified in _pg\_hba.conf_ like so:
|
In addition, permissions will have to be added for the user that connects to the DB to be able to replicate. This can be modified in _pg\_hba.conf_ like so:
|
||||||
|
|
||||||
local replication <youruser> trust
|
```make
|
||||||
host replication <youruser> 127.0.0.1/32 trust
|
local replication <youruser> trust
|
||||||
host replication <youruser> ::1/128 trust
|
host replication <youruser> 127.0.0.1/32 trust
|
||||||
|
host replication <youruser> ::1/128 trust
|
||||||
|
```
|
||||||
|
|
||||||
And restart PostgreSQL.
|
And restart PostgreSQL.
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
-- can use SQL for demo purposes
|
|
||||||
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
|
|
||||||
|
|
||||||
-- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
|
```sql
|
||||||
|
-- can use SQL for demo purposes
|
||||||
|
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
|
||||||
|
|
||||||
-- peek at WAL changes using decoderbufs debug mode for SQL console
|
-- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
|
||||||
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
|
|
||||||
-- get WAL changes using decoderbufs to update the WAL position
|
|
||||||
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
|
|
||||||
|
|
||||||
-- check the WAL position of logical replicators
|
-- peek at WAL changes using decoderbufs debug mode for SQL console
|
||||||
select * from pg_replication_slots where slot_type = 'logical';
|
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
|
||||||
|
-- get WAL changes using decoderbufs to update the WAL position
|
||||||
|
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
|
||||||
|
|
||||||
|
-- check the WAL position of logical replicators
|
||||||
|
select * from pg_replication_slots where slot_type = 'logical';
|
||||||
|
```
|
||||||
|
|
||||||
If you're performing an UPDATE/DELETE on your table and you don't see results for those operations from logical decoding, make sure you have set [REPLICA IDENTITY](http://www.postgresql.org/docs/9.4/static/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY) appropriately for your use case.
|
If you're performing an UPDATE/DELETE on your table and you don't see results for those operations from logical decoding, make sure you have set [REPLICA IDENTITY](http://www.postgresql.org/docs/9.4/static/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY) appropriately for your use case.
|
||||||
|
|
||||||
|
|
|
@ -5,9 +5,12 @@ option java_outer_classname = "PgProto";
|
||||||
option optimize_for = SPEED;
|
option optimize_for = SPEED;
|
||||||
|
|
||||||
enum Op {
|
enum Op {
|
||||||
|
UNKNOWN = -1;
|
||||||
INSERT = 0;
|
INSERT = 0;
|
||||||
UPDATE = 1;
|
UPDATE = 1;
|
||||||
DELETE = 2;
|
DELETE = 2;
|
||||||
|
BEGIN = 3;
|
||||||
|
COMMIT = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message Point {
|
message Point {
|
||||||
|
@ -27,9 +30,15 @@ message DatumMessage {
|
||||||
string datum_string = 8;
|
string datum_string = 8;
|
||||||
bytes datum_bytes = 9;
|
bytes datum_bytes = 9;
|
||||||
Point datum_point = 10;
|
Point datum_point = 10;
|
||||||
|
bool datum_missing = 11;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message TypeInfo {
|
||||||
|
required string modifier = 1;
|
||||||
|
required bool value_optional = 2;
|
||||||
|
}
|
||||||
|
|
||||||
message RowMessage {
|
message RowMessage {
|
||||||
optional uint32 transaction_id = 1;
|
optional uint32 transaction_id = 1;
|
||||||
optional uint64 commit_time = 2;
|
optional uint64 commit_time = 2;
|
||||||
|
@ -37,4 +46,5 @@ message RowMessage {
|
||||||
optional Op op = 4;
|
optional Op op = 4;
|
||||||
repeated DatumMessage new_tuple = 5;
|
repeated DatumMessage new_tuple = 5;
|
||||||
repeated DatumMessage old_tuple = 6;
|
repeated DatumMessage old_tuple = 6;
|
||||||
|
repeated TypeInfo new_typeinfo = 7;
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
Name: postgres-decoderbufs
|
||||||
|
Version: 0.10.0
|
||||||
|
Release: 1%{?dist}
|
||||||
|
Summary: PostgreSQL Protocol Buffers logical decoder plugin
|
||||||
|
|
||||||
|
License: MIT
|
||||||
|
URL: https://github.com/debezium/postgres-decoderbufs
|
||||||
|
|
||||||
|
%global full_version %{version}.Final
|
||||||
|
|
||||||
|
Source0: https://github.com/debezium/%{name}/archive/v%{full_version}.tar.gz
|
||||||
|
|
||||||
|
BuildRequires: gcc
|
||||||
|
BuildRequires: postgresql-devel >= 9.6, postgresql-server-devel >= 9.6
|
||||||
|
BuildRequires: postgis-devel >= 2
|
||||||
|
BuildRequires: protobuf-c-devel
|
||||||
|
|
||||||
|
Requires: protobuf-c
|
||||||
|
%{?postgresql_module_requires}
|
||||||
|
|
||||||
|
Recommends: postgis
|
||||||
|
|
||||||
|
%description
|
||||||
|
A PostgreSQL logical decoder output plugin to deliver data as Protocol Buffers messages.
|
||||||
|
|
||||||
|
%prep
|
||||||
|
%setup -qn postgres-decoderbufs-%{full_version}
|
||||||
|
|
||||||
|
|
||||||
|
%build
|
||||||
|
%make_build
|
||||||
|
|
||||||
|
|
||||||
|
%install
|
||||||
|
%make_install
|
||||||
|
|
||||||
|
|
||||||
|
%files
|
||||||
|
%doc README.md
|
||||||
|
%license LICENSE
|
||||||
|
%{_libdir}/pgsql/decoderbufs.so
|
||||||
|
%{_datadir}/pgsql/extension/decoderbufs.control
|
||||||
|
|
||||||
|
|
||||||
|
%changelog
|
||||||
|
* Wed Oct 9 2019 - Jiri Pechanec <jpechane@redhat.com> 0.10.0-1
|
||||||
|
* Tue May 14 2019 - Jiri Pechanec <jpechane@redhat.com> 0.9.5-1
|
||||||
|
- Initial RPM packaging
|
|
@ -60,10 +60,10 @@
|
||||||
#error Expecting timestamps to be represented as integers, not as floating-point.
|
#error Expecting timestamps to be represented as integers, not as floating-point.
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#ifdef USE_POSTGIS
|
#if PG_VERSION_NUM >= 170000
|
||||||
/* POSTGIS version define so it doesn't redef macros */
|
#define TUPLE_ACCESS(x) x
|
||||||
#define POSTGIS_PGSQL_VERSION 94
|
#else
|
||||||
#include "liblwgeom.h"
|
#define TUPLE_ACCESS(x) &x->tuple
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
PG_MODULE_MAGIC;
|
PG_MODULE_MAGIC;
|
||||||
|
@ -80,10 +80,6 @@ typedef struct {
|
||||||
bool debug_mode;
|
bool debug_mode;
|
||||||
} DecoderData;
|
} DecoderData;
|
||||||
|
|
||||||
/* GLOBALs for PostGIS dynamic OIDs */
|
|
||||||
Oid geometry_oid = InvalidOid;
|
|
||||||
Oid geography_oid = InvalidOid;
|
|
||||||
|
|
||||||
/* these must be available to pg_dlsym() */
|
/* these must be available to pg_dlsym() */
|
||||||
extern void _PG_init(void);
|
extern void _PG_init(void);
|
||||||
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
|
extern void _PG_output_plugin_init(OutputPluginCallbacks *cb);
|
||||||
|
@ -119,9 +115,14 @@ static void pg_decode_startup(LogicalDecodingContext *ctx,
|
||||||
elog(DEBUG1, "Entering startup callback");
|
elog(DEBUG1, "Entering startup callback");
|
||||||
|
|
||||||
data = palloc(sizeof(DecoderData));
|
data = palloc(sizeof(DecoderData));
|
||||||
|
#if PG_VERSION_NUM >= 90600
|
||||||
|
data->context = AllocSetContextCreate(
|
||||||
|
ctx->context, "decoderbufs context", ALLOCSET_DEFAULT_SIZES);
|
||||||
|
#else
|
||||||
data->context = AllocSetContextCreate(
|
data->context = AllocSetContextCreate(
|
||||||
ctx->context, "decoderbufs context", ALLOCSET_DEFAULT_MINSIZE,
|
ctx->context, "decoderbufs context", ALLOCSET_DEFAULT_MINSIZE,
|
||||||
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
|
ALLOCSET_DEFAULT_INITSIZE, ALLOCSET_DEFAULT_MAXSIZE);
|
||||||
|
#endif
|
||||||
data->debug_mode = false;
|
data->debug_mode = false;
|
||||||
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
|
opt->output_type = OUTPUT_PLUGIN_BINARY_OUTPUT;
|
||||||
|
|
||||||
|
@ -169,102 +170,6 @@ static void pg_decode_shutdown(LogicalDecodingContext *ctx) {
|
||||||
MemoryContextDelete(data->context);
|
MemoryContextDelete(data->context);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* BEGIN callback */
|
|
||||||
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
|
|
||||||
ReorderBufferTXN *txn) {
|
|
||||||
#ifdef USE_POSTGIS
|
|
||||||
// set PostGIS geometry type id (these are dynamic)
|
|
||||||
// TODO: Figure out how to make sure we get the typid's from postgis extension namespace
|
|
||||||
if (geometry_oid == InvalidOid) {
|
|
||||||
geometry_oid = TypenameGetTypid("geometry");
|
|
||||||
if (geometry_oid != InvalidOid) {
|
|
||||||
elog(DEBUG1, "PostGIS geometry type detected: %u", geometry_oid);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (geography_oid == InvalidOid) {
|
|
||||||
geography_oid = TypenameGetTypid("geography");
|
|
||||||
if (geography_oid != InvalidOid) {
|
|
||||||
elog(DEBUG1, "PostGIS geometry type detected: %u", geography_oid);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
/* COMMIT callback */
|
|
||||||
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
|
|
||||||
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
|
|
||||||
}
|
|
||||||
|
|
||||||
/* convenience method to free up sub-messages */
|
|
||||||
static void row_message_destroy(Decoderbufs__RowMessage *msg) {
|
|
||||||
if (!msg) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (msg->table) {
|
|
||||||
pfree(msg->table);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (msg->n_new_tuple > 0) {
|
|
||||||
for (int i = 0; i < msg->n_new_tuple; i++) {
|
|
||||||
if (msg->new_tuple[i]) {
|
|
||||||
switch (msg->new_tuple[i]->datum_case) {
|
|
||||||
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING:
|
|
||||||
if (msg->new_tuple[i]->datum_string) {
|
|
||||||
pfree(msg->new_tuple[i]->datum_string);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES:
|
|
||||||
if (msg->new_tuple[i]->datum_bytes.data) {
|
|
||||||
pfree(msg->new_tuple[i]->datum_bytes.data);
|
|
||||||
msg->new_tuple[i]->datum_bytes.data = NULL;
|
|
||||||
msg->new_tuple[i]->datum_bytes.len = 0;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT:
|
|
||||||
if (msg->new_tuple[i]->datum_point) {
|
|
||||||
pfree(msg->new_tuple[i]->datum_point);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
pfree(msg->new_tuple[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pfree(msg->new_tuple);
|
|
||||||
}
|
|
||||||
if (msg->n_old_tuple > 0) {
|
|
||||||
for (int i = 0; i < msg->n_old_tuple; i++) {
|
|
||||||
if (msg->old_tuple[i]) {
|
|
||||||
switch (msg->old_tuple[i]->datum_case) {
|
|
||||||
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING:
|
|
||||||
if (msg->old_tuple[i]->datum_string) {
|
|
||||||
pfree(msg->old_tuple[i]->datum_string);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES:
|
|
||||||
if (msg->old_tuple[i]->datum_bytes.data) {
|
|
||||||
pfree(msg->old_tuple[i]->datum_bytes.data);
|
|
||||||
msg->old_tuple[i]->datum_bytes.data = NULL;
|
|
||||||
msg->old_tuple[i]->datum_bytes.len = 0;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT:
|
|
||||||
if (msg->old_tuple[i]->datum_point) {
|
|
||||||
pfree(msg->old_tuple[i]->datum_point);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
pfree(msg->old_tuple[i]);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pfree(msg->old_tuple);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/* print tuple datums (only used for debug-mode) */
|
/* print tuple datums (only used for debug-mode) */
|
||||||
static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup,
|
static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup,
|
||||||
size_t n) {
|
size_t n) {
|
||||||
|
@ -345,76 +250,14 @@ static void print_row_msg(StringInfo out, Decoderbufs__RowMessage *rmsg) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/* this doesn't seem to be available in the public api (unfortunate) */
|
|
||||||
static double numeric_to_double_no_overflow(Numeric num) {
|
|
||||||
char *tmp;
|
|
||||||
double val;
|
|
||||||
char *endptr;
|
|
||||||
|
|
||||||
tmp = DatumGetCString(DirectFunctionCall1(numeric_out, NumericGetDatum(num)));
|
|
||||||
|
|
||||||
/* unlike float8in, we ignore ERANGE from strtod */
|
|
||||||
val = strtod(tmp, &endptr);
|
|
||||||
if (*endptr != '\0') {
|
|
||||||
/* shouldn't happen ... */
|
|
||||||
ereport(ERROR,
|
|
||||||
(errcode(ERRCODE_INVALID_TEXT_REPRESENTATION),
|
|
||||||
errmsg("invalid input syntax for type double precision: \"%s\"",
|
|
||||||
tmp)));
|
|
||||||
}
|
|
||||||
|
|
||||||
pfree(tmp);
|
|
||||||
|
|
||||||
return val;
|
|
||||||
}
|
|
||||||
|
|
||||||
static bool geography_point_as_decoderbufs_point(Datum datum,
|
|
||||||
Decoderbufs__Point *p) {
|
|
||||||
#ifdef USE_POSTGIS
|
|
||||||
GSERIALIZED *geom;
|
|
||||||
LWGEOM *lwgeom;
|
|
||||||
LWPOINT *point = NULL;
|
|
||||||
POINT2D p2d;
|
|
||||||
|
|
||||||
geom = (GSERIALIZED *)PG_DETOAST_DATUM(datum);
|
|
||||||
if (gserialized_get_type(geom) != POINTTYPE) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
lwgeom = lwgeom_from_gserialized(geom);
|
|
||||||
point = lwgeom_as_lwpoint(lwgeom);
|
|
||||||
if (lwgeom_is_empty(lwgeom)) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
getPoint2d_p(point->point, 0, &p2d);
|
|
||||||
|
|
||||||
if (p != NULL) {
|
|
||||||
Decoderbufs__Point dp = DECODERBUFS__POINT__INIT;
|
|
||||||
dp.x = p2d.x;
|
|
||||||
dp.y = p2d.y;
|
|
||||||
memcpy(p, &dp, sizeof(dp));
|
|
||||||
elog(DEBUG1, "Translating geography to point: (x,y) = (%f,%f)", p->x, p->y);
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
#else
|
|
||||||
elog(DEBUG1, "PostGIS support is off, recompile decoderbufs with USE_POSTGIS option!");
|
|
||||||
return false;
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
/* set a datum value based on its OID specified by typid */
|
/* set a datum value based on its OID specified by typid */
|
||||||
static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
||||||
Oid typoutput, Datum datum) {
|
Oid typoutput, Datum datum) {
|
||||||
Numeric num;
|
|
||||||
bytea *valptr = NULL;
|
bytea *valptr = NULL;
|
||||||
const char *output = NULL;
|
const char *output = NULL;
|
||||||
Point *p = NULL;
|
Point *p = NULL;
|
||||||
Timestamp ts;
|
Timestamp ts;
|
||||||
double duration;
|
|
||||||
TimeTzADT *timetz = NULL;
|
TimeTzADT *timetz = NULL;
|
||||||
Interval *interval = NULL;
|
|
||||||
Decoderbufs__Point dp = DECODERBUFS__POINT__INIT;
|
Decoderbufs__Point dp = DECODERBUFS__POINT__INIT;
|
||||||
|
|
||||||
int size = 0;
|
int size = 0;
|
||||||
|
@ -432,10 +275,13 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32;
|
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32;
|
||||||
break;
|
break;
|
||||||
case INT8OID:
|
case INT8OID:
|
||||||
case OIDOID:
|
|
||||||
datum_msg->datum_int64 = DatumGetInt64(datum);
|
datum_msg->datum_int64 = DatumGetInt64(datum);
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
||||||
break;
|
break;
|
||||||
|
case OIDOID:
|
||||||
|
datum_msg->datum_int64 = (Oid) DatumGetUInt64(datum);
|
||||||
|
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
||||||
|
break;
|
||||||
case FLOAT4OID:
|
case FLOAT4OID:
|
||||||
datum_msg->datum_float = DatumGetFloat4(datum);
|
datum_msg->datum_float = DatumGetFloat4(datum);
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_FLOAT;
|
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_FLOAT;
|
||||||
|
@ -444,17 +290,11 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
||||||
datum_msg->datum_double = DatumGetFloat8(datum);
|
datum_msg->datum_double = DatumGetFloat8(datum);
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
|
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
|
||||||
break;
|
break;
|
||||||
case NUMERICOID:
|
|
||||||
num = DatumGetNumeric(datum);
|
|
||||||
if (!numeric_is_nan(num)) {
|
|
||||||
datum_msg->datum_double = numeric_to_double_no_overflow(num);
|
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case CASHOID:
|
case CASHOID:
|
||||||
datum_msg->datum_int64 = DatumGetCash(datum);
|
datum_msg->datum_int64 = DatumGetCash(datum);
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
||||||
break;
|
break;
|
||||||
|
case NUMERICOID:
|
||||||
case CHAROID:
|
case CHAROID:
|
||||||
case VARCHAROID:
|
case VARCHAROID:
|
||||||
case BPCHAROID:
|
case BPCHAROID:
|
||||||
|
@ -465,6 +305,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
||||||
case BITOID:
|
case BITOID:
|
||||||
case VARBITOID:
|
case VARBITOID:
|
||||||
case UUIDOID:
|
case UUIDOID:
|
||||||
|
case INTERVALOID:
|
||||||
output = OidOutputFunctionCall(typoutput, datum);
|
output = OidOutputFunctionCall(typoutput, datum);
|
||||||
datum_msg->datum_string = pnstrdup(output, strlen(output));
|
datum_msg->datum_string = pnstrdup(output, strlen(output));
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING;
|
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING;
|
||||||
|
@ -472,23 +313,21 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
||||||
case TIMESTAMPOID:
|
case TIMESTAMPOID:
|
||||||
ts = DatumGetTimestamp(datum);
|
ts = DatumGetTimestamp(datum);
|
||||||
if (TIMESTAMP_NOT_FINITE(ts)) {
|
if (TIMESTAMP_NOT_FINITE(ts)) {
|
||||||
ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
|
datum_msg->datum_int64 = ts;
|
||||||
errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
|
|
||||||
} else {
|
} else {
|
||||||
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
|
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
||||||
|
break;
|
||||||
case TIMESTAMPTZOID:
|
case TIMESTAMPTZOID:
|
||||||
ts = DatumGetTimestampTz(datum);
|
ts = DatumGetTimestampTz(datum);
|
||||||
if (TIMESTAMP_NOT_FINITE(ts)) {
|
if (TIMESTAMP_NOT_FINITE(ts)) {
|
||||||
ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
|
datum_msg->datum_int64 = ts;
|
||||||
errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
|
} else {
|
||||||
} else {
|
|
||||||
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
|
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
}
|
||||||
break;
|
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
||||||
}
|
break;
|
||||||
case DATEOID:
|
case DATEOID:
|
||||||
/* simply get the number of days as the stored 32 bit value and convert to EPOCH */
|
/* simply get the number of days as the stored 32 bit value and convert to EPOCH */
|
||||||
datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum));
|
datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum));
|
||||||
|
@ -504,15 +343,6 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
||||||
datum_msg->datum_double = (double) (timetz->time + (timetz->zone * 1000000.0));
|
datum_msg->datum_double = (double) (timetz->time + (timetz->zone * 1000000.0));
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
|
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
|
||||||
break;
|
break;
|
||||||
case INTERVALOID:
|
|
||||||
interval = DatumGetIntervalP(datum);
|
|
||||||
/*
|
|
||||||
Convert the month part of Interval to days using assumed average month length of 365.25/12.0 days.
|
|
||||||
*/
|
|
||||||
duration = interval->time + interval->day * (double) USECS_PER_DAY + interval->month * ((DAYS_PER_YEAR / (double) MONTHS_PER_YEAR) * USECS_PER_DAY);
|
|
||||||
datum_msg->datum_double = duration;
|
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_DOUBLE;
|
|
||||||
break;
|
|
||||||
case BYTEAOID:
|
case BYTEAOID:
|
||||||
valptr = DatumGetByteaPCopy(datum);
|
valptr = DatumGetByteaPCopy(datum);
|
||||||
size = VARSIZE(valptr) - VARHDRSZ;
|
size = VARSIZE(valptr) - VARHDRSZ;
|
||||||
|
@ -530,13 +360,7 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT;
|
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT;
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
// PostGIS uses dynamic OIDs so we need to check the type again here
|
{
|
||||||
if (typid == geometry_oid || typid == geography_oid) {
|
|
||||||
elog(DEBUG1, "Converting geography point to datum_point");
|
|
||||||
datum_msg->datum_point = palloc(sizeof(Decoderbufs__Point));
|
|
||||||
geography_point_as_decoderbufs_point(datum, datum_msg->datum_point);
|
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT;
|
|
||||||
} else {
|
|
||||||
int len;
|
int len;
|
||||||
elog(DEBUG1, "Encountered unknown typid: %d, using bytes", typid);
|
elog(DEBUG1, "Encountered unknown typid: %d, using bytes", typid);
|
||||||
output = OidOutputFunctionCall(typoutput, datum);
|
output = OidOutputFunctionCall(typoutput, datum);
|
||||||
|
@ -556,7 +380,7 @@ static int valid_attributes_count_from(TupleDesc tupdesc) {
|
||||||
int natt;
|
int natt;
|
||||||
int count = 0;
|
int count = 0;
|
||||||
for (natt = 0; natt < tupdesc->natts; natt++) {
|
for (natt = 0; natt < tupdesc->natts; natt++) {
|
||||||
Form_pg_attribute attr = tupdesc->attrs[natt];
|
Form_pg_attribute attr = TupleDescAttr(tupdesc, natt);
|
||||||
|
|
||||||
/* skip dropped columns and system columns */
|
/* skip dropped columns and system columns */
|
||||||
if (attr->attisdropped || attr->attnum < 0) {
|
if (attr->attisdropped || attr->attnum < 0) {
|
||||||
|
@ -584,7 +408,7 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
|
||||||
bool typisvarlena;
|
bool typisvarlena;
|
||||||
Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT;
|
Decoderbufs__DatumMessage datum_msg = DECODERBUFS__DATUM_MESSAGE__INIT;
|
||||||
|
|
||||||
attr = tupdesc->attrs[natt];
|
attr = TupleDescAttr(tupdesc, natt);
|
||||||
|
|
||||||
/* skip dropped columns and system columns */
|
/* skip dropped columns and system columns */
|
||||||
if (attr->attisdropped || attr->attnum < 0) {
|
if (attr->attisdropped || attr->attnum < 0) {
|
||||||
|
@ -609,7 +433,8 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
|
||||||
getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena);
|
getTypeOutputInfo(attr->atttypid, &typoutput, &typisvarlena);
|
||||||
if (!isnull) {
|
if (!isnull) {
|
||||||
if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) {
|
if (typisvarlena && VARATT_IS_EXTERNAL_ONDISK(origval)) {
|
||||||
// TODO: Is there a way we can handle this?
|
datum_msg.datum_missing = true;
|
||||||
|
datum_msg.datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING;
|
||||||
elog(DEBUG1, "Not handling external on disk varlena at the moment.");
|
elog(DEBUG1, "Not handling external on disk varlena at the moment.");
|
||||||
} else if (!typisvarlena) {
|
} else if (!typisvarlena) {
|
||||||
set_datum_value(&datum_msg, attr->atttypid, typoutput, origval);
|
set_datum_value(&datum_msg, attr->atttypid, typoutput, origval);
|
||||||
|
@ -628,6 +453,125 @@ static void tuple_to_tuple_msg(Decoderbufs__DatumMessage **tmsg,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* provide a metadata for new tuple */
|
||||||
|
static void add_metadata_to_msg(Decoderbufs__TypeInfo **tmsg,
|
||||||
|
Relation relation, HeapTuple tuple,
|
||||||
|
TupleDesc tupdesc) {
|
||||||
|
int natt;
|
||||||
|
int valid_attr_cnt = 0;
|
||||||
|
elog(DEBUG1, "Adding metadata for %d columns", tupdesc->natts);
|
||||||
|
/* build column names and values */
|
||||||
|
for (natt = 0; natt < tupdesc->natts; natt++) {
|
||||||
|
Form_pg_attribute attr;
|
||||||
|
char *typ_mod;
|
||||||
|
Decoderbufs__TypeInfo typeinfo = DECODERBUFS__TYPE_INFO__INIT;
|
||||||
|
bool not_null;
|
||||||
|
|
||||||
|
attr = TupleDescAttr(tupdesc, natt);
|
||||||
|
|
||||||
|
/* skip dropped columns and system columns */
|
||||||
|
if (attr->attisdropped || attr->attnum < 0) {
|
||||||
|
elog(DEBUG1, "skipping column %d because %s", natt + 1, attr->attisdropped ? "it's a dropped column" : "it's a system column");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
not_null = attr->attnotnull;
|
||||||
|
typ_mod = TextDatumGetCString(DirectFunctionCall2(format_type, attr->atttypid, attr->atttypmod));
|
||||||
|
elog(DEBUG1, "Adding typemodifier '%s' for column %d, optional %s", typ_mod, natt, !not_null ? "true" : "false");
|
||||||
|
|
||||||
|
typeinfo.modifier = typ_mod;
|
||||||
|
typeinfo.value_optional = !not_null;
|
||||||
|
tmsg[valid_attr_cnt] = palloc(sizeof(typeinfo));
|
||||||
|
memcpy(tmsg[valid_attr_cnt], &typeinfo, sizeof(typeinfo));
|
||||||
|
|
||||||
|
valid_attr_cnt++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* BEGIN callback */
|
||||||
|
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
|
||||||
|
ReorderBufferTXN *txn) {
|
||||||
|
|
||||||
|
DecoderData *data;
|
||||||
|
MemoryContext old;
|
||||||
|
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
|
||||||
|
elog(DEBUG1, "Entering begin callback");
|
||||||
|
|
||||||
|
|
||||||
|
/* Avoid leaking memory by using and resetting our own context */
|
||||||
|
data = ctx->output_plugin_private;
|
||||||
|
old = MemoryContextSwitchTo(data->context);
|
||||||
|
|
||||||
|
rmsg.op = DECODERBUFS__OP__BEGIN;
|
||||||
|
rmsg.has_op = true;
|
||||||
|
rmsg.transaction_id = txn->xid;
|
||||||
|
rmsg.has_transaction_id = true;
|
||||||
|
#if PG_VERSION_NUM >= 150000
|
||||||
|
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
|
||||||
|
#else
|
||||||
|
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
|
||||||
|
#endif
|
||||||
|
rmsg.has_commit_time = true;
|
||||||
|
|
||||||
|
/* write msg */
|
||||||
|
OutputPluginPrepareWrite(ctx, true);
|
||||||
|
if (data->debug_mode) {
|
||||||
|
print_row_msg(ctx->out, &rmsg);
|
||||||
|
} else {
|
||||||
|
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
|
||||||
|
void *packed = palloc(psize);
|
||||||
|
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
|
||||||
|
appendBinaryStringInfo(ctx->out, packed, ssize);
|
||||||
|
}
|
||||||
|
OutputPluginWrite(ctx, true);
|
||||||
|
|
||||||
|
/* Cleanup, freeing memory */
|
||||||
|
MemoryContextSwitchTo(old);
|
||||||
|
MemoryContextReset(data->context);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* COMMIT callback */
|
||||||
|
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
|
||||||
|
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
|
||||||
|
|
||||||
|
DecoderData *data;
|
||||||
|
MemoryContext old;
|
||||||
|
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
|
||||||
|
elog(DEBUG1, "Entering commit callback");
|
||||||
|
|
||||||
|
|
||||||
|
/* Avoid leaking memory by using and resetting our own context */
|
||||||
|
data = ctx->output_plugin_private;
|
||||||
|
old = MemoryContextSwitchTo(data->context);
|
||||||
|
|
||||||
|
rmsg.op = DECODERBUFS__OP__COMMIT;
|
||||||
|
rmsg.has_op = true;
|
||||||
|
rmsg.transaction_id = txn->xid;
|
||||||
|
rmsg.has_transaction_id = true;
|
||||||
|
#if PG_VERSION_NUM >= 150000
|
||||||
|
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
|
||||||
|
#else
|
||||||
|
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
|
||||||
|
#endif
|
||||||
|
rmsg.has_commit_time = true;
|
||||||
|
|
||||||
|
/* write msg */
|
||||||
|
OutputPluginPrepareWrite(ctx, true);
|
||||||
|
if (data->debug_mode) {
|
||||||
|
print_row_msg(ctx->out, &rmsg);
|
||||||
|
} else {
|
||||||
|
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
|
||||||
|
void *packed = palloc(psize);
|
||||||
|
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
|
||||||
|
appendBinaryStringInfo(ctx->out, packed, ssize);
|
||||||
|
}
|
||||||
|
OutputPluginWrite(ctx, true);
|
||||||
|
|
||||||
|
/* Cleanup, freeing memory */
|
||||||
|
MemoryContextSwitchTo(old);
|
||||||
|
MemoryContextReset(data->context);
|
||||||
|
}
|
||||||
|
|
||||||
/* callback for individual changed tuples */
|
/* callback for individual changed tuples */
|
||||||
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
Relation relation, ReorderBufferChange *change) {
|
Relation relation, ReorderBufferChange *change) {
|
||||||
|
@ -641,17 +585,16 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
TupleDesc tupdesc;
|
TupleDesc tupdesc;
|
||||||
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
|
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
|
||||||
|
|
||||||
elog(DEBUG1, "Entering decode_change callback");
|
elog(DEBUG1, "Entering decode_change callback");
|
||||||
|
|
||||||
|
/* Avoid leaking memory by using and resetting our own context */
|
||||||
|
data = ctx->output_plugin_private;
|
||||||
|
old = MemoryContextSwitchTo(data->context);
|
||||||
|
|
||||||
replident = relation->rd_rel->relreplident;
|
replident = relation->rd_rel->relreplident;
|
||||||
|
|
||||||
class_form = RelationGetForm(relation);
|
class_form = RelationGetForm(relation);
|
||||||
|
|
||||||
data = ctx->output_plugin_private;
|
|
||||||
|
|
||||||
/* Avoid leaking memory by using and resetting our own context */
|
|
||||||
old = MemoryContextSwitchTo(data->context);
|
|
||||||
|
|
||||||
RelationGetIndexList(relation);
|
RelationGetIndexList(relation);
|
||||||
is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING ||
|
is_rel_non_selective = (replident == REPLICA_IDENTITY_NOTHING ||
|
||||||
(replident == REPLICA_IDENTITY_DEFAULT &&
|
(replident == REPLICA_IDENTITY_DEFAULT &&
|
||||||
|
@ -661,7 +604,11 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
/* set common fields */
|
/* set common fields */
|
||||||
rmsg.transaction_id = txn->xid;
|
rmsg.transaction_id = txn->xid;
|
||||||
rmsg.has_transaction_id = true;
|
rmsg.has_transaction_id = true;
|
||||||
|
#if PG_VERSION_NUM >= 150000
|
||||||
|
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
|
||||||
|
#else
|
||||||
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
|
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
|
||||||
|
#endif
|
||||||
rmsg.has_commit_time = true;
|
rmsg.has_commit_time = true;
|
||||||
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
|
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
|
||||||
NameStr(class_form->relname)));
|
NameStr(class_form->relname)));
|
||||||
|
@ -676,11 +623,18 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
if (change->data.tp.newtuple != NULL) {
|
if (change->data.tp.newtuple != NULL) {
|
||||||
elog(DEBUG1, "decoding new tuple information");
|
elog(DEBUG1, "decoding new tuple information");
|
||||||
tupdesc = RelationGetDescr(relation);
|
tupdesc = RelationGetDescr(relation);
|
||||||
|
|
||||||
rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
|
rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
|
||||||
rmsg.new_tuple =
|
rmsg.new_tuple =
|
||||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
|
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
|
||||||
tuple_to_tuple_msg(rmsg.new_tuple, relation,
|
tuple_to_tuple_msg(rmsg.new_tuple, relation,
|
||||||
&change->data.tp.newtuple->tuple, tupdesc);
|
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
|
||||||
|
|
||||||
|
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
|
||||||
|
rmsg.new_typeinfo =
|
||||||
|
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
|
||||||
|
add_metadata_to_msg(rmsg.new_typeinfo, relation,
|
||||||
|
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case REORDER_BUFFER_CHANGE_UPDATE:
|
case REORDER_BUFFER_CHANGE_UPDATE:
|
||||||
|
@ -695,16 +649,23 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
rmsg.old_tuple =
|
rmsg.old_tuple =
|
||||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
|
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
|
||||||
tuple_to_tuple_msg(rmsg.old_tuple, relation,
|
tuple_to_tuple_msg(rmsg.old_tuple, relation,
|
||||||
&change->data.tp.oldtuple->tuple, tupdesc);
|
TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc);
|
||||||
}
|
}
|
||||||
if (change->data.tp.newtuple != NULL) {
|
if (change->data.tp.newtuple != NULL) {
|
||||||
elog(DEBUG1, "decoding new tuple information");
|
elog(DEBUG1, "decoding new tuple information");
|
||||||
tupdesc = RelationGetDescr(relation);
|
tupdesc = RelationGetDescr(relation);
|
||||||
|
|
||||||
rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
|
rmsg.n_new_tuple = valid_attributes_count_from(tupdesc);
|
||||||
rmsg.new_tuple =
|
rmsg.new_tuple =
|
||||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
|
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
|
||||||
tuple_to_tuple_msg(rmsg.new_tuple, relation,
|
tuple_to_tuple_msg(rmsg.new_tuple, relation,
|
||||||
&change->data.tp.newtuple->tuple, tupdesc);
|
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
|
||||||
|
|
||||||
|
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
|
||||||
|
rmsg.new_typeinfo =
|
||||||
|
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
|
||||||
|
add_metadata_to_msg(rmsg.new_typeinfo, relation,
|
||||||
|
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -720,7 +681,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
rmsg.old_tuple =
|
rmsg.old_tuple =
|
||||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
|
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
|
||||||
tuple_to_tuple_msg(rmsg.old_tuple, relation,
|
tuple_to_tuple_msg(rmsg.old_tuple, relation,
|
||||||
&change->data.tp.oldtuple->tuple, tupdesc);
|
TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc);
|
||||||
} else {
|
} else {
|
||||||
elog(DEBUG1, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid ");
|
elog(DEBUG1, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid ");
|
||||||
}
|
}
|
||||||
|
@ -741,14 +702,10 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
void *packed = palloc(psize);
|
void *packed = palloc(psize);
|
||||||
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
|
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
|
||||||
appendBinaryStringInfo(ctx->out, packed, ssize);
|
appendBinaryStringInfo(ctx->out, packed, ssize);
|
||||||
/* free packed buffer */
|
|
||||||
pfree(packed);
|
|
||||||
}
|
}
|
||||||
OutputPluginWrite(ctx, true);
|
OutputPluginWrite(ctx, true);
|
||||||
|
|
||||||
/* cleanup msg */
|
/* Cleanup, freeing memory */
|
||||||
row_message_destroy(&rmsg);
|
|
||||||
|
|
||||||
MemoryContextSwitchTo(old);
|
MemoryContextSwitchTo(old);
|
||||||
MemoryContextReset(data->context);
|
MemoryContextReset(data->context);
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,6 +93,49 @@ void decoderbufs__datum_message__free_unpacked
|
||||||
assert(message->base.descriptor == &decoderbufs__datum_message__descriptor);
|
assert(message->base.descriptor == &decoderbufs__datum_message__descriptor);
|
||||||
protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
|
protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
|
||||||
}
|
}
|
||||||
|
void decoderbufs__type_info__init
|
||||||
|
(Decoderbufs__TypeInfo *message)
|
||||||
|
{
|
||||||
|
static Decoderbufs__TypeInfo init_value = DECODERBUFS__TYPE_INFO__INIT;
|
||||||
|
*message = init_value;
|
||||||
|
}
|
||||||
|
size_t decoderbufs__type_info__get_packed_size
|
||||||
|
(const Decoderbufs__TypeInfo *message)
|
||||||
|
{
|
||||||
|
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
|
||||||
|
return protobuf_c_message_get_packed_size ((const ProtobufCMessage*)(message));
|
||||||
|
}
|
||||||
|
size_t decoderbufs__type_info__pack
|
||||||
|
(const Decoderbufs__TypeInfo *message,
|
||||||
|
uint8_t *out)
|
||||||
|
{
|
||||||
|
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
|
||||||
|
return protobuf_c_message_pack ((const ProtobufCMessage*)message, out);
|
||||||
|
}
|
||||||
|
size_t decoderbufs__type_info__pack_to_buffer
|
||||||
|
(const Decoderbufs__TypeInfo *message,
|
||||||
|
ProtobufCBuffer *buffer)
|
||||||
|
{
|
||||||
|
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
|
||||||
|
return protobuf_c_message_pack_to_buffer ((const ProtobufCMessage*)message, buffer);
|
||||||
|
}
|
||||||
|
Decoderbufs__TypeInfo *
|
||||||
|
decoderbufs__type_info__unpack
|
||||||
|
(ProtobufCAllocator *allocator,
|
||||||
|
size_t len,
|
||||||
|
const uint8_t *data)
|
||||||
|
{
|
||||||
|
return (Decoderbufs__TypeInfo *)
|
||||||
|
protobuf_c_message_unpack (&decoderbufs__type_info__descriptor,
|
||||||
|
allocator, len, data);
|
||||||
|
}
|
||||||
|
void decoderbufs__type_info__free_unpacked
|
||||||
|
(Decoderbufs__TypeInfo *message,
|
||||||
|
ProtobufCAllocator *allocator)
|
||||||
|
{
|
||||||
|
assert(message->base.descriptor == &decoderbufs__type_info__descriptor);
|
||||||
|
protobuf_c_message_free_unpacked ((ProtobufCMessage*)message, allocator);
|
||||||
|
}
|
||||||
void decoderbufs__row_message__init
|
void decoderbufs__row_message__init
|
||||||
(Decoderbufs__RowMessage *message)
|
(Decoderbufs__RowMessage *message)
|
||||||
{
|
{
|
||||||
|
@ -187,7 +230,7 @@ const ProtobufCMessageDescriptor decoderbufs__point__descriptor =
|
||||||
(ProtobufCMessageInit) decoderbufs__point__init,
|
(ProtobufCMessageInit) decoderbufs__point__init,
|
||||||
NULL,NULL,NULL /* reserved[123] */
|
NULL,NULL,NULL /* reserved[123] */
|
||||||
};
|
};
|
||||||
static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[10] =
|
static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descriptors[11] =
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
"column_name",
|
"column_name",
|
||||||
|
@ -309,6 +352,18 @@ static const ProtobufCFieldDescriptor decoderbufs__datum_message__field_descript
|
||||||
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
|
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
|
||||||
0,NULL,NULL /* reserved1,reserved2, etc */
|
0,NULL,NULL /* reserved1,reserved2, etc */
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"datum_missing",
|
||||||
|
11,
|
||||||
|
PROTOBUF_C_LABEL_OPTIONAL,
|
||||||
|
PROTOBUF_C_TYPE_BOOL,
|
||||||
|
offsetof(Decoderbufs__DatumMessage, datum_case),
|
||||||
|
offsetof(Decoderbufs__DatumMessage, datum_missing),
|
||||||
|
NULL,
|
||||||
|
NULL,
|
||||||
|
0 | PROTOBUF_C_FIELD_FLAG_ONEOF, /* flags */
|
||||||
|
0,NULL,NULL /* reserved1,reserved2, etc */
|
||||||
|
},
|
||||||
};
|
};
|
||||||
static const unsigned decoderbufs__datum_message__field_indices_by_name[] = {
|
static const unsigned decoderbufs__datum_message__field_indices_by_name[] = {
|
||||||
0, /* field[0] = column_name */
|
0, /* field[0] = column_name */
|
||||||
|
@ -319,13 +374,14 @@ static const unsigned decoderbufs__datum_message__field_indices_by_name[] = {
|
||||||
4, /* field[4] = datum_float */
|
4, /* field[4] = datum_float */
|
||||||
2, /* field[2] = datum_int32 */
|
2, /* field[2] = datum_int32 */
|
||||||
3, /* field[3] = datum_int64 */
|
3, /* field[3] = datum_int64 */
|
||||||
|
10, /* field[10] = datum_missing */
|
||||||
9, /* field[9] = datum_point */
|
9, /* field[9] = datum_point */
|
||||||
7, /* field[7] = datum_string */
|
7, /* field[7] = datum_string */
|
||||||
};
|
};
|
||||||
static const ProtobufCIntRange decoderbufs__datum_message__number_ranges[1 + 1] =
|
static const ProtobufCIntRange decoderbufs__datum_message__number_ranges[1 + 1] =
|
||||||
{
|
{
|
||||||
{ 1, 0 },
|
{ 1, 0 },
|
||||||
{ 0, 10 }
|
{ 0, 11 }
|
||||||
};
|
};
|
||||||
const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
|
const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
|
||||||
{
|
{
|
||||||
|
@ -335,14 +391,65 @@ const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor =
|
||||||
"Decoderbufs__DatumMessage",
|
"Decoderbufs__DatumMessage",
|
||||||
"decoderbufs",
|
"decoderbufs",
|
||||||
sizeof(Decoderbufs__DatumMessage),
|
sizeof(Decoderbufs__DatumMessage),
|
||||||
10,
|
11,
|
||||||
decoderbufs__datum_message__field_descriptors,
|
decoderbufs__datum_message__field_descriptors,
|
||||||
decoderbufs__datum_message__field_indices_by_name,
|
decoderbufs__datum_message__field_indices_by_name,
|
||||||
1, decoderbufs__datum_message__number_ranges,
|
1, decoderbufs__datum_message__number_ranges,
|
||||||
(ProtobufCMessageInit) decoderbufs__datum_message__init,
|
(ProtobufCMessageInit) decoderbufs__datum_message__init,
|
||||||
NULL,NULL,NULL /* reserved[123] */
|
NULL,NULL,NULL /* reserved[123] */
|
||||||
};
|
};
|
||||||
static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[6] =
|
static const ProtobufCFieldDescriptor decoderbufs__type_info__field_descriptors[2] =
|
||||||
|
{
|
||||||
|
{
|
||||||
|
"modifier",
|
||||||
|
1,
|
||||||
|
PROTOBUF_C_LABEL_REQUIRED,
|
||||||
|
PROTOBUF_C_TYPE_STRING,
|
||||||
|
0, /* quantifier_offset */
|
||||||
|
offsetof(Decoderbufs__TypeInfo, modifier),
|
||||||
|
NULL,
|
||||||
|
NULL,
|
||||||
|
0, /* flags */
|
||||||
|
0,NULL,NULL /* reserved1,reserved2, etc */
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"value_optional",
|
||||||
|
2,
|
||||||
|
PROTOBUF_C_LABEL_REQUIRED,
|
||||||
|
PROTOBUF_C_TYPE_BOOL,
|
||||||
|
0, /* quantifier_offset */
|
||||||
|
offsetof(Decoderbufs__TypeInfo, value_optional),
|
||||||
|
NULL,
|
||||||
|
NULL,
|
||||||
|
0, /* flags */
|
||||||
|
0,NULL,NULL /* reserved1,reserved2, etc */
|
||||||
|
},
|
||||||
|
};
|
||||||
|
static const unsigned decoderbufs__type_info__field_indices_by_name[] = {
|
||||||
|
0, /* field[0] = modifier */
|
||||||
|
1, /* field[1] = value_optional */
|
||||||
|
};
|
||||||
|
static const ProtobufCIntRange decoderbufs__type_info__number_ranges[1 + 1] =
|
||||||
|
{
|
||||||
|
{ 1, 0 },
|
||||||
|
{ 0, 2 }
|
||||||
|
};
|
||||||
|
const ProtobufCMessageDescriptor decoderbufs__type_info__descriptor =
|
||||||
|
{
|
||||||
|
PROTOBUF_C__MESSAGE_DESCRIPTOR_MAGIC,
|
||||||
|
"decoderbufs.TypeInfo",
|
||||||
|
"TypeInfo",
|
||||||
|
"Decoderbufs__TypeInfo",
|
||||||
|
"decoderbufs",
|
||||||
|
sizeof(Decoderbufs__TypeInfo),
|
||||||
|
2,
|
||||||
|
decoderbufs__type_info__field_descriptors,
|
||||||
|
decoderbufs__type_info__field_indices_by_name,
|
||||||
|
1, decoderbufs__type_info__number_ranges,
|
||||||
|
(ProtobufCMessageInit) decoderbufs__type_info__init,
|
||||||
|
NULL,NULL,NULL /* reserved[123] */
|
||||||
|
};
|
||||||
|
static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptors[7] =
|
||||||
{
|
{
|
||||||
{
|
{
|
||||||
"transaction_id",
|
"transaction_id",
|
||||||
|
@ -416,10 +523,23 @@ static const ProtobufCFieldDescriptor decoderbufs__row_message__field_descriptor
|
||||||
0, /* flags */
|
0, /* flags */
|
||||||
0,NULL,NULL /* reserved1,reserved2, etc */
|
0,NULL,NULL /* reserved1,reserved2, etc */
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
"new_typeinfo",
|
||||||
|
7,
|
||||||
|
PROTOBUF_C_LABEL_REPEATED,
|
||||||
|
PROTOBUF_C_TYPE_MESSAGE,
|
||||||
|
offsetof(Decoderbufs__RowMessage, n_new_typeinfo),
|
||||||
|
offsetof(Decoderbufs__RowMessage, new_typeinfo),
|
||||||
|
&decoderbufs__type_info__descriptor,
|
||||||
|
NULL,
|
||||||
|
0, /* flags */
|
||||||
|
0,NULL,NULL /* reserved1,reserved2, etc */
|
||||||
|
},
|
||||||
};
|
};
|
||||||
static const unsigned decoderbufs__row_message__field_indices_by_name[] = {
|
static const unsigned decoderbufs__row_message__field_indices_by_name[] = {
|
||||||
1, /* field[1] = commit_time */
|
1, /* field[1] = commit_time */
|
||||||
4, /* field[4] = new_tuple */
|
4, /* field[4] = new_tuple */
|
||||||
|
6, /* field[6] = new_typeinfo */
|
||||||
5, /* field[5] = old_tuple */
|
5, /* field[5] = old_tuple */
|
||||||
3, /* field[3] = op */
|
3, /* field[3] = op */
|
||||||
2, /* field[2] = table */
|
2, /* field[2] = table */
|
||||||
|
@ -428,7 +548,7 @@ static const unsigned decoderbufs__row_message__field_indices_by_name[] = {
|
||||||
static const ProtobufCIntRange decoderbufs__row_message__number_ranges[1 + 1] =
|
static const ProtobufCIntRange decoderbufs__row_message__number_ranges[1 + 1] =
|
||||||
{
|
{
|
||||||
{ 1, 0 },
|
{ 1, 0 },
|
||||||
{ 0, 6 }
|
{ 0, 7 }
|
||||||
};
|
};
|
||||||
const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
|
const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
|
||||||
{
|
{
|
||||||
|
@ -438,27 +558,33 @@ const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
|
||||||
"Decoderbufs__RowMessage",
|
"Decoderbufs__RowMessage",
|
||||||
"decoderbufs",
|
"decoderbufs",
|
||||||
sizeof(Decoderbufs__RowMessage),
|
sizeof(Decoderbufs__RowMessage),
|
||||||
6,
|
7,
|
||||||
decoderbufs__row_message__field_descriptors,
|
decoderbufs__row_message__field_descriptors,
|
||||||
decoderbufs__row_message__field_indices_by_name,
|
decoderbufs__row_message__field_indices_by_name,
|
||||||
1, decoderbufs__row_message__number_ranges,
|
1, decoderbufs__row_message__number_ranges,
|
||||||
(ProtobufCMessageInit) decoderbufs__row_message__init,
|
(ProtobufCMessageInit) decoderbufs__row_message__init,
|
||||||
NULL,NULL,NULL /* reserved[123] */
|
NULL,NULL,NULL /* reserved[123] */
|
||||||
};
|
};
|
||||||
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] =
|
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[6] =
|
||||||
{
|
{
|
||||||
|
{ "UNKNOWN", "DECODERBUFS__OP__UNKNOWN", -1 },
|
||||||
{ "INSERT", "DECODERBUFS__OP__INSERT", 0 },
|
{ "INSERT", "DECODERBUFS__OP__INSERT", 0 },
|
||||||
{ "UPDATE", "DECODERBUFS__OP__UPDATE", 1 },
|
{ "UPDATE", "DECODERBUFS__OP__UPDATE", 1 },
|
||||||
{ "DELETE", "DECODERBUFS__OP__DELETE", 2 },
|
{ "DELETE", "DECODERBUFS__OP__DELETE", 2 },
|
||||||
|
{ "BEGIN", "DECODERBUFS__OP__BEGIN", 3 },
|
||||||
|
{ "COMMIT", "DECODERBUFS__OP__COMMIT", 4 },
|
||||||
};
|
};
|
||||||
static const ProtobufCIntRange decoderbufs__op__value_ranges[] = {
|
static const ProtobufCIntRange decoderbufs__op__value_ranges[] = {
|
||||||
{0, 0},{0, 3}
|
{-1, 0},{0, 6}
|
||||||
};
|
};
|
||||||
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[3] =
|
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[6] =
|
||||||
{
|
{
|
||||||
{ "DELETE", 2 },
|
{ "BEGIN", 4 },
|
||||||
{ "INSERT", 0 },
|
{ "COMMIT", 5 },
|
||||||
{ "UPDATE", 1 },
|
{ "DELETE", 3 },
|
||||||
|
{ "INSERT", 1 },
|
||||||
|
{ "UNKNOWN", 0 },
|
||||||
|
{ "UPDATE", 2 },
|
||||||
};
|
};
|
||||||
const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
|
const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
|
||||||
{
|
{
|
||||||
|
@ -467,9 +593,9 @@ const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
|
||||||
"Op",
|
"Op",
|
||||||
"Decoderbufs__Op",
|
"Decoderbufs__Op",
|
||||||
"decoderbufs",
|
"decoderbufs",
|
||||||
3,
|
6,
|
||||||
decoderbufs__op__enum_values_by_number,
|
decoderbufs__op__enum_values_by_number,
|
||||||
3,
|
6,
|
||||||
decoderbufs__op__enum_values_by_name,
|
decoderbufs__op__enum_values_by_name,
|
||||||
1,
|
1,
|
||||||
decoderbufs__op__value_ranges,
|
decoderbufs__op__value_ranges,
|
||||||
|
|
|
@ -10,22 +10,26 @@ PROTOBUF_C__BEGIN_DECLS
|
||||||
|
|
||||||
#if PROTOBUF_C_VERSION_NUMBER < 1000000
|
#if PROTOBUF_C_VERSION_NUMBER < 1000000
|
||||||
# error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers.
|
# error This file was generated by a newer version of protoc-c which is incompatible with your libprotobuf-c headers. Please update your headers.
|
||||||
#elif 1001001 < PROTOBUF_C_MIN_COMPILER_VERSION
|
#elif 1002001 < PROTOBUF_C_MIN_COMPILER_VERSION
|
||||||
# error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c.
|
# error This file was generated by an older version of protoc-c which is incompatible with your libprotobuf-c headers. Please regenerate this file with a newer version of protoc-c.
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
typedef struct _Decoderbufs__Point Decoderbufs__Point;
|
typedef struct _Decoderbufs__Point Decoderbufs__Point;
|
||||||
typedef struct _Decoderbufs__DatumMessage Decoderbufs__DatumMessage;
|
typedef struct _Decoderbufs__DatumMessage Decoderbufs__DatumMessage;
|
||||||
|
typedef struct _Decoderbufs__TypeInfo Decoderbufs__TypeInfo;
|
||||||
typedef struct _Decoderbufs__RowMessage Decoderbufs__RowMessage;
|
typedef struct _Decoderbufs__RowMessage Decoderbufs__RowMessage;
|
||||||
|
|
||||||
|
|
||||||
/* --- enums --- */
|
/* --- enums --- */
|
||||||
|
|
||||||
typedef enum _Decoderbufs__Op {
|
typedef enum _Decoderbufs__Op {
|
||||||
|
DECODERBUFS__OP__UNKNOWN = -1,
|
||||||
DECODERBUFS__OP__INSERT = 0,
|
DECODERBUFS__OP__INSERT = 0,
|
||||||
DECODERBUFS__OP__UPDATE = 1,
|
DECODERBUFS__OP__UPDATE = 1,
|
||||||
DECODERBUFS__OP__DELETE = 2
|
DECODERBUFS__OP__DELETE = 2,
|
||||||
|
DECODERBUFS__OP__BEGIN = 3,
|
||||||
|
DECODERBUFS__OP__COMMIT = 4
|
||||||
PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(DECODERBUFS__OP)
|
PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(DECODERBUFS__OP)
|
||||||
} Decoderbufs__Op;
|
} Decoderbufs__Op;
|
||||||
|
|
||||||
|
@ -52,6 +56,7 @@ typedef enum {
|
||||||
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING = 8,
|
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_STRING = 8,
|
||||||
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES = 9,
|
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_BYTES = 9,
|
||||||
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT = 10,
|
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_POINT = 10,
|
||||||
|
DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_MISSING = 11,
|
||||||
} Decoderbufs__DatumMessage__DatumCase;
|
} Decoderbufs__DatumMessage__DatumCase;
|
||||||
|
|
||||||
struct _Decoderbufs__DatumMessage
|
struct _Decoderbufs__DatumMessage
|
||||||
|
@ -70,11 +75,23 @@ struct _Decoderbufs__DatumMessage
|
||||||
char *datum_string;
|
char *datum_string;
|
||||||
ProtobufCBinaryData datum_bytes;
|
ProtobufCBinaryData datum_bytes;
|
||||||
Decoderbufs__Point *datum_point;
|
Decoderbufs__Point *datum_point;
|
||||||
|
protobuf_c_boolean datum_missing;
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
#define DECODERBUFS__DATUM_MESSAGE__INIT \
|
#define DECODERBUFS__DATUM_MESSAGE__INIT \
|
||||||
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__datum_message__descriptor) \
|
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__datum_message__descriptor) \
|
||||||
, NULL, 0,0, DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET, {} }
|
, NULL, 0,0, DECODERBUFS__DATUM_MESSAGE__DATUM__NOT_SET, {0} }
|
||||||
|
|
||||||
|
|
||||||
|
struct _Decoderbufs__TypeInfo
|
||||||
|
{
|
||||||
|
ProtobufCMessage base;
|
||||||
|
char *modifier;
|
||||||
|
protobuf_c_boolean value_optional;
|
||||||
|
};
|
||||||
|
#define DECODERBUFS__TYPE_INFO__INIT \
|
||||||
|
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__type_info__descriptor) \
|
||||||
|
, NULL, 0 }
|
||||||
|
|
||||||
|
|
||||||
struct _Decoderbufs__RowMessage
|
struct _Decoderbufs__RowMessage
|
||||||
|
@ -91,10 +108,12 @@ struct _Decoderbufs__RowMessage
|
||||||
Decoderbufs__DatumMessage **new_tuple;
|
Decoderbufs__DatumMessage **new_tuple;
|
||||||
size_t n_old_tuple;
|
size_t n_old_tuple;
|
||||||
Decoderbufs__DatumMessage **old_tuple;
|
Decoderbufs__DatumMessage **old_tuple;
|
||||||
|
size_t n_new_typeinfo;
|
||||||
|
Decoderbufs__TypeInfo **new_typeinfo;
|
||||||
};
|
};
|
||||||
#define DECODERBUFS__ROW_MESSAGE__INIT \
|
#define DECODERBUFS__ROW_MESSAGE__INIT \
|
||||||
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__row_message__descriptor) \
|
{ PROTOBUF_C_MESSAGE_INIT (&decoderbufs__row_message__descriptor) \
|
||||||
, 0,0, 0,0, NULL, 0,0, 0,NULL, 0,NULL }
|
, 0,0, 0,0, NULL, 0,0, 0,NULL, 0,NULL, 0,NULL }
|
||||||
|
|
||||||
|
|
||||||
/* Decoderbufs__Point methods */
|
/* Decoderbufs__Point methods */
|
||||||
|
@ -135,6 +154,25 @@ Decoderbufs__DatumMessage *
|
||||||
void decoderbufs__datum_message__free_unpacked
|
void decoderbufs__datum_message__free_unpacked
|
||||||
(Decoderbufs__DatumMessage *message,
|
(Decoderbufs__DatumMessage *message,
|
||||||
ProtobufCAllocator *allocator);
|
ProtobufCAllocator *allocator);
|
||||||
|
/* Decoderbufs__TypeInfo methods */
|
||||||
|
void decoderbufs__type_info__init
|
||||||
|
(Decoderbufs__TypeInfo *message);
|
||||||
|
size_t decoderbufs__type_info__get_packed_size
|
||||||
|
(const Decoderbufs__TypeInfo *message);
|
||||||
|
size_t decoderbufs__type_info__pack
|
||||||
|
(const Decoderbufs__TypeInfo *message,
|
||||||
|
uint8_t *out);
|
||||||
|
size_t decoderbufs__type_info__pack_to_buffer
|
||||||
|
(const Decoderbufs__TypeInfo *message,
|
||||||
|
ProtobufCBuffer *buffer);
|
||||||
|
Decoderbufs__TypeInfo *
|
||||||
|
decoderbufs__type_info__unpack
|
||||||
|
(ProtobufCAllocator *allocator,
|
||||||
|
size_t len,
|
||||||
|
const uint8_t *data);
|
||||||
|
void decoderbufs__type_info__free_unpacked
|
||||||
|
(Decoderbufs__TypeInfo *message,
|
||||||
|
ProtobufCAllocator *allocator);
|
||||||
/* Decoderbufs__RowMessage methods */
|
/* Decoderbufs__RowMessage methods */
|
||||||
void decoderbufs__row_message__init
|
void decoderbufs__row_message__init
|
||||||
(Decoderbufs__RowMessage *message);
|
(Decoderbufs__RowMessage *message);
|
||||||
|
@ -162,6 +200,9 @@ typedef void (*Decoderbufs__Point_Closure)
|
||||||
typedef void (*Decoderbufs__DatumMessage_Closure)
|
typedef void (*Decoderbufs__DatumMessage_Closure)
|
||||||
(const Decoderbufs__DatumMessage *message,
|
(const Decoderbufs__DatumMessage *message,
|
||||||
void *closure_data);
|
void *closure_data);
|
||||||
|
typedef void (*Decoderbufs__TypeInfo_Closure)
|
||||||
|
(const Decoderbufs__TypeInfo *message,
|
||||||
|
void *closure_data);
|
||||||
typedef void (*Decoderbufs__RowMessage_Closure)
|
typedef void (*Decoderbufs__RowMessage_Closure)
|
||||||
(const Decoderbufs__RowMessage *message,
|
(const Decoderbufs__RowMessage *message,
|
||||||
void *closure_data);
|
void *closure_data);
|
||||||
|
@ -174,6 +215,7 @@ typedef void (*Decoderbufs__RowMessage_Closure)
|
||||||
extern const ProtobufCEnumDescriptor decoderbufs__op__descriptor;
|
extern const ProtobufCEnumDescriptor decoderbufs__op__descriptor;
|
||||||
extern const ProtobufCMessageDescriptor decoderbufs__point__descriptor;
|
extern const ProtobufCMessageDescriptor decoderbufs__point__descriptor;
|
||||||
extern const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor;
|
extern const ProtobufCMessageDescriptor decoderbufs__datum_message__descriptor;
|
||||||
|
extern const ProtobufCMessageDescriptor decoderbufs__type_info__descriptor;
|
||||||
extern const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor;
|
extern const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor;
|
||||||
|
|
||||||
PROTOBUF_C__END_DECLS
|
PROTOBUF_C__END_DECLS
|
||||||
|
|
Loading…
Reference in New Issue