From f782522f6889f6ddc16350fbef1ecb9769d1bfd2 Mon Sep 17 00:00:00 2001 From: starainrt Date: Mon, 3 Jul 2023 13:37:39 +0800 Subject: [PATCH] updaate --- parse.go | 44 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/parse.go b/parse.go index d4e0e68..d1fb65f 100644 --- a/parse.go +++ b/parse.go @@ -8,6 +8,7 @@ import ( "github.com/starainrt/go-mysql/replication" "io" "os" + "strings" "time" ) @@ -311,20 +312,44 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { } type BinlogFilter struct { - IncludeGtid string - ExcludeGtid string - StartPos int - EndPos int - StartDate time.Time - EndDate time.Time - BigThan int - SmallThan int - OnlyShowGtid bool + IncludeGtid string + ExcludeGtid string + IncludeTables []string + ExcludeTables []string + StartPos int + EndPos int + StartDate time.Time + EndDate time.Time + BigThan int + SmallThan int + OnlyShowGtid bool } func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error { var subGtid, inGtid, exGtid *gtid.Gtid var err error + var includeMap = make(map[string]bool) + var excludeMap = make(map[string]bool) + if len(filter.IncludeTables) != 0 { + for _, v := range filter.IncludeTables { + if len(strings.Split(v, ".")) != 2 { + return fmt.Errorf("IncludeTable Name Is Invalid:%s", v) + } + includeMap[v] = true + } + } else { + includeMap["*.*"] = true + } + if len(filter.ExcludeTables) != 0 { + for _, v := range filter.ExcludeTables { + if len(strings.Split(v, ".")) != 2 { + return fmt.Errorf("ExcludeTable Name Is Invalid:%s", v) + } + excludeMap[v] = true + } + } else { + excludeMap["*.*"] = true + } if filter.IncludeGtid != "" { inGtid, err = gtid.Parse(filter.IncludeGtid) if err != nil { @@ -338,7 +363,6 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter return err } } - // process: 0, continue: 1, break: 2, EOF: 3 var ( n int64