Attempt at using libcurl for COPY

marcocitus/curl-copy
Marco Slot 2022-03-22 16:21:19 +01:00
parent 1e3c8e34c0
commit c1a5104665
3 changed files with 169 additions and 2 deletions

View File

@ -0,0 +1,116 @@
/*-------------------------------------------------------------------------
*
* copy_url.c
* COPY from a URL using libcurl
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "fmgr.h"
#include "miscadmin.h"
#include "citus_version.h"
#include "distributed/commands/copy_url.h"
#include "storage/latch.h"
#include "utils/wait_event.h"
bool EnableCopyFromURL = true;
#ifdef HAVE_LIBCURL
#include <curl/curl.h>
/* HTTP client (libcurl) */
static CURL *curl = NULL;
static int CurrentSocket = 0;
/*
* OpenURL opens the given URL such that it can subsequently be read via
* ReadBytesFromURL.
*/
void
OpenURL(char *url)
{
/* clean up any leftovers */
CloseURL();
curl = curl_easy_init();
curl_easy_setopt(curl, CURLOPT_URL, url);
curl_easy_setopt(curl, CURLOPT_CONNECT_ONLY, 1L);
CURLcode res = curl_easy_perform(curl);
if (res != CURLE_OK)
{
ereport(ERROR, (errmsg("failed to get URL: %s", curl_easy_strerror(res))));
}
res = curl_easy_getinfo(curl, CURLINFO_ACTIVESOCKET, &CurrentSocket);
}
/*
* ReadBytesFromURL reads bytes from a URL using libcurl.
*/
int
ReadBytesFromURL(void *outbuf, int minread, int maxread)
{
size_t nread;
CURLcode res;
do
{
nread = 0;
res = curl_easy_recv(curl, (char *) outbuf, maxread, &nread);
if (res == CURLE_AGAIN)
{
int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET | WL_SOCKET_READABLE;
int rc = WaitLatchOrSocket(MyLatch, waitFlags, CurrentSocket, 30000,
PG_WAIT_IO);
if (rc & WL_POSTMASTER_DEATH)
{
ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
}
if (rc & WL_TIMEOUT)
{
ereport(ERROR, (errmsg("timeout while requesting URL")));
}
if (rc & WL_LATCH_SET)
{
ResetLatch(MyLatch);
CHECK_FOR_INTERRUPTS();
}
}
}
while (res == CURLE_AGAIN);
elog(NOTICE, "read %d bytes", nread);
return (int) nread;
}
/*
* CloseURL closes a currently opened URL.
*/
void
CloseURL(void)
{
if (curl != NULL)
{
curl_easy_cleanup(curl);
curl = NULL;
}
}
#endif

View File

@ -52,6 +52,8 @@
#include "miscadmin.h"
#include "pgstat.h"
#include "citus_version.h"
#include <arpa/inet.h> /* for htons */
#include <netinet/in.h> /* for htons */
#include <string.h>
@ -70,6 +72,7 @@
#include "commands/defrem.h"
#include "commands/progress.h"
#include "distributed/citus_safe_lib.h"
#include "distributed/commands/copy_url.h"
#include "distributed/commands/multi_copy.h"
#include "distributed/commands/utility_hook.h"
#include "distributed/intermediate_results.h"
@ -306,6 +309,7 @@ static void CitusCopyTo(CopyStmt *copyStatement, QueryCompletionCompat *completi
static int64 ForwardCopyDataFromConnection(CopyOutState copyOutState,
MultiConnection *connection);
/* Private functions copied and adapted from copy.c in PostgreSQL */
static void SendCopyBegin(CopyOutState cstate);
static void SendCopyEnd(CopyOutState cstate);
@ -543,13 +547,30 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT
copiedDistributedRelationTuple->relkind = RELKIND_RELATION;
}
char *fileName = copyStatement->filename;
copy_data_source_cb dataSource = NULL;
#ifdef HAVE_LIBCURL
if (fileName != NULL)
{
if (strncmp(fileName, "http://", strlen("http://")) == 0 ||
strncmp(fileName, "https://", strlen("https://")) == 0)
{
OpenURL(fileName);
dataSource = ReadBytesFromURL;
fileName = NULL;
}
}
#endif
/* initialize copy state to read from COPY data source */
CopyFromState copyState = BeginCopyFrom_compat(NULL,
copiedDistributedRelation,
NULL,
copyStatement->filename,
fileName,
copyStatement->is_program,
NULL,
dataSource,
copyStatement->attlist,
copyStatement->options);
@ -607,6 +628,10 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT
{
CompleteCopyQueryTagCompat(completionTag, processedRowCount);
}
#ifdef HAVE_LIBCURL
CloseURL();
#endif
}

View File

@ -0,0 +1,26 @@
/*-------------------------------------------------------------------------
*
* copy_url.h
* Function for copying from a URL.
*
* Copyright (c) Citus Data, Inc.
*
*-------------------------------------------------------------------------
*/
#ifndef COPY_URL_H
#define COPY_URL_H
#include "citus_version.h"
/* Config variables managed via guc.c */
extern bool EnableCopyFromURL;
#ifdef HAVE_LIBCURL
void OpenURL(char *url);
int ReadBytesFromURL(void *outbuf, int minread, int maxread);
void CloseURL(void);
#endif /* HAVE_LIBCURL */
#endif /* COPY_URL_H */