LCOV - code coverage report
Current view: top level - HLT/BASE - AliHLTAsyncProcessor.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 1 7 14.3 %
Date: 2016-06-14 17:26:59 Functions: 1 11 9.1 %

          Line data    Source code
       1             : #ifndef ALIHLTASYNCPROCESSOR_H
       2             : #define ALIHLTASYNCPROCESSOR_H
       3             : 
       4             : #define ALIHLTASYNCPROCESSOR_ALIGN 64
       5             : 
       6             : /* This file is property of and copyright by the ALICE HLT Project        * 
       7             : * ALICE Experiment at CERN, All rights reserved.                         *
       8             : * See cxx source for full Copyright notice                               */
       9             : 
      10             : /** @file    AliHLTAsyncProcessor.h
      11             : @author  David Rohr (drohr@cern.ch)
      12             : */
      13             : 
      14             : //The AliHLTAsyncProcessor class implements an interface for asynchronous
      15             : //tasks for HLT components.
      16             : 
      17             : #include "AliHLTLogging.h"
      18             : class AliHLTAsyncProcessorBackend;
      19             : class AliHLTComponent;
      20             : 
      21             : class AliHLTAsyncProcessor : public AliHLTLogging
      22             : {
      23             : public:
      24             :         AliHLTAsyncProcessor();
      25             :         virtual ~AliHLTAsyncProcessor();
      26             : 
      27             :         //Initialize the Async Processor with given queue depth.
      28             :         //If Initialize() is not called, or depth is 0, the async processor will serialize all jobs.
      29             :         //In that case, QueueAsyncTask() will just run the asynchronous task itself immediately.
      30             :         //Returns 0 on success.
      31             :         int Initialize(int depth, bool process = false, size_t process_buffer_size = 0);
      32           0 :         void SetFullQueueWarning(int val) {fMe->fFullQueueWarning = val;}
      33             : 
      34             :         int Deinitialize();
      35             :         //Deinitialize async queue. Terminates async thread and frees buffers.
      36             :         //Requires that no tasks are left in queue. Call WaitForTasks and RetrieveQueuedTaskResult before.
      37             :         //Returns 1 if it cannot deinitialize.
      38             :         
      39             :         //Returns the queue depth
      40           0 :         int GetQueueDepth() {return fMe->fQueueDepth;}
      41             : 
      42             :         //Returns the number of total async tasks currently in the queue
      43             :         int GetNumberOfAsyncTasksInQueue();
      44             : 
      45             :         //Can be used to perform additional initialization inside the asynchronously spawned thread.
      46             :         //Pass the initialization function as argument.
      47             :         //InitializeAsyncTask will wait for the passed initialization function in the async thread to finish.
      48             :         //This second initialization step is optional, and it is needed if some thread-local contexts must be initialized.
      49             :         //Returns the return value returned by the passed function in the async thread.
      50             :         //Must not be called while async tasks are running; it will return NULL in that case.
      51             :         //In principle, the interface is the same as QueueAsyncTask below, but the function will wait for the task
      52             :         //to finish, collect the result, and return it automatically.
      53             :         void* InitializeAsyncTask(void* (*initFunction)(void*), void* data);
      54             : 
      55             :         //Queue an asynchronous task. Provide a function pointer to the processing function and a pointer to a data object.
      56             :         //The asynchronous task shall take ownership of this data object, so the main task may no longer use it.
      57             :         //Returns 0 on success. Fails if the queue limit has already been reached.
      58             :         int QueueAsyncTask(void* (*)(void*), void*);
      59             : 
      60             :         //Returns the number of async tasks that are completed.
      61             :         //If the return value is at least 1, RetrieveQueuedTaskResult() can be used to fetch the result of the first task that finished.
      62             :         int IsQueuedTaskCompleted();
      63             : 
      64             :         //Fetch the result of the first task that finished.
      65             :         //Returns NULL if no task has finished.
      66             :         void* RetrieveQueuedTaskResult();
      67             :         
      68             :         //Force the child process to quit. Only works with an async process, not with an async thread.
      69             :         //waitTime is the maximum time in msec allowed for the child to quit properly.
      70             :         //Returns 0 if the child exited properly, 1 if the child was killed hard, -1 on error
      71             :         int ForceChildExit(int waitTime);
      72             : 
      73             :         //Wait for n async tasks to finish, where n is the argument passed.
      74             :         //If n=0, the function waits for all queued tasks to finish.
      75             :         //If n is greater than the number of queued tasks, it will also wait for all tasks.
      76             :         void WaitForTasks(int);
      77             : 
      78             :         //The mutex can be used to serialize execution of the main task and the asynchronous task for non-thread-safe parts.
      79             :         //This should be used carefully.
      80             :         //In particular, the main task must not be locked out for a long time. Return 0 on success
      81             :         int LockMutex(); //Gets the lock on the mutex
      82             :         int UnlockMutex(); //Releases the lock on the mutex
      83             :         int TryLockMutex(); //Tries to get the lock on the mutex. Returns 0 on success if mutex could be locked. Always returns immediately.
      84             :         
      85             :         //Get size of async process shared buffer objects
      86           0 :         int GetBufferSize() {return(fMe->fBufferSize);}
      87             :         
      88             :         struct AliHLTAsyncProcessorBuffer
      89             :         {
      90             :                 friend class AliHLTAsyncProcessor;
      91             : 
      92             :                 void* fPtr;
      93             :                 size_t fSize;
      94             : 
      95             :                 private:
      96             :                 AliHLTAsyncProcessorBuffer* fNext;
      97             :                 bool fTObject;
      98             :         };
      99             :         static const size_t fgkBufferHeaderSize = sizeof(AliHLTAsyncProcessorBuffer) + (ALIHLTASYNCPROCESSOR_ALIGN - sizeof(AliHLTAsyncProcessorBuffer) % ALIHLTASYNCPROCESSOR_ALIGN) % ALIHLTASYNCPROCESSOR_ALIGN;
     100             :         
     101             :         struct AliHLTAsyncProcessorMultiBuffer
     102             :         {
     103             :                 size_t fNumberOfEntries;
     104             :                 AliHLTAsyncProcessorBuffer* fFirst;
     105             :         };
     106             :         static const size_t fgkMultiBufferHeaderSize = sizeof(AliHLTAsyncProcessorMultiBuffer) + (ALIHLTASYNCPROCESSOR_ALIGN - sizeof(AliHLTAsyncProcessorMultiBuffer) % ALIHLTASYNCPROCESSOR_ALIGN) % ALIHLTASYNCPROCESSOR_ALIGN;
     107             :         
     108             :         //Simple version to allocate and delete a void* ptr to a buffer for an async process only
     109             :         void* AllocateBuffer();
     110             :         void FreeBuffer(void* ptr);
     111             :         
     112             :         //More elaborate version: Allocate and delete a buffer object that also contains the size (good for passing with the functions).
     113             :         //Works for async threads and async processes. For an async process, size = 0 allocates the maximum buffer size.
     114             :         AliHLTAsyncProcessorBuffer* AllocateBuffer(size_t size);
     115             :         void FreeBuffer(AliHLTAsyncProcessorBuffer* buffer);
     116             :         
     117             :         //Serializes an object into an AliHLTAsyncProcessorBuffer and returns the pointer to it, containing the pointer to the serialized object and its size
     118             :         AliHLTAsyncProcessorBuffer* SerializeIntoBuffer(TObject* obj, AliHLTComponent* cls, AliHLTAsyncProcessorMultiBuffer* multiBuf = NULL);
     119             :         
     120             :         //Allocate and Free MultiBuffer that stores multiple objects
     121             :         AliHLTAsyncProcessorMultiBuffer* AllocateMultiBuffer();
     122             :         void FreeBuffer(AliHLTAsyncProcessorMultiBuffer* ptr);
     123             :         
     124             :         //Add a new buffer to a multibuffer. If ptr is nonzero, copy the content of ptr there.
     125             :         AliHLTAsyncProcessorBuffer* AddBuffer(AliHLTAsyncProcessorMultiBuffer* multiBuf, size_t size, void* ptr = NULL);
     126             :         AliHLTAsyncProcessorBuffer* GetEntry(AliHLTAsyncProcessorMultiBuffer* multiBuf, int num);
     127             :         
     128           0 :         size_t BufferSize() {return(fMe->fBufferSize);}
     129           0 :         void RequestPush() {fMe->fRequestPush = true;}
     130           0 :         bool PushRequested() {bool tmp = fMe->fRequestPush;fMe->fRequestPush = false;return tmp;}
     131             : 
     132             : private:
     133             :         AliHLTAsyncProcessor(const AliHLTAsyncProcessor&);
     134             :         AliHLTAsyncProcessor& operator=(const AliHLTAsyncProcessor&);
     135             :         
     136             :         //Provide additional shared buffer resources for a derived class when mode is fAsyncProcess.
     137             :         //Memory is zero-initialized.
     138             :         virtual size_t ChildSharedProcessBufferSize();
     139             :         
     140             :         size_t GetTotalSize(AliHLTAsyncProcessorMultiBuffer* multiBuf);
     141             : 
     142             :         struct AliHLTAsyncProcessorInput
     143             :         {
     144             :                 void* (*fFunction)(void*);
     145             :                 void* fData;
     146             :         };
     147             : 
     148             :         int GetTotalQueue();
     149             :         void AsyncThread();
     150             :         static void* AsyncThreadStartHelper(void*);
     151             :         static void* AsyncThreadStop(void*);
     152             : 
     153             :         static void* alignPointer(void* ptr, size_t size);
     154             :         static size_t alignSize(size_t size);
     155             :         
     156             : protected:
     157             :         struct AliHLTAsyncProcessorContent
     158             :         {
     159             :                 int fQueueDepth;
     160             :                 bool fAsyncThreadRunning, fAsyncThreadProcessing;
     161             :                 bool fExit;
     162             :                 bool fChildStopped;
     163             :                 AliHLTAsyncProcessorBackend* fBackend;
     164             : 
     165             :                 AliHLTAsyncProcessorInput* fInputQueue;
     166             :                 void** fOutputQueue;
     167             :                 int fInputQueueUsed, fOutputQueueUsed;
     168             :                 int fWaitingForTasks;
     169             :                 int fFullQueueWarning;
     170             : 
     171             :                 void* fSynchronousOutput;       //In synchronous mode, we need one output buffer, without initialization
     172             :                 
     173             :                 void* fBasePtr;
     174             :                 void* fBufferPtr;
     175             :                 bool* fBufferUsed;
     176             :                 size_t fBufferSize;
     177             :                 size_t fmmapSize;
     178             :                 
     179             :                 int fAsyncProcess;
     180             :                 void* fChildBufferSpace;
     181             :                 
     182             :                 bool fRequestPush;
     183             :         };
     184             :         
     185             :         int LockMutex(int i);
     186             :         int UnlockMutex(int i);
     187             :         
     188             :         AliHLTAsyncProcessorContent* fMe;       //This points to the interior of the Async Processor, possibly in shared memory if the async part is an individual process
     189             : 
     190         126 :         ClassDef(AliHLTAsyncProcessor, 0);
     191             : };
     192             : 
     193             : #endif

Generated by: LCOV version 1.11