diff --git a/go.mod b/go.mod index 5b8f277..bc4a1f0 100644 --- a/go.mod +++ b/go.mod @@ -2,4 +2,4 @@ module b612.me/starmap go 1.16 -require b612.me/notify v1.2.3 +require b612.me/notify v1.2.4 diff --git a/go.sum b/go.sum index e9cd58a..ba00fb0 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,11 @@ -b612.me/notify v1.2.3 h1:4w5zDakubC8jIPg4JT8DANczSMxRhwEFjM/2A0dvuvY= -b612.me/notify v1.2.3/go.mod h1:VV29yq3KXTKJKSSjr5yS9bD2c2MGzuypYalfL9yGQ6M= +b612.me/notify v1.2.4 h1:cjP80V9FeM+ib1DztZdykusakcbjNI4dAB1pXE8U6bo= +b612.me/notify v1.2.4/go.mod h1:SlCrG1kPRVhYUrIkwY/j0zAwCU4VeTHubcZoQXW8Anw= b612.me/starcrypto v0.0.2 h1:aRf1HcqK8GqHYxLAhWfFC4W/EqQLEFNEmxsBu3wG30o= b612.me/starcrypto v0.0.2/go.mod h1:hz0xRnfWNpYOlVrIPoGrQOWPibq4YiUZ7qN5tsQbzPo= -b612.me/stario v0.0.7 h1:QbQcsHCVLE6vRgVrPN4+9DGiSaC6IWdtm4ClL2tpMUg= -b612.me/stario v0.0.7/go.mod h1:or4ssWcxQSjMeu+hRKEgtp0X517b3zdlEOAms8Qscvw= -b612.me/starnet v0.1.6 h1:/QaaKpuXfvJm6ayvk85jaLaKBmO1zx+XSxfnlSB3xGw= -b612.me/starnet v0.1.6/go.mod h1:JjFLTMPsWsPei7AiXwBTt4QCoB9gux1XS7SHv/Ux+D4= +b612.me/stario v0.0.8 h1:kaA4pszAKLZJm2D9JmiuYSpgjTeE3VaO74vm+H0vBGM= +b612.me/stario v0.0.8/go.mod h1:or4ssWcxQSjMeu+hRKEgtp0X517b3zdlEOAms8Qscvw= +b612.me/starnet v0.1.7 h1:k3CUfYNRolC/xw5Ekus2NVWHlqeykSyAH8USGTPKA5o= +b612.me/starnet v0.1.7/go.mod h1:DNC4i/ezgVLlmxnquf8AeljsL4mQ5vAyxh8vGPQqsys= golang.org/x/crypto v0.0.0-20220313003712-b769efc7c000 h1:SL+8VVnkqyshUSz5iNnXtrBQzvFF2SkROm6t5RczFAE= golang.org/x/crypto v0.0.0-20220313003712-b769efc7c000/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= diff --git a/remote_define.go b/remote_define.go index 2d84ff4..0a0798e 100644 --- a/remote_define.go +++ b/remote_define.go @@ -9,8 +9,8 @@ import ( ) func init() { - notify.Register(kvMsg{}) - notify.Register(starMapErr{}) + notify.RegisterName("b612.me/starmap/kvmsg", kvMsg{}) + notify.RegisterName("b612.me/starmap/error", starMapErr{}) } type starMapErr struct { diff --git a/stack.go b/stack.go index 56dcc40..c8caa07 100644 --- a/stack.go +++ b/stack.go @@ -197,3 +197,86 @@ func (star *StarStack) Write(bts []byte) (int, error) { } return sum, nil } + +type StarChanStack struct { + data chan interface{} + cap uint64 + current uint64 + isClose atomic.Value +} + +func NewStarChanStack(cap uint64) *StarChanStack { + rtnBuffer := new(StarChanStack) + rtnBuffer.cap = cap + rtnBuffer.isClose.Store(false) + rtnBuffer.data = make(chan interface{}, cap) + return rtnBuffer +} + +func (star *StarChanStack) init() { + star.cap = 1024 + star.data = make(chan interface{}, star.cap) + star.isClose.Store(false) +} + +func (star *StarChanStack) Free() uint64 { + return star.cap - star.current +} + +func (star *StarChanStack) Cap() uint64 { + return star.cap +} + +func (star *StarChanStack) Len() uint64 { + return star.current +} + +func (star *StarChanStack) Pop() (interface{}, error) { + if star.isClose.Load() == nil { + star.init() + } + if star.isClose.Load().(bool) { + return 0, io.EOF + } + data, ok := <-star.data + if !ok { + star.isClose.Store(true) + return 0, errors.New("channel read error") + } + for { + current := atomic.LoadUint64(&star.current) + if atomic.CompareAndSwapUint64(&star.current, current, current-1) { + break + } + } + return data, nil +} + +func (star *StarChanStack) Push(data interface{}) error { + defer func() { + recover() + }() + if star.isClose.Load() == nil { + star.init() + } + if star.isClose.Load().(bool) { + return io.EOF + } + star.data <- data + for { + current := atomic.LoadUint64(&star.current) + if atomic.CompareAndSwapUint64(&star.current, current, current+1) { + break + } + } + return nil +} + +func (star *StarChanStack) Close() error { + if star.isClose.Load() == nil { + star.init() + } + star.isClose.Store(true) + close(star.data) + return nil +} diff --git a/stack_test.go b/stack_test.go index 192825b..9a46af1 100644 --- a/stack_test.go +++ b/stack_test.go @@ -31,3 +31,36 @@ func Test_Circle_Speed(t *testing.T) { time.Sleep(time.Second * 10) fmt.Println(count) } + +func Test_Chan_Circle_Speed(t *testing.T) { + buf := StarChanStack{} + count := uint64(0) + for i := 1; i <= 10; i++ { + go func() { + for { + err := buf.Push('a') + if err != nil { + fmt.Println("finished write") + break + } + } + }() + } + for i := 1; i <= 10; i++ { + go func() { + for { + _, err := buf.Pop() + if err == nil { + atomic.AddUint64(&count, 1) + } else { + fmt.Println("finished read") + break + } + } + }() + } + time.Sleep(time.Second * 10) + fmt.Println(count) + buf.Close() + time.Sleep(time.Second * 3) +} diff --git a/starmap_test.go b/starmap_test.go index f83aeb1..fbd033b 100644 --- a/starmap_test.go +++ b/starmap_test.go @@ -8,28 +8,28 @@ import ( ) type Miaomiao struct { - Sakura string - Fuck int - Mimi bool + Val1 string + Val2 int + Val3 bool } func Test_Remote(t *testing.T) { - Store("nmb", 22222) + Store("test", 22222) server, _ := NewServer("tcp", "127.0.0.1:45678") server.Register(&Miaomiao{}) client, _ := NewClient("tcp", "127.0.0.1:45678", time.Second*2) _ = server - fmt.Println(client.Get("maio")) - fmt.Println(client.Exists("maio")) - fmt.Println(client.Store("maio", Miaomiao{"sss", 222, true})) - fmt.Println(client.Get("maio")) - fmt.Println(client.Exists("maio")) - fmt.Println(client.Delete("maio")) - fmt.Println(client.Exists("maio")) + fmt.Println(client.Get("meow")) + fmt.Println(client.Exists("meow")) + fmt.Println(client.Store("meow", Miaomiao{"sss", 222, true})) + fmt.Println(client.Get("meow")) + fmt.Println(client.Exists("meow")) + fmt.Println(client.Delete("meow")) + fmt.Println(client.Exists("meow")) } func (cat *Miaomiao) GetName() string { - return "miaomiao" + return "meow" } func Test_Math(t *testing.T) {