00001 /* 00002 ----------------------------------------------------------------------------- 00003 This source file is part of OGRE 00004 (Object-oriented Graphics Rendering Engine) 00005 For the latest info, see http://www.ogre3d.org/ 00006 00007 Copyright (c) 2000-2012 Torus Knot Software Ltd 00008 00009 Permission is hereby granted, free of charge, to any person obtaining a copy 00010 of this software and associated documentation files (the "Software"), to deal 00011 in the Software without restriction, including without limitation the rights 00012 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 00013 copies of the Software, and to permit persons to whom the Software is 00014 furnished to do so, subject to the following conditions: 00015 00016 The above copyright notice and this permission notice shall be included in 00017 all copies or substantial portions of the Software. 00018 00019 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 00020 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 00021 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 00022 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 00023 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 00024 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 00025 THE SOFTWARE. 00026 ----------------------------------------------------------------------------- 00027 */ 00028 #ifndef __OgreWorkQueue_H__ 00029 #define __OgreWorkQueue_H__ 00030 00031 #include "OgrePrerequisites.h" 00032 #include "OgreAtomicWrappers.h" 00033 #include "OgreAny.h" 00034 #include "OgreSharedPtr.h" 00035 00036 namespace Ogre 00037 { 00069 class _OgreExport WorkQueue : public UtilityAlloc 00070 { 00071 protected: 00072 typedef std::map<String, uint16> ChannelMap; 00073 ChannelMap mChannelMap; 00074 uint16 mNextChannel; 00075 OGRE_MUTEX(mChannelMapMutex) 00076 public: 00078 typedef unsigned long long int RequestID; 00079 00082 class _OgreExport Request : public UtilityAlloc 00083 { 00084 friend class WorkQueue; 00085 protected: 00087 uint16 mChannel; 00089 uint16 mType; 00091 Any mData; 00093 uint8 mRetryCount; 00095 RequestID mID; 00097 mutable bool mAborted; 00098 00099 public: 00101 Request(uint16 channel, uint16 rtype, const Any& rData, uint8 retry, RequestID rid); 00102 ~Request(); 00104 void abortRequest() const { mAborted = true; } 00106 uint16 getChannel() const { return mChannel; } 00108 uint16 getType() const { return mType; } 00110 const Any& getData() const { return mData; } 00112 uint8 getRetryCount() const { return mRetryCount; } 00114 RequestID getID() const { return mID; } 00116 bool getAborted() const { return mAborted; } 00117 }; 00118 00121 struct _OgreExport Response : public UtilityAlloc 00122 { 00124 const Request* mRequest; 00126 bool mSuccess; 00128 String mMessages; 00130 Any mData; 00131 00132 public: 00133 Response(const Request* rq, bool success, const Any& data, const String& msg = StringUtil::BLANK); 00134 ~Response(); 00136 const Request* getRequest() const { return mRequest; } 00138 bool succeeded() const { return mSuccess; } 00140 const String& getMessages() const { return mMessages; } 00142 const Any& getData() const { return mData; } 00144 void abortRequest() { mRequest->abortRequest(); mData.destroy(); } 00145 }; 00146 00160 class _OgreExport RequestHandler 00161 { 00162 public: 00163 RequestHandler() {} 00164 virtual ~RequestHandler() {} 00165 00172 virtual bool canHandleRequest(const Request* req, const WorkQueue* srcQ) 00173 { (void)srcQ; return !req->getAborted(); } 00174 00185 virtual Response* handleRequest(const Request* req, const WorkQueue* srcQ) = 0; 00186 }; 00187 00195 class _OgreExport ResponseHandler 00196 { 00197 public: 00198 ResponseHandler() {} 00199 virtual ~ResponseHandler() {} 00200 00207 virtual bool canHandleResponse(const Response* res, const WorkQueue* srcQ) 00208 { (void)srcQ; return !res->getRequest()->getAborted(); } 00209 00217 virtual void handleResponse(const Response* res, const WorkQueue* srcQ) = 0; 00218 }; 00219 00220 WorkQueue() : mNextChannel(0) {} 00221 virtual ~WorkQueue() {} 00222 00227 virtual void startup(bool forceRestart = true) = 0; 00237 virtual void addRequestHandler(uint16 channel, RequestHandler* rh) = 0; 00239 virtual void removeRequestHandler(uint16 channel, RequestHandler* rh) = 0; 00240 00250 virtual void addResponseHandler(uint16 channel, ResponseHandler* rh) = 0; 00252 virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh) = 0; 00253 00266 virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0, 00267 bool forceSynchronous = false) = 0; 00268 00274 virtual void abortRequest(RequestID id) = 0; 00275 00281 virtual void abortRequestsByChannel(uint16 channel) = 0; 00282 00287 virtual void abortAllRequests() = 0; 00288 00294 virtual void setPaused(bool pause) = 0; 00296 virtual bool isPaused() const = 0; 00297 00302 virtual void setRequestsAccepted(bool accept) = 0; 00304 virtual bool getRequestsAccepted() const = 0; 00305 00314 virtual void processResponses() = 0; 00315 00319 virtual unsigned long getResponseProcessingTimeLimit() const = 0; 00320 00326 virtual void setResponseProcessingTimeLimit(unsigned long ms) = 0; 00327 00330 virtual void shutdown() = 0; 00331 00339 virtual uint16 getChannel(const String& channelName); 00340 00341 }; 00342 00345 class _OgreExport DefaultWorkQueueBase : public WorkQueue 00346 { 00347 public: 00348 00353 DefaultWorkQueueBase(const String& name = StringUtil::BLANK); 00354 virtual ~DefaultWorkQueueBase(); 00356 const String& getName() const; 00360 virtual size_t getWorkerThreadCount() const; 00361 00367 virtual void setWorkerThreadCount(size_t c); 00368 00378 virtual bool getWorkersCanAccessRenderSystem() const; 00379 00380 00392 virtual void setWorkersCanAccessRenderSystem(bool access); 00393 00401 virtual void _processNextRequest(); 00402 00404 virtual void _threadMain() = 0; 00405 00407 virtual bool isShuttingDown() const { return mShuttingDown; } 00408 00410 virtual void addRequestHandler(uint16 channel, RequestHandler* rh); 00412 virtual void removeRequestHandler(uint16 channel, RequestHandler* rh); 00414 virtual void addResponseHandler(uint16 channel, ResponseHandler* rh); 00416 virtual void removeResponseHandler(uint16 channel, ResponseHandler* rh); 00417 00419 virtual RequestID addRequest(uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount = 0, 00420 bool forceSynchronous = false); 00422 virtual void abortRequest(RequestID id); 00424 virtual void abortRequestsByChannel(uint16 channel); 00426 virtual void abortAllRequests(); 00428 virtual void setPaused(bool pause); 00430 virtual bool isPaused() const; 00432 virtual void setRequestsAccepted(bool accept); 00434 virtual bool getRequestsAccepted() const; 00436 virtual void processResponses(); 00438 virtual unsigned long getResponseProcessingTimeLimit() const { return mResposeTimeLimitMS; } 00440 virtual void setResponseProcessingTimeLimit(unsigned long ms) { mResposeTimeLimitMS = ms; } 00441 protected: 00442 String mName; 00443 size_t mWorkerThreadCount; 00444 bool mWorkerRenderSystemAccess; 00445 bool mIsRunning; 00446 unsigned long mResposeTimeLimitMS; 00447 00448 typedef deque<Request*>::type RequestQueue; 00449 typedef deque<Response*>::type ResponseQueue; 00450 RequestQueue mRequestQueue; 00451 RequestQueue mProcessQueue; 00452 ResponseQueue mResponseQueue; 00453 00455 struct WorkerFunc OGRE_THREAD_WORKER_INHERIT 00456 { 00457 DefaultWorkQueueBase* mQueue; 00458 00459 WorkerFunc(DefaultWorkQueueBase* q) 00460 : mQueue(q) {} 00461 00462 void operator()(); 00463 00464 void run(); 00465 }; 00466 WorkerFunc* mWorkerFunc; 00467 00472 class _OgreExport RequestHandlerHolder : public UtilityAlloc 00473 { 00474 protected: 00475 OGRE_RW_MUTEX(mRWMutex); 00476 RequestHandler* mHandler; 00477 public: 00478 RequestHandlerHolder(RequestHandler* handler) 00479 : mHandler(handler) {} 00480 00481 // Disconnect the handler to allow it to be destroyed 00482 void disconnectHandler() 00483 { 00484 // write lock - must wait for all requests to finish 00485 OGRE_LOCK_RW_MUTEX_WRITE(mRWMutex); 00486 mHandler = 0; 00487 } 00488 00492 RequestHandler* getHandler() { return mHandler; } 00493 00497 Response* handleRequest(const Request* req, const WorkQueue* srcQ) 00498 { 00499 // Read mutex so that multiple requests can be processed by the 00500 // same handler in parallel if required 00501 OGRE_LOCK_RW_MUTEX_READ(mRWMutex); 00502 Response* response = 0; 00503 if (mHandler) 00504 { 00505 if (mHandler->canHandleRequest(req, srcQ)) 00506 { 00507 response = mHandler->handleRequest(req, srcQ); 00508 } 00509 } 00510 return response; 00511 } 00512 00513 }; 00514 // Hold these by shared pointer so they can be copied keeping same instance 00515 typedef SharedPtr<RequestHandlerHolder> RequestHandlerHolderPtr; 00516 00517 typedef list<RequestHandlerHolderPtr>::type RequestHandlerList; 00518 typedef list<ResponseHandler*>::type ResponseHandlerList; 00519 typedef map<uint16, RequestHandlerList>::type RequestHandlerListByChannel; 00520 typedef map<uint16, ResponseHandlerList>::type ResponseHandlerListByChannel; 00521 00522 RequestHandlerListByChannel mRequestHandlers; 00523 ResponseHandlerListByChannel mResponseHandlers; 00524 RequestID mRequestCount; 00525 bool mPaused; 00526 bool mAcceptRequests; 00527 bool mShuttingDown; 00528 00529 OGRE_MUTEX(mRequestMutex) 00530 OGRE_MUTEX(mProcessMutex) 00531 OGRE_MUTEX(mResponseMutex) 00532 OGRE_RW_MUTEX(mRequestHandlerMutex); 00533 00534 00535 void processRequestResponse(Request* r, bool synchronous); 00536 Response* processRequest(Request* r); 00537 void processResponse(Response* r); 00539 virtual void notifyWorkers() = 0; 00541 void addRequestWithRID(RequestID rid, uint16 channel, uint16 requestType, const Any& rData, uint8 retryCount); 00542 00543 }; 00544 00545 00546 00547 00548 00552 } 00553 00554 00555 #endif 00556
Copyright © 2012 Torus Knot Software Ltd
This work is licensed under a Creative Commons Attribution-ShareAlike 3.0 Unported License.
Last modified Fri May 25 23:36:28 2012