Line data Source code
1 : /************************************************************************
2 : **
3 : **
4 : ** This file is property of and copyright by the Technical Computer
5 : ** Science Group, Kirchhoff Institute for Physics, Ruprecht-Karls-
6 : ** University, Heidelberg, Germany, 2001
7 : ** This file has been written by Timm Morten Steinbeck,
8 : ** timm@kip.uni-heidelberg.de
9 : **
10 : **
11 : ** See the file license.txt for details regarding usage, modification,
12 : ** distribution and warranty.
13 : ** Important: This file is provided without any warranty, including
14 : ** fitness for any particular purpose.
15 : **
16 : **
17 : ** Newer versions of this file's package will be made available from
18 : ** http://web.kip.uni-heidelberg.de/Hardwinf/L3/
19 : ** or the corresponding page of the Heidelberg Alice Level 3 group.
20 : **
21 : *************************************************************************/
22 :
23 : /*
24 : ***************************************************************************
25 : **
26 : ** $Author$ - Initial Version by Timm Morten Steinbeck
27 : **
28 : ** $Id$
29 : **
30 : ***************************************************************************
31 : */
32 :
33 : /** @file AliHLTHOMERReader.cxx
34 : @author Timm Steinbeck
35 : @date Sep 14 2007
36 : @brief HLT Online Monitoring Environment including ROOT - Reader
37 : @note migrated from PubSub HLT-stable-20070905.141318 (rev 2375) */
38 :
39 : // see header file for class documentation
40 : // or
41 : // refer to README to build package
42 : // or
43 : // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
44 :
45 : #include "AliHLTHOMERReader.h"
46 : //#include <stdio.h>
47 : #ifdef __SUNPRO_CC
48 : #include <string.h>
49 : #else
50 : #include <cstring>
51 : #endif
52 : #include <cerrno>
53 : #include <netdb.h>
54 : //#include <sys/types.h>
55 : //#include <sys/socket.h>
56 : //#include <netinet/in.h>
57 : //#include <netinet/tcp.h>
58 : #include <unistd.h>
59 : #ifndef __CYGWIN__
60 : #include <rpc/types.h>
61 : #endif
62 : #include <fcntl.h>
63 : #include <sys/stat.h>
64 : #include <netinet/in.h>
65 : #include <arpa/inet.h>
66 : #ifdef USE_ROOT
67 : #include <Rtypes.h>
68 : #endif
69 :
70 :
71 : #define MOD_BIN "MOD BIN\n"
72 : #define MOD_ASC "MOD ASC\n"
73 : #define GET_ONE "GET ONE\n"
74 : #define GET_ALL "GET ALL\n"
75 :
76 : // MAXHOSTNAMELEN not defined on macosx
77 : // 686-apple-darwin9-gcc-4.0.1
78 : #ifndef MAXHOSTNAMELEN
79 : #define MAXHOSTNAMELEN 64
80 : #endif
81 :
82 : #ifdef USE_ROOT
83 126 : ClassImp(AliHLTMonitoringReader);
84 126 : ClassImp(AliHLTHOMERReader);
85 : #endif
86 :
87 :
88 :
89 :
90 :
91 : #ifdef USE_ROOT
92 0 : AliHLTHOMERReader::AliHLTHOMERReader()
93 : :
94 0 : fCurrentEventType(~(homer_uint64)0),
95 0 : fCurrentEventID(~(homer_uint64)0),
96 0 : fBlockCnt(0),
97 0 : fMaxBlockCnt(0),
98 0 : fBlocks(NULL),
99 0 : fDataSourceCnt(0),
100 0 : fTCPDataSourceCnt(0),
101 0 : fShmDataSourceCnt(0),
102 0 : fDataSourceMaxCnt(0),
103 0 : fDataSources(NULL),
104 0 : fConnectionStatus(0),
105 0 : fErrorConnection(~(unsigned int)0),
106 0 : fEventRequestAdvanceTime(0)
107 0 : {
108 : // Reader implementation of the HOMER interface.
109 : // The HLT Monitoring Environment including ROOT is
110 : // a native interface to ship out data from the HLT chain.
111 : // See pdf document shiped with the package
112 : // for class documentation and tutorial.
113 0 : Init();
114 0 : }
115 : #endif
116 :
117 :
118 : AliHLTHOMERReader::AliHLTHOMERReader( const char* hostname, unsigned short port )
119 : :
120 0 : AliHLTMonitoringReader(),
121 0 : TObject(),
122 0 : fCurrentEventType(~(homer_uint64)0),
123 0 : fCurrentEventID(~(homer_uint64)0),
124 0 : fBlockCnt(0),
125 0 : fMaxBlockCnt(0),
126 0 : fBlocks(NULL),
127 0 : fDataSourceCnt(0),
128 0 : fTCPDataSourceCnt(0),
129 0 : fShmDataSourceCnt(0),
130 0 : fDataSourceMaxCnt(0),
131 0 : fDataSources(NULL),
132 0 : fConnectionStatus(0),
133 0 : fErrorConnection(~(unsigned int)0),
134 0 : fEventRequestAdvanceTime(0)
135 0 : {
136 : // see header file for class documentation
137 : // For reading from a TCP port
138 0 : Init();
139 0 : if ( !AllocDataSources(1) )
140 : {
141 0 : fErrorConnection = 0;
142 0 : fConnectionStatus = ENOMEM;
143 0 : return;
144 : }
145 0 : fConnectionStatus = AddDataSource( hostname, port, fDataSources[0] );
146 0 : if ( fConnectionStatus )
147 0 : fErrorConnection = 0;
148 : else
149 : {
150 0 : fDataSourceCnt++;
151 0 : fTCPDataSourceCnt++;
152 0 : fDataSources[0].fNdx = 0;
153 : }
154 0 : }
155 :
156 : AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, const unsigned short* ports )
157 : :
158 0 : AliHLTMonitoringReader(),
159 0 : TObject(),
160 0 : fCurrentEventType(~(homer_uint64)0),
161 0 : fCurrentEventID(~(homer_uint64)0),
162 0 : fBlockCnt(0),
163 0 : fMaxBlockCnt(0),
164 0 : fBlocks(NULL),
165 0 : fDataSourceCnt(0),
166 0 : fTCPDataSourceCnt(0),
167 0 : fShmDataSourceCnt(0),
168 0 : fDataSourceMaxCnt(0),
169 0 : fDataSources(NULL),
170 0 : fConnectionStatus(0),
171 0 : fErrorConnection(~(unsigned int)0),
172 0 : fEventRequestAdvanceTime(0)
173 0 : {
174 : // see header file for class documentation
175 : // For reading from multiple TCP ports
176 0 : Init();
177 0 : if ( !AllocDataSources(tcpCnt) )
178 : {
179 0 : fErrorConnection = 0;
180 0 : fConnectionStatus = ENOMEM;
181 0 : return;
182 : }
183 0 : for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
184 : {
185 0 : fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
186 0 : if ( fConnectionStatus )
187 : {
188 0 : fErrorConnection = n;
189 0 : return;
190 : }
191 0 : fDataSources[n].fNdx = n;
192 : }
193 0 : }
194 :
195 : AliHLTHOMERReader::AliHLTHOMERReader( key_t shmKey, int shmSize )
196 : :
197 0 : AliHLTMonitoringReader(),
198 0 : TObject(),
199 0 : fCurrentEventType(~(homer_uint64)0),
200 0 : fCurrentEventID(~(homer_uint64)0),
201 0 : fBlockCnt(0),
202 0 : fMaxBlockCnt(0),
203 0 : fBlocks(NULL),
204 0 : fDataSourceCnt(0),
205 0 : fTCPDataSourceCnt(0),
206 0 : fShmDataSourceCnt(0),
207 0 : fDataSourceMaxCnt(0),
208 0 : fDataSources(NULL),
209 0 : fConnectionStatus(0),
210 0 : fErrorConnection(~(unsigned int)0),
211 0 : fEventRequestAdvanceTime(0)
212 0 : {
213 : // see header file for class documentation
214 : // For reading from a System V shared memory segment
215 0 : Init();
216 0 : if ( !AllocDataSources(1) )
217 : {
218 0 : fErrorConnection = 0;
219 0 : fConnectionStatus = ENOMEM;
220 0 : return;
221 : }
222 0 : fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
223 0 : if ( fConnectionStatus )
224 0 : fErrorConnection = 0;
225 : else
226 : {
227 0 : fDataSourceCnt++;
228 0 : fShmDataSourceCnt++;
229 0 : fDataSources[0].fNdx = 0;
230 : }
231 0 : }
232 :
233 : AliHLTHOMERReader::AliHLTHOMERReader( unsigned int shmCnt, const key_t* shmKeys, const int* shmSizes )
234 : :
235 0 : AliHLTMonitoringReader(),
236 0 : TObject(),
237 0 : fCurrentEventType(~(homer_uint64)0),
238 0 : fCurrentEventID(~(homer_uint64)0),
239 0 : fBlockCnt(0),
240 0 : fMaxBlockCnt(0),
241 0 : fBlocks(NULL),
242 0 : fDataSourceCnt(0),
243 0 : fTCPDataSourceCnt(0),
244 0 : fShmDataSourceCnt(0),
245 0 : fDataSourceMaxCnt(0),
246 0 : fDataSources(NULL),
247 0 : fConnectionStatus(0),
248 0 : fErrorConnection(~(unsigned int)0),
249 0 : fEventRequestAdvanceTime(0)
250 0 : {
251 : // see header file for class documentation
252 : // For reading from multiple System V shared memory segments
253 0 : Init();
254 0 : if ( !AllocDataSources(shmCnt) )
255 : {
256 0 : fErrorConnection = 0;
257 0 : fConnectionStatus = ENOMEM;
258 0 : return;
259 : }
260 0 : for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
261 : {
262 0 : fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[n] );
263 0 : if ( fConnectionStatus )
264 : {
265 0 : fErrorConnection = n;
266 0 : return;
267 : }
268 0 : fDataSources[n].fNdx = n;
269 : }
270 0 : }
271 :
272 : AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, const unsigned short* ports,
273 : unsigned int shmCnt, const key_t* shmKeys, const int* shmSizes )
274 : :
275 0 : AliHLTMonitoringReader(),
276 0 : TObject(),
277 0 : fCurrentEventType(~(homer_uint64)0),
278 0 : fCurrentEventID(~(homer_uint64)0),
279 0 : fBlockCnt(0),
280 0 : fMaxBlockCnt(0),
281 0 : fBlocks(NULL),
282 0 : fDataSourceCnt(0),
283 0 : fTCPDataSourceCnt(0),
284 0 : fShmDataSourceCnt(0),
285 0 : fDataSourceMaxCnt(0),
286 0 : fDataSources(NULL),
287 0 : fConnectionStatus(0),
288 0 : fErrorConnection(~(unsigned int)0),
289 0 : fEventRequestAdvanceTime(0)
290 0 : {
291 : // see header file for class documentation
292 : // For reading from multiple TCP ports and multiple System V shared memory segments
293 0 : Init();
294 0 : if ( !AllocDataSources(tcpCnt+shmCnt) )
295 : {
296 0 : fErrorConnection = 0;
297 0 : fConnectionStatus = ENOMEM;
298 0 : return;
299 : }
300 0 : for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
301 : {
302 0 : fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
303 0 : if ( fConnectionStatus )
304 : {
305 0 : fErrorConnection = n;
306 0 : return;
307 : }
308 0 : fDataSources[n].fNdx = n;
309 : }
310 0 : for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
311 : {
312 0 : fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[tcpCnt+n] );
313 0 : if ( fConnectionStatus )
314 : {
315 0 : fErrorConnection = tcpCnt+n;
316 0 : return;
317 : }
318 0 : fDataSources[n].fNdx = n;
319 : }
320 0 : }
321 :
322 : AliHLTHOMERReader::AliHLTHOMERReader( const void* pBuffer, int size )
323 : :
324 0 : AliHLTMonitoringReader(),
325 0 : TObject(),
326 0 : fCurrentEventType(~(homer_uint64)0),
327 0 : fCurrentEventID(~(homer_uint64)0),
328 0 : fBlockCnt(0),
329 0 : fMaxBlockCnt(0),
330 0 : fBlocks(NULL),
331 0 : fDataSourceCnt(0),
332 0 : fTCPDataSourceCnt(0),
333 0 : fShmDataSourceCnt(0),
334 0 : fDataSourceMaxCnt(0),
335 0 : fDataSources(NULL),
336 0 : fConnectionStatus(0),
337 0 : fErrorConnection(~(unsigned int)0),
338 0 : fEventRequestAdvanceTime(0)
339 0 : {
340 : // see header file for class documentation
341 : // For reading from a System V shared memory segment
342 0 : Init();
343 0 : if ( !AllocDataSources(1) )
344 : {
345 0 : fErrorConnection = 0;
346 0 : fConnectionStatus = ENOMEM;
347 0 : return;
348 : }
349 0 : fConnectionStatus = AddDataSource(const_cast<void*>(pBuffer), size, fDataSources[0] );
350 0 : if ( fConnectionStatus )
351 0 : fErrorConnection = 0;
352 : else
353 : {
354 0 : fDataSourceCnt++;
355 0 : fShmDataSourceCnt++;
356 0 : fDataSources[0].fNdx = 0;
357 : }
358 0 : }
359 :
360 0 : AliHLTHOMERReader::~AliHLTHOMERReader()
361 0 : {
362 : // see header file for class documentation
363 0 : ReleaseCurrentEvent();
364 0 : FreeDataSources();
365 :
366 0 : if (fDataSources)
367 0 : delete [] fDataSources;
368 0 : }
369 :
370 : int AliHLTHOMERReader::ReadNextEvent()
371 : {
372 : // see header file for class documentation
373 : // Read in the next available event
374 0 : return ReadNextEvent( false, 0 );
375 : }
376 :
377 : int AliHLTHOMERReader::ReadNextEvent( unsigned long timeout )
378 : {
379 : // see header file for class documentation
380 : // Read in the next available event
381 0 : return ReadNextEvent( true, timeout );
382 : }
383 :
384 : unsigned long AliHLTHOMERReader::GetBlockDataLength( unsigned long ndx ) const
385 : {
386 : // see header file for class documentation
387 : // Return the size (in bytes) of the current event's data
388 : // block with the given block index (starting at 0).
389 0 : if ( ndx >= fBlockCnt )
390 0 : return 0;
391 0 : return fBlocks[ndx].fLength;
392 0 : }
393 :
394 : const void* AliHLTHOMERReader::GetBlockData( unsigned long ndx ) const
395 : {
396 : // see header file for class documentation
397 : // Return a pointer to the start of the current event's data
398 : // block with the given block index (starting at 0).
399 0 : if ( ndx >= fBlockCnt )
400 0 : return NULL;
401 0 : return fBlocks[ndx].fData;
402 0 : }
403 :
404 : const char* AliHLTHOMERReader::GetBlockSendNodeID( unsigned long ndx ) const
405 : {
406 : // see header file for class documentation
407 : // Return IP address or hostname of node which sent the
408 : // current event's data block with the given block index
409 : // (starting at 0).
410 : // For HOMER this is the ID of the node on which the subscriber
411 : // that provided this data runs/ran.
412 0 : if ( ndx >= fBlockCnt )
413 0 : return NULL;
414 : #ifdef DEBUG
415 : if ( fBlocks[ndx].fSource >= fDataSourceCnt )
416 : {
417 : fprintf( stderr, "%s:%d: Internal Error: fBlocks[ndx].fSource (%u) >= fDataSourceCnt (%u)\n",
418 : __FILE__, __LINE__, fBlocks[ndx].fSource, fDataSourceCnt );
419 : return NULL;
420 : }
421 : #endif
422 0 : return fDataSources[ fBlocks[ndx].fSource ].fHostname;
423 : //return fBlocks[ndx].fOriginatingNodeID;
424 0 : }
425 :
426 : homer_uint8 AliHLTHOMERReader::GetBlockByteOrder( unsigned long ndx ) const
427 : {
428 : // see header file for class documentation
429 : // Return byte order of the data stored in the
430 : // current event's data block with the given block index (starting at 0).
431 : // 0 is unknown alignment,
432 : // 1 ist little endian,
433 : // 2 is big endian. */
434 0 : if ( ndx >= fBlockCnt )
435 0 : return 0;
436 : //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
437 0 : return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kByteOrderAttribute_8b_Offset);
438 0 : }
439 :
440 : homer_uint8 AliHLTHOMERReader::GetBlockTypeAlignment( unsigned long ndx, homer_uint8 dataType ) const
441 : {
442 : // see header file for class documentation
443 : // Return the alignment (in bytes) of the given datatype
444 : // in the data stored in the current event's data block
445 : // with the given block index (starting at 0).
446 : // Possible values for the data type are
447 : // 0: homer_uint64
448 : // 1: homer_uint32
449 : // 2: uin16
450 : // 3: homer_uint8
451 : // 4: double
452 : // 5: float
453 0 : if ( ndx >= fBlockCnt )
454 0 : return 0;
455 0 : if ( dataType > (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset) )
456 0 : return 0;
457 : //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
458 0 : return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kAlignment_8b_StartOffset+dataType);
459 0 : }
460 :
461 : homer_uint64 AliHLTHOMERReader::GetBlockStatusFlags( unsigned long ndx ) const
462 : {
463 : // see header file for class documentation
464 0 : if ( ndx >= fBlockCnt )
465 0 : return 0;
466 0 : return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kStatusFlags_64b_Offset);
467 0 : }
468 :
469 : /* HOMER specific */
470 : /* Return the type of the data in the current event's data
471 : block with the given block index (starting at 0). */
472 : homer_uint64 AliHLTHOMERReader::GetBlockDataType( unsigned long ndx ) const
473 : {
474 : // see header file for class documentation
475 0 : if ( ndx >= fBlockCnt )
476 0 : return ~(homer_uint64)0;
477 : //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
478 0 : return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kType_64b_Offset);
479 0 : }
480 :
481 : /* Return the origin of the data in the current event's data
482 : block with the given block index (starting at 0). */
483 : homer_uint32 AliHLTHOMERReader::GetBlockDataOrigin( unsigned long ndx ) const
484 : {
485 : // see header file for class documentation
486 0 : if ( ndx >= fBlockCnt )
487 0 : return ~(homer_uint32)0;
488 : //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType1.fID );
489 0 : return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType1_64b_Offset);
490 0 : }
491 :
492 : /* Return a specification of the data in the current event's data
493 : block with the given block index (starting at 0). */
494 : homer_uint32 AliHLTHOMERReader::GetBlockDataSpec( unsigned long ndx ) const
495 : {
496 : // see header file for class documentation
497 0 : if ( ndx >= fBlockCnt )
498 0 : return ~(homer_uint32)0;
499 : //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType2.fID );
500 0 : return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType2_64b_Offset);
501 0 : }
502 :
503 : homer_uint64 AliHLTHOMERReader::GetBlockBirthSeconds( unsigned long ndx ) const
504 : {
505 : // see header file for class documentation
506 0 : if ( ndx >= fBlockCnt )
507 0 : return 0;
508 0 : return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kBirth_s_64b_Offset);
509 0 : }
510 :
511 : homer_uint64 AliHLTHOMERReader::GetBlockBirthMicroSeconds( unsigned long ndx ) const
512 : {
513 : // see header file for class documentation
514 0 : if ( ndx >= fBlockCnt )
515 0 : return 0;
516 0 : return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kBirth_us_64b_Offset);
517 0 : }
518 :
519 : /* Find the next data block in the current event with the given
520 : data type, origin, and specification. Returns the block's
521 : index. */
522 : unsigned long AliHLTHOMERReader::FindBlockNdx( homer_uint64 type, homer_uint32 origin,
523 : homer_uint32 spec, unsigned long startNdx ) const
524 : {
525 : // see header file for class documentation
526 0 : for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
527 : {
528 0 : if ( ( type == 0xFFFFFFFFFFFFFFFFULL || *(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset)==type ) &&
529 0 : ( origin == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset) )==origin ) &&
530 0 : ( spec == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) )==spec ) )
531 0 : return n;
532 : }
533 0 : return ~(unsigned long)0;
534 0 : }
535 :
536 : /* Find the next data block in the current event with the given
537 : data type, origin, and specification. Returns the block's
538 : index. */
539 : unsigned long AliHLTHOMERReader::FindBlockNdx( char type[8], char origin[4],
540 : homer_uint32 spec, unsigned long startNdx ) const
541 : {
542 : // see header file for class documentation
543 0 : for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
544 : {
545 : bool found1=true, found2=true;
546 0 : for ( unsigned i = 0; i < 8; i++ )
547 : {
548 0 : if ( type[i] != (char)0xFF )
549 : {
550 : found1=false;
551 0 : break;
552 : }
553 : }
554 0 : if ( !found1 )
555 : {
556 : found1 = true;
557 0 : for ( unsigned i = 0; i < 8; i++ )
558 : {
559 : //printf( "%u: Comparing type '%c' and '%c'\n", i, ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i], type[i] );
560 0 : if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i] != type[i] )
561 : {
562 : found1=false;
563 0 : break;
564 : }
565 : }
566 0 : }
567 0 : for ( unsigned i = 0; i < 4; i++ )
568 : {
569 0 : if ( origin[i] != (char)0xFF )
570 : {
571 : found2 = false;
572 0 : break;
573 : }
574 : }
575 0 : if ( !found2 )
576 : {
577 : found2 = true;
578 0 : for ( unsigned i = 0; i < 4; i++ )
579 : {
580 : //printf( "Comparing origin '%c' and '%c'\n", ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i], origin[i] );
581 0 : if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i] != origin[i] )
582 : {
583 : found2=false;
584 0 : break;
585 : }
586 : }
587 0 : }
588 : //printf( "Comparing spec '0x%08lX' and '0x%08lX'\n", (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) ), spec );
589 0 : if ( found1 && found2 &&
590 0 : ( spec == 0xFFFFFFFF || ((homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset)) )==spec ) )
591 0 : return n;
592 0 : }
593 0 : return ~(unsigned long)0;
594 0 : }
595 :
596 : /* Return the ID of the node that actually produced this data block.
597 : This may be different from the node which sent the data to this
598 : monitoring object as returned by GetBlockSendNodeID. */
599 : const char* AliHLTHOMERReader::GetBlockCreateNodeID( unsigned long ndx ) const
600 : {
601 : // see header file for class documentation
602 0 : if ( ndx >= fBlockCnt )
603 0 : return NULL;
604 0 : return fBlocks[ndx].fOriginatingNodeID;
605 0 : }
606 :
607 :
608 : void AliHLTHOMERReader::Init()
609 : {
610 : // see header file for class documentation
611 0 : fCurrentEventType = ~(homer_uint64)0;
612 0 : fCurrentEventID = ~(homer_uint64)0;
613 0 : fMaxBlockCnt = fBlockCnt = 0;
614 0 : fBlocks = NULL;
615 :
616 0 : fDataSourceMaxCnt = fDataSourceCnt = fTCPDataSourceCnt = fShmDataSourceCnt = 0;
617 0 : fDataSources = NULL;
618 :
619 :
620 0 : fConnectionStatus = 0;
621 0 : fErrorConnection = ~(unsigned int)0;
622 :
623 0 : fEventRequestAdvanceTime = 0;
624 0 : }
625 :
626 : bool AliHLTHOMERReader::AllocDataSources( unsigned int sourceCnt )
627 : {
628 : // see header file for class documentation
629 0 : fDataSources = new DataSource[ sourceCnt ];
630 0 : if ( !fDataSources )
631 0 : return false;
632 0 : memset(fDataSources, 0, sizeof(DataSource)*sourceCnt);
633 0 : fDataSourceCnt = 0;
634 0 : fDataSourceMaxCnt = sourceCnt;
635 0 : return true;
636 0 : }
637 :
638 : int AliHLTHOMERReader::AddDataSource( const char* hostname, unsigned short port, DataSource& source )
639 : {
640 : // see header file for class documentation
641 : struct hostent* he;
642 0 : he = gethostbyname( hostname );
643 0 : if ( he == NULL )
644 : {
645 : //fprintf( stderr, "Unable to determine remote host address from '%s'.\n", hostname );
646 0 : return EADDRNOTAVAIL;
647 : }
648 :
649 0 : struct sockaddr_in remoteAddr;
650 0 : remoteAddr.sin_family = AF_INET; // host byte order
651 0 : remoteAddr.sin_port = htons(port); // short, network byte order
652 0 : remoteAddr.sin_addr = *((struct in_addr *)he->h_addr);
653 0 : memset(&(remoteAddr.sin_zero), '\0', 8); // zero the rest of the struct
654 :
655 : // Create socket and connect to target program on remote node
656 0 : source.fTCPConnection = socket( AF_INET, SOCK_STREAM, 0 );
657 0 : if ( source.fTCPConnection == -1 )
658 : {
659 0 : return errno;
660 : }
661 :
662 : int ret;
663 :
664 0 : ret = connect( source.fTCPConnection, (struct sockaddr *)&remoteAddr, sizeof(struct sockaddr) );
665 0 : if ( ret == -1 )
666 : {
667 0 : ret=errno;
668 0 : close( source.fTCPConnection );
669 0 : return ret;
670 : }
671 :
672 0 : ret = write( source.fTCPConnection, MOD_BIN, strlen(MOD_BIN) );
673 0 : if ( ret != (int)strlen(MOD_BIN) )
674 : {
675 0 : ret=errno;
676 0 : close( source.fTCPConnection );
677 0 : return ret;
678 : }
679 :
680 0 : unsigned hostnamelen=strlen( hostname );
681 0 : char* tmpchar = new char[ hostnamelen+1 ];
682 0 : if ( !tmpchar )
683 : {
684 0 : close( source.fTCPConnection );
685 0 : return ENOMEM;
686 : }
687 0 : strncpy( tmpchar, hostname, hostnamelen );
688 0 : tmpchar[hostnamelen]=0;
689 0 : source.fHostname = tmpchar;
690 :
691 0 : source.fType = kTCP;
692 0 : source.fTCPPort = port;
693 0 : source.fData = NULL;
694 0 : source.fDataSize = 0;
695 0 : source.fDataRead = 0;
696 0 : return 0;
697 0 : }
698 :
699 : int AliHLTHOMERReader::AddDataSource( key_t shmKey, int shmSize, DataSource& source )
700 : {
701 : // see header file for class documentation
702 : int ret;
703 0 : char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
704 0 : if ( !tmpchar )
705 : {
706 0 : return ENOMEM;
707 : }
708 0 : gethostname( tmpchar, MAXHOSTNAMELEN );
709 0 : tmpchar[MAXHOSTNAMELEN]=(char)0;
710 0 : source.fHostname = tmpchar;
711 :
712 0 : source.fShmID = shmget( shmKey, shmSize, 0660 );
713 0 : if ( source.fShmID == -1 )
714 : {
715 0 : ret = errno;
716 0 : delete [] source.fHostname;
717 0 : return ret;
718 : }
719 :
720 0 : source.fShmPtr = (void*)shmat( source.fShmID, NULL, 0 );
721 :
722 0 : if ( !source.fShmPtr )
723 : {
724 0 : ret = errno;
725 0 : shmctl( source.fShmID, IPC_RMID, NULL );
726 0 : delete [] source.fHostname;
727 0 : return ret;
728 : }
729 :
730 0 : source.fType = kShm;
731 0 : source.fShmKey = shmKey;
732 0 : source.fShmSize = shmSize;
733 0 : source.fDataSize = 0;
734 0 : source.fDataRead = 0;
735 0 : return 0;
736 0 : }
737 :
738 : int AliHLTHOMERReader::AddDataSource( void* pBuffer, int size, DataSource& source )
739 : {
740 : // see header file for class documentation
741 : // a buffer data source is like a shm source apart from the shm attach and detach
742 : // procedure. Furthermore, the size indicator at the beginning of the buffer is not
743 : // cleared right before sources are read but after the reading.
744 : //int ret;
745 0 : if ( !pBuffer || size<=0) return EINVAL;
746 :
747 0 : char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
748 0 : if ( !tmpchar )
749 : {
750 0 : return ENOMEM;
751 : }
752 0 : gethostname( tmpchar, MAXHOSTNAMELEN );
753 0 : tmpchar[MAXHOSTNAMELEN]=(char)0;
754 0 : source.fHostname = tmpchar;
755 :
756 0 : source.fShmID = -1;
757 : // the data buffer does not contain a size indicator in the first 4 bytes
758 : // like the shm source buffer. Still we want to use the mechanism to invalidate/
759 : // trigger by clearing the size indicator. Take the source.fShmSize variable.
760 0 : source.fShmPtr = &source.fShmSize;
761 0 : source.fType = kBuf;
762 0 : source.fShmKey = 0;
763 0 : source.fShmSize = size;
764 0 : source.fData = pBuffer;
765 0 : source.fDataSize = 0;
766 0 : source.fDataRead = 0;
767 0 : return 0;
768 0 : }
769 :
770 : void AliHLTHOMERReader::FreeDataSources()
771 : {
772 : // see header file for class documentation
773 0 : for ( unsigned n=0; n < fDataSourceCnt; n++ )
774 : {
775 0 : if ( fDataSources[n].fType == kTCP )
776 0 : FreeTCPDataSource( fDataSources[n] );
777 0 : else if ( fDataSources[n].fType == kShm )
778 0 : FreeShmDataSource( fDataSources[n] );
779 0 : if ( fDataSources[n].fHostname )
780 0 : delete [] fDataSources[n].fHostname;
781 : }
782 0 : fDataSourceCnt=0;
783 0 : }
784 :
785 : int AliHLTHOMERReader::FreeShmDataSource( DataSource& source )
786 : {
787 : // see header file for class documentation
788 0 : if ( source.fShmPtr )
789 0 : shmdt( (char*)source.fShmPtr );
790 : // if ( source.fShmID != -1 )
791 : // shmctl( source.fShmID, IPC_RMID, NULL );
792 0 : return 0;
793 : }
794 :
795 : int AliHLTHOMERReader::FreeTCPDataSource( DataSource& source )
796 : {
797 : // see header file for class documentation
798 0 : if ( source.fTCPConnection )
799 0 : close( source.fTCPConnection );
800 0 : return 0;
801 : }
802 :
803 : int AliHLTHOMERReader::ReadNextEvent( bool useTimeout, unsigned long timeout )
804 : {
805 : // see header file for class documentation
806 0 : if ( fDataSourceCnt<=0 )
807 0 : return ENXIO;
808 : // Clean up currently active event.
809 0 : ReleaseCurrentEvent();
810 : int ret=0;
811 : // Trigger all configured data sources
812 0 : for ( unsigned n = 0; n<fDataSourceCnt; n++ ){
813 0 : if ( fDataSources[n].fType == kTCP )
814 0 : ret = TriggerTCPSource( fDataSources[n], useTimeout, timeout );
815 0 : else if ( fDataSources[n].fType == kShm )
816 0 : ret = TriggerShmSource( fDataSources[n], useTimeout, timeout );
817 0 : if ( ret )
818 : {
819 0 : fErrorConnection = n;
820 0 : fConnectionStatus=ret;
821 0 : return fConnectionStatus;
822 : }
823 : }
824 : // Now read in data from the configured data source
825 0 : ret = ReadDataFromTCPSources( fTCPDataSourceCnt, fDataSources, useTimeout, timeout );
826 :
827 0 : if ( ret )
828 : {
829 0 : return ret;
830 : }
831 0 : ret = ReadDataFromShmSources( fShmDataSourceCnt, fDataSources+fTCPDataSourceCnt, useTimeout, timeout );
832 0 : if ( ret )
833 : {
834 0 : return ret;
835 : }
836 : // for ( unsigned n = 0; n<fDataSourceCnt; n++ )
837 : // {
838 : // if ( fDataSources[n].fType == kTCP )
839 : // ret = ReadDataFromTCPSource( fDataSources[n], useTimeout, timeout );
840 : // else
841 : // ret = ReadDataFromShmSource( fDataSources[n], useTimeout, timeout );
842 : // if ( ret )
843 : // {
844 : // fErrorConnection = n;
845 : // fConnectionStatus=ret;
846 : // return fConnectionStatus;
847 : // }
848 : // }
849 : //Check to see that all sources contributed data for the same event
850 : homer_uint64 eventID;
851 : homer_uint64 eventType;
852 0 : if (!fDataSources[0].fData)
853 : {
854 0 : fErrorConnection = 0;
855 0 : fConnectionStatus=56;//ENOBUF;
856 0 : return fConnectionStatus;
857 : }
858 0 : eventID = GetSourceEventID( fDataSources[0] );
859 0 : eventType = GetSourceEventType( fDataSources[0] );
860 0 : for ( unsigned n = 1; n < fDataSourceCnt; n++ )
861 : {
862 0 : if ( !fDataSources[n].fData || GetSourceEventID( fDataSources[n] ) != eventID || GetSourceEventType( fDataSources[n] ) != eventType )
863 : {
864 0 : fErrorConnection = n;
865 0 : fConnectionStatus=56;//EBADRQC;
866 0 : return fConnectionStatus;
867 : }
868 : }
869 : // Find all the different data blocks contained in the data from all
870 : // the sources.
871 0 : for ( unsigned n = 0; n < fDataSourceCnt; n++ )
872 : {
873 0 : ret = ParseSourceData( fDataSources[n] );
874 0 : if ( ret )
875 : {
876 0 : fErrorConnection = n;
877 0 : fConnectionStatus=57;//EBADSLT;
878 0 : return ret;
879 : }
880 : }
881 0 : fCurrentEventID = eventID;
882 0 : fCurrentEventType = eventType;
883 0 : return 0;
884 0 : }
885 :
886 : void AliHLTHOMERReader::ReleaseCurrentEvent()
887 : {
888 : // see header file for class documentation
889 : // sources.fDataRead = 0;
890 : // fMaxBlockCnt
891 0 : fCurrentEventID = ~(homer_uint64)0;
892 0 : fCurrentEventType = ~(homer_uint64)0;
893 0 : for ( unsigned n = 0; n < fDataSourceCnt; n++ )
894 : {
895 0 : if ( fDataSources[n].fData )
896 : {
897 0 : if ( fDataSources[n].fType == kTCP )
898 0 : delete [] (homer_uint8*)fDataSources[n].fData;
899 : // do not reset the data pointer for kBuf sources since this
900 : // can not be set again.
901 0 : if ( fDataSources[n].fType != kBuf )
902 0 : fDataSources[n].fData = NULL;
903 : }
904 0 : fDataSources[n].fDataSize = fDataSources[n].fDataRead = 0;
905 : }
906 0 : if ( fBlocks )
907 : {
908 0 : for ( unsigned n = 0; n < fMaxBlockCnt; n++ )
909 : {
910 0 : if ( fBlocks[n].fOriginatingNodeID )
911 0 : delete [] fBlocks[n].fOriginatingNodeID;
912 : }
913 0 : delete [] fBlocks;
914 0 : fBlocks=0;
915 0 : fMaxBlockCnt = 0;
916 0 : fBlockCnt=0;
917 0 : }
918 0 : }
919 :
920 : int AliHLTHOMERReader::TriggerTCPSource( DataSource& source, bool useTimeout, unsigned long timeoutUsec )
921 : {
922 : // see header file for class documentation
923 : int ret=0;
924 0 : struct timeval oldSndTO, newSndTO;
925 0 : memset(&oldSndTO, 0, sizeof(oldSndTO));
926 0 : memset(&newSndTO, 0, sizeof(newSndTO));
927 0 : if ( useTimeout )
928 : {
929 0 : socklen_t optlen=sizeof(oldSndTO);
930 0 : ret = getsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, &optlen );
931 0 : if ( ret )
932 : {
933 0 : return errno;
934 : }
935 0 : if ( optlen!=sizeof(oldSndTO) )
936 : {
937 0 : return ENXIO;
938 : }
939 0 : newSndTO.tv_sec = timeoutUsec / 1000000;
940 0 : newSndTO.tv_usec = timeoutUsec - (newSndTO.tv_sec*1000000);
941 0 : ret = setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &newSndTO, sizeof(newSndTO) );
942 0 : if ( ret )
943 : {
944 0 : return errno;
945 : }
946 0 : }
947 : // Send one event request
948 0 : if ( !fEventRequestAdvanceTime )
949 : {
950 0 : ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
951 :
952 0 : if ( ret != (int)strlen(GET_ONE) )
953 : {
954 0 : ret=errno;
955 0 : setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
956 0 : return ret;
957 : }
958 : }
959 : else
960 : {
961 0 : char tmpCmd[ 128 ];
962 :
963 0 : int len = snprintf( tmpCmd, 128, "FIRST ORBIT EVENT 0x%llu\n", (unsigned long long)fEventRequestAdvanceTime );
964 0 : if ( len>128 || len<0 )
965 : {
966 : ret=EMSGSIZE;
967 0 : setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
968 0 : return ret;
969 : }
970 :
971 0 : ret = write( source.fTCPConnection, tmpCmd, strlen(tmpCmd) );
972 :
973 0 : if ( ret != (int)strlen(tmpCmd) )
974 : {
975 0 : ret=errno;
976 0 : setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
977 0 : return ret;
978 : }
979 :
980 0 : }
981 0 : return 0;
982 0 : }
983 :
984 : int AliHLTHOMERReader::TriggerShmSource( DataSource& source, bool, unsigned long ) const
985 : {
986 : // see header file for class documentation
987 : // clear the size indicator in the first 4 bytes of the buffer to request data
988 : // from the HOMER writer.
989 0 : if ( source.fShmPtr )
990 : {
991 0 : *(homer_uint32*)( source.fShmPtr ) = 0;
992 0 : return 0;
993 : }
994 : else
995 0 : return EFAULT;
996 0 : }
997 :
998 : int AliHLTHOMERReader::ReadDataFromTCPSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
999 : {
1000 : // see header file for class documentation
1001 : bool toRead = false;
1002 0 : do
1003 : {
1004 0 : fd_set conns;
1005 0 : FD_ZERO( &conns );
1006 : int highestConn=0;
1007 : toRead = false;
1008 : unsigned firstConnection=~(unsigned)0;
1009 0 : for ( unsigned long n = 0; n < sourceCnt; n++ )
1010 : {
1011 0 : if ( sources[n].fDataSize == 0 // size specifier not yet read
1012 0 : || sources[n].fDataRead < sources[n].fDataSize ) // Data not yet read fully
1013 : {
1014 : toRead = true;
1015 0 : FD_SET( sources[n].fTCPConnection, &conns );
1016 0 : if ( sources[n].fTCPConnection > highestConn )
1017 0 : highestConn = sources[n].fTCPConnection;
1018 0 : fcntl( sources[n].fTCPConnection, F_SETFL, O_NONBLOCK );
1019 0 : if ( firstConnection == ~(unsigned)0 )
1020 0 : firstConnection = n;
1021 : }
1022 : else
1023 : {
1024 0 : fcntl( sources[n].fTCPConnection, F_SETFL, 0 );
1025 : }
1026 : }
1027 0 : if ( toRead )
1028 : {
1029 0 : struct timeval tv, *ptv;
1030 0 : if ( useTimeout )
1031 : {
1032 0 : tv.tv_sec = timeout / 1000000;
1033 0 : tv.tv_usec = timeout - (tv.tv_sec*1000000);
1034 : ptv = &tv;
1035 0 : }
1036 : else
1037 : ptv = NULL;
1038 : // wait until something is ready to be read
1039 : // either for timeout usecs or until eternity
1040 : int ret;
1041 0 : ret = select( highestConn+1, &conns, NULL, NULL, ptv );
1042 0 : if ( ret <=0 )
1043 : {
1044 0 : fErrorConnection = firstConnection;
1045 0 : if ( errno )
1046 0 : fConnectionStatus = errno;
1047 : else
1048 0 : fConnectionStatus = ETIMEDOUT;
1049 0 : return fConnectionStatus;
1050 : }
1051 0 : for ( unsigned n = 0; n < sourceCnt; n++ )
1052 : {
1053 0 : if ( FD_ISSET( sources[n].fTCPConnection, &conns ) )
1054 : {
1055 0 : if ( sources[n].fDataSize == 0 )
1056 : {
1057 0 : ret=read( sources[n].fTCPConnection, &(sources[n].fDataSize), sizeof(homer_uint32) );
1058 0 : if ( ret != sizeof(homer_uint32) )
1059 : {
1060 0 : fErrorConnection = n;
1061 0 : if ( errno )
1062 0 : fConnectionStatus = errno;
1063 : else
1064 0 : fConnectionStatus = ENOMSG;
1065 0 : return fConnectionStatus;
1066 : }
1067 0 : sources[n].fDataSize = ntohl( sources[n].fDataSize );
1068 0 : sources[n].fDataRead = 0;
1069 0 : sources[n].fData = new homer_uint8[ sources[n].fDataSize ];
1070 0 : if ( !sources[n].fData )
1071 : {
1072 0 : fErrorConnection = n;
1073 0 : fConnectionStatus = ENOMEM;
1074 0 : return fConnectionStatus;
1075 : }
1076 : }
1077 0 : else if ( sources[n].fData && sources[n].fDataRead < sources[n].fDataSize)
1078 : {
1079 0 : ret=read( sources[n].fTCPConnection, ((homer_uint8*)sources[n].fData)+sources[n].fDataRead, sources[n].fDataSize-sources[n].fDataRead );
1080 0 : if ( ret>0 )
1081 0 : sources[n].fDataRead += ret;
1082 0 : else if ( ret == 0 )
1083 : {
1084 0 : fErrorConnection = n;
1085 0 : fConnectionStatus = ECONNRESET;
1086 0 : return fConnectionStatus;
1087 : }
1088 : else
1089 : {
1090 : fErrorConnection = n;
1091 0 : fConnectionStatus = errno;
1092 0 : return fConnectionStatus;
1093 : }
1094 : }
1095 : else
1096 : {
1097 0 : fErrorConnection = n;
1098 0 : fConnectionStatus = ENXIO;
1099 0 : return fConnectionStatus;
1100 : }
1101 : }
1102 : }
1103 0 : }
1104 0 : }
1105 : while ( toRead );
1106 0 : return 0;
1107 0 : }
1108 :
1109 : /*
1110 : int AliHLTHOMERReader::ReadDataFromTCPSources( DataSource& source, bool useTimeout, unsigned long timeout )
1111 : {
1112 : #warning TODO If useTimeout: Set sockets to nonblocking, select + loop around GET_ONE write
1113 : // Send one event request
1114 : ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
1115 : if ( ret != strlen(GET_ONE) )
1116 : {
1117 : return errno;
1118 : }
1119 : // wait for and read back size specifier
1120 : unsigned sizeNBO;
1121 : // The value transmitted is binary, in network byte order
1122 : ret = read( source.fTCPConnection, &sizeNBO, sizeof(sizeNBO) );
1123 : if ( ret != sizeof(sizeNBO) )
1124 : {
1125 : return errno;
1126 : }
1127 : // Convert back to host byte order
1128 : source.fDataSize = ntohl( sizeNBO );
1129 : source.fData = new homer_uint8[ source.fDataSize ];
1130 : unsigned long dataRead=0, toRead;
1131 : if ( !source.fData )
1132 : {
1133 : char buffer[1024];
1134 : // Read in data into buffer in order not to block connection
1135 : while ( dataRead < source.fDataSize )
1136 : {
1137 : if ( source.fDataSize-dataRead > 1024 )
1138 : toRead = 1024;
1139 : else
1140 : toRead = source.fDataSize-dataRead;
1141 : ret = read( source.fTCPConnection, buffer, toRead );
1142 : if ( ret > 0 )
1143 : dataRead += ret;
1144 : else
1145 : return errno;
1146 : }
1147 : return ENOMEM;
1148 : }
1149 : while ( dataRead < source.fDataSize )
1150 : {
1151 : toRead = source.fDataSize-dataRead;
1152 : ret = read( source.fTCPConnection, source.fData+dataRead, toRead );
1153 : if ( ret > 0 )
1154 : dataRead += ret;
1155 : else if ( ret == 0 && useTimeout )
1156 : {
1157 : struct timeval tv;
1158 : tv.tv_sec = timeout / 1000000;
1159 : tv.tv_usec = timeout - (tv.tv_sec*1000000);
1160 : fd_set conns;
1161 : FD_ZERO( &conns );
1162 : FD_SET( source.fTCPConnection, &conns );
1163 : ret = select( source.fTCPConnection+1, &conns, NULL, NULL );
1164 : if ( ret <=0 )
1165 : return errno;
1166 : }
1167 : else if ( ret == 0 )
1168 : {
1169 : if ( errno == EOK )
1170 : return ECONNRESET;
1171 : else
1172 : return errno;
1173 : }
1174 : else
1175 : {
1176 : return errno;
1177 : }
1178 : }
1179 : return 0;
1180 : }
1181 : */
1182 :
1183 : /*
1184 : int AliHLTHOMERReader::ReadDataFromShmSource( DataSource& source, bool useTimeout, unsigned long timeout )
1185 : {
1186 :
1187 : }
1188 : */
1189 :
1190 : int AliHLTHOMERReader::ReadDataFromShmSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
1191 : {
1192 : // see header file for class documentation
1193 0 : struct timeval tv1, tv2;
1194 : bool found=false;
1195 : bool all=true;
1196 0 : if ( useTimeout )
1197 0 : gettimeofday( &tv1, NULL );
1198 : do
1199 : {
1200 : found = false;
1201 : all = true;
1202 0 : for ( unsigned n = 0; n < sourceCnt; n++ )
1203 : {
1204 0 : if ( !sources[n].fDataSize )
1205 0 : all = false;
1206 0 : if ( sources[n].fShmPtr && *(homer_uint32*)sources[n].fShmPtr>0 && !sources[n].fDataSize )
1207 : {
1208 : found = true;
1209 0 : sources[n].fDataSize = *(homer_uint32*)sources[n].fShmPtr;
1210 0 : if (sources[n].fType==kBuf)
1211 : {
1212 : // the data buffer is already set to fData, just need to set fDataSize member
1213 : // to invalidate after the first reading. Subsequent calls to ReadNextEvent return 0
1214 0 : TriggerShmSource( sources[n], 0, 0 );
1215 0 : } else
1216 : {
1217 0 : sources[n].fData = ((homer_uint8*)sources[n].fShmPtr)+sizeof(homer_uint32);
1218 : }
1219 : }
1220 : }
1221 0 : if ( found && useTimeout )
1222 0 : gettimeofday( &tv1, NULL );
1223 0 : if ( !all && useTimeout )
1224 : {
1225 0 : gettimeofday( &tv2, NULL );
1226 : unsigned long long tdiff;
1227 0 : tdiff = tv2.tv_sec-tv1.tv_sec;
1228 0 : tdiff *= 1000000;
1229 0 : tdiff += tv2.tv_usec-tv1.tv_usec;
1230 0 : if ( tdiff > timeout )
1231 0 : return ETIMEDOUT;
1232 0 : }
1233 0 : if ( !all )
1234 0 : usleep( 0 );
1235 0 : }
1236 0 : while ( !all );
1237 0 : return 0;
1238 0 : }
1239 :
1240 : int AliHLTHOMERReader::ParseSourceData( const DataSource& source )
1241 : {
1242 : // see header file for class documentation
1243 0 : if ( source.fData )
1244 : {
1245 0 : homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1246 0 : if (sourceByteOrder!=kHOMERLittleEndianByteOrder && sourceByteOrder!=kHOMERBigEndianByteOrder) return EBADMSG;
1247 0 : homer_uint64 blockCnt = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType2_64b_Offset ] );
1248 : // block count is not related to size of the data in the way the
1249 : // following condition implies. But we can at least limit the block
1250 : // count for the case the data is corrupted
1251 0 : if (blockCnt>source.fDataSize) return EBADMSG;
1252 0 : int ret=ReAllocBlocks( fMaxBlockCnt+blockCnt );
1253 0 : if ( ret )
1254 0 : return ret;
1255 0 : homer_uint64 descrOffset = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kOffset_64b_Offset ] );
1256 0 : for ( homer_uint64 n = 0; n < blockCnt && fBlockCnt < fMaxBlockCnt; n++, fBlockCnt++ )
1257 : {
1258 0 : if (descrOffset+kLength_64b_Offset>=source.fDataSize) return EBADMSG;
1259 0 : homer_uint8* descr = ((homer_uint8*)source.fData)+descrOffset;
1260 0 : unsigned descrLen = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kLength_64b_Offset ] );
1261 0 : if (descrOffset+descrLen>=source.fDataSize) return EBADMSG;
1262 0 : if (Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kID_64b_Offset ] ) != HOMER_BLOCK_DESCRIPTOR_TYPEID) return 126/*ENOKEY*/;
1263 0 : fBlocks[fBlockCnt].fSource = source.fNdx;
1264 0 : fBlocks[fBlockCnt].fData = ((homer_uint8*)source.fData) + Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kOffset_64b_Offset ] );
1265 0 : fBlocks[fBlockCnt].fLength = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kSize_64b_Offset ] );
1266 0 : fBlocks[fBlockCnt].fMetaData = (homer_uint64*)descr;
1267 : struct in_addr tmpA;
1268 0 : tmpA.s_addr = (homer_uint32)( ((homer_uint64*)descr)[ kProducerNode_64b_Offset ] );
1269 0 : char* addr = inet_ntoa( tmpA );
1270 0 : unsigned straddrlen=strlen(addr);
1271 0 : char* tmpchar = new char[ straddrlen+1 ];
1272 0 : if ( !tmpchar )
1273 0 : return ENOMEM;
1274 0 : strncpy( tmpchar, addr, straddrlen );
1275 0 : tmpchar[straddrlen]=0;
1276 0 : fBlocks[fBlockCnt].fOriginatingNodeID = tmpchar;
1277 : descrOffset += descrLen;
1278 0 : }
1279 0 : return 0;
1280 : }
1281 0 : return EFAULT;
1282 0 : }
1283 :
1284 : int AliHLTHOMERReader::ReAllocBlocks( unsigned long newCnt )
1285 : {
1286 : // see header file for class documentation
1287 : DataBlock* newBlocks;
1288 0 : newBlocks = new DataBlock[ newCnt ];
1289 0 : if ( !newBlocks )
1290 0 : return ENOMEM;
1291 0 : unsigned long cpCnt = (newCnt > fMaxBlockCnt) ? fMaxBlockCnt : newCnt;
1292 0 : if ( fBlocks ) {
1293 0 : memcpy( newBlocks, fBlocks, cpCnt*sizeof(DataBlock) );
1294 0 : } else {
1295 0 : fMaxBlockCnt=0;
1296 : }
1297 0 : if ( newCnt > fMaxBlockCnt )
1298 0 : memset( newBlocks+fMaxBlockCnt, 0, (newCnt-fMaxBlockCnt)*sizeof(DataBlock) );
1299 0 : if ( fBlocks )
1300 0 : delete [] fBlocks;
1301 0 : fBlocks = newBlocks;
1302 0 : fMaxBlockCnt = newCnt;
1303 : return 0;
1304 0 : }
1305 :
1306 : homer_uint64 AliHLTHOMERReader::GetSourceEventID( const DataSource& source )
1307 : {
1308 : // see header file for class documentation
1309 0 : homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1310 0 : return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType1_64b_Offset ] );
1311 : }
1312 :
1313 : homer_uint64 AliHLTHOMERReader::GetSourceEventType( const DataSource& source )
1314 : {
1315 : // see header file for class documentation
1316 0 : homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1317 0 : return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kType_64b_Offset ] );
1318 : }
1319 :
1320 : homer_uint64 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint64 source ) const
1321 : {
1322 : // see header file for class documentation
1323 0 : if ( destFormat == sourceFormat )
1324 0 : return source;
1325 : else
1326 0 : return ((source & 0xFFULL) << 56) |
1327 0 : ((source & 0xFF00ULL) << 40) |
1328 0 : ((source & 0xFF0000ULL) << 24) |
1329 0 : ((source & 0xFF000000ULL) << 8) |
1330 0 : ((source & 0xFF00000000ULL) >> 8) |
1331 0 : ((source & 0xFF0000000000ULL) >> 24) |
1332 0 : ((source & 0xFF000000000000ULL) >> 40) |
1333 0 : ((source & 0xFF00000000000000ULL) >> 56);
1334 0 : }
1335 :
1336 : homer_uint32 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint32 source ) const
1337 : {
1338 : // see header file for class documentation
1339 0 : if ( destFormat == sourceFormat )
1340 0 : return source;
1341 : else
1342 0 : return ((source & 0xFFUL) << 24) |
1343 0 : ((source & 0xFF00UL) << 8) |
1344 0 : ((source & 0xFF0000UL) >> 8) |
1345 0 : ((source & 0xFF000000UL) >> 24);
1346 0 : }
1347 :
1348 : AliHLTHOMERReader* AliHLTHOMERReaderCreateFromTCPPort(const char* hostname, unsigned short port )
1349 : {
1350 : // see header file for function documentation
1351 0 : return new AliHLTHOMERReader(hostname, port);
1352 0 : }
1353 :
1354 : AliHLTHOMERReader* AliHLTHOMERReaderCreateFromTCPPorts(unsigned int tcpCnt, const char** hostnames, unsigned short* ports)
1355 : {
1356 : // see header file for function documentation
1357 0 : return new AliHLTHOMERReader(tcpCnt, hostnames, ports);
1358 0 : }
1359 :
1360 : AliHLTHOMERReader* AliHLTHOMERReaderCreateFromBuffer(const void* pBuffer, int size)
1361 : {
1362 : // see header file for function documentation
1363 0 : return new AliHLTHOMERReader(pBuffer, size);
1364 0 : }
1365 :
1366 : void AliHLTHOMERReaderDelete(AliHLTHOMERReader* pInstance)
1367 : {
1368 : // see header file for function documentation
1369 0 : if (pInstance) delete pInstance;
1370 0 : }
1371 :
1372 : /*
1373 : ***************************************************************************
1374 : **
1375 : ** $Author$ - Initial Version by Timm Morten Steinbeck
1376 : **
1377 : ** $Id$
1378 : **
1379 : ***************************************************************************
1380 : */
|