Compare commits

..

No commits in common. 'master' and 'outdate' have entirely different histories.

@ -1,614 +1,272 @@
package notify package notify
import ( import (
"b612.me/starcrypto"
"b612.me/stario"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"math"
"math/rand" "math/rand"
"net" "net"
"os" "strings"
"sync"
"sync/atomic"
"time" "time"
"b612.me/starainrt"
) )
type ClientCommon struct { // StarNotifyC 为Client端
alive atomic.Value type StarNotifyC struct {
status Status Connc net.Conn
byeFromServer bool clientSign map[string]chan string
conn net.Conn // FuncLists 当不使用channel时使用此记录调用函数
mu sync.Mutex FuncLists map[string]func(CMsg)
msgID uint64 stopSign context.Context
queue *stario.StarQueue cancel context.CancelFunc
stopFn context.CancelFunc defaultFunc func(CMsg)
stopCtx context.Context // Stop 停止信 号
parallelNum int Stop chan int
maxReadTimeout time.Duration // UseChannel 是否使用channel作为信息传递
maxWriteTimeout time.Duration UseChannel bool
keyExchangeFn func(c Client) error isUDP bool
linkFns map[string]func(message *Message) // Queue 是用来处理收发信息的简单消息队列
defaultFns func(message *Message) Queue *starainrt.StarQueue
msgEn func([]byte, []byte) []byte // Online 当前链接是否处于活跃状态
msgDe func([]byte, []byte) []byte Online bool
noFinSyncMsgPool sync.Map lockPool map[string]CMsg
handshakeRsaPubKey []byte }
SecretKey []byte
noFinSyncMsgMaxKeepSeconds int // CMsg 指明当前客户端被通知的关键字
lastHeartbeat int64 type CMsg struct {
heartbeatPeriod time.Duration Key string
wg stario.WaitGroup Value string
netType NetType mode string
showError bool wait chan int
skipKeyExchange bool }
useHeartBeat bool
sequenceDe func([]byte) (interface{}, error) func (star *StarNotifyC) starinitc() {
sequenceEn func(interface{}) ([]byte, error) star.stopSign, star.cancel = context.WithCancel(context.Background())
debugMode bool star.Queue = starainrt.NewQueue()
star.FuncLists = make(map[string]func(CMsg))
star.UseChannel = true
star.Stop = make(chan int, 5)
star.clientSign = make(map[string]chan string)
star.Online = false
star.lockPool = make(map[string]CMsg)
star.Queue.RestoreDuration(time.Second * 2)
}
// Notify 用于获取一个通知
func (star *StarNotifyC) Notify(key string) chan string {
if _, ok := star.clientSign[key]; !ok {
ch := make(chan string, 20)
star.clientSign[key] = ch
}
return star.clientSign[key]
}
func (star *StarNotifyC) store(key, value string) {
if _, ok := star.clientSign[key]; !ok {
ch := make(chan string, 20)
ch <- value
star.clientSign[key] = ch
return
}
star.clientSign[key] <- value
} }
func (c *ClientCommon) Connect(network string, addr string) error { // NewNotifyC 用于新建一个Client端进程
if c.alive.Load().(bool) { func NewNotifyC(netype, value string) (*StarNotifyC, error) {
return errors.New("client already run") var err error
var star StarNotifyC
star.starinitc()
star.isUDP = false
if strings.Index(netype, "udp") >= 0 {
star.isUDP = true
} }
c.stopCtx, c.stopFn = context.WithCancel(context.Background()) star.Connc, err = net.Dial(netype, value)
c.queue = stario.NewQueueCtx(c.stopCtx, 4, math.MaxUint32)
conn, err := net.Dial(network, addr)
if err != nil { if err != nil {
return err return nil, err
} }
c.alive.Store(true) go star.cnotify()
c.status.Alive = true go func() {
c.conn = conn <-star.stopSign.Done()
if c.useHeartBeat { star.Connc.Close()
go c.Heartbeat() star.Online = false
} return
return c.clientPostInit() }()
} go func() {
for {
func (c *ClientCommon) DebugMode(dmg bool) { buf := make([]byte, 8192)
c.mu.Lock() n, err := star.Connc.Read(buf)
c.debugMode = dmg if n != 0 {
c.mu.Unlock() star.Queue.ParseMessage(buf[0:n], star.Connc)
}
if err != nil {
star.Connc.Close()
star.ClientStop()
//star, _ = NewNotifyC(netype, value)
star.Online = false
return
}
}
}()
star.Online = true
return &star, nil
} }
func (c *ClientCommon) IsDebugMode() bool { // Send 用于向Server端发送数据
return c.debugMode func (star *StarNotifyC) Send(name string) error {
return star.SendValue(name, "")
} }
func (c *ClientCommon) ConnectTimeout(network string, addr string, timeout time.Duration) error { // SendValue 用于向Server端发送key-value类型数据
if c.alive.Load().(bool) { func (star *StarNotifyC) SendValue(name, value string) error {
return errors.New("client already run") var err error
} var key []byte
c.stopCtx, c.stopFn = context.WithCancel(context.Background()) for _, v := range []byte(name) {
c.queue = stario.NewQueueCtx(c.stopCtx, 4, math.MaxUint32) if v == byte(124) || v == byte(92) {
conn, err := net.DialTimeout(network, addr, timeout) key = append(key, byte(92))
if err != nil { }
return err key = append(key, v)
}
c.alive.Store(true)
c.status.Alive = true
c.conn = conn
if c.useHeartBeat {
go c.Heartbeat()
} }
return c.clientPostInit() _, err = star.Connc.Write(star.Queue.BuildMessage([]byte("pa" + "||" + string(key) + "||" + value)))
return err
} }
func (c *ClientCommon) monitorPool() { func (star *StarNotifyC) trim(name string) string {
for { var slash bool = false
select { var key []byte
case <-c.stopCtx.Done(): for _, v := range []byte(name) {
c.noFinSyncMsgPool.Range(func(k, v interface{}) bool { if v == byte(92) && !slash {
data := v.(WaitMsg) slash = true
close(data.Reply) continue
c.noFinSyncMsgPool.Delete(k)
return true
})
return
case <-time.After(time.Second * 30):
}
now := time.Now()
if c.noFinSyncMsgMaxKeepSeconds > 0 {
c.noFinSyncMsgPool.Range(func(k, v interface{}) bool {
data := v.(WaitMsg)
if data.Time.Add(time.Duration(c.noFinSyncMsgMaxKeepSeconds) * time.Second).Before(now) {
close(data.Reply)
c.noFinSyncMsgPool.Delete(k)
}
return true
})
} }
slash = false
key = append(key, v)
} }
return string(key)
} }
func (c *ClientCommon) SkipExchangeKey() bool { // SendValueWait 用于向Server端发送key-value类型数据并等待结果返回此结果不会通过标准返回流程处理
return c.skipKeyExchange func (star *StarNotifyC) SendValueWait(name, value string, tmout time.Duration) (CMsg, error) {
} var err error
var tmceed <-chan time.Time
func (c *ClientCommon) SetSkipExchangeKey(val bool) { if star.UseChannel {
c.skipKeyExchange = val return CMsg{}, errors.New("Do Not Use UseChannel Mode!")
} }
rand.Seed(time.Now().UnixNano())
func (c *ClientCommon) clientPostInit() error { mode := "cr" + fmt.Sprintf("%05d", rand.Intn(99999))
go c.readMessage() var key []byte
go c.loadMessage() for _, v := range []byte(name) {
if !c.skipKeyExchange { if v == byte(124) || v == byte(92) {
err := c.keyExchangeFn(c) key = append(key, byte(92))
if err != nil {
c.alive.Store(false)
c.mu.Lock()
c.status = Status{
Alive: false,
Reason: "key exchange failed",
Err: err,
}
c.mu.Unlock()
c.stopFn()
return err
} }
key = append(key, v)
} }
return nil _, err = star.Connc.Write(star.Queue.BuildMessage([]byte(mode + "||" + string(key) + "||" + value)))
} if err != nil {
func NewClient() Client { return CMsg{}, err
var client = ClientCommon{
maxReadTimeout: 0,
maxWriteTimeout: 0,
sequenceEn: encode,
sequenceDe: Decode,
keyExchangeFn: aesRsaHello,
SecretKey: defaultAesKey,
handshakeRsaPubKey: defaultRsaPubKey,
msgEn: defaultMsgEn,
msgDe: defaultMsgDe,
} }
client.alive.Store(false) if int64(tmout) > 0 {
//heartbeat should not controlable for user tmceed = time.After(tmout)
client.useHeartBeat = true
client.heartbeatPeriod = time.Second * 20
client.linkFns = make(map[string]func(*Message))
client.defaultFns = func(message *Message) {
return
} }
client.wg = stario.NewWaitGroup(0) var source CMsg
client.stopCtx, client.stopFn = context.WithCancel(context.Background()) source.wait = make(chan int, 2)
return &client star.lockPool[mode] = source
} select {
case <-source.wait:
func (c *ClientCommon) Heartbeat() { res := star.lockPool[mode]
failedCount := 0 delete(star.lockPool, mode)
for { return res, nil
select { case <-tmceed:
case <-c.stopCtx.Done(): return CMsg{}, errors.New("Time Exceed")
return }
case <-time.After(c.heartbeatPeriod): }
}
_, err := c.sendWait(TransferMsg{ // ReplyMsg 用于向Server端Reply信息
ID: 10000, func (star *StarNotifyC) ReplyMsg(data CMsg, name, value string) error {
Key: "heartbeat", var err error
Value: nil, var key []byte
Type: MSG_SYS_WAIT, for _, v := range []byte(name) {
}, time.Second*5) if v == byte(124) || v == byte(92) {
if err == nil { key = append(key, byte(92))
c.lastHeartbeat = time.Now().Unix()
failedCount = 0
}
if c.debugMode {
fmt.Println("failed to recv heartbeat,timeout!")
}
failedCount++
if failedCount >= 3 {
if c.debugMode {
fmt.Println("heatbeat failed more than 3 times,stop client")
}
c.alive.Store(false)
c.mu.Lock()
c.status = Status{
Alive: false,
Reason: "heartbeat failed more than 3 times",
Err: errors.New("heartbeat failed more than 3 times"),
}
c.mu.Unlock()
c.stopFn()
return
} }
key = append(key, v)
} }
_, err = star.Connc.Write(star.Queue.BuildMessage([]byte(data.mode + "||" + string(key) + "||" + value)))
return err
} }
func (c *ClientCommon) ShowError(std bool) { func (star *StarNotifyC) cnotify() {
c.mu.Lock()
c.showError = std
c.mu.Unlock()
}
func (c *ClientCommon) readMessage() {
for { for {
select { select {
case <-c.stopCtx.Done(): case <-star.stopSign.Done():
c.conn.Close()
return return
default: default:
} }
data := make([]byte, 8192) data, err := star.Queue.RestoreOne()
if c.maxReadTimeout.Seconds() != 0 {
if err := c.conn.SetReadDeadline(time.Now().Add(c.maxReadTimeout)); err != nil {
//TODO:ALERT
}
}
readNum, err := c.conn.Read(data)
if err == os.ErrDeadlineExceeded {
if readNum != 0 {
c.queue.ParseMessage(data[:readNum], "b612")
}
continue
}
if err != nil { if err != nil {
if c.showError || c.debugMode { time.Sleep(time.Millisecond * 20)
fmt.Println("client read error", err)
}
c.alive.Store(false)
c.mu.Lock()
c.status = Status{
Alive: false,
Reason: "client read error",
Err: err,
}
c.mu.Unlock()
c.stopFn()
continue continue
} }
c.queue.ParseMessage(data[:readNum], "b612") if string(data.Msg) == "b612ryzstop" {
} star.ClientStop()
} star.Online = false
func (c *ClientCommon) sayGoodBye() error {
_, err := c.sendWait(TransferMsg{
ID: 10010,
Key: "bye",
Value: nil,
Type: MSG_SYS_WAIT,
}, time.Second*3)
return err
}
func (c *ClientCommon) loadMessage() {
for {
select {
case <-c.stopCtx.Done():
//say goodbye
if !c.byeFromServer {
c.sayGoodBye()
}
c.conn.Close()
return return
case data, ok := <-c.queue.RestoreChan(): }
if !ok { strs := strings.SplitN(string(data.Msg), "||", 3)
continue if len(strs) < 3 {
} continue
c.wg.Add(1) }
go func(data stario.MsgQueue) { strs[1] = star.trim(strs[1])
defer c.wg.Done() if star.UseChannel {
//fmt.Println("c received:", float64(time.Now().UnixNano()-nowd)/1000000) go star.store(strs[1], strs[2])
now := time.Now() } else {
//transfer to Msg mode, key, value := strs[0], strs[1], strs[2]
msg, err := c.sequenceDe(c.msgDe(c.SecretKey, data.Msg)) if mode[0:2] != "cr" {
if err != nil { if msg, ok := star.FuncLists[key]; ok {
if c.showError || c.debugMode { go msg(CMsg{key, value, mode, nil})
fmt.Println("client decode data error", err) } else {
if star.defaultFunc != nil {
go star.defaultFunc(CMsg{key, value, mode, nil})
} }
return
} }
message := Message{ } else {
ServerConn: c, if sa, ok := star.lockPool[mode]; ok {
TransferMsg: msg.(TransferMsg), sa.Key = key
NetType: NET_CLIENT, sa.Value = value
sa.mode = mode
star.lockPool[mode] = sa
sa.wait <- 1
} else {
if msg, ok := star.FuncLists[key]; ok {
go msg(CMsg{key, value, mode, nil})
} else {
if star.defaultFunc != nil {
go star.defaultFunc(CMsg{key, value, mode, nil})
}
}
} }
message.Time = now }
c.dispatchMsg(message)
}(data)
}
}
}
func (c *ClientCommon) dispatchMsg(message Message) {
switch message.TransferMsg.Type {
case MSG_SYS_WAIT:
fallthrough
case MSG_SYS:
c.sysMsg(message)
return
case MSG_KEY_CHANGE:
fallthrough
case MSG_SYS_REPLY:
fallthrough
case MSG_SYNC_REPLY:
data, ok := c.noFinSyncMsgPool.Load(message.ID)
if ok {
wait := data.(WaitMsg)
wait.Reply <- message
c.noFinSyncMsgPool.Delete(message.ID)
return
}
//return
fallthrough
default:
}
callFn := func(fn func(*Message)) {
fn(&message)
}
fn, ok := c.linkFns[message.Key]
if ok {
callFn(fn)
}
if c.defaultFns != nil {
callFn(c.defaultFns)
}
}
func (c *ClientCommon) sysMsg(message Message) {
switch message.Key {
case "bye":
if message.TransferMsg.Type == MSG_SYS_WAIT {
//fmt.Println("recv stop signal from server")
c.byeFromServer = true
message.Reply(nil)
}
c.alive.Store(false)
c.mu.Lock()
c.status = Status{
Alive: false,
Reason: "recv stop signal from server",
Err: nil,
}
c.mu.Unlock()
c.stopFn()
}
}
func (c *ClientCommon) SetDefaultLink(fn func(message *Message)) {
c.defaultFns = fn
}
func (c *ClientCommon) SetLink(key string, fn func(*Message)) {
c.mu.Lock()
defer c.mu.Unlock()
c.linkFns[key] = fn
}
func (c *ClientCommon) send(msg TransferMsg) (WaitMsg, error) {
var wait WaitMsg
if msg.Type != MSG_SYNC_REPLY && msg.Type != MSG_KEY_CHANGE && msg.Type != MSG_SYS_REPLY || msg.ID == 0 {
msg.ID = atomic.AddUint64(&c.msgID, 1)
}
data, err := c.sequenceEn(msg)
if err != nil {
return WaitMsg{}, err
}
data = c.msgEn(c.SecretKey, data)
data = c.queue.BuildMessage(data)
if c.maxWriteTimeout.Seconds() != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout))
}
_, err = c.conn.Write(data)
if err == nil && (msg.Type == MSG_SYNC_ASK || msg.Type == MSG_KEY_CHANGE || msg.Type == MSG_SYS_WAIT) {
wait.Time = time.Now()
wait.TransferMsg = msg
wait.Reply = make(chan Message, 1)
c.noFinSyncMsgPool.Store(msg.ID, wait)
}
return wait, err
}
func (c *ClientCommon) Send(key string, value MsgVal) error {
_, err := c.send(TransferMsg{
Key: key,
Value: value,
Type: MSG_ASYNC,
})
return err
}
func (c *ClientCommon) sendWait(msg TransferMsg, timeout time.Duration) (Message, error) {
data, err := c.send(msg)
if err != nil {
return Message{}, err
}
if timeout.Seconds() == 0 {
msg, ok := <-data.Reply
if !ok {
return msg, os.ErrInvalid
}
return msg, nil
}
select {
case <-time.After(timeout):
close(data.Reply)
c.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrDeadlineExceeded
case <-c.stopCtx.Done():
return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply:
if !ok {
return msg, os.ErrInvalid
}
return msg, nil
}
}
func (c *ClientCommon) sendCtx(msg TransferMsg, ctx context.Context) (Message, error) {
data, err := c.send(msg)
if err != nil {
return Message{}, err
}
if ctx == nil {
ctx = context.Background()
}
select {
case <-ctx.Done():
close(data.Reply)
c.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrDeadlineExceeded
case <-c.stopCtx.Done():
return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply:
if !ok {
return msg, os.ErrInvalid
} }
return msg, nil
}
}
func (c *ClientCommon) SendObjCtx(ctx context.Context, key string, val interface{}) (Message, error) {
data, err := c.sequenceEn(val)
if err != nil {
return Message{}, err
}
return c.sendCtx(TransferMsg{
Key: key,
Value: data,
Type: MSG_SYNC_ASK,
}, ctx)
}
func (c *ClientCommon) SendObj(key string, val interface{}) error {
data, err := encode(val)
if err != nil {
return err
}
_, err = c.send(TransferMsg{
Key: key,
Value: data,
Type: MSG_ASYNC,
})
return err
}
func (c *ClientCommon) SendCtx(ctx context.Context, key string, value MsgVal) (Message, error) {
return c.sendCtx(TransferMsg{
Key: key,
Value: value,
Type: MSG_SYNC_ASK,
}, ctx)
}
func (c *ClientCommon) SendWait(key string, value MsgVal, timeout time.Duration) (Message, error) {
return c.sendWait(TransferMsg{
Key: key,
Value: value,
Type: MSG_SYNC_ASK,
}, timeout)
}
func (c *ClientCommon) SendWaitObj(key string, value interface{}, timeout time.Duration) (Message, error) {
data, err := c.sequenceEn(value)
if err != nil {
return Message{}, err
}
return c.SendWait(key, data, timeout)
}
func (c *ClientCommon) Reply(m Message, value MsgVal) error {
return m.Reply(value)
}
func (c *ClientCommon) ExchangeKey(newKey []byte) error {
pubKey, err := starcrypto.DecodeRsaPublicKey(c.handshakeRsaPubKey)
if err != nil {
return err
}
newSendKey, err := starcrypto.RSAEncrypt(pubKey, newKey)
if err != nil {
return err
}
data, err := c.sendWait(TransferMsg{
ID: 19961127,
Key: "sirius",
Value: newSendKey,
Type: MSG_KEY_CHANGE,
}, time.Second*10)
if err != nil {
return err
} }
if string(data.Value) != "success" {
return errors.New("cannot exchange new aes-key")
}
c.SecretKey = newKey
time.Sleep(time.Millisecond * 100)
return nil
}
func aesRsaHello(c Client) error {
newAesKey := []byte(fmt.Sprintf("%d%d%d%s", time.Now().UnixNano(), rand.Int63(), rand.Int63(), "b612.me"))
newAesKey = []byte(starcrypto.Md5Str(newAesKey))
return c.ExchangeKey(newAesKey)
}
func (c *ClientCommon) GetMsgEn() func([]byte, []byte) []byte {
return c.msgEn
}
func (c *ClientCommon) SetMsgEn(fn func([]byte, []byte) []byte) {
c.msgEn = fn
}
func (c *ClientCommon) GetMsgDe() func([]byte, []byte) []byte {
return c.msgDe
}
func (c *ClientCommon) SetMsgDe(fn func([]byte, []byte) []byte) {
c.msgDe = fn
}
func (c *ClientCommon) HeartbeatPeroid() time.Duration {
return c.heartbeatPeriod
}
func (c *ClientCommon) SetHeartbeatPeroid(duration time.Duration) {
c.heartbeatPeriod = duration
} }
func (c *ClientCommon) GetSecretKey() []byte { // ClientStop 终止client端运行
return c.SecretKey func (star *StarNotifyC) ClientStop() {
} if star.isUDP {
func (c *ClientCommon) SetSecretKey(key []byte) { star.Send("b612ryzstop")
c.SecretKey = key
}
func (c *ClientCommon) RsaPubKey() []byte {
return c.handshakeRsaPubKey
}
func (c *ClientCommon) SetRsaPubKey(key []byte) {
c.handshakeRsaPubKey = key
}
func (c *ClientCommon) Stop() error {
if !c.alive.Load().(bool) {
return nil
} }
c.alive.Store(false) star.cancel()
c.mu.Lock() star.Stop <- 1
c.status = Status{ star.Stop <- 1
Alive: false, star.Stop <- 1
Reason: "recv stop signal from user",
Err: nil,
}
c.mu.Unlock()
c.stopFn()
return nil
}
func (c *ClientCommon) StopMonitorChan() <-chan struct{} {
return c.stopCtx.Done()
} }
func (c *ClientCommon) Status() Status { // SetNotify 用于设置关键词的调用函数
return c.status func (star *StarNotifyC) SetNotify(name string, data func(CMsg)) {
star.FuncLists[name] = data
} }
func (c *ClientCommon) GetSequenceEn() func(interface{}) ([]byte, error) { // SetDefaultNotify 用于设置默认关键词的调用函数
return c.sequenceEn func (star *StarNotifyC) SetDefaultNotify(data func(CMsg)) {
} star.defaultFunc = data
func (c *ClientCommon) SetSequenceEn(fn func(interface{}) ([]byte, error)) {
c.sequenceEn = fn
}
func (c *ClientCommon) GetSequenceDe() func([]byte) (interface{}, error) {
return c.sequenceDe
}
func (c *ClientCommon) SetSequenceDe(fn func([]byte) (interface{}, error)) {
c.sequenceDe = fn
} }

@ -0,0 +1,149 @@
package notify
import (
"fmt"
"testing"
"time"
)
func Test_usechannel(t *testing.T) {
server, err := NewNotifyS("udp", "127.0.0.1:1926")
if err != nil {
fmt.Println(err)
return
}
server.SetNotify("nihao", func(data SMsg) string {
fmt.Println("server recv:", data.Key, data.Value)
if data.Value != "" {
data.Reply("nba")
return "nb"
}
return ""
})
client, err := NewNotifyC("udp", "127.0.0.1:1926")
if err != nil {
fmt.Println(err)
return
}
//time.Sleep(time.Second * 10)
client.Send("nihao")
client.SendValue("nihao", "lalala")
txt := <-client.Notify("nihao")
fmt.Println("client", txt)
txt = <-client.Notify("nihao")
fmt.Println("client", txt)
server.ServerStop()
<-client.Stop
client.ClientStop()
time.Sleep(time.Second * 3)
}
func Test_nochannel(t *testing.T) {
server, err := NewNotifyS("udp", "127.0.0.1:1926")
if err != nil {
fmt.Println(err)
return
}
server.SetNotify("nihao", func(data SMsg) string {
fmt.Println("server recv:", data.Key, data.Value)
if data.Value != "" {
data.Reply("nbaz")
return ""
}
return ""
})
client, err := NewNotifyC("udp", "127.0.0.1:1926")
if err != nil {
fmt.Println(err)
return
}
//time.Sleep(time.Second * 10)
client.UseChannel = false
client.SetNotify("nihao", func(data CMsg) {
fmt.Println("client recv:", data.Key, data.Value)
if data.Value != "" {
time.Sleep(time.Millisecond * 900)
client.SendValue("nihao", "dsb")
}
})
client.SendValue("nihao", "lalala")
time.Sleep(time.Second * 3)
server.ServerStop()
<-client.Stop
client.ClientStop()
time.Sleep(time.Second * 3)
}
func Test_pipec(t *testing.T) {
server, err := NewNotifyS("tcp", "127.0.0.1:1926")
if err != nil {
fmt.Println(err)
return
}
server.SetNotify("ni\\||hao", func(data SMsg) string {
fmt.Println("server recv:", data.Key, data.Value, data.mode)
if data.Value != "" {
data.Reply("nba")
return ""
}
return ""
})
client, err := NewNotifyC("tcp", "127.0.0.1:1926")
if err != nil {
fmt.Println(err)
return
}
client.UseChannel = false
sa, err := client.SendValueWait("ni\\||hao", "lalaeee", time.Second*10)
if err != nil {
fmt.Println(err)
return
}
fmt.Println(sa)
fmt.Println("sukidesu")
time.Sleep(time.Second * 3)
server.ServerStop()
<-client.Stop
client.ClientStop()
time.Sleep(time.Second * 2)
}
func Test_pips(t *testing.T) {
var testmsg SMsg
server, err := NewNotifyS("udp", "127.0.0.1:1926")
if err != nil {
fmt.Println(err)
return
}
server.SetNotify("nihao", func(data SMsg) string {
fmt.Println("server recv:", data.Key, data.Value, data.mode)
testmsg = data
if data.Value != "" {
data.Reply("nbaz")
return ""
}
return ""
})
client, err := NewNotifyC("udp", "127.0.0.1:1926")
if err != nil {
fmt.Println(err)
return
}
//time.Sleep(time.Second * 10)
client.UseChannel = false
client.SetNotify("nihao", func(data CMsg) {
fmt.Println("client recv:", data.Key, data.Value, data.mode)
if data.mode != "pa" {
time.Sleep(time.Millisecond * 1200)
client.ReplyMsg(data, "nihao", "dsb")
}
})
client.SendValue("nihao", "lalala")
time.Sleep(time.Second * 3)
fmt.Println(server.SendWait(testmsg, "nihao", "wozuinb", time.Second*20))
fmt.Println("sakura")
server.ServerStop()
<-client.Stop
client.ClientStop()
time.Sleep(time.Second * 3)
}

@ -1,51 +0,0 @@
package notify
import (
"context"
"time"
)
type Client interface {
SetDefaultLink(func(message *Message))
SetLink(string, func(*Message))
send(msg TransferMsg) (WaitMsg, error)
sendWait(msg TransferMsg, timeout time.Duration) (Message, error)
Send(key string, value MsgVal) error
SendWait(key string, value MsgVal, timeout time.Duration) (Message, error)
SendWaitObj(key string, value interface{}, timeout time.Duration) (Message, error)
SendCtx(ctx context.Context, key string, value MsgVal) (Message, error)
Reply(m Message, value MsgVal) error
ExchangeKey(newKey []byte) error
Connect(network string, addr string) error
ConnectTimeout(network string, addr string, timeout time.Duration) error
SkipExchangeKey() bool
SetSkipExchangeKey(bool)
GetMsgEn() func([]byte, []byte) []byte
SetMsgEn(func([]byte, []byte) []byte)
GetMsgDe() func([]byte, []byte) []byte
SetMsgDe(func([]byte, []byte) []byte)
Heartbeat()
HeartbeatPeroid() time.Duration
SetHeartbeatPeroid(duration time.Duration)
GetSecretKey() []byte
SetSecretKey(key []byte)
RsaPubKey() []byte
SetRsaPubKey([]byte)
Stop() error
StopMonitorChan() <-chan struct{}
Status() Status
ShowError(bool)
DebugMode(bool)
IsDebugMode() bool
GetSequenceEn() func(interface{}) ([]byte, error)
SetSequenceEn(func(interface{}) ([]byte, error))
GetSequenceDe() func([]byte) (interface{}, error)
SetSequenceDe(func([]byte) (interface{}, error))
SendObjCtx(ctx context.Context, key string, val interface{}) (Message, error)
SendObj(key string, val interface{}) error
}

@ -1,97 +0,0 @@
package notify
import (
"b612.me/starcrypto"
"log"
)
var defaultRsaKey = []byte(`-----BEGIN RSA PRIVATE KEY-----
MIIJKAIBAAKCAgEAxmeMqr9yfJFKZn26oe/HvC7bZXNLC9Nk55AuTkb4XuIoqXDb
AJD2Y/p167oJLKIqL3edcj7h+oTfn6s79vxT0ZCEf37ILU0G+scRzVwYHiLMwOUC
bS2o4Xor3zqUi9f1piJBvoBNh8RKKtsmJW6VQZdiUGJHbgX4MdOdtf/6TvxZMwSX
U+PRSCAjy04A31Zi7DEWUWJPyqmHeu++PxXU5lvoMdCGDqpcF2j2uO7oJJUww01M
3F5FtTElMrK4/P9gD4kP7NiPhOfVPEfBsYT/DSSjvqNZJZuWnxu+cDxE7J/sBvdp
eNRLhqzdmMYagZFuUmVrz8QmsD6jKHgydW+r7irllvb8WJPK/RIMif+4Rg7rDKFb
j8+ZQ3HZ/gKELoRSyb3zL6RC2qlGLjC1tdeN7TNTinCv092y39T8jIARJ7tpfePh
NBxsBdxfXbCAzHYZIHufI9Zlsc+felQwanlDhq+q8YLcnKHvNKYVyCf/upExpAiA
rr88y/KbeKes0KorKkwMBnGUMTothWM25wHozcurixNvP4UMWX7LWD7vOZZuNDQN
utZYeTwdsniI3mTO9vlPWEK8JTfxBU7x9SePUMJNDyjfDUJM8C2DOlyhGNPkgazO
GdliH87tHkEy/7jJnGclgKmciiVPgwHfFx9GGoBHEfvmAoGGrk4qNbjm7JECAwEA
AQKCAgBYzHe05ELFZfG6tYMWf08R9pbTbSqlfFOpIGrZNgJr1SUF0TDzq+3bCXpF
qtn4VAw1en/JZkOV8Gp1+Bm6jWymWtwyg/fr7pG1I+vf0dwpgMHLg7P2UX1IjXmd
S4a4oEuds69hJ+OLZFsdm0ATeM7ssGicOaBmqd1Pz7rCfnL1bxQtNVzVex1r/paG
o77YNr3HoKCwhCPaPM4aQ7sOWSMUhwYBZabaYX0eLShf1O2pkexlPO+tobPpSLmx
WzRYZ6QC0AGEq9hwT6KsfCFA5pmQtFllNY7suhpL1AsECLWAgoMNCyb1oW68NBpq
CiBK5WBPGH2MW+pE74Pu1P0gen6kLGnApKQjprE1aGuR+xkZe3uEnXwSryU9TXki
wINTEMsX8dkmofFqaJhUwSubrb+t7gvv9E9ZZe0X6UgKzAVVqvh4z1pP8VT+xHpu
pW7SR8n9cFddaEPUijSb1rSpJrNzfJJ+G7yrB7Cw2kBgQ07vzD3z/3kA9cwFevLS
mv3l3OQuB6y9c+AG3cX5WGAt/BVOLjimj9qJt+YglG0SwG31U0PUnnx6QVz/UtJm
CbJQ2TpJd+mk0HyuMU+eycp7BWF3PMN+SE4QgKCKWnhsLeAd3gcvifsbLOYE1OPg
wv1tqyJy0VsJiSn6Ub6Qq0kPLwCLlQTnLWk5mIhnRpHYufTSwQKCAQEA4gS4FKPU
tAcQ82dEYW4OjGfhNWrjFpF+A8K5zufleQWcgzQ3fQho13zH0vZobukfkEVlVxla
OIVk7ZgNA4mCSFrATjIx3RMqzrAUvTte0O4wkjYgCwVvTdS1W8nvRLKgugLygyoo
r+MLW5IT3eNMK/2fZbftNlAkbc7NCo3c2tS6MXFgjx5JUuzChOY73Kp4p5KS38L5
wRRiI8KTIKjBjMZ5q/l8VLKX89bKOCaWibmItoXY6QMbIjargb7YLp3X6uGEyGIu
VhPbQ80/+OC2ZqIvDecp4PYnJNZFeqfjyfhJCNqDjBKYwIscBLMU/Wf9OY258OR4
snQaerN1M0h9lQKCAQEA4LkZIRLLw+8bIVM+7VXxFwOAGy+MH35tvuNIToItAoUh
zjL5LG34PjID8J0DPyP8VRVanak1EcxF0aTEkvnt2f2RAVsW89ytcn8Lybb12Ae8
ia2ZWuIM+J40nuKOGPs3lJ9HqdPWmZYWsWKxFJmYBBnwD6CADYqhqambQn0HeaYl
/WUD7blLYg+4Kk1mt9/hIw93jTWP/86O2H0ia+AhYPTqyvVXfIXKhat6NlOYksGf
Hdv+aCC8Ukg6FyEgiNc/rFn0MWPnEX+cM1AwubviHIBhV8QWILLBTjupwsEBZVah
60ftH+HRUCmEeOpI7jyzIlfEUNLoBHfswKMhMPtcDQKCAQEA0JFkQX+xn/PJW6PX
AUWrXTvbIg0hw8i9DcFa76klJBnehWDhN5tUDE5Uo8PJOVgdTWgMjWSS0geezHX8
xF/XfudoAIDnbMfsP9FTQhCQfaLf5XzW8vSv8pWwSiS9jJp+IUjo+8siwrR03aqe
dKr0tr+ToS0qVG1+QGqO4gdpX/LgYxHp9ggPx9s94aAIa6hQMOrcaGqnSNqDedZr
KL8x5LOewek3J32rJVP3Rfut/SfeFfjL4rKADoF+oPs4yUPVZSV4/+VCNyKZuyaj
uwm6qFlPrLe9+J+OHbsxYG+fj9hzpRzoOZFLrppwX5HWc8XLcpnrlXVwP9VOPh5u
r8VcRQKCAQAJFHGHfJLvH8Ig3pQ0UryjCWkrsAghXaJhjB1nzqqy514uTrDysp7N
JIg0OKPg8TtI1MwMgsG6Ll7D0bx/k8mgfTZWr6+FuuznK2r2g4X7bJSZm4IOwgN0
KDBIGy9SoxPj1Wu32O9a1U2lbS9qfao+wC2K9Bk4ctmFWW0Eiri6mZP/YQ1/lXUO
SURPsUDtPQaDvCRAeGGRHG95H9U8NpoiqMKz4KXgSiecrwkJGOeZRml/c1wcKPZy
/KgcNyJxZQEVnazYMgksE9Pj3uGZH5ZLQISuXyXlvFNDLfX2AIZl6dIxB371QtKK
QqMvn4fC2IEEajdsbJkjVRUj03OL3xwhAoIBAAfMhDSvBbDkGTaXnNMjPPSbswqK
qcSRhSG27mjs1dDNBKuFbz6TkIOp4nxjuS9Zp19fErXlAE9mF5yXSmuiAkZmWfhs
HKpWIdjFJK1EqSfcINe2YuoyUIulz9oG7ObRHD4D8jSPjA8Ete+XsBHGyOtUl09u
X4u9uClhqjK+r1Tno2vw5yF6ZxfQtdWuL4W0UL1S8E+VO7vjTjNOYvgjAIpAM/gW
sqjA2Qw52UZqhhLXoTfRvtJilxlXXhIRJSsnUoGiYVCQ/upjqJCClEvJfIWdGY/U
I2CbFrwJcNvOG1lUsSM55JUmbrSWVPfo7yq2k9GCuFxOy2n/SVlvlQUcNkA=
-----END RSA PRIVATE KEY-----`)
var defaultRsaPubKey = []byte(`-----BEGIN PUBLIC KEY-----
MIICIjANBgkqhkiG9w0BAQEFAAOCAg8AMIICCgKCAgEAxmeMqr9yfJFKZn26oe/H
vC7bZXNLC9Nk55AuTkb4XuIoqXDbAJD2Y/p167oJLKIqL3edcj7h+oTfn6s79vxT
0ZCEf37ILU0G+scRzVwYHiLMwOUCbS2o4Xor3zqUi9f1piJBvoBNh8RKKtsmJW6V
QZdiUGJHbgX4MdOdtf/6TvxZMwSXU+PRSCAjy04A31Zi7DEWUWJPyqmHeu++PxXU
5lvoMdCGDqpcF2j2uO7oJJUww01M3F5FtTElMrK4/P9gD4kP7NiPhOfVPEfBsYT/
DSSjvqNZJZuWnxu+cDxE7J/sBvdpeNRLhqzdmMYagZFuUmVrz8QmsD6jKHgydW+r
7irllvb8WJPK/RIMif+4Rg7rDKFbj8+ZQ3HZ/gKELoRSyb3zL6RC2qlGLjC1tdeN
7TNTinCv092y39T8jIARJ7tpfePhNBxsBdxfXbCAzHYZIHufI9Zlsc+felQwanlD
hq+q8YLcnKHvNKYVyCf/upExpAiArr88y/KbeKes0KorKkwMBnGUMTothWM25wHo
zcurixNvP4UMWX7LWD7vOZZuNDQNutZYeTwdsniI3mTO9vlPWEK8JTfxBU7x9SeP
UMJNDyjfDUJM8C2DOlyhGNPkgazOGdliH87tHkEy/7jJnGclgKmciiVPgwHfFx9G
GoBHEfvmAoGGrk4qNbjm7JECAwEAAQ==
-----END PUBLIC KEY-----`)
var defaultAesKey = []byte{0x19, 0x96, 0x11, 0x27, 228, 187, 187, 231, 142, 137, 230, 179, 189, 229, 184, 133}
func defaultMsgEn(key []byte, d []byte) []byte {
data, err := starcrypto.CustomEncryptAesCFB(d, key)
if err != nil {
log.Print(err)
return nil
}
return data
}
func defaultMsgDe(key []byte, d []byte) []byte {
data, err := starcrypto.CustomDecryptAesCFB(d, key)
if err != nil {
log.Print(err)
return nil
}
return data
}
func init() {
RegisterName("b612.me/notify.Transfer", TransferMsg{})
}

@ -1,8 +0,0 @@
module b612.me/notify
go 1.16
require (
b612.me/starcrypto v0.0.5
b612.me/stario v0.0.10
)

@ -1,75 +0,0 @@
b612.me/starcrypto v0.0.5 h1:Aa4pRDO2lBH2Aw+vz8NuUtRb73J8z5aOa9SImBY5sq4=
b612.me/starcrypto v0.0.5/go.mod h1:pF5A16p8r/h1G0x7ZNmmAF6K1sdIMpbCUxn2WGC8gZ0=
b612.me/stario v0.0.0-20240818091810-d528a583f4b2 h1:SxN1WDZsEBQFTnLaKbc7Z+91uyWhUB4cKHo5Ucztyh0=
b612.me/stario v0.0.0-20240818091810-d528a583f4b2/go.mod h1:1Owmu9jzKWgs4VsmeI8YWlGwLrCwPNM/bYpxkyn+MMk=
b612.me/stario v0.0.10/go.mod h1:1Owmu9jzKWgs4VsmeI8YWlGwLrCwPNM/bYpxkyn+MMk=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.13.0/go.mod h1:y6Z2r+Rw4iayiXXAIxJIDAJ1zMW4yaTpebo8fPOliYc=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.12.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.15.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.21.0/go.mod h1:bIjVDfnllIU7BJ2DNgfnXvpSvtn8VRwhlsaeUTyUS44=
golang.org/x/net v0.25.0/go.mod h1:JkAGAh7GEvH74S6FOH42FLoXpXbE/aqXSrIQjXgsiwM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.6.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/telemetry v0.0.0-20240228155512-f48c80bd79b2/go.mod h1:TeRTkGYfJXctD9OcfyVLyj2J3IxLnKwHJR8f4D8a3YE=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k=
golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo=
golang.org/x/term v0.12.0/go.mod h1:owVbMEjm3cBLCHdkQu9b1opXd4ETQWc3BhuQGKgXgvU=
golang.org/x/term v0.17.0/go.mod h1:lLRBjIVuehSbZlaOtGMbcMncT+aqLLLmKrsjNrUguwk=
golang.org/x/term v0.18.0/go.mod h1:ILwASektA3OnRv7amZ1xhE/KTR+u50pbXfZ03+6Nx58=
golang.org/x/term v0.20.0/go.mod h1:8UkIAJTvZgivsXaD6/pH6U9ecQzZ45awqEOzuCvwpFY=
golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU=
golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.15.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

502
msg.go

@ -1,502 +0,0 @@
package notify
import (
"b612.me/starcrypto"
"context"
"errors"
"fmt"
"net"
"os"
"reflect"
"sync/atomic"
"time"
)
const (
MSG_SYS MessageType = iota
MSG_SYS_WAIT
MSG_SYS_REPLY
MSG_KEY_CHANGE
MSG_ASYNC
MSG_SYNC_ASK
MSG_SYNC_REPLY
)
type MessageType uint8
type NetType uint8
const (
NET_SERVER NetType = iota
NET_CLIENT
)
type MsgVal []byte
type TransferMsg struct {
ID uint64
Key string
Value MsgVal
Type MessageType
}
type Message struct {
NetType
ClientConn *ClientConn
ServerConn Client
TransferMsg
Time time.Time
}
type WaitMsg struct {
TransferMsg
Time time.Time
Reply chan Message
//Ctx context.Context
}
func (m *Message) Reply(value MsgVal) (err error) {
reply := TransferMsg{
ID: m.ID,
Key: m.Key,
Value: value,
Type: m.Type,
}
if reply.Type == MSG_SYNC_ASK {
reply.Type = MSG_SYNC_REPLY
}
if reply.Type == MSG_SYS_WAIT {
reply.Type = MSG_SYS_REPLY
}
if m.NetType == NET_SERVER {
_, err = m.ClientConn.server.send(m.ClientConn, reply)
}
if m.NetType == NET_CLIENT {
_, err = m.ServerConn.send(reply)
}
return
}
func (m *Message) ReplyObj(value interface{}) (err error) {
data, err := encode(value)
if err != nil {
return err
}
return m.Reply(data)
}
type ClientConn struct {
alive atomic.Value
status Status
ClientID string
ClientAddr net.Addr
tuConn net.Conn
server Server
stopFn context.CancelFunc
stopCtx context.Context
maxReadTimeout time.Duration
maxWriteTimeout time.Duration
msgEn func([]byte, []byte) []byte
msgDe func([]byte, []byte) []byte
handshakeRsaKey []byte
SecretKey []byte
lastHeartBeat int64
}
type Status struct {
Alive bool
Reason string
Err error
}
func (c *ClientConn) readTUMessage() {
for {
select {
case <-c.stopCtx.Done():
c.tuConn.Close()
c.server.removeClient(c)
return
default:
}
if c.maxReadTimeout.Seconds() > 0 {
c.tuConn.SetReadDeadline(time.Now().Add(c.maxReadTimeout))
}
data := make([]byte, 8192)
num, err := c.tuConn.Read(data)
if err == os.ErrDeadlineExceeded {
if num != 0 {
c.server.pushMessage(data[:num], c.ClientID)
}
continue
}
if err != nil {
//conn is broke
c.alive.Store(false)
c.status = Status{
Alive: false,
Reason: "read error",
Err: err,
}
c.stopFn()
continue
}
c.server.pushMessage(data[:num], c.ClientID)
//fmt.Println("finished:", float64(time.Now().UnixNano()-nowd)/1000000)
}
}
func (c *ClientConn) rsaDecode(message Message) {
privKey, err := starcrypto.DecodeRsaPrivateKey(c.handshakeRsaKey, "")
if err != nil {
fmt.Println(err)
message.Reply([]byte("failed"))
return
}
data, err := starcrypto.RSADecrypt(privKey, message.Value)
if err != nil {
fmt.Println(err)
message.Reply([]byte("failed"))
return
}
//fmt.Println("aes-key changed to", string(data))
message.Reply([]byte("success"))
c.SecretKey = data
}
func (c *ClientConn) sayGoodByeForTU() error {
_, err := c.server.sendWait(c, TransferMsg{
ID: 10010,
Key: "bye",
Value: nil,
Type: MSG_SYS_WAIT,
}, time.Second*3)
return err
}
func (c *ClientConn) GetSecretKey() []byte {
return c.SecretKey
}
func (c *ClientConn) SetSecretKey(key []byte) {
c.SecretKey = key
}
func (c *ClientConn) GetMsgEn() func([]byte, []byte) []byte {
return c.msgEn
}
func (c *ClientConn) SetMsgEn(fn func([]byte, []byte) []byte) {
c.msgEn = fn
}
func (c *ClientConn) GetMsgDe() func([]byte, []byte) []byte {
return c.msgDe
}
func (c *ClientConn) SetMsgDe(fn func([]byte, []byte) []byte) {
c.msgDe = fn
}
func (c *ClientConn) StopMonitorChan() <-chan struct{} {
return c.stopCtx.Done()
}
func (c *ClientConn) Status() Status {
return c.status
}
func (c *ClientConn) Server() Server {
return c.server
}
func (c *ClientConn) GetRemoteAddr() net.Addr {
return c.ClientAddr
}
func (m MsgVal) ToClearString() string {
return string(m)
}
func (m MsgVal) ToInterface() (interface{}, error) {
return Decode(m)
}
func (m MsgVal) MustToInterface() interface{} {
inf, err := m.ToInterface()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToString() (string, error) {
inf, err := m.ToInterface()
if err != nil {
return "", err
}
if data, ok := inf.(string); !ok {
return "", errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToString() string {
inf, err := m.ToString()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToInt32() (int32, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(int32); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToInt32() int32 {
inf, err := m.ToInt32()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToInt() (int, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(int); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToInt() int {
inf, err := m.ToInt()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToUint64() (uint64, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(uint64); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToUint64() uint64 {
inf, err := m.ToUint64()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToUint32() (uint32, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(uint32); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToUint32() uint32 {
inf, err := m.ToUint32()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToUint() (uint, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(uint); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToUint() uint {
inf, err := m.ToUint()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToBool() (bool, error) {
inf, err := m.ToInterface()
if err != nil {
return false, err
}
if data, ok := inf.(bool); !ok {
return false, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToBool() bool {
inf, err := m.ToBool()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToFloat64() (float64, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(float64); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToFloat64() float64 {
inf, err := m.ToFloat64()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToFloat32() (float32, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(float32); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToFloat32() float32 {
inf, err := m.ToFloat32()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToSliceString() ([]string, error) {
inf, err := m.ToInterface()
if err != nil {
return []string{}, err
}
if data, ok := inf.([]string); !ok {
return []string{}, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToSliceString() []string {
inf, err := m.ToSliceString()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToSliceInt64() ([]int64, error) {
inf, err := m.ToInterface()
if err != nil {
return []int64{}, err
}
if data, ok := inf.([]int64); !ok {
return []int64{}, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToSliceInt64() []int64 {
inf, err := m.ToSliceInt64()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToSliceFloat64() ([]float64, error) {
inf, err := m.ToInterface()
if err != nil {
return []float64{}, err
}
if data, ok := inf.([]float64); !ok {
return []float64{}, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToSliceFloat64() []float64 {
inf, err := m.ToSliceFloat64()
if err != nil {
panic(err)
}
return inf
}
func ToMsgVal(val interface{}) (MsgVal, error) {
return Encode(val)
}
func MustToMsgVal(val interface{}) MsgVal {
d, err := ToMsgVal(val)
if err != nil {
panic(err)
}
return d
}
func (m MsgVal) Orm(stu interface{}) error {
inf, err := m.ToInterface()
if err != nil {
return err
}
t := reflect.TypeOf(stu)
if t.Kind() != reflect.Ptr {
return errors.New("interface not writable(pointer wanted)")
}
if !reflect.ValueOf(stu).Elem().CanSet() {
return errors.New("interface not writable")
}
it := reflect.TypeOf(inf)
if t.Elem().Kind() != it.Kind() {
return fmt.Errorf("interface{} kind is %v,not %v", t.Elem().Kind(), it.Kind())
}
if t.Elem().Name() != it.Name() {
return fmt.Errorf("interface{} name is %v,not %v", t.Elem().Name(), it.Name())
}
if t.Elem().String() != it.String() {
return fmt.Errorf("interface{} string is %v,not %v", t.Elem().String(), it.String())
}
reflect.ValueOf(stu).Elem().Set(reflect.ValueOf(inf))
return nil
}

@ -1,96 +0,0 @@
package notify
import (
"fmt"
"testing"
"time"
)
func TestMsgEnDeCode(t *testing.T) {
Register(HelloMessage{})
Register(Error{})
go ServerRun(time.Second * 30)
time.Sleep(time.Second * 2)
ClientRun(time.Second * 35)
}
type Error struct {
Msg string
}
func (e Error) Error() string {
return e.Msg
}
type WorldMessage struct {
Port int
MyCode string
MyInfo []int
Err error
}
type HelloMessage struct {
ID string
MyMap map[string]string
MySlice []int
World WorldMessage
}
func ClientRun(stopTime time.Duration) {
c := NewClient()
err := c.Connect("tcp", "127.0.0.1:23456")
if err != nil {
panic(err)
}
c.SetLink("msg", func(msg *Message) {
var hi HelloMessage
err := msg.Value.Orm(&hi)
if err != nil {
panic(err)
}
fmt.Printf("recv info from server,struct detail is %+v\n", hi)
})
timer := time.NewTimer(stopTime)
for {
select {
case <-timer.C:
c.Stop()
return
case <-time.After(time.Second * 2):
fmt.Println("client msg sent", c.SendObj("msg", HelloMessage{
ID: "client",
MyMap: map[string]string{"hello": "world"},
MySlice: []int{int(time.Now().Unix())},
World: WorldMessage{
Port: 520,
MyCode: "b612",
MyInfo: []int{0, 1, 2, 3},
Err: Error{Msg: "Hello World"},
},
}))
}
}
}
func ServerRun(stopTime time.Duration) {
s := NewServer()
err := s.Listen("tcp", "127.0.0.1:23456")
if err != nil {
panic(err)
}
s.SetLink("msg", func(msg *Message) {
var hi HelloMessage
err := msg.Value.Orm(&hi)
if err != nil {
panic(err)
}
fmt.Printf("recv info from client:%v,struct detail is %+v\n", msg.ClientConn.GetRemoteAddr(), hi)
hi.ID = "server"
hi.World.Port = 666
hi.MySlice = append(hi.MySlice, 1, 1, 2, 7)
msg.ReplyObj(hi)
})
<-time.After(stopTime)
s.Stop()
}

@ -1,44 +0,0 @@
package notify
import (
"bytes"
"encoding/gob"
)
func Register(data interface{}) {
gob.Register(data)
}
func RegisterName(name string, data interface{}) {
gob.RegisterName(name, data)
}
func RegisterAll(data []interface{}) {
for _, v := range data {
gob.Register(v)
}
}
func RegisterNames(data map[string]interface{}) {
for k, v := range data {
gob.RegisterName(k, v)
}
}
func encode(src interface{}) ([]byte, error) {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(&src)
return buf.Bytes(), err
}
func Encode(src interface{}) ([]byte, error) {
return encode(src)
}
func Decode(src []byte) (interface{}, error) {
dec := gob.NewDecoder(bytes.NewReader(src))
var dst interface{}
err := dec.Decode(&dst)
return dst, err
}

@ -1,693 +1,369 @@
// Package notify is a package which provide common tcp/udp/unix socket service
package notify package notify
import ( import (
"b612.me/stario"
"context" "context"
"errors" "errors"
"fmt" "fmt"
"math"
"math/rand" "math/rand"
"net" "net"
"os"
"strings" "strings"
"sync"
"sync/atomic"
"time" "time"
)
type ServerCommon struct { "b612.me/starainrt"
msgID uint64 )
alive atomic.Value
status Status
listener net.Listener
udpListener *net.UDPConn
queue *stario.StarQueue
stopFn context.CancelFunc
stopCtx context.Context
maxReadTimeout time.Duration
maxWriteTimeout time.Duration
parallelNum int
wg stario.WaitGroup
clientPool map[string]*ClientConn
mu sync.RWMutex
handshakeRsaKey []byte
SecretKey []byte
defaultMsgEn func([]byte, []byte) []byte
defaultMsgDe func([]byte, []byte) []byte
linkFns map[string]func(message *Message)
defaultFns func(message *Message)
noFinSyncMsgPool sync.Map
noFinSyncMsgMaxKeepSeconds int64
maxHeartbeatLostSeconds int64
sequenceDe func([]byte) (interface{}, error)
sequenceEn func(interface{}) ([]byte, error)
showError bool
debugMode bool
}
func NewServer() Server { var builder *starainrt.StarQueue
var server ServerCommon
server.wg = stario.NewWaitGroup(0) func init() {
server.parallelNum = 0 builder = starainrt.NewQueue()
server.noFinSyncMsgMaxKeepSeconds = 0 }
server.maxHeartbeatLostSeconds = 300
server.stopCtx, server.stopFn = context.WithCancel(context.Background()) // StarNotifyS 为Server端
server.SecretKey = defaultAesKey type StarNotifyS struct {
server.handshakeRsaKey = defaultRsaKey // Queue 是用来处理收发信息的简单消息队列
server.clientPool = make(map[string]*ClientConn) Queue *starainrt.StarQueue
server.defaultMsgEn = defaultMsgEn // FuncLists 记录了被通知项所记录的函数
server.defaultMsgDe = defaultMsgDe FuncLists map[string]func(SMsg) string
server.sequenceEn = encode defaultFunc func(SMsg) string
server.sequenceDe = Decode Connected func(SMsg) string
server.alive.Store(false) stopSign context.Context
server.linkFns = make(map[string]func(*Message)) cancel context.CancelFunc
server.defaultFns = func(message *Message) { connPool map[string]net.Conn
return lockPool map[string]SMsg
udpPool map[string]*net.UDPAddr
isUDP bool
// Stop 停止信 号
Stop chan int
// UDPConn UDP监听
UDPConn *net.UDPConn
// Online 当前链接是否处于活跃状态
Online bool
// ReadDeadline tcp/unix中读超时设置,udp请直接调用UDPConn
ReadDeadline time.Time
// WriteDeadline tcp/unix中写超时设置,udp请直接调用UDPConn
WriteDeadline time.Time
// Deadline tcp/unix中超时设置,udp请直接调用UDPConn
Deadline time.Time
}
// SMsg 指明当前服务端被通知的关键字
type SMsg struct {
Conn net.Conn
Key string
Value string
UDP *net.UDPAddr
uconn *net.UDPConn
mode string
wait chan int
}
// GetConnPool 获取所有Client端信息
func (star *StarNotifyS) GetConnPool() []SMsg {
var result []SMsg
for _, v := range star.connPool {
result = append(result, SMsg{Conn: v, mode: "pa"})
}
for _, v := range star.udpPool {
result = append(result, SMsg{UDP: v, uconn: star.UDPConn, mode: "pa0"})
}
return result
}
func (nmsg *SMsg) addSlash(name string) string {
var key []byte
for _, v := range []byte(name) {
if v == byte(124) || v == byte(92) {
key = append(key, byte(92))
}
key = append(key, v)
} }
return &server return string(key)
}
func (s *ServerCommon) DebugMode(dmg bool) {
s.mu.Lock()
s.debugMode = dmg
s.mu.Unlock()
} }
func (s *ServerCommon) IsDebugMode() bool { // Reply 用于向client端回复数据
s.mu.RLock() func (nmsg *SMsg) Reply(msg string) error {
defer s.mu.RUnlock() var err error
return s.debugMode if nmsg.uconn == nil {
_, err = nmsg.Conn.Write(builder.BuildMessage([]byte(nmsg.mode + "||" + nmsg.addSlash(nmsg.Key) + "||" + msg)))
} else {
_, err = nmsg.uconn.WriteToUDP(builder.BuildMessage([]byte(nmsg.mode+"||"+nmsg.addSlash(nmsg.Key)+"||"+msg)), nmsg.UDP)
}
return err
} }
func (s *ServerCommon) ShowError(std bool) { // Send 用于向client端发送key-value数据
s.mu.Lock() func (nmsg *SMsg) Send(key, value string) error {
s.showError = std var err error
s.mu.Unlock() if nmsg.uconn == nil {
_, err = nmsg.Conn.Write(builder.BuildMessage([]byte("pa||" + nmsg.addSlash(key) + "||" + value)))
} else {
_, err = nmsg.uconn.WriteToUDP(builder.BuildMessage([]byte("pa||"+nmsg.addSlash(key)+"||"+value)), nmsg.UDP)
}
return err
} }
func (s *ServerCommon) Stop() error { // SendWait 用于向client端发送key-value数据并等待
if !s.alive.Load().(bool) { func (star *StarNotifyS) SendWait(source SMsg, key, value string, tmout time.Duration) (SMsg, error) {
return nil var err error
var tmceed <-chan time.Time
rand.Seed(time.Now().UnixNano())
mode := "sr" + fmt.Sprintf("%05d", rand.Intn(99999))
if source.uconn == nil {
_, err = source.Conn.Write(builder.BuildMessage([]byte(mode + "||" + source.addSlash(key) + "||" + value)))
} else {
_, err = source.uconn.WriteToUDP(builder.BuildMessage([]byte(mode+"||"+source.addSlash(key)+"||"+value)), source.UDP)
} }
s.alive.Store(false) if err != nil {
s.mu.Lock() return SMsg{}, err
s.status = Status{
Alive: false,
Reason: "recv stop signal from user",
Err: nil,
} }
s.mu.Unlock() if int64(tmout) > 0 {
s.stopFn() tmceed = time.After(tmout)
return nil
}
func (s *ServerCommon) Listen(network string, addr string) error {
if s.alive.Load().(bool) {
return errors.New("server already run")
} }
s.stopCtx, s.stopFn = context.WithCancel(context.Background()) source.wait = make(chan int, 2)
s.queue = stario.NewQueueCtx(s.stopCtx, 128, math.MaxUint32) star.lockPool[mode] = source
if strings.Contains(strings.ToLower(network), "udp") { select {
return s.ListenUDP(network, addr) case <-source.wait:
res := star.lockPool[mode]
delete(star.lockPool, mode)
return res, nil
case <-tmceed:
return SMsg{}, errors.New("Time Exceed")
} }
return s.ListenTU(network, addr)
} }
func (s *ServerCommon) ListenTU(network string, addr string) error { func (star *StarNotifyS) starinits() {
listener, err := net.Listen(network, addr) star.stopSign, star.cancel = context.WithCancel(context.Background())
if err != nil { star.Queue = starainrt.NewQueue()
return err star.udpPool = make(map[string]*net.UDPAddr)
} star.FuncLists = make(map[string]func(SMsg) string)
s.alive.Store(true) star.connPool = make(map[string]net.Conn)
s.status.Alive = true star.lockPool = make(map[string]SMsg)
s.listener = listener star.Stop = make(chan int, 5)
go s.accept() star.Online = false
go s.monitorPool() star.Queue.RestoreDuration(time.Second * 2)
go s.loadMessage()
return nil
} }
func (s *ServerCommon) monitorPool() { // NewNotifyS 开启一个新的Server端通知
for { func NewNotifyS(netype, value string) (*StarNotifyS, error) {
select { if netype[0:3] != "udp" {
case <-s.stopCtx.Done(): return notudps(netype, value)
s.noFinSyncMsgPool.Range(func(k, v interface{}) bool {
data := v.(WaitMsg)
close(data.Reply)
s.noFinSyncMsgPool.Delete(k)
return true
})
return
case <-time.After(time.Second * 30):
}
now := time.Now()
if s.noFinSyncMsgMaxKeepSeconds > 0 {
s.noFinSyncMsgPool.Range(func(k, v interface{}) bool {
data := v.(WaitMsg)
if data.Time.Add(time.Duration(s.noFinSyncMsgMaxKeepSeconds) * time.Second).Before(now) {
close(data.Reply)
s.noFinSyncMsgPool.Delete(k)
}
return true
})
}
if s.maxHeartbeatLostSeconds != 0 {
for _, v := range s.clientPool {
if now.Unix()-v.lastHeartBeat > s.maxHeartbeatLostSeconds {
v.stopFn()
s.removeClient(v)
}
}
}
} }
return doudps(netype, value)
} }
func (s *ServerCommon) SetDefaultCommEncode(fn func([]byte, []byte) []byte) { func doudps(netype, value string) (*StarNotifyS, error) {
s.defaultMsgEn = fn var star StarNotifyS
} star.starinits()
star.isUDP = true
func (s *ServerCommon) SetDefaultCommDecode(fn func([]byte, []byte) []byte) { udpaddr, err := net.ResolveUDPAddr(netype, value)
s.defaultMsgDe = fn if err != nil {
} return nil, err
func (s *ServerCommon) SetDefaultLink(fn func(message *Message)) {
s.defaultFns = fn
}
func (s *ServerCommon) SetLink(key string, fn func(*Message)) {
s.mu.Lock()
defer s.mu.Unlock()
s.linkFns[key] = fn
}
func (s *ServerCommon) pushMessage(data []byte, source string) {
s.queue.ParseMessage(data, source)
}
func (s *ServerCommon) removeClient(client *ClientConn) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.clientPool, client.ClientID)
}
func (s *ServerCommon) accept() {
if s.udpListener != nil {
s.acceptUDP()
} }
s.acceptTU() star.UDPConn, err = net.ListenUDP(netype, udpaddr)
} if err != nil {
func (s *ServerCommon) acceptTU() { return nil, err
for { }
select { go star.notify()
case <-s.stopCtx.Done(): go func() {
if s.debugMode { <-star.stopSign.Done()
fmt.Println("accept goroutine recv exit signal,exit") for k, v := range star.udpPool {
} star.UDPConn.WriteToUDP(star.Queue.BuildMessage([]byte("b612ryzstop")), v)
return delete(star.connPool, k)
default:
}
conn, err := s.listener.Accept()
if err != nil {
if s.showError || s.debugMode {
fmt.Println("error accept:", err)
}
continue
}
if s.debugMode {
fmt.Println("accept new connection from", conn.RemoteAddr())
} }
var id string star.UDPConn.Close()
star.Online = false
return
}()
go func() {
for { for {
id = fmt.Sprintf("%s%d%d", conn.RemoteAddr().String(), time.Now().UnixNano(), rand.Int63()) buf := make([]byte, 8192)
s.mu.RLock() n, addr, err := star.UDPConn.ReadFromUDP(buf)
if _, ok := s.clientPool[id]; ok { if n != 0 {
s.mu.RUnlock() star.Queue.ParseMessage(buf[0:n], addr)
if _, ok := star.udpPool[addr.String()]; !ok {
if star.Connected != nil {
go star.Connected(SMsg{UDP: addr, uconn: star.UDPConn})
}
}
star.udpPool[addr.String()] = addr
}
if err != nil {
continue continue
} }
s.mu.RUnlock()
break
}
client := ClientConn{
ClientID: id,
ClientAddr: conn.RemoteAddr(),
tuConn: conn,
server: s,
maxReadTimeout: s.maxReadTimeout,
maxWriteTimeout: s.maxWriteTimeout,
SecretKey: s.SecretKey,
handshakeRsaKey: s.handshakeRsaKey,
msgEn: s.defaultMsgEn,
msgDe: s.defaultMsgDe,
lastHeartBeat: time.Now().Unix(),
} }
client.alive.Store(true) }()
client.status = Status{ star.Online = true
Alive: true, return &star, nil
Reason: "",
Err: nil,
}
client.stopCtx, client.stopFn = context.WithCancel(context.Background())
s.mu.Lock()
s.clientPool[id] = &client
s.mu.Unlock()
go client.readTUMessage()
}
} }
func (s *ServerCommon) loadMessage() { func notudps(netype, value string) (*StarNotifyS, error) {
for { var star StarNotifyS
select { star.starinits()
case <-s.stopCtx.Done(): star.isUDP = false
var wg sync.WaitGroup listener, err := net.Listen(netype, value)
s.mu.RLock() if err != nil {
for _, v := range s.clientPool { return nil, err
wg.Add(1) }
go func(v *ClientConn) { go star.notify()
defer wg.Done() go func() {
v.sayGoodByeForTU() <-star.stopSign.Done()
v.alive.Store(false) for k, v := range star.connPool {
v.status = Status{ v.Close()
Alive: false, delete(star.connPool, k)
Reason: "recv stop signal from server", }
Err: nil, listener.Close()
} star.Online = false
v.stopFn() return
s.removeClient(v) }()
}(v) go func() {
for {
conn, err := listener.Accept()
if err != nil {
select {
case <-star.stopSign.Done():
listener.Close()
return
default:
continue
}
} }
s.mu.RUnlock() if !star.ReadDeadline.IsZero() {
select { conn.SetReadDeadline(star.ReadDeadline)
case <-time.After(time.Second * 8):
case <-stario.WaitUntilFinished(func() error {
wg.Wait()
return nil
}):
} }
if s.listener != nil { if !star.WriteDeadline.IsZero() {
s.listener.Close() conn.SetWriteDeadline(star.WriteDeadline)
} }
s.wg.Wait() if !star.Deadline.IsZero() {
return conn.SetDeadline(star.Deadline)
case data, ok := <-s.queue.RestoreChan():
if !ok {
continue
} }
s.wg.Add(1) go func(conn net.Conn) {
go func(data stario.MsgQueue) { for {
s.mu.RLock() buf := make([]byte, 8192)
cc, ok := s.clientPool[data.Conn.(string)] n, err := conn.Read(buf)
s.mu.RUnlock() if n != 0 {
if !ok { star.Queue.ParseMessage(buf[0:n], conn)
return }
} if err != nil {
//fmt.Println("received:", float64(time.Now().UnixNano()-nowd)/1000000) conn.Close()
msg, err := s.sequenceDe(cc.msgDe(cc.SecretKey, data.Msg)) delete(star.connPool, conn.RemoteAddr().String())
if err != nil { break
if s.showError || s.debugMode {
fmt.Println("server decode data error", err)
} }
return
}
//fmt.Println("decoded:", float64(time.Now().UnixNano()-nowd)/1000000)
message := Message{
NetType: NET_SERVER,
ClientConn: cc,
TransferMsg: msg.(TransferMsg),
} }
message.Time = time.Now() }(conn)
star.connPool[conn.RemoteAddr().String()] = conn
//fmt.Println("dispatch:", float64(time.Now().UnixNano()-nowd)/1000000) if star.Connected != nil {
s.dispatchMsg(message) go star.Connected(SMsg{Conn: conn})
}(data) }
}
}
}
func (s *ServerCommon) sysMsg(message Message) {
switch message.Key {
case "bye":
//fmt.Println("recv stop signal from client", message.ClientConn.ClientID)
if message.TransferMsg.Type == MSG_SYS_WAIT {
message.Reply(nil)
}
message.ClientConn.alive.Store(false)
message.ClientConn.status = Status{
Alive: false,
Reason: "recv stop signal from client",
Err: nil,
}
message.ClientConn.stopFn()
case "heartbeat":
message.ClientConn.lastHeartBeat = time.Now().Unix()
message.Reply(nil)
}
}
func (s *ServerCommon) dispatchMsg(message Message) {
defer s.wg.Done()
switch message.TransferMsg.Type {
case MSG_SYS_WAIT:
fallthrough
case MSG_SYS:
s.sysMsg(message)
return
case MSG_KEY_CHANGE:
message.ClientConn.rsaDecode(message)
return
case MSG_SYS_REPLY:
fallthrough
case MSG_SYNC_REPLY:
data, ok := s.noFinSyncMsgPool.Load(message.TransferMsg.ID)
if ok {
wait := data.(WaitMsg)
wait.Reply <- message
s.noFinSyncMsgPool.Delete(message.TransferMsg.ID)
return
}
//just throw
//return
fallthrough
default:
}
callFn := func(fn func(*Message)) {
fn(&message)
}
fn, ok := s.linkFns[message.TransferMsg.Key]
if ok {
callFn(fn)
}
if s.defaultFns != nil {
callFn(s.defaultFns)
}
}
func (s *ServerCommon) send(c *ClientConn, msg TransferMsg) (WaitMsg, error) {
if s.udpListener != nil {
return s.sendUDP(c, msg)
}
return s.sendTU(c, msg)
}
func (s *ServerCommon) sendTU(c *ClientConn, msg TransferMsg) (WaitMsg, error) {
var wait WaitMsg
if msg.Type != MSG_SYNC_REPLY && msg.Type != MSG_KEY_CHANGE && msg.Type != MSG_SYS_REPLY || msg.ID == 0 {
msg.ID = atomic.AddUint64(&s.msgID, 1)
}
data, err := s.sequenceEn(msg)
if err != nil {
return WaitMsg{}, err
}
data = c.msgEn(c.SecretKey, data)
data = s.queue.BuildMessage(data)
if c.maxWriteTimeout.Seconds() != 0 {
if err := c.tuConn.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout)); err != nil {
return WaitMsg{}, err
} }
} }()
_, err = c.tuConn.Write(data) star.Online = true
//fmt.Println("resend:", float64(time.Now().UnixNano()-nowd)/1000000) return &star, nil
if err == nil && (msg.Type == MSG_SYNC_ASK || msg.Type == MSG_SYS_WAIT) {
wait.Time = time.Now()
wait.TransferMsg = msg
wait.Reply = make(chan Message, 1)
s.noFinSyncMsgPool.Store(msg.ID, wait)
}
return wait, err
} }
func (s *ServerCommon) Send(c *ClientConn, key string, value MsgVal) error { // SetNotify 用于设置通知关键词的调用函数
_, err := s.send(c, TransferMsg{ func (star *StarNotifyS) SetNotify(name string, data func(SMsg) string) {
Key: key, star.FuncLists[name] = data
Value: value,
Type: MSG_ASYNC,
})
return err
}
func (s *ServerCommon) sendWait(c *ClientConn, msg TransferMsg, timeout time.Duration) (Message, error) {
data, err := s.send(c, msg)
if err != nil {
return Message{}, err
}
if timeout.Seconds() == 0 {
msg, ok := <-data.Reply
if !ok {
return msg, os.ErrInvalid
}
return msg, nil
}
select {
case <-time.After(timeout):
close(data.Reply)
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrDeadlineExceeded
case <-s.stopCtx.Done():
return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply:
if !ok {
return msg, os.ErrInvalid
}
return msg, nil
}
} }
func (s *ServerCommon) SendCtx(ctx context.Context, c *ClientConn, key string, value MsgVal) (Message, error) { // SetDefaultNotify 用于设置默认关键词的调用函数
return s.sendCtx(c, TransferMsg{ func (star *StarNotifyS) SetDefaultNotify(data func(SMsg) string) {
Key: key, star.defaultFunc = data
Value: value,
Type: MSG_SYNC_ASK,
}, ctx)
} }
func (s *ServerCommon) sendCtx(c *ClientConn, msg TransferMsg, ctx context.Context) (Message, error) { func (star *StarNotifyS) trim(name string) string {
data, err := s.send(c, msg) var slash bool = false
if err != nil { var key []byte
return Message{}, err for _, v := range []byte(name) {
} if v == byte(92) && !slash {
if ctx == nil { slash = true
ctx = context.Background() continue
}
select {
case <-ctx.Done():
close(data.Reply)
s.noFinSyncMsgPool.Delete(data.TransferMsg.ID)
return Message{}, os.ErrClosed
case <-s.stopCtx.Done():
return Message{}, errors.New("service shutdown")
case msg, ok := <-data.Reply:
if !ok {
return msg, os.ErrInvalid
} }
return msg, nil slash = false
} key = append(key, v)
}
func (s *ServerCommon) SendWait(c *ClientConn, key string, value MsgVal, timeout time.Duration) (Message, error) {
return s.sendWait(c, TransferMsg{
Key: key,
Value: value,
Type: MSG_SYNC_ASK,
}, timeout)
}
func (s *ServerCommon) SendWaitObj(c *ClientConn, key string, value interface{}, timeout time.Duration) (Message, error) {
data, err := s.sequenceEn(value)
if err != nil {
return Message{}, err
}
return s.SendWait(c, key, data, timeout)
}
func (s *ServerCommon) SendObjCtx(ctx context.Context, c *ClientConn, key string, val interface{}) (Message, error) {
data, err := s.sequenceEn(val)
if err != nil {
return Message{}, err
}
return s.sendCtx(c, TransferMsg{
Key: key,
Value: data,
Type: MSG_SYNC_ASK,
}, ctx)
}
func (s *ServerCommon) SendObj(c *ClientConn, key string, val interface{}) error {
data, err := encode(val)
if err != nil {
return err
}
_, err = s.send(c, TransferMsg{
Key: key,
Value: data,
Type: MSG_ASYNC,
})
return err
}
func (s *ServerCommon) Reply(m Message, value MsgVal) error {
return m.Reply(value)
}
//for udp below
func (s *ServerCommon) ListenUDP(network string, addr string) error {
udpAddr, err := net.ResolveUDPAddr(network, addr)
if err != nil {
return err
}
listener, err := net.ListenUDP(network, udpAddr)
if err != nil {
return err
} }
s.alive.Store(true) return string(key)
s.status.Alive = true
s.udpListener = listener
go s.accept()
go s.monitorPool()
go s.loadMessage()
return nil
} }
func (s *ServerCommon) acceptUDP() { func (star *StarNotifyS) notify() {
for { for {
select { select {
case <-s.stopCtx.Done(): case <-star.stopSign.Done():
if s.debugMode {
fmt.Println("accept goroutine recv exit signal,exit")
}
return return
default: default:
} }
if s.maxReadTimeout.Seconds() > 0 { data, err := star.Queue.RestoreOne()
s.udpListener.SetReadDeadline(time.Now().Add(s.maxReadTimeout)) if err != nil {
} time.Sleep(time.Millisecond * 20)
data := make([]byte, 4096) continue
num, addr, err := s.udpListener.ReadFromUDP(data)
id := addr.String()
if s.debugMode {
fmt.Println("accept new udp message from", id)
} }
//fmt.Println("s recv udp:", float64(time.Now().UnixNano()-nowd)/1000000) mode, key, value := star.analyseData(string(data.Msg))
s.mu.RLock() var rmsg SMsg
if _, ok := s.clientPool[id]; !ok { if !star.isUDP {
s.mu.RUnlock() rmsg = SMsg{data.Conn.(net.Conn), key, value, nil, nil, mode, nil}
client := ClientConn{
ClientID: id,
ClientAddr: addr,
server: s,
maxReadTimeout: s.maxReadTimeout,
maxWriteTimeout: s.maxWriteTimeout,
SecretKey: s.SecretKey,
handshakeRsaKey: s.handshakeRsaKey,
msgEn: s.defaultMsgEn,
msgDe: s.defaultMsgDe,
lastHeartBeat: time.Now().Unix(),
}
client.stopCtx, client.stopFn = context.WithCancel(context.Background())
s.mu.Lock()
s.clientPool[id] = &client
s.mu.Unlock()
} else { } else {
s.mu.RUnlock() rmsg = SMsg{nil, key, value, data.Conn.(*net.UDPAddr), star.UDPConn, mode, nil}
} if key == "b612ryzstop" {
if err == os.ErrDeadlineExceeded { delete(star.udpPool, rmsg.UDP.String())
if num != 0 { continue
s.pushMessage(data[:num], id)
} }
continue
} }
if err != nil { if mode[0:2] != "sr" {
continue go func() {
if msg, ok := star.FuncLists[key]; ok {
sdata := msg(rmsg)
if sdata == "" {
return
}
rmsg.Reply(sdata)
} else {
if star.defaultFunc != nil {
sdata := star.defaultFunc(rmsg)
if sdata == "" {
return
}
rmsg.Reply(sdata)
}
}
}()
} else {
if sa, ok := star.lockPool[mode]; ok {
rmsg.wait = sa.wait
star.lockPool[mode] = rmsg
star.lockPool[mode].wait <- 1
} else {
go func() {
if msg, ok := star.FuncLists[key]; ok {
sdata := msg(rmsg)
if sdata == "" {
return
}
rmsg.Reply(sdata)
} else {
if star.defaultFunc != nil {
sdata := star.defaultFunc(rmsg)
if sdata == "" {
return
}
rmsg.Reply(sdata)
}
}
}()
}
} }
s.pushMessage(data[:num], id)
}
}
func (s *ServerCommon) sendUDP(c *ClientConn, msg TransferMsg) (WaitMsg, error) {
var wait WaitMsg
if msg.Type != MSG_SYNC_REPLY && msg.Type != MSG_KEY_CHANGE && msg.Type != MSG_SYS_REPLY || msg.ID == 0 {
msg.ID = uint64(time.Now().UnixNano()) + rand.Uint64() + rand.Uint64()
}
data, err := s.sequenceEn(msg)
if err != nil {
return WaitMsg{}, err
}
data = c.msgEn(c.SecretKey, data)
data = s.queue.BuildMessage(data)
if c.maxWriteTimeout.Seconds() != 0 {
s.udpListener.SetWriteDeadline(time.Now().Add(c.maxWriteTimeout))
}
_, err = s.udpListener.WriteTo(data, c.ClientAddr)
if err == nil && (msg.Type == MSG_SYNC_ASK || msg.Type == MSG_SYS_WAIT) {
wait.Time = time.Now()
wait.TransferMsg = msg
wait.Reply = make(chan Message, 1)
s.noFinSyncMsgPool.Store(msg.ID, wait)
} }
return wait, err
}
func (s *ServerCommon) StopMonitorChan() <-chan struct{} {
return s.stopCtx.Done()
}
func (s *ServerCommon) Status() Status {
return s.status
}
func (s *ServerCommon) GetSecretKey() []byte {
return s.SecretKey
}
func (s *ServerCommon) SetSecretKey(key []byte) {
s.SecretKey = key
}
func (s *ServerCommon) RsaPrivKey() []byte {
return s.handshakeRsaKey
}
func (s *ServerCommon) SetRsaPrivKey(key []byte) {
s.handshakeRsaKey = key
}
func (s *ServerCommon) GetClient(id string) *ClientConn {
s.mu.RLock()
defer s.mu.RUnlock()
c, ok := s.clientPool[id]
if !ok {
return nil
}
return c
}
func (s *ServerCommon) GetClientLists() []*ClientConn {
s.mu.RLock()
defer s.mu.RUnlock()
var list = make([]*ClientConn, 0, len(s.clientPool))
for _, v := range s.clientPool {
list = append(list, v)
}
return list
}
func (s *ServerCommon) GetClientAddrs() []net.Addr {
s.mu.RLock()
defer s.mu.RUnlock()
var list = make([]net.Addr, 0, len(s.clientPool))
for _, v := range s.clientPool {
list = append(list, v.ClientAddr)
}
return list
}
func (s *ServerCommon) GetSequenceEn() func(interface{}) ([]byte, error) {
return s.sequenceEn
}
func (s *ServerCommon) SetSequenceEn(fn func(interface{}) ([]byte, error)) {
s.sequenceEn = fn
}
func (s *ServerCommon) GetSequenceDe() func([]byte) (interface{}, error) {
return s.sequenceDe
}
func (s *ServerCommon) SetSequenceDe(fn func([]byte) (interface{}, error)) {
s.sequenceDe = fn
} }
func (s *ServerCommon) HeartbeatTimeoutSec() int64 { func (star *StarNotifyS) analyseData(msg string) (mode, key, value string) {
return s.maxHeartbeatLostSeconds slice := strings.SplitN(msg, "||", 3)
return slice[0], star.trim(slice[1]), slice[2]
} }
func (s *ServerCommon) SetHeartbeatTimeoutSec(sec int64) { // ServerStop 用于终止Server端运行
s.maxHeartbeatLostSeconds = sec func (star *StarNotifyS) ServerStop() {
star.cancel()
star.Stop <- 1
star.Stop <- 1
star.Stop <- 1
} }

@ -1,49 +0,0 @@
package notify
import (
"context"
"net"
"time"
)
type Server interface {
SetDefaultCommEncode(func([]byte, []byte) []byte)
SetDefaultCommDecode(func([]byte, []byte) []byte)
SetDefaultLink(func(message *Message))
SetLink(string, func(*Message))
send(c *ClientConn, msg TransferMsg) (WaitMsg, error)
sendWait(c *ClientConn, msg TransferMsg, timeout time.Duration) (Message, error)
SendObjCtx(ctx context.Context, c *ClientConn, key string, val interface{}) (Message, error)
SendObj(c *ClientConn, key string, val interface{}) error
Send(c *ClientConn, key string, value MsgVal) error
SendWait(c *ClientConn, key string, value MsgVal, timeout time.Duration) (Message, error)
SendWaitObj(c *ClientConn, key string, value interface{}, timeout time.Duration) (Message, error)
SendCtx(ctx context.Context, c *ClientConn, key string, value MsgVal) (Message, error)
Reply(m Message, value MsgVal) error
pushMessage([]byte, string)
removeClient(client *ClientConn)
Listen(network string, addr string) error
Stop() error
StopMonitorChan() <-chan struct{}
Status() Status
GetSecretKey() []byte
SetSecretKey(key []byte)
RsaPrivKey() []byte
SetRsaPrivKey([]byte)
GetClient(id string) *ClientConn
GetClientLists() []*ClientConn
GetClientAddrs() []net.Addr
GetSequenceEn() func(interface{}) ([]byte, error)
SetSequenceEn(func(interface{}) ([]byte, error))
GetSequenceDe() func([]byte) (interface{}, error)
SetSequenceDe(func([]byte) (interface{}, error))
ShowError(bool)
DebugMode(bool)
IsDebugMode() bool
HeartbeatTimeoutSec() int64
SetHeartbeatTimeoutSec(int64)
}

@ -1,109 +0,0 @@
package starnotify
import (
"b612.me/notify"
"errors"
"sync"
)
var (
cmu sync.RWMutex
smu sync.RWMutex
starClient map[string]notify.Client
starServer map[string]notify.Server
)
func init() {
starClient = make(map[string]notify.Client)
starServer = make(map[string]notify.Server)
}
func NewClient(key string) notify.Client {
client := notify.NewClient()
cmu.Lock()
starClient[key] = client
cmu.Unlock()
return client
}
func DeleteClient(key string) (err error) {
cmu.RLock()
client, ok := starClient[key]
cmu.RUnlock()
if !ok {
return errors.New("Not Exists Yet!")
}
if client.Status().Alive {
err = client.Stop()
}
client = nil
cmu.Lock()
delete(starClient, key)
cmu.Unlock()
return err
}
func NewServer(key string) notify.Server {
server := notify.NewServer()
smu.Lock()
starServer[key] = server
smu.Unlock()
return server
}
func DeleteServer(key string) error {
smu.RLock()
server, ok := starServer[key]
smu.RUnlock()
if !ok {
return errors.New("Not Exists Yet!")
}
if server.Status().Alive {
server.Stop()
}
server = nil
smu.Lock()
delete(starServer, key)
smu.Unlock()
return nil
}
func S(key string) notify.Server {
smu.RLock()
server, ok := starServer[key]
smu.RUnlock()
if !ok {
return nil
}
return server
}
func C(key string) notify.Client {
cmu.RLock()
client, ok := starClient[key]
cmu.RUnlock()
if !ok {
return nil
}
return client
}
func Server(key string) (notify.Server, error) {
smu.RLock()
server, ok := starServer[key]
smu.RUnlock()
if !ok {
return nil, errors.New("Not Exists Yet")
}
return server, nil
}
func Client(key string) (notify.Client, error) {
cmu.RLock()
client, ok := starClient[key]
cmu.RUnlock()
if !ok {
return nil, errors.New("Not Exists Yet")
}
return client, nil
}

@ -1,152 +0,0 @@
package notify
import (
"fmt"
"net"
//_ "net/http/pprof"
"sync/atomic"
"testing"
"time"
)
func Test_ServerTuAndClientCommon(t *testing.T) {
//go http.ListenAndServe("0.0.0.0:8888", nil)
noEn := func(key, bn []byte) []byte {
return bn
}
_ = noEn
server := NewServer()
//server.SetDefaultCommDecode(noEn)
//server.SetDefaultCommEncode(noEn)
err := server.Listen("tcp", "127.0.0.1:12345")
if err != nil {
panic(err)
}
server.SetLink("notify", notify)
for i := 1; i <= 100; i++ {
go func() {
client := NewClient()
//client.SetMsgEn(noEn)
//client.SetMsgDe(noEn)
//client.SetSkipExchangeKey(true)
err = client.Connect("tcp", "127.0.0.1:12345")
if err != nil {
t.Fatal(err)
time.Sleep(time.Second * 2)
return
}
//client.SetLink("notify", notify)
for {
//nowd = time.Now().UnixNano()
client.SendWait("notify", []byte("client hello"), time.Second*15)
//client.Send("notify", []byte("client hello"))
//time.Sleep(time.Millisecond)
//fmt.Println("finished:", float64(time.Now().UnixNano()-nowd)/1000000)
//client.Send("notify", []byte("client"))
}
}()
}
time.Sleep(time.Second)
go func() {
time.Sleep(time.Second * 10)
server.Stop()
}()
<-server.StopMonitorChan()
fmt.Println(count2)
}
var count2 int64
func notify(msg *Message) {
//fmt.Println(string(msg.Msg.Value))
//fmt.Println("called:", float64(time.Now().UnixNano()-nowd)/1000000)
if msg.NetType == NET_SERVER {
atomic.AddInt64(&count2, 1)
msg.Reply([]byte("server reply"))
}
}
func Test_normal(t *testing.T) {
server, err := net.Listen("tcp", "127.0.0.1:12345")
if err != nil {
t.Fatal(err)
}
go func() {
for {
conn, err := server.Accept()
if err == nil {
go func() {
for {
buf := make([]byte, 1024)
_, err := conn.Read(buf)
//fmt.Println("S RECV", string(buf[:i]))
atomic.AddInt64(&count2, 1)
if err == nil {
conn.Write([]byte("hello world server"))
}
}
}()
}
}
}()
time.Sleep(time.Second * 5)
for i := 1; i <= 100; i++ {
go func() {
conn, err := net.Dial("tcp", "127.0.0.1:12345")
if err != nil {
panic(err)
}
for {
//nowd = time.Now().UnixNano()
_, err := conn.Write([]byte("hello world client"))
if err != nil {
fmt.Println(err)
}
buf := make([]byte, 1024)
conn.Read(buf)
continue
}
}()
}
time.Sleep(time.Second * 10)
fmt.Println(count2)
}
func Test_normal_udp(t *testing.T) {
ludp, _ := net.ResolveUDPAddr("udp", "127.0.0.1:12345")
conn, _ := net.ListenUDP("udp", ludp)
go func() {
for {
buf := make([]byte, 1024)
_, addr, err := conn.ReadFromUDP(buf)
fmt.Println(time.Now(), "S RECV", addr.String())
atomic.AddInt64(&count2, 1)
if err == nil {
conn.WriteToUDP([]byte("hello world server"), addr)
}
}
}()
for i := 1; i <= 100; i++ {
go func() {
conn, err := net.Dial("udp", "127.0.0.1:12345")
if err != nil {
panic(err)
}
for {
//nowd = time.Now().UnixNano()
_, err := conn.Write([]byte("hello world client"))
if err != nil {
fmt.Println(err)
}
buf := make([]byte, 1024)
conn.Read(buf)
fmt.Println(time.Now(), "C RECV")
continue
}
}()
}
time.Sleep(time.Second * 10)
fmt.Println(count2)
}
Loading…
Cancel
Save