libflute
Transmitter.cpp
Go to the documentation of this file.
1 // libflute - FLUTE/ALC library
2 //
3 // Copyright (C) 2021 Klaus Kühnhammer (Österreichische Rundfunksender GmbH & Co KG)
4 //
5 // This program is free software: you can redistribute it and/or modify
6 // it under the terms of the GNU Affero General Public License as published by
7 // the Free Software Foundation, either version 3 of the License, or
8 // (at your option) any later version.
9 //
10 // This program is distributed in the hope that it will be useful,
11 // but WITHOUT ANY WARRANTY; without even the implied warranty of
12 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 // GNU Affero General Public License for more details.
14 //
15 // You should have received a copy of the GNU Affero General Public License
16 // along with this program. If not, see <http://www.gnu.org/licenses/>.
17 //
18 #include <cstdio>
19 #include <chrono>
20 #include <cstring>
21 #include <iostream>
22 #include <string>
23 #include "spdlog/spdlog.h"
24 #include "Transmitter.h"
25 #include "IpSec.h"
26 LibFlute::Transmitter::Transmitter ( const std::string& address,
27  short port, uint64_t tsi, unsigned short mtu, uint32_t rate_limit,
28  boost::asio::io_service& io_service)
29  : _endpoint(boost::asio::ip::address::from_string(address), port)
30  , _socket(io_service, _endpoint.protocol())
31  , _fdt_timer(io_service)
32  , _send_timer(io_service)
33  , _io_service(io_service)
34  , _tsi(tsi)
35  , _mtu(mtu)
36  , _rate_limit(rate_limit)
37  , _mcast_address(address)
38 {
39  _max_payload = mtu -
40  20 - // IPv4 header
41  8 - // UDP header
42  32 - // ALC Header with EXT_FDT and EXT_FTI
43  4; // SBN and ESI for compact no-code FEC
44  uint32_t max_source_block_length = 64;
45 
46  _socket.set_option(boost::asio::ip::multicast::enable_loopback(true));
47  _socket.set_option(boost::asio::ip::udp::socket::reuse_address(true));
48 
49  _fec_oti = FecOti{FecScheme::CompactNoCode, 0, _max_payload, max_source_block_length};
50  _fdt = std::make_unique<FileDeliveryTable>(1, _fec_oti);
51 
52  _fdt_timer.expires_from_now(boost::posix_time::seconds(_fdt_repeat_interval));
53  _fdt_timer.async_wait( boost::bind(&Transmitter::fdt_send_tick, this));
54 
55  send_next_packet();
56 }
57 
59 
60 auto LibFlute::Transmitter::enable_ipsec(uint32_t spi, const std::string& key) -> void
61 {
63 }
64 
65 auto LibFlute::Transmitter::handle_send_to(const boost::system::error_code& error) -> void
66 {
67  if (!error) {
68  }
69 }
70 
72 {
73  return std::chrono::duration_cast<std::chrono::seconds>(
74  std::chrono::system_clock::now().time_since_epoch()).count();
75 }
76 
77 auto LibFlute::Transmitter::send_fdt() -> void {
78  _fdt->set_expires(seconds_since_epoch() + _fdt_repeat_interval * 2);
79  auto fdt = _fdt->to_string();
80  auto file = std::make_shared<File>(
81  0,
82  _fec_oti,
83  "",
84  "",
85  seconds_since_epoch() + _fdt_repeat_interval * 2,
86  (char*)fdt.c_str(),
87  fdt.length(),
88  true);
89  file->set_fdt_instance_id( _fdt->instance_id() );
90  _files.insert_or_assign(0, file);
91 }
92 
94  const std::string& content_location,
95  const std::string& content_type,
96  uint32_t expires,
97  char* data,
98  size_t length) -> uint16_t
99 {
100  auto toi = _toi;
101  _toi++;
102  if (_toi == 0) _toi = 1; // clamp to >= 1 in case it wraps
103 
104  auto file = std::make_shared<File>(
105  toi,
106  _fec_oti,
107  content_location,
108  content_type,
109  expires,
110  data,
111  length);
112 
113  _fdt->add(file->meta());
114  send_fdt();
115  _files.insert({toi, file});
116  return toi;
117 }
118 
119 auto LibFlute::Transmitter::fdt_send_tick() -> void
120 {
121  send_fdt();
122  _fdt_timer.expires_from_now(boost::posix_time::seconds(_fdt_repeat_interval));
123  _fdt_timer.async_wait( boost::bind(&Transmitter::fdt_send_tick, this));
124 }
125 
126 auto LibFlute::Transmitter::file_transmitted(uint32_t toi) -> void
127 {
128  if (toi != 0) {
129  _files.erase(toi);
130  _fdt->remove(toi);
131  send_fdt();
132 
133  if (_completion_cb) {
134  _completion_cb(toi);
135  }
136  }
137 }
138 
139 auto LibFlute::Transmitter::send_next_packet() -> void
140 {
141  uint32_t bytes_queued = 0;
142 
143  if (_files.size()) {
144  for (auto& file_m : _files) {
145  auto file = file_m.second;
146 
147  if (file && !file->complete()) {
148  auto symbols = file->get_next_symbols(_max_payload);
149 
150  if (symbols.size()) {
151  for(const auto& symbol : symbols) {
152  spdlog::debug("sending TOI {} SBN {} ID {}", file->meta().toi, symbol.source_block_number(), symbol.id() );
153  }
154  auto packet = std::make_shared<AlcPacket>(_tsi, file->meta().toi, file->meta().fec_oti, symbols, _max_payload, file->fdt_instance_id());
155  bytes_queued += packet->size();
156 
157  _socket.async_send_to(
158  boost::asio::buffer(packet->data(), packet->size()), _endpoint,
159  [file, symbols, packet, this](
160  const boost::system::error_code& error,
161  std::size_t bytes_transferred)
162  {
163  if (error) {
164  spdlog::debug("sent_to error: {}", error.message());
165  } else {
166  file->mark_completed(symbols, !error);
167  if (file->complete()) {
168  file_transmitted(file->meta().toi);
169  }
170  }
171  });
172  }
173  break;
174  }
175  }
176  }
177  if (!bytes_queued) {
178  _send_timer.expires_from_now(boost::posix_time::milliseconds(10));
179  _send_timer.async_wait( boost::bind(&Transmitter::send_next_packet, this));
180  } else {
181  if (_rate_limit == 0) {
182  _io_service.post(boost::bind(&Transmitter::send_next_packet, this));
183  } else {
184  auto send_duration = ((bytes_queued * 8.0) / (double)_rate_limit/1000.0) * 1000.0 * 1000.0;
185  spdlog::debug("Rate limiter: queued {} bytes, limit {} kbps, next send in {} us",
186  bytes_queued, _rate_limit, send_duration);
187  _send_timer.expires_from_now(boost::posix_time::microseconds(
188  static_cast<int>(ceil(send_duration))));
189  _send_timer.async_wait( boost::bind(&Transmitter::send_next_packet, this));
190  }
191  }
192 }
uint64_t seconds_since_epoch()
Convenience function to get the current timestamp for expiry calculation.
Definition: Transmitter.cpp:71
virtual ~Transmitter()
Default destructor.
uint16_t send(const std::string &content_location, const std::string &content_type, uint32_t expires, char *data, size_t length)
Transmit a file.
Definition: Transmitter.cpp:93
void enable_ipsec(uint32_t spi, const std::string &aes_key)
Enable IPSEC ESP encryption of FLUTE payloads.
Definition: Transmitter.cpp:60
Transmitter(const std::string &address, short port, uint64_t tsi, unsigned short mtu, uint32_t rate_limit, boost::asio::io_service &io_service)
Default constructor.
Definition: Transmitter.cpp:26
void enable_esp(uint32_t spi, const std::string &dest_address, Direction direction, const std::string &key)
Definition: IpSec.cpp:125
OTI values struct.
Definition: flute_types.h:53