add more feature
This commit is contained in:
parent
61d231f6b6
commit
9b9b211c0a
83
parse.go
83
parse.go
@ -13,16 +13,16 @@ import (
|
||||
)
|
||||
|
||||
type TxDetail struct {
|
||||
StartPos int
|
||||
EndPos int
|
||||
RowCount int
|
||||
Timestamp int64
|
||||
Time time.Time
|
||||
Sql string
|
||||
Db string
|
||||
Table string
|
||||
SqlType string
|
||||
CompressionType string
|
||||
StartPos int `json:"startPos"`
|
||||
EndPos int `json:"endPos"`
|
||||
RowCount int `json:"rowCount"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Time time.Time `json:"time"`
|
||||
Sql string `json:"sql"`
|
||||
Db string `json:"db"`
|
||||
Table string `json:"table"`
|
||||
SqlType string `json:"sqlType"`
|
||||
CompressionType string `json:"compressionType"`
|
||||
Rows [][]interface{}
|
||||
}
|
||||
|
||||
@ -34,16 +34,17 @@ const (
|
||||
)
|
||||
|
||||
type Transaction struct {
|
||||
GTID string
|
||||
Timestamp int64
|
||||
Time time.Time
|
||||
StartPos int
|
||||
EndPos int
|
||||
Size int
|
||||
RowsCount int
|
||||
Status uint8
|
||||
sqlOrigin []string
|
||||
Txs []TxDetail
|
||||
GTID string `json:"gtid"`
|
||||
Timestamp int64 `json:"timestamp"`
|
||||
Time time.Time `json:"time"`
|
||||
StartPos int `json:"startPos"`
|
||||
EndPos int `json:"endPos"`
|
||||
Size int `json:"size"`
|
||||
RowsCount int `json:"rowsCount"`
|
||||
Status uint8 `json:"status"`
|
||||
sqlOrigin []string `json:"sqlOrigin"`
|
||||
Txs []TxDetail `json:"txs"`
|
||||
matchFilterSchema bool
|
||||
}
|
||||
|
||||
func (t Transaction) GetSqlOrigin() []string {
|
||||
@ -367,8 +368,39 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
|
||||
}
|
||||
excludeMap[v] = true
|
||||
}
|
||||
} else {
|
||||
excludeMap["*.*"] = true
|
||||
}
|
||||
matchTbs := func(db, tb string) bool {
|
||||
if len(filter.ExcludeTables) == 0 && len(filter.ExcludeTables) == 0 {
|
||||
return true
|
||||
}
|
||||
if _, ok := includeMap["*.*"]; ok {
|
||||
return true
|
||||
}
|
||||
if _, ok := excludeMap["*.*"]; ok {
|
||||
return false
|
||||
}
|
||||
if _, ok := includeMap[db+"."+tb]; ok {
|
||||
return true
|
||||
}
|
||||
if _, ok := excludeMap[db+"."+tb]; ok {
|
||||
return false
|
||||
}
|
||||
if _, ok := includeMap[db+".*"]; ok {
|
||||
return true
|
||||
}
|
||||
if _, ok := excludeMap[db+".*"]; ok {
|
||||
return false
|
||||
}
|
||||
if _, ok := includeMap["*."+tb]; ok {
|
||||
return true
|
||||
}
|
||||
if _, ok := excludeMap["*."+tb]; ok {
|
||||
return false
|
||||
}
|
||||
if len(includeMap) != 0 {
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
if filter.IncludeGtid != "" {
|
||||
inGtid, err = gtid.Parse(filter.IncludeGtid)
|
||||
@ -414,6 +446,9 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
|
||||
if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
|
||||
return true
|
||||
}
|
||||
if len(tx.Txs) == 0 && tx.matchFilterSchema {
|
||||
return true
|
||||
}
|
||||
return fn(tx)
|
||||
}
|
||||
for {
|
||||
@ -583,6 +618,10 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
|
||||
}
|
||||
tx.Status = status
|
||||
}
|
||||
if !matchTbs(ev.DB, ev.TB) {
|
||||
tx.matchFilterSchema = true
|
||||
continue
|
||||
}
|
||||
tx.Txs = append(tx.Txs, TxDetail{
|
||||
StartPos: startPos,
|
||||
EndPos: int(h.LogPos),
|
||||
|
Loading…
x
Reference in New Issue
Block a user