From c196beb9aa9791d2f15c95f12729b605d3cf0622 Mon Sep 17 00:00:00 2001 From: starainrt Date: Fri, 30 Jun 2023 18:30:29 +0800 Subject: [PATCH] update --- go.mod | 2 +- go.sum | 4 +- main.go | 437 +++++++++++---------------- vendor/b612.me/mysql/binlog/parse.go | 49 +-- vendor/modules.txt | 2 +- 5 files changed, 217 insertions(+), 277 deletions(-) diff --git a/go.mod b/go.mod index 96ef45f..fb8d2d3 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module binlog go 1.20 require ( - b612.me/mysql/binlog v0.0.0-20230630053741-7c0272cc62e4 + b612.me/mysql/binlog v0.0.0-20230630095545-8caa467be7e9 b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044 b612.me/starlog v1.3.2 b612.me/staros v1.1.6 diff --git a/go.sum b/go.sum index a0058ee..f042f96 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -b612.me/mysql/binlog v0.0.0-20230630053741-7c0272cc62e4 h1:U5z6K7FTtGwAhDJn3TFqRGkuXYd623osfsus0AGwAPg= -b612.me/mysql/binlog v0.0.0-20230630053741-7c0272cc62e4/go.mod h1:j9oDZUBx7+GK9X1b1bqO9SHddHvDRSGfwbIISxONqfA= +b612.me/mysql/binlog v0.0.0-20230630095545-8caa467be7e9 h1:6U/hChR8T9L9v+0olHpJRlx4iDiFj1e5psWVjP668jo= +b612.me/mysql/binlog v0.0.0-20230630095545-8caa467be7e9/go.mod h1:j9oDZUBx7+GK9X1b1bqO9SHddHvDRSGfwbIISxONqfA= b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044 h1:sJrYUl9Sb1tij6ROahFE3r/36Oag3kI92OXDjOKsdwA= b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044/go.mod h1:3EHq1jvlm3a92UxagMjfqSSVYb3KW2H3aT5nd4SiD94= b612.me/notify v1.2.4 h1:cjP80V9FeM+ib1DztZdykusakcbjNI4dAB1pXE8U6bo= diff --git a/main.go b/main.go index 8b7371b..9a6ccd3 100644 --- a/main.go +++ b/main.go @@ -31,9 +31,11 @@ var ( pos int64 prefix string outasRow bool + counts int ) func init() { + cmd.Flags().IntVarP(&counts, "count", "c", 0, "counts of binlog transaction") cmd.Flags().IntVarP(&endPos, "pos", "P", 0, "skipPos of binlog") cmd.Flags().IntVarP(&startPos, "start-pos", "S", 0, "startPos of binlog") cmd.Flags().IntVarP(&endPos, "end-pos", "E", 0, "endPos of binlog") @@ -77,41 +79,11 @@ func ParseBinlog() { foundCount := 0 var totalGtid *gtid.Gtid var owrt *xlsx.File - var res *xlsx.Sheet if outPath != "" { - owrt = xlsx.NewFile() - res, err = owrt.AddSheet("结果") + owrt, err = prepareXlsx() if err != nil { - starlog.Errorln(err) return } - title := res.AddRow() - title.AddCell().SetValue("序号") - title.AddCell().SetValue("GTID") - title.AddCell().SetValue("时间") - title.AddCell().SetValue("时间戳") - title.AddCell().SetValue("StartPos") - title.AddCell().SetValue("EndPos") - title.AddCell().SetValue("事务大小") - title.AddCell().SetValue("影响行数") - title.AddCell().SetValue("压缩类型") - title.AddCell().SetValue("单语句StartPos") - title.AddCell().SetValue("单语句EndPos") - title.AddCell().SetValue("单语句时间") - title.AddCell().SetValue("单语句影响行数") - title.AddCell().SetValue("单语句影响库") - title.AddCell().SetValue("单语句影响表") - title.AddCell().SetValue("SQL类型") - title.AddCell().SetValue("具体SQL") - title.AddCell().SetValue("从属事务编号") - title.AddCell().SetValue("同事务行编号") - title.AddCell().SetValue("行变更内容") - res.SetColWidth(0, 0, 5) - res.SetColWidth(1, 1, 40) - res.SetColWidth(3, 6, 6) - res.SetColWidth(7, 7, 5) - res.SetColWidth(16, 16, 40) - owrt.Save(outPath) } getParser := func(fpath string) string { var sTime, eTime time.Time @@ -160,7 +132,7 @@ func ParseBinlog() { } }() } - err = binlog.ParseBinlogWithFilter(fpath, pos, filter, func(tx binlog.Transaction) { + err = binlog.ParseBinlogWithFilter(fpath, pos, filter, func(tx binlog.Transaction) bool { foundCount++ if cGtid == nil { cGtid, _ = gtid.Parse(tx.GTID) @@ -185,235 +157,19 @@ func ParseBinlog() { if skipquery && (strings.ToLower(t.Sql) == "begin" || strings.ToLower(t.Sql) == "commit") { continue } - switch t.SqlType { - case "insert": - for _, rows := range t.Rows { - setence := "" - for _, row := range rows { - switch row.(type) { - case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: - setence += fmt.Sprintf("%v, ", row) - case string: - setence += fmt.Sprintf("'%v', ", strings.ReplaceAll(row.(string), "'", "''")) - case []byte: - setence += fmt.Sprintf("%v, ", row) - case nil: - setence += fmt.Sprintf("%v, ", "NULL") - default: - setence += fmt.Sprintf("%v, ", row) - } - } - if setence != "" && len(setence) > 2 { - setence = setence[:len(setence)-2] - } - sql := fmt.Sprintf(`INSERT INTO %s.%s VALUES (%v)`, t.Db, t.Table, setence) - fmt.Printf("GTID:%s\nTime:%s\nStartPos:%v\nEndPos:%v\nRowsCount:%v\nSQLOrigin:%v\nSQL:%+v\n\n", - tx.GTID, t.Time, t.StartPos, t.EndPos, t.RowCount, t.Sql, sql) - - } - case "update": - var sql string - var where string - for idxc, rows := range t.Rows { - setence := "" - spec := ", " - if idxc%2 == 0 { - spec = " AND " - } - for idxf, row := range rows { - switch row.(type) { - case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: - setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) - case string: - setence += fmt.Sprintf("$%d='%v'%s", idxf, strings.ReplaceAll(row.(string), "'", "''"), spec) - case []byte: - setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) - case nil: - setence += fmt.Sprintf("$%d=%v%s", idxf, "NULL", spec) - default: - setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) - } - - } - if setence != "" && len(setence) > 2 { - setence = setence[:len(setence)-len(spec)] - } - if idxc%2 == 0 { - where = setence - continue - } - sql = fmt.Sprintf("UPDATE %s.%s SET (%v) WHERE %v", t.Db, t.Table, setence, where) - fmt.Printf("GTID:%s\nTime:%s\nStartPos:%v\nEndPos:%v\nRowsCount:%v\nSQLOrigin:%v\nSQL:%+v\n\n", - tx.GTID, t.Time, t.StartPos, t.EndPos, t.RowCount, t.Sql, sql) - } - case "delete": - for _, rows := range t.Rows { - setence := "" - spec := " AND " - for idxf, row := range rows { - switch row.(type) { - case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: - setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) - case string: - setence += fmt.Sprintf("$%d='%v'%s", idxf, strings.ReplaceAll(row.(string), "'", "''"), spec) - case []byte: - setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) - case nil: - setence += fmt.Sprintf("$%d=%v%s", idxf, "NULL", spec) - default: - setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) - } - } - if setence != "" && len(setence) > 2 { - setence = setence[:len(setence)-len(spec)] - } - sql := fmt.Sprintf("DELETE FROM %s.%s WHERE %v", t.Db, t.Table, setence) - fmt.Printf("GTID:%s\nTime:%s\nStartPos:%v\nEndPos:%v\nRowsCount:%v\nSQLOrigin:%v\nSQL:%+v\n\n", - tx.GTID, t.Time, t.StartPos, t.EndPos, t.RowCount, t.Sql, sql) - } - default: - fmt.Printf("GTID:%s\nTime:%s\nStartPos:%v\nEndPos:%v\nRowsCount:%v\nSQLOrigin:%v\nSQL:%+v\n\n", - tx.GTID, t.Time, t.StartPos, t.EndPos, t.RowCount, t.Sql, t.Rows) - } + _, sql := generateRowSql(t) + fmt.Printf("GTID:\t%s\nTime:\t%s\nStartPos:\t%v\nEndPos:\t%v\nRowsCount:\t%v\nSQLOrigin:\t%v\nSQL:\t%+v\n\n", + tx.GTID, t.Time, t.StartPos, t.EndPos, t.RowCount, t.Sql, sql) } } } if outPath != "" { - for k, t := range tx.Txs { - if skipquery && (strings.ToLower(t.Sql) == "begin" || strings.ToLower(t.Sql) == "commit") { - continue - } - addRow := func() *xlsx.Row { - r := res.AddRow() - r.AddCell().SetValue(foundCount) - r.AddCell().SetValue(tx.GTID) - r.AddCell().SetValue(tx.Time.String()) - r.AddCell().SetValue(tx.Timestamp) - r.AddCell().SetValue(tx.StartPos) - r.AddCell().SetValue(tx.EndPos) - r.AddCell().SetValue(tx.Size) - r.AddCell().SetValue(tx.RowsCount) - if t.CompressionType == "" { - r.AddCell().SetValue("NONE") - } else { - r.AddCell().SetValue(t.CompressionType) - } - r.AddCell().SetValue(t.StartPos) - r.AddCell().SetValue(t.EndPos) - r.AddCell().SetValue(t.Time.String()) - r.AddCell().SetValue(t.RowCount) - r.AddCell().SetValue(t.Db) - r.AddCell().SetValue(t.Table) - r.AddCell().SetValue(t.SqlType) - r.AddCell().SetValue(t.Sql) - return r - } - if !outasRow { - r := addRow() - r.AddCell().SetValue(k + 1) - r.AddCell().SetValue(1) - r.AddCell().SetValue(t.Rows) - continue - } - switch t.SqlType { - case "insert": - for idx, rows := range t.Rows { - setence := "" - for _, row := range rows { - switch row.(type) { - case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: - setence += fmt.Sprintf("%v, ", row) - case string: - setence += fmt.Sprintf("'%v', ", strings.ReplaceAll(row.(string), "'", "''")) - case []byte: - setence += fmt.Sprintf("%v, ", row) - case nil: - setence += fmt.Sprintf("%v, ", "NULL") - default: - setence += fmt.Sprintf("%v, ", row) - } - } - if setence != "" && len(setence) > 2 { - setence = setence[:len(setence)-2] - } - sql := fmt.Sprintf(`INSERT INTO %s.%s VALUES(%v)`, t.Db, t.Table, setence) - r := addRow() - r.AddCell().SetValue(k + 1) - r.AddCell().SetValue(idx + 1) - r.AddCell().SetValue(sql) - } - case "update": - var sql string - var where string - for idxc, rows := range t.Rows { - setence := "" - spec := ", " - if idxc%2 == 0 { - spec = " AND " - } - for idxf, row := range rows { - switch row.(type) { - case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: - setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) - case string: - setence += fmt.Sprintf("$%d='%v'%s", idxf, strings.ReplaceAll(row.(string), "'", "''"), spec) - case []byte: - setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) - case nil: - setence += fmt.Sprintf("$%d=%v%s", idxf, "NULL", spec) - default: - setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) - } - } - if setence != "" && len(setence) > 2 { - setence = setence[:len(setence)-len(spec)] - } - if idxc%2 == 0 { - where = setence - continue - } - sql = fmt.Sprintf("UPDATE %s.%s SET (%v) WHERE %v", t.Db, t.Table, setence, where) - r := addRow() - r.AddCell().SetValue(k + 1) - r.AddCell().SetValue((idxc + 1) / 2) - r.AddCell().SetValue(sql) - } - case "delete": - for idx, rows := range t.Rows { - setence := "" - spec := " AND " - for idxf, row := range rows { - switch row.(type) { - case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: - setence += fmt.Sprintf("$%d=%v%s", idxf, row) - case string: - setence += fmt.Sprintf("$%d='%v'%s", idxf, strings.ReplaceAll(row.(string), "'", "''")) - case []byte: - setence += fmt.Sprintf("$%d=%v%s", idxf, row) - case nil: - setence += fmt.Sprintf("$%d=%v%s", idxf, "NULL") - default: - setence += fmt.Sprintf("$%d=%v%s", idxf, row) - } - } - if setence != "" && len(setence) > 2 { - setence = setence[:len(setence)-len(spec)] - } - sql := fmt.Sprintf("DELETE FROM %s.%s WHERE %v", t.Db, t.Table, setence) - r := addRow() - r.AddCell().SetValue(k + 1) - r.AddCell().SetValue(idx + 1) - r.AddCell().SetValue(sql) - - } - default: - r := addRow() - r.AddCell().SetValue(k + 1) - r.AddCell().SetValue(1) - r.AddCell().SetValue(t.Rows) - } - } + add2Xlsx(owrt, tx, foundCount) } + if foundCount >= counts { + return false + } + return true }) if !vbo { time.Sleep(time.Millisecond * 500) @@ -469,3 +225,172 @@ func ParseBinlog() { } fmt.Printf("Total Gtid:%v\nTotal SQL Number:%v\n", allGtid, foundCount) } + +func prepareXlsx() (*xlsx.File, error) { + owrt := xlsx.NewFile() + res, err := owrt.AddSheet("结果") + if err != nil { + starlog.Errorln(err) + return owrt, err + } + title := res.AddRow() + title.AddCell().SetValue("序号") + title.AddCell().SetValue("GTID") + title.AddCell().SetValue("时间") + title.AddCell().SetValue("时间戳") + title.AddCell().SetValue("StartPos") + title.AddCell().SetValue("EndPos") + title.AddCell().SetValue("事务大小") + title.AddCell().SetValue("影响行数") + title.AddCell().SetValue("压缩类型") + title.AddCell().SetValue("单语句StartPos") + title.AddCell().SetValue("单语句EndPos") + title.AddCell().SetValue("单语句时间") + title.AddCell().SetValue("单语句影响行数") + title.AddCell().SetValue("单语句影响库") + title.AddCell().SetValue("单语句影响表") + title.AddCell().SetValue("SQL类型") + title.AddCell().SetValue("具体SQL") + title.AddCell().SetValue("从属事务编号") + title.AddCell().SetValue("同事务行编号") + title.AddCell().SetValue("行变更内容") + res.SetColWidth(0, 0, 5) + res.SetColWidth(1, 1, 40) + res.SetColWidth(3, 6, 6) + res.SetColWidth(7, 7, 5) + res.SetColWidth(16, 16, 40) + return owrt, owrt.Save(outPath) +} + +func add2Xlsx(owrt *xlsx.File, tx binlog.Transaction, foundCount int) { + for k, t := range tx.Txs { + if skipquery && (strings.ToLower(t.Sql) == "begin" || strings.ToLower(t.Sql) == "commit") { + continue + } + addRow := func() *xlsx.Row { + r := owrt.Sheets[0].AddRow() + r.AddCell().SetValue(foundCount) + r.AddCell().SetValue(tx.GTID) + r.AddCell().SetValue(tx.Time.String()) + r.AddCell().SetValue(tx.Timestamp) + r.AddCell().SetValue(tx.StartPos) + r.AddCell().SetValue(tx.EndPos) + r.AddCell().SetValue(tx.Size) + r.AddCell().SetValue(tx.RowsCount) + if t.CompressionType == "" { + r.AddCell().SetValue("NONE") + } else { + r.AddCell().SetValue(t.CompressionType) + } + r.AddCell().SetValue(t.StartPos) + r.AddCell().SetValue(t.EndPos) + r.AddCell().SetValue(t.Time.String()) + r.AddCell().SetValue(t.RowCount) + r.AddCell().SetValue(t.Db) + r.AddCell().SetValue(t.Table) + r.AddCell().SetValue(t.SqlType) + r.AddCell().SetValue(t.Sql) + return r + } + if !outasRow { + r := addRow() + r.AddCell().SetValue(k + 1) + r.AddCell().SetValue(1) + r.AddCell().SetValue(t.Rows) + continue + } + idx, sql := generateRowSql(t) + r := addRow() + r.AddCell().SetValue(k + 1) + r.AddCell().SetValue(idx + 1) + r.AddCell().SetValue(sql) + } +} + +func generateRowSql(t binlog.TxDetail) (int, interface{}) { + switch t.SqlType { + case "insert": + for idx, rows := range t.Rows { + setence := "" + for _, row := range rows { + switch row.(type) { + case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: + setence += fmt.Sprintf("%v, ", row) + case string: + setence += fmt.Sprintf("'%v', ", strings.ReplaceAll(row.(string), "'", "''")) + case []byte: + setence += fmt.Sprintf("%v, ", row) + case nil: + setence += fmt.Sprintf("%v, ", "NULL") + default: + setence += fmt.Sprintf("%v, ", row) + } + } + if setence != "" && len(setence) > 2 { + setence = setence[:len(setence)-2] + } + return idx + 1, fmt.Sprintf(`INSERT INTO %s.%s VALUES(%v)`, t.Db, t.Table, setence) + } + case "update": + var sql string + var where string + for idxc, rows := range t.Rows { + setence := "" + spec := ", " + if idxc%2 == 0 { + spec = " AND " + } + for idxf, row := range rows { + switch row.(type) { + case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: + setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) + case string: + setence += fmt.Sprintf("$%d='%v'%s", idxf, strings.ReplaceAll(row.(string), "'", "''"), spec) + case []byte: + setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) + case nil: + setence += fmt.Sprintf("$%d=%v%s", idxf, "NULL", spec) + default: + setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) + } + } + if setence != "" && len(setence) > 2 { + setence = setence[:len(setence)-len(spec)] + } + if idxc%2 == 0 { + where = setence + continue + } + sql = fmt.Sprintf("UPDATE %s.%s SET (%v) WHERE %v", t.Db, t.Table, setence, where) + return (idxc + 1) / 2, sql + } + case "delete": + for idx, rows := range t.Rows { + setence := "" + spec := " AND " + for idxf, row := range rows { + switch row.(type) { + case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: + setence += fmt.Sprintf("$%d=%v%s", idxf, row) + case string: + setence += fmt.Sprintf("$%d='%v'%s", idxf, strings.ReplaceAll(row.(string), "'", "''")) + case []byte: + setence += fmt.Sprintf("$%d=%v%s", idxf, row) + case nil: + setence += fmt.Sprintf("$%d=%v%s", idxf, "NULL") + default: + setence += fmt.Sprintf("$%d=%v%s", idxf, row) + } + } + if setence != "" && len(setence) > 2 { + setence = setence[:len(setence)-len(spec)] + } + sql := fmt.Sprintf("DELETE FROM %s.%s WHERE %v", t.Db, t.Table, setence) + return idx + 1, sql + + } + default: + return 1, t.Rows + } + return 0, "" +} diff --git a/vendor/b612.me/mysql/binlog/parse.go b/vendor/b612.me/mysql/binlog/parse.go index 90f072e..d4e0e68 100644 --- a/vendor/b612.me/mysql/binlog/parse.go +++ b/vendor/b612.me/mysql/binlog/parse.go @@ -41,10 +41,10 @@ func (t Transaction) GetSqlOrigin() []string { return t.sqlOrigin } -func ParseBinlogFile(path string, fx func(transaction Transaction)) error { +func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error { return parseOneBinlog(path, fx) } -func parseOneBinlog(path string, fx func(Transaction)) error { +func parseOneBinlog(path string, fx func(Transaction) bool) error { if !staros.Exists(path) { return os.ErrNotExist } @@ -73,7 +73,7 @@ func parseOneBinlog(path string, fx func(Transaction)) error { return parseBinlogDetail(f, fx) } -func parseBinlogDetail(r io.Reader, f func(Transaction)) error { +func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error { parse := replication.NewBinlogParser() parse.SetParseTime(false) parse.SetUseDecimal(false) @@ -180,7 +180,9 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error { } tx.Size = tx.EndPos - tx.StartPos if f != nil { - f(tx) + if !f(tx) { + return nil + } } } currentGtid = ev.Data @@ -320,14 +322,15 @@ type BinlogFilter struct { OnlyShowGtid bool } -func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction)) error { - var inGtid, exGtid *gtid.Gtid +func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error { + var subGtid, inGtid, exGtid *gtid.Gtid var err error if filter.IncludeGtid != "" { inGtid, err = gtid.Parse(filter.IncludeGtid) if err != nil { return err } + subGtid = inGtid.Clone() } if filter.ExcludeGtid != "" { exGtid, err = gtid.Parse(filter.ExcludeGtid) @@ -345,33 +348,36 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter var tx Transaction currentGtid := "" - callFn := func(tx Transaction) { + callFn := func(tx Transaction) bool { if fn == nil { - return + return true } if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) { - return + return true } if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) { - return + return true } if filter.StartPos != 0 && filter.StartPos > tx.StartPos { - return + return true } if filter.EndPos != 0 && filter.EndPos < tx.EndPos { - return + return true } if filter.BigThan != 0 && filter.BigThan > tx.Size { - return + return true } if filter.SmallThan != 0 && filter.SmallThan < tx.Size { - return + return true } - fn(tx) + return fn(tx) } for { headBuf := make([]byte, replication.EventHeaderSize) if _, err = io.ReadFull(r, headBuf); err == io.EOF { + if tx.Time.IsZero() { + return nil + } idx := 0 for k, v := range tx.Txs { if v.SqlType != "query" && len(tx.sqlOrigin) > idx { @@ -481,7 +487,16 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter tx.EndPos = startPos - 1 } tx.Size = tx.EndPos - tx.StartPos - callFn(tx) + if !callFn(tx) { + return nil + } + if subGtid != nil { + subGtid.Sub(tx.GTID) + if subGtid.EventCount() == 0 { + return nil + } + } + tx = Transaction{} } currentGtid = ev.Data if inGtid != nil { @@ -530,7 +545,7 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter } } -func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction)) error { +func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error { defer func() { recover() }() diff --git a/vendor/modules.txt b/vendor/modules.txt index a772e21..ee831b4 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,4 +1,4 @@ -# b612.me/mysql/binlog v0.0.0-20230630053741-7c0272cc62e4 +# b612.me/mysql/binlog v0.0.0-20230630095545-8caa467be7e9 ## explicit; go 1.20 b612.me/mysql/binlog # b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044