You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
150 lines
2.5 KiB
Go
150 lines
2.5 KiB
Go
2 months ago
|
package starlog
|
||
|
|
||
|
import (
|
||
|
"errors"
|
||
|
"io"
|
||
|
"os"
|
||
|
"sync"
|
||
|
"sync/atomic"
|
||
|
)
|
||
|
|
||
|
// starMapKV is a map from arbitrary keys to arbitrary values whose accessors
// are guarded by a read/write mutex. Keys must be comparable, as for any Go
// map key. The struct contains a mutex, so it should not be copied once it is
// in use.
type starMapKV struct {
	kvMap map[interface{}]interface{}
	mu    sync.RWMutex
}

// newStarMap returns a starMapKV whose underlying map is already allocated.
func newStarMap() starMapKV {
	// Built as a composite literal so that no struct holding a mutex is
	// copied out of a local variable.
	return starMapKV{kvMap: make(map[interface{}]interface{})}
}

// Get returns the value stored under key. When the key is absent it returns
// a nil value together with os.ErrNotExist.
func (m *starMapKV) Get(key interface{}) (interface{}, error) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	data, ok := m.kvMap[key]
	if !ok {
		return data, os.ErrNotExist
	}
	return data, nil
}

// MustGet returns the value stored under key and discards the lookup error,
// so an absent key yields nil. It does not panic.
func (m *starMapKV) MustGet(key interface{}) interface{} {
	result, _ := m.Get(key) // error deliberately dropped: absent keys map to nil
	return result
}

// Store sets the value for key, replacing any previous value. It always
// returns nil.
func (m *starMapKV) Store(key interface{}, value interface{}) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	// Allocate on first use so that a zero-value starMapKV does not panic
	// with "assignment to entry in nil map".
	if m.kvMap == nil {
		m.kvMap = make(map[interface{}]interface{})
	}
	m.kvMap[key] = value
	return nil
}

// Exists reports whether key is present in the map.
func (m *starMapKV) Exists(key interface{}) bool {
	m.mu.RLock()
	defer m.mu.RUnlock()
	_, ok := m.kvMap[key]
	return ok
}

// Delete removes key from the map; deleting an absent key is a no-op.
// It always returns nil.
func (m *starMapKV) Delete(key interface{}) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	delete(m.kvMap, key)
	return nil
}

// Range calls run for each key/value pair, in unspecified order, until run
// returns false. It always returns nil.
//
// The entries are copied while holding the read lock and run is invoked
// after the lock has been released. This keeps the iteration from racing
// with concurrent Store/Delete calls, and it lets run call back into the
// map (including Store and Delete) without deadlocking. As a consequence
// run observes the entries as they were when Range started: changes made
// during the iteration are not reflected in the pairs still to be visited.
func (m *starMapKV) Range(run func(k interface{}, v interface{}) bool) error {
	type entry struct {
		key, value interface{}
	}

	m.mu.RLock()
	snapshot := make([]entry, 0, len(m.kvMap))
	for k, v := range m.kvMap {
		snapshot = append(snapshot, entry{key: k, value: v})
	}
	m.mu.RUnlock()

	for _, e := range snapshot {
		if !run(e.key, e.value) {
			break
		}
	}
	return nil
}
|
||
|
|
||
|
// starChanStack is a bounded buffer backed by a buffered channel. Despite
// the name, values are popped in the order they were pushed, because the
// storage is a channel. A zero-value starChanStack initialises itself with
// a capacity of 1024 on the first Push, Pop or Close; that lazy
// initialisation is not synchronised, so prefer newStarChanStack when the
// first calls may come from several goroutines.
type starChanStack struct {
	// current is updated with sync/atomic. It is kept as the first field
	// so that it is 64-bit aligned on 32-bit platforms as well.
	current uint64
	cap     uint64
	data    chan interface{}
	// isClose holds a bool once initialised; a nil Load means the value
	// has never been initialised.
	isClose atomic.Value
}

// newStarChanStack returns an open buffer able to hold cap values.
func newStarChanStack(cap uint64) *starChanStack {
	rtnBuffer := new(starChanStack)
	rtnBuffer.cap = cap
	rtnBuffer.isClose.Store(false)
	rtnBuffer.data = make(chan interface{}, cap)
	return rtnBuffer
}

// init gives the buffer its default capacity of 1024, allocates the channel
// and marks the buffer as open. It is called lazily for zero-value buffers.
func (s *starChanStack) init() {
	s.cap = 1024
	s.data = make(chan interface{}, s.cap)
	s.isClose.Store(false)
}

// Free returns the capacity minus the current element count. The count is
// adjusted after the channel operation completes, so under concurrent use
// the result is only a momentary approximation.
func (s *starChanStack) Free() uint64 {
	return s.cap - atomic.LoadUint64(&s.current)
}

// Cap returns the capacity of the buffer.
func (s *starChanStack) Cap() uint64 {
	return s.cap
}

// Len returns the current element count. See Free for the caveat about
// concurrent use.
func (s *starChanStack) Len() uint64 {
	return atomic.LoadUint64(&s.current)
}

// Pop removes and returns the oldest value, blocking while the buffer is
// empty. It returns (0, io.EOF) when the buffer is already marked closed,
// and (0, "channel read error") when the channel turns out to be closed
// during the receive; in the latter case the buffer is also marked closed.
func (s *starChanStack) Pop() (interface{}, error) {
	if s.isClose.Load() == nil {
		s.init()
	}
	if s.isClose.Load().(bool) {
		return 0, io.EOF
	}
	data, ok := <-s.data
	if !ok {
		s.isClose.Store(true)
		return 0, errors.New("channel read error")
	}
	// Adding the two's complement of 1 decrements the unsigned counter.
	atomic.AddUint64(&s.current, ^uint64(0))
	return data, nil
}

// Push appends data, blocking while the buffer is full. It returns io.EOF
// when the buffer is closed, including the case where the channel is closed
// between the closed check and the send.
func (s *starChanStack) Push(data interface{}) (err error) {
	defer func() {
		// A send on a closed channel panics. Report it as io.EOF instead
		// of returning nil for a value that was never stored.
		if r := recover(); r != nil {
			err = io.EOF
		}
	}()
	if s.isClose.Load() == nil {
		s.init()
	}
	if s.isClose.Load().(bool) {
		return io.EOF
	}
	s.data <- data
	atomic.AddUint64(&s.current, 1)
	return nil
}

// Close marks the buffer as closed and closes the underlying channel.
// Calling Close on a buffer that is already marked closed is a no-op, so a
// repeated call does not panic. Concurrent Close calls are not synchronised
// with each other. It always returns nil.
func (s *starChanStack) Close() error {
	if s.isClose.Load() == nil {
		s.init()
	}
	// The flag is only ever set to true alongside the channel being
	// closed, so there is nothing left to do here.
	if s.isClose.Load().(bool) {
		return nil
	}
	s.isClose.Store(true)
	close(s.data)
	return nil
}
|