From 14203b8fa36afd2ba704e3e283adce022d5758c7 Mon Sep 17 00:00:00 2001 From: starainrt Date: Fri, 12 Nov 2021 16:01:35 +0800 Subject: [PATCH] more feature add --- circle.go | 73 ++++++++++++++++++++++++++++++-------------------- circle_test.go | 61 +++++++++++++++++++++++++++++++++++++++++ fn.go | 6 ++--- sync.go | 55 +++++++++++++++++++++++++++++++++++++ 4 files changed, 163 insertions(+), 32 deletions(-) create mode 100644 circle_test.go create mode 100644 sync.go diff --git a/circle.go b/circle.go index 22caa0c..676fa7a 100644 --- a/circle.go +++ b/circle.go @@ -4,7 +4,10 @@ import ( "errors" "fmt" "io" + "os" + "runtime" "sync" + "sync/atomic" "time" ) @@ -13,79 +16,88 @@ type StarBuffer struct { io.Writer io.Closer datas []byte - pStart int - pEnd int - cap int - isClose bool - isEnd bool + pStart uint64 + pEnd uint64 + cap uint64 + isClose atomic.Value rmu sync.Mutex wmu sync.Mutex } -func NewStarBuffer(cap int) *StarBuffer { +func NewStarBuffer(cap uint64) *StarBuffer { rtnBuffer := new(StarBuffer) rtnBuffer.cap = cap rtnBuffer.datas = make([]byte, cap) + rtnBuffer.isClose.Store(false) return rtnBuffer } -func (star *StarBuffer) Free() int { +func (star *StarBuffer) Free() uint64 { return star.cap - star.Len() } -func (star *StarBuffer) Cap() int { +func (star *StarBuffer) Cap() uint64 { return star.cap } -func (star *StarBuffer) Len() int { - length := star.pEnd - star.pStart - if length < 0 { - return star.cap + length - 1 +func (star *StarBuffer) Len() uint64 { + if star.pEnd >= star.pStart { + return star.pEnd - star.pStart } - return length + return star.pEnd - star.pStart + star.cap } func (star *StarBuffer) getByte() (byte, error) { - if star.isClose || (star.isEnd && star.Len() == 0) { + if star.isClose.Load().(bool) { return 0, io.EOF } if star.Len() == 0 { - return 0, errors.New("no byte available now") + return 0, os.ErrNotExist } - data := star.datas[star.pStart] - star.pStart++ - if star.pStart == star.cap { - star.pStart = 0 + nowPtr := star.pStart + nextPtr := star.pStart + 1 + if nextPtr >= star.cap { + nextPtr = 0 + } + data := star.datas[nowPtr] + ok := atomic.CompareAndSwapUint64(&star.pStart, nowPtr, nextPtr) + if !ok { + return 0, os.ErrInvalid } return data, nil } func (star *StarBuffer) putByte(data byte) error { - if star.isClose || star.isEnd { + if star.isClose.Load().(bool) { return io.EOF } - kariEnd := star.pEnd + 1 + nowPtr := star.pEnd + kariEnd := nowPtr + 1 if kariEnd == star.cap { kariEnd = 0 } - if kariEnd == star.pStart { + if kariEnd == atomic.LoadUint64(&star.pStart) { for { time.Sleep(time.Microsecond) - if kariEnd != star.pStart { + runtime.Gosched() + if kariEnd != atomic.LoadUint64(&star.pStart) { break } } } - star.datas[star.pEnd] = data - star.pEnd = kariEnd + star.datas[nowPtr] = data + if ok := atomic.CompareAndSwapUint64(&star.pEnd, nowPtr, kariEnd); !ok { + return os.ErrInvalid + } return nil } + func (star *StarBuffer) Close() error { - star.isClose = true + star.isClose.Store(true) return nil } func (star *StarBuffer) Read(buf []byte) (int, error) { - if star.isClose { + if star.isClose.Load().(bool) { return 0, io.EOF } if buf == nil { @@ -100,6 +112,10 @@ func (star *StarBuffer) Read(buf []byte) (int, error) { if err == io.EOF { return sum, err } + if err == os.ErrNotExist { + i-- + continue + } return sum, nil } buf[i] = data @@ -109,8 +125,7 @@ func (star *StarBuffer) Read(buf []byte) (int, error) { } func (star *StarBuffer) Write(bts []byte) (int, error) { - if bts == nil || star.isClose { - star.isEnd = true + if bts == nil || star.isClose.Load().(bool) { return 0, io.EOF } star.wmu.Lock() diff --git a/circle_test.go b/circle_test.go new file mode 100644 index 0000000..0a13c62 --- /dev/null +++ b/circle_test.go @@ -0,0 +1,61 @@ +package stario + +import ( + "fmt" + "sync/atomic" + "testing" + "time" +) + +func Test_Circle(t *testing.T) { + buf := NewStarBuffer(2048) + go func() { + for { + //fmt.Println("write start") + buf.Write([]byte("中华人民共和国\n")) + //fmt.Println("write success") + time.Sleep(time.Millisecond * 50) + } + }() + cpp := "" + go func() { + time.Sleep(time.Second * 3) + for { + cache := make([]byte, 64) + ints, err := buf.Read(cache) + if err != nil { + fmt.Println("read error", err) + return + } + if ints != 0 { + cpp += string(cache[:ints]) + } + } + }() + time.Sleep(time.Second * 13) + fmt.Println(cpp) +} + +func Test_Circle_Speed(t *testing.T) { + buf := NewStarBuffer(1048976) + count := uint64(0) + for i := 1; i <= 10; i++ { + go func() { + for { + buf.putByte('a') + } + }() + } + for i := 1; i <= 10; i++ { + go func() { + for { + _,err:=buf.getByte() + if err == nil { + atomic.AddUint64(&count, 1) + } + } + }() + } + time.Sleep(time.Second * 10) + fmt.Println(count) +} diff --git a/fn.go b/fn.go index 9d7e309..b82370b 100644 --- a/fn.go +++ b/fn.go @@ -7,7 +7,7 @@ import ( var ERR_TIMEOUT = errors.New("TIME OUT") -func StopUntilTimeout(tm time.Duration, fn func(chan struct{}) error) error { +func WaitUntilTimeout(tm time.Duration, fn func(chan struct{}) error) error { var err error finished := make(chan struct{}) imout := make(chan struct{}) @@ -24,7 +24,7 @@ func StopUntilTimeout(tm time.Duration, fn func(chan struct{}) error) error { } } -func StopUntilFinished(fn func() error) <-chan error { +func WaitUntilFinished(fn func() error) <-chan error { finished := make(chan error) go func() { err := fn() @@ -33,7 +33,7 @@ func StopUntilFinished(fn func() error) <-chan error { return finished } -func StopUntilTimeoutFinished(tm time.Duration, fn func(chan struct{}) error) <-chan error { +func WaitUntilTimeoutFinished(tm time.Duration, fn func(chan struct{}) error) <-chan error { var err error finished := make(chan struct{}) result := make(chan error) diff --git a/sync.go b/sync.go new file mode 100644 index 0000000..bbca37a --- /dev/null +++ b/sync.go @@ -0,0 +1,55 @@ +package stario + +import ( + "sync" + "sync/atomic" + "time" +) + +type WaitGroup struct { + wg *sync.WaitGroup + maxCount uint32 + allCount uint32 +} + +func NewWaitGroup(maxCount int) WaitGroup { + return WaitGroup{wg: &sync.WaitGroup{}, maxCount: uint32(maxCount)} +} + +func (swg *WaitGroup) Add(delta int) { + var Udelta uint32 + if delta < 0 { + Udelta = uint32(-delta - 1) + } else { + Udelta = uint32(delta) + } + for { + allC := atomic.LoadUint32(&swg.allCount) + if atomic.LoadUint32(&swg.maxCount) == 0 || atomic.LoadUint32(&swg.maxCount) >= allC+uint32(delta) { + if delta < 0 { + atomic.AddUint32(&swg.allCount, ^uint32(Udelta)) + } else { + atomic.AddUint32(&swg.allCount, uint32(Udelta)) + } + break + } + time.Sleep(time.Microsecond) + } + swg.wg.Add(delta) +} + +func (swg *WaitGroup) Done() { + swg.Add(-1) +} + +func (swg *WaitGroup) Wait() { + swg.wg.Wait() +} + +func (swg *WaitGroup) GetMaxWaitNum() int { + return int(atomic.LoadUint32(&swg.maxCount)) +} + +func (swg *WaitGroup) SetMaxWaitNum(num int) { + atomic.AddUint32(&swg.maxCount, uint32(num)) +}