5G-MAG Reference Tools - MBMS Modem
Public Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
thread_pool Class Reference

#include <thread_pool.hpp>

Collaboration diagram for thread_pool:
Collaboration graph

Public Member Functions

 thread_pool (std::size_t thread_count=std::thread::hardware_concurrency(), int phy_prio=10)
 
 ~thread_pool ()
 
 thread_pool (thread_pool const &)=delete
 
 thread_pool (thread_pool &&)=default
 
thread_pooloperator= (thread_pool const &)=delete
 
thread_pooloperator= (thread_pool &&)=default
 
template<class Func , class... Args>
auto push (Func &&fn, Args &&...args)
 
void clear ()
 
void join ()
 
std::size_t thread_count () const
 
std::size_t active_count () const
 

Private Types

using task_type = std::function< void()>
 

Private Member Functions

void thread_loop ()
 
task_type next_task ()
 

Private Attributes

std::atomic< bool > m_stop { false }
 
std::atomic< std::size_t > m_active { 0 }
 
std::condition_variable m_notifier
 
std::mutex m_mutex
 
std::vector< std::thread > m_workers
 
std::queue< task_typem_tasks
 

Detailed Description

Definition at line 17 of file thread_pool.hpp.

Member Typedef Documentation

◆ task_type

using thread_pool::task_type = std::function<void()>
private

Definition at line 20 of file thread_pool.hpp.

Constructor & Destructor Documentation

◆ thread_pool() [1/3]

thread_pool::thread_pool ( std::size_t  thread_count = std::thread::hardware_concurrency(),
int  phy_prio = 10 
)
inlineexplicit

Definition at line 23 of file thread_pool.hpp.

24  {
25  struct sched_param thread_param;
26  thread_param.sched_priority = phy_prio;
27 
28  for (std::size_t i{ 0 }; i < thread_count; ++i) {
29  spdlog::info("Launching phy thread with realtime scheduling priority {}", thread_param.sched_priority );
30  m_workers.emplace_back(std::bind(&thread_pool::thread_loop, this));
31 
32  int error = pthread_setschedparam( m_workers.back().native_handle(), SCHED_RR, &thread_param );
33  if( error )
34  {
35  spdlog::error("Cannot set phy thread priority to realtime: {}. Thread will run at default priority.", strerror(error));
36  }
37  }
38  }
std::size_t thread_count() const
std::vector< std::thread > m_workers
void thread_loop()

◆ ~thread_pool()

thread_pool::~thread_pool ( )
inline

Definition at line 40 of file thread_pool.hpp.

41  {
42  if (m_workers.size() > 0) {
43  join();
44  }
45  }

◆ thread_pool() [2/3]

thread_pool::thread_pool ( thread_pool const &  )
delete

◆ thread_pool() [3/3]

thread_pool::thread_pool ( thread_pool &&  )
default

Member Function Documentation

◆ active_count()

std::size_t thread_pool::active_count ( ) const
inline

Definition at line 107 of file thread_pool.hpp.

108  {
109  return m_active;
110  }
std::atomic< std::size_t > m_active

◆ clear()

void thread_pool::clear ( )
inline

Definition at line 76 of file thread_pool.hpp.

77  {
78  std::unique_lock<std::mutex> lock{ m_mutex };
79 
80  while (!m_tasks.empty()) {
81  m_tasks.pop();
82  }
83  }
std::queue< task_type > m_tasks
std::mutex m_mutex

◆ join()

void thread_pool::join ( )
inline

Definition at line 86 of file thread_pool.hpp.

87  {
88  m_stop = true;
89  m_notifier.notify_all();
90 
91  for (auto &thread : m_workers) {
92  if (thread.joinable()) {
93  thread.join();
94  }
95  }
96 
97  m_workers.clear();
98  }
std::condition_variable m_notifier
std::atomic< bool > m_stop

◆ next_task()

task_type thread_pool::next_task ( )
inlineprivate

Definition at line 133 of file thread_pool.hpp.

134  {
135  std::unique_lock<std::mutex> lock{ m_mutex };
136 
137  m_notifier.wait(lock, [this]() {
138  return !m_tasks.empty() || m_stop;
139  });
140 
141  if (m_tasks.empty()) {
142  // No pending task
143  return {};
144  }
145 
146  auto task{ m_tasks.front() };
147  m_tasks.pop();
148  return task;
149  }

◆ operator=() [1/2]

thread_pool& thread_pool::operator= ( thread_pool &&  )
default

◆ operator=() [2/2]

thread_pool& thread_pool::operator= ( thread_pool const &  )
delete

◆ push()

template<class Func , class... Args>
auto thread_pool::push ( Func &&  fn,
Args &&...  args 
)
inline

Definition at line 55 of file thread_pool.hpp.

56  {
57  using return_type = typename std::result_of<Func(Args...)>::type;
58 
59  auto task{ std::make_shared<std::packaged_task<return_type()>>(
60  std::bind(std::forward<Func>(fn), std::forward<Args>(args)...)
61  ) };
62 
63  auto future{ task->get_future() };
64  std::unique_lock<std::mutex> lock{ m_mutex };
65 
66  m_tasks.emplace([task]() {
67  (*task)();
68  });
69 
70  lock.unlock();
71  m_notifier.notify_one();
72  return future;
73  }

◆ thread_count()

std::size_t thread_pool::thread_count ( ) const
inline

Definition at line 101 of file thread_pool.hpp.

102  {
103  return m_workers.size();
104  }

◆ thread_loop()

void thread_pool::thread_loop ( )
inlineprivate

Definition at line 114 of file thread_pool.hpp.

115  {
116  while (true) {
117  // Wait for a new task
118  auto task{ next_task() };
119 
120  if (task) {
121  ++m_active;
122  task();
123  --m_active;
124  }
125  else if (m_stop) {
126  // No more task + stop required
127  break;
128  }
129  }
130  }
task_type next_task()

Member Data Documentation

◆ m_active

std::atomic<std::size_t> thread_pool::m_active { 0 }
private

Definition at line 152 of file thread_pool.hpp.

◆ m_mutex

std::mutex thread_pool::m_mutex
private

Definition at line 155 of file thread_pool.hpp.

◆ m_notifier

std::condition_variable thread_pool::m_notifier
private

Definition at line 154 of file thread_pool.hpp.

◆ m_stop

std::atomic<bool> thread_pool::m_stop { false }
private

Definition at line 151 of file thread_pool.hpp.

◆ m_tasks

std::queue<task_type> thread_pool::m_tasks
private

Definition at line 158 of file thread_pool.hpp.

◆ m_workers

std::vector<std::thread> thread_pool::m_workers
private

Definition at line 157 of file thread_pool.hpp.


The documentation for this class was generated from the following file: