// Package binlog reads MySQL/MariaDB binlog files and groups their events
// into transactions that are handed to a caller-supplied callback.
package binlog

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"time"

	"b612.me/mysql/gtid"
	"b612.me/staros"
	"github.com/starainrt/go-mysql/replication"
)

// binlogMagicSize is the number of bytes occupied by the magic header at the
// very beginning of a binlog file.
const binlogMagicSize = int64(4)

// TxDetail describes one statement-level event (a query or a rows event)
// that belongs to a transaction.
type TxDetail struct {
	// StartPos is the offset where the event starts. For rows events it is
	// the offset of the most recent TABLE_MAP event seen before it.
	StartPos int
	// EndPos is the LogPos recorded in the event header.
	EndPos int
	// RowCount is the number of rows touched (update events count each
	// before/after pair once).
	RowCount int
	// Timestamp is the raw timestamp from the event header.
	Timestamp int64
	// Time is Timestamp converted with time.Unix.
	Time time.Time
	// Sql is the query text; for rows events it is filled from the matching
	// ROWS_QUERY event when one is available.
	Sql string
	// Db and Table identify the schema and table of the event.
	Db    string
	Table string
	// SqlType is one of "insert", "update", "delete" or "query".
	SqlType string
	// Rows holds the decoded row images of a rows event.
	Rows [][]interface{}
}

// Transaction is a group of events that starts at a GTID event and ends just
// before the next GTID event (or at end of file).
type Transaction struct {
	// GTID is the textual GTID of the transaction.
	GTID string
	// Timestamp and Time come from the header of the GTID event.
	Timestamp int64
	Time      time.Time
	// StartPos is the offset of the GTID event; EndPos is the LogPos of the
	// last event seen in the transaction; Size is EndPos - StartPos.
	StartPos int
	EndPos   int
	Size     int
	// RowsCount is the sum of RowCount over Txs.
	RowsCount int
	// sqlOrigin collects the text of ROWS_QUERY events in arrival order.
	sqlOrigin []string
	// Txs lists the statement-level events of the transaction.
	Txs []TxDetail
}

// GetSqlOrigin returns the original statements recorded by ROWS_QUERY events
// inside the transaction.
func (t Transaction) GetSqlOrigin() []string {
	return t.sqlOrigin
}

// ParseBinlogFile parses the binlog file at path and calls fx once per
// transaction found in it.
func ParseBinlogFile(path string, fx func(transaction Transaction)) error {
	return parseOneBinlog(path, fx)
}

// parseOneBinlog opens path, validates the binlog magic header and feeds the
// remaining events to parseBinlogDetail.
func parseOneBinlog(path string, fx func(Transaction)) error {
	if !staros.Exists(path) {
		return os.ErrNotExist
	}
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	if err = seekPastBinlogHeader(f); err != nil {
		return err
	}
	return parseBinlogDetail(f, fx)
}

// newBinlogParser returns a parser configured the way every entry point of
// this package expects (no time parsing, no decimal decoding).
func newBinlogParser() *replication.BinlogParser {
	parse := replication.NewBinlogParser()
	parse.SetParseTime(false)
	parse.SetUseDecimal(false)
	return parse
}

// seekPastBinlogHeader reads the magic header from f, verifies it and leaves
// f positioned on the first event. It returns an error when the header is
// short or does not match replication.BinLogFileHeader.
func seekPastBinlogHeader(f io.ReadSeeker) error {
	magic := make([]byte, binlogMagicSize)
	if _, err := io.ReadFull(f, magic); err != nil {
		return fmt.Errorf("read binlog file header: %v", err)
	}
	if !bytes.Equal(magic, replication.BinLogFileHeader) {
		return fmt.Errorf("not a binlog file: unexpected file header % x", magic)
	}
	// Parsing must start right after the magic header: skipping the format
	// description or a table map event may make the parser panic later on.
	if _, err := f.Seek(binlogMagicSize, io.SeekStart); err != nil {
		return err
	}
	return nil
}

// readRawEvent reads one event from r and returns its parsed header, the raw
// header bytes and the raw body bytes. It returns io.EOF, unwrapped, only
// when r is exhausted exactly at an event boundary.
func readRawEvent(r io.Reader, parse *replication.BinlogParser) (*replication.EventHeader, []byte, []byte, error) {
	headBuf := make([]byte, replication.EventHeaderSize)
	if _, err := io.ReadFull(r, headBuf); err != nil {
		return nil, nil, nil, err
	}
	h, err := parse.ParseHeader(headBuf)
	if err != nil {
		return nil, nil, nil, err
	}
	if h.EventSize <= uint32(replication.EventHeaderSize) {
		return nil, nil, nil, fmt.Errorf("invalid event header, event size is %d, too small", h.EventSize)
	}

	bodyLen := int64(h.EventSize) - int64(replication.EventHeaderSize)
	var buf bytes.Buffer
	if n, err := io.CopyN(&buf, r, bodyLen); err != nil {
		// Deliberately formatted with %v rather than %w: a truncated body
		// must never be mistaken for a clean end of file.
		return nil, nil, nil, fmt.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, replication.EventHeaderSize, n)
	}
	return h, headBuf, buf.Bytes(), nil
}

// parseRawEvent decodes an event previously returned by readRawEvent.
func parseRawEvent(parse *replication.BinlogParser, h *replication.EventHeader, headBuf, data []byte) (replication.Event, error) {
	rawData := make([]byte, 0, len(headBuf)+len(data))
	rawData = append(rawData, headBuf...)
	rawData = append(rawData, data...)
	return parse.ParseEvent(h, data, rawData)
}

// finishTransaction fills in the derived fields of tx before it is handed to
// the callback: the original SQL of non-query events, RowsCount and Size.
func finishTransaction(tx *Transaction) {
	tx.RowsCount = 0
	next := 0
	for i := range tx.Txs {
		detail := &tx.Txs[i]
		// ROWS_QUERY texts are matched to non-query events in arrival order.
		if detail.SqlType != "query" && next < len(tx.sqlOrigin) {
			detail.Sql = tx.sqlOrigin[next]
			next++
		}
		tx.RowsCount += detail.RowCount
	}
	tx.Size = tx.EndPos - tx.StartPos
}

// parseBinlogDetail parses every event readable from r and calls f once per
// transaction. It is the unfiltered form of parseBinlogWithFilter.
func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
	return parseBinlogWithFilter(r, newBinlogParser(), BinlogFilter{}, f)
}

// GetDbTbAndQueryAndRowCntFromBinevent classifies ev and extracts the pieces
// this package cares about. It returns, in order: database, table, SQL type,
// SQL text, row count and decoded rows.
//
// The SQL type is "gtid", "insert", "update", "delete", "rowsquery" or
// "query"; it is empty for event types that are not handled here.
func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, string, string, string, uint32, [][]interface{}) {
	var (
		db, tb, sql, sqlType string
		rowCnt               uint32
		rows                 [][]interface{}
	)

	// fromRows handles the three kinds of rows events; imagesPerRow is 2 for
	// updates because each changed row is stored as a before/after pair.
	fromRows := func(kind string, imagesPerRow uint32) {
		rowsEvent := ev.Event.(*replication.RowsEvent)
		db = string(rowsEvent.Table.Schema)
		tb = string(rowsEvent.Table.Table)
		sqlType = kind
		rowCnt = uint32(len(rowsEvent.Rows)) / imagesPerRow
		rows = rowsEvent.Rows
	}

	switch ev.Header.EventType {
	case replication.ANONYMOUS_GTID_EVENT:
		sql = "anonymous-gtid-event:1"
		sqlType = "gtid"
	case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
		fromRows("insert", 1)
	case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
		fromRows("update", 2)
	case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
		fromRows("delete", 1)
	case replication.ROWS_QUERY_EVENT:
		queryEvent := ev.Event.(*replication.RowsQueryEvent)
		sql = string(queryEvent.Query)
		sqlType = "rowsquery"
	case replication.QUERY_EVENT:
		queryEvent := ev.Event.(*replication.QueryEvent)
		db = string(queryEvent.Schema)
		sql = string(queryEvent.Query)
		sqlType = "query"
	case replication.MARIADB_GTID_EVENT:
		// MariaDB uses this event to start a new transaction event group
		// instead of the old BEGIN query event, and to mark stand-alone DDL.
		// https://mariadb.com/kb/en/library/gtid_event/
		sql = "begin"
		sqlType = "query"
	case replication.XID_EVENT:
		// XID_EVENT represents a commit; rolled-back transactions are not
		// written to the binlog.
		sql = "commit"
		sqlType = "query"
	case replication.GTID_EVENT:
		ge := ev.Event.(*replication.GTIDEvent)
		// When the GTID cannot be parsed the SQL text is left empty.
		if gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO)); err == nil {
			sql = gid.String()
		}
		sqlType = "gtid"
	}
	return db, tb, sqlType, sql, rowCnt, rows
}

// BinlogFilter selects which transactions are reported by
// ParseBinlogWithFilter. A zero-valued field disables that criterion.
type BinlogFilter struct {
	// IncludeGtid, when set, keeps only transactions whose GTID is contained
	// in this GTID set.
	IncludeGtid string
	// ExcludeGtid, when set, drops transactions whose GTID is contained in
	// this GTID set.
	ExcludeGtid string
	// StartPos drops transactions that start before this offset.
	StartPos int
	// EndPos drops transactions that end after this offset.
	EndPos int
	// StartDate drops transactions stamped before this time.
	StartDate time.Time
	// EndDate drops transactions stamped after this time.
	EndDate time.Time
	// BigThan drops transactions smaller than this many bytes.
	BigThan int
	// SmallThan drops transactions larger than this many bytes.
	SmallThan int
}

// match reports whether tx passes the date, position and size criteria of
// the filter. GTID criteria are applied earlier, while events are read.
func (bf BinlogFilter) match(tx Transaction) bool {
	switch {
	case !bf.StartDate.IsZero() && bf.StartDate.After(tx.Time):
		return false
	case !bf.EndDate.IsZero() && bf.EndDate.Before(tx.Time):
		return false
	case bf.StartPos != 0 && bf.StartPos > tx.StartPos:
		return false
	case bf.EndPos != 0 && bf.EndPos < tx.EndPos:
		return false
	case bf.BigThan != 0 && bf.BigThan > tx.Size:
		return false
	case bf.SmallThan != 0 && bf.SmallThan < tx.Size:
		return false
	}
	return true
}

// parseBinlogWithFilter reads events from r until EOF, groups them into
// transactions and calls fn for every transaction accepted by filter.
//
// parse must already know the format description of the stream when r does
// not start at the first event. fn may be nil, in which case the stream is
// only validated.
func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction)) error {
	var (
		inGtid, exGtid *gtid.Gtid
		err            error
	)
	if filter.IncludeGtid != "" {
		if inGtid, err = gtid.Parse(filter.IncludeGtid); err != nil {
			return err
		}
	}
	if filter.ExcludeGtid != "" {
		if exGtid, err = gtid.Parse(filter.ExcludeGtid); err != nil {
			return err
		}
	}

	var (
		tx           Transaction
		currentGtid  string
		tbMapPos     uint32
		skipTillNext bool
	)

	// emit finalises the transaction being built and reports it if it
	// passes the filter.
	emit := func() {
		finishTransaction(&tx)
		if fn != nil && filter.match(tx) {
			fn(tx)
		}
	}

	for {
		h, headBuf, data, err := readRawEvent(r, parse)
		if err == io.EOF {
			// While skipping, tx does not describe a pending transaction:
			// emitting it would report a stale or empty transaction.
			if !skipTillNext {
				emit()
			}
			return nil
		}
		if err != nil {
			return err
		}
		// Events of a filtered-out transaction are dropped undecoded.
		if skipTillNext && h.EventType != replication.GTID_EVENT {
			continue
		}

		e, err := parseRawEvent(parse, h, headBuf, data)
		if err != nil {
			return err
		}
		if h.EventType == replication.TABLE_MAP_EVENT {
			// Remember where the table map starts so that mysqlbinlog does
			// not report the following rows event as "unknown table".
			tbMapPos = h.LogPos - h.EventSize
		}

		// The raw data is not needed by the classifier.
		binEvent := &replication.BinlogEvent{Header: h, Event: e}
		db, tb, sqlType, sql, rowCnt, rows := GetDbTbAndQueryAndRowCntFromBinevent(binEvent)

		startPos := int(tbMapPos)
		if sqlType == "query" || sqlType == "gtid" {
			startPos = int(h.LogPos - h.EventSize)
		}

		switch sqlType {
		case "gtid":
			skipTillNext = false
			if currentGtid != "" {
				emit()
			}
			currentGtid = sql

			skip := false
			if inGtid != nil {
				// A Contain error is treated as "not contained".
				if contained, _ := inGtid.Contain(sql); !contained {
					skip = true
				}
			}
			if !skip && exGtid != nil {
				// A Contain error is treated as "not contained".
				if contained, _ := exGtid.Contain(sql); contained {
					skip = true
				}
			}
			if skip {
				currentGtid = ""
				skipTillNext = true
				// Drop the already emitted transaction so that it cannot be
				// reported a second time.
				tx = Transaction{}
				continue
			}

			tx = Transaction{
				GTID:      sql,
				StartPos:  startPos,
				Timestamp: int64(h.Timestamp),
				Time:      time.Unix(int64(h.Timestamp), 0),
			}
		case "":
			// Unclassified events only extend the transaction.
			tx.EndPos = int(h.LogPos)
		case "rowsquery":
			tx.EndPos = int(h.LogPos)
			tx.sqlOrigin = append(tx.sqlOrigin, sql)
		default:
			tx.EndPos = int(h.LogPos)
			tx.Txs = append(tx.Txs, TxDetail{
				StartPos:  startPos,
				EndPos:    int(h.LogPos),
				Db:        db,
				Table:     tb,
				Sql:       sql,
				SqlType:   sqlType,
				Rows:      rows,
				RowCount:  int(rowCnt),
				Timestamp: int64(h.Timestamp),
				Time:      time.Unix(int64(h.Timestamp), 0),
			})
		}
	}
}

// primeParser decodes events from r until the parser has seen a format
// description (or GTID) event, so that it can later decode events read from
// an arbitrary offset. Any read error, including io.EOF, is returned.
func primeParser(r io.Reader, parse *replication.BinlogParser) error {
	for {
		h, headBuf, data, err := readRawEvent(r, parse)
		if err != nil {
			return err
		}
		if _, err = parseRawEvent(parse, h, headBuf, data); err != nil {
			return err
		}
		if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT {
			return nil
		}
	}
}

// ParseBinlogWithFilter parses the binlog file at path starting at offset
// pos (0 means the first event) and calls fx for every transaction accepted
// by filter.
//
// A panic raised while parsing is recovered and returned as an error.
func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction)) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("parse binlog %s: recovered from panic: %v", path, r)
		}
	}()

	if !staros.Exists(path) {
		return os.ErrNotExist
	}
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	parse := newBinlogParser()
	if err = seekPastBinlogHeader(f); err != nil {
		return err
	}
	if pos != 0 {
		// The parser needs the format description before it can decode
		// events located in the middle of the file.
		if err = primeParser(f, parse); err != nil {
			return err
		}
		if _, err = f.Seek(pos, io.SeekStart); err != nil {
			return err
		}
	}
	return parseBinlogWithFilter(f, parse, filter, fx)
}