diff --git a/go.mod b/go.mod index f3380f2..96ef45f 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module binlog go 1.20 require ( - b612.me/mysql/binlog v0.0.0-20230629051642-8b0423eb94af + b612.me/mysql/binlog v0.0.0-20230630053741-7c0272cc62e4 b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044 b612.me/starlog v1.3.2 b612.me/staros v1.1.6 diff --git a/go.sum b/go.sum index d137836..a0058ee 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -b612.me/mysql/binlog v0.0.0-20230629051642-8b0423eb94af h1:+toQg4dBQ19XpKPYSa9xRrYqXdHpB1HG5+KOxYiVMlY= -b612.me/mysql/binlog v0.0.0-20230629051642-8b0423eb94af/go.mod h1:j9oDZUBx7+GK9X1b1bqO9SHddHvDRSGfwbIISxONqfA= +b612.me/mysql/binlog v0.0.0-20230630053741-7c0272cc62e4 h1:U5z6K7FTtGwAhDJn3TFqRGkuXYd623osfsus0AGwAPg= +b612.me/mysql/binlog v0.0.0-20230630053741-7c0272cc62e4/go.mod h1:j9oDZUBx7+GK9X1b1bqO9SHddHvDRSGfwbIISxONqfA= b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044 h1:sJrYUl9Sb1tij6ROahFE3r/36Oag3kI92OXDjOKsdwA= b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044/go.mod h1:3EHq1jvlm3a92UxagMjfqSSVYb3KW2H3aT5nd4SiD94= b612.me/notify v1.2.4 h1:cjP80V9FeM+ib1DztZdykusakcbjNI4dAB1pXE8U6bo= diff --git a/main.go b/main.go index 129a2ee..8b7371b 100644 --- a/main.go +++ b/main.go @@ -30,6 +30,7 @@ var ( skipquery bool pos int64 prefix string + outasRow bool ) func init() { @@ -47,6 +48,7 @@ func init() { cmd.Flags().IntVar(&bigThan, "big", 0, "show tx big than x bytes") cmd.Flags().IntVar(&smallThan, "small", 0, "show tx small than x bytes") cmd.Flags().StringVar(&prefix, "prefix", "mysql-bin", "mysql binlog prefix") + cmd.Flags().BoolVarP(&outasRow, "row-mode", "r", false, "output as row") } var cmd = &cobra.Command{ @@ -101,7 +103,9 @@ func ParseBinlog() { title.AddCell().SetValue("单语句影响表") title.AddCell().SetValue("SQL类型") title.AddCell().SetValue("具体SQL") - title.AddCell().SetValue("变更内容") + title.AddCell().SetValue("从属事务编号") + title.AddCell().SetValue("同事务行编号") + title.AddCell().SetValue("行变更内容") res.SetColWidth(0, 0, 5) res.SetColWidth(1, 1, 40) res.SetColWidth(3, 6, 6) @@ -150,7 +154,7 @@ func ParseBinlog() { fmt.Print(latest) } count++ - case <-time.After(time.Second * 2): + case <-time.After(time.Millisecond * 200): fmt.Print(latest) } } @@ -171,37 +175,243 @@ func ParseBinlog() { if !vbo { proc <- fmt.Sprintf("已找到%d个合法GTID\r", foundCount) } else { - fmt.Printf("GTID:%s Time:%s StartPos:%v EndPos:%v RowsCount:%v Size:%v Detail:%+v\n", - tx.GTID, tx.Time, tx.StartPos, tx.EndPos, tx.RowsCount, tx.Size, tx.Txs) + if !outasRow { + fmt.Printf("GTID:%s Time:%s StartPos:%v EndPos:%v RowsCount:%v Size:%v Detail:%+v\n", + tx.GTID, tx.Time, tx.StartPos, tx.EndPos, tx.RowsCount, tx.Size, tx.Txs) + } else { + fmt.Printf("-------------------------\nGTID:%s Time:%s StartPos:%v EndPos:%v RowsCount:%v Size:%v\n\n", + tx.GTID, tx.Time, tx.StartPos, tx.EndPos, tx.RowsCount, tx.Size) + for _, t := range tx.Txs { + if skipquery && (strings.ToLower(t.Sql) == "begin" || strings.ToLower(t.Sql) == "commit") { + continue + } + switch t.SqlType { + case "insert": + for _, rows := range t.Rows { + setence := "" + for _, row := range rows { + switch row.(type) { + case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: + setence += fmt.Sprintf("%v, ", row) + case string: + setence += fmt.Sprintf("'%v', ", strings.ReplaceAll(row.(string), "'", "''")) + case []byte: + setence += fmt.Sprintf("%v, ", row) + case nil: + setence += fmt.Sprintf("%v, ", "NULL") + default: + setence += fmt.Sprintf("%v, ", row) + } + } + if setence != "" && len(setence) > 2 { + setence = setence[:len(setence)-2] + } + sql := fmt.Sprintf(`INSERT INTO %s.%s VALUES (%v)`, t.Db, t.Table, setence) + fmt.Printf("GTID:%s\nTime:%s\nStartPos:%v\nEndPos:%v\nRowsCount:%v\nSQLOrigin:%v\nSQL:%+v\n\n", + tx.GTID, t.Time, t.StartPos, t.EndPos, t.RowCount, t.Sql, sql) + + } + case "update": + var sql string + var where string + for idxc, rows := range t.Rows { + setence := "" + spec := ", " + if idxc%2 == 0 { + spec = " AND " + } + for idxf, row := range rows { + switch row.(type) { + case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: + setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) + case string: + setence += fmt.Sprintf("$%d='%v'%s", idxf, strings.ReplaceAll(row.(string), "'", "''"), spec) + case []byte: + setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) + case nil: + setence += fmt.Sprintf("$%d=%v%s", idxf, "NULL", spec) + default: + setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) + } + + } + if setence != "" && len(setence) > 2 { + setence = setence[:len(setence)-len(spec)] + } + if idxc%2 == 0 { + where = setence + continue + } + sql = fmt.Sprintf("UPDATE %s.%s SET (%v) WHERE %v", t.Db, t.Table, setence, where) + fmt.Printf("GTID:%s\nTime:%s\nStartPos:%v\nEndPos:%v\nRowsCount:%v\nSQLOrigin:%v\nSQL:%+v\n\n", + tx.GTID, t.Time, t.StartPos, t.EndPos, t.RowCount, t.Sql, sql) + } + case "delete": + for _, rows := range t.Rows { + setence := "" + spec := " AND " + for idxf, row := range rows { + switch row.(type) { + case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: + setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) + case string: + setence += fmt.Sprintf("$%d='%v'%s", idxf, strings.ReplaceAll(row.(string), "'", "''"), spec) + case []byte: + setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) + case nil: + setence += fmt.Sprintf("$%d=%v%s", idxf, "NULL", spec) + default: + setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) + } + } + if setence != "" && len(setence) > 2 { + setence = setence[:len(setence)-len(spec)] + } + sql := fmt.Sprintf("DELETE FROM %s.%s WHERE %v", t.Db, t.Table, setence) + fmt.Printf("GTID:%s\nTime:%s\nStartPos:%v\nEndPos:%v\nRowsCount:%v\nSQLOrigin:%v\nSQL:%+v\n\n", + tx.GTID, t.Time, t.StartPos, t.EndPos, t.RowCount, t.Sql, sql) + } + default: + fmt.Printf("GTID:%s\nTime:%s\nStartPos:%v\nEndPos:%v\nRowsCount:%v\nSQLOrigin:%v\nSQL:%+v\n\n", + tx.GTID, t.Time, t.StartPos, t.EndPos, t.RowCount, t.Sql, t.Rows) + } + } + } } if outPath != "" { - for _, t := range tx.Txs { + for k, t := range tx.Txs { if skipquery && (strings.ToLower(t.Sql) == "begin" || strings.ToLower(t.Sql) == "commit") { continue } - r := res.AddRow() - r.AddCell().SetValue(foundCount) - r.AddCell().SetValue(tx.GTID) - r.AddCell().SetValue(tx.Time.String()) - r.AddCell().SetValue(tx.Timestamp) - r.AddCell().SetValue(tx.StartPos) - r.AddCell().SetValue(tx.EndPos) - r.AddCell().SetValue(tx.Size) - r.AddCell().SetValue(tx.RowsCount) - if t.CompressionType == "" { - r.AddCell().SetValue("NONE") - } else { - r.AddCell().SetValue(t.CompressionType) + addRow := func() *xlsx.Row { + r := res.AddRow() + r.AddCell().SetValue(foundCount) + r.AddCell().SetValue(tx.GTID) + r.AddCell().SetValue(tx.Time.String()) + r.AddCell().SetValue(tx.Timestamp) + r.AddCell().SetValue(tx.StartPos) + r.AddCell().SetValue(tx.EndPos) + r.AddCell().SetValue(tx.Size) + r.AddCell().SetValue(tx.RowsCount) + if t.CompressionType == "" { + r.AddCell().SetValue("NONE") + } else { + r.AddCell().SetValue(t.CompressionType) + } + r.AddCell().SetValue(t.StartPos) + r.AddCell().SetValue(t.EndPos) + r.AddCell().SetValue(t.Time.String()) + r.AddCell().SetValue(t.RowCount) + r.AddCell().SetValue(t.Db) + r.AddCell().SetValue(t.Table) + r.AddCell().SetValue(t.SqlType) + r.AddCell().SetValue(t.Sql) + return r + } + if !outasRow { + r := addRow() + r.AddCell().SetValue(k + 1) + r.AddCell().SetValue(1) + r.AddCell().SetValue(t.Rows) + continue + } + switch t.SqlType { + case "insert": + for idx, rows := range t.Rows { + setence := "" + for _, row := range rows { + switch row.(type) { + case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: + setence += fmt.Sprintf("%v, ", row) + case string: + setence += fmt.Sprintf("'%v', ", strings.ReplaceAll(row.(string), "'", "''")) + case []byte: + setence += fmt.Sprintf("%v, ", row) + case nil: + setence += fmt.Sprintf("%v, ", "NULL") + default: + setence += fmt.Sprintf("%v, ", row) + } + } + if setence != "" && len(setence) > 2 { + setence = setence[:len(setence)-2] + } + sql := fmt.Sprintf(`INSERT INTO %s.%s VALUES(%v)`, t.Db, t.Table, setence) + r := addRow() + r.AddCell().SetValue(k + 1) + r.AddCell().SetValue(idx + 1) + r.AddCell().SetValue(sql) + } + case "update": + var sql string + var where string + for idxc, rows := range t.Rows { + setence := "" + spec := ", " + if idxc%2 == 0 { + spec = " AND " + } + for idxf, row := range rows { + switch row.(type) { + case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: + setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) + case string: + setence += fmt.Sprintf("$%d='%v'%s", idxf, strings.ReplaceAll(row.(string), "'", "''"), spec) + case []byte: + setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) + case nil: + setence += fmt.Sprintf("$%d=%v%s", idxf, "NULL", spec) + default: + setence += fmt.Sprintf("$%d=%v%s", idxf, row, spec) + } + } + if setence != "" && len(setence) > 2 { + setence = setence[:len(setence)-len(spec)] + } + if idxc%2 == 0 { + where = setence + continue + } + sql = fmt.Sprintf("UPDATE %s.%s SET (%v) WHERE %v", t.Db, t.Table, setence, where) + r := addRow() + r.AddCell().SetValue(k + 1) + r.AddCell().SetValue((idxc + 1) / 2) + r.AddCell().SetValue(sql) + } + case "delete": + for idx, rows := range t.Rows { + setence := "" + spec := " AND " + for idxf, row := range rows { + switch row.(type) { + case uint, uint64, uint32, uint16, uint8, int, int64, int32, int16, int8, float64, float32: + setence += fmt.Sprintf("$%d=%v%s", idxf, row) + case string: + setence += fmt.Sprintf("$%d='%v'%s", idxf, strings.ReplaceAll(row.(string), "'", "''")) + case []byte: + setence += fmt.Sprintf("$%d=%v%s", idxf, row) + case nil: + setence += fmt.Sprintf("$%d=%v%s", idxf, "NULL") + default: + setence += fmt.Sprintf("$%d=%v%s", idxf, row) + } + } + if setence != "" && len(setence) > 2 { + setence = setence[:len(setence)-len(spec)] + } + sql := fmt.Sprintf("DELETE FROM %s.%s WHERE %v", t.Db, t.Table, setence) + r := addRow() + r.AddCell().SetValue(k + 1) + r.AddCell().SetValue(idx + 1) + r.AddCell().SetValue(sql) + + } + default: + r := addRow() + r.AddCell().SetValue(k + 1) + r.AddCell().SetValue(1) + r.AddCell().SetValue(t.Rows) } - r.AddCell().SetValue(t.StartPos) - r.AddCell().SetValue(t.EndPos) - r.AddCell().SetValue(t.Time.String()) - r.AddCell().SetValue(t.RowCount) - r.AddCell().SetValue(t.Db) - r.AddCell().SetValue(t.Table) - r.AddCell().SetValue(t.SqlType) - r.AddCell().SetValue(t.Sql) - r.AddCell().SetValue(t.Rows) } } }) @@ -257,5 +467,5 @@ func ParseBinlog() { if totalGtid != nil { allGtid = totalGtid.String() } - fmt.Printf("Total Gtid:%v\nTotal Binlog Number:%v\n", allGtid, foundCount) + fmt.Printf("Total Gtid:%v\nTotal SQL Number:%v\n", allGtid, foundCount) } diff --git a/vendor/b612.me/mysql/binlog/parse.go b/vendor/b612.me/mysql/binlog/parse.go index 0de8e49..90f072e 100644 --- a/vendor/b612.me/mysql/binlog/parse.go +++ b/vendor/b612.me/mysql/binlog/parse.go @@ -238,7 +238,7 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { case replication.WRITE_ROWS_EVENTv1, replication.WRITE_ROWS_EVENTv2: wrEvent := ev.Event.(*replication.RowsEvent) - sig.TB = string(wrEvent.Table.Schema) + sig.DB = string(wrEvent.Table.Schema) sig.TB = string(wrEvent.Table.Table) sig.Type = "insert" sig.RowCnt = uint32(len(wrEvent.Rows)) @@ -247,7 +247,7 @@ func ParseBinlogEvent(ev *replication.BinlogEvent) []BinlogEvent { replication.UPDATE_ROWS_EVENTv2: wrEvent := ev.Event.(*replication.RowsEvent) sig.DB = string(wrEvent.Table.Schema) - sig.DB = string(wrEvent.Table.Table) + sig.TB = string(wrEvent.Table.Table) sig.Type = "update" sig.RowCnt = uint32(len(wrEvent.Rows)) / 2 sig.Rows = wrEvent.Rows @@ -381,6 +381,9 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter tx.RowsCount += v.RowCount tx.Txs[k] = v } + if filter.OnlyShowGtid { + tx.EndPos = tx.StartPos + } tx.Size = tx.EndPos - tx.StartPos callFn(tx) return nil @@ -451,6 +454,14 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter // cfg.StatChan <- BinEventStats{Timestamp: h.Timestamp, Binlog: *binlog, StartPos: tbMapPos, StopPos: h.LogPos, // Database: db, Table: tb, QuerySql: sql, RowCnt: rowCnt, QueryType: sqlType} } + if filter.EndPos != 0 && startPos > filter.EndPos { + skipTillNext = true + continue + } + if filter.StartPos != 0 && startPos < filter.EndPos { + skipTillNext = true + continue + } switch ev.Type { case "gtid": if skipTillNext { @@ -466,6 +477,9 @@ func parseBinlogWithFilter(r io.Reader, parse *replication.BinlogParser, filter tx.RowsCount += v.RowCount tx.Txs[k] = v } + if filter.OnlyShowGtid { + tx.EndPos = startPos - 1 + } tx.Size = tx.EndPos - tx.StartPos callFn(tx) } diff --git a/vendor/modules.txt b/vendor/modules.txt index 4c288f8..a772e21 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1,4 +1,4 @@ -# b612.me/mysql/binlog v0.0.0-20230629051642-8b0423eb94af +# b612.me/mysql/binlog v0.0.0-20230630053741-7c0272cc62e4 ## explicit; go 1.20 b612.me/mysql/binlog # b612.me/mysql/gtid v0.0.0-20230425105031-298e51a68044