package main // usage: ./main -i rm-2ze5b2o6qq1f3nt26 -s 2021-06-22T16:00:00Z -e 2021-06-23T16:59:59Z -c 50 import ( "flag" "fmt" "math" "os" "sync" "github.com/aliyun/alibaba-cloud-sdk-go/services/rds" //"github.com/alibabacloud-go/tea/tea" "github.com/golang/glog" "gopkg.in/mgo.v2" //"encoding/json" //"reflect" ) func requestAction(startTime string, endTime string, pageSize, pageNumber int, instanceId string) (response *rds.DescribeSQLLogRecordsResponse) { defer glog.Flush() client, err := rds.NewClientWithAccessKey("cn-beijing", "access-key", "access-token") request := rds.CreateDescribeSQLLogRecordsRequest() request.Scheme = "https" request.DBInstanceId = instanceId request.StartTime = startTime request.EndTime = endTime response, err = client.DescribeSQLLogRecords(request) dateTime := string([]byte(endTime)[:10]) saveMongoDb(response, dateTime, instanceId) if err != nil { fmt.Print(err.Error()) glog.Error(pageNumber) } return } func getPages(startTime string, endTime string, instanceId string) int64 { response := requestAction(startTime, endTime, 100, 1, instanceId) //fmt.Println("type:", reflect.TypeOf(response)) return response.TotalRecordCount } func run(startTime string, endTime string, chPages <-chan int, instanceId string) { defer wg.Done() for { i, ok := <-chPages requestAction(startTime, endTime, 100, i, instanceId) if !ok { break } fmt.Println(i) } } func saveMongoDb(pageResponse *rds.DescribeSQLLogRecordsResponse, dateTime string, instanceId string) bool { // db.getCollection("2021-06-16").find() mongo, err := mgo.Dial("127.0.0.1") defer mongo.Close() if err != nil { return false } client := mongo.DB(instanceId).C(dateTime) type SQLRecord struct { ExecuteTime string ThreadID string ReturnRowCounts int DBName string TotalExecutionTimes int HostAddress string AccountName string SQLText string } for _, item := range pageResponse.Items.SQLRecord { var s SQLRecord s.ExecuteTime = string(item.ExecuteTime) s.ThreadID = string(item.ThreadID) s.ReturnRowCounts = int(item.ReturnRowCounts) s.DBName = string(item.DBName) s.TotalExecutionTimes = int(item.TotalExecutionTimes) s.HostAddress = string(item.HostAddress) s.AccountName = string(item.AccountName) s.SQLText = string(item.SQLText) cErr := client.Insert(&s) if cErr != nil { return false } } return true } func cliVars(startTime, endTime *string, gorutineCount *int, instanceId *string, logPath *string) { flag.StringVar(startTime, "s", "", "起始时间,UTC时区,北京时间-8小时,格式: 2021-06-16T04:00:00Z") flag.StringVar(endTime, "e", "", "结束时间,UTC时区,北京时间-8小时,格式: 2021-06-16T04:01:59Z") flag.IntVar(gorutineCount, "c", 12, "线程数,默认为12") flag.StringVar(instanceId, "i", "", "RDS的instanceId,格式: rm-2zem3y91kg5890pqy") flag.StringVar(logPath, "l", "log", "日志输出路径,默认为当前目录下的log目录") flag.Set("log_dir", *logPath) flag.Parse() } func logPathiExist(logPath string) { _, err := os.Stat(logPath) if err != nil { err1 := os.Mkdir(logPath, 755) if err1 != nil { glog.Error(err1) fmt.Println(err1) } } } var wg sync.WaitGroup var _ = fmt.Println func main() { var startTime string var endTime string var instanceId string var gorutineCount int var logPath string cliVars(&startTime, &endTime, &gorutineCount, &instanceId, &logPath) if len(os.Args) < 4 { fmt.Println("命令行参数错误,请输入-h查看帮助.") os.Exit(1) } logPathiExist(logPath) //startTime := "2021-06-16T04:00:00Z" //endTime := "2021-06-16T04:01:59Z" totalRecordCount := getPages(startTime, endTime, instanceId) pages := math.Ceil(float64(totalRecordCount) / float64(100)) chPages := make(chan int, int(pages)) for i := 1; i <= int(pages); i++ { chPages <- i } close(chPages) //gorutineCount := 15 wg.Add(gorutineCount) for i := 0; i < gorutineCount; i++ { go run(startTime, endTime, chPages, instanceId) } wg.Wait() fmt.Println(pages) // close(chPages) //response := requestAction("2021-06-16T04:00:00Z", "2021-06-16T04:01:59Z") //fmt.Printf("response is %#v\n", response) //fmt.Println("type:", reflect.TypeOf(response)) }