#ifndef NGEN_PARALLEL_UTILS_H
#define NGEN_PARALLEL_UTILS_H

#ifndef MPI_HF_SUB_CODE_GOOD
#define MPI_HF_SUB_CODE_GOOD 0
#endif

#ifndef MPI_HF_SUB_CODE_BAD
#define MPI_HF_SUB_CODE_BAD 1
#endif

#ifndef NGEN_MPI_DATA_TAG
#define NGEN_MPI_DATA_TAG 100
#endif

#ifndef NGEN_MPI_PROTOCOL_TAG
#define NGEN_MPI_PROTOCOL_TAG 101
#endif
#include <cstdio>
#include <cstring>
#include <iostream>
#include <memory>
#include <string>
#include <vector>
#include <mpi.h>
#include <unistd.h>

#include "FileChecker.h"
#if NGEN_WITH_PYTHON
#include "python/HydrofabricSubsetter.hpp"
#endif
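
/**
 * Perform an "AND" synchronization of a boolean status across all ranks.
 *
 * Each rank contributes its local status; rank 0 combines them and broadcasts the global result, so every
 * rank returns true only if every rank reported a good/ready status.  When a task description is provided,
 * rank 0 prints a message for any rank that reports a bad status.
 *
 * @param status The local status of this rank.
 * @param mpi_rank The rank of this process.
 * @param mpi_num_procs The total number of MPI processes.
 * @param taskDesc A description of the task being synchronized on, used in printed messages; pass an empty
 *                 string to suppress messages.
 * @return Whether all ranks reported a good/ready status.
 */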
bool mpiSyncStatusAnd(bool status, int mpi_rank, int mpi_num_procs, const std::string &taskDesc) {
    unsigned short codeBuffer;
    bool printMessage = !taskDesc.empty();

    // Ranks other than 0 send their local status code to rank 0
    if (mpi_rank != 0) {
        codeBuffer = status ? MPI_HF_SUB_CODE_GOOD : MPI_HF_SUB_CODE_BAD;
        MPI_Send(&codeBuffer, 1, MPI_UNSIGNED_SHORT, 0, 0, MPI_COMM_WORLD);
    }
    // Rank 0 receives the codes from all other ranks and combines them into a single global status
    else {
        for (int i = 1; i < mpi_num_procs; ++i) {
            MPI_Recv(&codeBuffer, 1, MPI_UNSIGNED_SHORT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
            // Any "not good" code makes the global status false
            if (codeBuffer != MPI_HF_SUB_CODE_GOOD) {
                if (printMessage) {
                    std::cout << "Rank " << i << " not successful/ready after " << taskDesc << std::endl;
                }
                status = false;
            }
        }
        // Rank 0 then prepares the combined status code for broadcast
        codeBuffer = status ? MPI_HF_SUB_CODE_GOOD : MPI_HF_SUB_CODE_BAD;
    }

    // Broadcast the global status from rank 0 so every rank returns the same result
    MPI_Bcast(&codeBuffer, 1, MPI_UNSIGNED_SHORT, 0, MPI_COMM_WORLD);
    return codeBuffer == MPI_HF_SUB_CODE_GOOD;
}
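
/**
 * Convenience overload of mpiSyncStatusAnd that performs the synchronization without printing any messages.
 */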
bool mpiSyncStatusAnd(bool status, int mpi_rank, int mpi_num_procs) {
    return mpiSyncStatusAnd(status, mpi_rank, mpi_num_procs, "");
}
bool is_hydrofabric_subdivided(int mpi_rank, int mpi_num_procs, bool printMsg) {
    // Note: catchmentDataFile is expected to be declared before this header is included
    std::string name = catchmentDataFile + "." + std::to_string(mpi_rank);
    // The local check only confirms this rank's file exists and is readable, not that its contents are valid
    bool isGood = utils::FileChecker::file_is_readable(name);
    if (mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs)) {
        if (printMsg) {
            std::cout << "Process " << mpi_rank << ": Hydrofabric already subdivided in " << mpi_num_procs
                      << " files." << std::endl;
        }
        return true;
    }
    else {
        if (printMsg) {
            std::cout << "Process " << mpi_rank << ": Hydrofabric has not yet been subdivided." << std::endl;
        }
        return false;
    }
}
bool is_hydrofabric_subdivided(int mpi_rank, int mpi_num_procs) {
    return is_hydrofabric_subdivided(mpi_rank, mpi_num_procs, false);
}
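
/**
 * Populate an array assigning a host id to each rank, where ranks running on the same host (as determined by
 * comparing hostnames gathered at rank 0) receive the same id.  The completed array is broadcast so every
 * rank ends up with the full mapping.
 *
 * @param mpi_rank The rank of this process.
 * @param mpi_num_procs The total number of MPI processes.
 * @param host_array Output array of length mpi_num_procs receiving the host id for each rank.
 */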
void get_hosts_array(int mpi_rank, int mpi_num_procs, int *host_array) {
    const int ROOT_RANK = 0;
    // Lengths of the C-string hostname for each rank; -1 means "not yet known"
    std::vector<int> actualHostnameCStrLength(mpi_num_procs);
    for (int i = 0; i < mpi_num_procs; ++i) {
        actualHostnameCStrLength[i] = -1;
    }

    // Get this rank's hostname
    char myhostname[256];
    gethostname(myhostname, 256);

    // Record the length of this rank's hostname
    actualHostnameCStrLength[mpi_rank] = std::strlen(myhostname);

    // Gather all the hostname lengths at the root rank
    MPI_Gather(&actualHostnameCStrLength[mpi_rank], 1, MPI_INT, actualHostnameCStrLength.data(), 1, MPI_INT, ROOT_RANK,
               MPI_COMM_WORLD);

    // Per-rank start offsets/displacements in the gathered hostname string buffer
    std::vector<int> recvDisplacements(mpi_num_procs);
    int totalLength = 0;
    for (int i = 0; i < mpi_num_procs; ++i) {
        // Displace each rank's string by the accumulated length of the preceding rank strings
        recvDisplacements[i] = totalLength;
        // Add one byte to make room for the terminating '\0'
        actualHostnameCStrLength[i] += 1;
        totalLength += actualHostnameCStrLength[i];
    }

    // Gather the hostname strings themselves into a single buffer at the root rank
    std::vector<char> hostnames(totalLength);
    MPI_Gatherv(myhostname, actualHostnameCStrLength[mpi_rank], MPI_CHAR, hostnames.data(), actualHostnameCStrLength.data(),
                recvDisplacements.data(), MPI_CHAR, ROOT_RANK, MPI_COMM_WORLD);

    if (mpi_rank == ROOT_RANK) {
        // The next host id value to assign when a new, unseen hostname is encountered
        int next_host_id = 1;
        // Rank of an already-processed rank running on the same host, or -1 if there is none
        int rank_with_matching_hostname;
        char *checked_rank_hostname, *known_rank_hostname;

        for (int rank_being_check = 0; rank_being_check < mpi_num_procs; ++rank_being_check) {
            rank_with_matching_hostname = -1;
            checked_rank_hostname = &hostnames[recvDisplacements[rank_being_check]];

            // Compare against the hostnames of ranks that have already been assigned a host id
            for (int known_rank = 0; known_rank < rank_being_check; ++known_rank) {
                known_rank_hostname = &hostnames[recvDisplacements[known_rank]];
                if (std::strcmp(known_rank_hostname, checked_rank_hostname) == 0) {
                    rank_with_matching_hostname = known_rank;
                    break;
                }
            }
            // Unseen hostname: assign a new host id; otherwise reuse the matching rank's id
            if (rank_with_matching_hostname < 0) {
                host_array[rank_being_check] = next_host_id++;
            }
            else {
                host_array[rank_being_check] = host_array[rank_with_matching_hostname];
            }
        }
    }
    // Share the computed host ids with all ranks
    MPI_Bcast(host_array, mpi_num_procs, MPI_INT, 0, MPI_COMM_WORLD);
}
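
/**
 * Send the contents of a text file to another rank.
 *
 * The transfer follows a simple protocol: the sender transmits its buffer size, waits for the receiver to
 * echo it back, then sends the file line by line, each chunk preceded by a "continue" signal and followed by
 * the receiver's write result.  A final non-positive continue code ends the transfer.
 *
 * @return Whether the file was sent successfully.
 */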
bool mpi_send_text_file(const char *fileName, const int mpi_rank, const int destRank) {
    int bufSize = 4096;
    std::vector<char> buf(bufSize);
    int code;
    int totalNumTransferred = 0;

    FILE *file = fopen(fileName, "r");
    // If the file can't be opened, send an error code instead of the expected buffer size
    if (file == NULL) {
        code = -1;
        MPI_Send(&code, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
        return false;
    }
    // Send the buffer size and wait for the receiver to echo it back as confirmation it is ready
    MPI_Send(&bufSize, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
    MPI_Recv(&code, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    if (code != bufSize) {
        fclose(file);
        return false;
    }

    int continueCode = 1;
    // Read the file line by line, sending a "continue" signal followed by the buffered data for each chunk
    while (fgets(buf.data(), bufSize, file) != NULL) {
        MPI_Send(&continueCode, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
        MPI_Send(buf.data(), bufSize, MPI_CHAR, destRank, NGEN_MPI_DATA_TAG, MPI_COMM_WORLD);
        // The receiver replies with its write result; a negative value means it failed, so stop
        MPI_Recv(&code, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        if (code < 0) {
            fclose(file);
            return false;
        }
        totalNumTransferred++;
    }
    // Signal there is nothing left to send, then wait for the receiver's final, protocol-tagged status
    continueCode = -1;
    MPI_Send(&continueCode, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
    MPI_Recv(&code, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);

    fclose(file);
    return code == 0;
}
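
/**
 * Receive the contents of a text file sent by another rank via mpi_send_text_file, writing it to the given
 * local path.
 *
 * @return Whether the file was received and written successfully.
 */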
bool mpi_recv_text_file(const char *fileName, const int mpi_rank, const int srcRank) {
    int bufSize, writeCode;
    // Receive the sender's buffer size; a negative value means the sender could not open its file
    MPI_Recv(&bufSize, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
    if (bufSize < 0) { return false; }

    FILE *file = fopen(fileName, "w");
    // If the local file can't be opened, signal failure by sending back a negative size
    if (file == NULL) {
        bufSize = -1;
        MPI_Send(&bufSize, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
        return false;
    }
    // Otherwise, echo the buffer size back to confirm readiness for the transfer
    MPI_Send(&bufSize, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);

    int continueCode;
    int totalNumTransferred = 0;
    std::vector<char> buf(bufSize);

    while (true) {
        // Check whether the sender has more data; anything non-positive ends the transfer
        MPI_Recv(&continueCode, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        if (continueCode <= 0)
            break;
        // Receive the next chunk, write it locally, and report the write result back to the sender
        MPI_Recv(buf.data(), bufSize, MPI_CHAR, srcRank, NGEN_MPI_DATA_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        writeCode = fputs(buf.data(), file);
        MPI_Send(&writeCode, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
        if (writeCode < 0) {
            fclose(file);
            return false;
        }
        totalNumTransferred++;
    }
    fclose(file);
    // Send a final status of 0 to indicate the whole file was received and written successfully
    continueCode = 0;
    MPI_Send(&continueCode, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
    return true;
}
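
/**
 * Distribute rank-specific subdivided hydrofabric files from the sending rank to ranks on other hosts.
 *
 * Ranks that share a host with the sender can read the files directly, so only ranks on different hosts
 * receive copies.  Optionally blocks on a barrier and/or synchronizes the return status across all ranks.
 */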
bool distribute_subdivided_hydrofabric_files(const std::string &baseCatchmentFile, const std::string &baseNexusFile,
                                             const int sendingRank, const int mpi_rank, const int mpi_num_procs,
                                             const int *hostIdForRank, bool syncReturnStatus, bool blockAll)
{
    // Start with a good local status
    bool isGood = true;
    // Only ranks that are sending, or that sit on a different host than the sender, have work to do
    if (mpi_rank == sendingRank || hostIdForRank[mpi_rank] != hostIdForRank[sendingRank]) {
        // Handle sending
        if (mpi_rank == sendingRank) {
            for (int otherRank = 0; otherRank < mpi_num_procs; ++otherRank) {
                // Only send to ranks that are not on this rank's host
                if (hostIdForRank[otherRank] != hostIdForRank[mpi_rank]) {
                    std::string catFileToSend = baseCatchmentFile + "." + std::to_string(otherRank);
                    std::string nexFileToSend = baseNexusFile + "." + std::to_string(otherRank);
                    isGood = isGood && mpi_send_text_file(catFileToSend.c_str(), mpi_rank, otherRank);
                    isGood = isGood && mpi_send_text_file(nexFileToSend.c_str(), mpi_rank, otherRank);
                }
            }
        }
        // Handle receiving
        else {
            std::string catFileToReceive = baseCatchmentFile + "." + std::to_string(mpi_rank);
            std::string nexFileToReceive = baseNexusFile + "." + std::to_string(mpi_rank);
            isGood = mpi_recv_text_file(catFileToReceive.c_str(), mpi_rank, sendingRank);
            isGood = isGood && mpi_recv_text_file(nexFileToReceive.c_str(), mpi_rank, sendingRank);
        }
    }

    // Block until all ranks are done, when requested
    if (blockAll) { MPI_Barrier(MPI_COMM_WORLD); }

    // Either sync the status across ranks or just return the local status
    if (syncReturnStatus) {
        return mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, "distributing subdivided hydrofabric files");
    }
    else {
        return isGood;
    }
}
bool subdivide_hydrofabric(int mpi_rank, int mpi_num_procs, const std::string &catchmentDataFile,
                           const std::string &nexusDataFile, const std::string &partitionConfigFile)
{
    // Track whether this rank (and, after syncs, the full set of ranks) is in a good state to continue
    bool isGood = true;

    #if !NGEN_WITH_PYTHON
    // The subdivider is Python-based, so this cannot succeed without Python integration
    isGood = false;
    std::cerr << "Driver is unable to perform required hydrofabric subdividing when Python integration is not active." << std::endl;
    #endif // NGEN_WITH_PYTHON

    // Sync with the other ranks and bail if anything (including missing Python support) is not good
    if (!mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, "initializing hydrofabric subdivider")) {
        return false;
    }

    #if NGEN_WITH_PYTHON
    // Have rank 0 construct the subdivider that generates the per-partition files
    std::unique_ptr<utils::ngenPy::HydrofabricSubsetter> subdivider;
    if (mpi_rank == 0) {
        try {
            subdivider = std::make_unique<utils::ngenPy::HydrofabricSubsetter>(catchmentDataFile, nexusDataFile,
                                                                               partitionConfigFile);
        }
        catch (const std::exception &e) {
            std::cerr << e.what() << std::endl;
            isGood = false;
        }
    }
    // Sync again on whether the subdivider was initialized successfully
    if (!mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, "initializing hydrofabric subdivider")) {
        return false;
    }

    // Have rank 0 perform the actual subdivision
    if (mpi_rank == 0) {
        try {
            isGood = subdivider->execSubdivision();
        }
        catch (const std::exception &e) {
            std::cerr << e.what() << std::endl;
            isGood = false;
        }
    }
    // Sync once more on whether subdividing succeeded, with all ranks exiting here if it did not
    if (!mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, "executing hydrofabric subdivision")) {
        return false;
    }

    // Determine which ranks share a host, so files only need to be transferred between hosts
    std::vector<int> hostIdForRank(mpi_num_procs);
    get_hosts_array(mpi_rank, mpi_num_procs, hostIdForRank.data());

    // Finally, have rank 0 distribute the rank-specific files to ranks on other hosts
    return distribute_subdivided_hydrofabric_files(catchmentDataFile, nexusDataFile, 0, mpi_rank,
                                                   mpi_num_procs, hostIdForRank.data(), true, true);
    #else
    return false;
    #endif // NGEN_WITH_PYTHON
}

#endif // NGEN_PARALLEL_UTILS_H