Compare commits

..

20 Commits

@ -1,272 +1,614 @@
package notify package notify
import ( import (
"b612.me/starcrypto"
"b612.me/stario"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"math"
"math/rand" "math/rand"
"net" "net"
"strings" "os"
"sync"
"sync/atomic"
"time" "time"
"b612.me/starainrt"
) )
// StarNotifyC 为Client端 type ClientCommon struct {
type StarNotifyC struct { alive atomic.Value
Connc net.Conn status Status
clientSign map[string]chan string byeFromServer bool
// FuncLists 当不使用channel时使用此记录调用函数 conn net.Conn
FuncLists map[string]func(CMsg) mu sync.Mutex
stopSign context.Context msgID uint64
cancel context.CancelFunc queue *stario.StarQueue
defaultFunc func(CMsg) stopFn context.CancelFunc
// Stop 停止信 号 stopCtx context.Context
Stop chan int parallelNum int
// UseChannel 是否使用channel作为信息传递 maxReadTimeout time.Duration
UseChannel bool maxWriteTimeout time.Duration
isUDP bool keyExchangeFn func(c Client) error
// Queue 是用来处理收发信息的简单消息队列 linkFns map[string]func(message *Message)
Queue *starainrt.StarQueue defaultFns func(message *Message)
// Online 当前链接是否处于活跃状态 msgEn func([]byte, []byte) []byte
Online bool msgDe func([]byte, []byte) []byte
lockPool map[string]CMsg noFinSyncMsgPool sync.Map
} handshakeRsaPubKey []byte
SecretKey []byte
// CMsg 指明当前客户端被通知的关键字 noFinSyncMsgMaxKeepSeconds int
type CMsg struct { lastHeartbeat int64
Key string heartbeatPeriod time.Duration
Value string wg stario.WaitGroup
mode string netType NetType
wait chan int showError bool
} skipKeyExchange bool
useHeartBeat bool
func (star *StarNotifyC) starinitc() { sequenceDe func([]byte) (interface{}, error)
star.stopSign, star.cancel = context.WithCancel(context.Background()) sequenceEn func(interface{}) ([]byte, error)
star.Queue = starainrt.NewQueue() debugMode bool
star.FuncLists = make(map[string]func(CMsg))
star.UseChannel = true
star.Stop = make(chan int, 5)
star.clientSign = make(map[string]chan string)
star.Online = false
star.lockPool = make(map[string]CMsg)
star.Queue.RestoreDuration(time.Second * 2)
}
// Notify 用于获取一个通知
func (star *StarNotifyC) Notify(key string) chan string {
if _, ok := star.clientSign[key]; !ok {
ch := make(chan string, 20)
star.clientSign[key] = ch
}
return star.clientSign[key]
}
func (star *StarNotifyC) store(key, value string) {
if _, ok := star.clientSign[key]; !ok {
ch := make(chan string, 20)
ch <- value
star.clientSign[key] = ch
return
}
star.clientSign[key] <- value
} }
// NewNotifyC 用于新建一个Client端进程 func (c *ClientCommon) Connect(network string, addr string) error {
func NewNotifyC(netype, value string) (*StarNotifyC, error) { if c.alive.Load().(bool) {
var err error return errors.New("client already run")
var star StarNotifyC
star.starinitc()
star.isUDP = false
if strings.Index(netype, "udp") >= 0 {
star.isUDP = true
} }
star.Connc, err = net.Dial(netype, value) c.stopCtx, c.stopFn = context.WithCancel(context.Background())
c.queue = stario.NewQueueCtx(c.stopCtx, 4, math.MaxUint32)
conn, err := net.Dial(network, addr)
if err != nil { if err != nil {
return nil, err return err
} }
go star.cnotify() c.alive.Store(true)
go func() { c.status.Alive = true
<-star.stopSign.Done() c.conn = conn
star.Connc.Close() if c.useHeartBeat {
star.Online = false go c.Heartbeat()
return }
}() return c.clientPostInit()
go func() {
for {
buf := make([]byte, 8192)
n, err := star.Connc.Read(buf)
if n != 0 {
star.Queue.ParseMessage(buf[0:n], star.Connc)
}
if err != nil {
star.Connc.Close()
star.ClientStop()
//star, _ = NewNotifyC(netype, value)
star.Online = false
return
}
}
}()
star.Online = true
return &star, nil
} }
// Send 用于向Server端发送数据 func (c *ClientCommon) DebugMode(dmg bool) {
func (star *StarNotifyC) Send(name string) error { c.mu.Lock()
return star.SendValue(name, "") c.debugMode = dmg
c.mu.Unlock()
} }
// SendValue 用于向Server端发送key-value类型数据 func (c *ClientCommon) IsDebugMode() bool {
func (star *StarNotifyC) SendValue(name, value string) error { return c.debugMode
var err error }
var key []byte
for _, v := range []byte(name) { func (c *ClientCommon) ConnectTimeout(network string, addr string, timeout time.Duration) error {
if v == byte(124) || v == byte(92) { if c.alive.Load().(bool) {
key = append(key, byte(92)) return errors.New("client already run")
}
key = append(key, v)
} }
_, err = star.Connc.Write(star.Queue.BuildMessage([]byte("pa" + "||" + string(key) + "||" + value))) c.stopCtx, c.stopFn = context.WithCancel(context.Background())
return err c.queue = stario.NewQueueCtx(c.stopCtx, 4, math.MaxUint32)
conn, err := net.DialTimeout(network, addr, timeout)
if err != nil {
return err
}
c.alive.Store(true)
c.status.Alive = true
c.conn = conn
if c.useHeartBeat {
go c.Heartbeat()
}
return c.clientPostInit()
} }
func (star *StarNotifyC) trim(name string) string { func (c *ClientCommon) monitorPool() {
var slash bool = false for {
var key []byte select {
for _, v := range []byte(name) { case <-c.stopCtx.Done():
if v == byte(92) && !slash { c.noFinSyncMsgPool.Range(func(k, v interface{}) bool {
slash = true data := v.(WaitMsg)
continue close(data.Reply)
c.noFinSyncMsgPool.Delete(k)
return true
})
return
case <-time.After(time.Second * 30):
}
now := time.Now()
if c.noFinSyncMsgMaxKeepSeconds > 0 {
c.noFinSyncMsgPool.Range(func(k, v interface{}) bool {
data := v.(WaitMsg)
if data.Time.Add(time.Duration(c.noFinSyncMsgMaxKeepSeconds) * time.Second).Before(now) {
close(data.Reply)
c.noFinSyncMsgPool.Delete(k)
}
return true
})
} }
slash = false
key = append(key, v)
} }
return string(key)
} }
// SendValueWait 用于向Server端发送key-value类型数据并等待结果返回此结果不会通过标准返回流程处理 func (c *ClientCommon) SkipExchangeKey() bool {
func (star *StarNotifyC) SendValueWait(name, value string, tmout time.Duration) (CMsg, error) { return c.skipKeyExchange
var err error }
var tmceed <-chan time.Time
if star.UseChannel { func (c *ClientCommon) SetSkipExchangeKey(val bool) {
return CMsg{}, errors.New("Do Not Use UseChannel Mode!") c.skipKeyExchange = val
} }
rand.Seed(time.Now().UnixNano())
mode := "cr" + fmt.Sprintf("%05d", rand.Intn(99999)) func (c *ClientCommon) clientPostInit() error {
var key []byte go c.readMessage()
for _, v := range []byte(name) { go c.loadMessage()
if v == byte(124) || v == byte(92) { if !c.skipKeyExchange {
key = append(key, byte(92)) err := c.keyExchangeFn(c)
if err != nil {
c.alive.Store(false)
c.mu.Lock()
c.status = Status{
Alive: false,
Reason: "key exchange failed",
Err: err,
}
c.mu.Unlock()
c.stopFn()
return err
} }
key = append(key, v)
} }
_, err = star.Connc.Write(star.Queue.BuildMessage([]byte(mode + "||" + string(key) + "||" + value))) return nil
if err != nil { }
return CMsg{}, err func NewClient() Client {
var client = ClientCommon{
maxReadTimeout: 0,
maxWriteTimeout: 0,
sequenceEn: encode,
sequenceDe: Decode,
keyExchangeFn: aesRsaHello,
SecretKey: defaultAesKey,
handshakeRsaPubKey: defaultRsaPubKey,
msgEn: defaultMsgEn,
msgDe: defaultMsgDe,
} }
if int64(tmout) > 0 { client.alive.Store(false)
tmceed = time.After(tmout) //heartbeat should not controlable for user
client.useHeartBeat = true
client.heartbeatPeriod = time.Second * 20
client.linkFns = make(map[string]func(*Message))
client.defaultFns = func(message *Message) {
return
} }
var source CMsg client.wg = stario.NewWaitGroup(0)
source.wait = make(chan int, 2) client.stopCtx, client.stopFn = context.WithCancel(context.Background())
star.lockPool[mode] = source return &client
select { }
case <-source.wait:
res := star.lockPool[mode] func (c *ClientCommon) Heartbeat() {
delete(star.lockPool, mode) failedCount := 0
return res, nil for {
case <-tmceed: select {
return CMsg{}, errors.New("Time Exceed") case <-c.stopCtx.Done():
} return
} case <-time.After(c.heartbeatPeriod):
}
// ReplyMsg 用于向Server端Reply信息 _, err := c.sendWait(TransferMsg{
func (star *StarNotifyC) ReplyMsg(data CMsg, name, value string) error { ID: 10000,
var err error Key: "heartbeat",
var key []byte Value: nil,
for _, v := range []byte(name) { Type: MSG_SYS_WAIT,
if v == byte(124) || v == byte(92) { }, time.Second*5)
key = append(key, byte(92)) if err == nil {
c.lastHeartbeat = time.Now().Unix()
failedCount = 0
}
if c.debugMode {
fmt.Println("failed to recv heartbeat,timeout!")
}
failedCount++
if failedCount >= 3 {
if c.debugMode {
fmt.Println("heatbeat failed more than 3 times,stop client")
}
c.alive.Store(false)
c.mu.Lock()
c.status = Status{
Alive: false,
Reason: "heartbeat failed more than 3 times",
Err: errors.New("heartbeat failed more than 3 times"),
}
c.mu.Unlock()
c.stopFn()
return
} }
key = append(key, v)
} }
_, err = star.Connc.Write(star.Queue.BuildMessage([]byte(data.mode + "||" + string(key) + "||" + value)))
return err
} }
func (star *StarNotifyC) cnotify() { func (c *ClientCommon) ShowError(std bool) {
c.mu.Lock()
c.showError = std
c.mu.Unlock()
}
func (c *ClientCommon) readMessage() {
for { for {
select { select {
case <-star.stopSign.Done(): case <-c.stopCtx.Done():
c.conn.Close()
return return
default: default:
} }
data, err := star.Queue.RestoreOne() data := make([]byte, 8192)
if err != nil { if c.maxReadTimeout.Seconds() != 0 {
time.Sleep(time.Millisecond * 20) if err := c.conn.SetReadDeadline(time.Now().Add(c.maxReadTimeout)); err != nil {
continue //TODO:ALERT
}
} }
if string(data.Msg) == "b612ryzstop" { readNum, err := c.conn.Read(data)
star.ClientStop() if err == os.ErrDeadlineExceeded {
star.Online = false if readNum != 0 {
return c.queue.ParseMessage(data[:readNum], "b612")
}
continue
} }
strs := strings.SplitN(string(data.Msg), "||", 3) if err != nil {
if len(strs) < 3 { if c.showError || c.debugMode {
fmt.Println("client read error", err)
}
c.alive.Store(false)
c.mu.Lock()
c.status = Status{
Alive: false,
Reason: "client read error",
Err: err,
}
c.mu.Unlock()
c.stopFn()
continue continue
} }
strs[1] = star.trim(strs[1]) c.queue.ParseMessage(data[:readNum], "b612")
if star.UseChannel { }
go star.store(strs[1], strs[2]) }
} else {
mode, key, value := strs[0], strs[1], strs[2] func (c *ClientCommon) sayGoodBye() error {
if mode[0:2] != "cr" { _, err := c.sendWait(TransferMsg{
if msg, ok := star.FuncLists[key]; ok { ID: 10010,
go msg(CMsg{key, value, mode, nil}) Key: "bye",
} else { Value: nil,
if star.defaultFunc != nil { Type: MSG_SYS_WAIT,
go star.defaultFunc(CMsg{key, value, mode, nil}) }, time.Second*3)
return err
}
func (c *ClientCommon) loadMessage() {
for {
select {
case <-c.stopCtx.Done():
//say goodbye
if !c.byeFromServer {
c.sayGoodBye()
}
c.conn.Close()
return
case data, ok := <-c.queue.RestoreChan():
if !ok {
continue
}
c.wg.Add(1)
go func(data stario.MsgQueue) {
defer c.wg.Done()
//fmt.Println("c received:", float64(time.Now().UnixNano()-nowd)/1000000)
now := time.Now()
//transfer to Msg
msg, err := c.sequenceDe(c.msgDe(c.SecretKey, data.Msg))
if err != nil {
if c.showError || c.debugMode {
fmt.Println("client decode data error", err)
} }
return
} }
} else { message := Message{
if sa, ok := star.lockPool[mode]; ok { ServerConn: c,
sa.Key = key TransferMsg: msg.(TransferMsg),
sa.Value = value NetType: NET_CLIENT,
sa.mode = mode
star.lockPool[mode] = sa
sa.wait <- 1
} else {
if msg, ok := star.FuncLists[key]; ok {
go msg(CMsg{key, value, mode, nil})
} else {
if star.defaultFunc != nil {
go star.defaultFunc(CMsg{key, value, mode, nil})
}
}
} }
} message.Time = now
c.dispatchMsg(message)
}(data)
}
}
}
func (c *ClientCommon) dispatchMsg(message Message) {
switch message.TransferMsg.Type {
case MSG_SYS_WAIT:
fallthrough
case MSG_SYS:
c.sysMsg(message)
return
case MSG_KEY_CHANGE:
fallthrough
case MSG_SYS_REPLY:
fallthrough
case MSG_SYNC_REPLY:
data, ok := c.noFinSyncMsgPool.Load(message.ID)
if ok {
wait := data.(WaitMsg)
wait.Reply <- message
c.noFinSyncMsgPool.Delete(message.ID)
return
}
//return
fallthrough
default:
}
callFn := func(fn func(*Message)) {
fn(&message)
}
fn, ok := c.linkFns[message.Key]
if ok {
callFn(fn)
}
if c.defaultFns != nil {
callFn(c.defaultFns)
}
}
func (c *ClientCommon) sysMsg(message Message) {
switch message.Key {
case "bye":
if message.TransferMsg.Type == MSG_SYS_WAIT {
//fmt.Println("recv stop signal from server")
c.byeFromServer = true
message.Reply(nil)
}
c.alive.Store(false)
c.mu.Lock()
c.status = Status{
Alive: false,
Reason: "recv stop signal from server",
Err: nil,
}
c.mu.Unlock()
c.stopFn()
}
}
func (c *ClientCommon) SetDefaultLink(fn func(message *Message)) {
c.defaultFns = fn
}
func (c *ClientCommon) SetLink(key string, fn func(*Message)) {
c.mu.Lock()
defer c.mu.Unlock()
c.linkFns[key] = fn
}
func (c *ClientCommon) send(msg TransferMsg) (WaitMsg, error) {
var wait WaitMsg
if msg.Type != MSG_SYNC_REPLY && msg.Type != MSG_KEY_CHANGE && msg.Type != MSG_SYS_REPLY || msg.ID == 0 {
msg.ID = atomic.AddUint64(&c.msgID, 1)
}
data, err := c.sequenceEn(msg)
if err != nil {
return WaitMsg{}, err
}
data = c.msgEn(c.SecretKey, data)
data = c.queue.BuildMessage(data)
if c.maxWriteTimeout.Seconds() != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout))
}
_, err = c.conn.Write(data)
if err == nil && (msg.Type == MSG_SYNC_ASK || msg.Type == MSG_KEY_CHANGE || msg.Type == MSG_SYS_WAIT) {
wait.Time = time.Now()
wait.TransferMsg = msg
wait.Reply = make(chan Message, 1)
c.noFinSyncMsgPool.Store(msg.ID, wait)
}
return wait, err
}
func (c *ClientCommon) Send(key string, value MsgVal) error {
_, err := c.send(TransferMsg{
Key: key,
Value: value,
Type: MSG_ASYNC,
})
return err
}
func (c *ClientCommon) sendWait(msg TransferMsg, timeout time.Duration) (Message, error) {
data, err := c.send(msg)
if err != nil {
return Message{}, err
}
if timeout.Seconds() == 0 {
msg, ok := <-data.Reply
if !ok {
return msg, os.ErrInvalid
}
return msg, nil
}
select {
case <-time.After(timeout):
close(data.Reply)
c.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrDeadlineExceeded
case <-c.stopCtx.Done():
return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply:
if !ok {
return msg, os.ErrInvalid
}
return msg, nil
}
}
func (c *ClientCommon) sendCtx(msg TransferMsg, ctx context.Context) (Message, error) {
data, err := c.send(msg)
if err != nil {
return Message{}, err
}
if ctx == nil {
ctx = context.Background()
}
select {
case <-ctx.Done():
close(data.Reply)
c.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrDeadlineExceeded
case <-c.stopCtx.Done():
return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply:
if !ok {
return msg, os.ErrInvalid
} }
return msg, nil
}
}
func (c *ClientCommon) SendObjCtx(ctx context.Context, key string, val interface{}) (Message, error) {
data, err := c.sequenceEn(val)
if err != nil {
return Message{}, err
}
return c.sendCtx(TransferMsg{
Key: key,
Value: data,
Type: MSG_SYNC_ASK,
}, ctx)
}
func (c *ClientCommon) SendObj(key string, val interface{}) error {
data, err := encode(val)
if err != nil {
return err
}
_, err = c.send(TransferMsg{
Key: key,
Value: data,
Type: MSG_ASYNC,
})
return err
}
func (c *ClientCommon) SendCtx(ctx context.Context, key string, value MsgVal) (Message, error) {
return c.sendCtx(TransferMsg{
Key: key,
Value: value,
Type: MSG_SYNC_ASK,
}, ctx)
}
func (c *ClientCommon) SendWait(key string, value MsgVal, timeout time.Duration) (Message, error) {
return c.sendWait(TransferMsg{
Key: key,
Value: value,
Type: MSG_SYNC_ASK,
}, timeout)
}
func (c *ClientCommon) SendWaitObj(key string, value interface{}, timeout time.Duration) (Message, error) {
data, err := c.sequenceEn(value)
if err != nil {
return Message{}, err
}
return c.SendWait(key, data, timeout)
}
func (c *ClientCommon) Reply(m Message, value MsgVal) error {
return m.Reply(value)
}
func (c *ClientCommon) ExchangeKey(newKey []byte) error {
pubKey, err := starcrypto.DecodeRsaPublicKey(c.handshakeRsaPubKey)
if err != nil {
return err
}
newSendKey, err := starcrypto.RSAEncrypt(pubKey, newKey)
if err != nil {
return err
}
data, err := c.sendWait(TransferMsg{
ID: 19961127,
Key: "sirius",
Value: newSendKey,
Type: MSG_KEY_CHANGE,
}, time.Second*10)
if err != nil {
return err
} }
if string(data.Value) != "success" {
return errors.New("cannot exchange new aes-key")
}
c.SecretKey = newKey
time.Sleep(time.Millisecond * 100)
return nil
}
func aesRsaHello(c Client) error {
newAesKey := []byte(fmt.Sprintf("%d%d%d%s", time.Now().UnixNano(), rand.Int63(), rand.Int63(), "b612.me"))
newAesKey = []byte(starcrypto.Md5Str(newAesKey))
return c.ExchangeKey(newAesKey)
}
func (c *ClientCommon) GetMsgEn() func([]byte, []byte) []byte {
return c.msgEn
}
func (c *ClientCommon) SetMsgEn(fn func([]byte, []byte) []byte) {
c.msgEn = fn
}
func (c *ClientCommon) GetMsgDe() func([]byte, []byte) []byte {
return c.msgDe
}
func (c *ClientCommon) SetMsgDe(fn func([]byte, []byte) []byte) {
c.msgDe = fn
}
func (c *ClientCommon) HeartbeatPeroid() time.Duration {
return c.heartbeatPeriod
}
func (c *ClientCommon) SetHeartbeatPeroid(duration time.Duration) {
c.heartbeatPeriod = duration
} }
// ClientStop 终止client端运行 func (c *ClientCommon) GetSecretKey() []byte {
func (star *StarNotifyC) ClientStop() { return c.SecretKey
if star.isUDP { }
star.Send("b612ryzstop") func (c *ClientCommon) SetSecretKey(key []byte) {
c.SecretKey = key
}
func (c *ClientCommon) RsaPubKey() []byte {
return c.handshakeRsaPubKey
}
func (c *ClientCommon) SetRsaPubKey(key []byte) {
c.handshakeRsaPubKey = key
}
func (c *ClientCommon) Stop() error {
if !c.alive.Load().(bool) {
return nil
} }
star.cancel() c.alive.Store(false)
star.Stop <- 1 c.mu.Lock()
star.Stop <- 1 c.status = Status{
star.Stop <- 1 Alive: false,
Reason: "recv stop signal from user",
Err: nil,
}
c.mu.Unlock()
c.stopFn()
return nil
}
func (c *ClientCommon) StopMonitorChan() <-chan struct{} {
return c.stopCtx.Done()
} }
// SetNotify 用于设置关键词的调用函数 func (c *ClientCommon) Status() Status {
func (star *StarNotifyC) SetNotify(name string, data func(CMsg)) { return c.status
star.FuncLists[name] = data
} }
// SetDefaultNotify 用于设置默认关键词的调用函数 func (c *ClientCommon) GetSequenceEn() func(interface{}) ([]byte, error) {
func (star *StarNotifyC) SetDefaultNotify(data func(CMsg)) { return c.sequenceEn
star.defaultFunc = data }
func (c *ClientCommon) SetSequenceEn(fn func(interface{}) ([]byte, error)) {
c.sequenceEn = fn
}
func (c *ClientCommon) GetSequenceDe() func([]byte) (interface{}, error) {
return c.sequenceDe
}
func (c *ClientCommon) SetSequenceDe(fn func([]byte) (interface{}, error)) {
c.sequenceDe = fn
} }

@ -1,149 +0,0 @@
package notify
import (
"fmt"
"testing"
"time"
)
func Test_usechannel(t *testing.T) {
server, err := NewNotifyS("udp", "127.0.0.1:1926")
if err != nil {
fmt.Println(err)
return
}
server.SetNotify("nihao", func(data SMsg) string {
fmt.Println("server recv:", data.Key, data.Value)
if data.Value != "" {
data.Reply("nba")
return "nb"
}
return ""
})
client, err := NewNotifyC("udp", "127.0.0.1:1926")
if err != nil {
fmt.Println(err)
return
}
//time.Sleep(time.Second * 10)
client.Send("nihao")
client.SendValue("nihao", "lalala")
txt := <-client.Notify("nihao")
fmt.Println("client", txt)
txt = <-client.Notify("nihao")
fmt.Println("client", txt)
server.ServerStop()
<-client.Stop
client.ClientStop()
time.Sleep(time.Second * 3)
}
func Test_nochannel(t *testing.T) {
server, err := NewNotifyS("udp", "127.0.0.1:1926")
if err != nil {
fmt.Println(err)
return
}
server.SetNotify("nihao", func(data SMsg) string {
fmt.Println("server recv:", data.Key, data.Value)
if data.Value != "" {
data.Reply("nbaz")
return ""
}
return ""
})
client, err := NewNotifyC("udp", "127.0.0.1:1926")
if err != nil {
fmt.Println(err)
return
}
//time.Sleep(time.Second * 10)
client.UseChannel = false
client.SetNotify("nihao", func(data CMsg) {
fmt.Println("client recv:", data.Key, data.Value)
if data.Value != "" {
time.Sleep(time.Millisecond * 900)
client.SendValue("nihao", "dsb")
}
})
client.SendValue("nihao", "lalala")
time.Sleep(time.Second * 3)
server.ServerStop()
<-client.Stop
client.ClientStop()
time.Sleep(time.Second * 3)
}
func Test_pipec(t *testing.T) {
server, err := NewNotifyS("tcp", "127.0.0.1:1926")
if err != nil {
fmt.Println(err)
return
}
server.SetNotify("ni\\||hao", func(data SMsg) string {
fmt.Println("server recv:", data.Key, data.Value, data.mode)
if data.Value != "" {
data.Reply("nba")
return ""
}
return ""
})
client, err := NewNotifyC("tcp", "127.0.0.1:1926")
if err != nil {
fmt.Println(err)
return
}
client.UseChannel = false
sa, err := client.SendValueWait("ni\\||hao", "lalaeee", time.Second*10)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(sa)
fmt.Println("sukidesu")
time.Sleep(time.Second * 3)
server.ServerStop()
<-client.Stop
client.ClientStop()
time.Sleep(time.Second * 2)
}
func Test_pips(t *testing.T) {
var testmsg SMsg
server, err := NewNotifyS("udp", "127.0.0.1:1926")
if err != nil {
fmt.Println(err)
return
}
server.SetNotify("nihao", func(data SMsg) string {
fmt.Println("server recv:", data.Key, data.Value, data.mode)
testmsg = data
if data.Value != "" {
data.Reply("nbaz")
return ""
}
return ""
})
client, err := NewNotifyC("udp", "127.0.0.1:1926")
if err != nil {
fmt.Println(err)
return
}
//time.Sleep(time.Second * 10)
client.UseChannel = false
client.SetNotify("nihao", func(data CMsg) {
fmt.Println("client recv:", data.Key, data.Value, data.mode)
if data.mode != "pa" {
time.Sleep(time.Millisecond * 1200)
client.ReplyMsg(data, "nihao", "dsb")
}
})
client.SendValue("nihao", "lalala")
time.Sleep(time.Second * 3)
fmt.Println(server.SendWait(testmsg, "nihao", "wozuinb", time.Second*20))
fmt.Println("sakura")
server.ServerStop()
<-client.Stop
client.ClientStop()
time.Sleep(time.Second * 3)
}

@ -0,0 +1,51 @@
package notify
import (
"context"
"time"
)
type Client interface {
SetDefaultLink(func(message *Message))
SetLink(string, func(*Message))
send(msg TransferMsg) (WaitMsg, error)
sendWait(msg TransferMsg, timeout time.Duration) (Message, error)
Send(key string, value MsgVal) error
SendWait(key string, value MsgVal, timeout time.Duration) (Message, error)
SendWaitObj(key string, value interface{}, timeout time.Duration) (Message, error)
SendCtx(ctx context.Context, key string, value MsgVal) (Message, error)
Reply(m Message, value MsgVal) error
ExchangeKey(newKey []byte) error
Connect(network string, addr string) error
ConnectTimeout(network string, addr string, timeout time.Duration) error
SkipExchangeKey() bool
SetSkipExchangeKey(bool)
GetMsgEn() func([]byte, []byte) []byte
SetMsgEn(func([]byte, []byte) []byte)
GetMsgDe() func([]byte, []byte) []byte
SetMsgDe(func([]byte, []byte) []byte)
Heartbeat()
HeartbeatPeroid() time.Duration
SetHeartbeatPeroid(duration time.Duration)
GetSecretKey() []byte
SetSecretKey(key []byte)
RsaPubKey() []byte
SetRsaPubKey([]byte)
Stop() error
StopMonitorChan() <-chan struct{}
Status() Status
ShowError(bool)
DebugMode(bool)
IsDebugMode() bool
GetSequenceEn() func(interface{}) ([]byte, error)
SetSequenceEn(func(interface{}) ([]byte, error))
GetSequenceDe() func([]byte) (interface{}, error)
SetSequenceDe(func([]byte) (interface{}, error))
SendObjCtx(ctx context.Context, key string, val interface{}) (Message, error)
SendObj(key string, val interface{}) error
}

@ -0,0 +1,97 @@
package notify
import (
"b612.me/starcrypto"
"log"
)
var defaultRsaKey = []byte(`-----BEGIN RSA PRIVATE KEY-----
MIIJKAIBAAKCAgEAxmeMqr9yfJFKZn26oe/HvC7bZXNLC9Nk55AuTkb4XuIoqXDb
AJD2Y/p167oJLKIqL3edcj7h+oTfn6s79vxT0ZCEf37ILU0G+scRzVwYHiLMwOUC
bS2o4Xor3zqUi9f1piJBvoBNh8RKKtsmJW6VQZdiUGJHbgX4MdOdtf/6TvxZMwSX
U+PRSCAjy04A31Zi7DEWUWJPyqmHeu++PxXU5lvoMdCGDqpcF2j2uO7oJJUww01M
3F5FtTElMrK4/P9gD4kP7NiPhOfVPEfBsYT/DSSjvqNZJZuWnxu+cDxE7J/sBvdp
eNRLhqzdmMYagZFuUmVrz8QmsD6jKHgydW+r7irllvb8WJPK/RIMif+4Rg7rDKFb
j8+ZQ3HZ/gKELoRSyb3zL6RC2qlGLjC1tdeN7TNTinCv092y39T8jIARJ7tpfePh
NBxsBdxfXbCAzHYZIHufI9Zlsc+felQwanlDhq+q8YLcnKHvNKYVyCf/upExpAiA
rr88y/KbeKes0KorKkwMBnGUMTothWM25wHozcurixNvP4UMWX7LWD7vOZZuNDQN
utZYeTwdsniI3mTO9vlPWEK8JTfxBU7x9SePUMJNDyjfDUJM8C2DOlyhGNPkgazO
GdliH87tHkEy/7jJnGclgKmciiVPgwHfFx9GGoBHEfvmAoGGrk4qNbjm7JECAwEA
AQKCAgBYzHe05ELFZfG6tYMWf08R9pbTbSqlfFOpIGrZNgJr1SUF0TDzq+3bCXpF
qtn4VAw1en/JZkOV8Gp1+Bm6jWymWtwyg/fr7pG1I+vf0dwpgMHLg7P2UX1IjXmd
S4a4oEuds69hJ+OLZFsdm0ATeM7ssGicOaBmqd1Pz7rCfnL1bxQtNVzVex1r/paG
o77YNr3HoKCwhCPaPM4aQ7sOWSMUhwYBZabaYX0eLShf1O2pkexlPO+tobPpSLmx
WzRYZ6QC0AGEq9hwT6KsfCFA5pmQtFllNY7suhpL1AsECLWAgoMNCyb1oW68NBpq
CiBK5WBPGH2MW+pE74Pu1P0gen6kLGnApKQjprE1aGuR+xkZe3uEnXwSryU9TXki
wINTEMsX8dkmofFqaJhUwSubrb+t7gvv9E9ZZe0X6UgKzAVVqvh4z1pP8VT+xHpu
pW7SR8n9cFddaEPUijSb1rSpJrNzfJJ+G7yrB7Cw2kBgQ07vzD3z/3kA9cwFevLS
mv3l3OQuB6y9c+AG3cX5WGAt/BVOLjimj9qJt+YglG0SwG31U0PUnnx6QVz/UtJm
CbJQ2TpJd+mk0HyuMU+eycp7BWF3PMN+SE4QgKCKWnhsLeAd3gcvifsbLOYE1OPg
wv1tqyJy0VsJiSn6Ub6Qq0kPLwCLlQTnLWk5mIhnRpHYufTSwQKCAQEA4gS4FKPU
tAcQ82dEYW4OjGfhNWrjFpF+A8K5zufleQWcgzQ3fQho13zH0vZobukfkEVlVxla
OIVk7ZgNA4mCSFrATjIx3RMqzrAUvTte0O4wkjYgCwVvTdS1W8nvRLKgugLygyoo
r+MLW5IT3eNMK/2fZbftNlAkbc7NCo3c2tS6MXFgjx5JUuzChOY73Kp4p5KS38L5
wRRiI8KTIKjBjMZ5q/l8VLKX89bKOCaWibmItoXY6QMbIjargb7YLp3X6uGEyGIu
VhPbQ80/+OC2ZqIvDecp4PYnJNZFeqfjyfhJCNqDjBKYwIscBLMU/Wf9OY258OR4
snQaerN1M0h9lQKCAQEA4LkZIRLLw+8bIVM+7VXxFwOAGy+MH35tvuNIToItAoUh
zjL5LG34PjID8J0DPyP8VRVanak1EcxF0aTEkvnt2f2RAVsW89ytcn8Lybb12Ae8
ia2ZWuIM+J40nuKOGPs3lJ9HqdPWmZYWsWKxFJmYBBnwD6CADYqhqambQn0HeaYl
/WUD7blLYg+4Kk1mt9/hIw93jTWP/86O2H0ia+AhYPTqyvVXfIXKhat6NlOYksGf
Hdv+aCC8Ukg6FyEgiNc/rFn0MWPnEX+cM1AwubviHIBhV8QWILLBTjupwsEBZVah
60ftH+HRUCmEeOpI7jyzIlfEUNLoBHfswKMhMPtcDQKCAQEA0JFkQX+xn/PJW6PX
AUWrXTvbIg0hw8i9DcFa76klJBnehWDhN5tUDE5Uo8PJOVgdTWgMjWSS0geezHX8
xF/XfudoAIDnbMfsP9FTQhCQfaLf5XzW8vSv8pWwSiS9jJp+IUjo+8siwrR03aqe
dKr0tr+ToS0qVG1+QGqO4gdpX/LgYxHp9ggPx9s94aAIa6hQMOrcaGqnSNqDedZr
KL8x5LOewek3J32rJVP3Rfut/SfeFfjL4rKADoF+oPs4yUPVZSV4/+VCNyKZuyaj
uwm6qFlPrLe9+J+OHbsxYG+fj9hzpRzoOZFLrppwX5HWc8XLcpnrlXVwP9VOPh5u
r8VcRQKCAQAJFHGHfJLvH8Ig3pQ0UryjCWkrsAghXaJhjB1nzqqy514uTrDysp7N
JIg0OKPg8TtI1MwMgsG6Ll7D0bx/k8mgfTZWr6+FuuznK2r2g4X7bJSZm4IOwgN0
KDBIGy9SoxPj1Wu32O9a1U2lbS9qfao+wC2K9Bk4ctmFWW0Eiri6mZP/YQ1/lXUO
SURPsUDtPQaDvCRAeGGRHG95H9U8NpoiqMKz4KXgSiecrwkJGOeZRml/c1wcKPZy
/KgcNyJxZQEVnazYMgksE9Pj3uGZH5ZLQISuXyXlvFNDLfX2AIZl6dIxB371QtKK
QqMvn4fC2IEEajdsbJkjVRUj03OL3xwhAoIBAAfMhDSvBbDkGTaXnNMjPPSbswqK
qcSRhSG27mjs1dDNBKuFbz6TkIOp4nxjuS9Zp19fErXlAE9mF5yXSmuiAkZmWfhs
HKpWIdjFJK1EqSfcINe2YuoyUIulz9oG7ObRHD4D8jSPjA8Ete+XsBHGyOtUl09u
X4u9uClhqjK+r1Tno2vw5yF6ZxfQtdWuL4W0UL1S8E+VO7vjTjNOYvgjAIpAM/gW
sqjA2Qw52UZqhhLXoTfRvtJilxlXXhIRJSsnUoGiYVCQ/upjqJCClEvJfIWdGY/U
I2CbFrwJcNvOG1lUsSM55JUmbrSWVPfo7yq2k9GCuFxOy2n/SVlvlQUcNkA=
-----END RSA PRIVATE KEY-----`)
var defaultRsaPubKey = []byte(`-----BEGIN PUBLIC KEY-----
MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAxmeMqr9yfJFKZn26oe/H
vC7bZXNLC9Nk55AuTkb4XuIoqXDbAJD2Y/p167oJLKIqL3edcj7h+oTfn6s79vxT
0ZCEf37ILU0G+scRzVwYHiLMwOUCbS2o4Xor3zqUi9f1piJBvoBNh8RKKtsmJW6V
QZdiUGJHbgX4MdOdtf/6TvxZMwSXU+PRSCAjy04A31Zi7DEWUWJPyqmHeu++PxXU
5lvoMdCGDqpcF2j2uO7oJJUww01M3F5FtTElMrK4/P9gD4kP7NiPhOfVPEfBsYT/
DSSjvqNZJZuWnxu+cDxE7J/sBvdpeNRLhqzdmMYagZFuUmVrz8QmsD6jKHgydW+r
7irllvb8WJPK/RIMif+4Rg7rDKFbj8+ZQ3HZ/gKELoRSyb3zL6RC2qlGLjC1tdeN
7TNTinCv092y39T8jIARJ7tpfePhNBxsBdxfXbCAzHYZIHufI9Zlsc+felQwanlD
hq+q8YLcnKHvNKYVyCf/upExpAiArr88y/KbeKes0KorKkwMBnGUMTothWM25wHo
zcurixNvP4UMWX7LWD7vOZZuNDQNutZYeTwdsniI3mTO9vlPWEK8JTfxBU7x9SeP
UMJNDyjfDUJM8C2DOlyhGNPkgazOGdliH87tHkEy/7jJnGclgKmciiVPgwHfFx9G
GoBHEfvmAoGGrk4qNbjm7JECAwEAAQ==
-----END PUBLIC KEY-----`)
var defaultAesKey = []byte{0x19, 0x96, 0x11, 0x27, 228, 187, 187, 231, 142, 137, 230, 179, 189, 229, 184, 133}
func defaultMsgEn(key []byte, d []byte) []byte {
data, err := starcrypto.CustomEncryptAesCFB(d, key)
if err != nil {
log.Print(err)
return nil
}
return data
}
func defaultMsgDe(key []byte, d []byte) []byte {
data, err := starcrypto.CustomDecryptAesCFB(d, key)
if err != nil {
log.Print(err)
return nil
}
return data
}
func init() {
RegisterName("b612.me/notify.Transfer", TransferMsg{})
}

@ -0,0 +1,8 @@
module b612.me/notify
go 1.16
require (
b612.me/starcrypto v0.0.5
b612.me/stario v0.0.10
)

@ -0,0 +1,75 @@
b612.me/starcrypto v0.0.5 h1:Aa4pRDO2lBH2Aw+vz8NuUtRb73J8z5aOa9SImBY5sq4=
b612.me/starcrypto v0.0.5/go.mod h1:pF5A16p8r/h1G0x7ZNmmAF6K1sdIMpbCUxn2WGC8gZ0=
b612.me/stario v0.0.0-20240818091810-d528a583f4b2 h1:SxN1WDZsEBQFTnLaKbc7Z+91uyWhUB4cKHo5Ucztyh0=
b612.me/stario v0.0.0-20240818091810-d528a583f4b2/go.mod h1:1Owmu9jzKWgs4VsmeI8YWlGwLrCwPNM/bYpxkyn+MMk=
b612.me/stario v0.0.10/go.mod h1:1Owmu9jzKWgs4VsmeI8YWlGwLrCwPNM/bYpxkyn+MMk=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

502
msg.go

@ -0,0 +1,502 @@
package notify
import (
"b612.me/starcrypto"
"context"
"errors"
"fmt"
"net"
"os"
"reflect"
"sync/atomic"
"time"
)
const (
MSG_SYS MessageType = iota
MSG_SYS_WAIT
MSG_SYS_REPLY
MSG_KEY_CHANGE
MSG_ASYNC
MSG_SYNC_ASK
MSG_SYNC_REPLY
)
type MessageType uint8
type NetType uint8
const (
NET_SERVER NetType = iota
NET_CLIENT
)
type MsgVal []byte
type TransferMsg struct {
ID uint64
Key string
Value MsgVal
Type MessageType
}
type Message struct {
NetType
ClientConn *ClientConn
ServerConn Client
TransferMsg
Time time.Time
}
type WaitMsg struct {
TransferMsg
Time time.Time
Reply chan Message
//Ctx context.Context
}
func (m *Message) Reply(value MsgVal) (err error) {
reply := TransferMsg{
ID: m.ID,
Key: m.Key,
Value: value,
Type: m.Type,
}
if reply.Type == MSG_SYNC_ASK {
reply.Type = MSG_SYNC_REPLY
}
if reply.Type == MSG_SYS_WAIT {
reply.Type = MSG_SYS_REPLY
}
if m.NetType == NET_SERVER {
_, err = m.ClientConn.server.send(m.ClientConn, reply)
}
if m.NetType == NET_CLIENT {
_, err = m.ServerConn.send(reply)
}
return
}
func (m *Message) ReplyObj(value interface{}) (err error) {
data, err := encode(value)
if err != nil {
return err
}
return m.Reply(data)
}
type ClientConn struct {
alive atomic.Value
status Status
ClientID string
ClientAddr net.Addr
tuConn net.Conn
server Server
stopFn context.CancelFunc
stopCtx context.Context
maxReadTimeout time.Duration
maxWriteTimeout time.Duration
msgEn func([]byte, []byte) []byte
msgDe func([]byte, []byte) []byte
handshakeRsaKey []byte
SecretKey []byte
lastHeartBeat int64
}
type Status struct {
Alive bool
Reason string
Err error
}
func (c *ClientConn) readTUMessage() {
for {
select {
case <-c.stopCtx.Done():
c.tuConn.Close()
c.server.removeClient(c)
return
default:
}
if c.maxReadTimeout.Seconds() > 0 {
c.tuConn.SetReadDeadline(time.Now().Add(c.maxReadTimeout))
}
data := make([]byte, 8192)
num, err := c.tuConn.Read(data)
if err == os.ErrDeadlineExceeded {
if num != 0 {
c.server.pushMessage(data[:num], c.ClientID)
}
continue
}
if err != nil {
//conn is broke
c.alive.Store(false)
c.status = Status{
Alive: false,
Reason: "read error",
Err: err,
}
c.stopFn()
continue
}
c.server.pushMessage(data[:num], c.ClientID)
//fmt.Println("finished:", float64(time.Now().UnixNano()-nowd)/1000000)
}
}
func (c *ClientConn) rsaDecode(message Message) {
privKey, err := starcrypto.DecodeRsaPrivateKey(c.handshakeRsaKey, "")
if err != nil {
fmt.Println(err)
message.Reply([]byte("failed"))
return
}
data, err := starcrypto.RSADecrypt(privKey, message.Value)
if err != nil {
fmt.Println(err)
message.Reply([]byte("failed"))
return
}
//fmt.Println("aes-key changed to", string(data))
message.Reply([]byte("success"))
c.SecretKey = data
}
func (c *ClientConn) sayGoodByeForTU() error {
_, err := c.server.sendWait(c, TransferMsg{
ID: 10010,
Key: "bye",
Value: nil,
Type: MSG_SYS_WAIT,
}, time.Second*3)
return err
}
func (c *ClientConn) GetSecretKey() []byte {
return c.SecretKey
}
func (c *ClientConn) SetSecretKey(key []byte) {
c.SecretKey = key
}
func (c *ClientConn) GetMsgEn() func([]byte, []byte) []byte {
return c.msgEn
}
func (c *ClientConn) SetMsgEn(fn func([]byte, []byte) []byte) {
c.msgEn = fn
}
func (c *ClientConn) GetMsgDe() func([]byte, []byte) []byte {
return c.msgDe
}
func (c *ClientConn) SetMsgDe(fn func([]byte, []byte) []byte) {
c.msgDe = fn
}
func (c *ClientConn) StopMonitorChan() <-chan struct{} {
return c.stopCtx.Done()
}
func (c *ClientConn) Status() Status {
return c.status
}
func (c *ClientConn) Server() Server {
return c.server
}
func (c *ClientConn) GetRemoteAddr() net.Addr {
return c.ClientAddr
}
func (m MsgVal) ToClearString() string {
return string(m)
}
func (m MsgVal) ToInterface() (interface{}, error) {
return Decode(m)
}
func (m MsgVal) MustToInterface() interface{} {
inf, err := m.ToInterface()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToString() (string, error) {
inf, err := m.ToInterface()
if err != nil {
return "", err
}
if data, ok := inf.(string); !ok {
return "", errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToString() string {
inf, err := m.ToString()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToInt32() (int32, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(int32); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToInt32() int32 {
inf, err := m.ToInt32()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToInt() (int, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(int); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToInt() int {
inf, err := m.ToInt()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToUint64() (uint64, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(uint64); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToUint64() uint64 {
inf, err := m.ToUint64()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToUint32() (uint32, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(uint32); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToUint32() uint32 {
inf, err := m.ToUint32()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToUint() (uint, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(uint); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToUint() uint {
inf, err := m.ToUint()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToBool() (bool, error) {
inf, err := m.ToInterface()
if err != nil {
return false, err
}
if data, ok := inf.(bool); !ok {
return false, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToBool() bool {
inf, err := m.ToBool()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToFloat64() (float64, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(float64); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToFloat64() float64 {
inf, err := m.ToFloat64()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToFloat32() (float32, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(float32); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToFloat32() float32 {
inf, err := m.ToFloat32()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToSliceString() ([]string, error) {
inf, err := m.ToInterface()
if err != nil {
return []string{}, err
}
if data, ok := inf.([]string); !ok {
return []string{}, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToSliceString() []string {
inf, err := m.ToSliceString()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToSliceInt64() ([]int64, error) {
inf, err := m.ToInterface()
if err != nil {
return []int64{}, err
}
if data, ok := inf.([]int64); !ok {
return []int64{}, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToSliceInt64() []int64 {
inf, err := m.ToSliceInt64()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToSliceFloat64() ([]float64, error) {
inf, err := m.ToInterface()
if err != nil {
return []float64{}, err
}
if data, ok := inf.([]float64); !ok {
return []float64{}, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToSliceFloat64() []float64 {
inf, err := m.ToSliceFloat64()
if err != nil {
panic(err)
}
return inf
}
func ToMsgVal(val interface{}) (MsgVal, error) {
return Encode(val)
}
func MustToMsgVal(val interface{}) MsgVal {
d, err := ToMsgVal(val)
if err != nil {
panic(err)
}
return d
}
func (m MsgVal) Orm(stu interface{}) error {
inf, err := m.ToInterface()
if err != nil {
return err
}
t := reflect.TypeOf(stu)
if t.Kind() != reflect.Ptr {
return errors.New("interface not writable(pointer wanted)")
}
if !reflect.ValueOf(stu).Elem().CanSet() {
return errors.New("interface not writable")
}
it := reflect.TypeOf(inf)
if t.Elem().Kind() != it.Kind() {
return fmt.Errorf("interface{} kind is %v,not %v", t.Elem().Kind(), it.Kind())
}
if t.Elem().Name() != it.Name() {
return fmt.Errorf("interface{} name is %v,not %v", t.Elem().Name(), it.Name())
}
if t.Elem().String() != it.String() {
return fmt.Errorf("interface{} string is %v,not %v", t.Elem().String(), it.String())
}
reflect.ValueOf(stu).Elem().Set(reflect.ValueOf(inf))
return nil
}

@ -0,0 +1,96 @@
package notify
import (
"fmt"
"testing"
"time"
)
func TestMsgEnDeCode(t *testing.T) {
Register(HelloMessage{})
Register(Error{})
go ServerRun(time.Second * 30)
time.Sleep(time.Second * 2)
ClientRun(time.Second * 35)
}
type Error struct {
Msg string
}
func (e Error) Error() string {
return e.Msg
}
type WorldMessage struct {
Port int
MyCode string
MyInfo []int
Err error
}
type HelloMessage struct {
ID string
MyMap map[string]string
MySlice []int
World WorldMessage
}
func ClientRun(stopTime time.Duration) {
c := NewClient()
err := c.Connect("tcp", "127.0.0.1:23456")
if err != nil {
panic(err)
}
c.SetLink("msg", func(msg *Message) {
var hi HelloMessage
err := msg.Value.Orm(&hi)
if err != nil {
panic(err)
}
fmt.Printf("recv info from server,struct detail is %+v\n", hi)
})
timer := time.NewTimer(stopTime)
for {
select {
case <-timer.C:
c.Stop()
return
case <-time.After(time.Second * 2):
fmt.Println("client msg sent", c.SendObj("msg", HelloMessage{
ID: "client",
MyMap: map[string]string{"hello": "world"},
MySlice: []int{int(time.Now().Unix())},
World: WorldMessage{
Port: 520,
MyCode: "b612",
MyInfo: []int{0, 1, 2, 3},
Err: Error{Msg: "Hello World"},
},
}))
}
}
}
func ServerRun(stopTime time.Duration) {
s := NewServer()
err := s.Listen("tcp", "127.0.0.1:23456")
if err != nil {
panic(err)
}
s.SetLink("msg", func(msg *Message) {
var hi HelloMessage
err := msg.Value.Orm(&hi)
if err != nil {
panic(err)
}
fmt.Printf("recv info from client:%v,struct detail is %+v\n", msg.ClientConn.GetRemoteAddr(), hi)
hi.ID = "server"
hi.World.Port = 666
hi.MySlice = append(hi.MySlice, 1, 1, 2, 7)
msg.ReplyObj(hi)
})
<-time.After(stopTime)
s.Stop()
}

@ -0,0 +1,44 @@
package notify
import (
"bytes"
"encoding/gob"
)
func Register(data interface{}) {
gob.Register(data)
}
func RegisterName(name string, data interface{}) {
gob.RegisterName(name, data)
}
func RegisterAll(data []interface{}) {
for _, v := range data {
gob.Register(v)
}
}
func RegisterNames(data map[string]interface{}) {
for k, v := range data {
gob.RegisterName(k, v)
}
}
func encode(src interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(&src)
return buf.Bytes(), err
}
func Encode(src interface{}) ([]byte, error) {
return encode(src)
}
func Decode(src []byte) (interface{}, error) {
dec := gob.NewDecoder(bytes.NewReader(src))
var dst interface{}
err := dec.Decode(&dst)
return dst, err
}

@ -1,369 +1,693 @@
// Package notify is a package which provide common tcp/udp/unix socket service
package notify package notify
import ( import (
"b612.me/stario"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"math"
"math/rand" "math/rand"
"net" "net"
"os"
"strings" "strings"
"sync"
"sync/atomic"
"time" "time"
"b612.me/starainrt"
) )
var builder *starainrt.StarQueue type ServerCommon struct {
msgID uint64
func init() { alive atomic.Value
builder = starainrt.NewQueue() status Status
} listener net.Listener
udpListener *net.UDPConn
// StarNotifyS 为Server端 queue *stario.StarQueue
type StarNotifyS struct { stopFn context.CancelFunc
// Queue 是用来处理收发信息的简单消息队列 stopCtx context.Context
Queue *starainrt.StarQueue maxReadTimeout time.Duration
// FuncLists 记录了被通知项所记录的函数 maxWriteTimeout time.Duration
FuncLists map[string]func(SMsg) string parallelNum int
defaultFunc func(SMsg) string wg stario.WaitGroup
Connected func(SMsg) string clientPool map[string]*ClientConn
stopSign context.Context mu sync.RWMutex
cancel context.CancelFunc handshakeRsaKey []byte
connPool map[string]net.Conn SecretKey []byte
lockPool map[string]SMsg defaultMsgEn func([]byte, []byte) []byte
udpPool map[string]*net.UDPAddr defaultMsgDe func([]byte, []byte) []byte
isUDP bool linkFns map[string]func(message *Message)
// Stop 停止信 号 defaultFns func(message *Message)
Stop chan int noFinSyncMsgPool sync.Map
// UDPConn UDP监听 noFinSyncMsgMaxKeepSeconds int64
UDPConn *net.UDPConn maxHeartbeatLostSeconds int64
// Online 当前链接是否处于活跃状态 sequenceDe func([]byte) (interface{}, error)
Online bool sequenceEn func(interface{}) ([]byte, error)
// ReadDeadline tcp/unix中读超时设置,udp请直接调用UDPConn showError bool
ReadDeadline time.Time debugMode bool
// WriteDeadline tcp/unix中写超时设置,udp请直接调用UDPConn
WriteDeadline time.Time
// Deadline tcp/unix中超时设置,udp请直接调用UDPConn
Deadline time.Time
}
// SMsg 指明当前服务端被通知的关键字
type SMsg struct {
Conn net.Conn
Key string
Value string
UDP *net.UDPAddr
uconn *net.UDPConn
mode string
wait chan int
}
// GetConnPool 获取所有Client端信息
func (star *StarNotifyS) GetConnPool() []SMsg {
var result []SMsg
for _, v := range star.connPool {
result = append(result, SMsg{Conn: v, mode: "pa"})
}
for _, v := range star.udpPool {
result = append(result, SMsg{UDP: v, uconn: star.UDPConn, mode: "pa0"})
}
return result
}
func (nmsg *SMsg) addSlash(name string) string {
var key []byte
for _, v := range []byte(name) {
if v == byte(124) || v == byte(92) {
key = append(key, byte(92))
}
key = append(key, v)
}
return string(key)
} }
// Reply 用于向client端回复数据 func NewServer() Server {
func (nmsg *SMsg) Reply(msg string) error { var server ServerCommon
var err error server.wg = stario.NewWaitGroup(0)
if nmsg.uconn == nil { server.parallelNum = 0
_, err = nmsg.Conn.Write(builder.BuildMessage([]byte(nmsg.mode + "||" + nmsg.addSlash(nmsg.Key) + "||" + msg))) server.noFinSyncMsgMaxKeepSeconds = 0
} else { server.maxHeartbeatLostSeconds = 300
_, err = nmsg.uconn.WriteToUDP(builder.BuildMessage([]byte(nmsg.mode+"||"+nmsg.addSlash(nmsg.Key)+"||"+msg)), nmsg.UDP) server.stopCtx, server.stopFn = context.WithCancel(context.Background())
server.SecretKey = defaultAesKey
server.handshakeRsaKey = defaultRsaKey
server.clientPool = make(map[string]*ClientConn)
server.defaultMsgEn = defaultMsgEn
server.defaultMsgDe = defaultMsgDe
server.sequenceEn = encode
server.sequenceDe = Decode
server.alive.Store(false)
server.linkFns = make(map[string]func(*Message))
server.defaultFns = func(message *Message) {
return
} }
return err return &server
} }
// Send 用于向client端发送key-value数据 func (s *ServerCommon) DebugMode(dmg bool) {
func (nmsg *SMsg) Send(key, value string) error { s.mu.Lock()
var err error s.debugMode = dmg
if nmsg.uconn == nil { s.mu.Unlock()
_, err = nmsg.Conn.Write(builder.BuildMessage([]byte("pa||" + nmsg.addSlash(key) + "||" + value))) }
} else {
_, err = nmsg.uconn.WriteToUDP(builder.BuildMessage([]byte("pa||"+nmsg.addSlash(key)+"||"+value)), nmsg.UDP) func (s *ServerCommon) IsDebugMode() bool {
} s.mu.RLock()
return err defer s.mu.RUnlock()
return s.debugMode
} }
// SendWait 用于向client端发送key-value数据并等待 func (s *ServerCommon) ShowError(std bool) {
func (star *StarNotifyS) SendWait(source SMsg, key, value string, tmout time.Duration) (SMsg, error) { s.mu.Lock()
var err error s.showError = std
var tmceed <-chan time.Time s.mu.Unlock()
rand.Seed(time.Now().UnixNano()) }
mode := "sr" + fmt.Sprintf("%05d", rand.Intn(99999))
if source.uconn == nil { func (s *ServerCommon) Stop() error {
_, err = source.Conn.Write(builder.BuildMessage([]byte(mode + "||" + source.addSlash(key) + "||" + value))) if !s.alive.Load().(bool) {
} else { return nil
_, err = source.uconn.WriteToUDP(builder.BuildMessage([]byte(mode+"||"+source.addSlash(key)+"||"+value)), source.UDP)
} }
if err != nil { s.alive.Store(false)
return SMsg{}, err s.mu.Lock()
s.status = Status{
Alive: false,
Reason: "recv stop signal from user",
Err: nil,
} }
if int64(tmout) > 0 { s.mu.Unlock()
tmceed = time.After(tmout) s.stopFn()
return nil
}
func (s *ServerCommon) Listen(network string, addr string) error {
if s.alive.Load().(bool) {
return errors.New("server already run")
} }
source.wait = make(chan int, 2) s.stopCtx, s.stopFn = context.WithCancel(context.Background())
star.lockPool[mode] = source s.queue = stario.NewQueueCtx(s.stopCtx, 128, math.MaxUint32)
select { if strings.Contains(strings.ToLower(network), "udp") {
case <-source.wait: return s.ListenUDP(network, addr)
res := star.lockPool[mode]
delete(star.lockPool, mode)
return res, nil
case <-tmceed:
return SMsg{}, errors.New("Time Exceed")
} }
return s.ListenTU(network, addr)
} }
func (star *StarNotifyS) starinits() { func (s *ServerCommon) ListenTU(network string, addr string) error {
star.stopSign, star.cancel = context.WithCancel(context.Background()) listener, err := net.Listen(network, addr)
star.Queue = starainrt.NewQueue() if err != nil {
star.udpPool = make(map[string]*net.UDPAddr) return err
star.FuncLists = make(map[string]func(SMsg) string) }
star.connPool = make(map[string]net.Conn) s.alive.Store(true)
star.lockPool = make(map[string]SMsg) s.status.Alive = true
star.Stop = make(chan int, 5) s.listener = listener
star.Online = false go s.accept()
star.Queue.RestoreDuration(time.Second * 2) go s.monitorPool()
go s.loadMessage()
return nil
} }
// NewNotifyS 开启一个新的Server端通知 func (s *ServerCommon) monitorPool() {
func NewNotifyS(netype, value string) (*StarNotifyS, error) { for {
if netype[0:3] != "udp" { select {
return notudps(netype, value) case <-s.stopCtx.Done():
s.noFinSyncMsgPool.Range(func(k, v interface{}) bool {
data := v.(WaitMsg)
close(data.Reply)
s.noFinSyncMsgPool.Delete(k)
return true
})
return
case <-time.After(time.Second * 30):
}
now := time.Now()
if s.noFinSyncMsgMaxKeepSeconds > 0 {
s.noFinSyncMsgPool.Range(func(k, v interface{}) bool {
data := v.(WaitMsg)
if data.Time.Add(time.Duration(s.noFinSyncMsgMaxKeepSeconds) * time.Second).Before(now) {
close(data.Reply)
s.noFinSyncMsgPool.Delete(k)
}
return true
})
}
if s.maxHeartbeatLostSeconds != 0 {
for _, v := range s.clientPool {
if now.Unix()-v.lastHeartBeat > s.maxHeartbeatLostSeconds {
v.stopFn()
s.removeClient(v)
}
}
}
} }
return doudps(netype, value)
} }
func doudps(netype, value string) (*StarNotifyS, error) { func (s *ServerCommon) SetDefaultCommEncode(fn func([]byte, []byte) []byte) {
var star StarNotifyS s.defaultMsgEn = fn
star.starinits() }
star.isUDP = true
udpaddr, err := net.ResolveUDPAddr(netype, value) func (s *ServerCommon) SetDefaultCommDecode(fn func([]byte, []byte) []byte) {
if err != nil { s.defaultMsgDe = fn
return nil, err }
func (s *ServerCommon) SetDefaultLink(fn func(message *Message)) {
s.defaultFns = fn
}
func (s *ServerCommon) SetLink(key string, fn func(*Message)) {
s.mu.Lock()
defer s.mu.Unlock()
s.linkFns[key] = fn
}
func (s *ServerCommon) pushMessage(data []byte, source string) {
s.queue.ParseMessage(data, source)
}
func (s *ServerCommon) removeClient(client *ClientConn) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.clientPool, client.ClientID)
}
func (s *ServerCommon) accept() {
if s.udpListener != nil {
s.acceptUDP()
} }
star.UDPConn, err = net.ListenUDP(netype, udpaddr) s.acceptTU()
if err != nil { }
return nil, err func (s *ServerCommon) acceptTU() {
} for {
go star.notify() select {
go func() { case <-s.stopCtx.Done():
<-star.stopSign.Done() if s.debugMode {
for k, v := range star.udpPool { fmt.Println("accept goroutine recv exit signal,exit")
star.UDPConn.WriteToUDP(star.Queue.BuildMessage([]byte("b612ryzstop")), v) }
delete(star.connPool, k) return
default:
} }
star.UDPConn.Close() conn, err := s.listener.Accept()
star.Online = false if err != nil {
return if s.showError || s.debugMode {
}() fmt.Println("error accept:", err)
go func() {
for {
buf := make([]byte, 8192)
n, addr, err := star.UDPConn.ReadFromUDP(buf)
if n != 0 {
star.Queue.ParseMessage(buf[0:n], addr)
if _, ok := star.udpPool[addr.String()]; !ok {
if star.Connected != nil {
go star.Connected(SMsg{UDP: addr, uconn: star.UDPConn})
}
}
star.udpPool[addr.String()] = addr
} }
if err != nil { continue
}
if s.debugMode {
fmt.Println("accept new connection from", conn.RemoteAddr())
}
var id string
for {
id = fmt.Sprintf("%s%d%d", conn.RemoteAddr().String(), time.Now().UnixNano(), rand.Int63())
s.mu.RLock()
if _, ok := s.clientPool[id]; ok {
s.mu.RUnlock()
continue continue
} }
s.mu.RUnlock()
break
}
client := ClientConn{
ClientID: id,
ClientAddr: conn.RemoteAddr(),
tuConn: conn,
server: s,
maxReadTimeout: s.maxReadTimeout,
maxWriteTimeout: s.maxWriteTimeout,
SecretKey: s.SecretKey,
handshakeRsaKey: s.handshakeRsaKey,
msgEn: s.defaultMsgEn,
msgDe: s.defaultMsgDe,
lastHeartBeat: time.Now().Unix(),
} }
}() client.alive.Store(true)
star.Online = true client.status = Status{
return &star, nil Alive: true,
Reason: "",
Err: nil,
}
client.stopCtx, client.stopFn = context.WithCancel(context.Background())
s.mu.Lock()
s.clientPool[id] = &client
s.mu.Unlock()
go client.readTUMessage()
}
} }
func notudps(netype, value string) (*StarNotifyS, error) { func (s *ServerCommon) loadMessage() {
var star StarNotifyS for {
star.starinits() select {
star.isUDP = false case <-s.stopCtx.Done():
listener, err := net.Listen(netype, value) var wg sync.WaitGroup
if err != nil { s.mu.RLock()
return nil, err for _, v := range s.clientPool {
} wg.Add(1)
go star.notify() go func(v *ClientConn) {
go func() { defer wg.Done()
<-star.stopSign.Done() v.sayGoodByeForTU()
for k, v := range star.connPool { v.alive.Store(false)
v.Close() v.status = Status{
delete(star.connPool, k) Alive: false,
} Reason: "recv stop signal from server",
listener.Close() Err: nil,
star.Online = false }
return v.stopFn()
}() s.removeClient(v)
go func() { }(v)
for {
conn, err := listener.Accept()
if err != nil {
select {
case <-star.stopSign.Done():
listener.Close()
return
default:
continue
}
} }
if !star.ReadDeadline.IsZero() { s.mu.RUnlock()
conn.SetReadDeadline(star.ReadDeadline) select {
case <-time.After(time.Second * 8):
case <-stario.WaitUntilFinished(func() error {
wg.Wait()
return nil
}):
} }
if !star.WriteDeadline.IsZero() { if s.listener != nil {
conn.SetWriteDeadline(star.WriteDeadline) s.listener.Close()
} }
if !star.Deadline.IsZero() { s.wg.Wait()
conn.SetDeadline(star.Deadline) return
case data, ok := <-s.queue.RestoreChan():
if !ok {
continue
} }
go func(conn net.Conn) { s.wg.Add(1)
for { go func(data stario.MsgQueue) {
buf := make([]byte, 8192) s.mu.RLock()
n, err := conn.Read(buf) cc, ok := s.clientPool[data.Conn.(string)]
if n != 0 { s.mu.RUnlock()
star.Queue.ParseMessage(buf[0:n], conn) if !ok {
} return
if err != nil { }
conn.Close() //fmt.Println("received:", float64(time.Now().UnixNano()-nowd)/1000000)
delete(star.connPool, conn.RemoteAddr().String()) msg, err := s.sequenceDe(cc.msgDe(cc.SecretKey, data.Msg))
break if err != nil {
if s.showError || s.debugMode {
fmt.Println("server decode data error", err)
} }
return
} }
}(conn) //fmt.Println("decoded:", float64(time.Now().UnixNano()-nowd)/1000000)
star.connPool[conn.RemoteAddr().String()] = conn message := Message{
if star.Connected != nil { NetType: NET_SERVER,
go star.Connected(SMsg{Conn: conn}) ClientConn: cc,
} TransferMsg: msg.(TransferMsg),
}
message.Time = time.Now()
//fmt.Println("dispatch:", float64(time.Now().UnixNano()-nowd)/1000000)
s.dispatchMsg(message)
}(data)
}
}
}
func (s *ServerCommon) sysMsg(message Message) {
switch message.Key {
case "bye":
//fmt.Println("recv stop signal from client", message.ClientConn.ClientID)
if message.TransferMsg.Type == MSG_SYS_WAIT {
message.Reply(nil)
}
message.ClientConn.alive.Store(false)
message.ClientConn.status = Status{
Alive: false,
Reason: "recv stop signal from client",
Err: nil,
}
message.ClientConn.stopFn()
case "heartbeat":
message.ClientConn.lastHeartBeat = time.Now().Unix()
message.Reply(nil)
}
}
func (s *ServerCommon) dispatchMsg(message Message) {
defer s.wg.Done()
switch message.TransferMsg.Type {
case MSG_SYS_WAIT:
fallthrough
case MSG_SYS:
s.sysMsg(message)
return
case MSG_KEY_CHANGE:
message.ClientConn.rsaDecode(message)
return
case MSG_SYS_REPLY:
fallthrough
case MSG_SYNC_REPLY:
data, ok := s.noFinSyncMsgPool.Load(message.TransferMsg.ID)
if ok {
wait := data.(WaitMsg)
wait.Reply <- message
s.noFinSyncMsgPool.Delete(message.TransferMsg.ID)
return
}
//just throw
//return
fallthrough
default:
}
callFn := func(fn func(*Message)) {
fn(&message)
}
fn, ok := s.linkFns[message.TransferMsg.Key]
if ok {
callFn(fn)
}
if s.defaultFns != nil {
callFn(s.defaultFns)
}
}
func (s *ServerCommon) send(c *ClientConn, msg TransferMsg) (WaitMsg, error) {
if s.udpListener != nil {
return s.sendUDP(c, msg)
}
return s.sendTU(c, msg)
}
func (s *ServerCommon) sendTU(c *ClientConn, msg TransferMsg) (WaitMsg, error) {
var wait WaitMsg
if msg.Type != MSG_SYNC_REPLY && msg.Type != MSG_KEY_CHANGE && msg.Type != MSG_SYS_REPLY || msg.ID == 0 {
msg.ID = atomic.AddUint64(&s.msgID, 1)
}
data, err := s.sequenceEn(msg)
if err != nil {
return WaitMsg{}, err
}
data = c.msgEn(c.SecretKey, data)
data = s.queue.BuildMessage(data)
if c.maxWriteTimeout.Seconds() != 0 {
if err := c.tuConn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout)); err != nil {
return WaitMsg{}, err
} }
}() }
star.Online = true _, err = c.tuConn.Write(data)
return &star, nil //fmt.Println("resend:", float64(time.Now().UnixNano()-nowd)/1000000)
if err == nil && (msg.Type == MSG_SYNC_ASK || msg.Type == MSG_SYS_WAIT) {
wait.Time = time.Now()
wait.TransferMsg = msg
wait.Reply = make(chan Message, 1)
s.noFinSyncMsgPool.Store(msg.ID, wait)
}
return wait, err
} }
// SetNotify 用于设置通知关键词的调用函数 func (s *ServerCommon) Send(c *ClientConn, key string, value MsgVal) error {
func (star *StarNotifyS) SetNotify(name string, data func(SMsg) string) { _, err := s.send(c, TransferMsg{
star.FuncLists[name] = data Key: key,
Value: value,
Type: MSG_ASYNC,
})
return err
}
func (s *ServerCommon) sendWait(c *ClientConn, msg TransferMsg, timeout time.Duration) (Message, error) {
data, err := s.send(c, msg)
if err != nil {
return Message{}, err
}
if timeout.Seconds() == 0 {
msg, ok := <-data.Reply
if !ok {
return msg, os.ErrInvalid
}
return msg, nil
}
select {
case <-time.After(timeout):
close(data.Reply)
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrDeadlineExceeded
case <-s.stopCtx.Done():
return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply:
if !ok {
return msg, os.ErrInvalid
}
return msg, nil
}
} }
// SetDefaultNotify 用于设置默认关键词的调用函数 func (s *ServerCommon) SendCtx(ctx context.Context, c *ClientConn, key string, value MsgVal) (Message, error) {
func (star *StarNotifyS) SetDefaultNotify(data func(SMsg) string) { return s.sendCtx(c, TransferMsg{
star.defaultFunc = data Key: key,
Value: value,
Type: MSG_SYNC_ASK,
}, ctx)
} }
func (star *StarNotifyS) trim(name string) string { func (s *ServerCommon) sendCtx(c *ClientConn, msg TransferMsg, ctx context.Context) (Message, error) {
var slash bool = false data, err := s.send(c, msg)
var key []byte if err != nil {
for _, v := range []byte(name) { return Message{}, err
if v == byte(92) && !slash { }
slash = true if ctx == nil {
continue ctx = context.Background()
}
select {
case <-ctx.Done():
close(data.Reply)
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrClosed
case <-s.stopCtx.Done():
return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply:
if !ok {
return msg, os.ErrInvalid
} }
slash = false return msg, nil
key = append(key, v) }
}
func (s *ServerCommon) SendWait(c *ClientConn, key string, value MsgVal, timeout time.Duration) (Message, error) {
return s.sendWait(c, TransferMsg{
Key: key,
Value: value,
Type: MSG_SYNC_ASK,
}, timeout)
}
func (s *ServerCommon) SendWaitObj(c *ClientConn, key string, value interface{}, timeout time.Duration) (Message, error) {
data, err := s.sequenceEn(value)
if err != nil {
return Message{}, err
}
return s.SendWait(c, key, data, timeout)
}
func (s *ServerCommon) SendObjCtx(ctx context.Context, c *ClientConn, key string, val interface{}) (Message, error) {
data, err := s.sequenceEn(val)
if err != nil {
return Message{}, err
}
return s.sendCtx(c, TransferMsg{
Key: key,
Value: data,
Type: MSG_SYNC_ASK,
}, ctx)
}
func (s *ServerCommon) SendObj(c *ClientConn, key string, val interface{}) error {
data, err := encode(val)
if err != nil {
return err
}
_, err = s.send(c, TransferMsg{
Key: key,
Value: data,
Type: MSG_ASYNC,
})
return err
}
func (s *ServerCommon) Reply(m Message, value MsgVal) error {
return m.Reply(value)
}
//for udp below
func (s *ServerCommon) ListenUDP(network string, addr string) error {
udpAddr, err := net.ResolveUDPAddr(network, addr)
if err != nil {
return err
}
listener, err := net.ListenUDP(network, udpAddr)
if err != nil {
return err
} }
return string(key) s.alive.Store(true)
s.status.Alive = true
s.udpListener = listener
go s.accept()
go s.monitorPool()
go s.loadMessage()
return nil
} }
func (star *StarNotifyS) notify() { func (s *ServerCommon) acceptUDP() {
for { for {
select { select {
case <-star.stopSign.Done(): case <-s.stopCtx.Done():
if s.debugMode {
fmt.Println("accept goroutine recv exit signal,exit")
}
return return
default: default:
} }
data, err := star.Queue.RestoreOne() if s.maxReadTimeout.Seconds() > 0 {
if err != nil { s.udpListener.SetReadDeadline(time.Now().Add(s.maxReadTimeout))
time.Sleep(time.Millisecond * 20)
continue
} }
mode, key, value := star.analyseData(string(data.Msg)) data := make([]byte, 4096)
var rmsg SMsg num, addr, err := s.udpListener.ReadFromUDP(data)
if !star.isUDP { id := addr.String()
rmsg = SMsg{data.Conn.(net.Conn), key, value, nil, nil, mode, nil} if s.debugMode {
} else { fmt.Println("accept new udp message from", id)
rmsg = SMsg{nil, key, value, data.Conn.(*net.UDPAddr), star.UDPConn, mode, nil}
if key == "b612ryzstop" {
delete(star.udpPool, rmsg.UDP.String())
continue
}
} }
if mode[0:2] != "sr" { //fmt.Println("s recv udp:", float64(time.Now().UnixNano()-nowd)/1000000)
go func() { s.mu.RLock()
if msg, ok := star.FuncLists[key]; ok { if _, ok := s.clientPool[id]; !ok {
sdata := msg(rmsg) s.mu.RUnlock()
if sdata == "" { client := ClientConn{
return ClientID: id,
} ClientAddr: addr,
rmsg.Reply(sdata) server: s,
} else { maxReadTimeout: s.maxReadTimeout,
if star.defaultFunc != nil { maxWriteTimeout: s.maxWriteTimeout,
sdata := star.defaultFunc(rmsg) SecretKey: s.SecretKey,
if sdata == "" { handshakeRsaKey: s.handshakeRsaKey,
return msgEn: s.defaultMsgEn,
} msgDe: s.defaultMsgDe,
rmsg.Reply(sdata) lastHeartBeat: time.Now().Unix(),
} }
} client.stopCtx, client.stopFn = context.WithCancel(context.Background())
}() s.mu.Lock()
s.clientPool[id] = &client
s.mu.Unlock()
} else { } else {
if sa, ok := star.lockPool[mode]; ok { s.mu.RUnlock()
rmsg.wait = sa.wait }
star.lockPool[mode] = rmsg if err == os.ErrDeadlineExceeded {
star.lockPool[mode].wait <- 1 if num != 0 {
} else { s.pushMessage(data[:num], id)
go func() {
if msg, ok := star.FuncLists[key]; ok {
sdata := msg(rmsg)
if sdata == "" {
return
}
rmsg.Reply(sdata)
} else {
if star.defaultFunc != nil {
sdata := star.defaultFunc(rmsg)
if sdata == "" {
return
}
rmsg.Reply(sdata)
}
}
}()
} }
continue
}
if err != nil {
continue
} }
s.pushMessage(data[:num], id)
}
}
func (s *ServerCommon) sendUDP(c *ClientConn, msg TransferMsg) (WaitMsg, error) {
var wait WaitMsg
if msg.Type != MSG_SYNC_REPLY && msg.Type != MSG_KEY_CHANGE && msg.Type != MSG_SYS_REPLY || msg.ID == 0 {
msg.ID = uint64(time.Now().UnixNano()) + rand.Uint64() + rand.Uint64()
}
data, err := s.sequenceEn(msg)
if err != nil {
return WaitMsg{}, err
}
data = c.msgEn(c.SecretKey, data)
data = s.queue.BuildMessage(data)
if c.maxWriteTimeout.Seconds() != 0 {
s.udpListener.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout))
}
_, err = s.udpListener.WriteTo(data, c.ClientAddr)
if err == nil && (msg.Type == MSG_SYNC_ASK || msg.Type == MSG_SYS_WAIT) {
wait.Time = time.Now()
wait.TransferMsg = msg
wait.Reply = make(chan Message, 1)
s.noFinSyncMsgPool.Store(msg.ID, wait)
} }
return wait, err
}
func (s *ServerCommon) StopMonitorChan() <-chan struct{} {
return s.stopCtx.Done()
}
func (s *ServerCommon) Status() Status {
return s.status
}
func (s *ServerCommon) GetSecretKey() []byte {
return s.SecretKey
}
func (s *ServerCommon) SetSecretKey(key []byte) {
s.SecretKey = key
}
func (s *ServerCommon) RsaPrivKey() []byte {
return s.handshakeRsaKey
}
func (s *ServerCommon) SetRsaPrivKey(key []byte) {
s.handshakeRsaKey = key
}
func (s *ServerCommon) GetClient(id string) *ClientConn {
s.mu.RLock()
defer s.mu.RUnlock()
c, ok := s.clientPool[id]
if !ok {
return nil
}
return c
}
func (s *ServerCommon) GetClientLists() []*ClientConn {
s.mu.RLock()
defer s.mu.RUnlock()
var list = make([]*ClientConn, 0, len(s.clientPool))
for _, v := range s.clientPool {
list = append(list, v)
}
return list
}
func (s *ServerCommon) GetClientAddrs() []net.Addr {
s.mu.RLock()
defer s.mu.RUnlock()
var list = make([]net.Addr, 0, len(s.clientPool))
for _, v := range s.clientPool {
list = append(list, v.ClientAddr)
}
return list
}
func (s *ServerCommon) GetSequenceEn() func(interface{}) ([]byte, error) {
return s.sequenceEn
}
func (s *ServerCommon) SetSequenceEn(fn func(interface{}) ([]byte, error)) {
s.sequenceEn = fn
}
func (s *ServerCommon) GetSequenceDe() func([]byte) (interface{}, error) {
return s.sequenceDe
}
func (s *ServerCommon) SetSequenceDe(fn func([]byte) (interface{}, error)) {
s.sequenceDe = fn
} }
func (star *StarNotifyS) analyseData(msg string) (mode, key, value string) { func (s *ServerCommon) HeartbeatTimeoutSec() int64 {
slice := strings.SplitN(msg, "||", 3) return s.maxHeartbeatLostSeconds
return slice[0], star.trim(slice[1]), slice[2]
} }
// ServerStop 用于终止Server端运行 func (s *ServerCommon) SetHeartbeatTimeoutSec(sec int64) {
func (star *StarNotifyS) ServerStop() { s.maxHeartbeatLostSeconds = sec
star.cancel()
star.Stop <- 1
star.Stop <- 1
star.Stop <- 1
} }

@ -0,0 +1,49 @@
package notify
import (
"context"
"net"
"time"
)
type Server interface {
SetDefaultCommEncode(func([]byte, []byte) []byte)
SetDefaultCommDecode(func([]byte, []byte) []byte)
SetDefaultLink(func(message *Message))
SetLink(string, func(*Message))
send(c *ClientConn, msg TransferMsg) (WaitMsg, error)
sendWait(c *ClientConn, msg TransferMsg, timeout time.Duration) (Message, error)
SendObjCtx(ctx context.Context, c *ClientConn, key string, val interface{}) (Message, error)
SendObj(c *ClientConn, key string, val interface{}) error
Send(c *ClientConn, key string, value MsgVal) error
SendWait(c *ClientConn, key string, value MsgVal, timeout time.Duration) (Message, error)
SendWaitObj(c *ClientConn, key string, value interface{}, timeout time.Duration) (Message, error)
SendCtx(ctx context.Context, c *ClientConn, key string, value MsgVal) (Message, error)
Reply(m Message, value MsgVal) error
pushMessage([]byte, string)
removeClient(client *ClientConn)
Listen(network string, addr string) error
Stop() error
StopMonitorChan() <-chan struct{}
Status() Status
GetSecretKey() []byte
SetSecretKey(key []byte)
RsaPrivKey() []byte
SetRsaPrivKey([]byte)
GetClient(id string) *ClientConn
GetClientLists() []*ClientConn
GetClientAddrs() []net.Addr
GetSequenceEn() func(interface{}) ([]byte, error)
SetSequenceEn(func(interface{}) ([]byte, error))
GetSequenceDe() func([]byte) (interface{}, error)
SetSequenceDe(func([]byte) (interface{}, error))
ShowError(bool)
DebugMode(bool)
IsDebugMode() bool
HeartbeatTimeoutSec() int64
SetHeartbeatTimeoutSec(int64)
}

@ -0,0 +1,109 @@
package starnotify
import (
"b612.me/notify"
"errors"
"sync"
)
var (
cmu sync.RWMutex
smu sync.RWMutex
starClient map[string]notify.Client
starServer map[string]notify.Server
)
func init() {
starClient = make(map[string]notify.Client)
starServer = make(map[string]notify.Server)
}
func NewClient(key string) notify.Client {
client := notify.NewClient()
cmu.Lock()
starClient[key] = client
cmu.Unlock()
return client
}
func DeleteClient(key string) (err error) {
cmu.RLock()
client, ok := starClient[key]
cmu.RUnlock()
if !ok {
return errors.New("Not Exists Yet!")
}
if client.Status().Alive {
err = client.Stop()
}
client = nil
cmu.Lock()
delete(starClient, key)
cmu.Unlock()
return err
}
func NewServer(key string) notify.Server {
server := notify.NewServer()
smu.Lock()
starServer[key] = server
smu.Unlock()
return server
}
func DeleteServer(key string) error {
smu.RLock()
server, ok := starServer[key]
smu.RUnlock()
if !ok {
return errors.New("Not Exists Yet!")
}
if server.Status().Alive {
server.Stop()
}
server = nil
smu.Lock()
delete(starServer, key)
smu.Unlock()
return nil
}
func S(key string) notify.Server {
smu.RLock()
server, ok := starServer[key]
smu.RUnlock()
if !ok {
return nil
}
return server
}
func C(key string) notify.Client {
cmu.RLock()
client, ok := starClient[key]
cmu.RUnlock()
if !ok {
return nil
}
return client
}
func Server(key string) (notify.Server, error) {
smu.RLock()
server, ok := starServer[key]
smu.RUnlock()
if !ok {
return nil, errors.New("Not Exists Yet")
}
return server, nil
}
func Client(key string) (notify.Client, error) {
cmu.RLock()
client, ok := starClient[key]
cmu.RUnlock()
if !ok {
return nil, errors.New("Not Exists Yet")
}
return client, nil
}

@ -0,0 +1,152 @@
package notify
import (
"fmt"
"net"
//_ "net/http/pprof"
"sync/atomic"
"testing"
"time"
)
func Test_ServerTuAndClientCommon(t *testing.T) {
//go http.ListenAndServe("0.0.0.0:8888", nil)
noEn := func(key, bn []byte) []byte {
return bn
}
_ = noEn
server := NewServer()
//server.SetDefaultCommDecode(noEn)
//server.SetDefaultCommEncode(noEn)
err := server.Listen("tcp", "127.0.0.1:12345")
if err != nil {
panic(err)
}
server.SetLink("notify", notify)
for i := 1; i <= 100; i++ {
go func() {
client := NewClient()
//client.SetMsgEn(noEn)
//client.SetMsgDe(noEn)
//client.SetSkipExchangeKey(true)
err = client.Connect("tcp", "127.0.0.1:12345")
if err != nil {
t.Fatal(err)
time.Sleep(time.Second * 2)
return
}
//client.SetLink("notify", notify)
for {
//nowd = time.Now().UnixNano()
client.SendWait("notify", []byte("client hello"), time.Second*15)
//client.Send("notify", []byte("client hello"))
//time.Sleep(time.Millisecond)
//fmt.Println("finished:", float64(time.Now().UnixNano()-nowd)/1000000)
//client.Send("notify", []byte("client"))
}
}()
}
time.Sleep(time.Second)
go func() {
time.Sleep(time.Second * 10)
server.Stop()
}()
<-server.StopMonitorChan()
fmt.Println(count2)
}
var count2 int64
func notify(msg *Message) {
//fmt.Println(string(msg.Msg.Value))
//fmt.Println("called:", float64(time.Now().UnixNano()-nowd)/1000000)
if msg.NetType == NET_SERVER {
atomic.AddInt64(&count2, 1)
msg.Reply([]byte("server reply"))
}
}
func Test_normal(t *testing.T) {
server, err := net.Listen("tcp", "127.0.0.1:12345")
if err != nil {
t.Fatal(err)
}
go func() {
for {
conn, err := server.Accept()
if err == nil {
go func() {
for {
buf := make([]byte, 1024)
_, err := conn.Read(buf)
//fmt.Println("S RECV", string(buf[:i]))
atomic.AddInt64(&count2, 1)
if err == nil {
conn.Write([]byte("hello world server"))
}
}
}()
}
}
}()
time.Sleep(time.Second * 5)
for i := 1; i <= 100; i++ {
go func() {
conn, err := net.Dial("tcp", "127.0.0.1:12345")
if err != nil {
panic(err)
}
for {
//nowd = time.Now().UnixNano()
_, err := conn.Write([]byte("hello world client"))
if err != nil {
fmt.Println(err)
}
buf := make([]byte, 1024)
conn.Read(buf)
continue
}
}()
}
time.Sleep(time.Second * 10)
fmt.Println(count2)
}
func Test_normal_udp(t *testing.T) {
ludp, _ := net.ResolveUDPAddr("udp", "127.0.0.1:12345")
conn, _ := net.ListenUDP("udp", ludp)
go func() {
for {
buf := make([]byte, 1024)
_, addr, err := conn.ReadFromUDP(buf)
fmt.Println(time.Now(), "S RECV", addr.String())
atomic.AddInt64(&count2, 1)
if err == nil {
conn.WriteToUDP([]byte("hello world server"), addr)
}
}
}()
for i := 1; i <= 100; i++ {
go func() {
conn, err := net.Dial("udp", "127.0.0.1:12345")
if err != nil {
panic(err)
}
for {
//nowd = time.Now().UnixNano()
_, err := conn.Write([]byte("hello world client"))
if err != nil {
fmt.Println(err)
}
buf := make([]byte, 1024)
conn.Read(buf)
fmt.Println(time.Now(), "C RECV")
continue
}
}()
}
time.Sleep(time.Second * 10)
fmt.Println(count2)
}
Loading…
Cancel
Save