more feature add

master v0.0.4
兔子 3 years ago
parent 95e2899e8a
commit 14203b8fa3
Signed by: b612
GPG Key ID: 481225A74DEB62A1

@ -4,7 +4,10 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"os"
"runtime"
"sync" "sync"
"sync/atomic"
"time" "time"
) )
@ -13,79 +16,88 @@ type StarBuffer struct {
io.Writer io.Writer
io.Closer io.Closer
datas []byte datas []byte
pStart int pStart uint64
pEnd int pEnd uint64
cap int cap uint64
isClose bool isClose atomic.Value
isEnd bool
rmu sync.Mutex rmu sync.Mutex
wmu sync.Mutex wmu sync.Mutex
} }
func NewStarBuffer(cap int) *StarBuffer { func NewStarBuffer(cap uint64) *StarBuffer {
rtnBuffer := new(StarBuffer) rtnBuffer := new(StarBuffer)
rtnBuffer.cap = cap rtnBuffer.cap = cap
rtnBuffer.datas = make([]byte, cap) rtnBuffer.datas = make([]byte, cap)
rtnBuffer.isClose.Store(false)
return rtnBuffer return rtnBuffer
} }
func (star *StarBuffer) Free() int { func (star *StarBuffer) Free() uint64 {
return star.cap - star.Len() return star.cap - star.Len()
} }
func (star *StarBuffer) Cap() int { func (star *StarBuffer) Cap() uint64 {
return star.cap return star.cap
} }
func (star *StarBuffer) Len() int { func (star *StarBuffer) Len() uint64 {
length := star.pEnd - star.pStart if star.pEnd >= star.pStart {
if length < 0 { return star.pEnd - star.pStart
return star.cap + length - 1
} }
return length return star.pEnd - star.pStart + star.cap
} }
func (star *StarBuffer) getByte() (byte, error) { func (star *StarBuffer) getByte() (byte, error) {
if star.isClose || (star.isEnd && star.Len() == 0) { if star.isClose.Load().(bool) {
return 0, io.EOF return 0, io.EOF
} }
if star.Len() == 0 { if star.Len() == 0 {
return 0, errors.New("no byte available now") return 0, os.ErrNotExist
} }
data := star.datas[star.pStart] nowPtr := star.pStart
star.pStart++ nextPtr := star.pStart + 1
if star.pStart == star.cap { if nextPtr >= star.cap {
star.pStart = 0 nextPtr = 0
}
data := star.datas[nowPtr]
ok := atomic.CompareAndSwapUint64(&star.pStart, nowPtr, nextPtr)
if !ok {
return 0, os.ErrInvalid
} }
return data, nil return data, nil
} }
func (star *StarBuffer) putByte(data byte) error { func (star *StarBuffer) putByte(data byte) error {
if star.isClose || star.isEnd { if star.isClose.Load().(bool) {
return io.EOF return io.EOF
} }
kariEnd := star.pEnd + 1 nowPtr := star.pEnd
kariEnd := nowPtr + 1
if kariEnd == star.cap { if kariEnd == star.cap {
kariEnd = 0 kariEnd = 0
} }
if kariEnd == star.pStart { if kariEnd == atomic.LoadUint64(&star.pStart) {
for { for {
time.Sleep(time.Microsecond) time.Sleep(time.Microsecond)
if kariEnd != star.pStart { runtime.Gosched()
if kariEnd != atomic.LoadUint64(&star.pStart) {
break break
} }
} }
} }
star.datas[star.pEnd] = data star.datas[nowPtr] = data
star.pEnd = kariEnd if ok := atomic.CompareAndSwapUint64(&star.pEnd, nowPtr, kariEnd); !ok {
return os.ErrInvalid
}
return nil return nil
} }
func (star *StarBuffer) Close() error { func (star *StarBuffer) Close() error {
star.isClose = true star.isClose.Store(true)
return nil return nil
} }
func (star *StarBuffer) Read(buf []byte) (int, error) { func (star *StarBuffer) Read(buf []byte) (int, error) {
if star.isClose { if star.isClose.Load().(bool) {
return 0, io.EOF return 0, io.EOF
} }
if buf == nil { if buf == nil {
@ -100,6 +112,10 @@ func (star *StarBuffer) Read(buf []byte) (int, error) {
if err == io.EOF { if err == io.EOF {
return sum, err return sum, err
} }
if err == os.ErrNotExist {
i--
continue
}
return sum, nil return sum, nil
} }
buf[i] = data buf[i] = data
@ -109,8 +125,7 @@ func (star *StarBuffer) Read(buf []byte) (int, error) {
} }
func (star *StarBuffer) Write(bts []byte) (int, error) { func (star *StarBuffer) Write(bts []byte) (int, error) {
if bts == nil || star.isClose { if bts == nil || star.isClose.Load().(bool) {
star.isEnd = true
return 0, io.EOF return 0, io.EOF
} }
star.wmu.Lock() star.wmu.Lock()

@ -0,0 +1,61 @@
package stario
import (
"fmt"
"sync/atomic"
"testing"
"time"
)
func Test_Circle(t *testing.T) {
buf := NewStarBuffer(2048)
go func() {
for {
//fmt.Println("write start")
buf.Write([]byte("中华人民共和国\n"))
//fmt.Println("write success")
time.Sleep(time.Millisecond * 50)
}
}()
cpp := ""
go func() {
time.Sleep(time.Second * 3)
for {
cache := make([]byte, 64)
ints, err := buf.Read(cache)
if err != nil {
fmt.Println("read error", err)
return
}
if ints != 0 {
cpp += string(cache[:ints])
}
}
}()
time.Sleep(time.Second * 13)
fmt.Println(cpp)
}
func Test_Circle_Speed(t *testing.T) {
buf := NewStarBuffer(1048976)
count := uint64(0)
for i := 1; i <= 10; i++ {
go func() {
for {
buf.putByte('a')
}
}()
}
for i := 1; i <= 10; i++ {
go func() {
for {
_,err:=buf.getByte()
if err == nil {
atomic.AddUint64(&count, 1)
}
}
}()
}
time.Sleep(time.Second * 10)
fmt.Println(count)
}

@ -7,7 +7,7 @@ import (
var ERR_TIMEOUT = errors.New("TIME OUT") var ERR_TIMEOUT = errors.New("TIME OUT")
func StopUntilTimeout(tm time.Duration, fn func(chan struct{}) error) error { func WaitUntilTimeout(tm time.Duration, fn func(chan struct{}) error) error {
var err error var err error
finished := make(chan struct{}) finished := make(chan struct{})
imout := make(chan struct{}) imout := make(chan struct{})
@ -24,7 +24,7 @@ func StopUntilTimeout(tm time.Duration, fn func(chan struct{}) error) error {
} }
} }
func StopUntilFinished(fn func() error) <-chan error { func WaitUntilFinished(fn func() error) <-chan error {
finished := make(chan error) finished := make(chan error)
go func() { go func() {
err := fn() err := fn()
@ -33,7 +33,7 @@ func StopUntilFinished(fn func() error) <-chan error {
return finished return finished
} }
func StopUntilTimeoutFinished(tm time.Duration, fn func(chan struct{}) error) <-chan error { func WaitUntilTimeoutFinished(tm time.Duration, fn func(chan struct{}) error) <-chan error {
var err error var err error
finished := make(chan struct{}) finished := make(chan struct{})
result := make(chan error) result := make(chan error)

@ -0,0 +1,55 @@
package stario
import (
"sync"
"sync/atomic"
"time"
)
type WaitGroup struct {
wg *sync.WaitGroup
maxCount uint32
allCount uint32
}
func NewWaitGroup(maxCount int) WaitGroup {
return WaitGroup{wg: &sync.WaitGroup{}, maxCount: uint32(maxCount)}
}
func (swg *WaitGroup) Add(delta int) {
var Udelta uint32
if delta < 0 {
Udelta = uint32(-delta - 1)
} else {
Udelta = uint32(delta)
}
for {
allC := atomic.LoadUint32(&swg.allCount)
if atomic.LoadUint32(&swg.maxCount) == 0 || atomic.LoadUint32(&swg.maxCount) >= allC+uint32(delta) {
if delta < 0 {
atomic.AddUint32(&swg.allCount, ^uint32(Udelta))
} else {
atomic.AddUint32(&swg.allCount, uint32(Udelta))
}
break
}
time.Sleep(time.Microsecond)
}
swg.wg.Add(delta)
}
func (swg *WaitGroup) Done() {
swg.Add(-1)
}
func (swg *WaitGroup) Wait() {
swg.wg.Wait()
}
func (swg *WaitGroup) GetMaxWaitNum() int {
return int(atomic.LoadUint32(&swg.maxCount))
}
func (swg *WaitGroup) SetMaxWaitNum(num int) {
atomic.AddUint32(&swg.maxCount, uint32(num))
}
Loading…
Cancel
Save