master
兔子 2 years ago
parent 1c838a7c1e
commit 8caa467be7

@ -41,10 +41,10 @@ func (t Transaction) GetSqlOrigin() []string {
return t.sqlOrigin return t.sqlOrigin
} }
func ParseBinlogFile(path string, fx func(transaction Transaction)) error { func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error {
return parseOneBinlog(path, fx) return parseOneBinlog(path, fx)
} }
func parseOneBinlog(path string, fx func(Transaction)) error { func parseOneBinlog(path string, fx func(Transaction) bool) error {
if !staros.Exists(path) { if !staros.Exists(path) {
return os.ErrNotExist return os.ErrNotExist
} }
@ -73,7 +73,7 @@ func parseOneBinlog(path string, fx func(Transaction)) error {
return parseBinlogDetail(f, fx) return parseBinlogDetail(f, fx)
} }
func parseBinlogDetail(r io.Reader, f func(Transaction)) error { func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
parse := replication.NewBinlogParser() parse := replication.NewBinlogParser()
parse.SetParseTime(false) parse.SetParseTime(false)
parse.SetUseDecimal(false) parse.SetUseDecimal(false)
@ -180,7 +180,9 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
} }
tx.Size = tx.EndPos - tx.StartPos tx.Size = tx.EndPos - tx.StartPos
if f != nil { if f != nil {
f(tx) if !f(tx) {
return nil
}
} }
} }
currentGtid = ev.Data currentGtid = ev.Data
@ -320,7 +322,7 @@ type BinlogFilter struct {
OnlyShowGtid bool OnlyShowGtid bool
} }
func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction)) error { func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
var subGtid, inGtid, exGtid *gtid.Gtid var subGtid, inGtid, exGtid *gtid.Gtid
var err error var err error
if filter.IncludeGtid != "" { if filter.IncludeGtid != "" {
@ -346,29 +348,29 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
var tx Transaction var tx Transaction
currentGtid := "" currentGtid := ""
callFn := func(tx Transaction) { callFn := func(tx Transaction) bool {
if fn == nil { if fn == nil {
return return true
} }
if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) { if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) {
return return true
} }
if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) { if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) {
return return true
} }
if filter.StartPos != 0 && filter.StartPos > tx.StartPos { if filter.StartPos != 0 && filter.StartPos > tx.StartPos {
return return true
} }
if filter.EndPos != 0 && filter.EndPos < tx.EndPos { if filter.EndPos != 0 && filter.EndPos < tx.EndPos {
return return true
} }
if filter.BigThan != 0 && filter.BigThan > tx.Size { if filter.BigThan != 0 && filter.BigThan > tx.Size {
return return true
} }
if filter.SmallThan != 0 && filter.SmallThan < tx.Size { if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
return return true
} }
fn(tx) return fn(tx)
} }
for { for {
headBuf := make([]byte, replication.EventHeaderSize) headBuf := make([]byte, replication.EventHeaderSize)
@ -485,7 +487,9 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
tx.EndPos = startPos - 1 tx.EndPos = startPos - 1
} }
tx.Size = tx.EndPos - tx.StartPos tx.Size = tx.EndPos - tx.StartPos
callFn(tx) if !callFn(tx) {
return nil
}
if subGtid != nil { if subGtid != nil {
subGtid.Sub(tx.GTID) subGtid.Sub(tx.GTID)
if subGtid.EventCount() == 0 { if subGtid.EventCount() == 0 {
@ -541,7 +545,7 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
} }
} }
func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction)) error { func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) error {
defer func() { defer func() {
recover() recover()
}() }()

@ -8,8 +8,9 @@ import (
) )
func TestParse(t *testing.T) { func TestParse(t *testing.T) {
ParseBinlogFile("./test/mysql-bin.000023", func(transaction Transaction) { ParseBinlogFile("./test/mysql-bin.000023", func(transaction Transaction) bool {
fmt.Println(transaction) fmt.Println(transaction)
return true
}) })
} }
@ -23,8 +24,9 @@ func TestParseFilter(t *testing.T) {
EndDate: time.Time{}, EndDate: time.Time{},
BigThan: 0, BigThan: 0,
SmallThan: 0, SmallThan: 0,
}, func(transaction Transaction) { }, func(transaction Transaction) bool {
fmt.Println(transaction) fmt.Println(transaction)
return true
}) })
} }
@ -33,9 +35,15 @@ func TestParseExternal(t *testing.T) {
if _, err := os.Stat(file); err != nil { if _, err := os.Stat(file); err != nil {
return return
} }
i := 0
ParseBinlogWithFilter(file, 0, BinlogFilter{ ParseBinlogWithFilter(file, 0, BinlogFilter{
OnlyShowGtid: true, OnlyShowGtid: false,
}, func(transaction Transaction) { }, func(transaction Transaction) bool {
fmt.Println(transaction) fmt.Println(transaction)
i++
if i >= 10 {
return false
}
return true
}) })
} }

Loading…
Cancel
Save