loop_in_codes

低调做技术__欢迎移步我的独立博客 codemaro.com 微博 kevinlynx

建立异步操作组件:队列和线程

6.25.2008

Kevin Lynx

引言

在一个高效的系统中,我们经常会将一些费时的操作转换为异步操作。例如往数据库中写日志。如果数据库
配置在网络上,那么往数据库中插入一些日志信息将非常慢(相对于程序其他部分)。

如何转换为异步?

将类似于以上过程转换为异步操作,一个典型的做法是:建立一个单独的数据库日志线程,一个线程安全的
队列。要写日志时,只需要往队列里放入数据,数据库日志线程则从这个队列里取数据然后完成写操作。

大致的过程类似于:

typedef safe_list<std::string> SafeList;
SafeList gLogList; 

// database thread.read log string.
unsigned int __stdcall DBThread( void *p )
{
    
whiletrue )
    
{
        
// read gLogList and write log string into the database
    }
 

    _endthreadex( 
0 );
    
return 0;
}
 

// other threads. write log string.
void wrtie_log( const std::string &log )
{
    gLogList.push_back( log );
}
 


将他们包装起来

我们很有可能会在同一个系统中多次遇到类似需要转换为异步操作的地方,如果每一次都手动去创建一个队列和一
个线程,那将会多么乏味啊!懒惰的程序员喜欢重用各种代码。所以,我自己觉得很有必要将这一切封装起来。我
们只需要封装这个队列和创建线程的繁琐细节,让应用层全部专注于具体的逻辑处理:

template <typename _NodeType>
class async_operator
{
public:
    
/// the list node type.
    typedef _NodeType node_type;
    
/// the list.
    typedef multi_list<node_type, Mutex, Semaphore> list_type;
    
/// operator signature
    typedef functor<void, TYPE_LIST1( list_type& )> operator_type;
    
/// init function signature, called when the thread starts.
    typedef functor<void> init_type;
    
/// called before the thread exits.
    typedef functor<void> release_type; 

public:
    
///
    
/// start the thread and execute the operation.
    
/// @param op the callback function operate the list node.
    
/// 

    void execute( operator_type &op, init_type &init = init_type(), release_type &release = release_type() ); 

    
///
    
/// exit the thread.It will block until the thread exited.
    
///

    void exit(); 

    
///
    
/// get the list so that you can pust list nodes.
    
///

    list_type &list(); 

private:
    list_type _list;
    operator_type _operator;
    init_type _init;
    release_type _release;
    thread _thread;
}



我利用了已有的组件:线程安全的容器multi_list、包装任意执行体的functor、线程维护类thread。那么,现在,
应用层只需要定义队列节点类型,写应用相关的回调函数(任意可被functor包装的广义函数)。(见附件例子)

之所以为这个组件加上init和release,是因为有些东西(例如COM)需要在线程启动时初始化,而在线程快结束时释放,例如对于
使用COM的应用来说,就需要在线程初始化时CoInitialize,结束时CoUninitialize。

闲说下其他东西

在本文的附件代码里,你可以获取到functor、thread、multi_list这些东西,所以我有必要提一下。

关于functor,你可以参看<实现functor - 增强型的函数指针>,基本上可以看成增强版的C回调函数;至于multi_list,基本上
是一个container adapter (套用下STL的概念),使用条件变量参与线程同步,据说效率要比简单的互斥高点;至于thread,我需要
特别说下:

thread最为重要的就是为其附加了一个windows的消息队列(只要调用PeekMessage之类的函数该队列就存在),本意是可以让其他线
程传送数据到该线程,但是目前只用于线程退出,即其他线程可以在任何时候要求该线程安全地退出(该线程没有阻塞的情况下,
阻塞时获取不到消息)。我不知道这个安全退出策略是否真的有必要存在,但是我讨厌看到各种撇脚的退出方法(例如设置全局标志
变量,增加额外的--没封装前---event对象之类)。

结束

不知道其他人是如何做这种异步转换操作的,在这里我只是起个抛砖引玉的作用,欢迎大家提出意见。

 

例子下载

posted on 2008-06-25 15:47 Kevin Lynx 阅读(5035) 评论(29)  编辑 收藏 引用 所属分类: 通用编程

评论

# re: 建立异步操作组件:队列和线程 2008-06-25 16:13 true

既然允许阻塞时不能退出,为什么不能设全局变量呢?他更及时,不用参与排队,当前日志写完后,就可以退出了,当然你要考虑日志是否要保存。  回复  更多评论   

# re: 建立异步操作组件:队列和线程[未登录] 2008-06-25 16:22 Kevin Lynx

@true
貌似你误解我意思了;)

’阻塞时不能退出‘:
例如线程里 WaitFor...INFINITE了,那么它就无法处理消息,也无法判断退出标志变量,代码卡在这里了。

’他更及时,不用参与排队‘:
这个跟上面不是说的同一个问题,队列是用于实际业务处理,例如写日志,上面那个说的是线程。

  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-06-25 16:39 true

一般这么搞,日志线程退出的及时:WaitFor。。。设个timeout,返回后,一是检查是否超时,二检查全局变量。除非在代码结束阶段,否则还是少用infinite。这种用法,用个模式的术语就是Active Object(感觉这个概念提出的很强,思路清晰)  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-06-25 18:49 饭中淹

撇脚的退出方法,其实没什么,主要是个人喜恶。
我喜欢用标记,可以在任何平台下使用都没问题。


另外,我不明白为什么用了condition还要用GUARD
那样不是死锁了么?
BLOCK等待condition的时候,container被GUARD锁死在一个线程内,其他线程调用PUSH的时候都会被挡在GUARD那里。


  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-06-25 21:42 Kevin Lynx

@饭中淹
不会的,condition和guard使用的都是同一个mutex。guard用于一般的同步,condition用于队列元素为0时的临界条件。  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-06-25 23:08 影视剧

“如果数据库配置在网络上,那么往数据库中插入一些日志信息将非常慢”,没错,有时还会发生网络错。  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-06-26 01:52 饭中淹

@Kevin Lynx
是啊,当元素为0,又正好BLOCK=TRUE的时候,就会在POP里面死循环了。
因为POP里的GUARD锁住了MUTEX,PUSH的GUARD就会等在GUARD的MUTEX那里。这样的话,POP就永远等不到CONDITION的SIGNAL。  回复  更多评论   

# re: 建立异步操作组件:队列和线程[未登录] 2008-06-26 09:08 raof01

正准备写一篇类似的东西。被你抢先了……
还有没有必要写呢?  回复  更多评论   

# re: 建立异步操作组件:队列和线程[未登录] 2008-06-26 09:14 Kevin Lynx

@饭中淹
前面我说了,condition和guard使用的是同一个mutex,在condition的wait里,会先_external_mutex.release();,然后push的时候,就不会阻塞在guard那里,于是condition.signal(元素为0时,看下代码),然后pop里的condition.wait就过了。  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-06-26 10:30 空明流转

最近准备把我的软件渲染器搞成异步的,思路也差不多和LZ一样。  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-06-26 12:39 饭中淹

@Kevin Lynx
这样会造成很多误解啊。。。。
WAIT完成,再去LOCK一下MUTEX么?  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-06-30 19:40 Wealth

@true
同意这种做法  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-07-02 09:27 yt

我倒是有个想法,可以考虑使用一个"特殊的日志消息",根据消息内容得知是退出消息  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-07-02 19:45 ojoj

例如线程里 WaitFor...INFINITE了,那么它就无法处理消息,也无法判断退出标志变量,代码卡在这里了。

我是这样判断,设置个计数器,可以是全局/局部,在线程执行体里不断自加,主进程隔一段时间去比较这个值是否增大。如果没有,则判断线程已经死掉,这样就可以避免许多其他的异常而没有捕捉到的情况。  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-08-02 15:22 francis

原文如下:
void push_back( const value_type &data )
{
guard<mutex_type> g( _mutex );

if( _container.size() == 0 )
{
_condition.signal();
}

_container.push_back( data );
}
不知道此处这样写何解?
我觉得是不是应该写成这样:
void push_back( const value_type &data )
{
guard<mutex_type> g( _mutex );

_container.push_back( data );
_condition.signal();
}
?  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-08-03 10:40 Kevin Lynx

@francis
你可以查找‘条件变量’(condition variable)的相关资料。
signal只在队列尺寸为0时的push操作中调用,因为在pop中会在队列尺寸为0时wait.  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-08-04 10:17 francis

@Kevin Lynx “signal只在队列尺寸为0时的push操作中调用”没必要这样吧?只要push之后signal,信号量加1, pop时wait信号量减1,为0时wait就可以了  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-08-04 12:55 Kevin Lynx

@francis
关键在于,signal和wait会对外部mutex做操作。参见kl_condition.h相关代码。  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-08-04 15:32 francis

@Kevin Lynx signal并没有对mutex操作,
void push_back( const value_type &data )
{
guard<mutex_type> g( _mutex );

_container.push_back( data );
_condition.signal();
}
这样写是不是好些?
  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-08-04 15:33 francis

if( _container.size() == 0 )
{
_condition.signal();
}
这么写没有必要  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-08-04 16:43 Kevin Lynx

@francis
我们看一次常规的同步操作:
push_front()
{
guard<mutex_type> g(_mutex );
...
}

pop_front()
{
guadr<mutex_type> g(_mutex );
...
}

那么,在消费者线程里可能会不断地去检查队列大小是否为0(size操作同样会涉及到同步),
这浪费了CPU资源。而如果:
pop_front()
{
guadr<mutex_type> g(_mutex );
while( size == 0 ) _cond.wait();
}
当大小为0时,wait操作将阻塞此线程,从而让出了CPU资源(wait会阻塞)。

另一方面,如果每次push操作都进行条件变量的signal,这个所谓的原语操作开销有多大?
查看condition::signal/wait代码,其内部还存在一个_wait_count的同步。另外,如果
每次push都signal的话,那么pop操作也需要进行wait,这样以来它其实已经不是条件变量,
而是信号量了。

我们所要的效果,就是在队列元素为0时,进行pop操作的时候让其阻塞而已。


  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-08-04 19:53 francis

@Kevin Lynx 注意看,想想当有线程在wait,同时container不为空时的情况吧,明明container里有数据,但是wait的线程却取不出来。。。 这样看来貌似是存在逻辑错误的。  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-08-04 22:28 Kevin Lynx

@francis
while( _container.size() == 0 ) _condition.wait();
仅当为空的情况下才wait的。代码在逻辑上不存在大问题,因为这个基础部件已经被用于实际项目(用于保存验证服务器端的验证帐号)。
  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-08-05 01:09 francis

@Kevin Lynx
while( _container.size() == 0 ) _condition.wait();
并不能保证“有线程在wait,同时container不为空”,
是可以模拟出现的。这样的话,逻辑就出问题了。  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-08-05 09:00 Kevin Lynx

@francis
详述下这种会出现问题的情况。如果可以模拟出现的话,麻烦基于multi_list这个类制造这种情况。  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-08-05 20:54 francis

:-) 不是太好找,应该是没什么问题的。
下面这个情况有太多假设,:-)

有2线程在等待,
thread1.pop ->wait
thread2.pop ->wait

然后下面运行
thread3.push
thread3.push
thread3.push
。。。

thread3 push之后会激活一个线程,假设thread1被激活,假设这个时候线程没有切换,thread3继续push之后才切换thread1, 这个时候, container被push了两次,thread1这时取出一个数据处理,处理完之后切换线程,thread2还是继续等待,thread3继续push,这个时候假设又push了2个,
thread1由于没有wait可以不停的取数据,但是thread2就惨了,不是没有数据可处理,而是它只能wait了。
  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-08-06 09:07 Kevin Lynx

@francis
我读了几遍你的这次回复,希望我没有误解你的意思:
thread1被激活,取数据,就可能导致container的size为0,一旦size为0,thread3继续push的时候就可能会激活thread2。如果OS一直没有调度到thread2,那么,container就可能经历过多次size为0,size不为0,也就是thread1和thread3发生多次数据交互。thread2看起来是惨了,因为container曾经有数据,但是thread2却没取到?为什么呢?thread2根本没被OS调度到,从没有获取到CPU控制权,它又有什么理由不wait?

归根结底,你说的这个问题,只是因为thread2没有被调度到而已。
  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-08-06 12:55 francis

我说的就是会出现“container不空,但是有wait的线程”这种情况,这里就是thread2在wait.如果有更多的线程做push, thread3本身也在不停的push,这个时候,处理的线程只有thread1,thread2只能wait, 因为container一直不空,永远没法发出激活信息。当然这样不会出现什么错误,只是可能会和实际想的功能不符合,本意是想两个线程处理container里的内容,但是实际只有thread1在处理,thread2一直wait.  回复  更多评论   

# re: 建立异步操作组件:队列和线程 2008-08-06 13:11 Kevin Lynx

@francis
thread2没有获得CPU权!thread2没有被调度!
这里的用法完全是条件变量的标准用法,如果你硬是要从理论上认为它有问题,那你可以去查条件变量相关资料。这里的结构和条件变量通用用法一样。你甚至可以找本操作系统书,在上面找到条件变量的使用结构。你想推翻这一切?建议你大量查阅条件变量相关资料。如果你还认为有问题,你应该去对发明条件变量的某个可能已经死掉的牛人说:你这个条件变量的结构有问题。  回复  更多评论   


只有注册用户登录后才能发表评论。
网站导航: 博客园   IT新闻   BlogJava   博问   Chat2DB   管理