Compare commits
8 Commits
v1.0.0.CR1
...
main
Author | SHA1 | Date |
---|---|---|
|
84f30f3f44 | |
|
e1f87147b7 | |
|
e709f326cb | |
|
cd4171a030 | |
|
c9b00aa8c0 | |
|
7a8c8b8f46 | |
|
e29a2580a5 | |
|
870ecfa976 |
100
README.md
100
README.md
|
@ -10,15 +10,17 @@
|
||||||
A PostgreSQL logical decoder output plugin to deliver data as [Protocol Buffers](https://developers.google.com/protocol-buffers), adapted for Debezium
|
A PostgreSQL logical decoder output plugin to deliver data as [Protocol Buffers](https://developers.google.com/protocol-buffers), adapted for Debezium
|
||||||
|
|
||||||
## Thanks to
|
## Thanks to
|
||||||
|
|
||||||
- The original [Decoderbufs Project](https://github.com/xstevens/decoderbufs) on which this is based
|
- The original [Decoderbufs Project](https://github.com/xstevens/decoderbufs) on which this is based
|
||||||
- [The PostgreSQL Team](https://postgresql.org) for adding [logical decoding](http://www.postgresql.org/docs/9.4/static/logicaldecoding.html) support
|
- [The PostgreSQL Team](https://postgresql.org) for adding [logical decoding](http://www.postgresql.org/docs/9.4/static/logicaldecoding.html) support
|
||||||
|
|
||||||
## Dependencies
|
## Dependencies
|
||||||
|
|
||||||
This code depends on the following libraries and requires them for compilation:
|
This code depends on the following libraries and requires them for compilation:
|
||||||
|
|
||||||
* [PostgreSQL](http://www.postgresql.org) 9.6+
|
- [PostgreSQL](http://www.postgresql.org) 9.6+
|
||||||
* [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.2+ - used for data serialization
|
- [Protobuf-c](https://github.com/protobuf-c/protobuf-c) 1.2+ - used for data serialization
|
||||||
* [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support
|
- [PostGIS](http://www.postgis.net/) 2.1+ - used for Postgres geometric types support
|
||||||
|
|
||||||
## Building
|
## Building
|
||||||
|
|
||||||
|
@ -26,21 +28,26 @@ This code depends on the following libraries and requires them for compilation:
|
||||||
(for pg_config), PostgreSQL server development packages, protobuf-c for the Protocol Buffer support and some PostGIS development packages.
|
(for pg_config), PostgreSQL server development packages, protobuf-c for the Protocol Buffer support and some PostGIS development packages.
|
||||||
|
|
||||||
### Installing Dependencies
|
### Installing Dependencies
|
||||||
|
|
||||||
#### Debian
|
#### Debian
|
||||||
|
|
||||||
# Core build utilities
|
```bash
|
||||||
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6
|
# Core build utilities
|
||||||
|
apt-get update && apt-get install -f -y software-properties-common build-essential pkg-config git postgresql-server-dev-9.6
|
||||||
|
|
||||||
# PostGIS dependency
|
# PostGIS dependency
|
||||||
apt-get install -f -y libproj-dev liblwgeom-dev
|
apt-get install -f -y libproj-dev liblwgeom-dev
|
||||||
|
|
||||||
# Protobuf-c dependency (requires a non-stable Debian repo)
|
# Protobuf-c dependency (requires a non-stable Debian repo)
|
||||||
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update
|
add-apt-repository "deb http://ftp.debian.org/debian testing main contrib" && apt-get update
|
||||||
apt-get install -y libprotobuf-c-dev=1.2.1-1+b1
|
apt-get install -y libprotobuf-c-dev=1.2.1-1+b1
|
||||||
|
```
|
||||||
|
|
||||||
When updating the ProtoBuf definition, also install the ProtoBuf C compiler:
|
When updating the ProtoBuf definition, also install the ProtoBuf C compiler:
|
||||||
|
|
||||||
apt-get install -y protobuf-c-compiler=1.2.*
|
```bash
|
||||||
|
apt-get install -y protobuf-c-compiler=1.2.*
|
||||||
|
```
|
||||||
|
|
||||||
The above are taken from the Debezium [container images](https://github.com/debezium/docker-images).
|
The above are taken from the Debezium [container images](https://github.com/debezium/docker-images).
|
||||||
|
|
||||||
|
@ -53,16 +60,20 @@ Note that the last step from the above sequence is only required for Debian to b
|
||||||
|
|
||||||
If you have all of the above prerequisites installed, clone this git repo to build from source:
|
If you have all of the above prerequisites installed, clone this git repo to build from source:
|
||||||
|
|
||||||
git clone https://github.com/debezium/postgres-decoderbufs.git
|
```bash
|
||||||
cd postgres-decoderbufs
|
git clone https://github.com/debezium/postgres-decoderbufs.git
|
||||||
|
cd postgres-decoderbufs
|
||||||
|
```
|
||||||
|
|
||||||
### Optional: Re-generating ProtoBuf code
|
### Optional: Re-generating ProtoBuf code
|
||||||
|
|
||||||
This is only needed after changes to the ProtoBuf definition (_proto/pg_logicaldec.proto):
|
This is only needed after changes to the ProtoBuf definition (_proto/pg_logicaldec.proto):
|
||||||
|
|
||||||
cd proto
|
```bash
|
||||||
protoc-c --c_out=../src/proto pg_logicaldec.proto
|
cd proto
|
||||||
cd ..
|
protoc-c --c_out=../src/proto pg_logicaldec.proto
|
||||||
|
cd ..
|
||||||
|
```
|
||||||
|
|
||||||
Commit the generated files to git then.
|
Commit the generated files to git then.
|
||||||
|
|
||||||
|
@ -71,44 +82,53 @@ Commit the generated files to git then.
|
||||||
If you have multiple Postgres versions installed, you can select which version to install decoderbufs into by altering your `$PATH` to point to the right version.
|
If you have multiple Postgres versions installed, you can select which version to install decoderbufs into by altering your `$PATH` to point to the right version.
|
||||||
Then `make` and `make install` for each version. Here is an example:
|
Then `make` and `make install` for each version. Here is an example:
|
||||||
|
|
||||||
# Install for Postgres 9.6 if I have multiple local versions
|
```bash
|
||||||
export PATH=/usr/lib/postgresql/9.6/bin:$PATH
|
# Install for Postgres 9.6 if I have multiple local versions
|
||||||
make
|
export PATH=/usr/lib/postgresql/9.6/bin:$PATH
|
||||||
make install
|
make
|
||||||
|
make install
|
||||||
|
```
|
||||||
|
|
||||||
Once the extension has been installed you just need to enable it and logical replication in postgresql.conf:
|
Once the extension has been installed you just need to enable it and logical replication in postgresql.conf:
|
||||||
|
|
||||||
# MODULES
|
```bash
|
||||||
shared_preload_libraries = 'decoderbufs'
|
# MODULES
|
||||||
|
shared_preload_libraries = 'decoderbufs'
|
||||||
|
|
||||||
# REPLICATION
|
# REPLICATION
|
||||||
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
|
wal_level = logical # minimal, archive, hot_standby, or logical (change requires restart)
|
||||||
max_wal_senders = 8 # max number of walsender processes (change requires restart)
|
max_wal_senders = 8 # max number of walsender processes (change requires restart)
|
||||||
wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
|
wal_keep_segments = 4 # in logfile segments, 16MB each; 0 disables
|
||||||
#wal_sender_timeout = 60s # in milliseconds; 0 disables
|
#wal_sender_timeout = 60s # in milliseconds; 0 disables
|
||||||
max_replication_slots = 4 # max number of replication slots (change requires restart)
|
max_replication_slots = 4 # max number of replication slots (change requires restart)
|
||||||
|
```
|
||||||
|
|
||||||
In addition, permissions will have to be added for the user that connects to the DB to be able to replicate. This can be modified in _pg\_hba.conf_ like so:
|
In addition, permissions will have to be added for the user that connects to the DB to be able to replicate. This can be modified in _pg\_hba.conf_ like so:
|
||||||
|
|
||||||
local replication <youruser> trust
|
```make
|
||||||
host replication <youruser> 127.0.0.1/32 trust
|
local replication <youruser> trust
|
||||||
host replication <youruser> ::1/128 trust
|
host replication <youruser> 127.0.0.1/32 trust
|
||||||
|
host replication <youruser> ::1/128 trust
|
||||||
|
```
|
||||||
|
|
||||||
And restart PostgreSQL.
|
And restart PostgreSQL.
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
-- can use SQL for demo purposes
|
|
||||||
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
|
|
||||||
|
|
||||||
-- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
|
```sql
|
||||||
|
-- can use SQL for demo purposes
|
||||||
|
select * from pg_create_logical_replication_slot('decoderbufs_demo', 'decoderbufs');
|
||||||
|
|
||||||
-- peek at WAL changes using decoderbufs debug mode for SQL console
|
-- DO SOME TABLE MODIFICATIONS (see below about UPDATE/DELETE)
|
||||||
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
|
|
||||||
-- get WAL changes using decoderbufs to update the WAL position
|
|
||||||
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
|
|
||||||
|
|
||||||
-- check the WAL position of logical replicators
|
-- peek at WAL changes using decoderbufs debug mode for SQL console
|
||||||
select * from pg_replication_slots where slot_type = 'logical';
|
select data from pg_logical_slot_peek_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
|
||||||
|
-- get WAL changes using decoderbufs to update the WAL position
|
||||||
|
select data from pg_logical_slot_get_changes('decoderbufs_demo', NULL, NULL, 'debug-mode', '1');
|
||||||
|
|
||||||
|
-- check the WAL position of logical replicators
|
||||||
|
select * from pg_replication_slots where slot_type = 'logical';
|
||||||
|
```
|
||||||
|
|
||||||
If you're performing an UPDATE/DELETE on your table and you don't see results for those operations from logical decoding, make sure you have set [REPLICA IDENTITY](http://www.postgresql.org/docs/9.4/static/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY) appropriately for your use case.
|
If you're performing an UPDATE/DELETE on your table and you don't see results for those operations from logical decoding, make sure you have set [REPLICA IDENTITY](http://www.postgresql.org/docs/9.4/static/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY) appropriately for your use case.
|
||||||
|
|
||||||
|
|
|
@ -5,9 +5,12 @@ option java_outer_classname = "PgProto";
|
||||||
option optimize_for = SPEED;
|
option optimize_for = SPEED;
|
||||||
|
|
||||||
enum Op {
|
enum Op {
|
||||||
|
UNKNOWN = -1;
|
||||||
INSERT = 0;
|
INSERT = 0;
|
||||||
UPDATE = 1;
|
UPDATE = 1;
|
||||||
DELETE = 2;
|
DELETE = 2;
|
||||||
|
BEGIN = 3;
|
||||||
|
COMMIT = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message Point {
|
message Point {
|
||||||
|
|
|
@ -60,6 +60,12 @@
|
||||||
#error Expecting timestamps to be represented as integers, not as floating-point.
|
#error Expecting timestamps to be represented as integers, not as floating-point.
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM >= 170000
|
||||||
|
#define TUPLE_ACCESS(x) x
|
||||||
|
#else
|
||||||
|
#define TUPLE_ACCESS(x) &x->tuple
|
||||||
|
#endif
|
||||||
|
|
||||||
PG_MODULE_MAGIC;
|
PG_MODULE_MAGIC;
|
||||||
|
|
||||||
/* define a time macro to convert TimestampTz into something more sane,
|
/* define a time macro to convert TimestampTz into something more sane,
|
||||||
|
@ -164,16 +170,6 @@ static void pg_decode_shutdown(LogicalDecodingContext *ctx) {
|
||||||
MemoryContextDelete(data->context);
|
MemoryContextDelete(data->context);
|
||||||
}
|
}
|
||||||
|
|
||||||
/* BEGIN callback */
|
|
||||||
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
|
|
||||||
ReorderBufferTXN *txn) {
|
|
||||||
}
|
|
||||||
|
|
||||||
/* COMMIT callback */
|
|
||||||
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
|
|
||||||
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
|
|
||||||
}
|
|
||||||
|
|
||||||
/* print tuple datums (only used for debug-mode) */
|
/* print tuple datums (only used for debug-mode) */
|
||||||
static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup,
|
static void print_tuple_datums(StringInfo out, Decoderbufs__DatumMessage **tup,
|
||||||
size_t n) {
|
size_t n) {
|
||||||
|
@ -279,10 +275,13 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32;
|
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT32;
|
||||||
break;
|
break;
|
||||||
case INT8OID:
|
case INT8OID:
|
||||||
case OIDOID:
|
|
||||||
datum_msg->datum_int64 = DatumGetInt64(datum);
|
datum_msg->datum_int64 = DatumGetInt64(datum);
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
||||||
break;
|
break;
|
||||||
|
case OIDOID:
|
||||||
|
datum_msg->datum_int64 = (Oid) DatumGetUInt64(datum);
|
||||||
|
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
||||||
|
break;
|
||||||
case FLOAT4OID:
|
case FLOAT4OID:
|
||||||
datum_msg->datum_float = DatumGetFloat4(datum);
|
datum_msg->datum_float = DatumGetFloat4(datum);
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_FLOAT;
|
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_FLOAT;
|
||||||
|
@ -314,23 +313,21 @@ static void set_datum_value(Decoderbufs__DatumMessage *datum_msg, Oid typid,
|
||||||
case TIMESTAMPOID:
|
case TIMESTAMPOID:
|
||||||
ts = DatumGetTimestamp(datum);
|
ts = DatumGetTimestamp(datum);
|
||||||
if (TIMESTAMP_NOT_FINITE(ts)) {
|
if (TIMESTAMP_NOT_FINITE(ts)) {
|
||||||
ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
|
datum_msg->datum_int64 = ts;
|
||||||
errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
|
|
||||||
} else {
|
} else {
|
||||||
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
|
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
|
||||||
|
}
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
case TIMESTAMPTZOID:
|
case TIMESTAMPTZOID:
|
||||||
ts = DatumGetTimestampTz(datum);
|
ts = DatumGetTimestampTz(datum);
|
||||||
if (TIMESTAMP_NOT_FINITE(ts)) {
|
if (TIMESTAMP_NOT_FINITE(ts)) {
|
||||||
ereport(ERROR, (errcode(ERRCODE_DATETIME_VALUE_OUT_OF_RANGE),
|
datum_msg->datum_int64 = ts;
|
||||||
errmsg("timestamp \'%s\'out of range", ts ? strVal(ts) : "(null)")));
|
|
||||||
} else {
|
} else {
|
||||||
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
|
datum_msg->datum_int64 = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(ts);
|
||||||
|
}
|
||||||
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
datum_msg->datum_case = DECODERBUFS__DATUM_MESSAGE__DATUM_DATUM_INT64;
|
||||||
break;
|
break;
|
||||||
}
|
|
||||||
case DATEOID:
|
case DATEOID:
|
||||||
/* simply get the number of days as the stored 32 bit value and convert to EPOCH */
|
/* simply get the number of days as the stored 32 bit value and convert to EPOCH */
|
||||||
datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum));
|
datum_msg->datum_int32 = DATE_TO_DAYS_SINCE_EPOCH(DatumGetDateADT(datum));
|
||||||
|
@ -491,6 +488,90 @@ static void add_metadata_to_msg(Decoderbufs__TypeInfo **tmsg,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* BEGIN callback */
|
||||||
|
static void pg_decode_begin_txn(LogicalDecodingContext *ctx,
|
||||||
|
ReorderBufferTXN *txn) {
|
||||||
|
|
||||||
|
DecoderData *data;
|
||||||
|
MemoryContext old;
|
||||||
|
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
|
||||||
|
elog(DEBUG1, "Entering begin callback");
|
||||||
|
|
||||||
|
|
||||||
|
/* Avoid leaking memory by using and resetting our own context */
|
||||||
|
data = ctx->output_plugin_private;
|
||||||
|
old = MemoryContextSwitchTo(data->context);
|
||||||
|
|
||||||
|
rmsg.op = DECODERBUFS__OP__BEGIN;
|
||||||
|
rmsg.has_op = true;
|
||||||
|
rmsg.transaction_id = txn->xid;
|
||||||
|
rmsg.has_transaction_id = true;
|
||||||
|
#if PG_VERSION_NUM >= 150000
|
||||||
|
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
|
||||||
|
#else
|
||||||
|
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
|
||||||
|
#endif
|
||||||
|
rmsg.has_commit_time = true;
|
||||||
|
|
||||||
|
/* write msg */
|
||||||
|
OutputPluginPrepareWrite(ctx, true);
|
||||||
|
if (data->debug_mode) {
|
||||||
|
print_row_msg(ctx->out, &rmsg);
|
||||||
|
} else {
|
||||||
|
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
|
||||||
|
void *packed = palloc(psize);
|
||||||
|
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
|
||||||
|
appendBinaryStringInfo(ctx->out, packed, ssize);
|
||||||
|
}
|
||||||
|
OutputPluginWrite(ctx, true);
|
||||||
|
|
||||||
|
/* Cleanup, freeing memory */
|
||||||
|
MemoryContextSwitchTo(old);
|
||||||
|
MemoryContextReset(data->context);
|
||||||
|
}
|
||||||
|
|
||||||
|
/* COMMIT callback */
|
||||||
|
static void pg_decode_commit_txn(LogicalDecodingContext *ctx,
|
||||||
|
ReorderBufferTXN *txn, XLogRecPtr commit_lsn) {
|
||||||
|
|
||||||
|
DecoderData *data;
|
||||||
|
MemoryContext old;
|
||||||
|
Decoderbufs__RowMessage rmsg = DECODERBUFS__ROW_MESSAGE__INIT;
|
||||||
|
elog(DEBUG1, "Entering commit callback");
|
||||||
|
|
||||||
|
|
||||||
|
/* Avoid leaking memory by using and resetting our own context */
|
||||||
|
data = ctx->output_plugin_private;
|
||||||
|
old = MemoryContextSwitchTo(data->context);
|
||||||
|
|
||||||
|
rmsg.op = DECODERBUFS__OP__COMMIT;
|
||||||
|
rmsg.has_op = true;
|
||||||
|
rmsg.transaction_id = txn->xid;
|
||||||
|
rmsg.has_transaction_id = true;
|
||||||
|
#if PG_VERSION_NUM >= 150000
|
||||||
|
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
|
||||||
|
#else
|
||||||
|
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
|
||||||
|
#endif
|
||||||
|
rmsg.has_commit_time = true;
|
||||||
|
|
||||||
|
/* write msg */
|
||||||
|
OutputPluginPrepareWrite(ctx, true);
|
||||||
|
if (data->debug_mode) {
|
||||||
|
print_row_msg(ctx->out, &rmsg);
|
||||||
|
} else {
|
||||||
|
size_t psize = decoderbufs__row_message__get_packed_size(&rmsg);
|
||||||
|
void *packed = palloc(psize);
|
||||||
|
size_t ssize = decoderbufs__row_message__pack(&rmsg, packed);
|
||||||
|
appendBinaryStringInfo(ctx->out, packed, ssize);
|
||||||
|
}
|
||||||
|
OutputPluginWrite(ctx, true);
|
||||||
|
|
||||||
|
/* Cleanup, freeing memory */
|
||||||
|
MemoryContextSwitchTo(old);
|
||||||
|
MemoryContextReset(data->context);
|
||||||
|
}
|
||||||
|
|
||||||
/* callback for individual changed tuples */
|
/* callback for individual changed tuples */
|
||||||
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
Relation relation, ReorderBufferChange *change) {
|
Relation relation, ReorderBufferChange *change) {
|
||||||
|
@ -523,7 +604,11 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
/* set common fields */
|
/* set common fields */
|
||||||
rmsg.transaction_id = txn->xid;
|
rmsg.transaction_id = txn->xid;
|
||||||
rmsg.has_transaction_id = true;
|
rmsg.has_transaction_id = true;
|
||||||
|
#if PG_VERSION_NUM >= 150000
|
||||||
|
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->xact_time.commit_time);
|
||||||
|
#else
|
||||||
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
|
rmsg.commit_time = TIMESTAMPTZ_TO_USEC_SINCE_EPOCH(txn->commit_time);
|
||||||
|
#endif
|
||||||
rmsg.has_commit_time = true;
|
rmsg.has_commit_time = true;
|
||||||
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
|
rmsg.table = pstrdup(quote_qualified_identifier(get_namespace_name(get_rel_namespace(RelationGetRelid(relation))),
|
||||||
NameStr(class_form->relname)));
|
NameStr(class_form->relname)));
|
||||||
|
@ -543,13 +628,13 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
rmsg.new_tuple =
|
rmsg.new_tuple =
|
||||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
|
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
|
||||||
tuple_to_tuple_msg(rmsg.new_tuple, relation,
|
tuple_to_tuple_msg(rmsg.new_tuple, relation,
|
||||||
&change->data.tp.newtuple->tuple, tupdesc);
|
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
|
||||||
|
|
||||||
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
|
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
|
||||||
rmsg.new_typeinfo =
|
rmsg.new_typeinfo =
|
||||||
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
|
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
|
||||||
add_metadata_to_msg(rmsg.new_typeinfo, relation,
|
add_metadata_to_msg(rmsg.new_typeinfo, relation,
|
||||||
&change->data.tp.newtuple->tuple, tupdesc);
|
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case REORDER_BUFFER_CHANGE_UPDATE:
|
case REORDER_BUFFER_CHANGE_UPDATE:
|
||||||
|
@ -564,7 +649,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
rmsg.old_tuple =
|
rmsg.old_tuple =
|
||||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
|
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
|
||||||
tuple_to_tuple_msg(rmsg.old_tuple, relation,
|
tuple_to_tuple_msg(rmsg.old_tuple, relation,
|
||||||
&change->data.tp.oldtuple->tuple, tupdesc);
|
TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc);
|
||||||
}
|
}
|
||||||
if (change->data.tp.newtuple != NULL) {
|
if (change->data.tp.newtuple != NULL) {
|
||||||
elog(DEBUG1, "decoding new tuple information");
|
elog(DEBUG1, "decoding new tuple information");
|
||||||
|
@ -574,13 +659,13 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
rmsg.new_tuple =
|
rmsg.new_tuple =
|
||||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
|
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_new_tuple);
|
||||||
tuple_to_tuple_msg(rmsg.new_tuple, relation,
|
tuple_to_tuple_msg(rmsg.new_tuple, relation,
|
||||||
&change->data.tp.newtuple->tuple, tupdesc);
|
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
|
||||||
|
|
||||||
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
|
rmsg.n_new_typeinfo = rmsg.n_new_tuple;
|
||||||
rmsg.new_typeinfo =
|
rmsg.new_typeinfo =
|
||||||
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
|
palloc(sizeof(Decoderbufs__TypeInfo*) * rmsg.n_new_typeinfo);
|
||||||
add_metadata_to_msg(rmsg.new_typeinfo, relation,
|
add_metadata_to_msg(rmsg.new_typeinfo, relation,
|
||||||
&change->data.tp.newtuple->tuple, tupdesc);
|
TUPLE_ACCESS(change->data.tp.newtuple), tupdesc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
|
@ -596,7 +681,7 @@ static void pg_decode_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
|
||||||
rmsg.old_tuple =
|
rmsg.old_tuple =
|
||||||
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
|
palloc(sizeof(Decoderbufs__DatumMessage*) * rmsg.n_old_tuple);
|
||||||
tuple_to_tuple_msg(rmsg.old_tuple, relation,
|
tuple_to_tuple_msg(rmsg.old_tuple, relation,
|
||||||
&change->data.tp.oldtuple->tuple, tupdesc);
|
TUPLE_ACCESS(change->data.tp.oldtuple), tupdesc);
|
||||||
} else {
|
} else {
|
||||||
elog(DEBUG1, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid ");
|
elog(DEBUG1, "no information to decode from DELETE because either no PK is present or REPLICA IDENTITY NOTHING or invalid ");
|
||||||
}
|
}
|
||||||
|
|
|
@ -565,20 +565,26 @@ const ProtobufCMessageDescriptor decoderbufs__row_message__descriptor =
|
||||||
(ProtobufCMessageInit) decoderbufs__row_message__init,
|
(ProtobufCMessageInit) decoderbufs__row_message__init,
|
||||||
NULL,NULL,NULL /* reserved[123] */
|
NULL,NULL,NULL /* reserved[123] */
|
||||||
};
|
};
|
||||||
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[3] =
|
static const ProtobufCEnumValue decoderbufs__op__enum_values_by_number[6] =
|
||||||
{
|
{
|
||||||
|
{ "UNKNOWN", "DECODERBUFS__OP__UNKNOWN", -1 },
|
||||||
{ "INSERT", "DECODERBUFS__OP__INSERT", 0 },
|
{ "INSERT", "DECODERBUFS__OP__INSERT", 0 },
|
||||||
{ "UPDATE", "DECODERBUFS__OP__UPDATE", 1 },
|
{ "UPDATE", "DECODERBUFS__OP__UPDATE", 1 },
|
||||||
{ "DELETE", "DECODERBUFS__OP__DELETE", 2 },
|
{ "DELETE", "DECODERBUFS__OP__DELETE", 2 },
|
||||||
|
{ "BEGIN", "DECODERBUFS__OP__BEGIN", 3 },
|
||||||
|
{ "COMMIT", "DECODERBUFS__OP__COMMIT", 4 },
|
||||||
};
|
};
|
||||||
static const ProtobufCIntRange decoderbufs__op__value_ranges[] = {
|
static const ProtobufCIntRange decoderbufs__op__value_ranges[] = {
|
||||||
{0, 0},{0, 3}
|
{-1, 0},{0, 6}
|
||||||
};
|
};
|
||||||
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[3] =
|
static const ProtobufCEnumValueIndex decoderbufs__op__enum_values_by_name[6] =
|
||||||
{
|
{
|
||||||
{ "DELETE", 2 },
|
{ "BEGIN", 4 },
|
||||||
{ "INSERT", 0 },
|
{ "COMMIT", 5 },
|
||||||
{ "UPDATE", 1 },
|
{ "DELETE", 3 },
|
||||||
|
{ "INSERT", 1 },
|
||||||
|
{ "UNKNOWN", 0 },
|
||||||
|
{ "UPDATE", 2 },
|
||||||
};
|
};
|
||||||
const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
|
const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
|
||||||
{
|
{
|
||||||
|
@ -587,9 +593,9 @@ const ProtobufCEnumDescriptor decoderbufs__op__descriptor =
|
||||||
"Op",
|
"Op",
|
||||||
"Decoderbufs__Op",
|
"Decoderbufs__Op",
|
||||||
"decoderbufs",
|
"decoderbufs",
|
||||||
3,
|
6,
|
||||||
decoderbufs__op__enum_values_by_number,
|
decoderbufs__op__enum_values_by_number,
|
||||||
3,
|
6,
|
||||||
decoderbufs__op__enum_values_by_name,
|
decoderbufs__op__enum_values_by_name,
|
||||||
1,
|
1,
|
||||||
decoderbufs__op__value_ranges,
|
decoderbufs__op__value_ranges,
|
||||||
|
|
|
@ -24,9 +24,12 @@ typedef struct _Decoderbufs__RowMessage Decoderbufs__RowMessage;
|
||||||
/* --- enums --- */
|
/* --- enums --- */
|
||||||
|
|
||||||
typedef enum _Decoderbufs__Op {
|
typedef enum _Decoderbufs__Op {
|
||||||
|
DECODERBUFS__OP__UNKNOWN = -1,
|
||||||
DECODERBUFS__OP__INSERT = 0,
|
DECODERBUFS__OP__INSERT = 0,
|
||||||
DECODERBUFS__OP__UPDATE = 1,
|
DECODERBUFS__OP__UPDATE = 1,
|
||||||
DECODERBUFS__OP__DELETE = 2
|
DECODERBUFS__OP__DELETE = 2,
|
||||||
|
DECODERBUFS__OP__BEGIN = 3,
|
||||||
|
DECODERBUFS__OP__COMMIT = 4
|
||||||
PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(DECODERBUFS__OP)
|
PROTOBUF_C__FORCE_ENUM_TO_BE_INT_SIZE(DECODERBUFS__OP)
|
||||||
} Decoderbufs__Op;
|
} Decoderbufs__Op;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue