什么是 ELK

ELK 是三个开源项目的首字母缩写,分别为: Elasticsearch,Logstash 和 Kibana。

分别解决什么问题

1. Logstash

Logstash是一个服务器端数据接收、处理、转发工具,可同时从多个源获取数据,将其转换,然后将其发送到像Elasticsearch这样的“存储”中。官方称其为 “server-side data processing pipeline ” ,即数据处理管道。

  • Logstash 可以接受多种不同来源的日志数据,包括Log、metrics、web 应用日志等。

  • Logstash 可以解析、富化、转换接收到的数据。

  • Logstash 可以将结果数据发送到对应的存储中(可以有多个、并且支持 ES、HDFS等多种发送对象)

2. ElasticSearch

Elasticsearch 是一个分布式的 RESTful 风格的检索和数据分析引擎,能够解决不同场景下的各种搜索、统计分析问题。是 ELK 的核心。

  • 分布式,高可用,和数据库相比水平扩容更容易。

  • 提供近实时的检索能力,安全场景下需要能及时的发现问题,问题发现的越晚损失越大。传统 数据库在日志量大的情况下表现有限,不适合用来做日志分析。

  • 除了检索之外还提供 Aggregation 聚合的功能,虽然使用起来没有 Hadoop MapReduce 那么自由,仍然能支持统计后可视化、近实时分析告警等功能。

3 Kibana

Kibana 是 ElasticSearch 的官方可视化工具。通过 Kibana,用户能使用 ES 中已有的数据制作各种可视化图表,并且能可视化的在 ES 中检索数据。

Logstash 实战,接收解析 syslog

1.syslog 简介

安全设备通常都支持配置通过 syslog 发送设备产生的告警日志、平台日志(管理日志 ,管理员登录日志,系统运行日志)等。

syslog (https://en.wikipedia.org/wiki/Syslog)是一种网络设备常用的日志标准。RFC 5424 中定义了统一的日志格式,以及传输的方法(使用 UDP 协议传输,默认发送的目标端口为 514,通常一个 UDP 包携带一条 syslog)。

其中 syslog 标准结构为 PRI  | HEADER | MSG,其中 HEADER 由 TIMESTAMP 和 HOSTNAME 组成。每个 syslog 的总大小需小于等于 1024 字节。

2.Logstash 简介

Logstash 分为 input -> filter -> output 三个模块。 每个模块都支持以 JRuby 编写自定义插件,并且有丰富的社区支持。input、output 自带多种输入输出的支持,并且支持 codecs 功能,即对流入流出的数据进行编解码。 filter 对输入的数据进行修改。

为了接收、解析归一化安全设备的 syslog,我们主要用到 Input Plugins 中的 Syslog Input Plugin,Filter Plugins 中的 Mutate 和 Grok 组件, Output Plugins 中的 ElasticSearch output plugin, Kafka output plugin, stdout output plugin(用于调试)。

Logstash 的默认的配置文件结构如下

input { 
  ... 
} 
 
filter { 
  ... 
} 
 
output { 
  ... 
}

3.配置数据接收

input 部分接收 syslog 输入,参考 https://www.elastic.co/guide/en/logstash/5.6/plugins-inputs-syslog.html 。给接收到的 message 添加一个 type 字段,方便和其他来源的 message 做区分。

input { 
    syslog{ 
        port => 514 
        type => "device-syslog" 
    } 
} 

4.配置数据解析转换

syslog input plugin 接收的 syslog 消息中 msg 部分默认会解析到 Logstash 消息的 message 字段中。

例如下面的某厂商的 IDS 告警,使用 syslog input 接收,然后直接使用  file output plugin 写到本地文件,可以看到

{"severity":0,"@timestamp":"2017-09-18T19:39:42.574Z","@version":"1","host":"172.16.0.1",
"message":"<5>time:2017-09-19 15:15:59;danger_degree:2;breaking_sighn:0;event:[20381]HTTP服务暴力猜测口令攻击;src_addr:192.168.108.6;src_port:10080;dst_addr:59.108.125.206;dst_port:80;proto:TCP.HTTP;user:",
"type": "device-syslog","priority":0,"facility":0,"severity_label":"Emergency","tags":[],"facility_label":"kernel"}  

我们要做的是解析出 “message” 字段中我们需要的信息。这时候就需要用到 Grok 插件了。

Grok 使用 patten match 的方式,将非结构化的数据转换为结构化数据(正则匹配提取)。

grok pattern 的格式为 %{SYNTAX:SEMANTIC}, 其中 SYNATX 是预先定义好的 pattern(如 IP、NUMBER 等),SEMANTIC 为目标提取出的字段的名字。

例如字符串 “3.44 55.3.244.1” 就可以用 pattern "%{NUMBER:duration} %{IP:client}" 匹配,匹配后生成的消息为 

{ 
  "duration": 3.44, 
  "client": "55.3.244.1" 
}

Logstash 中内置了相当多的 pattern https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns,其中包括了解析 NetScreen、Cisco 等厂商的防火墙日志,以及常用的开源IDS (bro) 等的pattern。

gork-pattern https://github.com/logstash-plugins/logstash-patterns-core/blob/master/patterns/grok-patterns 目录中包含了许多有用的pattern,如上面例子中用到的 NUMBER、IP 等,使用这些 pattern 组合成的表达式基本上可以满足大部分的解析场景了。

除了内置的 pattern ,Grok 还支持使用 Oniguruma(https://github.com/kkos/oniguruma/blob/master/doc/RE)格式的正则表达式定制 pattern。同时,因为 Grok 表达式本身也是基于正则表达式实现的,所以 Grok 表达式中能将预定义的 pattern 和 正则表达式混合编写。具体参考上面给出的 grok filter plugin 的文档。

上面例子中 syslog 的 message 字段如下

"message":"<5>time:2017-09-19 15:15:59;danger_degree:2;breaking_sighn:0;event:[20381]HTTP服务暴力猜测口令攻击;src_addr:192.168.108.6;src_port:10080;dst_addr:59.108.125.206;dst_port:80;proto:TCP.HTTP;user:"

对应的 Grok 表达式如下

time:%{TIMESTAMP_ISO8601:start_time};danger_degree:%{INT:danger_degree}%{DATA};event:%{DATA:event};src_addr:%{IP:sip};src_port:%{INT:sport};dst_addr:%{IP:dip};dst_port:%{INT:dport};proto:%{DATA:proto};user:%{DATA:user}

编写调试 Grok 表达式时,可以使用 Kinaba 自带Dev Tools 中的 Grok Debugger,  或者在 http://grokdebug.herokuapp.com和 http://grokconstructor.appspot.com/ 中在线调试。

写完 grok 表达式后,filter 部分中还需要加上 if 语句判断数据是不是来自指定的设备,以及给该消息加上来自某某设备的标记。

filter { 
    if [type] == "device-syslog" { 
        if [host] == "172.16.0.1" { 
            grok { 
                match => { "message" => 'time:%{TIMESTAMP_ISO8601:start_time};danger_degree:%{INT:danger_degree}%{DATA};event:%{DATA:event};src_addr:%{IP:sip};src_port:%{INT:sport};dst_addr:%{IP:dip};dst_port:%{INT:dport};proto:%{DATA:proto};user:%{DATA:user}'} 
            } 
            mutate { 
                add_field => ["systype", "syslog"] 
                add_field => ["devtype", "IDS"] 
            } 
        } 
    } 
}  

5.配置数据转发

最后,需要配置 output,将 数据发送到指定的存储中。在御见安全态势感知平台中,我们使用了两级的 Logstash 架构,第一级的 Logstash 负责收集 syslog 等外部数据发送到 kafka 队列中,第二级Logstash 从 kafka中消费数据,存储到 HDFS 和 ES 中。入 kafka 的数据同时会被 Flink 消费,根据预定义的规则实时匹配发现威胁。

以发送到 kafka 为例,配置 output 如下

output 
{ 
     if [type]== "device_syslog" { 
        kafka 
        { 
            topic_id => "device_syslog" 
            bootstrap_servers => "192.168.0.1:9092" 
            codec=>json 
        } 
    } 
}  

ElasticSearch 实战

安装&运行

出于学习目的安装 ES 可以通过 docker 安装的方式,参考https://www.elastic.co/guide/en/elasticsearch/reference/current/docker.html

我们基于 ES 5.6.13 版本进行下面的操作。

拉取 es 镜像

docker pull docker.elastic.co/elasticsearch/elasticsearch:5.6.13

使用如下命令启动 ElasticSearch

docker run -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" docker.elastic.co/elasticsearch/elasticsearch:5.6.13

验证启动成功: Docker 镜像启动的 ES 默认需要帐号密码进行 http basic auth 认证,默认的用户名为 elastic, 密码为 changeme。

curl -uelastic:changeme 127.0.0.1:9200

{
  "name" : "sqcLgAk",
  "cluster_name" : "docker-cluster",
  "cluster_uuid" : "2FwttFhNQUuFIsnT-XxTtQ",
  "version" : {
    "number" : "5.6.13",
    "build_hash" : "4d5320b",
    "build_date" : "2018-10-30T19:05:08.237Z",
    "build_snapshot" : false,
    "lucene_version" : "6.6.1"
  },
  "tagline" : "You Know, for Search"
}

Document

Document 是 ElasticSearch 中最基础的可存储单位,格式为 JSON。 也就是说我们的数据都是以 json 格式输入到 ES 中的。

我们以 elastic 官方 example https://github.com/elastic/examples 中的 nginx json log 为例,进行接下来的介绍。

Nginx json log 的格式如下

{"time": "17/May/2015:08:05:27 +0000", "remote_ip": "93.180.71.3", "remote_user": "-", "request": "GET /downloads/product_1 HTTP/1.1", "response": 304, "bytes": 0, "referrer": "-", "agent": "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"}

Index

Index(索引) 是 Document 的集合,同一类型的 Document 可以放到同一个 Index 中。

Cat Indices

访问 _cat/indices 查看现有索引。

curl -uelastic:changeme -XGET '127.0.0.1:9200/_cat/indices?pretty'
yellow open .monitoring-es-6-2019.03.13 -wP7KCK7SIuXlCOGOk6dhw 1 1 3 0 29.3kb 29.3kb
yellow open .watches                    QHHhTeufRXiguIO50d3PIQ 1 1 4 0 35.3kb 35.3kb

Create Empty Index

发送 PUT 请求到 host/<index_name> 可以创建默认的索引

curl -uelastic:changeme -XPUT '127.0.0.1:9200/nginx_log/?pretty'
{
  "acknowledged" : true,
  "shards_acknowledged" : true,
  "index" : "nginx_log"
}

可以看到默认创建的索引 mappings 为空。

curl -uelastic:changeme -XGET '127.0.0.1:9200/nginx_log/?pretty'
{
  "nginx_log" : {
    "aliases" : { },
    "mappings" : { },
    "settings" : {
      "index" : {
        "creation_date" : "1544187983345",
        "number_of_shards" : "5",
        "number_of_replicas" : "1",
        "uuid" : "IMjybQOYRIqmE2SFDbiicg",
        "version" : {
          "created" : "5061399"
        },
        "provided_name" : "nginx_log"
      }
    }
  }
}

Create Default Mapping

存储在 ES 中的每一个 document 都有一个唯一标识,可以由 {host}/{index}/{type}/{id} 索引到。

使用 HTTP POST 请求提交 json 到 /{index}/{type}/ 时,将自动创建 document id.

curl -uelastic:changeme -XPOST '127.0.0.1:9200/nginx_log/log/?pretty' -d @nginx_log_example.json

{
  "_index" : "nginx_log",
  "_type" : "log",
  "_id" : "AWeIzUw3ny5jTHvrzHew",
  "_version" : 1,
  "result" : "created",
  "_shards" : {
    "total" : 2,
    "successful" : 1,
    "failed" : 0
  },
  "created" : true
}

可以看到自动创建的 id 为 "_id" : "AWeIzUw3ny5jTHvrzHew",所以访问 /nginx_log/log/AWeIzUw3ny5jTHvrzHew/ 即可访问对应的 Document。

curl -uelastic:changeme -XGET '127.0.0.1:9200/nginx_log/log/AWeIzUw3ny5jTHvrzHew/?pretty'
{
  "_index" : "nginx_log",
  "_type" : "log",
  "_id" : "AWeIzUw3ny5jTHvrzHew",
  "_version" : 1,
  "found" : true,
  "_source" : {
    "time" : "17/May/2015:08:05:27 +0000",
    "remote_ip" : "93.180.71.3",
    "remote_user" : "-",
    "request" : "GET /downloads/product_1 HTTP/1.1",
    "response" : 304,
    "bytes" : 0,
    "referrer" : "-",
    "agent" : "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)"
  }
}

curl 对应的 index/type/_mapping,可以看到 ES 已经帮我们建好了默认的 mapping。

curl -uelastic:changeme -XGET '127.0.0.1:9200/nginx_log/log/_mapping?pretty'

{
  "nginx_log" : {
    "mappings" : {
      "log" : {
        "properties" : {
          "agent" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "bytes" : {
            "type" : "long"
          },
          "referrer" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "remote_ip" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "remote_user" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "request" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "response" : {
            "type" : "long"
          },
          "time" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          }
        }
      }
    }
  }
}

DELETE Index

发送 http delete 请求到对应索引 url 即可删除索引。

curl -XDELETE -uelastic:changeme '127.0.0.1:9200/nginx_log/?pretty'
{
  "acknowledged" : true
}

SET Mapping Manually

ES index 的 mapping 一经设置后便无法修改,如果要修改 ES 的 mapping 则需要删除旧的索引重新创建。

为了方便检索,我们把 remote_ip 字段设置为 ip 格式,time 设置为 ES 的 date 格式(需要制定 format 为 dd/MMM/yyyy:HH:mm:ss Z 参考date types)。参考 nginx_log_mapping.json

curl -XDELETE -uelastic:changeme '127.0.0.1:9200/nginx_log/?pretty'
curl -uelastic:changeme -XPUT '127.0.0.1:9200/nginx_log/?pretty'
curl -uelastic:changeme -XPUT '127.0.0.1:9200/nginx_log/_mapping/log/?pretty' -d @nginx_log_mapping.json

再 curl 对应的 mapping,可以看到已经创建成功。

curl -uelastic:changeme -XGET '127.0.0.1:9200/nginx_log/log/_mapping?pretty'

{
  "nginx_log" : {
    "mappings" : {
      "log" : {
        "properties" : {
          "agent" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "bytes" : {
            "type" : "long"
          },
          "referrer" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "remote_ip" : {
            "type" : "ip"
          },
          "remote_user" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "request" : {
            "type" : "text",
            "fields" : {
              "keyword" : {
                "type" : "keyword",
                "ignore_above" : 256
              }
            }
          },
          "response" : {
            "type" : "long"
          },
          "time" : {
            "type" : "date",
            "format" : "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis||dd/MMM/yyyy:HH:mm:ss Z"
          }
        }
      }
    }
  }
}

批量插入数据

一般情况下,我们需要使用 Logstash 配置将数据转发到 Elasticsearch。

在测试情况下,手动插入数据可以考虑使用 stream2es ( ES 5.x 及以上版本不再兼容)

使用 curl -O download.elasticsearch.org/stream2es/stream2es; chmod +x stream2es 下载安装 stream2es。(注意,使用 stream2es 需要 java 8,如果是 os x 系统可以使用 brew cask uninstall java;brew tap caskroom/versions;brew cask install java8 来降级 java8。)

把上文提到的 nginx json log 保存到当前目录, 然后使用下面的命令将数据批量插入到 ES。

cat nginx_json_logs |./stream2es stdin --target "http://elastic:changeme@127.0.0.1:9200/nginx_log/log"

在我们使用的 ES 5.6 版本中,则需要使用脚本 put_data.py 来批量插入数据。

python put_data.py nginx_json_logs nginx_log log

浏览数据

在安装好 ElasticSearch 后,可以在浏览器中安装 ElasticSearch Head 插件 进行数据浏览和聚合查询。

手动浏览的话,使用 curl,调用 /<index>/<doc_type>/_search API ,即可浏览数据,其中 size 参数指定返回的数量, pretty 指定返回格式化后的 json。

curl -uelastic:changeme -XGET '127.0.0.1:9200/nginx_log/log/_search?size=1&pretty'

{
  "took" : 5,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 51462,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "nginx_log",
        "_type" : "log",
        "_id" : "AWex4XZenurO8iSHM2Ep",
        "_score" : 1.0,
        "_source" : {
          "remote_user" : "-",
          "referrer" : "-",
          "request" : "GET /downloads/product_1 HTTP/1.1",
          "bytes" : 0,
          "agent" : "Debian APT-HTTP/1.3 (0.8.16~exp12ubuntu10.21)",
          "time" : "17/May/2015:08:05:32 +0000",
          "response" : 304,
          "remote_ip" : "93.180.71.3"
        }
      }
    ]
  }
}

返回的 json 中, “hits” “total” 代表了命中的数据总数,因为我们没有使用任何的查询条件,所以这个 total 为实际 index 的 document 数量。

可以 wc 一下原始文件,查看数据量是否一致。

wc -l nginx_json_logs
   51462 nginx_json_logs

基本搜索

简单的搜索可以使用 URL Search,即在 URL 中加上 q=<key>:<content> 的参数。

例如,搜索 response 为 404 的 log。

curl -uelastic:changeme -XGET 'http://127.0.0.1:9200/nginx_log/log/_search?size=1&q=response:404&pretty'

{
  "took" : 5,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 33876,
    "max_score" : 0.42090252,
    "hits" : [
      {
        "_index" : "nginx_log",
        "_type" : "log",
        "_id" : "AWfBQgs5eUMzQGhJMjhs",
        "_score" : 0.42090252,
        "_source" : {
          "remote_user" : "-",
          "referrer" : "-",
          "request" : "GET /downloads/product_1 HTTP/1.1",
          "bytes" : 331,
          "agent" : "Debian APT-HTTP/1.3 (0.9.7.9)",
          "time" : "17/May/2015:08:05:56 +0000",
          "response" : 404,
          "remote_ip" : "173.203.139.108"
        }
      }
    ]
  }
}

q= 实际上等于在使用 query_string 查询,具体支持的表达式格式参考文档。

curl -uelastic:changeme -XGET '127.0.0.1:9200/nginx_log/log/_search?size=1&pretty' -d '{"query":{"bool":{"must":[{"term":{"response":200}}],"must_not":[],"should":[]}},"from":0,"size":10,"sort":[],"aggs":{}}'

{
  "took" : 8,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "skipped" : 0,
    "failed" : 0
  },
  "hits" : {
    "total" : 4028,
    "max_score" : 1.0,
    "hits" : [
      {
        "_index" : "nginx_log",
        "_type" : "log",
        "_id" : "AWfBQgs5eUMzQGhJMjgy",
        "_score" : 1.0,
        "_source" : {
          "remote_user" : "-",
          "referrer" : "-",
          "request" : "GET /downloads/product_1 HTTP/1.1",
          "bytes" : 490,
          "agent" : "Debian APT-HTTP/1.3 (0.8.10.3)",
          "time" : "17/May/2015:08:05:34 +0000",
          "response" : 200,
          "remote_ip" : "217.168.17.5"
        }
      }
    ]
  }
}