diff --git a/parse.go b/parse.go index f3e42b1..d4e0e68 100644 --- a/parse.go +++ b/parse.go @@ -41,10 +41,10 @@ func (t Transaction) GetSqlOrigin() []string { return t.sqlOrigin } -func ParseBinlogFile(path string, fx func(transaction Transaction)) error { +func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error { return parseOneBinlog(path, fx) } -func parseOneBinlog(path string, fx func(Transaction)) error { +func parseOneBinlog(path string, fx func(Transaction) bool) error { if !staros.Exists(path) { return os.ErrNotExist } @@ -73,7 +73,7 @@ func parseOneBinlog(path string, fx func(Transaction)) error { return parseBinlogDetail(f, fx) } -func parseBinlogDetail(r io.Reader, f func(Transaction)) error { +func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error { parse := replication.NewBinlogParser() parse.SetParseTime(false) parse.SetUseDecimal(false) @@ -180,7 +180,9 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error { } tx.Size = tx.EndPos - tx.StartPos if f != nil { - f(tx) + if !f(tx) { + return nil + } } } currentGtid = ev.Data @@ -320,7 +322,7 @@ type BinlogFilter struct { OnlyShowGtid bool } -func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction)) error { +func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error { var subGtid, inGtid, exGtid *gtid.Gtid var err error if filter.IncludeGtid != "" { @@ -346,29 +348,29 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter var tx Transaction currentGtid := "" - callFn := func(tx Transaction) { + callFn := func(tx Transaction) bool { if fn == nil { - return + return true } if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) { - return + return true } if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) { - return + return true } if filter.StartPos != 0 && filter.StartPos > tx.StartPos { - return + return true } if filter.EndPos != 0 && filter.EndPos < tx.EndPos { - return + return true } if filter.BigThan != 0 && filter.BigThan > tx.Size { - return + return true } if filter.SmallThan != 0 && filter.SmallThan < tx.Size { - return + return true } - fn(tx) + return fn(tx) } for { headBuf := make([]byte, replication.EventHeaderSize) @@ -485,7 +487,9 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter tx.EndPos = startPos - 1 } tx.Size = tx.EndPos - tx.StartPos - callFn(tx) + if !callFn(tx) { + return nil + } if subGtid != nil { subGtid.Sub(tx.GTID) if subGtid.EventCount() == 0 { @@ -541,7 +545,7 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter } } -func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction)) error { +func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error { defer func() { recover() }() diff --git a/parse_test.go b/parse_test.go index 14b2533..06c04e1 100644 --- a/parse_test.go +++ b/parse_test.go @@ -8,8 +8,9 @@ import ( ) func TestParse(t *testing.T) { - ParseBinlogFile("./test/mysql-bin.000023", func(transaction Transaction) { + ParseBinlogFile("./test/mysql-bin.000023", func(transaction Transaction) bool { fmt.Println(transaction) + return true }) } @@ -23,8 +24,9 @@ func TestParseFilter(t *testing.T) { EndDate: time.Time{}, BigThan: 0, SmallThan: 0, - }, func(transaction Transaction) { + }, func(transaction Transaction) bool { fmt.Println(transaction) + return true }) } @@ -33,9 +35,15 @@ func TestParseExternal(t *testing.T) { if _, err := os.Stat(file); err != nil { return } + i := 0 ParseBinlogWithFilter(file, 0, BinlogFilter{ - OnlyShowGtid: true, - }, func(transaction Transaction) { + OnlyShowGtid: false, + }, func(transaction Transaction) bool { fmt.Println(transaction) + i++ + if i >= 10 { + return false + } + return true }) }