Threads(线程)
libjingle 考虑到使用到此库的应用程序的性能,libjingle内部支持多线程。其内组件使用1或2个全局线程:
● signaling thread 被用作创建底层(基础)组件,
例如:Session Management,Control,XMPP Messaging组件。
● worker thread ( 有时称作channel thread)用来集中处理p2p组件中的对象提交过来的大量资源,例如:数据流。之所以这样用另外的线程单独处理,是为了避免数据流阻塞或被XMPP/用户界面组件阻塞。使用 worker thread的类包括ChannelManage,SocketMonitor,P2PTransportChannel 和 属于Port类的对象。
若起用worker thread,使之工作,在应用中必须创建一个Thread类对象,并把此对象当作SessionManager的构造函数的参数。(如果SessionManager类对象在创建时,没有传递给它Thread对象,则SessionManager类将在内部创建一个线程,当作worker thread)。CallClient::InitPhone示范了如何为底层组件(low-level components)创建一个worker thread方法。
另外、libjingle提供了一个基类SignalThread。扩展此类可以让一个扩展类对象存在于它自身代表的线程,此扩展类对象可以被实例化,启动,单独离开,结束时自释放。更多信息请查看signalthread.h/cc。
注意:尽管libjingle支持多线程,但是只有几个函数通过呼叫方线程的验证来支持线程安全,并且极少函数做了线程锁定。下面的片断示范了在函数中如何安全地呼叫线程(或线程安全地被呼叫):
// Check that being called from the channel (e.g., worker) thread.
ASSERT(talk_base::Thread::Current() == channel_thread_);
channel_thread_->Clear(this);
libjingle中用到的所有线程,signaling thread,worker thread,其它的一些线程,都是talk_base::Thread的对象(或子类的对象)。所有的Thread对象都被ThreadManager管理,当被请求时,ThreadManager会返回这些Thread对象。SessionManager被创建时通过调用ThreadManager::CurrentThread得到一个signal thread(当无worker thread 传递给SessionManager构造函数时,同时得到一个work thread)。XmppPump类把当前线程当作它的signal thread来用(XmppPump uses the current thread for its signaling thread)。所以,应用程序必须为signal thread创建一个Thread对象(或其子类对象),并在SessionManager对象创建之前或在XmppPump工作之前,把此对象放进ThreadManager的线程池里。(Signing In to a Server(登录服务器) 有示例)有两种方法创建一个Thread对象:
AutoThread 这种方式就是libjingle用Thread对象包装一个操作系统中的线程,并把它当作ThreadManager线程池里的当前线程(当然,Thread::CurrentThread()被调用时,此线程会被提取出来)。
Thread 这种方式将创建一个新线程并用Thread类包装,比较典型就是的创建worker thread。使此线程发生作用,应用程序必须新创建一个Thread对象,调用ThreadManager::Add()或ThreadManager::SetCurrent()把它丢进线程池里,并且调用Run()使之在阻塞状态下运行或调用Start()使之处于监听状态。
线程为对象间或对象内部的消息沟通提供了“管道”()。例如:SocketManager可以通过其它线程向自己发送销毁一个套接字的消息,或当链接候选被产生时向SessionManager发送消息。Thread继承自MessageQueue,所以Thread的对象具有了Send,Post,和一些同步或异步发送消息的函数。如果要使一个对象能够接收到MessageQueue送出的消息,那么此对象必须继承和实现MessageHandler。MessageHandler定义了一个OnMessage函数,此函数在MessageQueue送出消息时被调用,用来接收MessageQueue送出的消息。
你可以通过任何线程向继承自talk_base::MessageHandler的任何对象发送消息。尽管能够做到,如果你发出的消息是为了集中处理大量的数据,应用程序应该通过worker thread。调用SessionManager::worker_thread()可以得到worker thread的句柄。
调用Session::Manager::signaling_thread()可以得到 signaling thrread的句柄。
对象使用一个指定的线程有如下几种方式:
对象要求一个线程指针作输入参数,并储存这个指针。
对象在创建时取得当前线程(构造函数中调用ThreadManager::CurrentThread()取得),把取得的线程存进对象内部成员变量引用它,一般应用于获取特定的线程。(it can assume that the current thread when it is created (accessed byThreadManager::CurrentThread in its constructor) is a particular thread and cache a member pointer to it)
对象调用SessionManger::signal_thread() 或 SessionManager::worker_thread()获取线程。
以上三种方法,libjingle均有用到。
因为一个对象可以被任意线程使用,对象可能需要验证当前调用是来自哪个线程的方法。应用可以调用Thread::Current()得到当前线程的句柄,然后与对象内部保存线程的数据成员进行比较,此数据成员的值可以是从SessionManager中暴露在外面的线程,或是对象在创建时通过构造函数传进去的初始化值。
这是一个对象通过其它线程调用自身函数时而广范使用的范例:
// Note that worker_thread_ is not initialized until someone
// calls PseudoTcpChannel::Connect
// Also note that this method *is* thread-safe.
bool PseudoTcpChannel::Connect(const std::string& channel_name) {
ASSERT(signal_thread_->IsCurrent());
CritScope lock(&cs_);
if (channel_)
return false;
ASSERT(session_ != NULL);
worker_thread_ = session_->session_manager()->worker_thread();
...
}
void PseudoTcpChannel::SomeFunction(){
...
// Post a message to yourself over the worker thread.
worker_thread_->Post(this, MSG_PING); // <- Goes in here....
...
}
// Handle queued requests.
void PseudoTcpChannel::OnMessage(Message *pmsg) {
if (pmsg->message_id == MSG_SORT)
OnSort();
else if (pmsg->message_id == MSG_PING) // -> And comes out here!
// Check that we're in the worker thread before proceding.
ASSERT(worker_thread_->IsCurrent());
OnPing();
else if (pmsg->message_id == MSG_ALLOCATE)
OnAllocate();
else
assert(false);
}