返回信息流第四节:事件对象(Event Objects)
本节介绍如下内容:
1. 同步与异步
2. 为何需要同步
3. 什么是事件对象(Event Object)
4. 事件对象类接口定义
5. 示例程序
6. 事件对象类的UNIX和Windows实现
同步(Synchronization)与异步(Asynchronization)
首先对同步与异步的概念做一个简单的说明。
当程序1调用程序2时,程序1停下不动,直到程序2完成回到程序1来,程序1才继续下去,
这就是所谓的同步。如果程序1调用程序2后,径自继续自己的下一个动作,那么两者之间
就是所谓的异步。
举个例子,在WIN32 API中,SendMessage()就是同步行为,而PostMessage()就是异步行
为。在Windows系统中,PostMessage()是把消息放到对方的消息队列中,然后回到原调用
点继续执行,所以这就是异步(asynchronous)行为。而SendMessage()根本就像是“直
接调用窗口的窗口函数”,直到该窗口函数结束,然后才回到原调用点,所以它是同步(
synchronous)行为。
为何需要同步
撰写多线程程序的一个最具挑战性的问题就是:如何让一个线程和另一个线程合作。除非
你让他们同心协力,否则必然会出现如第三节所说的竞争条件(race condition)和数据
被破坏(data corruption)的情况。
当多个线程共享同一内存区域的时候,我们需要确保每一个线程所看到的数据的一致性。
假如对于每一个线程所使用的变量来说,其它任何线程都不会读取或使用该变量,那么根
本不存在数据一致性的问题。同样地,对于一个有着只读属性的变量来说,多个线程同时
读取它的值的话,也不会有数据一致性的问题存在。然而,当一个线程可以修改一个变量
,同时其它线程也能够读取或修改该变量的话,我们就需要同步这些线程,以确保每一个
线程在访问该变量的内存内容时所用到的值是有效的。
举个例子,假设有一块未初始化的内存块和两个线程,一个读线程,一个写线程。我们应
该保证读线程在读取该内存块时,它已经被写线程初始化好了,否则读线程只能读到一块
未初始化完成的无效数据。这就需要用到线程的同步机制(synchronous mechanism)。
线程间的协调工作是由同步机制来完成的。同步机制相当于线程之间的红绿灯。程序员可
以设计让一组线程使用同一个红绿灯系统。这个红绿灯系统负责给某个线程绿灯而给其他
线程红灯。这一组红绿灯系统必须确保每一个线程都有机会获得绿灯。
有好多种同步机制可以运用。使用哪一种完全视欲解决的问题而定。这些同步机制常常以
各种方式组合在一起,以产生出更精密的机制。
什么是事件对象(Event Object)
事件对象(Event Object)是一种最具弹性的同步机制,它的唯一目的就是成为激发(
Signaled)状态或未激发(Unsignaled)状态。这两种状态完全由程序控制。
我们通过上面介绍的读写线程的例子来说明事件对象的激发状态和未激发状态的含义。读
线程和写线程拥有同一个事件对象。该事件对象的初始状态为非激发状态。当读线程需要
读共享的那块内存时,它需要判断该事件对象的状态。如果该事件对象处于非激发状态,
则读线程等待,直到该事件对象处于激发状态为止。写线程会在那块共享的内存被初始化
好之后将该事件对象的状态设为激发状态。这时读线程得知了该事件对象的状态已经由非
激发状态变为激发状态,于是它开始读取那块共享的内存,并执行后续的操作。
事件对象之所以有大用途,正是因为它们的状态完全在程序员的掌控之下。因此,程序员
可以精确的告诉一个事件对象做什么事,以及什么时候去做。
事件对象可以分为自动重置的事件对象(Automatic-Reset Event Object)和手动重置的
事件对象(Manual-Reset Event Object)。自动重置的事件对象会在事件对象变成激发
状态(因而唤醒一个线程)之后,自动重置为非激发状态。而手动重置的事件对象,不会
自动重置,必须靠程序操作才能将激发状态的事件对象重置为非激发状态。
事件对象所能完成的一切功能都可以通过互斥来完成。下面我们通过比较使用事件对象来
实现读写线程的例子和使用互斥来实现读写线程的例子,以说明事件对象的作用和它存在
的必要性。
例一:使用事件对象来实现读写线程
void threadRead(事件对象类型 *事件对象)
{
阻塞事件对象;
读取共享内存的数据;
}
void threadWrite(事件对象类型 *事件对象)
{
将适当的数据写入共享内存;
激发事件对象;
}
例二:使用互斥来实现读写线程
bool globalIsWritten = false;
void threadRead(通行证类型 *通行证)
{
获取通行证;
while (!globalIsWritten)
{
归还通行证;
sleep(sometime);
获取通行证;
}
归还通行证;
读取共享内存的数据;
}
void threadWrite(通行证类型 *通行证)
{
将适当的数据写入共享内存;
获取通行证;
globalIsWritten = true;
归还通行证;
}
很明显,使用事件对象来实现读写线程的代码要比使用互斥来实现读写线程的代码优雅许
多。使用事件对象来实现读写线程的代码显得更加干净整洁,而且可读性更高。使用互斥
来实现读写线程时,在读线程中,需要轮询地互斥访问读写线程间的共享变量
globalIsWritten,因此其效率一定不如使用事件对象来实现读写线程的效率高。我将后
面的“手动重置的事件对象”的示例程序改为完全使用互斥来实现后,发现其运行时间是
使用事件对象来实现的1.21倍。这个测试结果和我们的预期相一致。
因此,对于类似于读写线程这样的例子,事件对象相对于互斥提供了更加优雅和高效的解
决方案。
事件对象类接口定义
文件event.h
#ifndef __EVENT_H__
#define __EVENT_H__
#include <windows.h>
class Event
{
public:
Event(bool bManualUnsignal, bool bSignaled);
virtual ~Event();
virtual bool block();
virtual bool signal();
virtual bool unsignal();
private:
// 依赖于具体实现,后面再说。
};
#endif
其中,
Event::Event(bool bManualUnsignal, bool bSignaled),事件对象类的构造函数。
bManualUnsignal用于指定事件对象的类型。如果其值为true,则该事件对象是手动重置
的事件对象;如果其值为false,则该事件对象是自动重置的事件对象。bSignaled用于指
定事件对象的初始状态。如果其值为true,则该事件对象的初始状态为激发状态;如果其
值为false,则该事件对象的初始状态为非激发状态。
Event::~Event(),事件对象类的析构函数。用于摧毁事件对象。
Event::block(),根据事件对象的状态,对拥有该事件对象的线程进行控制。如果事件对
象处于非激发状态,则拥有该事件对象的线程开始等待,直到该事件对象的状态变为激发
状态。如果事件对象处于激发状态或者当事件对象的状态由非激发状态变为激发状态的时
候,首先判断该事件对象是那种类型的,如果该事件对象是自动重置的,那么需要将该事
件对象的状态设为非激发状态,然后唤醒等待该事件对象的线程。
Event::signal(),将事件对象的状态设为激发状态。如果事件对象是手动重置的事件对
象,那么该事件对象会一直保持激发状态,直到Event::unsignal()被调用,该事件对象
才会由激发状态变为非激发状态。在手动设置的事件对象保持激发状态的时候,所有等待
该事件对象的线程都将被唤醒。如果事件对象是自动重置的事件对象,那么该事件对象会
一直保持激发状态,直到一个等待该事件对象的线程被唤醒,这时该事件对象会由激发状
态变为非激发状态(由Event::block()来完成)。
Event::unsignal(),将事件对象的状态设为非激发状态。该方法主要用于手动重置的事
件对象,它必须显式地调用该方法以使得自己的状态变为非激发状态。而对于自动重置的
事件对象来说,当一个等待线程被唤醒时,它会自动地将自己的状态由激发状态变为非激
发状态。
在Windows操作系统中,还有一种对事件对象的操作,叫做PulseEvent()。在我们的事件
对象模型中并没有引入该接口,因为PulseEvent()是一个不稳定的操作。Windows只是为
了向后兼容才保留了PulseEvent()。
下面对PulseEvent()函数做一个简单的介绍,并且说明为什么该操作不稳定。
如果一个事件对象是手动重置的,那么对该事件对象进行PulseEvent()操作后,该事件对
象会被设为激发状态,所有的等待该事件对象的线程都会被唤醒,之后该事件对象恢复为
非激发状态。如果一个事件对象是自动重置的,那么对该事件对象进行PulseEvent()操作
后,该事件对象会被设为激发状态,一个等待该事件对象的线程会被唤醒,之后该事件对
象恢复为非激发状态。
注意,如果没有任何线程在等待事件对象(不管是手动重置的还是自动重置的),或者没
有任何线程可以立即被唤醒的话,对该事件对象进行PulseEvent()操作后,唯一的结果是
该事件对象的状态被设置为非激发状态。在这种情况下,这个事件对象会被遗失。这时,
可能会引起死锁。
举个例子,假设一个程序由两个线程(线程A和线程B)组成。线程A累加一个计数器,之
后调用Event::block()等待一个事件对象。如果在这两个操作之间发生了上下文切换(
context switch),线程B开始执行,它检查计数器内容然后对着同一个事件对象进行
PulseEvent()操作。这时候这个要求苏醒的请求会被遗失掉。而线程A会因为它等待的事
件对象永远不会被设置为激发状态而永远等待下去,程序进入死锁状态。这时,线程A被
称作饥饿线程。
因此,PulseEvent()是一个不稳定的操作,在我们的事件对象模型中将不包括该操作。
示例程序
自动重置的事件对象
文件common.h
#ifndef __COMMON_H__
#define __COMMON_H__
struct Param
{
long threadID;
int *count;
};
const int TCOUNT = 10;
const int COUNT_LIMIT = 12;
#endif
文件watchcount.h
#ifndef __WATCH_COUNT_H__
#define __WATCH_COUNT_H__
#include "thread.h"
class Event;
class Mutex;
class WatchCount : public Thread
{
public:
WatchCount(Event& e, Mutex& m);
protected:
void* run(void *param);
private:
Event& event;
Mutex& mutex;
};
#endif
文件watchcount.cpp
#include "watchcount.h"
#include "common.h"
#include "mutex.h"
#include "event.h"
#include <iostream>
using std::cout;
using std::endl;
WatchCount::WatchCount(Event& e, Mutex& m) : event(e), mutex(m)
{
}
void* WatchCount::run(void *param)
{
Param *prm = static_cast<Param *>(param);
long id = prm->threadID;
int *count = prm->count;
mutex.acquire();
cout << "Starting WatchCount: thread "
<< id
<< "."
<< endl;
cout << "WatchCount: thread "
<< id
<< " going into wait..."
<< endl;
mutex.release();
event.block();
mutex.acquire();
cout << "WatchCount: thread "
<< id
<< " Event signaled."
<< endl;
*count += 125;
cout << "WatchCount: thread "
<< id
<< " count now = "
<< *count
<< "."
<< endl;
mutex.release();
return NULL;
}
文件inccount.h
#ifndef __INC_COUNT_H__
#define __INC_COUNT_H__
#include "thread.h"
class Event;
class Mutex;
class IncCount : public Thread
{
public:
IncCount(Event& e, Mutex& m);
protected:
void* run(void *param);
private:
Event& event;
Mutex& mutex;
};
#endif
文件inccount.cpp
#include "inccount.h"
#include "common.h"
#include "mutex.h"
#include "event.h"
#include <iostream>
using std::cout;
using std::endl;
IncCount::IncCount(Event& e, Mutex& m) : event(e), mutex(m)
{
}
void* IncCount::run(void *param)
{
Param *prm = static_cast<Param *>(param);
long id = prm->threadID;
int *count = prm->count;
for (int i = 0; i < TCOUNT; ++i)
{
mutex.acquire();
++(*count);
/*
* Check the value of count and signal waiting thread when condition
is
* reached.
*/
if (*count == COUNT_LIMIT)
{
cout << "IntCount: thread "
<< id
<< ", count = "
<< *count
<< " Threshold reached. ";
event.signal();
cout << "Just sent signal."
<< endl;
}
cout << "IncCount: thread "
<< id
<< ", count = "
<< *count
<< ", unlocking mutex."
<< endl;
mutex.release();
/* Do some work so threads can alternate on mutex lock */
sleep(1000);
}
return NULL;
}
文件mainautounsignal.cpp
#include "inccount.h"
#include "watchcount.h"
#include "common.h"
#include "mutex.h"
#include "event.h"
#include <iostream>
using std::cout;
using std::endl;
int main(int argc, char* argv[])
{
Event event(false, false);
Mutex mutex;
int count = 0;
Param prm1 = {1, &count};
Param prm2 = {2, &count};
Param prm3 = {3, &count};
WatchCount wc(event, mutex);
IncCount ic1(event, mutex);
IncCount ic2(event, mutex);
wc.start(&prm1);
ic1.start(&prm2);
ic2.start(&prm3);
/* Wait for all thread to complete */
wc.wait();
ic1.wait();
ic2.wait();
cout << "Main(): Waited on 3 thread. Final value of count = "
<< count
<< ". Done."
<< endl;
return 0;
}
在此示例程序中,主线程创造了三个线程。其中,两个线程(IncCount)对一个“count
”变量执行递增操作,第三个线程(WatchCount)观察那个“count”变量的值。当“
count”变量达到一个预定义的值(COUNT_LIMIT)时,等待线程(WatchCount)被两个递
增线程(IncCount)中的一个唤醒。等待线程(WatchCount)被唤醒后会立即修改“
count”变量的值。两个递增线程(IncCount)会一直执行,直到达到TCOUNT为止。最后
,主线程会打印出“count”变量的最终值。
手动重置的事件对象
文件common.h
#ifndef __COMMON_H__
#define __COMMON_H__
#include <string>
using std::string;
struct Param
{
long threadID;
string *data;
};
#endif
文件readfrombuffer.h
#ifndef __READ_FROM_BUFFER_H__
#define __READ_FROM_BUFFER_H__
#include "thread.h"
class Event;
class Mutex;
class ReadFromBuffer : public Thread
{
public:
ReadFromBuffer(Event& e, Mutex& m);
protected:
void* run(void *param);
private:
Event& event;
Mutex& mutex;
};
#endif
文件readfrombuffer.cpp
#include "readfrombuffer.h"
#include "common.h"
#include "event.h"
#include "mutex.h"
#include <iostream>
using std::cout;
using std::endl;
ReadFromBuffer::ReadFromBuffer(Event& e, Mutex& m) : event(e), mutex(m)
{
}
void* ReadFromBuffer::run(void *param)
{
Param *prm = static_cast<Param *>(param);
long id = prm->threadID;
string *data = prm->data;
mutex.acquire();
cout << "ReadFromBuffer: thread "
<< id
<< " waiting for event signaled..."
<< endl;
mutex.release();
event.block();
mutex.acquire();
cout << "ReadFromBuffer: thread "
<< id
<< " reading from buffer ("
<< *data
<< ")"
<< endl;
mutex.release();
return NULL;
}
文件writetobuffer.h
#ifndef __WRITE_TO_BUFFER__
#define __WRITE_TO_BUFFER__
#include "thread.h"
class Event;
class Mutex;
class WriteToBuffer : public Thread
{
public:
WriteToBuffer(Event& e, Mutex& m);
protected:
void* run(void *param);
private:
Event& event;
Mutex& mutex;
};
#endif
文件writetobuffer.cpp
#include "writetobuffer.h"
#include "common.h"
#include "event.h"
#include "mutex.h"
#include <iostream>
using std::cout;
using std::endl;
WriteToBuffer::WriteToBuffer(Event& e, Mutex& m) : event(e), mutex(m)
{
}
void* WriteToBuffer::run(void *param)
{
Param *prm = static_cast<Param *>(param);
long id = prm->threadID;
string *data = prm->data;
*data = "Hello World!";
mutex.acquire();
cout << "WriteToBuffer: thread "
<< id
<< " writing to the shared buffer..."
<< endl;
mutex.release();
event.signal();
return NULL;
}
文件mainmanualunsignal.cpp
#include "writetobuffer.h"
#include "readfrombuffer.h"
#include "common.h"
#include "event.h"
#include "mutex.h"
#include <iostream>
using std::cout;
using std::endl;
int main(int argc, char* argv[])
{
Event event(true, false);
Mutex mutex;
string data;
Param prm1 = {1, &data};
Param prm2 = {2, &data};
Param prm3 = {3, &data};
Param prm4 = {4, &data};
Param prm5 = {5, &data};
ReadFromBuffer read1(event, mutex);
ReadFromBuffer read2(event, mutex);
ReadFromBuffer read3(event, mutex);
ReadFromBuffer read4(event, mutex);
WriteToBuffer write(event, mutex);
read1.start(&prm1);
read2.start(&prm2);
read3.start(&prm3);
read4.start(&prm4);
write.start(&prm5);
mutex.acquire();
cout << "Main thread waiting for threads to exit..."
<< endl;
mutex.release();
read1.wait();
read2.wait();
read3.wait();
read4.wait();
write.wait();
cout << "All threads ended, cleaning up for application exit..."
<< endl;
return 0;
}
在此示例程序中,主线程创造了五个线程。其中,四个线程(ReadFromBuffer)读取“
data”变量的内容,第五个线程(WriteToBuffer)初始化“data”变量。四个读线程(
ReadFromBuffer)会在写线程(WriteToBuffer)完成对“data”变量的初始化之前一直
保持等待状态。当写线程(WriteToBuffer)将“data”变量初始化好之后,四个读线程
(ReadFromBuffer)才会被一一唤醒。最后,主线程会在这四个读线程(ReadFromBuffer
)和一个写线程(WriteToBuffer)都执行完成后退出,从而结束整个程序。
事件对象类的UNIX和Windows实现
UNIX实现
文件event.h
#ifndef __EVENT_H__
#define __EVENT_H__
#include <pthread.h>
class Event
{
public:
Event(bool bManualUnsignal, bool bSignaled);
virtual ~Event();
virtual bool block();
virtual bool signal();
virtual bool unsignal();
private:
const bool bManUnsig;
pthread_cond_t cv;
pthread_mutex_t mutex;
bool bSig;
};
#endif
文件event.cpp
#include "event.h"
Event::Event(bool bManualUnsignal, bool bSignaled) : bManUnsig(bManualUnsignal
), bSig(bSignaled)
{
pthread_mutex_init(&mutex, NULL);
pthread_cond_init(&cv, NULL);
}
Event::~Event()
{
pthread_mutex_destroy(&mutex);
pthread_cond_destroy(&cv);
}
bool Event::block()
{
int ret = 0;
ret += pthread_mutex_lock(&mutex);
if (bSig)
{
if (!bManUnsig)
{
bSig = false;
}
}
else
{
pthread_cond_wait(&cv, &mutex);
if (!bManUnsig)
{
bSig = false;
}
}
ret += pthread_mutex_unlock(&mutex);
return ret == 0;
}
bool Event::signal()
{
int ret = 0;
ret += pthread_mutex_lock(&mutex);
if (!bSig)
{
if (bManUnsig)
{
ret += pthread_cond_broadcast(&cv);
}
else
{
ret += pthread_cond_signal(&cv);
}
bSig = true;
}
ret += pthread_mutex_unlock(&mutex);
return ret == 0;
}
bool Event::unsignal()
{
int ret = 0;
ret += pthread_mutex_lock(&mutex);
if (bSig)
{
bSig = false;
}
ret += pthread_mutex_unlock(&mutex);
return ret == 0;
}
Windows实现
文件event.h
#ifndef __EVENT_H__
#define __EVENT_H__
#include <windows.h>
class Event
{
public:
Event(bool bManualUnsignal, bool bSignaled);
virtual ~Event();
virtual bool block();
virtual bool signal();
virtual bool unsignal();
private:
HANDLE handle;
};
#endif
文件event.cpp
#include "event.h"
Event::Event(bool bManualUnsignal, bool bSignaled)
{
handle = CreateEvent(NULL, bManualUnsignal, bSignaled, NULL);
}
Event::~Event()
{
CloseHandle(handle);
}
bool Event::block()
{
return WaitForSingleObject(handle, INFINITE) == WAIT_OBJECT_0;
}
bool Event::signal()
{
return SetEvent(handle) == TRUE;
}
bool Event::unsignal()
{
return ResetEvent(handle) == TRUE;
}
小结
本节首先介绍了同步与异步的基本概念,进而说明了同步在多线程编程中的作用。
事件对象(Event Object)是一种最具弹性的同步机制。事件对象在某些条件满足之前将
一直保持非激发状态。程序员可以完全控制事件对象的状态(激发状态和非激发状态)。
事件对象使得程序员可以以最大的灵活性来定义复杂的同步对象。有两种类型的事件对象
(自动重置的事件对象和手动重置的事件对象)。一个手动重置的事件对象需要程序员显
式地将其状态从激发状态返回到非激发状态。然而一个自动重置的事件对象会在一个
Event::block()操作完成后自动地返回到非激发状态。
虽然事件对象所能完成的一切功能都可以通过互斥来完成,但是使用事件对象的解决方案
显得更加优雅,并且效率更高。
这是一条镜像帖。来源:北邮人论坛 / cpp / #36530同步于 2010/3/11
CPP机器人发帖
C++多线程入门(三)
hma
2010/3/11镜像同步0 回复
订阅后,新回复会通过你的通知中心匿名送达。
0 条回复
暂无回复 · 你可以订阅本帖等待新回复。