package binlog

import (
	"bytes"
	"fmt"
	"io"
	"os"
	"strings"
	"time"

	"b612.me/mysql/gtid"
	"b612.me/staros"
	"github.com/starainrt/go-mysql/replication"
)

type TxDetail struct {
	StartPos        int       `json:"startPos"`
	EndPos          int       `json:"endPos"`
	RowCount        int       `json:"rowCount"`
	Timestamp       int64     `json:"timestamp"`
	Time            time.Time `json:"time"`
	Sql             string    `json:"sql"`
	Db              string    `json:"db"`
	Table           string    `json:"table"`
	SqlType         string    `json:"sqlType"`
	CompressionType string    `json:"compressionType"`
	Rows            [][]interface{}
}

const (
	STATUS_PREPARE uint8 = iota
	STATUS_BEGIN
	STATUS_COMMIT
	STATUS_ROLLBACK
)

type Transaction struct {
	GTID             string     `json:"gtid"`
	Timestamp        int64      `json:"timestamp"`
	Time             time.Time  `json:"time"`
	StartPos         int        `json:"startPos"`
	EndPos           int        `json:"endPos"`
	Size             int        `json:"size"`
	RowsCount        int        `json:"rowsCount"`
	Status           uint8      `json:"status"`
	sqlOrigin        []string   `json:"sqlOrigin"`
	Txs              []TxDetail `json:"txs"`
	validSchemaCount int
}

func (t Transaction) GetSqlOrigin() []string {
	return t.sqlOrigin
}

func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error {
	return parseOneBinlog(path, fx)
}

func parseOneBinlog(path string, fx func(Transaction) bool) error {
	if !staros.Exists(path) {
		return os.ErrNotExist
	}
	f, err := os.Open(path)
	if f != nil {
		defer f.Close()
	}
	if err != nil {
		return err
	}
	fileTypeBytes := int64(4)
	b := make([]byte, fileTypeBytes)
	// read the binlog magic header
	if _, err = f.Read(b); err != nil {
		return err
	} else if !bytes.Equal(b, replication.BinLogFileHeader) {
		// not in binlog format
		return fmt.Errorf("%s is not a valid binlog file: bad magic header", path)
	}
	// must not seek to any other position, otherwise the parser may panic
	// because the format description and table map events would be skipped
	if _, err = f.Seek(fileTypeBytes, io.SeekStart); err != nil {
		return err
	}
	return parseBinlogDetail(f, fx)
}
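// Usage sketch (illustrative, with a placeholder path): walking every
// transaction in a binlog file with ParseBinlogFile. Returning false from the
// callback stops the scan early.
//
//	err := ParseBinlogFile("/var/lib/mysql/mysql-bin.000001", func(tx Transaction) bool {
//		fmt.Printf("gtid=%s pos=%d-%d rows=%d\n", tx.GTID, tx.StartPos, tx.EndPos, tx.RowsCount)
//		return true
//	})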
func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
	parse := replication.NewBinlogParser()
	parse.SetParseTime(false)
	parse.SetUseDecimal(false)
	var (
		err      error
		n        int64
		tbMapPos uint32 = 0
	)
	var tx Transaction
	currentGtid := ""
	for {
		headBuf := make([]byte, replication.EventHeaderSize)
		if _, err = io.ReadFull(r, headBuf); err == io.EOF {
			// EOF: flush the transaction that is still being accumulated
			idx := 0
			for k, v := range tx.Txs {
				if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
					v.Sql = tx.sqlOrigin[idx]
					idx++
				}
				tx.RowsCount += v.RowCount
				tx.Txs[k] = v
			}
			tx.Size = tx.EndPos - tx.StartPos
			if f != nil {
				f(tx)
			}
			return nil
		} else if err != nil {
			return err
		}
		var h *replication.EventHeader
		h, err = parse.ParseHeader(headBuf)
		if err != nil {
			return err
		}
		if h.EventSize <= uint32(replication.EventHeaderSize) {
			return fmt.Errorf("invalid event header, event size is %d, too small", h.EventSize)
		}
		var buf bytes.Buffer
		if n, err = io.CopyN(&buf, r, int64(h.EventSize)-int64(replication.EventHeaderSize)); err != nil {
			return fmt.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, replication.EventHeaderSize, n)
		}
		data := buf.Bytes()
		var rawData []byte
		rawData = append(rawData, headBuf...)
		rawData = append(rawData, data...)
		eventLen := int(h.EventSize) - replication.EventHeaderSize
		if len(data) != eventLen {
			return fmt.Errorf("invalid data size %d in event %s, less than event length %d", len(data), h.EventType, eventLen)
		}
		var e replication.Event
		e, err = parse.ParseEvent(h, data, rawData)
		if err != nil {
			return err
		}
		if h.EventType == replication.TABLE_MAP_EVENT {
			// remember where the table map event started so the following row
			// events can be attributed to it (mysqlbinlog would otherwise treat
			// them as unknown-table row events)
			tbMapPos = h.LogPos - h.EventSize
		}
		binEvent := &replication.BinlogEvent{Header: h, Event: e} // raw data is not needed past this point
		evs := ParseBinlogEvent(binEvent)
		for _, ev := range evs {
			startPos := 0
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			} else {
				startPos = int(tbMapPos)
			}
			switch ev.Type {
			case "gtid":
				// a new GTID closes the previous transaction: fill in the SQL
				// captured by rows-query events, total the row counts, and hand
				// the transaction to the callback
				if currentGtid != "" {
					idx := 0
					for k, v := range tx.Txs {
						if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
							v.Sql = tx.sqlOrigin[idx]
							idx++
						}
						tx.RowsCount += v.RowCount
						tx.Txs[k] = v
					}
					tx.Size = tx.EndPos - tx.StartPos
					if f != nil {
						if !f(tx) {
							return nil
						}
					}
				}
				currentGtid = ev.Data
				tx = Transaction{
					GTID:      ev.Data,
					StartPos:  startPos,
					Timestamp: int64(h.Timestamp),
					Time:      time.Unix(int64(h.Timestamp), 0),
				}
			case "":
				tx.EndPos = int(h.LogPos)
				continue
			case "rowsquery":
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				status := STATUS_PREPARE
				if ev.Type == "query" {
					switch strings.ToLower(ev.Data) {
					case "begin":
						status = STATUS_BEGIN
					case "commit":
						status = STATUS_COMMIT
					case "rollback":
						status = STATUS_ROLLBACK
					}
					tx.Status = status
				}
				tx.EndPos = int(h.LogPos)
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:        startPos,
					EndPos:          int(h.LogPos),
					Db:              ev.DB,
					Table:           ev.TB,
					Sql:             ev.Data,
					SqlType:         ev.Type,
					Rows:            ev.Rows,
					RowCount:        int(ev.RowCnt),
					Timestamp:       int64(h.Timestamp),
					Time:            time.Unix(int64(h.Timestamp), 0),
					CompressionType: ev.CompressionType,
				})
			}
		}
	}
}

type BinlogEvent struct {
	Type            string
	DB              string
	TB              string
	Data            string
	RowCnt          uint32
	Rows            [][]interface{}
	CompressionType string
}
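// ParseBinlogEvent maps a raw replication event onto zero or more simplified
// BinlogEvent values: row events become "insert", "update" or "delete",
// QUERY_EVENT becomes "query", GTID events become "gtid", ROWS_QUERY_EVENT
// becomes "rowsquery", and XID_EVENT is normalized to a "commit" query.
// Compressed TRANSACTION_PAYLOAD_EVENTs are unpacked recursively; any other
// event type yields a single BinlogEvent with an empty Type.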
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
	var res []BinlogEvent
	var sig BinlogEvent
	switch ev.Header.EventType {
	case replication.ANONYMOUS_GTID_EVENT:
		sig.Data = "anonymous-gtid-event:1"
		sig.Type = "gtid"
	case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
		wrEvent := ev.Event.(*replication.RowsEvent)
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "insert"
		sig.RowCnt = uint32(len(wrEvent.Rows))
		sig.Rows = wrEvent.Rows
	case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
		wrEvent := ev.Event.(*replication.RowsEvent)
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "update"
		// update row events carry before/after images, i.e. two rows per change
		sig.RowCnt = uint32(len(wrEvent.Rows)) / 2
		sig.Rows = wrEvent.Rows
	case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
		wrEvent := ev.Event.(*replication.RowsEvent)
		sig.DB = string(wrEvent.Table.Schema)
		sig.TB = string(wrEvent.Table.Table)
		sig.Type = "delete"
		sig.RowCnt = uint32(len(wrEvent.Rows))
		sig.Rows = wrEvent.Rows
	case replication.ROWS_QUERY_EVENT:
		queryEvent := ev.Event.(*replication.RowsQueryEvent)
		sig.Data = string(queryEvent.Query)
		sig.Type = "rowsquery"
	case replication.QUERY_EVENT:
		queryEvent := ev.Event.(*replication.QueryEvent)
		sig.DB = string(queryEvent.Schema)
		sig.Data = string(queryEvent.Query)
		sig.Type = "query"
	case replication.MARIADB_GTID_EVENT:
		// MariaDB uses its GTID event to start a new transaction event group
		// (replacing the old BEGIN query event) and to mark stand-alone DDL.
		// https://mariadb.com/kb/en/library/gtid_event/
		sig.Data = "begin"
		sig.Type = "query"
	case replication.XID_EVENT:
		// XID_EVENT represents a commit; rolled-back transactions never reach the binlog
		sig.Data = "commit"
		sig.Type = "query"
	case replication.GTID_EVENT:
		ge := ev.Event.(*replication.GTIDEvent)
		gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO))
		if err == nil {
			sig.Data = gid.String()
		}
		sig.Type = "gtid"
	case replication.TRANSACTION_PAYLOAD_EVENT:
		// a compressed payload wraps inner events; parse them recursively and
		// stamp each result with the payload's compression type
		ge := ev.Event.(*replication.TransactionPayloadEvent)
		for _, val := range ge.Events {
			res = append(res, ParseBinlogEvent(val)...)
		}
		for idx := range res {
			if ge.CompressionType == 0 {
				res[idx].CompressionType = "ZSTD"
			} else if ge.CompressionType != 255 {
				res[idx].CompressionType = "UNKNOWN"
			}
		}
		return res
	}
	res = append(res, sig)
	return res
}

type BinlogFilter struct {
	IncludeGtid   string   // only keep transactions whose GTID falls in this set
	ExcludeGtid   string   // drop transactions whose GTID falls in this set
	IncludeTables []string // "db.table" entries; "db.*", "*.table" and "*.*" are honored
	ExcludeTables []string
	StartPos      int
	EndPos        int
	StartDate     time.Time
	EndDate       time.Time
	BigThan       int  // only keep transactions larger than this many bytes
	SmallThan     int  // only keep transactions smaller than this many bytes
	OnlyShowGtid  bool // fast path: skip every event body except GTID events
}
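// Usage sketch (illustrative, with placeholder values): keeping only
// transactions that touch db1.users after a given date. ParseBinlogWithFilter
// is defined later in this file.
//
//	filter := BinlogFilter{
//		IncludeTables: []string{"db1.users"}, // "db.*", "*.table" and "*.*" also match
//		StartDate:     time.Date(2023, 1, 1, 0, 0, 0, 0, time.Local),
//	}
//	err := ParseBinlogWithFilter("/var/lib/mysql/mysql-bin.000001", 0, filter,
//		func(tx Transaction) bool {
//			fmt.Println(tx.GTID, tx.Size, tx.RowsCount)
//			return true
//		})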
func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
	var subGtid, inGtid, exGtid *gtid.Gtid
	var err error
	var includeMap = make(map[string]bool)
	var excludeMap = make(map[string]bool)
	if len(filter.IncludeTables) != 0 {
		for _, v := range filter.IncludeTables {
			if len(strings.Split(v, ".")) != 2 {
				return fmt.Errorf("invalid include table name: %s", v)
			}
			includeMap[v] = true
		}
	}
	if len(filter.ExcludeTables) != 0 {
		for _, v := range filter.ExcludeTables {
			if len(strings.Split(v, ".")) != 2 {
				return fmt.Errorf("invalid exclude table name: %s", v)
			}
			excludeMap[v] = true
		}
	}
	// matchTbs decides whether db.tb passes the include/exclude table filters;
	// exact names win over "db.*" and "*.tb" wildcards, which win over "*.*"
	matchTbs := func(db, tb string) bool {
		if len(filter.IncludeTables) == 0 && len(filter.ExcludeTables) == 0 {
			return true
		}
		if db == "" && tb == "" {
			return true
		}
		if _, ok := includeMap["*.*"]; ok {
			return true
		}
		if _, ok := excludeMap["*.*"]; ok {
			return false
		}
		if _, ok := includeMap[db+"."+tb]; ok {
			return true
		}
		if _, ok := excludeMap[db+"."+tb]; ok {
			return false
		}
		if _, ok := includeMap[db+".*"]; ok {
			return true
		}
		if _, ok := excludeMap[db+".*"]; ok {
			return false
		}
		if _, ok := includeMap["*."+tb]; ok {
			return true
		}
		if _, ok := excludeMap["*."+tb]; ok {
			return false
		}
		if len(includeMap) != 0 {
			return false
		}
		return true
	}
	if filter.IncludeGtid != "" {
		inGtid, err = gtid.Parse(filter.IncludeGtid)
		if err != nil {
			return err
		}
		// subGtid tracks which of the wanted GTIDs are still outstanding so the
		// scan can stop as soon as all of them have been seen
		subGtid = inGtid.Clone()
	}
	if filter.ExcludeGtid != "" {
		exGtid, err = gtid.Parse(filter.ExcludeGtid)
		if err != nil {
			return err
		}
	}
	var (
		n            int64
		tbMapPos     uint32 = 0
		skipTillNext bool   = false
	)
	var tx Transaction
	currentGtid := ""
	// callFn applies the position, date and size filters before invoking fn;
	// a filtered-out transaction reports true so the scan continues
	callFn := func(tx Transaction) bool {
		if fn == nil {
			return true
		}
		if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) {
			return true
		}
		if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) {
			return true
		}
		if filter.StartPos != 0 && filter.StartPos > tx.StartPos {
			return true
		}
		if filter.EndPos != 0 && filter.EndPos < tx.EndPos {
			return true
		}
		if filter.BigThan != 0 && filter.BigThan > tx.Size {
			return true
		}
		if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
			return true
		}
		if !filter.OnlyShowGtid && tx.validSchemaCount == 0 {
			return true
		}
		return fn(tx)
	}
	for {
		headBuf := make([]byte, replication.EventHeaderSize)
		if _, err = io.ReadFull(r, headBuf); err == io.EOF {
			if tx.Time.IsZero() {
				return nil
			}
			// EOF: flush the transaction that is still being accumulated
			idx := 0
			for k, v := range tx.Txs {
				if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
					v.Sql = tx.sqlOrigin[idx]
					idx++
				}
				tx.RowsCount += v.RowCount
				tx.Txs[k] = v
			}
			if filter.OnlyShowGtid {
				tx.EndPos = tx.StartPos
			}
			tx.Size = tx.EndPos - tx.StartPos
			callFn(tx)
			return nil
		} else if err != nil {
			return err
		}
		var h *replication.EventHeader
		h, err = parse.ParseHeader(headBuf)
		if err != nil {
			return err
		}
		if h.EventSize <= uint32(replication.EventHeaderSize) {
			return fmt.Errorf("invalid event header, event size is %d, too small", h.EventSize)
		}
		var buf bytes.Buffer
		if n, err = io.CopyN(&buf, r, int64(h.EventSize)-int64(replication.EventHeaderSize)); err != nil {
			return fmt.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, replication.EventHeaderSize, n)
		}
		// the event body has already been consumed from r, so skipping ahead to
		// the next GTID event cannot desync the stream
		if skipTillNext && h.EventType != replication.GTID_EVENT {
			continue
		}
		if filter.OnlyShowGtid && h.EventType != replication.GTID_EVENT {
			continue
		}
		data := buf.Bytes()
		var rawData []byte
		rawData = append(rawData, headBuf...)
		rawData = append(rawData, data...)
		eventLen := int(h.EventSize) - replication.EventHeaderSize
		if len(data) != eventLen {
			return fmt.Errorf("invalid data size %d in event %s, less than event length %d", len(data), h.EventType, eventLen)
		}
		var e replication.Event
		e, err = parse.ParseEvent(h, data, rawData)
		if err != nil {
			return err
		}
		if h.EventType == replication.TABLE_MAP_EVENT {
			// remember where the table map event started so the following row
			// events can be attributed to it
			tbMapPos = h.LogPos - h.EventSize
		}
		binEvent := &replication.BinlogEvent{Header: h, Event: e} // raw data is not needed past this point
		evs := ParseBinlogEvent(binEvent)
		for _, ev := range evs {
			startPos := 0
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			} else {
				startPos = int(tbMapPos)
			}
			if filter.EndPos != 0 && startPos > filter.EndPos {
				skipTillNext = true
				continue
			}
			if filter.StartPos != 0 && startPos < filter.StartPos {
				skipTillNext = true
				continue
			}
			switch ev.Type {
			case "gtid":
				if skipTillNext {
					skipTillNext = false
				}
				if currentGtid != "" {
					idx := 0
					for k, v := range tx.Txs {
						if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
							v.Sql = tx.sqlOrigin[idx]
							idx++
						}
						tx.RowsCount += v.RowCount
						tx.Txs[k] = v
					}
					if filter.OnlyShowGtid {
						tx.EndPos = startPos - 1
					}
					tx.Size = tx.EndPos - tx.StartPos
					if !callFn(tx) {
						return nil
					}
					if subGtid != nil {
						// stop early once every requested GTID has been emitted
						subGtid.Sub(tx.GTID)
						if subGtid.EventCount() == 0 {
							return nil
						}
					}
					tx = Transaction{}
				}
				currentGtid = ev.Data
				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						currentGtid = ""
						skipTillNext = true
						continue
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						currentGtid = ""
						skipTillNext = true
						continue
					}
				}
				tx = Transaction{
					GTID:      ev.Data,
					StartPos:  startPos,
					Timestamp: int64(h.Timestamp),
					Time:      time.Unix(int64(h.Timestamp), 0),
				}
			case "":
				tx.EndPos = int(h.LogPos)
				continue
			case "rowsquery":
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				tx.EndPos = int(h.LogPos)
				status := STATUS_PREPARE
				if ev.Type == "query" {
					switch strings.ToLower(ev.Data) {
					case "begin":
						status = STATUS_BEGIN
					case "commit":
						status = STATUS_COMMIT
					case "rollback":
						status = STATUS_ROLLBACK
					}
					tx.Status = status
				}
				if !matchTbs(ev.DB, ev.TB) {
					continue
				}
				if ev.DB != "" && ev.TB != "" {
					tx.validSchemaCount++
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:        startPos,
					EndPos:          int(h.LogPos),
					Db:              ev.DB,
					Table:           ev.TB,
					Sql:             ev.Data,
					SqlType:         ev.Type,
					Rows:            ev.Rows,
					RowCount:        int(ev.RowCnt),
					Timestamp:       int64(h.Timestamp),
					Time:            time.Unix(int64(h.Timestamp), 0),
					CompressionType: ev.CompressionType,
				})
			}
		}
	}
}
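// ParseBinlogWithFilter parses the binlog file at path and hands every
// transaction that passes filter to fx; fx returning false stops the scan.
// When pos is non-zero, events are first replayed from the beginning of the
// file until the format description (or GTID) event has initialized parser
// state, and only then does the reader seek to pos.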
func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error {
	// malformed binlogs can panic deep inside the parser; swallow the panic and
	// let the function return normally
	defer func() {
		recover()
	}()
	if !staros.Exists(path) {
		return os.ErrNotExist
	}
	f, err := os.Open(path)
	if f != nil {
		defer f.Close()
	}
	if err != nil {
		return err
	}
	parse := replication.NewBinlogParser()
	parse.SetParseTime(false)
	parse.SetUseDecimal(false)
	seekZero := func() error {
		fileTypeBytes := int64(4)
		b := make([]byte, fileTypeBytes)
		// read the binlog magic header
		if _, err = f.Read(b); err != nil {
			return err
		} else if !bytes.Equal(b, replication.BinLogFileHeader) {
			// not in binlog format
			return fmt.Errorf("%s is not a valid binlog file: bad magic header", path)
		}
		// must not seek to any other position, otherwise the parser may panic
		// because the format description and table map events would be skipped
		if _, err = f.Seek(fileTypeBytes, io.SeekStart); err != nil {
			return err
		}
		return nil
	}
	if pos != 0 {
		if err = seekZero(); err != nil {
			return err
		}
		// replay events from the start of the file until the parser has seen the
		// format description (or GTID) event; only then is it safe to jump to pos
		for {
			headBuf := make([]byte, replication.EventHeaderSize)
			if _, err = io.ReadFull(f, headBuf); err != nil {
				return err
			}
			var h *replication.EventHeader
			h, err = parse.ParseHeader(headBuf)
			if err != nil {
				return err
			}
			if h.EventSize <= uint32(replication.EventHeaderSize) {
				return fmt.Errorf("invalid event header, event size is %d, too small", h.EventSize)
			}
			var buf bytes.Buffer
			if n, err := io.CopyN(&buf, f, int64(h.EventSize)-int64(replication.EventHeaderSize)); err != nil {
				return fmt.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, replication.EventHeaderSize, n)
			}
			data := buf.Bytes()
			var rawData []byte
			rawData = append(rawData, headBuf...)
			rawData = append(rawData, data...)
			eventLen := int(h.EventSize) - replication.EventHeaderSize
			if len(data) != eventLen {
				return fmt.Errorf("invalid data size %d in event %s, less than event length %d", len(data), h.EventType, eventLen)
			}
			if _, err = parse.ParseEvent(h, data, rawData); err != nil {
				return err
			}
			if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT {
				break
			}
		}
		if _, err = f.Seek(pos, io.SeekStart); err != nil {
			return err
		}
	} else {
		if err = seekZero(); err != nil {
			return err
		}
	}
	return parseBinlogWithFilter(f, parse, filter, fx)
}
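// Usage sketch (illustrative, with a placeholder path): collecting the GTIDs
// present in a binlog via the OnlyShowGtid fast path, which skips every event
// body except GTID events.
//
//	var gtids []string
//	err := ParseBinlogWithFilter("/var/lib/mysql/mysql-bin.000001", 0,
//		BinlogFilter{OnlyShowGtid: true},
//		func(tx Transaction) bool {
//			gtids = append(gtids, tx.GTID)
//			return true
//		})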