196 lines
3.5 KiB
Go
196 lines
3.5 KiB
Go
package pipe
|
|
|
|
import "io"
|
|
|
|
// PipeReader couples the two ends of a pipe behind a single value: Read
// delegates to r, Write and Close delegate to w, and capacity controls how
// Write splits its input.
type PipeReader struct {
	// r and w are the read and write ends that the methods delegate to.
	r io.ReadCloser
	w io.WriteCloser

	// capacity is the chunk size, in bytes, consulted by Write.
	capacity int64

	// NOTE(review): the remaining fields are not used by Read, Write, Close
	// or SetCap. They look like leftovers from an earlier channel-based
	// implementation; confirm nothing else in the package needs them before
	// removing.
	last      []byte
	buffer    []byte
	lastWrite int
	closed    bool
	semaphore chan struct{}
	hasWrite  chan struct{}
}
|
|
|
|
func New() *PipeReader {
|
|
r, w := io.Pipe()
|
|
return &PipeReader{
|
|
semaphore: make(chan struct{}, 1),
|
|
hasWrite: make(chan struct{}),
|
|
buffer: make([]byte, 64000),
|
|
capacity: 64000,
|
|
last: nil,
|
|
r: r,
|
|
w: w,
|
|
}
|
|
}
|
|
|
|
func (pr *PipeReader) Read(b []byte) (int, error) {
|
|
return pr.r.Read(b)
|
|
}
|
|
|
|
func (pr *PipeReader) Write(b []byte) (int, error) {
|
|
chunks := int(int64(len(b)) / pr.capacity)
|
|
if int64(len(b))%pr.capacity > 0 {
|
|
chunks += 1
|
|
}
|
|
ttl := 0
|
|
for i := 0; i < chunks-1; i++ {
|
|
n, err := pr.w.Write(b[ttl:pr.capacity])
|
|
if err != nil {
|
|
return ttl, err
|
|
}
|
|
ttl += n
|
|
}
|
|
n, err := pr.w.Write(b[ttl:])
|
|
return ttl + n, err
|
|
}
|
|
|
|
func (pr *PipeReader) Close() error {
|
|
return pr.w.Close()
|
|
}
|
|
|
|
func (pr *PipeReader) SetCap(n int64) {
|
|
pr.capacity = n
|
|
}
|
|
|
|
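/*
Earlier hand-rolled PipeReader implementation (channel-based locking plus an
internal buffer), kept here commented out. The live code above delegates to
io.Pipe instead. NOTE(review): this block appears to be dead code and
references fmt and logger, which this file does not import; confirm and
consider deleting it.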
|
|
|
|
type PipeReader struct {
|
|
last []byte
|
|
semaphore chan struct{}
|
|
hasWrite chan struct{}
|
|
closed bool
|
|
buffer []byte
|
|
lastWrite int
|
|
capacity int
|
|
}
|
|
|
|
func New() *PipeReader {
|
|
return &PipeReader{
|
|
semaphore: make(chan struct{}, 1),
|
|
hasWrite: make(chan struct{}),
|
|
buffer: make([]byte, 64000),
|
|
capacity: 64000,
|
|
last: nil,
|
|
}
|
|
}
|
|
|
|
func (pr *PipeReader) SetCap(n int) {
|
|
pr.capacity = n
|
|
pr.buffer = make([]byte, n)
|
|
}
|
|
|
|
func (pr *PipeReader) String() string {
|
|
return fmt.Sprintf("buf=%v, cap=%v, last=%v", len(pr.buffer), pr.capacity, len(pr.last))
|
|
}
|
|
|
|
func (pr *PipeReader) lock() {
|
|
pr.semaphore <- struct{}{}
|
|
}
|
|
|
|
func (pr *PipeReader) unlock() {
|
|
for _ = range pr.semaphore {
|
|
return
|
|
}
|
|
}
|
|
|
|
func (pr *PipeReader) blockUntilWrite() {
|
|
for _ = range pr.hasWrite {
|
|
return
|
|
}
|
|
}
|
|
|
|
func (pr *PipeReader) unblockOnWrite() {
|
|
pr.hasWrite <- struct{}{}
|
|
}
|
|
|
|
func (pr *PipeReader) readCurrentBuffer() []byte {
|
|
pr.blockUntilWrite()
|
|
pr.lock()
|
|
defer pr.unlock()
|
|
ret := pr.buffer[:pr.lastWrite]
|
|
pr.lastWrite = 0
|
|
return ret
|
|
}
|
|
|
|
func (pr *PipeReader) writeBuffer(b []byte) {
|
|
pr.lock()
|
|
defer pr.unblockOnWrite()
|
|
defer pr.unlock()
|
|
copy(pr.buffer[:len(b)], b)
|
|
pr.lastWrite = len(b)
|
|
}
|
|
|
|
func (pr *PipeReader) Read(b []byte) (int, error) {
|
|
logger.Log("READ", len(b), pr.isClosed())
|
|
defer logger.Log("/READ")
|
|
ttl := 0
|
|
m, n, hasMore := xfer(b, pr.last)
|
|
pr.last = m
|
|
ttl = n
|
|
if hasMore {
|
|
return ttl, nil
|
|
}
|
|
for {
|
|
m, n, hasMore := xfer(b[ttl:], pr.readCurrentBuffer())
|
|
logger.Log("XFER", m, n, hasMore, ttl, pr.isClosed())
|
|
pr.last = m
|
|
ttl += n
|
|
if hasMore {
|
|
return ttl, nil
|
|
}
|
|
if n == 0 && pr.isClosed() {
|
|
return ttl, io.EOF
|
|
}
|
|
}
|
|
return ttl, io.EOF
|
|
}
|
|
|
|
func (pr *PipeReader) Write(b []byte) (int, error) {
|
|
logger.Log("WRIT", len(b), pr.isClosed())
|
|
defer logger.Log("/WRIT")
|
|
chunklen := pr.capacity
|
|
wrote := 0
|
|
for wrote+chunklen < len(b) {
|
|
pr.writeBuffer(b[wrote : wrote+chunklen])
|
|
wrote += chunklen
|
|
}
|
|
pr.writeBuffer(b[wrote:])
|
|
return wrote + len(b[wrote:]), nil
|
|
}
|
|
|
|
func (pr *PipeReader) isClosed() bool {
|
|
pr.lock()
|
|
defer pr.unlock()
|
|
return pr.closed
|
|
}
|
|
|
|
func (pr *PipeReader) Close() error {
|
|
pr.lock()
|
|
pr.closed = true
|
|
close(pr.hasWrite)
|
|
pr.unlock()
|
|
return nil
|
|
}
|
|
|
|
func copyByteSlice(b []byte) []byte {
|
|
return append([]byte{}, b...)
|
|
}
|
|
|
|
func xfer(dst, src []byte) ([]byte, int, bool) {
|
|
min := len(dst)
|
|
if len(src) < min {
|
|
min = len(src)
|
|
}
|
|
copy(dst[:min], src[:min])
|
|
src = src[min:]
|
|
if len(src) > 0 {
|
|
return src, min, true
|
|
}
|
|
return src, min, false
|
|
}
|
|
*/
|