5G-MAG Reference Tools - MBMS Modem
thread_pool.hpp
Go to the documentation of this file.
1 // Copyright (c) 2018 Ethan Margaillan <contact@ethan.jp>.
2 // Licensed under the MIT Licence - https://raw.githubusercontent.com/Ethan13310/Thread-Pool-Cpp/master/LICENSE
3 
4 #pragma once
5 
6 #include <atomic>
7 #include <condition_variable>
8 #include <functional>
9 #include <future>
10 #include <memory>
11 #include <mutex>
12 #include <queue>
13 #include <thread>
14 #include <type_traits>
15 #include <vector>
16 
18 {
19  // Task function
20  using task_type = std::function<void()>;
21 
22 public:
23  explicit thread_pool(std::size_t thread_count = std::thread::hardware_concurrency(), int phy_prio = 10)
24  {
25  struct sched_param thread_param;
26  thread_param.sched_priority = phy_prio;
27 
28  for (std::size_t i{ 0 }; i < thread_count; ++i) {
29  spdlog::info("Launching phy thread with realtime scheduling priority {}", thread_param.sched_priority );
30  m_workers.emplace_back(std::bind(&thread_pool::thread_loop, this));
31 
32  int error = pthread_setschedparam( m_workers.back().native_handle(), SCHED_RR, &thread_param );
33  if( error )
34  {
35  spdlog::error("Cannot set phy thread priority to realtime: {}. Thread will run at default priority.", strerror(error));
36  }
37  }
38  }
39 
41  {
42  if (m_workers.size() > 0) {
43  join();
44  }
45  }
46 
47  thread_pool(thread_pool const &) = delete;
48  thread_pool(thread_pool &&) = default;
49 
50  thread_pool &operator=(thread_pool const &) = delete;
52 
53  // Push a new task into the queue
54  template <class Func, class... Args>
55  auto push(Func &&fn, Args &&...args)
56  {
57  using return_type = typename std::result_of<Func(Args...)>::type;
58 
59  auto task{ std::make_shared<std::packaged_task<return_type()>>(
60  std::bind(std::forward<Func>(fn), std::forward<Args>(args)...)
61  ) };
62 
63  auto future{ task->get_future() };
64  std::unique_lock<std::mutex> lock{ m_mutex };
65 
66  m_tasks.emplace([task]() {
67  (*task)();
68  });
69 
70  lock.unlock();
71  m_notifier.notify_one();
72  return future;
73  }
74 
75  // Remove all pending tasks from the queue
76  void clear()
77  {
78  std::unique_lock<std::mutex> lock{ m_mutex };
79 
80  while (!m_tasks.empty()) {
81  m_tasks.pop();
82  }
83  }
84 
85  // Wait all workers to finish
86  void join()
87  {
88  m_stop = true;
89  m_notifier.notify_all();
90 
91  for (auto &thread : m_workers) {
92  if (thread.joinable()) {
93  thread.join();
94  }
95  }
96 
97  m_workers.clear();
98  }
99 
100  // Get the number of active and waiting workers
101  std::size_t thread_count() const
102  {
103  return m_workers.size();
104  }
105 
106  // Get the number of active workers
107  std::size_t active_count() const
108  {
109  return m_active;
110  }
111 
112 private:
113  // Thread main loop
114  void thread_loop()
115  {
116  while (true) {
117  // Wait for a new task
118  auto task{ next_task() };
119 
120  if (task) {
121  ++m_active;
122  task();
123  --m_active;
124  }
125  else if (m_stop) {
126  // No more task + stop required
127  break;
128  }
129  }
130  }
131 
132  // Get the next pending task
134  {
135  std::unique_lock<std::mutex> lock{ m_mutex };
136 
137  m_notifier.wait(lock, [this]() {
138  return !m_tasks.empty() || m_stop;
139  });
140 
141  if (m_tasks.empty()) {
142  // No pending task
143  return {};
144  }
145 
146  auto task{ m_tasks.front() };
147  m_tasks.pop();
148  return task;
149  }
150 
151  std::atomic<bool> m_stop{ false };
152  std::atomic<std::size_t> m_active{ 0 };
153 
154  std::condition_variable m_notifier;
155  std::mutex m_mutex;
156 
157  std::vector<std::thread> m_workers;
158  std::queue<task_type> m_tasks;
159 };
thread_pool(thread_pool &&)=default
void clear()
Definition: thread_pool.hpp:76
thread_pool & operator=(thread_pool &&)=default
std::atomic< std::size_t > m_active
std::size_t thread_count() const
std::vector< std::thread > m_workers
std::condition_variable m_notifier
std::size_t active_count() const
thread_pool & operator=(thread_pool const &)=delete
void thread_loop()
std::queue< task_type > m_tasks
thread_pool(std::size_t thread_count=std::thread::hardware_concurrency(), int phy_prio=10)
Definition: thread_pool.hpp:23
auto push(Func &&fn, Args &&...args)
Definition: thread_pool.hpp:55
std::mutex m_mutex
std::function< void()> task_type
Definition: thread_pool.hpp:20
thread_pool(thread_pool const &)=delete
task_type next_task()
std::atomic< bool > m_stop