diff --git a/client.go b/client.go index ad8e9e7..0b65377 100644 --- a/client.go +++ b/client.go @@ -10,13 +10,13 @@ import ( // StarNotifyC 为Client端 type StarNotifyC struct { - connc net.Conn + Connc net.Conn clientSign map[string]chan string // FuncLists 当不使用channel时,使用此记录调用函数 FuncLists map[string]func(CMsg) clientStopSign chan int defaultFunc func(CMsg) - // Stop 停止信号 + // Stop 停止信 号 Stop chan int // UseChannel 是否使用channel作为信息传递 UseChannel bool @@ -71,34 +71,34 @@ func NewNotifyC(netype, value string) (*StarNotifyC, error) { if strings.Index(netype, "udp") >= 0 { star.isUDP = true } - star.connc, err = net.Dial(netype, value) + star.Connc, err = net.Dial(netype, value) if err != nil { return nil, err } go star.cnotify() + go func() { + <-star.clientStopSign + star.notifychan <- 2 + star.Connc.Close() + star.Stop <- 0 + star.Online = false + return + }() go func() { for { - go func() { - <-star.clientStopSign - star.notifychan <- 2 - star.connc.Close() - star.Stop <- 0 - star.Online = false - return - }() buf := make([]byte, 8192) - n, err := star.connc.Read(buf) + n, err := star.Connc.Read(buf) + if n != 0 { + star.Queue.ParseMessage(buf[0:n], star.Connc) + } if err != nil { - star.connc.Close() + star.Connc.Close() star.Stop <- 1 star.notifychan <- 3 //star, _ = NewNotifyC(netype, value) star.Online = false return } - if n != 0 { - star.Queue.ParseMessage(buf[0:n], star.connc) - } } }() star.Online = true @@ -107,13 +107,13 @@ func NewNotifyC(netype, value string) (*StarNotifyC, error) { // Send 用于向Server端发送数据 func (star *StarNotifyC) Send(name string) error { - _, err := star.connc.Write(star.Queue.BuildMessage(name)) + _, err := star.Connc.Write(star.Queue.BuildMessage(name)) return err } // SendValue 用于向Server端发送key-value类型数据 func (star *StarNotifyC) SendValue(name, value string) error { - _, err := star.connc.Write(star.Queue.BuildMessage(name + "||" + value)) + _, err := star.Connc.Write(star.Queue.BuildMessage(name + "||" + value)) return err } diff --git a/server.go b/server.go index c6f385d..b76fa19 100644 --- a/server.go +++ b/server.go @@ -31,6 +31,13 @@ type StarNotifyS struct { UDPConn *net.UDPConn // Online 当前链接是否处于活跃状态 Online bool + // ReadDeadline tcp/unix中读超时设置,udp请直接调用UDPConn + ReadDeadline time.Time + // WriteDeadline tcp/unix中写超时设置,udp请直接调用UDPConn + WriteDeadline time.Time + + // Deadline tcp/unix中超时设置,udp请直接调用UDPConn + Deadline time.Time } // SMsg 指明当前服务端被通知的关键字 @@ -94,30 +101,28 @@ func doudps(netype, value string) (*StarNotifyS, error) { return nil, err } go star.notify() + go func() { + <-star.serverStopSign + star.notifychan <- 1 + star.notifychan <- 2 + for k, v := range star.udpPool { + star.UDPConn.WriteToUDP(star.Queue.BuildMessage("b612ryzstop"), v) + delete(star.connPool, k) + } + star.UDPConn.Close() + star.Online = false + return + }() go func() { for { - go func() { - <-star.serverStopSign - star.notifychan <- 1 - star.notifychan <- 2 - for k, v := range star.udpPool { - star.UDPConn.WriteToUDP(star.Queue.BuildMessage("b612ryzstop"), v) - delete(star.connPool, k) - } - star.UDPConn.Close() - star.Online = false - return - }() - for { - buf := make([]byte, 8192) - n, addr, err := star.UDPConn.ReadFromUDP(buf) - if n != 0 { - star.Queue.ParseMessage(buf[0:n], addr) - star.udpPool[addr.String()] = addr - } - if err != nil { - continue - } + buf := make([]byte, 8192) + n, addr, err := star.UDPConn.ReadFromUDP(buf) + if n != 0 { + star.Queue.ParseMessage(buf[0:n], addr) + star.udpPool[addr.String()] = addr + } + if err != nil { + continue } } }() @@ -134,21 +139,20 @@ func notudps(netype, value string) (*StarNotifyS, error) { return nil, err } go star.notify() + go func() { + <-star.serverStopSign + star.notifychan <- 3 + star.notifychan <- 4 + for k, v := range star.connPool { + v.Close() + delete(star.connPool, k) + } + listener.Close() + star.Online = false + return + }() go func() { for { - go func() { - <-star.serverStopSign - star.notifychan <- 3 - star.notifychan <- 4 - for k, v := range star.connPool { - v.Close() - delete(star.connPool, k) - } - listener.Close() - star.Online = false - return - }() - conn, err := listener.Accept() if err != nil { select { @@ -159,6 +163,15 @@ func notudps(netype, value string) (*StarNotifyS, error) { continue } } + if !star.ReadDeadline.IsZero() { + conn.SetReadDeadline(star.ReadDeadline) + } + if !star.WriteDeadline.IsZero() { + conn.SetWriteDeadline(star.WriteDeadline) + } + if !star.Deadline.IsZero() { + conn.SetDeadline(star.Deadline) + } go func(conn net.Conn) { for { buf := make([]byte, 8192) @@ -213,21 +226,23 @@ func (star *StarNotifyS) notify() { continue } } - if msg, ok := star.FuncLists[key]; ok { - sdata := msg(rmsg) - if sdata == "" { - continue - } - rmsg.Reply(sdata) - } else { - if star.defaultFunc != nil { - sdata := star.defaultFunc(rmsg) + go func() { + if msg, ok := star.FuncLists[key]; ok { + sdata := msg(rmsg) if sdata == "" { - continue + return } rmsg.Reply(sdata) + } else { + if star.defaultFunc != nil { + sdata := star.defaultFunc(rmsg) + if sdata == "" { + return + } + rmsg.Reply(sdata) + } } - } + }() } }