19 #ifndef viskores_filter_flow_internal_ParticleExchanger_h
20 #define viskores_filter_flow_internal_ParticleExchanger_h
// Moves particles, together with the list of block IDs recorded for each
// particle, from one owner to another. With a single rank the hand-off is
// local; the multi-rank path is compiled only when VISKORES_ENABLE_MPI is
// defined. ParticleType must provide GetID(), which is used as the key of the
// particle-ID -> block-IDs maps.
31 template <
typename ParticleType>
32 class ParticleExchanger
// MPI build: keeps the raw MPI communicator obtained from the DIY communicator
// through mpi_cast(comm.handle()) and records comm.size() as the number of
// ranks. Any further initializers and the constructor body are not visible in
// this excerpt.
35 #ifdef VISKORES_ENABLE_MPI
36 ParticleExchanger(viskoresdiy::mpi::communicator& comm)
37 : MPIComm(viskoresdiy::mpi::mpi_cast(comm.handle()))
38 , NumRanks(comm.size())
// Non-MPI build: the communicator argument is accepted but not used
// (viskoresNotUsed).
41 ParticleExchanger(viskoresdiy::mpi::communicator&
viskoresNotUsed(comm))
// NOTE(review): the destructor body is empty, so MemoryBuffer pointers that are
// still stored in SendBuffers are not released here - confirm this is
// intentional (e.g. because their sends may still be in flight).
45 #ifdef VISKORES_ENABLE_MPI
46 ~ParticleExchanger() {}
49 bool HaveWork()
const {
return !this->SendBuffers.empty(); }
// Entry point for one exchange round.
//   outData            particles leaving this owner
//   outRanks           destination rank per outgoing particle (forwarded to
//                      SendParticles, which indexes it in step with outData)
//   outBlockIDsMap     particle ID -> block IDs for the outgoing particles
//   inData             receives the incoming particles
//   inDataBlockIDsMap  receives particle ID -> block IDs for incoming particles
// NOTE(review): the lines that join the single-rank call to the MPI-only
// section (and the matching #endif) are not visible in this excerpt;
// presumably the MPI section is the else branch - confirm.
51 void Exchange(
const std::vector<ParticleType>& outData,
52 const std::vector<viskores::Id>& outRanks,
53 const std::unordered_map<
viskores::Id, std::vector<viskores::Id>>& outBlockIDsMap,
54 std::vector<ParticleType>& inData,
55 std::unordered_map<
viskores::Id, std::vector<viskores::Id>>& inDataBlockIDsMap)
// Single rank: hand the data over locally; outRanks is not consulted.
59 if (this->NumRanks == 1)
60 this->SerialExchange(outData, outBlockIDsMap, inData, inDataBlockIDsMap);
61 #ifdef VISKORES_ENABLE_MPI
// MPI path: first test the requests of earlier sends and drop the entries that
// completed, then post the new sends, then collect incoming particles.
64 this->CleanupSendBuffers(
true);
65 this->SendParticles(outData, outRanks, outBlockIDsMap);
66 this->RecvParticles(inData, inDataBlockIDsMap);
// Parameter list and loop of the local (single-rank) hand-off. The declaration
// line that precedes this parameter list is not visible in this excerpt;
// presumably this is SerialExchange, which Exchange calls with these four
// arguments - confirm.
// Every outgoing particle is appended to inData and its block-ID list is
// copied into inDataBlockIDsMap under the particle's ID.
73 const std::vector<ParticleType>& outData,
74 const std::unordered_map<
viskores::Id, std::vector<viskores::Id>>& outBlockIDsMap,
75 std::vector<ParticleType>& inData,
76 std::unordered_map<
viskores::Id, std::vector<viskores::Id>>& inDataBlockIDsMap)
79 for (
const auto& p : outData)
// NOTE(review): the iterator returned by find() is dereferenced without being
// compared against end(); a particle whose ID is missing from outBlockIDsMap
// is undefined behavior. Confirm callers always populate the map.
81 const auto& bids = outBlockIDsMap.find(p.GetID())->second;
82 inData.emplace_back(p);
// operator[] overwrites any block-ID list already stored for this ID.
83 inDataBlockIDsMap[p.GetID()] = bids;
87 #ifdef VISKORES_ENABLE_MPI
// Unit of communication: one particle paired with its list of block IDs.
88 using ParticleCommType = std::pair<ParticleType, std::vector<viskores::Id>>;
// Drops entries from SendBuffers.
//   checkRequests  NOTE(review): the branch on this flag is not visible in this
//                  excerpt. Exchange passes true before posting new sends;
//                  presumably false selects the unconditional loop + clear()
//                  below - confirm.
90 void CleanupSendBuffers(
bool checkRequests)
// Visit every tracked entry, then empty the map. The per-entry statement is
// not visible in this excerpt.
94 for (
auto& entry : this->SendBuffers)
96 this->SendBuffers.clear();
// Check for the case where no sends are being tracked.
100 if (this->SendBuffers.empty())
// Collect the request handles, which are the keys of SendBuffers.
103 std::vector<MPI_Request> requests;
104 for (
auto& req : this->SendBuffers)
105 requests.emplace_back(req.first);
// Keep an untouched copy of the handles: the map lookup further down uses
// requestsOrig rather than the array handed to MPI_Testsome.
// NOTE(review): presumably needed because MPI_Testsome overwrites completed
// entries of its request array - confirm against the MPI specification.
110 auto requestsOrig = requests;
112 std::vector<MPI_Status> status(requests.size());
113 std::vector<int> indices(requests.size());
// num and indices are outputs of the call; the loop below treats the first num
// entries of indices as positions into the request array.
// NOTE(review): requests.size() is a std::size_t passed where MPI takes an int
// count; the declaration of num is not visible in this excerpt.
115 int err = MPI_Testsome(requests.size(), requests.data(), &num, indices.data(), status.data());
117 if (err != MPI_SUCCESS)
119 "Error with MPI_Testsome in ParticleExchanger::CleanupSendBuffers");
// For each reported position, look the original handle up in SendBuffers and
// remove that entry.
123 for (
int i = 0; i < num; i++)
125 std::size_t idx =
static_cast<std::size_t
>(indices[i]);
126 const auto& req = requestsOrig[idx];
128 auto it = this->SendBuffers.find(req);
129 if (it == this->SendBuffers.end())
131 "Missing request in ParticleExchanger::CleanupSendBuffers");
// NOTE(review): erase() only removes the map entry; whether the MemoryBuffer
// it points to is freed first is not visible in this excerpt - confirm.
135 this->SendBuffers.erase(it);
// Parameter list and body that group the outgoing particles by destination
// rank and send each group with SendParticlesToDst. The declaration line that
// precedes this parameter list is not visible in this excerpt; presumably this
// is SendParticles, which Exchange calls with these three arguments - confirm.
//   outData         particles to send
//   outRanks        outRanks[i] is the destination rank of outData[i]
//   outBlockIDsMap  particle ID -> block IDs; looked up for every particle
142 const std::vector<ParticleType>& outData,
143 const std::vector<viskores::Id>& outRanks,
144 const std::unordered_map<
viskores::Id, std::vector<viskores::Id>>& outBlockIDsMap)
150 std::size_t n = outData.size();
// NOTE(review): the map is keyed by int while outRanks holds viskores::Id, so
// the key is converted on every insertion - confirm the rank always fits.
151 std::unordered_map<int, std::vector<ParticleCommType>> sendData;
// NOTE(review): outRanks is indexed up to outData.size(); nothing visible in
// this excerpt checks that the two vectors have the same length.
154 for (std::size_t i = 0; i < n; i++)
// NOTE(review): the iterator returned by find() is dereferenced without being
// compared against end(); a missing particle ID is undefined behavior.
156 const auto& bids = outBlockIDsMap.find(outData[i].GetID())->second;
// Copies the particle and its block-ID list into the destination's group.
158 sendData[outRanks[i]].emplace_back(std::make_pair(outData[i], bids));
// One send per destination rank.
162 for (
auto& si : sendData)
163 this->SendParticlesToDst(si.first, si.second);
// Serializes `data` into a heap-allocated MemoryBuffer and posts an MPI_Isend
// of its bytes to rank `dst` on this->MPIComm with this->Tag. The buffer
// pointer is stored in SendBuffers under the request handle, so it remains
// reachable after this function returns.
//   dst   destination rank
//   data  (particle, block IDs) pairs to send
166 void SendParticlesToDst(
int dst,
const std::vector<ParticleCommType>& data)
// NOTE(review): what happens when the destination is this rank is not visible
// in this excerpt.
168 if (dst == this->Rank)
// NOTE(review): bb is a raw owning pointer. If the error branch after
// MPI_Isend leaves the function (its body is not visible here), bb is never
// stored in SendBuffers and would leak - confirm.
175 viskoresdiy::MemoryBuffer* bb =
new viskoresdiy::MemoryBuffer();
176 viskoresdiy::save(*bb, data);
181 MPI_Isend(bb->buffer.data(), bb->size(), MPI_BYTE, dst, this->Tag, this->MPIComm, &req);
182 if (err != MPI_SUCCESS)
// Remember the buffer under its request handle.
184 this->SendBuffers[req] = bb;
// Parameter list and body of the receive step. The declaration line that
// precedes this parameter list is not visible in this excerpt; presumably this
// is RecvParticles, which Exchange calls with these two arguments - confirm.
//   inData             incoming particles are appended here
//   inDataBlockIDsMap  cleared on entry, then filled with
//                      particle ID -> block IDs for every incoming particle
// Each MPI failure message below names the MPI call that actually failed
// (MPI_Iprobe, MPI_Get_count, MPI_Recv); previously all three said
// "MPI_Probe", which made a failed size query or receive look like a probe
// failure.
188 std::vector<ParticleType>& inData,
189 std::unordered_map<
viskores::Id, std::vector<viskores::Id>>& inDataBlockIDsMap)
const
192 inDataBlockIDsMap.clear();
// NOTE(review): no use of `buffers` is visible in this excerpt.
194 std::vector<viskoresdiy::MemoryBuffer> buffers;
// Probe for a message carrying this->Tag from any source rank; flag and status
// are outputs of the call.
200 int err = MPI_Iprobe(MPI_ANY_SOURCE, this->Tag, this->MPIComm, &flag, &status);
201 if (err != MPI_SUCCESS)
203 "Error in MPI_Iprobe in ParticleExchanger::RecvParticles");
// Size of the probed message in bytes.
210 err = MPI_Get_count(&status, MPI_BYTE, &incomingSize);
211 if (err != MPI_SUCCESS)
213 "Error in MPI_Get_count in ParticleExchanger::RecvParticles");
// Temporary byte buffer sized to hold the message.
215 std::vector<char> recvBuff;
216 recvBuff.resize(incomingSize);
217 MPI_Status recvStatus;
219 err = MPI_Recv(recvBuff.data(),
226 if (err != MPI_SUCCESS)
228 "Error in MPI_Recv in ParticleExchanger::RecvParticles");
// Copy the received bytes into a DIY MemoryBuffer.
231 viskoresdiy::MemoryBuffer memBuff;
232 memBuff.save_binary(recvBuff.data(), incomingSize);
// NOTE(review): the statement that fills `data` from memBuff is not visible in
// this excerpt.
235 std::vector<ParticleCommType> data;
// Record each particle and its block-ID list; operator[] overwrites any list
// already stored for the same particle ID.
238 for (
const auto& d : data)
240 const auto& particle = d.first;
241 const auto& bids = d.second;
242 inDataBlockIDsMap[particle.GetID()] = bids;
243 inData.emplace_back(particle);
// Sends being tracked: maps the request handle filled in by MPI_Isend to the
// heap-allocated MemoryBuffer whose bytes were passed to that send.
254 std::unordered_map<MPI_Request, viskoresdiy::MemoryBuffer*> SendBuffers;
268 #endif //viskores_filter_flow_internal_ParticleExchanger_h