19 #ifndef viskores_filter_flow_internal_AdvectAlgorithmThreaded_h
20 #define viskores_filter_flow_internal_AdvectAlgorithmThreaded_h
// Threaded variant of AdvectAlgorithm, parameterised on the block type DSIType.
// A std::thread is started on the static Worker() entry point, which runs
// Work(); results produced there are queued in WorkerResults under Mutex and
// picked up again through GetWorkerResults().
38 template <
typename DSIType>
39 class AdvectAlgorithmThreaded :
public AdvectAlgorithm<DSIType>
// Particle type handled by this algorithm, taken from the block type.
42 using ParticleType =
typename DSIType::PType;
44 AdvectAlgorithmThreaded(
const viskores::filter::flow::internal::BoundsMap& bm,
45 std::vector<DSIType>& blocks)
46 : AdvectAlgorithm<DSIType>(bm, blocks)
52 for (
auto& block : this->Blocks)
53 block.SetCopySeedFlag(
true);
// NOTE(review): the signature of the enclosing function is not visible here.
// Creates the list of worker threads and starts a single thread that runs the
// static Worker() entry point on this object.
58 std::vector<std::thread> workerThreads;
59 workerThreads.emplace_back(std::thread(AdvectAlgorithmThreaded::Worker,
this));
// Visits every started thread. The loop body is not visible here —
// presumably each thread is joined; TODO confirm.
64 for (
auto& t : workerThreads)
69 bool HaveWork()
override
71 std::lock_guard<std::mutex> lock(this->Mutex);
72 return this->AdvectAlgorithm<DSIType>::HaveWork() || this->WorkerActivate;
75 virtual bool GetDone()
override
77 std::lock_guard<std::mutex> lock(this->Mutex);
78 #ifndef VISKORES_ENABLE_MPI
79 return !this->CheckHaveWork();
81 return this->Terminator.Done();
// NOTE(review): isolated statement — the signature and the rest of the
// enclosing function are not visible here. Takes Mutex for the remainder of
// that function's scope. Presumably part of WorkerGetDone(), which the worker
// loop polls; TODO confirm.
87 std::lock_guard<std::mutex> lock(this->Mutex);
91 bool GetActiveParticles(std::vector<ParticleType>& particles,
viskores::Id& blockId)
override
93 std::lock_guard<std::mutex> lock(this->Mutex);
94 bool val = this->AdvectAlgorithm<DSIType>::GetActiveParticles(particles, blockId);
95 this->WorkerActivate = val;
// NOTE(review): the first line of this override's signature (return type and
// name) is not visible here; the base-class call below suggests the name is
// UpdateActive — TODO confirm.
100 const std::vector<ParticleType>& particles,
101 const std::unordered_map<
viskores::Id, std::vector<viskores::Id>>& idsMap)
override
// An empty particle list takes none of the actions below.
103 if (!particles.empty())
// Mutex is held while the base-class UpdateActive runs.
105 std::lock_guard<std::mutex> lock(this->Mutex);
106 this->AdvectAlgorithm<DSIType>::UpdateActive(particles, idsMap);
// Wake every thread waiting on the condition variable and raise the flag
// that the wait predicate checks.
109 this->WorkerActivateCondition.notify_all();
110 this->WorkerActivate =
true;
// NOTE(review): fragment — the enclosing function's signature and any
// statement between these two lines are not visible here. Takes Mutex, then
// wakes every thread waiting on WorkerActivateCondition. Presumably this is
// where Done is raised so a waiting worker can leave its wait; TODO confirm.
116 std::lock_guard<std::mutex> lock(this->Mutex);
118 this->WorkerActivateCondition.notify_all();
121 static void Worker(AdvectAlgorithmThreaded* algo) { algo->Work(); }
// NOTE(review): the enclosing function's signature is not visible here.
// Locks Mutex and blocks on WorkerActivateCondition until WorkerActivate or
// Done is true; the condition variable releases Mutex while waiting and
// re-acquires it before the predicate is evaluated.
125 std::unique_lock<std::mutex> lock(this->Mutex);
126 this->WorkerActivateCondition.wait(lock, [
this] {
return WorkerActivate || Done; });
129 void UpdateWorkerResult(
viskores::Id blockId, DSIHelperInfo<ParticleType>& b)
131 std::lock_guard<std::mutex> lock(this->Mutex);
132 auto& it = this->WorkerResults[blockId];
// NOTE(review): the enclosing function's signature, the declaration of
// blockId and any else-branch are not visible here. Worker() calls Work(), so
// this is presumably that function's body; TODO confirm.
// Keep going until WorkerGetDone() reports true.
138 while (!this->WorkerGetDone())
140 std::vector<ParticleType> v;
// v and blockId are passed by non-const reference; the advection below only
// runs when GetActiveParticles returns true.
142 if (this->GetActiveParticles(v, blockId))
// Advect on the selected block with the configured StepSize, then queue the
// helper object as that block's worker result.
144 auto& block = this->GetDataSet(blockId);
145 DSIHelperInfo<ParticleType> bb(v, this->BoundsMap, this->ParticleBlockIDsMap);
146 block.Advect(bb, this->StepSize);
147 this->UpdateWorkerResult(blockId, bb);
// NOTE(review): the enclosing function's signature and the declaration of
// numTerm are not visible here.
// Keep going until GetDone() reports true.
156 while (!this->GetDone())
// Start from an empty map and take over whatever results are queued.
158 std::unordered_map<viskores::Id, std::vector<DSIHelperInfo<ParticleType>>> workerResults;
159 this->GetWorkerResults(workerResults);
// Pass every queued result to UpdateResult and accumulate its return value
// in numTerm.
162 for (
auto& it : workerResults)
163 for (
auto& r : it.second)
164 numTerm += this->UpdateResult(r);
166 this->ExchangeParticles();
171 void GetWorkerResults(
172 std::unordered_map<
viskores::Id, std::vector<DSIHelperInfo<ParticleType>>>& results)
176 std::lock_guard<std::mutex> lock(this->Mutex);
177 if (!this->WorkerResults.empty())
179 results = this->WorkerResults;
180 this->WorkerResults.clear();
// NOTE(review): the enclosing function's signature is not visible here.
// GetDone() calls CheckHaveWork() while it already holds Mutex, so this is
// presumably that helper's body: the same test as HaveWork(), but without
// taking the lock; TODO confirm.
187 return this->AdvectAlgorithm<DSIType>::HaveWork() || this->WorkerActivate;
// Flag read by the worker's condition-variable wait predicate (together with
// WorkerActivate).
190 std::atomic<bool> Done;
// Set to the result of GetActiveParticles() and raised when new active
// particles are added; read by HaveWork() and by the wait predicate.
192 bool WorkerActivate =
false;
// Notified when new particles are added; waited on together with Mutex.
193 std::condition_variable WorkerActivateCondition;
// Results queued per block id, accessed by UpdateWorkerResult() and emptied
// by GetWorkerResults().
194 std::unordered_map<viskores::Id, std::vector<DSIHelperInfo<ParticleType>>> WorkerResults;
202 #endif //viskores_filter_flow_internal_AdvectAlgorithmThreaded_h