/*
* This file is part of mpv.
*
* mpv is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* mpv is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with mpv. If not, see <http://www.gnu.org/licenses/>.
*/
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include "stream.h"
#include "stream_curl.h"
#include "common/common.h"
#include "common/global.h"
#include "common/msg.h"
#include "cookies.h"
#include "demux/demux.h"
#include "misc/bstr.h"
#include "misc/dispatch.h"
#include "misc/path_utils.h"
#include "misc/thread_tools.h"
#include "mpv_talloc.h"
#include "network.h"
#include "options/m_config.h"
#include "options/m_option.h"
#include "options/path.h"
#include "osdep/threads.h"
#include "osdep/timer.h"
// Protocol family of a URL scheme. header_callback() uses it to pick the
// routine that parses the response headers (probe_http or probe_ftp).
enum curl_proto {
MP_CURL_PROTO_HTTP,
MP_CURL_PROTO_FTP,
};
// Maps a URL scheme name to the protocol family used to probe it.
struct curl_scheme {
bstr scheme; // scheme name without "://"
enum curl_proto proto; // family that decides how the response is probed
};
// URL schemes this backend can handle. Each name is a NUL-terminated string
// literal wrapped in a bstr; is_protocol_allowed() asserts that termination.
static const struct curl_scheme curl_schemes[] = {
{bstr0_lit("http"), MP_CURL_PROTO_HTTP},
{bstr0_lit("https"), MP_CURL_PROTO_HTTP},
{bstr0_lit("ftp"), MP_CURL_PROTO_FTP},
{bstr0_lit("ftps"), MP_CURL_PROTO_FTP},
};
// Special args for use by lavf. Matches lavf/http.c "offset"/"end_offset" opts.
// Passed to curl_open() through stream_open_args.special_arg.
// `offset` is the inclusive starting byte.
// `end_offset` is the exclusive upper bound (0 = unbounded).
// curl_open() only applies values greater than zero.
struct curl_open_args {
int64_t offset;
int64_t end_offset;
};
// User options of the curl backend (see curl_conf for names and ranges).
struct curl_opts {
bool enabled; // when false, curl_open() returns STREAM_NO_MATCH
int http_version; // CURL_HTTP_VERSION_* value passed to CURLOPT_HTTP_VERSION
int max_redirects; // passed to CURLOPT_MAXREDIRS
int max_retries; // cap on consecutive retries of a failed request
double connect_timeout; // seconds; converted to ms for CURLOPT_CONNECTTIMEOUT_MS
int64_t buffer_size; // size in bytes of the per-stream ring buffer
int64_t max_request_size; // bytes per ranged request (0 = no chunking)
};
// Fallbacks for libcurl headers that predate the HTTP/3 constants, so the
// option table still compiles; the affected choices then behave like "auto".
// The check is based on LIBCURL_VERSION_NUM rather than #ifndef: libcurl has
// declared these constants as enum members, which the preprocessor cannot
// see, so an #ifndef test would override a real value with NONE.
#if LIBCURL_VERSION_NUM < 0x074200 // CURL_HTTP_VERSION_3 added in 7.66.0
#define CURL_HTTP_VERSION_3 CURL_HTTP_VERSION_NONE
#endif
#if LIBCURL_VERSION_NUM < 0x075800 // CURL_HTTP_VERSION_3ONLY added in 7.88.0
#define CURL_HTTP_VERSION_3ONLY CURL_HTTP_VERSION_NONE
#endif
// Option table for the curl backend: option names, value ranges and defaults
// for the fields of struct curl_opts.
#define OPT_BASE_STRUCT struct curl_opts
const struct m_sub_options curl_conf = {
.opts = (const struct m_option[]) {
{"enabled", OPT_BOOL(enabled)},
// Choice values are the libcurl constants themselves, so the stored int
// can be handed directly to CURLOPT_HTTP_VERSION.
{"http-version", OPT_CHOICE(http_version,
{"auto", CURL_HTTP_VERSION_NONE},
{"1.0", CURL_HTTP_VERSION_1_0},
{"1.1", CURL_HTTP_VERSION_1_1},
{"2", CURL_HTTP_VERSION_2},
{"2tls", CURL_HTTP_VERSION_2TLS},
{"2-prior-knowledge", CURL_HTTP_VERSION_2_PRIOR_KNOWLEDGE},
{"3", CURL_HTTP_VERSION_3},
{"3only", CURL_HTTP_VERSION_3ONLY}
)},
{"max-redirects", OPT_INT(max_redirects), M_RANGE(0, 100)},
{"max-retries", OPT_INT(max_retries), M_RANGE(0, 100)},
{"connect-timeout", OPT_DOUBLE(connect_timeout), M_RANGE(0, 600)},
// The lower bound keeps the ring buffer at least two maximum-size write
// callbacks large.
{"buffer-size", OPT_BYTE_SIZE(buffer_size),
M_RANGE(2 * CURL_MAX_WRITE_SIZE, M_MAX_MEM_BYTES)},
{"max-request-size", OPT_BYTE_SIZE(max_request_size),
M_RANGE(0, M_MAX_MEM_BYTES)},
{0}
},
.defaults = &(const struct curl_opts) {
// Older lavf has a bug with nested IO cleanup, so the backend is only
// enabled by default when built against libavformat 62.15.101 or newer.
.enabled = LIBAVFORMAT_VERSION_INT >= AV_VERSION_INT(62, 15, 101),
.http_version = CURL_HTTP_VERSION_NONE,
.max_redirects = 16,
.max_retries = 5,
.connect_timeout = 30,
.buffer_size = 4 << 20, // 4 MiB
.max_request_size = 0,
},
.size = sizeof(struct curl_opts),
};
// Finds the curl_schemes entry matching the scheme of `url`. The comparison
// is case-insensitive. Returns NULL if the scheme is not in the table.
static const struct curl_scheme *curl_scheme_lookup(bstr url)
{
    const bstr proto = mp_split_proto(url, NULL);
    const struct curl_scheme *const end =
        curl_schemes + MP_ARRAY_SIZE(curl_schemes);

    for (const struct curl_scheme *cs = curl_schemes; cs != end; cs++) {
        if (!bstrcasecmp(proto, cs->scheme))
            return cs;
    }
    return NULL;
}
// Global backend state: one worker thread driving one libcurl multi handle.
// Created by mp_curl_global_init() and stored in mpv_global.
struct curl_ctx {
mp_thread thread; // worker thread running curl_thread()
struct mp_dispatch_queue *dispatch; // commands for the worker (see run_cmd)
CURLM *multi; // created and destroyed inside curl_thread()
bool exit; // set by CMD_EXIT to stop the worker loop
};
// Per-stream state, owned by the curl thread.
struct priv {
struct mp_log *log;
struct mpv_global *global;
struct curl_ctx *ctx; // shared backend context (worker thread + multi)
struct stream *s; // owning stream
struct curl_opts *opts;
struct mp_network_opts *net_opts;
CURL *curl; // easy handle for this stream
struct curl_slist *headers; // extra request headers, freed in priv_destructor
char *url;
const struct curl_scheme *scheme;
// Stream parameters
bool seekable;
int64_t content_size; // -1 if unknown
// Producer state. Only touched by the curl thread.
uint64_t request_start; // absolute byte position of next request
uint64_t request_received; // bytes received in the current request
uint64_t request_end; // exclusive byte cap (0 = unbounded)
int retry_count; // consecutive failed attempts at request_start
bool active; // handle is currently active in the multi
bool finished; // current request has reached EOF
// Probe state. Written on the curl thread; curl_open() waits on `cond` for
// `probed` and reads `stream_ok` afterwards.
bool probed;
bool stream_ok;
// Shared state, protected by mtx.
mp_mutex mtx;
mp_cond cond;
uint8_t *buffer; // ring buffer storage of buffer_size bytes
size_t buffer_size;
size_t head, tail, count; // read offset, write offset, bytes buffered
bool paused; // write callback paused due to a full buffer
bool stream_eof; // producer has delivered all data
bool stream_error; // unrecoverable error
atomic_bool aborted; // canceled by user (mp_cancel)
};
// Curl thread

// Commands executed by run_cmd() via the dispatch queue.
enum cmd_kind {
CMD_ADD, // start the first request of a stream
CMD_REMOVE, // detach the stream's handle from the multi if active
CMD_SEEK, // restart the transfer at a new byte position
CMD_UNPAUSE, // resume a transfer paused by write_callback()
CMD_EXIT, // stop the worker loop
};
// One command for run_cmd().
struct cmd {
enum cmd_kind kind;
struct curl_ctx *ctx;
struct priv *p; // target stream; left NULL for CMD_EXIT
int64_t pos; // CMD_SEEK: new absolute byte position
bool drop; // CMD_SEEK: discard data already in the ring buffer
};
// Forward declarations: run_cmd() and curl_thread() use these before their
// definitions further down.
static void start_request(struct priv *p);
static void on_done(struct priv *p, CURLcode code);
static void run_cmd(void *arg)
{
struct cmd *c = arg;
struct curl_ctx *ctx = c->ctx;
switch (c->kind) {
case CMD_ADD:
MP_TRACE(c->p, "starting curl request at %" PRIu64 "\n", c->p->request_start);
start_request(c->p);
break;
case CMD_REMOVE:
if (c->p->active) {
MP_TRACE(c->p, "removing curl handle\n");
curl_multi_remove_handle(ctx->multi, c->p->curl);
c->p->active = false;
}
break;
case CMD_UNPAUSE:
// The consumer freed enough buffer space. Clear the pause flag and
// resume the transfer.
mp_mutex_lock(&c->p->mtx);
MP_TRACE(c->p, "resuming curl transfer\n");
c->p->paused = false;
mp_mutex_unlock(&c->p->mtx);
curl_easy_pause(c->p->curl, CURLPAUSE_CONT);
break;
case CMD_SEEK:
MP_TRACE(c->p, "seeking to %" PRIu64 "\n", c->pos);
if (c->p->active) {
curl_multi_remove_handle(ctx->multi, c->p->curl);
c->p->active = false;
}
mp_mutex_lock(&c->p->mtx);
if (c->drop)
c->p->head = c->p->tail = c->p->count = 0;
c->p->paused = false;
c->p->stream_eof = false;
c->p->stream_error = false;
mp_cond_broadcast(&c->p->cond);
mp_mutex_unlock(&c->p->mtx);
c->p->request_start = c->pos;
c->p->request_received = 0;
c->p->retry_count = 0;
c->p->finished = false;
start_request(c->p);
break;
case CMD_EXIT:
ctx->exit = true;
break;
}
}
// Queues a command of the given kind for stream `p` without waiting for it
// to run. The command object is heap-allocated and handed to the dispatch
// queue's autofree variant.
static void cmd_async(struct priv *p, enum cmd_kind kind)
{
    struct cmd *req = talloc_zero(NULL, struct cmd);
    req->p = p;
    req->ctx = p->ctx;
    req->kind = kind;
    mp_dispatch_enqueue_autofree(p->ctx->dispatch, run_cmd, req);
}
static void cmd_sync(struct priv *p, enum cmd_kind kind, int64_t pos, bool drop)
{
struct cmd c = {
.kind = kind,
.ctx = p->ctx,
.p = p,
.pos = pos,
.drop = drop,
};
mp_dispatch_run(p->ctx->dispatch, run_cmd, &c);
}
// Wakeup hook installed on the dispatch queue by curl_thread(): interrupts
// the multi handle's poll. `arg` is the struct curl_ctx.
static void curl_wakeup(void *arg)
{
    curl_multi_wakeup(((struct curl_ctx *)arg)->multi);
}
// Worker thread: owns the multi handle, runs queued commands, drives all
// transfers and reports completed ones to on_done(). `arg` is the curl_ctx.
static MP_THREAD_VOID curl_thread(void *arg)
{
mp_thread_set_name("curl");
struct curl_ctx *ctx = arg;
curl_global_init(CURL_GLOBAL_ALL);
// NOTE(review): the result of curl_multi_init() is not checked.
ctx->multi = curl_multi_init();
curl_multi_setopt(ctx->multi, CURLMOPT_PIPELINING, CURLPIPE_MULTIPLEX);
// Queued commands interrupt curl_multi_poll() below through curl_wakeup().
mp_dispatch_set_wakeup_fn(ctx->dispatch, curl_wakeup, ctx);
while (!ctx->exit) {
// Run pending commands without blocking (timeout 0).
mp_dispatch_queue_process(ctx->dispatch, 0);
// Stop early to avoid delays, this happens only when player is closing.
if (ctx->exit)
break;
int running = 0;
CURLMcode mres = curl_multi_perform(ctx->multi, &running);
// NOTE(review): leaving the loop here ends the thread, after which nothing
// processes the dispatch queue any more; callers of mp_dispatch_run()
// presumably block in that case - confirm.
if (mres != CURLM_OK && mres != CURLM_CALL_MULTI_PERFORM)
break;
CURLMsg *msg;
int left = 0;
// Collect finished transfers. The handle is detached before on_done()
// runs, because on_done() may start a new request on the same handle.
while ((msg = curl_multi_info_read(ctx->multi, &left))) {
if (msg->msg != CURLMSG_DONE)
continue;
struct priv *p = NULL;
// CURLOPT_PRIVATE was set to the stream's priv in setup_curl().
curl_easy_getinfo(msg->easy_handle, CURLINFO_PRIVATE, &p);
mp_assert(p);
curl_multi_remove_handle(ctx->multi, msg->easy_handle);
p->active = false;
on_done(p, msg->data.result);
}
// Wait for socket activity, a wakeup, or at most 1000 ms.
curl_multi_poll(ctx->multi, NULL, 0, 1000, NULL);
}
curl_multi_cleanup(ctx->multi);
curl_global_cleanup();
MP_THREAD_RETURN();
}
static void mp_curl_destroy(void *ptr)
{
struct curl_ctx *ctx = ptr;
struct cmd c = { .kind = CMD_EXIT, .ctx = ctx };
mp_dispatch_run(ctx->dispatch, run_cmd, &c);
mp_thread_join(ctx->thread);
}
// Creates the curl context as a talloc child of `global`, publishes it in
// global->curl and starts the worker thread. Thread creation failure is
// fatal (mp_require).
void mp_curl_global_init(struct mpv_global *global)
{
    struct curl_ctx *backend = talloc_zero(global, struct curl_ctx);
    backend->dispatch = mp_dispatch_create(backend);
    talloc_set_destructor(backend, mp_curl_destroy);
    global->curl = backend;
    mp_require(!mp_thread_create(&backend->thread, curl_thread, backend));
}
// Curl callbacks

// True when `resp` is a 2xx HTTP status code.
static bool is_http_success(long resp)
{
    if (resp < 200)
        return false;
    return resp <= 299;
}
// Called per chunk of body data. Copies the chunk into the ring buffer, or
// pauses the transfer when the buffer cannot take the whole chunk.
// Returns the number of bytes consumed, CURL_WRITEFUNC_PAUSE to pause, or
// CURL_WRITEFUNC_ERROR to abort the transfer.
static size_t write_callback(char *ptr, size_t size, size_t nmemb, void *userdata)
{
    struct priv *p = userdata;
    size_t bytes = size * nmemb;

    // header_callback validated the response and logged any error status,
    // we don't care about error body.
    if (!p->stream_ok)
        return CURL_WRITEFUNC_ERROR;
    if (atomic_load_explicit(&p->aborted, memory_order_relaxed))
        return CURL_WRITEFUNC_ERROR;

    mp_mutex_lock(&p->mtx);
    if (p->buffer_size - p->count < bytes) {
        // No room in the buffer. Pause the transfer and wait for the consumer.
        p->paused = true;
        // Snapshot the fill level while the lock is held: the consumer may
        // change count as soon as the mutex is released.
        size_t buffered = p->count;
        mp_mutex_unlock(&p->mtx);
        MP_TRACE(p, "pausing curl transfer, buffer full (%zu bytes)\n", buffered);
        return CURL_WRITEFUNC_PAUSE;
    }

    // Copy in up to two pieces: up to the end of the storage, then the
    // remainder (possibly zero bytes) from the start.
    size_t tail_chunk = MPMIN(p->buffer_size - p->tail, bytes);
    memcpy(p->buffer + p->tail, ptr, tail_chunk);
    memcpy(p->buffer, ptr + tail_chunk, bytes - tail_chunk);
    p->tail = (p->tail + bytes) % p->buffer_size;
    p->count += bytes;
    p->paused = false;
    p->request_received += bytes;
    mp_cond_broadcast(&p->cond);
    mp_mutex_unlock(&p->mtx);
    return bytes;
}
// Progress callback, used only as a cancellation hook: returns non-zero once
// the stream has been aborted. The transfer counters are ignored.
static int xferinfo_callback(void *userdata, curl_off_t dl_total, curl_off_t dl_now,
                             curl_off_t ul_total, curl_off_t ul_now)
{
    struct priv *p = userdata;
    bool cancelled = atomic_load_explicit(&p->aborted, memory_order_relaxed);
    return cancelled ? 1 : 0;
}
static int64_t parse_content_range_total(const char *value)
{
if (!value)
return -1;
bstr after;
if (!bstr_split_tok(bstr0(value), "/", &(bstr){0}, &after))
return -1;
bstr rest;
long long total = bstrtoll(after, &rest, 10);
return (rest.len == 0 && total > 0) ? (int64_t)total : -1;
}
// Looks up response header `name` on handle `c` (index 0, request -1) and
// returns its value, or NULL if the lookup fails. The string is owned by
// libcurl.
static const char *header_value(CURL *c, const char *name)
{
    struct curl_header *hdr = NULL;
    CURLHcode rc = curl_easy_header(c, name, 0, CURLH_HEADER, -1, &hdr);
    return rc == CURLHE_OK ? hdr->value : NULL;
}
// Completes the probe: logs a summary when debug logging is enabled, then
// sets `probed` under the mutex and wakes everyone waiting on the condition.
static void finalize_probe(struct priv *p)
{
    if (mp_msg_test(p->log, MSGL_DEBUG)) {
        char *mime = NULL;
        long status = 0;
        curl_easy_getinfo(p->curl, CURLINFO_RESPONSE_CODE, &status);
        curl_easy_getinfo(p->curl, CURLINFO_CONTENT_TYPE, &mime);
        const char *shown_type = mime ? mime : "-";
        MP_DBG(p, "proto=%.*s ok=%d code=%ld size=%" PRId64 " seekable=%d type=%s\n",
               BSTR_P(p->scheme->scheme), p->stream_ok, status,
               p->content_size, p->seekable, shown_type);
    }

    mp_mutex_lock(&p->mtx);
    p->probed = true;
    mp_cond_broadcast(&p->cond);
    mp_mutex_unlock(&p->mtx);
}
// Empty line is the end of the header. Skip intermediate 1xx and 3xx responses,
// we care about the final one.
static void probe_http(struct priv *p, struct bstr line)
{
if (line.len > 0)
return;
long resp = 0;
curl_easy_getinfo(p->curl, CURLINFO_RESPONSE_CODE, &resp);
if (resp < 200 || (resp >= 300 && resp < 400))
return;
if (!is_http_success(resp)) {
MP_ERR(p, "HTTP error %ld\n", resp);
goto done;
}
// Compressed responses are byte-addressed in the encoded representation,
// which our caller can't translate, so they are non-seekable.
const char *ce = header_value(p->curl, "Content-Encoding");
bool compressed = ce && ce[0] && strcasecmp(ce, "identity") != 0;
const char *ar = header_value(p->curl, "Accept-Ranges");
bool accept_ranges = ar && strcasecmp(ar, "bytes") == 0;
// Some servers reply 200 to an open-ended "Range: 0-" but 206 to explicit
// byte ranges, so trust either.
p->seekable = !compressed && (resp == 206 || accept_ranges);
if (p->seekable) {
// Content-Range carries the full size on a partial response. On any
// non-206 success code use Content-Length.
int64_t total = parse_content_range_total(header_value(p->curl, "Content-Range"));
if (total < 0 && resp != 206) {
curl_off_t cl = -1;
if (curl_easy_getinfo(p->curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T,
&cl) == CURLE_OK && cl >= 0)
total = cl;
}
p->content_size = total;
}
p->stream_ok = true;
done:
finalize_probe(p);
}
static void probe_ftp(struct priv *p, struct bstr line)
{
if (line.len < 4 || line.start[3] != ' ')
return;
// Parse the line directly: libcurl only stamps CURLINFO_RESPONSE_CODE after
// a reply is fully processed, so polling it from header_callback returns
// the previous code.
struct bstr code = {line.start, 3};
if (!bstr_equals0(code, "150") && !bstr_equals0(code, "125"))
return;
curl_off_t cl = -1;
if (curl_easy_getinfo(p->curl, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T,
&cl) == CURLE_OK && cl >= 0)
p->content_size = cl;
p->seekable = p->content_size > 0;
p->stream_ok = true;
finalize_probe(p);
}
// Called per header line.
static size_t header_callback(char *buffer, size_t size, size_t nitems, void *userdata)
{
struct priv *p = userdata;
size_t bytes = size * nitems;
if (p->probed)
return bytes;
struct bstr line = bstr_strip_linebreaks((bstr){buffer, bytes});
switch (p->scheme->proto) {
case MP_CURL_PROTO_HTTP:
probe_http(p, line);
break;
case MP_CURL_PROTO_FTP:
probe_ftp(p, line);
break;
default:
break;
}
return bytes;
}
// libcurl debug hook: forwards informational text and incoming/outgoing
// headers to the trace log with a "* ", "< " or "> " prefix. Other info
// types are dropped. Always returns 0.
static int debug_callback(CURL *handle, curl_infotype type, char *data, size_t size,
                          void *userdata)
{
    struct priv *p = userdata;
    const char *prefix = NULL;

    if (type == CURLINFO_TEXT) {
        prefix = "* ";
    } else if (type == CURLINFO_HEADER_IN) {
        prefix = "< ";
    } else if (type == CURLINFO_HEADER_OUT) {
        prefix = "> ";
    }
    if (!prefix)
        return 0;

    bstr text = bstr_strip_linebreaks((bstr){data, size});
    MP_TRACE(p, "%s%.*s\n", prefix, BSTR_P(text));
    return 0;
}
// Request handling

// True for transfer errors that on_done() is allowed to retry.
static bool is_recoverable_error(CURLcode code)
{
    return code == CURLE_RECV_ERROR ||
           code == CURLE_SEND_ERROR ||
           code == CURLE_PARTIAL_FILE ||
           code == CURLE_OPERATION_TIMEDOUT ||
           code == CURLE_GOT_NOTHING ||
           code == CURLE_COULDNT_CONNECT ||
           code == CURLE_COULDNT_RESOLVE_HOST ||
           code == CURLE_HTTP2 ||
           code == CURLE_HTTP2_STREAM;
}
static void start_request(struct priv *p)
{
if (p->finished) {
mp_mutex_lock(&p->mtx);
p->stream_eof = true;
mp_cond_broadcast(&p->cond);
mp_mutex_unlock(&p->mtx);
return;
}
uint64_t start = p->request_start;
bool ranged = !p->probed || p->seekable;
bool chunked = ranged && p->opts->max_request_size > 0;
bool capped = ranged && p->request_end > 0;
bool past_size = p->seekable && p->content_size > 0 && start >= p->content_size;
bool past_end = capped && start >= p->request_end;
if (past_size || past_end) {
p->finished = true;
mp_mutex_lock(&p->mtx);
p->stream_eof = true;
mp_cond_broadcast(&p->cond);
mp_mutex_unlock(&p->mtx);
return;
}
char range[64];
if (chunked || capped) {
uint64_t end = UINT64_MAX;
if (chunked)
end = start + p->opts->max_request_size - 1;
if (p->content_size > 0)
end = MPMIN(end, p->content_size - 1);
if (capped)
end = MPMIN(end, p->request_end - 1);
snprintf(range, sizeof(range), "%" PRIu64 "-%" PRIu64, start, end);
curl_easy_setopt(p->curl, CURLOPT_RANGE, range);
} else if (ranged) {
snprintf(range, sizeof(range), "%" PRIu64 "-", start);
curl_easy_setopt(p->curl, CURLOPT_RANGE, range);
} else {
curl_easy_setopt(p->curl, CURLOPT_RANGE, NULL);
}
p->request_received = 0;
p->active = true;
curl_multi_add_handle(p->ctx->multi, p->curl);
}
// Logs "<what>: <libcurl error text>" as an error. For TLS certificate
// verification failures it adds a hint on likely causes and how to bypass.
static void log_curl_error(struct priv *p, const char *what, CURLcode code)
{
    MP_ERR(p, "%s: %s\n", what, curl_easy_strerror(code));

    bool tls_verify_failure = code == CURLE_PEER_FAILED_VERIFICATION ||
                              code == CURLE_SSL_CACERT_BADFILE;
    if (!tls_verify_failure)
        return;

    MP_ERR(p,
           "TLS certificate verification failed.\n"
           "This usually means an outdated CA bundle, a self-signed "
           "certificate,\nor a MITM proxy on your network. To bypass at "
           "your own risk, pass\n--tls-verify=no.\n");
}
// Handles a finished transfer; called from curl_thread() after the handle
// was removed from the multi. `code` is the transfer result. Depending on
// state it completes a failed probe, starts the next chunk, signals EOF,
// retries, or marks the stream as failed.
static void on_done(struct priv *p, CURLcode code)
{
bool aborted = atomic_load_explicit(&p->aborted, memory_order_relaxed);
if (!p->probed) {
// The transfer ended before header_callback completed the probe, for
// example because the connection died before any headers arrived.
// stream_ok is left untouched, so curl_open() sees the failure.
if (code != CURLE_OK && !aborted)
log_curl_error(p, "error", code);
mp_mutex_lock(&p->mtx);
p->probed = true;
mp_cond_broadcast(&p->cond);
mp_mutex_unlock(&p->mtx);
return;
}
// Roll completed bytes into request_start so retries and chunk
// continuations resume at the next missing byte.
p->request_start += p->request_received;
p->request_received = 0;
if (code == CURLE_OK && !aborted) {
p->retry_count = 0;
// With chunking enabled a successful request is only one slice; keep
// going until the known size or the caller's cap has been reached.
bool chunked = p->seekable && p->opts->max_request_size > 0;
bool past_size = p->content_size > 0 && p->request_start >= p->content_size;
bool past_end = p->request_end > 0 && p->request_start >= p->request_end;
if (chunked && !past_size && !past_end) {
start_request(p);
return;
}
p->finished = true;
mp_mutex_lock(&p->mtx);
p->stream_eof = true;
mp_cond_broadcast(&p->cond);
mp_mutex_unlock(&p->mtx);
return;
}
// Try to recover if the stream is seekable and the failure looks
// recoverable.
bool recoverable = !aborted && p->seekable &&
is_recoverable_error(code) &&
p->retry_count < p->opts->max_retries;
if (recoverable) {
p->retry_count++;
MP_WARN(p, "%s, retrying (#%d) from %" PRIu64 "\n",
curl_easy_strerror(code), p->retry_count, p->request_start);
start_request(p);
return;
}
// Unrecoverable: report it unless the user cancelled, and wake readers.
if (!aborted)
log_curl_error(p, "transfer failed", code);
mp_mutex_lock(&p->mtx);
p->stream_error = true;
mp_cond_broadcast(&p->cond);
mp_mutex_unlock(&p->mtx);
}
// mp_cancel callback (installed in curl_open): flags the stream as aborted,
// wakes threads waiting on the stream condition and interrupts the multi
// handle's poll. `ctx` is the stream's struct priv.
static void on_cancel(void *ctx)
{
struct priv *p = ctx;
atomic_store(&p->aborted, true);
// Broadcast with the mutex held so a waiter that has just evaluated its
// wait condition cannot miss the wakeup.
mp_mutex_lock(&p->mtx);
mp_cond_broadcast(&p->cond);
mp_mutex_unlock(&p->mtx);
// The transfer itself is stopped by the curl callbacks, which check the
// aborted flag (write_callback, xferinfo_callback).
if (p->ctx && p->ctx->multi)
curl_multi_wakeup(p->ctx->multi);
}
// Configuration and initial setup

// Appends `header` to `list` and returns the new head. curl_slist_append()
// returns NULL on failure without freeing the existing list, so in that case
// the old list is kept (and the header skipped) instead of being leaked.
static struct curl_slist *append_header(struct priv *p, struct curl_slist *list,
                                        const char *header)
{
    struct curl_slist *grown = curl_slist_append(list, header);
    if (!grown) {
        MP_WARN(p, "failed to append HTTP header, skipping it\n");
        return list;
    }
    return grown;
}

// Builds the list of extra request headers from the network options: a
// "Referer" header if a referrer is set, followed by the user's custom
// header fields. Returns NULL if there is nothing to add. The caller owns
// the list and releases it with curl_slist_free_all().
static struct curl_slist *build_header_list(struct priv *p)
{
    struct curl_slist *list = NULL;

    if (p->net_opts->referrer && p->net_opts->referrer[0]) {
        // libcurl copies the string, so the temporary can be freed right away.
        char *h = talloc_asprintf(NULL, "Referer: %s", p->net_opts->referrer);
        list = append_header(p, list, h);
        talloc_free(h);
    }

    if (p->net_opts->http_header_fields) {
        for (int i = 0; p->net_opts->http_header_fields[i]; i++)
            list = append_header(p, list, p->net_opts->http_header_fields[i]);
    }

    return list;
}
static void setup_curl(struct priv *p)
{
CURL *c = p->curl;
curl_easy_setopt(c, CURLOPT_URL, p->url);
curl_easy_setopt(c, CURLOPT_NOSIGNAL, 1L);
curl_easy_setopt(c, CURLOPT_PRIVATE, p);
curl_easy_setopt(c, CURLOPT_WRITEFUNCTION, write_callback);
curl_easy_setopt(c, CURLOPT_WRITEDATA, p);
curl_easy_setopt(c, CURLOPT_HEADERFUNCTION, header_callback);
curl_easy_setopt(c, CURLOPT_HEADERDATA, p);
// enable progress callback, so we can cancel transfer at any point
curl_easy_setopt(c, CURLOPT_NOPROGRESS, 0L);
curl_easy_setopt(c, CURLOPT_XFERINFOFUNCTION, xferinfo_callback);
curl_easy_setopt(c, CURLOPT_XFERINFODATA, p);
// Enable verbose output with trace level logging.
curl_easy_setopt(c, CURLOPT_VERBOSE, mp_msg_test(p->log, MSGL_TRACE) ? 1L : 0L);
curl_easy_setopt(c, CURLOPT_DEBUGFUNCTION, debug_callback);
curl_easy_setopt(c, CURLOPT_DEBUGDATA, p);
curl_easy_setopt(c, CURLOPT_ACCEPT_ENCODING, "");
curl_easy_setopt(c, CURLOPT_FOLLOWLOCATION, 1L);
curl_easy_setopt(c, CURLOPT_MAXREDIRS, (long)p->opts->max_redirects);
curl_easy_setopt(c, CURLOPT_HTTP_VERSION, (long)p->opts->http_version);
curl_easy_setopt(c, CURLOPT_HSTS_CTRL, (long)CURLHSTS_ENABLE);
curl_easy_setopt(c, CURLOPT_TCP_KEEPALIVE, 1L);
curl_easy_setopt(c, CURLOPT_CONNECTTIMEOUT_MS,
(long)(p->opts->connect_timeout * 1000));
if (p->net_opts->useragent && p->net_opts->useragent[0])
curl_easy_setopt(c, CURLOPT_USERAGENT, p->net_opts->useragent);
if (p->net_opts->http_proxy && p->net_opts->http_proxy[0])
curl_easy_setopt(c, CURLOPT_PROXY, p->net_opts->http_proxy);
curl_easy_setopt(c, CURLOPT_SSL_OPTIONS, (long)CURLSSLOPT_NATIVE_CA);
curl_easy_setopt(c, CURLOPT_SSL_VERIFYPEER, p->net_opts->tls_verify ? 1L : 0L);
curl_easy_setopt(c, CURLOPT_SSL_VERIFYHOST, p->net_opts->tls_verify ? 2L : 0L);
if (p->net_opts->tls_ca_file) {
char *path = mp_get_user_path(p, p->global, p->net_opts->tls_ca_file);
curl_easy_setopt(c, CURLOPT_CAINFO, path);
}
if (p->net_opts->tls_cert_file) {
char *path = mp_get_user_path(p, p->global, p->net_opts->tls_cert_file);
curl_easy_setopt(c, CURLOPT_SSLCERT, path);
}
if (p->net_opts->tls_key_file) {
char *path = mp_get_user_path(p, p->global, p->net_opts->tls_key_file);
curl_easy_setopt(c, CURLOPT_SSLKEY, path);
}
if (p->net_opts->cookies_enabled) {
curl_easy_setopt(c, CURLOPT_COOKIEFILE, "");
char *file = p->net_opts->cookies_file;
if (file && file[0]) {
void *tmp = talloc_new(NULL);
char *path = mp_get_user_path(tmp, p->global, file);
bstr data = stream_read_file2(path, tmp,
STREAM_READ_FILE_FLAGS_DEFAULT & ~STREAM_LOCAL_FS_ONLY,
p->global, 1000000);
bstr buf = data;
while (buf.len) {
bstr line = bstr_strip_linebreaks(bstr_getline(buf, &buf));
if (!line.len)
continue;
char *line_str = bstrto0(tmp, line);
curl_easy_setopt(c, CURLOPT_COOKIELIST, line_str);
}
talloc_free(tmp);
}
}
p->headers = build_header_list(p);
if (p->headers)
curl_easy_setopt(c, CURLOPT_HTTPHEADER, p->headers);
}
// stream_curl implementation

// Stream read callback. Blocks until the ring buffer has data or the stream
// reached EOF, failed, or was aborted, then copies out up to `max_len`
// bytes. Returns the number of bytes copied (0 if none). If the producer is
// paused and at least half of the buffer is free afterwards, it asks the
// curl thread to resume the transfer.
static int curl_fill_buffer(struct stream *s, void *buffer, int max_len)
{
    if (max_len <= 0)
        return 0;

    struct priv *p = s->priv;
    char *dst = buffer;

    mp_mutex_lock(&p->mtx);
    for (;;) {
        bool ready = p->count != 0 || p->stream_eof || p->stream_error ||
                     atomic_load_explicit(&p->aborted, memory_order_relaxed);
        if (ready)
            break;
        mp_cond_wait(&p->cond, &p->mtx);
    }

    size_t n = MPMIN((size_t)max_len, p->count);
    if (n > 0) {
        // Drain in up to two pieces: to the end of the storage, then the
        // wrapped remainder from its start.
        size_t first = MPMIN(p->buffer_size - p->head, n);
        memcpy(dst, p->buffer + p->head, first);
        memcpy(dst + first, p->buffer, n - first);
        p->head = (p->head + n) % p->buffer_size;
        p->count -= n;
    }

    size_t free_space = p->buffer_size - p->count;
    bool unpause = p->paused && !p->stream_eof && !p->stream_error &&
                   free_space >= p->buffer_size / 2;
    mp_mutex_unlock(&p->mtx);

    if (unpause)
        cmd_async(p, CMD_UNPAUSE);
    return n;
}
// Stream seek callback. Rejects negative positions (returns 0). Otherwise it
// runs CMD_SEEK, which drops buffered data and restarts the transfer at
// `pos`, and returns 1.
static int curl_seek(struct stream *s, int64_t pos)
{
    if (pos < 0)
        return 0;

    struct priv *p = s->priv;
    cmd_sync(p, CMD_SEEK, pos, true);
    return 1;
}
static int64_t curl_get_size(struct stream *s)
{
struct priv *p = s->priv;
return p->content_size;
}
// talloc destructor of struct priv (installed in curl_open). Releases
// whatever curl_close() has not already released, then the header list and
// the synchronization primitives.
static void priv_destructor(void *ptr)
{
struct priv *p = ptr;
// Detach the cancel callback first so on_cancel() is no longer invoked
// with this priv.
mp_cancel_set_cb(p->s->cancel, NULL, NULL);
// p->curl is NULL if curl_close() already ran or curl_easy_init() failed.
if (p->curl) {
// Take the handle out of the multi before destroying it.
cmd_sync(p, CMD_REMOVE, 0, false);
curl_easy_cleanup(p->curl);
}
if (p->headers)
curl_slist_free_all(p->headers);
mp_mutex_destroy(&p->mtx);
mp_cond_destroy(&p->cond);
}
// Stream close callback: detaches the cancel callback, removes the handle
// from the multi and destroys it. p->curl is cleared so priv_destructor()
// does not clean it up a second time.
static void curl_close(struct stream *s)
{
    struct priv *p = s->priv;
    if (!p)
        return;

    mp_cancel_set_cb(s->cancel, NULL, NULL);
    if (!p->curl)
        return;

    cmd_sync(p, CMD_REMOVE, 0, false);
    curl_easy_cleanup(p->curl);
    p->curl = NULL;
}
// Stream open callback. Sets up the per-stream state, starts the first
// request on the curl thread and blocks until the response has been probed
// or the open is cancelled.
// Returns STREAM_OK on success, STREAM_UNSUPPORTED for non-read modes,
// STREAM_NO_MATCH when the backend is disabled, and STREAM_ERROR otherwise.
static int curl_open(stream_t *s, const struct stream_open_args *args)
{
if (s->mode != STREAM_READ)
return STREAM_UNSUPPORTED;
if (!s->global || !s->global->curl) {
MP_ERR(s, "curl backend not initialized\n");
return STREAM_ERROR;
}
struct curl_opts *opts = mp_get_config_group(s, s->global, &curl_conf);
if (!opts->enabled)
return STREAM_NO_MATCH;
// priv is a talloc child of the stream; priv_destructor() releases the
// curl resources on every path from here on, including error returns.
struct priv *p = talloc_zero(s, struct priv);
s->priv = p;
talloc_set_destructor(p, priv_destructor);
p->log = s->log;
p->global = s->global;
p->ctx = s->global->curl;
p->s = s;
p->opts = talloc_steal(p, opts);
p->net_opts = mp_get_config_group(p, s->global, &mp_network_conf);
p->url = talloc_strdup(p, s->url);
p->scheme = curl_scheme_lookup(bstr0(p->url));
// Only supported URLs are supposed to reach here.
mp_assert(p->scheme);
p->content_size = -1;
p->buffer_size = p->opts->buffer_size;
p->buffer = talloc_size(p, p->buffer_size);
// Optional byte window requested by lavf (see struct curl_open_args).
if (args->special_arg) {
const struct curl_open_args *oa = args->special_arg;
if (oa->offset > 0)
p->request_start = oa->offset;
if (oa->end_offset > 0)
p->request_end = oa->end_offset;
}
mp_mutex_init(&p->mtx);
mp_cond_init(&p->cond);
p->aborted = false;
p->curl = curl_easy_init();
if (!p->curl) {
MP_ERR(s, "curl_easy_init failed\n");
return STREAM_ERROR;
}
setup_curl(p);
// Install the cancel callback before starting the transfer so that a
// cancellation during the probe wakes the wait below.
mp_cancel_set_cb(s->cancel, on_cancel, p);
cmd_sync(p, CMD_ADD, 0, false);
// Wait until the curl thread has probed the response (or given up).
mp_mutex_lock(&p->mtx);
while (!p->probed && !atomic_load_explicit(&p->aborted, memory_order_relaxed))
mp_cond_wait(&p->cond, &p->mtx);
mp_mutex_unlock(&p->mtx);
if (!p->stream_ok || atomic_load(&p->aborted))
return STREAM_ERROR;
// NOTE(review): this queries the easy handle from the opening thread while
// the transfer may still be running on the curl thread - confirm this is
// safe with libcurl's threading rules.
char *content_type = NULL;
curl_easy_getinfo(p->curl, CURLINFO_CONTENT_TYPE, &content_type);
if (content_type && content_type[0])
s->mime_type = talloc_strdup(s, content_type);
s->seekable = p->seekable;
s->is_network = true;
s->streaming = true;
s->fast_skip = true;
s->fill_buffer = curl_fill_buffer;
s->seek = p->seekable ? curl_seek : NULL;
s->get_size = curl_get_size;
s->close = curl_close;
return STREAM_OK;
}
// True if the libcurl in use lists `scheme` among its supported protocols.
static bool curl_has_proto(bstr scheme)
{
    curl_version_info_data *ver = curl_version_info(CURLVERSION_NOW);
    mp_require(ver && ver->protocols);

    char **supported = (char **)ver->protocols;
    return bstr_in_list0(scheme, supported);
}
static char **curl_get_protocols(void)
{
int num = 0;
char **protocols = NULL;
for (int i = 0; i < MP_ARRAY_SIZE(curl_schemes); i++) {
bstr scheme = curl_schemes[i].scheme;
if (curl_has_proto(scheme))
MP_TARRAY_APPEND(NULL, protocols, num, bstrdup0(protocols, scheme));
}
MP_TARRAY_APPEND(NULL, protocols, num, NULL);
return protocols;
}
// Stream backend descriptor for the curl implementation: open and
// protocol-listing entry points, registered as a network-origin stream.
const stream_info_t stream_info_curl = {
.name = "curl",
.open2 = curl_open,
.get_protocols = curl_get_protocols,
.stream_origin = STREAM_ORIGIN_NET,
};
// FFmpeg AVIOContext implementation
// Allows demuxers to use our stream_curl in nested io and sub-demuxers. This
// should route all traffic through our implementation.

// State behind one AVIOContext created by mp_curl_avio_open(); released by
// mp_curl_avio_close().
struct curl_avio_cookie {
struct stream *stream; // the stream the AVIOContext reads from
struct mp_cancel *cancel; // per-stream cancel object, child of the demuxer's
};
// Checks `scheme` against comma-separated protocol lists. It is rejected,
// with an error logged, if a whitelist is given and does not contain it, or
// if a blacklist is given and contains it. A NULL list is not checked.
static bool is_protocol_allowed(struct mp_log *log, bstr scheme,
                                const char *whitelist, const char *blacklist)
{
    // `scheme` is required to be wrapped null-terminated string literal.
    // This is UB otherwise, see curl_schemes.
    mp_assert(scheme.len && scheme.start[scheme.len] == '\0');
    const char *name = scheme.start;

    bool whitelisted = !whitelist || av_match_list(name, whitelist, ',') > 0;
    if (!whitelisted) {
        mp_err(log, "Protocol '%.*s' not on whitelist '%s'!\n", BSTR_P(scheme), whitelist);
        return false;
    }

    bool blacklisted = blacklist && av_match_list(name, blacklist, ',') > 0;
    if (blacklisted) {
        mp_err(log, "Protocol '%.*s' on blacklist '%s'!\n", BSTR_P(scheme), blacklist);
        return false;
    }

    return true;
}
// AVIO read callback: reads up to `size` bytes from the wrapped stream.
// Returns the number of bytes read, or AVERROR_EOF when nothing was read.
static int curl_avio_read(void *opaque, uint8_t *buf, int size)
{
    struct curl_avio_cookie *cookie = opaque;
    int got = stream_read_partial(cookie->stream, buf, size);
    if (got <= 0)
        return AVERROR_EOF;
    return got;
}
// AVIO seek callback. AVSEEK_SIZE reports the stream size (ENOSYS if
// unknown). SEEK_SET, SEEK_CUR and SEEK_END resolve `pos` to an absolute
// position and seek there. Returns the new position, AVERROR(EINVAL) for an
// unsupported `whence`, an unknown size with SEEK_END or a negative target,
// and AVERROR(EIO) if the stream seek fails.
static int64_t curl_avio_seek(void *opaque, int64_t pos, int whence)
{
    struct curl_avio_cookie *cookie = opaque;
    int64_t size;

    switch (whence) {
    case AVSEEK_SIZE:
        size = stream_get_size(cookie->stream);
        return size >= 0 ? size : AVERROR(ENOSYS);
    case SEEK_END:
        size = stream_get_size(cookie->stream);
        if (size < 0)
            return AVERROR(EINVAL);
        pos += size;
        break;
    case SEEK_CUR:
        pos += stream_tell(cookie->stream);
        break;
    case SEEK_SET:
        break;
    default:
        return AVERROR(EINVAL);
    }

    if (pos < 0)
        return AVERROR(EINVAL);
    if (!stream_seek(cookie->stream, pos))
        return AVERROR(EIO);
    return pos;
}
// Opens `url` through the curl stream backend and wraps it in an
// AVIOContext for lavf.
//
// On success returns 0 and stores the context in *pb_out and its private
// state in *cookie_out; both are released with mp_curl_avio_close().
// On failure both outputs are NULL and an AVERROR code is returned:
// ENOSYS for write access, unsupported schemes or a disabled backend,
// EINVAL if the protocol is rejected by the white/blacklist, EIO if the
// stream cannot be opened, ENOMEM on allocation failure.
// `options` may carry "protocol_whitelist"/"protocol_blacklist" (overriding
// the arguments) and "offset"/"end_offset".
int mp_curl_avio_open(struct demuxer *demuxer, AVIOContext **pb_out,
                      void **cookie_out, const char *url, int flags,
                      AVDictionary **options,
                      const char *whitelist, const char *blacklist)
{
    struct stream *s = NULL;
    struct curl_avio_cookie *c = NULL;
    void *buffer = NULL;
    const int buffer_size = 64 * 1024;

    *pb_out = NULL;
    *cookie_out = NULL;

    if (flags & AVIO_FLAG_WRITE)
        return AVERROR(ENOSYS);

    // Check protocol early, to return ENOSYS and allow lavf to fallback.
    const struct curl_scheme *cs = curl_scheme_lookup(bstr0(url));
    if (!cs || !curl_has_proto(cs->scheme))
        return AVERROR(ENOSYS);

    struct curl_opts *opts = mp_get_config_group(NULL, demuxer->global, &curl_conf);
    bool enabled = opts->enabled;
    talloc_free(opts);
    if (!enabled)
        return AVERROR(ENOSYS);

    // The context is required to be initialized in global.
    mp_require(demuxer->global && demuxer->global->curl);

    struct curl_open_args oa = {0};
    if (options && *options) {
        AVDictionary *dict = *options;
        AVDictionaryEntry *e;

        // Nested IO plumbs whitelist/blacklist through the AVDictionary,
        // use that if set, same as FFmpeg's implementation.
        e = av_dict_get(dict, "protocol_whitelist", NULL, 0);
        if (e)
            whitelist = e->value;
        e = av_dict_get(dict, "protocol_blacklist", NULL, 0);
        if (e)
            blacklist = e->value;

        // lavf's http demuxer exposes initial/final byte offsets as AVOptions
        // Some demuxers, like lavf/hls.c assume it is always available, even for
        // custom IO... Add support for this.
        e = av_dict_get(dict, "offset", NULL, 0);
        if (e)
            oa.offset = strtoll(e->value, NULL, 10);
        e = av_dict_get(dict, "end_offset", NULL, 0);
        if (e)
            oa.end_offset = strtoll(e->value, NULL, 10);
    }

    if (!is_protocol_allowed(demuxer->log, cs->scheme, whitelist, blacklist))
        return AVERROR(EINVAL);

    // Each nested stream gets its own mp_cancel slaved to the main demuxer,
    // so the http backend can install its own wake-up callback without
    // clobbering the top-level stream or any sibling nested stream.
    struct mp_cancel *cancel = mp_cancel_new(NULL);
    mp_cancel_set_parent(cancel, demuxer->cancel);

    struct stream_open_args args = {
        .global = demuxer->global,
        .cancel = cancel,
        .url = url,
        .flags = STREAM_READ | (demuxer->stream_origin & STREAM_ORIGIN_MASK),
        .sinfo = &stream_info_curl,
        .special_arg = &oa,
    };
    int r = stream_create_with_args(&args, &s);
    if (r != STREAM_OK || !s) {
        talloc_free(cancel);
        return AVERROR(EIO);
    }

    c = talloc_zero(NULL, struct curl_avio_cookie);
    c->stream = s;
    c->cancel = cancel;

    buffer = av_malloc(buffer_size);
    if (!buffer)
        goto fail;

    AVIOContext *pb = avio_alloc_context(buffer, buffer_size, 0, c,
                                         curl_avio_read, NULL,
                                         s->seekable ? curl_avio_seek : NULL);
    if (!pb)
        goto fail_buffer;

    pb->seekable = s->seekable ? AVIO_SEEKABLE_NORMAL : 0;
    *pb_out = pb;
    *cookie_out = c;
    return 0;

fail_buffer:
    av_free(buffer);
fail:
    free_stream(s);
    talloc_free(cancel);
    talloc_free(c);
    return AVERROR(ENOMEM);
}
// Releases what mp_curl_avio_open() returned. Either argument may be NULL.
// The AVIO buffer and context are freed first, then the stream, its cancel
// object and the cookie.
void mp_curl_avio_close(AVIOContext *pb, void *cookie)
{
    if (pb) {
        // Free through pb->buffer rather than the pointer passed at creation.
        av_freep(&pb->buffer);
        avio_context_free(&pb);
    }

    struct curl_avio_cookie *c = cookie;
    if (!c)
        return;
    free_stream(c->stream);
    talloc_free(c->cancel);
    talloc_free(c);
}