You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
notify/v1/server.go

535 lines
13 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

// Package notify provides a common TCP/UDP/Unix-socket notification service.
package notify
import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"strings"
"sync"
"time"
"b612.me/starcrypto"
"b612.me/starnet"
)
// aesKey is the package-default 16-byte key. encodeFunc and decodeFunc use it,
// and GetAesKey returns it when no per-server key has been set.
var aesKey = []byte{
	0x19, 0x96, 0x11, 0x27,
	0xe4, 0xbb, 0xbb, 0xe7,
	0x8e, 0x89, 0xe6, 0xb3,
	0xbd, 0xe5, 0xb8, 0x85,
}
// encodeFunc passes data through starcrypto.AesEncryptCFB with the
// package-default aesKey and returns the result. starinits installs it as the
// queue's default EncodeFunc.
func encodeFunc(data []byte) []byte {
	encoded := starcrypto.AesEncryptCFB(data, aesKey)
	return encoded
}
// decodeFunc passes data through starcrypto.AesDecryptCFB with the
// package-default aesKey and returns the result. starinits installs it as the
// queue's default DecodeFunc.
func decodeFunc(data []byte) []byte {
	decoded := starcrypto.AesDecryptCFB(data, aesKey)
	return decoded
}
// StarNotifyS is the server side of the notify service.
type StarNotifyS struct {
	// Queue is a simple message queue used to handle sent and received messages.
	Queue *starnet.StarQueue
	// aesKey is the key installed through SetAesKey; while it is empty,
	// GetAesKey reports the package-default key instead.
	aesKey []byte
	// FuncLists records the handler function registered for each notification key.
	FuncLists map[string]func(SMsg) string
	// funcMu is held by SetNotify while it mutates FuncLists and by some of the
	// code paths that modify nickName.
	funcMu sync.Mutex
	// defaultFunc is the handler notify falls back to when FuncLists has no
	// entry for the incoming key.
	defaultFunc func(SMsg) string
	// Connected, when non-nil, is invoked in its own goroutine for every newly
	// accepted connection (or for a UDP address that is not yet in udpPool).
	Connected func(SMsg)
	// nickName maps a nickname to a connection identifier (a key of connPool
	// or udpPool).
	nickName map[string]string
	// stopSign is cancelled by ServerStop; Stoped exposes its Done channel.
	stopSign context.Context
	// cancel cancels stopSign.
	cancel context.CancelFunc
	// connPool holds the accepted tcp/unix connections keyed by fmt.Sprint(conn).
	connPool sync.Map
	// connMu is held around writes to udpPool.
	connMu sync.Mutex
	// lockPool holds the pending SendWait requests keyed by their generated mode id.
	lockPool map[string]SMsg
	// lockMu is held around writes to lockPool.
	lockMu sync.Mutex
	// udpPool holds the known UDP peers keyed by addr.String().
	udpPool map[string]*net.UDPAddr
	// listener is the tcp/unix listener; it is only set by the non-UDP constructor.
	listener net.Listener
	// isUDP selects how notify interprets the connection attached to a queued message.
	isUDP bool
	// Sync makes notify run handlers inline instead of in a new goroutine.
	Sync bool
	// UDPConn is the UDP listening socket.
	UDPConn *net.UDPConn
	// Online reports whether the current link is active; it is set to true once
	// the server has started and to false after it has been stopped.
	Online bool
	// ReadDeadline is the read deadline applied to accepted tcp/unix
	// connections when non-zero; for UDP call UDPConn directly.
	ReadDeadline time.Time
	// WriteDeadline is the write deadline applied to accepted tcp/unix
	// connections when non-zero; for UDP call UDPConn directly.
	WriteDeadline time.Time
	// Deadline is the combined deadline applied to accepted tcp/unix
	// connections when non-zero; for UDP call UDPConn directly.
	Deadline time.Time
}
// SMsg describes a notification received by the server (its key and value)
// together with what is needed to answer the peer that sent it.
//
// The field order is significant: notify constructs SMsg values with unkeyed
// composite literals.
type SMsg struct {
	// Conn is the tcp/unix connection of the peer; notify leaves it nil for UDP.
	Conn net.Conn
	// Key is the notification key.
	Key string
	// Value is the payload that accompanies Key.
	Value string
	// UDP is the peer address when the message arrived over UDP.
	UDP *net.UDPAddr
	// Uconn is the server's UDP socket; a non-nil Uconn makes Reply, Send,
	// GetName and SetName take the UDP path.
	Uconn *net.UDPConn
	// mode is the protocol mode prefix that Reply echoes back to the peer.
	mode string
	// wait is the channel notify signals when the answer to a SendWait arrives.
	wait chan int
	// nickName binds a nickname to a connection identifier (the server's setNickName).
	nickName func(string, string) error
	// getName resolves a connection identifier to its nickname (the server's getName).
	getName func(string) string
	// queue frames outgoing messages via BuildMessage.
	queue *starnet.StarQueue
}
// SetAesKey records key as this server's AES key and replaces the queue's
// EncodeFunc/DecodeFunc with closures that call starcrypto's CFB helpers using
// that key. The slice is stored and captured as passed; it is not copied.
func (star *StarNotifyS) SetAesKey(key []byte) {
	star.aesKey = key
	encrypt := func(plain []byte) []byte {
		return starcrypto.AesEncryptCFB(plain, key)
	}
	decrypt := func(sealed []byte) []byte {
		return starcrypto.AesDecryptCFB(sealed, key)
	}
	star.Queue.EncodeFunc = encrypt
	star.Queue.DecodeFunc = decrypt
}
// GetAesKey returns the key installed with SetAesKey, or the package-default
// key when none (or an empty one) has been installed.
func (star *StarNotifyS) GetAesKey() []byte {
	if len(star.aesKey) != 0 {
		return star.aesKey
	}
	return aesKey
}
// getName returns a nickname bound to the connection identifier conn, or the
// empty string when no nickname maps to it.
func (star *StarNotifyS) getName(conn string) string {
	for name, id := range star.nickName {
		if id != conn {
			continue
		}
		return name
	}
	return ""
}
// Stoped returns the Done channel of the server's stop context; the channel is
// closed once that context has been cancelled.
func (star *StarNotifyS) Stoped() <-chan struct{} {
	done := star.stopSign.Done()
	return done
}
// GetConnPool returns one SMsg handle per known client: every connection in
// connPool (mode "pa") followed by every address in udpPool (mode "pa0").
func (star *StarNotifyS) GetConnPool() []SMsg {
	var clients []SMsg
	collect := func(_, stored interface{}) bool {
		clients = append(clients, SMsg{
			Conn:     stored.(net.Conn),
			mode:     "pa",
			nickName: star.setNickName,
			getName:  star.getName,
			queue:    star.Queue,
		})
		return true
	}
	star.connPool.Range(collect)
	for _, addr := range star.udpPool {
		clients = append(clients, SMsg{
			UDP:      addr,
			Uconn:    star.UDPConn,
			mode:     "pa0",
			nickName: star.setNickName,
			getName:  star.getName,
			queue:    star.Queue,
		})
	}
	return clients
}
// GetClient looks up the client registered under the nickname name and returns
// an SMsg handle for it. The connection identifier is searched first in
// connPool and then in udpPool; an error is returned when the nickname is
// unknown or its connection is in neither pool.
func (star *StarNotifyS) GetClient(name string) (SMsg, error) {
	id, known := star.nickName[name]
	if !known {
		return SMsg{}, errors.New("Not Found")
	}
	if stored, found := star.connPool.Load(id); found {
		return SMsg{
			Conn:     stored.(net.Conn),
			mode:     "pa",
			nickName: star.setNickName,
			getName:  star.getName,
			queue:    star.Queue,
		}, nil
	}
	if addr, found := star.udpPool[id]; found {
		return SMsg{
			UDP:      addr,
			Uconn:    star.UDPConn,
			mode:     "pa0",
			nickName: star.setNickName,
			getName:  star.getName,
			queue:    star.Queue,
		}, nil
	}
	return SMsg{}, errors.New("Not Found")
}
// GetName returns the nickname bound to this message's peer. The peer is
// identified by UDP.String() when Uconn is set and by fmt.Sprint(Conn) otherwise.
func (nmsg *SMsg) GetName() string {
	var id string
	if nmsg.Uconn != nil {
		id = nmsg.UDP.String()
	} else {
		id = fmt.Sprint(nmsg.Conn)
	}
	return nmsg.getName(id)
}
// SetName binds the nickname name to this message's peer and returns the error
// reported by the underlying nickName callback. The peer is identified by
// UDP.String() when Uconn is set and by fmt.Sprint(Conn) otherwise.
func (nmsg *SMsg) SetName(name string) error {
	var id string
	if nmsg.Uconn != nil {
		id = nmsg.UDP.String()
	} else {
		id = fmt.Sprint(nmsg.Conn)
	}
	return nmsg.nickName(name, id)
}
// addSlash escapes name for use as the key field of a message: every '|' and
// every '\' byte is prefixed with a '\'. All other bytes are copied unchanged.
func (nmsg *SMsg) addSlash(name string) string {
	var escaped []byte
	for i := 0; i < len(name); i++ {
		switch c := name[i]; c {
		case '|', '\\':
			escaped = append(escaped, '\\', c)
		default:
			escaped = append(escaped, c)
		}
	}
	return string(escaped)
}
// ReplyRaw serialises msg with encode (defined elsewhere in this package) and
// sends the result to the peer through Reply. An encoding failure is returned
// without anything being sent.
func (nmsg *SMsg) ReplyRaw(msg interface{}) error {
	payload, err := encode(msg)
	if err == nil {
		return nmsg.Reply(string(payload))
	}
	return err
}
// Reply sends msg back to the client. The payload is "mode||key||msg", where
// mode is the mode of the message being answered and key is its escaped Key;
// it is framed by the queue's BuildMessage and written to Conn, or to the UDP
// peer when Uconn is set.
func (nmsg *SMsg) Reply(msg string) error {
	payload := nmsg.mode + "||" + nmsg.addSlash(nmsg.Key) + "||" + msg
	frame := nmsg.queue.BuildMessage([]byte(payload))
	if nmsg.Uconn != nil {
		return WriteToUDP(nmsg.Uconn, nmsg.UDP, frame)
	}
	_, err := nmsg.Conn.Write(frame)
	return err
}
// Send pushes a key-value pair to the client. The payload is
// "pa||key||value" with key escaped by addSlash; it is framed by the queue's
// BuildMessage and written to Conn, or to the UDP peer when Uconn is set.
func (nmsg *SMsg) Send(key, value string) error {
	payload := "pa||" + nmsg.addSlash(key) + "||" + value
	frame := nmsg.queue.BuildMessage([]byte(payload))
	if nmsg.Uconn != nil {
		return WriteToUDP(nmsg.Uconn, nmsg.UDP, frame)
	}
	_, err := nmsg.Conn.Write(frame)
	return err
}
// SendRaw serialises msg with encode (defined elsewhere in this package) and
// sends it to the client under key through Send. An encoding failure is
// returned without anything being sent.
func (nmsg *SMsg) SendRaw(key string, msg interface{}) error {
	payload, err := encode(msg)
	if err == nil {
		return nmsg.Send(key, string(payload))
	}
	return err
}
// SendWaitRaw serialises msg with encode (defined elsewhere in this package)
// and forwards it to SendWait with the same source, key and timeout. An
// encoding failure is returned together with a zero SMsg.
func (star *StarNotifyS) SendWaitRaw(source SMsg, key string, msg interface{}, tmout time.Duration) (SMsg, error) {
	payload, err := encode(msg)
	if err == nil {
		return star.SendWait(source, key, string(payload), tmout)
	}
	return SMsg{}, err
}
// SendWait sends a key-value pair to the client behind source and waits for
// the matching answer.
//
// A unique request id of the form "sr<unix-nanos><6 digits>" is used as the
// message mode; notify hands the answer carrying the same mode back to this
// call. The returned SMsg is the answer as received by notify.
//
// tmout bounds the wait; a value <= 0 waits indefinitely. On timeout the error
// "Time Exceed" is returned. A write error is returned as-is.
func (star *StarNotifyS) SendWait(source SMsg, key, value string, tmout time.Duration) (SMsg, error) {
	// The global source is deliberately not re-seeded here: seeding with the
	// current time on every call would give two calls made in the same
	// nanosecond identical ids.
	mode := "sr" + fmt.Sprintf("%d%06d", time.Now().UnixNano(), rand.Intn(999999))

	// Register the waiter BEFORE the request goes out, otherwise a fast answer
	// could reach notify while lockPool has no entry for it and be dispatched
	// as an ordinary notification, leaving this call waiting forever.
	source.wait = make(chan int, 2)
	star.lockMu.Lock()
	star.lockPool[mode] = source
	star.lockMu.Unlock()

	// forget removes the pending entry so that failed or timed-out requests do
	// not accumulate in lockPool.
	forget := func() {
		star.lockMu.Lock()
		delete(star.lockPool, mode)
		star.lockMu.Unlock()
	}

	frame := star.Queue.BuildMessage([]byte(mode + "||" + source.addSlash(key) + "||" + value))
	var err error
	if source.Uconn == nil {
		_, err = source.Conn.Write(frame)
	} else {
		err = WriteToUDP(source.Uconn, source.UDP, frame)
	}
	if err != nil {
		forget()
		return SMsg{}, err
	}

	// A nil channel never fires, which yields the "wait forever" behaviour
	// for non-positive timeouts.
	var tmceed <-chan time.Time
	if tmout > 0 {
		timer := time.NewTimer(tmout)
		defer timer.Stop()
		tmceed = timer.C
	}

	select {
	case <-source.wait:
		star.lockMu.Lock()
		res := star.lockPool[mode]
		delete(star.lockPool, mode)
		star.lockMu.Unlock()
		return res, nil
	case <-tmceed:
		forget()
		return SMsg{}, errors.New("Time Exceed")
	}
}
// starinits prepares a zero StarNotifyS for use: it creates the stop context,
// allocates the internal maps, marks the server offline and installs a new
// starnet queue with encoding enabled, the package-default encode/decode
// functions and a restore duration of 50ms.
func (star *StarNotifyS) starinits() {
	queue := starnet.NewQueue()
	queue.EncodeFunc = encodeFunc
	queue.DecodeFunc = decodeFunc
	queue.Encode = true

	star.stopSign, star.cancel = context.WithCancel(context.Background())
	star.Queue = queue

	star.udpPool = map[string]*net.UDPAddr{}
	star.FuncLists = map[string]func(SMsg) string{}
	star.nickName = map[string]string{}
	star.lockPool = map[string]SMsg{}

	star.Online = false
	star.Queue.RestoreDuration(50 * time.Millisecond)
}
// NewNotifyS starts a new notification server.
//
// netype is the network name and value the address to listen on. Networks
// whose name starts with "udp" are served by the UDP implementation; every
// other network is handed to net.Listen by the stream implementation, which
// reports unknown or empty network names as an error.
func NewNotifyS(netype, value string) (*StarNotifyS, error) {
	// HasPrefix instead of netype[0:3]: slicing panicked for names shorter
	// than three bytes (for example an empty string).
	if strings.HasPrefix(netype, "udp") {
		return doudps(netype, value)
	}
	return notudps(netype, value)
}
// doudps creates and starts a UDP notification server listening on value.
//
// Besides the notify dispatcher it starts two goroutines:
//   - a stop watcher that, once the stop context is cancelled, sends the
//     "b612ryzstop" frame to every known peer, forgets the peers and their
//     nicknames, closes the socket and marks the server offline;
//   - a reader that feeds every datagram into the queue, records the sender
//     in udpPool and calls Connected for senders seen for the first time.
//
// An address-resolution or listen error is returned and no goroutine is
// started.
func doudps(netype, value string) (*StarNotifyS, error) {
	var star StarNotifyS
	star.starinits()
	star.isUDP = true
	udpaddr, err := net.ResolveUDPAddr(netype, value)
	if err != nil {
		star.cancel() // release the context created by starinits
		return nil, err
	}
	star.UDPConn, err = net.ListenUDP(netype, udpaddr)
	if err != nil {
		star.cancel()
		return nil, err
	}
	go star.notify()
	go func() {
		<-star.stopSign.Done()
		// Detach the peers under connMu so the table is never ranged while
		// the reader goroutine is writing to it.
		star.connMu.Lock()
		peers := make(map[string]*net.UDPAddr, len(star.udpPool))
		for id, addr := range star.udpPool {
			peers[id] = addr
			delete(star.udpPool, id)
		}
		star.connMu.Unlock()
		for id, addr := range peers {
			// Best effort: the peer may already be gone.
			_ = WriteToUDP(star.UDPConn, addr, star.Queue.BuildMessage([]byte("b612ryzstop")))
			star.funcMu.Lock()
			for name, bound := range star.nickName {
				if bound == id {
					delete(star.nickName, name)
				}
			}
			star.funcMu.Unlock()
		}
		star.UDPConn.Close()
		star.Online = false
	}()
	go func() {
		for {
			buf := make([]byte, 81920)
			n, addr, err := star.UDPConn.ReadFromUDP(buf)
			if n != 0 {
				star.Queue.ParseMessage(buf[0:n], addr)
				id := addr.String()
				// Record the peer before announcing it, so that a Connected
				// callback can immediately name the connection.
				star.connMu.Lock()
				_, known := star.udpPool[id]
				star.udpPool[id] = addr
				star.connMu.Unlock()
				if !known && star.Connected != nil {
					go star.Connected(SMsg{UDP: addr, Uconn: star.UDPConn, nickName: star.setNickName, getName: star.getName, queue: star.Queue})
				}
			}
			if err != nil {
				// Once the server has been stopped the socket is closed and
				// every read fails immediately; leave instead of spinning.
				select {
				case <-star.stopSign.Done():
					return
				default:
					continue
				}
			}
		}
	}()
	star.Online = true
	return &star, nil
}
// notudps creates and starts a stream (tcp/unix/...) notification server
// listening on value.
//
// Besides the notify dispatcher it starts two goroutines:
//   - a stop watcher that, once the stop context is cancelled, closes and
//     forgets every pooled connection together with its nicknames, closes
//     the listener and marks the server offline;
//   - an accept loop that applies the configured deadlines, pools each new
//     connection under fmt.Sprint(conn), starts a reader feeding the queue
//     and calls Connected.
//
// A listen error is returned and no goroutine is started.
func notudps(netype, value string) (*StarNotifyS, error) {
	var err error
	var star StarNotifyS
	star.starinits()
	star.isUDP = false
	star.listener, err = net.Listen(netype, value)
	if err != nil {
		star.cancel() // release the context created by starinits
		return nil, err
	}

	// forgetNames drops every nickname bound to the connection id, holding
	// funcMu for the whole traversal.
	forgetNames := func(id string) {
		star.funcMu.Lock()
		defer star.funcMu.Unlock()
		for name, bound := range star.nickName {
			if bound == id {
				delete(star.nickName, name)
			}
		}
	}

	go star.notify()
	go func() {
		<-star.stopSign.Done()
		star.connPool.Range(func(a, b interface{}) bool {
			id := a.(string)
			b.(net.Conn).Close()
			star.connPool.Delete(a)
			forgetNames(id)
			return true
		})
		star.listener.Close()
		star.Online = false
	}()
	go func() {
		for {
			conn, err := star.listener.Accept()
			if err != nil {
				select {
				case <-star.stopSign.Done():
					star.listener.Close()
					return
				default:
					continue
				}
			}
			if !star.ReadDeadline.IsZero() {
				conn.SetReadDeadline(star.ReadDeadline)
			}
			if !star.WriteDeadline.IsZero() {
				conn.SetWriteDeadline(star.WriteDeadline)
			}
			if !star.Deadline.IsZero() {
				conn.SetDeadline(star.Deadline)
			}
			id := fmt.Sprint(conn)
			// Pool the connection before its reader starts: a reader that
			// fails right away removes the entry, and storing afterwards
			// would leave a dead connection in the pool for good.
			star.connPool.Store(id, conn)
			go func(conn net.Conn, id string) {
				for {
					buf := make([]byte, 8192)
					n, err := conn.Read(buf)
					if n != 0 {
						star.Queue.ParseMessage(buf[0:n], conn)
					}
					if err != nil {
						conn.Close()
						star.connPool.Delete(id)
						forgetNames(id)
						break
					}
				}
			}(conn, id)
			if star.Connected != nil {
				go star.Connected(SMsg{Conn: conn, nickName: star.setNickName, getName: star.getName, queue: star.Queue})
			}
		}
	}()
	star.Online = true
	return &star, nil
}
// GetListenerInfo returns the server's stream listener. It is nil for a server
// on which no listener has been set (the UDP constructor never sets one).
func (star *StarNotifyS) GetListenerInfo() net.Listener {
	current := star.listener
	return current
}
// setNickName binds the nickname name to the connection identifier conn.
//
// conn must be a key of connPool or udpPool, otherwise "Conn Not Found" is
// returned. Any nickname previously bound to conn is removed first, so a
// connection carries at most one nickname.
func (star *StarNotifyS) setNickName(name string, conn string) error {
	if _, ok := star.connPool.Load(conn); !ok {
		// udpPool is written under connMu elsewhere; read it the same way.
		star.connMu.Lock()
		_, ok = star.udpPool[conn]
		star.connMu.Unlock()
		if !ok {
			return errors.New("Conn Not Found")
		}
	}
	// Remove the old binding and add the new one as a single critical
	// section; previously only the assignment was protected by funcMu.
	star.funcMu.Lock()
	defer star.funcMu.Unlock()
	for k, v := range star.nickName {
		if v == conn {
			delete(star.nickName, k)
		}
	}
	star.nickName[name] = conn
	return nil
}
// SetNotify registers data as the handler called for the notification key
// name. Passing a nil handler removes any handler registered for name.
// FuncLists is modified while holding funcMu.
func (star *StarNotifyS) SetNotify(name string, data func(SMsg) string) {
	star.funcMu.Lock()
	defer star.funcMu.Unlock()
	if data != nil {
		star.FuncLists[name] = data
		return
	}
	// delete is a no-op for a missing key, so no existence check is needed.
	delete(star.FuncLists, name)
}
// SetDefaultNotify sets the handler that notify calls for keys that have no
// entry in FuncLists. Passing nil clears it.
func (star *StarNotifyS) SetDefaultNotify(data func(SMsg) string) {
	fallback := data
	star.defaultFunc = fallback
}
// trim undoes the escaping applied to message keys: a '\' that is not itself
// escaped is dropped and the byte following it is kept verbatim. A trailing
// unescaped '\' is dropped.
func (star *StarNotifyS) trim(name string) string {
	var plain []byte
	escaped := false
	for i := 0; i < len(name); i++ {
		c := name[i]
		if c == '\\' && !escaped {
			escaped = true
			continue
		}
		escaped = false
		plain = append(plain, c)
	}
	return string(plain)
}
// notify is the server's dispatch loop. Until the stop context is cancelled it
// takes one message at a time from the queue, splits it into mode, key and
// value, and then either
//   - hands it to the SendWait call waiting for that mode (modes starting with
//     "sr" that have an entry in lockPool), or
//   - runs the handler registered for key (or the default handler) and sends a
//     non-empty handler result back with Reply.
//
// Handlers run in their own goroutine unless Sync is set. For UDP servers a
// message whose key is "b612ryzstop" removes the sender from udpPool and drops
// its nicknames instead of being dispatched.
func (star *StarNotifyS) notify() {
	// dispatch runs the handler for key and replies with its result.
	dispatch := func(key string, rmsg SMsg) {
		// FuncLists is written under funcMu by SetNotify, so read it the same
		// way; the handler itself runs without the lock held.
		star.funcMu.Lock()
		handler, ok := star.FuncLists[key]
		star.funcMu.Unlock()
		if !ok {
			handler = star.defaultFunc
		}
		if handler == nil {
			return
		}
		if sdata := handler(rmsg); sdata != "" {
			// Best effort: there is nobody to report a failed reply to.
			rmsg.Reply(sdata)
		}
	}
	// deliver honours the Sync setting.
	deliver := func(key string, rmsg SMsg) {
		if star.Sync {
			dispatch(key, rmsg)
			return
		}
		go dispatch(key, rmsg)
	}

	for {
		select {
		case <-star.stopSign.Done():
			return
		default:
		}
		data, err := star.Queue.RestoreOne()
		if err != nil {
			time.Sleep(time.Millisecond * 500)
			continue
		}
		mode, key, value := star.analyseData(string(data.Msg))
		if mode == "" && key == "" && value == "" {
			// Not a well-formed "mode||key||value" message.
			continue
		}

		var rmsg SMsg
		if !star.isUDP {
			rmsg = SMsg{
				Conn:     data.Conn.(net.Conn),
				Key:      key,
				Value:    value,
				mode:     mode,
				nickName: star.setNickName,
				getName:  star.getName,
				queue:    star.Queue,
			}
		} else {
			rmsg = SMsg{
				Key:      key,
				Value:    value,
				UDP:      data.Conn.(*net.UDPAddr),
				Uconn:    star.UDPConn,
				mode:     mode,
				nickName: star.setNickName,
				getName:  star.getName,
				queue:    star.Queue,
			}
			if key == "b612ryzstop" {
				id := rmsg.UDP.String()
				star.connMu.Lock()
				delete(star.udpPool, id)
				star.connMu.Unlock()
				star.funcMu.Lock()
				for name, bound := range star.nickName {
					if bound == id {
						delete(star.nickName, name)
					}
				}
				star.funcMu.Unlock()
				continue
			}
		}

		// HasPrefix instead of mode[0:2]: the mode comes from the peer and may
		// be shorter than two bytes, which made the slice expression panic.
		if !strings.HasPrefix(mode, "sr") {
			deliver(key, rmsg)
			continue
		}

		// lockPool is shared with SendWait, so look it up under lockMu too.
		star.lockMu.Lock()
		pending, waiting := star.lockPool[mode]
		if waiting {
			rmsg.wait = pending.wait
			star.lockPool[mode] = rmsg
			// Never block while holding lockMu: if the waiter is gone and the
			// channel is full, the signal is simply dropped.
			select {
			case pending.wait <- 1:
			default:
			}
		}
		star.lockMu.Unlock()
		if !waiting {
			deliver(key, rmsg)
		}
	}
}
// analyseData splits a raw message of the form "mode||key||value" at the first
// two "||" separators. The key is unescaped with trim; mode and value are
// returned verbatim (value may itself contain "||"). Three empty strings are
// returned when msg contains fewer than two separators.
func (star *StarNotifyS) analyseData(msg string) (mode, key, value string) {
	first := strings.Index(msg, "||")
	if first < 0 {
		return "", "", ""
	}
	rest := msg[first+2:]
	second := strings.Index(rest, "||")
	if second < 0 {
		return "", "", ""
	}
	return msg[:first], star.trim(rest[:second]), rest[second+2:]
}
// ServerStop stops the server by cancelling its stop context; the goroutines
// waiting on that context perform the actual shutdown.
func (star *StarNotifyS) ServerStop() {
	stop := star.cancel
	stop()
}