// Command binlog-parser is an offline MySQL binlog inspection tool.
// It parses one or more binlog files (or folders of binlogs), filters
// transactions by GTID / position / time / size, prints matches to
// stdout, and can additionally export them to an xlsx or txt file.
package main

import (
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"regexp"
	"strings"
	"time"

	"b612.me/mysql/binlog"
	"b612.me/mysql/gtid"
	"b612.me/starlog"
	"b612.me/staros"
	"b612.me/startext"
	"github.com/spf13/cobra"
	"github.com/tealeg/xlsx"
)

// Command-line options, bound to flags in init.
var (
	bigThan       int      // only keep transactions bigger than this many bytes
	smallThan     int      // only keep transactions smaller than this many bytes
	startPos      int      // binlog offset where parsing starts
	endPos        int      // binlog offset where parsing ends
	startTime     int64    // unix-timestamp lower bound (0 = unbounded)
	endTime       int64    // unix-timestamp upper bound (0 = unbounded)
	includeDTs    []string // schema.table whitelist
	excludeDTs    []string // schema.table blacklist
	includeGtid   string   // GTID set to include
	excludeGtid   string   // GTID set to exclude
	outPath       string   // output file path ("" = stdout only)
	vbo           bool     // verbose: print every matched transaction
	skipquery     bool     // skip bare BEGIN/COMMIT statements in output
	pos           int64    // bytes to skip before parsing starts
	prefix        string   // binlog file-name prefix used when scanning folders
	outasRow      bool     // emit one entry per row change instead of per transaction
	counts        int      // stop after this many matched transactions (0 = all)
	fourceParse   bool     // force full event parse (identifier spelling kept for compatibility)
	timeBigThan   int      // only keep transactions that took more than this many seconds
	timeSmallThan int      // only keep transactions that took less than this many seconds
	outType       string   // output format: "xlsx" or "txt"
	showBasic     uint8    // detail level of printed/saved fields (0-255)
)

func init() {
	cmd.Flags().IntVar(&timeBigThan, "cost-after", 0, "show transactions cost big than x seconds")
	cmd.Flags().IntVar(&timeSmallThan, "cost-less", 0, "show transactions cost below x seconds")
	cmd.Flags().IntVarP(&counts, "count", "c", 0, "counts of binlog transaction")
	// BUG FIX: "--pos" was bound to &endPos, silently clobbering "--end-pos"
	// and leaving the skip offset `pos` (passed to ParseBinlogWithFilter)
	// permanently zero. Bind it to the variable it documents.
	cmd.Flags().Int64VarP(&pos, "pos", "P", 0, "skipPos of binlog")
	cmd.Flags().IntVarP(&startPos, "start-pos", "S", 0, "startPos of binlog")
	cmd.Flags().IntVarP(&endPos, "end-pos", "F", 0, "endPos of binlog")
	cmd.Flags().StringVarP(&includeGtid, "include-gtid", "i", "", "include gtid")
	cmd.Flags().StringVarP(&excludeGtid, "exclude-gtid", "e", "", "exclude gtid")
	cmd.Flags().StringVarP(&outPath, "savepath", "o", "", "output excel path")
	cmd.Flags().Int64Var(&startTime, "starttime", 0, "start unix timestamp")
	cmd.Flags().Int64Var(&endTime, "endtime", 0, "end unix timestamp")
	cmd.Flags().BoolVarP(&vbo, "verbose", "v", false, "show the detail verbose")
	cmd.Flags().BoolVarP(&skipquery, "skip-query", "s", true, "skip query write to xlsx like BEGIN COMMIT")
	cmd.Flags().IntVar(&bigThan, "big", 0, "show tx big than x bytes")
	cmd.Flags().IntVar(&smallThan, "small", 0, "show tx small than x bytes")
	cmd.Flags().StringVar(&prefix, "prefix", "mysql-bin", "mysql binlog prefix")
	cmd.Flags().BoolVarP(&outasRow, "row-mode", "r", false, "output as row")
	cmd.Flags().StringSliceVarP(&includeDTs, "include-tables", "I", []string{}, "whitelist schemas and tables,eg: schema.table")
	cmd.Flags().StringSliceVarP(&excludeDTs, "exclude-tables", "E", []string{}, "blacklist schemas and tables,eg: schema.table")
	cmd.Flags().BoolVar(&fourceParse, "force-parse-all", false, "force parse all events in binlog")
	cmd.Flags().StringVarP(&outType, "output-type", "t", "xlsx", "output file type,now support xlsx,txt")
	cmd.Flags().Uint8Var(&showBasic, "show-level", 255, "show level,value range 0-255")
}

// cmd is the root cobra command; the positional arguments are binlog file
// or folder paths.
var cmd = &cobra.Command{
	Use:     "",
	Short:   "binlog parser by i@b612.me",
	Long:    "binlog解析工具,支持离线binlog解析",
	Version: "0.1.0",
	Run: func(cmd *cobra.Command, args []string) {
		// for test
		// cc05c88c-0683-11ee-8149-fa163e8c148c:44969805
		if len(args) == 0 {
			starlog.Warningln("Please enter a binlog path or folder")
			cmd.Help()
			return
		}
		now := time.Now()
		ParseBinlog(args)
		cost := time.Since(now).Seconds() // idiomatic form of time.Now().Sub(now)
		fmt.Println("")
		fmt.Printf("Time Cost:%.2fs", cost)
	},
}

func main() {
	cmd.Execute()
}

// ParseBinlog parses every path in filePath (folders are scanned for files
// matching "<prefix>.<digits>"), applies the configured filters, prints or
// exports each matched transaction, and finally reports the accumulated
// GTID set and transaction count.
func ParseBinlog(filePath []string) {
	var err error
	foundCount := 0
	var totalGtid *gtid.Gtid // union of GTIDs across all parsed files
	var out Outfile
	if outPath != "" {
		out, err = InitOutput(outType, outPath)
		if err != nil {
			starlog.Errorln(err)
			return
		}
	}
	// getParser parses a single binlog file and returns the GTID set found
	// in it (empty string when none).
	getParser := func(fpath string) string {
		var sTime, eTime time.Time
		if startTime != 0 {
			sTime = time.Unix(startTime, 0)
		}
		if endTime != 0 {
			eTime = time.Unix(endTime, 0)
		}
		// When only the GTID summary is needed, let the parser skip row decoding.
		onlyGtid := false
		if !vbo && outPath == "" && !fourceParse {
			onlyGtid = true
		}
		var filter = binlog.BinlogFilter{
			IncludeTables: includeDTs,
			ExcludeTables: excludeDTs,
			IncludeGtid:   includeGtid,
			ExcludeGtid:   excludeGtid,
			StartPos:      startPos,
			EndPos:        endPos,
			StartDate:     sTime,
			EndDate:       eTime,
			BigThan:       bigThan,
			SmallThan:     smallThan,
			OnlyShowGtid:  onlyGtid,
		}
		var cGtid *gtid.Gtid
		proc := make(chan string, 1000)
		if !vbo {
			// Progress printer: shows the latest progress line, throttled to
			// every 10th update, and repeats it every 200ms while idle.
			// It exits when it receives the "end" sentinel.
			go func() {
				var latest string
				var count = uint16(0)
				for {
					select {
					case tmp := <-proc:
						if tmp == "end" {
							fmt.Println(latest)
							return
						}
						latest = tmp
						if count%10 == 0 {
							fmt.Print(latest)
						}
						count++
					case <-time.After(time.Millisecond * 200):
						fmt.Print(latest)
					}
				}
			}()
		}
		err = binlog.ParseBinlogWithFilter(fpath, pos, filter, func(tx binlog.Transaction) bool {
			// Duration filters: returning true skips this tx but keeps parsing.
			if timeBigThan != 0 {
				if tx.TxEndTime-tx.TxStartTime < int64(timeBigThan) {
					return true
				}
			}
			if timeSmallThan != 0 {
				if tx.TxEndTime-tx.TxStartTime > int64(timeSmallThan) {
					return true
				}
			}
			foundCount++
			if cGtid == nil {
				cGtid, _ = gtid.Parse(tx.GTID)
			} else {
				cGtid.Add(tx.GTID)
			}
			if totalGtid == nil {
				totalGtid, _ = gtid.Parse(tx.GTID)
			} else {
				totalGtid.Add(tx.GTID)
			}
			if !vbo {
				proc <- fmt.Sprintf("已找到%d个合法GTID\r", foundCount)
			} else {
				if !outasRow {
					switch showBasic {
					case 0:
						fmt.Printf("GTID:%s\nTime:%s\nSQL:%v\n\n", tx.GTID, tx.Time, tx.GetSqlOrigin())
					default:
						fmt.Printf("GTID:%s Time:%s StartPos:%v EndPos:%v RowsCount:%v Size:%v Detail:%+v\n",
							tx.GTID, tx.Time, tx.StartPos, tx.EndPos, tx.RowsCount, tx.Size, tx.Txs)
					}
				} else {
					fmt.Printf("-------------------------\nGTID:%s Time:%s StartPos:%v EndPos:%v RowsCount:%v Size:%v\n\n",
						tx.GTID, tx.Time, tx.StartPos, tx.EndPos, tx.RowsCount, tx.Size)
					for _, t := range tx.Txs {
						if skipquery && (strings.ToLower(t.Sql) == "begin" || strings.ToLower(t.Sql) == "commit") {
							continue
						}
						sqls := generateOfflineRowSql(t)
						for _, sql := range sqls {
							switch showBasic {
							case 0:
								fmt.Printf("GTID:\t%s\nTime:\t%s\nSQL:\t%+v\n\n", tx.GTID, t.Time, sql.SQL)
							case 1:
								fmt.Printf("GTID:\t%s\nTime:\t%s\nSQLOrigin:\t%+v\nSQL:\t%+v\n\n",
									tx.GTID, t.Time, t.Sql, sql.SQL)
							default:
								fmt.Printf("GTID:\t%s\nTime:\t%s\nStartPos:\t%v\nEndPos:\t%v\nCost\t%v\nSchema:%v\nRowsCount:\t%v\nSQLOrigin:\t%v\nSQL:\t%+v\n\n",
									tx.GTID, t.Time, t.StartPos, t.EndPos, tx.TxEndTime-tx.TxStartTime,
									t.Db+"."+t.Table, t.RowCount, t.Sql, sql.SQL)
							}
						}
					}
				}
			}
			if outPath != "" {
				AddOutfile(out, tx, foundCount)
			}
			// Returning false stops the parser once the requested count is hit.
			if counts > 0 && foundCount >= counts {
				return false
			}
			return true
		})
		if !vbo {
			// BUG FIX: the progress goroutine waits for an "end" sentinel that
			// was never sent, so it leaked and kept reprinting the last progress
			// line every 200ms. Tell it to stop, then give it a moment to flush.
			proc <- "end"
			time.Sleep(time.Millisecond * 500)
		}
		var cGtidStr string
		if cGtid != nil {
			cGtidStr = cGtid.String()
		}
		// Checkpoint the workbook after each file so a crash mid-run still
		// leaves a usable xlsx on disk.
		if outPath != "" && out.Type == "xlsx" {
			// BUG FIX: saving used to overwrite `err`, discarding any parse
			// error from ParseBinlogWithFilter; use a separate variable.
			if saveErr := FinishOutfile(out, outPath); saveErr != nil {
				starlog.Errorln(saveErr)
				return cGtidStr
			}
		}
		if err != nil {
			starlog.Errorln(err)
			return cGtidStr
		}
		return cGtidStr
	}
	var gtidRes [][]string // per-file results: {filename, gtid set}
	for _, fp := range filePath {
		if staros.IsFolder(fp) {
			files, err := os.ReadDir(fp)
			if err != nil {
				starlog.Errorln("read folder failed:", err)
				return
			}
			// QuoteMeta so a prefix containing regexp metacharacters (e.g. ".")
			// is matched literally.
			rgp := regexp.MustCompile(`^` + regexp.QuoteMeta(prefix) + `\.\d+$`)
			for _, v := range files {
				if !v.IsDir() && rgp.MatchString(v.Name()) {
					gtidRes = append(gtidRes, []string{v.Name(), getParser(filepath.Join(fp, v.Name()))})
				}
			}
		} else {
			getParser(fp)
		}
	}
	if outPath != "" {
		if err := FinishOutfile(out, outPath); err != nil {
			starlog.Errorln(err)
		}
	}
	fmt.Println("")
	if len(gtidRes) != 0 {
		for _, v := range gtidRes {
			fmt.Printf("%s:%s\n", v[0], v[1])
		}
	}
	fmt.Println("")
	allGtid := ""
	if totalGtid != nil {
		allGtid = totalGtid.String()
	}
	fmt.Printf("Total Gtid:%v\nTotal SQL Number:%v\n", allGtid, foundCount)
}

// prepareXlsx creates the result workbook, writes the header row (whose
// column set depends on showBasic), sets column widths, and saves an
// initial empty file to outPath.
func prepareXlsx() (Outfile, error) {
	owrt := xlsx.NewFile()
	res, err := owrt.AddSheet("结果")
	if err != nil {
		starlog.Errorln(err)
		return Outfile{}, err
	}
	title := res.AddRow()
	title.AddCell().SetValue("序号")
	title.AddCell().SetValue("GTID")
	title.AddCell().SetValue("时间")
	if showBasic > 1 {
		title.AddCell().SetValue("时间戳")
		title.AddCell().SetValue("StartPos")
		title.AddCell().SetValue("EndPos")
		title.AddCell().SetValue("事务大小")
		title.AddCell().SetValue("影响行数")
		title.AddCell().SetValue("事务状态")
		title.AddCell().SetValue("事务耗时")
		title.AddCell().SetValue("压缩类型")
		title.AddCell().SetValue("单语句StartPos")
		title.AddCell().SetValue("单语句EndPos")
		title.AddCell().SetValue("单语句时间")
		title.AddCell().SetValue("单语句影响行数")
		title.AddCell().SetValue("单语句影响库")
		title.AddCell().SetValue("单语句影响表")
	}
	title.AddCell().SetValue("SQL类型")
	title.AddCell().SetValue("具体SQL")
	title.AddCell().SetValue("从属事务编号")
	title.AddCell().SetValue("同事务行编号")
	title.AddCell().SetValue("行变更内容")
	res.SetColWidth(0, 0, 5)
	res.SetColWidth(1, 1, 40)
	res.SetColWidth(3, 6, 6)
	res.SetColWidth(7, 7, 5)
	res.SetColWidth(18, 18, 40)
	return Outfile{
		Type: "xlsx",
		File: owrt,
	}, owrt.Save(outPath)
}

// Outfile pairs an output format tag with its underlying writer
// (*xlsx.File for "xlsx", *os.File for "txt").
type Outfile struct {
	Type string
	File interface{}
}

// add2Xlsx appends one spreadsheet row per transaction detail (or per row
// change when outasRow is set) to the first sheet of owrt.
func add2Xlsx(owrt *xlsx.File, tx binlog.Transaction, foundCount int) error {
	for k, t := range tx.Txs {
		if skipquery && (strings.ToLower(t.Sql) == "begin" || strings.ToLower(t.Sql) == "commit") {
			continue
		}
		// addRow writes the shared leading columns and returns the row so the
		// caller can append the per-mode trailing columns.
		addRow := func() *xlsx.Row {
			r := owrt.Sheets[0].AddRow()
			r.AddCell().SetValue(foundCount)
			r.AddCell().SetValue(tx.GTID)
			r.AddCell().SetValue(tx.Time.String())
			if showBasic > 1 {
				r.AddCell().SetValue(tx.Timestamp)
				r.AddCell().SetValue(tx.StartPos)
				r.AddCell().SetValue(tx.EndPos)
				r.AddCell().SetValue(tx.Size)
				r.AddCell().SetValue(tx.RowsCount)
				status := "PREPARE"
				switch tx.Status {
				case binlog.STATUS_BEGIN:
					status = "BEGIN"
				case binlog.STATUS_COMMIT:
					status = "COMMIT"
				case binlog.STATUS_ROLLBACK:
					status = "ROLLBACK"
				}
				r.AddCell().SetValue(status)
				r.AddCell().SetValue(tx.TxEndTime - tx.TxStartTime)
				if t.CompressionType == "" {
					r.AddCell().SetValue("NONE")
				} else {
					r.AddCell().SetValue(t.CompressionType)
				}
				r.AddCell().SetValue(t.StartPos)
				r.AddCell().SetValue(t.EndPos)
				r.AddCell().SetValue(t.Time.String())
				r.AddCell().SetValue(t.RowCount)
				r.AddCell().SetValue(t.Db)
				r.AddCell().SetValue(t.Table)
			}
			r.AddCell().SetValue(t.SqlType)
			r.AddCell().SetValue(t.Sql)
			return r
		}
		if !outasRow {
			r := addRow()
			r.AddCell().SetValue(k + 1)
			r.AddCell().SetValue(1)
			r.AddCell().SetValue(t.Rows)
			continue
		}
		sqls := generateOfflineRowSql(t)
		for _, sql := range sqls {
			r := addRow()
			r.AddCell().SetValue(k + 1)
			r.AddCell().SetValue(sql.ID)
			r.AddCell().SetValue(sql.SQL)
		}
	}
	return nil
}

// InitOutput creates the output sink for the requested format
// ("xlsx" or "txt").
func InitOutput(types string, outPath string) (Outfile, error) {
	switch types {
	case "xlsx":
		return prepareXlsx()
	case "txt":
		return prepareTxtFile(outPath)
	default:
		return Outfile{}, errors.New("Not Support This Outfile Format:" + types)
	}
}

// AddOutfile dispatches one transaction to the format-specific writer.
func AddOutfile(of Outfile, tx binlog.Transaction, foundCount int) error {
	switch of.Type {
	case "xlsx":
		owrt, ok := of.File.(*xlsx.File)
		if !ok {
			return errors.New("failed!,not a valid xlsx pointer")
		}
		return add2Xlsx(owrt, tx, foundCount)
	case "txt":
		f, ok := of.File.(*os.File)
		if !ok {
			return errors.New("failed!,not a valid txtfile pointer")
		}
		return addTxtFile(f, tx, foundCount)
	default:
		return errors.New("Not Support This Outfile Format:" + of.Type)
	}
}

// FinishOutfile flushes the output: saves the workbook for xlsx, or syncs
// and closes the file for txt.
func FinishOutfile(of Outfile, path string) error {
	switch of.Type {
	case "xlsx":
		owrt, ok := of.File.(*xlsx.File)
		if !ok {
			return errors.New("failed!,not a valid xlsx pointer")
		}
		return owrt.Save(path)
	case "txt":
		f, ok := of.File.(*os.File)
		if !ok {
			return errors.New("failed!,not a valid txtfile pointer")
		}
		f.Sync()
		return f.Close()
	default:
		return errors.New("Not Support This Outfile Format:" + of.Type)
	}
}

// prepareTxtFile opens (truncating) the txt output file.
func prepareTxtFile(path string) (Outfile, error) {
	txtFile, err := os.OpenFile(path, os.O_TRUNC|os.O_RDWR|os.O_CREATE, 0644)
	if err != nil {
		return Outfile{}, err
	}
	return Outfile{
		Type: "txt",
		File: txtFile,
	}, err
}

// addTxtFile appends one text record per transaction detail (plus generated
// per-row SQL when outasRow is set) to f.
func addTxtFile(f *os.File, tx binlog.Transaction, foundCount int) error {
	for _, t := range tx.Txs {
		if skipquery && (strings.ToLower(t.Sql) == "begin" || strings.ToLower(t.Sql) == "commit") {
			continue
		}
		// addRow writes the record header for the current detail.
		addRow := func() error {
			status := "PREPARE"
			switch tx.Status {
			case binlog.STATUS_BEGIN:
				status = "BEGIN"
			case binlog.STATUS_COMMIT:
				status = "COMMIT"
			case binlog.STATUS_ROLLBACK:
				status = "ROLLBACK"
			}
			if t.CompressionType == "" {
				t.CompressionType = "NONE"
			}
			var err error
			switch showBasic {
			case 0, 1:
				_, err = f.WriteString(fmt.Sprintf("\n\nGTID:%s\nTime:%v\nSQLOrigin:%v\n",
					tx.GTID, tx.Time, strings.ReplaceAll(t.Sql, "\n", " ")))
			default:
				// BUG FIX: label was "Time%s" (missing colon).
				_, err = f.WriteString(fmt.Sprintf("\n\nNumber:%v\nGTID:%s\nTime:%s\nTotalSize:%v TotalCost:%v\n"+
					"StartPos:%v EndPos:%v RowsCount:%v\nStatus:%v\n"+
					"TxStartPos:%v TxEndPos:%v TxRowsCount:%v CompressionType:%s\n"+
					"Schema:%v Type:%s TxTime:%v\nSQLOrigin:%v\n",
					foundCount, tx.GTID, tx.Time, tx.Size, tx.TxEndTime-tx.TxStartTime,
					tx.StartPos, tx.EndPos, tx.RowsCount, status,
					t.StartPos, t.EndPos, t.RowCount, t.CompressionType,
					t.Db+"."+t.Table, t.SqlType, t.Time, strings.ReplaceAll(t.Sql, "\n", " ")))
			}
			if err != nil {
				return err
			}
			return nil
		}
		if !outasRow {
			// BUG FIX: this used to `return addRow()`, which exited after the
			// FIRST detail and silently dropped the rest of tx.Txs.
			if err := addRow(); err != nil {
				return err
			}
			continue
		}
		sqls := generateOfflineRowSql(t)
		for _, sql := range sqls {
			if err := addRow(); err != nil {
				return err
			}
			if sql.SQL == "" {
				continue
			}
			f.WriteString("SQL:" + sql.SQL + "\n")
		}
	}
	return nil
}

// SQLInfo is one reconstructed pseudo-SQL statement for a row change.
// For unsupported statement types only Origin is populated.
type SQLInfo struct {
	ID     int
	SQL    string
	Origin [][]interface{}
}

// literalValue renders a single column value as a SQL literal fragment:
// numbers verbatim, strings quoted with '' doubling, []byte as a quoted
// string when valid UTF-8/GBK text or as a hex literal otherwise, and nil
// as NULL.
func literalValue(row interface{}) string {
	switch v := row.(type) {
	case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32:
		return fmt.Sprintf("%v", v)
	case string:
		return fmt.Sprintf("'%v'", strings.ReplaceAll(v, "'", "''"))
	case []byte:
		if startext.IsUtf8(v) {
			return fmt.Sprintf("'%v'", strings.ReplaceAll(string(v), "'", "''"))
		}
		if startext.IsGBK(v) {
			data, _ := startext.GBK2UTF8(v)
			return fmt.Sprintf("'%v'", strings.ReplaceAll(string(data), "'", "''"))
		}
		return fmt.Sprintf("X'%X'", v)
	case nil:
		return "NULL"
	default:
		return fmt.Sprintf("%v", v)
	}
}

// generateOfflineRowSql reconstructs pseudo-SQL statements from the decoded
// row images of a transaction detail. Columns are addressed positionally as
// @0, @1, ... because offline parsing has no table definition. For updates,
// row images arrive in (before, after) pairs: the even-indexed image becomes
// the WHERE clause, the odd-indexed one the SET clause.
func generateOfflineRowSql(t binlog.TxDetail) []SQLInfo {
	var sqlList = make([]SQLInfo, 0, len(t.Rows))
	switch t.SqlType {
	case "insert":
		for idx, rows := range t.Rows {
			vals := make([]string, 0, len(rows))
			for _, row := range rows {
				vals = append(vals, literalValue(row))
			}
			sqlList = append(sqlList, SQLInfo{
				ID:  idx + 1,
				SQL: fmt.Sprintf(`INSERT INTO %s.%s VALUES(%v)`, t.Db, t.Table, strings.Join(vals, ", ")),
			})
		}
	case "update":
		var where string
		for idxc, rows := range t.Rows {
			spec := ", "
			if idxc%2 == 0 {
				spec = " AND "
			}
			parts := make([]string, 0, len(rows))
			for idxf, row := range rows {
				parts = append(parts, fmt.Sprintf("@%d=%s", idxf, literalValue(row)))
			}
			clause := strings.Join(parts, spec)
			if idxc%2 == 0 {
				where = clause
				continue
			}
			sqlList = append(sqlList, SQLInfo{
				ID:  (idxc + 1) / 2,
				SQL: fmt.Sprintf("UPDATE %s.%s SET (%v) WHERE %v", t.Db, t.Table, clause, where),
			})
		}
	case "delete":
		for idx, rows := range t.Rows {
			conds := make([]string, 0, len(rows))
			for idxf, row := range rows {
				conds = append(conds, fmt.Sprintf("@%d=%s", idxf, literalValue(row)))
			}
			sqlList = append(sqlList, SQLInfo{
				ID:  idx + 1,
				SQL: fmt.Sprintf("DELETE FROM %s.%s WHERE %v", t.Db, t.Table, strings.Join(conds, " AND ")),
			})
		}
	default:
		sqlList = append(sqlList, SQLInfo{Origin: t.Rows})
	}
	return sqlList
}