You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
notify/msg.go

503 lines
9.3 KiB
Go

package notify
import (
"b612.me/starcrypto"
"context"
"errors"
"fmt"
"net"
"os"
"reflect"
"sync/atomic"
"time"
)
const (
MSG_SYS MessageType = iota
MSG_SYS_WAIT
MSG_SYS_REPLY
MSG_KEY_CHANGE
MSG_ASYNC
MSG_SYNC_ASK
MSG_SYNC_REPLY
)
type MessageType uint8
type NetType uint8
const (
NET_SERVER NetType = iota
NET_CLIENT
)
type MsgVal []byte
type TransferMsg struct {
ID uint64
Key string
Value MsgVal
Type MessageType
}
type Message struct {
NetType
ClientConn *ClientConn
ServerConn Client
TransferMsg
Time time.Time
}
type WaitMsg struct {
TransferMsg
Time time.Time
Reply chan Message
//Ctx context.Context
}
func (m *Message) Reply(value MsgVal) (err error) {
reply := TransferMsg{
ID: m.ID,
Key: m.Key,
Value: value,
Type: m.Type,
}
if reply.Type == MSG_SYNC_ASK {
reply.Type = MSG_SYNC_REPLY
}
if reply.Type == MSG_SYS_WAIT {
reply.Type = MSG_SYS_REPLY
}
if m.NetType == NET_SERVER {
_, err = m.ClientConn.server.send(m.ClientConn, reply)
}
if m.NetType == NET_CLIENT {
_, err = m.ServerConn.send(reply)
}
return
}
func (m *Message) ReplyObj(value interface{}) (err error) {
data, err := encode(value)
if err != nil {
return err
}
return m.Reply(data)
}
type ClientConn struct {
alive atomic.Value
status Status
ClientID string
ClientAddr net.Addr
tuConn net.Conn
server Server
stopFn context.CancelFunc
stopCtx context.Context
maxReadTimeout time.Duration
maxWriteTimeout time.Duration
msgEn func([]byte, []byte) []byte
msgDe func([]byte, []byte) []byte
handshakeRsaKey []byte
SecretKey []byte
lastHeartBeat int64
}
type Status struct {
Alive bool
Reason string
Err error
}
func (c *ClientConn) readTUMessage() {
for {
select {
case <-c.stopCtx.Done():
c.tuConn.Close()
c.server.removeClient(c)
return
default:
}
if c.maxReadTimeout.Seconds() > 0 {
c.tuConn.SetReadDeadline(time.Now().Add(c.maxReadTimeout))
}
data := make([]byte, 8192)
num, err := c.tuConn.Read(data)
if err == os.ErrDeadlineExceeded {
if num != 0 {
c.server.pushMessage(data[:num], c.ClientID)
}
continue
}
if err != nil {
//conn is broke
c.alive.Store(false)
c.status = Status{
Alive: false,
Reason: "read error",
Err: err,
}
c.stopFn()
continue
}
c.server.pushMessage(data[:num], c.ClientID)
//fmt.Println("finished:", float64(time.Now().UnixNano()-nowd)/1000000)
}
}
func (c *ClientConn) rsaDecode(message Message) {
privKey, err := starcrypto.DecodeRsaPrivateKey(c.handshakeRsaKey, "")
if err != nil {
fmt.Println(err)
message.Reply([]byte("failed"))
return
}
data, err := starcrypto.RSADecrypt(privKey, message.Value)
if err != nil {
fmt.Println(err)
message.Reply([]byte("failed"))
return
}
//fmt.Println("aes-key changed to", string(data))
message.Reply([]byte("success"))
c.SecretKey = data
}
func (c *ClientConn) sayGoodByeForTU() error {
_, err := c.server.sendWait(c, TransferMsg{
ID: 10010,
Key: "bye",
Value: nil,
Type: MSG_SYS_WAIT,
}, time.Second*3)
return err
}
func (c *ClientConn) GetSecretKey() []byte {
return c.SecretKey
}
func (c *ClientConn) SetSecretKey(key []byte) {
c.SecretKey = key
}
func (c *ClientConn) GetMsgEn() func([]byte, []byte) []byte {
return c.msgEn
}
func (c *ClientConn) SetMsgEn(fn func([]byte, []byte) []byte) {
c.msgEn = fn
}
func (c *ClientConn) GetMsgDe() func([]byte, []byte) []byte {
return c.msgDe
}
func (c *ClientConn) SetMsgDe(fn func([]byte, []byte) []byte) {
c.msgDe = fn
}
func (c *ClientConn) StopMonitorChan() <-chan struct{} {
return c.stopCtx.Done()
}
func (c *ClientConn) Status() Status {
return c.status
}
func (c *ClientConn) Server() Server {
return c.server
}
func (c *ClientConn) GetRemoteAddr() net.Addr {
return c.ClientAddr
}
func (m MsgVal) ToClearString() string {
return string(m)
}
func (m MsgVal) ToInterface() (interface{}, error) {
return Decode(m)
}
func (m MsgVal) MustToInterface() interface{} {
inf, err := m.ToInterface()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToString() (string, error) {
inf, err := m.ToInterface()
if err != nil {
return "", err
}
if data, ok := inf.(string); !ok {
return "", errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToString() string {
inf, err := m.ToString()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToInt32() (int32, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(int32); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToInt32() int32 {
inf, err := m.ToInt32()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToInt() (int, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(int); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToInt() int {
inf, err := m.ToInt()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToUint64() (uint64, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(uint64); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToUint64() uint64 {
inf, err := m.ToUint64()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToUint32() (uint32, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(uint32); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToUint32() uint32 {
inf, err := m.ToUint32()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToUint() (uint, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(uint); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToUint() uint {
inf, err := m.ToUint()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToBool() (bool, error) {
inf, err := m.ToInterface()
if err != nil {
return false, err
}
if data, ok := inf.(bool); !ok {
return false, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToBool() bool {
inf, err := m.ToBool()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToFloat64() (float64, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(float64); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToFloat64() float64 {
inf, err := m.ToFloat64()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToFloat32() (float32, error) {
inf, err := m.ToInterface()
if err != nil {
return 0, err
}
if data, ok := inf.(float32); !ok {
return 0, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToFloat32() float32 {
inf, err := m.ToFloat32()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToSliceString() ([]string, error) {
inf, err := m.ToInterface()
if err != nil {
return []string{}, err
}
if data, ok := inf.([]string); !ok {
return []string{}, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToSliceString() []string {
inf, err := m.ToSliceString()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToSliceInt64() ([]int64, error) {
inf, err := m.ToInterface()
if err != nil {
return []int64{}, err
}
if data, ok := inf.([]int64); !ok {
return []int64{}, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToSliceInt64() []int64 {
inf, err := m.ToSliceInt64()
if err != nil {
panic(err)
}
return inf
}
func (m MsgVal) ToSliceFloat64() ([]float64, error) {
inf, err := m.ToInterface()
if err != nil {
return []float64{}, err
}
if data, ok := inf.([]float64); !ok {
return []float64{}, errors.New("source data not match target type")
} else {
return data, nil
}
}
func (m MsgVal) MustToSliceFloat64() []float64 {
inf, err := m.ToSliceFloat64()
if err != nil {
panic(err)
}
return inf
}
func ToMsgVal(val interface{}) (MsgVal, error) {
return Encode(val)
}
func MustToMsgVal(val interface{}) MsgVal {
d, err := ToMsgVal(val)
if err != nil {
panic(err)
}
return d
}
func (m MsgVal) Orm(stu interface{}) error {
inf, err := m.ToInterface()
if err != nil {
return err
}
t := reflect.TypeOf(stu)
if t.Kind() != reflect.Ptr {
return errors.New("interface not writable(pointer wanted)")
}
if !reflect.ValueOf(stu).Elem().CanSet() {
return errors.New("interface not writable")
}
it := reflect.TypeOf(inf)
if t.Elem().Kind() != it.Kind() {
return fmt.Errorf("interface{} kind is %v,not %v", t.Elem().Kind(), it.Kind())
}
if t.Elem().Name() != it.Name() {
return fmt.Errorf("interface{} name is %v,not %v", t.Elem().Name(), it.Name())
}
if t.Elem().String() != it.String() {
return fmt.Errorf("interface{} string is %v,not %v", t.Elem().String(), it.String())
}
reflect.ValueOf(stu).Elem().Set(reflect.ValueOf(inf))
return nil
}