stream-deploy/ZLM/3rdpart/ZLToolKit/src/Util/RingBuffer.h

508 lines
16 KiB
C++
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

/*
* Copyright (c) 2016 The ZLToolKit project authors. All Rights Reserved.
*
* This file is part of ZLToolKit(https://github.com/ZLMediaKit/ZLToolKit).
*
* Use of this source code is governed by MIT license that can be found in the
* LICENSE file in the root of the source tree. All contributing project authors
* may be found in the AUTHORS file in the root of the source tree.
*/
#ifndef UTIL_RINGBUFFER_H_
#define UTIL_RINGBUFFER_H_
#include <assert.h>
#include <atomic>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <condition_variable>
#include <functional>
#include "util.h"
#include "List.h"
#include "Poller/EventPoller.h"
// GOP缓存最大长度下限值 [AUTO-TRANSLATED:63162058]
//GOP cache minimum length lower bound value
#define RING_MIN_SIZE 32
#define LOCK_GUARD(mtx) std::lock_guard<decltype(mtx)> lck(mtx)
namespace toolkit {
template <typename T>
class RingDelegate {
public:
using Ptr = std::shared_ptr<RingDelegate>;
RingDelegate() = default;
virtual ~RingDelegate() = default;
virtual void onWrite(T in, bool is_key = true) = 0;
};
template <typename T>
class _RingStorage;
template <typename T>
class _RingReaderDispatcher;
/**
* 环形缓存读取器
* 该对象的事件触发都会在绑定的poller线程中执行
* 所以把锁去掉了
* 对该对象的一切操作都应该在poller线程中执行
* Circular cache reader
* All events triggered by this object will be executed in the bound poller thread
* So the lock is removed
* All operations on this object should be executed in the poller thread
* [AUTO-TRANSLATED:3d0f773d]
*/
template <typename T>
class _RingReader {
public:
using Ptr = std::shared_ptr<_RingReader>;
friend class _RingReaderDispatcher<T>;
_RingReader(std::shared_ptr<_RingStorage<T>> storage) {
_storage = std::move(storage);
setReadCB(nullptr);
setDetachCB(nullptr);
setGetInfoCB(nullptr);
setMessageCB(nullptr);
}
~_RingReader() = default;
void setReadCB(std::function<void(const T &)> cb) {
if (!cb) {
_read_cb = [](const T &) {};
} else {
_read_cb = std::move(cb);
flushGop();
}
}
void setDetachCB(std::function<void()> cb) {
_detach_cb = cb ? std::move(cb) : []() {};
}
void setGetInfoCB(std::function<Any()> cb) {
_info_cb = cb ? std::move(cb) : []() { return Any(); };
}
void setMessageCB(std::function<void(const Any &data)> cb) {
_msg_cb = cb ? std::move(cb) : [](const Any &data) {};
}
void flushGop() {
if (!_storage) {
return;
}
_storage->getCache().for_each([this](const List<std::pair<bool, T>> &lst) {
lst.for_each([this](const std::pair<bool, T> &pr) { onRead(pr.second, pr.first); });
});
}
private:
void onRead(const T &data, bool /*is_key*/) { _read_cb(data); }
void onMessage(const Any &data) { _msg_cb(data); }
void onDetach() const { _detach_cb(); }
Any getInfo() { return _info_cb(); }
private:
std::shared_ptr<_RingStorage<T>> _storage;
std::function<void(void)> _detach_cb;
std::function<void(const T &)> _read_cb;
std::function<Any()> _info_cb;
std::function<void(const Any &data)> _msg_cb;
};
template <typename T>
class _RingStorage {
public:
using Ptr = std::shared_ptr<_RingStorage>;
using GopType = List<List<std::pair<bool, T>>>;
_RingStorage(size_t max_size, size_t max_gop_size) {
// gop缓存个数不能小于32 [AUTO-TRANSLATED:63d52404]
//The number of GOP caches cannot be less than 32
if (max_size < RING_MIN_SIZE) {
max_size = RING_MIN_SIZE;
}
_max_size = max_size;
_max_gop_size = max_gop_size;
clearCache();
}
~_RingStorage() = default;
/**
* 写入环形缓存数据
* @param in 数据
* @param is_key 是否为关键帧
* @return 是否触发重置环形缓存大小
* Write data to the circular cache
* @param in Data
* @param is_key Whether it is a key frame
* @return Whether to trigger a reset of the circular cache size
* [AUTO-TRANSLATED:8ccedd1d]
*/
void write(T in, bool is_key = true) {
if (is_key) {
_have_idr = true;
_started = true;
if (!_data_cache.back().empty()) {
//当前gop列队还没收到任意缓存 [AUTO-TRANSLATED:81e257d0]
//The current GOP queue has not received any cache
_data_cache.emplace_back();
}
if (_data_cache.size() > _max_gop_size) {
// GOP个数超过限制那么移除最早的GOP [AUTO-TRANSLATED:054ad5e4]
//The number of GOPs exceeds the limit, so remove the earliest GOP
popFrontGop();
}
}
if (!_have_idr && _started) {
//缓存中没有关键帧那么gop缓存无效 [AUTO-TRANSLATED:394a9170]
//There is no key frame in the cache, so the GOP cache is invalid
return;
}
_data_cache.back().emplace_back(std::make_pair(is_key, std::move(in)));
if (++_size > _max_size) {
// GOP缓存溢出 [AUTO-TRANSLATED:1cd0ddc4]
//GOP cache overflow
while (_data_cache.size() > 1) {
//先尝试清除老的GOP缓存 [AUTO-TRANSLATED:a01422a1]
//Try to clear the old GOP cache first
popFrontGop();
}
if (_size > _max_size) {
//还是大于最大缓冲限制那么清空所有GOP [AUTO-TRANSLATED:dec7aa9b]
//Still greater than the maximum buffer limit, so clear all GOPs
clearCache();
}
}
}
Ptr clone() const {
Ptr ret(new _RingStorage());
ret->_size = _size;
ret->_have_idr = _have_idr;
ret->_started = _started;
ret->_max_size = _max_size;
ret->_max_gop_size = _max_gop_size;
ret->_data_cache = _data_cache;
return ret;
}
const GopType &getCache() const { return _data_cache; }
void clearCache() {
_size = 0;
_have_idr = false;
_data_cache.clear();
_data_cache.emplace_back();
}
private:
_RingStorage() = default;
void popFrontGop() {
if (!_data_cache.empty()) {
_size -= _data_cache.front().size();
_data_cache.pop_front();
if (_data_cache.empty()) {
_data_cache.emplace_back();
}
}
}
private:
bool _started = false;
bool _have_idr;
size_t _size;
size_t _max_size;
size_t _max_gop_size;
GopType _data_cache;
};
template <typename T>
class RingBuffer;
/**
* 环形缓存事件派发器只能一个poller线程操作它
* @tparam T
* Ring buffer event dispatcher, can only be operated by one poller thread
* @tparam T
* [AUTO-TRANSLATED:6c0d8449]
*/
template <typename T>
class _RingReaderDispatcher : public std::enable_shared_from_this<_RingReaderDispatcher<T>> {
public:
using Ptr = std::shared_ptr<_RingReaderDispatcher>;
using RingReader = _RingReader<T>;
using RingStorage = _RingStorage<T>;
using onChangeInfoCB = std::function<Any(Any &&info)>;
friend class RingBuffer<T>;
~_RingReaderDispatcher() {
decltype(_reader_map) reader_map;
reader_map.swap(_reader_map);
for (auto &pr : reader_map) {
auto reader = pr.second.lock();
if (reader) {
reader->onDetach();
}
}
}
private:
_RingReaderDispatcher(
const typename RingStorage::Ptr &storage, std::function<void(int, bool)> onSizeChanged) {
_reader_size = 0;
_storage = storage;
_on_size_changed = std::move(onSizeChanged);
assert(_on_size_changed);
}
void write(T in, bool is_key = true) {
for (auto it = _reader_map.begin(); it != _reader_map.end();) {
auto reader = it->second.lock();
if (!reader) {
it = _reader_map.erase(it);
--_reader_size;
onSizeChanged(false);
continue;
}
reader->onRead(in, is_key);
++it;
}
_storage->write(std::move(in), is_key);
}
void sendMessage(const Any &data) {
for (auto it = _reader_map.begin(); it != _reader_map.end();) {
auto reader = it->second.lock();
if (!reader) {
it = _reader_map.erase(it);
--_reader_size;
onSizeChanged(false);
continue;
}
reader->onMessage(data);
++it;
}
}
std::shared_ptr<RingReader> attach(const EventPoller::Ptr &poller, bool use_cache) {
if (!poller->isCurrentThread()) {
throw std::runtime_error("You can attach RingBuffer only in it's poller thread");
}
std::weak_ptr<_RingReaderDispatcher> weak_self = this->shared_from_this();
auto on_dealloc = [weak_self, poller](RingReader *ptr) {
poller->async([weak_self, ptr]() {
auto strong_self = weak_self.lock();
if (strong_self && strong_self->_reader_map.erase(ptr)) {
--strong_self->_reader_size;
strong_self->onSizeChanged(false);
}
delete ptr;
});
};
std::shared_ptr<RingReader> reader(new RingReader(use_cache ? _storage : nullptr), on_dealloc);
_reader_map[reader.get()] = reader;
++_reader_size;
onSizeChanged(true);
return reader;
}
void onSizeChanged(bool add_flag) { _on_size_changed(_reader_size, add_flag); }
void clearCache() {
if (_reader_size == 0) {
_storage->clearCache();
}
}
std::list<Any> getInfoList(const onChangeInfoCB &on_change) {
std::list<Any> ret;
for (auto &pr : _reader_map) {
auto reader = pr.second.lock();
if (!reader) {
continue;
}
auto info = reader->getInfo();
if (!info) {
continue;
}
ret.emplace_back(on_change(std::move(info)));
}
return ret;
}
private:
std::atomic_int _reader_size;
std::function<void(int, bool)> _on_size_changed;
typename RingStorage::Ptr _storage;
std::unordered_map<void *, std::weak_ptr<RingReader>> _reader_map;
};
template <typename T>
class RingBuffer : public std::enable_shared_from_this<RingBuffer<T>> {
public:
using Ptr = std::shared_ptr<RingBuffer>;
using RingReader = _RingReader<T>;
using RingStorage = _RingStorage<T>;
using RingReaderDispatcher = _RingReaderDispatcher<T>;
using onReaderChanged = std::function<void(int size)>;
using onGetInfoCB = std::function<void(std::list<Any> &info_list)>;
RingBuffer(size_t max_size = 1024, onReaderChanged cb = nullptr, size_t max_gop_size = 1) {
_storage = std::make_shared<RingStorage>(max_size, max_gop_size);
_on_reader_changed = cb ? std::move(cb) : [](int size) {};
//先触发无人观看 [AUTO-TRANSLATED:34c64fef]
//First trigger no one watching
_on_reader_changed(0);
}
~RingBuffer() = default;
void write(T in, bool is_key = true) {
if (_delegate) {
_delegate->onWrite(std::move(in), is_key);
return;
}
LOCK_GUARD(_mtx_map);
for (auto &pr : _dispatcher_map) {
auto &second = pr.second;
//切换线程后触发onRead事件 [AUTO-TRANSLATED:4ca6647d]
//Switch thread and trigger onRead event
pr.first->async([second, in, is_key]() mutable { second->write(std::move(in), is_key); }, false);
}
_storage->write(std::move(in), is_key);
}
void sendMessage(const Any &data) {
LOCK_GUARD(_mtx_map);
for (auto &pr : _dispatcher_map) {
auto &second = pr.second;
// 切换线程后触发sendMessage [AUTO-TRANSLATED:350138c9]
//Switch thread and trigger sendMessage
pr.first->async([second, data]() { second->sendMessage(data); }, false);
}
}
void setDelegate(const typename RingDelegate<T>::Ptr &delegate) { _delegate = delegate; }
std::shared_ptr<RingReader> attach(const EventPoller::Ptr &poller, bool use_cache = true) {
typename RingReaderDispatcher::Ptr dispatcher;
{
LOCK_GUARD(_mtx_map);
auto &ref = _dispatcher_map[poller];
if (!ref) {
std::weak_ptr<RingBuffer> weak_self = this->shared_from_this();
auto onSizeChanged = [weak_self, poller](int size, bool add_flag) {
if (auto strong_self = weak_self.lock()) {
strong_self->onSizeChanged(poller, size, add_flag);
}
};
auto onDealloc = [poller](RingReaderDispatcher *ptr) { poller->async([ptr]() { delete ptr; }); };
ref.reset(new RingReaderDispatcher(_storage->clone(), std::move(onSizeChanged)), std::move(onDealloc));
}
dispatcher = ref;
}
return dispatcher->attach(poller, use_cache);
}
int readerCount() { return _total_count; }
void clearCache() {
LOCK_GUARD(_mtx_map);
_storage->clearCache();
for (auto &pr : _dispatcher_map) {
auto &second = pr.second;
//切换线程后清空缓存 [AUTO-TRANSLATED:150f7fa4]
//Switch thread and clear cache
pr.first->async([second]() { second->clearCache(); }, false);
}
}
void flushGop(std::function<void(const T &)> cb) {
LOCK_GUARD(_mtx_map);
_storage->getCache().for_each([&](const List<std::pair<bool, T>> &lst) {
lst.for_each([&](const std::pair<bool, T> &pr) { cb(pr.second); });
});
}
void getInfoList(const onGetInfoCB &cb, const typename RingReaderDispatcher::onChangeInfoCB &on_change = nullptr) {
if (!cb) {
return;
}
if (!on_change) {
const_cast<typename RingReaderDispatcher::onChangeInfoCB &>(on_change) = [](Any &&info) { return std::move(info); };
}
LOCK_GUARD(_mtx_map);
auto info_vec = std::make_shared<std::vector<std::list<Any>>>();
// 1、最少确保一个元素 [AUTO-TRANSLATED:6dafe078]
//1. Ensure at least one element
info_vec->resize(_dispatcher_map.empty() ? 1 : _dispatcher_map.size());
std::shared_ptr<void> on_finished(nullptr, [cb, info_vec](void *) mutable {
// 2、防止这里为空 [AUTO-TRANSLATED:4484baf7]
//2. Prevent this from being empty
auto &lst = *info_vec->begin();
for (auto &item : *info_vec) {
if (&lst != &item) {
lst.insert(lst.end(), item.begin(), item.end());
}
}
cb(lst);
});
auto i = 0U;
for (auto &pr : _dispatcher_map) {
auto &second = pr.second;
pr.first->async([second, info_vec, on_finished, i, on_change]() { (*info_vec)[i] = second->getInfoList(on_change); });
++i;
}
}
private:
void onSizeChanged(const EventPoller::Ptr &poller, int size, bool add_flag) {
if (size == 0) {
LOCK_GUARD(_mtx_map);
_dispatcher_map.erase(poller);
}
if (add_flag) {
++_total_count;
} else {
--_total_count;
}
_on_reader_changed(_total_count);
}
private:
struct HashOfPtr {
std::size_t operator()(const EventPoller::Ptr &key) const { return (std::size_t)key.get(); }
};
private:
std::mutex _mtx_map;
std::atomic_int _total_count { 0 };
typename RingStorage::Ptr _storage;
typename RingDelegate<T>::Ptr _delegate;
onReaderChanged _on_reader_changed;
std::unordered_map<EventPoller::Ptr, typename RingReaderDispatcher::Ptr, HashOfPtr> _dispatcher_map;
};
} /* namespace toolkit */
#endif /* UTIL_RINGBUFFER_H_ */