Line data Source code
1 : #ifndef H_ALIHLTASYNCPROCESSORBACKEND
2 : #define H_ALIHLTASYNCPROCESSORBACKEND
3 :
4 : /* This file is property of and copyright by the ALICE HLT Project *
5 : * ALICE Experiment at CERN, All rights reserved. *
6 : * See cxx source for full Copyright notice */
7 :
8 : /** @file AliHLTAsyncProcessorBackend.h
9 : @author David Rohr (drohr@cern.ch)
10 : */
11 :
12 : //This file is a backend for AliHLTAsyncProcessor, that implements the
13 : //mutex and thread handling via pthreads
14 :
15 : //0: User Mutex, 1: Operation Mutex, 2: Input Mutex, 3: Output Mutex, 4: Output Full Mutex, 5: Buffer Mutex
16 : #define ASYNC_MUTEX_COUNT 6
17 :
18 : #define HLT_ASYNC_USE_SEM_T //Use POSIX semaphores instead of PTHREAD mutexes
19 :
20 : #include <pthread.h>
21 : #include <sys/wait.h>
22 : #include <sys/types.h>
23 : #include <unistd.h>
24 : #include <errno.h>
25 : #include <signal.h>
26 : #ifdef HLT_ASYNC_USE_SEM_T
27 : #include <semaphore.h>
28 : #endif
29 :
30 : class AliHLTAsyncProcessorBackend
31 : {
32 : public:
33 0 : AliHLTAsyncProcessorBackend() : fInitialized(false), fAsyncThread(0), fAsyncPID(0) {};
34 : ~AliHLTAsyncProcessorBackend()
35 0 : {
36 0 : if (!fInitialized) return;
37 0 : for (int i = 0;i < ASYNC_MUTEX_COUNT;i++)
38 : {
39 : #ifdef HLT_ASYNC_USE_SEM_T
40 0 : sem_destroy(&fMutexes[i]);
41 : #else
42 : pthread_mutex_destroy(&fMutexes[i]);
43 : #endif
44 : }
45 0 : };
46 :
47 : int Initialize(bool shared)
48 : {
49 : #ifdef HLT_ASYNC_USE_SEM_T
50 0 : memset(fMutexes, 0, sizeof(fMutexes[0]) * ASYNC_MUTEX_COUNT);
51 : #else
52 : pthread_mutexattr_t attr;
53 : pthread_mutexattr_init(&attr);
54 : if (shared) if (pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED)) return(1);
55 : #endif
56 0 : for (int i = 0;i < ASYNC_MUTEX_COUNT;i++)
57 : {
58 : #ifdef HLT_ASYNC_USE_SEM_T
59 0 : if (sem_init(&fMutexes[i], shared, 1)) return(1);
60 : #else
61 : if (pthread_mutex_init(&fMutexes[i], &attr)) return(1);
62 : #endif
63 : }
64 : #ifndef HLT_ASYNC_USE_SEM_T
65 : pthread_mutexattr_destroy(&attr);
66 : #endif
67 0 : fInitialized = true;
68 0 : return(0);
69 0 : }
70 :
71 : int LockMutex(int i)
72 : {
73 : #ifdef HLT_ASYNC_USE_SEM_T
74 0 : int retVal = sem_wait(&fMutexes[i]);
75 : #else
76 : int retVal = pthread_mutex_lock(&fMutexes[i]);
77 : #endif
78 0 : if (retVal)
79 : {
80 0 : fprintf(stderr, "Error locking mutex %d error %d\n", i, errno);
81 0 : usleep(10000);
82 0 : }
83 0 : return retVal;
84 : }
85 :
86 : int UnlockMutex(int i)
87 : {
88 : #ifdef HLT_ASYNC_USE_SEM_T
89 0 : int retVal = sem_post(&fMutexes[i]);
90 : #else
91 : int retVal = pthread_mutex_unlock(&fMutexes[i]);
92 : #endif
93 0 : if (retVal)
94 : {
95 0 : fprintf(stderr, "Error unlocking mutex %d error %d\n", i, errno);
96 0 : usleep(100000);
97 0 : }
98 0 : return(retVal);
99 : }
100 :
101 : int TryLockMutex(int i)
102 : {
103 : #ifdef HLT_ASYNC_USE_SEM_T
104 0 : return(sem_trywait(&fMutexes[i]));
105 : #else
106 : return(pthread_mutex_trylock(&fMutexes[i]));
107 : #endif
108 : }
109 :
110 : int StartThread(void* (*function)(void*), void* data)
111 : {
112 0 : return(pthread_create(&fAsyncThread, NULL, function, data));
113 : }
114 :
115 : void* StopThread()
116 : {
117 0 : void* retVal;
118 0 : if (pthread_join(fAsyncThread, &retVal)) retVal = (void*) -1;
119 0 : return(retVal);
120 0 : }
121 :
122 : int StartProcess(void* (*function)(void*), void* data)
123 : {
124 0 : int pid = fork();
125 0 : if (pid < 0) return(1);
126 0 : if (pid == 0)
127 : {
128 0 : function(data);
129 0 : exit(0);
130 : }
131 0 : fAsyncPID = pid;
132 0 : return(0);
133 0 : }
134 :
135 : int StopProcess()
136 : {
137 0 : int returnStatus;
138 0 : waitpid(fAsyncPID, &returnStatus, 0);
139 0 : return(returnStatus);
140 0 : }
141 :
142 : int KillChildProcess(int waitTime)
143 : {
144 0 : do
145 : {
146 0 : int status;
147 0 : pid_t result = waitpid(fAsyncPID, &status, WNOHANG);
148 0 : if (result == 0)
149 : {
150 0 : usleep(1000);
151 : }
152 0 : else if (result == -1)
153 : {
154 0 : return(-1);
155 : }
156 : else
157 : {
158 0 : return(0);
159 : }
160 0 : } while (waitTime-- > 0);
161 0 : if (kill(fAsyncPID, SIGKILL)) return(-1);
162 0 : return(1);
163 0 : }
164 :
165 : private:
166 : #ifdef HLT_ASYNC_USE_SEM_T
167 : sem_t fMutexes[ASYNC_MUTEX_COUNT];
168 : #else
169 : pthread_mutex_t fMutexes[ASYNC_MUTEX_COUNT];
170 : #endif
171 : pthread_t fAsyncThread;
172 : pid_t fAsyncPID;
173 : bool fInitialized;
174 : };
175 :
176 : #undef ASYNC_MUTEX_COUNT
177 : #ifdef HLT_ASYNC_USE_SEM_T
178 : #undef HLT_ASYNC_USE_SEM_T
179 : #endif
180 :
181 : #endif
|