You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
notify/server.go

370 lines
8.6 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

// Package notify provides a common notification service over TCP, UDP and Unix sockets.
package notify
import (
"context"
"errors"
"fmt"
"math/rand"
"net"
"strings"
"time"
"b612.me/starainrt"
)
// builder is the package-wide queue whose BuildMessage method frames every
// payload written by SMsg.Reply, SMsg.Send and StarNotifyS.SendWait.
var builder *starainrt.StarQueue = starainrt.NewQueue()
// StarNotifyS is the server side of the notify service.
type StarNotifyS struct {
// Queue is a simple message queue used to handle sent and received messages.
Queue *starainrt.StarQueue
// FuncLists maps a notification key to the handler function registered for it.
FuncLists map[string]func(SMsg) string
// defaultFunc is the fallback handler used when a key has no entry in FuncLists.
defaultFunc func(SMsg) string
// Connected, when non-nil, is called in its own goroutine for every newly
// accepted connection and every UDP peer that is seen for the first time.
Connected func(SMsg) string
// stopSign and cancel are the context pair created in starinits; cancelling
// it is what ServerStop uses to shut the background goroutines down.
stopSign context.Context
cancel context.CancelFunc
// connPool holds accepted stream connections keyed by their remote address string.
connPool map[string]net.Conn
// lockPool holds the SMsg of every pending SendWait call keyed by its "srNNNNN" mode tag.
lockPool map[string]SMsg
// udpPool holds the UDP peers seen so far keyed by their address string.
udpPool map[string]*net.UDPAddr
// isUDP is true when the server was created by doudps and false for notudps.
isUDP bool
// Stop is the stop-signal channel; ServerStop writes to it.
Stop chan int
// UDPConn is the UDP listening socket (only set for UDP servers).
UDPConn *net.UDPConn
// Online reports whether the server is currently active.
Online bool
// ReadDeadline is the read deadline applied to accepted tcp/unix connections; for udp call UDPConn directly.
ReadDeadline time.Time
// WriteDeadline is the write deadline applied to accepted tcp/unix connections; for udp call UDPConn directly.
WriteDeadline time.Time
// Deadline is the combined deadline applied to accepted tcp/unix connections; for udp call UDPConn directly.
Deadline time.Time
}
// SMsg describes one notification seen by the server: the key being notified,
// its value, and the peer it belongs to.
//
// NOTE: notify() builds SMsg with positional (unkeyed) literals, so the field
// order below must not be changed.
type SMsg struct {
// Conn is the stream connection of the peer (nil for UDP peers).
Conn net.Conn
// Key is the notification key.
Key string
// Value is the payload that accompanies Key.
Value string
// UDP is the address of the peer when the message travels over UDP.
UDP *net.UDPAddr
// uconn is the server's UDP socket; when it is non-nil, Reply and Send write
// to UDP via WriteToUDP instead of using Conn.
uconn *net.UDPConn
// mode is the first "||"-separated field of the wire message; Reply echoes it back.
mode string
// wait is the channel a pending SendWait call blocks on until its reply arrives.
wait chan int
}
// GetConnPool returns one SMsg for every client currently known to the
// server: stream connections from connPool (mode "pa") followed by UDP peers
// from udpPool (mode "pa0", bound to the server's UDP socket). The result is
// nil when both pools are empty.
func (star *StarNotifyS) GetConnPool() []SMsg {
	var clients []SMsg
	for _, conn := range star.connPool {
		clients = append(clients, SMsg{Conn: conn, mode: "pa"})
	}
	for _, addr := range star.udpPool {
		peer := SMsg{UDP: addr, uconn: star.UDPConn, mode: "pa0"}
		clients = append(clients, peer)
	}
	return clients
}
// addSlash escapes a key for the wire format: every '|' and every '\' in
// name is prefixed with a backslash so the key can never contain the "||"
// field separator. StarNotifyS.trim is the inverse operation.
func (nmsg *SMsg) addSlash(name string) string {
	escaped := make([]byte, 0, len(name))
	for i := 0; i < len(name); i++ {
		switch c := name[i]; c {
		case '|', '\\':
			escaped = append(escaped, '\\', c)
		default:
			escaped = append(escaped, c)
		}
	}
	return string(escaped)
}
// Reply sends msg back to the client as the answer to this message. The
// frame carries the message's own mode and (escaped) Key, so the peer can
// match it to its request. It is written to the UDP socket when uconn is
// set and to Conn otherwise; the write error, if any, is returned.
func (nmsg *SMsg) Reply(msg string) error {
	frame := builder.BuildMessage([]byte(nmsg.mode + "||" + nmsg.addSlash(nmsg.Key) + "||" + msg))
	if nmsg.uconn != nil {
		_, err := nmsg.uconn.WriteToUDP(frame, nmsg.UDP)
		return err
	}
	_, err := nmsg.Conn.Write(frame)
	return err
}
// Send pushes a key/value pair to the client using the plain "pa" mode.
// The key is escaped with addSlash; the value is sent verbatim. The frame
// is written to the UDP socket when uconn is set and to Conn otherwise; the
// write error, if any, is returned.
func (nmsg *SMsg) Send(key, value string) error {
	frame := builder.BuildMessage([]byte("pa||" + nmsg.addSlash(key) + "||" + value))
	if nmsg.uconn != nil {
		_, err := nmsg.uconn.WriteToUDP(frame, nmsg.UDP)
		return err
	}
	_, err := nmsg.Conn.Write(frame)
	return err
}
// SendWait sends a key/value pair to the client described by source and
// blocks until the client answers or tmout elapses.
//
// The request is tagged with a random mode of the form "srNNNNN"; notify()
// wakes this call when a message carrying the same tag comes back.
//
// Parameters:
//   - source: the peer to talk to (Conn, or UDP plus uconn, must be set).
//   - key, value: the pair to send; key is escaped, value is sent verbatim.
//   - tmout: maximum time to wait for the reply; a value <= 0 waits forever.
//
// It returns the reply message, or an error when the write fails or the
// timeout expires ("Time Exceed").
//
// NOTE(review): lockPool is shared with the notify goroutine without any
// locking; that is unchanged here and should be confirmed as acceptable.
func (star *StarNotifyS) SendWait(source SMsg, key, value string, tmout time.Duration) (SMsg, error) {
	// The global generator is not re-seeded here: seeding from the clock on
	// every call gives two calls made within one clock tick the same tag.
	mode := "sr" + fmt.Sprintf("%05d", rand.Intn(99999))

	// Register the pending request BEFORE writing it out, otherwise a fast
	// reply can reach notify() while the tag is still unknown and get
	// dispatched to the regular handlers instead of waking this call.
	source.wait = make(chan int, 2)
	star.lockPool[mode] = source

	payload := builder.BuildMessage([]byte(mode + "||" + source.addSlash(key) + "||" + value))
	var err error
	if source.uconn == nil {
		_, err = source.Conn.Write(payload)
	} else {
		_, err = source.uconn.WriteToUDP(payload, source.UDP)
	}
	if err != nil {
		// Nothing was sent, so no reply can ever arrive for this tag.
		delete(star.lockPool, mode)
		return SMsg{}, err
	}

	// A nil channel never fires, which gives "wait forever" for tmout <= 0.
	var tmceed <-chan time.Time
	if tmout > 0 {
		timer := time.NewTimer(tmout)
		defer timer.Stop()
		tmceed = timer.C
	}

	select {
	case <-source.wait:
		res := star.lockPool[mode]
		delete(star.lockPool, mode)
		return res, nil
	case <-tmceed:
		// The entry is intentionally left in lockPool (as before) so that a
		// late reply is absorbed rather than treated as a fresh request.
		return SMsg{}, errors.New("Time Exceed")
	}
}
// starinits puts a freshly declared server into its initial state: it marks
// the server offline, allocates the stop channel (capacity 5), the handler
// table and the three peer pools, creates the cancellable stop context, and
// installs a new message queue with a restore duration of two seconds.
func (star *StarNotifyS) starinits() {
	star.Online = false
	star.Stop = make(chan int, 5)

	star.FuncLists = make(map[string]func(SMsg) string)
	star.connPool = make(map[string]net.Conn)
	star.udpPool = make(map[string]*net.UDPAddr)
	star.lockPool = make(map[string]SMsg)

	ctx, cancel := context.WithCancel(context.Background())
	star.stopSign, star.cancel = ctx, cancel

	queue := starainrt.NewQueue()
	queue.RestoreDuration(time.Second * 2)
	star.Queue = queue
}
// NewNotifyS starts a new notification server.
//
// netype is the network name and value the address to listen on. Any
// network whose name starts with "udp" ("udp", "udp4", "udp6") is served by
// the UDP implementation; everything else is handed to net.Listen by the
// stream implementation. Network names shorter than three bytes (including
// the empty string) are routed to the stream implementation, which reports
// them as an error instead of panicking on the prefix check.
func NewNotifyS(netype, value string) (*StarNotifyS, error) {
	if strings.HasPrefix(netype, "udp") {
		return doudps(netype, value)
	}
	return notudps(netype, value)
}
// doudps creates and starts a UDP notification server listening on value.
//
// It starts three goroutines:
//   - notify, which dispatches queued messages to the registered handlers;
//   - a shutdown watcher that, once the stop context is cancelled, sends the
//     "b612ryzstop" marker to every known peer, empties udpPool, closes the
//     socket and marks the server offline;
//   - a reader that feeds every received datagram into the queue, records
//     the sender in udpPool and fires Connected for peers seen for the first
//     time.
//
// It returns an error when the address cannot be resolved or bound.
func doudps(netype, value string) (*StarNotifyS, error) {
	var star StarNotifyS
	star.starinits()
	star.isUDP = true
	udpaddr, err := net.ResolveUDPAddr(netype, value)
	if err != nil {
		return nil, err
	}
	star.UDPConn, err = net.ListenUDP(netype, udpaddr)
	if err != nil {
		return nil, err
	}
	go star.notify()
	go func() {
		<-star.stopSign.Done()
		for k, v := range star.udpPool {
			// Best-effort goodbye; the peer may already be gone.
			star.UDPConn.WriteToUDP(star.Queue.BuildMessage([]byte("b612ryzstop")), v)
			// Remove the peer from the pool it actually lives in (udpPool,
			// not connPool, which is always empty for a UDP server).
			delete(star.udpPool, k)
		}
		star.UDPConn.Close()
		star.Online = false
	}()
	go func() {
		for {
			// A fresh buffer per datagram: the slice is handed to the queue.
			buf := make([]byte, 8192)
			n, addr, err := star.UDPConn.ReadFromUDP(buf)
			if n != 0 {
				star.Queue.ParseMessage(buf[0:n], addr)
				if _, ok := star.udpPool[addr.String()]; !ok {
					if star.Connected != nil {
						go star.Connected(SMsg{UDP: addr, uconn: star.UDPConn})
					}
				}
				star.udpPool[addr.String()] = addr
			}
			if err != nil {
				// Once the server has been stopped the socket is closed and
				// every read fails immediately; leave instead of spinning.
				select {
				case <-star.stopSign.Done():
					return
				default:
					continue
				}
			}
		}
	}()
	star.Online = true
	return &star, nil
}
// notudps creates and starts a stream-based (tcp/unix/...) notification
// server listening on value.
//
// It starts three goroutines:
//   - notify, which dispatches queued messages to the registered handlers;
//   - a shutdown watcher that, once the stop context is cancelled, closes
//     every pooled connection, closes the listener and marks the server
//     offline;
//   - an accept loop that applies the configured deadlines to each new
//     connection, records it in connPool, starts a per-connection reader
//     feeding the queue, and fires Connected.
//
// It returns an error when the listener cannot be created.
//
// NOTE(review): connPool is touched from the accept loop, every reader and
// the shutdown watcher without locking; that is unchanged here.
func notudps(netype, value string) (*StarNotifyS, error) {
	var star StarNotifyS
	star.starinits()
	star.isUDP = false
	listener, err := net.Listen(netype, value)
	if err != nil {
		return nil, err
	}
	go star.notify()
	go func() {
		<-star.stopSign.Done()
		for k, v := range star.connPool {
			v.Close()
			delete(star.connPool, k)
		}
		listener.Close()
		star.Online = false
	}()
	go func() {
		for {
			conn, err := listener.Accept()
			if err != nil {
				select {
				case <-star.stopSign.Done():
					listener.Close()
					return
				default:
					// Transient failure (e.g. out of file descriptors):
					// pause briefly instead of hot-spinning on Accept.
					time.Sleep(time.Millisecond * 20)
					continue
				}
			}
			if !star.ReadDeadline.IsZero() {
				conn.SetReadDeadline(star.ReadDeadline)
			}
			if !star.WriteDeadline.IsZero() {
				conn.SetWriteDeadline(star.WriteDeadline)
			}
			if !star.Deadline.IsZero() {
				conn.SetDeadline(star.Deadline)
			}
			// Register the connection BEFORE its reader starts: a reader
			// that fails right away removes the entry, and doing that
			// before the insert would leave a dead connection in the pool.
			peer := conn.RemoteAddr().String()
			star.connPool[peer] = conn
			go func(conn net.Conn, peer string) {
				for {
					// A fresh buffer per read: the slice is handed to the queue.
					buf := make([]byte, 8192)
					n, err := conn.Read(buf)
					if n != 0 {
						star.Queue.ParseMessage(buf[0:n], conn)
					}
					if err != nil {
						conn.Close()
						delete(star.connPool, peer)
						break
					}
				}
			}(conn, peer)
			if star.Connected != nil {
				go star.Connected(SMsg{Conn: conn})
			}
		}
	}()
	star.Online = true
	return &star, nil
}
// SetNotify registers data as the handler for the notification key name,
// replacing any handler previously registered for that key. The string the
// handler returns is sent back to the client as the reply; an empty string
// means "no reply".
//
// The handler table is created on demand, so SetNotify is safe to call on a
// server whose FuncLists is still nil.
func (star *StarNotifyS) SetNotify(name string, data func(SMsg) string) {
	if star.FuncLists == nil {
		star.FuncLists = make(map[string]func(SMsg) string)
	}
	star.FuncLists[name] = data
}
// SetDefaultNotify sets the fallback handler, which notify() calls for any
// key that has no entry in FuncLists. As with keyed handlers, a non-empty
// return value is sent back to the client as the reply.
func (star *StarNotifyS) SetDefaultNotify(data func(SMsg) string) {
star.defaultFunc = data
}
// trim undoes the escaping applied by SMsg.addSlash: a backslash that is
// not itself escaped is dropped and the byte following it is kept verbatim.
// A trailing unpaired backslash is simply discarded.
func (star *StarNotifyS) trim(name string) string {
	out := make([]byte, 0, len(name))
	escaped := false
	for i := 0; i < len(name); i++ {
		c := name[i]
		if c == '\\' && !escaped {
			// Escape introducer: swallow it and keep the next byte as-is.
			escaped = true
			continue
		}
		escaped = false
		out = append(out, c)
	}
	return string(out)
}
// notify is the server's dispatch loop. Until the stop context is cancelled
// it pulls one parsed message at a time from the queue (sleeping 20ms when
// the queue has nothing to offer) and routes it:
//
//   - on a UDP server, a message whose key is "b612ryzstop" removes the
//     sender from udpPool and is otherwise ignored;
//   - a message whose mode starts with "sr" and matches a pending SendWait
//     call is stored in lockPool and wakes that call;
//   - every other message is handed, in its own goroutine, to the handler
//     registered for its key, or to the default handler when there is none.
//     A non-empty handler result is sent back with Reply.
func (star *StarNotifyS) notify() {
	// dispatch runs the handler for rmsg and sends its result back.
	dispatch := func(rmsg SMsg) {
		handler, ok := star.FuncLists[rmsg.Key]
		if !ok {
			handler = star.defaultFunc
		}
		if handler == nil {
			return
		}
		if sdata := handler(rmsg); sdata != "" {
			// Best-effort reply; a failed write has nowhere to be reported.
			rmsg.Reply(sdata)
		}
	}

	for {
		select {
		case <-star.stopSign.Done():
			return
		default:
		}
		data, err := star.Queue.RestoreOne()
		if err != nil {
			time.Sleep(time.Millisecond * 20)
			continue
		}
		mode, key, value := star.analyseData(string(data.Msg))

		rmsg := SMsg{Key: key, Value: value, mode: mode}
		if star.isUDP {
			addr, ok := data.Conn.(*net.UDPAddr)
			if !ok {
				// Not a UDP origin; nothing sensible can be done with it.
				continue
			}
			rmsg.UDP, rmsg.uconn = addr, star.UDPConn
			if key == "b612ryzstop" {
				delete(star.udpPool, addr.String())
				continue
			}
		} else {
			conn, ok := data.Conn.(net.Conn)
			if !ok {
				continue
			}
			rmsg.Conn = conn
		}

		// HasPrefix instead of mode[0:2]: the mode comes straight off the
		// wire and may be shorter than two bytes.
		if strings.HasPrefix(mode, "sr") {
			if sa, ok := star.lockPool[mode]; ok {
				rmsg.wait = sa.wait
				star.lockPool[mode] = rmsg
				// Never block the dispatch loop on a waiter that is gone
				// or whose signal buffer is already full.
				select {
				case sa.wait <- 1:
				default:
				}
				continue
			}
		}
		go dispatch(rmsg)
	}
}
// analyseData splits a wire message of the form "mode||key||value" into its
// three fields. Only the first two separators are significant, so value may
// itself contain "||". The key is unescaped with trim; mode and value are
// returned verbatim.
//
// Messages come straight from the network, so a malformed one must not
// crash the server: fields that are missing are returned as empty strings.
func (star *StarNotifyS) analyseData(msg string) (mode, key, value string) {
	parts := strings.SplitN(msg, "||", 3)
	switch len(parts) {
	case 3:
		return parts[0], star.trim(parts[1]), parts[2]
	case 2:
		return parts[0], star.trim(parts[1]), ""
	default:
		// SplitN with a non-empty separator always yields at least one part.
		return parts[0], "", ""
	}
}
// ServerStop shuts the server down: it cancels the stop context, which the
// background goroutines watch, and then writes three values to the Stop
// channel.
func (star *StarNotifyS) ServerStop() {
	star.cancel()
	for i := 0; i < 3; i++ {
		star.Stop <- 1
	}
}