Poco::NotificationCenter

Poco::NotificationCenter を紹介します。

Poco::NotificationCenter は、基本的に Notification のディスパッチャーで、クライアントオブジェクトが Poco::NotificationCenter にオブザーバー method を登録すると、Notification がポストされる度にその method が呼び出されます。
オブザーバー method が呼び出される順番は不定で、Notification をポストするオブジェクトと呼び出されるオブジェクトが同じこともあり得ます。Poco::NotificationCenter は Notification を同期的に配信するので、全てのオブザーバーが Notification を受信し処理し終わるまで、postNotification() は戻りません。オブザーバーが Notification の処理中に例外を投げた場合、Poco::NotificationCenter は Notification のディスパッチを中止し、例外を投げ直します。
マルチスレッド環境の場合、Notification はポストされたスレッドで配信されます。
Notification の処理中に(自分自身を含む)オブザーバーを削除/登録しても大丈夫です。(次回の Notification から有効)

NotificationCenterTest.cpp

・テンプレート関数 TestNotificationCenter<T>() を、
  T = Poco::NotificationQueue
  T = Poco::PriorityNotificationQueue
 についてテスト。
・3 つのスレッドを起こし、それぞれの handleNotification() をオブザーバに登録。
・メインスレッドから 4 種類の Notification をポスト。
・handleNotification() 内で、自分向けの Notification かどうかチェックし、自分向けなら queue に積む。
・スレッドの run() ループ終了用の BaseNotification が来た時も、自分向けの queue に積む。

#include <Poco/Format.h>
#include <Poco/Notification.h>
#include <Poco/NotificationQueue.h>
#include <Poco/PriorityNotificationQueue.h>
#include <Poco/ThreadPool.h>
#include <Poco/Thread.h>
#include <Poco/Runnable.h>
#include <Poco/Random.h>
#include <Poco/AutoPtr.h>
#include <Poco/NotificationCenter.h>
#include <Poco/Observer.h>
 
#include <string>
#include <typeinfo>
 
#include "ScopedLogMessage.h"
#include "PrepareConsoleLogger.h"
 
// Tunables shared by the worker class and the test driver below.
const int kNumNotification	= 3;	// not referenced by any code visible in this file
const int kPriorityHighest	= 0;	// priority passed when enqueuing the run-loop exit notification
const int kPriorityNormal	= 5;	// priority passed when enqueuing ordinary work notifications
const long kSleepMaxTime	= 200;	// bound handed to Poco::Random::next() for the worker's per-item sleep
const long kSleepWaitTime	= 500;	// argument to Poco::Thread::sleep() in the driver before joinAll()
 
class BaseNotification : public Poco::Notification
{
public:
	BaseNotification(int data):
		m_data(data)
	{
	}
 
	int data() const
	{
		return m_data;
	}
 
private:
	int	m_data;
};
 
// BaseNotification tagged with a compile-time index N.
// Each distinct N yields a distinct type, so a receiver can use a
// dynamic cast to tell whether a notification is addressed to it.
template<int N>
class EachNotification : public BaseNotification
{
public:
	// Creates a notification for index N holding `data`.
	// `explicit` prevents a bare int from silently converting into a
	// notification object.
	explicit EachNotification(int data):
		BaseNotification(data)
	{
	}
};
 
template<class T>
class MyNotificationQueue : public T
{
public:
	MyNotificationQueue() :
		m_pMsg(NULL)
	{
	}
 
	MyNotificationQueue(ScopedLogMessage& msg) :
		m_pMsg(&msg)
	{
	}
 
	void MyEnqueueNotification(Poco::Notification::Ptr pNotification, int /*priority*/)
	{
		m_pMsg->Message(Poco::format("Class %s is not supported!", std::string(typeid(T).name())));
	}
 
private:
	ScopedLogMessage*	m_pMsg;
};
 
// Specialization for Poco::NotificationQueue: hands the notification to the
// inherited enqueueNotification(). The priority argument is not used.
template<>
void MyNotificationQueue<Poco::NotificationQueue>::MyEnqueueNotification(Poco::Notification::Ptr pNotification, int /*priority*/)
{
	this->enqueueNotification(pNotification);
}
 
// Specialization for Poco::PriorityNotificationQueue: hands the notification
// and its priority to the inherited enqueueNotification().
template<>
void MyNotificationQueue<Poco::PriorityNotificationQueue>::MyEnqueueNotification(Poco::Notification::Ptr pNotification, int priority)
{
	this->enqueueNotification(pNotification, priority);
}
 
// Runnable that owns a queue of type T and processes EachNotification<N>
// items taken from it.
//
// handleNotification() is the observer callback: it filters incoming
// notifications and puts the relevant ones on the private queue.
// run() is the thread body: it drains that queue until it sees a payload
// of 0 or the queue returns a null notification.
template<class T, int N>
class Worker : public Poco::Runnable
{
public:
	// Builds the display name "Worker<T,N>" and logs the creation through `msg`.
	// `msg` is kept by reference and must outlive the worker.
	explicit Worker(ScopedLogMessage& msg) :
		m_name(Poco::format("Worker<T,%d>", N))
	,	m_msg(msg)
	{
		m_msg.Message(Poco::format("  %s created", m_name));
	}

	// Logs the destruction.
	~Worker()
	{
		m_msg.Message(Poco::format("  %s deleted", m_name));
	}

	// Thread body. Blocks on the queue and logs every EachNotification<N>
	// it receives. A payload of 0 ends the loop; any other payload is
	// followed by a random sleep bounded by kSleepMaxTime. Notifications of
	// other types are dropped. A null result from the queue also ends the loop.
	void run()
	{
		Poco::Random rnd;
		for(;;)
		{
			Poco::Notification::Ptr pNf(m_queue.waitDequeueNotification());
			if(!pNf)
			{
				break;
			}

			Poco::AutoPtr< EachNotification<N> > pWorkNf = pNf.cast< EachNotification<N> >();
			if(pWorkNf)
			{
				m_msg.Message(Poco::format("    %s got EachNotification<%d>(%d)"
						, m_name
						, N
						, pWorkNf->data()));
				if(0 == pWorkNf->data())
				{
					m_msg.Message(Poco::format("    %s exits run loop", m_name));
					break;
				}
				Poco::Thread::sleep(rnd.next(kSleepMaxTime));
			}
		}
		m_queue.wakeUpAll();
	}

	// Observer callback registered through Poco::Observer.
	//
	// - EachNotification<N>: logged and enqueued with kPriorityNormal.
	// - any other BaseNotification: logged; when its payload is 0 a fresh
	//   EachNotification<N>(0) is enqueued with kPriorityHighest so that
	//   run() terminates.
	// - anything else: ignored.
	//
	// Poco::Observer passes the callback a reference it must release, so the
	// pointer is adopted into an AutoPtr before anything else happens. This
	// guarantees the release on every path, including the "ignored" one
	// where neither cast matches.
	void handleNotification(Poco::Notification* pNf)
	{
		Poco::Notification::Ptr pOwned(pNf);	// adopts the caller's reference

		Poco::AutoPtr< EachNotification<N> > pWorkNf = pOwned.cast< EachNotification<N> >();
		if(pWorkNf)
		{
			m_msg.Message(Poco::format("    %s handleNotification got EachNotification<%d>(%d)"
							, m_name
							, N
							, pWorkNf->data()));
			m_msg.Message(Poco::format("     -> enqueue EachNotification<%d>(%d)"
							, N
							, pWorkNf->data()));
			m_queue.MyEnqueueNotification(pWorkNf, kPriorityNormal);
		}
		else
		{
			Poco::AutoPtr< BaseNotification > pNotification = pOwned.cast< BaseNotification >();
			if(pNotification)
			{
				m_msg.Message(Poco::format("    %s handleNotification got BaseNotification(%d)"
							, m_name
							, pNotification->data()));
				if(0 == pNotification->data())
				{
					m_msg.Message(Poco::format("     -> enqueue EachNotification<%d>(%d)"
							, N
							, pNotification->data()));
					m_queue.MyEnqueueNotification(new EachNotification<N>(0), kPriorityHighest);
				}
			}
		}
	}

private:
	std::string		m_name;		// "Worker<T,N>", used as the log prefix
	T			m_queue;	// private queue filled by handleNotification(), drained by run()
	ScopedLogMessage&	m_msg;		// shared logger supplied by the creator
};
 
// Runs one round of the demo for queue type T.
//
// Three workers (indices 1..3) are registered as observers of a local
// NotificationCenter, four notifications are posted from the calling thread,
// and only afterwards are the workers started on the default thread pool.
// The function sleeps for kSleepWaitTime and then waits for all pool threads.
//
// msg   - logger used for the banner and handed to every worker
// title - text shown in the "--- T = ... ---" banner
template<class T>
void TestNotificationCenter(ScopedLogMessage& msg, const std::string& title)
{
	typedef MyNotificationQueue<T>	Queue;
	typedef Worker<Queue, 1>	FirstWorker;
	typedef Worker<Queue, 2>	SecondWorker;
	typedef Worker<Queue, 3>	ThirdWorker;

	msg.Message(Poco::format("--- T = %s ---", title));

	Poco::NotificationCenter center;

	FirstWorker	worker1(msg);
	SecondWorker	worker2(msg);
	ThirdWorker	worker3(msg);

	// Every worker observes all Poco::Notification objects and filters by itself.
	center.addObserver(Poco::Observer<FirstWorker, Poco::Notification>(worker1, &FirstWorker::handleNotification));
	center.addObserver(Poco::Observer<SecondWorker, Poco::Notification>(worker2, &SecondWorker::handleNotification));
	center.addObserver(Poco::Observer<ThirdWorker, Poco::Notification>(worker3, &ThirdWorker::handleNotification));

	// One work item per worker, then the shared exit trigger for the run loops.
	center.postNotification(new EachNotification<3>(3));
	center.postNotification(new EachNotification<2>(2));
	center.postNotification(new EachNotification<1>(1));
	center.postNotification(new BaseNotification(0));

	Poco::ThreadPool& pool = Poco::ThreadPool::defaultPool();
	pool.start(worker1);
	pool.start(worker2);
	pool.start(worker3);

	Poco::Thread::sleep(kSleepWaitTime);

	pool.joinAll();
}
 
int main(int /*argc*/, char** /*argv*/)
{
	PrepareConsoleLogger logger(Poco::Logger::ROOT, Poco::Message::PRIO_INFORMATION);
 
	ScopedLogMessage msg("NotificationCenterTest ", "start", "end");
 
	TestNotificationCenter<Poco::NotificationQueue>(msg, "Poco::NotificationQueue");
	TestNotificationCenter<Poco::PriorityNotificationQueue>(msg, "Poco::PriorityNotificationQueue");
 
	return 0;
}

Results of execution

・handleNotification は、Notification をポストしたスレッド[0]で実行され、そこから queue に積まれた後
 各スレッドで実行される。
・Poco::PriorityNotificationQueue の場合は、run() ループ終了用の Notification (BaseNotification(0) を受けて積まれる EachNotification<N>(0)) の優先度が高いので、
 最初に処理される。

[0] NotificationCenterTest start
[0] --- T = Poco::NotificationQueue ---
[0]   Worker<T,1> created
[0]   Worker<T,2> created
[0]   Worker<T,3> created
[0]     Worker<T,1> handleNotification got BaseNotification(3)
[0]     Worker<T,2> handleNotification got BaseNotification(3)
[0]     Worker<T,3> handleNotification got EachNotification<3>(3)
[0]      -> enqueue EachNotification<3>(3)
[0]     Worker<T,1> handleNotification got BaseNotification(2)
[0]     Worker<T,2> handleNotification got EachNotification<2>(2)
[0]      -> enqueue EachNotification<2>(2)
[0]     Worker<T,3> handleNotification got BaseNotification(2)
[0]     Worker<T,1> handleNotification got EachNotification<1>(1)
[0]      -> enqueue EachNotification<1>(1)
[0]     Worker<T,2> handleNotification got BaseNotification(1)
[0]     Worker<T,3> handleNotification got BaseNotification(1)
[0]     Worker<T,1> handleNotification got BaseNotification(0)
[0]      -> enqueue EachNotification<1>(0)
[0]     Worker<T,2> handleNotification got BaseNotification(0)
[0]      -> enqueue EachNotification<2>(0)
[0]     Worker<T,3> handleNotification got BaseNotification(0)
[0]      -> enqueue EachNotification<3>(0)
[2]     Worker<T,2> got EachNotification<2>(2)
[1]     Worker<T,1> got EachNotification<1>(1)
[3]     Worker<T,3> got EachNotification<3>(3)
[1]     Worker<T,1> got EachNotification<1>(0)
[2]     Worker<T,2> got EachNotification<2>(0)
[3]     Worker<T,3> got EachNotification<3>(0)
[1]     Worker<T,1> exits run loop
[2]     Worker<T,2> exits run loop
[3]     Worker<T,3> exits run loop
[0]   Worker<T,3> deleted
[0]   Worker<T,2> deleted
[0]   Worker<T,1> deleted
[0] --- T = Poco::PriorityNotificationQueue ---
[0]   Worker<T,1> created
[0]   Worker<T,2> created
[0]   Worker<T,3> created
[0]     Worker<T,1> handleNotification got BaseNotification(3)
[0]     Worker<T,2> handleNotification got BaseNotification(3)
[0]     Worker<T,3> handleNotification got EachNotification<3>(3)
[0]      -> enqueue EachNotification<3>(3)
[0]     Worker<T,1> handleNotification got BaseNotification(2)
[0]     Worker<T,2> handleNotification got EachNotification<2>(2)
[0]      -> enqueue EachNotification<2>(2)
[0]     Worker<T,3> handleNotification got BaseNotification(2)
[0]     Worker<T,1> handleNotification got EachNotification<1>(1)
[0]      -> enqueue EachNotification<1>(1)
[0]     Worker<T,2> handleNotification got BaseNotification(1)
[0]     Worker<T,3> handleNotification got BaseNotification(1)
[0]     Worker<T,1> handleNotification got BaseNotification(0)
[0]      -> enqueue EachNotification<1>(0)
[0]     Worker<T,2> handleNotification got BaseNotification(0)
[0]      -> enqueue EachNotification<2>(0)
[0]     Worker<T,3> handleNotification got BaseNotification(0)
[0]      -> enqueue EachNotification<3>(0)
[1]     Worker<T,1> got EachNotification<1>(0)
[1]     Worker<T,1> exits run loop
[3]     Worker<T,3> got EachNotification<3>(0)
[3]     Worker<T,3> exits run loop
[2]     Worker<T,2> got EachNotification<2>(0)
[2]     Worker<T,2> exits run loop
[0]   Worker<T,3> deleted
[0]   Worker<T,2> deleted
[0]   Worker<T,1> deleted
[0] NotificationCenterTest end

Downloads

ここをクリックすると、makefile や VC++ プロジェクトなど一式がダウンロードできます。
(2013.05.31 updated)
・2010年7月5日からのダウンロード数:801

Subversion

・フリーの Subversion ホスティングサービス Assemblaで、ソースコードを管理しています。

Reference

http://pocoproject.org にある NotificationsEvents のプレゼンテーション。(PDF)

Powered by POCO Copyright © 2010 Round Square Inc. All rights reserved.


Leave a Comment


NOTE - You can use these HTML tags and attributes:
<a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>