You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
notify/server.go

694 lines
16 KiB
Go

package notify
import (
"b612.me/stario"
"context"
"errors"
"fmt"
"math"
"math/rand"
"net"
"os"
"strings"
"sync"
"sync/atomic"
"time"
)
type ServerCommon struct {
msgID uint64
alive atomic.Value
status Status
listener net.Listener
udpListener *net.UDPConn
queue *stario.StarQueue
stopFn context.CancelFunc
stopCtx context.Context
maxReadTimeout time.Duration
maxWriteTimeout time.Duration
parallelNum int
wg stario.WaitGroup
clientPool map[string]*ClientConn
mu sync.RWMutex
handshakeRsaKey []byte
SecretKey []byte
defaultMsgEn func([]byte, []byte) []byte
defaultMsgDe func([]byte, []byte) []byte
linkFns map[string]func(message *Message)
defaultFns func(message *Message)
noFinSyncMsgPool sync.Map
noFinSyncMsgMaxKeepSeconds int64
maxHeartbeatLostSeconds int64
sequenceDe func([]byte) (interface{}, error)
sequenceEn func(interface{}) ([]byte, error)
showError bool
debugMode bool
}
func NewServer() Server {
var server ServerCommon
server.wg = stario.NewWaitGroup(0)
server.parallelNum = 0
server.noFinSyncMsgMaxKeepSeconds = 0
server.maxHeartbeatLostSeconds = 300
server.stopCtx, server.stopFn = context.WithCancel(context.Background())
server.SecretKey = defaultAesKey
server.handshakeRsaKey = defaultRsaKey
server.clientPool = make(map[string]*ClientConn)
server.defaultMsgEn = defaultMsgEn
server.defaultMsgDe = defaultMsgDe
server.sequenceEn = encode
server.sequenceDe = Decode
server.alive.Store(false)
server.linkFns = make(map[string]func(*Message))
server.defaultFns = func(message *Message) {
return
}
return &server
}
func (s *ServerCommon) DebugMode(dmg bool) {
s.mu.Lock()
s.debugMode = dmg
s.mu.Unlock()
}
func (s *ServerCommon) IsDebugMode() bool {
s.mu.RLock()
defer s.mu.RUnlock()
return s.debugMode
}
func (s *ServerCommon) ShowError(std bool) {
s.mu.Lock()
s.showError = std
s.mu.Unlock()
}
func (s *ServerCommon) Stop() error {
if !s.alive.Load().(bool) {
return nil
}
s.alive.Store(false)
s.mu.Lock()
s.status = Status{
Alive: false,
Reason: "recv stop signal from user",
Err: nil,
}
s.mu.Unlock()
s.stopFn()
return nil
}
func (s *ServerCommon) Listen(network string, addr string) error {
if s.alive.Load().(bool) {
return errors.New("server already run")
}
s.stopCtx, s.stopFn = context.WithCancel(context.Background())
s.queue = stario.NewQueueCtx(s.stopCtx, 128, math.MaxUint32)
if strings.Contains(strings.ToLower(network), "udp") {
return s.ListenUDP(network, addr)
}
return s.ListenTU(network, addr)
}
func (s *ServerCommon) ListenTU(network string, addr string) error {
listener, err := net.Listen(network, addr)
if err != nil {
return err
}
s.alive.Store(true)
s.status.Alive = true
s.listener = listener
go s.accept()
go s.monitorPool()
go s.loadMessage()
return nil
}
func (s *ServerCommon) monitorPool() {
for {
select {
case <-s.stopCtx.Done():
s.noFinSyncMsgPool.Range(func(k, v interface{}) bool {
data := v.(WaitMsg)
close(data.Reply)
s.noFinSyncMsgPool.Delete(k)
return true
})
return
case <-time.After(time.Second * 30):
}
now := time.Now()
if s.noFinSyncMsgMaxKeepSeconds > 0 {
s.noFinSyncMsgPool.Range(func(k, v interface{}) bool {
data := v.(WaitMsg)
if data.Time.Add(time.Duration(s.noFinSyncMsgMaxKeepSeconds) * time.Second).Before(now) {
close(data.Reply)
s.noFinSyncMsgPool.Delete(k)
}
return true
})
}
if s.maxHeartbeatLostSeconds != 0 {
for _, v := range s.clientPool {
if now.Unix()-v.lastHeartBeat > s.maxHeartbeatLostSeconds {
v.stopFn()
s.removeClient(v)
}
}
}
}
}
func (s *ServerCommon) SetDefaultCommEncode(fn func([]byte, []byte) []byte) {
s.defaultMsgEn = fn
}
func (s *ServerCommon) SetDefaultCommDecode(fn func([]byte, []byte) []byte) {
s.defaultMsgDe = fn
}
func (s *ServerCommon) SetDefaultLink(fn func(message *Message)) {
s.defaultFns = fn
}
func (s *ServerCommon) SetLink(key string, fn func(*Message)) {
s.mu.Lock()
defer s.mu.Unlock()
s.linkFns[key] = fn
}
func (s *ServerCommon) pushMessage(data []byte, source string) {
s.queue.ParseMessage(data, source)
}
func (s *ServerCommon) removeClient(client *ClientConn) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.clientPool, client.ClientID)
}
func (s *ServerCommon) accept() {
if s.udpListener != nil {
s.acceptUDP()
}
s.acceptTU()
}
func (s *ServerCommon) acceptTU() {
for {
select {
case <-s.stopCtx.Done():
if s.debugMode {
fmt.Println("accept goroutine recv exit signal,exit")
}
return
default:
}
conn, err := s.listener.Accept()
if err != nil {
if s.showError || s.debugMode {
fmt.Println("error accept:", err)
}
continue
}
if s.debugMode {
fmt.Println("accept new connection from", conn.RemoteAddr())
}
var id string
for {
id = fmt.Sprintf("%s%d%d", conn.RemoteAddr().String(), time.Now().UnixNano(), rand.Int63())
s.mu.RLock()
if _, ok := s.clientPool[id]; ok {
s.mu.RUnlock()
continue
}
s.mu.RUnlock()
break
}
client := ClientConn{
ClientID: id,
ClientAddr: conn.RemoteAddr(),
tuConn: conn,
server: s,
maxReadTimeout: s.maxReadTimeout,
maxWriteTimeout: s.maxWriteTimeout,
SecretKey: s.SecretKey,
handshakeRsaKey: s.handshakeRsaKey,
msgEn: s.defaultMsgEn,
msgDe: s.defaultMsgDe,
lastHeartBeat: time.Now().Unix(),
}
client.alive.Store(true)
client.status = Status{
Alive: true,
Reason: "",
Err: nil,
}
client.stopCtx, client.stopFn = context.WithCancel(context.Background())
s.mu.Lock()
s.clientPool[id] = &client
s.mu.Unlock()
go client.readTUMessage()
}
}
func (s *ServerCommon) loadMessage() {
for {
select {
case <-s.stopCtx.Done():
var wg sync.WaitGroup
s.mu.RLock()
for _, v := range s.clientPool {
wg.Add(1)
go func(v *ClientConn) {
defer wg.Done()
v.sayGoodByeForTU()
v.alive.Store(false)
v.status = Status{
Alive: false,
Reason: "recv stop signal from server",
Err: nil,
}
v.stopFn()
s.removeClient(v)
}(v)
}
s.mu.RUnlock()
select {
case <-time.After(time.Second * 8):
case <-stario.WaitUntilFinished(func() error {
wg.Wait()
return nil
}):
}
if s.listener != nil {
s.listener.Close()
}
s.wg.Wait()
return
case data, ok := <-s.queue.RestoreChan():
if !ok {
continue
}
s.wg.Add(1)
go func(data stario.MsgQueue) {
s.mu.RLock()
cc, ok := s.clientPool[data.Conn.(string)]
s.mu.RUnlock()
if !ok {
return
}
//fmt.Println("received:", float64(time.Now().UnixNano()-nowd)/1000000)
msg, err := s.sequenceDe(cc.msgDe(cc.SecretKey, data.Msg))
if err != nil {
if s.showError || s.debugMode {
fmt.Println("server decode data error", err)
}
return
}
//fmt.Println("decoded:", float64(time.Now().UnixNano()-nowd)/1000000)
message := Message{
NetType: NET_SERVER,
ClientConn: cc,
TransferMsg: msg.(TransferMsg),
}
message.Time = time.Now()
//fmt.Println("dispatch:", float64(time.Now().UnixNano()-nowd)/1000000)
s.dispatchMsg(message)
}(data)
}
}
}
func (s *ServerCommon) sysMsg(message Message) {
switch message.Key {
case "bye":
//fmt.Println("recv stop signal from client", message.ClientConn.ClientID)
if message.TransferMsg.Type == MSG_SYS_WAIT {
message.Reply(nil)
}
message.ClientConn.alive.Store(false)
message.ClientConn.status = Status{
Alive: false,
Reason: "recv stop signal from client",
Err: nil,
}
message.ClientConn.stopFn()
case "heartbeat":
message.ClientConn.lastHeartBeat = time.Now().Unix()
message.Reply(nil)
}
}
func (s *ServerCommon) dispatchMsg(message Message) {
defer s.wg.Done()
switch message.TransferMsg.Type {
case MSG_SYS_WAIT:
fallthrough
case MSG_SYS:
s.sysMsg(message)
return
case MSG_KEY_CHANGE:
message.ClientConn.rsaDecode(message)
return
case MSG_SYS_REPLY:
fallthrough
case MSG_SYNC_REPLY:
data, ok := s.noFinSyncMsgPool.Load(message.TransferMsg.ID)
if ok {
wait := data.(WaitMsg)
wait.Reply <- message
s.noFinSyncMsgPool.Delete(message.TransferMsg.ID)
return
}
//just throw
//return
fallthrough
default:
}
callFn := func(fn func(*Message)) {
fn(&message)
}
fn, ok := s.linkFns[message.TransferMsg.Key]
if ok {
callFn(fn)
}
if s.defaultFns != nil {
callFn(s.defaultFns)
}
}
func (s *ServerCommon) send(c *ClientConn, msg TransferMsg) (WaitMsg, error) {
if s.udpListener != nil {
return s.sendUDP(c, msg)
}
return s.sendTU(c, msg)
}
func (s *ServerCommon) sendTU(c *ClientConn, msg TransferMsg) (WaitMsg, error) {
var wait WaitMsg
if msg.Type != MSG_SYNC_REPLY && msg.Type != MSG_KEY_CHANGE && msg.Type != MSG_SYS_REPLY || msg.ID == 0 {
msg.ID = atomic.AddUint64(&s.msgID, 1)
}
data, err := s.sequenceEn(msg)
if err != nil {
return WaitMsg{}, err
}
data = c.msgEn(c.SecretKey, data)
data = s.queue.BuildMessage(data)
if c.maxWriteTimeout.Seconds() != 0 {
if err := c.tuConn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout)); err != nil {
return WaitMsg{}, err
}
}
_, err = c.tuConn.Write(data)
//fmt.Println("resend:", float64(time.Now().UnixNano()-nowd)/1000000)
if err == nil && (msg.Type == MSG_SYNC_ASK || msg.Type == MSG_SYS_WAIT) {
wait.Time = time.Now()
wait.TransferMsg = msg
wait.Reply = make(chan Message, 1)
s.noFinSyncMsgPool.Store(msg.ID, wait)
}
return wait, err
}
func (s *ServerCommon) Send(c *ClientConn, key string, value MsgVal) error {
_, err := s.send(c, TransferMsg{
Key: key,
Value: value,
Type: MSG_ASYNC,
})
return err
}
func (s *ServerCommon) sendWait(c *ClientConn, msg TransferMsg, timeout time.Duration) (Message, error) {
data, err := s.send(c, msg)
if err != nil {
return Message{}, err
}
if timeout.Seconds() == 0 {
msg, ok := <-data.Reply
if !ok {
return msg, os.ErrInvalid
}
return msg, nil
}
select {
case <-time.After(timeout):
close(data.Reply)
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrDeadlineExceeded
case <-s.stopCtx.Done():
return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply:
if !ok {
return msg, os.ErrInvalid
}
return msg, nil
}
}
func (s *ServerCommon) SendCtx(ctx context.Context, c *ClientConn, key string, value MsgVal) (Message, error) {
return s.sendCtx(c, TransferMsg{
Key: key,
Value: value,
Type: MSG_SYNC_ASK,
}, ctx)
}
func (s *ServerCommon) sendCtx(c *ClientConn, msg TransferMsg, ctx context.Context) (Message, error) {
data, err := s.send(c, msg)
if err != nil {
return Message{}, err
}
if ctx == nil {
ctx = context.Background()
}
select {
case <-ctx.Done():
close(data.Reply)
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrClosed
case <-s.stopCtx.Done():
return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply:
if !ok {
return msg, os.ErrInvalid
}
return msg, nil
}
}
func (s *ServerCommon) SendWait(c *ClientConn, key string, value MsgVal, timeout time.Duration) (Message, error) {
return s.sendWait(c, TransferMsg{
Key: key,
Value: value,
Type: MSG_SYNC_ASK,
}, timeout)
}
func (s *ServerCommon) SendWaitObj(c *ClientConn, key string, value interface{}, timeout time.Duration) (Message, error) {
data, err := s.sequenceEn(value)
if err != nil {
return Message{}, err
}
return s.SendWait(c, key, data, timeout)
}
func (s *ServerCommon) SendObjCtx(ctx context.Context, c *ClientConn, key string, val interface{}) (Message, error) {
data, err := s.sequenceEn(val)
if err != nil {
return Message{}, err
}
return s.sendCtx(c, TransferMsg{
Key: key,
Value: data,
Type: MSG_SYNC_ASK,
}, ctx)
}
func (s *ServerCommon) SendObj(c *ClientConn, key string, val interface{}) error {
data, err := encode(val)
if err != nil {
return err
}
_, err = s.send(c, TransferMsg{
Key: key,
Value: data,
Type: MSG_ASYNC,
})
return err
}
func (s *ServerCommon) Reply(m Message, value MsgVal) error {
return m.Reply(value)
}
//for udp below
func (s *ServerCommon) ListenUDP(network string, addr string) error {
udpAddr, err := net.ResolveUDPAddr(network, addr)
if err != nil {
return err
}
listener, err := net.ListenUDP(network, udpAddr)
if err != nil {
return err
}
s.alive.Store(true)
s.status.Alive = true
s.udpListener = listener
go s.accept()
go s.monitorPool()
go s.loadMessage()
return nil
}
func (s *ServerCommon) acceptUDP() {
for {
select {
case <-s.stopCtx.Done():
if s.debugMode {
fmt.Println("accept goroutine recv exit signal,exit")
}
return
default:
}
if s.maxReadTimeout.Seconds() > 0 {
s.udpListener.SetReadDeadline(time.Now().Add(s.maxReadTimeout))
}
data := make([]byte, 4096)
num, addr, err := s.udpListener.ReadFromUDP(data)
id := addr.String()
if s.debugMode {
fmt.Println("accept new udp message from", id)
}
//fmt.Println("s recv udp:", float64(time.Now().UnixNano()-nowd)/1000000)
s.mu.RLock()
if _, ok := s.clientPool[id]; !ok {
s.mu.RUnlock()
client := ClientConn{
ClientID: id,
ClientAddr: addr,
server: s,
maxReadTimeout: s.maxReadTimeout,
maxWriteTimeout: s.maxWriteTimeout,
SecretKey: s.SecretKey,
handshakeRsaKey: s.handshakeRsaKey,
msgEn: s.defaultMsgEn,
msgDe: s.defaultMsgDe,
lastHeartBeat: time.Now().Unix(),
}
client.stopCtx, client.stopFn = context.WithCancel(context.Background())
s.mu.Lock()
s.clientPool[id] = &client
s.mu.Unlock()
} else {
s.mu.RUnlock()
}
if err == os.ErrDeadlineExceeded {
if num != 0 {
s.pushMessage(data[:num], id)
}
continue
}
if err != nil {
continue
}
s.pushMessage(data[:num], id)
}
}
func (s *ServerCommon) sendUDP(c *ClientConn, msg TransferMsg) (WaitMsg, error) {
var wait WaitMsg
if msg.Type != MSG_SYNC_REPLY && msg.Type != MSG_KEY_CHANGE && msg.Type != MSG_SYS_REPLY || msg.ID == 0 {
msg.ID = uint64(time.Now().UnixNano()) + rand.Uint64() + rand.Uint64()
}
data, err := s.sequenceEn(msg)
if err != nil {
return WaitMsg{}, err
}
data = c.msgEn(c.SecretKey, data)
data = s.queue.BuildMessage(data)
if c.maxWriteTimeout.Seconds() != 0 {
s.udpListener.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout))
}
_, err = s.udpListener.WriteTo(data, c.ClientAddr)
if err == nil && (msg.Type == MSG_SYNC_ASK || msg.Type == MSG_SYS_WAIT) {
wait.Time = time.Now()
wait.TransferMsg = msg
wait.Reply = make(chan Message, 1)
s.noFinSyncMsgPool.Store(msg.ID, wait)
}
return wait, err
}
func (s *ServerCommon) StopMonitorChan() <-chan struct{} {
return s.stopCtx.Done()
}
func (s *ServerCommon) Status() Status {
return s.status
}
func (s *ServerCommon) GetSecretKey() []byte {
return s.SecretKey
}
func (s *ServerCommon) SetSecretKey(key []byte) {
s.SecretKey = key
}
func (s *ServerCommon) RsaPrivKey() []byte {
return s.handshakeRsaKey
}
func (s *ServerCommon) SetRsaPrivKey(key []byte) {
s.handshakeRsaKey = key
}
func (s *ServerCommon) GetClient(id string) *ClientConn {
s.mu.RLock()
defer s.mu.RUnlock()
c, ok := s.clientPool[id]
if !ok {
return nil
}
return c
}
func (s *ServerCommon) GetClientLists() []*ClientConn {
s.mu.RLock()
defer s.mu.RUnlock()
var list = make([]*ClientConn, 0, len(s.clientPool))
for _, v := range s.clientPool {
list = append(list, v)
}
return list
}
func (s *ServerCommon) GetClientAddrs() []net.Addr {
s.mu.RLock()
defer s.mu.RUnlock()
var list = make([]net.Addr, 0, len(s.clientPool))
for _, v := range s.clientPool {
list = append(list, v.ClientAddr)
}
return list
}
func (s *ServerCommon) GetSequenceEn() func(interface{}) ([]byte, error) {
return s.sequenceEn
}
func (s *ServerCommon) SetSequenceEn(fn func(interface{}) ([]byte, error)) {
s.sequenceEn = fn
}
func (s *ServerCommon) GetSequenceDe() func([]byte) (interface{}, error) {
return s.sequenceDe
}
func (s *ServerCommon) SetSequenceDe(fn func([]byte) (interface{}, error)) {
s.sequenceDe = fn
}
func (s *ServerCommon) HeartbeatTimeoutSec() int64 {
return s.maxHeartbeatLostSeconds
}
func (s *ServerCommon) SetHeartbeatTimeoutSec(sec int64) {
s.maxHeartbeatLostSeconds = sec
}