package main import ( "bufio" "encoding/json" "fmt" "io" "os" "strings" "sync" "time" ) type logFormat struct { Timestamp string `json:"timestamp"` Slbip string `json:"slbip"` Clientip string `json:"clientip"` Serverip string `json:"serverip"` Size int `json:"size"` Method string `json:"method"` Requesturi string `json:"requesturi"` Request_body string `json:"request_body"` Appversion string `json:"appversion"` Referer string `json:"referer"` Agent string `json:"agent"` Devicecode string `json:"devicecode"` Reqalldealtime string `json:"reqalldealtime"` Upsmresptime string `json:"upsmresptime"` Upstreamhost string `json:"upstreamhost"` Url string `json:"url"` Status string `json:"status"` Res_hd_traceid string `json:"res_hd_traceid"` Http_host string `json:"http_host"` Req_hd_traceid string `json:"req_hd_traceid"` } func readFile(file string, ch chan<- *logFormat, startTimeObj time.Time, endTimeObj time.Time) { fileObj, err := os.Open(file) defer fileObj.Close() if err != nil { fmt.Println("open file err: ", err) return } defer fileObj.Close() reader := bufio.NewReader(fileObj) for { line, err := reader.ReadString('\n') if err == io.EOF { close(ch) break } if err != nil { fmt.Println("Read err: ", err) break } // 反序列化json l := logFormat{} json.Unmarshal([]byte(line), &l) lineTime := parseStr2Time(l.Timestamp) if isMatched(lineTime, startTimeObj, endTimeObj) { ch <- &l } } defer wg.Done() } func parseStr2Time(lineTime string) time.Time { s1 := strings.Split(lineTime, "T") s2 := strings.Split(s1[1], "+") s3 := fmt.Sprintf("%s %s", s1[0], s2[0]) t, err := time.ParseInLocation("2006-01-02 15:04:05", s3, time.Local) if err != nil { fmt.Println("ParseInLocation err: ", err) } return t } func isMatched(lineTime time.Time, startTimeObj time.Time, endTimeObj time.Time) bool { if lineTime.Before(endTimeObj) && lineTime.After(startTimeObj) { return true } return false } func writeFile(file string, ch <-chan *logFormat) { defer wg.Done() fileObj, err := os.OpenFile(file, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644) defer fileObj.Close() if err != nil { fmt.Println("Open Write File Faild. err: ", err) } writer := bufio.NewWriter(fileObj) for line := range ch { lineStr, err := json.Marshal(line) if err != nil { fmt.Println("Marshal Faild. err: ", err) } lineFormat := fmt.Sprintf("%s\n", string(lineStr)) writer.WriteString(lineFormat) } writer.Flush() } const ( startTimeStr = "2020-07-17 17:30:00" endTimeStr = "2020-07-17 20:10:00" srcFile = "/opt/tools/access.log_2020-07-17-1" dstFile = "/opt/tools/result" ) var wg sync.WaitGroup func main() { ch := make(chan *logFormat, 1000000) startTimeObj, err := time.ParseInLocation("2006-01-02 15:04:05", startTimeStr, time.Local) if err != nil { fmt.Println("startTime Parse err: ", err) } endTimeObj, err1 := time.ParseInLocation("2006-01-02 15:04:05", endTimeStr, time.Local) if err1 != nil { fmt.Println("endTime Parse err: ", err1) } wg.Add(2) go readFile(srcFile, ch, startTimeObj, endTimeObj) go writeFile(dstFile, ch) wg.Wait() fmt.Println("mession complete.") }