123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384 |
- #!/usr/bin/env python
- # coding: utf-8
- from elasticsearch import Elasticsearch
- from esdata2mongo import Es2Mongo
- import multiprocessing
- import datetime
- import sys
- import redis
- import time
- import logging
# settings
# Host and port of the Redis server that progress2log() connects to when
# polling per-index status.
redis_ip = "127.0.0.1"
redis_port = 6379
def index_list(some_days_ago):
    '''
    Build the 24 hourly index names for a single calendar day.

    some_days_ago:
        number of days before today (0 means today).

    Returns a list of 24 strings shaped like
    "logstash-trace-YYYY.MM.DD.HH", ordered from hour 00 to hour 23.
    '''
    # Resolve the target date exactly once so every name refers to the same
    # day, even if the call happens to straddle midnight.
    target_day = datetime.datetime.now() - datetime.timedelta(days=some_days_ago)
    day_label = target_day.strftime("%Y.%m.%d")
    return ["%s-%s.%02d" % ("logstash-trace", day_label, hour)
            for hour in range(24)]
def logsetting(logname):
    '''
    Return the module logger, writing INFO records to "<logname>.log".

    logname:
        path of the log file without the ".log" suffix.

    The logger is the one named after this module, so every call returns
    the same object. A file handler for the requested path is attached only
    once; calling this function again with the same logname does not stack
    duplicate handlers (which would write every record several times).
    '''
    import os  # local import: only needed here to normalise the log path

    logger = logging.getLogger(__name__)
    logger.setLevel(level=logging.INFO)

    # FileHandler stores the absolute path in baseFilename, so compare
    # against the absolute form of the requested file.
    log_path = os.path.abspath(logname + '.log')
    for existing in logger.handlers:
        if getattr(existing, 'baseFilename', None) == log_path:
            return logger

    handler = logging.FileHandler(log_path)
    handler.setLevel(logging.INFO)
    formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    return logger
def progress2log(some_days_ago):
    '''
    Poll Redis for the status of every hourly index of one day and log it.

    some_days_ago:
        number of days before today, forwarded to index_list().

    On each pass every index that has not yet been reported complete is
    looked up with HGET, using the key/field pair returned by
    Es2Mongo(index, 2000).FormatDBName(). A stored value of 1 marks the
    index as complete; any other value is logged as-is. Passes are
    separated by a 30 second sleep.

    Returns 1 once every index has been reported complete.
    '''
    indices = index_list(some_days_ago)
    # Log file is named after the day: the first index name with its hour
    # suffix dropped and the remaining dots replaced by dashes.
    logname = '-'.join(indices[0].split('.')[:-1])
    logger = logsetting(logname)
    r = redis.StrictRedis(host=redis_ip, port=redis_port, db=0)

    # HGET replies arrive as str/bytes unless the client is configured to
    # convert them, so accept the textual forms of 1 as well as the int.
    done_markers = (1, "1", b"1")

    # Track finished indices by name so an index that is already complete
    # is not counted again on a later polling pass.
    finished = set()
    while True:
        for index in indices:
            if index in finished:
                continue
            key, field = Es2Mongo(index, 2000).FormatDBName()
            value = r.hget(key, field)
            if value in done_markers:
                logger.info("%s %s", index, "Data Processing Complete.")
                finished.add(index)
            else:
                logger.info("%s %s", index, value)
        if len(finished) == len(indices):
            return 1
        time.sleep(30)
def run(some_days_ago):
    '''
    Start the transfer workers and the progress logger for one day.

    some_days_ago:
        number of days before today, forwarded to index_list().

    One process is created per hourly index, each targeting
    Es2Mongo(index, 2000).es2mongo with the single argument 1, followed by
    one extra process running progress2log(). All processes are created
    first and then started in that order; the function does not wait for
    them.
    '''
    workers = [
        multiprocessing.Process(
            target=Es2Mongo(name, 2000).es2mongo, args=(1,))
        for name in index_list(some_days_ago)
    ]
    # The progress logger goes last so it is started after every worker.
    workers.append(multiprocessing.Process(
        target=progress2log, args=(some_days_ago,)))

    for worker in workers:
        worker.start()
    print("all job are running...")
# Script entry point: the first command-line argument is the number of days
# before today whose indices should be processed.
if __name__ == "__main__":
    if not sys.argv[1:]:
        print("you need to specify parameters.")
        sys.exit(1)
    some_days_ago = int(sys.argv[1])
    run(some_days_ago)
|