在上一篇文章(TBB:pipeline,软件流水线的威力)最后提出了几个问题,我们逐个来看看TBB::Pipeline是怎么解决的。
为什么Pipeline可以保证数据执行的顺序?既然TBB归根到底是通过多线程执行任务,为什么不会在读入先后两个字符串后,后读入的字符串先被下一个task处理?Pipeline里是不是有一个类似于FIFO 先进先出队列之类的东西?
之前曾经质疑过Pipeline的性能,甚至想自己用MultiThreading来模拟一个流水线,但很快就发现其中实现的难点。数据执行的顺序性就是其中之一。
假设以一个thread代表流水线上的一个节点,如果某节点是并发执行的,那么就需要2个以上的thread(A和B),上一节点处理完毕的顺序数据到底是先送给A还是B呢?处理完毕后后又该先将A还是B中的数据送到下一节点呢?即使可以人为的指定A和B之间的优先规则,由于thread本身被调度的不确定性,实际运行中还是有很多不可预知的困难。
流水线的一个显著特性就是保证每个数据均以相同的顺序流过每个节点。因此,TBB::Pipeline中的一个首要任务就是在节点被并发执行的同时,仍能够保证所处理的数据的次序而不需额外的处理代码。此外,在要求串行处理的节点,要保证即使排在前面的数据先被处理,即使排在后面的数据先到达。
Pipeline的中心思想就是以token来控制数据的处理顺序和流水线的深度。Pipeline::run函数中指定了token的最大值:
void pipeline::run( size_t max_number_of_live_tokens ) {}
每一个数据在进入Pipeline的时候都会按照先后顺序依次分配一个token,如line1处:
task* stage_task::execute() {
__TBB_ASSERT( !my_at_start || !my_object, NULL );
if( my_at_start ) {
if( my_filter->is_serial() ) {
if( (my_object = (*my_filter)(my_object)) ) {
my_token = my_pipeline.token_counter++; //line1
my_token_ready = true;
ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
if( --my_pipeline.input_tokens>0 )
spawn( *new( allocate_additional_child_of(*my_pipeline.end_counter) ) stage_task( my_pipeline ) );
} else {
my_pipeline.end_of_input = true; //line2
return NULL;
}
...
}
如果当前流水线中的token全部用完了,那么暂时就不会处理新的数据,直到已进入Pipeline的数据被处理完毕有空闲的token(line2处)
仍然以TBB中的例子text_filter为例考虑,流水线为 MyInputFilter->MyTransformFilter->MyOutputFiler,MyInputFilter从磁盘上读取数据,MyTransformFilter转换成大写字母,MyOutputFilter将转换好的数据写入磁盘。因此,MyInputFilter节点和MyOutputFiler节点必须是串行执行,而MyTransformFilter可以并发执行。对于MyInputFilter读入的一串顺序数据,token依次为1->2->3,如何保证经过转换后数据也是以相同的顺序写入磁盘?
秘密在于TBB中的一个类tbb::internal::ordered_buffer,MyOutputFilter用它来保证按照token的顺序执行其队列中的数据,而不管数据进入队列的先后次序,换句话说,即使排在后面的数据token 2先被某个MyTransformFilter节点处理完毕送往MyOutputFilter,只要数据token 1没到达没被MyOutputFilter执行,数据2就不会在数据1之前先写入磁盘。每一个需要被串行处理的节点,都会有一个ordered_buffer类型的成员变量。
先看看ordered_buffer的定义:
//! A buffer of ordered items.
/** Each item is a task, inserted into a position in the buffer corrsponding to a Token. */
class ordered_buffer {
typedef Token size_type;
//! Array of deferred tasks that cannot yet start executing.
/** Element is NULL if unused. */
task** array; //数组,以顺序方式保存所有待处理的task
//! Size of array
/** Always 0 or a power of 2 */
size_type array_size; //数组的尺寸
//! Lowest token that can start executing.
/** All prior Token have already been seen. */
Token low_token; //当前正在处理的token,
//! Serializes updates.
spin_mutex array_mutex; //用于保护array并发访问的锁
};
仍然是在task* stage_task::execute() {
...
if( ordered_buffer* input_buffer = my_filter->input_buffer ) {
// The next filter must execute tokens in order.
stage_task& clone = *new( allocate_continuation() ) stage_task( my_pipeline, my_filter );
clone.my_token = my_token; //token号
clone.my_token_ready = my_token_ready;
clone.my_object = my_object; //数据
next = input_buffer->put_token(clone);//将task放入队列
} else {
/* A semi-hackish way to reexecute the same task object immediately without spawning.
recycle_as_continuation marks the task for future execution,
and then 'this' pointer is returned to bypass spawning. */
recycle_as_continuation();
next = this;
}
} else {
...
}
对于需要被串行处理的节点,使用ordered_buffer的put_token函数将相关的数据和task引用放入队列。put_token的实现是关键:
template<typename StageTask>
task* put_token( StageTask& putter ) {
task* result = &putter;
{
spin_mutex::scoped_lock lock( array_mutex );
Token token = putter.next_token_number();
if( token!=low_token ) {
// Trying to put token that is beyond low_token.
// Need to wait until low_token catches up before dispatching.
result = NULL;
__TBB_ASSERT( (tokendiff_t)(token-low_token)>0, NULL );
if( token-low_token>=array_size )
grow( token-low_token+1 );
ITT_NOTIFY( sync_releasing, this );
array[token&array_size-1] = &putter;
}
}
return result;
}
这个函数的实质是,首先取得下一个要处理的token,然后把待执行的task放到ordered_buffer的任务队列中的"合适位置",而low_token指向当前需要处理的token编号。
例如low_token=0,当前需要处理0号token,下一个token为1,因此task保存在array[1]处并处于阻塞状态,待0号token处理完毕后,low_token增加1,再从array数组中取出1号token对应的task进行处理。
Pipeline中是这样通知串行节点以处理好一条数据的:
还是在task* stage_task::execute() {
...
if( ordered_buffer* input_buffer = my_filter->input_buffer )
input_buffer->note_done(my_token,*this);
...
}
看看note_done的实现会有一种大彻大悟的感觉!如果刚完成的token就是次序最优先的token(low_token),那取出下一个要执行的task,以spawn的方式让TBB的task scheduler来调度:
//! Note that processing of a token is finished.
/** Fires up processing of the next token, if processing was deferred. */
void note_done( Token token, task& spawner ) {
task* wakee=NULL;
{
spin_mutex::scoped_lock lock( array_mutex );
if( token==low_token ) {
// Wake the next task
task*& item = array[++low_token & array_size-1];
ITT_NOTIFY( sync_acquired, this );
wakee = item;
item = NULL;
}
}
if( wakee ) {
spawner.spawn(*wakee);
}
}
ordered_buffer是一个非常有趣的实现,相比于常见的用FIFO queue来实现线程间的数据传递,ordered_buffer可谓精巧。我们可以好好利用ordered_buffer的原理来进一步改进我们的代码。