/*
    thread_split.cpp
*/
#include <mpi.h>
#include <stdlib.h>
#include <string.h>
#include <strings.h> // strcasecmp
#include <stdio.h>
#include <vector>
#include <string>
#include <utility>
#include <assert.h>
#include <sys/time.h>
// Choose threading model:
enum { THR_OPENMP = 1, THR_POSIX = 2, THR_NONE = 0 } threading = THR_POSIX;
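// The default model (THR_POSIX here) can be overridden at run time via argv[1]: "openmp", "posix", or "none".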
template <typename T> MPI_Datatype get_mpi_type();
template <> MPI_Datatype get_mpi_type<char>() { return MPI_CHAR; }
template <> MPI_Datatype get_mpi_type<int>() { return MPI_INT; }
template <> MPI_Datatype get_mpi_type<float>() { return MPI_FLOAT; }
template <> MPI_Datatype get_mpi_type<double>() { return MPI_DOUBLE; }
int main_threaded(int argc, char **argv);
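// The benchmark kernel is split into three phases that run on every iteration:
// work_portion_1 fills the input buffer, work_portion_2 zeroes the output buffer,
// and work_portion_3 verifies the MPI_Allreduce result against the expected sum.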
template <typename T>
bool work_portion_2(T *in, size_t count, size_t niter, int rank, int nranks)
{
memset(in, 0, sizeof(T) * count);
return true;
}
template <typename T>
bool work_portion_1(T *in, size_t count, size_t niter, int rank, int nranks);
template <> bool work_portion_1<char>(char *in, size_t count, size_t niter, int rank, int nranks) { return true; }
template <> bool work_portion_1<int>(int *in, size_t count, size_t niter, int rank, int nranks)
{
for (size_t i = 0; i < count; i++) {
in[i] = (int)(niter * (rank+1) * i);
}
return true;
}
template <typename T>
bool work_portion_3(T *in, size_t count, size_t niter, int rank, int nranks);
template <> bool work_portion_3<char>(char *out, size_t count, size_t niter, int rank, int nranks) { return true; }
template <> bool work_portion_3<int>(int *out, size_t count, size_t niter, int rank, int nranks)
{
bool result = true;
for (size_t i = 0; i < count; i++) {
result = (result && (out[i] == (int)(niter * nranks*(nranks+1)*i/2)));
}
return result;
}
int main_threaded_openmp(int argc, char **argv);
int main_threaded_posix(int argc, char **argv);
int main(int argc, char **argv)
{
if (argc > 1) {
if (!strcasecmp(argv[1], "openmp")) threading = THR_OPENMP;
if (!strcasecmp(argv[1], "posix")) threading = THR_POSIX;
if (!strcasecmp(argv[1], "none")) threading = THR_NONE;
}
if (threading == THR_OPENMP) {
main_threaded_openmp(argc, argv);
return 0;
} else if (threading == THR_POSIX) {
main_threaded_posix(argc, argv);
return 0;
}
printf("No threading\n");
int rank, nranks;
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nranks);
typedef int type;
size_t count = 1024*1024;
int niter = 100;
type *in = (type *)malloc(count * sizeof(type));
type *out = (type *)malloc(count * sizeof(type));
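// Each iteration: fill the input, clear the output, reduce across all ranks, then verify the sum.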
for (int j = 1; j < niter+1; j++) {
work_portion_1<type>(in, count, j, rank, nranks);
work_portion_2<type>(out, count, j, rank, nranks);
MPI_Allreduce(in, out, count, get_mpi_type<type>(), MPI_SUM, MPI_COMM_WORLD);
assert(work_portion_3<type>(out, count, j, rank, nranks));
MPI_Barrier(MPI_COMM_WORLD);
}
MPI_Finalize();
return 0;
}
#include <omp.h>
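// Thread-aware barrier: only thread 0 of each rank enters MPI_Barrier, while the
// surrounding OpenMP barriers keep the remaining threads synchronized with it.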
void omp_aware_barrier(MPI_Comm &comm, int thread)
{
assert(thread != 0 || comm != MPI_COMM_NULL);
#pragma omp barrier
if (thread == 0)
MPI_Barrier(comm);
#pragma omp barrier
}
struct offset_and_count { size_t offset; size_t count; };
int main_threaded_openmp(int argc, char **argv)
{
printf("OpenMP\n");
int rank, nranks, provided = 0;
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
assert(provided == MPI_THREAD_MULTIPLE);
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
MPI_Comm_size(MPI_COMM_WORLD, &nranks);
typedef int type;
size_t count = 1024 * 1024;
int niter = 100;
type *in = (type *)malloc(count * sizeof(type));
type *out = (type *)malloc(count * sizeof(type));
// Divide workload for multiple threads.
// Save (offset, count) pair for each piece
size_t nthreads = 8;
if (argc > 2) {
nthreads = atoi(argv[2]);
}
size_t nparts = (count > nthreads) ? nthreads : count;
// Use nparts, it might be less than nthreads
size_t base = count / nparts;
size_t rest = count % nparts;
size_t base_off = 0;
std::vector<offset_and_count> offs_and_counts(nparts);
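// Split 'count' elements into nparts contiguous pieces; the first 'rest' pieces get one extra element.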
for (size_t i = 0; i < nparts; i++) {
offs_and_counts[i].offset = base_off; // off
base_off += (offs_and_counts[i].count = base + (i<rest?1:0)); // size
}
// Duplicate a communicator for each thread
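// (a separate communicator per thread avoids concurrent collective calls on the same
// communicator, which the MPI standard does not allow)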
std::vector<MPI_Comm> comms(nparts, MPI_COMM_NULL);
for (size_t i = 0; i < nparts; i++) {
MPI_Comm &new_comm = comms[i];
MPI_Comm_dup(MPI_COMM_WORLD, &new_comm);
}
// Go into parallel region, use precalculated (offset, count) pairs to separate workload
// use separated communicators from comms[]
// use omp_aware_barrier instead of normal MPI_COMM_WORLD barrier
#pragma omp parallel num_threads(nparts)
{
int thread = omp_get_thread_num();
offset_and_count &offs = offs_and_counts[thread];
MPI_Comm &comm = comms[thread];
for (int j = 1; j < niter+1; j++) {
if (!offs.count) { omp_aware_barrier(comm, thread); continue; }
work_portion_1<type>(in + offs.offset, offs.count, j, rank, nranks);
work_portion_2<type>(out + offs.offset, offs.count, j, rank, nranks);
MPI_Allreduce(in + offs.offset, out + offs.offset, offs.count, get_mpi_type<type>(), MPI_SUM, comm);
assert(work_portion_3<type>(out + offs.offset, offs.count, j, rank, nranks));
omp_aware_barrier(comm, thread);
}
}
MPI_Finalize();
return 0;
}
#include <pthread.h>
#include <sys/time.h>
#include <sched.h>
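// Same idea as omp_aware_barrier: only thread 0 calls MPI_Barrier, and the pthread
// barrier keeps the other threads of this rank in step with it.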
void pthreads_aware_barrier(MPI_Comm &comm, pthread_barrier_t &barrier, int thread)
{
assert(thread != 0 || comm != MPI_COMM_NULL);
pthread_barrier_wait(&barrier);
if (thread == 0)
MPI_Barrier(comm);
pthread_barrier_wait(&barrier);
}
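// Per-rank state shared by all worker threads (buffers, sizes, iteration count, pthread barrier)
// versus per-thread state (offset/count of the workload slice, thread id, dedicated communicator).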
struct global_data {
typedef int type;
type *in, *out;
int niter;
size_t count;
int rank, nranks;
pthread_barrier_t barrier;
};
struct thread_local_data {
size_t offset;
size_t count;
int thread_id;
MPI_Comm *comm;
global_data *global;
};
void *worker(void *arg_ptr)
{
thread_local_data &thr_local = *((thread_local_data *)arg_ptr);
global_data &global = *(thr_local.global);
global_data::type *in = global.in;
global_data::type *out = global.out;
int &niter = global.niter;
int &rank = global.rank;
int &nranks = global.nranks;
pthread_barrier_t &barrier = global.barrier;
size_t &offset = thr_local.offset;
size_t &count = thr_local.count;
int &thread = thr_local.thread_id;
MPI_Comm &comm = *(thr_local.comm);
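// Pin this worker to the CPU core matching its thread index (best effort; a failure is only reported).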
cpu_set_t mask;
CPU_ZERO(&mask);
CPU_SET(thread, &mask);
int res = sched_setaffinity(0, sizeof(mask), &mask);
if (res == -1)
printf("failed set_thread_affinity()\n");
for (int j = 1; j < global.niter+1; j++) {
if (!thr_local.count) { pthreads_aware_barrier(comm, barrier, thread); continue; }
work_portion_1<global_data::type>(in + offset, count, j, rank, nranks);
work_portion_2<global_data::type>(out + offset, count, j, rank, nranks);
MPI_Allreduce(in + offset, out + offset, count, get_mpi_type<global_data::type>(), MPI_SUM, comm);
assert(work_portion_3<global_data::type>(out + offset, count, j, rank, nranks));
pthreads_aware_barrier(comm, barrier, thread);
}
return NULL;
}
int main_threaded_posix(int argc, char **argv)
{
printf("POSIX\n");
int provided = 0;
global_data global;
MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &provided);
assert(provided == MPI_THREAD_MULTIPLE);
MPI_Comm_rank(MPI_COMM_WORLD, &global.rank);
MPI_Comm_size(MPI_COMM_WORLD, &global.nranks);
global.count = 1024 * 1024;
global.niter = 100;
global.in = (global_data::type *)malloc(global.count * sizeof(global_data::type));
global.out = (global_data::type *)malloc(global.count * sizeof(global_data::type));
// Divide workload for multiple threads.
// Save (offset, count) pair for each piece
size_t nthreads = 8;
if (argc > 2) {
nthreads = atoi(argv[2]);
}
size_t nparts = ((global.count > nthreads) ? nthreads : global.count);
pthread_barrier_init(&global.barrier, NULL, nparts);
// Use nparts, it might be less than nthreads
size_t base = global.count / nparts;
size_t rest = global.count % nparts;
size_t base_off = 0;
std::vector<thread_local_data> thr_local(nparts);
for (size_t i = 0; i < nparts; i++) {
thr_local[i].offset = base_off; // off
base_off += (thr_local[i].count = base + (i<rest?1:0)); // size
thr_local[i].thread_id = i;
}
// Duplicate a communicator for each thread
std::vector<MPI_Comm> comms(nparts);
MPI_Info info;
MPI_Info_create(&info);
char s[16];
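// Attach a "thread_id" info hint to each duplicated communicator. With the Intel MPI Library
// thread-split model this associates the communicator with a particular thread; implementations
// that do not recognize the hint simply ignore it.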
for (size_t i = 0; i < nparts; i++) {
MPI_Comm &new_comm = comms[i];
MPI_Comm_dup(MPI_COMM_WORLD, &new_comm);
snprintf(s, sizeof s, "%zu", i);
MPI_Info_set(info, "thread_id", s);
MPI_Comm_set_info(new_comm, info);
thr_local[i].comm = &new_comm;
thr_local[i].global = &global;
}
// Start parallel POSIX threads
std::vector<pthread_t> pids(nparts);
for (size_t i = 0; i < nparts; i++) {
pthread_create(&pids[i], NULL, worker, (void *)&thr_local[i]);
}
// Wait for all POSIX threads to complete
for (size_t i = 0; i < nparts; i++) {
pthread_join(pids[i], NULL);
}
MPI_Info_free(&info);
MPI_Finalize();
return 0;
}
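
Build and run notes (an assumed example, not part of the original listing): the first command-line argument selects the threading model ("openmp", "posix", or "none") and the second sets the number of worker threads. With a GCC-based MPI toolchain the sample can typically be built and launched along these lines; the exact wrapper name and flags depend on your MPI distribution:

mpicxx -fopenmp -pthread thread_split.cpp -o thread_split
mpirun -n 4 ./thread_split posix 8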

See Also

Code Change Guide
