From 9065a12b9905342b358cd58cc0a3af9a2d79b818 Mon Sep 17 00:00:00 2001 From: starainrt Date: Thu, 19 May 2022 11:04:52 +0800 Subject: [PATCH] add orm function --- client.go | 13 ++++---- go.mod | 2 +- go.sum | 4 +-- msg.go | 29 ++++++++++++++-- msg_test.go | 96 +++++++++++++++++++++++++++++++++++++++++++++++++++++ server.go | 18 +++++----- 6 files changed, 143 insertions(+), 19 deletions(-) create mode 100644 msg_test.go diff --git a/client.go b/client.go index a7b3d5f..615b7de 100644 --- a/client.go +++ b/client.go @@ -15,7 +15,6 @@ import ( "time" ) -//var nowd int64 type ClientCommon struct { alive atomic.Value status Status @@ -218,7 +217,9 @@ func (c *ClientCommon) readMessage() { } data := make([]byte, 8192) if c.maxReadTimeout.Seconds() != 0 { - c.conn.SetReadDeadline(time.Now().Add(c.maxReadTimeout)) + if err := c.conn.SetReadDeadline(time.Now().Add(c.maxReadTimeout)); err != nil { + //TODO:ALERT + } } readNum, err := c.conn.Read(data) if err == os.ErrDeadlineExceeded { @@ -314,8 +315,8 @@ func (c *ClientCommon) dispatchMsg(message Message) { c.noFinSyncMsgPool.Delete(message.ID) return } - return - //fallthrough + //return + fallthrough default: } callFn := func(fn func(*Message)) { @@ -411,7 +412,7 @@ func (c *ClientCommon) sendWait(msg TransferMsg, timeout time.Duration) (Message c.noFinSyncMsgPool.Delete(data.TransferMsg.ID) return Message{}, os.ErrDeadlineExceeded case <-c.stopCtx.Done(): - return Message{}, errors.New("Service shutdown") + return Message{}, errors.New("service shutdown") case msg, ok := <-data.Reply: if !ok { return msg, os.ErrInvalid @@ -434,7 +435,7 @@ func (c *ClientCommon) sendCtx(msg TransferMsg, ctx context.Context) (Message, e c.noFinSyncMsgPool.Delete(data.TransferMsg.ID) return Message{}, os.ErrDeadlineExceeded case <-c.stopCtx.Done(): - return Message{}, errors.New("Service shutdown") + return Message{}, errors.New("service shutdown") case msg, ok := <-data.Reply: if !ok { return msg, os.ErrInvalid diff --git a/go.mod b/go.mod index 8aedeff..6f733a3 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module b612.me/notify go 1.16 require ( - b612.me/starcrypto v0.0.1 + b612.me/starcrypto v0.0.2 b612.me/stario v0.0.5 b612.me/starnet v0.1.3 ) diff --git a/go.sum b/go.sum index 48a502f..b11ad09 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -b612.me/starcrypto v0.0.1 h1:xGngzXPUrVbqtWzNw2e+0eWsdG7GG1/X+ONDGIzdriI= -b612.me/starcrypto v0.0.1/go.mod h1:hz0xRnfWNpYOlVrIPoGrQOWPibq4YiUZ7qN5tsQbzPo= +b612.me/starcrypto v0.0.2 h1:aRf1HcqK8GqHYxLAhWfFC4W/EqQLEFNEmxsBu3wG30o= +b612.me/starcrypto v0.0.2/go.mod h1:hz0xRnfWNpYOlVrIPoGrQOWPibq4YiUZ7qN5tsQbzPo= b612.me/stario v0.0.5 h1:Q1OGF+8eOoK49zMzkyh80GWaMuknhey6+PWJJL9ZuNo= b612.me/stario v0.0.5/go.mod h1:or4ssWcxQSjMeu+hRKEgtp0X517b3zdlEOAms8Qscvw= b612.me/starnet v0.1.3 h1:UjY6M96gdPdJtxnQGzCttqSwFw93sDZSHiIGtdOlFfk= diff --git a/msg.go b/msg.go index d21e5be..1350e65 100644 --- a/msg.go +++ b/msg.go @@ -7,7 +7,7 @@ import ( "fmt" "net" "os" - "sync" + "reflect" "sync/atomic" "time" ) @@ -45,7 +45,6 @@ type Message struct { ServerConn Client TransferMsg Time time.Time - sync.Mutex } type WaitMsg struct { @@ -475,3 +474,29 @@ func MustToMsgVal(val interface{}) MsgVal { } return d } + +func (m MsgVal) Orm(stu interface{}) error { + inf, err := m.ToInterface() + if err != nil { + return err + } + t := reflect.TypeOf(stu) + if t.Kind() != reflect.Ptr { + return errors.New("interface not writable(pointer wanted)") + } + if !reflect.ValueOf(stu).Elem().CanSet() { + return errors.New("interface not writable") + } + it := reflect.TypeOf(inf) + if t.Elem().Kind() != it.Kind() { + return fmt.Errorf("interface{} kind is %v,not %v", t.Elem().Kind(), it.Kind()) + } + if t.Elem().Name() != it.Name() { + return fmt.Errorf("interface{} name is %v,not %v", t.Elem().Name(), it.Name()) + } + if t.Elem().String() != it.String() { + return fmt.Errorf("interface{} string is %v,not %v", t.Elem().String(), it.String()) + } + reflect.ValueOf(stu).Elem().Set(reflect.ValueOf(inf)) + return nil +} diff --git a/msg_test.go b/msg_test.go new file mode 100644 index 0000000..72f2c13 --- /dev/null +++ b/msg_test.go @@ -0,0 +1,96 @@ +package notify + +import ( + "fmt" + "testing" + "time" +) + +func TestMsgEnDeCode(t *testing.T) { + Register(HelloMessage{}) + Register(Error{}) + go ServerRun(time.Second * 30) + time.Sleep(time.Second * 2) + ClientRun(time.Second * 35) +} + +type Error struct { + Msg string +} + +func (e Error) Error() string { + return e.Msg +} + +type WorldMessage struct { + Port int + MyCode string + MyInfo []int + Err error +} + +type HelloMessage struct { + ID string + MyMap map[string]string + MySlice []int + World WorldMessage +} + +func ClientRun(stopTime time.Duration) { + c := NewClient() + err := c.Connect("tcp", "127.0.0.1:23456") + if err != nil { + panic(err) + } + c.SetLink("msg", func(msg *Message) { + var hi HelloMessage + err := msg.Value.Orm(&hi) + if err != nil { + panic(err) + } + fmt.Printf("recv info from server,struct detail is %+v\n", hi) + }) + timer := time.NewTimer(stopTime) + for { + select { + case <-timer.C: + c.Stop() + return + case <-time.After(time.Second * 2): + fmt.Println("client msg sent", c.SendObj("msg", HelloMessage{ + ID: "client", + MyMap: map[string]string{"hello": "world"}, + MySlice: []int{int(time.Now().Unix())}, + World: WorldMessage{ + Port: 520, + MyCode: "b612", + MyInfo: []int{0, 1, 2, 3}, + Err: Error{Msg: "Hello World"}, + }, + })) + } + + } +} + +func ServerRun(stopTime time.Duration) { + s := NewServer() + err := s.Listen("tcp", "127.0.0.1:23456") + if err != nil { + panic(err) + } + s.SetLink("msg", func(msg *Message) { + var hi HelloMessage + err := msg.Value.Orm(&hi) + if err != nil { + panic(err) + } + fmt.Printf("recv info from client:%v,struct detail is %+v\n", msg.ClientConn.GetRemoteAddr(), hi) + hi.ID = "server" + hi.World.Port = 666 + hi.MySlice = append(hi.MySlice, 1, 1, 2, 7) + msg.ReplyObj(hi) + }) + <-time.After(stopTime) + s.Stop() +} diff --git a/server.go b/server.go index 966d94f..f1562ee 100644 --- a/server.go +++ b/server.go @@ -241,7 +241,7 @@ func (s *ServerCommon) loadMessage() { s.mu.RLock() for _, v := range s.clientPool { wg.Add(1) - go func() { + go func(v *ClientConn) { defer wg.Done() v.sayGoodByeForTU() v.alive.Store(false) @@ -252,7 +252,7 @@ func (s *ServerCommon) loadMessage() { } v.stopFn() s.removeClient(v) - }() + }(v) } s.mu.RUnlock() select { @@ -343,8 +343,8 @@ func (s *ServerCommon) dispatchMsg(message Message) { return } //just throw - return - //fallthrough + //return + fallthrough default: } callFn := func(fn func(*Message)) { @@ -377,7 +377,9 @@ func (s *ServerCommon) sendTU(c *ClientConn, msg TransferMsg) (WaitMsg, error) { data = c.msgEn(c.SecretKey, data) data = s.queue.BuildMessage(data) if c.maxWriteTimeout.Seconds() != 0 { - c.tuConn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout)) + if err := c.tuConn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout)); err != nil { + return WaitMsg{}, err + } } _, err = c.tuConn.Write(data) //fmt.Println("resend:", float64(time.Now().UnixNano()-nowd)/1000000) @@ -416,7 +418,7 @@ func (s *ServerCommon) sendWait(c *ClientConn, msg TransferMsg, timeout time.Dur s.noFinSyncMsgPool.Delete(data.TransferMsg.ID) return Message{}, os.ErrDeadlineExceeded case <-s.stopCtx.Done(): - return Message{}, errors.New("Service shutdown") + return Message{}, errors.New("service shutdown") case msg, ok := <-data.Reply: if !ok { return msg, os.ErrInvalid @@ -447,7 +449,7 @@ func (s *ServerCommon) sendCtx(c *ClientConn, msg TransferMsg, ctx context.Conte s.noFinSyncMsgPool.Delete(data.TransferMsg.ID) return Message{}, os.ErrClosed case <-s.stopCtx.Done(): - return Message{}, errors.New("Service shutdown") + return Message{}, errors.New("service shutdown") case msg, ok := <-data.Reply: if !ok { return msg, os.ErrInvalid @@ -626,7 +628,7 @@ func (s *ServerCommon) GetClient(id string) *ClientConn { func (s *ServerCommon) GetClientLists() []*ClientConn { s.mu.RLock() defer s.mu.RUnlock() - var list []*ClientConn = make([]*ClientConn, 0, len(s.clientPool)) + var list = make([]*ClientConn, 0, len(s.clientPool)) for _, v := range s.clientPool { list = append(list, v) }