package binlog import ( "b612.me/mysql/gtid" "encoding/binary" "io" "os" ) func GetUnexecutedBinlogPosByGtidAndAllGtid(binlogPath string, executedGtidDesc string, includeEventBeforeFirst bool) (pos uint, allgtid string, err error) { file, err := os.Open(binlogPath) if nil != err { return 0, "", err } defer file.Close() myGtid, err := gtid.Parse(allgtid) if err != nil { return 0, "", err } p := uint32(4) finUnexec := false retPos := uint32(0) headerBs := make([]byte, 19) payloadBs := make([]byte, 1024) lastExecutedGtidPos := uint32(0) executedGtid, err := gtid.Parse(executedGtidDesc) if nil != err { return 0, "", err } for { if _, err := file.Seek(int64(p), 0); nil != err { return 0, "", err } if _, err := io.ReadFull(file, headerBs); nil != err { if err == io.EOF { break } return 0, "", err } length := binary.LittleEndian.Uint32(headerBs[9:13]) eventType := int(headerBs[4]) if GTID_LOG_EVENT != eventType { p += length continue } payloadLength := length - 19 if payloadLength > uint32(len(payloadBs)) { payloadBs = make([]byte, payloadLength) } if _, err := io.ReadFull(file, payloadBs[:payloadLength]); nil != err { return 0, "", err } uuid := bytesToUuid(payloadBs[1:17]) number := bytesToUint64(payloadBs[17:25]) if !finUnexec { contain, err := executedGtid.ContainGtid(uuid, number) if nil != err { return 0, "", err } if contain { lastExecutedGtidPos = p p += length } else { retPos = p if includeEventBeforeFirst && 0 != lastExecutedGtidPos { retPos = lastExecutedGtidPos } finUnexec = true } } else { p += length } if err = myGtid.AddGtid(uuid, number); nil != err { return 0, "", err } } if !finUnexec { return 0, myGtid.String(), io.EOF } return uint(retPos), myGtid.String(), nil } func GetUnexecutedBinlogPosByGtid(binlogPath string, executedGtidDesc string, includeEventBeforeFirst bool) (pos uint, err error) { file, err := os.Open(binlogPath) if nil != err { return 0, err } defer file.Close() p := uint32(4) headerBs := make([]byte, 19) payloadBs := make([]byte, 1024) lastExecutedGtidPos := uint32(0) executedGtid, err := gtid.Parse(executedGtidDesc) if nil != err { return 0, err } for { if _, err := file.Seek(int64(p), 0); nil != err { return 0, err } if _, err := io.ReadFull(file, headerBs); nil != err { return 0, err } length := binary.LittleEndian.Uint32(headerBs[9:13]) eventType := int(headerBs[4]) if GTID_LOG_EVENT != eventType { p += length continue } payloadLength := length - 19 if payloadLength > uint32(len(payloadBs)) { payloadBs = make([]byte, payloadLength) } if _, err := io.ReadFull(file, payloadBs[:payloadLength]); nil != err { return 0, err } uuid := bytesToUuid(payloadBs[1:17]) number := bytesToUint64(payloadBs[17:25]) contain, err := executedGtid.ContainGtid(uuid, number) if nil != err { return 0, err } if contain { lastExecutedGtidPos = p p += length } else { retPos := p if includeEventBeforeFirst && 0 != lastExecutedGtidPos { retPos = lastExecutedGtidPos } return uint(retPos), nil } } }