libflute
Receiver.cpp
Go to the documentation of this file.
1 // libflute - FLUTE/ALC library
2 //
3 // Copyright (C) 2021 Klaus Kühnhammer (Österreichische Rundfunksender GmbH & Co KG)
4 //
5 // This program is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Affero General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // This program is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU Affero General Public License for more details.
14 //
15 // You should have received a copy of the GNU Affero General Public License
16 // along with this program. If not, see <http://www.gnu.org/licenses/>.
17 //
18 
19 #include "Receiver.h"
20 #include "AlcPacket.h"
21 #include <iostream>
22 #include <string>
23 #include "spdlog/spdlog.h"
24 #include "IpSec.h"
25 
26 
27 LibFlute::Receiver::Receiver ( const std::string& iface, const std::string& address,
28  short port, uint64_t tsi,
29  boost::asio::io_service& io_service)
30  : _socket(io_service)
31  , _tsi(tsi)
32  , _mcast_address(address)
33 {
34  boost::asio::ip::udp::endpoint listen_endpoint(
35  boost::asio::ip::address::from_string(iface), port);
36  _socket.open(listen_endpoint.protocol());
37  _socket.set_option(boost::asio::ip::multicast::enable_loopback(true));
38  _socket.set_option(boost::asio::ip::udp::socket::reuse_address(true));
39  _socket.set_option(boost::asio::socket_base::receive_buffer_size(16*1024*1024));
40  _socket.bind(listen_endpoint);
41 
42  // Join the multicast group.
43  _socket.set_option(
44  boost::asio::ip::multicast::join_group(
45  boost::asio::ip::address::from_string(address)));
46 
47  _socket.async_receive_from(
48  boost::asio::buffer(_data, max_length), _sender_endpoint,
49  boost::bind(&LibFlute::Receiver::handle_receive_from, this,
50  boost::asio::placeholders::error,
51  boost::asio::placeholders::bytes_transferred));
52 }
53 
55 {
56  //spdlog::debug("Closing flute receiver for ALC session {}", _alc_session_id);
57  //fcl::set_flute_session_state(_alc_session_id, fcl::SExiting);
58 }
59 
60 auto LibFlute::Receiver::enable_ipsec(uint32_t spi, const std::string& key) -> void
61 {
63 }
64 
65 auto LibFlute::Receiver::handle_receive_from(const boost::system::error_code& error,
66  size_t bytes_recvd) -> void
67 {
68  if (!error)
69  {
70  spdlog::debug("Received {} bytes", bytes_recvd);
71  try {
72  auto alc = LibFlute::AlcPacket(_data, bytes_recvd);
73 
74  if (alc.tsi() != _tsi) {
75  spdlog::warn("Discarding packet for unknown TSI {}", alc.tsi());
76  return;
77  }
78 
79  const std::lock_guard<std::mutex> lock(_files_mutex);
80 
81  if (alc.toi() == 0 && (!_fdt || _fdt->instance_id() != alc.fdt_instance_id())) {
82  if (_files.find(alc.toi()) == _files.end()) {
83  FileDeliveryTable::FileEntry fe{0, "", static_cast<uint32_t>(alc.fec_oti().transfer_length), "", "", 0, alc.fec_oti()};
84  _files.emplace(alc.toi(), std::make_shared<LibFlute::File>(fe));
85  }
86  }
87 
88  if (_files.find(alc.toi()) != _files.end() && !_files[alc.toi()]->complete()) {
89  auto encoding_symbols = LibFlute::EncodingSymbol::from_payload(
90  _data + alc.header_length(),
91  bytes_recvd - alc.header_length(),
92  _files[alc.toi()]->fec_oti(),
93  alc.content_encoding());
94 
95  for (const auto& symbol : encoding_symbols) {
96  spdlog::debug("received TOI {} SBN {} ID {}", alc.toi(), symbol.source_block_number(), symbol.id() );
97  _files[alc.toi()]->put_symbol(symbol);
98  }
99 
100  auto file = _files[alc.toi()].get();
101  if (_files[alc.toi()]->complete()) {
102  for (auto it = _files.cbegin(); it != _files.cend();)
103  {
104  if (it->second.get() != file && it->second->meta().content_location == file->meta().content_location)
105  {
106  spdlog::debug("Replacing file with TOI {}", it->first);
107  it = _files.erase(it);
108  }
109  else
110  {
111  ++it;
112  }
113  }
114 
115  spdlog::debug("File with TOI {} completed", alc.toi());
116  if (alc.toi() != 0 && _completion_cb) {
117  _completion_cb(_files[alc.toi()]);
118  }
119 
120  if (alc.toi() == 0) { // parse complete FDT
121  _fdt = std::make_unique<LibFlute::FileDeliveryTable>(
122  alc.fdt_instance_id(), _files[alc.toi()]->buffer(), _files[alc.toi()]->length());
123 
124  _files.erase(alc.toi());
125  for (const auto& file_entry : _fdt->file_entries()) {
126  // automatically receive all files in the FDT
127  if (_files.find(file_entry.toi) == _files.end()) {
128  spdlog::debug("Starting reception for file with TOI {}", file_entry.toi);
129  _files.emplace(file_entry.toi, std::make_shared<LibFlute::File>(file_entry));
130  }
131  }
132  }
133  }
134  } else {
135  spdlog::debug("Discarding packet for unknown or already completed file with TOI {}", alc.toi());
136  }
137  } catch (std::exception ex) {
138  spdlog::warn("Failed to decode ALC/FLUTE packet: {}", ex.what());
139  }
140 
141  _socket.async_receive_from(
142  boost::asio::buffer(_data, max_length), _sender_endpoint,
143  boost::bind(&LibFlute::Receiver::handle_receive_from, this,
144  boost::asio::placeholders::error,
145  boost::asio::placeholders::bytes_transferred));
146  }
147  else
148  {
149  spdlog::error("receive_from error: {}", error.message());
150  }
151 }
152 
153 auto LibFlute::Receiver::file_list() -> std::vector<std::shared_ptr<LibFlute::File>>
154 {
155  std::vector<std::shared_ptr<LibFlute::File>> files;
156  for (auto& f : _files) {
157  files.push_back(f.second);
158  }
159  return files;
160 }
161 
162 auto LibFlute::Receiver::remove_expired_files(unsigned max_age) -> void
163 {
164  const std::lock_guard<std::mutex> lock(_files_mutex);
165  for (auto it = _files.cbegin(); it != _files.cend();)
166  {
167  auto age = time(nullptr) - it->second->received_at();
168  if ( it->second->meta().content_location != "bootstrap.multipart" && age > max_age) {
169  it = _files.erase(it);
170  } else {
171  ++it;
172  }
173  }
174 }
A class for parsing and creating ALC packets.
Definition: AlcPacket.h:29
static std::vector< EncodingSymbol > from_payload(char *encoded_data, size_t data_len, const FecOti &fec_oti, ContentEncoding encoding)
Parse and construct all encoding symbols from a payload data buffer.
Receiver(const std::string &iface, const std::string &address, short port, uint64_t tsi, boost::asio::io_service &io_service)
Default constructor.
Definition: Receiver.cpp:27
virtual ~Receiver()
Default destructor.
Definition: Receiver.cpp:54
std::vector< std::shared_ptr< LibFlute::File > > file_list()
List all current files.
Definition: Receiver.cpp:153
void enable_ipsec(uint32_t spi, const std::string &aes_key)
Enable IPSEC ESP decryption of FLUTE payloads.
Definition: Receiver.cpp:60
void remove_expired_files(unsigned max_age)
Remove files from the list that are older than max_age seconds.
Definition: Receiver.cpp:162
void enable_esp(uint32_t spi, const std::string &dest_address, Direction direction, const std::string &key)
Definition: IpSec.cpp:125