Line data Source code
1 : #ifndef ALIHLTASYNCPROCESSOR_H
2 : #define ALIHLTASYNCPROCESSOR_H
3 :
4 : #define ALIHLTASYNCPROCESSOR_ALIGN 64 //Alignment granularity in bytes; fgkBufferHeaderSize and fgkMultiBufferHeaderSize below are rounded up to a multiple of it.
5 :
6 : /* This file is property of and copyright by the ALICE HLT Project *
7 : * ALICE Experiment at CERN, All rights reserved. *
8 : * See cxx source for full Copyright notice */
9 :
10 : /** @file AliHLTAsyncProcessor.h
11 : @author David Rohr (drohr@cern.ch)
12 : */
13 :
14 : //The AliHLTAsyncProcessor class implements an interface for asynchronous
15 : //tasks for HLT components.
16 :
17 : #include "AliHLTLogging.h"
18 : class AliHLTAsyncProcessorBackend;
19 : class AliHLTComponent;
20 :
21 : class AliHLTAsyncProcessor : public AliHLTLogging //Interface for running tasks of HLT components asynchronously (in an async thread or an async process).
22 : {
23 : public:
24 : AliHLTAsyncProcessor();
25 : virtual ~AliHLTAsyncProcessor();
26 :
27 : //Initialize the Async Processor with the given queue depth.
28 : //If Initialize is not called, or depth is 0, the async processor will serialize all jobs.
29 : //In that case, QueueAsyncTask() will just run the asynchronous task itself immediately.
30 : //Returns 0 on success.
31 : int Initialize(int depth, bool process = false, size_t process_buffer_size = 0); //NOTE(review): process presumably selects an async process instead of an async thread, and process_buffer_size the size of its buffers - confirm in the cxx source.
32 0 : void SetFullQueueWarning(int val) {fMe->fFullQueueWarning = val;} //Stores val in fFullQueueWarning; how the value is used is not visible in this header.
33 :
34 : int Deinitialize();
35 : //Deinitialize() (declared directly above): deinitializes the async queue. Terminates the async thread and frees buffers.
36 : //Requires that no tasks are left in the queue. Call WaitForTasks and RetrieveQueuedTaskResult before.
37 : //Returns 1 if it cannot deinitialize.
38 :
39 : //Returns the queue depth (fQueueDepth).
40 0 : int GetQueueDepth() {return fMe->fQueueDepth;}
41 :
42 : //Returns the total number of async tasks currently in the queue.
43 : int GetNumberOfAsyncTasksInQueue();
44 :
45 : //Can be used to perform additional initialization inside the asynchronously spawned thread.
46 : //Pass the initialization function as argument.
47 : //InitializeAsyncTask will wait for the passed initialization function in the async thread to finish.
48 : //This second initialization step is optional; it is needed if some thread-local contexts must be initialized.
49 : //Returns the return value returned by the passed function in the async thread.
50 : //Must not be called while async tasks are running; it will return NULL in that case.
51 : //In principle, the interface is the same as QueueAsyncTask below, but the function will wait for the task
52 : //to finish, collect the result, and return it automatically.
53 : void* InitializeAsyncTask(void* (*initFunction)(void*), void* data);
54 :
55 : //Queue an asynchronous task. Provide a function pointer to the processing function and a pointer to a data object.
56 : //The asynchronous task shall take ownership of this data object, so the main task may no longer use it.
57 : //Returns 0 on success. Fails if the queue limit has already been reached.
58 : int QueueAsyncTask(void* (*)(void*), void*);
59 :
60 : //Returns the number of async tasks that are completed.
61 : //If the return value is greater than 1 (NOTE(review): presumably "at least 1" is meant - confirm in the cxx source), RetrieveQueuedTaskResult() can be used to fetch the result of the first task that finished.
62 : int IsQueuedTaskCompleted();
63 :
64 : //Fetch the result of the first task that finished.
65 : //Returns NULL if no task has finished.
66 : void* RetrieveQueuedTaskResult();
67 :
68 : //Force the child process to quit. Only works with an async process, not with an async thread.
69 : //waitTime is the maximum time allowed for the child to quit properly, in msec.
70 : //Returns 0 if the child exited properly, 1 if the child was killed hard, -1 on error.
71 : int ForceChildExit(int waitTime);
72 :
73 : //Wait for n async tasks to finish, where n is the argument passed.
74 : //If n=0, the function waits for all queued tasks to finish.
75 : //If n is greater than the number of queued tasks, it will also wait for all tasks.
76 : void WaitForTasks(int);
77 :
78 : //The mutex can be used to serialize execution of the main task and the asynchronous task for non-thread-safe parts.
79 : //This should be used carefully.
80 : //In particular, the main task must not be locked out for a long time. The functions return 0 on success.
81 : int LockMutex(); //Gets the lock on the mutex
82 : int UnlockMutex(); //Releases the lock on the mutex
83 : int TryLockMutex(); //Tries to get the lock on the mutex. Returns 0 on success if mutex could be locked. Always returns immediately.
84 :
85 : //Get size of async process shared buffer objects.
86 0 : int GetBufferSize() {return(fMe->fBufferSize);} //NOTE(review): fBufferSize is a size_t, so this accessor narrows it to int; BufferSize() below returns the same field as size_t.
87 : //Buffer object: pointer to the buffer content plus its size. fNext and fTObject are private and reachable only by AliHLTAsyncProcessor (friend).
88 : struct AliHLTAsyncProcessorBuffer
89 : {
90 : friend class AliHLTAsyncProcessor;
91 :
92 : void* fPtr; //Pointer to the buffer content
93 : size_t fSize; //Size of the buffer content
94 :
95 : private:
96 : AliHLTAsyncProcessorBuffer* fNext; //NOTE(review): presumably the next buffer in a multibuffer chain - confirm in the cxx source.
97 : bool fTObject; //NOTE(review): presumably set when the buffer holds a serialized TObject - confirm in the cxx source.
98 : };
99 : static const size_t fgkBufferHeaderSize = sizeof(AliHLTAsyncProcessorBuffer) + (ALIHLTASYNCPROCESSOR_ALIGN - sizeof(AliHLTAsyncProcessorBuffer) % ALIHLTASYNCPROCESSOR_ALIGN) % ALIHLTASYNCPROCESSOR_ALIGN; //sizeof(AliHLTAsyncProcessorBuffer) rounded up to the next multiple of ALIHLTASYNCPROCESSOR_ALIGN
100 : //Header of a multibuffer: the number of entries and a pointer to the first buffer.
101 : struct AliHLTAsyncProcessorMultiBuffer
102 : {
103 : size_t fNumberOfEntries;
104 : AliHLTAsyncProcessorBuffer* fFirst;
105 : };
106 : static const size_t fgkMultiBufferHeaderSize = sizeof(AliHLTAsyncProcessorMultiBuffer) + (ALIHLTASYNCPROCESSOR_ALIGN - sizeof(AliHLTAsyncProcessorMultiBuffer) % ALIHLTASYNCPROCESSOR_ALIGN) % ALIHLTASYNCPROCESSOR_ALIGN; //sizeof(AliHLTAsyncProcessorMultiBuffer) rounded up to the next multiple of ALIHLTASYNCPROCESSOR_ALIGN
107 :
108 : //Simple version to allocate and delete a void* ptr to a buffer, for an async process only.
109 : void* AllocateBuffer();
110 : void FreeBuffer(void* ptr);
111 :
112 : //More elaborate version: Allocate and delete a buffer object that also contains the size (good for passing with the functions).
113 : //Works for async threads and async processes. For an async process, size = 0 allocates the maximum buffer size.
114 : AliHLTAsyncProcessorBuffer* AllocateBuffer(size_t size);
115 : void FreeBuffer(AliHLTAsyncProcessorBuffer* buffer);
116 :
117 : //Serializes an object into an AliHLTAsyncProcessorBuffer and returns the pointer to it, containing the pointer to the serialized object and its size.
118 : AliHLTAsyncProcessorBuffer* SerializeIntoBuffer(TObject* obj, AliHLTComponent* cls, AliHLTAsyncProcessorMultiBuffer* multiBuf = NULL);
119 :
120 : //Allocate and free a MultiBuffer that stores multiple objects.
121 : AliHLTAsyncProcessorMultiBuffer* AllocateMultiBuffer();
122 : void FreeBuffer(AliHLTAsyncProcessorMultiBuffer* ptr);
123 :
124 : //Add a new buffer to a multibuffer. If ptr is nonzero, copy the content of ptr there.
125 : AliHLTAsyncProcessorBuffer* AddBuffer(AliHLTAsyncProcessorMultiBuffer* multiBuf, size_t size, void* ptr = NULL);
126 : AliHLTAsyncProcessorBuffer* GetEntry(AliHLTAsyncProcessorMultiBuffer* multiBuf, int num); //NOTE(review): presumably returns entry number num of multiBuf - confirm in the cxx source.
127 : //Inline accessors for fBufferSize and the push-request flag fRequestPush.
128 0 : size_t BufferSize() {return(fMe->fBufferSize);}
129 0 : void RequestPush() {fMe->fRequestPush = true;}
130 0 : bool PushRequested() {bool tmp = fMe->fRequestPush;fMe->fRequestPush = false;return tmp;} //Read-and-clear: returns the current flag and resets it to false.
131 :
132 : private:
133 : AliHLTAsyncProcessor(const AliHLTAsyncProcessor&); //Copy constructor and assignment operator are private, so outside code cannot copy the object.
134 : AliHLTAsyncProcessor& operator=(const AliHLTAsyncProcessor&);
135 :
136 : //Provide additional shared buffer resources for a derived class when mode is fAsyncProcess.
137 : //Memory is zero-initialized.
138 : virtual size_t ChildSharedProcessBufferSize();
139 :
140 : size_t GetTotalSize(AliHLTAsyncProcessorMultiBuffer* multiBuf); //NOTE(review): presumably the total size of all entries of multiBuf - confirm in the cxx source.
141 : //Element type of fInputQueue: a function pointer and a data pointer, the same pair QueueAsyncTask() takes.
142 : struct AliHLTAsyncProcessorInput
143 : {
144 : void* (*fFunction)(void*);
145 : void* fData;
146 : };
147 : //NOTE(review): the following internals are not documented in this header; see the cxx source for their contracts.
148 : int GetTotalQueue();
149 : void AsyncThread();
150 : static void* AsyncThreadStartHelper(void*);
151 : static void* AsyncThreadStop(void*);
152 : //NOTE(review): presumably these round a pointer / a size up to ALIHLTASYNCPROCESSOR_ALIGN - confirm in the cxx source.
153 : static void* alignPointer(void* ptr, size_t size);
154 : static size_t alignSize(size_t size);
155 : //State of the async processor; it is addressed through fMe.
156 : protected:
157 : struct AliHLTAsyncProcessorContent
158 : {
159 : int fQueueDepth; //Returned by GetQueueDepth()
160 : bool fAsyncThreadRunning, fAsyncThreadProcessing;
161 : bool fExit;
162 : bool fChildStopped;
163 : AliHLTAsyncProcessorBackend* fBackend;
164 :
165 : AliHLTAsyncProcessorInput* fInputQueue;
166 : void** fOutputQueue;
167 : int fInputQueueUsed, fOutputQueueUsed;
168 : int fWaitingForTasks;
169 : int fFullQueueWarning; //Set by SetFullQueueWarning()
170 :
171 : void* fSynchronousOutput; //In synchronous mode, we need one output buffer, without initialization
172 :
173 : void* fBasePtr;
174 : void* fBufferPtr;
175 : bool* fBufferUsed;
176 : size_t fBufferSize; //Returned by GetBufferSize() and BufferSize()
177 : size_t fmmapSize;
178 :
179 : int fAsyncProcess;
180 : void* fChildBufferSpace;
181 :
182 : bool fRequestPush; //Set by RequestPush(), read and cleared by PushRequested()
183 : };
184 : //NOTE(review): overloads taking an index, presumably selecting one of several mutexes - confirm in the cxx source.
185 : int LockMutex(int i);
186 : int UnlockMutex(int i);
187 :
188 : AliHLTAsyncProcessorContent* fMe; //This points to the interior of the Async Processor, possibly in shared memory if the async part is an individual process
189 : //ROOT ClassDef macro; the second argument is the class version (0 here).
190 126 : ClassDef(AliHLTAsyncProcessor, 0);
191 : };
192 :
193 : #endif
|