LCOV - code coverage report
Current view: top level - HLT/BASE - AliHLTAsyncProcessor.cxx (source / functions) Hit Total Coverage
Test: coverage.info Lines: 28 353 7.9 %
Date: 2016-06-14 17:26:59 Functions: 3 37 8.1 %

          Line data    Source code
       1             : /* This file is property of and copyright by the ALICE HLT Project        * 
       2             : * ALICE Experiment at CERN, All rights reserved.                         *
       3             : * See cxx source for full Copyright notice                               */
       4             : 
       5             : /** @file    AliHLTAsyncProcessor.cxx
       6             : @author  David Rohr (drohr@cern.ch)
       7             : */
       8             : 
       9             : #ifdef __MACH__
      10             : #define MAP_ANONYMOUS MAP_ANON
      11             : #endif
      12             : 
      13             : #include "AliHLTAsyncProcessor.h"
      14             : #include "AliHLTAsyncProcessorBackend.h"
      15             : #include "AliHLTComponent.h"
      16             : #include <sys/mman.h>
      17             : 
      18         126 : ClassImp(AliHLTAsyncProcessor)
      19             : 
      20          18 : AliHLTAsyncProcessor::AliHLTAsyncProcessor() : AliHLTLogging(), fMe(new AliHLTAsyncProcessorContent)
      21          18 : {
      22           6 :         fMe->fQueueDepth = 0;
      23           6 :         fMe->fAsyncThreadRunning = false;
      24           6 :         fMe->fAsyncThreadProcessing = false;
      25           6 :         fMe->fExit = false;
      26           6 :         fMe->fChildStopped = false;
      27           6 :         fMe->fBackend = NULL;
      28           6 :         fMe->fInputQueue = NULL;
      29           6 :         fMe->fOutputQueue = NULL;
      30           6 :         fMe->fInputQueueUsed = 0;
      31           6 :         fMe->fOutputQueueUsed = 0;
      32           6 :         fMe->fWaitingForTasks = 0;
      33           6 :         fMe->fFullQueueWarning = 1;
      34           6 :         fMe->fSynchronousOutput = NULL;
      35           6 :         fMe->fBasePtr = NULL;
      36           6 :         fMe->fBufferPtr = NULL;
      37           6 :         fMe->fBufferSize = 0;
      38           6 :         fMe->fAsyncProcess = 0;
      39           6 :         fMe->fChildBufferSpace = NULL;
      40           6 :         fMe->fmmapSize = 0;
      41           6 :         fMe->fRequestPush = false;
      42           6 : }
      43             : 
      44             : AliHLTAsyncProcessor::~AliHLTAsyncProcessor()
      45          12 : {
      46           6 :         if (fMe->fQueueDepth) Deinitialize();
      47          12 :         delete fMe;
      48           6 : }
      49             : 
      50             : size_t AliHLTAsyncProcessor::alignSize(size_t size)
      51             : {
      52           0 :         if (size % ALIHLTASYNCPROCESSOR_ALIGN) size += ALIHLTASYNCPROCESSOR_ALIGN - size % ALIHLTASYNCPROCESSOR_ALIGN;
      53           0 :         return(size);
      54             : }
      55             : 
      56             : void* AliHLTAsyncProcessor::alignPointer(void* ptr, size_t size)
      57             : {
      58           0 :         size_t tmp = (size_t) ptr;
      59           0 :         tmp += size;
      60           0 :         tmp = alignSize(tmp);
      61           0 :         return (void*) tmp;
      62             : }
      63             : 
      64             : int AliHLTAsyncProcessor::Initialize(int depth, bool process, size_t process_buffer_size)
      65             : {
      66           0 :         HLTInfo("Initializing ASYNC Processor");
      67           0 :         if (fMe->fQueueDepth) return(1);
      68           0 :         fMe->fQueueDepth = depth;
      69           0 :         if (fMe->fQueueDepth)
      70             :         {
      71           0 :                 fMe->fAsyncProcess = process;
      72           0 :                 size_t size = sizeof(AliHLTAsyncProcessorBackend) +
      73           0 :                               (sizeof(AliHLTAsyncProcessorInput) + sizeof(void*)) * fMe->fQueueDepth +
      74           0 :                               process_buffer_size * (fMe->fQueueDepth + 3) +
      75           0 :                               ChildSharedProcessBufferSize() +
      76           0 :                               (6 + fMe->fQueueDepth) * ALIHLTASYNCPROCESSOR_ALIGN;
      77             :                 void* tmpPtr;
      78           0 :                 if (fMe->fAsyncProcess) //promote to running async process instead of async thread
      79             :                 {
      80           0 :                         fMe->fmmapSize = sizeof(AliHLTAsyncProcessorContent) +
      81           0 :                                          sizeof(bool) * (fMe->fQueueDepth + 3) +
      82           0 :                                          2 * ALIHLTASYNCPROCESSOR_ALIGN +
      83             :                                          size;
      84           0 :                         fMe->fBasePtr = mmap(NULL, fMe->fmmapSize, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, 0, 0);
      85           0 :                         if (fMe->fBasePtr == NULL) return(1);
      86           0 :                         memcpy(fMe->fBasePtr, fMe, sizeof(AliHLTAsyncProcessorContent));
      87           0 :                         delete fMe;
      88           0 :                         fMe = (AliHLTAsyncProcessorContent*) fMe->fBasePtr;
      89           0 :                         fMe->fBufferSize = process_buffer_size;
      90           0 :                         if (fMe->fBufferSize % ALIHLTASYNCPROCESSOR_ALIGN) fMe->fBufferSize += ALIHLTASYNCPROCESSOR_ALIGN - fMe->fBufferSize % ALIHLTASYNCPROCESSOR_ALIGN;
      91           0 :                         tmpPtr = alignPointer(fMe->fBasePtr, sizeof(AliHLTAsyncProcessorContent));
      92             :                         
      93           0 :                         fMe->fBufferUsed = new (tmpPtr) bool[fMe->fQueueDepth + 3];
      94           0 :                         memset(fMe->fBufferUsed, 0, sizeof(bool) * (fMe->fQueueDepth + 3));
      95           0 :                         tmpPtr = alignPointer(tmpPtr, sizeof(bool) * (fMe->fQueueDepth + 3));
      96           0 :                 }
      97             :                 else
      98             :                 {
      99           0 :                         fMe->fBasePtr = malloc(size);
     100           0 :                         if (fMe->fBasePtr == 0) return(1);
     101             :                         tmpPtr = fMe->fBasePtr;
     102             :                 }
     103             :                 
     104           0 :                 fMe->fBackend = new (tmpPtr) AliHLTAsyncProcessorBackend;
     105           0 :                 if (fMe->fBackend->Initialize(fMe->fAsyncProcess)) return(1);
     106           0 :                 tmpPtr = alignPointer(tmpPtr, sizeof(AliHLTAsyncProcessorBackend));
     107             :                 
     108           0 :                 fMe->fInputQueue = new (tmpPtr) AliHLTAsyncProcessorInput[fMe->fQueueDepth];
     109           0 :                 tmpPtr = alignPointer(tmpPtr, sizeof(AliHLTAsyncProcessorInput) * fMe->fQueueDepth);
     110             :                 
     111           0 :                 fMe->fOutputQueue = new (tmpPtr) void*[fMe->fQueueDepth];
     112           0 :                 tmpPtr = alignPointer(tmpPtr, sizeof(void*) * fMe->fQueueDepth);
     113             :                 
     114           0 :                 if (ChildSharedProcessBufferSize())
     115             :                 {
     116           0 :                         fMe->fChildBufferSpace = tmpPtr;
     117           0 :                         memset(fMe->fChildBufferSpace, 0, ChildSharedProcessBufferSize());
     118           0 :                         tmpPtr = alignPointer(tmpPtr, ChildSharedProcessBufferSize());
     119           0 :                 }
     120             :                 
     121           0 :                 fMe->fBufferPtr = tmpPtr;
     122             :                 
     123           0 :                 for (int i = 2;i <= 4;i++) fMe->fBackend->LockMutex(i); //Lock thread control mutexes
     124           0 :                 if (fMe->fAsyncProcess)
     125             :                 {
     126           0 :                         if (fMe->fBackend->StartProcess(AsyncThreadStartHelper, this)) return(1);
     127             :                 }
     128             :                 else
     129             :                 {
     130           0 :                         if (fMe->fBackend->StartThread(AsyncThreadStartHelper, this)) return(1);
     131             :                 }
     132           0 :         }
     133           0 :         return(0);
     134           0 : }
     135             : 
     136             : int AliHLTAsyncProcessor::Deinitialize()
     137             : {
     138           0 :         HLTInfo("Deinitializing ASYNC Processor");
     139           0 :         if (fMe->fQueueDepth == 0) return(0);
     140           0 :         fMe->fBackend->LockMutex(1);
     141           0 :         if (GetTotalQueue())
     142             :         {
     143           0 :                 fMe->fBackend->UnlockMutex(1);
     144           0 :                 HLTError("Error during deinitialization of ASYNC Processor - Still tasks in queue");
     145           0 :                 return(1);
     146             :         }
     147             :         fMe->fBackend->UnlockMutex(1);
     148           0 :         if (!fMe->fChildStopped) QueueAsyncTask(AsyncThreadStop, this);
     149           0 :         if (fMe->fAsyncProcess)
     150             :         {
     151           0 :                 fMe->fBackend->StopProcess();
     152           0 :         }
     153             :         else
     154             :         {
     155           0 :                 fMe->fBackend->StopThread();
     156             :         }
     157           0 :         for (int i = 3;i <= 4;i++) fMe->fBackend->UnlockMutex(i); //Unlock remaining thread control mutexes
     158           0 :         for (int i = 0;i < fMe->fQueueDepth;i++)
     159             :         {
     160           0 :                 fMe->fInputQueue[i].~AliHLTAsyncProcessorInput();
     161             :                 //fMe->fOutputQueue.~void*(); //Adapt if this is becoming a real struct!
     162             :         }
     163           0 :         fMe->fAsyncThreadRunning = fMe->fAsyncThreadProcessing = false;
     164           0 :         fMe->fBackend->~AliHLTAsyncProcessorBackend();;
     165           0 :         fMe->fBackend = NULL;
     166           0 :         fMe->fQueueDepth = 0;
     167           0 :         fMe->fBufferSize = 0;
     168           0 :         if (fMe->fAsyncProcess)
     169             :         {
     170           0 :                 void* tmp = fMe;
     171           0 :                 fMe = new AliHLTAsyncProcessorContent;
     172           0 :                 memcpy(fMe, tmp, sizeof(AliHLTAsyncProcessorContent));
     173           0 :                 munmap(fMe->fBasePtr, fMe->fmmapSize);
     174           0 :         }
     175             :         else
     176             :         {
     177           0 :                 free(fMe->fBasePtr);
     178             :         }
     179             : 
     180           0 :         fMe->fExit = false;
     181           0 :         fMe->fChildStopped = false;
     182           0 :         fMe->fBackend = NULL;
     183           0 :         fMe->fInputQueue = NULL;
     184           0 :         fMe->fOutputQueue = NULL;
     185           0 :         fMe->fInputQueueUsed = 0;
     186           0 :         fMe->fOutputQueueUsed = 0;
     187           0 :         fMe->fWaitingForTasks = 0;
     188           0 :         fMe->fBasePtr = NULL;
     189           0 :         fMe->fBufferPtr = NULL;
     190           0 :         fMe->fBufferSize = 0;
     191           0 :         fMe->fAsyncProcess = 0;
     192           0 :         fMe->fChildBufferSpace = NULL;
     193           0 :         fMe->fmmapSize = 0;
     194           0 :         fMe->fRequestPush = false;
     195             : 
     196           0 :         HLTInfo("Deinitialization of ASYNC Processor done");
     197           0 :         return(0);
     198           0 : }
     199             : 
     200             : void* AliHLTAsyncProcessor::AsyncThreadStartHelper(void* obj)
     201             : {
     202           0 :         ((AliHLTAsyncProcessor*) obj)->AsyncThread();
     203           0 :         pthread_exit(NULL);
     204             :         return(NULL);
     205             : }
     206             : 
     207             : void* AliHLTAsyncProcessor::AsyncThreadStop(void* obj)
     208             : {
     209           0 :         ((AliHLTAsyncProcessor*) obj)->fMe->fExit = true;
     210           0 :         return(NULL);
     211             : }
     212             : 
     213             : void AliHLTAsyncProcessor::AsyncThread()
     214             : {
     215           0 :         HLTInfo("Async Thread Running");
     216             :         while (true)
     217             :         {
     218           0 :                 fMe->fBackend->LockMutex(2); //Lock async thread if nothing to do
     219           0 :                 while (true)
     220             :                 {
     221           0 :                         fMe->fBackend->LockMutex(1);
     222           0 :                         if (fMe->fInputQueueUsed == 0)
     223             :                         {
     224           0 :                                 fMe->fAsyncThreadRunning = false;
     225           0 :                                 fMe->fBackend->UnlockMutex(1);
     226           0 :                                 break; //No work to do, finish loops and lock this thread again
     227             :                         }
     228           0 :                         void* (*function)(void*) = fMe->fInputQueue[0].fFunction;
     229           0 :                         void* data = fMe->fInputQueue[0].fData;
     230           0 :                         for (int i = 0;i < fMe->fInputQueueUsed - 1;i++)
     231             :                         {
     232           0 :                                 fMe->fInputQueue[i] = fMe->fInputQueue[i + 1];
     233             :                         }
     234           0 :                         fMe->fInputQueueUsed--;
     235           0 :                         fMe->fAsyncThreadProcessing = true;
     236           0 :                         fMe->fBackend->UnlockMutex(1);
     237           0 :                         void* retVal = function(data);
     238           0 :                         fMe->fBackend->LockMutex(1);
     239           0 :                         if (fMe->fExit)
     240             :                         {
     241           0 :                                 fMe->fAsyncThreadProcessing = false;
     242           0 :                                 fMe->fAsyncThreadRunning = false;
     243           0 :                                 fMe->fBackend->UnlockMutex(1);
     244           0 :                                 break;
     245             :                         }
     246           0 :                         if (fMe->fOutputQueueUsed == fMe->fQueueDepth)
     247             :                         {
     248           0 :                                 fMe->fBackend->UnlockMutex(1);
     249           0 :                                 fMe->fBackend->LockMutex(4);
     250           0 :                                 fMe->fBackend->LockMutex(1);
     251           0 :                         }
     252           0 :                         fMe->fAsyncThreadProcessing = false;
     253           0 :                         fMe->fOutputQueue[fMe->fOutputQueueUsed++] = retVal;
     254           0 :                         if (fMe->fWaitingForTasks && fMe->fWaitingForTasks <= fMe->fOutputQueueUsed)
     255             :                         {
     256           0 :                                 fMe->fWaitingForTasks = 0;
     257           0 :                                 fMe->fBackend->UnlockMutex(3); //Enough results in queue for WaitForTasks()
     258           0 :                         }
     259           0 :                         fMe->fBackend->UnlockMutex(1);
     260           0 :                 }
     261           0 :                 if (fMe->fExit) break;
     262             :         }
     263           0 :         HLTInfo("Async Thread Terminating");
     264           0 : }
     265             : 
     266             : int AliHLTAsyncProcessor::GetTotalQueue()
     267             : {
     268           0 :         return(fMe->fInputQueueUsed + fMe->fOutputQueueUsed + (int) fMe->fAsyncThreadProcessing);
     269             : }
     270             : 
     271             : int AliHLTAsyncProcessor::GetNumberOfAsyncTasksInQueue()
     272             : {
     273           0 :         if (fMe->fQueueDepth == 0) return(0);
     274           0 :         fMe->fBackend->LockMutex(1);
     275           0 :         int retVal = GetTotalQueue();
     276           0 :         fMe->fBackend->UnlockMutex(1);
     277             :         return(retVal);
     278           0 : }
     279             : 
     280             : void* AliHLTAsyncProcessor::InitializeAsyncTask(void* (*initFunction)(void*), void* data)
     281             : {
     282           0 :         HLTInfo("Running Initialization of ASYNC Task");
     283           0 :         if (GetNumberOfAsyncTasksInQueue()) return(NULL);
     284           0 :         QueueAsyncTask(initFunction, data);
     285           0 :         WaitForTasks(1);
     286           0 :         void* retVal = RetrieveQueuedTaskResult();
     287           0 :         HLTInfo("Initialization of ASYNC Task finished");
     288             :         return(retVal);
     289           0 : }
     290             : 
     291             : int AliHLTAsyncProcessor::QueueAsyncTask(void* (*processFunction)(void*), void* data)
     292             : {
     293           0 :         if (fMe->fChildStopped)
     294             :         {
     295           0 :                 HLTError("Cannot queue new tasks after the child was stopped");
     296           0 :                 return(1);
     297             :         }
     298           0 :         if (fMe->fQueueDepth == 0)
     299             :         {
     300           0 :                 if (fMe->fOutputQueueUsed) return(1);
     301           0 :                 fMe->fOutputQueueUsed = 1;
     302           0 :                 fMe->fSynchronousOutput = processFunction(data);
     303           0 :                 return(0);
     304             :         }
     305           0 :         HLTInfo("Queuing task (Queue Fill Status %d %d %d)", fMe->fInputQueueUsed, (int) fMe->fAsyncThreadProcessing, fMe->fOutputQueueUsed);
     306           0 :         fMe->fBackend->LockMutex(1);
     307           0 :         if (GetTotalQueue() == fMe->fQueueDepth)
     308             :         {
     309           0 :                 fMe->fBackend->UnlockMutex(1);
     310           0 :                 if (fMe->fFullQueueWarning) HLTWarning("Cannot Queue Task... Queue Full");
     311           0 :                 return(1);
     312             :         }
     313           0 :         fMe->fInputQueue[fMe->fInputQueueUsed].fFunction = processFunction;
     314           0 :         fMe->fInputQueue[fMe->fInputQueueUsed].fData = data;
     315           0 :         fMe->fInputQueueUsed++;
     316           0 :         if (!fMe->fAsyncThreadRunning)
     317             :         {
     318           0 :                 fMe->fAsyncThreadRunning = true;
     319           0 :                 fMe->fBackend->UnlockMutex(2);
     320           0 :         }
     321           0 :         fMe->fBackend->UnlockMutex(1);
     322           0 :         return(0);
     323           0 : }
     324             : 
     325             : int AliHLTAsyncProcessor::IsQueuedTaskCompleted()
     326             : {
     327             :         //HLTInfo("%d results ready for retrieval", fMe->fOutputQueueUsed);
     328           0 :         return(fMe->fOutputQueueUsed);
     329             : }
     330             : 
     331             : void* AliHLTAsyncProcessor::RetrieveQueuedTaskResult()
     332             : {
     333             :         void* retVal;
     334           0 :         if (fMe->fQueueDepth == 0)
     335             :         {
     336           0 :                 fMe->fOutputQueueUsed = 0;
     337           0 :                 retVal = fMe->fSynchronousOutput;
     338           0 :                 fMe->fSynchronousOutput = NULL;
     339           0 :                 return(retVal);
     340             :         }
     341           0 :         HLTInfo("Retrieving Queued Result");
     342           0 :         if (fMe->fOutputQueueUsed == 0) return(NULL);
     343           0 :         fMe->fBackend->LockMutex(1);
     344           0 :         retVal = fMe->fOutputQueue[0];
     345           0 :         for (int i = 0;i < fMe->fOutputQueueUsed - 1;i++)
     346             :         {
     347           0 :                 fMe->fOutputQueue[i] = fMe->fOutputQueue[i + 1];
     348             :         }
     349           0 :         if (fMe->fOutputQueueUsed-- == fMe->fQueueDepth) fMe->fBackend->UnlockMutex(4); //There is no space in output queue, async thread can go on
     350           0 :         fMe->fBackend->UnlockMutex(1);
     351           0 :         return(retVal);
     352           0 : }
     353             : 
     354             : void AliHLTAsyncProcessor::WaitForTasks(int n)
     355             : {
     356           0 :         if (fMe->fQueueDepth == 0) return;
     357           0 :         fMe->fBackend->LockMutex(1);
     358           0 :         if (n == 0) n = fMe->fQueueDepth;
     359           0 :         if (n > GetTotalQueue()) n = GetTotalQueue();
     360           0 :         HLTInfo("Waiting for %d tasks", n);
     361           0 :         if (n <= fMe->fOutputQueueUsed)
     362             :         {
     363           0 :                 fMe->fBackend->UnlockMutex(1);
     364           0 :                 HLTInfo("%d Tasks already ready, no need to wait", n);
     365             :                 return;
     366             :         }
     367           0 :         fMe->fWaitingForTasks = n;
     368           0 :         fMe->fBackend->UnlockMutex(1);
     369           0 :         fMe->fBackend->LockMutex(3);
     370           0 :         HLTInfo("Waiting for %d tasks finished", n);
     371           0 : }
     372             : 
     373             : int AliHLTAsyncProcessor::LockMutex()
     374             : {
     375           0 :         if (fMe->fQueueDepth == 0) return(0);
     376           0 :         return(fMe->fBackend->LockMutex(0));
     377           0 : }
     378             : 
     379             : int AliHLTAsyncProcessor::UnlockMutex()
     380             : {
     381           0 :         if (fMe->fQueueDepth == 0) return(0);
     382           0 :         return(fMe->fBackend->UnlockMutex(0));
     383           0 : }
     384             : 
     385             : int AliHLTAsyncProcessor::TryLockMutex()
     386             : {
     387           0 :         if (fMe->fQueueDepth == 0) return(0);
     388           0 :         return(fMe->fBackend->TryLockMutex(0));
     389           0 : }
     390             : 
     391             : void* AliHLTAsyncProcessor::AllocateBuffer()
     392             : {
     393           0 :         if (!fMe->fAsyncProcess) return NULL;
     394           0 :         fMe->fBackend->LockMutex(5);
     395           0 :         for (int i = 0;i < fMe->fQueueDepth + 3;i++)
     396             :         {
     397           0 :                 if (!fMe->fBufferUsed[i])
     398             :                 {
     399           0 :                         fMe->fBufferUsed[i] = true;
     400           0 :                         fMe->fBackend->UnlockMutex(5);
     401           0 :                         return(((char*) fMe->fBufferPtr) + i * fMe->fBufferSize);
     402             :                 }
     403             :         }
     404           0 :         fMe->fBackend->UnlockMutex(5);
     405           0 :         return(NULL);
     406           0 : }
     407             : 
     408             : void AliHLTAsyncProcessor::FreeBuffer(void* ptr)
     409             : {
     410           0 :         if (fMe->fAsyncProcess)
     411             :         {
     412           0 :                 for (int i = 0;i < fMe->fQueueDepth + 3;i++)
     413             :                 {
     414           0 :                         if (((char*) fMe->fBufferPtr) + i * fMe->fBufferSize == (char*) ptr)
     415             :                         {
     416           0 :                                 fMe->fBufferUsed[i] = false;
     417           0 :                                 return;
     418             :                         }
     419             :                 }
     420             :         }
     421           0 : }
     422             : 
     423             : AliHLTAsyncProcessor::AliHLTAsyncProcessorBuffer* AliHLTAsyncProcessor::AllocateBuffer(size_t size)
     424             : {
     425             :         AliHLTAsyncProcessorBuffer* retVal;
     426           0 :         if (fMe->fAsyncProcess)
     427             :         {
     428           0 :                 if (size == 0) size = fMe->fBufferSize - fgkBufferHeaderSize;
     429           0 :                 if (size + fgkBufferHeaderSize > fMe->fBufferSize) return(NULL);
     430           0 :                 retVal = (AliHLTAsyncProcessorBuffer*) AllocateBuffer();
     431           0 :         }
     432             :         else
     433             :         {
     434           0 :                 retVal = (AliHLTAsyncProcessorBuffer*) malloc(size + fgkBufferHeaderSize);
     435             :         }
     436           0 :         if (retVal == NULL) return(NULL);
     437           0 :         retVal->fSize = size;
     438           0 :         retVal->fPtr = (AliHLTAsyncProcessorBuffer*) (((char*) retVal) + fgkBufferHeaderSize);
     439           0 :         return(retVal);
     440           0 : }
     441             : 
     442             : void AliHLTAsyncProcessor::FreeBuffer(AliHLTAsyncProcessor::AliHLTAsyncProcessorBuffer* buffer)
     443             : {
     444           0 :         if (fMe->fAsyncProcess)
     445             :         {
     446           0 :                 FreeBuffer((void*) buffer);
     447           0 :         }
     448             :         else
     449             :         {
     450           0 :                 free(buffer);
     451             :         }
     452           0 : }
     453             : 
     454             : 
     455             : AliHLTAsyncProcessor::AliHLTAsyncProcessorBuffer* AliHLTAsyncProcessor::SerializeIntoBuffer(TObject* obj, AliHLTComponent* cls, AliHLTAsyncProcessor::AliHLTAsyncProcessorMultiBuffer* multiBuf)
     456             : {
     457           0 :         if (multiBuf)
     458             :         {
     459           0 :                 HLTFatal("Not yet implemented!!!");
     460             :         }
     461             :         AliHLTAsyncProcessorBuffer* retVal;
     462           0 :         if (fMe->fAsyncProcess)
     463             :         {
     464           0 :                 if ((retVal = AllocateBuffer(0)) == NULL) return(NULL);
     465           0 :                 if (cls->SerializeObject(obj, retVal->fPtr, retVal->fSize))
     466             :                 {
     467           0 :                         FreeBuffer(retVal);
     468           0 :                         return(NULL);
     469             :                 }
     470             :         }
     471             :         else
     472             :         {
     473           0 :                 size_t size = fgkBufferHeaderSize;
     474           0 :                 char* buffer = NULL;
     475           0 :                 if (cls->SerializeObject(obj, (void*&) buffer, size)) return(NULL);
     476           0 :                 retVal = (AliHLTAsyncProcessorBuffer*) buffer;
     477           0 :                 retVal->fPtr = buffer + fgkBufferHeaderSize;
     478           0 :                 retVal->fSize = size;
     479           0 :         }
     480           0 :         return(retVal);
     481           0 : }
     482             : 
     483             : AliHLTAsyncProcessor::AliHLTAsyncProcessorBuffer* AliHLTAsyncProcessor::AddBuffer(AliHLTAsyncProcessor::AliHLTAsyncProcessorMultiBuffer* multiBuf, size_t size, void* ptr)
     484             : {
     485             :         AliHLTAsyncProcessorBuffer* retVal;
     486           0 :         if (fMe->fAsyncProcess)
     487             :         {
     488           0 :                 size_t totalSize = GetTotalSize(multiBuf);
     489           0 :                 if (totalSize + fgkBufferHeaderSize + size > fMe->fBufferSize) return(NULL);
     490           0 :                 retVal = (AliHLTAsyncProcessorBuffer*) (((char*) multiBuf) + totalSize);
     491           0 :         }
     492             :         else
     493             :         {
     494           0 :                 retVal = (AliHLTAsyncProcessorBuffer*) malloc(size + fgkBufferHeaderSize);
     495           0 :                 if (retVal == NULL) return(NULL);
     496             :         }
     497           0 :         AliHLTAsyncProcessorBuffer** b = &multiBuf->fFirst;
     498           0 :         for (int i = 0;i < multiBuf->fNumberOfEntries;i++) b = &((*b)->fNext);
     499           0 :         *b = retVal;
     500           0 :         retVal->fSize = size;
     501           0 :         retVal->fPtr = (AliHLTAsyncProcessorBuffer*) (((char*) retVal) + fgkBufferHeaderSize);
     502           0 :         retVal->fNext = NULL;
     503           0 :         if (ptr) memcpy(retVal->fPtr, ptr, size);
     504           0 :         multiBuf->fNumberOfEntries++;
     505             :         return(retVal);
     506           0 : }
     507             : 
     508             : AliHLTAsyncProcessor::AliHLTAsyncProcessorBuffer* AliHLTAsyncProcessor::GetEntry(AliHLTAsyncProcessor::AliHLTAsyncProcessorMultiBuffer* multiBuf, int num)
     509             : {
     510           0 :         if (num >= multiBuf->fNumberOfEntries) return(NULL);
     511           0 :         AliHLTAsyncProcessorBuffer* b = multiBuf->fFirst;
     512           0 :         for (int i = 0;i < multiBuf->fNumberOfEntries - 1;i++) b = b->fNext;
     513             :         return(b);
     514           0 : }
     515             : 
     516             : size_t AliHLTAsyncProcessor::GetTotalSize(AliHLTAsyncProcessor::AliHLTAsyncProcessorMultiBuffer* multiBuf)
     517             : {
     518             :         size_t totalSize = fgkMultiBufferHeaderSize;
     519           0 :         AliHLTAsyncProcessorBuffer* b = multiBuf->fFirst;
     520           0 :         for (int i = 0;i < multiBuf->fNumberOfEntries;i++)
     521             :         {
     522           0 :                 totalSize += fgkBufferHeaderSize;
     523           0 :                 totalSize += alignSize(b->fSize);
     524           0 :                 b = b->fNext;
     525             :         }
     526           0 :         return(totalSize);
     527             : }
     528             : 
     529             : AliHLTAsyncProcessor::AliHLTAsyncProcessorMultiBuffer* AliHLTAsyncProcessor::AllocateMultiBuffer()
     530             : {
     531             :         AliHLTAsyncProcessorMultiBuffer* retVal;
     532           0 :         if (fMe->fAsyncProcess)
     533             :         {
     534           0 :                 if (fgkMultiBufferHeaderSize > fMe->fBufferSize) return(NULL);
     535           0 :                 retVal = (AliHLTAsyncProcessorMultiBuffer*) AllocateBuffer();
     536           0 :         }
     537             :         else
     538             :         {
     539           0 :                 retVal = (AliHLTAsyncProcessorMultiBuffer*) malloc(fgkBufferHeaderSize);
     540             :         }
     541           0 :         if (retVal == NULL) return(NULL);
     542           0 :         retVal->fNumberOfEntries = 0;
     543           0 :         retVal->fFirst = NULL;
     544           0 :         return(retVal);
     545           0 : }
     546             : 
     547             : void AliHLTAsyncProcessor::FreeBuffer(AliHLTAsyncProcessor::AliHLTAsyncProcessorMultiBuffer* buffer)
     548             : {
     549           0 :         if (fMe->fAsyncProcess)
     550             :         {
     551           0 :                 FreeBuffer((void*) buffer);
     552           0 :         }
     553             :         else
     554             :         {
     555           0 :                 AliHLTAsyncProcessorBuffer* b = buffer->fFirst;
     556           0 :                 for (int i = 0;i < buffer->fNumberOfEntries;i++)
     557             :                 {
     558             :                         AliHLTAsyncProcessorBuffer* del = b;
     559           0 :                         b = b->fNext;
     560           0 :                         FreeBuffer(del);
     561             :                 }
     562           0 :                 free(buffer);
     563             :         }
     564           0 : }
     565             : 
     566             : size_t AliHLTAsyncProcessor::ChildSharedProcessBufferSize()
     567             : {
     568           0 :         return 0;
     569             : }
     570             : 
     571           0 : int AliHLTAsyncProcessor::LockMutex(int i) {return(fMe->fBackend->LockMutex(i));}
     572           0 : int AliHLTAsyncProcessor::UnlockMutex(int i) {return(fMe->fBackend->UnlockMutex(i));}
     573             : 
     574             : int AliHLTAsyncProcessor::ForceChildExit(int waitTime)
     575             : {
     576           0 :         if (fMe->fChildStopped) return(0);
     577           0 :         if (!fMe->fAsyncProcess) return(-1);
     578           0 :         if (fMe->fQueueDepth == 0) return(0);
     579           0 :         fMe->fExit = true;
     580           0 :         QueueAsyncTask(AsyncThreadStop, this);
     581           0 :         int retVal = fMe->fBackend->KillChildProcess(waitTime);
     582           0 :         if (retVal == 1) HLTWarning("Async Worker Child dit not terminate in time and was killed");
     583           0 :         fMe->fChildStopped = true;
     584           0 :         fMe->fInputQueueUsed = 0;
     585           0 :         fMe->fAsyncThreadProcessing = false;
     586           0 :         fMe->fAsyncThreadRunning = false;
     587           0 :         if (retVal == -1) HLTError("Error: Cannot stop async child");
     588             :         return(retVal);
     589           0 : }

Generated by: LCOV version 1.11