XRootD
XrdClOperations.hh
Go to the documentation of this file.
1 //------------------------------------------------------------------------------
2 // Copyright (c) 2011-2017 by European Organization for Nuclear Research (CERN)
3 // Author: Krzysztof Jamrog <krzysztof.piotr.jamrog@cern.ch>,
4 // Michal Simon <michal.simon@cern.ch>
5 //------------------------------------------------------------------------------
6 // This file is part of the XRootD software suite.
7 //
8 // XRootD is free software: you can redistribute it and/or modify
9 // it under the terms of the GNU Lesser General Public License as published by
10 // the Free Software Foundation, either version 3 of the License, or
11 // (at your option) any later version.
12 //
13 // XRootD is distributed in the hope that it will be useful,
14 // but WITHOUT ANY WARRANTY; without even the implied warranty of
15 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 // GNU General Public License for more details.
17 //
18 // You should have received a copy of the GNU Lesser General Public License
19 // along with XRootD. If not, see <http://www.gnu.org/licenses/>.
20 //
21 // In applying this licence, CERN does not waive the privileges and immunities
22 // granted to it by virtue of its status as an Intergovernmental Organization
23 // or submit itself to any jurisdiction.
24 //------------------------------------------------------------------------------
25 
26 #ifndef __XRD_CL_OPERATIONS_HH__
27 #define __XRD_CL_OPERATIONS_HH__
28 
29 #include <memory>
30 #include <stdexcept>
31 #include <sstream>
32 #include <tuple>
33 #include <future>
36 #include "XrdCl/XrdClArg.hh"
39 #include "XrdSys/XrdSysPthread.hh"
40 
42 #include "XrdCl/XrdClJobManager.hh"
43 #include "XrdCl/XrdClPostMaster.hh"
44 #include "XrdCl/XrdClDefaultEnv.hh"
45 
46 namespace XrdCl
47 {
48 
49  class Pipeline;
50  class PipelineHandler;
51 
52  //----------------------------------------------------------------------------
58  //----------------------------------------------------------------------------
59  template<bool HasHndl>
60  class Operation
61  {
62  // Declare friendship between templates
63  template<bool>
64  friend class Operation;
65 
66  friend std::future<XRootDStatus> Async( Pipeline, uint16_t );
67 
68  friend class Pipeline;
69  friend class PipelineHandler;
70 
71  public:
72 
73  //------------------------------------------------------------------------
75  //------------------------------------------------------------------------
76  Operation() : valid( true )
77  {
78  }
79 
80  //------------------------------------------------------------------------
82  //------------------------------------------------------------------------
83  template<bool from>
85  handler( std::move( op.handler ) ), valid( true )
86  {
87  if( !op.valid ) throw std::invalid_argument( "Cannot construct "
88  "Operation from an invalid Operation!" );
89  op.valid = false;
90  }
91 
92  //------------------------------------------------------------------------
94  //------------------------------------------------------------------------
95  virtual ~Operation()
96  {
97  }
98 
99  //------------------------------------------------------------------------
101  //------------------------------------------------------------------------
102  virtual std::string ToString() = 0;
103 
104  //------------------------------------------------------------------------
108  //------------------------------------------------------------------------
109  virtual Operation<HasHndl>* Move() = 0;
110 
111  //------------------------------------------------------------------------
116  //------------------------------------------------------------------------
117  virtual Operation<true>* ToHandled() = 0;
118 
119  protected:
120 
121  //------------------------------------------------------------------------
126  //------------------------------------------------------------------------
127  void Run( Timeout timeout,
128  std::promise<XRootDStatus> prms,
129  std::function<void(const XRootDStatus&)> final );
130 
131  //------------------------------------------------------------------------
137  //------------------------------------------------------------------------
138  virtual XRootDStatus RunImpl( PipelineHandler *handler, uint16_t timeout ) = 0;
139 
140  //------------------------------------------------------------------------
144  //------------------------------------------------------------------------
145  void AddOperation( Operation<true> *op );
146 
147  //------------------------------------------------------------------------
149  //------------------------------------------------------------------------
150  std::unique_ptr<PipelineHandler> handler;
151 ;
152  //------------------------------------------------------------------------
154  //------------------------------------------------------------------------
155  bool valid;
156  };
157 
158  //----------------------------------------------------------------------------
160  //----------------------------------------------------------------------------
161  typedef std::function<Operation<true>*(const XRootDStatus&)> rcvry_func;
162 
163  //----------------------------------------------------------------------------
166  //----------------------------------------------------------------------------
168  {
169  template<bool> friend class Operation;
170 
171  public:
172 
173  //------------------------------------------------------------------------
177  //------------------------------------------------------------------------
178  PipelineHandler( ResponseHandler *handler );
179 
180  //------------------------------------------------------------------------
182  //------------------------------------------------------------------------
184  {
185  }
186 
187  //------------------------------------------------------------------------
189  //------------------------------------------------------------------------
190  void HandleResponseWithHosts( XRootDStatus *status, AnyObject *response,
191  HostList *hostList );
192 
193  //------------------------------------------------------------------------
195  //------------------------------------------------------------------------
196  void HandleResponse( XRootDStatus *status, AnyObject *response );
197 
198  //------------------------------------------------------------------------
200  //------------------------------------------------------------------------
202  {
203  }
204 
205  //------------------------------------------------------------------------
209  //------------------------------------------------------------------------
210  void AddOperation( Operation<true> *operation );
211 
212  //------------------------------------------------------------------------
219  //------------------------------------------------------------------------
220  void Assign( const Timeout &timeout,
221  std::promise<XRootDStatus> prms,
222  std::function<void(const XRootDStatus&)> final,
223  Operation<true> *opr );
224 
225  //------------------------------------------------------------------------
227  //------------------------------------------------------------------------
228  void Assign( std::function<void(const XRootDStatus&)> final );
229 
230  //------------------------------------------------------------------------
232  //------------------------------------------------------------------------
233  void PreparePipelineStart();
234 
235  private:
236 
237  //------------------------------------------------------------------------
239  //------------------------------------------------------------------------
240  void HandleResponseImpl( XRootDStatus *status, AnyObject *response,
241  HostList *hostList = nullptr );
242 
243  inline void dealloc( XRootDStatus *status, AnyObject *response,
244  HostList *hostList )
245  {
246  delete status;
247  delete response;
248  delete hostList;
249  }
250 
251  //------------------------------------------------------------------------
253  //------------------------------------------------------------------------
254  std::unique_ptr<ResponseHandler> responseHandler;
255 
256  //------------------------------------------------------------------------
258  //------------------------------------------------------------------------
259  std::unique_ptr<Operation<true>> currentOperation;
260 
261  //------------------------------------------------------------------------
263  //------------------------------------------------------------------------
264  std::unique_ptr<Operation<true>> nextOperation;
265 
266  //------------------------------------------------------------------------
268  //------------------------------------------------------------------------
269  Timeout timeout;
270 
271  //------------------------------------------------------------------------
273  //------------------------------------------------------------------------
274  std::promise<XRootDStatus> prms;
275 
276  //------------------------------------------------------------------------
279  //------------------------------------------------------------------------
280  std::function<void(const XRootDStatus&)> final;
281  };
282 
283  //----------------------------------------------------------------------------
289  //----------------------------------------------------------------------------
290  class Pipeline
291  {
292  template<bool> friend class ParallelOperation;
293  friend std::future<XRootDStatus> Async( Pipeline, uint16_t );
294  friend class PipelineHandler;
295 
296  public:
297 
298  //------------------------------------------------------------------------
300  //------------------------------------------------------------------------
302  {
303  }
304 
305  //------------------------------------------------------------------------
307  //------------------------------------------------------------------------
309  operation( op->Move() )
310  {
311  }
312 
313  //------------------------------------------------------------------------
315  //------------------------------------------------------------------------
317  operation( op.Move() )
318  {
319  }
320 
321  //------------------------------------------------------------------------
323  //------------------------------------------------------------------------
325  operation( op.Move() )
326  {
327  }
328 
330  operation( op->ToHandled() )
331  {
332  }
333 
334  //------------------------------------------------------------------------
336  //------------------------------------------------------------------------
338  operation( op.ToHandled() )
339  {
340  }
341 
342  //------------------------------------------------------------------------
344  //------------------------------------------------------------------------
346  operation( op.ToHandled() )
347  {
348  }
349 
350  Pipeline( Pipeline &&pipe ) :
351  operation( std::move( pipe.operation ) )
352  {
353  }
354 
355  //------------------------------------------------------------------------
357  //------------------------------------------------------------------------
359  {
360  operation = std::move( pipe.operation );
361  return *this;
362  }
363 
364  //------------------------------------------------------------------------
366  //------------------------------------------------------------------------
368  {
369  operation->AddOperation( op.Move() );
370  return *this;
371  }
372 
373  //------------------------------------------------------------------------
375  //------------------------------------------------------------------------
377  {
378  operation->AddOperation( op.ToHandled() );
379  return *this;
380  }
381 
382  //------------------------------------------------------------------------
386  //------------------------------------------------------------------------
387  operator Operation<true>&()
388  {
389  if( !bool( operation ) ) throw std::logic_error( "Invalid pipeline." );
390  return *operation.get();
391  }
392 
393  //------------------------------------------------------------------------
397  //------------------------------------------------------------------------
398  operator bool()
399  {
400  return bool( operation );
401  }
402 
403  //------------------------------------------------------------------------
407  //------------------------------------------------------------------------
408  static void Stop( const XRootDStatus &status = XrdCl::XRootDStatus() );
409 
410  //------------------------------------------------------------------------
412  //------------------------------------------------------------------------
413  static void Repeat();
414 
415  //------------------------------------------------------------------------
417  //------------------------------------------------------------------------
418  static void Replace( Operation<false> &&opr );
419 
420  //------------------------------------------------------------------------
422  //------------------------------------------------------------------------
423  static void Replace( Pipeline p );
424 
425  //------------------------------------------------------------------------
427  //------------------------------------------------------------------------
428  static void Ignore();
429 
430  private:
431 
432  //------------------------------------------------------------------------
437  //------------------------------------------------------------------------
438  Operation<true>* operator->()
439  {
440  return operation.get();
441  }
442 
443  //------------------------------------------------------------------------
448  //------------------------------------------------------------------------
449  void Run( Timeout timeout, std::function<void(const XRootDStatus&)> final = nullptr )
450  {
451  if( ftr.valid() )
452  throw std::logic_error( "Pipeline is already running!" );
453 
454  // a promise that the pipe will have a result
455  std::promise<XRootDStatus> prms;
456  ftr = prms.get_future();
457 
458  if( !operation ) std::logic_error( "Empty pipeline!" );
459 
460  Operation<true> *opr = operation.release();
461  PipelineHandler *h = opr->handler.get();
462  if( h )
463  h->PreparePipelineStart();
464 
465  opr->Run( timeout, std::move( prms ), std::move( final ) );
466  }
467 
468  //------------------------------------------------------------------------
470  //------------------------------------------------------------------------
471  std::unique_ptr<Operation<true>> operation;
472 
473  //------------------------------------------------------------------------
475  //------------------------------------------------------------------------
476  std::future<XRootDStatus> ftr;
477 
478  };
479 
480  //----------------------------------------------------------------------------
487  //----------------------------------------------------------------------------
488  inline std::future<XRootDStatus> Async( Pipeline pipeline, uint16_t timeout = 0 )
489  {
490  pipeline.Run( timeout );
491  return std::move( pipeline.ftr );
492  }
493 
494  //----------------------------------------------------------------------------
502  //----------------------------------------------------------------------------
503  inline XRootDStatus WaitFor( Pipeline pipeline, uint16_t timeout = 0 )
504  {
505  return Async( std::move( pipeline ), timeout ).get();
506  }
507 
508  //----------------------------------------------------------------------------
515  //----------------------------------------------------------------------------
516  template<template<bool> class Derived, bool HasHndl, typename HdlrFactory, typename ... Args>
517  class ConcreteOperation: public Operation<HasHndl>
518  {
519  template<template<bool> class, bool, typename, typename ...>
520  friend class ConcreteOperation;
521 
522  public:
523 
524  //------------------------------------------------------------------------
528  //------------------------------------------------------------------------
529  ConcreteOperation( Args&&... args ) : args( std::tuple<Args...>( std::move( args )... ) ),
530  timeout( 0 )
531  {
532  static_assert( !HasHndl, "It is only possible to construct operation without handler" );
533  }
534 
535  //------------------------------------------------------------------------
541  //------------------------------------------------------------------------
542  template<bool from>
544  Operation<HasHndl>( std::move( op ) ), args( std::move( op.args ) ), timeout( 0 )
545  {
546  }
547 
548  //------------------------------------------------------------------------
556  //------------------------------------------------------------------------
557  template<typename Hdlr>
558  Derived<true> operator>>( Hdlr &&hdlr )
559  {
560  return this->StreamImpl( HdlrFactory::Create( hdlr ) );
561  }
562 
563  //------------------------------------------------------------------------
569  //------------------------------------------------------------------------
570  Derived<true> operator|( Operation<true> &op )
571  {
572  return PipeImpl( *this, op );
573  }
574 
575  //------------------------------------------------------------------------
581  //------------------------------------------------------------------------
582  Derived<true> operator|( Operation<true> &&op )
583  {
584  return PipeImpl( *this, op );
585  }
586 
587  //------------------------------------------------------------------------
593  //------------------------------------------------------------------------
594  Derived<true> operator|( Operation<false> &op )
595  {
596  return PipeImpl( *this, op );
597  }
598 
599  //------------------------------------------------------------------------
605  //------------------------------------------------------------------------
606  Derived<true> operator|( Operation<false> &&op )
607  {
608  return PipeImpl( *this, op );
609  }
610 
611  //------------------------------------------------------------------------
613  //------------------------------------------------------------------------
614  Derived<true> operator|( FinalOperation &&fo )
615  {
616  AllocHandler( *this );
617  this->handler->Assign( fo.final );
618  return this->template Transform<true>();
619  }
620 
621  //------------------------------------------------------------------------
625  //------------------------------------------------------------------------
627  {
628  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
629  return new Derived<HasHndl>( std::move( *me ) );
630  }
631 
632  //------------------------------------------------------------------------
636  //------------------------------------------------------------------------
638  {
639  this->handler.reset( new PipelineHandler() );
640  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
641  return new Derived<true>( std::move( *me ) );
642  }
643 
644  //------------------------------------------------------------------------
646  //------------------------------------------------------------------------
647  Derived<HasHndl> Timeout( uint16_t timeout )
648  {
649  this->timeout = timeout;
650  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
651  return std::move( *me );
652  }
653 
654  protected:
655 
656  //------------------------------------------------------------------------
660  //------------------------------------------------------------------------
661  template<bool to>
662  inline Derived<to> Transform()
663  {
664  Derived<HasHndl> *me = static_cast<Derived<HasHndl>*>( this );
665  return Derived<to>( std::move( *me ) );
666  }
667 
668  //------------------------------------------------------------------------
674  //------------------------------------------------------------------------
675  inline Derived<true> StreamImpl( ResponseHandler *handler )
676  {
677  static_assert( !HasHndl, "Operator >> is available only for operation without handler" );
678  this->handler.reset( new PipelineHandler( handler ) );
679  return Transform<true>();
680  }
681 
682  //------------------------------------------------------------------------
683  // Allocate handler if necessary
684  //------------------------------------------------------------------------
685  inline static
687  {
688  // nothing to do
689  }
690 
691  //------------------------------------------------------------------------
692  // Allocate handler if necessary
693  //------------------------------------------------------------------------
694  inline static
696  {
697  me.handler.reset( new PipelineHandler() );
698  }
699 
700  //------------------------------------------------------------------------
707  //------------------------------------------------------------------------
708  inline static
709  Derived<true> PipeImpl( ConcreteOperation<Derived, HasHndl, HdlrFactory,
710  Args...> &me, Operation<true> &op )
711  {
712  AllocHandler( me ); // if HasHndl is false allocate handler
713  me.AddOperation( op.Move() );
714  return me.template Transform<true>();
715  }
716 
717  //------------------------------------------------------------------------
724  //------------------------------------------------------------------------
725  inline static
726  Derived<true> PipeImpl( ConcreteOperation<Derived, HasHndl, HdlrFactory,
727  Args...> &me, Operation<false> &op )
728  {
729  AllocHandler( me ); // if HasHndl is false allocate handler
730  me.AddOperation( op.ToHandled() );
731  return me.template Transform<true>();
732  }
733 
734  //------------------------------------------------------------------------
736  //------------------------------------------------------------------------
737  std::tuple<Args...> args;
738 
739  //------------------------------------------------------------------------
741  //------------------------------------------------------------------------
742  uint16_t timeout;
743  };
744 
745  // Out-of-line methods for class Operation
746 
747  template <bool HasHndl>
748  void Operation<HasHndl>::Run(Timeout timeout, std::promise<XRootDStatus> prms,
749  std::function<void(const XRootDStatus &)> f)
750  {
751  static_assert(HasHndl, "Only an operation that has a handler can be assigned to workflow");
752 
753  XRootDStatus st;
754  handler->Assign(timeout, std::move(prms), std::move(f), this);
755  PipelineHandler *h = handler.release();
756 
757  try {
758  st = RunImpl(h, timeout);
759  } catch (const operation_expired &ex) {
761  } catch (const PipelineException &ex) { // probably not needed
762  st = ex.GetError();
763  } catch (const std::exception &ex) {
764  st = XRootDStatus(stError, errInternal, 0, ex.what());
765  }
766 
767  if (!st.IsOK()) {
768  ResponseJob *job = new ResponseJob(h, new XRootDStatus(st), 0, nullptr);
770  }
771  }
772 
773  template <bool HasHndl>
775  {
776  if (handler)
777  handler->AddOperation(op);
778  }
779 }
780 
781 #endif // __XRD_CL_OPERATIONS_HH__
bool Create
static void AllocHandler(ConcreteOperation< Derived, true, HdlrFactory, Args... > &me)
static void AllocHandler(ConcreteOperation< Derived, false, HdlrFactory, Args... > &me)
std::tuple< Args... > args
Operation arguments.
ConcreteOperation(ConcreteOperation< Derived, from, HdlrFactory, Args... > &&op)
uint16_t timeout
Operation timeout.
ConcreteOperation(Args &&... args)
Operation< HasHndl > * Move()
static Derived< true > PipeImpl(ConcreteOperation< Derived, HasHndl, HdlrFactory, Args... > &me, Operation< false > &op)
Operation< true > * ToHandled()
Derived< true > operator|(FinalOperation &&fo)
Adds a final operation to the pipeline.
Derived< HasHndl > Timeout(uint16_t timeout)
Set operation timeout.
Derived< true > operator|(Operation< true > &&op)
Derived< true > StreamImpl(ResponseHandler *handler)
Derived< true > operator|(Operation< false > &op)
Derived< true > operator|(Operation< true > &op)
static Derived< true > PipeImpl(ConcreteOperation< Derived, HasHndl, HdlrFactory, Args... > &me, Operation< true > &op)
Derived< true > operator>>(Hdlr &&hdlr)
Derived< true > operator|(Operation< false > &&op)
static PostMaster * GetPostMaster()
Get default post master.
void QueueJob(Job *job, void *arg=0)
Add a job to be run.
virtual ~Operation()
Destructor.
friend class PipelineHandler
Operation()
Constructor.
void AddOperation(Operation< true > *op)
void Run(Timeout timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final)
bool valid
Flag indicating if it is a valid object.
friend std::future< XRootDStatus > Async(Pipeline, uint16_t)
virtual std::string ToString()=0
Name of the operation.
virtual XRootDStatus RunImpl(PipelineHandler *handler, uint16_t timeout)=0
std::unique_ptr< PipelineHandler > handler
Operation handler.
virtual Operation< true > * ToHandled()=0
virtual Operation< HasHndl > * Move()=0
Operation(Operation< from > &&op)
Move constructor between template instances.
Pipeline exception, wrapps an XRootDStatus.
void HandleResponseWithHosts(XRootDStatus *status, AnyObject *response, HostList *hostList)
Callback function.
PipelineHandler()
Default Constructor.
void PreparePipelineStart()
Called by a pipeline on the handler of its first operation before Run.
void Assign(const Timeout &timeout, std::promise< XRootDStatus > prms, std::function< void(const XRootDStatus &)> final, Operation< true > *opr)
void HandleResponse(XRootDStatus *status, AnyObject *response)
Callback function.
void AddOperation(Operation< true > *operation)
Pipeline(Operation< true > *op)
Constructor.
Pipeline(Operation< true > &&op)
Constructor.
static void Repeat()
Repeat current operation.
Pipeline(Operation< true > &op)
Constructor.
friend class PipelineHandler
Pipeline & operator=(Pipeline &&pipe)
Constructor.
Pipeline & operator|=(Operation< false > &&op)
Extend pipeline.
Pipeline(Pipeline &&pipe)
friend std::future< XRootDStatus > Async(Pipeline, uint16_t)
Pipeline(Operation< false > *op)
static void Stop(const XRootDStatus &status=XrdCl::XRootDStatus())
Pipeline(Operation< false > &&op)
Constructor.
Pipeline & operator|=(Operation< true > &&op)
Extend pipeline.
static void Replace(Operation< false > &&opr)
Replace current operation.
static void Ignore()
Ignore error and proceed with the pipeline.
Pipeline(Operation< false > &op)
Constructor.
Pipeline()
Default constructor.
JobManager * GetJobManager()
Get the job manager object user by the post master.
Handle an async response.
Call the user callback.
const uint16_t errOperationExpired
Definition: XrdClStatus.hh:90
const uint16_t stError
An error occurred that could potentially be retried.
Definition: XrdClStatus.hh:32
std::future< XRootDStatus > Async(Pipeline pipeline, uint16_t timeout=0)
std::function< Operation< true > *(const XRootDStatus &)> rcvry_func
Type of the recovery function to be provided by the user.
XRootDStatus WaitFor(Pipeline pipeline, uint16_t timeout=0)
std::vector< HostInfo > HostList
const uint16_t errInternal
Internal error.
Definition: XrdClStatus.hh:56
ssize_t Move(KernelBuffer &kbuff, char *&ubuff)
bool IsOK() const
We're fine.
Definition: XrdClStatus.hh:124