From cf286209495892762a0866cf29816af3dae3f3e4 Mon Sep 17 00:00:00 2001 From: 兔子 Date: Mon, 20 Jul 2020 11:24:42 +0800 Subject: [PATCH] init --- basicrw.go | 37 +++++++++++++++++++ client_basicrw.go | 91 +++++++++++++++++++++++++++++++++++++++++++++++ client_prorw.go | 55 ++++++++++++++++++++++++++++ prorw.go | 37 +++++++++++++++++++ remote_define.go | 91 +++++++++++++++++++++++++++++++++++++++++++++++ server_basicrw.go | 63 ++++++++++++++++++++++++++++++++ server_prorw.go | 42 ++++++++++++++++++++++ starmap.go | 13 +++++++ starmap_test.go | 41 +++++++++++++++++++++ transfer.go | 38 ++++++++++++++++++++ 10 files changed, 508 insertions(+) create mode 100644 basicrw.go create mode 100644 client_basicrw.go create mode 100644 client_prorw.go create mode 100644 prorw.go create mode 100644 remote_define.go create mode 100644 server_basicrw.go create mode 100644 server_prorw.go create mode 100644 starmap.go create mode 100644 starmap_test.go create mode 100644 transfer.go diff --git a/basicrw.go b/basicrw.go new file mode 100644 index 0000000..84cd449 --- /dev/null +++ b/basicrw.go @@ -0,0 +1,37 @@ +package starmap + +import "errors" + +func Get(key string) (interface{}, error) { + var err error + kvmu.RLock() + defer kvmu.RUnlock() + data, ok := kvMap[key] + if !ok { + err = errors.New("key not exists") + } + return data, err +} + +func Store(key string, value interface{}) error { + kvmu.Lock() + defer kvmu.Unlock() + kvMap[key] = value + return nil +} + +func Delete(key string) error { + kvmu.Lock() + defer kvmu.Unlock() + delete(kvMap, key) + return nil +} + +func Range(run func(k string, v interface{}) bool) error { + for k, v := range kvMap { + if !run(k, v) { + break + } + } + return nil +} diff --git a/client_basicrw.go b/client_basicrw.go new file mode 100644 index 0000000..28ac15f --- /dev/null +++ b/client_basicrw.go @@ -0,0 +1,91 @@ +package starmap + +import ( + "errors" + "time" +) + +func (kv *RemoteKv) clientOnline() error { + if kv.client == nil || (kv.setKeepAlive && !kv.client.Online) { + return errors.New("Client Not Online,Please Contact A Server First") + } + if (!kv.setKeepAlive) && (!kv.client.Online) { + err := kv.reconnect() + return err + } + return nil +} +func (kv *RemoteKv) Get(key string) (interface{}, error) { + kv.kvmu.Lock() + defer kv.kvmu.Unlock() + defer kv.keepalive() + if err := kv.clientOnline(); err != nil { + return nil, err + } + data, err := kv.client.SendValueWait("Get", key, kv.TimeOut) + if err != nil { + return nil, err + } + inf, err := decode([]byte(data.Value)) + if err != nil { + return nil, err + } + var repInf *basicReply + repInf = inf.(*basicReply) + if repInf.Key == "ok" { + inf2, err := decode(repInf.Value) + if err != nil { + return nil, err + } + return inf2, nil + } + return nil, errors.New(string(repInf.Value)) +} + +func (kv *RemoteKv) Store(key string, value interface{}) error { + kv.kvmu.Lock() + defer kv.kvmu.Unlock() + defer kv.keepalive() + if err := kv.clientOnline(); err != nil { + return err + } + encodeData, err := encode(value) + if err != nil { + return err + } + sendData, _ := encode(basicReply{key, encodeData}) + data, err := kv.client.SendValueWait("Store", string(sendData), kv.TimeOut) + if err != nil { + return err + } + if data.Value == "ok" { + return nil + } + return errors.New(data.Value) +} + +func (kv *RemoteKv) Delete(key string) error { + kv.kvmu.Lock() + defer kv.kvmu.Unlock() + defer kv.keepalive() + if err := kv.clientOnline(); err != nil { + return err + } + data, err := kv.client.SendValueWait("Delete", key, kv.TimeOut) + if err != nil { + return err + } + if data.Value == "ok" { + return nil + } + return errors.New(data.Value) +} + +func (kv *RemoteKv) keepalive() { + if !kv.setKeepAlive && kv.client.Online { + kv.client.ClientStop() + for kv.client.Online { + time.Sleep(time.Millisecond) + } + } +} diff --git a/client_prorw.go b/client_prorw.go new file mode 100644 index 0000000..6fb751f --- /dev/null +++ b/client_prorw.go @@ -0,0 +1,55 @@ +package starmap + +import ( + "errors" +) + +func (kv *RemoteKv) GetMap(key string) (StarMap, error) { + kv.kvmu.Lock() + defer kv.kvmu.Unlock() + defer kv.keepalive() + recv, err := kv.client.SendValueWait("GetMap", key, kv.TimeOut) + if err != nil { + return StarMapSample{}, err + } + if recv.Key == "error" { + return StarMapSample{}, errors.New(recv.Value) + } + value, err := decodeMap([]byte(recv.Value)) + if err != nil { + return StarMapSample{}, err + } + return value, nil +} + +func (kv *RemoteKv) StoreMap(value StarMap) error { + kv.kvmu.Lock() + defer kv.kvmu.Unlock() + defer kv.keepalive() + encodeData, err := encodeMap(value) + if err != nil { + return err + } + recv, err := kv.client.SendValueWait("StoreMap", string(encodeData), kv.TimeOut) + if err != nil { + return err + } + if recv.Key == "error" { + return errors.New(recv.Value) + } + return nil +} + +func (kv *RemoteKv) DeleteMap(value StarMap) error { + kv.kvmu.Lock() + defer kv.kvmu.Unlock() + defer kv.keepalive() + recv, err := kv.client.SendValueWait("DeleteMap", value.GetName(), kv.TimeOut) + if err != nil { + return err + } + if recv.Key == "error" { + return errors.New(recv.Value) + } + return nil +} diff --git a/prorw.go b/prorw.go new file mode 100644 index 0000000..89701d5 --- /dev/null +++ b/prorw.go @@ -0,0 +1,37 @@ +package starmap + +import ( + "errors" +) + +type StarMap interface { + GetName() string +} + +type StarMapSample struct{} + +func (sample StarMapSample) GetName() string { + return "starsample" +} + +func GetMap(value StarMap) error { + kvmuPro.RLock() + defer kvmuPro.RUnlock() + value, ok := kvMapPro[value.GetName()] + if !ok { + return errors.New("key not exists") + } + return nil +} + +func StoreMap(value StarMap) error { + kvmuPro.Lock() + defer kvmuPro.Unlock() + kvMapPro[value.GetName()] = value + return nil +} + +func DeleteMap(value StarMap) error { + delete(kvMapPro, value.GetName()) + return nil +} diff --git a/remote_define.go b/remote_define.go new file mode 100644 index 0000000..a87900b --- /dev/null +++ b/remote_define.go @@ -0,0 +1,91 @@ +package starmap + +import ( + "encoding/gob" + "errors" + "sync" + "time" + + "b612.me/notify" +) + +type RemoteKv struct { + kvmu sync.RWMutex + server *notify.StarNotifyS + client *notify.StarNotifyC + addr string + setKeepAlive bool + TimeOut time.Duration + ErrMsg string +} + +func NewServer(addr string) (*RemoteKv, error) { + var kv RemoteKv + var err error + kv.addr = addr + kv.TimeOut = time.Second * 10 + kv.server, err = notify.NewNotifyS("tcp", addr) + if err == nil { + kv.bind() + } + return &kv, err +} + +func NewClient(addr string) (*RemoteKv, error) { + var err error + var kv RemoteKv + kv.addr = addr + kv.TimeOut = time.Second * 10 + kv.client, err = notify.NewNotifyC("tcp", addr) + if err == nil { + kv.setKeepAlive = true + kv.client.UseChannel = false + } + return &kv, err +} + +func (kv *RemoteKv) Register(data interface{}) { + gob.Register(data) +} + +func (kv *RemoteKv) RegisterAll(data []interface{}) { + for _, v := range data { + gob.Register(v) + } +} + +func (kv *RemoteKv) SetKeepAlive(alive bool) { + kv.setKeepAlive = alive +} + +func (kv *RemoteKv) GetKeepAlive() bool { + return kv.setKeepAlive +} + +func (kv *RemoteKv) reconnect() error { + var err error + if kv.addr != "" { + kv.client, err = notify.NewNotifyC("tcp", kv.addr) + if err == nil { + kv.client.UseChannel = false + } + return err + } + return errors.New("Client Not Online!") +} + +func (kv *RemoteKv) bind() { + kv.server.SetNotify("Get", kv.get) + kv.server.SetNotify("Store", kv.store) + kv.server.SetNotify("Delete", kv.store) + kv.server.SetNotify("GetMap", kv.getMap) + kv.server.SetNotify("StoreMap", kv.storeMap) + kv.server.SetNotify("DeleteMap", kv.deleteMap) +} + +func (kv *RemoteKv) ErrTimeOut(err error) bool { + if err.Error() == "Time Exceed" { + return true + } + return false +} diff --git a/server_basicrw.go b/server_basicrw.go new file mode 100644 index 0000000..219eb18 --- /dev/null +++ b/server_basicrw.go @@ -0,0 +1,63 @@ +package starmap + +import "b612.me/notify" + +type basicReply struct { + Key string + Value []byte +} + +func (kv *RemoteKv) get(data notify.SMsg) string { + key := data.Value + value, err := Get(key) + if err != nil { + sendData, err := encode(basicReply{"error", []byte("key not exists")}) + if err != nil { + kv.ErrMsg += err.Error() + return "" + } + data.Reply(string(sendData)) + } else { + prepareData, err := encode(value) + if err != nil { + kv.ErrMsg += err.Error() + sendData, _ := encode(basicReply{"error", []byte(err.Error())}) + data.Reply(string(sendData)) + return "" + } + sendData, err := encode(basicReply{"ok", prepareData}) + if err != nil { + kv.ErrMsg += err.Error() + sendData, _ := encode(basicReply{"error", []byte(err.Error())}) + data.Reply(string(sendData)) + return "" + } + data.Reply(string(sendData)) + } + return "" +} + +func (kv *RemoteKv) store(data notify.SMsg) string { + decodeData, err := decode([]byte(data.Value)) + if err != nil { + return err.Error() + } + inf := decodeData.(*basicReply) + customStruct, err := decode(inf.Value) + if err != nil { + return err.Error() + } + err = Store(inf.Key, customStruct) + if err != nil { + return err.Error() + } + return "ok" +} + +func (kv *RemoteKv) delete(data notify.SMsg) string { + err := Delete(data.Value) + if err != nil { + return err.Error() + } + return "ok" +} diff --git a/server_prorw.go b/server_prorw.go new file mode 100644 index 0000000..25984a8 --- /dev/null +++ b/server_prorw.go @@ -0,0 +1,42 @@ +package starmap + +import ( + "b612.me/notify" +) + +func (kv *RemoteKv) getMap(data notify.SMsg) string { + val, ok := kvMapPro[data.Value] + if !ok { + data.Key = "error" + data.Reply("key not exists") + return "" + } + encodeData, err := encodeMap(val) + if err != nil { + data.Key = "error" + data.Reply(err.Error()) + return "" + } + return string(encodeData) +} + +func (kv *RemoteKv) storeMap(data notify.SMsg) string { + recvData, err := decodeMap([]byte(data.Value)) + if err != nil { + data.Key = "error" + data.Reply(err.Error()) + return "" + } + err = StoreMap(recvData) + if err != nil { + data.Key = "error" + data.Reply(err.Error()) + return "" + } + return "ok" +} + +func (kv *RemoteKv) deleteMap(data notify.SMsg) string { + delete(kvMapPro, data.Value) + return "ok" +} diff --git a/starmap.go b/starmap.go new file mode 100644 index 0000000..9cd711b --- /dev/null +++ b/starmap.go @@ -0,0 +1,13 @@ +package starmap + +import "sync" + +var kvMap map[string]interface{} +var kvmu sync.RWMutex +var kvMapPro map[string]StarMap +var kvmuPro sync.RWMutex + +func init() { + kvMap = make(map[string]interface{}) + kvMapPro = make(map[string]StarMap) +} diff --git a/starmap_test.go b/starmap_test.go new file mode 100644 index 0000000..ab32bc6 --- /dev/null +++ b/starmap_test.go @@ -0,0 +1,41 @@ +package starmap + +import ( + "fmt" + "testing" +) + +type Miaomiao struct { + Sakura string + Fuck int + Mimi bool +} + +func Test_Remote(t *testing.T) { + Store("nmb", 22222) + server, err := NewServer("127.0.0.1:45678") + server.Register(&Miaomiao{}) + fmt.Println(err) + client, err := NewClient("127.0.0.1:45678") + client.SetKeepAlive(false) + fmt.Println(err) + _ = server + fmt.Println(client.Get("nmb")) + fmt.Println(client.Store("maio", Miaomiao{"sss", 222, true})) + fmt.Println(client.Get("maio")) +} + +func (cat *Miaomiao) GetName() string { + return "miaomiao" +} +func Test_Remote2(t *testing.T) { + server, err := NewServer("127.0.0.1:45678") + server.Register(&Miaomiao{}) + fmt.Println(err) + client, err := NewClient("127.0.0.1:45678") + //client.SetKeepAlive(false) + fmt.Println(err) + _ = server + fmt.Println(client.StoreMap(&Miaomiao{"suki", 1127, false})) + fmt.Println(client.GetMap("miaomiao")) +} diff --git a/transfer.go b/transfer.go new file mode 100644 index 0000000..5d4e6f2 --- /dev/null +++ b/transfer.go @@ -0,0 +1,38 @@ +package starmap + +import ( + "bytes" + "encoding/gob" +) + +func init() { + gob.Register(&basicReply{}) +} + +func encode(src interface{}) ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(&src) + return buf.Bytes(), err +} + +func decode(src []byte) (interface{}, error) { + dec := gob.NewDecoder(bytes.NewReader(src)) + var dst interface{} + err := dec.Decode(&dst) + return dst, err +} + +func encodeMap(src StarMap) ([]byte, error) { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + err := enc.Encode(&src) + return buf.Bytes(), err +} + +func decodeMap(src []byte) (StarMap, error) { + dec := gob.NewDecoder(bytes.NewReader(src)) + var dst StarMap + err := dec.Decode(&dst) + return dst, err +}