add more func

master
兔子 1 year ago
parent 0f74dc3dc8
commit 39ca67fcfe

@ -294,3 +294,296 @@ func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string,
return db, tb, sqlType, sql, rowCnt, rows
}
type BinlogFilter struct {
IncludeGtid string
ExcludeGtid string
StartPos int
EndPos int
StartDate time.Time
EndDate time.Time
BigThan int
SmallThan int
}
func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction)) error {
var inGtid, exGtid *gtid.Gtid
var err error
if filter.IncludeGtid != "" {
inGtid, err = gtid.Parse(filter.IncludeGtid)
if err != nil {
return err
}
}
if filter.ExcludeGtid != "" {
exGtid, err = gtid.Parse(filter.ExcludeGtid)
if err != nil {
return err
}
}
// process: 0, continue: 1, break: 2, EOF: 3
var (
n int64
db string = ""
tb string = ""
sql string = ""
sqlType string = ""
rowCnt uint32 = 0
tbMapPos uint32 = 0
skipTillNext bool = false
rows [][]interface{}
)
var tx Transaction
currentGtid := ""
callFn := func(tx Transaction) {
if fn == nil {
return
}
if !filter.StartDate.IsZero() && filter.StartDate.After(tx.Time) {
return
}
if !filter.EndDate.IsZero() && filter.EndDate.Before(tx.Time) {
return
}
if filter.StartPos != 0 && filter.StartPos > tx.StartPos {
return
}
if filter.EndPos != 0 && filter.EndPos < tx.EndPos {
return
}
if filter.BigThan != 0 && filter.BigThan > tx.Size {
return
}
if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
return
}
fn(tx)
}
for {
headBuf := make([]byte, replication.EventHeaderSize)
if _, err = io.ReadFull(r, headBuf); err == io.EOF {
idx := 0
for k, v := range tx.Txs {
if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
v.Sql = tx.sqlOrigin[idx]
idx++
}
tx.RowsCount += v.RowCount
tx.Txs[k] = v
}
tx.Size = tx.EndPos - tx.StartPos
callFn(tx)
return nil
} else if err != nil {
return err
}
var h *replication.EventHeader
h, err = parse.ParseHeader(headBuf)
if err != nil {
return err
}
//fmt.Printf("parsing %s %d %s\n", *binlog, h.LogPos, GetDatetimeStr(int64(h.Timestamp), int64(0), DATETIME_FORMAT))
if h.EventSize <= uint32(replication.EventHeaderSize) {
err = fmt.Errorf("invalid event header, event size is %d, too small", h.EventSize)
return err
}
var buf bytes.Buffer
if n, err = io.CopyN(&buf, r, int64(h.EventSize)-int64(replication.EventHeaderSize)); err != nil {
err = fmt.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, replication.EventHeaderSize, n)
return err
}
if skipTillNext && h.EventType != replication.GTID_EVENT {
continue
}
//h.Dump(os.Stdout)
data := buf.Bytes()
var rawData []byte
rawData = append(rawData, headBuf...)
rawData = append(rawData, data...)
eventLen := int(h.EventSize) - replication.EventHeaderSize
if len(data) != eventLen {
err = fmt.Errorf("invalid data size %d in event %s, less event length %d", len(data), h.EventType, eventLen)
return err
}
var e replication.Event
e, err = parse.ParseEvent(h, data, rawData)
if err != nil {
return err
}
if h.EventType == replication.TABLE_MAP_EVENT {
tbMapPos = h.LogPos - h.EventSize // avoid mysqlbing mask the row event as unknown table row event
}
//e.Dump(os.Stdout)
//binEvent := &replication.BinlogEvent{RawData: rawData, Header: h, Event: e}
binEvent := &replication.BinlogEvent{Header: h, Event: e} // we donnot need raw data
db, tb, sqlType, sql, rowCnt, rows = GetDbTbAndQueryAndRowCntFromBinevent(binEvent)
startPos := 0
if sqlType == "query" || sqlType == "gtid" {
startPos = int(h.LogPos - h.EventSize)
//fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos,
// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
} else {
startPos = int(tbMapPos)
//fmt.Println(h.Timestamp, t bMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType)
// cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos,
// Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType}
}
switch sqlType {
case "gtid":
if skipTillNext {
skipTillNext = false
}
if currentGtid != "" {
idx := 0
for k, v := range tx.Txs {
if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
v.Sql = tx.sqlOrigin[idx]
idx++
}
tx.RowsCount += v.RowCount
tx.Txs[k] = v
}
tx.Size = tx.EndPos - tx.StartPos
callFn(tx)
}
currentGtid = sql
if inGtid != nil {
if c, _ := inGtid.Contain(sql); !c {
currentGtid = ""
skipTillNext = true
continue
}
}
if exGtid != nil {
if c, _ := exGtid.Contain(sql); c {
currentGtid = ""
skipTillNext = true
continue
}
}
tx = Transaction{
GTID: sql,
StartPos: startPos,
Timestamp: int64(h.Timestamp),
Time: time.Unix(int64(h.Timestamp), 0),
}
case "":
tx.EndPos = int(h.LogPos)
continue
case "rowsquery":
tx.EndPos = int(h.LogPos)
tx.sqlOrigin = append(tx.sqlOrigin, sql)
default:
tx.EndPos = int(h.LogPos)
tx.Txs = append(tx.Txs, TxDetail{
StartPos: startPos,
EndPos: int(h.LogPos),
Db: db,
Table: tb,
Sql: sql,
SqlType: sqlType,
Rows: rows,
RowCount: int(rowCnt),
Timestamp: int64(h.Timestamp),
Time: time.Unix(int64(h.Timestamp), 0),
})
}
}
}
func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction)) error {
defer func() {
recover()
}()
if !staros.Exists(path) {
return os.ErrNotExist
}
f, err := os.Open(path)
if f != nil {
defer f.Close()
}
if err != nil {
return err
}
parse := replication.NewBinlogParser()
parse.SetParseTime(false)
parse.SetUseDecimal(false)
seekZore := func() error {
fileTypeBytes := int64(4)
b := make([]byte, fileTypeBytes)
// 读取binlog头
if _, err = f.Read(b); err != nil {
return err
} else if !bytes.Equal(b, replication.BinLogFileHeader) {
//不是binlog格式
return err
}
// must not seek to other position, otherwise the program may panic because formatevent, table map event is skipped
if _, err = f.Seek(fileTypeBytes, os.SEEK_SET); err != nil {
return err
}
return nil
}
if pos != 0 {
if err = seekZore(); err != nil {
return err
}
for {
headBuf := make([]byte, replication.EventHeaderSize)
if _, err = io.ReadFull(f, headBuf); err != nil {
return err
}
var h *replication.EventHeader
h, err = parse.ParseHeader(headBuf)
if err != nil {
return err
}
if h.EventSize <= uint32(replication.EventHeaderSize) {
err = fmt.Errorf("invalid event header, event size is %d, too small", h.EventSize)
return err
}
var buf bytes.Buffer
if n, err := io.CopyN(&buf, f, int64(h.EventSize)-int64(replication.EventHeaderSize)); err != nil {
err = fmt.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, replication.EventHeaderSize, n)
return err
}
data := buf.Bytes()
var rawData []byte
rawData = append(rawData, headBuf...)
rawData = append(rawData, data...)
eventLen := int(h.EventSize) - replication.EventHeaderSize
if len(data) != eventLen {
err = fmt.Errorf("invalid data size %d in event %s, less event length %d", len(data), h.EventType, eventLen)
return err
}
_, err = parse.ParseEvent(h, data, rawData)
if err != nil {
return err
}
if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT {
break
}
}
if _, err = f.Seek(pos, 0); err != nil {
return err
}
}
if pos == 0 {
if err = seekZore(); err != nil {
return err
}
}
return parseBinlogWithFilter(f, parse, filter, fx)
}

@ -3,10 +3,26 @@ package binlog
import (
"fmt"
"testing"
"time"
)
func TestParse(t *testing.T) {
ParseBinlogFile("./test/test-mysql-bin", func(transaction Transaction) {
ParseBinlogFile("./test/mysql-bin.000023", func(transaction Transaction) {
fmt.Println(transaction)
})
}
func TestParseFilter(t *testing.T) {
ParseBinlogWithFilter("./test/mysql-bin.000023", 0, BinlogFilter{
IncludeGtid: "",
ExcludeGtid: "",
StartPos: 0,
EndPos: 0,
StartDate: time.Time{},
EndDate: time.Time{},
BigThan: 0,
SmallThan: 0,
}, func(transaction Transaction) {
fmt.Println(transaction)
})
}

Loading…
Cancel
Save