You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
starnet/que.go

326 lines
7.2 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package starnet
import (
"bytes"
"context"
"encoding/binary"
"errors"
"fmt"
"os"
"sync"
"time"
)
var ErrDeadlineExceeded error = errors.New("deadline exceeded")
// 识别头
var header = []byte{11, 27, 19, 96, 12, 25, 02, 20}
// MsgQueue 为基本的信息单位
type MsgQueue struct {
ID uint16
Msg []byte
Conn interface{}
}
// StarQueue 为流数据中的消息队列分发
type StarQueue struct {
maxLength uint32
count int64
Encode bool
msgID uint16
msgPool chan MsgQueue
unFinMsg sync.Map
lastID int //= -1
ctx context.Context
cancel context.CancelFunc
duration time.Duration
EncodeFunc func([]byte) []byte
DecodeFunc func([]byte) []byte
//restoreMu sync.Mutex
}
func NewQueueCtx(ctx context.Context, count int64, maxMsgLength uint32) *StarQueue {
var q StarQueue
q.Encode = false
q.count = count
q.maxLength = maxMsgLength
q.msgPool = make(chan MsgQueue, count)
if ctx == nil {
q.ctx, q.cancel = context.WithCancel(context.Background())
} else {
q.ctx, q.cancel = context.WithCancel(ctx)
}
q.duration = 0
return &q
}
func NewQueueWithCount(count int64) *StarQueue {
return NewQueueCtx(nil, count, 0)
}
// NewQueue 建立一个新消息队列
func NewQueue() *StarQueue {
return NewQueueWithCount(32)
}
// Uint32ToByte 4位uint32转byte
func Uint32ToByte(src uint32) []byte {
res := make([]byte, 4)
res[3] = uint8(src)
res[2] = uint8(src >> 8)
res[1] = uint8(src >> 16)
res[0] = uint8(src >> 24)
return res
}
// ByteToUint32 byte转4位uint32
func ByteToUint32(src []byte) uint32 {
var res uint32
buffer := bytes.NewBuffer(src)
binary.Read(buffer, binary.BigEndian, &res)
return res
}
// Uint16ToByte 2位uint16转byte
func Uint16ToByte(src uint16) []byte {
res := make([]byte, 2)
res[1] = uint8(src)
res[0] = uint8(src >> 8)
return res
}
// ByteToUint16 用于byte转uint16
func ByteToUint16(src []byte) uint16 {
var res uint16
buffer := bytes.NewBuffer(src)
binary.Read(buffer, binary.BigEndian, &res)
return res
}
// BuildMessage 生成编码后的信息用于发送
func (q *StarQueue) BuildMessage(src []byte) []byte {
var buff bytes.Buffer
q.msgID++
if q.Encode {
src = q.EncodeFunc(src)
}
length := uint32(len(src))
buff.Write(header)
buff.Write(Uint32ToByte(length))
buff.Write(Uint16ToByte(q.msgID))
buff.Write(src)
return buff.Bytes()
}
// BuildHeader 生成编码后的Header用于发送
func (q *StarQueue) BuildHeader(length uint32) []byte {
var buff bytes.Buffer
q.msgID++
buff.Write(header)
buff.Write(Uint32ToByte(length))
buff.Write(Uint16ToByte(q.msgID))
return buff.Bytes()
}
type unFinMsg struct {
ID uint16
LengthRecv uint32
// HeaderMsg 信息头应当为14位8位识别码+4位长度码+2位id
HeaderMsg []byte
RecvMsg []byte
}
func (q *StarQueue) push2list(msg MsgQueue) {
q.msgPool <- msg
}
// ParseMessage 用于解析收到的msg信息
func (q *StarQueue) ParseMessage(msg []byte, conn interface{}) error {
return q.parseMessage(msg, conn)
}
// parseMessage 用于解析收到的msg信息
func (q *StarQueue) parseMessage(msg []byte, conn interface{}) error {
tmp, ok := q.unFinMsg.Load(conn)
if ok { //存在未完成的信息
lastMsg := tmp.(*unFinMsg)
headerLen := len(lastMsg.HeaderMsg)
if headerLen < 14 { //未完成头标题
//传输的数据不能填充header头
if len(msg) < 14-headerLen {
//加入header头并退出
lastMsg.HeaderMsg = bytesMerge(lastMsg.HeaderMsg, msg)
q.unFinMsg.Store(conn, lastMsg)
return nil
}
//获取14字节完整的header
header := msg[0 : 14-headerLen]
lastMsg.HeaderMsg = bytesMerge(lastMsg.HeaderMsg, header)
//检查收到的header是否为认证header
//若不是,丢弃并重新来过
if !checkHeader(lastMsg.HeaderMsg[0:8]) {
q.unFinMsg.Delete(conn)
if len(msg) == 0 {
return nil
}
return q.parseMessage(msg, conn)
}
//获得本数据包长度
lastMsg.LengthRecv = ByteToUint32(lastMsg.HeaderMsg[8:12])
if q.maxLength != 0 && lastMsg.LengthRecv > q.maxLength {
q.unFinMsg.Delete(conn)
return fmt.Errorf("msg length is %d ,too large than %d", lastMsg.LengthRecv, q.maxLength)
}
//获得本数据包ID
lastMsg.ID = ByteToUint16(lastMsg.HeaderMsg[12:14])
//存入列表
q.unFinMsg.Store(conn, lastMsg)
msg = msg[14-headerLen:]
if uint32(len(msg)) < lastMsg.LengthRecv {
lastMsg.RecvMsg = msg
q.unFinMsg.Store(conn, lastMsg)
return nil
}
if uint32(len(msg)) >= lastMsg.LengthRecv {
lastMsg.RecvMsg = msg[0:lastMsg.LengthRecv]
if q.Encode {
lastMsg.RecvMsg = q.DecodeFunc(lastMsg.RecvMsg)
}
msg = msg[lastMsg.LengthRecv:]
storeMsg := MsgQueue{
ID: lastMsg.ID,
Msg: lastMsg.RecvMsg,
Conn: conn,
}
//q.restoreMu.Lock()
q.push2list(storeMsg)
//q.restoreMu.Unlock()
q.unFinMsg.Delete(conn)
return q.parseMessage(msg, conn)
}
} else {
lastID := int(lastMsg.LengthRecv) - len(lastMsg.RecvMsg)
if lastID < 0 {
q.unFinMsg.Delete(conn)
return q.parseMessage(msg, conn)
}
if len(msg) >= lastID {
lastMsg.RecvMsg = bytesMerge(lastMsg.RecvMsg, msg[0:lastID])
if q.Encode {
lastMsg.RecvMsg = q.DecodeFunc(lastMsg.RecvMsg)
}
storeMsg := MsgQueue{
ID: lastMsg.ID,
Msg: lastMsg.RecvMsg,
Conn: conn,
}
q.push2list(storeMsg)
q.unFinMsg.Delete(conn)
if len(msg) == lastID {
return nil
}
msg = msg[lastID:]
return q.parseMessage(msg, conn)
}
lastMsg.RecvMsg = bytesMerge(lastMsg.RecvMsg, msg)
q.unFinMsg.Store(conn, lastMsg)
return nil
}
}
if len(msg) == 0 {
return nil
}
var start int
if start = searchHeader(msg); start == -1 {
return errors.New("data format error")
}
msg = msg[start:]
lastMsg := unFinMsg{}
q.unFinMsg.Store(conn, &lastMsg)
return q.parseMessage(msg, conn)
}
func checkHeader(msg []byte) bool {
if len(msg) != 8 {
return false
}
for k, v := range msg {
if v != header[k] {
return false
}
}
return true
}
func searchHeader(msg []byte) int {
if len(msg) < 8 {
return 0
}
for k, v := range msg {
find := 0
if v == header[0] {
for k2, v2 := range header {
if msg[k+k2] == v2 {
find++
} else {
break
}
}
if find == 8 {
return k
}
}
}
return -1
}
func bytesMerge(src ...[]byte) []byte {
var buff bytes.Buffer
for _, v := range src {
buff.Write(v)
}
return buff.Bytes()
}
// Restore 获取收到的信息
func (q *StarQueue) Restore() (MsgQueue, error) {
if q.duration.Seconds() == 0 {
q.duration = 86400 * time.Second
}
for {
select {
case <-q.ctx.Done():
return MsgQueue{}, errors.New("Stoped By External Function Call")
case <-time.After(q.duration):
if q.duration != 0 {
return MsgQueue{}, ErrDeadlineExceeded
}
case data, ok := <-q.msgPool:
if !ok {
return MsgQueue{}, os.ErrClosed
}
return data, nil
}
}
}
// RestoreOne 获取收到的一个信息
// 兼容性修改
func (q *StarQueue) RestoreOne() (MsgQueue, error) {
return q.Restore()
}
// Stop 立即停止Restore
func (q *StarQueue) Stop() {
q.cancel()
}
// RestoreDuration Restore最大超时时间
func (q *StarQueue) RestoreDuration(tm time.Duration) {
q.duration = tm
}
func (q *StarQueue) RestoreChan() <-chan MsgQueue {
return q.msgPool
}