|
|
@ -34,17 +34,17 @@ const (
|
|
|
|
)
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
type Transaction struct {
|
|
|
|
type Transaction struct {
|
|
|
|
GTID string `json:"gtid"`
|
|
|
|
GTID string `json:"gtid"`
|
|
|
|
Timestamp int64 `json:"timestamp"`
|
|
|
|
Timestamp int64 `json:"timestamp"`
|
|
|
|
Time time.Time `json:"time"`
|
|
|
|
Time time.Time `json:"time"`
|
|
|
|
StartPos int `json:"startPos"`
|
|
|
|
StartPos int `json:"startPos"`
|
|
|
|
EndPos int `json:"endPos"`
|
|
|
|
EndPos int `json:"endPos"`
|
|
|
|
Size int `json:"size"`
|
|
|
|
Size int `json:"size"`
|
|
|
|
RowsCount int `json:"rowsCount"`
|
|
|
|
RowsCount int `json:"rowsCount"`
|
|
|
|
Status uint8 `json:"status"`
|
|
|
|
Status uint8 `json:"status"`
|
|
|
|
sqlOrigin []string `json:"sqlOrigin"`
|
|
|
|
sqlOrigin []string `json:"sqlOrigin"`
|
|
|
|
Txs []TxDetail `json:"txs"`
|
|
|
|
Txs []TxDetail `json:"txs"`
|
|
|
|
matchFilterSchema bool
|
|
|
|
validSchemaCount int
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
func (t Transaction) GetSqlOrigin() []string {
|
|
|
|
func (t Transaction) GetSqlOrigin() []string {
|
|
|
@ -371,6 +371,9 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
|
|
|
|
if len(filter.ExcludeTables) == 0 && len(filter.ExcludeTables) == 0 {
|
|
|
|
if len(filter.ExcludeTables) == 0 && len(filter.ExcludeTables) == 0 {
|
|
|
|
return true
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if db == "" && tb == "" {
|
|
|
|
|
|
|
|
return true
|
|
|
|
|
|
|
|
}
|
|
|
|
if _, ok := includeMap["*.*"]; ok {
|
|
|
|
if _, ok := includeMap["*.*"]; ok {
|
|
|
|
return true
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
@ -444,7 +447,7 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
|
|
|
|
if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
|
|
|
|
if filter.SmallThan != 0 && filter.SmallThan < tx.Size {
|
|
|
|
return true
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if len(tx.Txs) == 0 && tx.matchFilterSchema {
|
|
|
|
if tx.validSchemaCount == 0 {
|
|
|
|
return true
|
|
|
|
return true
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return fn(tx)
|
|
|
|
return fn(tx)
|
|
|
@ -617,9 +620,9 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter
|
|
|
|
tx.Status = status
|
|
|
|
tx.Status = status
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if !matchTbs(ev.DB, ev.TB) {
|
|
|
|
if !matchTbs(ev.DB, ev.TB) {
|
|
|
|
tx.matchFilterSchema = true
|
|
|
|
|
|
|
|
continue
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
tx.validSchemaCount++
|
|
|
|
tx.Txs = append(tx.Txs, TxDetail{
|
|
|
|
tx.Txs = append(tx.Txs, TxDetail{
|
|
|
|
StartPos: startPos,
|
|
|
|
StartPos: startPos,
|
|
|
|
EndPos: int(h.LogPos),
|
|
|
|
EndPos: int(h.LogPos),
|
|
|
|