123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129 |
- package main
- import (
- "bufio"
- "encoding/json"
- "fmt"
- "io"
- "os"
- "strings"
- "sync"
- "time"
- )
- type logFormat struct {
- Timestamp string `json:"timestamp"`
- Slbip string `json:"slbip"`
- Clientip string `json:"clientip"`
- Serverip string `json:"serverip"`
- Size int `json:"size"`
- Method string `json:"method"`
- Requesturi string `json:"requesturi"`
- Request_body string `json:"request_body"`
- Appversion string `json:"appversion"`
- Referer string `json:"referer"`
- Agent string `json:"agent"`
- Devicecode string `json:"devicecode"`
- Reqalldealtime string `json:"reqalldealtime"`
- Upsmresptime string `json:"upsmresptime"`
- Upstreamhost string `json:"upstreamhost"`
- Url string `json:"url"`
- Status string `json:"status"`
- Res_hd_traceid string `json:"res_hd_traceid"`
- Http_host string `json:"http_host"`
- Req_hd_traceid string `json:"req_hd_traceid"`
- }
- func readFile(file string, ch chan<- *logFormat, startTimeObj time.Time, endTimeObj time.Time) {
- fileObj, err := os.Open(file)
- defer fileObj.Close()
- if err != nil {
- fmt.Println("open file err: ", err)
- return
- }
- defer fileObj.Close()
- reader := bufio.NewReader(fileObj)
- for {
- line, err := reader.ReadString('\n')
- if err == io.EOF {
- close(ch)
- break
- }
- if err != nil {
- fmt.Println("Read err: ", err)
- break
- }
- // 反序列化json
- l := logFormat{}
- json.Unmarshal([]byte(line), &l)
- lineTime := parseStr2Time(l.Timestamp)
- if isMatched(lineTime, startTimeObj, endTimeObj) {
- ch <- &l
- }
- }
- defer wg.Done()
- }
- func parseStr2Time(lineTime string) time.Time {
- s1 := strings.Split(lineTime, "T")
- s2 := strings.Split(s1[1], "+")
- s3 := fmt.Sprintf("%s %s", s1[0], s2[0])
- t, err := time.ParseInLocation("2006-01-02 15:04:05", s3, time.Local)
- if err != nil {
- fmt.Println("ParseInLocation err: ", err)
- }
- return t
- }
- func isMatched(lineTime time.Time, startTimeObj time.Time, endTimeObj time.Time) bool {
- if lineTime.Before(endTimeObj) && lineTime.After(startTimeObj) {
- return true
- }
- return false
- }
- func writeFile(file string, ch <-chan *logFormat) {
- defer wg.Done()
- fileObj, err := os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
- defer fileObj.Close()
- if err != nil {
- fmt.Println("Open Write File Faild. err: ", err)
- }
- writer := bufio.NewWriter(fileObj)
- for line := range ch {
- lineStr, err := json.Marshal(line)
- if err != nil {
- fmt.Println("Marshal Faild. err: ", err)
- }
- lineFormat := fmt.Sprintf("%s\n", string(lineStr))
- writer.WriteString(lineFormat)
- }
- writer.Flush()
- }
- const (
- startTimeStr = "2020-07-17 17:30:00"
- endTimeStr = "2020-07-17 20:10:00"
- srcFile = "/opt/tools/access.log_2020-07-17-1"
- dstFile = "/opt/tools/result"
- )
- var wg sync.WaitGroup
- func main() {
- ch := make(chan *logFormat, 1000000)
- startTimeObj, err := time.ParseInLocation("2006-01-02 15:04:05", startTimeStr, time.Local)
- if err != nil {
- fmt.Println("startTime Parse err: ", err)
- }
- endTimeObj, err1 := time.ParseInLocation("2006-01-02 15:04:05", endTimeStr, time.Local)
- if err1 != nil {
- fmt.Println("endTime Parse err: ", err1)
- }
- wg.Add(2)
- go readFile(srcFile, ch, startTimeObj, endTimeObj)
- go writeFile(dstFile, ch)
- wg.Wait()
- fmt.Println("mession complete.")
- }
|