Poco::SynchronizedObject

Poco::SynchronizedObject を紹介します。

このクラスは Poco::MutexPoco::Event を集約したもので、マルチスレッド環境で同期を必要とする全てのオブジェクトのベースクラスとして機能します。

SynchronizedObjectTest.cpp

・それぞれが 4 つの Job を持つ Thread を 3 つ立ち上げ。
・各 Thread の 2 つ目と 3 つ目の Job は、特定の他 Thread の Job 終了を待ってから開始。
・図示するとこんな感じ:
Thread construction
・上図の Job(水色の四角)同士を繋いでいる 6 つ(6 色)の線が、それぞれ Poco::SynchronizedObject
Poco::SynchronizedObject は 3 つ(Thread 間で 1 つずつ)でも同等の挙動を実現できるが、自 Thread
 が発行する Poco::SynchronizedObject::notify() に、自分自身の Poco::SynchronizedObject::wait() が即座
 に反応しないように、Poco::Thread::yield() を呼んだりする手間が増えるので、ここでは 6 つの
 Poco::SynchronizedObject を使うことにした。

#include <Poco/SynchronizedObject.h>
#include <Poco/Runnable.h>
#include <Poco/Thread.h>
#include <Poco/Format.h>
#include <Poco/Stopwatch.h>
 
#include <string>
 
#include "ScopedLogMessage.h"
#include "PrepareConsoleLogger.h"
 
const std::size_t kNumThreads = 3;
const std::size_t kNumSyncObjects = 2*kNumThreads;
const std::size_t kNumJobs = 4;
 
enum
{
	eNotifyNone = 0
,	eNotifyAB
,	eNotifyAC
,	eNotifyBA
,	eNotifyBC
,	eNotifyCA
,	eNotifyCB
};
 
enum
{
	eWaitNone = 0
,	eWaitAB
,	eWaitAC
,	eWaitBA
,	eWaitBC
,	eWaitCA
,	eWaitCB
};
 
typedef struct
{
	long	sleep;
	int	notify[kNumThreads-1];
	int	wait;
} JobSpec;
 
const JobSpec kJobSpec[kNumThreads][kNumJobs] =
{
	{	// for threadA
		{	200,	{eNotifyAB,   eNotifyAC},	eWaitBA		}
	,	{	100,	{eNotifyNone, eNotifyNone},	eWaitCA		}
	,	{	 50,	{eNotifyNone, eNotifyNone},	eWaitNone	}
	,	{	400,	{eNotifyNone, eNotifyNone},	eWaitNone	}
	},
	{	// for threadB
		{	 50,	{eNotifyNone, eNotifyNone},	eWaitAB		}
	,	{	200,	{eNotifyBA,   eNotifyBC},	eWaitCB		}
	,	{	100,	{eNotifyNone, eNotifyNone},	eWaitNone	}
	,	{	400,	{eNotifyNone, eNotifyNone},	eWaitNone	}
	},
	{	// for threadC
		{	100,	{eNotifyNone, eNotifyNone},	eWaitAC		}
	,	{	 50,	{eNotifyNone, eNotifyNone},	eWaitBC		}
	,	{	200,	{eNotifyCA,   eNotifyCB},	eWaitNone	}
	,	{	550,	{eNotifyNone, eNotifyNone},	eWaitNone	}
	}
};
 
class MySyncObject: public Poco::SynchronizedObject
{
public:
	MySyncObject() :
		m_Name("")
	{
	}
 
	void setName(const std::string& name)
	{
		m_Name = name;
	}
 
	std::string getName(void) const
	{
		return m_Name;
	}
 
private:
	std::string	m_Name;
};
 
class MyRunnable : public Poco::Runnable
{
public:
	MyRunnable() :
		m_pMsg(NULL)
	,	m_pStopwatch(NULL)
	,	m_Index(0)
	,	m_IndexChar('A')
	,	m_SyncObjects(NULL)
	{
	}
 
	void setIndex(	ScopedLogMessage* pMsg
		,	Poco::Stopwatch* pStopwatch
		,	std::size_t index
		,	MySyncObject* syncObjects)
	{
		m_pMsg = pMsg;
		m_pStopwatch = pStopwatch;
		m_Index = index;
		m_IndexChar = static_cast<char>(m_IndexChar+index);
		m_SyncObjects = syncObjects;
	}
 
	void run()
	{
		if(NULL != m_pMsg)
		{
			for(std::size_t i=0; i<kNumJobs; ++i)
			{
				const JobSpec& spec = kJobSpec[m_Index][i];
 
				job(i, spec);
 
				for(std::size_t j=0; j<kNumThreads-1; ++j)
				{
					int notify = spec.notify[j];
					if(eNotifyNone != notify)
					{
						s_Indent += " ";
						m_SyncObjects[--notify].notify();
					}
				}
 
				int wait = spec.wait;
				if(eWaitNone != wait)
				{
					m_pMsg->Message(Poco::format("%06Ld %s Thread%c sleeps"
							, m_pStopwatch->elapsed()/1000
							, s_Indent
							, m_IndexChar));
 
					m_SyncObjects[--wait].wait();
 
					m_pMsg->Message(Poco::format("%06Ld %s Thread%c woke up by SyncObject%s"
							, m_pStopwatch->elapsed()/1000
							, s_Indent
							, m_IndexChar
							, m_SyncObjects[wait].getName()));
				}
			}
		}
	}
 
private:
	void job(std::size_t jobIndex, const JobSpec& spec)
	{
		++jobIndex;
		m_pMsg->Message(Poco::format("%06Ld %s Thread%c Job%z start"
				, m_pStopwatch->elapsed()/1000
				, s_Indent
				, m_IndexChar
				, jobIndex));
		Poco::Thread::sleep(spec.sleep);
		m_pMsg->Message(Poco::format("%06Ld %s Thread%c Job%z end"
				, m_pStopwatch->elapsed()/1000
				, s_Indent
				, m_IndexChar
				, jobIndex));
	}
 
	static std::string	s_Indent;
	ScopedLogMessage*	m_pMsg;
	Poco::Stopwatch*	m_pStopwatch;
	std::size_t		m_Index;
	char			m_IndexChar;
	MySyncObject*		m_SyncObjects;
};
 
std::string MyRunnable::s_Indent = "";
 
int main(int /*argc*/, char** /*argv*/)
{
	PrepareConsoleLogger logger(Poco::Logger::ROOT, Poco::Message::PRIO_INFORMATION);
 
	ScopedLogMessage msg("SynchronizedObjectTest ", "start", "end");
 
	Poco::Stopwatch		stopwatch;
	Poco::Thread		thread[kNumThreads];
	MyRunnable		runnable[kNumThreads];
	MySyncObject		syncObjects[kNumSyncObjects];
	const std::string	SyncObjNameTable[kNumSyncObjects] =
					{"AB", "AC", "BA", "BC", "CA", "CB"};
 
	for(std::size_t i=0; i<kNumThreads; ++i)
	{
		runnable[i].setIndex(&msg, &stopwatch, i, syncObjects);
	}
	for(std::size_t i=0; i<kNumSyncObjects; ++i)
	{
		syncObjects[i].setName(SyncObjNameTable[i]);
	}
 
	stopwatch.start();
 
	for(std::size_t i=0; i<kNumThreads; ++i)
	{
		thread[i].start(runnable[i]);
	}
 
	for(std::size_t i=0; i<kNumThreads; ++i)
	{
		thread[i].join();
	}
 
	return 0;
}

Results of execution

・左端の 6 桁の数字は、開始からの経過時間(msec)

[0] SynchronizedObjectTest start
[1] 000000  ThreadA Job1 start
[2] 000000  ThreadB Job1 start
[3] 000000  ThreadC Job1 start
[2] 000050  ThreadB Job1 end
[2] 000050  ThreadB sleeps
[3] 000100  ThreadC Job1 end
[3] 000100  ThreadC sleeps
[1] 000200  ThreadA Job1 end
[1] 000200    ThreadA sleeps
[3] 000200    ThreadC woke up by SyncObjectAC
[3] 000200    ThreadC Job2 start
[2] 000200    ThreadB woke up by SyncObjectAB
[2] 000200    ThreadB Job2 start
[3] 000250    ThreadC Job2 end
[3] 000250    ThreadC sleeps
[2] 000401    ThreadB Job2 end
[2] 000401      ThreadB sleeps
[3] 000401      ThreadC woke up by SyncObjectBC
[3] 000401      ThreadC Job3 start
[1] 000401      ThreadA woke up by SyncObjectBA
[1] 000401      ThreadA Job2 start
[1] 000501      ThreadA Job2 end
[1] 000501      ThreadA sleeps
[3] 000601      ThreadC Job3 end
[3] 000601        ThreadC Job4 start
[2] 000601        ThreadB woke up by SyncObjectCB
[2] 000601        ThreadB Job3 start
[1] 000601        ThreadA woke up by SyncObjectCA
[1] 000602        ThreadA Job3 start
[1] 000652        ThreadA Job3 end
[1] 000652        ThreadA Job4 start
[2] 000701        ThreadB Job3 end
[2] 000702        ThreadB Job4 start
[1] 001052        ThreadA Job4 end
[2] 001102        ThreadB Job4 end
[3] 001151        ThreadC Job4 end
[0] SynchronizedObjectTest end

Downloads

ここをクリックすると、makefile や VC++ プロジェクトなど一式がダウンロードできます。
(2013.05.31 updated)
・2010年7月1日からのダウンロード数:761

Subversion

・フリーの Subversion ホスティングサービス Assemblaで、ソースコードを管理しています。

Powered by POCO Copyright © 2010 Round Square Inc. All rights reserved.


Leave a Comment


NOTE - You can use these HTML tags and attributes:
<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong> <pre lang="" line="" escaped="" highlight="">