#!/usr/bin/env python
# coding: utf-8
"""Copy documents out of an Elasticsearch index (via the scroll API) toward MongoDB,
reporting migration progress into a Redis hash.

Notes (from original header):
- A timeout can be set when the client object is created; for an already-created
  client, a per-request timeout can be set with the ``request_timeout`` flag.
- curl -XPOST http://example.comr:9200/my_index/_close
- curl -XPOST http://example.comr:9200/my_index/_open
- Example index name: logstash-trace-2019.06.10.08
- res = es.search(index='logstash-trace-2019.06.10.08', scroll='2m', size=5)
  (scroll='2m' keeps the scroll cursor alive for 2 minutes)
"""

import logging
import time
from datetime import datetime
from pprint import pprint

from elasticsearch import Elasticsearch
import pymongo
import redis


class Es2Mongo(object):
    """Scroll an ES index page by page; format each hit and (optionally)
    insert it into MongoDB, while logging progress to Redis."""

    def __init__(self, index_name, entry_num):
        """
        :param index_name: ES index to read, e.g. 'logstash-trace-2019.06.10.11'
        :param entry_num:  page size requested per scroll round-trip
        """
        # SECURITY NOTE(review): credentials/hosts are hard-coded in source;
        # they should be moved to configuration or environment variables.
        self.es = Elasticsearch(
            ['172.29.8.37'],
            http_auth=('admin', 'openet123qwe'),
            scheme="http",
            port=9200,
            timeout=60,
        )
        self.mongo_client = pymongo.MongoClient('mongodb://localhost:27017/')
        self.redis_conn = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
        self.index_name = index_name
        self.entry_num = entry_num
        # self.logger = logsetting(self.index_name)

    def DataWithScroll(self, entry_num, scroll='2h', scroll_id=None):
        """Fetch one page of results.

        First call (no scroll_id): issue a search that opens a scroll cursor.
        Subsequent calls: continue the existing cursor.

        :param entry_num: page size for the initial search
        :param scroll:    how long ES keeps the cursor alive between calls
        :param scroll_id: cursor id from the previous response, or None
        :return: raw ES response dict
        """
        # Was ``*scroll_id`` (varargs abused as an optional arg); a plain
        # default keeps the same positional call sites working.
        if scroll_id:
            res = self.es.scroll(scroll_id=scroll_id, scroll=scroll)
        else:
            res = self.es.search(index=self.index_name, scroll=scroll, size=entry_num)
        return res

    def FormatJson(self, json_data):
        """Extract ``_source`` from a hit; stringify the 'tags' field if present
        (avoids storing a raw list where a string is expected downstream)."""
        _source = json_data['_source']
        if _source.get('tags'):
            _source['tags'] = str(_source['tags'])
        return _source

    def FormatDBName(self):
        """Derive (db_name, collection_name) from the index name.

        'logstash-trace-2019.06.10.08' ->
            date_name:      'logstash-trace-2019-06-10'   (drops the hour)
            echo_hour_name: 'logstash-trace-2019-06-10-08'
        """
        date_name = '-'.join(self.index_name.split('.')[:-1])
        echo_hour_name = '-'.join(self.index_name.split('.'))
        return date_name, echo_hour_name

    def Log2Redis(self, value):
        """Write a progress value into a Redis hash keyed by day, field per hour."""
        key, field = self.FormatDBName()
        self.redis_conn.hset(key, field, value)

    def InsertIntoMongo(self, json_data):
        """Insert one formatted document into the per-hour Mongo collection."""
        # Fixed parameter typo: was ``josn_data`` (all call sites are positional).
        dbname, collectionname = self.FormatDBName()
        db = self.mongo_client[dbname]
        collection = db[collectionname]
        # NOTE(review): setting an attribute on a pymongo Collection is a no-op;
        # this does not disable _id indexing. Kept for fidelity — confirm intent.
        collection.autoIndexId = False
        collection.insert_one(json_data)

    def es2mongo(self, all):
        """Drain the index through the scroll cursor.

        :param all: truthy -> keep scrolling until the cursor is exhausted;
                    falsy  -> process only the first page.
                    (Name shadows the builtin but is kept for caller
                    compatibility — the documented invocation uses ``all=True``.)
        """
        datas = self.DataWithScroll(self.entry_num)
        scroll_id = datas['_scroll_id']
        # NOTE(review): assumes ES 6.x where hits.total is an int; on ES >= 7
        # it is a dict ({'value': ..., 'relation': ...}) — confirm server version.
        size_all = datas['hits']['total']
        scroll_size = 1
        for data in datas['hits']['hits']:
            data = self.FormatJson(data)
            # self.InsertIntoMongo(data)
        # Count documents actually returned (was ``n = self.entry_num`` /
        # ``n += self.entry_num``, which over-counted short pages so "left"
        # could go negative and the percentage could exceed 100).
        n = len(datas['hits']['hits'])
        last_mess = 0            # unix time of the last progress report
        last_data_count = n      # doc count at the last progress report
        start_time = int(time.time())
        if all:
            while scroll_size > 0:
                datas = self.DataWithScroll(self.entry_num, '2h', scroll_id)
                scroll_id = datas['_scroll_id']
                scroll_size = len(datas['hits']['hits'])
                n += scroll_size
                unix_timestamp = int(time.time())
                # Throttle reporting: at most once per 10s, or on a minute
                # boundary if at least 1s has passed (parentheses make the
                # original and/or precedence explicit; the >1 guard also keeps
                # the rate divisor below non-zero).
                if (not unix_timestamp % 60 and (unix_timestamp - last_mess) > 1) \
                        or (unix_timestamp - last_mess) > 10:
                    # print("%s total:%s. left:%s. time cost:%ss. query per second:%s, completed percent:%.3f%%" % (datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S"),
                    #     size_all, size_all - n, unix_timestamp - start_time, (n - last_data_count) / (unix_timestamp - last_mess), float(n) * 100 / float(size_all)))
                    # self.logger.info(...)
                    self.Log2Redis("%s total:%s. left:%s. time cost:%ss. query per second:%s, completed percent:%.3f%%" % (
                        datetime.strftime(datetime.now(), "%Y-%m-%d %H:%M:%S"),
                        size_all, size_all - n, unix_timestamp - start_time,
                        (n - last_data_count) / (unix_timestamp - last_mess),
                        float(n) * 100 / float(size_all)))
                    last_mess = unix_timestamp
                    last_data_count = n
                for data in datas['hits']['hits']:
                    data = self.FormatJson(data)
                    # self.InsertIntoMongo(data)
            # Cursor drained: mark this hour's migration as finished in Redis.
            # (Reconstructed from a flattened while/else; the commented-out
            # completion print used loop-local variables, so this runs after
            # the scroll loop — confirm against the original layout.)
            self.Log2Redis(1)


# Benchmark notes from the author: 5-570 1000-2600 2000-3000
# Es2Mongo('logstash-trace-2019.06.10.11', 2000).es2mongo(all=True)