Bulkload copy for Citus

Through gprof performance analysis, I found that the master node is a
CPU bottleneck and that the function NextCopyFrom() takes most of the
time. To improve ingestion performance, I move this time-consuming work
to each worker node; benchmarks show that it actually works, giving
nearly #workers times the previous ingestion speed.

This bulkload feature works as follows (a minimal sketch of the zeromq
server side appears after these steps):
1. We issue a bulkload copy command on any node (master or worker), such
   as "copy tb1 from 'tb1.csv' with(format csv, method 'bulkload');" on
   node host:port.
2. The copy command is rebuilt as "copy tb1 from program 'bload'
   with(format csv, bulkload_host host, bulkload_port port, method
   'bulkload')" on host:port, and this rebuilt copy command is assigned
   to each worker asynchronously. In addition, we create a zeromq server
   on host:port that reads records from the file 'tb1.csv' and delivers
   them to the zeromq clients (the program 'bload' on each worker node).
3. Each worker node simply executes the copy command assigned in step 2;
   the records for that copy come from the zeromq client bload, which
   pulls them from the zeromq server.
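
For reference, here is a minimal sketch of the master-side zeromq server
that step 2 describes. This is not the code from this commit: the batching
policy and all names are assumptions inferred from bload.c and bload.h
below (the client PULLs data on port and SUBscribes to a kill signal on
port + 1).

/*
 * Sketch only: a PUSH/PUB server mirroring the PULL/SUB pair in bload.c.
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <zmq.h>

#define BatchSize 1024		/* mirrors src/include/distributed/bload.h */
#define MaxRecordSize 256

int
main(int argc, char *argv[])
{
	void *context = zmq_ctx_new();
	void *sender = zmq_socket(context, ZMQ_PUSH);		/* workers PULL from port */
	void *controller = zmq_socket(context, ZMQ_PUB);	/* workers SUB on port + 1 */
	char addr[64];
	char batch[BatchSize + MaxRecordSize];
	char line[MaxRecordSize + 1];
	size_t used = 0;
	int port;
	FILE *file;

	if (argc != 3)
	{
		fprintf(stderr, "Usage: %s file port\n", argv[0]);
		return 1;
	}
	file = fopen(argv[1], "r");
	if (file == NULL)
	{
		perror("fopen");
		return 1;
	}
	port = atoi(argv[2]);
	snprintf(addr, sizeof(addr), "tcp://*:%d", port);
	zmq_bind(sender, addr);
	snprintf(addr, sizeof(addr), "tcp://*:%d", port + 1);
	zmq_bind(controller, addr);

	/*
	 * Batch whole records until the batch reaches BatchSize bytes; a batch
	 * can therefore grow to BatchSize + MaxRecordSize bytes, which is why
	 * bload.c sizes its receive buffer that way. Records longer than
	 * MaxRecordSize are not handled in this sketch.
	 */
	while (fgets(line, sizeof(line), file) != NULL)
	{
		size_t len = strlen(line);

		memcpy(batch + used, line, len);
		used += len;
		if (used >= BatchSize)
		{
			zmq_send(sender, batch, used, 0);
			used = 0;
		}
	}
	if (used > 0)
	{
		zmq_send(sender, batch, used, 0);
	}

	/*
	 * Broadcast the kill signal; clients drain pending messages and exit.
	 * (A real implementation would synchronize with the workers first, since
	 * a late-connecting subscriber would miss this message.)
	 */
	zmq_send(controller, "KILL", 4, 0);

	fclose(file);
	zmq_close(sender);
	zmq_close(controller);
	zmq_ctx_destroy(context);
	return 0;
}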

To enable this feature, you must have zeromq installed. After compiling
and installing the Citus extension, just add the copy option
"method 'bulkload'" to use bulkload ingestion.

For now, bulkload copy supports COPY FROM a file or a program, with
format csv or text, for append- and hash-distributed tables.
Note: COPY FROM STDIN likewise supports only the csv and text formats;
the binary format is not supported.

TODO: better support for transactions and error handling.
Yuanhao Luo  2017-02-16 19:28:45 +08:00
commit 4923a85aba (parent 1ba078caea)
10 changed files with 1889 additions and 23 deletions

@ -10,7 +10,7 @@ endif
include Makefile.global
all: extension
all: extension bload
# build extension
extension:
@ -30,6 +30,18 @@ clean-extension:
install: install-extension install-headers
clean: clean-extension

# build bload binary
bload:
	$(MAKE) -C src/bin/bload/ all

install-bload: bload
	$(MAKE) -C src/bin/bload/ install

clean-bload:
	$(MAKE) -C src/bin/bload/ clean

.PHONY: bload install-bload clean-bload

# Add to generic targets
install: install-bload
clean: clean-bload

# apply or check style
reindent:
	cd ${citus_abs_top_srcdir} && citus_indent --quiet

File diff suppressed because it is too large.

@ -585,6 +585,10 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
SelectStmt *selectStmt = makeNode(SelectStmt);
ResTarget *selectTarget = makeNode(ResTarget);
if (IsBulkloadCopy(copyStatement))
{
elog(ERROR, "bulkload copy is only supported for COPY FROM");
}
allColumns->fields = list_make1(makeNode(A_Star));
allColumns->location = -1;

@ -125,14 +125,6 @@ static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
static void BuildCachedShardList(DistTableCacheEntry *cacheEntry);
static FmgrInfo * ShardIntervalCompareFunction(ShardInterval **shardIntervalArray,
char partitionMethod);
static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArray,
int shardCount,
FmgrInfo *
shardIntervalSortCompareFunction);
static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength);
static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
int shardCount);
static void InitializeDistTableCache(void);
static void InitializeWorkerNodeCache(void);
static uint32 WorkerNodeHashCode(const void *key, Size keySize);
@ -434,6 +426,28 @@ DistributedTableCacheEntry(Oid distributedRelationId)
}
}
/*
 * InsertDistTableCacheEntry inserts the distributed table metadata for the
 * passed relationId into the distributed table cache.
 */
void
InsertDistTableCacheEntry(Oid relationId, DistTableCacheEntry *ent)
{
DistTableCacheEntry *cacheEntry = NULL;
bool foundInCache = false;
if (DistTableCacheHash == NULL)
{
InitializeDistTableCache();
}
cacheEntry = hash_search(DistTableCacheHash, (const void *)&relationId, HASH_ENTER,
&foundInCache);
Assert(foundInCache == false);
memcpy(cacheEntry, ent, sizeof(DistTableCacheEntry));
/* restore relationId */
cacheEntry->relationId = relationId;
}
/*
* LookupDistTableCacheEntry returns the distributed table metadata for the
@ -819,7 +833,7 @@ ShardIntervalCompareFunction(ShardInterval **shardIntervalArray, char partitionM
* SortShardIntervalArray sorts the input shardIntervalArray. Shard intervals with
* no min/max values are placed at the end of the array.
*/
static ShardInterval **
ShardInterval **
SortShardIntervalArray(ShardInterval **shardIntervalArray, int shardCount,
FmgrInfo *shardIntervalSortCompareFunction)
{
@ -847,7 +861,7 @@ SortShardIntervalArray(ShardInterval **shardIntervalArray, int shardCount,
* has a uniform hash distribution, as produced by master_create_worker_shards for
* hash partitioned tables.
*/
static bool
bool
HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength)
{
@ -891,7 +905,7 @@ HasUniformHashDistribution(ShardInterval **shardIntervalArray,
* ensure that input shard interval array is sorted on shardminvalue and uninitialized
* shard intervals are at the end of the array.
*/
static bool
bool
HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shardCount)
{
bool hasUninitializedShardInterval = false;

src/bin/bload/Makefile (new file)

@ -0,0 +1,29 @@
#-------------------------------------------------------------------------
#
# Makefile for src/bin/bload
#
# Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
# Portions Copyright (c) 1994, Regents of the University of California
#
# src/bin/bload/Makefile
#
#-------------------------------------------------------------------------
citus_subdir = src/bin/bload
citus_top_builddir = ../../..

PROGRAM = bload
PGFILEDESC = "bload - the zeromq client for bulkload"
OBJS = bload.o
# -lzmq is a linker flag, so it belongs on the link line rather than in CFLAGS
PG_LIBS = $(libpq) -lzmq

include $(citus_top_builddir)/Makefile.global

clean: bload-clean
bload-clean:
	rm -f bload$(X) $(OBJS)

src/bin/bload/bload.c (new file)

@ -0,0 +1,176 @@
/*-------------------------------------------------------------------------
*
* bload.c
*
* This is the zeromq client of bulkload copy. It pulls data from zeromq server
* and outputs the message to stdout.
*
* Copyright (c) 2012-2016, Citus Data, Inc.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#include "distributed/bload.h"
#include <stdbool.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <zmq.h>
/* constant used in binary protocol */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
int main(int argc, char *argv[])
{
FILE *fp = NULL;
uint64_t buffer_size = BatchSize + MaxRecordSize + 1;
char buf[buffer_size];
char connstr[64];
int rc;
const int zero = 0;
const short negative = -1;
bool binary = false;
/* variables for zeromq */
void *context = NULL;
void *receiver = NULL;
void *controller = NULL;
int port = 0;
int nbytes;
fp = fopen("/tmp/bload.log", "a");
if (!fp) fp = stderr;
if (argc < 3)
{
fprintf(fp, "Usage: %s host port [binary]\n", argv[0]);
fflush(fp);
fclose(fp);
return 1;
}
if (argc == 4 && strcmp(argv[3], "binary") == 0)
{
binary = true;
}
context = zmq_ctx_new();
/* socket to receive messages on */
receiver = zmq_socket(context, ZMQ_PULL);
port = atoi(argv[2]);
snprintf(connstr, sizeof(connstr), "tcp://%s:%d", argv[1], port);
rc = zmq_connect(receiver, connstr);
if (rc != 0)
{
fprintf(fp, "zmq_connect() error(%d): %s\n", errno, strerror(errno));
fflush(fp);
fclose(fp);
zmq_close(receiver);
zmq_ctx_destroy(context);
return 1;
}
/* socket to receive control messages */
controller = zmq_socket(context, ZMQ_SUB);
snprintf(connstr, sizeof(connstr), "tcp://%s:%d", argv[1], port + 1);
rc = zmq_connect(controller, connstr);
if (rc != 0)
{
fprintf(fp, "zmq_connect() error(%d): %s\n", errno, strerror(errno));
fflush(fp);
fclose(fp);
zmq_close(receiver);
zmq_close(controller);
zmq_ctx_destroy(context);
return 1;
}
zmq_setsockopt(controller, ZMQ_SUBSCRIBE, "", 0);
zmq_pollitem_t items[] = {
{ receiver, 0, ZMQ_POLLIN, 0 },
{ controller, 0, ZMQ_POLLIN, 0 }
};
if (binary)
{
/* signature */
fwrite(BinarySignature, 1, 11, stdout);
/* flags field (no OIDs) */
fwrite((void *) &zero, 1, 4, stdout);
/* no header extension */
fwrite((void *) &zero, 1, 4, stdout);
fflush(stdout);
}
while (true)
{
/* wait indefinitely for an event to occur */
rc = zmq_poll(items, 2, -1);
if (rc == -1) /* error occurs */
{
fprintf(fp, "zmq_poll() error(%d): %s\n", errno, strerror(errno));
fflush(fp);
break;
}
if (items[0].revents & ZMQ_POLLIN) /* received a message */
{
/* messages are at most BatchSize + MaxRecordSize bytes, so buf is large enough */
nbytes = zmq_recv(receiver, buf, buffer_size - 1, 0);
if (nbytes == -1)
{
fprintf(fp, "zmq_recv() error(%d): %s\n", errno, strerror(errno));
fflush(fp);
break;
}
fwrite(buf, 1, nbytes, stdout);
fflush(stdout);
}
if (items[1].revents & ZMQ_POLLIN) /* received the kill signal */
{
fprintf(fp, "received kill signal, waiting to drain remaining messages\n");
fflush(fp);
/* consume all messages before exit */
while (true)
{
/* wait 100 milliseconds for an event to occur */
rc = zmq_poll(items, 1, 100);
if (rc == 0) /* no more messages */
{
break;
}
else if (rc == -1) /* error occurs */
{
fprintf(fp, "zmq_poll() error(%d): %s\n", errno, strerror(errno));
fflush(fp);
break;
}
if (items[0].revents & ZMQ_POLLIN) /* received a message */
{
nbytes = zmq_recv(receiver, buf, buffer_size - 1, 0);
if (nbytes == -1)
{
fprintf(fp, "zmq_recv() error(%d): %s\n", errno, strerror(errno));
fflush(fp);
break;
}
fwrite(buf, 1, nbytes, stdout);
fflush(stdout);
}
}
fprintf(fp, "we have consume all messages, exit now\n");
fflush(fp);
if (binary)
{
/* binary file trailer: 16-bit integer word count of -1 */
fwrite((void *) &negative, 1, 2, stdout);
fflush(stdout);
}
break;
}
}
zmq_close(receiver);
zmq_close(controller);
zmq_ctx_destroy(context);
fclose(fp);
return 0;
}

@ -0,0 +1,18 @@
/*-------------------------------------------------------------------------
*
* bload.h
* Definitions of const variables used in bulkload copy for
* distributed tables.
*
* Copyright (c) 2016, Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef BLOAD_H
#define BLOAD_H
#define BatchSize 1024 /* target size of a zeromq message in bytes */
#define MaxRecordSize 256 /* max acceptable record size in bytes */

#endif /* BLOAD_H */

@ -75,6 +75,8 @@
#define UPDATE_SHARD_STATISTICS_QUERY \
"SELECT master_update_shard_statistics(%ld)"
#define PARTITION_METHOD_QUERY "SELECT part_method FROM master_get_table_metadata('%s');"
#define ACTIVE_WORKER_NODE_QUERY "SELECT * FROM master_get_active_worker_nodes();"
#define RELATIONID_QUERY "SELECT logical_relid FROM master_get_table_metadata('%s');"
/* Enumeration that defines the shard placement policy to use while staging */
typedef enum
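
The two new query strings follow the pattern of the existing
PARTITION_METHOD_QUERY: format strings that a client fills in and sends over
libpq. Below is a hedged illustration of how RELATIONID_QUERY might be used;
the helper name, connection handling, and error handling are assumptions for
illustration, not code from this commit.

/*
 * Illustrative only: issuing RELATIONID_QUERY over libpq. Note the table
 * name is interpolated verbatim, as the macro's %s implies; real callers
 * should quote it properly.
 */
#include <stdio.h>
#include <stdlib.h>
#include <libpq-fe.h>

#define RELATIONID_QUERY "SELECT logical_relid FROM master_get_table_metadata('%s');"

static long
GetRelationIdSketch(PGconn *connection, const char *tableName)
{
	char query[256];
	long relationId = -1;
	PGresult *result;

	snprintf(query, sizeof(query), RELATIONID_QUERY, tableName);
	result = PQexec(connection, query);
	if (PQresultStatus(result) == PGRES_TUPLES_OK && PQntuples(result) == 1)
	{
		relationId = atol(PQgetvalue(result, 0, 0));
	}
	PQclear(result);

	return relationId;
}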

@ -61,11 +61,20 @@ extern List * DistributedTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId);
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern void InsertDistTableCacheEntry(Oid relationId, DistTableCacheEntry *ent);
extern int GetLocalGroupId(void);
extern List * DistTableOidList(void);
extern List * ShardPlacementList(uint64 shardId);
extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateRelcacheByShardId(int64 shardId);
extern ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArray,
int shardCount,
FmgrInfo *
shardIntervalSortCompareFunction);
extern bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength);
extern bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
int shardCount);
extern bool CitusHasBeenLoaded(void);

@ -43,6 +43,18 @@ typedef struct NodeAddress
int32 nodePort;
} NodeAddress;
/* struct to keep zeromq-related state */
typedef struct ZeroMQServer
{
char host[NAMEDATALEN];
int32 port;
char file[NAMEDATALEN];
void *context;
void *sender;
void *controller;
} ZeroMQServer;
/* function declarations for copying into a distributed table */
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
@ -56,5 +68,10 @@ extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
extern bool IsCopyFromWorker(CopyStmt *copyStatement);
extern NodeAddress * MasterNodeAddress(CopyStmt *copyStatement);
/* function declarations for bulkload copy */
extern bool IsBulkloadCopy(CopyStmt *copyStatement);
extern bool IsBinaryCopy(CopyStmt *copyStatement);
extern bool IsBulkloadClient(CopyStmt *copyStatement);
extern void CitusBulkloadCopy(CopyStmt *copyStatement, char *completionTag);
#endif /* MULTI_COPY_H */
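
The bodies of these functions live in the multi_copy.c diff that was
suppressed above as too large. Based on the option syntax in the description
("method 'bulkload'"), IsBulkloadCopy presumably just scans the COPY
statement's option list. Here is a minimal sketch of such a check; the name
and details are assumptions, not the committed code.

/*
 * Sketch of an option check like IsBulkloadCopy, assuming the "method"
 * option carries the string 'bulkload'; the committed code may differ.
 */
#include "postgres.h"
#include "commands/defrem.h"
#include "nodes/parsenodes.h"
#include "nodes/pg_list.h"

bool
IsBulkloadCopySketch(CopyStmt *copyStatement)
{
	ListCell *optionCell = NULL;

	foreach(optionCell, copyStatement->options)
	{
		DefElem *option = (DefElem *) lfirst(optionCell);

		if (strcmp(option->defname, "method") == 0 &&
			strcmp(defGetString(option), "bulkload") == 0)
		{
			return true;
		}
	}

	return false;
}

IsBinaryCopy and IsBulkloadClient would presumably follow the same pattern,
checking the format option and the bulkload_host/bulkload_port options
respectively.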