diff --git a/go.mod b/go.mod index 43cd554..f3380f2 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module binlog go 1.20 require ( - b612.me/mysql/binlog v0.0.0-20230524072531-39ca67fcfe81 + b612.me/mysql/binlog v0.0.0-20230629051642-8b0423eb94af b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044 b612.me/starlog v1.3.2 b612.me/staros v1.1.6 diff --git a/go.sum b/go.sum index 24de8e2..d137836 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -b612.me/mysql/binlog v0.0.0-20230524072531-39ca67fcfe81 h1:nUTBhXxtHAZd4p2ppbtj6wg5Ji5bbCAsWu6LAo5XvVs= -b612.me/mysql/binlog v0.0.0-20230524072531-39ca67fcfe81/go.mod h1:j9oDZUBx7+GK9X1b1bqO9SHddHvDRSGfwbIISxONqfA= +b612.me/mysql/binlog v0.0.0-20230629051642-8b0423eb94af h1:+toQg4dBQ19XpKPYSa9xRrYqXdHpB1HG5+KOxYiVMlY= +b612.me/mysql/binlog v0.0.0-20230629051642-8b0423eb94af/go.mod h1:j9oDZUBx7+GK9X1b1bqO9SHddHvDRSGfwbIISxONqfA= b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044 h1:sJrYUl9Sb1tij6ROahFE3r/36Oag3kI92OXDjOKsdwA= b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044/go.mod h1:3EHq1jvlm3a92UxagMjfqSSVYb3KW2H3aT5nd4SiD94= b612.me/notify v1.2.4 h1:cjP80V9FeM+ib1DztZdykusakcbjNI4dAB1pXE8U6bo= diff --git a/main.go b/main.go index 1588a06..129a2ee 100644 --- a/main.go +++ b/main.go @@ -58,7 +58,11 @@ var cmd = &cobra.Command{ starlog.Warningln("Please enter a binlog path or folder") return } + now := time.Now() ParseBinlog() + cost := time.Now().Sub(now).Seconds() + fmt.Println("") + fmt.Printf("Time Cost:%.2fs", cost) }, } @@ -88,6 +92,7 @@ func ParseBinlog() { title.AddCell().SetValue("EndPos") title.AddCell().SetValue("事务大小") title.AddCell().SetValue("影响行数") + title.AddCell().SetValue("压缩类型") title.AddCell().SetValue("单语句StartPos") title.AddCell().SetValue("单语句EndPos") title.AddCell().SetValue("单语句时间") @@ -101,7 +106,7 @@ func ParseBinlog() { res.SetColWidth(1, 1, 40) res.SetColWidth(3, 6, 6) res.SetColWidth(7, 7, 5) - res.SetColWidth(15, 15, 40) + res.SetColWidth(16, 16, 40) owrt.Save(outPath) } getParser := func(fpath string) string { @@ -112,17 +117,45 @@ func ParseBinlog() { if endTime != 0 { eTime = time.Unix(endTime, 0) } + onlyGtid := false + if !vbo && outPath == "" { + onlyGtid = true + } var filter = binlog.BinlogFilter{ - IncludeGtid: includeGtid, - ExcludeGtid: excludeGtid, - StartPos: startPos, - EndPos: endPos, - StartDate: sTime, - EndDate: eTime, - BigThan: bigThan, - SmallThan: smallThan, + IncludeGtid: includeGtid, + ExcludeGtid: excludeGtid, + StartPos: startPos, + EndPos: endPos, + StartDate: sTime, + EndDate: eTime, + BigThan: bigThan, + SmallThan: smallThan, + OnlyShowGtid: onlyGtid, } var cGtid *gtid.Gtid + proc := make(chan string, 1000) + if !vbo { + go func() { + var latest string + var count = uint16(0) + for { + select { + case tmp := <-proc: + if tmp == "end" { + fmt.Println(latest) + return + } + latest = tmp + if count%10 == 0 { + fmt.Print(latest) + } + count++ + case <-time.After(time.Second * 2): + fmt.Print(latest) + } + } + }() + } err = binlog.ParseBinlogWithFilter(fpath, pos, filter, func(tx binlog.Transaction) { foundCount++ if cGtid == nil { @@ -136,7 +169,7 @@ func ParseBinlog() { totalGtid.Add(tx.GTID) } if !vbo { - fmt.Printf("已找到%d个合规GTID\r", foundCount) + proc <- fmt.Sprintf("已找到%d个合法GTID\r", foundCount) } else { fmt.Printf("GTID:%s Time:%s StartPos:%v EndPos:%v RowsCount:%v Size:%v Detail:%+v\n", tx.GTID, tx.Time, tx.StartPos, tx.EndPos, tx.RowsCount, tx.Size, tx.Txs) @@ -155,6 +188,11 @@ func ParseBinlog() { r.AddCell().SetValue(tx.EndPos) r.AddCell().SetValue(tx.Size) r.AddCell().SetValue(tx.RowsCount) + if t.CompressionType == "" { + r.AddCell().SetValue("NONE") + } else { + r.AddCell().SetValue(t.CompressionType) + } r.AddCell().SetValue(t.StartPos) r.AddCell().SetValue(t.EndPos) r.AddCell().SetValue(t.Time.String()) @@ -167,6 +205,9 @@ func ParseBinlog() { } } }) + if !vbo { + time.Sleep(time.Millisecond * 500) + } var cGtidStr string if cGtid != nil { cGtidStr = cGtid.String() diff --git a/vendor/b612.me/mysql/binlog/parse.go b/vendor/b612.me/mysql/binlog/parse.go index df37894..0de8e49 100644 --- a/vendor/b612.me/mysql/binlog/parse.go +++ b/vendor/b612.me/mysql/binlog/parse.go @@ -12,16 +12,17 @@ import ( ) type TxDetail struct { - StartPos int - EndPos int - RowCount int - Timestamp int64 - Time time.Time - Sql string - Db string - Table string - SqlType string - Rows [][]interface{} + StartPos int + EndPos int + RowCount int + Timestamp int64 + Time time.Time + Sql string + Db string + Table string + SqlType string + CompressionType string + Rows [][]interface{} } type Transaction struct { @@ -80,13 +81,7 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error { var ( err error n int64 - db string = "" - tb string = "" - sql string = "" - sqlType string = "" - rowCnt uint32 = 0 tbMapPos uint32 = 0 - rows [][]interface{} ) var tx Transaction currentGtid := "" @@ -157,100 +152,105 @@ func parseBinlogDetail(r io.Reader, f func(Transaction)) error { //binEvent := &replication.BinlogEvent{RawData: rawData, Header: h, Event: e} binEvent := &replication.BinlogEvent{Header: h, Event: e} // we donnot need raw data - db, tb, sqlType, sql, rowCnt, rows = GetDbTbAndQueryAndRowCntFromBinevent(binEvent) - startPos := 0 - if sqlType == "query" || sqlType == "gtid" { - startPos = int(h.LogPos - h.EventSize) - //fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType) - // cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos, - // Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType} - } else { - startPos = int(tbMapPos) - //fmt.Println(h.Timestamp, t bMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType) - // cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos, - // Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType} - } - switch sqlType { - case "gtid": - if currentGtid != "" { - idx := 0 - for k, v := range tx.Txs { - if v.SqlType != "query" && len(tx.sqlOrigin) > idx { - v.Sql = tx.sqlOrigin[idx] - idx++ + evs := ParseBinlogEvent(binEvent) + for _, ev := range evs { + startPos := 0 + if ev.Type == "query" || ev.Type == "gtid" { + startPos = int(h.LogPos - h.EventSize) + //fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType) + // cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos, + // Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType} + } else { + startPos = int(tbMapPos) + //fmt.Println(h.Timestamp, t bMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType) + // cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos, + // Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType} + } + switch ev.Type { + case "gtid": + if currentGtid != "" { + idx := 0 + for k, v := range tx.Txs { + if v.SqlType != "query" && len(tx.sqlOrigin) > idx { + v.Sql = tx.sqlOrigin[idx] + idx++ + } + tx.RowsCount += v.RowCount + tx.Txs[k] = v + } + tx.Size = tx.EndPos - tx.StartPos + if f != nil { + f(tx) } - tx.RowsCount += v.RowCount - tx.Txs[k] = v } - tx.Size = tx.EndPos - tx.StartPos - if f != nil { - f(tx) + currentGtid = ev.Data + tx = Transaction{ + GTID: ev.Data, + StartPos: startPos, + Timestamp: int64(h.Timestamp), + Time: time.Unix(int64(h.Timestamp), 0), } + case "": + tx.EndPos = int(h.LogPos) + continue + case "rowsquery": + tx.EndPos = int(h.LogPos) + tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) + default: + tx.EndPos = int(h.LogPos) + tx.Txs = append(tx.Txs, TxDetail{ + StartPos: startPos, + EndPos: int(h.LogPos), + Db: ev.DB, + Table: ev.TB, + Sql: ev.Data, + SqlType: ev.Type, + Rows: ev.Rows, + RowCount: int(ev.RowCnt), + Timestamp: int64(h.Timestamp), + Time: time.Unix(int64(h.Timestamp), 0), + CompressionType: ev.CompressionType, + }) } - currentGtid = sql - tx = Transaction{ - GTID: sql, - StartPos: startPos, - Timestamp: int64(h.Timestamp), - Time: time.Unix(int64(h.Timestamp), 0), - } - case "": - tx.EndPos = int(h.LogPos) - continue - case "rowsquery": - tx.EndPos = int(h.LogPos) - tx.sqlOrigin = append(tx.sqlOrigin, sql) - default: - tx.EndPos = int(h.LogPos) - tx.Txs = append(tx.Txs, TxDetail{ - StartPos: startPos, - EndPos: int(h.LogPos), - Db: db, - Table: tb, - Sql: sql, - SqlType: sqlType, - Rows: rows, - RowCount: int(rowCnt), - Timestamp: int64(h.Timestamp), - Time: time.Unix(int64(h.Timestamp), 0), - }) } } } -func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, string, string, string, uint32, [][]interface{}) { - var ( - db string = "" - tb string = "" - sql string = "" - sqlType string = "" - rowCnt uint32 = 0 - rows [][]interface{} - ) +type BinlogEvent struct { + Type string + DB string + TB string + Data string + RowCnt uint32 + Rows [][]interface{} + CompressionType string +} +func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { + var res []BinlogEvent + var sig BinlogEvent switch ev.Header.EventType { case replication.ANONYMOUS_GTID_EVENT: //ge := ev.Event.(*replication.GTIDEvent) - sql = "anonymous-gtid-event:1" - sqlType = "gtid" + sig.Data = "anonymous-gtid-event:1" + sig.Type = "gtid" case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: - wrEvent := ev.Event.(*replication.RowsEvent) - db = string(wrEvent.Table.Schema) - tb = string(wrEvent.Table.Table) - sqlType = "insert" - rowCnt = uint32(len(wrEvent.Rows)) - rows = wrEvent.Rows + sig.TB = string(wrEvent.Table.Schema) + sig.TB = string(wrEvent.Table.Table) + sig.Type = "insert" + sig.RowCnt = uint32(len(wrEvent.Rows)) + sig.Rows = wrEvent.Rows case replication.UPDATE_ROWS_EVENTv1, replication.UPDATE_ROWS_EVENTv2: wrEvent := ev.Event.(*replication.RowsEvent) - db = string(wrEvent.Table.Schema) - tb = string(wrEvent.Table.Table) - sqlType = "update" - rowCnt = uint32(len(wrEvent.Rows)) / 2 - rows = wrEvent.Rows + sig.DB = string(wrEvent.Table.Schema) + sig.DB = string(wrEvent.Table.Table) + sig.Type = "update" + sig.RowCnt = uint32(len(wrEvent.Rows)) / 2 + sig.Rows = wrEvent.Rows case replication.DELETE_ROWS_EVENTv1, replication.DELETE_ROWS_EVENTv2: @@ -258,52 +258,66 @@ func GetDbTbAndQueryAndRowCntFromBinevent(ev *replication.BinlogEvent) (string, //replication.TABLE_MAP_EVENT: wrEvent := ev.Event.(*replication.RowsEvent) - db = string(wrEvent.Table.Schema) - tb = string(wrEvent.Table.Table) - sqlType = "delete" - rowCnt = uint32(len(wrEvent.Rows)) - rows = wrEvent.Rows + sig.DB = string(wrEvent.Table.Schema) + sig.TB = string(wrEvent.Table.Table) + sig.Type = "delete" + sig.RowCnt = uint32(len(wrEvent.Rows)) + sig.Rows = wrEvent.Rows case replication.ROWS_QUERY_EVENT: queryEvent := ev.Event.(*replication.RowsQueryEvent) - sql = string(queryEvent.Query) - sqlType = "rowsquery" + sig.Data = string(queryEvent.Query) + sig.Type = "rowsquery" case replication.QUERY_EVENT: queryEvent := ev.Event.(*replication.QueryEvent) - db = string(queryEvent.Schema) - sql = string(queryEvent.Query) - sqlType = "query" + sig.DB = string(queryEvent.Schema) + sig.Data = string(queryEvent.Query) + sig.Type = "query" case replication.MARIADB_GTID_EVENT: // For global transaction ID, used to start a new transaction event group, instead of the old BEGIN query event, and also to mark stand-alone (ddl). //https://mariadb.com/kb/en/library/gtid_event/ - sql = "begin" - sqlType = "query" + sig.Data = "begin" + sig.Type = "query" case replication.XID_EVENT: // XID_EVENT represents commit。rollback transaction not in binlog - sql = "commit" - sqlType = "query" + sig.Data = "commit" + sig.Type = "query" case replication.GTID_EVENT: ge := ev.Event.(*replication.GTIDEvent) gid, err := gtid.Parse(fmt.Sprintf("%s:%d", bytesToUuid(ge.SID), ge.GNO)) if err == nil { - sql = gid.String() + sig.Data = gid.String() + } + sig.Type = "gtid" + case replication.TRANSACTION_PAYLOAD_EVENT: + ge := ev.Event.(*replication.TransactionPayloadEvent) + for _, val := range ge.Events { + res = append(res, ParseBinlogEvent(val)...) + } + for idx := range res { + if ge.CompressionType == 0 { + res[idx].CompressionType = "ZSTD" + } else if ge.CompressionType != 255 { + res[idx].CompressionType = "UNKNOWN" + } } - sqlType = "gtid" + return res } - return db, tb, sqlType, sql, rowCnt, rows - + res = append(res, sig) + return res } type BinlogFilter struct { - IncludeGtid string - ExcludeGtid string - StartPos int - EndPos int - StartDate time.Time - EndDate time.Time - BigThan int - SmallThan int + IncludeGtid string + ExcludeGtid string + StartPos int + EndPos int + StartDate time.Time + EndDate time.Time + BigThan int + SmallThan int + OnlyShowGtid bool } func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter BinlogFilter, fn func(Transaction)) error { @@ -325,14 +339,8 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter // process: 0, continue: 1, break: 2, EOF: 3 var ( n int64 - db string = "" - tb string = "" - sql string = "" - sqlType string = "" - rowCnt uint32 = 0 tbMapPos uint32 = 0 skipTillNext bool = false - rows [][]interface{} ) var tx Transaction @@ -398,6 +406,9 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter if skipTillNext && h.EventType != replication.GTID_EVENT { continue } + if filter.OnlyShowGtid && h.EventType != replication.GTID_EVENT { + continue + } //h.Dump(os.Stdout) data := buf.Bytes() @@ -426,78 +437,81 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter //binEvent := &replication.BinlogEvent{RawData: rawData, Header: h, Event: e} binEvent := &replication.BinlogEvent{Header: h, Event: e} // we donnot need raw data - db, tb, sqlType, sql, rowCnt, rows = GetDbTbAndQueryAndRowCntFromBinevent(binEvent) - startPos := 0 - if sqlType == "query" || sqlType == "gtid" { - startPos = int(h.LogPos - h.EventSize) - //fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType) - // cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos, - // Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType} - } else { - startPos = int(tbMapPos) - //fmt.Println(h.Timestamp, t bMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType) - // cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos, - // Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType} - } - switch sqlType { - case "gtid": - if skipTillNext { - skipTillNext = false + evs := ParseBinlogEvent(binEvent) + for _, ev := range evs { + startPos := 0 + if ev.Type == "query" || ev.Type == "gtid" { + startPos = int(h.LogPos - h.EventSize) + //fmt.Println(h.Timestamp, h.LogPos-h.EventSize, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType) + // cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: h.LogPos - h.EventSize, StopPos: h.LogPos, + // Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType} + } else { + startPos = int(tbMapPos) + //fmt.Println(h.Timestamp, t bMapPos, h.LogPos, db, tb, "sql="+sql, rowCnt, sqlType) + // cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos, + // Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType} } - if currentGtid != "" { - idx := 0 - for k, v := range tx.Txs { - if v.SqlType != "query" && len(tx.sqlOrigin) > idx { - v.Sql = tx.sqlOrigin[idx] - idx++ + switch ev.Type { + case "gtid": + if skipTillNext { + skipTillNext = false + } + if currentGtid != "" { + idx := 0 + for k, v := range tx.Txs { + if v.SqlType != "query" && len(tx.sqlOrigin) > idx { + v.Sql = tx.sqlOrigin[idx] + idx++ + } + tx.RowsCount += v.RowCount + tx.Txs[k] = v } - tx.RowsCount += v.RowCount - tx.Txs[k] = v + tx.Size = tx.EndPos - tx.StartPos + callFn(tx) } - tx.Size = tx.EndPos - tx.StartPos - callFn(tx) - } - currentGtid = sql - if inGtid != nil { - if c, _ := inGtid.Contain(sql); !c { - currentGtid = "" - skipTillNext = true - continue + currentGtid = ev.Data + if inGtid != nil { + if c, _ := inGtid.Contain(ev.Data); !c { + currentGtid = "" + skipTillNext = true + continue + } } - } - if exGtid != nil { - if c, _ := exGtid.Contain(sql); c { - currentGtid = "" - skipTillNext = true - continue + if exGtid != nil { + if c, _ := exGtid.Contain(ev.Data); c { + currentGtid = "" + skipTillNext = true + continue + } } + tx = Transaction{ + GTID: ev.Data, + StartPos: startPos, + Timestamp: int64(h.Timestamp), + Time: time.Unix(int64(h.Timestamp), 0), + } + case "": + tx.EndPos = int(h.LogPos) + continue + case "rowsquery": + tx.EndPos = int(h.LogPos) + tx.sqlOrigin = append(tx.sqlOrigin, ev.Data) + default: + tx.EndPos = int(h.LogPos) + tx.Txs = append(tx.Txs, TxDetail{ + StartPos: startPos, + EndPos: int(h.LogPos), + Db: ev.DB, + Table: ev.TB, + Sql: ev.Data, + SqlType: ev.Type, + Rows: ev.Rows, + RowCount: int(ev.RowCnt), + Timestamp: int64(h.Timestamp), + Time: time.Unix(int64(h.Timestamp), 0), + CompressionType: ev.CompressionType, + }) } - tx = Transaction{ - GTID: sql, - StartPos: startPos, - Timestamp: int64(h.Timestamp), - Time: time.Unix(int64(h.Timestamp), 0), - } - case "": - tx.EndPos = int(h.LogPos) - continue - case "rowsquery": - tx.EndPos = int(h.LogPos) - tx.sqlOrigin = append(tx.sqlOrigin, sql) - default: - tx.EndPos = int(h.LogPos) - tx.Txs = append(tx.Txs, TxDetail{ - StartPos: startPos, - EndPos: int(h.LogPos), - Db: db, - Table: tb, - Sql: sql, - SqlType: sqlType, - Rows: rows, - RowCount: int(rowCnt), - Timestamp: int64(h.Timestamp), - Time: time.Unix(int64(h.Timestamp), 0), - }) } } } diff --git a/vendor/modules.txt b/vendor/modules.txt index 47f9b8e..4c288f8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,4 +1,4 @@ -# b612.me/mysql/binlog v0.0.0-20230524072531-39ca67fcfe81 +# b612.me/mysql/binlog v0.0.0-20230629051642-8b0423eb94af ## explicit; go 1.20 b612.me/mysql/binlog # b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044