diff --git a/src/backend/distributed/commands/copy_url.c b/src/backend/distributed/commands/copy_url.c
new file mode 100644
index 000000000..4773b616c
--- /dev/null
+++ b/src/backend/distributed/commands/copy_url.c
@@ -0,0 +1,176 @@
+/*-------------------------------------------------------------------------
+ *
+ * copy_url.c
+ *	  COPY from a URL using libcurl
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+#include "fmgr.h"
+#include "miscadmin.h"
+
+#include "citus_version.h"
+
+#include "distributed/commands/copy_url.h"
+#include "storage/latch.h"
+#include "utils/wait_event.h"
+
+/* config variable declared in copy_url.h */
+bool EnableCopyFromURL = true;
+
+#ifdef HAVE_LIBCURL
+
+#include <curl/curl.h>
+
+/* how long to wait for incoming data before giving up, in milliseconds */
+#define URL_READ_TIMEOUT_MS 30000L
+
+
+/* HTTP client (libcurl) */
+static CURL *curl = NULL;
+
+/* socket of the connection set up by OpenURL, used to wait for data */
+static curl_socket_t CurrentSocket = CURL_SOCKET_BAD;
+
+
+/*
+ * OpenURL opens the given URL such that it can subsequently be read via
+ * ReadBytesFromURL. A previously opened URL is closed first.
+ *
+ * Raises an ERROR if the libcurl handle cannot be created, if connecting
+ * fails, or if the socket of the connection cannot be obtained. The handle
+ * is released before the error is raised.
+ *
+ * NOTE(review): CURLOPT_CONNECT_ONLY asks libcurl to set up the connection
+ * without transferring data, and nothing in this file sends an HTTP request
+ * afterwards. Confirm where the request is meant to be issued (for example
+ * with curl_easy_send) before ReadBytesFromURL starts receiving.
+ */
+void
+OpenURL(char *url)
+{
+	/* clean up any leftovers */
+	CloseURL();
+
+	curl = curl_easy_init();
+	if (curl == NULL)
+	{
+		ereport(ERROR, (errmsg("failed to initialize libcurl")));
+	}
+
+	CURLcode res = curl_easy_setopt(curl, CURLOPT_URL, url);
+	if (res == CURLE_OK)
+	{
+		res = curl_easy_setopt(curl, CURLOPT_CONNECT_ONLY, 1L);
+	}
+
+	if (res == CURLE_OK)
+	{
+		res = curl_easy_perform(curl);
+	}
+
+	if (res != CURLE_OK)
+	{
+		/* release the handle first, ereport(ERROR) does not return */
+		CloseURL();
+		ereport(ERROR, (errmsg("failed to get URL: %s", curl_easy_strerror(res))));
+	}
+
+	res = curl_easy_getinfo(curl, CURLINFO_ACTIVESOCKET, &CurrentSocket);
+	if (res != CURLE_OK || CurrentSocket == CURL_SOCKET_BAD)
+	{
+		CloseURL();
+		ereport(ERROR, (errmsg("failed to obtain socket for URL: %s",
+							   curl_easy_strerror(res))));
+	}
+}
+
+
+/*
+ * ReadBytesFromURL reads bytes from a URL using libcurl.
+ *
+ * At most maxread bytes are stored in outbuf and the number of bytes stored
+ * is returned. A return value of 0 means libcurl reported success without
+ * data, which libcurl documents as a closed connection. minread is not
+ * consulted; the function returns as soon as any data has been received.
+ *
+ * While no data is available, the backend waits on its latch and on the
+ * socket obtained by OpenURL. Raises an ERROR if no URL is open, if
+ * receiving fails, if the wait times out, or if the postmaster dies.
+ */
+int
+ReadBytesFromURL(void *outbuf, int minread, int maxread)
+{
+	size_t nread = 0;
+	CURLcode res;
+
+	if (curl == NULL)
+	{
+		ereport(ERROR, (errmsg("no URL is open for reading")));
+	}
+
+	do
+	{
+		nread = 0;
+		res = curl_easy_recv(curl, (char *) outbuf, maxread, &nread);
+
+		if (res == CURLE_AGAIN)
+		{
+			/* the timeout only takes effect when WL_TIMEOUT is requested */
+			int waitFlags = WL_POSTMASTER_DEATH | WL_LATCH_SET |
+							WL_SOCKET_READABLE | WL_TIMEOUT;
+
+			int rc = WaitLatchOrSocket(MyLatch, waitFlags, CurrentSocket,
+									   URL_READ_TIMEOUT_MS, PG_WAIT_IO);
+			if (rc & WL_POSTMASTER_DEATH)
+			{
+				ereport(ERROR, (errmsg("postmaster was shut down, exiting")));
+			}
+
+			if (rc & WL_TIMEOUT)
+			{
+				CloseURL();
+				ereport(ERROR, (errmsg("timeout while requesting URL")));
+			}
+
+			if (rc & WL_LATCH_SET)
+			{
+				ResetLatch(MyLatch);
+				CHECK_FOR_INTERRUPTS();
+			}
+		}
+	}
+	while (res == CURLE_AGAIN);
+
+	if (res != CURLE_OK)
+	{
+		/* do not report a failed read as end of data */
+		CloseURL();
+		ereport(ERROR, (errmsg("failed to read from URL: %s",
+							   curl_easy_strerror(res))));
+	}
+
+	return (int) nread;
+}
+
+
+/*
+ * CloseURL closes a currently opened URL. It does nothing when no URL is
+ * open.
+ */
+void
+CloseURL(void)
+{
+	if (curl != NULL)
+	{
+		curl_easy_cleanup(curl);
+		curl = NULL;
+	}
+
+	CurrentSocket = CURL_SOCKET_BAD;
+}
+
+#endif
diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c
index d2d7d9b23..9bbff2dae 100644
--- a/src/backend/distributed/commands/multi_copy.c
+++ b/src/backend/distributed/commands/multi_copy.c
@@ -52,6 +52,8 @@
 #include "miscadmin.h"
 #include "pgstat.h"
 
+#include "citus_version.h"
+
 #include <arpa/inet.h> /* for htons */
 #include <netinet/in.h> /* for htons */
 #include <string.h>
@@ -70,6 +72,7 @@
 #include "commands/defrem.h"
 #include "commands/progress.h"
 #include "distributed/citus_safe_lib.h"
+#include "distributed/commands/copy_url.h"
 #include "distributed/commands/multi_copy.h"
 #include "distributed/commands/utility_hook.h"
 #include "distributed/intermediate_results.h"
@@ -306,6 +309,7 @@ static void CitusCopyTo(CopyStmt *copyStatement, QueryCompletionCompat *completi
 static int64 ForwardCopyDataFromConnection(CopyOutState copyOutState,
 										   MultiConnection *connection);
 
+
 /* Private functions copied and adapted from copy.c in PostgreSQL */
 static void SendCopyBegin(CopyOutState cstate);
 static void SendCopyEnd(CopyOutState cstate);
@@ -543,13 +547,37 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT
 		copiedDistributedRelationTuple->relkind = RELKIND_RELATION;
 	}
 
+	char *fileName = copyStatement->filename;
+	copy_data_source_cb dataSource = NULL;
+
+#ifdef HAVE_LIBCURL
+	/*
+	 * When the source is an http(s) URL, read the data through libcurl
+	 * instead of opening a file. A PROGRAM source names a command, not a
+	 * file, so it is never treated as a URL.
+	 *
+	 * NOTE(review): this lets the server open outbound connections on behalf
+	 * of the user; confirm the privilege checks for COPY FROM a file also
+	 * cover this path.
+	 */
+	if (EnableCopyFromURL && fileName != NULL && !copyStatement->is_program &&
+		(strncmp(fileName, "http://", strlen("http://")) == 0 ||
+		 strncmp(fileName, "https://", strlen("https://")) == 0))
+	{
+		OpenURL(fileName);
+
+		dataSource = ReadBytesFromURL;
+		fileName = NULL;
+	}
+#endif
+
 	/* initialize copy state to read from COPY data source */
 	CopyFromState copyState = BeginCopyFrom_compat(NULL,
 												   copiedDistributedRelation,
 												   NULL,
-												   copyStatement->filename,
+												   fileName,
 												   copyStatement->is_program,
-												   NULL,
+												   dataSource,
 												   copyStatement->attlist,
 												   copyStatement->options);
 
@@ -607,6 +635,11 @@ CopyToExistingShards(CopyStmt *copyStatement, QueryCompletionCompat *completionT
 	{
 		CompleteCopyQueryTagCompat(completionTag, processedRowCount);
 	}
+
+#ifdef HAVE_LIBCURL
+	/* does nothing when the source was not a URL */
+	CloseURL();
+#endif
 }
 
 
diff --git a/src/include/distributed/commands/copy_url.h b/src/include/distributed/commands/copy_url.h
new file mode 100644
index 000000000..791172be9
--- /dev/null
+++ b/src/include/distributed/commands/copy_url.h
@@ -0,0 +1,27 @@
+/*-------------------------------------------------------------------------
+ *
+ * copy_url.h
+ *	  Functions for copying from a URL.
+ *
+ * Copyright (c) Citus Data, Inc.
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef COPY_URL_H
+#define COPY_URL_H
+
+#include "citus_version.h"
+
+/* Config variables managed via guc.c */
+extern bool EnableCopyFromURL;
+
+#ifdef HAVE_LIBCURL
+
+/* open a URL, read its data in chunks, and close it again */
+extern void OpenURL(char *url);
+extern int ReadBytesFromURL(void *outbuf, int minread, int maxread);
+extern void CloseURL(void);
+
+#endif /* HAVE_LIBCURL */
+#endif /* COPY_URL_H */