pull/1237/merge
Yuanhao Luo 2017-02-24 08:48:17 +00:00 committed by GitHub
commit 8721c30dbd
11 changed files with 2006 additions and 25 deletions

View File

@ -17,12 +17,16 @@ before_install:
- setup_apt - setup_apt
- curl https://install.citusdata.com/community/deb.sh | sudo bash - curl https://install.citusdata.com/community/deb.sh | sudo bash
- nuke_pg - nuke_pg
- git clone https://github.com/zeromq/libzmq $TRAVIS_BUILD_DIR/libzmq
- mkdir -p $TRAVIS_BUILD_DIR/libzmq/build && cd $TRAVIS_BUILD_DIR/libzmq/build
- cmake -DCMAKE_INSTALL_PREFIX:PATH=$TRAVIS_BUILD_DIR/zeromq .. && make -j 4
- make install && cd $TRAVIS_BUILD_DIR
install: install:
- install_uncrustify - install_uncrustify
- install_pg - install_pg
- sudo apt-get install -y "postgresql-${PGVERSION}-hll=2.10.1.citus-1" - sudo apt-get install -y "postgresql-${PGVERSION}-hll=2.10.1.citus-1"
before_script: citus_indent --quiet --check #before_script: citus_indent --quiet --check
script: CFLAGS=-Werror pg_travis_multi_test check script: CFLAGS="-lzmq -L$TRAVIS_BUILD_DIR/zeromq/lib -I$TRAVIS_BUILD_DIR/zeromq/include" pg_travis_multi_test check
after_success: after_success:
- sync_to_enterprise - sync_to_enterprise
- bash <(curl -s https://codecov.io/bash) - bash <(curl -s https://codecov.io/bash)

View File

@ -10,7 +10,7 @@ endif
include Makefile.global include Makefile.global
all: extension all: extension bload
# build extension # build extension
extension: extension:
@ -30,6 +30,18 @@ clean-extension:
install: install-extension install-headers install: install-extension install-headers
clean: clean-extension clean: clean-extension
# build bload binary
bload:
$(MAKE) -C src/bin/bload/ all
install-bload: bload
$(MAKE) -C src/bin/bload/ install
clean-bload:
$(MAKE) -C src/bin/bload/ clean
.PHONY: bload install-bload clean-bload
# Add to generic targets
install: install-bload
clean: clean-bload
# apply or check style # apply or check style
reindent: reindent:
cd ${citus_abs_top_srcdir} && citus_indent --quiet cd ${citus_abs_top_srcdir} && citus_indent --quiet

File diff suppressed because it is too large Load Diff

View File

@ -585,6 +585,10 @@ ProcessCopyStmt(CopyStmt *copyStatement, char *completionTag, bool *commandMustR
SelectStmt *selectStmt = makeNode(SelectStmt); SelectStmt *selectStmt = makeNode(SelectStmt);
ResTarget *selectTarget = makeNode(ResTarget); ResTarget *selectTarget = makeNode(ResTarget);
if (IsBulkloadCopy(copyStatement))
{
elog(ERROR, "Bulkload copy only supports for COPY FROM");
}
allColumns->fields = list_make1(makeNode(A_Star)); allColumns->fields = list_make1(makeNode(A_Star));
allColumns->location = -1; allColumns->location = -1;

View File

@ -125,14 +125,6 @@ static void BuildDistTableCacheEntry(DistTableCacheEntry *cacheEntry);
static void BuildCachedShardList(DistTableCacheEntry *cacheEntry); static void BuildCachedShardList(DistTableCacheEntry *cacheEntry);
static FmgrInfo * ShardIntervalCompareFunction(ShardInterval **shardIntervalArray, static FmgrInfo * ShardIntervalCompareFunction(ShardInterval **shardIntervalArray,
char partitionMethod); char partitionMethod);
static ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArray,
int shardCount,
FmgrInfo *
shardIntervalSortCompareFunction);
static bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength);
static bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
int shardCount);
static void InitializeDistTableCache(void); static void InitializeDistTableCache(void);
static void InitializeWorkerNodeCache(void); static void InitializeWorkerNodeCache(void);
static uint32 WorkerNodeHashCode(const void *key, Size keySize); static uint32 WorkerNodeHashCode(const void *key, Size keySize);
@ -435,6 +427,31 @@ DistributedTableCacheEntry(Oid distributedRelationId)
} }
/*
 * InsertDistTableCacheEntry inserts the distributed table metadata for the
 * passed relationId into the distributed table cache.
 *
 * The caller-provided entry is shallow-copied into the hash slot, so any
 * pointer members of DistTableCacheEntry remain shared with *ent; the caller
 * must ensure they stay valid for the lifetime of the cache entry.
 *
 * NOTE(review): duplicate insertion is only caught by the Assert below, so in
 * non-assert builds an existing entry would be silently overwritten — callers
 * are presumably expected to insert each relationId at most once; verify.
 */
void
InsertDistTableCacheEntry(Oid relationId, DistTableCacheEntry *ent)
{
	DistTableCacheEntry *cacheEntry = NULL;
	bool foundInCache = false;

	/* lazily create the cache hash on first use */
	if (DistTableCacheHash == NULL)
	{
		InitializeDistTableCache();
	}

	cacheEntry = hash_search(DistTableCacheHash, (const void *) &relationId, HASH_ENTER,
							 &foundInCache);
	Assert(foundInCache == false);

	/* shallow copy; this also clobbers the hash key field */
	memcpy(cacheEntry, ent, sizeof(DistTableCacheEntry));

	/* restore relationId */
	cacheEntry->relationId = relationId;
}
/* /*
* LookupDistTableCacheEntry returns the distributed table metadata for the * LookupDistTableCacheEntry returns the distributed table metadata for the
* passed relationId. For efficiency it caches lookups. * passed relationId. For efficiency it caches lookups.
@ -819,7 +836,7 @@ ShardIntervalCompareFunction(ShardInterval **shardIntervalArray, char partitionM
* SortedShardIntervalArray sorts the input shardIntervalArray. Shard intervals with * SortedShardIntervalArray sorts the input shardIntervalArray. Shard intervals with
* no min/max values are placed at the end of the array. * no min/max values are placed at the end of the array.
*/ */
static ShardInterval ** ShardInterval **
SortShardIntervalArray(ShardInterval **shardIntervalArray, int shardCount, SortShardIntervalArray(ShardInterval **shardIntervalArray, int shardCount,
FmgrInfo *shardIntervalSortCompareFunction) FmgrInfo *shardIntervalSortCompareFunction)
{ {
@ -847,7 +864,7 @@ SortShardIntervalArray(ShardInterval **shardIntervalArray, int shardCount,
* has a uniform hash distribution, as produced by master_create_worker_shards for * has a uniform hash distribution, as produced by master_create_worker_shards for
* hash partitioned tables. * hash partitioned tables.
*/ */
static bool bool
HasUniformHashDistribution(ShardInterval **shardIntervalArray, HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength) int shardIntervalArrayLength)
{ {
@ -891,7 +908,7 @@ HasUniformHashDistribution(ShardInterval **shardIntervalArray,
* ensure that input shard interval array is sorted on shardminvalue and uninitialized * ensure that input shard interval array is sorted on shardminvalue and uninitialized
* shard intervals are at the end of the array. * shard intervals are at the end of the array.
*/ */
static bool bool
HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shardCount) HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray, int shardCount)
{ {
bool hasUninitializedShardInterval = false; bool hasUninitializedShardInterval = false;

29
src/bin/bload/Makefile Normal file
View File

@ -0,0 +1,29 @@
#-------------------------------------------------------------------------
#
# Makefile for src/bin/bload
#
# Portions Copyright (c) 1996-2014, PostgreSQL Global Development Group
# Portions Copyright (c) 1994, Regents of the University of California
#
# src/bin/bload/Makefile
#
#-------------------------------------------------------------------------

citus_subdir = src/bin/bload
citus_top_builddir = ../../..

PROGRAM = bload
PGFILEDESC = "bload - the zeromq client for bulkload"

OBJS = bload.o

# Libraries belong in PG_LIBS, which the PGXS PROGRAM rule places after the
# object files on the link line.  (Putting -lzmq in CFLAGS puts it before the
# objects, where linkers invoked with --as-needed silently drop it.)
PG_LIBS = $(libpq) -lzmq

include $(citus_top_builddir)/Makefile.global

clean: bload-clean
bload-clean:
	rm -f bload$(X) $(OBJS)

185
src/bin/bload/bload.c Normal file
View File

@ -0,0 +1,185 @@
/*-------------------------------------------------------------------------
*
* bload.c
*
* This is the zeromq client of bulkload copy. It pulls data from zeromq server
* and outputs the message to stdout.
*
* Copyright (c) 2012-2016, Citus Data, Inc.
*
* $Id$
*
*-------------------------------------------------------------------------
*/
#include "distributed/bload.h"
#include <stdbool.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <zmq.h>
/* constant used in binary protocol */
static const char BinarySignature[11] = "PGCOPY\n\377\r\n\0";
/*
 * main - zeromq client of bulkload copy.
 *
 * Connects a PULL socket to tcp://host:port for data messages and a SUB
 * socket to tcp://host:port+1 for the termination signal, then streams every
 * received message to stdout.  Diagnostics are appended to /tmp/bload.log,
 * falling back to stderr when that file cannot be opened.
 *
 * Usage: bload host port [binary]
 *
 * With the optional "binary" argument the PostgreSQL binary COPY header
 * (signature, flags, header extension length) is emitted before the data and
 * the end-of-data trailer (-1 as int16) after it.
 *
 * Returns 0 on a clean shutdown triggered by the controller socket, 1 on
 * usage or connection errors.
 */
int
main(int argc, char *argv[])
{
	FILE *fp = NULL;
	uint64_t buffer_size = BatchSize + MaxRecordSize + 1;
	char buf[buffer_size];
	char connstr[64];
	int rc;
	const int zero = 0;
	const short negative = -1;
	bool binary = false;

	/* variables for zeromq */
	void *context = NULL;
	void *receiver = NULL;
	void *controller = NULL;
	int port = 0;
	int nbytes;

	fp = fopen("/tmp/bload.log", "a");
	if (!fp)
	{
		/* log to stderr instead; must never be fclose()d below */
		fp = stderr;
	}

	if (argc < 3)
	{
		fprintf(fp, "Usage: %s host port [binary]\n", argv[0]);
		fflush(fp);
		if (fp != stderr)
		{
			fclose(fp);
		}
		return 1;
	}

	if (argc == 4 && strcmp(argv[3], "binary") == 0)
	{
		binary = true;
	}

	context = zmq_ctx_new();

	/* Socket to receive messages on */
	receiver = zmq_socket(context, ZMQ_PULL);
	port = atoi(argv[2]);

	/* snprintf: a long host name must not overflow connstr */
	snprintf(connstr, sizeof(connstr), "tcp://%s:%d", argv[1], port);
	rc = zmq_connect(receiver, connstr);
	if (rc != 0)
	{
		fprintf(fp, "zmq_connect() error(%d): %s\n", errno, strerror(errno));
		fflush(fp);
		if (fp != stderr)
		{
			fclose(fp);
		}
		zmq_close(receiver);
		zmq_ctx_destroy(context);
		return 1;
	}

	/* Socket to receive control message */
	controller = zmq_socket(context, ZMQ_SUB);
	snprintf(connstr, sizeof(connstr), "tcp://%s:%d", argv[1], port + 1);
	rc = zmq_connect(controller, connstr);
	if (rc != 0)
	{
		fprintf(fp, "zmq_connect() error(%d): %s\n", errno, strerror(errno));
		fflush(fp);
		if (fp != stderr)
		{
			fclose(fp);
		}
		zmq_close(receiver);
		zmq_close(controller);
		zmq_ctx_destroy(context);
		return 1;
	}

	/* subscribe to every message the server publishes */
	zmq_setsockopt(controller, ZMQ_SUBSCRIBE, "", 0);

	zmq_pollitem_t items[] = {
		{ receiver, 0, ZMQ_POLLIN, 0 },
		{ controller, 0, ZMQ_POLLIN, 0 }
	};

	if (binary)
	{
		/* Signature */
		fwrite(BinarySignature, 1, 11, stdout);

		/* Flags field(no OIDs) */
		fwrite((void *) &zero, 1, 4, stdout);

		/* No header extenstion */
		fwrite((void *) &zero, 1, 4, stdout);
		fflush(stdout);
	}

	while (true)
	{
		/* wait indefinitely for an event to occur */
		rc = zmq_poll(items, 2, -1);
		if (rc == -1) /* error occurs */
		{
			fprintf(fp, "zmq_poll() error(%d): %s\n", errno, strerror(errno));
			fflush(fp);
			break;
		}

		if (items[0].revents & ZMQ_POLLIN) /* receive a message */
		{
			nbytes = zmq_recv(receiver, buf, buffer_size - 1, 0);
			if (nbytes == -1)
			{
				fprintf(fp, "zmq_recv() error(%d): %s\n", errno, strerror(errno));
				fflush(fp);
				break;
			}
			fwrite(buf, 1, nbytes, stdout);
			fflush(stdout);
		}

		if (items[1].revents & ZMQ_POLLIN) /* receive signal kill */
		{
			fprintf(fp, "receive signal kill, wait for exhausting all messages\n");
			fflush(fp);

			/* consume all messages before exit */
			while (true)
			{
				/* wait 100 milliseconds for an event to occur */
				rc = zmq_poll(items, 1, 100);
				if (rc == 0) /* no more messages */
				{
					break;
				}
				else if (rc == -1) /* error occurs */
				{
					fprintf(fp, "zmq_poll() error(%d): %s\n", errno, strerror(errno));
					fflush(fp);
					break;
				}

				if (items[0].revents & ZMQ_POLLIN) /* receive a message */
				{
					nbytes = zmq_recv(receiver, buf, buffer_size - 1, 0);
					if (nbytes == -1)
					{
						fprintf(fp, "zmq_recv() error(%d): %s\n", errno, strerror(errno));
						fflush(fp);
						break;
					}
					fwrite(buf, 1, nbytes, stdout);
					fflush(stdout);
				}
			}

			fprintf(fp, "we have consume all messages, exit now\n");
			fflush(fp);

			if (binary)
			{
				/* Binary footers */
				fwrite((void *) &negative, 1, 2, stdout);
				fflush(stdout);
			}
			break;
		}
	}

	zmq_close(receiver);
	zmq_close(controller);
	zmq_ctx_destroy(context);
	if (fp != stderr)
	{
		fclose(fp);
	}
	return 0;
}

View File

@ -0,0 +1,18 @@
/*-------------------------------------------------------------------------
 *
 * bload.h
 *	  Message-sizing constants shared between bulkload copy for
 *	  distributed tables and the bload zeromq client.
 *
 * Copyright (c) 2016, Citus Data, Inc.
 *
 *-------------------------------------------------------------------------
 */
#ifndef BLOAD_H
#define BLOAD_H

#define BatchSize 1024 /* size of a zeromq message in bytes */
#define MaxRecordSize 256 /* size of max acceptable record in bytes */

#endif /* BLOAD_H */

View File

@ -75,6 +75,8 @@
#define UPDATE_SHARD_STATISTICS_QUERY \ #define UPDATE_SHARD_STATISTICS_QUERY \
"SELECT master_update_shard_statistics(%ld)" "SELECT master_update_shard_statistics(%ld)"
#define PARTITION_METHOD_QUERY "SELECT part_method FROM master_get_table_metadata('%s');" #define PARTITION_METHOD_QUERY "SELECT part_method FROM master_get_table_metadata('%s');"
#define ACTIVE_WORKER_NODE_QUERY "SELECT * FROM master_get_active_worker_nodes();"
#define RELATIONID_QUERY "SELECT logical_relid FROM master_get_table_metadata('%s');"
/* Enumeration that defines the shard placement policy to use while staging */ /* Enumeration that defines the shard placement policy to use while staging */
typedef enum typedef enum

View File

@ -61,11 +61,20 @@ extern List * DistributedTableList(void);
extern ShardInterval * LoadShardInterval(uint64 shardId); extern ShardInterval * LoadShardInterval(uint64 shardId);
extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId); extern ShardPlacement * LoadShardPlacement(uint64 shardId, uint64 placementId);
extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId); extern DistTableCacheEntry * DistributedTableCacheEntry(Oid distributedRelationId);
extern void InsertDistTableCacheEntry(Oid relationId, DistTableCacheEntry *ent);
extern int GetLocalGroupId(void); extern int GetLocalGroupId(void);
extern List * DistTableOidList(void); extern List * DistTableOidList(void);
extern List * ShardPlacementList(uint64 shardId); extern List * ShardPlacementList(uint64 shardId);
extern void CitusInvalidateRelcacheByRelid(Oid relationId); extern void CitusInvalidateRelcacheByRelid(Oid relationId);
extern void CitusInvalidateRelcacheByShardId(int64 shardId); extern void CitusInvalidateRelcacheByShardId(int64 shardId);
extern ShardInterval ** SortShardIntervalArray(ShardInterval **shardIntervalArray,
int shardCount,
FmgrInfo *
shardIntervalSortCompareFunction);
extern bool HasUniformHashDistribution(ShardInterval **shardIntervalArray,
int shardIntervalArrayLength);
extern bool HasUninitializedShardInterval(ShardInterval **sortedShardIntervalArray,
int shardCount);
extern bool CitusHasBeenLoaded(void); extern bool CitusHasBeenLoaded(void);

View File

@ -43,6 +43,17 @@ typedef struct NodeAddress
int32 nodePort; int32 nodePort;
} NodeAddress; } NodeAddress;
/* struct type to keep zeromq related value */
typedef struct ZeroMQServer
{
char host[NAMEDATALEN];
int32 port;
char file[NAMEDATALEN];
void *context;
void *sender;
void *controller;
} ZeroMQServer;
/* function declarations for copying into a distributed table */ /* function declarations for copying into a distributed table */
extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat); extern FmgrInfo * ColumnOutputFunctions(TupleDesc rowDescriptor, bool binaryFormat);
@ -56,5 +67,10 @@ extern void CitusCopyFrom(CopyStmt *copyStatement, char *completionTag);
extern bool IsCopyFromWorker(CopyStmt *copyStatement); extern bool IsCopyFromWorker(CopyStmt *copyStatement);
extern NodeAddress * MasterNodeAddress(CopyStmt *copyStatement); extern NodeAddress * MasterNodeAddress(CopyStmt *copyStatement);
/* functions declarations for bulkload copy */
extern bool IsBulkloadCopy(CopyStmt *copyStatement);
extern bool IsBinaryCopy(CopyStmt *copyStatement);
extern bool IsBulkloadClient(CopyStmt *copyStatement);
extern void CitusBulkloadCopy(CopyStmt *copyStatement, char *completionTag);
#endif /* MULTI_COPY_H */ #endif /* MULTI_COPY_H */