diff --git a/client.go b/client.go index 05f3f3c..cdcd613 100644 --- a/client.go +++ b/client.go @@ -9,13 +9,14 @@ import ( "strings" "time" - "b612.me/starainrt" + "b612.me/starnet" ) // StarNotifyC 为Client端 type StarNotifyC struct { - Connc net.Conn - clientSign map[string]chan string + Connc net.Conn + dialTimeout time.Duration + clientSign map[string]chan string // FuncLists 当不使用channel时,使用此记录调用函数 FuncLists map[string]func(CMsg) stopSign context.Context @@ -27,7 +28,7 @@ type StarNotifyC struct { UseChannel bool isUDP bool // Queue 是用来处理收发信息的简单消息队列 - Queue *starainrt.StarQueue + Queue *starnet.StarQueue // Online 当前链接是否处于活跃状态 Online bool lockPool map[string]CMsg @@ -43,7 +44,10 @@ type CMsg struct { func (star *StarNotifyC) starinitc() { star.stopSign, star.cancel = context.WithCancel(context.Background()) - star.Queue = starainrt.NewQueue() + star.Queue = starnet.NewQueue() + star.Queue.EncodeFunc = encodeFunc + star.Queue.DecodeFunc = decodeFunc + star.Queue.Encode = true star.FuncLists = make(map[string]func(CMsg)) star.UseChannel = false star.Stop = make(chan int, 5) @@ -71,6 +75,45 @@ func (star *StarNotifyC) store(key, value string) { } star.clientSign[key] <- value } +func NewNotifyCWithTimeOut(netype, value string, timeout time.Duration) (*StarNotifyC, error) { + var err error + var star StarNotifyC + star.starinitc() + star.isUDP = false + if strings.Index(netype, "udp") >= 0 { + star.isUDP = true + } + star.Connc, err = net.DialTimeout(netype, value, timeout) + if err != nil { + return nil, err + } + star.dialTimeout = timeout + go star.cnotify() + go func() { + <-star.stopSign.Done() + star.Connc.Close() + star.Online = false + return + }() + go func() { + for { + buf := make([]byte, 8192) + n, err := star.Connc.Read(buf) + if n != 0 { + star.Queue.ParseMessage(buf[0:n], star.Connc) + } + if err != nil { + star.Connc.Close() + star.ClientStop() + //star, _ = NewNotifyC(netype, value) + star.Online = false + return + } + } + }() + star.Online = true + return &star, nil +} // NewNotifyC 用于新建一个Client端进程 func NewNotifyC(netype, value string) (*StarNotifyC, error) { diff --git a/client_test.go b/client_test.go index 3cf5f2f..b3561c9 100644 --- a/client_test.go +++ b/client_test.go @@ -25,6 +25,7 @@ func Test_usechannel(t *testing.T) { fmt.Println(err) return } + client.UseChannel = true //time.Sleep(time.Second * 10) client.Send("nihao") client.SendValue("nihao", "lalala") diff --git a/server.go b/server.go index adf754a..12293b9 100644 --- a/server.go +++ b/server.go @@ -10,19 +10,33 @@ import ( "strings" "time" - "b612.me/starainrt" + "b612.me/starcrypto" + + "b612.me/starnet" ) -var builder *starainrt.StarQueue +var builder *starnet.StarQueue +var aesKey = []byte{0x19, 0x96, 0x11, 0x27, 228, 187, 187, 231, 142, 137, 230, 179, 189, 229, 184, 133} + +func encodeFunc(data []byte) []byte { + return starcrypto.AesEncryptCFB(data, aesKey) +} + +func decodeFunc(data []byte) []byte { + return starcrypto.AesDecryptCFB(data, aesKey) +} func init() { - builder = starainrt.NewQueue() + builder = starnet.NewQueue() + builder.EncodeFunc = encodeFunc + builder.DecodeFunc = decodeFunc + builder.Encode = true } // StarNotifyS 为Server端 type StarNotifyS struct { // Queue 是用来处理收发信息的简单消息队列 - Queue *starainrt.StarQueue + Queue *starnet.StarQueue // FuncLists 记录了被通知项所记录的函数 FuncLists map[string]func(SMsg) string defaultFunc func(SMsg) string @@ -199,7 +213,10 @@ func (star *StarNotifyS) SendWait(source SMsg, key, value string, tmout time.Dur func (star *StarNotifyS) starinits() { star.stopSign, star.cancel = context.WithCancel(context.Background()) - star.Queue = starainrt.NewQueue() + star.Queue = starnet.NewQueue() + star.Queue.EncodeFunc = encodeFunc + star.Queue.DecodeFunc = decodeFunc + star.Queue.Encode = true star.udpPool = make(map[string]*net.UDPAddr) star.FuncLists = make(map[string]func(SMsg) string) star.connPool = make(map[string]net.Conn) diff --git a/starnotify/define.go b/starnotify/define.go index 050340b..c9d8161 100644 --- a/starnotify/define.go +++ b/starnotify/define.go @@ -2,6 +2,7 @@ package starnotify import ( "errors" + "time" "b612.me/notify" ) @@ -25,6 +26,15 @@ func NewClient(key, netype, value string) (*notify.StarNotifyC, error) { return client, err } +func NewClientWithTimeout(key, netype, value string, timeout time.Duration) (*notify.StarNotifyC, error) { + client, err := notify.NewNotifyCWithTimeOut(netype, value, timeout) + if err != nil { + return client, err + } + starClient[key] = client + return client, err +} + func DeleteClient(key string) error { client, ok := starClient[key] if !ok {