This commit is contained in:
Bel LaPointe
2019-05-02 10:08:11 -06:00
commit 1b60da38c5
10 changed files with 1898 additions and 0 deletions

195
pipe/pipe.go Normal file
View File

@@ -0,0 +1,195 @@
package pipe
import "io"
type PipeReader struct {
last []byte
semaphore chan struct{}
hasWrite chan struct{}
closed bool
buffer []byte
lastWrite int
capacity int64
r io.ReadCloser
w io.WriteCloser
}
func New() *PipeReader {
r, w := io.Pipe()
return &PipeReader{
semaphore: make(chan struct{}, 1),
hasWrite: make(chan struct{}),
buffer: make([]byte, 64000),
capacity: 64000,
last: nil,
r: r,
w: w,
}
}
func (pr *PipeReader) Read(b []byte) (int, error) {
return pr.r.Read(b)
}
func (pr *PipeReader) Write(b []byte) (int, error) {
chunks := int(int64(len(b)) / pr.capacity)
if int64(len(b))%pr.capacity > 0 {
chunks += 1
}
ttl := 0
for i := 0; i < chunks-1; i++ {
n, err := pr.w.Write(b[ttl:pr.capacity])
if err != nil {
return ttl, err
}
ttl += n
}
n, err := pr.w.Write(b[ttl:])
return ttl + n, err
}
func (pr *PipeReader) Close() error {
return pr.w.Close()
}
func (pr *PipeReader) SetCap(n int64) {
pr.capacity = n
}
/*
type PipeReader struct {
last []byte
semaphore chan struct{}
hasWrite chan struct{}
closed bool
buffer []byte
lastWrite int
capacity int
}
func New() *PipeReader {
return &PipeReader{
semaphore: make(chan struct{}, 1),
hasWrite: make(chan struct{}),
buffer: make([]byte, 64000),
capacity: 64000,
last: nil,
}
}
func (pr *PipeReader) SetCap(n int) {
pr.capacity = n
pr.buffer = make([]byte, n)
}
func (pr *PipeReader) String() string {
return fmt.Sprintf("buf=%v, cap=%v, last=%v", len(pr.buffer), pr.capacity, len(pr.last))
}
func (pr *PipeReader) lock() {
pr.semaphore <- struct{}{}
}
func (pr *PipeReader) unlock() {
for _ = range pr.semaphore {
return
}
}
func (pr *PipeReader) blockUntilWrite() {
for _ = range pr.hasWrite {
return
}
}
func (pr *PipeReader) unblockOnWrite() {
pr.hasWrite <- struct{}{}
}
func (pr *PipeReader) readCurrentBuffer() []byte {
pr.blockUntilWrite()
pr.lock()
defer pr.unlock()
ret := pr.buffer[:pr.lastWrite]
pr.lastWrite = 0
return ret
}
func (pr *PipeReader) writeBuffer(b []byte) {
pr.lock()
defer pr.unblockOnWrite()
defer pr.unlock()
copy(pr.buffer[:len(b)], b)
pr.lastWrite = len(b)
}
func (pr *PipeReader) Read(b []byte) (int, error) {
logger.Log("READ", len(b), pr.isClosed())
defer logger.Log("/READ")
ttl := 0
m, n, hasMore := xfer(b, pr.last)
pr.last = m
ttl = n
if hasMore {
return ttl, nil
}
for {
m, n, hasMore := xfer(b[ttl:], pr.readCurrentBuffer())
logger.Log("XFER", m, n, hasMore, ttl, pr.isClosed())
pr.last = m
ttl += n
if hasMore {
return ttl, nil
}
if n == 0 && pr.isClosed() {
return ttl, io.EOF
}
}
return ttl, io.EOF
}
func (pr *PipeReader) Write(b []byte) (int, error) {
logger.Log("WRIT", len(b), pr.isClosed())
defer logger.Log("/WRIT")
chunklen := pr.capacity
wrote := 0
for wrote+chunklen < len(b) {
pr.writeBuffer(b[wrote : wrote+chunklen])
wrote += chunklen
}
pr.writeBuffer(b[wrote:])
return wrote + len(b[wrote:]), nil
}
func (pr *PipeReader) isClosed() bool {
pr.lock()
defer pr.unlock()
return pr.closed
}
func (pr *PipeReader) Close() error {
pr.lock()
pr.closed = true
close(pr.hasWrite)
pr.unlock()
return nil
}
func copyByteSlice(b []byte) []byte {
return append([]byte{}, b...)
}
func xfer(dst, src []byte) ([]byte, int, bool) {
min := len(dst)
if len(src) < min {
min = len(src)
}
copy(dst[:min], src[:min])
src = src[min:]
if len(src) > 0 {
return src, min, true
}
return src, min, false
}
*/

95
pipe/pipe_test.go Normal file
View File

@@ -0,0 +1,95 @@
package pipe
import (
"crypto/rand"
"io/ioutil"
"strings"
"testing"
)
func Test_PipeReader_Bench(t *testing.T) {
input := []byte(strings.Repeat("hello world", 1000))
pr := New()
pr.SetCap(1000)
writing := func() {
defer pr.Close()
if n, err := pr.Write(input); err != nil {
t.Errorf("failed to write: %v", err)
} else if n != len(input) {
t.Errorf("failed to write: wrote %v, expected %v", n, len(input))
}
}
reading := func() {
if bytes, err := ioutil.ReadAll(pr); err != nil {
t.Errorf("failed to read: %v", err)
} else if len(bytes) != len(input) {
t.Errorf("failed to read: read %v, expected %v", len(bytes), len(input))
}
}
go writing()
reading()
}
func Test_PipeReader(t *testing.T) {
cases := []struct {
streamLength int
bufferCap int
}{
{
streamLength: 10000000,
bufferCap: 50,
},
{
streamLength: 1000000,
bufferCap: 5000,
},
{
streamLength: 100,
bufferCap: 50,
},
{
streamLength: 50,
bufferCap: 100,
},
{
streamLength: 10,
bufferCap: 5,
},
{
streamLength: 5,
bufferCap: 10,
},
{
streamLength: 1,
bufferCap: 10,
},
{
streamLength: 0,
bufferCap: 10,
},
}
for _, c := range cases {
pr := New()
pr.SetCap(int64(c.bufferCap))
input := make([]byte, c.streamLength)
if _, err := rand.Read(input); err != nil {
t.Fatalf("cannot read random bytes: %v", err)
}
go func() {
defer pr.Close()
n, err := pr.Write(input)
if n != c.streamLength || err != nil {
t.Errorf("ERR: failed to write %v bytes to %v cap: %v", c.streamLength, c.bufferCap, err)
}
t.Logf("wrote %v bytes to pipereader", n)
}()
output, err := ioutil.ReadAll(pr)
if err != nil {
t.Errorf("ERR: failed to read %v bytes to %v cap: %v", c.streamLength, c.bufferCap, err)
} else if string(output) != string(input) {
t.Errorf("ERR: sl=%v, bc=%v, read wrong output from pipe: \nwanted %v, \n got %v", c.streamLength, c.bufferCap, len(input), len(output))
}
t.Logf("read %v bytes from pipereader", len(output))
}
}