1#ifndef NGEN_PARALLEL_UTILS_H 
    2#define NGEN_PARALLEL_UTILS_H 
    7#ifndef MPI_HF_SUB_CODE_GOOD 
    8#define MPI_HF_SUB_CODE_GOOD 0 
   11#ifndef MPI_HF_SUB_CODE_BAD 
   12#define MPI_HF_SUB_CODE_BAD 1 
   15#ifndef NGEN_MPI_DATA_TAG 
   16#define NGEN_MPI_DATA_TAG 100 
   19#ifndef NGEN_MPI_PROTOCOL_TAG 
   20#define NGEN_MPI_PROTOCOL_TAG 101 
   29#include "python/HydrofabricSubsetter.hpp" 
   53    bool mpiSyncStatusAnd(
bool status, 
int mpi_rank, 
int mpi_num_procs, 
const std::string &taskDesc) {
 
   57        unsigned short codeBuffer;
 
   58        bool printMessage = !taskDesc.empty();
 
   61            codeBuffer = status ? MPI_HF_SUB_CODE_GOOD : MPI_HF_SUB_CODE_BAD;
 
   62            MPI_Send(&codeBuffer, 1, MPI_UNSIGNED_SHORT, 0, 0, MPI_COMM_WORLD);
 
   66            for (
int i = 1; i < mpi_num_procs; ++i) {
 
   67                MPI_Recv(&codeBuffer, 1, MPI_UNSIGNED_SHORT, i, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
 
   69                if (codeBuffer != MPI_HF_SUB_CODE_GOOD) {
 
   71                        std::cout << 
"Rank " << i << 
" not successful/ready after " << taskDesc << std::endl;
 
   77            codeBuffer = status ? MPI_HF_SUB_CODE_GOOD : MPI_HF_SUB_CODE_BAD;
 
   81        MPI_Bcast(&codeBuffer, 1, MPI_UNSIGNED_SHORT, 0, MPI_COMM_WORLD);
 
   82        return codeBuffer == MPI_HF_SUB_CODE_GOOD;
 
   94    bool mpiSyncStatusAnd(
bool status, 
int mpi_rank, 
int mpi_num_procs) {
 
   95        return mpiSyncStatusAnd(status, mpi_rank, mpi_num_procs, 
"");
 
  116    bool is_hydrofabric_subdivided(
int mpi_rank, 
int mpi_num_procs, 
bool printMsg) {
 
  117        std::string name = catchmentDataFile + 
"." + std::to_string(mpi_rank);
 
  122        if (mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs)) {
 
  123            if (printMsg) { std::cout << 
"Process " << mpi_rank << 
": Hydrofabric already subdivided in " << mpi_num_procs << 
" files." << std::endl; }
 
  127            if (printMsg) { std::cout << 
"Process " << mpi_rank << 
": Hydrofabric has not yet been subdivided." << std::endl; }
 
  141    bool is_hydrofabric_subdivided(
int mpi_rank, 
int mpi_num_procs) {
 
  142        return is_hydrofabric_subdivided(mpi_rank, mpi_num_procs, 
false);
 
  160    void get_hosts_array(
int mpi_rank, 
int mpi_num_procs, 
int *host_array) {
 
  161        const int ROOT_RANK = 0;
 
  163        std::vector<int> actualHostnameCStrLength(mpi_num_procs);
 
  165        for (
int i = 0; i < mpi_num_procs; ++i) {
 
  166            actualHostnameCStrLength[i] = -1;
 
  170        char myhostname[256];
 
  171        gethostname(myhostname, 256);
 
  174        actualHostnameCStrLength[mpi_rank] = std::strlen(myhostname);
 
  177        MPI_Gather(&actualHostnameCStrLength[mpi_rank], 1, MPI_INT, actualHostnameCStrLength.data(), 1, MPI_INT, ROOT_RANK,
 
  180        std::vector<int> recvDisplacements(mpi_num_procs);
 
  182        for (
int i = 0; i < mpi_num_procs; ++i) {
 
  184            recvDisplacements[i] = totalLength;
 
  186            actualHostnameCStrLength[i] += 1;
 
  188            totalLength += actualHostnameCStrLength[i];
 
  192        char hostnames[totalLength];
 
  193        MPI_Gatherv(myhostname, actualHostnameCStrLength[mpi_rank], MPI_CHAR, hostnames, actualHostnameCStrLength.data(),
 
  194                    recvDisplacements.data(), MPI_CHAR, ROOT_RANK, MPI_COMM_WORLD);
 
  196        if (mpi_rank == ROOT_RANK) {
 
  198            int next_host_id = 1;
 
  200            int rank_with_matching_hostname;
 
  201            char *checked_rank_hostname, *known_rank_hostname;
 
  203            for (
int rank_being_check = 0; rank_being_check < mpi_num_procs; ++rank_being_check) {
 
  205                rank_with_matching_hostname = -1;
 
  207                checked_rank_hostname = &hostnames[recvDisplacements[rank_being_check]];
 
  210                for (
int known_rank = 0; known_rank < rank_being_check; ++known_rank) {
 
  212                    known_rank_hostname = &hostnames[recvDisplacements[known_rank]];
 
  214                    if (std::strcmp(known_rank_hostname, checked_rank_hostname) == 0) {
 
  215                        rank_with_matching_hostname = known_rank;
 
  220                if (rank_with_matching_hostname < 0) {
 
  222                    host_array[rank_being_check] = next_host_id++;
 
  225                    host_array[rank_being_check] = host_array[rank_with_matching_hostname];
 
  230        MPI_Bcast(host_array, mpi_num_procs, MPI_INT, 0, MPI_COMM_WORLD);
 
  243    bool mpi_send_text_file(
const char *fileName, 
const int mpi_rank, 
const int destRank) {
 
  245        std::vector<char> buf(bufSize);
 
  248        int totalNumTransferred = 0;
 
  250        FILE *file = fopen(fileName, 
"r");
 
  256            MPI_Send(&code, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
 
  261        MPI_Send(&bufSize, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
 
  264        MPI_Recv(&code, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
 
  265        if (code != bufSize) {
 
  270        int continueCode = 1;
 
  272        while (fgets(buf.data(), bufSize, file) != NULL) {
 
  274            MPI_Send(&continueCode, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
 
  277            MPI_Send(buf.data(), bufSize, MPI_CHAR, destRank, NGEN_MPI_DATA_TAG, MPI_COMM_WORLD);
 
  280            MPI_Recv(&code, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
 
  289        MPI_Send(&continueCode, 1, MPI_INT, destRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
 
  291        MPI_Recv(&code, 1, MPI_INT, destRank, 0, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
 
  308    bool mpi_recv_text_file(
const char *fileName, 
const int mpi_rank, 
const int srcRank) {
 
  309        int bufSize, writeCode;
 
  311        MPI_Recv(&bufSize, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
 
  320        FILE *file = fopen(fileName, 
"w");
 
  325            MPI_Send(&bufSize, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
 
  330        MPI_Send(&bufSize, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
 
  333        int totalNumTransferred = 0;
 
  334        std::vector<char> buf(bufSize);
 
  340            MPI_Recv(&continueCode, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
 
  341            if (continueCode <= 0)
 
  344            MPI_Recv(buf.data(), bufSize, MPI_CHAR, srcRank, NGEN_MPI_DATA_TAG, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
 
  346            writeCode = fputs(buf.data(), file);
 
  347            MPI_Send(&writeCode, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
 
  356        MPI_Send(&continueCode, 1, MPI_INT, srcRank, NGEN_MPI_PROTOCOL_TAG, MPI_COMM_WORLD);
 
  410    bool distribute_subdivided_hydrofabric_files(
const std::string &baseCatchmentFile, 
const std::string &baseNexusFile,
 
  411                                                 const int sendingRank, 
const int mpi_rank, 
const int mpi_num_procs,
 
  412                                                 const int *hostIdForRank, 
bool syncReturnStatus, 
bool blockAll)
 
  418        if (mpi_rank == sendingRank || hostIdForRank[mpi_rank] != hostIdForRank[sendingRank]) {
 
  420            if (mpi_rank == sendingRank) {
 
  422                for (
int otherRank = 0; otherRank < mpi_num_procs; ++otherRank) {
 
  424                    if (hostIdForRank[otherRank] != hostIdForRank[mpi_rank]) {
 
  426                        std::string catFileToSend = baseCatchmentFile + 
"." + std::to_string(otherRank);
 
  427                        std::string nexFileToSend = baseNexusFile + 
"." + std::to_string(otherRank);
 
  429                        isGood = isGood && mpi_send_text_file(catFileToSend.c_str(), mpi_rank, otherRank);
 
  430                        isGood = isGood && mpi_send_text_file(nexFileToSend.c_str(), mpi_rank, otherRank);
 
  436                std::string catFileToReceive = baseCatchmentFile + 
"." + std::to_string(mpi_rank);
 
  437                std::string nexFileToReceive = baseNexusFile + 
"." + std::to_string(mpi_rank);
 
  439                isGood = mpi_recv_text_file(catFileToReceive.c_str(), mpi_rank, sendingRank);
 
  440                isGood = isGood && mpi_recv_text_file(nexFileToReceive.c_str(), mpi_rank, sendingRank);
 
  445        if (blockAll) { MPI_Barrier(MPI_COMM_WORLD); }
 
  448        if (syncReturnStatus) {
 
  449            return mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, 
"distributing subdivided hydrofabric files");
 
  472    bool subdivide_hydrofabric(
int mpi_rank, 
int mpi_num_procs, 
const std::string &catchmentDataFile,
 
  473                               const std::string &nexusDataFile, 
const std::string &partitionConfigFile)
 
  479        #if !NGEN_WITH_PYTHON 
  482        std::cerr << 
"Driver is unable to perform required hydrofabric subdividing when Python integration is not active." << std::endl;
 
  486        if (!mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, 
"initializing hydrofabric subdivider")) {
 
  491        std::unique_ptr<utils::ngenPy::HydrofabricSubsetter> subdivider;
 
  495                subdivider = std::make_unique<utils::ngenPy::HydrofabricSubsetter>(catchmentDataFile, nexusDataFile,
 
  496                                                                                   partitionConfigFile);
 
  498            catch (
const std::exception &e) {
 
  499                std::cerr << e.what() << std::endl;
 
  505        if (!mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, 
"initializing hydrofabric subdivider")) {
 
  512                isGood = subdivider->execSubdivision();
 
  514            catch (
const std::exception &e) {
 
  515                std::cerr << e.what() << std::endl;
 
  521        if (!mpiSyncStatusAnd(isGood, mpi_rank, mpi_num_procs, 
"executing hydrofabric subdivision")) {
 
  527            std::vector<int> hostIdForRank(mpi_num_procs);
 
  528            get_hosts_array(mpi_rank, mpi_num_procs, hostIdForRank.data());
 
  531            return distribute_subdivided_hydrofabric_files(catchmentDataFile, nexusDataFile, 0, mpi_rank,
 
  532                                                           mpi_num_procs, hostIdForRank.data(), 
true, 
true);
 
static bool file_is_readable(std::string path)
Check whether the path provided points to an existing, readable file.
Definition FileChecker.h:90