LCOV - code coverage report
Current view: top level - HLT/BASE/util - AliZMQhelpers.cxx (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 569 0.0 %
Date: 2016-06-14 17:26:59 Functions: 0 39 0.0 %

          Line data    Source code
       1             : // blame: Mikolaj Krzewicki, mikolaj.krzewicki@cern.ch
       2             : //see header file for details
       3             : //
       4             : 
       5             : #include "AliZMQhelpers.h"
       6             : 
       7             : #include "zmq.h"
       8             : #include <cstring>
       9             : #include <cassert>
      10             : #include <unistd.h>
      11             : #include <vector>
      12             : 
      13             : #include "AliHLTDataTypes.h"
      14             : #include "TString.h"
      15             : #include "TObjString.h"
      16             : #include "TPRegexp.h"
      17             : #include "TObjArray.h"
      18             : #include "AliHLTMessage.h"
      19             : #include "TStreamerInfo.h"
      20             : #include "TClass.h"
      21             : 
      22             : //init the shared context to null
      23             : void* AliZMQhelpers::gZMQcontext = NULL;
      24             : 
      25             : //_______________________________________________________________________________________
      26             : void* alizmq_context()
      27             : {
      28           0 :   if (!AliZMQhelpers::gZMQcontext) AliZMQhelpers::gZMQcontext=zmq_ctx_new();
      29           0 :   return AliZMQhelpers::gZMQcontext;
      30             : }
      31             : 
      32             : //_______________________________________________________________________________________
      33             : int alizmq_detach (void *self, const char *endpoints, bool serverish)
      34             : {
      35           0 :     assert (self);
      36           0 :     if (!endpoints)
      37           0 :         return 0;
      38             : 
      39           0 :     if (strlen(endpoints)<2)
      40           0 :         return 0;
      41             : 
      42             :     //  We hold each individual endpoint here
      43           0 :     char endpoint [256];
      44           0 :     while (*endpoints) {
      45           0 :         const char *delimiter = strchr (endpoints, ',');
      46           0 :         if (!delimiter)
      47           0 :             delimiter = endpoints + strlen (endpoints);
      48           0 :         if (delimiter - endpoints > 255)
      49           0 :             return -1;
      50           0 :         memcpy (endpoint, endpoints, delimiter - endpoints);
      51           0 :         endpoint [delimiter - endpoints] = 0;
      52             : 
      53             :         int rc;
      54           0 :         if (endpoint [0] == '@')
      55           0 :             rc = zmq_bind (self, endpoint + 1);
      56             :         else
      57           0 :         if (endpoint [0] == '>' || endpoint [0] == '-' || endpoint [0] == '+' )
      58           0 :             rc = zmq_connect (self, endpoint + 1);
      59             :         else
      60           0 :         if (serverish)
      61           0 :             rc = zmq_unbind (self, endpoint);
      62             :         else
      63           0 :             rc = zmq_disconnect (self, endpoint);
      64             : 
      65           0 :         if (rc == -1)
      66           0 :             return -1;          //  Bad endpoint syntax
      67             : 
      68           0 :         if (*delimiter == 0)
      69           0 :             break;
      70           0 :         endpoints = delimiter + 1;
      71           0 :     }
      72           0 :     return 0;
      73           0 : }
      74             : 
      75             : //_______________________________________________________________________________________
      76             : int alizmq_attach (void *self, const char *endpoints, bool serverish)
      77             : {
      78           0 :     assert (self);
      79           0 :     if (!endpoints)
      80           0 :         return 0;
      81           0 :     if (strlen(endpoints)<2)
      82           0 :         return 0;
      83             : 
      84             :     //  We hold each individual endpoint here
      85           0 :     char endpoint [256];
      86           0 :     while (*endpoints) {
      87           0 :         const char *delimiter = strchr (endpoints, ',');
      88           0 :         if (!delimiter)
      89           0 :             delimiter = endpoints + strlen (endpoints);
      90           0 :         if (delimiter - endpoints > 255)
      91           0 :             return -1;
      92           0 :         memcpy (endpoint, endpoints, delimiter - endpoints);
      93           0 :         endpoint [delimiter - endpoints] = 0;
      94             : 
      95             :         int rc;
      96           0 :         if (endpoint [0] == '@')
      97           0 :             rc = zmq_bind (self, endpoint + 1);
      98             :         else
      99           0 :         if (endpoint [0] == '>' || endpoint [0] == '-' || endpoint [0] == '+' )
     100           0 :             rc = zmq_connect (self, endpoint + 1);
     101             :         else
     102           0 :         if (serverish)
     103           0 :             rc = zmq_bind (self, endpoint);
     104             :         else
     105           0 :             rc = zmq_connect (self, endpoint);
     106             : 
     107           0 :         if (rc == -1)
     108           0 :             return -1;          //  Bad endpoint syntax
     109             : 
     110           0 :         if (*delimiter == 0)
     111           0 :             break;
     112           0 :         endpoints = delimiter + 1;
     113           0 :     }
     114           0 :     return 0;
     115           0 : }
     116             : 
     117             : //_______________________________________________________________________________________
     118             : int alizmq_socket_state(void* socket)
     119             : {
     120           0 :   int events=0;
     121           0 :   size_t len = sizeof(events);
     122           0 :   zmq_getsockopt(socket, ZMQ_EVENTS, &events, &len);
     123           0 :   return events;
     124           0 : }
     125             : 
     126             : //_______________________________________________________________________________________
     127             : int alizmq_socket_type(void* socket)
     128             : {
     129             :   //get the type of the socket
     130           0 :   int type=-1;
     131           0 :   size_t typeLen=sizeof(type);
     132             :   int rc=0;
     133           0 :   rc = zmq_getsockopt(socket, ZMQ_TYPE, &type, &typeLen);
     134           0 :   if (rc<0) return rc;
     135           0 :   return type;
     136           0 : }
     137             : 
     138             : //_______________________________________________________________________________________
     139             : int alizmq_socket_type(std::string config)
     140             : {
     141           0 :   if (config.compare(0,3,"PUB")==0) return ZMQ_PUB;
     142           0 :   else if (config.compare(0,3,"SUB")==0) return ZMQ_SUB;
     143           0 :   else if (config.compare(0,3,"REP")==0) return ZMQ_REP;
     144           0 :   else if (config.compare(0,3,"REQ")==0) return ZMQ_REQ;
     145           0 :   else if (config.compare(0,4,"PUSH")==0) return ZMQ_PUSH;
     146           0 :   else if (config.compare(0,4,"PULL")==0) return ZMQ_PULL;
     147           0 :   else if (config.compare(0,6,"DEALER")==0) return ZMQ_DEALER;
     148           0 :   else if (config.compare(0,6,"ROUTER")==0) return ZMQ_ROUTER;
     149           0 :   else if (config.compare(0,6,"STREAM")==0) return ZMQ_STREAM;
     150           0 :   else if (config.compare(0,4,"PAIR")==0) return ZMQ_PAIR;
     151           0 :   else if (config.compare(0,4,"XSUB")==0) return ZMQ_XSUB;
     152           0 :   else if (config.compare(0,4,"XPUB")==0) return ZMQ_XPUB;
     153             :   
     154             :   //printf("Invalid socket type %s\n", config.c_str());
     155           0 :   return -1;
     156           0 : }
     157             : 
     158             : //_______________________________________________________________________________________
     159             : const char* alizmq_socket_name(int socketType)
     160             : {
     161           0 :   switch (socketType)
     162             :   {
     163           0 :     case ZMQ_PUB: return "PUB";
     164           0 :     case ZMQ_SUB: return "SUB";
     165           0 :     case ZMQ_REP: return "REP";
     166           0 :     case ZMQ_REQ: return "REQ";
     167           0 :     case ZMQ_PUSH: return "PUSH";
     168           0 :     case ZMQ_PULL: return "PULL";
     169           0 :     case ZMQ_DEALER: return "DEALER";
     170           0 :     case ZMQ_ROUTER: return "ROUTER";
     171           0 :     case ZMQ_STREAM: return "STREAM";
     172           0 :     case ZMQ_PAIR: return "PAIR";
     173           0 :     case ZMQ_XPUB: return "XPUB";
     174           0 :     case ZMQ_XSUB: return "XSUB";
     175           0 :     default: return "INVALID";
     176             :   }
     177           0 : }
     178             : 
     179             : //_______________________________________________________________________________________
     180             : int alizmq_socket_close(void*& socket, int linger)
     181             : {
     182           0 :   zmq_setsockopt(socket, ZMQ_LINGER, &linger, sizeof(linger));
     183           0 :   int rc = zmq_close(socket);
     184           0 :   if (rc>=0) socket = NULL;
     185           0 :   return rc;
     186             : }
     187             : 
     188             : //_______________________________________________________________________________________
     189             : int alizmq_socket_init(void*& socket, void* context, std::string config, int timeout, int highWaterMark)
     190             : {
     191             :   int rc = 0;
     192             :   int zmqSocketMode = 0;
     193           0 :   std::string zmqEndpoints = "";
     194             : 
     195           0 :   size_t configStartPos = config.find_first_not_of(" \t\n");
     196           0 :   size_t configEndPos = config.find_last_not_of(" \t\n");
     197           0 :   if (configStartPos!=std::string::npos && configEndPos!=std::string::npos)
     198           0 :   { config = config.substr(configStartPos,configEndPos-configStartPos+1); }
     199             : 
     200           0 :   if (config.empty()) {
     201           0 :     alizmq_socket_close(socket);
     202           0 :     socket = NULL;
     203           0 :     return 999999;
     204             :   }
     205             : 
     206           0 :   std::size_t found = config.find_first_of("@>-+");
     207           0 :   if (found == 0)
     208             :   {
     209             :     //printf("misformed socket config string %s\n", config.c_str());
     210           0 :     return -1;
     211             :   }
     212             :   
     213           0 :   zmqSocketMode = alizmq_socket_type(config);
     214             :   
     215           0 :   if (found!=std::string::npos)
     216           0 :   { zmqEndpoints=config.substr(found,std::string::npos); }
     217             : 
     218             :   bool newSocket=true;
     219             :   //init the ZMQ socket
     220           0 :   if (socket)
     221             :   {
     222             :     newSocket=false;
     223           0 :     int lingerValue = 10;
     224           0 :     rc = zmq_setsockopt(socket, ZMQ_LINGER, &lingerValue, sizeof(lingerValue));
     225           0 :     if (rc!=0) 
     226             :     {
     227             :       //printf("cannot set linger 0 on socket before closing\n");
     228           0 :       return -2;
     229             :     }
     230           0 :     rc = zmq_close(socket);
     231           0 :     if (rc!=0)
     232             :     {
     233             :       //printf("zmq_close() says: %s\n",zmq_strerror(errno));
     234           0 :       return -3;
     235             :     }
     236           0 :   }
     237             : 
     238           0 :   socket  = zmq_socket(context, zmqSocketMode);
     239             : 
     240             :   //set socket options
     241           0 :   rc += zmq_setsockopt(socket, ZMQ_RCVHWM, &highWaterMark, sizeof(highWaterMark));
     242           0 :   rc += zmq_setsockopt(socket, ZMQ_SNDHWM, &highWaterMark, sizeof(highWaterMark));
     243           0 :   rc += zmq_setsockopt(socket, ZMQ_RCVTIMEO, &timeout, sizeof(timeout));
     244           0 :   rc += zmq_setsockopt(socket, ZMQ_SNDTIMEO, &timeout, sizeof(timeout));
     245           0 :   if (rc!=0)
     246             :   {
     247             :     //printf("cannot set socket options\n");
     248           0 :     return -4;
     249             :   }
     250             : 
     251             :   //by default subscribe to everything if we happen to be SUB
     252           0 :   if (zmqSocketMode == ZMQ_SUB) {
     253           0 :     rc = zmq_setsockopt(socket, ZMQ_SUBSCRIBE, "", 0);
     254           0 :   }
     255             : 
     256             :   //connect the socket to the endpoints
     257             :   //when reinitializing sometimes it is not possible to bind the same port again fast,
     258             :   //we need to retry a few times if we are indeed reconnecting, otherwise we just exit
     259             :   int i=100;
     260           0 :   while (i-->0)
     261             :   {
     262           0 :     rc = alizmq_attach(socket,  zmqEndpoints.c_str() );
     263           0 :     if ( rc==0 || newSocket ) break;
     264           0 :     usleep(100000);
     265             :   }
     266           0 :   if (rc!=0) 
     267             :   {
     268             :     //printf("cannot attach to %s\n",zmqEndpoints.c_str());
     269           0 :     return -5;
     270             :   }
     271             : 
     272           0 :   int lingerValue = 0;
     273           0 :   rc += zmq_setsockopt(socket, ZMQ_LINGER, &lingerValue, sizeof(lingerValue));
     274             :   //printf("socket mode: %s, endpoints: %s\n",alizmq_socket_name(zmqSocketMode), zmqEndpoints.c_str());
     275             : 
     276             :   //reset the object containers
     277             :   return zmqSocketMode;
     278           0 : }
     279             : 
     280             : //_______________________________________________________________________________________
     281             : int alizmq_msg_add(aliZMQmsg* message, const std::string& topic, const std::string& data)
     282             : {
     283             :   //add a frame to the mesage
     284             :   int rc = 0;
     285             :   
     286             :   //prepare topic msg
     287           0 :   zmq_msg_t* topicMsg = new zmq_msg_t;
     288           0 :   rc = zmq_msg_init_size( topicMsg, topic.size());
     289           0 :   if (rc<0) {
     290           0 :     zmq_msg_close(topicMsg);
     291           0 :     delete topicMsg;
     292           0 :     return -1;
     293             :   }
     294           0 :   memcpy(zmq_msg_data(topicMsg),topic.data(),topic.size());
     295             : 
     296             :   //prepare data msg
     297           0 :   zmq_msg_t* dataMsg = new zmq_msg_t;
     298           0 :   rc = zmq_msg_init_size( dataMsg, data.size());
     299           0 :   if (rc<0) {
     300           0 :     zmq_msg_close(topicMsg);
     301           0 :     zmq_msg_close(dataMsg);
     302           0 :     delete topicMsg;
     303           0 :     delete dataMsg;
     304           0 :     return -1;
     305             :   }
     306           0 :   memcpy(zmq_msg_data(dataMsg),data.data(),data.size());
     307             : 
     308             :   //add the frame to the message
     309           0 :   message->push_back(std::make_pair(topicMsg,dataMsg));
     310           0 :   return message->size();
     311           0 : }
     312             : 
     313             : //_______________________________________________________________________________________
     314             : int alizmq_msg_add(aliZMQmsg* message, const AliHLTDataTopic* topic, void* data, int size)
     315             : {
     316             :   //add a frame to the mesage
     317             :   int rc = 0;
     318             :   
     319             :   //prepare topic msg
     320           0 :   zmq_msg_t* topicMsg = new zmq_msg_t;
     321           0 :   rc = zmq_msg_init_size( topicMsg, sizeof(*topic));
     322           0 :   if (rc<0) {
     323           0 :     zmq_msg_close(topicMsg);
     324           0 :     delete topicMsg;
     325           0 :     return -1;
     326             :   }
     327           0 :   memcpy(zmq_msg_data(topicMsg),topic,sizeof(*topic));
     328             : 
     329             :   //prepare data msg
     330           0 :   zmq_msg_t* dataMsg = new zmq_msg_t;
     331           0 :   rc = zmq_msg_init_size( dataMsg, size);
     332           0 :   if (rc<0) {
     333           0 :     zmq_msg_close(topicMsg);
     334           0 :     zmq_msg_close(dataMsg);
     335           0 :     delete topicMsg;
     336           0 :     delete dataMsg;
     337           0 :     return -1;
     338             :   }
     339           0 :   memcpy(zmq_msg_data(dataMsg),data,size);
     340             : 
     341             :   //add the frame to the message
     342           0 :   message->push_back(std::make_pair(topicMsg,dataMsg));
     343           0 :   return message->size();
     344           0 : }
     345             : 
     346             : //_______________________________________________________________________________________
     347             : int alizmq_msg_add(aliZMQmsg* message, const AliHLTDataTopic* topic, const std::string& data)
     348             : {
     349             :   //add a frame to the mesage
     350             :   int rc = 0;
     351             :   
     352             :   //prepare topic msg
     353           0 :   zmq_msg_t* topicMsg = new zmq_msg_t;
     354           0 :   rc = zmq_msg_init_size( topicMsg, sizeof(*topic));
     355           0 :   if (rc<0) {
     356           0 :     zmq_msg_close(topicMsg);
     357           0 :     delete topicMsg;
     358           0 :     return -1;
     359             :   }
     360           0 :   memcpy(zmq_msg_data(topicMsg),topic,sizeof(*topic));
     361             : 
     362             :   //prepare data msg
     363           0 :   zmq_msg_t* dataMsg = new zmq_msg_t;
     364           0 :   rc = zmq_msg_init_size( dataMsg, data.size());
     365           0 :   if (rc<0) {
     366           0 :     zmq_msg_close(topicMsg);
     367           0 :     zmq_msg_close(dataMsg);
     368           0 :     delete topicMsg;
     369           0 :     delete dataMsg;
     370           0 :     return -1;
     371             :   }
     372           0 :   memcpy(zmq_msg_data(dataMsg),data.data(),data.size());
     373             : 
     374             :   //add the frame to the message
     375           0 :   message->push_back(std::make_pair(topicMsg,dataMsg));
     376           0 :   return message->size();
     377           0 : }
     378             : 
     379             : //_______________________________________________________________________________________
     380             : int alizmq_msg_add(aliZMQmsg* message, const AliHLTDataTopic* topic, TObject* object,
     381             :                    int compression, aliZMQrootStreamerInfo* streamers)
     382             : {
     383             :   //add a frame to the mesage
     384             :   int rc = 0;
     385             :   
     386             :   //prepare topic msg
     387           0 :   zmq_msg_t* topicMsg = new zmq_msg_t;
     388           0 :   rc = zmq_msg_init_size( topicMsg, sizeof(*topic));
     389           0 :   if (rc<0) {
     390           0 :     zmq_msg_close(topicMsg);
     391           0 :     delete topicMsg;
     392           0 :     return -1;
     393             :   }
     394           0 :   memcpy(zmq_msg_data(topicMsg), topic, sizeof(*topic));
     395             : 
     396             :   //prepare data msg
     397           0 :   AliHLTMessage* tmessage = AliHLTMessage::Stream(object, compression, 0, streamers);
     398           0 :   if (!tmessage) {
     399           0 :     zmq_msg_close(topicMsg);
     400           0 :     delete topicMsg;
     401           0 :     return -1;
     402             :   }
     403             : 
     404           0 :   if (streamers) {
     405           0 :     alizmq_update_streamerlist(streamers, tmessage->GetStreamerInfos());
     406           0 :   }
     407             : 
     408           0 :   zmq_msg_t* dataMsg = new zmq_msg_t;
     409           0 :   rc = zmq_msg_init_data( dataMsg, tmessage->Buffer(), tmessage->Length(),
     410           0 :        alizmq_deleteTObject, tmessage);
     411           0 :   if (rc<0) {
     412           0 :     zmq_msg_close(topicMsg);
     413           0 :     zmq_msg_close(dataMsg);
     414           0 :     delete topicMsg;
     415           0 :     delete dataMsg;
     416           0 :     return -1;
     417             :   }
     418             :   
     419             :   //add the frame to the message
     420           0 :   message->push_back(std::make_pair(topicMsg,dataMsg));
     421           0 :   return message->size();
     422           0 : }
     423             : 
     424             : //_______________________________________________________________________________________
     425             : int alizmq_msg_send(aliZMQmsg* message, void* socket, int flagsUser)
     426             : {
     427             :   int nBytes=0;
     428             :   int rc = 0;
     429           0 :   int flags = flagsUser | ZMQ_SNDMORE;
     430             :  
     431           0 :   for (aliZMQmsg::iterator i=message->begin(); i!=message->end(); ++i)
     432             :   {
     433           0 :     zmq_msg_t* topic = i->first;
     434           0 :     zmq_msg_t* data = i->second;
     435             : 
     436           0 :     rc = zmq_msg_send(topic, socket, flags);
     437           0 :     if (rc<0) break;
     438           0 :     nBytes+=rc;
     439             : 
     440           0 :     if (&*i == &*message->rbegin()) flags=flagsUser; //last frame
     441           0 :     rc = zmq_msg_send(data, socket, flags);
     442           0 :     if (rc<0) break;
     443           0 :     nBytes+=rc;
     444           0 :   }
     445           0 :   if (rc<0) nBytes=rc;
     446           0 :   return nBytes;
     447             : }
     448             : 
     449             : //_______________________________________________________________________________________
     450             : int alizmq_msg_send(std::string topic, std::string data, void* socket, int flags)
     451             : {
     452             :   int rc = 0;
     453           0 :   zmq_msg_t topicMsg;
     454           0 :   zmq_msg_init_size(&topicMsg, topic.size());
     455           0 :   memcpy(zmq_msg_data(&topicMsg), topic.data(), zmq_msg_size(&topicMsg));
     456           0 :   rc = zmq_msg_send(&topicMsg, socket, ZMQ_SNDMORE);
     457           0 :   if (rc<0) 
     458             :   {
     459             :     //printf("unable to send topic: %s\n", topic.c_str());
     460           0 :     zmq_msg_close(&topicMsg);
     461           0 :     return rc;
     462             :   }
     463             : 
     464           0 :   zmq_msg_t dataMsg;
     465           0 :   zmq_msg_init_size(&dataMsg, data.size());
     466           0 :   memcpy(zmq_msg_data(&dataMsg), data.data(), zmq_msg_size(&dataMsg));
     467           0 :   rc = zmq_msg_send(&dataMsg, socket, flags);
     468           0 :   if (rc<0) 
     469             :   {
     470             :     //printf("unable to send data: %s\n", data.c_str());
     471           0 :     zmq_msg_close(&dataMsg);
     472           0 :     return rc;
     473             :   }
     474           0 :   return rc;
     475           0 : }
     476             : 
     477             : //_______________________________________________________________________________________
     478             : int alizmq_msg_prepend_streamer_infos(aliZMQmsg* message, aliZMQrootStreamerInfo* streamers)
     479             : {
     480             :   //prepend the streamer info to the message as first block.
     481             :   int rc = 0;
     482             : 
     483           0 :   AliHLTDataTopic topic = kAliHLTDataTypeStreamerInfo;
     484           0 :   zmq_msg_t* topicMsg = new zmq_msg_t;
     485           0 :   rc = zmq_msg_init_size( topicMsg, sizeof(topic));
     486           0 :   if (rc<0) {
     487           0 :     zmq_msg_close(topicMsg);
     488           0 :     delete topicMsg;
     489           0 :     return -1;
     490             :   }
     491           0 :   memcpy(zmq_msg_data(topicMsg), &topic, sizeof(topic));
     492             : 
     493             :   //prepare data msg
     494           0 :   TObjArray listOfInfos;
     495           0 :   for (aliZMQrootStreamerInfo::const_iterator i=streamers->begin(); i!=streamers->end(); ++i) {
     496           0 :     listOfInfos.Add(*i);
     497             :   }
     498           0 :   AliHLTMessage* tmessage = AliHLTMessage::Stream(&listOfInfos, 1); //compress
     499           0 :   zmq_msg_t* dataMsg = new zmq_msg_t;
     500           0 :   rc = zmq_msg_init_data( dataMsg, tmessage->Buffer(), tmessage->Length(),
     501           0 :        alizmq_deleteTObject, tmessage);
     502           0 :   if (rc<0) {
     503           0 :     zmq_msg_close(topicMsg);
     504           0 :     zmq_msg_close(dataMsg);
     505           0 :     delete topicMsg;
     506           0 :     delete dataMsg;
     507           0 :     return -1;
     508             :   }
     509             : 
     510           0 :   message->insert(message->begin(),std::make_pair(topicMsg,dataMsg));
     511             : 
     512           0 :   return 0;
     513           0 : }
     514             : 
     515             : //_______________________________________________________________________________________
     516             : void alizmq_update_streamerlist(aliZMQrootStreamerInfo* streamers, const TObjArray* newStreamers)
     517             : {
     518             :   //update the list of streamers used
     519           0 :   if (!streamers) return;
     520           0 :   if (!newStreamers) return;
     521             : 
     522           0 :   for (int i=0; i<newStreamers->GetEntriesFast(); i++) {
     523           0 :     TVirtualStreamerInfo* info = const_cast<TVirtualStreamerInfo*> (
     524           0 :       static_cast<const TVirtualStreamerInfo*>((*newStreamers)[i]) );
     525           0 :     const char* name = info->GetName();
     526           0 :     int version = info->GetClassVersion();
     527             :     bool found=false;
     528           0 :     for (aliZMQrootStreamerInfo::iterator i=streamers->begin(); i!=streamers->end(); ++i)
     529             :     {
     530           0 :       const char* existingName = (*i)->GetName();
     531           0 :       int existingVersion = (*i)->GetClassVersion();
     532           0 :       if (name == existingName && version==existingVersion) {
     533             :         found=true;
     534           0 :         break;
     535             :       }
     536           0 :     }
     537           0 :     if (!found) { 
     538           0 :       streamers->push_back(info);
     539           0 :     }
     540           0 :   }
     541           0 : }
     542             : 
     543             : //_______________________________________________________________________________________
     544             : int alizmq_msg_iter_init_streamer_infos(aliZMQmsg::iterator it)
     545             : {
     546             :   int rc = 0;
     547           0 :   TObject* obj = NULL;
     548           0 :   rc = alizmq_msg_iter_data(it,obj); 
     549           0 :   TObjArray* pSchemas = dynamic_cast<TObjArray*>(obj);
     550           0 :   if (!pSchemas) {
     551           0 :     return -1;
     552             :   }
     553             : 
     554           0 :   pSchemas->SetOwner(kTRUE);
     555             : 
     556           0 :   for (int i=0; i<pSchemas->GetEntriesFast(); i++) {
     557           0 :     if (pSchemas->At(i)) {
     558           0 :       TStreamerInfo* pSchema=dynamic_cast<TStreamerInfo*>(pSchemas->At(i));
     559           0 :       if (pSchema) {
     560           0 :         int version=pSchema->GetClassVersion();
     561           0 :         TClass* pClass=TClass::GetClass(pSchema->GetName());
     562           0 :         if (pClass) {
     563           0 :           if (pClass->GetClassVersion()==version) {
     564             :             //AliDebug(0,Form("skipping schema definition %d version %d to class %s as this is the native version", i, version, pSchema->GetName()));
     565           0 :             continue;
     566             :           }
     567           0 :           TObjArray* pInfos=const_cast<TObjArray*>(pClass->GetStreamerInfos());
     568           0 :           if (pInfos /*&& version<pInfos->GetEntriesFast()*/) {
     569           0 :             TVirtualStreamerInfo* pInfo = dynamic_cast<TVirtualStreamerInfo*>(pInfos->At(version));
     570           0 :             if (pInfo==NULL) {
     571           0 :               pSchema->SetClass(pClass);
     572           0 :               pSchema->BuildOld();
     573           0 :               pInfos->AddAtAndExpand(pSchema, version);
     574           0 :               pSchemas->Remove(pSchema);
     575           0 :               printf("adding %s %i\n",pSchema->GetName(),version);
     576             :               //AliDebug(0,Form("adding schema definition %d version %d to class %s", i, version, pSchema->GetName()));
     577           0 :             } else {
     578           0 :               if (pInfo && pInfo->GetClassVersion()==version) {
     579             :                 //AliDebug(0,Form("schema definition %d version %d already available in class %s, skipping ...", i, version, pSchema->GetName()));
     580             :               } else {
     581             :                 //AliError(Form("can not verify version for already existing schema definition %d (%s) version %d: version of existing definition is %d", i, pSchema->GetName(), version, pInfo?pInfo->GetClassVersion():-1));
     582             :               }
     583             :             }
     584           0 :           } else {
     585             :             //AliError(Form("skipping schema definition %d (%s), unable to set version %d in info array of size %d", i, pSchema->GetName(), version, pInfos?pInfos->GetEntriesFast():-1));
     586             :           }
     587           0 :         } else {
     588             :           //AliError(Form("skipping schema definition %d (%s), unable to find class", i, pSchema->GetName()));
     589             :         }
     590           0 :       } else {
     591             :         //AliError(Form("skipping schema definition %d, not of TStreamerInfo", i));
     592             :       }
     593           0 :     }
     594             :   }
     595             : 
     596           0 :   delete pSchemas; //this destroys remaining schemas as pSchemas is set owner
     597           0 :   return 0;
     598           0 : }
     599             : 
     600             : //_______________________________________________________________________________________
     601             : int alizmq_msg_send(const AliHLTDataTopic& topic, TObject* object, void* socket, int flags, 
     602             :                     int compression, aliZMQrootStreamerInfo* streamers)
     603             : {
     604             :   int rc = 0;
     605             : 
     606           0 :   AliHLTMessage* tmessage = AliHLTMessage::Stream(object, compression);
     607           0 :   zmq_msg_t dataMsg;
     608           0 :   rc = zmq_msg_init_data( &dataMsg, tmessage->Buffer(), tmessage->Length(),
     609           0 :       alizmq_deleteTObject, tmessage);
     610             :   
     611           0 :   if (streamers) {
     612           0 :     alizmq_update_streamerlist(streamers, tmessage->GetStreamerInfos());
     613           0 :   }
     614             : 
     615             :   //then send the object topic
     616           0 :   rc = zmq_send( socket, &topic, sizeof(topic), ZMQ_SNDMORE );
     617           0 :   if (rc<0) 
     618             :   {
     619           0 :     zmq_msg_close(&dataMsg);
     620             :     //printf("unable to send topic: %s %s\n", topic.Description().c_str(), zmq_strerror(errno));
     621           0 :     return rc;
     622             :   }
     623             : 
     624             :   //send the object itself
     625           0 :   rc = zmq_msg_send(&dataMsg, socket, flags);
     626           0 :   if (rc<0) 
     627             :   {
     628             :     //printf("unable to send data: %s %s\n", tmessage->GetName(), zmq_strerror(errno));
     629           0 :     zmq_msg_close(&dataMsg);
     630           0 :     return rc;
     631             :   }
     632           0 :   return rc;
     633           0 : }
     634             : 
     635             : //______________________________________________________________________________
     636             : int alizmq_msg_send(const AliHLTDataTopic& topic, const std::string& data, void* socket, int flags)
     637             : {
     638             :   int rc = 0;
     639           0 :   rc = zmq_send( socket, &topic, sizeof(topic), ZMQ_SNDMORE );
     640           0 :   if (rc<0) 
     641             :   {
     642             :     //printf("unable to send topic: %s %s\n", topic.Description().c_str(), zmq_strerror(errno));
     643           0 :     return rc;
     644             :   }
     645             : 
     646           0 :   zmq_msg_t dataMsg;
     647           0 :   zmq_msg_init_size(&dataMsg, data.size());
     648           0 :   memcpy(zmq_msg_data(&dataMsg), data.data(), zmq_msg_size(&dataMsg));
     649           0 :   rc = zmq_msg_send(&dataMsg, socket, flags);
     650           0 :   if (rc<0) 
     651             :   {
     652             :     //printf("unable to send data: %s\n", data.c_str());
     653           0 :     zmq_msg_close(&dataMsg);
     654           0 :     return rc;
     655             :   }
     656           0 :   return rc;
     657           0 : }
     658             : 
     659             : //______________________________________________________________________________
     660             : void alizmq_deleteTObject(void*, void* object)
     661             : {
     662             :   //delete the TBuffer, for use in zmq_msg_init_data(...) only.
     663             :   //printf("deleteObject called! ZMQ just sent and destroyed the message!\n");
     664           0 :   TObject* tobject = static_cast<TObject*>(object);
     665           0 :   delete tobject;
     666           0 : }
     667             : 
     668             : //______________________________________________________________________________
     669             : void alizmq_deleteTopic(void*, void* object)
     670             : {
     671             :   //delete the TBuffer, for use in zmq_msg_init_data(...) only.
     672             :   //printf("deleteObject called! ZMQ just sent and destroyed the message!\n");
     673           0 :   AliHLTDataTopic* topic = static_cast<AliHLTDataTopic*>(object);
     674           0 :   delete topic;
     675           0 : }
     676             : 
     677             : 
     678             : //_______________________________________________________________________________________
     679             : int alizmq_msg_close(aliZMQmsg* message)
     680             : {
     681             :   int rc = 0;
     682           0 :   for (aliZMQmsg::iterator i=message->begin(); i!=message->end(); ++i)
     683             :   {
     684           0 :     int rc1 = zmq_msg_close(i->first);
     685           0 :     delete i->first; i->first=NULL;
     686           0 :     int rc2 = zmq_msg_close(i->second);
     687           0 :     delete (i->second); i->second=NULL;
     688             :   }
     689           0 :   message->clear();
     690           0 :   return 0;
     691             : }
     692             : 
     693             : //_______________________________________________________________________________________
     694             : int alizmq_msg_iter_check(aliZMQmsg::iterator it, const AliHLTDataTopic& topic)
     695             : {
     696           0 :   AliHLTDataTopic actualTopic;
     697           0 :   alizmq_msg_iter_topic(it, actualTopic);
     698           0 :   if (actualTopic == topic) return 0;
     699           0 :   return 1;
     700           0 : }
     701             : 
     702             : //_______________________________________________________________________________________
     703             : int alizmq_msg_iter_check_id(aliZMQmsg::iterator it, const AliHLTDataTopic& topic)
     704             : {
     705           0 :   AliHLTDataTopic actualTopic;
     706           0 :   alizmq_msg_iter_topic(it, actualTopic);
     707           0 :   if (actualTopic.GetID() == topic.GetID()) return 0;
     708           0 :   return 1;
     709           0 : }
     710             : 
     711             : //_______________________________________________________________________________________
     712             : int alizmq_msg_iter_check_id(aliZMQmsg::iterator it, const std::string& topic)
     713             : {
     714           0 :   AliHLTDataTopic actualTopic;
     715           0 :   alizmq_msg_iter_topic(it, actualTopic);
     716           0 :   std::string topicID = actualTopic.GetID();
     717           0 :   return topicID.compare(0,topic.size(),topic);
     718           0 : }
     719             : 
     720             : //_______________________________________________________________________________________
     721             : int alizmq_msg_iter_topic(aliZMQmsg::iterator it, std::string& topic)
     722             : {
     723           0 :   zmq_msg_t* message = it->first;
     724           0 :   topic.assign((char*)zmq_msg_data(message),zmq_msg_size(message));
     725           0 :   return 0;
     726             : }
     727             : 
     728             : //_______________________________________________________________________________________
     729             : int alizmq_msg_iter_data(aliZMQmsg::iterator it, std::string& data)
     730             : {
     731           0 :   zmq_msg_t* message = it->second;
     732           0 :   data.assign((char*)zmq_msg_data(message),zmq_msg_size(message));
     733           0 :   return 0;
     734             : }
     735             : 
     736             : //_______________________________________________________________________________________
     737             : int alizmq_msg_iter_topic(aliZMQmsg::iterator it, AliHLTDataTopic& topic)
     738             : {
     739           0 :   zmq_msg_t* message = it->first;
     740           0 :   memcpy(&topic, zmq_msg_data(message),std::min(zmq_msg_size(message),sizeof(topic)));
     741           0 :   return 0;
     742             : }
     743             : 
     744             : //_______________________________________________________________________________________
     745             : int alizmq_msg_iter_data(aliZMQmsg::iterator it, TObject*& object)
     746             : {
     747           0 :   zmq_msg_t* message = it->second;
     748           0 :   size_t size = zmq_msg_size(message);
     749           0 :   void* data = zmq_msg_data(message);
     750             : 
     751           0 :   object = AliHLTMessage::Extract(data, size);
     752           0 :   return 0;  
     753             : }
     754             : 
     755             : //_______________________________________________________________________________________
     756             : int alizmq_msg_copy(aliZMQmsg* dst, aliZMQmsg* src)
     757             : {
     758             :   //copy (append) src to dst
     759             :   int numberOfMessages=0;
     760           0 :   for (aliZMQmsg::iterator i=src->begin(); i!=src->end(); ++i)
     761             :   {
     762             :     int rc=0;
     763           0 :     zmq_msg_t* topicMsg = new zmq_msg_t;
     764           0 :     rc  = zmq_msg_init(topicMsg);
     765           0 :     rc += zmq_msg_copy(topicMsg, i->first);
     766           0 :     if (rc<0) numberOfMessages=-1;
     767             :     
     768           0 :     zmq_msg_t* dataMsg = new zmq_msg_t;
     769           0 :     rc  = zmq_msg_init(dataMsg);
     770           0 :     rc += zmq_msg_copy(dataMsg, i->second);
     771           0 :     if (rc<0) numberOfMessages=-1;
     772             : 
     773           0 :     if (numberOfMessages<0)
     774             :     {
     775           0 :       zmq_msg_close(topicMsg);
     776           0 :       zmq_msg_close(dataMsg);
     777           0 :       delete topicMsg;
     778           0 :       delete dataMsg;
     779           0 :       return -1;
     780             :     }
     781             :     
     782           0 :     dst->push_back(std::make_pair(topicMsg, dataMsg));
     783           0 :     numberOfMessages++;
     784           0 :   }
     785           0 :   return numberOfMessages;
     786           0 : }
     787             : 
     788             : //_______________________________________________________________________________________
     789             : int alizmq_msg_recv(aliZMQmsg* message, void* socket, int flags)
     790             : {
     791             :   int rc = -1;
     792             :   int receiveStatus=0;
     793           0 :   while (true)
     794             :   {
     795           0 :     zmq_msg_t* topicMsg = new zmq_msg_t;
     796           0 :     rc = zmq_msg_init(topicMsg);
     797           0 :     rc = zmq_msg_recv(topicMsg, socket, flags);
     798           0 :     if (!zmq_msg_more(topicMsg) || rc<0)
     799             :     {
     800           0 :       zmq_msg_close(topicMsg);
     801           0 :       delete topicMsg;
     802             :       receiveStatus=-1;
     803           0 :       break;
     804             :     }
     805           0 :     receiveStatus+=rc;
     806             : 
     807           0 :     zmq_msg_t* dataMsg = new zmq_msg_t;
     808           0 :     rc = zmq_msg_init(dataMsg);
     809           0 :     rc = zmq_msg_recv(dataMsg, socket, flags);
     810           0 :     if (rc<0)
     811             :     {
     812           0 :       zmq_msg_close(topicMsg);
     813           0 :       zmq_msg_close(dataMsg);
     814           0 :       delete topicMsg;
     815           0 :       delete dataMsg;
     816             :       receiveStatus=-1;
     817           0 :       break;
     818             :     }
     819           0 :     receiveStatus+=rc;
     820             : 
     821           0 :     message->push_back(std::make_pair(topicMsg,dataMsg));
     822             : 
     823           0 :     int more=0;
     824           0 :     size_t moreLength = sizeof(more);
     825           0 :     rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &moreLength);
     826           0 :     if (!more) break;
     827           0 :   }
     828           0 :   return receiveStatus;
     829             : }
     830             : 
     831             : //_______________________________________________________________________________________
     832             : int alizmq_msg_recv(aliZMQmsgStr* message, void* socket, int flags)
     833             : {
     834             :   int rc = -1;
     835             :   int receiveStatus=0;
     836           0 :   while (true)
     837             :   {
     838           0 :     zmq_msg_t topicMsg;
     839           0 :     rc = zmq_msg_init(&topicMsg);
     840           0 :     rc = zmq_msg_recv(&topicMsg, socket, flags);
     841           0 :     if (!zmq_msg_more(&topicMsg) || receiveStatus<0)
     842             :     {
     843           0 :       zmq_msg_close(&topicMsg);
     844             :       receiveStatus=-1;
     845           0 :       break;
     846             :     }
     847           0 :     receiveStatus+=rc;
     848             : 
     849           0 :     zmq_msg_t dataMsg;
     850           0 :     rc = zmq_msg_init(&dataMsg);
     851           0 :     rc = zmq_msg_recv(&dataMsg, socket, flags);
     852           0 :     if (receiveStatus<0)
     853             :     {
     854           0 :       zmq_msg_close(&topicMsg);
     855           0 :       zmq_msg_close(&dataMsg);
     856             :       receiveStatus=-1;
     857           0 :       break;
     858             :     }
     859           0 :     receiveStatus+=rc;
     860             : 
     861           0 :     std::string data;
     862           0 :     std::string topic;
     863           0 :     topic.assign(static_cast<char*>(zmq_msg_data(&topicMsg)), zmq_msg_size(&topicMsg));
     864           0 :     data.assign(static_cast<char*>(zmq_msg_data(&dataMsg)), zmq_msg_size(&dataMsg));
     865             : 
     866           0 :     message->push_back(std::make_pair(topic,data));
     867             : 
     868           0 :     rc = zmq_msg_close(&topicMsg);
     869           0 :     rc = zmq_msg_close(&dataMsg);
     870             :     
     871           0 :     int more=0;
     872           0 :     size_t moreLength = sizeof(more);
     873           0 :     rc = zmq_getsockopt(socket, ZMQ_RCVMORE, &more, &moreLength);
     874           0 :     if (!more) break;
     875           0 :   }
     876           0 :   return receiveStatus;
     877           0 : }
     878             : 
     879             : //_______________________________________________________________________________________
     880             : TString AliOptionParser::GetFullArgString(int argc, char** argv)
     881             : {
     882           0 :   TString argString;
     883           0 :   TString argument="";
     884           0 :   if (argc>0) {
     885           0 :     for (int i=1; i<argc; i++) {
     886           0 :       argument=argv[i];
     887           0 :       if (argument.IsNull()) continue;
     888           0 :       if (!argString.IsNull()) argString+=" ";
     889           0 :       argString+=argument;
     890             :     }
     891           0 :   }
     892             :   return argString;
     893           0 : }
     894             : 
     895             : //______________________________________________________________________________
     896             : int AliOptionParser::ProcessOptionString(TString arguments)
     897             : {
     898             :   //process passed options, return number of processed valid options
     899           0 :   aliStringVec* options = TokenizeOptionString(arguments);
     900             :   int nOptions=0;
     901           0 :   for (aliStringVec::iterator i=options->begin(); i!=options->end(); ++i)
     902             :   {
     903             :     //printf("  %s : %s\n", i->first.data(), i->second.data());
     904           0 :     if (ProcessOption(i->first,i->second)<0)
     905             :     {
     906             :       nOptions=-1;
     907           0 :       break;
     908             :     }
     909           0 :     nOptions++;
     910             :   }
     911           0 :   delete options; //tidy up
     912             : 
     913           0 :   return nOptions;
     914           0 : }
     915             : 
     916             : //______________________________________________________________________________
     917             : aliStringVec* AliOptionParser::TokenizeOptionString(const TString strIn)
     918             : {
     919             :   //options have the form:
     920             :   // -option value
     921             :   // -option=value
     922             :   // -option
     923             :   // --option value
     924             :   // --option=value
     925             :   // --option
     926             :   // option=value
     927             :   // option value
     928             :   // (value can also be a string like 'some string')
     929             :   //
     930             :   // options can be separated by ' ' arbitrarily combined, e.g:
     931             :   //"-option option1=value1 --option2 value2, -option4=\'some string\'"
     932             : 
     933             :   //optionRE by construction contains a pure option name as 3rd submatch (without --,-, =)
     934             :   //valueRE does NOT match options
     935           0 :   TPRegexp optionRE("(?:(-{1,2})|((?='?[^=]+=?)))"
     936             :                     "((?(2)(?:(?(?=')'(?:[^'\\\\]++|\\.)*+'|[^ =]+))(?==?))"
     937             :                     "(?(1)[^ =]+(?=[= $])))");
     938           0 :   TPRegexp valueRE("(?(?!(-{1,2}|[^ =]+=))"
     939             :                    "(?(?=')'(?:[^'\\\\]++|\\.)*+'"
     940             :                    "|[^ =]+))");
     941             : 
     942           0 :   aliStringVec* options = new aliStringVec;
     943             : 
     944             :   //first split in lines (by newline) and ignore comments
     945           0 :   TObjArray* lines = strIn.Tokenize("\n\r");
     946           0 :   TIter nextLine(lines);
     947           0 :   while (TObjString* objString = (TObjString*)nextLine())
     948             :   {
     949           0 :   TString line = objString->String();
     950           0 :   if (line.BeginsWith("#")) continue;
     951           0 :   if (line.BeginsWith("//")) continue;
     952           0 :   TArrayI pos;
     953           0 :   const TString mods="";
     954             :   Int_t start = 0;
     955           0 :   while (1) {
     956             :     Int_t prevStart=start;
     957           0 :     TString optionStr="";
     958           0 :     TString valueStr="";
     959             : 
     960             :     //check if we have a new option in this field
     961           0 :     Int_t nOption=optionRE.Match(line,mods,start,10,&pos);
     962           0 :     if (nOption>0)
     963             :     {
     964           0 :       optionStr = line(pos[6],pos[7]-pos[6]);
     965           0 :       optionStr=optionStr.Strip(TString::kBoth,'\n');
     966           0 :       optionStr=optionStr.Strip(TString::kBoth,'\'');
     967           0 :       optionStr=optionStr.Strip(TString::kLeading,'-');
     968           0 :       start=pos[1]; //update the current character to the end of match
     969           0 :     }
     970             : 
     971             :     //check if the next field is a value
     972           0 :     Int_t nValue=valueRE.Match(line,mods,start,10,&pos);
     973           0 :     if (nValue>0)
     974             :     {
     975           0 :       valueStr = line(pos[0],pos[1]-pos[0]);
     976           0 :       valueStr=valueStr.Strip(TString::kBoth,'\n');
     977           0 :       valueStr=valueStr.Strip(TString::kBoth,'\'');
     978           0 :       start=pos[1]; //update the current character to the end of match
     979           0 :     }
     980             : 
     981             :     //skip empty entries
     982           0 :     if (nOption>0 || nValue>0)
     983             :     {
     984           0 :       options->push_back(std::make_pair(optionStr.Data(),valueStr.Data()));
     985           0 :     }
     986             : 
     987           0 :     if (start>=line.Length()-1 || start==prevStart ) break;
     988           0 :   }
     989             : 
     990           0 :   }//while(nextLine())
     991           0 :   lines->Delete();
     992           0 :   delete lines;
     993             : 
     994             :   return options;
     995           0 : }
     996             : 
     997             : //tokenize a std::string
     998             : using namespace std;
     std::vector<std::string> TokenizeString(const std::string input, const std::string delimiters)
     {
       //Split input at every character contained in delimiters.
       //Empty tokens are kept: consecutive delimiters, or a delimiter at the
       //start or end of input, produce empty strings. An input without any
       //delimiter (including the empty string) yields exactly one token.
       std::vector<std::string> output;
       output.reserve(10);
       std::string::size_type start = 0;
       while (true)
       {
         const std::string::size_type end = input.find_first_of(delimiters, start);
         if (end == std::string::npos)
         {
           //last token: everything after the previous delimiter
           output.push_back(input.substr(start));
           break;
         }
         output.push_back(input.substr(start, end-start));
         start = end+1;
       }
       return output;
     }
    1017             : 
    1018             : //tokenize a string delimited by semi-colons and return a map
    1019             : //of key value pairs, "KEY=VALUE;key2=value2"
    1020             : stringMap ParseParamString(const string paramString)
    1021             : {
    1022           0 :   vector<string> tokens = TokenizeString( paramString, ";");
    1023           0 :   stringMap output;
    1024           0 :   for (vector<string>::iterator i=tokens.begin(); i!=tokens.end(); ++i)
    1025             :   {
    1026           0 :     if (i->empty()) continue;
    1027           0 :     size_t pos = i->find_first_of("=");
    1028           0 :     if (pos == string::npos)
    1029             :     {
    1030           0 :       output[*i] = "";
    1031             :     }
    1032             :     else
    1033             :     {
    1034           0 :       output[i->substr(0,pos)] = i->substr(pos+1,string::npos);
    1035             :     }
    1036           0 :   }
    1037             :   return output;
    1038           0 : }
    1039             : 
    //A much faster alternative to the full param string parser: returns the
    //value of a single key from a string of the form "KEY=VALUE;key2=value2".
    //  param       - key to look up
    //  paramstring - semicolon separated list of key=value entries
    //returns the text between "param=" and the next ';' (or the end of the
    //string); an empty string if the key is not present.
    std::string GetParamString(const std::string param, const std::string paramstring)
    {
      const std::string key = param + "=";

      //the key only counts when it starts an entry, i.e. at the very beginning
      //or right after a ';' - otherwise "b" would also match inside "ab=1"
      std::string::size_type keyPos = paramstring.find(key);
      while (keyPos != std::string::npos && keyPos != 0 && paramstring[keyPos-1] != ';')
      {
        keyPos = paramstring.find(key, keyPos+1);
      }
      if (keyPos == std::string::npos) return "";

      const std::string::size_type valueStart = keyPos + key.size();
      const std::string::size_type valueEnd = paramstring.find(';', valueStart);
      if (valueEnd == std::string::npos) return paramstring.substr(valueStart);
      return paramstring.substr(valueStart, valueEnd-valueStart);
    }
    1051             : 

Generated by: LCOV version 1.11