Compare commits

...

6 Commits

@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"net"
"os"
@ -15,7 +16,6 @@ import (
"time"
)
//var nowd int64
type ClientCommon struct {
alive atomic.Value
status Status
@ -47,6 +47,7 @@ type ClientCommon struct {
useHeartBeat bool
sequenceDe func([]byte) (interface{}, error)
sequenceEn func(interface{}) ([]byte, error)
debugMode bool
}
func (c *ClientCommon) Connect(network string, addr string) error {
@ -54,7 +55,7 @@ func (c *ClientCommon) Connect(network string, addr string) error {
return errors.New("client already run")
}
c.stopCtx, c.stopFn = context.WithCancel(context.Background())
c.queue = starnet.NewQueueCtx(c.stopCtx, 4)
c.queue = starnet.NewQueueCtx(c.stopCtx, 4, math.MaxUint32)
conn, err := net.Dial(network, addr)
if err != nil {
return err
@ -68,12 +69,22 @@ func (c *ClientCommon) Connect(network string, addr string) error {
return c.clientPostInit()
}
func (c *ClientCommon) DebugMode(dmg bool) {
c.mu.Lock()
c.debugMode = dmg
c.mu.Unlock()
}
func (c *ClientCommon) IsDebugMode() bool {
return c.debugMode
}
func (c *ClientCommon) ConnectTimeout(network string, addr string, timeout time.Duration) error {
if c.alive.Load().(bool) {
return errors.New("client already run")
}
c.stopCtx, c.stopFn = context.WithCancel(context.Background())
c.queue = starnet.NewQueueCtx(c.stopCtx, 4)
c.queue = starnet.NewQueueCtx(c.stopCtx, 4, math.MaxUint32)
conn, err := net.DialTimeout(network, addr, timeout)
if err != nil {
return err
@ -185,9 +196,14 @@ func (c *ClientCommon) Heartbeat() {
c.lastHeartbeat = time.Now().Unix()
failedCount = 0
}
if c.debugMode {
fmt.Println("failed to recv heartbeat,timeout!")
}
failedCount++
if failedCount >= 3 {
//fmt.Println("heatbeat failed,stop client")
if c.debugMode {
fmt.Println("heatbeat failed more than 3 times,stop client")
}
c.alive.Store(false)
c.mu.Lock()
c.status = Status{
@ -218,7 +234,9 @@ func (c *ClientCommon) readMessage() {
}
data := make([]byte, 8192)
if c.maxReadTimeout.Seconds() != 0 {
c.conn.SetReadDeadline(time.Now().Add(c.maxReadTimeout))
if err := c.conn.SetReadDeadline(time.Now().Add(c.maxReadTimeout)); err != nil {
//TODO:ALERT
}
}
readNum, err := c.conn.Read(data)
if err == os.ErrDeadlineExceeded {
@ -228,7 +246,7 @@ func (c *ClientCommon) readMessage() {
continue
}
if err != nil {
if c.showError {
if c.showError || c.debugMode {
fmt.Println("client read error", err)
}
c.alive.Store(false)
@ -278,7 +296,7 @@ func (c *ClientCommon) loadMessage() {
//transfer to Msg
msg, err := c.sequenceDe(c.msgDe(c.SecretKey, data.Msg))
if err != nil {
if c.showError {
if c.showError || c.debugMode {
fmt.Println("client decode data error", err)
}
return
@ -314,8 +332,8 @@ func (c *ClientCommon) dispatchMsg(message Message) {
c.noFinSyncMsgPool.Delete(message.ID)
return
}
return
//fallthrough
//return
fallthrough
default:
}
callFn := func(fn func(*Message)) {
@ -411,7 +429,7 @@ func (c *ClientCommon) sendWait(msg TransferMsg, timeout time.Duration) (Message
c.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrDeadlineExceeded
case <-c.stopCtx.Done():
return Message{}, errors.New("Service shutdown")
return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply:
if !ok {
return msg, os.ErrInvalid
@ -434,7 +452,7 @@ func (c *ClientCommon) sendCtx(msg TransferMsg, ctx context.Context) (Message, e
c.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrDeadlineExceeded
case <-c.stopCtx.Done():
return Message{}, errors.New("Service shutdown")
return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply:
if !ok {
return msg, os.ErrInvalid
@ -497,7 +515,11 @@ func (c *ClientCommon) Reply(m Message, value MsgVal) error {
}
func (c *ClientCommon) ExchangeKey(newKey []byte) error {
newSendKey, err := starcrypto.RSAEncrypt(newKey, c.handshakeRsaPubKey)
pubKey, err := starcrypto.DecodeRsaPublicKey(c.handshakeRsaPubKey)
if err != nil {
return err
}
newSendKey, err := starcrypto.RSAEncrypt(pubKey, newKey)
if err != nil {
return err
}

@ -39,6 +39,8 @@ type Client interface {
StopMonitorChan() <-chan struct{}
Status() Status
ShowError(bool)
DebugMode(bool)
IsDebugMode() bool
GetSequenceEn() func(interface{}) ([]byte, error)
SetSequenceEn(func(interface{}) ([]byte, error))

@ -2,6 +2,7 @@ package notify
import (
"b612.me/starcrypto"
"log"
)
var defaultRsaKey = []byte(`-----BEGIN RSA PRIVATE KEY-----
@ -74,13 +75,23 @@ GoBHEfvmAoGGrk4qNbjm7JECAwEAAQ==
var defaultAesKey = []byte{0x19, 0x96, 0x11, 0x27, 228, 187, 187, 231, 142, 137, 230, 179, 189, 229, 184, 133}
func defaultMsgEn(key []byte, d []byte) []byte {
return starcrypto.AesEncryptCFB(d, key)
data, err := starcrypto.CustomEncryptAesCFB(d, key)
if err != nil {
log.Print(err)
return nil
}
return data
}
func defaultMsgDe(key []byte, d []byte) []byte {
return starcrypto.AesDecryptCFB(d, key)
data, err := starcrypto.CustomDecryptAesCFB(d, key)
if err != nil {
log.Print(err)
return nil
}
return data
}
func init() {
Register(TransferMsg{})
RegisterName("b612.me/notify.Transfer", TransferMsg{})
}

@ -3,7 +3,7 @@ module b612.me/notify
go 1.16
require (
b612.me/starcrypto v0.0.1
b612.me/stario v0.0.5
b612.me/starnet v0.1.3
b612.me/starcrypto v0.0.3
b612.me/stario v0.0.9
b612.me/starnet v0.1.8
)

@ -1,17 +1,51 @@
b612.me/starcrypto v0.0.1 h1:xGngzXPUrVbqtWzNw2e+0eWsdG7GG1/X+ONDGIzdriI=
b612.me/starcrypto v0.0.1/go.mod h1:hz0xRnfWNpYOlVrIPoGrQOWPibq4YiUZ7qN5tsQbzPo=
b612.me/stario v0.0.5 h1:Q1OGF+8eOoK49zMzkyh80GWaMuknhey6+PWJJL9ZuNo=
b612.me/stario v0.0.5/go.mod h1:or4ssWcxQSjMeu+hRKEgtp0X517b3zdlEOAms8Qscvw=
b612.me/starnet v0.1.3 h1:UjY6M96gdPdJtxnQGzCttqSwFw93sDZSHiIGtdOlFfk=
b612.me/starnet v0.1.3/go.mod h1:j/dd6BKwQK80O4gfbGYg2aYtPH76gSdgpuKboK/DwN4=
golang.org/x/crypto v0.0.0-20220313003712-b769efc7c000 h1:SL+8VVnkqyshUSz5iNnXtrBQzvFF2SkROm6t5RczFAE=
golang.org/x/crypto v0.0.0-20220313003712-b769efc7c000/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
b612.me/starcrypto v0.0.3 h1:lBGtz0kBdsV198BDQ72zBgOXYKBCb47R9tCWP/PFIwA=
b612.me/starcrypto v0.0.3/go.mod h1:pF5A16p8r/h1G0x7ZNmmAF6K1sdIMpbCUxn2WGC8gZ0=
b612.me/stario v0.0.9 h1:bFDlejUJMwZ12a09snZJspQsOlkqpDAl9qKPEYOGWCk=
b612.me/stario v0.0.9/go.mod h1:x4D/x8zA5SC0pj/uJAi4FyG5p4j5UZoMEZfvuRR6VNw=
b612.me/starnet v0.1.8 h1:sTNytUFP38i2BFR9nha3lTSLYb7El3tvKpZplYCrhZk=
b612.me/starnet v0.1.8/go.mod h1:k862Kf8DiVWTqdX6PHTFb6NoT+3G3Y74n8NCyNhuP0Y=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1 h1:v+OssWQX+hTHEmOBgwxdZxK4zHq3yOs8F9J7mk0PY8E=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.18.0 h1:FcHjZXDMxI8mM3nwhX9HlKop4C0YQvCVCdwYl2wOtE8=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

@ -7,7 +7,7 @@ import (
"fmt"
"net"
"os"
"sync"
"reflect"
"sync/atomic"
"time"
)
@ -45,7 +45,6 @@ type Message struct {
ServerConn Client
TransferMsg
Time time.Time
sync.Mutex
}
type WaitMsg struct {
@ -146,8 +145,13 @@ func (c *ClientConn) readTUMessage() {
}
func (c *ClientConn) rsaDecode(message Message) {
unknownKey := message.Value
data, err := starcrypto.RSADecrypt(unknownKey, c.handshakeRsaKey, "")
privKey, err := starcrypto.DecodeRsaPrivateKey(c.handshakeRsaKey, "")
if err != nil {
fmt.Println(err)
message.Reply([]byte("failed"))
return
}
data, err := starcrypto.RSADecrypt(privKey, message.Value)
if err != nil {
fmt.Println(err)
message.Reply([]byte("failed"))
@ -470,3 +474,29 @@ func MustToMsgVal(val interface{}) MsgVal {
}
return d
}
func (m MsgVal) Orm(stu interface{}) error {
inf, err := m.ToInterface()
if err != nil {
return err
}
t := reflect.TypeOf(stu)
if t.Kind() != reflect.Ptr {
return errors.New("interface not writable(pointer wanted)")
}
if !reflect.ValueOf(stu).Elem().CanSet() {
return errors.New("interface not writable")
}
it := reflect.TypeOf(inf)
if t.Elem().Kind() != it.Kind() {
return fmt.Errorf("interface{} kind is %v,not %v", t.Elem().Kind(), it.Kind())
}
if t.Elem().Name() != it.Name() {
return fmt.Errorf("interface{} name is %v,not %v", t.Elem().Name(), it.Name())
}
if t.Elem().String() != it.String() {
return fmt.Errorf("interface{} string is %v,not %v", t.Elem().String(), it.String())
}
reflect.ValueOf(stu).Elem().Set(reflect.ValueOf(inf))
return nil
}

@ -0,0 +1,96 @@
package notify
import (
"fmt"
"testing"
"time"
)
func TestMsgEnDeCode(t *testing.T) {
Register(HelloMessage{})
Register(Error{})
go ServerRun(time.Second * 30)
time.Sleep(time.Second * 2)
ClientRun(time.Second * 35)
}
type Error struct {
Msg string
}
func (e Error) Error() string {
return e.Msg
}
type WorldMessage struct {
Port int
MyCode string
MyInfo []int
Err error
}
type HelloMessage struct {
ID string
MyMap map[string]string
MySlice []int
World WorldMessage
}
func ClientRun(stopTime time.Duration) {
c := NewClient()
err := c.Connect("tcp", "127.0.0.1:23456")
if err != nil {
panic(err)
}
c.SetLink("msg", func(msg *Message) {
var hi HelloMessage
err := msg.Value.Orm(&hi)
if err != nil {
panic(err)
}
fmt.Printf("recv info from server,struct detail is %+v\n", hi)
})
timer := time.NewTimer(stopTime)
for {
select {
case <-timer.C:
c.Stop()
return
case <-time.After(time.Second * 2):
fmt.Println("client msg sent", c.SendObj("msg", HelloMessage{
ID: "client",
MyMap: map[string]string{"hello": "world"},
MySlice: []int{int(time.Now().Unix())},
World: WorldMessage{
Port: 520,
MyCode: "b612",
MyInfo: []int{0, 1, 2, 3},
Err: Error{Msg: "Hello World"},
},
}))
}
}
}
func ServerRun(stopTime time.Duration) {
s := NewServer()
err := s.Listen("tcp", "127.0.0.1:23456")
if err != nil {
panic(err)
}
s.SetLink("msg", func(msg *Message) {
var hi HelloMessage
err := msg.Value.Orm(&hi)
if err != nil {
panic(err)
}
fmt.Printf("recv info from client:%v,struct detail is %+v\n", msg.ClientConn.GetRemoteAddr(), hi)
hi.ID = "server"
hi.World.Port = 666
hi.MySlice = append(hi.MySlice, 1, 1, 2, 7)
msg.ReplyObj(hi)
})
<-time.After(stopTime)
s.Stop()
}

@ -9,11 +9,22 @@ func Register(data interface{}) {
gob.Register(data)
}
func RegisterName(name string, data interface{}) {
gob.RegisterName(name, data)
}
func RegisterAll(data []interface{}) {
for _, v := range data {
gob.Register(v)
}
}
func RegisterNames(data map[string]interface{}) {
for k, v := range data {
gob.RegisterName(k, v)
}
}
func encode(src interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)

@ -6,6 +6,7 @@ import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"net"
"os"
@ -42,6 +43,7 @@ type ServerCommon struct {
sequenceDe func([]byte) (interface{}, error)
sequenceEn func(interface{}) ([]byte, error)
showError bool
debugMode bool
}
func NewServer() Server {
@ -65,6 +67,19 @@ func NewServer() Server {
}
return &server
}
func (s *ServerCommon) DebugMode(dmg bool) {
s.mu.Lock()
s.debugMode = dmg
s.mu.Unlock()
}
func (s *ServerCommon) IsDebugMode() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.debugMode
}
func (s *ServerCommon) ShowError(std bool) {
s.mu.Lock()
s.showError = std
@ -91,7 +106,7 @@ func (s *ServerCommon) Listen(network string, addr string) error {
return errors.New("server already run")
}
s.stopCtx, s.stopFn = context.WithCancel(context.Background())
s.queue = starnet.NewQueueCtx(s.stopCtx, 128)
s.queue = starnet.NewQueueCtx(s.stopCtx, 128, math.MaxUint32)
if strings.Contains(strings.ToLower(network), "udp") {
return s.ListenUDP(network, addr)
}
@ -185,16 +200,22 @@ func (s *ServerCommon) acceptTU() {
for {
select {
case <-s.stopCtx.Done():
if s.debugMode {
fmt.Println("accept goroutine recv exit signal,exit")
}
return
default:
}
conn, err := s.listener.Accept()
if err != nil {
if s.showError {
if s.showError || s.debugMode {
fmt.Println("error accept:", err)
}
continue
}
if s.debugMode {
fmt.Println("accept new connection from", conn.RemoteAddr())
}
var id string
for {
id = fmt.Sprintf("%s%d%d", conn.RemoteAddr().String(), time.Now().UnixNano(), rand.Int63())
@ -241,7 +262,7 @@ func (s *ServerCommon) loadMessage() {
s.mu.RLock()
for _, v := range s.clientPool {
wg.Add(1)
go func() {
go func(v *ClientConn) {
defer wg.Done()
v.sayGoodByeForTU()
v.alive.Store(false)
@ -252,7 +273,7 @@ func (s *ServerCommon) loadMessage() {
}
v.stopFn()
s.removeClient(v)
}()
}(v)
}
s.mu.RUnlock()
select {
@ -282,7 +303,7 @@ func (s *ServerCommon) loadMessage() {
//fmt.Println("received:", float64(time.Now().UnixNano()-nowd)/1000000)
msg, err := s.sequenceDe(cc.msgDe(cc.SecretKey, data.Msg))
if err != nil {
if s.showError {
if s.showError || s.debugMode {
fmt.Println("server decode data error", err)
}
return
@ -343,8 +364,8 @@ func (s *ServerCommon) dispatchMsg(message Message) {
return
}
//just throw
return
//fallthrough
//return
fallthrough
default:
}
callFn := func(fn func(*Message)) {
@ -377,7 +398,9 @@ func (s *ServerCommon) sendTU(c *ClientConn, msg TransferMsg) (WaitMsg, error) {
data = c.msgEn(c.SecretKey, data)
data = s.queue.BuildMessage(data)
if c.maxWriteTimeout.Seconds() != 0 {
c.tuConn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout))
if err := c.tuConn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout)); err != nil {
return WaitMsg{}, err
}
}
_, err = c.tuConn.Write(data)
//fmt.Println("resend:", float64(time.Now().UnixNano()-nowd)/1000000)
@ -416,7 +439,7 @@ func (s *ServerCommon) sendWait(c *ClientConn, msg TransferMsg, timeout time.Dur
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrDeadlineExceeded
case <-s.stopCtx.Done():
return Message{}, errors.New("Service shutdown")
return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply:
if !ok {
return msg, os.ErrInvalid
@ -447,7 +470,7 @@ func (s *ServerCommon) sendCtx(c *ClientConn, msg TransferMsg, ctx context.Conte
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrClosed
case <-s.stopCtx.Done():
return Message{}, errors.New("Service shutdown")
return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply:
if !ok {
return msg, os.ErrInvalid
@ -524,6 +547,9 @@ func (s *ServerCommon) acceptUDP() {
for {
select {
case <-s.stopCtx.Done():
if s.debugMode {
fmt.Println("accept goroutine recv exit signal,exit")
}
return
default:
}
@ -533,6 +559,9 @@ func (s *ServerCommon) acceptUDP() {
data := make([]byte, 4096)
num, addr, err := s.udpListener.ReadFromUDP(data)
id := addr.String()
if s.debugMode {
fmt.Println("accept new udp message from", id)
}
//fmt.Println("s recv udp:", float64(time.Now().UnixNano()-nowd)/1000000)
s.mu.RLock()
if _, ok := s.clientPool[id]; !ok {
@ -626,7 +655,7 @@ func (s *ServerCommon) GetClient(id string) *ClientConn {
func (s *ServerCommon) GetClientLists() []*ClientConn {
s.mu.RLock()
defer s.mu.RUnlock()
var list []*ClientConn = make([]*ClientConn, 0, len(s.clientPool))
var list = make([]*ClientConn, 0, len(s.clientPool))
for _, v := range s.clientPool {
list = append(list, v)
}

@ -41,6 +41,8 @@ type Server interface {
GetSequenceDe() func([]byte) (interface{}, error)
SetSequenceDe(func([]byte) (interface{}, error))
ShowError(bool)
DebugMode(bool)
IsDebugMode() bool
HeartbeatTimeoutSec() int64
SetHeartbeatTimeoutSec(int64)

@ -14,22 +14,24 @@ func Test_ServerTuAndClientCommon(t *testing.T) {
noEn := func(key, bn []byte) []byte {
return bn
}
_ = noEn
server := NewServer()
server.SetDefaultCommDecode(noEn)
server.SetDefaultCommEncode(noEn)
//server.SetDefaultCommDecode(noEn)
//server.SetDefaultCommEncode(noEn)
err := server.Listen("tcp", "127.0.0.1:12345")
if err != nil {
panic(err)
}
server.SetLink("notify", notify)
for i := 1; i <= 5000; i++ {
for i := 1; i <= 100; i++ {
go func() {
client := NewClient()
client.SetMsgEn(noEn)
client.SetMsgDe(noEn)
client.SetSkipExchangeKey(true)
//client.SetMsgEn(noEn)
//client.SetMsgDe(noEn)
//client.SetSkipExchangeKey(true)
err = client.Connect("tcp", "127.0.0.1:12345")
if err != nil {
t.Fatal(err)
time.Sleep(time.Second * 2)
return
}
@ -37,7 +39,8 @@ func Test_ServerTuAndClientCommon(t *testing.T) {
for {
//nowd = time.Now().UnixNano()
client.SendWait("notify", []byte("client hello"),time.Second*15)
client.SendWait("notify", []byte("client hello"), time.Second*15)
//client.Send("notify", []byte("client hello"))
//time.Sleep(time.Millisecond)
//fmt.Println("finished:", float64(time.Now().UnixNano()-nowd)/1000000)
//client.Send("notify", []byte("client"))
@ -65,7 +68,10 @@ func notify(msg *Message) {
}
func Test_normal(t *testing.T) {
server, _ := net.Listen("udp", "127.0.0.1:12345")
server, err := net.Listen("tcp", "127.0.0.1:12345")
if err != nil {
t.Fatal(err)
}
go func() {
for {
conn, err := server.Accept()
@ -87,7 +93,7 @@ func Test_normal(t *testing.T) {
time.Sleep(time.Second * 5)
for i := 1; i <= 100; i++ {
go func() {
conn, err := net.Dial("udp", "127.0.0.1:12345")
conn, err := net.Dial("tcp", "127.0.0.1:12345")
if err != nil {
panic(err)
}

Loading…
Cancel
Save