You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

784 lines
21 KiB
Go

package binlog
import (
"b612.me/mysql/gtid"
"b612.me/staros"
"bytes"
"fmt"
"github.com/starainrt/go-mysql/replication"
"io"
"os"
"strings"
"time"
)
// TxDetail describes a single statement-level entry (a query event or a
// row-change event) recorded inside a Transaction.
type TxDetail struct {
// StartPos is the byte offset at which the entry starts. The parsers in this
// file use the event's own start offset for query events and the offset of
// the most recent table-map event for row events.
StartPos int `json:"startPos"`
// EndPos is the log position reported by the event header (end of the event).
EndPos int `json:"endPos"`
// RowCount is the number of rows the event touched (0 for query events).
RowCount int `json:"rowCount"`
// Timestamp is the event header timestamp in Unix seconds.
Timestamp int64 `json:"timestamp"`
// Time is Timestamp converted with time.Unix.
Time time.Time `json:"time"`
// Sql holds the query text for query events. For other entries the parsers
// overwrite it with the matching rows-query statement when one was logged.
Sql string `json:"sql"`
// Db is the schema name the event refers to (may be empty).
Db string `json:"db"`
// Table is the table name for row events (empty for query events).
Table string `json:"table"`
// SqlType is the event classification produced by ParseBinlogEvent:
// "query", "insert", "update" or "delete".
SqlType string `json:"sqlType"`
// CompressionType is set for events unpacked from a transaction payload
// event ("ZSTD" or "UNKNOWN"); it is empty otherwise.
CompressionType string `json:"compressionType"`
// Rows holds the decoded row images of a row event. It has no json tag, so
// it is marshalled under the key "Rows".
Rows [][]interface{}
}
// Transaction status values stored in Transaction.Status. The parsers derive
// the status from the text of the last query event seen in the transaction:
// "begin", "commit" and "rollback" map to the matching constant, any other
// query text maps to STATUS_PREPARE.
const (
	// STATUS_PREPARE is the zero value and the default status.
	STATUS_PREPARE uint8 = 0
	// STATUS_BEGIN marks a transaction whose last query event was BEGIN.
	STATUS_BEGIN uint8 = 1
	// STATUS_COMMIT marks a transaction whose last query event was COMMIT.
	STATUS_COMMIT uint8 = 2
	// STATUS_ROLLBACK marks a transaction whose last query event was ROLLBACK.
	STATUS_ROLLBACK uint8 = 3
)
// Transaction is one group of binlog events delimited by GTID events, as
// assembled by the parsers in this file.
type Transaction struct {
	// GTID is the identifier taken from the GTID event that opened the
	// transaction.
	GTID string `json:"gtid"`
	// Timestamp is the header timestamp (Unix seconds) of the opening GTID event.
	Timestamp int64 `json:"timestamp"`
	// Time is Timestamp converted with time.Unix.
	Time time.Time `json:"time"`
	// StartPos is the byte offset of the opening GTID event.
	StartPos int `json:"startPos"`
	// EndPos is the log position of the last event attributed to the transaction.
	EndPos int `json:"endPos"`
	// Size is EndPos - StartPos, computed when the transaction is finalized.
	Size int `json:"size"`
	// RowsCount is the sum of RowCount over Txs, computed at finalization.
	RowsCount int `json:"rowsCount"`
	// Status is one of the STATUS_* constants.
	Status uint8 `json:"status"`
	// TxStartTime is the header timestamp of the first BEGIN query event.
	TxStartTime int64 `json:"txStartTime"`
	// TxEndTime is the header timestamp of the COMMIT/ROLLBACK query event.
	TxEndTime int64 `json:"txEndTime"`
	// sqlOrigin collects the statements carried by rows-query events. It is
	// unexported, so encoding/json never marshals it; a struct tag here would
	// have no effect (and is reported by go vet), hence none is declared.
	// Use GetSqlOrigin to read it.
	sqlOrigin []string
	// Txs lists the query and row events that make up the transaction.
	Txs []TxDetail `json:"txs"`
	// validSchemaCount counts events that carry both a schema and a table name.
	validSchemaCount int
}
// GetSqlOrigin returns the original SQL statements collected from the
// rows-query events of the transaction, in the order they were read. The
// returned slice is the internal one, not a copy.
func (t Transaction) GetSqlOrigin() []string {
return t.sqlOrigin
}
// ParseBinlogFile parses the binlog file at path and passes each transaction
// found in it to fx. It is a thin exported wrapper around parseOneBinlog and
// returns whatever error that function returns.
func ParseBinlogFile(path string, fx func(transaction Transaction) bool) error {
return parseOneBinlog(path, fx)
}
// parseOneBinlog opens the file at path, checks that it starts with the
// binlog magic header and then streams its transactions to fx through
// parseBinlogDetail.
//
// It returns os.ErrNotExist when staros.Exists reports the path as missing,
// the error from opening or reading the file, or a descriptive error when the
// first four bytes are not replication.BinLogFileHeader. (Previously a header
// mismatch returned a nil error, so non-binlog files were silently accepted.)
func parseOneBinlog(path string, fx func(Transaction) bool) error {
	if !staros.Exists(path) {
		return os.ErrNotExist
	}
	f, err := os.Open(path)
	if err != nil {
		return err
	}
	defer f.Close()

	const fileTypeBytes = int64(4)
	magic := make([]byte, fileTypeBytes)
	// Read the binlog magic header. io.ReadFull fails on a short read, so a
	// file smaller than the header is reported instead of being compared
	// against a partially filled buffer.
	if _, err = io.ReadFull(f, magic); err != nil {
		return err
	}
	if !bytes.Equal(magic, replication.BinLogFileHeader) {
		// Not in binlog format.
		return fmt.Errorf("%s is not a binlog file: unexpected header % x", path, magic)
	}
	// Parsing must start right after the header: seeking anywhere else may
	// make the parser panic because the format description event and table
	// map events would be skipped.
	if _, err = f.Seek(fileTypeBytes, io.SeekStart); err != nil {
		return err
	}
	return parseBinlogDetail(f, fx)
}
// parseBinlogDetail decodes binlog events from r (positioned right after the
// file header) and groups them into transactions delimited by GTID events.
//
// Every time a new GTID event is met, the transaction collected so far is
// finalized and passed to f; if f returns false parsing stops and nil is
// returned. At end of input the pending transaction is finalized and passed
// to f as well (its return value is ignored). f may be nil.
//
// Finalizing a transaction assigns the collected rows-query statements to the
// non-query entries in order, sums the row counts into RowsCount and computes
// Size as EndPos - StartPos.
//
// Errors from reading r or from the replication parser are returned as is.
func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
	parse := replication.NewBinlogParser()
	parse.SetParseTime(false)
	parse.SetUseDecimal(false)

	var (
		err         error
		n           int64
		tbMapPos    uint32 // start offset of the most recent table map event
		tx          Transaction
		currentGtid string
	)

	// finalize completes the bookkeeping of the pending transaction tx.
	finalize := func() {
		idx := 0
		for k, v := range tx.Txs {
			if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
				v.Sql = tx.sqlOrigin[idx]
				idx++
			}
			tx.RowsCount += v.RowCount
			tx.Txs[k] = v
		}
		tx.Size = tx.EndPos - tx.StartPos
	}

	for {
		headBuf := make([]byte, replication.EventHeaderSize)
		if _, err = io.ReadFull(r, headBuf); err != nil {
			if err != io.EOF {
				return err
			}
			// Clean end of input: flush whatever has been collected.
			finalize()
			if f != nil {
				f(tx)
			}
			return nil
		}

		var h *replication.EventHeader
		if h, err = parse.ParseHeader(headBuf); err != nil {
			return err
		}
		if h.EventSize <= uint32(replication.EventHeaderSize) {
			return fmt.Errorf("invalid event header, event size is %d, too small", h.EventSize)
		}

		// io.CopyN only returns a nil error when exactly the requested number
		// of bytes was copied, so no separate length check is needed.
		var buf bytes.Buffer
		if n, err = io.CopyN(&buf, r, int64(h.EventSize)-int64(replication.EventHeaderSize)); err != nil {
			return fmt.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, replication.EventHeaderSize, n)
		}
		data := buf.Bytes()
		rawData := make([]byte, 0, len(headBuf)+len(data))
		rawData = append(rawData, headBuf...)
		rawData = append(rawData, data...)

		var e replication.Event
		if e, err = parse.ParseEvent(h, data, rawData); err != nil {
			return err
		}
		if h.EventType == replication.TABLE_MAP_EVENT {
			// Row events are reported as starting at their table map event, so
			// that tools replaying the range do not see rows for an unknown table.
			tbMapPos = h.LogPos - h.EventSize
		}

		// The raw bytes are not needed once the event is decoded.
		binEvent := &replication.BinlogEvent{Header: h, Event: e}
		for _, ev := range ParseBinlogEvent(binEvent) {
			startPos := int(tbMapPos)
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			}

			switch ev.Type {
			case "gtid":
				// A new GTID closes the previous transaction, if there was one.
				if currentGtid != "" {
					finalize()
					if f != nil && !f(tx) {
						return nil
					}
				}
				currentGtid = ev.Data
				tx = Transaction{
					GTID:      ev.Data,
					StartPos:  startPos,
					Timestamp: int64(h.Timestamp),
					Time:      time.Unix(int64(h.Timestamp), 0),
				}
			case "":
				// Unclassified event: it only extends the transaction range.
				tx.EndPos = int(h.LogPos)
			case "rowsquery":
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				tx.EndPos = int(h.LogPos)
				if ev.Type == "query" {
					status := STATUS_PREPARE
					// TxStartTime/TxEndTime are filled in the same way as in
					// parseBinlogWithFilter so both entry points agree.
					switch strings.ToLower(ev.Data) {
					case "begin":
						if tx.TxStartTime == 0 {
							tx.TxStartTime = int64(h.Timestamp)
						}
						status = STATUS_BEGIN
					case "commit":
						status = STATUS_COMMIT
						tx.TxEndTime = int64(h.Timestamp)
					case "rollback":
						status = STATUS_ROLLBACK
						tx.TxEndTime = int64(h.Timestamp)
					}
					tx.Status = status
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:        startPos,
					EndPos:          int(h.LogPos),
					Db:              ev.DB,
					Table:           ev.TB,
					Sql:             ev.Data,
					SqlType:         ev.Type,
					Rows:            ev.Rows,
					RowCount:        int(ev.RowCnt),
					Timestamp:       int64(h.Timestamp),
					Time:            time.Unix(int64(h.Timestamp), 0),
					CompressionType: ev.CompressionType,
				})
			}
		}
	}
}
// BinlogEvent is the simplified view of a replication event produced by
// ParseBinlogEvent.
type BinlogEvent struct {
// Type classifies the event: "gtid", "query", "rowsquery", "insert",
// "update", "delete", or "" for event types ParseBinlogEvent does not map.
Type string
// DB is the schema name (set for row events and query events).
DB string
// TB is the table name (set for row events only).
TB string
// Data is the GTID string for "gtid" events and the SQL text for "query"
// and "rowsquery" events.
Data string
// RowCnt is the number of affected rows of a row event; update events count
// half of the decoded row images.
RowCnt uint32
// Rows holds the decoded row images of a row event.
Rows [][]interface{}
// CompressionType is filled for events unpacked from a transaction payload
// event ("ZSTD" or "UNKNOWN"); empty otherwise.
CompressionType string
}
// ParseBinlogEvent converts a decoded replication event into the simplified
// BinlogEvent form used by the parsers in this package.
//
// A transaction payload event is expanded recursively into one BinlogEvent
// per contained event (possibly none); every other event yields exactly one
// element. Event types without a dedicated mapping produce a BinlogEvent
// whose Type is empty.
func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent {
	var single BinlogEvent

	// fromRows fills single from a row-change event. imagesPerRow is the
	// number of decoded row images that represent one affected row.
	fromRows := func(kind string, imagesPerRow uint32) {
		re := ev.Event.(*replication.RowsEvent)
		single.DB = string(re.Table.Schema)
		single.TB = string(re.Table.Table)
		single.Type = kind
		single.RowCnt = uint32(len(re.Rows)) / imagesPerRow
		single.Rows = re.Rows
	}

	switch ev.Header.EventType {
	case replication.TRANSACTION_PAYLOAD_EVENT:
		payload := ev.Event.(*replication.TransactionPayloadEvent)
		var out []BinlogEvent
		for _, sub := range payload.Events {
			out = append(out, ParseBinlogEvent(sub)...)
		}
		// Compression type 0 is labelled ZSTD, 255 leaves the label untouched
		// and anything else is labelled UNKNOWN.
		if payload.CompressionType != 255 {
			label := "UNKNOWN"
			if payload.CompressionType == 0 {
				label = "ZSTD"
			}
			for i := range out {
				out[i].CompressionType = label
			}
		}
		return out

	case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2:
		fromRows("insert", 1)

	case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2:
		// Update events count two row images per affected row.
		fromRows("update", 2)

	case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2:
		fromRows("delete", 1)

	case replication.ROWS_QUERY_EVENT:
		rq := ev.Event.(*replication.RowsQueryEvent)
		single.Data = string(rq.Query)
		single.Type = "rowsquery"

	case replication.QUERY_EVENT:
		q := ev.Event.(*replication.QueryEvent)
		single.DB = string(q.Schema)
		single.Data = string(q.Query)
		single.Type = "query"

	case replication.MARIADB_GTID_EVENT:
		// MariaDB starts a transaction event group with a GTID event instead
		// of the old BEGIN query event (it also marks stand-alone DDL), so it
		// is reported as a "begin" query.
		// https://mariadb.com/kb/en/library/gtid_event/
		single.Data = "begin"
		single.Type = "query"

	case replication.XID_EVENT:
		// An XID event represents a commit; rolled back transactions are not
		// written to the binlog.
		single.Data = "commit"
		single.Type = "query"

	case replication.ANONYMOUS_GTID_EVENT:
		single.Data = "anonymous-gtid-event:1"
		single.Type = "gtid"

	case replication.GTID_EVENT:
		ge := ev.Event.(*replication.GTIDEvent)
		// On a parse failure Data stays empty but the event is still a "gtid".
		if parsed, perr := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO)); perr == nil {
			single.Data = parsed.String()
		}
		single.Type = "gtid"
	}

	return []BinlogEvent{single}
}
// BinlogFilter selects which transactions parseBinlogWithFilter reports.
// Zero values disable the corresponding criterion.
type BinlogFilter struct {
// IncludeGtid is a GTID set (parsed with gtid.Parse); transactions whose
// GTID is not contained in it are skipped, and parsing stops once every
// GTID of the set has been reported.
IncludeGtid string
// ExcludeGtid is a GTID set; transactions whose GTID is contained in it are
// skipped.
ExcludeGtid string
// IncludeTables lists "db.table" patterns to keep. Each entry must contain
// exactly one dot; "*" may replace the db and/or the table part.
IncludeTables []string
// ExcludeTables lists "db.table" patterns to drop, same syntax as
// IncludeTables.
ExcludeTables []string
// StartPos drops transactions that start before this byte offset.
StartPos int
// EndPos drops transactions that end after this byte offset.
EndPos int
// StartDate drops transactions whose time is before it.
StartDate time.Time
// EndDate drops transactions whose time is after it.
EndDate time.Time
// BigThan drops transactions whose Size is smaller than this value.
BigThan int
// SmallThan drops transactions whose Size is larger than this value.
SmallThan int
// OnlyShowGtid makes the parser decode GTID events only, so reported
// transactions carry no statement details.
OnlyShowGtid bool
// OnlyShowDML (ignored when OnlyShowGtid is set) drops transactions that
// contain no event with both a schema and a table name.
OnlyShowDML bool
// PickTxAllIfMatch applies a table match to the whole transaction: an
// include match reports the complete transaction, an exclude match drops it.
PickTxAllIfMatch bool
// ExcludeBlank makes events without schema and table count as an exclude
// match (only consulted when ExcludeTables is non-empty).
ExcludeBlank bool
// IncludeBlank makes events without schema and table count as an include
// match (only consulted when IncludeTables is non-empty).
IncludeBlank bool
}
// parseBinlogWithFilter decodes binlog events from r with the given parser,
// groups them into transactions delimited by GTID events and passes every
// transaction accepted by filter to fn.
//
// A transaction is finalized (rows-query statements assigned to the non-query
// entries, RowsCount and Size computed) when the next GTID event is met or at
// end of input. If fn returns false, or once every GTID of filter.IncludeGtid
// has been seen, parsing stops and nil is returned. fn may be nil.
//
// It returns an error for malformed IncludeTables/ExcludeTables entries,
// unparsable GTID sets, read failures and parser failures.
//
// Position filtering: events starting after filter.EndPos are skipped, and a
// transaction whose GTID event starts before filter.StartPos is skipped as a
// whole. (The lower-bound check used to compare against filter.EndPos, which
// discarded every event when both bounds were set. It is applied to GTID
// events only because other events may inherit a stale table-map offset as
// their start position.)
func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction) bool) error {
	var (
		subGtid, inGtid, exGtid *gtid.Gtid
		err                     error
	)

	includeMap := make(map[string]bool, len(filter.IncludeTables))
	for _, v := range filter.IncludeTables {
		if len(strings.Split(v, ".")) != 2 {
			return fmt.Errorf("IncludeTable Name Is Invalid:%s", v)
		}
		includeMap[v] = true
	}
	excludeMap := make(map[string]bool, len(filter.ExcludeTables))
	for _, v := range filter.ExcludeTables {
		if len(strings.Split(v, ".")) != 2 {
			return fmt.Errorf("ExcludeTable Name Is Invalid:%s", v)
		}
		excludeMap[v] = true
	}

	// matchTables reports whether db.tb is selected by set, honouring the
	// "*" wildcard on either side. Events without schema and table match
	// only when matchBlank is set. An empty set never matches.
	matchTables := func(set map[string]bool, matchBlank bool, db, tb string) bool {
		if len(set) == 0 {
			return false
		}
		if db == "" && tb == "" {
			return matchBlank
		}
		return set["*.*"] || set[db+"."+tb] || set[db+".*"] || set["*."+tb]
	}

	if filter.IncludeGtid != "" {
		if inGtid, err = gtid.Parse(filter.IncludeGtid); err != nil {
			return err
		}
		// subGtid tracks which included GTIDs are still to be seen.
		subGtid = inGtid.Clone()
	}
	if filter.ExcludeGtid != "" {
		if exGtid, err = gtid.Parse(filter.ExcludeGtid); err != nil {
			return err
		}
	}

	var (
		n            int64
		tbMapPos     uint32 // start offset of the most recent table map event
		skipTillNext bool   // skip events until the next GTID event
		tx           Transaction
		currentGtid  string
	)

	// callFn applies the per-transaction filters to cand and, if it passes,
	// hands it to fn. It returns false only when fn asked to stop.
	callFn := func(cand Transaction) bool {
		if fn == nil {
			return true
		}
		if !filter.StartDate.IsZero() && filter.StartDate.After(cand.Time) {
			return true
		}
		if !filter.EndDate.IsZero() && filter.EndDate.Before(cand.Time) {
			return true
		}
		if filter.StartPos != 0 && filter.StartPos > cand.StartPos {
			return true
		}
		if filter.EndPos != 0 && filter.EndPos < cand.EndPos {
			return true
		}
		if filter.BigThan != 0 && filter.BigThan > cand.Size {
			return true
		}
		if filter.SmallThan != 0 && filter.SmallThan < cand.Size {
			return true
		}
		if !filter.OnlyShowGtid && filter.OnlyShowDML && cand.validSchemaCount == 0 {
			return true
		}

		// NOTE(review): when both IncludeTables and ExcludeTables are set, an
		// entry that matches the include list and not the exclude list is
		// appended twice below. Kept as is; confirm the intended semantics.
		var kept []TxDetail
		matched := false
		for _, d := range cand.Txs {
			if len(includeMap) != 0 && matchTables(includeMap, filter.IncludeBlank, d.Db, d.Table) {
				matched = true
				if filter.PickTxAllIfMatch {
					return fn(cand)
				}
				kept = append(kept, d)
			}
			if len(excludeMap) != 0 {
				if matchTables(excludeMap, filter.ExcludeBlank, d.Db, d.Table) {
					matched = true
					if filter.PickTxAllIfMatch {
						return true
					}
				} else {
					kept = append(kept, d)
				}
			}
		}
		if matched {
			cand.Txs = kept
		}
		if !matched && len(filter.IncludeTables) > 0 {
			return true
		}
		if len(cand.Txs) == 0 && matched {
			return true
		}
		return fn(cand)
	}

	// finalize completes the bookkeeping of the pending transaction tx.
	// gtidOnlyEnd is the EndPos to report when only GTID events are decoded.
	finalize := func(gtidOnlyEnd int) {
		idx := 0
		for k, v := range tx.Txs {
			if v.SqlType != "query" && len(tx.sqlOrigin) > idx {
				v.Sql = tx.sqlOrigin[idx]
				idx++
			}
			tx.RowsCount += v.RowCount
			tx.Txs[k] = v
		}
		if filter.OnlyShowGtid {
			tx.EndPos = gtidOnlyEnd
		}
		tx.Size = tx.EndPos - tx.StartPos
	}

	for {
		headBuf := make([]byte, replication.EventHeaderSize)
		if _, err = io.ReadFull(r, headBuf); err != nil {
			if err != io.EOF {
				return err
			}
			// Clean end of input: flush the pending transaction, if any.
			if tx.Time.IsZero() {
				return nil
			}
			finalize(tx.StartPos)
			callFn(tx)
			return nil
		}

		var h *replication.EventHeader
		if h, err = parse.ParseHeader(headBuf); err != nil {
			return err
		}
		if h.EventSize <= uint32(replication.EventHeaderSize) {
			return fmt.Errorf("invalid event header, event size is %d, too small", h.EventSize)
		}

		// The body is always consumed so the reader stays aligned on event
		// boundaries. io.CopyN only returns a nil error when exactly the
		// requested number of bytes was copied.
		var buf bytes.Buffer
		if n, err = io.CopyN(&buf, r, int64(h.EventSize)-int64(replication.EventHeaderSize)); err != nil {
			return fmt.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, replication.EventHeaderSize, n)
		}

		// While skipping, only events that open a transaction are decoded.
		// Anonymous GTID events count too; otherwise a binlog written without
		// GTIDs could never leave the skipping state.
		opensTx := h.EventType == replication.GTID_EVENT || h.EventType == replication.ANONYMOUS_GTID_EVENT
		if skipTillNext && !opensTx {
			continue
		}
		if filter.OnlyShowGtid && h.EventType != replication.GTID_EVENT {
			continue
		}

		data := buf.Bytes()
		rawData := make([]byte, 0, len(headBuf)+len(data))
		rawData = append(rawData, headBuf...)
		rawData = append(rawData, data...)

		var e replication.Event
		if e, err = parse.ParseEvent(h, data, rawData); err != nil {
			return err
		}
		if h.EventType == replication.TABLE_MAP_EVENT {
			// Row events are reported as starting at their table map event, so
			// that tools replaying the range do not see rows for an unknown table.
			tbMapPos = h.LogPos - h.EventSize
		}

		// The raw bytes are not needed once the event is decoded.
		binEvent := &replication.BinlogEvent{Header: h, Event: e}
		for _, ev := range ParseBinlogEvent(binEvent) {
			startPos := int(tbMapPos)
			if ev.Type == "query" || ev.Type == "gtid" {
				startPos = int(h.LogPos - h.EventSize)
			}

			if filter.EndPos != 0 && startPos > filter.EndPos {
				skipTillNext = true
				continue
			}
			if ev.Type == "gtid" && filter.StartPos != 0 && startPos < filter.StartPos {
				skipTillNext = true
				continue
			}

			switch ev.Type {
			case "gtid":
				skipTillNext = false
				// A new GTID closes the previous transaction, if there was one.
				if currentGtid != "" {
					finalize(startPos - 1)
					if !callFn(tx) {
						return nil
					}
					if subGtid != nil {
						subGtid.Sub(tx.GTID)
						if subGtid.EventCount() == 0 {
							// Every requested GTID has been handled.
							return nil
						}
					}
					tx = Transaction{}
				}
				currentGtid = ev.Data
				if inGtid != nil {
					if c, _ := inGtid.Contain(ev.Data); !c {
						tx = Transaction{}
						currentGtid = ""
						skipTillNext = true
						continue
					}
				}
				if exGtid != nil {
					if c, _ := exGtid.Contain(ev.Data); c {
						currentGtid = ""
						skipTillNext = true
						continue
					}
				}
				tx = Transaction{
					GTID:      ev.Data,
					StartPos:  startPos,
					Timestamp: int64(h.Timestamp),
					Time:      time.Unix(int64(h.Timestamp), 0),
				}
			case "":
				// Unclassified event: it only extends the transaction range.
				tx.EndPos = int(h.LogPos)
			case "rowsquery":
				tx.EndPos = int(h.LogPos)
				tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
			default:
				tx.EndPos = int(h.LogPos)
				if ev.Type == "query" {
					status := STATUS_PREPARE
					switch strings.ToLower(ev.Data) {
					case "begin":
						if tx.TxStartTime == 0 {
							tx.TxStartTime = int64(h.Timestamp)
						}
						status = STATUS_BEGIN
					case "commit":
						status = STATUS_COMMIT
						tx.TxEndTime = int64(h.Timestamp)
					case "rollback":
						status = STATUS_ROLLBACK
						tx.TxEndTime = int64(h.Timestamp)
					}
					tx.Status = status
				}
				if ev.DB != "" && ev.TB != "" {
					tx.validSchemaCount++
				}
				tx.Txs = append(tx.Txs, TxDetail{
					StartPos:        startPos,
					EndPos:          int(h.LogPos),
					Db:              ev.DB,
					Table:           ev.TB,
					Sql:             ev.Data,
					SqlType:         ev.Type,
					Rows:            ev.Rows,
					RowCount:        int(ev.RowCnt),
					Timestamp:       int64(h.Timestamp),
					Time:            time.Unix(int64(h.Timestamp), 0),
					CompressionType: ev.CompressionType,
				})
			}
		}
	}
}
// ParseBinlogWithFilter parses the binlog file at path and passes every
// transaction accepted by filter to fx.
//
// When pos is zero parsing starts right after the file header. When pos is
// non-zero the parser is first primed by decoding events from the start of
// the file until a format description event (or a GTID event) has been
// read, and parsing then resumes at byte offset pos.
//
// It returns os.ErrNotExist when staros.Exists reports the path as missing,
// an error when the file does not start with the binlog magic header
// (previously this case returned nil), and any read or parse error. A panic
// raised while parsing is recovered and returned as an error instead of
// being silently discarded.
func ParseBinlogWithFilter(path string, pos int64, filter BinlogFilter, fx func(Transaction) bool) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic while parsing binlog %s: %v", path, r)
		}
	}()

	if !staros.Exists(path) {
		return os.ErrNotExist
	}
	f, openErr := os.Open(path)
	if openErr != nil {
		return openErr
	}
	defer f.Close()

	parse := replication.NewBinlogParser()
	parse.SetParseTime(false)
	parse.SetUseDecimal(false)

	// Verify the binlog magic header and position the file right after it.
	// Parsing must start there: seeking anywhere else first may make the
	// parser panic because the format description event and table map events
	// would be skipped.
	const fileTypeBytes = int64(4)
	magic := make([]byte, fileTypeBytes)
	if _, err = io.ReadFull(f, magic); err != nil {
		return err
	}
	if !bytes.Equal(magic, replication.BinLogFileHeader) {
		// Not in binlog format.
		return fmt.Errorf("%s is not a binlog file: unexpected header % x", path, magic)
	}
	if _, err = f.Seek(fileTypeBytes, io.SeekStart); err != nil {
		return err
	}

	if pos != 0 {
		// Prime the parser before jumping to pos.
		for {
			headBuf := make([]byte, replication.EventHeaderSize)
			if _, err = io.ReadFull(f, headBuf); err != nil {
				return err
			}
			var h *replication.EventHeader
			if h, err = parse.ParseHeader(headBuf); err != nil {
				return err
			}
			if h.EventSize <= uint32(replication.EventHeaderSize) {
				return fmt.Errorf("invalid event header, event size is %d, too small", h.EventSize)
			}

			// io.CopyN only returns a nil error when exactly the requested
			// number of bytes was copied, so no extra length check is needed.
			var (
				buf bytes.Buffer
				n   int64
			)
			if n, err = io.CopyN(&buf, f, int64(h.EventSize)-int64(replication.EventHeaderSize)); err != nil {
				return fmt.Errorf("get event body err %v, need %d - %d, but got %d", err, h.EventSize, replication.EventHeaderSize, n)
			}
			data := buf.Bytes()
			rawData := make([]byte, 0, len(headBuf)+len(data))
			rawData = append(rawData, headBuf...)
			rawData = append(rawData, data...)

			if _, err = parse.ParseEvent(h, data, rawData); err != nil {
				return err
			}
			if h.EventType == replication.FORMAT_DESCRIPTION_EVENT || h.EventType == replication.GTID_EVENT {
				break
			}
		}
		if _, err = f.Seek(pos, io.SeekStart); err != nil {
			return err
		}
	}

	return parseBinlogWithFilter(f, parse, filter, fx)
}