Line data Source code
1 : /* This file is property of and copyright by the ALICE HLT Project *
2 : * ALICE Experiment at CERN, All rights reserved. *
3 : * See cxx source for full Copyright notice */
4 :
5 : /** @file AliHLTAsyncProcessor.cxx
6 : @author David Rohr (drohr@cern.ch)
7 : */
8 :
9 : #ifdef __MACH__
10 : #define MAP_ANONYMOUS MAP_ANON
11 : #endif
12 :
13 : #include "AliHLTAsyncProcessor.h"
14 : #include "AliHLTAsyncProcessorBackend.h"
15 : #include "AliHLTComponent.h"
16 : #include <sys/mman.h>
17 :
18 126 : ClassImp(AliHLTAsyncProcessor)
19 :
20 18 : AliHLTAsyncProcessor::AliHLTAsyncProcessor() : AliHLTLogging(), fMe(new AliHLTAsyncProcessorContent)
21 18 : {
22 6 : fMe->fQueueDepth = 0;
23 6 : fMe->fAsyncThreadRunning = false;
24 6 : fMe->fAsyncThreadProcessing = false;
25 6 : fMe->fExit = false;
26 6 : fMe->fChildStopped = false;
27 6 : fMe->fBackend = NULL;
28 6 : fMe->fInputQueue = NULL;
29 6 : fMe->fOutputQueue = NULL;
30 6 : fMe->fInputQueueUsed = 0;
31 6 : fMe->fOutputQueueUsed = 0;
32 6 : fMe->fWaitingForTasks = 0;
33 6 : fMe->fFullQueueWarning = 1;
34 6 : fMe->fSynchronousOutput = NULL;
35 6 : fMe->fBasePtr = NULL;
36 6 : fMe->fBufferPtr = NULL;
37 6 : fMe->fBufferSize = 0;
38 6 : fMe->fAsyncProcess = 0;
39 6 : fMe->fChildBufferSpace = NULL;
40 6 : fMe->fmmapSize = 0;
41 6 : fMe->fRequestPush = false;
42 6 : }
43 :
44 : AliHLTAsyncProcessor::~AliHLTAsyncProcessor()
45 12 : {
46 6 : if (fMe->fQueueDepth) Deinitialize();
47 12 : delete fMe;
48 6 : }
49 :
50 : size_t AliHLTAsyncProcessor::alignSize(size_t size)
51 : {
52 0 : if (size % ALIHLTASYNCPROCESSOR_ALIGN) size += ALIHLTASYNCPROCESSOR_ALIGN - size % ALIHLTASYNCPROCESSOR_ALIGN;
53 0 : return(size);
54 : }
55 :
56 : void* AliHLTAsyncProcessor::alignPointer(void* ptr, size_t size)
57 : {
58 0 : size_t tmp = (size_t) ptr;
59 0 : tmp += size;
60 0 : tmp = alignSize(tmp);
61 0 : return (void*) tmp;
62 : }
63 :
64 : int AliHLTAsyncProcessor::Initialize(int depth, bool process, size_t process_buffer_size)
65 : {
66 0 : HLTInfo("Initializing ASYNC Processor");
67 0 : if (fMe->fQueueDepth) return(1);
68 0 : fMe->fQueueDepth = depth;
69 0 : if (fMe->fQueueDepth)
70 : {
71 0 : fMe->fAsyncProcess = process;
72 0 : size_t size = sizeof(AliHLTAsyncProcessorBackend) +
73 0 : (sizeof(AliHLTAsyncProcessorInput) + sizeof(void*)) * fMe->fQueueDepth +
74 0 : process_buffer_size * (fMe->fQueueDepth + 3) +
75 0 : ChildSharedProcessBufferSize() +
76 0 : (6 + fMe->fQueueDepth) * ALIHLTASYNCPROCESSOR_ALIGN;
77 : void* tmpPtr;
78 0 : if (fMe->fAsyncProcess) //promote to running async process instead of async thread
79 : {
80 0 : fMe->fmmapSize = sizeof(AliHLTAsyncProcessorContent) +
81 0 : sizeof(bool) * (fMe->fQueueDepth + 3) +
82 0 : 2 * ALIHLTASYNCPROCESSOR_ALIGN +
83 : size;
84 0 : fMe->fBasePtr = mmap(NULL, fMe->fmmapSize, PROT_READ | PROT_WRITE, MAP_SHARED | MAP_ANONYMOUS, 0, 0);
85 0 : if (fMe->fBasePtr == NULL) return(1);
86 0 : memcpy(fMe->fBasePtr, fMe, sizeof(AliHLTAsyncProcessorContent));
87 0 : delete fMe;
88 0 : fMe = (AliHLTAsyncProcessorContent*) fMe->fBasePtr;
89 0 : fMe->fBufferSize = process_buffer_size;
90 0 : if (fMe->fBufferSize % ALIHLTASYNCPROCESSOR_ALIGN) fMe->fBufferSize += ALIHLTASYNCPROCESSOR_ALIGN - fMe->fBufferSize % ALIHLTASYNCPROCESSOR_ALIGN;
91 0 : tmpPtr = alignPointer(fMe->fBasePtr, sizeof(AliHLTAsyncProcessorContent));
92 :
93 0 : fMe->fBufferUsed = new (tmpPtr) bool[fMe->fQueueDepth + 3];
94 0 : memset(fMe->fBufferUsed, 0, sizeof(bool) * (fMe->fQueueDepth + 3));
95 0 : tmpPtr = alignPointer(tmpPtr, sizeof(bool) * (fMe->fQueueDepth + 3));
96 0 : }
97 : else
98 : {
99 0 : fMe->fBasePtr = malloc(size);
100 0 : if (fMe->fBasePtr == 0) return(1);
101 : tmpPtr = fMe->fBasePtr;
102 : }
103 :
104 0 : fMe->fBackend = new (tmpPtr) AliHLTAsyncProcessorBackend;
105 0 : if (fMe->fBackend->Initialize(fMe->fAsyncProcess)) return(1);
106 0 : tmpPtr = alignPointer(tmpPtr, sizeof(AliHLTAsyncProcessorBackend));
107 :
108 0 : fMe->fInputQueue = new (tmpPtr) AliHLTAsyncProcessorInput[fMe->fQueueDepth];
109 0 : tmpPtr = alignPointer(tmpPtr, sizeof(AliHLTAsyncProcessorInput) * fMe->fQueueDepth);
110 :
111 0 : fMe->fOutputQueue = new (tmpPtr) void*[fMe->fQueueDepth];
112 0 : tmpPtr = alignPointer(tmpPtr, sizeof(void*) * fMe->fQueueDepth);
113 :
114 0 : if (ChildSharedProcessBufferSize())
115 : {
116 0 : fMe->fChildBufferSpace = tmpPtr;
117 0 : memset(fMe->fChildBufferSpace, 0, ChildSharedProcessBufferSize());
118 0 : tmpPtr = alignPointer(tmpPtr, ChildSharedProcessBufferSize());
119 0 : }
120 :
121 0 : fMe->fBufferPtr = tmpPtr;
122 :
123 0 : for (int i = 2;i <= 4;i++) fMe->fBackend->LockMutex(i); //Lock thread control mutexes
124 0 : if (fMe->fAsyncProcess)
125 : {
126 0 : if (fMe->fBackend->StartProcess(AsyncThreadStartHelper, this)) return(1);
127 : }
128 : else
129 : {
130 0 : if (fMe->fBackend->StartThread(AsyncThreadStartHelper, this)) return(1);
131 : }
132 0 : }
133 0 : return(0);
134 0 : }
135 :
136 : int AliHLTAsyncProcessor::Deinitialize()
137 : {
138 0 : HLTInfo("Deinitializing ASYNC Processor");
139 0 : if (fMe->fQueueDepth == 0) return(0);
140 0 : fMe->fBackend->LockMutex(1);
141 0 : if (GetTotalQueue())
142 : {
143 0 : fMe->fBackend->UnlockMutex(1);
144 0 : HLTError("Error during deinitialization of ASYNC Processor - Still tasks in queue");
145 0 : return(1);
146 : }
147 : fMe->fBackend->UnlockMutex(1);
148 0 : if (!fMe->fChildStopped) QueueAsyncTask(AsyncThreadStop, this);
149 0 : if (fMe->fAsyncProcess)
150 : {
151 0 : fMe->fBackend->StopProcess();
152 0 : }
153 : else
154 : {
155 0 : fMe->fBackend->StopThread();
156 : }
157 0 : for (int i = 3;i <= 4;i++) fMe->fBackend->UnlockMutex(i); //Unlock remaining thread control mutexes
158 0 : for (int i = 0;i < fMe->fQueueDepth;i++)
159 : {
160 0 : fMe->fInputQueue[i].~AliHLTAsyncProcessorInput();
161 : //fMe->fOutputQueue.~void*(); //Adapt if this is becoming a real struct!
162 : }
163 0 : fMe->fAsyncThreadRunning = fMe->fAsyncThreadProcessing = false;
164 0 : fMe->fBackend->~AliHLTAsyncProcessorBackend();;
165 0 : fMe->fBackend = NULL;
166 0 : fMe->fQueueDepth = 0;
167 0 : fMe->fBufferSize = 0;
168 0 : if (fMe->fAsyncProcess)
169 : {
170 0 : void* tmp = fMe;
171 0 : fMe = new AliHLTAsyncProcessorContent;
172 0 : memcpy(fMe, tmp, sizeof(AliHLTAsyncProcessorContent));
173 0 : munmap(fMe->fBasePtr, fMe->fmmapSize);
174 0 : }
175 : else
176 : {
177 0 : free(fMe->fBasePtr);
178 : }
179 :
180 0 : fMe->fExit = false;
181 0 : fMe->fChildStopped = false;
182 0 : fMe->fBackend = NULL;
183 0 : fMe->fInputQueue = NULL;
184 0 : fMe->fOutputQueue = NULL;
185 0 : fMe->fInputQueueUsed = 0;
186 0 : fMe->fOutputQueueUsed = 0;
187 0 : fMe->fWaitingForTasks = 0;
188 0 : fMe->fBasePtr = NULL;
189 0 : fMe->fBufferPtr = NULL;
190 0 : fMe->fBufferSize = 0;
191 0 : fMe->fAsyncProcess = 0;
192 0 : fMe->fChildBufferSpace = NULL;
193 0 : fMe->fmmapSize = 0;
194 0 : fMe->fRequestPush = false;
195 :
196 0 : HLTInfo("Deinitialization of ASYNC Processor done");
197 0 : return(0);
198 0 : }
199 :
200 : void* AliHLTAsyncProcessor::AsyncThreadStartHelper(void* obj)
201 : {
202 0 : ((AliHLTAsyncProcessor*) obj)->AsyncThread();
203 0 : pthread_exit(NULL);
204 : return(NULL);
205 : }
206 :
207 : void* AliHLTAsyncProcessor::AsyncThreadStop(void* obj)
208 : {
209 0 : ((AliHLTAsyncProcessor*) obj)->fMe->fExit = true;
210 0 : return(NULL);
211 : }
212 :
213 : void AliHLTAsyncProcessor::AsyncThread()
214 : {
215 0 : HLTInfo("Async Thread Running");
216 : while (true)
217 : {
218 0 : fMe->fBackend->LockMutex(2); //Lock async thread if nothing to do
219 0 : while (true)
220 : {
221 0 : fMe->fBackend->LockMutex(1);
222 0 : if (fMe->fInputQueueUsed == 0)
223 : {
224 0 : fMe->fAsyncThreadRunning = false;
225 0 : fMe->fBackend->UnlockMutex(1);
226 0 : break; //No work to do, finish loops and lock this thread again
227 : }
228 0 : void* (*function)(void*) = fMe->fInputQueue[0].fFunction;
229 0 : void* data = fMe->fInputQueue[0].fData;
230 0 : for (int i = 0;i < fMe->fInputQueueUsed - 1;i++)
231 : {
232 0 : fMe->fInputQueue[i] = fMe->fInputQueue[i + 1];
233 : }
234 0 : fMe->fInputQueueUsed--;
235 0 : fMe->fAsyncThreadProcessing = true;
236 0 : fMe->fBackend->UnlockMutex(1);
237 0 : void* retVal = function(data);
238 0 : fMe->fBackend->LockMutex(1);
239 0 : if (fMe->fExit)
240 : {
241 0 : fMe->fAsyncThreadProcessing = false;
242 0 : fMe->fAsyncThreadRunning = false;
243 0 : fMe->fBackend->UnlockMutex(1);
244 0 : break;
245 : }
246 0 : if (fMe->fOutputQueueUsed == fMe->fQueueDepth)
247 : {
248 0 : fMe->fBackend->UnlockMutex(1);
249 0 : fMe->fBackend->LockMutex(4);
250 0 : fMe->fBackend->LockMutex(1);
251 0 : }
252 0 : fMe->fAsyncThreadProcessing = false;
253 0 : fMe->fOutputQueue[fMe->fOutputQueueUsed++] = retVal;
254 0 : if (fMe->fWaitingForTasks && fMe->fWaitingForTasks <= fMe->fOutputQueueUsed)
255 : {
256 0 : fMe->fWaitingForTasks = 0;
257 0 : fMe->fBackend->UnlockMutex(3); //Enough results in queue for WaitForTasks()
258 0 : }
259 0 : fMe->fBackend->UnlockMutex(1);
260 0 : }
261 0 : if (fMe->fExit) break;
262 : }
263 0 : HLTInfo("Async Thread Terminating");
264 0 : }
265 :
266 : int AliHLTAsyncProcessor::GetTotalQueue()
267 : {
268 0 : return(fMe->fInputQueueUsed + fMe->fOutputQueueUsed + (int) fMe->fAsyncThreadProcessing);
269 : }
270 :
271 : int AliHLTAsyncProcessor::GetNumberOfAsyncTasksInQueue()
272 : {
273 0 : if (fMe->fQueueDepth == 0) return(0);
274 0 : fMe->fBackend->LockMutex(1);
275 0 : int retVal = GetTotalQueue();
276 0 : fMe->fBackend->UnlockMutex(1);
277 : return(retVal);
278 0 : }
279 :
280 : void* AliHLTAsyncProcessor::InitializeAsyncTask(void* (*initFunction)(void*), void* data)
281 : {
282 0 : HLTInfo("Running Initialization of ASYNC Task");
283 0 : if (GetNumberOfAsyncTasksInQueue()) return(NULL);
284 0 : QueueAsyncTask(initFunction, data);
285 0 : WaitForTasks(1);
286 0 : void* retVal = RetrieveQueuedTaskResult();
287 0 : HLTInfo("Initialization of ASYNC Task finished");
288 : return(retVal);
289 0 : }
290 :
291 : int AliHLTAsyncProcessor::QueueAsyncTask(void* (*processFunction)(void*), void* data)
292 : {
293 0 : if (fMe->fChildStopped)
294 : {
295 0 : HLTError("Cannot queue new tasks after the child was stopped");
296 0 : return(1);
297 : }
298 0 : if (fMe->fQueueDepth == 0)
299 : {
300 0 : if (fMe->fOutputQueueUsed) return(1);
301 0 : fMe->fOutputQueueUsed = 1;
302 0 : fMe->fSynchronousOutput = processFunction(data);
303 0 : return(0);
304 : }
305 0 : HLTInfo("Queuing task (Queue Fill Status %d %d %d)", fMe->fInputQueueUsed, (int) fMe->fAsyncThreadProcessing, fMe->fOutputQueueUsed);
306 0 : fMe->fBackend->LockMutex(1);
307 0 : if (GetTotalQueue() == fMe->fQueueDepth)
308 : {
309 0 : fMe->fBackend->UnlockMutex(1);
310 0 : if (fMe->fFullQueueWarning) HLTWarning("Cannot Queue Task... Queue Full");
311 0 : return(1);
312 : }
313 0 : fMe->fInputQueue[fMe->fInputQueueUsed].fFunction = processFunction;
314 0 : fMe->fInputQueue[fMe->fInputQueueUsed].fData = data;
315 0 : fMe->fInputQueueUsed++;
316 0 : if (!fMe->fAsyncThreadRunning)
317 : {
318 0 : fMe->fAsyncThreadRunning = true;
319 0 : fMe->fBackend->UnlockMutex(2);
320 0 : }
321 0 : fMe->fBackend->UnlockMutex(1);
322 0 : return(0);
323 0 : }
324 :
325 : int AliHLTAsyncProcessor::IsQueuedTaskCompleted()
326 : {
327 : //HLTInfo("%d results ready for retrieval", fMe->fOutputQueueUsed);
328 0 : return(fMe->fOutputQueueUsed);
329 : }
330 :
331 : void* AliHLTAsyncProcessor::RetrieveQueuedTaskResult()
332 : {
333 : void* retVal;
334 0 : if (fMe->fQueueDepth == 0)
335 : {
336 0 : fMe->fOutputQueueUsed = 0;
337 0 : retVal = fMe->fSynchronousOutput;
338 0 : fMe->fSynchronousOutput = NULL;
339 0 : return(retVal);
340 : }
341 0 : HLTInfo("Retrieving Queued Result");
342 0 : if (fMe->fOutputQueueUsed == 0) return(NULL);
343 0 : fMe->fBackend->LockMutex(1);
344 0 : retVal = fMe->fOutputQueue[0];
345 0 : for (int i = 0;i < fMe->fOutputQueueUsed - 1;i++)
346 : {
347 0 : fMe->fOutputQueue[i] = fMe->fOutputQueue[i + 1];
348 : }
349 0 : if (fMe->fOutputQueueUsed-- == fMe->fQueueDepth) fMe->fBackend->UnlockMutex(4); //There is no space in output queue, async thread can go on
350 0 : fMe->fBackend->UnlockMutex(1);
351 0 : return(retVal);
352 0 : }
353 :
354 : void AliHLTAsyncProcessor::WaitForTasks(int n)
355 : {
356 0 : if (fMe->fQueueDepth == 0) return;
357 0 : fMe->fBackend->LockMutex(1);
358 0 : if (n == 0) n = fMe->fQueueDepth;
359 0 : if (n > GetTotalQueue()) n = GetTotalQueue();
360 0 : HLTInfo("Waiting for %d tasks", n);
361 0 : if (n <= fMe->fOutputQueueUsed)
362 : {
363 0 : fMe->fBackend->UnlockMutex(1);
364 0 : HLTInfo("%d Tasks already ready, no need to wait", n);
365 : return;
366 : }
367 0 : fMe->fWaitingForTasks = n;
368 0 : fMe->fBackend->UnlockMutex(1);
369 0 : fMe->fBackend->LockMutex(3);
370 0 : HLTInfo("Waiting for %d tasks finished", n);
371 0 : }
372 :
373 : int AliHLTAsyncProcessor::LockMutex()
374 : {
375 0 : if (fMe->fQueueDepth == 0) return(0);
376 0 : return(fMe->fBackend->LockMutex(0));
377 0 : }
378 :
379 : int AliHLTAsyncProcessor::UnlockMutex()
380 : {
381 0 : if (fMe->fQueueDepth == 0) return(0);
382 0 : return(fMe->fBackend->UnlockMutex(0));
383 0 : }
384 :
385 : int AliHLTAsyncProcessor::TryLockMutex()
386 : {
387 0 : if (fMe->fQueueDepth == 0) return(0);
388 0 : return(fMe->fBackend->TryLockMutex(0));
389 0 : }
390 :
391 : void* AliHLTAsyncProcessor::AllocateBuffer()
392 : {
393 0 : if (!fMe->fAsyncProcess) return NULL;
394 0 : fMe->fBackend->LockMutex(5);
395 0 : for (int i = 0;i < fMe->fQueueDepth + 3;i++)
396 : {
397 0 : if (!fMe->fBufferUsed[i])
398 : {
399 0 : fMe->fBufferUsed[i] = true;
400 0 : fMe->fBackend->UnlockMutex(5);
401 0 : return(((char*) fMe->fBufferPtr) + i * fMe->fBufferSize);
402 : }
403 : }
404 0 : fMe->fBackend->UnlockMutex(5);
405 0 : return(NULL);
406 0 : }
407 :
408 : void AliHLTAsyncProcessor::FreeBuffer(void* ptr)
409 : {
410 0 : if (fMe->fAsyncProcess)
411 : {
412 0 : for (int i = 0;i < fMe->fQueueDepth + 3;i++)
413 : {
414 0 : if (((char*) fMe->fBufferPtr) + i * fMe->fBufferSize == (char*) ptr)
415 : {
416 0 : fMe->fBufferUsed[i] = false;
417 0 : return;
418 : }
419 : }
420 : }
421 0 : }
422 :
423 : AliHLTAsyncProcessor::AliHLTAsyncProcessorBuffer* AliHLTAsyncProcessor::AllocateBuffer(size_t size)
424 : {
425 : AliHLTAsyncProcessorBuffer* retVal;
426 0 : if (fMe->fAsyncProcess)
427 : {
428 0 : if (size == 0) size = fMe->fBufferSize - fgkBufferHeaderSize;
429 0 : if (size + fgkBufferHeaderSize > fMe->fBufferSize) return(NULL);
430 0 : retVal = (AliHLTAsyncProcessorBuffer*) AllocateBuffer();
431 0 : }
432 : else
433 : {
434 0 : retVal = (AliHLTAsyncProcessorBuffer*) malloc(size + fgkBufferHeaderSize);
435 : }
436 0 : if (retVal == NULL) return(NULL);
437 0 : retVal->fSize = size;
438 0 : retVal->fPtr = (AliHLTAsyncProcessorBuffer*) (((char*) retVal) + fgkBufferHeaderSize);
439 0 : return(retVal);
440 0 : }
441 :
442 : void AliHLTAsyncProcessor::FreeBuffer(AliHLTAsyncProcessor::AliHLTAsyncProcessorBuffer* buffer)
443 : {
444 0 : if (fMe->fAsyncProcess)
445 : {
446 0 : FreeBuffer((void*) buffer);
447 0 : }
448 : else
449 : {
450 0 : free(buffer);
451 : }
452 0 : }
453 :
454 :
455 : AliHLTAsyncProcessor::AliHLTAsyncProcessorBuffer* AliHLTAsyncProcessor::SerializeIntoBuffer(TObject* obj, AliHLTComponent* cls, AliHLTAsyncProcessor::AliHLTAsyncProcessorMultiBuffer* multiBuf)
456 : {
457 0 : if (multiBuf)
458 : {
459 0 : HLTFatal("Not yet implemented!!!");
460 : }
461 : AliHLTAsyncProcessorBuffer* retVal;
462 0 : if (fMe->fAsyncProcess)
463 : {
464 0 : if ((retVal = AllocateBuffer(0)) == NULL) return(NULL);
465 0 : if (cls->SerializeObject(obj, retVal->fPtr, retVal->fSize))
466 : {
467 0 : FreeBuffer(retVal);
468 0 : return(NULL);
469 : }
470 : }
471 : else
472 : {
473 0 : size_t size = fgkBufferHeaderSize;
474 0 : char* buffer = NULL;
475 0 : if (cls->SerializeObject(obj, (void*&) buffer, size)) return(NULL);
476 0 : retVal = (AliHLTAsyncProcessorBuffer*) buffer;
477 0 : retVal->fPtr = buffer + fgkBufferHeaderSize;
478 0 : retVal->fSize = size;
479 0 : }
480 0 : return(retVal);
481 0 : }
482 :
483 : AliHLTAsyncProcessor::AliHLTAsyncProcessorBuffer* AliHLTAsyncProcessor::AddBuffer(AliHLTAsyncProcessor::AliHLTAsyncProcessorMultiBuffer* multiBuf, size_t size, void* ptr)
484 : {
485 : AliHLTAsyncProcessorBuffer* retVal;
486 0 : if (fMe->fAsyncProcess)
487 : {
488 0 : size_t totalSize = GetTotalSize(multiBuf);
489 0 : if (totalSize + fgkBufferHeaderSize + size > fMe->fBufferSize) return(NULL);
490 0 : retVal = (AliHLTAsyncProcessorBuffer*) (((char*) multiBuf) + totalSize);
491 0 : }
492 : else
493 : {
494 0 : retVal = (AliHLTAsyncProcessorBuffer*) malloc(size + fgkBufferHeaderSize);
495 0 : if (retVal == NULL) return(NULL);
496 : }
497 0 : AliHLTAsyncProcessorBuffer** b = &multiBuf->fFirst;
498 0 : for (int i = 0;i < multiBuf->fNumberOfEntries;i++) b = &((*b)->fNext);
499 0 : *b = retVal;
500 0 : retVal->fSize = size;
501 0 : retVal->fPtr = (AliHLTAsyncProcessorBuffer*) (((char*) retVal) + fgkBufferHeaderSize);
502 0 : retVal->fNext = NULL;
503 0 : if (ptr) memcpy(retVal->fPtr, ptr, size);
504 0 : multiBuf->fNumberOfEntries++;
505 : return(retVal);
506 0 : }
507 :
508 : AliHLTAsyncProcessor::AliHLTAsyncProcessorBuffer* AliHLTAsyncProcessor::GetEntry(AliHLTAsyncProcessor::AliHLTAsyncProcessorMultiBuffer* multiBuf, int num)
509 : {
510 0 : if (num >= multiBuf->fNumberOfEntries) return(NULL);
511 0 : AliHLTAsyncProcessorBuffer* b = multiBuf->fFirst;
512 0 : for (int i = 0;i < multiBuf->fNumberOfEntries - 1;i++) b = b->fNext;
513 : return(b);
514 0 : }
515 :
516 : size_t AliHLTAsyncProcessor::GetTotalSize(AliHLTAsyncProcessor::AliHLTAsyncProcessorMultiBuffer* multiBuf)
517 : {
518 : size_t totalSize = fgkMultiBufferHeaderSize;
519 0 : AliHLTAsyncProcessorBuffer* b = multiBuf->fFirst;
520 0 : for (int i = 0;i < multiBuf->fNumberOfEntries;i++)
521 : {
522 0 : totalSize += fgkBufferHeaderSize;
523 0 : totalSize += alignSize(b->fSize);
524 0 : b = b->fNext;
525 : }
526 0 : return(totalSize);
527 : }
528 :
529 : AliHLTAsyncProcessor::AliHLTAsyncProcessorMultiBuffer* AliHLTAsyncProcessor::AllocateMultiBuffer()
530 : {
531 : AliHLTAsyncProcessorMultiBuffer* retVal;
532 0 : if (fMe->fAsyncProcess)
533 : {
534 0 : if (fgkMultiBufferHeaderSize > fMe->fBufferSize) return(NULL);
535 0 : retVal = (AliHLTAsyncProcessorMultiBuffer*) AllocateBuffer();
536 0 : }
537 : else
538 : {
539 0 : retVal = (AliHLTAsyncProcessorMultiBuffer*) malloc(fgkBufferHeaderSize);
540 : }
541 0 : if (retVal == NULL) return(NULL);
542 0 : retVal->fNumberOfEntries = 0;
543 0 : retVal->fFirst = NULL;
544 0 : return(retVal);
545 0 : }
546 :
547 : void AliHLTAsyncProcessor::FreeBuffer(AliHLTAsyncProcessor::AliHLTAsyncProcessorMultiBuffer* buffer)
548 : {
549 0 : if (fMe->fAsyncProcess)
550 : {
551 0 : FreeBuffer((void*) buffer);
552 0 : }
553 : else
554 : {
555 0 : AliHLTAsyncProcessorBuffer* b = buffer->fFirst;
556 0 : for (int i = 0;i < buffer->fNumberOfEntries;i++)
557 : {
558 : AliHLTAsyncProcessorBuffer* del = b;
559 0 : b = b->fNext;
560 0 : FreeBuffer(del);
561 : }
562 0 : free(buffer);
563 : }
564 0 : }
565 :
566 : size_t AliHLTAsyncProcessor::ChildSharedProcessBufferSize()
567 : {
568 0 : return 0;
569 : }
570 :
571 0 : int AliHLTAsyncProcessor::LockMutex(int i) {return(fMe->fBackend->LockMutex(i));}
572 0 : int AliHLTAsyncProcessor::UnlockMutex(int i) {return(fMe->fBackend->UnlockMutex(i));}
573 :
574 : int AliHLTAsyncProcessor::ForceChildExit(int waitTime)
575 : {
576 0 : if (fMe->fChildStopped) return(0);
577 0 : if (!fMe->fAsyncProcess) return(-1);
578 0 : if (fMe->fQueueDepth == 0) return(0);
579 0 : fMe->fExit = true;
580 0 : QueueAsyncTask(AsyncThreadStop, this);
581 0 : int retVal = fMe->fBackend->KillChildProcess(waitTime);
582 0 : if (retVal == 1) HLTWarning("Async Worker Child dit not terminate in time and was killed");
583 0 : fMe->fChildStopped = true;
584 0 : fMe->fInputQueueUsed = 0;
585 0 : fMe->fAsyncThreadProcessing = false;
586 0 : fMe->fAsyncThreadRunning = false;
587 0 : if (retVal == -1) HLTError("Error: Cannot stop async child");
588 : return(retVal);
589 0 : }
|