You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
notify/client.go

360 lines
8.2 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package notify
import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"strings"
"sync"
"time"
"b612.me/starnet"
)
// StarNotifyC is the client side of the notify protocol.
type StarNotifyC struct {
// Connc is the connection to the server; the constructors set it from net.Dial / net.DialTimeout.
Connc net.Conn
// dialTimeout is the dial timeout recorded by NewNotifyCWithTimeOut.
dialTimeout time.Duration
// clientSign maps a key to the channel that Notify hands out and store feeds (channel mode).
clientSign map[string]chan string
// mu protects clientSign and lockPool.
mu sync.Mutex
// FuncLists records the handler functions that are called, per key, when channels are not used.
FuncLists map[string]func(CMsg)
// stopSign is done once cancel has been called; the background goroutines watch it.
stopSign context.Context
// cancel cancels stopSign; ClientStop calls it.
cancel context.CancelFunc
// defaultFunc handles messages whose key has no entry in FuncLists (may be nil).
defaultFunc func(CMsg)
// Stop receives stop signals.
Stop chan int
// UseChannel reports whether channels are used to deliver messages.
UseChannel bool
// isUDP is true when the network name passed to the constructor contains "udp".
isUDP bool
// Sync makes the receive loop call handlers inline instead of in a new goroutine.
Sync bool
// Queue is a simple message queue used to frame outgoing and parse incoming messages.
Queue *starnet.StarQueue
// Online reports whether the current connection is active.
Online bool
// lockPool holds the pending SendValueWait requests, keyed by their mode string.
lockPool map[string]CMsg
}
// CMsg describes a notification delivered to the client: the key it was sent
// under and the value that came with it.
type CMsg struct {
// Key is the (unescaped) key of the message.
Key string
// Value is the payload that followed the key.
Value string
// mode is the first field of the wire message: "pa" for plain sends, or
// "cr" followed by a request id for SendValueWait requests.
mode string
// wait is signalled by the receive loop when the reply for a pending
// SendValueWait arrives; it is nil for messages handed to handlers.
wait chan int
}
// starinitc puts a freshly allocated client into its initial state: a
// cancellable stop context, an encoding message queue, empty handler and
// channel tables, and the Online / UseChannel flags cleared.
func (star *StarNotifyC) starinitc() {
	ctx, stop := context.WithCancel(context.Background())
	star.stopSign, star.cancel = ctx, stop

	// Plain bookkeeping state.
	star.Online = false
	star.UseChannel = false
	star.Stop = make(chan int, 5)
	star.FuncLists = make(map[string]func(CMsg))
	star.clientSign = make(map[string]chan string)
	star.lockPool = make(map[string]CMsg)

	// Message queue: configured first, then published on the client.
	queue := starnet.NewQueue()
	queue.EncodeFunc = encodeFunc
	queue.DecodeFunc = decodeFunc
	queue.Encode = true
	star.Queue = queue
	star.Queue.RestoreDuration(2 * time.Second)
}
// Notify returns the channel on which values for key are delivered in channel
// mode, creating it (buffered, capacity 20) on first use. Repeated calls with
// the same key return the same channel.
//
// The lookup and the insertion happen under the client mutex: store runs in
// other goroutines, and an unlocked map read next to a locked write is a data
// race that can also create two different channels for one key.
func (star *StarNotifyC) Notify(key string) chan string {
	star.mu.Lock()
	defer star.mu.Unlock()

	ch, ok := star.clientSign[key]
	if !ok {
		ch = make(chan string, 20)
		star.clientSign[key] = ch
	}
	return ch
}
// store delivers value on the channel registered for key, creating the
// channel (buffered, capacity 20) if it does not exist yet.
//
// The channel is looked up or created under the client mutex so that
// concurrent calls for a new key agree on one channel instead of overwriting
// each other and losing a value. The send happens after the mutex is
// released, so a full channel only blocks this call and not other keys.
func (star *StarNotifyC) store(key, value string) {
	star.mu.Lock()
	ch, ok := star.clientSign[key]
	if !ok {
		ch = make(chan string, 20)
		star.clientSign[key] = ch
	}
	star.mu.Unlock()

	ch <- value
}
// NewNotifyCWithTimeOut creates a client connected to the server at value
// over the network netype (any name containing "udp" is treated as UDP),
// giving up on the dial after timeout.
//
// On success three goroutines are started: the message dispatcher (cnotify),
// a watcher that closes the connection once the client is stopped, and a
// reader that feeds received bytes into the queue. A dial error is returned
// as is, together with a nil client.
func NewNotifyCWithTimeOut(netype, value string, timeout time.Duration) (*StarNotifyC, error) {
	// Dial first so that no client state is built for a connection that
	// never came up.
	conn, err := net.DialTimeout(netype, value, timeout)
	if err != nil {
		return nil, err
	}

	star := new(StarNotifyC)
	star.starinitc()
	star.isUDP = strings.Contains(netype, "udp")
	star.Connc = conn
	star.dialTimeout = timeout
	// Mark the client online before the goroutines start: they set Online
	// to false on failure, and setting it to true afterwards could overwrite
	// that and report a dead connection as alive.
	star.Online = true

	go star.cnotify()
	go func() {
		<-star.stopSign.Done()
		star.Connc.Close()
		star.Online = false
	}()
	go func() {
		for {
			// A fresh buffer per read: whether the queue keeps a reference
			// to the slice is not visible here, so it is not reused.
			buf := make([]byte, 8192)
			n, err := star.Connc.Read(buf)
			if n != 0 {
				star.Queue.ParseMessage(buf[0:n], star.Connc)
			}
			if err != nil {
				// Any read error ends the client; there is no reconnect.
				star.Connc.Close()
				star.ClientStop()
				star.Online = false
				return
			}
		}
	}()
	return star, nil
}
// NewNotifyC creates a client connected to the server at value over the
// network netype (any name containing "udp" is treated as UDP), using the
// default dial behaviour without an explicit timeout.
//
// On success three goroutines are started: the message dispatcher (cnotify),
// a watcher that closes the connection once the client is stopped, and a
// reader that feeds received bytes into the queue. A dial error is returned
// as is, together with a nil client.
func NewNotifyC(netype, value string) (*StarNotifyC, error) {
	// Dial first so that no client state is built for a connection that
	// never came up.
	conn, err := net.Dial(netype, value)
	if err != nil {
		return nil, err
	}

	star := new(StarNotifyC)
	star.starinitc()
	star.isUDP = strings.Contains(netype, "udp")
	star.Connc = conn
	// Mark the client online before the goroutines start: they set Online
	// to false on failure, and setting it to true afterwards could overwrite
	// that and report a dead connection as alive.
	star.Online = true

	go star.cnotify()
	go func() {
		<-star.stopSign.Done()
		star.Connc.Close()
		star.Online = false
	}()
	go func() {
		for {
			// A fresh buffer per read: whether the queue keeps a reference
			// to the slice is not visible here, so it is not reused.
			buf := make([]byte, 8192)
			n, err := star.Connc.Read(buf)
			if n != 0 {
				star.Queue.ParseMessage(buf[0:n], star.Connc)
			}
			if err != nil {
				// Any read error ends the client; there is no reconnect.
				star.Connc.Close()
				star.ClientStop()
				star.Online = false
				return
			}
		}
	}()
	return star, nil
}
// Send transmits name to the server as a key with an empty value. It is a
// shorthand for SendValue(name, "") and returns that call's error.
func (star *StarNotifyC) Send(name string) error {
	const noValue = ""
	return star.SendValue(name, noValue)
}
// SendValueRaw encodes msg with the package's encode helper and sends the
// encoded bytes as the value for key. An encoding failure is returned without
// sending anything; otherwise the result of SendValue is returned.
func (star *StarNotifyC) SendValueRaw(key string, msg interface{}) error {
	payload, encErr := encode(msg)
	if encErr == nil {
		return star.SendValue(key, string(payload))
	}
	return encErr
}
// SendValue sends a key/value pair to the server as a "pa" message.
//
// The wire form is "pa||<key>||<value>". Every '|' and '\' in name is
// prefixed with a backslash so the key can never contain the "||" separator;
// value is sent verbatim. The error of the underlying connection write is
// returned.
func (star *StarNotifyC) SendValue(name, value string) error {
	var escaped []byte
	for i := 0; i < len(name); i++ {
		switch c := name[i]; c {
		case '|', '\\':
			escaped = append(escaped, '\\', c)
		default:
			escaped = append(escaped, c)
		}
	}
	payload := []byte("pa" + "||" + string(escaped) + "||" + value)
	frame := star.Queue.BuildMessage(payload)
	_, err := star.Connc.Write(frame)
	return err
}
// trim undoes the backslash escaping applied to keys before sending: each
// backslash is dropped and the byte after it is kept literally, so `\|`
// becomes `|` and `\\` becomes `\`. A lone backslash at the very end of name
// is discarded.
func (star *StarNotifyC) trim(name string) string {
	var out []byte
	for i := 0; i < len(name); i++ {
		if name[i] == '\\' {
			// Skip the escape character and take the next byte as is.
			i++
			if i == len(name) {
				break
			}
		}
		out = append(out, name[i])
	}
	return string(out)
}
// SendValueWaitRaw encodes msg with the package's encode helper, sends the
// encoded bytes as the value for key and waits for the reply like
// SendValueWait does. An encoding failure is returned together with an empty
// CMsg and nothing is sent.
func (star *StarNotifyC) SendValueWaitRaw(key string, msg interface{}, tmout time.Duration) (CMsg, error) {
	payload, encErr := encode(msg)
	if encErr == nil {
		return star.SendValueWait(key, string(payload), tmout)
	}
	return CMsg{}, encErr
}
// SendValueWait sends a key/value pair to the server and blocks until the
// matching reply arrives. The reply is returned to the caller and is not
// passed through the normal handler dispatch.
//
// The request is tagged with a unique mode string ("cr" + timestamp + random
// suffix) and every '|' and '\' in name is backslash-escaped. tmout bounds
// the wait; a value <= 0 waits indefinitely.
//
// Errors: "Do Not Use UseChannel Mode!" when the client is in channel mode,
// the connection's write error, or "Time Exceed" when no reply arrived in
// time. The error texts are kept as they are because callers may compare
// them.
func (star *StarNotifyC) SendValueWait(name, value string, tmout time.Duration) (CMsg, error) {
	if star.UseChannel {
		return CMsg{}, errors.New("Do Not Use UseChannel Mode!")
	}

	// The global generator is not reseeded per call: reseeding with the
	// clock makes IDs of near-simultaneous calls more alike, not less, and
	// disturbs every other user of math/rand.
	mode := "cr" + fmt.Sprintf("%d%06d", time.Now().UnixNano(), rand.Intn(999999))

	var key []byte
	for _, v := range []byte(name) {
		if v == '|' || v == '\\' {
			key = append(key, '\\')
		}
		key = append(key, v)
	}

	// Register the waiter BEFORE writing the request. If it were registered
	// afterwards, a fast reply could be dispatched before the entry exists
	// and the caller would wait for something that has already been handled.
	waiter := CMsg{wait: make(chan int, 2)}
	star.mu.Lock()
	star.lockPool[mode] = waiter
	star.mu.Unlock()
	forget := func() {
		star.mu.Lock()
		delete(star.lockPool, mode)
		star.mu.Unlock()
	}

	if _, err := star.Connc.Write(star.Queue.BuildMessage([]byte(mode + "||" + string(key) + "||" + value))); err != nil {
		forget()
		return CMsg{}, err
	}

	// A nil channel never fires, which gives "wait forever" for tmout <= 0.
	var expired <-chan time.Time
	if tmout > 0 {
		timer := time.NewTimer(tmout)
		defer timer.Stop()
		expired = timer.C
	}

	select {
	case <-waiter.wait:
		// The receive loop stored the filled-in reply under the same mode.
		star.mu.Lock()
		res := star.lockPool[mode]
		delete(star.lockPool, mode)
		star.mu.Unlock()
		return res, nil
	case <-expired:
		// Drop the entry so abandoned requests do not pile up.
		forget()
		return CMsg{}, errors.New("Time Exceed")
	}
}
// ReplyMsg sends a reply to the server for the message data.
//
// The reply reuses data's mode as the first wire field, so the wire form is
// "<mode>||<key>||<value>". Every '|' and '\' in name is prefixed with a
// backslash; value is sent verbatim. The error of the underlying connection
// write is returned.
func (star *StarNotifyC) ReplyMsg(data CMsg, name, value string) error {
	var escaped []byte
	for i := 0; i < len(name); i++ {
		switch c := name[i]; c {
		case '|', '\\':
			escaped = append(escaped, '\\', c)
		default:
			escaped = append(escaped, c)
		}
	}
	payload := []byte(data.mode + "||" + string(escaped) + "||" + value)
	frame := star.Queue.BuildMessage(payload)
	_, err := star.Connc.Write(frame)
	return err
}
// cnotify is the client's dispatch loop. It takes parsed messages from the
// queue until the client is stopped and routes each one:
//
//   - "b612ryzstop" stops the client and ends the loop;
//   - anything that does not split into "<mode>||<key>||<value>" is ignored;
//   - in channel mode the value is stored on the key's channel;
//   - a "cr…" message whose mode matches a pending SendValueWait wakes that
//     caller;
//   - everything else goes to the handler registered for the key, or to the
//     default handler.
func (star *StarNotifyC) cnotify() {
	for {
		select {
		case <-star.stopSign.Done():
			return
		default:
		}

		data, err := star.Queue.RestoreOne()
		if err != nil {
			// Nothing to deliver right now; back off briefly and retry.
			time.Sleep(time.Microsecond * 2)
			continue
		}

		raw := string(data.Msg)
		if raw == "b612ryzstop" {
			star.ClientStop()
			star.Online = false
			return
		}

		strs := strings.SplitN(raw, "||", 3)
		if len(strs) < 3 {
			continue
		}
		mode, key, value := strs[0], star.trim(strs[1]), strs[2]

		if star.UseChannel {
			go star.store(key, value)
			continue
		}

		// HasPrefix instead of mode[0:2]: the mode comes from the network
		// and slicing a mode shorter than two bytes would panic.
		if strings.HasPrefix(mode, "cr") && star.cnotifyWake(mode, key, value) {
			continue
		}
		star.cnotifyDispatch(CMsg{Key: key, Value: value, mode: mode})
	}
}

// cnotifyWake hands a reply to the SendValueWait call registered under mode.
// It reports false when no call is waiting for that mode.
func (star *StarNotifyC) cnotifyWake(mode, key, value string) bool {
	// lockPool is modified by SendValueWait in other goroutines, so the
	// lookup and the update happen under the mutex.
	star.mu.Lock()
	waiter, pending := star.lockPool[mode]
	if pending {
		waiter.Key, waiter.Value, waiter.mode = key, value, mode
		star.lockPool[mode] = waiter
	}
	star.mu.Unlock()
	if !pending {
		return false
	}

	// Non-blocking: if the buffer is already full the waiter has signals
	// queued, and a duplicate reply must not stall the receive loop.
	select {
	case waiter.wait <- 1:
	default:
	}
	return true
}

// cnotifyDispatch invokes the handler registered for msg.Key, falling back to
// the default handler when the key has no entry. A missing (nil) handler
// drops the message. The handler runs inline when Sync is set, otherwise in
// its own goroutine.
func (star *StarNotifyC) cnotifyDispatch(msg CMsg) {
	handler, ok := star.FuncLists[msg.Key]
	if !ok {
		handler = star.defaultFunc
	}
	if handler == nil {
		return
	}
	if star.Sync {
		handler(msg)
		return
	}
	go handler(msg)
}
// ClientStop shuts the client down. For UDP clients it first sends the
// "b612ryzstop" message to the server, then it cancels the stop context
// (which makes the background goroutines close the connection) and pushes up
// to three notifications into Stop.
//
// It is safe to call more than once. The notifications are sent without
// blocking: Stop has a fixed buffer, and the receive goroutine calls
// ClientStop again when a stopped connection reports its read error, so
// blocking sends would hang that second call forever whenever nobody drains
// Stop.
func (star *StarNotifyC) ClientStop() {
	if star.isUDP {
		// Best effort: the connection may already be closed.
		_ = star.Send("b612ryzstop")
	}
	star.cancel()
	for i := 0; i < 3; i++ {
		select {
		case star.Stop <- 1:
		default:
			// Buffer full: enough stop signals are already queued.
		}
	}
}
// SetNotify registers data as the handler that is called for messages whose
// key is name, replacing any handler registered for that key before.
//
// FuncLists is created on demand, so the call also works on a client whose
// exported FuncLists field is nil instead of panicking on the map write.
func (star *StarNotifyC) SetNotify(name string, data func(CMsg)) {
	if star.FuncLists == nil {
		star.FuncLists = make(map[string]func(CMsg))
	}
	star.FuncLists[name] = data
}
// SetDefaultNotify installs data as the fallback handler, which receives
// every message whose key has no handler of its own. Passing nil removes the
// fallback.
func (star *StarNotifyC) SetDefaultNotify(data func(CMsg)) {
	fallback := data
	star.defaultFunc = fallback
}