transdata.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. #!/usr/bin/env python
  2. # coding: utf-8
  3. from elasticsearch import Elasticsearch
  4. from esdata2mongo import Es2Mongo
  5. import multiprocessing
  6. import datetime
  7. import sys
  8. import redis
  9. import time
  10. import logging
  # Settings: the Redis server that progress2log polls for per-index progress.
  redis_ip, redis_port = "127.0.0.1", 6379
  def index_list(some_days_ago, prefix="logstash-trace", now=None):
      '''
      Build the 24 hourly Elasticsearch index names for one day.

      some_days_ago:
          number of days before today (0 = today, 1 = yesterday, ...).
      prefix:
          index name prefix; defaults to "logstash-trace".
      now:
          reference datetime; defaults to datetime.datetime.now().

      Returns a list of 24 names of the form "<prefix>-YYYY.MM.DD.HH",
      with HH running from 00 to 23.
      '''
      if now is None:
          now = datetime.datetime.now()
      # Resolve the target day exactly once. Calling now() for every hour
      # could straddle midnight and mix two dates into one list.
      day = (now - datetime.timedelta(days=some_days_ago)).strftime("%Y.%m.%d")
      return ["%s-%s.%02d" % (prefix, day, hour) for hour in range(24)]
  def logsetting(logname):
      '''
      Return this module's logger with an INFO-level file handler attached.

      logname:
          path prefix of the log file; records are written to "<logname>.log"
          as "asctime - levelname - message".

      Calling this again with the same logname reuses the existing handler
      instead of attaching a second one, which would duplicate every line.
      '''
      import os

      logger = logging.getLogger(__name__)
      logger.setLevel(level=logging.INFO)
      logfile = os.path.abspath(logname + '.log')
      for existing in logger.handlers:
          # FileHandler records the absolute path it writes to in baseFilename.
          if getattr(existing, 'baseFilename', None) == logfile:
              return logger
      handler = logging.FileHandler(logfile)
      handler.setLevel(logging.INFO)
      handler.setFormatter(
          logging.Formatter('%(asctime)s - %(levelname)s - %(message)s'))
      logger.addHandler(handler)
      return logger
  def progress2log(some_days_ago, poll_interval=30):
      '''
      Poll redis until every hourly index of the day is marked complete,
      logging each still-pending index's progress value on every pass.

      some_days_ago:
          which day to watch, with the same meaning as in index_list().
      poll_interval:
          seconds to sleep between passes (default 30).

      The log file is "<prefix>-YYYY-MM-DD.log", derived from the first index.
      Returns 1 once all indices are complete.
      '''
      indices = index_list(some_days_ago)
      logname = '-'.join(indices[0].split('.')[:-1])
      logger = logsetting(logname)
      r = redis.StrictRedis(host=redis_ip, port=redis_port, db=0)

      # Resolve each index's redis (key, field) once rather than building an
      # Es2Mongo on every poll. This assumes FormatDBName() is stable for a
      # given index (TODO confirm).
      pending = []
      for index in indices:
          key, field = Es2Mongo(index, 2000).FormatDBName()
          pending.append((index, key, field))

      while pending:
          still_running = []
          for index, key, field in pending:
              value = r.hget(key, field)
              # hget returns str (py2) / bytes (py3), never int, so the
              # original `value == 1` alone could never match; accept the
              # text forms too. Completed indices are dropped so they are
              # counted only once.
              if value == 1 or value in ('1', b'1'):
                  logger.info("%s %s", index, "Data Processing Complete.")
              else:
                  logger.info("%s %s", index, value)
                  still_running.append((index, key, field))
          pending = still_running
          if pending:
              time.sleep(poll_interval)
      return 1
  def run(some_days_ago):
      '''
      Launch one Es2Mongo(index, 2000).es2mongo(1) process per hourly index
      of the chosen day, plus one progress2log monitor process. All processes
      are created first and then started in order; this returns without
      joining them.
      '''
      workers = [
          multiprocessing.Process(
              target=Es2Mongo(idx, 2000).es2mongo, args=(1,))
          for idx in index_list(some_days_ago)
      ]
      monitor = multiprocessing.Process(
          target=progress2log, args=(some_days_ago,))
      workers.append(monitor)
      for worker in workers:
          worker.start()
      print("all job are running...")
  if __name__ == "__main__":
      # Usage: transdata.py SOME_DAYS_AGO   (0 = today, 1 = yesterday, ...)
      if len(sys.argv) < 2:
          print("you need to specify parameters.")
          sys.exit(1)
      try:
          some_days_ago = int(sys.argv[1])
      except ValueError:
          some_days_ago = -1
      if some_days_ago < 0:
          # Report bad input cleanly instead of raising a traceback
          # (non-integer) or naming future days' indices (negative).
          sys.exit("some_days_ago must be a non-negative integer, got %r"
                   % sys.argv[1])
      run(some_days_ago)