xrootd
XrdTpcStream.hh
Go to the documentation of this file.
1 
10 #include <memory>
11 #include <vector>
12 
13 #include <cstring>
14 
15 struct stat;
16 
17 class XrdSfsFile;
18 class XrdSysError;
19 
20 namespace TPC {
21 class Stream {
22 public:
23  Stream(std::unique_ptr<XrdSfsFile> fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
24  : m_open_for_write(false),
25  m_avail_count(max_blocks),
26  m_fh(std::move(fh)),
27  m_offset(0),
28  m_log(log)
29  {
30  m_buffers.reserve(max_blocks);
31  for (size_t idx=0; idx < max_blocks; idx++) {
32  m_buffers.push_back(new Entry(buffer_size));
33  }
34  m_open_for_write = true;
35  }
36 
37  ~Stream();
38 
39  int Stat(struct stat *);
40 
41  int Read(off_t offset, char *buffer, size_t size);
42 
43  int Write(off_t offset, const char *buffer, size_t size);
44 
45  size_t AvailableBuffers() const {return m_avail_count;}
46 
47  void DumpBuffers() const;
48 
49  // Flush and finalize the stream. If all data has been sent to the underlying
50  // file handle, close() will be invoked on the file handle.
51  //
52  // Further write operations on this stream will result in an error.
53  // If any memory buffers remain, an error occurs.
54  //
55  // Returns true on success; false otherwise.
56  bool Finalize();
57 
58 private:
59 
60  class Entry {
61  public:
62  Entry(size_t capacity) :
63  m_offset(-1),
64  m_capacity(capacity),
65  m_size(0)
66  {}
67 
68  bool Available() const {return m_offset == -1;}
69 
70  int Write(Stream &stream) {
71  if (Available() || !CanWrite(stream)) {return 0;}
72  // Currently, only full writes are accepted.
73  int size_desired = m_size;
74  int retval = stream.Write(m_offset, &m_buffer[0], size_desired);
75  m_size = 0;
76  m_offset = -1;
77  if (retval != size_desired) {
78  return -1;
79  }
80  return retval;
81  }
82 
83  bool Accept(off_t offset, const char *buf, size_t size) {
84  // Validate acceptance criteria.
85  if ((m_offset != -1) && (offset != m_offset + static_cast<ssize_t>(m_size))) {
86  return false;
87  }
88  if (size > m_capacity - m_size) {
89  return false;
90  }
91 
92  // Inflate the underlying buffer if needed.
93  ssize_t new_bytes_needed = (m_size + size) - m_buffer.capacity();
94  if (new_bytes_needed > 0) {
95  m_buffer.reserve(m_capacity);
96  }
97 
98  // Finally, do the copy.
99  memcpy(&m_buffer[0] + m_size, buf, size);
100  m_size += size;
101  if (m_offset == -1) {
102  m_offset = offset;
103  }
104  return true;
105  }
106 
107  void ShrinkIfUnused() {
108  if (!Available()) {return;}
109 #if __cplusplus > 199711L
110  m_buffer.shrink_to_fit();
111 #endif
112  }
113 
114  void Move(Entry &other) {
115  m_buffer.swap(other.m_buffer);
116  m_offset = other.m_offset;
117  m_size = other.m_size;
118  }
119 
120  off_t GetOffset() const {return m_offset;}
121  size_t GetCapacity() const {return m_capacity;}
122  size_t GetSize() const {return m_size;}
123 
124  private:
125 
126  Entry(const Entry&) = delete;
127 
128  bool CanWrite(Stream &stream) const {
129  return (m_size > 0) && (m_offset == stream.m_offset);
130  }
131 
132  off_t m_offset; // Offset within file that m_buffer[0] represents.
133  size_t m_capacity;
134  size_t m_size; // Number of bytes held in buffer.
135  std::vector<char> m_buffer;
136  };
137 
140  std::unique_ptr<XrdSfsFile> m_fh;
141  off_t m_offset;
142  std::vector<Entry*> m_buffers;
144 };
145 }
Definition: XrdTpcStream.hh:21
bool Available() const
Definition: XrdTpcStream.hh:68
XrdSysError & m_log
Definition: XrdTpcStream.hh:143
size_t AvailableBuffers() const
Definition: XrdTpcStream.hh:45
size_t m_capacity
Definition: XrdTpcStream.hh:133
size_t m_avail_count
Definition: XrdTpcStream.hh:139
bool CanWrite(Stream &stream) const
Definition: XrdTpcStream.hh:128
void DumpBuffers() const
int Stat(struct stat *)
Stream(std::unique_ptr< XrdSfsFile > fh, size_t max_blocks, size_t buffer_size, XrdSysError &log)
Definition: XrdTpcStream.hh:23
off_t m_offset
Definition: XrdTpcStream.hh:132
Definition: XrdSysError.hh:89
int Read(off_t offset, char *buffer, size_t size)
size_t GetCapacity() const
Definition: XrdTpcStream.hh:121
void ShrinkIfUnused()
Definition: XrdTpcStream.hh:107
off_t m_offset
Definition: XrdTpcStream.hh:141
int Write(Stream &stream)
Definition: XrdTpcStream.hh:70
bool Finalize()
Definition: XrdTpcStream.hh:60
size_t GetSize() const
Definition: XrdTpcStream.hh:122
int Write(off_t offset, const char *buffer, size_t size)
bool Accept(off_t offset, const char *buf, size_t size)
Definition: XrdTpcStream.hh:83
Entry(size_t capacity)
Definition: XrdTpcStream.hh:62
Definition: XrdTpcState.hh:16
std::vector< Entry * > m_buffers
Definition: XrdTpcStream.hh:142
off_t GetOffset() const
Definition: XrdTpcStream.hh:120
std::unique_ptr< XrdSfsFile > m_fh
Definition: XrdTpcStream.hh:140
#define stat(a, b)
Definition: XrdPosix.hh:96
bool m_open_for_write
Definition: XrdTpcStream.hh:138
size_t m_size
Definition: XrdTpcStream.hh:134
void Move(Entry &other)
Definition: XrdTpcStream.hh:114
std::vector< char > m_buffer
Definition: XrdTpcStream.hh:135