// Package notify is a package which provide common tcp/udp/unix socket service package notify import ( "net" "strings" "time" "b612.me/starainrt" ) var builder *starainrt.StarQueue func init() { builder = starainrt.NewQueue() } // StarNotifyS 为Server端 type StarNotifyS struct { // Queue 是用来处理收发信息的简单消息队列 Queue *starainrt.StarQueue // FuncLists 记录了被通知项所记录的函数 FuncLists map[string]func(SMsg) string defaultFunc func(SMsg) string serverStopSign chan int notifychan chan int connPool map[string]net.Conn udpPool map[string]*net.UDPAddr isUDP bool // UDPConn UDP监听 UDPConn *net.UDPConn // Online 当前链接是否处于活跃状态 Online bool } // SMsg 指明当前服务端被通知的关键字 type SMsg struct { Conn net.Conn Key string Value string UDP *net.UDPAddr uconn *net.UDPConn } // Reply 用于向client端回复数据 func (nmsg *SMsg) Reply(msg string) error { var err error if nmsg.uconn == nil { _, err = nmsg.Conn.Write(builder.BuildMessage(nmsg.Key + "||" + msg)) } else { _, err = nmsg.uconn.WriteToUDP(builder.BuildMessage(nmsg.Key+"||"+msg), nmsg.UDP) } return err } // Send 用于向client端发送key-value数据 func (nmsg *SMsg) Send(key, value string) error { var err error if nmsg.uconn == nil { _, err = nmsg.Conn.Write(builder.BuildMessage(key + "||" + value)) } else { _, err = nmsg.uconn.WriteToUDP(builder.BuildMessage(key+"||"+value), nmsg.UDP) } return err } func (star *StarNotifyS) starinits() { star.serverStopSign, star.notifychan = make(chan int, 1), make(chan int, 5) star.Queue = starainrt.NewQueue() star.udpPool = make(map[string]*net.UDPAddr) star.FuncLists = make(map[string]func(SMsg) string) star.connPool = make(map[string]net.Conn) star.Online = false } // NewNotifyS 开启一个新的Server端通知 func NewNotifyS(netype, value string) (*StarNotifyS, error) { if netype[0:3] != "udp" { return notudps(netype, value) } return doudps(netype, value) } func doudps(netype, value string) (*StarNotifyS, error) { var star StarNotifyS star.starinits() star.isUDP = true udpaddr, err := net.ResolveUDPAddr(netype, value) if err != nil { return nil, err } star.UDPConn, err = net.ListenUDP(netype, udpaddr) if err != nil { return nil, err } go star.notify() go func() { for { go func() { <-star.serverStopSign star.notifychan <- 1 star.notifychan <- 2 for k, v := range star.udpPool { star.UDPConn.WriteToUDP(star.Queue.BuildMessage("b612ryzstop"), v) delete(star.connPool, k) } star.UDPConn.Close() star.Online = false return }() for { buf := make([]byte, 8192) n, addr, err := star.UDPConn.ReadFromUDP(buf) if n != 0 { star.Queue.ParseMessage(buf[0:n], addr) star.udpPool[addr.String()] = addr } if err != nil { continue } } } }() star.Online = true return &star, nil } func notudps(netype, value string) (*StarNotifyS, error) { var star StarNotifyS star.starinits() star.isUDP = false listener, err := net.Listen(netype, value) if err != nil { return nil, err } go star.notify() go func() { for { go func() { <-star.serverStopSign star.notifychan <- 3 star.notifychan <- 4 for k, v := range star.connPool { v.Close() delete(star.connPool, k) } listener.Close() star.Online = false return }() conn, err := listener.Accept() if err != nil { select { case <-star.notifychan: listener.Close() return default: continue } } go func(conn net.Conn) { for { buf := make([]byte, 8192) n, err := conn.Read(buf) if n != 0 { star.Queue.ParseMessage(buf[0:n], conn) } if err != nil { conn.Close() delete(star.connPool, conn.RemoteAddr().String()) break } } }(conn) star.connPool[conn.RemoteAddr().String()] = conn } }() star.Online = true return &star, nil } // SetNotify 用于设置通知关键词的调用函数 func (star *StarNotifyS) SetNotify(name string, data func(SMsg) string) { star.FuncLists[name] = data } // SetDefaultNotify 用于设置默认关键词的调用函数 func (star *StarNotifyS) SetDefaultNotify(name string, data func(SMsg) string) { star.defaultFunc = data } func (star *StarNotifyS) notify() { for { select { case <-star.notifychan: return default: } data, err := star.Queue.RestoreOne() if err != nil { time.Sleep(time.Millisecond * 20) continue } key, value := analyseData(data.Msg) var rmsg SMsg if !star.isUDP { rmsg = SMsg{data.Conn.(net.Conn), key, value, nil, nil} } else { rmsg = SMsg{nil, key, value, data.Conn.(*net.UDPAddr), star.UDPConn} if key == "b612ryzstop" { delete(star.udpPool, rmsg.UDP.String()) continue } } if msg, ok := star.FuncLists[key]; ok { sdata := msg(rmsg) if sdata == "" { continue } rmsg.Reply(sdata) } else { if star.defaultFunc != nil { sdata := star.defaultFunc(rmsg) if sdata == "" { continue } rmsg.Reply(sdata) } } } } func analyseData(msg string) (key, value string) { slice := strings.SplitN(msg, "||", 2) if len(slice) == 1 { return msg, "" } return slice[0], slice[1] } // ServerStop 用于终止Server端运行 func (star *StarNotifyS) ServerStop() { star.serverStopSign <- 0 }