5G-MAG Reference Tools - MBMS Middleware
ContentStream.cpp
Go to the documentation of this file.
1 // 5G-MAG Reference Tools
2 // MBMS Middleware Process
3 //
4 // Copyright (C) 2021 Klaus Kühnhammer (Österreichische Rundfunksender GmbH & Co KG)
5 //
6 // Licensed under the License terms and conditions for use, reproduction, and
7 // distribution of 5G-MAG software (the “License”). You may not use this file
8 // except in compliance with the License. You may obtain a copy of the License at
9 // https://www.5g-mag.com/reference-tools. Unless required by applicable law or
10 // agreed to in writing, software distributed under the License is distributed on
11 // an “AS IS” BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
12 // or implied.
13 //
14 // See the License for the specific language governing permissions and limitations
15 // under the License.
16 //
17 
18 #include <regex>
19 #include "ContentStream.h"
20 #include "CacheItems.h"
21 #include "HlsPrimaryPlaylist.h"
22 
23 #include "spdlog/spdlog.h"
24 #include "cpprest/base_uri.h"
25 
26 MBMS_RT::ContentStream::ContentStream(std::string base, std::string flute_if, boost::asio::io_service &io_service,
27  CacheManagement &cache, DeliveryProtocol protocol, const libconfig::Config &cfg)
28  : _5gbc_stream_iface(std::move(flute_if)), _cfg(cfg), _delivery_protocol(protocol), _base(std::move(base)),
29  _io_service(io_service), _cache(cache), _flute_thread{} {
30 }
31 
33  spdlog::debug("Destroying content stream at base {}", _base);
34  if (_flute_receiver) {
35  _flute_receiver->stop();
36  _flute_thread.join();
37  }
38 }
39 
40 auto MBMS_RT::ContentStream::configure_5gbc_delivery_from_sdp(const std::string &sdp) -> bool {
41  spdlog::debug("ContentStream parsing SDP");
42  std::istringstream iss(sdp);
43  for (std::string line; std::getline(iss, line);) {
44  const std::regex sdp_line_regex("^([a-z])\\=(.+)$");
45  std::smatch match;
46  if (std::regex_match(line, match, sdp_line_regex)) {
47  if (match.size() == 3) {
48  auto field = match[1].str();
49  auto value = match[2].str();
50  spdlog::debug("SDP line {}: {}", field, value);
51 
52  if (field == "c") {
53  const std::regex value_regex("^IN (IP.) ([0-9\\.]+).*$");
54  std::smatch cmatch;
55  if (std::regex_match(value, cmatch, value_regex)) {
56  if (cmatch.size() == 3) {
57  _5gbc_stream_mcast_addr = cmatch[2].str();
58  }
59  }
60  } else if (field == "m") {
61  const std::regex value_regex("^application (.+) (.+)$");
62  std::smatch cmatch;
63  if (std::regex_match(value, cmatch, value_regex)) {
64  if (cmatch.size() == 3) {
65  _5gbc_stream_mcast_port = cmatch[1].str();
66  _5gbc_stream_type = cmatch[2].str();
67  }
68  }
69  const std::regex value_regex2("^application (.+) (.+) (.+)$");
70  if (std::regex_match(value, cmatch, value_regex2)) {
71  if (cmatch.size() == 4) {
72  _5gbc_stream_mcast_port = cmatch[1].str();
73  _5gbc_stream_type = cmatch[2].str();
74  }
75  }
76  } else if (field == "a") {
77  const std::regex value_regex("^flute-tsi:(.+)$");
78  std::smatch cmatch;
79  if (std::regex_match(value, cmatch, value_regex)) {
80  if (cmatch.size() == 2) {
81  _5gbc_stream_flute_tsi = stoul(cmatch[1].str());
82  }
83  }
84  }
85  }
86  }
87  }
88  if (!_5gbc_stream_type.empty() && !_5gbc_stream_mcast_addr.empty() &&
89  !_5gbc_stream_mcast_port.empty()) {
90  spdlog::info("ContentStream SDP parsing complete. Stream type {}, TSI {}, MCast at {}:{}",
91  _5gbc_stream_type, _5gbc_stream_flute_tsi, _5gbc_stream_mcast_addr, _5gbc_stream_mcast_port);
92  return true;
93  }
94  return false;
95 }
96 
97 auto MBMS_RT::ContentStream::flute_file_received(std::shared_ptr<LibFlute::File> file) -> void {
98  spdlog::info("ContentStream: {} (TOI {}, MIME type {}) has been received at {}",
99  file->meta().content_location, file->meta().toi, file->meta().content_type, file->received_at());
100  if (file->meta().content_location != "index.m3u8") { // ignore generated manifests
101  std::string content_location = file->meta().content_location;
102  // DASH: Add BaseURl to the content location of the received segments otherwise we do not have a cache match later
103  if (_delivery_protocol == MBMS_RT::DeliveryProtocol::DASH) {
104  content_location = _base_path + content_location;
105  }
106 
107  //if this is an MPD also update our internal representation of the MPD
108  if (file->meta().content_location.find(".mpd") != std::string::npos) {
109  _cache.add_item(std::make_shared<CachedFile>(
110  _base_path + "manifest.mpd", file->received_at(), std::move(file))
111  );
112  } else {
113  _cache.add_item(std::make_shared<CachedFile>(
114  content_location, file->received_at(), std::move(file))
115  );
116  }
117 
118  }
119 }
120 
122  spdlog::info("ContentStream starting");
123  if (_5gbc_stream_type == "FLUTE/UDP") {
124  spdlog::info("Starting FLUTE receiver on {}:{} for TSI {}", _5gbc_stream_mcast_addr, _5gbc_stream_mcast_port,
125  _5gbc_stream_flute_tsi);
126  _flute_thread = std::thread{[&]() {
127  _flute_receiver = std::make_unique<LibFlute::Receiver>(_5gbc_stream_iface, _5gbc_stream_mcast_addr,
128  atoi(_5gbc_stream_mcast_port.c_str()),
129  _5gbc_stream_flute_tsi, _io_service);
130  _flute_receiver->register_completion_callback(
131  boost::bind(&ContentStream::flute_file_received, this, _1)); //NOLINT
132  }};
133  }
134 };
135 
136 auto MBMS_RT::ContentStream::read_master_manifest(const std::string &manifest) -> void {
137  if (_delivery_protocol == DeliveryProtocol::HLS) {
138  auto pl = HlsPrimaryPlaylist(manifest, "");
139  if (pl.streams().size() == 1) {
140  auto stream = pl.streams()[0];
141  _playlist_path = stream.uri;
142  } else {
143  spdlog::error("Error: HLS primary playlist for stream contains more than one stream definitions. Ignoring.");
144  }
145  }
146 }
147 
148 auto MBMS_RT::ContentStream::flute_info() const -> std::string {
149  if (_5gbc_stream_type == "none") {
150  return "n/a";
151  } else {
152  return _5gbc_stream_type + ": " + _5gbc_stream_mcast_addr + ":" + _5gbc_stream_mcast_port +
153  ", TSI " + std::to_string(_5gbc_stream_flute_tsi);
154  }
155 }
bool configure_5gbc_delivery_from_sdp(const std::string &sdp)
virtual void flute_file_received(std::shared_ptr< LibFlute::File > file)
void read_master_manifest(const std::string &manifest)
ContentStream(std::string base, std::string flute_if, boost::asio::io_service &io_service, CacheManagement &cache, DeliveryProtocol protocol, const libconfig::Config &cfg)
std::string flute_info() const
static Config cfg
Global configuration object.
Definition: main.cpp:111