diff --git a/circle.go b/circle.go index 676fa7a..b27429c 100644 --- a/circle.go +++ b/circle.go @@ -20,6 +20,7 @@ type StarBuffer struct { pEnd uint64 cap uint64 isClose atomic.Value + isEnd atomic.Value rmu sync.Mutex wmu sync.Mutex } @@ -29,6 +30,7 @@ func NewStarBuffer(cap uint64) *StarBuffer { rtnBuffer.cap = cap rtnBuffer.datas = make([]byte, cap) rtnBuffer.isClose.Store(false) + rtnBuffer.isEnd.Store(false) return rtnBuffer } @@ -48,7 +50,7 @@ func (star *StarBuffer) Len() uint64 { } func (star *StarBuffer) getByte() (byte, error) { - if star.isClose.Load().(bool) { + if star.isClose.Load().(bool) || (star.Len() == 0 && star.isEnd.Load().(bool)) { return 0, io.EOF } if star.Len() == 0 { @@ -97,7 +99,7 @@ func (star *StarBuffer) Close() error { return nil } func (star *StarBuffer) Read(buf []byte) (int, error) { - if star.isClose.Load().(bool) { + if star.isClose.Load().(bool) || (star.Len() == 0 && star.isEnd.Load().(bool)) { return 0, io.EOF } if buf == nil { @@ -125,6 +127,10 @@ func (star *StarBuffer) Read(buf []byte) (int, error) { } func (star *StarBuffer) Write(bts []byte) (int, error) { + if bts == nil && !star.isEnd.Load().(bool) { + star.isEnd.Store(true) + return 0, nil + } if bts == nil || star.isClose.Load().(bool) { return 0, io.EOF } diff --git a/circle_test.go b/circle_test.go index 0a13c62..3b3bef9 100644 --- a/circle_test.go +++ b/circle_test.go @@ -49,7 +49,7 @@ func Test_Circle_Speed(t *testing.T) { for i := 1; i <= 10; i++ { go func() { for { - _,err:=buf.getByte() + _, err := buf.getByte() if err == nil { atomic.AddUint64(&count, 1) } @@ -59,3 +59,28 @@ func Test_Circle_Speed(t *testing.T) { time.Sleep(time.Second * 10) fmt.Println(count) } + +func Test_Circle_Speed2(t *testing.T) { + buf := NewStarBuffer(8192) + count := uint64(0) + for i := 1; i <= 10; i++ { + go func() { + for { + buf.Write([]byte("hello world b612 hello world b612 b612 b612 b612 b612 b612")) + } + }() + } + for i := 1; i <= 10; i++ { + go func() { + for { + mybuf := make([]byte, 1024) + j, err := buf.Read(mybuf) + if err == nil { + atomic.AddUint64(&count, uint64(j)) + } + } + }() + } + time.Sleep(time.Second * 10) + fmt.Println(float64(count) / 10 / 1024 / 1024) +}