XRootD
XrdLinkXeq.cc
Go to the documentation of this file.
1 /******************************************************************************/
2 /* */
3 /* X r d L i n k X e q . c c */
4 /* */
5 /* (c) 2018 by the Board of Trustees of the Leland Stanford, Jr., University */
6 /* Produced by Andrew Hanushevsky for Stanford University under contract */
7 /* DE-AC02-76-SFO0515 with the Department of Energy */
8 /* */
9 /* This file is part of the XRootD software suite. */
10 /* */
11 /* XRootD is free software: you can redistribute it and/or modify it under */
12 /* the terms of the GNU Lesser General Public License as published by the */
13 /* Free Software Foundation, either version 3 of the License, or (at your */
14 /* option) any later version. */
15 /* */
16 /* XRootD is distributed in the hope that it will be useful, but WITHOUT */
17 /* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or */
18 /* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public */
19 /* License for more details. */
20 /* */
21 /* You should have received a copy of the GNU Lesser General Public License */
22 /* along with XRootD in a file called COPYING.LESSER (LGPL license) and file */
23 /* COPYING (GPL license). If not, see <http://www.gnu.org/licenses/>. */
24 /* */
25 /* The copyright holder's institutional names and contributor's names may not */
26 /* be used to endorse or promote products derived from this software without */
27 /* specific prior written permission of the institution or contributor. */
28 /******************************************************************************/
29 
30 #include <limits.h>
31 #include <poll.h>
32 #include <signal.h>
33 #include <cstdio>
34 #include <cstring>
35 #include <unistd.h>
36 #include <sys/types.h>
37 #include <sys/uio.h>
38 
39 #if defined(__linux__) || defined(__GNU__)
40 #include <netinet/tcp.h>
41 #if !defined(TCP_CORK)
42 #undef HAVE_SENDFILE
43 #endif
44 #endif
45 
46 #ifdef HAVE_SENDFILE
47 
48 #if defined(__solaris__) || defined(__linux__) || defined(__GNU__)
49 #include <sys/sendfile.h>
50 #endif
51 
52 #endif
53 
54 #include "XrdSys/XrdSysAtomics.hh"
55 #include "XrdSys/XrdSysError.hh"
56 #include "XrdSys/XrdSysFD.hh"
57 #include "XrdSys/XrdSysPlatform.hh"
58 
59 #include "Xrd/XrdBuffer.hh"
60 #include "Xrd/XrdLink.hh"
61 #include "Xrd/XrdLinkCtl.hh"
62 #include "Xrd/XrdLinkXeq.hh"
63 #include "Xrd/XrdPoll.hh"
64 #include "Xrd/XrdScheduler.hh"
65 #include "Xrd/XrdSendQ.hh"
66 #include "Xrd/XrdTcpMonPin.hh"
67 
68 #define TRACE_IDENT ID
69 #include "Xrd/XrdTrace.hh"
70 
71 /******************************************************************************/
72 /* G l o b a l s */
73 /******************************************************************************/
74 
75 namespace XrdGlobal
76 {
77 extern XrdSysError Log;
78 extern XrdScheduler Sched;
79 extern XrdTlsContext *tlsCtx;
81 extern int devNull;
82  const int maxIOV = XrdSys::getIovMax();
83 };
84 
85 using namespace XrdGlobal;
86 
87 /******************************************************************************/
88 /* S t a t i c s */
89 /******************************************************************************/
90 
91  const char *XrdLinkXeq::TraceID = "LinkXeq";
92 
93  long long XrdLinkXeq::LinkBytesIn = 0;
94  long long XrdLinkXeq::LinkBytesOut = 0;
95  long long XrdLinkXeq::LinkConTime = 0;
96  long long XrdLinkXeq::LinkCountTot = 0;
97  int XrdLinkXeq::LinkCount = 0;
100  int XrdLinkXeq::LinkStalls = 0;
101  int XrdLinkXeq::LinkSfIntr = 0;
103 
104 /******************************************************************************/
105 /* C o n s t r u c t o r */
106 /******************************************************************************/
107 
108 XrdLinkXeq::XrdLinkXeq() : XrdLink(*this), PollInfo((XrdLink &)*this)
109 {
111 }
112 
114 {
115  memcpy(Uname+sizeof(Uname)-7, "anon.0@", 7);
116  strcpy(Lname, "somewhere");
117  ID = &Uname[sizeof(Uname)-5];
118  Comment = ID;
119  sendQ = 0;
120  stallCnt = stallCntTot = 0;
121  tardyCnt = tardyCntTot = 0;
122  SfIntr = 0;
123  isIdle = 0;
125  LockReads= false;
126  KeepFD = false;
127  Protocol = 0;
128  ProtoAlt = 0;
129 
130  LinkInfo.Reset();
131  PollInfo.Zorch();
132  ResetLink();
133 }
134 
135 /******************************************************************************/
136 /* B a c k l o g */
137 /******************************************************************************/
138 
140 {
142 
143 // Return backlog information
144 //
145  return (sendQ ? sendQ->Backlog() : 0);
146 }
147 
148 /******************************************************************************/
149 /* C l i e n t */
150 /******************************************************************************/
151 
152 int XrdLinkXeq::Client(char *nbuf, int nbsz)
153 {
154  int ulen;
155 
156 // Generate full client name
157 //
158  if (nbsz <= 0) return 0;
159  ulen = (Lname - ID);
160  if ((ulen + HNlen) >= nbsz) ulen = 0;
161  else {strncpy(nbuf, ID, ulen);
162  strcpy(nbuf+ulen, HostName);
163  ulen += HNlen;
164  }
165  return ulen;
166 }
167 
168 /******************************************************************************/
169 /* C l o s e */
170 /******************************************************************************/
171 
172 int XrdLinkXeq::Close(bool defer)
174  int csec, fd, rc = 0;
175 
176 // If a defer close is requested, we can close the descriptor but we must
177 // keep the slot number to prevent a new client getting the same fd number.
178 // Linux is peculiar in that any in-progress operations will remain in that
179 // state even after the FD is closed unless there is some activity either on
180 // the connection or an event occurs that causes an operation restart. We
181 // portably solve this problem by issuing a shutdown() on the socket prior
182 // to closing it. On most platforms, this informs readers that the connection is
183 // gone (though not on old (i.e. <= 2.3) versions of Linux, sigh). Also, if
184 // nonblocking mode is enabled, we need to do this in a separate thread as
185 // a shutdown may block for a pretty long time if lots of messages are queued.
186 // We will ask the SendQ object to schedule the shutdown for us before it
187 // commits suicide.
188 // Note that we can hold the opMutex while we also get the wrMutex.
189 //
190  if (defer)
191  {if (!sendQ) Shutdown(false);
192  else {TRACEI(DEBUG, "Shutdown FD " <<LinkInfo.FD<<" only via SendQ");
193  LinkInfo.InUse++;
194  LinkInfo.FD = -LinkInfo.FD; // Leave poll version untouched!
195  wrMutex.Lock();
196  sendQ->Terminate(this);
197  sendQ = 0;
198  wrMutex.UnLock();
199  }
200  return 0;
201  }
202 
203 // If we got here then this is not a deferred close so we just need to check
204 // if there is a sendq appendage we need to get rid of.
205 //
206  if (sendQ)
207  {wrMutex.Lock();
208  sendQ->Terminate();
209  sendQ = 0;
210  wrMutex.UnLock();
211  }
212 
213 // Multiple protocols may be bound to this link. If it is in use, defer the
214 // actual close until the use count drops to one.
215 //
216  while(LinkInfo.InUse > 1)
217  {opHelper.UnLock();
218  TRACEI(DEBUG, "Close FD "<<LinkInfo.FD <<" deferred, use count="
219  <<LinkInfo.InUse);
220  Serialize();
221  opHelper.Lock(&LinkInfo.opMutex);
222  }
223  LinkInfo.InUse--;
224  Instance = 0;
225 
226 // Add up the statistic for this link
227 //
228  syncStats(&csec);
229 
230 // Cleanup TLS if it is active
231 //
232  if (isTLS) tlsIO.Shutdown();
233 
234 // Clean this link up
235 //
236  if (Protocol) {Protocol->Recycle(this, csec, LinkInfo.Etext); Protocol = 0;}
237  if (ProtoAlt) {ProtoAlt->Recycle(this, csec, LinkInfo.Etext); ProtoAlt = 0;}
238  if (LinkInfo.Etext) {free(LinkInfo.Etext); LinkInfo.Etext = 0;}
239  LinkInfo.InUse = 0;
240 
241 // At this point we can have no lock conflicts, so if someone is waiting for
242 // us to terminate let them know about it. Note that we will get the condvar
243 // mutex while we hold the opMutex. This is the required order! We will also
244 // zero out the pointer to the condvar while holding the opmutex.
245 //
246  if (LinkInfo.KillcvP)
247  {LinkInfo.KillcvP->Lock();
250  LinkInfo.KillcvP = 0;
251  }
252 
253 // Remove ourselves from the poll table and then from the Link table. We may
254 // not hold on to the opMutex when we acquire the LTMutex. However, the link
255 // table needs to be cleaned up prior to actually closing the socket. So, we
256 // do some fancy footwork to prevent multiple closes of this link.
257 //
258  fd = abs(LinkInfo.FD);
259  if (PollInfo.FD > 0)
261  PollInfo.FD = -1;
262  opHelper.UnLock();
263  XrdLinkCtl::Unhook(fd);
264  } else opHelper.UnLock();
265 
266 // Invoke the TCP monitor if it was loaded.
267 //
268  if (TcpMonPin && fd > 2)
269  {XrdTcpMonPin::LinkInfo lnkInfo;
270  lnkInfo.tident = ID;
271  lnkInfo.fd = fd;
272  lnkInfo.consec = csec;
273  lnkInfo.bytesIn = BytesInTot;
274  lnkInfo.bytesOut = BytesOutTot;
275  TcpMonPin->Monitor(Addr, lnkInfo, sizeof(lnkInfo));
276  }
277 
278 // Close the file descriptor if it isn't being shared. Do it as the last
279 // thing because closes and accepts are not interlocked.
280 //
281  if (fd >= 2) {if (KeepFD) rc = 0;
282  else rc = (close(fd) < 0 ? errno : 0);
283  }
284  if (rc) Log.Emsg("Link", rc, "close", ID);
285  return rc;
286 }
287 
288 /******************************************************************************/
289 /* D o I t */
290 /******************************************************************************/
291 
293 {
294  int rc;
295 
296 // The Process() return code tells us what to do:
297 // < 0 -> Stop getting requests,
298 // -EINPROGRESS leave link disabled but otherwise all is well
299 // -n Error, disable and close the link
300 // = 0 -> OK, get next request, if allowed, o/w enable the link
301 // > 0 -> Slow link, stop getting requests and enable the link
302 //
303  if (Protocol)
304  do {rc = Protocol->Process(this);} while (!rc && Sched.canStick());
305  else {Log.Emsg("Link", "Dispatch on closed link", ID);
306  return;
307  }
308 
309 // Either re-enable the link and cycle back waiting for a new request, leave
310 // disabled, or terminate the connection.
311 //
312  if (rc >= 0)
314  else if (rc != -EINPROGRESS) Close();
315 }
316 
317 /******************************************************************************/
318 /* g e t P e e r C e r t s */
319 /******************************************************************************/
320 
322 {
323  return (isTLS ? tlsIO.getCerts(true) : 0);
324 }
325 
326 /******************************************************************************/
327 /* P e e k */
328 /******************************************************************************/
329 
330 int XrdLinkXeq::Peek(char *Buff, int Blen, int timeout)
331 {
332  XrdSysMutexHelper theMutex;
333  struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
334  ssize_t mlen;
335  int retc;
336 
337 // Lock the read mutex if we need to, the helper will unlock it upon exit
338 //
339  if (LockReads) theMutex.Lock(&rdMutex);
340 
341 // Wait until we can actually read something
342 //
343  isIdle = 0;
344  do {retc = poll(&polltab, 1, timeout);} while(retc < 0 && errno == EINTR);
345  if (retc != 1)
346  {if (retc == 0) return 0;
347  return Log.Emsg("Link", -errno, "poll", ID);
348  }
349 
350 // Verify it is safe to read now
351 //
352  if (!(polltab.revents & (POLLIN|POLLRDNORM)))
353  {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents), "polling", ID);
354  return -1;
355  }
356 
357 // Do the peek.
358 //
359  do {mlen = recv(LinkInfo.FD, Buff, Blen, MSG_PEEK);}
360  while(mlen < 0 && errno == EINTR);
361 
362 // Return the result
363 //
364  if (mlen >= 0) return int(mlen);
365  Log.Emsg("Link", errno, "peek on", ID);
366  return -1;
367 }
368 
369 /******************************************************************************/
370 /* R e c v */
371 /******************************************************************************/
372 
373 int XrdLinkXeq::Recv(char *Buff, int Blen)
374 {
375  ssize_t rlen;
376 
377 // Note that we will read only as much as is queued. Use Recv() with a
378 // timeout to receive as much data as possible.
379 //
380  if (LockReads) rdMutex.Lock();
381  isIdle = 0;
382  do {rlen = read(LinkInfo.FD, Buff, Blen);} while(rlen < 0 && errno == EINTR);
383  if (rlen > 0) AtomicAdd(BytesIn, rlen);
384  if (LockReads) rdMutex.UnLock();
385 
386  if (rlen >= 0) return int(rlen);
387  if (LinkInfo.FD >= 0) Log.Emsg("Link", errno, "receive from", ID);
388  return -1;
389 }
390 
391 /******************************************************************************/
392 
393 int XrdLinkXeq::Recv(char *Buff, int Blen, int timeout)
394 {
395  XrdSysMutexHelper theMutex;
396  struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
397  ssize_t rlen, totlen = 0;
398  int retc;
399 
400 // Lock the read mutex if we need to, the helper will unlock it upon exit
401 //
402  if (LockReads) theMutex.Lock(&rdMutex);
403 
404 // Wait up to timeout milliseconds for data to arrive
405 //
406  isIdle = 0;
407  while(Blen > 0)
408  {do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
409  if (retc != 1)
410  {if (retc == 0)
411  {tardyCnt++;
412  if (totlen)
413  {if ((++stallCnt & 0xff) == 1) TRACEI(DEBUG,"read timed out");
414  AtomicAdd(BytesIn, totlen);
415  }
416  return int(totlen);
417  }
418  return (LinkInfo.FD >= 0 ? Log.Emsg("Link",-errno,"poll",ID) : -1);
419  }
420 
421  // Verify it is safe to read now
422  //
423  if (!(polltab.revents & (POLLIN|POLLRDNORM)))
424  {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents),
425  "polling", ID);
426  return -1;
427  }
428 
429  // Read as much data as you can. Note that we will force an error
430  // if we get a zero-length read after poll said it was OK.
431  //
432  do {rlen = recv(LinkInfo.FD, Buff, Blen, 0);}
433  while(rlen < 0 && errno == EINTR);
434  if (rlen <= 0)
435  {if (!rlen) return -ENOMSG;
436  if (LinkInfo.FD > 0) Log.Emsg("Link", -errno, "receive from", ID);
437  return -1;
438  }
439  totlen += rlen; Blen -= rlen; Buff += rlen;
440  }
441 
442  AtomicAdd(BytesIn, totlen);
443  return int(totlen);
444 }
445 
446 /******************************************************************************/
447 
448 int XrdLinkXeq::Recv(const struct iovec *iov, int iocnt, int timeout)
449 {
450  XrdSysMutexHelper theMutex;
451  struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
452  int retc, rlen;
453 
454 // Lock the read mutex if we need to, the helper will unlock it upon exit
455 //
456  if (LockReads) theMutex.Lock(&rdMutex);
457 
458 // Wait up to timeout milliseconds for data to arrive
459 //
460  isIdle = 0;
461  do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
462  if (retc != 1)
463  {if (retc == 0)
464  {tardyCnt++;
465  return 0;
466  }
467  return (LinkInfo.FD >= 0 ? Log.Emsg("Link",-errno,"poll",ID) : -1);
468  }
469 
470 // Verify it is safe to read now
471 //
472  if (!(polltab.revents & (POLLIN|POLLRDNORM)))
473  {Log.Emsg("Link", XrdPoll::Poll2Text(polltab.revents), "polling", ID);
474  return -1;
475  }
476 
477 // If the iocnt is within limits then just go ahead and read once.
478 //
479  if (iocnt <= maxIOV)
480  {rlen = RecvIOV(iov, iocnt);
481  if (rlen > 0) {AtomicAdd(BytesIn, rlen);}
482  return rlen;
483  }
484 
485 // We will have to break this up into allowable segments and we need to add up
486 // the bytes in each segment so that we know when to stop reading.
487 //
488  int seglen, segcnt = maxIOV, totlen = 0;
489  do {seglen = 0;
490  for (int i = 0; i < segcnt; i++) seglen += iov[i].iov_len;
491  if ((rlen = RecvIOV(iov, segcnt)) < 0) return rlen;
492  totlen += rlen;
493  if (rlen < seglen) break;
494  iov += segcnt;
495  iocnt -= segcnt;
496  if (iocnt <= maxIOV) segcnt = iocnt;
497  } while(iocnt > 0);
498 
499 // All done
500 //
501  AtomicAdd(BytesIn, totlen);
502  return totlen;
503 }
504 
505 /******************************************************************************/
506 /* R e c v A l l */
507 /******************************************************************************/
508 
509 int XrdLinkXeq::RecvAll(char *Buff, int Blen, int timeout)
510 {
511  struct pollfd polltab = {PollInfo.FD, POLLIN|POLLRDNORM, 0};
512  ssize_t rlen;
513  int retc;
514 
515 // Check if timeout specified. Notice that the timeout is the max we will
516 // for some data. We will wait forever for all the data. Yeah, it's weird.
517 //
518  if (timeout >= 0)
519  {do {retc = poll(&polltab,1,timeout);} while(retc < 0 && errno == EINTR);
520  if (retc != 1)
521  {if (!retc) return -ETIMEDOUT;
522  Log.Emsg("Link",errno,"poll",ID);
523  return -1;
524  }
525  if (!(polltab.revents & (POLLIN|POLLRDNORM)))
526  {Log.Emsg("Link",XrdPoll::Poll2Text(polltab.revents),"polling",ID);
527  return -1;
528  }
529  }
530 
531 // Note that we will block until we receive all he bytes.
532 //
533  if (LockReads) rdMutex.Lock();
534  isIdle = 0;
535  do {rlen = recv(LinkInfo.FD, Buff, Blen, MSG_WAITALL);}
536  while(rlen < 0 && errno == EINTR);
537  if (rlen > 0) AtomicAdd(BytesIn, rlen);
538  if (LockReads) rdMutex.UnLock();
539 
540  if (int(rlen) == Blen) return Blen;
541  if (!rlen) {TRACEI(DEBUG, "No RecvAll() data; errno=" <<errno);}
542  else if (rlen > 0) Log.Emsg("RecvAll", "Premature end from", ID);
543  else if (LinkInfo.FD >= 0) Log.Emsg("Link", errno, "receive from", ID);
544  return -1;
545 }
546 
547 /******************************************************************************/
548 /* Protected: R e c v I O V */
549 /******************************************************************************/
550 
551 int XrdLinkXeq::RecvIOV(const struct iovec *iov, int iocnt)
552 {
553  ssize_t retc = 0;
554 
555 // Read the data in. On some version of Unix (e.g., Linux) a readv() may
556 // end at any time without reading all the bytes when directed to a socket.
557 // We always return the number bytes read (or an error). The caller needs to
558 // restart the read at the appropriate place in the iovec when more data arrives.
559 //
560  do {retc = readv(LinkInfo.FD, iov, iocnt);}
561  while(retc < 0 && errno == EINTR);
562 
563 // Check how we completed
564 //
565  if (retc < 0) Log.Emsg("Link", errno, "receive from", ID);
566  return retc;
567 }
568 
569 /******************************************************************************/
570 /* R e g i s t e r */
571 /******************************************************************************/
572 
573 bool XrdLinkXeq::Register(const char *hName)
574 {
575 
576 // First see if we can register this name with the address object
577 //
578  if (!Addr.Register(hName)) return false;
579 
580 // Make appropriate changes here
581 //
582  if (HostName) free(HostName);
583  HostName = strdup(hName);
584  strlcpy(Lname, hName, sizeof(Lname));
585  return true;
586 }
587 
588 /******************************************************************************/
589 /* S e n d */
590 /******************************************************************************/
591 
592 int XrdLinkXeq::Send(const char *Buff, int Blen)
593 {
594  ssize_t retc = 0, bytesleft = Blen;
595 
596 // Get a lock
597 //
598  wrMutex.Lock();
599  isIdle = 0;
600  AtomicAdd(BytesOut, Blen);
601 
602 // Do non-blocking writes if we are setup to do so.
603 //
604  if (sendQ)
605  {retc = sendQ->Send(Buff, Blen);
606  wrMutex.UnLock();
607  return retc;
608  }
609 
610 // Write the data out
611 //
612  while(bytesleft)
613  {if ((retc = write(LinkInfo.FD, Buff, bytesleft)) < 0)
614  {if (errno == EINTR) continue;
615  else break;
616  }
617  bytesleft -= retc; Buff += retc;
618  }
619 
620 // All done
621 //
622  wrMutex.UnLock();
623  if (retc >= 0) return Blen;
624  Log.Emsg("Link", errno, "send to", ID);
625  return -1;
626 }
627 
628 /******************************************************************************/
629 
630 int XrdLinkXeq::Send(const struct iovec *iov, int iocnt, int bytes)
631 {
632  int retc;
633 
634 // Get a lock and assume we will be successful (statistically we are)
635 //
636  wrMutex.Lock();
637  isIdle = 0;
638  AtomicAdd(BytesOut, bytes);
639 
640 // Do non-blocking writes if we are setup to do so.
641 //
642  if (sendQ)
643  {retc = sendQ->Send(iov, iocnt, bytes);
644  wrMutex.UnLock();
645  return retc;
646  }
647 
648 // If the iocnt is within limits then just go ahead and write this out
649 //
650  if (iocnt <= maxIOV)
651  {retc = SendIOV(iov, iocnt, bytes);
652  wrMutex.UnLock();
653  return retc;
654  }
655 
656 // We will have to break this up into allowable segments
657 //
658  int seglen, segcnt = maxIOV, iolen = 0;
659  do {seglen = 0;
660  for (int i = 0; i < segcnt; i++) seglen += iov[i].iov_len;
661  if ((retc = SendIOV(iov, segcnt, seglen)) < 0)
662  {wrMutex.UnLock();
663  return retc;
664  }
665  iolen += retc;
666  iov += segcnt;
667  iocnt -= segcnt;
668  if (iocnt <= maxIOV) segcnt = iocnt;
669  } while(iocnt > 0);
670 
671 // All done
672 //
673  wrMutex.UnLock();
674  return iolen;
675 }
676 
677 /******************************************************************************/
678 
679 int XrdLinkXeq::Send(const sfVec *sfP, int sfN)
680 {
681 #if !defined(HAVE_SENDFILE)
682 
683  return -1;
684 
685 #elif defined(__solaris__)
686 
687  sendfilevec_t vecSF[XrdOucSFVec::sfMax], *vecSFP = vecSF;
688  size_t xframt, totamt, bytes = 0;
689  ssize_t retc;
690  int i = 0;
691 
692 // Construct the sendfilev() vector
693 //
694  for (i = 0; i < sfN; sfP++, i++)
695  {if (sfP->fdnum < 0)
696  {vecSF[i].sfv_fd = SFV_FD_SELF;
697  vecSF[i].sfv_off = (off_t)sfP->buffer;
698  } else {
699  vecSF[i].sfv_fd = sfP->fdnum;
700  vecSF[i].sfv_off = sfP->offset;
701  }
702  vecSF[i].sfv_flag = 0;
703  vecSF[i].sfv_len = sfP->sendsz;
704  bytes += sfP->sendsz;
705  }
706  totamt = bytes;
707 
708 // Lock the link, issue sendfilev(), and unlock the link. The documentation
709 // is very spotty and inconsistent. We can only retry this operation under
710 // very limited conditions.
711 //
712  wrMutex.Lock();
713  isIdle = 0;
714 do{retc = sendfilev(LinkInfo.FD, vecSFP, sfN, &xframt);
715 
716 // Check if all went well and return if so (usual case)
717 //
718  if (xframt == bytes)
719  {AtomicAdd(BytesOut, bytes);
720  wrMutex.UnLock();
721  return totamt;
722  }
723 
724 // The only one we will recover from is EINTR. We cannot legally get EAGAIN.
725 //
726  if (retc < 0 && errno != EINTR) break;
727 
728 // Try to resume the transfer
729 //
730  if (xframt > 0)
731  {AtomicAdd(BytesOut, xframt); bytes -= xframt; SfIntr++;
732  while(xframt > 0 && sfN)
733  {if ((ssize_t)xframt < (ssize_t)vecSFP->sfv_len)
734  {vecSFP->sfv_off += xframt; vecSFP->sfv_len -= xframt; break;}
735  xframt -= vecSFP->sfv_len; vecSFP++; sfN--;
736  }
737  }
738  } while(sfN > 0);
739 
740 // See if we can recover without destroying the connection
741 //
742  retc = (retc < 0 ? errno : ECANCELED);
743  wrMutex.UnLock();
744  Log.Emsg("Link", retc, "send file to", ID);
745  return -1;
746 
747 #elif defined(__linux__) || defined(__GNU__)
748 
749  static const int setON = 1, setOFF = 0;
750  ssize_t retc = 0, bytesleft;
751  off_t myOffset;
752  int i, xfrbytes = 0, uncork = 1, xIntr = 0;
753 
754 // lock the link
755 //
756  wrMutex.Lock();
757  isIdle = 0;
758 
759 // In linux we need to cork the socket. On permanent errors we do not uncork
760 // the socket because it will be closed in short order.
761 //
762  if (setsockopt(PollInfo.FD, SOL_TCP, TCP_CORK, &setON, sizeof(setON)) < 0)
763  {Log.Emsg("Link", errno, "cork socket for", ID);
764  uncork = 0; sfOK = 0;
765  }
766 
767 // Send the header first
768 //
769  for (i = 0; i < sfN; sfP++, i++)
770  {if (sfP->fdnum < 0) retc = sendData(sfP->buffer, sfP->sendsz);
771  else {myOffset = sfP->offset; bytesleft = sfP->sendsz;
772  while(bytesleft
773  && (retc=sendfile(LinkInfo.FD,sfP->fdnum,&myOffset,bytesleft)) > 0)
774  {bytesleft -= retc; xIntr++;}
775  }
776  if (retc < 0 && errno == EINTR) continue;
777  if (retc <= 0) break;
778  xfrbytes += sfP->sendsz;
779  }
780 
781 // Diagnose any sendfile errors
782 //
783  if (retc <= 0)
784  {if (retc == 0) errno = ECANCELED;
785  wrMutex.UnLock();
786  Log.Emsg("Link", errno, "send file to", ID);
787  return -1;
788  }
789 
790 // Now uncork the socket
791 //
792  if (uncork
793  && setsockopt(PollInfo.FD, SOL_TCP, TCP_CORK, &setOFF, sizeof(setOFF)) < 0)
794  Log.Emsg("Link", errno, "uncork socket for", ID);
795 
796 // All done
797 //
798  if (xIntr > sfN) SfIntr += (xIntr - sfN);
799  AtomicAdd(BytesOut, xfrbytes);
800  wrMutex.UnLock();
801  return xfrbytes;
802 
803 #else
804 
805  return -1;
806 
807 #endif
808 }
809 
810 /******************************************************************************/
811 /* Protected: s e n d D a t a */
812 /******************************************************************************/
813 
814 int XrdLinkXeq::sendData(const char *Buff, int Blen)
815 {
816  ssize_t retc = 0, bytesleft = Blen;
817 
818 // Write the data out
819 //
820  while(bytesleft)
821  {if ((retc = write(LinkInfo.FD, Buff, bytesleft)) < 0)
822  {if (errno == EINTR) continue;
823  else break;
824  }
825  bytesleft -= retc; Buff += retc;
826  }
827 
828 // All done
829 //
830  return retc;
831 }
832 
833 /******************************************************************************/
834 /* Protected: S e n d I O V */
835 /******************************************************************************/
836 
837 int XrdLinkXeq::SendIOV(const struct iovec *iov, int iocnt, int bytes)
838 {
839  ssize_t bytesleft, n, retc = 0;
840  const char *Buff;
841 
842 // Write the data out. On some version of Unix (e.g., Linux) a writev() may
843 // end at any time without writing all the bytes when directed to a socket.
844 // So, we attempt to resume the writev() using a combination of write() and
845 // a writev() continuation. This approach slowly converts a writev() to a
846 // series of writes if need be. We must do this inline because we must hold
847 // the lock until all the bytes are written or an error occurs.
848 //
849  bytesleft = static_cast<ssize_t>(bytes);
850  while(bytesleft)
851  {do {retc = writev(LinkInfo.FD, iov, iocnt);}
852  while(retc < 0 && errno == EINTR);
853  if (retc >= bytesleft || retc < 0) break;
854  bytesleft -= retc;
855  while(retc >= (n = static_cast<ssize_t>(iov->iov_len)))
856  {retc -= n; iov++; iocnt--;}
857  Buff = (const char *)iov->iov_base + retc; n -= retc; iov++; iocnt--;
858  while(n) {if ((retc = write(LinkInfo.FD, Buff, n)) < 0)
859  {if (errno == EINTR) continue;
860  else break;
861  }
862  n -= retc; Buff += retc; bytesleft -= retc;
863  }
864  if (retc < 0 || iocnt < 1) break;
865  }
866 
867 // All done
868 //
869  if (retc >= 0) return bytes;
870  Log.Emsg("Link", errno, "send to", ID);
871  return -1;
872 }
873 
874 /******************************************************************************/
875 /* s e t I D */
876 /******************************************************************************/
877 
878 void XrdLinkXeq::setID(const char *userid, int procid)
879 {
880  char buff[sizeof(Uname)], *bp, *sp;
881  int ulen;
882 
883  snprintf(buff, sizeof(buff), "%s.%d:%d", userid, procid, PollInfo.FD);
884  ulen = strlen(buff);
885  sp = buff + ulen - 1;
886  bp = &Uname[sizeof(Uname)-1];
887  if (ulen > (int)sizeof(Uname)) ulen = sizeof(Uname);
888  *bp = '@'; bp--;
889  while(ulen--) {*bp = *sp; bp--; sp--;}
890  ID = bp+1;
891  Comment = (const char *)ID;
892 
893 // Update the ID in the TLS socket if enabled
894 //
895  if (isTLS) tlsIO.SetTraceID(ID);
896 }
897 
898 /******************************************************************************/
899 /* s e t N B */
900 /******************************************************************************/
901 
903 {
904 // We don't support non-blocking output except for Linux at the moment
905 //
906 #if !defined(__linux__)
907  return false;
908 #else
909 // Trace this request
910 //
911  TRACEI(DEBUG,"enabling non-blocking output");
912 
913 // If we don't already have a sendQ object get one. This is a one-time call
914 // so to optimize checking if this object exists we also get the opMutex.
915 //
917  if (!sendQ)
918  {wrMutex.Lock();
919  sendQ = new XrdSendQ(*this, wrMutex);
920  wrMutex.UnLock();
921  }
923  return true;
924 #endif
925 }
926 
927 /******************************************************************************/
928 /* s e t P r o t o c o l */
929 /******************************************************************************/
930 
932 {
933 
934 // Set new protocol.
935 //
937  XrdProtocol *op = Protocol;
938  if (push) ProtoAlt = Protocol;
939  Protocol = pp;
941  return op;
942 }
943 
944 /******************************************************************************/
945 /* s e t P r o t N a m e */
946 /******************************************************************************/
947 
948 void XrdLinkXeq::setProtName(const char *name)
949 {
950 
951 // Set the protocol name.
952 //
954  Addr.SetDialect(name);
956 }
957 
958 /******************************************************************************/
959 /* s e t T L S */
960 /******************************************************************************/
961 
962 bool XrdLinkXeq::setTLS(bool enable, XrdTlsContext *ctx)
963 { //???
964 // static const XrdTlsConnection::RW_Mode rwMode=XrdTlsConnection::TLS_RNB_WBL;
967  const char *eNote;
968  XrdTls::RC rc;
969 
970 // If we are already in a compatible mode, we are done
971 //
972 
973  if (isTLS == enable) return true;
974 
975 // If this is a shutdown, then do it now.
976 //
977  if (!enable)
978  {tlsIO.Shutdown();
979  isTLS = enable;
980  Addr.SetTLS(enable);
981  return true;
982  }
983 // We want to initialize TLS, do so now.
984 //
985  if (!ctx) ctx = tlsCtx;
986  eNote = tlsIO.Init(*ctx, PollInfo.FD, rwMode, hsMode, false, false, ID);
987 
988 // Check for errors
989 //
990  if (eNote)
991  {char buff[1024];
992  snprintf(buff, sizeof(buff), "Unable to enable tls for %s;", ID);
993  Log.Emsg("LinkXeq", buff, eNote);
994  return false;
995  }
996 
997 // Now we need to accept this TLS connection
998 //
999  std::string eMsg;
1000  rc = tlsIO.Accept(&eMsg);
1001 
1002 // Diagnose return state
1003 //
1004  if (rc != XrdTls::TLS_AOK) Log.Emsg("LinkXeq", eMsg.c_str());
1005  else {isTLS = enable;
1006  Addr.SetTLS(enable);
1007  Log.Emsg("LinkXeq", ID, "connection upgraded to", verTLS());
1008  }
1009  return rc == XrdTls::TLS_AOK;
1010 }
1011 
1012 /******************************************************************************/
1013 /* S F E r r o r */
1014 /******************************************************************************/
1015 
1017 {
1018  Log.Emsg("TLS", rc, "send file to", ID);
1019  return -1;
1020 }
1021 
1022 /******************************************************************************/
1023 /* S h u t d o w n */
1024 /******************************************************************************/
1025 
1026 void XrdLinkXeq::Shutdown(bool getLock)
1027 {
1028  int temp;
1029 
1030 // Trace the entry
1031 //
1032  TRACEI(DEBUG, (getLock ? "Async" : "Sync") <<" link shutdown in progress");
1033 
1034 // Get the lock if we need too (external entry via another thread)
1035 //
1036  if (getLock) LinkInfo.opMutex.Lock();
1037 
1038 // If there is something to do, do it now
1039 //
1040  temp = Instance; Instance = 0;
1041  if (!KeepFD)
1042  {shutdown(PollInfo.FD, SHUT_RDWR);
1043  if (dup2(devNull, PollInfo.FD) < 0)
1044  {Instance = temp;
1045  Log.Emsg("Link", errno, "shutdown FD for", ID);
1046  }
1047  }
1048 
1049 // All done
1050 //
1051  if (getLock) LinkInfo.opMutex.UnLock();
1052 }
1053 
1054 /******************************************************************************/
1055 /* S t a t s */
1056 /******************************************************************************/
1057 
// Format the link statistics as an XML fragment (see statfmt) into buff using
// snprintf bounded by blen, and return snprintf's result. When buff is null
// nothing is formatted and a size is returned instead. When do_sync is set,
// XrdLinkCtl::SyncAll() is called before formatting.
//
1058 int XrdLinkXeq::Stats(char *buff, int blen, bool do_sync)
1059 {
1060  static const char statfmt[] = "<stats id=\"link\"><num>%d</num>"
1061  "<maxn>%d</maxn><tot>%lld</tot><in>%lld</in><out>%lld</out>"
1062  "<ctime>%lld</ctime><tmo>%d</tmo><stall>%d</stall>"
1063  "<sfps>%d</sfps></stats>";
1064  int i;
1065 
1066 // A null buffer means the caller only wants to know how much space to
1067 // provide: the format length plus 17*6 (presumably headroom for the
// formatted numbers -- TODO confirm the derivation of 17*6).
//
1068  if (!buff) return sizeof(statfmt)+17*6;
1069 
1070 // Bring the statistical counters up to date first, if so requested
1071 //
1072  if (do_sync) XrdLinkCtl::SyncAll();
1073 
1074 // Format the counters into the caller's buffer.
1075 // NOTE(review): the original comment mentions obtaining a lock on the stats
// area, but no lock is visible here; also the argument list of the snprintf
// call below is cut off in this listing.
//
1077  i = snprintf(buff, blen, statfmt, AtomicGet(LinkCount),
1087  return i;
1088 }
1089 
1090 /******************************************************************************/
1091 /* s y n c S t a t s */
1092 /******************************************************************************/
1093 
// Fold this link's counters (bytes in/out, tardy, stall and sendfile
// interrupt counts) into the Link* totals, zeroing the per-link counters as
// they are fetched (AtomicFAZ). When ctime is non-null it receives the time
// elapsed since LinkInfo.conTime, that value is added to LinkConTime and
// LinkCount is decremented if positive. When ctime is null the opMutex is
// taken for the duration of the call.
//
1094 void XrdLinkXeq::syncStats(int *ctime)
1095 {
1096  long long tmpLL;
1097  int tmpI4;
1098 
1099 // If this is dynamic (no ctime supplied), get the opMutex lock
1100 //
1101  if (!ctime) LinkInfo.opMutex.Lock();
1102 
1103 // Either the caller has the opMutex or this is called out of close. In either
1104 // case, we need to get the read and write mutexes; each followed by the stats
1105 // mutex. This order is important because we should not hold the stats mutex
1106 // for very long and the r/w mutexes may take a long time to acquire. If we
1107 // must maintain the link count we need to actually acquire the stats mutex as
1108 // we will be doing compound operations. Atomics are still used to keep other
1109 // threads from seeing partial results.
1110 //
// NOTE(review): only AtomicBeg(rdMutex) is visible in this listing; the
// matching AtomicEnd and any write-mutex bracketing appear to have been
// dropped from the listing -- verify against the real source.
//
1111  AtomicBeg(rdMutex);
1112 
// Connection is ending: report the connect time and drop the link count.
// The check-then-decrement is done under statsMutex.
//
1113  if (ctime)
1114  {*ctime = time(0) - LinkInfo.conTime;
1115  AtomicAdd(LinkConTime, *ctime);
1116  statsMutex.Lock();
1117  if (LinkCount > 0) AtomicDec(LinkCount);
1118  statsMutex.UnLock();
1119  }
1120 
1122 
// Read-side counters: fetch-and-zero each one, then add it to both the
// class-wide total and this link's running total.
//
1123  tmpLL = AtomicFAZ(BytesIn);
1124  AtomicAdd(LinkBytesIn, tmpLL); AtomicAdd(BytesInTot, tmpLL);
1125  tmpI4 = AtomicFAZ(tardyCnt);
1126  AtomicAdd(LinkTimeOuts, tmpI4); AtomicAdd(tardyCntTot, tmpI4);
1127  tmpI4 = AtomicFAZ(stallCnt);
1128  AtomicAdd(LinkStalls, tmpI4); AtomicAdd(stallCntTot, tmpI4);
1130 
// Write-side counters, handled the same way (SfIntr has no per-link total)
//
1132  tmpLL = AtomicFAZ(BytesOut);
1133  AtomicAdd(LinkBytesOut, tmpLL); AtomicAdd(BytesOutTot, tmpLL);
1134  tmpI4 = AtomicFAZ(SfIntr);
1135  AtomicAdd(LinkSfIntr, tmpI4);
1137 
1138 // Make sure the protocol updates its statistics as well
1139 //
1140  if (Protocol) Protocol->Stats(0, 0, 1);
1141 
1142 // All done, release the opMutex if we took it above
1143 //
1144  if (!ctime) LinkInfo.opMutex.UnLock();
1145 }
1146 
1147 /******************************************************************************/
1148 /* Protected: T L S _ E r r o r */
1149 /******************************************************************************/
1150 
1151 int XrdLinkXeq::TLS_Error(const char *act, XrdTls::RC rc)
1152 {
1153  std::string reason = XrdTls::RC2Text(rc);
1154  char msg[512];
1155 
1156  snprintf(msg, sizeof(msg), "Unable to %s %s;", act, ID);
1157  Log.Emsg("TLS", msg, reason.c_str());
1158  return -1;
1159 }
1160 
1161 /******************************************************************************/
1162 /* T L S _ P e e k */
1163 /******************************************************************************/
1164 
1165 int XrdLinkXeq::TLS_Peek(char *Buff, int Blen, int timeout)
1166 {
1167  XrdSysMutexHelper theMutex;
1168  XrdTls::RC retc;
1169  int rc, rlen;
1170 
1171 // Lock the read mutex if we need to, the helper will unlock it upon exit
1172 //
1173  if (LockReads) theMutex.Lock(&rdMutex);
1174 
1175 // Wait until we can actually read something
1176 //
1177  isIdle = 0;
1178  if (timeout)
1179  {rc = Wait4Data(timeout);
1180  if (rc < 1) return rc;
1181  }
1182 
1183 // Do the peek and if sucessful, the number of bytes available.
1184 //
1185  retc = tlsIO.Peek(Buff, Blen, rlen);
1186  if (retc == XrdTls::TLS_AOK) return rlen;
1187 
1188 // Dianose the TLS error and return failure
1189 //
1190  return TLS_Error("peek on", retc);
1191 }
1192 
1193 /******************************************************************************/
1194 /* T L S _ R e c v */
1195 /******************************************************************************/
1196 
1197 int XrdLinkXeq::TLS_Recv(char *Buff, int Blen)
1198 {
1199  XrdSysMutexHelper theMutex;
1200  XrdTls::RC retc;
1201  int rlen;
1202 
1203 // Lock the read mutex if we need to, the helper will unlock it upon exit
1204 //
1205  if (LockReads) theMutex.Lock(&rdMutex);
1206 
1207 // Note that we will read only as much as is queued. Use Recv() with a
1208 // timeout to receive as much data as possible.
1209 //
1210  isIdle = 0;
1211  retc = tlsIO.Read(Buff, Blen, rlen);
1212  if (retc != XrdTls::TLS_AOK) return TLS_Error("receive from", retc);
1213  if (rlen > 0) AtomicAdd(BytesIn, rlen);
1214  return rlen;
1215 }
1216 
1217 /******************************************************************************/
1218 
1219 int XrdLinkXeq::TLS_Recv(char *Buff, int Blen, int timeout, bool havelock)
1220 {
1221  XrdSysMutexHelper theMutex;
1222  XrdTls::RC retc;
1223  int pend, rlen, totlen = 0;
1224 
1225 // Lock the read mutex if we need to, the helper will unlock it upon exit
1226 //
1227  if (LockReads && !havelock) theMutex.Lock(&rdMutex);
1228 
1229 // Wait up to timeout milliseconds for data to arrive
1230 //
1231  isIdle = 0;
1232  while(Blen > 0)
1233  {pend = tlsIO.Pending(true);
1234  if (!pend) pend = Wait4Data(timeout);
1235  if (pend < 1)
1236  {if (pend < 0) return -1;
1237  tardyCnt++;
1238  if (totlen)
1239  {if ((++stallCnt & 0xff) == 1) TRACEI(DEBUG,"read timed out");
1240  AtomicAdd(BytesIn, totlen);
1241  }
1242  return totlen;
1243  }
1244 
1245  // Read as much data as you can. Note that we will force an error
1246  // if we get a zero-length read after poll said it was OK. However,
1247  // if we never read anything, then we simply return -ENOMSG to avoid
1248  // generating a "read link error" as clearly there was a hangup.
1249  //
1250  retc = tlsIO.Read(Buff, Blen, rlen);
1251  if (retc != XrdTls::TLS_AOK)
1252  {if (!totlen) return -ENOMSG;
1253  AtomicAdd(BytesIn, totlen);
1254  return TLS_Error("receive from", retc);
1255  }
1256  if (rlen <= 0) break;
1257  totlen += rlen; Blen -= rlen; Buff += rlen;
1258  }
1259 
1260  AtomicAdd(BytesIn, totlen);
1261  return totlen;
1262 }
1263 
1264 /******************************************************************************/
1265 
1266 int XrdLinkXeq::TLS_Recv(const struct iovec *iov, int iocnt, int timeout)
1267 {
1268  XrdSysMutexHelper theMutex;
1269  char *Buff;
1270  int Blen, rlen, totlen = 0;
1271 
1272 // Lock the read mutex if we need to, the helper will unlock it upon exit
1273 //
1274  if (LockReads) theMutex.Lock(&rdMutex);
1275 
1276 // Individually process each element until we can't read any more
1277 //
1278  isIdle = 0;
1279  for (int i = 0; i < iocnt; i++)
1280  {Buff = (char *)iov[i].iov_base;
1281  Blen = iov[i].iov_len;
1282  rlen = TLS_Recv(Buff, Blen, timeout, true);
1283  if (rlen <= 0) break;
1284  totlen += rlen;
1285  if (rlen < Blen) break;
1286  }
1287 
1288  if (totlen) {AtomicAdd(BytesIn, totlen);}
1289  return totlen;
1290 }
1291 
1292 /******************************************************************************/
1293 /* T L S _ R e c v A l l */
1294 /******************************************************************************/
1295 
1296 int XrdLinkXeq::TLS_RecvAll(char *Buff, int Blen, int timeout)
1297 {
1298  int retc;
1299 
1300 // Check if timeout specified. Notice that the timeout is the max we will
1301 // wait for some data. We will wait forever for all the data. Yeah, it's weird.
1302 //
1303  if (timeout >= 0)
1304  {retc = tlsIO.Pending(true);
1305  if (!retc) retc = Wait4Data(timeout);
1306  if (retc < 1) return (retc ? -1 : -ETIMEDOUT);
1307  }
1308 
1309 // Note that we will block until we receive all the bytes.
1310 //
1311  return TLS_Recv(Buff, Blen, -1);
1312 }
1313 
1314 /******************************************************************************/
1315 /* T L S _ S e n d */
1316 /******************************************************************************/
1317 
1318 int XrdLinkXeq::TLS_Send(const char *Buff, int Blen)
1319 {
1321  ssize_t bytesleft = Blen;
1322  XrdTls::RC retc;
1323  int byteswritten;
1324 
1325 // Prepare to send
1326 //
1327  isIdle = 0;
1328  AtomicAdd(BytesOut, Blen);
1329 
1330 // Do non-blocking writes if we are setup to do so.
1331 //
1332  if (sendQ) return sendQ->Send(Buff, Blen);
1333 
1334 // Write the data out
1335 //
1336  while(bytesleft)
1337  {retc = tlsIO.Write(Buff, bytesleft, byteswritten);
1338  if (retc != XrdTls::TLS_AOK) return TLS_Error("send to", retc);
1339  bytesleft -= byteswritten; Buff += byteswritten;
1340  }
1341 
1342 // All done
1343 //
1344  return Blen;
1345 }
1346 
1347 /******************************************************************************/
1348 
1349 int XrdLinkXeq::TLS_Send(const struct iovec *iov, int iocnt, int bytes)
1350 {
1352  XrdTls::RC retc;
1353  int byteswritten;
1354 
1355 // Get a lock and assume we will be successful (statistically we are). Note
1356 // that the calling interface gauranteed bytes are not zero.
1357 //
1358  isIdle = 0;
1359  AtomicAdd(BytesOut, bytes);
1360 
1361 // Do non-blocking writes if we are setup to do so.
1362 //
1363  if (sendQ) return sendQ->Send(iov, iocnt, bytes);
1364 
1365 // Write the data out.
1366 //
1367  for (int i = 0; i < iocnt; i++)
1368  {ssize_t bytesleft = iov[i].iov_len;
1369  char *Buff = (char *)iov[i].iov_base;
1370  while(bytesleft)
1371  {retc = tlsIO.Write(Buff, bytesleft, byteswritten);
1372  if (retc != XrdTls::TLS_AOK) return TLS_Error("send to", retc);
1373  bytesleft -= byteswritten; Buff += byteswritten;
1374  }
1375  }
1376 
1377 // All done
1378 //
1379  return bytes;
1380 }
1381 
1382 /******************************************************************************/
1383 
1384 int XrdLinkXeq::TLS_Send(const sfVec *sfP, int sfN)
1385 {
1387  int bytes, buffsz, fileFD, retc;
1388  off_t offset;
1389  ssize_t totamt = 0;
1390  char myBuff[65536];
1391 
1392 // Convert the sendfile to a regular send. The conversion is not particularly
1393 // fast and caller are advised to avoid using sendfile on TLS connections.
1394 //
1395  isIdle = 0;
1396  for (int i = 0; i < sfN; sfP++, i++)
1397  {if (!(bytes = sfP->sendsz)) continue;
1398  totamt += bytes;
1399  if (sfP->fdnum < 0)
1400  {if (!TLS_Write(sfP->buffer, bytes)) return -1;
1401  continue;
1402  }
1403  offset = sfP->offset;
1404  fileFD = sfP->fdnum;
1405  buffsz = (bytes < (int)sizeof(myBuff) ? bytes : sizeof(myBuff));
1406  do {do {retc = pread(fileFD, myBuff, buffsz, offset);}
1407  while(retc < 0 && errno == EINTR);
1408  if (retc < 0) return SFError(errno);
1409  if (!retc) break;
1410  if (!TLS_Write(myBuff, buffsz)) return -1;
1411  offset += buffsz; bytes -= buffsz; totamt += retc;
1412  } while(bytes > 0);
1413  }
1414 
1415 // We are done
1416 //
1417  AtomicAdd(BytesOut, totamt);
1418  return totamt;
1419 }
1420 
1421 /******************************************************************************/
1422 /* Protected: T L S _ W r i t e */
1423 /******************************************************************************/
1424 
1425 bool XrdLinkXeq::TLS_Write(const char *Buff, int Blen)
1426 {
1427  XrdTls::RC retc;
1428  int byteswritten;
1429 
1430 // Write the data out
1431 //
1432  while(Blen)
1433  {retc = tlsIO.Write(Buff, Blen, byteswritten);
1434  if (retc != XrdTls::TLS_AOK)
1435  {TLS_Error("write to", retc);
1436  return false;
1437  }
1438  Blen -= byteswritten; Buff += byteswritten;
1439  }
1440 
1441 // All done
1442 //
1443  return true;
1444 }
1445 
1446 /******************************************************************************/
1447 /* v e r T L S */
1448 /******************************************************************************/
1449 
1450 const char *XrdLinkXeq::verTLS()
1451 {
1452  return tlsIO.Version();
1453 }
#define DEBUG(x)
Definition: XrdBwmTrace.hh:54
ssize_t pread(int fildes, void *buf, size_t nbyte, off_t offset)
ssize_t readv(int fildes, const struct iovec *iov, int iovcnt)
ssize_t write(int fildes, const void *buf, size_t nbyte)
ssize_t writev(int fildes, const struct iovec *iov, int iovcnt)
ssize_t read(int fildes, void *buf, size_t nbyte)
#define close(a)
Definition: XrdPosix.hh:48
#define eMsg(x)
#define AtomicFAZ(x)
#define AtomicBeg(Mtx)
#define AtomicDec(x)
#define AtomicGet(x)
#define AtomicEnd(Mtx)
#define AtomicAdd(x, y)
size_t strlcpy(char *dst, const char *src, size_t sz)
#define TRACEI(act, x)
Definition: XrdTrace.hh:66
const char * Comment
Definition: XrdJob.hh:47
static void SyncAll()
Synchronize statistics for all links.
Definition: XrdLinkCtl.cc:374
static void Unhook(int fd)
Unhook a link from the active table of links.
Definition: XrdLinkCtl.cc:392
time_t conTime
Definition: XrdLinkInfo.hh:44
void Reset()
Definition: XrdLinkInfo.hh:52
char * Etext
Definition: XrdLinkInfo.hh:45
XrdSysRecMutex opMutex
Definition: XrdLinkInfo.hh:46
XrdSysCondVar * KillcvP
Definition: XrdLinkInfo.hh:42
static const char * TraceID
Definition: XrdLinkXeq.hh:157
int TLS_Send(const char *Buff, int Blen)
Definition: XrdLinkXeq.cc:1318
long long BytesOut
Definition: XrdLinkXeq.hh:172
int TLS_Error(const char *act, XrdTls::RC rc)
Definition: XrdLinkXeq.cc:1151
int TLS_Peek(char *Buff, int Blen, int timeout)
Definition: XrdLinkXeq.cc:1165
int stallCntTot
Definition: XrdLinkXeq.hh:175
int Client(char *buff, int blen)
Definition: XrdLinkXeq.cc:152
char Uname[24]
Definition: XrdLinkXeq.hh:200
XrdTlsPeerCerts * getPeerCerts()
Definition: XrdLinkXeq.cc:321
static int LinkCountMax
Definition: XrdLinkXeq.hh:166
XrdLinkInfo LinkInfo
Definition: XrdLinkXeq.hh:144
XrdProtocol * ProtoAlt
Definition: XrdLinkXeq.hh:184
int Close(bool defer=false)
Definition: XrdLinkXeq.cc:172
XrdNetAddr Addr
Definition: XrdLinkXeq.hh:192
int TLS_Recv(char *Buff, int Blen)
Definition: XrdLinkXeq.cc:1197
int sendData(const char *Buff, int Blen)
Definition: XrdLinkXeq.cc:814
long long BytesInTot
Definition: XrdLinkXeq.hh:171
bool TLS_Write(const char *Buff, int Blen)
Definition: XrdLinkXeq.cc:1425
int SendIOV(const struct iovec *iov, int iocnt, int bytes)
Definition: XrdLinkXeq.cc:837
XrdProtocol * setProtocol(XrdProtocol *pp, bool push)
Definition: XrdLinkXeq.cc:931
static long long LinkCountTot
Definition: XrdLinkXeq.hh:164
long long BytesOutTot
Definition: XrdLinkXeq.hh:173
void Shutdown(bool getLock)
Definition: XrdLinkXeq.cc:1026
int Peek(char *buff, int blen, int timeout=-1)
Definition: XrdLinkXeq.cc:330
static int LinkCount
Definition: XrdLinkXeq.hh:165
void Reset()
Definition: XrdLinkXeq.cc:113
int Backlog()
Definition: XrdLinkXeq.cc:139
XrdSysMutex wrMutex
Definition: XrdLinkXeq.hh:194
static int Stats(char *buff, int blen, bool do_sync=false)
Definition: XrdLinkXeq.cc:1058
XrdSendQ * sendQ
Definition: XrdLinkXeq.hh:195
XrdPollInfo PollInfo
Definition: XrdLinkXeq.hh:145
void setID(const char *userid, int procid)
Definition: XrdLinkXeq.cc:878
bool LockReads
Definition: XrdLinkXeq.hh:197
int Recv(char *buff, int blen)
Definition: XrdLinkXeq.cc:373
static long long LinkBytesIn
Definition: XrdLinkXeq.hh:161
int TLS_RecvAll(char *Buff, int Blen, int timeout)
Definition: XrdLinkXeq.cc:1296
int SFError(int rc)
Definition: XrdLinkXeq.cc:1016
long long BytesIn
Definition: XrdLinkXeq.hh:170
int tardyCntTot
Definition: XrdLinkXeq.hh:177
int Send(const char *buff, int blen)
Definition: XrdLinkXeq.cc:592
XrdSysMutex rdMutex
Definition: XrdLinkXeq.hh:193
const char * verTLS()
Definition: XrdLinkXeq.cc:1450
bool setNB()
Definition: XrdLinkXeq.cc:902
int RecvIOV(const struct iovec *iov, int iocnt)
Definition: XrdLinkXeq.cc:551
char Lname[256]
Definition: XrdLinkXeq.hh:201
static long long LinkConTime
Definition: XrdLinkXeq.hh:163
static int LinkSfIntr
Definition: XrdLinkXeq.hh:169
XrdTlsSocket tlsIO
Definition: XrdLinkXeq.hh:188
void DoIt()
Definition: XrdLinkXeq.cc:292
int RecvAll(char *buff, int blen, int timeout=-1)
Definition: XrdLinkXeq.cc:509
XrdProtocol * Protocol
Definition: XrdLinkXeq.hh:183
bool Register(const char *hName)
Definition: XrdLinkXeq.cc:573
static XrdSysMutex statsMutex
Definition: XrdLinkXeq.hh:179
void setProtName(const char *name)
Definition: XrdLinkXeq.cc:948
static int LinkStalls
Definition: XrdLinkXeq.hh:168
static long long LinkBytesOut
Definition: XrdLinkXeq.hh:162
void syncStats(int *ctime=0)
Definition: XrdLinkXeq.cc:1094
bool setTLS(bool enable, XrdTlsContext *ctx=0)
Definition: XrdLinkXeq.cc:962
static int LinkTimeOuts
Definition: XrdLinkXeq.hh:167
void SetDialect(const char *dP)
Definition: XrdNetAddr.hh:205
bool Register(const char *hName)
Definition: XrdNetAddr.cc:180
void SetTLS(bool val)
Definition: XrdNetAddr.cc:590
void Zorch()
Definition: XrdPollInfo.hh:49
XrdPoll * Poller
Definition: XrdPollInfo.hh:43
virtual int Enable(XrdPollInfo &pInfo)=0
static char * Poll2Text(short events)
Definition: XrdPoll.cc:272
static void Detach(XrdPollInfo &pInfo)
Definition: XrdPoll.cc:177
virtual void Recycle(XrdLink *lp=0, int consec=0, const char *reason=0)=0
virtual int Stats(char *buff, int blen, int do_sync=0)=0
virtual int Process(XrdLink *lp)=0
void Terminate(XrdLink *lP=0)
Definition: XrdSendQ.cc:396
int Send(const char *buff, int blen)
Definition: XrdSendQ.cc:230
unsigned int Backlog()
Definition: XrdSendQ.hh:46
int Emsg(const char *esfx, int ecode, const char *text1, const char *text2=0)
Definition: XrdSysError.cc:95
void Lock(XrdSysMutex *Mutex)
int fd
Socket file descriptor.
Definition: XrdTcpMonPin.hh:61
long long bytesOut
Bytes written to the socket.
Definition: XrdTcpMonPin.hh:64
int consec
Seconds connected.
Definition: XrdTcpMonPin.hh:62
virtual void Monitor(XrdNetAddrInfo &netInfo, LinkInfo &lnkInfo, int liLen)=0
long long bytesIn
Bytes read from the socket.
Definition: XrdTcpMonPin.hh:63
const char * tident
Pointer to the client's trace identifier.
Definition: XrdTcpMonPin.hh:60
@ TLS_HS_BLOCK
Always block during handshake.
Definition: XrdTlsSocket.hh:53
XrdTls::RC Accept(std::string *eMsg=0)
void Shutdown(SDType=sdImmed)
@ TLS_RBL_WBL
blocking read blocking write
Definition: XrdTlsSocket.hh:48
XrdTls::RC Write(const char *buffer, size_t size, int &bytesOut)
const char * Version()
XrdTls::RC Read(char *buffer, size_t size, int &bytesRead)
Read from the TLS connection. If necessary, a handshake will be done.
const char * Init(XrdTlsContext &ctx, int sfd, RW_Mode rwm, HS_Mode hsm, bool isClient, bool serial=true, const char *tid="")
void SetTraceID(const char *tid)
int Pending(bool any=true)
XrdTls::RC Peek(char *buffer, size_t size, int &bytesPeek)
XrdTlsPeerCerts * getCerts(bool ver=true)
static std::string RC2Text(XrdTls::RC rc, bool dbg=false)
Definition: XrdTls.cc:127
@ TLS_AOK
All went well, will always be zero.
Definition: XrdTls.hh:40
XrdTlsContext * tlsCtx
Definition: XrdGlobals.cc:52
XrdTcpMonPin * TcpMonPin
Definition: XrdLinkXeq.cc:80
const int maxIOV
Definition: XrdLinkXeq.cc:82
XrdSysError Log
Definition: XrdConfig.cc:113
XrdScheduler Sched
Definition: XrdLinkCtl.cc:54
int devNull
Definition: XrdGlobals.cc:55
int getIovMax()
int fdnum
File descriptor for data.
Definition: XrdOucSFVec.hh:47
int sendsz
Length of data at offset.
Definition: XrdOucSFVec.hh:46