Viskores  1.0
ParticleExchanger.h
Go to the documentation of this file.
1 //============================================================================
2 // The contents of this file are covered by the Viskores license. See
3 // LICENSE.txt for details.
4 //
5 // By contributing to this file, all contributors agree to the Developer
6 // Certificate of Origin Version 1.1 (DCO 1.1) as stated in DCO.txt.
7 //============================================================================
8 
9 //============================================================================
10 // Copyright (c) Kitware, Inc.
11 // All rights reserved.
12 // See LICENSE.txt for details.
13 //
14 // This software is distributed WITHOUT ANY WARRANTY; without even
15 // the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
16 // PURPOSE. See the above copyright notice for more information.
17 //============================================================================
18 
19 #ifndef viskores_filter_flow_internal_ParticleExchanger_h
20 #define viskores_filter_flow_internal_ParticleExchanger_h
21 
22 namespace viskores
23 {
24 namespace filter
25 {
26 namespace flow
27 {
28 namespace internal
29 {
30 
31 template <typename ParticleType>
32 class ParticleExchanger
33 {
34 public:
35 #ifdef VISKORES_ENABLE_MPI
36  ParticleExchanger(viskoresdiy::mpi::communicator& comm)
37  : MPIComm(viskoresdiy::mpi::mpi_cast(comm.handle()))
38  , NumRanks(comm.size())
39  , Rank(comm.rank())
40 #else
41  ParticleExchanger(viskoresdiy::mpi::communicator& viskoresNotUsed(comm))
42 #endif
43  {
44  }
45 #ifdef VISKORES_ENABLE_MPI
46  ~ParticleExchanger() {} //{ this->CleanupSendBuffers(false); }
47 #endif
48 
49  bool HaveWork() const { return !this->SendBuffers.empty(); }
50 
51  void Exchange(const std::vector<ParticleType>& outData,
52  const std::vector<viskores::Id>& outRanks,
53  const std::unordered_map<viskores::Id, std::vector<viskores::Id>>& outBlockIDsMap,
54  std::vector<ParticleType>& inData,
55  std::unordered_map<viskores::Id, std::vector<viskores::Id>>& inDataBlockIDsMap)
56  {
57  VISKORES_ASSERT(outData.size() == outRanks.size());
58 
59  if (this->NumRanks == 1)
60  this->SerialExchange(outData, outBlockIDsMap, inData, inDataBlockIDsMap);
61 #ifdef VISKORES_ENABLE_MPI
62  else
63  {
64  this->CleanupSendBuffers(true);
65  this->SendParticles(outData, outRanks, outBlockIDsMap);
66  this->RecvParticles(inData, inDataBlockIDsMap);
67  }
68 #endif
69  }
70 
71 private:
72  void SerialExchange(
73  const std::vector<ParticleType>& outData,
74  const std::unordered_map<viskores::Id, std::vector<viskores::Id>>& outBlockIDsMap,
75  std::vector<ParticleType>& inData,
76  std::unordered_map<viskores::Id, std::vector<viskores::Id>>& inDataBlockIDsMap)
77  {
78  //Copy output to input.
79  for (const auto& p : outData)
80  {
81  const auto& bids = outBlockIDsMap.find(p.GetID())->second;
82  inData.emplace_back(p);
83  inDataBlockIDsMap[p.GetID()] = bids;
84  }
85  }
86 
87 #ifdef VISKORES_ENABLE_MPI
88  using ParticleCommType = std::pair<ParticleType, std::vector<viskores::Id>>;
89 
90  void CleanupSendBuffers(bool checkRequests)
91  {
92  if (!checkRequests)
93  {
94  for (auto& entry : this->SendBuffers)
95  delete entry.second;
96  this->SendBuffers.clear();
97  return;
98  }
99 
100  if (this->SendBuffers.empty())
101  return;
102 
103  std::vector<MPI_Request> requests;
104  for (auto& req : this->SendBuffers)
105  requests.emplace_back(req.first);
106 
107  //MPI_Testsome will update the complete requests to MPI_REQUEST_NULL.
108  //Because we are using the MPI_Request as a key in SendBuffers, we need
109  //to make a copy.
110  auto requestsOrig = requests;
111 
112  std::vector<MPI_Status> status(requests.size());
113  std::vector<int> indices(requests.size());
114  int num = 0;
115  int err = MPI_Testsome(requests.size(), requests.data(), &num, indices.data(), status.data());
116 
117  if (err != MPI_SUCCESS)
119  "Error with MPI_Testsome in ParticleExchanger::CleanupSendBuffers");
120 
121  if (num > 0)
122  {
123  for (int i = 0; i < num; i++)
124  {
125  std::size_t idx = static_cast<std::size_t>(indices[i]);
126  const auto& req = requestsOrig[idx];
127  //const auto& stat = status[idx];
128  auto it = this->SendBuffers.find(req);
129  if (it == this->SendBuffers.end())
131  "Missing request in ParticleExchanger::CleanupSendBuffers");
132 
133  //Delete the buffer and remove from SendBuffers.
134  delete it->second;
135  this->SendBuffers.erase(it);
136  //std::cout<<this->Rank<<" SendBuffer: Delete"<<std::endl;
137  }
138  }
139  }
140 
141  void SendParticles(
142  const std::vector<ParticleType>& outData,
143  const std::vector<viskores::Id>& outRanks,
144  const std::unordered_map<viskores::Id, std::vector<viskores::Id>>& outBlockIDsMap)
145  {
146  if (outData.empty())
147  return;
148 
149  //create the send data: vector of particles, vector of vector of blockIds.
150  std::size_t n = outData.size();
151  std::unordered_map<int, std::vector<ParticleCommType>> sendData;
152 
153  // dst, vector of pair(particles, blockIds)
154  for (std::size_t i = 0; i < n; i++)
155  {
156  const auto& bids = outBlockIDsMap.find(outData[i].GetID())->second;
157  //sendData[outRanks[i]].emplace_back(std::make_pair(std::move(outData[i]), std::move(bids)));
158  sendData[outRanks[i]].emplace_back(std::make_pair(outData[i], bids));
159  }
160 
161  //Send to dst, vector<pair<particle, bids>>
162  for (auto& si : sendData)
163  this->SendParticlesToDst(si.first, si.second);
164  }
165 
166  void SendParticlesToDst(int dst, const std::vector<ParticleCommType>& data)
167  {
168  if (dst == this->Rank)
169  {
170  VISKORES_LOG_S(viskores::cont::LogLevel::Error, "Error. Sending a particle to yourself.");
171  return;
172  }
173 
174  //Serialize vector(pair(particle, bids)) and send.
175  viskoresdiy::MemoryBuffer* bb = new viskoresdiy::MemoryBuffer();
176  viskoresdiy::save(*bb, data);
177  bb->reset();
178 
179  MPI_Request req;
180  int err =
181  MPI_Isend(bb->buffer.data(), bb->size(), MPI_BYTE, dst, this->Tag, this->MPIComm, &req);
182  if (err != MPI_SUCCESS)
183  throw viskores::cont::ErrorFilterExecution("Error in MPI_Isend inside Messenger::SendData");
184  this->SendBuffers[req] = bb;
185  }
186 
187  void RecvParticles(
188  std::vector<ParticleType>& inData,
189  std::unordered_map<viskores::Id, std::vector<viskores::Id>>& inDataBlockIDsMap) const
190  {
191  inData.resize(0);
192  inDataBlockIDsMap.clear();
193 
194  std::vector<viskoresdiy::MemoryBuffer> buffers;
195 
196  MPI_Status status;
197  while (true)
198  {
199  int flag = 0;
200  int err = MPI_Iprobe(MPI_ANY_SOURCE, this->Tag, this->MPIComm, &flag, &status);
201  if (err != MPI_SUCCESS)
203  "Error in MPI_Probe in ParticleExchanger::RecvParticles");
204 
205  if (flag == 0) //no message arrived we are done.
206  break;
207 
208  //Otherwise, recv the incoming data
209  int incomingSize;
210  err = MPI_Get_count(&status, MPI_BYTE, &incomingSize);
211  if (err != MPI_SUCCESS)
213  "Error in MPI_Probe in ParticleExchanger::RecvParticles");
214 
215  std::vector<char> recvBuff;
216  recvBuff.resize(incomingSize);
217  MPI_Status recvStatus;
218 
219  err = MPI_Recv(recvBuff.data(),
220  incomingSize,
221  MPI_BYTE,
222  status.MPI_SOURCE,
223  status.MPI_TAG,
224  this->MPIComm,
225  &recvStatus);
226  if (err != MPI_SUCCESS)
228  "Error in MPI_Probe in ParticleExchanger::RecvParticles");
229 
230  //Add incoming data to inData and inDataBlockIds.
231  viskoresdiy::MemoryBuffer memBuff;
232  memBuff.save_binary(recvBuff.data(), incomingSize);
233  memBuff.reset();
234 
235  std::vector<ParticleCommType> data;
236  viskoresdiy::load(memBuff, data);
237  memBuff.reset();
238  for (const auto& d : data)
239  {
240  const auto& particle = d.first;
241  const auto& bids = d.second;
242  inDataBlockIDsMap[particle.GetID()] = bids;
243  inData.emplace_back(particle);
244  }
245 
246  //Note, we don't terminate the while loop here. We want to go back and
247  //check if any messages came in while buffers were being processed.
248  }
249  }
250 
251  MPI_Comm MPIComm;
252  viskores::Id NumRanks;
253  viskores::Id Rank;
254  std::unordered_map<MPI_Request, viskoresdiy::MemoryBuffer*> SendBuffers;
255  int Tag = 100;
256 #else
257  viskores::Id NumRanks = 1;
258  viskores::Id Rank = 0;
259 #endif
260 };
261 
262 }
263 }
264 }
265 } //viskores::filter::flow::internal
266 
267 
268 #endif //viskores_filter_flow_internal_ParticleExchanger_h
viskores::exec::arg::load
T load(const U &u, viskores::Id v)
Definition: FetchTagArrayDirectIn.h:44
viskores::cont::LogLevel::Error
@ Error
Important but non-fatal errors, such as device fail-over.
viskoresNotUsed
#define viskoresNotUsed(parameter_name)
Simple macro to identify a parameter as unused.
Definition: ExportMacros.h:136
viskores::cont::ErrorFilterExecution
This class is primarily intended for filters to throw in the control environment to indicate an execut...
Definition: ErrorFilterExecution.h:35
viskores::Id
viskores::Int64 Id
Base type to use to index arrays.
Definition: Types.h:235
viskores
Groups connected points that have the same field value.
Definition: Atomic.h:27
VISKORES_ASSERT
#define VISKORES_ASSERT(condition)
Definition: Assert.h:51
VISKORES_LOG_S
#define VISKORES_LOG_S(level,...)
Writes a message using stream syntax to the indicated log level.
Definition: Logging.h:216