Compare commits

...

3 Commits

@ -13,29 +13,38 @@ import (
)
type TxDetail struct {
StartPos int
EndPos int
RowCount int
Timestamp int64
Time time.Time
Sql string
Db string
Table string
SqlType string
CompressionType string
StartPos int `json:"startPos"`
EndPos int `json:"endPos"`
RowCount int `json:"rowCount"`
Timestamp int64 `json:"timestamp"`
Time time.Time `json:"time"`
Sql string `json:"sql"`
Db string `json:"db"`
Table string `json:"table"`
SqlType string `json:"sqlType"`
CompressionType string `json:"compressionType"`
Rows [][]interface{}
}
const (
STATUS_PREPARE uint8 = iota
STATUS_BEGIN
STATUS_COMMIT
STATUS_ROLLBACK
)
type Transaction struct {
GTID string
Timestamp int64
Time time.Time
StartPos int
EndPos int
Size int
RowsCount int
sqlOrigin []string
Txs []TxDetail
GTID string `json:"gtid"`
Timestamp int64 `json:"timestamp"`
Time time.Time `json:"time"`
StartPos int `json:"startPos"`
EndPos int `json:"endPos"`
Size int `json:"size"`
RowsCount int `json:"rowsCount"`
Status uint8 `json:"status"`
sqlOrigin []string `json:"sqlOrigin"`
Txs []TxDetail `json:"txs"`
matchFilterSchema bool
}
func (t Transaction) GetSqlOrigin() []string {
@ -200,6 +209,18 @@ func parseBinlogDetail(r io.Reader, f func(Transaction) bool) error {
tx.EndPos = int(h.LogPos)
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
default:
status := STATUS_PREPARE
if ev.Type == "query" {
switch strings.ToLower(ev.Data) {
case "begin":
status = STATUS_BEGIN
case "commit":
status = STATUS_COMMIT
case "rollback":
status = STATUS_ROLLBACK
}
tx.Status = status
}
tx.EndPos = int(h.LogPos)
tx.Txs = append(tx.Txs, TxDetail{
StartPos: startPos,
@ -347,8 +368,39 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
}
excludeMap[v] = true
}
} else {
excludeMap["*.*"] = true
}
matchTbs := func(db, tb string) bool {
if len(filter.ExcludeTables) == 0 && len(filter.ExcludeTables) == 0 {
return true
}
if _, ok := includeMap["*.*"]; ok {
return true
}
if _, ok := excludeMap["*.*"]; ok {
return false
}
if _, ok := includeMap[db+"."+tb]; ok {
return true
}
if _, ok := excludeMap[db+"."+tb]; ok {
return false
}
if _, ok := includeMap[db+".*"]; ok {
return true
}
if _, ok := excludeMap[db+".*"]; ok {
return false
}
if _, ok := includeMap["*."+tb]; ok {
return true
}
if _, ok := excludeMap["*."+tb]; ok {
return false
}
if len(includeMap) != 0 {
return false
}
return true
}
if filter.IncludeGtid != "" {
inGtid, err = gtid.Parse(filter.IncludeGtid)
@ -394,6 +446,9 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
return true
}
if len(tx.Txs) == 0 && tx.matchFilterSchema {
return true
}
return fn(tx)
}
for {
@ -551,6 +606,22 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
tx.sqlOrigin = append(tx.sqlOrigin, ev.Data)
default:
tx.EndPos = int(h.LogPos)
status := STATUS_PREPARE
if ev.Type == "query" {
switch strings.ToLower(ev.Data) {
case "begin":
status = STATUS_BEGIN
case "commit":
status = STATUS_COMMIT
case "rollback":
status = STATUS_ROLLBACK
}
tx.Status = status
}
if !matchTbs(ev.DB, ev.TB) {
tx.matchFilterSchema = true
continue
}
tx.Txs = append(tx.Txs, TxDetail{
StartPos: startPos,
EndPos: int(h.LogPos),

Loading…
Cancel
Save