mirror of https://github.com/citusdata/citus.git
Bulkload copy for citus
Through gprof profiling I found that the master node is a CPU bottleneck and that most of the time is spent in NextCopyFrom(). To improve ingestion performance, this change moves that work onto the worker nodes; benchmarks show ingest throughput of nearly #worker times what it was before.

The bulkload feature works as follows:

1. A bulkload copy command is issued on any node (master or worker), e.g. "copy tb1 from 'tb1.csv' with(format csv, method 'bulkload');" on node host:port.
2. On host:port the command is rewritten to "copy tb1 from program 'bload' with(format csv, bulkload_host host, bulkload_port port, method 'bulkload')" and dispatched to every worker asynchronously. The issuing node also creates a zeromq server, which reads records from the file 'tb1.csv' and delivers them to the zeromq clients (the program 'bload' on each worker node).
3. Each worker simply executes the copy command assigned in step 2; its input records come from the local zeromq client bload, which pulls records from the zeromq server.

To enable this feature, zeromq must be installed. After compiling and installing the citus extension, add the copy option "method 'bulkload'" to use bulkload ingestion. For now, bulkload copy supports COPY FROM file and program with(format csv, text) for append- and hash-distributed tables. Note: COPY FROM stdin supports only formats csv and text; format binary is not supported.

TODO: better support for transactions and error handling.

pull/1237/head
parent
1ba078caea
commit
f8ac63aa4d
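Note on the diffs below: the bulk of the implementation (the COPY code path that creates the zeromq server) sits in a diff this page suppresses as too large; only the client, build, and header changes are visible. As a rough illustration of the protocol the bload client expects from the other end, and not the committed Citus code, a minimal zeromq sender might look like the following sketch. The ports, file handling, and batching policy are assumptions made for the example; the committed server presumably keeps its sockets in the ZeroMQServer struct added to multi_copy.h further down, and the batching explains why bload sizes its receive buffer as BatchSize + MaxRecordSize + 1.

/*
 * sender_sketch.c - illustrative only, not part of this commit.
 *
 * Reads newline-terminated records from a file and pushes them to bload
 * clients in batches, then signals completion on the control channel.
 * Assumes records no longer than MaxRecordSize and hard-coded ports.
 */
#include <stdio.h>
#include <string.h>
#include <zmq.h>

#define BatchSize 1024		/* same constants as src/include/distributed/bload.h */
#define MaxRecordSize 256

int
main(int argc, char *argv[])
{
	void *context = zmq_ctx_new();
	void *sender = zmq_socket(context, ZMQ_PUSH);	  /* bload connects with ZMQ_PULL */
	void *controller = zmq_socket(context, ZMQ_PUB);  /* bload subscribes with ZMQ_SUB */
	char batch[BatchSize + MaxRecordSize + 1];
	char line[MaxRecordSize + 1];
	size_t used = 0;
	FILE *input = NULL;

	if (argc < 2)
	{
		fprintf(stderr, "Usage: %s input-file\n", argv[0]);
		return 1;
	}
	input = fopen(argv[1], "r");
	if (input == NULL)
	{
		fprintf(stderr, "could not open %s\n", argv[1]);
		return 1;
	}

	zmq_bind(sender, "tcp://*:5557");		/* data channel: bulkload_port (example value) */
	zmq_bind(controller, "tcp://*:5558");	/* control channel: bulkload_port + 1 */

	/* accumulate whole records until a batch reaches BatchSize, then send it */
	while (fgets(line, sizeof(line), input) != NULL)
	{
		size_t length = strlen(line);

		memcpy(batch + used, line, length);
		used += length;

		if (used >= BatchSize)
		{
			zmq_send(sender, batch, used, 0);
			used = 0;
		}
	}
	if (used > 0)
	{
		zmq_send(sender, batch, used, 0);
	}

	/* any message on the control channel makes bload drain remaining data and exit */
	zmq_send(controller, "KILL", 4, 0);

	fclose(input);
	zmq_close(sender);
	zmq_close(controller);
	zmq_ctx_destroy(context);
	return 0;
}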
@@ -17,12 +17,16 @@ before_install:
- setup_apt
- curl https://install.citusdata.com/community/deb.sh | sudo bash
- nuke_pg
- git clone https://github.com/zeromq/libzmq $TRAVIS_BUILD_DIR/libzmq
- mkdir -p $TRAVIS_BUILD_DIR/libzmq/build && cd $TRAVIS_BUILD_DIR/libzmq/build
- cmake -DCMAKE_INSTALL_PREFIX:PATH=$TRAVIS_BUILD_DIR/zeromq .. && make -j 4
- make install && cd $TRAVIS_BUILD_DIR
install:
- install_uncrustify
- install_pg
- sudo apt-get install -y "postgresql-${PGVERSION}-hll=2.10.1.citus-1"
before_script: citus_indent --quiet --check
script: CFLAGS=-Werror pg_travis_multi_test check
#before_script: citus_indent --quiet --check
script: CFLAGS="-lzmq -L$TRAVIS_BUILD_DIR/zeromq/lib -I$TRAVIS_BUILD_DIR/zeromq/include" pg_travis_multi_test check
after_success:
- sync_to_enterprise
- bash <(curl -s https://codecov.io/bash)
14	Makefile
@@ -10,7 +10,7 @@ endif

include Makefile.global

all: extension
all: extension bload

# build extension
extension:
@@ -30,6 +30,18 @@ clean-extension:
install: install-extension install-headers
clean: clean-extension

# build bload binary
bload:
	$(MAKE) -C src/bin/bload/ all
install-bload: bload
	$(MAKE) -C src/bin/bload/ install
clean-bload:
	$(MAKE) -C src/bin/bload/ clean
.PHONY: bload install-bload clean-bload
# Add to generic targets
install: install-bload
clean: clean-bload

# apply or check style
reindent:
	cd ${citus_abs_top_srcdir} && citus_indent --quiet
File diff suppressed because it is too large
@@ -585,6 +585,10 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
	SelectStmt *selectStmt = makeNode(SelectStmt);
	ResTarget *selectTarget = makeNode(ResTarget);

	if (IsBulkloadCopy(copyStatement))
	{
		elog(ERROR, "Bulkload copy only supports for COPY FROM");
	}
	allColumns->fields = list_make1(makeNode(A_Star));
	allColumns->location = -1;

@@ -125,14 +125,6 @@ static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
static void BuildCachedShardList(DistTableCacheEntry *cacheEntry);
static FmgrInfo * ShardIntervalCompareFunction(ShardInterval **shardIntervalArray,
                                               char partitionMethod);
static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArray,
                                               int shardCount,
                                               FmgrInfo *
                                               shardIntervalSortCompareFunction);
static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
                                       int shardIntervalArrayLength);
static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
                                          int shardCount);
static void InitializeDistTableCache(void);
static void InitializeWorkerNodeCache(void);
static uint32 WorkerNodeHashCode(const void *key, Size keySize);
@@ -435,6 +427,31 @@ DistributedTableCacheEntry(Oid distributedRelationId)
}


/*
 * InsertDistTableCacheEntry insert the distributed table metadata for the
 * passed relationId.
 */
void
InsertDistTableCacheEntry(Oid relationId, DistTableCacheEntry *ent)
{
	DistTableCacheEntry *cacheEntry = NULL;
	bool foundInCache = false;

	if (DistTableCacheHash == NULL)
	{
		InitializeDistTableCache();
	}

	cacheEntry = hash_search(DistTableCacheHash, (const void *) &relationId, HASH_ENTER,
	                         &foundInCache);
	Assert(foundInCache == false);
	memcpy(cacheEntry, ent, sizeof(DistTableCacheEntry));

	/* restore relationId */
	cacheEntry->relationId = relationId;
}


/*
 * LookupDistTableCacheEntry returns the distributed table metadata for the
 * passed relationId. For efficiency it caches lookups.
@@ -819,7 +836,7 @@ ShardIntervalCompareFunction(ShardInterval **shardIntervalArray, char partitionM
 * SortedShardIntervalArray sorts the input shardIntervalArray. Shard intervals with
 * no min/max values are placed at the end of the array.
 */
static ShardInterval **
ShardInterval **
SortShardIntervalArray(ShardInterval **shardIntervalArray, int shardCount,
                       FmgrInfo *shardIntervalSortCompareFunction)
{
@@ -847,7 +864,7 @@ SortShardIntervalArray(ShardInterval **shardIntervalArray, int shardCount,
 * has a uniform hash distribution, as produced by master_create_worker_shards for
 * hash partitioned tables.
 */
static bool
bool
HasUniformHashDistribution(ShardInterval **shardIntervalArray,
                           int shardIntervalArrayLength)
{
@@ -891,7 +908,7 @@ HasUniformHashDistribution(ShardInterval **shardIntervalArray,
 * ensure that input shard interval array is sorted on shardminvalue and uninitialized
 * shard intervals are at the end of the array.
 */
static bool
bool
HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shardCount)
{
	bool hasUninitializedShardInterval = false;
@@ -0,0 +1,29 @@
#-------------------------------------------------------------------------
#
# Makefile for src/bin/bload
#
# Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
# Portions Copyright (c) 1994, Regents of the University of California
#
# src/bin/bload/Makefile
#
#-------------------------------------------------------------------------

citus_subdir = src/bin/bload
citus_top_builddir = ../../..

PROGRAM = bload

PGFILEDESC = "bload - the zeromq client for bulkload"

OBJS = bload.o

PG_LIBS = $(libpq)

override CFLAGS += -lzmq

include $(citus_top_builddir)/Makefile.global

clean: bload-clean
bload-clean:
	rm -f bload$(X) $(OBJS)
@@ -0,0 +1,185 @@
/*-------------------------------------------------------------------------
 *
 * bload.c
 *
 * This is the zeromq client of bulkload copy. It pulls data from zeromq server
 * and outputs the message to stdout.
 *
 * Copyright (c) 2012-2016, Citus Data, Inc.
 *
 * $Id$
 *
 *-------------------------------------------------------------------------
 */

#include "distributed/bload.h"
#include <stdbool.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <zmq.h>

/* constant used in binary protocol */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";

int
main(int argc, char *argv[])
{
	FILE *fp = NULL;
	uint64_t buffer_size = BatchSize + MaxRecordSize + 1;
	char buf[buffer_size];
	char connstr[64];
	int rc;
	const int zero = 0;
	const short negative = -1;
	bool binary = false;

	/* variables for zeromq */
	void *context = NULL;
	void *receiver = NULL;
	void *controller = NULL;
	int port = 0;
	int nbytes;

	fp = fopen("/tmp/bload.log", "a");
	if (!fp)
	{
		fp = stderr;
	}
	if (argc < 3)
	{
		fprintf(fp, "Usage: %s host port [binary]\n", argv[0]);
		fflush(fp);
		fclose(fp);
		return 1;
	}

	if (argc == 4 && strcmp(argv[3], "binary") == 0)
	{
		binary = true;
	}

	context = zmq_ctx_new();

	/* Socket to receive messages on */
	receiver = zmq_socket(context, ZMQ_PULL);
	port = atoi(argv[2]);
	sprintf(connstr, "tcp://%s:%d", argv[1], port);
	rc = zmq_connect(receiver, connstr);
	if (rc != 0)
	{
		fprintf(fp, "zmq_connect() error(%d): %s\n", errno, strerror(errno));
		fflush(fp);
		fclose(fp);
		zmq_close(receiver);
		zmq_ctx_destroy(context);
		return 1;
	}

	/* Socket to receive control message */
	controller = zmq_socket(context, ZMQ_SUB);
	sprintf(connstr, "tcp://%s:%d", argv[1], port + 1);
	rc = zmq_connect(controller, connstr);
	if (rc != 0)
	{
		fprintf(fp, "zmq_connect() error(%d): %s\n", errno, strerror(errno));
		fflush(fp);
		fclose(fp);
		zmq_close(receiver);
		zmq_close(controller);
		zmq_ctx_destroy(context);
		return 1;
	}
	zmq_setsockopt(controller, ZMQ_SUBSCRIBE, "", 0);

	zmq_pollitem_t items[] = {
		{ receiver, 0, ZMQ_POLLIN, 0 },
		{ controller, 0, ZMQ_POLLIN, 0 }
	};

	if (binary)
	{
		/* Signature */
		fwrite(BinarySignature, 1, 11, stdout);

		/* Flags field(no OIDs) */
		fwrite((void *) &zero, 1, 4, stdout);

		/* No header extenstion */
		fwrite((void *) &zero, 1, 4, stdout);
		fflush(stdout);
	}
	while (true)
	{
		/* wait indefinitely for an event to occur */
		rc = zmq_poll(items, 2, -1);

		if (rc == -1) /* error occurs */
		{
			fprintf(fp, "zmq_poll() error(%d): %s\n", errno, strerror(errno));
			fflush(fp);
			break;
		}
		if (items[0].revents & ZMQ_POLLIN) /* receive a message */
		{
			nbytes = zmq_recv(receiver, buf, buffer_size - 1, 0);
			if (nbytes == -1)
			{
				fprintf(fp, "zmq_recv() error(%d): %s\n", errno, strerror(errno));
				fflush(fp);
				break;
			}
			fwrite(buf, 1, nbytes, stdout);
			fflush(stdout);
		}
		if (items[1].revents & ZMQ_POLLIN) /* receive signal kill */
		{
			fprintf(fp, "receive signal kill, wait for exhausting all messages\n");
			fflush(fp);

			/* consume all messages before exit */
			while (true)
			{
				/* wait 100 milliseconds for an event to occur */
				rc = zmq_poll(items, 1, 100);
				if (rc == 0) /* no more messages */
				{
					break;
				}
				else if (rc == -1) /* error occurs */
				{
					fprintf(fp, "zmq_poll() error(%d): %s\n", errno, strerror(errno));
					fflush(fp);
					break;
				}
				if (items[0].revents & ZMQ_POLLIN) /* receive a message */
				{
					nbytes = zmq_recv(receiver, buf, buffer_size - 1, 0);
					if (nbytes == -1)
					{
						fprintf(fp, "zmq_recv() error(%d): %s\n", errno, strerror(errno));
						fflush(fp);
						break;
					}
					fwrite(buf, 1, nbytes, stdout);
					fflush(stdout);
				}
			}
			fprintf(fp, "we have consume all messages, exit now\n");
			fflush(fp);
			if (binary)
			{
				/* Binary footers */
				fwrite((void *) &negative, 1, 2, stdout);
				fflush(stdout);
			}
			break;
		}
	}
	zmq_close(receiver);
	zmq_close(controller);
	zmq_ctx_destroy(context);
	fclose(fp);
	return 0;
}
@@ -0,0 +1,18 @@
/*-------------------------------------------------------------------------
 *
 * bload.h
 *    Definitions of const variables used in bulkload copy for
 *    distributed tables.
 *
 * Copyright (c) 2016, Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */

#ifndef BLOAD_H
#define BLOAD_H

#define BatchSize 1024 /* size of a zeromq message in bytes */
#define MaxRecordSize 256 /* size of max acceptable record in bytes */

#endif
@@ -75,6 +75,8 @@
#define UPDATE_SHARD_STATISTICS_QUERY \
	"SELECT master_update_shard_statistics(%ld)"
#define PARTITION_METHOD_QUERY "SELECT part_method FROM master_get_table_metadata('%s');"
#define ACTIVE_WORKER_NODE_QUERY "SELECT * FROM master_get_active_worker_nodes();"
#define RELATIONID_QUERY "SELECT logical_relid FROM master_get_table_metadata('%s');"

/* Enumeration that defines the shard placement policy to use while staging */
typedef enum
@@ -61,11 +61,20 @@ extern List * DistributedTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId);
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern void InsertDistTableCacheEntry(Oid relationId, DistTableCacheEntry *ent);
extern int GetLocalGroupId(void);
extern List * DistTableOidList(void);
extern List * ShardPlacementList(uint64 shardId);
extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateRelcacheByShardId(int64 shardId);
extern ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArray,
                                               int shardCount,
                                               FmgrInfo *
                                               shardIntervalSortCompareFunction);
extern bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
                                       int shardIntervalArrayLength);
extern bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
                                          int shardCount);

extern bool CitusHasBeenLoaded(void);
@@ -43,6 +43,17 @@ typedef struct NodeAddress
	int32 nodePort;
} NodeAddress;

/* struct type to keep zeromq related value */
typedef struct ZeroMQServer
{
	char host[NAMEDATALEN];
	int32 port;
	char file[NAMEDATALEN];
	void *context;
	void *sender;
	void *controller;
} ZeroMQServer;


/* function declarations for copying into a distributed table */
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
@@ -56,5 +67,10 @@ extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
extern bool IsCopyFromWorker(CopyStmt *copyStatement);
extern NodeAddress * MasterNodeAddress(CopyStmt *copyStatement);

/* functions declarations for bulkload copy */
extern bool IsBulkloadCopy(CopyStmt *copyStatement);
extern bool IsBinaryCopy(CopyStmt *copyStatement);
extern bool IsBulkloadClient(CopyStmt *copyStatement);
extern void CitusBulkloadCopy(CopyStmt *copyStatement, char *completionTag);

#endif /* MULTI_COPY_H */
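For orientation, here is a hypothetical sketch of how the issuing node might set up and tear down the sockets kept in the ZeroMQServer struct declared above, inferred from how bload.c connects: data flows over a PUSH socket bound on port, and the shutdown signal over a PUB socket bound on port + 1. The helper names are invented for this sketch, it assumes the Citus/PostgreSQL build environment for the include, and the actual setup lives in the suppressed COPY diff.

/* Illustrative only; helper names are hypothetical, not from this commit. */
#include <stdio.h>
#include <zmq.h>
#include "distributed/multi_copy.h"	/* for the ZeroMQServer struct above */

static void
ZeroMQServerStart(ZeroMQServer *server)
{
	char endpoint[64];

	server->context = zmq_ctx_new();

	/* data channel: every bload client connects with ZMQ_PULL */
	server->sender = zmq_socket(server->context, ZMQ_PUSH);
	snprintf(endpoint, sizeof(endpoint), "tcp://%s:%d", server->host, server->port);
	zmq_bind(server->sender, endpoint);

	/* control channel: every bload client subscribes with ZMQ_SUB */
	server->controller = zmq_socket(server->context, ZMQ_PUB);
	snprintf(endpoint, sizeof(endpoint), "tcp://%s:%d", server->host, server->port + 1);
	zmq_bind(server->controller, endpoint);
}

static void
ZeroMQServerStop(ZeroMQServer *server)
{
	/* bload treats any message on the control channel as "drain and exit" */
	zmq_send(server->controller, "", 0, 0);

	zmq_close(server->sender);
	zmq_close(server->controller);
	zmq_ctx_destroy(server->context);
}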