From cc1434ceef4ea319b5d6667f492bea34ca0fefee Mon Sep 17 00:00:00 2001 From: starainrt Date: Tue, 25 Apr 2023 20:27:48 +0800 Subject: [PATCH] update --- parse.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/parse.go b/parse.go index 80aaf63..fe7a19a 100644 --- a/parse.go +++ b/parse.go @@ -81,6 +81,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error { tbMapPos uint32 = 0 ) var tx Transaction + currentGtid := "" for { headBuf := make([]byte, replication.EventHeaderSize) if _, err = io.ReadFull(r, headBuf); err == io.EOF { @@ -137,7 +138,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error { db, tb, sqlType, sql, rowCnt = GetDbTbAndQueryAndRowCntFromBinevent(binEvent) startPos := 0 - if sqlType == "query" { + if sqlType == "query" || sqlType == "gtid" { startPos = int(h.LogPos - h.EventSize) //fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType) // cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos, @@ -150,7 +151,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error { } switch sqlType { case "gtid": - if tx.StartPos != 0 { + if currentGtid == "" { for _, v := range tx.Txs { tx.RowsCount += v.RowCount } @@ -159,6 +160,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error { f(tx) } } + currentGtid = sql tx = Transaction{ GTID: sql, StartPos: startPos,