diff --git a/client.go b/client.go index 453b91a..4aba3d4 100644 --- a/client.go +++ b/client.go @@ -21,6 +21,7 @@ type StarNotifyC struct { // UseChannel 是否使用channel作为信息传递 UseChannel bool notifychan chan int + isUDP bool // Queue 是用来处理收发信息的简单消息队列 Queue *starainrt.StarQueue } @@ -63,6 +64,10 @@ func NewNotifyC(netype, value string) (*StarNotifyC, error) { var err error var star StarNotifyC star.starinitc() + star.isUDP = false + if strings.Index(netype, "udp") >= 0 { + star.isUDP = true + } star.connc, err = net.Dial(netype, value) if err != nil { return nil, err @@ -116,6 +121,10 @@ func (star *StarNotifyC) cnotify() { time.Sleep(time.Millisecond * 20) continue } + if data.Msg == "b612ryzstop" { + star.clientStopSign <- 0 + return + } strs := strings.SplitN(data.Msg, "||", 2) if len(strs) < 2 { continue @@ -137,6 +146,9 @@ func (star *StarNotifyC) cnotify() { // ClientStop 终止client端运行 func (star *StarNotifyC) ClientStop() { + if star.isUDP { + star.Send("b612ryzstop") + } star.clientStopSign <- 0 } diff --git a/client_test.go b/client_test.go index 0e0ab45..433ecb7 100644 --- a/client_test.go +++ b/client_test.go @@ -7,7 +7,7 @@ import ( ) func Test_usechannel(t *testing.T) { - server, err := NewNotifyS("tcp", "127.0.0.1:1926") + server, err := NewNotifyS("udp", "127.0.0.1:1926") if err != nil { fmt.Println(err) return @@ -20,7 +20,7 @@ func Test_usechannel(t *testing.T) { } return "" }) - client, err := NewNotifyC("tcp", "127.0.0.1:1926") + client, err := NewNotifyC("udp", "127.0.0.1:1926") if err != nil { fmt.Println(err) return @@ -39,7 +39,7 @@ func Test_usechannel(t *testing.T) { } func Test_nochannel(t *testing.T) { - server, err := NewNotifyS("tcp", "127.0.0.1:1926") + server, err := NewNotifyS("udp", "127.0.0.1:1926") if err != nil { fmt.Println(err) return @@ -52,7 +52,7 @@ func Test_nochannel(t *testing.T) { } return "" }) - client, err := NewNotifyC("tcp", "127.0.0.1:1926") + client, err := NewNotifyC("udp", "127.0.0.1:1926") if err != nil { fmt.Println(err) return diff --git a/server.go b/server.go index cc377c2..049cd24 100644 --- a/server.go +++ b/server.go @@ -25,6 +25,10 @@ type StarNotifyS struct { serverStopSign chan int notifychan chan int connPool map[string]net.Conn + udpPool map[string]*net.UDPAddr + isUDP bool + // UDPConn UDP监听 + UDPConn *net.UDPConn } // SMsg 指明当前服务端被通知的关键字 @@ -32,31 +36,94 @@ type SMsg struct { Conn net.Conn Key string Value string + UDP *net.UDPAddr + uconn *net.UDPConn } // Reply 用于向client端回复数据 func (nmsg *SMsg) Reply(msg string) error { - _, err := nmsg.Conn.Write(builder.BuildMessage(nmsg.Key + "||" + msg)) + var err error + if nmsg.uconn == nil { + _, err = nmsg.Conn.Write(builder.BuildMessage(nmsg.Key + "||" + msg)) + } else { + _, err = nmsg.uconn.WriteToUDP(builder.BuildMessage(nmsg.Key+"||"+msg), nmsg.UDP) + } return err } // Send 用于向client端发送key-value数据 func (nmsg *SMsg) Send(key, value string) error { - _, err := nmsg.Conn.Write(builder.BuildMessage(key + "||" + value)) + var err error + if nmsg.uconn == nil { + _, err = nmsg.Conn.Write(builder.BuildMessage(key + "||" + value)) + } else { + _, err = nmsg.uconn.WriteToUDP(builder.BuildMessage(key+"||"+value), nmsg.UDP) + } return err } func (star *StarNotifyS) starinits() { star.serverStopSign, star.notifychan = make(chan int, 1), make(chan int, 5) star.Queue = starainrt.NewQueue() + star.udpPool = make(map[string]*net.UDPAddr) star.FuncLists = make(map[string]func(SMsg) string) star.connPool = make(map[string]net.Conn) } // NewNotifyS 开启一个新的Server端通知 func NewNotifyS(netype, value string) (*StarNotifyS, error) { + if netype[0:3] != "udp" { + return notudps(netype, value) + } + return doudps(netype, value) +} + +func doudps(netype, value string) (*StarNotifyS, error) { var star StarNotifyS star.starinits() + star.isUDP = true + udpaddr, err := net.ResolveUDPAddr(netype, value) + if err != nil { + return nil, err + } + star.UDPConn, err = net.ListenUDP(netype, udpaddr) + if err != nil { + return nil, err + } + go star.notify() + go func() { + for { + go func() { + <-star.serverStopSign + star.notifychan <- 1 + star.notifychan <- 1 + for k, v := range star.udpPool { + star.UDPConn.WriteToUDP(star.Queue.BuildMessage("b612ryzstop"), v) + delete(star.connPool, k) + } + star.UDPConn.Close() + return + }() + for { + buf := make([]byte, 8192) + n, addr, err := star.UDPConn.ReadFromUDP(buf) + if n != 0 { + star.Queue.ParseMessage(buf[0:n], addr) + star.udpPool[addr.String()] = addr + } + if err != nil { + continue + } + } + } + }() + return &star, nil +} + +func notudps(netype, value string) (*StarNotifyS, error) { + var star StarNotifyS + star.starinits() + star.isUDP = false listener, err := net.Listen(netype, value) if err != nil { return nil, err @@ -129,21 +196,29 @@ func (star *StarNotifyS) notify() { continue } key, value := analyseData(data.Msg) + var rmsg SMsg + if !star.isUDP { + rmsg = SMsg{data.Conn.(net.Conn), key, value, nil, nil} + } else { + rmsg = SMsg{nil, key, value, data.Conn.(*net.UDPAddr), star.UDPConn} + if key == "b612ryzstop" { + delete(star.udpPool, rmsg.UDP.String()) + continue + } + } if msg, ok := star.FuncLists[key]; ok { - sdata := msg(SMsg{data.Conn.(net.Conn), key, value}) + sdata := msg(rmsg) if sdata == "" { continue } - sdata = key + "||" + sdata - data.Conn.(net.Conn).Write(star.Queue.BuildMessage(sdata)) + rmsg.Reply(sdata) } else { if star.defaultFunc != nil { - sdata := star.defaultFunc(SMsg{data.Conn.(net.Conn), key, value}) + sdata := star.defaultFunc(rmsg) if sdata == "" { continue } - sdata = key + "||" + sdata - data.Conn.(net.Conn).Write(star.Queue.BuildMessage(sdata)) + rmsg.Reply(sdata) } } }