27 #include <openssl/md5.h>
29 #include "spdlog/spdlog.h"
32 : _meta( std::move(entry) )
33 , _received_at( time(nullptr) )
37 if (_buffer ==
nullptr)
39 throw "Failed to allocate file buffer";
43 calculate_partitioning();
49 std::string content_location,
50 std::string content_type,
57 _buffer = (
char*)malloc(length);
58 if (_buffer ==
nullptr)
60 throw "Failed to allocate file buffer";
62 memcpy(_buffer, data, length);
68 unsigned char md5[MD5_DIGEST_LENGTH];
69 MD5((
const unsigned char*)data, length, md5);
72 _meta.content_location = std::move(content_location);
73 _meta.content_type = std::move(content_type);
74 _meta.content_length = length;
75 _meta.content_md5 = base64_encode(md5, MD5_DIGEST_LENGTH);
76 _meta.expires = expires;
77 _meta.fec_oti = fec_oti;
83 throw "Unsupported FEC scheme";
86 calculate_partitioning();
92 if (_own_buffer && _buffer !=
nullptr)
100 if (symbol.source_block_number() > _source_blocks.size()) {
101 throw "Source Block number too high";
104 SourceBlock& source_block = _source_blocks[ symbol.source_block_number() ];
106 if (symbol.id() > source_block.symbols.size()) {
107 throw "Encoding Symbol ID too high";
113 symbol.decode_to(target_symbol.
data, target_symbol.
length);
116 check_source_block_completion(source_block);
117 check_file_completion();
122 auto LibFlute::File::check_source_block_completion( SourceBlock& block ) ->
void
124 block.complete = std::all_of(block.symbols.begin(), block.symbols.end(), [](
const auto& symbol){ return symbol.second.complete; });
127 auto LibFlute::File::check_file_completion() ->
void
129 _complete = std::all_of(_source_blocks.begin(), _source_blocks.end(), [](
const auto& block){ return block.second.complete; });
131 if (_complete && !_meta.content_md5.empty()) {
133 unsigned char md5[MD5_DIGEST_LENGTH];
134 MD5((
const unsigned char*)buffer(), length(), md5);
136 auto content_md5 = base64_decode(_meta.content_md5);
137 if (memcmp(md5, content_md5.c_str(), MD5_DIGEST_LENGTH) != 0) {
138 spdlog::debug(
"MD5 mismatch for TOI {}, discarding", _meta.toi);
141 for (
auto& block : _source_blocks) {
142 for (
auto& symbol : block.second.symbols) {
143 symbol.second.complete =
false;
145 block.second.complete =
false;
152 auto LibFlute::File::calculate_partitioning() ->
void
155 _nof_source_symbols = ceil((
double)_meta.fec_oti.transfer_length / (
double)_meta.fec_oti.encoding_symbol_length);
156 _nof_source_blocks = ceil((
double)_nof_source_symbols / (
double)_meta.fec_oti.max_source_block_length);
157 _large_source_block_length = ceil((
double)_nof_source_symbols / (
double)_nof_source_blocks);
158 _small_source_block_length = floor((
double)_nof_source_symbols / (
double)_nof_source_blocks);
159 _nof_large_source_blocks = _nof_source_symbols - _small_source_block_length * _nof_source_blocks;
162 auto LibFlute::File::create_blocks() ->
void
165 auto buffer_ptr = _buffer;
166 size_t remaining_size = _meta.fec_oti.transfer_length;
168 while (remaining_size > 0) {
171 auto block_length = ( number < _nof_large_source_blocks ) ? _large_source_block_length : _small_source_block_length;
173 for (
int i = 0; i < block_length; i++) {
174 auto symbol_length = std::min(remaining_size, (
size_t)_meta.fec_oti.encoding_symbol_length);
175 assert(buffer_ptr + symbol_length <= _buffer + _meta.fec_oti.transfer_length);
177 SourceBlock::Symbol symbol{.data = buffer_ptr, .length = symbol_length, .complete =
false};
178 block.symbols[ symbol_id++ ] = symbol;
180 remaining_size -= symbol_length;
181 buffer_ptr += symbol_length;
183 if (remaining_size <= 0)
break;
185 _source_blocks[number++] = block;
191 auto block = _source_blocks.begin();
192 int nof_symbols = std::ceil((
float)(max_size - 4) / (
float)_meta.fec_oti.encoding_symbol_length);
194 std::vector<EncodingSymbol> symbols;
196 for (
auto& block : _source_blocks) {
197 if (cnt >= nof_symbols)
break;
199 if (!block.second.complete) {
200 for (
auto& symbol : block.second.symbols) {
201 if (cnt >= nof_symbols)
break;
203 if (!symbol.second.complete && !symbol.second.queued) {
204 symbols.emplace_back(symbol.first, block.first, symbol.second.data, symbol.second.length, _meta.fec_oti.encoding_id);
205 symbol.second.queued =
true;
217 for (
auto& symbol : symbols) {
218 auto block = _source_blocks.find(symbol.source_block_number());
219 if (block != _source_blocks.end()) {
220 auto sym = block->second.symbols.find(symbol.id());
221 if (sym != block->second.symbols.end()) {
222 sym->second.queued =
false;
223 sym->second.complete = success;
225 check_source_block_completion(block->second);
226 check_file_completion();
A class for handling FEC encoding symbols.
void put_symbol(const EncodingSymbol &symbol)
Write the data from an encoding symbol into the appropriate place in the buffer.
virtual ~File()
Default destructor.
File(LibFlute::FileDeliveryTable::FileEntry entry)
Create a file from an FDT entry (used for reception)
void mark_completed(const std::vector< EncodingSymbol > &symbols, bool success)
Mark encoding symbols as completed.
std::vector< EncodingSymbol > get_next_symbols(size_t max_size)
Get the next encoding symbols that fit in max_size bytes.
An entry for a file in the FDT.