在网络应用中使用英特尔® TBB:网络路由器模拟器

简介

英特尔® 线程构建模块被广泛应用于各种应用中。如果十分注重性能并使用多核平台,那么最好在 C++ 程序中添加 TBB。网络应用通常承受着较高的负载,这是因为它们要处理海量的流量,而且处理时间限制较高。本文介绍了如何在网络数据包处理软件中使用 TBB,以便提高效率并缩短处理时间。

我针对示例项目创建了一个简单的网络路由器模拟器。网络路由器是一种能够在局域网内路由和传输 IP(互联网协议)数据包的设备。它可以连接多台 PC,并使其访问互联网和内部网络。该设备具有多个内部网络端口和一个外部端口。

示例项目模拟网络路由器逻辑。它具有以下功能:

  • 来自文件的输入数据包 - 该应用只是一个模型,因此无需与网络接口进行物理连接。来自文件的读数模拟来自网络接口的真实读数。
  • NAT - 网络地址转换。路由器只有一个外部 IP 地址,但是数据包应传输到路由器后部的多台内部设备。NAT 支持从外到内或从内到外的端口与 IP 映射功能。
  • IP 路由 - 根据目的地 IP 向相应的路由器网卡发送数据包。
  • 带宽管理 - 某些流量是实时的,因此尽可能块地提供这些数据包是十分重要的(例如 IP 语音)。VoIP 支持以电话的形式进行通话,而延迟会降低通话质量。路由器能够对这些重要的数据包进行优先级划分,从而加快处理速度。

我创建了两个网络路由器版本:串口和并口。后者使用英特尔® 线程构建模块。我将介绍如何在该项目中使用 TBB,并提供程序并行处理的性能结果。

网络路由器实施

网络路由器模拟器从文件获得数据包并对其进行处理。数据包处理包括带宽管理、NAT 转换和 IP 路由。数据包由多个程序模块处理。这些处理模块依次得到执行,就像工厂的组装线一样。数据包处理应用大多由这些组件组成。输入文件是一个文本文件,每一行均代表一个 IP 数据包。单独的线程按照较大的数据块来读取数据包。

英特尔® TBB 包含 tbb::pipeline 类,可针对此类程序结构提供高级框架。它提供了可在每个阶段处理数据包的过滤器。每个数据包均通过管线,并由其过滤器分布处理。数据包依次得到处理 - 从第一个过滤器到第二个、第三个等。但是,对一个数据包进行处理不影响其它数据包,因此过滤器可以并行执行。


网络路由器方案



主函数:

#include <iostream> 
#include <sstream>
#include <fstream>
#include <vector>
#include <algorithm>
#include <ittnotify.h>
#include <tbb/pipeline.h>
#include <tbb/concurrent_hash_map.h>
#include <tbb/atomic.h>
#include <tbb/concurrent_queue.h>
#include <tbb/compat/thread>
// Redirects calls to "new" and "delete" to TBB thread safe allocators
#include <tbb/tbbmalloc_proxy.h>

using namespace tbb;
using namespace std;

class bandwidth_manager_t;
class network_adress_translator_t;
class ip_router_t;
class compute_t;
typedef vector<packet_trace_t> packet_chunk_t;

int chunk_size = 1600;
concurrent_queue<packet_chunk_t> chunk_queue;
atomic<bool> stop_flag;

int main(int argc, char* argv[])
{
	ip_addr_t external_ip;
	nic_t external_nic;	
	nat_table_t nat_table;	// NAT table   
	ip_config_t ip_config;	// Router network configuration 					
	int ntokens = 24;	
	
	get_args (argc, argv);	
    ifstream config_file (config_file_name);

    if (!config_file) {
        cerr << "Cannot open config file " << config_file_name << "n";
        exit (1);
    }		
	if (! initialize_router (external_ip, external_nic, 
                            ip_config, config_file)) exit (1);	
	
	thread input_thread(input_function);

	// packet processing objects
	bandwidth_manager_t bwm;	
	network_adress_translator_t nat(external_ip, external_nic, nat_table);
	ip_router_t ip_router(external_ip, external_nic, ip_config);		

__itt_resume();
	bool stop_pipeline = false;	
	
	parallel_pipeline(ntokens,		
		make_filter<void, packet_chunk_t*>(		// Input filter
			filter::parallel,
			[&](flow_control& fc)-> packet_chunk_t*{				
				
				if (stop_pipeline){					
					fc.stop();
				}				
				packet_chunk_t* packet_chunk = new packet_chunk_t(chunk_size);
					
				if(!chunk_queue.try_pop(*packet_chunk)){				
					if (stop_flag) {
						stop_pipeline = true;
					}
				}				
				return packet_chunk;
			}
		)&	// Bandwidth manager filter
		make_filter<packet_chunk_t*, packet_chunk_t*>(		
			filter::parallel,
			[&](packet_chunk_t* packet_chunk)-> packet_chunk_t*{								
				
				for(int i=0; i<packet_chunk->size(); i++){
					packet_trace_t packet;
					packet = (*packet_chunk)[i];				
					
					if (packet.nic == empty){
						break;
					}
					else{
						bwm.prioritize(packet);									
						compute_t compute;
						compute.work();						
					}										
				}
				std::sort(packet_chunk->begin(), packet_chunk->end(),
							packet_comparator);
				return packet_chunk;	
			}
		)&	// NAT filter
		make_filter<packet_chunk_t*, packet_chunk_t*>(	
			filter::parallel,
			[&](packet_chunk_t* packet_chunk)-> packet_chunk_t*{

				for(int i=0; i<packet_chunk->size(); i++){	
					packet_trace_t packet;

					packet = (*packet_chunk)[i];					
					if (packet.nic == empty)
						break;
					else{				
						nat.map(packet);
						compute_t compute;
						compute.work();	
					}
				}				
				return packet_chunk;
			}
		)&	// IP routing filter
		make_filter<packet_chunk_t*, packet_chunk_t*>(		
			filter::parallel,
			[&](packet_chunk_t* packet_chunk)-> packet_chunk_t*{			

				for(int i=0; i<packet_chunk->size(); i++){						
					packet_trace_t packet;
					packet = (*packet_chunk)[i];
					
					if (packet.nic == empty)
						break;
					else{				
						ip_router.route(packet);
						compute_t compute;
						compute.work();	
					}
				}				
				return packet_chunk;
			}
		)&	// Output filter
		make_filter<packet_chunk_t*, void>(	
			filter::parallel,
			[&](packet_chunk_t* packet_chunk){														
				
				for(int i=0; i<packet_chunk->size(); i++){						
					packet_trace_t packet;
					packet = (*packet_chunk)[i];	
					compute_t compute;
					compute.work();	

					if (packet.nic == empty)
						break;
				}	
				// No output is required , just drop packets
				delete packet_chunk; 
			}
		)
	);	
__itt_pause();

	cout << "nAll packets are processednn";		
	return 0;
}

 

第一部分是“准备” – 创建对象、读取命令行、打开文件和初始化。配置文件包含路由器接口信息。对象 bwm、nat 和 ip_router 是数据包处理对象。他们使用容器 nat_table 和 ip_config 来存储 NAT 和 IP 表。

网络路由器的核心组件是管线。该组件通过 tbb::parallel_pipeline() 函数实施,可将令牌数量和过滤器列表作为参数。通过管线的工作元素是类型 packet_chunk_t。参数 ntokens 控制并发处理的元素的最大数量。它的值为 24,这是因为该项目在一台 24 核设备上进行测试,更大的数值不会产生作用。

管线过滤器可执行部分工作,尤其是在该应用中进行数据包处理。过滤器可以是串行或者是并行的。该模式由面向所有过滤器的过滤器参数 filter::parallel 控制。这意味着任意过滤器都可以同时处理某些元素。

第一个过滤器从 chunk_queue 提取数据块,并将其传输至第二个过滤器。第二个过滤器在来自数据块的每个数据包上执行带宽管理操作。bwm 模块根据协议为数据包分配优先级。然后,系统根据优先级对数据块中的数据包进行排序。这有助于尽可能早地处理重要流量。随后的过滤器可以进行 NAT 映射和 IP 路由。最后一个过滤器是输出,但出于简化的目的,并未提供真实的输出。数据包被丢弃。

数据块足够大,可作为管线令牌使用。如果单个数据包通过管线,则线程之间的转换将会过多,付出的开销将会超过所取得的成效。

用于测量性能的英特尔® VTuneTM Amplifier XE使用函数 __itt_resume() 和 __itt_pause()。这些 API 函数将在相关区域的开头和结尾进行标记。

计算对象类型 compute_t 为 CPU 生成工作负载。它只执行额外的计算以模拟真实系统中的计算任务。该应用不会执行在真正网络设备中处理和路由数据包所需的整个任务。它只是对真实应用的框架进行建模,因此不会占用足够的 CPU 资源。方法 compute_t:: work()开始计算 "N Queens" 算法。

输入文件的打开和读取由单独的线程完成。使用 std::thread 类实现实例化,该类是即将发布的新 C++ 11 标准的一部分。

串行实施

要了解并行化的效果,可以创建一个串行版本。它具有类似的结构。唯一的区别在于,parallel_pipeline 被简单的 while 循环所取代。

网络路由器串行方案

While 循环(取代 parallel_pipeline):

__itt_resume();
	bool stop = false;

	while (!stop){
		packet_chunk_t packet_chunk(chunk_size);
		
		if(!chunk_queue.try_pop(packet_chunk)){				
			if (stop_flag) {
				stop = true;
			}
		}		
		
		for(int i=0; i < packet_chunk.size(); i++){
			packet_trace_t packet = packet_chunk[i];;			
			bwm.prioritize(packet);	
			compute_t compute;
			compute.work();									
		}
		std::sort(packet_chunk.begin(), packet_chunk.end(), packet_comparator);
		for(int i=0; i < packet_chunk.size(); i++){
			packet_trace_t packet = packet_chunk[i];				
			nat.map(packet);
			compute_t compute;
			compute.work();		
			ip_router.route(packet);				
			compute.work();							
			compute.work();								
		}
	}
__itt_pause();


共有四次 compute.work() 调用 - 与 TBB 版本中相同。这是占用 CPU 时间最多的函数,因此需要相同数量的调用。

数据结构

输入文件的格式如下:

eth3 104.44.44.10 10.230.30.03 4003 5003 ftp
eth3 104.44.44.10 10.230.30.03 4003 5003 rtp
eth0 134.77.77.30 104.44.44.10 2004 4003 sip
eth3 104.44.44.10 10.230.30.03 4003 5003 http

每行均代表一个数据包。它包括网络接口、来源、目标 IP 和端口、协议。数据包存储在 packet_trace_t 结构中:

typedef struct {
	nic_t nic;			// network interface where packet arrived
	ip_addr_t destIp;		// destination IP
	ip_addr_t srcIp;		// source IP
	port_t destPort;		// destination port
	port_t srcPort;		// source port 
	protocol_t protocol;	// protocol type (rtp, ftp, http, sip, etc)
	int priority;			// packet priority
} packet_trace_t;


NAT 表和 IP 配置表存储在 tbb::concurrent_hash_map 之中。数据块存储在 std::vector 中,数据块队列的类型为 tbb::concurrent_queue:
 

typedef concurrent_hash_map<port_t, address*, string_comparator> nat_table_t; 
typedef concurrent_hash_map<ip_addr_t, nic_t, string_comparator> ip_config_t; 
typedef vector<packet_trace_t> packet_chunk_t;
concurrent_queue<packet_chunk_t> chunk_queue;


输入文件的读取由执行 input_function 的单独线程完成。input_function 打开并读取文件。读取由传输到数据块队列的数据块来执行。TBB 容器可确保线程的安全,因此主线程可以同时从数据块队列读取,无需手动执行额外的同步化操作。输入线程函数:
 

void input_function(){	
    ifstream in_file (in_file_name);
    if (!in_file) {
        cerr << "Cannot open input file " << in_file_name << "n";
        exit (1);
    }
	stop_flag = false;	
	
	while(in_file.good()){			
		packet_chunk_t packet_chunk(chunk_size);
								
		for(int i=0; i<chunk_size; i++){
			packet_trace_t packet;
			in_file >> packet;					
			packet_chunk[i] = packet;			
		}
		chunk_queue.push(packet_chunk);			
	}
	stop_flag = true;
}

 

性能测量

该项目的目标是通过使用 TBB 实现出色的性能和可扩展性。使用下列设置进行测量:

处理器:4 路英特尔® 至强® X7460 处理器,2,66 Ghz,共 24 个物理核心
RAM: 16 GB
操作系统:Microsoft Windows Server® Enterprise 2008 SP2
工作负载:输入文件:113405 个数据包 (5,1 MB)
测量工具:Intel® VTuneTM Amplifier XE 2011
分析类型:采用默认设置的并发性

共执行两项测试:面向串行和并行版本。以下为这两项分析的总结。左侧为串行版本,右侧为 TBB 版本:
 

 

从上表可以看出,CPU 时间十分接近。这是系统所有核心的CPU 时间的总和。但所用时间相差较大。这是应用执行处理所用的时钟时间。在串行版本中,该值接近总 CPU 时间。TBB 版本中则减少 19 倍。因此应用的处理速度快 19 倍。

面向串行版本的 CPU 使用
 


面向 TBB 版本的 CPU 使用
 


 

TBB 版本中,平均使用的核心数量为 20.5,在大多数处理时间内,所有 24 个核心均得到利用。这表明该应用具备足够的扩展能力,能够使用多核系统上的几乎所有核心。

串行应用的由上而下视图显示,计算模块的几乎所有时间都用在了模拟真实的工作负载:
 



TBB 版本中的情况也十分相似,主要热点是相同的 compute_t::do_work 方法。但是,其大部分显示的是绿色,这说明处理器的利用率良好。此外,表中还列出了使用 TBB 结构所获得的更多功能:
 


 

结果显示,基于 TBB 的应用取得了出色的性能结果。请牢记下列条件:

1) 可使用 Amplifier XE API 函数 __itt_resume() 和 __itt_pause() 来限制测量区域。结果显示了 TBB 版本的 tbb::parallel_pipeline 性能,以及串行版本的 while 循环。总体应用的测量结果可能稍有不同。

2) 通过模拟任务来使用处理器。compute_t 类计算“N queens”任务的算法。实际处理情况有所不同。如果 CPU 没有足够的任务,文件输入将会花费相对更多的时间。因此,实际的应用可扩展性和性能结果可能更糟。

结论

该示例项目显示了在构建网络数据包处理应用中使用 TBB 的可能性,以及 tbb::管线的适用性。这些方法可应用于 IP 路由交换机、电信服务器(VoIP 电话、视频会议)、各种网关和协议等。很少加载的应用网络软件可从多线程能力获益。而且,借助英特尔® 线程构建模块,用户可以轻松、高效地在应用中管理并行性。

 
完整的项目源代码:
Para obtener información más completa sobre las optimizaciones del compilador, consulte nuestro Aviso de optimización.