package starainrt import ( "bytes" "context" "encoding/binary" "errors" "sync" "time" ) //SecretKey 通信加密Key,不应当被修改 const SecretKey string = "1996victorique1127B612BTXL" // 识别头 var header = []byte{11, 27, 19, 96, 12, 25, 02, 20} // MsgQueue 为基本的信息单位 type MsgQueue struct { ID uint16 Msg []byte Conn interface{} } // StarQueue 为流数据中的消息队列分发 type StarQueue struct { Encode bool Reserve uint16 Msgid uint16 MsgPool []MsgQueue UnFinMsg sync.Map LastID int //= -1 ctx context.Context cancel context.CancelFunc duration time.Duration } // NewQueue 建立一个新消息队列 func NewQueue() *StarQueue { var que StarQueue que.Encode = true que.ctx, que.cancel = context.WithCancel(context.Background()) que.duration = 0 return &que } // Uint32ToByte 4位uint32转byte func Uint32ToByte(src uint32) []byte { res := make([]byte, 4) res[3] = uint8(src) res[2] = uint8(src >> 8) res[1] = uint8(src >> 16) res[0] = uint8(src >> 24) return res } // ByteToUint32 byte转4位uint32 func ByteToUint32(src []byte) uint32 { var res uint32 buffer := bytes.NewBuffer(src) binary.Read(buffer, binary.BigEndian, &res) return res } // Uint16ToByte 2位uint16转byte func Uint16ToByte(src uint16) []byte { res := make([]byte, 2) res[1] = uint8(src) res[0] = uint8(src >> 8) return res } // ByteToUint16 用于byte转uint16 func ByteToUint16(src []byte) uint16 { var res uint16 buffer := bytes.NewBuffer(src) binary.Read(buffer, binary.BigEndian, &res) return res } // BuildMessage 生成编码后的信息用于发送 func (que *StarQueue) BuildMessage(src []byte) []byte { var buff bytes.Buffer que.Msgid++ if que.Encode { cryptos := new(StarCrypto) src = cryptos.VicqueEncodeV1(src, SecretKey) } length := uint32(len(src)) buff.Write(header) buff.Write(Uint32ToByte(length)) buff.Write(Uint16ToByte(que.Msgid)) buff.Write(src) return buff.Bytes() } type unFinMsg struct { ID uint16 LengthRecv uint32 // HeaderMsg 信息头,应当为14位:8位识别码+4位长度码+2位id HeaderMsg []byte RecvMsg []byte } // ParseMessage 用于解析啊收到的msg信息 func (que *StarQueue) ParseMessage(msg []byte, conn interface{}) error { tmp, ok := que.UnFinMsg.Load(conn) cryptos := new(StarCrypto) if ok { //存在未完成的信息 lastMsg := tmp.(*unFinMsg) headerLen := len(lastMsg.HeaderMsg) if headerLen < 14 { //未完成头标题 //传输的数据不能填充header头 if len(msg) < 14-headerLen { //加入header头并退出 lastMsg.HeaderMsg = bytesMerge(lastMsg.HeaderMsg, msg) que.UnFinMsg.Store(conn, lastMsg) return nil } //获取14字节完整的header header := msg[0 : 14-headerLen] lastMsg.HeaderMsg = bytesMerge(lastMsg.HeaderMsg, header) //检查收到的header是否为认证header //若不是,丢弃并重新来过 if !checkHeader(lastMsg.HeaderMsg[0:8]) { que.UnFinMsg.Delete(conn) if len(msg) == 0 { return nil } return que.ParseMessage(msg, conn) } //获得本数据包长度 lastMsg.LengthRecv = ByteToUint32(lastMsg.HeaderMsg[8:12]) //获得本数据包ID lastMsg.ID = ByteToUint16(lastMsg.HeaderMsg[12:14]) //存入列表 que.UnFinMsg.Store(conn, lastMsg) msg = msg[14-headerLen:] if uint32(len(msg)) < lastMsg.LengthRecv { lastMsg.RecvMsg = msg que.UnFinMsg.Store(conn, lastMsg) return nil } if uint32(len(msg)) >= lastMsg.LengthRecv { lastMsg.RecvMsg = msg[0:lastMsg.LengthRecv] if que.Encode { lastMsg.RecvMsg = cryptos.VicqueDecodeV1(lastMsg.RecvMsg, SecretKey) } msg = msg[lastMsg.LengthRecv:] stroeMsg := MsgQueue{ ID: lastMsg.ID, Msg: lastMsg.RecvMsg, Conn: conn, } que.MsgPool = append(que.MsgPool, stroeMsg) que.UnFinMsg.Delete(conn) return que.ParseMessage(msg, conn) } } else { lastID := int(lastMsg.LengthRecv) - len(lastMsg.RecvMsg) if lastID < 0 { que.UnFinMsg.Delete(conn) return que.ParseMessage(msg, conn) } if len(msg) >= lastID { lastMsg.RecvMsg = bytesMerge(lastMsg.RecvMsg, msg[0:lastID]) if que.Encode { lastMsg.RecvMsg = cryptos.VicqueDecodeV1(lastMsg.RecvMsg, SecretKey) } stroeMsg := MsgQueue{ ID: lastMsg.ID, Msg: lastMsg.RecvMsg, Conn: conn, } que.MsgPool = append(que.MsgPool, stroeMsg) que.UnFinMsg.Delete(conn) if len(msg) == lastID { return nil } msg = msg[lastID:] return que.ParseMessage(msg, conn) } lastMsg.RecvMsg = bytesMerge(lastMsg.RecvMsg, msg) que.UnFinMsg.Store(conn, lastMsg) return nil } } if len(msg) == 0 { return nil } var start int if start = searchHeader(msg); start == -1 { return errors.New("data format error") } msg = msg[start:] lastMsg := unFinMsg{} que.UnFinMsg.Store(conn, &lastMsg) return que.ParseMessage(msg, conn) } func checkHeader(msg []byte) bool { if len(msg) != 8 { return false } for k, v := range msg { if v != header[k] { return false } } return true } func searchHeader(msg []byte) int { if len(msg) < 8 { return 0 } for k, v := range msg { find := 0 if v == header[0] { for k2, v2 := range header { if msg[k+k2] == v2 { find++ } else { break } } if find == 8 { return k } } } return -1 } func bytesMerge(src ...[]byte) []byte { var buff bytes.Buffer for _, v := range src { buff.Write(v) } return buff.Bytes() } // Restore 获取收到的信息 func (que *StarQueue) Restore(n int) ([]MsgQueue, error) { var res []MsgQueue dura := time.Duration(0) for len(que.MsgPool) < n { select { case <-que.ctx.Done(): return res, errors.New("Stoped By External Function Call") default: time.Sleep(time.Millisecond * 20) dura = time.Millisecond*20 + dura if que.duration != 0 && dura > que.duration { return res, errors.New("Time Exceed") } } } if len(que.MsgPool) < n { return res, errors.New("Result Not Enough") } res = que.MsgPool[0:n] que.MsgPool = que.MsgPool[n:] return res, nil } // RestoreOne 获取收到的一个信息 func (que *StarQueue) RestoreOne() (MsgQueue, error) { data, err := que.Restore(1) if len(data) == 1 { return data[0], err } return MsgQueue{}, err } // Stop 立即停止Restore func (que *StarQueue) Stop() { que.cancel() } // RestoreDuration Restore最大超时时间 func (que *StarQueue) RestoreDuration(tm time.Duration) { que.duration = tm }