package pipe import "io" type PipeReader struct { last []byte semaphore chan struct{} hasWrite chan struct{} closed bool buffer []byte lastWrite int capacity int64 r io.ReadCloser w io.WriteCloser } func New() *PipeReader { r, w := io.Pipe() return &PipeReader{ semaphore: make(chan struct{}, 1), hasWrite: make(chan struct{}), buffer: make([]byte, 64000), capacity: 64000, last: nil, r: r, w: w, } } func (pr *PipeReader) Read(b []byte) (int, error) { return pr.r.Read(b) } func (pr *PipeReader) Write(b []byte) (int, error) { chunks := int(int64(len(b)) / pr.capacity) if int64(len(b))%pr.capacity > 0 { chunks += 1 } ttl := 0 for i := 0; i < chunks-1; i++ { n, err := pr.w.Write(b[ttl:pr.capacity]) if err != nil { return ttl, err } ttl += n } n, err := pr.w.Write(b[ttl:]) return ttl + n, err } func (pr *PipeReader) Close() error { return pr.w.Close() } func (pr *PipeReader) SetCap(n int64) { pr.capacity = n } /* type PipeReader struct { last []byte semaphore chan struct{} hasWrite chan struct{} closed bool buffer []byte lastWrite int capacity int } func New() *PipeReader { return &PipeReader{ semaphore: make(chan struct{}, 1), hasWrite: make(chan struct{}), buffer: make([]byte, 64000), capacity: 64000, last: nil, } } func (pr *PipeReader) SetCap(n int) { pr.capacity = n pr.buffer = make([]byte, n) } func (pr *PipeReader) String() string { return fmt.Sprintf("buf=%v, cap=%v, last=%v", len(pr.buffer), pr.capacity, len(pr.last)) } func (pr *PipeReader) lock() { pr.semaphore <- struct{}{} } func (pr *PipeReader) unlock() { for _ = range pr.semaphore { return } } func (pr *PipeReader) blockUntilWrite() { for _ = range pr.hasWrite { return } } func (pr *PipeReader) unblockOnWrite() { pr.hasWrite <- struct{}{} } func (pr *PipeReader) readCurrentBuffer() []byte { pr.blockUntilWrite() pr.lock() defer pr.unlock() ret := pr.buffer[:pr.lastWrite] pr.lastWrite = 0 return ret } func (pr *PipeReader) writeBuffer(b []byte) { pr.lock() defer pr.unblockOnWrite() defer pr.unlock() copy(pr.buffer[:len(b)], b) pr.lastWrite = len(b) } func (pr *PipeReader) Read(b []byte) (int, error) { logger.Log("READ", len(b), pr.isClosed()) defer logger.Log("/READ") ttl := 0 m, n, hasMore := xfer(b, pr.last) pr.last = m ttl = n if hasMore { return ttl, nil } for { m, n, hasMore := xfer(b[ttl:], pr.readCurrentBuffer()) logger.Log("XFER", m, n, hasMore, ttl, pr.isClosed()) pr.last = m ttl += n if hasMore { return ttl, nil } if n == 0 && pr.isClosed() { return ttl, io.EOF } } return ttl, io.EOF } func (pr *PipeReader) Write(b []byte) (int, error) { logger.Log("WRIT", len(b), pr.isClosed()) defer logger.Log("/WRIT") chunklen := pr.capacity wrote := 0 for wrote+chunklen < len(b) { pr.writeBuffer(b[wrote : wrote+chunklen]) wrote += chunklen } pr.writeBuffer(b[wrote:]) return wrote + len(b[wrote:]), nil } func (pr *PipeReader) isClosed() bool { pr.lock() defer pr.unlock() return pr.closed } func (pr *PipeReader) Close() error { pr.lock() pr.closed = true close(pr.hasWrite) pr.unlock() return nil } func copyByteSlice(b []byte) []byte { return append([]byte{}, b...) } func xfer(dst, src []byte) ([]byte, int, bool) { min := len(dst) if len(src) < min { min = len(src) } copy(dst[:min], src[:min]) src = src[min:] if len(src) > 0 { return src, min, true } return src, min, false } */