PARSEC* 3.0 中的多线程代码优化: BlackScholes

简介

普林斯顿共享内存计算机应用库 (PARSEC) 是由多线程程序组成的基准测试套件。 本套件主要面向新兴工作负载,堪称下一代芯片多处理器共享内存计划的代表。

基准测试套件及其全部应用和输入集以开源方式免费提供。 一些基准测试计划有自己的许可条款,可能会限制在某些情况下的使用。

毕苏期权定价模式 (Black-Scholes) 基准测试是 PARSEC 中 13 个基准测试之一。 该基准测试可使用 Black-Scholes 偏微分方程 (PDE) 进行期权定价。 Black-Scholes 方程是一个微分方程,可描述在某些前提下,期权的值如何随底层资产价格的变化而变化。

看跌期权的公式相似。 累积正态分布函数 CND(x) 使得正态分布随机变量的值有可能低于 x。 该函数没有闭式表达式,因此必须求出其数字数值。 或者,该函数的值也可在表格中进行预计算和硬编码;此时,可在运行时使用表格查找获取这些值。

根据该公式,用户可以根据五个输入以分析的方式来计算期权价格。 使用分析方法来计算期权价格,限制因素将取决于处理器可执行的浮点计算的数量。

热点

当我们查看英特尔® VTune™ Amplifier XE 的性能分析结果时,我们发现了两个主要的热点。

  • 从输入文件中读取,以及向输出文件写入。
  • Black-Scholes 计算。

下面我们来详细了解一下每一项。

从输入文件中读取,以及向输出文件写入

其中的问题是输入文件包含 1000 万行数据。 每一行是一个 struct OptionData 元素,其中包含 9 个参数。 这也是我们之前开始计算时,花费了许多时间从输入文件中读取并解析从数据文件到一个 OptionData struct 阵列的原因。 该阵列称为 data


显然,经过全部计算之后,会遇到同样的问题,您需要在输出文件中写入结果。

这些问题可通过使用指针并行读写来解决。 更多详情,请阅读此处链接:并行应用中的数据读写优化 | 英特尔® 开发人员专区

Black-Scholes 计算

接下来,我们将更具体地介绍一下此案例。 所有计算都包括两个函数: CNDFBlkSchlsEqEuroNoDiv

CNDF

该函数实现了标准正态分布的累积分布函数。 更多详情,请参考有关或然率理论的书籍。

在我们的案例中,该函数如下:

#define inv_sqrt_2xPI 0.39894228040143270286
fptype CNDF ( fptype InputX ) 
{
    int sign;
    fptype OutputX;
    fptype xInput;
    fptype xNPrimeofX;
    fptype expValues;
    fptype xK2;
    fptype xK2_2, xK2_3;
    fptype xK2_4, xK2_5;
    fptype xLocal, xLocal_1;
    fptype xLocal_2, xLocal_3;
    // Check for negative value of InputX
    if (InputX < 0.0) {
        InputX = -InputX;
        sign = 1;
    } else 
        sign = 0;
    xInput = InputX;
     // Compute NPrimeX term common to both four & six decimal accuracy calcs
    expValues = exp(-0.5f * InputX * InputX);
    xNPrimeofX = expValues;
    xNPrimeofX = xNPrimeofX * inv_sqrt_2xPI;
    xK2 = 0.2316419 * xInput;
    xK2 = 1.0 + xK2;
    xK2 = 1.0 / xK2;
    xK2_2 = xK2 * xK2;
    xK2_3 = xK2_2 * xK2;
    xK2_4 = xK2_3 * xK2;
    xK2_5 = xK2_4 * xK2;
    xLocal_1 = xK2 * 0.319381530;
    xLocal_2 = xK2_2 * (-0.356563782);
    xLocal_3 = xK2_3 * 1.781477937;
    xLocal_2 = xLocal_2 + xLocal_3;
    xLocal_3 = xK2_4 * (-1.821255978);
    xLocal_2 = xLocal_2 + xLocal_3;
    xLocal_3 = xK2_5 * 1.330274429;
    xLocal_2 = xLocal_2 + xLocal_3;

    xLocal_1 = xLocal_2 + xLocal_1;
    xLocal   = xLocal_1 * xNPrimeofX;
    xLocal   = 1.0 - xLocal;

    OutputX  = xLocal;
    
    if (sign) {
        OutputX = 1.0 - OutputX;
    }
       return OutputX;
}

BlkSchlsEqEuroNoDiv

此即 Black-Scholes 模型,可为欧式期权定价提供理论估计。 对于我们的目的而言,我们只需要知道 Black-Scholes 公式并将其用于计算中即可。 公式如下:

C(S,t)=N(d1)S-N(d2)Ke-r(T-t) - 非分红基础股票看涨期权值

P(S,t)=Ke-r(T-t)N(-d2)-N(d1)S - 相应的看跌期权的价格。

d1= 1/(σ√(T-t))(ln(S/K)+(r+σ2/2)(T-t))

d2= 1/(σ√(T-t))(ln(S/K)+(r-σ2/2)(T-t))

对于上述的两个公式:

  • N(∙) 代表 CNDF 函数,
  • T - t 代表期限。
  • S 代表基础资产的现货价格。
  • K 代表执行价格。
  • r 代表风险。
  • σ 代表基础资产的收益波动。

在我们的案例中,该函数如下:

fptype BlkSchlsEqEuroNoDiv( fptype sptprice,
                            fptype strike, fptype rate, fptype volatility,
                            fptype time, int otype, float timet )
{
    fptype OptionPrice;

    // local private working variables for the calculation
    fptype xStockPrice;
    fptype xStrikePrice;
    fptype xRiskFreeRate;
    fptype xVolatility;
    fptype xTime;
    fptype xSqrtTime;

    fptype logValues;
    fptype xLogTerm;
    fptype xD1; 
    fptype xD2;
    fptype xPowerTerm;
    fptype xDen;
    fptype d1;
    fptype d2;
    fptype FutureValueX;
    fptype NofXd1;
    fptype NofXd2;
    fptype NegNofXd1;
    fptype NegNofXd2;    
    
    xStockPrice = sptprice;
    xStrikePrice = strike;
    xRiskFreeRate = rate;
    xVolatility = volatility;

    xTime = time;
    xSqrtTime = sqrt(xTime);

    logValues = log( sptprice / strike );
        
    xLogTerm = logValues;
        
    
    xPowerTerm = xVolatility * xVolatility;
    xPowerTerm = xPowerTerm * 0.5;
        
    xD1 = xRiskFreeRate + xPowerTerm;
    xD1 = xD1 * xTime;
    xD1 = xD1 + xLogTerm;

    xDen = xVolatility * xSqrtTime;
    xD1 = xD1 / xDen;
    xD2 = xD1 -  xDen;

    d1 = xD1;
    d2 = xD2;
    
    NofXd1 = CNDF( d1 );
    NofXd2 = CNDF( d2 );

    FutureValueX = strike * ( exp( -(rate)*(time) ) );        
    if (otype == 0) {            
        OptionPrice = (sptprice * NofXd1) - (FutureValueX * NofXd2);
    } else { 
        NegNofXd1 = (1.0 - NofXd1);
        NegNofXd2 = (1.0 - NofXd2);
        OptionPrice = (FutureValueX * NegNofXd2) - (sptprice * NegNofXd1);
    }
    
    return OptionPrice;
}

优化

main() 中的唯一一个函数是 bs_thread(),该函数如下所示:

#ifdef WIN32
DWORD WINAPI bs_thread(LPVOID tid_ptr){
#else
int bs_thread(void *tid_ptr) {
#endif
    int i, j;
    fptype price;
    fptype priceDelta;
    int tid = *(int *)tid_ptr;
    int start = tid * (numOptions / nThreads);
    int end = start + (numOptions / nThreads);
	
  for (j=0; j<NUM_RUNS; j++) {
#ifdef ENABLE_OPENMP
#pragma omp parallel for private(i, price, priceDelta)
        for (i=0; i<numOptions; i++) {
#else  //ENABLE_OPENMP
        for (i=start; i<end; i++) {
#endif //ENABLE_OPENMP
            /* Calling main function to calculate option value based on 
             * Black & Scholes's equation.
             */
            price = BlkSchlsEqEuroNoDiv( sptprice[i], strike[i],
                                         rate[i], volatility[i], otime[i], 
                                         otype[i], 0);
            prices[i] = price;

#ifdef ERR_CHK
            priceDelta = data[i].DGrefval - price;
            if( fabs(priceDelta) >= 1e-4 ){
                printf("Error on %d. Computed=%.5f, Ref=%.5f, Delta=%.5f\n",
                       i, price, data[i].DGrefval, priceDelta);
                numError ++;
            }
#endif
        }
    }
    return 0;
}
#endif //ENABLE_TBB

我们来做一个从 0 到 numOptions 的循环,并将其放置到 BlkSchlsEqEuroNoDiv 中,然后在该函数中再添加一个参数。 价格阵列即为该参数。 当然,我们的函数 BlkSchlsEqEuroNoDiv 将整个采用 sptpricestrikeratevolatilitytimeotype 阵列。 此外,我们还需要添加一些 OpenMP* 编译指示对该循环进行并行:

#pragma omp parallel for private(i)
#pragma simd
#pragma vector aligned

因此,通过矢量化和并行化,我们可以获得性能提升。

如果我们进一步读取标准正态分布的 CDF 函数,将会发现,该函数可使用 error function 来表达:

N(x)=  1/2(1+erf(x/√2))

接下来,我们来创建 CDF 函数实施并尝试使用一下:

#   define ERF(x)      erff(x)
#   define INVSQRT(x)  1.0f/sqrtf(x)
#   define HALF        0.5f
fptype cdfnorm (fptype input)
{
	fptype buf;
	fptype output;
	buf = input*INVSQRT(2);
	output = HALF + HALF*ERF(buf);
	return output;
}

我们在函数中使用它之前,我想说明一下,我使用了 erff(x) 函数,其包含在英特尔® 数学核心库中。 我在源文件上使用了 #include <mathimf.h>

现在,我们已经做好进行实验的准备!

系统

我用来测试修改的系统是双英特尔® 至强™ E5-2697 v3 服务器。 英特尔至强 E5-2697 v3 处理器包含 14 个内核,内核频率为 2.6 GHz。 同时,它还启用了英特尔® 超线程技术。 因此,每个数据包中有 28 个超线程和 35 MB 的三级缓存。 我们总计有 56 个线程可用。

编译器

我使用了英特尔® C++ Compilers 2013。

我使用的编译器主要元素如下:

export CFLAGS="-O2 -xCORE-AVX2 -funroll-loops -opt-prefetch -g -traceback"

这些主要元素在配置文件 «icc.bldconf» 可能会发生变化。

实验

我启动了 blackscholes 基准测试,结果如下:

Blackscholes benchmark

总结

因此,由于计算函数的更改,我们获得了性能提升。
借助 I/O 并行,使用本文之前指定的方法,我们可以获得显著的性能提升。

有关编译器优化的更完整信息,请参阅优化通知