master
兔子 1 year ago
parent 58eec327ea
commit cc1434ceef

@ -81,6 +81,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
tbMapPos uint32 = 0 tbMapPos uint32 = 0
) )
var tx Transaction var tx Transaction
currentGtid := ""
for { for {
headBuf := make([]byte, replication.EventHeaderSize) headBuf := make([]byte, replication.EventHeaderSize)
if _, err = io.ReadFull(r, headBuf); err == io.EOF { if _, err = io.ReadFull(r, headBuf); err == io.EOF {
@ -137,7 +138,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
db, tb, sqlType, sql, rowCnt = GetDbTbAndQueryAndRowCntFromBinevent(binEvent) db, tb, sqlType, sql, rowCnt = GetDbTbAndQueryAndRowCntFromBinevent(binEvent)
startPos := 0 startPos := 0
if sqlType == "query" { if sqlType == "query" || sqlType == "gtid" {
startPos = int(h.LogPos - h.EventSize) startPos = int(h.LogPos - h.EventSize)
//fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType) //fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos, // cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos,
@ -150,7 +151,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
} }
switch sqlType { switch sqlType {
case "gtid": case "gtid":
if tx.StartPos != 0 { if currentGtid == "" {
for _, v := range tx.Txs { for _, v := range tx.Txs {
tx.RowsCount += v.RowCount tx.RowsCount += v.RowCount
} }
@ -159,6 +160,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
f(tx) f(tx)
} }
} }
currentGtid = sql
tx = Transaction{ tx = Transaction{
GTID: sql, GTID: sql,
StartPos: startPos, StartPos: startPos,

Loading…
Cancel
Save