package notify import ( "b612.me/starcrypto" "context" "errors" "fmt" "net" "os" "sync" "sync/atomic" "time" ) const ( MSG_SYS MessageType = iota MSG_SYS_WAIT MSG_SYS_REPLY MSG_KEY_CHANGE MSG_ASYNC MSG_SYNC_ASK MSG_SYNC_REPLY ) type MessageType uint8 type NetType uint8 const ( NET_SERVER NetType = iota NET_CLIENT ) type MsgVal []byte type TransferMsg struct { ID uint64 Key string Value MsgVal Type MessageType } type Message struct { NetType ClientConn *ClientConn ServerConn Client TransferMsg Time time.Time sync.Mutex } type WaitMsg struct { TransferMsg Time time.Time Reply chan Message //Ctx context.Context } func (m *Message) Reply(value MsgVal) (err error) { reply := TransferMsg{ ID: m.ID, Key: m.Key, Value: value, Type: m.Type, } if reply.Type == MSG_SYNC_ASK { reply.Type = MSG_SYNC_REPLY } if reply.Type == MSG_SYS_WAIT { reply.Type = MSG_SYS_REPLY } if m.NetType == NET_SERVER { _, err = m.ClientConn.server.send(m.ClientConn, reply) } if m.NetType == NET_CLIENT { _, err = m.ServerConn.send(reply) } return } func (m *Message) ReplyObj(value interface{}) (err error) { data, err := encode(value) if err != nil { return err } return m.Reply(data) } type ClientConn struct { alive atomic.Value status Status ClientID string ClientAddr net.Addr tuConn net.Conn server Server stopFn context.CancelFunc stopCtx context.Context maxReadTimeout time.Duration maxWriteTimeout time.Duration msgEn func([]byte, []byte) []byte msgDe func([]byte, []byte) []byte handshakeRsaKey []byte SecretKey []byte lastHeartBeat int64 } type Status struct { Alive bool Reason string Err error } func (c *ClientConn) readTUMessage() { for { select { case <-c.stopCtx.Done(): c.tuConn.Close() c.server.removeClient(c) return default: } if c.maxReadTimeout.Seconds() > 0 { c.tuConn.SetReadDeadline(time.Now().Add(c.maxReadTimeout)) } data := make([]byte, 8192) num, err := c.tuConn.Read(data) if err == os.ErrDeadlineExceeded { if num != 0 { c.server.pushMessage(data[:num], c.ClientID) } continue } if err != nil { //conn is broke c.alive.Store(false) c.status = Status{ Alive: false, Reason: "read error", Err: err, } c.stopFn() continue } c.server.pushMessage(data[:num], c.ClientID) //fmt.Println("finished:", float64(time.Now().UnixNano()-nowd)/1000000) } } func (c *ClientConn) rsaDecode(message Message) { unknownKey := message.Value data, err := starcrypto.RSADecrypt(unknownKey, c.handshakeRsaKey, "") if err != nil { fmt.Println(err) message.Reply([]byte("failed")) return } //fmt.Println("aes-key changed to", string(data)) message.Reply([]byte("success")) c.SecretKey = data } func (c *ClientConn) sayGoodByeForTU() error { _, err := c.server.sendWait(c, TransferMsg{ ID: 10010, Key: "bye", Value: nil, Type: MSG_SYS_WAIT, }, time.Second*3) return err } func (c *ClientConn) GetSecretKey() []byte { return c.SecretKey } func (c *ClientConn) SetSecretKey(key []byte) { c.SecretKey = key } func (c *ClientConn) GetMsgEn() func([]byte, []byte) []byte { return c.msgEn } func (c *ClientConn) SetMsgEn(fn func([]byte, []byte) []byte) { c.msgEn = fn } func (c *ClientConn) GetMsgDe() func([]byte, []byte) []byte { return c.msgDe } func (c *ClientConn) SetMsgDe(fn func([]byte, []byte) []byte) { c.msgDe = fn } func (c *ClientConn) StopMonitorChan() <-chan struct{} { return c.stopCtx.Done() } func (c *ClientConn) Status() Status { return c.status } func (c *ClientConn) Server() Server { return c.server } func (c *ClientConn) GetRemoteAddr() net.Addr { return c.ClientAddr } func (m MsgVal) ToClearString() string { return string(m) } func (m MsgVal) ToInterface() (interface{}, error) { return Decode(m) } func (m MsgVal) MustToInterface() interface{} { inf, err := m.ToInterface() if err != nil { panic(err) } return inf } func (m MsgVal) ToString() (string, error) { inf, err := m.ToInterface() if err != nil { return "", err } if data, ok := inf.(string); !ok { return "", errors.New("source data not match target type") } else { return data, nil } } func (m MsgVal) MustToString() string { inf, err := m.ToString() if err != nil { panic(err) } return inf } func (m MsgVal) ToInt32() (int32, error) { inf, err := m.ToInterface() if err != nil { return 0, err } if data, ok := inf.(int32); !ok { return 0, errors.New("source data not match target type") } else { return data, nil } } func (m MsgVal) MustToInt32() int32 { inf, err := m.ToInt32() if err != nil { panic(err) } return inf } func (m MsgVal) ToInt() (int, error) { inf, err := m.ToInterface() if err != nil { return 0, err } if data, ok := inf.(int); !ok { return 0, errors.New("source data not match target type") } else { return data, nil } } func (m MsgVal) MustToInt() int { inf, err := m.ToInt() if err != nil { panic(err) } return inf } func (m MsgVal) ToUint64() (uint64, error) { inf, err := m.ToInterface() if err != nil { return 0, err } if data, ok := inf.(uint64); !ok { return 0, errors.New("source data not match target type") } else { return data, nil } } func (m MsgVal) MustToUint64() uint64 { inf, err := m.ToUint64() if err != nil { panic(err) } return inf } func (m MsgVal) ToUint32() (uint32, error) { inf, err := m.ToInterface() if err != nil { return 0, err } if data, ok := inf.(uint32); !ok { return 0, errors.New("source data not match target type") } else { return data, nil } } func (m MsgVal) MustToUint32() uint32 { inf, err := m.ToUint32() if err != nil { panic(err) } return inf } func (m MsgVal) ToUint() (uint, error) { inf, err := m.ToInterface() if err != nil { return 0, err } if data, ok := inf.(uint); !ok { return 0, errors.New("source data not match target type") } else { return data, nil } } func (m MsgVal) MustToUint() uint { inf, err := m.ToUint() if err != nil { panic(err) } return inf } func (m MsgVal) ToBool() (bool, error) { inf, err := m.ToInterface() if err != nil { return false, err } if data, ok := inf.(bool); !ok { return false, errors.New("source data not match target type") } else { return data, nil } } func (m MsgVal) MustToBool() bool { inf, err := m.ToBool() if err != nil { panic(err) } return inf } func (m MsgVal) ToFloat64() (float64, error) { inf, err := m.ToInterface() if err != nil { return 0, err } if data, ok := inf.(float64); !ok { return 0, errors.New("source data not match target type") } else { return data, nil } } func (m MsgVal) MustToFloat64() float64 { inf, err := m.ToFloat64() if err != nil { panic(err) } return inf } func (m MsgVal) ToFloat32() (float32, error) { inf, err := m.ToInterface() if err != nil { return 0, err } if data, ok := inf.(float32); !ok { return 0, errors.New("source data not match target type") } else { return data, nil } } func (m MsgVal) MustToFloat32() float32 { inf, err := m.ToFloat32() if err != nil { panic(err) } return inf } func (m MsgVal) ToSliceString() ([]string, error) { inf, err := m.ToInterface() if err != nil { return []string{}, err } if data, ok := inf.([]string); !ok { return []string{}, errors.New("source data not match target type") } else { return data, nil } } func (m MsgVal) MustToSliceString() []string { inf, err := m.ToSliceString() if err != nil { panic(err) } return inf } func (m MsgVal) ToSliceInt64() ([]int64, error) { inf, err := m.ToInterface() if err != nil { return []int64{}, err } if data, ok := inf.([]int64); !ok { return []int64{}, errors.New("source data not match target type") } else { return data, nil } } func (m MsgVal) MustToSliceInt64() []int64 { inf, err := m.ToSliceInt64() if err != nil { panic(err) } return inf } func (m MsgVal) ToSliceFloat64() ([]float64, error) { inf, err := m.ToInterface() if err != nil { return []float64{}, err } if data, ok := inf.([]float64); !ok { return []float64{}, errors.New("source data not match target type") } else { return data, nil } } func (m MsgVal) MustToSliceFloat64() []float64 { inf, err := m.ToSliceFloat64() if err != nil { panic(err) } return inf } func ToMsgVal(val interface{}) (MsgVal, error) { return Encode(val) } func MustToMsgVal(val interface{}) MsgVal { d, err := ToMsgVal(val) if err != nil { panic(err) } return d }