diff --git a/client.go b/client.go index e3769f3..b4206c9 100644 --- a/client.go +++ b/client.go @@ -4,83 +4,96 @@ import ( "net" "strings" "time" -) -var connc net.Conn + "b612.me/starainrt" +) -var clientSign map[string]chan string -var clientStopSign chan int +// StarNotifyC 为Client端 +type StarNotifyC struct { + connc net.Conn + clientSign map[string]chan string + clientStopSign chan int + // Stop 停止信号 + Stop chan int + notifychan chan int + // Queue 是用来处理收发信息的简单消息队列 + Queue *starainrt.StarQueue +} -func init() { - clientStopSign = make(chan int) - clientSign = make(map[string]chan string) +func (star *StarNotifyC) starinitc() { + star.Queue = starainrt.NewQueue() + star.clientStopSign, star.notifychan, star.Stop = make(chan int), make(chan int, 3), make(chan int, 5) + star.clientSign = make(map[string]chan string) } // Notify 用于获取一个通知 -func Notify(key string) chan string { - if _, ok := clientSign[key]; !ok { +func (star *StarNotifyC) Notify(key string) chan string { + if _, ok := star.clientSign[key]; !ok { ch := make(chan string, 5) - clientSign[key] = ch + star.clientSign[key] = ch } - return clientSign[key] + return star.clientSign[key] } -func store(key, value string) { - if _, ok := clientSign[key]; !ok { +func (star *StarNotifyC) store(key, value string) { + if _, ok := star.clientSign[key]; !ok { ch := make(chan string, 5) ch <- value - clientSign[key] = ch + star.clientSign[key] = ch return } - clientSign[key] <- value + star.clientSign[key] <- value } // NewNotifyC 用于新建一个Client端进程 -func NewNotifyC(netype, value string) error { +func NewNotifyC(netype, value string) (StarNotifyC, error) { var err error - connc, err = net.Dial(netype, value) + var star StarNotifyC + star.starinitc() + star.connc, err = net.Dial(netype, value) if err != nil { - return err + return star, err } - go cnotify() + go star.cnotify() go func() { for { select { - case <-clientStopSign: - connc.Close() - break + case <-star.clientStopSign: + star.notifychan <- 1 + star.connc.Close() + star.Stop <- 1 + return default: } buf := make([]byte, 8192) - n, err := connc.Read(buf) - Queue.ParseMessage(buf[0:n], connc) + n, err := star.connc.Read(buf) + star.Queue.ParseMessage(buf[0:n], star.connc) if err != nil { - connc.Close() - notifychan <- 0 + star.connc.Close() + star.Stop <- 1 + star.notifychan <- 0 go NewNotifyC(netype, value) - break + return } } }() - return nil + return star, nil } // Send 用于向Server端发送数据 -func Send(name string) error { - _, err := connc.Write(Queue.BuildMessage(name)) +func (star *StarNotifyC) Send(name string) error { + _, err := star.connc.Write(star.Queue.BuildMessage(name)) return err } -func cnotify() { +func (star *StarNotifyC) cnotify() { for { select { - case <-clientStopSign: - break - case <-notifychan: - break + case <-star.notifychan: + return default: } - data, err := Queue.RestoreOne() + data, err := star.Queue.RestoreOne() if err != nil { time.Sleep(time.Millisecond * 20) continue @@ -89,11 +102,11 @@ func cnotify() { if len(strs) < 2 { continue } - go store(strs[0], strs[1]) + go star.store(strs[0], strs[1]) } } // ClientStop 终止client端运行 -func ClientStop() { - clientStopSign <- 0 +func (star *StarNotifyC) ClientStop() { + star.clientStopSign <- 0 } diff --git a/server.go b/server.go index abebe37..9eaab4b 100644 --- a/server.go +++ b/server.go @@ -8,13 +8,22 @@ import ( "b612.me/starainrt" ) -// Queue 是用来处理收发信息的简单消息队列 -var Queue *starainrt.StarQueue +var builder *starainrt.StarQueue -// FuncLists 记录了被通知项所记录的函数 -var FuncLists map[string]func(NetMsg) string -var serverStopSign chan int -var notifychan chan int +func init() { + builder = starainrt.NewQueue() +} + +// StarNotifyS 为Server端 +type StarNotifyS struct { + // Queue 是用来处理收发信息的简单消息队列 + Queue *starainrt.StarQueue + // FuncLists 记录了被通知项所记录的函数 + FuncLists map[string]func(NetMsg) string + defaultFunc func(NetMsg) string + serverStopSign chan int + notifychan chan int +} // NetMsg 指明当前被通知的关键字 type NetMsg struct { @@ -24,27 +33,30 @@ type NetMsg struct { // Send 用于向client端发送数据 func (nmsg *NetMsg) Send(msg string) error { - _, err := nmsg.Conn.Write(Queue.BuildMessage(nmsg.key + "||" + msg)) + _, err := nmsg.Conn.Write(builder.BuildMessage(nmsg.key + "||" + msg)) return err } -func init() { - serverStopSign, notifychan = make(chan int), make(chan int) - Queue = starainrt.NewQueue() - FuncLists = make(map[string]func(NetMsg) string) +func (star *StarNotifyS) starinits() { + star.serverStopSign, star.notifychan = make(chan int), make(chan int, 3) + star.Queue = starainrt.NewQueue() + star.FuncLists = make(map[string]func(NetMsg) string) } // NewNotifyS 开启一个新的Server端通知 -func NewNotifyS(netype, value string) error { +func NewNotifyS(netype, value string) (StarNotifyS, error) { + var star StarNotifyS + star.starinits() listener, err := net.Listen(netype, value) if err == nil { - go notify() + go star.notify() go func() { for { select { - case <-serverStopSign: + case <-star.serverStopSign: + star.notifychan <- 1 listener.Close() - break + return default: } conn, err := listener.Accept() @@ -54,14 +66,16 @@ func NewNotifyS(netype, value string) error { go func(conn net.Conn) { for { select { - case <-serverStopSign: - break + case <-star.serverStopSign: + star.notifychan <- 1 + conn.Close() + return default: } buf := make([]byte, 8192) n, err := conn.Read(buf) if n != 0 { - Queue.ParseMessage(buf[0:n], conn) + star.Queue.ParseMessage(buf[0:n], conn) } if err != nil { conn.Close() @@ -72,40 +86,52 @@ func NewNotifyS(netype, value string) error { } }() } - return err + return star, err +} + +// SetNotify 用于设置通知关键词的调用函数 +func (star *StarNotifyS) SetNotify(name string, data func(NetMsg) string) { + star.FuncLists[name] = data } -// SetNotify 用于设置通知关键词和调用函数 -func SetNotify(name string, data func(NetMsg) string) { - FuncLists[name] = data +// SetDefaultNotify 用于设置默认关键词的调用函数 +func (star *StarNotifyS) SetDefaultNotify(name string, data func(NetMsg) string) { + star.defaultFunc = data } -func notify() { +func (star *StarNotifyS) notify() { for { select { - case <-serverStopSign: - break - case <-notifychan: - break + case <-star.notifychan: + return default: } - data, err := Queue.RestoreOne() + data, err := star.Queue.RestoreOne() if err != nil { time.Sleep(time.Millisecond * 20) continue } - if msg, ok := FuncLists[data.Msg]; ok { + if msg, ok := star.FuncLists[data.Msg]; ok { sdata := msg(NetMsg{data.Conn.(net.Conn), data.Msg}) if sdata == "" { continue } sdata = data.Msg + "||" + sdata - data.Conn.(net.Conn).Write(Queue.BuildMessage(sdata)) + data.Conn.(net.Conn).Write(star.Queue.BuildMessage(sdata)) + } else { + if star.defaultFunc != nil { + sdata := star.defaultFunc(NetMsg{data.Conn.(net.Conn), data.Msg}) + if sdata == "" { + continue + } + sdata = data.Msg + "||" + sdata + data.Conn.(net.Conn).Write(star.Queue.BuildMessage(sdata)) + } } } } // ServerStop 用于终止Server端运行 -func ServerStop() { - serverStopSign <- 0 +func (star *StarNotifyS) ServerStop() { + star.serverStopSign <- 0 }