esdata2mongo.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. #!/usr/bin/env python
  2. # coding: utf-8
  3. import time
  4. from datetime import datetime
  5. from elasticsearch import Elasticsearch
  6. from pprint import pprint
  7. import pymongo
  8. import redis
  9. import logging
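  10. # # The timeout is configured by passing the `timeout` flag when the client object is created; if the object was already created without a timeout, a per-request timeout can be set by passing the `request_timeout` flag in the query.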
  11. # # curl -XPOST http://example.comr:9200/my_index/_close
  12. # # curl -XPOST http://example.comr:9200/my_index/_open
  13. # # logstash-trace-2019.06.10.08
  14. # res = es.search(index='logstash-trace-2019.06.10.08', scroll = '2m', size = 5) # scroll = '2m' keeps the cursor alive for 2 minutes
  15. class Es2Mongo(object):
  16. def __init__(self, index_name, entry_num):
  17. self.es = Elasticsearch(
  18. ['172.29.8.37'],
  19. http_auth=('admin', 'openet123qwe'),
  20. scheme="http",
  21. port=9200,
  22. timeout=60
  23. )
  24. self.mongo_client = pymongo.MongoClient('mongodb://localhost:27017/')
  25. self.redis_conn = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
  26. self.index_name = index_name
  27. self.entry_num = entry_num
  28. #self.logger = logsetting(self.index_name)
  29. def DataWithScroll(self, entry_num, scroll='2h', *scroll_id):
  30. if scroll_id:
  31. scroll_id = scroll_id[0]
  32. res = self.es.scroll(scroll_id=scroll_id, scroll=scroll)
  33. else:
  34. res = self.es.search(index=self.index_name,
  35. scroll=scroll, size=entry_num)
  36. return res
  37. def FormatJson(self, json_data):
  38. _source = json_data['_source']
  39. if _source.get('tags'):
  40. _source['tags'] = str(_source['tags'])
  41. return _source
  42. def FormatDBName(self):
  43. date_name = '-'.join(self.index_name.split('.')[:-1])
  44. echo_hour_name = '-'.join(self.index_name.split('.'))
  45. return date_name, echo_hour_name
  46. def Log2Redis(self, value):
  47. key, field = self.FormatDBName()
  48. self.redis_conn.hset(key, field, value)
  49. def InsertIntoMongo(self, josn_data):
  50. dbname, collectionname = self.FormatDBName()
  51. db = self.mongo_client[dbname]
  52. collection = db[collectionname]
  53. collection.autoIndexId = False
  54. collection.insert_one(josn_data)
  55. def es2mongo(self, all):
  56. datas = self.DataWithScroll(self.entry_num)
  57. scroll_id = datas['_scroll_id']
  58. size_all = datas['hits']['total']
  59. scroll_size = 1
  60. for data in datas['hits']['hits']:
  61. data = self.FormatJson(data)
  62. #self.InsertIntoMongo(data)
  63. n = self.entry_num
  64. last_mess = 0
  65. last_data_count = n
  66. start_time = int(time.time())
  67. if all:
  68. while (scroll_size > 0):
  69. datas = self.DataWithScroll(self.entry_num, '2h', scroll_id)
  70. scroll_id = datas['_scroll_id']
  71. scroll_size = len(datas['hits']['hits'])
  72. n += self.entry_num
  73. unix_timestamp = int(time.time())
  74. if not unix_timestamp % 60 and (unix_timestamp - last_mess) > 1 or (unix_timestamp - last_mess) > 10:
  75. # print("%s total:%s. left:%s. time cost:%ss. query per second:%s, completed percent:%.3f%%" % (datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S"),
  76. # size_all, size_all - n, unix_timestamp - start_time, (n - last_data_count) / (unix_timestamp - last_mess), float(n) * 100 / float(size_all)))
  77. # self.logger.info("total:%s. left:%s. time cost:%ss. query per second:%s, completed percent:%.3f%%" % (size_all, size_all - n, unix_timestamp - start_time, (n - last_data_count) / (unix_timestamp - last_mess), float(n) * 100 / float(size_all)))
  78. self.Log2Redis("%s total:%s. left:%s. time cost:%ss. query per second:%s, completed percent:%.3f%%" % (datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S"),
  79. size_all, size_all - n, unix_timestamp - start_time, (n - last_data_count) / (unix_timestamp - last_mess), float(n) * 100 / float(size_all)))
  80. last_mess = unix_timestamp
  81. last_data_count = n
  82. for data in datas['hits']['hits']:
  83. data = self.FormatJson(data)
  84. #self.InsertIntoMongo(data)
  85. else:
  86. # print("%s time total:%s. left:%s. time cost:%ss. completed percent:%100" % (datetime.strftime(
  87. # datetime.now(), "%Y-%m-%d %H:%M:%S"), size_all, 0, unix_timestamp - start_time))
  88. #self.logger.info("%s time total:%s. left:%s. time cost:%ss. completed percent:%100" % (
  89. # size_all, 0, unix_timestamp - start_time))
  90. self.Log2Redis(1)
  91. pass
  92. # 5-570 1000-2600 2000-3000
  93. #Es2Mongo('logstash-trace-2019.06.10.11', 2000).es2mongo(all=True)