ELK

ELK(ElasticsearchLogstashKibana

Elasticsearch: Elasticsearch 是基于 JSON 的分布式搜索和分析引擎,专为实现水平扩展、高可用和管理便捷性而设计

Logstash: Logstash 是动态数据收集管道,拥有可扩展的插件生态系统,能够与 Elasticsearch 产生强大的协同作用

Kibana: Kibana 能够以图表的形式呈现数据,并且具有可扩展的用户界面,供您全方位配置和管理 Elastic Stack

ELK 的安装配置(Windows 10)

安装目前最新版本 Elasticsearch 5.6.2,需要确保 Java 的版本至少为 Java 8

由于在 Windows 的环境下,所以可以直接下载官网的 elasticsearch-5.6.2.msi 进行安装。安装完成之后,管理员身份运行 elasticsearch.exe 即可,成功之后便可以访问 localhost:9200,会显示如下信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
{
"name" : "DESKTOP-QSQV2PB",
"cluster_name" : "elasticsearch",
"cluster_uuid" : "Dluxg6TbSVOiOunEsjQdIA",
"version" : {
"number" : "5.6.2",
"build_hash" : "57e20f3",
"build_date" : "2017-09-23T13:16:45.703Z",
"build_snapshot" : false,
"lucene_version" : "6.6.1"
},
"tagline" : "You Know, for Search"
}

之后再下载 KibanaLogstashzip,直接解压即可。这里要注意,要保持 ELK 版本的一致,即都是 5.6.2

运行 kibana.bat 成功之后,浏览器便可以访问 localhost:5601,通过图形界面浏览、操作数据

对于 Logstash,先创建文件 Logstash.conf,用于定义数据的输入,过滤,以及输出。例如,解析文件,并且将解析后的数据存入 Elasticsearch

文件内的数据结构:

1
192.168.1.1 name1

Logstash.conf 定义:

input{} 定义数据的来源,file{} 从文件中读取数据,start_position 设置为 beginning 代表第一次读取文件时,从初始位置开始读取数据。因为读取文件的同时,会记录读取到的位置,所以之后文件里增加新数据后,是直接读取新的数据,并不会重头开始读取数据

filter{} 定义数据的处理,grok{} 解析非结构化数据,一些官方写好的正则 patterns,可以直接使用

output{} 定义数据的输出,elasticsearch{} 输出到 Elasticsearch 里,stdout{} 输出到正在运行的界面中,方便查看

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
input{
file{
path => ["E:/test.txt"]
start_position => "beginning"
}
}

filter{
grok{
match => {
"message" => "%{IP:server_ip} %{WORD:server_name}"
}
}
}

output{
elasticsearch{
hosts => ["localhost:9200"]
index => "test-logstash-%{+YYYY-MM}"
document_type => "file_test"
}
stdout{
codec => rubydebug
}
}

之后通过命令提示符,在 bin 目录下运行:

1
c:\logstash-5.6.2\bin>.\logstash.bat -f ..\config\Logstash.conf

由于设置了 stdout,所以可以在界面上看到:

1
2
3
4
5
6
7
8
9
{
"path" => "E:/test.txt",
"server_name" => "name1",
"@timestamp" => 2017-10-19T12:46:37.272Z,
"@version" => "1",
"host" => "DESKTOP-QSQV2PB",
"server_ip" => "192.168.1.1",
"message" => "192.168.1.1 name1\r"
}

同时可以在 Kibana 的浏览器界面的控制台 Dev Tools -> Console 输入 GET /test-logstash-2017-10/_search 进行查看

Java 与 Elasticsearch

这里采用 Java High Level REST Client 来进行操作

Maven Repository:

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>5.6.2</version>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.9.1</version>
</dependency>

同时在 src/main/resources 创建文件 log4j2.properties

1
2
3
4
5
6
appender.console.type = Console
appender.console.name = console
appender.console.layout.type = PatternLayout

rootLogger.level = info
rootLogger.appenderRef.console.ref = console

例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
import org.apache.http.HttpHost;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

import java.io.IOException;
import java.util.Map;

public class ElasticsearchClient {

private RestClient restClient;
private RestHighLevelClient restHighLevelClient;

public ElasticsearchClient() {
}

public ElasticsearchClient(String hostname, int port, String scheme) {

HttpHost httpHost = new HttpHost(hostname, port, scheme);
buildClient(httpHost);
}

// 建立连接
public void buildClient(HttpHost httpHost) {

restClient = RestClient.builder(httpHost).build();
restHighLevelClient = new RestHighLevelClient(restClient);
}

// 关闭连接
public void closeClient() {

try {
restClient.close();
} catch (IOException e) {
e.printStackTrace();
}
}

// 插入数据,index,type,id 确定插入的位置
// 当前位置没有数据时,创建,状态为 indexResponse.getResult() == DocWriteResponse.Result.CREATED
// 当前位置存在数据时,覆盖,状态为 indexResponse.getResult() == DocWriteResponse.Result.UPDATED
public IndexResponse index(Map<String, Object> map, String index, String type, String id) {

IndexRequest indexRequest = new IndexRequest(index, type, id).source(map);

IndexResponse indexResponse = null;
try {
indexResponse = restHighLevelClient.index(indexRequest);
} catch (ElasticsearchException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}

return indexResponse;
}

// 获取数据
// 通过 getResponse.isExists() 判断当前位置是否有数据
// 当 index 不存在时,会抛出异常,其状态为 e.status() == RestStatus.NOT_FOUND
// 若设定了 getRequest.version(),当查找的 version 不符合时,会抛出异常,其状态为 e.status() == RestStatus.CONFLICT
// fetchSource 默认为 true,设置为 false 时,不取出数据
// includes 设置需要取出的数据,为 Strings.EMPTY_ARRAY 或者 null 时,取出所有数据
// excludes 设置需要屏蔽的数据,为 Strings.EMPTY_ARRAY 或者 null 时,不屏蔽数据
public GetResponse get(String index, String type, String id, boolean fetchSource, String[] includes, String[] excludes) {

GetRequest getRequest = new GetRequest(index, type, id);

FetchSourceContext fetchSourceContext = new FetchSourceContext(fetchSource, includes, excludes);
getRequest.fetchSourceContext(fetchSourceContext);

GetResponse getResponse = null;
try {
getResponse = restHighLevelClient.get(getRequest);
} catch (ElasticsearchException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}

return getResponse;
}

// 删除数据
// 当删除的文件不存在时,deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND
// 若设定了 deleteRequest.version(),当删除的 version 不符合时,会抛出异常,其状态为 e.status() == RestStatus.CONFLICT
public DeleteResponse delete(String index, String type, String id) {

DeleteRequest deleteRequest = new DeleteRequest(index, type, id);

DeleteResponse deleteResponse = null;
try {
deleteResponse = restHighLevelClient.delete(deleteRequest);
} catch (ElasticsearchException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}

return deleteResponse;
}

// 更新数据
// 当前位置没有数据时,会抛出异常,其状态为 e.status() == RestStatus.NOT_FOUND
// updateRequest.upsert(map, XContentType.JSON) 若当前位置没有数据,则会以 map 创建数据
// updateRequest.fetchSource() 默认为 false,updateResponse.getGetResult() 为 null
public UpdateResponse update(String index, String type, String id, Map<String, Object> map) {

UpdateRequest updateRequest = new UpdateRequest(index, type, id).doc(map);

UpdateResponse updateResponse = null;
try {
updateResponse = restHighLevelClient.update(updateRequest);
} catch (ElasticsearchException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}

return updateResponse;
}

// 块处理
// bulkRequest.add() 添加操作
// bulkResponse.hasFailures() 快速检查是否存在操作失败的情况
// for (BulkItemResponse bulkItemResponse : bulkResponse) 遍历
// bulkItemResponse.isFailed() 判断是否失败
// DocWriteResponse docWriteResponse = bulkItemResponse.getResponse() 得到结果
// bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX 结果类型匹配
public BulkResponse bulk(BulkRequest bulkRequest) {

BulkResponse bulkResponse = null;
try {
bulkResponse = restHighLevelClient.bulk(bulkRequest);
} catch (ElasticsearchException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}

return bulkResponse;
}

// 查找数据
// new SearchRequest(index) 可以在创建的时候限定 index
// searchRequest.types(type) 可以限定 type
// searchSourceBuilder.from() 代表从结果的第几个开始获取,默认为 0
// searchSourceBuilder.size() 代表取多少个结果,默认为 10
// 有两种查找方式,term 和 match,主要区别体现在字符串的查找上
// 字符串在存储过程中,有类型文本和关键字,两种形态
// 当属于类型文本:Elasticsearch-test,其会被分词器给拆分成一个个单词,并且小写:elasticsearch,test
// 用 term 查询 Elasticsearch-test 会失败,因为 term 将值当作关键字来查找,并不会进行分词
// 用 match 查询 Elasticsearch-test 会成功,因为 match 会将其进行分词后,进行查找
public SearchResponse search(SearchRequest searchRequest) {

SearchResponse searchResponse = null;
try {
searchResponse = restHighLevelClient.search(searchRequest);
} catch (ElasticsearchException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}

return searchResponse;
}

// 滚动查找
// 声明 searchRequest.scroll() 之后,启用滚动查找,即可以获取 searchResponse.getScrollId()
// new Scroll(TimeValue.timeValueMinutes(1L)) 滚动查找的有效时间
// 也可以说是 scrollId 的有效使用时间,一旦超过时间,scrollId 便会过期
// new SearchScrollRequest(scrollId) 将当前 scrollId 传入,即可获取下一个片段的结果以及 scrollId
// 传入 scrollId 时,同时还要重新设定 scroll 有效时间
// 当 scrollId 过期时,会自动释放搜索上下文
// 建议 clearScrollRequest.addScrollId(scrollId) 手动释放搜索上下文
public SearchResponse searchScroll(SearchScrollRequest searchScrollRequest) {

SearchResponse searchResponse = null;
try {
searchResponse = restHighLevelClient.searchScroll(searchScrollRequest);
} catch (ElasticsearchException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}

return searchResponse;
}

// 释放滚动搜索的上下文
// 当 scrollId 过期时,会自动释放搜索上下文
// 建议 clearScrollRequest.addScrollId(scrollId) 手动释放搜索上下文
// clearScrollResponse.isSucceeded() 清除滚动是否成功
public ClearScrollResponse clearScroll(ClearScrollRequest scrollRequest) {

ClearScrollResponse clearScrollResponse = null;
try {
clearScrollResponse = restHighLevelClient.clearScroll(scrollRequest);
} catch (ElasticsearchException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}

return clearScrollResponse;
}
}