From 46c68a68e876134fdddf554e6c0509c32f4174c7 Mon Sep 17 00:00:00 2001 From: starainrt Date: Mon, 3 Jul 2023 18:39:59 +0800 Subject: [PATCH] update --- go.mod | 2 +- go.sum | 4 +- main.go | 324 +++++++++++++++++++++------ vendor/b612.me/mysql/binlog/parse.go | 164 +++++++++++--- vendor/modules.txt | 2 +- 5 files changed, 397 insertions(+), 99 deletions(-) diff --git a/go.mod b/go.mod index fb8d2d3..06d33b4 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module binlog go 1.20 require ( - b612.me/mysql/binlog v0.0.0-20230630095545-8caa467be7e9 + b612.me/mysql/binlog v0.0.0-20230703093600-b1b0733e53da b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044 b612.me/starlog v1.3.2 b612.me/staros v1.1.6 diff --git a/go.sum b/go.sum index f042f96..dfd9f62 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -b612.me/mysql/binlog v0.0.0-20230630095545-8caa467be7e9 h1:6U/hChR8T9L9v+0olHpJRlx4iDiFj1e5psWVjP668jo= -b612.me/mysql/binlog v0.0.0-20230630095545-8caa467be7e9/go.mod h1:j9oDZUBx7+GK9X1b1bqO9SHddHvDRSGfwbIISxONqfA= +b612.me/mysql/binlog v0.0.0-20230703093600-b1b0733e53da h1:R7ifW7GrrkXldY8qpsHIEBNiUauM7aWb6rHsEbxy29A= +b612.me/mysql/binlog v0.0.0-20230703093600-b1b0733e53da/go.mod h1:j9oDZUBx7+GK9X1b1bqO9SHddHvDRSGfwbIISxONqfA= b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044 h1:sJrYUl9Sb1tij6ROahFE3r/36Oag3kI92OXDjOKsdwA= b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044/go.mod h1:3EHq1jvlm3a92UxagMjfqSSVYb3KW2H3aT5nd4SiD94= b612.me/notify v1.2.4 h1:cjP80V9FeM+ib1DztZdykusakcbjNI4dAB1pXE8U6bo= diff --git a/main.go b/main.go index ec22d90..43dcf55 100644 --- a/main.go +++ b/main.go @@ -5,6 +5,7 @@ import ( "b612.me/mysql/gtid" "b612.me/starlog" "b612.me/staros" + "errors" "fmt" "github.com/spf13/cobra" "github.com/tealeg/xlsx" @@ -16,32 +17,38 @@ import ( ) var ( - bigThan int - smallThan int - startPos int - endPos int - startTime int64 - endTime int64 - includeGtid string - excludeGtid string - filePath []string - outPath string - vbo bool - skipquery bool - pos int64 - prefix string - outasRow bool - counts int + bigThan int + smallThan int + startPos int + endPos int + startTime int64 + endTime int64 + includeDTs []string + excludeDTs []string + includeGtid string + excludeGtid string + outPath string + vbo bool + skipquery bool + pos int64 + prefix string + outasRow bool + counts int + fourceParse bool + timeBigThan int + timeSmallThan int + outType string ) func init() { + cmd.Flags().IntVar(&timeBigThan, "cost-after", 0, "show transactions cost big than x seconds") + cmd.Flags().IntVar(&timeSmallThan, "cost-less", 0, "show transactions cost below x seconds") cmd.Flags().IntVarP(&counts, "count", "c", 0, "counts of binlog transaction") cmd.Flags().IntVarP(&endPos, "pos", "P", 0, "skipPos of binlog") cmd.Flags().IntVarP(&startPos, "start-pos", "S", 0, "startPos of binlog") - cmd.Flags().IntVarP(&endPos, "end-pos", "E", 0, "endPos of binlog") + cmd.Flags().IntVarP(&endPos, "end-pos", "F", 0, "endPos of binlog") cmd.Flags().StringVarP(&includeGtid, "include-gtid", "i", "", "include gtid") cmd.Flags().StringVarP(&excludeGtid, "exclude-gtid", "e", "", "exclude gtid") - cmd.Flags().StringSliceVarP(&filePath, "path", "p", []string{}, "binlog file path") cmd.Flags().StringVarP(&outPath, "savepath", "o", "", "output excel path") cmd.Flags().Int64Var(&startTime, "starttime", 0, "start unix timestamp") cmd.Flags().Int64Var(&endTime, "endtime", 0, "end unix timestamp") @@ -51,6 +58,10 @@ func init() { cmd.Flags().IntVar(&smallThan, "small", 0, "show tx small than x bytes") cmd.Flags().StringVar(&prefix, "prefix", "mysql-bin", "mysql binlog prefix") cmd.Flags().BoolVarP(&outasRow, "row-mode", "r", false, "output as row") + cmd.Flags().StringSliceVarP(&includeDTs, "include-tables", "I", []string{}, "whitelist schemas and tables,eg: schema.table") + cmd.Flags().StringSliceVarP(&excludeDTs, "exclude-tables", "E", []string{}, "blacklist schemas and tables,eg: schema.table") + cmd.Flags().BoolVar(&fourceParse, "force-parse-all", false, "force parse all events in binlog") + cmd.Flags().StringVarP(&outType, "output-type", "t", "xlsx", "output file type,now support xlsx,txt") } var cmd = &cobra.Command{ @@ -58,12 +69,14 @@ var cmd = &cobra.Command{ Short: "binlog parser", Long: "binlog parser", Run: func(cmd *cobra.Command, args []string) { - if len(filePath) == 0 { + // for test + // cc05c88c-0683-11ee-8149-fa163e8c148c:44969805 + if len(args) == 0 { starlog.Warningln("Please enter a binlog path or folder") return } now := time.Now() - ParseBinlog() + ParseBinlog(args) cost := time.Now().Sub(now).Seconds() fmt.Println("") fmt.Printf("Time Cost:%.2fs", cost) @@ -74,14 +87,16 @@ func main() { cmd.Execute() } -func ParseBinlog() { +func ParseBinlog(filePath []string) { + var err error foundCount := 0 var totalGtid *gtid.Gtid - var owrt *xlsx.File + var out Outfile if outPath != "" { - owrt, err = prepareXlsx() + out, err = InitOutput(outType, outPath) if err != nil { + starlog.Errorln(err) return } } @@ -94,19 +109,21 @@ func ParseBinlog() { eTime = time.Unix(endTime, 0) } onlyGtid := false - if !vbo && outPath == "" { + if !vbo && outPath == "" && !fourceParse { onlyGtid = true } var filter = binlog.BinlogFilter{ - IncludeGtid: includeGtid, - ExcludeGtid: excludeGtid, - StartPos: startPos, - EndPos: endPos, - StartDate: sTime, - EndDate: eTime, - BigThan: bigThan, - SmallThan: smallThan, - OnlyShowGtid: onlyGtid, + IncludeTables: includeDTs, + ExcludeTables: excludeDTs, + IncludeGtid: includeGtid, + ExcludeGtid: excludeGtid, + StartPos: startPos, + EndPos: endPos, + StartDate: sTime, + EndDate: eTime, + BigThan: bigThan, + SmallThan: smallThan, + OnlyShowGtid: onlyGtid, } var cGtid *gtid.Gtid proc := make(chan string, 1000) @@ -133,6 +150,16 @@ func ParseBinlog() { }() } err = binlog.ParseBinlogWithFilter(fpath, pos, filter, func(tx binlog.Transaction) bool { + if timeBigThan != 0 { + if tx.TxEndTime-tx.TxStartTime < int64(timeBigThan) { + return true + } + } + if timeSmallThan != 0 { + if tx.TxEndTime-tx.TxStartTime > int64(timeSmallThan) { + return true + } + } foundCount++ if cGtid == nil { cGtid, _ = gtid.Parse(tx.GTID) @@ -157,14 +184,16 @@ func ParseBinlog() { if skipquery && (strings.ToLower(t.Sql) == "begin" || strings.ToLower(t.Sql) == "commit") { continue } - _, sql := generateRowSql(t) - fmt.Printf("GTID:\t%s\nTime:\t%s\nStartPos:\t%v\nEndPos:\t%v\nRowsCount:\t%v\nSQLOrigin:\t%v\nSQL:\t%+v\n\n", - tx.GTID, t.Time, t.StartPos, t.EndPos, t.RowCount, t.Sql, sql) + sqls := generateOfflineRowSql(t) + for _, sql := range sqls { + fmt.Printf("GTID:\t%s\nTime:\t%s\nStartPos:\t%v\nEndPos:\t%v\nCost\t%v\nSchema:%v\nRowsCount:\t%v\nSQLOrigin:\t%v\nSQL:\t%+v\n\n", + tx.GTID, t.Time, t.StartPos, t.EndPos, tx.TxEndTime-tx.TxStartTime, t.Db+"."+t.Table, t.RowCount, t.Sql, sql.SQL) + } } } } if outPath != "" { - add2Xlsx(owrt, tx, foundCount) + AddOutfile(out, tx, foundCount) } if counts > 0 && foundCount >= counts { return false @@ -178,8 +207,8 @@ func ParseBinlog() { if cGtid != nil { cGtidStr = cGtid.String() } - if outPath != "" { - err = owrt.Save(outPath) + if outPath != "" && out.Type == "xlsx" { + err = FinishOutfile(out, outPath) if err != nil { starlog.Errorln(err) return cGtidStr @@ -210,7 +239,10 @@ func ParseBinlog() { } } if outPath != "" { - owrt.Save(outPath) + err := FinishOutfile(out, outPath) + if err != nil { + starlog.Errorln(err) + } } fmt.Println("") if len(gtidRes) != 0 { @@ -226,12 +258,12 @@ func ParseBinlog() { fmt.Printf("Total Gtid:%v\nTotal SQL Number:%v\n", allGtid, foundCount) } -func prepareXlsx() (*xlsx.File, error) { +func prepareXlsx() (Outfile, error) { owrt := xlsx.NewFile() res, err := owrt.AddSheet("结果") if err != nil { starlog.Errorln(err) - return owrt, err + return Outfile{}, err } title := res.AddRow() title.AddCell().SetValue("序号") @@ -242,6 +274,8 @@ func prepareXlsx() (*xlsx.File, error) { title.AddCell().SetValue("EndPos") title.AddCell().SetValue("事务大小") title.AddCell().SetValue("影响行数") + title.AddCell().SetValue("事务状态") + title.AddCell().SetValue("事务耗时") title.AddCell().SetValue("压缩类型") title.AddCell().SetValue("单语句StartPos") title.AddCell().SetValue("单语句EndPos") @@ -258,11 +292,19 @@ func prepareXlsx() (*xlsx.File, error) { res.SetColWidth(1, 1, 40) res.SetColWidth(3, 6, 6) res.SetColWidth(7, 7, 5) - res.SetColWidth(16, 16, 40) - return owrt, owrt.Save(outPath) + res.SetColWidth(18, 18, 40) + return Outfile{ + Type: "xlsx", + File: owrt, + }, owrt.Save(outPath) +} + +type Outfile struct { + Type string + File interface{} } -func add2Xlsx(owrt *xlsx.File, tx binlog.Transaction, foundCount int) { +func add2Xlsx(owrt *xlsx.File, tx binlog.Transaction, foundCount int) error { for k, t := range tx.Txs { if skipquery && (strings.ToLower(t.Sql) == "begin" || strings.ToLower(t.Sql) == "commit") { continue @@ -277,6 +319,17 @@ func add2Xlsx(owrt *xlsx.File, tx binlog.Transaction, foundCount int) { r.AddCell().SetValue(tx.EndPos) r.AddCell().SetValue(tx.Size) r.AddCell().SetValue(tx.RowsCount) + status := "PREPARE" + switch tx.Status { + case binlog.STATUS_BEGIN: + status = "BEGIN" + case binlog.STATUS_COMMIT: + status = "COMMIT" + case binlog.STATUS_ROLLBACK: + status = "ROLLBACK" + } + r.AddCell().SetValue(status) + r.AddCell().SetValue(tx.TxEndTime - tx.TxStartTime) if t.CompressionType == "" { r.AddCell().SetValue("NONE") } else { @@ -299,15 +352,137 @@ func add2Xlsx(owrt *xlsx.File, tx binlog.Transaction, foundCount int) { r.AddCell().SetValue(t.Rows) continue } - idx, sql := generateRowSql(t) - r := addRow() - r.AddCell().SetValue(k + 1) - r.AddCell().SetValue(idx + 1) - r.AddCell().SetValue(sql) + sqls := generateOfflineRowSql(t) + for _, sql := range sqls { + r := addRow() + r.AddCell().SetValue(k + 1) + r.AddCell().SetValue(sql.ID) + r.AddCell().SetValue(sql.SQL) + } + } + return nil +} + +func InitOutput(types string, outPath string) (Outfile, error) { + switch types { + case "xlsx": + return prepareXlsx() + case "txt": + return prepareTxtFile(outPath) + default: + return Outfile{}, errors.New("Not Support This Outfile Format:" + types) + } +} + +func AddOutfile(of Outfile, tx binlog.Transaction, foundCount int) error { + switch of.Type { + case "xlsx": + owrt, ok := of.File.(*xlsx.File) + if !ok { + return errors.New("failed!,not a valid xlsx pointer") + } + return add2Xlsx(owrt, tx, foundCount) + case "txt": + f, ok := of.File.(*os.File) + if !ok { + return errors.New("failed!,not a valid txtfile pointer") + } + return addTxtFile(f, tx, foundCount) + default: + return errors.New("Not Support This Outfile Format:" + of.Type) + } +} + +func FinishOutfile(of Outfile, path string) error { + switch of.Type { + case "xlsx": + owrt, ok := of.File.(*xlsx.File) + if !ok { + return errors.New("failed!,not a valid xlsx pointer") + } + return owrt.Save(path) + case "txt": + f, ok := of.File.(*os.File) + if !ok { + return errors.New("failed!,not a valid txtfile pointer") + } + f.Sync() + return f.Close() + default: + return errors.New("Not Support This Outfile Format:" + of.Type) + } +} + +func prepareTxtFile(path string) (Outfile, error) { + txtFile, err := os.OpenFile(path, os.O_TRUNC|os.O_RDWR|os.O_CREATE, 0644) + if err != nil { + return Outfile{}, err + } + return Outfile{ + Type: "txt", + File: txtFile, + }, err +} + +func addTxtFile(f *os.File, tx binlog.Transaction, foundCount int) error { + for _, t := range tx.Txs { + if skipquery && (strings.ToLower(t.Sql) == "begin" || strings.ToLower(t.Sql) == "commit") { + continue + } + + addRow := func() error { + status := "PREPARE" + switch tx.Status { + case binlog.STATUS_BEGIN: + status = "BEGIN" + case binlog.STATUS_COMMIT: + status = "COMMIT" + case binlog.STATUS_ROLLBACK: + status = "ROLLBACK" + } + if t.CompressionType == "" { + t.CompressionType = "NONE" + } + _, err := f.WriteString(fmt.Sprintf("\n\nNumber:%v\nGTID:%s\nTime%s\nTotalSize:%v TotalCost:%v\n"+ + "StartPos:%v EndPos:%v RowsCount:%v\nStatus:%v\n"+ + "TxStartPos:%v TxEndPos:%v TxRowsCount:%v CompressionType:%s\n"+ + "Schema:%v Type:%s TxTime:%v\nSQLOrigin:%v\n", + foundCount, tx.GTID, tx.Time, tx.Size, tx.TxEndTime-tx.TxStartTime, + tx.StartPos, tx.EndPos, tx.RowsCount, status, + t.StartPos, t.EndPos, t.RowCount, t.CompressionType, + t.Db+"."+t.Table, t.SqlType, t.Time, strings.ReplaceAll(t.Sql, "\n", " "))) + if err != nil { + return err + } + return nil + } + if !outasRow { + return addRow() + } + sqls := generateOfflineRowSql(t) + for _, sql := range sqls { + err := addRow() + if err != nil { + return err + } + if sql.SQL == "" { + return nil + } + f.WriteString("SQL:" + sql.SQL + "\n") + } + return nil } + return nil } -func generateRowSql(t binlog.TxDetail) (int, interface{}) { +type SQLInfo struct { + ID int + SQL string + Origin [][]interface{} +} + +func generateOfflineRowSql(t binlog.TxDetail) []SQLInfo { + var sqlList = make([]SQLInfo, 0, len(t.Rows)) switch t.SqlType { case "insert": for idx, rows := range t.Rows { @@ -329,7 +504,12 @@ func generateRowSql(t binlog.TxDetail) (int, interface{}) { if setence != "" && len(setence) > 2 { setence = setence[:len(setence)-2] } - return idx + 1, fmt.Sprintf(`INSERT INTO %s.%s VALUES(%v)`, t.Db, t.Table, setence) + sqlList = append(sqlList, + SQLInfo{ + ID: idx + 1, + SQL: fmt.Sprintf(`INSERT INTO %s.%s VALUES(%v)`, t.Db, t.Table, setence), + }, + ) } case "update": var sql string @@ -343,15 +523,15 @@ func generateRowSql(t binlog.TxDetail) (int, interface{}) { for idxf, row := range rows { switch row.(type) { case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: - setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) + setence += fmt.Sprintf("@%d=%v%s", idxf, row, spec) case string: - setence += fmt.Sprintf("$%d='%v'%s", idxf, strings.ReplaceAll(row.(string), "'", "''"), spec) + setence += fmt.Sprintf("@%d='%v'%s", idxf, strings.ReplaceAll(row.(string), "'", "''"), spec) case []byte: - setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) + setence += fmt.Sprintf("@%d=%v%s", idxf, row, spec) case nil: - setence += fmt.Sprintf("$%d=%v%s", idxf, "NULL", spec) + setence += fmt.Sprintf("@%d=%v%s", idxf, "NULL", spec) default: - setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) + setence += fmt.Sprintf("@%d=%v%s", idxf, row, spec) } } if setence != "" && len(setence) > 2 { @@ -362,7 +542,12 @@ func generateRowSql(t binlog.TxDetail) (int, interface{}) { continue } sql = fmt.Sprintf("UPDATE %s.%s SET (%v) WHERE %v", t.Db, t.Table, setence, where) - return (idxc + 1) / 2, sql + sqlList = append(sqlList, + SQLInfo{ + ID: (idxc + 1) / 2, + SQL: sql, + }, + ) } case "delete": for idx, rows := range t.Rows { @@ -371,26 +556,33 @@ func generateRowSql(t binlog.TxDetail) (int, interface{}) { for idxf, row := range rows { switch row.(type) { case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: - setence += fmt.Sprintf("$%d=%v%s", idxf, row) + setence += fmt.Sprintf("@%d=%v%s", idxf, row, spec) case string: - setence += fmt.Sprintf("$%d='%v'%s", idxf, strings.ReplaceAll(row.(string), "'", "''")) + setence += fmt.Sprintf("@%d='%v'%s", idxf, strings.ReplaceAll(row.(string), "'", "''"), spec) case []byte: - setence += fmt.Sprintf("$%d=%v%s", idxf, row) + setence += fmt.Sprintf("@%d=%v%s", idxf, row, spec) case nil: - setence += fmt.Sprintf("$%d=%v%s", idxf, "NULL") + setence += fmt.Sprintf("@%d=%v%s", idxf, "NULL", spec) default: - setence += fmt.Sprintf("$%d=%v%s", idxf, row) + setence += fmt.Sprintf("@%d=%v%s", idxf, row, spec) } } if setence != "" && len(setence) > 2 { setence = setence[:len(setence)-len(spec)] } sql := fmt.Sprintf("DELETE FROM %s.%s WHERE %v", t.Db, t.Table, setence) - return idx + 1, sql + + sqlList = append(sqlList, + SQLInfo{ + ID: idx + 1, + SQL: sql, + }, + ) } default: - return 1, t.Rows + sqlList = append(sqlList, SQLInfo{Origin: t.Rows}) + return sqlList } - return 0, "" + return sqlList } diff --git a/vendor/b612.me/mysql/binlog/parse.go b/vendor/b612.me/mysql/binlog/parse.go index d4e0e68..5d62306 100644 --- a/vendor/b612.me/mysql/binlog/parse.go +++ b/vendor/b612.me/mysql/binlog/parse.go @@ -8,33 +8,45 @@ import ( "github.com/starainrt/go-mysql/replication" "io" "os" + "strings" "time" ) type TxDetail struct { - StartPos int - EndPos int - RowCount int - Timestamp int64 - Time time.Time - Sql string - Db string - Table string - SqlType string - CompressionType string + StartPos int `json:"startPos"` + EndPos int `json:"endPos"` + RowCount int `json:"rowCount"` + Timestamp int64 `json:"timestamp"` + Time time.Time `json:"time"` + Sql string `json:"sql"` + Db string `json:"db"` + Table string `json:"table"` + SqlType string `json:"sqlType"` + CompressionType string `json:"compressionType"` Rows [][]interface{} } +const ( + STATUS_PREPARE uint8 = iota + STATUS_BEGIN + STATUS_COMMIT + STATUS_ROLLBACK +) + type Transaction struct { - GTID string - Timestamp int64 - Time time.Time - StartPos int - EndPos int - Size int - RowsCount int - sqlOrigin []string - Txs []TxDetail + GTID string `json:"gtid"` + Timestamp int64 `json:"timestamp"` + Time time.Time `json:"time"` + StartPos int `json:"startPos"` + EndPos int `json:"endPos"` + Size int `json:"size"` + RowsCount int `json:"rowsCount"` + Status uint8 `json:"status"` + TxStartTime int64 `json:"txStartTime"` + TxEndTime int64 `json:"txEndTime"` + sqlOrigin []string `json:"sqlOrigin"` + Txs []TxDetail `json:"txs"` + validSchemaCount int } func (t Transaction) GetSqlOrigin() []string { @@ -199,6 +211,18 @@ func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error { tx.EndPos = int(h.LogPos) tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) default: + status := STATUS_PREPARE + if ev.Type == "query" { + switch strings.ToLower(ev.Data) { + case "begin": + status = STATUS_BEGIN + case "commit": + status = STATUS_COMMIT + case "rollback": + status = STATUS_ROLLBACK + } + tx.Status = status + } tx.EndPos = int(h.LogPos) tx.Txs = append(tx.Txs, TxDetail{ StartPos: startPos, @@ -311,20 +335,76 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { } type BinlogFilter struct { - IncludeGtid string - ExcludeGtid string - StartPos int - EndPos int - StartDate time.Time - EndDate time.Time - BigThan int - SmallThan int - OnlyShowGtid bool + IncludeGtid string + ExcludeGtid string + IncludeTables []string + ExcludeTables []string + StartPos int + EndPos int + StartDate time.Time + EndDate time.Time + BigThan int + SmallThan int + OnlyShowGtid bool } func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error { var subGtid, inGtid, exGtid *gtid.Gtid var err error + var includeMap = make(map[string]bool) + var excludeMap = make(map[string]bool) + if len(filter.IncludeTables) != 0 { + for _, v := range filter.IncludeTables { + if len(strings.Split(v, ".")) != 2 { + return fmt.Errorf("IncludeTable Name Is Invalid:%s", v) + } + includeMap[v] = true + } + } + if len(filter.ExcludeTables) != 0 { + for _, v := range filter.ExcludeTables { + if len(strings.Split(v, ".")) != 2 { + return fmt.Errorf("ExcludeTable Name Is Invalid:%s", v) + } + excludeMap[v] = true + } + } + matchTbs := func(db, tb string) bool { + if len(filter.ExcludeTables) == 0 && len(filter.ExcludeTables) == 0 { + return true + } + if db == "" && tb == "" { + return true + } + if _, ok := includeMap["*.*"]; ok { + return true + } + if _, ok := excludeMap["*.*"]; ok { + return false + } + if _, ok := includeMap[db+"."+tb]; ok { + return true + } + if _, ok := excludeMap[db+"."+tb]; ok { + return false + } + if _, ok := includeMap[db+".*"]; ok { + return true + } + if _, ok := excludeMap[db+".*"]; ok { + return false + } + if _, ok := includeMap["*."+tb]; ok { + return true + } + if _, ok := excludeMap["*."+tb]; ok { + return false + } + if len(includeMap) != 0 { + return false + } + return true + } if filter.IncludeGtid != "" { inGtid, err = gtid.Parse(filter.IncludeGtid) if err != nil { @@ -338,7 +418,6 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter return err } } - // process: 0, continue: 1, break: 2, EOF: 3 var ( n int64 @@ -370,6 +449,9 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter if filter.SmallThan != 0 && filter.SmallThan < tx.Size { return true } + if !filter.OnlyShowGtid && tx.validSchemaCount == 0 { + return true + } return fn(tx) } for { @@ -501,6 +583,7 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter currentGtid = ev.Data if inGtid != nil { if c, _ := inGtid.Contain(ev.Data); !c { + tx = Transaction{} currentGtid = "" skipTillNext = true continue @@ -527,6 +610,29 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) default: tx.EndPos = int(h.LogPos) + status := STATUS_PREPARE + if ev.Type == "query" { + switch strings.ToLower(ev.Data) { + case "begin": + if tx.TxStartTime == 0 { + tx.TxStartTime = int64(h.Timestamp) + } + status = STATUS_BEGIN + case "commit": + status = STATUS_COMMIT + tx.TxEndTime = int64(h.Timestamp) + case "rollback": + status = STATUS_ROLLBACK + tx.TxEndTime = int64(h.Timestamp) + } + tx.Status = status + } + if !matchTbs(ev.DB, ev.TB) { + continue + } + if ev.DB != "" && ev.TB != "" { + tx.validSchemaCount++ + } tx.Txs = append(tx.Txs, TxDetail{ StartPos: startPos, EndPos: int(h.LogPos), diff --git a/vendor/modules.txt b/vendor/modules.txt index ee831b4..9bec65b 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,4 +1,4 @@ -# b612.me/mysql/binlog v0.0.0-20230630095545-8caa467be7e9 +# b612.me/mysql/binlog v0.0.0-20230703093600-b1b0733e53da ## explicit; go 1.20 b612.me/mysql/binlog # b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044