Line data Source code
1 : // $Id$
2 :
3 : /** @file AliHLTMessage.cxx
4 : @author Matthias Richter (customization of Root TMessage )
5 : @date
6 : @brief Serialization of Root objects in the ALICE HLT. */
7 :
8 : // This is the original Root TMessage implementation with a few minor
9 : // modifications, original revision:
10 : // root/net: v5-14-00 $: TMessage.cxx,v 1.6 2004/05/07 09:51:58 brun
11 : // Author: Fons Rademakers 19/12/96
12 :
13 : /*************************************************************************
14 : * Copyright (C) 1995-2000, Rene Brun and Fons Rademakers. *
15 : * All rights reserved. *
16 : * *
17 : * For the licensing terms see $ROOTSYS/LICENSE. *
18 : * For the list of contributors see $ROOTSYS/README/CREDITS. *
19 : *************************************************************************/
20 :
21 : //////////////////////////////////////////////////////////////////////////
22 : // //
23 : // TMessage //
24 : // //
25 : // Message buffer class used for serializing objects and sending them //
26 : // over a network. This class inherits from TBuffer the basic I/O //
27 : // serializer. //
28 : // //
29 : //////////////////////////////////////////////////////////////////////////
30 :
31 : #include "AliHLTMessage.h"
32 : #include "TVirtualStreamerInfo.h"
33 : #include "Bytes.h"
34 : #include "TFile.h"
35 : #include "TProcessID.h"
36 : #include "TClass.h"
37 :
38 : extern "C" void R__zip (Int_t cxlevel, Int_t *nin, char *bufin, Int_t *lout, char *bufout, Int_t *nout);
39 : extern "C" void R__unzip(Int_t *nin, UChar_t *bufin, Int_t *lout, char *bufout, Int_t *nout);
40 : const Int_t kMAXBUF = 0xffffff;
41 :
42 : Bool_t AliHLTMessage::fgEvolution = kFALSE;
43 :
44 126 : ClassImp(AliHLTMessage)
45 :
46 : //______________________________________________________________________________
47 : AliHLTMessage::AliHLTMessage(UInt_t what)
48 : :
49 : # ifdef ROOT_TBufferFile
50 0 : TBufferFile(kWrite),
51 : # else
52 : TBuffer(kWrite),
53 : # endif
54 0 : AliHLTLogging(),
55 0 : fWhat(what),
56 0 : fClass(0),
57 0 : fCompress(0),
58 0 : fBufComp(0),
59 0 : fBufCompCur(0),
60 0 : fCompPos(0)
61 0 : , fBufUncompressed(0)
62 0 : , fBitsPIDs(0)
63 0 : , fInfos(NULL)
64 0 : , fEvolution(kFALSE)
65 0 : {
66 : // Create a AliHLTMessage object for storing objects. The "what" integer
67 : // describes the type of message. Predifined ROOT system message types
68 : // can be found in MessageTypes.h. Make sure your own message types are
69 : // unique from the ROOT defined message types (i.e. 0 - 10000 are
70 : // reserved by ROOT). In case you OR "what" with kMESS_ACK, the message
71 : // will wait for an acknowledgement from the remote side. This makes
72 : // the sending process synchronous. In case you OR "what" with kMESS_ZIP,
73 : // the message will be compressed in TSocket using the zip algorithm
74 : // (only if message is > 256 bytes).
75 :
76 : // space at the beginning of the message reserved for the message length
77 : UInt_t reserved = 0;
78 0 : *this << reserved;
79 :
80 0 : *this << what;
81 :
82 0 : SetBit(kCannotHandleMemberWiseStreaming);
83 0 : }
84 :
85 : const Int_t AliHLTMessage::fgkMinimumSize=30;
86 : UInt_t AliHLTMessage::fgkDefaultBuffer[2]={0,0};
87 :
88 : //______________________________________________________________________________
89 : AliHLTMessage::AliHLTMessage(void *buf, Int_t bufsize)
90 : :
91 : # if defined(ROOT_TBufferFile)
92 0 : TBufferFile(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0),
93 : # else
94 : TBuffer(kRead, bufsize>fgkMinimumSize?bufsize:sizeof(fgkDefaultBuffer), bufsize>fgkMinimumSize?buf:&fgkDefaultBuffer, 0),
95 : # endif
96 0 : AliHLTLogging(),
97 0 : fWhat(0),
98 0 : fClass(0),
99 0 : fCompress(0),
100 0 : fBufComp(0),
101 0 : fBufCompCur(0),
102 0 : fCompPos(0)
103 0 : , fBufUncompressed(0)
104 0 : , fBitsPIDs(0)
105 0 : , fInfos(NULL)
106 0 : , fEvolution(kFALSE)
107 0 : {
108 : // Create a AliHLTMessage object for reading objects. The objects will be
109 : // read from buf. Use the What() method to get the message type.
110 :
111 : // skip space at the beginning of the message reserved for the message length
112 0 : fBufCur += sizeof(UInt_t);
113 :
114 0 : *this >> fWhat;
115 :
116 0 : if (fWhat & kMESS_ZIP) {
117 : // if buffer has kMESS_ZIP set, move it to fBufComp and uncompress
118 0 : fBufComp = fBuffer;
119 0 : fBufCompCur = fBuffer + bufsize;
120 0 : fBuffer = 0;
121 0 : Uncompress();
122 : // Matthias Sep 2008
123 : // NOTE: this is not done in TMessage and will lead to the deletion
124 : // of the buffer. This is not allowed in case of HLT where the
125 : // buffer is handled by the framework. In general, I think this
126 : // is a very bad idea to do it like that in TMessage
127 0 : fBufComp = NULL;
128 0 : fBufCompCur = 0;
129 0 : }
130 :
131 0 : if (fWhat == kMESS_OBJECT) {
132 0 : InitMap();
133 0 : fClass = ReadClass(); // get first the class stored in message
134 0 : SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
135 0 : ResetMap();
136 : } else {
137 0 : fClass = 0;
138 : }
139 0 : }
140 :
141 : //______________________________________________________________________________
142 0 : AliHLTMessage::~AliHLTMessage()
143 0 : {
144 : // Clean up compression buffer.
145 0 : Reset();
146 0 : }
147 :
148 : //______________________________________________________________________________
149 : void AliHLTMessage::EnableSchemaEvolutionForAll(Bool_t enable)
150 : {
151 : // Static function enabling or disabling the automatic schema evolution.
152 : // By default schema evolution support is off.
153 :
154 0 : fgEvolution = enable;
155 0 : }
156 :
157 : //______________________________________________________________________________
158 : Bool_t AliHLTMessage::UsesSchemaEvolutionForAll()
159 : {
160 : // Static function returning status of global schema evolution.
161 :
162 0 : return fgEvolution;
163 : }
164 :
165 : //______________________________________________________________________________
166 : void AliHLTMessage::ForceWriteInfo(TVirtualStreamerInfo *info, Bool_t /* force */)
167 : {
168 : // Force writing the TStreamerInfo to the message.
169 :
170 0 : if (fgEvolution || fEvolution) {
171 0 : if (!fInfos) fInfos = new TObjArray();
172 0 : if (fInfos->FindObject(info->GetName())==NULL) {
173 0 : fInfos->Add(info);
174 0 : }
175 : }
176 0 : }
177 :
178 : //______________________________________________________________________________
179 : void AliHLTMessage::Forward()
180 : {
181 : // Change a buffer that was received into one that can be send, i.e.
182 : // forward a just received message.
183 :
184 0 : if (IsReading()) {
185 0 : SetWriteMode();
186 0 : SetBufferOffset(fBufSize);
187 0 : SetBit(kCannotHandleMemberWiseStreaming);
188 :
189 0 : if (fBufComp) {
190 0 : fCompPos = fBufCur;
191 0 : }
192 : }
193 0 : }
194 :
195 : //______________________________________________________________________________
196 : void AliHLTMessage::TagStreamerInfo(TVirtualStreamerInfo *info)
197 : {
198 : // Remember that the StreamerInfo is being used in writing.
199 :
200 0 : if (fgEvolution || fEvolution) {
201 0 : if (!fInfos) fInfos = new TObjArray();
202 0 : fInfos->Add(info);
203 0 : }
204 0 : }
205 :
206 : //______________________________________________________________________________
207 : void AliHLTMessage::IncrementLevel(TVirtualStreamerInfo *info)
208 : {
209 : // Increment level.
210 :
211 0 : TBufferFile::IncrementLevel(info);
212 :
213 0 : if (!info) return;
214 0 : if (fgEvolution || fEvolution) {
215 0 : if (!fInfos) fInfos = new TObjArray();
216 :
217 : // add the streamer info, but only once
218 : // this assumes that there is only one version
219 0 : if (fInfos->FindObject(info->GetName())==NULL) {
220 0 : fInfos->Add(info);
221 0 : }
222 : }
223 0 : }
224 :
225 : //______________________________________________________________________________
226 : void AliHLTMessage::Reset()
227 : {
228 : // Reset the message buffer so we can use (i.e. fill) it again.
229 :
230 0 : SetBufferOffset(sizeof(UInt_t) + sizeof(fWhat));
231 0 : ResetMap();
232 :
233 0 : if (fBufComp) {
234 0 : delete [] fBufComp;
235 0 : fBufComp = 0;
236 0 : fBufCompCur = 0;
237 0 : fCompPos = 0;
238 0 : }
239 0 : if (fBufUncompressed) {
240 0 : delete [] fBufUncompressed;
241 0 : fBufUncompressed=NULL;
242 0 : }
243 :
244 0 : delete fInfos; fInfos=NULL;
245 0 : }
246 :
247 : //______________________________________________________________________________
248 : void AliHLTMessage::SetLength() const
249 : {
250 : // Set the message length at the beginning of the message buffer.
251 :
252 0 : if (IsWriting()) {
253 0 : char *buf = Buffer();
254 0 : *((UInt_t*)buf) = (UInt_t)(Length() - sizeof(UInt_t));
255 :
256 0 : if (fBufComp) {
257 : buf = fBufComp;
258 0 : *((UInt_t*)buf) = (UInt_t)(CompLength() - sizeof(UInt_t));
259 0 : }
260 0 : }
261 0 : }
262 :
263 : //______________________________________________________________________________
264 : void AliHLTMessage::SetWhat(UInt_t what)
265 : {
266 : // Using this method one can change the message type a-posteriory.
267 : // In case you OR "what" with kMESS_ACK, the message will wait for
268 : // an acknowledgement from the remote side. This makes the sending
269 : // process synchronous.
270 :
271 0 : fWhat = what;
272 :
273 0 : char *buf = Buffer();
274 0 : buf += sizeof(UInt_t); // skip reserved length space
275 0 : tobuf(buf, what);
276 :
277 0 : if (fBufComp) {
278 : buf = fBufComp;
279 0 : buf += sizeof(UInt_t); // skip reserved length space
280 0 : tobuf(buf, what | kMESS_ZIP);
281 0 : }
282 0 : }
283 :
284 : //______________________________________________________________________________
285 : void AliHLTMessage::SetCompressionLevel(Int_t level)
286 : {
287 : // Set the message compression level. Can be between 0 and 9 with 0
288 : // being no compression and 9 maximum compression. In general the default
289 : // level of 1 is the best compromise between achieved compression and
290 : // cpu time. Compression will only happen when the message is > 256 bytes.
291 :
292 0 : if (level < 0) level = 0;
293 0 : if (level > 9) level = 9;
294 :
295 0 : if (level != fCompress && fBufComp) {
296 0 : delete [] fBufComp;
297 0 : fBufComp = 0;
298 0 : fBufCompCur = 0;
299 0 : fCompPos = 0;
300 0 : }
301 0 : fCompress = level;
302 0 : }
303 :
304 : //______________________________________________________________________________
305 : Int_t AliHLTMessage::Compress()
306 : {
307 : // Compress the message. The message will only be compressed if the
308 : // compression level > 0 and the if the message is > 256 bytes.
309 : // Returns -1 in case of error (when compression fails or
310 : // when the message increases in size in some pathological cases),
311 : // otherwise returns 0.
312 :
313 0 : if (fCompress == 0) {
314 : // no compression specified
315 0 : if (fBufComp) {
316 0 : delete [] fBufComp;
317 0 : fBufComp = 0;
318 0 : fBufCompCur = 0;
319 0 : fCompPos = 0;
320 0 : }
321 0 : return 0;
322 : }
323 :
324 0 : if (fBufComp && fCompPos == fBufCur) {
325 : // the message was already compressed
326 0 : return 0;
327 : }
328 :
329 : // remove any existing compressed buffer before compressing modified message
330 0 : if (fBufComp) {
331 0 : delete [] fBufComp;
332 0 : fBufComp = 0;
333 0 : fBufCompCur = 0;
334 0 : fCompPos = 0;
335 0 : }
336 :
337 0 : if (Length() <= (Int_t)(256 + 2*sizeof(UInt_t))) {
338 : // this message is too small to be compressed
339 0 : return 0;
340 : }
341 :
342 : Int_t hdrlen = 2*sizeof(UInt_t);
343 0 : Int_t messlen = Length() - hdrlen;
344 0 : Int_t nbuffers = messlen / kMAXBUF;
345 : Int_t chdrlen = 3*sizeof(UInt_t); // compressed buffer header length
346 0 : Int_t buflen = TMath::Max(512, chdrlen + messlen + 9*nbuffers);
347 0 : fBufComp = new char[buflen];
348 0 : char *messbuf = Buffer() + hdrlen;
349 0 : char *bufcur = fBufComp + chdrlen;
350 : Int_t noutot = 0;
351 : Int_t nzip = 0;
352 0 : Int_t nout, bufmax;
353 0 : for (Int_t i = 0; i <= nbuffers; i++) {
354 0 : if (i == nbuffers)
355 0 : bufmax = messlen - nzip;
356 : else
357 0 : bufmax = kMAXBUF;
358 0 : R__zip(fCompress, &bufmax, messbuf, &bufmax, bufcur, &nout);
359 0 : if (nout == 0 || nout >= messlen) {
360 : //this happens when the buffer cannot be compressed
361 0 : delete [] fBufComp;
362 0 : fBufComp = 0;
363 0 : fBufCompCur = 0;
364 0 : fCompPos = 0;
365 0 : return -1;
366 : }
367 0 : bufcur += nout;
368 0 : noutot += nout;
369 0 : messbuf += kMAXBUF;
370 0 : nzip += kMAXBUF;
371 : }
372 0 : fBufCompCur = bufcur;
373 0 : fCompPos = fBufCur;
374 :
375 0 : bufcur = fBufComp;
376 0 : tobuf(bufcur, (UInt_t)(CompLength() - sizeof(UInt_t)));
377 0 : Int_t what = fWhat | kMESS_ZIP;
378 0 : tobuf(bufcur, what);
379 0 : tobuf(bufcur, Length()); // original uncompressed buffer length
380 :
381 : return 0;
382 0 : }
383 :
384 : //______________________________________________________________________________
385 : Int_t AliHLTMessage::Uncompress()
386 : {
387 : // Uncompress the message. The message will only be uncompressed when
388 : // kMESS_ZIP is set. Returns -1 in case of error, 0 otherwise.
389 :
390 0 : if (!fBufComp || !(fWhat & kMESS_ZIP))
391 0 : return -1;
392 :
393 0 : Int_t buflen;
394 : Int_t hdrlen = 2*sizeof(UInt_t);
395 0 : char *bufcur1 = fBufComp + hdrlen;
396 0 : frombuf(bufcur1, &buflen);
397 0 : UChar_t *bufcur = (UChar_t*)bufcur1;
398 0 : fBuffer = new char[buflen];
399 0 : fBufUncompressed = fBuffer;
400 0 : fBufSize = buflen;
401 0 : fBufCur = fBuffer + sizeof(UInt_t) + sizeof(fWhat);
402 0 : fBufMax = fBuffer + fBufSize;
403 0 : char *messbuf = fBuffer + hdrlen;
404 :
405 0 : Int_t nin, nout, nbuf;
406 : Int_t noutot = 0;
407 0 : while (1) {
408 0 : nin = 9 + ((Int_t)bufcur[3] | ((Int_t)bufcur[4] << 8) | ((Int_t)bufcur[5] << 16));
409 0 : nbuf = (Int_t)bufcur[6] | ((Int_t)bufcur[7] << 8) | ((Int_t)bufcur[8] << 16);
410 0 : R__unzip(&nin, bufcur, &nbuf, messbuf, &nout);
411 0 : if (!nout) break;
412 0 : noutot += nout;
413 0 : if (noutot >= buflen - hdrlen) break;
414 0 : bufcur += nin;
415 0 : messbuf += nout;
416 : }
417 :
418 0 : fWhat &= ~kMESS_ZIP;
419 0 : fCompress = 1;
420 :
421 : return 0;
422 0 : }
423 :
424 : //______________________________________________________________________________
425 : void AliHLTMessage::WriteObject(const TObject *obj)
426 : {
427 : // Write object to message buffer.
428 : // When support for schema evolution is enabled the list of TStreamerInfo
429 : // used to stream this object is kept in fInfos. This information is used
430 : // by TSocket::Send that sends this list through the socket. This list is in
431 : // turn used by TSocket::Recv to store the TStreamerInfo objects in the
432 : // relevant TClass in case the TClass does not know yet about a particular
433 : // class version. This feature is implemented to support clients and servers
434 : // with either different ROOT versions or different user classes versions.
435 :
436 0 : if (fgEvolution || fEvolution) {
437 0 : if (fInfos)
438 0 : fInfos->Clear();
439 : else
440 0 : fInfos = new TObjArray();
441 : }
442 :
443 0 : fBitsPIDs.ResetAllBits();
444 0 : WriteObjectAny(obj, TObject::Class());
445 0 : }
446 :
447 : //______________________________________________________________________________
448 : UShort_t AliHLTMessage::WriteProcessID(TProcessID *pid)
449 : {
450 : // Check if the ProcessID pid is already in the message.
451 : // If not, then:
452 : // - mark bit 0 of fBitsPIDs to indicate that a ProcessID has been found
453 : // - mark bit uid+1 where uid id the uid of the ProcessID
454 :
455 0 : if (fBitsPIDs.TestBitNumber(0)) return 0;
456 0 : if (!pid)
457 0 : pid = TProcessID::GetPID();
458 0 : if (!pid) return 0;
459 0 : fBitsPIDs.SetBitNumber(0);
460 0 : UInt_t uid = pid->GetUniqueID();
461 0 : fBitsPIDs.SetBitNumber(uid+1);
462 : return 1;
463 0 : }
464 :
465 : AliHLTMessage* AliHLTMessage::Stream(TObject* pSrc, Int_t compression, unsigned verbosity, bool enableSchema)
466 : {
467 : /// Helper function to stream an object into an AliHLTMessage
468 : /// The returned instance must be cleaned by the caller
469 : ///
470 : /// Get the data and data size from the message:
471 : /// first check
472 : /// pMsg->CompLength();
473 : /// pMsg->CompBuffer();
474 : /// if that is NULL
475 : /// pMsg->Length();
476 : /// pMsg->Buffer();
477 : ///
478 : /// Note: accessing scheme will be change din the future to just have the two
479 : /// latter ones.
480 0 : if (!pSrc) return NULL;
481 :
482 0 : AliHLTLogging log;
483 0 : AliHLTMessage* pMsg=new AliHLTMessage(kMESS_OBJECT);
484 0 : if (!pMsg) {
485 0 : log.LoggingVarargs(kHLTLogError, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "memory allocation failed");
486 0 : return NULL;
487 : }
488 :
489 0 : pMsg->EnableSchemaEvolution(enableSchema);
490 0 : pMsg->SetCompressionLevel(compression);
491 0 : pMsg->WriteObject(pSrc);
492 0 : if (pMsg->Length()>0) {
493 : // Matthias Sep 2008
494 : // NOTE: AliHLTMessage does implement it's own SetLength method
495 : // which is not architecture independent. The original SetLength
496 : // stores the size always in network byte order.
497 : // I'm trying to remember the rational for that, might be that
498 : // it was just some lack of knowledge. Want to change this, but
499 : // has to be done carefully to be backward compatible.
500 0 : pMsg->SetLength(); // sets the length to the first (reserved) word
501 :
502 : // does nothing if the level is 0
503 0 : pMsg->Compress();
504 :
505 0 : if (pMsg->CompBuffer()) {
506 0 : pMsg->SetLength(); // set once more to have the byte order
507 0 : if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->CompLength());
508 : } else {
509 0 : if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Stream" , __FILE__ , __LINE__ , "object %p type %s streamed: size %d", pSrc, pSrc->GetName(), pMsg->Length());
510 : }
511 : }
512 0 : return pMsg;
513 0 : }
514 :
515 : TObject* AliHLTMessage::Extract(const void* pBuffer, unsigned bufferSize, unsigned verbosity)
516 : {
517 : /// Helper function to extract an object from a buffer.
518 : /// The returned object must be cleaned by the caller
519 0 : AliHLTLogging log;
520 0 : if (!pBuffer || bufferSize<sizeof(AliHLTUInt32_t)) {
521 0 : if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "invalid input buffer %p %d", pBuffer, bufferSize);
522 0 : return NULL;
523 : }
524 :
525 0 : AliHLTUInt32_t firstWord=*((AliHLTUInt32_t*)pBuffer);
526 0 : if (firstWord==bufferSize-sizeof(AliHLTUInt32_t) &&
527 0 : firstWord>=34 /*thats the minimum size of a streamed TObject*/) {
528 0 : AliHLTMessage msg((AliHLTUInt8_t*)pBuffer, bufferSize);
529 0 : TClass* objclass=msg.GetClass();
530 0 : TObject* pObject=msg.ReadObject(objclass);
531 0 : if (pObject && objclass) {
532 0 : if (verbosity>0) log.LoggingVarargs(kHLTLogInfo, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "object %p type %s created", pObject, objclass->GetName());
533 0 : return pObject;
534 : } else {
535 0 : if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "failed to create object from buffer of size %d", bufferSize);
536 : }
537 0 : } else {
538 0 : if (verbosity>0) log.LoggingVarargs(kHLTLogWarning, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "not a streamed TObject: block size %d, indicated %d", bufferSize, firstWord+sizeof(AliHLTUInt32_t));
539 : }
540 0 : return NULL;
541 0 : }
542 :
543 : TObject* AliHLTMessage::Extract(const char* filename, unsigned verbosity)
544 : {
545 : /// Helper function to extract an object from a file containing the streamed object.
546 : /// The returned object must be cleaned by the caller
547 0 : if (!filename) return NULL;
548 :
549 0 : AliHLTLogging log;
550 0 : TString input=filename;
551 0 : input+="?filetype=raw";
552 0 : TFile* pFile=new TFile(input);
553 0 : if (!pFile) return NULL;
554 : TObject* pObject=NULL;
555 0 : if (!pFile->IsZombie()) {
556 0 : pFile->Seek(0);
557 0 : TArrayC buffer;
558 0 : buffer.Set(pFile->GetSize());
559 0 : if (pFile->ReadBuffer(buffer.GetArray(), buffer.GetSize())==0) {
560 0 : pObject=Extract(buffer.GetArray(), buffer.GetSize(), verbosity);
561 0 : } else {
562 0 : log.LoggingVarargs(kHLTLogError, "AliHLTMessage", "Extract" , __FILE__ , __LINE__ , "failed reading %d byte(s) from file %s", pFile->GetSize(), filename);
563 : }
564 0 : }
565 :
566 0 : delete pFile;
567 : return pObject;
568 0 : }
|