Line data Source code
1 : // blame: Mikolaj Krzewicki, mikolaj.krzewicki@cern.ch
2 : //see header file for details
3 : //
4 :
5 : #include "AliZMQhelpers.h"
6 :
7 : #include "zmq.h"
8 : #include <cstring>
9 : #include <cassert>
10 : #include <unistd.h>
11 : #include <vector>
12 :
13 : #include "AliHLTDataTypes.h"
14 : #include "TString.h"
15 : #include "TObjString.h"
16 : #include "TPRegexp.h"
17 : #include "TObjArray.h"
18 : #include "AliHLTMessage.h"
19 : #include "TStreamerInfo.h"
20 : #include "TClass.h"
21 :
22 : //init the shared context to null
23 : void* AliZMQhelpers::gZMQcontext = NULL;
24 :
25 : //_______________________________________________________________________________________
26 : void* alizmq_context()
27 : {
28 0 : if (!AliZMQhelpers::gZMQcontext) AliZMQhelpers::gZMQcontext=zmq_ctx_new();
29 0 : return AliZMQhelpers::gZMQcontext;
30 : }
31 :
32 : //_______________________________________________________________________________________
33 : int alizmq_detach (void *self, const char *endpoints, bool serverish)
34 : {
35 0 : assert (self);
36 0 : if (!endpoints)
37 0 : return 0;
38 :
39 0 : if (strlen(endpoints)<2)
40 0 : return 0;
41 :
42 : // We hold each individual endpoint here
43 0 : char endpoint [256];
44 0 : while (*endpoints) {
45 0 : const char *delimiter = strchr (endpoints, ',');
46 0 : if (!delimiter)
47 0 : delimiter = endpoints + strlen (endpoints);
48 0 : if (delimiter - endpoints > 255)
49 0 : return -1;
50 0 : memcpy (endpoint, endpoints, delimiter - endpoints);
51 0 : endpoint [delimiter - endpoints] = 0;
52 :
53 : int rc;
54 0 : if (endpoint [0] == '@')
55 0 : rc = zmq_bind (self, endpoint + 1);
56 : else
57 0 : if (endpoint [0] == '>' || endpoint [0] == '-' || endpoint [0] == '+' )
58 0 : rc = zmq_connect (self, endpoint + 1);
59 : else
60 0 : if (serverish)
61 0 : rc = zmq_unbind (self, endpoint);
62 : else
63 0 : rc = zmq_disconnect (self, endpoint);
64 :
65 0 : if (rc == -1)
66 0 : return -1; // Bad endpoint syntax
67 :
68 0 : if (*delimiter == 0)
69 0 : break;
70 0 : endpoints = delimiter + 1;
71 0 : }
72 0 : return 0;
73 0 : }
74 :
75 : //_______________________________________________________________________________________
76 : int alizmq_attach (void *self, const char *endpoints, bool serverish)
77 : {
78 0 : assert (self);
79 0 : if (!endpoints)
80 0 : return 0;
81 0 : if (strlen(endpoints)<2)
82 0 : return 0;
83 :
84 : // We hold each individual endpoint here
85 0 : char endpoint [256];
86 0 : while (*endpoints) {
87 0 : const char *delimiter = strchr (endpoints, ',');
88 0 : if (!delimiter)
89 0 : delimiter = endpoints + strlen (endpoints);
90 0 : if (delimiter - endpoints > 255)
91 0 : return -1;
92 0 : memcpy (endpoint, endpoints, delimiter - endpoints);
93 0 : endpoint [delimiter - endpoints] = 0;
94 :
95 : int rc;
96 0 : if (endpoint [0] == '@')
97 0 : rc = zmq_bind (self, endpoint + 1);
98 : else
99 0 : if (endpoint [0] == '>' || endpoint [0] == '-' || endpoint [0] == '+' )
100 0 : rc = zmq_connect (self, endpoint + 1);
101 : else
102 0 : if (serverish)
103 0 : rc = zmq_bind (self, endpoint);
104 : else
105 0 : rc = zmq_connect (self, endpoint);
106 :
107 0 : if (rc == -1)
108 0 : return -1; // Bad endpoint syntax
109 :
110 0 : if (*delimiter == 0)
111 0 : break;
112 0 : endpoints = delimiter + 1;
113 0 : }
114 0 : return 0;
115 0 : }
116 :
117 : //_______________________________________________________________________________________
118 : int alizmq_socket_state(void* socket)
119 : {
120 0 : int events=0;
121 0 : size_t len = sizeof(events);
122 0 : zmq_getsockopt(socket, ZMQ_EVENTS, &events, &len);
123 0 : return events;
124 0 : }
125 :
126 : //_______________________________________________________________________________________
127 : int alizmq_socket_type(void* socket)
128 : {
129 : //get the type of the socket
130 0 : int type=-1;
131 0 : size_t typeLen=sizeof(type);
132 : int rc=0;
133 0 : rc = zmq_getsockopt(socket, ZMQ_TYPE, &type, &typeLen);
134 0 : if (rc<0) return rc;
135 0 : return type;
136 0 : }
137 :
138 : //_______________________________________________________________________________________
139 : int alizmq_socket_type(std::string config)
140 : {
141 0 : if (config.compare(0,3,"PUB")==0) return ZMQ_PUB;
142 0 : else if (config.compare(0,3,"SUB")==0) return ZMQ_SUB;
143 0 : else if (config.compare(0,3,"REP")==0) return ZMQ_REP;
144 0 : else if (config.compare(0,3,"REQ")==0) return ZMQ_REQ;
145 0 : else if (config.compare(0,4,"PUSH")==0) return ZMQ_PUSH;
146 0 : else if (config.compare(0,4,"PULL")==0) return ZMQ_PULL;
147 0 : else if (config.compare(0,6,"DEALER")==0) return ZMQ_DEALER;
148 0 : else if (config.compare(0,6,"ROUTER")==0) return ZMQ_ROUTER;
149 0 : else if (config.compare(0,6,"STREAM")==0) return ZMQ_STREAM;
150 0 : else if (config.compare(0,4,"PAIR")==0) return ZMQ_PAIR;
151 0 : else if (config.compare(0,4,"XSUB")==0) return ZMQ_XSUB;
152 0 : else if (config.compare(0,4,"XPUB")==0) return ZMQ_XPUB;
153 :
154 : //printf("Invalid socket type %s\n", config.c_str());
155 0 : return -1;
156 0 : }
157 :
158 : //_______________________________________________________________________________________
159 : const char* alizmq_socket_name(int socketType)
160 : {
161 0 : switch (socketType)
162 : {
163 0 : case ZMQ_PUB: return "PUB";
164 0 : case ZMQ_SUB: return "SUB";
165 0 : case ZMQ_REP: return "REP";
166 0 : case ZMQ_REQ: return "REQ";
167 0 : case ZMQ_PUSH: return "PUSH";
168 0 : case ZMQ_PULL: return "PULL";
169 0 : case ZMQ_DEALER: return "DEALER";
170 0 : case ZMQ_ROUTER: return "ROUTER";
171 0 : case ZMQ_STREAM: return "STREAM";
172 0 : case ZMQ_PAIR: return "PAIR";
173 0 : case ZMQ_XPUB: return "XPUB";
174 0 : case ZMQ_XSUB: return "XSUB";
175 0 : default: return "INVALID";
176 : }
177 0 : }
178 :
179 : //_______________________________________________________________________________________
180 : int alizmq_socket_close(void*& socket, int linger)
181 : {
182 0 : zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger));
183 0 : int rc = zmq_close(socket);
184 0 : if (rc>=0) socket = NULL;
185 0 : return rc;
186 : }
187 :
188 : //_______________________________________________________________________________________
189 : int alizmq_socket_init(void*& socket, void* context, std::string config, int timeout, int highWaterMark)
190 : {
191 : int rc = 0;
192 : int zmqSocketMode = 0;
193 0 : std::string zmqEndpoints = "";
194 :
195 0 : size_t configStartPos = config.find_first_not_of(" \t\n");
196 0 : size_t configEndPos = config.find_last_not_of(" \t\n");
197 0 : if (configStartPos!=std::string::npos && configEndPos!=std::string::npos)
198 0 : { config = config.substr(configStartPos,configEndPos-configStartPos+1); }
199 :
200 0 : if (config.empty()) {
201 0 : alizmq_socket_close(socket);
202 0 : socket = NULL;
203 0 : return 999999;
204 : }
205 :
206 0 : std::size_t found = config.find_first_of("@>-+");
207 0 : if (found == 0)
208 : {
209 : //printf("misformed socket config string %s\n", config.c_str());
210 0 : return -1;
211 : }
212 :
213 0 : zmqSocketMode = alizmq_socket_type(config);
214 :
215 0 : if (found!=std::string::npos)
216 0 : { zmqEndpoints=config.substr(found,std::string::npos); }
217 :
218 : bool newSocket=true;
219 : //init the ZMQ socket
220 0 : if (socket)
221 : {
222 : newSocket=false;
223 0 : int lingerValue = 10;
224 0 : rc = zmq_setsockopt(socket, ZMQ_LINGER, &lingerValue, sizeof(lingerValue));
225 0 : if (rc!=0)
226 : {
227 : //printf("cannot set linger 0 on socket before closing\n");
228 0 : return -2;
229 : }
230 0 : rc = zmq_close(socket);
231 0 : if (rc!=0)
232 : {
233 : //printf("zmq_close() says: %s\n",zmq_strerror(errno));
234 0 : return -3;
235 : }
236 0 : }
237 :
238 0 : socket = zmq_socket(context, zmqSocketMode);
239 :
240 : //set socket options
241 0 : rc += zmq_setsockopt(socket, ZMQ_RCVHWM, &highWaterMark, sizeof(highWaterMark));
242 0 : rc += zmq_setsockopt(socket, ZMQ_SNDHWM, &highWaterMark, sizeof(highWaterMark));
243 0 : rc += zmq_setsockopt(socket, ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
244 0 : rc += zmq_setsockopt(socket, ZMQ_SNDTIMEO, &timeout, sizeof(timeout));
245 0 : if (rc!=0)
246 : {
247 : //printf("cannot set socket options\n");
248 0 : return -4;
249 : }
250 :
251 : //by default subscribe to everything if we happen to be SUB
252 0 : if (zmqSocketMode == ZMQ_SUB) {
253 0 : rc = zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
254 0 : }
255 :
256 : //connect the socket to the endpoints
257 : //when reinitializing sometimes it is not possible to bind the same port again fast,
258 : //we need to retry a few times if we are indeed reconnecting, otherwise we just exit
259 : int i=100;
260 0 : while (i-->0)
261 : {
262 0 : rc = alizmq_attach(socket, zmqEndpoints.c_str() );
263 0 : if ( rc==0 || newSocket ) break;
264 0 : usleep(100000);
265 : }
266 0 : if (rc!=0)
267 : {
268 : //printf("cannot attach to %s\n",zmqEndpoints.c_str());
269 0 : return -5;
270 : }
271 :
272 0 : int lingerValue = 0;
273 0 : rc += zmq_setsockopt(socket, ZMQ_LINGER, &lingerValue, sizeof(lingerValue));
274 : //printf("socket mode: %s, endpoints: %s\n",alizmq_socket_name(zmqSocketMode), zmqEndpoints.c_str());
275 :
276 : //reset the object containers
277 : return zmqSocketMode;
278 0 : }
279 :
280 : //_______________________________________________________________________________________
281 : int alizmq_msg_add(aliZMQmsg* message, const std::string& topic, const std::string& data)
282 : {
283 : //add a frame to the mesage
284 : int rc = 0;
285 :
286 : //prepare topic msg
287 0 : zmq_msg_t* topicMsg = new zmq_msg_t;
288 0 : rc = zmq_msg_init_size( topicMsg, topic.size());
289 0 : if (rc<0) {
290 0 : zmq_msg_close(topicMsg);
291 0 : delete topicMsg;
292 0 : return -1;
293 : }
294 0 : memcpy(zmq_msg_data(topicMsg),topic.data(),topic.size());
295 :
296 : //prepare data msg
297 0 : zmq_msg_t* dataMsg = new zmq_msg_t;
298 0 : rc = zmq_msg_init_size( dataMsg, data.size());
299 0 : if (rc<0) {
300 0 : zmq_msg_close(topicMsg);
301 0 : zmq_msg_close(dataMsg);
302 0 : delete topicMsg;
303 0 : delete dataMsg;
304 0 : return -1;
305 : }
306 0 : memcpy(zmq_msg_data(dataMsg),data.data(),data.size());
307 :
308 : //add the frame to the message
309 0 : message->push_back(std::make_pair(topicMsg,dataMsg));
310 0 : return message->size();
311 0 : }
312 :
313 : //_______________________________________________________________________________________
314 : int alizmq_msg_add(aliZMQmsg* message, const AliHLTDataTopic* topic, void* data, int size)
315 : {
316 : //add a frame to the mesage
317 : int rc = 0;
318 :
319 : //prepare topic msg
320 0 : zmq_msg_t* topicMsg = new zmq_msg_t;
321 0 : rc = zmq_msg_init_size( topicMsg, sizeof(*topic));
322 0 : if (rc<0) {
323 0 : zmq_msg_close(topicMsg);
324 0 : delete topicMsg;
325 0 : return -1;
326 : }
327 0 : memcpy(zmq_msg_data(topicMsg),topic,sizeof(*topic));
328 :
329 : //prepare data msg
330 0 : zmq_msg_t* dataMsg = new zmq_msg_t;
331 0 : rc = zmq_msg_init_size( dataMsg, size);
332 0 : if (rc<0) {
333 0 : zmq_msg_close(topicMsg);
334 0 : zmq_msg_close(dataMsg);
335 0 : delete topicMsg;
336 0 : delete dataMsg;
337 0 : return -1;
338 : }
339 0 : memcpy(zmq_msg_data(dataMsg),data,size);
340 :
341 : //add the frame to the message
342 0 : message->push_back(std::make_pair(topicMsg,dataMsg));
343 0 : return message->size();
344 0 : }
345 :
346 : //_______________________________________________________________________________________
347 : int alizmq_msg_add(aliZMQmsg* message, const AliHLTDataTopic* topic, const std::string& data)
348 : {
349 : //add a frame to the mesage
350 : int rc = 0;
351 :
352 : //prepare topic msg
353 0 : zmq_msg_t* topicMsg = new zmq_msg_t;
354 0 : rc = zmq_msg_init_size( topicMsg, sizeof(*topic));
355 0 : if (rc<0) {
356 0 : zmq_msg_close(topicMsg);
357 0 : delete topicMsg;
358 0 : return -1;
359 : }
360 0 : memcpy(zmq_msg_data(topicMsg),topic,sizeof(*topic));
361 :
362 : //prepare data msg
363 0 : zmq_msg_t* dataMsg = new zmq_msg_t;
364 0 : rc = zmq_msg_init_size( dataMsg, data.size());
365 0 : if (rc<0) {
366 0 : zmq_msg_close(topicMsg);
367 0 : zmq_msg_close(dataMsg);
368 0 : delete topicMsg;
369 0 : delete dataMsg;
370 0 : return -1;
371 : }
372 0 : memcpy(zmq_msg_data(dataMsg),data.data(),data.size());
373 :
374 : //add the frame to the message
375 0 : message->push_back(std::make_pair(topicMsg,dataMsg));
376 0 : return message->size();
377 0 : }
378 :
379 : //_______________________________________________________________________________________
380 : int alizmq_msg_add(aliZMQmsg* message, const AliHLTDataTopic* topic, TObject* object,
381 : int compression, aliZMQrootStreamerInfo* streamers)
382 : {
383 : //add a frame to the mesage
384 : int rc = 0;
385 :
386 : //prepare topic msg
387 0 : zmq_msg_t* topicMsg = new zmq_msg_t;
388 0 : rc = zmq_msg_init_size( topicMsg, sizeof(*topic));
389 0 : if (rc<0) {
390 0 : zmq_msg_close(topicMsg);
391 0 : delete topicMsg;
392 0 : return -1;
393 : }
394 0 : memcpy(zmq_msg_data(topicMsg), topic, sizeof(*topic));
395 :
396 : //prepare data msg
397 0 : AliHLTMessage* tmessage = AliHLTMessage::Stream(object, compression, 0, streamers);
398 0 : if (!tmessage) {
399 0 : zmq_msg_close(topicMsg);
400 0 : delete topicMsg;
401 0 : return -1;
402 : }
403 :
404 0 : if (streamers) {
405 0 : alizmq_update_streamerlist(streamers, tmessage->GetStreamerInfos());
406 0 : }
407 :
408 0 : zmq_msg_t* dataMsg = new zmq_msg_t;
409 0 : rc = zmq_msg_init_data( dataMsg, tmessage->Buffer(), tmessage->Length(),
410 0 : alizmq_deleteTObject, tmessage);
411 0 : if (rc<0) {
412 0 : zmq_msg_close(topicMsg);
413 0 : zmq_msg_close(dataMsg);
414 0 : delete topicMsg;
415 0 : delete dataMsg;
416 0 : return -1;
417 : }
418 :
419 : //add the frame to the message
420 0 : message->push_back(std::make_pair(topicMsg,dataMsg));
421 0 : return message->size();
422 0 : }
423 :
424 : //_______________________________________________________________________________________
425 : int alizmq_msg_send(aliZMQmsg* message, void* socket, int flagsUser)
426 : {
427 : int nBytes=0;
428 : int rc = 0;
429 0 : int flags = flagsUser | ZMQ_SNDMORE;
430 :
431 0 : for (aliZMQmsg::iterator i=message->begin(); i!=message->end(); ++i)
432 : {
433 0 : zmq_msg_t* topic = i->first;
434 0 : zmq_msg_t* data = i->second;
435 :
436 0 : rc = zmq_msg_send(topic, socket, flags);
437 0 : if (rc<0) break;
438 0 : nBytes+=rc;
439 :
440 0 : if (&*i == &*message->rbegin()) flags=flagsUser; //last frame
441 0 : rc = zmq_msg_send(data, socket, flags);
442 0 : if (rc<0) break;
443 0 : nBytes+=rc;
444 0 : }
445 0 : if (rc<0) nBytes=rc;
446 0 : return nBytes;
447 : }
448 :
449 : //_______________________________________________________________________________________
450 : int alizmq_msg_send(std::string topic, std::string data, void* socket, int flags)
451 : {
452 : int rc = 0;
453 0 : zmq_msg_t topicMsg;
454 0 : zmq_msg_init_size(&topicMsg, topic.size());
455 0 : memcpy(zmq_msg_data(&topicMsg), topic.data(), zmq_msg_size(&topicMsg));
456 0 : rc = zmq_msg_send(&topicMsg, socket, ZMQ_SNDMORE);
457 0 : if (rc<0)
458 : {
459 : //printf("unable to send topic: %s\n", topic.c_str());
460 0 : zmq_msg_close(&topicMsg);
461 0 : return rc;
462 : }
463 :
464 0 : zmq_msg_t dataMsg;
465 0 : zmq_msg_init_size(&dataMsg, data.size());
466 0 : memcpy(zmq_msg_data(&dataMsg), data.data(), zmq_msg_size(&dataMsg));
467 0 : rc = zmq_msg_send(&dataMsg, socket, flags);
468 0 : if (rc<0)
469 : {
470 : //printf("unable to send data: %s\n", data.c_str());
471 0 : zmq_msg_close(&dataMsg);
472 0 : return rc;
473 : }
474 0 : return rc;
475 0 : }
476 :
477 : //_______________________________________________________________________________________
478 : int alizmq_msg_prepend_streamer_infos(aliZMQmsg* message, aliZMQrootStreamerInfo* streamers)
479 : {
480 : //prepend the streamer info to the message as first block.
481 : int rc = 0;
482 :
483 0 : AliHLTDataTopic topic = kAliHLTDataTypeStreamerInfo;
484 0 : zmq_msg_t* topicMsg = new zmq_msg_t;
485 0 : rc = zmq_msg_init_size( topicMsg, sizeof(topic));
486 0 : if (rc<0) {
487 0 : zmq_msg_close(topicMsg);
488 0 : delete topicMsg;
489 0 : return -1;
490 : }
491 0 : memcpy(zmq_msg_data(topicMsg), &topic, sizeof(topic));
492 :
493 : //prepare data msg
494 0 : TObjArray listOfInfos;
495 0 : for (aliZMQrootStreamerInfo::const_iterator i=streamers->begin(); i!=streamers->end(); ++i) {
496 0 : listOfInfos.Add(*i);
497 : }
498 0 : AliHLTMessage* tmessage = AliHLTMessage::Stream(&listOfInfos, 1); //compress
499 0 : zmq_msg_t* dataMsg = new zmq_msg_t;
500 0 : rc = zmq_msg_init_data( dataMsg, tmessage->Buffer(), tmessage->Length(),
501 0 : alizmq_deleteTObject, tmessage);
502 0 : if (rc<0) {
503 0 : zmq_msg_close(topicMsg);
504 0 : zmq_msg_close(dataMsg);
505 0 : delete topicMsg;
506 0 : delete dataMsg;
507 0 : return -1;
508 : }
509 :
510 0 : message->insert(message->begin(),std::make_pair(topicMsg,dataMsg));
511 :
512 0 : return 0;
513 0 : }
514 :
515 : //_______________________________________________________________________________________
516 : void alizmq_update_streamerlist(aliZMQrootStreamerInfo* streamers, const TObjArray* newStreamers)
517 : {
518 : //update the list of streamers used
519 0 : if (!streamers) return;
520 0 : if (!newStreamers) return;
521 :
522 0 : for (int i=0; i<newStreamers->GetEntriesFast(); i++) {
523 0 : TVirtualStreamerInfo* info = const_cast<TVirtualStreamerInfo*> (
524 0 : static_cast<const TVirtualStreamerInfo*>((*newStreamers)[i]) );
525 0 : const char* name = info->GetName();
526 0 : int version = info->GetClassVersion();
527 : bool found=false;
528 0 : for (aliZMQrootStreamerInfo::iterator i=streamers->begin(); i!=streamers->end(); ++i)
529 : {
530 0 : const char* existingName = (*i)->GetName();
531 0 : int existingVersion = (*i)->GetClassVersion();
532 0 : if (name == existingName && version==existingVersion) {
533 : found=true;
534 0 : break;
535 : }
536 0 : }
537 0 : if (!found) {
538 0 : streamers->push_back(info);
539 0 : }
540 0 : }
541 0 : }
542 :
543 : //_______________________________________________________________________________________
544 : int alizmq_msg_iter_init_streamer_infos(aliZMQmsg::iterator it)
545 : {
546 : int rc = 0;
547 0 : TObject* obj = NULL;
548 0 : rc = alizmq_msg_iter_data(it,obj);
549 0 : TObjArray* pSchemas = dynamic_cast<TObjArray*>(obj);
550 0 : if (!pSchemas) {
551 0 : return -1;
552 : }
553 :
554 0 : pSchemas->SetOwner(kTRUE);
555 :
556 0 : for (int i=0; i<pSchemas->GetEntriesFast(); i++) {
557 0 : if (pSchemas->At(i)) {
558 0 : TStreamerInfo* pSchema=dynamic_cast<TStreamerInfo*>(pSchemas->At(i));
559 0 : if (pSchema) {
560 0 : int version=pSchema->GetClassVersion();
561 0 : TClass* pClass=TClass::GetClass(pSchema->GetName());
562 0 : if (pClass) {
563 0 : if (pClass->GetClassVersion()==version) {
564 : //AliDebug(0,Form("skipping schema definition %d version %d to class %s as this is the native version", i, version, pSchema->GetName()));
565 0 : continue;
566 : }
567 0 : TObjArray* pInfos=const_cast<TObjArray*>(pClass->GetStreamerInfos());
568 0 : if (pInfos /*&& version<pInfos->GetEntriesFast()*/) {
569 0 : TVirtualStreamerInfo* pInfo = dynamic_cast<TVirtualStreamerInfo*>(pInfos->At(version));
570 0 : if (pInfo==NULL) {
571 0 : pSchema->SetClass(pClass);
572 0 : pSchema->BuildOld();
573 0 : pInfos->AddAtAndExpand(pSchema, version);
574 0 : pSchemas->Remove(pSchema);
575 0 : printf("adding %s %i\n",pSchema->GetName(),version);
576 : //AliDebug(0,Form("adding schema definition %d version %d to class %s", i, version, pSchema->GetName()));
577 0 : } else {
578 0 : if (pInfo && pInfo->GetClassVersion()==version) {
579 : //AliDebug(0,Form("schema definition %d version %d already available in class %s, skipping ...", i, version, pSchema->GetName()));
580 : } else {
581 : //AliError(Form("can not verify version for already existing schema definition %d (%s) version %d: version of existing definition is %d", i, pSchema->GetName(), version, pInfo?pInfo->GetClassVersion():-1));
582 : }
583 : }
584 0 : } else {
585 : //AliError(Form("skipping schema definition %d (%s), unable to set version %d in info array of size %d", i, pSchema->GetName(), version, pInfos?pInfos->GetEntriesFast():-1));
586 : }
587 0 : } else {
588 : //AliError(Form("skipping schema definition %d (%s), unable to find class", i, pSchema->GetName()));
589 : }
590 0 : } else {
591 : //AliError(Form("skipping schema definition %d, not of TStreamerInfo", i));
592 : }
593 0 : }
594 : }
595 :
596 0 : delete pSchemas; //this destroys remaining schemas as pSchemas is set owner
597 0 : return 0;
598 0 : }
599 :
600 : //_______________________________________________________________________________________
601 : int alizmq_msg_send(const AliHLTDataTopic& topic, TObject* object, void* socket, int flags,
602 : int compression, aliZMQrootStreamerInfo* streamers)
603 : {
604 : int rc = 0;
605 :
606 0 : AliHLTMessage* tmessage = AliHLTMessage::Stream(object, compression);
607 0 : zmq_msg_t dataMsg;
608 0 : rc = zmq_msg_init_data( &dataMsg, tmessage->Buffer(), tmessage->Length(),
609 0 : alizmq_deleteTObject, tmessage);
610 :
611 0 : if (streamers) {
612 0 : alizmq_update_streamerlist(streamers, tmessage->GetStreamerInfos());
613 0 : }
614 :
615 : //then send the object topic
616 0 : rc = zmq_send( socket, &topic, sizeof(topic), ZMQ_SNDMORE );
617 0 : if (rc<0)
618 : {
619 0 : zmq_msg_close(&dataMsg);
620 : //printf("unable to send topic: %s %s\n", topic.Description().c_str(), zmq_strerror(errno));
621 0 : return rc;
622 : }
623 :
624 : //send the object itself
625 0 : rc = zmq_msg_send(&dataMsg, socket, flags);
626 0 : if (rc<0)
627 : {
628 : //printf("unable to send data: %s %s\n", tmessage->GetName(), zmq_strerror(errno));
629 0 : zmq_msg_close(&dataMsg);
630 0 : return rc;
631 : }
632 0 : return rc;
633 0 : }
634 :
635 : //______________________________________________________________________________
636 : int alizmq_msg_send(const AliHLTDataTopic& topic, const std::string& data, void* socket, int flags)
637 : {
638 : int rc = 0;
639 0 : rc = zmq_send( socket, &topic, sizeof(topic), ZMQ_SNDMORE );
640 0 : if (rc<0)
641 : {
642 : //printf("unable to send topic: %s %s\n", topic.Description().c_str(), zmq_strerror(errno));
643 0 : return rc;
644 : }
645 :
646 0 : zmq_msg_t dataMsg;
647 0 : zmq_msg_init_size(&dataMsg, data.size());
648 0 : memcpy(zmq_msg_data(&dataMsg), data.data(), zmq_msg_size(&dataMsg));
649 0 : rc = zmq_msg_send(&dataMsg, socket, flags);
650 0 : if (rc<0)
651 : {
652 : //printf("unable to send data: %s\n", data.c_str());
653 0 : zmq_msg_close(&dataMsg);
654 0 : return rc;
655 : }
656 0 : return rc;
657 0 : }
658 :
659 : //______________________________________________________________________________
660 : void alizmq_deleteTObject(void*, void* object)
661 : {
662 : //delete the TBuffer, for use in zmq_msg_init_data(...) only.
663 : //printf("deleteObject called! ZMQ just sent and destroyed the message!\n");
664 0 : TObject* tobject = static_cast<TObject*>(object);
665 0 : delete tobject;
666 0 : }
667 :
668 : //______________________________________________________________________________
669 : void alizmq_deleteTopic(void*, void* object)
670 : {
671 : //delete the TBuffer, for use in zmq_msg_init_data(...) only.
672 : //printf("deleteObject called! ZMQ just sent and destroyed the message!\n");
673 0 : AliHLTDataTopic* topic = static_cast<AliHLTDataTopic*>(object);
674 0 : delete topic;
675 0 : }
676 :
677 :
678 : //_______________________________________________________________________________________
679 : int alizmq_msg_close(aliZMQmsg* message)
680 : {
681 : int rc = 0;
682 0 : for (aliZMQmsg::iterator i=message->begin(); i!=message->end(); ++i)
683 : {
684 0 : int rc1 = zmq_msg_close(i->first);
685 0 : delete i->first; i->first=NULL;
686 0 : int rc2 = zmq_msg_close(i->second);
687 0 : delete (i->second); i->second=NULL;
688 : }
689 0 : message->clear();
690 0 : return 0;
691 : }
692 :
693 : //_______________________________________________________________________________________
694 : int alizmq_msg_iter_check(aliZMQmsg::iterator it, const AliHLTDataTopic& topic)
695 : {
696 0 : AliHLTDataTopic actualTopic;
697 0 : alizmq_msg_iter_topic(it, actualTopic);
698 0 : if (actualTopic == topic) return 0;
699 0 : return 1;
700 0 : }
701 :
702 : //_______________________________________________________________________________________
703 : int alizmq_msg_iter_check_id(aliZMQmsg::iterator it, const AliHLTDataTopic& topic)
704 : {
705 0 : AliHLTDataTopic actualTopic;
706 0 : alizmq_msg_iter_topic(it, actualTopic);
707 0 : if (actualTopic.GetID() == topic.GetID()) return 0;
708 0 : return 1;
709 0 : }
710 :
711 : //_______________________________________________________________________________________
712 : int alizmq_msg_iter_check_id(aliZMQmsg::iterator it, const std::string& topic)
713 : {
714 0 : AliHLTDataTopic actualTopic;
715 0 : alizmq_msg_iter_topic(it, actualTopic);
716 0 : std::string topicID = actualTopic.GetID();
717 0 : return topicID.compare(0,topic.size(),topic);
718 0 : }
719 :
720 : //_______________________________________________________________________________________
721 : int alizmq_msg_iter_topic(aliZMQmsg::iterator it, std::string& topic)
722 : {
723 0 : zmq_msg_t* message = it->first;
724 0 : topic.assign((char*)zmq_msg_data(message),zmq_msg_size(message));
725 0 : return 0;
726 : }
727 :
728 : //_______________________________________________________________________________________
729 : int alizmq_msg_iter_data(aliZMQmsg::iterator it, std::string& data)
730 : {
731 0 : zmq_msg_t* message = it->second;
732 0 : data.assign((char*)zmq_msg_data(message),zmq_msg_size(message));
733 0 : return 0;
734 : }
735 :
736 : //_______________________________________________________________________________________
737 : int alizmq_msg_iter_topic(aliZMQmsg::iterator it, AliHLTDataTopic& topic)
738 : {
739 0 : zmq_msg_t* message = it->first;
740 0 : memcpy(&topic, zmq_msg_data(message),std::min(zmq_msg_size(message),sizeof(topic)));
741 0 : return 0;
742 : }
743 :
744 : //_______________________________________________________________________________________
745 : int alizmq_msg_iter_data(aliZMQmsg::iterator it, TObject*& object)
746 : {
747 0 : zmq_msg_t* message = it->second;
748 0 : size_t size = zmq_msg_size(message);
749 0 : void* data = zmq_msg_data(message);
750 :
751 0 : object = AliHLTMessage::Extract(data, size);
752 0 : return 0;
753 : }
754 :
755 : //_______________________________________________________________________________________
756 : int alizmq_msg_copy(aliZMQmsg* dst, aliZMQmsg* src)
757 : {
758 : //copy (append) src to dst
759 : int numberOfMessages=0;
760 0 : for (aliZMQmsg::iterator i=src->begin(); i!=src->end(); ++i)
761 : {
762 : int rc=0;
763 0 : zmq_msg_t* topicMsg = new zmq_msg_t;
764 0 : rc = zmq_msg_init(topicMsg);
765 0 : rc += zmq_msg_copy(topicMsg, i->first);
766 0 : if (rc<0) numberOfMessages=-1;
767 :
768 0 : zmq_msg_t* dataMsg = new zmq_msg_t;
769 0 : rc = zmq_msg_init(dataMsg);
770 0 : rc += zmq_msg_copy(dataMsg, i->second);
771 0 : if (rc<0) numberOfMessages=-1;
772 :
773 0 : if (numberOfMessages<0)
774 : {
775 0 : zmq_msg_close(topicMsg);
776 0 : zmq_msg_close(dataMsg);
777 0 : delete topicMsg;
778 0 : delete dataMsg;
779 0 : return -1;
780 : }
781 :
782 0 : dst->push_back(std::make_pair(topicMsg, dataMsg));
783 0 : numberOfMessages++;
784 0 : }
785 0 : return numberOfMessages;
786 0 : }
787 :
788 : //_______________________________________________________________________________________
789 : int alizmq_msg_recv(aliZMQmsg* message, void* socket, int flags)
790 : {
791 : int rc = -1;
792 : int receiveStatus=0;
793 0 : while (true)
794 : {
795 0 : zmq_msg_t* topicMsg = new zmq_msg_t;
796 0 : rc = zmq_msg_init(topicMsg);
797 0 : rc = zmq_msg_recv(topicMsg, socket, flags);
798 0 : if (!zmq_msg_more(topicMsg) || rc<0)
799 : {
800 0 : zmq_msg_close(topicMsg);
801 0 : delete topicMsg;
802 : receiveStatus=-1;
803 0 : break;
804 : }
805 0 : receiveStatus+=rc;
806 :
807 0 : zmq_msg_t* dataMsg = new zmq_msg_t;
808 0 : rc = zmq_msg_init(dataMsg);
809 0 : rc = zmq_msg_recv(dataMsg, socket, flags);
810 0 : if (rc<0)
811 : {
812 0 : zmq_msg_close(topicMsg);
813 0 : zmq_msg_close(dataMsg);
814 0 : delete topicMsg;
815 0 : delete dataMsg;
816 : receiveStatus=-1;
817 0 : break;
818 : }
819 0 : receiveStatus+=rc;
820 :
821 0 : message->push_back(std::make_pair(topicMsg,dataMsg));
822 :
823 0 : int more=0;
824 0 : size_t moreLength = sizeof(more);
825 0 : rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &moreLength);
826 0 : if (!more) break;
827 0 : }
828 0 : return receiveStatus;
829 : }
830 :
831 : //_______________________________________________________________________________________
832 : int alizmq_msg_recv(aliZMQmsgStr* message, void* socket, int flags)
833 : {
834 : int rc = -1;
835 : int receiveStatus=0;
836 0 : while (true)
837 : {
838 0 : zmq_msg_t topicMsg;
839 0 : rc = zmq_msg_init(&topicMsg);
840 0 : rc = zmq_msg_recv(&topicMsg, socket, flags);
841 0 : if (!zmq_msg_more(&topicMsg) || receiveStatus<0)
842 : {
843 0 : zmq_msg_close(&topicMsg);
844 : receiveStatus=-1;
845 0 : break;
846 : }
847 0 : receiveStatus+=rc;
848 :
849 0 : zmq_msg_t dataMsg;
850 0 : rc = zmq_msg_init(&dataMsg);
851 0 : rc = zmq_msg_recv(&dataMsg, socket, flags);
852 0 : if (receiveStatus<0)
853 : {
854 0 : zmq_msg_close(&topicMsg);
855 0 : zmq_msg_close(&dataMsg);
856 : receiveStatus=-1;
857 0 : break;
858 : }
859 0 : receiveStatus+=rc;
860 :
861 0 : std::string data;
862 0 : std::string topic;
863 0 : topic.assign(static_cast<char*>(zmq_msg_data(&topicMsg)), zmq_msg_size(&topicMsg));
864 0 : data.assign(static_cast<char*>(zmq_msg_data(&dataMsg)), zmq_msg_size(&dataMsg));
865 :
866 0 : message->push_back(std::make_pair(topic,data));
867 :
868 0 : rc = zmq_msg_close(&topicMsg);
869 0 : rc = zmq_msg_close(&dataMsg);
870 :
871 0 : int more=0;
872 0 : size_t moreLength = sizeof(more);
873 0 : rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &moreLength);
874 0 : if (!more) break;
875 0 : }
876 0 : return receiveStatus;
877 0 : }
878 :
879 : //_______________________________________________________________________________________
880 : TString AliOptionParser::GetFullArgString(int argc, char** argv)
881 : {
882 0 : TString argString;
883 0 : TString argument="";
884 0 : if (argc>0) {
885 0 : for (int i=1; i<argc; i++) {
886 0 : argument=argv[i];
887 0 : if (argument.IsNull()) continue;
888 0 : if (!argString.IsNull()) argString+=" ";
889 0 : argString+=argument;
890 : }
891 0 : }
892 : return argString;
893 0 : }
894 :
895 : //______________________________________________________________________________
896 : int AliOptionParser::ProcessOptionString(TString arguments)
897 : {
898 : //process passed options, return number of processed valid options
899 0 : aliStringVec* options = TokenizeOptionString(arguments);
900 : int nOptions=0;
901 0 : for (aliStringVec::iterator i=options->begin(); i!=options->end(); ++i)
902 : {
903 : //printf(" %s : %s\n", i->first.data(), i->second.data());
904 0 : if (ProcessOption(i->first,i->second)<0)
905 : {
906 : nOptions=-1;
907 0 : break;
908 : }
909 0 : nOptions++;
910 : }
911 0 : delete options; //tidy up
912 :
913 0 : return nOptions;
914 0 : }
915 :
916 : //______________________________________________________________________________
917 : aliStringVec* AliOptionParser::TokenizeOptionString(const TString strIn)
918 : {
919 : //options have the form:
920 : // -option value
921 : // -option=value
922 : // -option
923 : // --option value
924 : // --option=value
925 : // --option
926 : // option=value
927 : // option value
928 : // (value can also be a string like 'some string')
929 : //
930 : // options can be separated by ' ' arbitrarily combined, e.g:
931 : //"-option option1=value1 --option2 value2, -option4=\'some string\'"
932 :
933 : //optionRE by construction contains a pure option name as 3rd submatch (without --,-, =)
934 : //valueRE does NOT match options
935 0 : TPRegexp optionRE("(?:(-{1,2})|((?='?[^=]+=?)))"
936 : "((?(2)(?:(?(?=')'(?:[^'\\\\]++|\\.)*+'|[^ =]+))(?==?))"
937 : "(?(1)[^ =]+(?=[= $])))");
938 0 : TPRegexp valueRE("(?(?!(-{1,2}|[^ =]+=))"
939 : "(?(?=')'(?:[^'\\\\]++|\\.)*+'"
940 : "|[^ =]+))");
941 :
942 0 : aliStringVec* options = new aliStringVec;
943 :
944 : //first split in lines (by newline) and ignore comments
945 0 : TObjArray* lines = strIn.Tokenize("\n\r");
946 0 : TIter nextLine(lines);
947 0 : while (TObjString* objString = (TObjString*)nextLine())
948 : {
949 0 : TString line = objString->String();
950 0 : if (line.BeginsWith("#")) continue;
951 0 : if (line.BeginsWith("//")) continue;
952 0 : TArrayI pos;
953 0 : const TString mods="";
954 : Int_t start = 0;
955 0 : while (1) {
956 : Int_t prevStart=start;
957 0 : TString optionStr="";
958 0 : TString valueStr="";
959 :
960 : //check if we have a new option in this field
961 0 : Int_t nOption=optionRE.Match(line,mods,start,10,&pos);
962 0 : if (nOption>0)
963 : {
964 0 : optionStr = line(pos[6],pos[7]-pos[6]);
965 0 : optionStr=optionStr.Strip(TString::kBoth,'\n');
966 0 : optionStr=optionStr.Strip(TString::kBoth,'\'');
967 0 : optionStr=optionStr.Strip(TString::kLeading,'-');
968 0 : start=pos[1]; //update the current character to the end of match
969 0 : }
970 :
971 : //check if the next field is a value
972 0 : Int_t nValue=valueRE.Match(line,mods,start,10,&pos);
973 0 : if (nValue>0)
974 : {
975 0 : valueStr = line(pos[0],pos[1]-pos[0]);
976 0 : valueStr=valueStr.Strip(TString::kBoth,'\n');
977 0 : valueStr=valueStr.Strip(TString::kBoth,'\'');
978 0 : start=pos[1]; //update the current character to the end of match
979 0 : }
980 :
981 : //skip empty entries
982 0 : if (nOption>0 || nValue>0)
983 : {
984 0 : options->push_back(std::make_pair(optionStr.Data(),valueStr.Data()));
985 0 : }
986 :
987 0 : if (start>=line.Length()-1 || start==prevStart ) break;
988 0 : }
989 :
990 0 : }//while(nextLine())
991 0 : lines->Delete();
992 0 : delete lines;
993 :
994 : return options;
995 0 : }
996 :
//tokenize a std::string
using namespace std;
//
// Split input at every character contained in delimiters.
// An input with N delimiter characters always yields exactly N+1 tokens,
// empty tokens (leading, trailing or between adjacent delimiters) are kept.
// An input without any delimiter (including the empty string) yields
// exactly one token: the input itself.
vector<string> TokenizeString(const string input, const string delimiters)
{
  vector<string> output;
  output.reserve(10);
  size_t start = 0;
  while (true)
  {
    const size_t end = input.find_first_of(delimiters, start);
    if (end == string::npos)
    {
      //no further delimiter: the rest of the input is the last token
      output.push_back(input.substr(start));
      break;
    }
    output.push_back(input.substr(start, end-start));
    start = end + 1; //continue right after the delimiter
  }
  return output;
}
1017 :
1018 : //tokenize a string delimited by semi-colons and return a map
1019 : //of key value pairs, "KEY=VALUE;key2=value2"
1020 : stringMap ParseParamString(const string paramString)
1021 : {
1022 0 : vector<string> tokens = TokenizeString( paramString, ";");
1023 0 : stringMap output;
1024 0 : for (vector<string>::iterator i=tokens.begin(); i!=tokens.end(); ++i)
1025 : {
1026 0 : if (i->empty()) continue;
1027 0 : size_t pos = i->find_first_of("=");
1028 0 : if (pos == string::npos)
1029 : {
1030 0 : output[*i] = "";
1031 : }
1032 : else
1033 : {
1034 0 : output[i->substr(0,pos)] = i->substr(pos+1,string::npos);
1035 : }
1036 0 : }
1037 : return output;
1038 0 : }
1039 :
//a much faster version of the param string parser - just gives you one value
//
// Looks up param in a "key=value;key2=value2" string and returns its value
// (everything between the '=' following the key and the next ';' or the end
// of the string). Returns "" if the key is not present.
// The key has to start a field, i.e. it must be at the very beginning of
// paramstring or directly follow a ';'. This keeps "b" from matching the
// tail of a longer key like "ab". If the key occurs more than once, the
// first occurrence wins.
std::string GetParamString(const std::string param, const std::string paramstring)
{
  const std::string key = param + "=";
  size_t pos = paramstring.find(key);
  while (pos != std::string::npos)
  {
    if (pos == 0 || paramstring[pos-1] == ';')
    {
      const size_t valueStart = pos + key.size();
      const size_t valueEnd = paramstring.find(';', valueStart);
      if (valueEnd == std::string::npos) return paramstring.substr(valueStart);
      return paramstring.substr(valueStart, valueEnd - valueStart);
    }
    //match was inside another key or inside a value, keep searching
    pos = paramstring.find(key, pos + 1);
  }
  return "";
}
1051 :
|