#!/usr/bin/env python
# coding: utf-8
"""Launch one worker process per hourly 'logstash-trace' index to copy a
day's data from Elasticsearch to MongoDB, plus one monitor process that
polls redis and logs per-index progress until all 24 hours complete."""
from elasticsearch import Elasticsearch
from esdata2mongo import Es2Mongo
import multiprocessing
import datetime
import sys
import redis
import time
import logging

# settings
redis_ip = "127.0.0.1"
redis_port = 6379


def index_list(some_days_ago):
    """Return the 24 hourly index names for the day `some_days_ago` days
    before today, e.g. 'logstash-trace-2020.01.31.07'.

    some_days_ago: number of days before today (int, 0 = today).
    """
    day = (datetime.datetime.now()
           - datetime.timedelta(days=some_days_ago)).strftime("%Y.%m.%d")
    return ["%s-%s.%02d" % ("logstash-trace", day, hour)
            for hour in range(0, 24)]


def logsetting(logname):
    """Configure and return the module logger, writing INFO records to
    '<logname>.log' with a timestamped format.

    Guarded so repeated calls in one process do not attach duplicate
    handlers (the original re-attached a FileHandler on every call,
    which duplicates every log line).
    """
    logger = logging.getLogger(__name__)
    logger.setLevel(level=logging.INFO)
    if not logger.handlers:
        handler = logging.FileHandler(logname + '.log')
        handler.setLevel(logging.INFO)
        formatter = logging.Formatter(
            '%(asctime)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger


def progress2log(some_days_ago):
    """Poll redis every 30 seconds, logging each index's progress value,
    and return 1 once all 24 hourly indices report completion.

    some_days_ago: number of days before today (int); selects which
    day's indices to monitor.
    """
    indices = index_list(some_days_ago)
    # log file name: index name with the trailing hour component dropped
    logname = '-'.join(indices[0].split('.')[:-1])
    logger = logsetting(logname)
    r = redis.StrictRedis(host=redis_ip, port=redis_port, db=0)
    while True:
        # BUG FIX: the completion counter must restart at 0 on every
        # polling pass. The original accumulated `n` across passes, so
        # it could report completion while indices were still running,
        # or step past 24 and spin forever in `while n != 24`.
        done = 0
        for index in indices:
            key, field = Es2Mongo(index, 2000).FormatDBName()
            value = r.hget(key, field)
            # BUG FIX: redis hget returns str/bytes (or None), never the
            # int 1, so the original `value == 1` could never match and
            # this monitor never terminated.
            if value in (1, '1', b'1'):
                logger.info("%s %s" % (index, "Data Processing Complete."))
                done += 1
            else:
                logger.info("%s %s" % (index, value))
        if done == len(indices):
            return 1
        time.sleep(30)


def run(some_days_ago):
    """Start one Es2Mongo worker process per hourly index plus the
    progress monitor, then return without joining (fire-and-forget).

    some_days_ago: number of days before today (int), forwarded to
    index_list()/progress2log().
    """
    plist = []
    for index in index_list(some_days_ago):
        worker = Es2Mongo(index, 2000)
        plist.append(multiprocessing.Process(target=worker.es2mongo,
                                             args=(1,)))
    plist.append(multiprocessing.Process(target=progress2log,
                                         args=(some_days_ago,)))
    for p in plist:
        p.start()
    print("all job are running...")


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("you need to specify parameters.")
        sys.exit(1)
    some_days_ago = int(sys.argv[1])
    run(some_days_ago)