commit 08382b2f65ad5fc623bf9542214525b278fae6bc Author: 兔子 Date: Mon Jul 20 11:17:29 2020 +0800 init diff --git a/circlebuffer.go b/circlebuffer.go new file mode 100644 index 0000000..da2870a --- /dev/null +++ b/circlebuffer.go @@ -0,0 +1,158 @@ +package starnet + +import ( + "errors" + "fmt" + "io" + "time" +) + +type CircleByteBuffer struct { + io.Reader + io.Writer + io.Closer + datas []byte + + start int + end int + size int + isClose bool + isEnd bool +} + +func NewCircleByteBuffer(len int) *CircleByteBuffer { + var e = new(CircleByteBuffer) + e.datas = make([]byte, len) + e.start = 0 + e.end = 0 + e.size = len + e.isClose = false + e.isEnd = false + return e +} + +func (e *CircleByteBuffer) getLen() int { + if e.start == e.end { + return 0 + } else if e.start < e.end { + return e.end - e.start + } else { + return e.start - e.end + } +} +func (e *CircleByteBuffer) getFree() int { + return e.size - e.getLen() +} +func (e *CircleByteBuffer) putByte(b byte) error { + if e.isClose { + return io.EOF + } + e.datas[e.end] = b + var pos = e.end + 1 + for pos == e.start { + if e.isClose { + return io.EOF + } + time.Sleep(time.Microsecond) + } + if pos == e.size { + e.end = 0 + } else { + e.end = pos + } + return nil +} + +func (e *CircleByteBuffer) getByte() (byte, error) { + if e.isClose { + return 0, io.EOF + } + if e.isEnd && e.getLen() <= 0 { + return 0, io.EOF + } + if e.getLen() <= 0 { + return 0, errors.New("no datas") + } + var ret = e.datas[e.start] + e.start++ + if e.start == e.size { + e.start = 0 + } + return ret, nil +} +func (e *CircleByteBuffer) geti(i int) byte { + if i >= e.getLen() { + panic("out buffer") + } + var pos = e.start + i + if pos >= e.size { + pos -= e.size + } + return e.datas[pos] +} + +/*func (e*CircleByteBuffer)puts(bts []byte){ + for i:=0;i> 8) + res[1] = uint8(src >> 16) + res[0] = uint8(src >> 24) + return res +} + +// ByteToUint32 byte转4位uint32 +func ByteToUint32(src []byte) uint32 { + var res uint32 + buffer := bytes.NewBuffer(src) + binary.Read(buffer, binary.BigEndian, &res) + return res +} + +// Uint16ToByte 2位uint16转byte +func Uint16ToByte(src uint16) []byte { + res := make([]byte, 2) + res[1] = uint8(src) + res[0] = uint8(src >> 8) + return res +} + +// ByteToUint16 用于byte转uint16 +func ByteToUint16(src []byte) uint16 { + var res uint16 + buffer := bytes.NewBuffer(src) + binary.Read(buffer, binary.BigEndian, &res) + return res +} + +// BuildMessage 生成编码后的信息用于发送 +func (que *StarQueue) BuildMessage(src []byte) []byte { + var buff bytes.Buffer + que.Msgid++ + if que.Encode { + src = que.EncodeFunc(src) + } + length := uint32(len(src)) + buff.Write(header) + buff.Write(Uint32ToByte(length)) + buff.Write(Uint16ToByte(que.Msgid)) + buff.Write(src) + return buff.Bytes() +} + +// BuildHeader 生成编码后的Header用于发送 +func (que *StarQueue) BuildHeader(length uint32) []byte { + var buff bytes.Buffer + que.Msgid++ + buff.Write(header) + buff.Write(Uint32ToByte(length)) + buff.Write(Uint16ToByte(que.Msgid)) + return buff.Bytes() +} + +type unFinMsg struct { + ID uint16 + LengthRecv uint32 + // HeaderMsg 信息头,应当为14位:8位识别码+4位长度码+2位id + HeaderMsg []byte + RecvMsg []byte +} + +// ParseMessage 用于解析收到的msg信息 +func (que *StarQueue) ParseMessage(msg []byte, conn interface{}) error { + tmp, ok := que.UnFinMsg.Load(conn) + if ok { //存在未完成的信息 + lastMsg := tmp.(*unFinMsg) + headerLen := len(lastMsg.HeaderMsg) + if headerLen < 14 { //未完成头标题 + //传输的数据不能填充header头 + if len(msg) < 14-headerLen { + //加入header头并退出 + lastMsg.HeaderMsg = bytesMerge(lastMsg.HeaderMsg, msg) + que.UnFinMsg.Store(conn, lastMsg) + return nil + } + //获取14字节完整的header + header := msg[0 : 14-headerLen] + lastMsg.HeaderMsg = bytesMerge(lastMsg.HeaderMsg, header) + //检查收到的header是否为认证header + //若不是,丢弃并重新来过 + if !checkHeader(lastMsg.HeaderMsg[0:8]) { + que.UnFinMsg.Delete(conn) + if len(msg) == 0 { + return nil + } + return que.ParseMessage(msg, conn) + } + //获得本数据包长度 + lastMsg.LengthRecv = ByteToUint32(lastMsg.HeaderMsg[8:12]) + //获得本数据包ID + lastMsg.ID = ByteToUint16(lastMsg.HeaderMsg[12:14]) + //存入列表 + que.UnFinMsg.Store(conn, lastMsg) + msg = msg[14-headerLen:] + if uint32(len(msg)) < lastMsg.LengthRecv { + lastMsg.RecvMsg = msg + que.UnFinMsg.Store(conn, lastMsg) + return nil + } + if uint32(len(msg)) >= lastMsg.LengthRecv { + lastMsg.RecvMsg = msg[0:lastMsg.LengthRecv] + if que.Encode { + lastMsg.RecvMsg = que.DecodeFunc(lastMsg.RecvMsg) + } + msg = msg[lastMsg.LengthRecv:] + stroeMsg := MsgQueue{ + ID: lastMsg.ID, + Msg: lastMsg.RecvMsg, + Conn: conn, + } + que.MsgPool = append(que.MsgPool, stroeMsg) + que.UnFinMsg.Delete(conn) + return que.ParseMessage(msg, conn) + } + } else { + lastID := int(lastMsg.LengthRecv) - len(lastMsg.RecvMsg) + if lastID < 0 { + que.UnFinMsg.Delete(conn) + return que.ParseMessage(msg, conn) + } + if len(msg) >= lastID { + lastMsg.RecvMsg = bytesMerge(lastMsg.RecvMsg, msg[0:lastID]) + if que.Encode { + lastMsg.RecvMsg = que.DecodeFunc(lastMsg.RecvMsg) + } + stroeMsg := MsgQueue{ + ID: lastMsg.ID, + Msg: lastMsg.RecvMsg, + Conn: conn, + } + que.MsgPool = append(que.MsgPool, stroeMsg) + que.UnFinMsg.Delete(conn) + if len(msg) == lastID { + return nil + } + msg = msg[lastID:] + return que.ParseMessage(msg, conn) + } + lastMsg.RecvMsg = bytesMerge(lastMsg.RecvMsg, msg) + que.UnFinMsg.Store(conn, lastMsg) + return nil + } + } + if len(msg) == 0 { + return nil + } + var start int + if start = searchHeader(msg); start == -1 { + return errors.New("data format error") + } + msg = msg[start:] + lastMsg := unFinMsg{} + que.UnFinMsg.Store(conn, &lastMsg) + return que.ParseMessage(msg, conn) +} + +func checkHeader(msg []byte) bool { + if len(msg) != 8 { + return false + } + for k, v := range msg { + if v != header[k] { + return false + } + } + return true +} + +func searchHeader(msg []byte) int { + if len(msg) < 8 { + return 0 + } + for k, v := range msg { + find := 0 + if v == header[0] { + for k2, v2 := range header { + if msg[k+k2] == v2 { + find++ + } else { + break + } + } + if find == 8 { + return k + } + } + } + return -1 +} + +func bytesMerge(src ...[]byte) []byte { + var buff bytes.Buffer + for _, v := range src { + buff.Write(v) + } + return buff.Bytes() +} + +// Restore 获取收到的信息 +func (que *StarQueue) Restore(n int) ([]MsgQueue, error) { + var res []MsgQueue + dura := time.Duration(0) + for len(que.MsgPool) < n { + select { + case <-que.ctx.Done(): + return res, errors.New("Stoped By External Function Call") + default: + time.Sleep(time.Millisecond * 20) + dura = time.Millisecond*20 + dura + if que.duration != 0 && dura > que.duration { + return res, errors.New("Time Exceed") + } + } + } + if len(que.MsgPool) < n { + return res, errors.New("Result Not Enough") + } + res = que.MsgPool[0:n] + que.MsgPool = que.MsgPool[n:] + return res, nil +} + +// RestoreOne 获取收到的一个信息 +func (que *StarQueue) RestoreOne() (MsgQueue, error) { + data, err := que.Restore(1) + if len(data) == 1 { + return data[0], err + } + return MsgQueue{}, err +} + +// Stop 立即停止Restore +func (que *StarQueue) Stop() { + que.cancel() +} + +// RestoreDuration Restore最大超时时间 +func (que *StarQueue) RestoreDuration(tm time.Duration) { + que.duration = tm +}