diff --git a/v1/client.go b/v1/client.go deleted file mode 100644 index 2402ae8..0000000 --- a/v1/client.go +++ /dev/null @@ -1,394 +0,0 @@ -package notify - -import ( - "context" - "errors" - "fmt" - "math/rand" - "net" - "strings" - "sync" - "time" - - "b612.me/starcrypto" - "b612.me/starnet" -) - -// StarNotifyC 为Client端 -type StarNotifyC struct { - Connc net.Conn - dialTimeout time.Duration - clientSign map[string]chan string - mu sync.Mutex - // FuncLists 当不使用channel时,使用此记录调用函数 - FuncLists map[string]func(CMsg) - stopSign context.Context - cancel context.CancelFunc - defaultFunc func(CMsg) - // UseChannel 是否使用channel作为信息传递 - UseChannel bool - isUDP bool - Sync bool - // Queue 是用来处理收发信息的简单消息队列 - Queue *starnet.StarQueue - // Online 当前链接是否处于活跃状态 - Online bool - lockPool map[string]CMsg - aesKey []byte -} - -// CMsg 指明当前客户端被通知的关键字 -type CMsg struct { - Key string - Value string - mode string - wait chan int -} - -func WriteToUDP(local *net.UDPConn, remote *net.UDPAddr, data []byte) error { - var MAX_RECV_LEN = 8192 - var haveErr error - end := len(data) - for i := 0; i < end; i += MAX_RECV_LEN { - step := i + MAX_RECV_LEN - if step > end { - step = end - } - _, err := local.WriteToUDP(data[i:step], remote) - if err != nil { - haveErr = err - } - } - return haveErr -} - -func (star *StarNotifyC) starinitc() { - builder := starnet.NewQueue() - builder.EncodeFunc = encodeFunc - builder.DecodeFunc = decodeFunc - builder.Encode = true - star.stopSign, star.cancel = context.WithCancel(context.Background()) - star.Queue = builder - star.FuncLists = make(map[string]func(CMsg)) - star.UseChannel = false - star.clientSign = make(map[string]chan string) - star.Online = false - star.lockPool = make(map[string]CMsg) - star.Queue.RestoreDuration(time.Millisecond * 50) -} - -func (star *StarNotifyC) SetAesKey(key []byte) { - star.aesKey = key - star.Queue.EncodeFunc = func(data []byte) []byte { - return starcrypto.AesEncryptCFB(data, key) - } - star.Queue.DecodeFunc = func(data []byte) []byte { - return starcrypto.AesDecryptCFB(data, key) - } -} - -func (star *StarNotifyC) GetAesKey() []byte { - if len(star.aesKey) == 0 { - return aesKey - } - return star.aesKey -} - -// Notify 用于获取一个通知 -func (star *StarNotifyC) Notify(key string) chan string { - if _, ok := star.clientSign[key]; !ok { - ch := make(chan string, 20) - star.mu.Lock() - star.clientSign[key] = ch - star.mu.Unlock() - } - return star.clientSign[key] -} - -func (star *StarNotifyC) store(key, value string) { - if _, ok := star.clientSign[key]; !ok { - ch := make(chan string, 20) - ch <- value - star.mu.Lock() - star.clientSign[key] = ch - star.mu.Unlock() - return - } - star.clientSign[key] <- value -} -func NewNotifyCWithTimeOut(netype, value string, timeout time.Duration) (*StarNotifyC, error) { - var err error - var star StarNotifyC - star.starinitc() - star.isUDP = false - if strings.Index(netype, "udp") >= 0 { - star.isUDP = true - } - star.Connc, err = net.DialTimeout(netype, value, timeout) - if err != nil { - return nil, err - } - star.dialTimeout = timeout - go star.cnotify() - go func() { - <-star.stopSign.Done() - star.Connc.Close() - star.Online = false - return - }() - go func() { - for { - buf := make([]byte, 8192) - n, err := star.Connc.Read(buf) - if n != 0 { - star.Queue.ParseMessage(buf[0:n], star.Connc) - } - if err != nil { - star.Connc.Close() - star.ClientStop() - //star, _ = NewNotifyC(netype, value) - star.Online = false - return - } - } - }() - star.Online = true - return &star, nil -} - -// NewNotifyC 用于新建一个Client端进程 -func NewNotifyC(netype, value string) (*StarNotifyC, error) { - var err error - var star StarNotifyC - star.starinitc() - star.isUDP = false - if strings.Index(netype, "udp") >= 0 { - star.isUDP = true - } - star.Connc, err = net.Dial(netype, value) - if err != nil { - return nil, err - } - go star.cnotify() - go func() { - <-star.stopSign.Done() - star.Connc.Close() - star.Online = false - return - }() - go func() { - for { - buf := make([]byte, 8192) - n, err := star.Connc.Read(buf) - if n != 0 { - star.Queue.ParseMessage(buf[0:n], star.Connc) - } - if err != nil { - star.Connc.Close() - star.ClientStop() - //star, _ = NewNotifyC(netype, value) - star.Online = false - return - } - } - }() - star.Online = true - return &star, nil -} - -// Send 用于向Server端发送数据 -func (star *StarNotifyC) Send(name string) error { - return star.SendValue(name, "") -} - -func (star *StarNotifyC) Stoped() <-chan struct{} { - return star.stopSign.Done() -} - -func (star *StarNotifyC) SendValueRaw(key string, msg interface{}) error { - encodeData, err := encode(msg) - if err != nil { - return err - } - return star.SendValue(key, string(encodeData)) -} - -// SendValue 用于向Server端发送key-value类型数据 -func (star *StarNotifyC) SendValue(name, value string) error { - var err error - var key []byte - for _, v := range []byte(name) { - if v == byte(124) || v == byte(92) { - key = append(key, byte(92)) - } - key = append(key, v) - } - _, err = star.Connc.Write(star.Queue.BuildMessage([]byte("pa" + "||" + string(key) + "||" + value))) - return err -} - -func (star *StarNotifyC) trim(name string) string { - var slash bool = false - var key []byte - for _, v := range []byte(name) { - if v == byte(92) && !slash { - slash = true - continue - } - slash = false - key = append(key, v) - } - return string(key) -} -func (star *StarNotifyC) SendValueWaitRaw(key string, msg interface{}, tmout time.Duration) (CMsg, error) { - encodeData, err := encode(msg) - if err != nil { - return CMsg{}, err - } - return star.SendValueWait(key, string(encodeData), tmout) -} - -// SendValueWait 用于向Server端发送key-value类型数据并等待结果返回,此结果不会通过标准返回流程处理 -func (star *StarNotifyC) SendValueWait(name, value string, tmout time.Duration) (CMsg, error) { - var err error - var tmceed <-chan time.Time - if star.UseChannel { - return CMsg{}, errors.New("Do Not Use UseChannel Mode!") - } - rand.Seed(time.Now().UnixNano()) - mode := "cr" + fmt.Sprintf("%d%06d", time.Now().UnixNano(), rand.Intn(999999)) - var key []byte - for _, v := range []byte(name) { - if v == byte(124) || v == byte(92) { - key = append(key, byte(92)) - } - key = append(key, v) - } - _, err = star.Connc.Write(star.Queue.BuildMessage([]byte(mode + "||" + string(key) + "||" + value))) - if err != nil { - return CMsg{}, err - } - if int64(tmout) > 0 { - tmceed = time.After(tmout) - } - var source CMsg - source.wait = make(chan int, 2) - star.mu.Lock() - star.lockPool[mode] = source - star.mu.Unlock() - select { - case <-source.wait: - res := star.lockPool[mode] - star.mu.Lock() - delete(star.lockPool, mode) - star.mu.Unlock() - return res, nil - case <-tmceed: - return CMsg{}, errors.New("Time Exceed") - } -} - -// ReplyMsg 用于向Server端Reply信息 -func (star *StarNotifyC) ReplyMsg(data CMsg, name, value string) error { - var err error - var key []byte - for _, v := range []byte(name) { - if v == byte(124) || v == byte(92) { - key = append(key, byte(92)) - } - key = append(key, v) - } - _, err = star.Connc.Write(star.Queue.BuildMessage([]byte(data.mode + "||" + string(key) + "||" + value))) - return err -} - -func (star *StarNotifyC) cnotify() { - for { - select { - case <-star.stopSign.Done(): - return - default: - } - data, err := star.Queue.RestoreOne() - if err != nil { - time.Sleep(time.Millisecond * 500) - continue - } - if string(data.Msg) == "b612ryzstop" { - star.ClientStop() - star.Online = false - return - } - strs := strings.SplitN(string(data.Msg), "||", 3) - if len(strs) < 3 { - continue - } - strs[1] = star.trim(strs[1]) - if star.UseChannel { - go star.store(strs[1], strs[2]) - } else { - mode, key, value := strs[0], strs[1], strs[2] - if mode[0:2] != "cr" { - if msg, ok := star.FuncLists[key]; ok { - if star.Sync { - msg(CMsg{key, value, mode, nil}) - } else { - go msg(CMsg{key, value, mode, nil}) - } - } else { - if star.defaultFunc != nil { - if star.Sync { - star.defaultFunc(CMsg{key, value, mode, nil}) - } else { - go star.defaultFunc(CMsg{key, value, mode, nil}) - } - } - } - } else { - if sa, ok := star.lockPool[mode]; ok { - sa.Key = key - sa.Value = value - sa.mode = mode - star.mu.Lock() - star.lockPool[mode] = sa - star.mu.Unlock() - sa.wait <- 1 - } else { - if msg, ok := star.FuncLists[key]; ok { - if star.Sync { - msg(CMsg{key, value, mode, nil}) - } else { - go msg(CMsg{key, value, mode, nil}) - } - } else { - if star.defaultFunc != nil { - if star.Sync { - star.defaultFunc(CMsg{key, value, mode, nil}) - } else { - go star.defaultFunc(CMsg{key, value, mode, nil}) - } - } - } - } - } - } - } -} - -// ClientStop 终止client端运行 -func (star *StarNotifyC) ClientStop() { - if star.isUDP { - star.Send("b612ryzstop") - } - star.cancel() -} - -// SetNotify 用于设置关键词的调用函数 -func (star *StarNotifyC) SetNotify(name string, data func(CMsg)) { - star.FuncLists[name] = data -} - -// SetDefaultNotify 用于设置默认关键词的调用函数 -func (star *StarNotifyC) SetDefaultNotify(data func(CMsg)) { - star.defaultFunc = data -} diff --git a/v1/client_test.go b/v1/client_test.go deleted file mode 100644 index 99c16c1..0000000 --- a/v1/client_test.go +++ /dev/null @@ -1,161 +0,0 @@ -package notify - -import ( - "fmt" - "testing" - "time" -) - -func Test_usechannel(t *testing.T) { - server, err := NewNotifyS("udp", "127.0.0.1:1926") - if err != nil { - fmt.Println(err) - return - } - server.SetNotify("nihao", func(data SMsg) string { - fmt.Println("server recv:", data.Key, data.Value) - if data.Value != "" { - data.Reply("nba") - return "nb" - } - return "" - }) - client, err := NewNotifyC("udp", "127.0.0.1:1926") - if err != nil { - fmt.Println(err) - return - } - client.UseChannel = true - //time.Sleep(time.Second * 10) - client.Send("nihao") - client.SendValue("nihao", "lalala") - txt := <-client.Notify("nihao") - fmt.Println("client", txt) - txt = <-client.Notify("nihao") - fmt.Println("client", txt) - server.ServerStop() - <-client.Stoped() - client.ClientStop() - time.Sleep(time.Second * 3) -} - -func Test_nochannel(t *testing.T) { - server, err := NewNotifyS("tcp", "127.0.0.1:1926") - if err != nil { - fmt.Println(err) - return - } - server.SetNotify("nihao", func(data SMsg) string { - fmt.Println("server recv:", data.Key, data.Value) - if data.Value != "" { - data.Reply("nbaz") - return "" - } - return "" - }) - client, err := NewNotifyC("tcp", "127.0.0.1:1926") - if err != nil { - fmt.Println(err) - return - } - //time.Sleep(time.Second * 10) - client.UseChannel = false - client.SetNotify("nihao", func(data CMsg) { - fmt.Println("client recv:", data.Key, data.Value) - if data.Value != "" { - time.Sleep(time.Millisecond * 900) - client.SendValue("nihao", "dsb") - } - }) - client.SendValue("nihao", "lalala") - time.Sleep(time.Second * 3) - server.ServerStop() - <-client.Stoped() - client.ClientStop() - time.Sleep(time.Second * 3) -} - -func Test_pipec(t *testing.T) { - server, err := NewNotifyS("tcp", "127.0.0.1:1926") - if err != nil { - fmt.Println(err) - return - } - server.SetAesKey([]byte("abcdefg123456789")) - server.SetNotify("ni\\||hao", func(data SMsg) string { - fmt.Println("name-get", data.GetName()) - fmt.Println("name-set", data.SetName("iiiis")) - fmt.Println("name-get", data.GetName()) - fmt.Println("server recv:", data.Key, data.Value, data.mode) - if data.Value != "" { - data.Reply("nba") - return "" - } - return "" - }) - client, err := NewNotifyC("tcp", "127.0.0.1:1926") - if err != nil { - fmt.Println(err) - return - } - client.SetAesKey([]byte("abcdefg123456789")) - client.UseChannel = false - sa, err := client.SendValueWait("ni\\||hao", "lalaeee", time.Second*10) - if err != nil { - fmt.Println(err) - return - } - fmt.Println(sa) - sa, err = client.SendValueWait("ni\\||hao", "lalasdeee", time.Second*10) - if err != nil { - fmt.Println(err) - return - } - fmt.Println(sa) - fmt.Println("sukidesu") - time.Sleep(time.Second * 3) - server.ServerStop() - <-client.Stoped() - client.ClientStop() - time.Sleep(time.Second * 2) -} - -func Test_pips(t *testing.T) { - var testmsg SMsg - server, err := NewNotifyS("udp", "127.0.0.1:1926") - if err != nil { - fmt.Println(err) - return - } - server.SetNotify("nihao", func(data SMsg) string { - fmt.Println("server recv:", data.Key, data.Value, data.mode) - testmsg = data - if data.Value != "" { - data.Reply("nbaz") - return "" - } - return "" - }) - client, err := NewNotifyC("udp", "127.0.0.1:1926") - if err != nil { - fmt.Println(err) - return - } - //time.Sleep(time.Second * 10) - client.UseChannel = false - client.SetNotify("nihao", func(data CMsg) { - fmt.Println("client recv:", data.Key, data.Value, data.mode) - if data.mode != "pa" { - time.Sleep(time.Millisecond * 1200) - client.ReplyMsg(data, "nihao", "dsb") - } - }) - client.SendValue("nihao", "lalala") - time.Sleep(time.Second * 3) - fmt.Println(server.SendWait(testmsg, "nihao", "wozuinb", time.Second*20)) - fmt.Println("sakura") - server.ServerStop() - <-client.Stoped() - client.ClientStop() - time.Sleep(time.Second * 3) -} diff --git a/v1/serialization.go b/v1/serialization.go deleted file mode 100644 index c10d889..0000000 --- a/v1/serialization.go +++ /dev/null @@ -1,37 +0,0 @@ -package notify - -import ( - "bytes" - "encoding/gob" -) - -func Register(data interface{}) { - gob.Register(data) -} - -func RegisterAll(data []interface{}) { - for _, v := range data { - gob.Register(v) - } -} -func encode(src interface{}) ([]byte, error) { - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - err := enc.Encode(&src) - return buf.Bytes(), err -} - -func Decode(src []byte) (interface{}, error) { - dec := gob.NewDecoder(bytes.NewReader(src)) - var dst interface{} - err := dec.Decode(&dst) - return dst, err -} - -func (nmsg *SMsg) Decode() (interface{}, error) { - return Decode([]byte(nmsg.Value)) -} - -func (nmsg *CMsg) Decode() (interface{}, error) { - return Decode([]byte(nmsg.Value)) -} diff --git a/v1/server.go b/v1/server.go deleted file mode 100644 index 3149e57..0000000 --- a/v1/server.go +++ /dev/null @@ -1,534 +0,0 @@ -// Package notify is a package which provide common tcp/udp/unix socket service -package notify - -import ( - "context" - "errors" - "fmt" - "math/rand" - "net" - "strings" - "sync" - "time" - - "b612.me/starcrypto" - - "b612.me/starnet" -) - -var aesKey = []byte{0x19, 0x96, 0x11, 0x27, 228, 187, 187, 231, 142, 137, 230, 179, 189, 229, 184, 133} - -func encodeFunc(data []byte) []byte { - return starcrypto.AesEncryptCFB(data, aesKey) -} - -func decodeFunc(data []byte) []byte { - return starcrypto.AesDecryptCFB(data, aesKey) -} - -// StarNotifyS 为Server端 -type StarNotifyS struct { - // Queue 是用来处理收发信息的简单消息队列 - Queue *starnet.StarQueue - // FuncLists 记录了被通知项所记录的函数 - aesKey []byte - FuncLists map[string]func(SMsg) string - funcMu sync.Mutex - defaultFunc func(SMsg) string - Connected func(SMsg) - nickName map[string]string - stopSign context.Context - cancel context.CancelFunc - connPool sync.Map - connMu sync.Mutex - lockPool map[string]SMsg - lockMu sync.Mutex - udpPool map[string]*net.UDPAddr - listener net.Listener - isUDP bool - Sync bool - // UDPConn UDP监听 - UDPConn *net.UDPConn - // Online 当前链接是否处于活跃状态 - Online bool - // ReadDeadline tcp/unix中读超时设置,udp请直接调用UDPConn - ReadDeadline time.Time - // WriteDeadline tcp/unix中写超时设置,udp请直接调用UDPConn - WriteDeadline time.Time - - // Deadline tcp/unix中超时设置,udp请直接调用UDPConn - Deadline time.Time -} - -// SMsg 指明当前服务端被通知的关键字 -type SMsg struct { - Conn net.Conn - Key string - Value string - UDP *net.UDPAddr - Uconn *net.UDPConn - mode string - wait chan int - nickName func(string, string) error - getName func(string) string - queue *starnet.StarQueue -} - -func (star *StarNotifyS) SetAesKey(key []byte) { - star.aesKey = key - star.Queue.EncodeFunc = func(data []byte) []byte { - return starcrypto.AesEncryptCFB(data, key) - } - star.Queue.DecodeFunc = func(data []byte) []byte { - return starcrypto.AesDecryptCFB(data, key) - } -} - -func (star *StarNotifyS) GetAesKey() []byte { - if len(star.aesKey) == 0 { - return aesKey - } - return star.aesKey -} - -func (star *StarNotifyS) getName(conn string) string { - for k, v := range star.nickName { - if v == conn { - return k - } - } - return "" -} -func (star *StarNotifyS) Stoped() <-chan struct{} { - return star.stopSign.Done() -} - -// GetConnPool 获取所有Client端信息 -func (star *StarNotifyS) GetConnPool() []SMsg { - var result []SMsg - star.connPool.Range(func(k, val interface{}) bool { - v := val.(net.Conn) - result = append(result, SMsg{Conn: v, mode: "pa", nickName: star.setNickName, getName: star.getName, queue: star.Queue}) - return true - }) - for _, v := range star.udpPool { - result = append(result, SMsg{UDP: v, Uconn: star.UDPConn, mode: "pa0", nickName: star.setNickName, getName: star.getName, queue: star.Queue}) - } - return result -} - -// GetClient 获取所有Client端信息 -func (star *StarNotifyS) GetClient(name string) (SMsg, error) { - if str, ok := star.nickName[name]; ok { - if tmp, ok := star.connPool.Load(str); ok { - conn := tmp.(net.Conn) - return SMsg{Conn: conn, mode: "pa", nickName: star.setNickName, getName: star.getName, queue: star.Queue}, nil - } - if conn, ok := star.udpPool[str]; ok { - return SMsg{UDP: conn, Uconn: star.UDPConn, mode: "pa0", nickName: star.setNickName, getName: star.getName, queue: star.Queue}, nil - } - } - return SMsg{}, errors.New("Not Found") -} - -func (nmsg *SMsg) GetName() string { - if nmsg.Uconn != nil { - return nmsg.getName(nmsg.UDP.String()) - } - return nmsg.getName(fmt.Sprint(nmsg.Conn)) -} - -func (nmsg *SMsg) SetName(name string) error { - if nmsg.Uconn != nil { - return nmsg.nickName(name, nmsg.UDP.String()) - } - return nmsg.nickName(name, fmt.Sprint(nmsg.Conn)) -} - -func (nmsg *SMsg) addSlash(name string) string { - var key []byte - for _, v := range []byte(name) { - if v == byte(124) || v == byte(92) { - key = append(key, byte(92)) - } - key = append(key, v) - } - return string(key) -} - -func (nmsg *SMsg) ReplyRaw(msg interface{}) error { - encodeData, err := encode(msg) - if err != nil { - return err - } - return nmsg.Reply(string(encodeData)) -} - -// Reply 用于向client端回复数据 -func (nmsg *SMsg) Reply(msg string) error { - var err error - if nmsg.Uconn == nil { - _, err = nmsg.Conn.Write(nmsg.queue.BuildMessage([]byte(nmsg.mode + "||" + nmsg.addSlash(nmsg.Key) + "||" + msg))) - } else { - err = WriteToUDP(nmsg.Uconn, nmsg.UDP, nmsg.queue.BuildMessage([]byte(nmsg.mode+"||"+nmsg.addSlash(nmsg.Key)+"||"+msg))) - } - return err -} - -// Send 用于向client端发送key-value数据 -func (nmsg *SMsg) Send(key, value string) error { - var err error - if nmsg.Uconn == nil { - _, err = nmsg.Conn.Write(nmsg.queue.BuildMessage([]byte("pa||" + nmsg.addSlash(key) + "||" + value))) - } else { - err = WriteToUDP(nmsg.Uconn, nmsg.UDP, nmsg.queue.BuildMessage([]byte("pa||"+nmsg.addSlash(key)+"||"+value))) - } - return err -} - -func (nmsg *SMsg) SendRaw(key string, msg interface{}) error { - encodeData, err := encode(msg) - if err != nil { - return err - } - return nmsg.Send(key, string(encodeData)) -} - -func (star *StarNotifyS) SendWaitRaw(source SMsg, key string, msg interface{}, tmout time.Duration) (SMsg, error) { - encodeData, err := encode(msg) - if err != nil { - return SMsg{}, err - } - return star.SendWait(source, key, string(encodeData), tmout) -} - -// SendWait 用于向client端发送key-value数据,并等待 -func (star *StarNotifyS) SendWait(source SMsg, key, value string, tmout time.Duration) (SMsg, error) { - var err error - var tmceed <-chan time.Time - rand.Seed(time.Now().UnixNano()) - mode := "sr" + fmt.Sprintf("%d%06d", time.Now().UnixNano(), rand.Intn(999999)) - if source.Uconn == nil { - _, err = source.Conn.Write(star.Queue.BuildMessage([]byte(mode + "||" + source.addSlash(key) + "||" + value))) - } else { - err = WriteToUDP(source.Uconn, source.UDP, star.Queue.BuildMessage([]byte(mode+"||"+source.addSlash(key)+"||"+value))) - } - if err != nil { - return SMsg{}, err - } - if int64(tmout) > 0 { - tmceed = time.After(tmout) - } - source.wait = make(chan int, 2) - star.lockMu.Lock() - star.lockPool[mode] = source - star.lockMu.Unlock() - select { - case <-source.wait: - star.lockMu.Lock() - res := star.lockPool[mode] - delete(star.lockPool, mode) - star.lockMu.Unlock() - return res, nil - case <-tmceed: - return SMsg{}, errors.New("Time Exceed") - } -} - -func (star *StarNotifyS) starinits() { - builder := starnet.NewQueue() - builder.EncodeFunc = encodeFunc - builder.DecodeFunc = decodeFunc - builder.Encode = true - star.stopSign, star.cancel = context.WithCancel(context.Background()) - star.Queue = builder - star.udpPool = make(map[string]*net.UDPAddr) - star.FuncLists = make(map[string]func(SMsg) string) - star.nickName = make(map[string]string) - star.lockPool = make(map[string]SMsg) - star.Online = false - star.Queue.RestoreDuration(time.Millisecond * 50) -} - -// NewNotifyS 开启一个新的Server端通知 -func NewNotifyS(netype, value string) (*StarNotifyS, error) { - if netype[0:3] != "udp" { - return notudps(netype, value) - } - return doudps(netype, value) -} - -func doudps(netype, value string) (*StarNotifyS, error) { - var star StarNotifyS - star.starinits() - star.isUDP = true - udpaddr, err := net.ResolveUDPAddr(netype, value) - if err != nil { - return nil, err - } - star.UDPConn, err = net.ListenUDP(netype, udpaddr) - if err != nil { - return nil, err - } - go star.notify() - go func() { - <-star.stopSign.Done() - for k, v := range star.udpPool { - WriteToUDP(star.UDPConn, v, star.Queue.BuildMessage([]byte("b612ryzstop"))) - star.connMu.Lock() - delete(star.udpPool, k) - star.connMu.Unlock() - for k2, v2 := range star.nickName { - if v2 == k { - delete(star.nickName, k2) - } - } - } - star.UDPConn.Close() - star.Online = false - return - }() - go func() { - for { - buf := make([]byte, 81920) - n, addr, err := star.UDPConn.ReadFromUDP(buf) - if n != 0 { - star.Queue.ParseMessage(buf[0:n], addr) - if _, ok := star.udpPool[addr.String()]; !ok { - if star.Connected != nil { - go star.Connected(SMsg{UDP: addr, Uconn: star.UDPConn, nickName: star.setNickName, getName: star.getName, queue: star.Queue}) - } - } - star.connMu.Lock() - star.udpPool[addr.String()] = addr - star.connMu.Unlock() - } - if err != nil { - continue - } - } - }() - star.Online = true - return &star, nil -} - -func notudps(netype, value string) (*StarNotifyS, error) { - var err error - var star StarNotifyS - star.starinits() - star.isUDP = false - star.listener, err = net.Listen(netype, value) - if err != nil { - return nil, err - } - go star.notify() - go func() { - <-star.stopSign.Done() - star.connPool.Range(func(a, b interface{}) bool { - k := a.(string) - v := b.(net.Conn) - v.Close() - star.connPool.Delete(a) - for k2, v2 := range star.nickName { - if v2 == k { - star.funcMu.Lock() - delete(star.nickName, k2) - star.funcMu.Unlock() - } - } - return true - }) - star.listener.Close() - star.Online = false - return - }() - go func() { - for { - conn, err := star.listener.Accept() - if err != nil { - select { - case <-star.stopSign.Done(): - star.listener.Close() - return - default: - continue - } - } - if !star.ReadDeadline.IsZero() { - conn.SetReadDeadline(star.ReadDeadline) - } - if !star.WriteDeadline.IsZero() { - conn.SetWriteDeadline(star.WriteDeadline) - } - if !star.Deadline.IsZero() { - conn.SetDeadline(star.Deadline) - } - go func(conn net.Conn) { - for { - buf := make([]byte, 8192) - n, err := conn.Read(buf) - if n != 0 { - star.Queue.ParseMessage(buf[0:n], conn) - } - if err != nil { - conn.Close() - star.connPool.Delete(fmt.Sprint(conn)) - for k, v := range star.nickName { - if v == fmt.Sprint(conn) { - delete(star.nickName, k) - } - } - break - } - } - }(conn) - star.connPool.Store(fmt.Sprint(conn), conn) - if star.Connected != nil { - go star.Connected(SMsg{Conn: conn, nickName: star.setNickName, getName: star.getName, queue: star.Queue}) - } - } - }() - star.Online = true - return &star, nil -} - -func (star *StarNotifyS) GetListenerInfo() net.Listener { - return star.listener -} - -// SetNotify 用于设置通知关键词的调用函数 -func (star *StarNotifyS) setNickName(name string, conn string) error { - if _, ok := star.connPool.Load(conn); !ok { - if _, ok := star.udpPool[conn]; !ok { - return errors.New("Conn Not Found") - } - } - for k, v := range star.nickName { - if v == conn { - delete(star.nickName, k) - } - } - star.funcMu.Lock() - star.nickName[name] = conn - star.funcMu.Unlock() - return nil -} - -// SetNotify 用于设置通知关键词的调用函数 -func (star *StarNotifyS) SetNotify(name string, data func(SMsg) string) { - star.funcMu.Lock() - defer star.funcMu.Unlock() - if data == nil { - if _, ok := star.FuncLists[name]; ok { - delete(star.FuncLists, name) - } - return - } - star.FuncLists[name] = data -} - -// SetDefaultNotify 用于设置默认关键词的调用函数 -func (star *StarNotifyS) SetDefaultNotify(data func(SMsg) string) { - star.defaultFunc = data -} - -func (star *StarNotifyS) trim(name string) string { - var slash bool = false - var key []byte - for _, v := range []byte(name) { - if v == byte(92) && !slash { - slash = true - continue - } - slash = false - key = append(key, v) - } - return string(key) -} - -func (star *StarNotifyS) notify() { - for { - select { - case <-star.stopSign.Done(): - return - default: - } - data, err := star.Queue.RestoreOne() - if err != nil { - time.Sleep(time.Millisecond * 500) - continue - } - mode, key, value := star.analyseData(string(data.Msg)) - if mode == key && mode == value && mode == "" { - continue - } - var rmsg SMsg - if !star.isUDP { - rmsg = SMsg{data.Conn.(net.Conn), key, value, nil, nil, mode, nil, star.setNickName, star.getName, star.Queue} - } else { - rmsg = SMsg{nil, key, value, data.Conn.(*net.UDPAddr), star.UDPConn, mode, nil, star.setNickName, star.getName, star.Queue} - if key == "b612ryzstop" { - star.connMu.Lock() - delete(star.udpPool, rmsg.UDP.String()) - star.connMu.Unlock() - for k, v := range star.nickName { - if v == rmsg.UDP.String() { - delete(star.nickName, k) - } - } - continue - } - } - replyFunc := func(key string, rmsg SMsg) { - if msg, ok := star.FuncLists[key]; ok { - sdata := msg(rmsg) - if sdata == "" { - return - } - rmsg.Reply(sdata) - } else { - if star.defaultFunc != nil { - sdata := star.defaultFunc(rmsg) - if sdata == "" { - return - } - rmsg.Reply(sdata) - } - } - } - if mode[0:2] != "sr" { - if !star.Sync { - go replyFunc(key, rmsg) - } else { - replyFunc(key, rmsg) - } - } else { - if sa, ok := star.lockPool[mode]; ok { - rmsg.wait = sa.wait - star.lockMu.Lock() - star.lockPool[mode] = rmsg - star.lockPool[mode].wait <- 1 - star.lockMu.Unlock() - } else { - if !star.Sync { - go replyFunc(key, rmsg) - } else { - replyFunc(key, rmsg) - } - } - } - } -} - -func (star *StarNotifyS) analyseData(msg string) (mode, key, value string) { - slice := strings.SplitN(msg, "||", 3) - if len(slice) < 3 { - return "", "", "" - } - return slice[0], star.trim(slice[1]), slice[2] -} - -// ServerStop 用于终止Server端运行 -func (star *StarNotifyS) ServerStop() { - star.cancel() -} diff --git a/v1/starnotify/define.go b/v1/starnotify/define.go deleted file mode 100644 index f6f9b7b..0000000 --- a/v1/starnotify/define.go +++ /dev/null @@ -1,103 +0,0 @@ -package starnotify - -import ( - "errors" - "time" - - "b612.me/notify/v1" -) - -var ( - starClient map[string]*notify.StarNotifyC - starServer map[string]*notify.StarNotifyS -) - -func init() { - starClient = make(map[string]*notify.StarNotifyC) - starServer = make(map[string]*notify.StarNotifyS) -} - -func NewClient(key, netype, value string) (*notify.StarNotifyC, error) { - client, err := notify.NewNotifyC(netype, value) - if err != nil { - return client, err - } - starClient[key] = client - return client, err -} - -func NewClientWithTimeout(key, netype, value string, timeout time.Duration) (*notify.StarNotifyC, error) { - client, err := notify.NewNotifyCWithTimeOut(netype, value, timeout) - if err != nil { - return client, err - } - starClient[key] = client - return client, err -} - -func DeleteClient(key string) error { - client, ok := starClient[key] - if !ok { - return errors.New("Not Exists Yet!") - } - if client.Online { - client.ClientStop() - } - client = nil - delete(starClient, key) - return nil -} - -func NewServer(key, netype, value string) (*notify.StarNotifyS, error) { - server, err := notify.NewNotifyS(netype, value) - if err != nil { - return server, err - } - starServer[key] = server - return server, err -} - -func DeleteServer(key string) error { - server, ok := starServer[key] - if !ok { - return errors.New("Not Exists Yet!") - } - if server.Online { - server.ServerStop() - } - server = nil - delete(starServer, key) - return nil -} - -func S(key string) *notify.StarNotifyS { - server, ok := starServer[key] - if !ok { - return nil - } - return server -} - -func C(key string) *notify.StarNotifyC { - client, ok := starClient[key] - if !ok { - return nil - } - return client -} - -func Server(key string) (*notify.StarNotifyS, error) { - server, ok := starServer[key] - if !ok { - return nil, errors.New("Not Exists Yet") - } - return server, nil -} - -func Client(key string) (*notify.StarNotifyC, error) { - client, ok := starClient[key] - if !ok { - return nil, errors.New("Not Exists Yet") - } - return client, nil -} diff --git a/v1/v2cs_test.go b/v1/v2cs_test.go deleted file mode 100644 index 812a242..0000000 --- a/v1/v2cs_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package notify - -import ( - "fmt" - "sync/atomic" - "testing" - "time" -) - -func Test_ServerTuAndClientCommon(t *testing.T) { - server, err := NewNotifyS("tcp", "127.0.0.1:12345") - if err != nil { - panic(err) - } - server.SetNotify("notify", notify) - for i := 1; i <= 1; i++ { - go func() { - - client, err := NewNotifyC("tcp", "127.0.0.1:12345") - if err != nil { - time.Sleep(time.Second * 2) - panic(err) - } - for { - //nowd = time.Now().UnixNano() - client.SendValueWait("notify", "client hello", time.Second*50) - //time.Sleep(time.Millisecond) - //fmt.Println("finished:", float64(time.Now().UnixNano()-nowd)/1000000) - //client.Send("notify", []byte("client hello")) - } - }() - } - go func() { - time.Sleep(time.Second * 10) - server.ServerStop() - }() - <-server.Stoped() - //time.Sleep(time.Second * 5) - fmt.Println(count2) - -} - -var count2 int64 - -func notify(msg SMsg) string { - //fmt.Println(string(msg.Msg.Value)) - //fmt.Println("called:", float64(time.Now().UnixNano()-nowd)/1000000) - - go atomic.AddInt64(&count2, 1) - return "ok" -}