Encoder/pipe/pipe.go

196 lines
3.5 KiB
Go

package pipe
import "io"
type PipeReader struct {
last []byte
semaphore chan struct{}
hasWrite chan struct{}
closed bool
buffer []byte
lastWrite int
capacity int64
r io.ReadCloser
w io.WriteCloser
}
func New() *PipeReader {
r, w := io.Pipe()
return &PipeReader{
semaphore: make(chan struct{}, 1),
hasWrite: make(chan struct{}),
buffer: make([]byte, 64000),
capacity: 64000,
last: nil,
r: r,
w: w,
}
}
func (pr *PipeReader) Read(b []byte) (int, error) {
return pr.r.Read(b)
}
func (pr *PipeReader) Write(b []byte) (int, error) {
chunks := int(int64(len(b)) / pr.capacity)
if int64(len(b))%pr.capacity > 0 {
chunks += 1
}
ttl := 0
for i := 0; i < chunks-1; i++ {
n, err := pr.w.Write(b[ttl:pr.capacity])
if err != nil {
return ttl, err
}
ttl += n
}
n, err := pr.w.Write(b[ttl:])
return ttl + n, err
}
func (pr *PipeReader) Close() error {
return pr.w.Close()
}
func (pr *PipeReader) SetCap(n int64) {
pr.capacity = n
}
/*
type PipeReader struct {
last []byte
semaphore chan struct{}
hasWrite chan struct{}
closed bool
buffer []byte
lastWrite int
capacity int
}
func New() *PipeReader {
return &PipeReader{
semaphore: make(chan struct{}, 1),
hasWrite: make(chan struct{}),
buffer: make([]byte, 64000),
capacity: 64000,
last: nil,
}
}
func (pr *PipeReader) SetCap(n int) {
pr.capacity = n
pr.buffer = make([]byte, n)
}
func (pr *PipeReader) String() string {
return fmt.Sprintf("buf=%v, cap=%v, last=%v", len(pr.buffer), pr.capacity, len(pr.last))
}
func (pr *PipeReader) lock() {
pr.semaphore <- struct{}{}
}
func (pr *PipeReader) unlock() {
for _ = range pr.semaphore {
return
}
}
func (pr *PipeReader) blockUntilWrite() {
for _ = range pr.hasWrite {
return
}
}
func (pr *PipeReader) unblockOnWrite() {
pr.hasWrite <- struct{}{}
}
func (pr *PipeReader) readCurrentBuffer() []byte {
pr.blockUntilWrite()
pr.lock()
defer pr.unlock()
ret := pr.buffer[:pr.lastWrite]
pr.lastWrite = 0
return ret
}
func (pr *PipeReader) writeBuffer(b []byte) {
pr.lock()
defer pr.unblockOnWrite()
defer pr.unlock()
copy(pr.buffer[:len(b)], b)
pr.lastWrite = len(b)
}
func (pr *PipeReader) Read(b []byte) (int, error) {
logger.Log("READ", len(b), pr.isClosed())
defer logger.Log("/READ")
ttl := 0
m, n, hasMore := xfer(b, pr.last)
pr.last = m
ttl = n
if hasMore {
return ttl, nil
}
for {
m, n, hasMore := xfer(b[ttl:], pr.readCurrentBuffer())
logger.Log("XFER", m, n, hasMore, ttl, pr.isClosed())
pr.last = m
ttl += n
if hasMore {
return ttl, nil
}
if n == 0 && pr.isClosed() {
return ttl, io.EOF
}
}
return ttl, io.EOF
}
func (pr *PipeReader) Write(b []byte) (int, error) {
logger.Log("WRIT", len(b), pr.isClosed())
defer logger.Log("/WRIT")
chunklen := pr.capacity
wrote := 0
for wrote+chunklen < len(b) {
pr.writeBuffer(b[wrote : wrote+chunklen])
wrote += chunklen
}
pr.writeBuffer(b[wrote:])
return wrote + len(b[wrote:]), nil
}
func (pr *PipeReader) isClosed() bool {
pr.lock()
defer pr.unlock()
return pr.closed
}
func (pr *PipeReader) Close() error {
pr.lock()
pr.closed = true
close(pr.hasWrite)
pr.unlock()
return nil
}
func copyByteSlice(b []byte) []byte {
return append([]byte{}, b...)
}
func xfer(dst, src []byte) ([]byte, int, bool) {
min := len(dst)
if len(src) < min {
min = len(src)
}
copy(dst[:min], src[:min])
src = src[min:]
if len(src) > 0 {
return src, min, true
}
return src, min, false
}
*/