package notify

import (
	"net"
	"strings"
	"sync"
	"time"

	"b612.me/starainrt"
)

// StarNotifyC is the client side of the notify connection.
//
// NewNotifyC returns the struct by value while its background goroutines keep
// working on the original, so every field is a channel, map, pointer or
// interface that both copies share.
type StarNotifyC struct {
	// connc is the connection to the server.
	connc net.Conn
	// mu guards clientSign. It is a pointer so that copies of the struct
	// share one lock.
	mu *sync.Mutex
	// clientSign maps a notification key to the channel its values are
	// delivered on.
	clientSign map[string]chan string
	// clientStopSign carries the stop request from ClientStop to the read loop.
	clientStopSign chan int
	// done is closed once the read loop has exited (or was never started).
	done chan struct{}
	// Stop is the stop signal: it receives one value when the read loop ends.
	Stop chan int
	// notifychan tells cnotify to exit.
	notifychan chan int
	// Queue is a simple message queue used to handle sent and received
	// messages.
	Queue *starainrt.StarQueue
}

// starinitc allocates the queue, the channels, the key map and its lock.
func (star *StarNotifyC) starinitc() {
	star.Queue = starainrt.NewQueue()
	star.clientStopSign = make(chan int)
	star.notifychan = make(chan int, 3)
	star.Stop = make(chan int, 5)
	star.done = make(chan struct{})
	star.mu = new(sync.Mutex)
	star.clientSign = make(map[string]chan string)
}

// channel returns the channel registered for key, creating a channel with a
// buffer of 5 on first use. It is safe for concurrent use.
func (star *StarNotifyC) channel(key string) chan string {
	star.mu.Lock()
	defer star.mu.Unlock()
	ch, ok := star.clientSign[key]
	if !ok {
		ch = make(chan string, 5)
		star.clientSign[key] = ch
	}
	return ch
}

// Notify returns the channel on which values received for key are delivered.
// The channel is created if no value for key has arrived yet.
func (star *StarNotifyC) Notify(key string) chan string {
	return star.channel(key)
}

// store delivers value on the channel for key. The send happens outside the
// lock, so a full channel blocks only this call and never Notify or other
// keys.
func (star *StarNotifyC) store(key, value string) {
	star.channel(key) <- value
}

// NewNotifyC creates a new client: it dials value over the network netype and
// starts the goroutines that read from the connection and dispatch received
// messages. On a dial error the returned client has no connection and no
// goroutines running.
func NewNotifyC(netype, value string) (StarNotifyC, error) {
	var star StarNotifyC
	star.starinitc()
	conn, err := net.Dial(netype, value)
	if err != nil {
		// No read loop will ever run; mark it finished so ClientStop on the
		// returned value does not block.
		close(star.done)
		return star, err
	}
	star.connc = conn
	go star.cnotify()
	go star.readLoop()
	return star, nil
}

// readLoop feeds everything read from the connection into Queue until a stop
// is requested or a read fails. In both cases it closes the connection, tells
// cnotify to exit and sends one value on Stop.
func (star *StarNotifyC) readLoop() {
	defer close(star.done)
	for {
		select {
		case <-star.clientStopSign:
			star.notifychan <- 1
			star.connc.Close()
			star.Stop <- 1
			return
		default:
		}
		// A fresh buffer per read: whether ParseMessage keeps a reference to
		// the slice is not visible here, so the buffer is not reused.
		buf := make([]byte, 8192)
		n, err := star.connc.Read(buf)
		// Hand over whatever was read before looking at the error.
		star.Queue.ParseMessage(buf[0:n], star.connc)
		if err != nil {
			star.connc.Close()
			star.Stop <- 1
			star.notifychan <- 0
			// The connection is deliberately not re-dialed from here: a
			// replacement client created in this goroutine cannot be handed
			// back to the caller and would only leak a socket and its
			// goroutines. Callers observe Stop and create a new client.
			return
		}
	}
}

// Send sends name to the server, framed by Queue.BuildMessage. It returns the
// error of the underlying write.
func (star *StarNotifyC) Send(name string) error {
	_, err := star.connc.Write(star.Queue.BuildMessage(name))
	return err
}

// cnotify takes messages from Queue and dispatches them until it is told to
// exit through notifychan. A message is expected to look like "key||value";
// messages without the "||" separator are dropped.
func (star *StarNotifyC) cnotify() {
	for {
		select {
		case <-star.notifychan:
			return
		default:
		}
		data, err := star.Queue.RestoreOne()
		if err != nil {
			// Nothing could be restored; back off briefly before polling again.
			time.Sleep(time.Millisecond * 20)
			continue
		}
		strs := strings.SplitN(data.Msg, "||", 2)
		if len(strs) < 2 {
			continue
		}
		// Deliver in a separate goroutine so a full per-key channel does not
		// stall the dispatch loop.
		go star.store(strs[0], strs[1])
	}
}

// ClientStop stops the client. The request is picked up by the read loop
// between two reads, so the call can block while the loop is waiting for data
// from the server. It returns immediately if the read loop has already exited.
func (star *StarNotifyC) ClientStop() {
	select {
	case star.clientStopSign <- 0:
	case <-star.done:
	}
}