libdl  0.0.1
Simple yet powerful deep learning
Loading...
Searching...
No Matches
pipe.hpp
1#ifndef DL_UTILS_PIPE_HPP
2#define DL_UTILS_PIPE_HPP
3
4#include <assert.h>
5#include <barrier>
6#include <streambuf>
7#include <vector>
8
9namespace dl::utils {
10 template <class CharT, class Traits = std::char_traits<CharT>, typename Allocator = std::allocator<CharT>>
11 class basic_pipebuf : public std::basic_streambuf<CharT, Traits> {
12 private:
13 using int_type = typename Traits::int_type;
14 bool readerClosed = false;
15 bool writerClosed = false;
16 std::size_t numLastWritten = 0;
18 std::barrier<> barrier{2};
21
22 public:
23 basic_pipebuf(const basic_pipebuf& other) = default;
24 basic_pipebuf(basic_pipebuf&& other) = delete;
25 basic_pipebuf(size_t bufsize = 1024) : readbuf(bufsize), writebuf(bufsize) {
26 // Read Pointers are all at the end of the buffer
27 this->setg(readbuf.data(), readbuf.data() + readbuf.size(), readbuf.data() + readbuf.size());
28 // Set write buffer to whole buffer
29 this->setp(writebuf.data(), writebuf.data() + writebuf.size());
30 }
32 if (basic_pipebuf::is_open())
33 basic_pipebuf::overflow(Traits::eof());
34 }
35
36 bool is_open() const noexcept { return !writerClosed; }
37
39 if (!is_open())
40 return nullptr;
41 overflow(Traits::eof()); // Will close the writer
42 return this;
43 }
44
45 int_type underflow() override {
46 if (readerClosed)
47 return Traits::eof();
48 barrier.arrive_and_wait();
49 // Swap buffers
50 std::swap(readbuf, writebuf);
51 writerClosed = readerClosed = readerClosed || writerClosed;
52 numLastWritten = this->pptr() - this->pbase();
53 assert(numLastWritten <= readbuf.size());
54 barrier.arrive_and_wait();
55 if (readerClosed && numLastWritten == 0) // Reader was freshly closed and no new data was given
56 return Traits::eof();
57 // Reset Pointer
58 this->setg(readbuf.data(), readbuf.data(), readbuf.data() + numLastWritten);
59 return Traits::to_int_type(*this->gptr());
60 }
61
62 int_type overflow(int_type c = Traits::eof()) override {
63 if (c == Traits::eof())
64 writerClosed = true;
65 barrier.arrive_and_wait();
66 // Reader will switch buffers and update *Closed flags
67 barrier.arrive_and_wait();
68 if (writerClosed)
69 return (c == Traits::eof()) ? 1 : Traits::eof();
70 // Reset Pointers
71 this->setp(writebuf.data(), writebuf.data() + writebuf.size());
72 *this->pptr() = c;
73 this->pbump(1);
74 return c;
75 }
76 };
77
80
81 template <class CharT, class Traits = std::char_traits<CharT>, typename Allocator = std::allocator<CharT>>
82 class basic_pipestream : public std::basic_iostream<CharT, Traits> {
83 private:
85
86 public:
89 }
91 /*basic_pipestream(basic_pipestream<CharT, Traits, Allocator>&& other)
92 : std::basic_iostream<CharT, Traits>(nullptr), buf(std::move(other.buf)) {
93 std::basic_iostream<CharT, Traits>::rdbuf(&buf);
94 }*/
95
96 bool is_open() const noexcept { return buf.is_open(); }
97 void close() {
98 if (buf.close() == nullptr)
99 this->setstate(std::ios_base::failbit);
100 }
101 };
102
105} // namespace dl::utils
106
107#endif
T data(T... args)
T pbump(T... args)
T rdbuf(T... args)
T setstate(T... args)
T size(T... args)
T swap(T... args)