parallel_utils.h
#ifndef NGEN_PARALLEL_UTILS_H
#define NGEN_PARALLEL_UTILS_H

#include <NGenConfig.h>
#if NGEN_WITH_MPI

#ifndef MPI_HF_SUB_CODE_GOOD
#define MPI_HF_SUB_CODE_GOOD 0
#endif

#ifndef MPI_HF_SUB_CODE_BAD
#define MPI_HF_SUB_CODE_BAD 1
#endif

#ifndef NGEN_MPI_DATA_TAG
#define NGEN_MPI_DATA_TAG 100
#endif

#ifndef NGEN_MPI_PROTOCOL_TAG
#define NGEN_MPI_PROTOCOL_TAG 101
#endif

#include <cstdio>           // FILE, fopen, fgets, fputs, fclose
#include <cstring>
#include <iostream>         // std::cout, std::cerr
#include <memory>           // std::unique_ptr, std::make_unique
#include <mpi.h>
#include <string>
#include <set>
#include <vector>
#include <unistd.h>         // gethostname
#include "FileChecker.h"    // utils::FileChecker::file_is_readable
#if NGEN_WITH_PYTHON
#include "python/HydrofabricSubsetter.hpp"
#endif // NGEN_WITH_PYTHON

namespace parallel {

    bool mpiSyncStatusAnd(bool status, int mpi_rank, int mpi_num_procs, const std::string &taskDesc) {

        // Status codes: MPI_HF_SUB_CODE_GOOD (0) means good, MPI_HF_SUB_CODE_BAD (1) means bad
        // TODO: assert this in constructor or somewhere, or maybe just in a unit test
        unsigned short codeBuffer;
        bool printMessage = !taskDesc.empty();
        // On every rank other than 0, set the status code value in the buffer and send it to rank 0
        if (mpi_rank != 0) {
            codeBuffer = status ? MPI_HF_SUB_CODE_GOOD : MPI_HF_SUB_CODE_BAD;
            MPI_Send(&codeBuffer, 1, MPI_UNSIGNED_SHORT, 0, 0, MPI_COMM_WORLD);
        }
        // On rank 0, the first step is to receive and combine the codes from the other ranks into a unified global status
        else {
            for (int i = 1; i < mpi_num_procs; ++i) {
                MPI_Recv(&codeBuffer, 1, MPI_UNSIGNED_SHORT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
                // If any rank is ever "not good", overwrite the status to be "false"
                if (codeBuffer != MPI_HF_SUB_CODE_GOOD) {
                    if (printMessage) {
                        std::cout << "Rank " << i << " not successful/ready after " << taskDesc << std::endl;
                    }
                    status = false;
                }
            }
            // Rank 0 must also now prepare the codeBuffer value for broadcasting the global status
            codeBuffer = status ? MPI_HF_SUB_CODE_GOOD : MPI_HF_SUB_CODE_BAD;
        }

        // Broadcast the global status, rooted at rank 0
        MPI_Bcast(&codeBuffer, 1, MPI_UNSIGNED_SHORT, 0, MPI_COMM_WORLD);
        return codeBuffer == MPI_HF_SUB_CODE_GOOD;
    }

    bool mpiSyncStatusAnd(bool status, int mpi_rank, int mpi_num_procs) {
        return mpiSyncStatusAnd(status, mpi_rank, mpi_num_procs, "");
    }

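    /*
     * Illustrative (hypothetical) usage sketch, not part of the library itself: every rank performs
     * some local step, then all ranks call mpiSyncStatusAnd collectively so the whole job either
     * proceeds or bails out together. The helper do_local_setup_step() below is a placeholder.
     *
     *     bool localOk = do_local_setup_step();   // hypothetical local task on this rank
     *     if (!mpiSyncStatusAnd(localOk, mpi_rank, mpi_num_procs, "local setup")) {
     *         MPI_Finalize();
     *         return 1;                            // every rank exits consistently
     *     }
     */
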
    bool is_hydrofabric_subdivided(int mpi_rank, int mpi_num_procs, const std::string &catchmentDataFile, bool printMsg) {
        std::string name = catchmentDataFile + "." + std::to_string(mpi_rank);
        // Initialize isGood based on local state. Here, the local file is "good" when it already exists.
        // TODO: this isn't actually checking whether the files are right (just that they are present), so do we need to?
        bool isGood = utils::FileChecker::file_is_readable(name);

        if (mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs)) {
            if (printMsg) { std::cout << "Process " << mpi_rank << ": Hydrofabric already subdivided in " << mpi_num_procs << " files." << std::endl; }
            return true;
        }
        else {
            if (printMsg) { std::cout << "Process " << mpi_rank << ": Hydrofabric has not yet been subdivided." << std::endl; }
            return false;
        }
    }

    bool is_hydrofabric_subdivided(int mpi_rank, int mpi_num_procs, const std::string &catchmentDataFile) {
        return is_hydrofabric_subdivided(mpi_rank, mpi_num_procs, catchmentDataFile, false);
    }

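    /*
     * For illustration (hypothetical path): with catchmentDataFile == "catchment_data.geojson" and
     * mpi_num_procs == 4, each rank i looks for its own pre-subdivided file, i.e.
     * "catchment_data.geojson.0" through "catchment_data.geojson.3", and mpiSyncStatusAnd then reduces
     * those per-rank checks into a single yes/no answer shared by all ranks.
     */
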
    void get_hosts_array(int mpi_rank, int mpi_num_procs, int *host_array) {
        const int ROOT_RANK = 0;
        // These are the lengths of the (trimmed) C-string representations of the hostname for each rank
        std::vector<int> actualHostnameCStrLength(mpi_num_procs);
        // Initialize to -1 to represent unknown
        for (int i = 0; i < mpi_num_procs; ++i) {
            actualHostnameCStrLength[i] = -1;
        }

        // Get this rank's hostname (hostnames should never be longer than 256 characters)
        char myhostname[256];
        gethostname(myhostname, 256);

        // Set the length for this rank
        actualHostnameCStrLength[mpi_rank] = std::strlen(myhostname);

        // First, gather the hostname string lengths
        MPI_Gather(&actualHostnameCStrLength[mpi_rank], 1, MPI_INT, actualHostnameCStrLength.data(), 1, MPI_INT, ROOT_RANK,
                   MPI_COMM_WORLD);
        // Per-rank start offsets/displacements in our hostname strings gather buffer.
        std::vector<int> recvDisplacements(mpi_num_procs);
        int totalLength = 0;
        for (int i = 0; i < mpi_num_procs; ++i) {
            // Displace each rank's string by the sum of the lengths of the previous rank strings
            recvDisplacements[i] = totalLength;
            // Add one to make space for the terminating '\0'
            actualHostnameCStrLength[i] += 1;
            // Then update the total length
            totalLength += actualHostnameCStrLength[i];
        }
        // Now we can create our buffer and gather the hostname strings into it
        std::vector<char> hostnames(totalLength);
        MPI_Gatherv(myhostname, actualHostnameCStrLength[mpi_rank], MPI_CHAR, hostnames.data(), actualHostnameCStrLength.data(),
                    recvDisplacements.data(), MPI_CHAR, ROOT_RANK, MPI_COMM_WORLD);

        if (mpi_rank == ROOT_RANK) {
            host_array[0] = 0;
            int next_host_id = 1;

            int rank_with_matching_hostname;
            char *checked_rank_hostname, *known_rank_hostname;

            for (int rank_being_checked = 0; rank_being_checked < mpi_num_procs; ++rank_being_checked) {
                // Set this as negative initially for each rank check to indicate no match found (at least yet)
                rank_with_matching_hostname = -1;
                // Get a C-string pointer for this rank's hostname, offset by the appropriate displacement
                checked_rank_hostname = &hostnames[recvDisplacements[rank_being_checked]];

                // Hostnames for any ranks lower than the rank currently being checked are already known
                for (int known_rank = 0; known_rank < rank_being_checked; ++known_rank) {
                    // Get the right C-string pointer for the current known rank's hostname also
                    known_rank_hostname = &hostnames[recvDisplacements[known_rank]];
                    // Compare the hostnames, setting and breaking if a match is found
                    if (std::strcmp(known_rank_hostname, checked_rank_hostname) == 0) {
                        rank_with_matching_hostname = known_rank;
                        break;
                    }
                }
                // A negative value means no earlier rank had a matching hostname
                if (rank_with_matching_hostname < 0) {
                    // Assign a new host id, then increment what the next id will be
                    host_array[rank_being_checked] = next_host_id++;
                }
                else {
                    host_array[rank_being_checked] = host_array[rank_with_matching_hostname];
                }
            }
        }
        // Now, broadcast the results out
        MPI_Bcast(host_array, mpi_num_procs, MPI_INT, ROOT_RANK, MPI_COMM_WORLD);
    }

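    /*
     * Worked example (hypothetical hosts): suppose 5 ranks run on nodes named "node-a", "node-a",
     * "node-b", "node-a", "node-b" (in rank order). get_hosts_array then fills host_array with
     * {1, 1, 2, 1, 2}: the first time a hostname is seen it gets the next fresh id, and every later
     * rank on the same host reuses that id. Only equality of ids matters to the callers below, which
     * compare ids to decide whether two ranks share a host (and thus a local filesystem).
     */
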
    bool mpi_send_text_file(const char *fileName, const int mpi_rank, const int destRank) {
        int bufSize = 4096;
        std::vector<char> buf(bufSize);
        int code;

        FILE *file = fopen(fileName, "r");

        // Transmit an error code instead of the expected size, and return false, if the file can't be opened
        if (file == NULL) {
            // TODO: output error message
            code = -1;
            MPI_Send(&code, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
            return false;
        }

        // Send the expected buffer size to start
        MPI_Send(&bufSize, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);

        // Then get back the expected size to infer the other side is good to go
        MPI_Recv(&code, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        if (code != bufSize) {
            // TODO: output error message
            fclose(file);
            return false;
        }
        int continueCode = 1;
        // Then, while there is more of the file to read and send, read the next batch and ...
        while (fgets(buf.data(), bufSize, file) != NULL) {
            // Indicate we are ready to continue sending data
            MPI_Send(&continueCode, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);

            // Send this batch
            MPI_Send(buf.data(), bufSize, MPI_CHAR, destRank, NGEN_MPI_DATA_TAG, MPI_COMM_WORLD);

            // Then get back a code, which will be negative if something went wrong and we need to exit, and non-negative otherwise
            MPI_Recv(&code, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            if (code < 0) {
                // TODO: output error message
                fclose(file);
                return false;
            }
        }
        // Once there is no more of the file to read and send, signal that we are done
        continueCode = 0;
        MPI_Send(&continueCode, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
        // Expect to get back a code of 0
        MPI_Recv(&code, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        fclose(file);
        return code == 0;
    }

    bool mpi_recv_text_file(const char *fileName, const int mpi_rank, const int srcRank) {
        int bufSize, writeCode;
        // Receive the expected buffer size to start
        MPI_Recv(&bufSize, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

        // If the sending side couldn't open the file, then immediately return false
        if (bufSize == -1) {
            // TODO: output error
            return false;
        }

        // Try to open the receiving file ...
        FILE *file = fopen(fileName, "w");
        // ... and let the sending side know whether this was successful by sending an error code if not ...
        if (file == NULL) {
            // TODO: output error message
            bufSize = -1;
            MPI_Send(&bufSize, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
            return false;
        }

        // Send back the received buffer size if the file opened, confirming things are good to go for the transfer
        MPI_Send(&bufSize, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);

        std::vector<char> buf(bufSize);

        int continueCode;

        while (true) {
            // Make sure the other side wants to continue sending data
            MPI_Recv(&continueCode, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            if (continueCode <= 0)
                break;

            MPI_Recv(buf.data(), bufSize, MPI_CHAR, srcRank, NGEN_MPI_DATA_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

            writeCode = fputs(buf.data(), file);
            MPI_Send(&writeCode, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);

            if (writeCode < 0) {
                fclose(file);
                return false;
            }
        }

        fclose(file);
        // Acknowledge the end of the transfer by echoing back the final (zero) continue code
        MPI_Send(&continueCode, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
        return true;
    }

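    /*
     * The two functions above form a matched pair and must be called together, one on each side of the
     * transfer. Illustrative sketch (the file name and ranks are hypothetical):
     *
     *     // On rank 0 (the side that has "data.geojson.3"):
     *     bool sentOk = mpi_send_text_file("data.geojson.3", mpi_rank, 3);
     *
     *     // On rank 3 (the side that needs the file):
     *     bool recvOk = mpi_recv_text_file("data.geojson.3", mpi_rank, 0);
     *
     * Protocol control messages (buffer size, continue/stop, per-chunk write status) travel on
     * NGEN_MPI_PROTOCOL_TAG, while the file contents themselves travel on NGEN_MPI_DATA_TAG.
     */
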
    bool distribute_subdivided_hydrofabric_files(const std::string &baseCatchmentFile, const std::string &baseNexusFile,
                                                 const int sendingRank, const int mpi_rank, const int mpi_num_procs,
                                                 const int *hostIdForRank, bool syncReturnStatus, bool blockAll)
    {
        // Start with status as good
        bool isGood = true;
        // FIXME: For now, just have rank 0 send everything, but optimize with multiple procs or threads later
        // Only need to process this on the sending rank or on a receiving rank (i.e., one not on the same host as the sending rank)
        if (mpi_rank == sendingRank || hostIdForRank[mpi_rank] != hostIdForRank[sendingRank]) {
            // Have the sending rank send out all files
            if (mpi_rank == sendingRank) {
                // In the sending rank, for all the other ranks ...
                for (int otherRank = 0; otherRank < mpi_num_procs; ++otherRank) {
                    // If another rank is on a different host (note that this also skips otherRank == sendingRank, since those host ids are equal) ...
                    if (hostIdForRank[otherRank] != hostIdForRank[mpi_rank]) {
                        // ... then send that rank its rank-specific catchment and nexus files
                        std::string catFileToSend = baseCatchmentFile + "." + std::to_string(otherRank);
                        std::string nexFileToSend = baseNexusFile + "." + std::to_string(otherRank);
                        // Note that checking the previous isGood is necessary here because of the loop
                        isGood = isGood && mpi_send_text_file(catFileToSend.c_str(), mpi_rank, otherRank);
                        isGood = isGood && mpi_send_text_file(nexFileToSend.c_str(), mpi_rank, otherRank);
                    }
                }
            }
            else {
                // For a rank not on the same host as the sending rank, receive the transmitted files
                std::string catFileToReceive = baseCatchmentFile + "." + std::to_string(mpi_rank);
                std::string nexFileToReceive = baseNexusFile + "." + std::to_string(mpi_rank);
                // Note that, unlike above, the prior isGood need not be checked on the first receive, because it is not in a loop
                isGood = mpi_recv_text_file(catFileToReceive.c_str(), mpi_rank, sendingRank);
                isGood = isGood && mpi_recv_text_file(nexFileToReceive.c_str(), mpi_rank, sendingRank);
            }
        }

        // Wait when appropriate
        if (blockAll) { MPI_Barrier(MPI_COMM_WORLD); }

        // Sync status among the ranks also, if appropriate
        if (syncReturnStatus) {
            return mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, "distributing subdivided hydrofabric files");
        }
        // Otherwise, just return the local status value
        else {
            return isGood;
        }
    }

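    /*
     * Usage note (as exercised by subdivide_hydrofabric below): hostIdForRank is expected to be the
     * array filled in by get_hosts_array, and when blockAll and/or syncReturnStatus are set, every rank
     * in MPI_COMM_WORLD must call this function collectively, since the MPI_Barrier and the
     * mpiSyncStatusAnd call at the end are collective operations.
     */
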
    bool subdivide_hydrofabric(int mpi_rank, int mpi_num_procs, const std::string &catchmentDataFile,
                               const std::string &nexusDataFile, const std::string &partitionConfigFile)
    {
        // Track whether things are good, meaning ok to continue and, at the end, whether successful
        // Start with a value of true
        bool isGood = true;

        #if !NGEN_WITH_PYTHON
        // We can't proceed with this, because Python integration is not active
        isGood = false;
        std::cerr << "Driver is unable to perform required hydrofabric subdividing when Python integration is not active." << std::endl;

        // Sync with the rest of the ranks and bail if any aren't ready to proceed for any reason
        if (!mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, "initializing hydrofabric subdivider")) {
            return false;
        }
        // The sync above always fails in this configuration, but return explicitly so every control path returns a value
        return false;
        #else // i.e., #if NGEN_WITH_PYTHON
        // Have rank 0 handle the generation task for all files/partitions
        std::unique_ptr<utils::ngenPy::HydrofabricSubsetter> subdivider;
        if (mpi_rank == 0) {
            try {
                subdivider = std::make_unique<utils::ngenPy::HydrofabricSubsetter>(catchmentDataFile, nexusDataFile,
                                                                                   partitionConfigFile);
            }
            catch (const std::exception &e) {
                std::cerr << e.what() << std::endl;
                // Set not good if the subdivider object couldn't be instantiated
                isGood = false;
            }
        }
        // Sync ranks and bail if any aren't ready to proceed for any reason
        if (!mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, "initializing hydrofabric subdivider")) {
            return false;
        }

        if (mpi_rank == 0) {
            // Try to perform the subdividing
            try {
                isGood = subdivider->execSubdivision();
            }
            catch (const std::exception &e) {
                std::cerr << e.what() << std::endl;
                // Set not good if the subdivision could not be executed successfully
                isGood = false;
            }
        }
        // Sync ranks again here on whether subdividing was successful, having them all exit at this point if not
        if (!mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, "executing hydrofabric subdivision")) {
            return false;
        }
        // But if the subdividing went fine ...
        else {
            // ... figure out which ranks are on hosts with each other by getting an id for the host of each rank
            std::vector<int> hostIdForRank(mpi_num_procs);
            get_hosts_array(mpi_rank, mpi_num_procs, hostIdForRank.data());

            // ... then (when necessary) transfer files around
            return distribute_subdivided_hydrofabric_files(catchmentDataFile, nexusDataFile, 0, mpi_rank,
                                                           mpi_num_procs, hostIdForRank.data(), true, true);
        }
        #endif // NGEN_WITH_PYTHON
    }
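
    /*
     * Illustrative driver-side sketch (hypothetical variable names; the actual driver wiring lives in
     * the ngen main program, not in this header): after MPI_Init and after reading the file paths from
     * the command line, every rank can do something like
     *
     *     if (!parallel::is_hydrofabric_subdivided(mpi_rank, mpi_num_procs, catchmentDataFile, true)) {
     *         if (!parallel::subdivide_hydrofabric(mpi_rank, mpi_num_procs, catchmentDataFile,
     *                                              nexusDataFile, partitionConfigFile)) {
     *             MPI_Finalize();
     *             return 1;
     *         }
     *     }
     *
     * so that per-rank files are generated (and copied across hosts) only when they are not already present.
     */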
} // namespace parallel

#endif // NGEN_WITH_MPI

#endif // NGEN_PARALLEL_UTILS_H