From bb9a240a03a409ca28317dff34a72da9b14be599 Mon Sep 17 00:00:00 2001 From: 兔子 Date: Tue, 3 Dec 2019 11:34:12 +0800 Subject: [PATCH] update code --- client.go | 69 ++++++++++++++++++++------ client_test.go | 75 +++++++++++++++++++++++++++++ server.go | 128 +++++++++++++++++++++++++++++-------------------- 3 files changed, 206 insertions(+), 66 deletions(-) create mode 100644 client_test.go diff --git a/client.go b/client.go index b4206c9..453b91a 100644 --- a/client.go +++ b/client.go @@ -10,26 +10,39 @@ import ( // StarNotifyC 为Client端 type StarNotifyC struct { - connc net.Conn - clientSign map[string]chan string + connc net.Conn + clientSign map[string]chan string + // FuncLists 当不使用channel时,使用此记录调用函数 + FuncLists map[string]func(CMsg) clientStopSign chan int + defaultFunc func(CMsg) // Stop 停止信号 - Stop chan int + Stop chan int + // UseChannel 是否使用channel作为信息传递 + UseChannel bool notifychan chan int // Queue 是用来处理收发信息的简单消息队列 Queue *starainrt.StarQueue } +// CMsg 指明当前客户端被通知的关键字 +type CMsg struct { + Key string + Value string +} + func (star *StarNotifyC) starinitc() { star.Queue = starainrt.NewQueue() - star.clientStopSign, star.notifychan, star.Stop = make(chan int), make(chan int, 3), make(chan int, 5) + star.FuncLists = make(map[string]func(CMsg)) + star.UseChannel = true + star.clientStopSign, star.notifychan, star.Stop = make(chan int, 1), make(chan int, 3), make(chan int, 5) star.clientSign = make(map[string]chan string) } // Notify 用于获取一个通知 func (star *StarNotifyC) Notify(key string) chan string { if _, ok := star.clientSign[key]; !ok { - ch := make(chan string, 5) + ch := make(chan string, 20) star.clientSign[key] = ch } return star.clientSign[key] @@ -37,7 +50,7 @@ func (star *StarNotifyC) Notify(key string) chan string { func (star *StarNotifyC) store(key, value string) { if _, ok := star.clientSign[key]; !ok { - ch := make(chan string, 5) + ch := make(chan string, 20) ch <- value star.clientSign[key] = ch return @@ -46,25 +59,24 @@ func (star *StarNotifyC) store(key, value string) { } // NewNotifyC 用于新建一个Client端进程 -func NewNotifyC(netype, value string) (StarNotifyC, error) { +func NewNotifyC(netype, value string) (*StarNotifyC, error) { var err error var star StarNotifyC star.starinitc() star.connc, err = net.Dial(netype, value) if err != nil { - return star, err + return nil, err } go star.cnotify() go func() { for { - select { - case <-star.clientStopSign: + go func() { + <-star.clientStopSign star.notifychan <- 1 star.connc.Close() star.Stop <- 1 return - default: - } + }() buf := make([]byte, 8192) n, err := star.connc.Read(buf) star.Queue.ParseMessage(buf[0:n], star.connc) @@ -72,12 +84,12 @@ func NewNotifyC(netype, value string) (StarNotifyC, error) { star.connc.Close() star.Stop <- 1 star.notifychan <- 0 - go NewNotifyC(netype, value) + //star, _ = NewNotifyC(netype, value) return } } }() - return star, nil + return &star, nil } // Send 用于向Server端发送数据 @@ -86,6 +98,12 @@ func (star *StarNotifyC) Send(name string) error { return err } +// SendValue 用于向Server端发送key-value类型数据 +func (star *StarNotifyC) SendValue(name, value string) error { + _, err := star.connc.Write(star.Queue.BuildMessage(name + "||" + value)) + return err +} + func (star *StarNotifyC) cnotify() { for { select { @@ -102,7 +120,18 @@ func (star *StarNotifyC) cnotify() { if len(strs) < 2 { continue } - go star.store(strs[0], strs[1]) + if star.UseChannel { + go star.store(strs[0], strs[1]) + } else { + key, value := strs[0], strs[1] + if msg, ok := star.FuncLists[key]; ok { + go msg(CMsg{key, value}) + } else { + if star.defaultFunc != nil { + go star.defaultFunc(CMsg{key, value}) + } + } + } } } @@ -110,3 +139,13 @@ func (star *StarNotifyC) cnotify() { func (star *StarNotifyC) ClientStop() { star.clientStopSign <- 0 } + +// SetNotify 用于设置关键词的调用函数 +func (star *StarNotifyC) SetNotify(name string, data func(CMsg)) { + star.FuncLists[name] = data +} + +// SetDefaultNotify 用于设置默认关键词的调用函数 +func (star *StarNotifyC) SetDefaultNotify(name string, data func(CMsg)) { + star.defaultFunc = data +} diff --git a/client_test.go b/client_test.go new file mode 100644 index 0000000..0e0ab45 --- /dev/null +++ b/client_test.go @@ -0,0 +1,75 @@ +package notify + +import ( + "fmt" + "testing" + "time" +) + +func Test_usechannel(t *testing.T) { + server, err := NewNotifyS("tcp", "127.0.0.1:1926") + if err != nil { + fmt.Println(err) + return + } + server.SetNotify("nihao", func(data SMsg) string { + fmt.Println("server recv:", data.Key, data.Value) + if data.Value != "" { + data.Reply("nba") + return "nb" + } + return "" + }) + client, err := NewNotifyC("tcp", "127.0.0.1:1926") + if err != nil { + fmt.Println(err) + return + } + //time.Sleep(time.Second * 10) + client.Send("nihao") + client.SendValue("nihao", "lalala") + txt := <-client.Notify("nihao") + fmt.Println("client", txt) + txt = <-client.Notify("nihao") + fmt.Println("client", txt) + server.ServerStop() + <-client.Stop + client.ClientStop() + time.Sleep(time.Second * 3) +} + +func Test_nochannel(t *testing.T) { + server, err := NewNotifyS("tcp", "127.0.0.1:1926") + if err != nil { + fmt.Println(err) + return + } + server.SetNotify("nihao", func(data SMsg) string { + fmt.Println("server recv:", data.Key, data.Value) + if data.Value != "" { + data.Reply("nba") + return "nb" + } + return "" + }) + client, err := NewNotifyC("tcp", "127.0.0.1:1926") + if err != nil { + fmt.Println(err) + return + } + //time.Sleep(time.Second * 10) + client.UseChannel = false + client.SetNotify("nihao", func(data CMsg) { + fmt.Println("client recv:", data.Key, data.Value) + if data.Value != "" { + time.Sleep(time.Millisecond * 900) + client.SendValue("nihao", "dsb") + } + }) + client.SendValue("nihao", "lalala") + time.Sleep(time.Second * 3) + server.ServerStop() + <-client.Stop + client.ClientStop() + time.Sleep(time.Second * 3) +} diff --git a/server.go b/server.go index 9eaab4b..cc377c2 100644 --- a/server.go +++ b/server.go @@ -3,6 +3,7 @@ package notify import ( "net" + "strings" "time" "b612.me/starainrt" @@ -19,83 +20,99 @@ type StarNotifyS struct { // Queue 是用来处理收发信息的简单消息队列 Queue *starainrt.StarQueue // FuncLists 记录了被通知项所记录的函数 - FuncLists map[string]func(NetMsg) string - defaultFunc func(NetMsg) string + FuncLists map[string]func(SMsg) string + defaultFunc func(SMsg) string serverStopSign chan int notifychan chan int + connPool map[string]net.Conn } -// NetMsg 指明当前被通知的关键字 -type NetMsg struct { - Conn net.Conn - key string +// SMsg 指明当前服务端被通知的关键字 +type SMsg struct { + Conn net.Conn + Key string + Value string } -// Send 用于向client端发送数据 -func (nmsg *NetMsg) Send(msg string) error { - _, err := nmsg.Conn.Write(builder.BuildMessage(nmsg.key + "||" + msg)) +// Reply 用于向client端回复数据 +func (nmsg *SMsg) Reply(msg string) error { + _, err := nmsg.Conn.Write(builder.BuildMessage(nmsg.Key + "||" + msg)) + return err +} + +// Send 用于向client端发送key-value数据 +func (nmsg *SMsg) Send(key, value string) error { + _, err := nmsg.Conn.Write(builder.BuildMessage(key + "||" + value)) return err } func (star *StarNotifyS) starinits() { - star.serverStopSign, star.notifychan = make(chan int), make(chan int, 3) + star.serverStopSign, star.notifychan = make(chan int, 1), make(chan int, 5) star.Queue = starainrt.NewQueue() - star.FuncLists = make(map[string]func(NetMsg) string) + star.FuncLists = make(map[string]func(SMsg) string) + star.connPool = make(map[string]net.Conn) } // NewNotifyS 开启一个新的Server端通知 -func NewNotifyS(netype, value string) (StarNotifyS, error) { +func NewNotifyS(netype, value string) (*StarNotifyS, error) { var star StarNotifyS star.starinits() listener, err := net.Listen(netype, value) - if err == nil { - go star.notify() - go func() { - for { + if err != nil { + return nil, err + } + go star.notify() + go func() { + for { + go func() { + <-star.serverStopSign + star.notifychan <- 1 + star.notifychan <- 1 + for k, v := range star.connPool { + v.Close() + delete(star.connPool, k) + } + listener.Close() + return + }() + + conn, err := listener.Accept() + if err != nil { select { - case <-star.serverStopSign: - star.notifychan <- 1 + case <-star.notifychan: listener.Close() return default: - } - conn, err := listener.Accept() - if err != nil { continue } - go func(conn net.Conn) { - for { - select { - case <-star.serverStopSign: - star.notifychan <- 1 - conn.Close() - return - default: - } - buf := make([]byte, 8192) - n, err := conn.Read(buf) - if n != 0 { - star.Queue.ParseMessage(buf[0:n], conn) - } - if err != nil { - conn.Close() - break - } - } - }(conn) } - }() - } - return star, err + go func(conn net.Conn) { + for { + buf := make([]byte, 8192) + n, err := conn.Read(buf) + if n != 0 { + star.Queue.ParseMessage(buf[0:n], conn) + } + if err != nil { + conn.Close() + delete(star.connPool, conn.RemoteAddr().String()) + break + } + } + }(conn) + star.connPool[conn.RemoteAddr().String()] = conn + } + }() + return &star, nil } // SetNotify 用于设置通知关键词的调用函数 -func (star *StarNotifyS) SetNotify(name string, data func(NetMsg) string) { +func (star *StarNotifyS) SetNotify(name string, data func(SMsg) string) { star.FuncLists[name] = data } // SetDefaultNotify 用于设置默认关键词的调用函数 -func (star *StarNotifyS) SetDefaultNotify(name string, data func(NetMsg) string) { +func (star *StarNotifyS) SetDefaultNotify(name string, data func(SMsg) string) { star.defaultFunc = data } @@ -111,26 +128,35 @@ func (star *StarNotifyS) notify() { time.Sleep(time.Millisecond * 20) continue } - if msg, ok := star.FuncLists[data.Msg]; ok { - sdata := msg(NetMsg{data.Conn.(net.Conn), data.Msg}) + key, value := analyseData(data.Msg) + if msg, ok := star.FuncLists[key]; ok { + sdata := msg(SMsg{data.Conn.(net.Conn), key, value}) if sdata == "" { continue } - sdata = data.Msg + "||" + sdata + sdata = key + "||" + sdata data.Conn.(net.Conn).Write(star.Queue.BuildMessage(sdata)) } else { if star.defaultFunc != nil { - sdata := star.defaultFunc(NetMsg{data.Conn.(net.Conn), data.Msg}) + sdata := star.defaultFunc(SMsg{data.Conn.(net.Conn), key, value}) if sdata == "" { continue } - sdata = data.Msg + "||" + sdata + sdata = key + "||" + sdata data.Conn.(net.Conn).Write(star.Queue.BuildMessage(sdata)) } } } } +func analyseData(msg string) (key, value string) { + slice := strings.SplitN(msg, "||", 2) + if len(slice) == 1 { + return msg, "" + } + return slice[0], slice[1] +} + // ServerStop 用于终止Server端运行 func (star *StarNotifyS) ServerStop() { star.serverStopSign <- 0