diff --git a/basicrw.go b/basicrw.go index d347291..15a9eb4 100644 --- a/basicrw.go +++ b/basicrw.go @@ -1,21 +1,24 @@ package starmap -import "errors" +import ( + "errors" + "os" +) -func (stack *StarStack) Count() int { +func (stack *StarStackMem) Count() int { stack.kvPushmu.Lock() defer stack.kvPushmu.Unlock() return len(stack.kvStack) } -func (stack *StarStack) Push(val interface{}) error { +func (stack *StarStackMem) Push(val interface{}) error { stack.kvPushmu.Lock() defer stack.kvPushmu.Unlock() stack.kvStack = append(stack.kvStack, val) return nil } -func (stack *StarStack) Pop() (interface{}, error) { +func (stack *StarStackMem) Pop() (interface{}, error) { stack.kvPushmu.Lock() defer stack.kvPushmu.Unlock() if len(stack.kvStack) == 0 { @@ -26,59 +29,70 @@ func (stack *StarStack) Pop() (interface{}, error) { return val, nil } -func (stack *StarStack) MustPop() interface{} { +func (stack *StarStackMem) MustPop() interface{} { val, _ := stack.Pop() return val } -func Get(key string) (interface{}, error) { +func Get(key interface{}) (interface{}, error) { return globalMap.Get(key) } -func (m *StarMapKV) Get(key string) (interface{}, error) { +func (m *StarMapKV) Get(key interface{}) (interface{}, error) { var err error - m.kvmu.RLock() - defer m.kvmu.RUnlock() + m.mu.RLock() + defer m.mu.RUnlock() data, ok := m.kvMap[key] if !ok { - err = errors.New("key not exists") + err = os.ErrNotExist } return data, err } -func (m *StarMapKV) MustGet(key string) interface{} { +func (m *StarMapKV) MustGet(key interface{}) interface{} { result, _ := m.Get(key) return result } -func MustGet(key string) interface{} { +func MustGet(key interface{}) interface{} { return globalMap.MustGet(key) } -func Store(key string, value interface{}) error { +func Store(key interface{}, value interface{}) error { return globalMap.Store(key, value) } -func (m *StarMapKV) Store(key string, value interface{}) error { - m.kvmu.Lock() - defer m.kvmu.Unlock() +func (m *StarMapKV) Store(key interface{}, value interface{}) error { + m.mu.Lock() + defer m.mu.Unlock() m.kvMap[key] = value return nil } -func Delete(key string) error { +func Exists(key interface{}) bool { + return globalMap.Exists(key) +} + +func (m *StarMapKV) Exists(key interface{}) bool { + m.mu.RLock() + defer m.mu.RUnlock() + _, ok := m.kvMap[key] + return ok +} + +func Delete(key interface{}) error { return globalMap.Delete(key) } -func (m *StarMapKV) Delete(key string) error { - m.kvmu.Lock() - defer m.kvmu.Unlock() +func (m *StarMapKV) Delete(key interface{}) error { + m.mu.Lock() + defer m.mu.Unlock() delete(m.kvMap, key) return nil } -func Range(run func(k string, v interface{}) bool) error { +func Range(run func(k interface{}, v interface{}) bool) error { return globalMap.Range(run) } -func (m *StarMapKV) Range(run func(k string, v interface{}) bool) error { +func (m *StarMapKV) Range(run func(k interface{}, v interface{}) bool) error { for k, v := range m.kvMap { if !run(k, v) { break diff --git a/client_basicrw.go b/client_basicrw.go index b304309..f0d22f7 100644 --- a/client_basicrw.go +++ b/client_basicrw.go @@ -1,19 +1,11 @@ package starmap import ( - "errors" - "time" + "b612.me/notify" ) func (kv *RemoteKv) clientOnline() error { - if kv.client == nil || (kv.setKeepAlive && !kv.client.Online) { - return errors.New("Client Not Online,Please Contact A Server First") - } - if (!kv.setKeepAlive) && (!kv.client.Online) { - err := kv.reconnect() - return err - } - return nil + return kv.reconnect() } func (kv *RemoteKv) MustGet(key string) interface{} { @@ -21,77 +13,84 @@ func (kv *RemoteKv) MustGet(key string) interface{} { return result } -func (kv *RemoteKv) Get(key string) (interface{}, error) { - kv.kvmu.Lock() - defer kv.kvmu.Unlock() - defer kv.keepalive() +func (kv *RemoteKv) Get(key interface{}) (interface{}, error) { if err := kv.clientOnline(); err != nil { return nil, err } - data, err := kv.client.SendValueWait("Get", key, kv.TimeOut) + keyCode, err := notify.ToMsgVal(key) if err != nil { return nil, err } - inf, err := decode([]byte(data.Value)) + data, err := kv.client.SendWait("get", keyCode, kv.readTimeout) if err != nil { return nil, err } - var repInf *basicReply - repInf = inf.(*basicReply) - if repInf.Key == "ok" { - inf2, err := decode(repInf.Value) - if err != nil { - return nil, err - } - return inf2, nil + rp, err := data.Value.ToInterface() + if err != nil { + return nil, err } - return nil, errors.New(string(repInf.Value)) + reply := rp.(kvMsg) + return reply.Value, reply.Err } -func (kv *RemoteKv) Store(key string, value interface{}) error { - kv.kvmu.Lock() - defer kv.kvmu.Unlock() - defer kv.keepalive() +func (kv *RemoteKv) Store(key interface{}, value interface{}) error { if err := kv.clientOnline(); err != nil { return err } - encodeData, err := encode(value) + encodeData, err := notify.ToMsgVal(kvMsg{ + Key: key, + Value: value, + Err: nil, + }) if err != nil { return err } - sendData, _ := encode(basicReply{key, encodeData}) - data, err := kv.client.SendValueWait("Store", string(sendData), kv.TimeOut) + data, err := kv.client.SendWait("store", encodeData, kv.readTimeout) if err != nil { return err } - if data.Value == "ok" { - return nil + rp, err := data.Value.ToInterface() + if err != nil { + return err } - return errors.New(data.Value) + return rp.(kvMsg).Err } -func (kv *RemoteKv) Delete(key string) error { - kv.kvmu.Lock() - defer kv.kvmu.Unlock() - defer kv.keepalive() +func (kv *RemoteKv) Delete(key interface{}) error { if err := kv.clientOnline(); err != nil { return err } - data, err := kv.client.SendValueWait("Delete", key, kv.TimeOut) + keyCode, err := notify.ToMsgVal(key) if err != nil { return err } - if data.Value == "ok" { - return nil + data, err := kv.client.SendWait("delete", keyCode, kv.readTimeout) + if err != nil { + return err } - return errors.New(data.Value) + rp, err := data.Value.ToInterface() + if err != nil { + return err + } + return rp.(kvMsg).Err } -func (kv *RemoteKv) keepalive() { - if !kv.setKeepAlive && kv.client.Online { - kv.client.ClientStop() - for kv.client.Online { - time.Sleep(time.Millisecond) - } +func (kv *RemoteKv) Exists(key interface{}) bool { + if err := kv.clientOnline(); err != nil { + return false + } + keyCode, err := notify.ToMsgVal(key) + if err != nil { + return false + } + data, err := kv.client.SendWait("exists", keyCode, kv.readTimeout) + if err != nil { + return false + } + rp, err := data.Value.ToInterface() + if err != nil { + return false } + reply := rp.(kvMsg) + return reply.Value.(bool) } diff --git a/client_prorw.go b/client_prorw.go deleted file mode 100644 index 6fb751f..0000000 --- a/client_prorw.go +++ /dev/null @@ -1,55 +0,0 @@ -package starmap - -import ( - "errors" -) - -func (kv *RemoteKv) GetMap(key string) (StarMap, error) { - kv.kvmu.Lock() - defer kv.kvmu.Unlock() - defer kv.keepalive() - recv, err := kv.client.SendValueWait("GetMap", key, kv.TimeOut) - if err != nil { - return StarMapSample{}, err - } - if recv.Key == "error" { - return StarMapSample{}, errors.New(recv.Value) - } - value, err := decodeMap([]byte(recv.Value)) - if err != nil { - return StarMapSample{}, err - } - return value, nil -} - -func (kv *RemoteKv) StoreMap(value StarMap) error { - kv.kvmu.Lock() - defer kv.kvmu.Unlock() - defer kv.keepalive() - encodeData, err := encodeMap(value) - if err != nil { - return err - } - recv, err := kv.client.SendValueWait("StoreMap", string(encodeData), kv.TimeOut) - if err != nil { - return err - } - if recv.Key == "error" { - return errors.New(recv.Value) - } - return nil -} - -func (kv *RemoteKv) DeleteMap(value StarMap) error { - kv.kvmu.Lock() - defer kv.kvmu.Unlock() - defer kv.keepalive() - recv, err := kv.client.SendValueWait("DeleteMap", value.GetName(), kv.TimeOut) - if err != nil { - return err - } - if recv.Key == "error" { - return errors.New(recv.Value) - } - return nil -} diff --git a/prorw.go b/prorw.go deleted file mode 100644 index 4b37724..0000000 --- a/prorw.go +++ /dev/null @@ -1,48 +0,0 @@ -package starmap - -import ( - "errors" -) - -type StarMap interface { - GetName() string -} - -type StarMapSample struct{} - -func (sample StarMapSample) GetName() string { - return "starsample" -} - -func GetMap(value StarMap) error { - return globalMap.GetMap(value) -} -func (m *StarMapKV) GetMap(value StarMap) error { - m.kvmuPro.RLock() - defer m.kvmuPro.RUnlock() - value, ok := m.kvMapPro[value.GetName()] - if !ok { - return errors.New("key not exists") - } - return nil -} - -func StoreMap(value StarMap) error { - return globalMap.StoreMap(value) -} -func (m *StarMapKV) StoreMap(value StarMap) error { - m.kvmuPro.Lock() - defer m.kvmuPro.Unlock() - m.kvMapPro[value.GetName()] = value - return nil -} - -func DeleteMap(value StarMap) error { - return globalMap.DeleteMap(value) -} -func (m *StarMapKV) DeleteMap(value StarMap) error { - m.kvmuPro.RLock() - defer m.kvmuPro.RUnlock() - delete(m.kvMapPro, value.GetName()) - return nil -} diff --git a/remote_define.go b/remote_define.go index 7e98096..2d84ff4 100644 --- a/remote_define.go +++ b/remote_define.go @@ -2,47 +2,73 @@ package starmap import ( "encoding/gob" - "errors" - "sync" + "os" "time" "b612.me/notify" ) +func init() { + notify.Register(kvMsg{}) + notify.Register(starMapErr{}) +} + +type starMapErr struct { + Err string +} + +func (s starMapErr) Error() string { + return s.Err +} +func newStarMapErr(err error) error { + if err == nil { + return nil + } + return starMapErr{Err: err.Error()} +} + +type kvMsg struct { + Key interface{} + Value interface{} + Err error +} + type RemoteKv struct { - kvmu sync.RWMutex - server *notify.StarNotifyS - client *notify.StarNotifyC - addr string - setKeepAlive bool - TimeOut time.Duration - ErrMsg string - kvmap StarMapKV + server notify.Server + client notify.Client + kvmap StarMapKV + addr string + network string + readTimeout time.Duration + timeout time.Duration } -func NewServer(addr string) (*RemoteKv, error) { - var kv RemoteKv +func NewServer(network, addr string) (*RemoteKv, error) { var err error - kv.addr = addr - kv.TimeOut = time.Second * 10 - kv.kvmap = NewStarMap() - kv.server, err = notify.NewNotifyS("tcp", addr) + kv := RemoteKv{ + server: notify.NewServer(), + kvmap: NewStarMap(), + addr: addr, + network: network, + } + err = kv.server.Listen(network, addr) if err == nil { kv.bind() } return &kv, err } -func NewClient(addr string) (*RemoteKv, error) { +func NewClient(network, addr string, dialTimeout time.Duration) (*RemoteKv, error) { var err error - var kv RemoteKv - kv.addr = addr - kv.TimeOut = time.Second * 10 - kv.client, err = notify.NewNotifyC("tcp", addr) - if err == nil { - kv.setKeepAlive = true - kv.client.UseChannel = false + kv := RemoteKv{ + client: notify.NewClient(), + kvmap: NewStarMap(), + addr: addr, + network: network, + timeout: dialTimeout, + readTimeout: time.Second * 5, } + err = kv.client.ConnectTimeout(network, addr, dialTimeout) return &kv, err } @@ -56,38 +82,21 @@ func (kv *RemoteKv) RegisterAll(data []interface{}) { } } -func (kv *RemoteKv) SetKeepAlive(alive bool) { - kv.setKeepAlive = alive -} - -func (kv *RemoteKv) GetKeepAlive() bool { - return kv.setKeepAlive +func (kv *RemoteKv) bind() { + //for server + kv.server.SetDefaultLink(kv.dispatch) + //for client } func (kv *RemoteKv) reconnect() error { - var err error - if kv.addr != "" { - kv.client, err = notify.NewNotifyC("tcp", kv.addr) - if err == nil { - kv.client.UseChannel = false - } - return err + if kv.server != nil { + return nil } - return errors.New("Client Not Online!") -} - -func (kv *RemoteKv) bind() { - kv.server.SetNotify("Get", kv.get) - kv.server.SetNotify("Store", kv.store) - kv.server.SetNotify("Delete", kv.store) - kv.server.SetNotify("GetMap", kv.getMap) - kv.server.SetNotify("StoreMap", kv.storeMap) - kv.server.SetNotify("DeleteMap", kv.deleteMap) -} - -func (kv *RemoteKv) ErrTimeOut(err error) bool { - if err.Error() == "Time Exceed" { - return true + if kv.client != nil { + if kv.client.Status().Alive { + return nil + } + return kv.client.ConnectTimeout(kv.network, kv.addr, kv.timeout) } - return false + return os.ErrInvalid } diff --git a/server_basicrw.go b/server_basicrw.go index 011416b..22e33a2 100644 --- a/server_basicrw.go +++ b/server_basicrw.go @@ -1,63 +1,39 @@ package starmap -import "b612.me/notify" +import ( + "b612.me/notify" +) -type basicReply struct { - Key string - Value []byte -} - -func (kv *RemoteKv) get(data notify.SMsg) string { - key := data.Value - value, err := kv.kvmap.Get(key) - if err != nil { - sendData, err := encode(basicReply{"error", []byte("key not exists")}) - if err != nil { - kv.ErrMsg += err.Error() - return "" - } - data.Reply(string(sendData)) - } else { - prepareData, err := encode(value) - if err != nil { - kv.ErrMsg += err.Error() - sendData, _ := encode(basicReply{"error", []byte(err.Error())}) - data.Reply(string(sendData)) - return "" - } - sendData, err := encode(basicReply{"ok", prepareData}) - if err != nil { - kv.ErrMsg += err.Error() - sendData, _ := encode(basicReply{"error", []byte(err.Error())}) - data.Reply(string(sendData)) - return "" - } - data.Reply(string(sendData)) - } - return "" -} - -func (kv *RemoteKv) store(data notify.SMsg) string { - decodeData, err := decode([]byte(data.Value)) - if err != nil { - return err.Error() - } - inf := decodeData.(*basicReply) - customStruct, err := decode(inf.Value) - if err != nil { - return err.Error() - } - err = kv.kvmap.Store(inf.Key, customStruct) - if err != nil { - return err.Error() - } - return "ok" -} - -func (kv *RemoteKv) delete(data notify.SMsg) string { - err := kv.kvmap.Delete(data.Value) - if err != nil { - return err.Error() +func (r *RemoteKv) dispatch(msg *notify.Message) { + switch msg.Key { + case "get": + data, err := r.kvmap.Get(msg.Value.MustToInterface()) + msg.ReplyObj(kvMsg{ + Key: msg.Value.MustToInterface(), + Value: data, + Err: newStarMapErr(err), + }) + case "delete": + err := r.kvmap.Delete(msg.Value.MustToInterface()) + msg.ReplyObj(kvMsg{ + Key: msg.Value.MustToInterface(), + Value: nil, + Err: newStarMapErr(err), + }) + case "exists": + ext := r.kvmap.Exists(msg.Value.MustToInterface()) + msg.ReplyObj(kvMsg{ + Key: msg.Value.MustToInterface(), + Value: ext, + Err: newStarMapErr(nil), + }) + case "store": + ext := msg.Value.MustToInterface().(kvMsg) + err := r.kvmap.Store(ext.Key, ext.Value) + msg.ReplyObj(kvMsg{ + Key: msg.Value.MustToInterface(), + Value: nil, + Err: newStarMapErr(err), + }) } - return "ok" } diff --git a/server_prorw.go b/server_prorw.go deleted file mode 100644 index 1e5d06e..0000000 --- a/server_prorw.go +++ /dev/null @@ -1,48 +0,0 @@ -package starmap - -import ( - "b612.me/notify" -) - -func (kv *RemoteKv) getMap(data notify.SMsg) string { - kv.kvmu.Lock() - defer kv.kvmu.Unlock() - val, ok := kv.kvmap.kvMapPro[data.Value] - if !ok { - data.Key = "error" - data.Reply("key not exists") - return "" - } - encodeData, err := encodeMap(val) - if err != nil { - data.Key = "error" - data.Reply(err.Error()) - return "" - } - return string(encodeData) -} - -func (kv *RemoteKv) storeMap(data notify.SMsg) string { - kv.kvmu.Lock() - defer kv.kvmu.Unlock() - recvData, err := decodeMap([]byte(data.Value)) - if err != nil { - data.Key = "error" - data.Reply(err.Error()) - return "" - } - err = kv.kvmap.StoreMap(recvData) - if err != nil { - data.Key = "error" - data.Reply(err.Error()) - return "" - } - return "ok" -} - -func (kv *RemoteKv) deleteMap(data notify.SMsg) string { - kv.kvmu.Lock() - defer kv.kvmu.Unlock() - delete(kv.kvmap.kvMapPro, data.Value) - return "ok" -} diff --git a/stack.go b/stack.go new file mode 100644 index 0000000..56dcc40 --- /dev/null +++ b/stack.go @@ -0,0 +1,199 @@ +package starmap + +import ( + "errors" + "fmt" + "io" + "os" + "runtime" + "sync" + "sync/atomic" + "time" +) + +type StarStack struct { + datas []interface{} + pStart uint64 + pEnd uint64 + cap uint64 + isClose atomic.Value + rmu sync.Mutex + wmu sync.Mutex +} + +func NewStarStack(cap uint64) *StarStack { + rtnBuffer := new(StarStack) + rtnBuffer.cap = cap + rtnBuffer.datas = make([]interface{}, cap) + rtnBuffer.isClose.Store(false) + return rtnBuffer +} + +func (star *StarStack) init() { + star.cap = 1024 + star.datas = make([]interface{}, star.cap) + star.isClose.Store(false) +} + +func (star *StarStack) Free() uint64 { + return star.cap - star.Len() +} + +func (star *StarStack) Cap() uint64 { + return star.cap +} + +func (star *StarStack) Len() uint64 { + if star.pEnd >= star.pStart { + return star.pEnd - star.pStart + } + return star.pEnd - star.pStart + star.cap +} + +func (star *StarStack) PopNoWait() (interface{}, error) { + if star.isClose.Load() == nil { + star.init() + } + if star.isClose.Load().(bool) { + return 0, io.EOF + } + if star.Len() == 0 { + return 0, os.ErrNotExist + } + nowPtr := star.pStart + nextPtr := star.pStart + 1 + if nextPtr >= star.cap { + nextPtr = 0 + } + data := star.datas[nowPtr] + ok := atomic.CompareAndSwapUint64(&star.pStart, nowPtr, nextPtr) + if !ok { + return 0, os.ErrInvalid + } + return data, nil +} + +func (star *StarStack) MustPop() interface{} { + if star.isClose.Load() == nil { + star.init() + } + data, err := star.Pop() + if err != nil { + return nil + } + return data +} + +func (star *StarStack) Pop() (interface{}, error) { + if star.isClose.Load() == nil { + star.init() + } + for { + if star.isClose.Load().(bool) { + return 0, io.EOF + } + if star.Len() == 0 { + return 0, os.ErrNotExist + } + nowPtr := star.pStart + nextPtr := star.pStart + 1 + if nextPtr >= star.cap { + nextPtr = 0 + } + data := star.datas[nowPtr] + ok := atomic.CompareAndSwapUint64(&star.pStart, nowPtr, nextPtr) + if !ok { + time.Sleep(time.Microsecond) + runtime.Gosched() + continue + } + return data, nil + } +} + +func (star *StarStack) Push(data interface{}) error { + if star.isClose.Load() == nil { + star.init() + } + if star.isClose.Load().(bool) { + return io.EOF + } + nowPtr := star.pEnd + kariEnd := nowPtr + 1 + if kariEnd == star.cap { + kariEnd = 0 + } + if kariEnd == atomic.LoadUint64(&star.pStart) { + for { + time.Sleep(time.Microsecond) + runtime.Gosched() + if kariEnd != atomic.LoadUint64(&star.pStart) { + break + } + } + } + star.datas[nowPtr] = data + if ok := atomic.CompareAndSwapUint64(&star.pEnd, nowPtr, kariEnd); !ok { + return os.ErrInvalid + } + return nil +} + +func (star *StarStack) Close() error { + if star.isClose.Load() == nil { + star.init() + } + star.isClose.Store(true) + return nil +} +func (star *StarStack) Read(buf []interface{}) (int, error) { + if star.isClose.Load() == nil { + star.init() + } + if star.isClose.Load().(bool) { + return 0, io.EOF + } + if buf == nil { + return 0, errors.New("buffer is nil") + } + star.rmu.Lock() + defer star.rmu.Unlock() + var sum int = 0 + for i := 0; i < len(buf); i++ { + data, err := star.PopNoWait() + if err != nil { + if err == io.EOF { + return sum, err + } + if err == os.ErrNotExist { + i-- + continue + } + return sum, nil + } + buf[i] = data + sum++ + } + return sum, nil +} + +func (star *StarStack) Write(bts []byte) (int, error) { + if star.isClose.Load() == nil { + star.init() + } + if bts == nil || star.isClose.Load().(bool) { + return 0, io.EOF + } + star.wmu.Lock() + defer star.wmu.Unlock() + var sum = 0 + for i := 0; i < len(bts); i++ { + err := star.Push(bts[i]) + if err != nil { + fmt.Println("Write bts err:", err) + return sum, err + } + sum++ + } + return sum, nil +} diff --git a/stack_test.go b/stack_test.go new file mode 100644 index 0000000..192825b --- /dev/null +++ b/stack_test.go @@ -0,0 +1,33 @@ +package starmap + +import ( + "fmt" + "sync/atomic" + "testing" + "time" +) + +func Test_Circle_Speed(t *testing.T) { + buf := StarStack{} + count := uint64(0) + for i := 1; i <= 10; i++ { + go func() { + for { + buf.Push('a') + + } + }() + } + for i := 1; i <= 10; i++ { + go func() { + for { + _, err := buf.Pop() + if err == nil { + atomic.AddUint64(&count, 1) + } + } + }() + } + time.Sleep(time.Second * 10) + fmt.Println(count) +} diff --git a/starmap.go b/starmap.go index 6d724d5..5e03b53 100644 --- a/starmap.go +++ b/starmap.go @@ -5,13 +5,11 @@ import "sync" var globalMap StarMapKV type StarMapKV struct { - kvMap map[string]interface{} - kvmu sync.RWMutex - kvMapPro map[string]StarMap - kvmuPro sync.RWMutex + kvMap map[interface{}]interface{} + mu sync.RWMutex } -type StarStack struct { +type StarStackMem struct { kvPushmu sync.RWMutex kvStack []interface{} } @@ -22,7 +20,6 @@ func init() { func NewStarMap() StarMapKV { var mp StarMapKV - mp.kvMap = make(map[string]interface{}) - mp.kvMapPro = make(map[string]StarMap) + mp.kvMap = make(map[interface{}]interface{}) return mp } diff --git a/starmap_test.go b/starmap_test.go index 7566a24..f83aeb1 100644 --- a/starmap_test.go +++ b/starmap_test.go @@ -15,32 +15,22 @@ type Miaomiao struct { func Test_Remote(t *testing.T) { Store("nmb", 22222) - server, err := NewServer("127.0.0.1:45678") + server, _ := NewServer("tcp", "127.0.0.1:45678") server.Register(&Miaomiao{}) - fmt.Println(err) - client, err := NewClient("127.0.0.1:45678") - client.SetKeepAlive(false) - fmt.Println(err) + client, _ := NewClient("tcp", "127.0.0.1:45678", time.Second*2) _ = server - fmt.Println(client.Get("nmb")) + fmt.Println(client.Get("maio")) + fmt.Println(client.Exists("maio")) fmt.Println(client.Store("maio", Miaomiao{"sss", 222, true})) fmt.Println(client.Get("maio")) + fmt.Println(client.Exists("maio")) + fmt.Println(client.Delete("maio")) + fmt.Println(client.Exists("maio")) } func (cat *Miaomiao) GetName() string { return "miaomiao" } -func Test_Remote2(t *testing.T) { - server, err := NewServer("127.0.0.1:45678") - server.Register(&Miaomiao{}) - fmt.Println(err) - client, err := NewClient("127.0.0.1:45678") - //client.SetKeepAlive(false) - fmt.Println(err) - _ = server - fmt.Println(client.StoreMap(&Miaomiao{"suki", 1127, false})) - fmt.Println(client.GetMap("miaomiao")) -} func Test_Math(t *testing.T) { wg := NewWaitGroup(5000) diff --git a/sync.go b/sync.go index 57405b5..bcb5a84 100644 --- a/sync.go +++ b/sync.go @@ -25,7 +25,7 @@ func (swg *WaitGroup) Add(delta int) { } for { allC := atomic.LoadUint32(&swg.allCount) - if atomic.LoadUint32(&swg.maxCount) >= allC+uint32(delta) { + if atomic.LoadUint32(&swg.maxCount) == 0 || atomic.LoadUint32(&swg.maxCount) >= allC+uint32(delta) { if delta < 0 { atomic.AddUint32(&swg.allCount, ^uint32(Udelta)) } else { diff --git a/transfer.go b/transfer.go deleted file mode 100644 index 5d4e6f2..0000000 --- a/transfer.go +++ /dev/null @@ -1,38 +0,0 @@ -package starmap - -import ( - "bytes" - "encoding/gob" -) - -func init() { - gob.Register(&basicReply{}) -} - -func encode(src interface{}) ([]byte, error) { - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - err := enc.Encode(&src) - return buf.Bytes(), err -} - -func decode(src []byte) (interface{}, error) { - dec := gob.NewDecoder(bytes.NewReader(src)) - var dst interface{} - err := dec.Decode(&dst) - return dst, err -} - -func encodeMap(src StarMap) ([]byte, error) { - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - err := enc.Encode(&src) - return buf.Bytes(), err -} - -func decodeMap(src []byte) (StarMap, error) { - dec := gob.NewDecoder(bytes.NewReader(src)) - var dst StarMap - err := dec.Decode(&dst) - return dst, err -}