diff --git a/parse.go b/parse.go
index 6e0ea59..df37894 100644
--- a/parse.go
+++ b/parse.go
@@ -294,3 +294,291 @@ func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string,
 
 	return db, tb, sqlType, sql, rowCnt, rows
 }
+
+// BinlogFilter selects which transactions are reported while parsing a
+// binlog. A field left at its zero value disables the corresponding check.
+type BinlogFilter struct {
+	// IncludeGtid, when non-empty, is parsed with gtid.Parse; a transaction
+	// whose GTID it does not contain is skipped.
+	IncludeGtid string
+	// ExcludeGtid, when non-empty, is parsed with gtid.Parse; a transaction
+	// whose GTID it contains is skipped.
+	ExcludeGtid string
+	// StartPos rejects transactions that start before this position.
+	StartPos int
+	// EndPos rejects transactions that end after this position.
+	EndPos int
+	// StartDate rejects transactions timestamped before it.
+	StartDate time.Time
+	// EndDate rejects transactions timestamped after it.
+	EndDate time.Time
+	// BigThan rejects transactions whose size (EndPos - StartPos) is smaller.
+	BigThan int
+	// SmallThan rejects transactions whose size (EndPos - StartPos) is larger.
+	SmallThan int
+}
+
+// parseBinlogWithFilter reads binlog events from r, groups them into
+// transactions that each start at a GTID event, and passes every transaction
+// accepted by filter to fn. A nil fn is allowed; nothing is reported then.
+//
+// Reaching io.EOF exactly at an event boundary ends the parse successfully
+// after the pending transaction has been reported. Any other read, header or
+// event decoding failure is returned, as is a failure to parse
+// filter.IncludeGtid or filter.ExcludeGtid.
+func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction)) error {
+	var inGtid, exGtid *gtid.Gtid
+	var err error
+	if filter.IncludeGtid != "" {
+		inGtid, err = gtid.Parse(filter.IncludeGtid)
+		if err != nil {
+			return err
+		}
+	}
+	if filter.ExcludeGtid != "" {
+		exGtid, err = gtid.Parse(filter.ExcludeGtid)
+		if err != nil {
+			return err
+		}
+	}
+
+	var (
+		tx           Transaction
+		currentGtid  string
+		tbMapPos     uint32
+		skipTillNext bool
+	)
+
+	// emit forwards t to fn unless fn is nil or a filter bound rejects it.
+	emit := func(t Transaction) {
+		switch {
+		case fn == nil:
+		case !filter.StartDate.IsZero() && filter.StartDate.After(t.Time):
+		case !filter.EndDate.IsZero() && filter.EndDate.Before(t.Time):
+		case filter.StartPos != 0 && filter.StartPos > t.StartPos:
+		case filter.EndPos != 0 && filter.EndPos < t.EndPos:
+		case filter.BigThan != 0 && filter.BigThan > t.Size:
+		case filter.SmallThan != 0 && filter.SmallThan < t.Size:
+		default:
+			fn(t)
+		}
+	}
+
+	// flush finalizes the transaction being assembled and hands it to emit.
+	// Entries that are not plain queries take their statement text from the
+	// collected rows-query events, matched in order of appearance.
+	flush := func() {
+		idx := 0
+		for k, v := range tx.Txs {
+			if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
+				v.Sql = tx.sqlOrigin[idx]
+				idx++
+			}
+			tx.RowsCount += v.RowCount
+			tx.Txs[k] = v
+		}
+		tx.Size = tx.EndPos - tx.StartPos
+		emit(tx)
+	}
+
+	for {
+		headBuf := make([]byte, replication.EventHeaderSize)
+		_, err = io.ReadFull(r, headBuf)
+		if err == io.EOF {
+			// While a filtered-out GTID is being skipped no transaction is
+			// pending (the previous one was flushed when that GTID event
+			// arrived), so there is nothing left to report.
+			if !skipTillNext {
+				flush()
+			}
+			return nil
+		}
+		if err != nil {
+			return err
+		}
+		var h *replication.EventHeader
+		h, err = parse.ParseHeader(headBuf)
+		if err != nil {
+			return err
+		}
+		if h.EventSize <= uint32(replication.EventHeaderSize) {
+			return fmt.Errorf("invalid event header, event size is %d, too small", h.EventSize)
+		}
+
+		// io.CopyN fails unless the whole body was copied, so on success buf
+		// holds exactly EventSize - EventHeaderSize bytes.
+		var (
+			buf bytes.Buffer
+			n   int64
+		)
+		if n, err = io.CopyN(&buf, r, int64(h.EventSize)-int64(replication.EventHeaderSize)); err != nil {
+			return fmt.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, replication.EventHeaderSize, n)
+		}
+		if skipTillNext && h.EventType != replication.GTID_EVENT {
+			continue
+		}
+
+		data := buf.Bytes()
+		rawData := make([]byte, 0, len(headBuf)+len(data))
+		rawData = append(rawData, headBuf...)
+		rawData = append(rawData, data...)
+
+		var e replication.Event
+		e, err = parse.ParseEvent(h, data, rawData)
+		if err != nil {
+			return err
+		}
+		if h.EventType == replication.TABLE_MAP_EVENT {
+			// Row events are positioned at their table map event so that
+			// mysqlbinlog does not treat them as rows of an unknown table.
+			tbMapPos = h.LogPos - h.EventSize
+		}
+
+		// The raw bytes are not needed downstream, so RawData stays unset.
+		binEvent := &replication.BinlogEvent{Header: h, Event: e}
+		db, tb, sqlType, sql, rowCnt, rows := GetDbTbAndQueryAndRowCntFromBinevent(binEvent)
+
+		startPos := int(tbMapPos)
+		if sqlType == "query" || sqlType == "gtid" {
+			startPos = int(h.LogPos - h.EventSize)
+		}
+
+		switch sqlType {
+		case "gtid":
+			skipTillNext = false
+			if currentGtid != "" {
+				flush()
+			}
+			currentGtid = sql
+			// The second result of Contain is discarded, as it always was.
+			skip := false
+			if inGtid != nil {
+				if c, _ := inGtid.Contain(sql); !c {
+					skip = true
+				}
+			}
+			if !skip && exGtid != nil {
+				if c, _ := exGtid.Contain(sql); c {
+					skip = true
+				}
+			}
+			if skip {
+				// Drop the already-flushed transaction and ignore events
+				// until the next GTID event.
+				currentGtid = ""
+				skipTillNext = true
+				tx = Transaction{}
+				continue
+			}
+			tx = Transaction{
+				GTID:      sql,
+				StartPos:  startPos,
+				Timestamp: int64(h.Timestamp),
+				Time:      time.Unix(int64(h.Timestamp), 0),
+			}
+		case "":
+			tx.EndPos = int(h.LogPos)
+		case "rowsquery":
+			tx.EndPos = int(h.LogPos)
+			tx.sqlOrigin = append(tx.sqlOrigin, sql)
+		default:
+			tx.EndPos = int(h.LogPos)
+			tx.Txs = append(tx.Txs, TxDetail{
+				StartPos:  startPos,
+				EndPos:    int(h.LogPos),
+				Db:        db,
+				Table:     tb,
+				Sql:       sql,
+				SqlType:   sqlType,
+				Rows:      rows,
+				RowCount:  int(rowCnt),
+				Timestamp: int64(h.Timestamp),
+				Time:      time.Unix(int64(h.Timestamp), 0),
+			})
+		}
+	}
+}
+
+// ParseBinlogWithFilter opens the binlog file at path and passes every
+// transaction accepted by filter to fx.
+//
+// With pos == 0 parsing starts right after the 4-byte file header. Otherwise
+// the leading events, up to and including the first format description or
+// GTID event, are decoded first and parsing then continues from byte offset
+// pos.
+//
+// It returns os.ErrNotExist when staros.Exists reports path as missing, an
+// error when the file does not start with replication.BinLogFileHeader, and
+// any read, seek or decoding error. A panic raised while parsing is
+// recovered and returned as an error.
+func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction)) (err error) {
+	defer func() {
+		// A panic used to be swallowed here, which made the call report
+		// success; surface it through the named result instead.
+		if p := recover(); p != nil {
+			err = fmt.Errorf("parse binlog %s: panic: %v", path, p)
+		}
+	}()
+	if !staros.Exists(path) {
+		return os.ErrNotExist
+	}
+	f, openErr := os.Open(path)
+	if openErr != nil {
+		return openErr
+	}
+	defer f.Close()
+
+	parse := replication.NewBinlogParser()
+	parse.SetParseTime(false)
+	parse.SetUseDecimal(false)
+
+	// Check the binlog file header. io.ReadFull either fills magic or fails,
+	// which leaves the file offset on the first event.
+	const fileTypeBytes = 4
+	magic := make([]byte, fileTypeBytes)
+	if _, err = io.ReadFull(f, magic); err != nil {
+		return err
+	}
+	if !bytes.Equal(magic, replication.BinLogFileHeader) {
+		return fmt.Errorf("%s is not a binlog file: invalid file header", path)
+	}
+
+	if pos != 0 {
+		// The parser must see the leading events before jumping to pos;
+		// starting in the middle of the file without them may make it panic.
+		for {
+			headBuf := make([]byte, replication.EventHeaderSize)
+			if _, err = io.ReadFull(f, headBuf); err != nil {
+				return err
+			}
+			var h *replication.EventHeader
+			if h, err = parse.ParseHeader(headBuf); err != nil {
+				return err
+			}
+			if h.EventSize <= uint32(replication.EventHeaderSize) {
+				return fmt.Errorf("invalid event header, event size is %d, too small", h.EventSize)
+			}
+			var (
+				buf bytes.Buffer
+				n   int64
+			)
+			if n, err = io.CopyN(&buf, f, int64(h.EventSize)-int64(replication.EventHeaderSize)); err != nil {
+				return fmt.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, replication.EventHeaderSize, n)
+			}
+			data := buf.Bytes()
+			rawData := make([]byte, 0, len(headBuf)+len(data))
+			rawData = append(rawData, headBuf...)
+			rawData = append(rawData, data...)
+			if _, err = parse.ParseEvent(h, data, rawData); err != nil {
+				return err
+			}
+			if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT {
+				break
+			}
+		}
+		if _, err = f.Seek(pos, io.SeekStart); err != nil {
+			return err
+		}
+	}
+	return parseBinlogWithFilter(f, parse, filter, fx)
+}
diff --git a/parse_test.go b/parse_test.go
index 9c80134..5b5dcdc 100644
--- a/parse_test.go
+++ b/parse_test.go
@@ -3,10 +3,26 @@ package binlog
 import (
 	"fmt"
 	"testing"
+	"time"
 )
 
 func TestParse(t *testing.T) {
-	ParseBinlogFile("./test/test-mysql-bin", func(transaction Transaction) {
+	ParseBinlogFile("./test/mysql-bin.000023", func(transaction Transaction) {
+		fmt.Println(transaction)
+	})
+}
+
+func TestParseFilter(t *testing.T) {
+	ParseBinlogWithFilter("./test/mysql-bin.000023", 0, BinlogFilter{
+		IncludeGtid: "",
+		ExcludeGtid: "",
+		StartPos:    0,
+		EndPos:      0,
+		StartDate:   time.Time{},
+		EndDate:     time.Time{},
+		BigThan:     0,
+		SmallThan:   0,
+	}, func(transaction Transaction) {
 		fmt.Println(transaction)
 	})
 }