aliyunSQLLogDownLoader.go 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150
  1. package main
  2. // usage: ./main -i rm-2ze5b2o6qq1f3nt26 -s 2021-06-22T16:00:00Z -e 2021-06-23T16:59:59Z -c 50
  3. import (
  4. "flag"
  5. "fmt"
  6. "math"
  7. "os"
  8. "sync"
  9. "github.com/aliyun/alibaba-cloud-sdk-go/services/rds"
  10. //"github.com/alibabacloud-go/tea/tea"
  11. "github.com/golang/glog"
  12. "gopkg.in/mgo.v2"
  13. //"encoding/json"
  14. //"reflect"
  15. )
  16. func requestAction(startTime string, endTime string, pageSize, pageNumber int, instanceId string) (response *rds.DescribeSQLLogRecordsResponse) {
  17. defer glog.Flush()
  18. client, err := rds.NewClientWithAccessKey("cn-beijing", "access-key", "access-token")
  19. request := rds.CreateDescribeSQLLogRecordsRequest()
  20. request.Scheme = "https"
  21. request.DBInstanceId = instanceId
  22. request.StartTime = startTime
  23. request.EndTime = endTime
  24. response, err = client.DescribeSQLLogRecords(request)
  25. dateTime := string([]byte(endTime)[:10])
  26. saveMongoDb(response, dateTime, instanceId)
  27. if err != nil {
  28. fmt.Print(err.Error())
  29. glog.Error(pageNumber)
  30. }
  31. return
  32. }
  33. func getPages(startTime string, endTime string, instanceId string) int64 {
  34. response := requestAction(startTime, endTime, 100, 1, instanceId)
  35. //fmt.Println("type:", reflect.TypeOf(response))
  36. return response.TotalRecordCount
  37. }
  38. func run(startTime string, endTime string, chPages <-chan int, instanceId string) {
  39. defer wg.Done()
  40. for {
  41. i, ok := <-chPages
  42. requestAction(startTime, endTime, 100, i, instanceId)
  43. if !ok {
  44. break
  45. }
  46. fmt.Println(i)
  47. }
  48. }
  49. func saveMongoDb(pageResponse *rds.DescribeSQLLogRecordsResponse, dateTime string, instanceId string) bool {
  50. // db.getCollection("2021-06-16").find()
  51. mongo, err := mgo.Dial("127.0.0.1")
  52. defer mongo.Close()
  53. if err != nil {
  54. return false
  55. }
  56. client := mongo.DB(instanceId).C(dateTime)
  57. type SQLRecord struct {
  58. ExecuteTime string
  59. ThreadID string
  60. ReturnRowCounts int
  61. DBName string
  62. TotalExecutionTimes int
  63. HostAddress string
  64. AccountName string
  65. SQLText string
  66. }
  67. for _, item := range pageResponse.Items.SQLRecord {
  68. var s SQLRecord
  69. s.ExecuteTime = string(item.ExecuteTime)
  70. s.ThreadID = string(item.ThreadID)
  71. s.ReturnRowCounts = int(item.ReturnRowCounts)
  72. s.DBName = string(item.DBName)
  73. s.TotalExecutionTimes = int(item.TotalExecutionTimes)
  74. s.HostAddress = string(item.HostAddress)
  75. s.AccountName = string(item.AccountName)
  76. s.SQLText = string(item.SQLText)
  77. cErr := client.Insert(&s)
  78. if cErr != nil {
  79. return false
  80. }
  81. }
  82. return true
  83. }
  84. func cliVars(startTime, endTime *string, gorutineCount *int, instanceId *string, logPath *string) {
  85. flag.StringVar(startTime, "s", "", "起始时间,UTC时区,北京时间-8小时,格式: 2021-06-16T04:00:00Z")
  86. flag.StringVar(endTime, "e", "", "结束时间,UTC时区,北京时间-8小时,格式: 2021-06-16T04:01:59Z")
  87. flag.IntVar(gorutineCount, "c", 12, "线程数,默认为12")
  88. flag.StringVar(instanceId, "i", "", "RDS的instanceId,格式: rm-2zem3y91kg5890pqy")
  89. flag.StringVar(logPath, "l", "log", "日志输出路径,默认为当前目录下的log目录")
  90. flag.Set("log_dir", *logPath)
  91. flag.Parse()
  92. }
  93. func logPathiExist(logPath string) {
  94. _, err := os.Stat(logPath)
  95. if err != nil {
  96. err1 := os.Mkdir(logPath, 755)
  97. if err1 != nil {
  98. glog.Error(err1)
  99. fmt.Println(err1)
  100. }
  101. }
  102. }
  103. var wg sync.WaitGroup
  104. var _ = fmt.Println
  105. func main() {
  106. var startTime string
  107. var endTime string
  108. var instanceId string
  109. var gorutineCount int
  110. var logPath string
  111. cliVars(&startTime, &endTime, &gorutineCount, &instanceId, &logPath)
  112. if len(os.Args) < 4 {
  113. fmt.Println("命令行参数错误,请输入-h查看帮助.")
  114. os.Exit(1)
  115. }
  116. logPathiExist(logPath)
  117. //startTime := "2021-06-16T04:00:00Z"
  118. //endTime := "2021-06-16T04:01:59Z"
  119. totalRecordCount := getPages(startTime, endTime, instanceId)
  120. pages := math.Ceil(float64(totalRecordCount) / float64(100))
  121. chPages := make(chan int, int(pages))
  122. for i := 1; i <= int(pages); i++ {
  123. chPages <- i
  124. }
  125. close(chPages)
  126. //gorutineCount := 15
  127. wg.Add(gorutineCount)
  128. for i := 0; i < gorutineCount; i++ {
  129. go run(startTime, endTime, chPages, instanceId)
  130. }
  131. wg.Wait()
  132. fmt.Println(pages)
  133. // close(chPages)
  134. //response := requestAction("2021-06-16T04:00:00Z", "2021-06-16T04:01:59Z")
  135. //fmt.Printf("response is %#v\n", response)
  136. //fmt.Println("type:", reflect.TypeOf(response))
  137. }