xrootd
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Macros
XrdClXRootDMsgHandler.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2014 by European Organization for Nuclear Research (CERN)
3 // Author: Lukasz Janyst <ljanyst@cern.ch>
4 //------------------------------------------------------------------------------
5 // This file is part of the XRootD software suite.
6 //
7 // XRootD is free software: you can redistribute it and/or modify
8 // it under the terms of the GNU Lesser General Public License as published by
9 // the Free Software Foundation, either version 3 of the License, or
10 // (at your option) any later version.
11 //
12 // XRootD is distributed in the hope that it will be useful,
13 // but WITHOUT ANY WARRANTY; without even the implied warranty of
14 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 // GNU General Public License for more details.
16 //
17 // You should have received a copy of the GNU Lesser General Public License
18 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
19 //
20 // In applying this licence, CERN does not waive the privileges and immunities
21 // granted to it by virtue of its status as an Intergovernmental Organization
22 // or submit itself to any jurisdiction.
23 //------------------------------------------------------------------------------
24 
25 #ifndef __XRD_CL_XROOTD_MSG_HANDLER_HH__
26 #define __XRD_CL_XROOTD_MSG_HANDLER_HH__
27 
30 #include "XrdCl/XrdClDefaultEnv.hh"
31 #include "XrdCl/XrdClMessage.hh"
32 #include "XProtocol/XProtocol.hh"
33 
34 #include <sys/uio.h>
35 
36 #include <list>
37 #include <memory>
38 
39 namespace XrdCl
40 {
41  class PostMaster;
42  class SIDManager;
43  class URL;
44  class LocalFileHandler;
45 
46  //----------------------------------------------------------------------------
47  // Single entry in the redirect-trace-back
48  //----------------------------------------------------------------------------
50  {
51  RedirectEntry( const URL &from, const URL &to ) : from( from ), to( to )
52  {
53 
54  }
55 
59 
60  std::string ToString( bool prevok = true )
61  {
62  const std::string tostr = to.GetLocation();
63  const std::string fromstr = from.GetLocation();
64 
65  if( prevok )
66  {
67  if( tostr == fromstr )
68  return "Retrying: " + tostr;
69  return "Redirected from: " + fromstr + " to: " + tostr;
70  }
71  return "Failed at: " + fromstr + ", retrying at: " + tostr;
72  }
73  };
74 
75  class XRootDMsgHandler;
76 
77  //----------------------------------------------------------------------------
78  // Counted reference to XRootDMsgHandler, to be used with WaitTask
79  //----------------------------------------------------------------------------
81  {
82  public:
83 
84  MsgHandlerRef( XRootDMsgHandler *handler) : ref( handler ), count( 1 )
85  {
86 
87  }
88 
90  {
91  return ref;
92  }
93 
94  operator bool() const
95  {
96  return ref;
97  }
98 
99  operator XrdSysMutex&()
100  {
101  return mtx;
102  }
103 
105  {
106  XrdSysMutexHelper lck( mtx );
107  ++count;
108  return *this;
109  }
110 
111  void Invalidate()
112  {
113  XrdSysMutexHelper lck( mtx );
114  ref = 0;
115  }
116 
117  void Free()
118  {
119  XrdSysMutexHelper lck( mtx );
120  --count;
121  if( count == 0 )
122  {
123  lck.UnLock();
124  delete this;
125  }
126  }
127 
128  private:
129 
132  uint16_t count;
133  };
134 
135  //----------------------------------------------------------------------------
137  //----------------------------------------------------------------------------
139  public OutgoingMsgHandler
140  {
141  friend class HandleRspJob;
142 
143  public:
144  //------------------------------------------------------------------------
153  //------------------------------------------------------------------------
155  ResponseHandler *respHandler,
156  const URL *url,
157  SIDManager *sidMgr,
158  LocalFileHandler *lFileHandler):
159  pRequest( msg ),
160  pResponse( 0 ),
161  pResponseHandler( respHandler ),
162  pUrl( *url ),
163  pSidMgr( sidMgr ),
164  pLFileHandler( lFileHandler ),
165  pExpiration( 0 ),
166  pRedirectAsAnswer( false ),
167  pHosts( 0 ),
168  pHasLoadBalancer( false ),
169  pHasSessionId( false ),
170  pChunkList( 0 ),
171  pRedirectCounter( 0 ),
172 
173  pAsyncOffset( 0 ),
174  pAsyncReadSize( 0 ),
175  pAsyncReadBuffer( 0 ),
176  pAsyncMsgSize( 0 ),
177 
178  pReadRawStarted( false ),
180 
181  pReadVRawMsgOffset( 0 ),
182  pReadVRawChunkHeaderDone( false ),
184  pReadVRawSizeError( false ),
185  pReadVRawChunkIndex( 0 ),
186  pReadVRawMsgDiscard( false ),
187 
188  pOtherRawStarted( false ),
189 
190  pFollowMetalink( false ),
191 
192  pStateful( false ),
193 
194  pAggregatedWaitTime( 0 ),
195 
196  pMsgInFly( false ),
197 
198  pRef( new MsgHandlerRef( this ) )
199  {
201  if( msg->GetSessionId() )
202  pHasSessionId = true;
203  memset( &pReadVRawChunkHeader, 0, sizeof( readahead_list ) );
204  }
205 
206  //------------------------------------------------------------------------
208  //------------------------------------------------------------------------
210  {
211  pRef->Free();
212 
214 
215  if( !pHasSessionId )
216  delete pRequest;
217  delete pResponse;
218  std::vector<Message *>::iterator it;
219  for( it = pPartialResps.begin(); it != pPartialResps.end(); ++it )
220  delete *it;
221  }
222 
223  //------------------------------------------------------------------------
229  //------------------------------------------------------------------------
230  virtual uint16_t Examine( Message *msg );
231 
232  //------------------------------------------------------------------------
236  //------------------------------------------------------------------------
237  virtual uint16_t GetSid() const;
238 
239  //------------------------------------------------------------------------
243  //------------------------------------------------------------------------
244  virtual void Process( Message *msg );
245 
246  //------------------------------------------------------------------------
256  //------------------------------------------------------------------------
257  virtual Status ReadMessageBody( Message *msg,
258  int socket,
259  uint32_t &bytesRead );
260 
261  //------------------------------------------------------------------------
267  //------------------------------------------------------------------------
268  virtual uint8_t OnStreamEvent( StreamEvent event,
269  uint16_t streamNum,
270  Status status );
271 
272  //------------------------------------------------------------------------
274  //------------------------------------------------------------------------
275  virtual void OnStatusReady( const Message *message,
276  Status status );
277 
278  //------------------------------------------------------------------------
280  //------------------------------------------------------------------------
281  virtual bool IsRaw() const;
282 
283  //------------------------------------------------------------------------
292  //------------------------------------------------------------------------
293  Status WriteMessageBody( int socket,
294  uint32_t &bytesRead );
295 
296  //------------------------------------------------------------------------
301  //------------------------------------------------------------------------
302  ChunkList* GetMessageBody( uint32_t *&asyncOffset )
303  {
304  asyncOffset = &pAsyncOffset;
305  return pChunkList;
306  }
307 
308  //------------------------------------------------------------------------
312  //------------------------------------------------------------------------
313  void WaitDone( time_t now );
314 
315  //------------------------------------------------------------------------
317  //------------------------------------------------------------------------
318  void SetExpiration( time_t expiration )
319  {
320  pExpiration = expiration;
321  }
322 
323  //------------------------------------------------------------------------
326  //------------------------------------------------------------------------
327  void SetRedirectAsAnswer( bool redirectAsAnswer )
328  {
329  pRedirectAsAnswer = redirectAsAnswer;
330  }
331 
332  //------------------------------------------------------------------------
334  //------------------------------------------------------------------------
335  const Message *GetRequest() const
336  {
337  return pRequest;
338  }
339 
340  //------------------------------------------------------------------------
342  //------------------------------------------------------------------------
343  void SetLoadBalancer( const HostInfo &loadBalancer )
344  {
345  if( !loadBalancer.url.IsValid() )
346  return;
347  pLoadBalancer = loadBalancer;
348  pHasLoadBalancer = true;
349  }
350 
351  //------------------------------------------------------------------------
353  //------------------------------------------------------------------------
354  void SetHostList( HostList *hostList )
355  {
356  delete pHosts;
357  pHosts = hostList;
358  }
359 
360  //------------------------------------------------------------------------
362  //------------------------------------------------------------------------
363  void SetChunkList( ChunkList *chunkList )
364  {
365  pChunkList = chunkList;
366  if( chunkList )
367  pChunkStatus.resize( chunkList->size() );
368  else
369  pChunkStatus.clear();
370  }
371 
372  //------------------------------------------------------------------------
374  //------------------------------------------------------------------------
375  void SetRedirectCounter( uint16_t redirectCounter )
376  {
377  pRedirectCounter = redirectCounter;
378  }
379 
380  void SetFollowMetalink( bool followMetalink )
381  {
382  pFollowMetalink = followMetalink;
383  }
384 
385  void SetStateful( bool stateful )
386  {
387  pStateful = stateful;
388  }
389 
390  private:
391  //------------------------------------------------------------------------
393  //------------------------------------------------------------------------
394  Status ReadRawRead( Message *msg,
395  int socket,
396  uint32_t &bytesRead );
397 
398  //------------------------------------------------------------------------
400  //------------------------------------------------------------------------
402  int socket,
403  uint32_t &bytesRead );
404 
405  //------------------------------------------------------------------------
407  //------------------------------------------------------------------------
409  int socket,
410  uint32_t &bytesRead );
411 
412  //------------------------------------------------------------------------
415  //------------------------------------------------------------------------
416  Status ReadAsync( int socket, uint32_t &btesRead );
417 
418  //------------------------------------------------------------------------
420  //------------------------------------------------------------------------
421  void HandleError( Status status, Message *msg = 0 );
422 
423  //------------------------------------------------------------------------
425  //------------------------------------------------------------------------
426  Status RetryAtServer( const URL &url );
427 
428  //------------------------------------------------------------------------
430  //------------------------------------------------------------------------
431  void HandleResponse();
432 
433  //------------------------------------------------------------------------
435  //------------------------------------------------------------------------
437 
438  //------------------------------------------------------------------------
441  //------------------------------------------------------------------------
442  Status ParseResponse( AnyObject *&response );
443 
444  //------------------------------------------------------------------------
447  //------------------------------------------------------------------------
448  Status RewriteRequestRedirect( const URL &newUrl );
449 
450  //------------------------------------------------------------------------
452  //------------------------------------------------------------------------
454 
455  //------------------------------------------------------------------------
457  //------------------------------------------------------------------------
458  Status PostProcessReadV( VectorReadInfo *vReadInfo );
459 
460  //------------------------------------------------------------------------
462  //------------------------------------------------------------------------
464 
465  //------------------------------------------------------------------------
467  //------------------------------------------------------------------------
468  void UpdateTriedCGI(uint32_t errNo=0);
469 
470  //------------------------------------------------------------------------
472  //------------------------------------------------------------------------
473  void SwitchOnRefreshFlag();
474 
475  //------------------------------------------------------------------------
478  //------------------------------------------------------------------------
479  void HandleRspOrQueue();
480 
481  //------------------------------------------------------------------------
483  //------------------------------------------------------------------------
484  void HandleLocalRedirect( URL *url );
485 
486  //------------------------------------------------------------------------
491  //------------------------------------------------------------------------
492  bool IsRetryable( Message *request );
493 
494  //------------------------------------------------------------------------
501  //------------------------------------------------------------------------
502  bool OmitWait( Message *request, const URL &url );
503 
504  //------------------------------------------------------------------------
506  //------------------------------------------------------------------------
507  void DumpRedirectTraceBack();
508 
//------------------------------------------------------------------------
// Per-chunk bookkeeping used while chunks are read asynchronously; both
// flags start out false
//------------------------------------------------------------------------
struct ChunkStatus
{
  bool sizeError;
  bool done;

  ChunkStatus():
    sizeError( false ),
    done( false )
  {
  }
};
518 
519  typedef std::list<std::unique_ptr<RedirectEntry>> RedirectTraceBack;
520 
523  std::vector<Message *> pPartialResps;
531  time_t pExpiration;
537  std::string pRedirectUrl;
539  std::vector<ChunkStatus> pChunkStatus;
541 
542  uint32_t pAsyncOffset;
543  uint32_t pAsyncReadSize;
545  uint32_t pAsyncMsgSize;
546 
549 
557 
559 
561 
562  bool pStateful;
564 
565  std::unique_ptr<RedirectEntry> pRdirEntry;
567 
568  bool pMsgInFly;
569 
570  //------------------------------------------------------------------------
571  // (Counted) Reference to myself - passed to WaitTask
572  //------------------------------------------------------------------------
574  };
575 }
576 
577 #endif // __XRD_CL_XROOTD_MSG_HANDLER_HH__
bool pOtherRawStarted
Definition: XrdClXRootDMsgHandler.hh:558
std::vector< Message * > pPartialResps
Definition: XrdClXRootDMsgHandler.hh:523
Definition: XrdClAnyObject.hh:32
URL to
Definition: XrdClXRootDMsgHandler.hh:57
void UpdateTriedCGI(uint32_t errNo=0)
Update the "tried=" part of the CGI of the current message.
std::vector< ChunkInfo > ChunkList
List of chunks.
Definition: XrdClXRootDResponses.hh:769
PostMaster * pPostMaster
Definition: XrdClXRootDMsgHandler.hh:526
virtual bool IsRaw() const
Are we a raw writer or not?
Status RewriteRequestWait()
Some requests need to be rewritten also after getting kXR_wait - sigh.
Definition: XProtocol.hh:658
The message representation used throughout the system.
Definition: XrdClMessage.hh:29
std::string GetLocation() const
Get location (protocol://host:port/path)
XRootDMsgHandler * ref
Definition: XrdClXRootDMsgHandler.hh:131
ChunkStatus()
Definition: XrdClXRootDMsgHandler.hh:514
bool IsValid() const
Is the url valid.
Definition: XrdClXRootDMsgHandler.hh:80
void SetRedirectCounter(uint16_t redirectCounter)
Set the redirect counter.
Definition: XrdClXRootDMsgHandler.hh:375
bool pReadRawStarted
Definition: XrdClXRootDMsgHandler.hh:547
MsgHandlerRef * pRef
Definition: XrdClXRootDMsgHandler.hh:573
~XRootDMsgHandler()
Destructor.
Definition: XrdClXRootDMsgHandler.hh:209
void SetFollowMetalink(bool followMetalink)
Definition: XrdClXRootDMsgHandler.hh:380
RedirectTraceBack pRedirectTraceBack
Definition: XrdClXRootDMsgHandler.hh:566
URL from
Definition: XrdClXRootDMsgHandler.hh:56
LocalFileHandler * pLFileHandler
Definition: XrdClXRootDMsgHandler.hh:528
std::string ToString(bool prevok=true)
Definition: XrdClXRootDMsgHandler.hh:60
XrdSysMutex mtx
Definition: XrdClXRootDMsgHandler.hh:130
void Free()
Definition: XrdClXRootDMsgHandler.hh:117
std::vector< ChunkStatus > pChunkStatus
Definition: XrdClXRootDMsgHandler.hh:539
Definition: XrdClXRootDMsgHandler.hh:512
MsgHandlerRef & Self()
Definition: XrdClXRootDMsgHandler.hh:104
void UnLock()
Definition: XrdSysPthread.hh:272
uint32_t pReadVRawMsgOffset
Definition: XrdClXRootDMsgHandler.hh:550
friend class HandleRspJob
Definition: XrdClXRootDMsgHandler.hh:141
Status RetryAtServer(const URL &url)
Retry the request at another server.
bool pReadVRawChunkHeaderDone
Definition: XrdClXRootDMsgHandler.hh:551
Status ParseResponse(AnyObject *&response)
StreamEvent
Events that may have occurred to the stream.
Definition: XrdClPostMasterInterfaces.hh:91
virtual uint16_t GetSid() const
bool pFollowMetalink
Definition: XrdClXRootDMsgHandler.hh:560
bool done
Definition: XrdClXRootDMsgHandler.hh:516
Procedure execution status.
Definition: XrdClStatus.hh:109
Status RewriteRequestRedirect(const URL &newUrl)
URL pUrl
Definition: XrdClXRootDMsgHandler.hh:525
virtual void Process(Message *msg)
Message * pRequest
Definition: XrdClXRootDMsgHandler.hh:521
Status UnPackReadVResponse(Message *msg)
Unpack a single readv response.
time_t pExpiration
Definition: XrdClXRootDMsgHandler.hh:531
void SetExpiration(time_t expiration)
Set a timestamp after which we give up.
Definition: XrdClXRootDMsgHandler.hh:318
uint16_t pRedirectCounter
Definition: XrdClXRootDMsgHandler.hh:540
Definition: XrdSysPthread.hh:165
uint16_t count
Definition: XrdClXRootDMsgHandler.hh:132
XRootDMsgHandler(Message *msg, ResponseHandler *respHandler, const URL *url, SIDManager *sidMgr, LocalFileHandler *lFileHandler)
Definition: XrdClXRootDMsgHandler.hh:154
bool pHasSessionId
Definition: XrdClXRootDMsgHandler.hh:536
ChunkList * pChunkList
Definition: XrdClXRootDMsgHandler.hh:538
Definition: XrdClXRootDResponses.hh:822
char * pAsyncReadBuffer
Definition: XrdClXRootDMsgHandler.hh:544
Status ReadRawReadV(Message *msg, int socket, uint32_t &bytesRead)
Handle a kXR_readv in raw mode.
void HandleError(Status status, Message *msg=0)
Recover error.
std::vector< HostInfo > HostList
Definition: XrdClXRootDResponses.hh:834
virtual uint16_t Examine(Message *msg)
SIDManager * pSidMgr
Definition: XrdClXRootDMsgHandler.hh:527
Status ReadRawOther(Message *msg, int socket, uint32_t &bytesRead)
Handle anything other than kXR_read and kXR_readv in raw mode.
virtual Status ReadMessageBody(Message *msg, int socket, uint32_t &bytesRead)
std::list< std::unique_ptr< RedirectEntry > > RedirectTraceBack
Definition: XrdClXRootDMsgHandler.hh:519
Status WriteMessageBody(int socket, uint32_t &bytesRead)
bool pReadVRawMsgDiscard
Definition: XrdClXRootDMsgHandler.hh:556
Message * pResponse
Definition: XrdClXRootDMsgHandler.hh:522
virtual uint8_t OnStreamEvent(StreamEvent event, uint16_t streamNum, Status status)
XRootDStatus status
Definition: XrdClXRootDMsgHandler.hh:58
Request status.
Definition: XrdClXRootDResponses.hh:212
Handle XRootD stream IDs.
Definition: XrdClSIDManager.hh:33
Status PostProcessReadV(VectorReadInfo *vReadInfo)
Post process vector read.
Message handler.
Definition: XrdClPostMasterInterfaces.hh:68
int pAggregatedWaitTime
Definition: XrdClXRootDMsgHandler.hh:563
ChunkList * GetMessageBody(uint32_t *&asyncOffset)
Definition: XrdClXRootDMsgHandler.hh:302
XRootDMsgHandler * operator->()
Definition: XrdClXRootDMsgHandler.hh:89
void SwitchOnRefreshFlag()
Switch on the refresh flag for some requests.
void SetChunkList(ChunkList *chunkList)
Set the chunk list.
Definition: XrdClXRootDMsgHandler.hh:363
uint64_t GetSessionId() const
Get the session ID the message is meant for.
Definition: XrdClMessage.hh:90
void SetRedirectAsAnswer(bool redirectAsAnswer)
Definition: XrdClXRootDMsgHandler.hh:327
std::string pRedirectUrl
Definition: XrdClXRootDMsgHandler.hh:537
URL url
URL of the host.
Definition: XrdClXRootDResponses.hh:831
bool sizeError
Definition: XrdClXRootDMsgHandler.hh:515
bool pRedirectAsAnswer
Definition: XrdClXRootDMsgHandler.hh:532
bool pStateful
Definition: XrdClXRootDMsgHandler.hh:562
void HandleLocalRedirect(URL *url)
Handle a redirect to a local file.
Vector read info.
Definition: XrdClXRootDResponses.hh:774
const Message * GetRequest() const
Get the request pointer.
Definition: XrdClXRootDMsgHandler.hh:335
Status ReadAsync(int socket, uint32_t &btesRead)
Handle an async response.
Definition: XrdClXRootDResponses.hh:839
void SetStateful(bool stateful)
Definition: XrdClXRootDMsgHandler.hh:385
Definition: XrdClLocalFileHandler.hh:32
uint32_t pAsyncMsgSize
Definition: XrdClXRootDMsgHandler.hh:545
uint32_t pAsyncReadSize
Definition: XrdClXRootDMsgHandler.hh:543
URL representation.
Definition: XrdClURL.hh:30
RedirectEntry(const URL &from, const URL &to)
Definition: XrdClXRootDMsgHandler.hh:51
uint32_t pAsyncOffset
Definition: XrdClXRootDMsgHandler.hh:542
bool OmitWait(Message *request, const URL &url)
Status ReadRawRead(Message *msg, int socket, uint32_t &bytesRead)
Handle a kXR_read in raw mode.
bool pReadVRawSizeError
Definition: XrdClXRootDMsgHandler.hh:553
std::unique_ptr< RedirectEntry > pRdirEntry
Definition: XrdClXRootDMsgHandler.hh:565
HostList * pHosts
Definition: XrdClXRootDMsgHandler.hh:533
int32_t pReadVRawChunkIndex
Definition: XrdClXRootDMsgHandler.hh:554
void WaitDone(time_t now)
XRootDStatus * ProcessStatus()
Extract the status information from the stuff that we got.
Status pLastError
Definition: XrdClXRootDMsgHandler.hh:530
uint32_t pReadRawCurrentOffset
Definition: XrdClXRootDMsgHandler.hh:548
Message status handler.
Definition: XrdClPostMasterInterfaces.hh:167
A hub for dispatching and receiving messages.
Definition: XrdClPostMaster.hh:42
bool pReadVRawChunkHeaderStarted
Definition: XrdClXRootDMsgHandler.hh:552
HostInfo pLoadBalancer
Definition: XrdClXRootDMsgHandler.hh:535
static PostMaster * GetPostMaster()
Get default post master.
MsgHandlerRef(XRootDMsgHandler *handler)
Definition: XrdClXRootDMsgHandler.hh:84
Handle/Process/Forward XRootD messages.
Definition: XrdClXRootDMsgHandler.hh:138
virtual void OnStatusReady(const Message *message, Status status)
The requested action has been performed and the status is available.
void HandleResponse()
Unpack the message and call the response handler.
bool pHasLoadBalancer
Definition: XrdClXRootDMsgHandler.hh:534
ResponseHandler * pResponseHandler
Definition: XrdClXRootDMsgHandler.hh:524
Definition: XrdClXRootDMsgHandler.hh:49
Status pStatus
Definition: XrdClXRootDMsgHandler.hh:529
bool pMsgInFly
Definition: XrdClXRootDMsgHandler.hh:568
Definition: XrdSysPthread.hh:260
void DumpRedirectTraceBack()
Dump the redirect-trace-back into the log file.
bool IsRetryable(Message *request)
void SetLoadBalancer(const HostInfo &loadBalancer)
Set the load balancer.
Definition: XrdClXRootDMsgHandler.hh:343
void Invalidate()
Definition: XrdClXRootDMsgHandler.hh:111
readahead_list pReadVRawChunkHeader
Definition: XrdClXRootDMsgHandler.hh:555
void SetHostList(HostList *hostList)
Set host list.
Definition: XrdClXRootDMsgHandler.hh:354