Viskores  1.0
AdvectAlgorithmThreaded.h
Go to the documentation of this file.
1 //============================================================================
2 // The contents of this file are covered by the Viskores license. See
3 // LICENSE.txt for details.
4 //
5 // By contributing to this file, all contributors agree to the Developer
6 // Certificate of Origin Version 1.1 (DCO 1.1) as stated in DCO.txt.
7 //============================================================================
8 
9 //============================================================================
10 // Copyright (c) Kitware, Inc.
11 // All rights reserved.
12 // See LICENSE.txt for details.
13 //
14 // This software is distributed WITHOUT ANY WARRANTY; without even
15 // the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
16 // PURPOSE. See the above copyright notice for more information.
17 //============================================================================
18 
19 #ifndef viskores_filter_flow_internal_AdvectAlgorithmThreaded_h
20 #define viskores_filter_flow_internal_AdvectAlgorithmThreaded_h
21 
26 
27 #include <thread>
28 
29 namespace viskores
30 {
31 namespace filter
32 {
33 namespace flow
34 {
35 namespace internal
36 {
37 
38 template <typename DSIType>
39 class AdvectAlgorithmThreaded : public AdvectAlgorithm<DSIType>
40 {
41 public:
42  using ParticleType = typename DSIType::PType;
43 
44  AdvectAlgorithmThreaded(const viskores::filter::flow::internal::BoundsMap& bm,
45  std::vector<DSIType>& blocks)
46  : AdvectAlgorithm<DSIType>(bm, blocks)
47  , Done(false)
48  {
49  //For threaded algorithm, the particles go out of scope in the Work method.
50  //When this happens, they are destructed by the time the Manage thread gets them.
51  //Set the copy flag so the std::vector is copied into the ArrayHandle
52  for (auto& block : this->Blocks)
53  block.SetCopySeedFlag(true);
54  }
55 
56  void Go() override
57  {
58  std::vector<std::thread> workerThreads;
59  workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker, this));
60  this->Manage();
61 
62  //This will only work for 1 thread. For > 1, the Blocks will need a mutex.
63  VISKORES_ASSERT(workerThreads.size() == 1);
64  for (auto& t : workerThreads)
65  t.join();
66  }
67 
68 protected:
69  bool HaveWork() override
70  {
71  std::lock_guard<std::mutex> lock(this->Mutex);
72  return this->AdvectAlgorithm<DSIType>::HaveWork() || this->WorkerActivate;
73  }
74 
75  virtual bool GetDone() override
76  {
77  std::lock_guard<std::mutex> lock(this->Mutex);
78 #ifndef VISKORES_ENABLE_MPI
79  return !this->CheckHaveWork();
80 #else
81  return this->Terminator.Done();
82 #endif
83  }
84 
85  bool WorkerGetDone()
86  {
87  std::lock_guard<std::mutex> lock(this->Mutex);
88  return this->Done;
89  }
90 
91  bool GetActiveParticles(std::vector<ParticleType>& particles, viskores::Id& blockId) override
92  {
93  std::lock_guard<std::mutex> lock(this->Mutex);
94  bool val = this->AdvectAlgorithm<DSIType>::GetActiveParticles(particles, blockId);
95  this->WorkerActivate = val;
96  return val;
97  }
98 
99  void UpdateActive(
100  const std::vector<ParticleType>& particles,
101  const std::unordered_map<viskores::Id, std::vector<viskores::Id>>& idsMap) override
102  {
103  if (!particles.empty())
104  {
105  std::lock_guard<std::mutex> lock(this->Mutex);
106  this->AdvectAlgorithm<DSIType>::UpdateActive(particles, idsMap);
107 
108  //Let workers know there is new work
109  this->WorkerActivateCondition.notify_all();
110  this->WorkerActivate = true;
111  }
112  }
113 
114  void SetDone()
115  {
116  std::lock_guard<std::mutex> lock(this->Mutex);
117  this->Done = true;
118  this->WorkerActivateCondition.notify_all();
119  }
120 
121  static void Worker(AdvectAlgorithmThreaded* algo) { algo->Work(); }
122 
123  void WorkerWait()
124  {
125  std::unique_lock<std::mutex> lock(this->Mutex);
126  this->WorkerActivateCondition.wait(lock, [this] { return WorkerActivate || Done; });
127  }
128 
129  void UpdateWorkerResult(viskores::Id blockId, DSIHelperInfo<ParticleType>& b)
130  {
131  std::lock_guard<std::mutex> lock(this->Mutex);
132  auto& it = this->WorkerResults[blockId];
133  it.emplace_back(b);
134  }
135 
136  void Work()
137  {
138  while (!this->WorkerGetDone())
139  {
140  std::vector<ParticleType> v;
141  viskores::Id blockId = -1;
142  if (this->GetActiveParticles(v, blockId))
143  {
144  auto& block = this->GetDataSet(blockId);
145  DSIHelperInfo<ParticleType> bb(v, this->BoundsMap, this->ParticleBlockIDsMap);
146  block.Advect(bb, this->StepSize);
147  this->UpdateWorkerResult(blockId, bb);
148  }
149  else
150  this->WorkerWait();
151  }
152  }
153 
154  void Manage()
155  {
156  while (!this->GetDone())
157  {
158  std::unordered_map<viskores::Id, std::vector<DSIHelperInfo<ParticleType>>> workerResults;
159  this->GetWorkerResults(workerResults);
160 
161  viskores::Id numTerm = 0;
162  for (auto& it : workerResults)
163  for (auto& r : it.second)
164  numTerm += this->UpdateResult(r);
165 
166  this->ExchangeParticles();
167  }
168  this->SetDone();
169  }
170 
171  void GetWorkerResults(
172  std::unordered_map<viskores::Id, std::vector<DSIHelperInfo<ParticleType>>>& results)
173  {
174  results.clear();
175 
176  std::lock_guard<std::mutex> lock(this->Mutex);
177  if (!this->WorkerResults.empty())
178  {
179  results = this->WorkerResults;
180  this->WorkerResults.clear();
181  }
182  }
183 
184 private:
185  bool CheckHaveWork()
186  {
187  return this->AdvectAlgorithm<DSIType>::HaveWork() || this->WorkerActivate;
188  }
189 
190  std::atomic<bool> Done;
191  std::mutex Mutex;
192  bool WorkerActivate = false;
193  std::condition_variable WorkerActivateCondition;
194  std::unordered_map<viskores::Id, std::vector<DSIHelperInfo<ParticleType>>> WorkerResults;
195 };
196 
197 }
198 }
199 }
200 } //viskores::filter::flow::internal
201 
202 #endif //viskores_filter_flow_internal_AdvectAlgorithmThreaded_h
BoundsMap.h
DataSetIntegrator.h
viskores::Id
viskores::Int64 Id
Base type to use to index arrays.
Definition: Types.h:235
viskores
Groups connected points that have the same field value.
Definition: Atomic.h:27
VISKORES_ASSERT
#define VISKORES_ASSERT(condition)
Definition: Assert.h:51
AdvectAlgorithm.h
PartitionedDataSet.h