package com.dongpeng.es;

import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.ParsedLongTerms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;

public class EsClient {

	private static Logger logger = LoggerFactory.getLogger(EsClient.class);

	private RestHighLevelClient client;

	private RestClient restClient;

	/**
	 * 构造连接客户端
	 */
	public EsClient() {
		RestClientBuilder builder = RestClient.builder(new HttpHost("10.136.15.122", 9200, "http"));
		builder.setMaxRetryTimeoutMillis(10000);
		restClient = builder.build();
		client = new RestHighLevelClient(restClient);
	}


	/**
	 * 索引数据
	 *
	 * @throws IOException
	 */
	public void index(Integer ids) {
		Map<String, Object> jsonMap = new HashMap<String, Object>();
		jsonMap.put("user", "bellen");
		jsonMap.put("id",ids);
		jsonMap.put("name", new Date());
		jsonMap.put("message", "trying out Elasticsearch");

		IndexRequest indexRequest = new IndexRequest("posts", "doc", ids.toString()).source(jsonMap);



		try {
			// 获取响应结果
			IndexResponse indexResponse = client.index(indexRequest);
			String index = indexResponse.getIndex();
			String type = indexResponse.getType();
			String id = indexResponse.getId();
			long version = indexResponse.getVersion();

			if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
				logger.info("doc indexed, index: " + index + ", type:" + type + ",id:" + id + ",version:" + version);
			} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
				logger.info("doc updated, index: " + index + ", type:" + type + ",id:" + id + ",version:" + version);
			}
		} catch (ElasticsearchException e) {
			if (e.status() == RestStatus.CONFLICT) {
				logger.error("version conflict");

			}
		} catch (Exception e) {
			logger.error("execute index api failed, " + e.toString());
		}

	}

	/**
	 * id查找数据
	 * @param id
	 */
	public void findById(String id) {
		GetRequest request = new GetRequest("posts", "doc", id);
		try {
			GetResponse getResponse = client.get(request);
			System.out.println(getResponse.getVersion());
			System.out.println(JSON.toJSONString(getResponse.getSource()));
		} catch (ElasticsearchException e) {
			if (e.status() == RestStatus.NOT_FOUND) {
				System.out.println("没有找到元素");
			}
		} catch (Exception e1) {
			e1.printStackTrace();
		}

	}

	/**
	 * 删除数据
	 * @param id
	 */
	public void delete(String id) {
		DeleteRequest request = new DeleteRequest("posts", "doc", id);
		DeleteResponse deleteResponse;
		try {
			deleteResponse = client.delete(request);
			if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
			    System.out.println("未找到元素");
			}
		} catch (IOException e1) {

			e1.printStackTrace();
		}


	}

	/**
	 * 更新数据
	 */
	public void update()  {
		UpdateRequest request = new UpdateRequest(
		        "posts",
		        "doc",
		        "1");
		Map<String, Object> jsonMap = new HashMap<String, Object>();
		jsonMap.put("user", "测试");
		jsonMap.put("name", new Date());
		jsonMap.put("message", "trying out Elasticsearch");
		request.doc(jsonMap);
		try {
			client.update(request);
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

	}

	/**
	 * search demo
	 */
	public void search() {
		SearchRequest searchRequest = new SearchRequest("posts");

		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
		sourceBuilder.query(QueryBuilders.matchQuery("message", "trying"));
		sourceBuilder.from(0);
		sourceBuilder.size(5);
		sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

		searchRequest.source(sourceBuilder);
		searchRequest.types("doc");
		try {
			SearchResponse searchResponse = client.search(searchRequest);
			SearchHits hits = searchResponse.getHits();
			System.out.println(hits.getTotalHits());
			SearchHit[] searchHits = hits.getHits();
			for (SearchHit hit : searchHits) {
			    System.out.println(JSON.toJSONString(hit.getSource()));
			}


		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

	/**
	 * 统计demo
	 */
	public void sum() {

		SearchRequest searchRequest = new SearchRequest("posts");

		SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
		sourceBuilder.query(QueryBuilders.matchQuery("message", "trying"));
		sourceBuilder.from(0);
		sourceBuilder.size(5);
		sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
	    TermsAggregationBuilder termsAggregationBuilder = AggregationBuilders.terms("emp_count").field("id");
		sourceBuilder.aggregation(termsAggregationBuilder);
		searchRequest.source(sourceBuilder);
		searchRequest.types("doc");

		try {

			SearchResponse searchResponse = client.search(searchRequest);
			ParsedLongTerms aggregation = searchResponse.getAggregations().get("emp_count");
			System.out.println(aggregation.getBuckets().size());
			SearchHits hits = searchResponse.getHits();
			System.out.println(hits.getTotalHits());
			SearchHit[] searchHits = hits.getHits();
			for (SearchHit hit : searchHits) {
			    System.out.println(JSON.toJSONString(hit.getSource()));
			}


		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}

	}

	public void close() {
		try {
			restClient.close();
		} catch (IOException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}
12-10 19:03