23 #include "spdlog/spdlog.h"
// Receiver constructor. The first line of the signature, part of the
// initializer list and the braces are not included in this listing.
// Visible behavior: builds a UDP listen endpoint from `iface` and `port`,
// opens and configures the socket, binds it, constructs a multicast
// join_group option for `address`, and starts the first asynchronous receive.
// NOTE(review): `port` is a signed short; where short is 16 bits, ports above
// 32767 cannot be passed without wrapping. Changing it would alter the public
// interface, so it is only flagged here.
28 short port, uint64_t tsi,
29 boost::asio::io_service& io_service)
32 , _mcast_address(address)
// Endpoint to listen on: the interface address string parsed by asio, plus the port.
34 boost::asio::ip::udp::endpoint listen_endpoint(
35 boost::asio::ip::address::from_string(iface), port);
36 _socket.open(listen_endpoint.protocol());
// Socket options are set before bind(): multicast loopback and address reuse
// are enabled, and a 16 MiB receive buffer is requested.
37 _socket.set_option(boost::asio::ip::multicast::enable_loopback(
true));
38 _socket.set_option(boost::asio::ip::udp::socket::reuse_address(
true));
39 _socket.set_option(boost::asio::socket_base::receive_buffer_size(16*1024*1024));
40 _socket.bind(listen_endpoint);
// join_group option for the multicast address; the enclosing call that applies
// it is not visible in this listing (presumably _socket.set_option - confirm).
44 boost::asio::ip::multicast::join_group(
45 boost::asio::ip::address::from_string(address)));
// Start the first asynchronous receive into _data; handle_receive_from is
// bound as the completion handler and receives the error code and byte count.
47 _socket.async_receive_from(
48 boost::asio::buffer(_data, max_length), _sender_endpoint,
49 boost::bind(&LibFlute::Receiver::handle_receive_from,
this,
50 boost::asio::placeholders::error,
51 boost::asio::placeholders::bytes_transferred));
// Completion handler for the asynchronous UDP receive.
//
// @param error        result of the receive operation
// @param bytes_recvd  number of bytes placed into _data
//
// Visible behavior: logs the datagram size, checks the packet's TSI against
// _tsi, feeds the payload's encoding symbols into the matching file, handles
// completed files (including the FDT carried on TOI 0), and issues the next
// async_receive_from. Several lines of the function (braces, the packet
// construction, the error check) are not included in this listing.
65 auto LibFlute::Receiver::handle_receive_from(
const boost::system::error_code& error,
66 size_t bytes_recvd) ->
void
70 spdlog::debug(
"Received {} bytes", bytes_recvd);
// Packets whose TSI differs from this receiver's _tsi are reported; the log
// text states that they are discarded.
74 if (alc.tsi() != _tsi) {
75 spdlog::warn(
"Discarding packet for unknown TSI {}", alc.tsi());
// Lock _files_mutex while _files and _fdt are read and modified.
// NOTE(review): _completion_cb below appears to be invoked while this lock is
// held; a callback that calls back into a locking member such as
// remove_expired_files would deadlock on the non-recursive mutex - confirm.
79 const std::lock_guard<std::mutex> lock(_files_mutex);
// TOI 0 with no FDT yet, or with a different FDT instance ID: make sure a
// File entry for TOI 0 exists, sized from the packet's FEC OTI transfer length.
81 if (alc.toi() == 0 && (!_fdt || _fdt->instance_id() != alc.fdt_instance_id())) {
82 if (_files.find(alc.toi()) == _files.end()) {
83 FileDeliveryTable::FileEntry fe{0,
"",
static_cast<uint32_t
>(alc.fec_oti().transfer_length),
"",
"", 0, alc.fec_oti()};
84 _files.emplace(alc.toi(), std::make_shared<LibFlute::File>(fe));
// Only files that are known and not yet complete accept further symbols.
88 if (_files.find(alc.toi()) != _files.end() && !_files[alc.toi()]->complete()) {
// Arguments of the payload-parsing call (its first line is not in this
// listing): the data after the ALC header, the remaining length, the file's
// FEC OTI and the packet's content encoding.
// NOTE(review): bytes_recvd - alc.header_length() is unsigned and would wrap
// if the header length exceeded the datagram size; presumably the packet
// parser rejects such input before this point - confirm.
90 _data + alc.header_length(),
91 bytes_recvd - alc.header_length(),
92 _files[alc.toi()]->fec_oti(),
93 alc.content_encoding());
95 for (
const auto& symbol : encoding_symbols) {
96 spdlog::debug(
"received TOI {} SBN {} ID {}", alc.toi(), symbol.source_block_number(), symbol.id() );
97 _files[alc.toi()]->put_symbol(symbol);
100 auto file = _files[alc.toi()].get();
// Once the file is complete, drop every other entry that has the same
// content location.
101 if (_files[alc.toi()]->complete()) {
102 for (
auto it = _files.cbegin(); it != _files.cend();)
104 if (it->second.get() != file && it->second->meta().content_location == file->meta().content_location)
106 spdlog::debug(
"Replacing file with TOI {}", it->first);
107 it = _files.erase(it);
115 spdlog::debug(
"File with TOI {} completed", alc.toi());
// The completion callback, if set, is invoked only for non-zero TOIs.
116 if (alc.toi() != 0 && _completion_cb) {
117 _completion_cb(_files[alc.toi()]);
// A completed TOI 0 object is parsed as a new FileDeliveryTable; its entry is
// then removed from _files and a File is created for every FDT entry whose
// TOI is not already tracked.
120 if (alc.toi() == 0) {
121 _fdt = std::make_unique<LibFlute::FileDeliveryTable>(
122 alc.fdt_instance_id(), _files[alc.toi()]->buffer(), _files[alc.toi()]->length());
124 _files.erase(alc.toi());
125 for (
const auto& file_entry : _fdt->file_entries()) {
127 if (_files.find(file_entry.toi) == _files.end()) {
128 spdlog::debug(
"Starting reception for file with TOI {}", file_entry.toi);
129 _files.emplace(file_entry.toi, std::make_shared<LibFlute::File>(file_entry));
135 spdlog::debug(
"Discarding packet for unknown or already completed file with TOI {}", alc.toi());
137 }
// Catch by const reference: catching std::exception by value slices derived
// exception types, so what() would no longer return the original message.
catch (const std::exception& ex) {
138 spdlog::warn(
"Failed to decode ALC/FLUTE packet: {}", ex.what());
// Queue the next receive with this handler bound again.
141 _socket.async_receive_from(
142 boost::asio::buffer(_data, max_length), _sender_endpoint,
143 boost::bind(&LibFlute::Receiver::handle_receive_from,
this,
144 boost::asio::placeholders::error,
145 boost::asio::placeholders::bytes_transferred));
// Receive errors are logged with the error's message.
149 spdlog::error(
"receive_from error: {}", error.message());
// Body fragment of file_list() (the signature and the return statement are not
// included in this listing). Copies the shared_ptr of every entry in _files
// into a new vector.
// NOTE(review): no lock on _files_mutex is visible in this fragment, although
// other members shown in this listing take it before touching _files - confirm
// whether callers can run concurrently with the receive handler.
155 std::vector<std::shared_ptr<LibFlute::File>> files;
156 for (
auto& f : _files) {
157 files.push_back(f.second);
// Body fragment of remove_expired_files(max_age) (the signature, braces and
// the non-erasing branch of the loop are not included in this listing).
// Holds _files_mutex while walking _files.
164 const std::lock_guard<std::mutex> lock(_files_mutex);
165 for (
auto it = _files.cbegin(); it != _files.cend();)
// Age is the current time() value minus the file's received_at() value.
167 auto age = time(
nullptr) - it->second->received_at();
// Entries whose content location is "bootstrap.multipart" are never removed
// here; any other entry is erased once its age exceeds max_age.
168 if ( it->second->meta().content_location !=
"bootstrap.multipart" && age > max_age) {
// erase() returns the iterator to the next element, so the loop continues
// from there.
169 it = _files.erase(it);
A class for parsing and creating ALC packets.
static std::vector< EncodingSymbol > from_payload(char *encoded_data, size_t data_len, const FecOti &fec_oti, ContentEncoding encoding)
Parse and construct all encoding symbols from a payload data buffer.
Receiver(const std::string &iface, const std::string &address, short port, uint64_t tsi, boost::asio::io_service &io_service)
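Constructor. Opens a UDP socket on the given interface and port, joins the multicast group at the given address, and starts receiving packets asynchronously.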
virtual ~Receiver()
Default destructor.
std::vector< std::shared_ptr< LibFlute::File > > file_list()
List all current files.
void enable_ipsec(uint32_t spi, const std::string &aes_key)
Enable IPsec ESP decryption of FLUTE payloads.
void remove_expired_files(unsigned max_age)
Remove files from the list that are older than max_age seconds.
void enable_esp(uint32_t spi, const std::string &dest_address, Direction direction, const std::string &key)