You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

642 lines
18 KiB
Go

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package main
import (
"b612.me/mysql/binlog"
"b612.me/mysql/gtid"
"b612.me/starlog"
"b612.me/staros"
"b612.me/startext"
"errors"
"fmt"
"github.com/spf13/cobra"
"github.com/tealeg/xlsx"
"os"
"path/filepath"
"regexp"
"strings"
"time"
)
// Command-line options of the binlog parser. They are package-level so that
// the flag registration in init and the parsing/output functions share them.
var (
// bigThan is the --big threshold in bytes; copied into BinlogFilter.BigThan.
bigThan int
// smallThan is the --small threshold in bytes; copied into BinlogFilter.SmallThan.
smallThan int
// startPos is the --start-pos value; copied into BinlogFilter.StartPos.
startPos int
// endPos is the --end-pos value; copied into BinlogFilter.EndPos.
endPos int
// startTime is the --starttime unix timestamp (0 means unset); converted
// with time.Unix into BinlogFilter.StartDate.
startTime int64
// endTime is the --endtime unix timestamp (0 means unset); converted with
// time.Unix into BinlogFilter.EndDate.
endTime int64
// includeDTs is the --include-tables whitelist ("schema.table" entries).
includeDTs []string
// excludeDTs is the --exclude-tables blacklist ("schema.table" entries).
excludeDTs []string
// includeGtid is the --include-gtid filter string.
includeGtid string
// excludeGtid is the --exclude-gtid filter string.
excludeGtid string
// outPath is the --savepath output file; an empty value disables file output.
outPath string
// vbo is the --verbose switch; when set, transactions are printed to stdout
// instead of a progress counter.
vbo bool
// skipquery is the --skip-query switch (default true); statements whose SQL
// is BEGIN or COMMIT are left out of the row output.
skipquery bool
// pos is passed as the position argument of binlog.ParseBinlogWithFilter.
pos int64
// prefix is the --prefix binlog file-name prefix (default "mysql-bin") used
// to pick files when a folder is given.
prefix string
// outasRow is the --row-mode switch; output one entry per generated row SQL.
outasRow bool
// counts is the --count limit of transactions; 0 means no limit.
counts int
// fourceParse is the --force-parse-all switch (the identifier is a
// misspelling of "force"); when false, without verbose mode and without an
// output file, the filter only collects GTIDs.
fourceParse bool
// timeBigThan is the --cost-after threshold in seconds; 0 disables it.
timeBigThan int
// timeSmallThan is the --cost-less threshold in seconds; 0 disables it.
timeSmallThan int
// outType is the --output-type value, "xlsx" (default) or "txt".
outType string
// showBasic is the --show-level value (0-255, default 255); 0 and 1 select
// reduced field sets in the printed and saved output.
showBasic uint8
)
// init registers every command-line flag on cmd and binds it to the matching
// package-level option variable.
func init() {
	flags := cmd.Flags()
	flags.IntVar(&timeBigThan, "cost-after", 0, "show transactions cost big than x seconds")
	flags.IntVar(&timeSmallThan, "cost-less", 0, "show transactions cost below x seconds")
	flags.IntVarP(&counts, "count", "c", 0, "counts of binlog transaction")
	// --pos must feed the int64 pos offset handed to the parser. Binding it to
	// endPos would leave pos permanently zero and register endPos twice
	// (endPos belongs to --end-pos below).
	flags.Int64VarP(&pos, "pos", "P", 0, "skipPos of binlog")
	flags.IntVarP(&startPos, "start-pos", "S", 0, "startPos of binlog")
	flags.IntVarP(&endPos, "end-pos", "F", 0, "endPos of binlog")
	flags.StringVarP(&includeGtid, "include-gtid", "i", "", "include gtid")
	flags.StringVarP(&excludeGtid, "exclude-gtid", "e", "", "exclude gtid")
	flags.StringVarP(&outPath, "savepath", "o", "", "output excel path")
	flags.Int64Var(&startTime, "starttime", 0, "start unix timestamp")
	flags.Int64Var(&endTime, "endtime", 0, "end unix timestamp")
	flags.BoolVarP(&vbo, "verbose", "v", false, "show the detail verbose")
	flags.BoolVarP(&skipquery, "skip-query", "s", true, "skip query write to xlsx like BEGIN COMMIT")
	flags.IntVar(&bigThan, "big", 0, "show tx big than x bytes")
	flags.IntVar(&smallThan, "small", 0, "show tx small than x bytes")
	flags.StringVar(&prefix, "prefix", "mysql-bin", "mysql binlog prefix")
	flags.BoolVarP(&outasRow, "row-mode", "r", false, "output as row")
	flags.StringSliceVarP(&includeDTs, "include-tables", "I", []string{}, "whitelist schemas and tables,eg: schema.table")
	flags.StringSliceVarP(&excludeDTs, "exclude-tables", "E", []string{}, "blacklist schemas and tables,eg: schema.table")
	flags.BoolVar(&fourceParse, "force-parse-all", false, "force parse all events in binlog")
	flags.StringVarP(&outType, "output-type", "t", "xlsx", "output file type,now support xlsx,txt")
	flags.Uint8Var(&showBasic, "show-level", 255, "show level,value range 0-255")
}
// cmd is the root command. It expects at least one binlog file or folder as
// a positional argument, hands all arguments to ParseBinlog and finally
// prints the elapsed wall-clock time.
var cmd = &cobra.Command{
	Use:     "",
	Short:   "binlog parser by i@b612.me",
	Long:    "binlog解析工具支持离线binlog解析",
	Version: "0.1.0",
	Run: func(cmd *cobra.Command, args []string) {
		// Sample GTID kept around for manual testing:
		// cc05c88c-0683-11ee-8149-fa163e8c148c:44969805
		if len(args) < 1 {
			starlog.Warningln("Please enter a binlog path or folder")
			cmd.Help()
			return
		}
		began := time.Now()
		ParseBinlog(args)
		elapsed := time.Since(began).Seconds()
		// A leading newline separates the timing from the summary above it.
		fmt.Printf("\nTime Cost:%.2fs", elapsed)
	},
}
// main runs the root command and reports failure through the process exit
// status.
func main() {
	if err := cmd.Execute(); err != nil {
		// cobra prints the error itself; only the non-zero exit status has to
		// be supplied here so scripts can detect the failure.
		os.Exit(1)
	}
}
func ParseBinlog(filePath []string) {
var err error
foundCount := 0
var totalGtid *gtid.Gtid
var out Outfile
if outPath != "" {
out, err = InitOutput(outType, outPath)
if err != nil {
starlog.Errorln(err)
return
}
}
getParser := func(fpath string) string {
var sTime, eTime time.Time
if startTime != 0 {
sTime = time.Unix(startTime, 0)
}
if endTime != 0 {
eTime = time.Unix(endTime, 0)
}
onlyGtid := false
if !vbo && outPath == "" && !fourceParse {
onlyGtid = true
}
var filter = binlog.BinlogFilter{
IncludeTables: includeDTs,
ExcludeTables: excludeDTs,
IncludeGtid: includeGtid,
ExcludeGtid: excludeGtid,
StartPos: startPos,
EndPos: endPos,
StartDate: sTime,
EndDate: eTime,
BigThan: bigThan,
SmallThan: smallThan,
OnlyShowGtid: onlyGtid,
}
var cGtid *gtid.Gtid
proc := make(chan string, 1000)
if !vbo {
go func() {
var latest string
var count = uint16(0)
for {
select {
case tmp := <-proc:
if tmp == "end" {
fmt.Println(latest)
return
}
latest = tmp
if count%10 == 0 {
fmt.Print(latest)
}
count++
case <-time.After(time.Millisecond * 200):
fmt.Print(latest)
}
}
}()
}
err = binlog.ParseBinlogWithFilter(fpath, pos, filter, func(tx binlog.Transaction) bool {
if timeBigThan != 0 {
if tx.TxEndTime-tx.TxStartTime < int64(timeBigThan) {
return true
}
}
if timeSmallThan != 0 {
if tx.TxEndTime-tx.TxStartTime > int64(timeSmallThan) {
return true
}
}
foundCount++
if cGtid == nil {
cGtid, _ = gtid.Parse(tx.GTID)
} else {
cGtid.Add(tx.GTID)
}
if totalGtid == nil {
totalGtid, _ = gtid.Parse(tx.GTID)
} else {
totalGtid.Add(tx.GTID)
}
if !vbo {
proc <- fmt.Sprintf("已找到%d个合法GTID\r", foundCount)
} else {
if !outasRow {
switch showBasic {
case 0:
fmt.Printf("GTID:%s\nTime:%s\nSQL:%v\n\n",
tx.GTID, tx.Time, tx.GetSqlOrigin())
default:
fmt.Printf("GTID:%s Time:%s StartPos:%v EndPos:%v RowsCount:%v Size:%v Detail:%+v\n",
tx.GTID, tx.Time, tx.StartPos, tx.EndPos, tx.RowsCount, tx.Size, tx.Txs)
}
} else {
fmt.Printf("-------------------------\nGTID:%s Time:%s StartPos:%v EndPos:%v RowsCount:%v Size:%v\n\n",
tx.GTID, tx.Time, tx.StartPos, tx.EndPos, tx.RowsCount, tx.Size)
for _, t := range tx.Txs {
if skipquery && (strings.ToLower(t.Sql) == "begin" || strings.ToLower(t.Sql) == "commit") {
continue
}
sqls := generateOfflineRowSql(t)
for _, sql := range sqls {
switch showBasic {
case 0:
fmt.Printf("GTID:\t%s\nTime:\t%s\nSQL:\t%+v\n\n",
tx.GTID, t.Time, sql.SQL)
case 1:
fmt.Printf("GTID:\t%s\nTime:\t%s\nSQLOrigin:\t%+v\nSQL:\t%+v\n\n",
tx.GTID, t.Time, t.Sql, sql.SQL)
default:
fmt.Printf("GTID:\t%s\nTime:\t%s\nStartPos:\t%v\nEndPos:\t%v\nCost\t%v\nSchema:%v\nRowsCount:\t%v\nSQLOrigin:\t%v\nSQL:\t%+v\n\n",
tx.GTID, t.Time, t.StartPos, t.EndPos, tx.TxEndTime-tx.TxStartTime, t.Db+"."+t.Table, t.RowCount, t.Sql, sql.SQL)
}
}
}
}
}
if outPath != "" {
AddOutfile(out, tx, foundCount)
}
if counts > 0 && foundCount >= counts {
return false
}
return true
})
if !vbo {
time.Sleep(time.Millisecond * 500)
}
var cGtidStr string
if cGtid != nil {
cGtidStr = cGtid.String()
}
if outPath != "" && out.Type == "xlsx" {
err = FinishOutfile(out, outPath)
if err != nil {
starlog.Errorln(err)
return cGtidStr
}
}
if err != nil {
starlog.Errorln(err)
return cGtidStr
}
return cGtidStr
}
var gtidRes [][]string
for _, fp := range filePath {
if staros.IsFolder(fp) {
files, err := os.ReadDir(fp)
if err != nil {
starlog.Errorln("read folder failed:", err)
return
}
rgp := regexp.MustCompile(`^` + prefix + `\.\d+$`)
for _, v := range files {
if !v.IsDir() && rgp.MatchString(v.Name()) {
gtidRes = append(gtidRes, []string{v.Name(), getParser(filepath.Join(fp, v.Name()))})
}
}
} else {
getParser(fp)
}
}
if outPath != "" {
err := FinishOutfile(out, outPath)
if err != nil {
starlog.Errorln(err)
}
}
fmt.Println("")
if len(gtidRes) != 0 {
for _, v := range gtidRes {
fmt.Printf("%s:%s\n", v[0], v[1])
}
}
fmt.Println("")
allGtid := ""
if totalGtid != nil {
allGtid = totalGtid.String()
}
fmt.Printf("Total Gtid:%v\nTotal SQL Number:%v\n", allGtid, foundCount)
}
// prepareXlsx creates a new workbook with a single sheet, writes the header
// row that matches the current showBasic level, sets the column widths and
// saves the file once to the package-level outPath so that an unwritable
// path is detected before parsing starts.
//
// It returns the Outfile wrapping the workbook, or a zero Outfile and the
// error. The error is not logged here; reporting it is left to the caller.
func prepareXlsx() (Outfile, error) {
	owrt := xlsx.NewFile()
	res, err := owrt.AddSheet("结果")
	if err != nil {
		return Outfile{}, err
	}

	headers := []string{"序号", "GTID", "时间"}
	detailed := showBasic > 1
	if detailed {
		headers = append(headers,
			"时间戳", "StartPos", "EndPos", "事务大小", "影响行数", "事务状态", "事务耗时", "压缩类型",
			"单语句StartPos", "单语句EndPos", "单语句时间", "单语句影响行数", "单语句影响库", "单语句影响表")
	}
	// The SQL text column is the second of the trailing columns, so its
	// index depends on how many detail columns precede it.
	sqlCol := len(headers) + 1
	headers = append(headers, "SQL类型", "具体SQL", "从属事务编号", "同事务行编号", "行变更内容")

	title := res.AddRow()
	for _, h := range headers {
		title.AddCell().SetValue(h)
	}

	res.SetColWidth(0, 0, 5)
	res.SetColWidth(1, 1, 40)
	if detailed {
		// Narrow numeric columns only exist in the detailed layout.
		res.SetColWidth(3, 6, 6)
		res.SetColWidth(7, 7, 5)
	}
	res.SetColWidth(sqlCol, sqlCol, 40)

	if err := owrt.Save(outPath); err != nil {
		return Outfile{}, err
	}
	return Outfile{
		Type: "xlsx",
		File: owrt,
	}, nil
}
// Outfile describes the destination that parsed transactions are written to.
type Outfile struct {
// Type is the output format; the functions in this file handle "xlsx" and "txt".
Type string
// File is the underlying handle: a *xlsx.File for "xlsx" and a *os.File
// for "txt" (these are the types AddOutfile and FinishOutfile assert).
File interface{}
}
// add2Xlsx appends the statements of tx to the first sheet of owrt.
//
// Statements whose SQL is BEGIN/COMMIT are skipped when skipquery is set. In
// row mode (outasRow) one spreadsheet row is written per generated row SQL,
// otherwise one row per statement carrying the raw row data. foundCount is
// written as the running number of the transaction. The function always
// returns nil.
func add2Xlsx(owrt *xlsx.File, tx binlog.Transaction, foundCount int) error {
	for idx, detail := range tx.Txs {
		if skipquery {
			if lowered := strings.ToLower(detail.Sql); lowered == "begin" || lowered == "commit" {
				continue
			}
		}

		// newRow appends a row pre-filled with every column that is shared by
		// both output modes; the caller adds the three trailing cells.
		newRow := func() *xlsx.Row {
			row := owrt.Sheets[0].AddRow()
			put := func(v interface{}) {
				row.AddCell().SetValue(v)
			}
			put(foundCount)
			put(tx.GTID)
			put(tx.Time.String())
			if showBasic > 1 {
				put(tx.Timestamp)
				put(tx.StartPos)
				put(tx.EndPos)
				put(tx.Size)
				put(tx.RowsCount)

				var state string
				switch tx.Status {
				case binlog.STATUS_BEGIN:
					state = "BEGIN"
				case binlog.STATUS_COMMIT:
					state = "COMMIT"
				case binlog.STATUS_ROLLBACK:
					state = "ROLLBACK"
				default:
					state = "PREPARE"
				}
				put(state)
				put(tx.TxEndTime - tx.TxStartTime)

				if detail.CompressionType != "" {
					put(detail.CompressionType)
				} else {
					put("NONE")
				}
				put(detail.StartPos)
				put(detail.EndPos)
				put(detail.Time.String())
				put(detail.RowCount)
				put(detail.Db)
				put(detail.Table)
			}
			put(detail.SqlType)
			put(detail.Sql)
			return row
		}

		stmtNo := idx + 1
		if outasRow {
			for _, stmt := range generateOfflineRowSql(detail) {
				row := newRow()
				row.AddCell().SetValue(stmtNo)
				row.AddCell().SetValue(stmt.ID)
				row.AddCell().SetValue(stmt.SQL)
			}
			continue
		}

		row := newRow()
		row.AddCell().SetValue(stmtNo)
		row.AddCell().SetValue(1)
		row.AddCell().SetValue(detail.Rows)
	}
	return nil
}
// InitOutput creates the output destination for the given format.
//
// types selects the format: "xlsx" delegates to prepareXlsx, which takes no
// arguments, and "txt" delegates to prepareTxtFile(outPath). Any other value
// yields a zero Outfile and an error naming the unsupported format.
func InitOutput(types string, outPath string) (Outfile, error) {
	if types == "xlsx" {
		return prepareXlsx()
	}
	if types == "txt" {
		return prepareTxtFile(outPath)
	}
	return Outfile{}, errors.New("Not Support This Outfile Format:" + types)
}
// AddOutfile appends tx to the output described by of, dispatching on
// of.Type to add2Xlsx or addTxtFile. foundCount is forwarded as the running
// number of the transaction.
//
// An error is returned when of.File does not hold the handle type expected
// for of.Type, when of.Type is unknown, or when the writer itself fails.
func AddOutfile(of Outfile, tx binlog.Transaction, foundCount int) error {
	if of.Type == "xlsx" {
		if book, isBook := of.File.(*xlsx.File); isBook {
			return add2Xlsx(book, tx, foundCount)
		}
		return errors.New("failed!,not a valid xlsx pointer")
	}
	if of.Type == "txt" {
		if handle, isFile := of.File.(*os.File); isFile {
			return addTxtFile(handle, tx, foundCount)
		}
		return errors.New("failed!,not a valid txtfile pointer")
	}
	return errors.New("Not Support This Outfile Format:" + of.Type)
}
// FinishOutfile persists the output described by of.
//
// For "xlsx" the workbook is saved to path. For "txt" the file is synced and
// closed; path is not used. The file is closed even when the sync fails, and
// the sync error takes precedence in the return value because it is the one
// that signals possibly lost data.
//
// An error is also returned when of.File does not hold the handle type
// expected for of.Type or when of.Type is unknown.
func FinishOutfile(of Outfile, path string) error {
	switch of.Type {
	case "xlsx":
		owrt, ok := of.File.(*xlsx.File)
		if !ok {
			return errors.New("failed!,not a valid xlsx pointer")
		}
		return owrt.Save(path)
	case "txt":
		f, ok := of.File.(*os.File)
		if !ok {
			return errors.New("failed!,not a valid txtfile pointer")
		}
		syncErr := f.Sync()
		closeErr := f.Close()
		if syncErr != nil {
			return syncErr
		}
		return closeErr
	default:
		return errors.New("Not Support This Outfile Format:" + of.Type)
	}
}
// prepareTxtFile opens path for reading and writing, creating it with mode
// 0644 if needed and truncating any existing content, and wraps the handle
// in an Outfile of type "txt". On failure it returns a zero Outfile and the
// open error.
func prepareTxtFile(path string) (Outfile, error) {
	const openFlags = os.O_TRUNC | os.O_RDWR | os.O_CREATE
	handle, openErr := os.OpenFile(path, openFlags, 0644)
	if openErr != nil {
		return Outfile{}, openErr
	}
	result := Outfile{Type: "txt", File: handle}
	return result, nil
}
// addTxtFile appends the statements of tx to the text file f.
//
// Statements whose SQL is BEGIN/COMMIT are skipped when skipquery is set.
// For every remaining statement a header block is written (short form for
// show levels 0 and 1, full form otherwise). In row mode (outasRow) the
// header is repeated for each generated row SQL and followed by an "SQL:"
// line when that SQL is non-empty. foundCount is printed as the running
// number of the transaction in the full form.
//
// The first write error aborts the function and is returned.
func addTxtFile(f *os.File, tx binlog.Transaction, foundCount int) error {
	status := "PREPARE"
	switch tx.Status {
	case binlog.STATUS_BEGIN:
		status = "BEGIN"
	case binlog.STATUS_COMMIT:
		status = "COMMIT"
	case binlog.STATUS_ROLLBACK:
		status = "ROLLBACK"
	}

	for _, t := range tx.Txs {
		if skipquery && (strings.EqualFold(t.Sql, "begin") || strings.EqualFold(t.Sql, "commit")) {
			continue
		}
		compression := t.CompressionType
		if compression == "" {
			compression = "NONE"
		}
		// Keep each statement on one line so the file stays greppable.
		sqlOrigin := strings.ReplaceAll(t.Sql, "\n", " ")

		var header string
		switch showBasic {
		case 0, 1:
			header = fmt.Sprintf("\n\nGTID:%s\nTime:%v\nSQLOrigin:%v\n",
				tx.GTID, tx.Time, sqlOrigin)
		default:
			header = fmt.Sprintf("\n\nNumber:%v\nGTID:%s\nTime:%s\nTotalSize:%v TotalCost:%v\n"+
				"StartPos:%v EndPos:%v RowsCount:%v\nStatus:%v\n"+
				"TxStartPos:%v TxEndPos:%v TxRowsCount:%v CompressionType:%s\n"+
				"Schema:%v Type:%s TxTime:%v\nSQLOrigin:%v\n",
				foundCount, tx.GTID, tx.Time, tx.Size, tx.TxEndTime-tx.TxStartTime,
				tx.StartPos, tx.EndPos, tx.RowsCount, status,
				t.StartPos, t.EndPos, t.RowCount, compression,
				t.Db+"."+t.Table, t.SqlType, t.Time, sqlOrigin)
		}

		if !outasRow {
			if _, err := f.WriteString(header); err != nil {
				return err
			}
			// Move on to the next statement; returning here would drop the
			// rest of the transaction.
			continue
		}
		for _, sql := range generateOfflineRowSql(t) {
			if _, err := f.WriteString(header); err != nil {
				return err
			}
			if sql.SQL == "" {
				continue
			}
			if _, err := f.WriteString("SQL:" + sql.SQL + "\n"); err != nil {
				return err
			}
		}
	}
	return nil
}
// SQLInfo is one entry produced by generateOfflineRowSql.
type SQLInfo struct {
// ID is the 1-based number of the row (for updates: of the before/after
// row pair) within its statement; it stays 0 when only Origin is set.
ID int
// SQL is the generated INSERT/UPDATE/DELETE text; empty when only Origin is set.
SQL string
// Origin carries the untouched row data for statement types that are not
// insert, update or delete.
Origin [][]interface{}
}
// generateOfflineRowSql rebuilds one SQL-like statement per changed row of t.
//
// Column names are not available here, so columns are referred to by their
// position as @0, @1, ...:
//   - "insert": one INSERT ... VALUES(...) per row.
//   - "update": rows come in pairs; the even-indexed row becomes the WHERE
//     part and the following odd-indexed row the SET part. A trailing
//     unpaired row produces nothing.
//   - "delete": one DELETE ... WHERE ... per row.
//   - anything else: a single SQLInfo carrying the raw rows in Origin.
func generateOfflineRowSql(t binlog.TxDetail) []SQLInfo {
	sqlList := make([]SQLInfo, 0, len(t.Rows))
	switch t.SqlType {
	case "insert":
		for idx, row := range t.Rows {
			values := make([]string, 0, len(row))
			for _, v := range row {
				values = append(values, sqlLiteral(v))
			}
			sqlList = append(sqlList, SQLInfo{
				ID:  idx + 1,
				SQL: fmt.Sprintf(`INSERT INTO %s.%s VALUES(%v)`, t.Db, t.Table, strings.Join(values, ", ")),
			})
		}
	case "update":
		var where string
		for idx, row := range t.Rows {
			if idx%2 == 0 {
				// Even rows hold the values the WHERE clause is built from.
				where = rowAssignments(row, " AND ")
				continue
			}
			sqlList = append(sqlList, SQLInfo{
				ID:  (idx + 1) / 2,
				SQL: fmt.Sprintf("UPDATE %s.%s SET (%v) WHERE %v", t.Db, t.Table, rowAssignments(row, ", "), where),
			})
		}
	case "delete":
		for idx, row := range t.Rows {
			sqlList = append(sqlList, SQLInfo{
				ID:  idx + 1,
				SQL: fmt.Sprintf("DELETE FROM %s.%s WHERE %v", t.Db, t.Table, rowAssignments(row, " AND ")),
			})
		}
	default:
		sqlList = append(sqlList, SQLInfo{Origin: t.Rows})
	}
	return sqlList
}

// rowAssignments renders row as "@0=<v0><sep>@1=<v1>..." using sqlLiteral for
// the values.
func rowAssignments(row []interface{}, sep string) string {
	parts := make([]string, 0, len(row))
	for idx, v := range row {
		parts = append(parts, fmt.Sprintf("@%d=%s", idx, sqlLiteral(v)))
	}
	return strings.Join(parts, sep)
}

// sqlLiteral renders a single column value as SQL text: nil becomes NULL,
// strings are single-quoted with embedded quotes doubled, byte slices are
// quoted as text when they are UTF-8 or GBK and written as an X'..' hex
// literal otherwise, and every other value uses its default formatting.
func sqlLiteral(v interface{}) string {
	quote := func(s string) string {
		return "'" + strings.ReplaceAll(s, "'", "''") + "'"
	}
	switch val := v.(type) {
	case nil:
		return "NULL"
	case string:
		return quote(val)
	case []byte:
		if startext.IsUtf8(val) {
			return quote(string(val))
		}
		if startext.IsGBK(val) {
			// The input was just recognised as GBK, so the second result
			// of the conversion is deliberately not inspected.
			data, _ := startext.GBK2UTF8(val)
			return quote(string(data))
		}
		return fmt.Sprintf("X'%X'", val)
	default:
		return fmt.Sprintf("%v", v)
	}
}