SourceXtractorPlusPlus  0.8
Please provide a description of the project.
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros Groups Pages
MultithreadedMeasurement.cpp
Go to the documentation of this file.
1 
17 /*
18  * MultithreadedMeasurement.cpp
19  *
20  * Created on: May 23, 2018
21  * Author: mschefer
22  */
23 
24 #include <iostream>
25 #include <chrono>
26 #include <atomic>
27 #include <ElementsKernel/Logging.h>
28 #include <csignal>
29 
32 
33 using namespace SourceXtractor;
34 
36 
38 
40  // Start worker threads
42  for (int i=0; i<m_worker_threads_nb; i++) {
43  m_worker_threads.emplace_back(std::make_shared<std::thread>(workerThreadStatic, this, i));
44  }
45 
46  // Start output thread
47  m_output_thread = std::make_shared<std::thread>(outputThreadStatic, this, m_worker_threads_nb);
48 }
49 
51  logger.debug() << "Waiting for worker threads";
52 
53  // set flag to indicate no new input will be coming
54  {
56  m_input_done = true;
58  }
59 
60  // Wait for all threads to finish
61  for (int i=0; i<m_worker_threads_nb; i++) {
62  m_worker_threads[i]->join();
63  }
64  m_output_thread->join();
65 
66  logger.debug() << "All worker threads done!";
67 }
68 
71 
72  //Force computation of SourceID here, where the order is still deterministic
73  for (auto& source : *source_group) {
74  source.getProperty<SourceID>();
75  }
76 
77  // put the new SourceGroup into the input queue
78  m_input_queue.emplace_back(m_group_counter++, source_group);
79 
80  // notify one worker thread that there is an available input
82 }
83 
85  logger.debug() << "Starting worker thread " << id;
86  try {
87  measurement->workerThreadLoop();
88  }
89  catch (const Elements::Exception &e) {
90  logger.fatal() << "Worker thread " << id << " got an exception!";
91  logger.fatal() << e.what();
92  if (!measurement->m_abort_raised.exchange(true)) {
93  logger.fatal() << "Aborting the execution";
94  ::raise(SIGTERM);
95  }
96  }
97  logger.debug() << "Stopping worker thread " << id;
98 }
99 
101  logger.debug() << "Starting output thread " << id;
102  try {
103  measurement->outputThreadLoop();
104  }
105  catch (const Elements::Exception &e) {
106  logger.fatal() << "Output thread got an exception!";
107  logger.fatal() << e.what();
108  if (!measurement->m_abort_raised.exchange(true)) {
109  logger.fatal() << "Aborting the execution";
110  ::raise(SIGTERM);
111  }
112  }
113  logger.debug() << "Stopping output thread " << id;
114 }
115 
117  while (true) {
118  int order_number;
120  {
122 
123  // We should end the thread once we're done with all input
124  if (m_input_done && m_input_queue.empty()) {
125  break;
126  }
127 
128  // If the queue is empty but we expect more data, wait
129  if (m_input_queue.empty()) {
131  continue;
132  }
133 
134  order_number = m_input_queue.front().first;
135  source_group = m_input_queue.front().second;
136  m_input_queue.pop_front();
137  }
138 
139  // Trigger measurements
140  for (auto& source : *source_group) {
141  m_source_to_row(source);
142  }
143 
144  {
146  m_output_queue.emplace_back(order_number, source_group);
147  source_group = nullptr;
149  }
150  }
151 
152  // Before ending the thread, decrement active threads counter
153  {
156  }
157 }
158 
160  while (true) {
161  {
163 
164  // Wait for something in the output queue
165  if (m_output_queue.empty()) {
167  }
168 
169  // Process the output queue
170  while(!m_output_queue.empty()) {
171  notifyObservers(m_output_queue.front().second);
172  m_output_queue.pop_front();
173  }
174  }
175 
176  {
179  if (m_active_threads <= 0 && m_output_queue.empty()) {
180  break;
181  }
182  }
183  }
184 }
185 
186 
static Elements::Logging logger
void notifyObservers(const std::shared_ptr< SourceGroupInterface > &message) const
Definition: Observable.h:71
std::list< std::pair< int, std::shared_ptr< SourceGroupInterface > > > m_input_queue
std::shared_ptr< std::thread > m_output_thread
constexpr double e
static void outputThreadStatic(MultithreadedMeasurement *measurement, int id)
std::vector< std::shared_ptr< std::thread > > m_worker_threads
void debug(const std::string &logMessage)
void fatal(const std::string &logMessage)
std::list< std::pair< int, std::shared_ptr< SourceGroupInterface > > > m_output_queue
void handleMessage(const std::shared_ptr< SourceGroupInterface > &source_group) override
static void workerThreadStatic(MultithreadedMeasurement *measurement, int id)
const char * what() const noexcept override
static Logging getLogger(const std::string &name="")