# 案例研究： 面向神经细胞模拟优化代码

#### 包括

``````#include <cstring>
#include <cstdlib>
#include <ctime>
#include <cmath>
#include <getopt.h>
#include "util.hpp"``````

``````#include <cstring>
#include <cstdlib>
#include <ctime>
#include <cmath>
#include <omp.h>
#include <getopt.h>
#include "util.hpp"
#include <malloc.h>     //Useless
#include <mkl.h>        //Useless
#include <cilk/cilk.h> //Useless``````

#### RandomFloatPos

``````static float RandomFloatPos() {
// returns a random number between a given minimum and maximum
float random = ((float) rand()) / (float) RAND_MAX;
float a = 0;
float r = random;
return a + r;
}``````

#### 优化后的代码

``````static void RandomFloatPos(float input[3],unsigned sedin) {
// returns a random number between a given minimum and maximum
__assume_aligned((float*)input, 64);
unsigned int seed=sedin,i;
for(i=0;i<3;++i){
input[i]=(((float)rand_r(&seed))/(float)(RAND_MAX))-0.5;
}
}``````

#### 优化说明

for 循环生成 3D 坐标并将其保存在输入中。

#### getNorm

``````static float getNorm(float* currArray) {
// computes L2 norm of input array
int c;
float arraySum=0;
for (c=0; c<3; c++) {
arraySum += currArray[c]*currArray[c];
}
float res = sqrt(arraySum);
return res;
}``````

``````static float getNorm(float* currArray) {
// computes L2 norm of input array
float arraySum=0;
for (int c=0; c<3; ++c) {
arraySum += pow(currArray[c],2);
}
return sqrt(arraySum);
}``````

#### 优化说明

• 移除 res 变量。
• 添加 pow() 函数。
后续开展的广泛研究说明，pow() 并非显著改进。 另外，鉴于英特尔® 至强融核™ 协处理器能够在每个循环执行多次浮点运算，原始 currArray[c]*currArray[c] 在那些案例中的速度更快。 英特尔编译器对 pow() 函数进行了矢量化处理，但因此添加了更多代码，因此降低了运行时速度。

#### getL2Distance

``````static float getL2Distance(float pos1x, float pos1y, float pos1z, float
pos2x, float pos2y, float pos2z) {
// returns distance (L2 norm) between two positions in 3D
float distArray[3];
distArray[0] = pos2x-pos1x;
distArray[1] = pos2y-pos1y;
distArray[2] = pos2z-pos1z;
float l2Norm = getNorm(distArray);
return l2Norm;
}``````

``````static float getL2Distance(float* pos1, float* pos2) {
// returns distance (L2 norm) between two positions in 3D
float distArray[3] __attribute__((aligned(64)));
distArray[0] = pos2[0]-pos1[0];
distArray[1] = pos2[1]-pos1[1];
distArray[2] = pos2[2]-pos1[2];
return getNorm(distArray);
}``````

#### produceSubstances

``````static void produceSubstances(float**** Conc, float** posAll, int* typesAll, int L, int n){

produceSubstances_sw.reset();
// increases the concentration of substances at the location of the     cells
float sideLength = 1/(float)L; // length of a side of a diffusion voxel
int c, i1, i2, i3;
for (c=0; c< n; c++) {
i1 = std::min((int)floor(posAll[c][0]/sideLength),(L-1));
i2 = std::min((int)floor(posAll[c][1]/sideLength),(L-1));
i3 = std::min((int)floor(posAll[c][2]/sideLength),(L-1));
if (typesAll[c]==1) {
Conc[0][i1][i2][i3]+=0.1;
if (Conc[0][i1][i2][i3]>1) {
Conc[0][i1][i2][i3]=1;
}
} else {
Conc[1][i1][i2][i3]+=0.1;
if (Conc[1][i1][i2][i3]>1) {
Conc[1][i1][i2][i3]=1;
}
}
}
produceSubstances_sw.mark();
}``````

``````static void produceSubstances(int L, float Conc[2][L][L][L], float posAll[][3], int* typesAll, int n) {

produceSubstances_sw.reset();
// increases the concentration of substances at the location of the  cells

const int auxL=L;
--L;
int c,i[3] __attribute__((aligned(32))); //i array aligned

#pragma omp parallel for schedule(static) private(i,c)
for (c=0; c< n; ++c) {
__assume_aligned((int*)i, 32);
__assume_aligned((float*)posAll, 64);
i[0] = std::min((int)floor(posAll[c][0]*auxL),L);
i[1] = std::min((int)floor(posAll[c][1]*auxL),L);
i[2] = std::min((int)floor(posAll[c][2]*auxL),L);

if (typesAll[c]==1) {
(Conc[0][i[0]][i[1]][i[2]]>0.9)?
Conc[0][i[0]][i[1]][i[2]]=1 :
Conc[0][i[0]][i[1]][i[2]]+=0.1;
} else {
(Conc[1][i[0]][i[1]][i[2]]>0.9)?
Conc[1][i[0]][i[1]][i[2]]=1 :
Conc[1][i[0]][i[1]][i[2]]+=0.1;
}
}
produceSubstances_sw.mark();
}``````

#### 优化说明

L 变量的使用在优化时进行了更改。 原始代码 3 次使用运算 L-1，从而使用了 3 次额外运算。 在优化版本中，我们只需简单地递减之前的 L 变量。 auxL 常量（用于优化，因为只读变量比读/写变量快）保存 L 的原值，在该函数执行期间保持不变。

#### runDiffusionStep

``````static void runDiffusionStep(float**** Conc, int L, float D) {
runDiffusionStep_sw.reset();
// computes the changes in substance concentrations due to diffusion

int i1,i2,i3, subInd;
float tempConc[2][L][L][L];
for (i1 = 0; i1 < L; i1++) {
for (i2 = 0; i2 < L; i2++) {
for (i3 = 0; i3 < L; i3++) {
tempConc[0][i1][i2][i3] = Conc[0][i1][i2][i3];
tempConc[1][i1][i2][i3] = Conc[1][i1][i2][i3];
}
}
}

int xUp, xDown, yUp, yDown, zUp, zDown;

for (i1 = 0; i1 < L; i1++) {
for (i2 = 0; i2 < L; i2++) {
for (i3 = 0; i3 < L; i3++) {
xUp = (i1+1);
xDown = (i1-1);
yUp = (i2+1);
yDown = (i2-1);
zUp = (i3+1);
zDown = (i3-1);
for (subInd = 0; subInd < 2; subInd++) {
if (xUp<L) {
Conc[subInd][i1][i2][i3] += (tempConc[subInd][xUp][i2][i3]-tempConc[subInd][i1][i2][i3])*D/6;
}
if (xDown>=0) {
Conc[subInd][i1][i2][i3] += (tempConc[subInd][xDown][i2][i3]-tempConc[subInd][i1][i2][i3])*D/6;
}
if (yUp<L) {
Conc[subInd][i1][i2][i3] += (tempConc[subInd][i1][yUp][i3]-tempConc[subInd][i1][i2][i3])*D/6;
}
if (yDown>=0) {
Conc[subInd][i1][i2][i3] += (tempConc[subInd][i1][yDown][i3]-tempConc[subInd][i1][i2][i3])*D/6;
}
if (zUp<L) {
Conc[subInd][i1][i2][i3] += (tempConc[subInd][i1][i2][zUp]-tempConc[subInd][i1][i2][i3])*D/6;
}
if (zDown>=0) {
Conc[subInd][i1][i2][i3] += (tempConc[subInd][i1][i2][zDown]-tempConc[subInd][i1][i2][i3])*D/6;
}
}
}
}
}
runDiffusionStep_sw.mark();
}``````

``````static void runDiffusionStep(int L, float Conc[2][L][L][L], float D) {
runDiffusionStep_sw.reset();
// computes the changes in substance concentrations due to diffusion
int i1,i2,i3,auxx;
const float auxD=D/6;
const int auxL=L-1;
float tempConc[2][L][L][L] __attribute__((aligned(64)));
#pragma omp parallel
{
#pragma omp for schedule(static) private(i1,i2) collapse(2)
for (i1 = 0; i1 < L; ++i1) {
for (i2 = 0; i2 < L; ++i2) {
memcpy(tempConc[0][i1][i2],
Conc[0][i1][i2],sizeof(float)*L);
memcpy(tempConc[1][i1][i2],
Conc[1][i1][i2],sizeof(float)*L);
}
}

#pragma omp for schedule(static) private(i1,i2,i3) collapse(2)
for (i1 = 0; i1 < L; ++i1) {
for (i2 = 0; i2 < L; ++i2) {
Conc[0][i1][i2][0] += (tempConc[0][i1][i2][1]-
tempConc[0][i1][i2][0])*auxD;
Conc[1][i1][i2][0] += (tempConc[1][i1][i2][1]-
tempConc[1][i1][i2][0])*auxD;
for (i3 = 1; i3 < auxL; ++i3) {
const float aux=tempConc[0][i1][i2][i3];
const float aux1=tempConc[1][i1][i2][i3];
__assume_aligned((float*)tempConc[0], 64);
__assume_aligned((float*)tempConc[1], 64);
__assume_aligned((float*)Conc[0], 64);
__assume_aligned((float*)Conc[1], 64);
if (i1<auxL) {
Conc[0][i1][i2][i3] +=
(tempConc[0][(i1+1)][i2][i3]-aux)*auxD;
Conc[1][i1][i2][i3] +=
(tempConc[1][(i1+1)][i2][i3]-aux1)*auxD;
}
if (i1>0) {
Conc[0][i1][i2][i3] +=
(tempConc[0][(i1-1)][i2][i3]-aux)*auxD;
Conc[1][i1][i2][i3] +=
(tempConc[1][(i1-1)][i2][i3]-aux1)*auxD;
}
if (i2<auxL) {
Conc[0][i1][i2][i3] +=
(tempConc[0][i1][(i2+1)][i3]-aux)*auxD;
Conc[1][i1][i2][i3] +=
(tempConc[1][i1][(i2+1)][i3]-aux1)*auxD;
}
if (i2>0) {
Conc[0][i1][i2][i3] +=
(tempConc[0][i1][(i2-1)][i3]-aux)*auxD;
Conc[1][i1][i2][i3] += (tempConc[1][i1][(i2-1)][i3]-aux1)*auxD;
}
Conc[0][i1][i2][i3] += (tempConc[0][i1][i2][(i3+1)]-aux)*auxD;
Conc[1][i1][i2][i3] += (tempConc[1][i1][i2][(i3+1)]-aux1)*auxD;
Conc[0][i1][i2][i3] +=  (tempConc[0][i1][i2][(i3-1)]-aux)*auxD;
Conc[1][i1][i2][i3] += (tempConc[1][i1][i2][(i3-1)]-aux1)*auxD;
}
Conc[0][i1][i2][auxL-1] += (tempConc[0][i1][i2][auxL]-tempConc[0][i1][i2][auxL-1])*auxD;
Conc[1][i1][i2][auxL-1] += (tempConc[1][i1][i2][auxL]-tempConc[0][i1][i2][auxL-1])*auxD;
}
}
}``````

#### 优化说明

• 第一个循环在
#pragma omp for schedule(static) private(i1,i2) collapse(2) 下方
• 第二个循环在
#pragma omp for schedule(static) private(i1,i2,i3) collapse(2) 下方

``````Conc[0][i1][i2][0]
Conc[1][i1][i2][0]

Conc[0][i1][i2][auxL-1]
Conc[1][i1][i2][auxL-1]``````

``````Conc[0][i1][i2][i3] += (tempConc[0][i1][i2][(i3+1)]-aux)*auxD;
Conc[1][i1][i2][i3] += (tempConc[1][i1][i2][(i3+1)]-aux1)*auxD;
Conc[0][i1][i2][i3] += (tempConc[0][i1][i2][(i3-1)]-aux)*auxD;
Conc[1][i1][i2][i3] += (tempConc[1][i1][i2][(i3-1)]-aux1)*auxD;``````

#### runDecayStep

``````static void runDecayStep(float**** Conc, int L, float mu) {
runDecayStep_sw.reset();
// computes the changes in substance concentrations due to decay
int i1,i2,i3;
for (i1 = 0; i1 < L; i1++) {
for (i2 = 0; i2 < L; i2++) {
for (i3 = 0; i3 < L; i3++) {
Conc[0][i1][i2][i3] = Conc[0][i1][i2][i3]*(1-mu);
Conc[1][i1][i2][i3] = Conc[1][i1][i2][i3]*(1-mu);
}
}
}
runDecayStep_sw.mark();
}``````

``````static void runDecayStep(int L, float Conc[2][L][L][L], float mu) {
runDecayStep_sw.reset();
// computes the changes in substance concentrations due to decay
const float muu=1-mu;
int i1,i2,i3;
#pragma omp parallel for schedule(static) private(i1,i2,i3) collapse(3)
#pragma simd
for (i1 = 0; i1 < L; ++i1) {
for (i2 = 0; i2 < L; ++i2) {
for (i3 = 0; i3 < L; ++i3) {
__assume_aligned((float*)Conc[0][i1][i2], 64);
__assume_aligned((float*)Conc[1][i1][i2], 64);
Conc[0][i1][i2][i3] = Conc[0][i1][i2][i3]*muu;
Conc[1][i1][i2][i3] = Conc[1][i1][i2][i3]*muu;
}
}
}
runDecayStep_sw.mark();
}``````

#### 优化说明

• 函数的输入阵列在标头上定义。
• 变量的优化方式与之前的函数一样（尽量使用常量，并对齐阵列）。
• 循环在 OpenMP 编译指示标头下方‘重叠’。
• 使用 #pragma simd 循环。 借助 SIMD 指令对该循环进行矢量化处理，可缩短优化后代码的执行时间。 使用 SIMD 指令支持同时执行乘法运算。

#### cellMovementAndDuplication

``````static int cellMovementAndDuplication(float** posAll, float* pathTraveled,
int* typesAll, int* numberDivisions, float pathThreshold, int
divThreshold, int n) {
cellMovementAndDuplication_sw.reset();
int c;
currentNumberCells = n;
float currentNorm;
float currentCellMovement[3];
float duplicatedCellOffset[3];
for (c=0; c<n; c++) {
// random cell movement
currentCellMovement[0]=RandomFloatPos()-0.5;
currentCellMovement[1]=RandomFloatPos()-0.5;
currentCellMovement[2]=RandomFloatPos()-0.5;
currentNorm = getNorm(currentCellMovement);
posAll[c][0]+=0.1*currentCellMovement[0]/currentNorm;
posAll[c][1]+=0.1*currentCellMovement[1]/currentNorm;
posAll[c][2]+=0.1*currentCellMovement[2]/currentNorm;
pathTraveled[c]+=0.1;
// cell duplication if conditions fulfilled
if (numberDivisions[c]<divThreshold) {
if (pathTraveled[c]>pathThreshold) {
pathTraveled[c]-=pathThreshold;
numberDivisions[c]+=1; // update number of divisions this cell has undergone
currentNumberCells++;  // update number of cells in
the simulation
numberDivisions[currentNumberCells-1]=numberDivisions[c]; // update number of divisions the  duplicated cell has undergone
typesAll[currentNumberCells-1]=-typesAll[c];
// assign type of duplicated cell (opposite to current cell)

// assign location of duplicated cell
duplicatedCellOffset[0]=RandomFloatPos()-0.5;
duplicatedCellOffset[1]=RandomFloatPos()-0.5;
duplicatedCellOffset[2]=RandomFloatPos()-0.5;
currentNorm = getNorm(duplicatedCellOffset);
posAll[currentNumberCells-1][0]=posAll[c][0]+0.05*duplicatedCellOffset[0]/currentNorm;
posAll[currentNumberCells-1][1]=posAll[c][1]+0.05*duplicatedCellOffset[1]/currentNorm;
posAll[currentNumberCells-1][2]=posAll[c][2]+0.05*duplicatedCellOffset[2]/currentNorm;
}
}
}
cellMovementAndDuplication_sw.mark();
return currentNumberCells;
}``````

``````static int cellMovementAndDuplication(float posAll[][3], float* pathTraveled,
int* typesAll, int* numberDivisions, float pathThreshold, int
divThreshold, int n) {
cellMovementAndDuplication_sw.reset();
int c,currentNumberCells = n;
float currentNorm;
unsigned int seed=rand();
float currentCellMovement[3] __attribute__((aligned(64)));
float duplicatedCellOffset[3] __attribute__((aligned(64)));

#pragma omp parallel for simd schedule (static) shared(posAll) private(c,currentNorm,currentCellMovement)

for (c=0; c<n; ++c) {
// random cell movement
RandomFloatPos(currentCellMovement,seed+c);
currentNorm = getNorm(currentCellMovement)*10;
__assume_aligned((float*)posAll, 64);
__assume_aligned((float*)currentCellMovement, 64);
posAll[c][0:3]+=currentCellMovement[0:3]/currentNorm;
__assume_aligned((float*)pathTraveled, 64);
pathTraveled[c]+=0.1;
}
seed=rand();
for (c=0; c<n; ++c) {
if ((numberDivisions[c]<divThreshold) && (pathTraveled[c]>pathThreshold)) {
pathTraveled[c]-=pathThreshold;
++numberDivisions[c]; // update number of divisions this cell has undergone
numberDivisions[currentNumberCells]=numberDivisions[c]; // update number of divisions the duplicated cell has undergone
typesAll[currentNumberCells]=-typesAll[c]; // assign type of
duplicated cell (opposite to current cell)

// assign location of duplicated cell
RandomFloatPos(duplicatedCellOffset,seed+c); //The seed+c value will be different for each thread and iteration, this way the random value is random always.
currentNorm = getNorm(duplicatedCellOffset)*20;
__assume_aligned((float*)posAll, 64);
__assume_aligned((float*)duplicatedCellOffset, 64);
posAll[currentNumberCells][0:3]=posAll[c][0:3]+duplicatedCellOffset[0:3]/currentNorm;
++currentNumberCells; // update number of cells in the simulation
}
}
cellMovementAndDuplication_sw.mark();
return currentNumberCells;
}``````

#### 优化说明

• RandomFloatPos() 函数。 这一优化后的函数可以实现多线程化，并返回整个位置阵列（3D 位置阵列）。
• 包含 currentNorm 的代码行可通过 SIMD 实现矢量化，也可以以数学的方式进行简化。

#### runDiffusionClusterStep

``````static void runDiffusionClusterStep(float**** Conc, float** movVec, float** posAll, int* typesAll, int n, int L, float speed) {
runDiffusionClusterStep_sw.reset();
// computes movements of all cells based on gradients of the two substances
float sideLength = 1/(float)L; // length of a side of a diffusion voxel

int c, i1, i2, i3, xUp, xDown, yUp, yDown, zUp, zDown;

for (c = 0; c < n; c++) {
i1 = std::min((int)floor(posAll[c][0]/sideLength),(L-1));
i2 = std::min((int)floor(posAll[c][1]/sideLength),(L-1));
i3 = std::min((int)floor(posAll[c][2]/sideLength),(L-1));

xUp = std::min((i1+1),L-1);
xDown = std::max((i1-1),0);
yUp = std::min((i2+1),L-1);
yDown = std::max((i2-1),0);
zUp = std::min((i3+1),L-1);
zDown = std::max((i3-1),0);

} else {
movVec[c][0]=0;
movVec[c][1]=0;
movVec[c][2]=0;
}
}
runDiffusionClusterStep_sw.mark();
}``````

``````static void runDiffusionClusterStep(int L, float Conc[2][L][L][L], float movVec[][3], float posAll[][3], int* typesAll, int n, float speed) {
runDiffusionClusterStep_sw.reset();
// computes movements of all cells based on gradients of the two substances
const float auxL=L;
--L;
float aux[3] __attribute__((aligned(64)));
int i[3] __attribute__((aligned(32)));
int c, xUp, xDown, yUp, yDown, zUp, zDown;

for (c = 0; c < n; ++c) {
__assume_aligned((int*)i, 32);
__assume_aligned((float*)posAll, 64);
__assume_aligned((float*)movVec, 64);
__assume_aligned((float*)typesAll, 64);
i[0:3] = std::min((int)floor(posAll[c][0:3]*auxL),L)-1;
xDown = std::max(i[0],0);
yDown = std::max(i[1],0);
zDown = std::max(i[2],0);
xUp = std::min((i[0]+2),L);
yUp = std::min((i[1]+2),L);
zUp = std::min((i[2]+2),L);
aux[0]=auxL/((xUp-xDown));
aux[1]=auxL/((yUp-yDown));
aux[2]=auxL/((zUp-zDown));
} else movVec[c][0:3]=0;
}
}
runDiffusionClusterStep_sw.mark();
}``````

#### 优化说明

• 输入阵列的维度在标头中规定。
• 阵列对齐。
• 尽量将变量替换成常量。
• OpenMP 并行化。
• 简化运算。
• 简化矢量化。

#### getEnergy

``````Original Code
static float getEnergy(float** posAll, int* typesAll, int n, float spatialRange, int targetN) {
getEnergy_sw.reset();
// Computes an energy measure of clusteredness within a subvolume. The
// size of the subvolume is computed by assuming roughly uniform
// distribution within the whole volume, and selecting a volume
// comprising approximately targetN cells.
int i1, i2;
float currDist;
float** posSubvol=0; // array of all 3 dimensional cell positions
posSubvol = new float*[n];
int typesSubvol[n];
float subVolMax = pow(float(targetN)/float(n),1.0/3.0)/2;
if(quiet < 1)
printf("subVolMax: %f\n", subVolMax);
int nrCellsSubVol = 0;
float intraClusterEnergy = 0.0;
float extraClusterEnergy = 0.0;
float nrSmallDist=0.0;

for (i1 = 0; i1 < n; i1++) {
posSubvol[i1] = new float[3];
if ((fabs(posAll[i1][0]-0.5)<subVolMax) && (fabs(posAll[i1][1]-0.5)<subVolMax) && (fabs(posAll[i1][2]-0.5)<subVolMax)) {
posSubvol[nrCellsSubVol][0] = posAll[i1][0];
posSubvol[nrCellsSubVol][1] = posAll[i1][1];
posSubvol[nrCellsSubVol][2] = posAll[i1][2];
typesSubvol[nrCellsSubVol] = typesAll[i1];
nrCellsSubVol++;
}
}

for (i1 = 0; i1 < nrCellsSubVol; i1++) {
for (i2 = i1+1; i2 < nrCellsSubVol; i2++) {
currDist = getL2Distance(posSubvol[i1][0],posSubvol[i1][1],posSubvol[i1][2],posSubvol[i2][0],
posSubvol[i2][1],posSubvol[i2][2]);
if (currDist<spatialRange) {
nrSmallDist = nrSmallDist+1;//currDist/spatialRange;
if (typesSubvol[i1]*typesSubvol[i2]>0) {
intraClusterEnergy = intraClusterEnergy+fmin(100.0,spatialRange/currDist);
} else {
extraClusterEnergy = extraClusterEnergy+fmin(100.0,spatialRange/currDist);
}
}
}
}
float totalEnergy = (extraClusterEnergy-intraClusterEnergy)/(1.0+100.0*nrSmallDist);
getEnergy_sw.mark();
}``````

``````static float getEnergy(float posAll[][3], int* typesAll, int n, float spatialRange, int targetN) {
getEnergy_sw.reset();
// Computes an energy measure of clusteredness within a subvolume. The
// size of the subvolume is computed by assuming roughly uniform
// distribution within the whole volume, and selecting a volume
// comprising approximately targetN cells.
int i1, i2;
float currDist;
float posSubvol[n][3] __attribute__((aligned(64)));
int typesSubvol[n] __attribute__((aligned(64)));
const float subVolMax = pow(float(targetN)/float(n),1.0/3.0)/2;
if(quiet < 1)printf("subVolMax: %f\n", subVolMax);

int nrCellsSubVol = 0;
float intraClusterEnergy = 0.0;
float extraClusterEnergy = 0.0;
float nrSmallDist=0.0;
for (i1 = 0; i1 < n; ++i1) {
__assume_aligned((float*)posAll, 64);
if ((fabs(posAll[i1][0]-0.5)<subVolMax) && (fabs(posAll[i1][1]-0.5)<subVolMax) && (fabs(posAll[i1][2]-
0.5)<subVolMax)) {
__assume_aligned((float*)posSubvol[nrCellsSubVol], 64);
__assume_aligned((float*)typesAll, 64);
__assume_aligned((int*)typesSubvol, 64);
posSubvol[nrCellsSubVol][0] = posAll[i1][0];
posSubvol[nrCellsSubVol][1] = posAll[i1][1];
posSubvol[nrCellsSubVol][2] = posAll[i1][2];
typesSubvol[nrCellsSubVol] = typesAll[i1];
++nrCellsSubVol;
}
}
#pragma omp parallel for schedule(static) reduction(+:nrSmallDist,intraClusterEnergy,extraClusterEnergy) private(i1,i2,currDist)
for (i1 = 0; i1 < nrCellsSubVol; ++i1) {
for (i2 = i1+1; i2 < nrCellsSubVol; ++i2) {
currDist = getL2Distance(posSubvol[i1],posSubvol[i2]);
if (currDist<spatialRange) {
++nrSmallDist; //currDist/spatialRange;
(typesSubvol[i1]*typesSubvol[i2]>0)? intraClusterEnergy += fmin(100.0,spatialRange/currDist) :
extraClusterEnergy += fmin(100.0,spatialRange/currDist);
}
}
}
getEnergy_sw.mark();
return (extraClusterEnergy-intraClusterEnergy)/(1.0+100.0*nrSmallDist);
}``````

#### 优化说明

• 输入阵列的维度在标头中规定。
• 尽量使用常量。
• 对齐阵列。
• 使用减法在第二个 ‘for loop’ 上实现 OpenMP 并行化。

#### getCriterion

``````static bool getCriterion(float** posAll, int* typesAll, int n, float spatialRange, int targetN) {
getCriterion_sw.reset();
// Returns 0 if the cell locations within a subvolume of the total
// system, comprising approximately targetN cells, are arranged as clusters, and 1 otherwise.
int i1, i2;
int nrClose=0; // number of cells that are close (i.e. within a distance of spatialRange)
float currDist;
int sameTypeClose=0; // number of cells of the same type, and that are close (i.e. within a distance of spatialRange)
int diffTypeClose=0; //number of cells of opposite types, and that are close (i.e. within a distance of spatialRange)
float** posSubvol=0; // array of all 3 dimensional cell positions in the subcube
posSubvol = new float*[n];
int typesSubvol[n];
float subVolMax = pow(float(targetN)/float(n),1.0/3.0)/2;
int nrCellsSubVol = 0;

// the locations of all cells within the subvolume are copied to array PosSubvol
for (i1 = 0; i1 < n; i1++) {
posSubvol[i1] = new float[3];
if ((fabs(posAll[i1][0]-0.5)<subVolMax) && (fabs(posAll[i1][1]-0.5)<subVolMax) && (fabs(posAll[i1][2]
-0.5)<subVolMax)) {
posSubvol[nrCellsSubVol][0] = posAll[i1][0];
posSubvol[nrCellsSubVol][1] = posAll[i1][1];
posSubvol[nrCellsSubVol][2] = posAll[i1][2];
typesSubvol[nrCellsSubVol] = typesAll[i1];
nrCellsSubVol++;
}
}

[section of truncated code]

for (i1 = 0; i1 < nrCellsSubVol; i1++) {
for (i2 = i1+1; i2 < nrCellsSubVol; i2++) {
currDist = getL2Distance(posSubvol[i1][0],posSubvol[i1][1],posSubvol[i1][2],posSubvol[i2][0], posSubvol[i2][1],posSubvol[i2][2]);
if (currDist<spatialRange) {
nrClose++;
if (typesSubvol[i1]*typesSubvol[i2]<0) {
diffTypeClose++;
} else {
sameTypeClose++;
}
}
}

[section of truncated code]

}``````

``````static bool getCriterion(float posAll[][3], int* typesAll, int n, float spatialRange, int targetN) {
getCriterion_sw.reset();
// Returns 0 if the cell locations within a subvolume of the total
// system, comprising approximately targetN cells, are arranged as clusters, and 1 otherwise.
int i1, i2;
int nrClose=0; // number of cells that are close (i.e. within a  distance of spatialRange)
int sameTypeClose=0; // number of cells of the same type, and that are  close (i.e. within a distance of spatialRange)
int diffTypeClose=0; // number of cells of opposite types, and that are  close (i.e. within a distance of spatialRange)
float posSubvol[n][3] __attribute__((aligned(64)));
int typesSubvol[n] __attribute__((aligned(64)));
const float subVolMax = pow(float(targetN)/float(n),1.0/3.0)/2;
int nrCellsSubVol = 0;

// the locations of all cells within the subvolume are copied to array  posSubvol

for (i1 = 0; i1 < n; ++i1) {
__assume_aligned((float*)posAll, 64);
if ((fabs(posAll[i1][0]-0.5)<subVolMax) && (fabs(posAll[i1][1]-0.5)<subVolMax) && (fabs(posAll[i1][2]
-0.5)<subVolMax)) {
__assume_aligned((float*)posSubvol[nrCellsSubVol], 64);
__assume_aligned((float*)typesAll, 64);
__assume_aligned((int*)typesSubvol, 64);
posSubvol[nrCellsSubVol][0] = posAll[i1][0];
posSubvol[nrCellsSubVol][1] = posAll[i1][1];
posSubvol[nrCellsSubVol][2] = posAll[i1][2];
typesSubvol[nrCellsSubVol] = typesAll[i1];
++nrCellsSubVol;
}
}

[section of truncated code]

#pragma omp parallel for schedule(static)
reduction(+:nrClose,diffTypeClose,sameTypeClose) private(i1,i2)
for (i1 = 0; i1 < nrCellsSubVol; ++i1) {
for (i2 = i1+1; i2 < nrCellsSubVol; ++i2) {
if (getL2Distance(posSubvol[i1],posSubvol[i2])<spatialRange) {
++nrClose;
(typesSubvol[i1]*typesSubvol[i2]<0) ? ++diffTypeClose: ++sameTypeClose;
}
}
}

[section of truncated code]

}``````

#### 优化说明

• 输入阵列的维度在标头中规定。
• 尽量使用常量。
• 使用减法在第二个 ‘for loop’ 上实现 OpenMP 并行化。

#### 主 – 变量声明

``````int i,c,d;
int i1, i2, i3, i4;
float energy; // value that quantifies the quality of the cell clustering   output. The smaller this value, the better the clustering
float** posAll=0; // array of all 3 dimensional cell positions
posAll = new float*[finalNumberCells];
float** currMov=0; // array of all 3 dimensional cell movements at the last  time point
currMov = new float*[finalNumberCells]; // array of all cell movements in the  last time step
float zeroFloat = 0.0;
float pathTraveled[finalNumberCells]; // array keeping track of length of path traveled until cell divides
int numberDivisions[finalNumberCells]; //array keeping track of number of  division a cell has undergone
int typesAll[finalNumberCells]; // array specifying cell type (+1 or -1)``````

``````int i,c,d,i1;
float energy; // value that quantifies the quality of the cell clustering output. The smaller this value, the better the clustering
float posAll[finalNumberCells][3] __attribute__((aligned(64)));
float currMov[finalNumberCells][3] __attribute__((aligned(64))); //array of
// all cell movements in the last time step
float pathTraveled[finalNumberCells] __attribute__((aligned(64))); // array
// keeping track of length of path traveled until cell divides
int numberDivisions[finalNumberCells] __attribute__((aligned(64))); //array
// keeping track of number of division a cell has undergone
int typesAll[finalNumberCells] __attribute__((aligned(64))); // array
// specifying cell type (+1 or -1)
float Conc[2][L][L][L] __attribute__((aligned(64)));``````

#### 优化说明

• 避免使用‘new’构造函数。
• 删除无用变量（比如 zeroFloat）。
• 如果可以声明大小已知的静态阵列，避免使用指示器。
• 内存对齐；尽管可以使用更多内存，但内存访问速度更快。
• 在初始化阶段定义变量；这比之后定义该值的速度更快。

#### 主 – 变量初始化

``````// Initialization of the various arrays
for (i1 = 0; i1 < finalNumberCells; i1++) {
currMov[i1] = new float[3];
posAll[i1] = new float[3];
pathTraveled[i1] = zeroFloat;
pathTraveled[i1] = 0;
for (i2 = 0; i2 < 3; i2++) {
currMov[i1][i2] = zeroFloat;
posAll[i1][i2] = 0.5;
}
}
// create 3D concentration matrix
float**** Conc;
Conc = new float***[L];
for (i1 = 0; i1 < 2; i1++) {
Conc[i1] = new float**[L];
for (i2 = 0; i2 < L; i2++) {
Conc[i1][i2] = new float*[L];
for (i3 = 0; i3 < L; i3++) {
Conc[i1][i2][i3] = new float[L];
for (i4 = 0; i4 < L; i4++) {
Conc[i1][i2][i3][i4] = zeroFloat;
}
}
}
}``````

``````// Initialization of the various arrays
#pragma omp parallel
{
#pragma omp for simd schedule (static) private (i1) nowait
for (i1 = 0; i1 < finalNumberCells; ++i1) {
__assume_aligned((float*)posAll, 64);
__assume_aligned((float*)currMov[i1], 64);
__assume_aligned((float*)pathTraveled, 64);
currMov[i1][0:3]=0;
posAll[i1][0]=0.5;
posAll[i1][1]=0.5;
posAll[i1][2]=0.5;
pathTraveled[i1] = 0;
}
#pragma omp for schedule (static) private (i1,c,d) collapse(3)
for (i1 = 0; i1 < 2; ++i1) {
for (c = 0; c < L; ++c) {
for (d = 0; d < L; ++d) {
Conc[i1][c][d][0:L]=0;
}
}
}
}``````

#### 主 – 阶段 1

``````// Phase 1: Cells move randomly and divide until final number of cells is reached
while (n<finalNumberCells) {
produceSubstances(Conc, posAll, typesAll, L, n); // Cells produce
// substances. Depending on the cell type, one of the two substances is produced.
runDiffusionStep(Conc, L, D); // Simulation of substance diffusion
runDecayStep(Conc, L, mu);
n = cellMovementAndDuplication(posAll, pathTraveled, typesAll, numberDivisions, pathThreshold, divThreshold, n);

for (c=0; c<n; c++) {
// boundary conditions
for (d=0; d<3; d++) {
if (posAll[c][d]<0) {posAll[c][d]=0;}
if (posAll[c][d]>1) {posAll[c][d]=1;}
}
}
}``````

``````// Phase 1: Cells move randomly and divide until final number of cells is reached
while (n<finalNumberCells) {
produceSubstances(L,Conc, posAll, typesAll, n); // Cells produce substances. Depending on the cell type, one of the two substances is produced.
runDiffusionStep(L,Conc, D); // Simulation of substance diffusion
runDecayStep(L,Conc, mu);
n = cellMovementAndDuplication(posAll, pathTraveled, typesAll, numberDivisions, pathThreshold, divThreshold, n);
#pragma omp parallel for simd schedule (static) private (c,d) if (n>500)
for (c=0; c<n; ++c) {
// boundary conditions
for (d=0; d<3; d++) {
if (posAll[c][d]<0) {
posAll[c][d]=0;
}else if (posAll[c][d]>1) posAll[c][d]=1;
}
}
}``````

#### 主 – 阶段 2 结束

``````for (c=0; c<n; c++) {
posAll[c][0] = posAll[c][0]+currMov[c][0];
posAll[c][1] = posAll[c][1]+currMov[c][1];
posAll[c][2] = posAll[c][2]+currMov[c][2];

// boundary conditions: cells can not move out of the cube [0,1]^3
for (d=0; d<3; d++) {
if (posAll[c][d]<0) {posAll[c][d]=0;}
if (posAll[c][d]>1) {posAll[c][d]=1;}
}
}``````

``````#pragma omp parallel for simd schedule (static) private (c,d) if (n>500)
for (c=0; c<n; ++c) {
__assume_aligned((float*)posAll, 64);
__assume_aligned((float*)currMov[c], 64);
posAll[c][0:3] += currMov[c][0:3];
// boundary conditions: cells can not move out of the cube [0,1]^3
for (d=0; d<3; d++) {
if (posAll[c][d]<0) {
posAll[c][d]=0;
}else if (posAll[c][d]>1) posAll[c][d]=1;
}
}``````

#### 其他优化

• 将所有后增量定义为前增量。 后增量拷贝实际变量，然后对其进行增量处理。 前增量不进行拷贝，所以速度更快。 只有当增量的点不影响结果时，才可以实现这一性能提升。
• 在所有阵列上实现内存对齐。 如果阵列对齐，内存操作的速度更快，而且 CPU 无需标记或转换数据。

#### 关于作者

Daniel Vea Falguera 是电子系统工程专业的学生，也是一名创业者，在电子与计算机编程方面拥有浓厚的兴趣和深厚的功底。

#### 链接

https://software.intel.com/zh-cn/forums/intel-moderncode-for-parallel-architectures

OpenMP*: http://openmp.org/wp/

1