数据分析的七种武器-elk
什么是 ELK
ELK 是三个开源项目的首字母缩写,分别为: Elasticsearch,Logstash 和 Kibana。
分别解决什么问题
1. Logstash
Logstash是一个服务器端数据接收、处理、转发工具,可同时从多个源获取数据,将其转换,然后将其发送到像Elasticsearch这样的“存储”中。官方称其为 “server-side data processing pipeline ” ,即数据处理管道。
-
Logstash 可以接受多种不同来源的日志数据,包括Log、metrics、web 应用日志等。
-
Logstash 可以解析、富化、转换接收到的数据。
-
Logstash 可以将结果数据发送到对应的存储中(可以有多个、并且支持 ES、HDFS等多种发送对象)
2. ElasticSearch
Elasticsearch 是一个分布式的 RESTful 风格的检索和数据分析引擎,能够解决不同场景下的各种搜索、统计分析问题。是 ELK 的核心。
-
分布式,高可用,和数据库相比水平扩容更容易。
-
提供近实时的检索能力,安全场景下需要能及时的发现问题,问题发现的越晚损失越大。传统 数据库在日志量大的情况下表现有限,不适合用来做日志分析。
-
除了检索之外还提供 Aggregation 聚合的功能,虽然使用起来没有 Hadoop MapReduce 那么自由,仍然能支持统计后可视化、近实时分析告警等功能。
3 Kibana
Kibana 是 ElasticSearch 的官方可视化工具。通过 Kibana,用户能使用 ES 中已有的数据制作各种可视化图表,并且能可视化的在 ES 中检索数据。
Logstash 实战,接收解析 syslog
1.syslog 简介
安全设备通常都支持配置通过 syslog 发送设备产生的告警日志、平台日志(管理日志 ,管理员登录日志,系统运行日志)等。
syslog (https://en.wikipedia.org/wiki/Syslog)是一种网络设备常用的日志标准。RFC 5424 中定义了统一的日志格式,以及传输的方法(使用 UDP 协议传输,默认发送的目标端口为 514,通常一个 UDP 包携带一条 syslog)。
其中 syslog 标准结构为 PRI | HEADER | MSG
,其中 HEADER 由 TIMESTAMP 和 HOSTNAME 组成。每个 syslog 的总大小需小于等于 1024 字节。
2.Logstash 简介
Logstash 分为 input -> filter -> output 三个模块。 每个模块都支持以 JRuby 编写自定义插件,并且有丰富的社区支持。input、output 自带多种输入输出的支持,并且支持 codecs 功能,即对流入流出的数据进行编解码。 filter 对输入的数据进行修改。
为了接收、解析归一化安全设备的 syslog,我们主要用到 Input Plugins 中的 Syslog Input Plugin,Filter Plugins 中的 Mutate 和 Grok 组件, Output Plugins 中的 ElasticSearch output plugin, Kafka output plugin, stdout output plugin(用于调试)。
Logstash 的默认的配置文件结构如下
input {
...
}
filter {
...
}
output {
...
}
3.配置数据接收
input 部分接收 syslog 输入,参考 https://www.elastic.co/guide/en/logstash/5.6/plugins-inputs-syslog.html 。给接收到的 message 添加一个 type 字段,方便和其他来源的 message 做区分。
input {
syslog{
port => 514
type => "device-syslog"
}
}
4.配置数据解析转换
syslog input plugin 接收的 syslog 消息中 msg 部分默认会解析到 Logstash 消息的 message 字段中。
例如下面的某厂商的 IDS 告警,使用 syslog input 接收,然后直接使用 file output plugin 写到本地文件,可以看到
{"severity":0,"@timestamp":"2017-09-18T19:39:42.574Z","@version":"1","host":"172.16.0.1",
"message":"<5>time:2017-09-19 15:15:59;danger_degree:2;breaking_sighn:0;event:[20381]HTTP服务暴力猜测口令攻击;src_addr:192.168.108.6;src_port:10080;dst_addr:59.108.125.206;dst_port:80;proto:TCP.HTTP;user:",
"type": "device-syslog","priority":0,"facility":0,"severity_label":"Emergency","tags":[],"facility_label":"kernel"}
我们要做的是解析出 “message” 字段中我们需要的信息。这时候就需要用到 Grok 插件了。
Grok 使用 patten match 的方式,将非结构化的数据转换为结构化数据(正则匹配提取)。
grok pattern 的格式为 %{SYNTAX:SEMANTIC}
, 其中 SYNATX 是预先定义好的 pattern(如 IP、NUMBER 等),SEMANTIC 为目标提取出的字段的名字。
例如字符串 “3.44 55.3.244.1” 就可以用 pattern "%{NUMBER:duration} %{IP:client}"
匹配,匹配后生成的消息为
{
"duration": 3.44,
"client": "55.3.244.1"
}
Logstash 中内置了相当多的 pattern https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns,其中包括了解析 NetScreen、Cisco 等厂商的防火墙日志,以及常用的开源IDS (bro) 等的pattern。
gork-pattern https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns 目录中包含了许多有用的pattern,如上面例子中用到的 NUMBER、IP 等,使用这些 pattern 组合成的表达式基本上可以满足大部分的解析场景了。
除了内置的 pattern ,Grok 还支持使用 Oniguruma(https://github.com/kkos/oniguruma/blob/master/doc/RE)格式的正则表达式定制 pattern。同时,因为 Grok 表达式本身也是基于正则表达式实现的,所以 Grok 表达式中能将预定义的 pattern 和 正则表达式混合编写。具体参考上面给出的 grok filter plugin 的文档。
上面例子中 syslog 的 message 字段如下
"message":"<5>time:2017-09-19 15:15:59;danger_degree:2;breaking_sighn:0;event:[20381]HTTP服务暴力猜测口令攻击;src_addr:192.168.108.6;src_port:10080;dst_addr:59.108.125.206;dst_port:80;proto:TCP.HTTP;user:"
对应的 Grok 表达式如下
time:%{TIMESTAMP_ISO8601:start_time};danger_degree:%{INT:danger_degree}%{DATA};event:%{DATA:event};src_addr:%{IP:sip};src_port:%{INT:sport};dst_addr:%{IP:dip};dst_port:%{INT:dport};proto:%{DATA:proto};user:%{DATA:user}
编写调试 Grok 表达式时,可以使用 Kinaba 自带Dev Tools 中的 Grok Debugger, 或者在 http://grokdebug.herokuapp.com和 http://grokconstructor.appspot.com/ 中在线调试。
写完 grok 表达式后,filter 部分中还需要加上 if 语句判断数据是不是来自指定的设备,以及给该消息加上来自某某设备的标记。
filter {
if [type] == "device-syslog" {
if [host] == "172.16.0.1" {
grok {
match => { "message" => 'time:%{TIMESTAMP_ISO8601:start_time};danger_degree:%{INT:danger_degree}%{DATA};event:%{DATA:event};src_addr:%{IP:sip};src_port:%{INT:sport};dst_addr:%{IP:dip};dst_port:%{INT:dport};proto:%{DATA:proto};user:%{DATA:user}'}
}
mutate {
add_field => ["systype", "syslog"]
add_field => ["devtype", "IDS"]
}
}
}
}
5.配置数据转发
最后,需要配置 output,将 数据发送到指定的存储中。在御见安全态势感知平台中,我们使用了两级的 Logstash 架构,第一级的 Logstash 负责收集 syslog 等外部数据发送到 kafka 队列中,第二级Logstash 从 kafka中消费数据,存储到 HDFS 和 ES 中。入 kafka 的数据同时会被 Flink 消费,根据预定义的规则实时匹配发现威胁。
以发送到 kafka 为例,配置 output 如下
output
{
if [type]== "device_syslog" {
kafka
{
topic_id => "device_syslog"
bootstrap_servers => "192.168.0.1:9092"
codec=>json
}
}
}
ElasticSearch 实战
安装&运行
出于学习目的安装 ES 可以通过 docker 安装的方式,参考https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html 。
我们基于 ES 5.6.13 版本进行下面的操作。
拉取 es 镜像
docker pull docker.elastic.co/elasticsearch/elasticsearch:5.6.13
使用如下命令启动 ElasticSearch
docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:5.6.13
验证启动成功: Docker 镜像启动的 ES 默认需要帐号密码进行 http basic auth 认证,默认的用户名为 elastic, 密码为 changeme。
curl -uelastic:changeme 127.0.0.1:9200
{
"name" : "sqcLgAk",
"cluster_name" : "docker-cluster",
"cluster_uuid" : "2FwttFhNQUuFIsnT-XxTtQ",
"version" : {
"number" : "5.6.13",
"build_hash" : "4d5320b",
"build_date" : "2018-10-30T19:05:08.237Z",
"build_snapshot" : false,
"lucene_version" : "6.6.1"
},
"tagline" : "You Know, for Search"
}
Document
Document 是 ElasticSearch 中最基础的可存储单位,格式为 JSON。 也就是说我们的数据都是以 json 格式输入到 ES 中的。
我们以 elastic 官方 example https://github.com/elastic/examples 中的 nginx json log 为例,进行接下来的介绍。
Nginx json log 的格式如下
{"time": "17/May/2015:08:05:27 +0000", "remote_ip": "93.180.71.3", "remote_user": "-", "request": "GET /downloads/product_1 HTTP/1.1", "response": 304, "bytes": 0, "referrer": "-", "agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"}
Index
Index(索引) 是 Document 的集合,同一类型的 Document 可以放到同一个 Index 中。
Cat Indices
访问 _cat/indices
查看现有索引。
curl -uelastic:changeme -XGET '127.0.0.1:9200/_cat/indices?pretty'
yellow open .monitoring-es-6-2019.03.13 -wP7KCK7SIuXlCOGOk6dhw 1 1 3 0 29.3kb 29.3kb
yellow open .watches QHHhTeufRXiguIO50d3PIQ 1 1 4 0 35.3kb 35.3kb
Create Empty Index
发送 PUT 请求到 host/<index_name>
可以创建默认的索引
curl -uelastic:changeme -XPUT '127.0.0.1:9200/nginx_log/?pretty'
{
"acknowledged" : true,
"shards_acknowledged" : true,
"index" : "nginx_log"
}
可以看到默认创建的索引 mappings 为空。
curl -uelastic:changeme -XGET '127.0.0.1:9200/nginx_log/?pretty'
{
"nginx_log" : {
"aliases" : { },
"mappings" : { },
"settings" : {
"index" : {
"creation_date" : "1544187983345",
"number_of_shards" : "5",
"number_of_replicas" : "1",
"uuid" : "IMjybQOYRIqmE2SFDbiicg",
"version" : {
"created" : "5061399"
},
"provided_name" : "nginx_log"
}
}
}
}
Create Default Mapping
存储在 ES 中的每一个 document 都有一个唯一标识,可以由 {host}/{index}/{type}/{id}
索引到。
使用 HTTP POST 请求提交 json 到 /{index}/{type}/
时,将自动创建 document id.
curl -uelastic:changeme -XPOST '127.0.0.1:9200/nginx_log/log/?pretty' -d @nginx_log_example.json
{
"_index" : "nginx_log",
"_type" : "log",
"_id" : "AWeIzUw3ny5jTHvrzHew",
"_version" : 1,
"result" : "created",
"_shards" : {
"total" : 2,
"successful" : 1,
"failed" : 0
},
"created" : true
}
可以看到自动创建的 id 为 "_id" : "AWeIzUw3ny5jTHvrzHew"
,所以访问 /nginx_log/log/AWeIzUw3ny5jTHvrzHew/
即可访问对应的 Document。
curl -uelastic:changeme -XGET '127.0.0.1:9200/nginx_log/log/AWeIzUw3ny5jTHvrzHew/?pretty'
{
"_index" : "nginx_log",
"_type" : "log",
"_id" : "AWeIzUw3ny5jTHvrzHew",
"_version" : 1,
"found" : true,
"_source" : {
"time" : "17/May/2015:08:05:27 +0000",
"remote_ip" : "93.180.71.3",
"remote_user" : "-",
"request" : "GET /downloads/product_1 HTTP/1.1",
"response" : 304,
"bytes" : 0,
"referrer" : "-",
"agent" : "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"
}
}
curl 对应的 index/type/_mapping
,可以看到 ES 已经帮我们建好了默认的 mapping。
curl -uelastic:changeme -XGET '127.0.0.1:9200/nginx_log/log/_mapping?pretty'
{
"nginx_log" : {
"mappings" : {
"log" : {
"properties" : {
"agent" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"bytes" : {
"type" : "long"
},
"referrer" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"remote_ip" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"remote_user" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"request" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"response" : {
"type" : "long"
},
"time" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
}
}
}
}
}
}
DELETE Index
发送 http delete 请求到对应索引 url 即可删除索引。
curl -XDELETE -uelastic:changeme '127.0.0.1:9200/nginx_log/?pretty'
{
"acknowledged" : true
}
SET Mapping Manually
ES index 的 mapping 一经设置后便无法修改,如果要修改 ES 的 mapping 则需要删除旧的索引重新创建。
为了方便检索,我们把 remote_ip 字段设置为 ip 格式,time 设置为 ES 的 date 格式(需要制定 format 为 dd/MMM/yyyy:HH:mm:ss Z
参考date types)。参考 nginx_log_mapping.json
curl -XDELETE -uelastic:changeme '127.0.0.1:9200/nginx_log/?pretty'
curl -uelastic:changeme -XPUT '127.0.0.1:9200/nginx_log/?pretty'
curl -uelastic:changeme -XPUT '127.0.0.1:9200/nginx_log/_mapping/log/?pretty' -d @nginx_log_mapping.json
再 curl 对应的 mapping,可以看到已经创建成功。
curl -uelastic:changeme -XGET '127.0.0.1:9200/nginx_log/log/_mapping?pretty'
{
"nginx_log" : {
"mappings" : {
"log" : {
"properties" : {
"agent" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"bytes" : {
"type" : "long"
},
"referrer" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"remote_ip" : {
"type" : "ip"
},
"remote_user" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"request" : {
"type" : "text",
"fields" : {
"keyword" : {
"type" : "keyword",
"ignore_above" : 256
}
}
},
"response" : {
"type" : "long"
},
"time" : {
"type" : "date",
"format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis||dd/MMM/yyyy:HH:mm:ss Z"
}
}
}
}
}
}
批量插入数据
一般情况下,我们需要使用 Logstash 配置将数据转发到 Elasticsearch。
在测试情况下,手动插入数据可以考虑使用 stream2es ( ES 5.x 及以上版本不再兼容)
使用 curl -O download.elasticsearch.org/stream2es/stream2es; chmod +x stream2es
下载安装 stream2es。(注意,使用 stream2es 需要 java 8,如果是 os x 系统可以使用 brew cask uninstall java;brew tap caskroom/versions;brew cask install java8
来降级 java8。)
把上文提到的 nginx json log 保存到当前目录, 然后使用下面的命令将数据批量插入到 ES。
cat nginx_json_logs |./stream2es stdin --target "http://elastic:changeme@127.0.0.1:9200/nginx_log/log"
在我们使用的 ES 5.6 版本中,则需要使用脚本 put_data.py 来批量插入数据。
python put_data.py nginx_json_logs nginx_log log
浏览数据
在安装好 ElasticSearch 后,可以在浏览器中安装 ElasticSearch Head 插件 进行数据浏览和聚合查询。
手动浏览的话,使用 curl,调用 /<index>/<doc_type>/_search
API ,即可浏览数据,其中 size
参数指定返回的数量, pretty
指定返回格式化后的 json。
curl -uelastic:changeme -XGET '127.0.0.1:9200/nginx_log/log/_search?size=1&pretty'
{
"took" : 5,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 51462,
"max_score" : 1.0,
"hits" : [
{
"_index" : "nginx_log",
"_type" : "log",
"_id" : "AWex4XZenurO8iSHM2Ep",
"_score" : 1.0,
"_source" : {
"remote_user" : "-",
"referrer" : "-",
"request" : "GET /downloads/product_1 HTTP/1.1",
"bytes" : 0,
"agent" : "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)",
"time" : "17/May/2015:08:05:32 +0000",
"response" : 304,
"remote_ip" : "93.180.71.3"
}
}
]
}
}
返回的 json 中, “hits” “total” 代表了命中的数据总数,因为我们没有使用任何的查询条件,所以这个 total 为实际 index 的 document 数量。
可以 wc 一下原始文件,查看数据量是否一致。
wc -l nginx_json_logs
51462 nginx_json_logs
基本搜索
简单的搜索可以使用 URL Search,即在 URL 中加上 q=<key>:<content>
的参数。
例如,搜索 response 为 404 的 log。
curl -uelastic:changeme -XGET 'http://127.0.0.1:9200/nginx_log/log/_search?size=1&q=response:404&pretty'
{
"took" : 5,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 33876,
"max_score" : 0.42090252,
"hits" : [
{
"_index" : "nginx_log",
"_type" : "log",
"_id" : "AWfBQgs5eUMzQGhJMjhs",
"_score" : 0.42090252,
"_source" : {
"remote_user" : "-",
"referrer" : "-",
"request" : "GET /downloads/product_1 HTTP/1.1",
"bytes" : 331,
"agent" : "Debian APT-HTTP/1.3 (0.9.7.9)",
"time" : "17/May/2015:08:05:56 +0000",
"response" : 404,
"remote_ip" : "173.203.139.108"
}
}
]
}
}
q=
实际上等于在使用 query_string 查询,具体支持的表达式格式参考文档。
curl -uelastic:changeme -XGET '127.0.0.1:9200/nginx_log/log/_search?size=1&pretty' -d '{"query":{"bool":{"must":[{"term":{"response":200}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"aggs":{}}'
{
"took" : 8,
"timed_out" : false,
"_shards" : {
"total" : 5,
"successful" : 5,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : 4028,
"max_score" : 1.0,
"hits" : [
{
"_index" : "nginx_log",
"_type" : "log",
"_id" : "AWfBQgs5eUMzQGhJMjgy",
"_score" : 1.0,
"_source" : {
"remote_user" : "-",
"referrer" : "-",
"request" : "GET /downloads/product_1 HTTP/1.1",
"bytes" : 490,
"agent" : "Debian APT-HTTP/1.3 (0.8.10.3)",
"time" : "17/May/2015:08:05:34 +0000",
"response" : 200,
"remote_ip" : "217.168.17.5"
}
}
]
}
}