update vendor

master
兔子 11 months ago
parent ef1d0c37bd
commit 01916bb748

@@ -3,7 +3,7 @@ module binlog
 go 1.20

 require (
-	b612.me/mysql/binlog v0.0.0-20230524072531-39ca67fcfe81
+	b612.me/mysql/binlog v0.0.0-20230629051642-8b0423eb94af
 	b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044
 	b612.me/starlog v1.3.2
 	b612.me/staros v1.1.6

@@ -1,5 +1,5 @@
-b612.me/mysql/binlog v0.0.0-20230524072531-39ca67fcfe81 h1:nUTBhXxtHAZd4p2ppbtj6wg5Ji5bbCAsWu6LAo5XvVs=
-b612.me/mysql/binlog v0.0.0-20230524072531-39ca67fcfe81/go.mod h1:j9oDZUBx7+GK9X1b1bqO9SHddHvDRSGfwbIISxONqfA=
+b612.me/mysql/binlog v0.0.0-20230629051642-8b0423eb94af h1:+toQg4dBQ19XpKPYSa9xRrYqXdHpB1HG5+KOxYiVMlY=
+b612.me/mysql/binlog v0.0.0-20230629051642-8b0423eb94af/go.mod h1:j9oDZUBx7+GK9X1b1bqO9SHddHvDRSGfwbIISxONqfA=
 b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044 h1:sJrYUl9Sb1tij6ROahFE3r/36Oag3kI92OXDjOKsdwA=
 b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044/go.mod h1:3EHq1jvlm3a92UxagMjfqSSVYb3KW2H3aT5nd4SiD94=
 b612.me/notify v1.2.4 h1:cjP80V9FeM+ib1DztZdykusakcbjNI4dAB1pXE8U6bo=

@@ -58,7 +58,11 @@ var cmd = &cobra.Command{
 			starlog.Warningln("Please enter a binlog path or folder")
 			return
 		}
+		now := time.Now()
 		ParseBinlog()
+		cost := time.Now().Sub(now).Seconds()
+		fmt.Println("")
+		fmt.Printf("Time Cost:%.2fs", cost)
 	},
 }
@@ -88,6 +92,7 @@ func ParseBinlog() {
 		title.AddCell().SetValue("EndPos")
 		title.AddCell().SetValue("事务大小")
 		title.AddCell().SetValue("影响行数")
+		title.AddCell().SetValue("压缩类型")
 		title.AddCell().SetValue("单语句StartPos")
 		title.AddCell().SetValue("单语句EndPos")
 		title.AddCell().SetValue("单语句时间")
@@ -101,7 +106,7 @@ func ParseBinlog() {
 		res.SetColWidth(1, 1, 40)
 		res.SetColWidth(3, 6, 6)
 		res.SetColWidth(7, 7, 5)
-		res.SetColWidth(15, 15, 40)
+		res.SetColWidth(16, 16, 40)
 		owrt.Save(outPath)
 	}
 	getParser := func(fpath string) string {
@@ -112,17 +117,45 @@ func ParseBinlog() {
 		if endTime != 0 {
 			eTime = time.Unix(endTime, 0)
 		}
+		onlyGtid := false
+		if !vbo && outPath == "" {
+			onlyGtid = true
+		}
 		var filter = binlog.BinlogFilter{
 			IncludeGtid: includeGtid,
 			ExcludeGtid: excludeGtid,
 			StartPos:    startPos,
 			EndPos:      endPos,
 			StartDate:   sTime,
 			EndDate:     eTime,
 			BigThan:     bigThan,
 			SmallThan:   smallThan,
+			OnlyShowGtid: onlyGtid,
 		}
 		var cGtid *gtid.Gtid
+		proc := make(chan string, 1000)
+		if !vbo {
+			go func() {
+				var latest string
+				var count = uint16(0)
+				for {
+					select {
+					case tmp := <-proc:
+						if tmp == "end" {
+							fmt.Println(latest)
+							return
+						}
+						latest = tmp
+						if count%10 == 0 {
+							fmt.Print(latest)
+						}
+						count++
+					case <-time.After(time.Second * 2):
+						fmt.Print(latest)
+					}
+				}
+			}()
+		}
 		err = binlog.ParseBinlogWithFilter(fpath, pos, filter, func(tx binlog.Transaction) {
 			foundCount++
 			if cGtid == nil {
@@ -136,7 +169,7 @@ func ParseBinlog() {
 				totalGtid.Add(tx.GTID)
 			}
 			if !vbo {
-				fmt.Printf("已找到%d个合规GTID\r", foundCount)
+				proc <- fmt.Sprintf("已找到%d个合法GTID\r", foundCount)
 			} else {
 				fmt.Printf("GTID:%s Time:%s StartPos:%v EndPos:%v RowsCount:%v Size:%v Detail:%+v\n",
 					tx.GTID, tx.Time, tx.StartPos, tx.EndPos, tx.RowsCount, tx.Size, tx.Txs)
@@ -155,6 +188,11 @@ func ParseBinlog() {
 				r.AddCell().SetValue(tx.EndPos)
 				r.AddCell().SetValue(tx.Size)
 				r.AddCell().SetValue(tx.RowsCount)
+				if t.CompressionType == "" {
+					r.AddCell().SetValue("NONE")
+				} else {
+					r.AddCell().SetValue(t.CompressionType)
+				}
 				r.AddCell().SetValue(t.StartPos)
 				r.AddCell().SetValue(t.EndPos)
 				r.AddCell().SetValue(t.Time.String())
@@ -167,6 +205,9 @@ func ParseBinlog() {
 				}
 			}
 		})
+		if !vbo {
+			time.Sleep(time.Millisecond * 500)
+		}
 		var cGtidStr string
 		if cGtid != nil {
 			cGtidStr = cGtid.String()
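
Note on the progress output added above: when verbose output (vbo) is off, the parse callback no longer prints directly; it pushes status strings into a buffered channel, and a single goroutine throttles what reaches the terminal, printing every tenth message or the most recent one after two seconds of silence. The 500 ms sleep after ParseBinlogWithFilter appears to give that goroutine time to flush its last message. A minimal, self-contained sketch of the same pattern (names are illustrative, not taken from the commit):

package main

import (
	"fmt"
	"time"
)

// throttledPrinter drains progress messages from proc and prints only a
// sample of them: every tenth message, or the latest one after two seconds
// of inactivity. The sentinel "end" flushes the last message and stops it.
func throttledPrinter(proc <-chan string) {
	var latest string
	count := 0
	for {
		select {
		case msg := <-proc:
			if msg == "end" {
				fmt.Println(latest)
				return
			}
			latest = msg
			if count%10 == 0 {
				fmt.Print(latest)
			}
			count++
		case <-time.After(2 * time.Second):
			fmt.Print(latest)
		}
	}
}

func main() {
	proc := make(chan string, 1000)
	go throttledPrinter(proc)
	for i := 1; i <= 95; i++ {
		proc <- fmt.Sprintf("found %d matching GTIDs\r", i)
	}
	proc <- "end"
	time.Sleep(100 * time.Millisecond) // give the printer a moment to flush
}
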

@@ -12,16 +12,17 @@ import (
 )

 type TxDetail struct {
-	StartPos  int
-	EndPos    int
-	RowCount  int
-	Timestamp int64
-	Time      time.Time
-	Sql       string
-	Db        string
-	Table     string
-	SqlType   string
-	Rows      [][]interface{}
+	StartPos        int
+	EndPos          int
+	RowCount        int
+	Timestamp       int64
+	Time            time.Time
+	Sql             string
+	Db              string
+	Table           string
+	SqlType         string
+	CompressionType string
+	Rows            [][]interface{}
 }

 type Transaction struct {
@@ -80,13 +81,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
 	var (
 		err      error
 		n        int64
-		db       string = ""
-		tb       string = ""
-		sql      string = ""
-		sqlType  string = ""
-		rowCnt   uint32 = 0
 		tbMapPos uint32 = 0
-		rows     [][]interface{}
 	)
 	var tx Transaction
 	currentGtid := ""
@@ -157,100 +152,105 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
 		//binEvent := &replication.BinlogEvent{RawData: rawData, Header: h, Event: e}
 		binEvent := &replication.BinlogEvent{Header: h, Event: e} // we donnot need raw data
-		db, tb, sqlType, sql, rowCnt, rows = GetDbTbAndQueryAndRowCntFromBinevent(binEvent)
-		startPos := 0
-		if sqlType == "query" || sqlType == "gtid" {
-			startPos = int(h.LogPos - h.EventSize)
-			//fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
-			// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos,
-			// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
-		} else {
-			startPos = int(tbMapPos)
-			//fmt.Println(h.Timestamp, tbMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
-			// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos,
-			// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
-		}
-		switch sqlType {
-		case "gtid":
-			if currentGtid != "" {
-				idx := 0
-				for k, v := range tx.Txs {
-					if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
-						v.Sql = tx.sqlOrigin[idx]
-						idx++
-					}
-					tx.RowsCount += v.RowCount
-					tx.Txs[k] = v
-				}
-				tx.Size = tx.EndPos - tx.StartPos
-				if f != nil {
-					f(tx)
-				}
-			}
-			currentGtid = sql
-			tx = Transaction{
-				GTID:      sql,
-				StartPos:  startPos,
-				Timestamp: int64(h.Timestamp),
-				Time:      time.Unix(int64(h.Timestamp), 0),
-			}
-		case "":
-			tx.EndPos = int(h.LogPos)
-			continue
-		case "rowsquery":
-			tx.EndPos = int(h.LogPos)
-			tx.sqlOrigin = append(tx.sqlOrigin, sql)
-		default:
-			tx.EndPos = int(h.LogPos)
-			tx.Txs = append(tx.Txs, TxDetail{
-				StartPos:  startPos,
-				EndPos:    int(h.LogPos),
-				Db:        db,
-				Table:     tb,
-				Sql:       sql,
-				SqlType:   sqlType,
-				Rows:      rows,
-				RowCount:  int(rowCnt),
-				Timestamp: int64(h.Timestamp),
-				Time:      time.Unix(int64(h.Timestamp), 0),
-			})
-		}
+		evs := ParseBinlogEvent(binEvent)
+		for _, ev := range evs {
+			startPos := 0
+			if ev.Type == "query" || ev.Type == "gtid" {
+				startPos = int(h.LogPos - h.EventSize)
+				//fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
+				// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos,
+				// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
+			} else {
+				startPos = int(tbMapPos)
+				//fmt.Println(h.Timestamp, tbMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
+				// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos,
+				// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
+			}
+			switch ev.Type {
+			case "gtid":
+				if currentGtid != "" {
+					idx := 0
+					for k, v := range tx.Txs {
+						if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
+							v.Sql = tx.sqlOrigin[idx]
+							idx++
+						}
+						tx.RowsCount += v.RowCount
+						tx.Txs[k] = v
+					}
+					tx.Size = tx.EndPos - tx.StartPos
+					if f != nil {
+						f(tx)
+					}
+				}
+				currentGtid = ev.Data
+				tx = Transaction{
+					GTID:      ev.Data,
+					StartPos:  startPos,
+					Timestamp: int64(h.Timestamp),
+					Time:      time.Unix(int64(h.Timestamp), 0),
+				}
+			case "":
+				tx.EndPos = int(h.LogPos)
+				continue
+			case "rowsquery":
+				tx.EndPos = int(h.LogPos)
+				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
+			default:
+				tx.EndPos = int(h.LogPos)
+				tx.Txs = append(tx.Txs, TxDetail{
+					StartPos:        startPos,
+					EndPos:          int(h.LogPos),
+					Db:              ev.DB,
+					Table:           ev.TB,
+					Sql:             ev.Data,
+					SqlType:         ev.Type,
+					Rows:            ev.Rows,
+					RowCount:        int(ev.RowCnt),
+					Timestamp:       int64(h.Timestamp),
+					Time:            time.Unix(int64(h.Timestamp), 0),
+					CompressionType: ev.CompressionType,
+				})
+			}
+		}
 	}
 }

-func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, string, string, string, uint32, [][]interface{}) {
-	var (
-		db      string = ""
-		tb      string = ""
-		sql     string = ""
-		sqlType string = ""
-		rowCnt  uint32 = 0
-		rows    [][]interface{}
-	)
+type BinlogEvent struct {
+	Type            string
+	DB              string
+	TB              string
+	Data            string
+	RowCnt          uint32
+	Rows            [][]interface{}
+	CompressionType string
+}
+
+func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
+	var res []BinlogEvent
+	var sig BinlogEvent
 	switch ev.Header.EventType {
 	case replication.ANONYMOUS_GTID_EVENT:
 		//ge := ev.Event.(*replication.GTIDEvent)
-		sql = "anonymous-gtid-event:1"
-		sqlType = "gtid"
+		sig.Data = "anonymous-gtid-event:1"
+		sig.Type = "gtid"
 	case replication.WRITE_ROWS_EVENTv1,
 		replication.WRITE_ROWS_EVENTv2:
 		wrEvent := ev.Event.(*replication.RowsEvent)
-		db = string(wrEvent.Table.Schema)
-		tb = string(wrEvent.Table.Table)
-		sqlType = "insert"
-		rowCnt = uint32(len(wrEvent.Rows))
-		rows = wrEvent.Rows
+		sig.DB = string(wrEvent.Table.Schema)
+		sig.TB = string(wrEvent.Table.Table)
+		sig.Type = "insert"
+		sig.RowCnt = uint32(len(wrEvent.Rows))
+		sig.Rows = wrEvent.Rows
 	case replication.UPDATE_ROWS_EVENTv1,
 		replication.UPDATE_ROWS_EVENTv2:
 		wrEvent := ev.Event.(*replication.RowsEvent)
-		db = string(wrEvent.Table.Schema)
-		tb = string(wrEvent.Table.Table)
-		sqlType = "update"
-		rowCnt = uint32(len(wrEvent.Rows)) / 2
-		rows = wrEvent.Rows
+		sig.DB = string(wrEvent.Table.Schema)
+		sig.TB = string(wrEvent.Table.Table)
+		sig.Type = "update"
+		sig.RowCnt = uint32(len(wrEvent.Rows)) / 2
+		sig.Rows = wrEvent.Rows
 	case replication.DELETE_ROWS_EVENTv1,
 		replication.DELETE_ROWS_EVENTv2:
@@ -258,52 +258,66 @@ func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string,
 		//replication.TABLE_MAP_EVENT:
 		wrEvent := ev.Event.(*replication.RowsEvent)
-		db = string(wrEvent.Table.Schema)
-		tb = string(wrEvent.Table.Table)
-		sqlType = "delete"
-		rowCnt = uint32(len(wrEvent.Rows))
-		rows = wrEvent.Rows
+		sig.DB = string(wrEvent.Table.Schema)
+		sig.TB = string(wrEvent.Table.Table)
+		sig.Type = "delete"
+		sig.RowCnt = uint32(len(wrEvent.Rows))
+		sig.Rows = wrEvent.Rows
 	case replication.ROWS_QUERY_EVENT:
 		queryEvent := ev.Event.(*replication.RowsQueryEvent)
-		sql = string(queryEvent.Query)
-		sqlType = "rowsquery"
+		sig.Data = string(queryEvent.Query)
+		sig.Type = "rowsquery"
 	case replication.QUERY_EVENT:
 		queryEvent := ev.Event.(*replication.QueryEvent)
-		db = string(queryEvent.Schema)
-		sql = string(queryEvent.Query)
-		sqlType = "query"
+		sig.DB = string(queryEvent.Schema)
+		sig.Data = string(queryEvent.Query)
+		sig.Type = "query"
 	case replication.MARIADB_GTID_EVENT:
 		// For global transaction ID, used to start a new transaction event group, instead of the old BEGIN query event, and also to mark stand-alone (ddl).
 		//https://mariadb.com/kb/en/library/gtid_event/
-		sql = "begin"
-		sqlType = "query"
+		sig.Data = "begin"
+		sig.Type = "query"
 	case replication.XID_EVENT:
 		// XID_EVENT represents commit。rollback transaction not in binlog
-		sql = "commit"
-		sqlType = "query"
+		sig.Data = "commit"
+		sig.Type = "query"
 	case replication.GTID_EVENT:
 		ge := ev.Event.(*replication.GTIDEvent)
 		gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO))
 		if err == nil {
-			sql = gid.String()
+			sig.Data = gid.String()
 		}
-		sqlType = "gtid"
+		sig.Type = "gtid"
+	case replication.TRANSACTION_PAYLOAD_EVENT:
+		ge := ev.Event.(*replication.TransactionPayloadEvent)
+		for _, val := range ge.Events {
+			res = append(res, ParseBinlogEvent(val)...)
+		}
+		for idx := range res {
+			if ge.CompressionType == 0 {
+				res[idx].CompressionType = "ZSTD"
+			} else if ge.CompressionType != 255 {
+				res[idx].CompressionType = "UNKNOWN"
+			}
+		}
+		return res
 	}
-	return db, tb, sqlType, sql, rowCnt, rows
+	res = append(res, sig)
+	return res
 }

 type BinlogFilter struct {
 	IncludeGtid string
 	ExcludeGtid string
 	StartPos    int
 	EndPos      int
 	StartDate   time.Time
 	EndDate     time.Time
 	BigThan     int
 	SmallThan   int
+	OnlyShowGtid bool
 }

 func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction)) error {
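
For context on the CompressionType values set above: a MySQL Transaction_payload_event carries a numeric compression algorithm where, in current MySQL releases, 0 denotes ZSTD and 255 denotes NONE; the code leaves NONE as an empty string, which the report code earlier in this commit renders as "NONE". A small sketch of that mapping as a standalone helper (hypothetical name, not part of the commit):

package main

import "fmt"

// compressionName converts the numeric compression algorithm of a MySQL
// transaction payload event into a display string. The 0 = ZSTD and
// 255 = NONE values follow the MySQL binlog event format; anything else
// is reported as UNKNOWN.
func compressionName(algorithm uint64) string {
	switch algorithm {
	case 0:
		return "ZSTD"
	case 255:
		return "NONE"
	default:
		return "UNKNOWN"
	}
}

func main() {
	fmt.Println(compressionName(0), compressionName(255), compressionName(3))
}
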
@@ -325,14 +339,8 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
 	// process: 0, continue: 1, break: 2, EOF: 3
 	var (
 		n            int64
-		db           string = ""
-		tb           string = ""
-		sql          string = ""
-		sqlType      string = ""
-		rowCnt       uint32 = 0
 		tbMapPos     uint32 = 0
 		skipTillNext bool   = false
-		rows         [][]interface{}
 	)
 	var tx Transaction
@@ -398,6 +406,9 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
 		if skipTillNext && h.EventType != replication.GTID_EVENT {
 			continue
 		}
+		if filter.OnlyShowGtid && h.EventType != replication.GTID_EVENT {
+			continue
+		}
 		//h.Dump(os.Stdout)
 		data := buf.Bytes()
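
The OnlyShowGtid switch, combined with the early skip above, lets a caller collect the GTID content of a binlog without decoding row events, which is what the CLI now does when neither verbose output nor an Excel report is requested. A hedged usage sketch (the file name and numeric start offset are placeholders; the call shape follows the CLI code earlier in this commit):

package main

import (
	"fmt"
	"log"

	"b612.me/mysql/binlog"
)

func main() {
	// Only GTID events are decoded; row, query and rows-query events are skipped.
	filter := binlog.BinlogFilter{OnlyShowGtid: true}
	err := binlog.ParseBinlogWithFilter("mysql-bin.000001", 0, filter, func(tx binlog.Transaction) {
		fmt.Println(tx.GTID, tx.StartPos, tx.EndPos)
	})
	if err != nil {
		log.Fatalln(err)
	}
}
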
@@ -426,78 +437,81 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
 		//binEvent := &replication.BinlogEvent{RawData: rawData, Header: h, Event: e}
 		binEvent := &replication.BinlogEvent{Header: h, Event: e} // we donnot need raw data
-		db, tb, sqlType, sql, rowCnt, rows = GetDbTbAndQueryAndRowCntFromBinevent(binEvent)
-		startPos := 0
-		if sqlType == "query" || sqlType == "gtid" {
-			startPos = int(h.LogPos - h.EventSize)
-			//fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
-			// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos,
-			// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
-		} else {
-			startPos = int(tbMapPos)
-			//fmt.Println(h.Timestamp, tbMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
-			// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos,
-			// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
-		}
-		switch sqlType {
-		case "gtid":
-			if skipTillNext {
-				skipTillNext = false
-			}
-			if currentGtid != "" {
-				idx := 0
-				for k, v := range tx.Txs {
-					if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
-						v.Sql = tx.sqlOrigin[idx]
-						idx++
-					}
-					tx.RowsCount += v.RowCount
-					tx.Txs[k] = v
-				}
-				tx.Size = tx.EndPos - tx.StartPos
-				callFn(tx)
-			}
-			currentGtid = sql
-			if inGtid != nil {
-				if c, _ := inGtid.Contain(sql); !c {
-					currentGtid = ""
-					skipTillNext = true
-					continue
-				}
-			}
-			if exGtid != nil {
-				if c, _ := exGtid.Contain(sql); c {
-					currentGtid = ""
-					skipTillNext = true
-					continue
-				}
-			}
-			tx = Transaction{
-				GTID:      sql,
-				StartPos:  startPos,
-				Timestamp: int64(h.Timestamp),
-				Time:      time.Unix(int64(h.Timestamp), 0),
-			}
-		case "":
-			tx.EndPos = int(h.LogPos)
-			continue
-		case "rowsquery":
-			tx.EndPos = int(h.LogPos)
-			tx.sqlOrigin = append(tx.sqlOrigin, sql)
-		default:
-			tx.EndPos = int(h.LogPos)
-			tx.Txs = append(tx.Txs, TxDetail{
-				StartPos:  startPos,
-				EndPos:    int(h.LogPos),
-				Db:        db,
-				Table:     tb,
-				Sql:       sql,
-				SqlType:   sqlType,
-				Rows:      rows,
-				RowCount:  int(rowCnt),
-				Timestamp: int64(h.Timestamp),
-				Time:      time.Unix(int64(h.Timestamp), 0),
-			})
-		}
+		evs := ParseBinlogEvent(binEvent)
+		for _, ev := range evs {
+			startPos := 0
+			if ev.Type == "query" || ev.Type == "gtid" {
+				startPos = int(h.LogPos - h.EventSize)
+				//fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
+				// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos,
+				// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
+			} else {
+				startPos = int(tbMapPos)
+				//fmt.Println(h.Timestamp, tbMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
+				// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos,
+				// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
+			}
+			switch ev.Type {
+			case "gtid":
+				if skipTillNext {
+					skipTillNext = false
+				}
+				if currentGtid != "" {
+					idx := 0
+					for k, v := range tx.Txs {
+						if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
+							v.Sql = tx.sqlOrigin[idx]
+							idx++
+						}
+						tx.RowsCount += v.RowCount
+						tx.Txs[k] = v
+					}
+					tx.Size = tx.EndPos - tx.StartPos
+					callFn(tx)
+				}
+				currentGtid = ev.Data
+				if inGtid != nil {
+					if c, _ := inGtid.Contain(ev.Data); !c {
+						currentGtid = ""
+						skipTillNext = true
+						continue
+					}
+				}
+				if exGtid != nil {
+					if c, _ := exGtid.Contain(ev.Data); c {
+						currentGtid = ""
+						skipTillNext = true
+						continue
+					}
+				}
+				tx = Transaction{
+					GTID:      ev.Data,
+					StartPos:  startPos,
+					Timestamp: int64(h.Timestamp),
+					Time:      time.Unix(int64(h.Timestamp), 0),
+				}
+			case "":
+				tx.EndPos = int(h.LogPos)
+				continue
+			case "rowsquery":
+				tx.EndPos = int(h.LogPos)
+				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
+			default:
+				tx.EndPos = int(h.LogPos)
+				tx.Txs = append(tx.Txs, TxDetail{
+					StartPos:        startPos,
+					EndPos:          int(h.LogPos),
+					Db:              ev.DB,
+					Table:           ev.TB,
+					Sql:             ev.Data,
+					SqlType:         ev.Type,
+					Rows:            ev.Rows,
+					RowCount:        int(ev.RowCnt),
+					Timestamp:       int64(h.Timestamp),
+					Time:            time.Unix(int64(h.Timestamp), 0),
+					CompressionType: ev.CompressionType,
+				})
+			}
+		}
 	}
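
Taken together, the refactor means a transaction compressed with MySQL 8.0's binlog_transaction_compression is unpacked into the same per-statement TxDetail entries as an uncompressed one, with the algorithm recorded on each entry. A hedged end-to-end sketch of reading that field through the exported API (file name and start offset are placeholders; the call shape follows the CLI code in this commit):

package main

import (
	"fmt"
	"log"

	"b612.me/mysql/binlog"
)

func main() {
	var filter binlog.BinlogFilter // zero value: no filtering, walk every transaction
	err := binlog.ParseBinlogWithFilter("mysql-bin.000001", 0, filter, func(tx binlog.Transaction) {
		for _, st := range tx.Txs {
			// CompressionType is empty for ordinary events and e.g. "ZSTD" for
			// statements unpacked from a TRANSACTION_PAYLOAD_EVENT.
			fmt.Printf("%s %s.%s rows=%d compression=%s\n",
				tx.GTID, st.Db, st.Table, st.RowCount, st.CompressionType)
		}
	})
	if err != nil {
		log.Fatalln(err)
	}
}
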

@@ -1,4 +1,4 @@
-# b612.me/mysql/binlog v0.0.0-20230524072531-39ca67fcfe81
+# b612.me/mysql/binlog v0.0.0-20230629051642-8b0423eb94af
 ## explicit; go 1.20
 b612.me/mysql/binlog
 # b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044
