23 #include "spdlog/spdlog.h"
27 short port, uint64_t tsi,
unsigned short mtu, uint32_t rate_limit,
28 boost::asio::io_service& io_service)
29 : _endpoint(boost::asio::ip::address::from_string(address), port)
30 , _socket(io_service, _endpoint.protocol())
31 , _fdt_timer(io_service)
32 , _send_timer(io_service)
33 , _io_service(io_service)
36 , _rate_limit(rate_limit)
37 , _mcast_address(address)
44 uint32_t max_source_block_length = 64;
46 _socket.set_option(boost::asio::ip::multicast::enable_loopback(
true));
47 _socket.set_option(boost::asio::ip::udp::socket::reuse_address(
true));
50 _fdt = std::make_unique<FileDeliveryTable>(1, _fec_oti);
52 _fdt_timer.expires_from_now(boost::posix_time::seconds(_fdt_repeat_interval));
53 _fdt_timer.async_wait( boost::bind(&Transmitter::fdt_send_tick,
this));
65 auto LibFlute::Transmitter::handle_send_to(
const boost::system::error_code& error) ->
void
73 return std::chrono::duration_cast<std::chrono::seconds>(
74 std::chrono::system_clock::now().time_since_epoch()).count();
77 auto LibFlute::Transmitter::send_fdt() ->
void {
78 _fdt->set_expires(seconds_since_epoch() + _fdt_repeat_interval * 2);
79 auto fdt = _fdt->to_string();
80 auto file = std::make_shared<File>(
85 seconds_since_epoch() + _fdt_repeat_interval * 2,
89 file->set_fdt_instance_id( _fdt->instance_id() );
90 _files.insert_or_assign(0, file);
94 const std::string& content_location,
95 const std::string& content_type,
98 size_t length) -> uint16_t
102 if (_toi == 0) _toi = 1;
104 auto file = std::make_shared<File>(
113 _fdt->add(file->meta());
115 _files.insert({toi, file});
119 auto LibFlute::Transmitter::fdt_send_tick() ->
void
122 _fdt_timer.expires_from_now(boost::posix_time::seconds(_fdt_repeat_interval));
123 _fdt_timer.async_wait( boost::bind(&Transmitter::fdt_send_tick,
this));
126 auto LibFlute::Transmitter::file_transmitted(uint32_t toi) ->
void
133 if (_completion_cb) {
139 auto LibFlute::Transmitter::send_next_packet() ->
void
141 uint32_t bytes_queued = 0;
144 for (
auto& file_m : _files) {
145 auto file = file_m.second;
147 if (file && !file->complete()) {
148 auto symbols = file->get_next_symbols(_max_payload);
150 if (symbols.size()) {
151 for(
const auto& symbol : symbols) {
152 spdlog::debug(
"sending TOI {} SBN {} ID {}", file->meta().toi, symbol.source_block_number(), symbol.id() );
154 auto packet = std::make_shared<AlcPacket>(_tsi, file->meta().toi, file->meta().fec_oti, symbols, _max_payload, file->fdt_instance_id());
155 bytes_queued += packet->size();
157 _socket.async_send_to(
158 boost::asio::buffer(packet->data(), packet->size()), _endpoint,
159 [file, symbols, packet,
this](
160 const boost::system::error_code& error,
161 std::size_t bytes_transferred)
164 spdlog::debug(
"sent_to error: {}", error.message());
166 file->mark_completed(symbols, !error);
167 if (file->complete()) {
168 file_transmitted(file->meta().toi);
178 _send_timer.expires_from_now(boost::posix_time::milliseconds(10));
179 _send_timer.async_wait( boost::bind(&Transmitter::send_next_packet,
this));
181 if (_rate_limit == 0) {
182 _io_service.post(boost::bind(&Transmitter::send_next_packet,
this));
184 auto send_duration = ((bytes_queued * 8.0) / (
double)_rate_limit/1000.0) * 1000.0 * 1000.0;
185 spdlog::debug(
"Rate limiter: queued {} bytes, limit {} kbps, next send in {} us",
186 bytes_queued, _rate_limit, send_duration);
187 _send_timer.expires_from_now(boost::posix_time::microseconds(
188 static_cast<int>(ceil(send_duration))));
189 _send_timer.async_wait( boost::bind(&Transmitter::send_next_packet,
this));
uint64_t seconds_since_epoch()
Convenience function to get the current timestamp for expiry calculation.
virtual ~Transmitter()
Default destructor.
uint16_t send(const std::string &content_location, const std::string &content_type, uint32_t expires, char *data, size_t length)
Transmit a file.
void enable_ipsec(uint32_t spi, const std::string &aes_key)
Enable IPSEC ESP encryption of FLUTE payloads.
Transmitter(const std::string &address, short port, uint64_t tsi, unsigned short mtu, uint32_t rate_limit, boost::asio::io_service &io_service)
Default constructor.
void enable_esp(uint32_t spi, const std::string &dest_address, Direction direction, const std::string &key)