support binlog compression

master
兔子 2 years ago
parent 39ca67fcfe
commit 8b0423eb94

@ -12,16 +12,17 @@ import (
)
type TxDetail struct {
StartPos int
EndPos int
RowCount int
Timestamp int64
Time time.Time
Sql string
Db string
Table string
SqlType string
Rows [][]interface{}
StartPos int
EndPos int
RowCount int
Timestamp int64
Time time.Time
Sql string
Db string
Table string
SqlType string
CompressionType string
Rows [][]interface{}
}
type Transaction struct {
@ -80,13 +81,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
var (
err error
n int64
db string = ""
tb string = ""
sql string = ""
sqlType string = ""
rowCnt uint32 = 0
tbMapPos uint32 = 0
rows [][]interface{}
)
var tx Transaction
currentGtid := ""
@ -157,100 +152,105 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error {
//binEvent := &replication.BinlogEvent{RawData: rawData, Header: h, Event: e}
binEvent := &replication.BinlogEvent{Header: h, Event: e} // we donnot need raw data
db, tb, sqlType, sql, rowCnt, rows = GetDbTbAndQueryAndRowCntFromBinevent(binEvent)
startPos := 0
if sqlType == "query" || sqlType == "gtid" {
startPos = int(h.LogPos - h.EventSize)
//fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos,
// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
} else {
startPos = int(tbMapPos)
//fmt.Println(h.Timestamp, t bMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos,
// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
}
switch sqlType {
case "gtid":
if currentGtid != "" {
idx := 0
for k, v := range tx.Txs {
if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
v.Sql = tx.sqlOrigin[idx]
idx++
evs := ParseBinlogEvent(binEvent)
for _, ev := range evs {
startPos := 0
if ev.Type == "query" || ev.Type == "gtid" {
startPos = int(h.LogPos - h.EventSize)
//fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos,
// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
} else {
startPos = int(tbMapPos)
//fmt.Println(h.Timestamp, t bMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos,
// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
}
switch ev.Type {
case "gtid":
if currentGtid != "" {
idx := 0
for k, v := range tx.Txs {
if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
v.Sql = tx.sqlOrigin[idx]
idx++
}
tx.RowsCount += v.RowCount
tx.Txs[k] = v
}
tx.Size = tx.EndPos - tx.StartPos
if f != nil {
f(tx)
}
tx.RowsCount += v.RowCount
tx.Txs[k] = v
}
tx.Size = tx.EndPos - tx.StartPos
if f != nil {
f(tx)
currentGtid = ev.Data
tx = Transaction{
GTID: ev.Data,
StartPos: startPos,
Timestamp: int64(h.Timestamp),
Time: time.Unix(int64(h.Timestamp), 0),
}
case "":
tx.EndPos = int(h.LogPos)
continue
case "rowsquery":
tx.EndPos = int(h.LogPos)
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
default:
tx.EndPos = int(h.LogPos)
tx.Txs = append(tx.Txs, TxDetail{
StartPos: startPos,
EndPos: int(h.LogPos),
Db: ev.DB,
Table: ev.TB,
Sql: ev.Data,
SqlType: ev.Type,
Rows: ev.Rows,
RowCount: int(ev.RowCnt),
Timestamp: int64(h.Timestamp),
Time: time.Unix(int64(h.Timestamp), 0),
CompressionType: ev.CompressionType,
})
}
currentGtid = sql
tx = Transaction{
GTID: sql,
StartPos: startPos,
Timestamp: int64(h.Timestamp),
Time: time.Unix(int64(h.Timestamp), 0),
}
case "":
tx.EndPos = int(h.LogPos)
continue
case "rowsquery":
tx.EndPos = int(h.LogPos)
tx.sqlOrigin = append(tx.sqlOrigin, sql)
default:
tx.EndPos = int(h.LogPos)
tx.Txs = append(tx.Txs, TxDetail{
StartPos: startPos,
EndPos: int(h.LogPos),
Db: db,
Table: tb,
Sql: sql,
SqlType: sqlType,
Rows: rows,
RowCount: int(rowCnt),
Timestamp: int64(h.Timestamp),
Time: time.Unix(int64(h.Timestamp), 0),
})
}
}
}
func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, string, string, string, uint32, [][]interface{}) {
var (
db string = ""
tb string = ""
sql string = ""
sqlType string = ""
rowCnt uint32 = 0
rows [][]interface{}
)
type BinlogEvent struct {
Type string
DB string
TB string
Data string
RowCnt uint32
Rows [][]interface{}
CompressionType string
}
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
var res []BinlogEvent
var sig BinlogEvent
switch ev.Header.EventType {
case replication.ANONYMOUS_GTID_EVENT:
//ge := ev.Event.(*replication.GTIDEvent)
sql = "anonymous-gtid-event:1"
sqlType = "gtid"
sig.Data = "anonymous-gtid-event:1"
sig.Type = "gtid"
case replication.WRITE_ROWS_EVENTv1,
replication.WRITE_ROWS_EVENTv2:
wrEvent := ev.Event.(*replication.RowsEvent)
db = string(wrEvent.Table.Schema)
tb = string(wrEvent.Table.Table)
sqlType = "insert"
rowCnt = uint32(len(wrEvent.Rows))
rows = wrEvent.Rows
sig.TB = string(wrEvent.Table.Schema)
sig.TB = string(wrEvent.Table.Table)
sig.Type = "insert"
sig.RowCnt = uint32(len(wrEvent.Rows))
sig.Rows = wrEvent.Rows
case replication.UPDATE_ROWS_EVENTv1,
replication.UPDATE_ROWS_EVENTv2:
wrEvent := ev.Event.(*replication.RowsEvent)
db = string(wrEvent.Table.Schema)
tb = string(wrEvent.Table.Table)
sqlType = "update"
rowCnt = uint32(len(wrEvent.Rows)) / 2
rows = wrEvent.Rows
sig.DB = string(wrEvent.Table.Schema)
sig.DB = string(wrEvent.Table.Table)
sig.Type = "update"
sig.RowCnt = uint32(len(wrEvent.Rows)) / 2
sig.Rows = wrEvent.Rows
case replication.DELETE_ROWS_EVENTv1,
replication.DELETE_ROWS_EVENTv2:
@ -258,52 +258,66 @@ func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string,
//replication.TABLE_MAP_EVENT:
wrEvent := ev.Event.(*replication.RowsEvent)
db = string(wrEvent.Table.Schema)
tb = string(wrEvent.Table.Table)
sqlType = "delete"
rowCnt = uint32(len(wrEvent.Rows))
rows = wrEvent.Rows
sig.DB = string(wrEvent.Table.Schema)
sig.TB = string(wrEvent.Table.Table)
sig.Type = "delete"
sig.RowCnt = uint32(len(wrEvent.Rows))
sig.Rows = wrEvent.Rows
case replication.ROWS_QUERY_EVENT:
queryEvent := ev.Event.(*replication.RowsQueryEvent)
sql = string(queryEvent.Query)
sqlType = "rowsquery"
sig.Data = string(queryEvent.Query)
sig.Type = "rowsquery"
case replication.QUERY_EVENT:
queryEvent := ev.Event.(*replication.QueryEvent)
db = string(queryEvent.Schema)
sql = string(queryEvent.Query)
sqlType = "query"
sig.DB = string(queryEvent.Schema)
sig.Data = string(queryEvent.Query)
sig.Type = "query"
case replication.MARIADB_GTID_EVENT:
// For global transaction ID, used to start a new transaction event group, instead of the old BEGIN query event, and also to mark stand-alone (ddl).
//https://mariadb.com/kb/en/library/gtid_event/
sql = "begin"
sqlType = "query"
sig.Data = "begin"
sig.Type = "query"
case replication.XID_EVENT:
// XID_EVENT represents commit。rollback transaction not in binlog
sql = "commit"
sqlType = "query"
sig.Data = "commit"
sig.Type = "query"
case replication.GTID_EVENT:
ge := ev.Event.(*replication.GTIDEvent)
gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO))
if err == nil {
sql = gid.String()
sig.Data = gid.String()
}
sig.Type = "gtid"
case replication.TRANSACTION_PAYLOAD_EVENT:
ge := ev.Event.(*replication.TransactionPayloadEvent)
for _, val := range ge.Events {
res = append(res, ParseBinlogEvent(val)...)
}
for idx := range res {
if ge.CompressionType == 0 {
res[idx].CompressionType = "ZSTD"
} else if ge.CompressionType != 255 {
res[idx].CompressionType = "UNKNOWN"
}
}
sqlType = "gtid"
return res
}
return db, tb, sqlType, sql, rowCnt, rows
res = append(res, sig)
return res
}
type BinlogFilter struct {
IncludeGtid string
ExcludeGtid string
StartPos int
EndPos int
StartDate time.Time
EndDate time.Time
BigThan int
SmallThan int
IncludeGtid string
ExcludeGtid string
StartPos int
EndPos int
StartDate time.Time
EndDate time.Time
BigThan int
SmallThan int
OnlyShowGtid bool
}
func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction)) error {
@ -325,14 +339,8 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
// process: 0, continue: 1, break: 2, EOF: 3
var (
n int64
db string = ""
tb string = ""
sql string = ""
sqlType string = ""
rowCnt uint32 = 0
tbMapPos uint32 = 0
skipTillNext bool = false
rows [][]interface{}
)
var tx Transaction
@ -398,6 +406,9 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
if skipTillNext && h.EventType != replication.GTID_EVENT {
continue
}
if filter.OnlyShowGtid && h.EventType != replication.GTID_EVENT {
continue
}
//h.Dump(os.Stdout)
data := buf.Bytes()
@ -426,78 +437,81 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
//binEvent := &replication.BinlogEvent{RawData: rawData, Header: h, Event: e}
binEvent := &replication.BinlogEvent{Header: h, Event: e} // we donnot need raw data
db, tb, sqlType, sql, rowCnt, rows = GetDbTbAndQueryAndRowCntFromBinevent(binEvent)
startPos := 0
if sqlType == "query" || sqlType == "gtid" {
startPos = int(h.LogPos - h.EventSize)
//fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos,
// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
} else {
startPos = int(tbMapPos)
//fmt.Println(h.Timestamp, t bMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos,
// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
}
switch sqlType {
case "gtid":
if skipTillNext {
skipTillNext = false
evs := ParseBinlogEvent(binEvent)
for _, ev := range evs {
startPos := 0
if ev.Type == "query" || ev.Type == "gtid" {
startPos = int(h.LogPos - h.EventSize)
//fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos,
// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
} else {
startPos = int(tbMapPos)
//fmt.Println(h.Timestamp, t bMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos,
// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
}
if currentGtid != "" {
idx := 0
for k, v := range tx.Txs {
if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
v.Sql = tx.sqlOrigin[idx]
idx++
switch ev.Type {
case "gtid":
if skipTillNext {
skipTillNext = false
}
if currentGtid != "" {
idx := 0
for k, v := range tx.Txs {
if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
v.Sql = tx.sqlOrigin[idx]
idx++
}
tx.RowsCount += v.RowCount
tx.Txs[k] = v
}
tx.RowsCount += v.RowCount
tx.Txs[k] = v
tx.Size = tx.EndPos - tx.StartPos
callFn(tx)
}
tx.Size = tx.EndPos - tx.StartPos
callFn(tx)
}
currentGtid = sql
if inGtid != nil {
if c, _ := inGtid.Contain(sql); !c {
currentGtid = ""
skipTillNext = true
continue
currentGtid = ev.Data
if inGtid != nil {
if c, _ := inGtid.Contain(ev.Data); !c {
currentGtid = ""
skipTillNext = true
continue
}
}
}
if exGtid != nil {
if c, _ := exGtid.Contain(sql); c {
currentGtid = ""
skipTillNext = true
continue
if exGtid != nil {
if c, _ := exGtid.Contain(ev.Data); c {
currentGtid = ""
skipTillNext = true
continue
}
}
tx = Transaction{
GTID: ev.Data,
StartPos: startPos,
Timestamp: int64(h.Timestamp),
Time: time.Unix(int64(h.Timestamp), 0),
}
case "":
tx.EndPos = int(h.LogPos)
continue
case "rowsquery":
tx.EndPos = int(h.LogPos)
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
default:
tx.EndPos = int(h.LogPos)
tx.Txs = append(tx.Txs, TxDetail{
StartPos: startPos,
EndPos: int(h.LogPos),
Db: ev.DB,
Table: ev.TB,
Sql: ev.Data,
SqlType: ev.Type,
Rows: ev.Rows,
RowCount: int(ev.RowCnt),
Timestamp: int64(h.Timestamp),
Time: time.Unix(int64(h.Timestamp), 0),
CompressionType: ev.CompressionType,
})
}
tx = Transaction{
GTID: sql,
StartPos: startPos,
Timestamp: int64(h.Timestamp),
Time: time.Unix(int64(h.Timestamp), 0),
}
case "":
tx.EndPos = int(h.LogPos)
continue
case "rowsquery":
tx.EndPos = int(h.LogPos)
tx.sqlOrigin = append(tx.sqlOrigin, sql)
default:
tx.EndPos = int(h.LogPos)
tx.Txs = append(tx.Txs, TxDetail{
StartPos: startPos,
EndPos: int(h.LogPos),
Db: db,
Table: tb,
Sql: sql,
SqlType: sqlType,
Rows: rows,
RowCount: int(rowCnt),
Timestamp: int64(h.Timestamp),
Time: time.Unix(int64(h.Timestamp), 0),
})
}
}
}

@ -2,6 +2,7 @@ package binlog
import (
"fmt"
"os"
"testing"
"time"
)
@ -26,3 +27,15 @@ func TestParseFilter(t *testing.T) {
fmt.Println(transaction)
})
}
func TestParseExternal(t *testing.T) {
file := `C:\mysql-bin.000001`
if _, err := os.Stat(file); err != nil {
return
}
ParseBinlogWithFilter(file, 0, BinlogFilter{
OnlyShowGtid: true,
}, func(transaction Transaction) {
fmt.Println(transaction)
})
}

Loading…
Cancel
Save