• 2019 Update 7
  • 03/31/2020
Contents

thread_split.cpp

Intel® MPI Library Developer Guide for Linux* OS
*/ #include <mpi.h> #include <stdlib.h> #include <string.h> #include <stdio.h> #include <vector> #include <string> #include <utility> #include <assert.h> #include <sys/time.h> // Choose threading model: enum { THR_OPENMP = 1, THR_POSIX = 2, THR_NONE = 0 } threading = THR_POSIX; template <typename T> MPI_Datatype get_mpi_type(); template <> MPI_Datatype get_mpi_type<char>() { return MPI_CHAR; } template <> MPI_Datatype get_mpi_type<int>() { return MPI_INT; } template <> MPI_Datatype get_mpi_type<float>() { return MPI_FLOAT; } template <> MPI_Datatype get_mpi_type<double>() { return MPI_DOUBLE; } int main_threaded(int argc, char **argv); template <typename T> bool work_portion_2(T *in, size_t count, size_t niter, int rank, int nranks) { memset(in, 0, sizeof(T) * count); } template <typename T> bool work_portion_1(T *in, size_t count, size_t niter, int rank, int nranks); template <> bool work_portion_1<char>(char *in, size_t count, size_t niter, int rank, int nranks) { return true; } template <> bool work_portion_1<int>(int *in, size_t count, size_t niter, int rank, int nranks) { for (size_t i = 0; i < count; i++) { in[i] = (int)(niter * (rank+1) * i); } return true; } template <typename T> bool work_portion_3(T *in, size_t count, size_t niter, int rank, int nranks); template <> bool work_portion_3<char>(char *out, size_t count, size_t niter, int rank, int nranks) { return true; } template <> bool work_portion_3<int>(int *out, size_t count, size_t niter, int rank, int nranks) { bool result = true; for (size_t i = 0; i < count; i++) { result = (result && (out[i] == (int)(niter * nranks*(nranks+1)*i/2))); } return result; } int main_threaded_openmp(int argc, char **argv); int main_threaded_posix(int argc, char **argv); int main(int argc, char **argv) { if (argc > 1) { if (!strcasecmp(argv[1], "openmp")) threading = THR_OPENMP; if (!strcasecmp(argv[1], "posix")) threading = THR_POSIX; if (!strcasecmp(argv[1], "none")) threading = THR_NONE; } if (threading == THR_OPENMP) { main_threaded_openmp(argc, argv); return 0; } else if (threading == THR_POSIX) { main_threaded_posix(argc, argv); return 0; } printf("No threading\n"); int rank, nranks; MPI_Init(&argc, &argv); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &nranks); typedef int type; size_t count = 1024*1024; int niter = 100; type *in = (type *)malloc(count * sizeof(type)); type *out = (type *)malloc(count * sizeof(type)); for (int j = 1; j < niter+1; j++) { work_portion_1<type>(in, count, j, rank, nranks); work_portion_2<type>(out, count, j, rank, nranks); MPI_Allreduce(in, out, count, get_mpi_type<type>(), MPI_SUM, MPI_COMM_WORLD); assert(work_portion_3<type>(out, count, j, rank, nranks)); MPI_Barrier(MPI_COMM_WORLD); } MPI_Finalize(); return 0; } #include <omp.h> void omp_aware_barrier(MPI_Comm &comm, int thread) { assert(thread != 0 || comm != MPI_COMM_NULL); #pragma omp barrier if (thread == 0) MPI_Barrier(comm); #pragma omp barrier } struct offset_and_count { size_t offset; size_t count; }; int main_threaded_openmp(int argc, char **argv) { printf("OpenMP\n"); int rank, nranks, provided = 0; MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); assert(provided == MPI_THREAD_MULTIPLE); MPI_Comm_rank(MPI_COMM_WORLD, &rank); MPI_Comm_size(MPI_COMM_WORLD, &nranks); typedef int type; size_t count = 1024 * 1024; int niter = 100; type *in = (type *)malloc(count * sizeof(type)); type *out = (type *)malloc(count * sizeof(type)); // Divide workload for multiple threads. // Save (offset, count) pair for each piece size_t nthreads = 8; if (argc > 2) { nthreads = atoi(argv[2]); } size_t nparts = (count > nthreads) ? nthreads : count; // Use nparts, it might be less than nthreads size_t base = count / nparts; size_t rest = count % nparts; size_t base_off = 0; std::vector<offset_and_count> offs_and_counts(nparts); for (size_t i = 0; i < nparts; i++) { offs_and_counts[i].offset = base_off; // off base_off += (offs_and_counts[i].count = base + (i<rest?1:0)); // size } // Duplicate a communicator for each thread std::vector<MPI_Comm> comms(nparts, MPI_COMM_NULL); for (size_t i = 0; i < nparts; i++) { MPI_Comm &new_comm = comms[i]; MPI_Comm_dup(MPI_COMM_WORLD, &new_comm); } // Go into parallel region, use precalculated (offset, count) pairs to separate workload // use separated communicators from comms[] // use omp_aware_barrier instead of normal MPI_COMM_WORLD barrier #pragma omp parallel num_threads(nparts) { int thread = omp_get_thread_num(); offset_and_count &offs = offs_and_counts[thread]; MPI_Comm &comm = comms[thread]; for (int j = 1; j < niter+1; j++) { if (!offs.count) { omp_aware_barrier(comm, thread); continue; } work_portion_1<type>(in + offs.offset, offs.count, j, rank, nranks); work_portion_2<type>(out + offs.offset, offs.count, j, rank, nranks); MPI_Allreduce(in + offs.offset, out + offs.offset, offs.count, get_mpi_type<type>(), MPI_SUM, comm); assert(work_portion_3<type>(out + offs.offset, offs.count, j, rank, nranks)); omp_aware_barrier(comm, thread); } } MPI_Finalize(); return 0; } #include <pthread.h> #include <sys/time.h> #include <sched.h> void pthreads_aware_barrier(MPI_Comm &comm, pthread_barrier_t &barrier, int thread) { assert(thread != 0 || comm != MPI_COMM_NULL); pthread_barrier_wait(&barrier); if (thread == 0) MPI_Barrier(comm); pthread_barrier_wait(&barrier); } struct global_data { typedef int type; type *in, *out; int niter; size_t count; int rank, nranks; pthread_barrier_t barrier; }; struct thread_local_data { size_t offset; size_t count; int thread_id; MPI_Comm *comm; global_data *global; }; void *worker(void *arg_ptr) { thread_local_data &thr_local = *((thread_local_data *)arg_ptr); global_data &global = *(thr_local.global); global_data::type *in = global.in; global_data::type *out = global.out; int &niter = global.niter; int &rank = global.rank; int &nranks = global.nranks; pthread_barrier_t &barrier = global.barrier; size_t &offset = thr_local.offset; size_t &count = thr_local.count; int &thread = thr_local.thread_id; MPI_Comm &comm = *(thr_local.comm); cpu_set_t mask; CPU_ZERO(&mask); CPU_SET(thread, &mask); int res = sched_setaffinity(0, sizeof(mask), &mask); if (res == -1) printf("failed set_thread_affinity()\n"); for (int j = 1; j < global.niter+1; j++) { if (!thr_local.count) { pthreads_aware_barrier(comm, barrier, thread); continue; } work_portion_1<global_data::type>(in + offset, count, j, rank, nranks); work_portion_2<global_data::type>(out + offset, count, j, rank, nranks); MPI_Allreduce(in + offset, out + offset, count, get_mpi_type<global_data::type>(), MPI_SUM, comm); assert(work_portion_3<global_data::type>(out + offset, count, j, rank, nranks)); pthreads_aware_barrier(comm, barrier, thread); } } int main_threaded_posix(int argc, char **argv) { printf("POSIX\n"); int provided = 0; global_data global; MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided); assert(provided == MPI_THREAD_MULTIPLE); MPI_Comm_rank(MPI_COMM_WORLD, &global.rank); MPI_Comm_size(MPI_COMM_WORLD, &global.nranks); global.count = 1024 * 1024; global.niter = 100; global.in = (global_data::type *)malloc(global.count * sizeof(global_data::type)); global.out = (global_data::type *)malloc(global.count * sizeof(global_data::type)); // Divide workload for multiple threads. // Save (offset, count) pair for each piece size_t nthreads = 8; if (argc > 2) { nthreads = atoi(argv[2]); } size_t nparts = ((global.count > nthreads) ? nthreads : global.count); pthread_barrier_init(&global.barrier, NULL, nparts); // Use nparts, it might be less than nthreads size_t base = global.count / nparts; size_t rest = global.count % nparts; size_t base_off = 0; std::vector<thread_local_data> thr_local(nparts); for (size_t i = 0; i < nparts; i++) { thr_local[i].offset = base_off; // off base_off += (thr_local[i].count = base + (i<rest?1:0)); // size thr_local[i].thread_id = i; } // Duplicate a communicator for each thread std::vector<MPI_Comm> comms(nparts); MPI_Info info; MPI_Info_create(&info); char s[16]; for (size_t i = 0; i < nparts; i++) { MPI_Comm &new_comm = comms[i]; MPI_Comm_dup(MPI_COMM_WORLD, &new_comm); snprintf(s, sizeof s, "%d", i); MPI_Info_set(info, "thread_id", s); MPI_Comm_set_info(new_comm, info); thr_local[i].comm = &new_comm; thr_local[i].global = &global; } // Start parallel POSIX threads std::vector<pthread_t> pids(nparts); for (size_t i = 0; i < nparts; i++) { pthread_create(&pids[i], NULL, worker, (void *)&thr_local[i]); } // Wait for all POSIX threads to complete for (size_t i = 0; i < nparts; i++) { pthread_join(pids[i], NULL); } MPI_Info_free(&info); MPI_Finalize(); return 0; }

Product and Performance Information

1

Intel's compilers may or may not optimize to the same degree for non-Intel microprocessors for optimizations that are not unique to Intel microprocessors. These optimizations include SSE2, SSE3, and SSSE3 instruction sets and other optimizations. Intel does not guarantee the availability, functionality, or effectiveness of any optimization on microprocessors not manufactured by Intel. Microprocessor-dependent optimizations in this product are intended for use with Intel microprocessors. Certain optimizations not specific to Intel microarchitecture are reserved for Intel microprocessors. Please refer to the applicable product User and Reference Guides for more information regarding the specific instruction sets covered by this notice.

Notice revision #20110804