main.go 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. package main
  2. import (
  3. "bufio"
  4. "encoding/json"
  5. "fmt"
  6. "io"
  7. "os"
  8. "strings"
  9. "sync"
  10. "time"
  11. )
  12. type logFormat struct {
  13. Timestamp string `json:"timestamp"`
  14. Slbip string `json:"slbip"`
  15. Clientip string `json:"clientip"`
  16. Serverip string `json:"serverip"`
  17. Size int `json:"size"`
  18. Method string `json:"method"`
  19. Requesturi string `json:"requesturi"`
  20. Request_body string `json:"request_body"`
  21. Appversion string `json:"appversion"`
  22. Referer string `json:"referer"`
  23. Agent string `json:"agent"`
  24. Devicecode string `json:"devicecode"`
  25. Reqalldealtime string `json:"reqalldealtime"`
  26. Upsmresptime string `json:"upsmresptime"`
  27. Upstreamhost string `json:"upstreamhost"`
  28. Url string `json:"url"`
  29. Status string `json:"status"`
  30. Res_hd_traceid string `json:"res_hd_traceid"`
  31. Http_host string `json:"http_host"`
  32. Req_hd_traceid string `json:"req_hd_traceid"`
  33. }
  34. func readFile(file string, ch chan<- *logFormat, startTimeObj time.Time, endTimeObj time.Time) {
  35. fileObj, err := os.Open(file)
  36. defer fileObj.Close()
  37. if err != nil {
  38. fmt.Println("open file err: ", err)
  39. return
  40. }
  41. defer fileObj.Close()
  42. reader := bufio.NewReader(fileObj)
  43. for {
  44. line, err := reader.ReadString('\n')
  45. if err == io.EOF {
  46. close(ch)
  47. break
  48. }
  49. if err != nil {
  50. fmt.Println("Read err: ", err)
  51. break
  52. }
  53. // 反序列化json
  54. l := logFormat{}
  55. json.Unmarshal([]byte(line), &l)
  56. lineTime := parseStr2Time(l.Timestamp)
  57. if isMatched(lineTime, startTimeObj, endTimeObj) {
  58. ch <- &l
  59. }
  60. }
  61. defer wg.Done()
  62. }
  63. func parseStr2Time(lineTime string) time.Time {
  64. s1 := strings.Split(lineTime, "T")
  65. s2 := strings.Split(s1[1], "+")
  66. s3 := fmt.Sprintf("%s %s", s1[0], s2[0])
  67. t, err := time.ParseInLocation("2006-01-02 15:04:05", s3, time.Local)
  68. if err != nil {
  69. fmt.Println("ParseInLocation err: ", err)
  70. }
  71. return t
  72. }
  73. func isMatched(lineTime time.Time, startTimeObj time.Time, endTimeObj time.Time) bool {
  74. if lineTime.Before(endTimeObj) && lineTime.After(startTimeObj) {
  75. return true
  76. }
  77. return false
  78. }
  79. func writeFile(file string, ch <-chan *logFormat) {
  80. defer wg.Done()
  81. fileObj, err := os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
  82. defer fileObj.Close()
  83. if err != nil {
  84. fmt.Println("Open Write File Faild. err: ", err)
  85. }
  86. writer := bufio.NewWriter(fileObj)
  87. for line := range ch {
  88. lineStr, err := json.Marshal(line)
  89. if err != nil {
  90. fmt.Println("Marshal Faild. err: ", err)
  91. }
  92. lineFormat := fmt.Sprintf("%s\n", string(lineStr))
  93. writer.WriteString(lineFormat)
  94. }
  95. writer.Flush()
  96. }
  97. const (
  98. startTimeStr = "2020-07-17 17:30:00"
  99. endTimeStr = "2020-07-17 20:10:00"
  100. srcFile = "/opt/tools/access.log_2020-07-17-1"
  101. dstFile = "/opt/tools/result"
  102. )
  103. var wg sync.WaitGroup
  104. func main() {
  105. ch := make(chan *logFormat, 1000000)
  106. startTimeObj, err := time.ParseInLocation("2006-01-02 15:04:05", startTimeStr, time.Local)
  107. if err != nil {
  108. fmt.Println("startTime Parse err: ", err)
  109. }
  110. endTimeObj, err1 := time.ParseInLocation("2006-01-02 15:04:05", endTimeStr, time.Local)
  111. if err1 != nil {
  112. fmt.Println("endTime Parse err: ", err1)
  113. }
  114. wg.Add(2)
  115. go readFile(srcFile, ch, startTimeObj, endTimeObj)
  116. go writeFile(dstFile, ch)
  117. wg.Wait()
  118. fmt.Println("mession complete.")
  119. }