增加http代理功能

This commit is contained in:
Alex 2023-11-17 05:49:03 +08:00
parent 7865f2c360
commit 88dc59ec68
15 changed files with 130 additions and 24 deletions

View File

@ -199,6 +199,8 @@ public:
// 支持通过on_publish返回值替换stream_id // 支持通过on_publish返回值替换stream_id
std::string stream_replace; std::string stream_replace;
// http proxy url
std::string proxy_url;
template <typename MAP> template <typename MAP>
ProtocolOption(const MAP &allArgs) : ProtocolOption() { ProtocolOption(const MAP &allArgs) : ProtocolOption() {
@ -229,6 +231,8 @@ public:
GET_OPT_VALUE(hls_save_path); GET_OPT_VALUE(hls_save_path);
GET_OPT_VALUE(stream_replace); GET_OPT_VALUE(stream_replace);
GET_OPT_VALUE(proxy_url);
} }
private: private:

View File

@ -8,12 +8,13 @@
* may be found in the AUTHORS file in the root of the source tree. * may be found in the AUTHORS file in the root of the source tree.
*/ */
#include <cinttypes>
#include "Parser.h" #include "Parser.h"
#include "strCoding.h"
#include "macros.h"
#include "Network/sockutil.h"
#include "Common/macros.h" #include "Common/macros.h"
#include "Network/sockutil.h"
#include "Util/base64.h"
#include "macros.h"
#include "strCoding.h"
#include <cinttypes>
using namespace std; using namespace std;
using namespace toolkit; using namespace toolkit;
@ -325,6 +326,24 @@ void splitUrl(const std::string &url, std::string &host, uint16_t &port) {
host = url.substr(0, pos); host = url.substr(0, pos);
checkHost(host); checkHost(host);
} }
void parseProxyUrl(const std::string &proxy_url, std::string &proxy_host, uint16_t &proxy_port, std::string &proxy_auth){
//判断是否包含http://, 如果是则去掉
std::string host;
auto pos = proxy_url.find("://");
if (pos != string::npos) {
host = proxy_url.substr(pos + 3);
}else{
host = proxy_url;
}
//判断是否包含用户名和密码
pos = host.rfind('@');
if (pos != string::npos) {
proxy_auth = encodeBase64(host.substr(0, pos));
host = host.substr(pos + 1, host.size());
}
splitUrl(host, proxy_host, proxy_port);
}
#if 0 #if 0
//测试代码 //测试代码
static onceToken token([](){ static onceToken token([](){

View File

@ -21,6 +21,8 @@ namespace mediakit {
std::string findSubString(const char *buf, const char *start, const char *end, size_t buf_size = 0); std::string findSubString(const char *buf, const char *start, const char *end, size_t buf_size = 0);
// 把url解析为主机地址和端口号,兼容ipv4/ipv6/dns // 把url解析为主机地址和端口号,兼容ipv4/ipv6/dns
void splitUrl(const std::string &url, std::string &host, uint16_t &port); void splitUrl(const std::string &url, std::string &host, uint16_t &port);
// 解析proxy url,仅支持http
void parseProxyUrl(const std::string &proxy_url, std::string &proxy_host, uint16_t &proxy_port, std::string &proxy_auth);
struct StrCaseCompare { struct StrCaseCompare {
bool operator()(const std::string &__x, const std::string &__y) const { return strcasecmp(__x.data(), __y.data()) < 0; } bool operator()(const std::string &__x, const std::string &__y) const { return strcasecmp(__x.data(), __y.data()) < 0; }

View File

@ -22,6 +22,7 @@ HlsPlayer::HlsPlayer(const EventPoller::Ptr &poller) {
void HlsPlayer::play(const string &url) { void HlsPlayer::play(const string &url) {
_play_result = false; _play_result = false;
_play_url = url; _play_url = url;
setProxyUrl(_option.proxy_url);
fetchIndexFile(); fetchIndexFile();
} }
@ -88,6 +89,7 @@ void HlsPlayer::fetchSegment() {
weak_ptr<HlsPlayer> weak_self = static_pointer_cast<HlsPlayer>(shared_from_this()); weak_ptr<HlsPlayer> weak_self = static_pointer_cast<HlsPlayer>(shared_from_this());
if (!_http_ts_player) { if (!_http_ts_player) {
_http_ts_player = std::make_shared<HttpTSPlayer>(getPoller()); _http_ts_player = std::make_shared<HttpTSPlayer>(getPoller());
_http_ts_player->setProxyUrl(_option.proxy_url);
_http_ts_player->setOnCreateSocket([weak_self](const EventPoller::Ptr &poller) { _http_ts_player->setOnCreateSocket([weak_self](const EventPoller::Ptr &poller) {
auto strong_self = weak_self.lock(); auto strong_self = weak_self.lock();
if (strong_self) { if (strong_self) {

View File

@ -78,7 +78,9 @@ void HttpClient::sendRequest(const string &url) {
printer.pop_back(); printer.pop_back();
_header.emplace("Cookie", printer); _header.emplace("Cookie", printer);
} }
if (isUsedProxy()) {
startConnect(_proxy_host, _proxy_port, _wait_header_ms / 1000.0f);
} else {
if (!alive() || host_changed) { if (!alive() || host_changed) {
startConnect(host, port, _wait_header_ms / 1000.0f); startConnect(host, port, _wait_header_ms / 1000.0f);
} else { } else {
@ -86,6 +88,7 @@ void HttpClient::sendRequest(const string &url) {
onConnect_l(ex); onConnect_l(ex);
} }
} }
}
void HttpClient::clear() { void HttpClient::clear() {
_url.clear(); _url.clear();
@ -158,8 +161,9 @@ void HttpClient::onConnect_l(const SockException &ex) {
onResponseCompleted_l(ex); onResponseCompleted_l(ex);
return; return;
} }
_StrPrinter printer; _StrPrinter printer;
//不使用代理或者代理服务器已经连接成功
if(_proxy_connected || !isUsedProxy()) {
printer << _method + " " << _path + " HTTP/1.1\r\n"; printer << _method + " " << _path + " HTTP/1.1\r\n";
for (auto &pr : _header) { for (auto &pr : _header) {
printer << pr.first + ": "; printer << pr.first + ": ";
@ -167,6 +171,13 @@ void HttpClient::onConnect_l(const SockException &ex) {
} }
_header.clear(); _header.clear();
_path.clear(); _path.clear();
}else{
printer << "CONNECT "<< _last_host <<" HTTP/1.1\r\n";
printer << "Proxy-Connection: keep-alive\r\n";
if(!_proxy_auth.empty()) {
printer << "Proxy-Authorization: Basic " << _proxy_auth << "\r\n";
}
}
SockSender::send(printer << "\r\n"); SockSender::send(printer << "\r\n");
onFlush(); onFlush();
} }
@ -400,5 +411,28 @@ void HttpClient::setBodyTimeout(size_t timeout_ms) {
void HttpClient::setCompleteTimeout(size_t timeout_ms) { void HttpClient::setCompleteTimeout(size_t timeout_ms) {
_wait_complete_ms = timeout_ms; _wait_complete_ms = timeout_ms;
} }
bool HttpClient::isUsedProxy() const {
return _used_proxy;
}
bool HttpClient::isProxyConnected() const {
return _proxy_connected;
}
void HttpClient::setProxyUrl(const string &proxyUrl) {
_proxy_url = proxyUrl;
if (!_proxy_url.empty()) {
parseProxyUrl(_proxy_url, _proxy_host, _proxy_port, _proxy_auth);
_used_proxy = true;
}else{
_used_proxy = false;
}
}
bool HttpClient::checkProxyConnected(const char *data, size_t len) {
auto ret = strstr(data, "HTTP/1.1 200 Connection established");
_proxy_connected = ret != nullptr;
return _proxy_connected;
}
} /* namespace mediakit */ } /* namespace mediakit */

View File

@ -141,6 +141,8 @@ public:
*/ */
void setCompleteTimeout(size_t timeout_ms); void setCompleteTimeout(size_t timeout_ms);
void setProxyUrl(const std::string &proxyUrl);
protected: protected:
/** /**
* http回复头 * http回复头
@ -181,11 +183,19 @@ protected:
void onFlush() override; void onFlush() override;
void onManager() override; void onManager() override;
bool checkProxyConnected(const char *data, size_t len);
bool isUsedProxy() const;
bool isProxyConnected() const;
void clearResponse();
private: private:
void onResponseCompleted_l(const toolkit::SockException &ex); void onResponseCompleted_l(const toolkit::SockException &ex);
void onConnect_l(const toolkit::SockException &ex); void onConnect_l(const toolkit::SockException &ex);
void checkCookie(HttpHeader &headers); void checkCookie(HttpHeader &headers);
void clearResponse();
private: private:
//for http response //for http response
@ -215,6 +225,12 @@ private:
toolkit::Ticker _wait_header; toolkit::Ticker _wait_header;
toolkit::Ticker _wait_body; toolkit::Ticker _wait_body;
toolkit::Ticker _wait_complete; toolkit::Ticker _wait_complete;
std::string _proxy_url;
std::string _proxy_host;
std::string _proxy_auth;
uint16_t _proxy_port;
bool _proxy_connected = false;
bool _used_proxy = false;
}; };
} /* namespace mediakit */ } /* namespace mediakit */

View File

@ -15,6 +15,11 @@ using namespace toolkit;
namespace mediakit { namespace mediakit {
void HttpClientImp::onConnect(const SockException &ex) { void HttpClientImp::onConnect(const SockException &ex) {
if (isUsedProxy() && !isProxyConnected()) {
//连接代理服务器
setDoNotUseSSL();
HttpClient::onConnect(ex);
} else {
if (!isHttps()) { if (!isHttps()) {
//https 302跳转 http时需要关闭ssl //https 302跳转 http时需要关闭ssl
setDoNotUseSSL(); setDoNotUseSSL();
@ -23,5 +28,16 @@ void HttpClientImp::onConnect(const SockException &ex) {
TcpClientWithSSL<HttpClient>::onConnect(ex); TcpClientWithSSL<HttpClient>::onConnect(ex);
} }
} }
}
ssize_t HttpClientImp::onRecvHeader(const char *data, size_t len) {
if (isUsedProxy() && !isProxyConnected()) {
if(checkProxyConnected(data, len)) {
clearResponse();
onConnect(SockException(Err_success, "proxy connected"));
return 0;
}
}
return HttpClient::onRecvHeader(data, len);
}
} /* namespace mediakit */ } /* namespace mediakit */

View File

@ -24,6 +24,7 @@ public:
protected: protected:
void onConnect(const toolkit::SockException &ex) override; void onConnect(const toolkit::SockException &ex) override;
virtual ssize_t onRecvHeader(const char *data, size_t len) override;
}; };
} /* namespace mediakit */ } /* namespace mediakit */

View File

@ -21,6 +21,7 @@ void TsPlayer::play(const string &url) {
TraceL << "play http-ts: " << url; TraceL << "play http-ts: " << url;
_play_result = false; _play_result = false;
_benchmark_mode = (*this)[Client::kBenchmarkMode].as<int>(); _benchmark_mode = (*this)[Client::kBenchmarkMode].as<int>();
setProxyUrl(_option.proxy_url);
setHeaderTimeout((*this)[Client::kTimeoutMS].as<int>()); setHeaderTimeout((*this)[Client::kTimeoutMS].as<int>());
setBodyTimeout((*this)[Client::kMediaTimeoutMS].as<int>()); setBodyTimeout((*this)[Client::kMediaTimeoutMS].as<int>());
setMethod("GET"); setMethod("GET");

View File

@ -16,8 +16,9 @@ using namespace toolkit;
namespace mediakit { namespace mediakit {
MediaPlayer::MediaPlayer(const EventPoller::Ptr &poller) { MediaPlayer::MediaPlayer(const EventPoller::Ptr &poller, const ProtocolOption &option) {
_poller = poller ? poller : EventPollerPool::Instance().getPoller(); _poller = poller ? poller : EventPollerPool::Instance().getPoller();
_option = option;
} }
static void setOnCreateSocket_l(const std::shared_ptr<PlayerBase> &delegate, const Socket::onCreateSocket &cb){ static void setOnCreateSocket_l(const std::shared_ptr<PlayerBase> &delegate, const Socket::onCreateSocket &cb){
@ -37,6 +38,7 @@ static void setOnCreateSocket_l(const std::shared_ptr<PlayerBase> &delegate, con
void MediaPlayer::play(const string &url) { void MediaPlayer::play(const string &url) {
_delegate = PlayerBase::createPlayer(_poller, url); _delegate = PlayerBase::createPlayer(_poller, url);
assert(_delegate); assert(_delegate);
_delegate->setOption(_option);
setOnCreateSocket_l(_delegate, _on_create_socket); setOnCreateSocket_l(_delegate, _on_create_socket);
_delegate->setOnShutdown(_on_shutdown); _delegate->setOnShutdown(_on_shutdown);
_delegate->setOnPlayResult(_on_play_result); _delegate->setOnPlayResult(_on_play_result);

View File

@ -21,7 +21,7 @@ class MediaPlayer : public PlayerImp<PlayerBase, PlayerBase> {
public: public:
using Ptr = std::shared_ptr<MediaPlayer>; using Ptr = std::shared_ptr<MediaPlayer>;
MediaPlayer(const toolkit::EventPoller::Ptr &poller = nullptr); MediaPlayer(const toolkit::EventPoller::Ptr &poller = nullptr, const ProtocolOption &option=ProtocolOption());
~MediaPlayer() override = default; ~MediaPlayer() override = default;
void play(const std::string &url) override; void play(const std::string &url) override;
@ -29,6 +29,7 @@ public:
void setOnCreateSocket(toolkit::Socket::onCreateSocket cb); void setOnCreateSocket(toolkit::Socket::onCreateSocket cb);
private: private:
ProtocolOption _option;
toolkit::EventPoller::Ptr _poller; toolkit::EventPoller::Ptr _poller;
toolkit::Socket::onCreateSocket _on_create_socket; toolkit::Socket::onCreateSocket _on_create_socket;
}; };

View File

@ -115,10 +115,16 @@ public:
*/ */
virtual void setOnResume(const std::function<void()> &cb) = 0; virtual void setOnResume(const std::function<void()> &cb) = 0;
void setOption(const ProtocolOption &option){
_option = option;
}
protected: protected:
virtual void onResume() = 0; virtual void onResume() = 0;
virtual void onShutdown(const toolkit::SockException &ex) = 0; virtual void onShutdown(const toolkit::SockException &ex) = 0;
virtual void onPlayResult(const toolkit::SockException &ex) = 0; virtual void onPlayResult(const toolkit::SockException &ex) = 0;
protected:
ProtocolOption _option;
}; };
template<typename Parent, typename Delegate> template<typename Parent, typename Delegate>

View File

@ -27,7 +27,7 @@ namespace mediakit {
PlayerProxy::PlayerProxy( PlayerProxy::PlayerProxy(
const string &vhost, const string &app, const string &stream_id, const ProtocolOption &option, int retry_count, const string &vhost, const string &app, const string &stream_id, const ProtocolOption &option, int retry_count,
const EventPoller::Ptr &poller, int reconnect_delay_min, int reconnect_delay_max, int reconnect_delay_step) const EventPoller::Ptr &poller, int reconnect_delay_min, int reconnect_delay_max, int reconnect_delay_step)
: MediaPlayer(poller) : MediaPlayer(poller, option)
, _option(option) { , _option(option) {
_tuple.vhost = vhost; _tuple.vhost = vhost;
_tuple.app = app; _tuple.app = app;

View File

@ -22,6 +22,7 @@ FlvPlayer::FlvPlayer(const EventPoller::Ptr &poller) {
void FlvPlayer::play(const string &url) { void FlvPlayer::play(const string &url) {
TraceL << "play http-flv: " << url; TraceL << "play http-flv: " << url;
_play_result = false; _play_result = false;
setProxyUrl(_option.proxy_url);
setHeaderTimeout((*this)[Client::kTimeoutMS].as<int>()); setHeaderTimeout((*this)[Client::kTimeoutMS].as<int>());
setBodyTimeout((*this)[Client::kMediaTimeoutMS].as<int>()); setBodyTimeout((*this)[Client::kMediaTimeoutMS].as<int>());
setMethod("GET"); setMethod("GET");

View File

@ -39,7 +39,8 @@ const char *TSSegment::onSearchPacketTail(const char *data, size_t len) {
if (((uint8_t *) data)[_size] == TS_SYNC_BYTE) { if (((uint8_t *) data)[_size] == TS_SYNC_BYTE) {
return data + _size; return data + _size;
} }
auto pos = memchr(data + _size, TS_SYNC_BYTE, len - _size); //搜索ts包头
auto pos = memchr(data, TS_SYNC_BYTE, len);
if (pos) { if (pos) {
return (char *) pos; return (char *) pos;
} }