OgreWorkQueue.h

Go to the documentation of this file.
00001 /*
00002 -----------------------------------------------------------------------------
00003 This source file is part of OGRE
00004 (Object-oriented Graphics Rendering Engine)
00005 For the latest info, see http://www.ogre3d.org/
00006 
00007 Copyright (c) 2000-2012 Torus Knot Software Ltd
00008 
00009 Permission is hereby granted, free of charge, to any person obtaining a copy
00010 of this software and associated documentation files (the "Software"), to deal
00011 in the Software without restriction, including without limitation the rights
00012 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
00013 copies of the Software, and to permit persons to whom the Software is
00014 furnished to do so, subject to the following conditions:
00015 
00016 The above copyright notice and this permission notice shall be included in
00017 all copies or substantial portions of the Software.
00018 
00019 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
00020 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
00021 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
00022 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
00023 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
00024 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
00025 THE SOFTWARE.
00026 -----------------------------------------------------------------------------
00027 */
00028 #ifndef __OgreWorkQueue_H__
00029 #define __OgreWorkQueue_H__
00030 
00031 #include "OgrePrerequisites.h"
00032 #include "OgreAtomicWrappers.h"
00033 #include "OgreAny.h"
00034 #include "OgreSharedPtr.h"
00035 
00036 namespace Ogre
00037 {
00069     class _OgreExport WorkQueue : public UtilityAlloc
00070     {
              // Abstract work-queue interface. It declares the Request / Response
              // data types, the RequestHandler / ResponseHandler callback interfaces,
              // and the operations for registering handlers, queuing requests,
              // aborting them, pausing, and processing responses. Requests and
              // handlers are grouped by a uint16 channel id. Every operation except
              // the constructor, the destructor and getChannel() is pure virtual.
00071     protected:
              // Table from a channel name to its uint16 channel id.
00072         typedef std::map<String, uint16> ChannelMap;
00073         ChannelMap mChannelMap;
              // Initialised to 0 by the constructor. Presumably the next id that
              // getChannel() hands out; its definition is not in this header, so
              // confirm against the implementation.
00074         uint16 mNextChannel;
              // Mutex declared through the OGRE_MUTEX macro. Going by its name it is
              // meant to guard mChannelMap; no locking is visible in this header.
00075         OGRE_MUTEX(mChannelMapMutex)
00076     public:
              // Numeric identifier of a request: returned by addRequest() and
              // accepted by abortRequest().
00078         typedef unsigned long long int RequestID;
00079 
              // Description of one unit of work: channel, type, payload, retry
              // count and id, plus a flag recording whether it has been aborted.
00082         class _OgreExport Request : public UtilityAlloc
00083         {
                  // WorkQueue is a friend, so it can access the protected fields directly.
00084             friend class WorkQueue;
00085         protected:
                  // Channel id this request was issued on.
00087             uint16 mChannel;
                  // Caller-defined request type code.
00089             uint16 mType;
                  // Request payload, held in a type-erased Any.
00091             Any mData;
                  // Retry count supplied at construction. How it is consumed is not
                  // visible in this header.
00093             uint8 mRetryCount;
                  // Identifier of this request.
00095             RequestID mID;
                  // Abort flag. Declared mutable so that abortRequest() can set it
                  // through a const Request.
00097             mutable bool mAborted;
00098 
00099         public:
                  // Constructs a request from channel, type, payload, retry count and id
                  // (defined out of line).
00101             Request(uint16 channel, uint16 rtype, const Any& rData, uint8 retry, RequestID rid);
00102             ~Request();
                  // Flags the request as aborted. It is const-qualified, so code that
                  // only holds a const Request* can still call it.
00104             void abortRequest() const { mAborted = true; }
                  // Returns the channel id.
00106             uint16 getChannel() const { return mChannel; }
                  // Returns the request type code.
00108             uint16 getType() const { return mType; }
                  // Returns the payload by const reference.
00110             const Any& getData() const { return mData; }
                  // Returns the stored retry count.
00112             uint8 getRetryCount() const { return mRetryCount; }
                  // Returns the request id.
00114             RequestID getID() const { return mID; }
                  // Returns true once abortRequest() has been called.
00116             bool getAborted() const { return mAborted; }
00117         };
00118 
              // Result of handling a Request. It is a struct, so the four data
              // members below are public even before the explicit `public:` label.
00121         struct _OgreExport Response : public UtilityAlloc
00122         {
                  // The request this response answers. Whether the Response owns
                  // (and deletes) it depends on ~Response(), which is defined
                  // elsewhere -- TODO confirm.
00124             const Request* mRequest;
                  // Whether handling succeeded.
00126             bool mSuccess;
                  // Free-form message text attached to the response.
00128             String mMessages;
                  // Response payload, held in a type-erased Any.
00130             Any mData;
00131 
00132         public:
                  // Constructs a response for request rq (defined out of line); msg
                  // defaults to StringUtil::BLANK.
00133             Response(const Request* rq, bool success, const Any& data, const String& msg = StringUtil::BLANK);
00134             ~Response();
                  // Returns the originating request.
00136             const Request* getRequest() const { return mRequest; }
                  // Returns the success flag.
00138             bool succeeded() const { return mSuccess; }
                  // Returns the message text by const reference.
00140             const String& getMessages() const { return mMessages; }
                  // Returns the payload by const reference.
00142             const Any& getData() const { return mData; }
                  // Flags the originating request as aborted, then calls destroy() on
                  // this response's payload. mRequest is dereferenced without a null
                  // check, so it must be non-null here.
00144             void abortRequest() { mRequest->abortRequest(); mData.destroy(); }
00145         };
00146 
              // Callback interface for objects that turn a Request into a Response.
00160         class _OgreExport RequestHandler
00161         {
00162         public:
00163             RequestHandler() {}
00164             virtual ~RequestHandler() {}
00165 
                  // Default filter: accepts the request unless it has been flagged as
                  // aborted. srcQ is not used; the (void) cast only marks it as
                  // deliberately unused.
00172             virtual bool canHandleRequest(const Request* req, const WorkQueue* srcQ) 
00173             { (void)srcQ; return !req->getAborted(); }
00174 
                  // Handles req and returns a pointer to the resulting Response. Pure
                  // virtual; ownership of the returned object is not established by
                  // this header.
00185             virtual Response* handleRequest(const Request* req, const WorkQueue* srcQ) = 0;
00186         };
00187 
              // Callback interface for objects that consume a Response.
00195         class _OgreExport ResponseHandler
00196         {
00197         public:
00198             ResponseHandler() {}
00199             virtual ~ResponseHandler() {}
00200 
                  // Default filter: accepts the response unless its originating request
                  // has been flagged as aborted. srcQ is not used. res->getRequest()
                  // is dereferenced without a null check.
00207             virtual bool canHandleResponse(const Response* res, const WorkQueue* srcQ) 
00208             { (void)srcQ; return !res->getRequest()->getAborted(); }
00209 
                  // Consumes res. Pure virtual.
00217             virtual void handleResponse(const Response* res, const WorkQueue* srcQ) = 0;
00218         };
00219 
              // Starts channel numbering at 0.
00220         WorkQueue() : mNextChannel(0) {}
00221         virtual ~WorkQueue() {}
00222 
              // Starts the queue. Pure virtual. Default arguments of virtual
              // functions are chosen by the static type at the call site, so
              // overriders should repeat forceRestart = true.
00227         virtual void startup(bool forceRestart = true) = 0;
              // Registers / unregisters a request handler for a channel. Pure virtual.
00237         virtual void addRequestHandler(uint16 channel, RequestHandler* rh) = 0;
00239         virtual void removeRequestHandler(uint16 channel, RequestHandler* rh) = 0;
00240 
              // Registers / unregisters a response handler for a channel. Pure virtual.
00250         virtual void addResponseHandler(uint16 channel, ResponseHandler* rh) = 0;
00252         virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh) = 0;
00253 
              // Queues a request built from the given channel, type and payload, and
              // returns its RequestID. retryCount defaults to 0 and forceSynchronous
              // to false. Pure virtual.
00266         virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0, 
00267             bool forceSynchronous = false) = 0;
00268 
              // Aborts the request with the given id. Pure virtual.
00274         virtual void abortRequest(RequestID id) = 0;
00275 
              // Aborts the requests belonging to one channel. Pure virtual.
00281         virtual void abortRequestsByChannel(uint16 channel) = 0;
00282 
              // Aborts all requests. Pure virtual.
00287         virtual void abortAllRequests() = 0;
00288         
              // Sets / queries the paused state. Pure virtual.
00294         virtual void setPaused(bool pause) = 0;
00296         virtual bool isPaused() const = 0;
00297 
              // Sets / queries whether new requests are accepted. Pure virtual.
00302         virtual void setRequestsAccepted(bool accept) = 0;
00304         virtual bool getRequestsAccepted() const = 0;
00305 
              // Processes pending responses. Pure virtual.
00314         virtual void processResponses() = 0; 
00315 
              // Returns the response-processing time limit. Pure virtual.
00319         virtual unsigned long getResponseProcessingTimeLimit() const = 0;
00320 
              // Sets the response-processing time limit; the parameter name suggests
              // the unit is milliseconds. Pure virtual.
00326         virtual void setResponseProcessingTimeLimit(unsigned long ms) = 0;
00327 
              // Shuts the queue down. Pure virtual.
00330         virtual void shutdown() = 0;
00331 
              // Returns the uint16 channel id for channelName. This is the only
              // non-pure virtual operation of the interface and is defined out of
              // line. Presumably it allocates ids from mNextChannel and caches them
              // in mChannelMap under mChannelMapMutex -- confirm in the implementation.
00339         virtual uint16 getChannel(const String& channelName);
00340 
00341     };
00342 
00345     class _OgreExport DefaultWorkQueueBase : public WorkQueue
00346     {
              // Partial implementation of WorkQueue holding the request / process /
              // response queues, the handler registries and the state flags. It is
              // still abstract: _threadMain() and notifyWorkers() are pure virtual
              // here, and startup() / shutdown() from WorkQueue are not overridden
              // in this declaration.
00347     public:
00348 
              // Constructs the queue with an optional name (defaults to
              // StringUtil::BLANK). Defined out of line.
00353         DefaultWorkQueueBase(const String& name = StringUtil::BLANK);
00354         virtual ~DefaultWorkQueueBase();
              // Returns the queue name (defined out of line).
00356         const String& getName() const;
              // Returns the configured number of worker threads (defined out of line).
00360         virtual size_t getWorkerThreadCount() const;
00361 
              // Sets the number of worker threads (defined out of line). Whether a
              // change takes effect on a running queue is not visible here.
00367         virtual void setWorkerThreadCount(size_t c);
00368 
              // Returns whether worker threads may access the render system
              // (defined out of line).
00378         virtual bool getWorkersCanAccessRenderSystem() const;
00379 
00380 
              // Sets whether worker threads may access the render system
              // (defined out of line).
00392         virtual void setWorkersCanAccessRenderSystem(bool access);
00393 
              // Processes the next request. The leading underscore marks it as
              // internal by OGRE naming convention. Defined out of line.
00401         virtual void _processNextRequest();
00402 
              // Worker-thread main routine; pure virtual, to be supplied by a
              // concrete subclass.
00404         virtual void _threadMain() = 0;
00405 
              // Returns the mShuttingDown flag. The read is not synchronised in
              // this accessor.
00407         virtual bool isShuttingDown() const { return mShuttingDown; }
00408 
              // Overrides of the WorkQueue handler-registration interface
              // (defined out of line).
00410         virtual void addRequestHandler(uint16 channel, RequestHandler* rh);
00412         virtual void removeRequestHandler(uint16 channel, RequestHandler* rh);
00414         virtual void addResponseHandler(uint16 channel, ResponseHandler* rh);
00416         virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh);
00417 
              // Overrides of the WorkQueue request / state interface (defined out of
              // line). addRequest repeats the base-class defaults retryCount = 0 and
              // forceSynchronous = false.
00419         virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0, 
00420             bool forceSynchronous = false);
00422         virtual void abortRequest(RequestID id);
00424         virtual void abortRequestsByChannel(uint16 channel);
00426         virtual void abortAllRequests();
00428         virtual void setPaused(bool pause);
00430         virtual bool isPaused() const;
00432         virtual void setRequestsAccepted(bool accept);
00434         virtual bool getRequestsAccepted() const;
00436         virtual void processResponses(); 
              // Inline accessors for the response-processing time limit, stored in
              // mResposeTimeLimitMS.
00438         virtual unsigned long getResponseProcessingTimeLimit() const { return mResposeTimeLimitMS; }
00440         virtual void setResponseProcessingTimeLimit(unsigned long ms) { mResposeTimeLimitMS = ms; }
00441     protected:
              // Queue name, as returned by getName().
00442         String mName;
              // Configured worker-thread count.
00443         size_t mWorkerThreadCount;
              // Whether workers may access the render system.
00444         bool mWorkerRenderSystemAccess;
              // Running flag; it is not read or written anywhere in this header.
00445         bool mIsRunning;
              // Response-processing time limit; the "MS" suffix suggests milliseconds.
              // NOTE(review): "Respose" is a misspelling of "Response". The member is
              // protected, so renaming it could break derived classes that use it.
00446         unsigned long mResposeTimeLimitMS;
00447 
              // Queues of raw Request / Response pointers. Presumably mRequestQueue
              // holds pending requests, mProcessQueue those being processed and
              // mResponseQueue completed responses -- confirm in the implementation.
00448         typedef deque<Request*>::type RequestQueue;
00449         typedef deque<Response*>::type ResponseQueue;
00450         RequestQueue mRequestQueue;
00451         RequestQueue mProcessQueue;
00452         ResponseQueue mResponseQueue;
00453 
              // Callable object given to worker threads; it only stores a pointer back
              // to the owning queue. OGRE_THREAD_WORKER_INHERIT is a macro defined
              // elsewhere (it may expand to a base-class clause). operator()() and
              // run() are defined out of line.
00455         struct WorkerFunc OGRE_THREAD_WORKER_INHERIT
00456         {
00457             DefaultWorkQueueBase* mQueue;
00458 
00459             WorkerFunc(DefaultWorkQueueBase* q) 
00460                 : mQueue(q) {}
00461 
00462             void operator()();
00463 
00464             void run();
00465         };
              // Pointer to the worker functor; who allocates and frees it is not
              // visible in this header.
00466         WorkerFunc* mWorkerFunc;
00467 
              // Wrapper that pairs a RequestHandler pointer with a read/write mutex so
              // the handler can be detached while requests may still be dispatched to it.
00472         class _OgreExport RequestHandlerHolder : public UtilityAlloc
00473         {
00474         protected:
00475             OGRE_RW_MUTEX(mRWMutex);
                  // Wrapped handler; set to 0 by disconnectHandler().
00476             RequestHandler* mHandler;
00477         public:
00478             RequestHandlerHolder(RequestHandler* handler)
00479                 : mHandler(handler) {}
00480 
00481             // Disconnect the handler to allow it to be destroyed
00482             void disconnectHandler()
00483             {
00484                 // write lock - must wait for all requests to finish
00485                 OGRE_LOCK_RW_MUTEX_WRITE(mRWMutex);
00486                 mHandler = 0;
00487             }
00488 
                  // Returns the raw handler pointer without taking mRWMutex, so the
                  // pointer may already have been disconnected by the time the caller
                  // uses it.
00492             RequestHandler* getHandler() { return mHandler; }
00493 
                  // Forwards req to the wrapped handler under a read lock. Returns the
                  // handler's Response, or 0 when the handler has been disconnected or
                  // its canHandleRequest() rejects the request.
00497             Response* handleRequest(const Request* req, const WorkQueue* srcQ)
00498             {
00499                 // Read mutex so that multiple requests can be processed by the
00500                 // same handler in parallel if required
00501                 OGRE_LOCK_RW_MUTEX_READ(mRWMutex);
00502                 Response* response = 0;
00503                 if (mHandler)
00504                 {
00505                     if (mHandler->canHandleRequest(req, srcQ))
00506                     {
00507                         response = mHandler->handleRequest(req, srcQ);
00508                     }
00509                 }
00510                 return response;
00511             }
00512 
00513         };
00514         // Hold these by shared pointer so they can be copied keeping same instance
00515         typedef SharedPtr<RequestHandlerHolder> RequestHandlerHolderPtr;
00516 
              // Handler registries: one list of handlers per uint16 channel id.
              // Request handlers are stored through holders; response handlers are
              // stored as raw pointers.
00517         typedef list<RequestHandlerHolderPtr>::type RequestHandlerList;
00518         typedef list<ResponseHandler*>::type ResponseHandlerList;
00519         typedef map<uint16, RequestHandlerList>::type RequestHandlerListByChannel;
00520         typedef map<uint16, ResponseHandlerList>::type ResponseHandlerListByChannel;
00521 
00522         RequestHandlerListByChannel mRequestHandlers;
00523         ResponseHandlerListByChannel mResponseHandlers;
              // Counter of RequestID type; presumably the source of new request ids
              // -- confirm in the implementation.
00524         RequestID mRequestCount;
              // State flags; see isPaused(), getRequestsAccepted() and
              // isShuttingDown().
00525         bool mPaused;
00526         bool mAcceptRequests;
00527         bool mShuttingDown;
00528 
              // Mutexes; going by their names they guard the request queue, the
              // process queue, the response queue and the request-handler registry.
              // No locking is visible in this header.
00529         OGRE_MUTEX(mRequestMutex)
00530         OGRE_MUTEX(mProcessMutex)
00531         OGRE_MUTEX(mResponseMutex)
00532         OGRE_RW_MUTEX(mRequestHandlerMutex);
00533 
00534 
              // Internal processing steps, defined out of line.
00535         void processRequestResponse(Request* r, bool synchronous);
00536         Response* processRequest(Request* r);
00537         void processResponse(Response* r);
              // Hook for notifying worker threads; pure virtual, to be supplied by a
              // concrete subclass.
00539         virtual void notifyWorkers() = 0;
              // Queues a request under a caller-supplied RequestID and returns
              // nothing, unlike addRequest(), which returns an id. Defined out of line.
00541         void addRequestWithRID(RequestID rid, uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount);
00542 
00543     };
00544 
00545 
00546 
00547 
00548 
00552 }
00553 
00554 
00555 #endif
00556 

Copyright © 2012 Torus Knot Software Ltd
Creative Commons License
This work is licensed under a Creative Commons Attribution-ShareAlike 3.0 Unported License.
Last modified Fri May 25 23:36:28 2012