123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- #!/usr/bin/env python
- # coding: utf-8
- import time
- from datetime import datetime
- from elasticsearch import Elasticsearch
- from pprint import pprint
- import pymongo
- import redis
- import logging
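- # # Timeouts are configured by passing the `timeout` flag when the client object is created; if the client was already created without a timeout, a per-request timeout can be set by passing the `request_timeout` flag in the query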
- # # curl -XPOST http://example.comr:9200/my_index/_close
- # # curl -XPOST http://example.comr:9200/my_index/_open
- # # logstash-trace-2019.06.10.08
- # res = es.search(index='logstash-trace-2019.06.10.08', scroll = '2m', size = 5) # scroll = '2m' keeps the cursor alive for 2 minutes
class Es2Mongo(object):
    """Scroll through one Elasticsearch index and report progress to Redis.

    Each page of hits is fetched with the scroll API and normalised with
    ``FormatJson``.  Progress lines and a final completion marker are written
    to a Redis hash whose key/field are derived from the index name.

    The MongoDB insert call is currently disabled (commented out) in
    ``es2mongo``, so documents are fetched and formatted but not persisted.
    """

    def __init__(self, index_name, entry_num):
        """Create the Elasticsearch, MongoDB and Redis clients.

        Args:
            index_name: Name of the Elasticsearch index to read.  It is split
                on ``.`` to build the MongoDB/Redis names, e.g.
                ``logstash-trace-2019.06.10.11``.
            entry_num: Page size requested from Elasticsearch.
        """
        # NOTE(security): the host and the credentials below are hard-coded in
        # source.  They are kept unchanged so existing callers keep working,
        # but they should be moved to configuration / a secret store.
        self.es = Elasticsearch(
            ['172.29.8.37'],
            http_auth=('admin', 'openet123qwe'),
            scheme="http",
            port=9200,
            timeout=60
        )
        self.mongo_client = pymongo.MongoClient('mongodb://localhost:27017/')
        self.redis_conn = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
        self.index_name = index_name
        self.entry_num = entry_num
        #self.logger = logsetting(self.index_name)

    def DataWithScroll(self, entry_num, scroll='2h', *scroll_id):
        """Fetch one page of results, starting or continuing a scroll.

        Args:
            entry_num: Page size for the initial search.  It is not sent with
                follow-up scroll requests.
            scroll: Keep-alive value passed to Elasticsearch as ``scroll``.
            *scroll_id: Optional.  When given, the first value is the scroll
                id to continue; any further values are ignored.

        Returns:
            The response object returned by the Elasticsearch client.
        """
        if scroll_id:
            return self.es.scroll(scroll_id=scroll_id[0], scroll=scroll)
        return self.es.search(index=self.index_name,
                              scroll=scroll, size=entry_num)

    def FormatJson(self, json_data):
        """Return the ``_source`` of a hit with a truthy ``tags`` stringified.

        The ``_source`` dict is modified in place and returned.

        Raises:
            KeyError: If ``json_data`` has no ``_source`` key.
        """
        _source = json_data['_source']
        if _source.get('tags'):
            _source['tags'] = str(_source['tags'])
        return _source

    def FormatDBName(self):
        """Derive two names from the index name by replacing ``.`` with ``-``.

        Returns:
            A ``(date_name, echo_hour_name)`` tuple.  ``date_name`` omits the
            last dot-separated component of the index name; ``echo_hour_name``
            keeps all components.
        """
        parts = self.index_name.split('.')
        return '-'.join(parts[:-1]), '-'.join(parts)

    def Log2Redis(self, value):
        """Store ``value`` in the Redis hash named by ``FormatDBName``."""
        key, field = self.FormatDBName()
        self.redis_conn.hset(key, field, value)

    def InsertIntoMongo(self, josn_data):
        """Insert one document into the MongoDB collection for this index.

        The database and collection names come from ``FormatDBName``.

        Args:
            josn_data: The document to insert.  The parameter name is kept
                as-is for backward compatibility.
        """
        dbname, collectionname = self.FormatDBName()
        collection = self.mongo_client[dbname][collectionname]
        # NOTE(review): this only sets a Python attribute on the collection
        # object; it does not look like it changes any server-side option.
        # Confirm and remove if it is indeed a no-op.
        collection.autoIndexId = False
        collection.insert_one(josn_data)

    @staticmethod
    def _total_hits(response):
        """Return ``hits.total`` as a number for both response shapes.

        Older Elasticsearch versions return a plain integer; 7.x and later
        return an object such as ``{"value": 42, "relation": "eq"}``.
        """
        total = response['hits']['total']
        if isinstance(total, dict):
            return total.get('value', 0)
        return total

    def _format_hits(self, hits):
        """Normalise every hit in ``hits`` and return how many were handled."""
        for hit in hits:
            doc = self.FormatJson(hit)
            # Persisting to MongoDB is disabled, as in the original code.
            #self.InsertIntoMongo(doc)
        return len(hits)

    def _report_progress(self, size_all, n, elapsed, batch_docs, batch_secs):
        """Write one human-readable progress line to Redis.

        Args:
            size_all: Total number of documents reported by Elasticsearch.
            n: Number of documents processed so far.
            elapsed: Seconds since the transfer started.
            batch_docs: Documents processed since the previous report.
            batch_secs: Seconds since the previous report (must be non-zero).
        """
        # An empty index has nothing left to do; avoid dividing by zero.
        percent = float(n) * 100 / float(size_all) if size_all else 100.0
        self.Log2Redis(
            "%s total:%s. left:%s. time cost:%ss. query per second:%s, completed percent:%.3f%%" % (
                datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S"),
                size_all, size_all - n, elapsed,
                batch_docs / batch_secs, percent))

    def _clear_scroll(self, scroll_id):
        """Best-effort release of the server-side scroll context."""
        try:
            self.es.clear_scroll(scroll_id=scroll_id)
        except Exception:
            # Cleanup must never hide the outcome of the transfer itself.
            logging.getLogger(__name__).warning(
                "could not clear scroll %s", scroll_id, exc_info=True)

    def es2mongo(self, all):
        """Read the index page by page and record progress in Redis.

        The first page is always fetched and formatted.  When ``all`` is
        truthy, scrolling continues until Elasticsearch returns an empty page;
        a progress line is written to Redis at most every ~10 seconds (or on a
        minute boundary once more than one second has passed), and the value
        ``1`` is written when the scroll is exhausted.  When ``all`` is falsy
        nothing is written to Redis.

        Args:
            all: Whether to scroll through the whole index.  The name shadows
                the builtin but is kept because callers pass it by keyword.
        """
        datas = self.DataWithScroll(self.entry_num)
        scroll_id = datas['_scroll_id']
        size_all = self._total_hits(datas)
        try:
            # Count what was actually returned, not the requested page size,
            # so "left" and the percentage stay accurate on short pages.
            n = self._format_hits(datas['hits']['hits'])
            start_time = int(time.time())
            # Measure the first interval from the start of the run, so the
            # first reported rate is meaningful.
            last_mess = start_time
            last_data_count = n
            if all:
                while True:
                    datas = self.DataWithScroll(self.entry_num, '2h', scroll_id)
                    scroll_id = datas['_scroll_id']
                    hits = datas['hits']['hits']
                    if not hits:
                        break
                    n += self._format_hits(hits)
                    unix_timestamp = int(time.time())
                    since_last = unix_timestamp - last_mess
                    # since_last is > 1 or > 10 here, so it is never zero.
                    if (unix_timestamp % 60 == 0 and since_last > 1) or since_last > 10:
                        self._report_progress(
                            size_all, n, unix_timestamp - start_time,
                            n - last_data_count, since_last)
                        last_mess = unix_timestamp
                        last_data_count = n
                # Completion marker, written once the scroll is exhausted.
                self.Log2Redis(1)
        finally:
            # The scroll was opened with a 2h keep-alive; release it now
            # instead of letting it linger on the cluster.
            self._clear_scroll(scroll_id)
- # 5-570 1000-2600 2000-3000
- #Es2Mongo('logstash-trace-2019.06.10.11', 2000).es2mongo(all=True)
|