Line data Source code
1 : #ifndef __AliZMQhelpers__
2 : #define __AliZMQhelpers__
3 :
4 : // blame: Mikolaj Krzewicki, mikolaj.krzewicki@cern.ch
5 : // some of it might be inspired by czmq.h
6 :
7 : namespace AliZMQhelpers {
8 : extern void* gZMQcontext; //a global ZMQ context
9 : }
10 :
11 : #include <string>
12 : #include <map>
13 : #include "TString.h"
14 : struct zmq_msg_t;
15 : struct AliHLTDataTopic;
16 : class TVirtualStreamerInfo;
17 :
18 : //convenience typedefs:
19 : //define a map of strings
20 : typedef std::map<std::string,std::string> stringMap;
21 : typedef std::pair<zmq_msg_t*, zmq_msg_t*> aliZMQframe;
22 : typedef std::vector<aliZMQframe> aliZMQmsg;
23 : typedef std::vector<std::pair<std::string, std::string> > aliZMQmsgStr;
24 : typedef std::vector<std::pair<std::string, std::string> > aliStringVec;
25 : typedef std::vector<TVirtualStreamerInfo*> aliZMQrootStreamerInfo;
26 :
27 : // Init and bind/connect a ZMQ socket using a string:
28 : // PUB@tcp://*:123123
29 : // SUB>tcp://localhost:123123,@tcp://*:454545
30 : // timeout is in ms, -1 is wait forever
31 : int alizmq_socket_init(void*& socket, void* context, std::string config, int timeout=-1, int highWaterMark=10);
32 : int alizmq_socket_close(void*& socket, int linger=0);
33 : int alizmq_socket_state(void* socket);
34 :
35 : //get the global context
36 : void* alizmq_context();
37 :
38 : // extract the socket mode from a config string
39 : int alizmq_socket_type(std::string config);
40 : int alizmq_socket_type(void* socket);
41 : const char* alizmq_socket_name(int socketType);
42 :
43 : // --------------------------------------------------------------------------
44 : // Attach a socket to zero or more endpoints. If endpoints is not null,
45 : // parses as list of ZeroMQ endpoints, separated by commas, and prefixed by
46 : // '@' (to bind the socket) or '>' (to attach the socket - alternative: '-').
47 : // Returns 0 if all endpoints were valid, or -1 if there was a syntax error.
48 : // If the endpoint does not start with '@' or '>'('-'), the serverish
49 : // argument defines whether it is used to bind (serverish = true)
50 : // or connect (serverish = false).
51 : int alizmq_attach (void *self, const char *endpoints, bool serverish=false);
52 : int alizmq_detach (void *self, const char *endpoints, bool serverish=false);
53 :
54 : //general multipart messages (aliZMQmsg)
55 : //to access, just iterate over it.
56 : int alizmq_msg_recv(aliZMQmsg* message, void* socket, int flags);
57 : int alizmq_msg_add(aliZMQmsg* message, const AliHLTDataTopic* topic, TObject* object, int compression=0, aliZMQrootStreamerInfo* streamers=NULL);
58 : int alizmq_msg_add(aliZMQmsg* message, const AliHLTDataTopic* topic, const std::string& data);
59 : int alizmq_msg_add(aliZMQmsg* message, const AliHLTDataTopic* topic, void* buffer, int size);
60 : int alizmq_msg_add(aliZMQmsg* message, const std::string& topic, const std::string& data);
61 : int alizmq_msg_copy(aliZMQmsg* dst, aliZMQmsg* src);
62 : int alizmq_msg_send(aliZMQmsg* message, void* socket, int flags);
63 : int alizmq_msg_close(aliZMQmsg* message);
64 :
65 : //ROOT streamers
66 : int alizmq_msg_prepend_streamer_infos(aliZMQmsg* message, aliZMQrootStreamerInfo* streamers);
67 : int alizmq_msg_iter_init_streamer_infos(aliZMQmsg::iterator it);
68 : void alizmq_update_streamerlist(aliZMQrootStreamerInfo* streamers, const TObjArray* newStreamers);
69 :
70 : //checking identity of the frame via iterator
71 : int alizmq_msg_iter_check(aliZMQmsg::iterator it, const AliHLTDataTopic& topic);
72 : int alizmq_msg_iter_check_id(aliZMQmsg::iterator it, const AliHLTDataTopic& topic);
73 : int alizmq_msg_iter_check_id(aliZMQmsg::iterator it, const std::string& topic);
74 : //helpers for accessing data via iterators
75 : int alizmq_msg_iter_topic(aliZMQmsg::iterator it, std::string& topic);
76 : int alizmq_msg_iter_data(aliZMQmsg::iterator it, std::string& data);
77 : int alizmq_msg_iter_topic(aliZMQmsg::iterator it, AliHLTDataTopic& topic);
78 : int alizmq_msg_iter_data(aliZMQmsg::iterator it, TObject*& object);
79 :
80 : //string messages, no need to close, strings are copied
81 : int alizmq_msg_send(std::string topic, std::string data, void* socket, int flags);
82 : int alizmq_msg_recv(aliZMQmsgStr* message, void* socket, int flags);
83 :
84 : //send a single block (one header + payload), ZMQ_SNDMORE should not be used
85 : int alizmq_msg_send(const AliHLTDataTopic& topic, TObject* object, void* socket, int flags, int compression=0, aliZMQrootStreamerInfo* streamers=NULL);
86 : int alizmq_msg_send(const AliHLTDataTopic& topic, const std::string& data, void* socket, int flags);
87 :
88 : //deallocate an object - callback for ZMQ
89 : void alizmq_deleteTObject(void*, void* object);
90 : void alizmq_deleteTopic(void*, void* object);
91 :
92 : //simple zmq multi part message class
93 : //behaves like a map.
94 : //this is to simplify receiving/sending multipart msgs
95 : //and to handle message destruction automatically
96 : //keep it simple!
97 : //class AliZMQmsg {
98 : //public:
99 : // AliZMQmsg() {}
100 : // ~AliZMQmsg() {}
101 : // int Receive(void* socket) {return 0;}
102 : // int Send(void* socket) {return 0;}
103 : // void Add(zmq_msg_t* topic, zmq_msg_t* data) {}
104 : //
105 : // //define (delegate) iterators
106 : // typedef aliZMQmsg::iterator iterator;
107 : // typedef aliZMQmsg::const_iterator const_iterator;
108 : // iterator begin() { return fMessage.begin(); }
109 : // iterator end() { return fMessage.end(); }
110 : //private:
111 : // aliZMQmsg fMessage;
112 : //};
113 :
114 : //simple option parser class
115 : class AliOptionParser {
116 : public:
117 6 : AliOptionParser() {}
118 6 : virtual ~AliOptionParser() {}
119 : //implement this to process one option at a time
120 0 : virtual int ProcessOption(TString /*option*/, TString /*value*/) {return 0;}
121 :
122 : //call this to parse the args
123 : int ProcessOptionString(TString arguments);
124 0 : int ProcessOptionString(int argc, char** argv) { return ProcessOptionString(GetFullArgString(argc,argv)); }
125 :
126 : //convert argc/argv into a TString of options
127 : static TString GetFullArgString(int argc, char** argv);
128 : static aliStringVec* TokenizeOptionString(const TString str);
129 : };
130 :
131 : //a general utility to tokenize strings
132 : std::vector<std::string> TokenizeString(const std::string input, const std::string delimiters);
133 : //parse
134 : stringMap ParseParamString(const std::string paramString);
135 : std::string GetParamString(const std::string param, const std::string paramstring);
136 : #endif
|